Yardstick real-time influxdb KPI reporting regressions 51/51651/4
author chenjiankun <chenjiankun1@huawei.com>
Mon, 5 Feb 2018 08:13:07 +0000 (08:13 +0000)
committer chenjiankun <chenjiankun1@huawei.com>
Fri, 23 Feb 2018 06:52:21 +0000 (06:52 +0000)
JIRA: YARDSTICK-989

We used to have real-time influxdb reporting of test KPIs: the user could
monitor the test in grafana and see the output as it was produced.
The record format was later changed so that KPIs are only reported at the
end of the test.
This is a problem for long-running test cases, because we have to wait
until the end of the test execution to get any results from influxdb. If
the test fails part way through or does not exit cleanly for some reason,
no results are stored in influxdb at all, which leaves the user with no
information.
This is also a regression from the previous behaviour.

Change-Id: I0f476dff9162a359f0286fb421f2e9c4befaa5cc
Signed-off-by: chenjiankun <chenjiankun1@huawei.com>
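
Below is a minimal, self-contained sketch of the per-record flow this change
restores: while the runner drains its result queue, each record is uploaded to
influxdb immediately instead of only being flushed by the Task at the end of
the run. FakeInfluxDispatcher is a hypothetical stand-in for the real
InfluxdbDispatcher obtained via DispatcherBase.get(), and the function
signature is simplified for illustration; the drain loop itself mirrors the
Runner.get_result() change in the diff below.

# Minimal sketch of the per-record reporting flow (illustration only).
# FakeInfluxDispatcher is a hypothetical stand-in for InfluxdbDispatcher;
# the drain loop mirrors the Runner.get_result() change in the diff below.
from six.moves.queue import Empty, Queue


class FakeInfluxDispatcher(object):          # hypothetical stand-in
    __dispatcher_type__ = 'Influxdb'

    def upload_one_record(self, record, case_name, criteria, task_id=None):
        # the real dispatcher converts the record to line protocol and
        # POSTs it to the influxdb /write endpoint
        print('upload %s record for task %s: %s' % (case_name, task_id, record))


def get_result(result_queue, output_config, case_name, task_id):
    """Drain the runner's queue, uploading each record as it arrives."""
    result = []
    output_in_influxdb = 'influxdb' in output_config['DEFAULT']['dispatcher']
    dispatcher = FakeInfluxDispatcher()      # real code uses DispatcherBase.get()

    while not result_queue.empty():
        try:
            one_record = result_queue.get(True, 1)
        except Empty:
            pass
        else:
            if output_in_influxdb:
                # upload immediately so KPIs show up in grafana while
                # the test is still running
                dispatcher.upload_one_record(one_record, case_name, '',
                                             task_id=task_id)
            result.append(one_record)
    return result


# usage (the real runner drains a multiprocessing.Queue):
q = Queue()
q.put({'timestamp': 1.0, 'data': {'rtt': 0.25}})
get_result(q, {'DEFAULT': {'dispatcher': 'influxdb'}}, 'tc_ping', 'task-1')

In the actual change the Task-level _do_output() then skips the influxdb
dispatcher when flushing final results, so records are not posted twice.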
yardstick/benchmark/core/task.py
yardstick/benchmark/runners/base.py
yardstick/dispatcher/influxdb.py
yardstick/tests/unit/benchmark/core/test_task.py
yardstick/tests/unit/benchmark/runner/test_base.py

index 5fcc918..f5d2b18 100644 (file)
@@ -120,7 +120,7 @@ class Task(object):     # pragma: no cover
 
             case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
             try:
-                data = self._run(scenarios, run_in_parallel, args.output_file)
+                data = self._run(scenarios, run_in_parallel, output_config)
             except KeyboardInterrupt:
                 raise
             except Exception:  # pylint: disable=broad-except
@@ -232,11 +232,12 @@ class Task(object):     # pragma: no cover
 
     def _do_output(self, output_config, result):
         dispatchers = DispatcherBase.get(output_config)
+        dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
 
         for dispatcher in dispatchers:
             dispatcher.flush_result_data(result)
 
-    def _run(self, scenarios, run_in_parallel, output_file):
+    def _run(self, scenarios, run_in_parallel, output_config):
         """Deploys context and calls runners"""
         for context in self.contexts:
             context.deploy()
@@ -247,14 +248,14 @@ class Task(object):     # pragma: no cover
         # Start all background scenarios
         for scenario in filter(_is_background_scenario, scenarios):
             scenario["runner"] = dict(type="Duration", duration=1000000000)
-            runner = self.run_one_scenario(scenario, output_file)
+            runner = self.run_one_scenario(scenario, output_config)
             background_runners.append(runner)
 
         runners = []
         if run_in_parallel:
             for scenario in scenarios:
                 if not _is_background_scenario(scenario):
-                    runner = self.run_one_scenario(scenario, output_file)
+                    runner = self.run_one_scenario(scenario, output_config)
                     runners.append(runner)
 
             # Wait for runners to finish
@@ -263,12 +264,12 @@ class Task(object):     # pragma: no cover
                 if status != 0:
                     raise RuntimeError(
                         "{0} runner status {1}".format(runner.__execution_type__, status))
-                LOG.info("Runner ended, output in %s", output_file)
+                LOG.info("Runner ended")
         else:
             # run serially
             for scenario in scenarios:
                 if not _is_background_scenario(scenario):
-                    runner = self.run_one_scenario(scenario, output_file)
+                    runner = self.run_one_scenario(scenario, output_config)
                     status = runner_join(runner, background_runners, self.outputs, result)
                     if status != 0:
                         LOG.error('Scenario NO.%s: "%s" ERROR!',
@@ -276,7 +277,7 @@ class Task(object):     # pragma: no cover
                                   scenario.get('type'))
                         raise RuntimeError(
                             "{0} runner status {1}".format(runner.__execution_type__, status))
-                    LOG.info("Runner ended, output in %s", output_file)
+                    LOG.info("Runner ended")
 
         # Abort background runners
         for runner in background_runners:
@@ -313,10 +314,10 @@ class Task(object):     # pragma: no cover
         else:
             return op
 
-    def run_one_scenario(self, scenario_cfg, output_file):
+    def run_one_scenario(self, scenario_cfg, output_config):
         """run one scenario using context"""
         runner_cfg = scenario_cfg["runner"]
-        runner_cfg['output_filename'] = output_file
+        runner_cfg['output_config'] = output_config
 
         options = scenario_cfg.get('options', {})
         scenario_cfg['options'] = self._parse_options(options)
index a887fa5..99386a4 100755 (executable)
@@ -23,6 +23,7 @@ import multiprocessing
 import subprocess
 import time
 import traceback
+from subprocess import CalledProcessError
 
 import importlib
 
@@ -30,6 +31,7 @@ from six.moves.queue import Empty
 
 import yardstick.common.utils as utils
 from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.dispatcher.base import Base as DispatcherBase
 
 log = logging.getLogger(__name__)
 
@@ -39,7 +41,7 @@ def _execute_shell_command(command):
     exitcode = 0
     try:
         output = subprocess.check_output(command, shell=True)
-    except Exception:
+    except CalledProcessError:
         exitcode = -1
         output = traceback.format_exc()
         log.error("exec command '%s' error:\n ", command)
@@ -137,6 +139,8 @@ class Runner(object):
             Runner.release(runner)
 
     def __init__(self, config):
+        self.task_id = None
+        self.case_name = None
         self.config = config
         self.periodic_action_process = None
         self.output_queue = multiprocessing.Queue()
@@ -170,6 +174,8 @@ class Runner(object):
         cls = getattr(module, path_split[-1])
 
         self.config['object'] = class_name
+        self.case_name = scenario_cfg['tc']
+        self.task_id = scenario_cfg['task_id']
         self.aborted.clear()
 
         # run a potentially configured pre-start action
@@ -245,10 +251,24 @@ class Runner(object):
 
     def get_result(self):
         result = []
+
+        dispatcher = self.config['output_config']['DEFAULT']['dispatcher']
+        output_in_influxdb = 'influxdb' in dispatcher
+
         while not self.result_queue.empty():
             log.debug("result_queue size %s", self.result_queue.qsize())
             try:
-                result.append(self.result_queue.get(True, 1))
+                one_record = self.result_queue.get(True, 1)
             except Empty:
                 pass
+            else:
+                if output_in_influxdb:
+                    self._output_to_influxdb(one_record)
+
+                result.append(one_record)
         return result
+
+    def _output_to_influxdb(self, record):
+        dispatchers = DispatcherBase.get(self.config['output_config'])
+        dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
+        dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
index 632b433..e8c7cf5 100644 (file)
@@ -11,8 +11,10 @@ from __future__ import absolute_import
 
 import logging
 import time
+import os
 
 import requests
+from requests import ConnectionError
 
 from yardstick.common import utils
 from third_party.influxdb.influxdb_line_protocol import make_lines
@@ -38,7 +40,8 @@ class InfluxdbDispatcher(DispatchBase):
 
         self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name)
 
-        self.task_id = -1
+        self.task_id = None
+        self.tags = None
 
     def flush_result_data(self, data):
         LOG.debug('Test result all : %s', data)
@@ -57,28 +60,41 @@ class InfluxdbDispatcher(DispatchBase):
             for record in data['tc_data']:
                 # skip results with no data because we influxdb encode empty dicts
                 if record.get("data"):
-                    self._upload_one_record(record, case, tc_criteria)
+                    self.upload_one_record(record, case, tc_criteria)
 
         return 0
 
-    def _upload_one_record(self, data, case, tc_criteria):
+    def upload_one_record(self, data, case, tc_criteria, task_id=None):
+        if task_id:
+            self.task_id = task_id
+
+        line = self._data_to_line_protocol(data, case, tc_criteria)
+        LOG.debug('Test result line format : %s', line)
+
         try:
-            line = self._data_to_line_protocol(data, case, tc_criteria)
-            LOG.debug('Test result line format : %s', line)
             res = requests.post(self.influxdb_url,
                                 data=line,
                                 auth=(self.username, self.password),
                                 timeout=self.timeout)
+        except ConnectionError as err:
+            LOG.exception('Failed to record result data: %s', err)
+        else:
             if res.status_code != 204:
                 LOG.error('Test result posting finished with status code'
                           ' %d.', res.status_code)
                 LOG.error(res.text)
 
-        except Exception as err:
-            LOG.exception('Failed to record result data: %s', err)
-
     def _data_to_line_protocol(self, data, case, criteria):
         msg = {}
+
+        if not self.tags:
+            self.tags = {
+                'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
+                'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
+                'pod_name': os.environ.get('NODE_NAME', 'unknown'),
+                'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
+            }
+
         point = {
             "measurement": case,
             "fields": utils.flatten_dict_key(data["data"]),
@@ -93,7 +109,7 @@ class InfluxdbDispatcher(DispatchBase):
     def _get_nano_timestamp(self, results):
         try:
             timestamp = results["timestamp"]
-        except Exception:
+        except KeyError:
             timestamp = time.time()
 
         return str(int(float(timestamp) * 1000000000))
index bac035f..ee00d88 100644 (file)
@@ -48,8 +48,15 @@ class TaskTestCase(unittest.TestCase):
     def test__do_output(self, mock_dispatcher):
         t = task.Task()
         output_config = {"DEFAULT": {"dispatcher": "file, http"}}
-        mock_dispatcher.get = mock.MagicMock(return_value=[mock.MagicMock(),
-                                                           mock.MagicMock()])
+
+        dispatcher1 = mock.MagicMock()
+        dispatcher1.__dispatcher_type__ = 'file'
+
+        dispatcher2 = mock.MagicMock()
+        dispatcher2.__dispatcher_type__ = 'http'
+
+        mock_dispatcher.get = mock.MagicMock(return_value=[dispatcher1,
+                                                           dispatcher2])
         self.assertEqual(None, t._do_output(output_config, {}))
 
     @mock.patch.object(task, 'Context')
index 0fdc423..59739c5 100644 (file)
@@ -11,6 +11,8 @@ import time
 
 import mock
 import unittest
+from subprocess import CalledProcessError
+
 
 from yardstick.benchmark.runners import base
 from yardstick.benchmark.runners import iteration
@@ -20,19 +22,19 @@ class ActionTestCase(unittest.TestCase):
 
     @mock.patch("yardstick.benchmark.runners.base.subprocess")
     def test__execute_shell_command(self, mock_subprocess):
-        mock_subprocess.check_output.side_effect = Exception()
+        mock_subprocess.check_output.side_effect = CalledProcessError(-1, '')
 
         self.assertEqual(base._execute_shell_command("")[0], -1)
 
     @mock.patch("yardstick.benchmark.runners.base.subprocess")
     def test__single_action(self, mock_subprocess):
-        mock_subprocess.check_output.side_effect = Exception()
+        mock_subprocess.check_output.side_effect = CalledProcessError(-1, '')
 
         base._single_action(0, "echo", mock.MagicMock())
 
     @mock.patch("yardstick.benchmark.runners.base.subprocess")
     def test__periodic_action(self, mock_subprocess):
-        mock_subprocess.check_output.side_effect = Exception()
+        mock_subprocess.check_output.side_effect = CalledProcessError(-1, '')
 
         base._periodic_action(0, "echo", mock.MagicMock())
 
@@ -40,7 +42,14 @@ class ActionTestCase(unittest.TestCase):
 class RunnerTestCase(unittest.TestCase):
 
     def setUp(self):
-        self.runner = iteration.IterationRunner({})
+        config = {
+            'output_config': {
+                'DEFAULT': {
+                    'dispatcher': 'file'
+                }
+            }
+        }
+        self.runner = iteration.IterationRunner(config)
 
     @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
     def test_get_output(self, *args):