Merge "NSB PROX test hang fixes"
[yardstick.git] / yardstick / benchmark / scenarios / networking / vnf_generic.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ NSPerf specific scenario definition """
15
16 from __future__ import absolute_import
17
18 import logging
19 import errno
20
21 import ipaddress
22 import os
23 import sys
24 import re
25 from itertools import chain
26
27 import six
28 import yaml
29 from collections import defaultdict
30
31 from yardstick.benchmark.scenarios import base
32 from yardstick.common.constants import LOG_DIR
33 from yardstick.common.process import terminate_children
34 from yardstick.common.utils import import_modules_from_package, itersubclasses
35 from yardstick.common.yaml_loader import yaml_load
36 from yardstick.network_services.collector.subscriber import Collector
37 from yardstick.network_services.vnf_generic import vnfdgen
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.traffic_profile.base import TrafficProfile
40 from yardstick.network_services.utils import get_nsb_option
41 from yardstick import ssh
42
43
44 LOG = logging.getLogger(__name__)
45
46
class SSHError(Exception):
    """Raised when an ssh connection cannot be established."""
50
51
class SSHTimeout(SSHError):
    """Raised when an ssh connection attempt times out."""
55
56
class IncorrectConfig(Exception):
    """Raised when the configuration is found to be invalid during setup."""
60
61
class IncorrectSetup(Exception):
    """Raised when the deployed environment does not match requirements."""
65
66
class SshManager(object):
    """Context manager wrapping an ssh connection to a node.

    Yields a ready-to-use connection, or None when the connection attempt
    failed — callers must check the yielded value before use. The
    connection (if any) is always closed on exit.
    """

    def __init__(self, node, timeout=120):
        """
        :param node: node dict with ssh credentials (must contain "ip")
        :param timeout: seconds to wait for the connection to come up
        """
        super(SshManager, self).__init__()
        self.node = node
        self.conn = None
        self.timeout = timeout

    def __enter__(self):
        """
        args -> network device mappings
        returns -> ssh connection ready to be used
        """
        try:
            self.conn = ssh.SSH.from_node(self.node)
            self.conn.wait(timeout=self.timeout)
        except (ssh.SSHError, SSHError) as error:
            # bugfix: SSH.wait() raises the SSHError defined in the
            # yardstick.ssh module, not this module's local SSHError, so
            # the original handler never caught real connection failures.
            LOG.info("connect failed to %s, due to %s", self.node["ip"], error)
        # self.conn defaults to None
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            self.conn.close()
90
91
def find_relative_file(path, task_path):
    """Locate a file either at its absolute/CWD-relative location or
    relative to the TC scenario file directory, in that order.

    :param path: file path to look up
    :param task_path: directory of the scenario (TC) file
    :return str: full path to the file
    :raises IOError: with ENOENT when the file exists in neither place
    """
    # fixme: create schema to validate all fields have been provided
    candidates = (os.path.abspath(path), os.path.join(task_path, path))
    for candidate in candidates:
        try:
            with open(candidate):
                pass
        except IOError:
            continue
        return candidate
    raise IOError(errno.ENOENT, 'Unable to find {} file'.format(path))
109
110
def open_relative_file(path, task_path):
    """Open *path* directly, falling back to *task_path*/*path* when the
    direct open fails with ENOENT.

    Any other IOError (e.g. permission denied) propagates unchanged.
    """
    try:
        return open(path)
    except IOError as err:
        if err.errno != errno.ENOENT:
            raise
        return open(os.path.join(task_path, path))
118
119
class NetworkServiceTestCase(base.Scenario):
    """Generic "NSPerf" scenario: pre-deployment VNF & network service
    testing driven by a topology descriptor and VNF model templates."""

    __scenario_type__ = "NSPerf"
125
126     def __init__(self, scenario_cfg, context_cfg):  # Yardstick API
127         super(NetworkServiceTestCase, self).__init__()
128         self.scenario_cfg = scenario_cfg
129         self.context_cfg = context_cfg
130
131         # fixme: create schema to validate all fields have been provided
132         with open_relative_file(scenario_cfg["topology"],
133                                 scenario_cfg['task_path']) as stream:
134             topology_yaml = yaml_load(stream)
135
136         self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0]
137         self.vnfs = []
138         self.collector = None
139         self.traffic_profile = None
140         self.node_netdevs = {}
141
142     def _get_ip_flow_range(self, ip_start_range):
143
144         # IP range is specified as 'x.x.x.x-y.y.y.y'
145         if isinstance(ip_start_range, six.string_types):
146             return ip_start_range
147
148         node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0'))
149         if node_name is None:
150             # we are manually specifying the range
151             ip_addr_range = range_or_interface
152         else:
153             node = self.context_cfg["nodes"].get(node_name, {})
154             try:
155                 # the ip_range is the interface name
156                 interface = node.get("interfaces", {})[range_or_interface]
157             except KeyError:
158                 ip = "0.0.0.0"
159                 mask = "255.255.255.0"
160             else:
161                 ip = interface["local_ip"]
162                 # we can't default these values, they must both exist to be valid
163                 mask = interface["netmask"]
164
165             ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)), strict=False)
166             hosts = list(ipaddr.hosts())
167             if len(hosts) > 2:
168                 # skip the first host in case of gateway
169                 ip_addr_range = "{}-{}".format(hosts[1], hosts[-1])
170             else:
171                 LOG.warning("Only single IP in range %s", ipaddr)
172                 # fall back to single IP range
173                 ip_addr_range = ip
174         return ip_addr_range
175
176     def _get_traffic_flow(self):
177         flow = {}
178         try:
179             # TODO: should be .0  or .1 so we can use list
180             # but this also roughly matches uplink_0, downlink_0
181             fflow = self.scenario_cfg["options"]["flow"]
182             for index, src in enumerate(fflow.get("src_ip", [])):
183                 flow["src_ip_{}".format(index)] = self._get_ip_flow_range(src)
184
185             for index, dst in enumerate(fflow.get("dst_ip", [])):
186                 flow["dst_ip_{}".format(index)] = self._get_ip_flow_range(dst)
187
188             for index, publicip in enumerate(fflow.get("public_ip", [])):
189                 flow["public_ip_{}".format(index)] = publicip
190
191             flow["count"] = fflow["count"]
192         except KeyError:
193             flow = {}
194         return {"flow": flow}
195
196     def _get_traffic_imix(self):
197         try:
198             imix = {"imix": self.scenario_cfg['options']['framesize']}
199         except KeyError:
200             imix = {}
201         return imix
202
203     def _get_traffic_profile(self):
204         profile = self.scenario_cfg["traffic_profile"]
205         path = self.scenario_cfg["task_path"]
206         with open_relative_file(profile, path) as infile:
207             return infile.read()
208
209     def _fill_traffic_profile(self):
210         traffic_mapping = self._get_traffic_profile()
211         traffic_map_data = {
212             'flow': self._get_traffic_flow(),
213             'imix': self._get_traffic_imix(),
214             TrafficProfile.UPLINK: {},
215             TrafficProfile.DOWNLINK: {},
216         }
217
218         traffic_vnfd = vnfdgen.generate_vnfd(traffic_mapping, traffic_map_data)
219         self.traffic_profile = TrafficProfile.get(traffic_vnfd)
220         return self.traffic_profile
221
222     def _find_vnf_name_from_id(self, vnf_id):
223         return next((vnfd["vnfd-id-ref"]
224                      for vnfd in self.topology["constituent-vnfd"]
225                      if vnf_id == vnfd["member-vnf-index"]), None)
226
227     @staticmethod
228     def get_vld_networks(networks):
229         # network name is vld_id
230         vld_map = {}
231         for name, n in networks.items():
232             try:
233                 vld_map[n['vld_id']] = n
234             except KeyError:
235                 vld_map[name] = n
236         return vld_map
237
238     @staticmethod
239     def find_node_if(nodes, name, if_name, vld_id):
240         try:
241             # check for xe0, xe1
242             intf = nodes[name]["interfaces"][if_name]
243         except KeyError:
244             # if not xe0, then maybe vld_id,  uplink_0, downlink_0
245             # pop it and re-insert with the correct name from topology
246             intf = nodes[name]["interfaces"].pop(vld_id)
247             nodes[name]["interfaces"][if_name] = intf
248         return intf
249
    def _resolve_topology(self):
        """Annotate every topology endpoint interface with link/peer info.

        Pass 1: for each virtual link (vld), resolve both endpoint
        interfaces in context_cfg["nodes"] and record the interface name,
        owning node, vld_id, peer node/interface names, the attached
        network and the destination MAC/IP taken from the peer side.
        Pass 2: attach a shallow copy of each peer interface dict so both
        sides can be inspected without creating a circular reference.

        :raises IncorrectConfig: when a vld does not have exactly two
            endpoints, or a referenced interface/field is missing.
        """
        for vld in self.topology["vld"]:
            try:
                node0_data, node1_data = vld["vnfd-connection-point-ref"]
            except (ValueError, TypeError):
                raise IncorrectConfig("Topology file corrupted, "
                                      "wrong endpoint count for connection")

            node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
            node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])

            node0_if_name = node0_data["vnfd-connection-point-ref"]
            node1_if_name = node1_data["vnfd-connection-point-ref"]

            try:
                nodes = self.context_cfg["nodes"]
                node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
                node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])

                # names so we can do reverse lookups
                node0_if["ifname"] = node0_if_name
                node1_if["ifname"] = node1_if_name

                node0_if["node_name"] = node0_name
                node1_if["node_name"] = node1_name

                node0_if["vld_id"] = vld["id"]
                node1_if["vld_id"] = vld["id"]

                # set peer name
                node0_if["peer_name"] = node1_name
                node1_if["peer_name"] = node0_name

                # set peer interface name
                node0_if["peer_ifname"] = node1_if_name
                node1_if["peer_ifname"] = node0_if_name

                # just load the network
                vld_networks = self.get_vld_networks(self.context_cfg["networks"])
                node0_if["network"] = vld_networks.get(vld["id"], {})
                node1_if["network"] = vld_networks.get(vld["id"], {})

                # destination MAC/IP come from the opposite endpoint
                node0_if["dst_mac"] = node1_if["local_mac"]
                node0_if["dst_ip"] = node1_if["local_ip"]

                node1_if["dst_mac"] = node0_if["local_mac"]
                node1_if["dst_ip"] = node0_if["local_ip"]

            except KeyError:
                LOG.exception("")
                raise IncorrectConfig("Required interface not found, "
                                      "topology file corrupted")

        # second pass: by now both ends are fully annotated, so a copy of
        # the peer dict carries the complete information
        for vld in self.topology['vld']:
            try:
                node0_data, node1_data = vld["vnfd-connection-point-ref"]
            except (ValueError, TypeError):
                raise IncorrectConfig("Topology file corrupted, "
                                      "wrong endpoint count for connection")

            node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
            node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])

            node0_if_name = node0_data["vnfd-connection-point-ref"]
            node1_if_name = node1_data["vnfd-connection-point-ref"]

            nodes = self.context_cfg["nodes"]
            node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
            node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])

            # add peer interface dict, but remove circular link
            # TODO: don't waste memory
            node0_copy = node0_if.copy()
            node1_copy = node1_if.copy()
            node0_if["peer_intf"] = node1_copy
            node1_if["peer_intf"] = node0_copy
326
327     def _find_vnfd_from_vnf_idx(self, vnf_idx):
328         return next((vnfd for vnfd in self.topology["constituent-vnfd"]
329                      if vnf_idx == vnfd["member-vnf-index"]), None)
330
331     def _update_context_with_topology(self):
332         for vnfd in self.topology["constituent-vnfd"]:
333             vnf_idx = vnfd["member-vnf-index"]
334             vnf_name = self._find_vnf_name_from_id(vnf_idx)
335             vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
336             self.context_cfg["nodes"][vnf_name].update(vnfd)
337
338     def _probe_netdevs(self, node, node_dict, timeout=120):
339         try:
340             return self.node_netdevs[node]
341         except KeyError:
342             pass
343
344         netdevs = {}
345         cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
346
347         with SshManager(node_dict, timeout=timeout) as conn:
348             if conn:
349                 exit_status = conn.execute(cmd)[0]
350                 if exit_status != 0:
351                     raise IncorrectSetup("Node's %s lacks ip tool." % node)
352                 exit_status, stdout, _ = conn.execute(
353                     self.FIND_NETDEVICE_STRING)
354                 if exit_status != 0:
355                     raise IncorrectSetup(
356                         "Cannot find netdev info in sysfs" % node)
357                 netdevs = node_dict['netdevs'] = self.parse_netdev_info(stdout)
358
359         self.node_netdevs[node] = netdevs
360         return netdevs
361
362     @classmethod
363     def _probe_missing_values(cls, netdevs, network):
364
365         mac_lower = network['local_mac'].lower()
366         for netdev in netdevs.values():
367             if netdev['address'].lower() != mac_lower:
368                 continue
369             network.update({
370                 'driver': netdev['driver'],
371                 'vpci': netdev['pci_bus_id'],
372                 'ifindex': netdev['ifindex'],
373             })
374
375     def _generate_pod_yaml(self):
376         context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
377         # convert OrderedDict to a list
378         # pod.yaml nodes is a list
379         nodes = []
380         for node in self.context_cfg["nodes"].values():
381             # name field is required
382             # remove context suffix
383             node['name'] = node['name'].split('.')[0]
384             nodes.append(node)
385         nodes = self._convert_pkeys_to_string(nodes)
386         pod_dict = {
387             "nodes": nodes,
388             "networks": self.context_cfg["networks"]
389         }
390         with open(context_yaml, "w") as context_out:
391             yaml.safe_dump(pod_dict, context_out, default_flow_style=False,
392                            explicit_start=True)
393
394     @staticmethod
395     def _convert_pkeys_to_string(nodes):
396         # make copy because we are mutating
397         nodes = nodes[:]
398         for i, node in enumerate(nodes):
399             try:
400                 nodes[i] = dict(node, pkey=ssh.convert_key_to_str(node["pkey"]))
401             except KeyError:
402                 pass
403         return nodes
404
    # Interface fields that must be present (from pod.yaml or ssh probing)
    # before the topology can be mapped onto the infrastructure.
    TOPOLOGY_REQUIRED_KEYS = frozenset({
        "vpci", "local_ip", "netmask", "local_mac", "driver"})
407
    def map_topology_to_infrastructure(self):
        """ This method should verify if the available resources defined in pod.yaml
        match the topology.yaml file.

        Interfaces missing required fields are completed by ssh-probing
        the node's sysfs; afterwards pod.yaml is regenerated and the
        topology links are resolved into context_cfg.

        :return: None. Side effect: context_cfg is updated
        :raises IncorrectConfig: when required interface fields cannot be
            determined
        """
        num_nodes = len(self.context_cfg["nodes"])
        # OpenStack instance creation time is probably proportional to the number
        # of instances
        timeout = 120 * num_nodes
        for node, node_dict in self.context_cfg["nodes"].items():

            for network in node_dict["interfaces"].values():
                missing = self.TOPOLOGY_REQUIRED_KEYS.difference(network)
                if not missing:
                    continue

                # only ssh probe if there are missing values
                # ssh probe won't work on Ixia, so we had better define all our values
                try:
                    netdevs = self._probe_netdevs(node, node_dict, timeout=timeout)
                except (SSHError, SSHTimeout):
                    raise IncorrectConfig(
                        "Unable to probe missing interface fields '%s', on node %s "
                        "SSH Error" % (', '.join(missing), node))
                try:
                    self._probe_missing_values(netdevs, network)
                except KeyError:
                    # probe data incomplete; 'missing' stays non-empty and
                    # we fail below with the config error
                    pass
                else:
                    missing = self.TOPOLOGY_REQUIRED_KEYS.difference(
                        network)
                if missing:
                    raise IncorrectConfig(
                        "Require interface fields '%s' not found, topology file "
                        "corrupted" % ', '.join(missing))

        # we have to generate pod.yaml here so we have vpci and driver
        self._generate_pod_yaml()
        # 3. Use topology file to find connections & resolve dest address
        self._resolve_topology()
        self._update_context_with_topology()
450
    # Shell one-liner run on the remote node: for every PCI netdev in
    # sysfs, print "<path>:<value>" lines for ifindex/address/operstate/
    # vendor/device ids plus the driver name, in the format that
    # BASE_ADAPTER_RE below parses.
    FIND_NETDEVICE_STRING = r"""find /sys/devices/pci* -type d -name net -exec sh -c '{ grep -sH ^ \
$1/ifindex $1/address $1/operstate $1/device/vendor $1/device/device \
$1/device/subsystem_vendor $1/device/subsystem_device ; \
printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
' sh  \{\}/* \;
"""
    # Parses "/sys/devices/<bus_path>/net/<iface>/<name>:<value>" lines.
    BASE_ADAPTER_RE = re.compile(
        '^/sys/devices/(.*)/net/([^/]*)/([^:]*):(.*)$', re.M)
459
460     @classmethod
461     def parse_netdev_info(cls, stdout):
462         network_devices = defaultdict(dict)
463         matches = cls.BASE_ADAPTER_RE.findall(stdout)
464         for bus_path, interface_name, name, value in matches:
465             dirname, bus_id = os.path.split(bus_path)
466             if 'virtio' in bus_id:
467                 # for some stupid reason VMs include virtio1/
468                 # in PCI device path
469                 bus_id = os.path.basename(dirname)
470             # remove extra 'device/' from 'device/vendor,
471             # device/subsystem_vendor', etc.
472             if 'device/' in name:
473                 name = name.split('/')[1]
474             network_devices[interface_name][name] = value
475             network_devices[interface_name][
476                 'interface_name'] = interface_name
477             network_devices[interface_name]['pci_bus_id'] = bus_id
478         # convert back to regular dict
479         return dict(network_devices)
480
481     @classmethod
482     def get_vnf_impl(cls, vnf_model_id):
483         """ Find the implementing class from vnf_model["vnf"]["name"] field
484
485         :param vnf_model_id: parsed vnfd model ID field
486         :return: subclass of GenericVNF
487         """
488         import_modules_from_package(
489             "yardstick.network_services.vnf_generic.vnf")
490         expected_name = vnf_model_id
491         classes_found = []
492
493         def impl():
494             for name, class_ in ((c.__name__, c) for c in itersubclasses(GenericVNF)):
495                 if name == expected_name:
496                     yield class_
497                 classes_found.append(name)
498
499         try:
500             return next(impl())
501         except StopIteration:
502             pass
503
504         raise IncorrectConfig("No implementation for %s found in %s" %
505                               (expected_name, classes_found))
506
507     @staticmethod
508     def create_interfaces_from_node(vnfd, node):
509         ext_intfs = vnfd["vdu"][0]["external-interface"] = []
510         # have to sort so xe0 goes first
511         for intf_name, intf in sorted(node['interfaces'].items()):
512             # only interfaces with vld_id are added.
513             # Thus there are two layers of filters, only intefaces with vld_id
514             # show up in interfaces, and only interfaces with traffic profiles
515             # are used by the generators
516             if intf.get('vld_id'):
517                 # force dpkd_port_num to int so we can do reverse lookup
518                 try:
519                     intf['dpdk_port_num'] = int(intf['dpdk_port_num'])
520                 except KeyError:
521                     pass
522                 ext_intf = {
523                     "name": intf_name,
524                     "virtual-interface": intf,
525                     "vnfd-connection-point-ref": intf_name,
526                 }
527                 ext_intfs.append(ext_intf)
528
    def load_vnf_models(self, scenario_cfg=None, context_cfg=None):
        """ Create VNF objects based on YAML descriptors

        Renders each node's "VNF model" template with the node's data,
        injects interface and pkey information, and instantiates the
        GenericVNF subclass matching the vnfd id.

        :param scenario_cfg: defaults to self.scenario_cfg
        :type scenario_cfg: dict
        :param context_cfg: defaults to self.context_cfg
        :return: list of VNF instances (also stored on self.vnfs)
        """
        # put the TRex client library first on sys.path, dropping any
        # duplicate entry
        trex_lib_path = get_nsb_option('trex_client_lib')
        sys.path[:] = list(chain([trex_lib_path], (x for x in sys.path if x != trex_lib_path)))

        if scenario_cfg is None:
            scenario_cfg = self.scenario_cfg

        if context_cfg is None:
            context_cfg = self.context_cfg

        vnfs = []
        # we assume OrderedDict for consistency in instantiation
        for node_name, node in context_cfg["nodes"].items():
            LOG.debug(node)
            try:
                file_name = node["VNF model"]
            except KeyError:
                LOG.debug("no model for %s, skipping", node_name)
                continue
            file_path = scenario_cfg['task_path']
            with open_relative_file(file_name, file_path) as stream:
                vnf_model = stream.read()
            vnfd = vnfdgen.generate_vnfd(vnf_model, node)
            # TODO: here add extra context_cfg["nodes"] regardless of template
            vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0]
            # force inject pkey if it exists
            # we want to standardize Heat using pkey as a string so we don't rely
            # on the filesystem
            try:
                vnfd['mgmt-interface']['pkey'] = node['pkey']
            except KeyError:
                pass
            self.create_interfaces_from_node(vnfd, node)
            vnf_impl = self.get_vnf_impl(vnfd['id'])
            vnf_instance = vnf_impl(node_name, vnfd)
            vnfs.append(vnf_instance)

        self.vnfs = vnfs
        return vnfs
575
    def setup(self):
        """ Setup infrastructure, provision VNFs & start traffic

        :return:
        """
        # 1. Verify if infrastructure mapping can meet topology
        self.map_topology_to_infrastructure()
        # 1a. Load VNF models
        self.load_vnf_models()
        # 1b. Fill traffic profile with information from topology
        self._fill_traffic_profile()

        # 2. Provision VNFs

        # link events will cause VNF application to exit
        # so we should start traffic runners before VNFs
        traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
        non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic]
        try:
            for vnf in chain(traffic_runners, non_traffic_runners):
                LOG.info("Instantiating %s", vnf.name)
                vnf.instantiate(self.scenario_cfg, self.context_cfg)
                LOG.info("Waiting for %s to instantiate", vnf.name)
                vnf.wait_for_instantiate()
        except:  # noqa: E722
            # NOTE: bare except is deliberate — every VNF must be
            # terminated on any failure (including BaseException) so no
            # orphaned processes hang around; the exception is re-raised.
            LOG.exception("")
            for vnf in self.vnfs:
                vnf.terminate()
            raise

        # 3. Run experiment
        # Start listeners first to avoid losing packets
        for traffic_gen in traffic_runners:
            traffic_gen.listen_traffic(self.traffic_profile)

        # register collector with yardstick for KPI collection.
        self.collector = Collector(self.vnfs, self.context_cfg["nodes"], self.traffic_profile)
        self.collector.start()

        # Start the actual traffic
        for traffic_gen in traffic_runners:
            LOG.info("Starting traffic on %s", traffic_gen.name)
            traffic_gen.run_traffic(self.traffic_profile)
619
620     def run(self, result):  # yardstick API
621         """ Yardstick calls run() at intervals defined in the yaml and
622             produces timestamped samples
623
624         :param result: dictionary with results to update
625         :return: None
626         """
627
628         # this is the only method that is check from the runner
629         # so if we have any fatal error it must be raised via these methods
630         # otherwise we will not terminate
631
632         result.update(self.collector.get_kpi())
633
    def teardown(self):
        """ Stop the collector and terminate VNF & TG instance

        :return
        """

        try:
            try:
                self.collector.stop()
                for vnf in self.vnfs:
                    LOG.info("Stopping %s", vnf.name)
                    vnf.terminate()
                LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
            finally:
                # always reap any child processes, even if a VNF failed
                # to terminate cleanly
                terminate_children()
        except Exception:
            # catch any exception in teardown and convert to simple exception
            # never pass exceptions back to multiprocessing, because some exceptions can
            # be unpicklable
            # https://bugs.python.org/issue9400
            LOG.exception("")
            raise RuntimeError("Error in teardown")