Collectd: Additional metrics storing options
[vswitchperf.git] / tools / collectors / collectd / collectd.py
1 # Copyright 2017-2018 Spirent Communications.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #   http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """
16 Collects samples from collectd through collectd_bucky.
17 Depending on the policy - decides to keep the sample or discard.
18 Plot the values of the stored samples once the test is completed
19 """
20
21 import copy
22 import csv
23 import glob
24 import logging
25 import multiprocessing
26 import os
27 from collections import OrderedDict
28 import queue
29
30 import matplotlib.pyplot as plt
31 import numpy as np
32 import tools.collectors.collectd.collectd_bucky as cb
33 from tools.collectors.collector import collector
34 from tools import tasks
35 from conf import settings
36
37 # The y-lables. Keys in this dictionary are used as y-labels.
38 YLABELS = {'No/Of Packets': ['dropped', 'packets', 'if_octets', 'errors',
39                              'if_rx_octets', 'if_tx_octets'],
40            'Jiffies': ['cputime'],
41            'Bandwidth b/s': ['memory_bandwidth'],
42            'Bytes': ['bytes.llc']}
43
44
45 def get_label(sample):
46     """
47     Returns the y-label for the plot.
48     """
49     for label in YLABELS:
50         if any(r in sample for r in YLABELS[label]):
51             return label
52     return None
53
54
55 def plot_graphs(dict_of_arrays):
56     """
57     Plot the values
58     Store the data used for plotting.
59     """
60     i = 1
61     results_dir = settings.getValue('RESULTS_PATH')
62     for key in dict_of_arrays:
63         tup_list = dict_of_arrays[key]
64         two_lists = list(map(list, zip(*tup_list)))
65         y_axis_list = two_lists[0]
66         x_axis_list = two_lists[1]
67         if np.count_nonzero(y_axis_list) > 0:
68             with open(os.path.join(results_dir,
69                                    str(key) + '.data'), "w") as pfile:
70                 writer = csv.writer(pfile, delimiter='\t')
71                 writer.writerows(zip(x_axis_list, y_axis_list))
72             plt.figure(i)
73             plt.plot(x_axis_list, y_axis_list)
74             plt.xlabel("Time (Ticks)")
75             plt.ylabel(get_label(key))
76             plt.savefig(os.path.join(results_dir, str(key) + '.png'))
77             plt.cla()
78             plt.clf()
79             plt.close()
80             i = i + 1
81
82
83 def get_results_to_print(dict_of_arrays):
84     """
85     Return a results dictionary for report tool to
86     print the process-statistics.
87     """
88     presults = OrderedDict()
89     results = OrderedDict()
90     for key in dict_of_arrays:
91         if ('processes' in key and
92                 any(proc in key for proc in ['ovs', 'vpp', 'qemu'])):
93             reskey = '.'.join(key.split('.')[2:])
94             preskey = key.split('.')[1] + '_collectd'
95             tup_list = dict_of_arrays[key]
96             two_lists = list(map(list, zip(*tup_list)))
97             y_axis_list = two_lists[0]
98             mean = 0.0
99             if np.count_nonzero(y_axis_list) > 0:
100                 mean = np.mean(y_axis_list)
101             results[reskey] = mean
102             presults[preskey] = results
103     return presults
104
105
106 class Receiver(multiprocessing.Process):
107     """
108     Wrapper Receiver (of samples) class
109     """
110     def __init__(self, pd_dict, control):
111         """
112         Initialize.
113         A queue will be shared with collectd_bucky
114         """
115         super(Receiver, self).__init__()
116         self.daemon = False
117         self.q_of_samples = multiprocessing.Queue()
118         self.server = cb.get_collectd_server(self.q_of_samples)
119         self.control = control
120         self.pd_dict = pd_dict
121         self.collectd_cpu_keys = settings.getValue('COLLECTD_CPU_KEYS')
122         self.collectd_processes_keys = settings.getValue(
123             'COLLECTD_PROCESSES_KEYS')
124         self.collectd_iface_keys = settings.getValue(
125             'COLLECTD_INTERFACE_KEYS')
126         self.collectd_iface_xkeys = settings.getValue(
127             'COLLECTD_INTERFACE_XKEYS')
128         self.collectd_intelrdt_keys = settings.getValue(
129             'COLLECTD_INTELRDT_KEYS')
130         self.collectd_ovsstats_keys = settings.getValue(
131             'COLLECTD_OVSSTAT_KEYS')
132         self.collectd_dpdkstats_keys = settings.getValue(
133             'COLLECTD_DPDKSTAT_KEYS')
134         self.collectd_intelrdt_xkeys = settings.getValue(
135             'COLLECTD_INTELRDT_XKEYS')
136         self.exclude_coreids = []
137         # Expand the ranges in the intelrdt-xkeys
138         for xkey in self.collectd_intelrdt_xkeys:
139             if '-' not in xkey:
140                 self.exclude_coreids.append(int(xkey))
141             else:
142                 left, right = map(int, xkey.split('-'))
143                 self.exclude_coreids += range(left, right + 1)
144
145     def run(self):
146         """
147         Start receiving the samples.
148         """
149         while not self.control.value:
150             try:
151                 sample = self.q_of_samples.get(True, 1)
152                 if not sample:
153                     break
154                 self.handle(sample)
155             except queue.Empty:
156                 pass
157             except IOError:
158                 continue
159             except (ValueError, IndexError, KeyError, MemoryError):
160                 self.stop()
161                 break
162
163     # pylint: disable=too-many-boolean-expressions
164     def handle(self, sample):
165         ''' Store values and names if names matches following:
166             1. cpu + keys
167             2. processes + keys
168             3. interface + keys +  !xkeys
169             4. ovs_stats + keys
170             5. dpdkstat + keys
171             6. intel_rdt + keys + !xkeys
172             sample[1] is the name of the sample, which is . separated strings.
173             The first field in sample[1] is the type - cpu, proceesses, etc.
174             For intel_rdt, the second field contains the core-id, which is
175             used to make the decision on 'exclusions'
176             sample[0]: Contains the host information - which is not considered.
177             sample[2]: Contains the Value.
178             sample[3]: Contains the Time (in ticks)
179             '''
180         if (('cpu' in sample[1] and
181              any(c in sample[1] for c in self.collectd_cpu_keys)) or
182                 ('processes' in sample[1] and
183                  any(p in sample[1] for p in self.collectd_processes_keys)) or
184                 ('interface' in sample[1] and
185                  (any(i in sample[1] for i in self.collectd_iface_keys) and
186                   any(x not in sample[1]
187                       for x in self.collectd_iface_xkeys))) or
188                 ('ovs_stats' in sample[1] and
189                  any(o in sample[1] for o in self.collectd_ovsstats_keys)) or
190                 ('dpdkstat' in sample[1] and
191                  any(d in sample[1] for d in self.collectd_dpdkstats_keys)) or
192                 ('intel_rdt' in sample[1] and
193                  any(r in sample[1] for r in self.collectd_intelrdt_keys) and
194                  (int(sample[1].split('.')[1]) not in self.exclude_coreids))):
195             if sample[1] not in self.pd_dict:
196                 self.pd_dict[sample[1]] = list()
197             val = self.pd_dict[sample[1]]
198             val.append((sample[2], sample[3]))
199             self.pd_dict[sample[1]] = val
200             logging.debug("COLLECTD  %s", ' '.join(str(p) for p in sample))
201
202     def stop(self):
203         """
204         Stop receiving the samples.
205         """
206         self.server.close()
207         self.q_of_samples.put(None)
208         self.control.value = True
209
210
211 # inherit from collector.Icollector.
212 class Collectd(collector.ICollector):
213     """A collector of system statistics based on collectd
214
215     It starts a UDP server, receives metrics from collectd
216     and plot the results.
217     """
218
219     def __init__(self, results_dir, test_name):
220         """
221         Initialize collection of statistics
222         """
223         self.logger = logging.getLogger(__name__)
224         self.resultsdir = results_dir
225         self.testname = test_name
226         self.results = {}
227         self.sample_dict = multiprocessing.Manager().dict()
228         self.control = multiprocessing.Value('b', False)
229         self.receiver = Receiver(self.sample_dict, self.control)
230         self.cleanup_metrics()
231         # Assumption: collected is installed at /opt/collectd
232         # And collected is configured to write to csv at /tmp/csv
233         self.pid = tasks.run_background_task(
234             ['sudo', '/opt/collectd/sbin/collectd'],
235             self.logger, 'Staring Collectd')
236
237     def cleanup_metrics(self):
238         """
239         Cleaup the old or archived metrics
240         """
241         for name in glob.glob(os.path.join('/tmp/csv/', '*')):
242             tasks.run_task(['sudo', 'rm', '-rf', name], self.logger,
243                            'Cleaning up Metrics', True)
244
245     def start(self):
246         """
247         Start receiving samples
248         """
249         self.receiver.server.start()
250         self.receiver.start()
251
252     def stop(self):
253         """
254         Stop receiving samples
255         """
256         tasks.terminate_task_subtree(self.pid, logger=self.logger)
257         # At times collectd fails to fully terminate.
258         # Killing process by name too helps.
259         tasks.run_task(['sudo', 'pkill', '--signal', '2', 'collectd'],
260                        self.logger, 'Stopping Collectd', True)
261         self.control.value = True
262         self.receiver.stop()
263         self.receiver.server.join(5)
264         self.receiver.join(5)
265         if self.receiver.server.is_alive():
266             self.receiver.server.terminate()
267         if self.receiver.is_alive():
268             self.receiver.terminate()
269         self.results = copy.deepcopy(self.sample_dict)
270         # Backup the collectd-metrics for this test into a zipfile
271         filename = ('/tmp/collectd-' + settings.getValue('LOG_TIMESTAMP') +
272                     '.tar.gz')
273         tasks.run_task(['sudo', 'tar', '-czvf', filename, '/tmp/csv/'],
274                        self.logger, 'Zipping File', True)
275         self.cleanup_metrics()
276
277     def get_results(self):
278         """
279         Return the results.
280         """
281         return get_results_to_print(self.results)
282
283     def print_results(self):
284         """
285         Print - Plot and save raw-data.
286         log the collected statistics
287         """
288         plot_graphs(self.results)
289         proc_stats = get_results_to_print(self.results)
290         for process in proc_stats:
291             logging.info("Process: %s", '_'.join(process.split('_')[:-1]))
292             for(key, value) in proc_stats[process].items():
293                 logging.info("         Statistic: " + str(key) +
294                              ", Value: " + str(value))