Merge "Proposing Abhijit Sinha as a committer in Yardstick"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19 import os
20 import posixpath
21 import re
22 from six.moves import cStringIO
23 import subprocess
24 import time
25
26 from trex_stl_lib.trex_stl_client import LoggerApi
27 from trex_stl_lib.trex_stl_client import STLClient
28 from trex_stl_lib.trex_stl_exceptions import STLError
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common import exceptions as y_exceptions
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
34 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.ssh import AutoConnectSSH
42
43
# DPDK release name string
DPDK_VERSION = "dpdk-16.07"

# module-level logger named after this module
LOG = logging.getLogger(__name__)


# directory under which generated config/script files are placed (see upload_config_file)
REMOTE_TMP = "/tmp"
# fallback returned by ScenarioHelper.timeout when the scenario sets no 'timeout' option
# (presumably seconds -- TODO confirm against the SSH helper that consumes it)
DEFAULT_VNF_TIMEOUT = 3600
# timeout handed to Process.join() before the VNF process is terminated
PROCESS_JOIN_TIMEOUT = 3
52
53
class VnfSshHelper(AutoConnectSSH):
    """SSH helper that keeps the node description and a binary directory path."""

    def __init__(self, node, bin_path, wait=None):
        """Create the helper from a node description.

        :param node: node description passed to ``args_from_node``
        :param bin_path: path stored on the helper as ``bin_path``
        :param wait: optional value used for the ``wait`` connection argument
                     when the node arguments do not already define it
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this class object."""
        # must return static class name, anything else refers to the calling class
        # i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        """Return a new helper built from the same node and binary path."""
        # this copy constructor is different from SSH classes, since it uses node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` as a file named ``prefix`` under REMOTE_TMP.

        :param prefix: file name joined onto REMOTE_TMP
        :param content: text written to the target file
        :return: the full path the content was written to
        """
        target_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), target_path)
        return target_path

    def join_bin_path(self, *args):
        """Join the given path components onto ``bin_path``."""
        components = (self.bin_path,) + args
        return os.path.join(*components)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Delegate to the parent ``provision_tool``, defaulting the path to ``bin_path``."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
89
90
class SetupEnvHelper(object):
    """Base class for helpers that prepare the environment a VNF runs in.

    ``build_config`` and ``tear_down`` must be provided by subclasses;
    ``setup_vnf_environment`` and ``kill_vnf`` default to doing nothing.
    """

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the three collaborating helpers on the instance."""
        super(SetupEnvHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def build_config(self):
        """Build the VNF configuration; not implemented in the base class."""
        raise NotImplementedError()

    def setup_vnf_environment(self):
        """Prepare the VNF environment; the base implementation does nothing."""
        return None

    def kill_vnf(self):
        """Stop a running VNF; the base implementation does nothing."""
        return None

    def tear_down(self):
        """Undo the environment setup; not implemented in the base class."""
        raise NotImplementedError()
116
117
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based VNFs.

    Sets up hugepages and the igb_uio driver over SSH, binds the VNFD
    interfaces to DPDK, generates and uploads the pipeline config/script and
    builds the command line used to launch the VNF.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace 'pkt_type = ipv4' with the packet type from ``traffic_options``.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: mapping providing the 'pkt_type' entry
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Rewrite the traffic/packet type entries of the pipeline config.

        :param ip_pipeline_cfg: pipeline configuration text
        :param traffic_options: mapping providing 'traffic_type' and 'vnf_type'
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # compare by value; identity comparison of strings only works by
        # accident of interning
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper for ``ssh_helper``."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Write the number of hugepages for the detected hugepage size."""
        cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
        hugepages = self.ssh_helper.execute(cmd)[1].rstrip()

        memory_path = \
            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
        # the output of this read is not used
        self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)

        if hugepages == "2048kB":
            pages = 8192
        else:
            pages = 16

        self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))

    def build_config(self):
        """Generate and upload the VNF config and script.

        :return: PIPELINE_COMMAND formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        # the generated config is read back from the local CFG_CONFIG path
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()

        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)

        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Populate ``self.pipeline_kwargs`` used to format PIPELINE_COMMAND."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def setup_vnf_environment(self):
        """Set up DPDK, bind the interfaces and return the resource profile."""
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill any process named APP_NAME on the remote node."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run

        :raises DPDKSetupDriverError: when igb_uio is not listed by lsmod
        """
        self._setup_hugepages()
        self.ssh_helper.execute('sudo modprobe uio && sudo modprobe igb_uio')
        exit_status = self.ssh_helper.execute('lsmod | grep -i igb_uio')[0]
        if exit_status:
            raise y_exceptions.DPDKSetupDriverError()

    def get_collectd_options(self):
        """Return the collectd options, node specific values taking precedence.

        A copy is returned so that the scenario configuration itself is not
        modified by the merge.
        """
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Select the socket and build the ResourceProfile for this VNF."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=collectd_options.get("interval"),
                               timeout=self.scenario_helper.timeout)

    def _detect_and_bind_drivers(self):
        """Bind the VNFD interfaces to igb_uio and record their DPDK port numbers.

        The port number of an interface is its position in the sorted list of
        DPDK bound PCI addresses. Addresses that match no interface are
        skipped (best effort), but the reason is now logged.
        """
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
                # force to int
                intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except StopIteration:
                LOG.debug("No interface found for DPDK bound PCI address %s", vpci)
            except Exception:  # pylint: disable=broad-except
                # still best effort, but no longer hides KeyboardInterrupt/SystemExit
                LOG.debug("Unable to set dpdk_port_num for PCI address %s", vpci,
                          exc_info=True)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the output of FIND_NET_CMD for ``vpci``, or None on failure."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the drivers through the DPDK bind helper."""
        self.dpdk_bind_helper.rebind_drivers()
313
314
class ResourceHelper(object):
    """Wrap the resource object returned by the setup helper and collect its KPIs."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Keep the setup helper and its SSH helper; no resource exists yet."""
        super(ResourceHelper, self).__init__()
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        self.resource = None

    def setup(self):
        """Set up the VNF environment and keep the returned resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Do nothing in the base implementation."""
        return None

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; ``kpis`` is empty unless collectd reports status 0."""
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if agent_status == 0:
            nfvi_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            nfvi_kpi = {}
        return {"core": nfvi_kpi}

    def start_collect(self):
        """Initiate the system agent, start the resource and the AMQP KPI process."""
        self.resource.initiate_systemagent(self.ssh_helper.bin_path)
        self.resource.start()
        self.resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource if one is set."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
355
356
class ClientResourceHelper(ResourceHelper):
    """Resource helper driving a TRex stateless client (traffic generator side)."""

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Store the helpers and create the shared state used by ``run_traffic``."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers from the VNFD port pairs."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        """Return the port number of ``intf`` as given by the VNFD helper."""
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or an empty dict on STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the latest client statistics.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: dict mapping interface name to its sample dict; empty when
                 the statistics are not a mapping (the helper is then marked
                 as terminated)
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # validate the statistics before reading from them
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                # NOTE(review): the *_mbps entries carry the 'rx_bps'/'tx_bps'
                # values unchanged
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and put the resulting samples on the queue."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic until ``terminate`` is requested.

        :raises STLError: when the client fails and no termination was requested
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Request the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics for ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start the client on ``ports`` (all ports by default)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge at most one queued sample set into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (a new STLClient by default) with retries.

        The client is returned even when every attempt failed, as before; the
        failure is now logged so it is not silent.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        else:
            # the loop finished without a successful connect()
            LOG.error("Unable to connect to Trex Server, giving up")
        return client
496
497
class Rfc2544ResourceHelper(object):
    """Lazy accessor for the 'rfc2544' section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Keep the scenario helper; every option is resolved on first access."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        self._rfc2544 = None
        self._correlated_traffic = None
        self._latency = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The 'rfc2544' options mapping, read once from the scenario options."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The 'correlated_traffic' option, DEFAULT_CORRELATED_TRAFFIC when unset."""
        if self._correlated_traffic is None:
            option = self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
            self._correlated_traffic = option
        return self._correlated_traffic

    @property
    def latency(self):
        """The 'latency' option, DEFAULT_LATENCY when unset."""
        if self._latency is None:
            option = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
            self._latency = option
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` from the 'rfc2544' section, or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' ("low - high" or a single value) into both bounds."""
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        # a single value is used for both bounds
        if len(bounds) > 1:
            self._tolerance_high = bounds[1]
        else:
            self._tolerance_high = self.tolerance_low
554
555
class SampleVNFDeployHelper(object):
    """Clone, upload and build the samplevnf repository when a VNF binary is missing."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the VNFD and SSH helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` unless ``which`` already finds it.

        :param app_name: name of the VNF binary, looked up under the SSH
                         helper's binary path
        """
        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        which_status = ssh.execute("which %s" % vnf_bin)[0]
        if not which_status:
            # binary already present, nothing to deploy
            return

        # refresh the local clone, then replace the remote copy with it
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        # copy the built binary into the binary path
        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_binary, vnf_bin))
588
589
class ScenarioHelper(object):
    """Read-only view on the scenario configuration for one named node."""

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the node name; ``scenario_cfg`` is assigned later by the owner."""
        self.scenario_cfg = None
        self.name = name

    @property
    def task_path(self):
        """The mandatory 'task_path' entry of the scenario configuration."""
        return self.scenario_cfg['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry of the scenario configuration, None when absent."""
        return self.scenario_cfg.get('nodes')

    @property
    def all_options(self):
        """The whole 'options' section, an empty dict when absent."""
        return self.scenario_cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this helper's name, an empty dict when absent."""
        every_option = self.all_options
        return every_option.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The node's 'vnf_config' option, DEFAULT_VNF_CFG when absent."""
        node_options = self.options
        return node_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The mandatory 'topology' entry of the scenario configuration."""
        return self.scenario_cfg['topology']

    @property
    def timeout(self):
        """The node's 'timeout' option, DEFAULT_VNF_TIMEOUT when absent."""
        node_options = self.options
        return node_options.get('timeout', DEFAULT_VNF_TIMEOUT)
630
631
632 class SampleVNF(GenericVNF):
633     """ Class providing file-like API for generic VNF implementation """
634
635     VNF_PROMPT = "pipeline>"
636     WAIT_TIME = 1
637     WAIT_TIME_FOR_SCRIPT = 10
638     APP_NAME = "SampleVNF"
639     # we run the VNF interactively, so the ssh command will timeout after this long
640
641     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
642         super(SampleVNF, self).__init__(name, vnfd)
643         self.bin_path = get_nsb_option('bin_path', '')
644
645         self.scenario_helper = ScenarioHelper(self.name)
646         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
647
648         if setup_env_helper_type is None:
649             setup_env_helper_type = SetupEnvHelper
650
651         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
652                                                   self.ssh_helper,
653                                                   self.scenario_helper)
654
655         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
656
657         if resource_helper_type is None:
658             resource_helper_type = ResourceHelper
659
660         self.resource_helper = resource_helper_type(self.setup_helper)
661
662         self.context_cfg = None
663         self.nfvi_context = None
664         self.pipeline_kwargs = {}
665         self.uplink_ports = None
666         self.downlink_ports = None
667         # NOTE(esm): make QueueFileWrapper invert-able so that we
668         #            never have to manage the queues
669         self.q_in = Queue()
670         self.q_out = Queue()
671         self.queue_wrapper = None
672         self.run_kwargs = {}
673         self.used_drivers = {}
674         self.vnf_port_pairs = None
675         self._vnf_process = None
676
677     def _build_ports(self):
678         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
679         self.networks = self._port_pairs.networks
680         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
681         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
682         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
683
684     def _get_route_data(self, route_index, route_type):
685         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
686         for _ in range(route_index):
687             next(route_iter, '')
688         return next(route_iter, {}).get(route_type, '')
689
690     def _get_port0localip6(self):
691         return_value = self._get_route_data(0, 'network')
692         LOG.info("_get_port0localip6 : %s", return_value)
693         return return_value
694
695     def _get_port1localip6(self):
696         return_value = self._get_route_data(1, 'network')
697         LOG.info("_get_port1localip6 : %s", return_value)
698         return return_value
699
700     def _get_port0prefixlen6(self):
701         return_value = self._get_route_data(0, 'netmask')
702         LOG.info("_get_port0prefixlen6 : %s", return_value)
703         return return_value
704
705     def _get_port1prefixlen6(self):
706         return_value = self._get_route_data(1, 'netmask')
707         LOG.info("_get_port1prefixlen6 : %s", return_value)
708         return return_value
709
710     def _get_port0gateway6(self):
711         return_value = self._get_route_data(0, 'network')
712         LOG.info("_get_port0gateway6 : %s", return_value)
713         return return_value
714
715     def _get_port1gateway6(self):
716         return_value = self._get_route_data(1, 'network')
717         LOG.info("_get_port1gateway6 : %s", return_value)
718         return return_value
719
720     def _start_vnf(self):
721         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
722         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
723         self._vnf_process = Process(name=name, target=self._run)
724         self._vnf_process.start()
725
726     def _vnf_up_post(self):
727         pass
728
729     def instantiate(self, scenario_cfg, context_cfg):
730         self.scenario_helper.scenario_cfg = scenario_cfg
731         self.context_cfg = context_cfg
732         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
733         # self.nfvi_context = None
734
735         # vnf deploy is unsupported, use ansible playbooks
736         if self.scenario_helper.options.get("vnf_deploy", False):
737             self.deploy_helper.deploy_vnfs(self.APP_NAME)
738         self.resource_helper.setup()
739         self._start_vnf()
740
    def wait_for_instantiate(self):
        """Block until the VNF prompt shows up in the VNF output.

        :return: the exit code of the VNF process at the time the prompt was seen
        :raises RuntimeError: when the VNF process is no longer alive, or when
                              "PANIC" appears in the collected output
        """
        # output chunks collected so far; the prompt may be split across chunks
        buf = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # NOTE(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                buf.append(self.q_out.get())
                # re-check the whole accumulated output after every chunk
                message = ''.join(buf)
                if self.VNF_PROMPT in message:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    self.resource_helper.start_collect()
                    return self._vnf_process.exitcode

                if "PANIC" in message:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(self.WAIT_TIME_FOR_SCRIPT)
            # Send ENTER to display a new prompt in case the prompt text was corrupted
            # by other VNF output
            self.q_in.put('\r\n')
768
769     def _build_run_kwargs(self):
770         self.run_kwargs = {
771             'stdin': self.queue_wrapper,
772             'stdout': self.queue_wrapper,
773             'keep_stdin_open': True,
774             'pty': True,
775             'timeout': self.scenario_helper.timeout,
776         }
777
778     def _build_config(self):
779         return self.setup_helper.build_config()
780
781     def _run(self):
782         # we can't share ssh paramiko objects to force new connection
783         self.ssh_helper.drop_connection()
784         cmd = self._build_config()
785         # kill before starting
786         self.setup_helper.kill_vnf()
787
788         LOG.debug(cmd)
789         self._build_run_kwargs()
790         self.ssh_helper.run(cmd, **self.run_kwargs)
791
792     def vnf_execute(self, cmd, wait_time=2):
793         """ send cmd to vnf process """
794
795         LOG.info("%s command: %s", self.APP_NAME, cmd)
796         self.q_in.put("{}\r\n".format(cmd))
797         time.sleep(wait_time)
798         output = []
799         while self.q_out.qsize() > 0:
800             output.append(self.q_out.get())
801         return "".join(output)
802
803     def _tear_down(self):
804         pass
805
806     def terminate(self):
807         self.vnf_execute("quit")
808         self.setup_helper.kill_vnf()
809         self._tear_down()
810         self.resource_helper.stop_collect()
811         if self._vnf_process is not None:
812             # be proper and join first before we kill
813             LOG.debug("joining before terminate %s", self._vnf_process.name)
814             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
815             self._vnf_process.terminate()
816         # no terminate children here because we share processes with tg
817
818     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
819         """Method for checking the statistics
820
821         This method could be overridden in children classes.
822
823         :return: VNF statistics
824         """
825         cmd = 'p {0} stats'.format(self.APP_WORD)
826         out = self.vnf_execute(cmd)
827         return out
828
829     def collect_kpi(self):
830         # we can't get KPIs if the VNF is down
831         check_if_process_failed(self._vnf_process)
832         stats = self.get_stats()
833         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
834         if m:
835             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
836             result["collect_stats"] = self.resource_helper.collect_kpi()
837         else:
838             result = {
839                 "packets_in": 0,
840                 "packets_fwd": 0,
841                 "packets_dropped": 0,
842             }
843         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
844         return result
845
846     def scale(self, flavor=""):
847         """The SampleVNF base class doesn't provide the 'scale' feature"""
848         raise y_exceptions.FunctionNotImplemented(
849             function_name='scale', class_name='SampleVNFTrafficGen')
850
851
852 class SampleVNFTrafficGen(GenericTrafficGen):
853     """ Class providing file-like API for generic traffic generator """
854
855     APP_NAME = 'Sample'
856     RUN_WAIT = 1
857
858     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
859         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
860         self.bin_path = get_nsb_option('bin_path', '')
861
862         self.scenario_helper = ScenarioHelper(self.name)
863         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
864
865         if setup_env_helper_type is None:
866             setup_env_helper_type = SetupEnvHelper
867
868         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
869                                                   self.ssh_helper,
870                                                   self.scenario_helper)
871
872         if resource_helper_type is None:
873             resource_helper_type = ClientResourceHelper
874
875         self.resource_helper = resource_helper_type(self.setup_helper)
876
877         self.runs_traffic = True
878         self.traffic_finished = False
879         self._tg_process = None
880         self._traffic_process = None
881
882     def _start_server(self):
883         # we can't share ssh paramiko objects to force new connection
884         self.ssh_helper.drop_connection()
885
886     def instantiate(self, scenario_cfg, context_cfg):
887         self.scenario_helper.scenario_cfg = scenario_cfg
888         self.resource_helper.setup()
889         # must generate_cfg after DPDK bind because we need port number
890         self.resource_helper.generate_cfg()
891
892         LOG.info("Starting %s server...", self.APP_NAME)
893         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
894         self._tg_process = Process(name=name, target=self._start_server)
895         self._tg_process.start()
896
897     def _check_status(self):
898         raise NotImplementedError
899
900     def _wait_for_process(self):
901         while True:
902             if not self._tg_process.is_alive():
903                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
904             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
905             time.sleep(1)
906             status = self._check_status()
907             if status == 0:
908                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
909                 return self._tg_process.exitcode
910
911     def _traffic_runner(self, traffic_profile):
912         # always drop connections first thing in new processes
913         # so we don't get paramiko errors
914         self.ssh_helper.drop_connection()
915         LOG.info("Starting %s client...", self.APP_NAME)
916         self.resource_helper.run_traffic(traffic_profile)
917
918     def run_traffic(self, traffic_profile):
919         """ Generate traffic on the wire according to the given params.
920         Method is non-blocking, returns immediately when traffic process
921         is running. Mandatory.
922
923         :param traffic_profile:
924         :return: True/False
925         """
926         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
927                                     os.getpid())
928         self._traffic_process = Process(name=name, target=self._traffic_runner,
929                                         args=(traffic_profile,))
930         self._traffic_process.start()
931         # Wait for traffic process to start
932         while self.resource_helper.client_started.value == 0:
933             time.sleep(self.RUN_WAIT)
934             # what if traffic process takes a few seconds to start?
935             if not self._traffic_process.is_alive():
936                 break
937
938         return self._traffic_process.is_alive()
939
940     def collect_kpi(self):
941         # check if the tg processes have exited
942         for proc in (self._tg_process, self._traffic_process):
943             check_if_process_failed(proc)
944         result = self.resource_helper.collect_kpi()
945         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
946         return result
947
948     def terminate(self):
949         """ After this method finishes, all traffic processes should stop. Mandatory.
950
951         :return: True/False
952         """
953         self.traffic_finished = True
954         # we must kill client before we kill the server, or the client will raise exception
955         if self._traffic_process is not None:
956             # be proper and try to join before terminating
957             LOG.debug("joining before terminate %s", self._traffic_process.name)
958             self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
959             self._traffic_process.terminate()
960         if self._tg_process is not None:
961             # be proper and try to join before terminating
962             LOG.debug("joining before terminate %s", self._tg_process.name)
963             self._tg_process.join(PROCESS_JOIN_TIMEOUT)
964             self._tg_process.terminate()
965         # no terminate children here because we share processes with vnf
966
967     def scale(self, flavor=""):
968         """A traffic generator VFN doesn't provide the 'scale' feature"""
969         raise y_exceptions.FunctionNotImplemented(
970             function_name='scale', class_name='SampleVNFTrafficGen')