1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
25 from collections import OrderedDict, namedtuple
26 from contextlib import contextmanager
27 from itertools import repeat, chain
28 from multiprocessing import Queue
31 from six.moves import cStringIO
32 from six.moves import zip, StringIO
34 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
35 from yardstick.common import utils
36 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
37 from yardstick.network_services.helpers.iniparser import ConfigParser
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
# Module-level logger for the PROX helper classes below.
LOG = logging.getLogger(__name__)
# NOTE(review): forcing DEBUG at import time overrides any logging
# configuration applied by the framework — confirm this is intentional.
LOG.setLevel(logging.DEBUG)
# Maps a DATS-style option name to its (section, key) location in the config
# file together with the default value used when the key is absent.
CONFIGURATION_OPTIONS = (
    # dict key            section    key               default value

    # [general] test parameters
    ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
    ('testDuration', 'general', 'test_duration', 5.0),
    ('testPrecision', 'general', 'test_precision', 1.0),
    ('tests', 'general', 'tests', None),
    ('toleratedLoss', 'general', 'tolerated_loss', 0.0),

    # [logging] output configuration
    ('logFile', 'logging', 'file', 'dats.log'),
    ('logDateFormat', 'logging', 'datefmt', None),
    ('logLevel', 'logging', 'level', 'INFO'),
    ('logOverwrite', 'logging', 'overwrite', 1),

    # [tester] traffic-generator host
    ('testerIp', 'tester', 'ip', None),
    ('testerUser', 'tester', 'user', 'root'),
    ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
    ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
    ('testerSocketId', 'tester', 'socket_id', 0),

    # [sut] system-under-test host
    ('sutIp', 'sut', 'ip', None),
    ('sutUser', 'sut', 'user', 'root'),
    ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
    ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
    ('sutSocketId', 'sut', 'socket_id', 0),
)
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Parsed PROX core spec such as 'core 1s0h' -> (1, 0, 'h')."""

    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        try:
            matched = cls.CORE_RE.search(str(args[0]))
            if matched:
                args = matched.groups()

            core_id = int(args[0])
            socket_id = try_int(args[1], 0)
            hyperthread = 'h' if args[2] else ''
            return super(CoreSocketTuple, cls).__new__(cls, core_id, socket_id, hyperthread)

        except (AttributeError, TypeError, IndexError, ValueError):
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        # the 'h' suffix of the core spec marks the hyperthread sibling
        return self.hyperthread == 'h'

    @property
    def index(self):
        # 0 for the physical thread, 1 for the hyperthread sibling
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        """Return the CPU number for this core within *cpu_topology*.

        Raises ValueError when the socket/core pair is not present.
        """
        try:
            threads = cpu_topology[self.socket_id][self.core_id]
            return sorted(threads.values())[self.index][0]
        except (KeyError, IndexError):
            template = "Core {}{} on socket {} does not exist"
            raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    """Totals reported by the PROX 'tot stats' command.

    Accepts either the four counters as positional arguments or a single
    non-string iterable that yields all four of them.
    """

    def __new__(cls, *args):
        try:
            first = args[0]
            # a lone non-string iterable supplies all four fields;
            # str(x) is x for a str, so strings fail the assert and fall through
            assert first is not str(first)
            args = tuple(first)
        except (AssertionError, IndexError, TypeError):
            pass

        return super(TotStatsTuple, cls).__new__(cls, *args)
class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
                                                        'delta_tx,delta_tsc,'
                                                        'latency,rx_total,tx_total,pps')):
    # Result record of one PROX throughput run:
    #   tolerated          - tolerated packet loss, in percent
    #   tsc_hz             - TSC frequency of the remote system
    #   delta_rx/tx/tsc    - counter deltas over the measurement window
    #   latency            - latency stats, iterated in get_samples below
    #   rx_total/tx_total  - total packets received / transmitted
    #   pps                - configured packet rate in packets per second

        # drop percentage relative to everything transmitted
        return 1e2 * self.drop_total / float(self.tx_total)
    except ZeroDivisionError:

        # calculate the effective throughput in Mpps
        return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6

    def can_be_lost(self):
        # absolute number of packets allowed to drop within the tolerated budget
        return int(self.tx_total * self.tolerated / 1e2)

    def drop_total(self):
        # packets transmitted but never received back
        return self.tx_total - self.rx_total

        # a run succeeds when the drops stay within the tolerated budget
        return self.drop_total <= self.can_be_lost

    def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
        # default to the computed overall packet loss when none is supplied
            pkt_loss = self.pkt_loss

        if port_samples is None:

            "Throughput": self.mpps,
            "DropPackets": pkt_loss,
            "CurrentDropPackets": pkt_loss,
            "TxThroughput": self.pps / 1e6,
            "RxThroughput": self.mpps,

        # per-port samples override/extend the aggregate values
        samples.update(port_samples)

        samples.update((key, value) for key, value in zip(latency_keys, self.latency))

    def log_data(self, logger=None):
        # debug-log the totals and the configured vs. effective rate
        template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
        logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
        logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
class PacketDump(object):
    """A single packet captured through the PROX dump facility."""

    @staticmethod
    def assert_func(func, value1, value2, template=None):
        """Assert func(value1, value2); format *template* with both values."""
        assert func(value1, value2), template.format(value1, value2)

    def __init__(self, port_id, data_len, payload):
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

    @property
    def port_id(self):
        """Get the port id of the packet dump"""
        return self._port_id

    @property
    def data_len(self):
        """Get the length of the data received"""
        return self._data_len

    def __str__(self):
        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload as a list of ordinals.

        Returns a list of byte values, matching the contents of the packet
        dump. Optional start and end parameters can be specified to retrieve
        only a part of the packet contents.

        The number of elements in the list is equal to end - start + 1, so
        end is the offset of the last character returned.

        Args:
            start (pos. int): the starting offset in the payload. If it is
                not specified or None, offset 0 is assumed.
            end (pos. int): the ending offset of the payload. If it is not
                specified or None, the contents until the end of the packet
                are returned.

        Returns:
            [int, int, ...]. Each int represents the ordinal value of a byte
            in the packet payload.
        """
        if start is None:
            start = 0

        if end is None:
            end = self.data_len - 1

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, start, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, end, self.data_len, template)

        # Adjust for splice operation: end offset must be 1 more than the
        # offset of the last desired character.
        end += 1

        return self._payload[start:end]
class ProxSocketHelper(object):
    # Thin wrapper around the PROX control socket: sends newline-terminated
    # text commands ("start all\n", "speed ...\n", ...) and parses the
    # textual or packet-dump replies.

    def __init__(self, sock=None):
        """ creates new prox instance """
        super(ProxSocketHelper, self).__init__()

        # create a fresh TCP socket when the caller does not inject one
            sock = socket.socket()

        self.master_stats = None

    def connect(self, ip, port):
        """Connect to the prox instance on the remote system"""
        self._sock.connect((ip, port))

    def get_socket(self):
        """ get the socket connected to the remote instance """

    def _parse_socket_data(self, decoded_data, pkt_dump_only):
        # Scan decoded_data one newline-terminated message at a time.
        def get_newline_index():
            return decoded_data.find('\n', index)

        for newline_index in iter(get_newline_index, -1):
            ret_str = decoded_data[index:newline_index]

            # a packet dump header looks like: pktdump,<port_id>,<data_len>
            mode, port_id, data_len = ret_str.split(',', 2)
            mode, port_id, data_len = None, None, None

            if mode != 'pktdump':
                # Regular 1-line message. Stop reading from the socket.
                LOG.debug("Regular response read")

            LOG.debug("Packet dump header read: [%s]", ret_str)

            # The line is a packet dump header. Parse it, read the
            # packet payload, store the dump for later retrieval.
            # Skip over the packet dump and continue processing: a
            # 1-line response may follow the packet dump.
            data_len = int(data_len)
            data_start = newline_index + 1  # + 1 to skip over \n
            data_end = data_start + data_len
            sub_data = decoded_data[data_start:data_end]
            pkt_payload = array.array('B', (ord(v) for v in sub_data))
            pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
            self._pkt_dumps.append(pkt_dump)

            # Return boolean instead of string to signal
            # successful reception of the packet dump.
            LOG.debug("Packet dump stored, returning")

    def get_data(self, pkt_dump_only=False, timeout=1):
        """ read data from the socket """

        # This method behaves slightly differently depending on whether it is
        # called to read the response to a command (pkt_dump_only = 0) or if
        # it is called specifically to read a packet dump (pkt_dump_only = 1).

        # Packet dumps look like:
        #   pktdump,<port_id>,<data_len>\n
        #   <packet contents as byte array>\n
        # This means the total packet dump message consists of 2 lines instead

        # - Response for a command (pkt_dump_only = 0):
        #   1) Read response from the socket until \n (end of message)
        #   2a) If the response is a packet dump header (starts with "pktdump,"):
        #     - Read the packet payload and store the packet dump for later
        #     - Reset the state and restart from 1). Eventually state 2b) will
        #       be reached and the function will return.
        #   2b) If the response is not a packet dump:
        #     - Return the received message as a string

        # - Explicit request to read a packet dump (pkt_dump_only = 1):
        #   - Read the dump header and payload
        #   - Store the packet dump for later retrieval
        #   - Return True to signify a packet dump was successfully read

        # recv() is blocking, so avoid calling it when no data is waiting.
        ready = select.select([self._sock], [], [], timeout)
        return bool(ready[0])

        # drain the socket in 256-byte chunks while data keeps arriving
        for status in iter(is_ready, False):
            decoded_data = self._sock.recv(256).decode('utf-8')
            ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)

        LOG.debug("Received data from socket: [%s]", ret_str)
        return ret_str if status else ''

    def put_command(self, to_send):
        """ send data to the remote instance """
        LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))

        # TODO: sendall will block, we need a timeout
        self._sock.sendall(to_send.encode('utf-8'))

    def get_packet_dump(self):
        """ get the next packet dump """
        # dumps are consumed in FIFO order
        return self._pkt_dumps.pop(0)

    def stop_all_reset(self):
        """ stop the remote instance and reset stats """
        LOG.debug("Stop all and reset stats")

        """ stop all cores on the remote instance """
        LOG.debug("Stop all")
        self.put_command("stop all\n")

    def stop(self, cores, task=''):
        """ stop specific cores on the remote instance """
        LOG.debug("Stopping cores %s", cores)
        self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))

        """ start all cores on the remote instance """
        LOG.debug("Start all")
        self.put_command("start all\n")

    def start(self, cores):
        """ start specific cores on the remote instance """
        LOG.debug("Starting cores %s", cores)
        self.put_command("start {}\n".format(join_non_strings(',', cores)))

    def reset_stats(self):
        """ reset the statistics on the remote instance """
        LOG.debug("Reset stats")
        self.put_command("reset stats\n")

    def _run_template_over_cores(self, template, cores, *args):
        # format and send the command once per core
        self.put_command(template.format(core, *args))

    def set_pkt_size(self, cores, pkt_size):
        """ set the packet size to generate on the remote instance """
        LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
        self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)

    def set_value(self, cores, offset, value, length):
        """ set value on the remote instance """
        msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
        LOG.debug(msg, cores, value, length, offset)
        template = "set value {} 0 {} {} {}\n"
        self._run_template_over_cores(template, cores, offset, value, length)

    def reset_values(self, cores):
        """ reset values on the remote instance """
        LOG.debug("Set value for core(s) %s", cores)
        self._run_template_over_cores("reset values {} 0\n", cores)

    def set_speed(self, cores, speed, tasks=None):
        """ set speed on the remote instance """
        # default: task 0 for every core
            tasks = [0] * len(cores)
        elif len(tasks) != len(cores):
            LOG.error("set_speed: cores and tasks must have the same len")

        LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
        for (core, task) in list(zip(cores, tasks)):
            self.put_command("speed {} {} {}\n".format(core, task, speed))

    def slope_speed(self, cores_speed, duration, n_steps=0):
        """will start to increase speed from 0 to N where N is taken from
        a['speed'] for each a in cores_speed"""
        # by default, each step will take 0.5 sec
            n_steps = duration * 2

        private_core_data = []
        step_duration = float(duration) / n_steps
        for core_data in cores_speed:
            target = float(core_data['speed'])
            private_core_data.append({
                'cores': core_data['cores'],
                'delta': target / n_steps,

        # ramp in n_steps - 1 increments, then jump to the exact target speed
        deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
        for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
            time.sleep(step_duration)
            for core_data in private_core_data:
                core_data['current'] = core_data[key1] + core_data[key2]
                self.set_speed(core_data['cores'], core_data['current'])

    def set_pps(self, cores, pps, pkt_size):
        """ set packets per second for specific cores on the remote instance """
        msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
        LOG.debug(msg, cores, pps, pkt_size)

        # speed in percent of line-rate
        speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
        self._run_template_over_cores("speed {} 0 {}\n", cores, speed)

    def lat_stats(self, cores, task=0):
        """Get the latency statistics from the remote system"""
        # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
        self.put_command("lat stats {} {} \n".format(core, task))
        ret = self.get_data()

        # reply is "min,max,avg,..." — only the first three fields are used
        lat_min[core], lat_max[core], lat_avg[core] = \
            tuple(int(n) for n in ret.split(",")[:3])

        except (AttributeError, ValueError, TypeError):

        return lat_min, lat_max, lat_avg

    def get_all_tot_stats(self):
        # "tot stats" replies with four comma-separated counters: rx,tx,tsc,hz
        self.put_command("tot stats\n")
        all_stats_str = self.get_data().split(",")
        if len(all_stats_str) != 4:

        all_stats = TotStatsTuple(int(v) for v in all_stats_str)
        self.master_stats = all_stats

        # hz is the fourth field of the 'tot stats' reply
        return self.get_all_tot_stats()[3]

    def core_stats(self, cores, task=0):
        """Get the receive statistics from the remote system"""
        rx = tx = drop = tsc = 0
        self.put_command("core stats {} {}\n".format(core, task))
        ret = self.get_data().split(",")
        return rx, tx, drop, tsc

    def port_stats(self, ports):
        """get counter values from a specific port"""
        # accumulate the 12 counter fields element-wise over all ports
        tot_result = [0] * 12
        self.put_command("port_stats {}\n".format(port))
        ret = [try_int(s, 0) for s in self.get_data().split(",")]
        tot_result = [sum(x) for x in zip(tot_result, ret)]

    def measure_tot_stats(self):
        # snapshot totals before and after the measured section;
        # 'delta' holds the field-wise difference between the two snapshots
        start = self.get_all_tot_stats()
        container = {'start_tot': start}
        container['end_tot'] = end = self.get_all_tot_stats()
        container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))

        """Get the total statistics from the remote system"""
        stats = self.get_all_tot_stats()

    def tot_ierrors(self):
        """Get the total ierrors from the remote system"""
        self.put_command("tot ierrors tot\n")
        recv = self.get_data().split(',')
        tot_ierrors = int(recv[0])
        return tot_ierrors, tsc

    def set_count(self, count, cores):
        """Set the number of packets to send on the specified core"""
        self._run_template_over_cores("count {} 0 {}\n", cores, count)

    def dump_rx(self, core_id, task_id=0, count=1):
        """Activate dump on rx on the specified core"""
        LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
        self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
        time.sleep(1.5)  # Give PROX time to set up packet dumping

        """ stop all cores on the remote instance """
        LOG.debug("Quit prox")
        self.put_command("quit\n")

    def force_quit(self):
        """ stop all cores on the remote instance """
        LOG.debug("Force Quit prox")
        self.put_command("quit_force\n")
# Sentinel default used by find_section/find_in_section below to distinguish
# "no default supplied" (raise KeyError) from an explicit default of None.
_LOCAL_OBJECT = object()
class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
    # Builds and uploads the PROX configuration (.cfg plus optional
    # parameters.lua and auxiliary files) and the command line to start PROX.

    # the actual app is lowercase

    # not used for Prox but added for consistency
    LUA_PARAMETER_NAME = ""
    LUA_PARAMETER_PEER = {

    # how long (seconds) to wait for the config to appear on the queue
    CONFIG_QUEUE_TIMEOUT = 120

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        self.remote_path = None
        super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.remote_prox_file_name = None
        self._prox_config_data = None
        self.additional_files = {}
        self.config_queue = Queue()
        # allow_exit_without_flush
        self.config_queue.cancel_join_thread()
        self._global_section = None

    def prox_config_data(self):
        # lazily fetched from the queue that build_config_file() fills
        if self._prox_config_data is None:
            # this will block, but it needs too
            self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
        return self._prox_config_data

    def global_section(self):
        # cached lookup of the [global] section of the PROX config
        if self._global_section is None and self.prox_config_data:
            self._global_section = self.find_section("global")
        return self._global_section

    def find_section(self, name, default=_LOCAL_OBJECT):
        # config data is a sequence of (section_name, section) pairs; raise
        # KeyError only when the caller did not supply an explicit default
        result = next((value for key, value in self.prox_config_data if key == name), default)
        if result is _LOCAL_OBJECT:
            raise KeyError('{} not found in Prox config'.format(name))

    def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
        # same sentinel-based lookup, but for a key inside one section
        section = self.find_section(section_name, [])
        result = next((value for key, value in section if key == section_key), default)
        if result is _LOCAL_OBJECT:
            template = '{} not found in {} section of Prox config'
            raise KeyError(template.format(section_key, section_name))

    def _build_pipeline_kwargs(self):
        # locate the PROX binary on the remote host and remember its path/dir
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        self.pipeline_kwargs = {
            'tool_path': tool_path,
            'tool_dir': os.path.dirname(tool_path),

    def copy_to_target(self, config_file_path, prox_file):
        # upload a local file to /tmp on the remote host
        remote_path = os.path.join("/tmp", prox_file)
        self.ssh_helper.put(config_file_path, remote_path)

    def _get_tx_port(section, sections):
        # return the first integer found in the section's "tx port" entry
        for item in sections[section]:
            if item[0] == "tx port":
                iface_port = re.findall(r'\d+', item[1])
                # do we want the last one?
                #   if yes, then can we reverse?
                return int(iface_port[0])

    def _replace_quoted_with_value(quoted, value, count=1):
        # swap the first double-quoted substring in *quoted* for "<value>"
        new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)

    def _insert_additional_file(self, value):
        # rewrite a quoted file name to the remote path it was uploaded to
        file_str = value.split('"')
        base_name = os.path.basename(file_str[1])
        file_str[1] = self.additional_files[base_name]
        return '"'.join(file_str)

    def generate_prox_config_file(self, config_path):
        prox_config = ConfigParser(config_path, sections)

        # Ensure MAC is set "hardware"
        all_ports = self.vnfd_helper.port_pairs.all_ports
        # use dpdk port number
        for port_name in all_ports:
            port_num = self.vnfd_helper.port_num(port_name)
            port_section_name = "port {}".format(port_num)
            for section_name, section in sections:
                if port_section_name != section_name:

                for index, section_data in enumerate(section):
                    if section_data[0] == "mac":
                        section_data[1] = "hardware"

        # resolve "@@dst_mac<N>" / "dst mac" placeholders from the topology
        for _, section in sections:
            # for index, (item_key, item_val) in enumerate(section):
            for index, section_data in enumerate(section):
                item_key, item_val = section_data
                if item_val.startswith("@@dst_mac"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    # PROX wants the MAC bytes space-separated in this form
                    section_data[1] = mac.replace(":", " ", 6)

                if item_key == "dst mac" and item_val.startswith("@@"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    section_data[1] = mac

        # if addition file specified in prox config
        if not self.additional_files:

        # point any "dofile(...)" references at the uploaded copies
        for section_name, section in sections:
            for index, section_data in enumerate(section):
                if section_data[0].startswith("dofile"):
                    section_data[0] = self._insert_additional_file(section_data[0])

                if section_data[1].startswith("dofile"):
                    section_data[1] = self._insert_additional_file(section_data[1])

    def write_prox_lua(lua_config):
        """
        Write an .ini-format config file for PROX (parameters.lua)
        PROX does not allow a space before/after the =, so we need
        """
        for key in lua_config:
            value = '"' + lua_config[key] + '"'
            if key == "__name__":

            if value is not None and value != '@':
                key = "=".join((key, str(value).replace('\n', '\n\t')))

                key = str(key).replace('\n', '\n\t')

        return os.linesep.join(out)

    def write_prox_config(prox_config):
        """
        Write an .ini-format config file for PROX
        PROX does not allow a space before/after the =, so we need
        """
        for i, (section_name, section) in enumerate(prox_config):
            out.append("[{}]".format(section_name))
            for index, item in enumerate(section):
                if key == "__name__":

                if value is not None and value != '@':
                    key = "=".join((key, str(value).replace('\n', '\n\t')))

                    key = str(key).replace('\n', '\n\t')

        return os.linesep.join(out)

    def put_string_to_file(self, s, remote_path):
        # stream an in-memory string to the remote path
        file_obj = cStringIO(s)
        self.ssh_helper.put_file_obj(file_obj, remote_path)

    def generate_prox_lua_file(self):
        # build the parameters.lua mapping (per-port tester/source MACs)
        all_ports = self.vnfd_helper.port_pairs.all_ports
        for port_name in all_ports:
            port_num = self.vnfd_helper.port_num(port_name)
            intf = self.vnfd_helper.find_interface(name=port_name)
            vintf = intf['virtual-interface']
            p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
            p["src_mac{0}".format(port_num)] = vintf["local_mac"]

    def upload_prox_lua(self, config_file, lua_data):
        # prox can't handle spaces around ' = ' so use custom method
        out = StringIO(self.write_prox_lua(lua_data))
        remote_path = os.path.join("/tmp", config_file)
        self.ssh_helper.put_file_obj(out, remote_path)

    def upload_prox_config(self, config_file, prox_config_data):
        # prox can't handle spaces around ' = ' so use custom method
        out = StringIO(self.write_prox_config(prox_config_data))
        remote_path = os.path.join("/tmp", config_file)
        self.ssh_helper.put_file_obj(out, remote_path)

    def build_config_file(self):
        task_path = self.scenario_helper.task_path
        options = self.scenario_helper.options
        config_path = options['prox_config']
        config_file = os.path.basename(config_path)
        config_path = find_relative_file(config_path, task_path)
        self.additional_files = {}

        # optionally generate and upload parameters.lua first
        if options['prox_generate_parameter']:
            self.lua = self.generate_prox_lua_file()
            if len(self.lua) > 0:
                self.upload_prox_lua("parameters.lua", self.lua)

        # upload any auxiliary files referenced by the PROX config
        prox_files = options.get('prox_files', [])
        if isinstance(prox_files, six.string_types):
            prox_files = [prox_files]
        for key_prox_file in prox_files:
            base_prox_file = os.path.basename(key_prox_file)
            key_prox_path = find_relative_file(key_prox_file, task_path)
            remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
            self.additional_files[base_prox_file] = remote_prox_file

        self._prox_config_data = self.generate_prox_config_file(config_path)
        # copy config to queue so we can read it from traffic_runner process
        self.config_queue.put(self._prox_config_data)
        self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)

    def build_config(self):
        self.build_config_file()

        options = self.scenario_helper.options
        prox_args = options['prox_args']
        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        # flatten the prox_args mapping into a "-k v -k2 v2 ..." string
        self.pipeline_kwargs["args"] = " ".join(
            " ".join([k, v if v else ""]) for k, v in prox_args.items())
        self.pipeline_kwargs["cfg_file"] = self.remote_path

        cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
        prox_cmd = cmd_template.format(**self.pipeline_kwargs)
853 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
class ProxResourceHelper(ClientResourceHelper):
    # Drives a running PROX instance over its control socket and feeds the
    # traffic-runner loop; collectd KPIs are added on top of the superclass.

    RESOURCE_WORD = 'prox'

    def find_pci(pci, bound_pci):
        # we have to substring match PCI bus address from the end
        return any(b.endswith(pci) for b in bound_pci)

    def __init__(self, setup_helper):
        super(ProxResourceHelper, self).__init__(setup_helper)
        self.mgmt_interface = self.vnfd_helper.mgmt_interface
        self._user = self.mgmt_interface["user"]
        self._ip = self.mgmt_interface["ip"]

        self._vpci_to_if_name_map = None
        self.additional_file = {}
        self.remote_prox_file_name = None

        self._test_type = None

        self.client = self._connect()

        # test type comes from the [global] 'name' entry of the PROX config
        if self._test_type is None:
            self._test_type = self.setup_helper.find_in_section('global', 'name', None)
        return self._test_type

    def run_traffic(self, traffic_profile):
        self._queue.cancel_join_thread()
        traffic_profile.init(self._queue)
        # this frees up the run_traffic loop
        self.client_started.value = 1

        while not self._terminated.value:
            # move it all to traffic_profile
            self._run_traffic_once(traffic_profile)

    def _run_traffic_once(self, traffic_profile):
        traffic_profile.execute_traffic(self)
        if traffic_profile.done:
            # signal completion to the consumer and stop the outer loop
            self._queue.put({'done': True})
            LOG.debug("tg_prox done")
            self._terminated.value = 1

    # For VNF use ResourceHelper method to collect KPIs directly.
    # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
    def collect_collectd_kpi(self):
        return self._collect_resource_kpi()

    def collect_kpi(self):
        result = super(ProxResourceHelper, self).collect_kpi()
        # add in collectd kpis manually
        result['collect_stats'] = self._collect_resource_kpi()

        # should not be called, use VNF terminate
        raise NotImplementedError()

        return self.sut  # force connection

    def execute(self, cmd, *args, **kwargs):
        # dispatch a method by name on the sut client, when it exists
        func = getattr(self.sut, cmd, None)
        return func(*args, **kwargs)

    def _connect(self, client=None):
        """Run and connect to prox on the remote system """
        # De-allocating a large amount of hugepages takes some time. If a new
        # PROX instance is started immediately after killing the previous one,
        # it might not be able to allocate hugepages, because they are still
        # being freed. Hence the -w switch.
        # self.connection.execute("sudo killall -w Prox 2>/dev/null")
        # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
        # -f ./handle_none-4.cfg"
        # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
        #  + "export RTE_TARGET=" + self._dpdk_target + ";" \
        #  + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
        #  + "./build/Prox " + prox_args
        # log.debug("Starting PROX with command [%s]", prox_cmd)
        # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
        # self._ip, prox_cmd))

        client = ProxSocketHelper()

        # try connecting to Prox for 60s
        for _ in range(RETRY_SECONDS):
            time.sleep(RETRY_INTERVAL)
            client.connect(self._ip, PROX_PORT)
            except (socket.gaierror, socket.error):

        msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
        raise Exception(msg.format(self._ip, PROX_PORT))
class ProxDataHelper(object):
    # Collects port/total statistics from the SUT around one measurement
    # window and packages them into a ProxTestDataTuple.

    def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
        super(ProxDataHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.pkt_size = pkt_size
        self.tolerated_loss = tolerated_loss
        self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
        self.measured_stats = None
        self._totals_and_pps = None
        self.result_tuple = None

    def totals_and_pps(self):
        # rx/tx totals are fields 6 and 7 of the aggregated port stats;
        # pps is 'value' percent of the theoretical line rate
        if self._totals_and_pps is None:
            rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
            pps = self.value / 100.0 * self.line_rate_to_pps()
            self._totals_and_pps = rx_total, tx_total, pps
        return self._totals_and_pps

        return self.totals_and_pps[0]

        return self.totals_and_pps[1]

        return self.totals_and_pps[2]

        # per-port in/out packet counts, keyed by port name
        for port_name, port_num in self.vnfd_helper.ports_iter():
            port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
            samples[port_name] = {
                "in_packets": port_rx_total,
                "out_packets": port_tx_total,

    def __enter__(self):
        self.check_interface_count()

    def __exit__(self, exc_type, exc_val, exc_tb):

    def make_tuple(self):
        # build the result tuple once per measurement window
        if self.result_tuple:

        self.result_tuple = ProxTestDataTuple(
            self.tolerated_loss,
            self.measured_stats['delta'].rx,
            self.measured_stats['delta'].tx,
            self.measured_stats['delta'].tsc,

        self.result_tuple.log_data()

    def measure_tot_stats(self):
        # delegate to the SUT's measuring context and keep its stats dict
        with self.sut.measure_tot_stats() as self.measured_stats:

    def check_interface_count(self):
        # do this assert in init?  unless we expect interface count to
        # change from one run to another run...
        assert self.port_count in {1, 2, 4}, \
            "Invalid number of ports: 1, 2 or 4 ports only supported at this time"

    def capture_tsc_hz(self):
        # snapshot the remote TSC frequency for later Mpps calculations
        self.tsc_hz = float(self.sut.hz())

    def line_rate_to_pps(self):
        # FIXME Don't hardcode 10Gb/s
        # +20 bytes accounts for preamble/IPG framing overhead per packet
        return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1064 class ProxProfileHelper(object):
1066 __prox_profile_type__ = "Generic"
1068 PROX_CORE_GEN_MODE = "gen"
1069 PROX_CORE_LAT_MODE = "lat"
1072 def get_cls(cls, helper_type):
1073 """Return class of specified type."""
1075 return ProxProfileHelper
1077 for profile_helper_class in utils.itersubclasses(cls):
1078 if helper_type == profile_helper_class.__prox_profile_type__:
1079 return profile_helper_class
1081 return ProxProfileHelper
1084 def make_profile_helper(cls, resource_helper):
1085 return cls.get_cls(resource_helper.test_type)(resource_helper)
1087 def __init__(self, resource_helper):
1088 super(ProxProfileHelper, self).__init__()
1089 self.resource_helper = resource_helper
1090 self._cpu_topology = None
1091 self._test_cores = None
1092 self._latency_cores = None
1095 def cpu_topology(self):
1096 if not self._cpu_topology:
1097 stdout = io.BytesIO()
1098 self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1099 self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1100 return self._cpu_topology
1103 def test_cores(self):
1104 if not self._test_cores:
1105 self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1106 return self._test_cores
1109 def latency_cores(self):
1110 if not self._latency_cores:
1111 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1112 return self._latency_cores
1115 def traffic_context(self, pkt_size, value):
1117 self.sut.reset_stats()
1119 self.sut.set_pkt_size(self.test_cores, pkt_size)
1120 self.sut.set_speed(self.test_cores, value)
1121 self.sut.start_all()
1126 def get_cores(self, mode):
1129 for section_name, section in self.setup_helper.prox_config_data:
1130 if not section_name.startswith("core"):
1133 for key, value in section:
1134 if key == "mode" and value == mode:
1135 core_tuple = CoreSocketTuple(section_name)
1136 core = core_tuple.find_in_topology(self.cpu_topology)
def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
    """Generate traffic for *duration* seconds at *value* percent speed and
    return (result_tuple, samples) gathered by the data helper."""
    helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

    with helper, self.traffic_context(pkt_size, value):
        with helper.measure_tot_stats():
            time.sleep(duration)
        # Read the stats needed to compute PPS at the achieved speed.
        helper.capture_tsc_hz()
        helper.latency = self.get_latency()

    return helper.result_tuple, helper.samples
def get_latency(self):
    """
    :return: return lat_min, lat_max, lat_avg
    :rtype: list

    NOTE(review): this reads the ``_latency_cores`` cache directly rather
    than the ``latency_cores`` property, so it never triggers core
    discovery itself - confirm that is intentional.
    """
    cores = self._latency_cores
    if not cores:
        return []
    return self.sut.lat_stats(cores)
def terminate(self):
    """No-op hook; subclasses may override to tear down traffic state."""
    # presumably intentionally empty - the resource helper owns teardown.
    pass
def __getattr__(self, item):
    # Delegate unknown attribute lookups (e.g. sut, setup_helper,
    # vnfd_helper, ssh_helper, step_delta, step_time) to the wrapped
    # resource helper.
    return getattr(self.resource_helper, item)
class ProxMplsProfileHelper(ProxProfileHelper):
    """Profile helper for the PROX "MPLS tag/untag" test.

    Tagged traffic carries an extra 4-byte MPLS label, so the plain ("udp")
    cores send 4-byte-smaller packets at a speed scaled to equalize
    bandwidth usage between the two streams.
    """

    __prox_profile_type__ = "MPLS tag/untag"

    def __init__(self, resource_helper):
        super(ProxMplsProfileHelper, self).__init__(resource_helper)
        self._cores_tuple = None  # cached (tagged_cores, plain_cores)

    @property
    def mpls_cores(self):
        """(tagged_cores, plain_cores) tuple, resolved once and cached."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_mpls()
        return self._cores_tuple

    @property
    def tagged_cores(self):
        """Cores generating MPLS-tagged traffic."""
        return self.mpls_cores[0]

    @property
    def plain_cores(self):
        """Cores generating plain UDP traffic."""
        return self.mpls_cores[1]

    def get_cores_mpls(self):
        """Scan the PROX config for "gen"-mode cores and split them into
        (tagged, plain) core-id lists based on the task name prefix."""
        cores_tagged = []
        cores_plain = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not traffic generators.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE
                   for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("tag"):
                    core_tuple = CoreSocketTuple(section_name)
                    cores_tagged.append(core_tuple.find_in_topology(self.cpu_topology))
                elif item_value.startswith("udp"):
                    core_tuple = CoreSocketTuple(section_name)
                    cores_plain.append(core_tuple.find_in_topology(self.cpu_topology))

        return cores_tagged, cores_plain

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Generate tagged and plain traffic at *value* percent speed while
        the caller's block runs.

        The stop_all() cleanup now runs in a ``finally`` clause, so an
        exception in the ``with`` body no longer leaves traffic running.
        """
        self.sut.stop_all()
        self.sut.reset_stats()
        try:
            self.sut.set_pkt_size(self.tagged_cores, pkt_size)
            self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
            self.sut.set_speed(self.tagged_cores, value)
            # Scale the plain-core speed so both streams use equal bandwidth
            # (the +20 accounts for per-packet Ethernet framing overhead).
            ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
            self.sut.set_speed(self.plain_cores, value * ratio)
            self.sut.start_all()
            yield
        finally:
            self.sut.stop_all()
class ProxBngProfileHelper(ProxProfileHelper):
    """Profile helper for the PROX BNG (Broadband Network Gateway) test."""

    __prox_profile_type__ = "BNG gen"

    def __init__(self, resource_helper):
        super(ProxBngProfileHelper, self).__init__(resource_helper)
        self._cores_tuple = None  # cached (cpe, inet, arp, arp_task) cores

    @property
    def bng_cores(self):
        """(cpe, inet, arp, arp_task) core tuple, resolved once and cached."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_bng_qos()
        return self._cores_tuple

    @property
    def cpe_cores(self):
        return self.bng_cores[0]

    @property
    def inet_cores(self):
        return self.bng_cores[1]

    @property
    def arp_cores(self):
        return self.bng_cores[2]

    @property
    def arp_task_cores(self):
        return self.bng_cores[3]

    @property
    def all_rx_cores(self):
        # The latency cores double as the receive side used to flush the
        # NIC RX buffers before and after a test run.
        return self.latency_cores

    def get_cores_gen_bng_qos(self):
        """Scan the PROX config for "gen"-mode cores and classify them by
        task name into (cpe, inet, arp, arp_task) core-id lists."""
        cpe_cores = []
        inet_cores = []
        arp_cores = []
        arp_tasks_core = [0]  # task 0 is always included
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not traffic generators.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE
                   for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    cpe_cores.append(core_tuple.find_in_topology(self.cpu_topology))
                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_cores.append(core_tuple.find_in_topology(self.cpu_topology))
                elif item_value.startswith("arp"):
                    # NOTE: "arp_task..." names also match this branch, so
                    # they land in arp_cores AND in arp_tasks_core below;
                    # kept as-is to preserve the original behavior.
                    core_tuple = CoreSocketTuple(section_name)
                    arp_cores.append(core_tuple.find_in_topology(self.cpu_topology))

                # We check the tasks/core separately.
                if item_value.startswith("arp_task"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_tasks_core.append(core_tuple.find_in_topology(self.cpu_topology))

        return cpe_cores, inet_cores, arp_cores, arp_tasks_core

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Prime ARP tables, ramp CPE/inet traffic up to the target speed,
        yield for the measurement, then stop traffic and flush NIC buffers.

        The teardown (stop generators, flush RX buffers) now runs in a
        ``finally`` clause, so an exception in the ``with`` body no longer
        leaves the SUT generating traffic.
        """
        # The CPE stream has 24 bytes less framing than the inet stream;
        # scale the per-direction speeds so both use equal bandwidth
        # (the +20 accounts for per-packet Ethernet framing overhead).
        inet_pkt_size = pkt_size
        cpe_pkt_size = pkt_size - 24
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        if ratio < 1:
            max_down_speed = value * ratio
        else:
            max_up_speed = value / ratio

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will
        # be wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE:
        # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET:
        # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
        self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
        # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)

        # Sending ARP to initialize tables - need a few seconds of generation
        # to make sure all CPEs are initialized
        LOG.info("Initializing SUT: sending ARP packets")
        self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)
        self.sut.start(self.arp_cores)
        time.sleep(4)

        try:
            # Ramp up the transmission speed. First go to the common speed,
            # then increase steps for the faster one.
            self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)

            LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

            while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
                # The min(..., ...) takes care of 1) floating point rounding
                # errors that could make curr_*_speed slightly greater than
                # max_*_speed and 2) max_*_speed not being an exact multiple
                # of step_delta.
                if curr_up_speed < max_up_speed:
                    curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
                if curr_down_speed < max_down_speed:
                    curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

                self.sut.set_speed(self.inet_cores, curr_up_speed)
                self.sut.set_speed(self.cpe_cores, curr_down_speed)
                time.sleep(self.step_time)

            LOG.info("Target speeds reached. Starting real test.")

            yield
        finally:
            # Always stop the generators and drain the NIC buffers, even
            # when the measurement body raises.
            self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
            LOG.info("Test ended. Flushing NIC buffers")
            self.sut.start(self.all_rx_cores)
            time.sleep(3)
            self.sut.stop(self.all_rx_cores)

    # run_test is inherited from ProxProfileHelper: the override previously
    # defined here was byte-identical to the base implementation and was
    # removed as duplication.
class ProxVpeProfileHelper(ProxProfileHelper):
    """Profile helper for the PROX vPE (virtual Provider Edge) test."""

    __prox_profile_type__ = "vPE gen"

    def __init__(self, resource_helper):
        super(ProxVpeProfileHelper, self).__init__(resource_helper)
        self._cores_tuple = None  # cached (cpe_cores, inet_cores)
        self._ports_tuple = None  # cached (cpe_ports, inet_ports)

    @property
    def vpe_cores(self):
        """(cpe_cores, inet_cores) tuple, resolved once and cached."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_vpe()
        return self._cores_tuple

    @property
    def cpe_cores(self):
        return self.vpe_cores[0]

    @property
    def inet_cores(self):
        return self.vpe_cores[1]

    @property
    def all_rx_cores(self):
        # The latency cores double as the receive side used to flush the
        # NIC RX buffers before and after a test run.
        return self.latency_cores

    @property
    def vpe_ports(self):
        """(cpe_ports, inet_ports) tuple, resolved once and cached."""
        if not self._ports_tuple:
            self._ports_tuple = self.get_ports_gen_vpe()
        return self._ports_tuple

    @property
    def cpe_ports(self):
        return self.vpe_ports[0]

    @property
    def inet_ports(self):
        return self.vpe_ports[1]

    def get_cores_gen_vpe(self):
        """Scan the PROX config for "gen"-mode cores and split them into
        (cpe, inet) core-id lists based on the task name prefix."""
        cpe_cores = []
        inet_cores = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not traffic generators.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE
                   for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    cpe_cores.append(core_tuple.find_in_topology(self.cpu_topology))
                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_cores.append(core_tuple.find_in_topology(self.cpu_topology))

        return cpe_cores, inet_cores

    def get_ports_gen_vpe(self):
        """Scan "port ..." config sections and split the port numbers into
        (cpe_ports, inet_ports) based on the port name prefix."""
        cpe_ports = []
        inet_ports = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("port"):
                continue
            tx_port_iter = re.finditer(r'\d+', section_name)
            tx_port_no = int(next(tx_port_iter).group(0))

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("cpe"):
                    cpe_ports.append(tx_port_no)
                elif item_value.startswith("inet"):
                    inet_ports.append(tx_port_no)

        return cpe_ports, inet_ports

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Ramp CPE/inet traffic up to the target speed, yield for the
        measurement, then stop traffic and flush the NIC buffers.

        The teardown now runs in a ``finally`` clause, so an exception in
        the ``with`` body no longer leaves the SUT generating traffic.
        """
        # Calculate the target upload and download speed. The upload and
        # download packets have different packet sizes, so in order to get
        # equal bandwidth usage, the ratio of the speeds has to match the
        # ratio of the packet sizes.
        cpe_pkt_size = pkt_size
        inet_pkt_size = pkt_size - 4
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        if ratio < 1:
            max_down_speed = value * ratio
        else:
            max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.cpe_ports) != len(self.cpe_cores):
            max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will
        # be wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)

        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)

        try:
            # Ramp up the transmission speed. First go to the common speed,
            # then increase steps for the faster one.
            self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)

            LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

            while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
                # The min(..., ...) takes care of 1) floating point rounding
                # errors that could make curr_*_speed slightly greater than
                # max_*_speed and 2) max_*_speed not being an exact multiple
                # of step_delta.
                if curr_up_speed < max_up_speed:
                    curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
                if curr_down_speed < max_down_speed:
                    curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

                self.sut.set_speed(self.inet_cores, curr_up_speed)
                self.sut.set_speed(self.cpe_cores, curr_down_speed)
                time.sleep(self.step_time)

            LOG.info("Target speeds reached. Starting real test.")

            yield
        finally:
            # Always stop the generators and drain the NIC buffers, even
            # when the measurement body raises.
            self.sut.stop(self.cpe_cores + self.inet_cores)
            LOG.info("Test ended. Flushing NIC buffers")
            self.sut.start(self.all_rx_cores)
            time.sleep(3)
            self.sut.stop(self.all_rx_cores)

    # run_test is inherited from ProxProfileHelper: the override previously
    # defined here was byte-identical to the base implementation and was
    # removed as duplication.
class ProxlwAFTRProfileHelper(ProxProfileHelper):
    """Profile helper for the PROX lwAFTR (lightweight 4over6) test."""

    __prox_profile_type__ = "lwAFTR gen"

    def __init__(self, resource_helper):
        super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
        self._cores_tuple = None  # cached (tun_cores, inet_cores)
        self._ports_tuple = None  # cached (tun_ports, inet_ports)
        # Ramp-up parameters; set locally so they shadow any values the
        # wrapped resource helper would otherwise supply via __getattr__.
        self.step_delta = 5
        self.step_time = 0.5

    @property
    def _lwaftr_cores(self):
        """(tun_cores, inet_cores) tuple, resolved once and cached."""
        if not self._cores_tuple:
            self._cores_tuple = self._get_cores_gen_lwaftr()
        return self._cores_tuple

    @property
    def tun_cores(self):
        return self._lwaftr_cores[0]

    @property
    def inet_cores(self):
        return self._lwaftr_cores[1]

    @property
    def _lwaftr_ports(self):
        """(tun_ports, inet_ports) tuple, resolved once and cached."""
        if not self._ports_tuple:
            self._ports_tuple = self._get_ports_gen_lw_aftr()
        return self._ports_tuple

    @property
    def tun_ports(self):
        return self._lwaftr_ports[0]

    @property
    def inet_ports(self):
        return self._lwaftr_ports[1]

    @property
    def all_rx_cores(self):
        # The latency cores double as the receive side used to flush the
        # NIC RX buffers before and after a test run.
        return self.latency_cores

    def _get_cores_gen_lwaftr(self):
        """Scan the PROX config for "gen"-mode cores and split them into
        (tun, inet) core-id lists based on the task name prefix."""
        tun_cores = []
        inet_cores = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not traffic generators.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE
                   for key, value in section):
                continue

            core_tuple = CoreSocketTuple(section_name)
            core_tag = core_tuple.find_in_topology(self.cpu_topology)
            for item_value in (v for k, v in section if k == 'name'):
                if item_value.startswith('tun'):
                    tun_cores.append(core_tag)
                elif item_value.startswith('inet'):
                    inet_cores.append(core_tag)

        return tun_cores, inet_cores

    def _get_ports_gen_lw_aftr(self):
        """Scan "port N" config sections and split the port numbers into
        (tun_ports, inet_ports) based on the port name prefix."""
        tun_ports = []
        inet_ports = []

        # Raw string: '\d' is a regex escape, not a Python string escape
        # (the non-raw literal raises a DeprecationWarning on Python 3.6+).
        re_port = re.compile(r'port (\d+)')
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            match = re_port.search(section_name)
            if not match:
                continue

            tx_port_no = int(match.group(1))
            for item_value in (v for k, v in section if k == 'name'):
                if item_value.startswith('lwB4'):
                    tun_ports.append(tx_port_no)
                elif item_value.startswith('inet'):
                    inet_ports.append(tx_port_no)

        return tun_ports, inet_ports

    @staticmethod
    def _resize(len1, len2):
        """Return the scale factor len1/len2 (exactly 1.0 when equal)."""
        if len1 == len2:
            return 1.0
        return 1.0 * len1 / len2

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Ramp tun/inet traffic up to the target speed, yield for the
        measurement, then stop traffic and flush the NIC buffers.

        The teardown now runs in a ``finally`` clause, so an exception in
        the ``with`` body no longer leaves the SUT generating traffic.
        """
        # The inet stream lacks the 40-byte IPv6 tunnel header; scale the
        # per-direction speeds so both use equal bandwidth (the +20 accounts
        # for per-packet Ethernet framing overhead).
        tun_pkt_size = pkt_size
        inet_pkt_size = pkt_size - 40
        ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value

        # ratio >= 1 here (tun packets are larger), so only the up speed
        # needs scaling.
        max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.tun_ports) != len(self.tun_cores):
            max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will
        # be wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)

        self.sut.reset_values(self.tun_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # TUN:
        # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40), CRC(4)
        self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
        # IP length (byte 56): 58 for MAC(12), EthType(2), CRC(4)
        self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
        # UDP length (byte 78): 78 for MAC(12), EthType(2), IP(20), UDP(8), CRC(4)
        self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)

        # INET:
        # NOTE(review): the original comments cited offsets 20/42, but the
        # code writes at 16/38 - kept as-is; confirm against the lwAFTR
        # packet template before changing.
        self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
        self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)

        LOG.info("Initializing SUT: sending lwAFTR packets")
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.tun_cores, curr_down_speed)
        time.sleep(4)

        try:
            # Ramp up the transmission speed. First go to the common speed,
            # then increase steps for the faster one.
            self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)

            LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

            while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
                # The min(..., ...) takes care of 1) floating point rounding
                # errors that could make curr_*_speed slightly greater than
                # max_*_speed and 2) max_*_speed not being an exact multiple
                # of step_delta.
                if curr_up_speed < max_up_speed:
                    curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
                if curr_down_speed < max_down_speed:
                    curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

                self.sut.set_speed(self.inet_cores, curr_up_speed)
                self.sut.set_speed(self.tun_cores, curr_down_speed)
                time.sleep(self.step_time)

            LOG.info("Target speeds reached. Starting real test.")

            yield
        finally:
            # Always stop the generators and drain the NIC buffers, even
            # when the measurement body raises.
            self.sut.stop(self.tun_cores + self.inet_cores)
            LOG.info("Test ended. Flushing NIC buffers")
            self.sut.start(self.all_rx_cores)
            time.sleep(3)
            self.sut.stop(self.all_rx_cores)

    # run_test is inherited from ProxProfileHelper: the override previously
    # defined here was byte-identical to the base implementation and was
    # removed as duplication.