1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
25 from collections import OrderedDict, namedtuple
27 from contextlib import contextmanager
28 from itertools import repeat, chain
29 from multiprocessing import Queue
32 from six.moves import zip, StringIO
33 from six.moves import cStringIO
35 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
36 from yardstick.common import utils
37 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
38 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
40 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
LOG = logging.getLogger(__name__)
# NOTE(review): forcing DEBUG at import time overrides whatever level the
# application's logging configuration chose for this module — confirm this
# is intentional and not leftover debugging.
LOG.setLevel(logging.DEBUG)
# Mapping from internal dict keys to (section, key, default) entries of the
# DATS-style configuration file.  Each tuple is:
#   (dict key, config section, config key, default value)
CONFIGURATION_OPTIONS = (
    # dict key           section     key               default value
    ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
    ('testDuration', 'general', 'test_duration', 5.0),
    ('testPrecision', 'general', 'test_precision', 1.0),
    ('tests', 'general', 'tests', None),
    ('toleratedLoss', 'general', 'tolerated_loss', 0.0),

    ('logFile', 'logging', 'file', 'dats.log'),
    ('logDateFormat', 'logging', 'datefmt', None),
    ('logLevel', 'logging', 'level', 'INFO'),
    ('logOverwrite', 'logging', 'overwrite', 1),

    ('testerIp', 'tester', 'ip', None),
    ('testerUser', 'tester', 'user', 'root'),
    ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
    ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
    ('testerSocketId', 'tester', 'socket_id', 0),

    ('sutIp', 'sut', 'ip', None),
    ('sutUser', 'sut', 'user', 'root'),
    ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
    ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
    ('sutSocketId', 'sut', 'socket_id', 0),
)
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Parsed PROX core spec "core N[sM][h]".

    core_id: physical core number, socket_id: CPU socket (default 0),
    hyperthread: 'h' if the hyperthread sibling was requested, else ''.
    """

    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        try:
            # Accept either a "core N[sM][h]" string or raw field values.
            matches = cls.CORE_RE.search(str(args[0]))
            if matches:
                args = matches.groups()

            return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
                                                       'h' if args[2] else '')

        except (AttributeError, TypeError, IndexError, ValueError):
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        return self.hyperthread == 'h'

    @property
    def index(self):
        # 0 selects the physical sibling, 1 the hyperthread sibling.
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        """Resolve this core spec to a processor number in the parsed topology.

        :raises ValueError: if the socket/core combination does not exist.
        """
        try:
            socket_core_match = cpu_topology[self.socket_id][self.core_id]
            sorted_match = sorted(socket_core_match.values())
            return sorted_match[self.index][0]
        except (KeyError, IndexError):
            template = "Core {}{} on socket {} does not exist"
            raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    """PROX "tot stats" result: rx/tx packet counts, tsc delta and tsc hz."""

    def __new__(cls, *args):
        try:
            # Allow construction from a single iterable (but not a string,
            # which would be iterated character by character).
            assert args[0] is not str(args[0])
            args = tuple(args[0])
        except (AssertionError, IndexError, TypeError):
            # args were already individual field values; use them as-is.
            pass

        return super(TotStatsTuple, cls).__new__(cls, *args)
class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
                                   'delta_tx,delta_tsc,'
                                   'latency,rx_total,tx_total,pps')):
    """Raw counters of one PROX throughput run plus derived KPI metrics."""

    @property
    def pkt_loss(self):
        # Percentage of transmitted packets that were dropped.
        try:
            return 1e2 * self.drop_total / float(self.tx_total)
        except ZeroDivisionError:
            # Nothing was transmitted; report total loss.
            return 100.0

    @property
    def mpps(self):
        # calculate the effective throughput in Mpps
        return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6

    @property
    def can_be_lost(self):
        # Absolute number of packets that may be dropped while still passing.
        return int(self.tx_total * self.tolerated / 1e2)

    @property
    def drop_total(self):
        return self.tx_total - self.rx_total

    @property
    def success(self):
        return self.drop_total <= self.can_be_lost

    def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
        """Build the KPI sample dict reported for this run.

        :param pkt_size: packet size used for the run
        :param pkt_loss: override for the computed packet-loss percentage
        :param port_samples: optional per-port counters merged into the result
        """
        if pkt_loss is None:
            pkt_loss = self.pkt_loss

        if port_samples is None:
            port_samples = {}

        latency_keys = [
            "LatencyMin",
            "LatencyMax",
            "LatencyAvg",
        ]

        samples = {
            "Throughput": self.mpps,
            "DropPackets": pkt_loss,
            "CurrentDropPackets": pkt_loss,
            "TxThroughput": self.pps / 1e6,
            "RxThroughput": self.mpps,
            "PktSize": pkt_size,
        }
        if port_samples:
            samples.update(port_samples)

        # self.latency is expected to hold (min, max, avg) in that order.
        samples.update((key, value) for key, value in zip(latency_keys, self.latency))
        return samples

    def log_data(self, logger=None):
        """Log the run counters and throughput at debug level."""
        if logger is None:
            logger = LOG

        template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
        logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
        logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
class PacketDump(object):
    """A single packet dump received from PROX (port id, length, payload)."""

    @staticmethod
    def assert_func(func, value1, value2, template=None):
        # Small helper so validation failures carry a formatted message.
        assert func(value1, value2), template.format(value1, value2)

    def __init__(self, port_id, data_len, payload):
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

    @property
    def port_id(self):
        """Get the port id of the packet dump"""
        return self._port_id

    @property
    def data_len(self):
        """Get the length of the data received"""
        return self._data_len

    def __str__(self):
        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload as a list of ordinals.

        Returns a list of byte values, matching the contents of the packet dump.
        Optional start and end parameters can be specified to retrieve only a
        part of the packet contents.

        The number of elements in the list is equal to end - start + 1, so end
        is the offset of the last character.

        Args:
            start (pos. int): the starting offset in the payload. If it is not
                specified or None, offset 0 is assumed.
            end (pos. int): the ending offset of the payload. If it is not
                specified or None, the contents until the end of the packet are
                returned.

        Returns:
            [int, int, ...]. Each int represents the ordinal value of a byte in
            the packet payload.
        """
        if start is None:
            start = 0

        if end is None:
            end = self.data_len - 1

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, start, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, end, self.data_len, template)

        # Adjust for splice operation: end offset must be 1 more than the offset
        # of the last desired character.
        end += 1

        return self._payload[start:end]
class ProxSocketHelper(object):
    """Thin wrapper around the PROX telnet-style control socket."""

    def __init__(self, sock=None):
        """ creates new prox instance """
        super(ProxSocketHelper, self).__init__()

        if sock is None:
            sock = socket.socket()

        self._sock = sock
        self._pkt_dumps = []          # FIFO of PacketDump objects parsed so far
        self.master_stats = None      # last TotStatsTuple seen by get_all_tot_stats

    def connect(self, ip, port):
        """Connect to the prox instance on the remote system"""
        self._sock.connect((ip, port))

    def get_socket(self):
        """ get the socket connected to the remote instance """
        return self._sock

    def _parse_socket_data(self, decoded_data, pkt_dump_only):
        """Split a received chunk into 1-line responses and packet dumps."""
        def get_newline_index():
            return decoded_data.find('\n', index)

        ret_str = ''
        index = 0
        for newline_index in iter(get_newline_index, -1):
            ret_str = decoded_data[index:newline_index]

            try:
                mode, port_id, data_len = ret_str.split(',', 2)
            except ValueError:
                mode, port_id, data_len = None, None, None

            if mode != 'pktdump':
                # Regular 1-line message. Stop reading from the socket.
                LOG.debug("Regular response read")
                return ret_str

            LOG.debug("Packet dump header read: [%s]", ret_str)

            # The line is a packet dump header. Parse it, read the
            # packet payload, store the dump for later retrieval.
            # Skip over the packet dump and continue processing: a
            # 1-line response may follow the packet dump.

            data_len = int(data_len)
            data_start = newline_index + 1  # + 1 to skip over \n
            data_end = data_start + data_len
            sub_data = decoded_data[data_start:data_end]
            pkt_payload = array.array('B', (ord(v) for v in sub_data))
            pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
            self._pkt_dumps.append(pkt_dump)

            if pkt_dump_only:
                # Return boolean instead of string to signal
                # successful reception of the packet dump.
                LOG.debug("Packet dump stored, returning")
                ret_str = True
                break

            # Resume parsing after the payload and its trailing newline.
            index = data_end + 1

        return ret_str

    def get_data(self, pkt_dump_only=False, timeout=1):
        """ read data from the socket """

        # This method behaves slightly differently depending on whether it is
        # called to read the response to a command (pkt_dump_only = 0) or if
        # it is called specifically to read a packet dump (pkt_dump_only = 1).
        #
        # Packet dumps look like:
        #   pktdump,<port_id>,<data_len>\n
        #   <packet contents as byte array>\n
        # This means the total packet dump message consists of 2 lines instead
        # of 1 line.
        #
        # - Response for a command (pkt_dump_only = 0):
        #   1) Read response from the socket until \n (end of message)
        #   2a) If the response is a packet dump header (starts with "pktdump,"):
        #     - Read the packet payload and store the packet dump for later
        #       retrieval.
        #     - Reset the state and restart from 1). Eventually state 2b) will
        #       be reached and the function will return.
        #   2b) If the response is not a packet dump:
        #     - Return the received message as a string
        #
        # - Explicit request to read a packet dump (pkt_dump_only = 1):
        #   - Read the dump header and payload
        #   - Store the packet dump for later retrieval
        #   - Return True to signify a packet dump was successfully read

        def is_ready():
            # recv() is blocking, so avoid calling it when no data is waiting.
            ready = select.select([self._sock], [], [], timeout)
            return bool(ready[0])

        status = False
        ret_str = ""
        for status in iter(is_ready, False):
            decoded_data = self._sock.recv(256).decode('utf-8')
            ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)

        LOG.debug("Received data from socket: [%s]", ret_str)
        return ret_str if status else ''

    def put_command(self, to_send):
        """ send data to the remote instance """
        LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
        try:
            self._sock.sendall(to_send.encode('utf-8'))
        except OSError:
            # Best-effort: a broken control connection is reported elsewhere.
            pass

    def get_packet_dump(self):
        """ get the next packet dump """
        if self._pkt_dumps:
            return self._pkt_dumps.pop(0)
        return None

    def stop_all_reset(self):
        """ stop the remote instance and reset stats """
        LOG.debug("Stop all and reset stats")
        self.stop_all()
        self.reset_stats()

    def stop_all(self):
        """ stop all cores on the remote instance """
        LOG.debug("Stop all")
        self.put_command("stop all\n")
        time.sleep(3)

    def stop(self, cores, task=''):
        """ stop specific cores on the remote instance """
        LOG.debug("Stopping cores %s", cores)
        self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
        time.sleep(3)

    def start_all(self):
        """ start all cores on the remote instance """
        LOG.debug("Start all")
        self.put_command("start all\n")

    def start(self, cores):
        """ start specific cores on the remote instance """
        LOG.debug("Starting cores %s", cores)
        self.put_command("start {}\n".format(join_non_strings(',', cores)))
        time.sleep(3)

    def reset_stats(self):
        """ reset the statistics on the remote instance """
        LOG.debug("Reset stats")
        self.put_command("reset stats\n")
        time.sleep(1)

    def _run_template_over_cores(self, template, cores, *args):
        # Send one formatted command per core.
        for core in cores:
            self.put_command(template.format(core, *args))

    def set_pkt_size(self, cores, pkt_size):
        """ set the packet size to generate on the remote instance """
        LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
        # PROX expects the size without the 4-byte Ethernet CRC.
        pkt_size -= 4
        self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
        time.sleep(1)

    def set_value(self, cores, offset, value, length):
        """ set value on the remote instance """
        msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
        LOG.debug(msg, cores, value, length, offset)
        template = "set value {} 0 {} {} {}\n"
        self._run_template_over_cores(template, cores, offset, value, length)

    def reset_values(self, cores):
        """ reset values on the remote instance """
        LOG.debug("Set value for core(s) %s", cores)
        self._run_template_over_cores("reset values {} 0\n", cores)

    def set_speed(self, cores, speed, tasks=None):
        """ set speed on the remote instance """
        if tasks is None:
            tasks = [0] * len(cores)
        elif len(tasks) != len(cores):
            LOG.error("set_speed: cores and tasks must have the same len")
        LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
        for (core, task) in list(zip(cores, tasks)):
            self.put_command("speed {} {} {}\n".format(core, task, speed))

    def slope_speed(self, cores_speed, duration, n_steps=0):
        """will start to increase speed from 0 to N where N is taken from
        a['speed'] for each a in cores_speed"""
        # by default, each step will take 0.5 sec
        if n_steps == 0:
            n_steps = duration * 2

        private_core_data = []
        step_duration = float(duration) / n_steps
        for core_data in cores_speed:
            target = float(core_data['speed'])
            private_core_data.append({
                'cores': core_data['cores'],
                'zero': 0,
                'delta': target / n_steps,
                'current': 0,
                'speed': target,
            })

        # Ramp in n_steps - 1 increments, then jump to the exact target speed.
        deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
        for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
            time.sleep(step_duration)
            for core_data in private_core_data:
                core_data['current'] = core_data[key1] + core_data[key2]
                self.set_speed(core_data['cores'], core_data['current'])

    def set_pps(self, cores, pps, pkt_size):
        """ set packets per second for specific cores on the remote instance """
        msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
        LOG.debug(msg, cores, pps, pkt_size)

        # speed in percent of line-rate
        speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
        self._run_template_over_cores("speed {} 0 {}\n", cores, speed)

    def lat_stats(self, cores, task=0):
        """Get the latency statistics from the remote system"""
        # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
        lat_min = {}
        lat_max = {}
        lat_avg = {}
        for core in cores:
            self.put_command("lat stats {} {} \n".format(core, task))
            ret = self.get_data()

            try:
                lat_min[core], lat_max[core], lat_avg[core] = \
                    tuple(int(n) for n in ret.split(",")[:3])

            except (AttributeError, ValueError, TypeError):
                # Malformed/empty response: skip this core's entry.
                pass

        return lat_min, lat_max, lat_avg

    def get_all_tot_stats(self):
        """Fetch (rx, tx, tsc, hz) totals; caches them in master_stats."""
        self.put_command("tot stats\n")
        all_stats_str = self.get_data().split(",")
        if len(all_stats_str) != 4:
            all_stats = [0] * 4
            return all_stats
        all_stats = TotStatsTuple(int(v) for v in all_stats_str)
        self.master_stats = all_stats
        return all_stats

    def hz(self):
        return self.get_all_tot_stats()[3]

    # Deprecated: use core_stats
    def rx_stats(self, cores, task=0):
        return self.core_stats(cores, task)

    def core_stats(self, cores, task=0):
        """Get the receive statistics from the remote system"""
        rx = tx = drop = tsc = 0
        for core in cores:
            self.put_command("core stats {} {}\n".format(core, task))
            ret = self.get_data().split(",")
            rx += int(ret[0])
            tx += int(ret[1])
            drop += int(ret[2])
            tsc = int(ret[3])
        return rx, tx, drop, tsc

    def port_stats(self, ports):
        """get counter values from a specific port"""
        tot_result = [0] * 12
        for port in ports:
            self.put_command("port_stats {}\n".format(port))
            ret = [try_int(s, 0) for s in self.get_data().split(",")]
            tot_result = [sum(x) for x in zip(tot_result, ret)]
        return tot_result

    @contextmanager
    def measure_tot_stats(self):
        """Context manager yielding a dict filled with start/end/delta stats."""
        start = self.get_all_tot_stats()
        container = {'start_tot': start}
        try:
            yield container
        finally:
            container['end_tot'] = end = self.get_all_tot_stats()

        container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))

    def tot_stats(self):
        """Get the total statistics from the remote system"""
        stats = self.get_all_tot_stats()
        return stats[:3]

    def tot_ierrors(self):
        """Get the total ierrors from the remote system"""
        self.put_command("tot ierrors tot\n")
        recv = self.get_data().split(',')
        tot_ierrors = int(recv[0])
        # NOTE(review): tsc is read from recv[0], same field as tot_ierrors —
        # confirm whether recv[1] was intended.
        tsc = int(recv[0])
        return tot_ierrors, tsc

    def set_count(self, count, cores):
        """Set the number of packets to send on the specified core"""
        self._run_template_over_cores("count {} 0 {}\n", cores, count)

    def dump_rx(self, core_id, task_id=0, count=1):
        """Activate dump on rx on the specified core"""
        LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
        self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
        time.sleep(1.5)  # Give PROX time to set up packet dumping

    def quit(self):
        """ stop all cores on the remote instance """
        LOG.debug("Quit prox")
        self.put_command("quit\n")
        time.sleep(3)

    def force_quit(self):
        """ stop all cores on the remote instance """
        LOG.debug("Force Quit prox")
        self.put_command("quit_force\n")
        time.sleep(3)
# Sentinel default used by find_section()/find_in_section() to distinguish
# "no default supplied" (raise KeyError) from an explicit default of None.
_LOCAL_OBJECT = object()
class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
    """Build, rewrite and upload PROX config/Lua files to the target host."""

    # the actual app is lowercase
    APP_NAME = 'prox'
    # presumably required by the DPDK base helper for consistency — TODO confirm
    VNF_TYPE = "PROX"

    LUA_PARAMETER_NAME = ""
    LUA_PARAMETER_PEER = {
        "gen": "sut",
        "sut": "gen",
    }

    # seconds to wait for the config to appear on the inter-process queue
    CONFIG_QUEUE_TIMEOUT = 120

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        self.remote_path = None
        super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.remote_prox_file_name = None
        self._prox_config_data = None
        self.additional_files = {}
        self.config_queue = Queue()
        self._global_section = None

    @property
    def prox_config_data(self):
        if self._prox_config_data is None:
            # this will block, but it needs too
            self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
        return self._prox_config_data

    @property
    def global_section(self):
        if self._global_section is None and self.prox_config_data:
            self._global_section = self.find_section("global")
        return self._global_section

    def find_section(self, name, default=_LOCAL_OBJECT):
        """Return the section with the given name, or default if supplied."""
        result = next((value for key, value in self.prox_config_data if key == name), default)
        if result is _LOCAL_OBJECT:
            raise KeyError('{} not found in Prox config'.format(name))
        return result

    def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
        """Return the value of section_key inside section_name."""
        section = self.find_section(section_name, [])
        result = next((value for key, value in section if key == section_key), default)
        if result is _LOCAL_OBJECT:
            template = '{} not found in {} section of Prox config'
            raise KeyError(template.format(section_key, section_name))
        return result

    def _build_pipeline_kwargs(self):
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        self.pipeline_kwargs = {
            'tool_path': tool_path,
            'tool_dir': os.path.dirname(tool_path),
        }

    def copy_to_target(self, config_file_path, prox_file):
        """Upload a local file to /tmp on the target; return the remote path."""
        remote_path = os.path.join("/tmp", prox_file)
        self.ssh_helper.put(config_file_path, remote_path)
        return remote_path

    @staticmethod
    def _get_tx_port(section, sections):
        iface_port = ["-1"]
        for item in sections[section]:
            if item[0] == "tx port":
                iface_port = re.findall(r'\d+', item[1])
                # do we want the last one?
                # if yes, then can we reverse?
        return int(iface_port[0])

    @staticmethod
    def _replace_quoted_with_value(quoted, value, count=1):
        # Replace the first `count` double-quoted substrings with "<value>".
        new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
        return new_string

    def _insert_additional_file(self, value):
        # Swap the quoted file name inside a dofile("...") with its remote path.
        file_str = value.split('"')
        base_name = os.path.basename(file_str[1])
        file_str[1] = self.additional_files[base_name]
        return '"'.join(file_str)

    def generate_prox_config_file(self, config_path):
        """Parse the PROX config and rewrite MACs, dst macs and dofile paths."""
        sections = []
        prox_config = ConfigParser(config_path, sections)
        prox_config.parse()

        # Ensure MAC is set "hardware"
        all_ports = self.vnfd_helper.port_pairs.all_ports
        # use dpdk port number
        for port_name in all_ports:
            port_num = self.vnfd_helper.port_num(port_name)
            port_section_name = "port {}".format(port_num)
            for section_name, section in sections:
                if port_section_name != section_name:
                    continue

                for index, section_data in enumerate(section):
                    if section_data[0] == "mac":
                        section_data[1] = "hardware"

        # adjust for range of needed dst mac substitutions
        for _, section in sections:
            # for index, (item_key, item_val) in enumerate(section):
            for index, section_data in enumerate(section):
                item_key, item_val = section_data
                if item_val.startswith("@@dst_mac"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    # PROX expects space-separated MAC bytes here.
                    section_data[1] = mac.replace(":", " ", 6)

                if item_key == "dst mac" and item_val.startswith("@@"):
                    tx_port_iter = re.finditer(r'\d+', item_val)
                    tx_port_no = int(next(tx_port_iter).group(0))
                    intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                    mac = intf["virtual-interface"]["dst_mac"]
                    section_data[1] = mac

        # if addition file specified in prox config
        if not self.additional_files:
            return sections

        for section_name, section in sections:
            for index, section_data in enumerate(section):
                try:
                    if section_data[0].startswith("dofile"):
                        section_data[0] = self._insert_additional_file(section_data[0])

                    if section_data[1].startswith("dofile"):
                        section_data[1] = self._insert_additional_file(section_data[1])
                except (AttributeError, KeyError, IndexError):
                    # entry is not a string pair or file is not registered
                    pass

        return sections

    @staticmethod
    def write_prox_config(prox_config):
        """
        Write an .ini-format config file for PROX
        PROX does not allow a space before/after the =, so we need
        a custom method
        """
        out = []
        for i, (section_name, section) in enumerate(prox_config):
            out.append("[{}]".format(section_name))
            for index, item in enumerate(section):
                key, value = item
                if key == "__name__":
                    continue
                if value is not None and value != '@':
                    key = "=".join((key, str(value).replace('\n', '\n\t')))
                    out.append(key)
                else:
                    key = str(key).replace('\n', '\n\t')
                    out.append(key)
        return os.linesep.join(out)

    def put_string_to_file(self, s, remote_path):
        """Upload the string `s` as a file at remote_path; return that path."""
        file_obj = cStringIO(s)
        self.ssh_helper.put_file_obj(file_obj, remote_path)
        return remote_path

    def generate_prox_lua_file(self):
        """Render the per-port IP parameters consumed by the PROX Lua config."""
        p = OrderedDict()
        all_ports = self.vnfd_helper.port_pairs.all_ports
        lua_param = self.LUA_PARAMETER_NAME
        for port_name in all_ports:
            peer = self.LUA_PARAMETER_PEER[lua_param]
            port_num = self.vnfd_helper.port_num(port_name)
            intf = self.vnfd_helper.find_interface(name=port_name)
            vintf = intf['virtual-interface']
            local_ip = vintf["local_ip"]
            dst_ip = vintf["dst_ip"]
            local_ip_hex = ip_to_hex(local_ip, separator=' ')
            dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
            p.update([
                ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
                ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
                ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
                ("{}_ip_port_{}".format(peer, port_num), dst_ip),
            ])

        lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
        return lua

    def upload_prox_lua(self, config_dir, prox_config_data):
        # we could have multiple lua directives
        lau_dict = prox_config_data.get('lua', {})
        find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
        lua_file = next((found[0] for found in find_iter if found), None)
        if not lua_file:
            return ""

        out = self.generate_prox_lua_file()
        remote_path = os.path.join(config_dir, lua_file)
        return self.put_string_to_file(out, remote_path)

    def upload_prox_config(self, config_file, prox_config_data):
        # prox can't handle spaces around ' = ' so use custom method
        out = StringIO(self.write_prox_config(prox_config_data))
        out.seek(0)
        remote_path = os.path.join("/tmp", config_file)
        self.ssh_helper.put_file_obj(out, remote_path)

        return remote_path

    def build_config_file(self):
        """Generate, queue and upload the PROX config plus auxiliary files."""
        task_path = self.scenario_helper.task_path
        options = self.scenario_helper.options
        config_path = options['prox_config']
        config_file = os.path.basename(config_path)
        config_path = find_relative_file(config_path, task_path)
        self.additional_files = {}

        prox_files = options.get('prox_files', [])
        if isinstance(prox_files, six.string_types):
            prox_files = [prox_files]
        for key_prox_file in prox_files:
            base_prox_file = os.path.basename(key_prox_file)
            key_prox_path = find_relative_file(key_prox_file, task_path)
            remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
            self.additional_files[base_prox_file] = remote_prox_file

        self._prox_config_data = self.generate_prox_config_file(config_path)
        # copy config to queue so we can read it from traffic_runner process
        self.config_queue.put(self._prox_config_data)
        self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)

    def build_config(self):
        """Return the shell command line used to launch PROX on the target."""
        self.build_config_file()

        options = self.scenario_helper.options

        prox_args = options['prox_args']
        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        self.pipeline_kwargs["args"] = " ".join(
            " ".join([k, v if v else ""]) for k, v in prox_args.items())
        self.pipeline_kwargs["cfg_file"] = self.remote_path

        cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
        prox_cmd = cmd_template.format(**self.pipeline_kwargs)
        return prox_cmd
# this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
class ProxResourceHelper(ClientResourceHelper):
    """Drive a remote PROX instance over its control socket."""

    RESOURCE_WORD = 'prox'

    # seconds to pause between lifecycle operations — TODO confirm original value
    WAIT_TIME = 3

    @staticmethod
    def find_pci(pci, bound_pci):
        # we have to substring match PCI bus address from the end
        return any(b.endswith(pci) for b in bound_pci)

    def __init__(self, setup_helper):
        super(ProxResourceHelper, self).__init__(setup_helper)
        self.mgmt_interface = self.vnfd_helper.mgmt_interface
        self._user = self.mgmt_interface["user"]
        self._ip = self.mgmt_interface["ip"]

        self.done = False
        self._vpci_to_if_name_map = None
        self.additional_file = {}
        self.remote_prox_file_name = None
        self.lower = None
        self.upper = None
        self.step_delta = 1
        self.step_time = 0.5
        self._test_type = None

    @property
    def sut(self):
        # Lazily establish the PROX control-socket connection.
        if not self.client:
            self.client = self._connect()
        return self.client

    @property
    def test_type(self):
        if self._test_type is None:
            self._test_type = self.setup_helper.find_in_section('global', 'name', None)
        return self._test_type

    def run_traffic(self, traffic_profile):
        self._queue.cancel_join_thread()
        self.lower = 0.0
        self.upper = 100.0

        traffic_profile.init(self._queue)
        # this frees up the run_traffic loop
        self.client_started.value = 1

        while not self._terminated.value:
            # move it all to traffic_profile
            self._run_traffic_once(traffic_profile)

    def _run_traffic_once(self, traffic_profile):
        traffic_profile.execute_traffic(self)
        if traffic_profile.done:
            self._queue.put({'done': True})
            LOG.debug("tg_prox done")
            self._terminated.value = 1

    # For VNF use ResourceHelper method to collect KPIs directly.
    # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
    def collect_collectd_kpi(self):
        return self._collect_resource_kpi()

    def collect_kpi(self):
        result = super(ProxResourceHelper, self).collect_kpi()
        # add in collectd kpis manually
        if result:
            result['collect_stats'] = self._collect_resource_kpi()
        return result

    def terminate(self):
        # should not be called, use VNF terminate
        raise NotImplementedError()

    def up_post(self):
        return self.sut  # force connection

    def execute(self, cmd, *args, **kwargs):
        """Dispatch cmd by name to the connected ProxSocketHelper, if present."""
        func = getattr(self.sut, cmd, None)
        if func:
            return func(*args, **kwargs)

    def _connect(self, client=None):
        """Run and connect to prox on the remote system """
        # De-allocating a large amount of hugepages takes some time. If a new
        # PROX instance is started immediately after killing the previous one,
        # it might not be able to allocate hugepages, because they are still
        # being freed. Hence the -w switch.
        if client is None:
            client = ProxSocketHelper()

        # try connecting to Prox for 60s
        for _ in range(RETRY_SECONDS):
            time.sleep(RETRY_INTERVAL)
            try:
                client.connect(self._ip, PROX_PORT)
            except (socket.gaierror, socket.error):
                continue
            else:
                return client

        msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
        raise Exception(msg.format(self._ip, PROX_PORT))
class ProxDataHelper(object):
    """Collect port/total statistics for one test run and build its result tuple."""

    def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
        super(ProxDataHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.sut = sut
        self.pkt_size = pkt_size
        self.value = value                      # requested load, percent of line rate
        self.tolerated_loss = tolerated_loss
        self.port_count = len(self.vnfd_helper.port_pairs.all_ports)
        self.tsc_hz = None
        self.measured_stats = None
        self.latency = None
        self._totals_and_pps = None
        self.result_tuple = None

    @property
    def totals_and_pps(self):
        if self._totals_and_pps is None:
            # port_stats fields 6 and 7 are the rx/tx packet totals.
            rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
            pps = self.value / 100.0 * self.line_rate_to_pps()
            self._totals_and_pps = rx_total, tx_total, pps
        return self._totals_and_pps

    @property
    def rx_total(self):
        return self.totals_and_pps[0]

    @property
    def tx_total(self):
        return self.totals_and_pps[1]

    @property
    def pps(self):
        return self.totals_and_pps[2]

    @property
    def samples(self):
        """Per-port in/out packet counters."""
        samples = {}
        for port_name, port_num in self.vnfd_helper.ports_iter():
            port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
            samples[port_name] = {
                "in_packets": port_rx_total,
                "out_packets": port_tx_total,
            }
        return samples

    def __enter__(self):
        self.check_interface_count()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.make_tuple()

    def make_tuple(self):
        if self.result_tuple:
            return

        self.result_tuple = ProxTestDataTuple(
            self.tolerated_loss,
            self.tsc_hz,
            self.measured_stats['delta'].rx,
            self.measured_stats['delta'].tx,
            self.measured_stats['delta'].tsc,
            self.latency,
            self.rx_total,
            self.tx_total,
            self.pps,
        )
        self.result_tuple.log_data()

    @contextmanager
    def measure_tot_stats(self):
        with self.sut.measure_tot_stats() as self.measured_stats:
            yield

    def check_interface_count(self):
        # do this assert in init? unless we expect interface count to
        # change from one run to another run...
        assert self.port_count in {1, 2, 4}, \
            "Invalid number of ports: 1, 2 or 4 ports only supported at this time"

    def capture_tsc_hz(self):
        self.tsc_hz = float(self.sut.hz())

    def line_rate_to_pps(self):
        # FIXME Don't hardcode 10Gb/s
        return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1048 class ProxProfileHelper(object):
1050 __prox_profile_type__ = "Generic"
1052 PROX_CORE_GEN_MODE = "gen"
1053 PROX_CORE_LAT_MODE = "lat"
1056 def get_cls(cls, helper_type):
1057 """Return class of specified type."""
1059 return ProxProfileHelper
1061 for profile_helper_class in utils.itersubclasses(cls):
1062 if helper_type == profile_helper_class.__prox_profile_type__:
1063 return profile_helper_class
1065 return ProxProfileHelper
1068 def make_profile_helper(cls, resource_helper):
1069 return cls.get_cls(resource_helper.test_type)(resource_helper)
1071 def __init__(self, resource_helper):
1072 super(ProxProfileHelper, self).__init__()
1073 self.resource_helper = resource_helper
1074 self._cpu_topology = None
1075 self._test_cores = None
1076 self._latency_cores = None
1079 def cpu_topology(self):
1080 if not self._cpu_topology:
1081 stdout = io.BytesIO()
1082 self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
1083 self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
1084 return self._cpu_topology
1087 def test_cores(self):
1088 if not self._test_cores:
1089 self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1090 return self._test_cores
1093 def latency_cores(self):
1094 if not self._latency_cores:
1095 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1096 return self._latency_cores
1099 def traffic_context(self, pkt_size, value):
1101 self.sut.reset_stats()
1103 self.sut.set_pkt_size(self.test_cores, pkt_size)
1104 self.sut.set_speed(self.test_cores, value)
1105 self.sut.start_all()
1110 def get_cores(self, mode):
1113 for section_name, section in self.setup_helper.prox_config_data:
1114 if not section_name.startswith("core"):
1117 for key, value in section:
1118 if key == "mode" and value == mode:
1119 core_tuple = CoreSocketTuple(section_name)
1120 core = core_tuple.find_in_topology(self.cpu_topology)
def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
    """Generate traffic for *duration* seconds and collect the results.

    :param pkt_size: frame size in bytes
    :param duration: measurement window in seconds
    :param value: requested transmit speed
    :param tolerated_loss: acceptable packet loss percentage
    :return: (result_tuple, samples) from the data helper
    """
    helper = ProxDataHelper(
        self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

    with helper, self.traffic_context(pkt_size, value):
        with helper.measure_tot_stats():
            time.sleep(duration)
        # Getting statistics to calculate PPS at right speed....
        helper.capture_tsc_hz()
        helper.latency = self.get_latency()

    return helper.result_tuple, helper.samples
def get_latency(self):
    """Get stats from the latency cores.

    :return: return lat_min, lat_max, lat_avg
    :rtype: list
    """
    # Bug fix: the ":return:" line was a bare statement outside any
    # docstring (a syntax error), and the no-latency-cores path fell off
    # the end returning None; return an empty list so callers always get
    # a list-shaped result.
    if self._latency_cores:
        return self.sut.lat_stats(self._latency_cores)
    return []
def terminate(self):
    """Tear down the profile helper.

    NOTE(review): the body was elided in this hunk; the base profile
    helper has no resources of its own to release, so this is restored
    as an explicit no-op — confirm against upstream.
    """
    pass
def __getattr__(self, item):
    """Delegate unknown attribute lookups to the wrapped resource helper."""
    delegate = self.resource_helper
    return getattr(delegate, item)
class ProxMplsProfileHelper(ProxProfileHelper):
    """Profile helper for the PROX "MPLS tag/untag" test."""

    __prox_profile_type__ = "MPLS tag/untag"

    def __init__(self, resource_helper):
        super(ProxMplsProfileHelper, self).__init__(resource_helper)
        # Cached (tagged_cores, plain_cores) pair; filled lazily.
        self._cores_tuple = None

    @property
    def mpls_cores(self):
        """(tagged, plain) generator core lists, computed once and cached."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_mpls()
        return self._cores_tuple

    @property
    def tagged_cores(self):
        """Cores generating MPLS-tagged traffic."""
        return self.mpls_cores[0]

    @property
    def plain_cores(self):
        """Cores generating plain (untagged) UDP traffic."""
        return self.mpls_cores[1]

    def get_cores_mpls(self):
        """Scan the PROX config for gen-mode cores named tag*/udp*.

        :return: (tagged core ids, plain core ids) from the CPU topology
        """
        # Bug fix: accumulators and the `continue` bodies of the two
        # section filters were missing; restored here.
        cores_tagged = []
        cores_plain = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Only generator-mode cores are of interest.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("tag"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.find_in_topology(self.cpu_topology)
                    cores_tagged.append(core_tag)

                elif item_value.startswith("udp"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_udp = core_tuple.find_in_topology(self.cpu_topology)
                    cores_plain.append(core_udp)

        return cores_tagged, cores_plain

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Run tagged and plain cores so both produce the same bit rate.

        The MPLS tag adds 4 bytes, so the plain cores send 4-byte-smaller
        frames at a proportionally lower speed.
        """
        self.sut.stop_all()
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.tagged_cores, pkt_size)
        self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
        self.sut.set_speed(self.tagged_cores, value)
        # 20 = per-frame overhead on the wire (preamble + IFG etc.).
        ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
        self.sut.set_speed(self.plain_cores, value * ratio)
        self.sut.start_all()

        try:
            yield
        finally:
            self.sut.stop_all()
class ProxBngProfileHelper(ProxProfileHelper):
    """Profile helper for the PROX "BNG gen" (BNG with QoS) test."""

    __prox_profile_type__ = "BNG gen"

    def __init__(self, resource_helper):
        super(ProxBngProfileHelper, self).__init__(resource_helper)
        # Cached (cpe, inet, arp, arp_task) core tuple; filled lazily.
        self._cores_tuple = None

    @property
    def bng_cores(self):
        """(cpe, inet, arp, arp_task) core lists, computed once and cached."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_bng_qos()
        return self._cores_tuple

    @property
    def cpe_cores(self):
        """Cores generating downstream (CPE-side) traffic."""
        return self.bng_cores[0]

    @property
    def inet_cores(self):
        """Cores generating upstream (internet-side) traffic."""
        return self.bng_cores[1]

    @property
    def arp_cores(self):
        """Cores generating ARP traffic."""
        return self.bng_cores[2]

    @property
    def arp_task_cores(self):
        """Task ids used when setting speed on the ARP cores."""
        return self.bng_cores[3]

    @property
    def all_rx_cores(self):
        """Cores used to drain NIC RX buffers (the latency cores)."""
        return self.latency_cores

    def get_cores_gen_bng_qos(self):
        """Scan the PROX config for gen-mode cores named cpe*/inet*/arp*.

        :return: (cpe_cores, inet_cores, arp_cores, arp_tasks_core)
        """
        # Bug fix: the three list initializers and the `continue` bodies
        # of the two section filters were missing; restored here.
        cpe_cores = []
        inet_cores = []
        arp_cores = []
        arp_tasks_core = [0]
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Only generator-mode cores are of interest.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key == "name" and item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.find_in_topology(self.cpu_topology)
                    cpe_cores.append(core_tag)

                elif item_key == "name" and item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.find_in_topology(self.cpu_topology)
                    inet_cores.append(inet_core)

                elif item_key == "name" and item_value.startswith("arp"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_core = core_tuple.find_in_topology(self.cpu_topology)
                    arp_cores.append(arp_core)

                # We check the tasks/core separately
                if item_key == "name" and item_value.startswith("arp_task"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
                    arp_tasks_core.append(arp_task_core)

        return cpe_cores, inet_cores, arp_cores, arp_tasks_core

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Ramp CPE and INET cores up to *value* and yield for measuring.

        NOTE(review): the ratio branch, the settling sleeps and the yield
        were elided in this hunk and are restored to the standard
        stop/flush/program/ramp-up/yield/stop sequence — confirm against
        upstream.
        """
        # Tester is sending packets at the required speed already after
        # setup_test(). Just get the current statistics, sleep the required
        # amount of time and calculate packet loss.
        inet_pkt_size = pkt_size
        cpe_pkt_size = pkt_size - 24
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        # Bug fix: without the branch, both caps were applied
        # unconditionally and the second assignment clobbered the first.
        if ratio < 1:
            max_down_speed = value * ratio
        else:
            max_up_speed = value / ratio

        # Initialize cores
        self.sut.stop_all()
        time.sleep(0.5)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.stop(self.all_rx_cores)
        time.sleep(0.5)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE
        # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET
        # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
        self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
        # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)

        # Sending ARP to initialize tables - need a few seconds of generation
        # to make sure all CPEs are initialized
        LOG.info("Initializing SUT: sending ARP packets")
        self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)
        self.sut.start(self.arp_cores)
        time.sleep(4)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # self.step_delta.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        yield

        self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        time.sleep(3)
        self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Generate traffic for *duration* seconds and collect the results.

        :return: (result_tuple, samples) from the data helper
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
            # Getting statistics to calculate PPS at right speed....
            data_helper.capture_tsc_hz()
            data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples