1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
25 from collections import OrderedDict, namedtuple
26 from contextlib import contextmanager
27 from itertools import repeat, chain
28 from multiprocessing import Queue
31 from six.moves import cStringIO
32 from six.moves import zip, StringIO
34 from yardstick.common import utils
35 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
36 from yardstick.network_services.helpers.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
39 from yardstick.network_services import constants
46 LOG = logging.getLogger(__name__)
47 LOG.setLevel(logging.DEBUG)
48 LOG_RESULT = logging.getLogger('yardstick')
49 LOG_RESULT.setLevel(logging.DEBUG)
55 CONFIGURATION_OPTIONS = (
56 # dict key section key default value
57 ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
58 ('testDuration', 'general', 'test_duration', 5.0),
59 ('testPrecision', 'general', 'test_precision', 1.0),
60 ('tests', 'general', 'tests', None),
61 ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
63 ('logFile', 'logging', 'file', 'dats.log'),
64 ('logDateFormat', 'logging', 'datefmt', None),
65 ('logLevel', 'logging', 'level', 'INFO'),
66 ('logOverwrite', 'logging', 'overwrite', 1),
68 ('testerIp', 'tester', 'ip', None),
69 ('testerUser', 'tester', 'user', 'root'),
70 ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
71 ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
72 ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
73 ('testerSocketId', 'tester', 'socket_id', 0),
75 ('sutIp', 'sut', 'ip', None),
76 ('sutUser', 'sut', 'user', 'root'),
77 ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
78 ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
79 ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
80 ('sutSocketId', 'sut', 'socket_id', 0),
84 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
85 CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
87 def __new__(cls, *args):
89 matches = cls.CORE_RE.search(str(args[0]))
91 args = matches.groups()
93 return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
94 'h' if args[2] else '')
96 except (AttributeError, TypeError, IndexError, ValueError):
97 raise ValueError('Invalid core spec {}'.format(args))
99 def is_hyperthread(self):
100 return self.hyperthread == 'h'
104 return int(self.is_hyperthread())
106 def find_in_topology(self, cpu_topology):
108 socket_core_match = cpu_topology[self.socket_id][self.core_id]
109 sorted_match = sorted(socket_core_match.values())
110 return sorted_match[self.index][0]
111 except (KeyError, IndexError):
112 template = "Core {}{} on socket {} does not exist"
113 raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
116 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
117 def __new__(cls, *args):
119 assert args[0] is not str(args[0])
120 args = tuple(args[0])
121 except (AssertionError, IndexError, TypeError):
124 return super(TotStatsTuple, cls).__new__(cls, *args)
127 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
128 'delta_tx,delta_tsc,'
129 'latency,rx_total,tx_total,'
134 return 1e2 * self.drop_total / float(self.tx_total)
135 except ZeroDivisionError:
140 # calculate the effective throughput in Mpps
141 return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
145 # calculate the effective throughput in Mpps
146 return float(self.delta_rx) * self.tsc_hz / self.delta_tsc / 1e6
149 def can_be_lost(self):
150 return int(self.tx_total * self.tolerated / 1e2)
153 def drop_total(self):
154 return self.tx_total - self.rx_total
158 return self.drop_total <= self.can_be_lost
160 def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
162 pkt_loss = self.pkt_loss
164 if port_samples is None:
174 "Throughput": self.rx_mpps,
175 "RxThroughput": self.rx_mpps,
176 "DropPackets": pkt_loss,
177 "CurrentDropPackets": pkt_loss,
178 "RequestedTxThroughput": self.requested_pps / 1e6,
179 "TxThroughput": self.tx_mpps,
183 samples.update(port_samples)
185 samples.update((key, value) for key, value in zip(latency_keys, self.latency))
188 def log_data(self, logger=None):
192 template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
193 logger.info(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
194 logger.info("Mpps configured: %f; Mpps generated %f; Mpps received %f",
195 self.requested_pps / 1e6, self.tx_mpps, self.rx_mpps)
198 class PacketDump(object):
200 def assert_func(func, value1, value2, template=None):
201 assert func(value1, value2), template.format(value1, value2)
203 def __init__(self, port_id, data_len, payload):
204 template = "Packet dump has specified length {}, but payload is {} bytes long"
205 self.assert_func(operator.eq, data_len, len(payload), template)
206 self._port_id = port_id
207 self._data_len = data_len
208 self._payload = payload
212 """Get the port id of the packet dump"""
217 """Get the length of the data received"""
218 return self._data_len
221 return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
223 def payload(self, start=None, end=None):
224 """Get part of the payload as a list of ordinals.
226 Returns a list of byte values, matching the contents of the packet dump.
227 Optional start and end parameters can be specified to retrieve only a
228 part of the packet contents.
230 The number of elements in the list is equal to end - start + 1, so end
231 is the offset of the last character.
234 start (pos. int): the starting offset in the payload. If it is not
235 specified or None, offset 0 is assumed.
236 end (pos. int): the ending offset of the payload. If it is not
237 specified or None, the contents until the end of the packet are
241 [int, int, ...]. Each int represents the ordinal value of a byte in
248 end = self.data_len - 1
250 # Bounds checking on offsets
251 template = "Start offset must be non-negative"
252 self.assert_func(operator.ge, start, 0, template)
254 template = "End offset must be less than {1}"
255 self.assert_func(operator.lt, end, self.data_len, template)
257 # Adjust for splice operation: end offset must be 1 more than the offset
258 # of the last desired character.
261 return self._payload[start:end]
264 class ProxSocketHelper(object):
266 def __init__(self, sock=None):
267 """ creates new prox instance """
268 super(ProxSocketHelper, self).__init__()
271 sock = socket.socket()
275 self.master_stats = None
277 def connect(self, ip, port):
278 """Connect to the prox instance on the remote system"""
279 self._sock.connect((ip, port))
281 def get_socket(self):
282 """ get the socket connected to the remote instance """
285 def _parse_socket_data(self, decoded_data, pkt_dump_only):
286 def get_newline_index():
287 return decoded_data.find('\n', index)
291 for newline_index in iter(get_newline_index, -1):
292 ret_str = decoded_data[index:newline_index]
295 mode, port_id, data_len = ret_str.split(',', 2)
297 mode, port_id, data_len = None, None, None
299 if mode != 'pktdump':
300 # Regular 1-line message. Stop reading from the socket.
301 LOG.debug("Regular response read")
304 LOG.debug("Packet dump header read: [%s]", ret_str)
306 # The line is a packet dump header. Parse it, read the
307 # packet payload, store the dump for later retrieval.
308 # Skip over the packet dump and continue processing: a
309 # 1-line response may follow the packet dump.
311 data_len = int(data_len)
312 data_start = newline_index + 1 # + 1 to skip over \n
313 data_end = data_start + data_len
314 sub_data = decoded_data[data_start:data_end]
315 pkt_payload = array.array('B', (ord(v) for v in sub_data))
316 pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
317 self._pkt_dumps.append(pkt_dump)
320 # Return boolean instead of string to signal
321 # successful reception of the packet dump.
322 LOG.debug("Packet dump stored, returning")
327 return ret_str, False
329 def get_string(self, pkt_dump_only=False, timeout=0.01):
331 def is_ready_string():
332 # recv() is blocking, so avoid calling it when no data is waiting.
333 ready = select.select([self._sock], [], [], timeout)
334 return bool(ready[0])
338 while status is False:
339 for status in iter(is_ready_string, False):
340 decoded_data = self._sock.recv(256).decode('utf-8')
341 ret_str, done = self._parse_socket_data(decoded_data,
347 LOG.debug("Received data from socket: [%s]", ret_str)
348 return status, ret_str
350 def get_data(self, pkt_dump_only=False, timeout=0.01):
351 """ read data from the socket """
353 # This method behaves slightly differently depending on whether it is
354 # called to read the response to a command (pkt_dump_only = 0) or if
355 # it is called specifically to read a packet dump (pkt_dump_only = 1).
357 # Packet dumps look like:
358 # pktdump,<port_id>,<data_len>\n
359 # <packet contents as byte array>\n
360 # This means the total packet dump message consists of 2 lines instead
363 # - Response for a command (pkt_dump_only = 0):
364 # 1) Read response from the socket until \n (end of message)
365 # 2a) If the response is a packet dump header (starts with "pktdump,"):
366 # - Read the packet payload and store the packet dump for later
368 # - Reset the state and restart from 1). Eventually state 2b) will
369 # be reached and the function will return.
370 # 2b) If the response is not a packet dump:
371 # - Return the received message as a string
373 # - Explicit request to read a packet dump (pkt_dump_only = 1):
374 # - Read the dump header and payload
375 # - Store the packet dump for later retrieval
376 # - Return True to signify a packet dump was successfully read
379 # recv() is blocking, so avoid calling it when no data is waiting.
380 ready = select.select([self._sock], [], [], timeout)
381 return bool(ready[0])
385 for status in iter(is_ready, False):
386 decoded_data = self._sock.recv(256).decode('utf-8')
387 ret_str, done = self._parse_socket_data(decoded_data, pkt_dump_only)
391 LOG.debug("Received data from socket: [%s]", ret_str)
392 return ret_str if status else ''
394 def put_command(self, to_send):
395 """ send data to the remote instance """
396 LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
398 # NOTE: sendall will block, we need a timeout
399 self._sock.sendall(to_send.encode('utf-8'))
400 except: # pylint: disable=bare-except
403 def get_packet_dump(self):
404 """ get the next packet dump """
406 return self._pkt_dumps.pop(0)
409 def stop_all_reset(self):
410 """ stop the remote instance and reset stats """
411 LOG.debug("Stop all and reset stats")
416 """ stop all cores on the remote instance """
417 LOG.debug("Stop all")
418 self.put_command("stop all\n")
420 def stop(self, cores, task=''):
421 """ stop specific cores on the remote instance """
425 if core not in tmpcores:
426 tmpcores.append(core)
428 LOG.debug("Stopping cores %s", tmpcores)
429 self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task))
432 """ start all cores on the remote instance """
433 LOG.debug("Start all")
434 self.put_command("start all\n")
436 def start(self, cores):
437 """ start specific cores on the remote instance """
441 if core not in tmpcores:
442 tmpcores.append(core)
444 LOG.debug("Starting cores %s", tmpcores)
445 self.put_command("start {}\n".format(join_non_strings(',', tmpcores)))
447 def reset_stats(self):
448 """ reset the statistics on the remote instance """
449 LOG.debug("Reset stats")
450 self.put_command("reset stats\n")
452 def _run_template_over_cores(self, template, cores, *args):
454 self.put_command(template.format(core, *args))
456 def set_pkt_size(self, cores, pkt_size):
457 """ set the packet size to generate on the remote instance """
458 LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
460 self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
462 def set_value(self, cores, offset, value, length):
463 """ set value on the remote instance """
464 msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
465 LOG.debug(msg, cores, value, length, offset)
466 template = "set value {} 0 {} {} {}\n"
467 self._run_template_over_cores(template, cores, offset, value, length)
469 def reset_values(self, cores):
470 """ reset values on the remote instance """
471 LOG.debug("Set value for core(s) %s", cores)
472 self._run_template_over_cores("reset values {} 0\n", cores)
474 def set_speed(self, cores, speed, tasks=None):
475 """ set speed on the remote instance """
477 tasks = [0] * len(cores)
478 elif len(tasks) != len(cores):
479 LOG.error("set_speed: cores and tasks must have the same len")
480 LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
481 for (core, task) in list(zip(cores, tasks)):
482 self.put_command("speed {} {} {}\n".format(core, task, speed))
484 def slope_speed(self, cores_speed, duration, n_steps=0):
485 """will start to increase speed from 0 to N where N is taken from
486 a['speed'] for each a in cores_speed"""
487 # by default, each step will take 0.5 sec
489 n_steps = duration * 2
491 private_core_data = []
492 step_duration = float(duration) / n_steps
493 for core_data in cores_speed:
494 target = float(core_data['speed'])
495 private_core_data.append({
496 'cores': core_data['cores'],
498 'delta': target / n_steps,
503 deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
504 for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
505 time.sleep(step_duration)
506 for core_data in private_core_data:
507 core_data['current'] = core_data[key1] + core_data[key2]
508 self.set_speed(core_data['cores'], core_data['current'])
510 def set_pps(self, cores, pps, pkt_size,
511 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
512 """ set packets per second for specific cores on the remote instance """
513 msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
514 LOG.debug(msg, cores, pps, pkt_size)
516 # speed in percent of line-rate
517 speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
518 self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
520 def lat_stats(self, cores, task=0):
521 """Get the latency statistics from the remote system"""
522 # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
527 self.put_command("lat stats {} {} \n".format(core, task))
528 ret = self.get_data()
531 lat_min[core], lat_max[core], lat_avg[core] = \
532 tuple(int(n) for n in ret.split(",")[:3])
534 except (AttributeError, ValueError, TypeError):
537 return lat_min, lat_max, lat_avg
539 def get_all_tot_stats(self):
540 self.put_command("tot stats\n")
541 all_stats_str = self.get_data().split(",")
542 if len(all_stats_str) != 4:
545 all_stats = TotStatsTuple(int(v) for v in all_stats_str)
546 self.master_stats = all_stats
550 return self.get_all_tot_stats()[3]
552 def core_stats(self, cores, task=0):
553 """Get the receive statistics from the remote system"""
554 rx = tx = drop = tsc = 0
556 self.put_command("core stats {} {}\n".format(core, task))
557 ret = self.get_data().split(",")
562 return rx, tx, drop, tsc
564 def irq_core_stats(self, cores_tasks):
565 """ get IRQ stats per core"""
570 for core, task in cores_tasks:
571 self.put_command("stats task.core({}).task({}).max_irq,task.core({}).task({}).irq(0),"
572 "task.core({}).task({}).irq(1),task.core({}).task({}).irq(2),"
573 "task.core({}).task({}).irq(3),task.core({}).task({}).irq(4),"
574 "task.core({}).task({}).irq(5),task.core({}).task({}).irq(6),"
575 "task.core({}).task({}).irq(7),task.core({}).task({}).irq(8),"
576 "task.core({}).task({}).irq(9),task.core({}).task({}).irq(10),"
577 "task.core({}).task({}).irq(11),task.core({}).task({}).irq(12)"
578 "\n".format(core, task, core, task, core, task, core, task,
579 core, task, core, task, core, task, core, task,
580 core, task, core, task, core, task, core, task,
581 core, task, core, task))
582 in_data_str = self.get_data().split(",")
583 ret = [try_int(s, 0) for s in in_data_str]
584 key = "core_" + str(core)
586 stat[key] = {"cpu": core, "max_irq": ret[0], "bucket_0" : ret[1],
587 "bucket_1" : ret[2], "bucket_2" : ret[3],
588 "bucket_3" : ret[4], "bucket_4" : ret[5],
589 "bucket_5" : ret[6], "bucket_6" : ret[7],
590 "bucket_7" : ret[8], "bucket_8" : ret[9],
591 "bucket_9" : ret[10], "bucket_10" : ret[11],
592 "bucket_11" : ret[12], "bucket_12" : ret[13],
593 "overflow": ret[10] + ret[11] + ret[12] + ret[13]}
594 except (KeyError, IndexError):
595 LOG.error("Corrupted PACKET %s", in_data_str)
599 def multi_port_stats(self, ports):
600 """get counter values from all ports at once"""
602 ports_str = ",".join(map(str, ports))
604 tot_result = [0] * len(ports)
607 while (len(ports) is not len(ports_all_data)):
608 self.put_command("multi port stats {}\n".format(ports_str))
609 status, ports_all_data_str = self.get_string()
614 ports_all_data = ports_all_data_str.split(";")
616 if len(ports) is len(ports_all_data):
617 for port_data_str in ports_all_data:
621 tmpdata = [try_int(s, 0) for s in port_data_str.split(",")]
622 except (IndexError, TypeError):
623 LOG.error("Unpacking data error %s", port_data_str)
626 if (len(tmpdata) < 6) or tmpdata[0] not in ports:
627 LOG.error("Corrupted PACKET %s - retrying", port_data_str)
630 tot_result[port_index] = tmpdata
631 port_index = port_index + 1
633 LOG.error("Empty / too much data - retry -%s-", ports_all_data)
636 LOG.debug("Multi port packet ..OK.. %s", tot_result)
637 return True, tot_result
640 def multi_port_stats_tuple(stats, ports):
642 Create a statistics tuple from port stats.
644 Returns a dict with contains the port stats indexed by port name
646 :param stats: (List) - List of List of port stats in pps
647 :param ports (Iterator) - to List of Ports
649 :return: (Dict) of port stats indexed by port_name
655 port_names = {port_num: port_name for port_name, port_num in ports}
656 except (TypeError, IndexError, KeyError):
657 LOG.critical("Ports are not initialized or number of port is ZERO ... CRITICAL ERROR")
663 samples[port_names[port_num]] = {
664 "in_packets": stat[1],
665 "out_packets": stat[2]}
666 except (TypeError, IndexError, KeyError):
667 LOG.error("Ports data and samples data is incompatable ....")
673 def multi_port_stats_diff(prev_stats, new_stats, hz):
675 Create a statistics tuple from difference between prev port stats
676 and current port stats. And store results in pps.
678 :param prev_stats: (List) - Previous List of port statistics
679 :param new_stats: (List) - Current List of port statistics
680 :param hz (float) - speed of system in Hz
682 :return: sample (List) - Difference of prev_port_stats and
683 new_port_stats in pps
692 if len(prev_stats) is not len(new_stats):
693 for port_index, stat in enumerate(new_stats):
694 stats.append([port_index, float(0), float(0), 0, 0, 0])
698 for port_index, stat in enumerate(new_stats):
699 if stat[RX_TOTAL_INDEX] > prev_stats[port_index][RX_TOTAL_INDEX]:
700 rx_total = stat[RX_TOTAL_INDEX] - \
701 prev_stats[port_index][RX_TOTAL_INDEX]
703 rx_total = stat[RX_TOTAL_INDEX]
705 if stat[TX_TOTAL_INDEX] > prev_stats[port_index][TX_TOTAL_INDEX]:
706 tx_total = stat[TX_TOTAL_INDEX] - prev_stats[port_index][TX_TOTAL_INDEX]
708 tx_total = stat[TX_TOTAL_INDEX]
710 if stat[TSC_INDEX] > prev_stats[port_index][TSC_INDEX]:
711 tsc = stat[TSC_INDEX] - prev_stats[port_index][TSC_INDEX]
713 tsc = stat[TSC_INDEX]
716 rx_total = tx_total = float(0)
719 LOG.error("HZ is ZERO ..")
720 rx_total = tx_total = float(0)
722 rx_total = float(rx_total * hz / tsc)
723 tx_total = float(tx_total * hz / tsc)
725 stats.append([port_index, rx_total, tx_total, 0, 0, tsc])
726 except (TypeError, IndexError, KeyError):
728 LOG.info("Current Port Stats incompatable to previous Port stats .. Discarded")
732 def port_stats(self, ports):
733 """get counter values from a specific port"""
734 tot_result = [0] * 12
736 self.put_command("port_stats {}\n".format(port))
737 ret = [try_int(s, 0) for s in self.get_data().split(",")]
738 tot_result = [sum(x) for x in zip(tot_result, ret)]
742 def measure_tot_stats(self):
743 start = self.get_all_tot_stats()
744 container = {'start_tot': start}
748 container['end_tot'] = end = self.get_all_tot_stats()
750 container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
753 """Get the total statistics from the remote system"""
754 stats = self.get_all_tot_stats()
757 def tot_ierrors(self):
758 """Get the total ierrors from the remote system"""
759 self.put_command("tot ierrors tot\n")
760 recv = self.get_data().split(',')
761 tot_ierrors = int(recv[0])
763 return tot_ierrors, tsc
765 def set_count(self, count, cores):
766 """Set the number of packets to send on the specified core"""
767 self._run_template_over_cores("count {} 0 {}\n", cores, count)
769 def dump_rx(self, core_id, task_id=0, count=1):
770 """Activate dump on rx on the specified core"""
771 LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
772 self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
773 time.sleep(1.5) # Give PROX time to set up packet dumping
781 """ stop all cores on the remote instance """
782 LOG.debug("Quit prox")
783 self.put_command("quit\n")
786 def force_quit(self):
787 """ stop all cores on the remote instance """
788 LOG.debug("Force Quit prox")
789 self.put_command("quit_force\n")
792 _LOCAL_OBJECT = object()
795 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
796 # the actual app is lowercase
798 # not used for Prox but added for consistency
801 LUA_PARAMETER_NAME = ""
802 LUA_PARAMETER_PEER = {
807 CONFIG_QUEUE_TIMEOUT = 120
    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Set up PROX-specific state on top of the DPDK setup-env helper.

        :param vnfd_helper: VNF descriptor helper (port/interface lookups)
        :param ssh_helper: SSH connection to the target node (file uploads)
        :param scenario_helper: access to scenario options and task path
        """
        # Remote path of the uploaded PROX config; filled in later by
        # build_config_file() via upload_prox_config().
        self.remote_path = None
        super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.remote_prox_file_name = None
        # Parsed config sections; lazily read from config_queue by the
        # prox_config_data property (filled by build_config_file()).
        self._prox_config_data = None
        # Maps base file name -> remote path for extra files uploaded
        # alongside the PROX config (see _insert_additional_file()).
        self.additional_files = {}
        self.config_queue = Queue()
        # allow_exit_without_flush
        self.config_queue.cancel_join_thread()
        # Cached "[global]" section; resolved lazily by the global_section
        # property.
        self._global_section = None
821 def prox_config_data(self):
822 if self._prox_config_data is None:
823 # this will block, but it needs too
824 self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
825 return self._prox_config_data
828 def global_section(self):
829 if self._global_section is None and self.prox_config_data:
830 self._global_section = self.find_section("global")
831 return self._global_section
833 def find_section(self, name, default=_LOCAL_OBJECT):
834 result = next((value for key, value in self.prox_config_data if key == name), default)
835 if result is _LOCAL_OBJECT:
836 raise KeyError('{} not found in Prox config'.format(name))
839 def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
840 section = self.find_section(section_name, [])
841 result = next((value for key, value in section if key == section_key), default)
842 if result is _LOCAL_OBJECT:
843 template = '{} not found in {} section of Prox config'
844 raise KeyError(template.format(section_key, section_name))
847 def copy_to_target(self, config_file_path, prox_file):
848 remote_path = os.path.join("/tmp", prox_file)
849 self.ssh_helper.put(config_file_path, remote_path)
853 def _get_tx_port(section, sections):
855 for item in sections[section]:
856 if item[0] == "tx port":
857 iface_port = re.findall(r'\d+', item[1])
858 # do we want the last one?
859 # if yes, then can we reverse?
860 return int(iface_port[0])
863 def _replace_quoted_with_value(quoted, value, count=1):
864 new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
867 def _insert_additional_file(self, value):
868 file_str = value.split('"')
869 base_name = os.path.basename(file_str[1])
870 file_str[1] = self.additional_files[base_name]
871 return '"'.join(file_str)
873 def generate_prox_config_file(self, config_path):
875 prox_config = ConfigParser(config_path, sections)
878 # Ensure MAC is set "hardware"
879 all_ports = self.vnfd_helper.port_pairs.all_ports
880 # use dpdk port number
881 for port_name in all_ports:
882 port_num = self.vnfd_helper.port_num(port_name)
883 port_section_name = "port {}".format(port_num)
884 for section_name, section in sections:
885 if port_section_name != section_name:
888 for section_data in section:
889 if section_data[0] == "mac":
890 section_data[1] = "hardware"
893 for _, section in sections:
894 for section_data in section:
895 item_key, item_val = section_data
896 if item_val.startswith("@@dst_mac"):
897 tx_port_iter = re.finditer(r'\d+', item_val)
898 tx_port_no = int(next(tx_port_iter).group(0))
899 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
900 mac = intf["virtual-interface"]["dst_mac"]
901 section_data[1] = mac.replace(":", " ", 6)
903 if item_key == "dst mac" and item_val.startswith("@@"):
904 tx_port_iter = re.finditer(r'\d+', item_val)
905 tx_port_no = int(next(tx_port_iter).group(0))
906 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
907 mac = intf["virtual-interface"]["dst_mac"]
908 section_data[1] = mac
910 if item_val.startswith("@@src_mac"):
911 tx_port_iter = re.finditer(r'\d+', item_val)
912 tx_port_no = int(next(tx_port_iter).group(0))
913 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
914 mac = intf["virtual-interface"]["local_mac"]
915 section_data[1] = mac.replace(":", " ", 6)
917 if item_key == "src mac" and item_val.startswith("@@"):
918 tx_port_iter = re.finditer(r'\d+', item_val)
919 tx_port_no = int(next(tx_port_iter).group(0))
920 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
921 mac = intf["virtual-interface"]["local_mac"]
922 section_data[1] = mac
924 # if addition file specified in prox config
925 if not self.additional_files:
928 for section_name, section in sections:
929 for section_data in section:
931 if section_data[0].startswith("dofile"):
932 section_data[0] = self._insert_additional_file(section_data[0])
934 if section_data[1].startswith("dofile"):
935 section_data[1] = self._insert_additional_file(section_data[1])
936 except: # pylint: disable=bare-except
942 def write_prox_lua(lua_config):
944 Write an .ini-format config file for PROX (parameters.lua)
945 PROX does not allow a space before/after the =, so we need
949 for key in lua_config:
950 value = '"' + lua_config[key] + '"'
951 if key == "__name__":
953 if value is not None and value != '@':
954 key = "=".join((key, str(value).replace('\n', '\n\t')))
957 key = str(key).replace('\n', '\n\t')
959 return os.linesep.join(out)
962 def write_prox_config(prox_config):
964 Write an .ini-format config file for PROX
965 PROX does not allow a space before/after the =, so we need
969 for (section_name, section) in prox_config:
970 out.append("[{}]".format(section_name))
973 if key == "__name__":
975 if value is not None and value != '@':
976 key = "=".join((key, str(value).replace('\n', '\n\t')))
979 key = str(key).replace('\n', '\n\t')
981 return os.linesep.join(out)
983 def put_string_to_file(self, s, remote_path):
984 file_obj = cStringIO(s)
985 self.ssh_helper.put_file_obj(file_obj, remote_path)
988 def generate_prox_lua_file(self):
990 all_ports = self.vnfd_helper.port_pairs.all_ports
991 for port_name in all_ports:
992 port_num = self.vnfd_helper.port_num(port_name)
993 intf = self.vnfd_helper.find_interface(name=port_name)
994 vintf = intf['virtual-interface']
995 p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
996 p["src_mac{0}".format(port_num)] = vintf["local_mac"]
1000 def upload_prox_lua(self, config_file, lua_data):
1001 # prox can't handle spaces around ' = ' so use custom method
1002 out = StringIO(self.write_prox_lua(lua_data))
1004 remote_path = os.path.join("/tmp", config_file)
1005 self.ssh_helper.put_file_obj(out, remote_path)
1009 def upload_prox_config(self, config_file, prox_config_data):
1010 # prox can't handle spaces around ' = ' so use custom method
1011 out = StringIO(self.write_prox_config(prox_config_data))
1013 remote_path = os.path.join("/tmp", config_file)
1014 self.ssh_helper.put_file_obj(out, remote_path)
1018 def build_config_file(self):
1019 task_path = self.scenario_helper.task_path
1020 options = self.scenario_helper.options
1021 config_path = options['prox_config']
1022 config_file = os.path.basename(config_path)
1023 config_path = utils.find_relative_file(config_path, task_path)
1024 self.additional_files = {}
1027 if options['prox_generate_parameter']:
1029 self.lua = self.generate_prox_lua_file()
1030 if len(self.lua) > 0:
1031 self.upload_prox_lua("parameters.lua", self.lua)
1032 except: # pylint: disable=bare-except
1035 prox_files = options.get('prox_files', [])
1036 if isinstance(prox_files, six.string_types):
1037 prox_files = [prox_files]
1038 for key_prox_file in prox_files:
1039 base_prox_file = os.path.basename(key_prox_file)
1040 key_prox_path = utils.find_relative_file(key_prox_file, task_path)
1041 remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
1042 self.additional_files[base_prox_file] = remote_prox_file
1044 self._prox_config_data = self.generate_prox_config_file(config_path)
1045 # copy config to queue so we can read it from traffic_runner process
1046 self.config_queue.put(self._prox_config_data)
1047 self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
1049 def build_config(self):
1050 self.build_config_file()
1052 options = self.scenario_helper.options
1053 prox_args = options['prox_args']
1054 tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
1056 self.pipeline_kwargs = {
1057 'tool_path': tool_path,
1058 'tool_dir': os.path.dirname(tool_path),
1059 'cfg_file': self.remote_path,
1060 'args': ' '.join(' '.join([str(k), str(v) if v else ''])
1061 for k, v in prox_args.items())
1064 cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
1065 "{args} -f {cfg_file} '")
1066 return cmd_template.format(**self.pipeline_kwargs)
1069 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
1070 class ProxResourceHelper(ClientResourceHelper):
1072 RESOURCE_WORD = 'prox'
1079 def find_pci(pci, bound_pci):
1080 # we have to substring match PCI bus address from the end
1081 return any(b.endswith(pci) for b in bound_pci)
1083 def __init__(self, setup_helper):
1084 super(ProxResourceHelper, self).__init__(setup_helper)
1085 self.mgmt_interface = self.vnfd_helper.mgmt_interface
1086 self._user = self.mgmt_interface["user"]
1087 self._ip = self.mgmt_interface["ip"]
1090 self._vpci_to_if_name_map = None
1091 self.additional_file = {}
1092 self.remote_prox_file_name = None
1096 self.step_time = 0.5
1097 self._test_type = None
1098 self.prev_multi_port = []
1104 self.client = self._connect()
1108 def test_type(self):
1109 if self._test_type is None:
1110 self._test_type = self.setup_helper.find_in_section('global', 'name', None)
1111 return self._test_type
1113 def run_traffic(self, traffic_profile, *args):
1114 self._queue.cancel_join_thread()
1118 traffic_profile.init(self._queue)
1119 # this frees up the run_traffic loop
1120 self.client_started.value = 1
1122 while not self._terminated.value:
1123 # move it all to traffic_profile
1124 self._run_traffic_once(traffic_profile)
def _run_traffic_once(self, traffic_profile):
    """Run one iteration of the profile; signal completion when it finishes."""
    traffic_profile.execute_traffic(self)
    if not traffic_profile.done.is_set():
        return
    # profile finished: notify the consumer and stop the run_traffic loop
    self._queue.put({'done': True})
    LOG.debug("tg_prox done")
    self._terminated.value = 1
# For VNF use ResourceHelper method to collect KPIs directly.
# for TG leave the superclass ClientResourceHelper collect_kpi_method intact
def collect_collectd_kpi(self):
    """Return only the collectd resource KPIs (VNF-side collection path)."""
    resource_kpis = self._collect_resource_kpi()
    return resource_kpis
1138 def collect_live_stats(self):
# Poll per-port counters from PROX and diff them against the previous
# snapshot to produce rate-style live statistics.
# :return: (bool, dict) success flag and live stats mapping.
1140 for _, port_num in self.vnfd_helper.ports_iter():
1141 ports.append(port_num)
1143 ok, curr_port_stats = self.sut.multi_port_stats(ports)
1153 new_all_port_stats = \
1154 self.sut.multi_port_stats_diff(self.prev_multi_port, curr_port_stats, hz)
# Remember raw counters so the next poll can diff against them.
1156 self.prev_multi_port = curr_port_stats
1158 live_stats = self.sut.multi_port_stats_tuple(new_all_port_stats,
1159 self.vnfd_helper.ports_iter())
1160 return True, live_stats
1162 def collect_kpi(self):
# Extend the superclass KPIs with collectd stats and live port rates.
1163 result = super(ProxResourceHelper, self).collect_kpi()
1164 # add in collectd kpis manually
1166 result['collect_stats'] = self._collect_resource_kpi()
1168 ok, live_stats = self.collect_live_stats()
1170 result.update({'live_stats': live_stats})
1174 def terminate(self):
1175 # should not be called, use VNF terminate
1176 raise NotImplementedError()
# Touching self.sut forces the lazy control-socket connection.
1179 return self.sut # force connection
1181 def execute(self, cmd, *args, **kwargs):
# Dispatch *cmd* by name on the PROX socket helper when such a method exists.
1182 func = getattr(self.sut, cmd, None)
1184 return func(*args, **kwargs)
1187 def _connect(self, client=None):
1188 """Run and connect to prox on the remote system """
1189 # De-allocating a large amount of hugepages takes some time. If a new
1190 # PROX instance is started immediately after killing the previous one,
1191 # it might not be able to allocate hugepages, because they are still
1192 # being freed. Hence the -w switch.
1193 # self.connection.execute("sudo killall -w Prox 2>/dev/null")
1194 # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
1195 # -f ./handle_none-4.cfg"
1196 # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
1198 # + "export RTE_TARGET=" + self._dpdk_target + ";" \
1199 # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
1201 # + "./build/Prox " + prox_args
1202 # log.debug("Starting PROX with command [%s]", prox_cmd)
1203 # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
1204 # self._ip, prox_cmd))
1206 client = ProxSocketHelper()
1208 # try connecting to Prox for 60s
1209 for _ in range(RETRY_SECONDS):
1210 time.sleep(RETRY_INTERVAL)
# Connection refusals / DNS errors are expected while PROX is still starting.
1212 client.connect(self._ip, PROX_PORT)
1213 except (socket.gaierror, socket.error):
1218 msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
1219 raise Exception(msg.format(self._ip, PROX_PORT))
1222 class ProxDataHelper(object):
# Context-manager helper that gathers PROX statistics for one test run and
# packages them into a ProxTestDataTuple.
1224 def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
1225 super(ProxDataHelper, self).__init__()
1226 self.vnfd_helper = vnfd_helper
1228 self.pkt_size = pkt_size
1230 self.line_speed = line_speed
1231 self.tolerated_loss = tolerated_loss
1232 self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
1234 self.measured_stats = None
# Lazy cache for (rx_total, tx_total, requested_pps).
1236 self._totals_and_pps = None
1237 self.result_tuple = None
1240 def totals_and_pps(self):
# Compute aggregate RX/TX totals and the requested PPS, retrying the
# stats query until it succeeds or the timeout expires.
1241 if self._totals_and_pps is None:
1242 rx_total = tx_total = 0
1244 timeout = time.time() + constants.RETRY_TIMEOUT
1246 ok, all_ports = self.sut.multi_port_stats([
1247 self.vnfd_helper.port_num(port_name)
1248 for port_name in self.vnfd_helper.port_pairs.all_ports])
1249 if time.time() > timeout:
# Each port record is (port_num, rx, tx, ...); sum RX/TX across ports.
1252 for port in all_ports:
1253 rx_total = rx_total + port[1]
1254 tx_total = tx_total + port[2]
1255 requested_pps = self.value / 100.0 * self.line_rate_to_pps()
1256 self._totals_and_pps = rx_total, tx_total, requested_pps
1257 return self._totals_and_pps
# Defensive accessors: swallow lookup/type errors from incomplete stats.
1262 ret_val = self.totals_and_pps[0]
1263 except (AttributeError, ValueError, TypeError, LookupError):
1270 ret_val = self.totals_and_pps[1]
1271 except (AttributeError, ValueError, TypeError, LookupError):
1276 def requested_pps(self):
1278 ret_val = self.totals_and_pps[2]
1279 except (AttributeError, ValueError, TypeError, LookupError):
# Build a per-port samples dict keyed by port name.
1288 for port_name, port_num in self.vnfd_helper.ports_iter():
1289 ports.append(port_num)
1290 port_names[port_num] = port_name
1293 timeout = time.time() + constants.RETRY_TIMEOUT
1295 ok, results = self.sut.multi_port_stats(ports)
1296 if time.time() > timeout:
1299 for result in results:
1300 port_num = result[0]
1302 samples[port_names[port_num]] = {
1303 "in_packets": result[1],
1304 "out_packets": result[2]}
1305 except (IndexError, KeyError):
1309 def __enter__(self):
# Entering the context validates the port topology up front.
1310 self.check_interface_count()
1313 def __exit__(self, exc_type, exc_val, exc_tb):
1316 def make_tuple(self):
# Idempotent: only build the result tuple once per run.
1317 if self.result_tuple:
1320 self.result_tuple = ProxTestDataTuple(
1321 self.tolerated_loss,
1323 self.measured_stats['delta'].rx,
1324 self.measured_stats['delta'].tx,
1325 self.measured_stats['delta'].tsc,
1331 self.result_tuple.log_data()
1334 def measure_tot_stats(self):
# Delegates to the SUT's measuring context; the delta is captured into
# self.measured_stats for make_tuple().
1335 with self.sut.measure_tot_stats() as self.measured_stats:
1338 def check_interface_count(self):
1339 # do this assert in init? unless we expect interface count to
1340 # change from one run to another run...
1341 assert self.port_count in {1, 2, 4}, \
1342 "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1344 def capture_tsc_hz(self):
1345 self.tsc_hz = float(self.sut.hz())
def line_rate_to_pps(self):
    """Convert the aggregate line rate into packets per second.

    The 20 extra bytes per packet presumably account for L1 framing
    overhead (preamble + inter-frame gap) — TODO confirm against PROX docs.
    """
    # keep the original evaluation order so float results match exactly
    frame_size = self.pkt_size + 20
    return self.port_count * self.line_speed / BITS_PER_BYTE / frame_size
1350 class ProxProfileHelper(object):
# Base helper for PROX traffic profiles; subclasses override
# __prox_profile_type__ to register themselves with get_cls().
1352 __prox_profile_type__ = "Generic"
# PROX core "mode" values used to classify generator and latency cores.
1354 PROX_CORE_GEN_MODE = "gen"
1355 PROX_CORE_LAT_MODE = "lat"
1358 def get_cls(cls, helper_type):
1359 """Return class of specified type."""
# Falls back to the generic helper when no type is given.
1361 return ProxProfileHelper
# Search registered subclasses by their declared profile type.
1363 for profile_helper_class in utils.itersubclasses(cls):
1364 if helper_type == profile_helper_class.__prox_profile_type__:
1365 return profile_helper_class
# No subclass matched: fall back to the generic helper.
1367 return ProxProfileHelper
def make_profile_helper(cls, resource_helper):
    """Instantiate the profile helper matching the resource helper's test type."""
    helper_cls = cls.get_cls(resource_helper.test_type)
    return helper_cls(resource_helper)
1373 def __init__(self, resource_helper):
1374 super(ProxProfileHelper, self).__init__()
1375 self.resource_helper = resource_helper
1376 self._cpu_topology = None
1377 self._test_cores = None
1378 self._latency_cores = None
def cpu_topology(self):
    """Parse and cache the remote host's socket/core layout from /proc/cpuinfo."""
    if not self._cpu_topology:
        raw = io.BytesIO()
        self.ssh_helper.get_file_obj("/proc/cpuinfo", raw)
        cpuinfo_text = raw.getvalue().decode('utf-8')
        self._cpu_topology = SocketTopology.parse_cpuinfo(cpuinfo_text)
    return self._cpu_topology
1389 def test_cores(self):
# Lazily resolved list of generator ("gen") cores.
1390 if not self._test_cores:
1391 self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1392 return self._test_cores
1395 def latency_cores(self):
# Lazily resolved list of latency ("lat") cores.
1396 if not self._latency_cores:
1397 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1398 return self._latency_cores
1401 def traffic_context(self, pkt_size, value):
# Configure packet size and speed on the generator cores, then start
# all cores for the duration of the context.
1403 self.sut.reset_stats()
1405 self.sut.set_pkt_size(self.test_cores, pkt_size)
1406 self.sut.set_speed(self.test_cores, value)
1407 self.sut.start_all()
1413 def get_cores(self, mode):
# Collect the core ids of all [core ...] config sections whose "mode"
# key equals *mode*.
1416 for section_name, section in self.setup_helper.prox_config_data:
1417 if not section_name.startswith("core"):
1420 for key, value in section:
1421 if key == "mode" and value == mode:
1422 core_tuple = CoreSocketTuple(section_name)
1423 core = core_tuple.core_id
1428 def pct_10gbps(self, percent, line_speed):
    """Get rate in percent of 10 Gbps.

    Returns the rate in percent of 10 Gbps.
    For instance 100.0 = 10 Gbps; 400.0 = 40 Gbps.

    This helper method is required when setting the interface_speed option
    in the testcase because NSB/PROX considers 10 Gbps as 100% of line
    rate; this means that the line rate must be expressed as a percentage
    of 10 Gbps.

    :param percent: (float) Percent of line rate (100.0 = line rate).
    :param line_speed: (int) line rate speed, in bits per second.
    :return: (float) Represents the rate in percent of 10Gbps.
    """
1444 return (percent * line_speed / (
1445 constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT))
def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
             line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
    """Generate traffic for *duration* seconds and collect the results.

    :param pkt_size: (int) L2 packet size in bytes.
    :param duration: (float) seconds to transmit at the requested rate.
    :param value: (float) requested rate, percent of line rate.
    :param tolerated_loss: (float) acceptable packet loss, percent.
    :param line_speed: (int) line rate, in bits per second.
    :return: (result_tuple, samples) gathered by ProxDataHelper.
    """
    data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                 value, tolerated_loss, line_speed)
    # PROX treats 10 Gbps as 100%, so rescale the requested percentage
    rate_pct = self.pct_10gbps(value, line_speed)

    with data_helper, self.traffic_context(pkt_size, rate_pct):
        with data_helper.measure_tot_stats():
            time.sleep(duration)
        # Getting statistics to calculate PPS at right speed....
        data_helper.capture_tsc_hz()
        data_helper.latency = self.get_latency()

    return data_helper.result_tuple, data_helper.samples
1462 def get_latency(self):
# Query latency statistics from the latency cores, resolving them
# lazily if they have not been fetched yet.
1464 :return: return lat_min, lat_max, lat_avg
1468 if not self._latency_cores:
1469 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1471 if self._latency_cores:
1472 return self.sut.lat_stats(self._latency_cores)
1475 def terminate(self):
1478 def __getattr__(self, item):
# Delegate unknown attribute lookups to the wrapped resource helper.
1479 return getattr(self.resource_helper, item)
1482 class ProxMplsProfileHelper(ProxProfileHelper):
# Profile helper for MPLS tag/untag traffic generation.
1484 __prox_profile_type__ = "MPLS tag/untag"
1486 def __init__(self, resource_helper):
1487 super(ProxMplsProfileHelper, self).__init__(resource_helper)
# Lazy cache for the (tagged, plain) core lists.
1488 self._cores_tuple = None
1491 def mpls_cores(self):
1492 if not self._cores_tuple:
1493 self._cores_tuple = self.get_cores_mpls()
1494 return self._cores_tuple
1497 def tagged_cores(self):
1498 return self.mpls_cores[0]
1501 def plain_cores(self):
1502 return self.mpls_cores[1]
1504 def get_cores_mpls(self):
# Classify generator cores by their section "name": "tag*" cores send
# MPLS-tagged traffic, "udp*" cores send plain traffic.
1507 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1508 if not section_name.startswith("core"):
# Only consider generator ("gen") cores.
1511 if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1514 for item_key, item_value in section:
1515 if item_key != 'name':
1518 if item_value.startswith("tag"):
1519 core_tuple = CoreSocketTuple(section_name)
1520 core_tag = core_tuple.core_id
1521 cores_tagged.append(core_tag)
1523 elif item_value.startswith("udp"):
1524 core_tuple = CoreSocketTuple(section_name)
1525 core_udp = core_tuple.core_id
1526 cores_plain.append(core_udp)
1528 return cores_tagged, cores_plain
1531 def traffic_context(self, pkt_size, value):
# Configure both directions and start all cores; plain packets are 4
# bytes smaller (no MPLS label) and their speed is scaled accordingly.
1533 self.sut.reset_stats()
1535 self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1536 self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1537 self.sut.set_speed(self.tagged_cores, value)
# Equalize bandwidth: scale plain speed by the on-wire size ratio.
1538 ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1539 self.sut.set_speed(self.plain_cores, value * ratio)
1540 self.sut.start_all()
1547 class ProxBngProfileHelper(ProxProfileHelper):
# Profile helper for BNG (Broadband Network Gateway) traffic generation.
1549 __prox_profile_type__ = "BNG gen"
1551 def __init__(self, resource_helper):
1552 super(ProxBngProfileHelper, self).__init__(resource_helper)
# Lazy cache for the (cpe, inet, arp, arp_task) core lists.
1553 self._cores_tuple = None
1556 def bng_cores(self):
1557 if not self._cores_tuple:
1558 self._cores_tuple = self.get_cores_gen_bng_qos()
1559 return self._cores_tuple
# Convenience accessors into the cached core tuple.
1562 def cpe_cores(self):
1563 return self.bng_cores[0]
1566 def inet_cores(self):
1567 return self.bng_cores[1]
1570 def arp_cores(self):
1571 return self.bng_cores[2]
1574 def arp_task_cores(self):
1575 return self.bng_cores[3]
1578 def all_rx_cores(self):
1579 return self.latency_cores
1581 def get_cores_gen_bng_qos(self):
# Classify generator cores by the "name" key of each [core ...] section.
1585 arp_tasks_core = [0]
1586 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1587 if not section_name.startswith("core"):
# Only consider generator ("gen") cores.
1590 if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1593 for item_key, item_value in section:
1594 if item_key != 'name':
1597 if item_value.startswith("cpe"):
1598 core_tuple = CoreSocketTuple(section_name)
1599 cpe_core = core_tuple.core_id
1600 cpe_cores.append(cpe_core)
1602 elif item_value.startswith("inet"):
1603 core_tuple = CoreSocketTuple(section_name)
1604 inet_core = core_tuple.core_id
1605 inet_cores.append(inet_core)
1607 elif item_value.startswith("arp"):
1608 core_tuple = CoreSocketTuple(section_name)
1609 arp_core = core_tuple.core_id
1610 arp_cores.append(arp_core)
1612 # We check the tasks/core separately
1613 if item_value.startswith("arp_task"):
1614 core_tuple = CoreSocketTuple(section_name)
1615 arp_task_core = core_tuple.core_id
1616 arp_tasks_core.append(arp_task_core)
1618 return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1621 def traffic_context(self, pkt_size, value):
1622 # Tester is sending packets at the required speed already after
1623 # setup_test(). Just get the current statistics, sleep the required
1624 # amount of time and calculate packet loss.
1625 inet_pkt_size = pkt_size
# CPE-side packets are 24 bytes smaller than the inet side.
1626 cpe_pkt_size = pkt_size - 24
# Scale the two directions so they use equal on-wire bandwidth.
1627 ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1629 curr_up_speed = curr_down_speed = 0
1630 max_up_speed = max_down_speed = value
1632 max_down_speed = value * ratio
1634 max_up_speed = value / ratio
1640 # Flush any packets in the NIC RX buffers, otherwise the stats will be
1642 self.sut.start(self.all_rx_cores)
1644 self.sut.stop(self.all_rx_cores)
1646 self.sut.reset_stats()
1648 self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1649 self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1651 self.sut.reset_values(self.cpe_cores)
1652 self.sut.reset_values(self.inet_cores)
1654 # Set correct IP and UDP lengths in packet headers
1656 # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1657 self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1658 # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1659 self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1662 # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1663 self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1664 # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1665 self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1666 # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1667 self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1669 # Sending ARP to initialize tables - need a few seconds of generation
1670 # to make sure all CPEs are initialized
1671 LOG.info("Initializing SUT: sending ARP packets")
1672 self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1673 self.sut.set_speed(self.inet_cores, curr_up_speed)
1674 self.sut.set_speed(self.cpe_cores, curr_down_speed)
1675 self.sut.start(self.arp_cores)
1678 # Ramp up the transmission speed. First go to the common speed, then
1679 # increase steps for the faster one.
1680 self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1682 LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1684 while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1685 # The min(..., ...) takes care of 1) floating point rounding errors
1686 # that could make curr_*_speed to be slightly greater than
1687 # max_*_speed and 2) max_*_speed not being an exact multiple of
1689 if curr_up_speed < max_up_speed:
1690 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1691 if curr_down_speed < max_down_speed:
1692 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1694 self.sut.set_speed(self.inet_cores, curr_up_speed)
1695 self.sut.set_speed(self.cpe_cores, curr_down_speed)
1696 time.sleep(self.step_time)
1698 LOG.info("Target speeds reached. Starting real test.")
1702 self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1703 LOG.info("Test ended. Flushing NIC buffers")
1704 self.sut.start(self.all_rx_cores)
1706 self.sut.stop(self.all_rx_cores)
def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
             line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
    """Generate traffic for *duration* seconds and collect the results.

    :param pkt_size: (int) L2 packet size in bytes.
    :param duration: (float) seconds to transmit at the requested rate.
    :param value: (float) requested rate, percent of line rate.
    :param tolerated_loss: (float) acceptable packet loss, percent.
    :param line_speed: (int) line rate, in bits per second.
    :return: (result_tuple, samples) gathered by ProxDataHelper.
    """
    data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                 value, tolerated_loss, line_speed)
    # PROX treats 10 Gbps as 100%, so rescale the requested percentage
    rate_pct = self.pct_10gbps(value, line_speed)

    with data_helper, self.traffic_context(pkt_size, rate_pct):
        with data_helper.measure_tot_stats():
            time.sleep(duration)
        # Getting statistics to calculate PPS at right speed....
        data_helper.capture_tsc_hz()
        data_helper.latency = self.get_latency()

    return data_helper.result_tuple, data_helper.samples
1724 class ProxVpeProfileHelper(ProxProfileHelper):
# Profile helper for vPE (virtual Provider Edge) traffic generation.
1726 __prox_profile_type__ = "vPE gen"
1728 def __init__(self, resource_helper):
1729 super(ProxVpeProfileHelper, self).__init__(resource_helper)
# Lazy caches for (cpe, inet) core and port lists.
1730 self._cores_tuple = None
1731 self._ports_tuple = None
1734 def vpe_cores(self):
1735 if not self._cores_tuple:
1736 self._cores_tuple = self.get_cores_gen_vpe()
1737 return self._cores_tuple
1740 def cpe_cores(self):
1741 return self.vpe_cores[0]
1744 def inet_cores(self):
1745 return self.vpe_cores[1]
1748 def all_rx_cores(self):
1749 return self.latency_cores
1752 def vpe_ports(self):
1753 if not self._ports_tuple:
1754 self._ports_tuple = self.get_ports_gen_vpe()
1755 return self._ports_tuple
1758 def cpe_ports(self):
1759 return self.vpe_ports[0]
1762 def inet_ports(self):
1763 return self.vpe_ports[1]
1765 def get_cores_gen_vpe(self):
# Classify generator cores by the "name" key of each [core ...] section.
1768 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1769 if not section_name.startswith("core"):
# Only consider generator ("gen") cores.
1772 if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1775 for item_key, item_value in section:
1776 if item_key != 'name':
1779 if item_value.startswith("cpe"):
1780 core_tuple = CoreSocketTuple(section_name)
1781 core_tag = core_tuple.core_id
1782 cpe_cores.append(core_tag)
1784 elif item_value.startswith("inet"):
1785 core_tuple = CoreSocketTuple(section_name)
1786 inet_core = core_tuple.core_id
1787 inet_cores.append(inet_core)
1789 return cpe_cores, inet_cores
1791 def get_ports_gen_vpe(self):
# Classify ports by the "name" key of each [port ...] section; the port
# number is taken from the section title.
1795 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1796 if not section_name.startswith("port"):
1798 tx_port_iter = re.finditer(r'\d+', section_name)
1799 tx_port_no = int(next(tx_port_iter).group(0))
1801 for item_key, item_value in section:
1802 if item_key != 'name':
1805 if item_value.startswith("cpe"):
1806 cpe_ports.append(tx_port_no)
1808 elif item_value.startswith("inet"):
1809 inet_ports.append(tx_port_no)
1811 return cpe_ports, inet_ports
1814 def traffic_context(self, pkt_size, value):
1815 # Calculate the target upload and download speed. The upload and
1816 # download packets have different packet sizes, so in order to get
1817 # equal bandwidth usage, the ratio of the speeds has to match the ratio
1818 # of the packet sizes.
1819 cpe_pkt_size = pkt_size
1820 inet_pkt_size = pkt_size - 4
1821 ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1823 curr_up_speed = curr_down_speed = 0
1824 max_up_speed = max_down_speed = value
1826 max_down_speed = value * ratio
1828 max_up_speed = value / ratio
1830 # Adjust speed when multiple cores per port are used to generate traffic
1831 if len(self.cpe_ports) != len(self.cpe_cores):
1832 max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1833 if len(self.inet_ports) != len(self.inet_cores):
1834 max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1840 # Flush any packets in the NIC RX buffers, otherwise the stats will be
1842 self.sut.start(self.all_rx_cores)
1844 self.sut.stop(self.all_rx_cores)
1846 self.sut.reset_stats()
1848 self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1849 self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1851 self.sut.reset_values(self.cpe_cores)
1852 self.sut.reset_values(self.inet_cores)
1854 # Set correct IP and UDP lengths in packet headers
1855 # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1856 self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1857 # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1858 self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1860 # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1861 self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1862 # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1863 self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1865 self.sut.set_speed(self.inet_cores, curr_up_speed)
1866 self.sut.set_speed(self.cpe_cores, curr_down_speed)
1868 # Ramp up the transmission speed. First go to the common speed, then
1869 # increase steps for the faster one.
1870 self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1872 LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1874 while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1875 # The min(..., ...) takes care of 1) floating point rounding errors
1876 # that could make curr_*_speed to be slightly greater than
1877 # max_*_speed and 2) max_*_speed not being an exact multiple of
1879 if curr_up_speed < max_up_speed:
1880 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1881 if curr_down_speed < max_down_speed:
1882 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1884 self.sut.set_speed(self.inet_cores, curr_up_speed)
1885 self.sut.set_speed(self.cpe_cores, curr_down_speed)
1886 time.sleep(self.step_time)
1888 LOG.info("Target speeds reached. Starting real test.")
1892 self.sut.stop(self.cpe_cores + self.inet_cores)
1893 LOG.info("Test ended. Flushing NIC buffers")
1894 self.sut.start(self.all_rx_cores)
1896 self.sut.stop(self.all_rx_cores)
def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
             line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
    """Generate traffic for *duration* seconds and collect the results.

    :param pkt_size: (int) L2 packet size in bytes.
    :param duration: (float) seconds to transmit at the requested rate.
    :param value: (float) requested rate, percent of line rate.
    :param tolerated_loss: (float) acceptable packet loss, percent.
    :param line_speed: (int) line rate, in bits per second.
    :return: (result_tuple, samples) gathered by ProxDataHelper.
    """
    data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                 value, tolerated_loss, line_speed)
    # PROX treats 10 Gbps as 100%, so rescale the requested percentage
    rate_pct = self.pct_10gbps(value, line_speed)

    with data_helper, self.traffic_context(pkt_size, rate_pct):
        with data_helper.measure_tot_stats():
            time.sleep(duration)
        # Getting statistics to calculate PPS at right speed....
        data_helper.capture_tsc_hz()
        data_helper.latency = self.get_latency()

    return data_helper.result_tuple, data_helper.samples
1914 class ProxlwAFTRProfileHelper(ProxProfileHelper):
# Profile helper for lwAFTR (lightweight 4over6) traffic generation.
1916 __prox_profile_type__ = "lwAFTR gen"
1918 def __init__(self, resource_helper):
1919 super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
# Lazy caches for (tun, inet) core and port lists.
1920 self._cores_tuple = None
1921 self._ports_tuple = None
# Pause between ramp-up steps, in seconds.
1923 self.step_time = 0.5
1926 def _lwaftr_cores(self):
1927 if not self._cores_tuple:
1928 self._cores_tuple = self._get_cores_gen_lwaftr()
1929 return self._cores_tuple
1932 def tun_cores(self):
1933 return self._lwaftr_cores[0]
1936 def inet_cores(self):
1937 return self._lwaftr_cores[1]
1940 def _lwaftr_ports(self):
1941 if not self._ports_tuple:
1942 self._ports_tuple = self._get_ports_gen_lw_aftr()
1943 return self._ports_tuple
1946 def tun_ports(self):
1947 return self._lwaftr_ports[0]
1950 def inet_ports(self):
1951 return self._lwaftr_ports[1]
1954 def all_rx_cores(self):
1955 return self.latency_cores
1957 def _get_cores_gen_lwaftr(self):
# Classify generator cores by the "name" key of each [core ...] section.
1960 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1961 if not section_name.startswith("core"):
# Only consider generator ("gen") cores.
1964 if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1967 core_tuple = CoreSocketTuple(section_name)
1968 core_tag = core_tuple.core_id
1969 for item_value in (v for k, v in section if k == 'name'):
1970 if item_value.startswith('tun'):
1971 tun_cores.append(core_tag)
1972 elif item_value.startswith('inet'):
1973 inet_cores.append(core_tag)
1975 return tun_cores, inet_cores
1977 def _get_ports_gen_lw_aftr(self):
# Classify ports by the "name" key of each "port N" section.
1981 re_port = re.compile(r'port (\d+)')
1982 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1983 match = re_port.search(section_name)
1987 tx_port_no = int(match.group(1))
1988 for item_value in (v for k, v in section if k == 'name'):
1989 if item_value.startswith('lwB4'):
1990 tun_ports.append(tx_port_no)
1991 elif item_value.startswith('inet'):
1992 inet_ports.append(tx_port_no)
1994 return tun_ports, inet_ports
1997 def _resize(len1, len2):
# Ratio helper used to rescale speeds for multiple cores per port.
2000 return 1.0 * len1 / len2
2003 def traffic_context(self, pkt_size, value):
2004 # Tester is sending packets at the required speed already after
2005 # setup_test(). Just get the current statistics, sleep the required
2006 # amount of time and calculate packet loss.
2007 tun_pkt_size = pkt_size
# The inet side is 40 bytes smaller (no IPv6 encapsulation header).
2008 inet_pkt_size = pkt_size - 40
# Scale the two directions so they use equal on-wire bandwidth.
2009 ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
2011 curr_up_speed = curr_down_speed = 0
2012 max_up_speed = max_down_speed = value
2014 max_up_speed = value / ratio
2016 # Adjust speed when multiple cores per port are used to generate traffic
2017 if len(self.tun_ports) != len(self.tun_cores):
2018 max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
2019 if len(self.inet_ports) != len(self.inet_cores):
2020 max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
2026 # Flush any packets in the NIC RX buffers, otherwise the stats will be
2028 self.sut.start(self.all_rx_cores)
2030 self.sut.stop(self.all_rx_cores)
2032 self.sut.reset_stats()
2034 self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
2035 self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
2037 self.sut.reset_values(self.tun_cores)
2038 self.sut.reset_values(self.inet_cores)
2040 # Set correct IP and UDP lengths in packet headers
2042 # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
2043 self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
2044 # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
2045 self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
2046 # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
2047 self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
2050 # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
2051 self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
2052 # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
2053 self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
2055 LOG.info("Initializing SUT: sending lwAFTR packets")
2056 self.sut.set_speed(self.inet_cores, curr_up_speed)
2057 self.sut.set_speed(self.tun_cores, curr_down_speed)
2060 # Ramp up the transmission speed. First go to the common speed, then
2061 # increase steps for the faster one.
2062 self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
2064 LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
2066 while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
2067 # The min(..., ...) takes care of 1) floating point rounding errors
2068 # that could make curr_*_speed to be slightly greater than
2069 # max_*_speed and 2) max_*_speed not being an exact multiple of
2071 if curr_up_speed < max_up_speed:
2072 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
2073 if curr_down_speed < max_down_speed:
2074 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
2076 self.sut.set_speed(self.inet_cores, curr_up_speed)
2077 self.sut.set_speed(self.tun_cores, curr_down_speed)
2078 time.sleep(self.step_time)
2080 LOG.info("Target speeds reached. Starting real test.")
2084 self.sut.stop(self.tun_cores + self.inet_cores)
2085 LOG.info("Test ended. Flushing NIC buffers")
2086 self.sut.start(self.all_rx_cores)
2088 self.sut.stop(self.all_rx_cores)
def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
             line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
    """Generate traffic for *duration* seconds and collect the results.

    :param pkt_size: (int) L2 packet size in bytes.
    :param duration: (float) seconds to transmit at the requested rate.
    :param value: (float) requested rate, percent of line rate.
    :param tolerated_loss: (float) acceptable packet loss, percent.
    :param line_speed: (int) line rate, in bits per second.
    :return: (result_tuple, samples) gathered by ProxDataHelper.
    """
    data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                 value, tolerated_loss, line_speed)
    # PROX treats 10 Gbps as 100%, so rescale the requested percentage
    rate_pct = self.pct_10gbps(value, line_speed)

    with data_helper, self.traffic_context(pkt_size, rate_pct):
        with data_helper.measure_tot_stats():
            time.sleep(duration)
        # Getting statistics to calculate PPS at right speed....
        data_helper.capture_tsc_hz()
        data_helper.latency = self.get_latency()

    return data_helper.result_tuple, data_helper.samples
2106 class ProxIrqProfileHelper(ProxProfileHelper):
2108 __prox_profile_type__ = "IRQ Query"
2110 def __init__(self, resource_helper):
2111 super(ProxIrqProfileHelper, self).__init__(resource_helper)
2112 self._cores_tuple = None
2113 self._ports_tuple = None
2115 self.step_time = 0.5