Merge "rewrite _generate_pod_yaml to combine name and pkey setting"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.cpu import CpuSysCores
34 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
37 from yardstick.network_services.nfvi.resource import ResourceProfile
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
40 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
41 from yardstick.network_services.utils import get_nsb_option
42
43 from trex_stl_lib.trex_stl_client import STLClient
44 from trex_stl_lib.trex_stl_client import LoggerApi
45 from trex_stl_lib.trex_stl_exceptions import STLError
46
47 from yardstick.ssh import AutoConnectSSH
48
# Name joined onto the remote bin path when checking for a DPDK installation
# (see DpdkVnfSetupEnvHelper._setup_dpdk).
DPDK_VERSION = "dpdk-16.07"

# Module-level logger shared by every helper in this file.
LOG = logging.getLogger(__name__)


# Remote directory into which generated config/script files are uploaded.
REMOTE_TMP = "/tmp"
# Fallback returned by ScenarioHelper.timeout when the scenario options do not
# set 'timeout' (presumably seconds -- TODO confirm against the SSH layer).
DEFAULT_VNF_TIMEOUT = 3600
# NOTE(review): not referenced in the visible part of this file; presumably a
# process join timeout in seconds -- confirm against its users.
PROCESS_JOIN_TIMEOUT = 3
57
58
class VnfSshHelper(AutoConnectSSH):
    """SSH helper built from a node description plus a remote binary directory."""

    def __init__(self, node, bin_path, wait=None):
        """Create the connection helper.

        :param node: node description; connection kwargs are derived from it
                     through ``args_from_node``
        :param bin_path: remote directory holding the tool binaries
        :param wait: optional value used for the 'wait' kwarg when the node
                     description did not already provide one
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            # never override a 'wait' value that came from the node itself
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class (not the subclass of the caller)."""
        # must return static class name, anything else refers to the calling class
        # i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        """Return a new helper created from the same node and bin_path."""
        # this copy constructor is different from SSH classes, since it uses node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` as REMOTE_TMP/<prefix> and return that remote path."""
        LOG.debug(content)
        remote_path = os.path.join(REMOTE_TMP, prefix)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join the given path components onto ``bin_path``."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Delegate to the parent ``provision_tool``, defaulting tool_path to bin_path."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
94
95
class SetupEnvHelper(object):
    """Base environment helper; subclasses supply config building and teardown."""

    # remote locations of the generated config file and script
    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    CORES = []
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the VNFD, SSH and scenario helpers."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper

    def _get_ports_gateway(self, name):
        """Return the 'gateway' of the first routing_table entry whose 'if' is ``name``.

        Returns None when vdu0 has no matching route.
        """
        routes = self.vnfd_helper.vdu0.get('routing_table', [])
        return next((entry['gateway'] for entry in routes if name == entry['if']), None)

    def build_config(self):
        """Must be implemented by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """No-op in the base class."""

    def kill_vnf(self):
        """No-op in the base class."""

    def tear_down(self):
        """Must be implemented by subclasses."""
        raise NotImplementedError
129
130
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based sample VNFs.

    Over SSH it sets up hugepages and the igb_uio module, binds the VNF
    interfaces through ``DpdkBindHelper``, generates and uploads the pipeline
    config/script and formats the command used to start the application.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    # number of cores added on top of 'worker_threads', by lb_config mode
    HW_DEFAULT_CORE = 3
    SW_DEFAULT_CORE = 2

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Return the config text with 'pkt_type = ipv4' set to the requested pkt_type."""
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Return the config text adjusted for the requested traffic type.

        When the VNF type differs from ``APP_NAME`` the 'traffic_type = 4'
        entry is rewritten; otherwise the 'pkt_type' entry is set to ipv4
        (traffic type 4) or ipv6 (anything else).
        """
        traffic_type = traffic_options['traffic_type']

        # Compare by value: an identity test on strings only works by accident
        # of interning and can misclassify equal strings.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper on the same SSH helper."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        # filled in by _build_pipeline_kwargs() and _get_app_cpu() respectively
        self.pipeline_kwargs = {}
        self.sys_cpu = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Write the hugepage count for the remote node's hugepage size.

        8192 pages are requested when the size reported by /proc/meminfo is
        "2048kB", 16 pages otherwise.
        """
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # NOTE(review): the output of this read is discarded; kept so the
        # sequence of remote commands is unchanged.
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def build_config(self):
        """Generate and upload the VNF config and script, return the start command.

        The config is produced by ``MultiPortConfig``, read back from
        ``CFG_CONFIG``, patched for traffic/packet type and uploaded together
        with the generated script.

        :return: ``PIPELINE_COMMAND`` formatted with ``pipeline_kwargs``
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Populate ``pipeline_kwargs`` used to format ``PIPELINE_COMMAND``."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def _get_app_cpu(self):
        """Return the cores for the application.

        ``CORES`` wins when it is non-empty; otherwise the first
        ``worker_threads`` + HW/SW default cores of the selected socket are
        taken from the remote core/socket map.
        """
        if self.CORES:
            return self.CORES

        vnf_cfg = self.scenario_helper.vnf_cfg
        sys_obj = CpuSysCores(self.ssh_helper)
        self.sys_cpu = sys_obj.get_core_socket()
        num_core = int(vnf_cfg["worker_threads"])
        if vnf_cfg.get("lb_config", "SW") == 'HW':
            num_core += self.HW_DEFAULT_CORE
        else:
            num_core += self.SW_DEFAULT_CORE
        app_cpu = self.sys_cpu[str(self.socket)][:num_core]
        return app_cpu

    def _get_cpu_sibling_list(self, cores=None):
        """Return the thread siblings of ``cores`` as a flat list of strings.

        :param cores: cores to inspect; defaults to ``_get_app_cpu()``
        :return: sibling ids read from sysfs on the remote node, or an empty
                 list when reading them fails (best effort)
        """
        if cores is None:
            cores = self._get_app_cpu()
        sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
        awk_template = "awk -F: '{ print $1 }' < %s"
        sys_path = "/sys/devices/system/cpu/"
        cpu_topology = []
        try:
            for core in cores:
                sys_cmd = sys_cmd_template % (sys_path, core)
                cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
                cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))

            return cpu_topology
        except Exception:
            # still best effort, but leave a trace instead of failing silently
            LOG.warning("Unable to read CPU thread siblings", exc_info=True)
            return []

    def _validate_cpu_cfg(self):
        """Return the sibling list for the application cores."""
        return self._get_cpu_sibling_list()

    def setup_vnf_environment(self):
        """Prepare DPDK, stop any running VNF, bind the NICs and build the resource profile.

        :return: the ``ResourceProfile`` created by ``_setup_resources``
        """
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill the application on the remote node (with diagnostic listings first)."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            # igb_uio is loaded, nothing left to install
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def get_collectd_options(self):
        """Return the collectd options, node specific values overriding global ones.

        A new dict is returned so the scenario configuration itself is never
        modified by the merge.
        """
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Select the socket and create the ``ResourceProfile`` for KPI collection."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        # NOTE(review): index 5 is the sixth character of the vpci string;
        # presumably the first bus digit of a "dddd:bb:ss.f" address -- confirm.
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        cores = self._validate_cpu_cfg()
        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores,
                               plugins=plugins, interval=collectd_options.get("interval"),
                               timeout=self.scenario_helper.timeout)

    def _detect_and_bind_drivers(self):
        """Bind ``bound_pci`` to igb_uio and record each interface's DPDK port number.

        Port numbers are the positions of the bound PCI addresses in sorted
        order.  Addresses without a matching interface are skipped.
        """
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
                # force to int
                intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except (StopIteration, KeyError):
                # best effort: the address does not belong to this VNF's
                # interfaces; do not swallow interrupts or unrelated errors
                LOG.debug("No interface found for DPDK bound PCI address %s", vpci)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the stdout of ``FIND_NET_CMD`` for ``vpci``, or None if it fails."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the drivers saved before the DPDK bind."""
        self.dpdk_bind_helper.rebind_drivers()
373
374
class ResourceHelper(object):
    """Drives KPI collection through the resource object built by the setup helper."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember the setup helper and reuse its SSH helper."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Run the environment setup and keep whatever resource it returns."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """No-op in the base class."""

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is empty unless collectd reports status 0."""
        kpis = {}
        collectd_status = self.resource.check_if_sa_running("collectd")[0]
        if collectd_status == 0:
            kpis = self.resource.amqp_collect_nfvi_kpi()
        return {"core": kpis}

    def start_collect(self):
        """Initiate the system agent, start the resource and its AMQP KPI process."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource if one has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
415
416
class ClientResourceHelper(ResourceHelper):
    """Resource helper for traffic generators driven through a TRex STL client."""

    # seconds slept per traffic iteration before stats are sampled
    RUN_DURATION = 60
    # seconds slept after sampling, before the samples are queued
    QUEUE_WAIT_TIME = 5
    # NOTE(review): not used in the visible part of this file
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Create the shared state used between the traffic process and its parent."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers from the port pairs."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` when the client raises STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the latest client statistics.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: dict mapping interface name to its sample dict; ``{}`` (and
                 the terminate flag set) when the statistics are not a mapping
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # Validate before touching the result, so a non-mapping value ends the
        # run cleanly instead of raising on ``.get``.
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the resulting samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic iterations until terminated.

        An STLError is swallowed only when termination has already been
        requested; otherwise it is re-raised.
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            # clear the flag after a clean stop
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Request the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear client statistics for ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start the client on ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge any queued samples into the cached result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            # lazy logging arguments: only formatted when DEBUG is enabled
            LOG.debug("Got KPIs from _queue for %s %s",
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (a new STLClient by default) and return it.

        Up to six attempts are made, five seconds apart.  The client is
        returned even when every attempt failed; that case is logged.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        else:
            # every attempt failed; callers still receive the client as before
            LOG.warning("Giving up connecting to Trex Server after 6 attempts")
        return client
547
548
class Rfc2544ResourceHelper(object):
    """Lazy accessors for the 'rfc2544' section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Store the scenario helper; every option is resolved on first access."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The 'rfc2544' options mapping (cached after the first lookup)."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound parsed from 'allowed_drop_rate'."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound parsed from 'allowed_drop_rate'."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The 'correlated_traffic' option, or its class default."""
        if self._correlated_traffic is None:
            value = self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
            self._correlated_traffic = value

        return self._correlated_traffic

    @property
    def latency(self):
        """The 'latency' option, or its class default."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` from the rfc2544 mapping, or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' ("low - high" or a single value) into the bounds.

        The parts are sorted numerically; with a single value both bounds are
        that value.
        """
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        if len(bounds) > 1:
            self._tolerance_high = bounds[1]
        else:
            self._tolerance_high = self.tolerance_low
605
606
class SampleVNFDeployHelper(object):
    """Clones, uploads and builds the samplevnf repository when a VNF binary is missing."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the VNFD and SSH helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` on the remote node unless it is already there.

        Nothing is done when ``which <bin_path>/<app_name>`` succeeds
        remotely.  Otherwise the repository is cloned locally, copied to the
        remote node, built with tools/vnf_build.sh and the resulting binary
        is copied into the remote bin path.
        """
        vnf_bin = self.ssh_helper.join_bin_path(app_name)
        if not self.ssh_helper.execute("which %s" % vnf_bin)[0]:
            # exit status 0: the binary is already installed
            return

        # fresh local checkout
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)

        # replace any previous remote copy
        self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
        time.sleep(2)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        self.ssh_helper.execute(build_cmd)

        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
        self.ssh_helper.execute("sudo cp %s %s" % (built_binary, vnf_bin))
639
640
class ScenarioHelper(object):
    """Read-only view over the scenario configuration for one named VNF."""

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the VNF name; ``scenario_cfg`` is assigned later by the owner."""
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The 'task_path' entry of the scenario configuration."""
        scenario = self.scenario_cfg
        return scenario['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry of the scenario configuration, or None."""
        scenario = self.scenario_cfg
        return scenario.get('nodes')

    @property
    def all_options(self):
        """The whole 'options' mapping (empty dict when absent)."""
        scenario = self.scenario_cfg
        return scenario.get('options', {})

    @property
    def options(self):
        """The options stored under this VNF's name (empty dict when absent)."""
        everything = self.all_options
        return everything.get(self.name, {})

    @property
    def vnf_cfg(self):
        """This VNF's 'vnf_config', falling back to DEFAULT_VNF_CFG."""
        own_options = self.options
        return own_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The 'topology' entry of the scenario configuration."""
        scenario = self.scenario_cfg
        return scenario['topology']

    @property
    def timeout(self):
        """This VNF's 'timeout' option, falling back to DEFAULT_VNF_TIMEOUT."""
        own_options = self.options
        return own_options.get('timeout', DEFAULT_VNF_TIMEOUT)
681
682
683 class SampleVNF(GenericVNF):
684     """ Class providing file-like API for generic VNF implementation """
685
686     VNF_PROMPT = "pipeline>"
687     WAIT_TIME = 1
688     APP_NAME = "SampleVNF"
689     # we run the VNF interactively, so the ssh command will timeout after this long
690
691     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
692         super(SampleVNF, self).__init__(name, vnfd)
693         self.bin_path = get_nsb_option('bin_path', '')
694
695         self.scenario_helper = ScenarioHelper(self.name)
696         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
697
698         if setup_env_helper_type is None:
699             setup_env_helper_type = SetupEnvHelper
700
701         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
702                                                   self.ssh_helper,
703                                                   self.scenario_helper)
704
705         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
706
707         if resource_helper_type is None:
708             resource_helper_type = ResourceHelper
709
710         self.resource_helper = resource_helper_type(self.setup_helper)
711
712         self.context_cfg = None
713         self.nfvi_context = None
714         self.pipeline_kwargs = {}
715         self.uplink_ports = None
716         self.downlink_ports = None
717         # TODO(esm): make QueueFileWrapper invert-able so that we
718         #            never have to manage the queues
719         self.q_in = Queue()
720         self.q_out = Queue()
721         self.queue_wrapper = None
722         self.run_kwargs = {}
723         self.used_drivers = {}
724         self.vnf_port_pairs = None
725         self._vnf_process = None
726
727     def _build_ports(self):
728         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
729         self.networks = self._port_pairs.networks
730         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
731         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
732         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
733
734     def _get_route_data(self, route_index, route_type):
735         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
736         for _ in range(route_index):
737             next(route_iter, '')
738         return next(route_iter, {}).get(route_type, '')
739
740     def _get_port0localip6(self):
741         return_value = self._get_route_data(0, 'network')
742         LOG.info("_get_port0localip6 : %s", return_value)
743         return return_value
744
745     def _get_port1localip6(self):
746         return_value = self._get_route_data(1, 'network')
747         LOG.info("_get_port1localip6 : %s", return_value)
748         return return_value
749
750     def _get_port0prefixlen6(self):
751         return_value = self._get_route_data(0, 'netmask')
752         LOG.info("_get_port0prefixlen6 : %s", return_value)
753         return return_value
754
755     def _get_port1prefixlen6(self):
756         return_value = self._get_route_data(1, 'netmask')
757         LOG.info("_get_port1prefixlen6 : %s", return_value)
758         return return_value
759
760     def _get_port0gateway6(self):
761         return_value = self._get_route_data(0, 'network')
762         LOG.info("_get_port0gateway6 : %s", return_value)
763         return return_value
764
765     def _get_port1gateway6(self):
766         return_value = self._get_route_data(1, 'network')
767         LOG.info("_get_port1gateway6 : %s", return_value)
768         return return_value
769
770     def _start_vnf(self):
771         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
772         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
773         self._vnf_process = Process(name=name, target=self._run)
774         self._vnf_process.start()
775
776     def _vnf_up_post(self):
777         pass
778
779     def instantiate(self, scenario_cfg, context_cfg):
780         self.scenario_helper.scenario_cfg = scenario_cfg
781         self.context_cfg = context_cfg
782         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
783         # self.nfvi_context = None
784
785         # vnf deploy is unsupported, use ansible playbooks
786         if self.scenario_helper.options.get("vnf_deploy", False):
787             self.deploy_helper.deploy_vnfs(self.APP_NAME)
788         self.resource_helper.setup()
789         self._start_vnf()
790
    def wait_for_instantiate(self):
        """Block until the VNF prompt shows up in the process output.

        Output chunks are drained from ``q_out`` and accumulated; once the
        accumulated text contains ``VNF_PROMPT`` the post-up hook runs, the
        queue wrapper is cleared and KPI collection is started.

        :return: the ``exitcode`` attribute of the VNF process at that moment
        :raises RuntimeError: when the VNF process is no longer alive, or when
                              "PANIC" appears in the accumulated output
        """
        buf = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # TODO(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                buf.append(self.q_out.get())
                # re-join everything received so far, so a prompt split across
                # chunks is still detected
                message = ''.join(buf)
                if self.VNF_PROMPT in message:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    self.resource_helper.start_collect()
                    return self._vnf_process.exitcode

                if "PANIC" in message:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(1)
            # Send ENTER to display a new prompt in case the prompt text was corrupted
            # by other VNF output
            self.q_in.put('\r\n')
818
819     def _build_run_kwargs(self):
820         self.run_kwargs = {
821             'stdin': self.queue_wrapper,
822             'stdout': self.queue_wrapper,
823             'keep_stdin_open': True,
824             'pty': True,
825             'timeout': self.scenario_helper.timeout,
826         }
827
828     def _build_config(self):
829         return self.setup_helper.build_config()
830
831     def _run(self):
832         # we can't share ssh paramiko objects to force new connection
833         self.ssh_helper.drop_connection()
834         cmd = self._build_config()
835         # kill before starting
836         self.setup_helper.kill_vnf()
837
838         LOG.debug(cmd)
839         self._build_run_kwargs()
840         self.ssh_helper.run(cmd, **self.run_kwargs)
841
842     def vnf_execute(self, cmd, wait_time=2):
843         """ send cmd to vnf process """
844
845         LOG.info("%s command: %s", self.APP_NAME, cmd)
846         self.q_in.put("{}\r\n".format(cmd))
847         time.sleep(wait_time)
848         output = []
849         while self.q_out.qsize() > 0:
850             output.append(self.q_out.get())
851         return "".join(output)
852
853     def _tear_down(self):
854         pass
855
856     def terminate(self):
857         self.vnf_execute("quit")
858         self.setup_helper.kill_vnf()
859         self._tear_down()
860         self.resource_helper.stop_collect()
861         if self._vnf_process is not None:
862             # be proper and join first before we kill
863             LOG.debug("joining before terminate %s", self._vnf_process.name)
864             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
865             self._vnf_process.terminate()
866         # no terminate children here because we share processes with tg
867
868     def get_stats(self, *args, **kwargs):
869         """
870         Method for checking the statistics
871
872         :return:
873            VNF statistics
874         """
875         cmd = 'p {0} stats'.format(self.APP_WORD)
876         out = self.vnf_execute(cmd)
877         return out
878
879     def collect_kpi(self):
880         # we can't get KPIs if the VNF is down
881         check_if_process_failed(self._vnf_process)
882         stats = self.get_stats()
883         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
884         if m:
885             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
886             result["collect_stats"] = self.resource_helper.collect_kpi()
887         else:
888             result = {
889                 "packets_in": 0,
890                 "packets_fwd": 0,
891                 "packets_dropped": 0,
892             }
893         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
894         return result
895
896
class SampleVNFTrafficGen(GenericTrafficGen):
    """ Class providing file-like API for generic traffic generator """

    # application name; appears in log messages and in child process names
    APP_NAME = 'Sample'
    # seconds slept between polls in run_traffic while the client has not started
    RUN_WAIT = 1
902
903     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
904         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
905         self.bin_path = get_nsb_option('bin_path', '')
906
907         self.scenario_helper = ScenarioHelper(self.name)
908         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
909
910         if setup_env_helper_type is None:
911             setup_env_helper_type = SetupEnvHelper
912
913         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
914                                                   self.ssh_helper,
915                                                   self.scenario_helper)
916
917         if resource_helper_type is None:
918             resource_helper_type = ClientResourceHelper
919
920         self.resource_helper = resource_helper_type(self.setup_helper)
921
922         self.runs_traffic = True
923         self.traffic_finished = False
924         self._tg_process = None
925         self._traffic_process = None
926
927     def _start_server(self):
928         # we can't share ssh paramiko objects to force new connection
929         self.ssh_helper.drop_connection()
930
931     def instantiate(self, scenario_cfg, context_cfg):
932         self.scenario_helper.scenario_cfg = scenario_cfg
933         self.resource_helper.generate_cfg()
934         self.resource_helper.setup()
935
936         LOG.info("Starting %s server...", self.APP_NAME)
937         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
938         self._tg_process = Process(name=name, target=self._start_server)
939         self._tg_process.start()
940
941     def wait_for_instantiate(self):
942         # overridden by subclasses
943         return self._wait_for_process()
944
945     def _check_status(self):
946         raise NotImplementedError
947
948     def _wait_for_process(self):
949         while True:
950             if not self._tg_process.is_alive():
951                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
952             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
953             time.sleep(1)
954             status = self._check_status()
955             if status == 0:
956                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
957                 return self._tg_process.exitcode
958
959     def _traffic_runner(self, traffic_profile):
960         # always drop connections first thing in new processes
961         # so we don't get paramiko errors
962         self.ssh_helper.drop_connection()
963         LOG.info("Starting %s client...", self.APP_NAME)
964         self.resource_helper.run_traffic(traffic_profile)
965
966     def run_traffic(self, traffic_profile):
967         """ Generate traffic on the wire according to the given params.
968         Method is non-blocking, returns immediately when traffic process
969         is running. Mandatory.
970
971         :param traffic_profile:
972         :return: True/False
973         """
974         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
975                                     os.getpid())
976         self._traffic_process = Process(name=name, target=self._traffic_runner,
977                                         args=(traffic_profile,))
978         self._traffic_process.start()
979         # Wait for traffic process to start
980         while self.resource_helper.client_started.value == 0:
981             time.sleep(self.RUN_WAIT)
982             # what if traffic process takes a few seconds to start?
983             if not self._traffic_process.is_alive():
984                 break
985
986         return self._traffic_process.is_alive()
987
988     def listen_traffic(self, traffic_profile):
989         """ Listen to traffic with the given parameters.
990         Method is non-blocking, returns immediately when traffic process
991         is running. Optional.
992
993         :param traffic_profile:
994         :return: True/False
995         """
996         pass
997
998     def verify_traffic(self, traffic_profile):
999         """ Verify captured traffic after it has ended. Optional.
1000
1001         :param traffic_profile:
1002         :return: dict
1003         """
1004         pass
1005
1006     def collect_kpi(self):
1007         # check if the tg processes have exited
1008         for proc in (self._tg_process, self._traffic_process):
1009             check_if_process_failed(proc)
1010         result = self.resource_helper.collect_kpi()
1011         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
1012         return result
1013
1014     def terminate(self):
1015         """ After this method finishes, all traffic processes should stop. Mandatory.
1016
1017         :return: True/False
1018         """
1019         self.traffic_finished = True
1020         # we must kill client before we kill the server, or the client will raise exception
1021         if self._traffic_process is not None:
1022             # be proper and try to join before terminating
1023             LOG.debug("joining before terminate %s", self._traffic_process.name)
1024             self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
1025             self._traffic_process.terminate()
1026         if self._tg_process is not None:
1027             # be proper and try to join before terminating
1028             LOG.debug("joining before terminate %s", self._tg_process.name)
1029             self._tg_process.join(PROCESS_JOIN_TIMEOUT)
1030             self._tg_process.terminate()
1031         # no terminate children here because we share processes with vnf