X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fbenchmark%2Fscenarios%2Fnetworking%2Fvnf_generic.py;h=b94bfc9abe66b572f7aa041c9d1f62ce83209ca8;hb=0c0ba9bff5e54d02b924fe1bfe30c68190a24956;hp=aecc228bc27ae203e66ce97a95b9c3390774cc46;hpb=ffecd5b84eecdb74d7534b801c1359251c34b34e;p=yardstick.git diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py index aecc228bc..b94bfc9ab 100644 --- a/yardstick/benchmark/scenarios/networking/vnf_generic.py +++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py @@ -19,16 +19,20 @@ import logging import errno import ipaddress + +import copy import os import sys import re from itertools import chain import six -from operator import itemgetter +import yaml from collections import defaultdict from yardstick.benchmark.scenarios import base +from yardstick.common.constants import LOG_DIR +from yardstick.common.process import terminate_children from yardstick.common.utils import import_modules_from_package, itersubclasses from yardstick.common.yaml_loader import yaml_load from yardstick.network_services.collector.subscriber import Collector @@ -63,10 +67,11 @@ class IncorrectSetup(Exception): class SshManager(object): - def __init__(self, node): + def __init__(self, node, timeout=120): super(SshManager, self).__init__() self.node = node self.conn = None + self.timeout = timeout def __enter__(self): """ @@ -75,7 +80,7 @@ class SshManager(object): """ try: self.conn = ssh.SSH.from_node(self.node) - self.conn.wait() + self.conn.wait(timeout=self.timeout) except SSHError as error: LOG.info("connect failed to %s, due to %s", self.node["ip"], error) # self.conn defaults to None @@ -87,19 +92,22 @@ class SshManager(object): def find_relative_file(path, task_path): + """ + Find file in one of places: in abs of path or + relative to TC scenario file. In this order. 
+
+    :param path:
+    :param task_path:
+    :return str: full path to file
+    """
     # fixme: create schema to validate all fields have been provided
-    try:
-        with open(path):
+    for lookup in [os.path.abspath(path), os.path.join(task_path, path)]:
+        try:
+            with open(lookup):
+                return lookup
+        except IOError:
             pass
-        return path
-    except IOError as e:
-        if e.errno != errno.ENOENT:
-            raise
-        else:
-            rel_path = os.path.join(task_path, path)
-            with open(rel_path):
-                pass
-            return rel_path
+    raise IOError(errno.ENOENT, 'Unable to find {} file'.format(path))
 
 
 def open_relative_file(path, task_path):
@@ -131,11 +139,19 @@ class NetworkServiceTestCase(base.Scenario):
         self.vnfs = []
         self.collector = None
         self.traffic_profile = None
+        self.node_netdevs = {}
 
     def _get_ip_flow_range(self, ip_start_range):
+        # IP range is specified as 'x.x.x.x-y.y.y.y'
+        if isinstance(ip_start_range, six.string_types):
+            return ip_start_range
+
         node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0'))
-        if node_name is not None:
+        if node_name is None:
+            # we are manually specifying the range
+            ip_addr_range = range_or_interface
+        else:
             node = self.context_cfg["nodes"].get(node_name, {})
             try:
                 # the ip_range is the interface name
@@ -150,24 +166,37 @@ class NetworkServiceTestCase(base.Scenario):
                 ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)), strict=False)
                 hosts = list(ipaddr.hosts())
-                ip_addr_range = "{}-{}".format(hosts[0], hosts[-1])
-        else:
-            # we are manually specifying the range
-            ip_addr_range = range_or_interface
+                if len(hosts) > 2:
+                    # skip the first host in case of gateway
+                    ip_addr_range = "{}-{}".format(hosts[1], hosts[-1])
+                else:
+                    LOG.warning("Only single IP in range %s", ipaddr)
+                    # fall back to single IP range
+                    ip_addr_range = ip
 
         return ip_addr_range
 
     def _get_traffic_flow(self):
         flow = {}
         try:
+            # TODO: should be .0 or .1 so we can use list
+            # but this also roughly matches uplink_0, downlink_0
             fflow = self.scenario_cfg["options"]["flow"]
             for index, src in enumerate(fflow.get("src_ip", [])):
-                flow["src_ip{}".format(index)] = self._get_ip_flow_range(src)
+                flow["src_ip_{}".format(index)] = self._get_ip_flow_range(src)
 
             for index, dst in enumerate(fflow.get("dst_ip", [])):
-                flow["dst_ip{}".format(index)] = self._get_ip_flow_range(dst)
+                flow["dst_ip_{}".format(index)] = self._get_ip_flow_range(dst)
+
+            for index, publicip in enumerate(fflow.get("public_ip", [])):
+                flow["public_ip_{}".format(index)] = publicip
+
+            for index, src_port in enumerate(fflow.get("src_port", [])):
+                flow["src_port_{}".format(index)] = src_port
+
+            for index, dst_port in enumerate(fflow.get("dst_port", [])):
+                flow["dst_port_{}".format(index)] = dst_port
 
-            for index, publicip in enumerate(fflow.get("publicip", [])):
-                flow["public_ip{}".format(index)] = publicip
+            flow["count"] = fflow["count"]
         except KeyError:
             flow = {}
         return {"flow": flow}
@@ -190,8 +219,8 @@ class NetworkServiceTestCase(base.Scenario):
         traffic_map_data = {
             'flow': self._get_traffic_flow(),
             'imix': self._get_traffic_imix(),
-            'private': {},
-            'public': {},
+            TrafficProfile.UPLINK: {},
+            TrafficProfile.DOWNLINK: {},
         }
 
         traffic_vnfd = vnfdgen.generate_vnfd(traffic_mapping, traffic_map_data)
@@ -205,7 +234,26 @@ class NetworkServiceTestCase(base.Scenario):
 
     @staticmethod
     def get_vld_networks(networks):
-        return {n['vld_id']: n for n in networks.values()}
+        # network name is vld_id
+        vld_map = {}
+        for name, n in networks.items():
+            try:
+                vld_map[n['vld_id']] = n
+            except KeyError:
+                vld_map[name] = n
+        return vld_map
+
+    @staticmethod
+    def find_node_if(nodes, name, if_name, vld_id):
+        try:
+            # check for xe0, xe1
+            intf = nodes[name]["interfaces"][if_name]
+        except KeyError:
+            # if not xe0, then maybe vld_id, uplink_0, downlink_0
+            # pop it and re-insert with the correct name from topology
+            intf = nodes[name]["interfaces"].pop(vld_id)
+            nodes[name]["interfaces"][if_name] = intf
+        return intf
 
     def _resolve_topology(self):
         for vld in self.topology["vld"]:
@@ -223,8 +271,8 @@ class NetworkServiceTestCase(base.Scenario):
 
             try:
                 nodes = self.context_cfg["nodes"]
-                node0_if = nodes[node0_name]["interfaces"][node0_if_name]
-                node1_if = nodes[node1_name]["interfaces"][node1_if_name]
+                node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
+                node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])
 
                 # names so we can do reverse lookups
                 node0_if["ifname"] = node0_if_name
@@ -233,7 +281,6 @@ class NetworkServiceTestCase(base.Scenario):
                 node0_if["node_name"] = node0_name
                 node1_if["node_name"] = node1_name
 
-                vld_networks = self.get_vld_networks(self.context_cfg["networks"])
                 node0_if["vld_id"] = vld["id"]
                 node1_if["vld_id"] = vld["id"]
@@ -246,6 +293,7 @@ class NetworkServiceTestCase(base.Scenario):
                 node1_if["peer_ifname"] = node0_if_name
 
                 # just load the network
+                vld_networks = self.get_vld_networks(self.context_cfg["networks"])
                 node0_if["network"] = vld_networks.get(vld["id"], {})
                 node1_if["network"] = vld_networks.get(vld["id"], {})
@@ -274,8 +322,8 @@ class NetworkServiceTestCase(base.Scenario):
             node1_if_name = node1_data["vnfd-connection-point-ref"]
 
             nodes = self.context_cfg["nodes"]
-            node0_if = nodes[node0_name]["interfaces"][node0_if_name]
-            node1_if = nodes[node1_name]["interfaces"][node1_if_name]
+            node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
+            node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])
 
             # add peer interface dict, but remove circular link
             # TODO: don't waste memory
@@ -295,17 +343,16 @@ class NetworkServiceTestCase(base.Scenario):
             vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
             self.context_cfg["nodes"][vnf_name].update(vnfd)
 
-    @staticmethod
-    def _sort_dpdk_port_num(netdevs):
-        # dpdk_port_num is PCI BUS ID ordering, lowest first
-        s = sorted(netdevs.values(), key=itemgetter('pci_bus_id'))
-        for dpdk_port_num, netdev in enumerate(s):
-            netdev['dpdk_port_num'] = dpdk_port_num
+    def _probe_netdevs(self, node, node_dict, timeout=120):
+        try:
+            return self.node_netdevs[node]
+        except KeyError:
+            pass
 
-    def _probe_netdevs(self, node, node_dict):
-        cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
         netdevs = {}
-        with SshManager(node_dict) as conn:
+        cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
+
+        with SshManager(node_dict, timeout=timeout) as conn:
             if conn:
                 exit_status = conn.execute(cmd)[0]
                 if exit_status != 0:
@@ -316,6 +363,8 @@ class NetworkServiceTestCase(base.Scenario):
                     raise IncorrectSetup(
                         "Cannot find netdev info in sysfs" % node)
                 netdevs = node_dict['netdevs'] = self.parse_netdev_info(stdout)
+
+        self.node_netdevs[node] = netdevs
         return netdevs
 
     @classmethod
@@ -331,6 +380,31 @@ class NetworkServiceTestCase(base.Scenario):
                 'ifindex': netdev['ifindex'],
             })
 
+    def _generate_pod_yaml(self):
+        context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
+        # convert OrderedDict to a list
+        # pod.yaml nodes is a list
+        nodes = [self._serialize_node(node) for node in self.context_cfg["nodes"].values()]
+        pod_dict = {
+            "nodes": nodes,
+            "networks": self.context_cfg["networks"]
+        }
+        with open(context_yaml, "w") as context_out:
+            yaml.safe_dump(pod_dict, context_out, default_flow_style=False,
+                           explicit_start=True)
+
+    @staticmethod
+    def _serialize_node(node):
+        new_node = copy.deepcopy(node)
+        # name field is required
+        # remove context suffix
+        new_node["name"] = node['name'].split('.')[0]
+        try:
+            new_node["pkey"] = ssh.convert_key_to_str(node["pkey"])
+        except KeyError:
+            pass
+        return new_node
+
     TOPOLOGY_REQUIRED_KEYS = frozenset({
         "vpci", "local_ip", "netmask", "local_mac", "driver"})
 
@@ -340,6 +414,10 @@ class NetworkServiceTestCase(base.Scenario):
 
         :return: None. Side effect: context_cfg is updated
         """
+        num_nodes = len(self.context_cfg["nodes"])
+        # OpenStack instance creation time is probably proportional to the number
+        # of instances
+        timeout = 120 * num_nodes
 
         for node, node_dict in self.context_cfg["nodes"].items():
 
             for network in node_dict["interfaces"].values():
@@ -350,7 +428,7 @@ class NetworkServiceTestCase(base.Scenario):
                 # only ssh probe if there are missing values
                 # ssh probe won't work on Ixia, so we had better define all our values
                 try:
-                    netdevs = self._probe_netdevs(node, node_dict)
+                    netdevs = self._probe_netdevs(node, node_dict, timeout=timeout)
                 except (SSHError, SSHTimeout):
                     raise IncorrectConfig(
                         "Unable to probe missing interface fields '%s', on node %s "
@@ -367,6 +445,8 @@ class NetworkServiceTestCase(base.Scenario):
                     "Require interface fields '%s' not found, topology file "
                     "corrupted" % ', '.join(missing))
 
+        # we have to generate pod.yaml here so we have vpci and driver
+        self._generate_pod_yaml()
         # 3. Use topology file to find connections & resolve dest address
         self._resolve_topology()
         self._update_context_with_topology()
@@ -428,10 +508,26 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
                 (expected_name, classes_found))
 
     @staticmethod
-    def update_interfaces_from_node(vnfd, node):
-        for intf in vnfd["vdu"][0]["external-interface"]:
-            node_intf = node['interfaces'][intf['name']]
-            intf['virtual-interface'].update(node_intf)
+    def create_interfaces_from_node(vnfd, node):
+        ext_intfs = vnfd["vdu"][0]["external-interface"] = []
+        # have to sort so xe0 goes first
+        for intf_name, intf in sorted(node['interfaces'].items()):
+            # only interfaces with vld_id are added.
+ # Thus there are two layers of filters, only intefaces with vld_id + # show up in interfaces, and only interfaces with traffic profiles + # are used by the generators + if intf.get('vld_id'): + # force dpkd_port_num to int so we can do reverse lookup + try: + intf['dpdk_port_num'] = int(intf['dpdk_port_num']) + except KeyError: + pass + ext_intf = { + "name": intf_name, + "virtual-interface": intf, + "vnfd-connection-point-ref": intf_name, + } + ext_intfs.append(ext_intf) def load_vnf_models(self, scenario_cfg=None, context_cfg=None): """ Create VNF objects based on YAML descriptors @@ -454,14 +550,25 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ # we assume OrderedDict for consistenct in instantiation for node_name, node in context_cfg["nodes"].items(): LOG.debug(node) - file_name = node["VNF model"] + try: + file_name = node["VNF model"] + except KeyError: + LOG.debug("no model for %s, skipping", node_name) + continue file_path = scenario_cfg['task_path'] with open_relative_file(file_name, file_path) as stream: vnf_model = stream.read() vnfd = vnfdgen.generate_vnfd(vnf_model, node) # TODO: here add extra context_cfg["nodes"] regardless of template vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0] - self.update_interfaces_from_node(vnfd, node) + # force inject pkey if it exists + # we want to standardize Heat using pkey as a string so we don't rely + # on the filesystem + try: + vnfd['mgmt-interface']['pkey'] = node['pkey'] + except KeyError: + pass + self.create_interfaces_from_node(vnfd, node) vnf_impl = self.get_vnf_impl(vnfd['id']) vnf_instance = vnf_impl(node_name, vnfd) vnfs.append(vnf_instance) @@ -493,7 +600,8 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ vnf.instantiate(self.scenario_cfg, self.context_cfg) LOG.info("Waiting for %s to instantiate", vnf.name) vnf.wait_for_instantiate() - except RuntimeError: + except: + LOG.exception("") for vnf in self.vnfs: vnf.terminate() raise @@ -504,7 +612,7 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ traffic_gen.listen_traffic(self.traffic_profile) # register collector with yardstick for KPI collection. 
- self.collector = Collector(self.vnfs, self.traffic_profile) + self.collector = Collector(self.vnfs, self.context_cfg["nodes"], self.traffic_profile) self.collector.start() # Start the actual traffic @@ -520,11 +628,11 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ :return: None """ - for vnf in self.vnfs: - # Result example: - # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }} - LOG.debug("vnf") - result.update(self.collector.get_kpi(vnf)) + # this is the only method that is check from the runner + # so if we have any fatal error it must be raised via these methods + # otherwise we will not terminate + + result.update(self.collector.get_kpi()) def teardown(self): """ Stop the collector and terminate VNF & TG instance @@ -532,7 +640,19 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ :return """ - self.collector.stop() - for vnf in self.vnfs: - LOG.info("Stopping %s", vnf.name) - vnf.terminate() + try: + try: + self.collector.stop() + for vnf in self.vnfs: + LOG.info("Stopping %s", vnf.name) + vnf.terminate() + LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs)) + finally: + terminate_children() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise RuntimeError("Error in teardown")
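
For reference, a minimal standalone sketch (not part of the patch) of the new find_relative_file() lookup order: the absolute form of the path is tried first, then the path joined to the task (TC scenario) directory, and an IOError with ENOENT is raised only when neither can be opened. The file and directory names in the usage line are made up.

import errno
import os


def find_relative_file(path, task_path):
    # try the absolute path first, then the path relative to the task file
    for lookup in [os.path.abspath(path), os.path.join(task_path, path)]:
        try:
            with open(lookup):
                return lookup
        except IOError:
            pass
    raise IOError(errno.ENOENT, 'Unable to find {} file'.format(path))


# hypothetical usage: a VNF model referenced from a test-case directory
try:
    print(find_relative_file('vnfd.yaml', '/home/opnfv/yardstick/samples'))
except IOError as err:
    print(err)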
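
A second sketch, also illustrative only, of how the reworked _get_traffic_flow() names its keys: indices are now separated from the field name by an underscore (src_ip_0 rather than src_ip0), the public address list is read from "public_ip" instead of "publicip", and src_port, dst_port and count are carried through as well. The scenario values below are invented.

# invented scenario "options.flow" section
fflow = {
    "src_ip": ["10.0.2.1-10.0.2.254"],
    "dst_ip": ["10.0.3.1-10.0.3.254"],
    "public_ip": ["10.10.10.10"],
    "src_port": [1234],
    "dst_port": [2001],
    "count": 1,
}

flow = {}
for index, src in enumerate(fflow.get("src_ip", [])):
    flow["src_ip_{}".format(index)] = src    # ranges go through _get_ip_flow_range() in the real code
for index, dst in enumerate(fflow.get("dst_ip", [])):
    flow["dst_ip_{}".format(index)] = dst
for index, public_ip in enumerate(fflow.get("public_ip", [])):
    flow["public_ip_{}".format(index)] = public_ip
for index, src_port in enumerate(fflow.get("src_port", [])):
    flow["src_port_{}".format(index)] = src_port
for index, dst_port in enumerate(fflow.get("dst_port", [])):
    flow["dst_port_{}".format(index)] = dst_port
flow["count"] = fflow["count"]

# e.g. {'flow': {'src_ip_0': '10.0.2.1-10.0.2.254', 'dst_ip_0': '10.0.3.1-10.0.3.254',
#                'public_ip_0': '10.10.10.10', 'src_port_0': 1234, 'dst_port_0': 2001, 'count': 1}}
print({"flow": flow})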
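
Finally, a toy example (again not part of the patch) of the find_node_if() fallback: when a node's interface is keyed by its vld_id (for example "uplink_0") rather than by the local name used in the topology (for example "xe0"), the entry is popped and re-inserted under the topology name so that later lookups by name succeed. The node dictionary is fabricated.

def find_node_if(nodes, name, if_name, vld_id):
    try:
        # check for the topology interface name first, e.g. xe0
        intf = nodes[name]["interfaces"][if_name]
    except KeyError:
        # otherwise assume the interface is keyed by vld_id and re-key it
        intf = nodes[name]["interfaces"].pop(vld_id)
        nodes[name]["interfaces"][if_name] = intf
    return intf


nodes = {"tg__0": {"interfaces": {"uplink_0": {"local_ip": "10.1.1.1"}}}}
print(find_node_if(nodes, "tg__0", "xe0", "uplink_0"))   # {'local_ip': '10.1.1.1'}
print(list(nodes["tg__0"]["interfaces"]))                # ['xe0']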