Initial InfluxDB dispatcher
[yardstick.git] / yardstick / dispatcher / influxdb_line_protocol.py
1 # yardstick comment: this file is a modified copy of
2 # influxdb-python/influxdb/line_protocol.py
3
4 from __future__ import unicode_literals
5 from copy import copy
6
7 from six import binary_type, text_type, integer_types
8
9
10 def _escape_tag(tag):
11     tag = _get_unicode(tag, force=True)
12     return tag.replace(
13         "\\", "\\\\"
14     ).replace(
15         " ", "\\ "
16     ).replace(
17         ",", "\\,"
18     ).replace(
19         "=", "\\="
20     )
21
22
23 def _escape_value(value):
24     value = _get_unicode(value)
25     if isinstance(value, text_type) and value != '':
26         return "\"{}\"".format(
27             value.replace(
28                 "\"", "\\\""
29             ).replace(
30                 "\n", "\\n"
31             )
32         )
33     elif isinstance(value, integer_types) and not isinstance(value, bool):
34         return str(value) + 'i'
35     else:
36         return str(value)
37
38
39 def _get_unicode(data, force=False):
40     """
41     Try to return a text aka unicode object from the given data.
42     """
43     if isinstance(data, binary_type):
44         return data.decode('utf-8')
45     elif data is None:
46         return ''
47     elif force:
48         return str(data)
49     else:
50         return data
51
52
53 def make_lines(data):
54     """
55     Extracts the points from the given dict and returns a Unicode string
56     matching the line protocol introduced in InfluxDB 0.9.0.
57
58     line protocol format:
59         <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>\
60             [,<field2-key>=<field2-value>...] [unix-nano-timestamp]
61
62     Ref:
63         https://influxdb.com/docs/v0.9/write_protocols/write_syntax.html
64         https://influxdb.com/docs/v0.9/write_protocols/line.html
65     """
66     lines = []
67     static_tags = data.get('tags', None)
68     for point in data['points']:
69         elements = []
70
71         # add measurement name
72         measurement = _escape_tag(_get_unicode(
73             point.get('measurement', data.get('measurement'))
74         ))
75         key_values = [measurement]
76
77         # add tags
78         if static_tags is None:
79             tags = point.get('tags', {})
80         else:
81             tags = copy(static_tags)
82             tags.update(point.get('tags', {}))
83
84         # tags should be sorted client-side to take load off server
85         for tag_key in sorted(tags.keys()):
86             key = _escape_tag(tag_key)
87             value = _escape_tag(tags[tag_key])
88
89             if key != '' and value != '':
90                 key_values.append("{key}={value}".format(key=key, value=value))
91         key_values = ','.join(key_values)
92         elements.append(key_values)
93
94         # add fields
95         field_values = []
96         for field_key in sorted(point['fields'].keys()):
97             key = _escape_tag(field_key)
98             value = _escape_value(point['fields'][field_key])
99             if key != '' and value != '':
100                 field_values.append("{key}={value}".format(
101                     key=key,
102                     value=value
103                 ))
104         field_values = ','.join(field_values)
105         elements.append(field_values)
106
107         # add timestamp
108         if 'time' in point:
109             elements.append(point['time'])
110
111         line = ' '.join(elements)
112         lines.append(line)
113     lines = '\n'.join(lines)
114     return lines + '\n'