1 # Copyright (c) 2018-2019 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
25 from collections import OrderedDict, namedtuple
26 from contextlib import contextmanager
27 from itertools import repeat, chain
28 from multiprocessing import Queue
31 from six.moves import cStringIO
32 from six.moves import zip, StringIO
34 from yardstick.common import utils
35 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
36 from yardstick.network_services.helpers.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
39 from yardstick.network_services import constants
# Module-level loggers.
# NOTE(review): this listing is a partial, line-numbered dump; `import logging`
# is not visible among the imports above -- presumably it sits on an elided
# import line. TODO confirm against the full file.
46 LOG = logging.getLogger(__name__)
# Forcing DEBUG here overrides any external logging configuration.
47 LOG.setLevel(logging.DEBUG)
# Separate logger (named 'yardstick', not __name__) used for result reporting.
48 LOG_RESULT = logging.getLogger('yardstick')
49 LOG_RESULT.setLevel(logging.DEBUG)
# Mapping of internal option names to (section, key, default) in the DATS-style
# configuration file. NOTE(review): the closing parenthesis of this tuple is on
# an elided line of this partial listing.
55 CONFIGURATION_OPTIONS = (
56 # dict key section key default value
57 ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
58 ('testDuration', 'general', 'test_duration', 5.0),
59 ('testPrecision', 'general', 'test_precision', 1.0),
60 ('tests', 'general', 'tests', None),
61 ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
63 ('logFile', 'logging', 'file', 'dats.log'),
64 ('logDateFormat', 'logging', 'datefmt', None),
65 ('logLevel', 'logging', 'level', 'INFO'),
66 ('logOverwrite', 'logging', 'overwrite', 1),
# Tester (traffic generator) side connection / build settings.
68 ('testerIp', 'tester', 'ip', None),
69 ('testerUser', 'tester', 'user', 'root'),
70 ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
71 ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
72 ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
73 ('testerSocketId', 'tester', 'socket_id', 0),
# SUT (system under test) side connection / build settings.
75 ('sutIp', 'sut', 'ip', None),
76 ('sutUser', 'sut', 'user', 'root'),
77 ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
78 ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
79 ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
80 ('sutSocketId', 'sut', 'socket_id', 0),
# Parses a PROX core spec string of the form "core <id>[s<socket>][h]" into a
# named tuple (core_id, socket_id, hyperthread).
# NOTE(review): several structural lines of this class (the `try:` openers
# matched by the `except` clauses below, and the decorator/`def` of the
# `index` property) are on elided lines of this partial listing.
84 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
# core id mandatory; socket suffix "s<n>" and hyperthread suffix "h" optional.
85 CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
87 def __new__(cls, *args):
# Only args[0] is examined; coerced to str before matching.
89 matches = cls.CORE_RE.search(str(args[0]))
91 args = matches.groups()
# socket_id defaults to 0 when the "s<n>" group is absent (try_int fallback).
93 return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
94 'h' if args[2] else '')
# AttributeError covers a failed regex match (matches is None).
96 except (AttributeError, TypeError, IndexError, ValueError):
97 raise ValueError('Invalid core spec {}'.format(args))
99 def is_hyperthread(self):
100 return self.hyperthread == 'h'
# Body of an `index` property whose def line is elided: 0 for the physical
# sibling, 1 for the hyperthread sibling -- TODO confirm against full file.
104 return int(self.is_hyperthread())
106 def find_in_topology(self, cpu_topology):
# Resolve this (socket, core, thread) triple to a processor number.
108 socket_core_match = cpu_topology[self.socket_id][self.core_id]
109 sorted_match = sorted(socket_core_match.values())
110 return sorted_match[self.index][0]
111 except (KeyError, IndexError):
112 template = "Core {}{} on socket {} does not exist"
113 raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
# Aggregate PROX "tot stats" counters: rx/tx packet totals, tsc timestamp and
# tsc frequency (hz).
116 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
117 def __new__(cls, *args):
# If a single non-string iterable is passed, unpack it into the four
# fields; otherwise the positional args are used as-is (the `try:` opener
# for the except below is on an elided line of this partial listing).
# NOTE(review): `args[0] is not str(args[0])` relies on str() returning
# the identical object for str input -- a CPython identity detail;
# `not isinstance(args[0], str)` would be the explicit spelling.
119 assert args[0] is not str(args[0])
120 args = tuple(args[0])
121 except (AssertionError, IndexError, TypeError):
124 return super(TotStatsTuple, cls).__new__(cls, *args)
# Result record for one PROX test run; derived metrics (loss %, Mpps) are
# exposed as properties. NOTE(review): many `@property`/`def` lines of this
# class are elided in this partial listing -- the orphan bodies below belong
# to those properties.
127 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
128 'delta_tx,delta_tsc,'
129 'latency,rx_total,tx_total,'
# pkt_loss as a percentage of transmitted packets (enclosing def elided).
134 return 1e2 * self.drop_total / float(self.tx_total)
135 except ZeroDivisionError:
140 # calculate the effective throughput in Mpps
# tx_mpps: packets in the measurement window scaled by tsc frequency.
141 return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
145 # calculate the effective throughput in Mpps
146 return float(self.delta_rx) * self.tsc_hz / self.delta_tsc / 1e6
149 def can_be_lost(self):
# Absolute number of packets allowed to be lost, from `tolerated` percent.
150 return int(self.tx_total * self.tolerated / 1e2)
153 def drop_total(self):
154 return self.tx_total - self.rx_total
# success body (enclosing def elided): run passes when drops are within budget.
158 return self.drop_total <= self.can_be_lost
160 def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
162 pkt_loss = self.pkt_loss
164 if port_samples is None:
# Sample dict construction; opening lines elided in this listing.
174 "Throughput": self.rx_mpps,
175 "RxThroughput": self.rx_mpps,
176 "DropPackets": pkt_loss,
177 "CurrentDropPackets": pkt_loss,
178 "RequestedTxThroughput": self.requested_pps / 1e6,
179 "TxThroughput": self.tx_mpps,
183 samples.update(port_samples)
# Merge per-core latency values under the (elided) latency_keys names.
185 samples.update((key, value) for key, value in zip(latency_keys, self.latency))
188 def log_data(self, logger=None):
192 template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
193 logger.info(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
194 logger.info("Mpps configured: %f; Mpps generated %f; Mpps received %f",
195 self.requested_pps / 1e6, self.tx_mpps, self.rx_mpps)
# In-memory representation of one PROX "pktdump" message: a port id, the
# declared payload length, and the payload bytes.
198 class PacketDump(object):
# Presumably a @staticmethod (decorator line elided) -- assert-based
# validation helper; stripped under `python -O`. TODO confirm.
200 def assert_func(func, value1, value2, template=None):
201 assert func(value1, value2), template.format(value1, value2)
203 def __init__(self, port_id, data_len, payload):
# Reject dumps whose declared length disagrees with the actual payload.
204 template = "Packet dump has specified length {}, but payload is {} bytes long"
205 self.assert_func(operator.eq, data_len, len(payload), template)
206 self._port_id = port_id
207 self._data_len = data_len
208 self._payload = payload
# Property bodies below; their @property/def lines are elided in this listing.
212 """Get the port id of the packet dump"""
217 """Get the length of the data received"""
218 return self._data_len
221 return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
223 def payload(self, start=None, end=None):
224 """Get part of the payload as a list of ordinals.
226 Returns a list of byte values, matching the contents of the packet dump.
227 Optional start and end parameters can be specified to retrieve only a
228 part of the packet contents.
230 The number of elements in the list is equal to end - start + 1, so end
231 is the offset of the last character.
234 start (pos. int): the starting offset in the payload. If it is not
235 specified or None, offset 0 is assumed.
236 end (pos. int): the ending offset of the payload. If it is not
237 specified or None, the contents until the end of the packet are
241 [int, int, ...]. Each int represents the ordinal value of a byte in
# Default handling for start (elided) and end: last byte of the payload.
248 end = self.data_len - 1
250 # Bounds checking on offsets
251 template = "Start offset must be non-negative"
252 self.assert_func(operator.ge, start, 0, template)
254 template = "End offset must be less than {1}"
255 self.assert_func(operator.lt, end, self.data_len, template)
257 # Adjust for splice operation: end offset must be 1 more than the offset
258 # of the last desired character.
261 return self._payload[start:end]
# Thin client for the PROX telemetry/control TCP socket: sends newline-
# terminated text commands and parses the comma-separated replies, including
# multi-line "pktdump" payloads.
# NOTE(review): this partial listing has dropped lines throughout the class
# (several `try:`/`else:` openers, a few `def` lines); comments below flag the
# visible consequences rather than guessing at the elided code.
264 class ProxSocketHelper(object):
266 def __init__(self, sock=None):
267 """ creates new prox instance """
268 super(ProxSocketHelper, self).__init__()
# A fresh TCP socket is created when none is injected (guard line elided).
271 sock = socket.socket()
275 self.master_stats = None
277 def connect(self, ip, port):
278 """Connect to the prox instance on the remote system"""
279 self._sock.connect((ip, port))
281 def get_socket(self):
282 """ get the socket connected to the remote instance """
285 def _parse_socket_data(self, decoded_data, pkt_dump_only):
# Incremental scan of the received text, one '\n'-terminated line at a time.
286 def get_newline_index():
287 return decoded_data.find('\n', index)
291 for newline_index in iter(get_newline_index, -1):
292 ret_str = decoded_data[index:newline_index]
# A pktdump header line is "pktdump,<port_id>,<data_len>".
295 mode, port_id, data_len = ret_str.split(',', 2)
297 mode, port_id, data_len = None, None, None
299 if mode != 'pktdump':
300 # Regular 1-line message. Stop reading from the socket.
301 LOG.debug("Regular response read")
304 LOG.debug("Packet dump header read: [%s]", ret_str)
306 # The line is a packet dump header. Parse it, read the
307 # packet payload, store the dump for later retrieval.
308 # Skip over the packet dump and continue processing: a
309 # 1-line response may follow the packet dump.
311 data_len = int(data_len)
312 data_start = newline_index + 1 # + 1 to skip over \n
313 data_end = data_start + data_len
314 sub_data = decoded_data[data_start:data_end]
315 pkt_payload = array.array('B', (ord(v) for v in sub_data))
316 pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
317 self._pkt_dumps.append(pkt_dump)
320 # Return boolean instead of string to signal
321 # successful reception of the packet dump.
322 LOG.debug("Packet dump stored, returning")
# (ret_str, done-flag); False means caller should keep reading.
327 return ret_str, False
329 def get_string(self, pkt_dump_only=False, timeout=0.01):
331 def is_ready_string():
332 # recv() is blocking, so avoid calling it when no data is waiting.
333 ready = select.select([self._sock], [], [], timeout)
334 return bool(ready[0])
# Keep polling until _parse_socket_data reports a complete message.
338 while status is False:
339 for status in iter(is_ready_string, False):
340 decoded_data = self._sock.recv(256).decode('utf-8')
341 ret_str, done = self._parse_socket_data(decoded_data,
347 LOG.debug("Received data from socket: [%s]", ret_str)
348 return status, ret_str
350 def get_data(self, pkt_dump_only=False, timeout=10.0):
351 """ read data from the socket """
353 # This method behaves slightly differently depending on whether it is
354 # called to read the response to a command (pkt_dump_only = 0) or if
355 # it is called specifically to read a packet dump (pkt_dump_only = 1).
357 # Packet dumps look like:
358 # pktdump,<port_id>,<data_len>\n
359 # <packet contents as byte array>\n
360 # This means the total packet dump message consists of 2 lines instead
363 # - Response for a command (pkt_dump_only = 0):
364 # 1) Read response from the socket until \n (end of message)
365 # 2a) If the response is a packet dump header (starts with "pktdump,"):
366 # - Read the packet payload and store the packet dump for later
368 # - Reset the state and restart from 1). Eventually state 2b) will
369 # be reached and the function will return.
370 # 2b) If the response is not a packet dump:
371 # - Return the received message as a string
373 # - Explicit request to read a packet dump (pkt_dump_only = 1):
374 # - Read the dump header and payload
375 # - Store the packet dump for later retrieval
376 # - Return True to signify a packet dump was successfully read
379 # recv() is blocking, so avoid calling it when no data is waiting.
380 ready = select.select([self._sock], [], [], timeout)
381 return bool(ready[0])
385 for status in iter(is_ready, False):
386 decoded_data = self._sock.recv(256).decode('utf-8')
387 ret_str, done = self._parse_socket_data(decoded_data, pkt_dump_only)
391 LOG.debug("Received data from socket: [%s]", ret_str)
392 return ret_str if status else ''
394 def put_command(self, to_send):
395 """ send data to the remote instance """
396 LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
398 # NOTE: sendall will block, we need a timeout
399 self._sock.sendall(to_send.encode('utf-8'))
# NOTE(review): bare except silently swallows all send failures (its `try:`
# opener is on an elided line); narrowing to socket.error would be safer.
400 except: # pylint: disable=bare-except
403 def get_packet_dump(self):
404 """ get the next packet dump """
# FIFO order: oldest stored dump is returned first.
406 return self._pkt_dumps.pop(0)
409 def stop_all_reset(self):
410 """ stop the remote instance and reset stats """
411 LOG.debug("Stop all and reset stats")
# Body of stop_all; its def line is elided in this listing.
416 """ stop all cores on the remote instance """
417 LOG.debug("Stop all")
418 self.put_command("stop all\n")
420 def stop(self, cores, task=''):
421 """ stop specific cores on the remote instance """
# De-duplicate the requested core list while preserving order.
425 if core not in tmpcores:
426 tmpcores.append(core)
428 LOG.debug("Stopping cores %s", tmpcores)
429 self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task))
# Body of start_all; its def line is elided in this listing.
432 """ start all cores on the remote instance """
433 LOG.debug("Start all")
434 self.put_command("start all\n")
436 def start(self, cores):
437 """ start specific cores on the remote instance """
441 if core not in tmpcores:
442 tmpcores.append(core)
444 LOG.debug("Starting cores %s", tmpcores)
445 self.put_command("start {}\n".format(join_non_strings(',', tmpcores)))
447 def reset_stats(self):
448 """ reset the statistics on the remote instance """
449 LOG.debug("Reset stats")
450 self.put_command("reset stats\n")
# Send the same formatted command once per core (loop line elided).
452 def _run_template_over_cores(self, template, cores, *args):
454 self.put_command(template.format(core, *args))
456 def set_pkt_size(self, cores, pkt_size):
457 """ set the packet size to generate on the remote instance """
458 LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
460 self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
462 def set_value(self, cores, offset, value, length):
463 """ set value on the remote instance """
464 msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
465 LOG.debug(msg, cores, value, length, offset)
466 template = "set value {} 0 {} {} {}\n"
467 self._run_template_over_cores(template, cores, offset, value, length)
469 def reset_values(self, cores):
470 """ reset values on the remote instance """
# NOTE(review): log message says "Set value" but this resets values.
471 LOG.debug("Set value for core(s) %s", cores)
472 self._run_template_over_cores("reset values {} 0\n", cores)
474 def set_speed(self, cores, speed, tasks=None):
475 """ set speed on the remote instance """
# tasks defaults to task 0 for every core (guard line elided).
477 tasks = [0] * len(cores)
478 elif len(tasks) != len(cores):
# NOTE(review): mismatch is only logged, then zip() silently truncates.
479 LOG.error("set_speed: cores and tasks must have the same len")
480 LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
481 for (core, task) in list(zip(cores, tasks)):
482 self.put_command("speed {} {} {}\n".format(core, task, speed))
484 def slope_speed(self, cores_speed, duration, n_steps=0):
485 """will start to increase speed from 0 to N where N is taken from
486 a['speed'] for each a in cores_speed"""
487 # by default, each step will take 0.5 sec
489 n_steps = duration * 2
491 private_core_data = []
492 step_duration = float(duration) / n_steps
493 for core_data in cores_speed:
494 target = float(core_data['speed'])
495 private_core_data.append({
496 'cores': core_data['cores'],
498 'delta': target / n_steps,
# Ramp: n_steps-1 incremental ('current'+'delta') steps, then jump to the
# final ('zero'+'speed') target value.
503 deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
504 for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
505 time.sleep(step_duration)
506 for core_data in private_core_data:
507 core_data['current'] = core_data[key1] + core_data[key2]
508 self.set_speed(core_data['cores'], core_data['current'])
510 def set_pps(self, cores, pps, pkt_size,
511 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
512 """ set packets per second for specific cores on the remote instance """
513 msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
514 LOG.debug(msg, cores, pps, pkt_size)
516 # speed in percent of line-rate
# +20 accounts for per-frame overhead (preamble + IFG) on the wire.
517 speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
518 self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
520 def lat_stats(self, cores, task=0):
521 """Get the latency statistics from the remote system"""
522 # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
527 self.put_command("lat stats {} {} \n".format(core, task))
528 ret = self.get_data()
# Reply format: "min,max,avg[,...]" per core.
531 lat_min[core], lat_max[core], lat_avg[core] = \
532 tuple(int(n) for n in ret.split(",")[:3])
534 except (AttributeError, ValueError, TypeError):
537 return lat_min, lat_max, lat_avg
539 def get_all_tot_stats(self):
540 self.put_command("tot stats\n")
541 all_stats_str = self.get_data().split(",")
# Expected reply: "rx,tx,tsc,hz" (early-return branch elided).
542 if len(all_stats_str) != 4:
545 all_stats = TotStatsTuple(int(v) for v in all_stats_str)
546 self.master_stats = all_stats
# hz() body; its def line is elided in this listing.
550 return self.get_all_tot_stats()[3]
552 def core_stats(self, cores, task=0):
553 """Get the receive statistics from the remote system"""
554 rx = tx = drop = tsc = 0
556 self.put_command("core stats {} {}\n".format(core, task))
557 ret = self.get_data().split(",")
# Accumulation over cores happens on elided lines.
562 return rx, tx, drop, tsc
564 def irq_core_stats(self, cores_tasks):
565 """ get IRQ stats per core"""
570 for core, task in cores_tasks:
# One query per core: max_irq plus 13 latency-bucket counters.
571 self.put_command("stats task.core({}).task({}).max_irq,task.core({}).task({}).irq(0),"
572 "task.core({}).task({}).irq(1),task.core({}).task({}).irq(2),"
573 "task.core({}).task({}).irq(3),task.core({}).task({}).irq(4),"
574 "task.core({}).task({}).irq(5),task.core({}).task({}).irq(6),"
575 "task.core({}).task({}).irq(7),task.core({}).task({}).irq(8),"
576 "task.core({}).task({}).irq(9),task.core({}).task({}).irq(10),"
577 "task.core({}).task({}).irq(11),task.core({}).task({}).irq(12)"
578 "\n".format(core, task, core, task, core, task, core, task,
579 core, task, core, task, core, task, core, task,
580 core, task, core, task, core, task, core, task,
581 core, task, core, task))
582 in_data_str = self.get_data().split(",")
583 ret = [try_int(s, 0) for s in in_data_str]
584 key = "core_" + str(core)
# "overflow" sums buckets 9-12 (ret[10..13]) -- the highest-latency bins.
586 stat[key] = {"cpu": core, "max_irq": ret[0], "bucket_0" : ret[1],
587 "bucket_1" : ret[2], "bucket_2" : ret[3],
588 "bucket_3" : ret[4], "bucket_4" : ret[5],
589 "bucket_5" : ret[6], "bucket_6" : ret[7],
590 "bucket_7" : ret[8], "bucket_8" : ret[9],
591 "bucket_9" : ret[10], "bucket_10" : ret[11],
592 "bucket_11" : ret[12], "bucket_12" : ret[13],
593 "overflow": ret[10] + ret[11] + ret[12] + ret[13]}
594 except (KeyError, IndexError):
595 LOG.error("Corrupted PACKET %s", in_data_str)
599 def multi_port_stats(self, ports):
600 """get counter values from all ports at once"""
602 ports_str = ",".join(map(str, ports))
604 tot_result = [0] * len(ports)
# NOTE(review): `is not` compares object identity, not value; this only
# works because CPython caches small ints. Should be `!=`.
607 while (len(ports) is not len(ports_all_data)):
608 self.put_command("multi port stats {}\n".format(ports_str))
609 status, ports_all_data_str = self.get_string()
# Reply is semicolon-separated per-port records of comma-separated ints.
614 ports_all_data = ports_all_data_str.split(";")
# NOTE(review): same identity-vs-equality issue; should be `==`.
616 if len(ports) is len(ports_all_data):
617 for port_data_str in ports_all_data:
621 tmpdata = [try_int(s, 0) for s in port_data_str.split(",")]
622 except (IndexError, TypeError):
623 LOG.error("Unpacking data error %s", port_data_str)
# A record must have >= 6 fields and start with a requested port id.
626 if (len(tmpdata) < 6) or tmpdata[0] not in ports:
627 LOG.error("Corrupted PACKET %s - retrying", port_data_str)
630 tot_result[port_index] = tmpdata
631 port_index = port_index + 1
633 LOG.error("Empty / too much data - retry -%s-", ports_all_data)
636 LOG.debug("Multi port packet ..OK.. %s", tot_result)
637 return True, tot_result
# Presumably a @staticmethod (decorator line elided) -- TODO confirm.
640 def multi_port_stats_tuple(stats, ports):
642 Create a statistics tuple from port stats.
644 Returns a dict with contains the port stats indexed by port name
646 :param stats: (List) - List of List of port stats in pps
647 :param ports (Iterator) - to List of Ports
649 :return: (Dict) of port stats indexed by port_name
655 port_names = {port_num: port_name for port_name, port_num in ports}
656 except (TypeError, IndexError, KeyError):
657 LOG.critical("Ports are not initialized or number of port is ZERO ... CRITICAL ERROR")
# Per-port sample: stat[1]=rx pps, stat[2]=tx pps per the list layout
# produced by multi_port_stats_diff below -- TODO confirm.
663 samples[port_names[port_num]] = {
664 "in_packets": stat[1],
665 "out_packets": stat[2]}
666 except (TypeError, IndexError, KeyError):
667 LOG.error("Ports data and samples data is incompatable ....")
# Presumably a @staticmethod (decorator line elided) -- TODO confirm.
673 def multi_port_stats_diff(prev_stats, new_stats, hz):
675 Create a statistics tuple from difference between prev port stats
676 and current port stats. And store results in pps.
678 :param prev_stats: (List) - Previous List of port statistics
679 :param new_stats: (List) - Current List of port statistics
680 :param hz (float) - speed of system in Hz
682 :return: sample (List) - Difference of prev_port_stats and
683 new_port_stats in pps
# NOTE(review): identity comparison of ints again; should be `!=`.
692 if len(prev_stats) is not len(new_stats):
# Length mismatch: emit zeroed samples instead of a diff.
693 for port_index, stat in enumerate(new_stats):
694 stats.append([port_index, float(0), float(0), 0, 0, 0])
698 for port_index, stat in enumerate(new_stats):
# Guard against counter wrap/reset: only diff when strictly increasing.
699 if stat[RX_TOTAL_INDEX] > prev_stats[port_index][RX_TOTAL_INDEX]:
700 rx_total = stat[RX_TOTAL_INDEX] - \
701 prev_stats[port_index][RX_TOTAL_INDEX]
703 rx_total = stat[RX_TOTAL_INDEX]
705 if stat[TX_TOTAL_INDEX] > prev_stats[port_index][TX_TOTAL_INDEX]:
706 tx_total = stat[TX_TOTAL_INDEX] - prev_stats[port_index][TX_TOTAL_INDEX]
708 tx_total = stat[TX_TOTAL_INDEX]
710 if stat[TSC_INDEX] > prev_stats[port_index][TSC_INDEX]:
711 tsc = stat[TSC_INDEX] - prev_stats[port_index][TSC_INDEX]
713 tsc = stat[TSC_INDEX]
# Zero tsc or hz would divide by zero below; fall back to zero rates.
716 rx_total = tx_total = float(0)
719 LOG.error("HZ is ZERO ..")
720 rx_total = tx_total = float(0)
# Convert raw counter deltas to packets-per-second via tsc frequency.
722 rx_total = float(rx_total * hz / tsc)
723 tx_total = float(tx_total * hz / tsc)
725 stats.append([port_index, rx_total, tx_total, 0, 0, tsc])
726 except (TypeError, IndexError, KeyError):
728 LOG.info("Current Port Stats incompatable to previous Port stats .. Discarded")
732 def port_stats(self, ports):
733 """get counter values from a specific port"""
734 tot_result = [0] * 12
# Sum the 12 counters element-wise across all requested ports.
736 self.put_command("port_stats {}\n".format(port))
737 ret = [try_int(s, 0) for s in self.get_data().split(",")]
738 tot_result = [sum(x) for x in zip(tot_result, ret)]
# Context manager (decorator elided): snapshots tot stats around a block
# and stores start/end/delta in the yielded container dict.
742 def measure_tot_stats(self):
743 start = self.get_all_tot_stats()
744 container = {'start_tot': start}
748 container['end_tot'] = end = self.get_all_tot_stats()
750 container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
# tot_stats body; its def line is elided in this listing.
753 """Get the total statistics from the remote system"""
754 stats = self.get_all_tot_stats()
757 def tot_ierrors(self):
758 """Get the total ierrors from the remote system"""
759 self.put_command("tot ierrors tot\n")
760 recv = self.get_data().split(',')
761 tot_ierrors = int(recv[0])
763 return tot_ierrors, tsc
765 def set_count(self, count, cores):
766 """Set the number of packets to send on the specified core"""
767 self._run_template_over_cores("count {} 0 {}\n", cores, count)
769 def dump_rx(self, core_id, task_id=0, count=1):
770 """Activate dump on rx on the specified core"""
771 LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
772 self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
773 time.sleep(1.5) # Give PROX time to set up packet dumping
# quit() body; its def line is elided in this listing.
781 """ stop all cores on the remote instance """
782 LOG.debug("Quit prox")
783 self.put_command("quit\n")
786 def force_quit(self):
787 """ stop all cores on the remote instance """
788 LOG.debug("Force Quit prox")
789 self.put_command("quit_force\n")
# Unique sentinel meaning "no default supplied" in find_section /
# find_in_section below, so that None remains a legal default value.
792 _LOCAL_OBJECT = object()
# Setup helper for a PROX-based VNF: rewrites the PROX .cfg for the deployed
# topology (MACs, core lists, auxiliary files), generates parameters.lua, and
# uploads everything to the target over SSH.
# NOTE(review): this partial listing has dropped lines throughout (class
# attributes, `try:` openers, loop headers); comments flag the visible gaps.
795 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
796 # the actual app is lowercase
798 # not used for Prox but added for consistency
801 LUA_PARAMETER_NAME = ""
802 LUA_PARAMETER_PEER = {
# Max seconds to wait for the generated config to appear on config_queue.
807 CONFIG_QUEUE_TIMEOUT = 120
809 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
810 self.remote_path = None
811 super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
812 self.remote_prox_file_name = None
813 self._prox_config_data = None
814 self.additional_files = {}
# Multiprocessing queue used to hand the parsed config to the
# traffic_runner process.
815 self.config_queue = Queue()
816 # allow_exit_without_flush
817 self.config_queue.cancel_join_thread()
818 self._global_section = None
# Presumably a @property (decorator line elided) -- TODO confirm.
821 def prox_config_data(self):
822 if self._prox_config_data is None:
823 # this will block, but it needs too
824 self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
825 return self._prox_config_data
# Presumably a @property (decorator line elided) -- TODO confirm.
828 def global_section(self):
829 if self._global_section is None and self.prox_config_data:
830 self._global_section = self.find_section("global")
831 return self._global_section
833 def find_section(self, name, default=_LOCAL_OBJECT):
# Config is a list of [section_name, section_items] pairs.
834 result = next((value for key, value in self.prox_config_data if key == name), default)
835 if result is _LOCAL_OBJECT:
836 raise KeyError('{} not found in Prox config'.format(name))
839 def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
840 section = self.find_section(section_name, [])
841 result = next((value for key, value in section if key == section_key), default)
842 if result is _LOCAL_OBJECT:
843 template = '{} not found in {} section of Prox config'
844 raise KeyError(template.format(section_key, section_name))
847 def copy_to_target(self, config_file_path, prox_file):
848 remote_path = os.path.join("/tmp", prox_file)
849 self.ssh_helper.put(config_file_path, remote_path)
# Presumably a @staticmethod (decorator line elided) -- TODO confirm.
853 def _get_tx_port(section, sections):
855 for item in sections[section]:
856 if item[0] == "tx port":
857 iface_port = re.findall(r'\d+', item[1])
858 # do we want the last one?
859 # if yes, then can we reverse?
860 return int(iface_port[0])
# Presumably a @staticmethod (decorator line elided); replaces the first
# double-quoted substring in `quoted` with `value`.
863 def _replace_quoted_with_value(quoted, value, count=1):
864 new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
867 def _insert_additional_file(self, value):
# Swap the quoted local file name for its uploaded remote path.
868 file_str = value.split('"')
869 base_name = os.path.basename(file_str[1])
870 file_str[1] = self.additional_files[base_name]
871 return '"'.join(file_str)
# Expand a "[core ...]" section name into the list of core ids it covers,
# supporting comma lists and dash ranges (several lines elided here).
873 def _make_core_list(self, inputStr):
875 my_input = inputStr.split("core ", 1)[1]
878 substrs = [x.strip() for x in my_input.split(',')]
885 substr = [int(k.strip()) for k in i.split('-')]
888 endstr = substr[len(substr) - 1]
889 for z in range(startstr, endstr + 1):
892 LOG.error("Error in cores list ... resuming ")
897 def generate_prox_config_file(self, config_path):
899 prox_config = ConfigParser(config_path, sections)
902 # Ensure MAC is set "hardware"
903 all_ports = self.vnfd_helper.port_pairs.all_ports
904 # use dpdk port number
905 for port_name in all_ports:
906 port_num = self.vnfd_helper.port_num(port_name)
907 port_section_name = "port {}".format(port_num)
908 for section_name, section in sections:
909 if port_section_name != section_name:
912 for section_data in section:
913 if section_data[0] == "mac":
914 section_data[1] = "hardware"
916 # adjust for range of cores
918 for section_name, section in sections:
# Expand literal core ranges; skip templated names containing '$'.
919 if section_name.startswith('core') and section_name.find('$') == -1:
920 core_list = self._make_core_list(section_name)
921 for core in core_list:
922 new_sections.append(["core " + str(core), section])
924 new_sections.append([section_name, section])
926 sections = new_sections
# Substitute @@dst_mac/@@src_mac placeholders with the MACs of the
# interface attached to the referenced tx port.
929 for _, section in sections:
930 for section_data in section:
931 item_key, item_val = section_data
932 if item_val.startswith("@@dst_mac"):
933 tx_port_iter = re.finditer(r'\d+', item_val)
934 tx_port_no = int(next(tx_port_iter).group(0))
935 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
936 mac = intf["virtual-interface"]["dst_mac"]
# PROX expects space-separated MAC bytes in this form.
937 section_data[1] = mac.replace(":", " ", 6)
939 if item_key == "dst mac" and item_val.startswith("@@"):
940 tx_port_iter = re.finditer(r'\d+', item_val)
941 tx_port_no = int(next(tx_port_iter).group(0))
942 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
943 mac = intf["virtual-interface"]["dst_mac"]
944 section_data[1] = mac
946 if item_val.startswith("@@src_mac"):
947 tx_port_iter = re.finditer(r'\d+', item_val)
948 tx_port_no = int(next(tx_port_iter).group(0))
949 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
950 mac = intf["virtual-interface"]["local_mac"]
951 section_data[1] = mac.replace(":", " ", 6)
953 if item_key == "src mac" and item_val.startswith("@@"):
954 tx_port_iter = re.finditer(r'\d+', item_val)
955 tx_port_no = int(next(tx_port_iter).group(0))
956 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
957 mac = intf["virtual-interface"]["local_mac"]
958 section_data[1] = mac
960 # if addition file specified in prox config
961 if not self.additional_files:
# Rewrite dofile("...") references to the uploaded file locations.
964 for section_name, section in sections:
965 for section_data in section:
967 if section_data[0].startswith("dofile"):
968 section_data[0] = self._insert_additional_file(section_data[0])
970 if section_data[1].startswith("dofile"):
971 section_data[1] = self._insert_additional_file(section_data[1])
# NOTE(review): bare except (its `try:` opener is elided) hides lookup
# failures for unknown additional files.
972 except: # pylint: disable=bare-except
# Presumably a @staticmethod (decorator line elided) -- TODO confirm.
978 def write_prox_lua(lua_config):
980 Write an .ini-format config file for PROX (parameters.lua)
981 PROX does not allow a space before/after the =, so we need
985 for key in lua_config:
986 value = '"' + lua_config[key] + '"'
987 if key == "__name__":
989 if value is not None and value != '@':
990 key = "=".join((key, str(value).replace('\n', '\n\t')))
993 key = str(key).replace('\n', '\n\t')
995 return os.linesep.join(out)
# Presumably a @staticmethod (decorator line elided) -- TODO confirm.
998 def write_prox_config(prox_config):
1000 Write an .ini-format config file for PROX
1001 PROX does not allow a space before/after the =, so we need
1005 for (section_name, section) in prox_config:
1006 out.append("[{}]".format(section_name))
1007 for item in section:
1009 if key == "__name__":
1011 if value is not None and value != '@':
1012 key = "=".join((key, str(value).replace('\n', '\n\t')))
1015 key = str(key).replace('\n', '\n\t')
1017 return os.linesep.join(out)
1019 def put_string_to_file(self, s, remote_path):
1020 file_obj = cStringIO(s)
1021 self.ssh_helper.put_file_obj(file_obj, remote_path)
# Build the parameters.lua key/value map from the deployed interfaces.
1024 def generate_prox_lua_file(self):
1026 all_ports = self.vnfd_helper.port_pairs.all_ports
1027 for port_name in all_ports:
1028 port_num = self.vnfd_helper.port_num(port_name)
1029 intf = self.vnfd_helper.find_interface(name=port_name)
1030 vintf = intf['virtual-interface']
1031 p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
1032 p["src_mac{0}".format(port_num)] = vintf["local_mac"]
1036 def upload_prox_lua(self, config_file, lua_data):
1037 # prox can't handle spaces around ' = ' so use custom method
1038 out = StringIO(self.write_prox_lua(lua_data))
1040 remote_path = os.path.join("/tmp", config_file)
1041 self.ssh_helper.put_file_obj(out, remote_path)
1045 def upload_prox_config(self, config_file, prox_config_data):
1046 # prox can't handle spaces around ' = ' so use custom method
1047 out = StringIO(self.write_prox_config(prox_config_data))
1049 remote_path = os.path.join("/tmp", config_file)
1050 self.ssh_helper.put_file_obj(out, remote_path)
1054 def build_config_file(self):
1055 task_path = self.scenario_helper.task_path
1056 options = self.scenario_helper.options
1057 config_path = options['prox_config']
1058 config_file = os.path.basename(config_path)
1059 config_path = utils.find_relative_file(config_path, task_path)
1060 self.additional_files = {}
1063 if options['prox_generate_parameter']:
1065 self.lua = self.generate_prox_lua_file()
1066 if len(self.lua) > 0:
1067 self.upload_prox_lua("parameters.lua", self.lua)
# NOTE(review): bare except (its `try:` opener is elided) makes the lua
# generation/upload best-effort; failures are silently ignored.
1068 except: # pylint: disable=bare-except
# Upload any auxiliary files referenced by the PROX config.
1071 prox_files = options.get('prox_files', [])
1072 if isinstance(prox_files, six.string_types):
1073 prox_files = [prox_files]
1074 for key_prox_file in prox_files:
1075 base_prox_file = os.path.basename(key_prox_file)
1076 key_prox_path = utils.find_relative_file(key_prox_file, task_path)
1077 remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
1078 self.additional_files[base_prox_file] = remote_prox_file
1080 self._prox_config_data = self.generate_prox_config_file(config_path)
1081 # copy config to queue so we can read it from traffic_runner process
1082 self.config_queue.put(self._prox_config_data)
1083 self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
1085 def build_config(self):
1086 self.build_config_file()
1088 options = self.scenario_helper.options
1089 prox_args = options['prox_args']
1090 tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
1092 self.pipeline_kwargs = {
1093 'tool_path': tool_path,
1094 'tool_dir': os.path.dirname(tool_path),
1095 'cfg_file': self.remote_path,
# Flatten prox_args into "-k v" style CLI flags; falsy values emit the
# bare key only.
1096 'args': ' '.join(' '.join([str(k), str(v) if v else ''])
1097 for k, v in prox_args.items())
1100 cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
1101 "{args} -f {cfg_file} '")
1102 return cmd_template.format(**self.pipeline_kwargs)
1105 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
1106 class ProxResourceHelper(ClientResourceHelper):
1108 RESOURCE_WORD = 'prox'
1115 def find_pci(pci, bound_pci):
1116 # we have to substring match PCI bus address from the end
1117 return any(b.endswith(pci) for b in bound_pci)
    def __init__(self, setup_helper):
        """Cache management-interface credentials and connect to PROX.

        NOTE(review): several attribute initializations between the visible
        lines are elided from this excerpt.
        """
        super(ProxResourceHelper, self).__init__(setup_helper)
        self.mgmt_interface = self.vnfd_helper.mgmt_interface
        self._user = self.mgmt_interface["user"]
        self._ip = self.mgmt_interface["ip"]

        # lazily-built map of PCI addresses to interface names
        self._vpci_to_if_name_map = None
        self.additional_file = {}
        self.remote_prox_file_name = None

        # ramp-up step interval in seconds
        self.step_time = 0.5
        # cached test type; resolved lazily by the test_type property
        self._test_type = None
        # previous multi_port_stats sample, used to compute per-port deltas
        self.prev_multi_port = []

        self.client = self._connect()
1144 def test_type(self):
1145 if self._test_type is None:
1146 self._test_type = self.setup_helper.find_in_section('global', 'name', None)
1147 return self._test_type
    def run_traffic(self, traffic_profile):
        """Drive *traffic_profile* in a loop until the helper is terminated.

        NOTE(review): a few setup lines between the visible statements are
        elided from this excerpt.
        """
        # avoid blocking on queue flush when the producer process exits
        self._queue.cancel_join_thread()
        traffic_profile.init(self._queue)
        # this frees up the run_traffic loop
        self.client_started.value = 1

        while not self._terminated.value:
            # move it all to traffic_profile
            self._run_traffic_once(traffic_profile)
1162 def _run_traffic_once(self, traffic_profile):
1163 traffic_profile.execute_traffic(self)
1164 if traffic_profile.done.is_set():
1165 self._queue.put({'done': True})
1166 LOG.debug("tg_prox done")
1167 self._terminated.value = 1
1169 # For VNF use ResourceHelper method to collect KPIs directly.
1170 # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
1171 def collect_collectd_kpi(self):
1172 return self._collect_resource_kpi()
    def collect_live_stats(self):
        """Sample per-port counters and return live-stat deltas.

        :return: (bool, dict) success flag and per-port live statistics.

        NOTE(review): the initialization of ``ports``, the retrieval of
        ``hz`` and the failure early-return are elided from this excerpt.
        """
        for _, port_num in self.vnfd_helper.ports_iter():
            ports.append(port_num)

        ok, curr_port_stats = self.sut.multi_port_stats(ports)

        # diff against the previous sample to get per-interval deltas
        new_all_port_stats = \
            self.sut.multi_port_stats_diff(self.prev_multi_port, curr_port_stats, hz)

        self.prev_multi_port = curr_port_stats

        live_stats = self.sut.multi_port_stats_tuple(new_all_port_stats,
                                                     self.vnfd_helper.ports_iter())
        return True, live_stats
    def collect_kpi(self):
        """Collect superclass KPIs and merge in collectd and live stats.

        NOTE(review): the guard on ``ok`` and the final ``return result``
        are elided from this excerpt.
        """
        result = super(ProxResourceHelper, self).collect_kpi()
        # add in collectd kpis manually
        result['collect_stats'] = self._collect_resource_kpi()

        ok, live_stats = self.collect_live_stats()
        result.update({'live_stats': live_stats})
    def terminate(self):
        """Unsupported on the TG side; terminate through the VNF instead."""
        # should not be called, use VNF terminate
        raise NotImplementedError()

        # NOTE(review): the line below is the body of a ``sut`` property
        # whose decorator and ``def`` line are elided from this excerpt.
        return self.sut  # force connection
    def execute(self, cmd, *args, **kwargs):
        """Dispatch *cmd* by name to the PROX socket client.

        NOTE(review): a guard checking that ``func`` exists (line elided
        from this excerpt) precedes the call upstream.
        """
        func = getattr(self.sut, cmd, None)
        return func(*args, **kwargs)
    def _connect(self, client=None):
        """Run and connect to prox on the remote system """
        # De-allocating a large amount of hugepages takes some time. If a new
        # PROX instance is started immediately after killing the previous one,
        # it might not be able to allocate hugepages, because they are still
        # being freed. Hence the -w switch.
        # self.connection.execute("sudo killall -w Prox 2>/dev/null")
        # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
        # -f ./handle_none-4.cfg"
        # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
        # + "export RTE_TARGET=" + self._dpdk_target + ";" \
        # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
        # + "./build/Prox " + prox_args
        # log.debug("Starting PROX with command [%s]", prox_cmd)
        # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
        # self._ip, prox_cmd))
        client = ProxSocketHelper()

        # try connecting to Prox for 60s
        for _ in range(RETRY_SECONDS):
            time.sleep(RETRY_INTERVAL)
            # NOTE(review): the ``try:`` line and the success return path of
            # this retry loop are elided from this excerpt.
            client.connect(self._ip, PROX_PORT)
        except (socket.gaierror, socket.error):

        # all retries exhausted: give up with a descriptive error
        msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
        raise Exception(msg.format(self._ip, PROX_PORT))
class ProxDataHelper(object):
    """Aggregates PROX statistics for one (pkt_size, value) test step."""

    def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
        """Record the test-step parameters.

        NOTE(review): assignments of ``self.sut``, ``self.value`` and a few
        other attributes are elided from this excerpt.
        """
        super(ProxDataHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.pkt_size = pkt_size
        self.line_speed = line_speed
        self.tolerated_loss = tolerated_loss
        self.port_count = len(self.vnfd_helper.port_pairs.all_ports)

        # filled by measure_tot_stats() while traffic runs
        self.measured_stats = None
        # cached (rx_total, tx_total, requested_pps) triple
        self._totals_and_pps = None
        self.result_tuple = None
    def totals_and_pps(self):
        """Return (rx_total, tx_total, requested_pps), sampled once and cached.

        NOTE(review): the retry/sleep handling around ``multi_port_stats``
        (including the body of the timeout check) is elided from this
        excerpt.
        """
        if self._totals_and_pps is None:
            rx_total = tx_total = 0
            timeout = time.time() + constants.RETRY_TIMEOUT
            ok, all_ports = self.sut.multi_port_stats([
                self.vnfd_helper.port_num(port_name)
                for port_name in self.vnfd_helper.port_pairs.all_ports])
            if time.time() > timeout:
            # accumulate rx (index 1) and tx (index 2) counters per port
            for port in all_ports:
                rx_total = rx_total + port[1]
                tx_total = tx_total + port[2]
            # value is a percentage of the aggregate line rate
            requested_pps = self.value / 100.0 * self.line_rate_to_pps()
            self._totals_and_pps = rx_total, tx_total, requested_pps
        return self._totals_and_pps
    # NOTE(review): the fragments below are the bodies of the ``rx_total``,
    # ``tx_total`` and ``requested_pps`` properties; their decorators,
    # ``def``/``try`` lines and fallback returns are elided from this
    # excerpt. Each reads one element of the cached totals_and_pps triple
    # and falls back on lookup/type errors.
            ret_val = self.totals_and_pps[0]
        except (AttributeError, ValueError, TypeError, LookupError):

            ret_val = self.totals_and_pps[1]
        except (AttributeError, ValueError, TypeError, LookupError):

    def requested_pps(self):
            ret_val = self.totals_and_pps[2]
        except (AttributeError, ValueError, TypeError, LookupError):
    # NOTE(review): body of the ``samples`` property; its ``def`` line and
    # the initializations of ``ports``/``port_names``/``samples`` are elided
    # from this excerpt.
        for port_name, port_num in self.vnfd_helper.ports_iter():
            ports.append(port_num)
            port_names[port_num] = port_name

        timeout = time.time() + constants.RETRY_TIMEOUT
        ok, results = self.sut.multi_port_stats(ports)
        if time.time() > timeout:
        # map raw per-port rows back to logical port names
        for result in results:
            port_num = result[0]
            samples[port_names[port_num]] = {
                "in_packets": result[1],
                "out_packets": result[2]}
        except (IndexError, KeyError):
    def __enter__(self):
        # sanity-check the port count before measurement starts
        # NOTE(review): the ``return self`` of __enter__ and the body of
        # __exit__ are elided from this excerpt.
        self.check_interface_count()

    def __exit__(self, exc_type, exc_val, exc_tb):
    def make_tuple(self):
        """Build the ProxTestDataTuple result from the measured statistics.

        NOTE(review): the early return for an already-built tuple and some
        constructor arguments are elided from this excerpt.
        """
        if self.result_tuple:

        self.result_tuple = ProxTestDataTuple(
            self.tolerated_loss,
            self.measured_stats['delta'].rx,
            self.measured_stats['delta'].tx,
            self.measured_stats['delta'].tsc,
        self.result_tuple.log_data()

    def measure_tot_stats(self):
        # capture aggregate statistics for the duration of the 'with' body
        # NOTE(review): the @contextmanager decorator and the yield line are
        # elided from this excerpt.
        with self.sut.measure_tot_stats() as self.measured_stats:
1374 def check_interface_count(self):
1375 # do this assert in init? unless we expect interface count to
1376 # change from one run to another run...
1377 assert self.port_count in {1, 2, 4}, \
1378 "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1380 def capture_tsc_hz(self):
1381 self.tsc_hz = float(self.sut.hz())
1383 def line_rate_to_pps(self):
1384 return self.port_count * self.line_speed / BITS_PER_BYTE / (self.pkt_size + 20)
class ProxProfileHelper(object):
    """Base helper mapping a PROX test profile onto cores and traffic control.

    Subclasses register themselves via ``__prox_profile_type__`` and are
    selected by :meth:`get_cls` based on the test type found in the PROX
    config.
    """

    __prox_profile_type__ = "Generic"

    # PROX core modes used to classify generator and latency cores
    PROX_CORE_GEN_MODE = "gen"
    PROX_CORE_LAT_MODE = "lat"

    def get_cls(cls, helper_type):
        """Return class of specified type.

        NOTE(review): a guard condition (elided from this excerpt) precedes
        the first early return; the @classmethod decorator is also elided.
        """
        return ProxProfileHelper

        for profile_helper_class in utils.itersubclasses(cls):
            if helper_type == profile_helper_class.__prox_profile_type__:
                return profile_helper_class

        # fall back to the generic helper when no subclass matches
        return ProxProfileHelper

    def make_profile_helper(cls, resource_helper):
        """Instantiate the profile-helper subclass matching the test type."""
        return cls.get_cls(resource_helper.test_type)(resource_helper)

    def __init__(self, resource_helper):
        super(ProxProfileHelper, self).__init__()
        self.resource_helper = resource_helper
        # lazily-populated caches
        self._cpu_topology = None
        self._test_cores = None
        self._latency_cores = None

    def cpu_topology(self):
        """Fetch and cache the parsed /proc/cpuinfo of the remote system."""
        if not self._cpu_topology:
            stdout = io.BytesIO()
            self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
            self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
        return self._cpu_topology

    def test_cores(self):
        """Cores configured in 'gen' mode (cached)."""
        if not self._test_cores:
            self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
        return self._test_cores

    def latency_cores(self):
        """Cores configured in 'lat' mode (cached)."""
        if not self._latency_cores:
            self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
        return self._latency_cores

    def traffic_context(self, pkt_size, value):
        # Reset counters, program the generator cores and start traffic;
        # the 'with' body runs while traffic flows.
        # NOTE(review): the @contextmanager decorator and the yield/stop
        # lines are elided from this excerpt.
        self.sut.reset_stats()
        self.sut.set_pkt_size(self.test_cores, pkt_size)
        self.sut.set_speed(self.test_cores, value)
        self.sut.start_all()

    def get_cores(self, mode):
        """Collect core ids of [core ...] sections whose mode equals *mode*.

        NOTE(review): the result-list initialization, the append/return
        statements and the loop-continue lines are elided from this excerpt.
        """
        for section_name, section in self.setup_helper.prox_config_data:
            if not section_name.startswith("core"):

            for key, value in section:
                if key == "mode" and value == mode:
                    core_tuple = CoreSocketTuple(section_name)
                    core = core_tuple.core_id

    def pct_10gbps(self, percent, line_speed):
        """Get rate in percent of 10 Gbps.

        Returns the rate in percent of 10 Gbps.
        For instance 100.0 = 10 Gbps; 400.0 = 40 Gbps.

        This helper method is required when setting the interface_speed
        option in the testcase because NSB/PROX considers 10Gbps as 100% of
        line rate, this means that the line rate must be expressed as a
        percentage of 10Gbps.

        :param percent: (float) Percent of line rate (100.0 = line rate).
        :param line_speed: (int) line rate speed, in bits per second.

        :return: (float) Represents the rate in percent of 10Gbps.
        """
        return (percent * line_speed / (
            constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT))

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
        """Run one traffic step and return (result_tuple, samples)."""
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                     value, tolerated_loss, line_speed)

        with data_helper, self.traffic_context(pkt_size,
                                               self.pct_10gbps(value, line_speed)):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples

    def get_latency(self):
        """
        :return: return lat_min, lat_max, lat_avg
        """
        if not self._latency_cores:
            self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)

        if self._latency_cores:
            return self.sut.lat_stats(self._latency_cores)
        # NOTE(review): the fallback return for the no-latency-cores case is
        # elided from this excerpt.

    def terminate(self):
        # NOTE(review): body elided from this excerpt.

    def __getattr__(self, item):
        # delegate unknown attribute lookups to the wrapped resource helper
        return getattr(self.resource_helper, item)
class ProxMplsProfileHelper(ProxProfileHelper):
    """Profile helper for the MPLS tag/untag PROX test.

    NOTE(review): the @property decorators on the accessor methods below
    appear to be elided from this excerpt.
    """

    __prox_profile_type__ = "MPLS tag/untag"

    def __init__(self, resource_helper):
        super(ProxMplsProfileHelper, self).__init__(resource_helper)
        # cached (tagged_cores, plain_cores) tuple
        self._cores_tuple = None

    def mpls_cores(self):
        """Cached (tagged_cores, plain_cores) tuple from the PROX config."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_mpls()
        return self._cores_tuple

    def tagged_cores(self):
        # generator cores sending MPLS-tagged traffic
        return self.mpls_cores[0]

    def plain_cores(self):
        # generator cores sending plain UDP traffic
        return self.mpls_cores[1]

    def get_cores_mpls(self):
        """Scan the PROX config for gen-mode cores named 'tag*' or 'udp*'.

        NOTE(review): the result-list initializations and loop-continue
        lines are elided from this excerpt.
        """
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):

            # skip sections that are not traffic generators
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):

            for item_key, item_value in section:
                if item_key != 'name':

                if item_value.startswith("tag"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.core_id
                    cores_tagged.append(core_tag)

                elif item_value.startswith("udp"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_udp = core_tuple.core_id
                    cores_plain.append(core_udp)

        return cores_tagged, cores_plain

    def traffic_context(self, pkt_size, value):
        # Program both core groups: plain traffic is 4 bytes smaller (no
        # MPLS label), so its speed is scaled to keep byte rates equal.
        # NOTE(review): the @contextmanager decorator and the yield/stop
        # lines are elided from this excerpt.
        self.sut.reset_stats()
        self.sut.set_pkt_size(self.tagged_cores, pkt_size)
        self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
        self.sut.set_speed(self.tagged_cores, value)
        # +20 accounts for per-frame wire overhead in both sizes
        ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
        self.sut.set_speed(self.plain_cores, value * ratio)
        self.sut.start_all()
class ProxBngProfileHelper(ProxProfileHelper):
    """Profile helper for the BNG gen PROX test.

    NOTE(review): the @property decorators on the accessor methods below
    appear to be elided from this excerpt.
    """

    __prox_profile_type__ = "BNG gen"

    def __init__(self, resource_helper):
        super(ProxBngProfileHelper, self).__init__(resource_helper)
        # cached (cpe, inet, arp, arp_task) core tuple
        self._cores_tuple = None

    def bng_cores(self):
        """Cached (cpe, inet, arp, arp_task) core tuple from the config."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_bng_qos()
        return self._cores_tuple

    def cpe_cores(self):
        # generator cores for downstream (CPE-side) traffic
        return self.bng_cores[0]

    def inet_cores(self):
        # generator cores for upstream (internet-side) traffic
        return self.bng_cores[1]

    def arp_cores(self):
        # cores that generate ARP initialization traffic
        return self.bng_cores[2]

    def arp_task_cores(self):
        # per-core task ids carrying the ARP generation
        return self.bng_cores[3]

    def all_rx_cores(self):
        # latency cores double as the receive-side cores
        return self.latency_cores

    def get_cores_gen_bng_qos(self):
        """Scan the PROX config for gen-mode cores named cpe/inet/arp[_task].

        NOTE(review): the initializations of the cpe/inet/arp result lists
        and the loop-continue lines are elided from this excerpt.
        """
        arp_tasks_core = [0]
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):

            # skip sections that are not traffic generators
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):

            for item_key, item_value in section:
                if item_key != 'name':

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    cpe_core = core_tuple.core_id
                    cpe_cores.append(cpe_core)

                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.core_id
                    inet_cores.append(inet_core)

                elif item_value.startswith("arp"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_core = core_tuple.core_id
                    arp_cores.append(arp_core)

                # We check the tasks/core separately
                if item_value.startswith("arp_task"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_task_core = core_tuple.core_id
                    arp_tasks_core.append(arp_task_core)

        return cpe_cores, inet_cores, arp_cores, arp_tasks_core

    def traffic_context(self, pkt_size, value):
        # Tester is sending packets at the required speed already after
        # setup_test(). Just get the current statistics, sleep the required
        # amount of time and calculate packet loss.
        # NOTE(review): the @contextmanager decorator and the yield are
        # elided from this excerpt.
        inet_pkt_size = pkt_size
        cpe_pkt_size = pkt_size - 24
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        # NOTE(review): the conditional guards choosing which direction is
        # scaled by 'ratio' are elided from this excerpt.
        max_down_speed = value * ratio
        max_up_speed = value / ratio

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
        self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
        # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)

        # Sending ARP to initialize tables - need a few seconds of generation
        # to make sure all CPEs are initialized
        LOG.info("Initializing SUT: sending ARP packets")
        self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)
        self.sut.start(self.arp_cores)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
        """Run one traffic step and return (result_tuple, samples).

        NOTE(review): visually identical to ProxProfileHelper.run_test; a
        candidate for removal in favor of the inherited implementation.
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                     value, tolerated_loss, line_speed)

        with data_helper, self.traffic_context(pkt_size,
                                               self.pct_10gbps(value, line_speed)):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples
class ProxVpeProfileHelper(ProxProfileHelper):
    """Profile helper for the vPE gen PROX test.

    NOTE(review): the @property decorators on the accessor methods below
    appear to be elided from this excerpt.
    """

    __prox_profile_type__ = "vPE gen"

    def __init__(self, resource_helper):
        super(ProxVpeProfileHelper, self).__init__(resource_helper)
        # cached (cpe_cores, inet_cores) and (cpe_ports, inet_ports)
        self._cores_tuple = None
        self._ports_tuple = None

    def vpe_cores(self):
        """Cached (cpe_cores, inet_cores) tuple from the PROX config."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_vpe()
        return self._cores_tuple

    def cpe_cores(self):
        # generator cores for downstream (CPE-side) traffic
        return self.vpe_cores[0]

    def inet_cores(self):
        # generator cores for upstream (internet-side) traffic
        return self.vpe_cores[1]

    def all_rx_cores(self):
        # latency cores double as the receive-side cores
        return self.latency_cores

    def vpe_ports(self):
        """Cached (cpe_ports, inet_ports) tuple from the PROX config."""
        if not self._ports_tuple:
            self._ports_tuple = self.get_ports_gen_vpe()
        return self._ports_tuple

    def cpe_ports(self):
        return self.vpe_ports[0]

    def inet_ports(self):
        return self.vpe_ports[1]

    def get_cores_gen_vpe(self):
        """Scan the PROX config for gen-mode cores named 'cpe*' or 'inet*'.

        NOTE(review): the result-list initializations and loop-continue
        lines are elided from this excerpt.
        """
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):

            # skip sections that are not traffic generators
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):

            for item_key, item_value in section:
                if item_key != 'name':

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.core_id
                    cpe_cores.append(core_tag)

                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.core_id
                    inet_cores.append(inet_core)

        return cpe_cores, inet_cores

    def get_ports_gen_vpe(self):
        """Scan [port N] sections and split port numbers by cpe/inet name.

        NOTE(review): the result-list initializations and loop-continue
        lines are elided from this excerpt.
        """
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("port"):
            # first integer in the section name is the TX port number
            tx_port_iter = re.finditer(r'\d+', section_name)
            tx_port_no = int(next(tx_port_iter).group(0))

            for item_key, item_value in section:
                if item_key != 'name':

                if item_value.startswith("cpe"):
                    cpe_ports.append(tx_port_no)

                elif item_value.startswith("inet"):
                    inet_ports.append(tx_port_no)

        return cpe_ports, inet_ports

    def traffic_context(self, pkt_size, value):
        # Calculate the target upload and download speed. The upload and
        # download packets have different packet sizes, so in order to get
        # equal bandwidth usage, the ratio of the speeds has to match the ratio
        # of the packet sizes.
        # NOTE(review): the @contextmanager decorator and the yield are
        # elided from this excerpt.
        cpe_pkt_size = pkt_size
        inet_pkt_size = pkt_size - 4
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        # NOTE(review): the conditional guards choosing which direction is
        # scaled by 'ratio' are elided from this excerpt.
        max_down_speed = value * ratio
        max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.cpe_ports) != len(self.cpe_cores):
            max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)

        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        self.sut.stop(self.cpe_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
        """Run one traffic step and return (result_tuple, samples).

        NOTE(review): visually identical to ProxProfileHelper.run_test; a
        candidate for removal in favor of the inherited implementation.
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                     value, tolerated_loss, line_speed)

        with data_helper, self.traffic_context(pkt_size,
                                               self.pct_10gbps(value, line_speed)):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples
class ProxlwAFTRProfileHelper(ProxProfileHelper):
    """Profile helper for the lwAFTR gen PROX test.

    NOTE(review): the @property decorators on the accessor methods below
    appear to be elided from this excerpt.
    """

    __prox_profile_type__ = "lwAFTR gen"

    def __init__(self, resource_helper):
        super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
        # cached (tun_cores, inet_cores) and (tun_ports, inet_ports)
        self._cores_tuple = None
        self._ports_tuple = None
        # ramp-up step interval in seconds
        self.step_time = 0.5

    def _lwaftr_cores(self):
        """Cached (tun_cores, inet_cores) tuple from the PROX config."""
        if not self._cores_tuple:
            self._cores_tuple = self._get_cores_gen_lwaftr()
        return self._cores_tuple

    def tun_cores(self):
        # generator cores for the IPv6 tunnel side
        return self._lwaftr_cores[0]

    def inet_cores(self):
        # generator cores for the IPv4 internet side
        return self._lwaftr_cores[1]

    def _lwaftr_ports(self):
        """Cached (tun_ports, inet_ports) tuple from the PROX config."""
        if not self._ports_tuple:
            self._ports_tuple = self._get_ports_gen_lw_aftr()
        return self._ports_tuple

    def tun_ports(self):
        return self._lwaftr_ports[0]

    def inet_ports(self):
        return self._lwaftr_ports[1]

    def all_rx_cores(self):
        # latency cores double as the receive-side cores
        return self.latency_cores

    def _get_cores_gen_lwaftr(self):
        """Scan the PROX config for gen-mode cores named 'tun*' or 'inet*'.

        NOTE(review): the result-list initializations and loop-continue
        lines are elided from this excerpt.
        """
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):

            # skip sections that are not traffic generators
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):

            core_tuple = CoreSocketTuple(section_name)
            core_tag = core_tuple.core_id
            for item_value in (v for k, v in section if k == 'name'):
                if item_value.startswith('tun'):
                    tun_cores.append(core_tag)
                elif item_value.startswith('inet'):
                    inet_cores.append(core_tag)

        return tun_cores, inet_cores

    def _get_ports_gen_lw_aftr(self):
        """Scan [port N] sections and split port numbers by lwB4/inet name.

        NOTE(review): the result-list initializations and the no-match
        continue are elided from this excerpt.
        """
        re_port = re.compile(r'port (\d+)')
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            match = re_port.search(section_name)
            tx_port_no = int(match.group(1))
            for item_value in (v for k, v in section if k == 'name'):
                if item_value.startswith('lwB4'):
                    tun_ports.append(tx_port_no)
                elif item_value.startswith('inet'):
                    inet_ports.append(tx_port_no)

        return tun_ports, inet_ports

    def _resize(len1, len2):
        # ratio used to scale per-core speed when cores != ports
        # NOTE(review): the @staticmethod decorator and an early-return line
        # appear to be elided from this excerpt.
        return 1.0 * len1 / len2

    def traffic_context(self, pkt_size, value):
        # Tester is sending packets at the required speed already after
        # setup_test(). Just get the current statistics, sleep the required
        # amount of time and calculate packet loss.
        # NOTE(review): the @contextmanager decorator and the yield are
        # elided from this excerpt.
        tun_pkt_size = pkt_size
        inet_pkt_size = pkt_size - 40
        ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        # NOTE(review): the conditional guard preceding this scaling is
        # elided from this excerpt.
        max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.tun_ports) != len(self.tun_cores):
            max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)

        self.sut.reset_values(self.tun_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
        self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
        # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
        self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
        # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
        self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)

        # NOTE(review): the following two comments disagree with the offsets
        # actually used by the code (16/-18 and 38/-38); confirm against the
        # lwAFTR packet layout before relying on either.
        # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
        self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
        # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
        self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)

        LOG.info("Initializing SUT: sending lwAFTR packets")
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.tun_cores, curr_down_speed)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.tun_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        self.sut.stop(self.tun_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
        """Run one traffic step and return (result_tuple, samples).

        NOTE(review): visually identical to ProxProfileHelper.run_test; a
        candidate for removal in favor of the inherited implementation.
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                     value, tolerated_loss, line_speed)

        with data_helper, self.traffic_context(pkt_size,
                                               self.pct_10gbps(value, line_speed)):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples
class ProxIrqProfileHelper(ProxProfileHelper):
    """Profile helper for the IRQ Query PROX test.

    NOTE(review): this class definition continues beyond the end of this
    excerpt.
    """

    __prox_profile_type__ = "IRQ Query"

    def __init__(self, resource_helper):
        super(ProxIrqProfileHelper, self).__init__(resource_helper)
        self._cores_tuple = None
        self._ports_tuple = None
        # ramp-up step interval in seconds
        self.step_time = 0.5