__scenario_type__ = "NSPerf"
- def __init__(self, scenario_cfg, context_cfg): # Yardstick API
+ def __init__(self, scenario_cfg, context_cfg): # pragma: no cover
super(NetworkServiceTestCase, self).__init__()
self.scenario_cfg = scenario_cfg
self.context_cfg = context_cfg
self.traffic_profile = None
self.node_netdevs = {}
self.bin_path = get_nsb_option('bin_path', '')
+ self._mq_ids = []
+
+ def is_ended(self):
+ return self.traffic_profile is not None and self.traffic_profile.is_ended()
def _get_ip_flow_range(self, ip_start_range):
+ """Retrieve a CIDR first and last viable IPs
- # IP range is specified as 'x.x.x.x-y.y.y.y'
+ :param ip_start_range: could be the IP range itself or a dictionary
+ with the host name and the port.
+ :return: (str) IP range (min, max) with this format "x.x.x.x-y.y.y.y"
+ """
if isinstance(ip_start_range, six.string_types):
return ip_start_range
- node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0'))
+ node_name, range_or_interface = next(iter(ip_start_range.items()),
+ (None, '0.0.0.0'))
if node_name is None:
- # we are manually specifying the range
- ip_addr_range = range_or_interface
+ return range_or_interface
+
+ node = self.context_cfg['nodes'].get(node_name, {})
+ interface = node.get('interfaces', {}).get(range_or_interface)
+ if interface:
+ ip = interface['local_ip']
+ mask = interface['netmask']
else:
- node = self.context_cfg["nodes"].get(node_name, {})
- try:
- # the ip_range is the interface name
- interface = node.get("interfaces", {})[range_or_interface]
- except KeyError:
- ip = "0.0.0.0"
- mask = "255.255.255.0"
- else:
- ip = interface["local_ip"]
- # we can't default these values, they must both exist to be valid
- mask = interface["netmask"]
-
- ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)), strict=False)
- hosts = list(ipaddr.hosts())
- if len(hosts) > 2:
- # skip the first host in case of gateway
- ip_addr_range = "{}-{}".format(hosts[1], hosts[-1])
- else:
- LOG.warning("Only single IP in range %s", ipaddr)
- # fall back to single IP range
- ip_addr_range = ip
+ ip = '0.0.0.0'
+ mask = '255.255.255.0'
+
+ ipaddr = ipaddress.ip_network(
+ six.text_type('{}/{}'.format(ip, mask)), strict=False)
+ if ipaddr.prefixlen + 2 < ipaddr.max_prefixlen:
+ ip_addr_range = '{}-{}'.format(ipaddr[2], ipaddr[-2])
+ else:
+ LOG.warning('Only single IP in range %s', ipaddr)
+ ip_addr_range = ip
return ip_addr_range
def _get_traffic_flow(self):
for index, dst_port in enumerate(fflow.get("dst_port", [])):
flow["dst_port_{}".format(index)] = dst_port
- flow["count"] = fflow["count"]
+ if "count" in fflow:
+ flow["count"] = fflow["count"]
+
+ if "srcseed" in fflow:
+ flow["srcseed"] = fflow["srcseed"]
+
+ if "dstseed" in fflow:
+ flow["dstseed"] = fflow["dstseed"]
+
except KeyError:
flow = {}
return {"flow": flow}
return options.get('duration',
tprofile_base.TrafficProfileConfig.DEFAULT_DURATION)
+ def _key_list_to_dict(self, key, value_list):
+ value_dict = {}
+ try:
+ for index, count in enumerate(value_list[key]):
+ value_dict["{}_{}".format(key, index)] = count
+ except KeyError:
+ value_dict = {}
+
+ return value_dict
+
+ def _get_simulated_users(self):
+ users = self.scenario_cfg.get("options", {}).get("simulated_users", {})
+ simulated_users = self._key_list_to_dict("uplink", users)
+ return {"simulated_users": simulated_users}
+
+ def _get_page_object(self):
+ objects = self.scenario_cfg.get("options", {}).get("page_object", {})
+ page_object = self._key_list_to_dict("uplink", objects)
+ return {"page_object": page_object}
+
def _fill_traffic_profile(self):
tprofile = self._get_traffic_profile()
extra_args = self.scenario_cfg.get('extra_args', {})
tprofile_base.TrafficProfile.UPLINK: {},
tprofile_base.TrafficProfile.DOWNLINK: {},
'extra_args': extra_args,
- 'duration': self._get_duration()}
+ 'duration': self._get_duration(),
+ 'page_object': self._get_page_object(),
+ 'simulated_users': self._get_simulated_users()}
traffic_vnfd = vnfdgen.generate_vnfd(tprofile, tprofile_data)
- self.traffic_profile = tprofile_base.TrafficProfile.get(traffic_vnfd)
+
+ traffic_config = \
+ self.scenario_cfg.get("options", {}).get("traffic_config", {})
+
+ traffic_vnfd.setdefault("traffic_profile", {})
+ traffic_vnfd["traffic_profile"].update(traffic_config)
+
+ self.traffic_profile = \
+ tprofile_base.TrafficProfile.get(traffic_vnfd)
def _get_topology(self):
topology = self.scenario_cfg["topology"]
topology_yaml = vnfdgen.generate_vnfd(topology, topolgy_data)
self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0]
- def _find_vnf_name_from_id(self, vnf_id):
+ def _find_vnf_name_from_id(self, vnf_id): # pragma: no cover
return next((vnfd["vnfd-id-ref"]
for vnfd in self.topology["constituent-vnfd"]
if vnf_id == vnfd["member-vnf-index"]), None)
- def _find_vnfd_from_vnf_idx(self, vnf_id):
+ def _find_vnfd_from_vnf_idx(self, vnf_id): # pragma: no cover
return next((vnfd
for vnfd in self.topology["constituent-vnfd"]
if vnf_id == vnfd["member-vnf-index"]), None)
@staticmethod
- def find_node_if(nodes, name, if_name, vld_id):
+ def find_node_if(nodes, name, if_name, vld_id): # pragma: no cover
try:
# check for xe0, xe1
intf = nodes[name]["interfaces"][if_name]
node0_if["peer_intf"] = node1_copy
node1_if["peer_intf"] = node0_copy
- def _update_context_with_topology(self):
+ def _update_context_with_topology(self): # pragma: no cover
for vnfd in self.topology["constituent-vnfd"]:
vnf_idx = vnfd["member-vnf-index"]
vnf_name = self._find_vnf_name_from_id(vnf_idx)
vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
self.context_cfg["nodes"][vnf_name].update(vnfd)
- def _generate_pod_yaml(self):
+ def _generate_pod_yaml(self): # pragma: no cover
context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
# convert OrderedDict to a list
# pod.yaml nodes is a list
explicit_start=True)
@staticmethod
- def _serialize_node(node):
+ def _serialize_node(node): # pragma: no cover
new_node = copy.deepcopy(node)
# name field is required
# remove context suffix
self._update_context_with_topology()
@classmethod
- def get_vnf_impl(cls, vnf_model_id):
+ def get_vnf_impl(cls, vnf_model_id): # pragma: no cover
""" Find the implementing class from vnf_model["vnf"]["name"] field
:param vnf_model_id: parsed vnfd model ID field
raise exceptions.IncorrectConfig(error_msg=message)
@staticmethod
- def create_interfaces_from_node(vnfd, node):
+ def create_interfaces_from_node(vnfd, node): # pragma: no cover
ext_intfs = vnfd["vdu"][0]["external-interface"] = []
# have to sort so xe0 goes first
for intf_name, intf in sorted(node['interfaces'].items()):
pass
self.create_interfaces_from_node(vnfd, node)
vnf_impl = self.get_vnf_impl(vnfd['id'])
- vnf_instance = vnf_impl(node_name, vnfd)
+ vnf_instance = vnf_impl(node_name, vnfd, scenario_cfg['task_id'])
vnfs.append(vnf_instance)
self.vnfs = vnfs
return vnfs
def setup(self):
- """ Setup infrastructure, provission VNFs & start traffic
-
- :return:
- """
+ """Setup infrastructure, provission VNFs & start traffic"""
# 1. Verify if infrastructure mapping can meet topology
self.map_topology_to_infrastructure()
# 1a. Load VNF models
for traffic_gen in traffic_runners:
LOG.info("Starting traffic on %s", traffic_gen.name)
traffic_gen.run_traffic(self.traffic_profile)
+ self._mq_ids.append(traffic_gen.get_mq_producer_id())
+
+ def get_mq_ids(self): # pragma: no cover
+ """Return stored MQ producer IDs"""
+ return self._mq_ids
def run(self, result): # yardstick API
""" Yardstick calls run() at intervals defined in the yaml and
LOG.exception("")
raise RuntimeError("Error in teardown")
- def pre_run_wait_time(self, time_seconds):
+ def pre_run_wait_time(self, time_seconds): # pragma: no cover
"""Time waited before executing the run method"""
time.sleep(time_seconds)
- def post_run_wait_time(self, time_seconds):
+ def post_run_wait_time(self, time_seconds): # pragma: no cover
"""Time waited after executing the run method"""
pass