Merge "Adding latency test for vfw"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
34 from yardstick.network_services.nfvi.resource import ResourceProfile
35 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
36 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
37 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
38 from yardstick.network_services.utils import get_nsb_option
39
40 from trex_stl_lib.trex_stl_client import STLClient
41 from trex_stl_lib.trex_stl_client import LoggerApi
42 from trex_stl_lib.trex_stl_exceptions import STLError
43
44 from yardstick.ssh import AutoConnectSSH
45
# Name of the DPDK directory that _setup_dpdk() looks up under the NSB bin
# path to decide whether the DPDK setup script still has to be run.
DPDK_VERSION = "dpdk-16.07"

# Module-level logger shared by every helper in this file.
LOG = logging.getLogger(__name__)


# Base directory used to build the paths of generated config/script files.
REMOTE_TMP = "/tmp"
52
53
class VnfSshHelper(AutoConnectSSH):
    """SSH helper built from a node description plus a remote binary path."""

    def __init__(self, node, bin_path, wait=None):
        """Build the connection arguments from ``node`` and remember ``bin_path``.

        :param node: node description handed to ``args_from_node``
        :param bin_path: directory on the target that holds the tools
        :param wait: optional 'wait' value, used only if the node gave none
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class (never the subclass of the caller)."""
        # must return static class name, anything else refers to the calling class
        # i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        """Create a fresh helper for the same node and binary path."""
        # this copy constructor is different from SSH classes, since it uses node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Write ``content`` to ``REMOTE_TMP/prefix`` and return that path."""
        remote_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join ``args`` onto the configured binary path."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Provision ``tool_file``, defaulting ``tool_path`` to ``bin_path``."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
89
90
class SetupEnvHelper(object):
    """Base environment helper; subclasses supply the VNF specific steps."""

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    CORES = []
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the three collaborating helpers."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper

    def _get_ports_gateway(self, name):
        """Return the gateway of the first route on interface ``name``.

        Routes come from the 'routing_table' entry of ``vdu0``; ``None`` is
        returned when no route matches.
        """
        routes = self.vnfd_helper.vdu0.get('routing_table', [])
        return next((entry['gateway'] for entry in routes if name == entry['if']), None)

    def build_config(self):
        """Build the VNF configuration; must be provided by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the environment; nothing to do in the base class."""

    def kill_vnf(self):
        """Stop the VNF; nothing to do in the base class."""

    def tear_down(self):
        """Undo the environment setup; must be provided by subclasses."""
        raise NotImplementedError
124
125
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for VNFs that run on top of DPDK.

    It configures hugepages, loads the igb_uio kernel module, binds the VNF's
    PCI devices with the dpdk-devbind tool and generates/uploads the pipeline
    configuration and script used to start the application.
    """

    APP_NAME = 'DpdkVnf'
    # command templates for the provisioned dpdk-devbind tool
    DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
    DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    # extra cores added to 'worker_threads' for HW / SW load balancing
    HW_DEFAULT_CORE = 3
    SW_DEFAULT_CORE = 2

    # extracts (pci address, driver) pairs from the devbind status output
    DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Return the config with 'pkt_type = ipv4' set to the requested type."""
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        return ip_pipeline_cfg.replace(match_str, replace_str)

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Return the config adjusted for the requested traffic type.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict with 'traffic_type' and 'vnf_type' keys
        """
        traffic_type = traffic_options['traffic_type']

        # Compare by value: the previous identity test ('is not') depended on
        # string interning and is not a reliable equality check.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            # ipv4 is already what the template contains; nothing changes
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        return ip_pipeline_cfg.replace(match_str, replace_str)

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Initialise the helper; DPDK state is filled in during setup."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self._dpdk_nic_bind = None
        self.socket = None
        self.used_drivers = None

    @property
    def dpdk_nic_bind(self):
        """Path of the dpdk-devbind tool, provisioned on first access."""
        if self._dpdk_nic_bind is None:
            self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
        return self._dpdk_nic_bind

    def _setup_hugepages(self):
        """Set the number of hugepages on the target based on the page size."""
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # result is not used; the command is kept as in the original flow
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def _get_dpdk_port_num(self, name):
        """Return the 'dpdk_port_num' of the interface called ``name``."""
        # NOTE(review): this reads the value from 'virtual-interface', while
        # _detect_and_bind_drivers() stores it on the interface itself --
        # confirm which location is intended.
        interface = self.vnfd_helper.find_interface(name=name)
        return interface['virtual-interface']['dpdk_port_num']

    def build_config(self):
        """Generate and upload the config and script; return the start command.

        :return: ``PIPELINE_COMMAND`` formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper.interfaces,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # read the generated file back so it can be patched before upload
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))
        self.all_ports = multiport.port_pair_list

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Populate ``self.pipeline_kwargs`` used to format the start command."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'ports_len_hex': ports_len_hex,
            'tool_path': tool_path,
        }

    def _get_app_cpu(self):
        """Return the cores for the application.

        ``CORES`` wins when set; otherwise the first N cores of the selected
        socket are used, N being 'worker_threads' plus the HW/SW default.
        """
        if self.CORES:
            return self.CORES

        vnf_cfg = self.scenario_helper.vnf_cfg
        sys_obj = CpuSysCores(self.ssh_helper)
        self.sys_cpu = sys_obj.get_core_socket()
        num_core = int(vnf_cfg["worker_threads"])
        if vnf_cfg.get("lb_config", "SW") == 'HW':
            num_core += self.HW_DEFAULT_CORE
        else:
            num_core += self.SW_DEFAULT_CORE
        return self.sys_cpu[str(self.socket)][:num_core]

    def _get_cpu_sibling_list(self, cores=None):
        """Return the thread siblings of ``cores`` as a flat list of strings.

        :param cores: cores to inspect; defaults to ``_get_app_cpu()``
        :return: sibling ids, or an empty list if they cannot be determined
        """
        if cores is None:
            cores = self._get_app_cpu()
        sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
        awk_template = "awk -F: '{ print $1 }' < %s"
        sys_path = "/sys/devices/system/cpu/"
        cpu_topology = []
        try:
            for core in cores:
                sys_cmd = sys_cmd_template % (sys_path, core)
                cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
                cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))

            return cpu_topology
        except Exception:  # best effort: keep returning [] but leave a trace
            LOG.exception("Unable to read the CPU sibling list")
            return []

    def _validate_cpu_cfg(self):
        """Return the sibling list for the application cores."""
        return self._get_cpu_sibling_list()

    def _find_used_drivers(self):
        """Record index and current driver of every bound PCI device.

        Fills ``self.used_drivers`` with ``{vpci: (index, driver)}`` parsed
        from the devbind status output, limited to addresses in ``bound_pci``.
        """
        cmd = "{0} -s".format(self.dpdk_nic_bind)
        _, dpdk_status, _ = self.ssh_helper.execute(cmd)

        # bound_pci is None until _setup_resources() has run
        bound_pci = self.bound_pci or []
        self.used_drivers = {
            vpci: (index, driver)
            for index, (vpci, driver)
            in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
            if any(b.endswith(vpci) for b in bound_pci)
        }

    def setup_vnf_environment(self):
        """Run the full DPDK setup and return the created ResourceProfile."""
        self._setup_dpdk()
        resource = self._setup_resources()
        self.kill_vnf()
        self._detect_and_bind_drivers()
        return resource

    def kill_vnf(self):
        """Kill any running process whose name is exactly ``APP_NAME``."""
        # have to use exact match
        self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            # module is loaded, nothing else to install
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def _setup_resources(self):
        """Pick the socket, collect the PCI addresses and build the profile."""
        interfaces = self.vnfd_helper.interfaces
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        cores = self._validate_cpu_cfg()
        return ResourceProfile(self.vnfd_helper.mgmt_interface,
                               interfaces=self.vnfd_helper.interfaces, cores=cores)

    def _detect_and_bind_drivers(self):
        """Record the port index of each interface, then bind all to igb_uio."""
        interfaces = self.vnfd_helper.interfaces

        self._find_used_drivers()
        for vpci, (index, _) in self.used_drivers.items():
            try:
                intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
            except StopIteration:
                # no interface uses this address; nothing to annotate
                pass
            else:
                intf1['dpdk_port_num'] = index

        for vpci in self.bound_pci:
            self._bind_dpdk('igb_uio', vpci)
            time.sleep(2)

        # debug dump after binding
        self.ssh_helper.execute("sudo {} -s".format(self.dpdk_nic_bind))

    def rebind_drivers(self, force=True):
        """Bind every recorded device back to the driver it used before."""
        if not self.used_drivers:
            self._find_used_drivers()
        for vpci, (_, driver) in self.used_drivers.items():
            self._bind_dpdk(driver, vpci, force)

    def _bind_dpdk(self, driver, vpci, force=True):
        """Bind ``vpci`` to ``driver``, optionally passing '--force'."""
        if force:
            force = '--force '
        else:
            force = ''
        cmd = self.DPDK_BIND_CMD.format(force=force,
                                        dpdk_nic_bind=self.dpdk_nic_bind,
                                        driver=driver,
                                        vpci=vpci)
        self.ssh_helper.execute(cmd)

    def _detect_and_bind_dpdk(self, vpci, driver):
        """Bind ``vpci`` to ``driver`` unless a net device already exists.

        :return: output of the lookup after binding, or ``None`` when the
                 device was already bound or the bind failed
        """
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            # already bound
            return None
        self._bind_dpdk(driver, vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status != 0:
            # failed to bind
            return None
        return stdout

    def _bind_kernel_devices(self):
        """Bind each interface to its own driver and store the iface name."""
        # only used by PingSetupEnvHelper?
        for intf in self.vnfd_helper.interfaces:
            vi = intf["virtual-interface"]
            stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
            if stdout is not None:
                vi["local_iface_name"] = posixpath.basename(stdout)

    def tear_down(self):
        """Force-bind every recorded device back to its original driver."""
        # used_drivers stays None until _find_used_drivers() has run, so a
        # tear down without a prior setup must not fail on None.items()
        for vpci, (_, driver) in (self.used_drivers or {}).items():
            self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
                                                                driver=driver,
                                                                vpci=vpci))
403
404
class ResourceHelper(object):
    """Owns the resource object returned by the environment setup."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Store ``setup_helper`` and reuse its ssh helper."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Set up the VNF environment and keep the resulting resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for subclasses; the base class generates nothing."""

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is empty if collectd is not running."""
        collectd_running = self.resource.check_if_sa_running("collectd")[0]
        core_kpi = self.resource.amqp_collect_nfvi_kpi() if collectd_running else {}
        return {"core": core_kpi}

    def start_collect(self):
        """Start the system agent and the KPI collection on the resource."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource, if one has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
445
446
class ClientResourceHelper(ResourceHelper):
    """Resource helper that drives a TRex stateless client as traffic source."""

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Reuse the helpers of ``setup_helper`` and create the shared state."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.my_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)
        self._vpci_ascending = None

    def _build_ports(self):
        """Select the ports driven by this generator."""
        self.my_ports = [0, 1]

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` if the client raised STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, key=None, default=None):
        """Build one KPI sample per interface from the latest statistics.

        :param key: optional extra statistic copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: ``{interface name: sample dict}``; ``{}`` (and the helper is
                 flagged as terminated) when the statistics are not a mapping
        """
        last_result = self.get_stats(self.my_ports)

        # Validate the type before touching the result: calling .get() on a
        # non-mapping would fail before the guard could handle it.
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        for vpci_idx, vpci in enumerate(self._vpci_ascending):
            name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
            # fixme: VNFDs KPIs values needs to be mapped to TRex structure
            xe_value = last_result.get(vpci_idx, {})
            samples[name] = {
                "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                "in_packets": int(xe_value.get("ipackets", 0)),
                "out_packets": int(xe_value.get("opackets", 0)),
            }
            if key:
                samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run the profile for ``RUN_DURATION`` seconds and queue the samples."""
        traffic_profile.execute(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples()
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run the profile until terminated.

        :raises STLError: if the client fails while not being terminated
        """
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.my_ports)
            self.client.remove_all_streams(self.my_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.my_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Ask the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the statistics of ``ports`` (default: ``my_ports``)."""
        if ports is None:
            ports = self.my_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on ``ports`` (default: ``my_ports``)."""
        if ports is None:
            ports = self.my_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set, if any, and return the merged KPIs."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            # lazy formatting: only rendered when debug logging is enabled
            LOG.debug("Got KPIs from _queue for %s %s",
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (created on demand) and return it.

        The connection is retried; the client is returned even when every
        attempt failed, exactly as before.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        else:
            # every attempt failed; make that visible instead of staying silent
            LOG.error("Giving up connecting to Trex Server")
        return client
566                 time.sleep(5)
567         return client
568
569
class Rfc2544ResourceHelper(object):
    """Lazy accessor for the 'rfc2544' section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Store ``scenario_helper``; every option is resolved on first use."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The 'rfc2544' options, cached after the first lookup."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The 'correlated_traffic' option, cached after the first lookup."""
        if self._correlated_traffic is None:
            self._correlated_traffic = self.get_rfc2544(
                'correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
        return self._correlated_traffic

    @property
    def latency(self):
        """The 'latency' option, cached after the first lookup."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` from the rfc2544 section, or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' ("low - high") into the two bounds.

        A single value is used for both bounds.
        """
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        if len(bounds) > 1:
            self._tolerance_high = bounds[1]
        else:
            self._tolerance_high = self.tolerance_low
624         self._tolerance_low = next(tolerance_iter)
625         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
626
627
class SampleVNFDeployHelper(object):
    """Clone, upload and build the samplevnf sources for a VNF binary."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    # deployment is switched off for now; deploy_vnfs() returns immediately
    DISABLE_DEPLOY = True

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the vnfd and ssh helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build ``app_name`` on the target unless it is already installed.

        Does nothing while ``DISABLE_DEPLOY`` is set.
        """
        # temp disable for now
        if self.DISABLE_DEPLOY:
            return

        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        # exit status 0 means the binary is already present
        if not ssh.execute("which %s" % vnf_bin)[0]:
            return

        # fetch a clean local copy of the repository and push it to the target
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        http_proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        # install the freshly built binary into the bin path
        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_binary, vnf_bin))
664         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
665         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
666
667
class ScenarioHelper(object):
    """Read-only view on the scenario configuration of one named VNF."""

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the VNF ``name``; ``scenario_cfg`` is assigned later."""
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The mandatory 'task_path' entry of the scenario."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry of the scenario, or ``None``."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The 'options' entry of the scenario, or an empty dict."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this VNF's name, or an empty dict."""
        every_option = self.all_options
        return every_option.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The 'vnf_config' of this VNF, or ``DEFAULT_VNF_CFG``."""
        own_options = self.options
        return own_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The mandatory 'topology' entry of the scenario."""
        cfg = self.scenario_cfg
        return cfg['topology']
702     def topology(self):
703         return self.scenario_cfg['topology']
704
705
706 class SampleVNF(GenericVNF):
707     """ Class providing file-like API for generic VNF implementation """
708
709     VNF_PROMPT = "pipeline>"
710     WAIT_TIME = 1
711
712     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
713         super(SampleVNF, self).__init__(name, vnfd)
714         self.bin_path = get_nsb_option('bin_path', '')
715
716         self.scenario_helper = ScenarioHelper(self.name)
717         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
718
719         if setup_env_helper_type is None:
720             setup_env_helper_type = SetupEnvHelper
721
722         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
723                                                   self.ssh_helper,
724                                                   self.scenario_helper)
725
726         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
727
728         if resource_helper_type is None:
729             resource_helper_type = ResourceHelper
730
731         self.resource_helper = resource_helper_type(self.setup_helper)
732
733         self.all_ports = None
734         self.context_cfg = None
735         self.nfvi_context = None
736         self.pipeline_kwargs = {}
737         self.priv_ports = None
738         self.pub_ports = None
739         # TODO(esm): make QueueFileWrapper invert-able so that we
740         #            never have to manage the queues
741         self.q_in = Queue()
742         self.q_out = Queue()
743         self.queue_wrapper = None
744         self.run_kwargs = {}
745         self.tg_port_pairs = None
746         self.used_drivers = {}
747         self.vnf_port_pairs = None
748         self._vnf_process = None
749
750     def _get_route_data(self, route_index, route_type):
751         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
752         for _ in range(route_index):
753             next(route_iter, '')
754         return next(route_iter, {}).get(route_type, '')
755
756     def _get_port0localip6(self):
757         return_value = self._get_route_data(0, 'network')
758         LOG.info("_get_port0localip6 : %s", return_value)
759         return return_value
760
761     def _get_port1localip6(self):
762         return_value = self._get_route_data(1, 'network')
763         LOG.info("_get_port1localip6 : %s", return_value)
764         return return_value
765
766     def _get_port0prefixlen6(self):
767         return_value = self._get_route_data(0, 'netmask')
768         LOG.info("_get_port0prefixlen6 : %s", return_value)
769         return return_value
770
771     def _get_port1prefixlen6(self):
772         return_value = self._get_route_data(1, 'netmask')
773         LOG.info("_get_port1prefixlen6 : %s", return_value)
774         return return_value
775
776     def _get_port0gateway6(self):
777         return_value = self._get_route_data(0, 'network')
778         LOG.info("_get_port0gateway6 : %s", return_value)
779         return return_value
780
781     def _get_port1gateway6(self):
782         return_value = self._get_route_data(1, 'network')
783         LOG.info("_get_port1gateway6 : %s", return_value)
784         return return_value
785
786     def _start_vnf(self):
787         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
788         self._vnf_process = Process(target=self._run)
789         self._vnf_process.start()
790
791     def _vnf_up_post(self):
792         pass
793
794     def instantiate(self, scenario_cfg, context_cfg):
795         self.scenario_helper.scenario_cfg = scenario_cfg
796         self.context_cfg = context_cfg
797         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
798         # self.nfvi_context = None
799
800         self.deploy_helper.deploy_vnfs(self.APP_NAME)
801         self.resource_helper.setup()
802         self._start_vnf()
803
    def wait_for_instantiate(self):
        """Block until the VNF prompt shows up in the process output.

        :return: exit code of the VNF process at the time the prompt was seen
        :raises RuntimeError: if the VNF process is no longer alive, or if
            "PANIC" appears in the collected output
        """
        buf = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # TODO(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                buf.append(self.q_out.get())
                # re-join everything read so far, so a marker split across
                # two queue items is still found
                message = ''.join(buf)
                if self.VNF_PROMPT in message:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    self.resource_helper.start_collect()
                    return self._vnf_process.exitcode

                if "PANIC" in message:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(1)
828
829     def _build_run_kwargs(self):
830         self.run_kwargs = {
831             'stdin': self.queue_wrapper,
832             'stdout': self.queue_wrapper,
833             'keep_stdin_open': True,
834             'pty': True,
835         }
836
837     def _build_config(self):
838         return self.setup_helper.build_config()
839
840     def _run(self):
841         # we can't share ssh paramiko objects to force new connection
842         self.ssh_helper.drop_connection()
843         cmd = self._build_config()
844         # kill before starting
845         self.setup_helper.kill_vnf()
846
847         LOG.debug(cmd)
848         self._build_run_kwargs()
849         self.ssh_helper.run(cmd, **self.run_kwargs)
850
851     def vnf_execute(self, cmd, wait_time=2):
852         """ send cmd to vnf process """
853
854         LOG.info("%s command: %s", self.APP_NAME, cmd)
855         self.q_in.put("{}\r\n".format(cmd))
856         time.sleep(wait_time)
857         output = []
858         while self.q_out.qsize() > 0:
859             output.append(self.q_out.get())
860         return "".join(output)
861
862     def _tear_down(self):
863         pass
864
865     def terminate(self):
866         self.vnf_execute("quit")
867         if self._vnf_process:
868             self._vnf_process.terminate()
869         self.setup_helper.kill_vnf()
870         self._tear_down()
871         self.resource_helper.stop_collect()
872
873     def get_stats(self, *args, **kwargs):
874         """
875         Method for checking the statistics
876
877         :return:
878            VNF statistics
879         """
880         cmd = 'p {0} stats'.format(self.APP_WORD)
881         out = self.vnf_execute(cmd)
882         return out
883
884     def collect_kpi(self):
885         stats = self.get_stats()
886         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
887         if m:
888             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
889             result["collect_stats"] = self.resource_helper.collect_kpi()
890         else:
891             result = {
892                 "packets_in": 0,
893                 "packets_fwd": 0,
894                 "packets_dropped": 0,
895             }
896         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
897         return result
898
899
class SampleVNFTrafficGen(GenericTrafficGen):
    """ Class providing file-like API for generic traffic generator

    Builds the ssh, setup and resource helpers in ``__init__``, starts the
    generator server in a child process from ``instantiate`` and runs traffic
    in a second child process from ``run_traffic``.
    """

    # application name interpolated into this class's log and error messages
    APP_NAME = 'Sample'
    # seconds run_traffic() sleeps between checks that the client has started
    RUN_WAIT = 1
905
906     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
907         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
908         self.bin_path = get_nsb_option('bin_path', '')
909         self.name = "tgen__1"  # name in topology file
910
911         self.scenario_helper = ScenarioHelper(self.name)
912         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
913
914         if setup_env_helper_type is None:
915             setup_env_helper_type = SetupEnvHelper
916
917         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
918                                                   self.ssh_helper,
919                                                   self.scenario_helper)
920
921         if resource_helper_type is None:
922             resource_helper_type = ClientResourceHelper
923
924         self.resource_helper = resource_helper_type(self.setup_helper)
925
926         self.runs_traffic = True
927         self.traffic_finished = False
928         self.tg_port_pairs = None
929         self._tg_process = None
930         self._traffic_process = None
931
932     def _start_server(self):
933         # we can't share ssh paramiko objects to force new connection
934         self.ssh_helper.drop_connection()
935
936     def instantiate(self, scenario_cfg, context_cfg):
937         self.scenario_helper.scenario_cfg = scenario_cfg
938         self.resource_helper.generate_cfg()
939         self.resource_helper.setup()
940
941         LOG.info("Starting %s server...", self.APP_NAME)
942         self._tg_process = Process(target=self._start_server)
943         self._tg_process.start()
944
945     def wait_for_instantiate(self):
946         # overridden by subclasses
947         return self._wait_for_process()
948
949     def _check_status(self):
950         raise NotImplementedError
951
952     def _wait_for_process(self):
953         while True:
954             if not self._tg_process.is_alive():
955                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
956             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
957             time.sleep(1)
958             status = self._check_status()
959             if status == 0:
960                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
961                 return self._tg_process.exitcode
962
963     def _traffic_runner(self, traffic_profile):
964         # always drop connections first thing in new processes
965         # so we don't get paramiko errors
966         self.ssh_helper.drop_connection()
967         LOG.info("Starting %s client...", self.APP_NAME)
968         self.resource_helper.run_traffic(traffic_profile)
969
970     def run_traffic(self, traffic_profile):
971         """ Generate traffic on the wire according to the given params.
972         Method is non-blocking, returns immediately when traffic process
973         is running. Mandatory.
974
975         :param traffic_profile:
976         :return: True/False
977         """
978         self._traffic_process = Process(target=self._traffic_runner,
979                                         args=(traffic_profile,))
980         self._traffic_process.start()
981         # Wait for traffic process to start
982         while self.resource_helper.client_started.value == 0:
983             time.sleep(self.RUN_WAIT)
984             # what if traffic process takes a few seconds to start?
985             if not self._traffic_process.is_alive():
986                 break
987
988         return self._traffic_process.is_alive()
989
990     def listen_traffic(self, traffic_profile):
991         """ Listen to traffic with the given parameters.
992         Method is non-blocking, returns immediately when traffic process
993         is running. Optional.
994
995         :param traffic_profile:
996         :return: True/False
997         """
998         pass
999
1000     def verify_traffic(self, traffic_profile):
1001         """ Verify captured traffic after it has ended. Optional.
1002
1003         :param traffic_profile:
1004         :return: dict
1005         """
1006         pass
1007
1008     def collect_kpi(self):
1009         result = self.resource_helper.collect_kpi()
1010         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
1011         return result
1012
    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Marks traffic as finished and terminates the traffic process if
        run_traffic() has created one.

        NOTE(review): no explicit return statement is visible in this part of
        the method, so the documented True/False result should be confirmed.

        :return: True/False
        """
        self.traffic_finished = True
        # _traffic_process stays None until run_traffic() has been called
        if self._traffic_process is not None:
            self._traffic_process.terminate()