# Copyright (c) 2016-2017 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ NSPerf specific scenario definition """ from __future__ import absolute_import import logging import errno import os import re from itertools import chain from operator import itemgetter from collections import defaultdict from yardstick.benchmark.scenarios import base from yardstick.common.utils import import_modules_from_package, itersubclasses from yardstick.common.yaml_loader import yaml_load from yardstick.network_services.collector.subscriber import Collector from yardstick.network_services.vnf_generic import vnfdgen from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.traffic_profile.base import TrafficProfile from yardstick import ssh LOG = logging.getLogger(__name__) class SSHError(Exception): """Class handles ssh connection error exception""" pass class SSHTimeout(SSHError): """Class handles ssh connection timeout exception""" pass class IncorrectConfig(Exception): """Class handles incorrect configuration during setup""" pass class IncorrectSetup(Exception): """Class handles incorrect setup during setup""" pass class SshManager(object): def __init__(self, node): super(SshManager, self).__init__() self.node = node self.conn = None def __enter__(self): """ args -> network device mappings returns -> ssh connection ready to be used """ try: self.conn = ssh.SSH.from_node(self.node) self.conn.wait() except SSHError as error: LOG.info("connect failed to %s, due to %s", self.node["ip"], error) # self.conn defaults to None return self.conn def __exit__(self, exc_type, exc_val, exc_tb): if self.conn: self.conn.close() def find_relative_file(path, task_path): # fixme: create schema to validate all fields have been provided try: with open(path): pass return path except IOError as e: if e.errno != errno.ENOENT: raise else: rel_path = os.path.join(task_path, path) with open(rel_path): pass return rel_path def open_relative_file(path, task_path): try: return open(path) except IOError as e: if e.errno == errno.ENOENT: return open(os.path.join(task_path, path)) raise class NetworkServiceTestCase(base.Scenario): """Class handles Generic framework to do pre-deployment VNF & Network service testing """ __scenario_type__ = "NSPerf" def __init__(self, scenario_cfg, context_cfg): # Yardstick API super(NetworkServiceTestCase, self).__init__() self.scenario_cfg = scenario_cfg self.context_cfg = context_cfg # fixme: create schema to validate all fields have been provided with open_relative_file(scenario_cfg["topology"], scenario_cfg['task_path']) as stream: topology_yaml = yaml_load(stream) self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0] self.vnfs = [] self.collector = None self.traffic_profile = None def _get_traffic_flow(self): try: with open(self.scenario_cfg["traffic_options"]["flow"]) as fflow: flow = yaml_load(fflow) except (KeyError, IOError, OSError): flow = {} return flow def _get_traffic_imix(self): try: with open(self.scenario_cfg["traffic_options"]["imix"]) as fimix: imix = yaml_load(fimix) except (KeyError, IOError, OSError): imix = {} return imix def _get_traffic_profile(self): profile = self.scenario_cfg["traffic_profile"] path = self.scenario_cfg["task_path"] with open_relative_file(profile, path) as infile: return infile.read() def _fill_traffic_profile(self): traffic_mapping = self._get_traffic_profile() traffic_map_data = { 'flow': self._get_traffic_flow(), 'imix': self._get_traffic_imix(), 'private': {}, 'public': {}, } traffic_vnfd = vnfdgen.generate_vnfd(traffic_mapping, traffic_map_data) self.traffic_profile = TrafficProfile.get(traffic_vnfd) return self.traffic_profile def _find_vnf_name_from_id(self, vnf_id): return next((vnfd["vnfd-id-ref"] for vnfd in self.topology["constituent-vnfd"] if vnf_id == vnfd["member-vnf-index"]), None) @staticmethod def get_vld_networks(networks): return {n['vld_id']: n for n in networks.values()} def _resolve_topology(self): for vld in self.topology["vld"]: try: node0_data, node1_data = vld["vnfd-connection-point-ref"] except (ValueError, TypeError): raise IncorrectConfig("Topology file corrupted, " "wrong endpoint count for connection") node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"]) node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"]) node0_if_name = node0_data["vnfd-connection-point-ref"] node1_if_name = node1_data["vnfd-connection-point-ref"] try: nodes = self.context_cfg["nodes"] node0_if = nodes[node0_name]["interfaces"][node0_if_name] node1_if = nodes[node1_name]["interfaces"][node1_if_name] # names so we can do reverse lookups node0_if["ifname"] = node0_if_name node1_if["ifname"] = node1_if_name node0_if["node_name"] = node0_name node1_if["node_name"] = node1_name vld_networks = self.get_vld_networks(self.context_cfg["networks"]) node0_if["vld_id"] = vld["id"] node1_if["vld_id"] = vld["id"] # set peer name node0_if["peer_name"] = node1_name node1_if["peer_name"] = node0_name # set peer interface name node0_if["peer_ifname"] = node1_if_name node1_if["peer_ifname"] = node0_if_name # just load the network node0_if["network"] = vld_networks.get(vld["id"], {}) node1_if["network"] = vld_networks.get(vld["id"], {}) node0_if["dst_mac"] = node1_if["local_mac"] node0_if["dst_ip"] = node1_if["local_ip"] node1_if["dst_mac"] = node0_if["local_mac"] node1_if["dst_ip"] = node0_if["local_ip"] except KeyError: LOG.exception("") raise IncorrectConfig("Required interface not found, " "topology file corrupted") for vld in self.topology['vld']: try: node0_data, node1_data = vld["vnfd-connection-point-ref"] except (ValueError, TypeError): raise IncorrectConfig("Topology file corrupted, " "wrong endpoint count for connection") node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"]) node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"]) node0_if_name = node0_data["vnfd-connection-point-ref"] node1_if_name = node1_data["vnfd-connection-point-ref"] nodes = self.context_cfg["nodes"] node0_if = nodes[node0_name]["interfaces"][node0_if_name] node1_if = nodes[node1_name]["interfaces"][node1_if_name] # add peer interface dict, but remove circular link # TODO: don't waste memory node0_copy = node0_if.copy() node1_copy = node1_if.copy() node0_if["peer_intf"] = node1_copy node1_if["peer_intf"] = node0_copy def _find_vnfd_from_vnf_idx(self, vnf_idx): return next((vnfd for vnfd in self.topology["constituent-vnfd"] if vnf_idx == vnfd["member-vnf-index"]), None) def _update_context_with_topology(self): for vnfd in self.topology["constituent-vnfd"]: vnf_idx = vnfd["member-vnf-index"] vnf_name = self._find_vnf_name_from_id(vnf_idx) vnfd = self._find_vnfd_from_vnf_idx(vnf_idx) self.context_cfg["nodes"][vnf_name].update(vnfd) @staticmethod def _sort_dpdk_port_num(netdevs): # dpdk_port_num is PCI BUS ID ordering, lowest first s = sorted(netdevs.values(), key=itemgetter('pci_bus_id')) for dpdk_port_num, netdev in enumerate(s): netdev['dpdk_port_num'] = dpdk_port_num @classmethod def _probe_missing_values(cls, netdevs, network, missing): mac_lower = network['local_mac'].lower() for netdev in netdevs.values(): if netdev['address'].lower() != mac_lower: continue network.update({ 'driver': netdev['driver'], 'vpci': netdev['pci_bus_id'], 'ifindex': netdev['ifindex'], }) TOPOLOGY_REQUIRED_KEYS = frozenset({ "vpci", "local_ip", "netmask", "local_mac", "driver"}) def map_topology_to_infrastructure(self): """ This method should verify if the available resources defined in pod.yaml match the topology.yaml file. :return: None. Side effect: context_cfg is updated """ for node, node_dict in self.context_cfg["nodes"].items(): cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show" with SshManager(node_dict) as conn: exit_status = conn.execute(cmd)[0] if exit_status != 0: raise IncorrectSetup("Node's %s lacks ip tool." % node) exit_status, stdout, _ = conn.execute( self.FIND_NETDEVICE_STRING) if exit_status != 0: raise IncorrectSetup( "Cannot find netdev info in sysfs" % node) netdevs = node_dict['netdevs'] = self.parse_netdev_info( stdout) for network in node_dict["interfaces"].values(): missing = self.TOPOLOGY_REQUIRED_KEYS.difference(network) if not missing: continue try: self._probe_missing_values(netdevs, network, missing) except KeyError: pass else: missing = self.TOPOLOGY_REQUIRED_KEYS.difference( network) if missing: raise IncorrectConfig( "Require interface fields '%s' not found, topology file " "corrupted" % ', '.join(missing)) # 3. Use topology file to find connections & resolve dest address self._resolve_topology() self._update_context_with_topology() FIND_NETDEVICE_STRING = r"""find /sys/devices/pci* -type d -name net -exec sh -c '{ grep -sH ^ \ $1/ifindex $1/address $1/operstate $1/device/vendor $1/device/device \ $1/device/subsystem_vendor $1/device/subsystem_device ; \ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ ' sh \{\}/* \; """ BASE_ADAPTER_RE = re.compile( '^/sys/devices/(.*)/net/([^/]*)/([^:]*):(.*)$', re.M) @classmethod def parse_netdev_info(cls, stdout): network_devices = defaultdict(dict) matches = cls.BASE_ADAPTER_RE.findall(stdout) for bus_path, interface_name, name, value in matches: dirname, bus_id = os.path.split(bus_path) if 'virtio' in bus_id: # for some stupid reason VMs include virtio1/ # in PCI device path bus_id = os.path.basename(dirname) # remove extra 'device/' from 'device/vendor, # device/subsystem_vendor', etc. if 'device/' in name: name = name.split('/')[1] network_devices[interface_name][name] = value network_devices[interface_name][ 'interface_name'] = interface_name network_devices[interface_name]['pci_bus_id'] = bus_id # convert back to regular dict return dict(network_devices) @classmethod def get_vnf_impl(cls, vnf_model_id): """ Find the implementing class from vnf_model["vnf"]["name"] field :param vnf_model_id: parsed vnfd model ID field :return: subclass of GenericVNF """ import_modules_from_package( "yardstick.network_services.vnf_generic.vnf") expected_name = vnf_model_id classes_found = [] def impl(): for name, class_ in ((c.__name__, c) for c in itersubclasses(GenericVNF)): if name == expected_name: yield class_ classes_found.append(name) try: return next(impl()) except StopIteration: pass raise IncorrectConfig("No implementation for %s found in %s" % (expected_name, classes_found)) @staticmethod def update_interfaces_from_node(vnfd, node): for intf in vnfd["vdu"][0]["external-interface"]: node_intf = node['interfaces'][intf['name']] intf['virtual-interface'].update(node_intf) def load_vnf_models(self, scenario_cfg=None, context_cfg=None): """ Create VNF objects based on YAML descriptors :param scenario_cfg: :type scenario_cfg: :param context_cfg: :return: """ if scenario_cfg is None: scenario_cfg = self.scenario_cfg if context_cfg is None: context_cfg = self.context_cfg vnfs = [] # we assume OrderedDict for consistenct in instantiation for node_name, node in context_cfg["nodes"].items(): LOG.debug(node) file_name = node["VNF model"] file_path = scenario_cfg['task_path'] with open_relative_file(file_name, file_path) as stream: vnf_model = stream.read() vnfd = vnfdgen.generate_vnfd(vnf_model, node) # TODO: here add extra context_cfg["nodes"] regardless of template vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0] self.update_interfaces_from_node(vnfd, node) vnf_impl = self.get_vnf_impl(vnfd['id']) vnf_instance = vnf_impl(node_name, vnfd) vnfs.append(vnf_instance) self.vnfs = vnfs return vnfs def setup(self): """ Setup infrastructure, provission VNFs & start traffic :return: """ # 1. Verify if infrastructure mapping can meet topology self.map_topology_to_infrastructure() # 1a. Load VNF models self.load_vnf_models() # 1b. Fill traffic profile with information from topology self._fill_traffic_profile() # 2. Provision VNFs # link events will cause VNF application to exit # so we should start traffic runners before VNFs traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic] non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic] try: for vnf in chain(traffic_runners, non_traffic_runners): LOG.info("Instantiating %s", vnf.name) vnf.instantiate(self.scenario_cfg, self.context_cfg) for vnf in chain(traffic_runners, non_traffic_runners): LOG.info("Waiting for %s to instantiate", vnf.name) vnf.wait_for_instantiate() except RuntimeError: for vnf in self.vnfs: vnf.terminate() raise # 3. Run experiment # Start listeners first to avoid losing packets for traffic_gen in traffic_runners: traffic_gen.listen_traffic(self.traffic_profile) # register collector with yardstick for KPI collection. self.collector = Collector(self.vnfs, self.traffic_profile) self.collector.start() # Start the actual traffic for traffic_gen in traffic_runners: LOG.info("Starting traffic on %s", traffic_gen.name) traffic_gen.run_traffic(self.traffic_profile) def run(self, result): # yardstick API """ Yardstick calls run() at intervals defined in the yaml and produces timestamped samples :param result: dictionary with results to update :return: None """ for vnf in self.vnfs: # Result example: # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }} LOG.debug("vnf") result.update(self.collector.get_kpi(vnf)) def teardown(self): """ Stop the collector and terminate VNF & TG instance :return """ self.collector.stop() for vnf in self.vnfs: LOG.info("Stopping %s", vnf.name) vnf.terminate()