5a5af003bbc1651ff881ceeaa9a7aa34fba465cb
[yardstick.git] / yardstick / dispatcher / influxdb.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 from __future__ import absolute_import
11
12 import logging
13 import os
14 import time
15
16 import collections
17 import requests
18 import six
19 from oslo_config import cfg
20 from oslo_serialization import jsonutils
21
22 from third_party.influxdb.influxdb_line_protocol import make_lines
23 from yardstick.dispatcher.base import Base as DispatchBase
24
25 LOG = logging.getLogger(__name__)
26
27 CONF = cfg.CONF
28 influx_dispatcher_opts = [
29     cfg.StrOpt('target',
30                default='http://127.0.0.1:8086',
31                help='The target where the http request will be sent. '
32                     'If this is not set, no data will be posted. For '
33                     'example: target = http://hostname:1234/path'),
34     cfg.StrOpt('db_name',
35                default='yardstick',
36                help='The database name to store test results.'),
37     cfg.StrOpt('username',
38                default='root',
39                help='The user name to access database.'),
40     cfg.StrOpt('password',
41                default='root',
42                help='The user password to access database.'),
43     cfg.IntOpt('timeout',
44                default=5,
45                help='The max time in seconds to wait for a request to '
46                     'timeout.'),
47 ]
48
49 CONF.register_opts(influx_dispatcher_opts, group="dispatcher_influxdb")
50
51
52 class InfluxdbDispatcher(DispatchBase):
53     """Dispatcher class for posting data into an influxdb target.
54     """
55
56     __dispatcher_type__ = "Influxdb"
57
58     def __init__(self, conf):
59         super(InfluxdbDispatcher, self).__init__(conf)
60         self.timeout = CONF.dispatcher_influxdb.timeout
61         self.target = CONF.dispatcher_influxdb.target
62         self.db_name = CONF.dispatcher_influxdb.db_name
63         self.username = CONF.dispatcher_influxdb.username
64         self.password = CONF.dispatcher_influxdb.password
65         self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name)
66         self.raw_result = []
67         self.case_name = ""
68         self.tc = ""
69         self.task_id = -1
70         self.runners_info = {}
71         self.static_tags = {
72             "pod_name": os.environ.get('NODE_NAME', 'unknown'),
73             "installer": os.environ.get('INSTALLER_TYPE', 'unknown'),
74             "deploy_scenario": os.environ.get('DEPLOY_SCENARIO', 'unknown'),
75             "version": os.path.basename(os.environ.get('YARDSTICK_BRANCH',
76                                                        'unknown'))
77
78         }
79
80     def _dict_key_flatten(self, data):
81         next_data = {}
82
83         # use list, because iterable is too generic
84         if not [v for v in data.values() if
85                 isinstance(v, (collections.Mapping, list))]:
86             return data
87
88         for k, v in six.iteritems(data):
89             if isinstance(v, collections.Mapping):
90                 for n_k, n_v in six.iteritems(v):
91                     next_data["%s.%s" % (k, n_k)] = n_v
92             # use list because iterable is too generic
93             elif isinstance(v, list):
94                 for index, item in enumerate(v):
95                     next_data["%s%d" % (k, index)] = item
96             else:
97                 next_data[k] = v
98
99         return self._dict_key_flatten(next_data)
100
101     def _get_nano_timestamp(self, results):
102         try:
103             timestamp = results["benchmark"]["timestamp"]
104         except Exception:
105             timestamp = time.time()
106
107         return str(int(float(timestamp) * 1000000000))
108
109     def _get_extended_tags(self, data):
110         runner_info = self.runners_info[data["runner_id"]]
111         tags = {
112             "runner_id": data["runner_id"],
113             "task_id": self.task_id,
114             "scenarios": runner_info["scenarios"]
115         }
116         if "host" in runner_info:
117             tags["host"] = runner_info["host"]
118         if "target" in runner_info:
119             tags["target"] = runner_info["target"]
120
121         return tags
122
123     def _data_to_line_protocol(self, data):
124         msg = {}
125         point = {
126             "measurement": self.tc,
127             "fields": self._dict_key_flatten(data["benchmark"]["data"]),
128             "time": self._get_nano_timestamp(data),
129             "tags": self._get_extended_tags(data),
130         }
131         msg["points"] = [point]
132         msg["tags"] = self.static_tags
133
134         return make_lines(msg).encode('utf-8')
135
136     def record_result_data(self, data):
137         LOG.debug('Test result : %s', jsonutils.dump_as_bytes(data))
138         self.raw_result.append(data)
139         if self.target == '':
140             # if the target was not set, do not do anything
141             LOG.error('Dispatcher target was not set, no data will'
142                       'be posted.')
143             return -1
144
145         if isinstance(data, dict) and "scenario_cfg" in data:
146             self.tc = data["scenario_cfg"]["tc"]
147             self.task_id = data["scenario_cfg"]["task_id"]
148             scenario_cfg = data["scenario_cfg"]
149             runner_id = data["runner_id"]
150             self.runners_info[runner_id] = {"scenarios": scenario_cfg["type"]}
151             if "host" in scenario_cfg:
152                 self.runners_info[runner_id]["host"] = scenario_cfg["host"]
153             if "target" in scenario_cfg:
154                 self.runners_info[runner_id]["target"] = scenario_cfg["target"]
155             return 0
156
157         if self.tc == "":
158             LOG.error('Test result : %s', jsonutils.dump_as_bytes(data))
159             LOG.error('The case_name cannot be found, no data will be posted.')
160             return -1
161
162         try:
163             line = self._data_to_line_protocol(data)
164             LOG.debug('Test result line format : %s', line)
165             res = requests.post(self.influxdb_url,
166                                 data=line,
167                                 auth=(self.username, self.password),
168                                 timeout=self.timeout)
169             if res.status_code != 204:
170                 LOG.error('Test result posting finished with status code'
171                           ' %d.', res.status_code)
172                 LOG.error(res.text)
173
174         except Exception as err:
175             LOG.exception('Failed to record result data: %s',
176                           err)
177             return -1
178         return 0
179
180     def flush_result_data(self):
181         LOG.debug('Test result all : %s',
182                   jsonutils.dump_as_bytes(self.raw_result))
183         return 0