Improve SampleVNF hugepages setup
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19 import os
20 import posixpath
21 import re
22 import subprocess
23 import time
24
25 import six
26 from six.moves import cStringIO
27
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
33 from yardstick.common import exceptions as y_exceptions
34 from yardstick.common import utils
35 from yardstick.common.process import check_if_process_failed
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
37 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
38 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
39 from yardstick.network_services.nfvi.resource import ResourceProfile
40 from yardstick.network_services.utils import get_nsb_option
41 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
42 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
43 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
44 from yardstick.ssh import AutoConnectSSH
45
46
47 DPDK_VERSION = "dpdk-16.07"
48
49 LOG = logging.getLogger(__name__)
50
51
52 REMOTE_TMP = "/tmp"
53 DEFAULT_VNF_TIMEOUT = 3600
54 PROCESS_JOIN_TIMEOUT = 3
55
56
57 class VnfSshHelper(AutoConnectSSH):
58
59     def __init__(self, node, bin_path, wait=None):
60         self.node = node
61         kwargs = self.args_from_node(self.node)
62         if wait:
63             kwargs.setdefault('wait', wait)
64
65         super(VnfSshHelper, self).__init__(**kwargs)
66         self.bin_path = bin_path
67
68     @staticmethod
69     def get_class():
70         # must return static class name, anything else refers to the calling class
71         # i.e. the subclass, not the superclass
72         return VnfSshHelper
73
74     def copy(self):
75         # this copy constructor is different from SSH classes, since it uses node
76         return self.get_class()(self.node, self.bin_path)
77
78     def upload_config_file(self, prefix, content):
79         cfg_file = os.path.join(REMOTE_TMP, prefix)
80         LOG.debug(content)
81         file_obj = cStringIO(content)
82         self.put_file_obj(file_obj, cfg_file)
83         return cfg_file
84
85     def join_bin_path(self, *args):
86         return os.path.join(self.bin_path, *args)
87
88     def provision_tool(self, tool_path=None, tool_file=None):
89         if tool_path is None:
90             tool_path = self.bin_path
91         return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
92
93
94 class SetupEnvHelper(object):
95
96     CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
97     CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
98     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
99     PIPELINE_COMMAND = ''
100     VNF_TYPE = "SAMPLE"
101
102     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
103         super(SetupEnvHelper, self).__init__()
104         self.vnfd_helper = vnfd_helper
105         self.ssh_helper = ssh_helper
106         self.scenario_helper = scenario_helper
107
108     def build_config(self):
109         raise NotImplementedError
110
111     def setup_vnf_environment(self):
112         pass
113
114     def kill_vnf(self):
115         pass
116
117     def tear_down(self):
118         raise NotImplementedError
119
120
121 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
122
123     APP_NAME = 'DpdkVnf'
124     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
125     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
126     HUGEPAGES_KB = 1024 * 1024 * 16
127
128     @staticmethod
129     def _update_packet_type(ip_pipeline_cfg, traffic_options):
130         match_str = 'pkt_type = ipv4'
131         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
132         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
133         return pipeline_config_str
134
135     @classmethod
136     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
137         traffic_type = traffic_options['traffic_type']
138
139         if traffic_options['vnf_type'] is not cls.APP_NAME:
140             match_str = 'traffic_type = 4'
141             replace_str = 'traffic_type = {0}'.format(traffic_type)
142
143         elif traffic_type == 4:
144             match_str = 'pkt_type = ipv4'
145             replace_str = 'pkt_type = ipv4'
146
147         else:
148             match_str = 'pkt_type = ipv4'
149             replace_str = 'pkt_type = ipv6'
150
151         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
152         return pipeline_config_str
153
154     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
155         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
156         self.all_ports = None
157         self.bound_pci = None
158         self.socket = None
159         self.used_drivers = None
160         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
161
162     def _setup_hugepages(self):
163         meminfo = utils.read_meminfo(self.ssh_helper)
164         hp_size_kb = int(meminfo['Hugepagesize'])
165         nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
166         self.ssh_helper.execute('echo %s | sudo tee %s' %
167                                 (nr_hugepages, self.NR_HUGEPAGES_PATH))
168         hp = six.BytesIO()
169         self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
170         nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
171         LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
172                  hp_size_kb, nr_hugepages, nr_hugepages_set)
173
174     def build_config(self):
175         vnf_cfg = self.scenario_helper.vnf_cfg
176         task_path = self.scenario_helper.task_path
177
178         lb_count = vnf_cfg.get('lb_count', 3)
179         lb_config = vnf_cfg.get('lb_config', 'SW')
180         worker_config = vnf_cfg.get('worker_config', '1C/1T')
181         worker_threads = vnf_cfg.get('worker_threads', 3)
182
183         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
184         traffic_options = {
185             'traffic_type': traffic_type,
186             'pkt_type': 'ipv%s' % traffic_type,
187             'vnf_type': self.VNF_TYPE,
188         }
189
190         config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
191         config_basename = posixpath.basename(self.CFG_CONFIG)
192         script_basename = posixpath.basename(self.CFG_SCRIPT)
193         multiport = MultiPortConfig(self.scenario_helper.topology,
194                                     config_tpl_cfg,
195                                     config_basename,
196                                     self.vnfd_helper,
197                                     self.VNF_TYPE,
198                                     lb_count,
199                                     worker_threads,
200                                     worker_config,
201                                     lb_config,
202                                     self.socket)
203
204         multiport.generate_config()
205         with open(self.CFG_CONFIG) as handle:
206             new_config = handle.read()
207
208         new_config = self._update_traffic_type(new_config, traffic_options)
209         new_config = self._update_packet_type(new_config, traffic_options)
210
211         self.ssh_helper.upload_config_file(config_basename, new_config)
212         self.ssh_helper.upload_config_file(script_basename,
213                                            multiport.generate_script(self.vnfd_helper))
214
215         LOG.info("Provision and start the %s", self.APP_NAME)
216         self._build_pipeline_kwargs()
217         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
218
219     def _build_pipeline_kwargs(self):
220         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
221         # count the number of actual ports in the list of pairs
222         # remove duplicate ports
223         # this is really a mapping from LINK ID to DPDK PMD ID
224         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
225         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
226         ports = self.vnfd_helper.port_pairs.all_ports
227         port_nums = self.vnfd_helper.port_nums(ports)
228         # create mask from all the dpdk port numbers
229         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
230         self.pipeline_kwargs = {
231             'cfg_file': self.CFG_CONFIG,
232             'script': self.CFG_SCRIPT,
233             'port_mask_hex': ports_mask_hex,
234             'tool_path': tool_path,
235         }
236
237     def setup_vnf_environment(self):
238         self._setup_dpdk()
239         self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
240         self.kill_vnf()
241         # bind before _setup_resources so we can use dpdk_port_num
242         self._detect_and_bind_drivers()
243         resource = self._setup_resources()
244         return resource
245
246     def kill_vnf(self):
247         # pkill is not matching, debug with pgrep
248         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
249         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
250         # have to use exact match
251         # try using killall to match
252         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
253
254     def _setup_dpdk(self):
255         """ setup dpdk environment needed for vnf to run """
256
257         self._setup_hugepages()
258         self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
259
260         exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
261         if exit_status == 0:
262             return
263
264         dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
265         dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
266         exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
267         if exit_status != 0:
268             self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
269
270     def get_collectd_options(self):
271         options = self.scenario_helper.all_options.get("collectd", {})
272         # override with specific node settings
273         options.update(self.scenario_helper.options.get("collectd", {}))
274         return options
275
276     def _setup_resources(self):
277         # what is this magic?  how do we know which socket is for which port?
278         # what about quad-socket?
279         if any(v[5] == "0" for v in self.bound_pci):
280             self.socket = 0
281         else:
282             self.socket = 1
283
284         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
285         # this won't work because we don't have DPDK port numbers yet
286         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
287         port_names = (intf["name"] for intf in ports)
288         collectd_options = self.get_collectd_options()
289         plugins = collectd_options.get("plugins", {})
290         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
291         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
292                                plugins=plugins, interval=collectd_options.get("interval"),
293                                timeout=self.scenario_helper.timeout)
294
295     def _detect_and_bind_drivers(self):
296         interfaces = self.vnfd_helper.interfaces
297
298         self.dpdk_bind_helper.read_status()
299         self.dpdk_bind_helper.save_used_drivers()
300
301         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
302
303         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
304         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
305             try:
306                 intf = next(v for v in interfaces
307                             if vpci == v['virtual-interface']['vpci'])
308                 # force to int
309                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
310             except:  # pylint: disable=bare-except
311                 pass
312         time.sleep(2)
313
314     def get_local_iface_name_by_vpci(self, vpci):
315         find_net_cmd = self.FIND_NET_CMD.format(vpci)
316         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
317         if exit_status == 0:
318             return stdout
319         return None
320
321     def tear_down(self):
322         self.dpdk_bind_helper.rebind_drivers()
323
324
325 class ResourceHelper(object):
326
327     COLLECT_KPI = ''
328     MAKE_INSTALL = 'cd {0} && make && sudo make install'
329     RESOURCE_WORD = 'sample'
330
331     COLLECT_MAP = {}
332
333     def __init__(self, setup_helper):
334         super(ResourceHelper, self).__init__()
335         self.resource = None
336         self.setup_helper = setup_helper
337         self.ssh_helper = setup_helper.ssh_helper
338
339     def setup(self):
340         self.resource = self.setup_helper.setup_vnf_environment()
341
342     def generate_cfg(self):
343         pass
344
345     def _collect_resource_kpi(self):
346         result = {}
347         status = self.resource.check_if_system_agent_running("collectd")[0]
348         if status == 0:
349             result = self.resource.amqp_collect_nfvi_kpi()
350
351         result = {"core": result}
352         return result
353
354     def start_collect(self):
355         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
356         self.resource.start()
357         self.resource.amqp_process_for_nfvi_kpi()
358
359     def stop_collect(self):
360         if self.resource:
361             self.resource.stop()
362
363     def collect_kpi(self):
364         return self._collect_resource_kpi()
365
366
367 class ClientResourceHelper(ResourceHelper):
368
369     RUN_DURATION = 60
370     QUEUE_WAIT_TIME = 5
371     SYNC_PORT = 1
372     ASYNC_PORT = 2
373
374     def __init__(self, setup_helper):
375         super(ClientResourceHelper, self).__init__(setup_helper)
376         self.vnfd_helper = setup_helper.vnfd_helper
377         self.scenario_helper = setup_helper.scenario_helper
378
379         self.client = None
380         self.client_started = Value('i', 0)
381         self.all_ports = None
382         self._queue = Queue()
383         self._result = {}
384         self._terminated = Value('i', 0)
385
386     def _build_ports(self):
387         self.networks = self.vnfd_helper.port_pairs.networks
388         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
389         self.downlink_ports = \
390             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
391         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
392
393     def port_num(self, intf):
394         # by default return port num
395         return self.vnfd_helper.port_num(intf)
396
397     def get_stats(self, *args, **kwargs):
398         try:
399             return self.client.get_stats(*args, **kwargs)
400         except STLError:
401             LOG.exception("TRex client not connected")
402             return {}
403
404     def generate_samples(self, ports, key=None, default=None):
405         # needs to be used ports
406         last_result = self.get_stats(ports)
407         key_value = last_result.get(key, default)
408
409         if not isinstance(last_result, Mapping):  # added for mock unit test
410             self._terminated.value = 1
411             return {}
412
413         samples = {}
414         # recalculate port for interface and see if it matches ports provided
415         for intf in self.vnfd_helper.interfaces:
416             name = intf["name"]
417             port = self.vnfd_helper.port_num(name)
418             if port in ports:
419                 xe_value = last_result.get(port, {})
420                 samples[name] = {
421                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
422                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
423                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
424                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
425                     "in_packets": int(xe_value.get("ipackets", 0)),
426                     "out_packets": int(xe_value.get("opackets", 0)),
427                 }
428                 if key:
429                     samples[name][key] = key_value
430         return samples
431
432     def _run_traffic_once(self, traffic_profile):
433         traffic_profile.execute_traffic(self)
434         self.client_started.value = 1
435         time.sleep(self.RUN_DURATION)
436         samples = self.generate_samples(traffic_profile.ports)
437         time.sleep(self.QUEUE_WAIT_TIME)
438         self._queue.put(samples)
439
440     def run_traffic(self, traffic_profile):
441         # if we don't do this we can hang waiting for the queue to drain
442         # have to do this in the subprocess
443         self._queue.cancel_join_thread()
444         # fixme: fix passing correct trex config file,
445         # instead of searching the default path
446         try:
447             self._build_ports()
448             self.client = self._connect()
449             self.client.reset(ports=self.all_ports)
450             self.client.remove_all_streams(self.all_ports)  # remove all streams
451             traffic_profile.register_generator(self)
452
453             while self._terminated.value == 0:
454                 self._run_traffic_once(traffic_profile)
455
456             self.client.stop(self.all_ports)
457             self.client.disconnect()
458             self._terminated.value = 0
459         except STLError:
460             if self._terminated.value:
461                 LOG.debug("traffic generator is stopped")
462                 return  # return if trex/tg server is stopped.
463             raise
464
465     def terminate(self):
466         self._terminated.value = 1  # stop client
467
468     def clear_stats(self, ports=None):
469         if ports is None:
470             ports = self.all_ports
471         self.client.clear_stats(ports=ports)
472
473     def start(self, ports=None, *args, **kwargs):
474         # pylint: disable=keyword-arg-before-vararg
475         # NOTE(ralonsoh): defining keyworded arguments before variable
476         # positional arguments is a bug. This function definition doesn't work
477         # in Python 2, although it works in Python 3. Reference:
478         # https://www.python.org/dev/peps/pep-3102/
479         if ports is None:
480             ports = self.all_ports
481         self.client.start(ports=ports, *args, **kwargs)
482
483     def collect_kpi(self):
484         if not self._queue.empty():
485             kpi = self._queue.get()
486             self._result.update(kpi)
487             LOG.debug('Got KPIs from _queue for %s %s',
488                       self.scenario_helper.name, self.RESOURCE_WORD)
489         return self._result
490
491     def _connect(self, client=None):
492         if client is None:
493             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
494                                server=self.vnfd_helper.mgmt_interface["ip"],
495                                verbose_level=LoggerApi.VERBOSE_QUIET)
496
497         # try to connect with 5s intervals, 30s max
498         for idx in range(6):
499             try:
500                 client.connect()
501                 break
502             except STLError:
503                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
504                 time.sleep(5)
505         return client
506
507
508 class Rfc2544ResourceHelper(object):
509
510     DEFAULT_CORRELATED_TRAFFIC = False
511     DEFAULT_LATENCY = False
512     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
513
514     def __init__(self, scenario_helper):
515         super(Rfc2544ResourceHelper, self).__init__()
516         self.scenario_helper = scenario_helper
517         self._correlated_traffic = None
518         self.iteration = Value('i', 0)
519         self._latency = None
520         self._rfc2544 = None
521         self._tolerance_low = None
522         self._tolerance_high = None
523
524     @property
525     def rfc2544(self):
526         if self._rfc2544 is None:
527             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
528         return self._rfc2544
529
530     @property
531     def tolerance_low(self):
532         if self._tolerance_low is None:
533             self.get_rfc_tolerance()
534         return self._tolerance_low
535
536     @property
537     def tolerance_high(self):
538         if self._tolerance_high is None:
539             self.get_rfc_tolerance()
540         return self._tolerance_high
541
542     @property
543     def correlated_traffic(self):
544         if self._correlated_traffic is None:
545             self._correlated_traffic = \
546                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
547
548         return self._correlated_traffic
549
550     @property
551     def latency(self):
552         if self._latency is None:
553             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
554         return self._latency
555
556     def get_rfc2544(self, name, default=None):
557         return self.rfc2544.get(name, default)
558
559     def get_rfc_tolerance(self):
560         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
561         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
562         self._tolerance_low = next(tolerance_iter)
563         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
564
565
566 class SampleVNFDeployHelper(object):
567
568     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
569     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
570     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
571
572     def __init__(self, vnfd_helper, ssh_helper):
573         super(SampleVNFDeployHelper, self).__init__()
574         self.ssh_helper = ssh_helper
575         self.vnfd_helper = vnfd_helper
576
577     def deploy_vnfs(self, app_name):
578         vnf_bin = self.ssh_helper.join_bin_path(app_name)
579         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
580         if not exit_status:
581             return
582
583         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
584         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
585         time.sleep(2)
586         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
587         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
588
589         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
590         time.sleep(2)
591         http_proxy = os.environ.get('http_proxy', '')
592         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
593         LOG.debug(cmd)
594         self.ssh_helper.execute(cmd)
595         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
596         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
597         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
598
599
600 class ScenarioHelper(object):
601
602     DEFAULT_VNF_CFG = {
603         'lb_config': 'SW',
604         'lb_count': 1,
605         'worker_config': '1C/1T',
606         'worker_threads': 1,
607     }
608
609     def __init__(self, name):
610         self.name = name
611         self.scenario_cfg = None
612
613     @property
614     def task_path(self):
615         return self.scenario_cfg['task_path']
616
617     @property
618     def nodes(self):
619         return self.scenario_cfg.get('nodes')
620
621     @property
622     def all_options(self):
623         return self.scenario_cfg.get('options', {})
624
625     @property
626     def options(self):
627         return self.all_options.get(self.name, {})
628
629     @property
630     def vnf_cfg(self):
631         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
632
633     @property
634     def topology(self):
635         return self.scenario_cfg['topology']
636
637     @property
638     def timeout(self):
639         return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
640
641
642 class SampleVNF(GenericVNF):
643     """ Class providing file-like API for generic VNF implementation """
644
645     VNF_PROMPT = "pipeline>"
646     WAIT_TIME = 1
647     WAIT_TIME_FOR_SCRIPT = 10
648     APP_NAME = "SampleVNF"
649     # we run the VNF interactively, so the ssh command will timeout after this long
650
651     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
652         super(SampleVNF, self).__init__(name, vnfd)
653         self.bin_path = get_nsb_option('bin_path', '')
654
655         self.scenario_helper = ScenarioHelper(self.name)
656         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
657
658         if setup_env_helper_type is None:
659             setup_env_helper_type = SetupEnvHelper
660
661         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
662                                                   self.ssh_helper,
663                                                   self.scenario_helper)
664
665         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
666
667         if resource_helper_type is None:
668             resource_helper_type = ResourceHelper
669
670         self.resource_helper = resource_helper_type(self.setup_helper)
671
672         self.context_cfg = None
673         self.nfvi_context = None
674         self.pipeline_kwargs = {}
675         self.uplink_ports = None
676         self.downlink_ports = None
677         # NOTE(esm): make QueueFileWrapper invert-able so that we
678         #            never have to manage the queues
679         self.q_in = Queue()
680         self.q_out = Queue()
681         self.queue_wrapper = None
682         self.run_kwargs = {}
683         self.used_drivers = {}
684         self.vnf_port_pairs = None
685         self._vnf_process = None
686
687     def _build_ports(self):
688         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
689         self.networks = self._port_pairs.networks
690         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
691         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
692         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
693
694     def _get_route_data(self, route_index, route_type):
695         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
696         for _ in range(route_index):
697             next(route_iter, '')
698         return next(route_iter, {}).get(route_type, '')
699
700     def _get_port0localip6(self):
701         return_value = self._get_route_data(0, 'network')
702         LOG.info("_get_port0localip6 : %s", return_value)
703         return return_value
704
705     def _get_port1localip6(self):
706         return_value = self._get_route_data(1, 'network')
707         LOG.info("_get_port1localip6 : %s", return_value)
708         return return_value
709
710     def _get_port0prefixlen6(self):
711         return_value = self._get_route_data(0, 'netmask')
712         LOG.info("_get_port0prefixlen6 : %s", return_value)
713         return return_value
714
715     def _get_port1prefixlen6(self):
716         return_value = self._get_route_data(1, 'netmask')
717         LOG.info("_get_port1prefixlen6 : %s", return_value)
718         return return_value
719
720     def _get_port0gateway6(self):
721         return_value = self._get_route_data(0, 'network')
722         LOG.info("_get_port0gateway6 : %s", return_value)
723         return return_value
724
725     def _get_port1gateway6(self):
726         return_value = self._get_route_data(1, 'network')
727         LOG.info("_get_port1gateway6 : %s", return_value)
728         return return_value
729
730     def _start_vnf(self):
731         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
732         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
733         self._vnf_process = Process(name=name, target=self._run)
734         self._vnf_process.start()
735
736     def _vnf_up_post(self):
737         pass
738
739     def instantiate(self, scenario_cfg, context_cfg):
740         self.scenario_helper.scenario_cfg = scenario_cfg
741         self.context_cfg = context_cfg
742         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
743         # self.nfvi_context = None
744
745         # vnf deploy is unsupported, use ansible playbooks
746         if self.scenario_helper.options.get("vnf_deploy", False):
747             self.deploy_helper.deploy_vnfs(self.APP_NAME)
748         self.resource_helper.setup()
749         self._start_vnf()
750
751     def wait_for_instantiate(self):
752         buf = []
753         time.sleep(self.WAIT_TIME)  # Give some time for config to load
754         while True:
755             if not self._vnf_process.is_alive():
756                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
757
758             # NOTE(esm): move to QueueFileWrapper
759             while self.q_out.qsize() > 0:
760                 buf.append(self.q_out.get())
761                 message = ''.join(buf)
762                 if self.VNF_PROMPT in message:
763                     LOG.info("%s VNF is up and running.", self.APP_NAME)
764                     self._vnf_up_post()
765                     self.queue_wrapper.clear()
766                     self.resource_helper.start_collect()
767                     return self._vnf_process.exitcode
768
769                 if "PANIC" in message:
770                     raise RuntimeError("Error starting %s VNF." %
771                                        self.APP_NAME)
772
773             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
774             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
775             # Send ENTER to display a new prompt in case the prompt text was corrupted
776             # by other VNF output
777             self.q_in.put('\r\n')
778
779     def _build_run_kwargs(self):
780         self.run_kwargs = {
781             'stdin': self.queue_wrapper,
782             'stdout': self.queue_wrapper,
783             'keep_stdin_open': True,
784             'pty': True,
785             'timeout': self.scenario_helper.timeout,
786         }
787
788     def _build_config(self):
789         return self.setup_helper.build_config()
790
791     def _run(self):
792         # we can't share ssh paramiko objects to force new connection
793         self.ssh_helper.drop_connection()
794         cmd = self._build_config()
795         # kill before starting
796         self.setup_helper.kill_vnf()
797
798         LOG.debug(cmd)
799         self._build_run_kwargs()
800         self.ssh_helper.run(cmd, **self.run_kwargs)
801
802     def vnf_execute(self, cmd, wait_time=2):
803         """ send cmd to vnf process """
804
805         LOG.info("%s command: %s", self.APP_NAME, cmd)
806         self.q_in.put("{}\r\n".format(cmd))
807         time.sleep(wait_time)
808         output = []
809         while self.q_out.qsize() > 0:
810             output.append(self.q_out.get())
811         return "".join(output)
812
813     def _tear_down(self):
814         pass
815
816     def terminate(self):
817         self.vnf_execute("quit")
818         self.setup_helper.kill_vnf()
819         self._tear_down()
820         self.resource_helper.stop_collect()
821         if self._vnf_process is not None:
822             # be proper and join first before we kill
823             LOG.debug("joining before terminate %s", self._vnf_process.name)
824             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
825             self._vnf_process.terminate()
826         # no terminate children here because we share processes with tg
827
828     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
829         """Method for checking the statistics
830
831         This method could be overridden in children classes.
832
833         :return: VNF statistics
834         """
835         cmd = 'p {0} stats'.format(self.APP_WORD)
836         out = self.vnf_execute(cmd)
837         return out
838
839     def collect_kpi(self):
840         # we can't get KPIs if the VNF is down
841         check_if_process_failed(self._vnf_process)
842         stats = self.get_stats()
843         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
844         if m:
845             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
846             result["collect_stats"] = self.resource_helper.collect_kpi()
847         else:
848             result = {
849                 "packets_in": 0,
850                 "packets_fwd": 0,
851                 "packets_dropped": 0,
852             }
853         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
854         return result
855
856     def scale(self, flavor=""):
857         """The SampleVNF base class doesn't provide the 'scale' feature"""
858         raise y_exceptions.FunctionNotImplemented(
859             function_name='scale', class_name='SampleVNFTrafficGen')
860
861
862 class SampleVNFTrafficGen(GenericTrafficGen):
863     """ Class providing file-like API for generic traffic generator """
864
865     APP_NAME = 'Sample'
866     RUN_WAIT = 1
867
868     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
869         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
870         self.bin_path = get_nsb_option('bin_path', '')
871
872         self.scenario_helper = ScenarioHelper(self.name)
873         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
874
875         if setup_env_helper_type is None:
876             setup_env_helper_type = SetupEnvHelper
877
878         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
879                                                   self.ssh_helper,
880                                                   self.scenario_helper)
881
882         if resource_helper_type is None:
883             resource_helper_type = ClientResourceHelper
884
885         self.resource_helper = resource_helper_type(self.setup_helper)
886
887         self.runs_traffic = True
888         self.traffic_finished = False
889         self._tg_process = None
890         self._traffic_process = None
891
892     def _start_server(self):
893         # we can't share ssh paramiko objects to force new connection
894         self.ssh_helper.drop_connection()
895
896     def instantiate(self, scenario_cfg, context_cfg):
897         self.scenario_helper.scenario_cfg = scenario_cfg
898         self.resource_helper.setup()
899         # must generate_cfg after DPDK bind because we need port number
900         self.resource_helper.generate_cfg()
901
902         LOG.info("Starting %s server...", self.APP_NAME)
903         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
904         self._tg_process = Process(name=name, target=self._start_server)
905         self._tg_process.start()
906
907     def _check_status(self):
908         raise NotImplementedError
909
910     def _wait_for_process(self):
911         while True:
912             if not self._tg_process.is_alive():
913                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
914             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
915             time.sleep(1)
916             status = self._check_status()
917             if status == 0:
918                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
919                 return self._tg_process.exitcode
920
921     def _traffic_runner(self, traffic_profile):
922         # always drop connections first thing in new processes
923         # so we don't get paramiko errors
924         self.ssh_helper.drop_connection()
925         LOG.info("Starting %s client...", self.APP_NAME)
926         self.resource_helper.run_traffic(traffic_profile)
927
928     def run_traffic(self, traffic_profile):
929         """ Generate traffic on the wire according to the given params.
930         Method is non-blocking, returns immediately when traffic process
931         is running. Mandatory.
932
933         :param traffic_profile:
934         :return: True/False
935         """
936         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
937                                     os.getpid())
938         self._traffic_process = Process(name=name, target=self._traffic_runner,
939                                         args=(traffic_profile,))
940         self._traffic_process.start()
941         # Wait for traffic process to start
942         while self.resource_helper.client_started.value == 0:
943             time.sleep(self.RUN_WAIT)
944             # what if traffic process takes a few seconds to start?
945             if not self._traffic_process.is_alive():
946                 break
947
948         return self._traffic_process.is_alive()
949
950     def collect_kpi(self):
951         # check if the tg processes have exited
952         for proc in (self._tg_process, self._traffic_process):
953             check_if_process_failed(proc)
954         result = self.resource_helper.collect_kpi()
955         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
956         return result
957
958     def terminate(self):
959         """ After this method finishes, all traffic processes should stop. Mandatory.
960
961         :return: True/False
962         """
963         self.traffic_finished = True
964         # we must kill client before we kill the server, or the client will raise exception
965         if self._traffic_process is not None:
966             # be proper and try to join before terminating
967             LOG.debug("joining before terminate %s", self._traffic_process.name)
968             self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
969             self._traffic_process.terminate()
970         if self._tg_process is not None:
971             # be proper and try to join before terminating
972             LOG.debug("joining before terminate %s", self._tg_process.name)
973             self._tg_process.join(PROCESS_JOIN_TIMEOUT)
974             self._tg_process.terminate()
975         # no terminate children here because we share processes with vnf
976
977     def scale(self, flavor=""):
978         """A traffic generator VFN doesn't provide the 'scale' feature"""
979         raise y_exceptions.FunctionNotImplemented(
980             function_name='scale', class_name='SampleVNFTrafficGen')