1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
33 from yardstick.common import utils
34 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
35 from yardstick.network_services.helpers.iniparser import ConfigParser
36 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
# Module-level logger; every helper in this module logs through it.
LOG = logging.getLogger(__name__)
# The logger level is pinned to DEBUG here, independent of caller configuration.
LOG.setLevel(logging.DEBUG)
# Table of configuration options.  Each entry is a 4-tuple:
# (dict key, section, key, default value).
CONFIGURATION_OPTIONS = (
    # dict key           section     key               default value
    ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
    ('testDuration', 'general', 'test_duration', 5.0),
    ('testPrecision', 'general', 'test_precision', 1.0),
    ('tests', 'general', 'tests', None),
    ('toleratedLoss', 'general', 'tolerated_loss', 0.0),

    ('logFile', 'logging', 'file', 'dats.log'),
    ('logDateFormat', 'logging', 'datefmt', None),
    ('logLevel', 'logging', 'level', 'INFO'),
    ('logOverwrite', 'logging', 'overwrite', 1),

    ('testerIp', 'tester', 'ip', None),
    ('testerUser', 'tester', 'user', 'root'),
    ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
    ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
    ('testerSocketId', 'tester', 'socket_id', 0),

    ('sutIp', 'sut', 'ip', None),
    ('sutUser', 'sut', 'user', 'root'),
    ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
    ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
    ('sutSocketId', 'sut', 'socket_id', 0),
)
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Identify a core by core id, socket id and hyperthread marker.

    The tuple is built either from a core spec string matching ``CORE_RE``
    (for example ``"core 1s0h"``) or from explicit
    ``core_id, socket_id, hyperthread`` values.
    """

    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        """Create the tuple from a core spec string or from explicit values.

        Raises:
            ValueError: if the arguments cannot be turned into a core spec.
        """
        try:
            matches = cls.CORE_RE.search(str(args[0]))
            if matches:
                # a spec string was given: use the captured groups instead
                args = matches.groups()

            return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
                                                       'h' if args[2] else '')

        except (AttributeError, TypeError, IndexError, ValueError):
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        """Return True when the hyperthread marker is set."""
        return self.hyperthread == 'h'

    @property
    def index(self):
        """Position in the sorted topology entry: 1 for a hyperthread, else 0."""
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        """Look this core up in ``cpu_topology``.

        Args:
            cpu_topology: nested mapping indexed by socket id, then core id;
                the entry found there must offer ``values()`` yielding
                sortable sequences.

        Returns:
            The first element of the sorted entry selected by ``index``.

        Raises:
            ValueError: if the socket, core or hyperthread does not exist.
        """
        try:
            socket_core_match = cpu_topology[self.socket_id][self.core_id]
            sorted_match = sorted(socket_core_match.values())
            return sorted_match[self.index][0]
        except (KeyError, IndexError):
            template = "Core {}{} on socket {} does not exist"
            raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    """Total statistics as the four fields ``rx``, ``tx``, ``tsc`` and ``hz``.

    Accepts either four positional values or a single non-string iterable
    (list, tuple, generator, ...) that yields the four values.
    """

    def __new__(cls, *args):
        # An explicit type check replaces the former ``assert``: assertions
        # are stripped under ``python -O``, which would make a string first
        # argument get exploded into characters.
        if args and not isinstance(args[0], str):
            try:
                args = tuple(args[0])
            except TypeError:
                # first argument is a plain scalar: keep the arguments as given
                pass

        return super(TotStatsTuple, cls).__new__(cls, *args)
class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
                                                        'delta_tx,delta_tsc,'
                                                        'latency,rx_total,tx_total,pps')):
    """Raw numbers of one test run plus the values derived from them."""

    @property
    def pkt_loss(self):
        """Dropped packets as a percentage of ``tx_total`` (100.0 if nothing was sent)."""
        try:
            return 1e2 * self.drop_total / float(self.tx_total)
        except ZeroDivisionError:
            return 100.0

    @property
    def mpps(self):
        """Effective throughput in Mpps; raises ZeroDivisionError if ``delta_tsc`` is 0."""
        # calculate the effective throughput in Mpps
        return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6

    @property
    def can_be_lost(self):
        """Number of packets that may be dropped, given ``tolerated`` percent."""
        return int(self.tx_total * self.tolerated / 1e2)

    @property
    def drop_total(self):
        """Packets sent but not received."""
        return self.tx_total - self.rx_total

    @property
    def success(self):
        """True when the drops stay within the tolerated amount."""
        return self.drop_total <= self.can_be_lost

    def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
        """Build the samples dictionary for this run.

        Args:
            pkt_size: packet size, reported under ``"PktSize"``.
            pkt_loss: loss value to report; defaults to ``self.pkt_loss``.
            port_samples: optional extra entries merged into the result.

        Returns:
            dict with the throughput and drop values, the optional port
            samples, and one latency entry per available latency value.
        """
        if pkt_loss is None:
            pkt_loss = self.pkt_loss

        if port_samples is None:
            port_samples = {}

        latency_keys = [
            "LatencyMin",
            "LatencyMax",
            "LatencyAvg",
        ]

        samples = {
            "Throughput": self.mpps,
            "DropPackets": pkt_loss,
            "CurrentDropPackets": pkt_loss,
            "TxThroughput": self.pps / 1e6,
            "RxThroughput": self.mpps,
            "PktSize": pkt_size,
        }
        if port_samples:
            samples.update(port_samples)

        # latency may be None when it was never measured; zip() would raise
        samples.update((key, value) for key, value in zip(latency_keys, self.latency or ()))
        return samples

    def log_data(self, logger=None):
        """Log totals and throughput at debug level (module logger by default)."""
        if logger is None:
            logger = LOG

        template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
        logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
        logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
class PacketDump(object):
    """One packet dump: the port id, the declared length and the payload."""

    @staticmethod
    def assert_func(func, value1, value2, template=None):
        """Raise AssertionError unless ``func(value1, value2)`` is true.

        The check is written out explicitly (instead of an ``assert``
        statement) so that it still runs under ``python -O``.

        Args:
            func: two-argument predicate, e.g. ``operator.eq``.
            value1: first operand.
            value2: second operand.
            template: optional ``str.format`` template for the error message;
                it receives ``value1`` and ``value2`` as positional fields.
        """
        if not func(value1, value2):
            message = template.format(value1, value2) if template else ''
            raise AssertionError(message)

    def __init__(self, port_id, data_len, payload):
        """Store the dump after checking ``data_len`` against ``len(payload)``."""
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

    @property
    def port_id(self):
        """Get the port id of the packet dump"""
        return self._port_id

    @property
    def data_len(self):
        """Get the length of the data received"""
        return self._data_len

    def __str__(self):
        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload as a list of ordinals.

        Returns a list of byte values, matching the contents of the packet dump.
        Optional start and end parameters can be specified to retrieve only a
        part of the packet contents.

        The number of elements in the list is equal to end - start + 1, so end
        is the offset of the last character.

        Args:
            start (pos. int): the starting offset in the payload. If it is not
                specified or None, offset 0 is assumed.
            end (pos. int): the ending offset of the payload. If it is not
                specified or None, the contents until the end of the packet are
                returned.

        Returns:
            [int, int, ...]. Each int represents the ordinal value of a byte in
            the packet payload.

        Raises:
            AssertionError: if start is negative or end is not less than the
                data length.
        """
        if start is None:
            start = 0

        if end is None:
            end = self.data_len - 1

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, start, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, end, self.data_len, template)

        # Adjust for splice operation: end offset must be 1 more than the offset
        # of the last desired character.
        end += 1

        return self._payload[start:end]
class ProxSocketHelper(object):
    """Send text commands to a PROX instance over a socket and parse its replies."""

    def __init__(self, sock=None):
        """ creates new prox instance """
        super(ProxSocketHelper, self).__init__()

        if sock is None:
            sock = socket.socket()

        self._sock = sock
        # packet dumps read from the socket, oldest first
        self._pkt_dumps = []
        # last statistics tuple obtained by get_all_tot_stats()
        self.master_stats = None

    def connect(self, ip, port):
        """Connect to the prox instance on the remote system"""
        self._sock.connect((ip, port))

    def get_socket(self):
        """ get the socket connected to the remote instance """
        return self._sock

    def _parse_socket_data(self, decoded_data, pkt_dump_only):
        """Split one chunk of received text into packet dumps and a reply.

        Args:
            decoded_data: text received from the socket.
            pkt_dump_only: when true, return ``True`` as soon as one packet
                dump has been stored.

        Returns:
            The first line that is not a packet dump header, ``True`` when
            ``pkt_dump_only`` is set and a dump was stored, otherwise the
            last line looked at ('' if there was none).
        """
        def get_newline_index():
            return decoded_data.find('\n', index)

        ret_str = ''
        index = 0
        for newline_index in iter(get_newline_index, -1):
            ret_str = decoded_data[index:newline_index]

            try:
                mode, port_id, data_len = ret_str.split(',', 2)
            except ValueError:
                mode, port_id, data_len = None, None, None

            if mode != 'pktdump':
                # Regular 1-line message. Stop reading from the socket.
                LOG.debug("Regular response read")
                return ret_str

            LOG.debug("Packet dump header read: [%s]", ret_str)

            # The line is a packet dump header. Parse it, read the
            # packet payload, store the dump for later retrieval.
            # Skip over the packet dump and continue processing: a
            # 1-line response may follow the packet dump.

            data_len = int(data_len)
            data_start = newline_index + 1  # + 1 to skip over \n
            data_end = data_start + data_len
            sub_data = decoded_data[data_start:data_end]
            pkt_payload = array.array('B', (ord(v) for v in sub_data))
            pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
            self._pkt_dumps.append(pkt_dump)

            if pkt_dump_only:
                # Return boolean instead of string to signal
                # successful reception of the packet dump.
                LOG.debug("Packet dump stored, returning")
                return True

            # continue after the payload and the newline that follows it
            index = data_end + 1

        return ret_str

    def get_data(self, pkt_dump_only=False, timeout=1):
        """ read data from the socket

        Args:
            pkt_dump_only: read a packet dump rather than a command reply.
            timeout: seconds select() waits for the socket to become readable.

        Returns:
            The parsed reply (see ``_parse_socket_data``), or '' when nothing
            was readable within ``timeout``.
        """

        # This method behaves slightly differently depending on whether it is
        # called to read the response to a command (pkt_dump_only = 0) or if
        # it is called specifically to read a packet dump (pkt_dump_only = 1).
        #
        # Packet dumps look like:
        #   pktdump,<port_id>,<data_len>\n
        #   <packet contents as byte array>\n
        # This means the total packet dump message consists of 2 lines instead
        # of 1 line.
        #
        # - Response for a command (pkt_dump_only = 0):
        #   1) Read response from the socket until \n (end of message)
        #   2a) If the response is a packet dump header (starts with "pktdump,"):
        #     - Read the packet payload and store the packet dump for later
        #       retrieval.
        #     - Reset the state and restart from 1). Eventually state 2b) will
        #       be reached and the function will return.
        #   2b) If the response is not a packet dump:
        #     - Return the received message as a string
        #
        # - Explicit request to read a packet dump (pkt_dump_only = 1):
        #   - Read the dump header and payload
        #   - Store the packet dump for later retrieval
        #   - Return True to signify a packet dump was successfully read

        def is_ready():
            # recv() is blocking, so avoid calling it when no data is waiting.
            ready = select.select([self._sock], [], [], timeout)
            return bool(ready[0])

        status = False
        ret_str = ""
        for status in iter(is_ready, False):
            raw_data = self._sock.recv(256)
            if not raw_data:
                # The peer closed the connection: select() keeps reporting the
                # socket as readable, so stop instead of looping forever.
                break
            decoded_data = raw_data.decode('utf-8')
            ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)

        LOG.debug("Received data from socket: [%s]", ret_str)
        return ret_str if status else ''

    def put_command(self, to_send):
        """ send data to the remote instance """
        LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
        try:
            # NOTE: sendall will block, we need a timeout
            self._sock.sendall(to_send.encode('utf-8'))
        except Exception:  # pylint: disable=broad-except
            # Sending stays best effort, but the failure is no longer silent
            # and KeyboardInterrupt/SystemExit are no longer swallowed.
            LOG.warning("Failed to send data to socket", exc_info=True)

    def get_packet_dump(self):
        """ get the next packet dump (None when no dump is stored) """
        if self._pkt_dumps:
            return self._pkt_dumps.pop(0)
        return None

    def stop_all_reset(self):
        """ stop the remote instance and reset stats """
        LOG.debug("Stop all and reset stats")
        self.stop_all()
        self.reset_stats()

    def stop_all(self):
        """ stop all cores on the remote instance """
        LOG.debug("Stop all")
        self.put_command("stop all\n")
        # NOTE(review): settle delay restored during reconstruction - confirm duration
        time.sleep(3)

    def stop(self, cores, task=''):
        """ stop specific cores on the remote instance """
        LOG.debug("Stopping cores %s", cores)
        self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
        # NOTE(review): settle delay restored during reconstruction - confirm duration
        time.sleep(3)

    def start_all(self):
        """ start all cores on the remote instance """
        LOG.debug("Start all")
        self.put_command("start all\n")

    def start(self, cores):
        """ start specific cores on the remote instance """
        LOG.debug("Starting cores %s", cores)
        self.put_command("start {}\n".format(join_non_strings(',', cores)))
        # NOTE(review): settle delay restored during reconstruction - confirm duration
        time.sleep(3)

    def reset_stats(self):
        """ reset the statistics on the remote instance """
        LOG.debug("Reset stats")
        self.put_command("reset stats\n")
        # NOTE(review): settle delay restored during reconstruction - confirm duration
        time.sleep(1)

    def _run_template_over_cores(self, template, cores, *args):
        """Send ``template.format(core, *args)`` once for every core."""
        for core in cores:
            self.put_command(template.format(core, *args))

    def set_pkt_size(self, cores, pkt_size):
        """ set the packet size to generate on the remote instance """
        LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
        # NOTE(review): 4 bytes are subtracted before the value is sent -
        # presumably the Ethernet CRC; confirm against the PROX documentation
        pkt_size -= 4
        self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
        # NOTE(review): settle delay restored during reconstruction - confirm duration
        time.sleep(1)

    def set_value(self, cores, offset, value, length):
        """ set value on the remote instance """
        msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
        LOG.debug(msg, cores, value, length, offset)
        template = "set value {} 0 {} {} {}\n"
        self._run_template_over_cores(template, cores, offset, value, length)

    def reset_values(self, cores):
        """ reset values on the remote instance """
        LOG.debug("Set value for core(s) %s", cores)
        self._run_template_over_cores("reset values {} 0\n", cores)

    def set_speed(self, cores, speed, tasks=None):
        """ set speed on the remote instance

        ``tasks`` defaults to task 0 for every core.  A length mismatch is
        only logged; the command is then sent for the pairs zip() produces.
        """
        if tasks is None:
            tasks = [0] * len(cores)
        elif len(tasks) != len(cores):
            LOG.error("set_speed: cores and tasks must have the same len")
        LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
        for core, task in zip(cores, tasks):
            self.put_command("speed {} {} {}\n".format(core, task, speed))

    def slope_speed(self, cores_speed, duration, n_steps=0):
        """will start to increase speed from 0 to N where N is taken from
        a['speed'] for each a in cores_speed"""
        # by default, each step will take 0.5 sec
        if n_steps <= 0:
            n_steps = duration * 2
        # repeat() requires an integer count and the division below a
        # non-zero divisor, so clamp to at least one whole step
        n_steps = max(1, int(n_steps))

        private_core_data = []
        step_duration = float(duration) / n_steps
        for core_data in cores_speed:
            target = float(core_data['speed'])
            private_core_data.append({
                'cores': core_data['cores'],
                'zero': 0,
                'delta': target / n_steps,
                'current': 0,
                'speed': target,
            })

        # every step adds 'delta' to 'current'; the final step sets the exact
        # target ('zero' + 'speed') to avoid accumulated rounding error
        deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
        for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
            time.sleep(step_duration)
            for core_data in private_core_data:
                core_data['current'] = core_data[key1] + core_data[key2]
                self.set_speed(core_data['cores'], core_data['current'])

    def set_pps(self, cores, pps, pkt_size):
        """ set packets per second for specific cores on the remote instance """
        msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
        LOG.debug(msg, cores, pps, pkt_size)

        # speed in percent of line-rate
        # NOTE(review): the formula divides by BITS_PER_BYTE and does not scale
        # by 100 - verify that this really yields a percentage
        speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
        self._run_template_over_cores("speed {} 0 {}\n", cores, speed)

    def lat_stats(self, cores, task=0):
        """Get the latency statistics from the remote system

        Returns:
            (lat_min, lat_max, lat_avg): three dicts keyed by core; a core
            whose reply cannot be parsed is left out of all three.
        """
        lat_min = {}
        lat_max = {}
        lat_avg = {}
        for core in cores:
            self.put_command("lat stats {} {} \n".format(core, task))
            ret = self.get_data()

            try:
                lat_min[core], lat_max[core], lat_avg[core] = \
                    tuple(int(n) for n in ret.split(",")[:3])

            except (AttributeError, ValueError, TypeError):
                pass

        return lat_min, lat_max, lat_avg

    def get_all_tot_stats(self):
        """Request the total statistics.

        Returns:
            A TotStatsTuple (also stored in ``master_stats``), or a list of
            four zeros when the reply does not have exactly four fields.
        """
        self.put_command("tot stats\n")
        all_stats_str = self.get_data().split(",")
        if len(all_stats_str) != 4:
            all_stats = [0] * 4
            return all_stats
        all_stats = TotStatsTuple(int(v) for v in all_stats_str)
        self.master_stats = all_stats
        return all_stats

    def hz(self):
        """Return the fourth field of the total statistics."""
        return self.get_all_tot_stats()[3]

    def core_stats(self, cores, task=0):
        """Get the receive statistics from the remote system"""
        rx = tx = drop = tsc = 0
        for core in cores:
            self.put_command("core stats {} {}\n".format(core, task))
            ret = self.get_data().split(",")
            rx += int(ret[0])
            tx += int(ret[1])
            drop += int(ret[2])
            # tsc is not summed: the value of the last core wins
            tsc = int(ret[3])
        return rx, tx, drop, tsc

    def port_stats(self, ports):
        """get counter values from a specific port

        Returns:
            list of counters summed element-wise over all ``ports``.
        """
        tot_result = [0] * 12
        for port in ports:
            self.put_command("port_stats {}\n".format(port))
            ret = [try_int(s, 0) for s in self.get_data().split(",")]
            tot_result = [sum(x) for x in zip(tot_result, ret)]
        return tot_result

    @contextmanager
    def measure_tot_stats(self):
        """Context manager measuring the total statistics around a block.

        Yields a dict holding ``'start_tot'``; ``'end_tot'`` is added when the
        block ends, and ``'delta'`` as well if the block did not raise.
        """
        start = self.get_all_tot_stats()
        container = {'start_tot': start}
        try:
            yield container
        finally:
            container['end_tot'] = end = self.get_all_tot_stats()

        container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))

    def tot_stats(self):
        """Get the total statistics from the remote system"""
        stats = self.get_all_tot_stats()
        return stats[:3]

    def tot_ierrors(self):
        """Get the total ierrors from the remote system"""
        self.put_command("tot ierrors tot\n")
        recv = self.get_data().split(',')
        tot_ierrors = int(recv[0])
        # NOTE(review): tsc is read from field 0 as well - verify against the
        # PROX reply format whether a later field was intended
        tsc = int(recv[0])
        return tot_ierrors, tsc

    def set_count(self, count, cores):
        """Set the number of packets to send on the specified core"""
        self._run_template_over_cores("count {} 0 {}\n", cores, count)

    def dump_rx(self, core_id, task_id=0, count=1):
        """Activate dump on rx on the specified core"""
        LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
        self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
        time.sleep(1.5)  # Give PROX time to set up packet dumping

    def quit(self):
        """Stop all cores, then ask PROX to quit, then force it to quit."""
        # NOTE(review): this method was restored during reconstruction -
        # confirm it against the upstream file
        self.stop_all()
        self._quit()
        self.force_quit()

    def _quit(self):
        """ ask the remote instance to quit """
        LOG.debug("Quit prox")
        self.put_command("quit\n")
        # NOTE(review): settle delay restored during reconstruction - confirm duration
        time.sleep(3)

    def force_quit(self):
        """ force the remote instance to quit """
        LOG.debug("Force Quit prox")
        self.put_command("quit_force\n")
        # NOTE(review): settle delay restored during reconstruction - confirm duration
        time.sleep(3)
# Unique sentinel used as the "no default supplied" marker by find_section()
# and find_in_section(), so that None remains usable as a real default value.
_LOCAL_OBJECT = object()
class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
    """Generate the PROX configuration, upload it and build the PROX command line."""

    # the actual app is lowercase
    APP_NAME = 'prox'
    # not used for Prox but added for consistency
    # NOTE(review): attribute name and value restored during reconstruction - confirm
    VNF_TYPE = "PROX"

    LUA_PARAMETER_NAME = ""
    # NOTE(review): mapping contents restored during reconstruction - confirm
    LUA_PARAMETER_PEER = {
        "gen": "sut",
        "sut": "gen",
    }

    # seconds prox_config_data waits for the config to arrive on config_queue
    CONFIG_QUEUE_TIMEOUT = 120

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        self.remote_path = None
        super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.remote_prox_file_name = None
        self._prox_config_data = None
        # basename -> remote path of every additional file copied to the target
        self.additional_files = {}
        self.config_queue = Queue()
        # allow_exit_without_flush
        self.config_queue.cancel_join_thread()
        self._global_section = None

    @property
    def prox_config_data(self):
        """Parsed PROX config; fetched from ``config_queue`` on first access."""
        if self._prox_config_data is None:
            # this will block, but it needs too
            self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
        return self._prox_config_data

    @property
    def global_section(self):
        """The "global" section of the PROX config, looked up once and cached."""
        if self._global_section is None and self.prox_config_data:
            self._global_section = self.find_section("global")
        return self._global_section

    def find_section(self, name, default=_LOCAL_OBJECT):
        """Return the contents of section ``name``.

        Raises:
            KeyError: if the section does not exist and no default was given.
        """
        result = next((value for key, value in self.prox_config_data if key == name), default)
        if result is _LOCAL_OBJECT:
            raise KeyError('{} not found in Prox config'.format(name))
        return result

    def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
        """Return the value of ``section_key`` inside section ``section_name``.

        Raises:
            KeyError: if the key is not found and no default was given.
        """
        section = self.find_section(section_name, [])
        result = next((value for key, value in section if key == section_key), default)
        if result is _LOCAL_OBJECT:
            template = '{} not found in {} section of Prox config'
            raise KeyError(template.format(section_key, section_name))
        return result

    def copy_to_target(self, config_file_path, prox_file):
        """Copy a local file to ``/tmp/<prox_file>`` on the target; return that path."""
        remote_path = os.path.join("/tmp", prox_file)
        self.ssh_helper.put(config_file_path, remote_path)
        return remote_path

    @staticmethod
    def _get_tx_port(section, sections):
        """Return the first number of the last "tx port" item of a section (-1 if none)."""
        iface_port = [-1]
        for item in sections[section]:
            if item[0] == "tx port":
                found = re.findall(r'\d+', item[1])
                # keep the previous value when the item holds no digits, so the
                # final indexing cannot fail on an empty list
                if found:
                    iface_port = found
                # do we want the last one?
                #   if yes, then can we reverse?
        return int(iface_port[0])

    @staticmethod
    def _replace_quoted_with_value(quoted, value, count=1):
        """Replace the first ``count`` double-quoted substrings with ``"value"``."""
        # a callable replacement inserts the text literally, so backslashes in
        # value are not interpreted as regex group references
        replacement = '"{}"'.format(value)
        new_string = re.sub('"[^"]*"', lambda _match: replacement, quoted, count=count)
        return new_string

    def _insert_additional_file(self, value):
        """Swap the quoted file name in ``value`` for its uploaded remote path."""
        file_str = value.split('"')
        base_name = os.path.basename(file_str[1])
        file_str[1] = self.additional_files[base_name]
        return '"'.join(file_str)

    def generate_prox_config_file(self, config_path):
        """Parse the PROX config and adapt it to the deployed topology.

        Sets the "mac" item of every DPDK port section to "hardware", resolves
        ``@@`` destination MAC placeholders and points ``dofile`` entries at
        the uploaded additional files.

        Returns:
            list of ``(section_name, section_items)`` pairs.
        """
        sections = []
        prox_config = ConfigParser(config_path, sections)
        prox_config.parse()

        # Ensure MAC is set "hardware"
        all_ports = self.vnfd_helper.port_pairs.all_ports
        # use dpdk port number
        for port_name in all_ports:
            port_num = self.vnfd_helper.port_num(port_name)
            port_section_name = "port {}".format(port_num)
            for section_name, section in sections:
                if port_section_name != section_name:
                    continue

                for section_data in section:
                    if section_data[0] == "mac":
                        section_data[1] = "hardware"

        # search for dst mac
        for _, section in sections:
            for section_data in section:
                item_key, item_val = section_data
                if item_val.startswith("@@dst_mac"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    section_data[1] = mac.replace(":", " ", 6)

                if item_key == "dst mac" and item_val.startswith("@@"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    section_data[1] = mac

        # if addition file specified in prox config
        if not self.additional_files:
            return sections

        for section_name, section in sections:
            for section_data in section:
                try:
                    if section_data[0].startswith("dofile"):
                        section_data[0] = self._insert_additional_file(section_data[0])

                    if section_data[1].startswith("dofile"):
                        section_data[1] = self._insert_additional_file(section_data[1])
                except Exception:  # pylint: disable=broad-except
                    # substitution stays best effort, but is no longer silent
                    LOG.debug("Could not insert additional file in section %s: %s",
                              section_name, section_data, exc_info=True)

        return sections

    @staticmethod
    def write_prox_lua(lua_config):
        """
        Write an .ini-format config file for PROX (parameters.lua)
        PROX does not allow a space before/after the =, so we need
        a custom method
        """
        out = []
        for key in lua_config:
            if key == "__name__":
                continue
            # every value is written double-quoted; format() also accepts
            # non-string values, which plain concatenation did not
            value = '"{}"'.format(lua_config[key])
            out.append("=".join((key, value.replace('\n', '\n\t'))))
        return os.linesep.join(out)

    @staticmethod
    def write_prox_config(prox_config):
        """
        Write an .ini-format config file for PROX
        PROX does not allow a space before/after the =, so we need
        a custom method
        """
        out = []
        for (section_name, section) in prox_config:
            out.append("[{}]".format(section_name))
            for item in section:
                key, value = item
                if key == "__name__":
                    continue
                if value is not None and value != '@':
                    key = "=".join((key, str(value).replace('\n', '\n\t')))
                    out.append(key)
                else:
                    key = str(key).replace('\n', '\n\t')
                    out.append(key)
        return os.linesep.join(out)

    def put_string_to_file(self, s, remote_path):
        """Upload string ``s`` as the content of ``remote_path``; return that path."""
        file_obj = cStringIO(s)
        self.ssh_helper.put_file_obj(file_obj, remote_path)
        return remote_path

    def generate_prox_lua_file(self):
        """Return an ordered mapping of ``tester_mac<N>``/``src_mac<N>`` per port."""
        p = OrderedDict()
        all_ports = self.vnfd_helper.port_pairs.all_ports
        for port_name in all_ports:
            port_num = self.vnfd_helper.port_num(port_name)
            intf = self.vnfd_helper.find_interface(name=port_name)
            vintf = intf['virtual-interface']
            p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
            p["src_mac{0}".format(port_num)] = vintf["local_mac"]

        return p

    def upload_prox_lua(self, config_file, lua_data):
        """Write ``lua_data`` to ``/tmp/<config_file>`` on the target; return the path."""
        # prox can't handle spaces around ' = ' so use custom method
        out = StringIO(self.write_prox_lua(lua_data))
        out.seek(0)
        remote_path = os.path.join("/tmp", config_file)
        self.ssh_helper.put_file_obj(out, remote_path)

        return remote_path

    def upload_prox_config(self, config_file, prox_config_data):
        """Write the PROX config to ``/tmp/<config_file>`` on the target; return the path."""
        # prox can't handle spaces around ' = ' so use custom method
        out = StringIO(self.write_prox_config(prox_config_data))
        out.seek(0)
        remote_path = os.path.join("/tmp", config_file)
        self.ssh_helper.put_file_obj(out, remote_path)

        return remote_path

    def build_config_file(self):
        """Generate and upload everything PROX needs to start.

        Uploads ``parameters.lua`` when ``prox_generate_parameter`` is set,
        copies the files listed in ``prox_files``, then generates the PROX
        config, puts it on ``config_queue`` and uploads it.
        """
        task_path = self.scenario_helper.task_path
        options = self.scenario_helper.options
        config_path = options['prox_config']
        config_file = os.path.basename(config_path)
        config_path = utils.find_relative_file(config_path, task_path)
        self.additional_files = {}

        # a missing option means "do not generate"; no exception is needed
        if options.get('prox_generate_parameter'):
            try:
                self.lua = []
                self.lua = self.generate_prox_lua_file()
                if len(self.lua) > 0:
                    self.upload_prox_lua("parameters.lua", self.lua)
            except Exception:  # pylint: disable=broad-except
                # generation stays best effort, but the failure is now logged
                LOG.warning("Could not generate/upload parameters.lua", exc_info=True)

        prox_files = options.get('prox_files', [])
        if isinstance(prox_files, six.string_types):
            prox_files = [prox_files]
        for key_prox_file in prox_files:
            base_prox_file = os.path.basename(key_prox_file)
            key_prox_path = utils.find_relative_file(key_prox_file, task_path)
            remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
            self.additional_files[base_prox_file] = remote_prox_file

        self._prox_config_data = self.generate_prox_config_file(config_path)
        # copy config to queue so we can read it from traffic_runner process
        self.config_queue.put(self._prox_config_data)
        self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)

    def build_config(self):
        """Build the config files and return the shell command that starts PROX."""
        self.build_config_file()

        options = self.scenario_helper.options
        prox_args = options['prox_args']
        tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)

        self.pipeline_kwargs = {
            'tool_path': tool_path,
            'tool_dir': os.path.dirname(tool_path),
            'cfg_file': self.remote_path,
            # a falsy value is emitted as an empty string after its option name
            'args': ' '.join(' '.join([str(k), str(v) if v else ''])
                             for k, v in prox_args.items())
        }

        cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
                        "{args} -f {cfg_file} '")
        return cmd_template.format(**self.pipeline_kwargs)
# this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
class ProxResourceHelper(ClientResourceHelper):
    """Resource helper that drives a PROX instance through a ProxSocketHelper."""

    RESOURCE_WORD = 'prox'

    # NOTE(review): PROX_MODE and WAIT_TIME were restored during
    # reconstruction - confirm names and values
    PROX_MODE = ""

    WAIT_TIME = 3

    @staticmethod
    def find_pci(pci, bound_pci):
        """Return True if any entry of ``bound_pci`` ends with ``pci``."""
        # we have to substring match PCI bus address from the end
        return any(b.endswith(pci) for b in bound_pci)

    def __init__(self, setup_helper):
        super(ProxResourceHelper, self).__init__(setup_helper)
        self.mgmt_interface = self.vnfd_helper.mgmt_interface
        self._user = self.mgmt_interface["user"]
        self._ip = self.mgmt_interface["ip"]

        # NOTE(review): done, lower, upper, step_delta and step_time were
        # restored during reconstruction - confirm
        self.done = False
        self._vpci_to_if_name_map = None
        self.additional_file = {}
        self.remote_prox_file_name = None
        self.lower = None
        self.upper = None
        self.step_delta = 1
        self.step_time = 0.5
        self._test_type = None

    @property
    def sut(self):
        """Socket helper connected to PROX; the connection is made on first use."""
        if not self.client:
            self.client = self._connect()
        return self.client

    @property
    def test_type(self):
        """Value of ``name`` in the "global" section of the PROX config (or None)."""
        if self._test_type is None:
            self._test_type = self.setup_helper.find_in_section('global', 'name', None)
        return self._test_type

    def run_traffic(self, traffic_profile):
        """Run ``traffic_profile`` repeatedly until the terminated flag is set."""
        self._queue.cancel_join_thread()
        # NOTE(review): the two bounds below were restored during
        # reconstruction - confirm
        self.lower = 0.0
        self.upper = 100.0

        traffic_profile.init(self._queue)
        # this frees up the run_traffic loop
        self.client_started.value = 1

        while not self._terminated.value:
            # move it all to traffic_profile
            self._run_traffic_once(traffic_profile)

    def _run_traffic_once(self, traffic_profile):
        """Execute the profile once; signal completion when the profile is done."""
        traffic_profile.execute_traffic(self)
        if traffic_profile.done:
            self._queue.put({'done': True})
            LOG.debug("tg_prox done")
            self._terminated.value = 1

    # For VNF use ResourceHelper method to collect KPIs directly.
    # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
    def collect_collectd_kpi(self):
        return self._collect_resource_kpi()

    def collect_kpi(self):
        """Return the superclass KPIs, extended with the collectd KPIs when non-empty."""
        result = super(ProxResourceHelper, self).collect_kpi()
        # add in collectd kpis manually
        if result:
            result['collect_stats'] = self._collect_resource_kpi()
        return result

    def terminate(self):
        # should not be called, use VNF terminate
        raise NotImplementedError()

    def up(self):
        return self.sut  # force connection

    def execute(self, cmd, *args, **kwargs):
        """Call method ``cmd`` of the socket helper; return None if it does not exist."""
        func = getattr(self.sut, cmd, None)
        if func:
            return func(*args, **kwargs)
        return None

    def _connect(self, client=None):
        """Run and connect to prox on the remote system

        Raises:
            Exception: if no connection could be made after all retries.
        """
        if client is None:
            client = ProxSocketHelper()

        # try connecting to Prox for 60s
        for _ in range(RETRY_SECONDS):
            time.sleep(RETRY_INTERVAL)
            try:
                client.connect(self._ip, PROX_PORT)
            except (socket.gaierror, socket.error):
                continue
            else:
                return client

        msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
        raise Exception(msg.format(self._ip, PROX_PORT))
class ProxDataHelper(object):
    """Collect the measurements of one test run and turn them into a result tuple.

    Used as a context manager: entering validates the port count, leaving
    builds ``result_tuple`` from the values gathered in between.
    """

    def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
        super(ProxDataHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.sut = sut
        self.pkt_size = pkt_size
        # requested speed in percent (see totals_and_pps)
        self.value = value
        self.tolerated_loss = tolerated_loss
        self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
        self.tsc_hz = None
        self.measured_stats = None
        self.latency = None
        self._totals_and_pps = None
        self.result_tuple = None

    @property
    def totals_and_pps(self):
        """``(rx_total, tx_total, pps)``; read from the SUT once, then cached."""
        if self._totals_and_pps is None:
            rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
            pps = self.value / 100.0 * self.line_rate_to_pps()
            self._totals_and_pps = rx_total, tx_total, pps
        return self._totals_and_pps

    @property
    def rx_total(self):
        return self.totals_and_pps[0]

    @property
    def tx_total(self):
        return self.totals_and_pps[1]

    @property
    def pps(self):
        return self.totals_and_pps[2]

    @property
    def samples(self):
        """Per-port ``in_packets``/``out_packets``; zeros when a port cannot be read."""
        samples = {}
        for port_name, port_num in self.vnfd_helper.ports_iter():
            try:
                port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
                samples[port_name] = {
                    "in_packets": port_rx_total,
                    "out_packets": port_tx_total,
                }
            except (KeyError, TypeError, NameError, MemoryError, ValueError,
                    SystemError, BufferError):
                samples[port_name] = {
                    "in_packets": 0,
                    "out_packets": 0,
                }
        return samples

    def __enter__(self):
        self.check_interface_count()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.make_tuple()

    def make_tuple(self):
        """Build and log ``result_tuple`` once; later calls do nothing."""
        if self.result_tuple:
            return

        self.result_tuple = ProxTestDataTuple(
            self.tolerated_loss,
            self.tsc_hz,
            self.measured_stats['delta'].rx,
            self.measured_stats['delta'].tx,
            self.measured_stats['delta'].tsc,
            self.latency,
            self.rx_total,
            self.tx_total,
            self.pps,
        )
        self.result_tuple.log_data()

    @contextmanager
    def measure_tot_stats(self):
        """Measure the SUT total stats around a block; stores ``measured_stats``."""
        with self.sut.measure_tot_stats() as self.measured_stats:
            yield

    def check_interface_count(self):
        """Raise AssertionError unless 1, 2 or 4 ports are configured."""
        # do this assert in init?  unless we expect interface count to
        # change from one run to another run...
        # raised explicitly so the check is not stripped under "python -O"
        if self.port_count not in {1, 2, 4}:
            raise AssertionError(
                "Invalid number of ports: 1, 2 or 4 ports only supported at this time")

    def capture_tsc_hz(self):
        """Read the ``hz`` value from the SUT and store it as a float."""
        self.tsc_hz = float(self.sut.hz())

    def line_rate_to_pps(self):
        """Packets per second at line rate for the configured packet size."""
        # NOTE: to fix, don't hardcode 10Gb/s
        return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
class ProxProfileHelper(object):
    """Generic PROX profile helper.

    Subclasses declare the test type they serve in ``__prox_profile_type__``.
    Attributes not found on the helper are looked up on ``resource_helper``.
    """

    __prox_profile_type__ = "Generic"

    PROX_CORE_GEN_MODE = "gen"
    PROX_CORE_LAT_MODE = "lat"

    @classmethod
    def get_cls(cls, helper_type):
        """Return class of specified type."""
        if not helper_type:
            return ProxProfileHelper

        for profile_helper_class in utils.itersubclasses(cls):
            if helper_type == profile_helper_class.__prox_profile_type__:
                return profile_helper_class

        # unknown type: fall back to the generic helper
        return ProxProfileHelper

    @classmethod
    def make_profile_helper(cls, resource_helper):
        """Instantiate the helper class matching ``resource_helper.test_type``."""
        return cls.get_cls(resource_helper.test_type)(resource_helper)

    def __init__(self, resource_helper):
        super(ProxProfileHelper, self).__init__()
        self.resource_helper = resource_helper
        self._cpu_topology = None
        self._test_cores = None
        self._latency_cores = None

    @property
    def cpu_topology(self):
        """CPU topology parsed from the remote /proc/cpuinfo; read once, then cached."""
        if not self._cpu_topology:
            stdout = io.BytesIO()
            self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
            self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
        return self._cpu_topology

    @property
    def test_cores(self):
        """Cores configured in "gen" mode."""
        if not self._test_cores:
            self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
        return self._test_cores

    @property
    def latency_cores(self):
        """Cores configured in "lat" mode."""
        if not self._latency_cores:
            self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
        return self._latency_cores

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Context manager: traffic runs on the test cores while the block executes."""
        # NOTE(review): the stop_all() calls before and after the run were
        # restored during reconstruction - confirm
        self.sut.stop_all()
        self.sut.reset_stats()
        try:
            self.sut.set_pkt_size(self.test_cores, pkt_size)
            self.sut.set_speed(self.test_cores, value)
            self.sut.start_all()
            yield
        finally:
            self.sut.stop_all()

    def get_cores(self, mode):
        """Return the core ids of all "core" sections whose ``mode`` item equals ``mode``."""
        cores = []

        for section_name, section in self.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            for key, value in section:
                if key == "mode" and value == mode:
                    core_tuple = CoreSocketTuple(section_name)
                    core = core_tuple.core_id
                    cores.append(core)

        return cores

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run traffic for ``duration`` seconds and collect the results.

        Returns:
            ``(result_tuple, samples)`` as produced by ProxDataHelper.
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples

    def get_latency(self):
        """
        :return: return lat_min, lat_max, lat_avg
        :rtype: list
        """
        cores = self.latency_cores
        if cores:
            return self.sut.lat_stats(cores)
        return []

    def terminate(self):
        pass

    def __getattr__(self, item):
        # Only reached when normal lookup fails.  If resource_helper itself is
        # missing (e.g. the instance was created without __init__), delegating
        # would recurse without bound, so fail with AttributeError instead.
        if item == 'resource_helper':
            raise AttributeError(item)
        return getattr(self.resource_helper, item)
class ProxMplsProfileHelper(ProxProfileHelper):
    """Profile helper selected for the "MPLS tag/untag" test type.

    Generator cores are split in two groups by their ``name`` entry in the
    PROX configuration: names starting with ``tag`` and names starting with
    ``udp``. The two groups get different packet sizes and speeds.
    """

    __prox_profile_type__ = "MPLS tag/untag"

    def __init__(self, resource_helper):
        """Initialise the base helper and clear the cached core groups."""
        super(ProxMplsProfileHelper, self).__init__(resource_helper)
        self._cores_tuple = None

    @property
    def mpls_cores(self):
        """``(tagged_cores, plain_cores)``, computed once by ``get_cores_mpls``."""
        if self._cores_tuple:
            return self._cores_tuple

        self._cores_tuple = self.get_cores_mpls()
        return self._cores_tuple

    @property
    def tagged_cores(self):
        """Core ids of generator cores whose name starts with ``tag``."""
        tagged, _ = self.mpls_cores
        return tagged

    @property
    def plain_cores(self):
        """Core ids of generator cores whose name starts with ``udp``."""
        _, plain = self.mpls_cores
        return plain

    def get_cores_mpls(self):
        """Scan the PROX configuration for tagged and plain generator cores.

        Only sections whose name starts with ``core`` and that contain a
        ``mode`` item equal to ``PROX_CORE_GEN_MODE`` are considered. Each
        ``name`` item of such a section decides which list the section's
        core id goes to.

        :return: tuple ``(cores_tagged, cores_plain)`` of core id lists
        """
        gen_mode = self.PROX_CORE_GEN_MODE
        tagged = []
        plain = []

        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            is_generator = any(key == "mode" and value == gen_mode for key, value in section)
            if not is_generator:
                continue

            for core_name in (value for key, value in section if key == 'name'):
                if core_name.startswith("tag"):
                    tagged.append(CoreSocketTuple(section_name).core_id)
                elif core_name.startswith("udp"):
                    plain.append(CoreSocketTuple(section_name).core_id)

        return tagged, plain

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Context manager that runs tagged and plain traffic for its body.

        Tagged cores send ``pkt_size`` packets at speed ``value``; plain
        cores send packets 4 bytes smaller (presumably the MPLS label --
        confirm) at a speed scaled by the ratio of the two sizes, each
        enlarged by 20 bytes. All cores are stopped again on exit, also
        when the body raises.

        :param pkt_size: packet size used for the tagged cores
        :param value: speed used for the tagged cores
        """
        sut = self.sut
        sut.stop_all()
        sut.reset_stats()
        try:
            sut.set_pkt_size(self.tagged_cores, pkt_size)
            sut.set_pkt_size(self.plain_cores, pkt_size - 4)
            sut.set_speed(self.tagged_cores, value)
            plain_ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
            sut.set_speed(self.plain_cores, value * plain_ratio)
            sut.start_all()
            yield
        finally:
            sut.stop_all()
class ProxBngProfileHelper(ProxProfileHelper):
    """Profile helper selected for the "BNG gen" test type.

    Generator cores are grouped by the prefix of their ``name`` entry in
    the PROX configuration (``cpe``, ``inet``, ``arp``, ``arp_task``).
    ``step_delta`` and ``step_time`` used by the speed ramp are not assigned
    in this class; they are resolved through normal attribute lookup.
    """

    __prox_profile_type__ = "BNG gen"

    def __init__(self, resource_helper):
        """Initialise the base helper and clear the cached core groups."""
        super(ProxBngProfileHelper, self).__init__(resource_helper)
        self._cores_tuple = None

    @property
    def bng_cores(self):
        """Tuple ``(cpe, inet, arp, arp_task)`` from ``get_cores_gen_bng_qos``, cached."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_bng_qos()
        return self._cores_tuple

    @property
    def cpe_cores(self):
        """Core ids of generator cores whose name starts with ``cpe``."""
        return self.bng_cores[0]

    @property
    def inet_cores(self):
        """Core ids of generator cores whose name starts with ``inet``."""
        return self.bng_cores[1]

    @property
    def arp_cores(self):
        """Core ids of generator cores whose name starts with ``arp``."""
        return self.bng_cores[2]

    @property
    def arp_task_cores(self):
        """``[0]`` followed by the core ids whose name starts with ``arp_task``."""
        return self.bng_cores[3]

    @property
    def all_rx_cores(self):
        """Cores used for receiving; this is the list of latency cores."""
        return self.latency_cores

    def get_cores_gen_bng_qos(self):
        """Scan the PROX configuration for the BNG generator core groups.

        Only sections whose name starts with ``core`` and that contain a
        ``mode`` item equal to ``PROX_CORE_GEN_MODE`` are considered.

        :return: tuple ``(cpe_cores, inet_cores, arp_cores, arp_tasks_core)``;
                 the last list always starts with ``0``
        """
        cpe_cores = []
        inet_cores = []
        arp_cores = []
        arp_tasks_core = [0]
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    cpe_core = core_tuple.core_id
                    cpe_cores.append(cpe_core)

                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.core_id
                    inet_cores.append(inet_core)

                elif item_value.startswith("arp"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_core = core_tuple.core_id
                    arp_cores.append(arp_core)

                # We check the tasks/core separately: a name starting with
                # "arp_task" also starts with "arp", so such a core ends up
                # in both arp_cores and arp_tasks_core.
                if item_value.startswith("arp_task"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_task_core = core_tuple.core_id
                    arp_tasks_core.append(arp_task_core)

        return cpe_cores, inet_cores, arp_cores, arp_tasks_core

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Context manager that ramps BNG traffic up and tears it down on exit.

        Entry: stop everything, flush the receive side, program packet
        sizes and header length fields, send ARP traffic, then raise the
        CPE and INET speeds in ``step_delta`` increments (sleeping
        ``step_time`` between steps) until both targets are reached.
        Exit: stop the generators and flush the receive cores. The exit
        sequence also runs when the ``with`` body raises, so generators are
        never left running after a failed measurement.

        :param pkt_size: INET packet size; CPE packets are 24 bytes smaller
        :param value: target speed of the slower direction
        """
        # Tester is sending packets at the required speed already after
        # setup_test(). Just get the current statistics, sleep the required
        # amount of time and calculate packet loss.
        inet_pkt_size = pkt_size
        cpe_pkt_size = pkt_size - 24
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        if ratio < 1:
            max_down_speed = value * ratio
        else:
            max_up_speed = value / ratio

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE
        # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET
        # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
        self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
        # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)

        # Sending ARP to initialize tables - need a few seconds of generation
        # to make sure all CPEs are initialized
        LOG.info("Initializing SUT: sending ARP packets")
        self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)
        self.sut.start(self.arp_cores)
        time.sleep(4)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # step_delta.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        try:
            yield
        finally:
            # Runs on normal exit and on exceptions raised in the with-body.
            self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
            LOG.info("Test ended. Flushing NIC buffers")
            self.sut.start(self.all_rx_cores)
            time.sleep(3)
            self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run one measurement and return its result tuple and samples.

        :param pkt_size: packet size for this run
        :param duration: seconds to sleep while traffic is measured
        :param value: speed handed to ``traffic_context``
        :param tolerated_loss: forwarded to ``ProxDataHelper``
        :return: ``(data_helper.result_tuple, data_helper.samples)``
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples
class ProxVpeProfileHelper(ProxProfileHelper):
    """Profile helper selected for the "vPE gen" test type.

    Generator cores and ports are grouped by the prefix of their ``name``
    entry in the PROX configuration (``cpe`` or ``inet``). ``step_delta``
    and ``step_time`` used by the speed ramp are not assigned in this
    class; they are resolved through normal attribute lookup.
    """

    __prox_profile_type__ = "vPE gen"

    def __init__(self, resource_helper):
        """Initialise the base helper and clear the cached core/port groups."""
        super(ProxVpeProfileHelper, self).__init__(resource_helper)
        self._cores_tuple = None
        self._ports_tuple = None

    @property
    def vpe_cores(self):
        """Tuple ``(cpe_cores, inet_cores)`` from ``get_cores_gen_vpe``, cached."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_vpe()
        return self._cores_tuple

    @property
    def cpe_cores(self):
        """Core ids of generator cores whose name starts with ``cpe``."""
        return self.vpe_cores[0]

    @property
    def inet_cores(self):
        """Core ids of generator cores whose name starts with ``inet``."""
        return self.vpe_cores[1]

    @property
    def all_rx_cores(self):
        """Cores used for receiving; this is the list of latency cores."""
        return self.latency_cores

    @property
    def vpe_ports(self):
        """Tuple ``(cpe_ports, inet_ports)`` from ``get_ports_gen_vpe``, cached."""
        if not self._ports_tuple:
            self._ports_tuple = self.get_ports_gen_vpe()
        return self._ports_tuple

    @property
    def cpe_ports(self):
        """Port numbers of port sections whose name starts with ``cpe``."""
        return self.vpe_ports[0]

    @property
    def inet_ports(self):
        """Port numbers of port sections whose name starts with ``inet``."""
        return self.vpe_ports[1]

    def get_cores_gen_vpe(self):
        """Scan the PROX configuration for the CPE and INET generator cores.

        Only sections whose name starts with ``core`` and that contain a
        ``mode`` item equal to ``PROX_CORE_GEN_MODE`` are considered.

        :return: tuple ``(cpe_cores, inet_cores)`` of core id lists
        """
        cpe_cores = []
        inet_cores = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.core_id
                    cpe_cores.append(core_tag)

                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.core_id
                    inet_cores.append(inet_core)

        return cpe_cores, inet_cores

    def get_ports_gen_vpe(self):
        """Scan the PROX configuration for the CPE and INET port numbers.

        The port number is the first run of digits in the name of a section
        that starts with ``port``. Port sections without any digits are
        skipped.

        :return: tuple ``(cpe_ports, inet_ports)`` of port number lists
        """
        cpe_ports = []
        inet_ports = []

        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("port"):
                continue

            # A "port" section without a number cannot be mapped to a port;
            # skip it instead of failing with StopIteration on an empty
            # match iterator.
            port_match = re.search(r'\d+', section_name)
            if port_match is None:
                continue
            tx_port_no = int(port_match.group(0))

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    cpe_ports.append(tx_port_no)

                elif item_value.startswith("inet"):
                    inet_ports.append(tx_port_no)

        return cpe_ports, inet_ports

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Context manager that ramps vPE traffic up and tears it down on exit.

        Entry: stop everything, flush the receive side, program packet
        sizes and header length fields, then raise the CPE and INET speeds
        in ``step_delta`` increments (sleeping ``step_time`` between steps)
        until both targets are reached. Exit: stop the generators and flush
        the receive cores. The exit sequence also runs when the ``with``
        body raises, so generators are never left running after a failed
        measurement.

        :param pkt_size: CPE packet size; INET packets are 4 bytes smaller
        :param value: target speed of the slower direction
        """
        # Calculate the target upload and download speed. The upload and
        # download packets have different packet sizes, so in order to get
        # equal bandwidth usage, the ratio of the speeds has to match the ratio
        # of the packet sizes.
        cpe_pkt_size = pkt_size
        inet_pkt_size = pkt_size - 4
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        if ratio < 1:
            max_down_speed = value * ratio
        else:
            max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.cpe_ports) != len(self.cpe_cores):
            max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)

        # Initialize cores
        self.sut.stop_all()
        time.sleep(2)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(2)
        self.sut.stop(self.all_rx_cores)
        time.sleep(2)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)

        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # step_delta.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        try:
            yield
        finally:
            # Runs on normal exit and on exceptions raised in the with-body.
            self.sut.stop(self.cpe_cores + self.inet_cores)
            LOG.info("Test ended. Flushing NIC buffers")
            self.sut.start(self.all_rx_cores)
            time.sleep(3)
            self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run one measurement and return its result tuple and samples.

        :param pkt_size: packet size for this run
        :param duration: seconds to sleep while traffic is measured
        :param value: speed handed to ``traffic_context``
        :param tolerated_loss: forwarded to ``ProxDataHelper``
        :return: ``(data_helper.result_tuple, data_helper.samples)``
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples
class ProxlwAFTRProfileHelper(ProxProfileHelper):
    """Profile helper selected for the "lwAFTR gen" test type.

    Generator cores are grouped by ``name`` prefix (``tun`` or ``inet``);
    ports are grouped by ``name`` prefix (``lwB4`` or ``inet``). The speed
    ramp parameters ``step_delta`` and ``step_time`` are set per instance
    in ``__init__``.
    """

    __prox_profile_type__ = "lwAFTR gen"

    def __init__(self, resource_helper):
        """Initialise the base helper, the caches and the ramp parameters."""
        super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
        self._cores_tuple = None
        self._ports_tuple = None
        self.step_delta = 5
        self.step_time = 0.5

    @property
    def _lwaftr_cores(self):
        """Tuple ``(tun_cores, inet_cores)`` from ``_get_cores_gen_lwaftr``, cached."""
        if not self._cores_tuple:
            self._cores_tuple = self._get_cores_gen_lwaftr()
        return self._cores_tuple

    @property
    def tun_cores(self):
        """Core ids of generator cores whose name starts with ``tun``."""
        return self._lwaftr_cores[0]

    @property
    def inet_cores(self):
        """Core ids of generator cores whose name starts with ``inet``."""
        return self._lwaftr_cores[1]

    @property
    def _lwaftr_ports(self):
        """Tuple ``(tun_ports, inet_ports)`` from ``_get_ports_gen_lw_aftr``, cached."""
        if not self._ports_tuple:
            self._ports_tuple = self._get_ports_gen_lw_aftr()
        return self._ports_tuple

    @property
    def tun_ports(self):
        """Port numbers of port sections whose name starts with ``lwB4``."""
        return self._lwaftr_ports[0]

    @property
    def inet_ports(self):
        """Port numbers of port sections whose name starts with ``inet``."""
        return self._lwaftr_ports[1]

    @property
    def all_rx_cores(self):
        """Cores used for receiving; this is the list of latency cores."""
        return self.latency_cores

    def _get_cores_gen_lwaftr(self):
        """Scan the PROX configuration for the tunnel and INET generator cores.

        Only sections whose name starts with ``core`` and that contain a
        ``mode`` item equal to ``PROX_CORE_GEN_MODE`` are considered.

        :return: tuple ``(tun_cores, inet_cores)`` of core id lists
        """
        tun_cores = []
        inet_cores = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            core_tuple = CoreSocketTuple(section_name)
            core_tag = core_tuple.core_id
            for item_value in (v for k, v in section if k == 'name'):
                if item_value.startswith('tun'):
                    tun_cores.append(core_tag)
                elif item_value.startswith('inet'):
                    inet_cores.append(core_tag)

        return tun_cores, inet_cores

    def _get_ports_gen_lw_aftr(self):
        """Scan the PROX configuration for the tunnel and INET port numbers.

        A section is treated as a port section when its name contains
        ``port <digits>``; other sections are skipped.

        :return: tuple ``(tun_ports, inet_ports)`` of port number lists
        """
        tun_ports = []
        inet_ports = []

        re_port = re.compile(r'port (\d+)')
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            match = re_port.search(section_name)
            if not match:
                continue

            tx_port_no = int(match.group(1))
            for item_value in (v for k, v in section if k == 'name'):
                if item_value.startswith('lwB4'):
                    tun_ports.append(tx_port_no)
                elif item_value.startswith('inet'):
                    inet_ports.append(tx_port_no)

        return tun_ports, inet_ports

    @staticmethod
    def _resize(len1, len2):
        """Return ``len1 / len2`` as a float (``1.0`` when both are equal)."""
        if len1 == len2:
            return 1.0
        return 1.0 * len1 / len2

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Context manager that ramps lwAFTR traffic up and tears it down on exit.

        Entry: stop everything, flush the receive side, program packet
        sizes and header length fields, then raise the tunnel and INET
        speeds in ``step_delta`` increments (sleeping ``step_time`` between
        steps) until both targets are reached. Exit: stop the generators
        and flush the receive cores. The exit sequence also runs when the
        ``with`` body raises, so generators are never left running after a
        failed measurement.

        :param pkt_size: tunnel packet size; INET packets are 40 bytes smaller
        :param value: target speed of the tunnel side
        """
        # The tunnel and INET packets differ in size, so the INET speed is
        # scaled by the ratio of the two sizes (each enlarged by 20 bytes).
        tun_pkt_size = pkt_size
        inet_pkt_size = pkt_size - 40
        ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value

        max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.tun_ports) != len(self.tun_cores):
            max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)

        self.sut.reset_values(self.tun_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # tun
        # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40), CRC(4)
        self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
        # IP length (byte 56): 58 for MAC(12), EthType(2), IPv6(40), CRC(4)
        self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
        # UDP length (byte 78): 78 for MAC(12), EthType(2), IPv6(40), IP(20), CRC(4)
        self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)

        # INET
        # IP length (byte 16): 18 for MAC(12), EthType(2), CRC(4)
        self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
        # UDP length (byte 38): 38 for MAC(12), EthType(2), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)

        LOG.info("Initializing SUT: sending lwAFTR packets")
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.tun_cores, curr_down_speed)
        time.sleep(4)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # step_delta.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.tun_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        try:
            yield
        finally:
            # Runs on normal exit and on exceptions raised in the with-body.
            self.sut.stop(self.tun_cores + self.inet_cores)
            LOG.info("Test ended. Flushing NIC buffers")
            self.sut.start(self.all_rx_cores)
            time.sleep(3)
            self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run one measurement and return its result tuple and samples.

        :param pkt_size: packet size for this run
        :param duration: seconds to sleep while traffic is measured
        :param value: speed handed to ``traffic_context``
        :param tolerated_loss: forwarded to ``ProxDataHelper``
        :return: ``(data_helper.result_tuple, data_helper.samples)``
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples