1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
33 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
34 from yardstick.common import utils
35 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
36 from yardstick.network_services.helpers.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
45 LOG = logging.getLogger(__name__)
46 LOG.setLevel(logging.DEBUG)
53 CONFIGURATION_OPTIONS = (
54 # dict key section key default value
55 ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
56 ('testDuration', 'general', 'test_duration', 5.0),
57 ('testPrecision', 'general', 'test_precision', 1.0),
58 ('tests', 'general', 'tests', None),
59 ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
61 ('logFile', 'logging', 'file', 'dats.log'),
62 ('logDateFormat', 'logging', 'datefmt', None),
63 ('logLevel', 'logging', 'level', 'INFO'),
64 ('logOverwrite', 'logging', 'overwrite', 1),
66 ('testerIp', 'tester', 'ip', None),
67 ('testerUser', 'tester', 'user', 'root'),
68 ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
69 ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
70 ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
71 ('testerSocketId', 'tester', 'socket_id', 0),
73 ('sutIp', 'sut', 'ip', None),
74 ('sutUser', 'sut', 'user', 'root'),
75 ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
76 ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
77 ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
78 ('sutSocketId', 'sut', 'socket_id', 0),
82 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
83 CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
85 def __new__(cls, *args):
87 matches = cls.CORE_RE.search(str(args[0]))
89 args = matches.groups()
91 return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
92 'h' if args[2] else '')
94 except (AttributeError, TypeError, IndexError, ValueError):
95 raise ValueError('Invalid core spec {}'.format(args))
97 def is_hyperthread(self):
98 return self.hyperthread == 'h'
102 return int(self.is_hyperthread())
104 def find_in_topology(self, cpu_topology):
106 socket_core_match = cpu_topology[self.socket_id][self.core_id]
107 sorted_match = sorted(socket_core_match.values())
108 return sorted_match[self.index][0]
109 except (KeyError, IndexError):
110 template = "Core {}{} on socket {} does not exist"
111 raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
114 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
115 def __new__(cls, *args):
117 assert args[0] is not str(args[0])
118 args = tuple(args[0])
119 except (AssertionError, IndexError, TypeError):
122 return super(TotStatsTuple, cls).__new__(cls, *args)
125 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
126 'delta_tx,delta_tsc,'
127 'latency,rx_total,tx_total,pps')):
131 return 1e2 * self.drop_total / float(self.tx_total)
132 except ZeroDivisionError:
137 # calculate the effective throughput in Mpps
138 return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
141 def can_be_lost(self):
142 return int(self.tx_total * self.tolerated / 1e2)
145 def drop_total(self):
146 return self.tx_total - self.rx_total
150 return self.drop_total <= self.can_be_lost
152 def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
154 pkt_loss = self.pkt_loss
156 if port_samples is None:
166 "Throughput": self.mpps,
167 "DropPackets": pkt_loss,
168 "CurrentDropPackets": pkt_loss,
169 "TxThroughput": self.pps / 1e6,
170 "RxThroughput": self.mpps,
174 samples.update(port_samples)
176 samples.update((key, value) for key, value in zip(latency_keys, self.latency))
179 def log_data(self, logger=None):
183 template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
184 logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
185 logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
188 class PacketDump(object):
190 def assert_func(func, value1, value2, template=None):
191 assert func(value1, value2), template.format(value1, value2)
193 def __init__(self, port_id, data_len, payload):
194 template = "Packet dump has specified length {}, but payload is {} bytes long"
195 self.assert_func(operator.eq, data_len, len(payload), template)
196 self._port_id = port_id
197 self._data_len = data_len
198 self._payload = payload
202 """Get the port id of the packet dump"""
207 """Get the length of the data received"""
208 return self._data_len
211 return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
213 def payload(self, start=None, end=None):
214 """Get part of the payload as a list of ordinals.
216 Returns a list of byte values, matching the contents of the packet dump.
217 Optional start and end parameters can be specified to retrieve only a
218 part of the packet contents.
220 The number of elements in the list is equal to end - start + 1, so end
221 is the offset of the last character.
224 start (pos. int): the starting offset in the payload. If it is not
225 specified or None, offset 0 is assumed.
226 end (pos. int): the ending offset of the payload. If it is not
227 specified or None, the contents until the end of the packet are
231 [int, int, ...]. Each int represents the ordinal value of a byte in
238 end = self.data_len - 1
240 # Bounds checking on offsets
241 template = "Start offset must be non-negative"
242 self.assert_func(operator.ge, start, 0, template)
244 template = "End offset must be less than {1}"
245 self.assert_func(operator.lt, end, self.data_len, template)
247 # Adjust for splice operation: end offset must be 1 more than the offset
248 # of the last desired character.
251 return self._payload[start:end]
254 class ProxSocketHelper(object):
256 def __init__(self, sock=None):
257 """ creates new prox instance """
258 super(ProxSocketHelper, self).__init__()
261 sock = socket.socket()
265 self.master_stats = None
267 def connect(self, ip, port):
268 """Connect to the prox instance on the remote system"""
269 self._sock.connect((ip, port))
271 def get_socket(self):
272 """ get the socket connected to the remote instance """
275 def _parse_socket_data(self, decoded_data, pkt_dump_only):
276 def get_newline_index():
277 return decoded_data.find('\n', index)
281 for newline_index in iter(get_newline_index, -1):
282 ret_str = decoded_data[index:newline_index]
285 mode, port_id, data_len = ret_str.split(',', 2)
287 mode, port_id, data_len = None, None, None
289 if mode != 'pktdump':
290 # Regular 1-line message. Stop reading from the socket.
291 LOG.debug("Regular response read")
294 LOG.debug("Packet dump header read: [%s]", ret_str)
296 # The line is a packet dump header. Parse it, read the
297 # packet payload, store the dump for later retrieval.
298 # Skip over the packet dump and continue processing: a
299 # 1-line response may follow the packet dump.
301 data_len = int(data_len)
302 data_start = newline_index + 1 # + 1 to skip over \n
303 data_end = data_start + data_len
304 sub_data = decoded_data[data_start:data_end]
305 pkt_payload = array.array('B', (ord(v) for v in sub_data))
306 pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
307 self._pkt_dumps.append(pkt_dump)
310 # Return boolean instead of string to signal
311 # successful reception of the packet dump.
312 LOG.debug("Packet dump stored, returning")
319 def get_data(self, pkt_dump_only=False, timeout=1):
320 """ read data from the socket """
322 # This method behaves slightly differently depending on whether it is
323 # called to read the response to a command (pkt_dump_only = 0) or if
324 # it is called specifically to read a packet dump (pkt_dump_only = 1).
326 # Packet dumps look like:
327 # pktdump,<port_id>,<data_len>\n
328 # <packet contents as byte array>\n
329 # This means the total packet dump message consists of 2 lines instead
332 # - Response for a command (pkt_dump_only = 0):
333 # 1) Read response from the socket until \n (end of message)
334 # 2a) If the response is a packet dump header (starts with "pktdump,"):
335 # - Read the packet payload and store the packet dump for later
337 # - Reset the state and restart from 1). Eventually state 2b) will
338 # be reached and the function will return.
339 # 2b) If the response is not a packet dump:
340 # - Return the received message as a string
342 # - Explicit request to read a packet dump (pkt_dump_only = 1):
343 # - Read the dump header and payload
344 # - Store the packet dump for later retrieval
345 # - Return True to signify a packet dump was successfully read
348 # recv() is blocking, so avoid calling it when no data is waiting.
349 ready = select.select([self._sock], [], [], timeout)
350 return bool(ready[0])
354 for status in iter(is_ready, False):
355 decoded_data = self._sock.recv(256).decode('utf-8')
356 ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
358 LOG.debug("Received data from socket: [%s]", ret_str)
359 return ret_str if status else ''
361 def put_command(self, to_send):
362 """ send data to the remote instance """
363 LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
365 # NOTE: sendall will block, we need a timeout
366 self._sock.sendall(to_send.encode('utf-8'))
367 except: # pylint: disable=bare-except
370 def get_packet_dump(self):
371 """ get the next packet dump """
373 return self._pkt_dumps.pop(0)
376 def stop_all_reset(self):
377 """ stop the remote instance and reset stats """
378 LOG.debug("Stop all and reset stats")
383 """ stop all cores on the remote instance """
384 LOG.debug("Stop all")
385 self.put_command("stop all\n")
388 def stop(self, cores, task=''):
389 """ stop specific cores on the remote instance """
390 LOG.debug("Stopping cores %s", cores)
391 self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
395 """ start all cores on the remote instance """
396 LOG.debug("Start all")
397 self.put_command("start all\n")
399 def start(self, cores):
400 """ start specific cores on the remote instance """
401 LOG.debug("Starting cores %s", cores)
402 self.put_command("start {}\n".format(join_non_strings(',', cores)))
405 def reset_stats(self):
406 """ reset the statistics on the remote instance """
407 LOG.debug("Reset stats")
408 self.put_command("reset stats\n")
411 def _run_template_over_cores(self, template, cores, *args):
413 self.put_command(template.format(core, *args))
415 def set_pkt_size(self, cores, pkt_size):
416 """ set the packet size to generate on the remote instance """
417 LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
419 self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
422 def set_value(self, cores, offset, value, length):
423 """ set value on the remote instance """
424 msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
425 LOG.debug(msg, cores, value, length, offset)
426 template = "set value {} 0 {} {} {}\n"
427 self._run_template_over_cores(template, cores, offset, value, length)
429 def reset_values(self, cores):
430 """ reset values on the remote instance """
431 LOG.debug("Set value for core(s) %s", cores)
432 self._run_template_over_cores("reset values {} 0\n", cores)
434 def set_speed(self, cores, speed, tasks=None):
435 """ set speed on the remote instance """
437 tasks = [0] * len(cores)
438 elif len(tasks) != len(cores):
439 LOG.error("set_speed: cores and tasks must have the same len")
440 LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
441 for (core, task) in list(zip(cores, tasks)):
442 self.put_command("speed {} {} {}\n".format(core, task, speed))
444 def slope_speed(self, cores_speed, duration, n_steps=0):
445 """will start to increase speed from 0 to N where N is taken from
446 a['speed'] for each a in cores_speed"""
447 # by default, each step will take 0.5 sec
449 n_steps = duration * 2
451 private_core_data = []
452 step_duration = float(duration) / n_steps
453 for core_data in cores_speed:
454 target = float(core_data['speed'])
455 private_core_data.append({
456 'cores': core_data['cores'],
458 'delta': target / n_steps,
463 deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
464 for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
465 time.sleep(step_duration)
466 for core_data in private_core_data:
467 core_data['current'] = core_data[key1] + core_data[key2]
468 self.set_speed(core_data['cores'], core_data['current'])
470 def set_pps(self, cores, pps, pkt_size):
471 """ set packets per second for specific cores on the remote instance """
472 msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
473 LOG.debug(msg, cores, pps, pkt_size)
475 # speed in percent of line-rate
476 speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
477 self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
479 def lat_stats(self, cores, task=0):
480 """Get the latency statistics from the remote system"""
481 # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
486 self.put_command("lat stats {} {} \n".format(core, task))
487 ret = self.get_data()
490 lat_min[core], lat_max[core], lat_avg[core] = \
491 tuple(int(n) for n in ret.split(",")[:3])
493 except (AttributeError, ValueError, TypeError):
496 return lat_min, lat_max, lat_avg
498 def get_all_tot_stats(self):
499 self.put_command("tot stats\n")
500 all_stats_str = self.get_data().split(",")
501 if len(all_stats_str) != 4:
504 all_stats = TotStatsTuple(int(v) for v in all_stats_str)
505 self.master_stats = all_stats
509 return self.get_all_tot_stats()[3]
511 def core_stats(self, cores, task=0):
512 """Get the receive statistics from the remote system"""
513 rx = tx = drop = tsc = 0
515 self.put_command("core stats {} {}\n".format(core, task))
516 ret = self.get_data().split(",")
521 return rx, tx, drop, tsc
523 def port_stats(self, ports):
524 """get counter values from a specific port"""
525 tot_result = [0] * 12
527 self.put_command("port_stats {}\n".format(port))
528 ret = [try_int(s, 0) for s in self.get_data().split(",")]
529 tot_result = [sum(x) for x in zip(tot_result, ret)]
533 def measure_tot_stats(self):
534 start = self.get_all_tot_stats()
535 container = {'start_tot': start}
539 container['end_tot'] = end = self.get_all_tot_stats()
541 container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))
544 """Get the total statistics from the remote system"""
545 stats = self.get_all_tot_stats()
548 def tot_ierrors(self):
549 """Get the total ierrors from the remote system"""
550 self.put_command("tot ierrors tot\n")
551 recv = self.get_data().split(',')
552 tot_ierrors = int(recv[0])
554 return tot_ierrors, tsc
556 def set_count(self, count, cores):
557 """Set the number of packets to send on the specified core"""
558 self._run_template_over_cores("count {} 0 {}\n", cores, count)
560 def dump_rx(self, core_id, task_id=0, count=1):
561 """Activate dump on rx on the specified core"""
562 LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
563 self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
564 time.sleep(1.5) # Give PROX time to set up packet dumping
572 """ stop all cores on the remote instance """
573 LOG.debug("Quit prox")
574 self.put_command("quit\n")
577 def force_quit(self):
578 """ stop all cores on the remote instance """
579 LOG.debug("Force Quit prox")
580 self.put_command("quit_force\n")
584 _LOCAL_OBJECT = object()
587 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
588 # the actual app is lowercase
590 # not used for Prox but added for consistency
593 LUA_PARAMETER_NAME = ""
594 LUA_PARAMETER_PEER = {
599 CONFIG_QUEUE_TIMEOUT = 120
601 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
602 self.remote_path = None
603 super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
604 self.remote_prox_file_name = None
605 self._prox_config_data = None
606 self.additional_files = {}
607 self.config_queue = Queue()
608 # allow_exit_without_flush
609 self.config_queue.cancel_join_thread()
610 self._global_section = None
613 def prox_config_data(self):
614 if self._prox_config_data is None:
615 # this will block, but it needs too
616 self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
617 return self._prox_config_data
620 def global_section(self):
621 if self._global_section is None and self.prox_config_data:
622 self._global_section = self.find_section("global")
623 return self._global_section
625 def find_section(self, name, default=_LOCAL_OBJECT):
626 result = next((value for key, value in self.prox_config_data if key == name), default)
627 if result is _LOCAL_OBJECT:
628 raise KeyError('{} not found in Prox config'.format(name))
631 def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
632 section = self.find_section(section_name, [])
633 result = next((value for key, value in section if key == section_key), default)
634 if result is _LOCAL_OBJECT:
635 template = '{} not found in {} section of Prox config'
636 raise KeyError(template.format(section_key, section_name))
639 def copy_to_target(self, config_file_path, prox_file):
640 remote_path = os.path.join("/tmp", prox_file)
641 self.ssh_helper.put(config_file_path, remote_path)
645 def _get_tx_port(section, sections):
647 for item in sections[section]:
648 if item[0] == "tx port":
649 iface_port = re.findall(r'\d+', item[1])
650 # do we want the last one?
651 # if yes, then can we reverse?
652 return int(iface_port[0])
655 def _replace_quoted_with_value(quoted, value, count=1):
656 new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
659 def _insert_additional_file(self, value):
660 file_str = value.split('"')
661 base_name = os.path.basename(file_str[1])
662 file_str[1] = self.additional_files[base_name]
663 return '"'.join(file_str)
665 def generate_prox_config_file(self, config_path):
667 prox_config = ConfigParser(config_path, sections)
670 # Ensure MAC is set "hardware"
671 all_ports = self.vnfd_helper.port_pairs.all_ports
672 # use dpdk port number
673 for port_name in all_ports:
674 port_num = self.vnfd_helper.port_num(port_name)
675 port_section_name = "port {}".format(port_num)
676 for section_name, section in sections:
677 if port_section_name != section_name:
680 for section_data in section:
681 if section_data[0] == "mac":
682 section_data[1] = "hardware"
685 for _, section in sections:
686 for section_data in section:
687 item_key, item_val = section_data
688 if item_val.startswith("@@dst_mac"):
689 tx_port_iter = re.finditer(r'\d+', item_val)
690 tx_port_no = int(next(tx_port_iter).group(0))
691 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
692 mac = intf["virtual-interface"]["dst_mac"]
693 section_data[1] = mac.replace(":", " ", 6)
695 if item_key == "dst mac" and item_val.startswith("@@"):
696 tx_port_iter = re.finditer(r'\d+', item_val)
697 tx_port_no = int(next(tx_port_iter).group(0))
698 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
699 mac = intf["virtual-interface"]["dst_mac"]
700 section_data[1] = mac
702 # if addition file specified in prox config
703 if not self.additional_files:
706 for section_name, section in sections:
707 for section_data in section:
709 if section_data[0].startswith("dofile"):
710 section_data[0] = self._insert_additional_file(section_data[0])
712 if section_data[1].startswith("dofile"):
713 section_data[1] = self._insert_additional_file(section_data[1])
714 except: # pylint: disable=bare-except
720 def write_prox_lua(lua_config):
722 Write an .ini-format config file for PROX (parameters.lua)
723 PROX does not allow a space before/after the =, so we need
727 for key in lua_config:
728 value = '"' + lua_config[key] + '"'
729 if key == "__name__":
731 if value is not None and value != '@':
732 key = "=".join((key, str(value).replace('\n', '\n\t')))
735 key = str(key).replace('\n', '\n\t')
737 return os.linesep.join(out)
740 def write_prox_config(prox_config):
742 Write an .ini-format config file for PROX
743 PROX does not allow a space before/after the =, so we need
747 for (section_name, section) in prox_config:
748 out.append("[{}]".format(section_name))
751 if key == "__name__":
753 if value is not None and value != '@':
754 key = "=".join((key, str(value).replace('\n', '\n\t')))
757 key = str(key).replace('\n', '\n\t')
759 return os.linesep.join(out)
761 def put_string_to_file(self, s, remote_path):
762 file_obj = cStringIO(s)
763 self.ssh_helper.put_file_obj(file_obj, remote_path)
766 def generate_prox_lua_file(self):
768 all_ports = self.vnfd_helper.port_pairs.all_ports
769 for port_name in all_ports:
770 port_num = self.vnfd_helper.port_num(port_name)
771 intf = self.vnfd_helper.find_interface(name=port_name)
772 vintf = intf['virtual-interface']
773 p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
774 p["src_mac{0}".format(port_num)] = vintf["local_mac"]
778 def upload_prox_lua(self, config_file, lua_data):
779 # prox can't handle spaces around ' = ' so use custom method
780 out = StringIO(self.write_prox_lua(lua_data))
782 remote_path = os.path.join("/tmp", config_file)
783 self.ssh_helper.put_file_obj(out, remote_path)
787 def upload_prox_config(self, config_file, prox_config_data):
788 # prox can't handle spaces around ' = ' so use custom method
789 out = StringIO(self.write_prox_config(prox_config_data))
791 remote_path = os.path.join("/tmp", config_file)
792 self.ssh_helper.put_file_obj(out, remote_path)
796 def build_config_file(self):
797 task_path = self.scenario_helper.task_path
798 options = self.scenario_helper.options
799 config_path = options['prox_config']
800 config_file = os.path.basename(config_path)
801 config_path = find_relative_file(config_path, task_path)
802 self.additional_files = {}
805 if options['prox_generate_parameter']:
807 self.lua = self.generate_prox_lua_file()
808 if len(self.lua) > 0:
809 self.upload_prox_lua("parameters.lua", self.lua)
810 except: # pylint: disable=bare-except
813 prox_files = options.get('prox_files', [])
814 if isinstance(prox_files, six.string_types):
815 prox_files = [prox_files]
816 for key_prox_file in prox_files:
817 base_prox_file = os.path.basename(key_prox_file)
818 key_prox_path = find_relative_file(key_prox_file, task_path)
819 remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
820 self.additional_files[base_prox_file] = remote_prox_file
822 self._prox_config_data = self.generate_prox_config_file(config_path)
823 # copy config to queue so we can read it from traffic_runner process
824 self.config_queue.put(self._prox_config_data)
825 self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
827 def build_config(self):
828 self.build_config_file()
830 options = self.scenario_helper.options
831 prox_args = options['prox_args']
832 tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)
834 self.pipeline_kwargs = {
835 'tool_path': tool_path,
836 'tool_dir': os.path.dirname(tool_path),
837 'cfg_file': self.remote_path,
838 'args': ' '.join(' '.join([str(k), str(v) if v else ''])
839 for k, v in prox_args.items())
842 cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
843 "{args} -f {cfg_file} '")
844 return cmd_template.format(**self.pipeline_kwargs)
847 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
848 class ProxResourceHelper(ClientResourceHelper):
850 RESOURCE_WORD = 'prox'
857 def find_pci(pci, bound_pci):
858 # we have to substring match PCI bus address from the end
859 return any(b.endswith(pci) for b in bound_pci)
861 def __init__(self, setup_helper):
862 super(ProxResourceHelper, self).__init__(setup_helper)
863 self.mgmt_interface = self.vnfd_helper.mgmt_interface
864 self._user = self.mgmt_interface["user"]
865 self._ip = self.mgmt_interface["ip"]
868 self._vpci_to_if_name_map = None
869 self.additional_file = {}
870 self.remote_prox_file_name = None
875 self._test_type = None
880 self.client = self._connect()
885 if self._test_type is None:
886 self._test_type = self.setup_helper.find_in_section('global', 'name', None)
887 return self._test_type
889 def run_traffic(self, traffic_profile):
890 self._queue.cancel_join_thread()
894 traffic_profile.init(self._queue)
895 # this frees up the run_traffic loop
896 self.client_started.value = 1
898 while not self._terminated.value:
899 # move it all to traffic_profile
900 self._run_traffic_once(traffic_profile)
902 def _run_traffic_once(self, traffic_profile):
903 traffic_profile.execute_traffic(self)
904 if traffic_profile.done:
905 self._queue.put({'done': True})
906 LOG.debug("tg_prox done")
907 self._terminated.value = 1
909 # For VNF use ResourceHelper method to collect KPIs directly.
910 # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
911 def collect_collectd_kpi(self):
912 return self._collect_resource_kpi()
914 def collect_kpi(self):
915 result = super(ProxResourceHelper, self).collect_kpi()
916 # add in collectd kpis manually
918 result['collect_stats'] = self._collect_resource_kpi()
922 # should not be called, use VNF terminate
923 raise NotImplementedError()
926 return self.sut # force connection
928 def execute(self, cmd, *args, **kwargs):
929 func = getattr(self.sut, cmd, None)
931 return func(*args, **kwargs)
934 def _connect(self, client=None):
935 """Run and connect to prox on the remote system """
936 # De-allocating a large amount of hugepages takes some time. If a new
937 # PROX instance is started immediately after killing the previous one,
938 # it might not be able to allocate hugepages, because they are still
939 # being freed. Hence the -w switch.
940 # self.connection.execute("sudo killall -w Prox 2>/dev/null")
941 # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
942 # -f ./handle_none-4.cfg"
943 # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
945 # + "export RTE_TARGET=" + self._dpdk_target + ";" \
946 # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
948 # + "./build/Prox " + prox_args
949 # log.debug("Starting PROX with command [%s]", prox_cmd)
950 # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
951 # self._ip, prox_cmd))
953 client = ProxSocketHelper()
955 # try connecting to Prox for 60s
956 for _ in range(RETRY_SECONDS):
957 time.sleep(RETRY_INTERVAL)
959 client.connect(self._ip, PROX_PORT)
960 except (socket.gaierror, socket.error):
965 msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
966 raise Exception(msg.format(self._ip, PROX_PORT))
969 class ProxDataHelper(object):
971 def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
972 super(ProxDataHelper, self).__init__()
973 self.vnfd_helper = vnfd_helper
975 self.pkt_size = pkt_size
977 self.tolerated_loss = tolerated_loss
978 self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
980 self.measured_stats = None
982 self._totals_and_pps = None
983 self.result_tuple = None
986 def totals_and_pps(self):
987 if self._totals_and_pps is None:
988 rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
989 pps = self.value / 100.0 * self.line_rate_to_pps()
990 self._totals_and_pps = rx_total, tx_total, pps
991 return self._totals_and_pps
995 return self.totals_and_pps[0]
999 return self.totals_and_pps[1]
1003 return self.totals_and_pps[2]
1008 for port_name, port_num in self.vnfd_helper.ports_iter():
1010 port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
1011 samples[port_name] = {
1012 "in_packets": port_rx_total,
1013 "out_packets": port_tx_total,
1015 except (KeyError, TypeError, NameError, MemoryError, ValueError,
1016 SystemError, BufferError):
1017 samples[port_name] = {
1023 def __enter__(self):
1024 self.check_interface_count()
1027 def __exit__(self, exc_type, exc_val, exc_tb):
1030 def make_tuple(self):
1031 if self.result_tuple:
1034 self.result_tuple = ProxTestDataTuple(
1035 self.tolerated_loss,
1037 self.measured_stats['delta'].rx,
1038 self.measured_stats['delta'].tx,
1039 self.measured_stats['delta'].tsc,
1045 self.result_tuple.log_data()
1048 def measure_tot_stats(self):
1049 with self.sut.measure_tot_stats() as self.measured_stats:
1052 def check_interface_count(self):
1053 # do this assert in init? unless we expect interface count to
1054 # change from one run to another run...
1055 assert self.port_count in {1, 2, 4}, \
1056 "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
1058 def capture_tsc_hz(self):
1059 self.tsc_hz = float(self.sut.hz())
1061 def line_rate_to_pps(self):
1062 # NOTE: to fix, don't hardcode 10Gb/s
1063 return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1066 class ProxProfileHelper(object):
1068 __prox_profile_type__ = "Generic"
1070 PROX_CORE_GEN_MODE = "gen"
1071 PROX_CORE_LAT_MODE = "lat"
1074 def get_cls(cls, helper_type):
1075 """Return class of specified type."""
1077 return ProxProfileHelper
1079 for profile_helper_class in utils.itersubclasses(cls):
1080 if helper_type == profile_helper_class.__prox_profile_type__:
1081 return profile_helper_class
1083 return ProxProfileHelper
1086 def make_profile_helper(cls, resource_helper):
1087 return cls.get_cls(resource_helper.test_type)(resource_helper)
1089 def __init__(self, resource_helper):
1090 super(ProxProfileHelper, self).__init__()
1091 self.resource_helper = resource_helper
1092 self._cpu_topology = None
1093 self._test_cores = None
1094 self._latency_cores = None
1097 def cpu_topology(self):
1098 if not self._cpu_topology:
1099 stdout = io.BytesIO()
1100 self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1101 self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1102 return self._cpu_topology
1105 def test_cores(self):
1106 if not self._test_cores:
1107 self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1108 return self._test_cores
1111 def latency_cores(self):
1112 if not self._latency_cores:
1113 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1114 return self._latency_cores
1117 def traffic_context(self, pkt_size, value):
1119 self.sut.reset_stats()
1121 self.sut.set_pkt_size(self.test_cores, pkt_size)
1122 self.sut.set_speed(self.test_cores, value)
1123 self.sut.start_all()
1128 def get_cores(self, mode):
1131 for section_name, section in self.setup_helper.prox_config_data:
1132 if not section_name.startswith("core"):
1135 for key, value in section:
1136 if key == "mode" and value == mode:
1137 core_tuple = CoreSocketTuple(section_name)
1138 core = core_tuple.core_id
1143 def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1144 data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1146 with data_helper, self.traffic_context(pkt_size, value):
1147 with data_helper.measure_tot_stats():
1148 time.sleep(duration)
1149 # Getting statistics to calculate PPS at right speed....
1150 data_helper.capture_tsc_hz()
1151 data_helper.latency = self.get_latency()
1153 return data_helper.result_tuple, data_helper.samples
1155 def get_latency(self):
1157 :return: return lat_min, lat_max, lat_avg
1161 if not self._latency_cores:
1162 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1164 if self._latency_cores:
1165 return self.sut.lat_stats(self._latency_cores)
1168 def terminate(self):
1171 def __getattr__(self, item):
1172 return getattr(self.resource_helper, item)
1175 class ProxMplsProfileHelper(ProxProfileHelper):
1177 __prox_profile_type__ = "MPLS tag/untag"
1179 def __init__(self, resource_helper):
1180 super(ProxMplsProfileHelper, self).__init__(resource_helper)
1181 self._cores_tuple = None
1184 def mpls_cores(self):
1185 if not self._cores_tuple:
1186 self._cores_tuple = self.get_cores_mpls()
1187 return self._cores_tuple
1190 def tagged_cores(self):
1191 return self.mpls_cores[0]
1194 def plain_cores(self):
1195 return self.mpls_cores[1]
1197 def get_cores_mpls(self):
1200 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1201 if not section_name.startswith("core"):
1204 if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1207 for item_key, item_value in section:
1208 if item_key != 'name':
1211 if item_value.startswith("tag"):
1212 core_tuple = CoreSocketTuple(section_name)
1213 core_tag = core_tuple.core_id
1214 cores_tagged.append(core_tag)
1216 elif item_value.startswith("udp"):
1217 core_tuple = CoreSocketTuple(section_name)
1218 core_udp = core_tuple.core_id
1219 cores_plain.append(core_udp)
1221 return cores_tagged, cores_plain
1224 def traffic_context(self, pkt_size, value):
1226 self.sut.reset_stats()
1228 self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1229 self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1230 self.sut.set_speed(self.tagged_cores, value)
1231 ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1232 self.sut.set_speed(self.plain_cores, value * ratio)
1233 self.sut.start_all()
1239 class ProxBngProfileHelper(ProxProfileHelper):
1241 __prox_profile_type__ = "BNG gen"
1243 def __init__(self, resource_helper):
1244 super(ProxBngProfileHelper, self).__init__(resource_helper)
1245 self._cores_tuple = None
1248 def bng_cores(self):
1249 if not self._cores_tuple:
1250 self._cores_tuple = self.get_cores_gen_bng_qos()
1251 return self._cores_tuple
1254 def cpe_cores(self):
1255 return self.bng_cores[0]
1258 def inet_cores(self):
1259 return self.bng_cores[1]
1262 def arp_cores(self):
1263 return self.bng_cores[2]
1266 def arp_task_cores(self):
1267 return self.bng_cores[3]
1270 def all_rx_cores(self):
1271 return self.latency_cores
1273 def get_cores_gen_bng_qos(self):
1277 arp_tasks_core = [0]
1278 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1279 if not section_name.startswith("core"):
1282 if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1285 for item_key, item_value in section:
1286 if item_key != 'name':
1289 if item_value.startswith("cpe"):
1290 core_tuple = CoreSocketTuple(section_name)
1291 cpe_core = core_tuple.core_id
1292 cpe_cores.append(cpe_core)
1294 elif item_value.startswith("inet"):
1295 core_tuple = CoreSocketTuple(section_name)
1296 inet_core = core_tuple.core_id
1297 inet_cores.append(inet_core)
1299 elif item_value.startswith("arp"):
1300 core_tuple = CoreSocketTuple(section_name)
1301 arp_core = core_tuple.core_id
1302 arp_cores.append(arp_core)
1304 # We check the tasks/core separately
1305 if item_value.startswith("arp_task"):
1306 core_tuple = CoreSocketTuple(section_name)
1307 arp_task_core = core_tuple.core_id
1308 arp_tasks_core.append(arp_task_core)
1310 return cpe_cores, inet_cores, arp_cores, arp_tasks_core
1313 def traffic_context(self, pkt_size, value):
1314 # Tester is sending packets at the required speed already after
1315 # setup_test(). Just get the current statistics, sleep the required
1316 # amount of time and calculate packet loss.
1317 inet_pkt_size = pkt_size
1318 cpe_pkt_size = pkt_size - 24
1319 ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1321 curr_up_speed = curr_down_speed = 0
1322 max_up_speed = max_down_speed = value
1324 max_down_speed = value * ratio
1326 max_up_speed = value / ratio
1332 # Flush any packets in the NIC RX buffers, otherwise the stats will be
1334 self.sut.start(self.all_rx_cores)
1336 self.sut.stop(self.all_rx_cores)
1338 self.sut.reset_stats()
1340 self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1341 self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1343 self.sut.reset_values(self.cpe_cores)
1344 self.sut.reset_values(self.inet_cores)
1346 # Set correct IP and UDP lengths in packet headers
1348 # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1349 self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1350 # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1351 self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1354 # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1355 self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1356 # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1357 self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1358 # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1359 self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1361 # Sending ARP to initialize tables - need a few seconds of generation
1362 # to make sure all CPEs are initialized
1363 LOG.info("Initializing SUT: sending ARP packets")
1364 self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
1365 self.sut.set_speed(self.inet_cores, curr_up_speed)
1366 self.sut.set_speed(self.cpe_cores, curr_down_speed)
1367 self.sut.start(self.arp_cores)
1370 # Ramp up the transmission speed. First go to the common speed, then
1371 # increase steps for the faster one.
1372 self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1374 LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1376 while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1377 # The min(..., ...) takes care of 1) floating point rounding errors
1378 # that could make curr_*_speed to be slightly greater than
1379 # max_*_speed and 2) max_*_speed not being an exact multiple of
1381 if curr_up_speed < max_up_speed:
1382 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1383 if curr_down_speed < max_down_speed:
1384 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1386 self.sut.set_speed(self.inet_cores, curr_up_speed)
1387 self.sut.set_speed(self.cpe_cores, curr_down_speed)
1388 time.sleep(self.step_time)
1390 LOG.info("Target speeds reached. Starting real test.")
1394 self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1395 LOG.info("Test ended. Flushing NIC buffers")
1396 self.sut.start(self.all_rx_cores)
1398 self.sut.stop(self.all_rx_cores)
1400 def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1401 data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1403 with data_helper, self.traffic_context(pkt_size, value):
1404 with data_helper.measure_tot_stats():
1405 time.sleep(duration)
1406 # Getting statistics to calculate PPS at right speed....
1407 data_helper.capture_tsc_hz()
1408 data_helper.latency = self.get_latency()
1410 return data_helper.result_tuple, data_helper.samples
1413 class ProxVpeProfileHelper(ProxProfileHelper):
1415 __prox_profile_type__ = "vPE gen"
1417 def __init__(self, resource_helper):
1418 super(ProxVpeProfileHelper, self).__init__(resource_helper)
1419 self._cores_tuple = None
1420 self._ports_tuple = None
1423 def vpe_cores(self):
1424 if not self._cores_tuple:
1425 self._cores_tuple = self.get_cores_gen_vpe()
1426 return self._cores_tuple
1429 def cpe_cores(self):
1430 return self.vpe_cores[0]
1433 def inet_cores(self):
1434 return self.vpe_cores[1]
1437 def all_rx_cores(self):
1438 return self.latency_cores
1441 def vpe_ports(self):
1442 if not self._ports_tuple:
1443 self._ports_tuple = self.get_ports_gen_vpe()
1444 return self._ports_tuple
1447 def cpe_ports(self):
1448 return self.vpe_ports[0]
1451 def inet_ports(self):
1452 return self.vpe_ports[1]
1454 def get_cores_gen_vpe(self):
1457 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1458 if not section_name.startswith("core"):
1461 if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1464 for item_key, item_value in section:
1465 if item_key != 'name':
1468 if item_value.startswith("cpe"):
1469 core_tuple = CoreSocketTuple(section_name)
1470 core_tag = core_tuple.core_id
1471 cpe_cores.append(core_tag)
1473 elif item_value.startswith("inet"):
1474 core_tuple = CoreSocketTuple(section_name)
1475 inet_core = core_tuple.core_id
1476 inet_cores.append(inet_core)
1478 return cpe_cores, inet_cores
1480 def get_ports_gen_vpe(self):
1484 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1485 if not section_name.startswith("port"):
1487 tx_port_iter = re.finditer(r'\d+', section_name)
1488 tx_port_no = int(next(tx_port_iter).group(0))
1490 for item_key, item_value in section:
1491 if item_key != 'name':
1494 if item_value.startswith("cpe"):
1495 cpe_ports.append(tx_port_no)
1497 elif item_value.startswith("inet"):
1498 inet_ports.append(tx_port_no)
1500 return cpe_ports, inet_ports
1503 def traffic_context(self, pkt_size, value):
1504 # Calculate the target upload and download speed. The upload and
1505 # download packets have different packet sizes, so in order to get
1506 # equal bandwidth usage, the ratio of the speeds has to match the ratio
1507 # of the packet sizes.
1508 cpe_pkt_size = pkt_size
1509 inet_pkt_size = pkt_size - 4
1510 ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1512 curr_up_speed = curr_down_speed = 0
1513 max_up_speed = max_down_speed = value
1515 max_down_speed = value * ratio
1517 max_up_speed = value / ratio
1519 # Adjust speed when multiple cores per port are used to generate traffic
1520 if len(self.cpe_ports) != len(self.cpe_cores):
1521 max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
1522 if len(self.inet_ports) != len(self.inet_cores):
1523 max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)
1529 # Flush any packets in the NIC RX buffers, otherwise the stats will be
1531 self.sut.start(self.all_rx_cores)
1533 self.sut.stop(self.all_rx_cores)
1535 self.sut.reset_stats()
1537 self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1538 self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1540 self.sut.reset_values(self.cpe_cores)
1541 self.sut.reset_values(self.inet_cores)
1543 # Set correct IP and UDP lengths in packet headers
1544 # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1545 self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1546 # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1547 self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1549 # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1550 self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1551 # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
1552 self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)
1554 self.sut.set_speed(self.inet_cores, curr_up_speed)
1555 self.sut.set_speed(self.cpe_cores, curr_down_speed)
1557 # Ramp up the transmission speed. First go to the common speed, then
1558 # increase steps for the faster one.
1559 self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)
1561 LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1563 while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1564 # The min(..., ...) takes care of 1) floating point rounding errors
1565 # that could make curr_*_speed to be slightly greater than
1566 # max_*_speed and 2) max_*_speed not being an exact multiple of
1568 if curr_up_speed < max_up_speed:
1569 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1570 if curr_down_speed < max_down_speed:
1571 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1573 self.sut.set_speed(self.inet_cores, curr_up_speed)
1574 self.sut.set_speed(self.cpe_cores, curr_down_speed)
1575 time.sleep(self.step_time)
1577 LOG.info("Target speeds reached. Starting real test.")
1581 self.sut.stop(self.cpe_cores + self.inet_cores)
1582 LOG.info("Test ended. Flushing NIC buffers")
1583 self.sut.start(self.all_rx_cores)
1585 self.sut.stop(self.all_rx_cores)
1587 def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1588 data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1590 with data_helper, self.traffic_context(pkt_size, value):
1591 with data_helper.measure_tot_stats():
1592 time.sleep(duration)
1593 # Getting statistics to calculate PPS at right speed....
1594 data_helper.capture_tsc_hz()
1595 data_helper.latency = self.get_latency()
1597 return data_helper.result_tuple, data_helper.samples
1600 class ProxlwAFTRProfileHelper(ProxProfileHelper):
1602 __prox_profile_type__ = "lwAFTR gen"
1604 def __init__(self, resource_helper):
1605 super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
1606 self._cores_tuple = None
1607 self._ports_tuple = None
1609 self.step_time = 0.5
1612 def _lwaftr_cores(self):
1613 if not self._cores_tuple:
1614 self._cores_tuple = self._get_cores_gen_lwaftr()
1615 return self._cores_tuple
1618 def tun_cores(self):
1619 return self._lwaftr_cores[0]
1622 def inet_cores(self):
1623 return self._lwaftr_cores[1]
1626 def _lwaftr_ports(self):
1627 if not self._ports_tuple:
1628 self._ports_tuple = self._get_ports_gen_lw_aftr()
1629 return self._ports_tuple
1632 def tun_ports(self):
1633 return self._lwaftr_ports[0]
1636 def inet_ports(self):
1637 return self._lwaftr_ports[1]
1640 def all_rx_cores(self):
1641 return self.latency_cores
1643 def _get_cores_gen_lwaftr(self):
1646 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1647 if not section_name.startswith("core"):
1650 if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1653 core_tuple = CoreSocketTuple(section_name)
1654 core_tag = core_tuple.core_id
1655 for item_value in (v for k, v in section if k == 'name'):
1656 if item_value.startswith('tun'):
1657 tun_cores.append(core_tag)
1658 elif item_value.startswith('inet'):
1659 inet_cores.append(core_tag)
1661 return tun_cores, inet_cores
1663 def _get_ports_gen_lw_aftr(self):
1667 re_port = re.compile(r'port (\d+)')
1668 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1669 match = re_port.search(section_name)
1673 tx_port_no = int(match.group(1))
1674 for item_value in (v for k, v in section if k == 'name'):
1675 if item_value.startswith('lwB4'):
1676 tun_ports.append(tx_port_no)
1677 elif item_value.startswith('inet'):
1678 inet_ports.append(tx_port_no)
1680 return tun_ports, inet_ports
1683 def _resize(len1, len2):
1686 return 1.0 * len1 / len2
1689 def traffic_context(self, pkt_size, value):
1690 # Tester is sending packets at the required speed already after
1691 # setup_test(). Just get the current statistics, sleep the required
1692 # amount of time and calculate packet loss.
1693 tun_pkt_size = pkt_size
1694 inet_pkt_size = pkt_size - 40
1695 ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)
1697 curr_up_speed = curr_down_speed = 0
1698 max_up_speed = max_down_speed = value
1700 max_up_speed = value / ratio
1702 # Adjust speed when multiple cores per port are used to generate traffic
1703 if len(self.tun_ports) != len(self.tun_cores):
1704 max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
1705 if len(self.inet_ports) != len(self.inet_cores):
1706 max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))
1712 # Flush any packets in the NIC RX buffers, otherwise the stats will be
1714 self.sut.start(self.all_rx_cores)
1716 self.sut.stop(self.all_rx_cores)
1718 self.sut.reset_stats()
1720 self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1721 self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)
1723 self.sut.reset_values(self.tun_cores)
1724 self.sut.reset_values(self.inet_cores)
1726 # Set correct IP and UDP lengths in packet headers
1728 # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40) , CRC(4)
1729 self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
1730 # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
1731 self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
1732 # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
1733 self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)
1736 # IP length (byte 20): 22 for MAC(12), EthType(2), CRC(4)
1737 self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
1738 # UDP length (byte 42): 42 for MAC(12), EthType(2), IP(20), UPD(8), CRC(4)
1739 self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)
1741 LOG.info("Initializing SUT: sending lwAFTR packets")
1742 self.sut.set_speed(self.inet_cores, curr_up_speed)
1743 self.sut.set_speed(self.tun_cores, curr_down_speed)
1746 # Ramp up the transmission speed. First go to the common speed, then
1747 # increase steps for the faster one.
1748 self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)
1750 LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1752 while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1753 # The min(..., ...) takes care of 1) floating point rounding errors
1754 # that could make curr_*_speed to be slightly greater than
1755 # max_*_speed and 2) max_*_speed not being an exact multiple of
1757 if curr_up_speed < max_up_speed:
1758 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1759 if curr_down_speed < max_down_speed:
1760 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1762 self.sut.set_speed(self.inet_cores, curr_up_speed)
1763 self.sut.set_speed(self.tun_cores, curr_down_speed)
1764 time.sleep(self.step_time)
1766 LOG.info("Target speeds reached. Starting real test.")
1770 self.sut.stop(self.tun_cores + self.inet_cores)
1771 LOG.info("Test ended. Flushing NIC buffers")
1772 self.sut.start(self.all_rx_cores)
1774 self.sut.stop(self.all_rx_cores)
1776 def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1777 data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1779 with data_helper, self.traffic_context(pkt_size, value):
1780 with data_helper.measure_tot_stats():
1781 time.sleep(duration)
1782 # Getting statistics to calculate PPS at right speed....
1783 data_helper.capture_tsc_hz()
1784 data_helper.latency = self.get_latency()
1786 return data_helper.result_tuple, data_helper.samples