45c151b5925fe66c4a8e4077f430193e97df20fe
[yardstick.git] / yardstick / benchmark / scenarios / networking / vnf_generic.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from collections import defaultdict
16 import copy
17 import logging
18 import ipaddress
19 from itertools import chain
20 import os
21 import re
22 import sys
23
24 import six
25 import yaml
26
27 from yardstick.benchmark.scenarios import base as scenario_base
28 from yardstick.common.constants import LOG_DIR
29 from yardstick.common.process import terminate_children
30 from yardstick.common import utils
31 from yardstick.network_services.collector.subscriber import Collector
32 from yardstick.network_services.vnf_generic import vnfdgen
33 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
34 from yardstick.network_services import traffic_profile
35 from yardstick.network_services.traffic_profile import base as tprofile_base
36 from yardstick.network_services.utils import get_nsb_option
37 from yardstick import ssh
38
39
# Register all bundled traffic-profile implementations so that
# TrafficProfile.get() can resolve profile types by name at runtime.
traffic_profile.register_modules()


# module-level logger, shared by all classes in this file
LOG = logging.getLogger(__name__)
44
45
class SSHError(Exception):
    """Raised when an SSH connection cannot be established."""
49
50
class SSHTimeout(SSHError):
    """Raised when an SSH connection attempt times out."""
54
55
class IncorrectConfig(Exception):
    """Raised when the scenario/topology configuration is invalid."""
59
60
class IncorrectSetup(Exception):
    """Raised when the deployed environment does not match the test's needs."""
64
65
class SshManager(object):
    # Context manager that opens an SSH connection to a node on entry and
    # closes it on exit.  Connection failures are logged, not raised: the
    # manager then yields None, so callers must check the returned value.
    def __init__(self, node, timeout=120):
        super(SshManager, self).__init__()
        self.node = node          # node dict; must contain at least "ip"
        self.conn = None          # established ssh.SSH connection, or None
        self.timeout = timeout    # seconds to wait for SSH to come up

    def __enter__(self):
        """
        args -> network device mappings
        returns -> ssh connection ready to be used
        """
        try:
            self.conn = ssh.SSH.from_node(self.node)
            self.conn.wait(timeout=self.timeout)
        except SSHError as error:
            # NOTE(review): this catches the SSHError defined in this module;
            # if ssh.SSH raises exception classes from yardstick.ssh instead,
            # they would propagate uncaught -- confirm the intended type.
            LOG.info("connect failed to %s, due to %s", self.node["ip"], error)
        # self.conn defaults to None
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        # close only if the connection was actually established
        if self.conn:
            self.conn.close()
89
90
91 class NetworkServiceTestCase(scenario_base.Scenario):
92     """Class handles Generic framework to do pre-deployment VNF &
93        Network service testing  """
94
95     __scenario_type__ = "NSPerf"
96
97     def __init__(self, scenario_cfg, context_cfg):  # Yardstick API
98         super(NetworkServiceTestCase, self).__init__()
99         self.scenario_cfg = scenario_cfg
100         self.context_cfg = context_cfg
101
102         self._render_topology()
103         self.vnfs = []
104         self.collector = None
105         self.traffic_profile = None
106         self.node_netdevs = {}
107
108     def _get_ip_flow_range(self, ip_start_range):
109
110         # IP range is specified as 'x.x.x.x-y.y.y.y'
111         if isinstance(ip_start_range, six.string_types):
112             return ip_start_range
113
114         node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0'))
115         if node_name is None:
116             # we are manually specifying the range
117             ip_addr_range = range_or_interface
118         else:
119             node = self.context_cfg["nodes"].get(node_name, {})
120             try:
121                 # the ip_range is the interface name
122                 interface = node.get("interfaces", {})[range_or_interface]
123             except KeyError:
124                 ip = "0.0.0.0"
125                 mask = "255.255.255.0"
126             else:
127                 ip = interface["local_ip"]
128                 # we can't default these values, they must both exist to be valid
129                 mask = interface["netmask"]
130
131             ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)), strict=False)
132             hosts = list(ipaddr.hosts())
133             if len(hosts) > 2:
134                 # skip the first host in case of gateway
135                 ip_addr_range = "{}-{}".format(hosts[1], hosts[-1])
136             else:
137                 LOG.warning("Only single IP in range %s", ipaddr)
138                 # fall back to single IP range
139                 ip_addr_range = ip
140         return ip_addr_range
141
142     def _get_traffic_flow(self):
143         flow = {}
144         try:
145             # TODO: should be .0  or .1 so we can use list
146             # but this also roughly matches uplink_0, downlink_0
147             fflow = self.scenario_cfg["options"]["flow"]
148             for index, src in enumerate(fflow.get("src_ip", [])):
149                 flow["src_ip_{}".format(index)] = self._get_ip_flow_range(src)
150
151             for index, dst in enumerate(fflow.get("dst_ip", [])):
152                 flow["dst_ip_{}".format(index)] = self._get_ip_flow_range(dst)
153
154             for index, publicip in enumerate(fflow.get("public_ip", [])):
155                 flow["public_ip_{}".format(index)] = publicip
156
157             for index, src_port in enumerate(fflow.get("src_port", [])):
158                 flow["src_port_{}".format(index)] = src_port
159
160             for index, dst_port in enumerate(fflow.get("dst_port", [])):
161                 flow["dst_port_{}".format(index)] = dst_port
162
163             flow["count"] = fflow["count"]
164         except KeyError:
165             flow = {}
166         return {"flow": flow}
167
168     def _get_traffic_imix(self):
169         try:
170             imix = {"imix": self.scenario_cfg['options']['framesize']}
171         except KeyError:
172             imix = {}
173         return imix
174
175     def _get_traffic_profile(self):
176         profile = self.scenario_cfg["traffic_profile"]
177         path = self.scenario_cfg["task_path"]
178         with utils.open_relative_file(profile, path) as infile:
179             return infile.read()
180
181     def _get_topology(self):
182         topology = self.scenario_cfg["topology"]
183         path = self.scenario_cfg["task_path"]
184         with utils.open_relative_file(topology, path) as infile:
185             return infile.read()
186
187     def _fill_traffic_profile(self):
188         tprofile = self._get_traffic_profile()
189         extra_args = self.scenario_cfg.get('extra_args', {})
190         tprofile_data = {
191             'flow': self._get_traffic_flow(),
192             'imix': self._get_traffic_imix(),
193             tprofile_base.TrafficProfile.UPLINK: {},
194             tprofile_base.TrafficProfile.DOWNLINK: {},
195             'extra_args': extra_args
196         }
197
198         traffic_vnfd = vnfdgen.generate_vnfd(tprofile, tprofile_data)
199         self.traffic_profile = tprofile_base.TrafficProfile.get(traffic_vnfd)
200
201     def _render_topology(self):
202         topology = self._get_topology()
203         topology_args = self.scenario_cfg.get('extra_args', {})
204         topolgy_data = {
205             'extra_args': topology_args
206         }
207         topology_yaml = vnfdgen.generate_vnfd(topology, topolgy_data)
208         self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0]
209
210     def _find_vnf_name_from_id(self, vnf_id):
211         return next((vnfd["vnfd-id-ref"]
212                      for vnfd in self.topology["constituent-vnfd"]
213                      if vnf_id == vnfd["member-vnf-index"]), None)
214
215     @staticmethod
216     def get_vld_networks(networks):
217         # network name is vld_id
218         vld_map = {}
219         for name, n in networks.items():
220             try:
221                 vld_map[n['vld_id']] = n
222             except KeyError:
223                 vld_map[name] = n
224         return vld_map
225
226     @staticmethod
227     def find_node_if(nodes, name, if_name, vld_id):
228         try:
229             # check for xe0, xe1
230             intf = nodes[name]["interfaces"][if_name]
231         except KeyError:
232             # if not xe0, then maybe vld_id,  uplink_0, downlink_0
233             # pop it and re-insert with the correct name from topology
234             intf = nodes[name]["interfaces"].pop(vld_id)
235             nodes[name]["interfaces"][if_name] = intf
236         return intf
237
    def _resolve_topology(self):
        """Wire up peer information on every interface referenced in the topology.

        Pass 1: for each virtual link (vld) resolve both endpoint interfaces
        in context_cfg and record interface/node names, vld id, network and
        the peer's MAC/IP as dst_mac/dst_ip.
        Pass 2 (kept separate on purpose): attach a *copy* of the now fully
        populated peer interface dict, so each side can see its peer without
        creating a circular reference.

        :raises IncorrectConfig: if a vld does not have exactly two endpoints
            or a referenced interface/field is missing.
        """
        for vld in self.topology["vld"]:
            try:
                # every link must connect exactly two endpoints
                node0_data, node1_data = vld["vnfd-connection-point-ref"]
            except (ValueError, TypeError):
                raise IncorrectConfig("Topology file corrupted, "
                                      "wrong endpoint count for connection")

            node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
            node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])

            node0_if_name = node0_data["vnfd-connection-point-ref"]
            node1_if_name = node1_data["vnfd-connection-point-ref"]

            try:
                nodes = self.context_cfg["nodes"]
                node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
                node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])

                # names so we can do reverse lookups
                node0_if["ifname"] = node0_if_name
                node1_if["ifname"] = node1_if_name

                node0_if["node_name"] = node0_name
                node1_if["node_name"] = node1_name

                node0_if["vld_id"] = vld["id"]
                node1_if["vld_id"] = vld["id"]

                # set peer name
                node0_if["peer_name"] = node1_name
                node1_if["peer_name"] = node0_name

                # set peer interface name
                node0_if["peer_ifname"] = node1_if_name
                node1_if["peer_ifname"] = node0_if_name

                # just load the network
                vld_networks = self.get_vld_networks(self.context_cfg["networks"])
                node0_if["network"] = vld_networks.get(vld["id"], {})
                node1_if["network"] = vld_networks.get(vld["id"], {})

                # each side's destination is the other side's local address
                node0_if["dst_mac"] = node1_if["local_mac"]
                node0_if["dst_ip"] = node1_if["local_ip"]

                node1_if["dst_mac"] = node0_if["local_mac"]
                node1_if["dst_ip"] = node0_if["local_ip"]

            except KeyError:
                LOG.exception("")
                raise IncorrectConfig("Required interface not found, "
                                      "topology file corrupted")

        # second pass: only now are the interface dicts complete enough to
        # snapshot as peer_intf copies
        for vld in self.topology['vld']:
            try:
                node0_data, node1_data = vld["vnfd-connection-point-ref"]
            except (ValueError, TypeError):
                raise IncorrectConfig("Topology file corrupted, "
                                      "wrong endpoint count for connection")

            node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
            node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])

            node0_if_name = node0_data["vnfd-connection-point-ref"]
            node1_if_name = node1_data["vnfd-connection-point-ref"]

            nodes = self.context_cfg["nodes"]
            node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
            node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])

            # add peer interface dict, but remove circular link
            # TODO: don't waste memory
            node0_copy = node0_if.copy()
            node1_copy = node1_if.copy()
            node0_if["peer_intf"] = node1_copy
            node1_if["peer_intf"] = node0_copy
314
315     def _find_vnfd_from_vnf_idx(self, vnf_idx):
316         return next((vnfd for vnfd in self.topology["constituent-vnfd"]
317                      if vnf_idx == vnfd["member-vnf-index"]), None)
318
319     def _update_context_with_topology(self):
320         for vnfd in self.topology["constituent-vnfd"]:
321             vnf_idx = vnfd["member-vnf-index"]
322             vnf_name = self._find_vnf_name_from_id(vnf_idx)
323             vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
324             self.context_cfg["nodes"][vnf_name].update(vnfd)
325
326     def _probe_netdevs(self, node, node_dict, timeout=120):
327         try:
328             return self.node_netdevs[node]
329         except KeyError:
330             pass
331
332         netdevs = {}
333         cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
334
335         with SshManager(node_dict, timeout=timeout) as conn:
336             if conn:
337                 exit_status = conn.execute(cmd)[0]
338                 if exit_status != 0:
339                     raise IncorrectSetup("Node's %s lacks ip tool." % node)
340                 exit_status, stdout, _ = conn.execute(
341                     self.FIND_NETDEVICE_STRING)
342                 if exit_status != 0:
343                     raise IncorrectSetup(
344                         "Cannot find netdev info in sysfs" % node)
345                 netdevs = node_dict['netdevs'] = self.parse_netdev_info(stdout)
346
347         self.node_netdevs[node] = netdevs
348         return netdevs
349
350     @classmethod
351     def _probe_missing_values(cls, netdevs, network):
352
353         mac_lower = network['local_mac'].lower()
354         for netdev in netdevs.values():
355             if netdev['address'].lower() != mac_lower:
356                 continue
357             network.update({
358                 'driver': netdev['driver'],
359                 'vpci': netdev['pci_bus_id'],
360                 'ifindex': netdev['ifindex'],
361             })
362
363     def _generate_pod_yaml(self):
364         context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
365         # convert OrderedDict to a list
366         # pod.yaml nodes is a list
367         nodes = [self._serialize_node(node) for node in self.context_cfg["nodes"].values()]
368         pod_dict = {
369             "nodes": nodes,
370             "networks": self.context_cfg["networks"]
371         }
372         with open(context_yaml, "w") as context_out:
373             yaml.safe_dump(pod_dict, context_out, default_flow_style=False,
374                            explicit_start=True)
375
376     @staticmethod
377     def _serialize_node(node):
378         new_node = copy.deepcopy(node)
379         # name field is required
380         # remove context suffix
381         new_node["name"] = node['name'].split('.')[0]
382         try:
383             new_node["pkey"] = ssh.convert_key_to_str(node["pkey"])
384         except KeyError:
385             pass
386         return new_node
387
    # Interface fields that must be known (from pod.yaml or from SSH probing)
    # before the topology can be mapped onto the infrastructure.
    TOPOLOGY_REQUIRED_KEYS = frozenset({
        "vpci", "local_ip", "netmask", "local_mac", "driver"})
390
    def map_topology_to_infrastructure(self):
        """ This method should verify if the available resources defined in pod.yaml
        match the topology.yaml file.

        Interfaces missing any TOPOLOGY_REQUIRED_KEYS field are probed over
        SSH; if fields are still missing afterwards, IncorrectConfig is raised.
        Finally pod.yaml is regenerated and the topology is resolved into
        context_cfg.

        :return: None. Side effect: context_cfg is updated
        :raises IncorrectConfig: on SSH failure or unresolvable fields
        """
        num_nodes = len(self.context_cfg["nodes"])
        # OpenStack instance creation time is probably proportional to the number
        # of instances
        timeout = 120 * num_nodes
        for node, node_dict in self.context_cfg["nodes"].items():

            for network in node_dict["interfaces"].values():
                missing = self.TOPOLOGY_REQUIRED_KEYS.difference(network)
                if not missing:
                    continue

                # only ssh probe if there are missing values
                # ssh probe won't work on Ixia, so we had better define all our values
                try:
                    netdevs = self._probe_netdevs(node, node_dict, timeout=timeout)
                except (SSHError, SSHTimeout):
                    raise IncorrectConfig(
                        "Unable to probe missing interface fields '%s', on node %s "
                        "SSH Error" % (', '.join(missing), node))
                try:
                    self._probe_missing_values(netdevs, network)
                except KeyError:
                    # probed data was incomplete; keep 'missing' as-is so the
                    # check below reports the still-absent fields
                    pass
                else:
                    missing = self.TOPOLOGY_REQUIRED_KEYS.difference(
                        network)
                if missing:
                    raise IncorrectConfig(
                        "Require interface fields '%s' not found, topology file "
                        "corrupted" % ', '.join(missing))

        # we have to generate pod.yaml here so we have vpci and driver
        self._generate_pod_yaml()
        # 3. Use topology file to find connections & resolve dest address
        self._resolve_topology()
        self._update_context_with_topology()
433
    # Shell one-liner run on each probed node: for every PCI network device,
    # dump ifindex/address/operstate and the PCI vendor/device ids from sysfs,
    # plus the bound driver name, as "path:value" lines.
    FIND_NETDEVICE_STRING = (
        r"""find /sys/devices/pci* -type d -name net -exec sh -c '{ grep -sH ^ \
        $1/ifindex $1/address $1/operstate $1/device/vendor $1/device/device \
        $1/device/subsystem_vendor $1/device/subsystem_device ; \
        printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
        ' sh  \{\}/* \;
        """)

    # Parses one "path:value" line of the above output into
    # (pci bus path, interface name, attribute name, attribute value).
    BASE_ADAPTER_RE = re.compile(
        '^/sys/devices/(.*)/net/([^/]*)/([^:]*):(.*)$', re.M)
444
445     @classmethod
446     def parse_netdev_info(cls, stdout):
447         network_devices = defaultdict(dict)
448         matches = cls.BASE_ADAPTER_RE.findall(stdout)
449         for bus_path, interface_name, name, value in matches:
450             dirname, bus_id = os.path.split(bus_path)
451             if 'virtio' in bus_id:
452                 # for some stupid reason VMs include virtio1/
453                 # in PCI device path
454                 bus_id = os.path.basename(dirname)
455             # remove extra 'device/' from 'device/vendor,
456             # device/subsystem_vendor', etc.
457             if 'device/' in name:
458                 name = name.split('/')[1]
459             network_devices[interface_name][name] = value
460             network_devices[interface_name][
461                 'interface_name'] = interface_name
462             network_devices[interface_name]['pci_bus_id'] = bus_id
463         # convert back to regular dict
464         return dict(network_devices)
465
466     @classmethod
467     def get_vnf_impl(cls, vnf_model_id):
468         """ Find the implementing class from vnf_model["vnf"]["name"] field
469
470         :param vnf_model_id: parsed vnfd model ID field
471         :return: subclass of GenericVNF
472         """
473         utils.import_modules_from_package(
474             "yardstick.network_services.vnf_generic.vnf")
475         expected_name = vnf_model_id
476         classes_found = []
477
478         def impl():
479             for name, class_ in ((c.__name__, c) for c in
480                                  utils.itersubclasses(GenericVNF)):
481                 if name == expected_name:
482                     yield class_
483                 classes_found.append(name)
484
485         try:
486             return next(impl())
487         except StopIteration:
488             pass
489
490         raise IncorrectConfig("No implementation for %s found in %s" %
491                               (expected_name, classes_found))
492
493     @staticmethod
494     def create_interfaces_from_node(vnfd, node):
495         ext_intfs = vnfd["vdu"][0]["external-interface"] = []
496         # have to sort so xe0 goes first
497         for intf_name, intf in sorted(node['interfaces'].items()):
498             # only interfaces with vld_id are added.
499             # Thus there are two layers of filters, only intefaces with vld_id
500             # show up in interfaces, and only interfaces with traffic profiles
501             # are used by the generators
502             if intf.get('vld_id'):
503                 # force dpkd_port_num to int so we can do reverse lookup
504                 try:
505                     intf['dpdk_port_num'] = int(intf['dpdk_port_num'])
506                 except KeyError:
507                     pass
508                 ext_intf = {
509                     "name": intf_name,
510                     "virtual-interface": intf,
511                     "vnfd-connection-point-ref": intf_name,
512                 }
513                 ext_intfs.append(ext_intf)
514
    def load_vnf_models(self, scenario_cfg=None, context_cfg=None):
        """ Create VNF objects based on YAML descriptors

        :param scenario_cfg: scenario config; defaults to self.scenario_cfg
        :type scenario_cfg: dict
        :param context_cfg: context config; defaults to self.context_cfg
        :return: list of instantiated GenericVNF subclasses; also stored
            on self.vnfs
        """
        # make sure the configured trex client library is found first on
        # sys.path, ahead of any other copy
        trex_lib_path = get_nsb_option('trex_client_lib')
        sys.path[:] = list(chain([trex_lib_path], (x for x in sys.path if x != trex_lib_path)))

        if scenario_cfg is None:
            scenario_cfg = self.scenario_cfg

        if context_cfg is None:
            context_cfg = self.context_cfg

        vnfs = []
        # we assume OrderedDict for consistency in instantiation
        for node_name, node in context_cfg["nodes"].items():
            LOG.debug(node)
            try:
                file_name = node["VNF model"]
            except KeyError:
                # nodes without a VNF model (e.g. plain servers) are skipped
                LOG.debug("no model for %s, skipping", node_name)
                continue
            file_path = scenario_cfg['task_path']
            with utils.open_relative_file(file_name, file_path) as stream:
                vnf_model = stream.read()
            # render the VNF descriptor template with the node's data
            vnfd = vnfdgen.generate_vnfd(vnf_model, node)
            # TODO: here add extra context_cfg["nodes"] regardless of template
            vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0]
            # force inject pkey if it exists
            # we want to standardize Heat using pkey as a string so we don't rely
            # on the filesystem
            try:
                vnfd['mgmt-interface']['pkey'] = node['pkey']
            except KeyError:
                pass
            self.create_interfaces_from_node(vnfd, node)
            # resolve the implementing class via the vnfd 'id' field
            vnf_impl = self.get_vnf_impl(vnfd['id'])
            vnf_instance = vnf_impl(node_name, vnfd)
            vnfs.append(vnf_instance)

        self.vnfs = vnfs
        return vnfs
561
562     def setup(self):
563         """ Setup infrastructure, provission VNFs & start traffic
564
565         :return:
566         """
567         # 1. Verify if infrastructure mapping can meet topology
568         self.map_topology_to_infrastructure()
569         # 1a. Load VNF models
570         self.load_vnf_models()
571         # 1b. Fill traffic profile with information from topology
572         self._fill_traffic_profile()
573
574         # 2. Provision VNFs
575
576         # link events will cause VNF application to exit
577         # so we should start traffic runners before VNFs
578         traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
579         non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic]
580         try:
581             for vnf in chain(traffic_runners, non_traffic_runners):
582                 LOG.info("Instantiating %s", vnf.name)
583                 vnf.instantiate(self.scenario_cfg, self.context_cfg)
584                 LOG.info("Waiting for %s to instantiate", vnf.name)
585                 vnf.wait_for_instantiate()
586         except:
587             LOG.exception("")
588             for vnf in self.vnfs:
589                 vnf.terminate()
590             raise
591
592         # 3. Run experiment
593         # Start listeners first to avoid losing packets
594         for traffic_gen in traffic_runners:
595             traffic_gen.listen_traffic(self.traffic_profile)
596
597         # register collector with yardstick for KPI collection.
598         self.collector = Collector(self.vnfs, self.context_cfg["nodes"], self.traffic_profile)
599         self.collector.start()
600
601         # Start the actual traffic
602         for traffic_gen in traffic_runners:
603             LOG.info("Starting traffic on %s", traffic_gen.name)
604             traffic_gen.run_traffic(self.traffic_profile)
605
606     def run(self, result):  # yardstick API
607         """ Yardstick calls run() at intervals defined in the yaml and
608             produces timestamped samples
609
610         :param result: dictionary with results to update
611         :return: None
612         """
613
614         # this is the only method that is check from the runner
615         # so if we have any fatal error it must be raised via these methods
616         # otherwise we will not terminate
617
618         result.update(self.collector.get_kpi())
619
    def teardown(self):
        """ Stop the collector and terminate VNF & TG instance

        :return
        :raises RuntimeError: if anything went wrong during teardown
        """

        try:
            try:
                self.collector.stop()
                for vnf in self.vnfs:
                    LOG.info("Stopping %s", vnf.name)
                    vnf.terminate()
                LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
            finally:
                # always reap any child processes the VNFs left behind,
                # even if stopping a VNF raised
                terminate_children()
        except Exception:
            # catch any exception in teardown and convert to simple exception
            # never pass exceptions back to multiprocessing, because some exceptions can
            # be unpicklable
            # https://bugs.python.org/issue9400
            LOG.exception("")
            raise RuntimeError("Error in teardown")