1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
33 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
34 from yardstick.common import utils
35 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
36 from yardstick.network_services.helpers.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
# Module-level logger; DEBUG is forced on so the verbose PROX socket
# request/response tracing in this module is always captured.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

# (dict key, config section, section key, default value) tuples describing
# every recognized configuration option and where it lives in the INI file.
CONFIGURATION_OPTIONS = (
    # dict key            section    section key       default value
    ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
    ('testDuration', 'general', 'test_duration', 5.0),
    ('testPrecision', 'general', 'test_precision', 1.0),
    ('tests', 'general', 'tests', None),
    ('toleratedLoss', 'general', 'tolerated_loss', 0.0),

    ('logFile', 'logging', 'file', 'dats.log'),
    ('logDateFormat', 'logging', 'datefmt', None),
    ('logLevel', 'logging', 'level', 'INFO'),
    ('logOverwrite', 'logging', 'overwrite', 1),

    ('testerIp', 'tester', 'ip', None),
    ('testerUser', 'tester', 'user', 'root'),
    ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
    ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
    ('testerSocketId', 'tester', 'socket_id', 0),

    ('sutIp', 'sut', 'ip', None),
    ('sutUser', 'sut', 'user', 'root'),
    ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
    ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
    ('sutSocketId', 'sut', 'socket_id', 0),
)
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Parsed 'core N[sM][h]' specification from a PROX config section name.

    ``core_id`` and ``socket_id`` are ints; ``hyperthread`` is 'h' when the
    spec names the hyperthread sibling, otherwise ''.
    """

    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        try:
            # Accept either a raw "core ..." string (parsed via CORE_RE) or
            # the three positional fields directly.
            matches = cls.CORE_RE.search(str(args[0]))
            if matches:
                args = matches.groups()

            return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
                                                       'h' if args[2] else '')

        except (AttributeError, TypeError, IndexError, ValueError):
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        # 'h' suffix in the core spec marks the hyperthread sibling.
        return self.hyperthread == 'h'

    @property
    def index(self):
        # 0 for the physical thread, 1 for its hyperthread sibling.
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        """Return the CPU number of this core/socket in the parsed topology.

        :param cpu_topology: nested mapping socket_id -> core_id -> threads
        :raises ValueError: when the requested core does not exist
        """
        try:
            socket_core_match = cpu_topology[self.socket_id][self.core_id]
            sorted_match = sorted(socket_core_match.values())
            return sorted_match[self.index][0]
        except (KeyError, IndexError):
            template = "Core {}{} on socket {} does not exist"
            raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    """Global PROX counters: total rx, total tx, timestamp counter and hz.

    Accepts either four scalar arguments or a single non-string iterable of
    four values, so it can be built directly from a parsed "tot stats" reply.
    """

    def __new__(cls, *args):
        try:
            # A single non-string iterable is unpacked into the four fields;
            # strings are deliberately excluded by the identity assert.
            assert args[0] is not str(args[0])
            args = tuple(args[0])
        except (AssertionError, IndexError, TypeError):
            # Already four scalars (or empty): use the arguments as given.
            pass

        return super(TotStatsTuple, cls).__new__(cls, *args)
class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
                                                        'delta_tx,delta_tsc,'
                                                        'latency,rx_total,tx_total,pps')):
    """One measured PROX test interval plus derived throughput/loss metrics."""

    @property
    def pkt_loss(self):
        try:
            # Percentage of transmitted packets that were dropped.
            return 1e2 * self.drop_total / float(self.tx_total)
        except ZeroDivisionError:
            # Nothing was transmitted: report total loss.
            return 100.0

    @property
    def mpps(self):
        # calculate the effective throughput in Mpps
        return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6

    @property
    def can_be_lost(self):
        # Absolute number of packets allowed to be lost at the tolerated %.
        return int(self.tx_total * self.tolerated / 1e2)

    @property
    def drop_total(self):
        return self.tx_total - self.rx_total

    @property
    def success(self):
        return self.drop_total <= self.can_be_lost

    def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
        """Build the KPI sample dict reported for this interval."""
        if pkt_loss is None:
            pkt_loss = self.pkt_loss

        if port_samples is None:
            port_samples = {}

        latency_keys = [
            "LatencyMin",
            "LatencyMax",
            "LatencyAvg",
        ]

        samples = {
            "Throughput": self.mpps,
            "DropPackets": pkt_loss,
            "CurrentDropPackets": pkt_loss,
            "TxThroughput": self.pps / 1e6,
            "RxThroughput": self.mpps,
            "PktSize": pkt_size,
        }
        if port_samples:
            samples.update(port_samples)

        samples.update((key, value) for key, value in zip(latency_keys, self.latency))
        return samples

    def log_data(self, logger=None):
        if logger is None:
            logger = LOG

        template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
        logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
        logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
class PacketDump(object):
    """A single packet dump captured from the PROX 'pktdump' socket reply."""

    @staticmethod
    def assert_func(func, value1, value2, template=None):
        # Raise AssertionError with a formatted message when the binary
        # predicate does not hold between the two values.
        assert func(value1, value2), template.format(value1, value2)

    def __init__(self, port_id, data_len, payload):
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

    @property
    def port_id(self):
        """Get the port id of the packet dump"""
        return self._port_id

    @property
    def data_len(self):
        """Get the length of the data received"""
        return self._data_len

    def __str__(self):
        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload as a list of ordinals.

        Returns a list of byte values, matching the contents of the packet dump.
        Optional start and end parameters can be specified to retrieve only a
        part of the packet contents.

        The number of elements in the list is equal to end - start + 1, so end
        is the offset of the last character.

        Args:
            start (pos. int): the starting offset in the payload. If it is not
                specified or None, offset 0 is assumed.
            end (pos. int): the ending offset of the payload. If it is not
                specified or None, the contents until the end of the packet are
                returned.

        Returns:
            [int, int, ...]. Each int represents the ordinal value of a byte in
            the packet payload.
        """
        if start is None:
            start = 0

        if end is None:
            end = self.data_len - 1

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, start, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, end, self.data_len, template)

        # Adjust for splice operation: end offset must be 1 more than the offset
        # of the last desired character.
        end += 1

        return self._payload[start:end]
class ProxSocketHelper(object):
    """Thin wrapper around the PROX control socket command protocol."""

    def __init__(self, sock=None):
        """ creates new prox instance """
        super(ProxSocketHelper, self).__init__()

        if sock is None:
            sock = socket.socket()

        self._sock = sock
        self._pkt_dumps = []       # FIFO of PacketDump objects read so far
        self.master_stats = None   # last TotStatsTuple seen by get_all_tot_stats()

    def connect(self, ip, port):
        """Connect to the prox instance on the remote system"""
        self._sock.connect((ip, port))

    def get_socket(self):
        """ get the socket connected to the remote instance """
        return self._sock

    def _parse_socket_data(self, decoded_data, pkt_dump_only):
        def get_newline_index():
            return decoded_data.find('\n', index)

        ret_str = ''
        index = 0
        for newline_index in iter(get_newline_index, -1):
            ret_str = decoded_data[index:newline_index]

            try:
                mode, port_id, data_len = ret_str.split(',', 2)
            except ValueError:
                mode, port_id, data_len = None, None, None

            if mode != 'pktdump':
                # Regular 1-line message. Stop reading from the socket.
                LOG.debug("Regular response read")
                return ret_str

            LOG.debug("Packet dump header read: [%s]", ret_str)

            # The line is a packet dump header. Parse it, read the
            # packet payload, store the dump for later retrieval.
            # Skip over the packet dump and continue processing: a
            # 1-line response may follow the packet dump.

            data_len = int(data_len)
            data_start = newline_index + 1  # + 1 to skip over \n
            data_end = data_start + data_len
            sub_data = decoded_data[data_start:data_end]
            pkt_payload = array.array('B', (ord(v) for v in sub_data))
            pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
            self._pkt_dumps.append(pkt_dump)

            if pkt_dump_only:
                # Return boolean instead of string to signal
                # successful reception of the packet dump.
                LOG.debug("Packet dump stored, returning")
                return True

            index = data_end + 1

        return ret_str

    def get_data(self, pkt_dump_only=False, timeout=1):
        """ read data from the socket """

        # This method behaves slightly differently depending on whether it is
        # called to read the response to a command (pkt_dump_only = 0) or if
        # it is called specifically to read a packet dump (pkt_dump_only = 1).
        #
        # Packet dumps look like:
        #   pktdump,<port_id>,<data_len>\n
        #   <packet contents as byte array>\n
        # This means the total packet dump message consists of 2 lines instead
        # of 1 line.
        #
        # - Response for a command (pkt_dump_only = 0):
        #   1) Read response from the socket until \n (end of message)
        #   2a) If the response is a packet dump header (starts with "pktdump,"):
        #     - Read the packet payload and store the packet dump for later
        #       retrieval.
        #     - Reset the state and restart from 1). Eventually state 2b) will
        #       be reached and the function will return.
        #   2b) If the response is not a packet dump:
        #     - Return the received message as a string
        #
        # - Explicit request to read a packet dump (pkt_dump_only = 1):
        #   - Read the dump header and payload
        #   - Store the packet dump for later retrieval
        #   - Return True to signify a packet dump was successfully read

        def is_ready():
            # recv() is blocking, so avoid calling it when no data is waiting.
            ready = select.select([self._sock], [], [], timeout)
            return bool(ready[0])

        status = False
        ret_str = ''
        for status in iter(is_ready, False):
            decoded_data = self._sock.recv(256).decode('utf-8')
            ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)

        LOG.debug("Received data from socket: [%s]", ret_str)
        return ret_str if status else ''

    def put_command(self, to_send):
        """ send data to the remote instance """
        LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
        try:
            # NOTE: sendall will block, we need a timeout
            self._sock.sendall(to_send.encode('utf-8'))
        except:  # pylint: disable=bare-except
            pass

    def get_packet_dump(self):
        """ get the next packet dump """
        if self._pkt_dumps:
            return self._pkt_dumps.pop(0)
        return None

    def stop_all_reset(self):
        """ stop the remote instance and reset stats """
        LOG.debug("Stop all and reset stats")
        self.stop_all()
        self.reset_stats()

    def stop_all(self):
        """ stop all cores on the remote instance """
        LOG.debug("Stop all")
        self.put_command("stop all\n")
        time.sleep(3)

    def stop(self, cores, task=''):
        """ stop specific cores on the remote instance """
        LOG.debug("Stopping cores %s", cores)
        self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
        time.sleep(3)

    def start_all(self):
        """ start all cores on the remote instance """
        LOG.debug("Start all")
        self.put_command("start all\n")

    def start(self, cores):
        """ start specific cores on the remote instance """
        LOG.debug("Starting cores %s", cores)
        self.put_command("start {}\n".format(join_non_strings(',', cores)))
        time.sleep(3)

    def reset_stats(self):
        """ reset the statistics on the remote instance """
        LOG.debug("Reset stats")
        self.put_command("reset stats\n")
        time.sleep(1)

    def _run_template_over_cores(self, template, cores, *args):
        # Send one formatted command per core.
        for core in cores:
            self.put_command(template.format(core, *args))

    def set_pkt_size(self, cores, pkt_size):
        """ set the packet size to generate on the remote instance """
        LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
        self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
        time.sleep(1)

    def set_value(self, cores, offset, value, length):
        """ set value on the remote instance """
        msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
        LOG.debug(msg, cores, value, length, offset)
        template = "set value {} 0 {} {} {}\n"
        self._run_template_over_cores(template, cores, offset, value, length)

    def reset_values(self, cores):
        """ reset values on the remote instance """
        LOG.debug("Set value for core(s) %s", cores)
        self._run_template_over_cores("reset values {} 0\n", cores)

    def set_speed(self, cores, speed, tasks=None):
        """ set speed on the remote instance """
        if tasks is None:
            tasks = [0] * len(cores)
        elif len(tasks) != len(cores):
            LOG.error("set_speed: cores and tasks must have the same len")
        LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
        for (core, task) in list(zip(cores, tasks)):
            self.put_command("speed {} {} {}\n".format(core, task, speed))

    def slope_speed(self, cores_speed, duration, n_steps=0):
        """will start to increase speed from 0 to N where N is taken from
        a['speed'] for each a in cores_speed"""
        # by default, each step will take 0.5 sec
        if n_steps == 0:
            n_steps = duration * 2

        private_core_data = []
        step_duration = float(duration) / n_steps
        for core_data in cores_speed:
            target = float(core_data['speed'])
            private_core_data.append({
                'cores': core_data['cores'],
                'zero': 0,
                'delta': target / n_steps,
                'current': 0,
                'speed': target,
            })

        # Ramp in n_steps - 1 increments, then snap exactly to the target
        # speed on the final step.
        deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
        for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
            time.sleep(step_duration)
            for core_data in private_core_data:
                core_data['current'] = core_data[key1] + core_data[key2]
                self.set_speed(core_data['cores'], core_data['current'])

    def set_pps(self, cores, pps, pkt_size):
        """ set packets per second for specific cores on the remote instance """
        msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
        LOG.debug(msg, cores, pps, pkt_size)

        # speed in percent of line-rate
        speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
        self._run_template_over_cores("speed {} 0 {}\n", cores, speed)

    def lat_stats(self, cores, task=0):
        """Get the latency statistics from the remote system"""
        # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
        lat_min = {}
        lat_max = {}
        lat_avg = {}
        for core in cores:
            self.put_command("lat stats {} {} \n".format(core, task))
            ret = self.get_data()

            try:
                lat_min[core], lat_max[core], lat_avg[core] = \
                    tuple(int(n) for n in ret.split(",")[:3])

            except (AttributeError, ValueError, TypeError):
                pass

        return lat_min, lat_max, lat_avg

    def get_all_tot_stats(self):
        self.put_command("tot stats\n")
        all_stats_str = self.get_data().split(",")
        if len(all_stats_str) != 4:
            LOG.error("Failed to get tot stats")
            return [0] * 4
        all_stats = TotStatsTuple(int(v) for v in all_stats_str)
        self.master_stats = all_stats
        return all_stats

    def hz(self):
        return self.get_all_tot_stats()[3]

    def core_stats(self, cores, task=0):
        """Get the receive statistics from the remote system"""
        rx = tx = drop = tsc = 0
        for core in cores:
            self.put_command("core stats {} {}\n".format(core, task))
            ret = self.get_data().split(",")
            rx += int(ret[0])
            tx += int(ret[1])
            drop += int(ret[2])
            tsc = int(ret[3])
        return rx, tx, drop, tsc

    def port_stats(self, ports):
        """get counter values from a specific port"""
        tot_result = [0] * 12
        for port in ports:
            self.put_command("port_stats {}\n".format(port))
            ret = [try_int(s, 0) for s in self.get_data().split(",")]
            tot_result = [sum(x) for x in zip(tot_result, ret)]
        return tot_result

    @contextmanager
    def measure_tot_stats(self):
        # Snapshot totals around the with-block and expose the delta.
        start = self.get_all_tot_stats()
        container = {'start_tot': start}
        try:
            yield container
        finally:
            container['end_tot'] = end = self.get_all_tot_stats()

        container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))

    def tot_stats(self):
        """Get the total statistics from the remote system"""
        stats = self.get_all_tot_stats()
        return stats[:3]

    def tot_ierrors(self):
        """Get the total ierrors from the remote system"""
        self.put_command("tot ierrors tot\n")
        recv = self.get_data().split(',')
        tot_ierrors = int(recv[0])
        # NOTE(review): tsc is parsed from the same field as tot_ierrors;
        # confirm the intended reply field index against the PROX protocol.
        tsc = int(recv[0])
        return tot_ierrors, tsc

    def set_count(self, count, cores):
        """Set the number of packets to send on the specified core"""
        self._run_template_over_cores("count {} 0 {}\n", cores, count)

    def dump_rx(self, core_id, task_id=0, count=1):
        """Activate dump on rx on the specified core"""
        LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
        self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
        time.sleep(1.5)  # Give PROX time to set up packet dumping

    def quit(self):
        self.stop_all()
        self._quit()
        self.force_quit()

    def _quit(self):
        """ stop all cores on the remote instance """
        LOG.debug("Quit prox")
        self.put_command("quit\n")
        time.sleep(3)

    def force_quit(self):
        """ stop all cores on the remote instance """
        LOG.debug("Force Quit prox")
        self.put_command("quit_force\n")
        time.sleep(3)
584 _LOCAL_OBJECT = object()
class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
    """Builds PROX config/lua files and the remote PROX launch command."""

    # the actual app is lowercase
    APP_NAME = 'prox'
    # not used for Prox but added for consistency
    VNF_TYPE = "prox"

    LUA_PARAMETER_NAME = ""
    LUA_PARAMETER_PEER = {
        "gen": "sut",
        "sut": "gen",
    }

    # seconds to wait for the parsed config to arrive on the queue
    CONFIG_QUEUE_TIMEOUT = 120

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        self.remote_path = None
        super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.remote_prox_file_name = None
        self._prox_config_data = None
        self.additional_files = {}
        self.config_queue = Queue()
        # allow_exit_without_flush
        self.config_queue.cancel_join_thread()
        self._global_section = None

    @property
    def prox_config_data(self):
        if self._prox_config_data is None:
            # this will block, but it needs too
            self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
        return self._prox_config_data

    @property
    def global_section(self):
        if self._global_section is None and self.prox_config_data:
            self._global_section = self.find_section("global")
        return self._global_section

    def find_section(self, name, default=_LOCAL_OBJECT):
        """Return the contents of the named config section.

        :raises KeyError: when the section is absent and no default is given
        """
        result = next((value for key, value in self.prox_config_data if key == name), default)
        if result is _LOCAL_OBJECT:
            raise KeyError('{} not found in Prox config'.format(name))
        return result

    def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
        """Return the value of section_key inside the named section."""
        section = self.find_section(section_name, [])
        result = next((value for key, value in section if key == section_key), default)
        if result is _LOCAL_OBJECT:
            template = '{} not found in {} section of Prox config'
            raise KeyError(template.format(section_key, section_name))
        return result

    def copy_to_target(self, config_file_path, prox_file):
        # Upload the file to a fixed location under /tmp on the remote host.
        remote_path = os.path.join("/tmp", prox_file)
        self.ssh_helper.put(config_file_path, remote_path)
        return remote_path

    @staticmethod
    def _get_tx_port(section, sections):
        iface_port = [-1]
        for item in sections[section]:
            if item[0] == "tx port":
                iface_port = re.findall(r'\d+', item[1])
                # do we want the last one?
                #   if yes, then can we reverse?
        return int(iface_port[0])

    @staticmethod
    def _replace_quoted_with_value(quoted, value, count=1):
        # Replace the first `count` double-quoted substrings with `value`.
        new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
        return new_string

    def _insert_additional_file(self, value):
        # Rewrite the quoted filename in a dofile("...") expression so it
        # points at the uploaded copy of the file on the remote system.
        file_str = value.split('"')
        base_name = os.path.basename(file_str[1])
        file_str[1] = self.additional_files[base_name]
        return '"'.join(file_str)

    def generate_prox_config_file(self, config_path):
        """Parse the PROX config and substitute ports/MACs/file paths."""
        sections = []
        prox_config = ConfigParser(config_path, sections)
        prox_config.parse()

        # Ensure MAC is set "hardware"
        all_ports = self.vnfd_helper.port_pairs.all_ports
        # use dpdk port number
        for port_name in all_ports:
            port_num = self.vnfd_helper.port_num(port_name)
            port_section_name = "port {}".format(port_num)
            for section_name, section in sections:
                if port_section_name != section_name:
                    continue

                for section_data in section:
                    if section_data[0] == "mac":
                        section_data[1] = "hardware"

        # adjust for range of cases
        for _, section in sections:
            for section_data in section:
                item_key, item_val = section_data
                if item_val.startswith("@@dst_mac"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    # space-separated byte form for inline packet definitions
                    section_data[1] = mac.replace(":", " ", 6)

                if item_key == "dst mac" and item_val.startswith("@@"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    section_data[1] = mac

        # if addition file specified in prox config
        if not self.additional_files:
            return sections

        for section_name, section in sections:
            for section_data in section:
                try:
                    if section_data[0].startswith("dofile"):
                        section_data[0] = self._insert_additional_file(section_data[0])

                    if section_data[1].startswith("dofile"):
                        section_data[1] = self._insert_additional_file(section_data[1])
                except:  # pylint: disable=bare-except
                    pass

        return sections

    @staticmethod
    def write_prox_lua(lua_config):
        """
        Write an .ini-format config file for PROX (parameters.lua)
        PROX does not allow a space before/after the =, so we need
        a custom method
        """
        out = []
        for key in lua_config:
            value = '"' + lua_config[key] + '"'
            if key == "__name__":
                continue
            if value is not None and value != '@':
                key = "=".join((key, str(value).replace('\n', '\n\t')))
                out.append(key)
            else:
                key = str(key).replace('\n', '\n\t')
                out.append(key)

        return os.linesep.join(out)

    @staticmethod
    def write_prox_config(prox_config):
        """
        Write an .ini-format config file for PROX
        PROX does not allow a space before/after the =, so we need
        a custom method
        """
        out = []
        for (section_name, section) in prox_config:
            out.append("[{}]".format(section_name))
            for item in section:
                key, value = item
                if key == "__name__":
                    continue
                if value is not None and value != '@':
                    key = "=".join((key, str(value).replace('\n', '\n\t')))
                    out.append(key)
                else:
                    key = str(key).replace('\n', '\n\t')
                    out.append(key)

        return os.linesep.join(out)

    def put_string_to_file(self, s, remote_path):
        file_obj = cStringIO(s)
        self.ssh_helper.put_file_obj(file_obj, remote_path)
        return remote_path

    def generate_prox_lua_file(self):
        """Build the parameters.lua key/value map from interface MACs."""
        p = OrderedDict()
        all_ports = self.vnfd_helper.port_pairs.all_ports
        for port_name in all_ports:
            port_num = self.vnfd_helper.port_num(port_name)
            intf = self.vnfd_helper.find_interface(name=port_name)
            vintf = intf['virtual-interface']
            p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
            p["src_mac{0}".format(port_num)] = vintf["local_mac"]

        return p

    def upload_prox_lua(self, config_file, lua_data):
        # prox can't handle spaces around ' = ' so use custom method
        out = StringIO(self.write_prox_lua(lua_data))
        out.seek(0)
        remote_path = os.path.join("/tmp", config_file)
        self.ssh_helper.put_file_obj(out, remote_path)

        return remote_path

    def upload_prox_config(self, config_file, prox_config_data):
        # prox can't handle spaces around ' = ' so use custom method
        out = StringIO(self.write_prox_config(prox_config_data))
        out.seek(0)
        remote_path = os.path.join("/tmp", config_file)
        self.ssh_helper.put_file_obj(out, remote_path)

        return remote_path

    def build_config_file(self):
        """Generate, queue and upload the PROX config plus auxiliary files."""
        task_path = self.scenario_helper.task_path
        options = self.scenario_helper.options
        config_path = options['prox_config']
        config_file = os.path.basename(config_path)
        config_path = find_relative_file(config_path, task_path)
        self.additional_files = {}

        try:
            if options['prox_generate_parameter']:
                self.lua = self.generate_prox_lua_file()
                if len(self.lua) > 0:
                    self.upload_prox_lua("parameters.lua", self.lua)
        except:  # pylint: disable=bare-except
            pass

        prox_files = options.get('prox_files', [])
        if isinstance(prox_files, six.string_types):
            prox_files = [prox_files]
        for key_prox_file in prox_files:
            base_prox_file = os.path.basename(key_prox_file)
            key_prox_path = find_relative_file(key_prox_file, task_path)
            remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
            self.additional_files[base_prox_file] = remote_prox_file

        self._prox_config_data = self.generate_prox_config_file(config_path)
        # copy config to queue so we can read it from traffic_runner process
        self.config_queue.put(self._prox_config_data)
        self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)

    def build_config(self):
        """Return the shell command line that launches PROX remotely."""
        self.build_config_file()

        options = self.scenario_helper.options
        prox_args = options['prox_args']
        tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)

        self.pipeline_kwargs = {
            'tool_path': tool_path,
            'tool_dir': os.path.dirname(tool_path),
            'cfg_file': self.remote_path,
            'args': ' '.join(' '.join([str(k), str(v) if v else ''])
                             for k, v in prox_args.items())
        }

        cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
                        "{args} -f {cfg_file} '")
        return cmd_template.format(**self.pipeline_kwargs)
# this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
class ProxResourceHelper(ClientResourceHelper):
    """Resource helper that drives a remote PROX instance over its socket."""

    RESOURCE_WORD = 'prox'

    @staticmethod
    def find_pci(pci, bound_pci):
        # we have to substring match PCI bus address from the end
        return any(b.endswith(pci) for b in bound_pci)

    def __init__(self, setup_helper):
        super(ProxResourceHelper, self).__init__(setup_helper)
        self.mgmt_interface = self.vnfd_helper.mgmt_interface
        self._user = self.mgmt_interface["user"]
        self._ip = self.mgmt_interface["ip"]

        self.done = False
        self._vpci_to_if_name_map = None
        self.additional_file = {}
        self.remote_prox_file_name = None
        self.lower = None
        self.upper = None
        self._test_type = None

    @property
    def sut(self):
        # Lazily establish the PROX control-socket connection.
        if not self.client:
            self.client = self._connect()
        return self.client

    @property
    def test_type(self):
        if self._test_type is None:
            self._test_type = self.setup_helper.find_in_section('global', 'name', None)
        return self._test_type

    def run_traffic(self, traffic_profile):
        self._queue.cancel_join_thread()
        self.lower = 0.0
        self.upper = 100.0

        traffic_profile.init(self._queue)
        # this frees up the run_traffic loop
        self.client_started.value = 1

        while not self._terminated.value:
            # move it all to traffic_profile
            self._run_traffic_once(traffic_profile)

    def _run_traffic_once(self, traffic_profile):
        traffic_profile.execute_traffic(self)
        if traffic_profile.done:
            self._queue.put({'done': True})
            LOG.debug("tg_prox done")
            self._terminated.value = 1

    # For VNF use ResourceHelper method to collect KPIs directly.
    # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
    def collect_collectd_kpi(self):
        return self._collect_resource_kpi()

    def collect_kpi(self):
        result = super(ProxResourceHelper, self).collect_kpi()
        # add in collectd kpis manually
        if result:
            result['collect_stats'] = self._collect_resource_kpi()
        return result

    def terminate(self):
        # should not be called, use VNF terminate
        raise NotImplementedError()

    def up_post(self):
        return self.sut  # force connection

    def execute(self, cmd, *args, **kwargs):
        # Dispatch a named command to the connected ProxSocketHelper.
        func = getattr(self.sut, cmd, None)
        if func:
            return func(*args, **kwargs)
        return None

    def _connect(self, client=None):
        """Run and connect to prox on the remote system """
        # De-allocating a large amount of hugepages takes some time. If a new
        # PROX instance is started immediately after killing the previous one,
        # it might not be able to allocate hugepages, because they are still
        # being freed. Hence the -w switch.
        # self.connection.execute("sudo killall -w Prox 2>/dev/null")
        # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
        # -f ./handle_none-4.cfg"
        # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
        #  "; " \
        #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
        #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
        # sudo " \
        #    + "./build/Prox " + prox_args
        # log.debug("Starting PROX with command [%s]", prox_cmd)
        # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
        # self._ip, prox_cmd))
        if client is None:
            client = ProxSocketHelper()

        # try connecting to Prox for 60s
        for _ in range(RETRY_SECONDS):
            time.sleep(RETRY_INTERVAL)
            try:
                client.connect(self._ip, PROX_PORT)
            except (socket.gaierror, socket.error):
                continue
            else:
                return client

        msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
        raise Exception(msg.format(self._ip, PROX_PORT))
class ProxDataHelper(object):
    """Collects per-run PROX statistics and packages them as a result tuple."""

    def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
        super(ProxDataHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.sut = sut
        self.pkt_size = pkt_size
        self.value = value                  # requested speed, % of line rate
        self.tolerated_loss = tolerated_loss
        self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
        self.tsc_hz = None
        self.measured_stats = None
        self.latency = None
        self._totals_and_pps = None
        self.result_tuple = None

    @property
    def totals_and_pps(self):
        if self._totals_and_pps is None:
            # fields 6 and 7 of the aggregated port_stats reply are rx/tx totals
            rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
            pps = self.value / 100.0 * self.line_rate_to_pps()
            self._totals_and_pps = rx_total, tx_total, pps
        return self._totals_and_pps

    @property
    def rx_total(self):
        return self.totals_and_pps[0]

    @property
    def tx_total(self):
        return self.totals_and_pps[1]

    @property
    def pps(self):
        return self.totals_and_pps[2]

    @property
    def samples(self):
        # Per-port in/out packet counters keyed by port name.
        samples = {}
        for port_name, port_num in self.vnfd_helper.ports_iter():
            port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
            samples[port_name] = {
                "in_packets": port_rx_total,
                "out_packets": port_tx_total,
            }
        return samples

    def __enter__(self):
        self.check_interface_count()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.make_tuple()

    def make_tuple(self):
        if self.result_tuple:
            return

        self.result_tuple = ProxTestDataTuple(
            self.tolerated_loss,
            self.tsc_hz,
            self.measured_stats['delta'].rx,
            self.measured_stats['delta'].tx,
            self.measured_stats['delta'].tsc,
            self.latency,
            self.rx_total,
            self.tx_total,
            self.pps,
        )
        self.result_tuple.log_data()

    @contextmanager
    def measure_tot_stats(self):
        with self.sut.measure_tot_stats() as self.measured_stats:
            yield

    def check_interface_count(self):
        # do this assert in init? unless we expect interface count to
        # change from one run to another run...
        assert self.port_count in {1, 2, 4}, \
            "Invalid number of ports: 1, 2 or 4 ports only supported at this time"

    def capture_tsc_hz(self):
        self.tsc_hz = float(self.sut.hz())

    def line_rate_to_pps(self):
        # NOTE: to fix, don't hardcode 10Gb/s
        return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1058 class ProxProfileHelper(object):
1060 __prox_profile_type__ = "Generic"
1062 PROX_CORE_GEN_MODE = "gen"
1063 PROX_CORE_LAT_MODE = "lat"
1066 def get_cls(cls, helper_type):
1067 """Return class of specified type."""
1069 return ProxProfileHelper
1071 for profile_helper_class in utils.itersubclasses(cls):
1072 if helper_type == profile_helper_class.__prox_profile_type__:
1073 return profile_helper_class
1075 return ProxProfileHelper
1078 def make_profile_helper(cls, resource_helper):
1079 return cls.get_cls(resource_helper.test_type)(resource_helper)
1081 def __init__(self, resource_helper):
1082 super(ProxProfileHelper, self).__init__()
1083 self.resource_helper = resource_helper
1084 self._cpu_topology = None
1085 self._test_cores = None
1086 self._latency_cores = None
1089 def cpu_topology(self):
1090 if not self._cpu_topology:
1091 stdout = io.BytesIO()
1092 self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1093 self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1094 return self._cpu_topology
1097 def test_cores(self):
1098 if not self._test_cores:
1099 self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1100 return self._test_cores
1103 def latency_cores(self):
1104 if not self._latency_cores:
1105 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1106 return self._latency_cores
1109 def traffic_context(self, pkt_size, value):
1111 self.sut.reset_stats()
1113 self.sut.set_pkt_size(self.test_cores, pkt_size)
1114 self.sut.set_speed(self.test_cores, value)
1115 self.sut.start_all()
1120 def get_cores(self, mode):
1123 for section_name, section in self.setup_helper.prox_config_data:
1124 if not section_name.startswith("core"):
1127 for key, value in section:
1128 if key == "mode" and value == mode:
1129 core_tuple = CoreSocketTuple(section_name)
1130 core = core_tuple.find_in_topology(self.cpu_topology)
    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run one traffic measurement and return its results.

        Generates traffic via ``traffic_context`` for *duration* seconds
        while ProxDataHelper records totals, then reads TSC frequency and
        latency so PPS can be computed at the achieved speed.

        :param pkt_size: packet size in bytes
        :param duration: measurement window in seconds
        :param value: requested transmit speed (PROX speed units —
            presumably percent of line rate; confirm)
        :param tolerated_loss: loss threshold forwarded to ProxDataHelper
        :return: ``(result_tuple, samples)`` from the data helper
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
            # Getting statistics to calculate PPS at right speed....
            data_helper.capture_tsc_hz()
            data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples
1147 def get_latency(self):
1149 :return: return lat_min, lat_max, lat_avg
1152 if self._latency_cores:
1153 return self.sut.lat_stats(self._latency_cores)
1156 def terminate(self):
    def __getattr__(self, item):
        # Delegate any attribute not defined on the helper to the wrapped
        # resource helper (this is how self.sut, self.ssh_helper,
        # self.setup_helper, self.vnfd_helper etc. resolve).
        return getattr(self.resource_helper, item)
class ProxMplsProfileHelper(ProxProfileHelper):
    """Profile helper for the PROX MPLS tag/untag test.

    Generator cores are split into a tagged stream ("tag..." names) and a
    plain UDP stream ("udp..." names); the plain stream uses packets 4
    bytes smaller (no MPLS label) at a proportionally scaled speed.
    """

    __prox_profile_type__ = "MPLS tag/untag"

    def __init__(self, resource_helper):
        super(ProxMplsProfileHelper, self).__init__(resource_helper)
        # Lazy (tagged_cores, plain_cores) cache.
        self._cores_tuple = None

    @property
    def mpls_cores(self):
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_mpls()
        return self._cores_tuple

    @property
    def tagged_cores(self):
        return self.mpls_cores[0]

    @property
    def plain_cores(self):
        return self.mpls_cores[1]

    def get_cores_mpls(self):
        """Split generator cores into MPLS-tagged and plain-UDP lists.

        :return: (cores_tagged, cores_plain) as topology core ids
        """
        cores_tagged = []
        cores_plain = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not in generator mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("tag"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.find_in_topology(self.cpu_topology)
                    cores_tagged.append(core_tag)

                elif item_value.startswith("udp"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_udp = core_tuple.find_in_topology(self.cpu_topology)
                    cores_plain.append(core_udp)

        return cores_tagged, cores_plain

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Generate tagged + plain traffic for the enclosed block.

        The plain stream is 4 bytes smaller (no MPLS label) and its speed
        is scaled so both streams use equal wire bandwidth (+20 is the
        per-frame preamble/IFG overhead).
        """
        self.sut.stop_all()
        self.sut.reset_stats()
        try:
            self.sut.set_pkt_size(self.tagged_cores, pkt_size)
            self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
            self.sut.set_speed(self.tagged_cores, value)
            ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
            self.sut.set_speed(self.plain_cores, value * ratio)
            self.sut.start_all()
            yield
        finally:
            # Always stop the generators, even if the measurement fails.
            self.sut.stop_all()
class ProxBngProfileHelper(ProxProfileHelper):
    """Profile helper for the PROX BNG (Broadband Network Gateway) test.

    Generator cores are classified by name into CPE-side, inet-side and
    ARP roles; traffic is ramped up stepwise to the per-direction target
    speeds before the measurement window starts.
    """

    __prox_profile_type__ = "BNG gen"

    def __init__(self, resource_helper):
        super(ProxBngProfileHelper, self).__init__(resource_helper)
        # Lazy (cpe, inet, arp, arp_task) core-list cache.
        self._cores_tuple = None

    @property
    def bng_cores(self):
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_bng_qos()
        return self._cores_tuple

    @property
    def cpe_cores(self):
        return self.bng_cores[0]

    @property
    def inet_cores(self):
        return self.bng_cores[1]

    @property
    def arp_cores(self):
        return self.bng_cores[2]

    @property
    def arp_task_cores(self):
        return self.bng_cores[3]

    @property
    def all_rx_cores(self):
        # Latency cores double as the receive side used for buffer flushes.
        return self.latency_cores

    def get_cores_gen_bng_qos(self):
        """Classify generator cores from the PROX config by traffic role.

        :return: (cpe_cores, inet_cores, arp_cores, arp_tasks_core)
        """
        cpe_cores = []
        inet_cores = []
        arp_cores = []
        arp_tasks_core = [0]
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not in generator mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    cpe_core = core_tuple.find_in_topology(self.cpu_topology)
                    cpe_cores.append(cpe_core)

                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.find_in_topology(self.cpu_topology)
                    inet_cores.append(inet_core)

                elif item_value.startswith("arp"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_core = core_tuple.find_in_topology(self.cpu_topology)
                    arp_cores.append(arp_core)

                # We check the tasks/core separately
                if item_value.startswith("arp_task"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
                    arp_tasks_core.append(arp_task_core)

        return cpe_cores, inet_cores, arp_cores, arp_tasks_core

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Run BNG traffic at the requested speed for the enclosed block.

        Programs per-direction packet sizes/speeds, ARP-primes the SUT,
        ramps the generators to the target speeds, yields for the
        measurement, then stops the generators and flushes NIC buffers.
        """
        # Tester is sending packets at the required speed already after
        # setup_test(). Just get the current statistics, sleep the required
        # amount of time and calculate packet loss.
        inet_pkt_size = pkt_size
        cpe_pkt_size = pkt_size - 24
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        # Scale the slower direction so both use equal wire bandwidth.
        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        if ratio < 1:
            max_down_speed = value * ratio
        else:
            max_up_speed = value / ratio

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE:
        # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET:
        # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
        self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
        # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)

        # Sending ARP to initialize tables - need a few seconds of generation
        # to make sure all CPEs are initialized
        LOG.info("Initializing SUT: sending ARP packets")
        self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)
        self.sut.start(self.arp_cores)
        time.sleep(4)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # self.step_delta.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        yield

        self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        time.sleep(3)
        self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run one BNG measurement; see ProxProfileHelper.run_test.

        :return: (result_tuple, samples) from ProxDataHelper
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
            # Getting statistics to calculate PPS at right speed....
            data_helper.capture_tsc_hz()
            data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples
class ProxVpeProfileHelper(ProxProfileHelper):
    """Profile helper for the PROX vPE (virtual Provider Edge) test.

    Cores and TX ports are classified by name into CPE-side and inet-side
    groups; per-direction speeds are scaled for packet-size and
    cores-per-port differences, then ramped up stepwise.
    """

    __prox_profile_type__ = "vPE gen"

    def __init__(self, resource_helper):
        super(ProxVpeProfileHelper, self).__init__(resource_helper)
        # Lazy (cpe, inet) core- and port-list caches.
        self._cores_tuple = None
        self._ports_tuple = None

    @property
    def vpe_cores(self):
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_vpe()
        return self._cores_tuple

    @property
    def cpe_cores(self):
        return self.vpe_cores[0]

    @property
    def inet_cores(self):
        return self.vpe_cores[1]

    @property
    def all_rx_cores(self):
        # Latency cores double as the receive side used for buffer flushes.
        return self.latency_cores

    @property
    def vpe_ports(self):
        if not self._ports_tuple:
            self._ports_tuple = self.get_ports_gen_vpe()
        return self._ports_tuple

    @property
    def cpe_ports(self):
        return self.vpe_ports[0]

    @property
    def inet_ports(self):
        return self.vpe_ports[1]

    def get_cores_gen_vpe(self):
        """Split generator cores into CPE-side and inet-side lists.

        :return: (cpe_cores, inet_cores) as topology core ids
        """
        cpe_cores = []
        inet_cores = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not in generator mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.find_in_topology(self.cpu_topology)
                    cpe_cores.append(core_tag)

                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.find_in_topology(self.cpu_topology)
                    inet_cores.append(inet_core)

        return cpe_cores, inet_cores

    def get_ports_gen_vpe(self):
        """Split TX ports ("port N" sections) into CPE-side and inet-side.

        :return: (cpe_ports, inet_ports) as integer port numbers
        """
        cpe_ports = []
        inet_ports = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("port"):
                continue
            tx_port_iter = re.finditer(r'\d+', section_name)
            tx_port_no = int(next(tx_port_iter).group(0))

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    cpe_ports.append(tx_port_no)

                elif item_value.startswith("inet"):
                    inet_ports.append(tx_port_no)

        return cpe_ports, inet_ports

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Run vPE traffic at the requested speed for the enclosed block."""
        # Calculate the target upload and download speed. The upload and
        # download packets have different packet sizes, so in order to get
        # equal bandwidth usage, the ratio of the speeds has to match the ratio
        # of the packet sizes.
        cpe_pkt_size = pkt_size
        inet_pkt_size = pkt_size - 4
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        if ratio < 1:
            max_down_speed = value * ratio
        else:
            max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.cpe_ports) != len(self.cpe_cores):
            max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)

        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # self.step_delta.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        yield

        self.sut.stop(self.cpe_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        time.sleep(3)
        self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run one vPE measurement; see ProxProfileHelper.run_test.

        :return: (result_tuple, samples) from ProxDataHelper
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
            # Getting statistics to calculate PPS at right speed....
            data_helper.capture_tsc_hz()
            data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples
class ProxlwAFTRProfileHelper(ProxProfileHelper):
    """Profile helper for the PROX lwAFTR (lightweight 4over6) test.

    The tunnel-side ("tun"/"lwB4") stream carries an extra IPv6 header
    (40 bytes) compared with the inet-side stream; speeds are scaled per
    direction and ramped up stepwise before the measurement.
    """

    __prox_profile_type__ = "lwAFTR gen"

    def __init__(self, resource_helper):
        super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
        # Lazy (tun, inet) core- and port-list caches.
        self._cores_tuple = None
        self._ports_tuple = None
        # Ramp-up parameters used by traffic_context.
        self.step_delta = 5
        self.step_time = 0.5

    @property
    def _lwaftr_cores(self):
        if not self._cores_tuple:
            self._cores_tuple = self._get_cores_gen_lwaftr()
        return self._cores_tuple

    @property
    def tun_cores(self):
        return self._lwaftr_cores[0]

    @property
    def inet_cores(self):
        return self._lwaftr_cores[1]

    @property
    def _lwaftr_ports(self):
        if not self._ports_tuple:
            self._ports_tuple = self._get_ports_gen_lw_aftr()
        return self._ports_tuple

    @property
    def tun_ports(self):
        return self._lwaftr_ports[0]

    @property
    def inet_ports(self):
        return self._lwaftr_ports[1]

    @property
    def all_rx_cores(self):
        # Latency cores double as the receive side used for buffer flushes.
        return self.latency_cores

    def _get_cores_gen_lwaftr(self):
        """Split generator cores into tunnel-side and inet-side lists.

        :return: (tun_cores, inet_cores) as topology core ids
        """
        tun_cores = []
        inet_cores = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not in generator mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            core_tuple = CoreSocketTuple(section_name)
            core_tag = core_tuple.find_in_topology(self.cpu_topology)
            for item_value in (v for k, v in section if k == 'name'):
                if item_value.startswith('tun'):
                    tun_cores.append(core_tag)
                elif item_value.startswith('inet'):
                    inet_cores.append(core_tag)

        return tun_cores, inet_cores

    def _get_ports_gen_lw_aftr(self):
        """Split TX ports into tunnel-side and inet-side lists.

        :return: (tun_ports, inet_ports) as integer port numbers
        """
        tun_ports = []
        inet_ports = []

        re_port = re.compile(r'port (\d+)')
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            match = re_port.search(section_name)
            if not match:
                continue

            tx_port_no = int(match.group(1))
            for item_value in (v for k, v in section if k == 'name'):
                if item_value.startswith('lwB4'):
                    tun_ports.append(tx_port_no)
                elif item_value.startswith('inet'):
                    inet_ports.append(tx_port_no)

        return tun_ports, inet_ports

    @staticmethod
    def _resize(len1, len2):
        # Speed correction factor when cores-per-port counts differ.
        if len1 == len2:
            return 1.0
        return 1.0 * len1 / len2

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Run lwAFTR traffic at the requested speed for the enclosed block."""
        # Tester is sending packets at the required speed already after
        # setup_test(). Just get the current statistics, sleep the required
        # amount of time and calculate packet loss.
        tun_pkt_size = pkt_size
        inet_pkt_size = pkt_size - 40
        ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value

        # The inet packet is always smaller, so its speed is always scaled down.
        max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.tun_ports) != len(self.tun_cores):
            max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)

        self.sut.reset_values(self.tun_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # TUN:
        # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
        self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
        # Inner IPv4 length (byte 56): 58 for MAC(12), EthType(2), IPv6(40), CRC(4)
        self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
        # UDP length (byte 78): 78 for MAC(12), EthType(2), IPv6(40), IP(20), UDP... header bytes preceding it plus CRC(4)
        self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)

        # INET:
        # IP length (byte 16): 18 for MAC(12), EthType(2), CRC(4)
        self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
        # UDP length (byte 38): 38 for MAC(12), EthType(2), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)

        LOG.info("Initializing SUT: sending lwAFTR packets")
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.tun_cores, curr_down_speed)
        time.sleep(4)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # self.step_delta.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.tun_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        yield

        self.sut.stop(self.tun_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        time.sleep(3)
        self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run one lwAFTR measurement; see ProxProfileHelper.run_test.

        :return: (result_tuple, samples) from ProxDataHelper
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
            # Getting statistics to calculate PPS at right speed....
            data_helper.capture_tsc_hz()
            data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples