Import "traffic_profile" modules only once
[yardstick.git] / yardstick / benchmark / scenarios / networking / vnf_generic.py
# Copyright (c) 2016-2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
# Standard library imports.
from collections import defaultdict
import copy
import logging
import ipaddress
from itertools import chain
import os
import re
import sys

# Third-party imports.
import six
import yaml

# Yardstick project imports.
from yardstick.benchmark.scenarios import base as scenario_base
from yardstick.common.constants import LOG_DIR
from yardstick.common.process import terminate_children
from yardstick.common import utils
from yardstick.common.yaml_loader import yaml_load
from yardstick.network_services.collector.subscriber import Collector
from yardstick.network_services.vnf_generic import vnfdgen
from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
from yardstick.network_services import traffic_profile
from yardstick.network_services.traffic_profile import base as tprofile_base
from yardstick.network_services.utils import get_nsb_option
from yardstick import ssh


# Register all traffic-profile implementations once at import time so that
# tprofile_base.TrafficProfile.get() can resolve profile types to classes.
traffic_profile.register_modules()


# Module-level logger.
LOG = logging.getLogger(__name__)
45
46
class SSHError(Exception):
    """Raised when an SSH connection to a node fails."""
50
51
class SSHTimeout(SSHError):
    """Raised when an SSH connection attempt to a node times out."""
55
56
class IncorrectConfig(Exception):
    """Raised when the scenario/topology configuration is invalid."""
60
61
class IncorrectSetup(Exception):
    """Raised when the deployed infrastructure does not match expectations."""
65
66
class SshManager(object):
    """Context manager wrapping an SSH connection to a single node.

    On entry it builds a connection from the node description and waits for
    it to become ready; a connect failure is logged and ``None`` is yielded
    so callers can test the handle before use.  On exit any open connection
    is closed.
    """

    def __init__(self, node, timeout=120):
        super(SshManager, self).__init__()
        # Node description dict with SSH credentials (must contain "ip").
        self.node = node
        # Connection handle; stays None until __enter__ succeeds in creating one.
        self.conn = None
        # Seconds to wait for the connection to become ready.
        self.timeout = timeout

    def __enter__(self):
        """Return an SSH connection ready to use, or None on failure."""
        try:
            # NOTE: self.conn is assigned before wait() on purpose — if the
            # connection object was created but wait() failed, it is still
            # returned (and closed later by __exit__).
            self.conn = ssh.SSH.from_node(self.node)
            self.conn.wait(timeout=self.timeout)
        except SSHError as error:
            LOG.info("connect failed to %s, due to %s", self.node["ip"], error)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            self.conn.close()
90
91
92 class NetworkServiceTestCase(scenario_base.Scenario):
93     """Class handles Generic framework to do pre-deployment VNF &
94        Network service testing  """
95
96     __scenario_type__ = "NSPerf"
97
    def __init__(self, scenario_cfg, context_cfg):  # Yardstick API
        """Build the scenario from Yardstick scenario/context configuration.

        :param scenario_cfg: scenario options; must contain "topology"
            (path to the topology YAML, relative to "task_path")
        :param context_cfg: deployment context with "nodes" and "networks"
        """
        super(NetworkServiceTestCase, self).__init__()
        self.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg

        # fixme: create schema to validate all fields have been provided
        with utils.open_relative_file(scenario_cfg["topology"],
                                      scenario_cfg['task_path']) as stream:
            topology_yaml = yaml_load(stream)

        # First (and only) NSD entry of the topology catalog.
        self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0]
        self.vnfs = []               # GenericVNF instances, filled by load_vnf_models()
        self.collector = None        # KPI Collector, created in setup()
        self.traffic_profile = None  # TrafficProfile, created in _fill_traffic_profile()
        self.node_netdevs = {}       # per-node cache for SSH netdev probing
113
    def _get_ip_flow_range(self, ip_start_range):
        """Resolve a flow IP spec into an 'x.x.x.x-y.y.y.y' range string.

        :param ip_start_range: either a literal range string, or a one-entry
            mapping ``{node_name: interface_name_or_range}`` resolved against
            ``context_cfg["nodes"]``
        :returns: an IP range string, or a single IP when the interface's
            network is too small to hold more than two hosts
        """

        # IP range is specified as 'x.x.x.x-y.y.y.y'
        if isinstance(ip_start_range, six.string_types):
            return ip_start_range

        # Single-entry mapping; an empty mapping falls back to 0.0.0.0.
        node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0'))
        if node_name is None:
            # we are manually specifying the range
            ip_addr_range = range_or_interface
        else:
            node = self.context_cfg["nodes"].get(node_name, {})
            try:
                # the ip_range is the interface name
                interface = node.get("interfaces", {})[range_or_interface]
            except KeyError:
                # unknown interface: fall back to a default network
                ip = "0.0.0.0"
                mask = "255.255.255.0"
            else:
                ip = interface["local_ip"]
                # we can't default these values, they must both exist to be valid
                mask = interface["netmask"]

            # Derive the usable host range of the interface's network.
            ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)), strict=False)
            hosts = list(ipaddr.hosts())
            if len(hosts) > 2:
                # skip the first host in case of gateway
                ip_addr_range = "{}-{}".format(hosts[1], hosts[-1])
            else:
                LOG.warning("Only single IP in range %s", ipaddr)
                # fall back to single IP range
                ip_addr_range = ip
        return ip_addr_range
147
148     def _get_traffic_flow(self):
149         flow = {}
150         try:
151             # TODO: should be .0  or .1 so we can use list
152             # but this also roughly matches uplink_0, downlink_0
153             fflow = self.scenario_cfg["options"]["flow"]
154             for index, src in enumerate(fflow.get("src_ip", [])):
155                 flow["src_ip_{}".format(index)] = self._get_ip_flow_range(src)
156
157             for index, dst in enumerate(fflow.get("dst_ip", [])):
158                 flow["dst_ip_{}".format(index)] = self._get_ip_flow_range(dst)
159
160             for index, publicip in enumerate(fflow.get("public_ip", [])):
161                 flow["public_ip_{}".format(index)] = publicip
162
163             for index, src_port in enumerate(fflow.get("src_port", [])):
164                 flow["src_port_{}".format(index)] = src_port
165
166             for index, dst_port in enumerate(fflow.get("dst_port", [])):
167                 flow["dst_port_{}".format(index)] = dst_port
168
169             flow["count"] = fflow["count"]
170         except KeyError:
171             flow = {}
172         return {"flow": flow}
173
174     def _get_traffic_imix(self):
175         try:
176             imix = {"imix": self.scenario_cfg['options']['framesize']}
177         except KeyError:
178             imix = {}
179         return imix
180
181     def _get_traffic_profile(self):
182         profile = self.scenario_cfg["traffic_profile"]
183         path = self.scenario_cfg["task_path"]
184         with utils.open_relative_file(profile, path) as infile:
185             return infile.read()
186
187     def _fill_traffic_profile(self):
188         traffic_mapping = self._get_traffic_profile()
189         traffic_map_data = {
190             'flow': self._get_traffic_flow(),
191             'imix': self._get_traffic_imix(),
192             tprofile_base.TrafficProfile.UPLINK: {},
193             tprofile_base.TrafficProfile.DOWNLINK: {},
194         }
195
196         traffic_vnfd = vnfdgen.generate_vnfd(traffic_mapping, traffic_map_data)
197         self.traffic_profile = tprofile_base.TrafficProfile.get(traffic_vnfd)
198         return self.traffic_profile
199
200     def _find_vnf_name_from_id(self, vnf_id):
201         return next((vnfd["vnfd-id-ref"]
202                      for vnfd in self.topology["constituent-vnfd"]
203                      if vnf_id == vnfd["member-vnf-index"]), None)
204
205     @staticmethod
206     def get_vld_networks(networks):
207         # network name is vld_id
208         vld_map = {}
209         for name, n in networks.items():
210             try:
211                 vld_map[n['vld_id']] = n
212             except KeyError:
213                 vld_map[name] = n
214         return vld_map
215
216     @staticmethod
217     def find_node_if(nodes, name, if_name, vld_id):
218         try:
219             # check for xe0, xe1
220             intf = nodes[name]["interfaces"][if_name]
221         except KeyError:
222             # if not xe0, then maybe vld_id,  uplink_0, downlink_0
223             # pop it and re-insert with the correct name from topology
224             intf = nodes[name]["interfaces"].pop(vld_id)
225             nodes[name]["interfaces"][if_name] = intf
226         return intf
227
    def _resolve_topology(self):
        """Wire up peer information between the two endpoints of every VLD.

        First pass: for each virtual link (vld) in the topology, locate both
        endpoint interfaces in context_cfg and record interface/node names,
        vld id, peer name/interface, the vld's network and the destination
        MAC/IP taken from the opposite side.  Second pass: attach a shallow
        copy of each peer's interface dict as 'peer_intf' (copies avoid a
        circular reference).

        :raises IncorrectConfig: when a vld does not have exactly two
            endpoints, or a referenced interface/field cannot be found
        """
        for vld in self.topology["vld"]:
            try:
                node0_data, node1_data = vld["vnfd-connection-point-ref"]
            except (ValueError, TypeError):
                raise IncorrectConfig("Topology file corrupted, "
                                      "wrong endpoint count for connection")

            node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
            node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])

            node0_if_name = node0_data["vnfd-connection-point-ref"]
            node1_if_name = node1_data["vnfd-connection-point-ref"]

            try:
                nodes = self.context_cfg["nodes"]
                node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
                node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])

                # names so we can do reverse lookups
                node0_if["ifname"] = node0_if_name
                node1_if["ifname"] = node1_if_name

                node0_if["node_name"] = node0_name
                node1_if["node_name"] = node1_name

                node0_if["vld_id"] = vld["id"]
                node1_if["vld_id"] = vld["id"]

                # set peer name
                node0_if["peer_name"] = node1_name
                node1_if["peer_name"] = node0_name

                # set peer interface name
                node0_if["peer_ifname"] = node1_if_name
                node1_if["peer_ifname"] = node0_if_name

                # just load the network
                vld_networks = self.get_vld_networks(self.context_cfg["networks"])
                node0_if["network"] = vld_networks.get(vld["id"], {})
                node1_if["network"] = vld_networks.get(vld["id"], {})

                # each side's destination is the other side's local address
                node0_if["dst_mac"] = node1_if["local_mac"]
                node0_if["dst_ip"] = node1_if["local_ip"]

                node1_if["dst_mac"] = node0_if["local_mac"]
                node1_if["dst_ip"] = node0_if["local_ip"]

            except KeyError:
                LOG.exception("")
                raise IncorrectConfig("Required interface not found, "
                                      "topology file corrupted")

        # Second pass: both sides are now fully populated, so each interface
        # can carry a snapshot of its peer.
        for vld in self.topology['vld']:
            try:
                node0_data, node1_data = vld["vnfd-connection-point-ref"]
            except (ValueError, TypeError):
                raise IncorrectConfig("Topology file corrupted, "
                                      "wrong endpoint count for connection")

            node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
            node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])

            node0_if_name = node0_data["vnfd-connection-point-ref"]
            node1_if_name = node1_data["vnfd-connection-point-ref"]

            nodes = self.context_cfg["nodes"]
            node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
            node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])

            # add peer interface dict, but remove circular link
            # TODO: don't waste memory
            node0_copy = node0_if.copy()
            node1_copy = node1_if.copy()
            node0_if["peer_intf"] = node1_copy
            node1_if["peer_intf"] = node0_copy
304
305     def _find_vnfd_from_vnf_idx(self, vnf_idx):
306         return next((vnfd for vnfd in self.topology["constituent-vnfd"]
307                      if vnf_idx == vnfd["member-vnf-index"]), None)
308
309     def _update_context_with_topology(self):
310         for vnfd in self.topology["constituent-vnfd"]:
311             vnf_idx = vnfd["member-vnf-index"]
312             vnf_name = self._find_vnf_name_from_id(vnf_idx)
313             vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
314             self.context_cfg["nodes"][vnf_name].update(vnfd)
315
316     def _probe_netdevs(self, node, node_dict, timeout=120):
317         try:
318             return self.node_netdevs[node]
319         except KeyError:
320             pass
321
322         netdevs = {}
323         cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
324
325         with SshManager(node_dict, timeout=timeout) as conn:
326             if conn:
327                 exit_status = conn.execute(cmd)[0]
328                 if exit_status != 0:
329                     raise IncorrectSetup("Node's %s lacks ip tool." % node)
330                 exit_status, stdout, _ = conn.execute(
331                     self.FIND_NETDEVICE_STRING)
332                 if exit_status != 0:
333                     raise IncorrectSetup(
334                         "Cannot find netdev info in sysfs" % node)
335                 netdevs = node_dict['netdevs'] = self.parse_netdev_info(stdout)
336
337         self.node_netdevs[node] = netdevs
338         return netdevs
339
340     @classmethod
341     def _probe_missing_values(cls, netdevs, network):
342
343         mac_lower = network['local_mac'].lower()
344         for netdev in netdevs.values():
345             if netdev['address'].lower() != mac_lower:
346                 continue
347             network.update({
348                 'driver': netdev['driver'],
349                 'vpci': netdev['pci_bus_id'],
350                 'ifindex': netdev['ifindex'],
351             })
352
353     def _generate_pod_yaml(self):
354         context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
355         # convert OrderedDict to a list
356         # pod.yaml nodes is a list
357         nodes = [self._serialize_node(node) for node in self.context_cfg["nodes"].values()]
358         pod_dict = {
359             "nodes": nodes,
360             "networks": self.context_cfg["networks"]
361         }
362         with open(context_yaml, "w") as context_out:
363             yaml.safe_dump(pod_dict, context_out, default_flow_style=False,
364                            explicit_start=True)
365
366     @staticmethod
367     def _serialize_node(node):
368         new_node = copy.deepcopy(node)
369         # name field is required
370         # remove context suffix
371         new_node["name"] = node['name'].split('.')[0]
372         try:
373             new_node["pkey"] = ssh.convert_key_to_str(node["pkey"])
374         except KeyError:
375             pass
376         return new_node
377
    # Interface fields that must be present (from pod.yaml or SSH probing)
    # before the topology can be mapped onto the infrastructure.
    TOPOLOGY_REQUIRED_KEYS = frozenset({
        "vpci", "local_ip", "netmask", "local_mac", "driver"})
380
    def map_topology_to_infrastructure(self):
        """ This method should verify if the available resources defined in pod.yaml
        match the topology.yaml file.

        Interfaces missing any TOPOLOGY_REQUIRED_KEYS field are probed over
        SSH; afterwards a pod.yaml snapshot is written and the topology links
        are resolved into the context.

        :return: None. Side effect: context_cfg is updated
        :raises IncorrectConfig: if required fields remain missing or a node
            cannot be reached over SSH
        """
        num_nodes = len(self.context_cfg["nodes"])
        # OpenStack instance creation time is probably proportional to the number
        # of instances
        timeout = 120 * num_nodes
        for node, node_dict in self.context_cfg["nodes"].items():

            for network in node_dict["interfaces"].values():
                missing = self.TOPOLOGY_REQUIRED_KEYS.difference(network)
                if not missing:
                    continue

                # only ssh probe if there are missing values
                # ssh probe won't work on Ixia, so we had better define all our values
                try:
                    netdevs = self._probe_netdevs(node, node_dict, timeout=timeout)
                except (SSHError, SSHTimeout):
                    raise IncorrectConfig(
                        "Unable to probe missing interface fields '%s', on node %s "
                        "SSH Error" % (', '.join(missing), node))
                try:
                    self._probe_missing_values(netdevs, network)
                except KeyError:
                    # probed data was incomplete; 'missing' is re-checked below
                    pass
                else:
                    missing = self.TOPOLOGY_REQUIRED_KEYS.difference(
                        network)
                if missing:
                    raise IncorrectConfig(
                        "Require interface fields '%s' not found, topology file "
                        "corrupted" % ', '.join(missing))

        # we have to generate pod.yaml here so we have vpci and driver
        self._generate_pod_yaml()
        # 3. Use topology file to find connections & resolve dest address
        self._resolve_topology()
        self._update_context_with_topology()
423
    # Shell one-liner run on each node: for every PCI network device, dump
    # ifindex/address/operstate/vendor-and-device ids (grep -sH prefixes each
    # value with its sysfs path) plus the driver symlink target, producing
    # lines that BASE_ADAPTER_RE below can parse.
    FIND_NETDEVICE_STRING = (
        r"""find /sys/devices/pci* -type d -name net -exec sh -c '{ grep -sH ^ \
        $1/ifindex $1/address $1/operstate $1/device/vendor $1/device/device \
        $1/device/subsystem_vendor $1/device/subsystem_device ; \
        printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
        ' sh  \{\}/* \;
        """)
    # Captures (pci_bus_path, interface_name, attribute_name, value) from
    # each "<sysfs path>:<value>" line of the command output.
    BASE_ADAPTER_RE = re.compile(
        '^/sys/devices/(.*)/net/([^/]*)/([^:]*):(.*)$', re.M)
433
434     @classmethod
435     def parse_netdev_info(cls, stdout):
436         network_devices = defaultdict(dict)
437         matches = cls.BASE_ADAPTER_RE.findall(stdout)
438         for bus_path, interface_name, name, value in matches:
439             dirname, bus_id = os.path.split(bus_path)
440             if 'virtio' in bus_id:
441                 # for some stupid reason VMs include virtio1/
442                 # in PCI device path
443                 bus_id = os.path.basename(dirname)
444             # remove extra 'device/' from 'device/vendor,
445             # device/subsystem_vendor', etc.
446             if 'device/' in name:
447                 name = name.split('/')[1]
448             network_devices[interface_name][name] = value
449             network_devices[interface_name][
450                 'interface_name'] = interface_name
451             network_devices[interface_name]['pci_bus_id'] = bus_id
452         # convert back to regular dict
453         return dict(network_devices)
454
455     @classmethod
456     def get_vnf_impl(cls, vnf_model_id):
457         """ Find the implementing class from vnf_model["vnf"]["name"] field
458
459         :param vnf_model_id: parsed vnfd model ID field
460         :return: subclass of GenericVNF
461         """
462         utils.import_modules_from_package(
463             "yardstick.network_services.vnf_generic.vnf")
464         expected_name = vnf_model_id
465         classes_found = []
466
467         def impl():
468             for name, class_ in ((c.__name__, c) for c in
469                                  utils.itersubclasses(GenericVNF)):
470                 if name == expected_name:
471                     yield class_
472                 classes_found.append(name)
473
474         try:
475             return next(impl())
476         except StopIteration:
477             pass
478
479         raise IncorrectConfig("No implementation for %s found in %s" %
480                               (expected_name, classes_found))
481
482     @staticmethod
483     def create_interfaces_from_node(vnfd, node):
484         ext_intfs = vnfd["vdu"][0]["external-interface"] = []
485         # have to sort so xe0 goes first
486         for intf_name, intf in sorted(node['interfaces'].items()):
487             # only interfaces with vld_id are added.
488             # Thus there are two layers of filters, only intefaces with vld_id
489             # show up in interfaces, and only interfaces with traffic profiles
490             # are used by the generators
491             if intf.get('vld_id'):
492                 # force dpkd_port_num to int so we can do reverse lookup
493                 try:
494                     intf['dpdk_port_num'] = int(intf['dpdk_port_num'])
495                 except KeyError:
496                     pass
497                 ext_intf = {
498                     "name": intf_name,
499                     "virtual-interface": intf,
500                     "vnfd-connection-point-ref": intf_name,
501                 }
502                 ext_intfs.append(ext_intf)
503
    def load_vnf_models(self, scenario_cfg=None, context_cfg=None):
        """ Create VNF objects based on YAML descriptors

        :param scenario_cfg: scenario options (defaults to self.scenario_cfg)
        :type scenario_cfg: dict
        :param context_cfg: node context (defaults to self.context_cfg)
        :return: list of instantiated GenericVNF subclasses; also stored on
            self.vnfs
        """
        # Put the configured TRex client library first on sys.path so VNF
        # modules that import it pick up the right version.
        trex_lib_path = get_nsb_option('trex_client_lib')
        sys.path[:] = list(chain([trex_lib_path], (x for x in sys.path if x != trex_lib_path)))

        if scenario_cfg is None:
            scenario_cfg = self.scenario_cfg

        if context_cfg is None:
            context_cfg = self.context_cfg

        vnfs = []
        # we assume OrderedDict for consistenct in instantiation
        for node_name, node in context_cfg["nodes"].items():
            LOG.debug(node)
            try:
                file_name = node["VNF model"]
            except KeyError:
                # nodes without a VNF model (e.g. plain traffic endpoints)
                LOG.debug("no model for %s, skipping", node_name)
                continue
            file_path = scenario_cfg['task_path']
            with utils.open_relative_file(file_name, file_path) as stream:
                vnf_model = stream.read()
            # render the VNFD template with this node's values
            vnfd = vnfdgen.generate_vnfd(vnf_model, node)
            # TODO: here add extra context_cfg["nodes"] regardless of template
            vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0]
            # force inject pkey if it exists
            # we want to standardize Heat using pkey as a string so we don't rely
            # on the filesystem
            try:
                vnfd['mgmt-interface']['pkey'] = node['pkey']
            except KeyError:
                pass
            self.create_interfaces_from_node(vnfd, node)
            vnf_impl = self.get_vnf_impl(vnfd['id'])
            vnf_instance = vnf_impl(node_name, vnfd)
            vnfs.append(vnf_instance)

        self.vnfs = vnfs
        return vnfs
550
551     def setup(self):
552         """ Setup infrastructure, provission VNFs & start traffic
553
554         :return:
555         """
556         # 1. Verify if infrastructure mapping can meet topology
557         self.map_topology_to_infrastructure()
558         # 1a. Load VNF models
559         self.load_vnf_models()
560         # 1b. Fill traffic profile with information from topology
561         self._fill_traffic_profile()
562
563         # 2. Provision VNFs
564
565         # link events will cause VNF application to exit
566         # so we should start traffic runners before VNFs
567         traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
568         non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic]
569         try:
570             for vnf in chain(traffic_runners, non_traffic_runners):
571                 LOG.info("Instantiating %s", vnf.name)
572                 vnf.instantiate(self.scenario_cfg, self.context_cfg)
573                 LOG.info("Waiting for %s to instantiate", vnf.name)
574                 vnf.wait_for_instantiate()
575         except:
576             LOG.exception("")
577             for vnf in self.vnfs:
578                 vnf.terminate()
579             raise
580
581         # 3. Run experiment
582         # Start listeners first to avoid losing packets
583         for traffic_gen in traffic_runners:
584             traffic_gen.listen_traffic(self.traffic_profile)
585
586         # register collector with yardstick for KPI collection.
587         self.collector = Collector(self.vnfs, self.context_cfg["nodes"], self.traffic_profile)
588         self.collector.start()
589
590         # Start the actual traffic
591         for traffic_gen in traffic_runners:
592             LOG.info("Starting traffic on %s", traffic_gen.name)
593             traffic_gen.run_traffic(self.traffic_profile)
594
    def run(self, result):  # yardstick API
        """ Yardstick calls run() at intervals defined in the yaml and
            produces timestamped samples

        :param result: dictionary with results to update
        :return: None
        """

        # this is the only method that is check from the runner
        # so if we have any fatal error it must be raised via these methods
        # otherwise we will not terminate

        # collector is created in setup(); merge its latest KPIs into result
        result.update(self.collector.get_kpi())
608
    def teardown(self):
        """ Stop the collector and terminate VNF & TG instance

        :return: None
        :raises RuntimeError: wrapping any teardown failure (the original
            exception may be unpicklable across process boundaries)
        """

        try:
            try:
                self.collector.stop()
                for vnf in self.vnfs:
                    LOG.info("Stopping %s", vnf.name)
                    vnf.terminate()
                LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
            finally:
                # always reap child processes, even if a VNF failed to stop
                terminate_children()
        except Exception:
            # catch any exception in teardown and convert to simple exception
            # never pass exceptions back to multiprocessing, because some exceptions can
            # be unpicklable
            # https://bugs.python.org/issue9400
            LOG.exception("")
            raise RuntimeError("Error in teardown")