Merge "assertTrue(mock.called) -> mock.assert_called"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19
20 import os
21 import posixpath
22 import re
23 import subprocess
24 import time
25
26 import six
27
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.common import exceptions as y_exceptions
33 from yardstick.common.process import check_if_process_failed
34 from yardstick.common import utils
35 from yardstick.network_services import constants
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
37 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
38 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
39 from yardstick.network_services.nfvi.resource import ResourceProfile
40 from yardstick.network_services.utils import get_nsb_option
41 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
42 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
43 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
44 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
45
46
47 LOG = logging.getLogger(__name__)
48
49
class SetupEnvHelper(object):
    """Base helper that prepares the environment a sample VNF runs in.

    Subclasses must implement :meth:`build_config` and :meth:`tear_down`;
    :meth:`setup_vnf_environment` and :meth:`kill_vnf` are optional hooks
    that do nothing here.
    """

    # locations of the generated config and script under the remote tmp dir
    CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the three helper objects used by the setup steps."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper

    def build_config(self):
        """Build the VNF configuration; must be provided by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Hook for environment preparation; no-op in the base class."""

    def kill_vnf(self):
        """Hook for stopping a running VNF; no-op in the base class."""

    def tear_down(self):
        """Undo the environment setup; must be provided by subclasses."""
        raise NotImplementedError
75
76
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Setup helper for DPDK based sample VNFs.

    Reserves hugepages, loads the DPDK driver, binds the VNF interfaces to
    ``igb_uio`` and generates/uploads the pipeline configuration and script.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
    # amount of memory to reserve as hugepages, in kB (divided by the
    # hugepage size in kB to get the page count)
    HUGEPAGES_KB = 1024 * 1024 * 16

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace the default ``pkt_type = ipv4`` with the requested type.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict holding at least ``pkt_type``
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Patch the traffic type (or packet type) in the pipeline config.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: dict holding ``traffic_type`` and ``vnf_type``
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # compare by value: identity ("is not") on strings depends on
        # interning and is not a reliable equality test
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            # already ipv4; the replacement is intentionally a no-op
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Initialise state and create the DPDK bind helper."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Request ``HUGEPAGES_KB`` worth of hugepages and log the outcome."""
        meminfo = utils.read_meminfo(self.ssh_helper)
        hp_size_kb = int(meminfo['Hugepagesize'])
        nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
        self.ssh_helper.execute('echo %s | sudo tee %s' %
                                (nr_hugepages, self.NR_HUGEPAGES_PATH))
        # read the value back: the number actually set may differ from the
        # number requested
        hp = six.BytesIO()
        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
        LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
                 hp_size_kb, nr_hugepages, nr_hugepages_set)

    def build_config(self):
        """Generate and upload the VNF config and script.

        When the scenario provides a config ``file`` it is prefixed with an
        ``[EAL]`` section whitelisting the VNF PCI addresses; otherwise the
        config at ``CFG_CONFIG`` is read and patched with the traffic and
        packet types.

        :return: the formatted ``PIPELINE_COMMAND`` used to start the VNF
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        config_file = vnf_cfg.get('file')
        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
                                                  task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        if config_file:
            with utils.open_relative_file(config_file, task_path) as infile:
                new_config = ['[EAL]']
                vpci = []
                for port in self.vnfd_helper.port_pairs.all_ports:
                    interface = self.vnfd_helper.find_interface(name=port)
                    vpci.append(interface['virtual-interface']["vpci"])
                new_config.extend('w = {0}'.format(item) for item in vpci)
                new_config = '\n'.join(new_config) + '\n' + infile.read()
        else:
            # presumably written by multiport.generate_config() above --
            # TODO confirm
            with open(self.CFG_CONFIG) as handle:
                new_config = handle.read()
            new_config = self._update_traffic_type(new_config, traffic_options)
            new_config = self._update_packet_type(new_config, traffic_options)
        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Populate ``self.pipeline_kwargs`` for ``PIPELINE_COMMAND``."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, stop stale VNFs, bind drivers and build resources.

        :return: the ``ResourceProfile`` created by ``_setup_resources``
        """
        self._setup_dpdk()
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill any running instance of ``APP_NAME`` on the target."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run"""
        self._setup_hugepages()
        self.dpdk_bind_helper.load_dpdk_driver()

        exit_status = self.dpdk_bind_helper.check_dpdk_driver()
        if exit_status != 0:
            # best effort: report the failed check instead of dropping it
            LOG.warning("DPDK driver check returned exit status %s", exit_status)

    def get_collectd_options(self):
        """Return the collectd options, node settings overriding global ones.

        A new dict is returned so the scenario configuration is not mutated.
        """
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Pick the socket and create the ``ResourceProfile`` for the VNF."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=collectd_options.get("interval"),
                               timeout=self.scenario_helper.timeout)

    def _check_interface_fields(self):
        """Run the ``DpdkNode`` check on the VNF interfaces."""
        num_nodes = len(self.scenario_helper.nodes)
        # OpenStack instance creation time is probably proportional to the number
        # of instances
        timeout = 120 * num_nodes
        dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
                             self.ssh_helper, timeout)
        dpdk_node.check()

    def _detect_and_bind_drivers(self):
        """Bind the VNF interfaces to ``igb_uio`` and record DPDK port numbers."""
        interfaces = self.vnfd_helper.interfaces

        self._check_interface_fields()
        # check for bound after probe
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            intf = next((v for v in interfaces
                         if vpci == v['virtual-interface']['vpci']), None)
            if intf is None:
                # DPDK-bound address with no matching VNF interface: skip it
                continue
            # force to int
            intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the stdout of ``FIND_NET_CMD`` for *vpci*, or None on failure."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the interfaces to the drivers saved before binding."""
        self.dpdk_bind_helper.rebind_drivers()
294
295
class ResourceHelper(object):
    """Collects NFVI resource KPIs through the setup helper's resource."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Keep the setup helper and reuse its ssh helper."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Prepare the VNF environment and store the returned resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for configuration generation; no-op here."""

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is empty if collectd is not running."""
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        nfvi_kpi = self.resource.amqp_collect_nfvi_kpi() if agent_status == 0 else {}
        return {"core": nfvi_kpi}

    def start_collect(self):
        """Start the system agent, the resource and the AMQP KPI process."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource if one has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
336
337
class ClientResourceHelper(ResourceHelper):
    """Resource helper that drives traffic through an ``STLClient``.

    Samples are produced in ``run_traffic`` and handed over through a
    multiprocessing queue, to be read back by ``collect_kpi``.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Initialise client state, the sample queue and the shared flags."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        """Return the port number of *intf* as reported by the vnfd helper."""
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` if the client raises STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface throughput/packet samples for *ports*.

        :param ports: port numbers to report on
        :param key: optional extra stats key copied into every sample
        :param default: value used when *key* is missing from the stats
        :return: dict mapping interface name to its sample dict; ``{}`` when
                 the stats are not a mapping (the helper is then terminated)
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # validate before touching the result: calling .get() on a
        # non-mapping would raise before this guard could act
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the resulting samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic until ``terminate`` is called.

        :raises STLError: if the client fails while not being terminated
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Ask the traffic loop in ``run_traffic`` to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear client statistics for *ports* (all ports when None)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start the client on *ports* (all ports when None)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set, if any, into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect *client* (a new ``STLClient`` when None), retrying on failure.

        :return: the client, whether or not a connection was established
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        else:
            # every attempt failed; the client is still returned as before,
            # but the failure is made visible
            LOG.error("Unable to connect to Trex Server after 6 attempts")
        return client
477
478
class Rfc2544ResourceHelper(object):
    """Lazy accessor for the ``rfc2544`` section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Keep the scenario helper; every option is resolved on first use."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``rfc2544`` options dict, cached after the first lookup."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The ``correlated_traffic`` option, cached after the first lookup."""
        if self._correlated_traffic is None:
            self._correlated_traffic = self.get_rfc2544(
                'correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
        return self._correlated_traffic

    @property
    def latency(self):
        """The ``latency`` option, cached after the first lookup."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option *name* from the rfc2544 section, or *default*."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` ("low - high") into the two bounds.

        The values are sorted, so their order in the string does not matter;
        a single value is used for both bounds.
        """
        raw = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw.split('-'))
        self._tolerance_low = bounds[0]
        self._tolerance_high = bounds[1] if len(bounds) > 1 else self.tolerance_low
535
536
class SampleVNFDeployHelper(object):
    """Clones, uploads and builds the samplevnf repository on the target."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Store the ssh and vnfd helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build and install *app_name* unless ``which`` already finds it.

        The repository is cloned locally, copied to the target, built with
        ``tools/vnf_build.sh`` and the resulting binary is copied into the
        ssh helper's bin path.
        """
        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        which_status = ssh.execute("which %s" % vnf_bin)[0]
        if not which_status:
            # binary already present on the target
            return

        # fresh local clone
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)

        # replace any previous copy on the target
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_binary, vnf_bin))
569
570
class ScenarioHelper(object):
    """Read-only view on the scenario configuration of one named node.

    ``scenario_cfg`` must be assigned before any property is read.
    """

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the node name; the scenario config is attached later."""
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The ``task_path`` entry of the scenario config."""
        return self.scenario_cfg['task_path']

    @property
    def nodes(self):
        """The ``nodes`` entry of the scenario config, or None if absent."""
        return self.scenario_cfg.get('nodes')

    @property
    def all_options(self):
        """The ``options`` entry of the scenario config (``{}`` if absent)."""
        return self.scenario_cfg.get('options', {})

    @property
    def options(self):
        """The options registered under this node's name (``{}`` if absent)."""
        return self.all_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The node's ``vnf_config``, falling back to ``DEFAULT_VNF_CFG``."""
        return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The ``topology`` entry of the scenario config."""
        return self.scenario_cfg['topology']

    @property
    def timeout(self):
        """The larger of the runner duration and the node timeout."""
        node_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
        runner_cfg = self.scenario_cfg.get('runner', {})
        duration = runner_cfg.get('duration', node_timeout)
        # argument order matters: on a tie max() keeps the first, i.e. the timeout
        return max(node_timeout, duration)
614
615 class SampleVNF(GenericVNF):
616     """ Class providing file-like API for generic VNF implementation """
617
618     VNF_PROMPT = "pipeline>"
619     WAIT_TIME = 1
620     WAIT_TIME_FOR_SCRIPT = 10
621     APP_NAME = "SampleVNF"
622     # we run the VNF interactively, so the ssh command will timeout after this long
623
624     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
625         super(SampleVNF, self).__init__(name, vnfd)
626         self.bin_path = get_nsb_option('bin_path', '')
627
628         self.scenario_helper = ScenarioHelper(self.name)
629         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
630
631         if setup_env_helper_type is None:
632             setup_env_helper_type = SetupEnvHelper
633
634         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
635                                                   self.ssh_helper,
636                                                   self.scenario_helper)
637
638         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
639
640         if resource_helper_type is None:
641             resource_helper_type = ResourceHelper
642
643         self.resource_helper = resource_helper_type(self.setup_helper)
644
645         self.context_cfg = None
646         self.nfvi_context = None
647         self.pipeline_kwargs = {}
648         self.uplink_ports = None
649         self.downlink_ports = None
650         # NOTE(esm): make QueueFileWrapper invert-able so that we
651         #            never have to manage the queues
652         self.q_in = Queue()
653         self.q_out = Queue()
654         self.queue_wrapper = None
655         self.run_kwargs = {}
656         self.used_drivers = {}
657         self.vnf_port_pairs = None
658         self._vnf_process = None
659
660     def _build_ports(self):
661         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
662         self.networks = self._port_pairs.networks
663         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
664         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
665         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
666
667     def _get_route_data(self, route_index, route_type):
668         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
669         for _ in range(route_index):
670             next(route_iter, '')
671         return next(route_iter, {}).get(route_type, '')
672
673     def _get_port0localip6(self):
674         return_value = self._get_route_data(0, 'network')
675         LOG.info("_get_port0localip6 : %s", return_value)
676         return return_value
677
678     def _get_port1localip6(self):
679         return_value = self._get_route_data(1, 'network')
680         LOG.info("_get_port1localip6 : %s", return_value)
681         return return_value
682
683     def _get_port0prefixlen6(self):
684         return_value = self._get_route_data(0, 'netmask')
685         LOG.info("_get_port0prefixlen6 : %s", return_value)
686         return return_value
687
688     def _get_port1prefixlen6(self):
689         return_value = self._get_route_data(1, 'netmask')
690         LOG.info("_get_port1prefixlen6 : %s", return_value)
691         return return_value
692
693     def _get_port0gateway6(self):
694         return_value = self._get_route_data(0, 'network')
695         LOG.info("_get_port0gateway6 : %s", return_value)
696         return return_value
697
698     def _get_port1gateway6(self):
699         return_value = self._get_route_data(1, 'network')
700         LOG.info("_get_port1gateway6 : %s", return_value)
701         return return_value
702
703     def _start_vnf(self):
704         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
705         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
706         self._vnf_process = Process(name=name, target=self._run)
707         self._vnf_process.start()
708
709     def _vnf_up_post(self):
710         pass
711
712     def instantiate(self, scenario_cfg, context_cfg):
713         self.scenario_helper.scenario_cfg = scenario_cfg
714         self.context_cfg = context_cfg
715         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
716         # self.nfvi_context = None
717
718         # vnf deploy is unsupported, use ansible playbooks
719         if self.scenario_helper.options.get("vnf_deploy", False):
720             self.deploy_helper.deploy_vnfs(self.APP_NAME)
721         self.resource_helper.setup()
722         self._start_vnf()
723
724     def wait_for_instantiate(self):
725         buf = []
726         time.sleep(self.WAIT_TIME)  # Give some time for config to load
727         while True:
728             if not self._vnf_process.is_alive():
729                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
730
731             # NOTE(esm): move to QueueFileWrapper
732             while self.q_out.qsize() > 0:
733                 buf.append(self.q_out.get())
734                 message = ''.join(buf)
735                 if self.VNF_PROMPT in message:
736                     LOG.info("%s VNF is up and running.", self.APP_NAME)
737                     self._vnf_up_post()
738                     self.queue_wrapper.clear()
739                     self.resource_helper.start_collect()
740                     return self._vnf_process.exitcode
741
742                 if "PANIC" in message:
743                     raise RuntimeError("Error starting %s VNF." %
744                                        self.APP_NAME)
745
746             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
747             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
748             # Send ENTER to display a new prompt in case the prompt text was corrupted
749             # by other VNF output
750             self.q_in.put('\r\n')
751
752     def _build_run_kwargs(self):
753         self.run_kwargs = {
754             'stdin': self.queue_wrapper,
755             'stdout': self.queue_wrapper,
756             'keep_stdin_open': True,
757             'pty': True,
758             'timeout': self.scenario_helper.timeout,
759         }
760
761     def _build_config(self):
762         return self.setup_helper.build_config()
763
def _run(self):
    """Build the VNF command and execute it through the SSH helper.

    Steps, in order: drop the current SSH connection, build the command,
    kill any VNF instance via the setup helper, prepare ``self.run_kwargs``
    and finally run the command with those keyword arguments.
    """
    # we can't share ssh paramiko objects to force new connection
    self.ssh_helper.drop_connection()
    launch_cmd = self._build_config()

    # kill before starting
    self.setup_helper.kill_vnf()
    LOG.debug(launch_cmd)

    self._build_run_kwargs()
    self.ssh_helper.run(launch_cmd, **self.run_kwargs)
774
def vnf_execute(self, cmd, wait_time=2):
    """Send ``cmd`` to the VNF process and return the queued output.

    :param cmd: command text; "\\r\\n" is appended before it is put on
                ``q_in``
    :param wait_time: seconds to sleep before reading ``q_out``
    :return: concatenation of every item read from ``q_out`` while its
             reported size is greater than zero
    """
    LOG.info("%s command: %s", self.APP_NAME, cmd)
    self.q_in.put("{}\r\n".format(cmd))
    time.sleep(wait_time)

    def _drain():
        # Pull items one at a time for as long as the queue reports content
        while self.q_out.qsize() > 0:
            yield self.q_out.get()

    return "".join(_drain())
785
786     def _tear_down(self):
787         pass
788
def terminate(self):
    """Stop the VNF application and the process that runs it.

    Sends "quit" to the VNF, kills it through the setup helper, calls the
    ``_tear_down`` hook and stops resource collection. When a VNF process
    object exists it is joined (bounded by ``PROCESS_JOIN_TIMEOUT``) and
    then terminated. Child processes are not terminated here because
    processes are shared with the tg.
    """
    self.vnf_execute("quit")
    self.setup_helper.kill_vnf()
    self._tear_down()
    self.resource_helper.stop_collect()

    if self._vnf_process is None:
        return

    # be proper and join first before we kill
    LOG.debug("joining before terminate %s", self._vnf_process.name)
    self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
    self._vnf_process.terminate()
800
def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
    """Query the VNF for its statistics.

    Sends ``p <APP_WORD> stats`` through ``vnf_execute``. Positional and
    keyword arguments are accepted but ignored. This method could be
    overridden in children classes.

    :return: VNF statistics, as returned by ``vnf_execute``
    """
    stats_cmd = 'p {0} stats'.format(self.APP_WORD)
    return self.vnf_execute(stats_cmd)
811
def collect_kpi(self):
    """Collect the VNF KPIs from the statistics output.

    The output of ``get_stats`` is searched with the ``COLLECT_KPI``
    pattern (multi-line mode). On a match, every ``COLLECT_MAP`` entry is
    filled with the integer value of its regex group and the resource
    helper KPIs are added under "collect_stats". Without a match the three
    packet counters are reported as zero and "collect_stats" is absent.

    :return: dict of KPI name -> value
    """
    # we can't get KPIs if the VNF is down
    check_if_process_failed(self._vnf_process)
    stats = self.get_stats()
    match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)

    if match is None:
        kpis = {
            "packets_in": 0,
            "packets_fwd": 0,
            "packets_dropped": 0,
        }
    else:
        kpis = {}
        for kpi_name, group in self.COLLECT_MAP.items():
            kpis[kpi_name] = int(match.group(group))
        kpis["collect_stats"] = self.resource_helper.collect_kpi()

    LOG.debug("%s collect KPIs %s", self.APP_NAME, kpis)
    return kpis
828
def scale(self, flavor=""):
    """The SampleVNF base class doesn't provide the 'scale' feature

    :param flavor: accepted for interface compatibility; not used
    :raises FunctionNotImplemented: always
    """
    # Report this class, not the traffic generator, as the one lacking the
    # feature (the previous value was copied from SampleVNFTrafficGen).
    raise y_exceptions.FunctionNotImplemented(
        function_name='scale', class_name='SampleVNF')
833
834
class SampleVNFTrafficGen(GenericTrafficGen):
    """Base traffic generator that runs a server and a traffic client process.

    ``instantiate`` starts the server process (``_start_server``) and
    ``run_traffic`` starts the client process (``_traffic_runner``).
    """

    APP_NAME = 'Sample'
    # Seconds slept between checks while waiting for the traffic client
    RUN_WAIT = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the helper objects used by the traffic generator.

        :param name: passed to the parent constructor
        :param vnfd: passed to the parent constructor
        :param setup_env_helper_type: class used to build ``setup_helper``;
                                      ``SetupEnvHelper`` when None
        :param resource_helper_type: class used to build ``resource_helper``;
                                     ``ClientResourceHelper`` when None
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        setup_cls = (SetupEnvHelper if setup_env_helper_type is None
                     else setup_env_helper_type)
        self.setup_helper = setup_cls(self.vnfd_helper, self.ssh_helper, self.scenario_helper)

        resource_cls = (ClientResourceHelper if resource_helper_type is None
                        else resource_helper_type)
        self.resource_helper = resource_cls(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        # Process objects; created by instantiate() and run_traffic()
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        """Target of the server process; only drops the SSH connection here."""
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """Set up resources and start the traffic generator server process.

        :param scenario_cfg: stored on the scenario helper
        :param context_cfg: not used by this implementation
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.resource_helper.setup()
        # must generate_cfg after DPDK bind because we need port number
        self.resource_helper.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        server_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._tg_process = Process(name=server_name, target=self._start_server)
        self._tg_process.start()

    def _check_status(self):
        """Report the server status; ``_wait_for_process`` expects 0 when up.

        :raises NotImplementedError: always, in this class
        """
        raise NotImplementedError

    def _wait_for_process(self):
        """Poll once per second until ``_check_status`` returns 0.

        :return: ``exitcode`` of the server process once the status is 0
        :raises RuntimeError: when the server process is not alive
        """
        while self._tg_process.is_alive():
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            if self._check_status() == 0:
                LOG.info("%s TG Server is up and running.", self.APP_NAME)
                return self._tg_process.exitcode

        # The loop only ends when the liveness check fails
        raise RuntimeError("%s traffic generator process died." % self.APP_NAME)

    def _traffic_runner(self, traffic_profile):
        """Target of the traffic process: run the profile via the resource helper.

        :param traffic_profile: forwarded to ``resource_helper.run_traffic``
        """
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self.resource_helper.run_traffic(traffic_profile)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.
        Starts the traffic process and waits until the resource helper
        reports the client as started or the process is no longer alive.
        Mandatory.

        :param traffic_profile: profile handed to the traffic process
        :return: True if the traffic process is alive, False otherwise
        """
        profile_kind = traffic_profile.__class__.__name__
        client_name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, profile_kind,
                                           os.getpid())
        self._traffic_process = Process(name=client_name,
                                        target=self._traffic_runner,
                                        args=(traffic_profile,))
        self._traffic_process.start()

        # Wait for traffic process to start
        while self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not self._traffic_process.is_alive():
                break

        return self._traffic_process.is_alive()

    def collect_kpi(self):
        """Check both processes for failure, then return the resource helper KPIs.

        :return: result of ``resource_helper.collect_kpi()``
        """
        # check if the tg processes have exited
        check_if_process_failed(self._tg_process)
        check_if_process_failed(self._traffic_process)

        kpis = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, kpis)
        return kpis

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Marks traffic as finished, then joins (bounded by
        ``PROCESS_JOIN_TIMEOUT``) and terminates each existing process.
        Child processes are not terminated here because processes are shared
        with the vnf.

        :return: None
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        for proc in (self._traffic_process, self._tg_process):
            if proc is None:
                continue
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", proc.name)
            proc.join(constants.PROCESS_JOIN_TIMEOUT)
            proc.terminate()

    def scale(self, flavor=""):
        """A traffic generator VNF doesn't provide the 'scale' feature

        :param flavor: accepted for interface compatibility; not used
        :raises FunctionNotImplemented: always
        """
        raise y_exceptions.FunctionNotImplemented(
            function_name='scale', class_name='SampleVNFTrafficGen')