Merge "Add pod.yaml files for Apex"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19
20 import os
21 import posixpath
22 import re
23 import subprocess
24 import time
25
26 import six
27
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.common import exceptions as y_exceptions
33 from yardstick.common.process import check_if_process_failed
34 from yardstick.common import utils
35 from yardstick.network_services import constants
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
37 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
38 from yardstick.network_services.nfvi.resource import ResourceProfile
39 from yardstick.network_services.utils import get_nsb_option
40 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
41 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
42 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
43 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
44
45
46 LOG = logging.getLogger(__name__)
47
48
49 class SetupEnvHelper(object):
50
51     CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
52     CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
53     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
54     PIPELINE_COMMAND = ''
55     VNF_TYPE = "SAMPLE"
56
57     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
58         super(SetupEnvHelper, self).__init__()
59         self.vnfd_helper = vnfd_helper
60         self.ssh_helper = ssh_helper
61         self.scenario_helper = scenario_helper
62
63     def build_config(self):
64         raise NotImplementedError
65
66     def setup_vnf_environment(self):
67         pass
68
69     def kill_vnf(self):
70         pass
71
72     def tear_down(self):
73         raise NotImplementedError
74
75
76 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
77
78     APP_NAME = 'DpdkVnf'
79     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
80     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
81
82     @staticmethod
83     def _update_packet_type(ip_pipeline_cfg, traffic_options):
84         match_str = 'pkt_type = ipv4'
85         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
86         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
87         return pipeline_config_str
88
89     @classmethod
90     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
91         traffic_type = traffic_options['traffic_type']
92
93         if traffic_options['vnf_type'] is not cls.APP_NAME:
94             match_str = 'traffic_type = 4'
95             replace_str = 'traffic_type = {0}'.format(traffic_type)
96
97         elif traffic_type == 4:
98             match_str = 'pkt_type = ipv4'
99             replace_str = 'pkt_type = ipv4'
100
101         else:
102             match_str = 'pkt_type = ipv4'
103             replace_str = 'pkt_type = ipv6'
104
105         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
106         return pipeline_config_str
107
108     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
109         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
110         self.all_ports = None
111         self.bound_pci = None
112         self.socket = None
113         self.used_drivers = None
114         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
115
116     def _setup_hugepages(self):
117         meminfo = utils.read_meminfo(self.ssh_helper)
118         hp_size_kb = int(meminfo['Hugepagesize'])
119         hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
120         nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
121         self.ssh_helper.execute('echo %s | sudo tee %s' %
122                                 (nr_hugepages, self.NR_HUGEPAGES_PATH))
123         hp = six.BytesIO()
124         self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
125         nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
126         LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
127                  hp_size_kb, nr_hugepages, nr_hugepages_set)
128
129     def build_config(self):
130         vnf_cfg = self.scenario_helper.vnf_cfg
131         task_path = self.scenario_helper.task_path
132
133         config_file = vnf_cfg.get('file')
134         lb_count = vnf_cfg.get('lb_count', 3)
135         lb_config = vnf_cfg.get('lb_config', 'SW')
136         worker_config = vnf_cfg.get('worker_config', '1C/1T')
137         worker_threads = vnf_cfg.get('worker_threads', 3)
138
139         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
140         traffic_options = {
141             'traffic_type': traffic_type,
142             'pkt_type': 'ipv%s' % traffic_type,
143             'vnf_type': self.VNF_TYPE,
144         }
145
146         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
147                                                   task_path)
148         config_basename = posixpath.basename(self.CFG_CONFIG)
149         script_basename = posixpath.basename(self.CFG_SCRIPT)
150         multiport = MultiPortConfig(self.scenario_helper.topology,
151                                     config_tpl_cfg,
152                                     config_basename,
153                                     self.vnfd_helper,
154                                     self.VNF_TYPE,
155                                     lb_count,
156                                     worker_threads,
157                                     worker_config,
158                                     lb_config,
159                                     self.socket)
160
161         multiport.generate_config()
162         if config_file:
163             with utils.open_relative_file(config_file, task_path) as infile:
164                 new_config = ['[EAL]']
165                 vpci = []
166                 for port in self.vnfd_helper.port_pairs.all_ports:
167                     interface = self.vnfd_helper.find_interface(name=port)
168                     vpci.append(interface['virtual-interface']["vpci"])
169                 new_config.extend('w = {0}'.format(item) for item in vpci)
170                 new_config = '\n'.join(new_config) + '\n' + infile.read()
171         else:
172             with open(self.CFG_CONFIG) as handle:
173                 new_config = handle.read()
174             new_config = self._update_traffic_type(new_config, traffic_options)
175             new_config = self._update_packet_type(new_config, traffic_options)
176         self.ssh_helper.upload_config_file(config_basename, new_config)
177         self.ssh_helper.upload_config_file(script_basename,
178                                            multiport.generate_script(self.vnfd_helper))
179
180         LOG.info("Provision and start the %s", self.APP_NAME)
181         self._build_pipeline_kwargs()
182         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
183
184     def _build_pipeline_kwargs(self):
185         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
186         # count the number of actual ports in the list of pairs
187         # remove duplicate ports
188         # this is really a mapping from LINK ID to DPDK PMD ID
189         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
190         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
191         ports = self.vnfd_helper.port_pairs.all_ports
192         port_nums = self.vnfd_helper.port_nums(ports)
193         # create mask from all the dpdk port numbers
194         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
195         self.pipeline_kwargs = {
196             'cfg_file': self.CFG_CONFIG,
197             'script': self.CFG_SCRIPT,
198             'port_mask_hex': ports_mask_hex,
199             'tool_path': tool_path,
200         }
201
202     def setup_vnf_environment(self):
203         self._setup_dpdk()
204         self.kill_vnf()
205         # bind before _setup_resources so we can use dpdk_port_num
206         self._detect_and_bind_drivers()
207         resource = self._setup_resources()
208         return resource
209
210     def kill_vnf(self):
211         # pkill is not matching, debug with pgrep
212         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
213         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
214         # have to use exact match
215         # try using killall to match
216         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
217
218     def _setup_dpdk(self):
219         """Setup DPDK environment needed for VNF to run"""
220         self._setup_hugepages()
221         self.dpdk_bind_helper.load_dpdk_driver()
222
223         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
224         if exit_status == 0:
225             return
226
227     def get_collectd_options(self):
228         options = self.scenario_helper.all_options.get("collectd", {})
229         # override with specific node settings
230         options.update(self.scenario_helper.options.get("collectd", {}))
231         return options
232
233     def _setup_resources(self):
234         # what is this magic?  how do we know which socket is for which port?
235         # what about quad-socket?
236         if any(v[5] == "0" for v in self.bound_pci):
237             self.socket = 0
238         else:
239             self.socket = 1
240
241         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
242         # this won't work because we don't have DPDK port numbers yet
243         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
244         port_names = (intf["name"] for intf in ports)
245         collectd_options = self.get_collectd_options()
246         plugins = collectd_options.get("plugins", {})
247         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
248         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
249                                plugins=plugins, interval=collectd_options.get("interval"),
250                                timeout=self.scenario_helper.timeout)
251
252     def _check_interface_fields(self):
253         num_nodes = len(self.scenario_helper.nodes)
254         # OpenStack instance creation time is probably proportional to the number
255         # of instances
256         timeout = 120 * num_nodes
257         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
258                              self.ssh_helper, timeout)
259         dpdk_node.check()
260
261     def _detect_and_bind_drivers(self):
262         interfaces = self.vnfd_helper.interfaces
263
264         self._check_interface_fields()
265         # check for bound after probe
266         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
267
268         self.dpdk_bind_helper.read_status()
269         self.dpdk_bind_helper.save_used_drivers()
270
271         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
272
273         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
274         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
275             try:
276                 intf = next(v for v in interfaces
277                             if vpci == v['virtual-interface']['vpci'])
278                 # force to int
279                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
280             except:  # pylint: disable=bare-except
281                 pass
282         time.sleep(2)
283
284     def get_local_iface_name_by_vpci(self, vpci):
285         find_net_cmd = self.FIND_NET_CMD.format(vpci)
286         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
287         if exit_status == 0:
288             return stdout
289         return None
290
291     def tear_down(self):
292         self.dpdk_bind_helper.rebind_drivers()
293
294
295 class ResourceHelper(object):
296
297     COLLECT_KPI = ''
298     MAKE_INSTALL = 'cd {0} && make && sudo make install'
299     RESOURCE_WORD = 'sample'
300
301     COLLECT_MAP = {}
302
303     def __init__(self, setup_helper):
304         super(ResourceHelper, self).__init__()
305         self.resource = None
306         self.setup_helper = setup_helper
307         self.ssh_helper = setup_helper.ssh_helper
308
309     def setup(self):
310         self.resource = self.setup_helper.setup_vnf_environment()
311
312     def generate_cfg(self):
313         pass
314
315     def _collect_resource_kpi(self):
316         result = {}
317         status = self.resource.check_if_system_agent_running("collectd")[0]
318         if status == 0:
319             result = self.resource.amqp_collect_nfvi_kpi()
320
321         result = {"core": result}
322         return result
323
324     def start_collect(self):
325         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
326         self.resource.start()
327         self.resource.amqp_process_for_nfvi_kpi()
328
329     def stop_collect(self):
330         if self.resource:
331             self.resource.stop()
332
333     def collect_kpi(self):
334         return self._collect_resource_kpi()
335
336
337 class ClientResourceHelper(ResourceHelper):
338
339     RUN_DURATION = 60
340     QUEUE_WAIT_TIME = 5
341     SYNC_PORT = 1
342     ASYNC_PORT = 2
343
344     def __init__(self, setup_helper):
345         super(ClientResourceHelper, self).__init__(setup_helper)
346         self.vnfd_helper = setup_helper.vnfd_helper
347         self.scenario_helper = setup_helper.scenario_helper
348
349         self.client = None
350         self.client_started = Value('i', 0)
351         self.all_ports = None
352         self._queue = Queue()
353         self._result = {}
354         self._terminated = Value('i', 0)
355
356     def _build_ports(self):
357         self.networks = self.vnfd_helper.port_pairs.networks
358         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
359         self.downlink_ports = \
360             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
361         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
362
363     def port_num(self, intf):
364         # by default return port num
365         return self.vnfd_helper.port_num(intf)
366
367     def get_stats(self, *args, **kwargs):
368         try:
369             return self.client.get_stats(*args, **kwargs)
370         except STLError:
371             LOG.error('TRex client not connected')
372             return {}
373
374     def generate_samples(self, ports, key=None, default=None):
375         # needs to be used ports
376         last_result = self.get_stats(ports)
377         key_value = last_result.get(key, default)
378
379         if not isinstance(last_result, Mapping):  # added for mock unit test
380             self._terminated.value = 1
381             return {}
382
383         samples = {}
384         # recalculate port for interface and see if it matches ports provided
385         for intf in self.vnfd_helper.interfaces:
386             name = intf["name"]
387             port = self.vnfd_helper.port_num(name)
388             if port in ports:
389                 xe_value = last_result.get(port, {})
390                 samples[name] = {
391                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
392                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
393                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
394                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
395                     "in_packets": int(xe_value.get("ipackets", 0)),
396                     "out_packets": int(xe_value.get("opackets", 0)),
397                 }
398                 if key:
399                     samples[name][key] = key_value
400         return samples
401
402     def _run_traffic_once(self, traffic_profile):
403         traffic_profile.execute_traffic(self)
404         self.client_started.value = 1
405         time.sleep(self.RUN_DURATION)
406         samples = self.generate_samples(traffic_profile.ports)
407         time.sleep(self.QUEUE_WAIT_TIME)
408         self._queue.put(samples)
409
410     def run_traffic(self, traffic_profile):
411         # if we don't do this we can hang waiting for the queue to drain
412         # have to do this in the subprocess
413         self._queue.cancel_join_thread()
414         # fixme: fix passing correct trex config file,
415         # instead of searching the default path
416         try:
417             self._build_ports()
418             self.client = self._connect()
419             self.client.reset(ports=self.all_ports)
420             self.client.remove_all_streams(self.all_ports)  # remove all streams
421             traffic_profile.register_generator(self)
422
423             while self._terminated.value == 0:
424                 self._run_traffic_once(traffic_profile)
425
426             self.client.stop(self.all_ports)
427             self.client.disconnect()
428             self._terminated.value = 0
429         except STLError:
430             if self._terminated.value:
431                 LOG.debug("traffic generator is stopped")
432                 return  # return if trex/tg server is stopped.
433             raise
434
435     def terminate(self):
436         self._terminated.value = 1  # stop client
437
438     def clear_stats(self, ports=None):
439         if ports is None:
440             ports = self.all_ports
441         self.client.clear_stats(ports=ports)
442
443     def start(self, ports=None, *args, **kwargs):
444         # pylint: disable=keyword-arg-before-vararg
445         # NOTE(ralonsoh): defining keyworded arguments before variable
446         # positional arguments is a bug. This function definition doesn't work
447         # in Python 2, although it works in Python 3. Reference:
448         # https://www.python.org/dev/peps/pep-3102/
449         if ports is None:
450             ports = self.all_ports
451         self.client.start(ports=ports, *args, **kwargs)
452
453     def collect_kpi(self):
454         if not self._queue.empty():
455             kpi = self._queue.get()
456             self._result.update(kpi)
457             LOG.debug('Got KPIs from _queue for %s %s',
458                       self.scenario_helper.name, self.RESOURCE_WORD)
459         return self._result
460
461     def _connect(self, client=None):
462         if client is None:
463             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
464                                server=self.vnfd_helper.mgmt_interface["ip"],
465                                verbose_level=LoggerApi.VERBOSE_QUIET)
466
467         # try to connect with 5s intervals, 30s max
468         for idx in range(6):
469             try:
470                 client.connect()
471                 break
472             except STLError:
473                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
474                 time.sleep(5)
475         return client
476
477
478 class Rfc2544ResourceHelper(object):
479
480     DEFAULT_CORRELATED_TRAFFIC = False
481     DEFAULT_LATENCY = False
482     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
483
484     def __init__(self, scenario_helper):
485         super(Rfc2544ResourceHelper, self).__init__()
486         self.scenario_helper = scenario_helper
487         self._correlated_traffic = None
488         self.iteration = Value('i', 0)
489         self._latency = None
490         self._rfc2544 = None
491         self._tolerance_low = None
492         self._tolerance_high = None
493
494     @property
495     def rfc2544(self):
496         if self._rfc2544 is None:
497             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
498         return self._rfc2544
499
500     @property
501     def tolerance_low(self):
502         if self._tolerance_low is None:
503             self.get_rfc_tolerance()
504         return self._tolerance_low
505
506     @property
507     def tolerance_high(self):
508         if self._tolerance_high is None:
509             self.get_rfc_tolerance()
510         return self._tolerance_high
511
512     @property
513     def correlated_traffic(self):
514         if self._correlated_traffic is None:
515             self._correlated_traffic = \
516                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
517
518         return self._correlated_traffic
519
520     @property
521     def latency(self):
522         if self._latency is None:
523             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
524         return self._latency
525
526     def get_rfc2544(self, name, default=None):
527         return self.rfc2544.get(name, default)
528
529     def get_rfc_tolerance(self):
530         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
531         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
532         self._tolerance_low = next(tolerance_iter)
533         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
534
535
536 class SampleVNFDeployHelper(object):
537
538     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
539     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
540     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
541
542     def __init__(self, vnfd_helper, ssh_helper):
543         super(SampleVNFDeployHelper, self).__init__()
544         self.ssh_helper = ssh_helper
545         self.vnfd_helper = vnfd_helper
546
547     def deploy_vnfs(self, app_name):
548         vnf_bin = self.ssh_helper.join_bin_path(app_name)
549         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
550         if not exit_status:
551             return
552
553         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
554         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
555         time.sleep(2)
556         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
557         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
558
559         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
560         time.sleep(2)
561         http_proxy = os.environ.get('http_proxy', '')
562         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
563         LOG.debug(cmd)
564         self.ssh_helper.execute(cmd)
565         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
566         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
567         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
568
569
570 class ScenarioHelper(object):
571
572     DEFAULT_VNF_CFG = {
573         'lb_config': 'SW',
574         'lb_count': 1,
575         'worker_config': '1C/1T',
576         'worker_threads': 1,
577     }
578
579     def __init__(self, name):
580         self.name = name
581         self.scenario_cfg = None
582
583     @property
584     def task_path(self):
585         return self.scenario_cfg['task_path']
586
587     @property
588     def nodes(self):
589         return self.scenario_cfg.get('nodes')
590
591     @property
592     def all_options(self):
593         return self.scenario_cfg.get('options', {})
594
595     @property
596     def options(self):
597         return self.all_options.get(self.name, {})
598
599     @property
600     def vnf_cfg(self):
601         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
602
603     @property
604     def topology(self):
605         return self.scenario_cfg['topology']
606
607     @property
608     def timeout(self):
609         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
610             self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
611         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
612         return test_duration if test_duration > test_timeout else test_timeout
613
614 class SampleVNF(GenericVNF):
615     """ Class providing file-like API for generic VNF implementation """
616
617     VNF_PROMPT = "pipeline>"
618     WAIT_TIME = 1
619     WAIT_TIME_FOR_SCRIPT = 10
620     APP_NAME = "SampleVNF"
621     # we run the VNF interactively, so the ssh command will timeout after this long
622
623     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
624         super(SampleVNF, self).__init__(name, vnfd)
625         self.bin_path = get_nsb_option('bin_path', '')
626
627         self.scenario_helper = ScenarioHelper(self.name)
628         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
629
630         if setup_env_helper_type is None:
631             setup_env_helper_type = SetupEnvHelper
632
633         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
634                                                   self.ssh_helper,
635                                                   self.scenario_helper)
636
637         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
638
639         if resource_helper_type is None:
640             resource_helper_type = ResourceHelper
641
642         self.resource_helper = resource_helper_type(self.setup_helper)
643
644         self.context_cfg = None
645         self.nfvi_context = None
646         self.pipeline_kwargs = {}
647         self.uplink_ports = None
648         self.downlink_ports = None
649         # NOTE(esm): make QueueFileWrapper invert-able so that we
650         #            never have to manage the queues
651         self.q_in = Queue()
652         self.q_out = Queue()
653         self.queue_wrapper = None
654         self.run_kwargs = {}
655         self.used_drivers = {}
656         self.vnf_port_pairs = None
657         self._vnf_process = None
658
659     def _start_vnf(self):
660         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
661         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
662         self._vnf_process = Process(name=name, target=self._run)
663         self._vnf_process.start()
664
665     def _vnf_up_post(self):
666         pass
667
668     def instantiate(self, scenario_cfg, context_cfg):
669         self.scenario_helper.scenario_cfg = scenario_cfg
670         self.context_cfg = context_cfg
671         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
672         # self.nfvi_context = None
673
674         # vnf deploy is unsupported, use ansible playbooks
675         if self.scenario_helper.options.get("vnf_deploy", False):
676             self.deploy_helper.deploy_vnfs(self.APP_NAME)
677         self.resource_helper.setup()
678         self._start_vnf()
679
680     def wait_for_instantiate(self):
681         buf = []
682         time.sleep(self.WAIT_TIME)  # Give some time for config to load
683         while True:
684             if not self._vnf_process.is_alive():
685                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
686
687             # NOTE(esm): move to QueueFileWrapper
688             while self.q_out.qsize() > 0:
689                 buf.append(self.q_out.get())
690                 message = ''.join(buf)
691                 if self.VNF_PROMPT in message:
692                     LOG.info("%s VNF is up and running.", self.APP_NAME)
693                     self._vnf_up_post()
694                     self.queue_wrapper.clear()
695                     self.resource_helper.start_collect()
696                     return self._vnf_process.exitcode
697
698                 if "PANIC" in message:
699                     raise RuntimeError("Error starting %s VNF." %
700                                        self.APP_NAME)
701
702             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
703             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
704             # Send ENTER to display a new prompt in case the prompt text was corrupted
705             # by other VNF output
706             self.q_in.put('\r\n')
707
708     def _build_run_kwargs(self):
709         self.run_kwargs = {
710             'stdin': self.queue_wrapper,
711             'stdout': self.queue_wrapper,
712             'keep_stdin_open': True,
713             'pty': True,
714             'timeout': self.scenario_helper.timeout,
715         }
716
717     def _build_config(self):
718         return self.setup_helper.build_config()
719
720     def _run(self):
721         # we can't share ssh paramiko objects to force new connection
722         self.ssh_helper.drop_connection()
723         cmd = self._build_config()
724         # kill before starting
725         self.setup_helper.kill_vnf()
726
727         LOG.debug(cmd)
728         self._build_run_kwargs()
729         self.ssh_helper.run(cmd, **self.run_kwargs)
730
731     def vnf_execute(self, cmd, wait_time=2):
732         """ send cmd to vnf process """
733
734         LOG.info("%s command: %s", self.APP_NAME, cmd)
735         self.q_in.put("{}\r\n".format(cmd))
736         time.sleep(wait_time)
737         output = []
738         while self.q_out.qsize() > 0:
739             output.append(self.q_out.get())
740         return "".join(output)
741
742     def _tear_down(self):
743         pass
744
745     def terminate(self):
746         self.vnf_execute("quit")
747         self.setup_helper.kill_vnf()
748         self._tear_down()
749         self.resource_helper.stop_collect()
750         if self._vnf_process is not None:
751             # be proper and join first before we kill
752             LOG.debug("joining before terminate %s", self._vnf_process.name)
753             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
754             self._vnf_process.terminate()
755         # no terminate children here because we share processes with tg
756
757     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
758         """Method for checking the statistics
759
760         This method could be overridden in children classes.
761
762         :return: VNF statistics
763         """
764         cmd = 'p {0} stats'.format(self.APP_WORD)
765         out = self.vnf_execute(cmd)
766         return out
767
768     def collect_kpi(self):
769         # we can't get KPIs if the VNF is down
770         check_if_process_failed(self._vnf_process)
771         stats = self.get_stats()
772         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
773         if m:
774             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
775             result["collect_stats"] = self.resource_helper.collect_kpi()
776         else:
777             result = {
778                 "packets_in": 0,
779                 "packets_fwd": 0,
780                 "packets_dropped": 0,
781             }
782         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
783         return result
784
785     def scale(self, flavor=""):
786         """The SampleVNF base class doesn't provide the 'scale' feature"""
787         raise y_exceptions.FunctionNotImplemented(
788             function_name='scale', class_name='SampleVNFTrafficGen')
789
790
791 class SampleVNFTrafficGen(GenericTrafficGen):
792     """ Class providing file-like API for generic traffic generator """
793
794     APP_NAME = 'Sample'
795     RUN_WAIT = 1
796
797     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
798         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
799         self.bin_path = get_nsb_option('bin_path', '')
800
801         self.scenario_helper = ScenarioHelper(self.name)
802         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
803
804         if setup_env_helper_type is None:
805             setup_env_helper_type = SetupEnvHelper
806
807         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
808                                                   self.ssh_helper,
809                                                   self.scenario_helper)
810
811         if resource_helper_type is None:
812             resource_helper_type = ClientResourceHelper
813
814         self.resource_helper = resource_helper_type(self.setup_helper)
815
816         self.runs_traffic = True
817         self.traffic_finished = False
818         self._tg_process = None
819         self._traffic_process = None
820
821     def _start_server(self):
822         # we can't share ssh paramiko objects to force new connection
823         self.ssh_helper.drop_connection()
824
825     def instantiate(self, scenario_cfg, context_cfg):
826         self.scenario_helper.scenario_cfg = scenario_cfg
827         self.resource_helper.setup()
828         # must generate_cfg after DPDK bind because we need port number
829         self.resource_helper.generate_cfg()
830
831         LOG.info("Starting %s server...", self.APP_NAME)
832         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
833         self._tg_process = Process(name=name, target=self._start_server)
834         self._tg_process.start()
835
836     def _check_status(self):
837         raise NotImplementedError
838
839     def _wait_for_process(self):
840         while True:
841             if not self._tg_process.is_alive():
842                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
843             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
844             time.sleep(1)
845             status = self._check_status()
846             if status == 0:
847                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
848                 return self._tg_process.exitcode
849
850     def _traffic_runner(self, traffic_profile):
851         # always drop connections first thing in new processes
852         # so we don't get paramiko errors
853         self.ssh_helper.drop_connection()
854         LOG.info("Starting %s client...", self.APP_NAME)
855         self.resource_helper.run_traffic(traffic_profile)
856
857     def run_traffic(self, traffic_profile):
858         """ Generate traffic on the wire according to the given params.
859         Method is non-blocking, returns immediately when traffic process
860         is running. Mandatory.
861
862         :param traffic_profile:
863         :return: True/False
864         """
865         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
866                                     os.getpid())
867         self._traffic_process = Process(name=name, target=self._traffic_runner,
868                                         args=(traffic_profile,))
869         self._traffic_process.start()
870         # Wait for traffic process to start
871         while self.resource_helper.client_started.value == 0:
872             time.sleep(self.RUN_WAIT)
873             # what if traffic process takes a few seconds to start?
874             if not self._traffic_process.is_alive():
875                 break
876
877         return self._traffic_process.is_alive()
878
879     def collect_kpi(self):
880         # check if the tg processes have exited
881         for proc in (self._tg_process, self._traffic_process):
882             check_if_process_failed(proc)
883         result = self.resource_helper.collect_kpi()
884         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
885         return result
886
887     def terminate(self):
888         """ After this method finishes, all traffic processes should stop. Mandatory.
889
890         :return: True/False
891         """
892         self.traffic_finished = True
893         # we must kill client before we kill the server, or the client will raise exception
894         if self._traffic_process is not None:
895             # be proper and try to join before terminating
896             LOG.debug("joining before terminate %s", self._traffic_process.name)
897             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
898             self._traffic_process.terminate()
899         if self._tg_process is not None:
900             # be proper and try to join before terminating
901             LOG.debug("joining before terminate %s", self._tg_process.name)
902             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
903             self._tg_process.terminate()
904         # no terminate children here because we share processes with vnf
905
906     def scale(self, flavor=""):
907         """A traffic generator VFN doesn't provide the 'scale' feature"""
908         raise y_exceptions.FunctionNotImplemented(
909             function_name='scale', class_name='SampleVNFTrafficGen')