Decrease Sampling interval
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.common import utils
34 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
35 from yardstick.network_services.helpers.iniparser import ConfigParser
36 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
38 from yardstick.network_services import constants
39
40 PROX_PORT = 8474
41
42 SECTION_NAME = 0
43 SECTION_CONTENTS = 1
44
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
47 LOG_RESULT = logging.getLogger('yardstick')
48 LOG_RESULT.setLevel(logging.DEBUG)
49
50 BITS_PER_BYTE = 8
51 RETRY_SECONDS = 60
52 RETRY_INTERVAL = 1
53
54 CONFIGURATION_OPTIONS = (
55     # dict key           section     key               default value
56     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
57     ('testDuration', 'general', 'test_duration', 5.0),
58     ('testPrecision', 'general', 'test_precision', 1.0),
59     ('tests', 'general', 'tests', None),
60     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
61
62     ('logFile', 'logging', 'file', 'dats.log'),
63     ('logDateFormat', 'logging', 'datefmt', None),
64     ('logLevel', 'logging', 'level', 'INFO'),
65     ('logOverwrite', 'logging', 'overwrite', 1),
66
67     ('testerIp', 'tester', 'ip', None),
68     ('testerUser', 'tester', 'user', 'root'),
69     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
70     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
71     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
72     ('testerSocketId', 'tester', 'socket_id', 0),
73
74     ('sutIp', 'sut', 'ip', None),
75     ('sutUser', 'sut', 'user', 'root'),
76     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
77     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
78     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
79     ('sutSocketId', 'sut', 'socket_id', 0),
80 )
81
82
83 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
84     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
85
86     def __new__(cls, *args):
87         try:
88             matches = cls.CORE_RE.search(str(args[0]))
89             if matches:
90                 args = matches.groups()
91
92             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
93                                                        'h' if args[2] else '')
94
95         except (AttributeError, TypeError, IndexError, ValueError):
96             raise ValueError('Invalid core spec {}'.format(args))
97
98     def is_hyperthread(self):
99         return self.hyperthread == 'h'
100
101     @property
102     def index(self):
103         return int(self.is_hyperthread())
104
105     def find_in_topology(self, cpu_topology):
106         try:
107             socket_core_match = cpu_topology[self.socket_id][self.core_id]
108             sorted_match = sorted(socket_core_match.values())
109             return sorted_match[self.index][0]
110         except (KeyError, IndexError):
111             template = "Core {}{} on socket {} does not exist"
112             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
113
114
115 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
116     def __new__(cls, *args):
117         try:
118             assert args[0] is not str(args[0])
119             args = tuple(args[0])
120         except (AssertionError, IndexError, TypeError):
121             pass
122
123         return super(TotStatsTuple, cls).__new__(cls, *args)
124
125
126 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
127                                                         'delta_tx,delta_tsc,'
128                                                         'latency,rx_total,tx_total,'
129                                                         'requested_pps')):
130     @property
131     def pkt_loss(self):
132         try:
133             return 1e2 * self.drop_total / float(self.tx_total)
134         except ZeroDivisionError:
135             return 100.0
136
137     @property
138     def tx_mpps(self):
139         # calculate the effective throughput in Mpps
140         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
141
142     @property
143     def rx_mpps(self):
144         # calculate the effective throughput in Mpps
145         return float(self.delta_rx) * self.tsc_hz / self.delta_tsc / 1e6
146
147     @property
148     def can_be_lost(self):
149         return int(self.tx_total * self.tolerated / 1e2)
150
151     @property
152     def drop_total(self):
153         return self.tx_total - self.rx_total
154
155     @property
156     def success(self):
157         return self.drop_total <= self.can_be_lost
158
159     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
160         if pkt_loss is None:
161             pkt_loss = self.pkt_loss
162
163         if port_samples is None:
164             port_samples = {}
165
166         latency_keys = [
167             "LatencyMin",
168             "LatencyMax",
169             "LatencyAvg",
170         ]
171
172         samples = {
173             "Throughput": self.rx_mpps,
174             "RxThroughput": self.rx_mpps,
175             "DropPackets": pkt_loss,
176             "CurrentDropPackets": pkt_loss,
177             "RequestedTxThroughput": self.requested_pps / 1e6,
178             "TxThroughput": self.tx_mpps,
179             "PktSize": pkt_size,
180         }
181         if port_samples:
182             samples.update(port_samples)
183
184         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
185         return samples
186
187     def log_data(self, logger=None):
188         if logger is None:
189             logger = LOG_RESULT
190
191         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
192         logger.info(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
193         logger.info("Mpps configured: %f; Mpps generated %f; Mpps received %f",
194                     self.requested_pps / 1e6, self.tx_mpps, self.rx_mpps)
195
196
197 class PacketDump(object):
198     @staticmethod
199     def assert_func(func, value1, value2, template=None):
200         assert func(value1, value2), template.format(value1, value2)
201
202     def __init__(self, port_id, data_len, payload):
203         template = "Packet dump has specified length {}, but payload is {} bytes long"
204         self.assert_func(operator.eq, data_len, len(payload), template)
205         self._port_id = port_id
206         self._data_len = data_len
207         self._payload = payload
208
209     @property
210     def port_id(self):
211         """Get the port id of the packet dump"""
212         return self._port_id
213
214     @property
215     def data_len(self):
216         """Get the length of the data received"""
217         return self._data_len
218
219     def __str__(self):
220         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
221
222     def payload(self, start=None, end=None):
223         """Get part of the payload as a list of ordinals.
224
225         Returns a list of byte values, matching the contents of the packet dump.
226         Optional start and end parameters can be specified to retrieve only a
227         part of the packet contents.
228
229         The number of elements in the list is equal to end - start + 1, so end
230         is the offset of the last character.
231
232         Args:
233             start (pos. int): the starting offset in the payload. If it is not
234                 specified or None, offset 0 is assumed.
235             end (pos. int): the ending offset of the payload. If it is not
236                 specified or None, the contents until the end of the packet are
237                 returned.
238
239         Returns:
240             [int, int, ...]. Each int represents the ordinal value of a byte in
241             the packet payload.
242         """
243         if start is None:
244             start = 0
245
246         if end is None:
247             end = self.data_len - 1
248
249         # Bounds checking on offsets
250         template = "Start offset must be non-negative"
251         self.assert_func(operator.ge, start, 0, template)
252
253         template = "End offset must be less than {1}"
254         self.assert_func(operator.lt, end, self.data_len, template)
255
256         # Adjust for splice operation: end offset must be 1 more than the offset
257         # of the last desired character.
258         end += 1
259
260         return self._payload[start:end]
261
262
263 class ProxSocketHelper(object):
264
265     def __init__(self, sock=None):
266         """ creates new prox instance """
267         super(ProxSocketHelper, self).__init__()
268
269         if sock is None:
270             sock = socket.socket()
271
272         self._sock = sock
273         self._pkt_dumps = []
274         self.master_stats = None
275
276     def connect(self, ip, port):
277         """Connect to the prox instance on the remote system"""
278         self._sock.connect((ip, port))
279
280     def get_socket(self):
281         """ get the socket connected to the remote instance """
282         return self._sock
283
284     def _parse_socket_data(self, decoded_data, pkt_dump_only):
285         def get_newline_index():
286             return decoded_data.find('\n', index)
287
288         ret_str = ''
289         index = 0
290         for newline_index in iter(get_newline_index, -1):
291             ret_str = decoded_data[index:newline_index]
292
293             try:
294                 mode, port_id, data_len = ret_str.split(',', 2)
295             except ValueError:
296                 mode, port_id, data_len = None, None, None
297
298             if mode != 'pktdump':
299                 # Regular 1-line message. Stop reading from the socket.
300                 LOG.debug("Regular response read")
301                 return ret_str, True
302
303             LOG.debug("Packet dump header read: [%s]", ret_str)
304
305             # The line is a packet dump header. Parse it, read the
306             # packet payload, store the dump for later retrieval.
307             # Skip over the packet dump and continue processing: a
308             # 1-line response may follow the packet dump.
309
310             data_len = int(data_len)
311             data_start = newline_index + 1  # + 1 to skip over \n
312             data_end = data_start + data_len
313             sub_data = decoded_data[data_start:data_end]
314             pkt_payload = array.array('B', (ord(v) for v in sub_data))
315             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
316             self._pkt_dumps.append(pkt_dump)
317
318             if pkt_dump_only:
319                 # Return boolean instead of string to signal
320                 # successful reception of the packet dump.
321                 LOG.debug("Packet dump stored, returning")
322                 return True, False
323
324             index = data_end + 1
325
326         return ret_str, False
327
328     def get_data(self, pkt_dump_only=False, timeout=0.01):
329         """ read data from the socket """
330
331         # This method behaves slightly differently depending on whether it is
332         # called to read the response to a command (pkt_dump_only = 0) or if
333         # it is called specifically to read a packet dump (pkt_dump_only = 1).
334         #
335         # Packet dumps look like:
336         #   pktdump,<port_id>,<data_len>\n
337         #   <packet contents as byte array>\n
338         # This means the total packet dump message consists of 2 lines instead
339         # of 1 line.
340         #
341         # - Response for a command (pkt_dump_only = 0):
342         #   1) Read response from the socket until \n (end of message)
343         #   2a) If the response is a packet dump header (starts with "pktdump,"):
344         #     - Read the packet payload and store the packet dump for later
345         #       retrieval.
346         #     - Reset the state and restart from 1). Eventually state 2b) will
347         #       be reached and the function will return.
348         #   2b) If the response is not a packet dump:
349         #     - Return the received message as a string
350         #
351         # - Explicit request to read a packet dump (pkt_dump_only = 1):
352         #   - Read the dump header and payload
353         #   - Store the packet dump for later retrieval
354         #   - Return True to signify a packet dump was successfully read
355
356         def is_ready():
357             # recv() is blocking, so avoid calling it when no data is waiting.
358             ready = select.select([self._sock], [], [], timeout)
359             return bool(ready[0])
360
361         status = False
362         ret_str = ""
363         for status in iter(is_ready, False):
364             decoded_data = self._sock.recv(256).decode('utf-8')
365             ret_str, done = self._parse_socket_data(decoded_data, pkt_dump_only)
366             if (done):
367                 break
368
369         LOG.debug("Received data from socket: [%s]", ret_str)
370         return ret_str if status else ''
371
372     def put_command(self, to_send):
373         """ send data to the remote instance """
374         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
375         try:
376             # NOTE: sendall will block, we need a timeout
377             self._sock.sendall(to_send.encode('utf-8'))
378         except:  # pylint: disable=bare-except
379             pass
380
381     def get_packet_dump(self):
382         """ get the next packet dump """
383         if self._pkt_dumps:
384             return self._pkt_dumps.pop(0)
385         return None
386
387     def stop_all_reset(self):
388         """ stop the remote instance and reset stats """
389         LOG.debug("Stop all and reset stats")
390         self.stop_all()
391         self.reset_stats()
392
393     def stop_all(self):
394         """ stop all cores on the remote instance """
395         LOG.debug("Stop all")
396         self.put_command("stop all\n")
397         time.sleep(3)
398
399     def stop(self, cores, task=''):
400         """ stop specific cores on the remote instance """
401         LOG.debug("Stopping cores %s", cores)
402         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
403         time.sleep(3)
404
405     def start_all(self):
406         """ start all cores on the remote instance """
407         LOG.debug("Start all")
408         self.put_command("start all\n")
409
410     def start(self, cores):
411         """ start specific cores on the remote instance """
412         LOG.debug("Starting cores %s", cores)
413         self.put_command("start {}\n".format(join_non_strings(',', cores)))
414         time.sleep(3)
415
416     def reset_stats(self):
417         """ reset the statistics on the remote instance """
418         LOG.debug("Reset stats")
419         self.put_command("reset stats\n")
420         time.sleep(1)
421
422     def _run_template_over_cores(self, template, cores, *args):
423         for core in cores:
424             self.put_command(template.format(core, *args))
425
426     def set_pkt_size(self, cores, pkt_size):
427         """ set the packet size to generate on the remote instance """
428         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
429         pkt_size -= 4
430         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
431         time.sleep(1)
432
433     def set_value(self, cores, offset, value, length):
434         """ set value on the remote instance """
435         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
436         LOG.debug(msg, cores, value, length, offset)
437         template = "set value {} 0 {} {} {}\n"
438         self._run_template_over_cores(template, cores, offset, value, length)
439
440     def reset_values(self, cores):
441         """ reset values on the remote instance """
442         LOG.debug("Set value for core(s) %s", cores)
443         self._run_template_over_cores("reset values {} 0\n", cores)
444
445     def set_speed(self, cores, speed, tasks=None):
446         """ set speed on the remote instance """
447         if tasks is None:
448             tasks = [0] * len(cores)
449         elif len(tasks) != len(cores):
450             LOG.error("set_speed: cores and tasks must have the same len")
451         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
452         for (core, task) in list(zip(cores, tasks)):
453             self.put_command("speed {} {} {}\n".format(core, task, speed))
454
455     def slope_speed(self, cores_speed, duration, n_steps=0):
456         """will start to increase speed from 0 to N where N is taken from
457         a['speed'] for each a in cores_speed"""
458         # by default, each step will take 0.5 sec
459         if n_steps == 0:
460             n_steps = duration * 2
461
462         private_core_data = []
463         step_duration = float(duration) / n_steps
464         for core_data in cores_speed:
465             target = float(core_data['speed'])
466             private_core_data.append({
467                 'cores': core_data['cores'],
468                 'zero': 0,
469                 'delta': target / n_steps,
470                 'current': 0,
471                 'speed': target,
472             })
473
474         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
475         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
476             time.sleep(step_duration)
477             for core_data in private_core_data:
478                 core_data['current'] = core_data[key1] + core_data[key2]
479                 self.set_speed(core_data['cores'], core_data['current'])
480
481     def set_pps(self, cores, pps, pkt_size,
482                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
483         """ set packets per second for specific cores on the remote instance """
484         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
485         LOG.debug(msg, cores, pps, pkt_size)
486
487         # speed in percent of line-rate
488         speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
489         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
490
491     def lat_stats(self, cores, task=0):
492         """Get the latency statistics from the remote system"""
493         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
494         lat_min = {}
495         lat_max = {}
496         lat_avg = {}
497         for core in cores:
498             self.put_command("lat stats {} {} \n".format(core, task))
499             ret = self.get_data()
500
501             try:
502                 lat_min[core], lat_max[core], lat_avg[core] = \
503                     tuple(int(n) for n in ret.split(",")[:3])
504
505             except (AttributeError, ValueError, TypeError):
506                 pass
507
508         return lat_min, lat_max, lat_avg
509
510     def get_all_tot_stats(self):
511         self.put_command("tot stats\n")
512         all_stats_str = self.get_data().split(",")
513         if len(all_stats_str) != 4:
514             all_stats = [0] * 4
515             return all_stats
516         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
517         self.master_stats = all_stats
518         return all_stats
519
520     def hz(self):
521         return self.get_all_tot_stats()[3]
522
523     def core_stats(self, cores, task=0):
524         """Get the receive statistics from the remote system"""
525         rx = tx = drop = tsc = 0
526         for core in cores:
527             self.put_command("core stats {} {}\n".format(core, task))
528             ret = self.get_data().split(",")
529             rx += int(ret[0])
530             tx += int(ret[1])
531             drop += int(ret[2])
532             tsc = int(ret[3])
533         return rx, tx, drop, tsc
534
535     def multi_port_stats(self, ports):
536         """get counter values from all ports port"""
537
538         ports_str = ""
539         for port in ports:
540             ports_str = ports_str + str(port) + ","
541         ports_str = ports_str[:-1]
542
543         ports_all_data = []
544         tot_result = [0] * len(ports)
545
546         retry_counter = 0
547         port_index = 0
548         while (len(ports) is not len(ports_all_data)) and (retry_counter < 10):
549             self.put_command("multi port stats {}\n".format(ports_str))
550             ports_all_data = self.get_data().split(";")
551
552             if len(ports) is len(ports_all_data):
553                 for port_data_str in ports_all_data:
554
555                     try:
556                         tot_result[port_index] = [try_int(s, 0) for s in port_data_str.split(",")]
557                     except (IndexError, TypeError):
558                         LOG.error("Port Index error %d  %s - retrying ", port_index, port_data_str)
559
560                     if (len(tot_result[port_index]) is not 6) or \
561                                     tot_result[port_index][0] is not ports[port_index]:
562                         ports_all_data = []
563                         tot_result = [0] * len(ports)
564                         port_index = 0
565                         time.sleep(0.1)
566                         LOG.error("Corrupted PACKET %s - retrying", port_data_str)
567                         break
568                     else:
569                         port_index = port_index + 1
570             else:
571                 LOG.error("Empty / too much data - retry -%s-", ports_all_data)
572                 ports_all_data = []
573                 tot_result = [0] * len(ports)
574                 port_index = 0
575                 time.sleep(0.1)
576
577             retry_counter = retry_counter + 1
578         return tot_result
579
580     def port_stats(self, ports):
581         """get counter values from a specific port"""
582         tot_result = [0] * 12
583         for port in ports:
584             self.put_command("port_stats {}\n".format(port))
585             ret = [try_int(s, 0) for s in self.get_data().split(",")]
586             tot_result = [sum(x) for x in zip(tot_result, ret)]
587         return tot_result
588
589     @contextmanager
590     def measure_tot_stats(self):
591         start = self.get_all_tot_stats()
592         container = {'start_tot': start}
593         try:
594             yield container
595         finally:
596             container['end_tot'] = end = self.get_all_tot_stats()
597
598         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
599
600     def tot_stats(self):
601         """Get the total statistics from the remote system"""
602         stats = self.get_all_tot_stats()
603         return stats[:3]
604
605     def tot_ierrors(self):
606         """Get the total ierrors from the remote system"""
607         self.put_command("tot ierrors tot\n")
608         recv = self.get_data().split(',')
609         tot_ierrors = int(recv[0])
610         tsc = int(recv[0])
611         return tot_ierrors, tsc
612
613     def set_count(self, count, cores):
614         """Set the number of packets to send on the specified core"""
615         self._run_template_over_cores("count {} 0 {}\n", cores, count)
616
617     def dump_rx(self, core_id, task_id=0, count=1):
618         """Activate dump on rx on the specified core"""
619         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
620         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
621         time.sleep(1.5)  # Give PROX time to set up packet dumping
622
623     def quit(self):
624         self.stop_all()
625         self._quit()
626         self.force_quit()
627
628     def _quit(self):
629         """ stop all cores on the remote instance """
630         LOG.debug("Quit prox")
631         self.put_command("quit\n")
632         time.sleep(3)
633
634     def force_quit(self):
635         """ stop all cores on the remote instance """
636         LOG.debug("Force Quit prox")
637         self.put_command("quit_force\n")
638         time.sleep(3)
639
640
641 _LOCAL_OBJECT = object()
642
643
644 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
645     # the actual app is lowercase
646     APP_NAME = 'prox'
647     # not used for Prox but added for consistency
648     VNF_TYPE = "PROX"
649
650     LUA_PARAMETER_NAME = ""
651     LUA_PARAMETER_PEER = {
652         "gen": "sut",
653         "sut": "gen",
654     }
655
656     CONFIG_QUEUE_TIMEOUT = 120
657
658     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
659         self.remote_path = None
660         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
661         self.remote_prox_file_name = None
662         self._prox_config_data = None
663         self.additional_files = {}
664         self.config_queue = Queue()
665         # allow_exit_without_flush
666         self.config_queue.cancel_join_thread()
667         self._global_section = None
668
669     @property
670     def prox_config_data(self):
671         if self._prox_config_data is None:
672             # this will block, but it needs too
673             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
674         return self._prox_config_data
675
676     @property
677     def global_section(self):
678         if self._global_section is None and self.prox_config_data:
679             self._global_section = self.find_section("global")
680         return self._global_section
681
682     def find_section(self, name, default=_LOCAL_OBJECT):
683         result = next((value for key, value in self.prox_config_data if key == name), default)
684         if result is _LOCAL_OBJECT:
685             raise KeyError('{} not found in Prox config'.format(name))
686         return result
687
688     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
689         section = self.find_section(section_name, [])
690         result = next((value for key, value in section if key == section_key), default)
691         if result is _LOCAL_OBJECT:
692             template = '{} not found in {} section of Prox config'
693             raise KeyError(template.format(section_key, section_name))
694         return result
695
696     def copy_to_target(self, config_file_path, prox_file):
697         remote_path = os.path.join("/tmp", prox_file)
698         self.ssh_helper.put(config_file_path, remote_path)
699         return remote_path
700
701     @staticmethod
702     def _get_tx_port(section, sections):
703         iface_port = [-1]
704         for item in sections[section]:
705             if item[0] == "tx port":
706                 iface_port = re.findall(r'\d+', item[1])
707                 # do we want the last one?
708                 #   if yes, then can we reverse?
709         return int(iface_port[0])
710
711     @staticmethod
712     def _replace_quoted_with_value(quoted, value, count=1):
713         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
714         return new_string
715
716     def _insert_additional_file(self, value):
717         file_str = value.split('"')
718         base_name = os.path.basename(file_str[1])
719         file_str[1] = self.additional_files[base_name]
720         return '"'.join(file_str)
721
722     def generate_prox_config_file(self, config_path):
723         sections = []
724         prox_config = ConfigParser(config_path, sections)
725         prox_config.parse()
726
727         # Ensure MAC is set "hardware"
728         all_ports = self.vnfd_helper.port_pairs.all_ports
729         # use dpdk port number
730         for port_name in all_ports:
731             port_num = self.vnfd_helper.port_num(port_name)
732             port_section_name = "port {}".format(port_num)
733             for section_name, section in sections:
734                 if port_section_name != section_name:
735                     continue
736
737                 for section_data in section:
738                     if section_data[0] == "mac":
739                         section_data[1] = "hardware"
740
741         # search for dst mac
742         for _, section in sections:
743             for section_data in section:
744                 item_key, item_val = section_data
745                 if item_val.startswith("@@dst_mac"):
746                     tx_port_iter = re.finditer(r'\d+', item_val)
747                     tx_port_no = int(next(tx_port_iter).group(0))
748                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
749                     mac = intf["virtual-interface"]["dst_mac"]
750                     section_data[1] = mac.replace(":", " ", 6)
751
752                 if item_key == "dst mac" and item_val.startswith("@@"):
753                     tx_port_iter = re.finditer(r'\d+', item_val)
754                     tx_port_no = int(next(tx_port_iter).group(0))
755                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
756                     mac = intf["virtual-interface"]["dst_mac"]
757                     section_data[1] = mac
758
759                 if item_val.startswith("@@src_mac"):
760                     tx_port_iter = re.finditer(r'\d+', item_val)
761                     tx_port_no = int(next(tx_port_iter).group(0))
762                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
763                     mac = intf["virtual-interface"]["local_mac"]
764                     section_data[1] = mac.replace(":", " ", 6)
765
766                 if item_key == "src mac" and item_val.startswith("@@"):
767                     tx_port_iter = re.finditer(r'\d+', item_val)
768                     tx_port_no = int(next(tx_port_iter).group(0))
769                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
770                     mac = intf["virtual-interface"]["local_mac"]
771                     section_data[1] = mac
772
773         # if addition file specified in prox config
774         if not self.additional_files:
775             return sections
776
777         for section_name, section in sections:
778             for section_data in section:
779                 try:
780                     if section_data[0].startswith("dofile"):
781                         section_data[0] = self._insert_additional_file(section_data[0])
782
783                     if section_data[1].startswith("dofile"):
784                         section_data[1] = self._insert_additional_file(section_data[1])
785                 except:  # pylint: disable=bare-except
786                     pass
787
788         return sections
789
790     @staticmethod
791     def write_prox_lua(lua_config):
792         """
793         Write an .ini-format config file for PROX (parameters.lua)
794         PROX does not allow a space before/after the =, so we need
795         a custom method
796         """
797         out = []
798         for key in lua_config:
799             value = '"' + lua_config[key] + '"'
800             if key == "__name__":
801                 continue
802             if value is not None and value != '@':
803                 key = "=".join((key, str(value).replace('\n', '\n\t')))
804                 out.append(key)
805             else:
806                 key = str(key).replace('\n', '\n\t')
807                 out.append(key)
808         return os.linesep.join(out)
809
810     @staticmethod
811     def write_prox_config(prox_config):
812         """
813         Write an .ini-format config file for PROX
814         PROX does not allow a space before/after the =, so we need
815         a custom method
816         """
817         out = []
818         for (section_name, section) in prox_config:
819             out.append("[{}]".format(section_name))
820             for item in section:
821                 key, value = item
822                 if key == "__name__":
823                     continue
824                 if value is not None and value != '@':
825                     key = "=".join((key, str(value).replace('\n', '\n\t')))
826                     out.append(key)
827                 else:
828                     key = str(key).replace('\n', '\n\t')
829                     out.append(key)
830         return os.linesep.join(out)
831
832     def put_string_to_file(self, s, remote_path):
833         file_obj = cStringIO(s)
834         self.ssh_helper.put_file_obj(file_obj, remote_path)
835         return remote_path
836
837     def generate_prox_lua_file(self):
838         p = OrderedDict()
839         all_ports = self.vnfd_helper.port_pairs.all_ports
840         for port_name in all_ports:
841             port_num = self.vnfd_helper.port_num(port_name)
842             intf = self.vnfd_helper.find_interface(name=port_name)
843             vintf = intf['virtual-interface']
844             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
845             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
846
847         return p
848
849     def upload_prox_lua(self, config_file, lua_data):
850         # prox can't handle spaces around ' = ' so use custom method
851         out = StringIO(self.write_prox_lua(lua_data))
852         out.seek(0)
853         remote_path = os.path.join("/tmp", config_file)
854         self.ssh_helper.put_file_obj(out, remote_path)
855
856         return remote_path
857
858     def upload_prox_config(self, config_file, prox_config_data):
859         # prox can't handle spaces around ' = ' so use custom method
860         out = StringIO(self.write_prox_config(prox_config_data))
861         out.seek(0)
862         remote_path = os.path.join("/tmp", config_file)
863         self.ssh_helper.put_file_obj(out, remote_path)
864
865         return remote_path
866
867     def build_config_file(self):
868         task_path = self.scenario_helper.task_path
869         options = self.scenario_helper.options
870         config_path = options['prox_config']
871         config_file = os.path.basename(config_path)
872         config_path = utils.find_relative_file(config_path, task_path)
873         self.additional_files = {}
874
875         try:
876             if options['prox_generate_parameter']:
877                 self.lua = []
878                 self.lua = self.generate_prox_lua_file()
879                 if len(self.lua) > 0:
880                     self.upload_prox_lua("parameters.lua", self.lua)
881         except:  # pylint: disable=bare-except
882             pass
883
884         prox_files = options.get('prox_files', [])
885         if isinstance(prox_files, six.string_types):
886             prox_files = [prox_files]
887         for key_prox_file in prox_files:
888             base_prox_file = os.path.basename(key_prox_file)
889             key_prox_path = utils.find_relative_file(key_prox_file, task_path)
890             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
891             self.additional_files[base_prox_file] = remote_prox_file
892
893         self._prox_config_data = self.generate_prox_config_file(config_path)
894         # copy config to queue so we can read it from traffic_runner process
895         self.config_queue.put(self._prox_config_data)
896         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
897
898     def build_config(self):
899         self.build_config_file()
900
901         options = self.scenario_helper.options
902         prox_args = options['prox_args']
903         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
904
905         self.pipeline_kwargs = {
906             'tool_path': tool_path,
907             'tool_dir': os.path.dirname(tool_path),
908             'cfg_file': self.remote_path,
909             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
910                              for k, v in prox_args.items())
911         }
912
913         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
914                         "{args} -f {cfg_file} '")
915         return cmd_template.format(**self.pipeline_kwargs)
916
917
918 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
919 class ProxResourceHelper(ClientResourceHelper):
920
921     RESOURCE_WORD = 'prox'
922
923     PROX_MODE = ""
924
925     WAIT_TIME = 3
926
927     @staticmethod
928     def find_pci(pci, bound_pci):
929         # we have to substring match PCI bus address from the end
930         return any(b.endswith(pci) for b in bound_pci)
931
932     def __init__(self, setup_helper):
933         super(ProxResourceHelper, self).__init__(setup_helper)
934         self.mgmt_interface = self.vnfd_helper.mgmt_interface
935         self._user = self.mgmt_interface["user"]
936         self._ip = self.mgmt_interface["ip"]
937
938         self.done = False
939         self._vpci_to_if_name_map = None
940         self.additional_file = {}
941         self.remote_prox_file_name = None
942         self.lower = None
943         self.upper = None
944         self.step_delta = 1
945         self.step_time = 0.5
946         self._test_type = None
947
948     @property
949     def sut(self):
950         if not self.client:
951             self.client = self._connect()
952         return self.client
953
954     @property
955     def test_type(self):
956         if self._test_type is None:
957             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
958         return self._test_type
959
960     def run_traffic(self, traffic_profile):
961         self._queue.cancel_join_thread()
962         self.lower = 0.0
963         self.upper = 100.0
964
965         traffic_profile.init(self._queue)
966         # this frees up the run_traffic loop
967         self.client_started.value = 1
968
969         while not self._terminated.value:
970             # move it all to traffic_profile
971             self._run_traffic_once(traffic_profile)
972
973     def _run_traffic_once(self, traffic_profile):
974         traffic_profile.execute_traffic(self)
975         if traffic_profile.done:
976             self._queue.put({'done': True})
977             LOG.debug("tg_prox done")
978             self._terminated.value = 1
979
980     # For VNF use ResourceHelper method to collect KPIs directly.
981     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
982     def collect_collectd_kpi(self):
983         return self._collect_resource_kpi()
984
985     def collect_kpi(self):
986         result = super(ProxResourceHelper, self).collect_kpi()
987         # add in collectd kpis manually
988         if result:
989             result['collect_stats'] = self._collect_resource_kpi()
990         return result
991
992     def terminate(self):
993         # should not be called, use VNF terminate
994         raise NotImplementedError()
995
996     def up_post(self):
997         return self.sut  # force connection
998
999     def execute(self, cmd, *args, **kwargs):
1000         func = getattr(self.sut, cmd, None)
1001         if func:
1002             return func(*args, **kwargs)
1003         return None
1004
1005     def _connect(self, client=None):
1006         """Run and connect to prox on the remote system """
1007         # De-allocating a large amount of hugepages takes some time. If a new
1008         # PROX instance is started immediately after killing the previous one,
1009         # it might not be able to allocate hugepages, because they are still
1010         # being freed. Hence the -w switch.
1011         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
1012         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
1013         # -f ./handle_none-4.cfg"
1014         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
1015         #  "; " \
1016         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
1017         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
1018         # sudo " \
1019         #    + "./build/Prox " + prox_args
1020         # log.debug("Starting PROX with command [%s]", prox_cmd)
1021         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
1022         # self._ip, prox_cmd))
1023         if client is None:
1024             client = ProxSocketHelper()
1025
1026         # try connecting to Prox for 60s
1027         for _ in range(RETRY_SECONDS):
1028             time.sleep(RETRY_INTERVAL)
1029             try:
1030                 client.connect(self._ip, PROX_PORT)
1031             except (socket.gaierror, socket.error):
1032                 continue
1033             else:
1034                 return client
1035
1036         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
1037         raise Exception(msg.format(self._ip, PROX_PORT))
1038
1039
1040 class ProxDataHelper(object):
1041
1042     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
1043         super(ProxDataHelper, self).__init__()
1044         self.vnfd_helper = vnfd_helper
1045         self.sut = sut
1046         self.pkt_size = pkt_size
1047         self.value = value
1048         self.line_speed = line_speed
1049         self.tolerated_loss = tolerated_loss
1050         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
1051         self.tsc_hz = None
1052         self.measured_stats = None
1053         self.latency = None
1054         self._totals_and_pps = None
1055         self.result_tuple = None
1056
1057     @property
1058     def totals_and_pps(self):
1059         if self._totals_and_pps is None:
1060             rx_total = tx_total = 0
1061             all_ports = self.sut.multi_port_stats(range(self.port_count))
1062             for port in all_ports:
1063                 rx_total = rx_total + port[1]
1064                 tx_total = tx_total + port[2]
1065             requested_pps = self.value / 100.0 * self.line_rate_to_pps()
1066             self._totals_and_pps = rx_total, tx_total, requested_pps
1067         return self._totals_and_pps
1068
1069     @property
1070     def rx_total(self):
1071         return self.totals_and_pps[0]
1072
1073     @property
1074     def tx_total(self):
1075         return self.totals_and_pps[1]
1076
1077     @property
1078     def requested_pps(self):
1079         return self.totals_and_pps[2]
1080
1081     @property
1082     def samples(self):
1083         samples = {}
1084         ports = []
1085         port_names = []
1086         for port_name, port_num in self.vnfd_helper.ports_iter():
1087             ports.append(port_num)
1088             port_names.append(port_name)
1089
1090         results = self.sut.multi_port_stats(ports)
1091         for result in results:
1092             port_num = result[0]
1093             samples[port_names[port_num]] = {
1094                     "in_packets": result[1],
1095                     "out_packets": result[2]}
1096         return samples
1097
1098     def __enter__(self):
1099         self.check_interface_count()
1100         return self
1101
1102     def __exit__(self, exc_type, exc_val, exc_tb):
1103         self.make_tuple()
1104
1105     def make_tuple(self):
1106         if self.result_tuple:
1107             return
1108
1109         self.result_tuple = ProxTestDataTuple(
1110             self.tolerated_loss,
1111             self.tsc_hz,
1112             self.measured_stats['delta'].rx,
1113             self.measured_stats['delta'].tx,
1114             self.measured_stats['delta'].tsc,
1115             self.latency,
1116             self.rx_total,
1117             self.tx_total,
1118             self.requested_pps,
1119         )
1120         self.result_tuple.log_data()
1121
1122     @contextmanager
1123     def measure_tot_stats(self):
1124         with self.sut.measure_tot_stats() as self.measured_stats:
1125             yield
1126
1127     def check_interface_count(self):
1128         # do this assert in init?  unless we expect interface count to
1129         # change from one run to another run...
1130         assert self.port_count in {1, 2, 4}, \
1131             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1132
1133     def capture_tsc_hz(self):
1134         self.tsc_hz = float(self.sut.hz())
1135
1136     def line_rate_to_pps(self):
1137       return self.port_count * self.line_speed  / BITS_PER_BYTE / (self.pkt_size + 20)
1138
1139 class ProxProfileHelper(object):
1140
1141     __prox_profile_type__ = "Generic"
1142
1143     PROX_CORE_GEN_MODE = "gen"
1144     PROX_CORE_LAT_MODE = "lat"
1145
1146     @classmethod
1147     def get_cls(cls, helper_type):
1148         """Return class of specified type."""
1149         if not helper_type:
1150             return ProxProfileHelper
1151
1152         for profile_helper_class in utils.itersubclasses(cls):
1153             if helper_type == profile_helper_class.__prox_profile_type__:
1154                 return profile_helper_class
1155
1156         return ProxProfileHelper
1157
1158     @classmethod
1159     def make_profile_helper(cls, resource_helper):
1160         return cls.get_cls(resource_helper.test_type)(resource_helper)
1161
1162     def __init__(self, resource_helper):
1163         super(ProxProfileHelper, self).__init__()
1164         self.resource_helper = resource_helper
1165         self._cpu_topology = None
1166         self._test_cores = None
1167         self._latency_cores = None
1168
1169     @property
1170     def cpu_topology(self):
1171         if not self._cpu_topology:
1172             stdout = io.BytesIO()
1173             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1174             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1175         return self._cpu_topology
1176
1177     @property
1178     def test_cores(self):
1179         if not self._test_cores:
1180             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1181         return self._test_cores
1182
1183     @property
1184     def latency_cores(self):
1185         if not self._latency_cores:
1186             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1187         return self._latency_cores
1188
1189     @contextmanager
1190     def traffic_context(self, pkt_size, value):
1191         self.sut.stop_all()
1192         self.sut.reset_stats()
1193         try:
1194             self.sut.set_pkt_size(self.test_cores, pkt_size)
1195             self.sut.set_speed(self.test_cores, value)
1196             self.sut.start_all()
1197             time.sleep(1)
1198             yield
1199         finally:
1200             self.sut.stop_all()
1201
1202     def get_cores(self, mode):
1203         cores = []
1204
1205         for section_name, section in self.setup_helper.prox_config_data:
1206             if not section_name.startswith("core"):
1207                 continue
1208
1209             for key, value in section:
1210                 if key == "mode" and value == mode:
1211                     core_tuple = CoreSocketTuple(section_name)
1212                     core = core_tuple.core_id
1213                     cores.append(core)
1214
1215         return cores
1216
1217     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1218                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1219         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1220                                      value, tolerated_loss, line_speed)
1221
1222         with data_helper, self.traffic_context(pkt_size, value):
1223             with data_helper.measure_tot_stats():
1224                 time.sleep(duration)
1225                 # Getting statistics to calculate PPS at right speed....
1226                 data_helper.capture_tsc_hz()
1227                 data_helper.latency = self.get_latency()
1228
1229         return data_helper.result_tuple, data_helper.samples
1230
1231     def get_latency(self):
1232         """
1233         :return: return lat_min, lat_max, lat_avg
1234         :rtype: list
1235         """
1236
1237         if not self._latency_cores:
1238             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1239
1240         if self._latency_cores:
1241             return self.sut.lat_stats(self._latency_cores)
1242         return []
1243
1244     def terminate(self):
1245         pass
1246
1247     def __getattr__(self, item):
1248         return getattr(self.resource_helper, item)
1249
1250
1251 class ProxMplsProfileHelper(ProxProfileHelper):
1252
1253     __prox_profile_type__ = "MPLS tag/untag"
1254
1255     def __init__(self, resource_helper):
1256         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1257         self._cores_tuple = None
1258
1259     @property
1260     def mpls_cores(self):
1261         if not self._cores_tuple:
1262             self._cores_tuple = self.get_cores_mpls()
1263         return self._cores_tuple
1264
1265     @property
1266     def tagged_cores(self):
1267         return self.mpls_cores[0]
1268
1269     @property
1270     def plain_cores(self):
1271         return self.mpls_cores[1]
1272
1273     def get_cores_mpls(self):
1274         cores_tagged = []
1275         cores_plain = []
1276         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1277             if not section_name.startswith("core"):
1278                 continue
1279
1280             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1281                 continue
1282
1283             for item_key, item_value in section:
1284                 if item_key != 'name':
1285                     continue
1286
1287                 if item_value.startswith("tag"):
1288                     core_tuple = CoreSocketTuple(section_name)
1289                     core_tag = core_tuple.core_id
1290                     cores_tagged.append(core_tag)
1291
1292                 elif item_value.startswith("udp"):
1293                     core_tuple = CoreSocketTuple(section_name)
1294                     core_udp = core_tuple.core_id
1295                     cores_plain.append(core_udp)
1296
1297         return cores_tagged, cores_plain
1298
1299     @contextmanager
1300     def traffic_context(self, pkt_size, value):
1301         self.sut.stop_all()
1302         self.sut.reset_stats()
1303         try:
1304             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1305             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1306             self.sut.set_speed(self.tagged_cores, value)
1307             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1308             self.sut.set_speed(self.plain_cores, value * ratio)
1309             self.sut.start_all()
1310             time.sleep(1)
1311             yield
1312         finally:
1313             self.sut.stop_all()
1314
1315
1316 class ProxBngProfileHelper(ProxProfileHelper):
1317
1318     __prox_profile_type__ = "BNG gen"
1319
1320     def __init__(self, resource_helper):
1321         super(ProxBngProfileHelper, self).__init__(resource_helper)
1322         self._cores_tuple = None
1323
1324     @property
1325     def bng_cores(self):
1326         if not self._cores_tuple:
1327             self._cores_tuple = self.get_cores_gen_bng_qos()
1328         return self._cores_tuple
1329
1330     @property
1331     def cpe_cores(self):
1332         return self.bng_cores[0]
1333
1334     @property
1335     def inet_cores(self):
1336         return self.bng_cores[1]
1337
1338     @property
1339     def arp_cores(self):
1340         return self.bng_cores[2]
1341
1342     @property
1343     def arp_task_cores(self):
1344         return self.bng_cores[3]
1345
1346     @property
1347     def all_rx_cores(self):
1348         return self.latency_cores
1349
1350     def get_cores_gen_bng_qos(self):
1351         cpe_cores = []
1352         inet_cores = []
1353         arp_cores = []
1354         arp_tasks_core = [0]
1355         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1356             if not section_name.startswith("core"):
1357                 continue
1358
1359             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1360                 continue
1361
1362             for item_key, item_value in section:
1363                 if item_key != 'name':
1364                     continue
1365
1366                 if item_value.startswith("cpe"):
1367                     core_tuple = CoreSocketTuple(section_name)
1368                     cpe_core = core_tuple.core_id
1369                     cpe_cores.append(cpe_core)
1370
1371                 elif item_value.startswith("inet"):
1372                     core_tuple = CoreSocketTuple(section_name)
1373                     inet_core = core_tuple.core_id
1374                     inet_cores.append(inet_core)
1375
1376                 elif item_value.startswith("arp"):
1377                     core_tuple = CoreSocketTuple(section_name)
1378                     arp_core = core_tuple.core_id
1379                     arp_cores.append(arp_core)
1380
1381                 # We check the tasks/core separately
1382                 if item_value.startswith("arp_task"):
1383                     core_tuple = CoreSocketTuple(section_name)
1384                     arp_task_core = core_tuple.core_id
1385                     arp_tasks_core.append(arp_task_core)
1386
1387         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1388
1389     @contextmanager
1390     def traffic_context(self, pkt_size, value):
1391         # Tester is sending packets at the required speed already after
1392         # setup_test(). Just get the current statistics, sleep the required
1393         # amount of time and calculate packet loss.
1394         inet_pkt_size = pkt_size
1395         cpe_pkt_size = pkt_size - 24
1396         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1397
1398         curr_up_speed = curr_down_speed = 0
1399         max_up_speed = max_down_speed = value
1400         if ratio < 1:
1401             max_down_speed = value * ratio
1402         else:
1403             max_up_speed = value / ratio
1404
1405         # Initialize cores
1406         self.sut.stop_all()
1407         time.sleep(0.5)
1408
1409         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1410         # wrong.
1411         self.sut.start(self.all_rx_cores)
1412         time.sleep(0.5)
1413         self.sut.stop(self.all_rx_cores)
1414         time.sleep(0.5)
1415         self.sut.reset_stats()
1416
1417         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1418         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1419
1420         self.sut.reset_values(self.cpe_cores)
1421         self.sut.reset_values(self.inet_cores)
1422
1423         # Set correct IP and UDP lengths in packet headers
1424         # CPE
1425         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1426         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1427         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1428         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1429
1430         # INET
1431         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1432         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1433         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1434         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1435         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1436         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1437
1438         # Sending ARP to initialize tables - need a few seconds of generation
1439         # to make sure all CPEs are initialized
1440         LOG.info("Initializing SUT: sending ARP packets")
1441         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1442         self.sut.set_speed(self.inet_cores, curr_up_speed)
1443         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1444         self.sut.start(self.arp_cores)
1445         time.sleep(4)
1446
1447         # Ramp up the transmission speed. First go to the common speed, then
1448         # increase steps for the faster one.
1449         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1450
1451         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1452
1453         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1454             # The min(..., ...) takes care of 1) floating point rounding errors
1455             # that could make curr_*_speed to be slightly greater than
1456             # max_*_speed and 2) max_*_speed not being an exact multiple of
1457             # self._step_delta.
1458             if curr_up_speed < max_up_speed:
1459                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1460             if curr_down_speed < max_down_speed:
1461                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1462
1463             self.sut.set_speed(self.inet_cores, curr_up_speed)
1464             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1465             time.sleep(self.step_time)
1466
1467         LOG.info("Target speeds reached. Starting real test.")
1468
1469         yield
1470
1471         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1472         LOG.info("Test ended. Flushing NIC buffers")
1473         self.sut.start(self.all_rx_cores)
1474         time.sleep(3)
1475         self.sut.stop(self.all_rx_cores)
1476
1477     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1478                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1479         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1480                                      value, tolerated_loss, line_speed)
1481
1482         with data_helper, self.traffic_context(pkt_size, value):
1483             with data_helper.measure_tot_stats():
1484                 time.sleep(duration)
1485                 # Getting statistics to calculate PPS at right speed....
1486                 data_helper.capture_tsc_hz()
1487                 data_helper.latency = self.get_latency()
1488
1489         return data_helper.result_tuple, data_helper.samples
1490
1491
1492 class ProxVpeProfileHelper(ProxProfileHelper):
1493
1494     __prox_profile_type__ = "vPE gen"
1495
1496     def __init__(self, resource_helper):
1497         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1498         self._cores_tuple = None
1499         self._ports_tuple = None
1500
1501     @property
1502     def vpe_cores(self):
1503         if not self._cores_tuple:
1504             self._cores_tuple = self.get_cores_gen_vpe()
1505         return self._cores_tuple
1506
1507     @property
1508     def cpe_cores(self):
1509         return self.vpe_cores[0]
1510
1511     @property
1512     def inet_cores(self):
1513         return self.vpe_cores[1]
1514
1515     @property
1516     def all_rx_cores(self):
1517         return self.latency_cores
1518
1519     @property
1520     def vpe_ports(self):
1521         if not self._ports_tuple:
1522             self._ports_tuple = self.get_ports_gen_vpe()
1523         return self._ports_tuple
1524
1525     @property
1526     def cpe_ports(self):
1527         return self.vpe_ports[0]
1528
1529     @property
1530     def inet_ports(self):
1531         return self.vpe_ports[1]
1532
1533     def get_cores_gen_vpe(self):
1534         cpe_cores = []
1535         inet_cores = []
1536         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1537             if not section_name.startswith("core"):
1538                 continue
1539
1540             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1541                 continue
1542
1543             for item_key, item_value in section:
1544                 if item_key != 'name':
1545                     continue
1546
1547                 if item_value.startswith("cpe"):
1548                     core_tuple = CoreSocketTuple(section_name)
1549                     core_tag = core_tuple.core_id
1550                     cpe_cores.append(core_tag)
1551
1552                 elif item_value.startswith("inet"):
1553                     core_tuple = CoreSocketTuple(section_name)
1554                     inet_core = core_tuple.core_id
1555                     inet_cores.append(inet_core)
1556
1557         return cpe_cores, inet_cores
1558
1559     def get_ports_gen_vpe(self):
1560         cpe_ports = []
1561         inet_ports = []
1562
1563         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1564             if not section_name.startswith("port"):
1565                 continue
1566             tx_port_iter = re.finditer(r'\d+', section_name)
1567             tx_port_no = int(next(tx_port_iter).group(0))
1568
1569             for item_key, item_value in section:
1570                 if item_key != 'name':
1571                     continue
1572
1573                 if item_value.startswith("cpe"):
1574                     cpe_ports.append(tx_port_no)
1575
1576                 elif item_value.startswith("inet"):
1577                     inet_ports.append(tx_port_no)
1578
1579         return cpe_ports, inet_ports
1580
1581     @contextmanager
1582     def traffic_context(self, pkt_size, value):
1583         # Calculate the target upload and download speed. The upload and
1584         # download packets have different packet sizes, so in order to get
1585         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1586         # of the packet sizes.
1587         cpe_pkt_size = pkt_size
1588         inet_pkt_size = pkt_size - 4
1589         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1590
1591         curr_up_speed = curr_down_speed = 0
1592         max_up_speed = max_down_speed = value
1593         if ratio < 1:
1594             max_down_speed = value * ratio
1595         else:
1596             max_up_speed = value / ratio
1597
1598         # Adjust speed when multiple cores per port are used to generate traffic
1599         if len(self.cpe_ports) != len(self.cpe_cores):
1600             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1601         if len(self.inet_ports) != len(self.inet_cores):
1602             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1603
1604         # Initialize cores
1605         self.sut.stop_all()
1606         time.sleep(2)
1607
1608         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1609         # wrong.
1610         self.sut.start(self.all_rx_cores)
1611         time.sleep(2)
1612         self.sut.stop(self.all_rx_cores)
1613         time.sleep(2)
1614         self.sut.reset_stats()
1615
1616         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1617         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1618
1619         self.sut.reset_values(self.cpe_cores)
1620         self.sut.reset_values(self.inet_cores)
1621
1622         # Set correct IP and UDP lengths in packet headers
1623         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1624         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1625         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1626         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1627
1628         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1629         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1630         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1631         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1632
1633         self.sut.set_speed(self.inet_cores, curr_up_speed)
1634         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1635
1636         # Ramp up the transmission speed. First go to the common speed, then
1637         # increase steps for the faster one.
1638         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1639
1640         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1641
1642         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1643             # The min(..., ...) takes care of 1) floating point rounding errors
1644             # that could make curr_*_speed to be slightly greater than
1645             # max_*_speed and 2) max_*_speed not being an exact multiple of
1646             # self._step_delta.
1647             if curr_up_speed < max_up_speed:
1648                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1649             if curr_down_speed < max_down_speed:
1650                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1651
1652             self.sut.set_speed(self.inet_cores, curr_up_speed)
1653             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1654             time.sleep(self.step_time)
1655
1656         LOG.info("Target speeds reached. Starting real test.")
1657
1658         yield
1659
1660         self.sut.stop(self.cpe_cores + self.inet_cores)
1661         LOG.info("Test ended. Flushing NIC buffers")
1662         self.sut.start(self.all_rx_cores)
1663         time.sleep(3)
1664         self.sut.stop(self.all_rx_cores)
1665
1666     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1667                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1668         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1669                                      value, tolerated_loss, line_speed)
1670
1671         with data_helper, self.traffic_context(pkt_size, value):
1672             with data_helper.measure_tot_stats():
1673                 time.sleep(duration)
1674                 # Getting statistics to calculate PPS at right speed....
1675                 data_helper.capture_tsc_hz()
1676                 data_helper.latency = self.get_latency()
1677
1678         return data_helper.result_tuple, data_helper.samples
1679
1680
1681 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1682
1683     __prox_profile_type__ = "lwAFTR gen"
1684
1685     def __init__(self, resource_helper):
1686         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1687         self._cores_tuple = None
1688         self._ports_tuple = None
1689         self.step_delta = 5
1690         self.step_time = 0.5
1691
1692     @property
1693     def _lwaftr_cores(self):
1694         if not self._cores_tuple:
1695             self._cores_tuple = self._get_cores_gen_lwaftr()
1696         return self._cores_tuple
1697
1698     @property
1699     def tun_cores(self):
1700         return self._lwaftr_cores[0]
1701
1702     @property
1703     def inet_cores(self):
1704         return self._lwaftr_cores[1]
1705
1706     @property
1707     def _lwaftr_ports(self):
1708         if not self._ports_tuple:
1709             self._ports_tuple = self._get_ports_gen_lw_aftr()
1710         return self._ports_tuple
1711
1712     @property
1713     def tun_ports(self):
1714         return self._lwaftr_ports[0]
1715
1716     @property
1717     def inet_ports(self):
1718         return self._lwaftr_ports[1]
1719
1720     @property
1721     def all_rx_cores(self):
1722         return self.latency_cores
1723
1724     def _get_cores_gen_lwaftr(self):
1725         tun_cores = []
1726         inet_cores = []
1727         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1728             if not section_name.startswith("core"):
1729                 continue
1730
1731             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1732                 continue
1733
1734             core_tuple = CoreSocketTuple(section_name)
1735             core_tag = core_tuple.core_id
1736             for item_value in (v for k, v in section if k == 'name'):
1737                 if item_value.startswith('tun'):
1738                     tun_cores.append(core_tag)
1739                 elif item_value.startswith('inet'):
1740                     inet_cores.append(core_tag)
1741
1742         return tun_cores, inet_cores
1743
1744     def _get_ports_gen_lw_aftr(self):
1745         tun_ports = []
1746         inet_ports = []
1747
1748         re_port = re.compile(r'port (\d+)')
1749         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1750             match = re_port.search(section_name)
1751             if not match:
1752                 continue
1753
1754             tx_port_no = int(match.group(1))
1755             for item_value in (v for k, v in section if k == 'name'):
1756                 if item_value.startswith('lwB4'):
1757                     tun_ports.append(tx_port_no)
1758                 elif item_value.startswith('inet'):
1759                     inet_ports.append(tx_port_no)
1760
1761         return tun_ports, inet_ports
1762
1763     @staticmethod
1764     def _resize(len1, len2):
1765         if len1 == len2:
1766             return 1.0
1767         return 1.0 * len1 / len2
1768
1769     @contextmanager
1770     def traffic_context(self, pkt_size, value):
1771         # Tester is sending packets at the required speed already after
1772         # setup_test(). Just get the current statistics, sleep the required
1773         # amount of time and calculate packet loss.
1774         tun_pkt_size = pkt_size
1775         inet_pkt_size = pkt_size - 40
1776         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1777
1778         curr_up_speed = curr_down_speed = 0
1779         max_up_speed = max_down_speed = value
1780
1781         max_up_speed = value / ratio
1782
1783         # Adjust speed when multiple cores per port are used to generate traffic
1784         if len(self.tun_ports) != len(self.tun_cores):
1785             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1786         if len(self.inet_ports) != len(self.inet_cores):
1787             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1788
1789         # Initialize cores
1790         self.sut.stop_all()
1791         time.sleep(0.5)
1792
1793         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1794         # wrong.
1795         self.sut.start(self.all_rx_cores)
1796         time.sleep(0.5)
1797         self.sut.stop(self.all_rx_cores)
1798         time.sleep(0.5)
1799         self.sut.reset_stats()
1800
1801         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1802         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1803
1804         self.sut.reset_values(self.tun_cores)
1805         self.sut.reset_values(self.inet_cores)
1806
1807         # Set correct IP and UDP lengths in packet headers
1808         # tun
1809         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1810         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1811         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1812         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1813         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1814         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1815
1816         # INET
1817         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1818         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1819         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1820         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1821
1822         LOG.info("Initializing SUT: sending lwAFTR packets")
1823         self.sut.set_speed(self.inet_cores, curr_up_speed)
1824         self.sut.set_speed(self.tun_cores, curr_down_speed)
1825         time.sleep(4)
1826
1827         # Ramp up the transmission speed. First go to the common speed, then
1828         # increase steps for the faster one.
1829         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1830
1831         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1832
1833         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1834             # The min(..., ...) takes care of 1) floating point rounding errors
1835             # that could make curr_*_speed to be slightly greater than
1836             # max_*_speed and 2) max_*_speed not being an exact multiple of
1837             # self._step_delta.
1838             if curr_up_speed < max_up_speed:
1839                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1840             if curr_down_speed < max_down_speed:
1841                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1842
1843             self.sut.set_speed(self.inet_cores, curr_up_speed)
1844             self.sut.set_speed(self.tun_cores, curr_down_speed)
1845             time.sleep(self.step_time)
1846
1847         LOG.info("Target speeds reached. Starting real test.")
1848
1849         yield
1850
1851         self.sut.stop(self.tun_cores + self.inet_cores)
1852         LOG.info("Test ended. Flushing NIC buffers")
1853         self.sut.start(self.all_rx_cores)
1854         time.sleep(3)
1855         self.sut.stop(self.all_rx_cores)
1856
1857     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1858                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1859         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1860                                      value, tolerated_loss, line_speed)
1861
1862         with data_helper, self.traffic_context(pkt_size, value):
1863             with data_helper.measure_tot_stats():
1864                 time.sleep(duration)
1865                 # Getting statistics to calculate PPS at right speed....
1866                 data_helper.capture_tsc_hz()
1867                 data_helper.latency = self.get_latency()
1868
1869         return data_helper.result_tuple, data_helper.samples