Yardstick TC082: move sample test case perf.yaml
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
34 from yardstick.network_services.nfvi.resource import ResourceProfile
35 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
36 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
37 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
38 from yardstick.network_services.utils import get_nsb_option
39
40 from trex_stl_lib.trex_stl_client import STLClient
41 from trex_stl_lib.trex_stl_client import LoggerApi
42 from trex_stl_lib.trex_stl_exceptions import STLError
43
44 from yardstick.ssh import AutoConnectSSH
45
46 DPDK_VERSION = "dpdk-16.07"
47
48 LOG = logging.getLogger(__name__)
49
50
51 REMOTE_TMP = "/tmp"
52
53
54 class VnfSshHelper(AutoConnectSSH):
55
56     def __init__(self, node, bin_path, wait=None):
57         self.node = node
58         kwargs = self.args_from_node(self.node)
59         if wait:
60             kwargs.setdefault('wait', wait)
61
62         super(VnfSshHelper, self).__init__(**kwargs)
63         self.bin_path = bin_path
64
65     @staticmethod
66     def get_class():
67         # must return static class name, anything else refers to the calling class
68         # i.e. the subclass, not the superclass
69         return VnfSshHelper
70
71     def copy(self):
72         # this copy constructor is different from SSH classes, since it uses node
73         return self.get_class()(self.node, self.bin_path)
74
75     def upload_config_file(self, prefix, content):
76         cfg_file = os.path.join(REMOTE_TMP, prefix)
77         LOG.debug(content)
78         file_obj = cStringIO(content)
79         self.put_file_obj(file_obj, cfg_file)
80         return cfg_file
81
82     def join_bin_path(self, *args):
83         return os.path.join(self.bin_path, *args)
84
85     def provision_tool(self, tool_path=None, tool_file=None):
86         if tool_path is None:
87             tool_path = self.bin_path
88         return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
89
90
91 class SetupEnvHelper(object):
92
93     CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
94     CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
95     CORES = []
96     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
97     PIPELINE_COMMAND = ''
98     VNF_TYPE = "SAMPLE"
99
100     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
101         super(SetupEnvHelper, self).__init__()
102         self.vnfd_helper = vnfd_helper
103         self.ssh_helper = ssh_helper
104         self.scenario_helper = scenario_helper
105
106     def _get_ports_gateway(self, name):
107         routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
108         for route in routing_table:
109             if name == route['if']:
110                 return route['gateway']
111         return None
112
113     def build_config(self):
114         raise NotImplementedError
115
116     def setup_vnf_environment(self):
117         pass
118         # raise NotImplementedError
119
120     def tear_down(self):
121         raise NotImplementedError
122
123
124 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
125
126     APP_NAME = 'DpdkVnf'
127     DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
128     DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
129     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
130
131     HW_DEFAULT_CORE = 3
132     SW_DEFAULT_CORE = 2
133
134     DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")
135
136     @staticmethod
137     def _update_packet_type(ip_pipeline_cfg, traffic_options):
138         match_str = 'pkt_type = ipv4'
139         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
140         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
141         return pipeline_config_str
142
143     @classmethod
144     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
145         traffic_type = traffic_options['traffic_type']
146
147         if traffic_options['vnf_type'] is not cls.APP_NAME:
148             match_str = 'traffic_type = 4'
149             replace_str = 'traffic_type = {0}'.format(traffic_type)
150
151         elif traffic_type == 4:
152             match_str = 'pkt_type = ipv4'
153             replace_str = 'pkt_type = ipv4'
154
155         else:
156             match_str = 'pkt_type = ipv4'
157             replace_str = 'pkt_type = ipv6'
158
159         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
160         return pipeline_config_str
161
162     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
163         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
164         self.all_ports = None
165         self.bound_pci = None
166         self._dpdk_nic_bind = None
167         self.socket = None
168         self.used_drivers = None
169
170     @property
171     def dpdk_nic_bind(self):
172         if self._dpdk_nic_bind is None:
173             self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
174         return self._dpdk_nic_bind
175
176     def _setup_hugepages(self):
177         cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
178         hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
179
180         memory_path = \
181             '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
182         self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
183
184         if hugepages == "2048kB":
185             pages = 16384
186         else:
187             pages = 16
188
189         self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
190
191     def _get_dpdk_port_num(self, name):
192         interface = self.vnfd_helper.find_interface(name=name)
193         return interface['virtual-interface']['dpdk_port_num']
194
195     def build_config(self):
196         vnf_cfg = self.scenario_helper.vnf_cfg
197         task_path = self.scenario_helper.task_path
198
199         lb_count = vnf_cfg.get('lb_count', 3)
200         lb_config = vnf_cfg.get('lb_config', 'SW')
201         worker_config = vnf_cfg.get('worker_config', '1C/1T')
202         worker_threads = vnf_cfg.get('worker_threads', 3)
203
204         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
205         traffic_options = {
206             'traffic_type': traffic_type,
207             'pkt_type': 'ipv%s' % traffic_type,
208             'vnf_type': self.VNF_TYPE,
209         }
210
211         config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
212         config_basename = posixpath.basename(self.CFG_CONFIG)
213         script_basename = posixpath.basename(self.CFG_SCRIPT)
214         multiport = MultiPortConfig(self.scenario_helper.topology,
215                                     config_tpl_cfg,
216                                     config_basename,
217                                     self.vnfd_helper.interfaces,
218                                     self.VNF_TYPE,
219                                     lb_count,
220                                     worker_threads,
221                                     worker_config,
222                                     lb_config,
223                                     self.socket)
224
225         multiport.generate_config()
226         with open(self.CFG_CONFIG) as handle:
227             new_config = handle.read()
228
229         new_config = self._update_traffic_type(new_config, traffic_options)
230         new_config = self._update_packet_type(new_config, traffic_options)
231
232         self.ssh_helper.upload_config_file(config_basename, new_config)
233         self.ssh_helper.upload_config_file(script_basename,
234                                            multiport.generate_script(self.vnfd_helper))
235         self.all_ports = multiport.port_pair_list
236
237         LOG.info("Provision and start the %s", self.APP_NAME)
238         self._build_pipeline_kwargs()
239         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
240
241     def _build_pipeline_kwargs(self):
242         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
243         ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
244         self.pipeline_kwargs = {
245             'cfg_file': self.CFG_CONFIG,
246             'script': self.CFG_SCRIPT,
247             'ports_len_hex': ports_len_hex,
248             'tool_path': tool_path,
249         }
250
251     def _get_app_cpu(self):
252         if self.CORES:
253             return self.CORES
254
255         vnf_cfg = self.scenario_helper.vnf_cfg
256         sys_obj = CpuSysCores(self.ssh_helper)
257         self.sys_cpu = sys_obj.get_core_socket()
258         num_core = int(vnf_cfg["worker_threads"])
259         if vnf_cfg.get("lb_config", "SW") == 'HW':
260             num_core += self.HW_DEFAULT_CORE
261         else:
262             num_core += self.SW_DEFAULT_CORE
263         app_cpu = self.sys_cpu[str(self.socket)][:num_core]
264         return app_cpu
265
266     def _get_cpu_sibling_list(self, cores=None):
267         if cores is None:
268             cores = self._get_app_cpu()
269         sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
270         awk_template = "awk -F: '{ print $1 }' < %s"
271         sys_path = "/sys/devices/system/cpu/"
272         cpu_topology = []
273         try:
274             for core in cores:
275                 sys_cmd = sys_cmd_template % (sys_path, core)
276                 cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
277                 cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
278
279             return cpu_topology
280         except Exception:
281             return []
282
283     def _validate_cpu_cfg(self):
284         return self._get_cpu_sibling_list()
285
286     def _find_used_drivers(self):
287         cmd = "{0} -s".format(self.dpdk_nic_bind)
288         rc, dpdk_status, _ = self.ssh_helper.execute(cmd)
289
290         self.used_drivers = {
291             vpci: (index, driver)
292             for index, (vpci, driver)
293             in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
294             if any(b.endswith(vpci) for b in self.bound_pci)
295         }
296
297     def setup_vnf_environment(self):
298         self._setup_dpdk()
299         resource = self._setup_resources()
300         self._kill_vnf()
301         self._detect_drivers()
302         return resource
303
304     def _kill_vnf(self):
305         self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
306
307     def _setup_dpdk(self):
308         """ setup dpdk environment needed for vnf to run """
309
310         self._setup_hugepages()
311         self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
312
313         exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
314         if exit_status == 0:
315             return
316
317         dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
318         dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
319         exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
320         if exit_status != 0:
321             self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
322
323     def _setup_resources(self):
324         interfaces = self.vnfd_helper.interfaces
325         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
326
327         # what is this magic?  how do we know which socket is for which port?
328         # what about quad-socket?
329         if any(v[5] == "0" for v in self.bound_pci):
330             self.socket = 0
331         else:
332             self.socket = 1
333
334         cores = self._validate_cpu_cfg()
335         return ResourceProfile(self.vnfd_helper.mgmt_interface,
336                                interfaces=self.vnfd_helper.interfaces, cores=cores)
337
338     def _detect_drivers(self):
339         interfaces = self.vnfd_helper.interfaces
340
341         self._find_used_drivers()
342         for vpci, (index, _) in self.used_drivers.items():
343             try:
344                 intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
345             except StopIteration:
346                 pass
347             else:
348                 intf1['dpdk_port_num'] = index
349
350         for vpci in self.bound_pci:
351             self._bind_dpdk('igb_uio', vpci)
352             time.sleep(2)
353
354     def _bind_dpdk(self, driver, vpci, force=True):
355         if force:
356             force = '--force '
357         else:
358             force = ''
359         cmd = self.DPDK_BIND_CMD.format(force=force,
360                                         dpdk_nic_bind=self.dpdk_nic_bind,
361                                         driver=driver,
362                                         vpci=vpci)
363         self.ssh_helper.execute(cmd)
364
365     def _detect_and_bind_dpdk(self, vpci, driver):
366         find_net_cmd = self.FIND_NET_CMD.format(vpci)
367         exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
368         if exit_status == 0:
369             # already bound
370             return None
371         self._bind_dpdk(driver, vpci)
372         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
373         if exit_status != 0:
374             # failed to bind
375             return None
376         return stdout
377
378     def _bind_kernel_devices(self):
379         for intf in self.vnfd_helper.interfaces:
380             vi = intf["virtual-interface"]
381             stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
382             if stdout is not None:
383                 vi["local_iface_name"] = posixpath.basename(stdout)
384
385     def tear_down(self):
386         for vpci, (_, driver) in self.used_drivers.items():
387             self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
388                                                                 driver=driver,
389                                                                 vpci=vpci))
390
391
392 class ResourceHelper(object):
393
394     COLLECT_KPI = ''
395     MAKE_INSTALL = 'cd {0} && make && sudo make install'
396     RESOURCE_WORD = 'sample'
397
398     COLLECT_MAP = {}
399
400     def __init__(self, setup_helper):
401         super(ResourceHelper, self).__init__()
402         self.resource = None
403         self.setup_helper = setup_helper
404         self.ssh_helper = setup_helper.ssh_helper
405
406     def setup(self):
407         self.resource = self.setup_helper.setup_vnf_environment()
408
409     def generate_cfg(self):
410         pass
411
412     def _collect_resource_kpi(self):
413         result = {}
414         status = self.resource.check_if_sa_running("collectd")[0]
415         if status:
416             result = self.resource.amqp_collect_nfvi_kpi()
417
418         result = {"core": result}
419         return result
420
421     def start_collect(self):
422         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
423         self.resource.start()
424         self.resource.amqp_process_for_nfvi_kpi()
425
426     def stop_collect(self):
427         if self.resource:
428             self.resource.stop()
429
430     def collect_kpi(self):
431         return self._collect_resource_kpi()
432
433
434 class ClientResourceHelper(ResourceHelper):
435
436     RUN_DURATION = 60
437     QUEUE_WAIT_TIME = 5
438     SYNC_PORT = 1
439     ASYNC_PORT = 2
440
441     def __init__(self, setup_helper):
442         super(ClientResourceHelper, self).__init__(setup_helper)
443         self.vnfd_helper = setup_helper.vnfd_helper
444         self.scenario_helper = setup_helper.scenario_helper
445
446         self.client = None
447         self.client_started = Value('i', 0)
448         self.my_ports = None
449         self._queue = Queue()
450         self._result = {}
451         self._terminated = Value('i', 0)
452         self._vpci_ascending = None
453
454     def _build_ports(self):
455         self.my_ports = [0, 1]
456
457     def get_stats(self, *args, **kwargs):
458         try:
459             return self.client.get_stats(*args, **kwargs)
460         except STLError:
461             LOG.exception("TRex client not connected")
462             return {}
463
464     def generate_samples(self, key=None, default=None):
465         last_result = self.get_stats(self.my_ports)
466         key_value = last_result.get(key, default)
467
468         if not isinstance(last_result, Mapping):  # added for mock unit test
469             self._terminated.value = 1
470             return {}
471
472         samples = {}
473         for vpci_idx, vpci in enumerate(self._vpci_ascending):
474             name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
475             # fixme: VNFDs KPIs values needs to be mapped to TRex structure
476             xe_value = last_result.get(vpci_idx, {})
477             samples[name] = {
478                 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
479                 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
480                 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
481                 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
482                 "in_packets": int(xe_value.get("ipackets", 0)),
483                 "out_packets": int(xe_value.get("opackets", 0)),
484             }
485             if key:
486                 samples[name][key] = key_value
487         return samples
488
489     def _run_traffic_once(self, traffic_profile):
490         traffic_profile.execute(self)
491         self.client_started.value = 1
492         time.sleep(self.RUN_DURATION)
493         samples = self.generate_samples()
494         time.sleep(self.QUEUE_WAIT_TIME)
495         self._queue.put(samples)
496
497     def run_traffic(self, traffic_profile):
498         # fixme: fix passing correct trex config file,
499         # instead of searching the default path
500         try:
501             self._build_ports()
502             self.client = self._connect()
503             self.client.reset(ports=self.my_ports)
504             self.client.remove_all_streams(self.my_ports)  # remove all streams
505             traffic_profile.register_generator(self)
506
507             while self._terminated.value == 0:
508                 self._run_traffic_once(traffic_profile)
509
510             self.client.stop(self.my_ports)
511             self.client.disconnect()
512             self._terminated.value = 0
513         except STLError:
514             if self._terminated.value:
515                 LOG.debug("traffic generator is stopped")
516                 return  # return if trex/tg server is stopped.
517             raise
518
519     def terminate(self):
520         self._terminated.value = 1  # stop client
521
522     def clear_stats(self, ports=None):
523         if ports is None:
524             ports = self.my_ports
525         self.client.clear_stats(ports=ports)
526
527     def start(self, ports=None, *args, **kwargs):
528         if ports is None:
529             ports = self.my_ports
530         self.client.start(ports=ports, *args, **kwargs)
531
532     def collect_kpi(self):
533         if not self._queue.empty():
534             kpi = self._queue.get()
535             self._result.update(kpi)
536         LOG.debug("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result))
537         return self._result
538
539     def _connect(self, client=None):
540         if client is None:
541             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
542                                server=self.vnfd_helper.mgmt_interface["ip"],
543                                verbose_level=LoggerApi.VERBOSE_QUIET)
544
545         # try to connect with 5s intervals, 30s max
546         for idx in range(6):
547             try:
548                 client.connect()
549                 break
550             except STLError:
551                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
552                 time.sleep(5)
553         return client
554
555
556 class Rfc2544ResourceHelper(object):
557
558     DEFAULT_CORRELATED_TRAFFIC = False
559     DEFAULT_LATENCY = False
560     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
561
562     def __init__(self, scenario_helper):
563         super(Rfc2544ResourceHelper, self).__init__()
564         self.scenario_helper = scenario_helper
565         self._correlated_traffic = None
566         self.iteration = Value('i', 0)
567         self._latency = None
568         self._rfc2544 = None
569         self._tolerance_low = None
570         self._tolerance_high = None
571
572     @property
573     def rfc2544(self):
574         if self._rfc2544 is None:
575             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
576         return self._rfc2544
577
578     @property
579     def tolerance_low(self):
580         if self._tolerance_low is None:
581             self.get_rfc_tolerance()
582         return self._tolerance_low
583
584     @property
585     def tolerance_high(self):
586         if self._tolerance_high is None:
587             self.get_rfc_tolerance()
588         return self._tolerance_high
589
590     @property
591     def correlated_traffic(self):
592         if self._correlated_traffic is None:
593             self._correlated_traffic = \
594                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
595
596         return self._correlated_traffic
597
598     @property
599     def latency(self):
600         if self._latency is None:
601             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
602         return self._latency
603
604     def get_rfc2544(self, name, default=None):
605         return self.rfc2544.get(name, default)
606
607     def get_rfc_tolerance(self):
608         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
609         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
610         self._tolerance_low = next(tolerance_iter)
611         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
612
613
614 class SampleVNFDeployHelper(object):
615
616     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
617     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
618     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
619
620     def __init__(self, vnfd_helper, ssh_helper):
621         super(SampleVNFDeployHelper, self).__init__()
622         self.ssh_helper = ssh_helper
623         self.vnfd_helper = vnfd_helper
624
625     DISABLE_DEPLOY = True
626
627     def deploy_vnfs(self, app_name):
628         # temp disable for now
629         if self.DISABLE_DEPLOY:
630             return
631
632         vnf_bin = self.ssh_helper.join_bin_path(app_name)
633         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
634         if not exit_status:
635             return
636
637         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
638         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
639         time.sleep(2)
640         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
641         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
642
643         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
644         time.sleep(2)
645         http_proxy = os.environ.get('http_proxy', '')
646         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
647         LOG.debug(cmd)
648         self.ssh_helper.execute(cmd)
649         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
650         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
651         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
652
653
654 class ScenarioHelper(object):
655
656     DEFAULT_VNF_CFG = {
657         'lb_config': 'SW',
658         'lb_count': 1,
659         'worker_config': '1C/1T',
660         'worker_threads': 1,
661     }
662
663     def __init__(self, name):
664         self.name = name
665         self.scenario_cfg = None
666
667     @property
668     def task_path(self):
669         return self.scenario_cfg["task_path"]
670
671     @property
672     def nodes(self):
673         return self.scenario_cfg['nodes']
674
675     @property
676     def all_options(self):
677         return self.scenario_cfg["options"]
678
679     @property
680     def options(self):
681         return self.all_options[self.name]
682
683     @property
684     def vnf_cfg(self):
685         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
686
687     @property
688     def topology(self):
689         return self.scenario_cfg['topology']
690
691
692 class SampleVNF(GenericVNF):
693     """ Class providing file-like API for generic VNF implementation """
694
695     VNF_PROMPT = "pipeline>"
696     WAIT_TIME = 1
697
698     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
699         super(SampleVNF, self).__init__(name, vnfd)
700         self.bin_path = get_nsb_option('bin_path', '')
701
702         self.scenario_helper = ScenarioHelper(self.name)
703         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
704
705         if setup_env_helper_type is None:
706             setup_env_helper_type = SetupEnvHelper
707
708         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
709                                                   self.ssh_helper,
710                                                   self.scenario_helper)
711
712         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
713
714         if resource_helper_type is None:
715             resource_helper_type = ResourceHelper
716
717         self.resource_helper = resource_helper_type(self.setup_helper)
718
719         self.all_ports = None
720         self.context_cfg = None
721         self.nfvi_context = None
722         self.pipeline_kwargs = {}
723         self.priv_ports = None
724         self.pub_ports = None
725         # TODO(esm): make QueueFileWrapper invert-able so that we
726         #            never have to manage the queues
727         self.q_in = Queue()
728         self.q_out = Queue()
729         self.queue_wrapper = None
730         self.run_kwargs = {}
731         self.scenario_cfg = None
732         self.tg_port_pairs = None
733         self.used_drivers = {}
734         self.vnf_port_pairs = None
735         self._vnf_process = None
736
737     def _get_route_data(self, route_index, route_type):
738         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
739         for _ in range(route_index):
740             next(route_iter, '')
741         return next(route_iter, {}).get(route_type, '')
742
743     def _get_port0localip6(self):
744         return_value = self._get_route_data(0, 'network')
745         LOG.info("_get_port0localip6 : %s", return_value)
746         return return_value
747
748     def _get_port1localip6(self):
749         return_value = self._get_route_data(1, 'network')
750         LOG.info("_get_port1localip6 : %s", return_value)
751         return return_value
752
753     def _get_port0prefixlen6(self):
754         return_value = self._get_route_data(0, 'netmask')
755         LOG.info("_get_port0prefixlen6 : %s", return_value)
756         return return_value
757
758     def _get_port1prefixlen6(self):
759         return_value = self._get_route_data(1, 'netmask')
760         LOG.info("_get_port1prefixlen6 : %s", return_value)
761         return return_value
762
763     def _get_port0gateway6(self):
764         return_value = self._get_route_data(0, 'network')
765         LOG.info("_get_port0gateway6 : %s", return_value)
766         return return_value
767
768     def _get_port1gateway6(self):
769         return_value = self._get_route_data(1, 'network')
770         LOG.info("_get_port1gateway6 : %s", return_value)
771         return return_value
772
773     def _start_vnf(self):
774         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
775         self._vnf_process = Process(target=self._run)
776         self._vnf_process.start()
777
778     def _vnf_up_post(self):
779         pass
780
781     def instantiate(self, scenario_cfg, context_cfg):
782         self.scenario_helper.scenario_cfg = scenario_cfg
783         self.context_cfg = context_cfg
784         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
785         # self.nfvi_context = None
786
787         self.deploy_helper.deploy_vnfs(self.APP_NAME)
788         self.resource_helper.setup()
789         self._start_vnf()
790
791     def wait_for_instantiate(self):
792         buf = []
793         time.sleep(self.WAIT_TIME)  # Give some time for config to load
794         while True:
795             if not self._vnf_process.is_alive():
796                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
797
798             # TODO(esm): move to QueueFileWrapper
799             while self.q_out.qsize() > 0:
800                 buf.append(self.q_out.get())
801                 message = ''.join(buf)
802                 if self.VNF_PROMPT in message:
803                     LOG.info("%s VNF is up and running.", self.APP_NAME)
804                     self._vnf_up_post()
805                     self.queue_wrapper.clear()
806                     self.resource_helper.start_collect()
807                     return self._vnf_process.exitcode
808
809                 if "PANIC" in message:
810                     raise RuntimeError("Error starting %s VNF." %
811                                        self.APP_NAME)
812
813             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
814             time.sleep(1)
815
816     def _build_run_kwargs(self):
817         self.run_kwargs = {
818             'stdin': self.queue_wrapper,
819             'stdout': self.queue_wrapper,
820             'keep_stdin_open': True,
821             'pty': True,
822         }
823
824     def _build_config(self):
825         return self.setup_helper.build_config()
826
827     def _run(self):
828         # we can't share ssh paramiko objects to force new connection
829         self.ssh_helper.drop_connection()
830         cmd = self._build_config()
831         # kill before starting
832         self.ssh_helper.execute("pkill {}".format(self.APP_NAME))
833
834         LOG.debug(cmd)
835         self._build_run_kwargs()
836         self.ssh_helper.run(cmd, **self.run_kwargs)
837
838     def vnf_execute(self, cmd, wait_time=2):
839         """ send cmd to vnf process """
840
841         LOG.info("%s command: %s", self.APP_NAME, cmd)
842         self.q_in.put("{}\r\n".format(cmd))
843         time.sleep(wait_time)
844         output = []
845         while self.q_out.qsize() > 0:
846             output.append(self.q_out.get())
847         return "".join(output)
848
849     def _tear_down(self):
850         pass
851
852     def terminate(self):
853         self.vnf_execute("quit")
854         if self._vnf_process:
855             self._vnf_process.terminate()
856         self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
857         self._tear_down()
858         self.resource_helper.stop_collect()
859
860     def get_stats(self, *args, **kwargs):
861         """
862         Method for checking the statistics
863
864         :return:
865            VNF statistics
866         """
867         cmd = 'p {0} stats'.format(self.APP_WORD)
868         out = self.vnf_execute(cmd)
869         return out
870
871     def collect_kpi(self):
872         stats = self.get_stats()
873         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
874         if m:
875             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
876             result["collect_stats"] = self.resource_helper.collect_kpi()
877         else:
878             result = {
879                 "packets_in": 0,
880                 "packets_fwd": 0,
881                 "packets_dropped": 0,
882             }
883         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
884         return result
885
886
887 class SampleVNFTrafficGen(GenericTrafficGen):
888     """ Class providing file-like API for generic traffic generator """
889
890     APP_NAME = 'Sample'
891     RUN_WAIT = 1
892
893     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
894         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
895         self.bin_path = get_nsb_option('bin_path', '')
896         self.name = "tgen__1"  # name in topology file
897
898         self.scenario_helper = ScenarioHelper(self.name)
899         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
900
901         if setup_env_helper_type is None:
902             setup_env_helper_type = SetupEnvHelper
903
904         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
905                                                   self.ssh_helper,
906                                                   self.scenario_helper)
907
908         if resource_helper_type is None:
909             resource_helper_type = ClientResourceHelper
910
911         self.resource_helper = resource_helper_type(self.setup_helper)
912
913         self.runs_traffic = True
914         self.traffic_finished = False
915         self.tg_port_pairs = None
916         self._tg_process = None
917         self._traffic_process = None
918
919     def _start_server(self):
920         # we can't share ssh paramiko objects to force new connection
921         self.ssh_helper.drop_connection()
922
923     def instantiate(self, scenario_cfg, context_cfg):
924         self.scenario_helper.scenario_cfg = scenario_cfg
925         self.resource_helper.generate_cfg()
926         self.setup_helper.setup_vnf_environment()
927         self.resource_helper.setup()
928
929         LOG.info("Starting %s server...", self.APP_NAME)
930         self._tg_process = Process(target=self._start_server)
931         self._tg_process.start()
932
933     def wait_for_instantiate(self):
934         # overridden by subclasses
935         return self._wait_for_process()
936
937     def _check_status(self):
938         raise NotImplementedError
939
940     def _wait_for_process(self):
941         while True:
942             if not self._tg_process.is_alive():
943                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
944             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
945             time.sleep(1)
946             status = self._check_status()
947             if status == 0:
948                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
949                 return self._tg_process.exitcode
950
951     def _traffic_runner(self, traffic_profile):
952         LOG.info("Starting %s client...", self.APP_NAME)
953         self.resource_helper.run_traffic(traffic_profile)
954
955     def run_traffic(self, traffic_profile):
956         """ Generate traffic on the wire according to the given params.
957         Method is non-blocking, returns immediately when traffic process
958         is running. Mandatory.
959
960         :param traffic_profile:
961         :return: True/False
962         """
963         self._traffic_process = Process(target=self._traffic_runner,
964                                         args=(traffic_profile,))
965         self._traffic_process.start()
966         # Wait for traffic process to start
967         while self.resource_helper.client_started.value == 0:
968             time.sleep(self.RUN_WAIT)
969             # what if traffic process takes a few seconds to start?
970             if not self._traffic_process.is_alive():
971                 break
972
973         return self._traffic_process.is_alive()
974
975     def listen_traffic(self, traffic_profile):
976         """ Listen to traffic with the given parameters.
977         Method is non-blocking, returns immediately when traffic process
978         is running. Optional.
979
980         :param traffic_profile:
981         :return: True/False
982         """
983         pass
984
985     def verify_traffic(self, traffic_profile):
986         """ Verify captured traffic after it has ended. Optional.
987
988         :param traffic_profile:
989         :return: dict
990         """
991         pass
992
993     def collect_kpi(self):
994         result = self.resource_helper.collect_kpi()
995         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
996         return result
997
998     def terminate(self):
999         """ After this method finishes, all traffic processes should stop. Mandatory.
1000
1001         :return: True/False
1002         """
1003         self.traffic_finished = True
1004         if self._traffic_process is not None:
1005             self._traffic_process.terminate()