Merge "Use assertIn(x, y) instead of other variations"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19 import os
20 import posixpath
21 import re
22 from six.moves import cStringIO
23 import subprocess
24 import time
25
26 from trex_stl_lib.trex_stl_client import LoggerApi
27 from trex_stl_lib.trex_stl_client import STLClient
28 from trex_stl_lib.trex_stl_exceptions import STLError
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common import exceptions as y_exceptions
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
34 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.ssh import AutoConnectSSH
42
43
# Directory name joined onto the tools bin path when checking for DPDK (see _setup_dpdk).
DPDK_VERSION = "dpdk-16.07"

# Module-level logger shared by all helpers in this file.
LOG = logging.getLogger(__name__)


# Directory used to build the paths of generated config/script files.
REMOTE_TMP = "/tmp"
# Fallback for the per-VNF 'timeout' option (see ScenarioHelper.timeout); presumably seconds.
DEFAULT_VNF_TIMEOUT = 3600
# Timeout passed to Process.join() before the VNF process is terminated.
PROCESS_JOIN_TIMEOUT = 3
52
53
class VnfSshHelper(AutoConnectSSH):
    """SSH helper created from a node description and tied to a tools directory."""

    def __init__(self, node, bin_path, wait=None):
        """Build the connection arguments from ``node`` and remember ``bin_path``.

        :param node: node description handed to ``args_from_node``
        :param bin_path: directory used as default location for tools
        :param wait: optional 'wait' value, only applied when the node
                     arguments do not already provide one
        """
        self.node = node
        ssh_args = self.args_from_node(self.node)
        if wait:
            ssh_args.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_args)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class (not the caller's subclass)."""
        # a static reference is required here, anything dynamic would resolve
        # to the calling subclass instead of this superclass
        return VnfSshHelper

    def copy(self):
        """Return a new helper for the same node and bin path."""
        # unlike the SSH base classes, the copy is rebuilt from the node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` as file ``prefix`` under REMOTE_TMP.

        :return: the path the content was uploaded to
        """
        remote_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join ``args`` onto the helper's bin path."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Provision a tool, defaulting the tool path to the helper's bin path."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
89
90
class SetupEnvHelper(object):
    """Base interface for preparing and cleaning up a VNF environment.

    Subclasses must implement ``build_config`` and ``tear_down``; the other
    hooks default to doing nothing.
    """

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the three collaborating helpers on the instance."""
        super(SetupEnvHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def build_config(self):
        """Must be provided by subclasses."""
        raise NotImplementedError()

    def setup_vnf_environment(self):
        """Optional hook; the base implementation does nothing."""
        return None

    def kill_vnf(self):
        """Optional hook; the base implementation does nothing."""
        return None

    def tear_down(self):
        """Must be provided by subclasses."""
        raise NotImplementedError()
116
117
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based VNFs.

    Sets up hugepages and the igb_uio module on the remote node, binds the
    VNFD interfaces through ``DpdkBindHelper``, generates and uploads the VNF
    configuration, and asks the bind helper to rebind drivers on tear down.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Return the config with 'pkt_type = ipv4' replaced by the requested packet type.

        :param ip_pipeline_cfg: configuration text
        :param traffic_options: dict providing the 'pkt_type' entry
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Return the config with the traffic/packet type adjusted.

        :param ip_pipeline_cfg: configuration text
        :param traffic_options: dict providing 'traffic_type' and 'vnf_type'
        """
        traffic_type = traffic_options['traffic_type']

        # Compare by value: "is not" checks object identity, which is not a
        # reliable way to compare two strings.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            # nothing to change, the template already uses ipv4
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper on top of ``ssh_helper``."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Set nr_hugepages on the remote node based on the reported hugepage size."""
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        # 8192 pages for 2048kB hugepages, 16 pages for any other size
        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def build_config(self):
        """Generate and upload the VNF config and script, return the command to run.

        :return: ``PIPELINE_COMMAND`` formatted with ``self.pipeline_kwargs``
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # read back the config from the local CFG_CONFIG path before patching it
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Populate ``self.pipeline_kwargs`` used to format ``PIPELINE_COMMAND``."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, bind the interfaces and return the resource profile."""
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill any running instance of ``APP_NAME`` on the remote node."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        # nothing more to do when igb_uio is already loaded
        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def get_collectd_options(self):
        """Return the collectd options, node specific values overriding the global ones.

        A new dict is returned so that the scenario configuration itself is
        never modified by the merge.
        """
        # copy first: update() on the dict returned by all_options would
        # write the node specific settings into the shared scenario options
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Choose the socket and build the ``ResourceProfile`` for KPI collection."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=collectd_options.get("interval"),
                               timeout=self.scenario_helper.timeout)

    def _detect_and_bind_drivers(self):
        """Bind the VNFD PCI addresses to igb_uio and record the DPDK port numbers.

        The DPDK port number of an interface is its position in the sorted
        list of DPDK bound PCI addresses. Addresses that cannot be matched to
        a VNFD interface are skipped (best effort).
        """
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
                # force to int
                intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except Exception:  # pylint: disable=broad-except
                # still best effort, but no longer swallows KeyboardInterrupt /
                # SystemExit and leaves a trace of what was skipped
                LOG.debug("Could not assign DPDK port %s to PCI address %s",
                          dpdk_port_num, vpci, exc_info=True)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the output of ``FIND_NET_CMD`` for ``vpci``, or None if the command failed."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Ask the bind helper to rebind the drivers."""
        self.dpdk_bind_helper.rebind_drivers()
321
322
class ResourceHelper(object):
    """Thin wrapper around the resource profile returned by the setup helper."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Keep the setup helper and reuse its ssh helper; no resource exists yet."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Prepare the VNF environment and store the returned resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for subclasses; does nothing here."""
        return None

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is empty unless collectd is reported running."""
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if agent_status == 0:
            core_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            core_kpi = {}
        return {"core": core_kpi}

    def start_collect(self):
        """Initiate the system agent, start the resource and its AMQP KPI process."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource if there is one."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
363
364
class ClientResourceHelper(ResourceHelper):
    """Resource helper driving a TRex (STL) client to generate traffic and sample stats."""

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Reuse the helpers of ``setup_helper`` and create the shared state objects."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers from the VNFD port pairs."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        """Return the port number of ``intf`` as reported by the VNFD helper."""
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or an empty dict when the client raises STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the current client statistics.

        :param ports: port numbers to report on
        :param key: optional extra stats key copied into every sample
        :param default: value used when ``key`` is missing from the stats
        :return: dict mapping interface name to its sample dict; an empty dict
                 (and the terminated flag set) when the stats are not a mapping
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # validate before touching the result: calling .get() on something
        # that is not a mapping must not be attempted
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and push the resulting samples on the queue."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic iterations until terminated.

        :raises STLError: when the client fails while termination was not requested
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Request the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics for ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start the client on ``ports`` (all ports by default)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge the next queued samples (if any) into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (a new STLClient by default) and return it.

        Up to six attempts are made, sleeping five seconds after each failure.
        The client is returned even if every attempt failed.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
504
505
class Rfc2544ResourceHelper(object):
    """Lazy accessor for the 'rfc2544' section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Remember the scenario helper; every option is resolved on first access."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        self._rfc2544 = None
        self._correlated_traffic = None
        self._latency = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The 'rfc2544' options, read once from the scenario helper."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The 'correlated_traffic' option, DEFAULT_CORRELATED_TRAFFIC if unset."""
        if self._correlated_traffic is None:
            value = self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
            self._correlated_traffic = value

        return self._correlated_traffic

    @property
    def latency(self):
        """The 'latency' option, DEFAULT_LATENCY if unset."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Look up ``name`` in the rfc2544 options."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' ("a - b" or a single value) into low/high bounds."""
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        # a single value serves as both the low and the high bound
        self._tolerance_high = bounds[1] if len(bounds) > 1 else self.tolerance_low
562
563
class SampleVNFDeployHelper(object):
    """Clones the samplevnf repository locally and builds a VNF on the remote node."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Store the VNFD and ssh helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` unless ``which`` already finds it remotely.

        :param app_name: name of the VNF binary
        """
        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        # a zero exit status from "which" means the binary is already there
        if not ssh.execute("which %s" % vnf_bin)[0]:
            return

        # fresh local clone, then copy it to the remote node
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        # install the freshly built binary into the bin path
        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_binary, vnf_bin))
596
597
class ScenarioHelper(object):
    """Read-only view on the scenario configuration for one named VNF."""

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """:param name: key used to select this VNF's entry in the options"""
        self.name = name
        # assigned later by the owner (see SampleVNF.instantiate)
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The mandatory 'task_path' entry of the scenario config."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry of the scenario config, None when absent."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The whole 'options' section, an empty dict when absent."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this helper's name, an empty dict when absent."""
        everything = self.all_options
        return everything.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The 'vnf_config' option, DEFAULT_VNF_CFG when absent."""
        own_options = self.options
        return own_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The mandatory 'topology' entry of the scenario config."""
        cfg = self.scenario_cfg
        return cfg['topology']

    @property
    def timeout(self):
        """The 'timeout' option, DEFAULT_VNF_TIMEOUT when absent."""
        own_options = self.options
        return own_options.get('timeout', DEFAULT_VNF_TIMEOUT)
638
639
640 class SampleVNF(GenericVNF):
641     """ Class providing file-like API for generic VNF implementation """
642
643     VNF_PROMPT = "pipeline>"
644     WAIT_TIME = 1
645     WAIT_TIME_FOR_SCRIPT = 10
646     APP_NAME = "SampleVNF"
647     # we run the VNF interactively, so the ssh command will timeout after this long
648
649     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
650         super(SampleVNF, self).__init__(name, vnfd)
651         self.bin_path = get_nsb_option('bin_path', '')
652
653         self.scenario_helper = ScenarioHelper(self.name)
654         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
655
656         if setup_env_helper_type is None:
657             setup_env_helper_type = SetupEnvHelper
658
659         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
660                                                   self.ssh_helper,
661                                                   self.scenario_helper)
662
663         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
664
665         if resource_helper_type is None:
666             resource_helper_type = ResourceHelper
667
668         self.resource_helper = resource_helper_type(self.setup_helper)
669
670         self.context_cfg = None
671         self.nfvi_context = None
672         self.pipeline_kwargs = {}
673         self.uplink_ports = None
674         self.downlink_ports = None
675         # NOTE(esm): make QueueFileWrapper invert-able so that we
676         #            never have to manage the queues
677         self.q_in = Queue()
678         self.q_out = Queue()
679         self.queue_wrapper = None
680         self.run_kwargs = {}
681         self.used_drivers = {}
682         self.vnf_port_pairs = None
683         self._vnf_process = None
684
685     def _build_ports(self):
686         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
687         self.networks = self._port_pairs.networks
688         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
689         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
690         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
691
692     def _get_route_data(self, route_index, route_type):
693         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
694         for _ in range(route_index):
695             next(route_iter, '')
696         return next(route_iter, {}).get(route_type, '')
697
698     def _get_port0localip6(self):
699         return_value = self._get_route_data(0, 'network')
700         LOG.info("_get_port0localip6 : %s", return_value)
701         return return_value
702
703     def _get_port1localip6(self):
704         return_value = self._get_route_data(1, 'network')
705         LOG.info("_get_port1localip6 : %s", return_value)
706         return return_value
707
708     def _get_port0prefixlen6(self):
709         return_value = self._get_route_data(0, 'netmask')
710         LOG.info("_get_port0prefixlen6 : %s", return_value)
711         return return_value
712
713     def _get_port1prefixlen6(self):
714         return_value = self._get_route_data(1, 'netmask')
715         LOG.info("_get_port1prefixlen6 : %s", return_value)
716         return return_value
717
718     def _get_port0gateway6(self):
719         return_value = self._get_route_data(0, 'network')
720         LOG.info("_get_port0gateway6 : %s", return_value)
721         return return_value
722
723     def _get_port1gateway6(self):
724         return_value = self._get_route_data(1, 'network')
725         LOG.info("_get_port1gateway6 : %s", return_value)
726         return return_value
727
728     def _start_vnf(self):
729         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
730         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
731         self._vnf_process = Process(name=name, target=self._run)
732         self._vnf_process.start()
733
734     def _vnf_up_post(self):
735         pass
736
737     def instantiate(self, scenario_cfg, context_cfg):
738         self.scenario_helper.scenario_cfg = scenario_cfg
739         self.context_cfg = context_cfg
740         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
741         # self.nfvi_context = None
742
743         # vnf deploy is unsupported, use ansible playbooks
744         if self.scenario_helper.options.get("vnf_deploy", False):
745             self.deploy_helper.deploy_vnfs(self.APP_NAME)
746         self.resource_helper.setup()
747         self._start_vnf()
748
749     def wait_for_instantiate(self):
750         buf = []
751         time.sleep(self.WAIT_TIME)  # Give some time for config to load
752         while True:
753             if not self._vnf_process.is_alive():
754                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
755
756             # NOTE(esm): move to QueueFileWrapper
757             while self.q_out.qsize() > 0:
758                 buf.append(self.q_out.get())
759                 message = ''.join(buf)
760                 if self.VNF_PROMPT in message:
761                     LOG.info("%s VNF is up and running.", self.APP_NAME)
762                     self._vnf_up_post()
763                     self.queue_wrapper.clear()
764                     self.resource_helper.start_collect()
765                     return self._vnf_process.exitcode
766
767                 if "PANIC" in message:
768                     raise RuntimeError("Error starting %s VNF." %
769                                        self.APP_NAME)
770
771             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
772             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
773             # Send ENTER to display a new prompt in case the prompt text was corrupted
774             # by other VNF output
775             self.q_in.put('\r\n')
776
777     def _build_run_kwargs(self):
778         self.run_kwargs = {
779             'stdin': self.queue_wrapper,
780             'stdout': self.queue_wrapper,
781             'keep_stdin_open': True,
782             'pty': True,
783             'timeout': self.scenario_helper.timeout,
784         }
785
786     def _build_config(self):
787         return self.setup_helper.build_config()
788
789     def _run(self):
790         # we can't share ssh paramiko objects to force new connection
791         self.ssh_helper.drop_connection()
792         cmd = self._build_config()
793         # kill before starting
794         self.setup_helper.kill_vnf()
795
796         LOG.debug(cmd)
797         self._build_run_kwargs()
798         self.ssh_helper.run(cmd, **self.run_kwargs)
799
800     def vnf_execute(self, cmd, wait_time=2):
801         """ send cmd to vnf process """
802
803         LOG.info("%s command: %s", self.APP_NAME, cmd)
804         self.q_in.put("{}\r\n".format(cmd))
805         time.sleep(wait_time)
806         output = []
807         while self.q_out.qsize() > 0:
808             output.append(self.q_out.get())
809         return "".join(output)
810
811     def _tear_down(self):
812         pass
813
814     def terminate(self):
815         self.vnf_execute("quit")
816         self.setup_helper.kill_vnf()
817         self._tear_down()
818         self.resource_helper.stop_collect()
819         if self._vnf_process is not None:
820             # be proper and join first before we kill
821             LOG.debug("joining before terminate %s", self._vnf_process.name)
822             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
823             self._vnf_process.terminate()
824         # no terminate children here because we share processes with tg
825
826     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
827         """Method for checking the statistics
828
829         This method could be overridden in children classes.
830
831         :return: VNF statistics
832         """
833         cmd = 'p {0} stats'.format(self.APP_WORD)
834         out = self.vnf_execute(cmd)
835         return out
836
837     def collect_kpi(self):
838         # we can't get KPIs if the VNF is down
839         check_if_process_failed(self._vnf_process)
840         stats = self.get_stats()
841         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
842         if m:
843             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
844             result["collect_stats"] = self.resource_helper.collect_kpi()
845         else:
846             result = {
847                 "packets_in": 0,
848                 "packets_fwd": 0,
849                 "packets_dropped": 0,
850             }
851         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
852         return result
853
854     def scale(self, flavor=""):
855         """The SampleVNF base class doesn't provide the 'scale' feature"""
856         raise y_exceptions.FunctionNotImplemented(
857             function_name='scale', class_name='SampleVNFTrafficGen')
858
859
class SampleVNFTrafficGen(GenericTrafficGen):
    """Base traffic generator assembled from pluggable helper objects.

    The TG server and the traffic client each run in their own
    ``multiprocessing.Process``; they are tracked in ``_tg_process`` and
    ``_traffic_process`` respectively.
    """

    APP_NAME = 'Sample'
    RUN_WAIT = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the scenario, SSH, setup and resource helpers.

        :param name: forwarded to the ``GenericTrafficGen`` constructor
        :param vnfd: forwarded to the ``GenericTrafficGen`` constructor
        :param setup_env_helper_type: callable used to build the setup helper;
            ``SetupEnvHelper`` is used when None
        :param resource_helper_type: callable used to build the resource
            helper; ``ClientResourceHelper`` is used when None
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        env_helper_cls = SetupEnvHelper if setup_env_helper_type is None else setup_env_helper_type
        self.setup_helper = env_helper_cls(
            self.vnfd_helper, self.ssh_helper, self.scenario_helper)

        res_helper_cls = ClientResourceHelper if resource_helper_type is None else resource_helper_type
        self.resource_helper = res_helper_cls(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        # Both processes are created later, by instantiate() and run_traffic()
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        """Entry point of the TG server process started by ``instantiate``."""
        # paramiko ssh objects can't be shared, so force a new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """Prepare resources and start the TG server process.

        :param scenario_cfg: stored on the scenario helper
        :param context_cfg: not used by this implementation
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.resource_helper.setup()
        # must generate_cfg after DPDK bind because we need port number
        self.resource_helper.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        server_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        server_proc = Process(name=server_name, target=self._start_server)
        self._tg_process = server_proc
        server_proc.start()

    def _check_status(self):
        """Report the TG server status; must be provided by subclasses.

        ``_wait_for_process`` treats a return value equal to 0 as "ready".
        """
        raise NotImplementedError

    def _wait_for_process(self):
        """Poll once per second until ``_check_status`` returns 0.

        :return: ``exitcode`` of the TG process once the server is ready
        :raises RuntimeError: if the TG process is found not alive
        """
        while True:
            if not self._tg_process.is_alive():
                raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            if self._check_status() == 0:
                break

        LOG.info("%s TG Server is up and running.", self.APP_NAME)
        return self._tg_process.exitcode

    def _traffic_runner(self, traffic_profile):
        """Entry point of the traffic process started by ``run_traffic``.

        :param traffic_profile: handed to ``resource_helper.run_traffic``
        """
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self.resource_helper.run_traffic(traffic_profile)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.

        Starts ``_traffic_runner`` in a new process, then waits in steps of
        ``RUN_WAIT`` seconds until the resource helper's ``client_started``
        value is non-zero or the traffic process is no longer alive.
        Mandatory.

        :param traffic_profile: profile passed on to the traffic process
        :return: True if the traffic process is alive on return, else False
        """
        runner_name = "{}-{}-{}-{}".format(
            self.name, self.APP_NAME, traffic_profile.__class__.__name__, os.getpid())
        runner = Process(name=runner_name, target=self._traffic_runner,
                         args=(traffic_profile,))
        self._traffic_process = runner
        runner.start()

        # Wait for traffic process to start
        while self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not runner.is_alive():
                break

        return runner.is_alive()

    def collect_kpi(self):
        """Return the KPIs reported by the resource helper.

        ``check_if_process_failed`` is called on both TG processes first.

        :return: result of ``resource_helper.collect_kpi()``
        """
        # check if the tg processes have exited
        check_if_process_failed(self._tg_process)
        check_if_process_failed(self._traffic_process)

        kpis = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, kpis)
        return kpis

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Sets ``traffic_finished`` and then, for each process that exists,
        joins it for up to ``PROCESS_JOIN_TIMEOUT`` before terminating it.
        Nothing is returned.
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        for proc in (self._traffic_process, self._tg_process):
            if proc is None:
                continue
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", proc.name)
            proc.join(PROCESS_JOIN_TIMEOUT)
            proc.terminate()
        # no terminate children here because we share processes with vnf

    def scale(self, flavor=""):
        """A traffic generator VNF doesn't provide the 'scale' feature

        :param flavor: accepted for interface compatibility; not used
        :raises FunctionNotImplemented: always
        """
        raise y_exceptions.FunctionNotImplemented(
            function_name='scale', class_name='SampleVNFTrafficGen')