NFVi Test stops after one hour even if configured context duration is longer
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19 import os
20 import posixpath
21 import re
22 from six.moves import cStringIO
23 import subprocess
24 import time
25
26 from trex_stl_lib.trex_stl_client import LoggerApi
27 from trex_stl_lib.trex_stl_client import STLClient
28 from trex_stl_lib.trex_stl_exceptions import STLError
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common import exceptions as y_exceptions
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
34 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.ssh import AutoConnectSSH
42
43
# DPDK release identifier; not referenced by any code visible in this module.
DPDK_VERSION = "dpdk-16.07"

# Module-level logger used by the helper classes defined below.
LOG = logging.getLogger(__name__)


# Directory used for generated VNF config/script files (base of CFG_CONFIG,
# CFG_SCRIPT and of the paths built by VnfSshHelper.upload_config_file).
REMOTE_TMP = "/tmp"
# Fallback used by ScenarioHelper.timeout when the scenario options carry no
# 'timeout' (presumably seconds, i.e. one hour -- TODO confirm).
DEFAULT_VNF_TIMEOUT = 3600
# Timeout handed to Process.join() in SampleVNF.terminate() before the VNF
# process is terminated.
PROCESS_JOIN_TIMEOUT = 3
52
53
class VnfSshHelper(AutoConnectSSH):
    """SSH helper tied to a VNF node description and a binary directory."""

    def __init__(self, node, bin_path, wait=None):
        """Create the helper from a node description.

        :param node: node description; the connection arguments are derived
            from it with ``args_from_node``
        :param bin_path: base directory used by ``join_bin_path`` and as the
            default ``tool_path`` of ``provision_tool``
        :param wait: when truthy, used as the ``wait`` connection argument
            unless the node already supplies one
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class.

        A static reference is required: anything else would resolve to the
        calling (sub)class instead of the superclass.
        """
        return VnfSshHelper

    def copy(self):
        """Return a new helper built from the same node and binary path.

        Unlike the copy constructor of the plain SSH classes, this one
        rebuilds the connection arguments from ``node``.
        """
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` as ``REMOTE_TMP/prefix`` and return that path."""
        remote_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join ``args`` below the configured binary directory."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Provision a tool, defaulting ``tool_path`` to the binary directory."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
89
90
class SetupEnvHelper(object):
    """Base interface for helpers that prepare the environment of a VNF.

    Subclasses must implement ``build_config`` and ``tear_down``; the other
    hooks default to doing nothing.
    """

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the three collaborating helpers on the instance."""
        super(SetupEnvHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def build_config(self):
        """Build the VNF configuration; must be provided by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the VNF environment; nothing to do in the base class."""

    def kill_vnf(self):
        """Stop a running VNF; nothing to do in the base class."""

    def tear_down(self):
        """Undo the environment setup; must be provided by subclasses."""
        raise NotImplementedError
116
117
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based VNFs.

    Over SSH it configures hugepages, loads the igb_uio driver, binds the VNF
    interfaces to DPDK, renders the pipeline configuration and builds the
    command line used to start the VNF.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace ``pkt_type = ipv4`` with the configured packet type.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing ``pkt_type``
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        return ip_pipeline_cfg.replace(match_str, replace_str)

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Rewrite the traffic/packet type entries of the pipeline config.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing ``traffic_type`` and
            ``vnf_type``
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # Strings must be compared by value: an identity check ("is not")
        # depends on interpreter string interning and is not reliable.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        return ip_pipeline_cfg.replace(match_str, replace_str)

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        # filled in by _build_pipeline_kwargs(); defined here so the attribute
        # always exists on the instance
        self.pipeline_kwargs = {}
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Set the number of hugepages on the target according to page size."""
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        # 8192 pages for 2048kB hugepages, 16 pages for any other size
        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def build_config(self):
        """Generate and upload the VNF config and script files.

        :return: ``PIPELINE_COMMAND`` formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # read back the generated config to patch the traffic/packet types
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Provision the VNF binary and fill ``self.pipeline_kwargs``."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def setup_vnf_environment(self):
        """Set up DPDK, bind the interfaces and return the resource profile."""
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill any running instance of the VNF application on the target."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run

        :raises DPDKSetupDriverError: when igb_uio is not listed by lsmod
        """
        self._setup_hugepages()
        self.ssh_helper.execute('sudo modprobe uio && sudo modprobe igb_uio')
        exit_status = self.ssh_helper.execute('lsmod | grep -i igb_uio')[0]
        if exit_status:
            raise y_exceptions.DPDKSetupDriverError()

    def get_collectd_options(self):
        """Return the collectd options, node settings overriding global ones.

        A copy is returned so that the node specific override never leaks
        into the shared scenario configuration.
        """
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Pick the socket and build the ResourceProfile used for KPIs."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=collectd_options.get("interval"),
                               timeout=self.scenario_helper.timeout)

    def _detect_and_bind_drivers(self):
        """Bind the VNF interfaces to igb_uio and record DPDK port numbers."""
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
                # force to int
                intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except (StopIteration, KeyError):
                # best effort: a DPDK bound address without a matching VNF
                # interface is skipped, but no longer hides unrelated errors
                LOG.debug("No VNF interface found for DPDK bound PCI address %s", vpci)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the stdout of ``FIND_NET_CMD`` for ``vpci``, None on failure."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the drivers saved before the DPDK binding."""
        self.dpdk_bind_helper.rebind_drivers()
313
314
class ResourceHelper(object):
    """Collects NFVi KPIs through the resource returned by the setup helper."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Keep the setup helper and its SSH helper; no resource exists yet."""
        super(ResourceHelper, self).__init__()
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        self.resource = None

    def setup(self):
        """Prepare the VNF environment and keep the resulting resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for subclasses; the base implementation does nothing."""

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is empty unless collectd reports status 0."""
        core_kpis = {}
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if agent_status == 0:
            core_kpis = self.resource.amqp_collect_nfvi_kpi()
        return {"core": core_kpis}

    def start_collect(self):
        """Initiate the system agent, start the resource and its AMQP KPI process."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource when one has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
355
356
class ClientResourceHelper(ResourceHelper):
    """Resource helper driving a TRex stateless (STL) client.

    Traffic is run in a loop by ``run_traffic`` (meant to run in a
    subprocess); each iteration pushes a samples dict into an internal queue
    which ``collect_kpi`` drains on the caller side.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        # shared flags, so they can be read/written across processes
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers from the VNFD."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        """Return the port number of ``intf`` as known by the VNFD helper."""
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` when the client raises STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the current client statistics.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: dict mapping interface name to its sample dict; ``{}`` when
            the statistics are not a mapping (the helper is then flagged as
            terminated)
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # Validate before touching the result: a non-mapping has no usable
        # ``get`` and must take the termination path instead of raising.
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                # NOTE(review): the *_mbps entries are filled from rx_bps/tx_bps
                # without any unit conversion -- confirm this is intended
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the resulting samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic until ``terminate`` is called.

        :raises STLError: when the client fails while not being terminated
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Request the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics for ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on ``ports`` (all ports by default)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued samples dict (if any) into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (a new STLClient by default) and return it.

        Up to six attempts are made, five seconds apart. The client is
        returned even when every attempt failed.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
496
497
class Rfc2544ResourceHelper(object):
    """Lazy accessor for the ``rfc2544`` section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``rfc2544`` options dict, read once from the scenario options."""
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The ``correlated_traffic`` option (default: False)."""
        if self._correlated_traffic is None:
            self._correlated_traffic = \
                self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)

        return self._correlated_traffic

    @property
    def latency(self):
        """The ``latency`` option (default: False)."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` of the rfc2544 section, or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` into the low/high tolerance bounds.

        The option is either a ``"<a> - <b>"`` string (the bounds are sorted,
        a single value is used for both bounds) or a bare number, which is
        what YAML yields for e.g. ``allowed_drop_rate: 0.001``.
        """
        tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        try:
            bounds = [float(t.strip()) for t in tolerance.split('-')]
        except AttributeError:
            # not a string: a single numeric value gives both bounds
            bounds = [float(tolerance)]
        bounds.sort()
        self._tolerance_low = bounds[0]
        self._tolerance_high = bounds[1] if len(bounds) > 1 else bounds[0]
554
555
class SampleVNFDeployHelper(object):
    """Clones, uploads and builds the samplevnf repository for a VNF binary."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the VNFD and SSH helpers used during deployment."""
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` unless ``which`` already finds it.

        The repository is cloned locally, copied to the target over SSH,
        built there with ``tools/vnf_build.sh`` and the resulting binary is
        copied into the SSH helper's binary directory.
        """
        ssh = self.ssh_helper
        target_bin = ssh.join_bin_path(app_name)
        which_status = ssh.execute("which %s" % target_bin)[0]
        if not which_status:
            # binary already available on the target
            return

        # fresh local clone of the repository
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)

        # replace the remote copy with the new clone
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        built_bin = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_bin, target_bin))
588
589
class ScenarioHelper(object):
    """Read-only view on the scenario configuration for one named VNF.

    ``scenario_cfg`` is None until it is assigned by the owner; every
    property below reads from it.
    """

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the VNF name; the scenario configuration is set later."""
        self.scenario_cfg = None
        self.name = name

    @property
    def task_path(self):
        """The mandatory ``task_path`` entry of the scenario."""
        return self.scenario_cfg['task_path']

    @property
    def nodes(self):
        """The ``nodes`` entry of the scenario, or None."""
        return self.scenario_cfg.get('nodes')

    @property
    def all_options(self):
        """The whole ``options`` section of the scenario (``{}`` if absent)."""
        return self.scenario_cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this VNF's name (``{}`` if absent)."""
        return self.all_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The ``vnf_config`` of this VNF, falling back to DEFAULT_VNF_CFG."""
        return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The mandatory ``topology`` entry of the scenario."""
        return self.scenario_cfg['topology']

    @property
    def timeout(self):
        """The larger of the runner duration and the VNF timeout.

        The VNF timeout is the ``timeout`` option of this VNF (default
        DEFAULT_VNF_TIMEOUT); a missing runner duration counts as equal to
        the VNF timeout.
        """
        runner_cfg = self.scenario_cfg.get('runner', {})
        vnf_timeout = self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
        run_duration = runner_cfg.get('duration', vnf_timeout)
        # max() keeps its first argument on ties, i.e. the VNF timeout
        return max(vnf_timeout, run_duration)
633
634 class SampleVNF(GenericVNF):
635     """ Class providing file-like API for generic VNF implementation """
636
637     VNF_PROMPT = "pipeline>"
638     WAIT_TIME = 1
639     WAIT_TIME_FOR_SCRIPT = 10
640     APP_NAME = "SampleVNF"
641     # we run the VNF interactively, so the ssh command will timeout after this long
642
643     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
644         super(SampleVNF, self).__init__(name, vnfd)
645         self.bin_path = get_nsb_option('bin_path', '')
646
647         self.scenario_helper = ScenarioHelper(self.name)
648         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
649
650         if setup_env_helper_type is None:
651             setup_env_helper_type = SetupEnvHelper
652
653         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
654                                                   self.ssh_helper,
655                                                   self.scenario_helper)
656
657         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
658
659         if resource_helper_type is None:
660             resource_helper_type = ResourceHelper
661
662         self.resource_helper = resource_helper_type(self.setup_helper)
663
664         self.context_cfg = None
665         self.nfvi_context = None
666         self.pipeline_kwargs = {}
667         self.uplink_ports = None
668         self.downlink_ports = None
669         # NOTE(esm): make QueueFileWrapper invert-able so that we
670         #            never have to manage the queues
671         self.q_in = Queue()
672         self.q_out = Queue()
673         self.queue_wrapper = None
674         self.run_kwargs = {}
675         self.used_drivers = {}
676         self.vnf_port_pairs = None
677         self._vnf_process = None
678
679     def _build_ports(self):
680         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
681         self.networks = self._port_pairs.networks
682         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
683         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
684         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
685
686     def _get_route_data(self, route_index, route_type):
687         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
688         for _ in range(route_index):
689             next(route_iter, '')
690         return next(route_iter, {}).get(route_type, '')
691
692     def _get_port0localip6(self):
693         return_value = self._get_route_data(0, 'network')
694         LOG.info("_get_port0localip6 : %s", return_value)
695         return return_value
696
697     def _get_port1localip6(self):
698         return_value = self._get_route_data(1, 'network')
699         LOG.info("_get_port1localip6 : %s", return_value)
700         return return_value
701
702     def _get_port0prefixlen6(self):
703         return_value = self._get_route_data(0, 'netmask')
704         LOG.info("_get_port0prefixlen6 : %s", return_value)
705         return return_value
706
707     def _get_port1prefixlen6(self):
708         return_value = self._get_route_data(1, 'netmask')
709         LOG.info("_get_port1prefixlen6 : %s", return_value)
710         return return_value
711
712     def _get_port0gateway6(self):
713         return_value = self._get_route_data(0, 'network')
714         LOG.info("_get_port0gateway6 : %s", return_value)
715         return return_value
716
717     def _get_port1gateway6(self):
718         return_value = self._get_route_data(1, 'network')
719         LOG.info("_get_port1gateway6 : %s", return_value)
720         return return_value
721
722     def _start_vnf(self):
723         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
724         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
725         self._vnf_process = Process(name=name, target=self._run)
726         self._vnf_process.start()
727
728     def _vnf_up_post(self):
729         pass
730
731     def instantiate(self, scenario_cfg, context_cfg):
732         self.scenario_helper.scenario_cfg = scenario_cfg
733         self.context_cfg = context_cfg
734         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
735         # self.nfvi_context = None
736
737         # vnf deploy is unsupported, use ansible playbooks
738         if self.scenario_helper.options.get("vnf_deploy", False):
739             self.deploy_helper.deploy_vnfs(self.APP_NAME)
740         self.resource_helper.setup()
741         self._start_vnf()
742
743     def wait_for_instantiate(self):
744         buf = []
745         time.sleep(self.WAIT_TIME)  # Give some time for config to load
746         while True:
747             if not self._vnf_process.is_alive():
748                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
749
750             # NOTE(esm): move to QueueFileWrapper
751             while self.q_out.qsize() > 0:
752                 buf.append(self.q_out.get())
753                 message = ''.join(buf)
754                 if self.VNF_PROMPT in message:
755                     LOG.info("%s VNF is up and running.", self.APP_NAME)
756                     self._vnf_up_post()
757                     self.queue_wrapper.clear()
758                     self.resource_helper.start_collect()
759                     return self._vnf_process.exitcode
760
761                 if "PANIC" in message:
762                     raise RuntimeError("Error starting %s VNF." %
763                                        self.APP_NAME)
764
765             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
766             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
767             # Send ENTER to display a new prompt in case the prompt text was corrupted
768             # by other VNF output
769             self.q_in.put('\r\n')
770
771     def _build_run_kwargs(self):
772         self.run_kwargs = {
773             'stdin': self.queue_wrapper,
774             'stdout': self.queue_wrapper,
775             'keep_stdin_open': True,
776             'pty': True,
777             'timeout': self.scenario_helper.timeout,
778         }
779
780     def _build_config(self):
781         return self.setup_helper.build_config()
782
783     def _run(self):
784         # we can't share ssh paramiko objects to force new connection
785         self.ssh_helper.drop_connection()
786         cmd = self._build_config()
787         # kill before starting
788         self.setup_helper.kill_vnf()
789
790         LOG.debug(cmd)
791         self._build_run_kwargs()
792         self.ssh_helper.run(cmd, **self.run_kwargs)
793
794     def vnf_execute(self, cmd, wait_time=2):
795         """ send cmd to vnf process """
796
797         LOG.info("%s command: %s", self.APP_NAME, cmd)
798         self.q_in.put("{}\r\n".format(cmd))
799         time.sleep(wait_time)
800         output = []
801         while self.q_out.qsize() > 0:
802             output.append(self.q_out.get())
803         return "".join(output)
804
805     def _tear_down(self):
806         pass
807
808     def terminate(self):
809         self.vnf_execute("quit")
810         self.setup_helper.kill_vnf()
811         self._tear_down()
812         self.resource_helper.stop_collect()
813         if self._vnf_process is not None:
814             # be proper and join first before we kill
815             LOG.debug("joining before terminate %s", self._vnf_process.name)
816             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
817             self._vnf_process.terminate()
818         # no terminate children here because we share processes with tg
819
820     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
821         """Method for checking the statistics
822
823         This method could be overridden in children classes.
824
825         :return: VNF statistics
826         """
827         cmd = 'p {0} stats'.format(self.APP_WORD)
828         out = self.vnf_execute(cmd)
829         return out
830
831     def collect_kpi(self):
832         # we can't get KPIs if the VNF is down
833         check_if_process_failed(self._vnf_process)
834         stats = self.get_stats()
835         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
836         if m:
837             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
838             result["collect_stats"] = self.resource_helper.collect_kpi()
839         else:
840             result = {
841                 "packets_in": 0,
842                 "packets_fwd": 0,
843                 "packets_dropped": 0,
844             }
845         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
846         return result
847
848     def scale(self, flavor=""):
849         """The SampleVNF base class doesn't provide the 'scale' feature"""
850         raise y_exceptions.FunctionNotImplemented(
851             function_name='scale', class_name='SampleVNFTrafficGen')
852
853
class SampleVNFTrafficGen(GenericTrafficGen):
    """Traffic generator base built on scenario, SSH, setup and resource helpers.

    ``instantiate`` starts a server process and ``run_traffic`` starts a
    separate traffic process; ``terminate`` stops both.
    """

    APP_NAME = 'Sample'
    RUN_WAIT = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the helper objects used by the traffic generator.

        :param name: passed to the parent constructor
        :param vnfd: passed to the parent constructor
        :param setup_env_helper_type: class used to build the setup helper;
            ``SetupEnvHelper`` when None
        :param resource_helper_type: class used to build the resource helper;
            ``ClientResourceHelper`` when None
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        setup_cls = SetupEnvHelper if setup_env_helper_type is None else setup_env_helper_type
        self.setup_helper = setup_cls(self.vnfd_helper, self.ssh_helper, self.scenario_helper)

        resource_cls = (ClientResourceHelper if resource_helper_type is None
                        else resource_helper_type)
        self.resource_helper = resource_cls(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        """Target of the server process; here it only drops the SSH connection."""
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """Set up resources and start the traffic generator server process.

        :param scenario_cfg: stored on the scenario helper
        :param context_cfg: not used by this implementation
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        resources = self.resource_helper
        resources.setup()
        # must generate_cfg after DPDK bind because we need port number
        resources.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        process_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._tg_process = Process(name=process_name, target=self._start_server)
        self._tg_process.start()

    def _check_status(self):
        """Report the server status; ``_wait_for_process`` treats 0 as ready.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def _wait_for_process(self):
        """Poll once per second until ``_check_status`` returns 0.

        :return: ``exitcode`` of the server process once the status is 0
        :raises RuntimeError: if the server process is not alive
        """
        while True:
            if not self._tg_process.is_alive():
                raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            if self._check_status() == 0:
                break

        LOG.info("%s TG Server is up and running.", self.APP_NAME)
        return self._tg_process.exitcode

    def _traffic_runner(self, traffic_profile):
        """Target of the traffic process: run the profile via the resource helper.

        :param traffic_profile: forwarded to ``resource_helper.run_traffic``
        """
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self.resource_helper.run_traffic(traffic_profile)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.

        Starts ``_traffic_runner`` in a new process, then polls every
        ``RUN_WAIT`` seconds until the resource helper's ``client_started``
        value is non-zero or the traffic process is no longer alive.
        Mandatory.

        :param traffic_profile: profile handed to the traffic process
        :return: True if the traffic process is alive when the wait ends,
            False otherwise
        """
        process_name = "{}-{}-{}-{}".format(
            self.name, self.APP_NAME, traffic_profile.__class__.__name__, os.getpid())
        self._traffic_process = Process(
            name=process_name, target=self._traffic_runner, args=(traffic_profile,))
        self._traffic_process.start()
        # Wait for traffic process to start
        while self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not self._traffic_process.is_alive():
                break

        return self._traffic_process.is_alive()

    def collect_kpi(self):
        """Return the resource helper's KPIs after checking both processes.

        :return: the value of ``resource_helper.collect_kpi()``
        """
        # check if the tg processes have exited
        check_if_process_failed(self._tg_process)
        check_if_process_failed(self._traffic_process)
        kpis = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, kpis)
        return kpis

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Marks traffic as finished, then joins (up to ``PROCESS_JOIN_TIMEOUT``)
        and terminates the traffic process followed by the server process.
        Nothing is returned.
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        for process in (self._traffic_process, self._tg_process):
            if process is None:
                continue
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", process.name)
            process.join(PROCESS_JOIN_TIMEOUT)
            process.terminate()
        # no terminate children here because we share processes with vnf

    def scale(self, flavor=""):
        """A traffic generator VNF doesn't provide the 'scale' feature

        :param flavor: accepted for interface compatibility; not used
        :raises FunctionNotImplemented: always
        """
        raise y_exceptions.FunctionNotImplemented(
            function_name='scale', class_name='SampleVNFTrafficGen')