1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
25 from collections import OrderedDict, namedtuple
27 from contextlib import contextmanager
28 from itertools import repeat, chain
31 from six.moves import zip, StringIO
32 from six.moves import cStringIO
34 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
35 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
36 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
# Module-wide logger for the PROX helper classes.
LOG = logging.getLogger(__name__)
# NOTE(review): forcing DEBUG here overrides the application's logging
# configuration for this module -- presumably intentional for PROX
# troubleshooting; confirm before shipping.
LOG.setLevel(logging.DEBUG)
# Tunable configuration options: each entry maps an internal (camelCase)
# dict key to the ini-file (section, key) it is read from, plus the default
# value used when the option is absent from the configuration file.
# Fix: the tuple literal was left unterminated (closing parenthesis lost);
# it is restored here.
CONFIGURATION_OPTIONS = (
    # dict key           section    key              default value
    ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
    ('testDuration', 'general', 'test_duration', 5.0),
    ('testPrecision', 'general', 'test_precision', 1.0),
    ('tests', 'general', 'tests', None),
    ('toleratedLoss', 'general', 'tolerated_loss', 0.0),

    ('logFile', 'logging', 'file', 'dats.log'),
    ('logDateFormat', 'logging', 'datefmt', None),
    ('logLevel', 'logging', 'level', 'INFO'),
    ('logOverwrite', 'logging', 'overwrite', 1),

    ('testerIp', 'tester', 'ip', None),
    ('testerUser', 'tester', 'user', 'root'),
    ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
    ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
    ('testerSocketId', 'tester', 'socket_id', 0),

    ('sutIp', 'sut', 'ip', None),
    ('sutUser', 'sut', 'user', 'root'),
    ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
    ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
    ('sutSocketId', 'sut', 'socket_id', 0),
)
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Parsed "core <id>[s<socket>][h]" PROX config section name.

    Fix: the ``try:`` headers and the ``@property``/``def index`` lines were
    lost from this class (orphaned ``except`` clauses and an unreachable
    ``return`` were left behind); the structure is restored here.
    """

    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        try:
            matches = cls.CORE_RE.search(str(args[0]))
            args = matches.groups()

            return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
                                                       'h' if args[2] else '')

        except (AttributeError, TypeError, IndexError, ValueError):
            # A failed regex match (AttributeError on .groups()) or malformed
            # numeric fields all collapse into one ValueError for callers.
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        # True when the core spec ended in 'h'.
        return self.hyperthread == 'h'

    @property
    def index(self):
        # 0 for the physical thread, 1 for its hyperthread sibling.
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        """Return the processor id of this core within ``cpu_topology``.

        :param cpu_topology: nested mapping socket_id -> core_id -> threads
        :raises ValueError: when the socket/core is not present
        """
        try:
            socket_core_match = cpu_topology[self.socket_id][self.core_id]
            sorted_match = sorted(socket_core_match.values())
            return sorted_match[self.index][0]
        except (KeyError, IndexError):
            template = "Core {}{} on socket {} does not exist"
            raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    """(rx, tx, tsc, hz) totals as reported by the PROX "tot stats" command.

    Accepts either four positional values or a single non-string iterable
    that is unpacked into the four fields.

    Fix: the ``try:`` header and the ``pass`` handler body were lost
    (leaving an orphaned ``except``); the structure is restored here.
    """

    def __new__(cls, *args):
        try:
            # A single non-string iterable argument unpacks into the fields;
            # for an exact str, str(x) returns the same object so the assert
            # rejects strings.
            assert args[0] is not str(args[0])
            args = tuple(args[0])
        except (AssertionError, IndexError, TypeError):
            # args is already a flat (rx, tx, tsc, hz) argument list.
            pass

        return super(TotStatsTuple, cls).__new__(cls, *args)
class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
                                                        'delta_tx,delta_tsc,'
                                                        'latency,rx_total,tx_total,pps')):
    """Raw counters from one PROX test run plus derived metrics.

    Fix: the ``@property`` decorators, several ``def`` lines, the
    ``if``-guards in get_samples/log_data and various ``return`` statements
    were lost from this class; the structure is restored here.
    """

    @property
    def pkt_loss(self):
        # Percentage of transmitted packets that were dropped; 100% when
        # nothing was transmitted at all.
        try:
            return 1e2 * self.drop_total / float(self.tx_total)
        except ZeroDivisionError:
            return 100.0

    @property
    def mpps(self):
        # calculate the effective throughput in Mpps
        return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6

    @property
    def can_be_lost(self):
        # Absolute number of packets allowed to be lost under the tolerance.
        return int(self.tx_total * self.tolerated / 1e2)

    @property
    def drop_total(self):
        return self.tx_total - self.rx_total

    @property
    def success(self):
        return self.drop_total <= self.can_be_lost

    def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
        """Build the KPI sample dict reported to the framework."""
        if pkt_loss is None:
            pkt_loss = self.pkt_loss

        if port_samples is None:
            port_samples = {}

        latency_keys = [
            "LatencyMin",
            "LatencyMax",
            "LatencyAvg",
        ]

        samples = {
            "Throughput": self.mpps,
            "DropPackets": pkt_loss,
            "CurrentDropPackets": pkt_loss,
            "TxThroughput": self.pps / 1e6,
            "RxThroughput": self.mpps,
            "PktSize": pkt_size,
        }
        samples.update(port_samples)

        samples.update((key, value) for key, value in zip(latency_keys, self.latency))
        return samples

    def log_data(self, logger=None):
        """Log a human-readable summary of the run (DEBUG level)."""
        if logger is None:
            logger = LOG

        template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
        logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
        logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
class PacketDump(object):
    """A packet dump received from PROX (payload of a "pktdump" message).

    Fix: the ``@staticmethod``/``@property`` decorators, the ``def __str__``
    line, the ``if`` guards in payload() and the ``end += 1`` adjustment
    were lost from this class; the structure is restored here.
    """

    @staticmethod
    def assert_func(func, value1, value2, template=None):
        # Generic two-value check; template is only formatted on failure.
        assert func(value1, value2), template.format(value1, value2)

    def __init__(self, port_id, data_len, payload):
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

    @property
    def port_id(self):
        """Get the port id of the packet dump"""
        return self._port_id

    @property
    def data_len(self):
        """Get the length of the data received"""
        return self._data_len

    def __str__(self):
        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload as a list of ordinals.

        Returns a list of byte values, matching the contents of the packet dump.
        Optional start and end parameters can be specified to retrieve only a
        part of the packet contents.

        The number of elements in the list is equal to end - start + 1, so end
        is the offset of the last character.

        Args:
            start (pos. int): the starting offset in the payload. If it is not
                specified or None, offset 0 is assumed.
            end (pos. int): the ending offset of the payload. If it is not
                specified or None, the contents until the end of the packet are
                returned.

        Returns:
            [int, int, ...]. Each int represents the ordinal value of a byte in
            the packet payload.
        """
        if start is None:
            start = 0

        if end is None:
            end = self.data_len - 1

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, start, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, end, self.data_len, template)

        # Adjust for splice operation: end offset must be 1 more than the offset
        # of the last desired character.
        end += 1

        return self._payload[start:end]
class ProxSocketHelper(object):
    """Wrapper around a TCP socket speaking the PROX command protocol.

    NOTE(review): several structural lines in this class appear to have
    been lost (``try:`` headers, ``def`` lines, ``for`` headers and
    initialisations are implied by orphaned clauses below) -- confirm
    against the original source before relying on this text.
    """

    def __init__(self, sock=None):
        """ creates new prox instance """
        super(ProxSocketHelper, self).__init__()
        # NOTE(review): presumably guarded by ``if sock is None:`` and
        # followed by attribute setup (self._sock, self._pkt_dumps) --
        # lines appear to be missing here.
        sock = socket.socket()

    def connect(self, ip, port):
        """Connect to the prox instance on the remote system"""
        self._sock.connect((ip, port))

    def get_socket(self):
        """ get the socket connected to the remote instance """
        # NOTE(review): the return statement appears to be missing.

    def _parse_socket_data(self, decoded_data, pkt_dump_only):
        # Parse newline-terminated messages out of decoded_data, storing any
        # packet dumps in self._pkt_dumps.
        def get_newline_index():
            # Next '\n' at/after the current parse position.
            return decoded_data.find('\n', index)

        # NOTE(review): 'index = 0' initialisation appears to be missing.
        for newline_index in iter(get_newline_index, -1):
            ret_str = decoded_data[index:newline_index]

            # A packet dump header looks like "pktdump,<port_id>,<data_len>".
            mode, port_id, data_len = ret_str.split(',', 2)
            # NOTE(review): the try/except wrapping the two assignments
            # appears to have been lost; this is the fallback branch.
            mode, port_id, data_len = None, None, None

            if mode != 'pktdump':
                # Regular 1-line message. Stop reading from the socket.
                LOG.debug("Regular response read")
                # NOTE(review): 'return ret_str' appears to be missing.

            LOG.debug("Packet dump header read: [%s]", ret_str)

            # The line is a packet dump header. Parse it, read the
            # packet payload, store the dump for later retrieval.
            # Skip over the packet dump and continue processing: a
            # 1-line response may follow the packet dump.
            data_len = int(data_len)
            data_start = newline_index + 1  # + 1 to skip over \n
            data_end = data_start + data_len
            sub_data = decoded_data[data_start:data_end]
            pkt_payload = array.array('B', (ord(v) for v in sub_data))
            pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
            self._pkt_dumps.append(pkt_dump)

            # NOTE(review): 'index = data_end + 1' style advance and the
            # pkt_dump_only early-return appear to be missing.
            # Return boolean instead of string to signal
            # successful reception of the packet dump.
            LOG.debug("Packet dump stored, returning")

    def get_data(self, pkt_dump_only=False, timeout=1):
        """ read data from the socket """

        # This method behaves slightly differently depending on whether it is
        # called to read the response to a command (pkt_dump_only = 0) or if
        # it is called specifically to read a packet dump (pkt_dump_only = 1).
        #
        # Packet dumps look like:
        # pktdump,<port_id>,<data_len>\n
        # <packet contents as byte array>\n
        # This means the total packet dump message consists of 2 lines instead
        #
        # - Response for a command (pkt_dump_only = 0):
        #   1) Read response from the socket until \n (end of message)
        #   2a) If the response is a packet dump header (starts with "pktdump,"):
        #     - Read the packet payload and store the packet dump for later
        #     - Reset the state and restart from 1). Eventually state 2b) will
        #       be reached and the function will return.
        #   2b) If the response is not a packet dump:
        #     - Return the received message as a string
        #
        # - Explicit request to read a packet dump (pkt_dump_only = 1):
        #   - Read the dump header and payload
        #   - Store the packet dump for later retrieval
        #   - Return True to signify a packet dump was successfully read

        # NOTE(review): a nested 'def is_ready():' line appears to be
        # missing above the next three lines.
            # recv() is blocking, so avoid calling it when no data is waiting.
            ready = select.select([self._sock], [], [], timeout)
            return bool(ready[0])

        # Keep draining the socket while data is pending.
        for status in iter(is_ready, False):
            decoded_data = self._sock.recv(256).decode('utf-8')
            ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)

        LOG.debug("Received data from socket: [%s]", ret_str)
        return ret_str if status else ''

    def put_command(self, to_send):
        """ send data to the remote instance """
        LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
        self._sock.sendall(to_send.encode('utf-8'))

    def get_packet_dump(self):
        """ get the next packet dump """
        # NOTE(review): presumably guarded by ``if self._pkt_dumps:``.
        return self._pkt_dumps.pop(0)

    def stop_all_reset(self):
        """ stop the remote instance and reset stats """
        LOG.debug("Stop all and reset stats")
        # NOTE(review): the stop_all()/reset_stats() calls and the following
        # 'def stop_all(self):' line appear to be missing.

        """ stop all cores on the remote instance """
        LOG.debug("Stop all")
        self.put_command("stop all\n")

    def stop(self, cores, task=''):
        """ stop specific cores on the remote instance """
        LOG.debug("Stopping cores %s", cores)
        self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))

        # NOTE(review): a 'def start_all(self):' line appears to be missing.
        """ start all cores on the remote instance """
        LOG.debug("Start all")
        self.put_command("start all\n")

    def start(self, cores):
        """ start specific cores on the remote instance """
        LOG.debug("Starting cores %s", cores)
        self.put_command("start {}\n".format(join_non_strings(',', cores)))

    def reset_stats(self):
        """ reset the statistics on the remote instance """
        LOG.debug("Reset stats")
        self.put_command("reset stats\n")

    def _run_template_over_cores(self, template, cores, *args):
        # NOTE(review): the ``for core in cores:`` header appears missing.
        self.put_command(template.format(core, *args))

    def set_pkt_size(self, cores, pkt_size):
        """ set the packet size to generate on the remote instance """
        LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
        self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)

    def set_value(self, cores, offset, value, length):
        """ set value on the remote instance """
        msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
        LOG.debug(msg, cores, value, length, offset)
        template = "set value {} 0 {} {} {}\n"
        self._run_template_over_cores(template, cores, offset, value, length)

    def reset_values(self, cores):
        """ reset values on the remote instance """
        LOG.debug("Set value for core(s) %s", cores)
        self._run_template_over_cores("reset values {} 0\n", cores)

    def set_speed(self, cores, speed):
        """ set speed on the remote instance """
        LOG.debug("Set speed for core(s) %s to %g", cores, speed)
        self._run_template_over_cores("speed {} 0 {}\n", cores, speed)

    def slope_speed(self, cores_speed, duration, n_steps=0):
        """will start to increase speed from 0 to N where N is taken from
        a['speed'] for each a in cores_speed"""
        # by default, each step will take 0.5 sec
        # NOTE(review): presumably guarded by ``if n_steps == 0:``.
        n_steps = duration * 2

        private_core_data = []
        step_duration = float(duration) / n_steps
        for core_data in cores_speed:
            target = float(core_data['speed'])
            private_core_data.append({
                'cores': core_data['cores'],
                'delta': target / n_steps,
            # NOTE(review): further dict entries ('current', 'zero', 'speed')
            # and the closing '})' appear to be missing.

        deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
        for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
            time.sleep(step_duration)
            for core_data in private_core_data:
                core_data['current'] = core_data[key1] + core_data[key2]
                self.set_speed(core_data['cores'], core_data['current'])

    def set_pps(self, cores, pps, pkt_size):
        """ set packets per second for specific cores on the remote instance """
        msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
        LOG.debug(msg, cores, pps, pkt_size)

        # speed in percent of line-rate
        speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
        self._run_template_over_cores("speed {} 0 {}\n", cores, speed)

    def lat_stats(self, cores, task=0):
        """Get the latency statistics from the remote system"""
        # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
        # NOTE(review): result-dict initialisation, 'for core in cores:' and
        # a 'try:' header appear to be missing here.
        self.put_command("lat stats {} {} \n".format(core, task))
        ret = self.get_data()

        lat_min[core], lat_max[core], lat_avg[core] = \
            tuple(int(n) for n in ret.split(",")[:3])

        except (AttributeError, ValueError, TypeError):
            # NOTE(review): handler body appears to be missing.

        return lat_min, lat_max, lat_avg

    def get_all_tot_stats(self):
        # Query the aggregate rx/tx/tsc/hz counters and cache them.
        self.put_command("tot stats\n")
        all_stats_str = self.get_data().split(",")
        if len(all_stats_str) != 4:
            # NOTE(review): error/early-return branch appears to be missing.
        all_stats = TotStatsTuple(int(v) for v in all_stats_str)
        self.master_stats = all_stats
        # NOTE(review): 'return all_stats' and the following 'def hz(self):'
        # line appear to be missing.

        return self.get_all_tot_stats()[3]

    def rx_stats(self, cores, task=0):
        # Alias kept for callers that only care about receive statistics.
        return self.core_stats(cores, task)

    def core_stats(self, cores, task=0):
        """Get the receive statistics from the remote system"""
        rx = tx = drop = tsc = 0
        # NOTE(review): 'for core in cores:' header appears to be missing.
        self.put_command("core stats {} {}\n".format(core, task))
        ret = self.get_data().split(",")
        # NOTE(review): the per-core accumulation lines appear to be missing.
        return rx, tx, drop, tsc

    def port_stats(self, ports):
        """get counter values from a specific port"""
        tot_result = [0] * 12
        # NOTE(review): 'for port in ports:' header appears to be missing.
        self.put_command("port_stats {}\n".format(port))
        ret = [try_int(s, 0) for s in self.get_data().split(",")]
        tot_result = [sum(x) for x in zip(tot_result, ret)]
        # NOTE(review): 'return tot_result' appears to be missing.

    def measure_tot_stats(self):
        # NOTE(review): presumably decorated with @contextmanager originally
        # (a 'yield' between start and end sampling appears to be missing).
        start = self.get_all_tot_stats()
        container = {'start_tot': start}

        container['end_tot'] = end = self.get_all_tot_stats()

        container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))

        # NOTE(review): a 'def tot_stats(self):' line appears to be missing
        # above this docstring.
        """Get the total statistics from the remote system"""
        stats = self.get_all_tot_stats()
        # NOTE(review): the return statement appears to be missing.

    def tot_ierrors(self):
        """Get the total ierrors from the remote system"""
        self.put_command("tot ierrors tot\n")
        recv = self.get_data().split(',')
        tot_ierrors = int(recv[0])
        # NOTE(review): the 'tsc' assignment appears to be missing.
        return tot_ierrors, tsc

    def set_count(self, count, cores):
        """Set the number of packets to send on the specified core"""
        self._run_template_over_cores("count {} 0 {}\n", cores, count)

    def dump_rx(self, core_id, task_id=0, count=1):
        """Activate dump on rx on the specified core"""
        LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
        self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
        time.sleep(1.5)  # Give PROX time to set up packet dumping

        # NOTE(review): a 'def quit(self):' line appears to be missing.
        """ stop all cores on the remote instance """
        LOG.debug("Quit prox")
        self.put_command("quit\n")

    def force_quit(self):
        """ stop all cores on the remote instance """
        LOG.debug("Force Quit prox")
        self.put_command("quit_force\n")
class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
    """Setup helper that generates and uploads PROX config and lua files.

    NOTE(review): several structural lines appear to have been lost from
    this class (dict literal bodies, ``if``/``for`` headers, decorators
    and ``return`` statements) -- confirm against the original source.
    """

    # the actual app is lowercase
    LUA_PARAMETER_NAME = ""
    # NOTE(review): the dict body and closing brace appear to be missing
    # (presumably maps a role name to its peer role).
    LUA_PARAMETER_PEER = {

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        # Remote path of the uploaded PROX config; set by build_config_file().
        self.remote_path = None
        super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.remote_prox_file_name = None
        # Parsed PROX config (sequence of (section_name, section) pairs).
        self.prox_config_dict = None
        # Maps file basename -> remote path for extra files the config uses.
        self.additional_files = {}

    def _build_pipeline_kwargs(self):
        # Resolve the remote tool path once; derive its directory from it.
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        self.pipeline_kwargs = {
            'tool_path': tool_path,
            'tool_dir': os.path.dirname(tool_path),
        # NOTE(review): the closing brace of this dict appears to be missing.

    def copy_to_target(self, config_file_path, prox_file):
        # Upload a local file to /tmp on the target.
        remote_path = os.path.join("/tmp", prox_file)
        self.ssh_helper.put(config_file_path, remote_path)
        # NOTE(review): 'return remote_path' appears to be missing (callers
        # use the returned value).

    # NOTE(review): presumably a @staticmethod (no ``self`` parameter).
    def _get_tx_port(section, sections):
        # Return the first numeric port id found in the section's "tx port"
        # entry.
        for item in sections[section]:
            if item[0] == "tx port":
                iface_port = re.findall(r'\d+', item[1])
                # do we want the last one?
                # if yes, then can we reverse?
                return int(iface_port[0])
        # NOTE(review): a fallback return appears to be missing.

    # NOTE(review): presumably a @staticmethod (no ``self`` parameter).
    def _replace_quoted_with_value(quoted, value, count=1):
        # Replace the first ``count`` double-quoted substrings with ``value``.
        new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
        # NOTE(review): 'return new_string' appears to be missing.

    def _insert_additional_file(self, value):
        # Rewrite the quoted file name inside ``value`` to the uploaded
        # remote path, looked up by basename in self.additional_files.
        file_str = value.split('"')
        base_name = os.path.basename(file_str[1])
        file_str[1] = self.additional_files[base_name]
        return '"'.join(file_str)

    def generate_prox_config_file(self, config_path):
        # NOTE(review): the 'sections' initialisation and the parser
        # invocation appear to be missing here.
        prox_config = ConfigParser(config_path, sections)

        # Ensure MAC is set "hardware"
        ext_intf = self.vnfd_helper.interfaces
        # we are using enumeration to map logical port numbers to interfaces
        for port_num, intf in enumerate(ext_intf):
            port_section_name = "port {}".format(port_num)
            for section_name, section in sections:
                if port_section_name != section_name:
                    # NOTE(review): 'continue' appears to be missing.

                for index, section_data in enumerate(section):
                    if section_data[0] == "mac":
                        section_data[1] = "hardware"

        # Resolve @@dst_mac-style placeholders to peer interface MACs.
        for _, section in sections:
            # for index, (item_key, item_val) in enumerate(section):
            for index, section_data in enumerate(section):
                item_key, item_val = section_data
                if item_val.startswith("@@dst_mac"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    mac = ext_intf[tx_port_no]["virtual-interface"]["dst_mac"]
                    # PROX expects space-separated MAC bytes in this form.
                    section_data[1] = mac.replace(":", " ", 6)

                if item_key == "dst mac" and item_val.startswith("@@"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    mac = ext_intf[tx_port_no]["virtual-interface"]["dst_mac"]
                    section_data[1] = mac

        # if addition file specified in prox config
        if not self.additional_files:
            # NOTE(review): an early 'return sections' appears to be missing.

        # Rewrite dofile("...") references to the uploaded remote paths.
        for section_name, section in sections:
            for index, section_data in enumerate(section):
                if section_data[0].startswith("dofile"):
                    section_data[0] = self._insert_additional_file(section_data[0])

                if section_data[1].startswith("dofile"):
                    section_data[1] = self._insert_additional_file(section_data[1])
        # NOTE(review): the final 'return sections' appears to be missing.

    # NOTE(review): presumably a @staticmethod (no ``self`` parameter).
    def write_prox_config(prox_config):
        """
        Write an .ini-format config file for PROX
        PROX does not allow a space before/after the =, so we need
        a custom writer instead of the stock ConfigParser output.
        """
        # NOTE(review): 'out = []' initialisation appears to be missing.
        for i, (section_name, section) in enumerate(prox_config):
            out.append("[{}]".format(section_name))
            for index, item in enumerate(section):
                # NOTE(review): 'key, value = item' unpacking appears missing.
                if key == "__name__":
                    # NOTE(review): 'continue' appears to be missing.

                if value is not None and value != '@':
                    key = "=".join((key, str(value).replace('\n', '\n\t')))
                # NOTE(review): the 'else:' header appears to be missing
                # before the next line.
                    key = str(key).replace('\n', '\n\t')
                # NOTE(review): 'out.append(key)' appears to be missing.

        return os.linesep.join(out)

    def put_string_to_file(self, s, remote_path):
        # Upload an in-memory string as a file on the target.
        file_obj = cStringIO(s)
        self.ssh_helper.put_file_obj(file_obj, remote_path)
        # NOTE(review): 'return remote_path' appears to be missing.

    def generate_prox_lua_file(self):
        # Build the lua parameter content mapping local/peer IPs per port.
        # NOTE(review): the collection initialisation ('p') appears missing.
        ext_intf = self.vnfd_helper.interfaces
        lua_param = self.LUA_PARAMETER_NAME
        for intf in ext_intf:
            peer = self.LUA_PARAMETER_PEER[lua_param]
            port_num = intf["virtual-interface"]["dpdk_port_num"]
            local_ip = intf["local_ip"]
            dst_ip = intf["dst_ip"]
            local_ip_hex = ip_to_hex(local_ip, separator=' ')
            dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
            # NOTE(review): the opening of the call collecting these entries
            # (e.g. p.update([...)) appears to be missing.
                ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
                ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
                ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
                ("{}_ip_port_{}".format(peer, port_num), dst_ip),

        lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
        # NOTE(review): 'return lua' appears to be missing.

    def upload_prox_lua(self, config_dir, prox_config_dict):
        # we could have multiple lua directives
        lau_dict = prox_config_dict.get('lua', {})
        find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
        lua_file = next((found[0] for found in find_iter if found), None)
        # NOTE(review): a guard for 'lua_file is None' appears to be missing.

        out = self.generate_prox_lua_file()
        remote_path = os.path.join(config_dir, lua_file)
        return self.put_string_to_file(out, remote_path)

    def upload_prox_config(self, config_file, prox_config_dict):
        # prox can't handle spaces around ' = ' so use custom method
        out = StringIO(self.write_prox_config(prox_config_dict))
        remote_path = os.path.join("/tmp", config_file)
        self.ssh_helper.put_file_obj(out, remote_path)
        # NOTE(review): 'return remote_path' appears to be missing (the
        # value is used by build_config_file below).

    def build_config_file(self):
        # Generate the PROX config locally, upload auxiliary files first,
        # then the config itself; remote paths are remembered for
        # build_config().
        task_path = self.scenario_helper.task_path
        options = self.scenario_helper.options
        config_path = options['prox_config']
        config_file = os.path.basename(config_path)
        config_path = find_relative_file(config_path, task_path)
        self.additional_files = {}

        prox_files = options.get('prox_files', [])
        if isinstance(prox_files, six.string_types):
            prox_files = [prox_files]
        for key_prox_file in prox_files:
            base_prox_file = os.path.basename(key_prox_file)
            remote_prox_file = self.copy_to_target(key_prox_file, base_prox_file)
            self.additional_files[base_prox_file] = remote_prox_file

        self.prox_config_dict = self.generate_prox_config_file(config_path)
        self.remote_path = self.upload_prox_config(config_file, self.prox_config_dict)

    def build_config(self):
        # Build the full PROX command line from the scenario options.
        options = self.scenario_helper.options

        prox_args = options['prox_args']
        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        self.pipeline_kwargs["args"] = " ".join(
            " ".join([k, v if v else ""]) for k, v in prox_args.items())
        self.pipeline_kwargs["cfg_file"] = self.remote_path

        cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
        prox_cmd = cmd_template.format(**self.pipeline_kwargs)
        # NOTE(review): 'return prox_cmd' appears to be missing.
class ProxResourceHelper(ClientResourceHelper):
    """Resource helper that drives a PROX traffic generator over its socket.

    NOTE(review): a number of structural lines (decorators, ``try:``/``if``
    headers, initialisations and ``return`` statements) appear to have been
    lost from this class -- confirm against the original source.
    """

    RESOURCE_WORD = 'prox'
    PROX_CORE_GEN_MODE = "gen"
    PROX_CORE_LAT_MODE = "lat"
    PROX_CORE_MPLS_TEST = "MPLS tag/untag"

    # NOTE(review): presumably a @staticmethod (no ``self`` parameter).
    def line_rate_to_pps(pkt_size, n_ports):
        # FIXME Don't hardcode 10Gb/s
        # The +20 accounts for per-frame wire overhead in addition to pkt_size.
        return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20)

    # NOTE(review): presumably a @staticmethod (no ``self`` parameter).
    def find_pci(pci, bound_pci):
        # we have to substring match PCI bus address from the end
        return any(b.endswith(pci) for b in bound_pci)

    def __init__(self, setup_helper):
        super(ProxResourceHelper, self).__init__(setup_helper)
        self.mgmt_interface = self.vnfd_helper.mgmt_interface
        self._user = self.mgmt_interface["user"]
        self._ip = self.mgmt_interface["ip"]

        # Lazily-populated caches, see the accessors below.
        self._cpu_topology = None
        self._vpci_to_if_name_map = None
        self.additional_file = {}
        self.remote_prox_file_name = None

        self._test_cores = None
        self._latency_cores = None
        self._tagged_cores = None
        self._plain_cores = None

        # NOTE(review): this line appears to belong to a lazily-connecting
        # 'sut' property whose 'def' line is missing.
        self.client = self._connect()

    # NOTE(review): presumably decorated with @property (callers access
    # 'self.cpu_topology' without parentheses).
    def cpu_topology(self):
        # Parse /proc/cpuinfo from the remote host once and cache it.
        if not self._cpu_topology:
            stdout = io.BytesIO()
            self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
            self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
        return self._cpu_topology

    # NOTE(review): presumably decorated with @property.
    def test_cores(self):
        if not self._test_cores:
            self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
        return self._test_cores

    # NOTE(review): presumably decorated with @property -- tagged_cores and
    # plain_cores below index 'self.mpls_cores' without calling it.
    def mpls_cores(self):
        if not self._tagged_cores:
            self._tagged_cores, self._plain_cores = self.get_cores_mpls(self.PROX_CORE_GEN_MODE)
        return self._tagged_cores, self._plain_cores

    # NOTE(review): presumably decorated with @property.
    def tagged_cores(self):
        return self.mpls_cores[0]

    # NOTE(review): presumably decorated with @property.
    def plain_cores(self):
        return self.mpls_cores[1]

    # NOTE(review): presumably decorated with @property.
    def latency_cores(self):
        if not self._latency_cores:
            self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
        return self._latency_cores

    def run_traffic(self, traffic_profile):
        # Drive the traffic profile until it reports completion or the
        # helper is terminated.
        traffic_profile.init(self._queue)
        # this frees up the run_traffic loop
        self.client_started.value = 1

        while not self._terminated.value:
            # move it all to traffic_profile
            self._run_traffic_once(traffic_profile)

    def _run_traffic_once(self, traffic_profile):
        traffic_profile.execute(self)
        if traffic_profile.done:
            self._queue.put({'done': True})
            LOG.debug("tg_prox done")
            self._terminated.value = 1

    def start_collect(self):
        # NOTE(review): the body (likely 'pass') appears to be missing; the
        # two lines below appear to belong to a 'terminate' method whose
        # 'def' line is missing.

        # should not be called, use VNF terminate
        raise NotImplementedError()

        # NOTE(review): this return appears to belong to an 'up_post'-style
        # method whose 'def' line is missing.
        return self.sut  # force connection

    def execute(self, cmd, *args, **kwargs):
        # Forward an arbitrary command to the SUT socket helper by name.
        func = getattr(self.sut, cmd, None)
        # NOTE(review): an 'if func:' guard appears to be missing.
        return func(*args, **kwargs)

    # NOTE(review): presumably decorated with @contextmanager (run_test
    # uses it in a 'with' statement).
    def traffic_context(self, pkt_size, value):
        self.sut.reset_stats()
        if self.get_test_type() == self.PROX_CORE_MPLS_TEST:
            # MPLS test: tagged cores carry 4 extra bytes per frame, so the
            # plain cores are slowed by the wire-byte ratio to match rates.
            self.sut.set_pkt_size(self.tagged_cores, pkt_size)
            self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
            self.sut.set_speed(self.tagged_cores, value)
            ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
            self.sut.set_speed(self.plain_cores, value * ratio)
        # NOTE(review): the 'else:' header appears to be missing before the
        # next two lines.
            self.sut.set_pkt_size(self.test_cores, pkt_size)
            self.sut.set_speed(self.test_cores, value)
        # NOTE(review): start/yield/stop lines appear to be missing.

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        # type: (object, object, object, object) -> object
        # do this assert in init? unless we expect interface count to
        # change from one run to another run...
        interfaces = self.vnfd_helper.interfaces
        interface_count = len(interfaces)
        assert interface_count in {1, 2, 4}, \
            "Invalid number of ports: 1, 2 or 4 ports only supported at this time"

        with self.traffic_context(pkt_size, value):
            # Getting statistics to calculate PPS at right speed....
            tsc_hz = float(self.sut.hz())
            # NOTE(review): the measurement sleep inside this context
            # appears to be missing.
            with self.sut.measure_tot_stats() as data:

            # Get stats before stopping the cores. Stopping cores takes some time
            # and might skew results otherwise.
            latency = self.get_latency()

        deltas = data['delta']
        rx_total, tx_total = self.sut.port_stats(range(interface_count))[6:8]
        pps = value / 100.0 * self.line_rate_to_pps(pkt_size, interface_count)

        # NOTE(review): a 'samples = {}' initialisation appears missing.
        # we are currently using enumeration to map logical port num to interface
        for index, iface in enumerate(interfaces):
            port_rx_total, port_tx_total = self.sut.port_stats([index])[6:8]
            samples[iface["name"]] = {"in_packets": port_rx_total,
                                      "out_packets": port_tx_total}

        result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx,
                                   deltas.tsc, latency, rx_total, tx_total, pps)

        return result, samples

    def get_test_type(self):
        # Inspect the parsed PROX config's [global] section for the MPLS
        # test marker.
        # NOTE(review): a 'test_type = None' initialisation appears missing.
        for section_name, section in self.setup_helper.prox_config_dict:
            if section_name != "global":
                # NOTE(review): 'continue' appears to be missing.

            for key, value in section:
                if key == "name" and value == self.PROX_CORE_MPLS_TEST:
                    test_type = self.PROX_CORE_MPLS_TEST
        # NOTE(review): 'return test_type' appears to be missing.

    def get_cores(self, mode):
        # Collect topology core ids for every "core ..." section whose mode
        # matches.
        # NOTE(review): a 'cores = []' initialisation appears missing.
        for section_name, section in self.setup_helper.prox_config_dict:
            if not section_name.startswith("core"):
                # NOTE(review): 'continue' appears to be missing.

            for key, value in section:
                if key == "mode" and value == mode:
                    core_tuple = CoreSocketTuple(section_name)
                    core = core_tuple.find_in_topology(self.cpu_topology)
                    # NOTE(review): 'cores.append(core)' and the final
                    # 'return cores' appear to be missing.

    def get_cores_mpls(self, mode=PROX_CORE_GEN_MODE):
        # Split matching generator cores into MPLS-tagged ("tag...") and
        # plain ("udp...") groups by their configured name.
        # NOTE(review): the two list initialisations appear to be missing.
        for section_name, section in self.setup_helper.prox_config_dict:
            if not section_name.startswith("core"):
                # NOTE(review): 'continue' appears to be missing.

            if all(key != "mode" or value != mode for key, value in section):
                # NOTE(review): 'continue' appears to be missing.

            for item_key, item_value in section:
                if item_key == "name" and item_value.startswith("tag"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.find_in_topology(self.cpu_topology)
                    cores_tagged.append(core_tag)

                elif item_key == "name" and item_value.startswith("udp"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_udp = core_tuple.find_in_topology(self.cpu_topology)
                    cores_plain.append(core_udp)

        return cores_tagged, cores_plain

    def get_latency(self):
        """
        :return: return lat_min, lat_max, lat_avg
        """
        if self._latency_cores:
            return self.sut.lat_stats(self._latency_cores)
        # NOTE(review): a fallback return (e.g. []) appears to be missing.

    def _get_logical_if_name(self, vpci):
        return self._vpci_to_if_name_map[vpci]

    def _connect(self, client=None):
        """Run and connect to prox on the remote system """
        # De-allocating a large amount of hugepages takes some time. If a new
        # PROX instance is started immediately after killing the previous one,
        # it might not be able to allocate hugepages, because they are still
        # being freed. Hence the -w switch.
        # self.connection.execute("sudo killall -w Prox 2>/dev/null")
        # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
        # -f ./handle_none-4.cfg"
        # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
        # + "export RTE_TARGET=" + self._dpdk_target + ";" \
        # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
        # + "./build/Prox " + prox_args
        # log.debug("Starting PROX with command [%s]", prox_cmd)
        # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
        # self._ip, prox_cmd))
        # NOTE(review): an 'if client is None:' guard appears to be missing.
        client = ProxSocketHelper()

        # try connecting to Prox for 60s
        for _ in range(RETRY_SECONDS):
            time.sleep(RETRY_INTERVAL)
            # NOTE(review): the 'try:' header appears to be missing.
            client.connect(self._ip, PROX_PORT)
            except (socket.gaierror, socket.error):
                # NOTE(review): the handler body ('continue') and the success
                # 'return client' appear to be missing.

        msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
        raise Exception(msg.format(self._ip, PROX_PORT))