1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
24 from collections import OrderedDict, namedtuple
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27 from multiprocessing import Queue
30 from six.moves import cStringIO
31 from six.moves import zip, StringIO
33 from yardstick.common import utils
34 from yardstick.common.utils import SocketTopology, join_non_strings, try_int
35 from yardstick.network_services.helpers.iniparser import ConfigParser
36 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
38 from yardstick.network_services import constants
# Module loggers: LOG for debug tracing of PROX interaction, LOG_RESULT for
# user-facing results through the top-level 'yardstick' logger.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
LOG_RESULT = logging.getLogger('yardstick')
LOG_RESULT.setLevel(logging.DEBUG)

# DATS-style configuration options as tuples of:
#   (dict key, ini section, ini key, default value)
CONFIGURATION_OPTIONS = (
    # dict key           section    key               default value
    ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
    ('testDuration', 'general', 'test_duration', 5.0),
    ('testPrecision', 'general', 'test_precision', 1.0),
    ('tests', 'general', 'tests', None),
    ('toleratedLoss', 'general', 'tolerated_loss', 0.0),

    ('logFile', 'logging', 'file', 'dats.log'),
    ('logDateFormat', 'logging', 'datefmt', None),
    ('logLevel', 'logging', 'level', 'INFO'),
    ('logOverwrite', 'logging', 'overwrite', 1),

    ('testerIp', 'tester', 'ip', None),
    ('testerUser', 'tester', 'user', 'root'),
    ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
    ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
    ('testerSocketId', 'tester', 'socket_id', 0),

    ('sutIp', 'sut', 'ip', None),
    ('sutUser', 'sut', 'user', 'root'),
    ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
    ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
    ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
    ('sutSocketId', 'sut', 'socket_id', 0),
# NOTE(review): the tuple's closing parenthesis is elided from this view.
class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
    """Parsed PROX core spec, e.g. "core 1s0h" -> (1, 0, 'h')."""

    # Matches "core N", "core NsM" and optional trailing 'h' (hyperthread).
    CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")

    def __new__(cls, *args):
        # Parse a core-spec string into (core_id, socket_id, hyperthread).
        # NOTE(review): the 'try:' opening this handler is elided in this view.
        matches = cls.CORE_RE.search(str(args[0]))
        args = matches.groups()
        return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
                                                   'h' if args[2] else '')
        except (AttributeError, TypeError, IndexError, ValueError):
            raise ValueError('Invalid core spec {}'.format(args))

    def is_hyperthread(self):
        # True when the spec carried the trailing 'h' marker.
        return self.hyperthread == 'h'

    # NOTE(review): the enclosing 'index' property header is elided in this
    # view; the line below is its body.
        return int(self.is_hyperthread())

    def find_in_topology(self, cpu_topology):
        # Map this (socket, core, hyperthread) spec to a concrete CPU number.
        # NOTE(review): the 'try:' opening this handler is elided in this view.
        socket_core_match = cpu_topology[self.socket_id][self.core_id]
        sorted_match = sorted(socket_core_match.values())
        return sorted_match[self.index][0]
        except (KeyError, IndexError):
            template = "Core {}{} on socket {} does not exist"
            raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))


class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
    def __new__(cls, *args):
        # Accept either a single iterable of stats or positional values.
        # NOTE(review): the 'try:' opening this handler is elided in this view.
        assert args[0] is not str(args[0])
        args = tuple(args[0])
        except (AssertionError, IndexError, TypeError):
            # NOTE(review): handler body elided; falls through to positional args.
        return super(TotStatsTuple, cls).__new__(cls, *args)


class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
                                                        'delta_tx,delta_tsc,'
                                                        'latency,rx_total,tx_total,'
    # NOTE(review): remainder of the field list and several @property headers
    # are elided from this view; orphan bodies below belong to them.
        return 1e2 * self.drop_total / float(self.tx_total)
        except ZeroDivisionError:

        # calculate the effective throughput in Mpps
        return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6

        # calculate the effective throughput in Mpps
        return float(self.delta_rx) * self.tsc_hz / self.delta_tsc / 1e6

    def can_be_lost(self):
        # Absolute packet budget allowed to drop within tolerance (percent).
        return int(self.tx_total * self.tolerated / 1e2)

    def drop_total(self):
        # Packets sent but never received back.
        return self.tx_total - self.rx_total

        return self.drop_total <= self.can_be_lost

    def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
        # Build the KPI sample dict for one measurement interval.
        # NOTE(review): guard lines ('if pkt_loss is None:' etc.) elided.
        pkt_loss = self.pkt_loss

        if port_samples is None:
        # NOTE(review): dict literal header and latency_keys setup elided.
            "Throughput": self.rx_mpps,
            "RxThroughput": self.rx_mpps,
            "DropPackets": pkt_loss,
            "CurrentDropPackets": pkt_loss,
            "RequestedTxThroughput": self.requested_pps / 1e6,
            "TxThroughput": self.tx_mpps,
        samples.update(port_samples)

        samples.update((key, value) for key, value in zip(latency_keys, self.latency))

    def log_data(self, logger=None):
        # NOTE(review): default-logger selection lines elided in this view.
        template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
        logger.info(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
        logger.info("Mpps configured: %f; Mpps generated %f; Mpps received %f",
                    self.requested_pps / 1e6, self.tx_mpps, self.rx_mpps)


class PacketDump(object):
    # NOTE(review): @staticmethod decorator elided in this view.
    def assert_func(func, value1, value2, template=None):
        assert func(value1, value2), template.format(value1, value2)

    def __init__(self, port_id, data_len, payload):
        template = "Packet dump has specified length {}, but payload is {} bytes long"
        self.assert_func(operator.eq, data_len, len(payload), template)
        self._port_id = port_id
        self._data_len = data_len
        self._payload = payload

        """Get the port id of the packet dump"""

        """Get the length of the data received"""
        return self._data_len

        return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)

    def payload(self, start=None, end=None):
        """Get part of the payload as a list of ordinals.

        Returns a list of byte values, matching the contents of the packet dump.
        Optional start and end parameters can be specified to retrieve only a
        part of the packet contents.

        The number of elements in the list is equal to end - start + 1, so end
        is the offset of the last character.

        start (pos. int): the starting offset in the payload. If it is not
            specified or None, offset 0 is assumed.
        end (pos. int): the ending offset of the payload. If it is not
            specified or None, the contents until the end of the packet are

        [int, int, ...]. Each int represents the ordinal value of a byte in
        """
        # NOTE(review): default handling for 'start'/'end' elided in this view.
            end = self.data_len - 1

        # Bounds checking on offsets
        template = "Start offset must be non-negative"
        self.assert_func(operator.ge, start, 0, template)

        template = "End offset must be less than {1}"
        self.assert_func(operator.lt, end, self.data_len, template)

        # Adjust for splice operation: end offset must be 1 more than the offset
        # of the last desired character.
        return self._payload[start:end]
class ProxSocketHelper(object):
    # Thin wrapper around PROX's text-based control socket.

    def __init__(self, sock=None):
        """ creates new prox instance """
        super(ProxSocketHelper, self).__init__()
        # NOTE(review): the 'if sock is None:' guard is elided in this view.
            sock = socket.socket()
        # NOTE(review): assignments of self._sock / packet-dump list elided.
        self.master_stats = None
def connect(self, ip, port):
    """Connect to the prox instance on the remote system"""
    remote_address = (ip, port)
    self._sock.connect(remote_address)
def get_socket(self):
    """ get the socket connected to the remote instance """
    # NOTE(review): 'return self._sock' is elided from this view.

def _parse_socket_data(self, decoded_data, pkt_dump_only):
    # Split buffered socket text into lines; consume packet-dump messages,
    # returning (last_regular_line, done_flag).
    # NOTE(review): several lines of this method are elided in this view.
    def get_newline_index():
        return decoded_data.find('\n', index)

    for newline_index in iter(get_newline_index, -1):
        ret_str = decoded_data[index:newline_index]
        # NOTE(review): try/except around the split is elided in this view.
        mode, port_id, data_len = ret_str.split(',', 2)
        mode, port_id, data_len = None, None, None

        if mode != 'pktdump':
            # Regular 1-line message. Stop reading from the socket.
            LOG.debug("Regular response read")

        LOG.debug("Packet dump header read: [%s]", ret_str)

        # The line is a packet dump header. Parse it, read the
        # packet payload, store the dump for later retrieval.
        # Skip over the packet dump and continue processing: a
        # 1-line response may follow the packet dump.
        data_len = int(data_len)
        data_start = newline_index + 1  # + 1 to skip over \n
        data_end = data_start + data_len
        sub_data = decoded_data[data_start:data_end]
        pkt_payload = array.array('B', (ord(v) for v in sub_data))
        pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
        self._pkt_dumps.append(pkt_dump)

        # Return boolean instead of string to signal
        # successful reception of the packet dump.
        LOG.debug("Packet dump stored, returning")

    return ret_str, False

def get_data(self, pkt_dump_only=False, timeout=1):
    """ read data from the socket """

    # This method behaves slightly differently depending on whether it is
    # called to read the response to a command (pkt_dump_only = 0) or if
    # it is called specifically to read a packet dump (pkt_dump_only = 1).

    # Packet dumps look like:
    # pktdump,<port_id>,<data_len>\n
    # <packet contents as byte array>\n
    # This means the total packet dump message consists of 2 lines instead

    # - Response for a command (pkt_dump_only = 0):
    #   1) Read response from the socket until \n (end of message)
    #   2a) If the response is a packet dump header (starts with "pktdump,"):
    #     - Read the packet payload and store the packet dump for later
    #     - Reset the state and restart from 1). Eventually state 2b) will
    #       be reached and the function will return.
    #   2b) If the response is not a packet dump:
    #     - Return the received message as a string

    # - Explicit request to read a packet dump (pkt_dump_only = 1):
    #   - Read the dump header and payload
    #   - Store the packet dump for later retrieval
    #   - Return True to signify a packet dump was successfully read

    # NOTE(review): the enclosing 'is_ready()' helper header is elided.
        # recv() is blocking, so avoid calling it when no data is waiting.
        ready = select.select([self._sock], [], [], timeout)
        return bool(ready[0])

    for status in iter(is_ready, False):
        decoded_data = self._sock.recv(256).decode('utf-8')
        ret_str, done = self._parse_socket_data(decoded_data, pkt_dump_only)
        # NOTE(review): loop-exit handling for 'done' elided in this view.

    LOG.debug("Received data from socket: [%s]", ret_str)
    return ret_str if status else ''

def put_command(self, to_send):
    """ send data to the remote instance """
    LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
    # NOTE(review): the 'try:' opening this handler is elided in this view.
    # NOTE: sendall will block, we need a timeout
    self._sock.sendall(to_send.encode('utf-8'))
    except:  # pylint: disable=bare-except
        # NOTE(review): handler body elided; send errors are swallowed.

def get_packet_dump(self):
    """ get the next packet dump """
    # NOTE(review): the 'if self._pkt_dumps:' guard is elided in this view.
    return self._pkt_dumps.pop(0)

def stop_all_reset(self):
    """ stop the remote instance and reset stats """
    LOG.debug("Stop all and reset stats")
    # NOTE(review): calls to stop_all()/reset_stats() elided in this view.

    """ stop all cores on the remote instance """
    LOG.debug("Stop all")
    self.put_command("stop all\n")

def stop(self, cores, task=''):
    """ stop specific cores on the remote instance """
    # NOTE(review): 'tmpcores' initialisation and loop header elided;
    # the lines below de-duplicate the core list while preserving order.
        if core not in tmpcores:
            tmpcores.append(core)

    LOG.debug("Stopping cores %s", tmpcores)
    self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task))

    """ start all cores on the remote instance """
    LOG.debug("Start all")
    self.put_command("start all\n")

def start(self, cores):
    """ start specific cores on the remote instance """
    # NOTE(review): 'tmpcores' initialisation and loop header elided.
        if core not in tmpcores:
            tmpcores.append(core)

    LOG.debug("Starting cores %s", tmpcores)
    self.put_command("start {}\n".format(join_non_strings(',', tmpcores)))

def reset_stats(self):
    """ reset the statistics on the remote instance """
    LOG.debug("Reset stats")
    self.put_command("reset stats\n")

def _run_template_over_cores(self, template, cores, *args):
    # Send one formatted command per core.
    # NOTE(review): the 'for core in cores:' loop header is elided.
        self.put_command(template.format(core, *args))

def set_pkt_size(self, cores, pkt_size):
    """ set the packet size to generate on the remote instance """
    LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
    # NOTE(review): one line between the debug log and the command is elided.
    self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
def set_value(self, cores, offset, value, length):
    """ set value on the remote instance """
    LOG.debug("Set value for core(s) %s to '%s' (length %d), offset %d",
              cores, value, length, offset)
    self._run_template_over_cores(
        "set value {} 0 {} {} {}\n", cores, offset, value, length)
def reset_values(self, cores):
    """Reset previously configured values on the remote instance."""
    LOG.debug("Set value for core(s) %s", cores)
    command_template = "reset values {} 0\n"
    self._run_template_over_cores(command_template, cores)
def set_speed(self, cores, speed, tasks=None):
    """ set speed on the remote instance """
    # NOTE(review): the 'if tasks is None:' guard is elided in this view.
        tasks = [0] * len(cores)
    elif len(tasks) != len(cores):
        LOG.error("set_speed: cores and tasks must have the same len")
    LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed)
    for (core, task) in list(zip(cores, tasks)):
        self.put_command("speed {} {} {}\n".format(core, task, speed))

def slope_speed(self, cores_speed, duration, n_steps=0):
    """will start to increase speed from 0 to N where N is taken from
    a['speed'] for each a in cores_speed"""
    # by default, each step will take 0.5 sec
    # NOTE(review): the 'if n_steps == 0:' guard is elided in this view.
        n_steps = duration * 2

    private_core_data = []
    step_duration = float(duration) / n_steps
    for core_data in cores_speed:
        target = float(core_data['speed'])
        private_core_data.append({
            'cores': core_data['cores'],
            'delta': target / n_steps,
        # NOTE(review): remaining dict entries and initial set_speed calls
        # are elided in this view.

    # Ramp: apply 'current += delta' per step, then snap to the final speed.
    deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
    for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
        time.sleep(step_duration)
        for core_data in private_core_data:
            core_data['current'] = core_data[key1] + core_data[key2]
            self.set_speed(core_data['cores'], core_data['current'])
def set_pps(self, cores, pps, pkt_size,
            line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
    """ set packets per second for specific cores on the remote instance """
    LOG.debug("Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)",
              cores, pps, pkt_size)

    # speed in percent of line-rate; +20 bytes per frame accounts for
    # on-the-wire overhead — TODO confirm exact overhead breakdown
    speed = float(pps) * (pkt_size + 20) / line_speed / BITS_PER_BYTE
    self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
def lat_stats(self, cores, task=0):
    """Get the latency statistics from the remote system"""
    # 1-based index, if max core is 4, then 0, 1, 2, 3, 4 len = 5
    # NOTE(review): dict initialisation and the 'for core in cores:' loop
    # header are elided in this view.
        self.put_command("lat stats {} {} \n".format(core, task))
        ret = self.get_data()

        # NOTE(review): the 'try:' opening this handler is elided.
            lat_min[core], lat_max[core], lat_avg[core] = \
                tuple(int(n) for n in ret.split(",")[:3])
        except (AttributeError, ValueError, TypeError):
            # NOTE(review): handler body elided in this view.

    return lat_min, lat_max, lat_avg

def get_all_tot_stats(self):
    # Fetch the rx/tx/tsc/hz totals from PROX as a TotStatsTuple.
    self.put_command("tot stats\n")
    all_stats_str = self.get_data().split(",")
    if len(all_stats_str) != 4:
        # NOTE(review): error-path lines elided in this view.
    all_stats = TotStatsTuple(int(v) for v in all_stats_str)
    self.master_stats = all_stats
    # NOTE(review): 'return all_stats' elided in this view.

    return self.get_all_tot_stats()[3]

def core_stats(self, cores, task=0):
    """Get the receive statistics from the remote system"""
    rx = tx = drop = tsc = 0
    # NOTE(review): the per-core loop header is elided in this view.
        self.put_command("core stats {} {}\n".format(core, task))
        ret = self.get_data().split(",")
        # NOTE(review): accumulation lines elided in this view.
    return rx, tx, drop, tsc

def port_stats(self, ports):
    """get counter values from a specific port"""
    tot_result = [0] * 12
    # NOTE(review): the 'for port in ports:' loop header is elided.
        self.put_command("port_stats {}\n".format(port))
        ret = [try_int(s, 0) for s in self.get_data().split(",")]
        tot_result = [sum(x) for x in zip(tot_result, ret)]
    # NOTE(review): 'return tot_result' elided in this view.

def measure_tot_stats(self):
    # Context manager: snapshot totals on entry and exit, exposing a dict
    # with 'start_tot', 'end_tot' and 'delta' (per-field differences).
    start = self.get_all_tot_stats()
    container = {'start_tot': start}
    # NOTE(review): 'try: yield container / finally:' lines elided.
    container['end_tot'] = end = self.get_all_tot_stats()

    container['delta'] = TotStatsTuple(e - s for s, e in zip(start, end))

    """Get the total statistics from the remote system"""
    stats = self.get_all_tot_stats()
    # NOTE(review): remainder of tot_stats() elided in this view.

def tot_ierrors(self):
    """Get the total ierrors from the remote system"""
    self.put_command("tot ierrors tot\n")
    recv = self.get_data().split(',')
    tot_ierrors = int(recv[0])
    # NOTE(review): assignment of 'tsc' elided in this view.
    return tot_ierrors, tsc
def set_count(self, count, cores):
    """Set the number of packets to send on the specified core"""
    count_template = "count {} 0 {}\n"
    self._run_template_over_cores(count_template, cores, count)
def dump_rx(self, core_id, task_id=0, count=1):
    """Activate dump on rx on the specified core"""
    LOG.debug("Activating dump on RX for core %d, task %d, count %d",
              core_id, task_id, count)
    command = "dump_rx {} {} {}\n".format(core_id, task_id, count)
    self.put_command(command)
    time.sleep(1.5)  # Give PROX time to set up packet dumping
    # NOTE(review): the enclosing 'quit()' method header is elided.
    """ stop all cores on the remote instance """
    LOG.debug("Quit prox")
    self.put_command("quit\n")

def force_quit(self):
    """ stop all cores on the remote instance """
    LOG.debug("Force Quit prox")
    self.put_command("quit_force\n")
    # NOTE(review): trailing lines of this method are elided in this view.


# Sentinel default for find_section()/find_in_section(), so callers can pass
# None as a legitimate default value.
_LOCAL_OBJECT = object()


class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
    # the actual app is lowercase
    # not used for Prox but added for consistency
    # NOTE(review): APP_NAME/APP_WORD class attributes are elided in this view.
    LUA_PARAMETER_NAME = ""
    LUA_PARAMETER_PEER = {
    # NOTE(review): dict body and closing brace elided in this view.

    # Seconds to wait for the generated config on the inter-process queue.
    CONFIG_QUEUE_TIMEOUT = 120

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        self.remote_path = None
        super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.remote_prox_file_name = None
        self._prox_config_data = None
        self.additional_files = {}
        self.config_queue = Queue()
        # allow_exit_without_flush
        self.config_queue.cancel_join_thread()
        self._global_section = None
def prox_config_data(self):
    """Lazily fetch the generated PROX config; blocks until it is queued."""
    cached = self._prox_config_data
    if cached is None:
        # Blocking get: the config is produced in another process.
        cached = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
        self._prox_config_data = cached
    return cached
def global_section(self):
    """Locate and cache the "global" section of the PROX config."""
    section = self._global_section
    if section is None and self.prox_config_data:
        section = self.find_section("global")
        self._global_section = section
    return section
def find_section(self, name, default=_LOCAL_OBJECT):
    # Return the named section's contents; KeyError when absent and no
    # default was supplied.
    result = next((value for key, value in self.prox_config_data if key == name), default)
    if result is _LOCAL_OBJECT:
        raise KeyError('{} not found in Prox config'.format(name))
    # NOTE(review): 'return result' elided in this view.

def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT):
    # Return one key's value from a section; KeyError when absent and no
    # default was supplied.
    section = self.find_section(section_name, [])
    result = next((value for key, value in section if key == section_key), default)
    if result is _LOCAL_OBJECT:
        template = '{} not found in {} section of Prox config'
        raise KeyError(template.format(section_key, section_name))
    # NOTE(review): 'return result' elided in this view.

def copy_to_target(self, config_file_path, prox_file):
    # Upload a local file to /tmp on the target over SSH.
    remote_path = os.path.join("/tmp", prox_file)
    self.ssh_helper.put(config_file_path, remote_path)
    # NOTE(review): 'return remote_path' elided in this view.

# NOTE(review): @staticmethod decorator elided in this view.
def _get_tx_port(section, sections):
    # Extract the numeric "tx port" value from a config section.
    # NOTE(review): initialisation line elided in this view.
    for item in sections[section]:
        if item[0] == "tx port":
            iface_port = re.findall(r'\d+', item[1])
            # do we want the last one?
            #   if yes, then can we reverse?
    return int(iface_port[0])

# NOTE(review): @staticmethod decorator elided in this view.
def _replace_quoted_with_value(quoted, value, count=1):
    # Substitute the first quoted substring with the given value.
    new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
    # NOTE(review): 'return new_string' elided in this view.
def _insert_additional_file(self, value):
    """Rewrite the quoted filename inside *value* to its uploaded remote path."""
    pieces = value.split('"')
    local_name = os.path.basename(pieces[1])
    pieces[1] = self.additional_files[local_name]
    return '"'.join(pieces)
def generate_prox_config_file(self, config_path):
    # Parse the PROX config and rewrite MAC/port entries to match the VNFD.
    # NOTE(review): several lines of this method are elided in this view.
    prox_config = ConfigParser(config_path, sections)

    # Ensure MAC is set "hardware"
    all_ports = self.vnfd_helper.port_pairs.all_ports
    # use dpdk port number
    for port_name in all_ports:
        port_num = self.vnfd_helper.port_num(port_name)
        port_section_name = "port {}".format(port_num)
        for section_name, section in sections:
            if port_section_name != section_name:
                # NOTE(review): 'continue' elided in this view.

            for section_data in section:
                if section_data[0] == "mac":
                    section_data[1] = "hardware"

    # Resolve @@dst_mac/@@src_mac placeholders from the VNFD interfaces.
    for _, section in sections:
        for section_data in section:
            item_key, item_val = section_data
            if item_val.startswith("@@dst_mac"):
                tx_port_iter = re.finditer(r'\d+', item_val)
                tx_port_no = int(next(tx_port_iter).group(0))
                intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                mac = intf["virtual-interface"]["dst_mac"]
                section_data[1] = mac.replace(":", " ", 6)

            if item_key == "dst mac" and item_val.startswith("@@"):
                tx_port_iter = re.finditer(r'\d+', item_val)
                tx_port_no = int(next(tx_port_iter).group(0))
                intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                mac = intf["virtual-interface"]["dst_mac"]
                section_data[1] = mac

            if item_val.startswith("@@src_mac"):
                tx_port_iter = re.finditer(r'\d+', item_val)
                tx_port_no = int(next(tx_port_iter).group(0))
                intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                mac = intf["virtual-interface"]["local_mac"]
                section_data[1] = mac.replace(":", " ", 6)

            if item_key == "src mac" and item_val.startswith("@@"):
                tx_port_iter = re.finditer(r'\d+', item_val)
                tx_port_no = int(next(tx_port_iter).group(0))
                intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
                mac = intf["virtual-interface"]["local_mac"]
                section_data[1] = mac

    # if addition file specified in prox config
    if not self.additional_files:
        # NOTE(review): early 'return sections' elided in this view.

    for section_name, section in sections:
        for section_data in section:
            # NOTE(review): the 'try:' opening this handler is elided.
            if section_data[0].startswith("dofile"):
                section_data[0] = self._insert_additional_file(section_data[0])

            if section_data[1].startswith("dofile"):
                section_data[1] = self._insert_additional_file(section_data[1])
            except:  # pylint: disable=bare-except
                # NOTE(review): handler body elided in this view.
# NOTE(review): @staticmethod decorator elided in this view.
def write_prox_lua(lua_config):
    """
    Write an .ini-format config file for PROX (parameters.lua)
    PROX does not allow a space before/after the =, so we need
    a custom writer.
    """
    # NOTE(review): 'out = []' initialisation elided in this view.
    for key in lua_config:
        value = '"' + lua_config[key] + '"'
        if key == "__name__":
            # NOTE(review): 'continue' elided in this view.
        if value is not None and value != '@':
            key = "=".join((key, str(value).replace('\n', '\n\t')))
        # NOTE(review): 'else:' branch header elided in this view.
            key = str(key).replace('\n', '\n\t')
        # NOTE(review): 'out.append(key)' elided in this view.
    return os.linesep.join(out)

# NOTE(review): @staticmethod decorator elided in this view.
def write_prox_config(prox_config):
    """
    Write an .ini-format config file for PROX
    PROX does not allow a space before/after the =, so we need
    a custom writer.
    """
    # NOTE(review): 'out = []' initialisation elided in this view.
    for (section_name, section) in prox_config:
        out.append("[{}]".format(section_name))
        # NOTE(review): inner key/value loop header elided in this view.
        if key == "__name__":
            # NOTE(review): 'continue' elided in this view.
        if value is not None and value != '@':
            key = "=".join((key, str(value).replace('\n', '\n\t')))
        # NOTE(review): 'else:' branch header elided in this view.
            key = str(key).replace('\n', '\n\t')
        # NOTE(review): 'out.append(key)' elided in this view.
    return os.linesep.join(out)
def put_string_to_file(self, s, remote_path):
    # Write string *s* to *remote_path* on the target via SSH.
    file_obj = cStringIO(s)
    self.ssh_helper.put_file_obj(file_obj, remote_path)
    # NOTE(review): 'return remote_path' elided in this view.

def generate_prox_lua_file(self):
    # Build the parameters.lua key/value pairs from the VNFD ports.
    # NOTE(review): initialisation of the result dict 'p' is elided.
    all_ports = self.vnfd_helper.port_pairs.all_ports
    for port_name in all_ports:
        port_num = self.vnfd_helper.port_num(port_name)
        intf = self.vnfd_helper.find_interface(name=port_name)
        vintf = intf['virtual-interface']
        p["tester_mac{0}".format(port_num)] = vintf["dst_mac"]
        p["src_mac{0}".format(port_num)] = vintf["local_mac"]
    # NOTE(review): 'return p' elided in this view.

def upload_prox_lua(self, config_file, lua_data):
    # prox can't handle spaces around ' = ' so use custom method
    out = StringIO(self.write_prox_lua(lua_data))

    remote_path = os.path.join("/tmp", config_file)
    self.ssh_helper.put_file_obj(out, remote_path)
    # NOTE(review): 'return remote_path' elided in this view.

def upload_prox_config(self, config_file, prox_config_data):
    # prox can't handle spaces around ' = ' so use custom method
    out = StringIO(self.write_prox_config(prox_config_data))

    remote_path = os.path.join("/tmp", config_file)
    self.ssh_helper.put_file_obj(out, remote_path)
    # NOTE(review): 'return remote_path' elided in this view.

def build_config_file(self):
    # Generate, upload and register all PROX config artifacts.
    task_path = self.scenario_helper.task_path
    options = self.scenario_helper.options
    config_path = options['prox_config']
    config_file = os.path.basename(config_path)
    config_path = utils.find_relative_file(config_path, task_path)
    self.additional_files = {}

    # NOTE(review): the 'try:' opening this handler is elided in this view.
    if options['prox_generate_parameter']:
        self.lua = self.generate_prox_lua_file()
        if len(self.lua) > 0:
            self.upload_prox_lua("parameters.lua", self.lua)
    except:  # pylint: disable=bare-except
        # NOTE(review): handler body elided; lua generation is best-effort.

    prox_files = options.get('prox_files', [])
    if isinstance(prox_files, six.string_types):
        prox_files = [prox_files]
    for key_prox_file in prox_files:
        base_prox_file = os.path.basename(key_prox_file)
        key_prox_path = utils.find_relative_file(key_prox_file, task_path)
        remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
        self.additional_files[base_prox_file] = remote_prox_file

    self._prox_config_data = self.generate_prox_config_file(config_path)
    # copy config to queue so we can read it from traffic_runner process
    self.config_queue.put(self._prox_config_data)
    self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
def build_config(self):
    # Assemble the PROX launch command line from scenario options.
    self.build_config_file()

    options = self.scenario_helper.options
    prox_args = options['prox_args']
    tool_path = self.ssh_helper.join_bin_path(self.APP_NAME)

    self.pipeline_kwargs = {
        'tool_path': tool_path,
        'tool_dir': os.path.dirname(tool_path),
        'cfg_file': self.remote_path,
        'args': ' '.join(' '.join([str(k), str(v) if v else ''])
                         for k, v in prox_args.items())
    # NOTE(review): closing brace of pipeline_kwargs elided in this view.

    cmd_template = ("sudo bash -c 'cd {tool_dir}; {tool_path} -o cli "
                    "{args} -f {cfg_file} '")
    return cmd_template.format(**self.pipeline_kwargs)
# this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
class ProxResourceHelper(ClientResourceHelper):

    RESOURCE_WORD = 'prox'

    # NOTE(review): some class attributes and the @staticmethod decorator
    # are elided in this view.
    def find_pci(pci, bound_pci):
        # we have to substring match PCI bus address from the end
        return any(b.endswith(pci) for b in bound_pci)

    def __init__(self, setup_helper):
        super(ProxResourceHelper, self).__init__(setup_helper)
        self.mgmt_interface = self.vnfd_helper.mgmt_interface
        self._user = self.mgmt_interface["user"]
        self._ip = self.mgmt_interface["ip"]

        self._vpci_to_if_name_map = None
        self.additional_file = {}
        self.remote_prox_file_name = None

        self._test_type = None

    # NOTE(review): the 'sut' property header is elided in this view.
        self.client = self._connect()

    # NOTE(review): the 'test_type' property header is elided in this view.
        if self._test_type is None:
            self._test_type = self.setup_helper.find_in_section('global', 'name', None)
        return self._test_type

    def run_traffic(self, traffic_profile):
        self._queue.cancel_join_thread()
        # NOTE(review): traffic-profile setup lines elided in this view.
        traffic_profile.init(self._queue)
        # this frees up the run_traffic loop
        self.client_started.value = 1

        while not self._terminated.value:
            # move it all to traffic_profile
            self._run_traffic_once(traffic_profile)

    def _run_traffic_once(self, traffic_profile):
        traffic_profile.execute_traffic(self)
        if traffic_profile.done:
            self._queue.put({'done': True})
            LOG.debug("tg_prox done")
            self._terminated.value = 1

    # For VNF use ResourceHelper method to collect KPIs directly.
    # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
    def collect_collectd_kpi(self):
        return self._collect_resource_kpi()

    def collect_kpi(self):
        result = super(ProxResourceHelper, self).collect_kpi()
        # add in collectd kpis manually
        # NOTE(review): guard and return lines elided in this view.
            result['collect_stats'] = self._collect_resource_kpi()

        # should not be called, use VNF terminate
        raise NotImplementedError()

        return self.sut  # force connection

    def execute(self, cmd, *args, **kwargs):
        # Dispatch a command name to the connected ProxSocketHelper.
        func = getattr(self.sut, cmd, None)
        # NOTE(review): the 'if func:' guard is elided in this view.
            return func(*args, **kwargs)

    def _connect(self, client=None):
        """Run and connect to prox on the remote system """
        # De-allocating a large amount of hugepages takes some time. If a new
        # PROX instance is started immediately after killing the previous one,
        # it might not be able to allocate hugepages, because they are still
        # being freed. Hence the -w switch.
        # self.connection.execute("sudo killall -w Prox 2>/dev/null")
        # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
        # -f ./handle_none-4.cfg"
        # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
        # + "export RTE_TARGET=" + self._dpdk_target + ";" \
        # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
        # + "./build/Prox " + prox_args
        # log.debug("Starting PROX with command [%s]", prox_cmd)
        # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
        # self._ip, prox_cmd))
        client = ProxSocketHelper()

        # try connecting to Prox for 60s
        for _ in range(RETRY_SECONDS):
            time.sleep(RETRY_INTERVAL)
            # NOTE(review): the 'try:' opening this handler is elided.
            client.connect(self._ip, PROX_PORT)
            except (socket.gaierror, socket.error):
                # NOTE(review): handler body and success return elided.

        msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
        raise Exception(msg.format(self._ip, PROX_PORT))
class ProxDataHelper(object):
    # Aggregates SUT statistics for a single throughput measurement run.

    def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss, line_speed):
        super(ProxDataHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        # NOTE(review): 'self.sut = sut' elided in this view.
        self.pkt_size = pkt_size
        # NOTE(review): 'self.value = value' elided in this view.
        self.line_speed = line_speed
        self.tolerated_loss = tolerated_loss
        self.port_count = len(self.vnfd_helper.port_pairs.all_ports)

        self.measured_stats = None

        self._totals_and_pps = None
        self.result_tuple = None

    # NOTE(review): @property decorator elided in this view.
    def totals_and_pps(self):
        # Cache (rx_total, tx_total, requested_pps) from a single port query.
        if self._totals_and_pps is None:
            rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8]
            requested_pps = self.value / 100.0 * self.line_rate_to_pps()
            self._totals_and_pps = rx_total, tx_total, requested_pps
        return self._totals_and_pps

        return self.totals_and_pps[0]

        return self.totals_and_pps[1]

    # NOTE(review): @property decorator elided in this view.
    def requested_pps(self):
        return self.totals_and_pps[2]

    # NOTE(review): the 'samples' property header and dict init are elided.
        for port_name, port_num in self.vnfd_helper.ports_iter():
            # NOTE(review): the 'try:' opening this handler is elided.
            port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8]
            samples[port_name] = {
                "in_packets": port_rx_total,
                "out_packets": port_tx_total,
            except (KeyError, TypeError, NameError, MemoryError, ValueError,
                    SystemError, BufferError):
                samples[port_name] = {
        # NOTE(review): fallback dict body and 'return samples' elided.

    def __enter__(self):
        self.check_interface_count()
        # NOTE(review): 'return self' elided in this view.

    def __exit__(self, exc_type, exc_val, exc_tb):
        # NOTE(review): body (make_tuple call) elided in this view.

    def make_tuple(self):
        # Freeze the measured stats into a ProxTestDataTuple (idempotent).
        if self.result_tuple:
            # NOTE(review): early 'return' elided in this view.

        self.result_tuple = ProxTestDataTuple(
            self.tolerated_loss,
            self.measured_stats['delta'].rx,
            self.measured_stats['delta'].tx,
            self.measured_stats['delta'].tsc,
        # NOTE(review): remaining constructor arguments elided in this view.
        self.result_tuple.log_data()

    # NOTE(review): @contextmanager decorator elided in this view.
    def measure_tot_stats(self):
        with self.sut.measure_tot_stats() as self.measured_stats:
            # NOTE(review): 'yield' elided in this view.
def check_interface_count(self):
    """Assert the port count is one of the supported values."""
    # do this assert in init? unless we expect interface count to
    # change from one run to another run...
    supported_counts = {1, 2, 4}
    assert self.port_count in supported_counts, \
        "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
def capture_tsc_hz(self):
    """Record the SUT's TSC frequency for later throughput calculations."""
    hz_value = self.sut.hz()
    self.tsc_hz = float(hz_value)
def line_rate_to_pps(self):
    """Convert the configured line speed into packets/sec over all ports."""
    # +20 bytes per frame: on-the-wire framing overhead — TODO confirm
    total_bits = self.port_count * self.line_speed
    return total_bits / BITS_PER_BYTE / (self.pkt_size + 20)
class ProxProfileHelper(object):
    # Base profile helper; subclasses register via __prox_profile_type__.

    __prox_profile_type__ = "Generic"

    PROX_CORE_GEN_MODE = "gen"
    PROX_CORE_LAT_MODE = "lat"

    # NOTE(review): @classmethod decorator elided in this view.
    def get_cls(cls, helper_type):
        """Return class of specified type."""
        # NOTE(review): guard line before this early return is elided.
        return ProxProfileHelper

        for profile_helper_class in utils.itersubclasses(cls):
            if helper_type == profile_helper_class.__prox_profile_type__:
                return profile_helper_class

        return ProxProfileHelper

    # NOTE(review): @classmethod decorator elided in this view.
    def make_profile_helper(cls, resource_helper):
        # Instantiate the helper subclass matching the resource's test type.
        return cls.get_cls(resource_helper.test_type)(resource_helper)
def __init__(self, resource_helper):
    """Bind this profile helper to its resource helper."""
    super(ProxProfileHelper, self).__init__()
    self.resource_helper = resource_helper
    # caches, filled lazily by the matching properties
    self._cpu_topology = None
    self._test_cores = None
    self._latency_cores = None
def cpu_topology(self):
    """Fetch and cache the remote CPU topology parsed from /proc/cpuinfo."""
    if not self._cpu_topology:
        buffer_obj = io.BytesIO()
        self.ssh_helper.get_file_obj("/proc/cpuinfo", buffer_obj)
        cpuinfo_text = buffer_obj.getvalue().decode('utf-8')
        self._cpu_topology = SocketTopology.parse_cpuinfo(cpuinfo_text)
    return self._cpu_topology
1142 def test_cores(self):
1143 if not self._test_cores:
1144 self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
1145 return self._test_cores
1148 def latency_cores(self):
1149 if not self._latency_cores:
1150 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1151 return self._latency_cores
1154 def traffic_context(self, pkt_size, value):
1156 self.sut.reset_stats()
1158 self.sut.set_pkt_size(self.test_cores, pkt_size)
1159 self.sut.set_speed(self.test_cores, value)
1160 self.sut.start_all()
1166 def get_cores(self, mode):
1169 for section_name, section in self.setup_helper.prox_config_data:
1170 if not section_name.startswith("core"):
1173 for key, value in section:
1174 if key == "mode" and value == mode:
1175 core_tuple = CoreSocketTuple(section_name)
1176 core = core_tuple.core_id
1181 def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
1182 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
1183 data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
1184 value, tolerated_loss, line_speed)
1186 with data_helper, self.traffic_context(pkt_size, value):
1187 with data_helper.measure_tot_stats():
1188 time.sleep(duration)
1189 # Getting statistics to calculate PPS at right speed....
1190 data_helper.capture_tsc_hz()
1191 data_helper.latency = self.get_latency()
1193 return data_helper.result_tuple, data_helper.samples
1195 def get_latency(self):
1197 :return: return lat_min, lat_max, lat_avg
1201 if not self._latency_cores:
1202 self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
1204 if self._latency_cores:
1205 return self.sut.lat_stats(self._latency_cores)
1208 def terminate(self):
1211 def __getattr__(self, item):
1212 return getattr(self.resource_helper, item)
class ProxMplsProfileHelper(ProxProfileHelper):
    """Profile helper for the MPLS tag/untag PROX test.

    Generator cores are split into "tagged" cores (MPLS-tagged traffic)
    and "plain" cores (untagged UDP traffic).  The plain side sends
    4-byte-smaller packets at a proportionally reduced speed so both
    directions consume the same bandwidth on the wire.
    """

    __prox_profile_type__ = "MPLS tag/untag"

    def __init__(self, resource_helper):
        super(ProxMplsProfileHelper, self).__init__(resource_helper)
        # Cached (tagged_cores, plain_cores) tuple, see mpls_cores.
        self._cores_tuple = None

    def mpls_cores(self):
        # Cached accessor.  NOTE(review): expected to be a @property.
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_mpls()
        return self._cores_tuple

    def tagged_cores(self):
        # Cores generating MPLS-tagged traffic.
        return self.mpls_cores[0]

    def plain_cores(self):
        # Cores generating plain (untagged) traffic.
        return self.mpls_cores[1]

    def get_cores_mpls(self):
        # Classify generator-mode cores by their configured "name":
        # "tag*" -> tagged, "udp*" -> plain.
        # NOTE(review): the result-list initialisations and "continue"
        # statements are not visible in this view of the file — confirm.
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
            # Skip any core section that is not in generator ("gen") mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
            for item_key, item_value in section:
                if item_key != 'name':
                if item_value.startswith("tag"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.core_id
                    cores_tagged.append(core_tag)
                elif item_value.startswith("udp"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_udp = core_tuple.core_id
                    cores_plain.append(core_udp)
        return cores_tagged, cores_plain

    def traffic_context(self, pkt_size, value):
        # Start tagged and plain generator cores; the plain side uses
        # packets 4 bytes smaller (no MPLS label).
        # NOTE(review): expected to be a @contextmanager; the decorator
        # and the yield/stop cleanup lines are not visible in this view —
        # confirm against the full file.
        self.sut.reset_stats()
        self.sut.set_pkt_size(self.tagged_cores, pkt_size)
        self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
        self.sut.set_speed(self.tagged_cores, value)
        # Scale the plain-core speed so both directions consume equal
        # wire bandwidth (+20 bytes per-packet overhead on both sides).
        ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
        self.sut.set_speed(self.plain_cores, value * ratio)
        self.sut.start_all()
class ProxBngProfileHelper(ProxProfileHelper):
    """Profile helper for the BNG (Broadband Network Gateway) PROX test.

    Generator cores are grouped into CPE-side, inet-side and ARP cores.
    The SUT is primed with ARP traffic first, then transmission speed is
    ramped up in steps towards the target up/down speeds.

    NOTE(review): several lines of this class (decorators, ``continue``
    statements, list initialisations, ratio conditionals, sleeps and the
    context-manager yield) are not visible in this view of the file; the
    affected spots are flagged below.
    """

    __prox_profile_type__ = "BNG gen"

    def __init__(self, resource_helper):
        super(ProxBngProfileHelper, self).__init__(resource_helper)
        # Cached (cpe, inet, arp, arp_task) cores tuple, see bng_cores.
        self._cores_tuple = None

    def bng_cores(self):
        # Cached accessor.  NOTE(review): expected to be a @property.
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_bng_qos()
        return self._cores_tuple

    def cpe_cores(self):
        # Cores generating CPE-side (downstream) traffic.
        return self.bng_cores[0]

    def inet_cores(self):
        # Cores generating internet-side (upstream) traffic.
        return self.bng_cores[1]

    def arp_cores(self):
        # Cores generating ARP priming traffic.
        return self.bng_cores[2]

    def arp_task_cores(self):
        # Per-task core list used when setting the ARP generation speed.
        return self.bng_cores[3]

    def all_rx_cores(self):
        # Latency cores double as the receive-side cores used to flush
        # the NIC RX buffers around the test.
        return self.latency_cores

    def get_cores_gen_bng_qos(self):
        # Classify generator-mode cores by their configured "name":
        # cpe / inet / arp, plus per-task ARP cores ("arp_task").
        # NOTE(review): some list initialisations and "continue"
        # statements are not visible in this view of the file — confirm.
        # NOTE(review): presumably task 0 is implicitly included for the
        # ARP tasks — confirm against the full file.
        arp_tasks_core = [0]
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
            # Skip core sections that are not in generator ("gen") mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
            for item_key, item_value in section:
                if item_key != 'name':
                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    cpe_core = core_tuple.core_id
                    cpe_cores.append(cpe_core)
                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.core_id
                    inet_cores.append(inet_core)
                elif item_value.startswith("arp"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_core = core_tuple.core_id
                    arp_cores.append(arp_core)
                # We check the tasks/core separately
                if item_value.startswith("arp_task"):
                    core_tuple = CoreSocketTuple(section_name)
                    arp_task_core = core_tuple.core_id
                    arp_tasks_core.append(arp_task_core)
        return cpe_cores, inet_cores, arp_cores, arp_tasks_core

    def traffic_context(self, pkt_size, value):
        # NOTE(review): expected to be a @contextmanager; the decorator,
        # the yield and several sleeps are not visible in this view.
        # Tester is sending packets at the required speed already after
        # setup_test(). Just get the current statistics, sleep the required
        # amount of time and calculate packet loss.
        inet_pkt_size = pkt_size
        cpe_pkt_size = pkt_size - 24
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        # NOTE(review): the two assignments below are expected to be the
        # branches of an "if ratio < 1: ... else: ..." whose condition is
        # not visible in this view — confirm against the full file.
        max_down_speed = value * ratio
        max_up_speed = value / ratio

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4)
        self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2)
        # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2)

        # Sending ARP to initialize tables - need a few seconds of generation
        # to make sure all CPEs are initialized
        LOG.info("Initializing SUT: sending ARP packets")
        self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores)
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)
        self.sut.start(self.arp_cores)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # the speed step.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
        # Same flow as ProxProfileHelper.run_test (duplicated per profile).
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                     value, tolerated_loss, line_speed)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples
class ProxVpeProfileHelper(ProxProfileHelper):
    """Profile helper for the vPE (virtual Provider Edge) PROX test.

    Generator cores and tx ports are split into a CPE side and an inet
    side; the per-direction speeds are scaled so that both directions
    use equal wire bandwidth, then ramped up in steps.

    NOTE(review): several lines of this class (decorators, ``continue``
    statements, list initialisations, ratio conditionals and the
    context-manager yield) are not visible in this view of the file; the
    affected spots are flagged below.
    """

    __prox_profile_type__ = "vPE gen"

    def __init__(self, resource_helper):
        super(ProxVpeProfileHelper, self).__init__(resource_helper)
        # Cached (cpe_cores, inet_cores) and (cpe_ports, inet_ports).
        self._cores_tuple = None
        self._ports_tuple = None

    def vpe_cores(self):
        # Cached accessor.  NOTE(review): expected to be a @property.
        if not self._cores_tuple:
            self._cores_tuple = self.get_cores_gen_vpe()
        return self._cores_tuple

    def cpe_cores(self):
        # Cores generating CPE-side (downstream) traffic.
        return self.vpe_cores[0]

    def inet_cores(self):
        # Cores generating internet-side (upstream) traffic.
        return self.vpe_cores[1]

    def all_rx_cores(self):
        # Latency cores double as the receive-side cores used to flush
        # the NIC RX buffers around the test.
        return self.latency_cores

    def vpe_ports(self):
        # Cached accessor.  NOTE(review): expected to be a @property.
        if not self._ports_tuple:
            self._ports_tuple = self.get_ports_gen_vpe()
        return self._ports_tuple

    def cpe_ports(self):
        # Tx port numbers on the CPE side.
        return self.vpe_ports[0]

    def inet_ports(self):
        # Tx port numbers on the internet side.
        return self.vpe_ports[1]

    def get_cores_gen_vpe(self):
        # Classify generator-mode cores by their configured "name":
        # cpe vs inet.
        # NOTE(review): the result-list initialisations and "continue"
        # statements are not visible in this view of the file — confirm.
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
            # Skip core sections that are not in generator ("gen") mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
            for item_key, item_value in section:
                if item_key != 'name':
                if item_value.startswith("cpe"):
                    core_tuple = CoreSocketTuple(section_name)
                    core_tag = core_tuple.core_id
                    cpe_cores.append(core_tag)
                elif item_value.startswith("inet"):
                    core_tuple = CoreSocketTuple(section_name)
                    inet_core = core_tuple.core_id
                    inet_cores.append(inet_core)
        return cpe_cores, inet_cores

    def get_ports_gen_vpe(self):
        # Classify tx ports by their configured "name": cpe vs inet.
        # The port number is the first integer in the section name.
        # NOTE(review): the result-list initialisations and "continue"
        # statements are not visible in this view of the file — confirm.
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("port"):
            tx_port_iter = re.finditer(r'\d+', section_name)
            tx_port_no = int(next(tx_port_iter).group(0))
            for item_key, item_value in section:
                if item_key != 'name':
                if item_value.startswith("cpe"):
                    cpe_ports.append(tx_port_no)
                elif item_value.startswith("inet"):
                    inet_ports.append(tx_port_no)
        return cpe_ports, inet_ports

    def traffic_context(self, pkt_size, value):
        # NOTE(review): expected to be a @contextmanager; the decorator,
        # the ratio conditionals, the yield and some sleeps are not
        # visible in this view of the file — confirm.
        # Calculate the target upload and download speed. The upload and
        # download packets have different packet sizes, so in order to get
        # equal bandwidth usage, the ratio of the speeds has to match the ratio
        # of the packet sizes.
        cpe_pkt_size = pkt_size
        inet_pkt_size = pkt_size - 4
        ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        # NOTE(review): the two assignments below are expected to be the
        # branches of an "if ratio < 1: ... else: ..." whose condition is
        # not visible in this view — confirm against the full file.
        max_down_speed = value * ratio
        max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.cpe_ports) != len(self.cpe_cores):
            max_down_speed *= 1.0 * len(self.cpe_ports) / len(self.cpe_cores)
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= 1.0 * len(self.inet_ports) / len(self.inet_cores)

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size)

        self.sut.reset_values(self.cpe_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # CPE: IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4)
        self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2)
        # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4)
        self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2)

        # INET: IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4)
        self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2)
        # UDP length (byte 42): 42 for MAC(12), EthType(2), MPLS(4), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 42, inet_pkt_size - 42, 2)

        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.cpe_cores, curr_down_speed)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.cpe_cores + self.inet_cores + self.all_rx_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # the speed step.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.cpe_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        self.sut.stop(self.cpe_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
        # Same flow as ProxProfileHelper.run_test (duplicated per profile).
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                     value, tolerated_loss, line_speed)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples
class ProxlwAFTRProfileHelper(ProxProfileHelper):
    """Profile helper for the lwAFTR (lightweight 4over6 AFTR) PROX test.

    Generator cores and tx ports are split into the IPv6 tunnel ("tun")
    side and the IPv4 internet ("inet") side.  Tunnel packets carry a
    40-byte IPv6 encapsulation, so the inet side uses 40-byte-smaller
    packets and the per-direction speeds are scaled accordingly.

    NOTE(review): several lines of this class (decorators, ``continue``
    statements, list initialisations, ratio conditionals and the
    context-manager yield) are not visible in this view of the file; the
    affected spots are flagged below.
    """

    __prox_profile_type__ = "lwAFTR gen"

    def __init__(self, resource_helper):
        super(ProxlwAFTRProfileHelper, self).__init__(resource_helper)
        # Cached (tun_cores, inet_cores) and (tun_ports, inet_ports).
        self._cores_tuple = None
        self._ports_tuple = None
        # Interval between speed ramp-up steps, in seconds.
        self.step_time = 0.5

    def _lwaftr_cores(self):
        # Cached accessor.  NOTE(review): expected to be a @property.
        if not self._cores_tuple:
            self._cores_tuple = self._get_cores_gen_lwaftr()
        return self._cores_tuple

    def tun_cores(self):
        # Cores generating tunnel-side (IPv6) traffic.
        return self._lwaftr_cores[0]

    def inet_cores(self):
        # Cores generating internet-side (IPv4) traffic.
        return self._lwaftr_cores[1]

    def _lwaftr_ports(self):
        # Cached accessor.  NOTE(review): expected to be a @property.
        if not self._ports_tuple:
            self._ports_tuple = self._get_ports_gen_lw_aftr()
        return self._ports_tuple

    def tun_ports(self):
        # Tx port numbers on the tunnel side.
        return self._lwaftr_ports[0]

    def inet_ports(self):
        # Tx port numbers on the internet side.
        return self._lwaftr_ports[1]

    def all_rx_cores(self):
        # Latency cores double as the receive-side cores used to flush
        # the NIC RX buffers around the test.
        return self.latency_cores

    def _get_cores_gen_lwaftr(self):
        # Classify generator-mode cores by their configured "name":
        # tun vs inet.
        # NOTE(review): the result-list initialisations and "continue"
        # statements are not visible in this view of the file — confirm.
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            if not section_name.startswith("core"):
            # Skip core sections that are not in generator ("gen") mode.
            if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section):
            core_tuple = CoreSocketTuple(section_name)
            core_tag = core_tuple.core_id
            for item_value in (v for k, v in section if k == 'name'):
                if item_value.startswith('tun'):
                    tun_cores.append(core_tag)
                elif item_value.startswith('inet'):
                    inet_cores.append(core_tag)
        return tun_cores, inet_cores

    def _get_ports_gen_lw_aftr(self):
        # Classify tx ports by their configured "name": lwB4 -> tunnel
        # side, inet -> internet side.  Port number comes from the
        # "port N" section name.
        # NOTE(review): the result-list initialisations and a "continue"
        # for non-matching sections are not visible in this view — confirm.
        re_port = re.compile(r'port (\d+)')
        for section_name, section in self.resource_helper.setup_helper.prox_config_data:
            match = re_port.search(section_name)
            tx_port_no = int(match.group(1))
            for item_value in (v for k, v in section if k == 'name'):
                if item_value.startswith('lwB4'):
                    tun_ports.append(tx_port_no)
                elif item_value.startswith('inet'):
                    inet_ports.append(tx_port_no)
        return tun_ports, inet_ports

    def _resize(len1, len2):
        # Ratio of two lengths, as a float.
        # NOTE(review): expected to be a @staticmethod (no "self"
        # parameter); the decorator is not visible in this view — confirm.
        return 1.0 * len1 / len2

    def traffic_context(self, pkt_size, value):
        # NOTE(review): expected to be a @contextmanager; the decorator,
        # the yield and some sleeps are not visible in this view.
        # Tester is sending packets at the required speed already after
        # setup_test(). Just get the current statistics, sleep the required
        # amount of time and calculate packet loss.
        tun_pkt_size = pkt_size
        # Inet packets lack the 40-byte IPv6 encapsulation header.
        inet_pkt_size = pkt_size - 40
        ratio = 1.0 * (tun_pkt_size + 20) / (inet_pkt_size + 20)

        curr_up_speed = curr_down_speed = 0
        max_up_speed = max_down_speed = value
        # NOTE(review): the assignment below is expected to be guarded by
        # a ratio conditional that is not visible in this view — confirm.
        max_up_speed = value / ratio

        # Adjust speed when multiple cores per port are used to generate traffic
        if len(self.tun_ports) != len(self.tun_cores):
            max_down_speed *= self._resize(len(self.tun_ports), len(self.tun_cores))
        if len(self.inet_ports) != len(self.inet_cores):
            max_up_speed *= self._resize(len(self.inet_ports), len(self.inet_cores))

        # Flush any packets in the NIC RX buffers, otherwise the stats will be
        # wrong.
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

        self.sut.reset_stats()

        self.sut.set_pkt_size(self.inet_cores, inet_pkt_size)
        self.sut.set_pkt_size(self.tun_cores, tun_pkt_size)

        self.sut.reset_values(self.tun_cores)
        self.sut.reset_values(self.inet_cores)

        # Set correct IP and UDP lengths in packet headers
        # IPv6 length (byte 18): 58 for MAC(12), EthType(2), IPv6(40), CRC(4)
        self.sut.set_value(self.tun_cores, 18, tun_pkt_size - 58, 2)
        # IP length (byte 56): 58 for MAC(12), EthType(2), IPv6(40), CRC(4)
        self.sut.set_value(self.tun_cores, 56, tun_pkt_size - 58, 2)
        # UDP length (byte 78): 78 for MAC(12), EthType(2), IPv6(40), IP(20), CRC(4)
        self.sut.set_value(self.tun_cores, 78, tun_pkt_size - 78, 2)

        # IP length (byte 16): 18 for MAC(12), EthType(2), CRC(4)
        self.sut.set_value(self.inet_cores, 16, inet_pkt_size - 18, 2)
        # UDP length (byte 38): 38 for MAC(12), EthType(2), IP(20), CRC(4)
        self.sut.set_value(self.inet_cores, 38, inet_pkt_size - 38, 2)

        LOG.info("Initializing SUT: sending lwAFTR packets")
        self.sut.set_speed(self.inet_cores, curr_up_speed)
        self.sut.set_speed(self.tun_cores, curr_down_speed)

        # Ramp up the transmission speed. First go to the common speed, then
        # increase steps for the faster one.
        self.sut.start(self.tun_cores + self.inet_cores + self.latency_cores)

        LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed)

        while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed):
            # The min(..., ...) takes care of 1) floating point rounding errors
            # that could make curr_*_speed to be slightly greater than
            # max_*_speed and 2) max_*_speed not being an exact multiple of
            # the speed step.
            if curr_up_speed < max_up_speed:
                curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed)
            if curr_down_speed < max_down_speed:
                curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed)

            self.sut.set_speed(self.inet_cores, curr_up_speed)
            self.sut.set_speed(self.tun_cores, curr_down_speed)
            time.sleep(self.step_time)

        LOG.info("Target speeds reached. Starting real test.")

        self.sut.stop(self.tun_cores + self.inet_cores)
        LOG.info("Test ended. Flushing NIC buffers")
        self.sut.start(self.all_rx_cores)
        self.sut.stop(self.all_rx_cores)

    def run_test(self, pkt_size, duration, value, tolerated_loss=0.0,
                 line_speed=(constants.ONE_GIGABIT_IN_BITS * constants.NIC_GBPS_DEFAULT)):
        # Same flow as ProxProfileHelper.run_test (duplicated per profile).
        data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size,
                                     value, tolerated_loss, line_speed)

        with data_helper, self.traffic_context(pkt_size, value):
            with data_helper.measure_tot_stats():
                time.sleep(duration)
                # Getting statistics to calculate PPS at right speed....
                data_helper.capture_tsc_hz()
                data_helper.latency = self.get_latency()

        return data_helper.result_tuple, data_helper.samples