Enable static cgnapt functionality.
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.cpu import CpuSysCores
34 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
37 from yardstick.network_services.nfvi.resource import ResourceProfile
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
40 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
41 from yardstick.network_services.utils import get_nsb_option
42
43 from trex_stl_lib.trex_stl_client import STLClient
44 from trex_stl_lib.trex_stl_client import LoggerApi
45 from trex_stl_lib.trex_stl_exceptions import STLError
46
47 from yardstick.ssh import AutoConnectSSH
48
49 DPDK_VERSION = "dpdk-16.07"
50
51 LOG = logging.getLogger(__name__)
52
53
54 REMOTE_TMP = "/tmp"
55 DEFAULT_VNF_TIMEOUT = 3600
56 PROCESS_JOIN_TIMEOUT = 3
57
58
59 class VnfSshHelper(AutoConnectSSH):
60
61     def __init__(self, node, bin_path, wait=None):
62         self.node = node
63         kwargs = self.args_from_node(self.node)
64         if wait:
65             kwargs.setdefault('wait', wait)
66
67         super(VnfSshHelper, self).__init__(**kwargs)
68         self.bin_path = bin_path
69
70     @staticmethod
71     def get_class():
72         # must return static class name, anything else refers to the calling class
73         # i.e. the subclass, not the superclass
74         return VnfSshHelper
75
76     def copy(self):
77         # this copy constructor is different from SSH classes, since it uses node
78         return self.get_class()(self.node, self.bin_path)
79
80     def upload_config_file(self, prefix, content):
81         cfg_file = os.path.join(REMOTE_TMP, prefix)
82         LOG.debug(content)
83         file_obj = cStringIO(content)
84         self.put_file_obj(file_obj, cfg_file)
85         return cfg_file
86
87     def join_bin_path(self, *args):
88         return os.path.join(self.bin_path, *args)
89
90     def provision_tool(self, tool_path=None, tool_file=None):
91         if tool_path is None:
92             tool_path = self.bin_path
93         return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
94
95
96 class SetupEnvHelper(object):
97
98     CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
99     CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
100     CORES = []
101     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
102     PIPELINE_COMMAND = ''
103     VNF_TYPE = "SAMPLE"
104
105     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
106         super(SetupEnvHelper, self).__init__()
107         self.vnfd_helper = vnfd_helper
108         self.ssh_helper = ssh_helper
109         self.scenario_helper = scenario_helper
110
111     def build_config(self):
112         raise NotImplementedError
113
114     def setup_vnf_environment(self):
115         pass
116
117     def kill_vnf(self):
118         pass
119
120     def tear_down(self):
121         raise NotImplementedError
122
123
124 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
125
126     APP_NAME = 'DpdkVnf'
127     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
128
129     HW_DEFAULT_CORE = 3
130     SW_DEFAULT_CORE = 2
131
132     @staticmethod
133     def _update_packet_type(ip_pipeline_cfg, traffic_options):
134         match_str = 'pkt_type = ipv4'
135         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
136         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
137         return pipeline_config_str
138
139     @classmethod
140     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
141         traffic_type = traffic_options['traffic_type']
142
143         if traffic_options['vnf_type'] is not cls.APP_NAME:
144             match_str = 'traffic_type = 4'
145             replace_str = 'traffic_type = {0}'.format(traffic_type)
146
147         elif traffic_type == 4:
148             match_str = 'pkt_type = ipv4'
149             replace_str = 'pkt_type = ipv4'
150
151         else:
152             match_str = 'pkt_type = ipv4'
153             replace_str = 'pkt_type = ipv6'
154
155         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
156         return pipeline_config_str
157
158     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
159         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
160         self.all_ports = None
161         self.bound_pci = None
162         self.socket = None
163         self.used_drivers = None
164         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
165
166     def _setup_hugepages(self):
167         cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
168         hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
169
170         memory_path = \
171             '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
172         self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
173
174         if hugepages == "2048kB":
175             pages = 8192
176         else:
177             pages = 16
178
179         self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
180
181     def build_config(self):
182         vnf_cfg = self.scenario_helper.vnf_cfg
183         task_path = self.scenario_helper.task_path
184
185         lb_count = vnf_cfg.get('lb_count', 3)
186         lb_config = vnf_cfg.get('lb_config', 'SW')
187         worker_config = vnf_cfg.get('worker_config', '1C/1T')
188         worker_threads = vnf_cfg.get('worker_threads', 3)
189
190         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
191         traffic_options = {
192             'traffic_type': traffic_type,
193             'pkt_type': 'ipv%s' % traffic_type,
194             'vnf_type': self.VNF_TYPE,
195         }
196
197         config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
198         config_basename = posixpath.basename(self.CFG_CONFIG)
199         script_basename = posixpath.basename(self.CFG_SCRIPT)
200         multiport = MultiPortConfig(self.scenario_helper.topology,
201                                     config_tpl_cfg,
202                                     config_basename,
203                                     self.vnfd_helper,
204                                     self.VNF_TYPE,
205                                     lb_count,
206                                     worker_threads,
207                                     worker_config,
208                                     lb_config,
209                                     self.socket)
210
211         multiport.generate_config()
212         with open(self.CFG_CONFIG) as handle:
213             new_config = handle.read()
214
215         new_config = self._update_traffic_type(new_config, traffic_options)
216         new_config = self._update_packet_type(new_config, traffic_options)
217
218         self.ssh_helper.upload_config_file(config_basename, new_config)
219         self.ssh_helper.upload_config_file(script_basename,
220                                            multiport.generate_script(self.vnfd_helper))
221
222         LOG.info("Provision and start the %s", self.APP_NAME)
223         self._build_pipeline_kwargs()
224         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
225
226     def _build_pipeline_kwargs(self):
227         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
228         # count the number of actual ports in the list of pairs
229         # remove duplicate ports
230         # this is really a mapping from LINK ID to DPDK PMD ID
231         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
232         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
233         ports = self.vnfd_helper.port_pairs.all_ports
234         port_nums = self.vnfd_helper.port_nums(ports)
235         # create mask from all the dpdk port numbers
236         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
237         self.pipeline_kwargs = {
238             'cfg_file': self.CFG_CONFIG,
239             'script': self.CFG_SCRIPT,
240             'port_mask_hex': ports_mask_hex,
241             'tool_path': tool_path,
242         }
243
244     def _get_app_cpu(self):
245         if self.CORES:
246             return self.CORES
247
248         vnf_cfg = self.scenario_helper.vnf_cfg
249         sys_obj = CpuSysCores(self.ssh_helper)
250         self.sys_cpu = sys_obj.get_core_socket()
251         num_core = int(vnf_cfg["worker_threads"])
252         if vnf_cfg.get("lb_config", "SW") == 'HW':
253             num_core += self.HW_DEFAULT_CORE
254         else:
255             num_core += self.SW_DEFAULT_CORE
256         app_cpu = self.sys_cpu[str(self.socket)][:num_core]
257         return app_cpu
258
259     def _get_cpu_sibling_list(self, cores=None):
260         if cores is None:
261             cores = self._get_app_cpu()
262         sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
263         awk_template = "awk -F: '{ print $1 }' < %s"
264         sys_path = "/sys/devices/system/cpu/"
265         cpu_topology = []
266         try:
267             for core in cores:
268                 sys_cmd = sys_cmd_template % (sys_path, core)
269                 cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
270                 cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
271
272             return cpu_topology
273         except Exception:
274             return []
275
276     def _validate_cpu_cfg(self):
277         return self._get_cpu_sibling_list()
278
279     def setup_vnf_environment(self):
280         self._setup_dpdk()
281         self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
282         self.kill_vnf()
283         # bind before _setup_resources so we can use dpdk_port_num
284         self._detect_and_bind_drivers()
285         resource = self._setup_resources()
286         return resource
287
288     def kill_vnf(self):
289         # pkill is not matching, debug with pgrep
290         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
291         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
292         # have to use exact match
293         # try using killall to match
294         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
295
296     def _setup_dpdk(self):
297         """ setup dpdk environment needed for vnf to run """
298
299         self._setup_hugepages()
300         self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
301
302         exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
303         if exit_status == 0:
304             return
305
306         dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
307         dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
308         exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
309         if exit_status != 0:
310             self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
311
312     def get_collectd_options(self):
313         options = self.scenario_helper.all_options.get("collectd", {})
314         # override with specific node settings
315         options.update(self.scenario_helper.options.get("collectd", {}))
316         return options
317
318     def _setup_resources(self):
319         # what is this magic?  how do we know which socket is for which port?
320         # what about quad-socket?
321         if any(v[5] == "0" for v in self.bound_pci):
322             self.socket = 0
323         else:
324             self.socket = 1
325
326         cores = self._validate_cpu_cfg()
327         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
328         # this won't work because we don't have DPDK port numbers yet
329         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
330         port_names = (intf["name"] for intf in ports)
331         collectd_options = self.get_collectd_options()
332         plugins = collectd_options.get("plugins", {})
333         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
334         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores,
335                                plugins=plugins, interval=collectd_options.get("interval"),
336                                timeout=self.scenario_helper.timeout)
337
338     def _detect_and_bind_drivers(self):
339         interfaces = self.vnfd_helper.interfaces
340
341         self.dpdk_bind_helper.read_status()
342         self.dpdk_bind_helper.save_used_drivers()
343
344         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
345
346         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
347         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
348             try:
349                 intf = next(v for v in interfaces
350                             if vpci == v['virtual-interface']['vpci'])
351                 # force to int
352                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
353             except:
354                 pass
355         time.sleep(2)
356
357     def get_local_iface_name_by_vpci(self, vpci):
358         find_net_cmd = self.FIND_NET_CMD.format(vpci)
359         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
360         if exit_status == 0:
361             return stdout
362         return None
363
364     def tear_down(self):
365         self.dpdk_bind_helper.rebind_drivers()
366
367
368 class ResourceHelper(object):
369
370     COLLECT_KPI = ''
371     MAKE_INSTALL = 'cd {0} && make && sudo make install'
372     RESOURCE_WORD = 'sample'
373
374     COLLECT_MAP = {}
375
376     def __init__(self, setup_helper):
377         super(ResourceHelper, self).__init__()
378         self.resource = None
379         self.setup_helper = setup_helper
380         self.ssh_helper = setup_helper.ssh_helper
381
382     def setup(self):
383         self.resource = self.setup_helper.setup_vnf_environment()
384
385     def generate_cfg(self):
386         pass
387
388     def _collect_resource_kpi(self):
389         result = {}
390         status = self.resource.check_if_sa_running("collectd")[0]
391         if status == 0:
392             result = self.resource.amqp_collect_nfvi_kpi()
393
394         result = {"core": result}
395         return result
396
397     def start_collect(self):
398         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
399         self.resource.start()
400         self.resource.amqp_process_for_nfvi_kpi()
401
402     def stop_collect(self):
403         if self.resource:
404             self.resource.stop()
405
406     def collect_kpi(self):
407         return self._collect_resource_kpi()
408
409
410 class ClientResourceHelper(ResourceHelper):
411
412     RUN_DURATION = 60
413     QUEUE_WAIT_TIME = 5
414     SYNC_PORT = 1
415     ASYNC_PORT = 2
416
417     def __init__(self, setup_helper):
418         super(ClientResourceHelper, self).__init__(setup_helper)
419         self.vnfd_helper = setup_helper.vnfd_helper
420         self.scenario_helper = setup_helper.scenario_helper
421
422         self.client = None
423         self.client_started = Value('i', 0)
424         self.all_ports = None
425         self._queue = Queue()
426         self._result = {}
427         self._terminated = Value('i', 0)
428
429     def _build_ports(self):
430         self.networks = self.vnfd_helper.port_pairs.networks
431         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
432         self.downlink_ports = \
433             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
434         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
435
436     def get_stats(self, *args, **kwargs):
437         try:
438             return self.client.get_stats(*args, **kwargs)
439         except STLError:
440             LOG.exception("TRex client not connected")
441             return {}
442
443     def generate_samples(self, ports, key=None, default=None):
444         # needs to be used ports
445         last_result = self.get_stats(ports)
446         key_value = last_result.get(key, default)
447
448         if not isinstance(last_result, Mapping):  # added for mock unit test
449             self._terminated.value = 1
450             return {}
451
452         samples = {}
453         # recalculate port for interface and see if it matches ports provided
454         for intf in self.vnfd_helper.interfaces:
455             name = intf["name"]
456             port = self.vnfd_helper.port_num(name)
457             if port in ports:
458                 xe_value = last_result.get(port, {})
459                 samples[name] = {
460                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
461                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
462                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
463                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
464                     "in_packets": int(xe_value.get("ipackets", 0)),
465                     "out_packets": int(xe_value.get("opackets", 0)),
466                 }
467                 if key:
468                     samples[name][key] = key_value
469         return samples
470
471     def _run_traffic_once(self, traffic_profile):
472         traffic_profile.execute_traffic(self)
473         self.client_started.value = 1
474         time.sleep(self.RUN_DURATION)
475         samples = self.generate_samples(traffic_profile.ports)
476         time.sleep(self.QUEUE_WAIT_TIME)
477         self._queue.put(samples)
478
479     def run_traffic(self, traffic_profile):
480         # if we don't do this we can hang waiting for the queue to drain
481         # have to do this in the subprocess
482         self._queue.cancel_join_thread()
483         # fixme: fix passing correct trex config file,
484         # instead of searching the default path
485         try:
486             self._build_ports()
487             self.client = self._connect()
488             self.client.reset(ports=self.all_ports)
489             self.client.remove_all_streams(self.all_ports)  # remove all streams
490             traffic_profile.register_generator(self)
491
492             while self._terminated.value == 0:
493                 self._run_traffic_once(traffic_profile)
494
495             self.client.stop(self.all_ports)
496             self.client.disconnect()
497             self._terminated.value = 0
498         except STLError:
499             if self._terminated.value:
500                 LOG.debug("traffic generator is stopped")
501                 return  # return if trex/tg server is stopped.
502             raise
503
504     def terminate(self):
505         self._terminated.value = 1  # stop client
506
507     def clear_stats(self, ports=None):
508         if ports is None:
509             ports = self.all_ports
510         self.client.clear_stats(ports=ports)
511
512     def start(self, ports=None, *args, **kwargs):
513         if ports is None:
514             ports = self.all_ports
515         self.client.start(ports=ports, *args, **kwargs)
516
517     def collect_kpi(self):
518         if not self._queue.empty():
519             kpi = self._queue.get()
520             self._result.update(kpi)
521             LOG.debug("Got KPIs from _queue for {0} {1}".format(
522                 self.scenario_helper.name, self.RESOURCE_WORD))
523         return self._result
524
525     def _connect(self, client=None):
526         if client is None:
527             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
528                                server=self.vnfd_helper.mgmt_interface["ip"],
529                                verbose_level=LoggerApi.VERBOSE_QUIET)
530
531         # try to connect with 5s intervals, 30s max
532         for idx in range(6):
533             try:
534                 client.connect()
535                 break
536             except STLError:
537                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
538                 time.sleep(5)
539         return client
540
541
542 class Rfc2544ResourceHelper(object):
543
544     DEFAULT_CORRELATED_TRAFFIC = False
545     DEFAULT_LATENCY = False
546     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
547
548     def __init__(self, scenario_helper):
549         super(Rfc2544ResourceHelper, self).__init__()
550         self.scenario_helper = scenario_helper
551         self._correlated_traffic = None
552         self.iteration = Value('i', 0)
553         self._latency = None
554         self._rfc2544 = None
555         self._tolerance_low = None
556         self._tolerance_high = None
557
558     @property
559     def rfc2544(self):
560         if self._rfc2544 is None:
561             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
562         return self._rfc2544
563
564     @property
565     def tolerance_low(self):
566         if self._tolerance_low is None:
567             self.get_rfc_tolerance()
568         return self._tolerance_low
569
570     @property
571     def tolerance_high(self):
572         if self._tolerance_high is None:
573             self.get_rfc_tolerance()
574         return self._tolerance_high
575
576     @property
577     def correlated_traffic(self):
578         if self._correlated_traffic is None:
579             self._correlated_traffic = \
580                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
581
582         return self._correlated_traffic
583
584     @property
585     def latency(self):
586         if self._latency is None:
587             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
588         return self._latency
589
590     def get_rfc2544(self, name, default=None):
591         return self.rfc2544.get(name, default)
592
593     def get_rfc_tolerance(self):
594         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
595         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
596         self._tolerance_low = next(tolerance_iter)
597         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
598
599
600 class SampleVNFDeployHelper(object):
601
602     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
603     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
604     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
605
606     def __init__(self, vnfd_helper, ssh_helper):
607         super(SampleVNFDeployHelper, self).__init__()
608         self.ssh_helper = ssh_helper
609         self.vnfd_helper = vnfd_helper
610
611     def deploy_vnfs(self, app_name):
612         vnf_bin = self.ssh_helper.join_bin_path(app_name)
613         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
614         if not exit_status:
615             return
616
617         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
618         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
619         time.sleep(2)
620         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
621         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
622
623         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
624         time.sleep(2)
625         http_proxy = os.environ.get('http_proxy', '')
626         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
627         LOG.debug(cmd)
628         self.ssh_helper.execute(cmd)
629         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
630         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
631         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
632
633
634 class ScenarioHelper(object):
635
636     DEFAULT_VNF_CFG = {
637         'lb_config': 'SW',
638         'lb_count': 1,
639         'worker_config': '1C/1T',
640         'worker_threads': 1,
641     }
642
643     def __init__(self, name):
644         self.name = name
645         self.scenario_cfg = None
646
647     @property
648     def task_path(self):
649         return self.scenario_cfg['task_path']
650
651     @property
652     def nodes(self):
653         return self.scenario_cfg.get('nodes')
654
655     @property
656     def all_options(self):
657         return self.scenario_cfg.get('options', {})
658
659     @property
660     def options(self):
661         return self.all_options.get(self.name, {})
662
663     @property
664     def vnf_cfg(self):
665         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
666
667     @property
668     def topology(self):
669         return self.scenario_cfg['topology']
670
671     @property
672     def timeout(self):
673         return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
674
675
676 class SampleVNF(GenericVNF):
677     """ Class providing file-like API for generic VNF implementation """
678
679     VNF_PROMPT = "pipeline>"
680     WAIT_TIME = 1
681     WAIT_TIME_FOR_SCRIPT = 10
682     APP_NAME = "SampleVNF"
683     # we run the VNF interactively, so the ssh command will timeout after this long
684
685     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
686         super(SampleVNF, self).__init__(name, vnfd)
687         self.bin_path = get_nsb_option('bin_path', '')
688
689         self.scenario_helper = ScenarioHelper(self.name)
690         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
691
692         if setup_env_helper_type is None:
693             setup_env_helper_type = SetupEnvHelper
694
695         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
696                                                   self.ssh_helper,
697                                                   self.scenario_helper)
698
699         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
700
701         if resource_helper_type is None:
702             resource_helper_type = ResourceHelper
703
704         self.resource_helper = resource_helper_type(self.setup_helper)
705
706         self.context_cfg = None
707         self.nfvi_context = None
708         self.pipeline_kwargs = {}
709         self.uplink_ports = None
710         self.downlink_ports = None
711         # TODO(esm): make QueueFileWrapper invert-able so that we
712         #            never have to manage the queues
713         self.q_in = Queue()
714         self.q_out = Queue()
715         self.queue_wrapper = None
716         self.run_kwargs = {}
717         self.used_drivers = {}
718         self.vnf_port_pairs = None
719         self._vnf_process = None
720
721     def _build_ports(self):
722         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
723         self.networks = self._port_pairs.networks
724         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
725         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
726         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
727
728     def _get_route_data(self, route_index, route_type):
729         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
730         for _ in range(route_index):
731             next(route_iter, '')
732         return next(route_iter, {}).get(route_type, '')
733
734     def _get_port0localip6(self):
735         return_value = self._get_route_data(0, 'network')
736         LOG.info("_get_port0localip6 : %s", return_value)
737         return return_value
738
739     def _get_port1localip6(self):
740         return_value = self._get_route_data(1, 'network')
741         LOG.info("_get_port1localip6 : %s", return_value)
742         return return_value
743
744     def _get_port0prefixlen6(self):
745         return_value = self._get_route_data(0, 'netmask')
746         LOG.info("_get_port0prefixlen6 : %s", return_value)
747         return return_value
748
749     def _get_port1prefixlen6(self):
750         return_value = self._get_route_data(1, 'netmask')
751         LOG.info("_get_port1prefixlen6 : %s", return_value)
752         return return_value
753
754     def _get_port0gateway6(self):
755         return_value = self._get_route_data(0, 'network')
756         LOG.info("_get_port0gateway6 : %s", return_value)
757         return return_value
758
759     def _get_port1gateway6(self):
760         return_value = self._get_route_data(1, 'network')
761         LOG.info("_get_port1gateway6 : %s", return_value)
762         return return_value
763
764     def _start_vnf(self):
765         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
766         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
767         self._vnf_process = Process(name=name, target=self._run)
768         self._vnf_process.start()
769
770     def _vnf_up_post(self):
771         pass
772
773     def instantiate(self, scenario_cfg, context_cfg):
774         self.scenario_helper.scenario_cfg = scenario_cfg
775         self.context_cfg = context_cfg
776         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
777         # self.nfvi_context = None
778
779         # vnf deploy is unsupported, use ansible playbooks
780         if self.scenario_helper.options.get("vnf_deploy", False):
781             self.deploy_helper.deploy_vnfs(self.APP_NAME)
782         self.resource_helper.setup()
783         self._start_vnf()
784
785     def wait_for_instantiate(self):
786         buf = []
787         time.sleep(self.WAIT_TIME)  # Give some time for config to load
788         while True:
789             if not self._vnf_process.is_alive():
790                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
791
792             # TODO(esm): move to QueueFileWrapper
793             while self.q_out.qsize() > 0:
794                 buf.append(self.q_out.get())
795                 message = ''.join(buf)
796                 if self.VNF_PROMPT in message:
797                     LOG.info("%s VNF is up and running.", self.APP_NAME)
798                     self._vnf_up_post()
799                     self.queue_wrapper.clear()
800                     self.resource_helper.start_collect()
801                     return self._vnf_process.exitcode
802
803                 if "PANIC" in message:
804                     raise RuntimeError("Error starting %s VNF." %
805                                        self.APP_NAME)
806
807             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
808             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
809             # Send ENTER to display a new prompt in case the prompt text was corrupted
810             # by other VNF output
811             self.q_in.put('\r\n')
812
813     def _build_run_kwargs(self):
814         self.run_kwargs = {
815             'stdin': self.queue_wrapper,
816             'stdout': self.queue_wrapper,
817             'keep_stdin_open': True,
818             'pty': True,
819             'timeout': self.scenario_helper.timeout,
820         }
821
822     def _build_config(self):
823         return self.setup_helper.build_config()
824
825     def _run(self):
826         # we can't share ssh paramiko objects to force new connection
827         self.ssh_helper.drop_connection()
828         cmd = self._build_config()
829         # kill before starting
830         self.setup_helper.kill_vnf()
831
832         LOG.debug(cmd)
833         self._build_run_kwargs()
834         self.ssh_helper.run(cmd, **self.run_kwargs)
835
836     def vnf_execute(self, cmd, wait_time=2):
837         """ send cmd to vnf process """
838
839         LOG.info("%s command: %s", self.APP_NAME, cmd)
840         self.q_in.put("{}\r\n".format(cmd))
841         time.sleep(wait_time)
842         output = []
843         while self.q_out.qsize() > 0:
844             output.append(self.q_out.get())
845         return "".join(output)
846
847     def _tear_down(self):
848         pass
849
850     def terminate(self):
851         self.vnf_execute("quit")
852         self.setup_helper.kill_vnf()
853         self._tear_down()
854         self.resource_helper.stop_collect()
855         if self._vnf_process is not None:
856             # be proper and join first before we kill
857             LOG.debug("joining before terminate %s", self._vnf_process.name)
858             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
859             self._vnf_process.terminate()
860         # no terminate children here because we share processes with tg
861
862     def get_stats(self, *args, **kwargs):
863         """
864         Method for checking the statistics
865
866         :return:
867            VNF statistics
868         """
869         cmd = 'p {0} stats'.format(self.APP_WORD)
870         out = self.vnf_execute(cmd)
871         return out
872
873     def collect_kpi(self):
874         # we can't get KPIs if the VNF is down
875         check_if_process_failed(self._vnf_process)
876         stats = self.get_stats()
877         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
878         if m:
879             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
880             result["collect_stats"] = self.resource_helper.collect_kpi()
881         else:
882             result = {
883                 "packets_in": 0,
884                 "packets_fwd": 0,
885                 "packets_dropped": 0,
886             }
887         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
888         return result
889
890
891 class SampleVNFTrafficGen(GenericTrafficGen):
892     """ Class providing file-like API for generic traffic generator """
893
894     APP_NAME = 'Sample'
895     RUN_WAIT = 1
896
897     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
898         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
899         self.bin_path = get_nsb_option('bin_path', '')
900
901         self.scenario_helper = ScenarioHelper(self.name)
902         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
903
904         if setup_env_helper_type is None:
905             setup_env_helper_type = SetupEnvHelper
906
907         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
908                                                   self.ssh_helper,
909                                                   self.scenario_helper)
910
911         if resource_helper_type is None:
912             resource_helper_type = ClientResourceHelper
913
914         self.resource_helper = resource_helper_type(self.setup_helper)
915
916         self.runs_traffic = True
917         self.traffic_finished = False
918         self._tg_process = None
919         self._traffic_process = None
920
921     def _start_server(self):
922         # we can't share ssh paramiko objects to force new connection
923         self.ssh_helper.drop_connection()
924
925     def instantiate(self, scenario_cfg, context_cfg):
926         self.scenario_helper.scenario_cfg = scenario_cfg
927         self.resource_helper.generate_cfg()
928         self.resource_helper.setup()
929
930         LOG.info("Starting %s server...", self.APP_NAME)
931         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
932         self._tg_process = Process(name=name, target=self._start_server)
933         self._tg_process.start()
934
935     def wait_for_instantiate(self):
936         # overridden by subclasses
937         return self._wait_for_process()
938
939     def _check_status(self):
940         raise NotImplementedError
941
942     def _wait_for_process(self):
943         while True:
944             if not self._tg_process.is_alive():
945                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
946             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
947             time.sleep(1)
948             status = self._check_status()
949             if status == 0:
950                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
951                 return self._tg_process.exitcode
952
953     def _traffic_runner(self, traffic_profile):
954         # always drop connections first thing in new processes
955         # so we don't get paramiko errors
956         self.ssh_helper.drop_connection()
957         LOG.info("Starting %s client...", self.APP_NAME)
958         self.resource_helper.run_traffic(traffic_profile)
959
960     def run_traffic(self, traffic_profile):
961         """ Generate traffic on the wire according to the given params.
962         Method is non-blocking, returns immediately when traffic process
963         is running. Mandatory.
964
965         :param traffic_profile:
966         :return: True/False
967         """
968         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
969                                     os.getpid())
970         self._traffic_process = Process(name=name, target=self._traffic_runner,
971                                         args=(traffic_profile,))
972         self._traffic_process.start()
973         # Wait for traffic process to start
974         while self.resource_helper.client_started.value == 0:
975             time.sleep(self.RUN_WAIT)
976             # what if traffic process takes a few seconds to start?
977             if not self._traffic_process.is_alive():
978                 break
979
980         return self._traffic_process.is_alive()
981
982     def listen_traffic(self, traffic_profile):
983         """ Listen to traffic with the given parameters.
984         Method is non-blocking, returns immediately when traffic process
985         is running. Optional.
986
987         :param traffic_profile:
988         :return: True/False
989         """
990         pass
991
992     def verify_traffic(self, traffic_profile):
993         """ Verify captured traffic after it has ended. Optional.
994
995         :param traffic_profile:
996         :return: dict
997         """
998         pass
999
1000     def collect_kpi(self):
1001         # check if the tg processes have exited
1002         for proc in (self._tg_process, self._traffic_process):
1003             check_if_process_failed(proc)
1004         result = self.resource_helper.collect_kpi()
1005         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
1006         return result
1007
1008     def terminate(self):
1009         """ After this method finishes, all traffic processes should stop. Mandatory.
1010
1011         :return: True/False
1012         """
1013         self.traffic_finished = True
1014         # we must kill client before we kill the server, or the client will raise exception
1015         if self._traffic_process is not None:
1016             # be proper and try to join before terminating
1017             LOG.debug("joining before terminate %s", self._traffic_process.name)
1018             self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
1019             self._traffic_process.terminate()
1020         if self._tg_process is not None:
1021             # be proper and try to join before terminating
1022             LOG.debug("joining before terminate %s", self._tg_process.name)
1023             self._tg_process.join(PROCESS_JOIN_TIMEOUT)
1024             self._tg_process.terminate()
1025         # no terminate children here because we share processes with vnf