several bug fixes for release 1.1 59/46159/1
authorKoren Lev <korenlev@gmail.com>
Tue, 24 Oct 2017 10:35:42 +0000 (13:35 +0300)
committerKoren Lev <korenlev@gmail.com>
Tue, 24 Oct 2017 10:35:42 +0000 (13:35 +0300)
Change-Id: I433d41245107a68959efdcd6b56ce3348c7bbeb5
Signed-off-by: Koren Lev <korenlev@gmail.com>
app/connection_test/connection_test.py [new file with mode: 0644]
app/discover/clique_finder.py
app/discover/event_manager.py
app/discover/fetchers/api/api_access.py
app/discover/fetchers/db/db_access.py
app/discover/manager.py
app/discover/scan.py
app/discover/scan_manager.py
app/monitoring/setup/monitoring_setup_manager.py
app/utils/constants.py

diff --git a/app/connection_test/connection_test.py b/app/connection_test/connection_test.py
new file mode 100644 (file)
index 0000000..d9d6af7
--- /dev/null
@@ -0,0 +1,283 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems)   #
+# and others                                                                  #
+#                                                                             #
+# All rights reserved. This program and the accompanying materials            #
+# are made available under the terms of the Apache License, Version 2.0       #
+# which accompanies this distribution, and is available at                    #
+# http://www.apache.org/licenses/LICENSE-2.0                                  #
+###############################################################################
+import argparse
+import datetime
+from kombu import Connection
+
+import time
+
+import pymongo
+from functools import partial
+
+from discover.fetchers.api.api_access import ApiAccess
+from discover.fetchers.db.db_access import DbAccess
+from discover.manager import Manager
+from utils.constants import ConnectionTestStatus, ConnectionTestType
+from utils.logging.file_logger import FileLogger
+from utils.mongo_access import MongoAccess
+from utils.ssh_connection import *
+
+
+def test_openstack(config, test_request):
+    try:
+        api = ApiAccess(config)
+        ConnectionTest.report_success(test_request,
+                                      ConnectionTestType.OPENSTACK.value)
+        if api:
+            pass
+    except ValueError:
+        pass
+
+
+def test_mysql(config, test_request):
+    db_access = DbAccess(config)
+    ConnectionTest.report_success(test_request, ConnectionTestType.MYSQL.value)
+    if db_access:
+        pass
+
+
+def test_ssh_connect(config) -> bool:
+    ssh = SshConnection(config.get('host', ''),
+                        config.get('user', ''),
+                        _pwd=config.get('pwd'),
+                        _key=config.get('key'),
+                        _port=int(config.get('port',
+                                             SshConnection.DEFAULT_PORT)))
+    ret = ssh.connect()
+    return ret
+
+
+def test_cli(config, test_request):
+    ret = test_ssh_connect(config)
+    ConnectionTest.set_test_result(test_request,
+                                   ConnectionTestType.CLI.value,
+                                   ret)
+
+
+def test_amqp_connect(config):
+    connect_url = 'amqp://{user}:{pwd}@{host}:{port}//' \
+        .format(user=config.get("user", ''),
+                pwd=config.get('pwd', ''),
+                host=config.get('host', ''),
+                port=int(config.get('port', 5671)))
+    conn = Connection(connect_url)
+    conn.connect()
+
+
+def test_amqp(config, test_request):
+    test_amqp_connect(config)
+    ConnectionTest.report_success(test_request, ConnectionTestType.AMQP.value)
+
+
+def test_monitoring(config, test_request):
+    # for monitoring configuration test, need to test:
+    # 1. SSH access
+    # 2. RabbitMQ access
+    ssh_config = {
+        'host': config.get('server_ip'),
+        'user': config.get('ssh_user'),
+        'pwd': config.get('ssh_password'),
+        'port': int(config.get('ssh_port', 0))
+    }
+    if not test_ssh_connect(ssh_config):
+        return
+    amqp_connect_config = {
+        'user': config.get('rabbitmq_user', ''),
+        'pwd': config.get('rabbitmq_pass', ''),
+        'host': config.get('server_ip'),
+        'port': int(config.get('rabbitmq_port', 5672)),
+    }
+    test_amqp_connect(amqp_connect_config)
+    ConnectionTest.report_success(test_request, ConnectionTestType.AMQP.value)
+
+
+def test_aci(config, test_request):
+    pass
+
+
+TEST_HANDLERS = {
+    ConnectionTestType.OPENSTACK.value: test_openstack,
+    ConnectionTestType.MYSQL.value: test_mysql,
+    ConnectionTestType.CLI.value: test_cli,
+    ConnectionTestType.AMQP.value: test_amqp,
+    ConnectionTestType.ACI.value: test_aci,
+    ConnectionTestType.MONITORING.value: test_monitoring
+}
+
+
+class ConnectionTest(Manager):
+
+    DEFAULTS = {
+        'mongo_config': '',
+        'connection_tests': 'connection_tests',
+        'environments': 'environments_config',
+        'interval': 1,
+        'loglevel': 'WARNING'
+    }
+
+    def __init__(self):
+        self.args = self.get_args()
+        super().__init__(log_directory=self.args.log_directory,
+                         mongo_config_file=self.args.mongo_config)
+        self.db_client = None
+        self.connection_tests_collection = None
+        self.environments_collection = None
+
+    @staticmethod
+    def get_args():
+        parser = argparse.ArgumentParser()
+        parser.add_argument('-m', '--mongo_config', nargs='?', type=str,
+                            default=ConnectionTest.DEFAULTS['mongo_config'],
+                            help='Name of config file ' +
+                                 'with MongoDB server access details')
+        parser.add_argument('-c', '--connection_tests_collection', nargs='?',
+                            type=str,
+                            default=ConnectionTest.DEFAULTS['connection_tests'],
+                            help='connection_tests collection to read from')
+        parser.add_argument('-e', '--environments_collection', nargs='?',
+                            type=str,
+                            default=ConnectionTest.DEFAULTS['environments'],
+                            help='Environments collection to update '
+                                 'after tests')
+        parser.add_argument('-i', '--interval', nargs='?', type=float,
+                            default=ConnectionTest.DEFAULTS['interval'],
+                            help='Interval between collection polls'
+                                 '(must be more than {} seconds)'
+                                 .format(ConnectionTest.MIN_INTERVAL))
+        parser.add_argument('-l', '--loglevel', nargs='?', type=str,
+                            default=ConnectionTest.DEFAULTS['loglevel'],
+                            help='Logging level \n(default: {})'
+                                 .format(ConnectionTest.DEFAULTS['loglevel']))
+        parser.add_argument('-d', '--log_directory', nargs='?', type=str,
+                            default=FileLogger.LOG_DIRECTORY,
+                            help='File logger directory \n(default: {})'
+                                 .format(FileLogger.LOG_DIRECTORY))
+        args = parser.parse_args()
+        return args
+
+    def configure(self):
+        self.db_client = MongoAccess()
+        self.connection_tests_collection = \
+            self.db_client.db[self.args.connection_tests_collection]
+        self.environments_collection = \
+            self.db_client.db[self.args.environments_collection]
+        self._update_document = \
+            partial(MongoAccess.update_document,
+                    self.connection_tests_collection)
+        self.interval = max(self.MIN_INTERVAL, self.args.interval)
+        self.log.set_loglevel(self.args.loglevel)
+
+        self.log.info('Started ConnectionTest with following configuration:\n'
+                      'Mongo config file path: {0.args.mongo_config}\n'
+                      'connection_tests collection: '
+                      '{0.connection_tests_collection.name}\n'
+                      'Polling interval: {0.interval} second(s)'
+                      .format(self))
+
+    def _build_test_args(self, test_request: dict):
+        args = {
+            'mongo_config': self.args.mongo_config
+        }
+
+        def set_arg(name_from: str, name_to: str = None):
+            if name_to is None:
+                name_to = name_from
+            val = test_request.get(name_from)
+            if val:
+                args[name_to] = val
+
+        set_arg('object_id', 'id')
+        set_arg('log_level', 'loglevel')
+        set_arg('environment', 'env')
+        set_arg('scan_only_inventory', 'inventory_only')
+        set_arg('scan_only_links', 'links_only')
+        set_arg('scan_only_cliques', 'cliques_only')
+        set_arg('inventory')
+        set_arg('clear')
+        set_arg('clear_all')
+
+        return args
+
+    def _finalize_test(self, test_request: dict):
+        # update the status and timestamps.
+        self.log.info('Request {} has been tested.'
+                      .format(test_request['_id']))
+        start_time = test_request['submit_timestamp']
+        end_time = datetime.datetime.utcnow()
+        test_request['response_timestamp'] = end_time
+        test_request['response_time'] = \
+            str(end_time - start_time.replace(tzinfo=None))
+        test_request['status'] = ConnectionTestStatus.RESPONSE.value
+        self._update_document(test_request)
+
+    @staticmethod
+    def set_test_result(test_request, target, result):
+        test_request.get('test_results', {})[target] = result
+
+    @staticmethod
+    def report_success(test_request, target):
+        ConnectionTest.set_test_result(test_request, target, True)
+
+    @staticmethod
+    def handle_test_target(target, test_request):
+        targets_config = test_request.get('targets_configuration', [])
+        try:
+            config = next(t for t in targets_config if t['name'] == target)
+        except StopIteration:
+            raise ValueError('failed to find {} in targets_configuration'
+                             .format(target))
+        handler = TEST_HANDLERS.get(target)
+        if not handler:
+            raise ValueError('unknown test target: {}'.format(target))
+        handler(config, test_request)
+
+    def do_test(self, test_request):
+        targets = [t for t in test_request.get('test_targets', [])]
+        test_request['test_results'] = {t: False for t in targets}
+        for test_target in test_request.get('test_targets', []):
+            self.log.info('testing connection to: {}'.format(test_target))
+            try:
+                self.handle_test_target(test_target, test_request)
+            except Exception as e:
+                self.log.exception(e)
+                if 'errors' not in test_request:
+                    test_request['errors'] = {}
+                test_request['errors'][test_target] = str(e)
+                self.log.error('Test of target {} failed (id: {}):\n{}'
+                               .format(test_target,
+                                       test_request['_id'],
+                                       str(e)))
+        self._finalize_test(test_request)
+        self._set_env_operational(test_request['environment'])
+
+    # if environment_config document for this specific environment exists,
+    # update the value of the 'operational' field to 'running'
+    def _set_env_operational(self, env):
+        self.environments_collection. \
+            update_one({'name': env}, {'$set': {'operational': 'running'}})
+
+    def do_action(self):
+        while True:
+            # Find a pending request that is waiting the longest time
+            results = self.connection_tests_collection \
+                .find({'status': ConnectionTestStatus.REQUEST.value,
+                       'submit_timestamp': {'$ne': None}}) \
+                .sort('submit_timestamp', pymongo.ASCENDING) \
+                .limit(1)
+
+            # If no connection tests are pending, sleep for some time
+            if results.count() == 0:
+                time.sleep(self.interval)
+            else:
+                self.do_test(results[0])
+
+
+if __name__ == '__main__':
+    ConnectionTest().run()
index 47843e6..57b2e3b 100644 (file)
@@ -19,6 +19,7 @@ class CliqueFinder(Fetcher):
 
     def __init__(self):
         super().__init__()
+        self.env_config = None
         self.inv = InventoryMgr()
         self.inventory = self.inv.inventory_collection
         self.links = self.inv.collections["links"]
@@ -27,6 +28,10 @@ class CliqueFinder(Fetcher):
         self.clique_constraints = self.inv.collections["clique_constraints"]
         self.cliques = self.inv.collections["cliques"]
 
+    def set_env(self, env):
+        super().set_env(env)
+        self.env_config = self.configuration.environment
+
     def find_cliques_by_link(self, links_list):
         return self.links.find({'links': {'$in': links_list}})
 
@@ -43,23 +48,62 @@ class CliqueFinder(Fetcher):
             self.find_cliques_for_type(clique_type)
         self.log.info("finished scanning for cliques")
 
+    # Calculate priority score
+    def _get_priority_score(self, clique_type):
+        if self.env == clique_type['environment']:
+            return 4
+        if (self.env_config['distribution'] == clique_type.get('distribution') and
+            self.env_config['distribution_version'] == clique_type.get('distribution_version')):
+            return 3
+        if clique_type.get('mechanism_drivers') in self.env_config['mechanism_drivers']:
+            return 2
+        if self.env_config['type_drivers'] == clique_type.get('type_drivers'):
+            return 1
+        else:
+            return 0
+
+    # Get clique type with max priority
+    # for given environment configuration and focal point type
+    def _get_clique_type(self, focal_point, clique_types):
+        # If there's no configuration match for the specified environment,
+        # we use the default clique type definition with environment='ANY'
+        fallback_type = next(
+            filter(lambda t: t['environment'] == 'ANY', clique_types),
+            None
+        )
+        if not fallback_type:
+            raise ValueError("No fallback clique type (ANY) "
+                             "defined for focal point type '{}'"
+                             .format(focal_point))
+
+        clique_types.remove(fallback_type)
+
+        priority_scores = [self._get_priority_score(clique_type)
+                           for clique_type
+                           in clique_types]
+        max_score = max(priority_scores) if priority_scores else 0
+
+        return (fallback_type
+                if max_score == 0
+                else clique_types[priority_scores.index(max_score)])
+
     def get_clique_types(self):
         if not self.clique_types_by_type:
-            clique_types = self.clique_types \
-                .find({"environment": self.get_env()})
-            default_clique_types = \
-                self.clique_types.find({'environment': 'ANY'})
-            for clique_type in clique_types:
-                focal_point_type = clique_type['focal_point_type']
-                self.clique_types_by_type[focal_point_type] = clique_type
-            # if some focal point type does not have an explicit definition in
-            # clique_types for this specific environment, use the default
-            # clique type definition with environment=ANY
-            for clique_type in default_clique_types:
-                focal_point_type = clique_type['focal_point_type']
-                if focal_point_type not in self.clique_types_by_type:
-                    self.clique_types_by_type[focal_point_type] = clique_type
-            return self.clique_types_by_type
+            clique_types_by_focal_point = self.clique_types.aggregate([{
+                "$group": {
+                    "_id": "$focal_point_type",
+                    "types": {"$push": "$$ROOT"}
+                }
+            }])
+
+            self.clique_types_by_type = {
+                cliques['_id']: self._get_clique_type(cliques['_id'],
+                                                      cliques['types'])
+                for cliques in
+                clique_types_by_focal_point
+            }
+
+        return self.clique_types_by_type
 
     def find_cliques_for_type(self, clique_type):
         focal_point_type = clique_type["focal_point_type"]
index e2f8282..4855acc 100644 (file)
@@ -44,6 +44,7 @@ class EventManager(Manager):
             '6.0': DefaultListener,
             '7.0': DefaultListener,
             '8.0': DefaultListener,
+            '9.0': DefaultListener
         },
         'RDO': {
             'Mitaka': DefaultListener,
@@ -112,7 +113,8 @@ class EventManager(Manager):
     def get_listener(self, env: str):
         env_config = self.inv.get_env_config(env)
         return (self.LISTENERS.get(env_config.get('distribution'), {})
-                              .get(env_config.get('distribution_version')))
+                              .get(env_config.get('distribution_version',
+                                                  DefaultListener)))
 
     def listen_to_events(self, listener: ListenerBase, env_name: str, process_vars: dict):
         listener.listen({
index 84c4de3..f685faf 100644 (file)
@@ -36,24 +36,23 @@ class ApiAccess(Fetcher):
         "neutron": ["quantum"]\r
     }\r
 \r
-    # identitity API v2 version with admin token\r
-    def __init__(self):\r
+    # identity API v2 version with admin token\r
+    def __init__(self, config=None):\r
         super(ApiAccess, self).__init__()\r
         if ApiAccess.initialized:\r
             return\r
-        ApiAccess.config = Configuration()\r
+        ApiAccess.config = {'OpenStack': config} if config else Configuration()\r
         ApiAccess.api_config = ApiAccess.config.get("OpenStack")\r
-        host = ApiAccess.api_config["host"]\r
+        host = ApiAccess.api_config.get("host", "")\r
         ApiAccess.host = host\r
-        port = ApiAccess.api_config["port"]\r
+        port = ApiAccess.api_config.get("port", "")\r
         if not (host and port):\r
             raise ValueError('Missing definition of host or port ' +\r
                              'for OpenStack API access')\r
         ApiAccess.base_url = "http://" + host + ":" + port\r
-        ApiAccess.admin_token = ApiAccess.api_config["admin_token"]\r
-        ApiAccess.admin_project = ApiAccess.api_config["admin_project"] \\r
-            if "admin_project" in ApiAccess.api_config \\r
-            else 'admin'\r
+        ApiAccess.admin_token = ApiAccess.api_config.get("admin_token", "")\r
+        ApiAccess.admin_project = ApiAccess.api_config.get("admin_project",\r
+                                                           "admin")\r
         ApiAccess.admin_endpoint = "http://" + host + ":" + "35357"\r
 \r
         token = self.v2_auth_pwd(ApiAccess.admin_project)\r
@@ -97,7 +96,8 @@ class ApiAccess(Fetcher):
         if subject_token:\r
             return subject_token\r
         req_url = ApiAccess.base_url + "/v2.0/tokens"\r
-        response = requests.post(req_url, json=post_body, headers=headers)\r
+        response = requests.post(req_url, json=post_body, headers=headers,\r
+                                 timeout=5)\r
         response = response.json()\r
         ApiAccess.auth_response[project_id] = response\r
         if 'error' in response:\r
index 64d7372..090ab84 100644 (file)
@@ -40,11 +40,12 @@ class DbAccess(Fetcher):
 
     # connection timeout set to 30 seconds,
     # due to problems over long connections
-    TIMEOUT = 30
+    TIMEOUT = 5
 
-    def __init__(self):
+    def __init__(self, mysql_config=None):
         super().__init__()
-        self.config = Configuration()
+        self.config = {'mysql': mysql_config} if mysql_config \
+            else Configuration()
         self.conf = self.config.get("mysql")
         self.connect_to_db()
         self.neutron_db = self.get_neutron_db_name()
@@ -61,8 +62,9 @@ class DbAccess(Fetcher):
                                               database=_database,
                                               raise_on_warnings=True)
             DbAccess.conn.ping(True)  # auto-reconnect if necessary
-        except:
-            self.log.critical("failed to connect to MySQL DB")
+        except Exception as e:
+            self.log.critical("failed to connect to MySQL DB: {}"
+                              .format(str(e)))
             return
         DbAccess.query_count_per_con = 0
 
@@ -91,10 +93,9 @@ class DbAccess(Fetcher):
             DbAccess.conn = None
         self.conf = self.config.get("mysql")
         cnf = self.conf
-        cnf['schema'] = cnf['schema'] if 'schema' in cnf else 'nova'
-        self.db_connect(cnf["host"], cnf["port"],
-                        cnf["user"], cnf["pwd"],
-                        cnf["schema"])
+        self.db_connect(cnf.get('host', ''), cnf.get('port', ''),
+                        cnf.get('user', ''), cnf.get('pwd', ''),
+                        cnf.get('schema', 'nova'))
 
     @with_cursor
     def get_objects_list_for_id(self, query, object_type, object_id,
index e37bb31..503c8a8 100644 (file)
@@ -9,6 +9,7 @@
 # http://www.apache.org/licenses/LICENSE-2.0                                  #
 ###############################################################################
 from abc import ABC, abstractmethod
+import datetime
 
 from utils.logging.file_logger import FileLogger
 from utils.logging.full_logger import FullLogger
@@ -19,6 +20,14 @@ class Manager(ABC):
 
     MIN_INTERVAL = 0.1  # To prevent needlessly frequent scans
 
+    INTERVALS = {
+        'YEARLY': datetime.timedelta(days=365.25),
+        'MONTHLY': datetime.timedelta(days=365.25/12),
+        'WEEKLY': datetime.timedelta(weeks=1),
+        'DAILY': datetime.timedelta(days=1),
+        'HOURLY': datetime.timedelta(hours=1)
+    }
+
     def __init__(self, log_directory: str = None,
                  mongo_config_file: str = None):
         super().__init__()
index 6c40a7f..49f37ff 100755 (executable)
@@ -319,13 +319,20 @@ class ScanController(Fetcher):
         SshConnection.disconnect_all()
         status = 'ok' if not scanner.found_errors.get(env_name, False) \
             else 'errors detected'
+        if status == 'ok' and scan_plan.object_type == "environment":
+            self.mark_env_scanned(scan_plan.env)
         self.log.info('Scan completed, status: {}'.format(status))
         return True, status
 
+    def mark_env_scanned(self, env):
+        environments_collection = self.inv.collection['environments_config']
+        environments_collection \
+            .update_one(filter={'name': env},
+                        update={'$set': {'scanned': True}})
 
 if __name__ == '__main__':
-    scan_manager = ScanController()
-    ret, msg = scan_manager.run()
+    scan_controller = ScanController()
+    ret, msg = scan_controller.run()
     if not ret:
-        scan_manager.log.error(msg)
+        scan_controller.log.error(msg)
     sys.exit(0 if ret else 1)
index 12dbec0..6c46d47 100644 (file)
@@ -170,14 +170,6 @@ class ScanManager(Manager):
                 .update_many(filter={'name': {'$in': env_scans}},
                              update={'$set': {'scanned': False}})
 
-    INTERVALS = {
-        'YEARLY': datetime.timedelta(days=365.25),
-        'MONTHLY': datetime.timedelta(days=365.25/12),
-        'WEEKLY': datetime.timedelta(weeks=1),
-        'DAILY': datetime.timedelta(days=1),
-        'HOURLY': datetime.timedelta(hours=1)
-    }
-
     def _submit_scan_request_for_schedule(self, scheduled_scan, interval, ts):
         scans = self.scans_collection
         new_scan = {
index 9faf5b8..bc4fe01 100644 (file)
@@ -40,14 +40,13 @@ class MonitoringSetupManager(MonitoringHandler):
         if self.provision == self.provision_levels['none']:
             self.log.debug('Monitoring config setup skipped')
             return
-        sensu_server_files = [
-            'transport.json',
-            'client.json',
-            'rabbitmq.json',
-            'handlers.json',
-            'redis.json',
-            'api.json'
-        ]
+        sensu_server_files_templates = \
+            self.inv.find({'side': 'server'},
+                          projection={'type': 1},
+                          collection='monitoring_config_templates')
+        sensu_server_files = []
+        for f in sensu_server_files_templates:
+            sensu_server_files.append(f.get('type', ''))
         conf = self.env_monitoring_config
         is_container = bool(conf.get('ssh_user', ''))
         server_host = conf['server_ip']
index 01bf09f..5b53921 100644 (file)
@@ -18,6 +18,20 @@ class StringEnum(Enum):
         return repr(self.value)
 
 
+class ConnectionTestType(StringEnum):
+    AMQP = "AMQP"
+    CLI = "CLI"
+    ACI = "ACI"
+    MYSQL = "mysql"
+    OPENSTACK = "OpenStack"
+    MONITORING = "Monitoring"
+
+
+class ConnectionTestStatus(StringEnum):
+    REQUEST = "request"
+    RESPONSE = "response"
+
+
 class ScanStatus(StringEnum):
     DRAFT = "draft"
     PENDING = "pending"