Merge "conf/testcases: add new continuous tests"
[vswitchperf.git] / tools / collectors / collectd / collectd.py
1 # Copyright 2017-2018 Spirent Communications.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #   http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """
16 Collects samples from collectd through collectd_bucky.
17 Depending on the policy - decides to keep the sample or discard.
18 Plot the values of the stored samples once the test is completed
19 """
20
21 import copy
22 import csv
23 import logging
24 import multiprocessing
25 import os
26 from collections import OrderedDict
27 import queue
28
29 import matplotlib.pyplot as plt
30 import numpy as np
31 import tools.collectors.collectd.collectd_bucky as cb
32 from tools.collectors.collector import collector
33 from conf import settings
34
35 # The y-lables. Keys in this dictionary are used as y-labels.
36 YLABELS = {'No/Of Packets': ['dropped', 'packets', 'if_octets', 'errors',
37                              'if_rx_octets', 'if_tx_octets'],
38            'Jiffies': ['cputime'],
39            'Bandwidth b/s': ['memory_bandwidth'],
40            'Bytes': ['bytes.llc']}
41
42
43 def get_label(sample):
44     """
45     Returns the y-label for the plot.
46     """
47     for label in YLABELS:
48         if any(r in sample for r in YLABELS[label]):
49             return label
50     return None
51
52 def plot_graphs(dict_of_arrays):
53     """
54     Plot the values
55     Store the data used for plotting.
56     """
57     i = 1
58     results_dir = settings.getValue('RESULTS_PATH')
59     for key in dict_of_arrays:
60         tup_list = dict_of_arrays[key]
61         two_lists = list(map(list, zip(*tup_list)))
62         y_axis_list = two_lists[0]
63         x_axis_list = two_lists[1]
64         if np.count_nonzero(y_axis_list) > 0:
65             with open(os.path.join(results_dir,
66                                    str(key) + '.data'), "w") as pfile:
67                 writer = csv.writer(pfile, delimiter='\t')
68                 writer.writerows(zip(x_axis_list, y_axis_list))
69             plt.figure(i)
70             plt.plot(x_axis_list, y_axis_list)
71             plt.xlabel("Time (Ticks)")
72             plt.ylabel(get_label(key))
73             plt.savefig(os.path.join(results_dir, str(key) + '.png'))
74             plt.cla()
75             plt.clf()
76             plt.close()
77             i = i + 1
78
79
80 def get_results_to_print(dict_of_arrays):
81     """
82     Return a results dictionary for report tool to
83     print the process-statistics.
84     """
85     presults = OrderedDict()
86     results = OrderedDict()
87     for key in dict_of_arrays:
88         if ('processes' in key and
89                 any(proc in key for proc in ['ovs', 'vpp', 'qemu'])):
90             reskey = '.'.join(key.split('.')[2:])
91             preskey = key.split('.')[1] + '_collectd'
92             tup_list = dict_of_arrays[key]
93             two_lists = list(map(list, zip(*tup_list)))
94             y_axis_list = two_lists[0]
95             mean = 0.0
96             if np.count_nonzero(y_axis_list) > 0:
97                 mean = np.mean(y_axis_list)
98             results[reskey] = mean
99             presults[preskey] = results
100     return presults
101
102
103 class Receiver(multiprocessing.Process):
104     """
105     Wrapper Receiver (of samples) class
106     """
107     def __init__(self, pd_dict, control):
108         """
109         Initialize.
110         A queue will be shared with collectd_bucky
111         """
112         super(Receiver, self).__init__()
113         self.daemon = False
114         self.q_of_samples = multiprocessing.Queue()
115         self.server = cb.get_collectd_server(self.q_of_samples)
116         self.control = control
117         self.pd_dict = pd_dict
118         self.collectd_cpu_keys = settings.getValue('COLLECTD_CPU_KEYS')
119         self.collectd_processes_keys = settings.getValue(
120             'COLLECTD_PROCESSES_KEYS')
121         self.collectd_iface_keys = settings.getValue(
122             'COLLECTD_INTERFACE_KEYS')
123         self.collectd_iface_xkeys = settings.getValue(
124             'COLLECTD_INTERFACE_XKEYS')
125         self.collectd_intelrdt_keys = settings.getValue(
126             'COLLECTD_INTELRDT_KEYS')
127         self.collectd_ovsstats_keys = settings.getValue(
128             'COLLECTD_OVSSTAT_KEYS')
129         self.collectd_dpdkstats_keys = settings.getValue(
130             'COLLECTD_DPDKSTAT_KEYS')
131         self.collectd_intelrdt_xkeys = settings.getValue(
132             'COLLECTD_INTELRDT_XKEYS')
133         self.exclude_coreids = []
134         # Expand the ranges in the intelrdt-xkeys
135         for xkey in self.collectd_intelrdt_xkeys:
136             if '-' not in xkey:
137                 self.exclude_coreids.append(int(xkey))
138             else:
139                 left, right = map(int, xkey.split('-'))
140                 self.exclude_coreids += range(left, right + 1)
141
142     def run(self):
143         """
144         Start receiving the samples.
145         """
146         while not self.control.value:
147             try:
148                 sample = self.q_of_samples.get(True, 1)
149                 if not sample:
150                     break
151                 self.handle(sample)
152             except queue.Empty:
153                 pass
154             except IOError:
155                 continue
156             except (ValueError, IndexError, KeyError, MemoryError):
157                 self.stop()
158                 break
159
160     # pylint: disable=too-many-boolean-expressions
161     def handle(self, sample):
162         ''' Store values and names if names matches following:
163             1. cpu + keys
164             2. processes + keys
165             3. interface + keys +  !xkeys
166             4. ovs_stats + keys
167             5. dpdkstat + keys
168             6. intel_rdt + keys + !xkeys
169             sample[1] is the name of the sample, which is . separated strings.
170             The first field in sample[1] is the type - cpu, proceesses, etc.
171             For intel_rdt, the second field contains the core-id, which is
172             used to make the decision on 'exclusions'
173             sample[0]: Contains the host information - which is not considered.
174             sample[2]: Contains the Value.
175             sample[3]: Contains the Time (in ticks)
176             '''
177         if (('cpu' in sample[1] and
178              any(c in sample[1] for c in self.collectd_cpu_keys)) or
179                 ('processes' in sample[1] and
180                  any(p in sample[1] for p in self.collectd_processes_keys)) or
181                 ('interface' in sample[1] and
182                  (any(i in sample[1] for i in self.collectd_iface_keys) and
183                   any(x not in sample[1]
184                       for x in self.collectd_iface_xkeys))) or
185                 ('ovs_stats' in sample[1] and
186                  any(o in sample[1] for o in self.collectd_ovsstats_keys)) or
187                 ('dpdkstat' in sample[1] and
188                  any(d in sample[1] for d in self.collectd_dpdkstats_keys)) or
189                 ('intel_rdt' in sample[1] and
190                  any(r in sample[1] for r in self.collectd_intelrdt_keys) and
191                  (int(sample[1].split('.')[1]) not in self.exclude_coreids))):
192             if sample[1] not in self.pd_dict:
193                 self.pd_dict[sample[1]] = list()
194             val = self.pd_dict[sample[1]]
195             val.append((sample[2], sample[3]))
196             self.pd_dict[sample[1]] = val
197
198     def stop(self):
199         """
200         Stop receiving the samples.
201         """
202         self.server.close()
203         self.q_of_samples.put(None)
204         self.control.value = True
205
206
207 # inherit from collector.Icollector.
208 class Collectd(collector.ICollector):
209     """A collector of system statistics based on collectd
210
211     It starts a UDP server, receives metrics from collectd
212     and plot the results.
213     """
214
215     def __init__(self, results_dir, test_name):
216         """
217         Initialize collection of statistics
218         """
219         self._log = os.path.join(results_dir,
220                                  settings.getValue('LOG_FILE_COLLECTD') +
221                                  '_' + test_name + '.log')
222         self.results = {}
223         self.sample_dict = multiprocessing.Manager().dict()
224         self.control = multiprocessing.Value('b', False)
225         self.receiver = Receiver(self.sample_dict, self.control)
226
227     def start(self):
228         """
229         Start receiving samples
230         """
231         self.receiver.server.start()
232         self.receiver.start()
233
234     def stop(self):
235         """
236         Stop receiving samples
237         """
238         self.control.value = True
239         self.receiver.stop()
240         self.receiver.server.join(5)
241         self.receiver.join(5)
242         if self.receiver.server.is_alive():
243             self.receiver.server.terminate()
244         if self.receiver.is_alive():
245             self.receiver.terminate()
246         self.results = copy.deepcopy(self.sample_dict)
247
248     def get_results(self):
249         """
250         Return the results.
251         """
252         return get_results_to_print(self.results)
253
254     def print_results(self):
255         """
256         Print - Plot and save raw-data.
257         log the collected statistics
258         """
259         plot_graphs(self.results)
260         proc_stats = get_results_to_print(self.results)
261         for process in proc_stats:
262             logging.info("Process: %s", '_'.join(process.split('_')[:-1]))
263             for(key, value) in proc_stats[process].items():
264                 logging.info("         Statistic: " + str(key) +
265                              ", Value: " + str(value))