1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
25 from collections import OrderedDict, namedtuple
27 from contextlib import contextmanager
28 from itertools import repeat, chain
31 from multiprocessing import Queue
32 from six.moves import zip, StringIO
33 from six.moves import cStringIO
35 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
36 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
37 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
47 LOG = logging.getLogger(__name__)
48 LOG.setLevel(logging.DEBUG)
55 CONFIGURATION_OPTIONS = (
56 # dict key section key default value
57 ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
58 ('testDuration', 'general', 'test_duration', 5.0),
59 ('testPrecision', 'general', 'test_precision', 1.0),
60 ('tests', 'general', 'tests', None),
61 ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
63 ('logFile', 'logging', 'file', 'dats.log'),
64 ('logDateFormat', 'logging', 'datefmt', None),
65 ('logLevel', 'logging', 'level', 'INFO'),
66 ('logOverwrite', 'logging', 'overwrite', 1),
68 ('testerIp', 'tester', 'ip', None),
69 ('testerUser', 'tester', 'user', 'root'),
70 ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
71 ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
72 ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
73 ('testerSocketId', 'tester', 'socket_id', 0),
75 ('sutIp', 'sut', 'ip', None),
76 ('sutUser', 'sut', 'user', 'root'),
77 ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
78 ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
79 ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
80 ('sutSocketId', 'sut', 'socket_id', 0),
84 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
86 CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
88 def __new__(cls, *args):
90 matches = cls.CORE_RE.search(str(args[0]))
92 args = matches.groups()
94 return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
95 'h' if args[2] else '')
97 except (AttributeError, TypeError, IndexError, ValueError):
98 raise ValueError('Invalid core spec {}'.format(args))
100 def is_hyperthread(self):
101 return self.hyperthread == 'h'
105 return int(self.is_hyperthread())
107 def find_in_topology(self, cpu_topology):
109 socket_core_match = cpu_topology[self.socket_id][self.core_id]
110 sorted_match = sorted(socket_core_match.values())
111 return sorted_match[self.index][0]
112 except (KeyError, IndexError):
113 template = "Core {}{} on socket {} does not exist"
114 raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
117 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
119 def __new__(cls, *args):
121 assert args[0] is not str(args[0])
122 args = tuple(args[0])
123 except (AssertionError, IndexError, TypeError):
126 return super(TotStatsTuple, cls).__new__(cls, *args)
129 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
130 'delta_tx,delta_tsc,'
131 'latency,rx_total,tx_total,pps')):
136 return 1e2 * self.drop_total / float(self.tx_total)
137 except ZeroDivisionError:
142 # calculate the effective throughput in Mpps
143 return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
146 def can_be_lost(self):
147 return int(self.tx_total * self.tolerated / 1e2)
150 def drop_total(self):
151 return self.tx_total - self.rx_total
155 return self.drop_total <= self.can_be_lost
157 def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
159 pkt_loss = self.pkt_loss
161 if port_samples is None:
171 "Throughput": self.mpps,
172 "DropPackets": pkt_loss,
173 "CurrentDropPackets": pkt_loss,
174 "TxThroughput": self.pps / 1e6,
175 "RxThroughput": self.mpps,
179 samples.update(port_samples)
181 samples.update((key, value) for key, value in zip(latency_keys, self.latency))
184 def log_data(self, logger=None):
188 template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
189 logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
190 logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
193 class PacketDump(object):
196 def assert_func(func, value1, value2, template=None):
197 assert func(value1, value2), template.format(value1, value2)
199 def __init__(self, port_id, data_len, payload):
200 template = "Packet dump has specified length {}, but payload is {} bytes long"
201 self.assert_func(operator.eq, data_len, len(payload), template)
202 self._port_id = port_id
203 self._data_len = data_len
204 self._payload = payload
208 """Get the port id of the packet dump"""
213 """Get the length of the data received"""
214 return self._data_len
217 return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
219 def payload(self, start=None, end=None):
220 """Get part of the payload as a list of ordinals.
222 Returns a list of byte values, matching the contents of the packet dump.
223 Optional start and end parameters can be specified to retrieve only a
224 part of the packet contents.
226 The number of elements in the list is equal to end - start + 1, so end
227 is the offset of the last character.
230 start (pos. int): the starting offset in the payload. If it is not
231 specified or None, offset 0 is assumed.
232 end (pos. int): the ending offset of the payload. If it is not
233 specified or None, the contents until the end of the packet are
237 [int, int, ...]. Each int represents the ordinal value of a byte in
244 end = self.data_len - 1
246 # Bounds checking on offsets
247 template = "Start offset must be non-negative"
248 self.assert_func(operator.ge, start, 0, template)
250 template = "End offset must be less than {1}"
251 self.assert_func(operator.lt, end, self.data_len, template)
253 # Adjust for splice operation: end offset must be 1 more than the offset
254 # of the last desired character.
257 return self._payload[start:end]
260 class ProxSocketHelper(object):
262 def __init__(self, sock=None):
263 """ creates new prox instance """
264 super(ProxSocketHelper, self).__init__()
267 sock = socket.socket()
272 def connect(self, ip, port):
273 """Connect to the prox instance on the remote system"""
274 self._sock.connect((ip, port))
276 def get_socket(self):
277 """ get the socket connected to the remote instance """
280 def _parse_socket_data(self, decoded_data, pkt_dump_only):
281 def get_newline_index():
282 return decoded_data.find('\n', index)
286 for newline_index in iter(get_newline_index, -1):
287 ret_str = decoded_data[index:newline_index]
290 mode, port_id, data_len = ret_str.split(',', 2)
292 mode, port_id, data_len = None, None, None
294 if mode != 'pktdump':
295 # Regular 1-line message. Stop reading from the socket.
296 LOG.debug("Regular response read")
299 LOG.debug("Packet dump header read: [%s]", ret_str)
301 # The line is a packet dump header. Parse it, read the
302 # packet payload, store the dump for later retrieval.
303 # Skip over the packet dump and continue processing: a
304 # 1-line response may follow the packet dump.
306 data_len = int(data_len)
307 data_start = newline_index + 1 # + 1 to skip over \n
308 data_end = data_start + data_len
309 sub_data = decoded_data[data_start:data_end]
310 pkt_payload = array.array('B', (ord(v) for v in sub_data))
311 pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
312 self._pkt_dumps.append(pkt_dump)
315 # Return boolean instead of string to signal
316 # successful reception of the packet dump.
317 LOG.debug("Packet dump stored, returning")
324 def get_data(self, pkt_dump_only=False, timeout=1):
325 """ read data from the socket """
326 # This method behaves slightly differently depending on whether it is
327 # called to read the response to a command (pkt_dump_only = 0) or if
328 # it is called specifically to read a packet dump (pkt_dump_only = 1).
330 # Packet dumps look like:
331 # pktdump,<port_id>,<data_len>\n
332 # <packet contents as byte array>\n
333 # This means the total packet dump message consists of 2 lines instead
336 # - Response for a command (pkt_dump_only = 0):
337 # 1) Read response from the socket until \n (end of message)
338 # 2a) If the response is a packet dump header (starts with "pktdump,"):
339 # - Read the packet payload and store the packet dump for later
341 # - Reset the state and restart from 1). Eventually state 2b) will
342 # be reached and the function will return.
343 # 2b) If the response is not a packet dump:
344 # - Return the received message as a string
346 # - Explicit request to read a packet dump (pkt_dump_only = 1):
347 # - Read the dump header and payload
348 # - Store the packet dump for later retrieval
349 # - Return True to signify a packet dump was successfully read
352 # recv() is blocking, so avoid calling it when no data is waiting.
353 ready = select.select([self._sock], [], [], timeout)
354 return bool(ready[0])
358 for status in iter(is_ready, False):
359 decoded_data = self._sock.recv(256).decode('utf-8')
360 ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
362 LOG.debug("Received data from socket: [%s]", ret_str)
363 return ret_str if status else ''
365 def put_command(self, to_send):
366 """ send data to the remote instance """
367 LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
369 self._sock.sendall(to_send.encode('utf-8'))
373 def get_packet_dump(self):
374 """ get the next packet dump """
376 return self._pkt_dumps.pop(0)
379 def stop_all_reset(self):
380 """ stop the remote instance and reset stats """
381 LOG.debug("Stop all and reset stats")
386 """ stop all cores on the remote instance """
387 LOG.debug("Stop all")
388 self.put_command("stop all\n")
391 def stop(self, cores, task=''):
392 """ stop specific cores on the remote instance """
393 LOG.debug("Stopping cores %s", cores)
394 self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
398 """ start all cores on the remote instance """
399 LOG.debug("Start all")
400 self.put_command("start all\n")
402 def start(self, cores):
403 """ start specific cores on the remote instance """
404 LOG.debug("Starting cores %s", cores)
405 self.put_command("start {}\n".format(join_non_strings(',', cores)))
408 def reset_stats(self):
409 """ reset the statistics on the remote instance """
410 LOG.debug("Reset stats")
411 self.put_command("reset stats\n")
414 def _run_template_over_cores(self, template, cores, *args):
416 self.put_command(template.format(core, *args))
418 def set_pkt_size(self, cores, pkt_size):
419 """ set the packet size to generate on the remote instance """
420 LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
422 self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
425 def set_value(self, cores, offset, value, length):
426 """ set value on the remote instance """
427 msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
428 LOG.debug(msg, cores, value, length, offset)
429 template = "set value {} 0 {} {} {}\n"
430 self._run_template_over_cores(template, cores, offset, value, length)
432 def reset_values(self, cores):
433 """ reset values on the remote instance """
434 LOG.debug("Set value for core(s) %s", cores)
435 self._run_template_over_cores("reset values {} 0\n", cores)
437 def set_speed(self, cores, speed):
438 """ set speed on the remote instance """
439 LOG.debug("Set speed for core(s) %s to %g", cores, speed)
440 self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
442 def slope_speed(self, cores_speed, duration, n_steps=0):
443 """will start to increase speed from 0 to N where N is taken from
444 a['speed'] for each a in cores_speed"""
445 # by default, each step will take 0.5 sec
447 n_steps = duration * 2
449 private_core_data = []
450 step_duration = float(duration) / n_steps
451 for core_data in cores_speed:
452 target = float(core_data['speed'])
453 private_core_data.append({
454 'cores': core_data['cores'],
456 'delta': target / n_steps,
461 deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
462 for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
463 time.sleep(step_duration)
464 for core_data in private_core_data:
465 core_data['current'] = core_data[key1] + core_data[key2]
466 self.set_speed(core_data['cores'], core_data['current'])
468 def set_pps(self, cores, pps, pkt_size):
469 """ set packets per second for specific cores on the remote instance """
470 msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
471 LOG.debug(msg, cores, pps, pkt_size)
473 # speed in percent of line-rate
474 speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
475 self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
477 def lat_stats(self, cores, task=0):
478 """Get the latency statistics from the remote system"""
479 # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
484 self.put_command("lat stats {} {} \n".format(core, task))
485 ret = self.get_data()
488 lat_min[core], lat_max[core], lat_avg[core] = \
489 tuple(int(n) for n in ret.split(",")[:3])
491 except (AttributeError, ValueError, TypeError):
494 return lat_min, lat_max, lat_avg
496 def get_all_tot_stats(self):
497 self.put_command("tot stats\n")
498 all_stats_str = self.get_data().split(",")
499 if len(all_stats_str) != 4:
502 all_stats = TotStatsTuple(int(v) for v in all_stats_str)
503 self.master_stats = all_stats
507 return self.get_all_tot_stats()[3]
511 def rx_stats(self, cores, task=0):
512 return self.core_stats(cores, task)
514 def core_stats(self, cores, task=0):
515 """Get the receive statistics from the remote system"""
516 rx = tx = drop = tsc = 0
518 self.put_command("core stats {} {}\n".format(core, task))
519 ret = self.get_data().split(",")
524 return rx, tx, drop, tsc
526 def port_stats(self, ports):
527 """get counter values from a specific port"""
528 tot_result = [0] * 12
530 self.put_command("port_stats {}\n".format(port))
531 ret = [try_int(s, 0) for s in self.get_data().split(",")]
532 tot_result = [sum(x) for x in zip(tot_result, ret)]
536 def measure_tot_stats(self):
537 start = self.get_all_tot_stats()
538 container = {'start_tot': start}
542 container['end_tot'] = end = self.get_all_tot_stats()
544 container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
547 """Get the total statistics from the remote system"""
548 stats = self.get_all_tot_stats()
551 def tot_ierrors(self):
552 """Get the total ierrors from the remote system"""
553 self.put_command("tot ierrors tot\n")
554 recv = self.get_data().split(',')
555 tot_ierrors = int(recv[0])
557 return tot_ierrors, tsc
559 def set_count(self, count, cores):
560 """Set the number of packets to send on the specified core"""
561 self._run_template_over_cores("count {} 0 {}\n", cores, count)
563 def dump_rx(self, core_id, task_id=0, count=1):
564 """Activate dump on rx on the specified core"""
565 LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
566 self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
567 time.sleep(1.5) # Give PROX time to set up packet dumping
575 """ stop all cores on the remote instance """
576 LOG.debug("Quit prox")
577 self.put_command("quit\n")
580 def force_quit(self):
581 """ stop all cores on the remote instance """
582 LOG.debug("Force Quit prox")
583 self.put_command("quit_force\n")
587 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
588 # the actual app is lowercase
591 LUA_PARAMETER_NAME = ""
592 LUA_PARAMETER_PEER = {
597 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
598 self.remote_path = None
599 super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
600 self.remote_prox_file_name = None
601 self._prox_config_data = None
602 self.additional_files = {}
603 self.config_queue = Queue()
605 def _build_pipeline_kwargs(self):
606 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
607 self.pipeline_kwargs = {
608 'tool_path': tool_path,
609 'tool_dir': os.path.dirname(tool_path),
612 def copy_to_target(self, config_file_path, prox_file):
613 remote_path = os.path.join("/tmp", prox_file)
614 self.ssh_helper.put(config_file_path, remote_path)
618 def _get_tx_port(section, sections):
620 for item in sections[section]:
621 if item[0] == "tx port":
622 iface_port = re.findall(r'\d+', item[1])
623 # do we want the last one?
624 # if yes, then can we reverse?
625 return int(iface_port[0])
628 def _replace_quoted_with_value(quoted, value, count=1):
629 new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
632 def _insert_additional_file(self, value):
633 file_str = value.split('"')
634 base_name = os.path.basename(file_str[1])
635 file_str[1] = self.additional_files[base_name]
636 return '"'.join(file_str)
638 def generate_prox_config_file(self, config_path):
640 prox_config = ConfigParser(config_path, sections)
643 # Ensure MAC is set "hardware"
644 all_ports = self.vnfd_helper.port_pairs.all_ports
645 # use dpdk port number
646 for port_name in all_ports:
647 port_num = self.vnfd_helper.port_num(port_name)
648 port_section_name = "port {}".format(port_num)
649 for section_name, section in sections:
650 if port_section_name != section_name:
653 for index, section_data in enumerate(section):
654 if section_data[0] == "mac":
655 section_data[1] = "hardware"
658 for _, section in sections:
659 # for index, (item_key, item_val) in enumerate(section):
660 for index, section_data in enumerate(section):
661 item_key, item_val = section_data
662 if item_val.startswith("@@dst_mac"):
663 tx_port_iter = re.finditer(r'\d+', item_val)
664 tx_port_no = int(next(tx_port_iter).group(0))
665 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
666 mac = intf["virtual-interface"]["dst_mac"]
667 section_data[1] = mac.replace(":", " ", 6)
669 if item_key == "dst mac" and item_val.startswith("@@"):
670 tx_port_iter = re.finditer(r'\d+', item_val)
671 tx_port_no = int(next(tx_port_iter).group(0))
672 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
673 mac = intf["virtual-interface"]["dst_mac"]
674 section_data[1] = mac
676 # if addition file specified in prox config
677 if not self.additional_files:
680 for section_name, section in sections:
681 for index, section_data in enumerate(section):
683 if section_data[0].startswith("dofile"):
684 section_data[0] = self._insert_additional_file(section_data[0])
686 if section_data[1].startswith("dofile"):
687 section_data[1] = self._insert_additional_file(section_data[1])
694 def write_prox_config(prox_config):
696 Write an .ini-format config file for PROX
697 PROX does not allow a space before/after the =, so we need
701 for i, (section_name, section) in enumerate(prox_config):
702 out.append("[{}]".format(section_name))
703 for index, item in enumerate(section):
705 if key == "__name__":
707 if value is not None and value != '@':
708 key = "=".join((key, str(value).replace('\n', '\n\t')))
711 key = str(key).replace('\n', '\n\t')
713 return os.linesep.join(out)
715 def put_string_to_file(self, s, remote_path):
716 file_obj = cStringIO(s)
717 self.ssh_helper.put_file_obj(file_obj, remote_path)
720 def generate_prox_lua_file(self):
722 all_ports = self.vnfd_helper.port_pairs.all_ports
723 lua_param = self.LUA_PARAMETER_NAME
724 for port_name in all_ports:
725 peer = self.LUA_PARAMETER_PEER[lua_param]
726 port_num = self.vnfd_helper.port_num(port_name)
727 intf = self.vnfd_helper.find_interface(name=port_name)
728 vintf = intf['virtual-interface']
729 local_ip = vintf["local_ip"]
730 dst_ip = vintf["dst_ip"]
731 local_ip_hex = ip_to_hex(local_ip, separator=' ')
732 dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
734 ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
735 ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
736 ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
737 ("{}_ip_port_{}".format(peer, port_num), dst_ip),
739 lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
742 def upload_prox_lua(self, config_dir, prox_config_dict):
743 # we could have multiple lua directives
744 lau_dict = prox_config_dict.get('lua', {})
745 find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
746 lua_file = next((found[0] for found in find_iter if found), None)
750 out = self.generate_prox_lua_file()
751 remote_path = os.path.join(config_dir, lua_file)
752 return self.put_string_to_file(out, remote_path)
754 def upload_prox_config(self, config_file, prox_config_dict):
755 # prox can't handle spaces around ' = ' so use custom method
756 out = StringIO(self.write_prox_config(prox_config_dict))
758 remote_path = os.path.join("/tmp", config_file)
759 self.ssh_helper.put_file_obj(out, remote_path)
763 CONFIG_QUEUE_TIMEOUT = 120
766 def prox_config_data(self):
767 if self._prox_config_data is None:
768 # this will block, but it needs too
769 self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
770 return self._prox_config_data
772 def build_config_file(self):
773 task_path = self.scenario_helper.task_path
774 options = self.scenario_helper.options
775 config_path = options['prox_config']
776 config_file = os.path.basename(config_path)
777 config_path = find_relative_file(config_path, task_path)
778 self.additional_files = {}
780 prox_files = options.get('prox_files', [])
781 if isinstance(prox_files, six.string_types):
782 prox_files = [prox_files]
783 for key_prox_file in prox_files:
784 base_prox_file = os.path.basename(key_prox_file)
785 key_prox_path = find_relative_file(key_prox_file, task_path)
786 remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
787 self.additional_files[base_prox_file] = remote_prox_file
789 self._prox_config_data = self.generate_prox_config_file(config_path)
790 # copy config to queue so we can read it from traffic_runner process
791 self.config_queue.put(self._prox_config_data)
792 self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
794 def build_config(self):
795 self.build_config_file()
797 options = self.scenario_helper.options
799 prox_args = options['prox_args']
800 LOG.info("Provision and start the %s", self.APP_NAME)
801 self._build_pipeline_kwargs()
802 self.pipeline_kwargs["args"] = " ".join(
803 " ".join([k, v if v else ""]) for k, v in prox_args.items())
804 self.pipeline_kwargs["cfg_file"] = self.remote_path
806 cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
807 prox_cmd = cmd_template.format(**self.pipeline_kwargs)
811 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
812 class ProxResourceHelper(ClientResourceHelper):
814 RESOURCE_WORD = 'prox'
815 PROX_CORE_GEN_MODE = "gen"
816 PROX_CORE_LAT_MODE = "lat"
817 PROX_CORE_MPLS_TEST = "MPLS tag/untag"
824 def line_rate_to_pps(pkt_size, n_ports):
825 # FIXME Don't hardcode 10Gb/s
826 return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20)
829 def find_pci(pci, bound_pci):
830 # we have to substring match PCI bus address from the end
831 return any(b.endswith(pci) for b in bound_pci)
833 def __init__(self, setup_helper):
834 super(ProxResourceHelper, self).__init__(setup_helper)
835 self.mgmt_interface = self.vnfd_helper.mgmt_interface
836 self._user = self.mgmt_interface["user"]
837 self._ip = self.mgmt_interface["ip"]
840 self._cpu_topology = None
841 self._vpci_to_if_name_map = None
842 self.additional_file = {}
843 self.remote_prox_file_name = None
846 self._test_cores = None
847 self._latency_cores = None
848 self._tagged_cores = None
849 self._plain_cores = None
854 self.client = self._connect()
858 def cpu_topology(self):
859 if not self._cpu_topology:
860 stdout = io.BytesIO()
861 self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
862 self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
863 return self._cpu_topology
866 def test_cores(self):
867 if not self._test_cores:
868 self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
869 return self._test_cores
872 def mpls_cores(self):
873 if not self._tagged_cores:
874 self._tagged_cores, self._plain_cores = self.get_cores_mpls(self.PROX_CORE_GEN_MODE)
875 return self._tagged_cores, self._plain_cores
878 def tagged_cores(self):
879 return self.mpls_cores[0]
882 def plain_cores(self):
883 return self.mpls_cores[1]
886 def latency_cores(self):
887 if not self._latency_cores:
888 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
889 return self._latency_cores
891 def run_traffic(self, traffic_profile):
895 traffic_profile.init(self._queue)
896 # this frees up the run_traffic loop
897 self.client_started.value = 1
899 while not self._terminated.value:
900 # move it all to traffic_profile
901 self._run_traffic_once(traffic_profile)
903 def _run_traffic_once(self, traffic_profile):
904 traffic_profile.execute_traffic(self)
905 if traffic_profile.done:
906 self._queue.put({'done': True})
907 LOG.debug("tg_prox done")
908 self._terminated.value = 1
910 # For VNF use ResourceHelper method to collect KPIs directly.
911 # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
912 def collect_collectd_kpi(self):
913 return self._collect_resource_kpi()
915 def collect_kpi(self):
916 result = super(ProxResourceHelper, self).collect_kpi()
917 # add in collectd kpis manually
919 result['collect_stats'] = self._collect_resource_kpi()
923 # should not be called, use VNF terminate
924 raise NotImplementedError()
927 return self.sut # force connection
929 def execute(self, cmd, *args, **kwargs):
930 func = getattr(self.sut, cmd, None)
932 return func(*args, **kwargs)
935 def traffic_context(self, pkt_size, value):
937 self.sut.reset_stats()
938 if self.get_test_type() == self.PROX_CORE_MPLS_TEST:
939 self.sut.set_pkt_size(self.tagged_cores, pkt_size)
940 self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
941 self.sut.set_speed(self.tagged_cores, value)
942 ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
943 self.sut.set_speed(self.plain_cores, value * ratio)
945 self.sut.set_pkt_size(self.test_cores, pkt_size)
946 self.sut.set_speed(self.test_cores, value)
954 def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
955 # do this assert in init? unless we expect interface count to
956 # change from one run to another run...
957 ports = self.vnfd_helper.port_pairs.all_ports
958 port_count = len(ports)
959 assert port_count in {1, 2, 4}, \
960 "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
962 with self.traffic_context(pkt_size, value):
963 # Getting statistics to calculate PPS at right speed....
964 tsc_hz = float(self.sut.hz())
966 with self.sut.measure_tot_stats() as data:
969 # Get stats before stopping the cores. Stopping cores takes some time
970 # and might skew results otherwise.
971 latency = self.get_latency()
973 deltas = data['delta']
974 rx_total, tx_total = self.sut.port_stats(range(port_count))[6:8]
975 pps = value / 100.0 * self.line_rate_to_pps(pkt_size, port_count)
978 # we are currently using enumeration to map logical port num to interface
979 for port_name in ports:
980 port = self.vnfd_helper.port_num(port_name)
981 port_rx_total, port_tx_total = self.sut.port_stats([port])[6:8]
982 samples[port_name] = {
983 "in_packets": port_rx_total,
984 "out_packets": port_tx_total,
987 result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx,
988 deltas.tsc, latency, rx_total, tx_total, pps)
990 return result, samples
992 def get_test_type(self):
994 for section_name, section in self.setup_helper.prox_config_data:
995 if section_name != "global":
998 for key, value in section:
999 if key == "name" and value == self.PROX_CORE_MPLS_TEST:
1000 test_type = self.PROX_CORE_MPLS_TEST
1004 def get_cores(self, mode):
1007 for section_name, section in self.setup_helper.prox_config_data:
1008 if not section_name.startswith("core"):
1011 for key, value in section:
1012 if key == "mode" and value == mode:
1013 core_tuple = CoreSocketTuple(section_name)
1014 core = core_tuple.find_in_topology(self.cpu_topology)
1019 def get_cores_mpls(self, mode=PROX_CORE_GEN_MODE):
1022 for section_name, section in self.setup_helper.prox_config_data:
1023 if not section_name.startswith("core"):
1026 if all(key != "mode" or value != mode for key, value in section):
1029 for item_key, item_value in section:
1030 if item_key == "name" and item_value.startswith("tag"):
1031 core_tuple = CoreSocketTuple(section_name)
1032 core_tag = core_tuple.find_in_topology(self.cpu_topology)
1033 cores_tagged.append(core_tag)
1035 elif item_key == "name" and item_value.startswith("udp"):
1036 core_tuple = CoreSocketTuple(section_name)
1037 core_udp = core_tuple.find_in_topology(self.cpu_topology)
1038 cores_plain.append(core_udp)
1040 return cores_tagged, cores_plain
1042 def get_latency(self):
1044 :return: return lat_min, lat_max, lat_avg
1047 if self._latency_cores:
1048 return self.sut.lat_stats(self._latency_cores)
1051 def _connect(self, client=None):
1052 """Run and connect to prox on the remote system """
1053 # De-allocating a large amount of hugepages takes some time. If a new
1054 # PROX instance is started immediately after killing the previous one,
1055 # it might not be able to allocate hugepages, because they are still
1056 # being freed. Hence the -w switch.
1057 # self.connection.execute("sudo killall -w Prox 2>/dev/null")
1058 # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
1059 # -f ./handle_none-4.cfg"
1060 # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
1062 # + "export RTE_TARGET=" + self._dpdk_target + ";" \
1063 # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
1065 # + "./build/Prox " + prox_args
1066 # log.debug("Starting PROX with command [%s]", prox_cmd)
1067 # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
1068 # self._ip, prox_cmd))
1070 client = ProxSocketHelper()
1072 # try connecting to Prox for 60s
1073 for _ in range(RETRY_SECONDS):
1074 time.sleep(RETRY_INTERVAL)
1076 client.connect(self._ip, PROX_PORT)
1077 except (socket.gaierror, socket.error):
1082 msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
1083 raise Exception(msg.format(self._ip, PROX_PORT))