X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fnetwork_services%2Fvnf_generic%2Fvnf%2Ftg_rfc2544_trex.py;h=d94a9a6e6ee77da99e303b6b09987e593975a50f;hb=36375f388880519336328406de1d1bbc408d4599;hp=ee7498cb8ce49f99073a8e092f5367ddf48ef10c;hpb=8315a0b074e90e3f38515a51820f9bce761b2b3a;p=yardstick.git diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py index ee7498cb8..d94a9a6e6 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py @@ -15,271 +15,101 @@ from __future__ import absolute_import from __future__ import print_function -import multiprocessing import time import logging -import os -import yaml +from collections import Mapping +from itertools import chain -from yardstick import ssh -from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen -from yardstick.network_services.utils import get_nsb_option -from stl.trex_stl_lib.trex_stl_client import STLClient -from stl.trex_stl_lib.trex_stl_client import LoggerApi -from stl.trex_stl_lib.trex_stl_exceptions import STLError +from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig +from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexTrafficGen +from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper +from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexResourceHelper LOGGING = logging.getLogger(__name__) -DURATION = 30 -WAIT_TIME = 3 -TREX_SYNC_PORT = 4500 -TREX_ASYNC_PORT = 4501 +class TrexRfc2544ResourceHelper(Rfc2544ResourceHelper): -class TrexTrafficGenRFC(GenericTrafficGen): - """ - This class handles mapping traffic profile and generating - traffic for rfc2544 testcase. - """ - - def __init__(self, vnfd): - super(TrexTrafficGenRFC, self).__init__(vnfd) - self._result = {} - self._terminated = multiprocessing.Value('i', 0) - self._queue = multiprocessing.Queue() - self._terminated = multiprocessing.Value('i', 0) - self._traffic_process = None - self._vpci_ascending = None - self.tc_file_name = None - self.client = None - self.my_ports = None - - mgmt_interface = self.vnfd["mgmt-interface"] - ssh_port = mgmt_interface.get("ssh_port", ssh.DEFAULT_PORT) - self.connection = ssh.SSH(mgmt_interface["user"], mgmt_interface["ip"], - password=mgmt_interface["password"], - port=ssh_port) - self.connection.wait() - - @classmethod - def _split_mac_address_into_list(cls, mac): - octets = mac.split(':') - for i, elem in enumerate(octets): - octets[i] = "0x" + str(elem) - return octets - - def _generate_trex_cfg(self, vnfd): - """ - - :param vnfd: vnfd.yaml - :return: trex_cfg.yaml file - """ - trex_cfg = dict( - port_limit=0, - version='2', - interfaces=[], - port_info=list(dict( - )) - ) - trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"]) - trex_cfg["version"] = '2' - - cfg_file = [] - vpci = [] - port = {} - - ext_intf = vnfd["vdu"][0]["external-interface"] - for interface in ext_intf: - virt_intf = interface["virtual-interface"] - vpci.append(virt_intf["vpci"]) - - port["src_mac"] = \ - self._split_mac_address_into_list(virt_intf["local_mac"]) - - time.sleep(WAIT_TIME) - port["dest_mac"] = \ - self._split_mac_address_into_list(virt_intf["dst_mac"]) - if virt_intf["dst_mac"]: - trex_cfg["port_info"].append(port.copy()) - - trex_cfg["interfaces"] = vpci - cfg_file.append(trex_cfg) - - with open('/tmp/trex_cfg.yaml', 'w') as outfile: - outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False)) - self.connection.put('/tmp/trex_cfg.yaml', '/etc') - - self._vpci_ascending = sorted(vpci) - - def scale(self, flavor=""): - ''' scale vnfbased on flavor input ''' - super(TrexTrafficGenRFC, self).scale(flavor) + def is_done(self): + return self.latency and self.iteration.value > 10 - def instantiate(self, scenario_cfg, context_cfg): - self._generate_trex_cfg(self.vnfd) - self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc']) - trex = os.path.join(self.bin_path, "trex") - err, _, _ = \ - self.connection.execute("ls {} >/dev/null 2>&1".format(trex)) - if err != 0: - self.connection.put(trex, trex, True) - LOGGING.debug("Starting TRex server...") - _tg_server = \ - multiprocessing.Process(target=self._start_server) - _tg_server.start() - while True: - LOGGING.info("Waiting for TG Server to start.. ") - time.sleep(1) +class TrexRfcResourceHelper(TrexResourceHelper): - status = \ - self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0] - if status == 0: - LOGGING.info("TG server is up and running.") - return _tg_server.exitcode - if not _tg_server.is_alive(): - raise RuntimeError("Traffic Generator process died.") + LATENCY_TIME_SLEEP = 120 + RUN_DURATION = 30 + WAIT_TIME = 3 - def listen_traffic(self, traffic_profile): - pass + def __init__(self, setup_helper, rfc_helper_type=None): + super(TrexRfcResourceHelper, self).__init__(setup_helper) - def _get_logical_if_name(self, vpci): - ext_intf = self.vnfd["vdu"][0]["external-interface"] - for interface in range(len(self.vnfd["vdu"][0]["external-interface"])): - virtual_intf = ext_intf[interface]["virtual-interface"] - if virtual_intf["vpci"] == vpci: - return ext_intf[interface]["name"] + if rfc_helper_type is None: + rfc_helper_type = TrexRfc2544ResourceHelper - def run_traffic(self, traffic_profile, - client_started=multiprocessing.Value('i', 0)): + self.rfc2544_helper = rfc_helper_type(self.scenario_helper) + # self.tg_port_pairs = [] - self._traffic_process = \ - multiprocessing.Process(target=self._traffic_runner, - args=(traffic_profile, self._queue, - client_started, self._terminated)) - self._traffic_process.start() - # Wait for traffic process to start - while client_started.value == 0: - time.sleep(1) + def _build_ports(self): + self.tg_port_pairs, self.networks = MultiPortConfig.get_port_pairs( + self.vnfd_helper.interfaces) + self.priv_ports = [int(x[0][2:]) for x in self.tg_port_pairs] + self.pub_ports = [int(x[1][2:]) for x in self.tg_port_pairs] + self.my_ports = list(set(chain(self.priv_ports, self.pub_ports))) - return self._traffic_process.is_alive() + def _run_traffic_once(self, traffic_profile): + if self._terminated.value: + return - def _start_server(self): - mgmt_interface = self.vnfd["mgmt-interface"] - ssh_port = mgmt_interface.get("ssh_port", ssh.DEFAULT_PORT) - _server = ssh.SSH(mgmt_interface["user"], mgmt_interface["ip"], - password=mgmt_interface["password"], - port=ssh_port) - _server.wait() - - _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" % - (TREX_SYNC_PORT, TREX_ASYNC_PORT)) - _server.execute("pkill -9 rex > /dev/null 2>&1") - - trex_path = os.path.join(self.bin_path, "trex/scripts") - path = get_nsb_option("trex_path", trex_path) - trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1" - - _server.execute(trex_cmd) - - def _connect_client(self, client=None): - if client is None: - client = STLClient(username=self.vnfd["mgmt-interface"]["user"], - server=self.vnfd["mgmt-interface"]["ip"], - verbose_level=LoggerApi.VERBOSE_QUIET) - for idx in range(6): - try: - client.connect() - break - except STLError: - LOGGING.info("Unable to connect to Trex. Attempt %s", idx) - time.sleep(WAIT_TIME) - return client - - @classmethod - def _get_rfc_tolerance(cls, tc_yaml): - tolerance = '0.8 - 1.0' - if 'tc_options' in tc_yaml['scenarios'][0]: - tc_options = tc_yaml['scenarios'][0]['tc_options'] - if 'rfc2544' in tc_options: - tolerance = \ - tc_options['rfc2544'].get('allowed_drop_rate', '0.8 - 1.0') - - tolerance = tolerance.split('-') - min_tol = float(tolerance[0]) - if len(tolerance) == 2: - max_tol = float(tolerance[1]) - else: - max_tol = float(tolerance[0]) - - return [min_tol, max_tol] - - def _traffic_runner(self, traffic_profile, queue, - client_started, terminated): - LOGGING.info("Starting TRex client...") - tc_yaml = {} - - with open(self.tc_file_name) as tc_file: - tc_yaml = yaml.load(tc_file.read()) + traffic_profile.execute(self) + self.client_started.value = 1 + time.sleep(self.RUN_DURATION) + self.client.stop(self.my_ports) + time.sleep(self.WAIT_TIME) + samples = traffic_profile.get_drop_percentage(self) + self._queue.put(samples) - tolerance = self._get_rfc_tolerance(tc_yaml) + if not self.rfc2544_helper.is_done(): + return - # fixme: fix passing correct trex config file, - # instead of searching the default path - self.my_ports = [0, 1] - self.client = self._connect_client() + self.client.stop(self.my_ports) self.client.reset(ports=self.my_ports) - self.client.remove_all_streams(self.my_ports) # remove all streams - while not terminated.value: - traffic_profile.execute(self) - client_started.value = 1 - time.sleep(DURATION) + self.client.remove_all_streams(self.my_ports) + traffic_profile.execute_latency(samples=samples) + multiplier = traffic_profile.calculate_pps(samples)[1] + for _ in range(5): + time.sleep(self.LATENCY_TIME_SLEEP) self.client.stop(self.my_ports) - time.sleep(WAIT_TIME) + time.sleep(self.WAIT_TIME) last_res = self.client.get_stats(self.my_ports) - samples = {} - for vpci_idx in range(len(self._vpci_ascending)): - name = \ - self._get_logical_if_name(self._vpci_ascending[vpci_idx]) - # fixme: VNFDs KPIs values needs to be mapped to TRex structure - if not isinstance(last_res, dict): - terminated.value = 1 - last_res = {} + if not isinstance(last_res, Mapping): + self._terminated.value = 1 + continue + self.generate_samples('latency', {}) + self._queue.put(samples) + self.client.start(mult=str(multiplier), + ports=self.my_ports, + duration=120, force=True) - samples[name] = \ - {"rx_throughput_fps": - float(last_res.get(vpci_idx, {}).get("rx_pps", 0.0)), - "tx_throughput_fps": - float(last_res.get(vpci_idx, {}).get("tx_pps", 0.0)), - "rx_throughput_mbps": - float(last_res.get(vpci_idx, {}).get("rx_bps", 0.0)), - "tx_throughput_mbps": - float(last_res.get(vpci_idx, {}).get("tx_bps", 0.0)), - "in_packets": - last_res.get(vpci_idx, {}).get("ipackets", 0), - "out_packets": - last_res.get(vpci_idx, {}).get("opackets", 0)} + def start_client(self, mult, duration, force=True): + self.client.start(ports=self.my_ports, mult=mult, duration=duration, force=force) - samples = \ - traffic_profile.get_drop_percentage(self, samples, - tolerance[0], tolerance[1]) - queue.put(samples) - self.client.stop(self.my_ports) - self.client.disconnect() - queue.put(samples) + def clear_client_stats(self): + self.client.clear_stats(ports=self.my_ports) def collect_kpi(self): - if not self._queue.empty(): - result = self._queue.get() - self._result.update(result) - LOGGING.debug("trex collect Kpis %s", self._result) - return self._result + self.rfc2544_helper.iteration.value += 1 + super(TrexRfcResourceHelper, self).collect_kpi() + - def terminate(self): - self._terminated.value = 1 # stop Trex clinet +class TrexTrafficGenRFC(TrexTrafficGen): + """ + This class handles mapping traffic profile and generating + traffic for rfc2544 testcase. + """ - self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" % - (TREX_SYNC_PORT, TREX_ASYNC_PORT)) + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = TrexRfcResourceHelper - if self._traffic_process: - self._traffic_process.terminate() + super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type)