Merge "Add doctor-test package"
authorTomi Juvonen <tomi.juvonen@nokia.com>
Tue, 5 Sep 2017 12:07:27 +0000 (12:07 +0000)
committerGerrit Code Review <gerrit@opnfv.org>
Tue, 5 Sep 2017 12:07:27 +0000 (12:07 +0000)
43 files changed:
.gitignore
docs/development/design/index.rst
docs/development/design/maintenance-design-guideline.rst [new file with mode: 0644]
docs/development/manuals/monitors.rst [new file with mode: 0644]
etc/doctor.sample.conf
tests/alarm.py
tests/common/__init__.py [new file with mode: 0644]
tests/common/constants.py [new file with mode: 0644]
tests/common/utils.py [new file with mode: 0644]
tests/config.py
tests/consumer/__init__.py
tests/consumer/base.py [new file with mode: 0644]
tests/consumer/sample.py [new file with mode: 0644]
tests/inspector.py
tests/inspector/__init__.py
tests/inspector/congress.py [new file with mode: 0644]
tests/inspector/sample.py
tests/installer/__init__.py [new file with mode: 0644]
tests/installer/apex.py [new file with mode: 0644]
tests/installer/base.py [new file with mode: 0644]
tests/installer/common/congress.py [new file with mode: 0644]
tests/installer/common/restore_ceilometer.py [new file with mode: 0644]
tests/installer/common/set_ceilometer.py [new file with mode: 0644]
tests/installer/local.py [new file with mode: 0644]
tests/lib/installers/apex
tests/lib/monitor [new file with mode: 0644]
tests/lib/monitors/collectd/collectd [new file with mode: 0644]
tests/lib/monitors/collectd/collectd_plugin.py [new file with mode: 0644]
tests/lib/monitors/sample/monitor.py [moved from tests/monitor.py with 100% similarity]
tests/lib/monitors/sample/sample [new file with mode: 0644]
tests/logger.py
tests/main.py
tests/monitor/__init__.py [new file with mode: 0644]
tests/monitor/base.py [new file with mode: 0644]
tests/monitor/collectd.py [new file with mode: 0644]
tests/monitor/sample.py [new file with mode: 0644]
tests/os_clients.py
tests/profiler_poc.py [moved from tests/profiler-poc.py with 87% similarity]
tests/run.sh
tests/scenario/__init__.py [new file with mode: 0644]
tests/scenario/common.py [new file with mode: 0644]
tests/scenario/network_failure.py [new file with mode: 0644]
tox.ini

index 83868b5..7e89386 100644 (file)
@@ -10,5 +10,8 @@
 /releng/
 /tests/*.img
 
+#IntelliJ IDEA
+.idea/
+
 #Build results
 .tox
index 87d14d4..e50c170 100644 (file)
@@ -26,3 +26,4 @@ See also https://wiki.opnfv.org/requirements_projects .
    port-data-plane-status.rst
    inspector-design-guideline.rst
    performance-profiler.rst
+   maintenance-design-guideline.rst
diff --git a/docs/development/design/maintenance-design-guideline.rst b/docs/development/design/maintenance-design-guideline.rst
new file mode 100644 (file)
index 0000000..93c3cf4
--- /dev/null
@@ -0,0 +1,155 @@
+.. This work is licensed under a Creative Commons Attribution 4.0 International License.
+.. http://creativecommons.org/licenses/by/4.0
+
+====================================
+Planned Maintenance Design Guideline
+====================================
+
+.. NOTE::
+   This is a draft spec of the design guideline for planned maintenance.
+   JIRA ticket to track the update and collect comments: `DOCTOR-52`_.
+
+This document describes how one can implement planned maintenance by utilizing
+the `OPNFV Doctor project`_ framework and meet the set requirements.
+
+Problem Description
+===================
+
+A Telco application needs to know when planned maintenance is going to happen
+in order to guarantee zero downtime in its operation. It needs to be possible to
+take its own actions to keep the application running on unaffected resources or
+to give guidance for admin actions like migration. More details are defined in
+requirement documentation: `use cases`_, `architecture`_ and `implementation`_.
+See also the discussion at the OPNFV summit: `planned maintenance session`_.
+
+Guidelines
+==========
+
+Cloud admin needs to make a notification about planned maintenance including
+all details that application needs in order to make decisions about its affected
+service. This notification payload can be consumed by application by subscribing
+to corresponding event alarm through alarming service like OpenStack AODH.
+
+Before maintenance starts application needs to be able to make switch over for
+its affected ACT-STBY service, do operation to move service to not affected part
+of infra or give a hint for admin operation like migration that can be
+automatically issued by admin tool according to agreed policy.
+
+Flow diagram::
+
+  admin alarming project  controller  inspector
+    |   service  app manager   |           |
+    |  1.   |         |        |           |
+    +------------------------->+           |
+    +<-------------------------+           |
+    |  2.   |         |        |           |
+    +------>+    3.   |        |           |
+    |       +-------->+   4.   |           |
+    |       |         +------->+           |
+    |       |    5.   +<-------+           |
+    +<----------------+        |           |
+    |                 |   6.   |           |
+    +------------------------->+           |
+    +<-------------------------+     7.    |
+    +------------------------------------->+
+    |   8.  |         |        |           |
+    +------>+    9.   |        |           |
+    |       +-------->+        |           |
+    +--------------------------------------+
+    |                10.                   |
+    +--------------------------------------+
+    |  11.  |         |        |           |
+    +------------------------->+           |
+    +<-------------------------+           |
+    |  12.  |         |        |           |
+    +------>+-------->+        |    13.    |
+    +------------------------------------->+
+    +-------+---------+--------+-----------+
+
+Concepts used below:
+
+- `full maintenance`: This means maintenance will take a longer time and
+  resource should be emptied, meaning container or VM need to be moved or
+  deleted. Admin might need to test resource to work after maintenance.
+
+- `reboot`: Only a reboot is needed and admin does not need separate testing
+  after that. Container or VM can be left in place if so wanted.
+
+- `notification`: Notification to rabbitmq.
+
+Admin makes a planned maintenance session where he sets
+a `maintenance_session_id` that is a unique ID for all the hardware resources he
+is going to have the maintenance at the same time. Mostly maintenance should be
+done node by node, meaning a single compute node at a time would be in single
+planned maintenance session having unique `maintenance_session_id`. This ID will
+be carried through the whole session in all places and can be used to query
+maintenance in admin tool API. Project running a Telco application should set
+a specific role for admin tool to know it cannot do planned maintenance unless
+project has agreed actions to be done for its VMs or containers. This means the
+project has configured itself to get alarms upon planned maintenance and it is
+capable of agreeing needed actions. Admin is supposed to use an admin tool to
+automate maintenance process partially or entirely.
+
+The flow of a successful planned maintenance session as in OpenStack example
+case:
+
+1.  Admin disables nova-compute in order to do planned maintenance on a compute
+    host and gets ACK from the API call. This action needs to be done to ensure
+    nothing will be placed in this compute host by any user. Action is always
+    done regardless of whether the whole compute will be affected or not.
+2.  Admin sends a project specific maintenance notification with state
+    `planned maintenance`. This includes detailed information about maintenance,
+    like when it is going to start, is it `reboot` or `full maintenance`
+    including the information about project containers or VMs running on host or
+    the part of it that will need maintenance. Also default action like
+    migration will be mentioned that will be issued by admin before maintenance
+    starts if no other action is set by project. In case project has a specific
+    role set, planned maintenance cannot start unless project has agreed the
+    admin action. Available admin actions are also listed in notification.
+3.  Application manager of the project receives AODH alarm about the same.
+4.  Application manager can do switch over to his ACT-STBY service, delete and
+    re-instantiate his service on not affected resource if so wanted.
+5.  Application manager may call admin tool API to give preferred instructions
+    for leaving VMs and containers in place or do admin action to migrate them.
+    In case admin does not receive this instruction before maintenance is to
+    start it will do the pre-configured default action like migration for
+    projects without a specific role requiring them to agree to the action.
+    VMs or Containers can be left on host if type of maintenance is just `reboot`.
+6.  Admin does possible actions to VMs and containers and receives an ACK.
+7.  In case everything went ok, Admin sends admin type of maintenance
+    notification with state `in maintenance`. This notification can be consumed
+    by Inspector and other cloud services to know there is ongoing maintenance
+    which means things like automatic fault management actions for the hardware
+    resources should be disabled.
+8.  If maintenance type is `reboot` and project is still having containers or
+    VMs running on affected hardware resource, Admin sends project specific
+    maintenance notification with state updated to `in maintenance`. If project
+    does not have anything left running on affected hardware resource, state will
+    be `maintenance over` instead. If maintenance can not be performed for some
+    reason state should be `maintenance cancelled`. In this case last operation
+    remaining for admin is to re-enable nova-compute service, ensure
+    everything is running and not to proceed with any further steps.
+9.  Application manager of the project receives AODH alarm about the same.
+10. Admin will do the maintenance. This is out of Doctor scope.
+11. Admin enables nova-compute service when maintenance is over and host can be
+    put back to production. An ACK is received from API call.
+12. In case project had left containers or VMs on hardware resource over
+    maintenance, Admin sends project specific maintenance notification with
+    state updated to `maintenance over`.
+13. Admin sends admin type of maintenance notification with state updated to
+    `maintenance over`. Inspector and other
+    cloud services can consume this to know hardware resource is back in use.
+
+POC
+---
+
+There was a `Maintenance POC`_ for planned maintenance in the OPNFV Beijing
+summit to show the basic concept of using framework defined by the project.
+
+.. _DOCTOR-52: https://jira.opnfv.org/browse/DOCTOR-52
+.. _OPNFV Doctor project: https://wiki.opnfv.org/doctor
+.. _use cases: http://artifacts.opnfv.org/doctor/docs/requirements/02-use_cases.html#nvfi-maintenance
+.. _architecture: http://artifacts.opnfv.org/doctor/docs/requirements/03-architecture.html#nfvi-maintenance
+.. _implementation:  http://artifacts.opnfv.org/doctor/docs/requirements/05-implementation.html#nfvi-maintenance
+.. _planned maintenance session: https://lists.opnfv.org/pipermail/opnfv-tech-discuss/2017-June/016677.html
+.. _Maintenance POC: https://wiki.opnfv.org/download/attachments/5046291/Doctor%20Maintenance%20PoC%202017.pptx?version=1&modificationDate=1498182869000&api=v2
diff --git a/docs/development/manuals/monitors.rst b/docs/development/manuals/monitors.rst
new file mode 100644 (file)
index 0000000..0d22b1d
--- /dev/null
@@ -0,0 +1,36 @@
+.. This work is licensed under a Creative Commons Attribution 4.0 International License.
+.. http://creativecommons.org/licenses/by/4.0
+
+Monitor Types and Limitations
+=============================
+
+Currently there are two monitor types supported: sample and collectd.
+
+Sample Monitor
+--------------
+
+Sample monitor type pings the compute host from the control host and calculates the
+notification time after the ping timeout.
+Also if inspector type is sample, the compute node needs to communicate with the control
+node on port 12345. This port needs to be opened for incoming traffic on control node.
+
+Collectd Monitor
+----------------
+
+Collectd monitor type uses collectd daemon running ovs_events plugin. Collectd runs on
+compute to send instant notification to the control node. The notification time is
+calculated by using the difference of time at which compute node sends notification to
+control node and the time at which consumer is notified. The time on control and compute
+node has to be synchronized for this reason. For further details on setting up collectd
+on the compute node, use the following link:
+http://docs.opnfv.org/en/stable-danube/submodules/barometer/docs/release/userguide/feature.userguide.html#id18
+
+Collectd monitors an interface managed by OVS. If the interface is not assigned
+an IP, the user has to provide the name of interface to be monitored. The command to
+launch the doctor test in that case is:
+MONITOR_TYPE=collectd INSPECTOR_TYPE=sample INTERFACE_NAME=example_iface ./run.sh
+
+If the interface name or IP is not provided, the collectd monitor type will monitor the
+default management interface. This may result in the failure of doctor run.sh test case.
+The test case sets the monitored interface down and if the inspector (sample or congress)
+is running on the same subnet, collectd monitor will not be able to communicate with it.
index 8a1ddc3..f81d3b2 100644 (file)
@@ -7,20 +7,46 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 [DEFAULT]
-image_name = cirros
-image_format = qcow2
-image_filename = cirros.img
-image_download_url = https://launchpad.net/cirros/trunk/0.3.0/+download/cirros-0.3.0-x86_64-disk.img
+#image_name = cirros
+#image_format = qcow2
+#image_filename = cirros.img
+#image_download_url = https://launchpad.net/cirros/trunk/0.3.0/+download/cirros-0.3.0-x86_64-disk.img
 
-glance_version = 2
-nova_version = 2.34
+#glance_version = 2
+#nova_version = 2.34
+#aodh_version = 2
 
-doctor_user = doctor
-doctor_passwd = doctor
-doctor_project = doctor
-doctor_role = _member_
-quota_instances = 1
-quota_cores = 1
+#doctor_user = doctor
+#doctor_passwd = doctor
+#doctor_project = doctor
+#doctor_role = _member_
+#quota_instances = 1
+#quota_cores = 1
 
+#net_name = doctor_net
+#net_cidr = 192.168.168.0/24
+#flavor = m1.tiny
+#instance_count = 1
+#instance_basename = doctor_vm
 
+#alarm_basename = doctor_alarm
 
+#profiler_type = poc
+
+[installer]
+#type = local
+#ip = 127.0.0.1
+#username = root
+
+[monitor]
+#type = sample
+
+[inspector]
+#type = sample
+#ip = 127.0.0.1
+#port = 12345
+
+[consumer]
+#type = sample
+#ip = 127.0.0.1
+#port = 12346
index 0582085..916f440 100644 (file)
@@ -26,9 +26,7 @@ class Alarm(object):
     def __init__(self, conf, log):
         self.conf = conf
         self.log = log
-        self.auth = get_identity_auth(username=self.conf.doctor_user,
-                                      password=self.conf.doctor_passwd,
-                                      project=self.conf.doctor_project)
+        self.auth = get_identity_auth(project=self.conf.doctor_project)
         self.aodh = \
             aodh_client(conf.aodh_version,
                         get_session(auth=self.auth))
diff --git a/tests/common/__init__.py b/tests/common/__init__.py
new file mode 100644 (file)
index 0000000..e68a307
--- /dev/null
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
\ No newline at end of file
diff --git a/tests/common/constants.py b/tests/common/constants.py
new file mode 100644 (file)
index 0000000..72d037a
--- /dev/null
@@ -0,0 +1,12 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from collections import namedtuple
+
+
+Host = namedtuple('Host', ['name', 'ip'])
diff --git a/tests/common/utils.py b/tests/common/utils.py
new file mode 100644 (file)
index 0000000..2e823ac
--- /dev/null
@@ -0,0 +1,105 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import json
+import os
+import paramiko
+import re
+
+
+def load_json_file(full_path):
+    """Loads JSON from file
+    :param target_filename:
+    :return:
+    """
+    if not os.path.isfile(full_path):
+        raise Exception('File(%s) does not exist' % full_path)
+
+    with open(full_path, 'r') as file:
+        return json.load(file)
+
+
+def write_json_file(full_path, data):
+    """write JSON from file
+    :param target_filename:
+    :return:
+    """
+
+    with open(full_path, 'w+') as file:
+        file.write(json.dumps(data))
+
+
+def match_rep_in_file(regex, full_path):
+    if not os.path.isfile(full_path):
+        raise Exception('File(%s) does not exist' % full_path)
+
+    with open(full_path, 'r') as file:
+        for line in file:
+            result = re.search(regex, line)
+            if result:
+                return result
+
+    return None
+
+
+class SSHClient(object):
+    def __init__(self, ip, username, password=None, pkey=None,
+                 key_filename=None, log=None, look_for_keys=False,
+                 allow_agent=False):
+        self.client = paramiko.SSHClient()
+        self.client.load_system_host_keys()
+        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        self.client.connect(ip, username=username, password=password,
+                            pkey=pkey, key_filename=key_filename,
+                            look_for_keys=look_for_keys,
+                            allow_agent=allow_agent)
+        self.log = log
+
+    def __del__(self):
+        self.client.close()
+
+    def ssh(self, command):
+        if self.log:
+            self.log.info("Executing: %s" % command)
+        stdin, stdout, stderr = self.client.exec_command(command)
+        ret = stdout.channel.recv_exit_status()
+        output = list()
+        for line in stdout.read().splitlines():
+            output.append(line.decode('utf-8'))
+        if ret:
+            if self.log:
+                self.log.info("*** FAILED to run command %s (%s)" % (command, ret))
+            raise Exception(
+                "Unable to run \ncommand: %s\nret: %s"
+                % (command, ret))
+        if self.log:
+            self.log.info("*** SUCCESSFULLY run command %s" % command)
+        return ret, output
+
+    def scp(self, source, dest, method='put'):
+        if self.log:
+            self.log.info("Copy %s -> %s" % (source, dest))
+        ftp = self.client.open_sftp()
+        if method == 'put':
+            ftp.put(source, dest)
+        elif method == 'get':
+            ftp.get(source, dest)
+        ftp.close()
+
+
+def run_async(func):
+    from threading import Thread
+    from functools import wraps
+
+    @wraps(func)
+    def async_func(*args, **kwargs):
+        thread = Thread(target=func, args=args, kwargs=kwargs)
+        thread.start()
+        return thread
+
+    return async_func
index 969d829..c71d5ad 100644 (file)
@@ -14,23 +14,29 @@ import alarm
 import consumer\r
 import image\r
 import instance\r
+import installer\r
 import network\r
 import inspector\r
+import monitor\r
 import os_clients\r
+import profiler_poc\r
 import user\r
 \r
 \r
 def list_opts():\r
     return [\r
-        ('consumer', consumer.OPTS),\r
+        ('installer', installer.OPTS),\r
+        ('monitor', monitor.OPTS),\r
         ('inspector', inspector.OPTS),\r
+        ('consumer', consumer.OPTS),\r
         ('DEFAULT', itertools.chain(\r
             os_clients.OPTS,\r
             image.OPTS,\r
             user.OPTS,\r
             network.OPTS,\r
             instance.OPTS,\r
-            alarm.OPTS))\r
+            alarm.OPTS,\r
+            profiler_poc.OPTS))\r
     ]\r
 \r
 \r
index 68cc5dc..ccec864 100644 (file)
@@ -7,6 +7,7 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 from oslo_config import cfg
+from oslo_utils import importutils
 
 
 OPTS = [
@@ -24,3 +25,13 @@ OPTS = [
                help='the port of doctor consumer',
                required=True),
 ]
+
+
+_consumer_name_class_mapping = {
+    'sample': 'consumer.sample.SampleConsumer'
+}
+
+
+def get_consumer(conf, log):
+    consumer_class = _consumer_name_class_mapping.get(conf.consumer.type)
+    return importutils.import_object(consumer_class, conf, log)
\ No newline at end of file
diff --git a/tests/consumer/base.py b/tests/consumer/base.py
new file mode 100644 (file)
index 0000000..3517074
--- /dev/null
@@ -0,0 +1,26 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import abc
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseConsumer(object):
+
+    def __init__(self, conf, log):
+        self.conf = conf
+        self.log = log
+
+    @abc.abstractmethod
+    def start(self):
+        pass
+
+    @abc.abstractmethod
+    def stop(self):
+        pass
\ No newline at end of file
diff --git a/tests/consumer/sample.py b/tests/consumer/sample.py
new file mode 100644 (file)
index 0000000..20ad9d5
--- /dev/null
@@ -0,0 +1,71 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from flask import Flask
+from flask import request
+import json
+import time
+from threading import Thread
+import requests
+
+from consumer.base import BaseConsumer
+
+
+class SampleConsumer(BaseConsumer):
+
+    def __init__(self, conf, log):
+        super(SampleConsumer, self).__init__(conf, log)
+        self.app = None
+
+    def start(self):
+        self.log.info('sample consumer start......')
+        self.app = ConsumerApp(self.conf.consumer.port, self, self.log)
+        self.app.start()
+
+    def stop(self):
+        self.log.info('sample consumer stop......')
+        if not self.app:
+            return
+        headers = {
+            'Content-Type': 'application/json',
+            'Accept': 'application/json',
+        }
+        url = 'http://%s:%d/shutdown'\
+              % (self.conf.consumer.ip,
+                 self.conf.consumer.port)
+        requests.post(url, data='', headers=headers)
+
+
+class ConsumerApp(Thread):
+
+    def __init__(self, port, consumer, log):
+        Thread.__init__(self)
+        self.port = port
+        self.consumer = consumer
+        self.log = log
+
+    def run(self):
+        app = Flask('consumer')
+
+        @app.route('/failure', methods=['POST'])
+        def event_posted():
+            self.log.info('doctor consumer notified at %s' % time.time())
+            self.log.info('sample consumer received data = %s' % request.data)
+            data = json.loads(request.data.decode('utf8'))
+            return 'OK'
+
+        @app.route('/shutdown', methods=['POST'])
+        def shutdown():
+            self.log.info('shutdown consumer app server at %s' % time.time())
+            func = request.environ.get('werkzeug.server.shutdown')
+            if func is None:
+                raise RuntimeError('Not running with the Werkzeug Server')
+            func()
+            return 'consumer app shutting down...'
+
+        app.run(host="0.0.0.0", port=self.port)
\ No newline at end of file
index a61051f..82ffc33 100644 (file)
@@ -116,8 +116,7 @@ def get_args():
 
 def main():
     args = get_args()
-    app.run(port=args.port)
-
+    app.run(host='0.0.0.0', port=args.port)
 
 if __name__ == '__main__':
     main()
index 35bdb5b..afba480 100644 (file)
@@ -11,6 +11,7 @@ import os
 from oslo_config import cfg
 from oslo_utils import importutils
 
+
 OPTS = [
     cfg.StrOpt('type',
                default=os.environ.get('INSPECTOR_TYPE', 'sample'),
@@ -19,7 +20,7 @@ OPTS = [
                required=True),
     cfg.StrOpt('ip',
                default='127.0.0.1',
-               help='the ip of default inspector',
+               help='the host ip of inspector',
                required=False),
     cfg.StrOpt('port',
                default='12345',
@@ -30,8 +31,10 @@ OPTS = [
 
 _inspector_name_class_mapping = {
     'sample': 'inspector.sample.SampleInspector',
+    'congress': 'inspector.congress.CongressInspector',
 }
 
+
 def get_inspector(conf, log):
     inspector_class = _inspector_name_class_mapping[conf.inspector.type]
     return importutils.import_object(inspector_class, conf, log)
diff --git a/tests/inspector/congress.py b/tests/inspector/congress.py
new file mode 100644 (file)
index 0000000..ae29585
--- /dev/null
@@ -0,0 +1,94 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from identity_auth import get_identity_auth
+from identity_auth import get_session
+from os_clients import congress_client
+
+from inspector.base import BaseInspector
+
+
+class CongressInspector(BaseInspector):
+    nova_api_min_version = '2.11'
+    doctor_driver = 'doctor'
+    doctor_datasource = 'doctor'
+    policy = 'classification'
+    rules = {
+        'host_down':
+            'host_down(host) :- doctor:events(hostname=host, type="compute.host.down", status="down")',
+        'active_instance_in_host':
+            'active_instance_in_host(vmid, host) :- nova:servers(id=vmid, host_name=host, status="ACTIVE")',
+        'host_force_down':
+            'execute[nova:services.force_down(host, "nova-compute", "True")] :- host_down(host)',
+        'error_vm_states':
+            'execute[nova:servers.reset_state(vmid, "error")] :- host_down(host), active_instance_in_host(vmid, host)'
+    }
+
+    def __init__(self, conf, log):
+        super(CongressInspector, self).__init__(conf, log)
+        self.auth = get_identity_auth()
+        self.congress = congress_client(get_session(auth=self.auth))
+        self._init_driver_and_ds()
+        self.inspector_url = self.get_inspector_url()
+
+    def _init_driver_and_ds(self):
+        datasources = \
+            {ds['name']: ds for ds in self.congress.list_datasources()['results']}
+
+        # check nova_api version
+        nova_api_version = datasources['nova']['config'].get('api_version')
+        if nova_api_version and nova_api_version < self.nova_api_min_version:
+            raise Exception('Congress Nova datasource API version < nova_api_min_version(%s)'
+                            % self.nova_api_min_version)
+
+        # create doctor datasource if it's not exist
+        if self.doctor_datasource not in datasources:
+            self.congress.create_datasource(
+                body={'driver': self.doctor_driver,
+                      'name': self.doctor_datasource})
+
+        # check whether doctor driver exist
+        drivers = \
+            {driver['id']: driver for driver in self.congress.list_drivers()['results']}
+        if self.doctor_driver not in drivers:
+            raise Exception('Do not support doctor driver in congress')
+
+        self.policy_rules = \
+            {rule['name']: rule for rule in
+             self.congress.list_policy_rules(self.policy)['results']}
+
+    def get_inspector_url(self):
+        ds = self.congress.list_datasources()['results']
+        doctor_ds = next((item for item in ds if item['driver'] == 'doctor'),
+                         None)
+        congress_endpoint = self.congress.httpclient.get_endpoint(auth=self.auth)
+        return ('%s/v1/data-sources/%s/tables/events/rows' %
+                (congress_endpoint, doctor_ds['id']))
+
+    def start(self):
+        self.log.info('congress inspector start......')
+
+        for rule_name, rule in self.rules.items():
+            self._add_rule(rule_name, rule)
+
+    def stop(self):
+        self.log.info('congress inspector stop......')
+
+        for rule_name in self.rules.keys():
+            self._del_rule(rule_name)
+
+    def _add_rule(self, rule_name, rule):
+        if rule_name not in self.policy_rules:
+            self.congress.create_policy_rule(self.policy,
+                                             body={'name': rule_name,
+                                                   'rule': rule})
+
+    def _del_rule(self, rule_name):
+        if rule_name in self.policy_rules:
+            rule_id = self.policy_rules[rule_name]['id']
+            self.congress.delete_policy_rule(self.policy, rule_id)
index db477de..1c05ced 100644 (file)
@@ -14,9 +14,11 @@ import time
 from threading import Thread
 import requests
 
+from common import utils
 from identity_auth import get_identity_auth
 from identity_auth import get_session
 from os_clients import nova_client
+from os_clients import neutron_client
 from inspector.base import BaseInspector
 
 
@@ -31,6 +33,10 @@ class SampleInspector(BaseInspector):
         # Normally we use this client for non redundant API calls
         self.nova = self.novaclients[0]
 
+        auth = get_identity_auth(project=self.conf.doctor_project)
+        session = get_session(auth=auth)
+        self.neutron = neutron_client(session)
+
         self.servers = collections.defaultdict(list)
         self.hostnames = list()
         self.app = None
@@ -86,37 +92,49 @@ class SampleInspector(BaseInspector):
             event_type = event['type']
             if event_type == self.event_type:
                 self.hostnames.append(hostname)
-                self.disable_compute_host(hostname)
-
-    def disable_compute_host(self, hostname):
-        threads = []
-        if len(self.servers[hostname]) > self.NUMBER_OF_CLIENTS:
-            # TODO(tojuvone): This could be enhanced in future with dynamic
-            # reuse of self.novaclients when all threads in use
-            self.log.error('%d servers in %s. Can handle only %d'%(
-                           self.servers[hostname], hostname, self.NUMBER_OF_CLIENTS))
-        for nova, server in zip(self.novaclients, self.servers[hostname]):
-            t = ThreadedResetState(nova, "error", server, self.log)
-            t.start()
-            threads.append(t)
-        for t in threads:
-            t.join()
+                thr1 = self._disable_compute_host(hostname)
+                thr2 = self._vms_reset_state('error', hostname)
+                thr3 = self._set_ports_data_plane_status('DOWN', hostname)
+                thr1.join()
+                thr2.join()
+                thr3.join()
+
+    @utils.run_async
+    def _disable_compute_host(self, hostname):
         self.nova.services.force_down(hostname, 'nova-compute', True)
         self.log.info('doctor mark host(%s) down at %s' % (hostname, time.time()))
 
+    @utils.run_async
+    def _vms_reset_state(self, state, hostname):
 
-class ThreadedResetState(Thread):
+        @utils.run_async
+        def _vm_reset_state(nova, server, state):
+            nova.servers.reset_state(server, state)
+            self.log.info('doctor mark vm(%s) error at %s' % (server, time.time()))
 
-    def __init__(self, nova, state, server, log):
-        Thread.__init__(self)
-        self.nova = nova
-        self.state = state
-        self.server = server
-        self.log = log
+        thrs = []
+        for nova, server in zip(self.novaclients, self.servers[hostname]):
+            t = _vm_reset_state(nova, server, state)
+            thrs.append(t)
+        for t in thrs:
+            t.join()
 
-    def run(self):
-        self.nova.servers.reset_state(self.server, self.state)
-        self.log.info('doctor mark vm(%s) error at %s' % (self.server, time.time()))
+    @utils.run_async
+    def _set_ports_data_plane_status(self, status, hostname):
+        """Set ``data_plane_status`` on every port bound to ``hostname``.
+
+        One asynchronous update is started per port and all of them are
+        joined before returning.
+
+        :param status: value written to each port's ``data_plane_status``
+        :param hostname: value matched against the ports' ``binding:host_id``
+        """
+        # Neutron expects the attributes of an update wrapped in a
+        # top-level 'port' key.
+        body = {'port': {'data_plane_status': status}}
+
+        @utils.run_async
+        def _set_port_data_plane_status(port_id):
+            self.neutron.update_port(port_id, body)
+            self.log.info('doctor set data plane status %s on port %s' % (status, port_id))
+
+        thrs = []
+        params = {'binding:host_id': hostname}
+        # list_ports() answers with {'ports': [<port dict>, ...]}; iterating
+        # the response itself would only yield the 'ports' key.
+        for port in self.neutron.list_ports(**params)['ports']:
+            t = _set_port_data_plane_status(port['id'])
+            thrs.append(t)
+        for t in thrs:
+            t.join()
 
 
 class InspectorApp(Thread):
@@ -135,7 +153,7 @@ class InspectorApp(Thread):
             self.log.info('event posted in sample inspector at %s' % time.time())
             self.log.info('sample inspector = %s' % self.inspector)
             self.log.info('sample inspector received data = %s' % request.data)
-            events = json.loads(request.data)
+            events = json.loads(request.data.decode('utf8'))
             self.inspector.handle_events(events)
             return "OK"
 
diff --git a/tests/installer/__init__.py b/tests/installer/__init__.py
new file mode 100644 (file)
index 0000000..bb0e452
--- /dev/null
@@ -0,0 +1,38 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import os
+
+from oslo_config import cfg
+from oslo_utils import importutils
+
+# Options describing the installer of the system under test. The defaults
+# for 'type' and 'ip' are read from the INSTALLER_TYPE / INSTALLER_IP
+# environment variables when this module is imported.
+# NOTE(review): presumably registered under an 'installer' option group,
+# since get_installer() reads conf.installer.type - confirm in config.py.
+OPTS = [
+    cfg.StrOpt('type',
+               default=os.environ.get('INSTALLER_TYPE', 'local'),
+               choices=['local', 'apex'],
+               help='the type of installer',
+               required=True),
+    cfg.StrOpt('ip',
+               default=os.environ.get('INSTALLER_IP', '127.0.0.1'),
+               help='the ip of installer'),
+    cfg.StrOpt('username',
+               default='root',
+               help='the user name for login installer server',
+               required=True),
+]
+
+
+# Maps each allowed 'type' value to the dotted path of its installer class.
+_installer_name_class_mapping = {
+    'local': 'installer.local.LocalInstaller',
+    'apex': 'installer.apex.ApexInstaller'
+}
+
+
+def get_installer(conf, log):
+    """Instantiate the installer class selected by ``conf.installer.type``."""
+    class_path = _installer_name_class_mapping[conf.installer.type]
+    installer = importutils.import_object(class_path, conf, log)
+    return installer
\ No newline at end of file
diff --git a/tests/installer/apex.py b/tests/installer/apex.py
new file mode 100644 (file)
index 0000000..98eb6c9
--- /dev/null
@@ -0,0 +1,127 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import getpass
+import grp
+import os
+import pwd
+import stat
+import subprocess
+import sys
+
+from common.utils import SSHClient
+from installer.base import BaseInstaller
+
+
+class ApexInstaller(BaseInstaller):
+    """Installer support for deployments made with Apex.
+
+    Uses SSH to the installer host to fetch the key used to reach the
+    controller nodes, patches ceilometer on every controller and opens
+    reverse SSH tunnels for the consumer port.
+    """
+
+    node_user_name = 'heat-admin'
+    cm_set_script = 'set_ceilometer.py'
+    cm_restore_script = 'restore_ceilometer.py'
+
+    def __init__(self, conf, log):
+        super(ApexInstaller, self).__init__(conf, log)
+        self.client = SSHClient(self.conf.installer.ip,
+                                self.conf.installer.username,
+                                look_for_keys=True)
+        # Local path of the downloaded key; None until it has been fetched.
+        self.key_file = None
+        self.controllers = list()
+        self.controller_clients = list()
+        # Popen objects started by setup_stunnel().
+        self.servers = list()
+
+    def setup(self):
+        """Fetch the SSH key, find the controllers, patch them, open tunnels."""
+        self.log.info('Setup Apex installer start......')
+
+        self.get_ssh_key_from_installer()
+        self.get_controller_ips()
+        self.set_apply_patches()
+        self.setup_stunnel()
+
+    def cleanup(self):
+        """Restore the patched controllers and stop the started processes."""
+        try:
+            self.restore_apply_patches()
+        finally:
+            # Run even when the restore fails, and only signal processes
+            # that have not exited yet.
+            for server in self.servers:
+                if server.poll() is None:
+                    server.terminate()
+
+    def get_ssh_key_from_installer(self):
+        """Download the installer's private key once and return its path.
+
+        :returns: local path of the key file (cached in ``self.key_file``)
+        """
+        self.log.info('Get SSH keys from Apex installer......')
+
+        if self.key_file is not None:
+            self.log.info('Already have SSH keys from Apex installer......')
+            return self.key_file
+
+        # Download straight to the path that is reported to callers, so the
+        # key is found even when the working directory differs from the
+        # script directory.
+        key_file = os.path.join(sys.path[0], 'instack_key')
+        self.client.scp('/home/stack/.ssh/id_rsa', key_file, method='get')
+        # Take the primary group from the passwd entry; a group named after
+        # the user does not have to exist.
+        pw_entry = pwd.getpwnam(getpass.getuser())
+        os.chown(key_file, pw_entry.pw_uid, pw_entry.pw_gid)
+        os.chmod(key_file, stat.S_IREAD)
+        self.key_file = key_file
+        return self.key_file
+
+    def get_controller_ips(self):
+        """Query the installer for controller addresses; store them in self.controllers.
+
+        :raises Exception: when the remote command reports a failure
+        """
+        self.log.info('Get controller ips from Apex installer......')
+
+        command = "source stackrc; " \
+                  "nova list | grep ' overcloud-controller-[0-9] ' " \
+                  "| sed -e 's/^.*ctlplane=//' |awk '{print $1}'"
+        ret, controllers = self.client.ssh(command)
+        if ret:
+            raise Exception('Exec command to get controller ips in Apex installer failed, '
+                            'ret=%s, output=%s' % (ret, controllers))
+        self.log.info('Get controller_ips:%s from Apex installer' % controllers)
+        self.controllers = controllers
+
+    def get_host_ip_from_hostname(self, hostname):
+        """Return the ctlplane address the installer reports for ``hostname``.
+
+        Only the part of ``hostname`` before the first dot is looked up.
+
+        :raises Exception: when the remote command fails or prints nothing
+        """
+        self.log.info('Get host ip from host name in Apex installer......')
+
+        hostname_in_undercloud = hostname.split('.')[0]
+
+        command = "source stackrc; nova show %s  | awk '/ ctlplane network /{print $5}'" % (hostname_in_undercloud)
+        ret, host_ip = self.client.ssh(command)
+        # Empty output is a failure as well; indexing it below would only
+        # produce an IndexError without any context.
+        if ret or not host_ip:
+            raise Exception('Exec command to get host ip from hostname(%s) in Apex installer failed, '
+                            'ret=%s, output=%s' % (hostname, ret, host_ip))
+        self.log.info('Get host_ip:%s from host_name:%s in Apex installer' % (host_ip, hostname))
+        return host_ip[0]
+
+    def setup_stunnel(self):
+        """Start one reverse SSH tunnel for the consumer port per controller."""
+        self.log.info('Setup ssh stunnel in controller nodes in Apex installer......')
+        # NOTE(review): every command ends with '&', so the Popen object
+        # kept in self.servers is the launching shell rather than the ssh
+        # process itself - confirm that cleanup() can actually stop the tunnel.
+        for node_ip in self.controllers:
+            cmd = "sudo ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s %s@%s -R %s:localhost:%s sleep 600 > ssh_tunnel.%s.log 2>&1 < /dev/null &" \
+                  % (self.key_file, self.node_user_name, node_ip,
+                     self.conf.consumer.port, self.conf.consumer.port, node_ip)
+            server = subprocess.Popen(cmd, shell=True)
+            self.servers.append(server)
+            server.communicate()
+
+    def set_apply_patches(self):
+        """Run the ceilometer set script on every controller."""
+        self.log.info('Set apply patches start......')
+
+        for node_ip in self.controllers:
+            client = SSHClient(node_ip, self.node_user_name, key_filename=self.key_file)
+            # Remember the client so restore_apply_patches() can reuse it.
+            self.controller_clients.append(client)
+            self._ceilometer_apply_patches(client, self.cm_set_script)
+
+    def restore_apply_patches(self):
+        """Run the ceilometer restore script on every patched controller."""
+        self.log.info('restore apply patches start......')
+
+        for client in self.controller_clients:
+            self._ceilometer_apply_patches(client, self.cm_restore_script)
+
+    def _ceilometer_apply_patches(self, ssh_client, script_name):
+        """Copy ``script_name`` from ./common to the node, run it, restart ceilometer.
+
+        :raises Exception: when the script reports a failure
+        """
+        installer_dir = os.path.dirname(os.path.realpath(__file__))
+        script_abs_path = os.path.join(installer_dir, 'common', script_name)
+
+        ssh_client.scp(script_abs_path, script_name)
+        cmd = 'sudo python %s' % script_name
+        ret, output = ssh_client.ssh(cmd)
+        if ret:
+            raise Exception('Do the ceilometer command in controller node failed....'
+                            'ret=%s, cmd=%s, output=%s' % (ret, cmd, output))
+        ssh_client.ssh('sudo systemctl restart openstack-ceilometer-notification.service')
+
diff --git a/tests/installer/base.py b/tests/installer/base.py
new file mode 100644 (file)
index 0000000..fa39816
--- /dev/null
@@ -0,0 +1,36 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import abc
+import six
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseInstaller(object):
+    """Abstract interface that every installer implementation provides."""
+
+    def __init__(self, conf, log):
+        # Keep the configuration and the logger for use by subclasses.
+        self.conf = conf
+        self.log = log
+
+    @abc.abstractproperty
+    def node_user_name(self):
+        """user name for login to cloud node"""
+
+    @abc.abstractmethod
+    def get_ssh_key_from_installer(self):
+        """Provide the SSH key obtained from the installer."""
+        pass
+
+    @abc.abstractmethod
+    def get_host_ip_from_hostname(self, hostname):
+        """Return the IP address that belongs to ``hostname``."""
+        pass
+
+    @abc.abstractmethod
+    def setup(self):
+        """Prepare the installer; counterpart of cleanup()."""
+        pass
+
+    @abc.abstractmethod
+    def cleanup(self):
+        """Release what setup() prepared."""
+        pass
diff --git a/tests/installer/common/congress.py b/tests/installer/common/congress.py
new file mode 100644 (file)
index 0000000..db882de
--- /dev/null
@@ -0,0 +1,47 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+def set_doctor_driver_conf(ssh_client, restart_cmd):
+    """Enable the doctor driver in congress.conf on a remote node.
+
+    Runs a shell script through ``ssh_client`` that, unless the doctor
+    driver entry is already on the ``drivers`` line, backs up the config,
+    appends the entry and then runs ``restart_cmd``.
+
+    :param ssh_client: object whose ``ssh(cmd)`` returns ``(ret, output)``
+    :param restart_cmd: shell command text inserted after the config edit
+    :raises Exception: when ``ssh`` reports a truthy return code
+    """
+    cg_set_cmd = '''#!/bin/bash
+co_conf=/etc/congress/congress.conf
+co_conf_bak=/etc/congress/congress.conf.bak
+co_entry="congress.datasources.doctor_driver.DoctorDriver"
+if sudo grep -e "^drivers.*$co_entry" $co_conf; then
+    echo "NOTE: congress is configured as we needed"
+else
+    echo "modify the congress config"
+    sudo cp $co_conf $co_conf_bak
+    sudo sed -i -e "/^drivers/s/$/,$co_entry/"  $co_conf
+    %s
+fi
+    ''' % (restart_cmd)
+
+    ret, output = ssh_client.ssh(cg_set_cmd)
+    if ret:
+        raise Exception('Do the congress command in controller node failed....'
+                        'ret=%s, cmd=%s, output=%s' % (ret, cg_set_cmd, output))
+
+
+def restore_doctor_driver_conf(ssh_client, restart_cmd):
+    """Restore congress.conf on a remote node from its ``.bak`` copy.
+
+    Runs a shell script through ``ssh_client`` that, when the backup file
+    exists, copies it over the config, removes the backup and then runs
+    ``restart_cmd``; otherwise it only prints a note.
+
+    :param ssh_client: object whose ``ssh(cmd)`` returns ``(ret, output)``
+    :param restart_cmd: shell command text inserted after the restore
+    :raises Exception: when ``ssh`` reports a truthy return code
+    """
+    cg_restore_cmd = '''#!/bin/bash
+co_conf=/etc/congress/congress.conf
+co_conf_bak=/etc/congress/congress.conf.bak
+if [ -e $co_conf_bak ]; then
+    echo "restore the congress config"
+    sudo cp $co_conf_bak $co_conf
+    sudo rm $co_conf_bak
+    %s
+else
+    echo "Do not need to restore the congress config"
+fi
+    ''' % (restart_cmd)
+
+    ret, output = ssh_client.ssh(cg_restore_cmd)
+    if ret:
+        raise Exception('Do the congress command in controller node failed....'
+                        'ret=%s, cmd=%s, output=%s' % (ret, cg_restore_cmd, output))
diff --git a/tests/installer/common/restore_ceilometer.py b/tests/installer/common/restore_ceilometer.py
new file mode 100644 (file)
index 0000000..d25b9ed
--- /dev/null
@@ -0,0 +1,27 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import os
+import shutil
+
+ep_file = '/etc/ceilometer/event_pipeline.yaml'
+ep_file_bak = '/etc/ceilometer/event_pipeline.yaml.bak'
+
+
+def restore_ep_config():
+    """Copy the backup over the event pipeline file and delete the backup.
+
+    Only prints a message when the backup file does not exist.
+    """
+    if os.path.isfile(ep_file_bak):
+        print('restore')
+        shutil.copyfile(ep_file_bak, ep_file)
+        os.remove(ep_file_bak)
+        return
+    print('Bak_file:%s does not exist.' % ep_file_bak)
+
+
+restore_ep_config()
diff --git a/tests/installer/common/set_ceilometer.py b/tests/installer/common/set_ceilometer.py
new file mode 100644 (file)
index 0000000..f5946cb
--- /dev/null
@@ -0,0 +1,44 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import os
+import shutil
+import yaml
+
+ep_file = '/etc/ceilometer/event_pipeline.yaml'
+ep_file_bak = '/etc/ceilometer/event_pipeline.yaml.bak'
+event_notifier_topic = 'notifier://?topic=alarm.all'
+
+
+def set_notifier_topic():
+    """Add the alarm notifier publisher to the ``event_sink`` sink.
+
+    The current file is copied to ``ep_file_bak`` before a changed
+    configuration is written back to ``ep_file``.
+    """
+    if not os.path.isfile(ep_file):
+        raise Exception("File doesn't exist: %s." % ep_file)
+
+    with open(ep_file, 'r') as stream:
+        config = yaml.safe_load(stream)
+
+    changed = False
+    for sink in config['sinks']:
+        if sink['name'] != 'event_sink':
+            continue
+        publishers = sink['publishers']
+        if event_notifier_topic in publishers:
+            print('NOTE: event notifier is configured in ceilometer as we needed')
+            continue
+        print('Add event notifier in ceilometer')
+        publishers.append(event_notifier_topic)
+        changed = True
+
+    if not changed:
+        return
+
+    shutil.copyfile(ep_file, ep_file_bak)
+    with open(ep_file, 'w+') as stream:
+        stream.write(yaml.safe_dump(config))
+
+
+set_notifier_topic()
diff --git a/tests/installer/local.py b/tests/installer/local.py
new file mode 100644 (file)
index 0000000..dcdf41e
--- /dev/null
@@ -0,0 +1,109 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import os
+import shutil
+import subprocess
+
+from installer.base import BaseInstaller
+from common.utils import load_json_file
+from common.utils import write_json_file
+
+
+class LocalInstaller(BaseInstaller):
+    """Installer support for a deployment running on the local machine.
+
+    Setup makes sure the nova policy file maps
+    ``os_compute_api:servers:show:host_status`` to ``rule:admin_or_owner``;
+    cleanup puts back what was there before.
+    """
+
+    node_user_name = 'root'
+
+    nova_policy_file = '/etc/nova/policy.json'
+    nova_policy_file_backup = '%s%s' % (nova_policy_file, '.bak')
+
+    def __init__(self, conf, log):
+        super(LocalInstaller, self).__init__(conf, log)
+        # True once an existing policy file was changed (a backup exists).
+        self.policy_modified = False
+        # True once a policy file was created because none existed.
+        self.add_policy_file = False
+
+    def setup(self):
+        """Prepare the local deployment for the test."""
+        self.get_ssh_key_from_installer()
+        self.set_apply_patches()
+
+    def cleanup(self):
+        """Undo the changes made by setup()."""
+        self.restore_apply_patches()
+
+    def get_ssh_key_from_installer(self):
+        """Log that no key is fetched for this installer type; return None."""
+        self.log.info('Assuming SSH keys already exchanged with computer for local installer type')
+        return None
+
+    def get_host_ip_from_hostname(self, hostname):
+        """Resolve ``hostname`` with ``getent hosts``.
+
+        :returns: the first address reported, as text, or an empty string
+                  when the lookup printed nothing
+        """
+        self.log.info('Get host ip from host name in local installer......')
+
+        # Pass an argument list so the host name is never parsed by a shell.
+        server = subprocess.Popen(['getent', 'hosts', hostname],
+                                  stdout=subprocess.PIPE)
+        stdout, _ = server.communicate()
+        # communicate() yields bytes on Python 3; decode before use and keep
+        # only the address column of the first entry.
+        fields = stdout.decode('utf-8').split()
+        host_ip = fields[0] if fields else ''
+
+        self.log.info('Get host_ip:%s from host_name:%s in local installer' % (host_ip, hostname))
+        return host_ip
+
+    def set_apply_patches(self):
+        """Apply the nova policy change."""
+        self._set_nova_policy()
+
+    def restore_apply_patches(self):
+        """Revert the nova policy change."""
+        self._restore_nova_policy()
+
+    def _set_nova_policy(self):
+        """Ensure the host_status policy is set, backing up an existing file.
+
+        Records in ``policy_modified`` / ``add_policy_file`` what has to be
+        undone by _restore_nova_policy().
+        """
+        host_status_policy = 'os_compute_api:servers:show:host_status'
+        host_status_rule = 'rule:admin_or_owner'
+        policy_data = {
+            'context_is_admin': 'role:admin',
+            'owner': 'user_id:%(user_id)s',
+            'admin_or_owner': 'rule:context_is_admin or rule:owner',
+            host_status_policy: host_status_rule
+        }
+
+        if os.path.isfile(self.nova_policy_file):
+            data = load_json_file(self.nova_policy_file)
+            if host_status_policy in data:
+                current_rule = data[host_status_policy]
+                if host_status_rule == current_rule:
+                    self.log.info('Do not need to modify nova policy.')
+                    self.policy_modified = False
+                else:
+                    # update the host_status_policy
+                    data[host_status_policy] = host_status_rule
+                    self.policy_modified = True
+            else:
+                # add the host_status_policy; if the rules it depends on are
+                # not defined, add them as well
+                for policy, rule in policy_data.items():
+                    if policy not in data:
+                        data[policy] = rule
+                self.policy_modified = True
+            if self.policy_modified:
+                self.log.info('Nova policy is Modified.')
+                # Back up the untouched file before it is rewritten below.
+                shutil.copyfile(self.nova_policy_file,
+                                self.nova_policy_file_backup)
+        else:
+            # file does not exist, create a new one and add the policy
+            self.log.info('Nova policy file not exist. Creating a new one')
+            data = policy_data
+            self.add_policy_file = True
+
+        if self.policy_modified or self.add_policy_file:
+            write_json_file(self.nova_policy_file, data)
+            # Sends keystrokes to the n-api window of the 'stack' screen
+            # session. NOTE(review): presumably restarts nova-api - confirm.
+            os.system('screen -S stack -p n-api -X stuff "^C^M^[[A^M"')
+
+    def _restore_nova_policy(self):
+        """Undo _set_nova_policy(): restore the backup or remove the new file."""
+        if self.policy_modified:
+            shutil.copyfile(self.nova_policy_file_backup, self.nova_policy_file)
+            os.remove(self.nova_policy_file_backup)
+        elif self.add_policy_file:
+            os.remove(self.nova_policy_file)
+
+        if self.add_policy_file or self.policy_modified:
+            os.system('screen -S stack -p n-api -X stuff "^C^M^[[A^M"')
+            self.add_policy_file = False
+            self.policy_modified = False
index 3d94e1c..f7b9624 100644 (file)
@@ -44,13 +44,14 @@ function installer_apply_patches {
             fi
 
             co_conf=/etc/congress/congress.conf
+            co_conf_bak=/etc/congress/congress.conf.bak
             co_entry="congress.datasources.doctor_driver.DoctorDriver"
             if sudo grep -e "^drivers.*$co_entry" $co_conf; then
                 echo "NOTE: congress is configured as we needed"
             else
                 echo "modify the congress config"
-                sudo sed -i -e "/^drivers/s/$/,$co_entry    # added by doctor script/" \
-                    $co_conf
+                sudo cp $co_conf $co_conf_bak
+                sudo sed -i -e "/^drivers/s/$/,$co_entry/"  $co_conf
                 sudo systemctl restart openstack-congress-server.service
             fi
             ' > installer_apply_patches_$node.log 2>&1
@@ -90,10 +91,10 @@ function installer_revert_patches {
             date
 
             co_conf=/etc/congress/congress.conf
-            co_entry="congress.datasources.doctor_driver.DoctorDriver"
-            if sudo grep -q -e "# added by doctor script" $co_conf; then
-                echo "modify the congress config"
-                sudo sed -i -e "/^drivers/s/^\(.*\),$co_entry    # added by doctor script/\1/" $co_conf
+            co_conf_bak=/etc/congress/congress.conf.bak
+            if [ -e $co_conf_bak ]; then
+                echo "restore the congress config"
+                sudo mv $co_conf_bak $co_conf
                 sudo systemctl restart openstack-congress-server.service
             fi
 
diff --git a/tests/lib/monitor b/tests/lib/monitor
new file mode 100644 (file)
index 0000000..6b804ec
--- /dev/null
@@ -0,0 +1,31 @@
+#!/bin/bash
+
+MONITOR_TYPE=${MONITOR_TYPE:-sample}
+
+function is_monitor_supported {
+    # Succeed when lib/monitors/<name>/<name> exists under $TOP_DIR.
+    local monitor="$1"
+    [[ -f $TOP_DIR/lib/monitors/$monitor/$monitor ]]
+}
+
+function is_monitor {
+    # Succeed when the given name equals the configured MONITOR_TYPE.
+    local monitor="$1"
+    # Quote the right-hand side: unquoted, [[ == ]] treats it as a glob
+    # pattern instead of comparing the two strings literally.
+    [[ $monitor == "$MONITOR_TYPE" ]]
+}
+
+function start_monitor {
+    # Dispatch to the start function of the selected monitor type.
+    start_monitor_$MONITOR_TYPE
+}
+
+function stop_monitor {
+    # Dispatch to the stop function of the selected monitor type.
+    stop_monitor_$MONITOR_TYPE
+}
+
+function cleanup_monitor {
+    # Dispatch to the cleanup function of the selected monitor type.
+    cleanup_monitor_$MONITOR_TYPE
+}
+
+if ! is_monitor_supported $MONITOR_TYPE; then
+    die $LINENO "MONITOR_TYPE=$MONITOR_TYPE is not supported."
+fi
+
+source $TOP_DIR/lib/monitors/$MONITOR_TYPE/$MONITOR_TYPE
diff --git a/tests/lib/monitors/collectd/collectd b/tests/lib/monitors/collectd/collectd
new file mode 100644 (file)
index 0000000..f509665
--- /dev/null
@@ -0,0 +1,101 @@
+#!/bin/bash
+
+function start_monitor_collectd {
+    # Generate a collectd.conf, install it and collectd_plugin.py on the
+    # compute node over scp/ssh, then restart collectd there.
+
+    ## CONTROL_IP is the IP of primary interface of control node i.e.
+    ## eth0, eno1. It is used by collectd monitor to communicate with
+    ## sample inspector.
+    ## @TODO (umar) see if mgmt IP of control is a better option. Also
+    ## primary interface may not be the right option
+    CONTROL_IP="$(ip a | sed -En 's/127.0.0.1//;s/.*inet (addr:)?(([0-9]*\.){3}[0-9]*).*/\2/p' | sed -n 1p)"
+    #CONTROL_IP=192.168.98.6
+
+    echo "
+Hostname \"$COMPUTE_HOST\"
+FQDNLookup false
+Interval 1
+MaxReadInterval 2
+
+<LoadPlugin python>
+    Globals true
+</LoadPlugin>
+LoadPlugin ovs_events
+LoadPlugin logfile
+
+<Plugin logfile>
+  File \"/var/log/collectd.log\"
+  Timestamp true
+  LogLevel \"info\"
+</Plugin>
+
+<Plugin python>
+    ModulePath \"/home/$COMPUTE_USER\"
+    LogTraces true
+    Interactive false
+    Import \"collectd_plugin\"
+    <Module \"collectd_plugin\">
+        control_ip \"$CONTROL_IP\"
+        compute_ip \"$COMPUTE_IP\"
+        compute_host \"$COMPUTE_HOST\"
+        compute_user \"$COMPUTE_USER\"
+        inspector_type \"$INSPECTOR_TYPE\"
+        os_auth_url \"$OS_AUTH_URL\"
+        os_username \"$OS_USERNAME\"
+        os_password \"$OS_PASSWORD\"
+        os_project_name \"$OS_PROJECT_NAME\"
+        os_user_domain_name \"$OS_USER_DOMAIN_NAME\"
+        os_user_domain_id \"$OS_USER_DOMAIN_ID\"
+        os_project_domain_name \"$OS_PROJECT_DOMAIN_NAME\"
+        os_project_domain_id \"$OS_PROJECT_DOMAIN_ID\"
+    </Module>
+</Plugin>
+
+<Plugin ovs_events>
+    Port 6640
+    Socket \"/var/run/openvswitch/db.sock\"
+    Interfaces \"@INTERFACE_NAME@\"
+    SendNotification true
+    DispatchValues false
+</Plugin>
+
+" > $TOP_DIR/lib/monitors/collectd.conf
+
+    scp $ssh_opts_cpu $TOP_DIR/lib/monitors/collectd.conf $COMPUTE_USER@$COMPUTE_IP:
+    ## @TODO (umar) Always assuming that the interface is assigned an IP if
+    ## interface name is not provided. See if there is a better approach
+    # The remote script fills in the @INTERFACE_NAME@ placeholder and leaves
+    # a '-doctor-saved' copy or a '-doctor-created' marker next to the
+    # config, which cleanup_monitor_collectd checks for.
+    ssh $ssh_opts_cpu "$COMPUTE_USER@$COMPUTE_IP" "
+        if [ -n \"$INTERFACE_NAME\" ]; then
+            dev=$INTERFACE_NAME
+        else
+            dev=\$(sudo ip a | awk '/ $COMPUTE_IP\//{print \$NF}')
+        fi
+        sed -i -e \"s/@INTERFACE_NAME@/\$dev/\" collectd.conf
+        collectd_conf=/opt/collectd/etc/collectd.conf
+        if [ -e \$collectd_conf ]; then
+            sudo cp \$collectd_conf \${collectd_conf}-doctor-saved
+        else
+            sudo touch \${collectd_conf}-doctor-created
+        fi
+        sudo mv collectd.conf /opt/collectd/etc/collectd.conf"
+
+    scp $ssh_opts_cpu $TOP_DIR/lib/monitors/collectd/collectd_plugin.py $COMPUTE_USER@$COMPUTE_IP:collectd_plugin.py
+    ssh $ssh_opts_cpu "$COMPUTE_USER@$COMPUTE_IP" "sudo pkill collectd
+                                                   sudo /opt/collectd/sbin/collectd"
+}
+
+function stop_monitor_collectd {
+    # Kill the collectd processes on the compute node.
+    ssh $ssh_opts_cpu "$COMPUTE_USER@$COMPUTE_IP" 'sudo pkill collectd'
+}
+
+function cleanup_monitor_collectd {
+    # Undo the remote config change: remove the config when the
+    # '-doctor-created' marker exists, otherwise restore the
+    # '-doctor-saved' copy if there is one. Then delete the local
+    # collectd.conf.
+    ssh $ssh_opts_cpu "$COMPUTE_USER@$COMPUTE_IP" "
+        collectd_conf=/opt/collectd/etc/collectd.conf
+        if [ -e \"\${collectd_conf}-doctor-created\" ]; then
+            sudo rm \"\${collectd_conf}-doctor-created\"
+            sudo rm \$collectd_conf
+        elif [ -e \"\${collectd_conf}-doctor-saved\" ]; then
+            sudo cp -f \"\${collectd_conf}-doctor-saved\" \$collectd_conf
+            sudo rm \"\${collectd_conf}-doctor-saved\"
+        fi"
+
+    rm $TOP_DIR/lib/monitors/collectd.conf
+}
diff --git a/tests/lib/monitors/collectd/collectd_plugin.py b/tests/lib/monitors/collectd/collectd_plugin.py
new file mode 100644 (file)
index 0000000..70fcf26
--- /dev/null
@@ -0,0 +1,167 @@
+##############################################################################
+# Copyright (c) 2017 NEC Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import collectd
+import sys
+from netifaces import interfaces, ifaddresses, AF_INET
+from datetime import datetime
+import json
+import requests
+import time
+from requests.exceptions import ConnectionError
+
+from keystoneauth1 import loading
+from keystoneauth1 import session
+from congressclient.v1 import client
+
+
+def write_debug(str_write, write_type, compute_user):
+    """Write ``str_write`` to ``/home/<compute_user>/monitor.log``.
+
+    :param str_write: value to write; formatted with ``%s``
+    :param write_type: mode passed to ``open``
+    :param compute_user: user whose home directory holds the log file
+    """
+    file_name = ('/home/%s/monitor.log' % compute_user)
+    # The context manager closes the file even when the write fails.
+    with open(file_name, write_type) as file_tmp:
+        file_tmp.write("%s" % str_write)
+
+
+class DoctorMonitorCollectd(object):
+    """collectd python plugin that reports a host-down event to the inspector.
+
+    The methods are registered as collectd callbacks: ``config_func`` stores
+    the plugin options, ``init_collectd`` works out the inspector URL and
+    ``handle_notif`` sends one event on the first failure/warning
+    notification.
+    """
+
+    # Config key (lower case) -> name of the attribute that stores its value.
+    _CONFIG_ATTRS = {
+        'compute_host': 'host_name',
+        'control_ip': 'control_ip',
+        'compute_ip': 'compute_ip',
+        'compute_user': 'compute_user',
+        'inspector_type': 'inspector_type',
+        'os_auth_url': 'os_auth_url',
+        'os_username': 'os_username',
+        'os_password': 'os_password',
+        'os_project_name': 'os_project_name',
+        'os_user_domain_name': 'os_user_domain_name',
+        'os_user_domain_id': 'os_user_domain_id',
+        'os_project_domain_name': 'os_project_domain_name',
+        'os_project_domain_id': 'os_project_domain_id',
+    }
+
+    def __init__(self):
+        self.control_ip = ''
+        self.compute_user = ''
+        self.compute_ip = ''
+        self.host_name = ''
+        self.inspector_type = ''
+        self.inspector_url = ''
+        self.os_auth_url = ''
+        self.os_username = ''
+        self.os_password = ''
+        self.os_project_name = ''
+        self.os_user_domain_name = ''
+        self.os_user_domain_id = ''
+        self.os_project_domain_name = ''
+        self.os_project_domain_id = ''
+        self.sess = ''
+        self.auth = ''
+        # Set to 1 by notify_inspector(); handle_notif() then stops sending.
+        self.inspector_notified = 0
+        # Set to 1 at the end of init_collectd(); notifications received
+        # before that are not forwarded.
+        self.start_notifications = 0
+        self.monitor_type = 'sample'
+
+    def config_func(self, config):
+        """Store the first value of every known config key on the instance."""
+        for node in config.children:
+            key = node.key.lower()
+            val = node.values[0]
+
+            attr = self._CONFIG_ATTRS.get(key)
+            if attr is None:
+                collectd.info('Unknown config key "%s"' % key)
+            else:
+                setattr(self, attr, val)
+
+    def init_collectd(self):
+        """Work out ``inspector_url`` for the configured inspector type.
+
+        :raises Exception: when congress has no datasource with the
+                           ``doctor`` driver
+        """
+        write_debug("Compute node collectd monitor start at %s\n\n" % datetime.now().isoformat(), "w", self.compute_user)
+
+        if self.inspector_type == 'sample':
+            self.inspector_url = ('http://%s:12345/events' % self.control_ip)
+        elif self.inspector_type == 'congress':
+            loader = loading.get_plugin_loader('password')
+            self.auth = loader.load_from_options(
+                auth_url=self.os_auth_url,
+                username=self.os_username,
+                password=self.os_password,
+                project_name=self.os_project_name,
+                user_domain_name=self.os_user_domain_name,
+                user_domain_id=self.os_user_domain_id,
+                project_domain_name=self.os_project_domain_name,
+                project_domain_id=self.os_project_domain_id)
+            self.sess = session.Session(auth=self.auth)
+            congress = client.Client(session=self.sess, service_type='policy')
+            ds = congress.list_datasources()['results']
+            doctor_ds = next((item for item in ds if item['driver'] == 'doctor'),
+                             None)
+            if doctor_ds is None:
+                # Fail with a clear message instead of a TypeError on the
+                # lookup below.
+                raise Exception('No congress datasource with driver "doctor" found')
+
+            congress_endpoint = congress.httpclient.get_endpoint(auth=self.auth)
+            self.inspector_url = ('%s/v1/data-sources/%s/tables/events/rows' %
+                                  (congress_endpoint, doctor_ds['id']))
+        else:
+            sys.exit()
+        self.start_notifications = 1
+
+    def notify_inspector(self):
+        """Send one ``compute.host.down`` event for ``host_name`` to the inspector.
+
+        A connection error is logged through collectd and not retried.
+        """
+        event_type = "compute.host.down"
+        payload = [
+            {
+                'id': ("monitor_%s_id1" % self.monitor_type),
+                'time': datetime.now().isoformat(),
+                'type': event_type,
+                'details': {
+                    'hostname': self.host_name,
+                    'status': 'down',
+                    'monitor': ("monitor_%s" % self.monitor_type),
+                    'monitor_event_id': ("monitor_%s_event1" % self.monitor_type)
+                },
+            },
+        ]
+        data = json.dumps(payload)
+        self.inspector_notified = 1
+
+        try:
+            if self.inspector_type == 'sample':
+                headers = {'content-type': 'application/json'}
+                requests.post(self.inspector_url, data=data, headers=headers)
+            elif self.inspector_type == 'congress':
+                # TODO(umar) enhance for token expiry case
+                headers = {
+                    'Content-Type': 'application/json',
+                    'Accept': 'application/json',
+                    'X-Auth-Token': self.sess.get_token()
+                }
+                requests.put(self.inspector_url, data=data, headers=headers)
+        except ConnectionError as err:
+            # Report through collectd's own log; the Python 2 print
+            # statement used before does not parse on Python 3.
+            collectd.error('Failed to notify inspector: %s' % err)
+
+    def handle_notif(self, notification, data=None):
+        """Forward the first failure/warning notification to the inspector."""
+        if (notification.severity == collectd.NOTIF_FAILURE or
+                notification.severity == collectd.NOTIF_WARNING):
+            if (self.start_notifications == 1 and self.inspector_notified == 0):
+                write_debug("Received down notification: doctor monitor detected at %s\n" % time.time(), "a", self.compute_user)
+                self.notify_inspector()
+
+        elif notification.severity == collectd.NOTIF_OKAY:
+            collectd.info("Interface status: UP again %s\n" % time.time())
+        else:
+            collectd.info("Unknown notification severity %s\n" % notification.severity)
+
+
+monitor = DoctorMonitorCollectd()
+
+collectd.register_config(monitor.config_func)
+collectd.register_init(monitor.init_collectd)
+collectd.register_notification(monitor.handle_notif)
diff --git a/tests/lib/monitors/sample/sample b/tests/lib/monitors/sample/sample
new file mode 100644 (file)
index 0000000..1d31033
--- /dev/null
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+# Stage the sample monitor script under $TOP_DIR and launch it in the
+# background unless a "python monitor.py" process is already running.
+function start_monitor_sample {
+    # Quote the paths so a $TOP_DIR containing spaces is not word-split.
+    cp "$TOP_DIR/lib/monitors/sample/monitor.py" "$TOP_DIR/monitor.py"
+    pgrep -f "python monitor.py" && return 0
+    sudo -E python monitor.py "$COMPUTE_HOST" "$COMPUTE_IP" "$INSPECTOR_TYPE" \
+        > monitor.log 2>&1 &
+}
+
+# Kill every running "python monitor.py" process; succeed when none exists.
+function stop_monitor_sample {
+    local pids
+    # Look the processes up once: a second pgrep could come back empty if
+    # the monitor exits in between, leaving kill without arguments.
+    pids=$(pgrep -f "python monitor.py") || return 0
+    # Keep printing the matched PIDs, as the previous pgrep call did.
+    echo "$pids"
+    sudo kill $pids
+}
+
+# Remove the monitor script that start_monitor_sample staged under $TOP_DIR.
+function cleanup_monitor_sample {
+    # -f: do not fail when the monitor was never started and nothing was
+    # staged; the path matches the cp destination in start_monitor_sample.
+    rm -f "$TOP_DIR/monitor.py"
+    return
+}
index 72043ab..b7a49fd 100644 (file)
@@ -36,12 +36,11 @@ class Logger(object):
             ch.setLevel(logging.INFO)
         self.logger.addHandler(ch)
 
-        file_handler = logging.FileHandler('%s.log' % logger_name)
+        filename = '%s.log' % logger_name
+        file_handler = logging.FileHandler(filename, mode='w')
         file_handler.setFormatter(formatter)
         file_handler.setLevel(logging.DEBUG)
         self.logger.addHandler(file_handler)
 
-
     def getLogger(self):
         return self.logger
-
index 7714d7d..edff16f 100644 (file)
@@ -8,16 +8,30 @@
 ##############################################################################
 import os
 from os.path import isfile, join
+import random
 import sys
+import time
 
 from alarm import Alarm
+from common.constants import Host
+from common.utils import match_rep_in_file
 import config
+from consumer import get_consumer
+from identity_auth import get_identity_auth
+from identity_auth import get_session
 from image import Image
 from instance import Instance
 from inspector import get_inspector
+from installer import get_installer
 import logger as doctor_log
-from user import User
 from network import Network
+from monitor import get_monitor
+from os_clients import nova_client
+from profiler_poc import main as profiler_main
+from scenario.common import calculate_notification_time
+from scenario.network_failure import NetworkFault
+from user import User
+
 
 LOG = doctor_log.Logger('doctor').getLogger()
 
@@ -31,10 +45,21 @@ class DoctorTest(object):
         self.network = Network(self.conf, LOG)
         self.instance = Instance(self.conf, LOG)
         self.alarm = Alarm(self.conf, LOG)
+        self.installer = get_installer(self.conf, LOG)
         self.inspector = get_inspector(self.conf, LOG)
+        self.monitor = get_monitor(self.conf,
+                                   self.inspector.get_inspector_url(),
+                                   LOG)
+        self.consumer = get_consumer(self.conf, LOG)
+        self.fault = NetworkFault(self.conf, self.installer, LOG)
+        auth = get_identity_auth(project=self.conf.doctor_project)
+        self.nova = nova_client(self.conf.nova_version,
+                                get_session(auth=auth))
+        self.down_host = None
 
     def setup(self):
         # prepare the cloud env
+        self.installer.setup()
 
         # preparing VM image...
         self.image.create()
@@ -54,30 +79,124 @@ class DoctorTest(object):
         # starting doctor sample components...
         self.inspector.start()
 
+        self.down_host = self.get_host_info_for_random_vm()
+        self.monitor.start(self.down_host)
+
+        self.consumer.start()
+
     def run(self):
         """run doctor test"""
         try:
             LOG.info('doctor test starting.......')
 
+            # prepare test env
             self.setup()
 
+            # wait for aodh alarms are updated in caches for event evaluator,
+            # sleep time should be larger than event_alarm_cache_ttl(default 60)
+            time.sleep(60)
+
             # injecting host failure...
+            # NOTE (umar) add INTERFACE_NAME logic to host injection
 
-            # verify the test results
+            self.fault.start(self.down_host)
+            time.sleep(10)
 
+            # verify the test results
+            # NOTE (umar) copy remote monitor.log file when monitor=collectd
+            self.check_host_status(self.down_host.name, 'down')
+
+            notification_time = calculate_notification_time()
+            if notification_time < 1 and notification_time > 0:
+                LOG.info('doctor test successfully, notification_time=%s' % notification_time)
+            else:
+                LOG.error('doctor test failed, notification_time=%s' % notification_time)
+                sys.exit(1)
+
+            if self.conf.profiler_type:
+                LOG.info('doctor test begin to run profile.......')
+                self.collect_logs()
+                self.run_profiler()
         except Exception as e:
             LOG.error('doctor test failed, Exception=%s' % e)
             sys.exit(1)
         finally:
             self.cleanup()
 
+    def get_host_info_for_random_vm(self):
+        num = random.randint(0, self.conf.instance_count - 1)
+        vm_name = "%s%d" % (self.conf.instance_basename, num)
+
+        servers = \
+            {getattr(server, 'name'): server
+             for server in self.nova.servers.list()}
+        server = servers.get(vm_name)
+        if not server:
+            raise \
+                Exception('Can not find instance: vm_name(%s)' % vm_name)
+        host_name = server.__dict__.get('OS-EXT-SRV-ATTR:hypervisor_hostname')
+        host_ip = self.installer.get_host_ip_from_hostname(host_name)
+
+        LOG.info('Get host info(name:%s, ip:%s) which vm(%s) launched at'
+                 % (host_name, host_ip, vm_name))
+        return Host(host_name, host_ip)
+
+    def check_host_status(self, hostname, state):
+        service = self.nova.services.list(host=hostname, binary='nova-compute')
+        host_state = service[0].__dict__.get('state')
+        assert host_state == state
+
+    def unset_forced_down_hosts(self):
+        if self.down_host:
+            self.nova.services.force_down(self.down_host.name, 'nova-compute', False)
+            time.sleep(2)
+            self.check_host_status(self.down_host.name, 'up')
+
+    def collect_logs(self):
+        """Fetch the fault injector's disable-network log via ``self.fault``."""
+        self.fault.get_disable_network_log()
+
+    def run_profiler(self):
+
+        log_file = '{0}/{1}'.format(sys.path[0], 'disable_network.log')
+        reg = '(?<=doctor set link down at )\d+.\d+'
+        linkdown = float(match_rep_in_file(reg, log_file).group(0))
+
+        log_file = '{0}/{1}'.format(sys.path[0], 'doctor.log')
+        reg = '(.* doctor mark vm.* error at )(\d+.\d+)'
+        vmdown = float(match_rep_in_file(reg, log_file).group(2))
+
+        reg = '(?<=doctor mark host.* down at )\d+.\d+'
+        hostdown = float(match_rep_in_file(reg, log_file).group(2))
+
+        reg = '(?<=doctor monitor detected at )\d+.\d+'
+        detected = float(match_rep_in_file(reg, log_file).group(0))
+
+        reg = '(?<=doctor consumer notified at )\d+.\d+'
+        notified = float(match_rep_in_file(reg, log_file).group(0))
+
+        # TODO(yujunz) check the actual delay to verify time sync status
+        # expected ~1s delay from $trigger to $linkdown
+        relative_start = linkdown
+        os.environ['DOCTOR_PROFILER_T00'] = str(int((linkdown - relative_start)*1000))
+        os.environ['DOCTOR_PROFILER_T01'] = str(int((detected - relative_start) * 1000))
+        os.environ['DOCTOR_PROFILER_T03'] = str(int((vmdown - relative_start) * 1000))
+        os.environ['DOCTOR_PROFILER_T04'] = str(int((hostdown - relative_start) * 1000))
+        os.environ['DOCTOR_PROFILER_T09'] = str(int((notified - relative_start) * 1000))
+
+        profiler_main(log=LOG)
+
     def cleanup(self):
+        self.unset_forced_down_hosts()
+        self.inspector.stop()
+        self.monitor.stop()
+        self.consumer.stop()
+        self.installer.cleanup()
         self.alarm.delete()
         self.instance.delete()
         self.network.delete()
         self.image.delete()
+        self.fault.cleanup()
         self.user.delete()
-        self.inspector.stop()
 
 
 def main():
diff --git a/tests/monitor/__init__.py b/tests/monitor/__init__.py
new file mode 100644 (file)
index 0000000..e268907
--- /dev/null
@@ -0,0 +1,29 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from oslo_config import cfg
+from oslo_utils import importutils
+
+# Option selecting the monitor implementation; get_monitor() reads the
+# chosen value as conf.monitor.type.
+OPTS = [
+    cfg.StrOpt('type',
+               default='sample',
+               choices=['sample', 'collectd'],
+               help='the type of doctor monitor component',
+               required=True),
+]
+
+
+# Maps each allowed monitor type to the dotted path of its implementing class.
+_monitor_name_class_mapping = {
+    'sample': 'monitor.sample.SampleMonitor',
+    'collectd': 'monitor.collectd.CollectdMonitor'
+}
+
+def get_monitor(conf, inspector_url, log):
+    monitor_class = _monitor_name_class_mapping.get(conf.monitor.type)
+    return importutils.import_object(monitor_class, conf,
+                                     inspector_url, log)
diff --git a/tests/monitor/base.py b/tests/monitor/base.py
new file mode 100644 (file)
index 0000000..119c8a1
--- /dev/null
@@ -0,0 +1,27 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import abc
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseMonitor(object):
+    """Abstract base for monitors that report host faults to the inspector.
+
+    Stores the configuration, the logger and the inspector URL; concrete
+    monitors must implement start() and stop().
+    """
+    def __init__(self, conf, inspector_url, log):
+        self.conf = conf
+        self.log = log
+        self.inspector_url = inspector_url
+
+    @abc.abstractmethod
+    def start(self, host):
+        """Begin monitoring ``host``."""
+        pass
+
+    @abc.abstractmethod
+    def stop(self):
+        """Stop monitoring."""
+        pass
diff --git a/tests/monitor/collectd.py b/tests/monitor/collectd.py
new file mode 100644 (file)
index 0000000..e2a800e
--- /dev/null
@@ -0,0 +1,138 @@
+##############################################################################
+# Copyright (c) 2017 NEC Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import os
+import socket
+import getpass
+import sys
+
+from monitor.base import BaseMonitor
+
+
+class CollectdMonitor(BaseMonitor):
+    def __init__(self, conf, inspector_url, log):
+        super(CollectdMonitor, self).__init__(conf, inspector_url, log)
+        self.top_dir = os.path.dirname(sys.path[0])
+        tmp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        tmp_sock.connect(("8.8.8.8", 80))
+
+        ## control_ip is the IP of primary interface of control node i.e.
+        ## eth0, eno1. It is used by collectd monitor to communicate with
+        ## sample inspector.
+        ## TODO (umar) see if mgmt IP of control is a better option. Also
+        ## primary interface may not be the right option
+        self.control_ip = tmp_sock.getsockname()[0]
+        self.compute_user = getpass.getuser()
+        self.interface_name = os.environ.get('INTERFACE_NAME') or ''
+        self.inspector_type = os.environ.get('INSPECTOR_TYPE', 'sample')
+        self.auth_url = os.environ.get('OS_AUTH_URL')
+        self.username = os.environ.get('OS_USERNAME')
+        self.password = os.environ.get('OS_PASSWORD')
+        self.project_name = os.environ.get('OS_PROJECT_NAME')
+        self.user_domain_name = os.environ.get('OS_USER_DOMAIN_NAME') or 'default'
+        self.user_domain_id = os.environ.get('OS_USER_DOMAIN_ID')
+        self.project_domain_name = os.environ.get('OS_PROJECT_DOMAIN_NAME') or 'default'
+        self.project_domain_id = os.environ.get('OS_PROJECT_DOMAIN_ID')
+        self.ssh_opts_cpu = '-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no'
+
+    def start(self, host):
+        self.log.info("Collectd monitor start.........")
+        self.compute_host = host.name
+        self.compute_ip = host.ip
+        f = open("%s/tests/collectd.conf" % self.top_dir, 'w')
+        collectd_conf_file = """ 
+Hostname %s
+FQDNLookup false
+Interval 1
+MaxReadInterval 2
+
+<LoadPlugin python>
+Globals true
+</LoadPlugin>
+LoadPlugin ovs_events
+LoadPlugin logfile
+
+<Plugin logfile>
+    File \"/var/log/collectd.log\"
+    Timestamp true
+    LogLevel \"info\"
+</Plugin>
+
+<Plugin python>
+    ModulePath \"/home/%s\"
+    LogTraces true
+    Interactive false
+    Import \"collectd_plugin\"
+    <Module \"collectd_plugin\">
+        control_ip \"%s\"
+        compute_ip \"%s\"
+        compute_host \"%s\"
+        compute_user \"%s\"
+        inspector_type \"%s\"
+        os_auth_url \"%s\"
+        os_username \"%s\"
+        os_password \"%s\"
+        os_project_name \"%s\"
+        os_user_domain_name \"%s\"
+        os_user_domain_id \"%s\"
+        os_project_domain_name \"%s\"
+        os_project_domain_id \"%s\"
+    </Module>
+</Plugin>
+
+<Plugin ovs_events>
+    Port 6640
+    Socket \"/var/run/openvswitch/db.sock\"
+    Interfaces \"@INTERFACE_NAME@\"
+    SendNotification true
+    DispatchValues false
+</Plugin>
+            """ % (self.compute_host, self.compute_user, self.control_ip, self.compute_ip, self.compute_host, self.compute_user,
+                   self.inspector_type, self.auth_url, self.username, self.password, self.project_name, self.user_domain_name,
+                   self.user_domain_id, self.project_domain_name, self.project_domain_id)
+        f.write(collectd_conf_file)
+        f.close()
+
+        os.system(" scp %s %s/tests/collectd.conf %s@%s: " % (self.ssh_opts_cpu, self.top_dir, self.compute_user, self.compute_ip))
+        self.log.info("after first scp")
+        ## @TODO (umar) Always assuming that the interface is assigned an IP if
+        ## interface name is not provided. See if there is a better approach
+        os.system(""" ssh %s %s@%s \"if [ -n \"%s\" ]; then
+            dev=%s
+        else
+            dev=\$(sudo ip a | awk '/ %s\//{print \$NF}')
+        fi
+        sed -i -e \"s/@INTERFACE_NAME@/\$dev/\" collectd.conf
+        collectd_conf=/opt/collectd/etc/collectd.conf
+        if [ -e \$collectd_conf ]; then
+            sudo cp \$collectd_conf \${collectd_conf}-doctor-saved
+        else
+            sudo touch \${collectd_conf}-doctor-created
+        fi
+        sudo mv collectd.conf /opt/collectd/etc/collectd.conf\" """ % (self.ssh_opts_cpu, self.compute_user, self.compute_ip, self.interface_name, self.interface_name, self.compute_ip))
+        self.log.info("after first ssh")
+        os.system(" scp  %s %s/tests/lib/monitors/collectd/collectd_plugin.py %s@%s:collectd_plugin.py " % (self.ssh_opts_cpu, self.top_dir, self.compute_user, self.compute_ip))
+        self.log.info("after sec scp")
+        os.system(" ssh %s %s@%s \"sudo pkill collectd; sudo /opt/collectd/sbin/collectd\" " % (self.ssh_opts_cpu, self.compute_user, self.compute_ip))
+        self.log.info("after sec ssh")
+
+    def stop(self):
+        os.system(" ssh %s %s@%s \"sudo pkill collectd\" " % (self.ssh_opts_cpu, self.compute_user, self.compute_ip))
+
+    def cleanup(self):
+        os.system(""" ssh %s %s@%s \"
+            collectd_conf=/opt/collectd/etc/collectd.conf
+            if [ -e \"\${collectd_conf}-doctor-created\" ]; then
+                sudo rm \"\${collectd_conf}-doctor-created\"
+                sudo rm \$collectd_conf
+            elif [ -e \"\${collectd_conf}-doctor-saved\" ]; then
+                sudo cp -f \"\${collectd_conf}-doctor-saved\" \$collectd_conf
+                sudo rm \"\${collectd_conf}-doctor-saved\"
+            fi\" """ % (self.ssh_opts_cpu, self.compute_user, self.compute_ip))
+        os.remove("%s/tests/collectd.conf" % self.top_dir)
diff --git a/tests/monitor/sample.py b/tests/monitor/sample.py
new file mode 100644 (file)
index 0000000..9ac1bcc
--- /dev/null
@@ -0,0 +1,106 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from datetime import datetime
+import json
+import requests
+import socket
+from threading import Thread
+import time
+
+from identity_auth import get_session
+from monitor.base import BaseMonitor
+
+
+class SampleMonitor(BaseMonitor):
+    event_type = "compute.host.down"
+
+    def __init__(self, conf, inspector_url, log):
+        super(SampleMonitor, self).__init__(conf, inspector_url, log)
+        self.session = get_session()
+        self.pinger = None
+
+    def start(self, host):
+        self.log.info('sample monitor start......')
+        self.pinger = Pinger(host.name, host.ip, self, self.log)
+        self.pinger.start()
+
+    def stop(self):
+        self.log.info('sample monitor stop......')
+        if self.pinger is not None:
+            self.pinger.stop()
+            self.pinger.join()
+
+    def report_error(self, hostname):
+        self.log.info('sample monitor report error......')
+        data = [
+            {
+                'id': 'monitor_sample_id1',
+                'time': datetime.now().isoformat(),
+                'type': self.event_type,
+                'details': {
+                    'hostname': hostname,
+                    'status': 'down',
+                    'monitor': 'monitor_sample',
+                    'monitor_event_id': 'monitor_sample_event1'
+                },
+            },
+        ]
+
+        auth_token = self.session.get_token() if \
+                     self.conf.inspector.type != 'sample' else None
+        headers = {
+            'Content-Type': 'application/json',
+            'Accept': 'application/json',
+            'X-Auth-Token': auth_token,
+        }
+
+        url = '%s%s' % (self.inspector_url, 'events') \
+            if self.inspector_url.endswith('/') else \
+            '%s%s' % (self.inspector_url, '/events')
+        requests.put(url, data=json.dumps(data), headers=headers)
+
+
+class Pinger(Thread):
+    interval = 0.1  # second
+    timeout = 0.1   # second
+    ICMP_ECHO_MESSAGE = bytes([0x08, 0x00, 0xf7, 0xff, 0x00, 0x00, 0x00, 0x00])
+
+    def __init__(self, host_name, host_ip, monitor, log):
+        Thread.__init__(self)
+        self.monitor = monitor
+        self.hostname = host_name
+        self.ip_addr = host_ip or socket.gethostbyname(self.hostname)
+        self.log = log
+        self._stopped = False
+
+    def run(self):
+        self.log.info("Starting Pinger host_name(%s), host_ip(%s)"
+                      % (self.hostname, self.ip_addr))
+
+        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
+                             socket.IPPROTO_ICMP)
+        sock.settimeout(self.timeout)
+        while True:
+            if self._stopped:
+                return
+            try:
+                sock.sendto(self.ICMP_ECHO_MESSAGE, (self.ip_addr, 0))
+                sock.recv(4096)
+            except socket.timeout:
+                self.log.info("doctor monitor detected at %s" % time.time())
+                self.monitor.report_error(self.hostname)
+                self.log.info("ping timeout, quit monitoring...")
+                self._stopped = True
+                return
+            time.sleep(self.interval)
+
+    def stop(self):
+        self.log.info("Stopping Pinger host_name(%s), host_ip(%s)"
+                      % (self.hostname, self.ip_addr))
+        self._stopped = True
index 2699930..44fa3aa 100644 (file)
@@ -9,6 +9,7 @@
 from oslo_config import cfg\r
 \r
 import aodhclient.client as aodhclient\r
+from congressclient.v1 import client as congressclient\r
 import glanceclient.client as glanceclient\r
 from keystoneclient.v2_0 import client as ks_client\r
 from neutronclient.v2_0 import client as neutronclient\r
@@ -42,3 +43,8 @@ def neutron_client(session):
 \r
 def aodh_client(version, session):\r
     return aodhclient.Client(version, session=session)\r
+\r
+\r
+def congress_client(session):\r
+    """Return a congress v1 client for ``session`` (service type 'policy')."""\r
+    return congressclient.Client(session=session,\r
+                                 service_type='policy')\r
similarity index 87%
rename from tests/profiler-poc.py
rename to tests/profiler_poc.py
index 408cb09..ea36eae 100644 (file)
@@ -21,6 +21,16 @@ See also: https://goo.gl/98Osig
 import json
 import os
 
+from oslo_config import cfg
+
+
+OPTS = [
+    cfg.StrOpt('profiler_type',
+               default=os.environ.get('PROFILER_TYPE', 'poc'),
+               help='the type of installer'),
+]
+
+
 OUTPUT = 'doctor_profiling_output'
 PREFIX = 'DOCTOR_PROFILER'
 TOTAL_CHECK_POINTS = 10
@@ -46,7 +56,7 @@ link down:{T00}|      |      |      |          |        |      |      |      |
 """
 
 
-def main():
+def main(log=None):
     check_points = ["T{:02d}".format(i) for i in range(TOTAL_CHECK_POINTS)]
     module_map = {"M{:02d}".format(i):
                   (MODULE_CHECK_POINTS[i], MODULE_CHECK_POINTS[i + 1])
@@ -59,7 +69,7 @@ def main():
     def format_tag(tag):
         return TAG_FORMAT.format(tag or '?')
 
-    tags = {cp: format_tag(ms) for cp, ms in elapsed_ms.iteritems()}
+    tags = {cp: format_tag(ms) for cp, ms in elapsed_ms.items()}
 
     def time_cost(cp):
         if elapsed_ms[cp[0]] and elapsed_ms[cp[1]]:
@@ -69,10 +79,10 @@ def main():
 
     # module time cost tags
     modules_cost_ms = {module: time_cost(cp)
-                       for module, cp in module_map.iteritems()}
+                       for module, cp in module_map.items()}
 
     tags.update({module: format_tag(cost)
-                 for module, cost in modules_cost_ms.iteritems()})
+                 for module, cost in modules_cost_ms.items()})
 
     tags.update({'total': time_cost((check_points[0], check_points[-1]))})
 
@@ -81,7 +91,10 @@ def main():
     logfile = open('{}.json'.format(OUTPUT), 'w')
     logfile.write(json.dumps(tags))
 
-    print profile
+    print(profile)
+    if log:
+        log.info('%s' % profile)
+
 
 if __name__ == '__main__':
     main()
index 5c92226..e1875e0 100755 (executable)
@@ -29,7 +29,7 @@ DOCTOR_USER=doctor
 DOCTOR_PW=doctor
 DOCTOR_PROJECT=doctor
 DOCTOR_ROLE=_member_
-PROFILER_TYPE=${PROFILER_TYPE:-none}
+PROFILER_TYPE=${PROFILER_TYPE:-poc}
 PYTHON_ENABLE=${PYTHON_ENABLE:-false}
 
 TOP_DIR=$(cd $(dirname "$0") && pwd)
@@ -212,17 +212,6 @@ create_alarm() {
      done
 }
 
-start_monitor() {
-    pgrep -f "python monitor.py" && return 0
-    sudo -E python monitor.py "$COMPUTE_HOST" "$COMPUTE_IP" "$INSPECTOR_TYPE" \
-        > monitor.log 2>&1 &
-}
-
-stop_monitor() {
-    pgrep -f "python monitor.py" || return 0
-    sudo kill $(pgrep -f "python monitor.py")
-}
-
 start_consumer() {
     pgrep -f "python consumer.py" && return 0
     python consumer.py "$CONSUMER_PORT" > consumer.log 2>&1 &
@@ -294,8 +283,12 @@ inject_failure() {
     echo "disabling network of compute host [$COMPUTE_HOST] for 3 mins..."
     cat > disable_network.sh << 'END_TXT'
 #!/bin/bash -x
-dev=$(sudo ip a | awk '/ @COMPUTE_IP@\//{print $NF}')
 sleep 1
+if [ -n "@INTERFACE_NAME@" ]; then
+    dev=@INTERFACE_NAME@
+else
+    dev=$(sudo ip a | awk '/ @COMPUTE_IP@\//{print $NF}')
+fi
 sudo ip link set $dev down
 echo "doctor set link down at" $(date "+%s.%N")
 sleep 180
@@ -303,6 +296,7 @@ sudo ip link set $dev up
 sleep 1
 END_TXT
     sed -i -e "s/@COMPUTE_IP@/$COMPUTE_IP/" disable_network.sh
+    sed -i -e "s/@INTERFACE_NAME@/$INTERFACE_NAME/" disable_network.sh
     chmod +x disable_network.sh
     scp $ssh_opts_cpu disable_network.sh "$COMPUTE_USER@$COMPUTE_IP:"
     ssh $ssh_opts_cpu "$COMPUTE_USER@$COMPUTE_IP" 'nohup ./disable_network.sh > disable_network.log 2>&1 &'
@@ -327,8 +321,11 @@ calculate_notification_time() {
     wait_consumer 60
     #keep 'at' as the last keyword just before the value, and
     #use regex to get value instead of the fixed column
+    if [ ! -f monitor.log ]; then
+        scp $ssh_opts_cpu "$COMPUTE_USER@$COMPUTE_IP:monitor.log" .
+    fi
     detected=$(grep "doctor monitor detected at" monitor.log |\
-               sed -e "s/^.* at //")
+               sed -e "s/^.* at //" | tail -1)
     notified=$(grep "doctor consumer notified at" consumer.log |\
                sed -e "s/^.* at //" | tail -1)
 
@@ -424,18 +421,18 @@ run_profiler() {
         export DOCTOR_PROFILER_T09=$(python -c \
           "print(int(($notified-$relative_start)*1000))")
 
-        python profiler-poc.py >doctor_profiler.log 2>&1
+        python profiler_poc.py > doctor_profiler.log 2>&1
     fi
 }
 
 cleanup() {
     set +e
     echo "cleanup..."
-    stop_monitor
     stop_inspector
     stop_consumer
 
     unset_forced_down_hosts
+    stop_monitor
     collect_logs
 
     vms=$(openstack $as_doctor_user server list)
@@ -467,6 +464,7 @@ cleanup() {
 
     cleanup_installer
     cleanup_inspector
+    cleanup_monitor
 
     # NOTE: Temporal log printer.
     for f in $(find . -name '*.log')
@@ -478,6 +476,13 @@ cleanup() {
     done
 }
 
+setup_python_packages() {
+    sudo pip install flask==0.10.1
+    command -v openstack || sudo pip install python-openstackclient==2.3.0
+    command -v ceilometer || sudo pip install python-ceilometerclient==2.6.2
+    command -v congress || sudo pip install python-congressclient==1.5.0
+}
+
 # Main process
 
 if [[ $PYTHON_ENABLE == [Tt]rue ]]; then
@@ -499,9 +504,14 @@ git log --oneline -1 || true   # ignore even you don't have git installed
 
 trap cleanup EXIT
 
+setup_python_packages
+
 source $TOP_DIR/functions-common
 source $TOP_DIR/lib/installer
 source $TOP_DIR/lib/inspector
+source $TOP_DIR/lib/monitor
+
+rm -f *.log
 
 setup_installer
 
@@ -531,8 +541,8 @@ echo "injecting host failure..."
 inject_failure
 
 check_host_status "(DOWN|UNKNOWN)" 60
-calculate_notification_time
 unset_forced_down_hosts
+calculate_notification_time
 collect_logs
 run_profiler
 
diff --git a/tests/scenario/__init__.py b/tests/scenario/__init__.py
new file mode 100644 (file)
index 0000000..48893ae
--- /dev/null
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/tests/scenario/common.py b/tests/scenario/common.py
new file mode 100644 (file)
index 0000000..a33c50f
--- /dev/null
@@ -0,0 +1,28 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import sys
+from common.utils import match_rep_in_file
+
+
+def calculate_notification_time():
+    log_file = '{0}/{1}'.format(sys.path[0], 'doctor.log')
+
+    reg = '(?<=doctor monitor detected at )\d+.\d+'
+    result = match_rep_in_file(reg, log_file)
+    if not result:
+        raise Exception('Can not match detected time')
+    detected = result.group(0)
+
+    reg = '(?<=doctor consumer notified at )\d+.\d+'
+    result = match_rep_in_file(reg, log_file)
+    if not result:
+        raise Exception('Can not match notified time')
+    notified = result.group(0)
+
+    return float(notified) - float(detected)
\ No newline at end of file
diff --git a/tests/scenario/network_failure.py b/tests/scenario/network_failure.py
new file mode 100644 (file)
index 0000000..e9a239d
--- /dev/null
@@ -0,0 +1,71 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from identity_auth import get_session
+from os_clients import nova_client
+from common.utils import SSHClient
+
+# Shell script template that takes the interface holding {compute_ip} down
+# for ten seconds and echoes the link-down time. It is rendered with
+# str.format, hence the doubled braces around the awk action.
+LINK_DOWN_SCRIPT = """
+#!/bin/bash -x
+dev=$(sudo ip a | awk '/ {compute_ip}\//{{print $NF}}')
+sleep 1
+sudo ip link set $dev down
+echo "doctor set link down at" $(date "+%s.%N")
+sleep 10
+sudo ip link set $dev up
+sleep 1
+"""
+
+
+class NetworkFault(object):
+
+    def __init__(self, conf, installer, log):
+        self.conf = conf
+        self.log = log
+        self.installer = installer
+        self.nova = nova_client(self.conf.nova_version, get_session())
+        self.host = None
+        self.GetLog = False
+
+    def start(self, host):
+        self.log.info('fault inject start......')
+        self._set_link_down(host.ip)
+        self.host = host
+        self.log.info('fault inject end......')
+
+    def cleanup(self):
+        self.log.info('fault inject cleanup......')
+        self.get_disable_network_log()
+
+    def get_disable_network_log(self):
+        if self.GetLog:
+            self.log.info('Already get the disable_netork.log from down_host......')
+            return
+        if self.host is not None:
+            client = SSHClient(self.host.ip,
+                               self.installer.node_user_name,
+                               key_filename=self.installer.get_ssh_key_from_installer(),
+                               look_for_keys=True,
+                               log=self.log)
+            client.scp('disable_network.log', './disable_network.log', method='get')
+            self.log.info('Get the disable_netork.log from down_host(host_name:%s, host_ip:%s)'
+                          % (self.host.name, self.host.ip))
+        self.GetLog = True
+
+    def _set_link_down(self, compute_ip):
+        file_name = './disable_network.sh'
+        with open(file_name, 'w') as file:
+            file.write(LINK_DOWN_SCRIPT.format(compute_ip=compute_ip))
+        client = SSHClient(compute_ip,
+                           self.installer.node_user_name,
+                           key_filename=self.installer.get_ssh_key_from_installer(),
+                           look_for_keys=True,
+                           log=self.log)
+        client.scp('./disable_network.sh', 'disable_network.sh')
+        command = 'bash disable_network.sh > disable_network.log 2>&1 &'
+        client.ssh(command)
diff --git a/tox.ini b/tox.ini
index c6262ed..c8cebe8 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -23,5 +23,8 @@ passenv =
     PROFILER_TYPE
     PYTHON_ENABLE
     CI_DEBUG
+    INSTALLER_TYPE
+    INSTALLER_IP
+    PROFILER_TYPE
 changedir = {toxinidir}/tests
 commands = doctor-test