1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
25 from collections import OrderedDict, namedtuple
27 from contextlib import contextmanager
28 from itertools import repeat, chain
31 from multiprocessing import Queue
32 from six.moves import zip, StringIO
33 from six.moves import cStringIO
35 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
36 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
37 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
LOG = logging.getLogger(__name__)
# Force verbose logging for this helper module regardless of the root level.
LOG.setLevel(logging.DEBUG)

# Mapping table used to pull typed options out of a PROX/DATS .ini file.
# Each row is: (dict key, section, key, default value)
CONFIGURATION_OPTIONS = (
    # dict key           section    key               default value
    ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
    ('testDuration', 'general', 'test_duration', 5.0),
    ('testPrecision', 'general', 'test_precision', 1.0),
    ('tests', 'general', 'tests', None),
    ('toleratedLoss', 'general', 'tolerated_loss', 0.0),

    ('logFile', 'logging', 'file', 'dats.log'),
    ('logDateFormat', 'logging', 'datefmt', None),
    ('logLevel', 'logging', 'level', 'INFO'),
    ('logOverwrite', 'logging', 'overwrite', 1),

    ('testerIp', 'tester', 'ip', None),
    ('testerUser', 'tester', 'user', 'root'),
    ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
    ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
    ('testerSocketId', 'tester', 'socket_id', 0),

    ('sutIp', 'sut', 'ip', None),
    ('sutUser', 'sut', 'user', 'root'),
    ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
    ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
    ('sutSocketId', 'sut', 'socket_id', 0),
)
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Parsed PROX core spec such as "core 1s2h".

    core_id is the core number, socket_id the CPU socket (0 when absent)
    and hyperthread is 'h' when the spec names a hyperthread sibling,
    '' otherwise.
    """

    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        try:
            # search() returns None on no match; .groups() then raises
            # AttributeError, which is mapped to ValueError below.
            matches = cls.CORE_RE.search(str(args[0]))
            args = matches.groups()

            return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
                                                       'h' if args[2] else '')
        except (AttributeError, TypeError, IndexError, ValueError):
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        """Return True when the spec named a hyperthread sibling ("...h")."""
        return self.hyperthread == 'h'

    @property
    def index(self):
        # hyperthread siblings are the second entry (index 1) of a core's
        # processor list, primary threads are entry 0
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        """Map this core spec to a processor number via the parsed topology.

        :param cpu_topology: nested mapping socket_id -> core_id -> threads
        :raises ValueError: when the socket/core combination does not exist
        """
        try:
            socket_core_match = cpu_topology[self.socket_id][self.core_id]
            sorted_match = sorted(socket_core_match.values())
            return sorted_match[self.index][0]
        except (KeyError, IndexError):
            template = "Core {}{} on socket {} does not exist"
            raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    """PROX 'tot stats' reply: rx/tx packet counts plus tsc and hz values."""

    def __new__(cls, *args):
        """Accept either four scalars or one iterable of four values."""
        try:
            # For a str, str(x) returns the identical object, so the assert
            # fires and strings are NOT expanded into characters.
            # NOTE(review): assert is stripped under ``python -O``.
            assert args[0] is not str(args[0])
            args = tuple(args[0])
        except (AssertionError, IndexError, TypeError):
            # args were already given as separate scalars; use them as-is
            pass

        return super(TotStatsTuple, cls).__new__(cls, *args)
class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
                                                        'delta_tx,delta_tsc,'
                                                        'latency,rx_total,tx_total,pps')):
    """Result of a single PROX throughput test run.

    Carries the tolerated loss (percent), TSC frequency, per-run
    rx/tx/tsc deltas, latency stats, the total rx/tx counters and the
    configured pps.
    """

    @property
    def pkt_loss(self):
        """Packet loss in percent of transmitted packets (100.0 when tx is 0)."""
        try:
            return 1e2 * self.drop_total / float(self.tx_total)
        except ZeroDivisionError:
            return 100.0

    @property
    def mpps(self):
        # calculate the effective throughput in Mpps
        return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6

    @property
    def can_be_lost(self):
        """Number of packets that may be dropped while staying in tolerance."""
        return int(self.tx_total * self.tolerated / 1e2)

    @property
    def drop_total(self):
        """Total number of transmitted packets that never came back."""
        return self.tx_total - self.rx_total

    @property
    def success(self):
        """True when the drop count is within the tolerated loss."""
        return self.drop_total <= self.can_be_lost

    def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
        """Build the KPI sample dict reported by the traffic generator."""
        if pkt_loss is None:
            pkt_loss = self.pkt_loss

        if port_samples is None:
            port_samples = {}

        latency_keys = [
            "LatencyMin",
            "LatencyMax",
            "LatencyAvg",
        ]

        samples = {
            "Throughput": self.mpps,
            "DropPackets": pkt_loss,
            "CurrentDropPackets": pkt_loss,
            "TxThroughput": self.pps / 1e6,
            "RxThroughput": self.mpps,
            "PktSize": pkt_size,
        }
        samples.update(port_samples)

        samples.update((key, value) for key, value in zip(latency_keys, self.latency))
        return samples

    def log_data(self, logger=None):
        """Log rx/tx/drop counters and configured vs. effective Mpps."""
        if logger is None:
            logger = LOG

        template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
        logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
        logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
class PacketDump(object):
    """One packet captured by PROX's dump_rx facility."""

    @staticmethod
    def assert_func(func, value1, value2, template=None):
        """Assert func(value1, value2); template formats the failure message."""
        assert func(value1, value2), template.format(value1, value2)

    def __init__(self, port_id, data_len, payload):
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

    @property
    def port_id(self):
        """Get the port id of the packet dump"""
        return self._port_id

    @property
    def data_len(self):
        """Get the length of the data received"""
        return self._data_len

    def __str__(self):
        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload as a list of ordinals.

        Returns a list of byte values, matching the contents of the packet dump.
        Optional start and end parameters can be specified to retrieve only a
        part of the packet contents.

        The number of elements in the list is equal to end - start + 1, so end
        is the offset of the last character.

        Args:
            start (pos. int): the starting offset in the payload. If it is not
                specified or None, offset 0 is assumed.
            end (pos. int): the ending offset of the payload. If it is not
                specified or None, the contents until the end of the packet are
                returned.

        Returns:
            [int, int, ...]. Each int represents the ordinal value of a byte in
            the packet payload.
        """
        if start is None:
            start = 0

        if end is None:
            end = self.data_len - 1

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, start, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, end, self.data_len, template)

        # Adjust for splice operation: end offset must be 1 more than the offset
        # of the last desired character.
        end += 1

        return self._payload[start:end]
class ProxSocketHelper(object):
    """Wrapper around the PROX plain-text control socket."""

    def __init__(self, sock=None):
        """ creates new prox instance """
        super(ProxSocketHelper, self).__init__()

        if sock is None:
            sock = socket.socket()

        self._sock = sock
        self._pkt_dumps = []  # PacketDump objects parsed out of the stream

    def connect(self, ip, port):
        """Connect to the prox instance on the remote system"""
        self._sock.connect((ip, port))

    def get_socket(self):
        """ get the socket connected to the remote instance """
        return self._sock

    def _parse_socket_data(self, decoded_data, pkt_dump_only):
        """Split received data into 1-line responses and packet dumps."""
        def get_newline_index():
            return decoded_data.find('\n', index)

        ret_str = ''
        index = 0
        for newline_index in iter(get_newline_index, -1):
            ret_str = decoded_data[index:newline_index]

            try:
                mode, port_id, data_len = ret_str.split(',', 2)
            except ValueError:
                # fewer than 3 fields: cannot be a packet dump header
                mode, port_id, data_len = None, None, None

            if mode != 'pktdump':
                # Regular 1-line message. Stop reading from the socket.
                LOG.debug("Regular response read")
                return ret_str

            LOG.debug("Packet dump header read: [%s]", ret_str)

            # The line is a packet dump header. Parse it, read the
            # packet payload, store the dump for later retrieval.
            # Skip over the packet dump and continue processing: a
            # 1-line response may follow the packet dump.
            data_len = int(data_len)
            data_start = newline_index + 1  # + 1 to skip over \n
            data_end = data_start + data_len
            sub_data = decoded_data[data_start:data_end]
            pkt_payload = array.array('B', (ord(v) for v in sub_data))
            pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
            self._pkt_dumps.append(pkt_dump)

            if pkt_dump_only:
                # Return boolean instead of string to signal
                # successful reception of the packet dump.
                LOG.debug("Packet dump stored, returning")
                return True

            index = data_end + 1

        return ret_str

    def get_data(self, pkt_dump_only=False, timeout=1):
        """ read data from the socket """
        # This method behaves slightly differently depending on whether it is
        # called to read the response to a command (pkt_dump_only = 0) or if
        # it is called specifically to read a packet dump (pkt_dump_only = 1).
        #
        # Packet dumps look like:
        #   pktdump,<port_id>,<data_len>\n
        #   <packet contents as byte array>\n
        # This means the total packet dump message consists of 2 lines instead
        # of 1 line.
        #
        # - Response for a command (pkt_dump_only = 0):
        #   1) Read response from the socket until \n (end of message)
        #   2a) If the response is a packet dump header (starts with "pktdump,"):
        #     - Read the packet payload and store the packet dump for later
        #       retrieval.
        #     - Reset the state and restart from 1). Eventually state 2b) will
        #       be reached and the function will return.
        #   2b) If the response is not a packet dump:
        #     - Return the received message as a string
        #
        # - Explicit request to read a packet dump (pkt_dump_only = 1):
        #   - Read the dump header and payload
        #   - Store the packet dump for later retrieval
        #   - Return True to signify a packet dump was successfully read

        def is_ready():
            # recv() is blocking, so avoid calling it when no data is waiting.
            ready = select.select([self._sock], [], [], timeout)
            return bool(ready[0])

        status = False
        ret_str = ""
        for status in iter(is_ready, False):
            decoded_data = self._sock.recv(256).decode('utf-8')
            ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)

        LOG.debug("Received data from socket: [%s]", ret_str)
        return ret_str if status else ''

    def put_command(self, to_send):
        """ send data to the remote instance """
        LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
        try:
            self._sock.sendall(to_send.encode('utf-8'))
        except OSError:
            # best effort: the remote side may have closed the socket
            pass

    def get_packet_dump(self):
        """ get the next packet dump """
        if self._pkt_dumps:
            return self._pkt_dumps.pop(0)
        return None

    def stop_all_reset(self):
        """ stop the remote instance and reset stats """
        LOG.debug("Stop all and reset stats")
        self.stop_all()
        self.reset_stats()

    def stop_all(self):
        """ stop all cores on the remote instance """
        LOG.debug("Stop all")
        self.put_command("stop all\n")
        time.sleep(3)

    def stop(self, cores, task=''):
        """ stop specific cores on the remote instance """
        LOG.debug("Stopping cores %s", cores)
        self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
        time.sleep(3)

    def start_all(self):
        """ start all cores on the remote instance """
        LOG.debug("Start all")
        self.put_command("start all\n")

    def start(self, cores):
        """ start specific cores on the remote instance """
        LOG.debug("Starting cores %s", cores)
        self.put_command("start {}\n".format(join_non_strings(',', cores)))
        time.sleep(3)

    def reset_stats(self):
        """ reset the statistics on the remote instance """
        LOG.debug("Reset stats")
        self.put_command("reset stats\n")
        time.sleep(1)

    def _run_template_over_cores(self, template, cores, *args):
        # issue the same formatted command once per core
        for core in cores:
            self.put_command(template.format(core, *args))

    def set_pkt_size(self, cores, pkt_size):
        """ set the packet size to generate on the remote instance """
        LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
        self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
        time.sleep(1)

    def set_value(self, cores, offset, value, length):
        """ set value on the remote instance """
        msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
        LOG.debug(msg, cores, value, length, offset)
        template = "set value {} 0 {} {} {}\n"
        self._run_template_over_cores(template, cores, offset, value, length)

    def reset_values(self, cores):
        """ reset values on the remote instance """
        LOG.debug("Set value for core(s) %s", cores)
        self._run_template_over_cores("reset values {} 0\n", cores)

    def set_speed(self, cores, speed):
        """ set speed on the remote instance """
        LOG.debug("Set speed for core(s) %s to %g", cores, speed)
        self._run_template_over_cores("speed {} 0 {}\n", cores, speed)

    def slope_speed(self, cores_speed, duration, n_steps=0):
        """will start to increase speed from 0 to N where N is taken from
        a['speed'] for each a in cores_speed"""
        # by default, each step will take 0.5 sec
        if n_steps == 0:
            n_steps = duration * 2

        private_core_data = []
        step_duration = float(duration) / n_steps
        for core_data in cores_speed:
            target = float(core_data['speed'])
            private_core_data.append({
                'cores': core_data['cores'],
                'zero': 0,
                'delta': target / n_steps,
                'current': 0,
                'speed': target,
            })

        # ramp in n_steps - 1 increments, then jump straight to the target
        deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
        for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
            time.sleep(step_duration)
            for core_data in private_core_data:
                core_data['current'] = core_data[key1] + core_data[key2]
                self.set_speed(core_data['cores'], core_data['current'])

    def set_pps(self, cores, pps, pkt_size):
        """ set packets per second for specific cores on the remote instance """
        msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
        LOG.debug(msg, cores, pps, pkt_size)

        # speed in percent of line-rate
        speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
        self._run_template_over_cores("speed {} 0 {}\n", cores, speed)

    def lat_stats(self, cores, task=0):
        """Get the latency statistics from the remote system"""
        # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
        lat_min = {}
        lat_max = {}
        lat_avg = {}
        for core in cores:
            self.put_command("lat stats {} {} \n".format(core, task))
            ret = self.get_data()

            try:
                lat_min[core], lat_max[core], lat_avg[core] = \
                    tuple(int(n) for n in ret.split(",")[:3])
            except (AttributeError, ValueError, TypeError):
                # malformed or empty reply: skip this core
                pass

        return lat_min, lat_max, lat_avg

    def get_all_tot_stats(self):
        """Fetch (rx, tx, tsc, hz) totals; zeros on an unexpected reply."""
        self.put_command("tot stats\n")
        all_stats_str = self.get_data().split(",")
        if len(all_stats_str) != 4:
            return [0] * 4

        all_stats = TotStatsTuple(int(v) for v in all_stats_str)
        self.master_stats = all_stats
        return all_stats

    def hz(self):
        return self.get_all_tot_stats()[3]

    # Deprecated
    # TODO: remove in future release
    def rx_stats(self, cores, task=0):
        return self.core_stats(cores, task)

    def core_stats(self, cores, task=0):
        """Get the receive statistics from the remote system"""
        rx = tx = drop = tsc = 0
        for core in cores:
            self.put_command("core stats {} {}\n".format(core, task))
            ret = self.get_data().split(",")
            rx += int(ret[0])
            tx += int(ret[1])
            drop += int(ret[2])
            tsc = int(ret[3])
        return rx, tx, drop, tsc

    def port_stats(self, ports):
        """get counter values from a specific port"""
        tot_result = [0] * 12
        for port in ports:
            self.put_command("port_stats {}\n".format(port))
            ret = [try_int(s, 0) for s in self.get_data().split(",")]
            tot_result = [sum(x) for x in zip(tot_result, ret)]
        return tot_result

    @contextmanager
    def measure_tot_stats(self):
        """Context manager; yields a dict and fills in start/end/delta stats."""
        start = self.get_all_tot_stats()
        container = {'start_tot': start}
        try:
            yield container
        finally:
            container['end_tot'] = end = self.get_all_tot_stats()

        container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))

    def tot_stats(self):
        """Get the total statistics from the remote system"""
        stats = self.get_all_tot_stats()
        return stats.rx, stats.tx, stats.tsc

    def tot_ierrors(self):
        """Get the total ierrors from the remote system"""
        self.put_command("tot ierrors tot\n")
        recv = self.get_data().split(',')
        tot_ierrors = int(recv[0])
        # NOTE(review): recv[1] may be the intended tsc field -- confirm
        tsc = int(recv[0])
        return tot_ierrors, tsc

    def set_count(self, count, cores):
        """Set the number of packets to send on the specified core"""
        self._run_template_over_cores("count {} 0 {}\n", cores, count)

    def dump_rx(self, core_id, task_id=0, count=1):
        """Activate dump on rx on the specified core"""
        LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
        self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
        time.sleep(1.5)  # Give PROX time to set up packet dumping

    def quit(self):
        """ stop all cores on the remote instance """
        LOG.debug("Quit prox")
        self.put_command("quit\n")
        time.sleep(3)

    def force_quit(self):
        """ stop all cores on the remote instance """
        LOG.debug("Force Quit prox")
        self.put_command("quit_force\n")
        time.sleep(3)
class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
    """Generates, patches and uploads PROX configuration for a VNF."""

    # the actual app is lowercase
    APP_NAME = 'prox'

    LUA_PARAMETER_NAME = ""
    LUA_PARAMETER_PEER = {
        "gen": "sut",
        "sut": "gen",
    }

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        self.remote_path = None
        super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.remote_prox_file_name = None
        self._prox_config_data = None
        self.additional_files = {}
        self.config_queue = Queue()

    def _build_pipeline_kwargs(self):
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        self.pipeline_kwargs = {
            'tool_path': tool_path,
            'tool_dir': os.path.dirname(tool_path),
        }

    def copy_to_target(self, config_file_path, prox_file):
        """Copy a local file to /tmp on the target; return the remote path."""
        remote_path = os.path.join("/tmp", prox_file)
        self.ssh_helper.put(config_file_path, remote_path)
        return remote_path

    @staticmethod
    def _get_tx_port(section, sections):
        """Return the first port number of a "tx port" entry, or -1."""
        iface_port = [-1]
        for item in sections[section]:
            if item[0] == "tx port":
                iface_port = re.findall(r'\d+', item[1])
                # do we want the last one?
                #   if yes, then can we reverse?
        return int(iface_port[0])

    @staticmethod
    def _replace_quoted_with_value(quoted, value, count=1):
        """Replace the first quoted substring of `quoted` with `value`."""
        new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
        return new_string

    def _insert_additional_file(self, value):
        """Rewrite a quoted file name to its uploaded remote location."""
        file_str = value.split('"')
        base_name = os.path.basename(file_str[1])
        file_str[1] = self.additional_files[base_name]
        return '"'.join(file_str)

    def generate_prox_config_file(self, config_path):
        """Parse a PROX .cfg and patch MACs, dst macs and uploaded file paths."""
        sections = []
        prox_config = ConfigParser(config_path, sections)
        prox_config.parse()

        # Ensure MAC is set "hardware"
        all_ports = self.vnfd_helper.port_pairs.all_ports
        # use dpdk port number
        for port_name in all_ports:
            port_num = self.vnfd_helper.port_num(port_name)
            port_section_name = "port {}".format(port_num)
            for section_name, section in sections:
                if port_section_name != section_name:
                    continue

                for index, section_data in enumerate(section):
                    if section_data[0] == "mac":
                        section_data[1] = "hardware"

        # search for dst mac
        for _, section in sections:
            for index, section_data in enumerate(section):
                item_key, item_val = section_data
                if item_val.startswith("@@dst_mac"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    # PROX expects space-separated MAC bytes in this context
                    section_data[1] = mac.replace(":", " ", 6)

                if item_key == "dst mac" and item_val.startswith("@@"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    section_data[1] = mac

        # if addition file specified in prox config
        if not self.additional_files:
            return sections

        for section_name, section in sections:
            for index, section_data in enumerate(section):
                try:
                    if section_data[0].startswith("dofile"):
                        section_data[0] = self._insert_additional_file(section_data[0])

                    if section_data[1].startswith("dofile"):
                        section_data[1] = self._insert_additional_file(section_data[1])
                except KeyError:
                    # referenced file was not uploaded; leave entry untouched
                    pass

        return sections

    @staticmethod
    def write_prox_config(prox_config):
        """
        Write an .ini-format config file for PROX
        PROX does not allow a space before/after the =, so we need
        a custom method
        """
        out = []
        for i, (section_name, section) in enumerate(prox_config):
            out.append("[{}]".format(section_name))
            for index, item in enumerate(section):
                key, value = item
                if key == "__name__":
                    continue

                if value is not None and value != '@':
                    key = "=".join((key, str(value).replace('\n', '\n\t')))
                    out.append(key)
                else:
                    key = str(key).replace('\n', '\n\t')
                    out.append(key)

        return os.linesep.join(out)

    def put_string_to_file(self, s, remote_path):
        """Upload a string as a file on the remote system; return its path."""
        file_obj = cStringIO(s)
        self.ssh_helper.put_file_obj(file_obj, remote_path)
        return remote_path

    def generate_prox_lua_file(self):
        """Render the lua parameter file with per-port IPs for gen/sut peers."""
        p = OrderedDict()
        all_ports = self.vnfd_helper.port_pairs.all_ports
        lua_param = self.LUA_PARAMETER_NAME
        for port_name in all_ports:
            peer = self.LUA_PARAMETER_PEER[lua_param]
            port_num = self.vnfd_helper.port_num(port_name)
            intf = self.vnfd_helper.find_interface(name=port_name)
            vintf = intf['virtual-interface']
            local_ip = vintf["local_ip"]
            dst_ip = vintf["dst_ip"]
            local_ip_hex = ip_to_hex(local_ip, separator=' ')
            dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
            p.update([
                ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
                ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
                ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
                ("{}_ip_port_{}".format(peer, port_num), dst_ip),
            ])

        lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
        return lua

    def upload_prox_lua(self, config_dir, prox_config_dict):
        # we could have multiple lua directives
        lau_dict = prox_config_dict.get('lua', {})
        find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
        lua_file = next((found[0] for found in find_iter if found), None)
        if lua_file is None:
            return ''

        out = self.generate_prox_lua_file()
        remote_path = os.path.join(config_dir, lua_file)
        return self.put_string_to_file(out, remote_path)

    def upload_prox_config(self, config_file, prox_config_dict):
        # prox can't handle spaces around ' = ' so use custom method
        out = StringIO(self.write_prox_config(prox_config_dict))
        out.seek(0)
        remote_path = os.path.join("/tmp", config_file)
        self.ssh_helper.put_file_obj(out, remote_path)

        return remote_path

    CONFIG_QUEUE_TIMEOUT = 120

    @property
    def prox_config_data(self):
        if self._prox_config_data is None:
            # this will block, but it needs too
            self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
        return self._prox_config_data

    def build_config_file(self):
        """Generate the PROX config, publish it on the queue and upload it."""
        task_path = self.scenario_helper.task_path
        options = self.scenario_helper.options
        config_path = options['prox_config']
        config_file = os.path.basename(config_path)
        config_path = find_relative_file(config_path, task_path)
        self.additional_files = {}

        prox_files = options.get('prox_files', [])
        if isinstance(prox_files, six.string_types):
            prox_files = [prox_files]
        for key_prox_file in prox_files:
            base_prox_file = os.path.basename(key_prox_file)
            remote_prox_file = self.copy_to_target(key_prox_file, base_prox_file)
            self.additional_files[base_prox_file] = remote_prox_file

        self._prox_config_data = self.generate_prox_config_file(config_path)
        # copy config to queue so we can read it from traffic_runner process
        self.config_queue.put(self._prox_config_data)
        self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)

    def build_config(self):
        """Return the shell command line used to launch PROX."""
        self.build_config_file()

        options = self.scenario_helper.options

        prox_args = options['prox_args']
        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        self.pipeline_kwargs["args"] = " ".join(
            " ".join([k, v if v else ""]) for k, v in prox_args.items())
        self.pipeline_kwargs["cfg_file"] = self.remote_path

        cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
        prox_cmd = cmd_template.format(**self.pipeline_kwargs)
        return prox_cmd
810 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
class ProxResourceHelper(ClientResourceHelper):
    """Client resource helper that drives a remote PROX traffic generator."""

    RESOURCE_WORD = 'prox'
    PROX_CORE_GEN_MODE = "gen"
    PROX_CORE_LAT_MODE = "lat"
    PROX_CORE_MPLS_TEST = "MPLS tag/untag"

    WAIT_TIME = 3

    @staticmethod
    def line_rate_to_pps(pkt_size, n_ports):
        """Theoretical max pps for the given frame size over n 10G ports."""
        # FIXME Don't hardcode 10Gb/s
        return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20)

    @staticmethod
    def find_pci(pci, bound_pci):
        # we have to substring match PCI bus address from the end
        return any(b.endswith(pci) for b in bound_pci)

    def __init__(self, setup_helper):
        super(ProxResourceHelper, self).__init__(setup_helper)
        self.mgmt_interface = self.vnfd_helper.mgmt_interface
        self._user = self.mgmt_interface["user"]
        self._ip = self.mgmt_interface["ip"]

        self.done = False
        self._cpu_topology = None
        self._vpci_to_if_name_map = None
        self.additional_file = {}
        self.remote_prox_file_name = None
        self.lower = None
        self.upper = None
        self._test_cores = None
        self._latency_cores = None
        self._tagged_cores = None
        self._plain_cores = None

    @property
    def sut(self):
        # lazily connect to the remote PROX control socket
        if not self.client:
            self.client = self._connect()
        return self.client

    @property
    def cpu_topology(self):
        if not self._cpu_topology:
            stdout = io.BytesIO()
            self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
            self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
        return self._cpu_topology

    @property
    def test_cores(self):
        if not self._test_cores:
            self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
        return self._test_cores

    @property
    def mpls_cores(self):
        if not self._tagged_cores:
            self._tagged_cores, self._plain_cores = self.get_cores_mpls(self.PROX_CORE_GEN_MODE)
        return self._tagged_cores, self._plain_cores

    @property
    def tagged_cores(self):
        return self.mpls_cores[0]

    @property
    def plain_cores(self):
        return self.mpls_cores[1]

    @property
    def latency_cores(self):
        if not self._latency_cores:
            self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
        return self._latency_cores

    def run_traffic(self, traffic_profile):
        self.lower = 0.0
        self.upper = 100.0

        traffic_profile.init(self._queue)
        # this frees up the run_traffic loop
        self.client_started.value = 1

        while not self._terminated.value:
            # move it all to traffic_profile
            self._run_traffic_once(traffic_profile)

    def _run_traffic_once(self, traffic_profile):
        traffic_profile.execute_traffic(self)
        if traffic_profile.done:
            self._queue.put({'done': True})
            LOG.debug("tg_prox done")
            self._terminated.value = 1

    # use ResourceHelper method to collect KPIs directly.
    def collect_kpi(self):
        return self._collect_resource_kpi()

    def terminate(self):
        # should not be called, use VNF terminate
        raise NotImplementedError()

    def up_post(self):
        return self.sut  # force connection

    def execute(self, cmd, *args, **kwargs):
        """Forward a call by name to the PROX socket helper, if it exists."""
        func = getattr(self.sut, cmd, None)
        if func:
            return func(*args, **kwargs)

    @contextmanager
    def traffic_context(self, pkt_size, value):
        self.sut.stop_all()
        self.sut.reset_stats()
        if self.get_test_type() == self.PROX_CORE_MPLS_TEST:
            self.sut.set_pkt_size(self.tagged_cores, pkt_size)
            self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
            self.sut.set_speed(self.tagged_cores, value)
            # scale plain-core speed so both streams consume equal bandwidth
            ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
            self.sut.set_speed(self.plain_cores, value * ratio)
        else:
            self.sut.set_pkt_size(self.test_cores, pkt_size)
            self.sut.set_speed(self.test_cores, value)

        self.sut.start_all()
        try:
            yield
        finally:
            self.sut.stop_all()

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        # do this assert in init? unless we expect interface count to
        # change from one run to another run...
        ports = self.vnfd_helper.port_pairs.all_ports
        port_count = len(ports)
        assert port_count in {1, 2, 4}, \
            "Invalid number of ports: 1, 2 or 4 ports only supported at this time"

        with self.traffic_context(pkt_size, value):
            # Getting statistics to calculate PPS at right speed....
            tsc_hz = float(self.sut.hz())
            with self.sut.measure_tot_stats() as data:
                time.sleep(duration)

            # Get stats before stopping the cores. Stopping cores takes some time
            # and might skew results otherwise.
            latency = self.get_latency()

        deltas = data['delta']
        rx_total, tx_total = self.sut.port_stats(range(port_count))[6:8]
        pps = value / 100.0 * self.line_rate_to_pps(pkt_size, port_count)

        samples = {}
        # we are currently using enumeration to map logical port num to interface
        for port_name in ports:
            port = self.vnfd_helper.port_num(port_name)
            port_rx_total, port_tx_total = self.sut.port_stats([port])[6:8]
            samples[port_name] = {
                "in_packets": port_rx_total,
                "out_packets": port_tx_total,
            }

        result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx,
                                   deltas.tsc, latency, rx_total, tx_total, pps)
        result.log_data()
        return result, samples

    def get_test_type(self):
        """Return PROX_CORE_MPLS_TEST when the config names it, else None."""
        test_type = None
        for section_name, section in self.setup_helper.prox_config_data:
            if section_name != "global":
                continue

            for key, value in section:
                if key == "name" and value == self.PROX_CORE_MPLS_TEST:
                    test_type = self.PROX_CORE_MPLS_TEST

        return test_type

    def get_cores(self, mode):
        """Collect processor numbers of all cores configured in `mode`."""
        cores = []

        for section_name, section in self.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            for key, value in section:
                if key == "mode" and value == mode:
                    core_tuple = CoreSocketTuple(section_name)
                    core = core_tuple.find_in_topology(self.cpu_topology)
                    cores.append(core)

        return cores

    def get_cores_mpls(self, mode=PROX_CORE_GEN_MODE):
        """Split gen cores into MPLS-tagged and plain-UDP groups by name."""
        cores_tagged = []
        cores_plain = []
        for section_name, section in self.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            if all(key != "mode" or value != mode for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key == "name" and item_value.startswith("tag"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.find_in_topology(self.cpu_topology)
                    cores_tagged.append(core_tag)

                elif item_key == "name" and item_value.startswith("udp"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_udp = core_tuple.find_in_topology(self.cpu_topology)
                    cores_plain.append(core_udp)

        return cores_tagged, cores_plain

    def get_latency(self):
        """
        :return: return lat_min, lat_max, lat_avg
        :rtype: list
        """
        if self._latency_cores:
            return self.sut.lat_stats(self._latency_cores)
        return []

    def _connect(self, client=None):
        """Run and connect to prox on the remote system """
        # De-allocating a large amount of hugepages takes some time. If a new
        # PROX instance is started immediately after killing the previous one,
        # it might not be able to allocate hugepages, because they are still
        # being freed. Hence the -w switch.
        # self.connection.execute("sudo killall -w Prox 2>/dev/null")
        # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
        # -f ./handle_none-4.cfg"
        # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
        # + "export RTE_TARGET=" + self._dpdk_target + ";" \
        # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
        # + "./build/Prox " + prox_args
        # log.debug("Starting PROX with command [%s]", prox_cmd)
        # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
        # self._ip, prox_cmd))
        if client is None:
            client = ProxSocketHelper()

        # try connecting to Prox for 60s
        for _ in range(RETRY_SECONDS):
            time.sleep(RETRY_INTERVAL)
            try:
                client.connect(self._ip, PROX_PORT)
            except (socket.gaierror, socket.error):
                continue
            else:
                return client

        msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
        raise Exception(msg.format(self._ip, PROX_PORT))