Merge "Enable heat context to support existing network"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19
20 import os
21 import posixpath
22 import re
23 import subprocess
24 import time
25
26 import six
27
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.common import exceptions as y_exceptions
33 from yardstick.common.process import check_if_process_failed
34 from yardstick.common import utils
35 from yardstick.network_services.constants import DEFAULT_VNF_TIMEOUT
36 from yardstick.network_services.constants import PROCESS_JOIN_TIMEOUT
37 from yardstick.network_services.constants import REMOTE_TMP
38 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
39 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
40 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
41 from yardstick.network_services.nfvi.resource import ResourceProfile
42 from yardstick.network_services.utils import get_nsb_option
43 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
44 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
45 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
46 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
47
48
49 LOG = logging.getLogger(__name__)
50
51
class SetupEnvHelper(object):
    """Base helper describing how a sample VNF environment is prepared.

    ``build_config`` and ``tear_down`` must be provided by subclasses;
    ``setup_vnf_environment`` and ``kill_vnf`` are no-op hooks here.
    """

    # Paths (under REMOTE_TMP) of the generated config file and script
    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the three collaborating helpers."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper, self.ssh_helper, self.scenario_helper = (
            vnfd_helper, ssh_helper, scenario_helper)

    def build_config(self):
        """Build the VNF configuration; subclasses must implement this."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Hook for environment setup; nothing to do in the base class."""
        return None

    def kill_vnf(self):
        """Hook for stopping the VNF; nothing to do in the base class."""
        return None

    def tear_down(self):
        """Undo the environment setup; subclasses must implement this."""
        raise NotImplementedError
77
78
79 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
80
81     APP_NAME = 'DpdkVnf'
82     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
83     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
84     HUGEPAGES_KB = 1024 * 1024 * 16
85
86     @staticmethod
87     def _update_packet_type(ip_pipeline_cfg, traffic_options):
88         match_str = 'pkt_type = ipv4'
89         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
90         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
91         return pipeline_config_str
92
93     @classmethod
94     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
95         traffic_type = traffic_options['traffic_type']
96
97         if traffic_options['vnf_type'] is not cls.APP_NAME:
98             match_str = 'traffic_type = 4'
99             replace_str = 'traffic_type = {0}'.format(traffic_type)
100
101         elif traffic_type == 4:
102             match_str = 'pkt_type = ipv4'
103             replace_str = 'pkt_type = ipv4'
104
105         else:
106             match_str = 'pkt_type = ipv4'
107             replace_str = 'pkt_type = ipv6'
108
109         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
110         return pipeline_config_str
111
112     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
113         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
114         self.all_ports = None
115         self.bound_pci = None
116         self.socket = None
117         self.used_drivers = None
118         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
119
120     def _setup_hugepages(self):
121         meminfo = utils.read_meminfo(self.ssh_helper)
122         hp_size_kb = int(meminfo['Hugepagesize'])
123         nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
124         self.ssh_helper.execute('echo %s | sudo tee %s' %
125                                 (nr_hugepages, self.NR_HUGEPAGES_PATH))
126         hp = six.BytesIO()
127         self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
128         nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
129         LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
130                  hp_size_kb, nr_hugepages, nr_hugepages_set)
131
132     def build_config(self):
133         vnf_cfg = self.scenario_helper.vnf_cfg
134         task_path = self.scenario_helper.task_path
135
136         config_file = vnf_cfg.get('file')
137         lb_count = vnf_cfg.get('lb_count', 3)
138         lb_config = vnf_cfg.get('lb_config', 'SW')
139         worker_config = vnf_cfg.get('worker_config', '1C/1T')
140         worker_threads = vnf_cfg.get('worker_threads', 3)
141
142         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
143         traffic_options = {
144             'traffic_type': traffic_type,
145             'pkt_type': 'ipv%s' % traffic_type,
146             'vnf_type': self.VNF_TYPE,
147         }
148
149         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
150                                                   task_path)
151         config_basename = posixpath.basename(self.CFG_CONFIG)
152         script_basename = posixpath.basename(self.CFG_SCRIPT)
153         multiport = MultiPortConfig(self.scenario_helper.topology,
154                                     config_tpl_cfg,
155                                     config_basename,
156                                     self.vnfd_helper,
157                                     self.VNF_TYPE,
158                                     lb_count,
159                                     worker_threads,
160                                     worker_config,
161                                     lb_config,
162                                     self.socket)
163
164         multiport.generate_config()
165         if config_file:
166             with utils.open_relative_file(config_file, task_path) as infile:
167                 new_config = ['[EAL]']
168                 vpci = []
169                 for port in self.vnfd_helper.port_pairs.all_ports:
170                     interface = self.vnfd_helper.find_interface(name=port)
171                     vpci.append(interface['virtual-interface']["vpci"])
172                 new_config.extend('w = {0}'.format(item) for item in vpci)
173                 new_config = '\n'.join(new_config) + '\n' + infile.read()
174         else:
175             with open(self.CFG_CONFIG) as handle:
176                 new_config = handle.read()
177             new_config = self._update_traffic_type(new_config, traffic_options)
178             new_config = self._update_packet_type(new_config, traffic_options)
179         self.ssh_helper.upload_config_file(config_basename, new_config)
180         self.ssh_helper.upload_config_file(script_basename,
181                                            multiport.generate_script(self.vnfd_helper))
182
183         LOG.info("Provision and start the %s", self.APP_NAME)
184         self._build_pipeline_kwargs()
185         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
186
187     def _build_pipeline_kwargs(self):
188         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
189         # count the number of actual ports in the list of pairs
190         # remove duplicate ports
191         # this is really a mapping from LINK ID to DPDK PMD ID
192         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
193         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
194         ports = self.vnfd_helper.port_pairs.all_ports
195         port_nums = self.vnfd_helper.port_nums(ports)
196         # create mask from all the dpdk port numbers
197         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
198         self.pipeline_kwargs = {
199             'cfg_file': self.CFG_CONFIG,
200             'script': self.CFG_SCRIPT,
201             'port_mask_hex': ports_mask_hex,
202             'tool_path': tool_path,
203         }
204
205     def setup_vnf_environment(self):
206         self._setup_dpdk()
207         self.kill_vnf()
208         # bind before _setup_resources so we can use dpdk_port_num
209         self._detect_and_bind_drivers()
210         resource = self._setup_resources()
211         return resource
212
213     def kill_vnf(self):
214         # pkill is not matching, debug with pgrep
215         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
216         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
217         # have to use exact match
218         # try using killall to match
219         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
220
221     def _setup_dpdk(self):
222         """Setup DPDK environment needed for VNF to run"""
223         self._setup_hugepages()
224         self.dpdk_bind_helper.load_dpdk_driver()
225
226         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
227         if exit_status == 0:
228             return
229
230     def get_collectd_options(self):
231         options = self.scenario_helper.all_options.get("collectd", {})
232         # override with specific node settings
233         options.update(self.scenario_helper.options.get("collectd", {}))
234         return options
235
236     def _setup_resources(self):
237         # what is this magic?  how do we know which socket is for which port?
238         # what about quad-socket?
239         if any(v[5] == "0" for v in self.bound_pci):
240             self.socket = 0
241         else:
242             self.socket = 1
243
244         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
245         # this won't work because we don't have DPDK port numbers yet
246         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
247         port_names = (intf["name"] for intf in ports)
248         collectd_options = self.get_collectd_options()
249         plugins = collectd_options.get("plugins", {})
250         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
251         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
252                                plugins=plugins, interval=collectd_options.get("interval"),
253                                timeout=self.scenario_helper.timeout)
254
255     def _check_interface_fields(self):
256         num_nodes = len(self.scenario_helper.nodes)
257         # OpenStack instance creation time is probably proportional to the number
258         # of instances
259         timeout = 120 * num_nodes
260         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
261                              self.ssh_helper, timeout)
262         dpdk_node.check()
263
264     def _detect_and_bind_drivers(self):
265         interfaces = self.vnfd_helper.interfaces
266
267         self._check_interface_fields()
268         # check for bound after probe
269         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
270
271         self.dpdk_bind_helper.read_status()
272         self.dpdk_bind_helper.save_used_drivers()
273
274         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
275
276         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
277         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
278             try:
279                 intf = next(v for v in interfaces
280                             if vpci == v['virtual-interface']['vpci'])
281                 # force to int
282                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
283             except:  # pylint: disable=bare-except
284                 pass
285         time.sleep(2)
286
287     def get_local_iface_name_by_vpci(self, vpci):
288         find_net_cmd = self.FIND_NET_CMD.format(vpci)
289         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
290         if exit_status == 0:
291             return stdout
292         return None
293
294     def tear_down(self):
295         self.dpdk_bind_helper.rebind_drivers()
296
297
class ResourceHelper(object):
    """Collect resource KPIs through the object built by the setup helper."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember the setup helper and reuse its ssh helper."""
        super(ResourceHelper, self).__init__()
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        # Set by setup()
        self.resource = None

    def setup(self):
        """Set up the VNF environment and keep the returned resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for config generation; nothing to do here."""
        return None

    def _collect_resource_kpi(self):
        """Return {"core": kpis}; kpis stay empty unless collectd runs."""
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if agent_status == 0:
            core_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            core_kpi = {}
        return {"core": core_kpi}

    def start_collect(self):
        """Start the system agent and the AMQP KPI collection."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource, if one has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
338
339
340 class ClientResourceHelper(ResourceHelper):
341
342     RUN_DURATION = 60
343     QUEUE_WAIT_TIME = 5
344     SYNC_PORT = 1
345     ASYNC_PORT = 2
346
347     def __init__(self, setup_helper):
348         super(ClientResourceHelper, self).__init__(setup_helper)
349         self.vnfd_helper = setup_helper.vnfd_helper
350         self.scenario_helper = setup_helper.scenario_helper
351
352         self.client = None
353         self.client_started = Value('i', 0)
354         self.all_ports = None
355         self._queue = Queue()
356         self._result = {}
357         self._terminated = Value('i', 0)
358
359     def _build_ports(self):
360         self.networks = self.vnfd_helper.port_pairs.networks
361         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
362         self.downlink_ports = \
363             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
364         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
365
366     def port_num(self, intf):
367         # by default return port num
368         return self.vnfd_helper.port_num(intf)
369
370     def get_stats(self, *args, **kwargs):
371         try:
372             return self.client.get_stats(*args, **kwargs)
373         except STLError:
374             LOG.exception("TRex client not connected")
375             return {}
376
377     def generate_samples(self, ports, key=None, default=None):
378         # needs to be used ports
379         last_result = self.get_stats(ports)
380         key_value = last_result.get(key, default)
381
382         if not isinstance(last_result, Mapping):  # added for mock unit test
383             self._terminated.value = 1
384             return {}
385
386         samples = {}
387         # recalculate port for interface and see if it matches ports provided
388         for intf in self.vnfd_helper.interfaces:
389             name = intf["name"]
390             port = self.vnfd_helper.port_num(name)
391             if port in ports:
392                 xe_value = last_result.get(port, {})
393                 samples[name] = {
394                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
395                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
396                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
397                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
398                     "in_packets": int(xe_value.get("ipackets", 0)),
399                     "out_packets": int(xe_value.get("opackets", 0)),
400                 }
401                 if key:
402                     samples[name][key] = key_value
403         return samples
404
405     def _run_traffic_once(self, traffic_profile):
406         traffic_profile.execute_traffic(self)
407         self.client_started.value = 1
408         time.sleep(self.RUN_DURATION)
409         samples = self.generate_samples(traffic_profile.ports)
410         time.sleep(self.QUEUE_WAIT_TIME)
411         self._queue.put(samples)
412
413     def run_traffic(self, traffic_profile):
414         # if we don't do this we can hang waiting for the queue to drain
415         # have to do this in the subprocess
416         self._queue.cancel_join_thread()
417         # fixme: fix passing correct trex config file,
418         # instead of searching the default path
419         try:
420             self._build_ports()
421             self.client = self._connect()
422             self.client.reset(ports=self.all_ports)
423             self.client.remove_all_streams(self.all_ports)  # remove all streams
424             traffic_profile.register_generator(self)
425
426             while self._terminated.value == 0:
427                 self._run_traffic_once(traffic_profile)
428
429             self.client.stop(self.all_ports)
430             self.client.disconnect()
431             self._terminated.value = 0
432         except STLError:
433             if self._terminated.value:
434                 LOG.debug("traffic generator is stopped")
435                 return  # return if trex/tg server is stopped.
436             raise
437
438     def terminate(self):
439         self._terminated.value = 1  # stop client
440
441     def clear_stats(self, ports=None):
442         if ports is None:
443             ports = self.all_ports
444         self.client.clear_stats(ports=ports)
445
446     def start(self, ports=None, *args, **kwargs):
447         # pylint: disable=keyword-arg-before-vararg
448         # NOTE(ralonsoh): defining keyworded arguments before variable
449         # positional arguments is a bug. This function definition doesn't work
450         # in Python 2, although it works in Python 3. Reference:
451         # https://www.python.org/dev/peps/pep-3102/
452         if ports is None:
453             ports = self.all_ports
454         self.client.start(ports=ports, *args, **kwargs)
455
456     def collect_kpi(self):
457         if not self._queue.empty():
458             kpi = self._queue.get()
459             self._result.update(kpi)
460             LOG.debug('Got KPIs from _queue for %s %s',
461                       self.scenario_helper.name, self.RESOURCE_WORD)
462         return self._result
463
464     def _connect(self, client=None):
465         if client is None:
466             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
467                                server=self.vnfd_helper.mgmt_interface["ip"],
468                                verbose_level=LoggerApi.VERBOSE_QUIET)
469
470         # try to connect with 5s intervals, 30s max
471         for idx in range(6):
472             try:
473                 client.connect()
474                 break
475             except STLError:
476                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
477                 time.sleep(5)
478         return client
479
480
class Rfc2544ResourceHelper(object):
    """Lazy accessors for the scenario's 'rfc2544' options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Store the scenario helper; option values are resolved on demand."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The 'rfc2544' options dict (KeyError if the scenario lacks it)."""
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """Value of the 'correlated_traffic' option."""
        if self._correlated_traffic is None:
            self._correlated_traffic = \
                self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)

        return self._correlated_traffic

    @property
    def latency(self):
        """Value of the 'latency' option."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` from the rfc2544 options, or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' into the low and high tolerance bounds.

        The option is normally a string such as ``'0.0001 - 0.0001'``; a
        single value sets both bounds. A bare number (e.g. an unquoted YAML
        value) is accepted too instead of failing on ``.split``.
        """
        raw = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        if isinstance(raw, (int, float)):
            # Not a string: there is nothing to split, use the number as is
            bounds = [float(raw)]
        else:
            bounds = sorted(float(t.strip()) for t in raw.split('-'))
        self._tolerance_low = bounds[0]
        # With a single value the high bound falls back to the low one
        self._tolerance_high = bounds[1] if len(bounds) > 1 else bounds[0]
537
538
class SampleVNFDeployHelper(object):
    """Clone, upload and build the samplevnf repository when needed."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Store the VNFD helper and the ssh helper."""
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` unless ``which`` already finds it.

        :param app_name: name of the VNF binary
        """
        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        exit_status = ssh.execute("which %s" % vnf_bin)[0]
        if not exit_status:
            # binary already present, nothing to deploy
            return

        # Fresh local clone, then replace the copy on the target
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        http_proxy = os.environ.get('http_proxy', '')
        cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
        LOG.debug(cmd)
        ssh.execute(cmd)

        # Copy the built binary into the bin path
        vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name,
                                   "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
571
572
class ScenarioHelper(object):
    """Read-only view over the scenario configuration for one named node."""

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the node name; ``scenario_cfg`` is assigned later."""
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The 'task_path' entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry, or None when it is absent."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The scenario-wide 'options' dict, or {} when it is absent."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this node's name, or {}."""
        scenario_options = self.all_options
        return scenario_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """This node's 'vnf_config', falling back to DEFAULT_VNF_CFG."""
        node_options = self.options
        return node_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The 'topology' entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['topology']

    @property
    def timeout(self):
        """This node's 'timeout' option, or DEFAULT_VNF_TIMEOUT."""
        node_options = self.options
        return node_options.get('timeout', DEFAULT_VNF_TIMEOUT)
613
614
615 class SampleVNF(GenericVNF):
616     """ Class providing file-like API for generic VNF implementation """
617
618     VNF_PROMPT = "pipeline>"
619     WAIT_TIME = 1
620     WAIT_TIME_FOR_SCRIPT = 10
621     APP_NAME = "SampleVNF"
622     # we run the VNF interactively, so the ssh command will timeout after this long
623
624     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
625         super(SampleVNF, self).__init__(name, vnfd)
626         self.bin_path = get_nsb_option('bin_path', '')
627
628         self.scenario_helper = ScenarioHelper(self.name)
629         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
630
631         if setup_env_helper_type is None:
632             setup_env_helper_type = SetupEnvHelper
633
634         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
635                                                   self.ssh_helper,
636                                                   self.scenario_helper)
637
638         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
639
640         if resource_helper_type is None:
641             resource_helper_type = ResourceHelper
642
643         self.resource_helper = resource_helper_type(self.setup_helper)
644
645         self.context_cfg = None
646         self.nfvi_context = None
647         self.pipeline_kwargs = {}
648         self.uplink_ports = None
649         self.downlink_ports = None
650         # NOTE(esm): make QueueFileWrapper invert-able so that we
651         #            never have to manage the queues
652         self.q_in = Queue()
653         self.q_out = Queue()
654         self.queue_wrapper = None
655         self.run_kwargs = {}
656         self.used_drivers = {}
657         self.vnf_port_pairs = None
658         self._vnf_process = None
659
660     def _build_ports(self):
661         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
662         self.networks = self._port_pairs.networks
663         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
664         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
665         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
666
667     def _get_route_data(self, route_index, route_type):
668         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
669         for _ in range(route_index):
670             next(route_iter, '')
671         return next(route_iter, {}).get(route_type, '')
672
673     def _get_port0localip6(self):
674         return_value = self._get_route_data(0, 'network')
675         LOG.info("_get_port0localip6 : %s", return_value)
676         return return_value
677
678     def _get_port1localip6(self):
679         return_value = self._get_route_data(1, 'network')
680         LOG.info("_get_port1localip6 : %s", return_value)
681         return return_value
682
683     def _get_port0prefixlen6(self):
684         return_value = self._get_route_data(0, 'netmask')
685         LOG.info("_get_port0prefixlen6 : %s", return_value)
686         return return_value
687
688     def _get_port1prefixlen6(self):
689         return_value = self._get_route_data(1, 'netmask')
690         LOG.info("_get_port1prefixlen6 : %s", return_value)
691         return return_value
692
693     def _get_port0gateway6(self):
694         return_value = self._get_route_data(0, 'network')
695         LOG.info("_get_port0gateway6 : %s", return_value)
696         return return_value
697
698     def _get_port1gateway6(self):
699         return_value = self._get_route_data(1, 'network')
700         LOG.info("_get_port1gateway6 : %s", return_value)
701         return return_value
702
703     def _start_vnf(self):
704         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
705         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
706         self._vnf_process = Process(name=name, target=self._run)
707         self._vnf_process.start()
708
709     def _vnf_up_post(self):
710         pass
711
712     def instantiate(self, scenario_cfg, context_cfg):
713         self.scenario_helper.scenario_cfg = scenario_cfg
714         self.context_cfg = context_cfg
715         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
716         # self.nfvi_context = None
717
718         # vnf deploy is unsupported, use ansible playbooks
719         if self.scenario_helper.options.get("vnf_deploy", False):
720             self.deploy_helper.deploy_vnfs(self.APP_NAME)
721         self.resource_helper.setup()
722         self._start_vnf()
723
724     def wait_for_instantiate(self):
725         buf = []
726         time.sleep(self.WAIT_TIME)  # Give some time for config to load
727         while True:
728             if not self._vnf_process.is_alive():
729                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
730
731             # NOTE(esm): move to QueueFileWrapper
732             while self.q_out.qsize() > 0:
733                 buf.append(self.q_out.get())
734                 message = ''.join(buf)
735                 if self.VNF_PROMPT in message:
736                     LOG.info("%s VNF is up and running.", self.APP_NAME)
737                     self._vnf_up_post()
738                     self.queue_wrapper.clear()
739                     self.resource_helper.start_collect()
740                     return self._vnf_process.exitcode
741
742                 if "PANIC" in message:
743                     raise RuntimeError("Error starting %s VNF." %
744                                        self.APP_NAME)
745
746             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
747             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
748             # Send ENTER to display a new prompt in case the prompt text was corrupted
749             # by other VNF output
750             self.q_in.put('\r\n')
751
752     def _build_run_kwargs(self):
753         self.run_kwargs = {
754             'stdin': self.queue_wrapper,
755             'stdout': self.queue_wrapper,
756             'keep_stdin_open': True,
757             'pty': True,
758             'timeout': self.scenario_helper.timeout,
759         }
760
761     def _build_config(self):
762         return self.setup_helper.build_config()
763
764     def _run(self):
765         # we can't share ssh paramiko objects to force new connection
766         self.ssh_helper.drop_connection()
767         cmd = self._build_config()
768         # kill before starting
769         self.setup_helper.kill_vnf()
770
771         LOG.debug(cmd)
772         self._build_run_kwargs()
773         self.ssh_helper.run(cmd, **self.run_kwargs)
774
775     def vnf_execute(self, cmd, wait_time=2):
776         """ send cmd to vnf process """
777
778         LOG.info("%s command: %s", self.APP_NAME, cmd)
779         self.q_in.put("{}\r\n".format(cmd))
780         time.sleep(wait_time)
781         output = []
782         while self.q_out.qsize() > 0:
783             output.append(self.q_out.get())
784         return "".join(output)
785
786     def _tear_down(self):
787         pass
788
789     def terminate(self):
790         self.vnf_execute("quit")
791         self.setup_helper.kill_vnf()
792         self._tear_down()
793         self.resource_helper.stop_collect()
794         if self._vnf_process is not None:
795             # be proper and join first before we kill
796             LOG.debug("joining before terminate %s", self._vnf_process.name)
797             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
798             self._vnf_process.terminate()
799         # no terminate children here because we share processes with tg
800
801     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
802         """Method for checking the statistics
803
804         This method could be overridden in children classes.
805
806         :return: VNF statistics
807         """
808         cmd = 'p {0} stats'.format(self.APP_WORD)
809         out = self.vnf_execute(cmd)
810         return out
811
812     def collect_kpi(self):
813         # we can't get KPIs if the VNF is down
814         check_if_process_failed(self._vnf_process)
815         stats = self.get_stats()
816         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
817         if m:
818             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
819             result["collect_stats"] = self.resource_helper.collect_kpi()
820         else:
821             result = {
822                 "packets_in": 0,
823                 "packets_fwd": 0,
824                 "packets_dropped": 0,
825             }
826         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
827         return result
828
829     def scale(self, flavor=""):
830         """The SampleVNF base class doesn't provide the 'scale' feature"""
831         raise y_exceptions.FunctionNotImplemented(
832             function_name='scale', class_name='SampleVNFTrafficGen')
833
834
class SampleVNFTrafficGen(GenericTrafficGen):
    """Base class for the sample traffic generators.

    The traffic generator server and the traffic client each run in their
    own process; setup, traffic generation and KPI collection are delegated
    to the setup and resource helpers.
    """

    APP_NAME = 'Sample'
    # seconds between two checks of the client start flag in run_traffic
    RUN_WAIT = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the helpers used by the traffic generator.

        :param name: name of the traffic generator
        :param vnfd: VNF descriptor, passed on to the parent class
        :param setup_env_helper_type: setup helper class; ``SetupEnvHelper``
            when None
        :param resource_helper_type: resource helper class;
            ``ClientResourceHelper`` when None
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        setup_cls = SetupEnvHelper if setup_env_helper_type is None else setup_env_helper_type
        self.setup_helper = setup_cls(self.vnfd_helper, self.ssh_helper, self.scenario_helper)

        resource_cls = (ClientResourceHelper if resource_helper_type is None
                        else resource_helper_type)
        self.resource_helper = resource_cls(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        # processes are created later, by instantiate() and run_traffic()
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        """Target of the server process; only drops the SSH connection here."""
        # paramiko ssh objects can't be shared, so force a new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """Set up the resources and start the server process.

        :param scenario_cfg: scenario configuration, stored in the scenario
            helper
        :param context_cfg: not used by this implementation
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.resource_helper.setup()
        # the port number is needed, so the config must be generated after
        # the DPDK bind
        self.resource_helper.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        process_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._tg_process = Process(name=process_name, target=self._start_server)
        self._tg_process.start()

    def _check_status(self):
        """Report the server status; 0 is read as "up" by _wait_for_process.

        Not implemented in this class.
        """
        raise NotImplementedError

    def _wait_for_process(self):
        """Poll once per second until ``_check_status`` returns 0.

        :return: the ``exitcode`` attribute of the server process
        :raises RuntimeError: if the server process is no longer alive
        """
        while self._tg_process.is_alive():
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            if self._check_status() == 0:
                LOG.info("%s TG Server is up and running.", self.APP_NAME)
                return self._tg_process.exitcode

        raise RuntimeError("%s traffic generator process died." % self.APP_NAME)

    def _traffic_runner(self, traffic_profile):
        """Target of the traffic process: run the given traffic profile."""
        # a new process must drop the connections before anything else,
        # otherwise paramiko raises errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self.resource_helper.run_traffic(traffic_profile)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.

        The traffic runs in a separate process. This method returns once the
        resource helper reports the client as started, or once the traffic
        process is found not alive. Mandatory.

        :param traffic_profile: profile handed to the traffic process
        :return: True if the traffic process is alive, False otherwise
        """
        process_name = "{}-{}-{}-{}".format(
            self.name, self.APP_NAME, traffic_profile.__class__.__name__, os.getpid())
        worker = Process(name=process_name, target=self._traffic_runner,
                         args=(traffic_profile,))
        self._traffic_process = worker
        worker.start()
        # Wait for the traffic process to start
        while self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not worker.is_alive():
                break

        return worker.is_alive()

    def collect_kpi(self):
        """Return the KPIs gathered by the resource helper.

        Both the server and the traffic process are first passed to
        ``check_if_process_failed``.
        """
        # check if the tg processes have exited
        server, client = self._tg_process, self._traffic_process
        check_if_process_failed(server)
        check_if_process_failed(client)
        kpis = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, kpis)
        return kpis

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Each existing process is joined for up to ``PROCESS_JOIN_TIMEOUT``
        and then terminated.
        """
        self.traffic_finished = True
        # the client must go before the server, or the client will raise
        # an exception
        for process in (self._traffic_process, self._tg_process):
            if process is None:
                continue
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", process.name)
            process.join(PROCESS_JOIN_TIMEOUT)
            process.terminate()
        # no terminate children here because we share processes with vnf

    def scale(self, flavor=""):
        """A traffic generator VNF doesn't provide the 'scale' feature"""
        raise y_exceptions.FunctionNotImplemented(
            function_name='scale', class_name='SampleVNFTrafficGen')