1 # Copyright 2017-2018 Spirent Communications.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 Collects samples from collectd through collectd_bucky.
17 Depending on the policy - decides to keep the sample or discard.
18 Plot the values of the stored samples once the test is completed
import copy
import csv
import glob
import logging
import multiprocessing
import os
import queue

from collections import OrderedDict

import matplotlib.pyplot as plt
import numpy as np

import tools.collectors.collectd.collectd_bucky as cb
from tools.collectors.collector import collector
from tools import tasks
from conf import settings
# The y-labels. Keys in this dictionary are used as y-labels.
# Each key maps to the list of metric-name substrings that should be
# plotted with that label (see get_label()).
YLABELS = {'No/Of Packets': ['dropped', 'packets', 'if_octets', 'errors',
                             'if_rx_octets', 'if_tx_octets'],
           'Jiffies': ['cputime'],
           'Bandwidth b/s': ['memory_bandwidth'],
           'Bytes': ['bytes.llc']}
def get_label(sample):
    """
    Return the y-label for the plot.

    :param sample: metric-name string (dot separated collectd name)
    :return: the first YLABELS key whose substring list matches
             ``sample``, or None when nothing matches.
    """
    for label in YLABELS:
        if any(r in sample for r in YLABELS[label]):
            return label
    # No known substring matched - caller gets an unlabeled axis.
    return None
def plot_graphs(dict_of_arrays):
    """
    Plot the values of the stored samples and
    store the data used for plotting.

    :param dict_of_arrays: mapping of metric-name -> list of
                           (value, time-in-ticks) tuples.

    Writes one tab-separated ``<key>.data`` file and one ``<key>.png``
    plot per metric into RESULTS_PATH; metrics whose values are all
    zero are skipped.
    """
    results_dir = settings.getValue('RESULTS_PATH')
    for key in dict_of_arrays:
        tup_list = dict_of_arrays[key]
        # Transpose [(value, time), ...] into ([values], [times]).
        two_lists = list(map(list, zip(*tup_list)))
        y_axis_list = two_lists[0]
        x_axis_list = two_lists[1]
        if np.count_nonzero(y_axis_list) > 0:
            # Persist raw samples as tab-separated data for the report.
            with open(os.path.join(results_dir,
                                   str(key) + '.data'), "w") as pfile:
                writer = csv.writer(pfile, delimiter='\t')
                writer.writerows(zip(x_axis_list, y_axis_list))
            # Fresh figure per metric so plots do not overlay each other.
            plt.figure()
            plt.plot(x_axis_list, y_axis_list)
            plt.xlabel("Time (Ticks)")
            plt.ylabel(get_label(key))
            plt.savefig(os.path.join(results_dir, str(key) + '.png'))
            plt.close()
def get_results_to_print(dict_of_arrays):
    """
    Return a results dictionary for report tool to
    print the process-statistics.

    Only 'processes' metrics belonging to ovs, vpp or qemu are kept.
    Metric names are '.'-separated: field [1] is the process name and
    fields [2:] form the statistic name.

    :param dict_of_arrays: mapping of metric-name -> list of
                           (value, time) tuples.
    :return: OrderedDict of '<process>_collectd' -> OrderedDict of
             statistic-name -> mean value.
    """
    presults = OrderedDict()
    for key in dict_of_arrays:
        if ('processes' in key and
                any(proc in key for proc in ['ovs', 'vpp', 'qemu'])):
            reskey = '.'.join(key.split('.')[2:])
            preskey = key.split('.')[1] + '_collectd'
            tup_list = dict_of_arrays[key]
            two_lists = list(map(list, zip(*tup_list)))
            y_axis_list = two_lists[0]
            if np.count_nonzero(y_axis_list) > 0:
                # One stats dict per process: the original shared a single
                # dict across every preskey, merging all processes' stats.
                results = presults.setdefault(preskey, OrderedDict())
                results[reskey] = np.mean(y_axis_list)
    return presults
class Receiver(multiprocessing.Process):
    """
    Wrapper Receiver (of samples) class.

    Runs as a separate process: pulls samples from a queue shared with
    the collectd_bucky UDP server and stores the interesting ones into
    a shared dictionary, depending on the configured key/exclude-key
    policies.
    """
    def __init__(self, pd_dict, control):
        """
        Initialize the receiver.
        A queue will be shared with collectd_bucky.

        :param pd_dict: shared (Manager) dict where accepted samples go.
        :param control: shared boolean Value; True stops the run() loop.
        """
        super(Receiver, self).__init__()
        self.q_of_samples = multiprocessing.Queue()
        self.server = cb.get_collectd_server(self.q_of_samples)
        self.control = control
        self.pd_dict = pd_dict
        self.collectd_cpu_keys = settings.getValue('COLLECTD_CPU_KEYS')
        self.collectd_processes_keys = settings.getValue(
            'COLLECTD_PROCESSES_KEYS')
        self.collectd_iface_keys = settings.getValue(
            'COLLECTD_INTERFACE_KEYS')
        self.collectd_iface_xkeys = settings.getValue(
            'COLLECTD_INTERFACE_XKEYS')
        self.collectd_intelrdt_keys = settings.getValue(
            'COLLECTD_INTELRDT_KEYS')
        self.collectd_ovsstats_keys = settings.getValue(
            'COLLECTD_OVSSTAT_KEYS')
        self.collectd_dpdkstats_keys = settings.getValue(
            'COLLECTD_DPDKSTAT_KEYS')
        self.collectd_intelrdt_xkeys = settings.getValue(
            'COLLECTD_INTELRDT_XKEYS')
        self.exclude_coreids = []
        # Expand the ranges (e.g. "2-5") in the intelrdt-xkeys into
        # individual core-ids.
        for xkey in self.collectd_intelrdt_xkeys:
            if '-' not in xkey:
                self.exclude_coreids.append(int(xkey))
            else:
                left, right = map(int, xkey.split('-'))
                self.exclude_coreids += range(left, right + 1)

    def run(self):
        """
        Start receiving the samples.

        Loop until the shared control flag is set; a None sample also
        terminates the loop (see stop()).
        """
        while not self.control.value:
            try:
                sample = self.q_of_samples.get(True, 1)
                if not sample:
                    break
                self.handle(sample)
            except queue.Empty:
                # Timed out waiting for a sample - re-check control flag.
                pass
            except IOError:
                continue
            except (ValueError, IndexError, KeyError, MemoryError):
                self.stop()
                break

    # pylint: disable=too-many-boolean-expressions
    def handle(self, sample):
        ''' Store values and names if names matches following:
        1. cpu + keys
        2. processes + keys
        3. interface + keys + !xkeys
        4. ovs_stats + keys
        5. dpdkstat + keys
        6. intel_rdt + keys + !xkeys
        sample[1] is the name of the sample, which is . separated strings.
        The first field in sample[1] is the type - cpu, proceesses, etc.
        For intel_rdt, the second field contains the core-id, which is
        used to make the decision on 'exclusions'
        sample[0]: Contains the host information - which is not considered.
        sample[2]: Contains the Value.
        sample[3]: Contains the Time (in ticks)
        '''
        if (('cpu' in sample[1] and
             any(c in sample[1] for c in self.collectd_cpu_keys)) or
                ('processes' in sample[1] and
                 any(p in sample[1]
                     for p in self.collectd_processes_keys)) or
                ('interface' in sample[1] and
                 (any(i in sample[1] for i in self.collectd_iface_keys) and
                  # Exclusion: accept only when NO xkey matches. The
                  # original used any(), which accepted a sample as soon
                  # as a single xkey was absent.
                  all(x not in sample[1]
                      for x in self.collectd_iface_xkeys))) or
                ('ovs_stats' in sample[1] and
                 any(o in sample[1] for o in self.collectd_ovsstats_keys)) or
                ('dpdkstat' in sample[1] and
                 any(d in sample[1] for d in self.collectd_dpdkstats_keys)) or
                ('intel_rdt' in sample[1] and
                 any(r in sample[1] for r in self.collectd_intelrdt_keys) and
                 (int(sample[1].split('.')[1]) not in self.exclude_coreids))):
            if sample[1] not in self.pd_dict:
                self.pd_dict[sample[1]] = list()
            # Manager dicts don't see in-place mutation of values:
            # read-modify-write the whole list back.
            val = self.pd_dict[sample[1]]
            val.append((sample[2], sample[3]))
            self.pd_dict[sample[1]] = val
            logging.debug("COLLECTD %s", ' '.join(str(p) for p in sample))

    def stop(self):
        """
        Stop receiving the samples.
        """
        self.server.terminate()
        # Sentinel unblocks a run() loop waiting in q_of_samples.get().
        self.q_of_samples.put(None)
        self.control.value = True
# Inherits from collector.ICollector.
class Collectd(collector.ICollector):
    """A collector of system statistics based on collectd

    It starts a UDP server, receives metrics from collectd
    and plot the results.
    """

    def __init__(self, results_dir, test_name):
        """
        Initialize collection of statistics.

        :param results_dir: directory where plots/data files are written.
        :param test_name: name of the test being measured.
        """
        self.logger = logging.getLogger(__name__)
        self.resultsdir = results_dir
        self.testname = test_name
        # Defined up front so get_results() is safe before stop() runs.
        self.results = OrderedDict()
        self.sample_dict = multiprocessing.Manager().dict()
        self.control = multiprocessing.Value('b', False)
        self.receiver = Receiver(self.sample_dict, self.control)
        self.cleanup_metrics()
        # Assumption: collectd is installed at /opt/collectd
        # and collectd is configured to write to csv at /tmp/csv
        self.pid = tasks.run_background_task(
            ['sudo', '/opt/collectd/sbin/collectd'],
            self.logger, 'Starting Collectd')

    def cleanup_metrics(self):
        """
        Clean up the old or archived metrics under /tmp/csv/.
        """
        for name in glob.glob(os.path.join('/tmp/csv/', '*')):
            tasks.run_task(['sudo', 'rm', '-rf', name], self.logger,
                           'Cleaning up Metrics', True)

    def start(self):
        """
        Start receiving samples.
        """
        self.receiver.server.start()
        self.receiver.start()

    def stop(self):
        """
        Stop receiving samples.

        Terminates collectd, stops the receiver process and the UDP
        server, snapshots the collected samples and archives the raw
        collectd csv metrics.
        """
        tasks.terminate_task_subtree(self.pid, logger=self.logger)
        # At times collectd fails to fully terminate.
        # Killing process by name too helps.
        tasks.run_task(['sudo', 'pkill', '--signal', '2', 'collectd'],
                       self.logger, 'Stopping Collectd', True)
        self.control.value = True
        self.receiver.server.join(5)
        self.receiver.join(5)
        # Force-kill anything that did not exit within the timeout.
        if self.receiver.server.is_alive():
            self.receiver.server.terminate()
        if self.receiver.is_alive():
            self.receiver.terminate()
        # Snapshot before the Manager process goes away with the dict.
        self.results = copy.deepcopy(self.sample_dict)
        # Backup the collectd-metrics for this test into a zipfile
        filename = ('/tmp/collectd-' + settings.getValue('LOG_TIMESTAMP') +
                    '.tar.gz')
        tasks.run_task(['sudo', 'tar', '-czvf', filename, '/tmp/csv/'],
                       self.logger, 'Zipping File', True)
        self.cleanup_metrics()

    def get_results(self):
        """
        Return the collected process statistics for the report tool.
        """
        return get_results_to_print(self.results)

    def print_results(self):
        """
        Print - Plot and save raw-data.
        log the collected statistics
        """
        plot_graphs(self.results)
        proc_stats = get_results_to_print(self.results)
        for process in proc_stats:
            logging.info("Process: %s", '_'.join(process.split('_')[:-1]))
            for (key, value) in proc_stats[process].items():
                # Lazy %-formatting: only built when the record is emitted.
                logging.info(" Statistic: %s, Value: %s", key, value)