Initial InfluxDB dispatcher 41/5141/9
authorQiLiang <liangqi1@huawei.com>
Wed, 23 Dec 2015 14:13:15 +0000 (22:13 +0800)
committerqi liang <liangqi1@huawei.com>
Thu, 7 Jan 2016 03:53:37 +0000 (03:53 +0000)
Supports:
    - Basic influxDB write with timestamp
    - Add general result format func
    - Add UT

TODO:
    - refine database schema (e.g. add more tags) plan in another patch

JIRA: YARDSTICK-212

Change-Id: I1526568bbd850f1343135420ec59ed1b833bb99f
Signed-off-by: QiLiang <liangqi1@huawei.com>
etc/yardstick/yardstick.conf.sample
tests/unit/dispatcher/__init__.py [new file with mode: 0644]
tests/unit/dispatcher/test_influxdb.py [new file with mode: 0644]
tests/unit/dispatcher/test_influxdb_line_protocol.py [new file with mode: 0644]
yardstick/dispatcher/influxdb.py [new file with mode: 0644]
yardstick/dispatcher/influxdb_line_protocol.py [new file with mode: 0644]

index 82326dd..63462c5 100644 (file)
@@ -11,3 +11,8 @@
 # file_path = /tmp/yardstick.out
 # max_bytes = 0
 # backup_count = 0
+
+[dispatcher_influxdb]
+# timeout = 5
+# target = http://127.0.0.1:8086
+# db_name = yardstick
diff --git a/tests/unit/dispatcher/__init__.py b/tests/unit/dispatcher/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/tests/unit/dispatcher/test_influxdb.py b/tests/unit/dispatcher/test_influxdb.py
new file mode 100644 (file)
index 0000000..3989f58
--- /dev/null
@@ -0,0 +1,122 @@
+#!/usr/bin/env python
+
+##############################################################################
+# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+# Unittest for yardstick.dispatcher.influxdb
+
+import mock
+import unittest
+
+from yardstick.dispatcher.influxdb import InfluxdbDispatcher
+
+class InfluxdbDispatcherTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.data1 = {
+            "runner_id": 8921,
+            "context_cfg": {
+                "host": {
+                    "ip": "10.229.43.154",
+                    "key_filename": "/root/yardstick/yardstick/resources/files/yardstick_key",
+                    "name": "kvm.LF",
+                    "user": "root"
+                },
+                "target": {
+                    "ipaddr": "10.229.44.134"
+                }
+            },
+            "scenario_cfg": {
+                "runner": {
+                    "interval": 1,
+                    "object": "yardstick.benchmark.scenarios.networking.ping.Ping",
+                    "output_filename": "/tmp/yardstick.out",
+                    "runner_id": 8921,
+                    "duration": 10,
+                    "type": "Duration"
+                },
+                "host": "kvm.LF",
+                "type": "Ping",
+                "target": "10.229.44.134",
+                "sla": {
+                    "action": "monitor",
+                    "max_rtt": 10
+                }
+            }
+        }
+        self.data2 = {
+            "benchmark": {
+                "timestamp": "1451478117.883505",
+                "errors": "",
+                "data": {
+                    "rtt": 0.613
+                },
+                "sequence": 1
+            },
+            "runner_id": 8921
+        }
+        self.data3 ={
+            "benchmark": {
+                "data": {
+                    "mpstat": {
+                        "cpu0": {
+                            "%sys": "0.00",
+                            "%idle": "99.00"
+                        },
+                        "loadavg": [
+                            "1.09",
+                            "0.29"
+                        ]
+                    },
+                    "rtt": "1.03"
+                }
+            }
+        }
+
+    def test_record_result_data_no_target(self):
+        influxdb = InfluxdbDispatcher(None)
+        influxdb.target = ''
+        self.assertEqual(influxdb.record_result_data(self.data1), -1)
+
+    def test_record_result_data_no_case_name(self):
+        influxdb = InfluxdbDispatcher(None)
+        self.assertEqual(influxdb.record_result_data(self.data2), -1)
+
+    @mock.patch('yardstick.dispatcher.influxdb.requests')
+    def test_record_result_data(self, mock_requests):
+        type(mock_requests.post.return_value).status_code = 204
+        influxdb = InfluxdbDispatcher(None)
+        self.assertEqual(influxdb.record_result_data(self.data1), 0)
+        self.assertEqual(influxdb.record_result_data(self.data2), 0)
+        self.assertEqual(influxdb.flush_result_data(), 0)
+
+    def test__dict_key_flatten(self):
+        line = 'mpstat.loadavg1=0.29,rtt=1.03,mpstat.loadavg0=1.09,mpstat.cpu0.%idle=99.00,mpstat.cpu0.%sys=0.00'
+        influxdb = InfluxdbDispatcher(None)
+        flattened_data = influxdb._dict_key_flatten(self.data3['benchmark']['data'])
+        result = ",".join([k+"="+v for k, v in flattened_data.items()])
+        self.assertEqual(result, line)
+
+    def test__get_nano_timestamp(self):
+        influxdb = InfluxdbDispatcher(None)
+        results = {'benchmark': {'timestamp': '1451461248.925574'}}
+        self.assertEqual(influxdb._get_nano_timestamp(results), '1451461248925574144')
+
+    @mock.patch('yardstick.dispatcher.influxdb.time')
+    def test__get_nano_timestamp_except(self, mock_time):
+        results = {}
+        influxdb = InfluxdbDispatcher(None)
+        mock_time.time.return_value = 1451461248.925574
+        self.assertEqual(influxdb._get_nano_timestamp(results), '1451461248925574144')
+
+def main():
+    unittest.main()
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/unit/dispatcher/test_influxdb_line_protocol.py b/tests/unit/dispatcher/test_influxdb_line_protocol.py
new file mode 100644 (file)
index 0000000..cb05bf4
--- /dev/null
@@ -0,0 +1,55 @@
+# Unittest for yardstick.dispatcher.influxdb_line_protocol
+
+# yardstick comment: this file is a modified copy of
+# influxdb-python/influxdb/tests/test_line_protocol.py
+
+import unittest
+from yardstick.dispatcher.influxdb_line_protocol import make_lines
+
+
+class TestLineProtocol(unittest.TestCase):
+
+    def test_make_lines(self):
+        data = {
+            "tags": {
+                "empty_tag": "",
+                "none_tag": None,
+                "integer_tag": 2,
+                "string_tag": "hello"
+            },
+            "points": [
+                {
+                    "measurement": "test",
+                    "fields": {
+                        "string_val": "hello!",
+                        "int_val": 1,
+                        "float_val": 1.1,
+                        "none_field": None,
+                        "bool_val": True,
+                    }
+                }
+            ]
+        }
+
+        self.assertEqual(
+            make_lines(data),
+            'test,integer_tag=2,string_tag=hello '
+            'bool_val=True,float_val=1.1,int_val=1i,string_val="hello!"\n'
+        )
+
+    def test_string_val_newline(self):
+        data = {
+            "points": [
+                {
+                    "measurement": "m1",
+                    "fields": {
+                        "multi_line": "line1\nline1\nline3"
+                    }
+                }
+            ]
+        }
+
+        self.assertEqual(
+            make_lines(data),
+            'm1 multi_line="line1\\nline1\\nline3"\n'
+        )
diff --git a/yardstick/dispatcher/influxdb.py b/yardstick/dispatcher/influxdb.py
new file mode 100644 (file)
index 0000000..c580541
--- /dev/null
@@ -0,0 +1,135 @@
+##############################################################################
+# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import os
+import json
+import logging
+import requests
+import time
+
+from oslo_config import cfg
+
+from yardstick.dispatcher.base import Base as DispatchBase
+from yardstick.dispatcher.influxdb_line_protocol import make_lines
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+influx_dispatcher_opts = [
+    cfg.StrOpt('target',
+               default='http://127.0.0.1:8086',
+               help='The target where the http request will be sent. '
+                    'If this is not set, no data will be posted. For '
+                    'example: target = http://hostname:1234/path'),
+    cfg.StrOpt('db_name',
+               default='yardstick',
+               help='The database name to store test results.'),
+    cfg.IntOpt('timeout',
+               default=5,
+               help='The max time in seconds to wait for a request to '
+                    'timeout.'),
+]
+
+CONF.register_opts(influx_dispatcher_opts, group="dispatcher_influxdb")
+
+
+class InfluxdbDispatcher(DispatchBase):
+    """Dispatcher class for posting data into an influxdb target.
+    """
+
+    __dispatcher_type__ = "Influxdb"
+
+    def __init__(self, conf):
+        super(InfluxdbDispatcher, self).__init__(conf)
+        self.timeout = CONF.dispatcher_influxdb.timeout
+        self.target = CONF.dispatcher_influxdb.target
+        self.db_name = CONF.dispatcher_influxdb.db_name
+        self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name)
+        self.raw_result = []
+        self.case_name = ""
+        self.static_tags = {
+            "pod_name": os.environ.get('POD_NAME', 'unknown'),
+            "installer": os.environ.get('INSTALLER_TYPE', 'unknown'),
+            "version": os.environ.get('YARDSTICK_VERSION', 'unknown')
+        }
+
+    def _dict_key_flatten(self, data):
+        next_data = {}
+
+        if not [v for v in data.values()
+                if type(v) == dict or type(v) == list]:
+            return data
+
+        for k, v in data.iteritems():
+            if type(v) == dict:
+                for n_k, n_v in v.iteritems():
+                    next_data["%s.%s" % (k, n_k)] = n_v
+            elif type(v) == list:
+                for index, item in enumerate(v):
+                    next_data["%s%d" % (k, index)] = item
+            else:
+                next_data[k] = v
+
+        return self._dict_key_flatten(next_data)
+
+    def _get_nano_timestamp(self, results):
+        try:
+            timestamp = results["benchmark"]["timestamp"]
+        except Exception:
+            timestamp = time.time()
+
+        return str(int(float(timestamp) * 1000000000))
+
+    def _data_to_line_protocol(self, data):
+        msg = {}
+        point = {}
+        point["measurement"] = self.case_name
+        point["fields"] = self._dict_key_flatten(data["benchmark"]["data"])
+        point["time"] = self._get_nano_timestamp(data)
+        msg["points"] = [point]
+        msg["tags"] = self.static_tags
+
+        return make_lines(msg).encode('utf-8')
+
+    def record_result_data(self, data):
+        LOG.debug('Test result : %s' % json.dumps(data))
+        self.raw_result.append(data)
+        if self.target == '':
+            # if the target was not set, do not do anything
+            LOG.error('Dispatcher target was not set, no data will'
+                      'be posted.')
+            return -1
+
+        if isinstance(data, dict) and "scenario_cfg" in data:
+            self.case_name = data["scenario_cfg"]["type"]
+            return 0
+
+        if self.case_name == "":
+            LOG.error('Test result : %s' % json.dumps(data))
+            LOG.error('The case_name cannot be found, no data will be posted.')
+            return -1
+
+        try:
+            line = self._data_to_line_protocol(data)
+            LOG.debug('Test result line format : %s' % line)
+            res = requests.post(self.influxdb_url,
+                                data=line,
+                                timeout=self.timeout)
+            if res.status_code != 204:
+                LOG.error('Test result posting finished with status code'
+                          ' %d.' % res.status_code)
+        except Exception as err:
+            LOG.exception('Failed to record result data: %s',
+                          err)
+            return -1
+        return 0
+
+    def flush_result_data(self):
+        LOG.debug('Test result all : %s' % json.dumps(self.raw_result))
+        return 0
diff --git a/yardstick/dispatcher/influxdb_line_protocol.py b/yardstick/dispatcher/influxdb_line_protocol.py
new file mode 100644 (file)
index 0000000..3e830ed
--- /dev/null
@@ -0,0 +1,114 @@
+# yardstick comment: this file is a modified copy of
+# influxdb-python/influxdb/line_protocol.py
+
+from __future__ import unicode_literals
+from copy import copy
+
+from six import binary_type, text_type, integer_types
+
+
+def _escape_tag(tag):
+    tag = _get_unicode(tag, force=True)
+    return tag.replace(
+        "\\", "\\\\"
+    ).replace(
+        " ", "\\ "
+    ).replace(
+        ",", "\\,"
+    ).replace(
+        "=", "\\="
+    )
+
+
+def _escape_value(value):
+    value = _get_unicode(value)
+    if isinstance(value, text_type) and value != '':
+        return "\"{}\"".format(
+            value.replace(
+                "\"", "\\\""
+            ).replace(
+                "\n", "\\n"
+            )
+        )
+    elif isinstance(value, integer_types) and not isinstance(value, bool):
+        return str(value) + 'i'
+    else:
+        return str(value)
+
+
+def _get_unicode(data, force=False):
+    """
+    Try to return a text aka unicode object from the given data.
+    """
+    if isinstance(data, binary_type):
+        return data.decode('utf-8')
+    elif data is None:
+        return ''
+    elif force:
+        return str(data)
+    else:
+        return data
+
+
+def make_lines(data):
+    """
+    Extracts the points from the given dict and returns a Unicode string
+    matching the line protocol introduced in InfluxDB 0.9.0.
+
+    line protocol format:
+        <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>\
+            [,<field2-key>=<field2-value>...] [unix-nano-timestamp]
+
+    Ref:
+        https://influxdb.com/docs/v0.9/write_protocols/write_syntax.html
+        https://influxdb.com/docs/v0.9/write_protocols/line.html
+    """
+    lines = []
+    static_tags = data.get('tags', None)
+    for point in data['points']:
+        elements = []
+
+        # add measurement name
+        measurement = _escape_tag(_get_unicode(
+            point.get('measurement', data.get('measurement'))
+        ))
+        key_values = [measurement]
+
+        # add tags
+        if static_tags is None:
+            tags = point.get('tags', {})
+        else:
+            tags = copy(static_tags)
+            tags.update(point.get('tags', {}))
+
+        # tags should be sorted client-side to take load off server
+        for tag_key in sorted(tags.keys()):
+            key = _escape_tag(tag_key)
+            value = _escape_tag(tags[tag_key])
+
+            if key != '' and value != '':
+                key_values.append("{key}={value}".format(key=key, value=value))
+        key_values = ','.join(key_values)
+        elements.append(key_values)
+
+        # add fields
+        field_values = []
+        for field_key in sorted(point['fields'].keys()):
+            key = _escape_tag(field_key)
+            value = _escape_value(point['fields'][field_key])
+            if key != '' and value != '':
+                field_values.append("{key}={value}".format(
+                    key=key,
+                    value=value
+                ))
+        field_values = ','.join(field_values)
+        elements.append(field_values)
+
+        # add timestamp
+        if 'time' in point:
+            elements.append(point['time'])
+
+        line = ' '.join(elements)
+        lines.append(line)
+    lines = '\n'.join(lines)
+    return lines + '\n'