X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fnetwork_services%2Fvnf_generic%2Fvnf%2Fvpe_vnf.py;h=cd4a008cefa3ae25e34b0107c8e4e74b03bb6c21;hb=c84187fc404d44082826f98b47c28d3d8f6690e5;hp=e9e80bdfb8cdd616fdd1b42b4a2ec2481c307990;hpb=87559e992c1f68559950b258146a530c49db9df0;p=yardstick.git diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index e9e80bdfb..cd4a008ce 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -15,313 +15,281 @@ from __future__ import absolute_import from __future__ import print_function -import tempfile -import time + + import os import logging import re -from multiprocessing import Queue -import multiprocessing -import ipaddress -import six +import posixpath -from yardstick import ssh -from yardstick.network_services.vnf_generic.vnf.base import GenericVNF -from yardstick.network_services.utils import provision_tool -from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper -from yardstick.network_services.nfvi.resource import ResourceProfile +from six.moves import configparser, zip -LOG = logging.getLogger(__name__) -VPE_PIPELINE_COMMAND = '{tool_path} -p 0x3 -f {cfg_file} -s {script}' -CORES = ['0', '1', '2'] -WAIT_TIME = 20 +from yardstick.network_services.helpers.samplevnf_helper import PortPairs +from yardstick.network_services.pipeline import PipelineRules +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper +LOG = logging.getLogger(__name__) -class VpeApproxVnf(GenericVNF): +VPE_PIPELINE_COMMAND = """sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script}""" + +VPE_COLLECT_KPI = """\ +Pkts in:\s(\d+)\r\n\ +\tPkts dropped by AH:\s(\d+)\r\n\ +\tPkts dropped by other:\s(\d+)\ +""" + + +class ConfigCreate(object): + + @staticmethod + def vpe_tmq(config, index): + tm_q = 'TM{0}'.format(index) + config.add_section(tm_q) + config.set(tm_q, 'burst_read', '24') + config.set(tm_q, 'burst_write', '32') + config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg') + return config + + def __init__(self, uplink_ports, downlink_ports, socket): + super(ConfigCreate, self).__init__() + self.sw_q = -1 + self.sink_q = -1 + self.n_pipeline = 1 + self.uplink_ports = uplink_ports + self.downlink_ports = downlink_ports + self.pipeline_per_port = 9 + self.socket = socket + + def vpe_initialize(self, config): + config.add_section('EAL') + config.set('EAL', 'log_level', '0') + + config.add_section('PIPELINE0') + config.set('PIPELINE0', 'type', 'MASTER') + config.set('PIPELINE0', 'core', 's%sC0' % self.socket) + + config.add_section('MEMPOOL0') + config.set('MEMPOOL0', 'pool_size', '256K') + + config.add_section('MEMPOOL1') + config.set('MEMPOOL1', 'pool_size', '2M') + return config + + def vpe_rxq(self, config): + for port in self.downlink_ports: + new_section = 'RXQ{0}.0'.format(port) + config.add_section(new_section) + config.set(new_section, 'mempool', 'MEMPOOL1') + + return config + + def get_sink_swq(self, parser, pipeline, k, index): + sink = "" + pktq = parser.get(pipeline, k) + if "SINK" in pktq: + self.sink_q += 1 + sink = " SINK{0}".format(self.sink_q) + if "TM" in pktq: + sink = " TM{0}".format(index) + pktq = "SWQ{0}{1}".format(self.sw_q, sink) + return pktq + + def vpe_upstream(self, vnf_cfg, index=0): + parser = configparser.ConfigParser() + parser.read(os.path.join(vnf_cfg, 'vpe_upstream')) + + for pipeline in parser.sections(): + for k, v in parser.items(pipeline): + if k == "pktq_in": + if "RXQ" in v: + value = "RXQ{0}.0".format(self.uplink_ports[index]) + else: + value = self.get_sink_swq(parser, pipeline, k, index) + + parser.set(pipeline, k, value) + + elif k == "pktq_out": + if "TXQ" in v: + value = "TXQ{0}.0".format(self.downlink_ports[index]) + else: + self.sw_q += 1 + value = self.get_sink_swq(parser, pipeline, k, index) + + parser.set(pipeline, k, value) + + new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline) + if new_pipeline != pipeline: + parser._sections[new_pipeline] = parser._sections[pipeline] + parser._sections.pop(pipeline) + self.n_pipeline += 1 + return parser + + def vpe_downstream(self, vnf_cfg, index): + parser = configparser.ConfigParser() + parser.read(os.path.join(vnf_cfg, 'vpe_downstream')) + for pipeline in parser.sections(): + for k, v in parser.items(pipeline): + + if k == "pktq_in": + if "RXQ" not in v: + value = self.get_sink_swq(parser, pipeline, k, index) + elif "TM" in v: + value = "RXQ{0}.0 TM{1}".format(self.downlink_ports[index], index) + else: + value = "RXQ{0}.0".format(self.downlink_ports[index]) + + parser.set(pipeline, k, value) + + if k == "pktq_out": + if "TXQ" not in v: + self.sw_q += 1 + value = self.get_sink_swq(parser, pipeline, k, index) + elif "TM" in v: + value = "TXQ{0}.0 TM{1}".format(self.uplink_ports[index], index) + else: + value = "TXQ{0}.0".format(self.uplink_ports[index]) + + parser.set(pipeline, k, value) + + new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline) + if new_pipeline != pipeline: + parser._sections[new_pipeline] = parser._sections[pipeline] + parser._sections.pop(pipeline) + self.n_pipeline += 1 + return parser + + def create_vpe_config(self, vnf_cfg): + config = configparser.ConfigParser() + vpe_cfg = os.path.join("/tmp/vpe_config") + with open(vpe_cfg, 'w') as cfg_file: + config = self.vpe_initialize(config) + config = self.vpe_rxq(config) + config.write(cfg_file) + for index in range(0, len(self.uplink_ports)): + config = self.vpe_upstream(vnf_cfg, index) + config.write(cfg_file) + config = self.vpe_downstream(vnf_cfg, index) + config = self.vpe_tmq(config, index) + config.write(cfg_file) + + def generate_vpe_script(self, interfaces): + rules = PipelineRules(pipeline_id=1) + for priv_port, pub_port in zip(self.uplink_ports, self.downlink_ports): + priv_intf = interfaces[priv_port]["virtual-interface"] + pub_intf = interfaces[pub_port]["virtual-interface"] + + dst_port0_ip = priv_intf["dst_ip"] + dst_port1_ip = pub_intf["dst_ip"] + dst_port0_mac = priv_intf["dst_mac"] + dst_port1_mac = pub_intf["dst_mac"] + + rules.add_firewall_script(dst_port0_ip) + rules.next_pipeline() + rules.add_flow_classification_script() + rules.next_pipeline() + rules.add_flow_action() + rules.next_pipeline() + rules.add_flow_action2() + rules.next_pipeline() + rules.add_route_script(dst_port1_ip, dst_port1_mac) + rules.next_pipeline() + rules.add_route_script2(dst_port0_ip, dst_port0_mac) + rules.next_pipeline(num=4) + + return rules.get_string() + + def generate_tm_cfg(self, vnf_cfg, index=0): + vnf_cfg = os.path.join(vnf_cfg, "full_tm_profile_10G.cfg") + if os.path.exists(vnf_cfg): + return open(vnf_cfg).read() + + +class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper): + + APP_NAME = 'vPE_vnf' + CFG_CONFIG = "/tmp/vpe_config" + CFG_SCRIPT = "/tmp/vpe_script" + TM_CONFIG = "/tmp/full_tm_profile_10G.cfg" + CORES = ['0', '1', '2', '3', '4', '5'] + PIPELINE_COMMAND = VPE_PIPELINE_COMMAND + + def _build_vnf_ports(self): + self._port_pairs = PortPairs(self.vnfd_helper.interfaces) + self.uplink_ports = self._port_pairs.uplink_ports + self.downlink_ports = self._port_pairs.downlink_ports + self.all_ports = self._port_pairs.all_ports + + def build_config(self): + vpe_vars = { + "bin_path": self.ssh_helper.bin_path, + "socket": self.socket, + } + + self._build_vnf_ports() + vpe_conf = ConfigCreate(self.vnfd_helper.port_pairs.uplink_ports, + self.vnfd_helper.port_pairs.downlink_ports, self.socket) + vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg) + + config_basename = posixpath.basename(self.CFG_CONFIG) + script_basename = posixpath.basename(self.CFG_SCRIPT) + tm_basename = posixpath.basename(self.TM_CONFIG) + with open(self.CFG_CONFIG) as handle: + vpe_config = handle.read() + + self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars)) + + vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces) + self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars)) + + tm_config = vpe_conf.generate_tm_cfg(self.scenario_helper.vnf_cfg) + self.ssh_helper.upload_config_file(tm_basename, tm_config) + + LOG.info("Provision and start the %s", self.APP_NAME) + LOG.info(self.CFG_CONFIG) + LOG.info(self.CFG_SCRIPT) + self._build_pipeline_kwargs() + return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs) + + +class VpeApproxVnf(SampleVNF): """ This class handles vPE VNF model-driver definitions """ - def __init__(self, vnfd): - super(VpeApproxVnf, self).__init__(vnfd) - self.socket = None - self.q_in = Queue() - self.q_out = Queue() - self.vnf_cfg = None - self._vnf_process = None - self.connection = None - self.resource = None - - def _resource_collect_start(self): - self.resource.initiate_systemagent(self.bin_path) - self.resource.start() - - def _resource_collect_stop(self): - self.resource.stop() + APP_NAME = 'vPE_vnf' + APP_WORD = 'vpe' + COLLECT_KPI = VPE_COLLECT_KPI + WAIT_TIME = 20 - def _collect_resource_kpi(self): - result = {} + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + if setup_env_helper_type is None: + setup_env_helper_type = VpeApproxSetupEnvHelper - status = self.resource.check_if_sa_running("collectd")[0] - if status: - result = self.resource.amqp_collect_nfvi_kpi() + super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) - result = {"core": result} - - return result - - @classmethod - def __setup_hugepages(cls, connection): - hugepages = \ - connection.execute( - "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1] - hugepages = hugepages.rstrip() - - memory_path = \ - '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages - connection.execute("awk -F: '{ print $1 }' < %s" % memory_path) - - pages = 16384 if hugepages.rstrip() == "2048kB" else 16 - connection.execute("echo %s > %s" % (pages, memory_path)) - - def setup_vnf_environment(self, connection): - ''' setup dpdk environment needed for vnf to run ''' - - self.__setup_hugepages(connection) - connection.execute("modprobe uio && modprobe igb_uio") - - exit_status = connection.execute("lsmod | grep -i igb_uio")[0] - if exit_status == 0: - return - - dpdk = os.path.join(self.bin_path, "dpdk-16.07") - dpdk_setup = \ - provision_tool(self.connection, - os.path.join(self.bin_path, "nsb_setup.sh")) - status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0] - if status: - connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) - - def _get_cpu_sibling_list(self): - cpu_topo = [] - for core in CORES: - sys_cmd = \ - "/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list" \ - % core - cpuid = \ - self.connection.execute("awk -F: '{ print $1 }' < %s" % - sys_cmd)[1] - cpu_topo += \ - [(idx) if idx.isdigit() else idx for idx in cpuid.split(',')] - - return [cpu.strip() for cpu in cpu_topo] - - def scale(self, flavor=""): - ''' scale vnfbased on flavor input ''' - super(VpeApproxVnf, self).scale(flavor) - - def instantiate(self, scenario_cfg, context_cfg): - vnf_cfg = scenario_cfg['vnf_options']['vpe']['cfg'] - - mgmt_interface = self.vnfd["mgmt-interface"] - self.connection = ssh.SSH.from_node(mgmt_interface) - - self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc']) - - self.setup_vnf_environment(self.connection) - - cores = self._get_cpu_sibling_list() - self.resource = ResourceProfile(self.vnfd, cores) - - self.connection.execute("pkill vPE_vnf") - dpdk_nic_bind = \ - provision_tool(self.connection, - os.path.join(self.bin_path, "dpdk_nic_bind.py")) - - interfaces = self.vnfd["vdu"][0]['external-interface'] - self.socket = \ - next((0 for v in interfaces - if v['virtual-interface']["vpci"][5] == "0"), 1) - - bound_pci = [v['virtual-interface']["vpci"] for v in interfaces] - for vpci in bound_pci: - self.connection.execute( - "%s --force -b igb_uio %s" % (dpdk_nic_bind, vpci)) - queue_wrapper = \ - QueueFileWrapper(self.q_in, self.q_out, "pipeline>") - self._vnf_process = multiprocessing.Process(target=self._run_vpe, - args=(queue_wrapper, - vnf_cfg,)) - self._vnf_process.start() - buf = [] - time.sleep(WAIT_TIME) # Give some time for config to load - while True: - message = '' - while self.q_out.qsize() > 0: - buf.append(self.q_out.get()) - message = ''.join(buf) - if "pipeline>" in message: - LOG.info("VPE VNF is up and running.") - queue_wrapper.clear() - self._resource_collect_start() - return self._vnf_process.exitcode - if "PANIC" in message: - raise RuntimeError("Error starting vPE VNF.") - - LOG.info("Waiting for VNF to start.. ") - time.sleep(3) - if not self._vnf_process.is_alive(): - raise RuntimeError("vPE VNF process died.") - - def _get_ports_gateway(self, name): - if 'routing_table' in self.vnfd['vdu'][0]: - routing_table = self.vnfd['vdu'][0]['routing_table'] - - for route in routing_table: - if name == route['if']: - return route['gateway'] - - def terminate(self): - self.execute_command("quit") - if self._vnf_process: - self._vnf_process.terminate() - - def _run_vpe(self, filewrapper, vnf_cfg): - mgmt_interface = self.vnfd["mgmt-interface"] - - self.connection = ssh.SSH.from_node(mgmt_interface) - self.connection.wait() - - interfaces = self.vnfd["vdu"][0]['external-interface'] - port0_ip = ipaddress.ip_interface(six.text_type( - "%s/%s" % (interfaces[0]["virtual-interface"]["local_ip"], - interfaces[0]["virtual-interface"]["netmask"]))) - port1_ip = ipaddress.ip_interface(six.text_type( - "%s/%s" % (interfaces[1]["virtual-interface"]["local_ip"], - interfaces[1]["virtual-interface"]["netmask"]))) - dst_port0_ip = ipaddress.ip_interface( - u"%s/%s" % (interfaces[0]["virtual-interface"]["dst_ip"], - interfaces[0]["virtual-interface"]["netmask"])) - dst_port1_ip = ipaddress.ip_interface( - u"%s/%s" % (interfaces[1]["virtual-interface"]["dst_ip"], - interfaces[1]["virtual-interface"]["netmask"])) - - vpe_vars = {"port0_local_ip": port0_ip.ip.exploded, - "port0_dst_ip": dst_port0_ip.ip.exploded, - "port0_local_ip_hex": - self._ip_to_hex(port0_ip.ip.exploded), - "port0_prefixlen": port0_ip.network.prefixlen, - "port0_netmask": port0_ip.network.netmask.exploded, - "port0_netmask_hex": - self._ip_to_hex(port0_ip.network.netmask.exploded), - "port0_local_mac": - interfaces[0]["virtual-interface"]["local_mac"], - "port0_dst_mac": - interfaces[0]["virtual-interface"]["dst_mac"], - "port0_gateway": - self._get_ports_gateway(interfaces[0]["name"]), - "port0_local_network": - port0_ip.network.network_address.exploded, - "port0_prefix": port0_ip.network.prefixlen, - "port1_local_ip": port1_ip.ip.exploded, - "port1_dst_ip": dst_port1_ip.ip.exploded, - "port1_local_ip_hex": - self._ip_to_hex(port1_ip.ip.exploded), - "port1_prefixlen": port1_ip.network.prefixlen, - "port1_netmask": port1_ip.network.netmask.exploded, - "port1_netmask_hex": - self._ip_to_hex(port1_ip.network.netmask.exploded), - "port1_local_mac": - interfaces[1]["virtual-interface"]["local_mac"], - "port1_dst_mac": - interfaces[1]["virtual-interface"]["dst_mac"], - "port1_gateway": - self._get_ports_gateway(interfaces[1]["name"]), - "port1_local_network": - port1_ip.network.network_address.exploded, - "port1_prefix": port1_ip.network.prefixlen, - "port0_local_ip6": self._get_port0localip6(), - "port1_local_ip6": self._get_port1localip6(), - "port0_prefixlen6": self._get_port0prefixlen6(), - "port1_prefixlen6": self._get_port1prefixlen6(), - "port0_gateway6": self._get_port0gateway6(), - "port1_gateway6": self._get_port1gateway6(), - "port0_dst_ip_hex6": self._get_port0localip6(), - "port1_dst_ip_hex6": self._get_port1localip6(), - "port0_dst_netmask_hex6": self._get_port0prefixlen6(), - "port1_dst_netmask_hex6": self._get_port1prefixlen6(), - "bin_path": self.bin_path, - "socket": self.socket} - - for cfg in os.listdir(vnf_cfg): - vpe_config = "" - with open(os.path.join(vnf_cfg, cfg), 'r') as vpe_cfg: - vpe_config = vpe_cfg.read() - - self._provide_config_file(cfg, vpe_config, vpe_vars) - - LOG.info("Provision and start the vPE") - tool_path = provision_tool(self.connection, - os.path.join(self.bin_path, "vPE_vnf")) - cmd = VPE_PIPELINE_COMMAND.format(cfg_file="/tmp/vpe_config", - script="/tmp/vpe_script", - tool_path=tool_path) - self.connection.run(cmd, stdin=filewrapper, stdout=filewrapper, - keep_stdin_open=True, pty=True) - - def _provide_config_file(self, prefix, template, args): - cfg, cfg_content = tempfile.mkstemp() - cfg = os.fdopen(cfg, "w+") - cfg.write(template.format(**args)) - cfg.close() - cfg_file = "/tmp/%s" % prefix - self.connection.put(cfg_content, cfg_file) - return cfg_file - - def execute_command(self, cmd): - ''' send cmd to vnf process ''' - LOG.info("VPE command: %s", cmd) - output = [] - if self.q_in: - self.q_in.put(cmd + "\r\n") - time.sleep(3) - while self.q_out.qsize() > 0: - output.append(self.q_out.get()) - return "".join(output) + def get_stats(self, *args, **kwargs): + raise NotImplementedError def collect_kpi(self): - result = self.get_stats_vpe() - collect_stats = self._collect_resource_kpi() - result["collect_stats"] = collect_stats - LOG.debug("vPE collet Kpis: %s", result) - return result - - def get_stats_vpe(self): - ''' get vpe statistics ''' - result = {'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0, - 'pkt_in_down_stream': 0, 'pkt_drop_down_stream': 0} - up_stat_commands = ['p 5 stats port in 0', 'p 5 stats port out 0', - 'p 5 stats port out 1'] - down_stat_commands = ['p 9 stats port in 0', 'p 9 stats port out 0'] - pattern = \ - "Pkts in:\\s(\\d+)\\r\\n\\tPkts dropped by " \ - "AH:\\s(\\d+)\\r\\n\\tPkts dropped by other:\\s(\\d+)" - - for cmd in up_stat_commands: - stats = self.execute_command(cmd) - match = re.search(pattern, stats, re.MULTILINE) - if match: - result["pkt_in_up_stream"] = \ - result.get("pkt_in_up_stream", 0) + int(match.group(1)) - result["pkt_drop_up_stream"] = \ - result.get("pkt_drop_up_stream", 0) + \ - int(match.group(2)) + int(match.group(3)) - - for cmd in down_stat_commands: - stats = self.execute_command(cmd) - match = re.search(pattern, stats, re.MULTILINE) - if match: - result["pkt_in_down_stream"] = \ - result.get("pkt_in_down_stream", 0) + int(match.group(1)) - result["pkt_drop_down_stream"] = \ - result.get("pkt_drop_down_stream", 0) + \ - int(match.group(2)) + int(match.group(3)) + result = { + 'pkt_in_up_stream': 0, + 'pkt_drop_up_stream': 0, + 'pkt_in_down_stream': 0, + 'pkt_drop_down_stream': 0, + 'collect_stats': self.resource_helper.collect_kpi(), + } + + indexes_in = [1] + indexes_drop = [2, 3] + command = 'p {0} stats port {1} 0' + for index, direction in ((5, 'up'), (9, 'down')): + key_in = "pkt_in_{0}_stream".format(direction) + key_drop = "pkt_drop_{0}_stream".format(direction) + for mode in ('in', 'out'): + stats = self.vnf_execute(command.format(index, mode)) + match = re.search(self.COLLECT_KPI, stats, re.MULTILINE) + if not match: + continue + result[key_in] += sum(int(match.group(x)) for x in indexes_in) + result[key_drop] += sum(int(match.group(x)) for x in indexes_drop) + + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result