Merge "libyajl: install from pkg"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
15
16 import array
17 import operator
18 import logging
19 import io
20 import os
21 import re
22 import select
23 import socket
24
25 from collections import OrderedDict, namedtuple
26 import time
27 from contextlib import contextmanager
28 from itertools import repeat, chain
29 from multiprocessing import Queue
30
31 import six
32 from six.moves import zip, StringIO
33 from six.moves import cStringIO
34
35 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
36 from yardstick.common import utils
37 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
38 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
40 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
41
42 PROX_PORT = 8474
43
44 SECTION_NAME = 0
45 SECTION_CONTENTS = 1
46
47 LOG = logging.getLogger(__name__)
48 LOG.setLevel(logging.DEBUG)
49
50 TEN_GIGABIT = 1e10
51 BITS_PER_BYTE = 8
52 RETRY_SECONDS = 60
53 RETRY_INTERVAL = 1
54
55 CONFIGURATION_OPTIONS = (
56     # dict key           section     key               default value
57     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
58     ('testDuration', 'general', 'test_duration', 5.0),
59     ('testPrecision', 'general', 'test_precision', 1.0),
60     ('tests', 'general', 'tests', None),
61     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
62
63     ('logFile', 'logging', 'file', 'dats.log'),
64     ('logDateFormat', 'logging', 'datefmt', None),
65     ('logLevel', 'logging', 'level', 'INFO'),
66     ('logOverwrite', 'logging', 'overwrite', 1),
67
68     ('testerIp', 'tester', 'ip', None),
69     ('testerUser', 'tester', 'user', 'root'),
70     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
71     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
72     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
73     ('testerSocketId', 'tester', 'socket_id', 0),
74
75     ('sutIp', 'sut', 'ip', None),
76     ('sutUser', 'sut', 'user', 'root'),
77     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
78     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
79     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
80     ('sutSocketId', 'sut', 'socket_id', 0),
81 )
82
83
84 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
85     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
86
87     def __new__(cls, *args):
88         try:
89             matches = cls.CORE_RE.search(str(args[0]))
90             if matches:
91                 args = matches.groups()
92
93             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
94                                                        'h' if args[2] else '')
95
96         except (AttributeError, TypeError, IndexError, ValueError):
97             raise ValueError('Invalid core spec {}'.format(args))
98
99     def is_hyperthread(self):
100         return self.hyperthread == 'h'
101
102     @property
103     def index(self):
104         return int(self.is_hyperthread())
105
106     def find_in_topology(self, cpu_topology):
107         try:
108             socket_core_match = cpu_topology[self.socket_id][self.core_id]
109             sorted_match = sorted(socket_core_match.values())
110             return sorted_match[self.index][0]
111         except (KeyError, IndexError):
112             template = "Core {}{} on socket {} does not exist"
113             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
114
115
116 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
117     def __new__(cls, *args):
118         try:
119             assert args[0] is not str(args[0])
120             args = tuple(args[0])
121         except (AssertionError, IndexError, TypeError):
122             pass
123
124         return super(TotStatsTuple, cls).__new__(cls, *args)
125
126
127 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
128                                                         'delta_tx,delta_tsc,'
129                                                         'latency,rx_total,tx_total,pps')):
130     @property
131     def pkt_loss(self):
132         try:
133             return 1e2 * self.drop_total / float(self.tx_total)
134         except ZeroDivisionError:
135             return 100.0
136
137     @property
138     def mpps(self):
139         # calculate the effective throughput in Mpps
140         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
141
142     @property
143     def can_be_lost(self):
144         return int(self.tx_total * self.tolerated / 1e2)
145
146     @property
147     def drop_total(self):
148         return self.tx_total - self.rx_total
149
150     @property
151     def success(self):
152         return self.drop_total <= self.can_be_lost
153
154     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
155         if pkt_loss is None:
156             pkt_loss = self.pkt_loss
157
158         if port_samples is None:
159             port_samples = {}
160
161         latency_keys = [
162             "LatencyMin",
163             "LatencyMax",
164             "LatencyAvg",
165         ]
166
167         samples = {
168             "Throughput": self.mpps,
169             "DropPackets": pkt_loss,
170             "CurrentDropPackets": pkt_loss,
171             "TxThroughput": self.pps / 1e6,
172             "RxThroughput": self.mpps,
173             "PktSize": pkt_size,
174         }
175         if port_samples:
176             samples.update(port_samples)
177
178         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
179         return samples
180
181     def log_data(self, logger=None):
182         if logger is None:
183             logger = LOG
184
185         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
186         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
187         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
188
189
190 class PacketDump(object):
191     @staticmethod
192     def assert_func(func, value1, value2, template=None):
193         assert func(value1, value2), template.format(value1, value2)
194
195     def __init__(self, port_id, data_len, payload):
196         template = "Packet dump has specified length {}, but payload is {} bytes long"
197         self.assert_func(operator.eq, data_len, len(payload), template)
198         self._port_id = port_id
199         self._data_len = data_len
200         self._payload = payload
201
202     @property
203     def port_id(self):
204         """Get the port id of the packet dump"""
205         return self._port_id
206
207     @property
208     def data_len(self):
209         """Get the length of the data received"""
210         return self._data_len
211
212     def __str__(self):
213         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
214
215     def payload(self, start=None, end=None):
216         """Get part of the payload as a list of ordinals.
217
218         Returns a list of byte values, matching the contents of the packet dump.
219         Optional start and end parameters can be specified to retrieve only a
220         part of the packet contents.
221
222         The number of elements in the list is equal to end - start + 1, so end
223         is the offset of the last character.
224
225         Args:
226             start (pos. int): the starting offset in the payload. If it is not
227                 specified or None, offset 0 is assumed.
228             end (pos. int): the ending offset of the payload. If it is not
229                 specified or None, the contents until the end of the packet are
230                 returned.
231
232         Returns:
233             [int, int, ...]. Each int represents the ordinal value of a byte in
234             the packet payload.
235         """
236         if start is None:
237             start = 0
238
239         if end is None:
240             end = self.data_len - 1
241
242         # Bounds checking on offsets
243         template = "Start offset must be non-negative"
244         self.assert_func(operator.ge, start, 0, template)
245
246         template = "End offset must be less than {1}"
247         self.assert_func(operator.lt, end, self.data_len, template)
248
249         # Adjust for splice operation: end offset must be 1 more than the offset
250         # of the last desired character.
251         end += 1
252
253         return self._payload[start:end]
254
255
256 class ProxSocketHelper(object):
257
258     def __init__(self, sock=None):
259         """ creates new prox instance """
260         super(ProxSocketHelper, self).__init__()
261
262         if sock is None:
263             sock = socket.socket()
264
265         self._sock = sock
266         self._pkt_dumps = []
267         self.master_stats = None
268
269     def connect(self, ip, port):
270         """Connect to the prox instance on the remote system"""
271         self._sock.connect((ip, port))
272
273     def get_socket(self):
274         """ get the socket connected to the remote instance """
275         return self._sock
276
277     def _parse_socket_data(self, decoded_data, pkt_dump_only):
278         def get_newline_index():
279             return decoded_data.find('\n', index)
280
281         ret_str = ''
282         index = 0
283         for newline_index in iter(get_newline_index, -1):
284             ret_str = decoded_data[index:newline_index]
285
286             try:
287                 mode, port_id, data_len = ret_str.split(',', 2)
288             except ValueError:
289                 mode, port_id, data_len = None, None, None
290
291             if mode != 'pktdump':
292                 # Regular 1-line message. Stop reading from the socket.
293                 LOG.debug("Regular response read")
294                 return ret_str
295
296             LOG.debug("Packet dump header read: [%s]", ret_str)
297
298             # The line is a packet dump header. Parse it, read the
299             # packet payload, store the dump for later retrieval.
300             # Skip over the packet dump and continue processing: a
301             # 1-line response may follow the packet dump.
302
303             data_len = int(data_len)
304             data_start = newline_index + 1  # + 1 to skip over \n
305             data_end = data_start + data_len
306             sub_data = decoded_data[data_start:data_end]
307             pkt_payload = array.array('B', (ord(v) for v in sub_data))
308             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
309             self._pkt_dumps.append(pkt_dump)
310
311             if pkt_dump_only:
312                 # Return boolean instead of string to signal
313                 # successful reception of the packet dump.
314                 LOG.debug("Packet dump stored, returning")
315                 return True
316
317             index = data_end + 1
318
319         return ret_str
320
321     def get_data(self, pkt_dump_only=False, timeout=1):
322         """ read data from the socket """
323
324         # This method behaves slightly differently depending on whether it is
325         # called to read the response to a command (pkt_dump_only = 0) or if
326         # it is called specifically to read a packet dump (pkt_dump_only = 1).
327         #
328         # Packet dumps look like:
329         #   pktdump,<port_id>,<data_len>\n
330         #   <packet contents as byte array>\n
331         # This means the total packet dump message consists of 2 lines instead
332         # of 1 line.
333         #
334         # - Response for a command (pkt_dump_only = 0):
335         #   1) Read response from the socket until \n (end of message)
336         #   2a) If the response is a packet dump header (starts with "pktdump,"):
337         #     - Read the packet payload and store the packet dump for later
338         #       retrieval.
339         #     - Reset the state and restart from 1). Eventually state 2b) will
340         #       be reached and the function will return.
341         #   2b) If the response is not a packet dump:
342         #     - Return the received message as a string
343         #
344         # - Explicit request to read a packet dump (pkt_dump_only = 1):
345         #   - Read the dump header and payload
346         #   - Store the packet dump for later retrieval
347         #   - Return True to signify a packet dump was successfully read
348
349         def is_ready():
350             # recv() is blocking, so avoid calling it when no data is waiting.
351             ready = select.select([self._sock], [], [], timeout)
352             return bool(ready[0])
353
354         status = False
355         ret_str = ""
356         for status in iter(is_ready, False):
357             decoded_data = self._sock.recv(256).decode('utf-8')
358             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
359
360         LOG.debug("Received data from socket: [%s]", ret_str)
361         return ret_str if status else ''
362
363     def put_command(self, to_send):
364         """ send data to the remote instance """
365         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
366         try:
367             # TODO: sendall will block, we need a timeout
368             self._sock.sendall(to_send.encode('utf-8'))
369         except:
370             pass
371
372     def get_packet_dump(self):
373         """ get the next packet dump """
374         if self._pkt_dumps:
375             return self._pkt_dumps.pop(0)
376         return None
377
378     def stop_all_reset(self):
379         """ stop the remote instance and reset stats """
380         LOG.debug("Stop all and reset stats")
381         self.stop_all()
382         self.reset_stats()
383
384     def stop_all(self):
385         """ stop all cores on the remote instance """
386         LOG.debug("Stop all")
387         self.put_command("stop all\n")
388         time.sleep(3)
389
390     def stop(self, cores, task=''):
391         """ stop specific cores on the remote instance """
392         LOG.debug("Stopping cores %s", cores)
393         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
394         time.sleep(3)
395
396     def start_all(self):
397         """ start all cores on the remote instance """
398         LOG.debug("Start all")
399         self.put_command("start all\n")
400
401     def start(self, cores):
402         """ start specific cores on the remote instance """
403         LOG.debug("Starting cores %s", cores)
404         self.put_command("start {}\n".format(join_non_strings(',', cores)))
405         time.sleep(3)
406
407     def reset_stats(self):
408         """ reset the statistics on the remote instance """
409         LOG.debug("Reset stats")
410         self.put_command("reset stats\n")
411         time.sleep(1)
412
413     def _run_template_over_cores(self, template, cores, *args):
414         for core in cores:
415             self.put_command(template.format(core, *args))
416
417     def set_pkt_size(self, cores, pkt_size):
418         """ set the packet size to generate on the remote instance """
419         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
420         pkt_size -= 4
421         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
422         time.sleep(1)
423
424     def set_value(self, cores, offset, value, length):
425         """ set value on the remote instance """
426         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
427         LOG.debug(msg, cores, value, length, offset)
428         template = "set value {} 0 {} {} {}\n"
429         self._run_template_over_cores(template, cores, offset, value, length)
430
431     def reset_values(self, cores):
432         """ reset values on the remote instance """
433         LOG.debug("Set value for core(s) %s", cores)
434         self._run_template_over_cores("reset values {} 0\n", cores)
435
436     def set_speed(self, cores, speed, tasks=None):
437         """ set speed on the remote instance """
438         if tasks is None:
439             tasks = [0] * len(cores)
440         elif len(tasks) != len(cores):
441             LOG.error("set_speed: cores and tasks must have the same len")
442         LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
443         for (core, task) in list(zip(cores, tasks)):
444             self.put_command("speed {} {} {}\n".format(core, task, speed))
445
446     def slope_speed(self, cores_speed, duration, n_steps=0):
447         """will start to increase speed from 0 to N where N is taken from
448         a['speed'] for each a in cores_speed"""
449         # by default, each step will take 0.5 sec
450         if n_steps == 0:
451             n_steps = duration * 2
452
453         private_core_data = []
454         step_duration = float(duration) / n_steps
455         for core_data in cores_speed:
456             target = float(core_data['speed'])
457             private_core_data.append({
458                 'cores': core_data['cores'],
459                 'zero': 0,
460                 'delta': target / n_steps,
461                 'current': 0,
462                 'speed': target,
463             })
464
465         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
466         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
467             time.sleep(step_duration)
468             for core_data in private_core_data:
469                 core_data['current'] = core_data[key1] + core_data[key2]
470                 self.set_speed(core_data['cores'], core_data['current'])
471
472     def set_pps(self, cores, pps, pkt_size):
473         """ set packets per second for specific cores on the remote instance """
474         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
475         LOG.debug(msg, cores, pps, pkt_size)
476
477         # speed in percent of line-rate
478         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
479         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
480
481     def lat_stats(self, cores, task=0):
482         """Get the latency statistics from the remote system"""
483         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
484         lat_min = {}
485         lat_max = {}
486         lat_avg = {}
487         for core in cores:
488             self.put_command("lat stats {} {} \n".format(core, task))
489             ret = self.get_data()
490
491             try:
492                 lat_min[core], lat_max[core], lat_avg[core] = \
493                     tuple(int(n) for n in ret.split(",")[:3])
494
495             except (AttributeError, ValueError, TypeError):
496                 pass
497
498         return lat_min, lat_max, lat_avg
499
500     def get_all_tot_stats(self):
501         self.put_command("tot stats\n")
502         all_stats_str = self.get_data().split(",")
503         if len(all_stats_str) != 4:
504             all_stats = [0] * 4
505             return all_stats
506         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
507         self.master_stats = all_stats
508         return all_stats
509
510     def hz(self):
511         return self.get_all_tot_stats()[3]
512
513     # Deprecated
514     # TODO: remove
515     def rx_stats(self, cores, task=0):
516         return self.core_stats(cores, task)
517
518     def core_stats(self, cores, task=0):
519         """Get the receive statistics from the remote system"""
520         rx = tx = drop = tsc = 0
521         for core in cores:
522             self.put_command("core stats {} {}\n".format(core, task))
523             ret = self.get_data().split(",")
524             rx += int(ret[0])
525             tx += int(ret[1])
526             drop += int(ret[2])
527             tsc = int(ret[3])
528         return rx, tx, drop, tsc
529
530     def port_stats(self, ports):
531         """get counter values from a specific port"""
532         tot_result = [0] * 12
533         for port in ports:
534             self.put_command("port_stats {}\n".format(port))
535             ret = [try_int(s, 0) for s in self.get_data().split(",")]
536             tot_result = [sum(x) for x in zip(tot_result, ret)]
537         return tot_result
538
539     @contextmanager
540     def measure_tot_stats(self):
541         start = self.get_all_tot_stats()
542         container = {'start_tot': start}
543         try:
544             yield container
545         finally:
546             container['end_tot'] = end = self.get_all_tot_stats()
547
548         container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
549
550     def tot_stats(self):
551         """Get the total statistics from the remote system"""
552         stats = self.get_all_tot_stats()
553         return stats[:3]
554
555     def tot_ierrors(self):
556         """Get the total ierrors from the remote system"""
557         self.put_command("tot ierrors tot\n")
558         recv = self.get_data().split(',')
559         tot_ierrors = int(recv[0])
560         tsc = int(recv[0])
561         return tot_ierrors, tsc
562
563     def set_count(self, count, cores):
564         """Set the number of packets to send on the specified core"""
565         self._run_template_over_cores("count {} 0 {}\n", cores, count)
566
567     def dump_rx(self, core_id, task_id=0, count=1):
568         """Activate dump on rx on the specified core"""
569         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
570         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
571         time.sleep(1.5)  # Give PROX time to set up packet dumping
572
573     def quit(self):
574         self.stop_all()
575         self._quit()
576         self.force_quit()
577
578     def _quit(self):
579         """ stop all cores on the remote instance """
580         LOG.debug("Quit prox")
581         self.put_command("quit\n")
582         time.sleep(3)
583
584     def force_quit(self):
585         """ stop all cores on the remote instance """
586         LOG.debug("Force Quit prox")
587         self.put_command("quit_force\n")
588         time.sleep(3)
589
590
591 _LOCAL_OBJECT = object()
592
593
594 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
595     # the actual app is lowercase
596     APP_NAME = 'prox'
597     # not used for Prox but added for consistency
598     VNF_TYPE = "PROX"
599
600     LUA_PARAMETER_NAME = ""
601     LUA_PARAMETER_PEER = {
602         "gen": "sut",
603         "sut": "gen",
604     }
605
606     CONFIG_QUEUE_TIMEOUT = 120
607
608     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
609         self.remote_path = None
610         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
611         self.remote_prox_file_name = None
612         self._prox_config_data = None
613         self.additional_files = {}
614         self.config_queue = Queue()
615         # allow_exit_without_flush
616         self.config_queue.cancel_join_thread()
617         self._global_section = None
618
619     @property
620     def prox_config_data(self):
621         if self._prox_config_data is None:
622             # this will block, but it needs too
623             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
624         return self._prox_config_data
625
626     @property
627     def global_section(self):
628         if self._global_section is None and self.prox_config_data:
629             self._global_section = self.find_section("global")
630         return self._global_section
631
632     def find_section(self, name, default=_LOCAL_OBJECT):
633         result = next((value for key, value in self.prox_config_data if key == name), default)
634         if result is _LOCAL_OBJECT:
635             raise KeyError('{} not found in Prox config'.format(name))
636         return result
637
638     def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
639         section = self.find_section(section_name, [])
640         result = next((value for key, value in section if key == section_key), default)
641         if result is _LOCAL_OBJECT:
642             template = '{} not found in {} section of Prox config'
643             raise KeyError(template.format(section_key, section_name))
644         return result
645
646     def _build_pipeline_kwargs(self):
647         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
648         self.pipeline_kwargs = {
649             'tool_path': tool_path,
650             'tool_dir': os.path.dirname(tool_path),
651         }
652
653     def copy_to_target(self, config_file_path, prox_file):
654         remote_path = os.path.join("/tmp", prox_file)
655         self.ssh_helper.put(config_file_path, remote_path)
656         return remote_path
657
658     @staticmethod
659     def _get_tx_port(section, sections):
660         iface_port = [-1]
661         for item in sections[section]:
662             if item[0] == "tx port":
663                 iface_port = re.findall(r'\d+', item[1])
664                 # do we want the last one?
665                 #   if yes, then can we reverse?
666         return int(iface_port[0])
667
668     @staticmethod
669     def _replace_quoted_with_value(quoted, value, count=1):
670         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
671         return new_string
672
673     def _insert_additional_file(self, value):
674         file_str = value.split('"')
675         base_name = os.path.basename(file_str[1])
676         file_str[1] = self.additional_files[base_name]
677         return '"'.join(file_str)
678
679     def generate_prox_config_file(self, config_path):
680         sections = []
681         prox_config = ConfigParser(config_path, sections)
682         prox_config.parse()
683
684         # Ensure MAC is set "hardware"
685         all_ports = self.vnfd_helper.port_pairs.all_ports
686         # use dpdk port number
687         for port_name in all_ports:
688             port_num = self.vnfd_helper.port_num(port_name)
689             port_section_name = "port {}".format(port_num)
690             for section_name, section in sections:
691                 if port_section_name != section_name:
692                     continue
693
694                 for index, section_data in enumerate(section):
695                     if section_data[0] == "mac":
696                         section_data[1] = "hardware"
697
698         # search for dst mac
699         for _, section in sections:
700             # for index, (item_key, item_val) in enumerate(section):
701             for index, section_data in enumerate(section):
702                 item_key, item_val = section_data
703                 if item_val.startswith("@@dst_mac"):
704                     tx_port_iter = re.finditer(r'\d+', item_val)
705                     tx_port_no = int(next(tx_port_iter).group(0))
706                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
707                     mac = intf["virtual-interface"]["dst_mac"]
708                     section_data[1] = mac.replace(":", " ", 6)
709
710                 if item_key == "dst mac" and item_val.startswith("@@"):
711                     tx_port_iter = re.finditer(r'\d+', item_val)
712                     tx_port_no = int(next(tx_port_iter).group(0))
713                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
714                     mac = intf["virtual-interface"]["dst_mac"]
715                     section_data[1] = mac
716
717         # if addition file specified in prox config
718         if not self.additional_files:
719             return sections
720
721         for section_name, section in sections:
722             for index, section_data in enumerate(section):
723                 try:
724                     if section_data[0].startswith("dofile"):
725                         section_data[0] = self._insert_additional_file(section_data[0])
726
727                     if section_data[1].startswith("dofile"):
728                         section_data[1] = self._insert_additional_file(section_data[1])
729                 except:
730                     pass
731
732         return sections
733
734     @staticmethod
735     def write_prox_lua(lua_config):
736         """
737         Write an .ini-format config file for PROX (parameters.lua)
738         PROX does not allow a space before/after the =, so we need
739         a custom method
740         """
741         out = []
742         for key in lua_config:
743             value = '"' + lua_config[key] + '"'
744             if key == "__name__":
745                 continue
746             if value is not None and value != '@':
747                 key = "=".join((key, str(value).replace('\n', '\n\t')))
748                 out.append(key)
749             else:
750                 key = str(key).replace('\n', '\n\t')
751                 out.append(key)
752         return os.linesep.join(out)
753
754     @staticmethod
755     def write_prox_config(prox_config):
756         """
757         Write an .ini-format config file for PROX
758         PROX does not allow a space before/after the =, so we need
759         a custom method
760         """
761         out = []
762         for i, (section_name, section) in enumerate(prox_config):
763             out.append("[{}]".format(section_name))
764             for index, item in enumerate(section):
765                 key, value = item
766                 if key == "__name__":
767                     continue
768                 if value is not None and value != '@':
769                     key = "=".join((key, str(value).replace('\n', '\n\t')))
770                     out.append(key)
771                 else:
772                     key = str(key).replace('\n', '\n\t')
773                     out.append(key)
774         return os.linesep.join(out)
775
776     def put_string_to_file(self, s, remote_path):
777         file_obj = cStringIO(s)
778         self.ssh_helper.put_file_obj(file_obj, remote_path)
779         return remote_path
780
781     def generate_prox_lua_file(self):
782         p = OrderedDict()
783         all_ports = self.vnfd_helper.port_pairs.all_ports
784         for port_name in all_ports:
785             port_num = self.vnfd_helper.port_num(port_name)
786             intf = self.vnfd_helper.find_interface(name=port_name)
787             vintf = intf['virtual-interface']
788             p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
789             p["src_mac{0}".format(port_num)] = vintf["local_mac"]
790
791         return p
792
793     def upload_prox_lua(self, config_file, lua_data):
794         # prox can't handle spaces around ' = ' so use custom method
795         out = StringIO(self.write_prox_lua(lua_data))
796         out.seek(0)
797         remote_path = os.path.join("/tmp", config_file)
798         self.ssh_helper.put_file_obj(out, remote_path)
799
800         return remote_path
801
802     def upload_prox_config(self, config_file, prox_config_data):
803         # prox can't handle spaces around ' = ' so use custom method
804         out = StringIO(self.write_prox_config(prox_config_data))
805         out.seek(0)
806         remote_path = os.path.join("/tmp", config_file)
807         self.ssh_helper.put_file_obj(out, remote_path)
808
809         return remote_path
810
811     def build_config_file(self):
812         task_path = self.scenario_helper.task_path
813         options = self.scenario_helper.options
814         config_path = options['prox_config']
815         config_file = os.path.basename(config_path)
816         config_path = find_relative_file(config_path, task_path)
817         self.additional_files = {}
818
819         try:
820             if options['prox_generate_parameter']:
821                 self.lua = []
822                 self.lua = self.generate_prox_lua_file()
823                 if len(self.lua) > 0:
824                     self.upload_prox_lua("parameters.lua", self.lua)
825         except:
826             pass
827
828         prox_files = options.get('prox_files', [])
829         if isinstance(prox_files, six.string_types):
830             prox_files = [prox_files]
831         for key_prox_file in prox_files:
832             base_prox_file = os.path.basename(key_prox_file)
833             key_prox_path = find_relative_file(key_prox_file, task_path)
834             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
835             self.additional_files[base_prox_file] = remote_prox_file
836
837         self._prox_config_data = self.generate_prox_config_file(config_path)
838         # copy config to queue so we can read it from traffic_runner process
839         self.config_queue.put(self._prox_config_data)
840         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
841
842     def build_config(self):
843         self.build_config_file()
844
845         options = self.scenario_helper.options
846
847         prox_args = options['prox_args']
848         LOG.info("Provision and start the %s", self.APP_NAME)
849         self._build_pipeline_kwargs()
850         self.pipeline_kwargs["args"] = " ".join(
851             " ".join([k, v if v else ""]) for k, v in prox_args.items())
852         self.pipeline_kwargs["cfg_file"] = self.remote_path
853
854         cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
855         prox_cmd = cmd_template.format(**self.pipeline_kwargs)
856         return prox_cmd
857
858
859 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
860 class ProxResourceHelper(ClientResourceHelper):
861
862     RESOURCE_WORD = 'prox'
863
864     PROX_MODE = ""
865
866     WAIT_TIME = 3
867
868     @staticmethod
869     def find_pci(pci, bound_pci):
870         # we have to substring match PCI bus address from the end
871         return any(b.endswith(pci) for b in bound_pci)
872
873     def __init__(self, setup_helper):
874         super(ProxResourceHelper, self).__init__(setup_helper)
875         self.mgmt_interface = self.vnfd_helper.mgmt_interface
876         self._user = self.mgmt_interface["user"]
877         self._ip = self.mgmt_interface["ip"]
878
879         self.done = False
880         self._vpci_to_if_name_map = None
881         self.additional_file = {}
882         self.remote_prox_file_name = None
883         self.lower = None
884         self.upper = None
885         self.step_delta = 1
886         self.step_time = 0.5
887         self._test_type = None
888
889     @property
890     def sut(self):
891         if not self.client:
892             self.client = self._connect()
893         return self.client
894
895     @property
896     def test_type(self):
897         if self._test_type is None:
898             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
899         return self._test_type
900
901     def run_traffic(self, traffic_profile):
902         self._queue.cancel_join_thread()
903         self.lower = 0.0
904         self.upper = 100.0
905
906         traffic_profile.init(self._queue)
907         # this frees up the run_traffic loop
908         self.client_started.value = 1
909
910         while not self._terminated.value:
911             # move it all to traffic_profile
912             self._run_traffic_once(traffic_profile)
913
914     def _run_traffic_once(self, traffic_profile):
915         traffic_profile.execute_traffic(self)
916         if traffic_profile.done:
917             self._queue.put({'done': True})
918             LOG.debug("tg_prox done")
919             self._terminated.value = 1
920
921     # For VNF use ResourceHelper method to collect KPIs directly.
922     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
923     def collect_collectd_kpi(self):
924         return self._collect_resource_kpi()
925
926     def collect_kpi(self):
927         result = super(ProxResourceHelper, self).collect_kpi()
928         # add in collectd kpis manually
929         if result:
930             result['collect_stats'] = self._collect_resource_kpi()
931         return result
932
933     def terminate(self):
934         # should not be called, use VNF terminate
935         raise NotImplementedError()
936
937     def up_post(self):
938         return self.sut  # force connection
939
940     def execute(self, cmd, *args, **kwargs):
941         func = getattr(self.sut, cmd, None)
942         if func:
943             return func(*args, **kwargs)
944
945     def _connect(self, client=None):
946         """Run and connect to prox on the remote system """
947         # De-allocating a large amount of hugepages takes some time. If a new
948         # PROX instance is started immediately after killing the previous one,
949         # it might not be able to allocate hugepages, because they are still
950         # being freed. Hence the -w switch.
951         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
952         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
953         # -f ./handle_none-4.cfg"
954         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
955         #  "; " \
956         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
957         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
958         # sudo " \
959         #    + "./build/Prox " + prox_args
960         # log.debug("Starting PROX with command [%s]", prox_cmd)
961         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
962         # self._ip, prox_cmd))
963         if client is None:
964             client = ProxSocketHelper()
965
966         # try connecting to Prox for 60s
967         for _ in range(RETRY_SECONDS):
968             time.sleep(RETRY_INTERVAL)
969             try:
970                 client.connect(self._ip, PROX_PORT)
971             except (socket.gaierror, socket.error):
972                 continue
973             else:
974                 return client
975
976         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
977         raise Exception(msg.format(self._ip, PROX_PORT))
978
979
980 class ProxDataHelper(object):
981
982     def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
983         super(ProxDataHelper, self).__init__()
984         self.vnfd_helper = vnfd_helper
985         self.sut = sut
986         self.pkt_size = pkt_size
987         self.value = value
988         self.tolerated_loss = tolerated_loss
989         self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
990         self.tsc_hz = None
991         self.measured_stats = None
992         self.latency = None
993         self._totals_and_pps = None
994         self.result_tuple = None
995
996     @property
997     def totals_and_pps(self):
998         if self._totals_and_pps is None:
999             rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
1000             pps = self.value / 100.0 * self.line_rate_to_pps()
1001             self._totals_and_pps = rx_total, tx_total, pps
1002         return self._totals_and_pps
1003
1004     @property
1005     def rx_total(self):
1006         return self.totals_and_pps[0]
1007
1008     @property
1009     def tx_total(self):
1010         return self.totals_and_pps[1]
1011
1012     @property
1013     def pps(self):
1014         return self.totals_and_pps[2]
1015
1016     @property
1017     def samples(self):
1018         samples = {}
1019         for port_name, port_num in self.vnfd_helper.ports_iter():
1020             port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1021             samples[port_name] = {
1022                 "in_packets": port_rx_total,
1023                 "out_packets": port_tx_total,
1024             }
1025         return samples
1026
1027     def __enter__(self):
1028         self.check_interface_count()
1029         return self
1030
1031     def __exit__(self, exc_type, exc_val, exc_tb):
1032         self.make_tuple()
1033
1034     def make_tuple(self):
1035         if self.result_tuple:
1036             return
1037
1038         self.result_tuple = ProxTestDataTuple(
1039             self.tolerated_loss,
1040             self.tsc_hz,
1041             self.measured_stats['delta'].rx,
1042             self.measured_stats['delta'].tx,
1043             self.measured_stats['delta'].tsc,
1044             self.latency,
1045             self.rx_total,
1046             self.tx_total,
1047             self.pps,
1048         )
1049         self.result_tuple.log_data()
1050
1051     @contextmanager
1052     def measure_tot_stats(self):
1053         with self.sut.measure_tot_stats() as self.measured_stats:
1054             yield
1055
1056     def check_interface_count(self):
1057         # do this assert in init?  unless we expect interface count to
1058         # change from one run to another run...
1059         assert self.port_count in {1, 2, 4}, \
1060             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1061
1062     def capture_tsc_hz(self):
1063         self.tsc_hz = float(self.sut.hz())
1064
1065     def line_rate_to_pps(self):
1066         # FIXME Don't hardcode 10Gb/s
1067         return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1068
1069
1070 class ProxProfileHelper(object):
1071
1072     __prox_profile_type__ = "Generic"
1073
1074     PROX_CORE_GEN_MODE = "gen"
1075     PROX_CORE_LAT_MODE = "lat"
1076
1077     @classmethod
1078     def get_cls(cls, helper_type):
1079         """Return class of specified type."""
1080         if not helper_type:
1081             return ProxProfileHelper
1082
1083         for profile_helper_class in utils.itersubclasses(cls):
1084             if helper_type == profile_helper_class.__prox_profile_type__:
1085                 return profile_helper_class
1086
1087         return ProxProfileHelper
1088
1089     @classmethod
1090     def make_profile_helper(cls, resource_helper):
1091         return cls.get_cls(resource_helper.test_type)(resource_helper)
1092
1093     def __init__(self, resource_helper):
1094         super(ProxProfileHelper, self).__init__()
1095         self.resource_helper = resource_helper
1096         self._cpu_topology = None
1097         self._test_cores = None
1098         self._latency_cores = None
1099
1100     @property
1101     def cpu_topology(self):
1102         if not self._cpu_topology:
1103             stdout = io.BytesIO()
1104             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1105             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1106         return self._cpu_topology
1107
1108     @property
1109     def test_cores(self):
1110         if not self._test_cores:
1111             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1112         return self._test_cores
1113
1114     @property
1115     def latency_cores(self):
1116         if not self._latency_cores:
1117             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1118         return self._latency_cores
1119
1120     @contextmanager
1121     def traffic_context(self, pkt_size, value):
1122         self.sut.stop_all()
1123         self.sut.reset_stats()
1124         try:
1125             self.sut.set_pkt_size(self.test_cores, pkt_size)
1126             self.sut.set_speed(self.test_cores, value)
1127             self.sut.start_all()
1128             yield
1129         finally:
1130             self.sut.stop_all()
1131
1132     def get_cores(self, mode):
1133         cores = []
1134
1135         for section_name, section in self.setup_helper.prox_config_data:
1136             if not section_name.startswith("core"):
1137                 continue
1138
1139             for key, value in section:
1140                 if key == "mode" and value == mode:
1141                     core_tuple = CoreSocketTuple(section_name)
1142                     core = core_tuple.find_in_topology(self.cpu_topology)
1143                     cores.append(core)
1144
1145         return cores
1146
1147     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1148         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1149
1150         with data_helper, self.traffic_context(pkt_size, value):
1151             with data_helper.measure_tot_stats():
1152                 time.sleep(duration)
1153                 # Getting statistics to calculate PPS at right speed....
1154                 data_helper.capture_tsc_hz()
1155                 data_helper.latency = self.get_latency()
1156
1157         return data_helper.result_tuple, data_helper.samples
1158
1159     def get_latency(self):
1160         """
1161         :return: return lat_min, lat_max, lat_avg
1162         :rtype: list
1163         """
1164         if self._latency_cores:
1165             return self.sut.lat_stats(self._latency_cores)
1166         return []
1167
1168     def terminate(self):
1169         pass
1170
1171     def __getattr__(self, item):
1172         return getattr(self.resource_helper, item)
1173
1174
1175 class ProxMplsProfileHelper(ProxProfileHelper):
1176
1177     __prox_profile_type__ = "MPLS tag/untag"
1178
1179     def __init__(self, resource_helper):
1180         super(ProxMplsProfileHelper, self).__init__(resource_helper)
1181         self._cores_tuple = None
1182
1183     @property
1184     def mpls_cores(self):
1185         if not self._cores_tuple:
1186             self._cores_tuple = self.get_cores_mpls()
1187         return self._cores_tuple
1188
1189     @property
1190     def tagged_cores(self):
1191         return self.mpls_cores[0]
1192
1193     @property
1194     def plain_cores(self):
1195         return self.mpls_cores[1]
1196
1197     def get_cores_mpls(self):
1198         cores_tagged = []
1199         cores_plain = []
1200         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1201             if not section_name.startswith("core"):
1202                 continue
1203
1204             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1205                 continue
1206
1207             for item_key, item_value in section:
1208                 if item_key != 'name':
1209                     continue
1210
1211                 if item_value.startswith("tag"):
1212                     core_tuple = CoreSocketTuple(section_name)
1213                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1214                     cores_tagged.append(core_tag)
1215
1216                 elif item_value.startswith("udp"):
1217                     core_tuple = CoreSocketTuple(section_name)
1218                     core_udp = core_tuple.find_in_topology(self.cpu_topology)
1219                     cores_plain.append(core_udp)
1220
1221         return cores_tagged, cores_plain
1222
1223     @contextmanager
1224     def traffic_context(self, pkt_size, value):
1225         self.sut.stop_all()
1226         self.sut.reset_stats()
1227         try:
1228             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1229             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1230             self.sut.set_speed(self.tagged_cores, value)
1231             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1232             self.sut.set_speed(self.plain_cores, value * ratio)
1233             self.sut.start_all()
1234             yield
1235         finally:
1236             self.sut.stop_all()
1237
1238
1239 class ProxBngProfileHelper(ProxProfileHelper):
1240
1241     __prox_profile_type__ = "BNG gen"
1242
1243     def __init__(self, resource_helper):
1244         super(ProxBngProfileHelper, self).__init__(resource_helper)
1245         self._cores_tuple = None
1246
1247     @property
1248     def bng_cores(self):
1249         if not self._cores_tuple:
1250             self._cores_tuple = self.get_cores_gen_bng_qos()
1251         return self._cores_tuple
1252
1253     @property
1254     def cpe_cores(self):
1255         return self.bng_cores[0]
1256
1257     @property
1258     def inet_cores(self):
1259         return self.bng_cores[1]
1260
1261     @property
1262     def arp_cores(self):
1263         return self.bng_cores[2]
1264
1265     @property
1266     def arp_task_cores(self):
1267         return self.bng_cores[3]
1268
1269     @property
1270     def all_rx_cores(self):
1271         return self.latency_cores
1272
1273     def get_cores_gen_bng_qos(self):
1274         cpe_cores = []
1275         inet_cores = []
1276         arp_cores = []
1277         arp_tasks_core = [0]
1278         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1279             if not section_name.startswith("core"):
1280                 continue
1281
1282             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1283                 continue
1284
1285             for item_key, item_value in section:
1286                 if item_key != 'name':
1287                     continue
1288
1289                 if item_value.startswith("cpe"):
1290                     core_tuple = CoreSocketTuple(section_name)
1291                     cpe_core = core_tuple.find_in_topology(self.cpu_topology)
1292                     cpe_cores.append(cpe_core)
1293
1294                 elif item_value.startswith("inet"):
1295                     core_tuple = CoreSocketTuple(section_name)
1296                     inet_core = core_tuple.find_in_topology(self.cpu_topology)
1297                     inet_cores.append(inet_core)
1298
1299                 elif item_value.startswith("arp"):
1300                     core_tuple = CoreSocketTuple(section_name)
1301                     arp_core = core_tuple.find_in_topology(self.cpu_topology)
1302                     arp_cores.append(arp_core)
1303
1304                 # We check the tasks/core separately
1305                 if item_value.startswith("arp_task"):
1306                     core_tuple = CoreSocketTuple(section_name)
1307                     arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
1308                     arp_tasks_core.append(arp_task_core)
1309
1310         return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1311
1312     @contextmanager
1313     def traffic_context(self, pkt_size, value):
1314         # Tester is sending packets at the required speed already after
1315         # setup_test(). Just get the current statistics, sleep the required
1316         # amount of time and calculate packet loss.
1317         inet_pkt_size = pkt_size
1318         cpe_pkt_size = pkt_size - 24
1319         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1320
1321         curr_up_speed = curr_down_speed = 0
1322         max_up_speed = max_down_speed = value
1323         if ratio < 1:
1324             max_down_speed = value * ratio
1325         else:
1326             max_up_speed = value / ratio
1327
1328         # Initialize cores
1329         self.sut.stop_all()
1330         time.sleep(0.5)
1331
1332         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1333         # wrong.
1334         self.sut.start(self.all_rx_cores)
1335         time.sleep(0.5)
1336         self.sut.stop(self.all_rx_cores)
1337         time.sleep(0.5)
1338         self.sut.reset_stats()
1339
1340         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1341         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1342
1343         self.sut.reset_values(self.cpe_cores)
1344         self.sut.reset_values(self.inet_cores)
1345
1346         # Set correct IP and UDP lengths in packet headers
1347         # CPE
1348         # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1349         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1350         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1351         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1352
1353         # INET
1354         # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1355         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1356         # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1357         self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1358         # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1359         self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1360
1361         # Sending ARP to initialize tables - need a few seconds of generation
1362         # to make sure all CPEs are initialized
1363         LOG.info("Initializing SUT: sending ARP packets")
1364         self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1365         self.sut.set_speed(self.inet_cores, curr_up_speed)
1366         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1367         self.sut.start(self.arp_cores)
1368         time.sleep(4)
1369
1370         # Ramp up the transmission speed. First go to the common speed, then
1371         # increase steps for the faster one.
1372         self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1373
1374         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1375
1376         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1377             # The min(..., ...) takes care of 1) floating point rounding errors
1378             # that could make curr_*_speed to be slightly greater than
1379             # max_*_speed and 2) max_*_speed not being an exact multiple of
1380             # self._step_delta.
1381             if curr_up_speed < max_up_speed:
1382                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1383             if curr_down_speed < max_down_speed:
1384                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1385
1386             self.sut.set_speed(self.inet_cores, curr_up_speed)
1387             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1388             time.sleep(self.step_time)
1389
1390         LOG.info("Target speeds reached. Starting real test.")
1391
1392         yield
1393
1394         self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1395         LOG.info("Test ended. Flushing NIC buffers")
1396         self.sut.start(self.all_rx_cores)
1397         time.sleep(3)
1398         self.sut.stop(self.all_rx_cores)
1399
1400     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1401         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1402
1403         with data_helper, self.traffic_context(pkt_size, value):
1404             with data_helper.measure_tot_stats():
1405                 time.sleep(duration)
1406                 # Getting statistics to calculate PPS at right speed....
1407                 data_helper.capture_tsc_hz()
1408                 data_helper.latency = self.get_latency()
1409
1410         return data_helper.result_tuple, data_helper.samples
1411
1412
1413 class ProxVpeProfileHelper(ProxProfileHelper):
1414
1415     __prox_profile_type__ = "vPE gen"
1416
1417     def __init__(self, resource_helper):
1418         super(ProxVpeProfileHelper, self).__init__(resource_helper)
1419         self._cores_tuple = None
1420         self._ports_tuple = None
1421
1422     @property
1423     def vpe_cores(self):
1424         if not self._cores_tuple:
1425             self._cores_tuple = self.get_cores_gen_vpe()
1426         return self._cores_tuple
1427
1428     @property
1429     def cpe_cores(self):
1430         return self.vpe_cores[0]
1431
1432     @property
1433     def inet_cores(self):
1434         return self.vpe_cores[1]
1435
1436     @property
1437     def all_rx_cores(self):
1438         return self.latency_cores
1439
1440     @property
1441     def vpe_ports(self):
1442         if not self._ports_tuple:
1443             self._ports_tuple = self.get_ports_gen_vpe()
1444         return self._ports_tuple
1445
1446     @property
1447     def cpe_ports(self):
1448         return self.vpe_ports[0]
1449
1450     @property
1451     def inet_ports(self):
1452         return self.vpe_ports[1]
1453
1454     def get_cores_gen_vpe(self):
1455         cpe_cores = []
1456         inet_cores = []
1457         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1458             if not section_name.startswith("core"):
1459                 continue
1460
1461             if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1462                 continue
1463
1464             for item_key, item_value in section:
1465                 if item_key != 'name':
1466                     continue
1467
1468                 if item_value.startswith("cpe"):
1469                     core_tuple = CoreSocketTuple(section_name)
1470                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1471                     cpe_cores.append(core_tag)
1472
1473                 elif item_value.startswith("inet"):
1474                     core_tuple = CoreSocketTuple(section_name)
1475                     inet_core = core_tuple.find_in_topology(self.cpu_topology)
1476                     inet_cores.append(inet_core)
1477
1478         return cpe_cores, inet_cores
1479
1480     def get_ports_gen_vpe(self):
1481         cpe_ports = []
1482         inet_ports = []
1483
1484         for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1485             if not section_name.startswith("port"):
1486                 continue
1487             tx_port_iter = re.finditer(r'\d+', section_name)
1488             tx_port_no = int(next(tx_port_iter).group(0))
1489
1490             for item_key, item_value in section:
1491                 if item_key != 'name':
1492                     continue
1493
1494             for item_key, item_value in section:
1495                 if item_value.startswith("cpe"):
1496                     cpe_ports.append(tx_port_no)
1497
1498                 elif item_value.startswith("inet"):
1499                     inet_ports.append(tx_port_no)
1500
1501         return cpe_ports, inet_ports
1502
1503     @contextmanager
1504     def traffic_context(self, pkt_size, value):
1505         # Calculate the target upload and download speed. The upload and
1506         # download packets have different packet sizes, so in order to get
1507         # equal bandwidth usage, the ratio of the speeds has to match the ratio
1508         # of the packet sizes.
1509         cpe_pkt_size = pkt_size
1510         inet_pkt_size = pkt_size - 4
1511         ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1512
1513         curr_up_speed = curr_down_speed = 0
1514         max_up_speed = max_down_speed = value
1515         if ratio < 1:
1516             max_down_speed = value * ratio
1517         else:
1518             max_up_speed = value / ratio
1519
1520         # Adjust speed when multiple cores per port are used to generate traffic
1521         if len(self.cpe_ports) != len(self.cpe_cores):
1522             max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1523         if len(self.inet_ports) != len(self.inet_cores):
1524             max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1525
1526         # Initialize cores
1527         self.sut.stop_all()
1528         time.sleep(2)
1529
1530         # Flush any packets in the NIC RX buffers, otherwise the stats will be
1531         # wrong.
1532         self.sut.start(self.all_rx_cores)
1533         time.sleep(2)
1534         self.sut.stop(self.all_rx_cores)
1535         time.sleep(2)
1536         self.sut.reset_stats()
1537
1538         self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1539         self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1540
1541         self.sut.reset_values(self.cpe_cores)
1542         self.sut.reset_values(self.inet_cores)
1543
1544         # Set correct IP and UDP lengths in packet headers
1545         # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1546         self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1547         # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1548         self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1549
1550         # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1551         self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1552         # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1553         self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1554
1555         self.sut.set_speed(self.inet_cores, curr_up_speed)
1556         self.sut.set_speed(self.cpe_cores, curr_down_speed)
1557
1558         # Ramp up the transmission speed. First go to the common speed, then
1559         # increase steps for the faster one.
1560         self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1561
1562         LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1563
1564         while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1565             # The min(..., ...) takes care of 1) floating point rounding errors
1566             # that could make curr_*_speed to be slightly greater than
1567             # max_*_speed and 2) max_*_speed not being an exact multiple of
1568             # self._step_delta.
1569             if curr_up_speed < max_up_speed:
1570                 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1571             if curr_down_speed < max_down_speed:
1572                 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1573
1574             self.sut.set_speed(self.inet_cores, curr_up_speed)
1575             self.sut.set_speed(self.cpe_cores, curr_down_speed)
1576             time.sleep(self.step_time)
1577
1578         LOG.info("Target speeds reached. Starting real test.")
1579
1580         yield
1581
1582         self.sut.stop(self.cpe_cores + self.inet_cores)
1583         LOG.info("Test ended. Flushing NIC buffers")
1584         self.sut.start(self.all_rx_cores)
1585         time.sleep(3)
1586         self.sut.stop(self.all_rx_cores)
1587
1588     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1589         data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1590
1591         with data_helper, self.traffic_context(pkt_size, value):
1592             with data_helper.measure_tot_stats():
1593                 time.sleep(duration)
1594                 # Getting statistics to calculate PPS at right speed....
1595                 data_helper.capture_tsc_hz()
1596                 data_helper.latency = self.get_latency()
1597
1598         return data_helper.result_tuple, data_helper.samples