Merge "Fix src/dst ip update in ixia configuration"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
34 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
35 from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
38 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.utils import get_nsb_option
41
42 from trex_stl_lib.trex_stl_client import STLClient
43 from trex_stl_lib.trex_stl_client import LoggerApi
44 from trex_stl_lib.trex_stl_exceptions import STLError
45
46 from yardstick.ssh import AutoConnectSSH
47
# Name of the DPDK directory that is looked up under the remote bin path
# before the nsb_setup.sh script is asked to install DPDK.
DPDK_VERSION = "dpdk-16.07"

# Module level logger shared by all helpers in this file.
LOG = logging.getLogger(__name__)


# Directory that receives generated configuration files and scripts.
REMOTE_TMP = "/tmp"
54
55
class VnfSshHelper(AutoConnectSSH):
    """ SSH helper built from a node description plus a tool directory """

    def __init__(self, node, bin_path, wait=None):
        """Create the helper.

        :param node: node description handed to ``args_from_node``
        :param bin_path: directory used by the tool related helpers below
        :param wait: optional 'wait' value, used only when the node
                     description did not already provide one
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return the class that ``copy`` has to instantiate."""
        # must return static class name, anything else refers to the calling class
        # i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        """Return a new helper created from the same node and bin path."""
        # this copy constructor is different from SSH classes, since it uses node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` as REMOTE_TMP/<prefix> and return that path."""
        remote_file = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_file)
        return remote_file

    def join_bin_path(self, *args):
        """Join ``args`` onto the configured bin path."""
        parts = (self.bin_path,) + args
        return os.path.join(*parts)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Provision ``tool_file``, using the bin path when no path is given."""
        chosen_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(chosen_path, tool_file)
91
92
class SetupEnvHelper(object):
    """ Base class for helpers that prepare the environment of a VNF """

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    CORES = []
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the three collaborating helpers."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper

    def _get_ports_gateway(self, name):
        """Return the gateway of the first route whose 'if' equals ``name``.

        Routes come from the 'routing_table' entry of vdu0; ``None`` is
        returned when no route matches.
        """
        routes = self.vnfd_helper.vdu0.get('routing_table', [])
        gateways = (route['gateway'] for route in routes if name == route['if'])
        return next(gateways, None)

    def build_config(self):
        """Build the VNF configuration; subclasses have to implement this."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the environment; nothing to do in the base class."""

    def kill_vnf(self):
        """Stop the VNF; nothing to do in the base class."""

    def tear_down(self):
        """Undo the environment setup; subclasses have to implement this."""
        raise NotImplementedError
126
127
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """ Environment helper for DPDK based sample VNFs

    Over SSH it configures hugepages, loads igb_uio, binds the VNF interfaces
    to that driver, generates and uploads the pipeline configuration and
    builds the command line that starts the application.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    # cores added on top of 'worker_threads' by _get_app_cpu()
    HW_DEFAULT_CORE = 3
    SW_DEFAULT_CORE = 2

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace 'pkt_type = ipv4' with the configured packet type.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing 'pkt_type'
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Rewrite the traffic type settings of the pipeline configuration.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing 'traffic_type' and 'vnf_type'
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # Compare by value: identity ('is not') between two strings only
        # gives the expected answer when both happen to be the same object.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Write the number of hugepages for the page size found in /proc/meminfo."""
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        # 8192 pages for 2048kB pages, 16 pages for any other page size
        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def build_config(self):
        """Generate and upload the VNF config and script.

        :return: PIPELINE_COMMAND formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # the generated configuration is read back from the local CFG_CONFIG path
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Fill ``self.pipeline_kwargs`` with the values used by PIPELINE_COMMAND."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def _get_app_cpu(self):
        """Return the cores used by the application.

        ``CORES`` wins when it is non-empty; otherwise the first cores of the
        selected socket are taken, as many as worker threads plus the
        HW/SW default core count.
        """
        if self.CORES:
            return self.CORES

        vnf_cfg = self.scenario_helper.vnf_cfg
        sys_obj = CpuSysCores(self.ssh_helper)
        self.sys_cpu = sys_obj.get_core_socket()
        # same default as build_config() so a partial vnf_config does not
        # raise KeyError here while being accepted there
        num_core = int(vnf_cfg.get("worker_threads", 3))
        if vnf_cfg.get("lb_config", "SW") == 'HW':
            num_core += self.HW_DEFAULT_CORE
        else:
            num_core += self.SW_DEFAULT_CORE
        app_cpu = self.sys_cpu[str(self.socket)][:num_core]
        return app_cpu

    def _get_cpu_sibling_list(self, cores=None):
        """Return the thread siblings of ``cores`` read from sysfs.

        :param cores: cores to inspect, defaults to ``_get_app_cpu()``
        :return: list of sibling ids as strings, ``[]`` when reading fails
        """
        if cores is None:
            cores = self._get_app_cpu()
        sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
        awk_template = "awk -F: '{ print $1 }' < %s"
        sys_path = "/sys/devices/system/cpu/"
        cpu_topology = []
        try:
            for core in cores:
                sys_cmd = sys_cmd_template % (sys_path, core)
                cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
                cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))

            return cpu_topology
        except Exception:
            # still best effort, but leave a trace of why the list is empty
            LOG.debug("Unable to read the cpu sibling list", exc_info=True)
            return []

    def _validate_cpu_cfg(self):
        return self._get_cpu_sibling_list()

    def setup_vnf_environment(self):
        """Prepare DPDK, bind the interfaces and return the resource profile."""
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill any process whose name is exactly APP_NAME."""
        # have to use exact match
        self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        # nothing left to do when igb_uio is loaded
        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def get_collectd_options(self):
        """Return the collectd options with node specific overrides applied.

        A copy is updated so the scenario wide 'collectd' options are not
        modified and the overrides of one node do not reach other nodes.
        """
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Select the socket and build the ResourceProfile for this VNF."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        cores = self._validate_cpu_cfg()
        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores,
                               plugins=plugins, interval=collectd_options.get("interval"))

    def _detect_and_bind_drivers(self):
        """Bind the VNF PCI addresses to igb_uio and record the DPDK port numbers.

        The port number of an interface is the position of its vpci in the
        sorted list of DPDK bound PCI addresses.
        """
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
                # force to int
                intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except Exception:
                # Best effort as before (e.g. the address belongs to no
                # interface of this VNF), but no longer a bare except that
                # also hides KeyboardInterrupt and SystemExit.
                LOG.debug("No dpdk_port_num recorded for %s", vpci, exc_info=True)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the output of FIND_NET_CMD for ``vpci``, ``None`` on failure."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Ask the bind helper to rebind the saved drivers."""
        self.dpdk_bind_helper.rebind_drivers()
364
365
class ResourceHelper(object):
    """ Collects NFVI KPIs through the resource created by the setup helper """

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember the setup helper and reuse its ssh helper."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Store whatever ``setup_vnf_environment`` returns as the resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Nothing to generate in the base class."""

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is empty unless collectd is running."""
        collectd_running = self.resource.check_if_sa_running("collectd")[0]
        if collectd_running:
            core_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            core_kpi = {}
        return {"core": core_kpi}

    def start_collect(self):
        """Start the system agent, the resource and the AMQP KPI process."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource when there is one."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
406
407
class ClientResourceHelper(ResourceHelper):
    """ Resource helper that drives a TRex stateless client

    ``run_traffic`` is written to run in a subprocess: the ``Value`` members
    and the queue are what it shares with ``terminate`` and ``collect_kpi``.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)
        self._vpci_ascending = None

    def _build_ports(self):
        """Translate the port pairs of the VNFD into port numbers."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, ``{}`` when the client raises STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build one sample dict per interface whose port is in ``ports``.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: dict mapping interface name to its sample; ``{}`` when the
                 statistics are not a mapping, in which case the helper is
                 also flagged as terminated
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # Validate before the first .get(): a non-mapping result used to
        # raise on that call before reaching this guard.
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run the profile for RUN_DURATION seconds and queue the samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run the profile until terminated.

        :raises STLError: when the client fails while not being terminated
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Flag the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the statistics of ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued KPI set, if any, into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            # lazy logging arguments: formatted only when DEBUG is enabled
            LOG.debug("Got KPIs from _queue for %s %s",
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Return ``client`` (a new STLClient by default) after trying to connect.

        The client is returned even when every attempt failed.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
539
540
class Rfc2544ResourceHelper(object):
    """ Lazy access to the 'rfc2544' options of a scenario """

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The 'rfc2544' entry of the scenario options, read once."""
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of 'allowed_drop_rate'."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of 'allowed_drop_rate'."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The 'correlated_traffic' option, DEFAULT_CORRELATED_TRAFFIC if unset."""
        if self._correlated_traffic is None:
            self._correlated_traffic = \
                self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)

        return self._correlated_traffic

    @property
    def latency(self):
        """The 'latency' option, DEFAULT_LATENCY if unset."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` of the rfc2544 section or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' into the low and high tolerance.

        The option is either a "low - high" string (a single value is used
        for both bounds) or a plain number.

        :raises ValueError: when a part of the string is not a number
        """
        tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        if isinstance(tolerance, (int, float)):
            # a bare number in the options has no .split(); it is handled
            # here rather than through str() because e.g. str(1e-05)
            # contains the '-' separator
            bounds = [float(tolerance)]
        else:
            bounds = sorted(float(t.strip()) for t in tolerance.split('-'))
        tolerance_iter = iter(bounds)
        self._tolerance_low = next(tolerance_iter)
        self._tolerance_high = next(tolerance_iter, self._tolerance_low)
597
598
class SampleVNFDeployHelper(object):
    """ Clones, uploads and builds the samplevnf repository for a VNF """

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    # deploy_vnfs() returns immediately while this is true
    DISABLE_DEPLOY = True

    def __init__(self, vnfd_helper, ssh_helper):
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build ``app_name`` remotely and copy the binary into the bin path.

        Does nothing when deployment is disabled or when ``which`` already
        finds the binary in the bin path.
        """
        # temp disable for now
        if self.DISABLE_DEPLOY:
            return

        ssh = self.ssh_helper
        target_bin = ssh.join_bin_path(app_name)
        which_status = ssh.execute("which %s" % target_bin)[0]
        if not which_status:
            return

        # fresh local clone, then replace the remote copy with it
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        built_bin = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_bin, target_bin))
637
638
class ScenarioHelper(object):
    """ Read-only views on the scenario configuration of one named node """

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """``scenario_cfg`` has to be assigned before the properties are used."""
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The mandatory 'task_path' entry."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry, ``None`` when absent."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The 'options' entry, ``{}`` when absent."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this helper's name, ``{}`` when absent."""
        scenario_options = self.all_options
        return scenario_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The 'vnf_config' of this node, DEFAULT_VNF_CFG when absent."""
        node_options = self.options
        return node_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The mandatory 'topology' entry."""
        cfg = self.scenario_cfg
        return cfg['topology']
675
676
677 class SampleVNF(GenericVNF):
678     """ Class providing file-like API for generic VNF implementation """
679
680     VNF_PROMPT = "pipeline>"
681     WAIT_TIME = 1
682
683     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
684         super(SampleVNF, self).__init__(name, vnfd)
685         self.bin_path = get_nsb_option('bin_path', '')
686
687         self.scenario_helper = ScenarioHelper(self.name)
688         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
689
690         if setup_env_helper_type is None:
691             setup_env_helper_type = SetupEnvHelper
692
693         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
694                                                   self.ssh_helper,
695                                                   self.scenario_helper)
696
697         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
698
699         if resource_helper_type is None:
700             resource_helper_type = ResourceHelper
701
702         self.resource_helper = resource_helper_type(self.setup_helper)
703
704         self.context_cfg = None
705         self.nfvi_context = None
706         self.pipeline_kwargs = {}
707         self.uplink_ports = None
708         self.downlink_ports = None
709         # TODO(esm): make QueueFileWrapper invert-able so that we
710         #            never have to manage the queues
711         self.q_in = Queue()
712         self.q_out = Queue()
713         self.queue_wrapper = None
714         self.run_kwargs = {}
715         self.used_drivers = {}
716         self.vnf_port_pairs = None
717         self._vnf_process = None
718
719     def _build_ports(self):
720         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
721         self.networks = self._port_pairs.networks
722         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
723         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
724         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
725
726     def _get_route_data(self, route_index, route_type):
727         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
728         for _ in range(route_index):
729             next(route_iter, '')
730         return next(route_iter, {}).get(route_type, '')
731
732     def _get_port0localip6(self):
733         return_value = self._get_route_data(0, 'network')
734         LOG.info("_get_port0localip6 : %s", return_value)
735         return return_value
736
737     def _get_port1localip6(self):
738         return_value = self._get_route_data(1, 'network')
739         LOG.info("_get_port1localip6 : %s", return_value)
740         return return_value
741
742     def _get_port0prefixlen6(self):
743         return_value = self._get_route_data(0, 'netmask')
744         LOG.info("_get_port0prefixlen6 : %s", return_value)
745         return return_value
746
747     def _get_port1prefixlen6(self):
748         return_value = self._get_route_data(1, 'netmask')
749         LOG.info("_get_port1prefixlen6 : %s", return_value)
750         return return_value
751
752     def _get_port0gateway6(self):
753         return_value = self._get_route_data(0, 'network')
754         LOG.info("_get_port0gateway6 : %s", return_value)
755         return return_value
756
757     def _get_port1gateway6(self):
758         return_value = self._get_route_data(1, 'network')
759         LOG.info("_get_port1gateway6 : %s", return_value)
760         return return_value
761
762     def _start_vnf(self):
763         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
764         self._vnf_process = Process(target=self._run)
765         self._vnf_process.start()
766
767     def _vnf_up_post(self):
768         pass
769
770     def instantiate(self, scenario_cfg, context_cfg):
771         self.scenario_helper.scenario_cfg = scenario_cfg
772         self.context_cfg = context_cfg
773         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
774         # self.nfvi_context = None
775
776         self.deploy_helper.deploy_vnfs(self.APP_NAME)
777         self.resource_helper.setup()
778         self._start_vnf()
779
780     def wait_for_instantiate(self):
781         buf = []
782         time.sleep(self.WAIT_TIME)  # Give some time for config to load
783         while True:
784             if not self._vnf_process.is_alive():
785                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
786
787             # TODO(esm): move to QueueFileWrapper
788             while self.q_out.qsize() > 0:
789                 buf.append(self.q_out.get())
790                 message = ''.join(buf)
791                 if self.VNF_PROMPT in message:
792                     LOG.info("%s VNF is up and running.", self.APP_NAME)
793                     self._vnf_up_post()
794                     self.queue_wrapper.clear()
795                     self.resource_helper.start_collect()
796                     return self._vnf_process.exitcode
797
798                 if "PANIC" in message:
799                     raise RuntimeError("Error starting %s VNF." %
800                                        self.APP_NAME)
801
802             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
803             time.sleep(1)
804             # Send ENTER to display a new prompt in case the prompt text was corrupted
805             # by other VNF output
806             self.q_in.put('\r\n')
807
808     def _build_run_kwargs(self):
809         self.run_kwargs = {
810             'stdin': self.queue_wrapper,
811             'stdout': self.queue_wrapper,
812             'keep_stdin_open': True,
813             'pty': True,
814         }
815
816     def _build_config(self):
817         return self.setup_helper.build_config()
818
819     def _run(self):
820         # we can't share ssh paramiko objects to force new connection
821         self.ssh_helper.drop_connection()
822         cmd = self._build_config()
823         # kill before starting
824         self.setup_helper.kill_vnf()
825
826         LOG.debug(cmd)
827         self._build_run_kwargs()
828         self.ssh_helper.run(cmd, **self.run_kwargs)
829
830     def vnf_execute(self, cmd, wait_time=2):
831         """ send cmd to vnf process """
832
833         LOG.info("%s command: %s", self.APP_NAME, cmd)
834         self.q_in.put("{}\r\n".format(cmd))
835         time.sleep(wait_time)
836         output = []
837         while self.q_out.qsize() > 0:
838             output.append(self.q_out.get())
839         return "".join(output)
840
841     def _tear_down(self):
842         pass
843
844     def terminate(self):
845         self.vnf_execute("quit")
846         if self._vnf_process:
847             self._vnf_process.terminate()
848         self.setup_helper.kill_vnf()
849         self._tear_down()
850         self.resource_helper.stop_collect()
851
852     def get_stats(self, *args, **kwargs):
853         """
854         Method for checking the statistics
855
856         :return:
857            VNF statistics
858         """
859         cmd = 'p {0} stats'.format(self.APP_WORD)
860         out = self.vnf_execute(cmd)
861         return out
862
863     def collect_kpi(self):
864         stats = self.get_stats()
865         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
866         if m:
867             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
868             result["collect_stats"] = self.resource_helper.collect_kpi()
869         else:
870             result = {
871                 "packets_in": 0,
872                 "packets_fwd": 0,
873                 "packets_dropped": 0,
874             }
875         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
876         return result
877
878
879 class SampleVNFTrafficGen(GenericTrafficGen):
880     """ Class providing file-like API for generic traffic generator """
881
882     APP_NAME = 'Sample'
883     RUN_WAIT = 1
884
885     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
886         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
887         self.bin_path = get_nsb_option('bin_path', '')
888         self.name = "tgen__1"  # name in topology file
889
890         self.scenario_helper = ScenarioHelper(self.name)
891         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
892
893         if setup_env_helper_type is None:
894             setup_env_helper_type = SetupEnvHelper
895
896         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
897                                                   self.ssh_helper,
898                                                   self.scenario_helper)
899
900         if resource_helper_type is None:
901             resource_helper_type = ClientResourceHelper
902
903         self.resource_helper = resource_helper_type(self.setup_helper)
904
905         self.runs_traffic = True
906         self.traffic_finished = False
907         self._tg_process = None
908         self._traffic_process = None
909
910     def _start_server(self):
911         # we can't share ssh paramiko objects to force new connection
912         self.ssh_helper.drop_connection()
913
914     def instantiate(self, scenario_cfg, context_cfg):
915         self.scenario_helper.scenario_cfg = scenario_cfg
916         self.resource_helper.generate_cfg()
917         self.resource_helper.setup()
918
919         LOG.info("Starting %s server...", self.APP_NAME)
920         self._tg_process = Process(target=self._start_server)
921         self._tg_process.start()
922
923     def wait_for_instantiate(self):
924         # overridden by subclasses
925         return self._wait_for_process()
926
927     def _check_status(self):
928         raise NotImplementedError
929
930     def _wait_for_process(self):
931         while True:
932             if not self._tg_process.is_alive():
933                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
934             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
935             time.sleep(1)
936             status = self._check_status()
937             if status == 0:
938                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
939                 return self._tg_process.exitcode
940
941     def _traffic_runner(self, traffic_profile):
942         # always drop connections first thing in new processes
943         # so we don't get paramiko errors
944         self.ssh_helper.drop_connection()
945         LOG.info("Starting %s client...", self.APP_NAME)
946         self.resource_helper.run_traffic(traffic_profile)
947
948     def run_traffic(self, traffic_profile):
949         """ Generate traffic on the wire according to the given params.
950         Method is non-blocking, returns immediately when traffic process
951         is running. Mandatory.
952
953         :param traffic_profile:
954         :return: True/False
955         """
956         self._traffic_process = Process(target=self._traffic_runner,
957                                         args=(traffic_profile,))
958         self._traffic_process.start()
959         # Wait for traffic process to start
960         while self.resource_helper.client_started.value == 0:
961             time.sleep(self.RUN_WAIT)
962             # what if traffic process takes a few seconds to start?
963             if not self._traffic_process.is_alive():
964                 break
965
966         return self._traffic_process.is_alive()
967
968     def listen_traffic(self, traffic_profile):
969         """ Listen to traffic with the given parameters.
970         Method is non-blocking, returns immediately when traffic process
971         is running. Optional.
972
973         :param traffic_profile:
974         :return: True/False
975         """
976         pass
977
978     def verify_traffic(self, traffic_profile):
979         """ Verify captured traffic after it has ended. Optional.
980
981         :param traffic_profile:
982         :return: dict
983         """
984         pass
985
986     def collect_kpi(self):
987         result = self.resource_helper.collect_kpi()
988         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
989         return result
990
991     def terminate(self):
992         """ After this method finishes, all traffic processes should stop. Mandatory.
993
994         :return: True/False
995         """
996         self.traffic_finished = True
997         if self._traffic_process is not None:
998             self._traffic_process.terminate()