NSB update
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
34 from yardstick.network_services.nfvi.resource import ResourceProfile
35 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
36 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
37 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
38 from yardstick.network_services.utils import get_nsb_option
39
40 from stl.trex_stl_lib.trex_stl_client import STLClient
41 from stl.trex_stl_lib.trex_stl_client import LoggerApi
42 from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError
43
44 from yardstick.ssh import AutoConnectSSH
45
# Name of the DPDK directory looked up under the remote bin path by
# DpdkVnfSetupEnvHelper._setup_dpdk().
DPDK_VERSION = "dpdk-16.07"

# Module-level logger shared by every helper in this file.
LOG = logging.getLogger(__name__)


# Remote directory that generated config/script files are uploaded to.
REMOTE_TMP = "/tmp"
52
53
class VnfSshHelper(AutoConnectSSH):
    """SSH connection built from a node description plus a remote bin path.

    Unlike the plain SSH classes, instances remember the ``node`` mapping they
    were created from so that :meth:`copy` can rebuild an equivalent helper.
    """

    def __init__(self, node, bin_path, wait=None):
        """Create the helper.

        :param node: node description handed to ``args_from_node``
        :param bin_path: remote directory holding the tool binaries
        :param wait: optional value used for the ``wait`` keyword when the
                     node arguments do not already provide one
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            # do not override a 'wait' already derived from the node
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class (never a subclass) for use by copy()."""
        # must return static class name, anything else refers to the calling class
        # i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        """Return a new helper created from the same node and bin path."""
        # this copy constructor is different from SSH classes, since it uses node
        helper_type = self.get_class()
        return helper_type(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Write ``content`` to ``REMOTE_TMP/prefix`` on the target.

        :return: the remote path the content was uploaded to
        """
        remote_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join ``args`` onto the remote bin path."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Provision ``tool_file``, defaulting ``tool_path`` to the bin path."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
89
90
class SetupEnvHelper(object):
    """Base class for helpers that prepare the environment of a VNF.

    Subclasses are expected to implement :meth:`build_config` and
    :meth:`tear_down`; :meth:`setup_vnf_environment` is a no-op here.
    """

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    CORES = []
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the three collaborating helpers."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper

    def _get_ports_gateway(self, name):
        """Return the gateway of the first route whose ``if`` equals ``name``.

        :return: the route's ``gateway`` value, or None when no route matches
        """
        routes = self.vnfd_helper.vdu0.get('routing_table', [])
        matching_gateways = (entry['gateway'] for entry in routes if name == entry['if'])
        return next(matching_gateways, None)

    def build_config(self):
        """Build the VNF configuration; must be provided by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the target environment; the base implementation does nothing."""

    def tear_down(self):
        """Undo the environment setup; must be provided by subclasses."""
        raise NotImplementedError
122
123
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based sample VNFs.

    Sets up hugepages and the ``igb_uio`` module on the target, binds the VNF
    interfaces to ``igb_uio``, and generates and uploads the pipeline
    configuration and script.
    """

    APP_NAME = 'DpdkVnf'
    DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
    DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    # extra cores added on top of worker_threads, depending on lb_config
    HW_DEFAULT_CORE = 3
    SW_DEFAULT_CORE = 2

    # captures (pci address, driver name) pairs from the bind tool status output
    DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Return the config with ``pkt_type = ipv4`` set to the requested type."""
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        return ip_pipeline_cfg.replace(match_str, replace_str)

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Return the config rewritten for the requested traffic type.

        When the VNF type differs from ``APP_NAME`` the ``traffic_type = 4``
        entry is replaced; otherwise the ``pkt_type`` entry is switched to
        ipv6 unless the traffic type is 4.
        """
        traffic_type = traffic_options['traffic_type']

        # Compare by value: "is not" tests object identity, which for strings
        # only matches equality by accident of interning.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        return ip_pipeline_cfg.replace(match_str, replace_str)

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self._dpdk_nic_bind = None
        self.socket = None
        # filled by _find_used_drivers(); starts empty so that tear_down()
        # is a harmless no-op when called before the environment was set up
        self.used_drivers = {}

    @property
    def dpdk_nic_bind(self):
        """Path of the provisioned ``dpdk-devbind.py`` tool (resolved once)."""
        if self._dpdk_nic_bind is None:
            self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
        return self._dpdk_nic_bind

    def _setup_hugepages(self):
        """Set ``nr_hugepages`` on the target based on its hugepage size."""
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # the current value is read but not used
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        if hugepages == "2048kB":
            pages = 16384
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def _get_dpdk_port_num(self, name):
        """Return the ``dpdk_port_num`` of the interface called ``name``."""
        interface = self.vnfd_helper.find_interface(name=name)
        return interface['virtual-interface']['dpdk_port_num']

    def build_config(self):
        """Generate and upload the VNF config and script.

        :return: ``PIPELINE_COMMAND`` formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper.interfaces,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # read back the locally generated config before patching and uploading it
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))
        self.all_ports = multiport.port_pair_list

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Populate ``self.pipeline_kwargs`` used to format PIPELINE_COMMAND."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # bit mask with len(all_ports) + 1 low bits set
        ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'ports_len_hex': ports_len_hex,
            'tool_path': tool_path,
        }

    def _get_app_cpu(self):
        """Return the cores for the application.

        ``CORES`` wins when set; otherwise the first N cores of the selected
        socket are taken, where N is ``worker_threads`` plus the HW/SW default.
        """
        if self.CORES:
            return self.CORES

        vnf_cfg = self.scenario_helper.vnf_cfg
        sys_obj = CpuSysCores(self.ssh_helper)
        self.sys_cpu = sys_obj.get_core_socket()
        num_core = int(vnf_cfg["worker_threads"])
        if vnf_cfg.get("lb_config", "SW") == 'HW':
            num_core += self.HW_DEFAULT_CORE
        else:
            num_core += self.SW_DEFAULT_CORE
        return self.sys_cpu[str(self.socket)][:num_core]

    def _get_cpu_sibling_list(self, cores=None):
        """Return the thread siblings of ``cores`` as a flat list of strings.

        :param cores: cores to inspect; defaults to ``_get_app_cpu()``
        :return: sibling ids, or an empty list if anything goes wrong
        """
        if cores is None:
            cores = self._get_app_cpu()
        sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
        awk_template = "awk -F: '{ print $1 }' < %s"
        sys_path = "/sys/devices/system/cpu/"
        cpu_topology = []
        try:
            for core in cores:
                sys_cmd = sys_cmd_template % (sys_path, core)
                cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
                cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))

            return cpu_topology
        except Exception:  # best effort: callers expect a list
            # keep the fallback, but do not hide the reason it was taken
            LOG.exception("Unable to read the CPU sibling list")
            return []

    def _validate_cpu_cfg(self):
        """Return the CPU sibling list for the application cores."""
        return self._get_cpu_sibling_list()

    def _find_used_drivers(self):
        """Record index and driver of each bound PCI device in ``used_drivers``."""
        cmd = "{0} -s".format(self.dpdk_nic_bind)
        _, dpdk_status, _ = self.ssh_helper.execute(cmd)

        # The status output uses short addresses, hence the endswith() match
        # against the (possibly longer) addresses in bound_pci.
        self.used_drivers = {
            vpci: (index, driver)
            for index, (vpci, driver)
            in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
            if any(b.endswith(vpci) for b in self.bound_pci)
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, kill any running VNF and bind the interfaces.

        :return: the ResourceProfile created by ``_setup_resources``
        """
        self._setup_dpdk()
        resource = self._setup_resources()
        self._kill_vnf()
        self._detect_drivers()
        return resource

    def _kill_vnf(self):
        """Kill any running process named after ``APP_NAME`` on the target."""
        self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            # igb_uio is loaded, nothing left to do
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def _setup_resources(self):
        """Select the socket from the bound PCI addresses and build the profile."""
        interfaces = self.vnfd_helper.interfaces
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        cores = self._validate_cpu_cfg()
        return ResourceProfile(self.vnfd_helper, cores)

    def _detect_drivers(self):
        """Record DPDK port numbers on the interfaces, then bind to igb_uio."""
        interfaces = self.vnfd_helper.interfaces

        self._find_used_drivers()
        for vpci, (index, _) in self.used_drivers.items():
            try:
                intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
            except StopIteration:
                # no interface uses this exact address; nothing to annotate
                pass
            else:
                intf1['dpdk_port_num'] = index

        for vpci in self.bound_pci:
            self._bind_dpdk('igb_uio', vpci)
            time.sleep(2)

    def _bind_dpdk(self, driver, vpci, force=True):
        """Bind ``vpci`` to ``driver`` with the DPDK bind tool."""
        if force:
            force = '--force '
        else:
            force = ''
        cmd = self.DPDK_BIND_CMD.format(force=force,
                                        dpdk_nic_bind=self.dpdk_nic_bind,
                                        driver=driver,
                                        vpci=vpci)
        self.ssh_helper.execute(cmd)

    def _detect_and_bind_dpdk(self, vpci, driver):
        """Bind ``vpci`` to ``driver`` unless FIND_NET_CMD already succeeds.

        :return: stdout of FIND_NET_CMD after binding, or None when the first
                 lookup succeeded or the lookup after binding failed
        """
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
        # NOTE(review): `find` usually exits 0 even when nothing matches, so
        # this early return may always be taken -- confirm intended behaviour.
        if exit_status == 0:
            # already bound
            return None
        self._bind_dpdk(driver, vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status != 0:
            # failed to bind
            return None
        return stdout

    def _bind_kernel_devices(self):
        """Bind every interface to its own driver and record the iface name."""
        for intf in self.vnfd_helper.interfaces:
            vi = intf["virtual-interface"]
            stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
            if stdout is not None:
                vi["local_iface_name"] = posixpath.basename(stdout)

    def tear_down(self):
        """Rebind every recorded device to the driver it had before setup."""
        for vpci, (_, driver) in self.used_drivers.items():
            self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
                                                                driver=driver,
                                                                vpci=vpci))
388
389
class ResourceHelper(object):
    """Owns the resource profile of a VNF and collects its KPIs."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember the setup helper and reuse its SSH helper."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Set up the VNF environment and keep what the setup helper returns."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for subclasses; does nothing here."""

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is empty unless collectd is running."""
        collectd_running = self.resource.check_if_sa_running("collectd")[0]
        core_kpi = self.resource.amqp_collect_nfvi_kpi() if collectd_running else {}
        return {"core": core_kpi}

    def start_collect(self):
        """Start the system agent, the resource and the AMQP KPI process."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource, if one has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
430
431
class ClientResourceHelper(ResourceHelper):
    """Resource helper that drives a TRex stateless client.

    Traffic is run in a loop until :meth:`terminate` is called; the samples of
    every run are pushed onto an internal queue that :meth:`collect_kpi` drains.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        # multiprocessing values: shared flags for starting/stopping traffic
        self.client_started = Value('i', 0)
        self.my_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)
        self._vpci_ascending = None

    def _build_ports(self):
        """Select the client ports to use (always ports 0 and 1 here)."""
        self.my_ports = [0, 1]

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` if the client state is invalid."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLStateError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, key=None, default=None):
        """Build per-interface samples from the latest client statistics.

        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: mapping of interface name to sample dict; ``{}`` (and the
                 traffic loop is flagged for termination) when the statistics
                 are not a mapping
        """
        last_result = self.get_stats(self.my_ports)

        # Validate before touching the result: calling .get() on a non-mapping
        # would raise instead of terminating cleanly.
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        for vpci_idx, vpci in enumerate(self._vpci_ascending):
            name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
            # fixme: VNFDs KPIs values needs to be mapped to TRex structure
            xe_value = last_result.get(vpci_idx, {})
            samples[name] = {
                "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                "in_packets": int(xe_value.get("ipackets", 0)),
                "out_packets": int(xe_value.get("opackets", 0)),
            }
            if key:
                samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run the profile for RUN_DURATION seconds and queue the samples."""
        traffic_profile.execute(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples()
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run the profile until terminated."""
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        self._build_ports()
        self.client = self._connect()
        self.client.reset(ports=self.my_ports)
        self.client.remove_all_streams(self.my_ports)  # remove all streams
        traffic_profile.register_generator(self)

        while self._terminated.value == 0:
            self._run_traffic_once(traffic_profile)

        self.client.stop(self.my_ports)
        self.client.disconnect()
        # reset the flag so that run_traffic() can be called again
        self._terminated.value = 0

    def terminate(self):
        """Ask the traffic loop to stop after the current run."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics on ``ports`` (default: own ports)."""
        if ports is None:
            ports = self.my_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on ``ports`` (default: own ports)."""
        if ports is None:
            ports = self.my_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set (if any) into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
        # lazy %-style arguments: nothing is formatted unless DEBUG is enabled
        LOG.debug("Collect %s KPIs %s", self.RESOURCE_WORD, self._result)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (or a new STLClient) to the TRex server.

        :return: the client; it is returned even when every attempt failed
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        else:
            # every attempt failed; make that visible instead of silent
            LOG.error("Giving up connecting to Trex Server")
        return client
546
547
class Rfc2544ResourceHelper(object):
    """Lazy accessors for the ``rfc2544`` section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """
        :param scenario_helper: object whose ``all_options`` mapping holds the
                                ``rfc2544`` section
        """
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``rfc2544`` options mapping (looked up once, then cached)."""
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The ``correlated_traffic`` option (default: False)."""
        if self._correlated_traffic is None:
            self._correlated_traffic = \
                self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)

        return self._correlated_traffic

    @property
    def latency(self):
        """The ``latency`` option (default: False)."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` from the rfc2544 section, or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` into the low/high tolerance bounds.

        The option is either a ``"low - high"`` string (order does not
        matter), a single value as string, or a bare number as produced by
        YAML for ``allowed_drop_rate: 0.0001``. With a single value both
        bounds are equal.

        :raises ValueError: if a string part is not a valid float
        """
        drop_rate = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        if isinstance(drop_rate, (int, float)):
            # a bare number has no .split(); treat it as a single bound
            bounds = [float(drop_rate)]
        else:
            bounds = sorted(float(part.strip()) for part in drop_rate.split('-'))
        self._tolerance_low = bounds[0]
        # the second smallest value is the upper bound, as before
        self._tolerance_high = bounds[1] if len(bounds) > 1 else bounds[0]
604
605
class SampleVNFDeployHelper(object):
    """Clone, upload and build the samplevnf sources on the target node."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    # deployment is switched off for now; deploy_vnfs() returns immediately
    DISABLE_DEPLOY = True

    def __init__(self, vnfd_helper, ssh_helper):
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build ``app_name`` on the target unless it is already installed.

        Does nothing while ``DISABLE_DEPLOY`` is set or when ``which`` finds
        the binary under the remote bin path.
        """
        # temp disable for now
        if self.DISABLE_DEPLOY:
            return

        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        if not ssh.execute("which %s" % vnf_bin)[0]:
            # exit status 0: binary already present
            return

        # fresh local clone, then replace the copy on the target with it
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxies = (os.environ.get('http_proxy', ''), os.environ.get('https_proxy', ''))
        cmd = "sudo -E %s --silent '%s' '%s'" % ((build_script,) + proxies)
        LOG.debug(cmd)
        ssh.execute(cmd)

        # copy the freshly built binary into the remote bin path
        vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
645
646
class ScenarioHelper(object):
    """Accessors over the scenario configuration for one named VNF.

    ``scenario_cfg`` must be assigned before any property is read.
    """

    # used by vnf_cfg when the VNF options carry no 'vnf_config' entry
    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        self.scenario_cfg = None
        self.name = name

    @property
    def task_path(self):
        """The ``task_path`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg["task_path"]

    @property
    def nodes(self):
        """The ``nodes`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['nodes']

    @property
    def all_options(self):
        """The complete ``options`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg["options"]

    @property
    def options(self):
        """The options stored under this helper's name."""
        every_option = self.all_options
        return every_option[self.name]

    @property
    def vnf_cfg(self):
        """The ``vnf_config`` of this VNF, or ``DEFAULT_VNF_CFG`` if absent."""
        own_options = self.options
        return own_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The ``topology`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['topology']
683
684
class SampleVNF(GenericVNF):
    """Generic sample VNF driven over SSH through a pair of queues.

    The VNF runs in a child process; its console is reached through
    ``q_in``/``q_out``. ``APP_NAME``, ``APP_WORD``, ``COLLECT_KPI`` and
    ``COLLECT_MAP`` are read from ``self`` but are not defined in this class.
    """

    VNF_PROMPT = "pipeline>"
    WAIT_TIME = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the helpers; helper types default to the base helper classes."""
        super(SampleVNF, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)

        env_helper_cls = SetupEnvHelper if setup_env_helper_type is None \
            else setup_env_helper_type
        self.setup_helper = env_helper_cls(self.vnfd_helper,
                                           self.ssh_helper,
                                           self.scenario_helper)

        self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)

        res_helper_cls = ResourceHelper if resource_helper_type is None \
            else resource_helper_type
        self.resource_helper = res_helper_cls(self.setup_helper)

        self.all_ports = None
        self.context_cfg = None
        self.nfvi_context = None
        self.pipeline_kwargs = {}
        self.priv_ports = None
        self.pub_ports = None
        # TODO(esm): make QueueFileWrapper invert-able so that we
        #            never have to manage the queues
        self.q_in = Queue()
        self.q_out = Queue()
        self.queue_wrapper = None
        self.run_kwargs = {}
        self.scenario_cfg = None
        self.tg_port_pairs = None
        self.used_drivers = {}
        self.vnf_port_pairs = None
        self._vnf_process = None

    def _get_route_data(self, route_index, route_type):
        """Return field ``route_type`` of entry ``route_index`` in ``nd_route_tbl``.

        :return: the field value, or '' when the entry or the field is missing
        """
        entries = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
        # skip the entries in front of the requested one
        for _ in range(route_index):
            next(entries, '')
        wanted = next(entries, {})
        return wanted.get(route_type, '')

    def _get_port0localip6(self):
        """Return (and log) the ``network`` field of the first nd route."""
        value = self._get_route_data(0, 'network')
        LOG.info("_get_port0localip6 : %s", value)
        return value

    def _get_port1localip6(self):
        """Return (and log) the ``network`` field of the second nd route."""
        value = self._get_route_data(1, 'network')
        LOG.info("_get_port1localip6 : %s", value)
        return value

    def _get_port0prefixlen6(self):
        """Return (and log) the ``netmask`` field of the first nd route."""
        value = self._get_route_data(0, 'netmask')
        LOG.info("_get_port0prefixlen6 : %s", value)
        return value

    def _get_port1prefixlen6(self):
        """Return (and log) the ``netmask`` field of the second nd route."""
        value = self._get_route_data(1, 'netmask')
        LOG.info("_get_port1prefixlen6 : %s", value)
        return value

    def _get_port0gateway6(self):
        """Return (and log) the ``network`` field of the first nd route."""
        # NOTE(review): reads 'network' although the name says gateway -- confirm
        value = self._get_route_data(0, 'network')
        LOG.info("_get_port0gateway6 : %s", value)
        return value

    def _get_port1gateway6(self):
        """Return (and log) the ``network`` field of the second nd route."""
        # NOTE(review): reads 'network' although the name says gateway -- confirm
        value = self._get_route_data(1, 'network')
        LOG.info("_get_port1gateway6 : %s", value)
        return value

    def _start_vnf(self):
        """Create the queue wrapper and start ``_run`` in a child process."""
        self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
        vnf_process = Process(target=self._run)
        self._vnf_process = vnf_process
        vnf_process.start()

    def _vnf_up_post(self):
        """Hook called once the VNF prompt has been seen; no-op here."""

    def instantiate(self, scenario_cfg, context_cfg):
        """Store the configs, deploy and set up the VNF, then start it."""
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg
        own_node = self.scenario_helper.nodes[self.name]
        self.nfvi_context = Context.get_context_from_server(own_node)

        self.deploy_helper.deploy_vnfs(self.APP_NAME)
        self.resource_helper.setup()
        self._start_vnf()

    def wait_for_instantiate(self):
        """Block until the VNF prompt shows up in the output queue.

        :return: the exit code of the VNF process at that moment
        :raises RuntimeError: if the VNF process died or printed "PANIC"
        """
        chunks = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # TODO(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                chunks.append(self.q_out.get())
                # re-join every time: the prompt may be split across chunks
                console = ''.join(chunks)
                if self.VNF_PROMPT in console:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    self.resource_helper.start_collect()
                    return self._vnf_process.exitcode

                if "PANIC" in console:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(1)

    def _build_run_kwargs(self):
        """Route stdin and stdout of the remote command through the queue wrapper."""
        wrapper = self.queue_wrapper
        self.run_kwargs = {
            'stdin': wrapper,
            'stdout': wrapper,
            'keep_stdin_open': True,
            'pty': True,
        }

    def _build_config(self):
        """Return the command line produced by the setup helper."""
        return self.setup_helper.build_config()

    def _run(self):
        """Child process body: build the config and run the VNF over SSH."""
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()
        cmd = self._build_config()
        # kill before starting
        self.ssh_helper.execute("pkill {}".format(self.APP_NAME))

        LOG.debug(cmd)
        self._build_run_kwargs()
        self.ssh_helper.run(cmd, **self.run_kwargs)

    def vnf_execute(self, cmd, wait_time=2):
        """ send cmd to vnf process """

        LOG.info("%s command: %s", self.APP_NAME, cmd)
        self.q_in.put("{}\r\n".format(cmd))
        time.sleep(wait_time)
        # collect whatever the VNF printed in the meantime
        replies = []
        while self.q_out.qsize() > 0:
            replies.append(self.q_out.get())
        return "".join(replies)

    def _tear_down(self):
        """Hook called from terminate(); no-op here."""

    def terminate(self):
        """Quit the VNF, stop its process and stop KPI collection."""
        self.vnf_execute("quit")
        vnf_process = self._vnf_process
        if vnf_process:
            vnf_process.terminate()
        self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
        self._tear_down()
        self.resource_helper.stop_collect()

    def get_stats(self, *args, **kwargs):
        """
        Method for checking the statistics

        :return:
           VNF statistics
        """
        return self.vnf_execute('p {0} stats'.format(self.APP_WORD))

    def collect_kpi(self):
        """Parse the VNF statistics into a KPI dict.

        :return: the ``COLLECT_MAP`` groups as ints plus ``collect_stats``
                 when ``COLLECT_KPI`` matches, otherwise zeroed counters
        """
        stats = self.get_stats()
        match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
        if not match:
            result = {
                "packets_in": 0,
                "packets_fwd": 0,
                "packets_dropped": 0,
            }
        else:
            result = {k: int(match.group(v)) for k, v in self.COLLECT_MAP.items()}
            result["collect_stats"] = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        return result
878
879
class SampleVNFTrafficGen(GenericTrafficGen):
    """ Base class for sample-VNF traffic generators.

    instantiate() launches a server process and run_traffic() launches a
    separate traffic (client) process.
    """

    # Name inserted into this class's log and error messages.
    APP_NAME = 'Sample'
    # Seconds run_traffic() sleeps between checks of the client-started flag.
    RUN_WAIT = 1
885
886     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
887         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
888         self.bin_path = get_nsb_option('bin_path', '')
889         self.name = "tgen__1"  # name in topology file
890
891         self.scenario_helper = ScenarioHelper(self.name)
892         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
893
894         if setup_env_helper_type is None:
895             setup_env_helper_type = SetupEnvHelper
896
897         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
898                                                   self.ssh_helper,
899                                                   self.scenario_helper)
900
901         if resource_helper_type is None:
902             resource_helper_type = ClientResourceHelper
903
904         self.resource_helper = resource_helper_type(self.setup_helper)
905
906         self.runs_traffic = True
907         self.traffic_finished = False
908         self.tg_port_pairs = None
909         self._tg_process = None
910         self._traffic_process = None
911
912     def _start_server(self):
913         # we can't share ssh paramiko objects to force new connection
914         self.ssh_helper.drop_connection()
915
916     def instantiate(self, scenario_cfg, context_cfg):
917         self.scenario_helper.scenario_cfg = scenario_cfg
918         self.resource_helper.generate_cfg()
919         self.setup_helper.setup_vnf_environment()
920         self.resource_helper.setup()
921
922         LOG.info("Starting %s server...", self.APP_NAME)
923         self._tg_process = Process(target=self._start_server)
924         self._tg_process.start()
925
926     def wait_for_instantiate(self):
927         return self._wait_for_process()
928
929     def _check_status(self):
930         raise NotImplementedError
931
932     def _wait_for_process(self):
933         while True:
934             if not self._tg_process.is_alive():
935                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
936             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
937             time.sleep(1)
938             status = self._check_status()
939             if status == 0:
940                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
941                 return self._tg_process.exitcode
942
943     def _traffic_runner(self, traffic_profile):
944         LOG.info("Starting %s client...", self.APP_NAME)
945         self.resource_helper.run_traffic(traffic_profile)
946
947     def run_traffic(self, traffic_profile):
948         """ Generate traffic on the wire according to the given params.
949         Method is non-blocking, returns immediately when traffic process
950         is running. Mandatory.
951
952         :param traffic_profile:
953         :return: True/False
954         """
955         self._traffic_process = Process(target=self._traffic_runner,
956                                         args=(traffic_profile,))
957         self._traffic_process.start()
958         # Wait for traffic process to start
959         while self.resource_helper.client_started.value == 0:
960             time.sleep(self.RUN_WAIT)
961
962         return self._traffic_process.is_alive()
963
964     def listen_traffic(self, traffic_profile):
965         """ Listen to traffic with the given parameters.
966         Method is non-blocking, returns immediately when traffic process
967         is running. Optional.
968
969         :param traffic_profile:
970         :return: True/False
971         """
972         pass
973
974     def verify_traffic(self, traffic_profile):
975         """ Verify captured traffic after it has ended. Optional.
976
977         :param traffic_profile:
978         :return: dict
979         """
980         pass
981
982     def collect_kpi(self):
983         result = self.resource_helper.collect_kpi()
984         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
985         return result
986
    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Sets ``traffic_finished`` and terminates the traffic process if
        run_traffic() has created one.

        NOTE(review): the statements visible here neither return a value nor
        stop the server process started by instantiate() -- confirm against
        the callers whether either is expected.

        :return: True/False
        """
        self.traffic_finished = True
        # _traffic_process stays None until run_traffic() has been called
        if self._traffic_process is not None:
            self._traffic_process.terminate()