Merge "fix pep8 problem in env.py"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19 import os
20 import posixpath
21 import re
22 import subprocess
23 import time
24
25 import six
26 from six.moves import cStringIO
27
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
33 from yardstick.common import exceptions as y_exceptions
34 from yardstick.common import utils
35 from yardstick.common.process import check_if_process_failed
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
37 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
38 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
39 from yardstick.network_services.nfvi.resource import ResourceProfile
40 from yardstick.network_services.utils import get_nsb_option
41 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
42 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
43 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
44 from yardstick.ssh import AutoConnectSSH
45
46
# DPDK release identifier; not referenced in the visible part of this module.
DPDK_VERSION = "dpdk-16.07"

# Module-level logger shared by every helper class below.
LOG = logging.getLogger(__name__)


# Directory on the remote node that receives uploaded config/script files.
REMOTE_TMP = "/tmp"
# Fallback returned by ScenarioHelper.timeout when the scenario options set
# no 'timeout' (presumably seconds -- confirm against the SSH helper).
DEFAULT_VNF_TIMEOUT = 3600
# Timeout handed to Process.join() before the VNF process is terminated.
PROCESS_JOIN_TIMEOUT = 3
55
56
class VnfSshHelper(AutoConnectSSH):
    """SSH helper that remembers its node description and a binary directory."""

    def __init__(self, node, bin_path, wait=None):
        """Derive the connection arguments from ``node`` and store ``bin_path``.

        :param node: node description passed to ``args_from_node``
        :param bin_path: directory used by ``join_bin_path``/``provision_tool``
        :param wait: when truthy, supplied as the ``wait`` connection argument
            unless ``args_from_node`` already produced one
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class (not the caller's subclass)."""
        # the name is spelled out on purpose: anything dynamic would resolve
        # to the calling subclass instead of this superclass
        return VnfSshHelper

    def copy(self):
        """Create a new helper from the stored node and binary path."""
        # unlike the SSH base classes, the copy is rebuilt from the node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` to ``REMOTE_TMP/<prefix>`` and return that path."""
        remote_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join ``args`` onto the configured binary directory."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Delegate to the parent ``provision_tool``, defaulting to ``bin_path``."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
92
93
class SetupEnvHelper(object):
    """Base interface for helpers that prepare a VNF's runtime environment.

    ``build_config`` and ``tear_down`` must be provided by subclasses;
    ``setup_vnf_environment`` and ``kill_vnf`` default to doing nothing.
    """

    # remote locations of the generated configuration and script
    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the three collaborating helpers."""
        super(SetupEnvHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def build_config(self):
        """Abstract hook; always raises ``NotImplementedError`` here."""
        raise NotImplementedError()

    def setup_vnf_environment(self):
        """No-op in the base class; returns ``None``."""
        return None

    def kill_vnf(self):
        """No-op in the base class; returns ``None``."""
        return None

    def tear_down(self):
        """Abstract hook; always raises ``NotImplementedError`` here."""
        raise NotImplementedError()
119
120
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based VNFs.

    Reserves hugepages, loads and binds the ``igb_uio`` driver, generates and
    uploads the pipeline configuration/script and builds the command line
    from ``PIPELINE_COMMAND``.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
    # amount of hugepage memory to reserve, in kB; divided by the remote
    # 'Hugepagesize' value to obtain the number of pages
    HUGEPAGES_KB = 1024 * 1024 * 16

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace ``pkt_type = ipv4`` with the requested packet type.

        :param ip_pipeline_cfg: configuration text
        :param traffic_options: dict providing ``'pkt_type'``
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        return ip_pipeline_cfg.replace(match_str, replace_str)

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Rewrite the traffic/packet type entries of the configuration text.

        When ``traffic_options['vnf_type']`` differs from ``cls.APP_NAME`` the
        literal ``traffic_type = 4`` is replaced with the requested traffic
        type.  Otherwise ``pkt_type = ipv4`` is left as is for traffic type 4
        and turned into ``pkt_type = ipv6`` for any other value.

        :param ip_pipeline_cfg: configuration text
        :param traffic_options: dict providing ``'traffic_type'`` and
            ``'vnf_type'``
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # strings must be compared by value; identity ('is not') only works
        # by accident when both operands happen to be the same object
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        return ip_pipeline_cfg.replace(match_str, replace_str)

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper on ``ssh_helper``."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Write the hugepage count to ``NR_HUGEPAGES_PATH`` and log the result.

        The count is ``HUGEPAGES_KB`` divided by the remote ``Hugepagesize``;
        the value is read back afterwards only for logging.
        """
        meminfo = utils.read_meminfo(self.ssh_helper)
        hp_size_kb = int(meminfo['Hugepagesize'])
        nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
        self.ssh_helper.execute('echo %s | sudo tee %s' %
                                (nr_hugepages, self.NR_HUGEPAGES_PATH))
        hp = six.BytesIO()
        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
        LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
                 hp_size_kb, nr_hugepages, nr_hugepages_set)

    def build_config(self):
        """Generate, patch and upload the VNF config and script.

        :return: ``PIPELINE_COMMAND`` formatted with ``self.pipeline_kwargs``
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # the generated file is read back from the local filesystem
        # (presumably written by generate_config() -- confirm)
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Provision the VNF binary and fill ``self.pipeline_kwargs``."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, kill stale VNFs, bind the NICs and build the resource.

        :return: the ``ResourceProfile`` created by ``_setup_resources``
        """
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill every remote process named ``APP_NAME`` with ``killall``."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run

        :raises DPDKSetupDriverError: when ``igb_uio`` is not listed by
            ``lsmod`` after the modprobe attempt
        """
        self._setup_hugepages()
        self.ssh_helper.execute('sudo modprobe uio && sudo modprobe igb_uio')
        exit_status = self.ssh_helper.execute('lsmod | grep -i igb_uio')[0]
        if exit_status:
            raise y_exceptions.DPDKSetupDriverError()

    def get_collectd_options(self):
        """Return the collectd options, node settings overriding global ones.

        A copy is returned so that applying the node specific overrides does
        not modify the ``collectd`` dict stored in the scenario options.
        """
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Pick the socket and create the ``ResourceProfile`` for KPI collection."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=collectd_options.get("interval"),
                               timeout=self.scenario_helper.timeout)

    def _detect_and_bind_drivers(self):
        """Bind ``self.bound_pci`` to ``igb_uio`` and record DPDK port numbers.

        The DPDK port number of an interface is the index of its PCI address
        in the sorted list of DPDK-bound addresses.  Addresses that match no
        known interface are skipped (best effort).
        """
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
            except (StopIteration, KeyError):
                # only the expected lookup failures are tolerated; anything
                # else (including KeyboardInterrupt) now propagates
                LOG.debug("No interface found for DPDK bound PCI address %s", vpci)
                continue
            # force to int
            intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Run ``FIND_NET_CMD`` for ``vpci`` remotely.

        :return: the command's stdout on exit status 0, otherwise ``None``
        """
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Ask the bind helper to rebind the previously saved drivers."""
        self.dpdk_bind_helper.rebind_drivers()
315
316
class ResourceHelper(object):
    """Thin wrapper around the resource object produced by the setup helper."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember ``setup_helper`` and reuse its ``ssh_helper``."""
        super(ResourceHelper, self).__init__()
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        # filled in by setup()
        self.resource = None

    def setup(self):
        """Store whatever ``setup_vnf_environment`` returns as the resource."""
        environment = self.setup_helper.setup_vnf_environment()
        self.resource = environment

    def generate_cfg(self):
        """No-op hook; returns ``None``."""
        return None

    def _collect_resource_kpi(self):
        """Return ``{"core": <kpis>}``; kpis stay empty unless collectd runs."""
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if agent_status == 0:
            core_kpis = self.resource.amqp_collect_nfvi_kpi()
        else:
            core_kpis = {}
        return {"core": core_kpis}

    def start_collect(self):
        """Initiate the system agent, start the resource and its AMQP process."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource when one has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Public entry point; delegates to ``_collect_resource_kpi``."""
        return self._collect_resource_kpi()
357
358
class ClientResourceHelper(ResourceHelper):
    """Resource helper that drives a TRex ``STLClient`` traffic generator."""

    # seconds slept after starting traffic, before sampling statistics
    RUN_DURATION = 60
    # seconds slept after sampling, before the samples are queued
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Pick up the helpers from ``setup_helper`` and create the shared state."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers from the VNFD."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        """Return the port number of ``intf`` as reported by the VNFD helper."""
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return the client's statistics, or ``{}`` when it raises ``STLError``."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the latest client statistics.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is absent from the statistics
        :return: dict mapping interface name to its sample dict; ``{}`` (and
            the terminated flag set) when the statistics are not a mapping
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # validate the statistics before touching them, otherwise a
        # non-mapping result fails on .get() and never reaches this guard
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                # NOTE(review): the *_mbps entries carry the rx_bps/tx_bps
                # values unscaled
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the collected samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect, then run traffic iterations until ``terminate`` is called.

        An ``STLError`` is swallowed when the terminated flag is already set
        and re-raised otherwise.
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Set the terminated flag so that ``run_traffic`` leaves its loop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear client statistics for ``ports`` (all ports when ``None``)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on ``ports`` (all ports when ``None``)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge at most one queued sample set into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (a new ``STLClient`` when ``None``) with retries.

        :return: the client; it is returned even when every attempt failed,
            in which case a warning is logged
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        else:
            # every attempt failed; make that visible instead of silently
            # handing back an unconnected client
            LOG.warning("Giving up connecting to Trex Server after %s attempts", 6)
        return client
498
499
class Rfc2544ResourceHelper(object):
    """Lazy, cached accessors for the ``rfc2544`` section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Keep ``scenario_helper``; every cached value starts as ``None``."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        self._rfc2544 = None
        self._correlated_traffic = None
        self._latency = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``'rfc2544'`` options dict, looked up once and then cached."""
        cached = self._rfc2544
        if cached is None:
            cached = self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return cached

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The ``correlated_traffic`` option, or its class default."""
        if self._correlated_traffic is None:
            value = self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
            self._correlated_traffic = value

        return self._correlated_traffic

    @property
    def latency(self):
        """The ``latency`` option, or its class default."""
        if self._latency is None:
            value = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
            self._latency = value
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` from the rfc2544 section, else ``default``."""
        section = self.rfc2544
        return section.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` ("low - high") into the two bounds.

        The parts are sorted, so the order in the string does not matter; a
        single value is used for both bounds.
        """
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        remaining = iter(bounds)
        self._tolerance_low = next(remaining)
        self._tolerance_high = next(remaining, self.tolerance_low)
556
557
class SampleVNFDeployHelper(object):
    """Clone, upload and build the samplevnf repository when a VNF binary is missing."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the VNFD and SSH helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` remotely unless ``which`` finds it.

        The repository is cloned locally, copied to the remote node, built
        with ``tools/vnf_build.sh`` and the resulting binary is copied into
        the helper's binary directory.
        """
        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        which_status = ssh.execute("which %s" % vnf_bin)[0]
        if which_status == 0 or not which_status:
            # binary already present on the remote node
            return

        # fresh local clone of the repository
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)

        # replace the remote copy with the new clone
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        # install the built binary into the binary directory
        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_binary, vnf_bin))
590
591
class ScenarioHelper(object):
    """Read-only view over ``scenario_cfg`` for the VNF called ``name``.

    ``scenario_cfg`` starts as ``None`` and has to be assigned before any
    property is read.
    """

    # returned by ``vnf_cfg`` when the VNF options carry no 'vnf_config'
    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the VNF name; the scenario config is attached later."""
        self.scenario_cfg = None
        self.name = name

    @property
    def task_path(self):
        """The mandatory ``'task_path'`` entry."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The ``'nodes'`` entry, ``None`` when absent."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The ``'options'`` entry, ``{}`` when absent."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this VNF's name, ``{}`` when absent."""
        every_option = self.all_options
        return every_option.get(self.name, {})

    @property
    def vnf_cfg(self):
        """This VNF's ``'vnf_config'``, else ``DEFAULT_VNF_CFG``."""
        own_options = self.options
        return own_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The mandatory ``'topology'`` entry."""
        cfg = self.scenario_cfg
        return cfg['topology']

    @property
    def timeout(self):
        """This VNF's ``'timeout'`` option, else ``DEFAULT_VNF_TIMEOUT``."""
        own_options = self.options
        return own_options.get('timeout', DEFAULT_VNF_TIMEOUT)
632
633
634 class SampleVNF(GenericVNF):
635     """ Class providing file-like API for generic VNF implementation """
636
637     VNF_PROMPT = "pipeline>"
638     WAIT_TIME = 1
639     WAIT_TIME_FOR_SCRIPT = 10
640     APP_NAME = "SampleVNF"
641     # we run the VNF interactively, so the ssh command will timeout after this long
642
643     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
644         super(SampleVNF, self).__init__(name, vnfd)
645         self.bin_path = get_nsb_option('bin_path', '')
646
647         self.scenario_helper = ScenarioHelper(self.name)
648         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
649
650         if setup_env_helper_type is None:
651             setup_env_helper_type = SetupEnvHelper
652
653         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
654                                                   self.ssh_helper,
655                                                   self.scenario_helper)
656
657         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
658
659         if resource_helper_type is None:
660             resource_helper_type = ResourceHelper
661
662         self.resource_helper = resource_helper_type(self.setup_helper)
663
664         self.context_cfg = None
665         self.nfvi_context = None
666         self.pipeline_kwargs = {}
667         self.uplink_ports = None
668         self.downlink_ports = None
669         # NOTE(esm): make QueueFileWrapper invert-able so that we
670         #            never have to manage the queues
671         self.q_in = Queue()
672         self.q_out = Queue()
673         self.queue_wrapper = None
674         self.run_kwargs = {}
675         self.used_drivers = {}
676         self.vnf_port_pairs = None
677         self._vnf_process = None
678
679     def _build_ports(self):
680         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
681         self.networks = self._port_pairs.networks
682         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
683         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
684         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
685
686     def _get_route_data(self, route_index, route_type):
687         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
688         for _ in range(route_index):
689             next(route_iter, '')
690         return next(route_iter, {}).get(route_type, '')
691
692     def _get_port0localip6(self):
693         return_value = self._get_route_data(0, 'network')
694         LOG.info("_get_port0localip6 : %s", return_value)
695         return return_value
696
697     def _get_port1localip6(self):
698         return_value = self._get_route_data(1, 'network')
699         LOG.info("_get_port1localip6 : %s", return_value)
700         return return_value
701
702     def _get_port0prefixlen6(self):
703         return_value = self._get_route_data(0, 'netmask')
704         LOG.info("_get_port0prefixlen6 : %s", return_value)
705         return return_value
706
707     def _get_port1prefixlen6(self):
708         return_value = self._get_route_data(1, 'netmask')
709         LOG.info("_get_port1prefixlen6 : %s", return_value)
710         return return_value
711
712     def _get_port0gateway6(self):
713         return_value = self._get_route_data(0, 'network')
714         LOG.info("_get_port0gateway6 : %s", return_value)
715         return return_value
716
717     def _get_port1gateway6(self):
718         return_value = self._get_route_data(1, 'network')
719         LOG.info("_get_port1gateway6 : %s", return_value)
720         return return_value
721
722     def _start_vnf(self):
723         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
724         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
725         self._vnf_process = Process(name=name, target=self._run)
726         self._vnf_process.start()
727
728     def _vnf_up_post(self):
729         pass
730
731     def instantiate(self, scenario_cfg, context_cfg):
732         self.scenario_helper.scenario_cfg = scenario_cfg
733         self.context_cfg = context_cfg
734         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
735         # self.nfvi_context = None
736
737         # vnf deploy is unsupported, use ansible playbooks
738         if self.scenario_helper.options.get("vnf_deploy", False):
739             self.deploy_helper.deploy_vnfs(self.APP_NAME)
740         self.resource_helper.setup()
741         self._start_vnf()
742
743     def wait_for_instantiate(self):
744         buf = []
745         time.sleep(self.WAIT_TIME)  # Give some time for config to load
746         while True:
747             if not self._vnf_process.is_alive():
748                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
749
750             # NOTE(esm): move to QueueFileWrapper
751             while self.q_out.qsize() > 0:
752                 buf.append(self.q_out.get())
753                 message = ''.join(buf)
754                 if self.VNF_PROMPT in message:
755                     LOG.info("%s VNF is up and running.", self.APP_NAME)
756                     self._vnf_up_post()
757                     self.queue_wrapper.clear()
758                     self.resource_helper.start_collect()
759                     return self._vnf_process.exitcode
760
761                 if "PANIC" in message:
762                     raise RuntimeError("Error starting %s VNF." %
763                                        self.APP_NAME)
764
765             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
766             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
767             # Send ENTER to display a new prompt in case the prompt text was corrupted
768             # by other VNF output
769             self.q_in.put('\r\n')
770
771     def _build_run_kwargs(self):
772         self.run_kwargs = {
773             'stdin': self.queue_wrapper,
774             'stdout': self.queue_wrapper,
775             'keep_stdin_open': True,
776             'pty': True,
777             'timeout': self.scenario_helper.timeout,
778         }
779
780     def _build_config(self):
781         return self.setup_helper.build_config()
782
783     def _run(self):
784         # we can't share ssh paramiko objects to force new connection
785         self.ssh_helper.drop_connection()
786         cmd = self._build_config()
787         # kill before starting
788         self.setup_helper.kill_vnf()
789
790         LOG.debug(cmd)
791         self._build_run_kwargs()
792         self.ssh_helper.run(cmd, **self.run_kwargs)
793
794     def vnf_execute(self, cmd, wait_time=2):
795         """ send cmd to vnf process """
796
797         LOG.info("%s command: %s", self.APP_NAME, cmd)
798         self.q_in.put("{}\r\n".format(cmd))
799         time.sleep(wait_time)
800         output = []
801         while self.q_out.qsize() > 0:
802             output.append(self.q_out.get())
803         return "".join(output)
804
805     def _tear_down(self):
806         pass
807
808     def terminate(self):
809         self.vnf_execute("quit")
810         self.setup_helper.kill_vnf()
811         self._tear_down()
812         self.resource_helper.stop_collect()
813         if self._vnf_process is not None:
814             # be proper and join first before we kill
815             LOG.debug("joining before terminate %s", self._vnf_process.name)
816             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
817             self._vnf_process.terminate()
818         # no terminate children here because we share processes with tg
819
820     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
821         """Method for checking the statistics
822
823         This method could be overridden in children classes.
824
825         :return: VNF statistics
826         """
827         cmd = 'p {0} stats'.format(self.APP_WORD)
828         out = self.vnf_execute(cmd)
829         return out
830
831     def collect_kpi(self):
832         # we can't get KPIs if the VNF is down
833         check_if_process_failed(self._vnf_process)
834         stats = self.get_stats()
835         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
836         if m:
837             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
838             result["collect_stats"] = self.resource_helper.collect_kpi()
839         else:
840             result = {
841                 "packets_in": 0,
842                 "packets_fwd": 0,
843                 "packets_dropped": 0,
844             }
845         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
846         return result
847
848     def scale(self, flavor=""):
849         """The SampleVNF base class doesn't provide the 'scale' feature"""
850         raise y_exceptions.FunctionNotImplemented(
851             function_name='scale', class_name='SampleVNFTrafficGen')
852
853
854 class SampleVNFTrafficGen(GenericTrafficGen):
855     """ Class providing file-like API for generic traffic generator """
856
857     APP_NAME = 'Sample'
858     RUN_WAIT = 1
859
860     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
861         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
862         self.bin_path = get_nsb_option('bin_path', '')
863
864         self.scenario_helper = ScenarioHelper(self.name)
865         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
866
867         if setup_env_helper_type is None:
868             setup_env_helper_type = SetupEnvHelper
869
870         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
871                                                   self.ssh_helper,
872                                                   self.scenario_helper)
873
874         if resource_helper_type is None:
875             resource_helper_type = ClientResourceHelper
876
877         self.resource_helper = resource_helper_type(self.setup_helper)
878
879         self.runs_traffic = True
880         self.traffic_finished = False
881         self._tg_process = None
882         self._traffic_process = None
883
884     def _start_server(self):
885         # we can't share ssh paramiko objects to force new connection
886         self.ssh_helper.drop_connection()
887
888     def instantiate(self, scenario_cfg, context_cfg):
889         self.scenario_helper.scenario_cfg = scenario_cfg
890         self.resource_helper.setup()
891         # must generate_cfg after DPDK bind because we need port number
892         self.resource_helper.generate_cfg()
893
894         LOG.info("Starting %s server...", self.APP_NAME)
895         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
896         self._tg_process = Process(name=name, target=self._start_server)
897         self._tg_process.start()
898
899     def _check_status(self):
900         raise NotImplementedError
901
902     def _wait_for_process(self):
903         while True:
904             if not self._tg_process.is_alive():
905                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
906             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
907             time.sleep(1)
908             status = self._check_status()
909             if status == 0:
910                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
911                 return self._tg_process.exitcode
912
913     def _traffic_runner(self, traffic_profile):
914         # always drop connections first thing in new processes
915         # so we don't get paramiko errors
916         self.ssh_helper.drop_connection()
917         LOG.info("Starting %s client...", self.APP_NAME)
918         self.resource_helper.run_traffic(traffic_profile)
919
920     def run_traffic(self, traffic_profile):
921         """ Generate traffic on the wire according to the given params.
922         Method is non-blocking, returns immediately when traffic process
923         is running. Mandatory.
924
925         :param traffic_profile:
926         :return: True/False
927         """
928         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
929                                     os.getpid())
930         self._traffic_process = Process(name=name, target=self._traffic_runner,
931                                         args=(traffic_profile,))
932         self._traffic_process.start()
933         # Wait for traffic process to start
934         while self.resource_helper.client_started.value == 0:
935             time.sleep(self.RUN_WAIT)
936             # what if traffic process takes a few seconds to start?
937             if not self._traffic_process.is_alive():
938                 break
939
940         return self._traffic_process.is_alive()
941
942     def collect_kpi(self):
943         # check if the tg processes have exited
944         for proc in (self._tg_process, self._traffic_process):
945             check_if_process_failed(proc)
946         result = self.resource_helper.collect_kpi()
947         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
948         return result
949
950     def terminate(self):
951         """ After this method finishes, all traffic processes should stop. Mandatory.
952
953         :return: True/False
954         """
955         self.traffic_finished = True
956         # we must kill client before we kill the server, or the client will raise exception
957         if self._traffic_process is not None:
958             # be proper and try to join before terminating
959             LOG.debug("joining before terminate %s", self._traffic_process.name)
960             self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
961             self._traffic_process.terminate()
962         if self._tg_process is not None:
963             # be proper and try to join before terminating
964             LOG.debug("joining before terminate %s", self._tg_process.name)
965             self._tg_process.join(PROCESS_JOIN_TIMEOUT)
966             self._tg_process.terminate()
967         # no terminate children here because we share processes with vnf
968
969     def scale(self, flavor=""):
970         """A traffic generator VFN doesn't provide the 'scale' feature"""
971         raise y_exceptions.FunctionNotImplemented(
972             function_name='scale', class_name='SampleVNFTrafficGen')