Merge "Add yardstick report for each task"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
34 from yardstick.network_services.nfvi.resource import ResourceProfile
35 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
36 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
37 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
38 from yardstick.network_services.utils import get_nsb_option
39
40 from stl.trex_stl_lib.trex_stl_client import STLClient
41 from stl.trex_stl_lib.trex_stl_client import LoggerApi
42 from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError
43
44 from yardstick.ssh import AutoConnectSSH
45
# Name of the DPDK entry looked up under the tools bin path when the
# igb_uio module is not already loaded (see _setup_dpdk in this file).
DPDK_VERSION = "dpdk-16.07"

# Module-level logger shared by every helper class in this file.
LOG = logging.getLogger(__name__)


# Directory used for the generated VNF config and script files.
REMOTE_TMP = "/tmp"
52
53
class VnfSshHelper(AutoConnectSSH):
    """SSH connection built from a node description, aware of the tools bin path.

    The connection arguments are derived from ``node`` through
    ``args_from_node``; ``bin_path`` is remembered so tool and binary paths can
    be resolved relative to it.
    """

    def __init__(self, node, bin_path, wait=None):
        """Create the helper.

        :param node: node description handed to ``args_from_node``
        :param bin_path: directory holding the tools / VNF binaries
        :param wait: optional value used for the ``wait`` connection argument
            when the node description does not already provide one
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            # never override a 'wait' value coming from the node itself
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return the class used by :meth:`copy` to build a new helper."""
        # must return static class name, anything else refers to the calling class
        # i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        """Return a new helper created from the same node and bin path."""
        # this copy constructor is different from SSH classes, since it uses node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Send ``content`` as a file named ``prefix`` under ``REMOTE_TMP``.

        :param prefix: file name to create below ``REMOTE_TMP``
        :param content: text to store in the file
        :return: full path of the uploaded file
        """
        LOG.debug(content)
        destination = os.path.join(REMOTE_TMP, prefix)
        self.put_file_obj(cStringIO(content), destination)
        return destination

    def join_bin_path(self, *args):
        """Join ``args`` onto the configured bin path."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Provision ``tool_file``, defaulting ``tool_path`` to the bin path."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
89
90
class SetupEnvHelper(object):
    """Base helper describing how a VNF environment is prepared.

    Subclasses override the class constants and implement
    :meth:`build_config` and :meth:`tear_down`.
    """

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    CORES = []
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the three collaborating helpers."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper

    def _get_ports_gateway(self, name):
        """Return the gateway of the first route bound to interface ``name``.

        Routes are read from the ``routing_table`` entry of ``vdu0``.

        :return: the gateway value, or ``None`` when no route matches
        """
        routes = self.vnfd_helper.vdu0.get('routing_table', [])
        matching_gateways = (entry['gateway'] for entry in routes if name == entry['if'])
        return next(matching_gateways, None)

    def build_config(self):
        """Build the VNF configuration; must be provided by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the environment; the base implementation does nothing."""
        # raise NotImplementedError
        return None

    def tear_down(self):
        """Undo the environment setup; must be provided by subclasses."""
        raise NotImplementedError
122
123
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment setup for DPDK based sample VNFs.

    Configures hugepages, loads the ``igb_uio`` module, binds the VNF
    interfaces to it, and generates/uploads the pipeline configuration and
    script before returning the command line used to start the VNF.
    """

    APP_NAME = 'DpdkVnf'
    # command templates filled in by _bind_dpdk() / tear_down()
    DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
    DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    # extra cores added to ``worker_threads`` for the HW / SW ``lb_config``
    HW_DEFAULT_CORE = 3
    SW_DEFAULT_CORE = 2

    # captures (pci address, driver name) pairs from the bind tool status output
    DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace ``pkt_type = ipv4`` with the configured packet type.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing the ``pkt_type`` key
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Rewrite the traffic type (or packet type) in the pipeline config.

        When ``vnf_type`` differs from ``APP_NAME`` the ``traffic_type = 4``
        entry is replaced with the configured traffic type.  Otherwise the
        ``pkt_type = ipv4`` entry is kept for traffic type 4 and switched to
        ``ipv6`` for anything else.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing ``traffic_type`` and ``vnf_type``
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # compare by value: identity of two equal strings is not guaranteed
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Initialise the helper; all state is filled in during setup."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self._dpdk_nic_bind = None
        self.socket = None
        self.used_drivers = None

    @property
    def dpdk_nic_bind(self):
        """Path of the ``dpdk-devbind.py`` tool, provisioned on first access."""
        if self._dpdk_nic_bind is None:
            self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
        return self._dpdk_nic_bind

    def _setup_hugepages(self):
        """Set the number of hugepages based on the reported hugepage size."""
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # the current value is read but its result is not used
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        if hugepages == "2048kB":
            pages = 16384
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def _get_dpdk_port_num(self, name):
        """Return the ``dpdk_port_num`` of the interface called ``name``."""
        # NOTE(review): this reads the value from 'virtual-interface' while
        # _detect_drivers() stores it on the interface itself -- confirm
        interface = self.vnfd_helper.find_interface(name=name)
        return interface['virtual-interface']['dpdk_port_num']

    def build_config(self):
        """Generate and upload the VNF config and script.

        :return: ``PIPELINE_COMMAND`` formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper.interfaces,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # read back the generated config so the traffic settings can be patched
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))
        self.all_ports = multiport.port_pair_list

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Populate ``self.pipeline_kwargs`` used to format the start command."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # bit mask with len(all_ports) + 1 low bits set, as a hex string
        ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'ports_len_hex': ports_len_hex,
            'tool_path': tool_path,
        }

    def _get_app_cpu(self):
        """Return the cores the application should use.

        ``CORES`` wins when set; otherwise the first ``worker_threads`` plus
        the HW/SW default number of cores of the selected socket are used.
        """
        if self.CORES:
            return self.CORES

        vnf_cfg = self.scenario_helper.vnf_cfg
        sys_obj = CpuSysCores(self.ssh_helper)
        self.sys_cpu = sys_obj.get_core_socket()
        num_core = int(vnf_cfg["worker_threads"])
        if vnf_cfg.get("lb_config", "SW") == 'HW':
            num_core += self.HW_DEFAULT_CORE
        else:
            num_core += self.SW_DEFAULT_CORE
        app_cpu = self.sys_cpu[str(self.socket)][:num_core]
        return app_cpu

    def _get_cpu_sibling_list(self, cores=None):
        """Return the thread siblings of ``cores`` as a flat list of strings.

        :param cores: cores to inspect; defaults to :meth:`_get_app_cpu`
        :return: sibling cpu ids, or ``[]`` when they cannot be read
        """
        if cores is None:
            cores = self._get_app_cpu()
        sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
        awk_template = "awk -F: '{ print $1 }' < %s"
        sys_path = "/sys/devices/system/cpu/"
        cpu_topology = []
        try:
            for core in cores:
                sys_cmd = sys_cmd_template % (sys_path, core)
                cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
                cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))

            return cpu_topology
        except Exception:  # best effort: callers expect a list in any case
            LOG.exception("Unable to read the CPU sibling list")
            return []

    def _validate_cpu_cfg(self):
        """Return the sibling list of the application cores."""
        return self._get_cpu_sibling_list()

    def _find_used_drivers(self):
        """Record the driver currently used by each interface in ``bound_pci``.

        ``self.used_drivers`` maps the pci address reported by the bind tool
        to an ``(index, driver)`` tuple, ``index`` being the position of the
        device in the status output.
        """
        cmd = "{0} -s".format(self.dpdk_nic_bind)
        _, dpdk_status, _ = self.ssh_helper.execute(cmd)

        self.used_drivers = {
            vpci: (index, driver)
            for index, (vpci, driver)
            in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
            if any(b.endswith(vpci) for b in self.bound_pci)
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, resources and drivers.

        :return: the ``ResourceProfile`` created by :meth:`_setup_resources`
        """
        self._setup_dpdk()
        resource = self._setup_resources()
        self._kill_vnf()
        self._detect_drivers()
        return resource

    def _kill_vnf(self):
        """Kill any running instance of the application."""
        self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        # nothing more to do when the igb_uio module is loaded
        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def _setup_resources(self):
        """Select the socket and build the ``ResourceProfile``.

        Also stores the pci address of every interface in ``self.bound_pci``.
        """
        interfaces = self.vnfd_helper.interfaces
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        cores = self._validate_cpu_cfg()
        return ResourceProfile(self.vnfd_helper.mgmt_interface,
                               interfaces=self.vnfd_helper.interfaces, cores=cores)

    def _detect_drivers(self):
        """Record the current drivers, then bind every interface to igb_uio."""
        interfaces = self.vnfd_helper.interfaces

        self._find_used_drivers()
        for vpci, (index, _) in self.used_drivers.items():
            try:
                intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
            except StopIteration:
                pass
            else:
                # NOTE(review): _get_dpdk_port_num() looks this value up under
                # 'virtual-interface' -- confirm which level is intended
                intf1['dpdk_port_num'] = index

        for vpci in self.bound_pci:
            self._bind_dpdk('igb_uio', vpci)
            time.sleep(2)

    def _bind_dpdk(self, driver, vpci, force=True):
        """Bind the device ``vpci`` to ``driver`` using the bind tool.

        :param force: pass ``--force`` to the bind tool when true
        """
        if force:
            force = '--force '
        else:
            force = ''
        cmd = self.DPDK_BIND_CMD.format(force=force,
                                        dpdk_nic_bind=self.dpdk_nic_bind,
                                        driver=driver,
                                        vpci=vpci)
        self.ssh_helper.execute(cmd)

    def _detect_and_bind_dpdk(self, vpci, driver):
        """Bind ``vpci`` to ``driver`` unless a net device already exists.

        :return: output of the net device lookup after binding, or ``None``
            when the device was already bound or the bind failed
        """
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            # already bound
            return None
        self._bind_dpdk(driver, vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status != 0:
            # failed to bind
            return None
        return stdout

    def _bind_kernel_devices(self):
        """Bind each interface to its own driver and record its local name."""
        for intf in self.vnfd_helper.interfaces:
            vi = intf["virtual-interface"]
            stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
            if stdout is not None:
                vi["local_iface_name"] = posixpath.basename(stdout)

    def tear_down(self):
        """Rebind every recorded device to the driver it used before setup.

        Does nothing when no drivers were recorded, e.g. when the environment
        was never set up.
        """
        used_drivers = self.used_drivers or {}
        for vpci, (_, driver) in used_drivers.items():
            self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
                                                                driver=driver,
                                                                vpci=vpci))
390
391
class ResourceHelper(object):
    """Collects resource KPIs through the object returned by the setup helper."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember the setup helper and reuse its ssh helper."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Set up the VNF environment and keep the returned resource."""
        environment = self.setup_helper.setup_vnf_environment()
        self.resource = environment

    def generate_cfg(self):
        """Hook for subclasses; the base implementation does nothing."""
        return None

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; ``kpis`` is empty unless collectd runs."""
        collectd_running = self.resource.check_if_sa_running("collectd")[0]
        if collectd_running:
            core_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            core_kpi = {}
        return {"core": core_kpi}

    def start_collect(self):
        """Initiate the system agent and start KPI collection."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource, if one was set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
432
433
class ClientResourceHelper(ResourceHelper):
    """Resource helper driving a TRex stateless client.

    Traffic is run in a loop until :meth:`terminate` is called; the samples
    of each run are pushed on an internal queue and merged into the result
    returned by :meth:`collect_kpi`.
    """

    # seconds traffic runs before samples are taken
    RUN_DURATION = 60
    # seconds waited between taking the samples and queueing them
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Initialise the client state from ``setup_helper``."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.my_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)
        # expected to be populated before generate_samples() is used;
        # nothing in this class assigns it
        self._vpci_ascending = None

    def _build_ports(self):
        """Select the ports used by the client."""
        self.my_ports = [0, 1]

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` when it is not connected."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLStateError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, key=None, default=None):
        """Build per-interface samples from the latest client statistics.

        :param key: optional extra statistic copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: dict mapping interface name to its sample; ``{}`` when the
            statistics are not a mapping, in which case the traffic loop is
            also asked to terminate
        """
        last_result = self.get_stats(self.my_ports)

        # validate before touching the result, it may not support .get()
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        for vpci_idx, vpci in enumerate(self._vpci_ascending):
            name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
            # fixme: VNFDs KPIs values needs to be mapped to TRex structure
            xe_value = last_result.get(vpci_idx, {})
            samples[name] = {
                "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                "in_packets": int(xe_value.get("ipackets", 0)),
                "out_packets": int(xe_value.get("opackets", 0)),
            }
            if key:
                samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Execute the profile once and queue the resulting samples."""
        traffic_profile.execute(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples()
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run ``traffic_profile`` until terminated."""
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        self._build_ports()
        self.client = self._connect()
        self.client.reset(ports=self.my_ports)
        self.client.remove_all_streams(self.my_ports)  # remove all streams
        traffic_profile.register_generator(self)

        while self._terminated.value == 0:
            self._run_traffic_once(traffic_profile)

        self.client.stop(self.my_ports)
        self.client.disconnect()
        # reset the flag so the traffic loop can be started again
        self._terminated.value = 0

    def terminate(self):
        """Ask the traffic loop to stop after the current run."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics of ``ports`` (default: own ports)."""
        if ports is None:
            ports = self.my_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on ``ports`` (default: own ports)."""
        if ports is None:
            ports = self.my_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set, if any, and return the result."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
        # lazy arguments: the message is only built when debug is enabled
        LOG.debug("Collect %s KPIs %s", self.RESOURCE_WORD, self._result)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` to the TRex server, creating it when needed.

        :return: the client; it is returned even when every attempt failed
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
548
549
class Rfc2544ResourceHelper(object):
    """Cached access to the ``rfc2544`` section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Store the scenario helper; every option is resolved on first use."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``rfc2544`` options, read once from the scenario helper."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is not None:
            return self._tolerance_low
        self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is not None:
            return self._tolerance_high
        self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The ``correlated_traffic`` option, defaulting to the class value."""
        if self._correlated_traffic is not None:
            return self._correlated_traffic
        self._correlated_traffic = self.get_rfc2544('correlated_traffic',
                                                    self.DEFAULT_CORRELATED_TRAFFIC)
        return self._correlated_traffic

    @property
    def latency(self):
        """The ``latency`` option, defaulting to the class value."""
        if self._latency is not None:
            return self._latency
        self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` of the rfc2544 section, or ``default``."""
        section = self.rfc2544
        return section.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` into the low and high tolerance.

        The value is a ``-`` separated string; the two smallest numbers become
        the bounds and a single number is used for both.
        """
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        if len(bounds) > 1:
            self._tolerance_high = bounds[1]
        else:
            self._tolerance_high = self.tolerance_low
606
607
class SampleVNFDeployHelper(object):
    """Clones, uploads and builds the sample VNF sources when needed."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    # deployment is switched off while this flag is true
    DISABLE_DEPLOY = True

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the vnfd and ssh helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build ``app_name`` and copy it into the bin path if it is missing.

        Returns immediately when deployment is disabled or when the binary is
        already found under the bin path.
        """
        # temp disable for now
        if self.DISABLE_DEPLOY:
            return

        ssh = self.ssh_helper
        target_binary = ssh.join_bin_path(app_name)
        lookup_status = ssh.execute("which %s" % target_binary)[0]
        if not lookup_status:
            return

        # fetch a fresh copy of the sources locally ...
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        # ... and replace whatever copy the ssh target holds
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_binary, target_binary))
646
647
class ScenarioHelper(object):
    """Read-only view on the scenario configuration of one named VNF.

    ``scenario_cfg`` starts as ``None`` and has to be assigned before any of
    the properties is read.
    """

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the name used to select this VNF's options."""
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The ``task_path`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg["task_path"]

    @property
    def nodes(self):
        """The ``nodes`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['nodes']

    @property
    def all_options(self):
        """The complete ``options`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg["options"]

    @property
    def options(self):
        """The options stored under this helper's name."""
        everything = self.all_options
        return everything[self.name]

    @property
    def vnf_cfg(self):
        """The ``vnf_config`` option, or ``DEFAULT_VNF_CFG`` when absent."""
        own_options = self.options
        return own_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The ``topology`` entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['topology']
684
685
class SampleVNF(GenericVNF):
    """Base class for sample VNFs run over ssh in a separate process.

    The VNF application is started in a child process; its input and output
    go through two queues so commands can be sent to, and replies read from,
    the application prompt.  The class reads ``APP_NAME``, ``APP_WORD``,
    ``COLLECT_KPI`` and ``COLLECT_MAP`` from ``self`` but does not define
    them -- presumably subclasses provide them.
    """

    # prompt text that signals the application is ready
    VNF_PROMPT = "pipeline>"
    # seconds to wait before polling for the prompt
    WAIT_TIME = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the helpers and the queues used to talk to the VNF.

        :param name: VNF name, also used to select the scenario options
        :param vnfd: VNF descriptor passed to the base class
        :param setup_env_helper_type: class used to build the setup helper;
            defaults to ``SetupEnvHelper``
        :param resource_helper_type: class used to build the resource helper;
            defaults to ``ResourceHelper``
        """
        super(SampleVNF, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)

        if setup_env_helper_type is None:
            setup_env_helper_type = SetupEnvHelper

        self.setup_helper = setup_env_helper_type(self.vnfd_helper,
                                                  self.ssh_helper,
                                                  self.scenario_helper)

        self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)

        if resource_helper_type is None:
            resource_helper_type = ResourceHelper

        self.resource_helper = resource_helper_type(self.setup_helper)

        self.all_ports = None
        self.context_cfg = None
        self.nfvi_context = None
        self.pipeline_kwargs = {}
        self.priv_ports = None
        self.pub_ports = None
        # TODO(esm): make QueueFileWrapper invert-able so that we
        #            never have to manage the queues
        self.q_in = Queue()
        self.q_out = Queue()
        self.queue_wrapper = None
        self.run_kwargs = {}
        self.scenario_cfg = None
        self.tg_port_pairs = None
        self.used_drivers = {}
        self.vnf_port_pairs = None
        self._vnf_process = None

    def _get_route_data(self, route_index, route_type):
        """Return field ``route_type`` of entry ``route_index`` in ``nd_route_tbl``.

        :return: the field value, or ``''`` when the entry or field is missing
        """
        route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
        # skip the entries in front of the requested one
        for _ in range(route_index):
            next(route_iter, '')
        return next(route_iter, {}).get(route_type, '')

    def _get_port0localip6(self):
        """Return the ``network`` field of the first ``nd_route_tbl`` entry."""
        return_value = self._get_route_data(0, 'network')
        LOG.info("_get_port0localip6 : %s", return_value)
        return return_value

    def _get_port1localip6(self):
        """Return the ``network`` field of the second ``nd_route_tbl`` entry."""
        return_value = self._get_route_data(1, 'network')
        LOG.info("_get_port1localip6 : %s", return_value)
        return return_value

    def _get_port0prefixlen6(self):
        """Return the ``netmask`` field of the first ``nd_route_tbl`` entry."""
        return_value = self._get_route_data(0, 'netmask')
        LOG.info("_get_port0prefixlen6 : %s", return_value)
        return return_value

    def _get_port1prefixlen6(self):
        """Return the ``netmask`` field of the second ``nd_route_tbl`` entry."""
        return_value = self._get_route_data(1, 'netmask')
        LOG.info("_get_port1prefixlen6 : %s", return_value)
        return return_value

    def _get_port0gateway6(self):
        """Return the ``network`` field of the first ``nd_route_tbl`` entry."""
        # NOTE(review): reads 'network', same as _get_port0localip6 -- confirm
        # whether a gateway field was intended here
        return_value = self._get_route_data(0, 'network')
        LOG.info("_get_port0gateway6 : %s", return_value)
        return return_value

    def _get_port1gateway6(self):
        """Return the ``network`` field of the second ``nd_route_tbl`` entry."""
        # NOTE(review): reads 'network', same as _get_port1localip6 -- confirm
        # whether a gateway field was intended here
        return_value = self._get_route_data(1, 'network')
        LOG.info("_get_port1gateway6 : %s", return_value)
        return return_value

    def _start_vnf(self):
        """Create the queue wrapper and start :meth:`_run` in a child process."""
        self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
        self._vnf_process = Process(target=self._run)
        self._vnf_process.start()

    def _vnf_up_post(self):
        """Hook called once the VNF prompt was seen; does nothing here."""
        pass

    def instantiate(self, scenario_cfg, context_cfg):
        """Deploy the VNF, set up its resources and start it.

        :param scenario_cfg: scenario configuration, stored on the scenario helper
        :param context_cfg: context configuration, stored on this object
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg
        self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
        # self.nfvi_context = None

        self.deploy_helper.deploy_vnfs(self.APP_NAME)
        self.resource_helper.setup()
        self._start_vnf()

    def wait_for_instantiate(self):
        """Block until the VNF prompt shows up in the output queue.

        :return: the ``exitcode`` attribute of the VNF process at that time
        :raises RuntimeError: when the VNF process is no longer alive or
            ``PANIC`` appears in its output
        """
        buf = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # TODO(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                buf.append(self.q_out.get())
                # check everything received so far, not only the last chunk
                message = ''.join(buf)
                if self.VNF_PROMPT in message:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    self.resource_helper.start_collect()
                    return self._vnf_process.exitcode

                if "PANIC" in message:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(1)

    def _build_run_kwargs(self):
        """Set the keyword arguments passed to ``ssh_helper.run``."""
        self.run_kwargs = {
            'stdin': self.queue_wrapper,
            'stdout': self.queue_wrapper,
            'keep_stdin_open': True,
            'pty': True,
        }

    def _build_config(self):
        """Return the start command built by the setup helper."""
        return self.setup_helper.build_config()

    def _run(self):
        """Process target: build the config and run the VNF over ssh."""
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()
        cmd = self._build_config()
        # kill before starting
        self.ssh_helper.execute("pkill {}".format(self.APP_NAME))

        LOG.debug(cmd)
        self._build_run_kwargs()
        self.ssh_helper.run(cmd, **self.run_kwargs)

    def vnf_execute(self, cmd, wait_time=2):
        """ send cmd to vnf process

        :param cmd: command written to the VNF input queue
        :param wait_time: seconds to wait before reading the output queue
        :return: everything found in the output queue, joined into one string
        """

        LOG.info("%s command: %s", self.APP_NAME, cmd)
        self.q_in.put("{}\r\n".format(cmd))
        time.sleep(wait_time)
        output = []
        while self.q_out.qsize() > 0:
            output.append(self.q_out.get())
        return "".join(output)

    def _tear_down(self):
        """Hook called by :meth:`terminate`; does nothing here."""
        pass

    def terminate(self):
        """Quit the VNF, stop its process and stop KPI collection."""
        self.vnf_execute("quit")
        if self._vnf_process:
            self._vnf_process.terminate()
        self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
        self._tear_down()
        self.resource_helper.stop_collect()

    def get_stats(self, *args, **kwargs):
        """
        Method for checking the statistics

        The positional and keyword arguments are accepted but not used.

        :return:
           VNF statistics
        """
        cmd = 'p {0} stats'.format(self.APP_WORD)
        out = self.vnf_execute(cmd)
        return out

    def collect_kpi(self):
        """Parse the VNF statistics into a KPI dict.

        When ``COLLECT_KPI`` matches, every ``COLLECT_MAP`` entry is converted
        to ``int`` and the resource KPIs are added under ``collect_stats``;
        otherwise zeroed packet counters are returned.
        """
        stats = self.get_stats()
        m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
        if m:
            result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
            result["collect_stats"] = self.resource_helper.collect_kpi()
        else:
            result = {
                "packets_in": 0,
                "packets_fwd": 0,
                "packets_dropped": 0,
            }
        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        return result
879
880
881 class SampleVNFTrafficGen(GenericTrafficGen):
882     """ Class providing file-like API for generic traffic generator """
883
884     APP_NAME = 'Sample'
885     RUN_WAIT = 1
886
887     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
888         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
889         self.bin_path = get_nsb_option('bin_path', '')
890         self.name = "tgen__1"  # name in topology file
891
892         self.scenario_helper = ScenarioHelper(self.name)
893         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
894
895         if setup_env_helper_type is None:
896             setup_env_helper_type = SetupEnvHelper
897
898         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
899                                                   self.ssh_helper,
900                                                   self.scenario_helper)
901
902         if resource_helper_type is None:
903             resource_helper_type = ClientResourceHelper
904
905         self.resource_helper = resource_helper_type(self.setup_helper)
906
907         self.runs_traffic = True
908         self.traffic_finished = False
909         self.tg_port_pairs = None
910         self._tg_process = None
911         self._traffic_process = None
912
913     def _start_server(self):
914         # we can't share ssh paramiko objects to force new connection
915         self.ssh_helper.drop_connection()
916
917     def instantiate(self, scenario_cfg, context_cfg):
918         self.scenario_helper.scenario_cfg = scenario_cfg
919         self.resource_helper.generate_cfg()
920         self.setup_helper.setup_vnf_environment()
921         self.resource_helper.setup()
922
923         LOG.info("Starting %s server...", self.APP_NAME)
924         self._tg_process = Process(target=self._start_server)
925         self._tg_process.start()
926
927     def wait_for_instantiate(self):
928         return self._wait_for_process()
929
930     def _check_status(self):
931         raise NotImplementedError
932
933     def _wait_for_process(self):
934         while True:
935             if not self._tg_process.is_alive():
936                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
937             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
938             time.sleep(1)
939             status = self._check_status()
940             if status == 0:
941                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
942                 return self._tg_process.exitcode
943
944     def _traffic_runner(self, traffic_profile):
945         LOG.info("Starting %s client...", self.APP_NAME)
946         self.resource_helper.run_traffic(traffic_profile)
947
948     def run_traffic(self, traffic_profile):
949         """ Generate traffic on the wire according to the given params.
950         Method is non-blocking, returns immediately when traffic process
951         is running. Mandatory.
952
953         :param traffic_profile:
954         :return: True/False
955         """
956         self._traffic_process = Process(target=self._traffic_runner,
957                                         args=(traffic_profile,))
958         self._traffic_process.start()
959         # Wait for traffic process to start
960         while self.resource_helper.client_started.value == 0:
961             time.sleep(self.RUN_WAIT)
962
963         return self._traffic_process.is_alive()
964
965     def listen_traffic(self, traffic_profile):
966         """ Listen to traffic with the given parameters.
967         Method is non-blocking, returns immediately when traffic process
968         is running. Optional.
969
970         :param traffic_profile:
971         :return: True/False
972         """
973         pass
974
975     def verify_traffic(self, traffic_profile):
976         """ Verify captured traffic after it has ended. Optional.
977
978         :param traffic_profile:
979         :return: dict
980         """
981         pass
982
983     def collect_kpi(self):
984         result = self.resource_helper.collect_kpi()
985         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
986         return result
987
    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Marks the traffic as finished and terminates the traffic process if
        one has been created (run_traffic() is what assigns it).

        NOTE(review): the statements visible here contain no return, so the
        True/False below is not produced by them -- confirm intended contract.

        :return: True/False
        """
        self.traffic_finished = True
        # _traffic_process stays None until run_traffic() has been called
        if self._traffic_process is not None:
            self._traffic_process.terminate()