Merge "exec_tests: remove releng clone code"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
34 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
35 from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
38 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.utils import get_nsb_option
41
42 from trex_stl_lib.trex_stl_client import STLClient
43 from trex_stl_lib.trex_stl_client import LoggerApi
44 from trex_stl_lib.trex_stl_exceptions import STLError
45
46 from yardstick.ssh import AutoConnectSSH
47
# Name joined onto the remote bin path by DpdkVnfSetupEnvHelper._setup_dpdk()
# when it checks whether DPDK still has to be set up.
DPDK_VERSION = "dpdk-16.07"

# Module level logger shared by every helper in this file.
LOG = logging.getLogger(__name__)


# Directory used as the base for uploaded configuration files and scripts.
REMOTE_TMP = "/tmp"
54
55
class VnfSshHelper(AutoConnectSSH):
    """SSH helper that remembers its node description and a binary path."""

    def __init__(self, node, bin_path, wait=None):
        """Create the SSH helper from a node description.

        :param node: node description, turned into constructor keyword
                     arguments through ``args_from_node``
        :param bin_path: base path used by ``join_bin_path`` and as the
                         default tool path of ``provision_tool``
        :param wait: when truthy, used as the ``wait`` keyword argument unless
                     the node arguments already contain one
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait and 'wait' not in ssh_kwargs:
            ssh_kwargs['wait'] = wait

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class, never the class of the caller."""
        # a static name is required here: anything dynamic would resolve to
        # the subclass doing the call instead of this superclass
        return VnfSshHelper

    def copy(self):
        """Return a new helper built from the same node and binary path."""
        # unlike the SSH base classes, the copy is rebuilt from the node
        helper_type = self.get_class()
        return helper_type(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` as a file named ``prefix`` under REMOTE_TMP.

        :return: path of the uploaded file
        """
        remote_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join ``args`` onto the binary path of this helper."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Delegate to the parent ``provision_tool``, defaulting to bin_path."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
91
92
class SetupEnvHelper(object):
    """Base class of the helpers preparing the environment of a VNF."""

    # locations of the generated configuration file and script
    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    CORES = []
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the three helpers this object works with."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper

    def _get_ports_gateway(self, name):
        """Return the gateway of the first route whose 'if' equals ``name``.

        Routes are read from the 'routing_table' entry of vdu0.

        :return: the gateway of the matching route, None without a match
        """
        routes = self.vnfd_helper.vdu0.get('routing_table', [])
        gateways = (route['gateway'] for route in routes if name == route['if'])
        return next(gateways, None)

    def build_config(self):
        """Build the VNF configuration; must be provided by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the environment; nothing to do in the base class."""

    def kill_vnf(self):
        """Stop the VNF; nothing to do in the base class."""

    def tear_down(self):
        """Undo the environment setup; must be provided by subclasses."""
        raise NotImplementedError
126
127
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Setup helper for VNFs running on top of DPDK.

    Through the SSH helper it configures hugepages, loads the ``igb_uio``
    kernel module, binds the VNF interfaces to it, uploads the generated
    configuration and script, and builds the command starting the VNF.
    """

    APP_NAME = 'DpdkVnf'
    # shell command listing the network interface whose link matches a PCI address
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    # number of cores added to worker_threads for lb_config 'HW' / anything else
    HW_DEFAULT_CORE = 3
    SW_DEFAULT_CORE = 2

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace 'pkt_type = ipv4' with the packet type of the options.

        :param ip_pipeline_cfg: configuration text
        :param traffic_options: dict providing the 'pkt_type' key
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Patch the traffic type settings of a configuration text.

        When the 'vnf_type' option differs from APP_NAME the
        'traffic_type = 4' setting is rewritten with the requested traffic
        type. Otherwise 'pkt_type = ipv4' is kept for traffic type 4 and
        changed to 'pkt_type = ipv6' for every other traffic type.

        :param ip_pipeline_cfg: configuration text
        :param traffic_options: dict providing 'traffic_type' and 'vnf_type'
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # compare by value: identity ("is not") on strings depends on
        # interning and is not a reliable equality test
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Write the number of hugepages for the hugepage size of the host.

        8192 pages are requested when the reported size is "2048kB",
        16 pages for any other size.
        """
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # the output of this read is not used
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def build_config(self):
        """Generate and upload the VNF configuration and script.

        The configuration produced by MultiPortConfig is read back from
        CFG_CONFIG, patched with the traffic and packet type, and uploaded
        together with the generated script.

        :return: PIPELINE_COMMAND formatted with the pipeline keyword arguments
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Fill ``self.pipeline_kwargs`` used to format PIPELINE_COMMAND."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def _get_app_cpu(self):
        """Return the cores used by the application.

        CORES wins when it is not empty. Otherwise the first cores of the
        selected socket are taken: 'worker_threads' plus HW_DEFAULT_CORE or
        SW_DEFAULT_CORE depending on 'lb_config'.
        """
        if self.CORES:
            return self.CORES

        vnf_cfg = self.scenario_helper.vnf_cfg
        sys_obj = CpuSysCores(self.ssh_helper)
        self.sys_cpu = sys_obj.get_core_socket()
        num_core = int(vnf_cfg["worker_threads"])
        if vnf_cfg.get("lb_config", "SW") == 'HW':
            num_core += self.HW_DEFAULT_CORE
        else:
            num_core += self.SW_DEFAULT_CORE
        app_cpu = self.sys_cpu[str(self.socket)][:num_core]
        return app_cpu

    def _get_cpu_sibling_list(self, cores=None):
        """Return the thread siblings of ``cores`` as a flat list of strings.

        :param cores: cores to inspect, defaults to ``_get_app_cpu()``
        :return: list of sibling ids, or an empty list when reading the
                 topology fails
        """
        if cores is None:
            cores = self._get_app_cpu()
        sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
        awk_template = "awk -F: '{ print $1 }' < %s"
        sys_path = "/sys/devices/system/cpu/"
        cpu_topology = []
        try:
            for core in cores:
                sys_cmd = sys_cmd_template % (sys_path, core)
                cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
                cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))

            return cpu_topology
        except Exception:  # pylint: disable=broad-except
            # best effort by design, but keep a trace of the reason
            LOG.debug("Unable to read the cpu sibling list", exc_info=True)
            return []

    def _validate_cpu_cfg(self):
        """Return the sibling list of the application cores."""
        return self._get_cpu_sibling_list()

    def setup_vnf_environment(self):
        """Set up DPDK, bind the interfaces and create the resource profile.

        :return: the object returned by ``_setup_resources``
        """
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill the remote process named exactly APP_NAME."""
        # have to use exact match
        self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            # igb_uio is loaded, nothing left to do
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def get_collectd_options(self):
        """Return the collectd options that apply to this node.

        The scenario wide 'collectd' options are overridden by the node
        specific ones. The result is a new dict, so the override never leaks
        back into the scenario wide options.
        """
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Select the socket and build the ResourceProfile of the VNF."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        # NOTE(review): socket 0 is chosen when the character at index 5 of
        # any bound PCI address is "0" - confirm the intent of this heuristic
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        cores = self._validate_cpu_cfg()
        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores,
                               plugins=plugins, interval=collectd_options.get("interval"))

    def _detect_and_bind_drivers(self):
        """Bind the VNF interfaces to igb_uio and record their DPDK port number.

        The DPDK port number of an interface is the position of its PCI
        address in the sorted list of DPDK bound addresses. Bound addresses
        that match no interface of the VNF are skipped.
        """
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            intf = next((v for v in interfaces
                         if vpci == v['virtual-interface']['vpci']), None)
            if intf is None:
                # only the "no matching interface" case is ignored; a bare
                # except would also hide interrupts and programming errors
                LOG.debug("No interface found for DPDK bound address %s", vpci)
                continue
            # force to int
            intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the output of FIND_NET_CMD for ``vpci``.

        :return: the command output on exit status 0, None otherwise
        """
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the drivers through the DPDK bind helper."""
        self.dpdk_bind_helper.rebind_drivers()
364
365
class ResourceHelper(object):
    """Owns the resource object of a VNF and collects its KPIs."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Keep the setup helper and reuse its SSH helper."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Store the result of the environment setup as the resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Nothing to generate in the base class."""

    def _collect_resource_kpi(self):
        """Return the resource KPIs wrapped under the "core" key.

        The KPIs stay an empty dict unless the first element returned by the
        "collectd" running check is truthy.
        """
        kpi = {}
        if self.resource.check_if_sa_running("collectd")[0]:
            kpi = self.resource.amqp_collect_nfvi_kpi()
        return {"core": kpi}

    def start_collect(self):
        """Initiate the system agent and start the resource collection."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource when there is one."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the KPIs of the resource."""
        return self._collect_resource_kpi()
406
407
class ClientResourceHelper(ResourceHelper):
    """Resource helper driving a traffic generator through an STL client."""

    # seconds of traffic per iteration of the traffic loop
    RUN_DURATION = 60
    # seconds waited before the samples are put on the queue
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Reuse the helpers of ``setup_helper`` and create the shared state."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and port numbers from the VNFD port pairs."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or an empty dict on STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per interface samples from the client statistics.

        :param ports: port numbers to report on
        :param key: optional statistics key copied into every sample
        :param default: value used when ``key`` is not in the statistics
        :return: dict mapping interface names to their samples; an empty
                 dict, after flagging termination, when the statistics are
                 not a mapping
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # validate the statistics before reading anything from them
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the resulting samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic until termination is flagged.

        An STLError is swallowed when termination was already requested and
        re-raised otherwise.
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Flag the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics of ``ports`` (default: all ports)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start the client on ``ports`` (default: all ports)."""
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge queued samples, if any, into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            # let logging do the formatting, only when debug is enabled
            LOG.debug("Got KPIs from _queue for %s %s",
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect ``client``, creating an STLClient when none is given.

        :return: the client, whether or not a connection was established
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        else:
            # every attempt failed; the client is still returned as before
            LOG.warning("Unable to connect to Trex Server, giving up")
        return client
538
539
class Rfc2544ResourceHelper(object):
    """Lazy accessor of the 'rfc2544' section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Keep the scenario helper; every option is resolved on first use."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        # caches filled by the properties below
        self._rfc2544 = None
        self._correlated_traffic = None
        self._latency = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The 'rfc2544' entry of all the scenario options."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The 'correlated_traffic' option, DEFAULT_CORRELATED_TRAFFIC if unset."""
        if self._correlated_traffic is None:
            value = self.get_rfc2544('correlated_traffic',
                                     self.DEFAULT_CORRELATED_TRAFFIC)
            self._correlated_traffic = value
        return self._correlated_traffic

    @property
    def latency(self):
        """The 'latency' option, DEFAULT_LATENCY if unset."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return the rfc2544 option ``name``, or ``default`` when missing."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' into the low and high tolerance.

        The option is split on '-' and the values are sorted; the smallest
        becomes the low tolerance and the next one the high tolerance. A
        single value is used for both.
        """
        raw = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw.split('-'))
        self._tolerance_low = bounds[0]
        if len(bounds) > 1:
            self._tolerance_high = bounds[1]
        else:
            self._tolerance_high = self.tolerance_low
596
597
class SampleVNFDeployHelper(object):
    """Clones, uploads and builds the sample VNF sources for a node."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    # deployment is switched off for now, see deploy_vnfs()
    DISABLE_DEPLOY = True

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the VNFD helper and the SSH helper."""
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build ``app_name`` remotely and copy it into the binary path.

        Returns immediately while DISABLE_DEPLOY is set, or when ``which``
        already finds the binary under the remote binary path.
        """
        # temp disable for now
        if self.DISABLE_DEPLOY:
            return

        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        already_missing = ssh.execute("which %s" % vnf_bin)[0]
        if not already_missing:
            return

        # fresh local clone, then replace the remote copy with it
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        # install the freshly built binary
        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_binary, vnf_bin))
636
637
class ScenarioHelper(object):
    """Read-only view on the scenario configuration of one named node."""

    # returned by vnf_cfg when the node options carry no 'vnf_config'
    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the node name; ``scenario_cfg`` is assigned later."""
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The 'task_path' entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry of the scenario configuration, None if absent."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The 'options' entry of the scenario configuration, {} if absent."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """The options stored under the name of this node, {} if absent."""
        scenario_options = self.all_options
        return scenario_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The 'vnf_config' of this node, DEFAULT_VNF_CFG if absent."""
        node_options = self.options
        return node_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The 'topology' entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['topology']
674
675
676 class SampleVNF(GenericVNF):
677     """ Class providing file-like API for generic VNF implementation """
678
679     VNF_PROMPT = "pipeline>"
680     WAIT_TIME = 1
681
682     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
683         super(SampleVNF, self).__init__(name, vnfd)
684         self.bin_path = get_nsb_option('bin_path', '')
685
686         self.scenario_helper = ScenarioHelper(self.name)
687         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
688
689         if setup_env_helper_type is None:
690             setup_env_helper_type = SetupEnvHelper
691
692         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
693                                                   self.ssh_helper,
694                                                   self.scenario_helper)
695
696         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
697
698         if resource_helper_type is None:
699             resource_helper_type = ResourceHelper
700
701         self.resource_helper = resource_helper_type(self.setup_helper)
702
703         self.context_cfg = None
704         self.nfvi_context = None
705         self.pipeline_kwargs = {}
706         self.uplink_ports = None
707         self.downlink_ports = None
708         # TODO(esm): make QueueFileWrapper invert-able so that we
709         #            never have to manage the queues
710         self.q_in = Queue()
711         self.q_out = Queue()
712         self.queue_wrapper = None
713         self.run_kwargs = {}
714         self.used_drivers = {}
715         self.vnf_port_pairs = None
716         self._vnf_process = None
717
718     def _build_ports(self):
719         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
720         self.networks = self._port_pairs.networks
721         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
722         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
723         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
724
725     def _get_route_data(self, route_index, route_type):
726         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
727         for _ in range(route_index):
728             next(route_iter, '')
729         return next(route_iter, {}).get(route_type, '')
730
731     def _get_port0localip6(self):
732         return_value = self._get_route_data(0, 'network')
733         LOG.info("_get_port0localip6 : %s", return_value)
734         return return_value
735
736     def _get_port1localip6(self):
737         return_value = self._get_route_data(1, 'network')
738         LOG.info("_get_port1localip6 : %s", return_value)
739         return return_value
740
741     def _get_port0prefixlen6(self):
742         return_value = self._get_route_data(0, 'netmask')
743         LOG.info("_get_port0prefixlen6 : %s", return_value)
744         return return_value
745
746     def _get_port1prefixlen6(self):
747         return_value = self._get_route_data(1, 'netmask')
748         LOG.info("_get_port1prefixlen6 : %s", return_value)
749         return return_value
750
751     def _get_port0gateway6(self):
752         return_value = self._get_route_data(0, 'network')
753         LOG.info("_get_port0gateway6 : %s", return_value)
754         return return_value
755
756     def _get_port1gateway6(self):
757         return_value = self._get_route_data(1, 'network')
758         LOG.info("_get_port1gateway6 : %s", return_value)
759         return return_value
760
761     def _start_vnf(self):
762         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
763         self._vnf_process = Process(target=self._run)
764         self._vnf_process.start()
765
766     def _vnf_up_post(self):
767         pass
768
769     def instantiate(self, scenario_cfg, context_cfg):
770         self.scenario_helper.scenario_cfg = scenario_cfg
771         self.context_cfg = context_cfg
772         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
773         # self.nfvi_context = None
774
775         self.deploy_helper.deploy_vnfs(self.APP_NAME)
776         self.resource_helper.setup()
777         self._start_vnf()
778
779     def wait_for_instantiate(self):
780         buf = []
781         time.sleep(self.WAIT_TIME)  # Give some time for config to load
782         while True:
783             if not self._vnf_process.is_alive():
784                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
785
786             # TODO(esm): move to QueueFileWrapper
787             while self.q_out.qsize() > 0:
788                 buf.append(self.q_out.get())
789                 message = ''.join(buf)
790                 if self.VNF_PROMPT in message:
791                     LOG.info("%s VNF is up and running.", self.APP_NAME)
792                     self._vnf_up_post()
793                     self.queue_wrapper.clear()
794                     self.resource_helper.start_collect()
795                     return self._vnf_process.exitcode
796
797                 if "PANIC" in message:
798                     raise RuntimeError("Error starting %s VNF." %
799                                        self.APP_NAME)
800
801             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
802             time.sleep(1)
803             # Send ENTER to display a new prompt in case the prompt text was corrupted
804             # by other VNF output
805             self.q_in.put('\r\n')
806
807     def _build_run_kwargs(self):
808         self.run_kwargs = {
809             'stdin': self.queue_wrapper,
810             'stdout': self.queue_wrapper,
811             'keep_stdin_open': True,
812             'pty': True,
813         }
814
815     def _build_config(self):
816         return self.setup_helper.build_config()
817
818     def _run(self):
819         # we can't share ssh paramiko objects to force new connection
820         self.ssh_helper.drop_connection()
821         cmd = self._build_config()
822         # kill before starting
823         self.setup_helper.kill_vnf()
824
825         LOG.debug(cmd)
826         self._build_run_kwargs()
827         self.ssh_helper.run(cmd, **self.run_kwargs)
828
829     def vnf_execute(self, cmd, wait_time=2):
830         """ send cmd to vnf process """
831
832         LOG.info("%s command: %s", self.APP_NAME, cmd)
833         self.q_in.put("{}\r\n".format(cmd))
834         time.sleep(wait_time)
835         output = []
836         while self.q_out.qsize() > 0:
837             output.append(self.q_out.get())
838         return "".join(output)
839
840     def _tear_down(self):
841         pass
842
843     def terminate(self):
844         self.vnf_execute("quit")
845         if self._vnf_process:
846             self._vnf_process.terminate()
847         self.setup_helper.kill_vnf()
848         self._tear_down()
849         self.resource_helper.stop_collect()
850
851     def get_stats(self, *args, **kwargs):
852         """
853         Method for checking the statistics
854
855         :return:
856            VNF statistics
857         """
858         cmd = 'p {0} stats'.format(self.APP_WORD)
859         out = self.vnf_execute(cmd)
860         return out
861
862     def collect_kpi(self):
863         stats = self.get_stats()
864         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
865         if m:
866             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
867             result["collect_stats"] = self.resource_helper.collect_kpi()
868         else:
869             result = {
870                 "packets_in": 0,
871                 "packets_fwd": 0,
872                 "packets_dropped": 0,
873             }
874         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
875         return result
876
877
878 class SampleVNFTrafficGen(GenericTrafficGen):
879     """ Class providing file-like API for generic traffic generator """
880
881     APP_NAME = 'Sample'
882     RUN_WAIT = 1
883
884     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
885         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
886         self.bin_path = get_nsb_option('bin_path', '')
887         self.name = "tgen__1"  # name in topology file
888
889         self.scenario_helper = ScenarioHelper(self.name)
890         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
891
892         if setup_env_helper_type is None:
893             setup_env_helper_type = SetupEnvHelper
894
895         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
896                                                   self.ssh_helper,
897                                                   self.scenario_helper)
898
899         if resource_helper_type is None:
900             resource_helper_type = ClientResourceHelper
901
902         self.resource_helper = resource_helper_type(self.setup_helper)
903
904         self.runs_traffic = True
905         self.traffic_finished = False
906         self._tg_process = None
907         self._traffic_process = None
908
909     def _start_server(self):
910         # we can't share ssh paramiko objects to force new connection
911         self.ssh_helper.drop_connection()
912
913     def instantiate(self, scenario_cfg, context_cfg):
914         self.scenario_helper.scenario_cfg = scenario_cfg
915         self.resource_helper.generate_cfg()
916         self.resource_helper.setup()
917
918         LOG.info("Starting %s server...", self.APP_NAME)
919         self._tg_process = Process(target=self._start_server)
920         self._tg_process.start()
921
922     def wait_for_instantiate(self):
923         # overridden by subclasses
924         return self._wait_for_process()
925
926     def _check_status(self):
927         raise NotImplementedError
928
929     def _wait_for_process(self):
930         while True:
931             if not self._tg_process.is_alive():
932                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
933             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
934             time.sleep(1)
935             status = self._check_status()
936             if status == 0:
937                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
938                 return self._tg_process.exitcode
939
940     def _traffic_runner(self, traffic_profile):
941         # always drop connections first thing in new processes
942         # so we don't get paramiko errors
943         self.ssh_helper.drop_connection()
944         LOG.info("Starting %s client...", self.APP_NAME)
945         self.resource_helper.run_traffic(traffic_profile)
946
947     def run_traffic(self, traffic_profile):
948         """ Generate traffic on the wire according to the given params.
949         Method is non-blocking, returns immediately when traffic process
950         is running. Mandatory.
951
952         :param traffic_profile:
953         :return: True/False
954         """
955         self._traffic_process = Process(target=self._traffic_runner,
956                                         args=(traffic_profile,))
957         self._traffic_process.start()
958         # Wait for traffic process to start
959         while self.resource_helper.client_started.value == 0:
960             time.sleep(self.RUN_WAIT)
961             # what if traffic process takes a few seconds to start?
962             if not self._traffic_process.is_alive():
963                 break
964
965         return self._traffic_process.is_alive()
966
967     def listen_traffic(self, traffic_profile):
968         """ Listen to traffic with the given parameters.
969         Method is non-blocking, returns immediately when traffic process
970         is running. Optional.
971
972         :param traffic_profile:
973         :return: True/False
974         """
975         pass
976
977     def verify_traffic(self, traffic_profile):
978         """ Verify captured traffic after it has ended. Optional.
979
980         :param traffic_profile:
981         :return: dict
982         """
983         pass
984
985     def collect_kpi(self):
986         result = self.resource_helper.collect_kpi()
987         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
988         return result
989
990     def terminate(self):
991         """ After this method finishes, all traffic processes should stop. Mandatory.
992
993         :return: True/False
994         """
995         self.traffic_finished = True
996         if self._traffic_process is not None:
997             self._traffic_process.terminate()