Merge "add wait_for_instantiate"
[yardstick.git] / yardstick / benchmark / scenarios / networking / vnf_generic.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ NSPerf specific scenario definition """
15
16 from __future__ import absolute_import
17
18 import logging
19 import errno
20
21 import ipaddress
22
23 import copy
24 import os
25 import sys
26 import re
27 from itertools import chain
28
29 import six
30 import yaml
31 from collections import defaultdict
32
33 from yardstick.benchmark.scenarios import base
34 from yardstick.common.constants import LOG_DIR
35 from yardstick.common.process import terminate_children
36 from yardstick.common.utils import import_modules_from_package, itersubclasses
37 from yardstick.common.yaml_loader import yaml_load
38 from yardstick.network_services.collector.subscriber import Collector
39 from yardstick.network_services.vnf_generic import vnfdgen
40 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
41 from yardstick.network_services.traffic_profile.base import TrafficProfile
42 from yardstick.network_services.utils import get_nsb_option
43 from yardstick import ssh
44
45
46 LOG = logging.getLogger(__name__)
47
48
class SSHError(Exception):
    """Signals an SSH connection error."""
52
53
class SSHTimeout(SSHError):
    """Signals an SSH connection timeout; a specialisation of SSHError."""
57
58
class IncorrectConfig(Exception):
    """Signals an incorrect configuration detected during setup."""
62
63
class IncorrectSetup(Exception):
    """Signals an incorrect environment setup detected during setup."""
67
68
class SshManager(object):
    """Context manager that opens an SSH connection to a node and closes it on exit."""

    def __init__(self, node, timeout=120):
        """
        :param node: node description handed to ``ssh.SSH.from_node``
        :param timeout: value passed to the connection's ``wait`` call
        """
        super(SshManager, self).__init__()
        self.node = node
        self.timeout = timeout
        # stays None until a connection object has been created
        self.conn = None

    def __enter__(self):
        """
        Create the connection and wait for it to become usable.

        :return: the connection object, or None if it could not be created
        """
        try:
            connection = ssh.SSH.from_node(self.node)
            self.conn = connection
            connection.wait(timeout=self.timeout)
        except SSHError as error:
            # NOTE(review): this catches the SSHError class declared in this
            # module; confirm that the ssh module raises the same class.
            LOG.info("connect failed to %s, due to %s", self.node["ip"], error)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection if one was created; exceptions are not suppressed."""
        if not self.conn:
            return
        self.conn.close()
92
93
def find_relative_file(path, task_path):
    """
    Locate a file, trying the absolute form of ``path`` first and then
    ``path`` joined onto ``task_path``.

    :param path: file name or path to look for
    :param task_path: directory used for the second lookup
    :return str: the first candidate that could be opened
    :raises IOError: with errno ENOENT when neither candidate can be opened
    """
    # fixme: create schema to validate all fields have been provided
    candidates = (os.path.abspath(path), os.path.join(task_path, path))
    for candidate in candidates:
        try:
            handle = open(candidate)
        except IOError:
            # not readable here, try the next location
            continue
        handle.close()
        return candidate
    raise IOError(errno.ENOENT, 'Unable to find {} file'.format(path))
111
112
def open_relative_file(path, task_path):
    """
    Open ``path`` as given; if it does not exist, open it relative to
    ``task_path`` instead.

    :param path: file name or path to open
    :param task_path: directory used for the fallback lookup
    :return: open file object
    :raises IOError: for any failure other than ENOENT on the first attempt,
        or for any failure of the fallback open
    """
    try:
        stream = open(path)
    except IOError as err:
        if err.errno != errno.ENOENT:
            raise
        stream = open(os.path.join(task_path, path))
    return stream
120
121
122 class NetworkServiceTestCase(base.Scenario):
123     """Class handles Generic framework to do pre-deployment VNF &
124        Network service testing  """
125
126     __scenario_type__ = "NSPerf"
127
128     def __init__(self, scenario_cfg, context_cfg):  # Yardstick API
129         super(NetworkServiceTestCase, self).__init__()
130         self.scenario_cfg = scenario_cfg
131         self.context_cfg = context_cfg
132
133         # fixme: create schema to validate all fields have been provided
134         with open_relative_file(scenario_cfg["topology"],
135                                 scenario_cfg['task_path']) as stream:
136             topology_yaml = yaml_load(stream)
137
138         self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0]
139         self.vnfs = []
140         self.collector = None
141         self.traffic_profile = None
142         self.node_netdevs = {}
143
144     def _get_ip_flow_range(self, ip_start_range):
145
146         # IP range is specified as 'x.x.x.x-y.y.y.y'
147         if isinstance(ip_start_range, six.string_types):
148             return ip_start_range
149
150         node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0'))
151         if node_name is None:
152             # we are manually specifying the range
153             ip_addr_range = range_or_interface
154         else:
155             node = self.context_cfg["nodes"].get(node_name, {})
156             try:
157                 # the ip_range is the interface name
158                 interface = node.get("interfaces", {})[range_or_interface]
159             except KeyError:
160                 ip = "0.0.0.0"
161                 mask = "255.255.255.0"
162             else:
163                 ip = interface["local_ip"]
164                 # we can't default these values, they must both exist to be valid
165                 mask = interface["netmask"]
166
167             ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)), strict=False)
168             hosts = list(ipaddr.hosts())
169             if len(hosts) > 2:
170                 # skip the first host in case of gateway
171                 ip_addr_range = "{}-{}".format(hosts[1], hosts[-1])
172             else:
173                 LOG.warning("Only single IP in range %s", ipaddr)
174                 # fall back to single IP range
175                 ip_addr_range = ip
176         return ip_addr_range
177
178     def _get_traffic_flow(self):
179         flow = {}
180         try:
181             # TODO: should be .0  or .1 so we can use list
182             # but this also roughly matches uplink_0, downlink_0
183             fflow = self.scenario_cfg["options"]["flow"]
184             for index, src in enumerate(fflow.get("src_ip", [])):
185                 flow["src_ip_{}".format(index)] = self._get_ip_flow_range(src)
186
187             for index, dst in enumerate(fflow.get("dst_ip", [])):
188                 flow["dst_ip_{}".format(index)] = self._get_ip_flow_range(dst)
189
190             for index, publicip in enumerate(fflow.get("public_ip", [])):
191                 flow["public_ip_{}".format(index)] = publicip
192
193             flow["count"] = fflow["count"]
194         except KeyError:
195             flow = {}
196         return {"flow": flow}
197
198     def _get_traffic_imix(self):
199         try:
200             imix = {"imix": self.scenario_cfg['options']['framesize']}
201         except KeyError:
202             imix = {}
203         return imix
204
205     def _get_traffic_profile(self):
206         profile = self.scenario_cfg["traffic_profile"]
207         path = self.scenario_cfg["task_path"]
208         with open_relative_file(profile, path) as infile:
209             return infile.read()
210
211     def _fill_traffic_profile(self):
212         traffic_mapping = self._get_traffic_profile()
213         traffic_map_data = {
214             'flow': self._get_traffic_flow(),
215             'imix': self._get_traffic_imix(),
216             TrafficProfile.UPLINK: {},
217             TrafficProfile.DOWNLINK: {},
218         }
219
220         traffic_vnfd = vnfdgen.generate_vnfd(traffic_mapping, traffic_map_data)
221         self.traffic_profile = TrafficProfile.get(traffic_vnfd)
222         return self.traffic_profile
223
224     def _find_vnf_name_from_id(self, vnf_id):
225         return next((vnfd["vnfd-id-ref"]
226                      for vnfd in self.topology["constituent-vnfd"]
227                      if vnf_id == vnfd["member-vnf-index"]), None)
228
229     @staticmethod
230     def get_vld_networks(networks):
231         # network name is vld_id
232         vld_map = {}
233         for name, n in networks.items():
234             try:
235                 vld_map[n['vld_id']] = n
236             except KeyError:
237                 vld_map[name] = n
238         return vld_map
239
240     @staticmethod
241     def find_node_if(nodes, name, if_name, vld_id):
242         try:
243             # check for xe0, xe1
244             intf = nodes[name]["interfaces"][if_name]
245         except KeyError:
246             # if not xe0, then maybe vld_id,  uplink_0, downlink_0
247             # pop it and re-insert with the correct name from topology
248             intf = nodes[name]["interfaces"].pop(vld_id)
249             nodes[name]["interfaces"][if_name] = intf
250         return intf
251
252     def _resolve_topology(self):
253         for vld in self.topology["vld"]:
254             try:
255                 node0_data, node1_data = vld["vnfd-connection-point-ref"]
256             except (ValueError, TypeError):
257                 raise IncorrectConfig("Topology file corrupted, "
258                                       "wrong endpoint count for connection")
259
260             node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
261             node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])
262
263             node0_if_name = node0_data["vnfd-connection-point-ref"]
264             node1_if_name = node1_data["vnfd-connection-point-ref"]
265
266             try:
267                 nodes = self.context_cfg["nodes"]
268                 node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
269                 node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])
270
271                 # names so we can do reverse lookups
272                 node0_if["ifname"] = node0_if_name
273                 node1_if["ifname"] = node1_if_name
274
275                 node0_if["node_name"] = node0_name
276                 node1_if["node_name"] = node1_name
277
278                 node0_if["vld_id"] = vld["id"]
279                 node1_if["vld_id"] = vld["id"]
280
281                 # set peer name
282                 node0_if["peer_name"] = node1_name
283                 node1_if["peer_name"] = node0_name
284
285                 # set peer interface name
286                 node0_if["peer_ifname"] = node1_if_name
287                 node1_if["peer_ifname"] = node0_if_name
288
289                 # just load the network
290                 vld_networks = self.get_vld_networks(self.context_cfg["networks"])
291                 node0_if["network"] = vld_networks.get(vld["id"], {})
292                 node1_if["network"] = vld_networks.get(vld["id"], {})
293
294                 node0_if["dst_mac"] = node1_if["local_mac"]
295                 node0_if["dst_ip"] = node1_if["local_ip"]
296
297                 node1_if["dst_mac"] = node0_if["local_mac"]
298                 node1_if["dst_ip"] = node0_if["local_ip"]
299
300             except KeyError:
301                 LOG.exception("")
302                 raise IncorrectConfig("Required interface not found, "
303                                       "topology file corrupted")
304
305         for vld in self.topology['vld']:
306             try:
307                 node0_data, node1_data = vld["vnfd-connection-point-ref"]
308             except (ValueError, TypeError):
309                 raise IncorrectConfig("Topology file corrupted, "
310                                       "wrong endpoint count for connection")
311
312             node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
313             node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])
314
315             node0_if_name = node0_data["vnfd-connection-point-ref"]
316             node1_if_name = node1_data["vnfd-connection-point-ref"]
317
318             nodes = self.context_cfg["nodes"]
319             node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
320             node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])
321
322             # add peer interface dict, but remove circular link
323             # TODO: don't waste memory
324             node0_copy = node0_if.copy()
325             node1_copy = node1_if.copy()
326             node0_if["peer_intf"] = node1_copy
327             node1_if["peer_intf"] = node0_copy
328
329     def _find_vnfd_from_vnf_idx(self, vnf_idx):
330         return next((vnfd for vnfd in self.topology["constituent-vnfd"]
331                      if vnf_idx == vnfd["member-vnf-index"]), None)
332
333     def _update_context_with_topology(self):
334         for vnfd in self.topology["constituent-vnfd"]:
335             vnf_idx = vnfd["member-vnf-index"]
336             vnf_name = self._find_vnf_name_from_id(vnf_idx)
337             vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
338             self.context_cfg["nodes"][vnf_name].update(vnfd)
339
340     def _probe_netdevs(self, node, node_dict, timeout=120):
341         try:
342             return self.node_netdevs[node]
343         except KeyError:
344             pass
345
346         netdevs = {}
347         cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
348
349         with SshManager(node_dict, timeout=timeout) as conn:
350             if conn:
351                 exit_status = conn.execute(cmd)[0]
352                 if exit_status != 0:
353                     raise IncorrectSetup("Node's %s lacks ip tool." % node)
354                 exit_status, stdout, _ = conn.execute(
355                     self.FIND_NETDEVICE_STRING)
356                 if exit_status != 0:
357                     raise IncorrectSetup(
358                         "Cannot find netdev info in sysfs" % node)
359                 netdevs = node_dict['netdevs'] = self.parse_netdev_info(stdout)
360
361         self.node_netdevs[node] = netdevs
362         return netdevs
363
364     @classmethod
365     def _probe_missing_values(cls, netdevs, network):
366
367         mac_lower = network['local_mac'].lower()
368         for netdev in netdevs.values():
369             if netdev['address'].lower() != mac_lower:
370                 continue
371             network.update({
372                 'driver': netdev['driver'],
373                 'vpci': netdev['pci_bus_id'],
374                 'ifindex': netdev['ifindex'],
375             })
376
377     def _generate_pod_yaml(self):
378         context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
379         # convert OrderedDict to a list
380         # pod.yaml nodes is a list
381         nodes = [self._serialize_node(node) for node in self.context_cfg["nodes"].values()]
382         pod_dict = {
383             "nodes": nodes,
384             "networks": self.context_cfg["networks"]
385         }
386         with open(context_yaml, "w") as context_out:
387             yaml.safe_dump(pod_dict, context_out, default_flow_style=False,
388                            explicit_start=True)
389
390     @staticmethod
391     def _serialize_node(node):
392         new_node = copy.deepcopy(node)
393         # name field is required
394         # remove context suffix
395         new_node["name"] = node['name'].split('.')[0]
396         try:
397             new_node["pkey"] = ssh.convert_key_to_str(node["pkey"])
398         except KeyError:
399             pass
400         return new_node
401
    # Fields every interface must carry; map_topology_to_infrastructure
    # probes the node for them when any is missing and raises
    # IncorrectConfig if they still cannot be filled in.
    TOPOLOGY_REQUIRED_KEYS = frozenset({
        "vpci", "local_ip", "netmask", "local_mac", "driver"})
404
405     def map_topology_to_infrastructure(self):
406         """ This method should verify if the available resources defined in pod.yaml
407         match the topology.yaml file.
408
409         :return: None. Side effect: context_cfg is updated
410         """
411         num_nodes = len(self.context_cfg["nodes"])
412         # OpenStack instance creation time is probably proportional to the number
413         # of instances
414         timeout = 120 * num_nodes
415         for node, node_dict in self.context_cfg["nodes"].items():
416
417             for network in node_dict["interfaces"].values():
418                 missing = self.TOPOLOGY_REQUIRED_KEYS.difference(network)
419                 if not missing:
420                     continue
421
422                 # only ssh probe if there are missing values
423                 # ssh probe won't work on Ixia, so we had better define all our values
424                 try:
425                     netdevs = self._probe_netdevs(node, node_dict, timeout=timeout)
426                 except (SSHError, SSHTimeout):
427                     raise IncorrectConfig(
428                         "Unable to probe missing interface fields '%s', on node %s "
429                         "SSH Error" % (', '.join(missing), node))
430                 try:
431                     self._probe_missing_values(netdevs, network)
432                 except KeyError:
433                     pass
434                 else:
435                     missing = self.TOPOLOGY_REQUIRED_KEYS.difference(
436                         network)
437                 if missing:
438                     raise IncorrectConfig(
439                         "Require interface fields '%s' not found, topology file "
440                         "corrupted" % ', '.join(missing))
441
442         # we have to generate pod.yaml here so we have vpci and driver
443         self._generate_pod_yaml()
444         # 3. Use topology file to find connections & resolve dest address
445         self._resolve_topology()
446         self._update_context_with_topology()
447
    # Shell command executed on the node by _probe_netdevs: for every entry
    # below a "net" directory under /sys/devices/pci* it greps a fixed set of
    # sysfs attribute files (grep -H prefixes each value with its file path)
    # and prints the basename of the driver symlink as "<path>/driver:<name>".
    FIND_NETDEVICE_STRING = r"""find /sys/devices/pci* -type d -name net -exec sh -c '{ grep -sH ^ \
$1/ifindex $1/address $1/operstate $1/device/vendor $1/device/device \
$1/device/subsystem_vendor $1/device/subsystem_device ; \
printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
' sh  \{\}/* \;
"""
    # Matches one output line of the form
    # /sys/devices/<bus path>/net/<interface>/<attribute>:<value>
    # (multi-line mode, so ^ and $ anchor at every line).
    BASE_ADAPTER_RE = re.compile(
        '^/sys/devices/(.*)/net/([^/]*)/([^:]*):(.*)$', re.M)
456
457     @classmethod
458     def parse_netdev_info(cls, stdout):
459         network_devices = defaultdict(dict)
460         matches = cls.BASE_ADAPTER_RE.findall(stdout)
461         for bus_path, interface_name, name, value in matches:
462             dirname, bus_id = os.path.split(bus_path)
463             if 'virtio' in bus_id:
464                 # for some stupid reason VMs include virtio1/
465                 # in PCI device path
466                 bus_id = os.path.basename(dirname)
467             # remove extra 'device/' from 'device/vendor,
468             # device/subsystem_vendor', etc.
469             if 'device/' in name:
470                 name = name.split('/')[1]
471             network_devices[interface_name][name] = value
472             network_devices[interface_name][
473                 'interface_name'] = interface_name
474             network_devices[interface_name]['pci_bus_id'] = bus_id
475         # convert back to regular dict
476         return dict(network_devices)
477
478     @classmethod
479     def get_vnf_impl(cls, vnf_model_id):
480         """ Find the implementing class from vnf_model["vnf"]["name"] field
481
482         :param vnf_model_id: parsed vnfd model ID field
483         :return: subclass of GenericVNF
484         """
485         import_modules_from_package(
486             "yardstick.network_services.vnf_generic.vnf")
487         expected_name = vnf_model_id
488         classes_found = []
489
490         def impl():
491             for name, class_ in ((c.__name__, c) for c in itersubclasses(GenericVNF)):
492                 if name == expected_name:
493                     yield class_
494                 classes_found.append(name)
495
496         try:
497             return next(impl())
498         except StopIteration:
499             pass
500
501         raise IncorrectConfig("No implementation for %s found in %s" %
502                               (expected_name, classes_found))
503
504     @staticmethod
505     def create_interfaces_from_node(vnfd, node):
506         ext_intfs = vnfd["vdu"][0]["external-interface"] = []
507         # have to sort so xe0 goes first
508         for intf_name, intf in sorted(node['interfaces'].items()):
509             # only interfaces with vld_id are added.
510             # Thus there are two layers of filters, only intefaces with vld_id
511             # show up in interfaces, and only interfaces with traffic profiles
512             # are used by the generators
513             if intf.get('vld_id'):
514                 # force dpkd_port_num to int so we can do reverse lookup
515                 try:
516                     intf['dpdk_port_num'] = int(intf['dpdk_port_num'])
517                 except KeyError:
518                     pass
519                 ext_intf = {
520                     "name": intf_name,
521                     "virtual-interface": intf,
522                     "vnfd-connection-point-ref": intf_name,
523                 }
524                 ext_intfs.append(ext_intf)
525
526     def load_vnf_models(self, scenario_cfg=None, context_cfg=None):
527         """ Create VNF objects based on YAML descriptors
528
529         :param scenario_cfg:
530         :type scenario_cfg:
531         :param context_cfg:
532         :return:
533         """
534         trex_lib_path = get_nsb_option('trex_client_lib')
535         sys.path[:] = list(chain([trex_lib_path], (x for x in sys.path if x != trex_lib_path)))
536
537         if scenario_cfg is None:
538             scenario_cfg = self.scenario_cfg
539
540         if context_cfg is None:
541             context_cfg = self.context_cfg
542
543         vnfs = []
544         # we assume OrderedDict for consistenct in instantiation
545         for node_name, node in context_cfg["nodes"].items():
546             LOG.debug(node)
547             try:
548                 file_name = node["VNF model"]
549             except KeyError:
550                 LOG.debug("no model for %s, skipping", node_name)
551                 continue
552             file_path = scenario_cfg['task_path']
553             with open_relative_file(file_name, file_path) as stream:
554                 vnf_model = stream.read()
555             vnfd = vnfdgen.generate_vnfd(vnf_model, node)
556             # TODO: here add extra context_cfg["nodes"] regardless of template
557             vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0]
558             # force inject pkey if it exists
559             # we want to standardize Heat using pkey as a string so we don't rely
560             # on the filesystem
561             try:
562                 vnfd['mgmt-interface']['pkey'] = node['pkey']
563             except KeyError:
564                 pass
565             self.create_interfaces_from_node(vnfd, node)
566             vnf_impl = self.get_vnf_impl(vnfd['id'])
567             vnf_instance = vnf_impl(node_name, vnfd)
568             vnfs.append(vnf_instance)
569
570         self.vnfs = vnfs
571         return vnfs
572
573     def setup(self):
574         """ Setup infrastructure, provission VNFs & start traffic
575
576         :return:
577         """
578         # 1. Verify if infrastructure mapping can meet topology
579         self.map_topology_to_infrastructure()
580         # 1a. Load VNF models
581         self.load_vnf_models()
582         # 1b. Fill traffic profile with information from topology
583         self._fill_traffic_profile()
584
585         # 2. Provision VNFs
586
587         # link events will cause VNF application to exit
588         # so we should start traffic runners before VNFs
589         traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
590         non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic]
591         try:
592             for vnf in chain(traffic_runners, non_traffic_runners):
593                 LOG.info("Instantiating %s", vnf.name)
594                 vnf.instantiate(self.scenario_cfg, self.context_cfg)
595                 LOG.info("Waiting for %s to instantiate", vnf.name)
596                 vnf.wait_for_instantiate()
597         except:
598             LOG.exception("")
599             for vnf in self.vnfs:
600                 vnf.terminate()
601             raise
602
603         # 3. Run experiment
604         # Start listeners first to avoid losing packets
605         for traffic_gen in traffic_runners:
606             traffic_gen.listen_traffic(self.traffic_profile)
607
608         # register collector with yardstick for KPI collection.
609         self.collector = Collector(self.vnfs, self.context_cfg["nodes"], self.traffic_profile)
610         self.collector.start()
611
612         # Start the actual traffic
613         for traffic_gen in traffic_runners:
614             LOG.info("Starting traffic on %s", traffic_gen.name)
615             traffic_gen.run_traffic(self.traffic_profile)
616
617     def run(self, result):  # yardstick API
618         """ Yardstick calls run() at intervals defined in the yaml and
619             produces timestamped samples
620
621         :param result: dictionary with results to update
622         :return: None
623         """
624
625         # this is the only method that is check from the runner
626         # so if we have any fatal error it must be raised via these methods
627         # otherwise we will not terminate
628
629         result.update(self.collector.get_kpi())
630
631     def teardown(self):
632         """ Stop the collector and terminate VNF & TG instance
633
634         :return
635         """
636
637         try:
638             try:
639                 self.collector.stop()
640                 for vnf in self.vnfs:
641                     LOG.info("Stopping %s", vnf.name)
642                     vnf.terminate()
643                 LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
644             finally:
645                 terminate_children()
646         except Exception:
647             # catch any exception in teardown and convert to simple exception
648             # never pass exceptions back to multiprocessing, because some exceptions can
649             # be unpicklable
650             # https://bugs.python.org/issue9400
651             LOG.exception("")
652             raise RuntimeError("Error in teardown")