Merge "vnf_generic: adjust ssh timeout for number of VNFs"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
34 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
35 from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
38 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.utils import get_nsb_option
41
42 from trex_stl_lib.trex_stl_client import STLClient
43 from trex_stl_lib.trex_stl_client import LoggerApi
44 from trex_stl_lib.trex_stl_exceptions import STLError
45
46 from yardstick.ssh import AutoConnectSSH
47
48 DPDK_VERSION = "dpdk-16.07"
49
50 LOG = logging.getLogger(__name__)
51
52
53 REMOTE_TMP = "/tmp"
54
55
56 class VnfSshHelper(AutoConnectSSH):
57
58     def __init__(self, node, bin_path, wait=None):
59         self.node = node
60         kwargs = self.args_from_node(self.node)
61         if wait:
62             kwargs.setdefault('wait', wait)
63
64         super(VnfSshHelper, self).__init__(**kwargs)
65         self.bin_path = bin_path
66
67     @staticmethod
68     def get_class():
69         # must return static class name, anything else refers to the calling class
70         # i.e. the subclass, not the superclass
71         return VnfSshHelper
72
73     def copy(self):
74         # this copy constructor is different from SSH classes, since it uses node
75         return self.get_class()(self.node, self.bin_path)
76
77     def upload_config_file(self, prefix, content):
78         cfg_file = os.path.join(REMOTE_TMP, prefix)
79         LOG.debug(content)
80         file_obj = cStringIO(content)
81         self.put_file_obj(file_obj, cfg_file)
82         return cfg_file
83
84     def join_bin_path(self, *args):
85         return os.path.join(self.bin_path, *args)
86
87     def provision_tool(self, tool_path=None, tool_file=None):
88         if tool_path is None:
89             tool_path = self.bin_path
90         return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
91
92
93 class SetupEnvHelper(object):
94
95     CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
96     CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
97     CORES = []
98     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
99     PIPELINE_COMMAND = ''
100     VNF_TYPE = "SAMPLE"
101
102     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
103         super(SetupEnvHelper, self).__init__()
104         self.vnfd_helper = vnfd_helper
105         self.ssh_helper = ssh_helper
106         self.scenario_helper = scenario_helper
107
108     def _get_ports_gateway(self, name):
109         routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
110         for route in routing_table:
111             if name == route['if']:
112                 return route['gateway']
113         return None
114
115     def build_config(self):
116         raise NotImplementedError
117
118     def setup_vnf_environment(self):
119         pass
120
121     def kill_vnf(self):
122         pass
123
124     def tear_down(self):
125         raise NotImplementedError
126
127
128 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
129
130     APP_NAME = 'DpdkVnf'
131     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
132
133     HW_DEFAULT_CORE = 3
134     SW_DEFAULT_CORE = 2
135
136     @staticmethod
137     def _update_packet_type(ip_pipeline_cfg, traffic_options):
138         match_str = 'pkt_type = ipv4'
139         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
140         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
141         return pipeline_config_str
142
143     @classmethod
144     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
145         traffic_type = traffic_options['traffic_type']
146
147         if traffic_options['vnf_type'] is not cls.APP_NAME:
148             match_str = 'traffic_type = 4'
149             replace_str = 'traffic_type = {0}'.format(traffic_type)
150
151         elif traffic_type == 4:
152             match_str = 'pkt_type = ipv4'
153             replace_str = 'pkt_type = ipv4'
154
155         else:
156             match_str = 'pkt_type = ipv4'
157             replace_str = 'pkt_type = ipv6'
158
159         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
160         return pipeline_config_str
161
162     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
163         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
164         self.all_ports = None
165         self.bound_pci = None
166         self.socket = None
167         self.used_drivers = None
168         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
169
170     def _setup_hugepages(self):
171         cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
172         hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
173
174         memory_path = \
175             '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
176         self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
177
178         if hugepages == "2048kB":
179             pages = 8192
180         else:
181             pages = 16
182
183         self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
184
185     def build_config(self):
186         vnf_cfg = self.scenario_helper.vnf_cfg
187         task_path = self.scenario_helper.task_path
188
189         lb_count = vnf_cfg.get('lb_count', 3)
190         lb_config = vnf_cfg.get('lb_config', 'SW')
191         worker_config = vnf_cfg.get('worker_config', '1C/1T')
192         worker_threads = vnf_cfg.get('worker_threads', 3)
193
194         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
195         traffic_options = {
196             'traffic_type': traffic_type,
197             'pkt_type': 'ipv%s' % traffic_type,
198             'vnf_type': self.VNF_TYPE,
199         }
200
201         config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
202         config_basename = posixpath.basename(self.CFG_CONFIG)
203         script_basename = posixpath.basename(self.CFG_SCRIPT)
204         multiport = MultiPortConfig(self.scenario_helper.topology,
205                                     config_tpl_cfg,
206                                     config_basename,
207                                     self.vnfd_helper,
208                                     self.VNF_TYPE,
209                                     lb_count,
210                                     worker_threads,
211                                     worker_config,
212                                     lb_config,
213                                     self.socket)
214
215         multiport.generate_config()
216         with open(self.CFG_CONFIG) as handle:
217             new_config = handle.read()
218
219         new_config = self._update_traffic_type(new_config, traffic_options)
220         new_config = self._update_packet_type(new_config, traffic_options)
221
222         self.ssh_helper.upload_config_file(config_basename, new_config)
223         self.ssh_helper.upload_config_file(script_basename,
224                                            multiport.generate_script(self.vnfd_helper))
225
226         LOG.info("Provision and start the %s", self.APP_NAME)
227         self._build_pipeline_kwargs()
228         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
229
230     def _build_pipeline_kwargs(self):
231         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
232         # count the number of actual ports in the list of pairs
233         # remove duplicate ports
234         # this is really a mapping from LINK ID to DPDK PMD ID
235         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
236         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
237         ports = self.vnfd_helper.port_pairs.all_ports
238         port_nums = self.vnfd_helper.port_nums(ports)
239         # create mask from all the dpdk port numbers
240         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
241         self.pipeline_kwargs = {
242             'cfg_file': self.CFG_CONFIG,
243             'script': self.CFG_SCRIPT,
244             'port_mask_hex': ports_mask_hex,
245             'tool_path': tool_path,
246         }
247
248     def _get_app_cpu(self):
249         if self.CORES:
250             return self.CORES
251
252         vnf_cfg = self.scenario_helper.vnf_cfg
253         sys_obj = CpuSysCores(self.ssh_helper)
254         self.sys_cpu = sys_obj.get_core_socket()
255         num_core = int(vnf_cfg["worker_threads"])
256         if vnf_cfg.get("lb_config", "SW") == 'HW':
257             num_core += self.HW_DEFAULT_CORE
258         else:
259             num_core += self.SW_DEFAULT_CORE
260         app_cpu = self.sys_cpu[str(self.socket)][:num_core]
261         return app_cpu
262
263     def _get_cpu_sibling_list(self, cores=None):
264         if cores is None:
265             cores = self._get_app_cpu()
266         sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
267         awk_template = "awk -F: '{ print $1 }' < %s"
268         sys_path = "/sys/devices/system/cpu/"
269         cpu_topology = []
270         try:
271             for core in cores:
272                 sys_cmd = sys_cmd_template % (sys_path, core)
273                 cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
274                 cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
275
276             return cpu_topology
277         except Exception:
278             return []
279
280     def _validate_cpu_cfg(self):
281         return self._get_cpu_sibling_list()
282
283     def setup_vnf_environment(self):
284         self._setup_dpdk()
285         resource = self._setup_resources()
286         self.kill_vnf()
287         self._detect_and_bind_drivers()
288         return resource
289
290     def kill_vnf(self):
291         # have to use exact match
292         self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)
293
294     def _setup_dpdk(self):
295         """ setup dpdk environment needed for vnf to run """
296
297         self._setup_hugepages()
298         self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
299
300         exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
301         if exit_status == 0:
302             return
303
304         dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
305         dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
306         exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
307         if exit_status != 0:
308             self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
309
310     def _setup_resources(self):
311         interfaces = self.vnfd_helper.interfaces
312         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
313
314         # what is this magic?  how do we know which socket is for which port?
315         # what about quad-socket?
316         if any(v[5] == "0" for v in self.bound_pci):
317             self.socket = 0
318         else:
319             self.socket = 1
320
321         cores = self._validate_cpu_cfg()
322         return ResourceProfile(self.vnfd_helper.mgmt_interface,
323                                interfaces=self.vnfd_helper.interfaces, cores=cores)
324
325     def _detect_and_bind_drivers(self):
326         interfaces = self.vnfd_helper.interfaces
327
328         self.dpdk_bind_helper.read_status()
329         self.dpdk_bind_helper.save_used_drivers()
330
331         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
332
333         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
334         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
335             try:
336                 intf = next(v for v in interfaces
337                             if vpci == v['virtual-interface']['vpci'])
338                 # force to int
339                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
340             except:
341                 pass
342         time.sleep(2)
343
344     def get_local_iface_name_by_vpci(self, vpci):
345         find_net_cmd = self.FIND_NET_CMD.format(vpci)
346         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
347         if exit_status == 0:
348             return stdout
349         return None
350
351     def tear_down(self):
352         self.dpdk_bind_helper.rebind_drivers()
353
354
355 class ResourceHelper(object):
356
357     COLLECT_KPI = ''
358     MAKE_INSTALL = 'cd {0} && make && sudo make install'
359     RESOURCE_WORD = 'sample'
360
361     COLLECT_MAP = {}
362
363     def __init__(self, setup_helper):
364         super(ResourceHelper, self).__init__()
365         self.resource = None
366         self.setup_helper = setup_helper
367         self.ssh_helper = setup_helper.ssh_helper
368
369     def setup(self):
370         self.resource = self.setup_helper.setup_vnf_environment()
371
372     def generate_cfg(self):
373         pass
374
375     def _collect_resource_kpi(self):
376         result = {}
377         status = self.resource.check_if_sa_running("collectd")[0]
378         if status:
379             result = self.resource.amqp_collect_nfvi_kpi()
380
381         result = {"core": result}
382         return result
383
384     def start_collect(self):
385         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
386         self.resource.start()
387         self.resource.amqp_process_for_nfvi_kpi()
388
389     def stop_collect(self):
390         if self.resource:
391             self.resource.stop()
392
393     def collect_kpi(self):
394         return self._collect_resource_kpi()
395
396
397 class ClientResourceHelper(ResourceHelper):
398
399     RUN_DURATION = 60
400     QUEUE_WAIT_TIME = 5
401     SYNC_PORT = 1
402     ASYNC_PORT = 2
403
404     def __init__(self, setup_helper):
405         super(ClientResourceHelper, self).__init__(setup_helper)
406         self.vnfd_helper = setup_helper.vnfd_helper
407         self.scenario_helper = setup_helper.scenario_helper
408
409         self.client = None
410         self.client_started = Value('i', 0)
411         self.all_ports = None
412         self._queue = Queue()
413         self._result = {}
414         self._terminated = Value('i', 0)
415         self._vpci_ascending = None
416
417     def _build_ports(self):
418         self.networks = self.vnfd_helper.port_pairs.networks
419         self.priv_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.priv_ports)
420         self.pub_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.pub_ports)
421         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
422
423     def get_stats(self, *args, **kwargs):
424         try:
425             return self.client.get_stats(*args, **kwargs)
426         except STLError:
427             LOG.exception("TRex client not connected")
428             return {}
429
430     def generate_samples(self, ports, key=None, default=None):
431         # needs to be used ports
432         last_result = self.get_stats(ports)
433         key_value = last_result.get(key, default)
434
435         if not isinstance(last_result, Mapping):  # added for mock unit test
436             self._terminated.value = 1
437             return {}
438
439         samples = {}
440         # recalculate port for interface and see if it matches ports provided
441         for intf in self.vnfd_helper.interfaces:
442             name = intf["name"]
443             port = self.vnfd_helper.port_num(name)
444             if port in ports:
445                 xe_value = last_result.get(port, {})
446                 samples[name] = {
447                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
448                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
449                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
450                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
451                     "in_packets": int(xe_value.get("ipackets", 0)),
452                     "out_packets": int(xe_value.get("opackets", 0)),
453                 }
454                 if key:
455                     samples[name][key] = key_value
456         return samples
457
458     def _run_traffic_once(self, traffic_profile):
459         traffic_profile.execute_traffic(self)
460         self.client_started.value = 1
461         time.sleep(self.RUN_DURATION)
462         samples = self.generate_samples(traffic_profile.ports)
463         time.sleep(self.QUEUE_WAIT_TIME)
464         self._queue.put(samples)
465
466     def run_traffic(self, traffic_profile):
467         # fixme: fix passing correct trex config file,
468         # instead of searching the default path
469         try:
470             self._build_ports()
471             self.client = self._connect()
472             self.client.reset(ports=self.all_ports)
473             self.client.remove_all_streams(self.all_ports)  # remove all streams
474             traffic_profile.register_generator(self)
475
476             while self._terminated.value == 0:
477                 self._run_traffic_once(traffic_profile)
478
479             self.client.stop(self.all_ports)
480             self.client.disconnect()
481             self._terminated.value = 0
482         except STLError:
483             if self._terminated.value:
484                 LOG.debug("traffic generator is stopped")
485                 return  # return if trex/tg server is stopped.
486             raise
487
488     def terminate(self):
489         self._terminated.value = 1  # stop client
490
491     def clear_stats(self, ports=None):
492         if ports is None:
493             ports = self.all_ports
494         self.client.clear_stats(ports=ports)
495
496     def start(self, ports=None, *args, **kwargs):
497         if ports is None:
498             ports = self.all_ports
499         self.client.start(ports=ports, *args, **kwargs)
500
501     def collect_kpi(self):
502         if not self._queue.empty():
503             kpi = self._queue.get()
504             self._result.update(kpi)
505             LOG.debug("Got KPIs from _queue for {0} {1}".format(
506                 self.scenario_helper.name, self.RESOURCE_WORD))
507         return self._result
508
509     def _connect(self, client=None):
510         if client is None:
511             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
512                                server=self.vnfd_helper.mgmt_interface["ip"],
513                                verbose_level=LoggerApi.VERBOSE_QUIET)
514
515         # try to connect with 5s intervals, 30s max
516         for idx in range(6):
517             try:
518                 client.connect()
519                 break
520             except STLError:
521                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
522                 time.sleep(5)
523         return client
524
525
526 class Rfc2544ResourceHelper(object):
527
528     DEFAULT_CORRELATED_TRAFFIC = False
529     DEFAULT_LATENCY = False
530     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
531
532     def __init__(self, scenario_helper):
533         super(Rfc2544ResourceHelper, self).__init__()
534         self.scenario_helper = scenario_helper
535         self._correlated_traffic = None
536         self.iteration = Value('i', 0)
537         self._latency = None
538         self._rfc2544 = None
539         self._tolerance_low = None
540         self._tolerance_high = None
541
542     @property
543     def rfc2544(self):
544         if self._rfc2544 is None:
545             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
546         return self._rfc2544
547
548     @property
549     def tolerance_low(self):
550         if self._tolerance_low is None:
551             self.get_rfc_tolerance()
552         return self._tolerance_low
553
554     @property
555     def tolerance_high(self):
556         if self._tolerance_high is None:
557             self.get_rfc_tolerance()
558         return self._tolerance_high
559
560     @property
561     def correlated_traffic(self):
562         if self._correlated_traffic is None:
563             self._correlated_traffic = \
564                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
565
566         return self._correlated_traffic
567
568     @property
569     def latency(self):
570         if self._latency is None:
571             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
572         return self._latency
573
574     def get_rfc2544(self, name, default=None):
575         return self.rfc2544.get(name, default)
576
577     def get_rfc_tolerance(self):
578         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
579         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
580         self._tolerance_low = next(tolerance_iter)
581         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
582
583
584 class SampleVNFDeployHelper(object):
585
586     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
587     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
588     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
589
590     def __init__(self, vnfd_helper, ssh_helper):
591         super(SampleVNFDeployHelper, self).__init__()
592         self.ssh_helper = ssh_helper
593         self.vnfd_helper = vnfd_helper
594
595     DISABLE_DEPLOY = True
596
597     def deploy_vnfs(self, app_name):
598         # temp disable for now
599         if self.DISABLE_DEPLOY:
600             return
601
602         vnf_bin = self.ssh_helper.join_bin_path(app_name)
603         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
604         if not exit_status:
605             return
606
607         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
608         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
609         time.sleep(2)
610         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
611         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
612
613         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
614         time.sleep(2)
615         http_proxy = os.environ.get('http_proxy', '')
616         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
617         LOG.debug(cmd)
618         self.ssh_helper.execute(cmd)
619         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
620         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
621         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
622
623
624 class ScenarioHelper(object):
625
626     DEFAULT_VNF_CFG = {
627         'lb_config': 'SW',
628         'lb_count': 1,
629         'worker_config': '1C/1T',
630         'worker_threads': 1,
631     }
632
633     def __init__(self, name):
634         self.name = name
635         self.scenario_cfg = None
636
637     @property
638     def task_path(self):
639         return self.scenario_cfg['task_path']
640
641     @property
642     def nodes(self):
643         return self.scenario_cfg.get('nodes')
644
645     @property
646     def all_options(self):
647         return self.scenario_cfg.get('options', {})
648
649     @property
650     def options(self):
651         return self.all_options.get(self.name, {})
652
653     @property
654     def vnf_cfg(self):
655         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
656
657     @property
658     def topology(self):
659         return self.scenario_cfg['topology']
660
661
662 class SampleVNF(GenericVNF):
663     """ Class providing file-like API for generic VNF implementation """
664
665     VNF_PROMPT = "pipeline>"
666     WAIT_TIME = 1
667
668     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
669         super(SampleVNF, self).__init__(name, vnfd)
670         self.bin_path = get_nsb_option('bin_path', '')
671
672         self.scenario_helper = ScenarioHelper(self.name)
673         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
674
675         if setup_env_helper_type is None:
676             setup_env_helper_type = SetupEnvHelper
677
678         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
679                                                   self.ssh_helper,
680                                                   self.scenario_helper)
681
682         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
683
684         if resource_helper_type is None:
685             resource_helper_type = ResourceHelper
686
687         self.resource_helper = resource_helper_type(self.setup_helper)
688
689         self.context_cfg = None
690         self.nfvi_context = None
691         self.pipeline_kwargs = {}
692         self.priv_ports = None
693         self.pub_ports = None
694         # TODO(esm): make QueueFileWrapper invert-able so that we
695         #            never have to manage the queues
696         self.q_in = Queue()
697         self.q_out = Queue()
698         self.queue_wrapper = None
699         self.run_kwargs = {}
700         self.used_drivers = {}
701         self.vnf_port_pairs = None
702         self._vnf_process = None
703
704     def _build_ports(self):
705         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
706         self.networks = self._port_pairs.networks
707         self.priv_ports = self.vnfd_helper.port_nums(self._port_pairs.priv_ports)
708         self.pub_ports = self.vnfd_helper.port_nums(self._port_pairs.pub_ports)
709         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
710
711     def _get_route_data(self, route_index, route_type):
712         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
713         for _ in range(route_index):
714             next(route_iter, '')
715         return next(route_iter, {}).get(route_type, '')
716
717     def _get_port0localip6(self):
718         return_value = self._get_route_data(0, 'network')
719         LOG.info("_get_port0localip6 : %s", return_value)
720         return return_value
721
722     def _get_port1localip6(self):
723         return_value = self._get_route_data(1, 'network')
724         LOG.info("_get_port1localip6 : %s", return_value)
725         return return_value
726
727     def _get_port0prefixlen6(self):
728         return_value = self._get_route_data(0, 'netmask')
729         LOG.info("_get_port0prefixlen6 : %s", return_value)
730         return return_value
731
732     def _get_port1prefixlen6(self):
733         return_value = self._get_route_data(1, 'netmask')
734         LOG.info("_get_port1prefixlen6 : %s", return_value)
735         return return_value
736
737     def _get_port0gateway6(self):
738         return_value = self._get_route_data(0, 'network')
739         LOG.info("_get_port0gateway6 : %s", return_value)
740         return return_value
741
742     def _get_port1gateway6(self):
743         return_value = self._get_route_data(1, 'network')
744         LOG.info("_get_port1gateway6 : %s", return_value)
745         return return_value
746
747     def _start_vnf(self):
748         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
749         self._vnf_process = Process(target=self._run)
750         self._vnf_process.start()
751
752     def _vnf_up_post(self):
753         pass
754
755     def instantiate(self, scenario_cfg, context_cfg):
756         self.scenario_helper.scenario_cfg = scenario_cfg
757         self.context_cfg = context_cfg
758         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
759         # self.nfvi_context = None
760
761         self.deploy_helper.deploy_vnfs(self.APP_NAME)
762         self.resource_helper.setup()
763         self._start_vnf()
764
765     def wait_for_instantiate(self):
766         buf = []
767         time.sleep(self.WAIT_TIME)  # Give some time for config to load
768         while True:
769             if not self._vnf_process.is_alive():
770                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
771
772             # TODO(esm): move to QueueFileWrapper
773             while self.q_out.qsize() > 0:
774                 buf.append(self.q_out.get())
775                 message = ''.join(buf)
776                 if self.VNF_PROMPT in message:
777                     LOG.info("%s VNF is up and running.", self.APP_NAME)
778                     self._vnf_up_post()
779                     self.queue_wrapper.clear()
780                     self.resource_helper.start_collect()
781                     return self._vnf_process.exitcode
782
783                 if "PANIC" in message:
784                     raise RuntimeError("Error starting %s VNF." %
785                                        self.APP_NAME)
786
787             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
788             time.sleep(1)
789             # Send ENTER to display a new prompt in case the prompt text was corrupted
790             # by other VNF output
791             self.q_in.put('\r\n')
792
793     def _build_run_kwargs(self):
794         self.run_kwargs = {
795             'stdin': self.queue_wrapper,
796             'stdout': self.queue_wrapper,
797             'keep_stdin_open': True,
798             'pty': True,
799         }
800
801     def _build_config(self):
802         return self.setup_helper.build_config()
803
804     def _run(self):
805         # we can't share ssh paramiko objects to force new connection
806         self.ssh_helper.drop_connection()
807         cmd = self._build_config()
808         # kill before starting
809         self.setup_helper.kill_vnf()
810
811         LOG.debug(cmd)
812         self._build_run_kwargs()
813         self.ssh_helper.run(cmd, **self.run_kwargs)
814
815     def vnf_execute(self, cmd, wait_time=2):
816         """ send cmd to vnf process """
817
818         LOG.info("%s command: %s", self.APP_NAME, cmd)
819         self.q_in.put("{}\r\n".format(cmd))
820         time.sleep(wait_time)
821         output = []
822         while self.q_out.qsize() > 0:
823             output.append(self.q_out.get())
824         return "".join(output)
825
826     def _tear_down(self):
827         pass
828
829     def terminate(self):
830         self.vnf_execute("quit")
831         if self._vnf_process:
832             self._vnf_process.terminate()
833         self.setup_helper.kill_vnf()
834         self._tear_down()
835         self.resource_helper.stop_collect()
836
837     def get_stats(self, *args, **kwargs):
838         """
839         Method for checking the statistics
840
841         :return:
842            VNF statistics
843         """
844         cmd = 'p {0} stats'.format(self.APP_WORD)
845         out = self.vnf_execute(cmd)
846         return out
847
848     def collect_kpi(self):
849         stats = self.get_stats()
850         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
851         if m:
852             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
853             result["collect_stats"] = self.resource_helper.collect_kpi()
854         else:
855             result = {
856                 "packets_in": 0,
857                 "packets_fwd": 0,
858                 "packets_dropped": 0,
859             }
860         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
861         return result
862
863
864 class SampleVNFTrafficGen(GenericTrafficGen):
865     """ Class providing file-like API for generic traffic generator """
866
867     APP_NAME = 'Sample'
868     RUN_WAIT = 1
869
870     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
871         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
872         self.bin_path = get_nsb_option('bin_path', '')
873         self.name = "tgen__1"  # name in topology file
874
875         self.scenario_helper = ScenarioHelper(self.name)
876         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
877
878         if setup_env_helper_type is None:
879             setup_env_helper_type = SetupEnvHelper
880
881         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
882                                                   self.ssh_helper,
883                                                   self.scenario_helper)
884
885         if resource_helper_type is None:
886             resource_helper_type = ClientResourceHelper
887
888         self.resource_helper = resource_helper_type(self.setup_helper)
889
890         self.runs_traffic = True
891         self.traffic_finished = False
892         self._tg_process = None
893         self._traffic_process = None
894
895     def _start_server(self):
896         # we can't share ssh paramiko objects to force new connection
897         self.ssh_helper.drop_connection()
898
899     def instantiate(self, scenario_cfg, context_cfg):
900         self.scenario_helper.scenario_cfg = scenario_cfg
901         self.resource_helper.generate_cfg()
902         self.resource_helper.setup()
903
904         LOG.info("Starting %s server...", self.APP_NAME)
905         self._tg_process = Process(target=self._start_server)
906         self._tg_process.start()
907
908     def wait_for_instantiate(self):
909         # overridden by subclasses
910         return self._wait_for_process()
911
912     def _check_status(self):
913         raise NotImplementedError
914
915     def _wait_for_process(self):
916         while True:
917             if not self._tg_process.is_alive():
918                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
919             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
920             time.sleep(1)
921             status = self._check_status()
922             if status == 0:
923                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
924                 return self._tg_process.exitcode
925
926     def _traffic_runner(self, traffic_profile):
927         # always drop connections first thing in new processes
928         # so we don't get paramiko errors
929         self.ssh_helper.drop_connection()
930         LOG.info("Starting %s client...", self.APP_NAME)
931         self.resource_helper.run_traffic(traffic_profile)
932
933     def run_traffic(self, traffic_profile):
934         """ Generate traffic on the wire according to the given params.
935         Method is non-blocking, returns immediately when traffic process
936         is running. Mandatory.
937
938         :param traffic_profile:
939         :return: True/False
940         """
941         self._traffic_process = Process(target=self._traffic_runner,
942                                         args=(traffic_profile,))
943         self._traffic_process.start()
944         # Wait for traffic process to start
945         while self.resource_helper.client_started.value == 0:
946             time.sleep(self.RUN_WAIT)
947             # what if traffic process takes a few seconds to start?
948             if not self._traffic_process.is_alive():
949                 break
950
951         return self._traffic_process.is_alive()
952
953     def listen_traffic(self, traffic_profile):
954         """ Listen to traffic with the given parameters.
955         Method is non-blocking, returns immediately when traffic process
956         is running. Optional.
957
958         :param traffic_profile:
959         :return: True/False
960         """
961         pass
962
963     def verify_traffic(self, traffic_profile):
964         """ Verify captured traffic after it has ended. Optional.
965
966         :param traffic_profile:
967         :return: dict
968         """
969         pass
970
971     def collect_kpi(self):
972         result = self.resource_helper.collect_kpi()
973         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
974         return result
975
976     def terminate(self):
977         """ After this method finishes, all traffic processes should stop. Mandatory.
978
979         :return: True/False
980         """
981         self.traffic_finished = True
982         if self._traffic_process is not None:
983             self._traffic_process.terminate()