Merge "Added traffic update capability to Ixload TG"
[yardstick.git] / yardstick / benchmark / scenarios / networking / vnf_generic.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import copy
16 import ipaddress
17 from itertools import chain
18 import logging
19 import os
20 import sys
21 import time
22
23 import six
24 import yaml
25
26 from yardstick.benchmark.contexts import base as context_base
27 from yardstick.benchmark.scenarios import base as scenario_base
28 from yardstick.common.constants import LOG_DIR
29 from yardstick.common import exceptions
30 from yardstick.common.process import terminate_children
31 from yardstick.common import utils
32 from yardstick.network_services.collector.subscriber import Collector
33 from yardstick.network_services.vnf_generic import vnfdgen
34 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
35 from yardstick.network_services import traffic_profile
36 from yardstick.network_services.traffic_profile import base as tprofile_base
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick import ssh
39
40
# Runs at import time.
# NOTE(review): presumably this loads the traffic profile modules so their
# classes can be resolved later -- confirm against
# traffic_profile.register_modules()
traffic_profile.register_modules()


# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
45
46
class NetworkServiceTestCase(scenario_base.Scenario):
    """Scenario for pre-deployment VNF and network service testing.

    The scenario renders the topology named in the scenario configuration,
    resolves it against the nodes and networks of the context, instantiates
    the VNF and traffic generator objects, starts the traffic and reports
    the collected KPIs on every run() call.
    """

    # NOTE(review): presumably the type name used to select this scenario
    # class from a test case file -- confirm against scenario_base.Scenario
    __scenario_type__ = "NSPerf"
52
53     def __init__(self, scenario_cfg, context_cfg):  # pragma: no cover
54         super(NetworkServiceTestCase, self).__init__()
55         self.scenario_cfg = scenario_cfg
56         self.context_cfg = context_cfg
57
58         self._render_topology()
59         self.vnfs = []
60         self.collector = None
61         self.traffic_profile = None
62         self.node_netdevs = {}
63         self.bin_path = get_nsb_option('bin_path', '')
64         self._mq_ids = []
65
66     def is_ended(self):
67         return self.traffic_profile is not None and self.traffic_profile.is_ended()
68
69     def _get_ip_flow_range(self, ip_start_range):
70         """Retrieve a CIDR first and last viable IPs
71
72         :param ip_start_range: could be the IP range itself or a dictionary
73                with the host name and the port.
74         :return: (str) IP range (min, max) with this format "x.x.x.x-y.y.y.y"
75         """
76         if isinstance(ip_start_range, six.string_types):
77             return ip_start_range
78
79         node_name, range_or_interface = next(iter(ip_start_range.items()),
80                                              (None, '0.0.0.0'))
81         if node_name is None:
82             return range_or_interface
83
84         node = self.context_cfg['nodes'].get(node_name, {})
85         interface = node.get('interfaces', {}).get(range_or_interface)
86         if interface:
87             ip = interface['local_ip']
88             mask = interface['netmask']
89         else:
90             ip = '0.0.0.0'
91             mask = '255.255.255.0'
92
93         ipaddr = ipaddress.ip_network(
94             six.text_type('{}/{}'.format(ip, mask)), strict=False)
95         if ipaddr.prefixlen + 2 < ipaddr.max_prefixlen:
96             ip_addr_range = '{}-{}'.format(ipaddr[2], ipaddr[-2])
97         else:
98             LOG.warning('Only single IP in range %s', ipaddr)
99             ip_addr_range = ip
100         return ip_addr_range
101
102     def _get_traffic_flow(self):
103         flow = {}
104         try:
105             # TODO: should be .0  or .1 so we can use list
106             # but this also roughly matches uplink_0, downlink_0
107             fflow = self.scenario_cfg["options"]["flow"]
108             for index, src in enumerate(fflow.get("src_ip", [])):
109                 flow["src_ip_{}".format(index)] = self._get_ip_flow_range(src)
110
111             for index, dst in enumerate(fflow.get("dst_ip", [])):
112                 flow["dst_ip_{}".format(index)] = self._get_ip_flow_range(dst)
113
114             for index, publicip in enumerate(fflow.get("public_ip", [])):
115                 flow["public_ip_{}".format(index)] = publicip
116
117             for index, src_port in enumerate(fflow.get("src_port", [])):
118                 flow["src_port_{}".format(index)] = src_port
119
120             for index, dst_port in enumerate(fflow.get("dst_port", [])):
121                 flow["dst_port_{}".format(index)] = dst_port
122
123             if "count" in fflow:
124                 flow["count"] = fflow["count"]
125
126             if "srcseed" in fflow:
127                 flow["srcseed"] = fflow["srcseed"]
128
129             if "dstseed" in fflow:
130                 flow["dstseed"] = fflow["dstseed"]
131
132         except KeyError:
133             flow = {}
134         return {"flow": flow}
135
136     def _get_traffic_imix(self):
137         try:
138             imix = {"imix": self.scenario_cfg['options']['framesize']}
139         except KeyError:
140             imix = {}
141         return imix
142
143     def _get_traffic_profile(self):
144         profile = self.scenario_cfg["traffic_profile"]
145         path = self.scenario_cfg["task_path"]
146         with utils.open_relative_file(profile, path) as infile:
147             return infile.read()
148
149     def _get_duration(self):
150         options = self.scenario_cfg.get('options', {})
151         return options.get('duration',
152                            tprofile_base.TrafficProfileConfig.DEFAULT_DURATION)
153
154     def _fill_traffic_profile(self):
155         tprofile = self._get_traffic_profile()
156         extra_args = self.scenario_cfg.get('extra_args', {})
157         tprofile_data = {
158             'flow': self._get_traffic_flow(),
159             'imix': self._get_traffic_imix(),
160             tprofile_base.TrafficProfile.UPLINK: {},
161             tprofile_base.TrafficProfile.DOWNLINK: {},
162             'extra_args': extra_args,
163             'duration': self._get_duration()}
164         traffic_vnfd = vnfdgen.generate_vnfd(tprofile, tprofile_data)
165         self.traffic_profile = tprofile_base.TrafficProfile.get(traffic_vnfd)
166
167     def _get_topology(self):
168         topology = self.scenario_cfg["topology"]
169         path = self.scenario_cfg["task_path"]
170         with utils.open_relative_file(topology, path) as infile:
171             return infile.read()
172
173     def _render_topology(self):
174         topology = self._get_topology()
175         topology_args = self.scenario_cfg.get('extra_args', {})
176         topolgy_data = {
177             'extra_args': topology_args
178         }
179         topology_yaml = vnfdgen.generate_vnfd(topology, topolgy_data)
180         self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0]
181
182     def _find_vnf_name_from_id(self, vnf_id):  # pragma: no cover
183         return next((vnfd["vnfd-id-ref"]
184                      for vnfd in self.topology["constituent-vnfd"]
185                      if vnf_id == vnfd["member-vnf-index"]), None)
186
187     def _find_vnfd_from_vnf_idx(self, vnf_id):  # pragma: no cover
188         return next((vnfd
189                      for vnfd in self.topology["constituent-vnfd"]
190                      if vnf_id == vnfd["member-vnf-index"]), None)
191
192     @staticmethod
193     def find_node_if(nodes, name, if_name, vld_id):  # pragma: no cover
194         try:
195             # check for xe0, xe1
196             intf = nodes[name]["interfaces"][if_name]
197         except KeyError:
198             # if not xe0, then maybe vld_id,  uplink_0, downlink_0
199             # pop it and re-insert with the correct name from topology
200             intf = nodes[name]["interfaces"].pop(vld_id)
201             nodes[name]["interfaces"][if_name] = intf
202         return intf
203
204     def _resolve_topology(self):
205         for vld in self.topology["vld"]:
206             try:
207                 node0_data, node1_data = vld["vnfd-connection-point-ref"]
208             except (ValueError, TypeError):
209                 raise exceptions.IncorrectConfig(
210                     error_msg='Topology file corrupted, wrong endpoint count '
211                               'for connection')
212
213             node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
214             node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])
215
216             node0_if_name = node0_data["vnfd-connection-point-ref"]
217             node1_if_name = node1_data["vnfd-connection-point-ref"]
218
219             try:
220                 nodes = self.context_cfg["nodes"]
221                 node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
222                 node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])
223
224                 # names so we can do reverse lookups
225                 node0_if["ifname"] = node0_if_name
226                 node1_if["ifname"] = node1_if_name
227
228                 node0_if["node_name"] = node0_name
229                 node1_if["node_name"] = node1_name
230
231                 node0_if["vld_id"] = vld["id"]
232                 node1_if["vld_id"] = vld["id"]
233
234                 # set peer name
235                 node0_if["peer_name"] = node1_name
236                 node1_if["peer_name"] = node0_name
237
238                 # set peer interface name
239                 node0_if["peer_ifname"] = node1_if_name
240                 node1_if["peer_ifname"] = node0_if_name
241
242                 # just load the network
243                 vld_networks = {n.get('vld_id', name): n for name, n in
244                                 self.context_cfg["networks"].items()}
245
246                 node0_if["network"] = vld_networks.get(vld["id"], {})
247                 node1_if["network"] = vld_networks.get(vld["id"], {})
248
249                 node0_if["dst_mac"] = node1_if["local_mac"]
250                 node0_if["dst_ip"] = node1_if["local_ip"]
251
252                 node1_if["dst_mac"] = node0_if["local_mac"]
253                 node1_if["dst_ip"] = node0_if["local_ip"]
254
255             except KeyError:
256                 LOG.exception("")
257                 raise exceptions.IncorrectConfig(
258                     error_msg='Required interface not found, topology file '
259                               'corrupted')
260
261         for vld in self.topology['vld']:
262             try:
263                 node0_data, node1_data = vld["vnfd-connection-point-ref"]
264             except (ValueError, TypeError):
265                 raise exceptions.IncorrectConfig(
266                     error_msg='Topology file corrupted, wrong endpoint count '
267                               'for connection')
268
269             node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
270             node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])
271
272             node0_if_name = node0_data["vnfd-connection-point-ref"]
273             node1_if_name = node1_data["vnfd-connection-point-ref"]
274
275             nodes = self.context_cfg["nodes"]
276             node0_if = self.find_node_if(nodes, node0_name, node0_if_name, vld["id"])
277             node1_if = self.find_node_if(nodes, node1_name, node1_if_name, vld["id"])
278
279             # add peer interface dict, but remove circular link
280             # TODO: don't waste memory
281             node0_copy = node0_if.copy()
282             node1_copy = node1_if.copy()
283             node0_if["peer_intf"] = node1_copy
284             node1_if["peer_intf"] = node0_copy
285
286     def _update_context_with_topology(self):  # pragma: no cover
287         for vnfd in self.topology["constituent-vnfd"]:
288             vnf_idx = vnfd["member-vnf-index"]
289             vnf_name = self._find_vnf_name_from_id(vnf_idx)
290             vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
291             self.context_cfg["nodes"][vnf_name].update(vnfd)
292
293     def _generate_pod_yaml(self):  # pragma: no cover
294         context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
295         # convert OrderedDict to a list
296         # pod.yaml nodes is a list
297         nodes = [self._serialize_node(node) for node in self.context_cfg["nodes"].values()]
298         pod_dict = {
299             "nodes": nodes,
300             "networks": self.context_cfg["networks"]
301         }
302         with open(context_yaml, "w") as context_out:
303             yaml.safe_dump(pod_dict, context_out, default_flow_style=False,
304                            explicit_start=True)
305
306     @staticmethod
307     def _serialize_node(node):  # pragma: no cover
308         new_node = copy.deepcopy(node)
309         # name field is required
310         # remove context suffix
311         new_node["name"] = node['name'].split('.')[0]
312         try:
313             new_node["pkey"] = ssh.convert_key_to_str(node["pkey"])
314         except KeyError:
315             pass
316         return new_node
317
318     def map_topology_to_infrastructure(self):
319         """ This method should verify if the available resources defined in pod.yaml
320         match the topology.yaml file.
321
322         :return: None. Side effect: context_cfg is updated
323         """
324         # 3. Use topology file to find connections & resolve dest address
325         self._resolve_topology()
326         self._update_context_with_topology()
327
328     @classmethod
329     def get_vnf_impl(cls, vnf_model_id):  # pragma: no cover
330         """ Find the implementing class from vnf_model["vnf"]["name"] field
331
332         :param vnf_model_id: parsed vnfd model ID field
333         :return: subclass of GenericVNF
334         """
335         utils.import_modules_from_package(
336             "yardstick.network_services.vnf_generic.vnf")
337         expected_name = vnf_model_id
338         classes_found = []
339
340         def impl():
341             for name, class_ in ((c.__name__, c) for c in
342                                  utils.itersubclasses(GenericVNF)):
343                 if name == expected_name:
344                     yield class_
345                 classes_found.append(name)
346
347         try:
348             return next(impl())
349         except StopIteration:
350             pass
351
352         message = ('No implementation for %s found in %s'
353                    % (expected_name, classes_found))
354         raise exceptions.IncorrectConfig(error_msg=message)
355
356     @staticmethod
357     def create_interfaces_from_node(vnfd, node):  # pragma: no cover
358         ext_intfs = vnfd["vdu"][0]["external-interface"] = []
359         # have to sort so xe0 goes first
360         for intf_name, intf in sorted(node['interfaces'].items()):
361             # only interfaces with vld_id are added.
362             # Thus there are two layers of filters, only intefaces with vld_id
363             # show up in interfaces, and only interfaces with traffic profiles
364             # are used by the generators
365             if intf.get('vld_id'):
366                 # force dpkd_port_num to int so we can do reverse lookup
367                 try:
368                     intf['dpdk_port_num'] = int(intf['dpdk_port_num'])
369                 except KeyError:
370                     pass
371                 ext_intf = {
372                     "name": intf_name,
373                     "virtual-interface": intf,
374                     "vnfd-connection-point-ref": intf_name,
375                 }
376                 ext_intfs.append(ext_intf)
377
378     def load_vnf_models(self, scenario_cfg=None, context_cfg=None):
379         """ Create VNF objects based on YAML descriptors
380
381         :param scenario_cfg:
382         :type scenario_cfg:
383         :param context_cfg:
384         :return:
385         """
386         trex_lib_path = get_nsb_option('trex_client_lib')
387         sys.path[:] = list(chain([trex_lib_path], (x for x in sys.path if x != trex_lib_path)))
388
389         if scenario_cfg is None:
390             scenario_cfg = self.scenario_cfg
391
392         if context_cfg is None:
393             context_cfg = self.context_cfg
394
395         vnfs = []
396         # we assume OrderedDict for consistency in instantiation
397         for node_name, node in context_cfg["nodes"].items():
398             LOG.debug(node)
399             try:
400                 file_name = node["VNF model"]
401             except KeyError:
402                 LOG.debug("no model for %s, skipping", node_name)
403                 continue
404             file_path = scenario_cfg['task_path']
405             with utils.open_relative_file(file_name, file_path) as stream:
406                 vnf_model = stream.read()
407             vnfd = vnfdgen.generate_vnfd(vnf_model, node)
408             # TODO: here add extra context_cfg["nodes"] regardless of template
409             vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0]
410             # force inject pkey if it exists
411             # we want to standardize Heat using pkey as a string so we don't rely
412             # on the filesystem
413             try:
414                 vnfd['mgmt-interface']['pkey'] = node['pkey']
415             except KeyError:
416                 pass
417             self.create_interfaces_from_node(vnfd, node)
418             vnf_impl = self.get_vnf_impl(vnfd['id'])
419             vnf_instance = vnf_impl(node_name, vnfd, scenario_cfg['task_id'])
420             vnfs.append(vnf_instance)
421
422         self.vnfs = vnfs
423         return vnfs
424
425     def setup(self):
426         """Setup infrastructure, provission VNFs & start traffic"""
427         # 1. Verify if infrastructure mapping can meet topology
428         self.map_topology_to_infrastructure()
429         # 1a. Load VNF models
430         self.load_vnf_models()
431         # 1b. Fill traffic profile with information from topology
432         self._fill_traffic_profile()
433
434         # 2. Provision VNFs
435
436         # link events will cause VNF application to exit
437         # so we should start traffic runners before VNFs
438         traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
439         non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic]
440         try:
441             for vnf in chain(traffic_runners, non_traffic_runners):
442                 LOG.info("Instantiating %s", vnf.name)
443                 vnf.instantiate(self.scenario_cfg, self.context_cfg)
444                 LOG.info("Waiting for %s to instantiate", vnf.name)
445                 vnf.wait_for_instantiate()
446         except:
447             LOG.exception("")
448             for vnf in self.vnfs:
449                 vnf.terminate()
450             raise
451
452         # we have to generate pod.yaml here after VNF has probed so we know vpci and driver
453         self._generate_pod_yaml()
454
455         # 3. Run experiment
456         # Start listeners first to avoid losing packets
457         for traffic_gen in traffic_runners:
458             traffic_gen.listen_traffic(self.traffic_profile)
459
460         # register collector with yardstick for KPI collection.
461         self.collector = Collector(self.vnfs, context_base.Context.get_physical_nodes())
462         self.collector.start()
463
464         # Start the actual traffic
465         for traffic_gen in traffic_runners:
466             LOG.info("Starting traffic on %s", traffic_gen.name)
467             traffic_gen.run_traffic(self.traffic_profile)
468             self._mq_ids.append(traffic_gen.get_mq_producer_id())
469
470     def get_mq_ids(self):  # pragma: no cover
471         """Return stored MQ producer IDs"""
472         return self._mq_ids
473
474     def run(self, result):  # yardstick API
475         """ Yardstick calls run() at intervals defined in the yaml and
476             produces timestamped samples
477
478         :param result: dictionary with results to update
479         :return: None
480         """
481
482         # this is the only method that is check from the runner
483         # so if we have any fatal error it must be raised via these methods
484         # otherwise we will not terminate
485
486         result.update(self.collector.get_kpi())
487
488     def teardown(self):
489         """ Stop the collector and terminate VNF & TG instance
490
491         :return
492         """
493
494         try:
495             try:
496                 self.collector.stop()
497                 for vnf in self.vnfs:
498                     LOG.info("Stopping %s", vnf.name)
499                     vnf.terminate()
500                 LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
501             finally:
502                 terminate_children()
503         except Exception:
504             # catch any exception in teardown and convert to simple exception
505             # never pass exceptions back to multiprocessing, because some exceptions can
506             # be unpicklable
507             # https://bugs.python.org/issue9400
508             LOG.exception("")
509             raise RuntimeError("Error in teardown")
510
511     def pre_run_wait_time(self, time_seconds):  # pragma: no cover
512         """Time waited before executing the run method"""
513         time.sleep(time_seconds)
514
515     def post_run_wait_time(self, time_seconds):  # pragma: no cover
516         """Time waited after executing the run method"""
517         pass