Merge "NFVi Test stops after one hour even if configured context duration is longer"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19
20 import os
21 import posixpath
22 import re
23 import subprocess
24 import time
25
26 import six
27
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.common import exceptions as y_exceptions
33 from yardstick.common.process import check_if_process_failed
34 from yardstick.common import utils
35 from yardstick.network_services.constants import DEFAULT_VNF_TIMEOUT
36 from yardstick.network_services.constants import PROCESS_JOIN_TIMEOUT
37 from yardstick.network_services.constants import REMOTE_TMP
38 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
39 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
40 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
41 from yardstick.network_services.nfvi.resource import ResourceProfile
42 from yardstick.network_services.utils import get_nsb_option
43 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
44 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
45 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
46 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
47
48
49 LOG = logging.getLogger(__name__)
50
51
52 class SetupEnvHelper(object):
53
54     CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
55     CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
56     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
57     PIPELINE_COMMAND = ''
58     VNF_TYPE = "SAMPLE"
59
60     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
61         super(SetupEnvHelper, self).__init__()
62         self.vnfd_helper = vnfd_helper
63         self.ssh_helper = ssh_helper
64         self.scenario_helper = scenario_helper
65
66     def build_config(self):
67         raise NotImplementedError
68
69     def setup_vnf_environment(self):
70         pass
71
72     def kill_vnf(self):
73         pass
74
75     def tear_down(self):
76         raise NotImplementedError
77
78
79 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
80
81     APP_NAME = 'DpdkVnf'
82     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
83     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
84     HUGEPAGES_KB = 1024 * 1024 * 16
85
86     @staticmethod
87     def _update_packet_type(ip_pipeline_cfg, traffic_options):
88         match_str = 'pkt_type = ipv4'
89         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
90         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
91         return pipeline_config_str
92
93     @classmethod
94     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
95         traffic_type = traffic_options['traffic_type']
96
97         if traffic_options['vnf_type'] is not cls.APP_NAME:
98             match_str = 'traffic_type = 4'
99             replace_str = 'traffic_type = {0}'.format(traffic_type)
100
101         elif traffic_type == 4:
102             match_str = 'pkt_type = ipv4'
103             replace_str = 'pkt_type = ipv4'
104
105         else:
106             match_str = 'pkt_type = ipv4'
107             replace_str = 'pkt_type = ipv6'
108
109         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
110         return pipeline_config_str
111
112     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
113         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
114         self.all_ports = None
115         self.bound_pci = None
116         self.socket = None
117         self.used_drivers = None
118         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
119
120     def _setup_hugepages(self):
121         meminfo = utils.read_meminfo(self.ssh_helper)
122         hp_size_kb = int(meminfo['Hugepagesize'])
123         nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
124         self.ssh_helper.execute('echo %s | sudo tee %s' %
125                                 (nr_hugepages, self.NR_HUGEPAGES_PATH))
126         hp = six.BytesIO()
127         self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
128         nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
129         LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
130                  hp_size_kb, nr_hugepages, nr_hugepages_set)
131
132     def build_config(self):
133         vnf_cfg = self.scenario_helper.vnf_cfg
134         task_path = self.scenario_helper.task_path
135
136         config_file = vnf_cfg.get('file')
137         lb_count = vnf_cfg.get('lb_count', 3)
138         lb_config = vnf_cfg.get('lb_config', 'SW')
139         worker_config = vnf_cfg.get('worker_config', '1C/1T')
140         worker_threads = vnf_cfg.get('worker_threads', 3)
141
142         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
143         traffic_options = {
144             'traffic_type': traffic_type,
145             'pkt_type': 'ipv%s' % traffic_type,
146             'vnf_type': self.VNF_TYPE,
147         }
148
149         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
150                                                   task_path)
151         config_basename = posixpath.basename(self.CFG_CONFIG)
152         script_basename = posixpath.basename(self.CFG_SCRIPT)
153         multiport = MultiPortConfig(self.scenario_helper.topology,
154                                     config_tpl_cfg,
155                                     config_basename,
156                                     self.vnfd_helper,
157                                     self.VNF_TYPE,
158                                     lb_count,
159                                     worker_threads,
160                                     worker_config,
161                                     lb_config,
162                                     self.socket)
163
164         multiport.generate_config()
165         if config_file:
166             with utils.open_relative_file(config_file, task_path) as infile:
167                 new_config = ['[EAL]']
168                 vpci = []
169                 for port in self.vnfd_helper.port_pairs.all_ports:
170                     interface = self.vnfd_helper.find_interface(name=port)
171                     vpci.append(interface['virtual-interface']["vpci"])
172                 new_config.extend('w = {0}'.format(item) for item in vpci)
173                 new_config = '\n'.join(new_config) + '\n' + infile.read()
174         else:
175             with open(self.CFG_CONFIG) as handle:
176                 new_config = handle.read()
177             new_config = self._update_traffic_type(new_config, traffic_options)
178             new_config = self._update_packet_type(new_config, traffic_options)
179         self.ssh_helper.upload_config_file(config_basename, new_config)
180         self.ssh_helper.upload_config_file(script_basename,
181                                            multiport.generate_script(self.vnfd_helper))
182
183         LOG.info("Provision and start the %s", self.APP_NAME)
184         self._build_pipeline_kwargs()
185         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
186
187     def _build_pipeline_kwargs(self):
188         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
189         # count the number of actual ports in the list of pairs
190         # remove duplicate ports
191         # this is really a mapping from LINK ID to DPDK PMD ID
192         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
193         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
194         ports = self.vnfd_helper.port_pairs.all_ports
195         port_nums = self.vnfd_helper.port_nums(ports)
196         # create mask from all the dpdk port numbers
197         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
198         self.pipeline_kwargs = {
199             'cfg_file': self.CFG_CONFIG,
200             'script': self.CFG_SCRIPT,
201             'port_mask_hex': ports_mask_hex,
202             'tool_path': tool_path,
203         }
204
205     def setup_vnf_environment(self):
206         self._setup_dpdk()
207         self.kill_vnf()
208         # bind before _setup_resources so we can use dpdk_port_num
209         self._detect_and_bind_drivers()
210         resource = self._setup_resources()
211         return resource
212
213     def kill_vnf(self):
214         # pkill is not matching, debug with pgrep
215         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
216         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
217         # have to use exact match
218         # try using killall to match
219         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
220
221     def _setup_dpdk(self):
222         """Setup DPDK environment needed for VNF to run"""
223         self._setup_hugepages()
224         self.dpdk_bind_helper.load_dpdk_driver()
225
226         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
227         if exit_status == 0:
228             return
229
230     def get_collectd_options(self):
231         options = self.scenario_helper.all_options.get("collectd", {})
232         # override with specific node settings
233         options.update(self.scenario_helper.options.get("collectd", {}))
234         return options
235
236     def _setup_resources(self):
237         # what is this magic?  how do we know which socket is for which port?
238         # what about quad-socket?
239         if any(v[5] == "0" for v in self.bound_pci):
240             self.socket = 0
241         else:
242             self.socket = 1
243
244         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
245         # this won't work because we don't have DPDK port numbers yet
246         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
247         port_names = (intf["name"] for intf in ports)
248         collectd_options = self.get_collectd_options()
249         plugins = collectd_options.get("plugins", {})
250         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
251         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
252                                plugins=plugins, interval=collectd_options.get("interval"),
253                                timeout=self.scenario_helper.timeout)
254
255     def _check_interface_fields(self):
256         num_nodes = len(self.scenario_helper.nodes)
257         # OpenStack instance creation time is probably proportional to the number
258         # of instances
259         timeout = 120 * num_nodes
260         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
261                              self.ssh_helper, timeout)
262         dpdk_node.check()
263
264     def _detect_and_bind_drivers(self):
265         interfaces = self.vnfd_helper.interfaces
266
267         self._check_interface_fields()
268         # check for bound after probe
269         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
270
271         self.dpdk_bind_helper.read_status()
272         self.dpdk_bind_helper.save_used_drivers()
273
274         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
275
276         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
277         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
278             try:
279                 intf = next(v for v in interfaces
280                             if vpci == v['virtual-interface']['vpci'])
281                 # force to int
282                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
283             except:  # pylint: disable=bare-except
284                 pass
285         time.sleep(2)
286
287     def get_local_iface_name_by_vpci(self, vpci):
288         find_net_cmd = self.FIND_NET_CMD.format(vpci)
289         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
290         if exit_status == 0:
291             return stdout
292         return None
293
294     def tear_down(self):
295         self.dpdk_bind_helper.rebind_drivers()
296
297
298 class ResourceHelper(object):
299
300     COLLECT_KPI = ''
301     MAKE_INSTALL = 'cd {0} && make && sudo make install'
302     RESOURCE_WORD = 'sample'
303
304     COLLECT_MAP = {}
305
306     def __init__(self, setup_helper):
307         super(ResourceHelper, self).__init__()
308         self.resource = None
309         self.setup_helper = setup_helper
310         self.ssh_helper = setup_helper.ssh_helper
311
312     def setup(self):
313         self.resource = self.setup_helper.setup_vnf_environment()
314
315     def generate_cfg(self):
316         pass
317
318     def _collect_resource_kpi(self):
319         result = {}
320         status = self.resource.check_if_system_agent_running("collectd")[0]
321         if status == 0:
322             result = self.resource.amqp_collect_nfvi_kpi()
323
324         result = {"core": result}
325         return result
326
327     def start_collect(self):
328         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
329         self.resource.start()
330         self.resource.amqp_process_for_nfvi_kpi()
331
332     def stop_collect(self):
333         if self.resource:
334             self.resource.stop()
335
336     def collect_kpi(self):
337         return self._collect_resource_kpi()
338
339
340 class ClientResourceHelper(ResourceHelper):
341
342     RUN_DURATION = 60
343     QUEUE_WAIT_TIME = 5
344     SYNC_PORT = 1
345     ASYNC_PORT = 2
346
347     def __init__(self, setup_helper):
348         super(ClientResourceHelper, self).__init__(setup_helper)
349         self.vnfd_helper = setup_helper.vnfd_helper
350         self.scenario_helper = setup_helper.scenario_helper
351
352         self.client = None
353         self.client_started = Value('i', 0)
354         self.all_ports = None
355         self._queue = Queue()
356         self._result = {}
357         self._terminated = Value('i', 0)
358
359     def _build_ports(self):
360         self.networks = self.vnfd_helper.port_pairs.networks
361         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
362         self.downlink_ports = \
363             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
364         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
365
366     def port_num(self, intf):
367         # by default return port num
368         return self.vnfd_helper.port_num(intf)
369
370     def get_stats(self, *args, **kwargs):
371         try:
372             return self.client.get_stats(*args, **kwargs)
373         except STLError:
374             LOG.exception("TRex client not connected")
375             return {}
376
377     def generate_samples(self, ports, key=None, default=None):
378         # needs to be used ports
379         last_result = self.get_stats(ports)
380         key_value = last_result.get(key, default)
381
382         if not isinstance(last_result, Mapping):  # added for mock unit test
383             self._terminated.value = 1
384             return {}
385
386         samples = {}
387         # recalculate port for interface and see if it matches ports provided
388         for intf in self.vnfd_helper.interfaces:
389             name = intf["name"]
390             port = self.vnfd_helper.port_num(name)
391             if port in ports:
392                 xe_value = last_result.get(port, {})
393                 samples[name] = {
394                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
395                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
396                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
397                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
398                     "in_packets": int(xe_value.get("ipackets", 0)),
399                     "out_packets": int(xe_value.get("opackets", 0)),
400                 }
401                 if key:
402                     samples[name][key] = key_value
403         return samples
404
405     def _run_traffic_once(self, traffic_profile):
406         traffic_profile.execute_traffic(self)
407         self.client_started.value = 1
408         time.sleep(self.RUN_DURATION)
409         samples = self.generate_samples(traffic_profile.ports)
410         time.sleep(self.QUEUE_WAIT_TIME)
411         self._queue.put(samples)
412
413     def run_traffic(self, traffic_profile):
414         # if we don't do this we can hang waiting for the queue to drain
415         # have to do this in the subprocess
416         self._queue.cancel_join_thread()
417         # fixme: fix passing correct trex config file,
418         # instead of searching the default path
419         try:
420             self._build_ports()
421             self.client = self._connect()
422             self.client.reset(ports=self.all_ports)
423             self.client.remove_all_streams(self.all_ports)  # remove all streams
424             traffic_profile.register_generator(self)
425
426             while self._terminated.value == 0:
427                 self._run_traffic_once(traffic_profile)
428
429             self.client.stop(self.all_ports)
430             self.client.disconnect()
431             self._terminated.value = 0
432         except STLError:
433             if self._terminated.value:
434                 LOG.debug("traffic generator is stopped")
435                 return  # return if trex/tg server is stopped.
436             raise
437
438     def terminate(self):
439         self._terminated.value = 1  # stop client
440
441     def clear_stats(self, ports=None):
442         if ports is None:
443             ports = self.all_ports
444         self.client.clear_stats(ports=ports)
445
446     def start(self, ports=None, *args, **kwargs):
447         # pylint: disable=keyword-arg-before-vararg
448         # NOTE(ralonsoh): defining keyworded arguments before variable
449         # positional arguments is a bug. This function definition doesn't work
450         # in Python 2, although it works in Python 3. Reference:
451         # https://www.python.org/dev/peps/pep-3102/
452         if ports is None:
453             ports = self.all_ports
454         self.client.start(ports=ports, *args, **kwargs)
455
456     def collect_kpi(self):
457         if not self._queue.empty():
458             kpi = self._queue.get()
459             self._result.update(kpi)
460             LOG.debug('Got KPIs from _queue for %s %s',
461                       self.scenario_helper.name, self.RESOURCE_WORD)
462         return self._result
463
464     def _connect(self, client=None):
465         if client is None:
466             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
467                                server=self.vnfd_helper.mgmt_interface["ip"],
468                                verbose_level=LoggerApi.VERBOSE_QUIET)
469
470         # try to connect with 5s intervals, 30s max
471         for idx in range(6):
472             try:
473                 client.connect()
474                 break
475             except STLError:
476                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
477                 time.sleep(5)
478         return client
479
480
481 class Rfc2544ResourceHelper(object):
482
483     DEFAULT_CORRELATED_TRAFFIC = False
484     DEFAULT_LATENCY = False
485     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
486
487     def __init__(self, scenario_helper):
488         super(Rfc2544ResourceHelper, self).__init__()
489         self.scenario_helper = scenario_helper
490         self._correlated_traffic = None
491         self.iteration = Value('i', 0)
492         self._latency = None
493         self._rfc2544 = None
494         self._tolerance_low = None
495         self._tolerance_high = None
496
497     @property
498     def rfc2544(self):
499         if self._rfc2544 is None:
500             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
501         return self._rfc2544
502
503     @property
504     def tolerance_low(self):
505         if self._tolerance_low is None:
506             self.get_rfc_tolerance()
507         return self._tolerance_low
508
509     @property
510     def tolerance_high(self):
511         if self._tolerance_high is None:
512             self.get_rfc_tolerance()
513         return self._tolerance_high
514
515     @property
516     def correlated_traffic(self):
517         if self._correlated_traffic is None:
518             self._correlated_traffic = \
519                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
520
521         return self._correlated_traffic
522
523     @property
524     def latency(self):
525         if self._latency is None:
526             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
527         return self._latency
528
529     def get_rfc2544(self, name, default=None):
530         return self.rfc2544.get(name, default)
531
532     def get_rfc_tolerance(self):
533         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
534         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
535         self._tolerance_low = next(tolerance_iter)
536         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
537
538
539 class SampleVNFDeployHelper(object):
540
541     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
542     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
543     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
544
545     def __init__(self, vnfd_helper, ssh_helper):
546         super(SampleVNFDeployHelper, self).__init__()
547         self.ssh_helper = ssh_helper
548         self.vnfd_helper = vnfd_helper
549
550     def deploy_vnfs(self, app_name):
551         vnf_bin = self.ssh_helper.join_bin_path(app_name)
552         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
553         if not exit_status:
554             return
555
556         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
557         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
558         time.sleep(2)
559         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
560         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
561
562         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
563         time.sleep(2)
564         http_proxy = os.environ.get('http_proxy', '')
565         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
566         LOG.debug(cmd)
567         self.ssh_helper.execute(cmd)
568         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
569         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
570         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
571
572
573 class ScenarioHelper(object):
574
575     DEFAULT_VNF_CFG = {
576         'lb_config': 'SW',
577         'lb_count': 1,
578         'worker_config': '1C/1T',
579         'worker_threads': 1,
580     }
581
582     def __init__(self, name):
583         self.name = name
584         self.scenario_cfg = None
585
586     @property
587     def task_path(self):
588         return self.scenario_cfg['task_path']
589
590     @property
591     def nodes(self):
592         return self.scenario_cfg.get('nodes')
593
594     @property
595     def all_options(self):
596         return self.scenario_cfg.get('options', {})
597
598     @property
599     def options(self):
600         return self.all_options.get(self.name, {})
601
602     @property
603     def vnf_cfg(self):
604         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
605
606     @property
607     def topology(self):
608         return self.scenario_cfg['topology']
609
610     @property
611     def timeout(self):
612         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
613             self.options.get('timeout', DEFAULT_VNF_TIMEOUT))
614         test_timeout = self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
615         return test_duration if test_duration > test_timeout else test_timeout
616
617 class SampleVNF(GenericVNF):
618     """ Class providing file-like API for generic VNF implementation """
619
620     VNF_PROMPT = "pipeline>"
621     WAIT_TIME = 1
622     WAIT_TIME_FOR_SCRIPT = 10
623     APP_NAME = "SampleVNF"
624     # we run the VNF interactively, so the ssh command will timeout after this long
625
626     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
627         super(SampleVNF, self).__init__(name, vnfd)
628         self.bin_path = get_nsb_option('bin_path', '')
629
630         self.scenario_helper = ScenarioHelper(self.name)
631         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
632
633         if setup_env_helper_type is None:
634             setup_env_helper_type = SetupEnvHelper
635
636         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
637                                                   self.ssh_helper,
638                                                   self.scenario_helper)
639
640         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
641
642         if resource_helper_type is None:
643             resource_helper_type = ResourceHelper
644
645         self.resource_helper = resource_helper_type(self.setup_helper)
646
647         self.context_cfg = None
648         self.nfvi_context = None
649         self.pipeline_kwargs = {}
650         self.uplink_ports = None
651         self.downlink_ports = None
652         # NOTE(esm): make QueueFileWrapper invert-able so that we
653         #            never have to manage the queues
654         self.q_in = Queue()
655         self.q_out = Queue()
656         self.queue_wrapper = None
657         self.run_kwargs = {}
658         self.used_drivers = {}
659         self.vnf_port_pairs = None
660         self._vnf_process = None
661
662     def _build_ports(self):
663         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
664         self.networks = self._port_pairs.networks
665         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
666         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
667         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
668
669     def _get_route_data(self, route_index, route_type):
670         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
671         for _ in range(route_index):
672             next(route_iter, '')
673         return next(route_iter, {}).get(route_type, '')
674
675     def _get_port0localip6(self):
676         return_value = self._get_route_data(0, 'network')
677         LOG.info("_get_port0localip6 : %s", return_value)
678         return return_value
679
680     def _get_port1localip6(self):
681         return_value = self._get_route_data(1, 'network')
682         LOG.info("_get_port1localip6 : %s", return_value)
683         return return_value
684
685     def _get_port0prefixlen6(self):
686         return_value = self._get_route_data(0, 'netmask')
687         LOG.info("_get_port0prefixlen6 : %s", return_value)
688         return return_value
689
690     def _get_port1prefixlen6(self):
691         return_value = self._get_route_data(1, 'netmask')
692         LOG.info("_get_port1prefixlen6 : %s", return_value)
693         return return_value
694
695     def _get_port0gateway6(self):
696         return_value = self._get_route_data(0, 'network')
697         LOG.info("_get_port0gateway6 : %s", return_value)
698         return return_value
699
700     def _get_port1gateway6(self):
701         return_value = self._get_route_data(1, 'network')
702         LOG.info("_get_port1gateway6 : %s", return_value)
703         return return_value
704
705     def _start_vnf(self):
706         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
707         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
708         self._vnf_process = Process(name=name, target=self._run)
709         self._vnf_process.start()
710
711     def _vnf_up_post(self):
712         pass
713
714     def instantiate(self, scenario_cfg, context_cfg):
715         self.scenario_helper.scenario_cfg = scenario_cfg
716         self.context_cfg = context_cfg
717         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
718         # self.nfvi_context = None
719
720         # vnf deploy is unsupported, use ansible playbooks
721         if self.scenario_helper.options.get("vnf_deploy", False):
722             self.deploy_helper.deploy_vnfs(self.APP_NAME)
723         self.resource_helper.setup()
724         self._start_vnf()
725
726     def wait_for_instantiate(self):
727         buf = []
728         time.sleep(self.WAIT_TIME)  # Give some time for config to load
729         while True:
730             if not self._vnf_process.is_alive():
731                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
732
733             # NOTE(esm): move to QueueFileWrapper
734             while self.q_out.qsize() > 0:
735                 buf.append(self.q_out.get())
736                 message = ''.join(buf)
737                 if self.VNF_PROMPT in message:
738                     LOG.info("%s VNF is up and running.", self.APP_NAME)
739                     self._vnf_up_post()
740                     self.queue_wrapper.clear()
741                     self.resource_helper.start_collect()
742                     return self._vnf_process.exitcode
743
744                 if "PANIC" in message:
745                     raise RuntimeError("Error starting %s VNF." %
746                                        self.APP_NAME)
747
748             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
749             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
750             # Send ENTER to display a new prompt in case the prompt text was corrupted
751             # by other VNF output
752             self.q_in.put('\r\n')
753
754     def _build_run_kwargs(self):
755         self.run_kwargs = {
756             'stdin': self.queue_wrapper,
757             'stdout': self.queue_wrapper,
758             'keep_stdin_open': True,
759             'pty': True,
760             'timeout': self.scenario_helper.timeout,
761         }
762
763     def _build_config(self):
764         return self.setup_helper.build_config()
765
766     def _run(self):
767         # we can't share ssh paramiko objects to force new connection
768         self.ssh_helper.drop_connection()
769         cmd = self._build_config()
770         # kill before starting
771         self.setup_helper.kill_vnf()
772
773         LOG.debug(cmd)
774         self._build_run_kwargs()
775         self.ssh_helper.run(cmd, **self.run_kwargs)
776
777     def vnf_execute(self, cmd, wait_time=2):
778         """ send cmd to vnf process """
779
780         LOG.info("%s command: %s", self.APP_NAME, cmd)
781         self.q_in.put("{}\r\n".format(cmd))
782         time.sleep(wait_time)
783         output = []
784         while self.q_out.qsize() > 0:
785             output.append(self.q_out.get())
786         return "".join(output)
787
788     def _tear_down(self):
789         pass
790
791     def terminate(self):
792         self.vnf_execute("quit")
793         self.setup_helper.kill_vnf()
794         self._tear_down()
795         self.resource_helper.stop_collect()
796         if self._vnf_process is not None:
797             # be proper and join first before we kill
798             LOG.debug("joining before terminate %s", self._vnf_process.name)
799             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
800             self._vnf_process.terminate()
801         # no terminate children here because we share processes with tg
802
803     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
804         """Method for checking the statistics
805
806         This method could be overridden in children classes.
807
808         :return: VNF statistics
809         """
810         cmd = 'p {0} stats'.format(self.APP_WORD)
811         out = self.vnf_execute(cmd)
812         return out
813
814     def collect_kpi(self):
815         # we can't get KPIs if the VNF is down
816         check_if_process_failed(self._vnf_process)
817         stats = self.get_stats()
818         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
819         if m:
820             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
821             result["collect_stats"] = self.resource_helper.collect_kpi()
822         else:
823             result = {
824                 "packets_in": 0,
825                 "packets_fwd": 0,
826                 "packets_dropped": 0,
827             }
828         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
829         return result
830
831     def scale(self, flavor=""):
832         """The SampleVNF base class doesn't provide the 'scale' feature"""
833         raise y_exceptions.FunctionNotImplemented(
834             function_name='scale', class_name='SampleVNFTrafficGen')
835
836
837 class SampleVNFTrafficGen(GenericTrafficGen):
838     """ Class providing file-like API for generic traffic generator """
839
840     APP_NAME = 'Sample'
841     RUN_WAIT = 1
842
843     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
844         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
845         self.bin_path = get_nsb_option('bin_path', '')
846
847         self.scenario_helper = ScenarioHelper(self.name)
848         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
849
850         if setup_env_helper_type is None:
851             setup_env_helper_type = SetupEnvHelper
852
853         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
854                                                   self.ssh_helper,
855                                                   self.scenario_helper)
856
857         if resource_helper_type is None:
858             resource_helper_type = ClientResourceHelper
859
860         self.resource_helper = resource_helper_type(self.setup_helper)
861
862         self.runs_traffic = True
863         self.traffic_finished = False
864         self._tg_process = None
865         self._traffic_process = None
866
867     def _start_server(self):
868         # we can't share ssh paramiko objects to force new connection
869         self.ssh_helper.drop_connection()
870
871     def instantiate(self, scenario_cfg, context_cfg):
872         self.scenario_helper.scenario_cfg = scenario_cfg
873         self.resource_helper.setup()
874         # must generate_cfg after DPDK bind because we need port number
875         self.resource_helper.generate_cfg()
876
877         LOG.info("Starting %s server...", self.APP_NAME)
878         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
879         self._tg_process = Process(name=name, target=self._start_server)
880         self._tg_process.start()
881
882     def _check_status(self):
883         raise NotImplementedError
884
885     def _wait_for_process(self):
886         while True:
887             if not self._tg_process.is_alive():
888                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
889             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
890             time.sleep(1)
891             status = self._check_status()
892             if status == 0:
893                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
894                 return self._tg_process.exitcode
895
896     def _traffic_runner(self, traffic_profile):
897         # always drop connections first thing in new processes
898         # so we don't get paramiko errors
899         self.ssh_helper.drop_connection()
900         LOG.info("Starting %s client...", self.APP_NAME)
901         self.resource_helper.run_traffic(traffic_profile)
902
903     def run_traffic(self, traffic_profile):
904         """ Generate traffic on the wire according to the given params.
905         Method is non-blocking, returns immediately when traffic process
906         is running. Mandatory.
907
908         :param traffic_profile:
909         :return: True/False
910         """
911         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
912                                     os.getpid())
913         self._traffic_process = Process(name=name, target=self._traffic_runner,
914                                         args=(traffic_profile,))
915         self._traffic_process.start()
916         # Wait for traffic process to start
917         while self.resource_helper.client_started.value == 0:
918             time.sleep(self.RUN_WAIT)
919             # what if traffic process takes a few seconds to start?
920             if not self._traffic_process.is_alive():
921                 break
922
923         return self._traffic_process.is_alive()
924
925     def collect_kpi(self):
926         # check if the tg processes have exited
927         for proc in (self._tg_process, self._traffic_process):
928             check_if_process_failed(proc)
929         result = self.resource_helper.collect_kpi()
930         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
931         return result
932
933     def terminate(self):
934         """ After this method finishes, all traffic processes should stop. Mandatory.
935
936         :return: True/False
937         """
938         self.traffic_finished = True
939         # we must kill client before we kill the server, or the client will raise exception
940         if self._traffic_process is not None:
941             # be proper and try to join before terminating
942             LOG.debug("joining before terminate %s", self._traffic_process.name)
943             self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
944             self._traffic_process.terminate()
945         if self._tg_process is not None:
946             # be proper and try to join before terminating
947             LOG.debug("joining before terminate %s", self._tg_process.name)
948             self._tg_process.join(PROCESS_JOIN_TIMEOUT)
949             self._tg_process.terminate()
950         # no terminate children here because we share processes with vnf
951
952     def scale(self, flavor=""):
953         """A traffic generator VFN doesn't provide the 'scale' feature"""
954         raise y_exceptions.FunctionNotImplemented(
955             function_name='scale', class_name='SampleVNFTrafficGen')