Merge "Fio: support input job file configuration"
[yardstick.git] / yardstick / benchmark / scenarios / networking / vnf_generic.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ NSPerf specific scenario definition """
15
16 from __future__ import absolute_import
17
18 import logging
19 import errno
20
21 import ipaddress
22
23 import copy
24 import os
25 import sys
26 import re
27 from itertools import chain
28
29 import six
30 import yaml
31 from collections import defaultdict
32
33 from yardstick.benchmark.scenarios import base
34 from yardstick.common.constants import LOG_DIR
35 from yardstick.common.process import terminate_children
36 from yardstick.common.utils import import_modules_from_package, itersubclasses
37 from yardstick.common.yaml_loader import yaml_load
38 from yardstick.network_services.collector.subscriber import Collector
39 from yardstick.network_services.vnf_generic import vnfdgen
40 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
41 from yardstick.network_services.traffic_profile.base import TrafficProfile
42 from yardstick.network_services.utils import get_nsb_option
43 from yardstick import ssh
44
45
46 LOG = logging.getLogger(__name__)
47
48
49 class SSHError(Exception):
50     """Class handles ssh connection error exception"""
51     pass
52
53
54 class SSHTimeout(SSHError):
55     """Class handles ssh connection timeout exception"""
56     pass
57
58
59 class IncorrectConfig(Exception):
60     """Class handles incorrect configuration during setup"""
61     pass
62
63
64 class IncorrectSetup(Exception):
65     """Class handles incorrect setup during setup"""
66     pass
67
68
69 class SshManager(object):
70     def __init__(self, node, timeout=120):
71         super(SshManager, self).__init__()
72         self.node = node
73         self.conn = None
74         self.timeout = timeout
75
76     def __enter__(self):
77         """
78         args -> network device mappings
79         returns -> ssh connection ready to be used
80         """
81         try:
82             self.conn = ssh.SSH.from_node(self.node)
83             self.conn.wait(timeout=self.timeout)
84         except SSHError as error:
85             LOG.info("connect failed to %s, due to %s", self.node["ip"], error)
86         # self.conn defaults to None
87         return self.conn
88
89     def __exit__(self, exc_type, exc_val, exc_tb):
90         if self.conn:
91             self.conn.close()
92
93
94 def find_relative_file(path, task_path):
95     """
96     Find file in one of places: in abs of path or
97     relative to TC scenario file. In this order.
98
99     :param path:
100     :param task_path:
101     :return str: full path to file
102     """
103     # fixme: create schema to validate all fields have been provided
104     for lookup in [os.path.abspath(path), os.path.join(task_path, path)]:
105         try:
106             with open(lookup):
107                 return lookup
108         except IOError:
109             pass
110     raise IOError(errno.ENOENT, 'Unable to find {} file'.format(path))
111
112
113 def open_relative_file(path, task_path):
114     try:
115         return open(path)
116     except IOError as e:
117         if e.errno == errno.ENOENT:
118             return open(os.path.join(task_path, path))
119         raise
120
121
122 class NetworkServiceTestCase(base.Scenario):
123     """Class handles Generic framework to do pre-deployment VNF &
124        Network service testing  """
125
126     __scenario_type__ = "NSPerf"
127
128     def __init__(self, scenario_cfg, context_cfg):  # Yardstick API
129         super(NetworkServiceTestCase, self).__init__()
130         self.scenario_cfg = scenario_cfg
131         self.context_cfg = context_cfg
132
133         # fixme: create schema to validate all fields have been provided
134         with open_relative_file(scenario_cfg["topology"],
135                                 scenario_cfg['task_path']) as stream:
136             topology_yaml = yaml_load(stream)
137
138         self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0]
139         self.vnfs = []
140         self.collector = None
141         self.traffic_profile = None
142         self.node_netdevs = {}
143
144     def _get_ip_flow_range(self, ip_start_range):
145
146         # IP range is specified as 'x.x.x.x-y.y.y.y'
147         if isinstance(ip_start_range, six.string_types):
148             return ip_start_range
149
150         node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0'))
151         if node_name is None:
152             # we are manually specifying the range
153             ip_addr_range = range_or_interface
154         else:
155             node = self.context_cfg["nodes"].get(node_name, {})
156             try:
157                 # the ip_range is the interface name
158                 interface = node.get("interfaces", {})[range_or_interface]
159             except KeyError:
160                 ip = "0.0.0.0"
161                 mask = "255.255.255.0"
162             else:
163                 ip = interface["local_ip"]
164                 # we can't default these values, they must both exist to be valid
165                 mask = interface["netmask"]
166
167             ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)), strict=False)
168             hosts = list(ipaddr.hosts())
169             if len(hosts) > 2:
170                 # skip the first host in case of gateway
171                 ip_addr_range = "{}-{}".format(hosts[1], hosts[-1])
172             else:
173                 LOG.warning("Only single IP in range %s", ipaddr)
174                 # fall back to single IP range
175                 ip_addr_range = ip
176         return ip_addr_range
177
178     def _get_traffic_flow(self):
179         flow = {}
180         try:
181             # TODO: should be .0  or .1 so we can use list
182             # but this also roughly matches uplink_0, downlink_0
183             fflow = self.scenario_cfg["options"]["flow"]
184             for index, src in enumerate(fflow.get("src_ip", [])):
185                 flow["src_ip_{}".format(index)] = self._get_ip_flow_range(src)
186
187             for index, dst in enumerate(fflow.get("dst_ip", [])):
188                 flow["dst_ip_{}".format(index)] = self._get_ip_flow_range(dst)
189
190             for index, publicip in enumerate(fflow.get("public_ip", [])):
191                 flow["public_ip_{}".format(index)] = publicip
192
193             for index, src_port in enumerate(fflow.get("src_port", [])):
194                 flow["src_port_{}".format(index)] = src_port
195
196             for index, dst_port in enumerate(fflow.get("dst_port", [])):
197                 flow["dst_port_{}".format(index)] = dst_port
198
199             flow["count"] = fflow["count"]
200         except KeyError:
201             flow = {}
202         return {"flow": flow}
203
204     def _get_traffic_imix(self):
205         try:
206             imix = {"imix": self.scenario_cfg['options']['framesize']}
207         except KeyError:
208             imix = {}
209         return imix
210
211     def _get_traffic_profile(self):
212         profile = self.scenario_cfg["traffic_profile"]
213         path = self.scenario_cfg["task_path"]
214         with open_relative_file(profile, path) as infile:
215             return infile.read()
216
217     def _fill_traffic_profile(self):
218         traffic_mapping = self._get_traffic_profile()
219         traffic_map_data = {
220             'flow': self._get_traffic_flow(),
221             'imix': self._get_traffic_imix(),
222             TrafficProfile.UPLINK: {},
223             TrafficProfile.DOWNLINK: {},
224         }
225
226         traffic_vnfd = vnfdgen.generate_vnfd(traffic_mapping, traffic_map_data)
227         self.traffic_profile = TrafficProfile.get(traffic_vnfd)
228         return self.traffic_profile
229
230     def _find_vnf_name_from_id(self, vnf_id):
231         return next((vnfd["vnfd-id-ref"]
232                      for vnfd in self.topology["constituent-vnfd"]
233                      if vnf_id == vnfd["member-vnf-index"]), None)
234
235     @staticmethod
236     def get_vld_networks(networks):
237         # network name is vld_id
238         vld_map = {}
239         for name, n in networks.items():
240             try:
241                 vld_map[n['vld_id']] = n
242             except KeyError:
243                 vld_map[name] = n
244         return vld_map
245
246     @staticmethod
247     def find_node_if(nodes, name, if_name, vld_id):
248         try:
249             # check for xe0, xe1
250             intf = nodes[name]["interfaces"][if_name]
251         except KeyError:
252             # if not xe0, then maybe vld_id,  uplink_0, downlink_0
253             # pop it and re-insert with the correct name from topology
254             intf = nodes[name]["interfaces"].pop(vld_id)
255             nodes[name]["interfaces"][if_name] = intf
256         return intf
257
258     def _resolve_topology(self):
259         for vld in self.topology["vld"]:
260             try:
261                 node0_data, node1_data = vld["vnfd-connection-point-ref"]
262             except (ValueError, TypeError):
263                 raise IncorrectConfig("Topology file corrupted, "
264                                       "wrong endpoint count for connection")
265
266             node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
267             node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])
268
269             node0_if_name = node0_data["vnfd-connection-point-ref"]
270             node1_if_name = node1_data["vnfd-connection-point-ref"]
271
272             try:
273                 nodes = self.context_cfg["nodes"]
274                 node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
275                 node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])
276
277                 # names so we can do reverse lookups
278                 node0_if["ifname"] = node0_if_name
279                 node1_if["ifname"] = node1_if_name
280
281                 node0_if["node_name"] = node0_name
282                 node1_if["node_name"] = node1_name
283
284                 node0_if["vld_id"] = vld["id"]
285                 node1_if["vld_id"] = vld["id"]
286
287                 # set peer name
288                 node0_if["peer_name"] = node1_name
289                 node1_if["peer_name"] = node0_name
290
291                 # set peer interface name
292                 node0_if["peer_ifname"] = node1_if_name
293                 node1_if["peer_ifname"] = node0_if_name
294
295                 # just load the network
296                 vld_networks = self.get_vld_networks(self.context_cfg["networks"])
297                 node0_if["network"] = vld_networks.get(vld["id"], {})
298                 node1_if["network"] = vld_networks.get(vld["id"], {})
299
300                 node0_if["dst_mac"] = node1_if["local_mac"]
301                 node0_if["dst_ip"] = node1_if["local_ip"]
302
303                 node1_if["dst_mac"] = node0_if["local_mac"]
304                 node1_if["dst_ip"] = node0_if["local_ip"]
305
306             except KeyError:
307                 LOG.exception("")
308                 raise IncorrectConfig("Required interface not found, "
309                                       "topology file corrupted")
310
311         for vld in self.topology['vld']:
312             try:
313                 node0_data, node1_data = vld["vnfd-connection-point-ref"]
314             except (ValueError, TypeError):
315                 raise IncorrectConfig("Topology file corrupted, "
316                                       "wrong endpoint count for connection")
317
318             node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
319             node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])
320
321             node0_if_name = node0_data["vnfd-connection-point-ref"]
322             node1_if_name = node1_data["vnfd-connection-point-ref"]
323
324             nodes = self.context_cfg["nodes"]
325             node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
326             node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])
327
328             # add peer interface dict, but remove circular link
329             # TODO: don't waste memory
330             node0_copy = node0_if.copy()
331             node1_copy = node1_if.copy()
332             node0_if["peer_intf"] = node1_copy
333             node1_if["peer_intf"] = node0_copy
334
335     def _find_vnfd_from_vnf_idx(self, vnf_idx):
336         return next((vnfd for vnfd in self.topology["constituent-vnfd"]
337                      if vnf_idx == vnfd["member-vnf-index"]), None)
338
339     def _update_context_with_topology(self):
340         for vnfd in self.topology["constituent-vnfd"]:
341             vnf_idx = vnfd["member-vnf-index"]
342             vnf_name = self._find_vnf_name_from_id(vnf_idx)
343             vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
344             self.context_cfg["nodes"][vnf_name].update(vnfd)
345
346     def _probe_netdevs(self, node, node_dict, timeout=120):
347         try:
348             return self.node_netdevs[node]
349         except KeyError:
350             pass
351
352         netdevs = {}
353         cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
354
355         with SshManager(node_dict, timeout=timeout) as conn:
356             if conn:
357                 exit_status = conn.execute(cmd)[0]
358                 if exit_status != 0:
359                     raise IncorrectSetup("Node's %s lacks ip tool." % node)
360                 exit_status, stdout, _ = conn.execute(
361                     self.FIND_NETDEVICE_STRING)
362                 if exit_status != 0:
363                     raise IncorrectSetup(
364                         "Cannot find netdev info in sysfs" % node)
365                 netdevs = node_dict['netdevs'] = self.parse_netdev_info(stdout)
366
367         self.node_netdevs[node] = netdevs
368         return netdevs
369
370     @classmethod
371     def _probe_missing_values(cls, netdevs, network):
372
373         mac_lower = network['local_mac'].lower()
374         for netdev in netdevs.values():
375             if netdev['address'].lower() != mac_lower:
376                 continue
377             network.update({
378                 'driver': netdev['driver'],
379                 'vpci': netdev['pci_bus_id'],
380                 'ifindex': netdev['ifindex'],
381             })
382
383     def _generate_pod_yaml(self):
384         context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
385         # convert OrderedDict to a list
386         # pod.yaml nodes is a list
387         nodes = [self._serialize_node(node) for node in self.context_cfg["nodes"].values()]
388         pod_dict = {
389             "nodes": nodes,
390             "networks": self.context_cfg["networks"]
391         }
392         with open(context_yaml, "w") as context_out:
393             yaml.safe_dump(pod_dict, context_out, default_flow_style=False,
394                            explicit_start=True)
395
396     @staticmethod
397     def _serialize_node(node):
398         new_node = copy.deepcopy(node)
399         # name field is required
400         # remove context suffix
401         new_node["name"] = node['name'].split('.')[0]
402         try:
403             new_node["pkey"] = ssh.convert_key_to_str(node["pkey"])
404         except KeyError:
405             pass
406         return new_node
407
408     TOPOLOGY_REQUIRED_KEYS = frozenset({
409         "vpci", "local_ip", "netmask", "local_mac", "driver"})
410
411     def map_topology_to_infrastructure(self):
412         """ This method should verify if the available resources defined in pod.yaml
413         match the topology.yaml file.
414
415         :return: None. Side effect: context_cfg is updated
416         """
417         num_nodes = len(self.context_cfg["nodes"])
418         # OpenStack instance creation time is probably proportional to the number
419         # of instances
420         timeout = 120 * num_nodes
421         for node, node_dict in self.context_cfg["nodes"].items():
422
423             for network in node_dict["interfaces"].values():
424                 missing = self.TOPOLOGY_REQUIRED_KEYS.difference(network)
425                 if not missing:
426                     continue
427
428                 # only ssh probe if there are missing values
429                 # ssh probe won't work on Ixia, so we had better define all our values
430                 try:
431                     netdevs = self._probe_netdevs(node, node_dict, timeout=timeout)
432                 except (SSHError, SSHTimeout):
433                     raise IncorrectConfig(
434                         "Unable to probe missing interface fields '%s', on node %s "
435                         "SSH Error" % (', '.join(missing), node))
436                 try:
437                     self._probe_missing_values(netdevs, network)
438                 except KeyError:
439                     pass
440                 else:
441                     missing = self.TOPOLOGY_REQUIRED_KEYS.difference(
442                         network)
443                 if missing:
444                     raise IncorrectConfig(
445                         "Require interface fields '%s' not found, topology file "
446                         "corrupted" % ', '.join(missing))
447
448         # we have to generate pod.yaml here so we have vpci and driver
449         self._generate_pod_yaml()
450         # 3. Use topology file to find connections & resolve dest address
451         self._resolve_topology()
452         self._update_context_with_topology()
453
454     FIND_NETDEVICE_STRING = r"""find /sys/devices/pci* -type d -name net -exec sh -c '{ grep -sH ^ \
455 $1/ifindex $1/address $1/operstate $1/device/vendor $1/device/device \
456 $1/device/subsystem_vendor $1/device/subsystem_device ; \
457 printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
458 ' sh  \{\}/* \;
459 """
460     BASE_ADAPTER_RE = re.compile(
461         '^/sys/devices/(.*)/net/([^/]*)/([^:]*):(.*)$', re.M)
462
463     @classmethod
464     def parse_netdev_info(cls, stdout):
465         network_devices = defaultdict(dict)
466         matches = cls.BASE_ADAPTER_RE.findall(stdout)
467         for bus_path, interface_name, name, value in matches:
468             dirname, bus_id = os.path.split(bus_path)
469             if 'virtio' in bus_id:
470                 # for some stupid reason VMs include virtio1/
471                 # in PCI device path
472                 bus_id = os.path.basename(dirname)
473             # remove extra 'device/' from 'device/vendor,
474             # device/subsystem_vendor', etc.
475             if 'device/' in name:
476                 name = name.split('/')[1]
477             network_devices[interface_name][name] = value
478             network_devices[interface_name][
479                 'interface_name'] = interface_name
480             network_devices[interface_name]['pci_bus_id'] = bus_id
481         # convert back to regular dict
482         return dict(network_devices)
483
484     @classmethod
485     def get_vnf_impl(cls, vnf_model_id):
486         """ Find the implementing class from vnf_model["vnf"]["name"] field
487
488         :param vnf_model_id: parsed vnfd model ID field
489         :return: subclass of GenericVNF
490         """
491         import_modules_from_package(
492             "yardstick.network_services.vnf_generic.vnf")
493         expected_name = vnf_model_id
494         classes_found = []
495
496         def impl():
497             for name, class_ in ((c.__name__, c) for c in itersubclasses(GenericVNF)):
498                 if name == expected_name:
499                     yield class_
500                 classes_found.append(name)
501
502         try:
503             return next(impl())
504         except StopIteration:
505             pass
506
507         raise IncorrectConfig("No implementation for %s found in %s" %
508                               (expected_name, classes_found))
509
510     @staticmethod
511     def create_interfaces_from_node(vnfd, node):
512         ext_intfs = vnfd["vdu"][0]["external-interface"] = []
513         # have to sort so xe0 goes first
514         for intf_name, intf in sorted(node['interfaces'].items()):
515             # only interfaces with vld_id are added.
516             # Thus there are two layers of filters, only intefaces with vld_id
517             # show up in interfaces, and only interfaces with traffic profiles
518             # are used by the generators
519             if intf.get('vld_id'):
520                 # force dpkd_port_num to int so we can do reverse lookup
521                 try:
522                     intf['dpdk_port_num'] = int(intf['dpdk_port_num'])
523                 except KeyError:
524                     pass
525                 ext_intf = {
526                     "name": intf_name,
527                     "virtual-interface": intf,
528                     "vnfd-connection-point-ref": intf_name,
529                 }
530                 ext_intfs.append(ext_intf)
531
532     def load_vnf_models(self, scenario_cfg=None, context_cfg=None):
533         """ Create VNF objects based on YAML descriptors
534
535         :param scenario_cfg:
536         :type scenario_cfg:
537         :param context_cfg:
538         :return:
539         """
540         trex_lib_path = get_nsb_option('trex_client_lib')
541         sys.path[:] = list(chain([trex_lib_path], (x for x in sys.path if x != trex_lib_path)))
542
543         if scenario_cfg is None:
544             scenario_cfg = self.scenario_cfg
545
546         if context_cfg is None:
547             context_cfg = self.context_cfg
548
549         vnfs = []
550         # we assume OrderedDict for consistenct in instantiation
551         for node_name, node in context_cfg["nodes"].items():
552             LOG.debug(node)
553             try:
554                 file_name = node["VNF model"]
555             except KeyError:
556                 LOG.debug("no model for %s, skipping", node_name)
557                 continue
558             file_path = scenario_cfg['task_path']
559             with open_relative_file(file_name, file_path) as stream:
560                 vnf_model = stream.read()
561             vnfd = vnfdgen.generate_vnfd(vnf_model, node)
562             # TODO: here add extra context_cfg["nodes"] regardless of template
563             vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0]
564             # force inject pkey if it exists
565             # we want to standardize Heat using pkey as a string so we don't rely
566             # on the filesystem
567             try:
568                 vnfd['mgmt-interface']['pkey'] = node['pkey']
569             except KeyError:
570                 pass
571             self.create_interfaces_from_node(vnfd, node)
572             vnf_impl = self.get_vnf_impl(vnfd['id'])
573             vnf_instance = vnf_impl(node_name, vnfd)
574             vnfs.append(vnf_instance)
575
576         self.vnfs = vnfs
577         return vnfs
578
579     def setup(self):
580         """ Setup infrastructure, provission VNFs & start traffic
581
582         :return:
583         """
584         # 1. Verify if infrastructure mapping can meet topology
585         self.map_topology_to_infrastructure()
586         # 1a. Load VNF models
587         self.load_vnf_models()
588         # 1b. Fill traffic profile with information from topology
589         self._fill_traffic_profile()
590
591         # 2. Provision VNFs
592
593         # link events will cause VNF application to exit
594         # so we should start traffic runners before VNFs
595         traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
596         non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic]
597         try:
598             for vnf in chain(traffic_runners, non_traffic_runners):
599                 LOG.info("Instantiating %s", vnf.name)
600                 vnf.instantiate(self.scenario_cfg, self.context_cfg)
601                 LOG.info("Waiting for %s to instantiate", vnf.name)
602                 vnf.wait_for_instantiate()
603         except:
604             LOG.exception("")
605             for vnf in self.vnfs:
606                 vnf.terminate()
607             raise
608
609         # 3. Run experiment
610         # Start listeners first to avoid losing packets
611         for traffic_gen in traffic_runners:
612             traffic_gen.listen_traffic(self.traffic_profile)
613
614         # register collector with yardstick for KPI collection.
615         self.collector = Collector(self.vnfs, self.context_cfg["nodes"], self.traffic_profile)
616         self.collector.start()
617
618         # Start the actual traffic
619         for traffic_gen in traffic_runners:
620             LOG.info("Starting traffic on %s", traffic_gen.name)
621             traffic_gen.run_traffic(self.traffic_profile)
622
623     def run(self, result):  # yardstick API
624         """ Yardstick calls run() at intervals defined in the yaml and
625             produces timestamped samples
626
627         :param result: dictionary with results to update
628         :return: None
629         """
630
631         # this is the only method that is check from the runner
632         # so if we have any fatal error it must be raised via these methods
633         # otherwise we will not terminate
634
635         result.update(self.collector.get_kpi())
636
637     def teardown(self):
638         """ Stop the collector and terminate VNF & TG instance
639
640         :return
641         """
642
643         try:
644             try:
645                 self.collector.stop()
646                 for vnf in self.vnfs:
647                     LOG.info("Stopping %s", vnf.name)
648                     vnf.terminate()
649                 LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
650             finally:
651                 terminate_children()
652         except Exception:
653             # catch any exception in teardown and convert to simple exception
654             # never pass exceptions back to multiprocessing, because some exceptions can
655             # be unpicklable
656             # https://bugs.python.org/issue9400
657             LOG.exception("")
658             raise RuntimeError("Error in teardown")