1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
25 from collections import OrderedDict, namedtuple
27 from contextlib import contextmanager
28 from itertools import repeat, chain
29 from multiprocessing import Queue
32 from six.moves import zip, StringIO
33 from six.moves import cStringIO
35 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
36 from yardstick.common import utils
37 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
38 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
40 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
47 LOG = logging.getLogger(__name__)
48 LOG.setLevel(logging.DEBUG)
# Mapping of internal option names to their (section, key, default) in the
# DATS-style configuration file. Each row is:
#   (dict key, config section, config key, default value)
CONFIGURATION_OPTIONS = (
    # dict key            section     key               default value
    ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
    ('testDuration', 'general', 'test_duration', 5.0),
    ('testPrecision', 'general', 'test_precision', 1.0),
    ('tests', 'general', 'tests', None),
    ('toleratedLoss', 'general', 'tolerated_loss', 0.0),

    ('logFile', 'logging', 'file', 'dats.log'),
    ('logDateFormat', 'logging', 'datefmt', None),
    ('logLevel', 'logging', 'level', 'INFO'),
    ('logOverwrite', 'logging', 'overwrite', 1),

    ('testerIp', 'tester', 'ip', None),
    ('testerUser', 'tester', 'user', 'root'),
    ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
    ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
    ('testerSocketId', 'tester', 'socket_id', 0),

    ('sutIp', 'sut', 'ip', None),
    ('sutUser', 'sut', 'user', 'root'),
    ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
    ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
    ('sutSocketId', 'sut', 'socket_id', 0),
)
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Parsed PROX core spec of the form "core N[sM][h]".

    core_id: physical core number, socket_id: CPU socket (0 when absent),
    hyperthread: 'h' when the spec addresses the hyperthread sibling, else ''.
    """

    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        """Build from a "core ..." spec string (or raw field values).

        :raises ValueError: if the spec cannot be parsed
        """
        try:
            matches = cls.CORE_RE.search(str(args[0]))
            if matches:
                args = matches.groups()

            return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
                                                       'h' if args[2] else '')

        except (AttributeError, TypeError, IndexError, ValueError):
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        # the trailing 'h' in the spec selects the hyperthread sibling
        return self.hyperthread == 'h'

    @property
    def index(self):
        # 0 = physical thread, 1 = hyperthread sibling
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        """Resolve this spec to a processor number within cpu_topology.

        :param cpu_topology: nested mapping socket_id -> core_id -> threads
        :raises ValueError: if the socket/core/thread is not present
        """
        try:
            socket_core_match = cpu_topology[self.socket_id][self.core_id]
            sorted_match = sorted(socket_core_match.values())
            return sorted_match[self.index][0]
        except (KeyError, IndexError):
            template = "Core {}{} on socket {} does not exist"
            raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    """PROX "tot stats" result: rx/tx packet counts, tsc timestamp, tsc hz."""

    def __new__(cls, *args):
        # Accept either four scalars or a single (non-string) iterable of four
        # values, so callers can pass a generator expression directly.
        try:
            assert args[0] is not str(args[0])
            args = tuple(args[0])
        except (AssertionError, IndexError, TypeError):
            pass

        return super(TotStatsTuple, cls).__new__(cls, *args)
class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
                                                        'delta_tx,delta_tsc,'
                                                        'latency,rx_total,tx_total,pps')):
    """Result of one PROX test run, with derived loss/throughput metrics."""

    @property
    def pkt_loss(self):
        # percentage of transmitted packets that were dropped
        try:
            return 1e2 * self.drop_total / float(self.tx_total)
        except ZeroDivisionError:
            return 100.0

    @property
    def mpps(self):
        # calculate the effective throughput in Mpps
        return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6

    @property
    def can_be_lost(self):
        # absolute number of packets that may be lost within the tolerance
        return int(self.tx_total * self.tolerated / 1e2)

    @property
    def drop_total(self):
        return self.tx_total - self.rx_total

    @property
    def success(self):
        return self.drop_total <= self.can_be_lost

    def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
        """Build the KPI samples dict reported for this run."""
        if pkt_loss is None:
            pkt_loss = self.pkt_loss

        if port_samples is None:
            port_samples = {}

        latency_keys = [
            "LatencyMin",
            "LatencyMax",
            "LatencyAvg",
        ]

        samples = {
            "Throughput": self.mpps,
            "DropPackets": pkt_loss,
            "CurrentDropPackets": pkt_loss,
            "TxThroughput": self.pps / 1e6,
            "RxThroughput": self.mpps,
            "PktSize": pkt_size,
        }
        if port_samples:
            samples.update(port_samples)

        samples.update((key, value) for key, value in zip(latency_keys, self.latency))
        return samples

    def log_data(self, logger=None):
        """Log a summary of the run; defaults to the module logger."""
        if logger is None:
            logger = LOG

        template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
        logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
        logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
class PacketDump(object):
    """A packet dump received from PROX: port id, declared length, payload."""

    @staticmethod
    def assert_func(func, value1, value2, template=None):
        # generic two-value assertion with a formatted failure message
        assert func(value1, value2), template.format(value1, value2)

    def __init__(self, port_id, data_len, payload):
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

    @property
    def port_id(self):
        """Get the port id of the packet dump"""
        return self._port_id

    @property
    def data_len(self):
        """Get the length of the data received"""
        return self._data_len

    def __str__(self):
        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload as a list of ordinals.

        Returns a list of byte values, matching the contents of the packet dump.
        Optional start and end parameters can be specified to retrieve only a
        part of the packet contents.

        The number of elements in the list is equal to end - start + 1, so end
        is the offset of the last character.

        Args:
            start (pos. int): the starting offset in the payload. If it is not
                specified or None, offset 0 is assumed.
            end (pos. int): the ending offset of the payload. If it is not
                specified or None, the contents until the end of the packet are
                returned.

        Returns:
            [int, int, ...]. Each int represents the ordinal value of a byte in
            the packet payload.
        """
        if start is None:
            start = 0

        if end is None:
            end = self.data_len - 1

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, start, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, end, self.data_len, template)

        # Adjust for splice operation: end offset must be 1 more than the offset
        # of the last desired character.
        end += 1

        return self._payload[start:end]
class ProxSocketHelper(object):
    """Thin wrapper around the PROX telnet-style control socket."""

    def __init__(self, sock=None):
        """ creates new prox instance """
        super(ProxSocketHelper, self).__init__()

        if sock is None:
            sock = socket.socket()

        self._sock = sock
        self._pkt_dumps = []          # PacketDump objects read but not yet consumed
        self.master_stats = None      # last TotStatsTuple seen by get_all_tot_stats

    def connect(self, ip, port):
        """Connect to the prox instance on the remote system"""
        self._sock.connect((ip, port))

    def get_socket(self):
        """ get the socket connected to the remote instance """
        return self._sock

    def _parse_socket_data(self, decoded_data, pkt_dump_only):
        """Parse a chunk read from the socket into messages / packet dumps."""
        def get_newline_index():
            return decoded_data.find('\n', index)

        ret_str = ''
        index = 0
        for newline_index in iter(get_newline_index, -1):
            ret_str = decoded_data[index:newline_index]

            try:
                mode, port_id, data_len = ret_str.split(',', 2)
            except ValueError:
                mode, port_id, data_len = None, None, None

            if mode != 'pktdump':
                # Regular 1-line message. Stop reading from the socket.
                LOG.debug("Regular response read")
                return ret_str

            LOG.debug("Packet dump header read: [%s]", ret_str)

            # The line is a packet dump header. Parse it, read the
            # packet payload, store the dump for later retrieval.
            # Skip over the packet dump and continue processing: a
            # 1-line response may follow the packet dump.

            data_len = int(data_len)
            data_start = newline_index + 1  # + 1 to skip over \n
            data_end = data_start + data_len
            sub_data = decoded_data[data_start:data_end]
            pkt_payload = array.array('B', (ord(v) for v in sub_data))
            pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
            self._pkt_dumps.append(pkt_dump)

            if pkt_dump_only:
                # Return boolean instead of string to signal
                # successful reception of the packet dump.
                LOG.debug("Packet dump stored, returning")
                ret_str = True
                break

            # resume scanning after the payload and its trailing newline
            index = data_end + 1

        return ret_str

    def get_data(self, pkt_dump_only=False, timeout=1):
        """ read data from the socket """

        # This method behaves slightly differently depending on whether it is
        # called to read the response to a command (pkt_dump_only = 0) or if
        # it is called specifically to read a packet dump (pkt_dump_only = 1).
        #
        # Packet dumps look like:
        #   pktdump,<port_id>,<data_len>\n
        #   <packet contents as byte array>\n
        # This means the total packet dump message consists of 2 lines instead
        # of 1 line.
        #
        # - Response for a command (pkt_dump_only = 0):
        #   1) Read response from the socket until \n (end of message)
        #   2a) If the response is a packet dump header (starts with "pktdump,"):
        #     - Read the packet payload and store the packet dump for later
        #       retrieval.
        #     - Reset the state and restart from 1). Eventually state 2b) will
        #       be reached and the function will return.
        #   2b) If the response is not a packet dump:
        #     - Return the received message as a string
        #
        # - Explicit request to read a packet dump (pkt_dump_only = 1):
        #   - Read the dump header and payload
        #   - Store the packet dump for later retrieval
        #   - Return True to signify a packet dump was successfully read

        def is_ready():
            # recv() is blocking, so avoid calling it when no data is waiting.
            ready = select.select([self._sock], [], [], timeout)
            return bool(ready[0])

        status = False
        ret_str = ""
        for status in iter(is_ready, False):
            decoded_data = self._sock.recv(256).decode('utf-8')
            ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)

        LOG.debug("Received data from socket: [%s]", ret_str)
        return ret_str if status else ''

    def put_command(self, to_send):
        """ send data to the remote instance """
        LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
        try:
            # TODO: sendall will block, we need a timeout
            self._sock.sendall(to_send.encode('utf-8'))
        except OSError:
            # best-effort send: a dead socket is reported by later reads
            pass

    def get_packet_dump(self):
        """ get the next packet dump """
        if self._pkt_dumps:
            return self._pkt_dumps.pop(0)
        return None

    def stop_all_reset(self):
        """ stop the remote instance and reset stats """
        LOG.debug("Stop all and reset stats")
        self.stop_all()
        self.reset_stats()

    def stop_all(self):
        """ stop all cores on the remote instance """
        LOG.debug("Stop all")
        self.put_command("stop all\n")
        time.sleep(3)

    def stop(self, cores, task=''):
        """ stop specific cores on the remote instance """
        LOG.debug("Stopping cores %s", cores)
        self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
        time.sleep(3)

    def start_all(self):
        """ start all cores on the remote instance """
        LOG.debug("Start all")
        self.put_command("start all\n")

    def start(self, cores):
        """ start specific cores on the remote instance """
        LOG.debug("Starting cores %s", cores)
        self.put_command("start {}\n".format(join_non_strings(',', cores)))
        time.sleep(3)

    def reset_stats(self):
        """ reset the statistics on the remote instance """
        LOG.debug("Reset stats")
        self.put_command("reset stats\n")
        time.sleep(1)

    def _run_template_over_cores(self, template, cores, *args):
        # issue the same formatted command once per core
        for core in cores:
            self.put_command(template.format(core, *args))

    def set_pkt_size(self, cores, pkt_size):
        """ set the packet size to generate on the remote instance """
        LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
        self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
        time.sleep(1)

    def set_value(self, cores, offset, value, length):
        """ set value on the remote instance """
        msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
        LOG.debug(msg, cores, value, length, offset)
        template = "set value {} 0 {} {} {}\n"
        self._run_template_over_cores(template, cores, offset, value, length)

    def reset_values(self, cores):
        """ reset values on the remote instance """
        LOG.debug("Set value for core(s) %s", cores)
        self._run_template_over_cores("reset values {} 0\n", cores)

    def set_speed(self, cores, speed, tasks=None):
        """ set speed on the remote instance """
        if tasks is None:
            tasks = [0] * len(cores)
        elif len(tasks) != len(cores):
            LOG.error("set_speed: cores and tasks must have the same len")
        LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
        for (core, task) in list(zip(cores, tasks)):
            self.put_command("speed {} {} {}\n".format(core, task, speed))

    def slope_speed(self, cores_speed, duration, n_steps=0):
        """will start to increase speed from 0 to N where N is taken from
        a['speed'] for each a in cores_speed"""
        # by default, each step will take 0.5 sec
        if n_steps == 0:
            n_steps = duration * 2

        private_core_data = []
        step_duration = float(duration) / n_steps
        for core_data in cores_speed:
            target = float(core_data['speed'])
            private_core_data.append({
                'cores': core_data['cores'],
                'zero': 0,
                'delta': target / n_steps,
                'current': 0,
                'speed': target,
            })

        # ramp in n_steps - 1 increments, then jump to the exact target speed
        deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
        for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
            time.sleep(step_duration)
            for core_data in private_core_data:
                core_data['current'] = core_data[key1] + core_data[key2]
                self.set_speed(core_data['cores'], core_data['current'])

    def set_pps(self, cores, pps, pkt_size):
        """ set packets per second for specific cores on the remote instance """
        msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
        LOG.debug(msg, cores, pps, pkt_size)

        # speed in percent of line-rate
        speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
        self._run_template_over_cores("speed {} 0 {}\n", cores, speed)

    def lat_stats(self, cores, task=0):
        """Get the latency statistics from the remote system"""
        # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
        lat_min = {}
        lat_max = {}
        lat_avg = {}
        for core in cores:
            self.put_command("lat stats {} {} \n".format(core, task))
            ret = self.get_data()

            try:
                lat_min[core], lat_max[core], lat_avg[core] = \
                    tuple(int(n) for n in ret.split(",")[:3])

            except (AttributeError, ValueError, TypeError):
                pass

        return lat_min, lat_max, lat_avg

    def get_all_tot_stats(self):
        """Query "tot stats" and cache the parsed result in master_stats."""
        self.put_command("tot stats\n")
        all_stats_str = self.get_data().split(",")
        if len(all_stats_str) != 4:
            return TotStatsTuple(0, 0, 0, 0)
        all_stats = TotStatsTuple(int(v) for v in all_stats_str)
        self.master_stats = all_stats
        return all_stats

    def hz(self):
        # TSC frequency is the fourth field of "tot stats"
        return self.get_all_tot_stats()[3]

    # Deprecated
    def rx_stats(self, cores, task=0):
        return self.core_stats(cores, task)

    def core_stats(self, cores, task=0):
        """Get the receive statistics from the remote system"""
        rx = tx = drop = tsc = 0
        for core in cores:
            self.put_command("core stats {} {}\n".format(core, task))
            ret = self.get_data().split(",")
            rx += int(ret[0])
            tx += int(ret[1])
            drop += int(ret[2])
            tsc = int(ret[3])
        return rx, tx, drop, tsc

    def port_stats(self, ports):
        """get counter values from a specific port"""
        tot_result = [0] * 12
        for port in ports:
            self.put_command("port_stats {}\n".format(port))
            ret = [try_int(s, 0) for s in self.get_data().split(",")]
            # element-wise accumulation across ports
            tot_result = [sum(x) for x in zip(tot_result, ret)]
        return tot_result

    @contextmanager
    def measure_tot_stats(self):
        """Context manager yielding a dict with start/end/delta tot stats."""
        start = self.get_all_tot_stats()
        container = {'start_tot': start}
        try:
            yield container
        finally:
            container['end_tot'] = end = self.get_all_tot_stats()

        container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))

    def tot_stats(self):
        """Get the total statistics from the remote system"""
        stats = self.get_all_tot_stats()
        return stats.rx, stats.tx, stats.tsc

    def tot_ierrors(self):
        """Get the total ierrors from the remote system"""
        self.put_command("tot ierrors tot\n")
        recv = self.get_data().split(',')
        tot_ierrors = int(recv[0])
        # NOTE(review): both values read field 0; recv[1] may be intended for
        # the tsc — confirm against the PROX "tot ierrors tot" reply format.
        tsc = int(recv[0])
        return tot_ierrors, tsc

    def set_count(self, count, cores):
        """Set the number of packets to send on the specified core"""
        self._run_template_over_cores("count {} 0 {}\n", cores, count)

    def dump_rx(self, core_id, task_id=0, count=1):
        """Activate dump on rx on the specified core"""
        LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
        self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
        time.sleep(1.5)  # Give PROX time to set up packet dumping

    def quit(self):
        """ stop all cores on the remote instance """
        LOG.debug("Quit prox")
        self.put_command("quit\n")
        time.sleep(3)

    def force_quit(self):
        """ stop all cores on the remote instance """
        LOG.debug("Force Quit prox")
        self.put_command("quit_force\n")
        time.sleep(3)
591 _LOCAL_OBJECT = object()
class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
    """Builds and uploads the PROX configuration (ini + lua) for a VNF/TG."""

    # the actual app is lowercase
    APP_NAME = 'prox'
    # not used for Prox but added for consistency
    VNF_TYPE = "PROX"

    LUA_PARAMETER_NAME = ""
    LUA_PARAMETER_PEER = {
        "gen": "sut",
        "sut": "gen",
    }

    CONFIG_QUEUE_TIMEOUT = 120

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        self.remote_path = None
        super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.remote_prox_file_name = None
        self._prox_config_data = None
        self.additional_files = {}
        self.config_queue = Queue()
        # allow_exit_without_flush
        self.config_queue.cancel_join_thread()
        self._global_section = None

    @property
    def prox_config_data(self):
        if self._prox_config_data is None:
            # this will block, but it needs too
            self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
        return self._prox_config_data

    @property
    def global_section(self):
        if self._global_section is None and self.prox_config_data:
            self._global_section = self.find_section("global")
        return self._global_section

    def find_section(self, name, default=_LOCAL_OBJECT):
        """Return the named config section, or default; raise if neither."""
        result = next((value for key, value in self.prox_config_data if key == name), default)
        if result is _LOCAL_OBJECT:
            raise KeyError('{} not found in Prox config'.format(name))
        return result

    def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
        """Return a key's value inside a section, or default; raise if neither."""
        section = self.find_section(section_name, [])
        result = next((value for key, value in section if key == section_key), default)
        if result is _LOCAL_OBJECT:
            template = '{} not found in {} section of Prox config'
            raise KeyError(template.format(section_key, section_name))
        return result

    def _build_pipeline_kwargs(self):
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        self.pipeline_kwargs = {
            'tool_path': tool_path,
            'tool_dir': os.path.dirname(tool_path),
        }

    def copy_to_target(self, config_file_path, prox_file):
        remote_path = os.path.join("/tmp", prox_file)
        self.ssh_helper.put(config_file_path, remote_path)
        return remote_path

    @staticmethod
    def _get_tx_port(section, sections):
        iface_port = [-1]
        for item in sections[section]:
            if item[0] == "tx port":
                iface_port = re.findall(r'\d+', item[1])
                # do we want the last one?
                #   if yes, then can we reverse?
        return int(iface_port[0])

    @staticmethod
    def _replace_quoted_with_value(quoted, value, count=1):
        # swap the first `count` double-quoted substrings for `value`
        new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
        return new_string

    def _insert_additional_file(self, value):
        # rewrite a dofile("...") path to the uploaded copy of that file
        file_str = value.split('"')
        base_name = os.path.basename(file_str[1])
        file_str[1] = self.additional_files[base_name]
        return '"'.join(file_str)

    def generate_prox_config_file(self, config_path):
        """Parse the PROX config and patch MACs/paths for this deployment."""
        sections = []
        prox_config = ConfigParser(config_path, sections)
        prox_config.parse()

        # Ensure MAC is set "hardware"
        all_ports = self.vnfd_helper.port_pairs.all_ports
        # use dpdk port number
        for port_name in all_ports:
            port_num = self.vnfd_helper.port_num(port_name)
            port_section_name = "port {}".format(port_num)
            for section_name, section in sections:
                if port_section_name != section_name:
                    continue

                for index, section_data in enumerate(section):
                    if section_data[0] == "mac":
                        section_data[1] = "hardware"

        # replace dest mac placeholders with the peer interface's dst_mac
        for _, section in sections:
            # for index, (item_key, item_val) in enumerate(section):
            for index, section_data in enumerate(section):
                item_key, item_val = section_data
                if item_val.startswith("@@dst_mac"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    section_data[1] = mac.replace(":", " ", 6)

                if item_key == "dst mac" and item_val.startswith("@@"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    section_data[1] = mac

        # if addition file specified in prox config
        if not self.additional_files:
            return sections

        for section_name, section in sections:
            for index, section_data in enumerate(section):
                try:
                    if section_data[0].startswith("dofile"):
                        section_data[0] = self._insert_additional_file(section_data[0])

                    if section_data[1].startswith("dofile"):
                        section_data[1] = self._insert_additional_file(section_data[1])
                except AttributeError:
                    # non-string entries cannot contain a dofile directive
                    pass

        return sections

    @staticmethod
    def write_prox_config(prox_config):
        """
        Write an .ini-format config file for PROX
        PROX does not allow a space before/after the =, so we need
        a custom method
        """
        out = []
        for i, (section_name, section) in enumerate(prox_config):
            out.append("[{}]".format(section_name))
            for index, item in enumerate(section):
                key, value = item
                if key == "__name__":
                    continue
                if value is not None and value != '@':
                    key = "=".join((key, str(value).replace('\n', '\n\t')))
                else:
                    key = str(key).replace('\n', '\n\t')
                out.append(key)
        return os.linesep.join(out)

    def put_string_to_file(self, s, remote_path):
        file_obj = cStringIO(s)
        self.ssh_helper.put_file_obj(file_obj, remote_path)
        return remote_path

    def generate_prox_lua_file(self):
        """Render the lua parameter file mapping local/peer IPs per port."""
        p = OrderedDict()
        all_ports = self.vnfd_helper.port_pairs.all_ports
        lua_param = self.LUA_PARAMETER_NAME
        for port_name in all_ports:
            peer = self.LUA_PARAMETER_PEER[lua_param]
            port_num = self.vnfd_helper.port_num(port_name)
            intf = self.vnfd_helper.find_interface(name=port_name)
            vintf = intf['virtual-interface']
            local_ip = vintf["local_ip"]
            dst_ip = vintf["dst_ip"]
            local_ip_hex = ip_to_hex(local_ip, separator=' ')
            dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
            p.update([
                ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
                ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
                ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
                ("{}_ip_port_{}".format(peer, port_num), dst_ip),
            ])
        lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
        return lua

    def upload_prox_lua(self, config_dir, prox_config_data):
        # we could have multiple lua directives
        lau_dict = prox_config_data.get('lua', {})
        find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
        lua_file = next((found[0] for found in find_iter if found), None)
        if lua_file is None:
            return ''

        out = self.generate_prox_lua_file()
        remote_path = os.path.join(config_dir, lua_file)
        return self.put_string_to_file(out, remote_path)

    def upload_prox_config(self, config_file, prox_config_data):
        # prox can't handle spaces around ' = ' so use custom method
        out = StringIO(self.write_prox_config(prox_config_data))
        out.seek(0)
        remote_path = os.path.join("/tmp", config_file)
        self.ssh_helper.put_file_obj(out, remote_path)

        return remote_path

    def build_config_file(self):
        """Generate, queue and upload the PROX config plus auxiliary files."""
        task_path = self.scenario_helper.task_path
        options = self.scenario_helper.options
        config_path = options['prox_config']
        config_file = os.path.basename(config_path)
        config_path = find_relative_file(config_path, task_path)
        self.additional_files = {}

        prox_files = options.get('prox_files', [])
        if isinstance(prox_files, six.string_types):
            prox_files = [prox_files]
        for key_prox_file in prox_files:
            base_prox_file = os.path.basename(key_prox_file)
            key_prox_path = find_relative_file(key_prox_file, task_path)
            remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
            self.additional_files[base_prox_file] = remote_prox_file

        self._prox_config_data = self.generate_prox_config_file(config_path)
        # copy config to queue so we can read it from traffic_runner process
        self.config_queue.put(self._prox_config_data)
        self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)

    def build_config(self):
        """Return the shell command line used to launch PROX remotely."""
        self.build_config_file()

        options = self.scenario_helper.options

        prox_args = options['prox_args']
        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        self.pipeline_kwargs["args"] = " ".join(
            " ".join([k, v if v else ""]) for k, v in prox_args.items())
        self.pipeline_kwargs["cfg_file"] = self.remote_path

        cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
        prox_cmd = cmd_template.format(**self.pipeline_kwargs)

        return prox_cmd
# this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
class ProxResourceHelper(ClientResourceHelper):
    """Drives a remote PROX instance over its control socket."""

    RESOURCE_WORD = 'prox'

    PROX_MODE = ""

    WAIT_TIME = 3

    @staticmethod
    def find_pci(pci, bound_pci):
        # we have to substring match PCI bus address from the end
        return any(b.endswith(pci) for b in bound_pci)

    def __init__(self, setup_helper):
        super(ProxResourceHelper, self).__init__(setup_helper)
        self.mgmt_interface = self.vnfd_helper.mgmt_interface
        self._user = self.mgmt_interface["user"]
        self._ip = self.mgmt_interface["ip"]

        self.done = False
        self._vpci_to_if_name_map = None
        self.additional_file = {}
        self.remote_prox_file_name = None
        self.lower = None
        self.upper = None
        self.step_delta = 1
        self.step_time = 0.5
        self._test_type = None

    @property
    def sut(self):
        # lazily establish the control-socket connection
        if not self.client:
            self.client = self._connect()
        return self.client

    @property
    def test_type(self):
        if self._test_type is None:
            self._test_type = self.setup_helper.find_in_section('global', 'name', None)
        return self._test_type

    def run_traffic(self, traffic_profile):
        self._queue.cancel_join_thread()
        self.lower = 0.0
        self.upper = 100.0

        traffic_profile.init(self._queue)
        # this frees up the run_traffic loop
        self.client_started.value = 1

        while not self._terminated.value:
            # move it all to traffic_profile
            self._run_traffic_once(traffic_profile)

    def _run_traffic_once(self, traffic_profile):
        traffic_profile.execute_traffic(self)
        if traffic_profile.done:
            self._queue.put({'done': True})
            LOG.debug("tg_prox done")
            self._terminated.value = 1

    # For VNF use ResourceHelper method to collect KPIs directly.
    # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
    def collect_collectd_kpi(self):
        return self._collect_resource_kpi()

    def collect_kpi(self):
        result = super(ProxResourceHelper, self).collect_kpi()
        # add in collectd kpis manually
        if result:
            result['collect_stats'] = self._collect_resource_kpi()
        return result

    def terminate(self):
        # should not be called, use VNF terminate
        raise NotImplementedError()

    def up_post(self):
        return self.sut  # force connection

    def execute(self, cmd, *args, **kwargs):
        """Dispatch a named command to the connected ProxSocketHelper."""
        func = getattr(self.sut, cmd, None)
        if func:
            return func(*args, **kwargs)
        return None

    def _connect(self, client=None):
        """Run and connect to prox on the remote system """
        # De-allocating a large amount of hugepages takes some time. If a new
        # PROX instance is started immediately after killing the previous one,
        # it might not be able to allocate hugepages, because they are still
        # being freed. Hence the -w switch.
        if client is None:
            client = ProxSocketHelper()

        # try connecting to Prox for 60s
        for _ in range(RETRY_SECONDS):
            time.sleep(RETRY_INTERVAL)
            try:
                client.connect(self._ip, PROX_PORT)
            except (socket.gaierror, socket.error):
                continue
            else:
                return client

        msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
        raise Exception(msg.format(self._ip, PROX_PORT))
class ProxDataHelper(object):
    """Collects per-run statistics from PROX and packages them as a
    ProxTestDataTuple plus per-port samples."""

    def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
        super(ProxDataHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.sut = sut
        self.pkt_size = pkt_size
        self.value = value
        self.tolerated_loss = tolerated_loss
        self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
        self.tsc_hz = None
        self.measured_stats = None
        self.latency = None
        self._totals_and_pps = None
        self.result_tuple = None

    @property
    def totals_and_pps(self):
        # cached (rx_total, tx_total, pps); port_stats fields 6 and 7 are the
        # cumulative rx/tx packet counters
        if self._totals_and_pps is None:
            rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
            pps = self.value / 100.0 * self.line_rate_to_pps()
            self._totals_and_pps = rx_total, tx_total, pps
        return self._totals_and_pps

    @property
    def rx_total(self):
        return self.totals_and_pps[0]

    @property
    def tx_total(self):
        return self.totals_and_pps[1]

    @property
    def pps(self):
        return self.totals_and_pps[2]

    @property
    def samples(self):
        """Per-port in/out packet counters keyed by port name."""
        samples = {}
        for port_name, port_num in self.vnfd_helper.ports_iter():
            port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
            samples[port_name] = {
                "in_packets": port_rx_total,
                "out_packets": port_tx_total,
            }
        return samples

    def __enter__(self):
        self.check_interface_count()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.make_tuple()

    def make_tuple(self):
        # idempotent: keep the first result tuple built
        if self.result_tuple:
            return

        self.result_tuple = ProxTestDataTuple(
            self.tolerated_loss,
            self.tsc_hz,
            self.measured_stats['delta'].rx,
            self.measured_stats['delta'].tx,
            self.measured_stats['delta'].tsc,
            self.latency,
            self.rx_total,
            self.tx_total,
            self.pps,
        )
        self.result_tuple.log_data()

    @contextmanager
    def measure_tot_stats(self):
        with self.sut.measure_tot_stats() as self.measured_stats:
            yield

    def check_interface_count(self):
        # do this assert in init? unless we expect interface count to
        # change from one run to another run...
        assert self.port_count in {1, 2, 4}, \
            "Invalid number of ports: 1, 2 or 4 ports only supported at this time"

    def capture_tsc_hz(self):
        self.tsc_hz = float(self.sut.hz())

    def line_rate_to_pps(self):
        # FIXME Don't hardcode 10Gb/s
        return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
class ProxProfileHelper(object):
    """Generic PROX profile: locates gen/lat cores and runs a fixed-rate test."""

    __prox_profile_type__ = "Generic"

    PROX_CORE_GEN_MODE = "gen"
    PROX_CORE_LAT_MODE = "lat"

    @classmethod
    def get_cls(cls, helper_type):
        """Return class of specified type."""
        if helper_type is None:
            return ProxProfileHelper

        for profile_helper_class in utils.itersubclasses(cls):
            if helper_type == profile_helper_class.__prox_profile_type__:
                return profile_helper_class

        return ProxProfileHelper

    @classmethod
    def make_profile_helper(cls, resource_helper):
        return cls.get_cls(resource_helper.test_type)(resource_helper)

    def __init__(self, resource_helper):
        super(ProxProfileHelper, self).__init__()
        self.resource_helper = resource_helper
        self._cpu_topology = None
        self._test_cores = None
        self._latency_cores = None

    @property
    def cpu_topology(self):
        # parsed lazily from the remote /proc/cpuinfo
        if not self._cpu_topology:
            stdout = io.BytesIO()
            self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
            self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
        return self._cpu_topology

    @property
    def test_cores(self):
        if not self._test_cores:
            self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
        return self._test_cores

    @property
    def latency_cores(self):
        if not self._latency_cores:
            self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
        return self._latency_cores

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Run traffic at `value`% of line rate for the duration of the block."""
        self.sut.stop_all()
        self.sut.reset_stats()
        try:
            self.sut.set_pkt_size(self.test_cores, pkt_size)
            self.sut.set_speed(self.test_cores, value)
            self.sut.start_all()
            yield
        finally:
            self.sut.stop_all()

    def get_cores(self, mode):
        """Return CPU numbers of all [core ...] sections running `mode`."""
        cores = []
        for section_name, section in self.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            for key, value in section:
                if key == "mode" and value == mode:
                    core_tuple = CoreSocketTuple(section_name)
                    core = core_tuple.find_in_topology(self.cpu_topology)
                    cores.append(core)

        return cores

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples

    def get_latency(self):
        """
        :return: return lat_min, lat_max, lat_avg
        :rtype: list
        """
        if self._latency_cores:
            return self.sut.lat_stats(self._latency_cores)
        return []

    def terminate(self):
        pass

    def __getattr__(self, item):
        # delegate unknown attributes to the wrapped resource helper
        return getattr(self.resource_helper, item)
1159 class ProxMplsProfileHelper(ProxProfileHelper):
# Profile helper for the MPLS tag/untag test: one set of generator cores
# sends MPLS-tagged traffic, the other sends plain (untagged) traffic
# 4 bytes smaller (the MPLS label size, per the arithmetic below).
1161 __prox_profile_type__ = "MPLS tag/untag"
1163 def __init__(self, resource_helper):
1164 super(ProxMplsProfileHelper, self).__init__(resource_helper)
# Cache for the (tagged_cores, plain_cores) pair; filled lazily below.
1165 self._cores_tuple = None
# Lazily computed (tagged, plain) core lists.
# NOTE(review): presumably a @property (it is indexed without a call in
# tagged_cores/plain_cores below) -- decorator not visible in this
# excerpt.
1168 def mpls_cores(self):
1169 if not self._cores_tuple:
1170 self._cores_tuple = self.get_cores_mpls()
1171 return self._cores_tuple
# Cores generating MPLS-tagged traffic.
# NOTE(review): presumably a @property -- decorator not visible here.
1174 def tagged_cores(self):
1175 return self.mpls_cores[0]
# Cores generating plain (untagged) traffic.
# NOTE(review): presumably a @property -- decorator not visible here.
1178 def plain_cores(self):
1179 return self.mpls_cores[1]
1181 def get_cores_mpls(self):
# Walk the PROX config and split generator-mode cores into tagged vs.
# plain, keyed on the prefix of the section's "name" value.
# NOTE(review): the "cores_tagged = []" / "cores_plain = []"
# initializers and the "continue" statements after each guard are not
# visible in this excerpt.
1184 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1185 if not section_name.startswith("core"):
# Skip cores that are not in generator ("gen") mode.
1188 if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1191 for item_key, item_value in section:
1192 if item_key != 'name':
1195 if item_value.startswith("tag"):
1196 core_tuple = CoreSocketTuple(section_name)
1197 core_tag = core_tuple.find_in_topology(self.cpu_topology)
1198 cores_tagged.append(core_tag)
1200 elif item_value.startswith("udp"):
1201 core_tuple = CoreSocketTuple(section_name)
1202 core_udp = core_tuple.find_in_topology(self.cpu_topology)
1203 cores_plain.append(core_udp)
1205 return cores_tagged, cores_plain
# Configure and start MPLS traffic at the requested speed.
# NOTE(review): presumably @contextmanager with a trailing yield/finally
# that stops traffic -- those lines are not visible in this excerpt.
1208 def traffic_context(self, pkt_size, value):
1210 self.sut.reset_stats()
# Plain traffic is 4 bytes smaller: the MPLS label is absent.
1212 self.sut.set_pkt_size(self.tagged_cores, pkt_size)
1213 self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
1214 self.sut.set_speed(self.tagged_cores, value)
# Scale the plain-core rate so both streams carry the same packet rate
# on the wire; +20 bytes accounts for per-packet Ethernet wire overhead
# (preamble/IFG/CRC -- presumably; confirm against PROX docs).
1215 ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
1216 self.sut.set_speed(self.plain_cores, value * ratio)
1217 self.sut.start_all()
1223 class ProxBngProfileHelper(ProxProfileHelper):
# Profile helper for the BNG (Broadband Network Gateway) generator test:
# traffic is split into CPE-side, inet-side and ARP generator cores, with
# different packet sizes per side and speeds ramped up gradually.
1225 __prox_profile_type__ = "BNG gen"
1227 def __init__(self, resource_helper):
1228 super(ProxBngProfileHelper, self).__init__(resource_helper)
# Cache for the (cpe, inet, arp, arp_task) core lists; filled lazily.
1229 self._cores_tuple = None
# Lazily computed (cpe, inet, arp, arp_task) core lists.
# NOTE(review): presumably a @property (indexed without a call below) --
# decorator not visible in this excerpt.
1232 def bng_cores(self):
1233 if not self._cores_tuple:
1234 self._cores_tuple = self.get_cores_gen_bng_qos()
1235 return self._cores_tuple
# CPE-side generator cores.
# NOTE(review): presumably a @property -- decorator not visible here.
1238 def cpe_cores(self):
1239 return self.bng_cores[0]
# Internet-side generator cores.
# NOTE(review): presumably a @property -- decorator not visible here.
1242 def inet_cores(self):
1243 return self.bng_cores[1]
# ARP generator cores.
# NOTE(review): presumably a @property -- decorator not visible here.
1246 def arp_cores(self):
1247 return self.bng_cores[2]
# Task indexes used when setting ARP speed (third argument to set_speed
# in traffic_context below).
# NOTE(review): presumably a @property -- decorator not visible here.
1250 def arp_task_cores(self):
1251 return self.bng_cores[3]
# Cores started/stopped to flush NIC RX buffers; here the latency cores.
# NOTE(review): presumably a @property -- decorator not visible here.
1254 def all_rx_cores(self):
1255 return self.latency_cores
1257 def get_cores_gen_bng_qos(self):
# Classify generator-mode cores by the prefix of their section "name".
# NOTE(review): the cpe/inet/arp list initializers and the "continue"
# statements after each guard are not visible in this excerpt.
# Task 0 is always included for ARP.
1261 arp_tasks_core = [0]
1262 for section_name, section in self.resource_helper.setup_helper.prox_config_data:
1263 if not section_name.startswith("core"):
# Skip cores that are not in generator ("gen") mode.
1266 if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
1269 for item_key, item_value in section:
1270 if item_key != 'name':
1273 if item_value.startswith("cpe"):
1274 core_tuple = CoreSocketTuple(section_name)
1275 cpe_core = core_tuple.find_in_topology(self.cpu_topology)
1276 cpe_cores.append(cpe_core)
1278 elif item_value.startswith("inet"):
1279 core_tuple = CoreSocketTuple(section_name)
1280 inet_core = core_tuple.find_in_topology(self.cpu_topology)
1281 inet_cores.append(inet_core)
1283 elif item_value.startswith("arp"):
1284 core_tuple = CoreSocketTuple(section_name)
1285 arp_core = core_tuple.find_in_topology(self.cpu_topology)
1286 arp_cores.append(arp_core)
# Deliberate: "arp_task" also matched the "arp" prefix in the elif
# above, so such cores land in both arp_cores and arp_tasks_core.
1288 # We check the tasks/core separately
1289 if item_value.startswith("arp_task"):
1290 core_tuple = CoreSocketTuple(section_name)
1291 arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
1292 arp_tasks_core.append(arp_task_core)
1294 return cpe_cores, inet_cores, arp_cores, arp_tasks_core
# Prime the SUT with ARP traffic, ramp both directions up to their target
# speeds in step_delta increments, then (presumably) yield with traffic
# running and tear down afterwards.
# NOTE(review): presumably @contextmanager; the yield and several guard
# lines are not visible in this excerpt (see notes below).
1297 def traffic_context(self, pkt_size, value):
1298 # Tester is sending packets at the required speed already after
1299 # setup_test(). Just get the current statistics, sleep the required
1300 # amount of time and calculate packet loss.
1301 inet_pkt_size = pkt_size
# CPE-side packets are 24 bytes smaller than inet-side; consistent with
# the header-length comments below (inet carries MPLS(4)+GRE(8)+extra
# IP(20) = 32 B vs. CPE's QinQ(8) -> 24 B difference).
1302 cpe_pkt_size = pkt_size - 24
# +20 bytes = per-packet Ethernet wire overhead (presumably
# preamble/IFG/CRC; confirm against PROX docs).
1303 ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)
1305 curr_up_speed = curr_down_speed = 0
1306 max_up_speed = max_down_speed = value
# NOTE(review): the next two assignments look mutually exclusive --
# presumably selected by an if/else (on pkt_size or direction) stripped
# from this excerpt; confirm against the full file.
1308 max_down_speed = value * ratio
1310 max_up_speed = value / ratio
1316 # Flush any packets in the NIC RX buffers, otherwise the stats will be
# (comment continuation stripped in this view)
1318 self.sut.start(self.all_rx_cores)
# NOTE(review): a sleep between start and stop is presumably stripped.
1320 self.sut.stop(self.all_rx_cores)
1322 self.sut.reset_stats()
1324 self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
1325 self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)
1327 self.sut.reset_values(self.cpe_cores)
1328 self.sut.reset_values(self.inet_cores)
1330 # Set correct IP and UDP lengths in packet headers
1332 # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
1333 self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
1334 # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
1335 self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)
1338 # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
1339 self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
1340 # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
1341 self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
1342 # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
1343 self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)
1345 # Sending ARP to initialize tables - need a few seconds of generation
1346 # to make sure all CPEs are initialized
1347 LOG.info("Initializing SUT: sending ARP packets")
1348 self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
# Data generators start at speed 0; they are ramped up below.
1349 self.sut.set_speed(self.inet_cores, curr_up_speed)
1350 self.sut.set_speed(self.cpe_cores, curr_down_speed)
1351 self.sut.start(self.arp_cores)
# NOTE(review): the wait and sut.stop(arp_cores) between ARP priming and
# the ramp-up are not visible in this excerpt.
1354 # Ramp up the transmission speed. First go to the common speed, then
1355 # increase steps for the faster one.
1356 self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)
1358 LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)
1360 while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
1361 # The min(..., ...) takes care of 1) floating point rounding errors
1362 # that could make curr_*_speed to be slightly greater than
1363 # max_*_speed and 2) max_*_speed not being an exact multiple of
# (comment continuation, presumably "self.step_delta", stripped here)
1365 if curr_up_speed < max_up_speed:
1366 curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
1367 if curr_down_speed < max_down_speed:
1368 curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)
1370 self.sut.set_speed(self.inet_cores, curr_up_speed)
1371 self.sut.set_speed(self.cpe_cores, curr_down_speed)
# Pace the ramp; step_time/step_delta resolve through __getattr__
# delegation to the resource helper.
1372 time.sleep(self.step_time)
1374 LOG.info("Target speeds reached. Starting real test.")
# NOTE(review): the yield (traffic runs while the caller measures) is
# not visible in this excerpt; the lines below are the teardown path.
# Teardown: stop generators, then start/stop the RX cores once more to
# drain NIC buffers so later stats start clean.
1378 self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
1379 LOG.info("Test ended. Flushing NIC buffers")
1380 self.sut.start(self.all_rx_cores)
# NOTE(review): a sleep between start and stop is presumably stripped.
1382 self.sut.stop(self.all_rx_cores)
# Same measurement sequence as the base-class run_test: start traffic via
# traffic_context, sleep for `duration`, capture TSC frequency and
# latency, and return (result_tuple, samples).
1384 def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
1385 data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)
1387 with data_helper, self.traffic_context(pkt_size, value):
1388 with data_helper.measure_tot_stats():
1389 time.sleep(duration)
1390 # Getting statistics to calculate PPS at right speed....
1391 data_helper.capture_tsc_hz()
1392 data_helper.latency = self.get_latency()
1394 return data_helper.result_tuple, data_helper.samples