Yardstick output format unified 57/35257/7
authorchenjiankun <chenjiankun1@huawei.com>
Wed, 24 May 2017 07:22:51 +0000 (07:22 +0000)
committerchenjiankun <chenjiankun1@huawei.com>
Thu, 22 Jun 2017 11:06:17 +0000 (11:06 +0000)
JIRA: YARDSTICK-658

Currently the yardstick have three dispatcher: file, influxdb, mongodb.
(influxdb using API to get result and mongodb using testAPI to get result)
But their output format is different. It is hard to use.

In this patch, make all dispatchers using the same data source.
And make the output format of file and influxdb unified.
As for mongodb, since it is related to testAPI, so I make it push data
every test case.

The unified output format is:
    http://paste.openstack.org/show/610125/

Change-Id: I854ac4f03e6f904469b07b0c924c7d850545ae5b
Signed-off-by: chenjiankun <chenjiankun1@huawei.com>
15 files changed:
api/resources/results.py
tests/unit/benchmark/core/test_task.py
tests/unit/benchmark/runner/test_base.py
tests/unit/dispatcher/test_influxdb.py
yardstick/benchmark/core/task.py
yardstick/benchmark/runners/arithmetic.py
yardstick/benchmark/runners/base.py
yardstick/benchmark/runners/duration.py
yardstick/benchmark/runners/iteration.py
yardstick/benchmark/runners/sequence.py
yardstick/cmd/commands/task.py
yardstick/dispatcher/base.py
yardstick/dispatcher/file.py
yardstick/dispatcher/http.py
yardstick/dispatcher/influxdb.py

index 86fc251..a0527ed 100644 (file)
@@ -28,12 +28,12 @@ def getResult(args):
         uuid.UUID(task_id)
     except KeyError:
         message = 'task_id must be provided'
-        return common_utils.error_handler(message)
+        return common_utils.result_handler(2, message)
 
     task = TasksHandler().get_task_by_taskid(task_id)
 
     def _unfinished():
-        return common_utils.result_handler(0, [])
+        return common_utils.result_handler(0, {})
 
     def _finished():
         testcases = task.details.split(',')
@@ -44,7 +44,7 @@ def getResult(args):
             data = common_utils.translate_to_str(influx_utils.query(query_sql))
             return data
 
-        result = {k: get_data(k) for k in testcases}
+        result = _format_data({k: get_data(k) for k in testcases})
 
         return common_utils.result_handler(1, result)
 
@@ -61,4 +61,59 @@ def getResult(args):
         }
         return switcher.get(status, lambda: 'nothing')()
     except IndexError:
-        return common_utils.error_handler('no such task')
+        return common_utils.result_handler(2, 'no such task')
+
+
+def _format_data(data):
+    try:
+        first_value = data.values()[0][0]
+    except IndexError:
+        return {'criteria': 'FAIL', 'testcases': {}}
+    else:
+        info = {
+            'deploy_scenario': first_value.get('deploy_scenario'),
+            'installer': first_value.get('installer'),
+            'pod_name': first_value.get('pod_name'),
+            'version': first_value.get('version')
+        }
+        task_id = first_value.get('task_id')
+        criteria = first_value.get('criteria')
+        testcases = {k: _get_case_data(v) for k, v in data.items()}
+
+        result = {
+            'criteria': criteria,
+            'info': info,
+            'task_id': task_id,
+            'testcases': testcases
+        }
+        return result
+
+
+def _get_case_data(data):
+    try:
+        scenario = data[0]
+    except IndexError:
+        return {'tc_data': [], 'criteria': 'FAIL'}
+    else:
+        tc_data = [_get_scenario_data(s) for s in data]
+        criteria = scenario.get('criteria')
+        return {'tc_data': tc_data, 'criteria': criteria}
+
+
+def _get_scenario_data(data):
+    result = {
+        'data': {},
+        'timestamp': ''
+    }
+
+    blacklist = {'criteria', 'deploy_scenario', 'host', 'installer',
+                 'pod_name', 'runner_id', 'scenarios', 'target',
+                 'task_id', 'time', 'version'}
+
+    keys = set(data.keys()) - set(blacklist)
+    for k in keys:
+        result['data'][k] = data[k]
+
+    result['timestamp'] = data.get('time')
+
+    return result
index 8034392..b64bb8e 100644 (file)
@@ -65,6 +65,7 @@ class TaskTestCase(unittest.TestCase):
         runner = mock.Mock()
         runner.join.return_value = 0
         runner.get_output.return_value = {}
+        runner.get_result.return_value = []
         mock_base_runner.Runner.get.return_value = runner
         t._run([scenario], False, "yardstick.out")
         self.assertTrue(runner.run.called)
index 7880fe5..6e72fa5 100644 (file)
@@ -13,7 +13,6 @@ from __future__ import print_function
 from __future__ import absolute_import
 
 import unittest
-import multiprocessing
 import time
 
 from yardstick.benchmark.runners.iteration import IterationRunner
@@ -22,8 +21,7 @@ from yardstick.benchmark.runners.iteration import IterationRunner
 class RunnerTestCase(unittest.TestCase):
 
     def test_get_output(self):
-        queue = multiprocessing.Queue()
-        runner = IterationRunner({}, queue)
+        runner = IterationRunner({})
         runner.output_queue.put({'case': 'opnfv_yardstick_tc002'})
         runner.output_queue.put({'criteria': 'PASS'})
 
index dca3c41..a5d9b07 100644 (file)
@@ -94,31 +94,31 @@ class InfluxdbDispatcherTestCase(unittest.TestCase):
             }
         }
 
-        self.yardstick_conf = {'yardstick': {}}
-
-    def test_record_result_data_no_target(self):
-        influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
-        influxdb.target = ''
-        self.assertEqual(influxdb.record_result_data(self.data1), -1)
-
-    def test_record_result_data_no_case_name(self):
-        influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
-        self.assertEqual(influxdb.record_result_data(self.data2), -1)
+        self.yardstick_conf = {'dispatcher_influxdb': {}}
 
     @mock.patch('yardstick.dispatcher.influxdb.requests')
     def test_record_result_data(self, mock_requests):
         type(mock_requests.post.return_value).status_code = 204
-        influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
-        self.assertEqual(influxdb.record_result_data(self.data1), 0)
-        self.assertEqual(influxdb.record_result_data(self.data2), 0)
-        self.assertEqual(influxdb.flush_result_data(), 0)
+        influxdb = InfluxdbDispatcher(self.yardstick_conf)
+        data = {
+            'status': 1,
+            'result': {
+                'criteria': 'PASS',
+                'info': {
+                },
+                'task_id': 'b9e2bbc2-dfd8-410d-8c24-07771e9f7979',
+                'testcases': {
+                }
+            }
+        }
+        self.assertEqual(influxdb.flush_result_data(data), 0)
 
     def test__dict_key_flatten(self):
         line = 'mpstat.loadavg1=0.29,rtt=1.03,mpstat.loadavg0=1.09,' \
                'mpstat.cpu0.%idle=99.00,mpstat.cpu0.%sys=0.00'
         # need to sort for assert to work
         line = ",".join(sorted(line.split(',')))
-        influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
+        influxdb = InfluxdbDispatcher(self.yardstick_conf)
         flattened_data = influxdb._dict_key_flatten(
             self.data3['benchmark']['data'])
         result = ",".join(
@@ -126,15 +126,15 @@ class InfluxdbDispatcherTestCase(unittest.TestCase):
         self.assertEqual(result, line)
 
     def test__get_nano_timestamp(self):
-        influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
-        results = {'benchmark': {'timestamp': '1451461248.925574'}}
+        influxdb = InfluxdbDispatcher(self.yardstick_conf)
+        results = {'timestamp': '1451461248.925574'}
         self.assertEqual(influxdb._get_nano_timestamp(results),
                          '1451461248925574144')
 
     @mock.patch('yardstick.dispatcher.influxdb.time')
     def test__get_nano_timestamp_except(self, mock_time):
         results = {}
-        influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
+        influxdb = InfluxdbDispatcher(self.yardstick_conf)
         mock_time.time.return_value = 1451461248.925574
         self.assertEqual(influxdb._get_nano_timestamp(results),
                          '1451461248925574144')
index c44081b..091aa99 100644 (file)
@@ -24,6 +24,7 @@ from six.moves import filter
 
 from yardstick.benchmark.contexts.base import Context
 from yardstick.benchmark.runners import base as base_runner
+from yardstick.dispatcher.base import Base as DispatcherBase
 from yardstick.common.task_template import TaskTemplate
 from yardstick.common.utils import source_env
 from yardstick.common import utils
@@ -42,7 +43,6 @@ class Task(object):     # pragma: no cover
     """
 
     def __init__(self):
-        self.config = {}
         self.contexts = []
         self.outputs = {}
 
@@ -55,7 +55,14 @@ class Task(object):     # pragma: no cover
 
         check_environment()
 
-        self.config['yardstick'] = utils.parse_ini_file(config_file)
+        output_config = utils.parse_ini_file(config_file)
+        self._init_output_config(output_config)
+        self._set_output_config(output_config, args.output_file)
+        LOG.debug('Output configuration is: %s', output_config)
+
+        if output_config['DEFAULT'].get('dispatcher') == 'file':
+            result = {'status': 0, 'result': {}}
+            utils.write_json_to_file(args.output_file, result)
 
         total_start_time = time.time()
         parser = TaskParser(args.inputfile[0])
@@ -75,6 +82,7 @@ class Task(object):     # pragma: no cover
         if args.parse_only:
             sys.exit(0)
 
+        testcases = {}
         # parse task_files
         for i in range(0, len(task_files)):
             one_task_start_time = time.time()
@@ -90,7 +98,15 @@ class Task(object):     # pragma: no cover
                          meet_precondition)
                 continue
 
-            self._run(scenarios, run_in_parallel, args.output_file)
+            case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
+            try:
+                data = self._run(scenarios, run_in_parallel, args.output_file)
+            except KeyboardInterrupt:
+                raise
+            except Exception:
+                testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
+            else:
+                testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
 
             if args.keep_deploy:
                 # keep deployment, forget about stack
@@ -104,6 +120,10 @@ class Task(object):     # pragma: no cover
             LOG.info("task %s finished in %d secs", task_files[i],
                      one_task_end_time - one_task_start_time)
 
+        result = self._get_format_result(testcases)
+
+        self._do_output(output_config, result)
+
         total_end_time = time.time()
         LOG.info("total finished in %d secs",
                  total_end_time - total_start_time)
@@ -114,6 +134,65 @@ class Task(object):     # pragma: no cover
 
         print("Done, exiting")
 
+    def _init_output_config(self, output_config):
+        output_config.setdefault('DEFAULT', {})
+        output_config.setdefault('dispatcher_http', {})
+        output_config.setdefault('dispatcher_file', {})
+        output_config.setdefault('dispatcher_influxdb', {})
+        output_config.setdefault('nsb', {})
+
+    def _set_output_config(self, output_config, file_path):
+        try:
+            out_type = os.environ['DISPATCHER']
+        except KeyError:
+            output_config['DEFAULT'].setdefault('dispatcher', 'file')
+        else:
+            output_config['DEFAULT']['dispatcher'] = out_type
+
+        output_config['dispatcher_file']['file_path'] = file_path
+
+        try:
+            target = os.environ['TARGET']
+        except KeyError:
+            pass
+        else:
+            k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
+            output_config[k]['target'] = target
+
+    def _get_format_result(self, testcases):
+        criteria = self._get_task_criteria(testcases)
+
+        info = {
+            'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
+            'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
+            'pod_name': os.environ.get('NODE_NAME', 'unknown'),
+            'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
+        }
+
+        result = {
+            'status': 1,
+            'result': {
+                'criteria': criteria,
+                'task_id': self.task_id,
+                'info': info,
+                'testcases': testcases
+            }
+        }
+
+        return result
+
+    def _get_task_criteria(self, testcases):
+        criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
+        if criteria:
+            return 'FAIL'
+        else:
+            return 'PASS'
+
+    def _do_output(self, output_config, result):
+
+        dispatcher = DispatcherBase.get(output_config)
+        dispatcher.flush_result_data(result)
+
     def _run(self, scenarios, run_in_parallel, output_file):
         """Deploys context and calls runners"""
         for context in self.contexts:
@@ -121,6 +200,7 @@ class Task(object):     # pragma: no cover
 
         background_runners = []
 
+        result = []
         # Start all background scenarios
         for scenario in filter(_is_background_scenario, scenarios):
             scenario["runner"] = dict(type="Duration", duration=1000000000)
@@ -136,16 +216,23 @@ class Task(object):     # pragma: no cover
 
             # Wait for runners to finish
             for runner in runners:
-                runner_join(runner)
+                status = runner_join(runner)
+                if status != 0:
+                    raise RuntimeError
                 self.outputs.update(runner.get_output())
+                result.extend(runner.get_result())
                 print("Runner ended, output in", output_file)
         else:
             # run serially
             for scenario in scenarios:
                 if not _is_background_scenario(scenario):
                     runner = self.run_one_scenario(scenario, output_file)
-                    runner_join(runner)
+                    status = runner_join(runner)
+                    if status != 0:
+                        LOG.error('Scenario: %s ERROR', scenario.get('type'))
+                        raise RuntimeError
                     self.outputs.update(runner.get_output())
+                    result.extend(runner.get_result())
                     print("Runner ended, output in", output_file)
 
         # Abort background runners
@@ -154,15 +241,21 @@ class Task(object):     # pragma: no cover
 
         # Wait for background runners to finish
         for runner in background_runners:
-            if runner.join(timeout=60) is None:
+            status = runner.join(timeout=60)
+            if status is None:
                 # Nuke if it did not stop nicely
                 base_runner.Runner.terminate(runner)
-                runner_join(runner)
+                status = runner_join(runner)
                 self.outputs.update(runner.get_output())
+                result.extend(runner.get_result())
             else:
                 base_runner.Runner.release(runner)
+            if status != 0:
+                raise RuntimeError
             print("Background task ended")
 
+        return result
+
     def atexit_handler(self):
         """handler for process termination"""
         base_runner.Runner.terminate_all()
@@ -227,7 +320,7 @@ class Task(object):     # pragma: no cover
 
         if "nodes" in scenario_cfg:
             context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
-        runner = base_runner.Runner.get(runner_cfg, self.config)
+        runner = base_runner.Runner.get(runner_cfg)
 
         print("Starting runner of type '%s'" % runner_cfg["type"])
         runner.run(scenario_cfg, context_cfg)
@@ -489,8 +582,7 @@ def runner_join(runner):
     """join (wait for) a runner, exit process at runner failure"""
     status = runner.join()
     base_runner.Runner.release(runner)
-    if status != 0:
-        sys.exit("Runner failed")
+    return status
 
 
 def print_invalid_header(source_name, args):
index 7ec5933..7898ae2 100755 (executable)
@@ -63,10 +63,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
     benchmark.setup()
     method = getattr(benchmark, method_name)
 
-    queue.put({'runner_id': runner_cfg['runner_id'],
-               'scenario_cfg': scenario_cfg,
-               'context_cfg': context_cfg})
-
     sla_action = None
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
@@ -132,10 +128,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             'errors': errors
         }
 
-        record = {'runner_id': runner_cfg['runner_id'],
-                  'benchmark': benchmark_output}
-
-        queue.put(record)
+        queue.put(benchmark_output)
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
index ebb9a91..f6816c7 100755 (executable)
@@ -22,46 +22,13 @@ import logging
 import multiprocessing
 import subprocess
 import time
-import os
 import traceback
 
-from oslo_config import cfg
-
 import yardstick.common.utils as utils
 from yardstick.benchmark.scenarios import base as base_scenario
-from yardstick.dispatcher.base import Base as DispatcherBase
 
 log = logging.getLogger(__name__)
 
-CONF = cfg.CONF
-
-
-def _output_serializer_main(filename, queue, config):
-    """entrypoint for the singleton subprocess writing to outfile
-    Use of this process enables multiple instances of a scenario without
-    messing up the output file.
-    """
-    try:
-        out_type = config['yardstick'].get('DEFAULT', {})['dispatcher']
-    except KeyError:
-        out_type = os.environ.get('DISPATCHER', 'file')
-
-    conf = {
-        'type': out_type.capitalize(),
-        'file_path': filename
-    }
-
-    dispatcher = DispatcherBase.get(conf, config)
-
-    while True:
-        # blocks until data becomes available
-        record = queue.get()
-        if record == '_TERMINATE_':
-            dispatcher.flush_result_data()
-            break
-        else:
-            dispatcher.record_result_data(record)
-
 
 def _execute_shell_command(command):
     """execute shell script with error handling"""
@@ -110,8 +77,6 @@ def _periodic_action(interval, command, queue):
 
 
 class Runner(object):
-    queue = None
-    dump_process = None
     runners = []
 
     @staticmethod
@@ -131,30 +96,10 @@ class Runner(object):
         return types
 
     @staticmethod
-    def get(runner_cfg, config):
+    def get(runner_cfg):
         """Returns instance of a scenario runner for execution type.
         """
-        # if there is no runner, start the output serializer subprocess
-        if not Runner.runners:
-            log.debug("Starting dump process file '%s'",
-                      runner_cfg["output_filename"])
-            Runner.queue = multiprocessing.Queue()
-            Runner.dump_process = multiprocessing.Process(
-                target=_output_serializer_main,
-                name="Dumper",
-                args=(runner_cfg["output_filename"], Runner.queue, config))
-            Runner.dump_process.start()
-
-        return Runner.get_cls(runner_cfg["type"])(runner_cfg, Runner.queue)
-
-    @staticmethod
-    def release_dump_process():
-        """Release the dumper process"""
-        log.debug("Stopping dump process")
-        if Runner.dump_process:
-            Runner.queue.put('_TERMINATE_')
-            Runner.dump_process.join()
-            Runner.dump_process = None
+        return Runner.get_cls(runner_cfg["type"])(runner_cfg)
 
     @staticmethod
     def release(runner):
@@ -162,10 +107,6 @@ class Runner(object):
         if runner in Runner.runners:
             Runner.runners.remove(runner)
 
-        # if this was the last runner, stop the output serializer subprocess
-        if not Runner.runners:
-            Runner.release_dump_process()
-
     @staticmethod
     def terminate(runner):
         """Terminate the runner"""
@@ -179,7 +120,6 @@ class Runner(object):
 
         # release dumper process as some errors before any runner is created
         if not Runner.runners:
-            Runner.release_dump_process()
             return
 
         for runner in Runner.runners:
@@ -193,11 +133,11 @@ class Runner(object):
                 runner.periodic_action_process = None
             Runner.release(runner)
 
-    def __init__(self, config, queue):
+    def __init__(self, config):
         self.config = config
         self.periodic_action_process = None
-        self.result_queue = queue
         self.output_queue = multiprocessing.Queue()
+        self.result_queue = multiprocessing.Queue()
         self.process = None
         self.aborted = multiprocessing.Event()
         Runner.runners.append(self)
@@ -276,3 +216,9 @@ class Runner(object):
         while not self.output_queue.empty():
             result.update(self.output_queue.get())
         return result
+
+    def get_result(self):
+        result = []
+        while not self.result_queue.empty():
+            result.append(self.result_queue.get())
+        return result
index 2bf2cd2..69d7445 100644 (file)
@@ -52,10 +52,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
 
-    queue.put({'runner_id': runner_cfg['runner_id'],
-               'scenario_cfg': scenario_cfg,
-               'context_cfg': context_cfg})
-
     start = time.time()
     while True:
 
@@ -90,10 +86,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             'errors': errors
         }
 
-        record = {'runner_id': runner_cfg['runner_id'],
-                  'benchmark': benchmark_output}
-
-        queue.put(record)
+        queue.put(benchmark_output)
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
index 973bb9a..50fe106 100644 (file)
@@ -53,10 +53,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
     method = getattr(benchmark, method_name)
 
-    queue.put({'runner_id': runner_cfg['runner_id'],
-               'scenario_cfg': scenario_cfg,
-               'context_cfg': context_cfg})
-
     sla_action = None
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
@@ -105,10 +101,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                 'errors': errors
             }
 
-            record = {'runner_id': runner_cfg['runner_id'],
-                      'benchmark': benchmark_output}
-
-            queue.put(record)
+            queue.put(benchmark_output)
 
             LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                       {"runner": runner_cfg["runner_id"],
index 74ff822..68e272c 100644 (file)
@@ -57,10 +57,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
     benchmark.setup()
     method = getattr(benchmark, method_name)
 
-    queue.put({'runner_id': runner_cfg['runner_id'],
-               'scenario_cfg': scenario_cfg,
-               'context_cfg': context_cfg})
-
     sla_action = None
     if "sla" in scenario_cfg:
         sla_action = scenario_cfg["sla"].get("action", "assert")
@@ -99,10 +95,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             'errors': errors
         }
 
-        record = {'runner_id': runner_cfg['runner_id'],
-                  'benchmark': benchmark_output}
-
-        queue.put(record)
+        queue.put(benchmark_output)
 
         LOG.debug("runner=%(runner)s seq=%(sequence)s END",
                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
index 16a4db2..6384e6e 100644 (file)
@@ -14,7 +14,6 @@ from __future__ import absolute_import
 from yardstick.benchmark.core.task import Task
 from yardstick.common.utils import cliargs
 from yardstick.common.utils import write_json_to_file
-from yardstick.common.utils import read_json_from_file
 from yardstick.cmd.commands import change_osloobj_to_paras
 
 output_file_default = "/tmp/yardstick.out"
@@ -46,22 +45,11 @@ class TaskCommands(object):
         param = change_osloobj_to_paras(args)
         self.output_file = param.output_file
 
-        self._init_result_file()
-
         try:
             Task().start(param, **kwargs)
-            self._finish()
         except Exception as e:
             self._write_error_data(e)
-
-    def _init_result_file(self):
-        data = {'status': 0, 'result': []}
-        write_json_to_file(self.output_file, data)
-
-    def _finish(self):
-        result = read_json_from_file(self.output_file).get('result')
-        data = {'status': 1, 'result': result}
-        write_json_to_file(self.output_file, data)
+            raise
 
     def _write_error_data(self, error):
         data = {'status': 2, 'result': str(error)}
index a1c8582..e77249c 100644 (file)
@@ -38,15 +38,13 @@ class Base(object):
         raise RuntimeError("No such dispatcher_type %s" % dispatcher_type)
 
     @staticmethod
-    def get(conf, config):
+    def get(config):
         """Returns instance of a dispatcher for dispatcher type.
         """
-        return Base.get_cls(conf["type"])(conf, config)
+        out_type = config['DEFAULT']['dispatcher']
 
-    @abc.abstractmethod
-    def record_result_data(self, data):
-        """Recording result data interface."""
+        return Base.get_cls(out_type.capitalize())(config)
 
     @abc.abstractmethod
-    def flush_result_data(self):
+    def flush_result_data(self, data):
         """Flush result data into permanent storage media interface."""
index 8acd5df..24fc22d 100644 (file)
@@ -29,18 +29,10 @@ class FileDispatcher(DispatchBase):
 
     __dispatcher_type__ = "File"
 
-    def __init__(self, conf, config):
+    def __init__(self, conf):
         super(FileDispatcher, self).__init__(conf)
-        self.result = []
+        self.target = conf['dispatcher_file'].get('file_path',
+                                                  consts.DEFAULT_OUTPUT_FILE)
 
-    def record_result_data(self, data):
-        self.result.append(data)
-
-    def flush_result_data(self):
-        file_path = self.conf.get('file_path', consts.DEFAULT_OUTPUT_FILE)
-
-        res = utils.read_json_from_file(file_path).get('result')
-        res.extend(self.result)
-
-        data = {'status': 0, 'result': res}
-        utils.write_json_to_file(file_path, data)
+    def flush_result_data(self, data):
+        utils.write_json_to_file(self.target, data)
index 0d8d2a3..9bf9af3 100644 (file)
@@ -20,30 +20,15 @@ from __future__ import absolute_import
 
 import logging
 import os
+from datetime import datetime
 
 from oslo_serialization import jsonutils
 import requests
-from oslo_config import cfg
 
 from yardstick.dispatcher.base import Base as DispatchBase
 
 LOG = logging.getLogger(__name__)
 
-CONF = cfg.CONF
-http_dispatcher_opts = [
-    cfg.StrOpt('target',
-               default=os.getenv('TARGET', 'http://127.0.0.1:8000/results'),
-               help='The target where the http request will be sent. '
-                    'If this is not set, no data will be posted. For '
-                    'example: target = http://hostname:1234/path'),
-    cfg.IntOpt('timeout',
-               default=5,
-               help='The max time in seconds to wait for a request to '
-                    'timeout.'),
-]
-
-CONF.register_opts(http_dispatcher_opts, group="dispatcher_http")
-
 
 class HttpDispatcher(DispatchBase):
     """Dispatcher class for posting data into a http target.
@@ -51,55 +36,61 @@ class HttpDispatcher(DispatchBase):
 
     __dispatcher_type__ = "Http"
 
-    def __init__(self, conf, config):
+    def __init__(self, conf):
         super(HttpDispatcher, self).__init__(conf)
+        http_conf = conf['dispatcher_http']
         self.headers = {'Content-type': 'application/json'}
-        self.timeout = CONF.dispatcher_http.timeout
-        self.target = CONF.dispatcher_http.target
-        self.raw_result = []
-        self.result = {
-            "project_name": "yardstick",
-            "description": "yardstick test cases result",
-            "pod_name": os.environ.get('NODE_NAME', 'unknown'),
-            "installer": os.environ.get('INSTALLER_TYPE', 'unknown'),
-            "version": os.environ.get('YARDSTICK_VERSION', 'unknown'),
-            "build_tag": os.environ.get('BUILD_TAG')
-        }
-
-    def record_result_data(self, data):
-        self.raw_result.append(data)
+        self.timeout = int(http_conf.get('timeout', 5))
+        self.target = http_conf.get('target', 'http://127.0.0.1:8000/results')
 
-    def flush_result_data(self):
+    def flush_result_data(self, data):
         if self.target == '':
             # if the target was not set, do not do anything
             LOG.error('Dispatcher target was not set, no data will'
                       'be posted.')
             return
 
-        self.result["details"] = {'results': self.raw_result}
-
-        case_name = ""
-        for v in self.raw_result:
-            if isinstance(v, dict) and "scenario_cfg" in v:
-                case_name = v["scenario_cfg"]["tc"]
-                break
-        if case_name == "":
-            LOG.error('Test result : %s',
-                      jsonutils.dump_as_bytes(self.result))
-            LOG.error('The case_name cannot be found, no data will be posted.')
-            return
+        result = data['result']
+        self.info = result['info']
+        self.task_id = result['task_id']
+        self.criteria = result['criteria']
+        testcases = result['testcases']
+
+        for case, data in testcases.items():
+            self._upload_case_result(case, data)
 
-        self.result["case_name"] = case_name
+    def _upload_case_result(self, case, data):
+        try:
+            scenario_data = data.get('tc_data', [])[0]
+        except IndexError:
+            current_time = datetime.now()
+        else:
+            timestamp = float(scenario_data.get('timestamp', 0.0))
+            current_time = datetime.fromtimestamp(timestamp)
+
+        result = {
+            "project_name": "yardstick",
+            "case_name": case,
+            "description": "yardstick ci scenario status",
+            "scenario": self.info.get('deploy_scenario'),
+            "version": self.info.get('version'),
+            "pod_name": self.info.get('pod_name'),
+            "installer": self.info.get('installer'),
+            "build_tag": os.environ.get('BUILD_TAG'),
+            "criteria": data.get('criteria'),
+            "start_date": current_time.strftime('%Y-%m-%d %H:%M:%S'),
+            "stop_date": current_time.strftime('%Y-%m-%d %H:%M:%S'),
+            "trust_indicator": "",
+            "details": ""
+        }
 
         try:
-            LOG.debug('Test result : %s',
-                      jsonutils.dump_as_bytes(self.result))
+            LOG.debug('Test result : %s', result)
             res = requests.post(self.target,
-                                data=jsonutils.dump_as_bytes(self.result),
+                                data=jsonutils.dump_as_bytes(result),
                                 headers=self.headers,
                                 timeout=self.timeout)
             LOG.debug('Test result posting finished with status code'
                       ' %d.' % res.status_code)
         except Exception as err:
-            LOG.exception('Failed to record result data: %s',
-                          err)
+            LOG.exception('Failed to record result data: %s', err)
index 53af79c..373aae1 100644 (file)
 from __future__ import absolute_import
 
 import logging
-import os
 import time
 
 import collections
 import requests
 import six
-from oslo_serialization import jsonutils
 
 from third_party.influxdb.influxdb_line_protocol import make_lines
 from yardstick.dispatcher.base import Base as DispatchBase
@@ -30,28 +28,66 @@ class InfluxdbDispatcher(DispatchBase):
 
     __dispatcher_type__ = "Influxdb"
 
-    def __init__(self, conf, config):
+    def __init__(self, conf):
         super(InfluxdbDispatcher, self).__init__(conf)
-        db_conf = config['yardstick'].get('dispatcher_influxdb', {})
+        db_conf = conf['dispatcher_influxdb']
         self.timeout = int(db_conf.get('timeout', 5))
         self.target = db_conf.get('target', 'http://127.0.0.1:8086')
         self.db_name = db_conf.get('db_name', 'yardstick')
         self.username = db_conf.get('username', 'root')
         self.password = db_conf.get('password', 'root')
+
         self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name)
-        self.raw_result = []
-        self.case_name = ""
-        self.tc = ""
+
         self.task_id = -1
-        self.runners_info = {}
-        self.static_tags = {
-            "pod_name": os.environ.get('NODE_NAME', 'unknown'),
-            "installer": os.environ.get('INSTALLER_TYPE', 'unknown'),
-            "deploy_scenario": os.environ.get('DEPLOY_SCENARIO', 'unknown'),
-            "version": os.path.basename(os.environ.get('YARDSTICK_BRANCH',
-                                                       'unknown'))
 
+    def flush_result_data(self, data):
+        LOG.debug('Test result all : %s', data)
+        if self.target == '':
+            # if the target was not set, do not do anything
+            LOG.error('Dispatcher target was not set, no data will be posted.')
+
+        result = data['result']
+        self.tags = result['info']
+        self.task_id = result['task_id']
+        self.criteria = result['criteria']
+        testcases = result['testcases']
+
+        for case, data in testcases.items():
+            tc_criteria = data['criteria']
+            for record in data['tc_data']:
+                self._upload_one_record(record, case, tc_criteria)
+
+        return 0
+
+    def _upload_one_record(self, data, case, tc_criteria):
+        try:
+            line = self._data_to_line_protocol(data, case, tc_criteria)
+            LOG.debug('Test result line format : %s', line)
+            res = requests.post(self.influxdb_url,
+                                data=line,
+                                auth=(self.username, self.password),
+                                timeout=self.timeout)
+            if res.status_code != 204:
+                LOG.error('Test result posting finished with status code'
+                          ' %d.', res.status_code)
+                LOG.error(res.text)
+
+        except Exception as err:
+            LOG.exception('Failed to record result data: %s', err)
+
+    def _data_to_line_protocol(self, data, case, criteria):
+        msg = {}
+        point = {
+            "measurement": case,
+            "fields": self._dict_key_flatten(data["data"]),
+            "time": self._get_nano_timestamp(data),
+            "tags": self._get_extended_tags(criteria),
         }
+        msg["points"] = [point]
+        msg["tags"] = self.tags
+
+        return make_lines(msg).encode('utf-8')
 
     def _dict_key_flatten(self, data):
         next_data = {}
@@ -76,84 +112,16 @@ class InfluxdbDispatcher(DispatchBase):
 
     def _get_nano_timestamp(self, results):
         try:
-            timestamp = results["benchmark"]["timestamp"]
+            timestamp = results["timestamp"]
         except Exception:
             timestamp = time.time()
 
         return str(int(float(timestamp) * 1000000000))
 
-    def _get_extended_tags(self, data):
-        runner_info = self.runners_info[data["runner_id"]]
+    def _get_extended_tags(self, criteria):
         tags = {
-            "runner_id": data["runner_id"],
             "task_id": self.task_id,
-            "scenarios": runner_info["scenarios"]
+            "criteria": criteria
         }
-        if "host" in runner_info:
-            tags["host"] = runner_info["host"]
-        if "target" in runner_info:
-            tags["target"] = runner_info["target"]
 
         return tags
-
-    def _data_to_line_protocol(self, data):
-        msg = {}
-        point = {
-            "measurement": self.tc,
-            "fields": self._dict_key_flatten(data["benchmark"]["data"]),
-            "time": self._get_nano_timestamp(data),
-            "tags": self._get_extended_tags(data),
-        }
-        msg["points"] = [point]
-        msg["tags"] = self.static_tags
-
-        return make_lines(msg).encode('utf-8')
-
-    def record_result_data(self, data):
-        LOG.debug('Test result : %s', jsonutils.dump_as_bytes(data))
-        self.raw_result.append(data)
-        if self.target == '':
-            # if the target was not set, do not do anything
-            LOG.error('Dispatcher target was not set, no data will'
-                      'be posted.')
-            return -1
-
-        if isinstance(data, dict) and "scenario_cfg" in data:
-            self.tc = data["scenario_cfg"]["tc"]
-            self.task_id = data["scenario_cfg"]["task_id"]
-            scenario_cfg = data["scenario_cfg"]
-            runner_id = data["runner_id"]
-            self.runners_info[runner_id] = {"scenarios": scenario_cfg["type"]}
-            if "host" in scenario_cfg:
-                self.runners_info[runner_id]["host"] = scenario_cfg["host"]
-            if "target" in scenario_cfg:
-                self.runners_info[runner_id]["target"] = scenario_cfg["target"]
-            return 0
-
-        if self.tc == "":
-            LOG.error('Test result : %s', jsonutils.dump_as_bytes(data))
-            LOG.error('The case_name cannot be found, no data will be posted.')
-            return -1
-
-        try:
-            line = self._data_to_line_protocol(data)
-            LOG.debug('Test result line format : %s', line)
-            res = requests.post(self.influxdb_url,
-                                data=line,
-                                auth=(self.username, self.password),
-                                timeout=self.timeout)
-            if res.status_code != 204:
-                LOG.error('Test result posting finished with status code'
-                          ' %d.', res.status_code)
-                LOG.error(res.text)
-
-        except Exception as err:
-            LOG.exception('Failed to record result data: %s',
-                          err)
-            return -1
-        return 0
-
-    def flush_result_data(self):
-        LOG.debug('Test result all : %s',
-                  jsonutils.dump_as_bytes(self.raw_result))
-        return 0