Add arguments to the traffic profile render
[yardstick.git] / yardstick / benchmark / scenarios / networking / vnf_generic.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from collections import defaultdict
16 import copy
17 import logging
18 import ipaddress
19 from itertools import chain
20 import os
21 import re
22 import sys
23
24 import six
25 import yaml
26
27 from yardstick.benchmark.scenarios import base as scenario_base
28 from yardstick.common.constants import LOG_DIR
29 from yardstick.common.process import terminate_children
30 from yardstick.common import utils
31 from yardstick.common.yaml_loader import yaml_load
32 from yardstick.network_services.collector.subscriber import Collector
33 from yardstick.network_services.vnf_generic import vnfdgen
34 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
35 from yardstick.network_services import traffic_profile
36 from yardstick.network_services.traffic_profile import base as tprofile_base
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick import ssh
39
40
# Import-time side effect: every bundled traffic-profile class registers
# itself so tprofile_base.TrafficProfile.get() can look profiles up by name.
traffic_profile.register_modules()


# module-level logger, per yardstick convention
LOG = logging.getLogger(__name__)
45
46
class SSHError(Exception):
    """Raised when an SSH connection to a node cannot be established."""
50
51
class SSHTimeout(SSHError):
    """Raised when the SSH connection does not come up within the timeout."""
55
56
class IncorrectConfig(Exception):
    """Raised when the topology/pod configuration is invalid or incomplete."""
60
61
class IncorrectSetup(Exception):
    """Raised when a node lacks the tools or info needed during setup."""
65
66
class SshManager(object):
    """Context manager for an SSH connection to a single node.

    Entering yields a ready connection, or None when connecting failed
    within the timeout; exiting closes the connection if one was opened.
    """

    def __init__(self, node, timeout=120):
        """
        :param node: node dict with SSH credentials (must contain "ip")
        :param timeout: seconds to wait for the connection to come up
        """
        super(SshManager, self).__init__()
        self.node = node
        self.conn = None
        self.timeout = timeout

    def __enter__(self):
        """
        args -> network device mappings
        returns -> ssh connection ready to be used
        """
        try:
            self.conn = ssh.SSH.from_node(self.node)
            self.conn.wait(timeout=self.timeout)
        # bugfix: the ssh helpers raise yardstick.ssh.SSHError, not the
        # module-local SSHError, so the local class alone never matched and
        # connection failures propagated instead of returning None here
        except (SSHError, ssh.SSHError) as error:
            LOG.info("connect failed to %s, due to %s", self.node["ip"], error)
        # self.conn defaults to None
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            self.conn.close()
90
91
class NetworkServiceTestCase(scenario_base.Scenario):
    """Generic framework for pre-deployment VNF & network-service testing.

    Loads an NSD topology descriptor, maps it onto the deployed context,
    instantiates the VNFs and drives traffic through them.
    """

    __scenario_type__ = "NSPerf"

    def __init__(self, scenario_cfg, context_cfg):  # Yardstick API
        """Load the topology descriptor and initialise run-time state.

        :param scenario_cfg: scenario options, incl. "topology" and "task_path"
        :param context_cfg: deployed nodes/networks description
        """
        super(NetworkServiceTestCase, self).__init__()
        self.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg

        # fixme: create schema to validate all fields have been provided
        with utils.open_relative_file(scenario_cfg["topology"],
                                      scenario_cfg['task_path']) as topo_file:
            nsd_catalog = yaml_load(topo_file)
        self.topology = nsd_catalog["nsd:nsd-catalog"]["nsd"][0]

        # populated later by load_vnf_models() / setup()
        self.vnfs = []
        self.collector = None
        self.traffic_profile = None
        # per-node cache filled by _probe_netdevs()
        self.node_netdevs = {}
113
114     def _get_ip_flow_range(self, ip_start_range):
115
116         # IP range is specified as 'x.x.x.x-y.y.y.y'
117         if isinstance(ip_start_range, six.string_types):
118             return ip_start_range
119
120         node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0'))
121         if node_name is None:
122             # we are manually specifying the range
123             ip_addr_range = range_or_interface
124         else:
125             node = self.context_cfg["nodes"].get(node_name, {})
126             try:
127                 # the ip_range is the interface name
128                 interface = node.get("interfaces", {})[range_or_interface]
129             except KeyError:
130                 ip = "0.0.0.0"
131                 mask = "255.255.255.0"
132             else:
133                 ip = interface["local_ip"]
134                 # we can't default these values, they must both exist to be valid
135                 mask = interface["netmask"]
136
137             ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)), strict=False)
138             hosts = list(ipaddr.hosts())
139             if len(hosts) > 2:
140                 # skip the first host in case of gateway
141                 ip_addr_range = "{}-{}".format(hosts[1], hosts[-1])
142             else:
143                 LOG.warning("Only single IP in range %s", ipaddr)
144                 # fall back to single IP range
145                 ip_addr_range = ip
146         return ip_addr_range
147
148     def _get_traffic_flow(self):
149         flow = {}
150         try:
151             # TODO: should be .0  or .1 so we can use list
152             # but this also roughly matches uplink_0, downlink_0
153             fflow = self.scenario_cfg["options"]["flow"]
154             for index, src in enumerate(fflow.get("src_ip", [])):
155                 flow["src_ip_{}".format(index)] = self._get_ip_flow_range(src)
156
157             for index, dst in enumerate(fflow.get("dst_ip", [])):
158                 flow["dst_ip_{}".format(index)] = self._get_ip_flow_range(dst)
159
160             for index, publicip in enumerate(fflow.get("public_ip", [])):
161                 flow["public_ip_{}".format(index)] = publicip
162
163             for index, src_port in enumerate(fflow.get("src_port", [])):
164                 flow["src_port_{}".format(index)] = src_port
165
166             for index, dst_port in enumerate(fflow.get("dst_port", [])):
167                 flow["dst_port_{}".format(index)] = dst_port
168
169             flow["count"] = fflow["count"]
170         except KeyError:
171             flow = {}
172         return {"flow": flow}
173
174     def _get_traffic_imix(self):
175         try:
176             imix = {"imix": self.scenario_cfg['options']['framesize']}
177         except KeyError:
178             imix = {}
179         return imix
180
181     def _get_traffic_profile(self):
182         profile = self.scenario_cfg["traffic_profile"]
183         path = self.scenario_cfg["task_path"]
184         with utils.open_relative_file(profile, path) as infile:
185             return infile.read()
186
187     def _fill_traffic_profile(self):
188         tprofile = self._get_traffic_profile()
189         extra_args = self.scenario_cfg.get('extra_args', {})
190         tprofile_data = {
191             'flow': self._get_traffic_flow(),
192             'imix': self._get_traffic_imix(),
193             tprofile_base.TrafficProfile.UPLINK: {},
194             tprofile_base.TrafficProfile.DOWNLINK: {},
195             'extra_args': extra_args
196         }
197
198         traffic_vnfd = vnfdgen.generate_vnfd(tprofile, tprofile_data)
199         self.traffic_profile = tprofile_base.TrafficProfile.get(traffic_vnfd)
200
201     def _find_vnf_name_from_id(self, vnf_id):
202         return next((vnfd["vnfd-id-ref"]
203                      for vnfd in self.topology["constituent-vnfd"]
204                      if vnf_id == vnfd["member-vnf-index"]), None)
205
206     @staticmethod
207     def get_vld_networks(networks):
208         # network name is vld_id
209         vld_map = {}
210         for name, n in networks.items():
211             try:
212                 vld_map[n['vld_id']] = n
213             except KeyError:
214                 vld_map[name] = n
215         return vld_map
216
217     @staticmethod
218     def find_node_if(nodes, name, if_name, vld_id):
219         try:
220             # check for xe0, xe1
221             intf = nodes[name]["interfaces"][if_name]
222         except KeyError:
223             # if not xe0, then maybe vld_id,  uplink_0, downlink_0
224             # pop it and re-insert with the correct name from topology
225             intf = nodes[name]["interfaces"].pop(vld_id)
226             nodes[name]["interfaces"][if_name] = intf
227         return intf
228
    def _resolve_topology(self):
        """Resolve each VLD's two endpoints and cross-link peer information.

        Pass 1: for every virtual link descriptor, look up both endpoint
        interfaces in context_cfg and record reverse-lookup names, vld id,
        network, and the peer's name/ifname/mac/ip on each side.
        Pass 2: attach a shallow copy of each peer interface as "peer_intf".
        The copies are taken only after pass 1 has populated every field,
        and copying avoids a circular reference between the two dicts.

        :raises IncorrectConfig: when a vld does not have exactly two
            endpoint references, or a referenced interface is missing
        """
        for vld in self.topology["vld"]:
            try:
                # exactly two endpoints per link; anything else is corrupt
                node0_data, node1_data = vld["vnfd-connection-point-ref"]
            except (ValueError, TypeError):
                raise IncorrectConfig("Topology file corrupted, "
                                      "wrong endpoint count for connection")

            node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
            node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])

            node0_if_name = node0_data["vnfd-connection-point-ref"]
            node1_if_name = node1_data["vnfd-connection-point-ref"]

            try:
                nodes = self.context_cfg["nodes"]
                node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
                node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])

                # names so we can do reverse lookups
                node0_if["ifname"] = node0_if_name
                node1_if["ifname"] = node1_if_name

                node0_if["node_name"] = node0_name
                node1_if["node_name"] = node1_name

                node0_if["vld_id"] = vld["id"]
                node1_if["vld_id"] = vld["id"]

                # set peer name
                node0_if["peer_name"] = node1_name
                node1_if["peer_name"] = node0_name

                # set peer interface name
                node0_if["peer_ifname"] = node1_if_name
                node1_if["peer_ifname"] = node0_if_name

                # just load the network
                vld_networks = self.get_vld_networks(self.context_cfg["networks"])
                node0_if["network"] = vld_networks.get(vld["id"], {})
                node1_if["network"] = vld_networks.get(vld["id"], {})

                # destination addresses seen from each side are the peer's
                node0_if["dst_mac"] = node1_if["local_mac"]
                node0_if["dst_ip"] = node1_if["local_ip"]

                node1_if["dst_mac"] = node0_if["local_mac"]
                node1_if["dst_ip"] = node0_if["local_ip"]

            except KeyError:
                LOG.exception("")
                raise IncorrectConfig("Required interface not found, "
                                      "topology file corrupted")

        # second pass: only now are all interface dicts fully populated
        for vld in self.topology['vld']:
            try:
                node0_data, node1_data = vld["vnfd-connection-point-ref"]
            except (ValueError, TypeError):
                raise IncorrectConfig("Topology file corrupted, "
                                      "wrong endpoint count for connection")

            node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
            node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])

            node0_if_name = node0_data["vnfd-connection-point-ref"]
            node1_if_name = node1_data["vnfd-connection-point-ref"]

            nodes = self.context_cfg["nodes"]
            node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
            node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])

            # add peer interface dict, but remove circular link
            # TODO: don't waste memory
            node0_copy = node0_if.copy()
            node1_copy = node1_if.copy()
            node0_if["peer_intf"] = node1_copy
            node1_if["peer_intf"] = node0_copy
305
306     def _find_vnfd_from_vnf_idx(self, vnf_idx):
307         return next((vnfd for vnfd in self.topology["constituent-vnfd"]
308                      if vnf_idx == vnfd["member-vnf-index"]), None)
309
310     def _update_context_with_topology(self):
311         for vnfd in self.topology["constituent-vnfd"]:
312             vnf_idx = vnfd["member-vnf-index"]
313             vnf_name = self._find_vnf_name_from_id(vnf_idx)
314             vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
315             self.context_cfg["nodes"][vnf_name].update(vnfd)
316
317     def _probe_netdevs(self, node, node_dict, timeout=120):
318         try:
319             return self.node_netdevs[node]
320         except KeyError:
321             pass
322
323         netdevs = {}
324         cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
325
326         with SshManager(node_dict, timeout=timeout) as conn:
327             if conn:
328                 exit_status = conn.execute(cmd)[0]
329                 if exit_status != 0:
330                     raise IncorrectSetup("Node's %s lacks ip tool." % node)
331                 exit_status, stdout, _ = conn.execute(
332                     self.FIND_NETDEVICE_STRING)
333                 if exit_status != 0:
334                     raise IncorrectSetup(
335                         "Cannot find netdev info in sysfs" % node)
336                 netdevs = node_dict['netdevs'] = self.parse_netdev_info(stdout)
337
338         self.node_netdevs[node] = netdevs
339         return netdevs
340
341     @classmethod
342     def _probe_missing_values(cls, netdevs, network):
343
344         mac_lower = network['local_mac'].lower()
345         for netdev in netdevs.values():
346             if netdev['address'].lower() != mac_lower:
347                 continue
348             network.update({
349                 'driver': netdev['driver'],
350                 'vpci': netdev['pci_bus_id'],
351                 'ifindex': netdev['ifindex'],
352             })
353
354     def _generate_pod_yaml(self):
355         context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
356         # convert OrderedDict to a list
357         # pod.yaml nodes is a list
358         nodes = [self._serialize_node(node) for node in self.context_cfg["nodes"].values()]
359         pod_dict = {
360             "nodes": nodes,
361             "networks": self.context_cfg["networks"]
362         }
363         with open(context_yaml, "w") as context_out:
364             yaml.safe_dump(pod_dict, context_out, default_flow_style=False,
365                            explicit_start=True)
366
367     @staticmethod
368     def _serialize_node(node):
369         new_node = copy.deepcopy(node)
370         # name field is required
371         # remove context suffix
372         new_node["name"] = node['name'].split('.')[0]
373         try:
374             new_node["pkey"] = ssh.convert_key_to_str(node["pkey"])
375         except KeyError:
376             pass
377         return new_node
378
    # Interface fields every endpoint must provide before a test can run;
    # missing ones are ssh-probed in map_topology_to_infrastructure().
    TOPOLOGY_REQUIRED_KEYS = frozenset({
        "vpci", "local_ip", "netmask", "local_mac", "driver"})
381
382     def map_topology_to_infrastructure(self):
383         """ This method should verify if the available resources defined in pod.yaml
384         match the topology.yaml file.
385
386         :return: None. Side effect: context_cfg is updated
387         """
388         num_nodes = len(self.context_cfg["nodes"])
389         # OpenStack instance creation time is probably proportional to the number
390         # of instances
391         timeout = 120 * num_nodes
392         for node, node_dict in self.context_cfg["nodes"].items():
393
394             for network in node_dict["interfaces"].values():
395                 missing = self.TOPOLOGY_REQUIRED_KEYS.difference(network)
396                 if not missing:
397                     continue
398
399                 # only ssh probe if there are missing values
400                 # ssh probe won't work on Ixia, so we had better define all our values
401                 try:
402                     netdevs = self._probe_netdevs(node, node_dict, timeout=timeout)
403                 except (SSHError, SSHTimeout):
404                     raise IncorrectConfig(
405                         "Unable to probe missing interface fields '%s', on node %s "
406                         "SSH Error" % (', '.join(missing), node))
407                 try:
408                     self._probe_missing_values(netdevs, network)
409                 except KeyError:
410                     pass
411                 else:
412                     missing = self.TOPOLOGY_REQUIRED_KEYS.difference(
413                         network)
414                 if missing:
415                     raise IncorrectConfig(
416                         "Require interface fields '%s' not found, topology file "
417                         "corrupted" % ', '.join(missing))
418
419         # we have to generate pod.yaml here so we have vpci and driver
420         self._generate_pod_yaml()
421         # 3. Use topology file to find connections & resolve dest address
422         self._resolve_topology()
423         self._update_context_with_topology()
424
    # Shell one-liner run on the remote node: for every PCI network device,
    # grep key sysfs attributes (ifindex, address, operstate, vendor/device
    # ids) and append the resolved kernel driver name, emitting lines of the
    # form "/sys/devices/<bus>/net/<ifname>/<attr>:<value>".
    FIND_NETDEVICE_STRING = (
        r"""find /sys/devices/pci* -type d -name net -exec sh -c '{ grep -sH ^ \
        $1/ifindex $1/address $1/operstate $1/device/vendor $1/device/device \
        $1/device/subsystem_vendor $1/device/subsystem_device ; \
        printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
        ' sh  \{\}/* \;
        """)

    # Splits one FIND_NETDEVICE_STRING output line into
    # (bus_path, interface_name, attribute_name, value) groups.
    BASE_ADAPTER_RE = re.compile(
        '^/sys/devices/(.*)/net/([^/]*)/([^:]*):(.*)$', re.M)
435
436     @classmethod
437     def parse_netdev_info(cls, stdout):
438         network_devices = defaultdict(dict)
439         matches = cls.BASE_ADAPTER_RE.findall(stdout)
440         for bus_path, interface_name, name, value in matches:
441             dirname, bus_id = os.path.split(bus_path)
442             if 'virtio' in bus_id:
443                 # for some stupid reason VMs include virtio1/
444                 # in PCI device path
445                 bus_id = os.path.basename(dirname)
446             # remove extra 'device/' from 'device/vendor,
447             # device/subsystem_vendor', etc.
448             if 'device/' in name:
449                 name = name.split('/')[1]
450             network_devices[interface_name][name] = value
451             network_devices[interface_name][
452                 'interface_name'] = interface_name
453             network_devices[interface_name]['pci_bus_id'] = bus_id
454         # convert back to regular dict
455         return dict(network_devices)
456
457     @classmethod
458     def get_vnf_impl(cls, vnf_model_id):
459         """ Find the implementing class from vnf_model["vnf"]["name"] field
460
461         :param vnf_model_id: parsed vnfd model ID field
462         :return: subclass of GenericVNF
463         """
464         utils.import_modules_from_package(
465             "yardstick.network_services.vnf_generic.vnf")
466         expected_name = vnf_model_id
467         classes_found = []
468
469         def impl():
470             for name, class_ in ((c.__name__, c) for c in
471                                  utils.itersubclasses(GenericVNF)):
472                 if name == expected_name:
473                     yield class_
474                 classes_found.append(name)
475
476         try:
477             return next(impl())
478         except StopIteration:
479             pass
480
481         raise IncorrectConfig("No implementation for %s found in %s" %
482                               (expected_name, classes_found))
483
484     @staticmethod
485     def create_interfaces_from_node(vnfd, node):
486         ext_intfs = vnfd["vdu"][0]["external-interface"] = []
487         # have to sort so xe0 goes first
488         for intf_name, intf in sorted(node['interfaces'].items()):
489             # only interfaces with vld_id are added.
490             # Thus there are two layers of filters, only intefaces with vld_id
491             # show up in interfaces, and only interfaces with traffic profiles
492             # are used by the generators
493             if intf.get('vld_id'):
494                 # force dpkd_port_num to int so we can do reverse lookup
495                 try:
496                     intf['dpdk_port_num'] = int(intf['dpdk_port_num'])
497                 except KeyError:
498                     pass
499                 ext_intf = {
500                     "name": intf_name,
501                     "virtual-interface": intf,
502                     "vnfd-connection-point-ref": intf_name,
503                 }
504                 ext_intfs.append(ext_intf)
505
    def load_vnf_models(self, scenario_cfg=None, context_cfg=None):
        """ Create VNF objects based on YAML descriptors

        Renders each node's "VNF model" template against the node data and
        instantiates the matching GenericVNF subclass; nodes without a
        model are skipped.

        :param scenario_cfg: scenario options; defaults to self.scenario_cfg
        :type scenario_cfg: dict
        :param context_cfg: node/network description; defaults to self.context_cfg
        :return: list of VNF instances (also stored on self.vnfs)
        """
        # move the bundled TRex client library to the front of sys.path so it
        # wins over any system-installed copy
        trex_lib_path = get_nsb_option('trex_client_lib')
        sys.path[:] = list(chain([trex_lib_path], (x for x in sys.path if x != trex_lib_path)))

        if scenario_cfg is None:
            scenario_cfg = self.scenario_cfg

        if context_cfg is None:
            context_cfg = self.context_cfg

        vnfs = []
        # we assume OrderedDict for consistency in instantiation
        for node_name, node in context_cfg["nodes"].items():
            LOG.debug(node)
            try:
                file_name = node["VNF model"]
            except KeyError:
                LOG.debug("no model for %s, skipping", node_name)
                continue
            file_path = scenario_cfg['task_path']
            with utils.open_relative_file(file_name, file_path) as stream:
                vnf_model = stream.read()
            # render the model template with the node's own data
            vnfd = vnfdgen.generate_vnfd(vnf_model, node)
            # TODO: here add extra context_cfg["nodes"] regardless of template
            vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0]
            # force inject pkey if it exists
            # we want to standardize Heat using pkey as a string so we don't rely
            # on the filesystem
            try:
                vnfd['mgmt-interface']['pkey'] = node['pkey']
            except KeyError:
                pass
            self.create_interfaces_from_node(vnfd, node)
            vnf_impl = self.get_vnf_impl(vnfd['id'])
            vnf_instance = vnf_impl(node_name, vnfd)
            vnfs.append(vnf_instance)

        self.vnfs = vnfs
        return vnfs
552
553     def setup(self):
554         """ Setup infrastructure, provission VNFs & start traffic
555
556         :return:
557         """
558         # 1. Verify if infrastructure mapping can meet topology
559         self.map_topology_to_infrastructure()
560         # 1a. Load VNF models
561         self.load_vnf_models()
562         # 1b. Fill traffic profile with information from topology
563         self._fill_traffic_profile()
564
565         # 2. Provision VNFs
566
567         # link events will cause VNF application to exit
568         # so we should start traffic runners before VNFs
569         traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
570         non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic]
571         try:
572             for vnf in chain(traffic_runners, non_traffic_runners):
573                 LOG.info("Instantiating %s", vnf.name)
574                 vnf.instantiate(self.scenario_cfg, self.context_cfg)
575                 LOG.info("Waiting for %s to instantiate", vnf.name)
576                 vnf.wait_for_instantiate()
577         except:
578             LOG.exception("")
579             for vnf in self.vnfs:
580                 vnf.terminate()
581             raise
582
583         # 3. Run experiment
584         # Start listeners first to avoid losing packets
585         for traffic_gen in traffic_runners:
586             traffic_gen.listen_traffic(self.traffic_profile)
587
588         # register collector with yardstick for KPI collection.
589         self.collector = Collector(self.vnfs, self.context_cfg["nodes"], self.traffic_profile)
590         self.collector.start()
591
592         # Start the actual traffic
593         for traffic_gen in traffic_runners:
594             LOG.info("Starting traffic on %s", traffic_gen.name)
595             traffic_gen.run_traffic(self.traffic_profile)
596
597     def run(self, result):  # yardstick API
598         """ Yardstick calls run() at intervals defined in the yaml and
599             produces timestamped samples
600
601         :param result: dictionary with results to update
602         :return: None
603         """
604
605         # this is the only method that is check from the runner
606         # so if we have any fatal error it must be raised via these methods
607         # otherwise we will not terminate
608
609         result.update(self.collector.get_kpi())
610
611     def teardown(self):
612         """ Stop the collector and terminate VNF & TG instance
613
614         :return
615         """
616
617         try:
618             try:
619                 self.collector.stop()
620                 for vnf in self.vnfs:
621                     LOG.info("Stopping %s", vnf.name)
622                     vnf.terminate()
623                 LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
624             finally:
625                 terminate_children()
626         except Exception:
627             # catch any exception in teardown and convert to simple exception
628             # never pass exceptions back to multiprocessing, because some exceptions can
629             # be unpicklable
630             # https://bugs.python.org/issue9400
631             LOG.exception("")
632             raise RuntimeError("Error in teardown")