Update Orchestrator to use Kubernetes 83/46383/1
authorThomas Duval <thomas.duval@orange.com>
Sun, 29 Oct 2017 20:02:25 +0000 (21:02 +0100)
committerThomas Duval <thomas.duval@orange.com>
Sun, 29 Oct 2017 20:02:25 +0000 (21:02 +0100)
Change-Id: Ib1f1fb54544e4ac985ef2063ff8114d804e83d0e

moonv4/moon_orchestrator/Dockerfile
moonv4/moon_orchestrator/bootstrap.py [deleted file]
moonv4/moon_orchestrator/moon_orchestrator/api/containers.py
moonv4/moon_orchestrator/moon_orchestrator/api/generic.py
moonv4/moon_orchestrator/moon_orchestrator/drivers.py [new file with mode: 0644]
moonv4/moon_orchestrator/moon_orchestrator/http_server.py [new file with mode: 0644]
moonv4/moon_orchestrator/moon_orchestrator/messenger.py [deleted file]
moonv4/moon_orchestrator/moon_orchestrator/server.py
moonv4/moon_orchestrator/requirements.txt

index b68c130..70eef9a 100644 (file)
@@ -9,6 +9,7 @@ RUN pip3 install pip --upgrade
 ADD . /root
 WORKDIR /root/
 RUN pip3 install -r requirements.txt --upgrade
+RUN pip3 install /root/dist/* --upgrade
 RUN pip3 install . --upgrade
 
-CMD ["python3", "bootstrap.py"]
\ No newline at end of file
+CMD ["python3", "-m", "moon_orchestrator"]
\ No newline at end of file
diff --git a/moonv4/moon_orchestrator/bootstrap.py b/moonv4/moon_orchestrator/bootstrap.py
deleted file mode 100644 (file)
index dab78f2..0000000
+++ /dev/null
@@ -1,203 +0,0 @@
-import sys
-import time
-import requests
-import yaml
-import logging
-import json
-import base64
-import mysql.connector
-import re
-import subprocess
-import pika
-import pika.credentials
-import pika.exceptions
-
-logging.basicConfig(level=logging.INFO)
-log = logging.getLogger("moon.bootstrap")
-requests_log = logging.getLogger("requests.packages.urllib3")
-requests_log.setLevel(logging.WARNING)
-requests_log.propagate = True
-pika_log = logging.getLogger("pika")
-pika_log.setLevel(logging.ERROR)
-pika_log.propagate = True
-
-CONSUL_HOST = sys.argv[1] if len(sys.argv) > 1 else "consul"
-CONSUL_PORT = sys.argv[2] if len(sys.argv) > 2 else 8500
-HEADERS = {"content-type": "application/json"}
-
-
-def search_config_file():
-    data_config = None
-    for _file in (
-            "moon.conf",
-            "conf/moon.conf",
-            "../moon.conf",
-            "/etc/moon/moon.conf",
-    ):
-        try:
-            data_config = yaml.safe_load(open(_file))
-        except FileNotFoundError:
-            data_config = None
-            continue
-        else:
-            break
-    if not data_config:
-        raise Exception("Configuration file not found...")
-    return data_config
-
-
-def put(key, value):
-    url = "http://{host}:{port}/v1/kv/{key}".format(host=CONSUL_HOST, port=CONSUL_PORT, key=key)
-    log.info(url)
-    req = requests.put(
-        url,
-        headers=HEADERS,
-        json=value
-    )
-    if req.status_code != 200:
-        raise Exception("Error connecting to Consul ({}, {})".format(req.status_code, req.text))
-
-
-def get(key):
-    url = "http://{host}:{port}/v1/kv/{key}".format(host=CONSUL_HOST, port=CONSUL_PORT, key=key)
-    req = requests.get(url)
-    data = req.json()
-    for item in data:
-        log.info("{} {} -> {}".format(
-            req.status_code,
-            item["Key"],
-            json.loads(base64.b64decode(item["Value"]).decode("utf-8"))
-        ))
-        yield json.loads(base64.b64decode(item["Value"]).decode("utf-8"))
-
-
-def populate_consul(data_config):
-    while True:
-        try:
-            req = requests.get("http://{}:{}/ui".format(CONSUL_HOST, CONSUL_PORT))
-        except requests.exceptions.ConnectionError:
-            log.info("Waiting for Consul ({}:{})".format(CONSUL_HOST, CONSUL_PORT))
-            time.sleep(1)
-            continue
-        else:
-            break
-        # if req.status_code in (302, 200):
-        #     break
-        # log.info("Waiting for Consul ({}:{})".format(CONSUL_HOST, CONSUL_PORT))
-        # time.sleep(1)
-    log.info("Consul is up")
-
-    req = requests.get("http://{}:{}/v1/kv/database".format(CONSUL_HOST, CONSUL_PORT))
-    if req.status_code == 200:
-        log.info("Consul is already populated")
-        return
-
-    put("database", data_config["database"])
-    put("messenger", data_config["messenger"])
-    put("slave", data_config["slave"])
-    put("docker", data_config["docker"])
-    put("logging", data_config["logging"])
-    put("components_port_start", data_config["components"]["port_start"])
-
-    for _key, _value in data_config["components"].items():
-        if type(_value) is dict:
-            put("components/{}".format(_key), data_config["components"][_key])
-
-    for _key, _value in data_config["plugins"].items():
-        put("plugins/{}".format(_key), data_config["plugins"][_key])
-
-    for _key, _value in data_config["openstack"].items():
-        put("openstack/{}".format(_key), data_config["openstack"][_key])
-
-
-def wait_for_database():
-    log.info(get("database"))
-    for database in get("database"):
-        database_url = database['url']
-        match = re.search("(?P<proto>^[\\w+]+):\/\/(?P<user>\\w+):(?P<password>.+)@(?P<host>\\w+):*(?P<port>\\d*)",
-                          database_url)
-        config = match.groupdict()
-        while True:
-            try:
-                conn = mysql.connector.connect(
-                    host=config["host"],
-                    user=config["user"],
-                    password=config["password"],
-                    database="moon"
-                )
-                conn.close()
-            except mysql.connector.errors.InterfaceError:
-                log.info("Waiting for Database ({})".format(config["host"]))
-                time.sleep(1)
-                continue
-            else:
-                log.info("Database i up, populating it...")
-                output = subprocess.run(["moon_db_manager", "upgrade"])
-                if output.returncode != 0:
-                    raise Exception("Error populating the database!")
-                break
-
-
-def wait_for_message_queue():
-    for messenger in get("messenger"):
-        url = messenger['url']
-        match = re.search("(?P<proto>^[\\w+]+):\/\/(?P<user>\\w+):(?P<password>.+)@(?P<host>\\w+):?(?P<port>\\d*)/?(?P<virtual_host>\\w+)",
-                          url)
-        config = match.groupdict()
-        while True:
-            try:
-                connection = pika.BlockingConnection(
-                    pika.ConnectionParameters(
-                        host=config['host'],
-                        port=int(config['port']),
-                        virtual_host=config['virtual_host'],
-                        credentials=pika.credentials.PlainCredentials(
-                            config['user'],
-                            config['password']
-                        )
-                    )
-                )
-                connection.close()
-            except (
-                pika.exceptions.ProbableAuthenticationError,
-                pika.exceptions.ConnectionClosed,
-                ConnectionResetError,
-                pika.exceptions.IncompatibleProtocolError
-            ):
-                log.info("Waiting for MessageQueue ({})".format(config["host"]))
-                time.sleep(1)
-                continue
-            else:
-                log.info("MessageQueue is up")
-                break
-
-
-def wait_for_keystone():
-    # TODO: Keystone answers request too quickly
-    #       even if it is not fully loaded
-    #       we must test if a token retrieval is possible or not
-    #       to see if Keystone is truly up and running
-    for config in get("openstack/keystone"):
-        while True:
-            try:
-                req = requests.get(config["url"])
-            except requests.exceptions.ConnectionError:
-                log.info("Waiting for Keystone ({})".format(config["url"]))
-                time.sleep(1)
-                continue
-            else:
-                log.info("Keystone is up")
-                break
-
-
-def main():
-    data_config = search_config_file()
-    populate_consul(data_config)
-    wait_for_database()
-    wait_for_message_queue()
-    wait_for_keystone()
-    import moon_orchestrator.server
-    moon_orchestrator.server.main()
-
-main()
-
index 23acea5..cb36541 100644 (file)
@@ -8,7 +8,7 @@ from oslo_config import cfg
 from oslo_log import log as logging
 # from moon_db.core import IntraExtensionRootManager
 # from moon_db.core import ConfigurationManager
-from moon_utilities.security_functions import call
+from moon_utilities.security_functions import call
 
 LOG = logging.getLogger("moon.orchestrator.api.containers")
 CONF = cfg.CONF
@@ -24,9 +24,9 @@ class Containers(object):
     def __init__(self, docker_manager):
         self.docker_manager = docker_manager
         self.components = dict()
-        for pdp_key, pdp_value in call("moon_manager", method="get_pdp",
-                                       ctx={"user_id": "admin", "id": None})["pdps"].items():
-            self.add_container(ctx={"id": pdp_key, "pipeline": pdp_value["security_pipeline"]})
+        for pdp_key, pdp_value in call("moon_manager", method="get_pdp",
+                                       ctx={"user_id": "admin", "id": None})["pdps"].items():
+            self.add_container(ctx={"id": pdp_key, "pipeline": pdp_value["security_pipeline"]})
 
     def get_container(self, ctx, args=None):
         """Get containers linked to an intra-extension
index cadd98d..fd43b1c 100644 (file)
 # This software is distributed under the terms and conditions of the 'Apache-2.0'
 # license which can be found in the file 'LICENSE' in this package distribution
 # or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+"""
+Those API are helping API used to manage the Moon platform.
+"""
 
+from flask_restful import Resource, request
+import logging
+import moon_orchestrator.api
+from moon_utilities.security_functions import check_auth
 
-class Status(object):
+__version__ = "0.1.0"
+
+LOG = logging.getLogger("moon.orchestrator.api." + __name__)
+
+
+class Status(Resource):
     """
-    Retrieve the current status of all components.
+    Endpoint for status requests
     """
 
-    __version__ = "0.1.0"
+    __urls__ = ("/status", "/status/", "/status/<string:component_id>")
 
-    def get_status(self, ctx, args):
-        """Retrieve the current status of all components."""
-        return {"status": "Running"}
+    def get(self, component_id=None):
+        """Retrieve status of all components
 
+        :return: {
+          "orchestrator": {
+            "status": "Running"
+          },
+          "security_router": {
+            "status": "Running"
+          }
+        }
+        """
+        raise NotImplemented
 
-class Logs(object):
+
+class Logs(Resource):
     """
-    Retrieve the current status of all components.
+    Endpoint for logs requests
     """
 
-    __version__ = "0.1.0"
+    __urls__ = ("/logs", "/logs/", "/logs/<string:component_id>")
+
+    def get(self, component_id=None):
+        """Get logs from the Moon platform
+
+        :param component_id: the ID of the component your are looking for (optional)
+        :return: [
+            "2015-04-15-13:45:20
+            "2015-04-15-13:45:21
+            "2015-04-15-13:45:22
+            "2015-04-15-13:45:23
+        ]
+        """
+        filter_str = request.args.get('filter', '')
+        from_str = request.args.get('from', '')
+        to_str = request.args.get('to', '')
+        event_number = request.args.get('event_number', '')
+        try:
+            event_number = int(event_number)
+        except ValueError:
+            event_number = None
+        args = dict()
+        args["filter"] = filter_str
+        args["from"] = from_str
+        args["to"] = to_str
+        args["event_number"] = event_number
+
+        raise NotImplemented
+
+
+class API(Resource):
+    """
+    Endpoint for API requests
+    """
 
-    def get_logs(self, ctx, args):
-        return {"error": "NotImplemented", "ctx": ctx, "args": args}
+    __urls__ = (
+        "/api",
+        "/api/",
+        "/api/<string:group_id>",
+        "/api/<string:group_id>/",
+        "/api/<string:group_id>/<string:endpoint_id>")
 
+    @check_auth
+    def get(self, group_id="", endpoint_id="", user_id=""):
+        """Retrieve all API endpoints or a specific endpoint if endpoint_id is given
 
+        :param group_id: the name of one existing group (ie generic, ...)
+        :param endpoint_id: the name of one existing component (ie Logs, Status, ...)
+        :return: {
+            "group_name": {
+                "endpoint_name": {
+                    "description": "a description",
+                    "methods": {
+                        "get": "description of the HTTP method"
+                    },
+                    "urls": ('/api', '/api/', '/api/<string:endpoint_id>')
+                }
+        }
+        """
+        __methods = ("get", "post", "put", "delete", "options", "patch")
+        api_list = filter(lambda x: "__" not in x, dir(moon_orchestrator.api))
+        api_desc = dict()
+        for api_name in api_list:
+            api_desc[api_name] = {}
+            group_api_obj = eval("moon_interface.api.{}".format(api_name))
+            api_desc[api_name]["description"] = group_api_obj.__doc__
+            if "__version__" in dir(group_api_obj):
+                api_desc[api_name]["version"] = group_api_obj.__version__
+            object_list = list(filter(lambda x: "__" not in x, dir(group_api_obj)))
+            for obj in map(lambda x: eval("moon_interface.api.{}.{}".format(api_name, x)), object_list):
+                if "__urls__" in dir(obj):
+                    api_desc[api_name][obj.__name__] = dict()
+                    api_desc[api_name][obj.__name__]["urls"] = obj.__urls__
+                    api_desc[api_name][obj.__name__]["methods"] = dict()
+                    for _method in filter(lambda x: x in __methods, dir(obj)):
+                        docstring = eval("moon_interface.api.{}.{}.{}.__doc__".format(api_name, obj.__name__, _method))
+                        api_desc[api_name][obj.__name__]["methods"][_method] = docstring
+                    api_desc[api_name][obj.__name__]["description"] = str(obj.__doc__)
+        if group_id in api_desc:
+            if endpoint_id in api_desc[group_id]:
+                return {group_id: {endpoint_id: api_desc[group_id][endpoint_id]}}
+            elif len(endpoint_id) > 0:
+                LOG.error("Unknown endpoint_id {}".format(endpoint_id))
+                return {"error": "Unknown endpoint_id {}".format(endpoint_id)}
+            return {group_id: api_desc[group_id]}
+        return api_desc
diff --git a/moonv4/moon_orchestrator/moon_orchestrator/drivers.py b/moonv4/moon_orchestrator/moon_orchestrator/drivers.py
new file mode 100644 (file)
index 0000000..970914a
--- /dev/null
@@ -0,0 +1,189 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from kubernetes import client, config
+import logging
+import urllib3.exceptions
+import time
+from moon_utilities import configuration
+
+logger = logging.getLogger("moon.orchestrator.drivers")
+
+
+def get_driver():
+    try:
+        driver = K8S()
+    except urllib3.exceptions.MaxRetryError as e:
+        logger.exception(e)
+        return Docker()
+    else:
+        return K8S()
+
+
+class Driver:
+
+    def __init__(self):
+        self.cache = {}
+        # example of cache:
+        # {
+        #     "uuid_of_pod": {
+        #         "ip": "",
+        #         "hostname": "",
+        #         "port": 30001,
+        #         "pdp": "",
+        #         "keystone_project_id": "",
+        #         "plugin_name": "",
+        #         "namespace": ""
+        #     }
+        # }
+
+    def get_pods(self, namespace=None):
+        raise NotImplementedError
+
+    def load_pod(self, data, api_client=None, ext_client=None):
+        raise NotImplementedError
+
+    def delete_pod(self, uuid=None, name=None):
+        raise NotImplementedError
+
+    def get_slaves(self):
+        raise NotImplementedError
+
+
+class K8S(Driver):
+
+    def __init__(self):
+        super(K8S, self).__init__()
+        config.load_kube_config()
+        self.client = client.CoreV1Api()
+
+    def get_pods(self, name=None):
+        # pods = self.client.list_pod_for_all_namespaces(watch=False)
+        # if not namespace:
+        #     return pods
+        # # TODO: get pods with specific namespace
+        # for pod in pods:
+        #     logger.info("%s\t%s\t%s" % (pod.status.pod_ip,
+        #                                 pod.metadata.namespace,
+        #                                 pod.metadata.name))
+        # return pods
+        if name:
+            pods = self.client.list_pod_for_all_namespaces(watch=False)
+            for pod in pods:
+                if pod.metadata.name == name:
+                    return pod
+            else:
+                return None
+        return self.cache
+
+    def __create_pod(self, client, data):
+        pod_manifest = {
+            'apiVersion': 'extensions/v1beta1',
+            'kind': 'Deployment',
+            'metadata': {
+                'name': data[0].get('name')
+            },
+            'spec': {
+                'replicas': 1,
+                'template': {
+                    'metadata': {'labels': {'app': data[0].get('name')}},
+                    # 'hostname': data.get('name'),
+                    'spec': {
+                        'containers': []
+                    }
+                },
+            }
+        }
+        for _data in data:
+            pod_manifest['spec']['template']['spec']['containers'].append(
+                {
+                    'image': _data.get('container', "busybox"),
+                    'name': _data.get('name'),
+                    'ports': [
+                        {"containerPort": _data.get('port', 80)},
+                    ],
+                    'env': [
+                        {'name': "UUID", "value": _data.get('name', "None")},
+                        {'name': "TYPE", "value": _data.get('genre', "None")},
+                        {'name': "PORT", "value": str(_data.get('port', 80))},
+                        {'name': "PDP_ID", "value": _data.get('pdp_id', "None")},
+                        {'name': "META_RULE_ID", "value": "None"},
+                        {'name': "KEYSTONE_PROJECT_ID",
+                         "value": _data.get('keystone_project_id', "None")},
+                    ]
+                }
+            )
+        resp = client.create_namespaced_deployment(body=pod_manifest,
+                                                   namespace='moon')
+        logger.info("Pod {} created!".format(data[0].get('name')))
+        return resp
+
+    def __create_service(self, client, data, expose=False):
+        service_manifest = {
+            'apiVersion': 'v1',
+            'kind': 'Service',
+            'metadata': {
+                'name': data.get('name'),
+                'namespace': 'moon'
+            },
+            'spec': {
+                'ports': [{
+                    'port': data.get('port', 80),
+                    'targetPort': data.get('port', 80)
+                }],
+                'selector': {
+                    'app': data.get('name')
+                },
+                'type': 'NodePort',
+                'endpoints': [{
+                    'port': data.get('port', 80),
+                    'protocol': 'TCP',
+                }],
+            }
+        }
+        if expose:
+            service_manifest['spec']['ports'][0]['nodePort'] = \
+                configuration.increment_port()
+            service_manifest['spec']['type'] = "NodePort"
+        resp = client.create_namespaced_service(namespace="moon",
+                                                body=service_manifest)
+        logger.info("Service {} created!".format(data.get('name')))
+        return resp
+
+    def load_pod(self, data, api_client=None, ext_client=None, expose=False):
+        _client = api_client if api_client else self.client
+        logger.info("Creating pod/service {}".format(data[0].get('name')))
+        logger.info("Creating pod/service {}".format(data))
+        pod = self.__create_pod(client=ext_client, data=data)
+        service = self.__create_service(client=_client, data=data[0],
+                                        expose=expose)
+        # logger.info("data={}".format(data))
+        # logger.info("service={}".format(service))
+        self.cache[pod.metadata.uid] = data
+        #     {
+        #     "ip": "",
+        #     "hostname": pod.metadata.name,
+        #     "port": service.spec.ports[0].node_port,
+        #     "pdp": "",
+        #     "keystone_project_id": data[0].get('keystone_project_id'),
+        #     "plugin_names": [d.get('genre') for d in data],
+        #     "namespace": "moon"
+        # }
+
+    def delete_pod(self, uuid=None, name=None):
+        logger.info("Deleting pod {}".format(uuid))
+
+    def get_slaves(self):
+        contexts, active_context = config.list_kube_config_contexts()
+        return contexts, active_context
+
+
+class Docker(Driver):
+
+    def load_pod(self, data, api_client=None, ext_client=None):
+        logger.info("Creating pod {}".format(data[0].get('name')))
+
+    def delete_pod(self, uuid=None, name=None):
+        logger.info("Deleting pod {}".format(uuid))
diff --git a/moonv4/moon_orchestrator/moon_orchestrator/http_server.py b/moonv4/moon_orchestrator/moon_orchestrator/http_server.py
new file mode 100644 (file)
index 0000000..a0738f4
--- /dev/null
@@ -0,0 +1,262 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from flask import Flask, jsonify
+from flask_cors import CORS, cross_origin
+from flask_restful import Resource, Api
+import logging
+from kubernetes import client, config
+import random
+import requests
+from moon_orchestrator import __version__
+from moon_orchestrator.api.pods import Pods
+from moon_orchestrator.api.generic import Logs, Status
+from moon_utilities import configuration, exceptions
+from moon_utilities.misc import get_random_name
+from moon_orchestrator.drivers import get_driver
+
+logger = logging.getLogger("moon.orchestrator.http")
+
+
+class Server:
+    """Base class for HTTP server"""
+
+    def __init__(self, host="localhost", port=80, api=None, **kwargs):
+        """Run a server
+
+        :param host: hostname of the server
+        :param port: port for the running server
+        :param kwargs: optional parameters
+        :return: a running server
+        """
+        self._host = host
+        self._port = port
+        self._api = api
+        self._extra = kwargs
+
+    @property
+    def host(self):
+        return self._host
+
+    @host.setter
+    def host(self, name):
+        self._host = name
+
+    @host.deleter
+    def host(self):
+        self._host = ""
+
+    @property
+    def port(self):
+        return self._port
+
+    @port.setter
+    def port(self, number):
+        self._port = number
+
+    @port.deleter
+    def port(self):
+        self._port = 80
+
+    def run(self):
+        raise NotImplementedError()
+
+__API__ = (
+    Status, Logs
+ )
+
+
+class Root(Resource):
+    """
+    The root of the web service
+    """
+    __urls__ = ("/", )
+    __methods = ("get", "post", "put", "delete", "options")
+
+    def get(self):
+        tree = {"/": {"methods": ("get",), "description": "List all methods for that service."}}
+        for item in __API__:
+            tree[item.__name__] = {"urls": item.__urls__}
+            _methods = []
+            for _method in self.__methods:
+                if _method in dir(item):
+                    _methods.append(_method)
+            tree[item.__name__]["methods"] = _methods
+            tree[item.__name__]["description"] = item.__doc__.strip()
+        return {
+            "version": __version__,
+            "tree": tree
+        }
+
+
+class HTTPServer(Server):
+
+    def __init__(self, host="localhost", port=80, **kwargs):
+        super(HTTPServer, self).__init__(host=host, port=port, **kwargs)
+        self.app = Flask(__name__)
+        conf = configuration.get_configuration("components/orchestrator")
+        self.orchestrator_hostname = conf["components/orchestrator"].get("hostname", "orchestrator")
+        self.orchestrator_port = conf["components/orchestrator"].get("port", 80)
+        conf = configuration.get_configuration("components/manager")
+        self.manager_hostname = conf["components/manager"].get("hostname", "manager")
+        self.manager_port = conf["components/manager"].get("port", 80)
+        # TODO : specify only few urls instead of *
+        # CORS(self.app)
+        self.api = Api(self.app)
+        self.driver = get_driver()
+        logger.info("Driver = {}".format(self.driver.__class__))
+        self.__set_route()
+        self.__hook_errors()
+        self.create_wrappers()
+        pdp = requests.get("http://{}:{}/pdp".format(self.manager_hostname,
+                                                     self.manager_port))
+        logger.info("pdp={}".format(pdp))
+        for _pdp_key, _pdp_value in pdp.json()['pdps'].items():
+            if _pdp_value.get('keystone_project_id'):
+                # TODO: select context to add security function
+                self.create_security_function(
+                    keystone_project_id=_pdp_value.get('keystone_project_id'),
+                    pdp_id=_pdp_key,
+                    policy_ids=_pdp_value.get('security_pipeline', []))
+
+    def __hook_errors(self):
+
+        def get_404_json(e):
+            return jsonify({"result": False, "code": 404, "description": str(e)}), 404
+        self.app.register_error_handler(404, get_404_json)
+
+        def get_400_json(e):
+            return jsonify({"result": False, "code": 400, "description": str(e)}), 400
+        self.app.register_error_handler(400, lambda e: get_400_json)
+        self.app.register_error_handler(403, exceptions.AuthException)
+
+    def __set_route(self):
+        self.api.add_resource(Root, '/')
+
+        for api in __API__:
+            self.api.add_resource(api, *api.__urls__)
+        self.api.add_resource(Pods, *Pods.__urls__,
+                              resource_class_kwargs={
+                                  "driver": self.driver,
+                              })
+
+    def run(self):
+        self.app.run(host=self._host, port=self._port)  # nosec
+
+    @staticmethod
+    def __filter_str(data):
+        return data.replace("@", "-")
+
+    def create_wrappers(self):
+        contexts, active_context = self.driver.get_slaves()
+        logger.info("contexts: {}".format(contexts))
+        logger.info("active_context: {}".format(active_context))
+        conf = configuration.get_configuration("components/wrapper")
+        hostname = conf["components/wrapper"].get(
+            "hostname", "wrapper")
+        port = conf["components/wrapper"].get("port", 80)
+        container = conf["components/wrapper"].get(
+            "container",
+            "wukongsun/moon_wrapper:v4.3")
+        for _ctx in contexts:
+            _config = config.new_client_from_config(context=_ctx['name'])
+            logger.info("_config={}".format(_config))
+            api_client = client.CoreV1Api(_config)
+            ext_client = client.ExtensionsV1beta1Api(_config)
+            # TODO: get data from consul
+            data = [{
+                "name": hostname + "-" + get_random_name(),
+                "container": container,
+                "port": port,
+                "namespace": "moon"
+            }, ]
+            pod = self.driver.load_pod(data, api_client, ext_client, expose=True)
+            logger.info('wrapper pod={}'.format(pod))
+
+    def create_security_function(self, keystone_project_id,
+                                 pdp_id, policy_ids, active_context=None,
+                                 active_context_name=None):
+        """ Create security functions
+
+        :param policy_id: the policy ID mapped to this security function
+        :param active_context: if present, add the security function in this
+                               context
+        :param active_context_name: if present,  add the security function in
+                                    this context name
+        if active_context_name and active_context are not present, add the
+        security function in all context (ie, in all slaves)
+        :return: None
+        """
+        logger.info(self.driver.get_pods())
+        for key, value in self.driver.get_pods().items():
+            for _pod in value:
+                if _pod.get('keystone_project_id') == keystone_project_id:
+                    logger.warning("A pod for this Keystone project {} "
+                                   "already exists.".format(keystone_project_id))
+                    return
+        plugins = configuration.get_plugins()
+        conf = configuration.get_configuration("components/interface")
+        i_hostname = conf["components/interface"].get("hostname", "interface")
+        i_port = conf["components/interface"].get("port", 80)
+        i_container = conf["components/interface"].get(
+            "container",
+            "wukongsun/moon_interface:v4.3")
+        data = [
+            {
+                "name": i_hostname + "-" + get_random_name(),
+                "container": i_container,
+                "port": i_port,
+                'pdp_id': pdp_id,
+                'genre': "interface",
+                'keystone_project_id': keystone_project_id,
+                "namespace": "moon"
+            },
+        ]
+        policies = requests.get("http://{}:{}/policies".format(
+            self.manager_hostname, self.manager_port)).json().get(
+            "policies", dict())
+        models = requests.get("http://{}:{}/models".format(
+            self.manager_hostname, self.manager_port)).json().get(
+            "models", dict())
+
+        for policy_id in policy_ids:
+            if policy_id in policies:
+                genre = policies[policy_id].get("genre", "authz")
+                if genre in plugins:
+                    for meta_rule in models[policies[policy_id]['model_id']]['meta_rules']:
+                        data.append({
+                            "name": genre + "-" + get_random_name(),
+                            "container": plugins[genre]['container'],
+                            'pdp_id': pdp_id,
+                            "port": plugins[genre].get('port', 8080),
+                            'genre': genre,
+                            'policy_id': policy_id,
+                            'meta_rule_id': meta_rule,
+                            'keystone_project_id': keystone_project_id,
+                            "namespace": "moon"
+                        })
+        contexts, _active_context = self.driver.get_slaves()
+        if active_context_name:
+            for _context in contexts:
+                if _context["name"] == active_context_name:
+                    active_context = _context
+                    break
+        if active_context:
+            active_context = _active_context
+            _config = config.new_client_from_config(
+                context=active_context['name'])
+            logger.info("_config={}".format(_config))
+            api_client = client.CoreV1Api(_config)
+            ext_client = client.ExtensionsV1beta1Api(_config)
+            self.driver.load_pod(data, api_client, ext_client)
+            return
+        for _ctx in contexts:
+            _config = config.new_client_from_config(context=_ctx['name'])
+            logger.info("_config={}".format(_config))
+            api_client = client.CoreV1Api(_config)
+            ext_client = client.ExtensionsV1beta1Api(_config)
+            self.driver.load_pod(data, api_client, ext_client)
+
+
diff --git a/moonv4/moon_orchestrator/moon_orchestrator/messenger.py b/moonv4/moon_orchestrator/moon_orchestrator/messenger.py
deleted file mode 100644 (file)
index 2b7b386..0000000
+++ /dev/null
@@ -1,63 +0,0 @@
-# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
-# This software is distributed under the terms and conditions of the 'Apache-2.0'
-# license which can be found in the file 'LICENSE' in this package distribution
-# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
-
-import oslo_messaging
-from oslo_log import log as logging
-import time
-from moon_utilities.api import APIList
-from moon_utilities import configuration
-
-from oslo_config import cfg
-from moon_orchestrator.api.generic import Status, Logs
-from moon_orchestrator.api.containers import Containers
-from moon_orchestrator.api.slaves import Slaves
-
-TOPIC = "orchestrator"
-LOG = logging.getLogger("moon.orchestrator.messenger")
-CONF = cfg.CONF
-
-
-class Server:
-
-    def __init__(self, containers, docker_manager, slaves):
-        cfg.CONF.transport_url = self.__get_transport_url()
-        self.CONTAINERS = containers
-        self.transport = oslo_messaging.get_transport(cfg.CONF)
-        self.target = oslo_messaging.Target(topic=TOPIC, server='server1')
-        LOG.info("Starting MQ server with topic: {}".format(TOPIC))
-        self.docker_manager = docker_manager
-        for _container in containers:
-            Status._container = containers[_container]
-        self.endpoints = [
-            APIList((Status, Logs, Containers)),
-            Status(),
-            Logs(),
-            Containers(self.docker_manager),
-            Slaves(slaves)
-        ]
-        self.server = oslo_messaging.get_rpc_server(self.transport, self.target, self.endpoints,
-                                                    executor='threading',
-                                                    access_policy=oslo_messaging.DefaultRPCAccessPolicy)
-
-    @staticmethod
-    def __get_transport_url():
-        messenger = configuration.get_configuration(configuration.MESSENGER)["messenger"]
-        return messenger['url']
-
-    def run(self):
-        try:
-            self.server.start()
-            while True:
-                time.sleep(1)
-        except KeyboardInterrupt:
-            LOG.warning("Stopping server by crtl+c (please be patient, closing connections...)")
-        except SystemExit:
-            LOG.warning("Stopping server (please be patient, closing connections...)")
-        except Exception as e:
-            LOG.error("Exception occurred: {}".format(e))
-
-        self.server.stop()
-        self.server.wait()
-
index 8fd2740..d8e312d 100644 (file)
 # license which can be found in the file 'LICENSE' in this package distribution
 # or at 'http://www.apache.org/licenses/LICENSE-2.0'.
 
-import sys
 import os
-import hashlib
-from oslo_log import log as logging
-from docker import Client
-import docker.errors as docker_errors
+import logging
 from moon_utilities import configuration, exceptions
-from moon_orchestrator import messenger
-
+from moon_orchestrator.http_server import HTTPServer
 
 LOG = logging.getLogger("moon.orchestrator")
+DOMAIN = "moon_orchestrator"
 
-CONTAINERS = {}
-SLAVES = {}
-docker_conf = configuration.get_configuration("docker")['docker']
-docker = Client(base_url=docker_conf['url'])
-LOG.info("docker_url={}".format(docker_conf['url']))
-docker_network = docker_conf['network']
-
-
-def kill_handler(signum, frame):
-    _exit(0)
-
-
-class DockerManager:
-
-    def load(self, component, uuid=None, container_data=None):
-        """Load a new docker mapping the component given
-
-        :param component: the name of the component (policy or function)
-        :param uuid: the uuid of the intra_extension linked to that component
-        :return: the created component
-        """
-        component_id = component+"_"+hashlib.sha224(uuid.encode("utf-8")).hexdigest()
-        plugins = configuration.get_plugins()
-        if component in plugins.keys():
-            components = configuration.get_components()
-            configuration.add_component(
-                name=component_id,
-                uuid=component_id,
-                port=configuration.increment_port(),
-                bind="0.0.0.0",
-                extra=container_data,
-                container=plugins[component]['container']
-            )
-            # _command = plugins[component]['command']
-            # try:
-            #     _index = _command.index("<UUID>")
-            #     _command[_index] = component_id
-            # except ValueError:
-            #     pass
-            self.run(component_id, environment={"UUID": component_id})
-            CONTAINERS[component_id] = components.get(component_id)
-            CONTAINERS[component_id]["running"] = True
-            return CONTAINERS[component_id]
-
-    def load_all_containers(self):
-        LOG.info("Try to load all containers...")
-        current_containers = [item["Names"][0] for item in docker.containers()]
-        components = configuration.get_components()
-        containers_not_running = []
-        for c_name in (
-                '/keystone',
-                '/consul',
-                '/db',
-                '/messenger'
-        ):
-            if c_name not in current_containers:
-                containers_not_running.append(c_name)
-        if containers_not_running:
-            raise exceptions.ContainerMissing(
-                "Following containers are missing: {}".format(", ".join(containers_not_running)))
-        for c_name in (
-           '/interface',
-           '/manager',
-           '/router'):
-            if c_name not in current_containers:
-                LOG.info("Starting container {}...".format(c_name))
-                self.run(c_name.strip("/"))
-            else:
-                LOG.info("Container {} already running...".format(c_name))
-            CONTAINERS[c_name] = components.get(c_name.strip("/"))
-            CONTAINERS[c_name]["running"] = True
-
-    def run(self, name, environment=None):
-        components = configuration.get_components()
-        if name in components:
-            image = components[name]['container']
-            params = {
-                'image': image,
-                'name': name,
-                'hostname': name,
-                'detach': True,
-                'host_config': docker.create_host_config(network_mode=docker_network)
-            }
-            if 'port' in components[name] and components[name]['port']:
-                params["ports"] = [components[name]['port'], ]
-                params["host_config"] = docker.create_host_config(
-                    network_mode=docker_network,
-                    port_bindings={components[name]['port']: components[name]['port']}
-                )
-            if environment:
-                params["environment"] = environment
-            container = docker.create_container(**params)
-            docker.start(container=container.get('Id'))
-
-    @staticmethod
-    def get_component(uuid=None):
-        if uuid:
-            return CONTAINERS.get(uuid, None)
-        return CONTAINERS
-
-    @staticmethod
-    def kill(component_id, delete=True):
-        LOG.info("Killing container {}".format(component_id))
-        docker.kill(container=component_id)
-        if delete:
-            docker.remove_container(container=component_id)
-
+__CWD__ = os.path.dirname(os.path.abspath(__file__))
 
-def _exit(exit_number=0, error=None):
-    for _container in CONTAINERS:
-        LOG.warning("Deleting containers named {}...".format(_container))
-        # print(40 * "-" + _container)
-        try:
-            # print(docker.logs(container=_container).decode("utf-8"))
-            docker.kill(container=_container)
-        except docker_errors.NotFound:
-            LOG.error("The container {} was not found".format(_container))
-        except docker_errors.APIError as e:
-            LOG.error(e)
-        else:
-            docker.remove_container(container=_container)
-    LOG.info("Moon orchestrator: offline")
-
-    # TODO (asteroide): put in the debug log
-    if error:
-        LOG.info(str(error))
-    sys.exit(exit_number)
-
-
-def __save_pid():
-    try:
-        open("/var/run/moon_orchestrator.pid", "w").write(str(os.getpid()))
-    except PermissionError:
-        LOG.warning("You don't have the right to write PID file in /var/run... Continuing anyway.")
-        LOG.warning("Writing PID file in {}".format(os.getcwd()))
-        open("./moon_orchestrator.pid", "w").write(str(os.getpid()))
-
-
-def server():
 
+def main():
     configuration.init_logging()
-    conf = configuration.add_component("orchestrator", "orchestrator")
-    LOG.info("Starting main server {}".format(conf["components/orchestrator"]["hostname"]))
-
-    docker_manager = DockerManager()
-
-    docker_manager.load_all_containers()
-    serv = messenger.Server(containers=CONTAINERS, docker_manager=docker_manager, slaves=SLAVES)
     try:
-        serv.run()
-    finally:
-        _exit(0)
-
-
-def main():
-    server()
+        conf = configuration.get_configuration("components/orchestrator")
+        hostname = conf["components/orchestrator"].get("hostname", "orchestrator")
+        port = conf["components/orchestrator"].get("port", 80)
+        bind = conf["components/orchestrator"].get("bind", "127.0.0.1")
+    except exceptions.ConsulComponentNotFound:
+        hostname = "orchestrator"
+        bind = "127.0.0.1"
+        port = 80
+        configuration.add_component(uuid="orchestrator", name=hostname, port=port, bind=bind)
+    LOG.info("Starting server with IP {} on port {} bind to {}".format(hostname, port, bind))
+    server = HTTPServer(host=bind, port=port)
+    server.run()
 
 
 if __name__ == '__main__':
index c765327..29885a4 100644 (file)
@@ -1,12 +1,7 @@
-docker-py
-kombu !=4.0.1,!=4.0.0
-oslo.messaging !=5.14.0,!=5.13.0
-oslo.config
-oslo.log
-vine
-jinja2
-sqlalchemy
-pymysql
+flask
+flask_restful
+flask_cors
 werkzeug
 moon_utilities
-moon_db
\ No newline at end of file
+moon_db
+kubernetes
\ No newline at end of file