Merge "resource: remove cores args"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.cpu import CpuSysCores
34 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
37 from yardstick.network_services.nfvi.resource import ResourceProfile
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
40 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
41 from yardstick.network_services.utils import get_nsb_option
42
43 from trex_stl_lib.trex_stl_client import STLClient
44 from trex_stl_lib.trex_stl_client import LoggerApi
45 from trex_stl_lib.trex_stl_exceptions import STLError
46
47 from yardstick.ssh import AutoConnectSSH
48
49 DPDK_VERSION = "dpdk-16.07"
50
51 LOG = logging.getLogger(__name__)
52
53
54 REMOTE_TMP = "/tmp"
55 DEFAULT_VNF_TIMEOUT = 3600
56 PROCESS_JOIN_TIMEOUT = 3
57
58
59 class VnfSshHelper(AutoConnectSSH):
60
61     def __init__(self, node, bin_path, wait=None):
62         self.node = node
63         kwargs = self.args_from_node(self.node)
64         if wait:
65             kwargs.setdefault('wait', wait)
66
67         super(VnfSshHelper, self).__init__(**kwargs)
68         self.bin_path = bin_path
69
70     @staticmethod
71     def get_class():
72         # must return static class name, anything else refers to the calling class
73         # i.e. the subclass, not the superclass
74         return VnfSshHelper
75
76     def copy(self):
77         # this copy constructor is different from SSH classes, since it uses node
78         return self.get_class()(self.node, self.bin_path)
79
80     def upload_config_file(self, prefix, content):
81         cfg_file = os.path.join(REMOTE_TMP, prefix)
82         LOG.debug(content)
83         file_obj = cStringIO(content)
84         self.put_file_obj(file_obj, cfg_file)
85         return cfg_file
86
87     def join_bin_path(self, *args):
88         return os.path.join(self.bin_path, *args)
89
90     def provision_tool(self, tool_path=None, tool_file=None):
91         if tool_path is None:
92             tool_path = self.bin_path
93         return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
94
95
96 class SetupEnvHelper(object):
97
98     CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
99     CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
100     CORES = []
101     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
102     PIPELINE_COMMAND = ''
103     VNF_TYPE = "SAMPLE"
104
105     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
106         super(SetupEnvHelper, self).__init__()
107         self.vnfd_helper = vnfd_helper
108         self.ssh_helper = ssh_helper
109         self.scenario_helper = scenario_helper
110
111     def build_config(self):
112         raise NotImplementedError
113
114     def setup_vnf_environment(self):
115         pass
116
117     def kill_vnf(self):
118         pass
119
120     def tear_down(self):
121         raise NotImplementedError
122
123
124 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
125
126     APP_NAME = 'DpdkVnf'
127     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
128
129     HW_DEFAULT_CORE = 3
130     SW_DEFAULT_CORE = 2
131
132     @staticmethod
133     def _update_packet_type(ip_pipeline_cfg, traffic_options):
134         match_str = 'pkt_type = ipv4'
135         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
136         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
137         return pipeline_config_str
138
139     @classmethod
140     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
141         traffic_type = traffic_options['traffic_type']
142
143         if traffic_options['vnf_type'] is not cls.APP_NAME:
144             match_str = 'traffic_type = 4'
145             replace_str = 'traffic_type = {0}'.format(traffic_type)
146
147         elif traffic_type == 4:
148             match_str = 'pkt_type = ipv4'
149             replace_str = 'pkt_type = ipv4'
150
151         else:
152             match_str = 'pkt_type = ipv4'
153             replace_str = 'pkt_type = ipv6'
154
155         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
156         return pipeline_config_str
157
158     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
159         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
160         self.all_ports = None
161         self.bound_pci = None
162         self.socket = None
163         self.used_drivers = None
164         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
165
166     def _setup_hugepages(self):
167         cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
168         hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
169
170         memory_path = \
171             '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
172         self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
173
174         if hugepages == "2048kB":
175             pages = 8192
176         else:
177             pages = 16
178
179         self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
180
181     def build_config(self):
182         vnf_cfg = self.scenario_helper.vnf_cfg
183         task_path = self.scenario_helper.task_path
184
185         lb_count = vnf_cfg.get('lb_count', 3)
186         lb_config = vnf_cfg.get('lb_config', 'SW')
187         worker_config = vnf_cfg.get('worker_config', '1C/1T')
188         worker_threads = vnf_cfg.get('worker_threads', 3)
189
190         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
191         traffic_options = {
192             'traffic_type': traffic_type,
193             'pkt_type': 'ipv%s' % traffic_type,
194             'vnf_type': self.VNF_TYPE,
195         }
196
197         config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
198         config_basename = posixpath.basename(self.CFG_CONFIG)
199         script_basename = posixpath.basename(self.CFG_SCRIPT)
200         multiport = MultiPortConfig(self.scenario_helper.topology,
201                                     config_tpl_cfg,
202                                     config_basename,
203                                     self.vnfd_helper,
204                                     self.VNF_TYPE,
205                                     lb_count,
206                                     worker_threads,
207                                     worker_config,
208                                     lb_config,
209                                     self.socket)
210
211         multiport.generate_config()
212         with open(self.CFG_CONFIG) as handle:
213             new_config = handle.read()
214
215         new_config = self._update_traffic_type(new_config, traffic_options)
216         new_config = self._update_packet_type(new_config, traffic_options)
217
218         self.ssh_helper.upload_config_file(config_basename, new_config)
219         self.ssh_helper.upload_config_file(script_basename,
220                                            multiport.generate_script(self.vnfd_helper))
221
222         LOG.info("Provision and start the %s", self.APP_NAME)
223         self._build_pipeline_kwargs()
224         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
225
226     def _build_pipeline_kwargs(self):
227         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
228         # count the number of actual ports in the list of pairs
229         # remove duplicate ports
230         # this is really a mapping from LINK ID to DPDK PMD ID
231         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
232         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
233         ports = self.vnfd_helper.port_pairs.all_ports
234         port_nums = self.vnfd_helper.port_nums(ports)
235         # create mask from all the dpdk port numbers
236         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
237         self.pipeline_kwargs = {
238             'cfg_file': self.CFG_CONFIG,
239             'script': self.CFG_SCRIPT,
240             'port_mask_hex': ports_mask_hex,
241             'tool_path': tool_path,
242         }
243
244     def _get_app_cpu(self):
245         if self.CORES:
246             return self.CORES
247
248         vnf_cfg = self.scenario_helper.vnf_cfg
249         sys_obj = CpuSysCores(self.ssh_helper)
250         self.sys_cpu = sys_obj.get_core_socket()
251         num_core = int(vnf_cfg["worker_threads"])
252         if vnf_cfg.get("lb_config", "SW") == 'HW':
253             num_core += self.HW_DEFAULT_CORE
254         else:
255             num_core += self.SW_DEFAULT_CORE
256         app_cpu = self.sys_cpu[str(self.socket)][:num_core]
257         return app_cpu
258
259     def _get_cpu_sibling_list(self, cores=None):
260         if cores is None:
261             cores = self._get_app_cpu()
262         sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
263         awk_template = "awk -F: '{ print $1 }' < %s"
264         sys_path = "/sys/devices/system/cpu/"
265         cpu_topology = []
266         try:
267             for core in cores:
268                 sys_cmd = sys_cmd_template % (sys_path, core)
269                 cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
270                 cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
271
272             return cpu_topology
273         except Exception:
274             return []
275
276     def _validate_cpu_cfg(self):
277         return self._get_cpu_sibling_list()
278
279     def setup_vnf_environment(self):
280         self._setup_dpdk()
281         self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
282         self.kill_vnf()
283         # bind before _setup_resources so we can use dpdk_port_num
284         self._detect_and_bind_drivers()
285         resource = self._setup_resources()
286         return resource
287
288     def kill_vnf(self):
289         # pkill is not matching, debug with pgrep
290         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
291         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
292         # have to use exact match
293         # try using killall to match
294         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
295
296     def _setup_dpdk(self):
297         """ setup dpdk environment needed for vnf to run """
298
299         self._setup_hugepages()
300         self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
301
302         exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
303         if exit_status == 0:
304             return
305
306         dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
307         dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
308         exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
309         if exit_status != 0:
310             self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
311
312     def get_collectd_options(self):
313         options = self.scenario_helper.all_options.get("collectd", {})
314         # override with specific node settings
315         options.update(self.scenario_helper.options.get("collectd", {}))
316         return options
317
318     def _setup_resources(self):
319         # what is this magic?  how do we know which socket is for which port?
320         # what about quad-socket?
321         if any(v[5] == "0" for v in self.bound_pci):
322             self.socket = 0
323         else:
324             self.socket = 1
325
326         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
327         # this won't work because we don't have DPDK port numbers yet
328         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
329         port_names = (intf["name"] for intf in ports)
330         collectd_options = self.get_collectd_options()
331         plugins = collectd_options.get("plugins", {})
332         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
333         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
334                                plugins=plugins, interval=collectd_options.get("interval"),
335                                timeout=self.scenario_helper.timeout)
336
337     def _detect_and_bind_drivers(self):
338         interfaces = self.vnfd_helper.interfaces
339
340         self.dpdk_bind_helper.read_status()
341         self.dpdk_bind_helper.save_used_drivers()
342
343         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
344
345         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
346         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
347             try:
348                 intf = next(v for v in interfaces
349                             if vpci == v['virtual-interface']['vpci'])
350                 # force to int
351                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
352             except:
353                 pass
354         time.sleep(2)
355
356     def get_local_iface_name_by_vpci(self, vpci):
357         find_net_cmd = self.FIND_NET_CMD.format(vpci)
358         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
359         if exit_status == 0:
360             return stdout
361         return None
362
363     def tear_down(self):
364         self.dpdk_bind_helper.rebind_drivers()
365
366
367 class ResourceHelper(object):
368
369     COLLECT_KPI = ''
370     MAKE_INSTALL = 'cd {0} && make && sudo make install'
371     RESOURCE_WORD = 'sample'
372
373     COLLECT_MAP = {}
374
375     def __init__(self, setup_helper):
376         super(ResourceHelper, self).__init__()
377         self.resource = None
378         self.setup_helper = setup_helper
379         self.ssh_helper = setup_helper.ssh_helper
380
381     def setup(self):
382         self.resource = self.setup_helper.setup_vnf_environment()
383
384     def generate_cfg(self):
385         pass
386
387     def _collect_resource_kpi(self):
388         result = {}
389         status = self.resource.check_if_sa_running("collectd")[0]
390         if status == 0:
391             result = self.resource.amqp_collect_nfvi_kpi()
392
393         result = {"core": result}
394         return result
395
396     def start_collect(self):
397         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
398         self.resource.start()
399         self.resource.amqp_process_for_nfvi_kpi()
400
401     def stop_collect(self):
402         if self.resource:
403             self.resource.stop()
404
405     def collect_kpi(self):
406         return self._collect_resource_kpi()
407
408
409 class ClientResourceHelper(ResourceHelper):
410
411     RUN_DURATION = 60
412     QUEUE_WAIT_TIME = 5
413     SYNC_PORT = 1
414     ASYNC_PORT = 2
415
416     def __init__(self, setup_helper):
417         super(ClientResourceHelper, self).__init__(setup_helper)
418         self.vnfd_helper = setup_helper.vnfd_helper
419         self.scenario_helper = setup_helper.scenario_helper
420
421         self.client = None
422         self.client_started = Value('i', 0)
423         self.all_ports = None
424         self._queue = Queue()
425         self._result = {}
426         self._terminated = Value('i', 0)
427
428     def _build_ports(self):
429         self.networks = self.vnfd_helper.port_pairs.networks
430         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
431         self.downlink_ports = \
432             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
433         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
434
435     def get_stats(self, *args, **kwargs):
436         try:
437             return self.client.get_stats(*args, **kwargs)
438         except STLError:
439             LOG.exception("TRex client not connected")
440             return {}
441
442     def generate_samples(self, ports, key=None, default=None):
443         # needs to be used ports
444         last_result = self.get_stats(ports)
445         key_value = last_result.get(key, default)
446
447         if not isinstance(last_result, Mapping):  # added for mock unit test
448             self._terminated.value = 1
449             return {}
450
451         samples = {}
452         # recalculate port for interface and see if it matches ports provided
453         for intf in self.vnfd_helper.interfaces:
454             name = intf["name"]
455             port = self.vnfd_helper.port_num(name)
456             if port in ports:
457                 xe_value = last_result.get(port, {})
458                 samples[name] = {
459                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
460                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
461                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
462                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
463                     "in_packets": int(xe_value.get("ipackets", 0)),
464                     "out_packets": int(xe_value.get("opackets", 0)),
465                 }
466                 if key:
467                     samples[name][key] = key_value
468         return samples
469
470     def _run_traffic_once(self, traffic_profile):
471         traffic_profile.execute_traffic(self)
472         self.client_started.value = 1
473         time.sleep(self.RUN_DURATION)
474         samples = self.generate_samples(traffic_profile.ports)
475         time.sleep(self.QUEUE_WAIT_TIME)
476         self._queue.put(samples)
477
478     def run_traffic(self, traffic_profile):
479         # if we don't do this we can hang waiting for the queue to drain
480         # have to do this in the subprocess
481         self._queue.cancel_join_thread()
482         # fixme: fix passing correct trex config file,
483         # instead of searching the default path
484         try:
485             self._build_ports()
486             self.client = self._connect()
487             self.client.reset(ports=self.all_ports)
488             self.client.remove_all_streams(self.all_ports)  # remove all streams
489             traffic_profile.register_generator(self)
490
491             while self._terminated.value == 0:
492                 self._run_traffic_once(traffic_profile)
493
494             self.client.stop(self.all_ports)
495             self.client.disconnect()
496             self._terminated.value = 0
497         except STLError:
498             if self._terminated.value:
499                 LOG.debug("traffic generator is stopped")
500                 return  # return if trex/tg server is stopped.
501             raise
502
503     def terminate(self):
504         self._terminated.value = 1  # stop client
505
506     def clear_stats(self, ports=None):
507         if ports is None:
508             ports = self.all_ports
509         self.client.clear_stats(ports=ports)
510
511     def start(self, ports=None, *args, **kwargs):
512         if ports is None:
513             ports = self.all_ports
514         self.client.start(ports=ports, *args, **kwargs)
515
516     def collect_kpi(self):
517         if not self._queue.empty():
518             kpi = self._queue.get()
519             self._result.update(kpi)
520             LOG.debug("Got KPIs from _queue for {0} {1}".format(
521                 self.scenario_helper.name, self.RESOURCE_WORD))
522         return self._result
523
524     def _connect(self, client=None):
525         if client is None:
526             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
527                                server=self.vnfd_helper.mgmt_interface["ip"],
528                                verbose_level=LoggerApi.VERBOSE_QUIET)
529
530         # try to connect with 5s intervals, 30s max
531         for idx in range(6):
532             try:
533                 client.connect()
534                 break
535             except STLError:
536                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
537                 time.sleep(5)
538         return client
539
540
541 class Rfc2544ResourceHelper(object):
542
543     DEFAULT_CORRELATED_TRAFFIC = False
544     DEFAULT_LATENCY = False
545     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
546
547     def __init__(self, scenario_helper):
548         super(Rfc2544ResourceHelper, self).__init__()
549         self.scenario_helper = scenario_helper
550         self._correlated_traffic = None
551         self.iteration = Value('i', 0)
552         self._latency = None
553         self._rfc2544 = None
554         self._tolerance_low = None
555         self._tolerance_high = None
556
557     @property
558     def rfc2544(self):
559         if self._rfc2544 is None:
560             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
561         return self._rfc2544
562
563     @property
564     def tolerance_low(self):
565         if self._tolerance_low is None:
566             self.get_rfc_tolerance()
567         return self._tolerance_low
568
569     @property
570     def tolerance_high(self):
571         if self._tolerance_high is None:
572             self.get_rfc_tolerance()
573         return self._tolerance_high
574
575     @property
576     def correlated_traffic(self):
577         if self._correlated_traffic is None:
578             self._correlated_traffic = \
579                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
580
581         return self._correlated_traffic
582
583     @property
584     def latency(self):
585         if self._latency is None:
586             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
587         return self._latency
588
589     def get_rfc2544(self, name, default=None):
590         return self.rfc2544.get(name, default)
591
592     def get_rfc_tolerance(self):
593         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
594         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
595         self._tolerance_low = next(tolerance_iter)
596         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
597
598
599 class SampleVNFDeployHelper(object):
600
601     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
602     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
603     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
604
605     def __init__(self, vnfd_helper, ssh_helper):
606         super(SampleVNFDeployHelper, self).__init__()
607         self.ssh_helper = ssh_helper
608         self.vnfd_helper = vnfd_helper
609
610     def deploy_vnfs(self, app_name):
611         vnf_bin = self.ssh_helper.join_bin_path(app_name)
612         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
613         if not exit_status:
614             return
615
616         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
617         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
618         time.sleep(2)
619         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
620         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
621
622         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
623         time.sleep(2)
624         http_proxy = os.environ.get('http_proxy', '')
625         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
626         LOG.debug(cmd)
627         self.ssh_helper.execute(cmd)
628         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
629         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
630         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
631
632
633 class ScenarioHelper(object):
634
635     DEFAULT_VNF_CFG = {
636         'lb_config': 'SW',
637         'lb_count': 1,
638         'worker_config': '1C/1T',
639         'worker_threads': 1,
640     }
641
642     def __init__(self, name):
643         self.name = name
644         self.scenario_cfg = None
645
646     @property
647     def task_path(self):
648         return self.scenario_cfg['task_path']
649
650     @property
651     def nodes(self):
652         return self.scenario_cfg.get('nodes')
653
654     @property
655     def all_options(self):
656         return self.scenario_cfg.get('options', {})
657
658     @property
659     def options(self):
660         return self.all_options.get(self.name, {})
661
662     @property
663     def vnf_cfg(self):
664         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
665
666     @property
667     def topology(self):
668         return self.scenario_cfg['topology']
669
670     @property
671     def timeout(self):
672         return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
673
674
675 class SampleVNF(GenericVNF):
676     """ Class providing file-like API for generic VNF implementation """
677
678     VNF_PROMPT = "pipeline>"
679     WAIT_TIME = 1
680     WAIT_TIME_FOR_SCRIPT = 10
681     APP_NAME = "SampleVNF"
682     # we run the VNF interactively, so the ssh command will timeout after this long
683
684     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
685         super(SampleVNF, self).__init__(name, vnfd)
686         self.bin_path = get_nsb_option('bin_path', '')
687
688         self.scenario_helper = ScenarioHelper(self.name)
689         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
690
691         if setup_env_helper_type is None:
692             setup_env_helper_type = SetupEnvHelper
693
694         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
695                                                   self.ssh_helper,
696                                                   self.scenario_helper)
697
698         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
699
700         if resource_helper_type is None:
701             resource_helper_type = ResourceHelper
702
703         self.resource_helper = resource_helper_type(self.setup_helper)
704
705         self.context_cfg = None
706         self.nfvi_context = None
707         self.pipeline_kwargs = {}
708         self.uplink_ports = None
709         self.downlink_ports = None
710         # TODO(esm): make QueueFileWrapper invert-able so that we
711         #            never have to manage the queues
712         self.q_in = Queue()
713         self.q_out = Queue()
714         self.queue_wrapper = None
715         self.run_kwargs = {}
716         self.used_drivers = {}
717         self.vnf_port_pairs = None
718         self._vnf_process = None
719
720     def _build_ports(self):
721         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
722         self.networks = self._port_pairs.networks
723         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
724         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
725         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
726
727     def _get_route_data(self, route_index, route_type):
728         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
729         for _ in range(route_index):
730             next(route_iter, '')
731         return next(route_iter, {}).get(route_type, '')
732
733     def _get_port0localip6(self):
734         return_value = self._get_route_data(0, 'network')
735         LOG.info("_get_port0localip6 : %s", return_value)
736         return return_value
737
738     def _get_port1localip6(self):
739         return_value = self._get_route_data(1, 'network')
740         LOG.info("_get_port1localip6 : %s", return_value)
741         return return_value
742
743     def _get_port0prefixlen6(self):
744         return_value = self._get_route_data(0, 'netmask')
745         LOG.info("_get_port0prefixlen6 : %s", return_value)
746         return return_value
747
748     def _get_port1prefixlen6(self):
749         return_value = self._get_route_data(1, 'netmask')
750         LOG.info("_get_port1prefixlen6 : %s", return_value)
751         return return_value
752
753     def _get_port0gateway6(self):
754         return_value = self._get_route_data(0, 'network')
755         LOG.info("_get_port0gateway6 : %s", return_value)
756         return return_value
757
758     def _get_port1gateway6(self):
759         return_value = self._get_route_data(1, 'network')
760         LOG.info("_get_port1gateway6 : %s", return_value)
761         return return_value
762
763     def _start_vnf(self):
764         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
765         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
766         self._vnf_process = Process(name=name, target=self._run)
767         self._vnf_process.start()
768
769     def _vnf_up_post(self):
770         pass
771
772     def instantiate(self, scenario_cfg, context_cfg):
773         self.scenario_helper.scenario_cfg = scenario_cfg
774         self.context_cfg = context_cfg
775         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
776         # self.nfvi_context = None
777
778         # vnf deploy is unsupported, use ansible playbooks
779         if self.scenario_helper.options.get("vnf_deploy", False):
780             self.deploy_helper.deploy_vnfs(self.APP_NAME)
781         self.resource_helper.setup()
782         self._start_vnf()
783
784     def wait_for_instantiate(self):
785         buf = []
786         time.sleep(self.WAIT_TIME)  # Give some time for config to load
787         while True:
788             if not self._vnf_process.is_alive():
789                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
790
791             # TODO(esm): move to QueueFileWrapper
792             while self.q_out.qsize() > 0:
793                 buf.append(self.q_out.get())
794                 message = ''.join(buf)
795                 if self.VNF_PROMPT in message:
796                     LOG.info("%s VNF is up and running.", self.APP_NAME)
797                     self._vnf_up_post()
798                     self.queue_wrapper.clear()
799                     self.resource_helper.start_collect()
800                     return self._vnf_process.exitcode
801
802                 if "PANIC" in message:
803                     raise RuntimeError("Error starting %s VNF." %
804                                        self.APP_NAME)
805
806             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
807             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
808             # Send ENTER to display a new prompt in case the prompt text was corrupted
809             # by other VNF output
810             self.q_in.put('\r\n')
811
812     def _build_run_kwargs(self):
813         self.run_kwargs = {
814             'stdin': self.queue_wrapper,
815             'stdout': self.queue_wrapper,
816             'keep_stdin_open': True,
817             'pty': True,
818             'timeout': self.scenario_helper.timeout,
819         }
820
821     def _build_config(self):
822         return self.setup_helper.build_config()
823
824     def _run(self):
825         # we can't share ssh paramiko objects to force new connection
826         self.ssh_helper.drop_connection()
827         cmd = self._build_config()
828         # kill before starting
829         self.setup_helper.kill_vnf()
830
831         LOG.debug(cmd)
832         self._build_run_kwargs()
833         self.ssh_helper.run(cmd, **self.run_kwargs)
834
835     def vnf_execute(self, cmd, wait_time=2):
836         """ send cmd to vnf process """
837
838         LOG.info("%s command: %s", self.APP_NAME, cmd)
839         self.q_in.put("{}\r\n".format(cmd))
840         time.sleep(wait_time)
841         output = []
842         while self.q_out.qsize() > 0:
843             output.append(self.q_out.get())
844         return "".join(output)
845
846     def _tear_down(self):
847         pass
848
849     def terminate(self):
850         self.vnf_execute("quit")
851         self.setup_helper.kill_vnf()
852         self._tear_down()
853         self.resource_helper.stop_collect()
854         if self._vnf_process is not None:
855             # be proper and join first before we kill
856             LOG.debug("joining before terminate %s", self._vnf_process.name)
857             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
858             self._vnf_process.terminate()
859         # no terminate children here because we share processes with tg
860
861     def get_stats(self, *args, **kwargs):
862         """
863         Method for checking the statistics
864
865         :return:
866            VNF statistics
867         """
868         cmd = 'p {0} stats'.format(self.APP_WORD)
869         out = self.vnf_execute(cmd)
870         return out
871
872     def collect_kpi(self):
873         # we can't get KPIs if the VNF is down
874         check_if_process_failed(self._vnf_process)
875         stats = self.get_stats()
876         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
877         if m:
878             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
879             result["collect_stats"] = self.resource_helper.collect_kpi()
880         else:
881             result = {
882                 "packets_in": 0,
883                 "packets_fwd": 0,
884                 "packets_dropped": 0,
885             }
886         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
887         return result
888
889
890 class SampleVNFTrafficGen(GenericTrafficGen):
891     """ Class providing file-like API for generic traffic generator """
892
893     APP_NAME = 'Sample'
894     RUN_WAIT = 1
895
896     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
897         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
898         self.bin_path = get_nsb_option('bin_path', '')
899
900         self.scenario_helper = ScenarioHelper(self.name)
901         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
902
903         if setup_env_helper_type is None:
904             setup_env_helper_type = SetupEnvHelper
905
906         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
907                                                   self.ssh_helper,
908                                                   self.scenario_helper)
909
910         if resource_helper_type is None:
911             resource_helper_type = ClientResourceHelper
912
913         self.resource_helper = resource_helper_type(self.setup_helper)
914
915         self.runs_traffic = True
916         self.traffic_finished = False
917         self._tg_process = None
918         self._traffic_process = None
919
920     def _start_server(self):
921         # we can't share ssh paramiko objects to force new connection
922         self.ssh_helper.drop_connection()
923
924     def instantiate(self, scenario_cfg, context_cfg):
925         self.scenario_helper.scenario_cfg = scenario_cfg
926         self.resource_helper.setup()
927         # must generate_cfg after DPDK bind because we need port number
928         self.resource_helper.generate_cfg()
929
930         LOG.info("Starting %s server...", self.APP_NAME)
931         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
932         self._tg_process = Process(name=name, target=self._start_server)
933         self._tg_process.start()
934
935     def wait_for_instantiate(self):
936         # overridden by subclasses
937         return self._wait_for_process()
938
939     def _check_status(self):
940         raise NotImplementedError
941
942     def _wait_for_process(self):
943         while True:
944             if not self._tg_process.is_alive():
945                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
946             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
947             time.sleep(1)
948             status = self._check_status()
949             if status == 0:
950                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
951                 return self._tg_process.exitcode
952
953     def _traffic_runner(self, traffic_profile):
954         # always drop connections first thing in new processes
955         # so we don't get paramiko errors
956         self.ssh_helper.drop_connection()
957         LOG.info("Starting %s client...", self.APP_NAME)
958         self.resource_helper.run_traffic(traffic_profile)
959
960     def run_traffic(self, traffic_profile):
961         """ Generate traffic on the wire according to the given params.
962         Method is non-blocking, returns immediately when traffic process
963         is running. Mandatory.
964
965         :param traffic_profile:
966         :return: True/False
967         """
968         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
969                                     os.getpid())
970         self._traffic_process = Process(name=name, target=self._traffic_runner,
971                                         args=(traffic_profile,))
972         self._traffic_process.start()
973         # Wait for traffic process to start
974         while self.resource_helper.client_started.value == 0:
975             time.sleep(self.RUN_WAIT)
976             # what if traffic process takes a few seconds to start?
977             if not self._traffic_process.is_alive():
978                 break
979
980         return self._traffic_process.is_alive()
981
982     def listen_traffic(self, traffic_profile):
983         """ Listen to traffic with the given parameters.
984         Method is non-blocking, returns immediately when traffic process
985         is running. Optional.
986
987         :param traffic_profile:
988         :return: True/False
989         """
990         pass
991
992     def verify_traffic(self, traffic_profile):
993         """ Verify captured traffic after it has ended. Optional.
994
995         :param traffic_profile:
996         :return: dict
997         """
998         pass
999
1000     def collect_kpi(self):
1001         # check if the tg processes have exited
1002         for proc in (self._tg_process, self._traffic_process):
1003             check_if_process_failed(proc)
1004         result = self.resource_helper.collect_kpi()
1005         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
1006         return result
1007
1008     def terminate(self):
1009         """ After this method finishes, all traffic processes should stop. Mandatory.
1010
1011         :return: True/False
1012         """
1013         self.traffic_finished = True
1014         # we must kill client before we kill the server, or the client will raise exception
1015         if self._traffic_process is not None:
1016             # be proper and try to join before terminating
1017             LOG.debug("joining before terminate %s", self._traffic_process.name)
1018             self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
1019             self._traffic_process.terminate()
1020         if self._tg_process is not None:
1021             # be proper and try to join before terminating
1022             LOG.debug("joining before terminate %s", self._tg_process.name)
1023             self._tg_process.join(PROCESS_JOIN_TIMEOUT)
1024             self._tg_process.terminate()
1025         # no terminate children here because we share processes with vnf