1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
25 from collections import OrderedDict, namedtuple
27 from contextlib import contextmanager
28 from itertools import repeat, chain
31 from six.moves import zip, StringIO
32 from six.moves import cStringIO
34 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
35 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
36 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
# Module-level logger for the PROX helpers.
LOG = logging.getLogger(__name__)
# NOTE(review): forcing DEBUG at import time overrides whatever level the
# framework's logging configuration chose -- confirm this is intentional.
LOG.setLevel(logging.DEBUG)
# DATS-style configuration options.
# Each entry: (result dict key, ini section, ini key, default value).
# NOTE(review): the tuple's closing parenthesis is on a line elided from
# this chunk.
CONFIGURATION_OPTIONS = (
    # dict key           section   key               default value

    # [general] test parameters
    ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
    ('testDuration', 'general', 'test_duration', 5.0),
    ('testPrecision', 'general', 'test_precision', 1.0),
    ('tests', 'general', 'tests', None),
    ('toleratedLoss', 'general', 'tolerated_loss', 0.0),

    # [logging] options
    ('logFile', 'logging', 'file', 'dats.log'),
    ('logDateFormat', 'logging', 'datefmt', None),
    ('logLevel', 'logging', 'level', 'INFO'),
    ('logOverwrite', 'logging', 'overwrite', 1),

    # [tester] traffic-generator host
    ('testerIp', 'tester', 'ip', None),
    ('testerUser', 'tester', 'user', 'root'),
    ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
    ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
    ('testerSocketId', 'tester', 'socket_id', 0),

    # [sut] system under test
    ('sutIp', 'sut', 'ip', None),
    ('sutUser', 'sut', 'user', 'root'),
    ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
    ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
    ('sutSocketId', 'sut', 'socket_id', 0),
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Core spec parsed from a PROX section name such as "core 1s0h".

    Fields: core id (int), socket id (int, defaults to 0 when absent) and
    the hyperthread flag ('h' when the spec names the hyperthread sibling,
    '' otherwise).

    The visible chunk showed an ``except`` without its ``try:`` and a bare
    ``return`` without a ``def``; the minimal enclosing structure implied
    by the visible code is restored here.
    """

    # "core <id>[s<socket>][h]" -- id required, socket and hyperthread optional.
    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        try:
            matches = cls.CORE_RE.search(str(args[0]))
            args = matches.groups()

            # socket defaults to 0 when not given; hyperthread flag is 'h'/''.
            return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
                                                       'h' if args[2] else '')
        except (AttributeError, TypeError, IndexError, ValueError):
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        """True when the spec addressed the hyperthread sibling."""
        return self.hyperthread == 'h'

    @property
    def index(self):
        # The hyperthread sibling sorts second, hence index 1 vs. 0.
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        """Resolve this spec to a physical processor id in `cpu_topology`.

        Raises ValueError when the socket/core (or its hyperthread sibling)
        does not exist on the target host.
        """
        try:
            socket_core_match = cpu_topology[self.socket_id][self.core_id]
            sorted_match = sorted(socket_core_match.values())
            return sorted_match[self.index][0]
        except (KeyError, IndexError):
            template = "Core {}{} on socket {} does not exist"
            raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    """Total rx/tx packet counters plus tsc delta and tsc frequency.

    Accepts either four scalar arguments or a single non-string iterable
    carrying the four values.  The visible chunk showed an ``except``
    without its ``try:``; the minimal structure is restored here.
    """

    def __new__(cls, *args):
        try:
            # A single non-string iterable argument carries all four values.
            # NOTE: relies on CPython's str(s) identity fast-path; under -O
            # the assert is stripped and a lone str argument behaves differently.
            assert args[0] is not str(args[0])
            args = tuple(args[0])
        except (AssertionError, IndexError, TypeError):
            # args are already the four scalar fields -- use them as-is.
            pass

        return super(TotStatsTuple, cls).__new__(cls, *args)
class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
                                                        'delta_tx,delta_tsc,'
                                                        'latency,rx_total,tx_total,pps')):
    """Raw counters for one throughput iteration plus derived metrics.

    NOTE(review): many structural lines (property decorators and ``def``
    headers, ``try:`` openers, parts of the get_samples dict literal) are
    elided from this chunk; statements below are kept as-is, comments only.
    """

    # pkt_loss property: drop percentage relative to packets transmitted.
            return 1e2 * self.drop_total / float(self.tx_total)
        except ZeroDivisionError:

    # mpps property:
        # calculate the effective throughput in Mpps
        return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6

    def can_be_lost(self):
        # absolute number of packets that may be dropped while still passing
        return int(self.tx_total * self.tolerated / 1e2)

    def drop_total(self):
        # packets sent but never received back
        return self.tx_total - self.rx_total

    # success property: did the iteration stay within the tolerated loss?
        return self.drop_total <= self.can_be_lost

    def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
        # default to our own computed loss when the caller supplies none
        # NOTE(review): the None-guard line is elided here.
            pkt_loss = self.pkt_loss

        if port_samples is None:

        # KPI sample dict (surrounding literal braces elided in this chunk)
            "Throughput": self.mpps,
            "DropPackets": pkt_loss,
            "CurrentDropPackets": pkt_loss,
            "TxThroughput": self.pps / 1e6,
            "RxThroughput": self.mpps,

        # merge per-port counters supplied by the caller
        samples.update(port_samples)

        samples.update((key, value) for key, value in zip(latency_keys, self.latency))

    def log_data(self, logger=None):
        # NOTE(review): the logger-defaulting lines are elided here.
        template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
        logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
        logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
class PacketDump(object):
    """One captured packet (port id, length, payload) read from PROX.

    NOTE(review): decorators and several ``def``/``return`` lines are
    elided from this chunk; statements are kept as-is, comments only.
    """

    # staticmethod helper (decorator elided): assert with a formatted message
    def assert_func(func, value1, value2, template=None):
        assert func(value1, value2), template.format(value1, value2)

    def __init__(self, port_id, data_len, payload):
        # reject dumps whose header length disagrees with the payload
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

    # port_id property (def header elided):
        """Get the port id of the packet dump"""

    # data_len property (def header elided):
        """Get the length of the data received"""
        return self._data_len

    # __str__ (def header elided):
        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload as a list of ordinals.

        Returns a list of byte values, matching the contents of the packet dump.
        Optional start and end parameters can be specified to retrieve only a
        part of the packet contents.

        The number of elements in the list is equal to end - start + 1, so end
        is the offset of the last character.

        Args:
            start (pos. int): the starting offset in the payload. If it is not
                specified or None, offset 0 is assumed.
            end (pos. int): the ending offset of the payload. If it is not
                specified or None, the contents until the end of the packet are
                returned.

        Returns:
            [int, int, ...]. Each int represents the ordinal value of a byte in
            the packet payload.
        """
        # NOTE(review): the None-defaulting guards for start/end are elided.
            end = self.data_len - 1

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, start, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, end, self.data_len, template)

        # Adjust for splice operation: end offset must be 1 more than the offset
        # of the last desired character.
        # NOTE(review): the line applying that adjustment is elided here.
        return self._payload[start:end]
class ProxSocketHelper(object):
    """Wrapper around the PROX control socket (one-line text commands)."""

    def __init__(self, sock=None):
        """ creates new prox instance """
        super(ProxSocketHelper, self).__init__()

        # NOTE(review): surrounding guard lines are elided in this chunk;
        # presumably a fresh socket is only created when none was injected.
        sock = socket.socket()
271 def connect(self, ip, port):
272 """Connect to the prox instance on the remote system"""
273 self._sock.connect((ip, port))
    def get_socket(self):
        """ get the socket connected to the remote instance """
        # NOTE(review): the return statement is on an elided line.

    def _parse_socket_data(self, decoded_data, pkt_dump_only):
        """Split received socket data into 1-line responses and packet dumps.

        NOTE(review): several lines (cursor initialisation, the try/except
        around the header split, loop-exit bookkeeping and the `if
        pkt_dump_only:` guard) are elided from this chunk.
        """
        def get_newline_index():
            # position of the next newline at/after the current cursor
            return decoded_data.find('\n', index)

        for newline_index in iter(get_newline_index, -1):
            ret_str = decoded_data[index:newline_index]

            # packet-dump header looks like "pktdump,<port_id>,<data_len>"
            mode, port_id, data_len = ret_str.split(',', 2)
            # fallback when the line does not split into three fields
            mode, port_id, data_len = None, None, None

            if mode != 'pktdump':
                # Regular 1-line message. Stop reading from the socket.
                LOG.debug("Regular response read")

            LOG.debug("Packet dump header read: [%s]", ret_str)

            # The line is a packet dump header. Parse it, read the
            # packet payload, store the dump for later retrieval.
            # Skip over the packet dump and continue processing: a
            # 1-line response may follow the packet dump.
            data_len = int(data_len)
            data_start = newline_index + 1  # + 1 to skip over \n
            data_end = data_start + data_len
            sub_data = decoded_data[data_start:data_end]
            pkt_payload = array.array('B', (ord(v) for v in sub_data))
            pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
            self._pkt_dumps.append(pkt_dump)

            # Return boolean instead of string to signal
            # successful reception of the packet dump.
            LOG.debug("Packet dump stored, returning")

    def get_data(self, pkt_dump_only=False, timeout=1):
        """ read data from the socket """

        # This method behaves slightly differently depending on whether it is
        # called to read the response to a command (pkt_dump_only = 0) or if
        # it is called specifically to read a packet dump (pkt_dump_only = 1).
        #
        # Packet dumps look like:
        #   pktdump,<port_id>,<data_len>\n
        #   <packet contents as byte array>\n
        # This means the total packet dump message consists of 2 lines instead
        # of one.
        #
        # - Response for a command (pkt_dump_only = 0):
        #   1) Read response from the socket until \n (end of message)
        #   2a) If the response is a packet dump header (starts with "pktdump,"):
        #     - Read the packet payload and store the packet dump for later
        #       retrieval.
        #     - Reset the state and restart from 1). Eventually state 2b) will
        #       be reached and the function will return.
        #   2b) If the response is not a packet dump:
        #     - Return the received message as a string
        #
        # - Explicit request to read a packet dump (pkt_dump_only = 1):
        #   - Read the dump header and payload
        #   - Store the packet dump for later retrieval
        #   - Return True to signify a packet dump was successfully read

        # NOTE(review): the inner is_ready() def header is elided; the two
        # indented lines below form its body.
        # recv() is blocking, so avoid calling it when no data is waiting.
            ready = select.select([self._sock], [], [], timeout)
            return bool(ready[0])

        for status in iter(is_ready, False):
            decoded_data = self._sock.recv(256).decode('utf-8')
            ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)

        LOG.debug("Received data from socket: [%s]", ret_str)
        return ret_str if status else ''

    def put_command(self, to_send):
        """ send data to the remote instance """
        LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
        # NOTE(review): one line between the debug call and sendall is elided.
        self._sock.sendall(to_send.encode('utf-8'))

    def get_packet_dump(self):
        """ get the next packet dump """
        # NOTE(review): the non-empty-queue guard is elided; pop(0) on an
        # empty list would raise IndexError.
            return self._pkt_dumps.pop(0)

    def stop_all_reset(self):
        """ stop the remote instance and reset stats """
        LOG.debug("Stop all and reset stats")
        # NOTE(review): the calls performing the stop and the reset are elided.

    # NOTE(review): the def header of stop_all() is elided from this chunk.
        """ stop all cores on the remote instance """
        LOG.debug("Stop all")
        self.put_command("stop all\n")
390 def stop(self, cores, task=''):
391 """ stop specific cores on the remote instance """
392 LOG.debug("Stopping cores %s", cores)
393 self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
    # NOTE(review): the def header of start_all() is elided from this chunk.
        """ start all cores on the remote instance """
        LOG.debug("Start all")
        self.put_command("start all\n")
401 def start(self, cores):
402 """ start specific cores on the remote instance """
403 LOG.debug("Starting cores %s", cores)
404 self.put_command("start {}\n".format(join_non_strings(',', cores)))
407 def reset_stats(self):
408 """ reset the statistics on the remote instance """
409 LOG.debug("Reset stats")
410 self.put_command("reset stats\n")
    def _run_template_over_cores(self, template, cores, *args):
        # Send one formatted command per core (core id fills the first slot).
        # NOTE(review): the for-loop header over `cores` is elided.
            self.put_command(template.format(core, *args))

    def set_pkt_size(self, cores, pkt_size):
        """ set the packet size to generate on the remote instance """
        LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
        # NOTE(review): one line is elided here (likely blank).
        self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
424 def set_value(self, cores, offset, value, length):
425 """ set value on the remote instance """
426 msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
427 LOG.debug(msg, cores, value, length, offset)
428 template = "set value {} 0 {} {} {}\n"
429 self._run_template_over_cores(template, cores, offset, value, length)
431 def reset_values(self, cores):
432 """ reset values on the remote instance """
433 LOG.debug("Set value for core(s) %s", cores)
434 self._run_template_over_cores("reset values {} 0\n", cores)
436 def set_speed(self, cores, speed):
437 """ set speed on the remote instance """
438 LOG.debug("Set speed for core(s) %s to %g", cores, speed)
439 self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
    def slope_speed(self, cores_speed, duration, n_steps=0):
        """will start to increase speed from 0 to N where N is taken from
        a['speed'] for each a in cores_speed"""
        # by default, each step will take 0.5 sec
        # NOTE(review): the guard applying this default only when the caller
        # passed n_steps=0 is elided.
            n_steps = duration * 2

        private_core_data = []
        step_duration = float(duration) / n_steps
        for core_data in cores_speed:
            target = float(core_data['speed'])
            private_core_data.append({
                'cores': core_data['cores'],
                # NOTE(review): neighbouring dict entries (e.g. 'current',
                # 'zero', 'speed') and the closing braces are elided.
                'delta': target / n_steps,

        # Step every core up by its 'delta' each iteration, then jump to the
        # exact target speed on the final ('zero' + 'speed') iteration.
        deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
        for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
            time.sleep(step_duration)
            for core_data in private_core_data:
                core_data['current'] = core_data[key1] + core_data[key2]
                self.set_speed(core_data['cores'], core_data['current'])

    def set_pps(self, cores, pps, pkt_size):
        """ set packets per second for specific cores on the remote instance """
        msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
        LOG.debug(msg, cores, pps, pkt_size)

        # speed in percent of line-rate
        # (pkt_size + 20: per-frame wire overhead -- presumably preamble +
        # inter-frame gap; TODO confirm)
        speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
        self._run_template_over_cores("speed {} 0 {}\n", cores, speed)

    def lat_stats(self, cores, task=0):
        """Get the latency statistics from the remote system"""
        # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
        # NOTE(review): the result-dict initialisations and the for-loop
        # header over `cores` are elided.
            self.put_command("lat stats {} {} \n".format(core, task))
            ret = self.get_data()

            # parse "min,max,avg,..." -- only the first three fields are used
                lat_min[core], lat_max[core], lat_avg[core] = \
                    tuple(int(n) for n in ret.split(",")[:3])
            except (AttributeError, ValueError, TypeError):
                # NOTE(review): except body elided -- presumably a pass so a
                # malformed reply leaves this core's entries unset.

        return lat_min, lat_max, lat_avg
    def get_all_tot_stats(self):
        """Fetch total (rx, tx, tsc, hz) counters; also cached on self.master_stats."""
        self.put_command("tot stats\n")
        all_stats_str = self.get_data().split(",")
        if len(all_stats_str) != 4:
            # NOTE(review): the short-circuit handling for a malformed reply
            # is elided here.
        all_stats = TotStatsTuple(int(v) for v in all_stats_str)
        self.master_stats = all_stats
        # NOTE(review): the return of `all_stats` and the def header of hz()
        # are elided; the line below is the body of hz() -- field 3 is the
        # tsc frequency.
        return self.get_all_tot_stats()[3]
510 def rx_stats(self, cores, task=0):
511 return self.core_stats(cores, task)
    def core_stats(self, cores, task=0):
        """Get the receive statistics from the remote system"""
        rx = tx = drop = tsc = 0
        # NOTE(review): the loop header over `cores` and the per-core
        # accumulation of the parsed fields are elided.
            self.put_command("core stats {} {}\n".format(core, task))
            ret = self.get_data().split(",")
        return rx, tx, drop, tsc

    def port_stats(self, ports):
        """get counter values from a specific port"""
        tot_result = [0] * 12
        # NOTE(review): the loop header over `ports` and the final return of
        # `tot_result` are elided; each port's counters are summed
        # element-wise into the 12-slot accumulator.
            self.put_command("port_stats {}\n".format(port))
            ret = [try_int(s, 0) for s in self.get_data().split(",")]
            tot_result = [sum(x) for x in zip(tot_result, ret)]

    def measure_tot_stats(self):
        """Context manager: snapshot totals before/after a measurement interval.

        NOTE(review): the @contextmanager decorator and the yield (inside a
        try/finally, presumably) are elided from this chunk.
        """
        start = self.get_all_tot_stats()
        container = {'start_tot': start}
        container['end_tot'] = end = self.get_all_tot_stats()
        # element-wise difference between the two snapshots
        container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))

    # NOTE(review): the def header of tot_stats() is elided.
        """Get the total statistics from the remote system"""
        stats = self.get_all_tot_stats()

    def tot_ierrors(self):
        """Get the total ierrors from the remote system"""
        self.put_command("tot ierrors tot\n")
        recv = self.get_data().split(',')
        tot_ierrors = int(recv[0])
        # NOTE(review): the line extracting `tsc` from the reply is elided.
        return tot_ierrors, tsc
558 def set_count(self, count, cores):
559 """Set the number of packets to send on the specified core"""
560 self._run_template_over_cores("count {} 0 {}\n", cores, count)
562 def dump_rx(self, core_id, task_id=0, count=1):
563 """Activate dump on rx on the specified core"""
564 LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
565 self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
566 time.sleep(1.5) # Give PROX time to set up packet dumping
    # NOTE(review): the def header of quit() is elided from this chunk.
        """ stop all cores on the remote instance """
        LOG.debug("Quit prox")
        self.put_command("quit\n")
579 def force_quit(self):
580 """ stop all cores on the remote instance """
581 LOG.debug("Force Quit prox")
582 self.put_command("quit_force\n")
class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
    """Builds and uploads the PROX configuration for the target system."""

    # the actual app is lowercase
    # NOTE(review): APP_NAME and neighbouring constants are elided here.
    LUA_PARAMETER_NAME = ""
    LUA_PARAMETER_PEER = {
    # NOTE(review): the dict entries and closing brace are elided.

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        # remote path of the uploaded PROX config (set by build_config_file)
        self.remote_path = None
        super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.remote_prox_file_name = None
        # parsed PROX config: iterable of (section_name, [(key, value), ...])
        self.prox_config_dict = None
        # local basename -> remote path for extra files uploaded to the target
        self.additional_files = {}
    def _build_pipeline_kwargs(self):
        """Record the provisioned tool's path and directory for the command line."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        self.pipeline_kwargs = {
            'tool_path': tool_path,
            'tool_dir': os.path.dirname(tool_path),
        # NOTE(review): the dict's closing brace line is elided.
610 def copy_to_target(self, config_file_path, prox_file):
611 remote_path = os.path.join("/tmp", prox_file)
612 self.ssh_helper.put(config_file_path, remote_path)
    # NOTE(review): the @staticmethod decorator line is elided.
    def _get_tx_port(section, sections):
        """Return the first port number named in the section's "tx port" entry."""
        # NOTE(review): one line between the def and the loop is elided.
        for item in sections[section]:
            if item[0] == "tx port":
                iface_port = re.findall(r'\d+', item[1])
                # do we want the last one?
                #   if yes, then can we reverse?
                return int(iface_port[0])
        # NOTE(review): the fallback return for "no tx port found" is elided.
626 def _replace_quoted_with_value(quoted, value, count=1):
627 new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
630 def _insert_additional_file(self, value):
631 file_str = value.split('"')
632 base_name = os.path.basename(file_str[1])
633 file_str[1] = self.additional_files[base_name]
634 return '"'.join(file_str)
    def generate_prox_config_file(self, config_path):
        """Parse the PROX config and rewrite MACs and file paths for this run.

        NOTE(review): several lines are elided from this chunk (the
        `sections` initialisation, the parser invocation, `continue`/early
        `return` statements and the final return of `sections`).
        """
        prox_config = ConfigParser(config_path, sections)

        # Ensure MAC is set "hardware"
        all_ports = self.vnfd_helper.port_pairs.all_ports
        # use dpdk port number
        for port_name in all_ports:
            port_num = self.vnfd_helper.port_num(port_name)
            port_section_name = "port {}".format(port_num)
            for section_name, section in sections:
                if port_section_name != section_name:
                    # NOTE(review): elided `continue`

                for index, section_data in enumerate(section):
                    if section_data[0] == "mac":
                        section_data[1] = "hardware"

        # Replace "@@dst_mac<port>" placeholders with the peer interface's MAC.
        for _, section in sections:
            # for index, (item_key, item_val) in enumerate(section):
            for index, section_data in enumerate(section):
                item_key, item_val = section_data
                if item_val.startswith("@@dst_mac"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    # space-separated byte form (used inside payload definitions)
                    section_data[1] = mac.replace(":", " ", 6)

                if item_key == "dst mac" and item_val.startswith("@@"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    section_data[1] = mac

        # if addition file specified in prox config
        if not self.additional_files:
            # NOTE(review): elided early exit when no extra files were uploaded

        # Rewrite dofile("...") references to the uploaded remote paths.
        for section_name, section in sections:
            for index, section_data in enumerate(section):
                if section_data[0].startswith("dofile"):
                    section_data[0] = self._insert_additional_file(section_data[0])

                if section_data[1].startswith("dofile"):
                    section_data[1] = self._insert_additional_file(section_data[1])
    # NOTE(review): the @staticmethod decorator and several lines (the `out`
    # accumulator initialisation, item unpacking, `continue`, the else
    # branch and the per-item append) are elided from this chunk.
    def write_prox_config(prox_config):
        """
        Write an .ini-format config file for PROX
        PROX does not allow a space before/after the =, so we need
        a custom serializer instead of the stdlib ConfigParser writer.
        """
        for i, (section_name, section) in enumerate(prox_config):
            out.append("[{}]".format(section_name))
            for index, item in enumerate(section):
                # `item` is unpacked into (key, value) on an elided line
                if key == "__name__":
                    # skip ConfigParser's internal bookkeeping entry (elided)
                if value is not None and value != '@':
                    # "key=value", continuation lines tab-indented
                    key = "=".join((key, str(value).replace('\n', '\n\t')))
                    # (elided: else-branch emits the bare key)
                    key = str(key).replace('\n', '\n\t')
        return os.linesep.join(out)
713 def put_string_to_file(self, s, remote_path):
714 file_obj = cStringIO(s)
715 self.ssh_helper.put_file_obj(file_obj, remote_path)
    def generate_prox_lua_file(self):
        """Build LUA `name:"value"` lines mapping each port to its IPs.

        NOTE(review): the accumulator (`p`) initialisation, the call wrapping
        the four tuples below, and the final return are elided.
        """
        all_ports = self.vnfd_helper.port_pairs.all_ports
        lua_param = self.LUA_PARAMETER_NAME
        for port_name in all_ports:
            peer = self.LUA_PARAMETER_PEER[lua_param]
            port_num = self.vnfd_helper.port_num(port_name)
            intf = self.vnfd_helper.find_interface(name=port_name)
            vintf = intf['virtual-interface']
            local_ip = vintf["local_ip"]
            dst_ip = vintf["dst_ip"]
            local_ip_hex = ip_to_hex(local_ip, separator=' ')
            dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
            # four entries per port: local/peer, in hex and dotted-quad forms
                ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
                ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
                ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
                ("{}_ip_port_{}".format(peer, port_num), dst_ip),

        lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
    def upload_prox_lua(self, config_dir, prox_config_dict):
        """Find a lua file referenced from the [lua] section and upload it.

        NOTE(review): the early-return taken when no lua file is referenced
        is elided between the search and the generation below.
        """
        # we could have multiple lua directives
        lau_dict = prox_config_dict.get('lua', {})
        # keys look like dofile("<name>") -- extract the quoted filename
        find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
        lua_file = next((found[0] for found in find_iter if found), None)

        out = self.generate_prox_lua_file()
        remote_path = os.path.join(config_dir, lua_file)
        return self.put_string_to_file(out, remote_path)
752 def upload_prox_config(self, config_file, prox_config_dict):
753 # prox can't handle spaces around ' = ' so use custom method
754 out = StringIO(self.write_prox_config(prox_config_dict))
756 remote_path = os.path.join("/tmp", config_file)
757 self.ssh_helper.put_file_obj(out, remote_path)
761 def build_config_file(self):
762 task_path = self.scenario_helper.task_path
763 options = self.scenario_helper.options
764 config_path = options['prox_config']
765 config_file = os.path.basename(config_path)
766 config_path = find_relative_file(config_path, task_path)
767 self.additional_files = {}
769 prox_files = options.get('prox_files', [])
770 if isinstance(prox_files, six.string_types):
771 prox_files = [prox_files]
772 for key_prox_file in prox_files:
773 base_prox_file = os.path.basename(key_prox_file)
774 remote_prox_file = self.copy_to_target(key_prox_file, base_prox_file)
775 self.additional_files[base_prox_file] = remote_prox_file
777 self.prox_config_dict = self.generate_prox_config_file(config_path)
778 self.remote_path = self.upload_prox_config(config_file, self.prox_config_dict)
    def build_config(self):
        """Assemble the shell command line that launches PROX on the target.

        NOTE(review): the closing lines (including the return of the
        composed command) are elided from this chunk.
        """
        options = self.scenario_helper.options

        prox_args = options['prox_args']
        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        # flatten the {"-flag": value} mapping into one argument string
        self.pipeline_kwargs["args"] = " ".join(
            " ".join([k, v if v else ""]) for k, v in prox_args.items())
        self.pipeline_kwargs["cfg_file"] = self.remote_path

        cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
        prox_cmd = cmd_template.format(**self.pipeline_kwargs)
class ProxResourceHelper(ClientResourceHelper):
    """Drives a remote PROX traffic generator and collects its statistics."""

    RESOURCE_WORD = 'prox'
    # PROX core modes as they appear in the config file
    PROX_CORE_GEN_MODE = "gen"
    PROX_CORE_LAT_MODE = "lat"
    PROX_CORE_MPLS_TEST = "MPLS tag/untag"
    # NOTE(review): further class-level constants are elided from this chunk.
808 def line_rate_to_pps(pkt_size, n_ports):
809 # FIXME Don't hardcode 10Gb/s
810 return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20)
813 def find_pci(pci, bound_pci):
814 # we have to substring match PCI bus address from the end
815 return any(b.endswith(pci) for b in bound_pci)
    def __init__(self, setup_helper):
        """Cache management-interface details and set up lazy caches.

        NOTE(review): several attribute initialisations (and possibly a
        method boundary) are elided within this span.
        """
        super(ProxResourceHelper, self).__init__(setup_helper)
        self.mgmt_interface = self.vnfd_helper.mgmt_interface
        self._user = self.mgmt_interface["user"]
        self._ip = self.mgmt_interface["ip"]

        # lazily populated caches
        self._cpu_topology = None
        self._vpci_to_if_name_map = None
        self.additional_file = {}
        self.remote_prox_file_name = None

        # core lists resolved on demand from the PROX config
        self._test_cores = None
        self._latency_cores = None
        self._tagged_cores = None
        self._plain_cores = None

        # NOTE(review): elided lines precede this statement; it may belong
        # to a lazy client accessor rather than __init__ itself -- confirm.
        self.client = self._connect()
842 def cpu_topology(self):
843 if not self._cpu_topology:
844 stdout = io.BytesIO()
845 self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
846 self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
847 return self._cpu_topology
850 def test_cores(self):
851 if not self._test_cores:
852 self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
853 return self._test_cores
856 def mpls_cores(self):
857 if not self._tagged_cores:
858 self._tagged_cores, self._plain_cores = self.get_cores_mpls(self.PROX_CORE_GEN_MODE)
859 return self._tagged_cores, self._plain_cores
862 def tagged_cores(self):
863 return self.mpls_cores[0]
866 def plain_cores(self):
867 return self.mpls_cores[1]
870 def latency_cores(self):
871 if not self._latency_cores:
872 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
873 return self._latency_cores
    def run_traffic(self, traffic_profile):
        """Main loop: drive the traffic profile until terminated.

        NOTE(review): setup lines between the def header and the init call
        are elided from this chunk.
        """
        traffic_profile.init(self._queue)
        # this frees up the run_traffic loop
        self.client_started.value = 1

        while not self._terminated.value:
            # move it all to traffic_profile
            self._run_traffic_once(traffic_profile)
887 def _run_traffic_once(self, traffic_profile):
888 traffic_profile.execute_traffic(self)
889 if traffic_profile.done:
890 self._queue.put({'done': True})
891 LOG.debug("tg_prox done")
892 self._terminated.value = 1
    def start_collect(self):
        # NOTE(review): body elided; the fragment below belongs to another
        # method (likely terminate()) whose def header is also elided.

        # should not be called, use VNF terminate
        raise NotImplementedError()

        # NOTE(review): def header elided (presumably a lazy accessor).
        return self.sut  # force connection

    def execute(self, cmd, *args, **kwargs):
        """Invoke `cmd` on the socket helper when it exposes such a method."""
        func = getattr(self.sut, cmd, None)
        # NOTE(review): the guard checking `func` before the call is elided.
            return func(*args, **kwargs)
    # NOTE(review): the @contextmanager decorator and the yield/cleanup
    # scaffolding of this context manager are elided from this chunk.
    def traffic_context(self, pkt_size, value):
        """Configure packet size and speed for one test run."""
        self.sut.reset_stats()
        if self.get_test_type() == self.PROX_CORE_MPLS_TEST:
            self.sut.set_pkt_size(self.tagged_cores, pkt_size)
            # plain stream is 4 bytes smaller -- presumably the MPLS label
            # the tagged stream carries; confirm.
            self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
            self.sut.set_speed(self.tagged_cores, value)
            # scale plain-core speed so both streams occupy equal wire time
            ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
            self.sut.set_speed(self.plain_cores, value * ratio)
        # NOTE(review): the `else:` header is elided -- non-MPLS tests below.
            self.sut.set_pkt_size(self.test_cores, pkt_size)
            self.sut.set_speed(self.test_cores, value)
    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run one iteration at `value` percent of line rate and gather KPIs.

        NOTE(review): a few lines are elided in this span (the sleep for
        `duration`, the `samples` dict initialisation, a closing brace and
        blank separators).
        """
        # do this assert in init? unless we expect interface count to
        # change from one run to another run...
        ports = self.vnfd_helper.port_pairs.all_ports
        port_count = len(ports)
        assert port_count in {1, 2, 4}, \
            "Invalid number of ports: 1, 2 or 4 ports only supported at this time"

        with self.traffic_context(pkt_size, value):
            # Getting statistics to calculate PPS at right speed....
            tsc_hz = float(self.sut.hz())
            with self.sut.measure_tot_stats() as data:
                # NOTE(review): the sleep for the measurement interval is elided.

            # Get stats before stopping the cores. Stopping cores takes some time
            # and might skew results otherwise.
            latency = self.get_latency()

        deltas = data['delta']
        # port_stats fields 6 and 7 are the total rx/tx packet counts
        rx_total, tx_total = self.sut.port_stats(range(port_count))[6:8]
        pps = value / 100.0 * self.line_rate_to_pps(pkt_size, port_count)

        # we are currently using enumeration to map logical port num to interface
        for port_name in ports:
            port = self.vnfd_helper.port_num(port_name)
            port_rx_total, port_tx_total = self.sut.port_stats([port])[6:8]
            samples[port_name] = {
                "in_packets": port_rx_total,
                "out_packets": port_tx_total,
                # NOTE(review): the dict's closing brace line is elided.

        result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx,
                                   deltas.tsc, latency, rx_total, tx_total, pps)
        return result, samples
    def get_test_type(self):
        """Return PROX_CORE_MPLS_TEST when the config names it.

        NOTE(review): the default initialisation of `test_type`, `continue`
        statements and the final return are elided from this chunk.
        """
        for section_name, section in self.setup_helper.prox_config_dict:
            if section_name != "global":
                # NOTE(review): elided `continue`

            for key, value in section:
                if key == "name" and value == self.PROX_CORE_MPLS_TEST:
                    test_type = self.PROX_CORE_MPLS_TEST

    def get_cores(self, mode):
        """Collect physical core ids for all "core ..." sections in `mode`.

        NOTE(review): the accumulator initialisation, `continue` statements
        and the return are elided.
        """
        for section_name, section in self.setup_helper.prox_config_dict:
            if not section_name.startswith("core"):
                # NOTE(review): elided `continue`

            for key, value in section:
                if key == "mode" and value == mode:
                    core_tuple = CoreSocketTuple(section_name)
                    core = core_tuple.find_in_topology(self.cpu_topology)

    def get_cores_mpls(self, mode=PROX_CORE_GEN_MODE):
        """Split generator cores into MPLS-tagged vs plain ("udp") lists.

        NOTE(review): accumulator initialisations and `continue`/`break`
        statements are elided from this chunk.
        """
        for section_name, section in self.setup_helper.prox_config_dict:
            if not section_name.startswith("core"):
                # NOTE(review): elided `continue`

            # skip sections that are not in the requested mode
            if all(key != "mode" or value != mode for key, value in section):
                # NOTE(review): elided `continue`

            for item_key, item_value in section:
                if item_key == "name" and item_value.startswith("tag"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.find_in_topology(self.cpu_topology)
                    cores_tagged.append(core_tag)

                elif item_key == "name" and item_value.startswith("udp"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_udp = core_tuple.find_in_topology(self.cpu_topology)
                    cores_plain.append(core_udp)

        return cores_tagged, cores_plain
    def get_latency(self):
        """
        :return: return lat_min, lat_max, lat_avg
        """
        if self._latency_cores:
            return self.sut.lat_stats(self._latency_cores)
        # NOTE(review): the fallback return for "no latency cores" is elided.
1026 def _get_logical_if_name(self, vpci):
1027 return self._vpci_to_if_name_map[vpci]
    def _connect(self, client=None):
        """Run and connect to prox on the remote system """
        # De-allocating a large amount of hugepages takes some time. If a new
        # PROX instance is started immediately after killing the previous one,
        # it might not be able to allocate hugepages, because they are still
        # being freed. Hence the -w switch.
        # self.connection.execute("sudo killall -w Prox 2>/dev/null")
        # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
        # -f ./handle_none-4.cfg"
        # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
        # + "export RTE_TARGET=" + self._dpdk_target + ";" \
        # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
        # + "./build/Prox " + prox_args
        # log.debug("Starting PROX with command [%s]", prox_cmd)
        # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
        # self._ip, prox_cmd))
        # NOTE(review): the guard allowing an injected client is elided.
            client = ProxSocketHelper()

        # try connecting to Prox for 60s
        for _ in range(RETRY_SECONDS):
            time.sleep(RETRY_INTERVAL)
            # NOTE(review): the try/else around connect (returning the client
            # on success, retrying on failure) is partially elided.
                client.connect(self._ip, PROX_PORT)
            except (socket.gaierror, socket.error):
                # connection not ready yet -- retry after the sleep above

        # all retries exhausted: surface a clear failure
        msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
        raise Exception(msg.format(self._ip, PROX_PORT))