NSB: remove validate_cpu_cfg and all app cpu methods
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25 from multiprocessing import Queue, Value, Process
26
27 from six.moves import cStringIO
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common.process import check_if_process_failed
32 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
33 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
34 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
35 from yardstick.network_services.nfvi.resource import ResourceProfile
36 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
37 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
38 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
39 from yardstick.network_services.utils import get_nsb_option
40
41 from trex_stl_lib.trex_stl_client import STLClient
42 from trex_stl_lib.trex_stl_client import LoggerApi
43 from trex_stl_lib.trex_stl_exceptions import STLError
44
45 from yardstick.ssh import AutoConnectSSH
46
# Name of the DPDK entry that is looked up under the NSB bin path on the target
DPDK_VERSION = "dpdk-16.07"

# Module-level logger shared by every helper in this file
LOG = logging.getLogger(__name__)


# Directory under which the generated config and script files are placed
REMOTE_TMP = "/tmp"
# Fallback VNF timeout used when the scenario options do not provide one
# (presumably seconds -- confirm against the SSH helper)
DEFAULT_VNF_TIMEOUT = 3600
# Timeout handed to Process.join() before the VNF process is terminated
PROCESS_JOIN_TIMEOUT = 3
55
56
class VnfSshHelper(AutoConnectSSH):
    """SSH helper created from a node description that also tracks a bin path."""

    def __init__(self, node, bin_path, wait=None):
        """Derive the connection arguments from *node* and remember *bin_path*.

        :param node: node description handed to ``args_from_node``
        :param bin_path: directory used by ``join_bin_path``/``provision_tool``
        :param wait: optional ``wait`` value, applied only when the
                     node-derived arguments do not already contain one
        """
        self.node = node
        ssh_args = self.args_from_node(self.node)
        if wait:
            # setdefault: a 'wait' coming from the node always wins
            ssh_args.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_args)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class (not the class of the caller)."""
        # must return static class name, anything else refers to the calling class
        # i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        """Create a new helper for the same node and bin path."""
        # this copy constructor is different from SSH classes, since it uses node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Send *content* to ``REMOTE_TMP/<prefix>`` and return that path."""
        remote_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join *args* onto the configured bin path."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Provision *tool_file*, defaulting *tool_path* to the bin path."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
92
93
class SetupEnvHelper(object):
    """Base class for helpers that prepare the environment a VNF runs in.

    ``build_config`` and ``tear_down`` must be provided by subclasses;
    ``setup_vnf_environment`` and ``kill_vnf`` do nothing here.
    """

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the three collaborating helpers on the instance."""
        super(SetupEnvHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def build_config(self):
        """Build the VNF configuration; not implemented in the base class."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the environment; the base implementation does nothing."""
        return None

    def kill_vnf(self):
        """Stop the VNF application; the base implementation does nothing."""
        return None

    def tear_down(self):
        """Undo the environment setup; not implemented in the base class."""
        raise NotImplementedError
119
120
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Setup helper for VNF applications that run on top of DPDK.

    Over SSH it configures hugepages, loads the igb_uio module and binds the
    VNF interfaces to it.  It also generates the pipeline config and script
    through ``MultiPortConfig``, uploads them, and formats the command line
    from ``PIPELINE_COMMAND``.
    """

    APP_NAME = 'DpdkVnf'
    # shell command template; '{}' is filled with the PCI address to look up
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace ``pkt_type = ipv4`` with the configured packet type.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing the ``'pkt_type'`` key
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Rewrite the traffic/packet type entries of the pipeline config.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict providing ``'traffic_type'`` and
                                ``'vnf_type'``
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # compare by value: identity ('is not') on strings depends on
        # interning and is not a reliable equality test
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            # ipv4 is already what the template contains; nothing changes
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper for *ssh_helper*."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Set the number of hugepages on the target.

        The page size is read from /proc/meminfo; 8192 pages are requested
        for 2048kB pages and 16 pages for any other size.
        """
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # the result of this read is not used
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def build_config(self):
        """Generate and upload the pipeline config and script.

        :return: ``PIPELINE_COMMAND`` formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # read back the locally generated config so it can be patched
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Populate ``self.pipeline_kwargs`` used to format the command line."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, bind the interfaces and return the resource profile."""
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill any running instance of the VNF application on the target."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            # igb_uio is loaded, nothing else to do
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def get_collectd_options(self):
        """Return the collectd options, node settings overriding global ones.

        A new dict is returned so the scenario configuration itself is never
        modified by the merge.
        """
        # copy first: update() on the dict taken from the scenario config
        # would leak this node's overrides into the shared global options
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Pick the socket and build the ``ResourceProfile`` for KPI collection."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=collectd_options.get("interval"),
                               timeout=self.scenario_helper.timeout)

    def _detect_and_bind_drivers(self):
        """Bind the VNF PCI devices to igb_uio and record DPDK port numbers.

        The position of each DPDK-bound PCI address in sorted order is stored
        as ``dpdk_port_num`` on the interface with the matching ``vpci``.
        """
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
            except (StopIteration, KeyError):
                # best effort: a DPDK-bound address without a matching
                # interface is skipped, but no longer silently
                LOG.debug("no interface found for DPDK bound PCI address %s", vpci)
                continue
            # force to int
            intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the output of ``FIND_NET_CMD`` for *vpci*, or None on failure."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the drivers through the DPDK bind helper."""
        self.dpdk_bind_helper.rebind_drivers()
324
325
class ResourceHelper(object):
    """Wraps the resource object produced by the setup helper.

    The resource is created by ``setup()`` and is then used to start, stop
    and query KPI collection.
    """

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Keep *setup_helper* and its SSH helper; no resource exists yet."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Run the environment setup and keep whatever it returns."""
        environment = self.setup_helper.setup_vnf_environment()
        self.resource = environment

    def generate_cfg(self):
        """Hook for subclasses; does nothing here."""
        return None

    def _collect_resource_kpi(self):
        """Return ``{"core": <kpis>}``; kpis stay empty unless collectd runs."""
        collectd_status = self.resource.check_if_sa_running("collectd")[0]
        core_kpis = {}
        if collectd_status == 0:
            core_kpis = self.resource.amqp_collect_nfvi_kpi()
        return {"core": core_kpis}

    def start_collect(self):
        """Start the system agent and the KPI collection on the resource."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource when there is one."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
366
367
class ClientResourceHelper(ResourceHelper):
    """Resource helper for a traffic generator driven through an STLClient.

    Traffic runs in ``run_traffic`` until ``terminate`` sets the shared
    ``_terminated`` flag; samples are handed over through ``_queue`` and are
    merged into ``_result`` by ``collect_kpi``.
    """

    # time.sleep() duration of one traffic run in _run_traffic_once
    RUN_DURATION = 60
    # time.sleep() duration before the samples are put on the queue
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Take the helpers from *setup_helper* and create the shared state."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers from the VNFD."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` when the client raises STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the latest client statistics.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when *key* is missing from the statistics
        :return: dict mapping interface name to its sample dict; ``{}`` (and
                 the terminated flag set) when the statistics are not a mapping
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # validate before any lookup so a non-mapping result is never queried
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                # NOTE(review): the *_mbps fields are filled from the raw
                # rx_bps/tx_bps counters without scaling -- confirm the unit
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the resulting samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic until terminated.

        :raises STLError: when the client fails while not being terminated
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Request the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics for *ports* (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start the client on *ports* (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set, if any, and return the results so far."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            # lazy logger arguments: nothing is formatted unless debug is on
            LOG.debug("Got KPIs from _queue for %s %s",
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect *client* (a new STLClient by default) and return it.

        The client is returned even when every connection attempt failed.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
498
499
class Rfc2544ResourceHelper(object):
    """Lazy, cached access to the ``rfc2544`` section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Remember *scenario_helper*; every cached value starts as None."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``rfc2544`` options, looked up on first access."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower tolerance bound, parsed on first access."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper tolerance bound, parsed on first access."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The ``correlated_traffic`` option, or its class default."""
        if self._correlated_traffic is not None:
            return self._correlated_traffic
        self._correlated_traffic = self.get_rfc2544(
            'correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
        return self._correlated_traffic

    @property
    def latency(self):
        """The ``latency`` option, or its class default."""
        if self._latency is not None:
            return self._latency
        self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option *name* from the rfc2544 section, or *default*."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` ('low - high') into the two bounds.

        The '-' separated values are sorted; a single value is used for both
        bounds.
        """
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        self._tolerance_high = bounds[1] if len(bounds) > 1 else self.tolerance_low
556
557
class SampleVNFDeployHelper(object):
    """Clones the samplevnf repository, builds it on the target and installs
    the requested VNF binary into the bin path.
    """

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Store the VNFD helper and the SSH helper."""
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build and install *app_name* unless ``which`` already finds it.

        :param app_name: name of the VNF application binary
        """
        target_bin = self.ssh_helper.join_bin_path(app_name)
        which_status = self.ssh_helper.execute("which %s" % target_bin)[0]
        if not which_status:
            # binary is already present in the bin path
            return

        # fresh local clone of the repository
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)

        # replace any previous copy on the target with the fresh clone
        self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        self.ssh_helper.execute(build_cmd)

        # copy the freshly built binary into the bin path
        built_bin = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
        self.ssh_helper.execute("sudo cp %s %s" % (built_bin, target_bin))
590
591
class ScenarioHelper(object):
    """Accessors over ``scenario_cfg`` for the VNF called ``name``.

    ``scenario_cfg`` starts as None and has to be assigned before any of the
    properties is read.
    """

    DEFAULT_VNF_CFG = dict(
        lb_config='SW',
        lb_count=1,
        worker_config='1C/1T',
        worker_threads=1,
    )

    def __init__(self, name):
        """Remember the VNF *name*; the scenario config is set later."""
        self.scenario_cfg = None
        self.name = name

    @property
    def task_path(self):
        """The mandatory ``task_path`` entry of the scenario config."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The ``nodes`` entry of the scenario config, or None."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The ``options`` entry of the scenario config, or an empty dict."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this VNF's name, or an empty dict."""
        scenario_options = self.all_options
        return scenario_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """This VNF's ``vnf_config`` option, or ``DEFAULT_VNF_CFG``."""
        node_options = self.options
        return node_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The mandatory ``topology`` entry of the scenario config."""
        cfg = self.scenario_cfg
        return cfg['topology']

    @property
    def timeout(self):
        """This VNF's ``timeout`` option, or ``DEFAULT_VNF_TIMEOUT``."""
        node_options = self.options
        return node_options.get('timeout', DEFAULT_VNF_TIMEOUT)
632
633
class SampleVNF(GenericVNF):
    """Generic sample VNF run interactively over SSH.

    The VNF command is executed in a separate process; ``q_in`` and ``q_out``
    carry its console input and output.
    """

    VNF_PROMPT = "pipeline>"
    WAIT_TIME = 1
    WAIT_TIME_FOR_SCRIPT = 10
    APP_NAME = "SampleVNF"

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the helpers and the console queues.

        :param name: VNF name
        :param vnfd: VNF descriptor
        :param setup_env_helper_type: setup helper class; ``SetupEnvHelper``
                                      when None
        :param resource_helper_type: resource helper class; ``ResourceHelper``
                                     when None
        """
        super(SampleVNF, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)

        setup_cls = SetupEnvHelper if setup_env_helper_type is None else setup_env_helper_type
        self.setup_helper = setup_cls(self.vnfd_helper, self.ssh_helper, self.scenario_helper)

        self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)

        resource_cls = ResourceHelper if resource_helper_type is None else resource_helper_type
        self.resource_helper = resource_cls(self.setup_helper)

        self.context_cfg = None
        self.nfvi_context = None
        self.pipeline_kwargs = {}
        self.uplink_ports = None
        self.downlink_ports = None
        # TODO(esm): make QueueFileWrapper invert-able so that we
        #            never have to manage the queues
        self.q_in = Queue()
        self.q_out = Queue()
        self.queue_wrapper = None
        self.run_kwargs = {}
        self.used_drivers = {}
        self.vnf_port_pairs = None
        self._vnf_process = None

    def _build_ports(self):
        """Compute networks and uplink/downlink/own port numbers."""
        pairs = PortPairs(self.vnfd_helper.interfaces)
        self._port_pairs = pairs
        self.networks = pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(pairs.uplink_ports)
        self.downlink_ports = self.vnfd_helper.port_nums(pairs.downlink_ports)
        self.my_ports = self.vnfd_helper.port_nums(pairs.all_ports)

    def _get_route_data(self, route_index, route_type):
        """Return field *route_type* of entry *route_index* in ``nd_route_tbl``.

        An empty string is returned when the entry or the field is missing.
        """
        entries = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
        # skip the entries in front of the requested one
        for _ in range(route_index):
            next(entries, '')
        wanted = next(entries, {})
        return wanted.get(route_type, '')

    def _get_port0localip6(self):
        """Return and log the ``network`` field of route entry 0."""
        value = self._get_route_data(0, 'network')
        LOG.info("_get_port0localip6 : %s", value)
        return value

    def _get_port1localip6(self):
        """Return and log the ``network`` field of route entry 1."""
        value = self._get_route_data(1, 'network')
        LOG.info("_get_port1localip6 : %s", value)
        return value

    def _get_port0prefixlen6(self):
        """Return and log the ``netmask`` field of route entry 0."""
        value = self._get_route_data(0, 'netmask')
        LOG.info("_get_port0prefixlen6 : %s", value)
        return value

    def _get_port1prefixlen6(self):
        """Return and log the ``netmask`` field of route entry 1."""
        value = self._get_route_data(1, 'netmask')
        LOG.info("_get_port1prefixlen6 : %s", value)
        return value

    def _get_port0gateway6(self):
        """Return and log the ``network`` field of route entry 0."""
        # NOTE(review): reads 'network' although the method is named gateway;
        # confirm whether a 'gateway' field was intended
        value = self._get_route_data(0, 'network')
        LOG.info("_get_port0gateway6 : %s", value)
        return value

    def _get_port1gateway6(self):
        """Return and log the ``network`` field of route entry 1."""
        # NOTE(review): reads 'network' although the method is named gateway;
        # confirm whether a 'gateway' field was intended
        value = self._get_route_data(1, 'network')
        LOG.info("_get_port1gateway6 : %s", value)
        return value

    def _start_vnf(self):
        """Create the queue wrapper and start ``_run`` in a new process."""
        self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
        process_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._vnf_process = Process(name=process_name, target=self._run)
        self._vnf_process.start()

    def _vnf_up_post(self):
        """Hook called once the VNF prompt was seen; does nothing here."""
        return None

    def instantiate(self, scenario_cfg, context_cfg):
        """Store the configs, set up the resources and start the VNF.

        :param scenario_cfg: scenario configuration
        :param context_cfg: context configuration
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg
        node = self.scenario_helper.nodes[self.name]
        self.nfvi_context = Context.get_context_from_server(node)

        # vnf deploy is unsupported, use ansible playbooks
        if self.scenario_helper.options.get("vnf_deploy", False):
            self.deploy_helper.deploy_vnfs(self.APP_NAME)
        self.resource_helper.setup()
        self._start_vnf()

    def wait_for_instantiate(self):
        """Block until the VNF prompt shows up on the console.

        :return: exit code of the VNF process at the time the prompt was seen
        :raises RuntimeError: if the VNF process died or printed "PANIC"
        """
        console_chunks = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # TODO(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                console_chunks.append(self.q_out.get())
                console_text = ''.join(console_chunks)
                if self.VNF_PROMPT in console_text:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    self.resource_helper.start_collect()
                    return self._vnf_process.exitcode

                if "PANIC" in console_text:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(self.WAIT_TIME_FOR_SCRIPT)
            # Send ENTER to display a new prompt in case the prompt text was corrupted
            # by other VNF output
            self.q_in.put('\r\n')

    def _build_run_kwargs(self):
        """Populate ``run_kwargs`` for the interactive SSH run."""
        wrapper = self.queue_wrapper
        self.run_kwargs = {
            'stdin': wrapper,
            'stdout': wrapper,
            'keep_stdin_open': True,
            'pty': True,
            'timeout': self.scenario_helper.timeout,
        }

    def _build_config(self):
        """Return whatever the setup helper's ``build_config`` produces."""
        return self.setup_helper.build_config()

    def _run(self):
        """Process target: build the config and run the VNF command over SSH."""
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()
        vnf_cmd = self._build_config()
        # kill before starting
        self.setup_helper.kill_vnf()

        LOG.debug(vnf_cmd)
        self._build_run_kwargs()
        self.ssh_helper.run(vnf_cmd, **self.run_kwargs)

    def vnf_execute(self, cmd, wait_time=2):
        """ send cmd to vnf process """

        LOG.info("%s command: %s", self.APP_NAME, cmd)
        self.q_in.put("{}\r\n".format(cmd))
        time.sleep(wait_time)
        # drain everything currently waiting on the output queue
        chunks = []
        while self.q_out.qsize() > 0:
            chunks.append(self.q_out.get())
        return "".join(chunks)

    def _tear_down(self):
        """Hook called during ``terminate``; does nothing here."""
        return None

    def terminate(self):
        """Quit and kill the VNF, stop KPI collection and end the process."""
        self.vnf_execute("quit")
        self.setup_helper.kill_vnf()
        self._tear_down()
        self.resource_helper.stop_collect()
        # no terminate children here because we share processes with tg
        if self._vnf_process is None:
            return
        # be proper and join first before we kill
        LOG.debug("joining before terminate %s", self._vnf_process.name)
        self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
        self._vnf_process.terminate()

    def get_stats(self, *args, **kwargs):
        """
        Method for checking the statistics

        :return:
           VNF statistics
        """
        # NOTE(review): APP_WORD is not defined in this class; presumably
        # provided by subclasses or the base class -- confirm
        stats_cmd = 'p {0} stats'.format(self.APP_WORD)
        return self.vnf_execute(stats_cmd)

    def collect_kpi(self):
        """Parse the VNF statistics into a KPI dict.

        When ``COLLECT_KPI`` matches, the groups named by ``COLLECT_MAP`` are
        returned as integers together with the resource KPIs under
        ``"collect_stats"``; otherwise the three packet counters are zero.
        """
        # we can't get KPIs if the VNF is down
        check_if_process_failed(self._vnf_process)
        stats = self.get_stats()
        # NOTE(review): COLLECT_KPI/COLLECT_MAP are not defined in this class;
        # presumably provided by subclasses or the base class -- confirm
        match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
        if not match:
            result = {
                "packets_in": 0,
                "packets_fwd": 0,
                "packets_dropped": 0,
            }
        else:
            result = {}
            for kpi_name, group in self.COLLECT_MAP.items():
                result[kpi_name] = int(match.group(group))
            result["collect_stats"] = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        return result
847
848
class SampleVNFTrafficGen(GenericTrafficGen):
    """ Traffic generator base that runs a server and a traffic process

    ``instantiate`` starts ``_start_server`` in its own process and
    ``run_traffic`` starts ``_traffic_runner`` in another one.
    """

    APP_NAME = 'Sample'
    # seconds slept between polls while run_traffic waits for the client
    RUN_WAIT = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """ Create the scenario, ssh, setup and resource helpers

        :param name: passed to the base class constructor
        :param vnfd: passed to the base class constructor
        :param setup_env_helper_type: helper class, ``SetupEnvHelper`` when None
        :param resource_helper_type: helper class, ``ClientResourceHelper`` when None
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        env_helper_cls = (SetupEnvHelper if setup_env_helper_type is None
                          else setup_env_helper_type)
        self.setup_helper = env_helper_cls(self.vnfd_helper,
                                           self.ssh_helper,
                                           self.scenario_helper)

        resource_cls = (ClientResourceHelper if resource_helper_type is None
                        else resource_helper_type)
        self.resource_helper = resource_cls(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        # both stay None until instantiate() / run_traffic() create them
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        """ Target of the server process; only drops the ssh connection here """
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """ Set up resources and start the server process

        :param scenario_cfg: stored on the scenario helper
        :param context_cfg: not used by this implementation
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.resource_helper.setup()
        # must generate_cfg after DPDK bind because we need port number
        self.resource_helper.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        process_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._tg_process = Process(name=process_name, target=self._start_server)
        self._tg_process.start()

    def wait_for_instantiate(self):
        """ Block until the server is up; returns ``_wait_for_process()`` """
        # overridden by subclasses
        return self._wait_for_process()

    def _check_status(self):
        """ Must be provided by subclasses; ``_wait_for_process`` expects 0 when ready """
        raise NotImplementedError

    def _wait_for_process(self):
        """ Poll ``_check_status`` once a second until it returns 0

        :return: exit code of the server process at the time it is ready
        :raises RuntimeError: when the server process is no longer alive
        """
        while self._tg_process.is_alive():
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            if self._check_status() == 0:
                LOG.info("%s TG Server is up and running.", self.APP_NAME)
                return self._tg_process.exitcode
        raise RuntimeError("%s traffic generator process died." % self.APP_NAME)

    def _traffic_runner(self, traffic_profile):
        """ Target of the traffic process: run the profile via the resource helper """
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self.resource_helper.run_traffic(traffic_profile)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.
        Method is non-blocking, returns immediately when traffic process
        is running. Mandatory.

        :param traffic_profile:
        :return: True if the traffic process is alive, otherwise False
        """
        name_parts = (self.name, self.APP_NAME, traffic_profile.__class__.__name__,
                      os.getpid())
        self._traffic_process = Process(name="{}-{}-{}-{}".format(*name_parts),
                                        target=self._traffic_runner,
                                        args=(traffic_profile,))
        self._traffic_process.start()
        # Wait for traffic process to start
        while True:
            if self.resource_helper.client_started.value != 0:
                break
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not self._traffic_process.is_alive():
                break

        return self._traffic_process.is_alive()

    def listen_traffic(self, traffic_profile):
        """ Listen to traffic with the given parameters.
        Method is non-blocking, returns immediately when traffic process
        is running. Optional.

        Not implemented here; returns None.

        :param traffic_profile:
        :return: True/False
        """
        return None

    def verify_traffic(self, traffic_profile):
        """ Verify captured traffic after it has ended. Optional.

        Not implemented here; returns None.

        :param traffic_profile:
        :return: dict
        """
        return None

    def collect_kpi(self):
        """ Return the KPIs collected by the resource helper

        Both processes are first passed to ``check_if_process_failed``.
        """
        # check if the tg processes have exited
        tg_process, traffic_process = self._tg_process, self._traffic_process
        check_if_process_failed(tg_process)
        check_if_process_failed(traffic_process)
        kpis = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, kpis)
        return kpis

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Each existing process is joined for at most ``PROCESS_JOIN_TIMEOUT``
        and then terminated.  Nothing is returned.
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        for process in (self._traffic_process, self._tg_process):
            if process is None:
                continue
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", process.name)
            process.join(PROCESS_JOIN_TIMEOUT)
            process.terminate()
        # no terminate children here because we share processes with vnf