Merge "Heat: use pkey from string instead of key_filename"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
34 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
35 from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
38 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.utils import get_nsb_option
41
42 from trex_stl_lib.trex_stl_client import STLClient
43 from trex_stl_lib.trex_stl_client import LoggerApi
44 from trex_stl_lib.trex_stl_exceptions import STLError
45
46 from yardstick.ssh import AutoConnectSSH
47
# Name of the DPDK release directory; _setup_dpdk() joins it onto the SSH
# helper's bin path to check whether DPDK is already available.
DPDK_VERSION = "dpdk-16.07"

LOG = logging.getLogger(__name__)


# Base directory for CFG_CONFIG / CFG_SCRIPT and for files sent with
# VnfSshHelper.upload_config_file().
REMOTE_TMP = "/tmp"
54
55
class VnfSshHelper(AutoConnectSSH):
    """Auto-connecting SSH helper built from a node description.

    Besides the connection itself it remembers the node it was created from
    and a ``bin_path`` used as the base for tool paths.
    """

    def __init__(self, node, bin_path, wait=None):
        """Build the SSH arguments from ``node`` and open the helper.

        :param node: node description passed to ``args_from_node``
        :param bin_path: base path used by join_bin_path() and as the default
                         tool path in provision_tool()
        :param wait: when truthy, used as the default for the ``wait``
                     connection argument if the node did not supply one
        """
        self.node = node
        ssh_args = self.args_from_node(self.node)
        if wait:
            ssh_args.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_args)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this class itself, never a subclass."""
        # a static method is required here: any dynamic lookup would resolve
        # to the calling (sub)class instead of this one
        return VnfSshHelper

    def copy(self):
        """Create a new helper for the same node and bin path."""
        # unlike the SSH base classes, the copy is rebuilt from the node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Send ``content`` to ``REMOTE_TMP/<prefix>`` and return that path.

        :param prefix: file name placed under REMOTE_TMP
        :param content: text written to the file
        :return: the destination path
        """
        destination = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), destination)
        return destination

    def join_bin_path(self, *args):
        """Join ``args`` onto ``bin_path`` and return the resulting path."""
        components = (self.bin_path,) + args
        return os.path.join(*components)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Delegate to the parent's provision_tool(), defaulting to bin_path.

        :param tool_path: directory to use; ``bin_path`` when None
        :param tool_file: tool file name forwarded unchanged
        """
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
91
92
class SetupEnvHelper(object):
    """Base class for helpers that prepare the environment a VNF runs in.

    Subclasses must implement build_config() and tear_down();
    setup_vnf_environment() and kill_vnf() do nothing by default.
    """

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    CORES = []
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the three collaborating helpers."""
        super(SetupEnvHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def _get_ports_gateway(self, name):
        """Return the gateway of the first route whose ``if`` equals ``name``.

        Routes are read from the ``routing_table`` entry of ``vdu0``;
        None is returned when no route matches.
        """
        routes = self.vnfd_helper.vdu0.get('routing_table', [])
        matching_gateways = (entry['gateway'] for entry in routes
                             if name == entry['if'])
        return next(matching_gateways, None)

    def build_config(self):
        """Build the VNF configuration; must be provided by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the environment; nothing to do in the base class."""

    def kill_vnf(self):
        """Stop the VNF; nothing to do in the base class."""

    def tear_down(self):
        """Undo the environment setup; must be provided by subclasses."""
        raise NotImplementedError
126
127
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for VNF applications that run on DPDK.

    Over SSH it sets up hugepages and the igb_uio module, binds the VNF
    interfaces to igb_uio, generates/uploads the application config and
    script, and formats the command line from PIPELINE_COMMAND.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    # extra cores added to worker_threads for the HW / SW load balancer setups
    HW_DEFAULT_CORE = 3
    SW_DEFAULT_CORE = 2

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace ``pkt_type = ipv4`` with the requested packet type.

        :param ip_pipeline_cfg: configuration text
        :param traffic_options: dict providing ``pkt_type``
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Rewrite the traffic/packet type entries of the configuration text.

        :param ip_pipeline_cfg: configuration text
        :param traffic_options: dict providing ``traffic_type`` and ``vnf_type``
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # Compare by value: the previous ``is not`` tested object identity,
        # which only matches equal strings when they happen to be interned.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper on the same SSH link."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Write the hugepage count for the page size reported by /proc/meminfo."""
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # the current value is read but its result is not used
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        # 8192 pages when the page size is 2048kB, otherwise 16 pages
        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def build_config(self):
        """Generate and upload the config and script, then format the command.

        :return: PIPELINE_COMMAND formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # read back the generated file from the local CFG_CONFIG path
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Populate ``self.pipeline_kwargs`` used to format PIPELINE_COMMAND."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def _get_app_cpu(self):
        """Return the cores for the application.

        CORES is returned when it is non-empty; otherwise the first
        ``worker_threads`` + HW/SW default cores of the selected socket.
        """
        if self.CORES:
            return self.CORES

        vnf_cfg = self.scenario_helper.vnf_cfg
        sys_obj = CpuSysCores(self.ssh_helper)
        self.sys_cpu = sys_obj.get_core_socket()
        num_core = int(vnf_cfg["worker_threads"])
        if vnf_cfg.get("lb_config", "SW") == 'HW':
            num_core += self.HW_DEFAULT_CORE
        else:
            num_core += self.SW_DEFAULT_CORE
        app_cpu = self.sys_cpu[str(self.socket)][:num_core]
        return app_cpu

    def _get_cpu_sibling_list(self, cores=None):
        """Collect the thread siblings of every core via sysfs over SSH.

        :param cores: cores to inspect; defaults to _get_app_cpu()
        :return: list of sibling cpu ids as strings, or [] on any failure
        """
        if cores is None:
            cores = self._get_app_cpu()
        sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
        awk_template = "awk -F: '{ print $1 }' < %s"
        sys_path = "/sys/devices/system/cpu/"
        cpu_topology = []
        try:
            for core in cores:
                sys_cmd = sys_cmd_template % (sys_path, core)
                cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
                cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))

            return cpu_topology
        except Exception:  # best effort: callers expect a list in every case
            LOG.exception("Unable to read the cpu sibling list")
            return []

    def _validate_cpu_cfg(self):
        """Return the sibling list for the application cores."""
        return self._get_cpu_sibling_list()

    def setup_vnf_environment(self):
        """Set up DPDK and resources, kill a running VNF and bind the drivers.

        :return: the ResourceProfile created by _setup_resources()
        """
        self._setup_dpdk()
        resource = self._setup_resources()
        self.kill_vnf()
        self._detect_and_bind_drivers()
        return resource

    def kill_vnf(self):
        """Kill the process named APP_NAME on the node."""
        # have to use exact match
        self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        # nothing more to do when igb_uio shows up in lsmod
        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def _setup_resources(self):
        """Record the interface PCI addresses, pick a socket and build the profile.

        :return: ResourceProfile for the management interface and cores
        """
        interfaces = self.vnfd_helper.interfaces
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        cores = self._validate_cpu_cfg()
        return ResourceProfile(self.vnfd_helper.mgmt_interface,
                               interfaces=self.vnfd_helper.interfaces, cores=cores)

    def _detect_and_bind_drivers(self):
        """Bind the recorded PCI devices to igb_uio and number the DPDK ports.

        Each interface whose ``vpci`` is among the DPDK-bound addresses gets
        ``dpdk_port_num`` set to its index in the sorted address list.
        """
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
                # force to int
                intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except (StopIteration, KeyError):
                # No interface matches this address (or an entry lacks the
                # expected keys): skip it.  A bare ``except`` used to hide
                # every error here, including KeyboardInterrupt.
                LOG.debug("No interface found for DPDK bound device %s", vpci)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Run FIND_NET_CMD for ``vpci``; return its stdout, or None on failure."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the drivers through the DPDK bind helper."""
        self.dpdk_bind_helper.rebind_drivers()
353
354
class ResourceHelper(object):
    """Owns the resource object returned by the environment setup helper.

    The resource is created in setup() and is used to start, stop and query
    KPI collection.
    """

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember the setup helper and share its SSH helper."""
        super(ResourceHelper, self).__init__()
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        # filled in by setup()
        self.resource = None

    def setup(self):
        """Run the environment setup and keep what it returns as the resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for subclasses; does nothing here."""

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is empty unless collectd is running."""
        collectd_status = self.resource.check_if_sa_running("collectd")[0]
        if collectd_status:
            core_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            core_kpi = {}
        return {"core": core_kpi}

    def start_collect(self):
        """Initiate the system agent with the bin path and start collecting."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource, if one has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs (see _collect_resource_kpi)."""
        return self._collect_resource_kpi()
395
396
class ClientResourceHelper(ResourceHelper):
    """Resource helper that drives a TRex stateless client.

    run_traffic() loops until terminate() is called, pushing one sample set
    per iteration onto an internal queue that collect_kpi() drains.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Share the setup helper's vnfd/scenario helpers and reset the state."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)
        self._vpci_ascending = None

    def _build_ports(self):
        """Cache the networks and the uplink/downlink/all port numbers."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or {} when the client raises STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the current client statistics.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: dict mapping interface name to its sample dict; {} (and the
                 terminated flag set) when the statistics are not a Mapping
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # Validate before touching the result: the lookup used to run ahead
        # of this check, so a non-mapping result could fail on ``.get``.
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Execute the profile once, wait RUN_DURATION and queue the samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run the profile until terminated.

        An STLError is swallowed only when the terminated flag is set;
        otherwise it is re-raised.
        """
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Ask the run_traffic() loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics for ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start the client on ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set, if any, into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug("Got KPIs from _queue for {0} {1}".format(
                self.scenario_helper.name, self.RESOURCE_WORD))
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (a new STLClient when None) and return it.

        Up to six attempts are made, sleeping 5s after each failure; the
        client is returned even when every attempt failed.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
525
526
class Rfc2544ResourceHelper(object):
    """Lazy accessor for the ``rfc2544`` section of the scenario options.

    Every value is looked up on first access and cached on the instance.
    """

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Keep the scenario helper and start with an empty cache."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        # cached option values, resolved lazily by the properties below
        self._rfc2544 = None
        self._correlated_traffic = None
        self._latency = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``rfc2544`` entry of the scenario's ``all_options``."""
        section = self._rfc2544
        if section is None:
            section = self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return section

    @property
    def tolerance_low(self):
        """Lower bound parsed from ``allowed_drop_rate``."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound parsed from ``allowed_drop_rate``."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The ``correlated_traffic`` option, or its class default."""
        if self._correlated_traffic is None:
            fallback = self.DEFAULT_CORRELATED_TRAFFIC
            self._correlated_traffic = self.get_rfc2544('correlated_traffic', fallback)

        return self._correlated_traffic

    @property
    def latency(self):
        """The ``latency`` option, or its class default."""
        if self._latency is None:
            fallback = self.DEFAULT_LATENCY
            self._latency = self.get_rfc2544('latency', fallback)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` from the rfc2544 section, or ``default``."""
        section = self.rfc2544
        return section.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` into the low and high tolerance.

        The string is split on ``-`` and the parts are sorted as floats; a
        single value is used for both bounds.
        """
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        if len(bounds) > 1:
            self._tolerance_high = bounds[1]
        else:
            self._tolerance_high = self.tolerance_low
583
584
class SampleVNFDeployHelper(object):
    """Clones, uploads and builds the samplevnf sources for a missing VNF binary.

    Deployment is currently switched off through DISABLE_DEPLOY.
    """

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
    # while True, deploy_vnfs() returns without doing anything
    DISABLE_DEPLOY = True

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the vnfd and SSH helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build ``app_name`` and copy it into the bin path if it is missing.

        Returns early when deployment is disabled or when ``which`` already
        finds the binary under the SSH helper's bin path.
        """
        # temp disable for now
        if self.DISABLE_DEPLOY:
            return

        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        which_status = ssh.execute("which %s" % vnf_bin)[0]
        if not which_status:
            return

        # fresh local clone, then replace the copy on the node
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        http_proxy = os.environ.get('http_proxy', '')
        cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
        LOG.debug(cmd)
        ssh.execute(cmd)

        # copy the freshly built binary into the bin path
        vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
623
624
class ScenarioHelper(object):
    """Read-only view of the scenario configuration for one named VNF.

    ``scenario_cfg`` starts as None and must be assigned before any of the
    properties is read.
    """

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the VNF name; the scenario config is set later."""
        self.scenario_cfg = None
        self.name = name

    @property
    def task_path(self):
        """The mandatory ``task_path`` entry of the scenario config."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The ``nodes`` entry of the scenario config, or None."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The ``options`` entry of the scenario config, or an empty dict."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this VNF's name, or an empty dict."""
        every_option = self.all_options
        return every_option.get(self.name, {})

    @property
    def vnf_cfg(self):
        """This VNF's ``vnf_config`` option, or DEFAULT_VNF_CFG."""
        own_options = self.options
        return own_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The mandatory ``topology`` entry of the scenario config."""
        cfg = self.scenario_cfg
        return cfg['topology']
661
662
class SampleVNF(GenericVNF):
    """Generic sample VNF controlled through its interactive prompt.

    The application is started over SSH in a child process; its input and
    output are exchanged with this process through two queues.  APP_NAME,
    APP_WORD, COLLECT_KPI and COLLECT_MAP are read by the methods below but
    are not defined in this class.
    """

    VNF_PROMPT = "pipeline>"
    WAIT_TIME = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the helpers and the queues used to talk to the VNF.

        :param name: VNF name, also used to select the scenario options
        :param vnfd: VNF descriptor
        :param setup_env_helper_type: setup helper class; SetupEnvHelper when None
        :param resource_helper_type: resource helper class; ResourceHelper when None
        """
        super(SampleVNF, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)

        setup_cls = SetupEnvHelper if setup_env_helper_type is None else setup_env_helper_type
        self.setup_helper = setup_cls(self.vnfd_helper,
                                      self.ssh_helper,
                                      self.scenario_helper)

        self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)

        resource_cls = ResourceHelper if resource_helper_type is None else resource_helper_type
        self.resource_helper = resource_cls(self.setup_helper)

        self.context_cfg = None
        self.nfvi_context = None
        self.pipeline_kwargs = {}
        self.uplink_ports = None
        self.downlink_ports = None
        # TODO(esm): make QueueFileWrapper invert-able so that we
        #            never have to manage the queues
        self.q_in = Queue()
        self.q_out = Queue()
        self.queue_wrapper = None
        self.run_kwargs = {}
        self.used_drivers = {}
        self.vnf_port_pairs = None
        self._vnf_process = None

    def _build_ports(self):
        """Derive the networks and port numbers from the VNF interfaces."""
        pairs = PortPairs(self.vnfd_helper.interfaces)
        self._port_pairs = pairs
        self.networks = pairs.networks
        to_port_nums = self.vnfd_helper.port_nums
        self.uplink_ports = to_port_nums(pairs.uplink_ports)
        self.downlink_ports = to_port_nums(pairs.downlink_ports)
        self.my_ports = to_port_nums(pairs.all_ports)

    def _get_route_data(self, route_index, route_type):
        """Return field ``route_type`` of entry ``route_index`` in ``nd_route_tbl``.

        An empty string is returned when the entry or the field is missing.
        """
        routes = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
        skipped = 0
        while skipped < route_index:
            next(routes, '')
            skipped += 1
        selected = next(routes, {})
        return selected.get(route_type, '')

    def _get_port0localip6(self):
        """Return and log the ``network`` field of route 0."""
        value = self._get_route_data(0, 'network')
        LOG.info("_get_port0localip6 : %s", value)
        return value

    def _get_port1localip6(self):
        """Return and log the ``network`` field of route 1."""
        value = self._get_route_data(1, 'network')
        LOG.info("_get_port1localip6 : %s", value)
        return value

    def _get_port0prefixlen6(self):
        """Return and log the ``netmask`` field of route 0."""
        value = self._get_route_data(0, 'netmask')
        LOG.info("_get_port0prefixlen6 : %s", value)
        return value

    def _get_port1prefixlen6(self):
        """Return and log the ``netmask`` field of route 1."""
        value = self._get_route_data(1, 'netmask')
        LOG.info("_get_port1prefixlen6 : %s", value)
        return value

    def _get_port0gateway6(self):
        """Return and log the ``network`` field of route 0."""
        # NOTE(review): reads 'network', same as _get_port0localip6 - confirm
        # that a 'gateway' field was not intended
        value = self._get_route_data(0, 'network')
        LOG.info("_get_port0gateway6 : %s", value)
        return value

    def _get_port1gateway6(self):
        """Return and log the ``network`` field of route 1."""
        # NOTE(review): reads 'network', same as _get_port1localip6 - confirm
        # that a 'gateway' field was not intended
        value = self._get_route_data(1, 'network')
        LOG.info("_get_port1gateway6 : %s", value)
        return value

    def _start_vnf(self):
        """Create the queue wrapper and launch _run() in a child process."""
        self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
        vnf_process = Process(target=self._run)
        self._vnf_process = vnf_process
        vnf_process.start()

    def _vnf_up_post(self):
        """Hook called once the prompt has been seen; does nothing here."""

    def instantiate(self, scenario_cfg, context_cfg):
        """Store the configs, then deploy, set up and start the VNF.

        :param scenario_cfg: scenario configuration handed to the scenario helper
        :param context_cfg: context configuration kept on the instance
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg
        own_node = self.scenario_helper.nodes[self.name]
        self.nfvi_context = Context.get_context_from_server(own_node)
        # self.nfvi_context = None

        self.deploy_helper.deploy_vnfs(self.APP_NAME)
        self.resource_helper.setup()
        self._start_vnf()

    def wait_for_instantiate(self):
        """Block until the VNF prompt shows up in the process output.

        :return: the exit code of the VNF process at the time the prompt is seen
        :raises RuntimeError: when the process died or printed ``PANIC``
        """
        collected = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # TODO(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                collected.append(self.q_out.get())
                output_so_far = ''.join(collected)
                if self.VNF_PROMPT in output_so_far:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    self.resource_helper.start_collect()
                    return self._vnf_process.exitcode

                if "PANIC" in output_so_far:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(1)
            # Send ENTER to display a new prompt in case the prompt text was corrupted
            # by other VNF output
            self.q_in.put('\r\n')

    def _build_run_kwargs(self):
        """Set the keyword arguments passed to ``ssh_helper.run``."""
        wrapper = self.queue_wrapper
        self.run_kwargs = {
            'stdin': wrapper,
            'stdout': wrapper,
            'keep_stdin_open': True,
            'pty': True,
        }

    def _build_config(self):
        """Return the command produced by the setup helper's build_config()."""
        return self.setup_helper.build_config()

    def _run(self):
        """Child-process entry point: build the config and run the VNF over SSH."""
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()
        command = self._build_config()
        # kill before starting
        self.setup_helper.kill_vnf()

        LOG.debug(command)
        self._build_run_kwargs()
        self.ssh_helper.run(command, **self.run_kwargs)

    def vnf_execute(self, cmd, wait_time=2):
        """Send ``cmd`` to the VNF and return whatever output is queued afterwards.

        :param cmd: command line; ``\\r\\n`` is appended before sending
        :param wait_time: seconds to sleep before draining the output queue
        :return: the drained output joined into one string
        """
        LOG.info("%s command: %s", self.APP_NAME, cmd)
        self.q_in.put("{}\r\n".format(cmd))
        time.sleep(wait_time)
        chunks = []
        while self.q_out.qsize() > 0:
            chunks.append(self.q_out.get())
        return "".join(chunks)

    def _tear_down(self):
        """Hook called from terminate(); does nothing here."""

    def terminate(self):
        """Quit and kill the VNF, then tear down and stop KPI collection."""
        self.vnf_execute("quit")
        vnf_process = self._vnf_process
        if vnf_process:
            vnf_process.terminate()
        self.setup_helper.kill_vnf()
        self._tear_down()
        self.resource_helper.stop_collect()

    def get_stats(self, *args, **kwargs):
        """
        Ask the VNF for its statistics

        :return:
           raw output of the ``p <APP_WORD> stats`` command
        """
        stats_cmd = 'p {0} stats'.format(self.APP_WORD)
        return self.vnf_execute(stats_cmd)

    def collect_kpi(self):
        """Parse the VNF statistics into a KPI dict.

        When COLLECT_KPI matches, every COLLECT_MAP entry is read from its
        match group as an int and ``collect_stats`` holds the resource KPIs;
        otherwise the three packet counters are reported as 0.
        """
        stats = self.get_stats()
        match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
        if not match:
            result = {
                "packets_in": 0,
                "packets_fwd": 0,
                "packets_dropped": 0,
            }
        else:
            result = dict((kpi_name, int(match.group(group)))
                          for kpi_name, group in self.COLLECT_MAP.items())
            result["collect_stats"] = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        return result
863
864
class SampleVNFTrafficGen(GenericTrafficGen):
    """ Traffic generator base class driven by setup and resource helpers

    instantiate() starts the server side and run_traffic() starts the traffic
    client, each in its own multiprocessing Process.
    """

    APP_NAME = 'Sample'
    RUN_WAIT = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """ Create the helper objects used by the traffic generator

        :param name: forwarded to the parent constructor; self.name is then
                     overwritten with the fixed topology name
        :param vnfd: forwarded to the parent constructor
        :param setup_env_helper_type: class used to build self.setup_helper,
                                      SetupEnvHelper when None
        :param resource_helper_type: class used to build self.resource_helper,
                                     ClientResourceHelper when None
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')
        # name in topology file
        self.name = "tgen__1"

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface,
                                       self.bin_path,
                                       wait=True)

        setup_cls = SetupEnvHelper if setup_env_helper_type is None else setup_env_helper_type
        self.setup_helper = setup_cls(self.vnfd_helper, self.ssh_helper, self.scenario_helper)

        resource_cls = (ClientResourceHelper if resource_helper_type is None
                        else resource_helper_type)
        self.resource_helper = resource_cls(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        # process handles, filled in by instantiate() and run_traffic()
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        """ Target of the server process; only drops the SSH connection here """
        # paramiko ssh objects can't be shared, so drop the connection to force a new one
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """ Prepare the resource helper and start the server process

        :param scenario_cfg: stored on the scenario helper
        :param context_cfg: not used by this implementation
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        resources = self.resource_helper
        resources.generate_cfg()
        resources.setup()

        LOG.info("Starting %s server...", self.APP_NAME)
        server = Process(target=self._start_server)
        self._tg_process = server
        server.start()

    def wait_for_instantiate(self):
        """ Block until the server is up; subclasses override this

        :return: the value returned by _wait_for_process()
        """
        return self._wait_for_process()

    def _check_status(self):
        """ Report the server status; must be provided by subclasses

        _wait_for_process() treats a return value of 0 as "server is up".
        """
        raise NotImplementedError

    def _wait_for_process(self):
        """ Poll once per second until _check_status() returns 0

        :return: exitcode attribute of the server process
        :raises RuntimeError: when the server process is no longer alive
        """
        while True:
            if not self._tg_process.is_alive():
                raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            if self._check_status() == 0:
                break

        LOG.info("%s TG Server is up and running.", self.APP_NAME)
        return self._tg_process.exitcode

    def _traffic_runner(self, traffic_profile):
        """ Target of the traffic process: run the profile via the resource helper

        :param traffic_profile: handed to resource_helper.run_traffic()
        """
        # a new process must drop connections before anything else,
        # otherwise paramiko raises errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self.resource_helper.run_traffic(traffic_profile)

    def run_traffic(self, traffic_profile):
        """ Start the traffic client in a separate process.

        Waits, in RUN_WAIT second steps, until the resource helper reports
        the client as started or the traffic process is no longer alive.
        Mandatory.

        :param traffic_profile: forwarded to _traffic_runner()
        :return: True/False, whether the traffic process is alive
        """
        worker = Process(target=self._traffic_runner, args=(traffic_profile,))
        self._traffic_process = worker
        worker.start()

        # Wait for traffic process to start; stop waiting as soon as it dies
        still_alive = True
        while still_alive and self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            still_alive = worker.is_alive()

        return worker.is_alive()

    def listen_traffic(self, traffic_profile):
        """ Listen to traffic with the given parameters. Optional.

        This implementation does nothing.

        :param traffic_profile:
        :return: None
        """

    def verify_traffic(self, traffic_profile):
        """ Verify captured traffic after it has ended. Optional.

        This implementation does nothing.

        :param traffic_profile:
        :return: None
        """

    def collect_kpi(self):
        """ Fetch the KPIs from the resource helper

        :return: whatever resource_helper.collect_kpi() returns
        """
        kpis = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, kpis)
        return kpis

    def terminate(self):
        """ Mark traffic as finished and terminate the traffic process. Mandatory.

        Nothing is terminated when no traffic process was started.

        :return: None
        """
        self.traffic_finished = True
        traffic_process = self._traffic_process
        if traffic_process is not None:
            traffic_process.terminate()