Decrease Sampling interval
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import array
16 import io
17 import logging
18 import operator
19 import os
20 import re
21 import select
22 import socket
23 import time
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
28
29 import six
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
32
33 from yardstick.common import utils
34 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
35 from yardstick.network_services.helpers.iniparser import ConfigParser
36 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
38 from yardstick.network_services import constants
39
40 PROX_PORT = 8474
41
42 SECTION_NAME = 0
43 SECTION_CONTENTS = 1
44
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
47
48 BITS_PER_BYTE = 8
49 RETRY_SECONDS = 60
50 RETRY_INTERVAL = 1
51
52 CONFIGURATION_OPTIONS = (
53     # dict key           section     key               default value
54     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
55     ('testDuration', 'general', 'test_duration', 5.0),
56     ('testPrecision', 'general', 'test_precision', 1.0),
57     ('tests', 'general', 'tests', None),
58     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
59
60     ('logFile', 'logging', 'file', 'dats.log'),
61     ('logDateFormat', 'logging', 'datefmt', None),
62     ('logLevel', 'logging', 'level', 'INFO'),
63     ('logOverwrite', 'logging', 'overwrite', 1),
64
65     ('testerIp', 'tester', 'ip', None),
66     ('testerUser', 'tester', 'user', 'root'),
67     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
68     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
69     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
70     ('testerSocketId', 'tester', 'socket_id', 0),
71
72     ('sutIp', 'sut', 'ip', None),
73     ('sutUser', 'sut', 'user', 'root'),
74     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
75     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
76     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
77     ('sutSocketId', 'sut', 'socket_id', 0),
78 )
79
80
81 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
82     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
83
84     def __new__(cls, *args):
85         try:
86             matches = cls.CORE_RE.search(str(args[0]))
87             if matches:
88                 args = matches.groups()
89
90             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
91                                                        'h' if args[2] else '')
92
93         except (AttributeError, TypeError, IndexError, ValueError):
94             raise ValueError('Invalid core spec {}'.format(args))
95
96     def is_hyperthread(self):
97         return self.hyperthread == 'h'
98
99     @property
100     def index(self):
101         return int(self.is_hyperthread())
102
103     def find_in_topology(self, cpu_topology):
104         try:
105             socket_core_match = cpu_topology[self.socket_id][self.core_id]
106             sorted_match = sorted(socket_core_match.values())
107             return sorted_match[self.index][0]
108         except (KeyError, IndexError):
109             template = "Core {}{} on socket {} does not exist"
110             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
111
112
113 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
114     def __new__(cls, *args):
115         try:
116             assert args[0] is not str(args[0])
117             args = tuple(args[0])
118         except (AssertionError, IndexError, TypeError):
119             pass
120
121         return super(TotStatsTuple, cls).__new__(cls, *args)
122
123
124 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
125                                                         'delta_tx,delta_tsc,'
126                                                         'latency,rx_total,tx_total,pps')):
127     @property
128     def pkt_loss(self):
129         try:
130             return 1e2 * self.drop_total / float(self.tx_total)
131         except ZeroDivisionError:
132             return 100.0
133
134     @property
135     def mpps(self):
136         # calculate the effective throughput in Mpps
137         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
138
139     @property
140     def can_be_lost(self):
141         return int(self.tx_total * self.tolerated / 1e2)
142
143     @property
144     def drop_total(self):
145         return self.tx_total - self.rx_total
146
147     @property
148     def success(self):
149         return self.drop_total <= self.can_be_lost
150
151     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
152         if pkt_loss is None:
153             pkt_loss = self.pkt_loss
154
155         if port_samples is None:
156             port_samples = {}
157
158         latency_keys = [
159             "LatencyMin",
160             "LatencyMax",
161             "LatencyAvg",
162         ]
163
164         samples = {
165             "Throughput": self.mpps,
166             "DropPackets": pkt_loss,
167             "CurrentDropPackets": pkt_loss,
168             "TxThroughput": self.pps / 1e6,
169             "RxThroughput": self.mpps,
170             "PktSize": pkt_size,
171         }
172         if port_samples:
173             samples.update(port_samples)
174
175         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
176         return samples
177
178     def log_data(self, logger=None):
179         if logger is None:
180             logger = LOG
181
182         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
183         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
184         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
185
186
187 class PacketDump(object):
188     @staticmethod
189     def assert_func(func, value1, value2, template=None):
190         assert func(value1, value2), template.format(value1, value2)
191
192     def __init__(self, port_id, data_len, payload):
193         template = "Packet dump has specified length {}, but payload is {} bytes long"
194         self.assert_func(operator.eq, data_len, len(payload), template)
195         self._port_id = port_id
196         self._data_len = data_len
197         self._payload = payload
198
199     @property
200     def port_id(self):
201         """Get the port id of the packet dump"""
202         return self._port_id
203
204     @property
205     def data_len(self):
206         """Get the length of the data received"""
207         return self._data_len
208
209     def __str__(self):
210         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
211
212     def payload(self, start=None, end=None):
213         """Get part of the payload as a list of ordinals.
214
215         Returns a list of byte values, matching the contents of the packet dump.
216         Optional start and end parameters can be specified to retrieve only a
217         part of the packet contents.
218
219         The number of elements in the list is equal to end - start + 1, so end
220         is the offset of the last character.
221
222         Args:
223             start (pos. int): the starting offset in the payload. If it is not
224                 specified or None, offset 0 is assumed.
225             end (pos. int): the ending offset of the payload. If it is not
226                 specified or None, the contents until the end of the packet are
227                 returned.
228
229         Returns:
230             [int, int, ...]. Each int represents the ordinal value of a byte in
231             the packet payload.
232         """
233         if start is None:
234             start = 0
235
236         if end is None:
237             end = self.data_len - 1
238
239         # Bounds checking on offsets
240         template = "Start offset must be non-negative"
241         self.assert_func(operator.ge, start, 0, template)
242
243         template = "End offset must be less than {1}"
244         self.assert_func(operator.lt, end, self.data_len, template)
245
246         # Adjust for splice operation: end offset must be 1 more than the offset
247         # of the last desired character.
248         end += 1
249
250         return self._payload[start:end]
251
252
253 class ProxSocketHelper(object):
254
255     def __init__(self, sock=None):
256         """ creates new prox instance """
257         super(ProxSocketHelper, self).__init__()
258
259         if sock is None:
260             sock = socket.socket()
261
262         self._sock = sock
263         self._pkt_dumps = []
264         self.master_stats = None
265
266     def connect(self, ip, port):
267         """Connect to the prox instance on the remote system"""
268         self._sock.connect((ip, port))
269
270     def get_socket(self):
271         """ get the socket connected to the remote instance """
272         return self._sock
273
274     def _parse_socket_data(self, decoded_data, pkt_dump_only):
275         def get_newline_index():
276             return decoded_data.find('\n', index)
277
278         ret_str = ''
279         index = 0
280         for newline_index in iter(get_newline_index, -1):
281             ret_str = decoded_data[index:newline_index]
282
283             try:
284                 mode, port_id, data_len = ret_str.split(',', 2)
285             except ValueError:
286                 mode, port_id, data_len = None, None, None
287
288             if mode != 'pktdump':
289                 # Regular 1-line message. Stop reading from the socket.
290                 LOG.debug("Regular response read")
291                 return ret_str
292
293             LOG.debug("Packet dump header read: [%s]", ret_str)
294
295             # The line is a packet dump header. Parse it, read the
296             # packet payload, store the dump for later retrieval.
297             # Skip over the packet dump and continue processing: a
298             # 1-line response may follow the packet dump.
299
300             data_len = int(data_len)
301             data_start = newline_index + 1  # + 1 to skip over \n
302             data_end = data_start + data_len
303             sub_data = decoded_data[data_start:data_end]
304             pkt_payload = array.array('B', (ord(v) for v in sub_data))
305             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
306             self._pkt_dumps.append(pkt_dump)
307
308             if pkt_dump_only:
309                 # Return boolean instead of string to signal
310                 # successful reception of the packet dump.
311                 LOG.debug("Packet dump stored, returning")
312                 return True
313
314             index = data_end + 1
315
316         return ret_str
317
318     def get_data(self, pkt_dump_only=False, timeout=0.01):
319         """ read data from the socket """
320
321         # This method behaves slightly differently depending on whether it is
322         # called to read the response to a command (pkt_dump_only = 0) or if
323         # it is called specifically to read a packet dump (pkt_dump_only = 1).
324         #
325         # Packet dumps look like:
326         #   pktdump,<port_id>,<data_len>\n
327         #   <packet contents as byte array>\n
328         # This means the total packet dump message consists of 2 lines instead
329         # of 1 line.
330         #
331         # - Response for a command (pkt_dump_only = 0):
332         #   1) Read response from the socket until \n (end of message)
333         #   2a) If the response is a packet dump header (starts with "pktdump,"):
334         #     - Read the packet payload and store the packet dump for later
335         #       retrieval.
336         #     - Reset the state and restart from 1). Eventually state 2b) will
337         #       be reached and the function will return.
338         #   2b) If the response is not a packet dump:
339         #     - Return the received message as a string
340         #
341         # - Explicit request to read a packet dump (pkt_dump_only = 1):
342         #   - Read the dump header and payload
343         #   - Store the packet dump for later retrieval
344         #   - Return True to signify a packet dump was successfully read
345
346         def is_ready():
347             # recv() is blocking, so avoid calling it when no data is waiting.
348             ready = select.select([self._sock], [], [], timeout)
349             return bool(ready[0])
350
351         status = False
352         ret_str = ""
353         for status in iter(is_ready, False):
354             decoded_data = self._sock.recv(256).decode('utf-8')
355             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
356
357         LOG.debug("Received data from socket: [%s]", ret_str)
358         return ret_str if status else ''
359
360     def put_command(self, to_send):
361         """ send data to the remote instance """
362         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
363         try:
364             # NOTE: sendall will block, we need a timeout
365             self._sock.sendall(to_send.encode('utf-8'))
366         except:  # pylint: disable=bare-except
367             pass
368
369     def get_packet_dump(self):
370         """ get the next packet dump """
371         if self._pkt_dumps:
372             return self._pkt_dumps.pop(0)
373         return None
374
375     def stop_all_reset(self):
376         """ stop the remote instance and reset stats """
377         LOG.debug("Stop all and reset stats")
378         self.stop_all()
379         self.reset_stats()
380
381     def stop_all(self):
382         """ stop all cores on the remote instance """
383         LOG.debug("Stop all")
384         self.put_command("stop all\n")
385         time.sleep(3)
386
387     def stop(self, cores, task=''):
388         """ stop specific cores on the remote instance """
389         LOG.debug("Stopping cores %s", cores)
390         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
391         time.sleep(3)
392
393     def start_all(self):
394         """ start all cores on the remote instance """
395         LOG.debug("Start all")
396         self.put_command("start all\n")
397
398     def start(self, cores):
399         """ start specific cores on the remote instance """
400         LOG.debug("Starting cores %s", cores)
401         self.put_command("start {}\n".format(join_non_strings(',', cores)))
402         time.sleep(3)
403
404     def reset_stats(self):
405         """ reset the statistics on the remote instance """
406         LOG.debug("Reset stats")
407         self.put_command("reset stats\n")
408         time.sleep(1)
409
410     def _run_template_over_cores(self, template, cores, *args):
411         for core in cores:
412             self.put_command(template.format(core, *args))
413
414     def set_pkt_size(self, cores, pkt_size):
415         """ set the packet size to generate on the remote instance """
416         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
417         pkt_size -= 4
418         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
419         time.sleep(1)
420
421     def set_value(self, cores, offset, value, length):
422         """ set value on the remote instance """
423         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
424         LOG.debug(msg, cores, value, length, offset)
425         template = "set value {} 0 {} {} {}\n"
426         self._run_template_over_cores(template, cores, offset, value, length)
427
428     def reset_values(self, cores):
429         """ reset values on the remote instance """
430         LOG.debug("Set value for core(s) %s", cores)
431         self._run_template_over_cores("reset values {} 0\n", cores)
432
433     def set_speed(self, cores, speed, tasks=None):
434         """ set speed on the remote instance """
435         if tasks is None:
436             tasks = [0] * len(cores)
437         elif len(tasks) != len(cores):
438             LOG.error("set_speed: cores and tasks must have the same len")
439         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
440         for (core, task) in list(zip(cores, tasks)):
441             self.put_command("speed {} {} {}\n".format(core, task, speed))
442
443     def slope_speed(self, cores_speed, duration, n_steps=0):
444         """will start to increase speed from 0 to N where N is taken from
445         a['speed'] for each a in cores_speed"""
446         # by default, each step will take 0.5 sec
447         if n_steps == 0:
448             n_steps = duration * 2
449
450         private_core_data = []
451         step_duration = float(duration) / n_steps
452         for core_data in cores_speed:
453             target = float(core_data['speed'])
454             private_core_data.append({
455                 'cores': core_data['cores'],
456                 'zero': 0,
457                 'delta': target / n_steps,
458                 'current': 0,
459                 'speed': target,
460             })
461
462         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
463         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
464             time.sleep(step_duration)
465             for core_data in private_core_data:
466                 core_data['current'] = core_data[key1] + core_data[key2]
467                 self.set_speed(core_data['cores'], core_data['current'])
468
469     def set_pps(self, cores, pps, pkt_size,
470                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
471         """ set packets per second for specific cores on the remote instance """
472         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
473         LOG.debug(msg, cores, pps, pkt_size)
474
475         # speed in percent of line-rate
476         speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
477         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
478
479     def lat_stats(self, cores, task=0):
480         """Get the latency statistics from the remote system"""
481         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
482         lat_min = {}
483         lat_max = {}
484         lat_avg = {}
485         for core in cores:
486             self.put_command("lat stats {} {} \n".format(core, task))
487             ret = self.get_data()
488
489             try:
490                 lat_min[core], lat_max[core], lat_avg[core] = \
491                     tuple(int(n) for n in ret.split(",")[:3])
492
493             except (AttributeError, ValueError, TypeError):
494                 pass
495
496         return lat_min, lat_max, lat_avg
497
498     def get_all_tot_stats(self):
499         self.put_command("tot stats\n")
500         all_stats_str = self.get_data().split(",")
501         if len(all_stats_str) != 4:
502             all_stats = [0] * 4
503             return all_stats
504         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
505         self.master_stats = all_stats
506         return all_stats
507
508     def hz(self):
509         return self.get_all_tot_stats()[3]
510
511     def core_stats(self, cores, task=0):
512         """Get the receive statistics from the remote system"""
513         rx = tx = drop = tsc = 0
514         for core in cores:
515             self.put_command("core stats {} {}\n".format(core, task))
516             ret = self.get_data().split(",")
517             rx += int(ret[0])
518             tx += int(ret[1])
519             drop += int(ret[2])
520             tsc = int(ret[3])
521         return rx, tx, drop, tsc
522
523     def multi_port_stats(self, ports):
524         """get counter values from all ports port"""
525
526         ports_str = ""
527         for port in ports:
528             ports_str = ports_str + str(port) + ","
529         ports_str = ports_str[:-1]
530
531         ports_all_data = []
532         tot_result = [0] * len(ports)
533
534         retry_counter = 0
535         port_index = 0
536         while (len(ports) is not len(ports_all_data)) and (retry_counter < 10):
537             self.put_command("multi port stats {}\n".format(ports_str))
538             ports_all_data = self.get_data().split(";")
539
540             if len(ports) is len(ports_all_data):
541                 for port_data_str in ports_all_data:
542
543                     try:
544                         tot_result[port_index] = [try_int(s, 0) for s in port_data_str.split(",")]
545                     except (IndexError, TypeError):
546                         LOG.error("Port Index error %d  %s - retrying ", port_index, port_data_str)
547
548                     if (len(tot_result[port_index]) is not 6) or \
549                                     tot_result[port_index][0] is not ports[port_index]:
550                         ports_all_data = []
551                         tot_result = [0] * len(ports)
552                         port_index = 0
553                         time.sleep(0.1)
554                         LOG.error("Corrupted PACKET %s - retrying", port_data_str)
555                         break
556                     else:
557                         port_index = port_index + 1
558             else:
559                 LOG.error("Empty / too much data - retry -%s-", ports_all_data)
560                 ports_all_data = []
561                 tot_result = [0] * len(ports)
562                 port_index = 0
563                 time.sleep(0.1)
564
565             retry_counter = retry_counter + 1
566         return tot_result
567
568     def port_stats(self, ports):
569         """get counter values from a specific port"""
570         tot_result = [0] * 12
571         for port in ports:
572             self.put_command("port_stats {}\n".format(port))
573             ret = [try_int(s, 0) for s in self.get_data().split(",")]
574             tot_result = [sum(x) for x in zip(tot_result, ret)]
575         return tot_result
576
577     @contextmanager
578     def measure_tot_stats(self):
579         start = self.get_all_tot_stats()
580         container = {'start_tot': start}
581         try:
582             yield container
583         finally:
584             container['end_tot'] = end = self.get_all_tot_stats()
585
586         container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
587
588     def tot_stats(self):
589         """Get the total statistics from the remote system"""
590         stats = self.get_all_tot_stats()
591         return stats[:3]
592
593     def tot_ierrors(self):
594         """Get the total ierrors from the remote system"""
595         self.put_command("tot ierrors tot\n")
596         recv = self.get_data().split(',')
597         tot_ierrors = int(recv[0])
598         tsc = int(recv[0])
599         return tot_ierrors, tsc
600
601     def set_count(self, count, cores):
602         """Set the number of packets to send on the specified core"""
603         self._run_template_over_cores("count {} 0 {}\n", cores, count)
604
605     def dump_rx(self, core_id, task_id=0, count=1):
606         """Activate dump on rx on the specified core"""
607         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
608         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
609         time.sleep(1.5)  # Give PROX time to set up packet dumping
610
611     def quit(self):
612         self.stop_all()
613         self._quit()
614         self.force_quit()
615
616     def _quit(self):
617         """ stop all cores on the remote instance """
618         LOG.debug("Quit prox")
619         self.put_command("quit\n")
620         time.sleep(3)
621
622     def force_quit(self):
623         """ stop all cores on the remote instance """
624         LOG.debug("Force Quit prox")
625         self.put_command("quit_force\n")
626         time.sleep(3)
627
628
629 _LOCAL_OBJECT = object()
630
631
632 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
633     # the actual app is lowercase
634     APP_NAME = 'prox'
635     # not used for Prox but added for consistency
636     VNF_TYPE = "PROX"
637
638     LUA_PARAMETER_NAME = ""
639     LUA_PARAMETER_PEER = {
640         "gen": "sut",
641         "sut": "gen",
642     }
643
644     CONFIG_QUEUE_TIMEOUT = 120
645
646     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
647         self.remote_path = None
648         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
649         self.remote_prox_file_name = None
650         self._prox_config_data = None
651         self.additional_files = {}
652         self.config_queue = Queue()
653         # allow_exit_without_flush
654         self.config_queue.cancel_join_thread()
655         self._global_section = None
656
657     @property
658     def prox_config_data(self):
659         if self._prox_config_data is None:
660             # this will block, but it needs too
661             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
662         return self._prox_config_data
663
664     @property
665     def global_section(self):
666         if self._global_section is None and self.prox_config_data:
667             self._global_section = self.find_section("global")
668         return self._global_section
669
670     def find_section(self, name, default=_LOCAL_OBJECT):
671         result = next((value for key, value in self.prox_config_data if key == name), default)
672         if result is _LOCAL_OBJECT:
673             raise KeyError('{} not found in Prox config'.format(name))
674         return result
675
676     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
677         section = self.find_section(section_name, [])
678         result = next((value for key, value in section if key == section_key), default)
679         if result is _LOCAL_OBJECT:
680             template = '{} not found in {} section of Prox config'
681             raise KeyError(template.format(section_key, section_name))
682         return result
683
684     def copy_to_target(self, config_file_path, prox_file):
685         remote_path = os.path.join("/tmp", prox_file)
686         self.ssh_helper.put(config_file_path, remote_path)
687         return remote_path
688
689     @staticmethod
690     def _get_tx_port(section, sections):
691         iface_port = [-1]
692         for item in sections[section]:
693             if item[0] == "tx port":
694                 iface_port = re.findall(r'\d+', item[1])
695                 # do we want the last one?
696                 #   if yes, then can we reverse?
697         return int(iface_port[0])
698
699     @staticmethod
700     def _replace_quoted_with_value(quoted, value, count=1):
701         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
702         return new_string
703
704     def _insert_additional_file(self, value):
705         file_str = value.split('"')
706         base_name = os.path.basename(file_str[1])
707         file_str[1] = self.additional_files[base_name]
708         return '"'.join(file_str)
709
710     def generate_prox_config_file(self, config_path):
711         sections = []
712         prox_config = ConfigParser(config_path, sections)
713         prox_config.parse()
714
715         # Ensure MAC is set "hardware"
716         all_ports = self.vnfd_helper.port_pairs.all_ports
717         # use dpdk port number
718         for port_name in all_ports:
719             port_num = self.vnfd_helper.port_num(port_name)
720             port_section_name = "port {}".format(port_num)
721             for section_name, section in sections:
722                 if port_section_name != section_name:
723                     continue
724
725                 for section_data in section:
726                     if section_data[0] == "mac":
727                         section_data[1] = "hardware"
728
729         # search for dst mac
730         for _, section in sections:
731             for section_data in section:
732                 item_key, item_val = section_data
733                 if item_val.startswith("@@dst_mac"):
734                     tx_port_iter = re.finditer(r'\d+', item_val)
735                     tx_port_no = int(next(tx_port_iter).group(0))
736                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
737                     mac = intf["virtual-interface"]["dst_mac"]
738                     section_data[1] = mac.replace(":", " ", 6)
739
740                 if item_key == "dst mac" and item_val.startswith("@@"):
741                     tx_port_iter = re.finditer(r'\d+', item_val)
742                     tx_port_no = int(next(tx_port_iter).group(0))
743                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
744                     mac = intf["virtual-interface"]["dst_mac"]
745                     section_data[1] = mac
746
747                 if item_val.startswith("@@src_mac"):
748                     tx_port_iter = re.finditer(r'\d+', item_val)
749                     tx_port_no = int(next(tx_port_iter).group(0))
750                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
751                     mac = intf["virtual-interface"]["local_mac"]
752                     section_data[1] = mac.replace(":", " ", 6)
753
754                 if item_key == "src mac" and item_val.startswith("@@"):
755                     tx_port_iter = re.finditer(r'\d+', item_val)
756                     tx_port_no = int(next(tx_port_iter).group(0))
757                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
758                     mac = intf["virtual-interface"]["local_mac"]
759                     section_data[1] = mac
760
761         # if addition file specified in prox config
762         if not self.additional_files:
763             return sections
764
765         for section_name, section in sections:
766             for section_data in section:
767                 try:
768                     if section_data[0].startswith("dofile"):
769                         section_data[0] = self._insert_additional_file(section_data[0])
770
771                     if section_data[1].startswith("dofile"):
772                         section_data[1] = self._insert_additional_file(section_data[1])
773                 except:  # pylint: disable=bare-except
774                     pass
775
776         return sections
777
778     @staticmethod
779     def write_prox_lua(lua_config):
780         """
781         Write an .ini-format config file for PROX (parameters.lua)
782         PROX does not allow a space before/after the =, so we need
783         a custom method
784         """
785         out = []
786         for key in lua_config:
787             value = '"' + lua_config[key] + '"'
788             if key == "__name__":
789                 continue
790             if value is not None and value != '@':
791                 key = "=".join((key, str(value).replace('\n', '\n\t')))
792                 out.append(key)
793             else:
794                 key = str(key).replace('\n', '\n\t')
795                 out.append(key)
796         return os.linesep.join(out)
797
798     @staticmethod
799     def write_prox_config(prox_config):
800         """
801         Write an .ini-format config file for PROX
802         PROX does not allow a space before/after the =, so we need
803         a custom method
804         """
805         out = []
806         for (section_name, section) in prox_config:
807             out.append("[{}]".format(section_name))
808             for item in section:
809                 key, value = item
810                 if key == "__name__":
811                     continue
812                 if value is not None and value != '@':
813                     key = "=".join((key, str(value).replace('\n', '\n\t')))
814                     out.append(key)
815                 else:
816                     key = str(key).replace('\n', '\n\t')
817                     out.append(key)
818         return os.linesep.join(out)
819
820     def put_string_to_file(self, s, remote_path):
821         file_obj = cStringIO(s)
822         self.ssh_helper.put_file_obj(file_obj, remote_path)
823         return remote_path
824
825     def generate_prox_lua_file(self):
826         p = OrderedDict()
827         all_ports = self.vnfd_helper.port_pairs.all_ports
828         for port_name in all_ports:
829             port_num = self.vnfd_helper.port_num(port_name)
830             intf = self.vnfd_helper.find_interface(name=port_name)
831             vintf = intf['virtual-interface']
832             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
833             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
834
835         return p
836
837     def upload_prox_lua(self, config_file, lua_data):
838         # prox can't handle spaces around ' = ' so use custom method
839         out = StringIO(self.write_prox_lua(lua_data))
840         out.seek(0)
841         remote_path = os.path.join("/tmp", config_file)
842         self.ssh_helper.put_file_obj(out, remote_path)
843
844         return remote_path
845
846     def upload_prox_config(self, config_file, prox_config_data):
847         # prox can't handle spaces around ' = ' so use custom method
848         out = StringIO(self.write_prox_config(prox_config_data))
849         out.seek(0)
850         remote_path = os.path.join("/tmp", config_file)
851         self.ssh_helper.put_file_obj(out, remote_path)
852
853         return remote_path
854
855     def build_config_file(self):
856         task_path = self.scenario_helper.task_path
857         options = self.scenario_helper.options
858         config_path = options['prox_config']
859         config_file = os.path.basename(config_path)
860         config_path = utils.find_relative_file(config_path, task_path)
861         self.additional_files = {}
862
863         try:
864             if options['prox_generate_parameter']:
865                 self.lua = []
866                 self.lua = self.generate_prox_lua_file()
867                 if len(self.lua) > 0:
868                     self.upload_prox_lua("parameters.lua", self.lua)
869         except:  # pylint: disable=bare-except
870             pass
871
872         prox_files = options.get('prox_files', [])
873         if isinstance(prox_files, six.string_types):
874             prox_files = [prox_files]
875         for key_prox_file in prox_files:
876             base_prox_file = os.path.basename(key_prox_file)
877             key_prox_path = utils.find_relative_file(key_prox_file, task_path)
878             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
879             self.additional_files[base_prox_file] = remote_prox_file
880
881         self._prox_config_data = self.generate_prox_config_file(config_path)
882         # copy config to queue so we can read it from traffic_runner process
883         self.config_queue.put(self._prox_config_data)
884         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
885
886     def build_config(self):
887         self.build_config_file()
888
889         options = self.scenario_helper.options
890         prox_args = options['prox_args']
891         tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
892
893         self.pipeline_kwargs = {
894             'tool_path': tool_path,
895             'tool_dir': os.path.dirname(tool_path),
896             'cfg_file': self.remote_path,
897             'args': ' '.join(' '.join([str(k), str(v) if v else ''])
898                              for k, v in prox_args.items())
899         }
900
901         cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
902                         "{args} -f {cfg_file} '")
903         return cmd_template.format(**self.pipeline_kwargs)
904
905
906 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
907 class ProxResourceHelper(ClientResourceHelper):
908
909     RESOURCE_WORD = 'prox'
910
911     PROX_MODE = ""
912
913     WAIT_TIME = 3
914
915     @staticmethod
916     def find_pci(pci, bound_pci):
917         # we have to substring match PCI bus address from the end
918         return any(b.endswith(pci) for b in bound_pci)
919
920     def __init__(self, setup_helper):
921         super(ProxResourceHelper, self).__init__(setup_helper)
922         self.mgmt_interface = self.vnfd_helper.mgmt_interface
923         self._user = self.mgmt_interface["user"]
924         self._ip = self.mgmt_interface["ip"]
925
926         self.done = False
927         self._vpci_to_if_name_map = None
928         self.additional_file = {}
929         self.remote_prox_file_name = None
930         self.lower = None
931         self.upper = None
932         self.step_delta = 1
933         self.step_time = 0.5
934         self._test_type = None
935
936     @property
937     def sut(self):
938         if not self.client:
939             self.client = self._connect()
940         return self.client
941
942     @property
943     def test_type(self):
944         if self._test_type is None:
945             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
946         return self._test_type
947
948     def run_traffic(self, traffic_profile):
949         self._queue.cancel_join_thread()
950         self.lower = 0.0
951         self.upper = 100.0
952
953         traffic_profile.init(self._queue)
954         # this frees up the run_traffic loop
955         self.client_started.value = 1
956
957         while not self._terminated.value:
958             # move it all to traffic_profile
959             self._run_traffic_once(traffic_profile)
960
961     def _run_traffic_once(self, traffic_profile):
962         traffic_profile.execute_traffic(self)
963         if traffic_profile.done:
964             self._queue.put({'done': True})
965             LOG.debug("tg_prox done")
966             self._terminated.value = 1
967
968     # For VNF use ResourceHelper method to collect KPIs directly.
969     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
970     def collect_collectd_kpi(self):
971         return self._collect_resource_kpi()
972
973     def collect_kpi(self):
974         result = super(ProxResourceHelper, self).collect_kpi()
975         # add in collectd kpis manually
976         if result:
977             result['collect_stats'] = self._collect_resource_kpi()
978         return result
979
980     def terminate(self):
981         # should not be called, use VNF terminate
982         raise NotImplementedError()
983
984     def up_post(self):
985         return self.sut  # force connection
986
987     def execute(self, cmd, *args, **kwargs):
988         func = getattr(self.sut, cmd, None)
989         if func:
990             return func(*args, **kwargs)
991         return None
992
993     def _connect(self, client=None):
994         """Run and connect to prox on the remote system """
995         # De-allocating a large amount of hugepages takes some time. If a new
996         # PROX instance is started immediately after killing the previous one,
997         # it might not be able to allocate hugepages, because they are still
998         # being freed. Hence the -w switch.
999         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
1000         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
1001         # -f ./handle_none-4.cfg"
1002         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
1003         #  "; " \
1004         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
1005         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
1006         # sudo " \
1007         #    + "./build/Prox " + prox_args
1008         # log.debug("Starting PROX with command [%s]", prox_cmd)
1009         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
1010         # self._ip, prox_cmd))
1011         if client is None:
1012             client = ProxSocketHelper()
1013
1014         # try connecting to Prox for 60s
1015         for _ in range(RETRY_SECONDS):
1016             time.sleep(RETRY_INTERVAL)
1017             try:
1018                 client.connect(self._ip, PROX_PORT)
1019             except (socket.gaierror, socket.error):
1020                 continue
1021             else:
1022                 return client
1023
1024         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
1025         raise Exception(msg.format(self._ip, PROX_PORT))
1026
1027
1028 class ProxDataHelper(object):
1029
1030     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
1031         super(ProxDataHelper, self).__init__()
1032         self.vnfd_helper = vnfd_helper
1033         self.sut = sut
1034         self.pkt_size = pkt_size
1035         self.value = value
1036         self.line_speed = line_speed
1037         self.tolerated_loss = tolerated_loss
1038         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
1039         self.tsc_hz = None
1040         self.measured_stats = None
1041         self.latency = None
1042         self._totals_and_pps = None
1043         self.result_tuple = None
1044
1045     @property
1046     def totals_and_pps(self):
1047         if self._totals_and_pps is None:
1048             rx_total = tx_total = 0
1049             all_ports = self.sut.multi_port_stats(range(self.port_count))
1050             for port in all_ports:
1051                 rx_total = rx_total + port[1]
1052                 tx_total = tx_total + port[2]
1053             requested_pps = self.value / 100.0 * self.line_rate_to_pps()
1054             self._totals_and_pps = rx_total, tx_total, requested_pps
1055         return self._totals_and_pps
1056
1057     @property
1058     def rx_total(self):
1059         return self.totals_and_pps[0]
1060
1061     @property
1062     def tx_total(self):
1063         return self.totals_and_pps[1]
1064
1065     @property
1066     def pps(self):
1067         return self.totals_and_pps[2]
1068
1069     @property
1070     def samples(self):
1071         samples = {}
1072         ports = []
1073         port_names = []
1074         for port_name, port_num in self.vnfd_helper.ports_iter():
1075             ports.append(port_num)
1076             port_names.append(port_name)
1077
1078         results = self.sut.multi_port_stats(ports)
1079         for result in results:
1080             port_num = result[0]
1081             samples[port_names[port_num]] = {
1082                     "in_packets": result[1],
1083                     "out_packets": result[2]}
1084         return samples
1085
1086     def __enter__(self):
1087         self.check_interface_count()
1088         return self
1089
1090     def __exit__(self, exc_type, exc_val, exc_tb):
1091         self.make_tuple()
1092
1093     def make_tuple(self):
1094         if self.result_tuple:
1095             return
1096
1097         self.result_tuple = ProxTestDataTuple(
1098             self.tolerated_loss,
1099             self.tsc_hz,
1100             self.measured_stats['delta'].rx,
1101             self.measured_stats['delta'].tx,
1102             self.measured_stats['delta'].tsc,
1103             self.latency,
1104             self.rx_total,
1105             self.tx_total,
1106             self.pps,
1107         )
1108         self.result_tuple.log_data()
1109
1110     @contextmanager
1111     def measure_tot_stats(self):
1112         with self.sut.measure_tot_stats() as self.measured_stats:
1113             yield
1114
1115     def check_interface_count(self):
1116         # do this assert in init?  unless we expect interface count to
1117         # change from one run to another run...
1118         assert self.port_count in {1, 2, 4}, \
1119             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1120
1121     def capture_tsc_hz(self):
1122         self.tsc_hz = float(self.sut.hz())
1123
1124     def line_rate_to_pps(self):
1125       return self.port_count * self.line_speed  / BITS_PER_BYTE / (self.pkt_size + 20)
1126
1127 class ProxProfileHelper(object):
1128
1129     __prox_profile_type__ = "Generic"
1130
1131     PROX_CORE_GEN_MODE = "gen"
1132     PROX_CORE_LAT_MODE = "lat"
1133
1134     @classmethod
1135     def get_cls(cls, helper_type):
1136         """Return class of specified type."""
1137         if not helper_type:
1138             return ProxProfileHelper
1139
1140         for profile_helper_class in utils.itersubclasses(cls):
1141             if helper_type == profile_helper_class.__prox_profile_type__:
1142                 return profile_helper_class
1143
1144         return ProxProfileHelper
1145
1146     @classmethod
1147     def make_profile_helper(cls, resource_helper):
1148         return cls.get_cls(resource_helper.test_type)(resource_helper)
1149
1150     def __init__(self, resource_helper):
1151         super(ProxProfileHelper, self).__init__()
1152         self.resource_helper = resource_helper
1153         self._cpu_topology = None
1154         self._test_cores = None
1155         self._latency_cores = None
1156
1157     @property
1158     def cpu_topology(self):
1159         if not self._cpu_topology:
1160             stdout = io.BytesIO()
1161             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1162             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1163         return self._cpu_topology
1164
1165     @property
1166     def test_cores(self):
1167         if not self._test_cores:
1168             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1169         return self._test_cores
1170
1171     @property
1172     def latency_cores(self):
1173         if not self._latency_cores:
1174             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1175         return self._latency_cores
1176
1177     @contextmanager
1178     def traffic_context(self, pkt_size, value):
1179         self.sut.stop_all()
1180         self.sut.reset_stats()
1181         try:
1182             self.sut.set_pkt_size(self.test_cores, pkt_size)
1183             self.sut.set_speed(self.test_cores, value)
1184             self.sut.start_all()
1185             yield
1186         finally:
1187             self.sut.stop_all()
1188
1189     def get_cores(self, mode):
1190         cores = []
1191
1192         for section_name, section in self.setup_helper.prox_config_data:
1193             if not section_name.startswith("core"):
1194                 continue
1195
1196             for key, value in section:
1197                 if key == "mode" and value == mode:
1198                     core_tuple = CoreSocketTuple(section_name)
1199                     core = core_tuple.core_id
1200                     cores.append(core)
1201
1202         return cores
1203
1204     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1205                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1206         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1207                                      value, tolerated_loss, line_speed)
1208
1209         with data_helper, self.traffic_context(pkt_size, value):
1210             with data_helper.measure_tot_stats():
1211                 time.sleep(duration)
1212                 # Getting statistics to calculate PPS at right speed....
1213                 data_helper.capture_tsc_hz()
1214                 data_helper.latency = self.get_latency()
1215
1216         return data_helper.result_tuple, data_helper.samples
1217
1218     def get_latency(self):
1219         """
1220         :return: return lat_min, lat_max, lat_avg
1221         :rtype: list
1222         """
1223
1224         if not self._latency_cores:
1225             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1226
1227         if self._latency_cores:
1228             return self.sut.lat_stats(self._latency_cores)
1229         return []
1230
1231     def terminate(self):
1232         pass
1233
1234     def __getattr__(self, item):
1235         return getattr(self.resource_helper, item)
1236
1237
1238 class ProxMplsProfileHelper(ProxProfileHelper):
1239
1240     __prox_profile_type__ = "MPLS tag/untag"
1241
1242     def __init__(self, resource_helper):
1243         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1244         self._cores_tuple = None
1245
1246     @property
1247     def mpls_cores(self):
1248         if not self._cores_tuple:
1249             self._cores_tuple = self.get_cores_mpls()
1250         return self._cores_tuple
1251
1252     @property
1253     def tagged_cores(self):
1254         return self.mpls_cores[0]
1255
1256     @property
1257     def plain_cores(self):
1258         return self.mpls_cores[1]
1259
1260     def get_cores_mpls(self):
1261         cores_tagged = []
1262         cores_plain = []
1263         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1264             if not section_name.startswith("core"):
1265                 continue
1266
1267             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1268                 continue
1269
1270             for item_key, item_value in section:
1271                 if item_key != 'name':
1272                     continue
1273
1274                 if item_value.startswith("tag"):
1275                     core_tuple = CoreSocketTuple(section_name)
1276                     core_tag = core_tuple.core_id
1277                     cores_tagged.append(core_tag)
1278
1279                 elif item_value.startswith("udp"):
1280                     core_tuple = CoreSocketTuple(section_name)
1281                     core_udp = core_tuple.core_id
1282                     cores_plain.append(core_udp)
1283
1284         return cores_tagged, cores_plain
1285
1286     @contextmanager
1287     def traffic_context(self, pkt_size, value):
1288         self.sut.stop_all()
1289         self.sut.reset_stats()
1290         try:
1291             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1292             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1293             self.sut.set_speed(self.tagged_cores, value)
1294             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1295             self.sut.set_speed(self.plain_cores, value * ratio)
1296             self.sut.start_all()
1297             yield
1298         finally:
1299             self.sut.stop_all()
1300
1301
1302 class ProxBngProfileHelper(ProxProfileHelper):
1303
1304     __prox_profile_type__ = "BNG gen"
1305
1306     def __init__(self, resource_helper):
1307         super(ProxBngProfileHelper, self).__init__(resource_helper)
1308         self._cores_tuple = None
1309
1310     @property
1311     def bng_cores(self):
1312         if not self._cores_tuple:
1313             self._cores_tuple = self.get_cores_gen_bng_qos()
1314         return self._cores_tuple
1315
1316     @property
1317     def cpe_cores(self):
1318         return self.bng_cores[0]
1319
1320     @property
1321     def inet_cores(self):
1322         return self.bng_cores[1]
1323
1324     @property
1325     def arp_cores(self):
1326         return self.bng_cores[2]
1327
1328     @property
1329     def arp_task_cores(self):
1330         return self.bng_cores[3]
1331
1332     @property
1333     def all_rx_cores(self):
1334         return self.latency_cores
1335
1336     def get_cores_gen_bng_qos(self):
1337         cpe_cores = []
1338         inet_cores = []
1339         arp_cores = []
1340         arp_tasks_core = [0]
1341         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1342             if not section_name.startswith("core"):
1343                 continue
1344
1345             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1346                 continue
1347
1348             for item_key, item_value in section:
1349                 if item_key != 'name':
1350                     continue
1351
1352                 if item_value.startswith("cpe"):
1353                     core_tuple = CoreSocketTuple(section_name)
1354                     cpe_core = core_tuple.core_id
1355                     cpe_cores.append(cpe_core)
1356
1357                 elif item_value.startswith("inet"):
1358                     core_tuple = CoreSocketTuple(section_name)
1359                     inet_core = core_tuple.core_id
1360                     inet_cores.append(inet_core)
1361
1362                 elif item_value.startswith("arp"):
1363                     core_tuple = CoreSocketTuple(section_name)
1364                     arp_core = core_tuple.core_id
1365                     arp_cores.append(arp_core)
1366
1367                 # We check the tasks/core separately
1368                 if item_value.startswith("arp_task"):
1369                     core_tuple = CoreSocketTuple(section_name)
1370                     arp_task_core = core_tuple.core_id
1371                     arp_tasks_core.append(arp_task_core)
1372
1373         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1374
1375     @contextmanager
1376     def traffic_context(self, pkt_size, value):
1377         # Tester is sending packets at the required speed already after
1378         # setup_test(). Just get the current statistics, sleep the required
1379         # amount of time and calculate packet loss.
1380         inet_pkt_size = pkt_size
1381         cpe_pkt_size = pkt_size - 24
1382         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1383
1384         curr_up_speed = curr_down_speed = 0
1385         max_up_speed = max_down_speed = value
1386         if ratio < 1:
1387             max_down_speed = value * ratio
1388         else:
1389             max_up_speed = value / ratio
1390
1391         # Initialize cores
1392         self.sut.stop_all()
1393         time.sleep(0.5)
1394
1395         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1396         # wrong.
1397         self.sut.start(self.all_rx_cores)
1398         time.sleep(0.5)
1399         self.sut.stop(self.all_rx_cores)
1400         time.sleep(0.5)
1401         self.sut.reset_stats()
1402
1403         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1404         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1405
1406         self.sut.reset_values(self.cpe_cores)
1407         self.sut.reset_values(self.inet_cores)
1408
1409         # Set correct IP and UDP lengths in packet headers
1410         # CPE
1411         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1412         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1413         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1414         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1415
1416         # INET
1417         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1418         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1419         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1420         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1421         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1422         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1423
1424         # Sending ARP to initialize tables - need a few seconds of generation
1425         # to make sure all CPEs are initialized
1426         LOG.info("Initializing SUT: sending ARP packets")
1427         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1428         self.sut.set_speed(self.inet_cores, curr_up_speed)
1429         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1430         self.sut.start(self.arp_cores)
1431         time.sleep(4)
1432
1433         # Ramp up the transmission speed. First go to the common speed, then
1434         # increase steps for the faster one.
1435         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1436
1437         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1438
1439         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1440             # The min(..., ...) takes care of 1) floating point rounding errors
1441             # that could make curr_*_speed to be slightly greater than
1442             # max_*_speed and 2) max_*_speed not being an exact multiple of
1443             # self._step_delta.
1444             if curr_up_speed < max_up_speed:
1445                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1446             if curr_down_speed < max_down_speed:
1447                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1448
1449             self.sut.set_speed(self.inet_cores, curr_up_speed)
1450             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1451             time.sleep(self.step_time)
1452
1453         LOG.info("Target speeds reached. Starting real test.")
1454
1455         yield
1456
1457         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1458         LOG.info("Test ended. Flushing NIC buffers")
1459         self.sut.start(self.all_rx_cores)
1460         time.sleep(3)
1461         self.sut.stop(self.all_rx_cores)
1462
1463     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1464                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1465         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1466                                      value, tolerated_loss, line_speed)
1467
1468         with data_helper, self.traffic_context(pkt_size, value):
1469             with data_helper.measure_tot_stats():
1470                 time.sleep(duration)
1471                 # Getting statistics to calculate PPS at right speed....
1472                 data_helper.capture_tsc_hz()
1473                 data_helper.latency = self.get_latency()
1474
1475         return data_helper.result_tuple, data_helper.samples
1476
1477
1478 class ProxVpeProfileHelper(ProxProfileHelper):
1479
1480     __prox_profile_type__ = "vPE gen"
1481
1482     def __init__(self, resource_helper):
1483         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1484         self._cores_tuple = None
1485         self._ports_tuple = None
1486
1487     @property
1488     def vpe_cores(self):
1489         if not self._cores_tuple:
1490             self._cores_tuple = self.get_cores_gen_vpe()
1491         return self._cores_tuple
1492
1493     @property
1494     def cpe_cores(self):
1495         return self.vpe_cores[0]
1496
1497     @property
1498     def inet_cores(self):
1499         return self.vpe_cores[1]
1500
1501     @property
1502     def all_rx_cores(self):
1503         return self.latency_cores
1504
1505     @property
1506     def vpe_ports(self):
1507         if not self._ports_tuple:
1508             self._ports_tuple = self.get_ports_gen_vpe()
1509         return self._ports_tuple
1510
1511     @property
1512     def cpe_ports(self):
1513         return self.vpe_ports[0]
1514
1515     @property
1516     def inet_ports(self):
1517         return self.vpe_ports[1]
1518
1519     def get_cores_gen_vpe(self):
1520         cpe_cores = []
1521         inet_cores = []
1522         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1523             if not section_name.startswith("core"):
1524                 continue
1525
1526             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1527                 continue
1528
1529             for item_key, item_value in section:
1530                 if item_key != 'name':
1531                     continue
1532
1533                 if item_value.startswith("cpe"):
1534                     core_tuple = CoreSocketTuple(section_name)
1535                     core_tag = core_tuple.core_id
1536                     cpe_cores.append(core_tag)
1537
1538                 elif item_value.startswith("inet"):
1539                     core_tuple = CoreSocketTuple(section_name)
1540                     inet_core = core_tuple.core_id
1541                     inet_cores.append(inet_core)
1542
1543         return cpe_cores, inet_cores
1544
1545     def get_ports_gen_vpe(self):
1546         cpe_ports = []
1547         inet_ports = []
1548
1549         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1550             if not section_name.startswith("port"):
1551                 continue
1552             tx_port_iter = re.finditer(r'\d+', section_name)
1553             tx_port_no = int(next(tx_port_iter).group(0))
1554
1555             for item_key, item_value in section:
1556                 if item_key != 'name':
1557                     continue
1558
1559                 if item_value.startswith("cpe"):
1560                     cpe_ports.append(tx_port_no)
1561
1562                 elif item_value.startswith("inet"):
1563                     inet_ports.append(tx_port_no)
1564
1565         return cpe_ports, inet_ports
1566
1567     @contextmanager
1568     def traffic_context(self, pkt_size, value):
1569         # Calculate the target upload and download speed. The upload and
1570         # download packets have different packet sizes, so in order to get
1571         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1572         # of the packet sizes.
1573         cpe_pkt_size = pkt_size
1574         inet_pkt_size = pkt_size - 4
1575         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1576
1577         curr_up_speed = curr_down_speed = 0
1578         max_up_speed = max_down_speed = value
1579         if ratio < 1:
1580             max_down_speed = value * ratio
1581         else:
1582             max_up_speed = value / ratio
1583
1584         # Adjust speed when multiple cores per port are used to generate traffic
1585         if len(self.cpe_ports) != len(self.cpe_cores):
1586             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1587         if len(self.inet_ports) != len(self.inet_cores):
1588             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1589
1590         # Initialize cores
1591         self.sut.stop_all()
1592         time.sleep(2)
1593
1594         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1595         # wrong.
1596         self.sut.start(self.all_rx_cores)
1597         time.sleep(2)
1598         self.sut.stop(self.all_rx_cores)
1599         time.sleep(2)
1600         self.sut.reset_stats()
1601
1602         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1603         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1604
1605         self.sut.reset_values(self.cpe_cores)
1606         self.sut.reset_values(self.inet_cores)
1607
1608         # Set correct IP and UDP lengths in packet headers
1609         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1610         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1611         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1612         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1613
1614         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1615         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1616         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1617         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1618
1619         self.sut.set_speed(self.inet_cores, curr_up_speed)
1620         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1621
1622         # Ramp up the transmission speed. First go to the common speed, then
1623         # increase steps for the faster one.
1624         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1625
1626         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1627
1628         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1629             # The min(..., ...) takes care of 1) floating point rounding errors
1630             # that could make curr_*_speed to be slightly greater than
1631             # max_*_speed and 2) max_*_speed not being an exact multiple of
1632             # self._step_delta.
1633             if curr_up_speed < max_up_speed:
1634                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1635             if curr_down_speed < max_down_speed:
1636                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1637
1638             self.sut.set_speed(self.inet_cores, curr_up_speed)
1639             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1640             time.sleep(self.step_time)
1641
1642         LOG.info("Target speeds reached. Starting real test.")
1643
1644         yield
1645
1646         self.sut.stop(self.cpe_cores + self.inet_cores)
1647         LOG.info("Test ended. Flushing NIC buffers")
1648         self.sut.start(self.all_rx_cores)
1649         time.sleep(3)
1650         self.sut.stop(self.all_rx_cores)
1651
1652     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1653                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1654         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1655                                      value, tolerated_loss, line_speed)
1656
1657         with data_helper, self.traffic_context(pkt_size, value):
1658             with data_helper.measure_tot_stats():
1659                 time.sleep(duration)
1660                 # Getting statistics to calculate PPS at right speed....
1661                 data_helper.capture_tsc_hz()
1662                 data_helper.latency = self.get_latency()
1663
1664         return data_helper.result_tuple, data_helper.samples
1665
1666
1667 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1668
1669     __prox_profile_type__ = "lwAFTR gen"
1670
1671     def __init__(self, resource_helper):
1672         super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1673         self._cores_tuple = None
1674         self._ports_tuple = None
1675         self.step_delta = 5
1676         self.step_time = 0.5
1677
1678     @property
1679     def _lwaftr_cores(self):
1680         if not self._cores_tuple:
1681             self._cores_tuple = self._get_cores_gen_lwaftr()
1682         return self._cores_tuple
1683
1684     @property
1685     def tun_cores(self):
1686         return self._lwaftr_cores[0]
1687
1688     @property
1689     def inet_cores(self):
1690         return self._lwaftr_cores[1]
1691
1692     @property
1693     def _lwaftr_ports(self):
1694         if not self._ports_tuple:
1695             self._ports_tuple = self._get_ports_gen_lw_aftr()
1696         return self._ports_tuple
1697
1698     @property
1699     def tun_ports(self):
1700         return self._lwaftr_ports[0]
1701
1702     @property
1703     def inet_ports(self):
1704         return self._lwaftr_ports[1]
1705
1706     @property
1707     def all_rx_cores(self):
1708         return self.latency_cores
1709
1710     def _get_cores_gen_lwaftr(self):
1711         tun_cores = []
1712         inet_cores = []
1713         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1714             if not section_name.startswith("core"):
1715                 continue
1716
1717             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1718                 continue
1719
1720             core_tuple = CoreSocketTuple(section_name)
1721             core_tag = core_tuple.core_id
1722             for item_value in (v for k, v in section if k == 'name'):
1723                 if item_value.startswith('tun'):
1724                     tun_cores.append(core_tag)
1725                 elif item_value.startswith('inet'):
1726                     inet_cores.append(core_tag)
1727
1728         return tun_cores, inet_cores
1729
1730     def _get_ports_gen_lw_aftr(self):
1731         tun_ports = []
1732         inet_ports = []
1733
1734         re_port = re.compile(r'port (\d+)')
1735         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1736             match = re_port.search(section_name)
1737             if not match:
1738                 continue
1739
1740             tx_port_no = int(match.group(1))
1741             for item_value in (v for k, v in section if k == 'name'):
1742                 if item_value.startswith('lwB4'):
1743                     tun_ports.append(tx_port_no)
1744                 elif item_value.startswith('inet'):
1745                     inet_ports.append(tx_port_no)
1746
1747         return tun_ports, inet_ports
1748
1749     @staticmethod
1750     def _resize(len1, len2):
1751         if len1 == len2:
1752             return 1.0
1753         return 1.0 * len1 / len2
1754
1755     @contextmanager
1756     def traffic_context(self, pkt_size, value):
1757         # Tester is sending packets at the required speed already after
1758         # setup_test(). Just get the current statistics, sleep the required
1759         # amount of time and calculate packet loss.
1760         tun_pkt_size = pkt_size
1761         inet_pkt_size = pkt_size - 40
1762         ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1763
1764         curr_up_speed = curr_down_speed = 0
1765         max_up_speed = max_down_speed = value
1766
1767         max_up_speed = value / ratio
1768
1769         # Adjust speed when multiple cores per port are used to generate traffic
1770         if len(self.tun_ports) != len(self.tun_cores):
1771             max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1772         if len(self.inet_ports) != len(self.inet_cores):
1773             max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1774
1775         # Initialize cores
1776         self.sut.stop_all()
1777         time.sleep(0.5)
1778
1779         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1780         # wrong.
1781         self.sut.start(self.all_rx_cores)
1782         time.sleep(0.5)
1783         self.sut.stop(self.all_rx_cores)
1784         time.sleep(0.5)
1785         self.sut.reset_stats()
1786
1787         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1788         self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1789
1790         self.sut.reset_values(self.tun_cores)
1791         self.sut.reset_values(self.inet_cores)
1792
1793         # Set correct IP and UDP lengths in packet headers
1794         # tun
1795         # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1796         self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1797         # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1798         self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1799         # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1800         self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1801
1802         # INET
1803         # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1804         self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1805         # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1806         self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1807
1808         LOG.info("Initializing SUT: sending lwAFTR packets")
1809         self.sut.set_speed(self.inet_cores, curr_up_speed)
1810         self.sut.set_speed(self.tun_cores, curr_down_speed)
1811         time.sleep(4)
1812
1813         # Ramp up the transmission speed. First go to the common speed, then
1814         # increase steps for the faster one.
1815         self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1816
1817         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1818
1819         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1820             # The min(..., ...) takes care of 1) floating point rounding errors
1821             # that could make curr_*_speed to be slightly greater than
1822             # max_*_speed and 2) max_*_speed not being an exact multiple of
1823             # self._step_delta.
1824             if curr_up_speed < max_up_speed:
1825                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1826             if curr_down_speed < max_down_speed:
1827                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1828
1829             self.sut.set_speed(self.inet_cores, curr_up_speed)
1830             self.sut.set_speed(self.tun_cores, curr_down_speed)
1831             time.sleep(self.step_time)
1832
1833         LOG.info("Target speeds reached. Starting real test.")
1834
1835         yield
1836
1837         self.sut.stop(self.tun_cores + self.inet_cores)
1838         LOG.info("Test ended. Flushing NIC buffers")
1839         self.sut.start(self.all_rx_cores)
1840         time.sleep(3)
1841         self.sut.stop(self.all_rx_cores)
1842
1843     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1844                  line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1845         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1846                                      value, tolerated_loss, line_speed)
1847
1848         with data_helper, self.traffic_context(pkt_size, value):
1849             with data_helper.measure_tot_stats():
1850                 time.sleep(duration)
1851                 # Getting statistics to calculate PPS at right speed....
1852                 data_helper.capture_tsc_hz()
1853                 data_helper.latency = self.get_latency()
1854
1855         return data_helper.result_tuple, data_helper.samples