Merge "task: use CONF_FILE from constants"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.cpu import CpuSysCores
34 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
37 from yardstick.network_services.nfvi.resource import ResourceProfile
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
40 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
41 from yardstick.network_services.utils import get_nsb_option
42
43 from trex_stl_lib.trex_stl_client import STLClient
44 from trex_stl_lib.trex_stl_client import LoggerApi
45 from trex_stl_lib.trex_stl_exceptions import STLError
46
47 from yardstick.ssh import AutoConnectSSH
48
49 DPDK_VERSION = "dpdk-16.07"
50
51 LOG = logging.getLogger(__name__)
52
53
54 REMOTE_TMP = "/tmp"
55 DEFAULT_VNF_TIMEOUT = 3600
56 PROCESS_JOIN_TIMEOUT = 3
57
58
class VnfSshHelper(AutoConnectSSH):
    """SSH helper that remembers the node it was built from and a bin path.

    The connection keyword arguments are derived from ``node`` through
    ``args_from_node``; ``bin_path`` is the directory used by
    ``join_bin_path`` and as the default for ``provision_tool``.
    """

    def __init__(self, node, bin_path, wait=None):
        """Build the helper for ``node``.

        :param node: node description handed to ``args_from_node``
        :param bin_path: directory holding the VNF binaries/tools
        :param wait: optional value used for the ``wait`` keyword when the
                     node arguments do not already provide one
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            # never override a 'wait' value that came from the node itself
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class, not the class of the caller."""
        # must return static class name, anything else refers to the calling class
        # i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        """Create a fresh helper for the same node and bin path."""
        # this copy constructor is different from SSH classes, since it uses node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Write ``content`` to ``REMOTE_TMP/<prefix>`` and return that path."""
        remote_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join ``args`` onto the configured bin path."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Delegate to the parent ``provision_tool``, defaulting to ``bin_path``."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
94
95
class SetupEnvHelper(object):
    """Base helper holding the collaborators needed to prepare a VNF.

    ``build_config`` and ``tear_down`` must be provided by subclasses;
    ``setup_vnf_environment`` and ``kill_vnf`` are no-ops here.
    """

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    CORES = []
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the VNFD, SSH and scenario helpers for later use."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper

    def _get_ports_gateway(self, name):
        """Return the gateway of the first route whose ``if`` equals ``name``.

        Routes come from the ``routing_table`` entry of ``vdu0``; ``None`` is
        returned when no route matches.
        """
        routes = self.vnfd_helper.vdu0.get('routing_table', [])
        matching_gateways = (route['gateway'] for route in routes if name == route['if'])
        return next(matching_gateways, None)

    def build_config(self):
        """Build the VNF configuration; subclasses must implement this."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the environment; nothing to do in the base class."""

    def kill_vnf(self):
        """Stop the VNF; nothing to do in the base class."""

    def tear_down(self):
        """Undo the environment setup; subclasses must implement this."""
        raise NotImplementedError
129
130
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based VNFs.

    Sets up hugepages and the igb_uio driver over SSH, binds the VNF
    interfaces to DPDK, generates/uploads the pipeline configuration and
    builds the command line used to launch the application.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    # cores added on top of ``worker_threads`` for HW / SW load balancing
    HW_DEFAULT_CORE = 3
    SW_DEFAULT_CORE = 2

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace ``pkt_type = ipv4`` with the configured packet type.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing ``pkt_type``
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Rewrite the traffic/packet type lines of the pipeline config.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing ``traffic_type`` and ``vnf_type``
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # compare by value: ``is not`` on strings depends on interning and
        # is not a reliable equality test
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Set ``nr_hugepages`` on the remote host based on the hugepage size."""
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # the current value is read but its result is not used here
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def build_config(self):
        """Generate and upload the config/script files; return the run command.

        :return: ``PIPELINE_COMMAND`` formatted with ``pipeline_kwargs``
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # the generated config is read back from the local CFG_CONFIG path
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Populate ``self.pipeline_kwargs`` used to format the run command."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def _get_app_cpu(self):
        """Return the cores used by the application.

        ``CORES`` wins when set; otherwise the first cores of the selected
        socket are taken, sized by ``worker_threads`` plus the HW/SW default.
        """
        if self.CORES:
            return self.CORES

        vnf_cfg = self.scenario_helper.vnf_cfg
        sys_obj = CpuSysCores(self.ssh_helper)
        self.sys_cpu = sys_obj.get_core_socket()
        # same default as build_config() so a partial vnf_config does not
        # raise KeyError here while build_config() silently assumes 3
        num_core = int(vnf_cfg.get("worker_threads", 3))
        if vnf_cfg.get("lb_config", "SW") == 'HW':
            num_core += self.HW_DEFAULT_CORE
        else:
            num_core += self.SW_DEFAULT_CORE
        app_cpu = self.sys_cpu[str(self.socket)][:num_core]
        return app_cpu

    def _get_cpu_sibling_list(self, cores=None):
        """Return the thread siblings of ``cores`` as a flat list of strings.

        :param cores: cores to inspect; defaults to ``_get_app_cpu()``
        :return: sibling ids, or ``[]`` when the lookup fails
        """
        if cores is None:
            cores = self._get_app_cpu()
        sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
        awk_template = "awk -F: '{ print $1 }' < %s"
        sys_path = "/sys/devices/system/cpu/"
        cpu_topology = []
        try:
            for core in cores:
                sys_cmd = sys_cmd_template % (sys_path, core)
                cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
                cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))

            return cpu_topology
        except Exception:  # best effort: callers expect a list, never an error
            LOG.debug("Unable to read the CPU sibling list", exc_info=True)
            return []

    def _validate_cpu_cfg(self):
        """Return the sibling list of the application cores."""
        return self._get_cpu_sibling_list()

    def setup_vnf_environment(self):
        """Prepare DPDK, bind the interfaces and return the resource profile."""
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill any running instance of ``APP_NAME`` on the remote host."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            # igb_uio is loaded, nothing else to install
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def get_collectd_options(self):
        """Return the collectd options, node settings overriding global ones.

        A new dict is returned so the scenario configuration is never
        modified by the merge.
        """
        # copy first: update() on the dict stored in the scenario options
        # would leak node-specific settings into the shared configuration
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Pick the CPU socket and build the ``ResourceProfile`` for KPIs."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        cores = self._validate_cpu_cfg()
        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores,
                               plugins=plugins, interval=collectd_options.get("interval"),
                               timeout=self.scenario_helper.timeout)

    def _detect_and_bind_drivers(self):
        """Bind the VNF PCI devices to igb_uio and record DPDK port numbers."""
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            intf = next((v for v in interfaces
                         if vpci == v['virtual-interface']['vpci']), None)
            if intf is None:
                # DPDK-bound device that does not belong to this VNF
                continue
            # force to int
            intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the output of ``FIND_NET_CMD`` for ``vpci``, or ``None`` on failure."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the interfaces to the drivers saved before the DPDK bind."""
        self.dpdk_bind_helper.rebind_drivers()
373
374
class ResourceHelper(object):
    """Wrap the resource object returned by the setup helper and expose
    start/stop/collect operations for its KPIs."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Keep the setup helper and reuse its SSH helper."""
        super(ResourceHelper, self).__init__()
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        # filled in by setup()
        self.resource = None

    def setup(self):
        """Prepare the VNF environment and remember the returned resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for subclasses; does nothing here."""

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; ``kpis`` is empty unless the
        ``collectd`` check reports status 0."""
        collectd_status = self.resource.check_if_sa_running("collectd")[0]
        if collectd_status == 0:
            nfvi_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            nfvi_kpi = {}
        return {"core": nfvi_kpi}

    def start_collect(self):
        """Initiate the system agent, start the resource and its AMQP process."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource if one has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
415
416
class ClientResourceHelper(ResourceHelper):
    """Resource helper driving a TRex stateless client for traffic generation.

    Samples produced while traffic runs are pushed onto an internal queue
    and picked up by ``collect_kpi``.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Reuse the helpers of ``setup_helper`` and create the shared state."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers from the VNFD."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` when the client raises ``STLError``."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the latest client statistics.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: dict mapping interface name to its sample dict; ``{}`` (and
                 the terminated flag set) when the statistics are not a mapping
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # validate before touching the result: a non-mapping has no usable
        # ``get`` and must not be queried for ``key``
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the resulting samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic until ``terminate`` is called.

        :raises STLError: when the client fails and termination was not requested
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Ask the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics for ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set, if any, into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug("Got KPIs from _queue for {0} {1}".format(
                self.scenario_helper.name, self.RESOURCE_WORD))
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (a new ``STLClient`` by default) and return it.

        The client is returned even when every attempt failed.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
547
548
class Rfc2544ResourceHelper(object):
    """Lazy accessor for the ``rfc2544`` section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Remember the scenario helper; every option is resolved on first use."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        # caches, populated lazily by the properties below
        self._rfc2544 = None
        self._correlated_traffic = None
        self._latency = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``rfc2544`` options dict (``KeyError`` when it is missing)."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The ``correlated_traffic`` option, defaulting to the class default."""
        if self._correlated_traffic is None:
            option = self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
            self._correlated_traffic = option

        return self._correlated_traffic

    @property
    def latency(self):
        """The ``latency`` option, defaulting to the class default."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Look up ``name`` in the rfc2544 options."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` ("low - high") into the two bounds.

        The values are sorted, so the order in the string does not matter;
        a single value is used for both bounds.
        """
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        if len(bounds) > 1:
            self._tolerance_high = bounds[1]
        else:
            self._tolerance_high = self.tolerance_low
605
606
class SampleVNFDeployHelper(object):
    """Clone the samplevnf repository locally, copy it to the remote host
    and build the requested VNF binary there."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the VNFD and SSH helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` unless it is already in the bin path.

        :param app_name: name of the VNF binary to deploy
        """
        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        which_status = ssh.execute("which %s" % vnf_bin)[0]
        if not which_status:
            # binary already present on the remote host
            return

        # fresh local clone of the repository
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)

        # replace any previous remote copy with the new clone
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        # install the freshly built binary into the bin path
        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_binary, vnf_bin))
639
640
641 class ScenarioHelper(object):
642
643     DEFAULT_VNF_CFG = {
644         'lb_config': 'SW',
645         'lb_count': 1,
646         'worker_config': '1C/1T',
647         'worker_threads': 1,
648     }
649
650     def __init__(self, name):
651         self.name = name
652         self.scenario_cfg = None
653
654     @property
655     def task_path(self):
656         return self.scenario_cfg['task_path']
657
658     @property
659     def nodes(self):
660         return self.scenario_cfg.get('nodes')
661
662     @property
663     def all_options(self):
664         return self.scenario_cfg.get('options', {})
665
666     @property
667     def options(self):
668         return self.all_options.get(self.name, {})
669
670     @property
671     def vnf_cfg(self):
672         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
673
674     @property
675     def topology(self):
676         return self.scenario_cfg['topology']
677
678     @property
679     def timeout(self):
680         return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
681
682
683 class SampleVNF(GenericVNF):
684     """ Class providing file-like API for generic VNF implementation """
685
686     VNF_PROMPT = "pipeline>"
687     WAIT_TIME = 1
688     WAIT_TIME_FOR_SCRIPT = 10
689     APP_NAME = "SampleVNF"
690     # we run the VNF interactively, so the ssh command will timeout after this long
691
692     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
693         super(SampleVNF, self).__init__(name, vnfd)
694         self.bin_path = get_nsb_option('bin_path', '')
695
696         self.scenario_helper = ScenarioHelper(self.name)
697         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
698
699         if setup_env_helper_type is None:
700             setup_env_helper_type = SetupEnvHelper
701
702         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
703                                                   self.ssh_helper,
704                                                   self.scenario_helper)
705
706         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
707
708         if resource_helper_type is None:
709             resource_helper_type = ResourceHelper
710
711         self.resource_helper = resource_helper_type(self.setup_helper)
712
713         self.context_cfg = None
714         self.nfvi_context = None
715         self.pipeline_kwargs = {}
716         self.uplink_ports = None
717         self.downlink_ports = None
718         # TODO(esm): make QueueFileWrapper invert-able so that we
719         #            never have to manage the queues
720         self.q_in = Queue()
721         self.q_out = Queue()
722         self.queue_wrapper = None
723         self.run_kwargs = {}
724         self.used_drivers = {}
725         self.vnf_port_pairs = None
726         self._vnf_process = None
727
728     def _build_ports(self):
729         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
730         self.networks = self._port_pairs.networks
731         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
732         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
733         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
734
735     def _get_route_data(self, route_index, route_type):
736         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
737         for _ in range(route_index):
738             next(route_iter, '')
739         return next(route_iter, {}).get(route_type, '')
740
741     def _get_port0localip6(self):
742         return_value = self._get_route_data(0, 'network')
743         LOG.info("_get_port0localip6 : %s", return_value)
744         return return_value
745
746     def _get_port1localip6(self):
747         return_value = self._get_route_data(1, 'network')
748         LOG.info("_get_port1localip6 : %s", return_value)
749         return return_value
750
751     def _get_port0prefixlen6(self):
752         return_value = self._get_route_data(0, 'netmask')
753         LOG.info("_get_port0prefixlen6 : %s", return_value)
754         return return_value
755
756     def _get_port1prefixlen6(self):
757         return_value = self._get_route_data(1, 'netmask')
758         LOG.info("_get_port1prefixlen6 : %s", return_value)
759         return return_value
760
761     def _get_port0gateway6(self):
762         return_value = self._get_route_data(0, 'network')
763         LOG.info("_get_port0gateway6 : %s", return_value)
764         return return_value
765
766     def _get_port1gateway6(self):
767         return_value = self._get_route_data(1, 'network')
768         LOG.info("_get_port1gateway6 : %s", return_value)
769         return return_value
770
771     def _start_vnf(self):
772         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
773         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
774         self._vnf_process = Process(name=name, target=self._run)
775         self._vnf_process.start()
776
777     def _vnf_up_post(self):
778         pass
779
780     def instantiate(self, scenario_cfg, context_cfg):
781         self.scenario_helper.scenario_cfg = scenario_cfg
782         self.context_cfg = context_cfg
783         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
784         # self.nfvi_context = None
785
786         # vnf deploy is unsupported, use ansible playbooks
787         if self.scenario_helper.options.get("vnf_deploy", False):
788             self.deploy_helper.deploy_vnfs(self.APP_NAME)
789         self.resource_helper.setup()
790         self._start_vnf()
791
792     def wait_for_instantiate(self):
793         buf = []
794         time.sleep(self.WAIT_TIME)  # Give some time for config to load
795         while True:
796             if not self._vnf_process.is_alive():
797                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
798
799             # TODO(esm): move to QueueFileWrapper
800             while self.q_out.qsize() > 0:
801                 buf.append(self.q_out.get())
802                 message = ''.join(buf)
803                 if self.VNF_PROMPT in message:
804                     LOG.info("%s VNF is up and running.", self.APP_NAME)
805                     self._vnf_up_post()
806                     self.queue_wrapper.clear()
807                     self.resource_helper.start_collect()
808                     return self._vnf_process.exitcode
809
810                 if "PANIC" in message:
811                     raise RuntimeError("Error starting %s VNF." %
812                                        self.APP_NAME)
813
814             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
815             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
816             # Send ENTER to display a new prompt in case the prompt text was corrupted
817             # by other VNF output
818             self.q_in.put('\r\n')
819
820     def _build_run_kwargs(self):
821         self.run_kwargs = {
822             'stdin': self.queue_wrapper,
823             'stdout': self.queue_wrapper,
824             'keep_stdin_open': True,
825             'pty': True,
826             'timeout': self.scenario_helper.timeout,
827         }
828
829     def _build_config(self):
830         return self.setup_helper.build_config()
831
832     def _run(self):
833         # we can't share ssh paramiko objects to force new connection
834         self.ssh_helper.drop_connection()
835         cmd = self._build_config()
836         # kill before starting
837         self.setup_helper.kill_vnf()
838
839         LOG.debug(cmd)
840         self._build_run_kwargs()
841         self.ssh_helper.run(cmd, **self.run_kwargs)
842
843     def vnf_execute(self, cmd, wait_time=2):
844         """ send cmd to vnf process """
845
846         LOG.info("%s command: %s", self.APP_NAME, cmd)
847         self.q_in.put("{}\r\n".format(cmd))
848         time.sleep(wait_time)
849         output = []
850         while self.q_out.qsize() > 0:
851             output.append(self.q_out.get())
852         return "".join(output)
853
854     def _tear_down(self):
855         pass
856
857     def terminate(self):
858         self.vnf_execute("quit")
859         self.setup_helper.kill_vnf()
860         self._tear_down()
861         self.resource_helper.stop_collect()
862         if self._vnf_process is not None:
863             # be proper and join first before we kill
864             LOG.debug("joining before terminate %s", self._vnf_process.name)
865             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
866             self._vnf_process.terminate()
867         # no terminate children here because we share processes with tg
868
869     def get_stats(self, *args, **kwargs):
870         """
871         Method for checking the statistics
872
873         :return:
874            VNF statistics
875         """
876         cmd = 'p {0} stats'.format(self.APP_WORD)
877         out = self.vnf_execute(cmd)
878         return out
879
880     def collect_kpi(self):
881         # we can't get KPIs if the VNF is down
882         check_if_process_failed(self._vnf_process)
883         stats = self.get_stats()
884         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
885         if m:
886             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
887             result["collect_stats"] = self.resource_helper.collect_kpi()
888         else:
889             result = {
890                 "packets_in": 0,
891                 "packets_fwd": 0,
892                 "packets_dropped": 0,
893             }
894         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
895         return result
896
897
class SampleVNFTrafficGen(GenericTrafficGen):
    """Traffic generator built on ``GenericTrafficGen``.

    The server side runs in one child process (``_tg_process``) and the
    traffic client in another (``_traffic_process``).
    """

    APP_NAME = 'Sample'
    RUN_WAIT = 1  # seconds slept between polls in run_traffic()

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Build the scenario, ssh, setup and resource helpers.

        :param name: passed to the base class constructor
        :param vnfd: passed to the base class constructor
        :param setup_env_helper_type: helper class, ``SetupEnvHelper`` when None
        :param resource_helper_type: helper class, ``ClientResourceHelper`` when None
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        setup_cls = SetupEnvHelper if setup_env_helper_type is None else setup_env_helper_type
        self.setup_helper = setup_cls(self.vnfd_helper, self.ssh_helper, self.scenario_helper)

        resource_cls = (ClientResourceHelper if resource_helper_type is None
                        else resource_helper_type)
        self.resource_helper = resource_cls(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        # process handles; created by instantiate() and run_traffic()
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        """Target of the TG server process; here it only drops the ssh connection."""
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """Prepare resources and start the TG server process.

        :param scenario_cfg: stored on ``self.scenario_helper.scenario_cfg``
        :param context_cfg: accepted but not used by this method
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        resource_helper = self.resource_helper
        resource_helper.generate_cfg()
        resource_helper.setup()

        LOG.info("Starting %s server...", self.APP_NAME)
        process_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._tg_process = Process(name=process_name, target=self._start_server)
        self._tg_process.start()

    def wait_for_instantiate(self):
        """Wait until the TG server reports it is up.

        :return: the result of ``_wait_for_process()``
        """
        # overridden by subclasses
        return self._wait_for_process()

    def _check_status(self):
        """Report the TG server status; must be provided by subclasses.

        ``_wait_for_process`` treats a return value of 0 as "up".
        """
        raise NotImplementedError

    def _wait_for_process(self):
        """Poll ``_check_status`` once a second until it returns 0.

        :return: ``self._tg_process.exitcode`` once the status is 0
        :raises RuntimeError: if the TG process is no longer alive
        """
        # the loop can only be left by returning or by the process dying
        while self._tg_process.is_alive():
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            if self._check_status() == 0:
                LOG.info("%s TG Server is up and running.", self.APP_NAME)
                return self._tg_process.exitcode
        raise RuntimeError("%s traffic generator process died." % self.APP_NAME)

    def _traffic_runner(self, traffic_profile):
        """Target of the traffic process: run the profile via the resource helper.

        :param traffic_profile: forwarded to ``resource_helper.run_traffic``
        """
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self.resource_helper.run_traffic(traffic_profile)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.
        Starts the traffic process, then polls every ``RUN_WAIT`` seconds
        until the client reports it has started or the process has died.
        Mandatory.

        :param traffic_profile: profile handed to the traffic process
        :return: True if the traffic process is alive on return, else False
        """
        process_name = "{}-{}-{}-{}".format(
            self.name, self.APP_NAME, traffic_profile.__class__.__name__, os.getpid())
        traffic_process = Process(name=process_name, target=self._traffic_runner,
                                  args=(traffic_profile,))
        self._traffic_process = traffic_process
        traffic_process.start()

        # Wait for traffic process to start
        client_started = self.resource_helper.client_started
        while client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not traffic_process.is_alive():
                break

        return traffic_process.is_alive()

    def listen_traffic(self, traffic_profile):
        """ Listen to traffic with the given parameters. Optional.
        Not implemented here: does nothing and returns None.

        :param traffic_profile: unused
        :return: None
        """

    def verify_traffic(self, traffic_profile):
        """ Verify captured traffic after it has ended. Optional.
        Not implemented here: does nothing and returns None.

        :param traffic_profile: unused
        :return: None
        """

    def collect_kpi(self):
        """Check both TG processes, then return the resource helper's KPIs.

        :return: the value of ``self.resource_helper.collect_kpi()``
        """
        # check if the tg processes have exited
        tg_process, traffic_process = self._tg_process, self._traffic_process
        check_if_process_failed(tg_process)
        check_if_process_failed(traffic_process)

        kpis = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, kpis)
        return kpis

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Each existing process is joined for up to ``PROCESS_JOIN_TIMEOUT``
        and then terminated.

        :return: None
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        for process in (self._traffic_process, self._tg_process):
            if process is None:
                continue
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", process.name)
            process.join(PROCESS_JOIN_TIMEOUT)
            process.terminate()
        # no terminate children here because we share processes with vnf