+import copy
import errno
import ipaddress
import os
import sys
import re
from itertools import chain
import six
-from operator import itemgetter
+import yaml
from collections import defaultdict
from yardstick.benchmark.scenarios import base
+from yardstick.common.constants import LOG_DIR
+from yardstick.common.process import terminate_children
from yardstick.common.utils import import_modules_from_package, itersubclasses
from yardstick.common.yaml_loader import yaml_load
from yardstick.network_services.collector.subscriber import Collector
class SshManager(object):
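+    """Context manager wrapping an SSH connection to a single node."""
+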
- def __init__(self, node):
+ def __init__(self, node, timeout=120):
super(SshManager, self).__init__()
self.node = node
self.conn = None
+ self.timeout = timeout
def __enter__(self):
"""
"""
try:
self.conn = ssh.SSH.from_node(self.node)
- self.conn.wait()
+ self.conn.wait(timeout=self.timeout)
except SSHError as error:
LOG.info("connect failed to %s, due to %s", self.node["ip"], error)
# self.conn defaults to None
self.vnfs = []
self.collector = None
self.traffic_profile = None
+ self.node_netdevs = {}
def _get_ip_flow_range(self, ip_start_range):
+ # IP range is specified as 'x.x.x.x-y.y.y.y'
+ if isinstance(ip_start_range, six.string_types):
+ return ip_start_range
+
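+        # otherwise the range is given as a one-entry mapping of node name to
+        # either an interface name or an explicit range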
node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0'))
- if node_name is not None:
+ if node_name is None:
+ # we are manually specifying the range
+ ip_addr_range = range_or_interface
+ else:
node = self.context_cfg["nodes"].get(node_name, {})
try:
# the ip_range is the interface name
LOG.warning("Only single IP in range %s", ipaddr)
# fall back to single IP range
ip_addr_range = ip
- else:
- # we are manually specifying the range
- ip_addr_range = range_or_interface
return ip_addr_range
def _get_traffic_flow(self):
flow = {}
try:
+ # TODO: should be .0 or .1 so we can use list
+ # but this also roughly matches uplink_0, downlink_0
fflow = self.scenario_cfg["options"]["flow"]
for index, src in enumerate(fflow.get("src_ip", [])):
- flow["src_ip{}".format(index)] = self._get_ip_flow_range(src)
+ flow["src_ip_{}".format(index)] = self._get_ip_flow_range(src)
for index, dst in enumerate(fflow.get("dst_ip", [])):
- flow["dst_ip{}".format(index)] = self._get_ip_flow_range(dst)
+ flow["dst_ip_{}".format(index)] = self._get_ip_flow_range(dst)
-            for index, publicip in enumerate(fflow.get("publicip", [])):
-                flow["public_ip{}".format(index)] = publicip
+
+            for index, publicip in enumerate(fflow.get("public_ip", [])):
+                flow["public_ip_{}".format(index)] = publicip
+
+            for index, src_port in enumerate(fflow.get("src_port", [])):
+                flow["src_port_{}".format(index)] = src_port
+
+            for index, dst_port in enumerate(fflow.get("dst_port", [])):
+                flow["dst_port_{}".format(index)] = dst_port
flow["count"] = fflow["count"]
except KeyError:
traffic_map_data = {
'flow': self._get_traffic_flow(),
'imix': self._get_traffic_imix(),
- 'private': {},
- 'public': {},
+ TrafficProfile.UPLINK: {},
+ TrafficProfile.DOWNLINK: {},
}
traffic_vnfd = vnfdgen.generate_vnfd(traffic_mapping, traffic_map_data)
# check for xe0, xe1
intf = nodes[name]["interfaces"][if_name]
except KeyError:
- # if not xe0, then maybe vld_id, private_0, public_0
+ # if not xe0, then maybe vld_id, uplink_0, downlink_0
# pop it and re-insert with the correct name from topology
intf = nodes[name]["interfaces"].pop(vld_id)
nodes[name]["interfaces"][if_name] = intf
node0_if["node_name"] = node0_name
node1_if["node_name"] = node1_name
- vld_networks = self.get_vld_networks(self.context_cfg["networks"])
node0_if["vld_id"] = vld["id"]
node1_if["vld_id"] = vld["id"]
node1_if["peer_ifname"] = node0_if_name
# just load the network
+ vld_networks = self.get_vld_networks(self.context_cfg["networks"])
node0_if["network"] = vld_networks.get(vld["id"], {})
node1_if["network"] = vld_networks.get(vld["id"], {})
vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
self.context_cfg["nodes"][vnf_name].update(vnfd)
- @staticmethod
- def _sort_dpdk_port_num(netdevs):
- # dpdk_port_num is PCI BUS ID ordering, lowest first
- s = sorted(netdevs.values(), key=itemgetter('pci_bus_id'))
- for dpdk_port_num, netdev in enumerate(s):
- netdev['dpdk_port_num'] = dpdk_port_num
+ def _probe_netdevs(self, node, node_dict, timeout=120):
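+        # return the cached netdevs if we have already probed this node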
+ try:
+ return self.node_netdevs[node]
+ except KeyError:
+ pass
- def _probe_netdevs(self, node, node_dict):
- cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
netdevs = {}
- with SshManager(node_dict) as conn:
+ cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
+
+ with SshManager(node_dict, timeout=timeout) as conn:
if conn:
exit_status = conn.execute(cmd)[0]
if exit_status != 0:
raise IncorrectSetup(
"Cannot find netdev info in sysfs" % node)
netdevs = node_dict['netdevs'] = self.parse_netdev_info(stdout)
+
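+        # cache the result (possibly empty) so the node is not probed again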
+ self.node_netdevs[node] = netdevs
return netdevs
@classmethod
'ifindex': netdev['ifindex'],
})
+ def _generate_pod_yaml(self):
+ context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
+ # convert OrderedDict to a list
+ # pod.yaml nodes is a list
+ nodes = [self._serialize_node(node) for node in self.context_cfg["nodes"].values()]
+ pod_dict = {
+ "nodes": nodes,
+ "networks": self.context_cfg["networks"]
+ }
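+        # write the generated pod description to LOG_DIR so it can be
+        # inspected after the run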
+ with open(context_yaml, "w") as context_out:
+ yaml.safe_dump(pod_dict, context_out, default_flow_style=False,
+ explicit_start=True)
+
+ @staticmethod
+ def _serialize_node(node):
+ new_node = copy.deepcopy(node)
+ # name field is required
+ # remove context suffix
+ new_node["name"] = node['name'].split('.')[0]
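+        # pkey may be a key object rather than a plain string; convert it so
+        # it can be dumped to YAML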
+ try:
+ new_node["pkey"] = ssh.convert_key_to_str(node["pkey"])
+ except KeyError:
+ pass
+ return new_node
+
TOPOLOGY_REQUIRED_KEYS = frozenset({
"vpci", "local_ip", "netmask", "local_mac", "driver"})
:return: None. Side effect: context_cfg is updated
"""
+ num_nodes = len(self.context_cfg["nodes"])
+ # OpenStack instance creation time is probably proportional to the number
+ # of instances
+ timeout = 120 * num_nodes
for node, node_dict in self.context_cfg["nodes"].items():
for network in node_dict["interfaces"].values():
# only ssh probe if there are missing values
# ssh probe won't work on Ixia, so we had better define all our values
try:
- netdevs = self._probe_netdevs(node, node_dict)
+ netdevs = self._probe_netdevs(node, node_dict, timeout=timeout)
except (SSHError, SSHTimeout):
raise IncorrectConfig(
"Unable to probe missing interface fields '%s', on node %s "
"Require interface fields '%s' not found, topology file "
"corrupted" % ', '.join(missing))
+ # we have to generate pod.yaml here so we have vpci and driver
+ self._generate_pod_yaml()
# 3. Use topology file to find connections & resolve dest address
self._resolve_topology()
self._update_context_with_topology()
(expected_name, classes_found))
@staticmethod
- def update_interfaces_from_node(vnfd, node):
- for intf in vnfd["vdu"][0]["external-interface"]:
- node_intf = node['interfaces'][intf['name']]
- intf['virtual-interface'].update(node_intf)
+ def create_interfaces_from_node(vnfd, node):
+ ext_intfs = vnfd["vdu"][0]["external-interface"] = []
+ # have to sort so xe0 goes first
+ for intf_name, intf in sorted(node['interfaces'].items()):
+ # only interfaces with vld_id are added.
+            # Thus there are two layers of filters, only interfaces with vld_id
+ # show up in interfaces, and only interfaces with traffic profiles
+ # are used by the generators
+ if intf.get('vld_id'):
+                # force dpdk_port_num to int so we can do a reverse lookup
+ try:
+ intf['dpdk_port_num'] = int(intf['dpdk_port_num'])
+ except KeyError:
+ pass
+ ext_intf = {
+ "name": intf_name,
+ "virtual-interface": intf,
+ "vnfd-connection-point-ref": intf_name,
+ }
+ ext_intfs.append(ext_intf)
def load_vnf_models(self, scenario_cfg=None, context_cfg=None):
""" Create VNF objects based on YAML descriptors
        # we assume OrderedDict for consistency in instantiation
for node_name, node in context_cfg["nodes"].items():
LOG.debug(node)
- file_name = node["VNF model"]
+ try:
+ file_name = node["VNF model"]
+ except KeyError:
+ LOG.debug("no model for %s, skipping", node_name)
+ continue
file_path = scenario_cfg['task_path']
with open_relative_file(file_name, file_path) as stream:
vnf_model = stream.read()
vnfd = vnfdgen.generate_vnfd(vnf_model, node)
# TODO: here add extra context_cfg["nodes"] regardless of template
vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0]
- self.update_interfaces_from_node(vnfd, node)
+ # force inject pkey if it exists
+ # we want to standardize Heat using pkey as a string so we don't rely
+ # on the filesystem
+ try:
+ vnfd['mgmt-interface']['pkey'] = node['pkey']
+ except KeyError:
+ pass
+ self.create_interfaces_from_node(vnfd, node)
vnf_impl = self.get_vnf_impl(vnfd['id'])
vnf_instance = vnf_impl(node_name, vnfd)
vnfs.append(vnf_instance)
vnf.instantiate(self.scenario_cfg, self.context_cfg)
LOG.info("Waiting for %s to instantiate", vnf.name)
vnf.wait_for_instantiate()
- except RuntimeError:
+        except Exception:
+            LOG.exception("VNF instantiation failed, terminating all VNFs")
for vnf in self.vnfs:
vnf.terminate()
raise
traffic_gen.listen_traffic(self.traffic_profile)
# register collector with yardstick for KPI collection.
- self.collector = Collector(self.vnfs, self.traffic_profile)
+ self.collector = Collector(self.vnfs, self.context_cfg["nodes"], self.traffic_profile)
self.collector.start()
# Start the actual traffic
:return: None
"""
- for vnf in self.vnfs:
- # Result example:
- # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }}
- LOG.debug("collect KPI for %s", vnf.name)
- result.update(self.collector.get_kpi(vnf))
+        # this is the only method that is checked by the runner
+        # so if we have any fatal error, it must be raised via this method
+ # otherwise we will not terminate
+
+ result.update(self.collector.get_kpi())
def teardown(self):
""" Stop the collector and terminate VNF & TG instance
:return
"""
- self.collector.stop()
- for vnf in self.vnfs:
- LOG.info("Stopping %s", vnf.name)
- vnf.terminate()
+ try:
+ try:
+ self.collector.stop()
+ for vnf in self.vnfs:
+ LOG.info("Stopping %s", vnf.name)
+ vnf.terminate()
+ LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
+ finally:
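+            # make sure any leftover child processes are cleaned up, even if
+            # VNF termination failed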
+ terminate_children()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+        LOG.exception("Error in teardown")
+ raise RuntimeError("Error in teardown")