1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
25 from collections import OrderedDict, namedtuple
27 from contextlib import contextmanager
28 from itertools import repeat, chain
29 from multiprocessing import Queue
32 from six.moves import zip, StringIO
33 from six.moves import cStringIO
35 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
36 from yardstick.common import utils
37 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
38 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
40 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# NOTE(review): this forces the DEBUG level on the module logger at import
# time, independently of the application's logging configuration - confirm
# that this is intended.
LOG.setLevel(logging.DEBUG)
55 CONFIGURATION_OPTIONS = (
56 # dict key section key default value
57 ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
58 ('testDuration', 'general', 'test_duration', 5.0),
59 ('testPrecision', 'general', 'test_precision', 1.0),
60 ('tests', 'general', 'tests', None),
61 ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
63 ('logFile', 'logging', 'file', 'dats.log'),
64 ('logDateFormat', 'logging', 'datefmt', None),
65 ('logLevel', 'logging', 'level', 'INFO'),
66 ('logOverwrite', 'logging', 'overwrite', 1),
68 ('testerIp', 'tester', 'ip', None),
69 ('testerUser', 'tester', 'user', 'root'),
70 ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
71 ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
72 ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
73 ('testerSocketId', 'tester', 'socket_id', 0),
75 ('sutIp', 'sut', 'ip', None),
76 ('sutUser', 'sut', 'user', 'root'),
77 ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
78 ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
79 ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
80 ('sutSocketId', 'sut', 'socket_id', 0),
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Core specification such as ``core 1s0h`` split into its parts.

    Fields:
        core_id: integer core number.
        socket_id: result of ``try_int`` on the optional ``s<N>`` part
            (presumably 0 when it is absent - confirm against ``try_int``).
        hyperthread: ``'h'`` when the hyperthread flag is set, else ``''``.
    """

    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        """Build the tuple from a spec string or from three positional values.

        Raises:
            ValueError: if the arguments cannot be converted to a core spec.
        """
        try:
            found = cls.CORE_RE.search(str(args[0]))
            if found:
                # a matching spec string replaces the raw arguments
                args = found.groups()

            core_id = int(args[0])
            socket_id = try_int(args[1], 0)
            thread_flag = 'h' if args[2] else ''
            return super(CoreSocketTuple, cls).__new__(cls, core_id, socket_id, thread_flag)

        except (AttributeError, TypeError, IndexError, ValueError):
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        """Return True when the hyperthread flag is ``'h'``."""
        return self.hyperthread == 'h'

    @property
    def index(self):
        """0 for the first thread of the core, 1 for the hyperthread."""
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        """Look this core up in ``cpu_topology[socket_id][core_id]``.

        The values of the matching entry are sorted and the one at position
        ``index`` is selected; its first element is returned.

        Raises:
            ValueError: if the socket, core or thread is not in the topology.
        """
        try:
            threads = cpu_topology[self.socket_id][self.core_id]
            ordered = sorted(threads.values())
            return ordered[self.index][0]
        except (KeyError, IndexError):
            message = "Core {}{} on socket {} does not exist"
            raise ValueError(message.format(self.core_id, self.hyperthread, self.socket_id))
class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    """Totals with the fields ``rx``, ``tx``, ``tsc`` and ``hz``.

    The constructor accepts either the four values as positional arguments
    or a single non-string iterable (list, tuple, generator, ...) that
    yields them.
    """

    def __new__(cls, *args):
        # A single non-string iterable is expanded into the field values.
        # Strings are never expanded character by character.  The check is an
        # explicit isinstance test (not an assert on object identity) so it
        # keeps working when assertions are disabled with ``python -O``.
        if args and not isinstance(args[0], str):
            try:
                args = tuple(args[0])
            except TypeError:
                # the first argument is a plain value: keep the arguments as given
                pass

        return super(TotStatsTuple, cls).__new__(cls, *args)
127 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
128 'delta_tx,delta_tsc,'
129 'latency,rx_total,tx_total,pps')):
133 return 1e2 * self.drop_total / float(self.tx_total)
134 except ZeroDivisionError:
139 # calculate the effective throughput in Mpps
140 return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
143 def can_be_lost(self):
144 return int(self.tx_total * self.tolerated / 1e2)
147 def drop_total(self):
148 return self.tx_total - self.rx_total
152 return self.drop_total <= self.can_be_lost
154 def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
156 pkt_loss = self.pkt_loss
158 if port_samples is None:
168 "Throughput": self.mpps,
169 "DropPackets": pkt_loss,
170 "CurrentDropPackets": pkt_loss,
171 "TxThroughput": self.pps / 1e6,
172 "RxThroughput": self.mpps,
176 samples.update(port_samples)
178 samples.update((key, value) for key, value in zip(latency_keys, self.latency))
181 def log_data(self, logger=None):
185 template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
186 logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
187 logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
class PacketDump(object):
    """A packet dump: the port it came from, its length and its payload."""

    @staticmethod
    def assert_func(func, value1, value2, template=None):
        """Raise AssertionError unless ``func(value1, value2)`` is true.

        Args:
            func: two-argument predicate, e.g. ``operator.eq``.
            value1: first operand.
            value2: second operand.
            template: optional ``str.format`` template for the error message;
                it receives ``value1`` and ``value2`` as positional arguments.

        Raises:
            AssertionError: if the predicate is false.
        """
        if func(value1, value2):
            return
        # Raised explicitly instead of through ``assert`` so the validation is
        # not stripped under ``python -O``; a missing template no longer hides
        # the failure behind an AttributeError.
        if template is None:
            template = "Check failed for values {} and {}"
        raise AssertionError(template.format(value1, value2))

    def __init__(self, port_id, data_len, payload):
        """Store the dump after checking ``data_len`` against ``len(payload)``.

        Raises:
            AssertionError: if ``data_len`` differs from ``len(payload)``.
        """
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

    @property
    def port_id(self):
        """Get the port id of the packet dump"""
        return self._port_id

    @property
    def data_len(self):
        """Get the length of the data received"""
        return self._data_len

    def __str__(self):
        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload.

        Returns a slice of the stored payload.  Optional start and end
        parameters can be specified to retrieve only a part of the packet
        contents.

        The number of elements returned is equal to end - start + 1, so end
        is the offset of the last element.

        Args:
            start (pos. int): the starting offset in the payload. If it is not
                specified or None, offset 0 is assumed.
            end (pos. int): the ending offset of the payload. If it is not
                specified or None, the contents until the end of the packet are
                returned.

        Returns:
            The elements of the payload from ``start`` to ``end`` inclusive.

        Raises:
            AssertionError: if ``start`` is negative or ``end`` is not less
                than ``data_len``.
        """
        if start is None:
            start = 0

        if end is None:
            end = self.data_len - 1

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, start, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, end, self.data_len, template)

        # Adjust for slice operation: the slice end must be 1 more than the
        # offset of the last desired element.
        end += 1

        return self._payload[start:end]
256 class ProxSocketHelper(object):
def __init__(self, sock=None):
    """Create a helper around ``sock``; a new socket is created when it is None."""
    super(ProxSocketHelper, self).__init__()

    self._sock = socket.socket() if sock is None else sock
    # packet dumps parsed from the socket, kept until they are retrieved
    self._pkt_dumps = []
    self.master_stats = None
def connect(self, ip, port):
    """Connect the helper's socket to ``(ip, port)`` on the remote system."""
    address = (ip, port)
    self._sock.connect(address)
def get_socket(self):
    """Return the socket used to talk to the remote instance."""
    sock = self._sock
    return sock
def _parse_socket_data(self, decoded_data, pkt_dump_only):
    """Scan received text line by line, storing packet dumps along the way.

    A line of the form ``pktdump,<port_id>,<data_len>`` is a packet dump
    header: the ``data_len`` characters following it are stored as a
    ``PacketDump`` and scanning continues after the dump, because a
    one-line response may follow it.  Any other line is a regular response
    and ends the scan.

    Args:
        decoded_data: text received from the socket.
        pkt_dump_only: when true, return as soon as one dump is stored.

    Returns:
        The regular response line, or the last line examined when no
        regular response was found (``''`` if there was no complete line);
        ``True`` when ``pkt_dump_only`` is set and a dump was stored.
    """
    line = ''
    cursor = 0
    newline_at = decoded_data.find('\n', cursor)
    while newline_at != -1:
        line = decoded_data[cursor:newline_at]

        fields = line.split(',', 2)
        is_dump_header = len(fields) == 3 and fields[0] == 'pktdump'
        if not is_dump_header:
            # Regular 1-line message. Stop reading from the socket.
            LOG.debug("Regular response read")
            return line

        LOG.debug("Packet dump header read: [%s]", line)

        # Header found: read the payload that follows it and keep the dump
        # for later retrieval.
        _, port_id, data_len = fields
        data_len = int(data_len)
        data_start = newline_at + 1  # + 1 to skip over \n
        data_end = data_start + data_len
        payload_text = decoded_data[data_start:data_end]
        payload = array.array('B', [ord(char) for char in payload_text])
        self._pkt_dumps.append(PacketDump(int(port_id), data_len, payload))

        if pkt_dump_only:
            # Return boolean instead of string to signal
            # successful reception of the packet dump.
            LOG.debug("Packet dump stored, returning")
            return True

        # continue after the newline that terminates the payload
        cursor = data_end + 1
        newline_at = decoded_data.find('\n', cursor)

    return line
def get_data(self, pkt_dump_only=False, timeout=1):
    """Read data from the socket until it stays idle for ``timeout`` seconds.

    Packet dumps look like::

        pktdump,<port_id>,<data_len>\\n
        <packet contents as byte array>\\n

    so a dump message spans two lines instead of one.

    - Reading the response to a command (``pkt_dump_only`` false): packet
      dumps found in the data are stored for later retrieval and the
      regular one-line response is returned as a string.
    - Explicitly reading a packet dump (``pkt_dump_only`` true): the dump
      header and payload are read and stored, and True is returned.

    Args:
        pkt_dump_only: see above.
        timeout: seconds ``select`` waits for the socket to become readable.

    Returns:
        The value produced by ``_parse_socket_data`` for the last chunk
        read, or ``''`` when nothing could be read.
    """

    def is_ready():
        # recv() is blocking, so avoid calling it when no data is waiting.
        ready = select.select([self._sock], [], [], timeout)
        return bool(ready[0])

    status = False
    ret_str = ""
    for status in iter(is_ready, False):
        received = self._sock.recv(256)
        if not received:
            # The peer closed the connection: select() keeps reporting the
            # socket as readable and recv() keeps returning b'', so without
            # this check the loop would never terminate.
            break
        ret_str = self._parse_socket_data(received.decode('utf-8'), pkt_dump_only)

    LOG.debug("Received data from socket: [%s]", ret_str)
    return ret_str if status else ''
363 def put_command(self, to_send):
364 """ send data to the remote instance """
365 LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
367 self._sock.sendall(to_send.encode('utf-8'))
371 def get_packet_dump(self):
372 """ get the next packet dump """
374 return self._pkt_dumps.pop(0)
377 def stop_all_reset(self):
378 """ stop the remote instance and reset stats """
379 LOG.debug("Stop all and reset stats")
384 """ stop all cores on the remote instance """
385 LOG.debug("Stop all")
386 self.put_command("stop all\n")
389 def stop(self, cores, task=''):
390 """ stop specific cores on the remote instance """
391 LOG.debug("Stopping cores %s", cores)
392 self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
def start_all(self):
    """Start all cores on the remote instance."""
    LOG.debug("Start all")
    command = "start all\n"
    self.put_command(command)
400 def start(self, cores):
401 """ start specific cores on the remote instance """
402 LOG.debug("Starting cores %s", cores)
403 self.put_command("start {}\n".format(join_non_strings(',', cores)))
406 def reset_stats(self):
407 """ reset the statistics on the remote instance """
408 LOG.debug("Reset stats")
409 self.put_command("reset stats\n")
412 def _run_template_over_cores(self, template, cores, *args):
414 self.put_command(template.format(core, *args))
416 def set_pkt_size(self, cores, pkt_size):
417 """ set the packet size to generate on the remote instance """
418 LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
420 self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
def set_value(self, cores, offset, value, length):
    """Send a ``set value`` command for every core in ``cores``.

    Args:
        cores: cores the command is sent for.
        offset: offset placed in the command.
        value: value placed in the command.
        length: length placed in the command.
    """
    LOG.debug("Set value for core(s) %s to '%s' (length %d), offset %d",
              cores, value, length, offset)
    self._run_template_over_cores("set value {} 0 {} {} {}\n", cores, offset, value, length)
def reset_values(self, cores):
    """Send a ``reset values`` command for every core in ``cores``."""
    # The message used to read "Set value ...", copied from set_value, which
    # made the two operations indistinguishable in the debug log.
    LOG.debug("Reset values for core(s) %s", cores)
    self._run_template_over_cores("reset values {} 0\n", cores)
def set_speed(self, cores, speed, tasks=None):
    """Send a ``speed`` command for each (core, task) pair.

    Args:
        cores: cores to configure.
        speed: speed value placed in each command.
        tasks: task per core; task 0 is used for every core when None.
            A length mismatch is only logged, the pairs are still built
            with ``zip`` and therefore stop at the shorter sequence.
    """
    if tasks is None:
        tasks = [0] * len(cores)
    elif len(tasks) != len(cores):
        LOG.error("set_speed: cores and tasks must have the same len")

    pairs = list(zip(cores, tasks))
    LOG.debug("Set speed for core(s)/tasks(s) %s to %g", pairs, speed)
    for core, task in pairs:
        command = "speed {} {} {}\n".format(core, task, speed)
        self.put_command(command)
def slope_speed(self, cores_speed, duration, n_steps=0):
    """Ramp the speed of each core group from 0 up to its target speed.

    Args:
        cores_speed: iterable of dicts with the keys ``'cores'`` and
            ``'speed'`` (the target speed of those cores).
        duration: total ramp time in seconds.
        n_steps: number of steps; 0 selects two steps per second of
            ``duration`` (each step takes 0.5 sec).
    """
    # by default, each step will take 0.5 sec
    if n_steps == 0:
        n_steps = duration * 2

    # repeat() requires an integer count and the step duration a non-zero
    # divisor: a float or zero duration used to raise TypeError or
    # ZeroDivisionError here.
    n_steps = max(int(n_steps), 1)

    step_duration = float(duration) / n_steps
    private_core_data = []
    for core_data in cores_speed:
        target = float(core_data['speed'])
        private_core_data.append({
            'cores': core_data['cores'],
            'zero': 0,
            'delta': target / n_steps,
            'current': 0,
            'speed': target,
        })

    # Every step but the last adds 'delta' to 'current'; the last step sets
    # exactly the target ('zero' + 'speed') to avoid accumulated rounding.
    deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
    for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
        time.sleep(step_duration)
        for core_data in private_core_data:
            core_data['current'] = core_data[key1] + core_data[key2]
            self.set_speed(core_data['cores'], core_data['current'])
def set_pps(self, cores, pps, pkt_size):
    """Send a ``speed`` command derived from ``pps`` and ``pkt_size``.

    Args:
        cores: cores the command is sent for.
        pps: packets per second.
        pkt_size: packet size; 20 is added to it before the conversion.
    """
    LOG.debug("Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)",
              cores, pps, pkt_size)

    # The original comment describes the result as "speed in percent of
    # line-rate".
    # NOTE(review): this divides by BITS_PER_BYTE, whereas line_rate_to_pps
    # divides TEN_GIGABIT by BITS_PER_BYTE - confirm the intended unit.
    wire_size = pkt_size + 20
    speed = float(pps) * wire_size / TEN_GIGABIT / BITS_PER_BYTE
    self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
480 def lat_stats(self, cores, task=0):
481 """Get the latency statistics from the remote system"""
482 # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
487 self.put_command("lat stats {} {} \n".format(core, task))
488 ret = self.get_data()
491 lat_min[core], lat_max[core], lat_avg[core] = \
492 tuple(int(n) for n in ret.split(",")[:3])
494 except (AttributeError, ValueError, TypeError):
497 return lat_min, lat_max, lat_avg
499 def get_all_tot_stats(self):
500 self.put_command("tot stats\n")
501 all_stats_str = self.get_data().split(",")
502 if len(all_stats_str) != 4:
505 all_stats = TotStatsTuple(int(v) for v in all_stats_str)
506 self.master_stats = all_stats
def hz(self):
    """Return the element at index 3 of the result of ``get_all_tot_stats``."""
    totals = self.get_all_tot_stats()
    return totals[3]
def rx_stats(self, cores, task=0):
    """Return ``core_stats(cores, task)``; kept as an alias of ``core_stats``."""
    stats = self.core_stats(cores, task)
    return stats
517 def core_stats(self, cores, task=0):
518 """Get the receive statistics from the remote system"""
519 rx = tx = drop = tsc = 0
521 self.put_command("core stats {} {}\n".format(core, task))
522 ret = self.get_data().split(",")
527 return rx, tx, drop, tsc
def port_stats(self, ports):
    """Sum the counters reported by ``port_stats`` commands over ``ports``.

    Returns:
        list of element-wise sums, starting from twelve zeros.  Because the
        sums are built with ``zip``, a shorter reply shortens the result.
    """
    totals = [0] * 12
    for port in ports:
        self.put_command("port_stats {}\n".format(port))
        reply = self.get_data()
        counters = [try_int(field, 0) for field in reply.split(",")]
        totals = [sum(pair) for pair in zip(totals, counters)]
    return totals
@contextmanager
def measure_tot_stats(self):
    """Context manager that records the totals before and after its body.

    Yields:
        dict with ``'start_tot'`` set on entry; ``'end_tot'`` is added when
        the body finishes (even on error) and ``'delta'`` (end minus start,
        element by element) when the body finished without an exception.
    """
    before = self.get_all_tot_stats()
    container = {'start_tot': before}
    try:
        yield container
    finally:
        after = self.get_all_tot_stats()
        container['end_tot'] = after

    container['delta'] = TotStatsTuple(last - first for first, last in zip(before, after))
550 """Get the total statistics from the remote system"""
551 stats = self.get_all_tot_stats()
554 def tot_ierrors(self):
555 """Get the total ierrors from the remote system"""
556 self.put_command("tot ierrors tot\n")
557 recv = self.get_data().split(',')
558 tot_ierrors = int(recv[0])
560 return tot_ierrors, tsc
def set_count(self, count, cores):
    """Set the number of packets to send on the specified cores."""
    template = "count {} 0 {}\n"
    self._run_template_over_cores(template, cores, count)
def dump_rx(self, core_id, task_id=0, count=1):
    """Activate dump on rx on the specified core.

    Args:
        core_id: core placed in the command.
        task_id: task placed in the command.
        count: count placed in the command.
    """
    LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
    command = "dump_rx {} {} {}\n".format(core_id, task_id, count)
    self.put_command(command)
    # Give PROX time to set up packet dumping
    time.sleep(1.5)
578 """ stop all cores on the remote instance """
579 LOG.debug("Quit prox")
580 self.put_command("quit\n")
583 def force_quit(self):
584 """ stop all cores on the remote instance """
585 LOG.debug("Force Quit prox")
586 self.put_command("quit_force\n")
590 _LOCAL_OBJECT = object()
593 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
594 # the actual app is lowercase
597 LUA_PARAMETER_NAME = ""
598 LUA_PARAMETER_PEER = {
603 CONFIG_QUEUE_TIMEOUT = 120
def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
    """Initialise the helper state; all three arguments go to the base class."""
    # assigned before the base initialiser runs, as in the original order
    self.remote_path = None
    super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)

    # lazily filled caches
    self._prox_config_data = None
    self._global_section = None

    self.remote_prox_file_name = None
    self.additional_files = {}
    # receives the generated config (see build_config_file / prox_config_data)
    self.config_queue = Queue()
@property
def prox_config_data(self):
    """Config data, fetched once from ``config_queue`` and then cached.

    The first access blocks for up to ``CONFIG_QUEUE_TIMEOUT`` seconds
    waiting for the queue.
    """
    if self._prox_config_data is not None:
        return self._prox_config_data

    # this will block, but it needs to
    self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
    return self._prox_config_data
@property
def global_section(self):
    """The ``global`` section of the config, looked up once and cached.

    Stays None while ``prox_config_data`` is empty.
    """
    if self._global_section is not None:
        return self._global_section

    if self.prox_config_data:
        self._global_section = self.find_section("global")
    return self._global_section
def find_section(self, name, default=_LOCAL_OBJECT):
    """Return the contents of the first section called ``name``.

    Args:
        name: section name to look for in ``prox_config_data``.
        default: value returned when no section matches.

    Raises:
        KeyError: if no section matches and no default was given.
    """
    for section_name, contents in self.prox_config_data:
        if section_name == name:
            return contents

    if default is _LOCAL_OBJECT:
        raise KeyError('{} not found in Prox config'.format(name))
    return default
def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
    """Return the value of the first ``section_key`` entry in a section.

    A missing section is treated as an empty one.

    Args:
        section_name: name of the section to search.
        section_key: key to look for inside that section.
        default: value returned when the key is not found.

    Raises:
        KeyError: if the key is not found and no default was given.
    """
    for key, value in self.find_section(section_name, []):
        if key == section_key:
            return value

    if default is _LOCAL_OBJECT:
        message = '{} not found in {} section of Prox config'
        raise KeyError(message.format(section_key, section_name))
    return default
641 def _build_pipeline_kwargs(self):
642 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
643 self.pipeline_kwargs = {
644 'tool_path': tool_path,
645 'tool_dir': os.path.dirname(tool_path),
def copy_to_target(self, config_file_path, prox_file):
    """Upload ``config_file_path`` to ``/tmp/<prox_file>`` through ``ssh_helper``.

    Returns:
        the remote path the file was copied to.
    """
    destination = os.path.join("/tmp", prox_file)
    self.ssh_helper.put(config_file_path, destination)
    return destination
654 def _get_tx_port(section, sections):
656 for item in sections[section]:
657 if item[0] == "tx port":
658 iface_port = re.findall(r'\d+', item[1])
659 # do we want the last one?
660 # if yes, then can we reverse?
661 return int(iface_port[0])
664 def _replace_quoted_with_value(quoted, value, count=1):
665 new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
668 def _insert_additional_file(self, value):
669 file_str = value.split('"')
670 base_name = os.path.basename(file_str[1])
671 file_str[1] = self.additional_files[base_name]
672 return '"'.join(file_str)
674 def generate_prox_config_file(self, config_path):
676 prox_config = ConfigParser(config_path, sections)
679 # Ensure MAC is set "hardware"
680 all_ports = self.vnfd_helper.port_pairs.all_ports
681 # use dpdk port number
682 for port_name in all_ports:
683 port_num = self.vnfd_helper.port_num(port_name)
684 port_section_name = "port {}".format(port_num)
685 for section_name, section in sections:
686 if port_section_name != section_name:
689 for index, section_data in enumerate(section):
690 if section_data[0] == "mac":
691 section_data[1] = "hardware"
694 for _, section in sections:
695 # for index, (item_key, item_val) in enumerate(section):
696 for index, section_data in enumerate(section):
697 item_key, item_val = section_data
698 if item_val.startswith("@@dst_mac"):
699 tx_port_iter = re.finditer(r'\d+', item_val)
700 tx_port_no = int(next(tx_port_iter).group(0))
701 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
702 mac = intf["virtual-interface"]["dst_mac"]
703 section_data[1] = mac.replace(":", " ", 6)
705 if item_key == "dst mac" and item_val.startswith("@@"):
706 tx_port_iter = re.finditer(r'\d+', item_val)
707 tx_port_no = int(next(tx_port_iter).group(0))
708 intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
709 mac = intf["virtual-interface"]["dst_mac"]
710 section_data[1] = mac
712 # if addition file specified in prox config
713 if not self.additional_files:
716 for section_name, section in sections:
717 for index, section_data in enumerate(section):
719 if section_data[0].startswith("dofile"):
720 section_data[0] = self._insert_additional_file(section_data[0])
722 if section_data[1].startswith("dofile"):
723 section_data[1] = self._insert_additional_file(section_data[1])
@staticmethod
def write_prox_config(prox_config):
    """
    Write an .ini-format config file for PROX
    PROX does not allow a space before/after the =, so the text is
    assembled by hand.

    Args:
        prox_config: iterable of ``(section_name, section)`` pairs, each
            section being an iterable of ``(key, value)`` pairs.

    Returns:
        the config text, lines joined with ``os.linesep``.
    """
    lines = []
    for section_name, section in prox_config:
        lines.append("[{}]".format(section_name))
        for key, value in section:
            if key == "__name__":
                continue
            if value is None or value == '@':
                # entry without a value: only the key is written
                entry = str(key).replace('\n', '\n\t')
            else:
                entry = "=".join((key, str(value).replace('\n', '\n\t')))
            lines.append(entry)
    return os.linesep.join(lines)
def put_string_to_file(self, s, remote_path):
    """Upload the string ``s`` to ``remote_path`` through ``ssh_helper``.

    Returns:
        ``remote_path``.
    """
    buffer = cStringIO(s)
    self.ssh_helper.put_file_obj(buffer, remote_path)
    return remote_path
def generate_prox_lua_file(self):
    """Build the text of the lua parameter file.

    For every port four ``name:"value"`` lines are produced: the local IP
    (hex and plain) under ``LUA_PARAMETER_NAME`` and the destination IP
    (hex and plain) under the matching ``LUA_PARAMETER_PEER`` name.

    Returns:
        the lines joined with ``os.linesep``.
    """
    params = OrderedDict()
    own_name = self.LUA_PARAMETER_NAME
    for port_name in self.vnfd_helper.port_pairs.all_ports:
        peer_name = self.LUA_PARAMETER_PEER[own_name]
        port_num = self.vnfd_helper.port_num(port_name)
        vintf = self.vnfd_helper.find_interface(name=port_name)['virtual-interface']
        local_ip = vintf["local_ip"]
        dst_ip = vintf["dst_ip"]
        local_ip_hex = ip_to_hex(local_ip, separator=' ')
        dst_ip_hex = ip_to_hex(dst_ip, separator=' ')

        params["{}_hex_ip_port_{}".format(own_name, port_num)] = local_ip_hex
        params["{}_ip_port_{}".format(own_name, port_num)] = local_ip
        params["{}_hex_ip_port_{}".format(peer_name, port_num)] = dst_ip_hex
        params["{}_ip_port_{}".format(peer_name, port_num)] = dst_ip

    return os.linesep.join('{}:"{}"'.format(name, value) for name, value in params.items())
778 def upload_prox_lua(self, config_dir, prox_config_data):
779 # we could have multiple lua directives
780 lau_dict = prox_config_data.get('lua', {})
781 find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
782 lua_file = next((found[0] for found in find_iter if found), None)
786 out = self.generate_prox_lua_file()
787 remote_path = os.path.join(config_dir, lua_file)
788 return self.put_string_to_file(out, remote_path)
def upload_prox_config(self, config_file, prox_config_data):
    """Serialise ``prox_config_data`` and upload it to ``/tmp/<config_file>``.

    Returns:
        the remote path of the uploaded config.
    """
    # prox can't handle spaces around ' = ' so use custom method
    config_text = self.write_prox_config(prox_config_data)
    stream = StringIO(config_text)

    destination = os.path.join("/tmp", config_file)
    self.ssh_helper.put_file_obj(stream, destination)
    return destination
def build_config_file(self):
    """Generate the PROX config, publish it on the queue and upload it.

    Reads ``prox_config`` and the optional ``prox_files`` from the scenario
    options; every additional file is copied to the target first and
    recorded in ``additional_files`` under its base name.
    """
    task_path = self.scenario_helper.task_path
    options = self.scenario_helper.options
    requested_config = options['prox_config']
    config_file = os.path.basename(requested_config)
    config_path = find_relative_file(requested_config, task_path)
    self.additional_files = {}

    prox_files = options.get('prox_files', [])
    if isinstance(prox_files, six.string_types):
        # a single file name is accepted in place of a list
        prox_files = [prox_files]
    for extra_file in prox_files:
        base_name = os.path.basename(extra_file)
        local_path = find_relative_file(extra_file, task_path)
        self.additional_files[base_name] = self.copy_to_target(local_path, base_name)

    self._prox_config_data = self.generate_prox_config_file(config_path)
    # copy config to queue so we can read it from traffic_runner process
    self.config_queue.put(self._prox_config_data)
    self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
821 def build_config(self):
822 self.build_config_file()
824 options = self.scenario_helper.options
826 prox_args = options['prox_args']
827 LOG.info("Provision and start the %s", self.APP_NAME)
828 self._build_pipeline_kwargs()
829 self.pipeline_kwargs["args"] = " ".join(
830 " ".join([k, v if v else ""]) for k, v in prox_args.items())
831 self.pipeline_kwargs["cfg_file"] = self.remote_path
833 cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
834 prox_cmd = cmd_template.format(**self.pipeline_kwargs)
838 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
839 class ProxResourceHelper(ClientResourceHelper):
841 RESOURCE_WORD = 'prox'
@staticmethod
def find_pci(pci, bound_pci):
    """Return True if any entry of ``bound_pci`` ends with ``pci``."""
    # we have to substring match PCI bus address from the end
    for bound in bound_pci:
        if bound.endswith(pci):
            return True
    return False
852 def __init__(self, setup_helper):
853 super(ProxResourceHelper, self).__init__(setup_helper)
854 self.mgmt_interface = self.vnfd_helper.mgmt_interface
855 self._user = self.mgmt_interface["user"]
856 self._ip = self.mgmt_interface["ip"]
859 self._vpci_to_if_name_map = None
860 self.additional_file = {}
861 self.remote_prox_file_name = None
866 self._test_type = None
871 self.client = self._connect()
@property
def test_type(self):
    """The ``name`` entry of the ``global`` config section, cached once found.

    None is returned (and looked up again next time) while it is missing.
    """
    if self._test_type is not None:
        return self._test_type

    self._test_type = self.setup_helper.find_in_section('global', 'name', None)
    return self._test_type
880 def run_traffic(self, traffic_profile):
881 self._queue.cancel_join_thread()
885 traffic_profile.init(self._queue)
886 # this frees up the run_traffic loop
887 self.client_started.value = 1
889 while not self._terminated.value:
890 # move it all to traffic_profile
891 self._run_traffic_once(traffic_profile)
def _run_traffic_once(self, traffic_profile):
    """Run one iteration of the profile and stop once it reports completion.

    When ``traffic_profile.done`` is true, ``{'done': True}`` is put on the
    queue and the terminated flag is set.
    """
    traffic_profile.execute_traffic(self)
    if not traffic_profile.done:
        return

    self._queue.put({'done': True})
    LOG.debug("tg_prox done")
    self._terminated.value = 1
# For a VNF, use the ResourceHelper method to collect KPIs directly.
# For a TG, leave the superclass ClientResourceHelper collect_kpi method intact.
def collect_collectd_kpi(self):
    """Return the result of ``_collect_resource_kpi``."""
    kpi = self._collect_resource_kpi()
    return kpi
905 def collect_kpi(self):
906 result = super(ProxResourceHelper, self).collect_kpi()
907 # add in collectd kpis manually
909 result['collect_stats'] = self._collect_resource_kpi()
913 # should not be called, use VNF terminate
914 raise NotImplementedError()
917 return self.sut # force connection
def execute(self, cmd, *args, **kwargs):
    """Call the ``sut`` attribute named ``cmd`` with the given arguments.

    Returns:
        the result of the call, or None when ``sut`` has no such attribute
        (or the attribute is falsy).
    """
    handler = getattr(self.sut, cmd, None)
    if not handler:
        return None
    return handler(*args, **kwargs)
def _connect(self, client=None):
    """Connect to prox on the remote system, retrying until it accepts.

    Args:
        client: socket helper to connect; a new ``ProxSocketHelper`` is
            created when None.

    Returns:
        the connected client.

    Raises:
        Exception: if no attempt succeeded.
    """
    if client is None:
        client = ProxSocketHelper()

    # try connecting to Prox for 60s
    for _ in range(RETRY_SECONDS):
        time.sleep(RETRY_INTERVAL)
        try:
            client.connect(self._ip, PROX_PORT)
        except (socket.gaierror, socket.error):
            # not accepting connections yet, try again
            continue
        return client

    msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
    raise Exception(msg.format(self._ip, PROX_PORT))
class ProxDataHelper(object):
    """Collects the measurements of one test run and builds the result tuple.

    Used as a context manager: entering checks the interface count, leaving
    builds ``result_tuple`` from the values gathered in between.
    """

    def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss):
        """
        Args:
            vnfd_helper: provides ``port_pairs.all_ports`` and ``ports_iter()``.
            sut: object queried for port statistics, totals and hz.
            pkt_size: packet size of the run.
            value: divided by 100 and multiplied by the line rate to get pps.
            tolerated_loss: stored in the result tuple.
        """
        super(ProxDataHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.sut = sut
        self.pkt_size = pkt_size
        self.value = value
        self.tolerated_loss = tolerated_loss
        self.port_count = len(self.vnfd_helper.port_pairs.all_ports)

        # filled in while the test runs
        self.tsc_hz = None
        self.measured_stats = None
        self.latency = None
        self._totals_and_pps = None
        self.result_tuple = None

    @property
    def totals_and_pps(self):
        """``(rx_total, tx_total, pps)``, computed on first access and cached."""
        if self._totals_and_pps is None:
            port_totals = self.sut.port_stats(range(self.port_count))
            rx_total, tx_total = port_totals[6:8]
            pps = self.value / 100.0 * self.line_rate_to_pps()
            self._totals_and_pps = rx_total, tx_total, pps
        return self._totals_and_pps

    @property
    def rx_total(self):
        """First element of ``totals_and_pps``."""
        return self.totals_and_pps[0]

    @property
    def tx_total(self):
        """Second element of ``totals_and_pps``."""
        return self.totals_and_pps[1]

    @property
    def pps(self):
        """Third element of ``totals_and_pps``."""
        return self.totals_and_pps[2]

    @property
    def samples(self):
        """Per-port ``in_packets`` / ``out_packets`` keyed by port name."""
        per_port = {}
        for port_name, port_num in self.vnfd_helper.ports_iter():
            rx_count, tx_count = self.sut.port_stats([port_num])[6:8]
            per_port[port_name] = {
                "in_packets": rx_count,
                "out_packets": tx_count,
            }
        return per_port

    def __enter__(self):
        self.check_interface_count()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.make_tuple()

    def make_tuple(self):
        """Build and log ``result_tuple`` unless it already exists."""
        if self.result_tuple:
            return

        delta = self.measured_stats['delta']
        self.result_tuple = ProxTestDataTuple(
            self.tolerated_loss,
            self.tsc_hz,
            delta.rx,
            delta.tx,
            delta.tsc,
            self.latency,
            self.rx_total,
            self.tx_total,
            self.pps,
        )
        self.result_tuple.log_data()

    @contextmanager
    def measure_tot_stats(self):
        """Run the body inside ``sut.measure_tot_stats()``, keeping its container."""
        with self.sut.measure_tot_stats() as self.measured_stats:
            yield

    def check_interface_count(self):
        """Assert that the number of ports is 1, 2 or 4."""
        # do this assert in init?  unless we expect interface count to
        # change from one run to another run...
        assert self.port_count in {1, 2, 4}, \
            "Invalid number of ports: 1, 2 or 4 ports only supported at this time"

    def capture_tsc_hz(self):
        """Store ``sut.hz()`` as a float in ``tsc_hz``."""
        self.tsc_hz = float(self.sut.hz())

    def line_rate_to_pps(self):
        """Packets per second at line rate for ``port_count`` ports."""
        # FIXME Don't hardcode 10Gb/s
        return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20)
1049 class ProxProfileHelper(object):
1051 __prox_profile_type__ = "Generic"
1053 PROX_CORE_GEN_MODE = "gen"
1054 PROX_CORE_LAT_MODE = "lat"
@classmethod
def get_cls(cls, helper_type):
    """Return class of specified type.

    The subclasses of ``cls`` are searched for one whose
    ``__prox_profile_type__`` equals ``helper_type``; ``ProxProfileHelper``
    is returned when ``helper_type`` is empty or nothing matches.
    """
    if helper_type:
        for candidate in utils.itersubclasses(cls):
            if candidate.__prox_profile_type__ == helper_type:
                return candidate

    return ProxProfileHelper
@classmethod
def make_profile_helper(cls, resource_helper):
    """Instantiate the helper class selected by ``resource_helper.test_type``."""
    helper_cls = cls.get_cls(resource_helper.test_type)
    return helper_cls(resource_helper)
def __init__(self, resource_helper):
    """Keep ``resource_helper`` and reset the lazily computed caches."""
    super(ProxProfileHelper, self).__init__()
    self.resource_helper = resource_helper

    # caches filled on first use by the matching properties
    self._cpu_topology = None
    self._test_cores = None
    self._latency_cores = None
@property
def cpu_topology(self):
    """Topology parsed from ``/proc/cpuinfo``, fetched over ssh and cached."""
    if self._cpu_topology:
        return self._cpu_topology

    buffer = io.BytesIO()
    self.ssh_helper.get_file_obj("/proc/cpuinfo", buffer)
    cpuinfo_text = buffer.getvalue().decode('utf-8')
    self._cpu_topology = SocketTopology.parse_cpuinfo(cpuinfo_text)
    return self._cpu_topology
@property
def test_cores(self):
    """Cores whose mode is ``PROX_CORE_GEN_MODE``; recomputed while empty."""
    if self._test_cores:
        return self._test_cores

    self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
    return self._test_cores
@property
def latency_cores(self):
    """Cores whose mode is ``PROX_CORE_LAT_MODE``; recomputed while empty."""
    if self._latency_cores:
        return self._latency_cores

    self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
    return self._latency_cores
1100 def traffic_context(self, pkt_size, value):
1102 self.sut.reset_stats()
1104 self.sut.set_pkt_size(self.test_cores, pkt_size)
1105 self.sut.set_speed(self.test_cores, value)
1106 self.sut.start_all()
def get_cores(self, mode):
    """Return the topology cores of all ``core`` sections running ``mode``.

    Every ``mode`` entry equal to ``mode`` inside a section whose name
    starts with ``core`` contributes one element: the section name is
    parsed as a core spec and resolved against ``cpu_topology``.
    """
    found = []
    for section_name, section in self.setup_helper.prox_config_data:
        if section_name.startswith("core"):
            for key, value in section:
                if key != "mode" or value != mode:
                    continue
                spec = CoreSocketTuple(section_name)
                found.append(spec.find_in_topology(self.cpu_topology))

    return found
def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
    """Run traffic for ``duration`` seconds and return the measurements.

    Args:
        pkt_size: packet size passed to the traffic context and data helper.
        duration: seconds to sleep while the totals are being measured.
        value: speed value passed to the traffic context and data helper.
        tolerated_loss: passed to the data helper.

    Returns:
        ``(result_tuple, samples)`` taken from the data helper.
    """
    data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

    with data_helper:
        with self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

    result = data_helper.result_tuple
    return result, data_helper.samples
def get_latency(self):
    """Return the latency statistics of the latency cores.

    :return: return lat_min, lat_max, lat_avg; an empty list when no
             latency core is configured
    :rtype: list
    """
    # Resolve the latency cores here as well: relying on the cache alone
    # would silently skip latency whenever nothing else had filled it yet.
    if not self._latency_cores:
        self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)

    if self._latency_cores:
        return self.sut.lat_stats(self._latency_cores)
    return []
def terminate(self):
    """Do nothing; this class has no resources of its own to release."""
def __getattr__(self, item):
    """Delegate lookups of unknown attributes to ``self.resource_helper``.

    Only called when normal attribute lookup fails.

    :param item: name of the missing attribute
    :raises AttributeError: if ``resource_helper`` itself is not set yet, or
                            the resource helper lacks ``item``
    """
    if item == "resource_helper":
        # resource_helper is not in the instance dict (e.g. while the object
        # is being copied or unpickled); looking it up again through self
        # would recurse without bound.
        raise AttributeError(item)
    return getattr(self.resource_helper, item)
class ProxMplsProfileHelper(ProxProfileHelper):
    """Profile helper for the "MPLS tag/untag" PROX test type.

    Generator cores are split by their "name" option into tagged ("tag*")
    and plain ("udp*") cores, which are driven with different packet sizes
    and speeds.
    """

    __prox_profile_type__ = "MPLS tag/untag"

    def __init__(self, resource_helper):
        super(ProxMplsProfileHelper, self).__init__(resource_helper)
        # Cache for the (tagged, plain) tuple built by get_cores_mpls()
        self._cores_tuple = None

    @property
    def mpls_cores(self):
        """Return ``(tagged_cores, plain_cores)``, computed on first access."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_mpls()
        return self._cores_tuple

    @property
    def tagged_cores(self):
        """Return the generator cores whose name starts with "tag"."""
        return self.mpls_cores[0]

    @property
    def plain_cores(self):
        """Return the generator cores whose name starts with "udp"."""
        return self.mpls_cores[1]

    def get_cores_mpls(self):
        """Collect the tagged and plain generator cores from the PROX config.

        Only "core*" sections containing ``mode == PROX_CORE_GEN_MODE`` are
        considered; within those, every "name" option starting with "tag" or
        "udp" contributes the section's core to the respective list.

        :return: tuple ``(cores_tagged, cores_plain)`` of lists
        """
        cores_tagged = []
        cores_plain = []
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not generators
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != 'name':
                    continue

                if item_value.startswith("tag"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.find_in_topology(self.cpu_topology)
                    cores_tagged.append(core_tag)

                elif item_value.startswith("udp"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_udp = core_tuple.find_in_topology(self.cpu_topology)
                    cores_plain.append(core_udp)

        return cores_tagged, cores_plain

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Generate MPLS traffic for the duration of a ``with`` block.

        Tagged cores send ``pkt_size`` packets at ``value``; plain cores send
        packets 4 bytes smaller at a proportionally reduced speed.  All cores
        are stopped on exit, including when the block raises.

        :param pkt_size: packet size for the tagged cores
        :param value: speed for the tagged cores
        """
        self.sut.stop_all()
        self.sut.reset_stats()
        try:
            self.sut.set_pkt_size(self.tagged_cores, pkt_size)
            self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
            self.sut.set_speed(self.tagged_cores, value)
            # Ratio of the two packet sizes, each increased by 20 bytes
            # (presumably the per-packet wire overhead -- TODO confirm).
            ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
            self.sut.set_speed(self.plain_cores, value * ratio)
            self.sut.start_all()
            yield
        finally:
            self.sut.stop_all()
class ProxBngProfileHelper(ProxProfileHelper):
    """Profile helper for the "BNG gen" PROX test type.

    Generator cores are grouped by their "name" option into CPE, INET, ARP
    and ARP-task cores; the latency cores double as the RX cores.
    """

    __prox_profile_type__ = "BNG gen"

    # NOTE(review): the three durations below (seconds) are assumed values --
    # confirm them against the tester setup before relying on them.
    # Pause after stopping/starting cores while flushing before the test.
    _SETTLE_TIME = 0.5
    # ARP generation time needed to initialize all CPEs ("a few seconds").
    _ARP_WARMUP_TIME = 4
    # Time the RX cores run after the test to drain the NIC buffers.
    _FLUSH_TIME = 3

    def __init__(self, resource_helper):
        super(ProxBngProfileHelper, self).__init__(resource_helper)
        # Cache for the tuple built by get_cores_gen_bng_qos()
        self._cores_tuple = None

    @property
    def bng_cores(self):
        """Return the ``get_cores_gen_bng_qos()`` tuple, computed on first access."""
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_bng_qos()
        return self._cores_tuple

    @property
    def cpe_cores(self):
        """Return the generator cores whose name starts with "cpe"."""
        return self.bng_cores[0]

    @property
    def inet_cores(self):
        """Return the generator cores whose name starts with "inet"."""
        return self.bng_cores[1]

    @property
    def arp_cores(self):
        """Return the generator cores whose name starts with "arp"."""
        return self.bng_cores[2]

    @property
    def arp_task_cores(self):
        """Return the ARP task list (seeded with 0, then "arp_task*" cores)."""
        return self.bng_cores[3]

    @property
    def all_rx_cores(self):
        """Return the cores used for receiving, i.e. the latency cores."""
        return self.latency_cores

    def get_cores_gen_bng_qos(self):
        """Collect the BNG generator cores from the PROX config.

        Only "core*" sections containing ``mode == PROX_CORE_GEN_MODE`` are
        considered.  A "name" option starting with "cpe", "inet" or "arp"
        adds the section's core to the matching list; names starting with
        "arp_task" are additionally added to the ARP task list, which is
        seeded with 0.

        :return: tuple ``(cpe_cores, inet_cores, arp_cores, arp_tasks_core)``
        """
        cpe_cores = []
        inet_cores = []
        arp_cores = []
        arp_tasks_core = [0]
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
                continue

            # Skip cores that are not generators
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
                continue

            for item_key, item_value in section:
                if item_key != "name":
                    continue

                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    cpe_core = core_tuple.find_in_topology(self.cpu_topology)
                    cpe_cores.append(cpe_core)

                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.find_in_topology(self.cpu_topology)
                    inet_cores.append(inet_core)

                elif item_value.startswith("arp"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_core = core_tuple.find_in_topology(self.cpu_topology)
                    arp_cores.append(arp_core)

                # We check the tasks/core separately
                if item_value.startswith("arp_task"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_task_core = core_tuple.find_in_topology(self.cpu_topology)
                    arp_tasks_core.append(arp_task_core)

        return cpe_cores, inet_cores, arp_cores, arp_tasks_core

    @contextmanager
    def traffic_context(self, pkt_size, value):
        """Set up, ramp up and tear down BNG traffic around a ``with`` block.

        INET cores send ``pkt_size`` packets, CPE cores send packets 24 bytes
        smaller.  After flushing the RX buffers and patching the length
        fields of the generated packets, ARP generation is started and the
        CPE/INET speeds are raised in ``self.step_delta`` increments every
        ``self.step_time`` seconds until the target speeds are reached.  On
        exit, including when the block or the ramp-up raises, the generators
        are stopped and the RX buffers flushed.

        :param pkt_size: packet size on the INET side
        :param value: speed that neither direction may exceed
        """
        # Tester is sending packets at the required speed already after
        # setup_test(). Just get the current statistics, sleep the required
        # amount of time and calculate packet loss.
        inet_pkt_size = pkt_size
        cpe_pkt_size = pkt_size - 24
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        # Scale one direction down so that neither exceeds ``value``
        if ratio < 1:
            max_down_speed = value * ratio
        else:
            max_up_speed = value / ratio

        # Initialize cores
        self.sut.stop_all()
        time.sleep(self._SETTLE_TIME)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        time.sleep(self._SETTLE_TIME)
        self.sut.stop(self.all_rx_cores)
        time.sleep(self._SETTLE_TIME)
        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE
        # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET
        # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
        self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
        # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)

        # Sending ARP to initialize tables - need a few seconds of generation
        # to make sure all CPEs are initialized
        LOG.info("Initializing SUT: sending ARP packets")
        self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)
        self.sut.start(self.arp_cores)
        try:
            time.sleep(self._ARP_WARMUP_TIME)

            # Ramp up the transmission speed. First go to the common speed, then
            # increase steps for the faster one.
            self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)

            LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

            while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
                # The min(..., ...) takes care of 1) floating point rounding errors
                # that could make curr_*_speed to be slightly greater than
                # max_*_speed and 2) max_*_speed not being an exact multiple of
                # self.step_delta.
                if curr_up_speed < max_up_speed:
                    curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
                if curr_down_speed < max_down_speed:
                    curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

                self.sut.set_speed(self.inet_cores, curr_up_speed)
                self.sut.set_speed(self.cpe_cores, curr_down_speed)
                time.sleep(self.step_time)

            LOG.info("Target speeds reached. Starting real test.")

            yield
        finally:
            # Runs on failure too, so the generators are never left running.
            self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
            LOG.info("Test ended. Flushing NIC buffers")
            self.sut.start(self.all_rx_cores)
            time.sleep(self._FLUSH_TIME)
            self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
        """Run one BNG traffic test and return its results.

        :param pkt_size: packet size for this run
        :param duration: time to sleep while traffic is running, in seconds
        :param value: speed for this run
        :param tolerated_loss: forwarded to ``ProxDataHelper``
        :return: tuple ``(data_helper.result_tuple, data_helper.samples)``
        """
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples