Merge "SPEC cpu2006 test case for VM"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25 from multiprocessing import Queue, Value, Process
26
27 from six.moves import cStringIO
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common import exceptions as y_exceptions
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
34 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
35 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
38 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.utils import get_nsb_option
41
42 from trex_stl_lib.trex_stl_client import STLClient
43 from trex_stl_lib.trex_stl_client import LoggerApi
44 from trex_stl_lib.trex_stl_exceptions import STLError
45
46 from yardstick.ssh import AutoConnectSSH
47
# Name of the DPDK directory that _setup_dpdk() looks for under the remote bin path
DPDK_VERSION = "dpdk-16.07"

# Module-wide logger
LOG = logging.getLogger(__name__)


# Remote directory where generated config/script files are uploaded
REMOTE_TMP = "/tmp"
# Fallback used by ScenarioHelper.timeout when the node options define no 'timeout'
# (presumably seconds -- TODO confirm against the ssh helper)
DEFAULT_VNF_TIMEOUT = 3600
# Timeout handed to Process.join() before the VNF process is terminated
PROCESS_JOIN_TIMEOUT = 3
56
57
class VnfSshHelper(AutoConnectSSH):
    """SSH helper that remembers the node it was built from and a remote bin path."""

    def __init__(self, node, bin_path, wait=None):
        """Build the SSH connection arguments from ``node`` and store ``bin_path``.

        :param node: node description handed to ``args_from_node``
        :param bin_path: remote directory holding the tool binaries
        :param wait: optional value used for the 'wait' argument, but only when
                     the node-derived arguments do not already define one
        """
        self.node = node
        ssh_args = self.args_from_node(self.node)
        if wait and 'wait' not in ssh_args:
            ssh_args['wait'] = wait

        super(VnfSshHelper, self).__init__(**ssh_args)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this exact class (not the caller's subclass)."""
        # a static name is required here; anything dynamic would resolve to
        # the calling subclass instead of this superclass
        return VnfSshHelper

    def copy(self):
        """Create a new helper of the same class for the same node and bin path."""
        # unlike the plain SSH classes, the copy is rebuilt from the node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` to ``REMOTE_TMP/prefix`` and return that path."""
        LOG.debug(content)
        remote_path = os.path.join(REMOTE_TMP, prefix)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join ``args`` onto the configured bin path."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Provision ``tool_file``; ``tool_path`` defaults to the configured bin path."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
93
94
class SetupEnvHelper(object):
    """Base contract for preparing and cleaning up a VNF's runtime environment.

    ``build_config`` and ``tear_down`` must be provided by subclasses; the
    remaining hooks do nothing by default.
    """

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the three collaborating helpers."""
        super(SetupEnvHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def build_config(self):
        """Produce the VNF configuration; must be overridden."""
        raise NotImplementedError()

    def setup_vnf_environment(self):
        """Hook for environment preparation; no-op in the base class."""
        return None

    def kill_vnf(self):
        """Hook for stopping a running VNF; no-op in the base class."""
        return None

    def tear_down(self):
        """Undo the environment setup; must be overridden."""
        raise NotImplementedError()
120
121
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based VNFs.

    Sets up hugepages and the igb_uio driver on the remote host, binds the
    VNF interfaces to DPDK, generates/uploads the pipeline configuration and
    restores the original drivers on tear down.
    """

    APP_NAME = 'DpdkVnf'
    # remote command listing the net device whose /sys/class/net link matches a PCI address
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace 'pkt_type = ipv4' in the config text with the requested pkt_type.

        :param ip_pipeline_cfg: pipeline configuration as a string
        :param traffic_options: dict providing the 'pkt_type' key
        :return: the updated configuration string
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Patch the traffic type (or packet type) inside the config text.

        When the VNF type differs from ``APP_NAME`` the 'traffic_type = 4'
        entry is rewritten with the requested traffic type; otherwise the
        'pkt_type = ipv4' entry is kept for traffic type 4 and switched to
        ipv6 for anything else.

        :param ip_pipeline_cfg: pipeline configuration as a string
        :param traffic_options: dict providing 'traffic_type' and 'vnf_type'
        :return: the updated configuration string
        """
        traffic_type = traffic_options['traffic_type']

        # compare by value: identity ('is not') on strings depends on interning
        # and can report equal names as different
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper on top of ``ssh_helper``."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Write the number of hugepages on the remote host.

        The hugepage size is read from /proc/meminfo; 8192 pages are requested
        for a "2048kB" page size and 16 pages for any other size.
        """
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # the current value is read but its output is not used
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def build_config(self):
        """Generate, patch and upload the VNF config and script.

        The config produced by ``MultiPortConfig.generate_config()`` is read
        back from ``CFG_CONFIG`` on the local filesystem, patched with the
        traffic/packet type and uploaded together with the generated script.

        :return: ``PIPELINE_COMMAND`` formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Provision the VNF binary and fill ``self.pipeline_kwargs``."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, bind the interfaces and return the resource profile."""
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill any running instance of ``APP_NAME`` on the remote host."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """ setup dpdk environment needed for vnf to run """

        self._setup_hugepages()
        self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")

        # nothing more to do when igb_uio got loaded
        exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
        if exit_status == 0:
            return

        dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
        dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
        exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
        if exit_status != 0:
            self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)

    def get_collectd_options(self):
        """Return the collectd options, node specific values overriding global ones.

        A new dict is returned so that the scenario options are never
        modified; updating the stored dict in place would leak one node's
        overrides into the options read for every other node.
        """
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Select the socket and build the ``ResourceProfile`` for KPI collection."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=collectd_options.get("interval"),
                               timeout=self.scenario_helper.timeout)

    def _detect_and_bind_drivers(self):
        """Bind the VNF PCI devices to igb_uio and record their DPDK port numbers.

        The DPDK port number of an interface is its index in the sorted list
        of DPDK bound PCI addresses.  Bound addresses without a matching
        interface are skipped.
        """
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
                # force to int
                intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except (StopIteration, KeyError, TypeError):
                # best effort: no (well formed) interface for this address.
                # Only lookup failures are ignored, so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                LOG.debug("No interface found for DPDK bound PCI address %s", vpci)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the output of ``FIND_NET_CMD`` for ``vpci``, or None if it failed."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the drivers saved before the DPDK bind."""
        self.dpdk_bind_helper.rebind_drivers()
325
326
class ResourceHelper(object):
    """Owns the resource object returned by the setup helper and collects its KPIs."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember the setup helper and reuse its SSH helper."""
        super(ResourceHelper, self).__init__()
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        # filled in by setup()
        self.resource = None

    def setup(self):
        """Prepare the VNF environment and keep whatever resource it returns."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for configuration generation; no-op here."""
        return None

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is empty unless collectd reports status 0."""
        collectd_status = self.resource.check_if_sa_running("collectd")[0]
        if collectd_status == 0:
            core_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            core_kpi = {}
        return {"core": core_kpi}

    def start_collect(self):
        """Initiate the system agent, start the resource and its AMQP KPI process."""
        self.resource.initiate_systemagent(self.ssh_helper.bin_path)
        self.resource.start()
        self.resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource, if there is one."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Public entry point returning the resource KPIs."""
        return self._collect_resource_kpi()
367
368
class ClientResourceHelper(ResourceHelper):
    """Resource helper driving a TRex STL client as traffic generator.

    Samples produced by the traffic loop are handed over through a
    multiprocessing queue and merged into the result by ``collect_kpi``.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Reuse the helpers of ``setup_helper`` and create the shared state."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers from the port pairs."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        """Return the port number of ``intf`` as reported by the VNFD helper."""
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or an empty dict on ``STLError``."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the latest client statistics.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: dict mapping interface name to its sample; empty (and the
                 terminated flag set) when the statistics are not a mapping
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # validate before touching the result: calling .get() first would
        # fail on a non-mapping result and make this guard pointless
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the samples collected after it."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and loop traffic iterations until terminated.

        :raises STLError: when the client fails while not being terminated
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Ask the traffic loop to stop after the current iteration."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics of ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start the client on ``ports`` (all ports by default)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set (if any) into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (a new ``STLClient`` by default) with retries.

        :return: the client; it is returned even when every attempt failed,
                 in which case a warning is logged
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        attempts = 6
        for idx in range(attempts):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        else:
            # make the failure visible instead of silently returning an
            # unconnected client
            LOG.warning("Could not connect to Trex Server after %s attempts", attempts)
        return client
508
509
class Rfc2544ResourceHelper(object):
    """Lazy, cached access to the 'rfc2544' section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Store the scenario helper; every option is resolved on first access."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        # caches, None meaning "not resolved yet"
        self._rfc2544 = None
        self._correlated_traffic = None
        self._latency = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The 'rfc2544' options mapping (KeyError if the section is missing)."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is not None:
            return self._tolerance_low
        self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is not None:
            return self._tolerance_high
        self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The 'correlated_traffic' option, falling back to the class default."""
        if self._correlated_traffic is not None:
            return self._correlated_traffic
        self._correlated_traffic = self.get_rfc2544(
            'correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
        return self._correlated_traffic

    @property
    def latency(self):
        """The 'latency' option, falling back to the class default."""
        if self._latency is not None:
            return self._latency
        self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Look up ``name`` in the rfc2544 options, returning ``default`` if absent."""
        options = self.rfc2544
        return options.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' ("a - b" or a single value) into low/high bounds."""
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        # a single value serves as both bounds
        if len(bounds) > 1:
            self._tolerance_high = bounds[1]
        else:
            self._tolerance_high = self.tolerance_low
566
567
class SampleVNFDeployHelper(object):
    """Clone the samplevnf repository locally, copy it to the node and build a VNF."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the VNFD and SSH helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build ``app_name`` on the node unless ``which`` already finds its binary.

        :param app_name: name of the VNF application and of its binary
        """
        remote = self.ssh_helper
        vnf_bin = remote.join_bin_path(app_name)
        # a zero exit status means the binary is already in place
        if not remote.execute("which %s" % vnf_bin)[0]:
            return

        # fresh local clone of the repository
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)

        # replace the remote copy with the fresh clone
        remote.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        remote.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        remote.execute(build_cmd)

        # install the freshly built binary into the bin path
        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        remote.execute("sudo mkdir -p %s" % remote.bin_path)
        remote.execute("sudo cp %s %s" % (built_binary, vnf_bin))
600
601
class ScenarioHelper(object):
    """Read-only view on the scenario configuration for one named node.

    ``scenario_cfg`` starts as None and has to be assigned before any of
    the properties is read.
    """

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the node name; the scenario config is attached later."""
        self.scenario_cfg = None
        self.name = name

    @property
    def task_path(self):
        """Mandatory 'task_path' entry of the scenario config."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry of the scenario config, None when absent."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The 'options' entry of the scenario config, empty dict when absent."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """Options stored under this helper's name, empty dict when absent."""
        every_option = self.all_options
        return every_option.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The node's 'vnf_config', falling back to ``DEFAULT_VNF_CFG``."""
        node_options = self.options
        return node_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """Mandatory 'topology' entry of the scenario config."""
        cfg = self.scenario_cfg
        return cfg['topology']

    @property
    def timeout(self):
        """The node's 'timeout' option, falling back to ``DEFAULT_VNF_TIMEOUT``."""
        node_options = self.options
        return node_options.get('timeout', DEFAULT_VNF_TIMEOUT)
642
643
644 class SampleVNF(GenericVNF):
645     """ Class providing file-like API for generic VNF implementation """
646
647     VNF_PROMPT = "pipeline>"
648     WAIT_TIME = 1
649     WAIT_TIME_FOR_SCRIPT = 10
650     APP_NAME = "SampleVNF"
651     # we run the VNF interactively, so the ssh command will timeout after this long
652
653     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
654         super(SampleVNF, self).__init__(name, vnfd)
655         self.bin_path = get_nsb_option('bin_path', '')
656
657         self.scenario_helper = ScenarioHelper(self.name)
658         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
659
660         if setup_env_helper_type is None:
661             setup_env_helper_type = SetupEnvHelper
662
663         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
664                                                   self.ssh_helper,
665                                                   self.scenario_helper)
666
667         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
668
669         if resource_helper_type is None:
670             resource_helper_type = ResourceHelper
671
672         self.resource_helper = resource_helper_type(self.setup_helper)
673
674         self.context_cfg = None
675         self.nfvi_context = None
676         self.pipeline_kwargs = {}
677         self.uplink_ports = None
678         self.downlink_ports = None
679         # NOTE(esm): make QueueFileWrapper invert-able so that we
680         #            never have to manage the queues
681         self.q_in = Queue()
682         self.q_out = Queue()
683         self.queue_wrapper = None
684         self.run_kwargs = {}
685         self.used_drivers = {}
686         self.vnf_port_pairs = None
687         self._vnf_process = None
688
689     def _build_ports(self):
690         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
691         self.networks = self._port_pairs.networks
692         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
693         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
694         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
695
696     def _get_route_data(self, route_index, route_type):
697         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
698         for _ in range(route_index):
699             next(route_iter, '')
700         return next(route_iter, {}).get(route_type, '')
701
702     def _get_port0localip6(self):
703         return_value = self._get_route_data(0, 'network')
704         LOG.info("_get_port0localip6 : %s", return_value)
705         return return_value
706
707     def _get_port1localip6(self):
708         return_value = self._get_route_data(1, 'network')
709         LOG.info("_get_port1localip6 : %s", return_value)
710         return return_value
711
712     def _get_port0prefixlen6(self):
713         return_value = self._get_route_data(0, 'netmask')
714         LOG.info("_get_port0prefixlen6 : %s", return_value)
715         return return_value
716
717     def _get_port1prefixlen6(self):
718         return_value = self._get_route_data(1, 'netmask')
719         LOG.info("_get_port1prefixlen6 : %s", return_value)
720         return return_value
721
722     def _get_port0gateway6(self):
723         return_value = self._get_route_data(0, 'network')
724         LOG.info("_get_port0gateway6 : %s", return_value)
725         return return_value
726
727     def _get_port1gateway6(self):
728         return_value = self._get_route_data(1, 'network')
729         LOG.info("_get_port1gateway6 : %s", return_value)
730         return return_value
731
732     def _start_vnf(self):
733         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
734         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
735         self._vnf_process = Process(name=name, target=self._run)
736         self._vnf_process.start()
737
738     def _vnf_up_post(self):
739         pass
740
741     def instantiate(self, scenario_cfg, context_cfg):
742         self.scenario_helper.scenario_cfg = scenario_cfg
743         self.context_cfg = context_cfg
744         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
745         # self.nfvi_context = None
746
747         # vnf deploy is unsupported, use ansible playbooks
748         if self.scenario_helper.options.get("vnf_deploy", False):
749             self.deploy_helper.deploy_vnfs(self.APP_NAME)
750         self.resource_helper.setup()
751         self._start_vnf()
752
753     def wait_for_instantiate(self):
754         buf = []
755         time.sleep(self.WAIT_TIME)  # Give some time for config to load
756         while True:
757             if not self._vnf_process.is_alive():
758                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
759
760             # NOTE(esm): move to QueueFileWrapper
761             while self.q_out.qsize() > 0:
762                 buf.append(self.q_out.get())
763                 message = ''.join(buf)
764                 if self.VNF_PROMPT in message:
765                     LOG.info("%s VNF is up and running.", self.APP_NAME)
766                     self._vnf_up_post()
767                     self.queue_wrapper.clear()
768                     self.resource_helper.start_collect()
769                     return self._vnf_process.exitcode
770
771                 if "PANIC" in message:
772                     raise RuntimeError("Error starting %s VNF." %
773                                        self.APP_NAME)
774
775             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
776             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
777             # Send ENTER to display a new prompt in case the prompt text was corrupted
778             # by other VNF output
779             self.q_in.put('\r\n')
780
781     def _build_run_kwargs(self):
782         self.run_kwargs = {
783             'stdin': self.queue_wrapper,
784             'stdout': self.queue_wrapper,
785             'keep_stdin_open': True,
786             'pty': True,
787             'timeout': self.scenario_helper.timeout,
788         }
789
790     def _build_config(self):
791         return self.setup_helper.build_config()
792
793     def _run(self):
794         # we can't share ssh paramiko objects to force new connection
795         self.ssh_helper.drop_connection()
796         cmd = self._build_config()
797         # kill before starting
798         self.setup_helper.kill_vnf()
799
800         LOG.debug(cmd)
801         self._build_run_kwargs()
802         self.ssh_helper.run(cmd, **self.run_kwargs)
803
804     def vnf_execute(self, cmd, wait_time=2):
805         """ send cmd to vnf process """
806
807         LOG.info("%s command: %s", self.APP_NAME, cmd)
808         self.q_in.put("{}\r\n".format(cmd))
809         time.sleep(wait_time)
810         output = []
811         while self.q_out.qsize() > 0:
812             output.append(self.q_out.get())
813         return "".join(output)
814
815     def _tear_down(self):
816         pass
817
818     def terminate(self):
819         self.vnf_execute("quit")
820         self.setup_helper.kill_vnf()
821         self._tear_down()
822         self.resource_helper.stop_collect()
823         if self._vnf_process is not None:
824             # be proper and join first before we kill
825             LOG.debug("joining before terminate %s", self._vnf_process.name)
826             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
827             self._vnf_process.terminate()
828         # no terminate children here because we share processes with tg
829
830     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
831         """Method for checking the statistics
832
833         This method could be overridden in children classes.
834
835         :return: VNF statistics
836         """
837         cmd = 'p {0} stats'.format(self.APP_WORD)
838         out = self.vnf_execute(cmd)
839         return out
840
841     def collect_kpi(self):
842         # we can't get KPIs if the VNF is down
843         check_if_process_failed(self._vnf_process)
844         stats = self.get_stats()
845         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
846         if m:
847             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
848             result["collect_stats"] = self.resource_helper.collect_kpi()
849         else:
850             result = {
851                 "packets_in": 0,
852                 "packets_fwd": 0,
853                 "packets_dropped": 0,
854             }
855         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
856         return result
857
858     def scale(self, flavor=""):
859         """The SampleVNF base class doesn't provide the 'scale' feature"""
860         raise y_exceptions.FunctionNotImplemented(
861             function_name='scale', class_name='SampleVNFTrafficGen')
862
863
class SampleVNFTrafficGen(GenericTrafficGen):
    """Base traffic generator built on the sample VNF helper classes.

    Two child processes are managed here: a server process created by
    ``instantiate`` and a traffic process created by ``run_traffic``.
    """

    APP_NAME = 'Sample'
    RUN_WAIT = 1  # passed to time.sleep() while waiting for the traffic client

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the scenario, ssh, setup and resource helpers.

        :param name: passed to the parent class constructor
        :param vnfd: passed to the parent class constructor
        :param setup_env_helper_type: class used to build ``setup_helper``;
            ``SetupEnvHelper`` when None
        :param resource_helper_type: class used to build ``resource_helper``;
            ``ClientResourceHelper`` when None
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        setup_cls = SetupEnvHelper if setup_env_helper_type is None else setup_env_helper_type
        self.setup_helper = setup_cls(self.vnfd_helper, self.ssh_helper, self.scenario_helper)

        resource_cls = (ClientResourceHelper if resource_helper_type is None
                        else resource_helper_type)
        self.resource_helper = resource_cls(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        # child processes, created by instantiate() and run_traffic()
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        """Target of the server process; here it only drops the ssh connection."""
        # paramiko ssh objects can't be shared, so force a new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """Set up resources, generate the configuration and start the server process.

        :param scenario_cfg: stored on the scenario helper
        :param context_cfg: not used by this implementation
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.resource_helper.setup()
        # must generate_cfg after DPDK bind because we need port number
        self.resource_helper.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        server_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        server = Process(name=server_name, target=self._start_server)
        self._tg_process = server
        server.start()

    def _check_status(self):
        """Report the server status; ``_wait_for_process`` treats 0 as "up".

        Not implemented in this base class.
        """
        raise NotImplementedError

    def _wait_for_process(self):
        """Poll ``_check_status`` once per second until it returns 0.

        :return: exit code of the server process once the server is up
        :raises RuntimeError: if the server process is no longer alive
        """
        while self._tg_process.is_alive():
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            if self._check_status() == 0:
                LOG.info("%s TG Server is up and running.", self.APP_NAME)
                return self._tg_process.exitcode

        # the loop only ends without returning when the process has died
        raise RuntimeError("%s traffic generator process died." % self.APP_NAME)

    def _traffic_runner(self, traffic_profile):
        """Target of the traffic process: run the profile via the resource helper."""
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self.resource_helper.run_traffic(traffic_profile)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.

        Starts the traffic process and returns once the resource helper
        reports that the client has started, or once the traffic process is
        found dead. Mandatory.

        :param traffic_profile: profile handed to the traffic process
        :return: True if the traffic process is alive, False otherwise
        """
        profile_name = traffic_profile.__class__.__name__
        proc_name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, profile_name, os.getpid())
        runner = Process(name=proc_name, target=self._traffic_runner, args=(traffic_profile,))
        self._traffic_process = runner
        runner.start()
        # Wait for traffic process to start
        while self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not runner.is_alive():
                break

        return runner.is_alive()

    def collect_kpi(self):
        """Return the KPIs reported by the resource helper.

        Both child processes are first passed to ``check_if_process_failed``.
        """
        # check if the tg processes have exited
        check_if_process_failed(self._tg_process)
        check_if_process_failed(self._traffic_process)
        kpis = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, kpis)
        return kpis

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Marks traffic as finished, then joins (with a timeout) and terminates
        the traffic process followed by the server process.
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        for proc in (self._traffic_process, self._tg_process):
            if proc is None:
                continue
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", proc.name)
            proc.join(PROCESS_JOIN_TIMEOUT)
            proc.terminate()
        # no terminate children here because we share processes with vnf

    def scale(self, flavor=""):
        """A traffic generator VNF doesn't provide the 'scale' feature"""
        raise y_exceptions.FunctionNotImplemented(
            class_name='SampleVNFTrafficGen', function_name='scale')