Merge "requirements.txt: add license info"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
34 from yardstick.network_services.nfvi.resource import ResourceProfile
35 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
36 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
37 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
38 from yardstick.network_services.utils import get_nsb_option
39
40 from stl.trex_stl_lib.trex_stl_client import STLClient
41 from stl.trex_stl_lib.trex_stl_client import LoggerApi
42 from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError
43
44 from yardstick.ssh import AutoConnectSSH
45
# Module-wide logger, named after this module.
LOG = logging.getLogger(__name__)

# Name of the DPDK directory that is looked up under the remote bin path
# before the DPDK setup script is run.
DPDK_VERSION = "dpdk-16.07"

# Directory on the target node that receives uploaded configuration files.
REMOTE_TMP = "/tmp"
52
53
class VnfSshHelper(AutoConnectSSH):
    """SSH helper built from a VNF node description.

    In addition to the connection arguments derived from ``node`` the helper
    remembers ``bin_path``, the directory on the target that holds the VNF
    tools, and offers small conveniences built on top of it.
    """

    def __init__(self, node, bin_path, wait=None):
        """Create the helper.

        :param node: node description; passed to ``args_from_node`` to obtain
            the keyword arguments of the parent constructor
        :param bin_path: tool directory on the target
        :param wait: optional value used for the ``wait`` keyword argument
            when the node-derived arguments do not already carry one
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            # setdefault: a 'wait' coming from the node takes precedence
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class (never the calling subclass)."""
        # must return static class name, anything else refers to the calling class
        # i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        """Return a new helper built from the same node and bin path."""
        # this copy constructor is different from SSH classes, since it uses node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` as file ``prefix`` under ``REMOTE_TMP``.

        :return: the path the content was uploaded to
        """
        remote_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join ``args`` onto the helper's ``bin_path``."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Provision ``tool_file``; ``tool_path`` defaults to ``bin_path``."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
89
90
class SetupEnvHelper(object):
    """Base class for helpers that prepare the environment of a VNF.

    Subclasses are expected to provide ``build_config`` and ``tear_down``;
    ``setup_vnf_environment`` is a no-op here.
    """

    VNF_TYPE = "SAMPLE"
    PIPELINE_COMMAND = ''
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    CORES = []
    # locations of the generated configuration and script files
    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the three collaborating helpers on the instance."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper

    def _get_ports_gateway(self, name):
        """Return the gateway of the first route whose ``'if'`` is ``name``.

        Routes are read from the ``routing_table`` entry of ``vdu0``.

        :return: the matching route's ``'gateway'`` value, or None
        """
        routes = self.vnfd_helper.vdu0.get('routing_table', [])
        matching_gateways = (entry['gateway'] for entry in routes if name == entry['if'])
        return next(matching_gateways, None)

    def build_config(self):
        """Build the VNF configuration; must be provided by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the environment; intentionally does nothing here."""
        # raise NotImplementedError
        return None

    def tear_down(self):
        """Undo the environment setup; must be provided by subclasses."""
        raise NotImplementedError
122
123
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for sample VNFs that run on top of DPDK.

    It sets up hugepages and the ``igb_uio`` module on the target, binds the
    VNF's PCI devices with ``dpdk-devbind.py``, generates and uploads the
    pipeline configuration, and restores the previously used drivers on
    tear down.
    """

    APP_NAME = 'DpdkVnf'
    # command templates; {dpdk_nic_bind} is the provisioned dpdk-devbind.py
    DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
    DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    # cores added on top of the worker threads, by load balancer type
    HW_DEFAULT_CORE = 3
    SW_DEFAULT_CORE = 2

    # captures (pci address, driver name) from "dpdk-devbind.py -s" output
    DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace ``pkt_type = ipv4`` with the configured packet type.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing ``'pkt_type'``
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Rewrite the traffic type settings of a pipeline configuration.

        When ``traffic_options['vnf_type']`` differs from ``cls.APP_NAME`` the
        ``traffic_type = 4`` setting is replaced by the configured traffic
        type; otherwise ``pkt_type = ipv4`` is kept for traffic type 4 and
        turned into ``pkt_type = ipv6`` for anything else.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing ``'traffic_type'`` and
            ``'vnf_type'``
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # compare by value: identity ("is not") of two equal strings is an
        # interpreter implementation detail and must not decide the branch
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self._dpdk_nic_bind = None
        self.socket = None
        # filled by _find_used_drivers(); starts empty so that tear_down()
        # is a harmless no-op when called before the environment was set up
        self.used_drivers = {}

    @property
    def dpdk_nic_bind(self):
        """Path of the provisioned ``dpdk-devbind.py`` (provisioned once)."""
        if self._dpdk_nic_bind is None:
            self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
        return self._dpdk_nic_bind

    def _setup_hugepages(self):
        """Write the number of hugepages on the target.

        16384 pages are requested when the hugepage size reported by
        /proc/meminfo is ``2048kB``, 16 pages otherwise.
        """
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # NOTE(review): the output of this read is not used
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        if hugepages == "2048kB":
            pages = 16384
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def _get_dpdk_port_num(self, name):
        """Return the ``dpdk_port_num`` of the interface called ``name``."""
        interface = self.vnfd_helper.find_interface(name=name)
        return interface['virtual-interface']['dpdk_port_num']

    def build_config(self):
        """Generate and upload the VNF configuration and script.

        The configuration is generated by ``MultiPortConfig``, read back
        from ``CFG_CONFIG``, adjusted for the traffic/packet type and
        uploaded together with the generated script.

        :return: ``PIPELINE_COMMAND`` formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper.interfaces,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))
        self.all_ports = multiport.port_pair_list

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Provision the VNF binary and fill ``self.pipeline_kwargs``."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # bit mask with (number of port pairs + 1) low bits set
        ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'ports_len_hex': ports_len_hex,
            'tool_path': tool_path,
        }

    def _get_app_cpu(self):
        """Return the cores used by the application.

        ``CORES`` wins when it is not empty; otherwise the first cores of
        the selected socket are taken: the configured ``worker_threads``
        plus ``HW_DEFAULT_CORE`` or ``SW_DEFAULT_CORE`` depending on
        ``lb_config``.
        """
        if self.CORES:
            return self.CORES

        vnf_cfg = self.scenario_helper.vnf_cfg
        sys_obj = CpuSysCores(self.ssh_helper)
        self.sys_cpu = sys_obj.get_core_socket()
        num_core = int(vnf_cfg["worker_threads"])
        if vnf_cfg.get("lb_config", "SW") == 'HW':
            num_core += self.HW_DEFAULT_CORE
        else:
            num_core += self.SW_DEFAULT_CORE
        app_cpu = self.sys_cpu[str(self.socket)][:num_core]
        return app_cpu

    def _get_cpu_sibling_list(self, cores=None):
        """Return the thread siblings of ``cores`` as a flat list of strings.

        :param cores: cores to look up; defaults to ``_get_app_cpu()``
        :return: sibling cpu ids read from sysfs on the target, or an empty
            list when any lookup fails (best effort)
        """
        if cores is None:
            cores = self._get_app_cpu()
        sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
        awk_template = "awk -F: '{ print $1 }' < %s"
        sys_path = "/sys/devices/system/cpu/"
        cpu_topology = []
        try:
            for core in cores:
                sys_cmd = sys_cmd_template % (sys_path, core)
                cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
                cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))

            return cpu_topology
        except Exception:
            # still best effort, but leave a trace instead of failing silently
            LOG.warning("Unable to read the CPU sibling list", exc_info=True)
            return []

    def _validate_cpu_cfg(self):
        """Return the sibling list of the application cores."""
        return self._get_cpu_sibling_list()

    def _find_used_drivers(self):
        """Record the current driver of every bound PCI device.

        ``self.used_drivers`` maps a PCI address found in the devbind status
        output to ``(index, driver)``, where ``index`` is the position of
        the match in that output. Only addresses that are a suffix of an
        entry in ``self.bound_pci`` are kept.
        """
        cmd = "{0} -s".format(self.dpdk_nic_bind)
        _, dpdk_status, _ = self.ssh_helper.execute(cmd)

        self.used_drivers = {
            vpci: (index, driver)
            for index, (vpci, driver)
            in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
            if any(b.endswith(vpci) for b in self.bound_pci)
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, kill a running VNF and bind the devices.

        :return: the ``ResourceProfile`` created by ``_setup_resources``
        """
        self._setup_dpdk()
        resource = self._setup_resources()
        self._kill_vnf()
        self._detect_drivers()
        return resource

    def _kill_vnf(self):
        """Kill any process named ``APP_NAME`` on the target."""
        self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        # nothing more to do when igb_uio is loaded
        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def _setup_resources(self):
        """Select the socket and build the ``ResourceProfile``.

        Also records the PCI addresses of all interfaces in
        ``self.bound_pci``.
        """
        interfaces = self.vnfd_helper.interfaces
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        # NOTE(review): index 5 is presumably the first bus digit of a
        # "dddd:bb:dd.f" PCI address -- confirm
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        cores = self._validate_cpu_cfg()
        return ResourceProfile(self.vnfd_helper.mgmt_interface,
                               interfaces=self.vnfd_helper.interfaces, cores=cores)

    def _detect_drivers(self):
        """Record the drivers in use, then bind all devices to ``igb_uio``."""
        interfaces = self.vnfd_helper.interfaces

        self._find_used_drivers()
        for vpci, (index, _) in self.used_drivers.items():
            try:
                intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
            except StopIteration:
                pass
            else:
                # NOTE(review): stored on the interface itself, while
                # _get_dpdk_port_num reads it from 'virtual-interface' -- confirm
                intf1['dpdk_port_num'] = index

        for vpci in self.bound_pci:
            self._bind_dpdk('igb_uio', vpci)
            time.sleep(2)

    def _bind_dpdk(self, driver, vpci, force=True):
        """Bind ``vpci`` to ``driver`` using ``dpdk-devbind.py``.

        :param force: add ``--force`` to the bind command when true
        """
        if force:
            force = '--force '
        else:
            force = ''
        cmd = self.DPDK_BIND_CMD.format(force=force,
                                        dpdk_nic_bind=self.dpdk_nic_bind,
                                        driver=driver,
                                        vpci=vpci)
        self.ssh_helper.execute(cmd)

    def _detect_and_bind_dpdk(self, vpci, driver):
        """Bind ``vpci`` to ``driver`` unless a net device already exists.

        :return: the output of the net device lookup after binding, or None
            when the device was already bound or the bind did not succeed
        """
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            # already bound
            return None
        self._bind_dpdk(driver, vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status != 0:
            # failed to bind
            return None
        return stdout

    def _bind_kernel_devices(self):
        """Bind every interface to its own driver and record the iface name."""
        for intf in self.vnfd_helper.interfaces:
            vi = intf["virtual-interface"]
            stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
            if stdout is not None:
                vi["local_iface_name"] = posixpath.basename(stdout)

    def tear_down(self):
        """Bind every recorded device back to the driver it used before."""
        for vpci, (_, driver) in self.used_drivers.items():
            self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
                                                                driver=driver,
                                                                vpci=vpci))
389
390
class ResourceHelper(object):
    """Collect resource KPIs through the profile created by a setup helper."""

    RESOURCE_WORD = 'sample'
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    COLLECT_KPI = ''

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Keep the setup helper and share its ssh helper."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Set up the VNF environment and keep the returned resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for configuration generation; does nothing here."""
        return None

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; ``kpis`` is empty without collectd."""
        collectd_running = self.resource.check_if_sa_running("collectd")[0]
        core_kpis = self.resource.amqp_collect_nfvi_kpi() if collectd_running else {}
        return {"core": core_kpis}

    def start_collect(self):
        """Start the system agent and the KPI collection on the resource."""
        profile = self.resource
        profile.initiate_systemagent(self.ssh_helper.bin_path)
        profile.start()
        profile.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource, if one was set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
431
432
class ClientResourceHelper(ResourceHelper):
    """Resource helper that drives a TRex stateless client.

    Traffic is run in a loop until ``terminate`` is called; the samples of
    every iteration are handed over through an internal queue and picked up
    by ``collect_kpi``.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        # multiprocessing values/queue: state shared with the traffic process
        self.client_started = Value('i', 0)
        self.my_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)
        self._vpci_ascending = None

    def _build_ports(self):
        """Set the ports used by the client."""
        self.my_ports = [0, 1]

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` if it is not connected."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLStateError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, key=None, default=None):
        """Build one sample dict per interface from the client statistics.

        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is not in the statistics
        :return: dict mapping interface name to its sample; empty (and the
            helper is flagged as terminated) when the statistics are not a
            mapping
        """
        last_result = self.get_stats(self.my_ports)

        # validate before touching the result: only a mapping is guaranteed
        # to provide .get()
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        for vpci_idx, vpci in enumerate(self._vpci_ascending):
            name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
            # fixme: VNFDs KPIs values needs to be mapped to TRex structure
            xe_value = last_result.get(vpci_idx, {})
            samples[name] = {
                "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                "in_packets": int(xe_value.get("ipackets", 0)),
                "out_packets": int(xe_value.get("opackets", 0)),
            }
            if key:
                samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run the profile for ``RUN_DURATION`` seconds and queue the samples."""
        traffic_profile.execute(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples()
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic until terminated."""
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        self._build_ports()
        self.client = self._connect()
        self.client.reset(ports=self.my_ports)
        self.client.remove_all_streams(self.my_ports)  # remove all streams
        traffic_profile.register_generator(self)

        while self._terminated.value == 0:
            self._run_traffic_once(traffic_profile)

        self.client.stop(self.my_ports)
        self.client.disconnect()
        # reset the flag so that traffic can be run again
        self._terminated.value = 0

    def terminate(self):
        """Ask the traffic loop to stop after the current iteration."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics of ``ports`` (default: own ports)."""
        if ports is None:
            ports = self.my_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start the client on ``ports`` (default: own ports)."""
        if ports is None:
            ports = self.my_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge queued samples, if any, into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
        # lazy logging arguments: formatted only when debug is enabled
        LOG.debug("Collect %s KPIs %s", self.RESOURCE_WORD, self._result)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` to the TRex server, retrying on failure.

        :param client: client to connect; a new ``STLClient`` for the
            management interface is created when None
        :return: the client; it may be unconnected when every attempt failed
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        else:
            # no break: every attempt failed; keep returning the client as
            # before but make the failure visible
            LOG.error("Unable to connect to Trex Server, giving up")
        return client
547
548
class Rfc2544ResourceHelper(object):
    """Lazy access to the ``rfc2544`` section of the scenario options."""

    DEFAULT_TOLERANCE = '0.0001 - 0.0001'
    DEFAULT_LATENCY = False
    DEFAULT_CORRELATED_TRAFFIC = False

    def __init__(self, scenario_helper):
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        # caches for the properties below; None means "not read yet"
        self._rfc2544 = None
        self._latency = None
        self._correlated_traffic = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``rfc2544`` options of the scenario (read once)."""
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The ``correlated_traffic`` option (read once)."""
        if self._correlated_traffic is None:
            self._correlated_traffic = self.get_rfc2544(
                'correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)

        return self._correlated_traffic

    @property
    def latency(self):
        """The ``latency`` option (read once)."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` of the rfc2544 section, or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` into the low and high tolerance.

        The value is a string of numbers separated by ``-``; the two
        smallest become the low and high bound. A single number is used
        for both bounds.
        """
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        if len(bounds) > 1:
            self._tolerance_high = bounds[1]
        else:
            self._tolerance_high = self.tolerance_low
605
606
class SampleVNFDeployHelper(object):
    """Clone, upload and build the sample VNF sources for a target node."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    # deployment is switched off for now, see deploy_vnfs()
    DISABLE_DEPLOY = True

    def __init__(self, vnfd_helper, ssh_helper):
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build ``app_name`` on the target and copy it to the bin path.

        Returns immediately while ``DISABLE_DEPLOY`` is set, or when the
        binary is already found under the remote bin path.
        """
        # temp disable for now
        if self.DISABLE_DEPLOY:
            return

        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        which_status = ssh.execute("which %s" % vnf_bin)[0]
        if not which_status:
            # binary already present
            return

        # fresh local clone, then replace the copy on the target
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        # hand the local proxy settings over to the remote build script
        proxies = (os.environ.get('http_proxy', ''), os.environ.get('https_proxy', ''))
        cmd = "sudo -E %s --silent '%s' '%s'" % ((build_script,) + proxies)
        LOG.debug(cmd)
        ssh.execute(cmd)

        vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
646
647
class ScenarioHelper(object):
    """Read-only view on the parts of a scenario configuration a VNF uses.

    ``scenario_cfg`` is None until it is assigned by the owner of the
    helper; every property reads from it on each access.
    """

    # used when the VNF options carry no 'vnf_config' entry
    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The ``task_path`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg["task_path"]

    @property
    def nodes(self):
        """The ``nodes`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['nodes']

    @property
    def all_options(self):
        """The complete ``options`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg["options"]

    @property
    def options(self):
        """The options stored under this helper's ``name``."""
        every_option = self.all_options
        return every_option[self.name]

    @property
    def vnf_cfg(self):
        """The ``vnf_config`` option, or ``DEFAULT_VNF_CFG`` if missing."""
        own_options = self.options
        return own_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The ``topology`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['topology']
684
685
class SampleVNF(GenericVNF):
    """ Class providing file-like API for generic VNF implementation """

    VNF_PROMPT = "pipeline>"
    WAIT_TIME = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the VNF together with its helper objects.

        :param setup_env_helper_type: class of the setup helper; defaults to
            ``SetupEnvHelper``
        :param resource_helper_type: class of the resource helper; defaults
            to ``ResourceHelper``
        """
        super(SampleVNF, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)

        setup_cls = SetupEnvHelper if setup_env_helper_type is None else setup_env_helper_type
        self.setup_helper = setup_cls(self.vnfd_helper, self.ssh_helper, self.scenario_helper)

        self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)

        resource_cls = ResourceHelper if resource_helper_type is None else resource_helper_type
        self.resource_helper = resource_cls(self.setup_helper)

        self.all_ports = None
        self.context_cfg = None
        self.nfvi_context = None
        self.pipeline_kwargs = {}
        self.priv_ports = None
        self.pub_ports = None
        # TODO(esm): make QueueFileWrapper invert-able so that we
        #            never have to manage the queues
        self.q_in = Queue()
        self.q_out = Queue()
        self.queue_wrapper = None
        self.run_kwargs = {}
        self.scenario_cfg = None
        self.tg_port_pairs = None
        self.used_drivers = {}
        self.vnf_port_pairs = None
        self._vnf_process = None

    def _get_route_data(self, route_index, route_type):
        """Return field ``route_type`` of entry ``route_index`` in ``nd_route_tbl``.

        An empty string is returned when the table has no such entry or the
        entry has no such field.
        """
        routes = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
        to_skip = route_index
        while to_skip > 0:
            next(routes, '')
            to_skip -= 1
        selected = next(routes, {})
        return selected.get(route_type, '')

    def _get_port0localip6(self):
        """Return (and log) the ``network`` field of the first route."""
        local_ip6 = self._get_route_data(0, 'network')
        LOG.info("_get_port0localip6 : %s", local_ip6)
        return local_ip6

    def _get_port1localip6(self):
        """Return (and log) the ``network`` field of the second route."""
        local_ip6 = self._get_route_data(1, 'network')
        LOG.info("_get_port1localip6 : %s", local_ip6)
        return local_ip6

    def _get_port0prefixlen6(self):
        """Return (and log) the ``netmask`` field of the first route."""
        prefix_len6 = self._get_route_data(0, 'netmask')
        LOG.info("_get_port0prefixlen6 : %s", prefix_len6)
        return prefix_len6

    def _get_port1prefixlen6(self):
        """Return (and log) the ``netmask`` field of the second route."""
        prefix_len6 = self._get_route_data(1, 'netmask')
        LOG.info("_get_port1prefixlen6 : %s", prefix_len6)
        return prefix_len6

    def _get_port0gateway6(self):
        """Return (and log) the ``network`` field of the first route."""
        # NOTE(review): reads 'network', not a gateway field -- confirm intent
        gateway6 = self._get_route_data(0, 'network')
        LOG.info("_get_port0gateway6 : %s", gateway6)
        return gateway6

    def _get_port1gateway6(self):
        """Return (and log) the ``network`` field of the second route."""
        # NOTE(review): reads 'network', not a gateway field -- confirm intent
        gateway6 = self._get_route_data(1, 'network')
        LOG.info("_get_port1gateway6 : %s", gateway6)
        return gateway6

    def _start_vnf(self):
        """Start ``_run`` in a separate process, wired to the queues."""
        self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
        vnf_process = Process(target=self._run)
        self._vnf_process = vnf_process
        vnf_process.start()

    def _vnf_up_post(self):
        """Hook called once the VNF prompt was seen; does nothing here."""
        return None

    def instantiate(self, scenario_cfg, context_cfg):
        """Deploy the VNF, set up its resources and start it."""
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg
        own_node = self.scenario_helper.nodes[self.name]
        self.nfvi_context = Context.get_context_from_server(own_node)
        # self.nfvi_context = None

        self.deploy_helper.deploy_vnfs(self.APP_NAME)
        self.resource_helper.setup()
        self._start_vnf()

    def wait_for_instantiate(self):
        """Block until the VNF prompt shows up in the VNF output.

        :return: the exit code of the VNF process at that moment
        :raises RuntimeError: when the VNF process died or the output
            contains ``PANIC``
        """
        received = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # TODO(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                received.append(self.q_out.get())
                # check everything seen so far, the prompt may be split
                # across several queue items
                output_so_far = ''.join(received)
                if self.VNF_PROMPT in output_so_far:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    self.resource_helper.start_collect()
                    return self._vnf_process.exitcode

                if "PANIC" in output_so_far:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(1)

    def _build_run_kwargs(self):
        """Fill ``run_kwargs`` so that the VNF talks through the queue wrapper."""
        wrapper = self.queue_wrapper
        self.run_kwargs = {
            'stdin': wrapper,
            'stdout': wrapper,
            'keep_stdin_open': True,
            'pty': True,
        }

    def _build_config(self):
        """Return the VNF start command built by the setup helper."""
        return self.setup_helper.build_config()

    def _run(self):
        """Process target: build the configuration and run the VNF over ssh."""
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()
        cmd = self._build_config()
        # kill before starting
        self.ssh_helper.execute("pkill {}".format(self.APP_NAME))

        LOG.debug(cmd)
        self._build_run_kwargs()
        self.ssh_helper.run(cmd, **self.run_kwargs)

    def vnf_execute(self, cmd, wait_time=2):
        """ send cmd to vnf process """

        LOG.info("%s command: %s", self.APP_NAME, cmd)
        self.q_in.put("{}\r\n".format(cmd))
        time.sleep(wait_time)
        # collect whatever the VNF produced in the meantime
        chunks = []
        while self.q_out.qsize() > 0:
            chunks.append(self.q_out.get())
        return "".join(chunks)

    def _tear_down(self):
        """Hook called on terminate; does nothing here."""
        return None

    def terminate(self):
        """Quit the VNF, stop its process and stop the KPI collection."""
        self.vnf_execute("quit")
        if self._vnf_process:
            self._vnf_process.terminate()
        self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
        self._tear_down()
        self.resource_helper.stop_collect()

    def get_stats(self, *args, **kwargs):
        """
        Method for checking the statistics

        :return:
           VNF statistics
        """
        stats_cmd = 'p {0} stats'.format(self.APP_WORD)
        return self.vnf_execute(stats_cmd)

    def collect_kpi(self):
        """Parse the VNF statistics into a KPI dict.

        When ``COLLECT_KPI`` matches the statistics, every entry of
        ``COLLECT_MAP`` is read from the match and the resource KPIs are
        added as ``collect_stats``; otherwise zeroed packet counters are
        returned.
        """
        stats = self.get_stats()
        kpi_match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
        if not kpi_match:
            result = {
                "packets_in": 0,
                "packets_fwd": 0,
                "packets_dropped": 0,
            }
        else:
            result = {}
            for kpi_name, group in self.COLLECT_MAP.items():
                result[kpi_name] = int(kpi_match.group(group))
            result["collect_stats"] = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        return result
879
880
881 class SampleVNFTrafficGen(GenericTrafficGen):
882     """ Class providing file-like API for generic traffic generator """
883
884     APP_NAME = 'Sample'
885     RUN_WAIT = 1
886
887     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
888         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
889         self.bin_path = get_nsb_option('bin_path', '')
890         self.name = "tgen__1"  # name in topology file
891
892         self.scenario_helper = ScenarioHelper(self.name)
893         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
894
895         if setup_env_helper_type is None:
896             setup_env_helper_type = SetupEnvHelper
897
898         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
899                                                   self.ssh_helper,
900                                                   self.scenario_helper)
901
902         if resource_helper_type is None:
903             resource_helper_type = ClientResourceHelper
904
905         self.resource_helper = resource_helper_type(self.setup_helper)
906
907         self.runs_traffic = True
908         self.traffic_finished = False
909         self.tg_port_pairs = None
910         self._tg_process = None
911         self._traffic_process = None
912
913     def _start_server(self):
914         # we can't share ssh paramiko objects to force new connection
915         self.ssh_helper.drop_connection()
916
917     def instantiate(self, scenario_cfg, context_cfg):
918         self.scenario_helper.scenario_cfg = scenario_cfg
919         self.resource_helper.generate_cfg()
920         self.setup_helper.setup_vnf_environment()
921         self.resource_helper.setup()
922
923         LOG.info("Starting %s server...", self.APP_NAME)
924         self._tg_process = Process(target=self._start_server)
925         self._tg_process.start()
926
927     def wait_for_instantiate(self):
928         return self._wait_for_process()
929
930     def _check_status(self):
931         raise NotImplementedError
932
933     def _wait_for_process(self):
934         while True:
935             if not self._tg_process.is_alive():
936                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
937             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
938             time.sleep(1)
939             status = self._check_status()
940             if status == 0:
941                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
942                 return self._tg_process.exitcode
943
944     def _traffic_runner(self, traffic_profile):
945         LOG.info("Starting %s client...", self.APP_NAME)
946         self.resource_helper.run_traffic(traffic_profile)
947
948     def run_traffic(self, traffic_profile):
949         """ Generate traffic on the wire according to the given params.
950         Method is non-blocking, returns immediately when traffic process
951         is running. Mandatory.
952
953         :param traffic_profile:
954         :return: True/False
955         """
956         self._traffic_process = Process(target=self._traffic_runner,
957                                         args=(traffic_profile,))
958         self._traffic_process.start()
959         # Wait for traffic process to start
960         while self.resource_helper.client_started.value == 0:
961             time.sleep(self.RUN_WAIT)
962
963         return self._traffic_process.is_alive()
964
965     def listen_traffic(self, traffic_profile):
966         """ Listen to traffic with the given parameters.
967         Method is non-blocking, returns immediately when traffic process
968         is running. Optional.
969
970         :param traffic_profile:
971         :return: True/False
972         """
973         pass
974
975     def verify_traffic(self, traffic_profile):
976         """ Verify captured traffic after it has ended. Optional.
977
978         :param traffic_profile:
979         :return: dict
980         """
981         pass
982
983     def collect_kpi(self):
984         result = self.resource_helper.collect_kpi()
985         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
986         return result
987
    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        :return: True/False
        """
        # Record that traffic has ended before stopping the traffic process.
        self.traffic_finished = True
        # _traffic_process is None until run_traffic() has created it.
        if self._traffic_process is not None:
            self._traffic_process.terminate()