1 # Copyright 2017-2018 Spirent Communications.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 Collects samples from collectd through collectd_bucky.
17 Depending on the policy - decides to keep the sample or discard.
18 Plot the values of the stored samples once the test is completed
24 import multiprocessing
26 from collections import OrderedDict
29 import matplotlib.pyplot as plt
31 import tools.collectors.collectd.collectd_bucky as cb
32 from tools.collectors.collector import collector
33 from conf import settings
35 # The y-lables. Keys in this dictionary are used as y-labels.
36 YLABELS = {'No/Of Packets': ['dropped', 'packets', 'if_octets', 'errors',
37 'if_rx_octets', 'if_tx_octets'],
38 'Jiffies': ['cputime'],
39 'Bandwidth b/s': ['memory_bandwidth'],
40 'Bytes': ['bytes.llc']}
43 def get_label(sample):
45 Returns the y-label for the plot.
48 if any(r in sample for r in YLABELS[label]):
52 def plot_graphs(dict_of_arrays):
55 Store the data used for plotting.
58 results_dir = settings.getValue('RESULTS_PATH')
59 for key in dict_of_arrays:
60 tup_list = dict_of_arrays[key]
61 two_lists = list(map(list, zip(*tup_list)))
62 y_axis_list = two_lists[0]
63 x_axis_list = two_lists[1]
64 if np.count_nonzero(y_axis_list) > 0:
65 with open(os.path.join(results_dir,
66 str(key) + '.data'), "w") as pfile:
67 writer = csv.writer(pfile, delimiter='\t')
68 writer.writerows(zip(x_axis_list, y_axis_list))
70 plt.plot(x_axis_list, y_axis_list)
71 plt.xlabel("Time (Ticks)")
72 plt.ylabel(get_label(key))
73 plt.savefig(os.path.join(results_dir, str(key) + '.png'))
80 def get_results_to_print(dict_of_arrays):
82 Return a results dictionary for report tool to
83 print the process-statistics.
85 presults = OrderedDict()
86 results = OrderedDict()
87 for key in dict_of_arrays:
88 if ('processes' in key and
89 any(proc in key for proc in ['ovs', 'vpp', 'qemu'])):
90 reskey = '.'.join(key.split('.')[2:])
91 preskey = key.split('.')[1] + '_collectd'
92 tup_list = dict_of_arrays[key]
93 two_lists = list(map(list, zip(*tup_list)))
94 y_axis_list = two_lists[0]
96 if np.count_nonzero(y_axis_list) > 0:
97 mean = np.mean(y_axis_list)
98 results[reskey] = mean
99 presults[preskey] = results
103 class Receiver(multiprocessing.Process):
105 Wrapper Receiver (of samples) class
107 def __init__(self, pd_dict, control):
110 A queue will be shared with collectd_bucky
112 super(Receiver, self).__init__()
114 self.q_of_samples = multiprocessing.Queue()
115 self.server = cb.get_collectd_server(self.q_of_samples)
116 self.control = control
117 self.pd_dict = pd_dict
118 self.collectd_cpu_keys = settings.getValue('COLLECTD_CPU_KEYS')
119 self.collectd_processes_keys = settings.getValue(
120 'COLLECTD_PROCESSES_KEYS')
121 self.collectd_iface_keys = settings.getValue(
122 'COLLECTD_INTERFACE_KEYS')
123 self.collectd_iface_xkeys = settings.getValue(
124 'COLLECTD_INTERFACE_XKEYS')
125 self.collectd_intelrdt_keys = settings.getValue(
126 'COLLECTD_INTELRDT_KEYS')
127 self.collectd_ovsstats_keys = settings.getValue(
128 'COLLECTD_OVSSTAT_KEYS')
129 self.collectd_dpdkstats_keys = settings.getValue(
130 'COLLECTD_DPDKSTAT_KEYS')
131 self.collectd_intelrdt_xkeys = settings.getValue(
132 'COLLECTD_INTELRDT_XKEYS')
133 self.exclude_coreids = []
134 # Expand the ranges in the intelrdt-xkeys
135 for xkey in self.collectd_intelrdt_xkeys:
137 self.exclude_coreids.append(int(xkey))
139 left, right = map(int, xkey.split('-'))
140 self.exclude_coreids += range(left, right + 1)
144 Start receiving the samples.
146 while not self.control.value:
148 sample = self.q_of_samples.get(True, 1)
156 except (ValueError, IndexError, KeyError, MemoryError):
160 # pylint: disable=too-many-boolean-expressions
161 def handle(self, sample):
162 ''' Store values and names if names matches following:
165 3. interface + keys + !xkeys
168 6. intel_rdt + keys + !xkeys
169 sample[1] is the name of the sample, which is . separated strings.
170 The first field in sample[1] is the type - cpu, proceesses, etc.
171 For intel_rdt, the second field contains the core-id, which is
172 used to make the decision on 'exclusions'
173 sample[0]: Contains the host information - which is not considered.
174 sample[2]: Contains the Value.
175 sample[3]: Contains the Time (in ticks)
177 if (('cpu' in sample[1] and
178 any(c in sample[1] for c in self.collectd_cpu_keys)) or
179 ('processes' in sample[1] and
180 any(p in sample[1] for p in self.collectd_processes_keys)) or
181 ('interface' in sample[1] and
182 (any(i in sample[1] for i in self.collectd_iface_keys) and
183 any(x not in sample[1]
184 for x in self.collectd_iface_xkeys))) or
185 ('ovs_stats' in sample[1] and
186 any(o in sample[1] for o in self.collectd_ovsstats_keys)) or
187 ('dpdkstat' in sample[1] and
188 any(d in sample[1] for d in self.collectd_dpdkstats_keys)) or
189 ('intel_rdt' in sample[1] and
190 any(r in sample[1] for r in self.collectd_intelrdt_keys) and
191 (int(sample[1].split('.')[1]) not in self.exclude_coreids))):
192 if sample[1] not in self.pd_dict:
193 self.pd_dict[sample[1]] = list()
194 val = self.pd_dict[sample[1]]
195 val.append((sample[2], sample[3]))
196 self.pd_dict[sample[1]] = val
200 Stop receiving the samples.
203 self.q_of_samples.put(None)
204 self.control.value = True
207 # inherit from collector.Icollector.
208 class Collectd(collector.ICollector):
209 """A collector of system statistics based on collectd
211 It starts a UDP server, receives metrics from collectd
212 and plot the results.
215 def __init__(self, results_dir, test_name):
217 Initialize collection of statistics
219 self._log = os.path.join(results_dir,
220 settings.getValue('LOG_FILE_COLLECTD') +
221 '_' + test_name + '.log')
223 self.sample_dict = multiprocessing.Manager().dict()
224 self.control = multiprocessing.Value('b', False)
225 self.receiver = Receiver(self.sample_dict, self.control)
229 Start receiving samples
231 self.receiver.server.start()
232 self.receiver.start()
236 Stop receiving samples
238 self.control.value = True
240 self.receiver.server.join(5)
241 self.receiver.join(5)
242 if self.receiver.server.is_alive():
243 self.receiver.server.terminate()
244 if self.receiver.is_alive():
245 self.receiver.terminate()
246 self.results = copy.deepcopy(self.sample_dict)
248 def get_results(self):
252 return get_results_to_print(self.results)
254 def print_results(self):
256 Print - Plot and save raw-data.
257 log the collected statistics
259 plot_graphs(self.results)
260 proc_stats = get_results_to_print(self.results)
261 for process in proc_stats:
262 logging.info("Process: " + '_'.join(process.split('_')[:-1]))
263 for(key, value) in proc_stats[process].items():
264 logging.info(" Statistic: " + str(key) +
265 ", Value: " + str(value))