Merge "Refactor userguide "Yardstick Installation""
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25 from multiprocessing import Queue, Value, Process
26
27 from six.moves import cStringIO
28
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common.process import check_if_process_failed
32 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
33 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
34 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
35 from yardstick.network_services.nfvi.resource import ResourceProfile
36 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
37 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
38 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
39 from yardstick.network_services.utils import get_nsb_option
40
41 from trex_stl_lib.trex_stl_client import STLClient
42 from trex_stl_lib.trex_stl_client import LoggerApi
43 from trex_stl_lib.trex_stl_exceptions import STLError
44
45 from yardstick.ssh import AutoConnectSSH
46
47 DPDK_VERSION = "dpdk-16.07"
48
49 LOG = logging.getLogger(__name__)
50
51
52 REMOTE_TMP = "/tmp"
53 DEFAULT_VNF_TIMEOUT = 3600
54 PROCESS_JOIN_TIMEOUT = 3
55
56
57 class VnfSshHelper(AutoConnectSSH):
58
59     def __init__(self, node, bin_path, wait=None):
60         self.node = node
61         kwargs = self.args_from_node(self.node)
62         if wait:
63             kwargs.setdefault('wait', wait)
64
65         super(VnfSshHelper, self).__init__(**kwargs)
66         self.bin_path = bin_path
67
68     @staticmethod
69     def get_class():
70         # must return static class name, anything else refers to the calling class
71         # i.e. the subclass, not the superclass
72         return VnfSshHelper
73
74     def copy(self):
75         # this copy constructor is different from SSH classes, since it uses node
76         return self.get_class()(self.node, self.bin_path)
77
78     def upload_config_file(self, prefix, content):
79         cfg_file = os.path.join(REMOTE_TMP, prefix)
80         LOG.debug(content)
81         file_obj = cStringIO(content)
82         self.put_file_obj(file_obj, cfg_file)
83         return cfg_file
84
85     def join_bin_path(self, *args):
86         return os.path.join(self.bin_path, *args)
87
88     def provision_tool(self, tool_path=None, tool_file=None):
89         if tool_path is None:
90             tool_path = self.bin_path
91         return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
92
93
94 class SetupEnvHelper(object):
95
96     CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
97     CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
98     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
99     PIPELINE_COMMAND = ''
100     VNF_TYPE = "SAMPLE"
101
102     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
103         super(SetupEnvHelper, self).__init__()
104         self.vnfd_helper = vnfd_helper
105         self.ssh_helper = ssh_helper
106         self.scenario_helper = scenario_helper
107
108     def build_config(self):
109         raise NotImplementedError
110
111     def setup_vnf_environment(self):
112         pass
113
114     def kill_vnf(self):
115         pass
116
117     def tear_down(self):
118         raise NotImplementedError
119
120
121 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
122
123     APP_NAME = 'DpdkVnf'
124     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
125
126     @staticmethod
127     def _update_packet_type(ip_pipeline_cfg, traffic_options):
128         match_str = 'pkt_type = ipv4'
129         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
130         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
131         return pipeline_config_str
132
133     @classmethod
134     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
135         traffic_type = traffic_options['traffic_type']
136
137         if traffic_options['vnf_type'] is not cls.APP_NAME:
138             match_str = 'traffic_type = 4'
139             replace_str = 'traffic_type = {0}'.format(traffic_type)
140
141         elif traffic_type == 4:
142             match_str = 'pkt_type = ipv4'
143             replace_str = 'pkt_type = ipv4'
144
145         else:
146             match_str = 'pkt_type = ipv4'
147             replace_str = 'pkt_type = ipv6'
148
149         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
150         return pipeline_config_str
151
152     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
153         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
154         self.all_ports = None
155         self.bound_pci = None
156         self.socket = None
157         self.used_drivers = None
158         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
159
160     def _setup_hugepages(self):
161         cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
162         hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
163
164         memory_path = \
165             '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
166         self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
167
168         if hugepages == "2048kB":
169             pages = 8192
170         else:
171             pages = 16
172
173         self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
174
175     def build_config(self):
176         vnf_cfg = self.scenario_helper.vnf_cfg
177         task_path = self.scenario_helper.task_path
178
179         lb_count = vnf_cfg.get('lb_count', 3)
180         lb_config = vnf_cfg.get('lb_config', 'SW')
181         worker_config = vnf_cfg.get('worker_config', '1C/1T')
182         worker_threads = vnf_cfg.get('worker_threads', 3)
183
184         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
185         traffic_options = {
186             'traffic_type': traffic_type,
187             'pkt_type': 'ipv%s' % traffic_type,
188             'vnf_type': self.VNF_TYPE,
189         }
190
191         config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
192         config_basename = posixpath.basename(self.CFG_CONFIG)
193         script_basename = posixpath.basename(self.CFG_SCRIPT)
194         multiport = MultiPortConfig(self.scenario_helper.topology,
195                                     config_tpl_cfg,
196                                     config_basename,
197                                     self.vnfd_helper,
198                                     self.VNF_TYPE,
199                                     lb_count,
200                                     worker_threads,
201                                     worker_config,
202                                     lb_config,
203                                     self.socket)
204
205         multiport.generate_config()
206         with open(self.CFG_CONFIG) as handle:
207             new_config = handle.read()
208
209         new_config = self._update_traffic_type(new_config, traffic_options)
210         new_config = self._update_packet_type(new_config, traffic_options)
211
212         self.ssh_helper.upload_config_file(config_basename, new_config)
213         self.ssh_helper.upload_config_file(script_basename,
214                                            multiport.generate_script(self.vnfd_helper))
215
216         LOG.info("Provision and start the %s", self.APP_NAME)
217         self._build_pipeline_kwargs()
218         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
219
220     def _build_pipeline_kwargs(self):
221         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
222         # count the number of actual ports in the list of pairs
223         # remove duplicate ports
224         # this is really a mapping from LINK ID to DPDK PMD ID
225         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
226         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
227         ports = self.vnfd_helper.port_pairs.all_ports
228         port_nums = self.vnfd_helper.port_nums(ports)
229         # create mask from all the dpdk port numbers
230         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
231         self.pipeline_kwargs = {
232             'cfg_file': self.CFG_CONFIG,
233             'script': self.CFG_SCRIPT,
234             'port_mask_hex': ports_mask_hex,
235             'tool_path': tool_path,
236         }
237
238     def setup_vnf_environment(self):
239         self._setup_dpdk()
240         self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
241         self.kill_vnf()
242         # bind before _setup_resources so we can use dpdk_port_num
243         self._detect_and_bind_drivers()
244         resource = self._setup_resources()
245         return resource
246
247     def kill_vnf(self):
248         # pkill is not matching, debug with pgrep
249         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
250         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
251         # have to use exact match
252         # try using killall to match
253         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
254
255     def _setup_dpdk(self):
256         """ setup dpdk environment needed for vnf to run """
257
258         self._setup_hugepages()
259         self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
260
261         exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
262         if exit_status == 0:
263             return
264
265         dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
266         dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
267         exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
268         if exit_status != 0:
269             self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
270
271     def get_collectd_options(self):
272         options = self.scenario_helper.all_options.get("collectd", {})
273         # override with specific node settings
274         options.update(self.scenario_helper.options.get("collectd", {}))
275         return options
276
277     def _setup_resources(self):
278         # what is this magic?  how do we know which socket is for which port?
279         # what about quad-socket?
280         if any(v[5] == "0" for v in self.bound_pci):
281             self.socket = 0
282         else:
283             self.socket = 1
284
285         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
286         # this won't work because we don't have DPDK port numbers yet
287         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
288         port_names = (intf["name"] for intf in ports)
289         collectd_options = self.get_collectd_options()
290         plugins = collectd_options.get("plugins", {})
291         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
292         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
293                                plugins=plugins, interval=collectd_options.get("interval"),
294                                timeout=self.scenario_helper.timeout)
295
296     def _detect_and_bind_drivers(self):
297         interfaces = self.vnfd_helper.interfaces
298
299         self.dpdk_bind_helper.read_status()
300         self.dpdk_bind_helper.save_used_drivers()
301
302         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
303
304         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
305         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
306             try:
307                 intf = next(v for v in interfaces
308                             if vpci == v['virtual-interface']['vpci'])
309                 # force to int
310                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
311             except:
312                 pass
313         time.sleep(2)
314
315     def get_local_iface_name_by_vpci(self, vpci):
316         find_net_cmd = self.FIND_NET_CMD.format(vpci)
317         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
318         if exit_status == 0:
319             return stdout
320         return None
321
322     def tear_down(self):
323         self.dpdk_bind_helper.rebind_drivers()
324
325
326 class ResourceHelper(object):
327
328     COLLECT_KPI = ''
329     MAKE_INSTALL = 'cd {0} && make && sudo make install'
330     RESOURCE_WORD = 'sample'
331
332     COLLECT_MAP = {}
333
334     def __init__(self, setup_helper):
335         super(ResourceHelper, self).__init__()
336         self.resource = None
337         self.setup_helper = setup_helper
338         self.ssh_helper = setup_helper.ssh_helper
339
340     def setup(self):
341         self.resource = self.setup_helper.setup_vnf_environment()
342
343     def generate_cfg(self):
344         pass
345
346     def _collect_resource_kpi(self):
347         result = {}
348         status = self.resource.check_if_sa_running("collectd")[0]
349         if status == 0:
350             result = self.resource.amqp_collect_nfvi_kpi()
351
352         result = {"core": result}
353         return result
354
355     def start_collect(self):
356         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
357         self.resource.start()
358         self.resource.amqp_process_for_nfvi_kpi()
359
360     def stop_collect(self):
361         if self.resource:
362             self.resource.stop()
363
364     def collect_kpi(self):
365         return self._collect_resource_kpi()
366
367
368 class ClientResourceHelper(ResourceHelper):
369
370     RUN_DURATION = 60
371     QUEUE_WAIT_TIME = 5
372     SYNC_PORT = 1
373     ASYNC_PORT = 2
374
375     def __init__(self, setup_helper):
376         super(ClientResourceHelper, self).__init__(setup_helper)
377         self.vnfd_helper = setup_helper.vnfd_helper
378         self.scenario_helper = setup_helper.scenario_helper
379
380         self.client = None
381         self.client_started = Value('i', 0)
382         self.all_ports = None
383         self._queue = Queue()
384         self._result = {}
385         self._terminated = Value('i', 0)
386
387     def _build_ports(self):
388         self.networks = self.vnfd_helper.port_pairs.networks
389         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
390         self.downlink_ports = \
391             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
392         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
393
394     def port_num(self, intf):
395         # by default return port num
396         return self.vnfd_helper.port_num(intf)
397
398     def get_stats(self, *args, **kwargs):
399         try:
400             return self.client.get_stats(*args, **kwargs)
401         except STLError:
402             LOG.exception("TRex client not connected")
403             return {}
404
405     def generate_samples(self, ports, key=None, default=None):
406         # needs to be used ports
407         last_result = self.get_stats(ports)
408         key_value = last_result.get(key, default)
409
410         if not isinstance(last_result, Mapping):  # added for mock unit test
411             self._terminated.value = 1
412             return {}
413
414         samples = {}
415         # recalculate port for interface and see if it matches ports provided
416         for intf in self.vnfd_helper.interfaces:
417             name = intf["name"]
418             port = self.vnfd_helper.port_num(name)
419             if port in ports:
420                 xe_value = last_result.get(port, {})
421                 samples[name] = {
422                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
423                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
424                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
425                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
426                     "in_packets": int(xe_value.get("ipackets", 0)),
427                     "out_packets": int(xe_value.get("opackets", 0)),
428                 }
429                 if key:
430                     samples[name][key] = key_value
431         return samples
432
433     def _run_traffic_once(self, traffic_profile):
434         traffic_profile.execute_traffic(self)
435         self.client_started.value = 1
436         time.sleep(self.RUN_DURATION)
437         samples = self.generate_samples(traffic_profile.ports)
438         time.sleep(self.QUEUE_WAIT_TIME)
439         self._queue.put(samples)
440
441     def run_traffic(self, traffic_profile):
442         # if we don't do this we can hang waiting for the queue to drain
443         # have to do this in the subprocess
444         self._queue.cancel_join_thread()
445         # fixme: fix passing correct trex config file,
446         # instead of searching the default path
447         try:
448             self._build_ports()
449             self.client = self._connect()
450             self.client.reset(ports=self.all_ports)
451             self.client.remove_all_streams(self.all_ports)  # remove all streams
452             traffic_profile.register_generator(self)
453
454             while self._terminated.value == 0:
455                 self._run_traffic_once(traffic_profile)
456
457             self.client.stop(self.all_ports)
458             self.client.disconnect()
459             self._terminated.value = 0
460         except STLError:
461             if self._terminated.value:
462                 LOG.debug("traffic generator is stopped")
463                 return  # return if trex/tg server is stopped.
464             raise
465
466     def terminate(self):
467         self._terminated.value = 1  # stop client
468
469     def clear_stats(self, ports=None):
470         if ports is None:
471             ports = self.all_ports
472         self.client.clear_stats(ports=ports)
473
474     def start(self, ports=None, *args, **kwargs):
475         if ports is None:
476             ports = self.all_ports
477         self.client.start(ports=ports, *args, **kwargs)
478
479     def collect_kpi(self):
480         if not self._queue.empty():
481             kpi = self._queue.get()
482             self._result.update(kpi)
483             LOG.debug("Got KPIs from _queue for {0} {1}".format(
484                 self.scenario_helper.name, self.RESOURCE_WORD))
485         return self._result
486
487     def _connect(self, client=None):
488         if client is None:
489             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
490                                server=self.vnfd_helper.mgmt_interface["ip"],
491                                verbose_level=LoggerApi.VERBOSE_QUIET)
492
493         # try to connect with 5s intervals, 30s max
494         for idx in range(6):
495             try:
496                 client.connect()
497                 break
498             except STLError:
499                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
500                 time.sleep(5)
501         return client
502
503
504 class Rfc2544ResourceHelper(object):
505
506     DEFAULT_CORRELATED_TRAFFIC = False
507     DEFAULT_LATENCY = False
508     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
509
510     def __init__(self, scenario_helper):
511         super(Rfc2544ResourceHelper, self).__init__()
512         self.scenario_helper = scenario_helper
513         self._correlated_traffic = None
514         self.iteration = Value('i', 0)
515         self._latency = None
516         self._rfc2544 = None
517         self._tolerance_low = None
518         self._tolerance_high = None
519
520     @property
521     def rfc2544(self):
522         if self._rfc2544 is None:
523             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
524         return self._rfc2544
525
526     @property
527     def tolerance_low(self):
528         if self._tolerance_low is None:
529             self.get_rfc_tolerance()
530         return self._tolerance_low
531
532     @property
533     def tolerance_high(self):
534         if self._tolerance_high is None:
535             self.get_rfc_tolerance()
536         return self._tolerance_high
537
538     @property
539     def correlated_traffic(self):
540         if self._correlated_traffic is None:
541             self._correlated_traffic = \
542                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
543
544         return self._correlated_traffic
545
546     @property
547     def latency(self):
548         if self._latency is None:
549             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
550         return self._latency
551
552     def get_rfc2544(self, name, default=None):
553         return self.rfc2544.get(name, default)
554
555     def get_rfc_tolerance(self):
556         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
557         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
558         self._tolerance_low = next(tolerance_iter)
559         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
560
561
562 class SampleVNFDeployHelper(object):
563
564     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
565     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
566     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
567
568     def __init__(self, vnfd_helper, ssh_helper):
569         super(SampleVNFDeployHelper, self).__init__()
570         self.ssh_helper = ssh_helper
571         self.vnfd_helper = vnfd_helper
572
573     def deploy_vnfs(self, app_name):
574         vnf_bin = self.ssh_helper.join_bin_path(app_name)
575         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
576         if not exit_status:
577             return
578
579         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
580         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
581         time.sleep(2)
582         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
583         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
584
585         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
586         time.sleep(2)
587         http_proxy = os.environ.get('http_proxy', '')
588         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
589         LOG.debug(cmd)
590         self.ssh_helper.execute(cmd)
591         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
592         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
593         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
594
595
596 class ScenarioHelper(object):
597
598     DEFAULT_VNF_CFG = {
599         'lb_config': 'SW',
600         'lb_count': 1,
601         'worker_config': '1C/1T',
602         'worker_threads': 1,
603     }
604
605     def __init__(self, name):
606         self.name = name
607         self.scenario_cfg = None
608
609     @property
610     def task_path(self):
611         return self.scenario_cfg['task_path']
612
613     @property
614     def nodes(self):
615         return self.scenario_cfg.get('nodes')
616
617     @property
618     def all_options(self):
619         return self.scenario_cfg.get('options', {})
620
621     @property
622     def options(self):
623         return self.all_options.get(self.name, {})
624
625     @property
626     def vnf_cfg(self):
627         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
628
629     @property
630     def topology(self):
631         return self.scenario_cfg['topology']
632
633     @property
634     def timeout(self):
635         return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
636
637
638 class SampleVNF(GenericVNF):
639     """ Class providing file-like API for generic VNF implementation """
640
641     VNF_PROMPT = "pipeline>"
642     WAIT_TIME = 1
643     WAIT_TIME_FOR_SCRIPT = 10
644     APP_NAME = "SampleVNF"
645     # we run the VNF interactively, so the ssh command will timeout after this long
646
647     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
648         super(SampleVNF, self).__init__(name, vnfd)
649         self.bin_path = get_nsb_option('bin_path', '')
650
651         self.scenario_helper = ScenarioHelper(self.name)
652         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
653
654         if setup_env_helper_type is None:
655             setup_env_helper_type = SetupEnvHelper
656
657         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
658                                                   self.ssh_helper,
659                                                   self.scenario_helper)
660
661         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
662
663         if resource_helper_type is None:
664             resource_helper_type = ResourceHelper
665
666         self.resource_helper = resource_helper_type(self.setup_helper)
667
668         self.context_cfg = None
669         self.nfvi_context = None
670         self.pipeline_kwargs = {}
671         self.uplink_ports = None
672         self.downlink_ports = None
673         # TODO(esm): make QueueFileWrapper invert-able so that we
674         #            never have to manage the queues
675         self.q_in = Queue()
676         self.q_out = Queue()
677         self.queue_wrapper = None
678         self.run_kwargs = {}
679         self.used_drivers = {}
680         self.vnf_port_pairs = None
681         self._vnf_process = None
682
683     def _build_ports(self):
684         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
685         self.networks = self._port_pairs.networks
686         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
687         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
688         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
689
690     def _get_route_data(self, route_index, route_type):
691         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
692         for _ in range(route_index):
693             next(route_iter, '')
694         return next(route_iter, {}).get(route_type, '')
695
696     def _get_port0localip6(self):
697         return_value = self._get_route_data(0, 'network')
698         LOG.info("_get_port0localip6 : %s", return_value)
699         return return_value
700
701     def _get_port1localip6(self):
702         return_value = self._get_route_data(1, 'network')
703         LOG.info("_get_port1localip6 : %s", return_value)
704         return return_value
705
706     def _get_port0prefixlen6(self):
707         return_value = self._get_route_data(0, 'netmask')
708         LOG.info("_get_port0prefixlen6 : %s", return_value)
709         return return_value
710
711     def _get_port1prefixlen6(self):
712         return_value = self._get_route_data(1, 'netmask')
713         LOG.info("_get_port1prefixlen6 : %s", return_value)
714         return return_value
715
716     def _get_port0gateway6(self):
717         return_value = self._get_route_data(0, 'network')
718         LOG.info("_get_port0gateway6 : %s", return_value)
719         return return_value
720
721     def _get_port1gateway6(self):
722         return_value = self._get_route_data(1, 'network')
723         LOG.info("_get_port1gateway6 : %s", return_value)
724         return return_value
725
726     def _start_vnf(self):
727         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
728         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
729         self._vnf_process = Process(name=name, target=self._run)
730         self._vnf_process.start()
731
732     def _vnf_up_post(self):
733         pass
734
735     def instantiate(self, scenario_cfg, context_cfg):
736         self.scenario_helper.scenario_cfg = scenario_cfg
737         self.context_cfg = context_cfg
738         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
739         # self.nfvi_context = None
740
741         # vnf deploy is unsupported, use ansible playbooks
742         if self.scenario_helper.options.get("vnf_deploy", False):
743             self.deploy_helper.deploy_vnfs(self.APP_NAME)
744         self.resource_helper.setup()
745         self._start_vnf()
746
747     def wait_for_instantiate(self):
748         buf = []
749         time.sleep(self.WAIT_TIME)  # Give some time for config to load
750         while True:
751             if not self._vnf_process.is_alive():
752                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
753
754             # TODO(esm): move to QueueFileWrapper
755             while self.q_out.qsize() > 0:
756                 buf.append(self.q_out.get())
757                 message = ''.join(buf)
758                 if self.VNF_PROMPT in message:
759                     LOG.info("%s VNF is up and running.", self.APP_NAME)
760                     self._vnf_up_post()
761                     self.queue_wrapper.clear()
762                     self.resource_helper.start_collect()
763                     return self._vnf_process.exitcode
764
765                 if "PANIC" in message:
766                     raise RuntimeError("Error starting %s VNF." %
767                                        self.APP_NAME)
768
769             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
770             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
771             # Send ENTER to display a new prompt in case the prompt text was corrupted
772             # by other VNF output
773             self.q_in.put('\r\n')
774
775     def _build_run_kwargs(self):
776         self.run_kwargs = {
777             'stdin': self.queue_wrapper,
778             'stdout': self.queue_wrapper,
779             'keep_stdin_open': True,
780             'pty': True,
781             'timeout': self.scenario_helper.timeout,
782         }
783
784     def _build_config(self):
785         return self.setup_helper.build_config()
786
787     def _run(self):
788         # we can't share ssh paramiko objects to force new connection
789         self.ssh_helper.drop_connection()
790         cmd = self._build_config()
791         # kill before starting
792         self.setup_helper.kill_vnf()
793
794         LOG.debug(cmd)
795         self._build_run_kwargs()
796         self.ssh_helper.run(cmd, **self.run_kwargs)
797
798     def vnf_execute(self, cmd, wait_time=2):
799         """ send cmd to vnf process """
800
801         LOG.info("%s command: %s", self.APP_NAME, cmd)
802         self.q_in.put("{}\r\n".format(cmd))
803         time.sleep(wait_time)
804         output = []
805         while self.q_out.qsize() > 0:
806             output.append(self.q_out.get())
807         return "".join(output)
808
809     def _tear_down(self):
810         pass
811
812     def terminate(self):
813         self.vnf_execute("quit")
814         self.setup_helper.kill_vnf()
815         self._tear_down()
816         self.resource_helper.stop_collect()
817         if self._vnf_process is not None:
818             # be proper and join first before we kill
819             LOG.debug("joining before terminate %s", self._vnf_process.name)
820             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
821             self._vnf_process.terminate()
822         # no terminate children here because we share processes with tg
823
824     def get_stats(self, *args, **kwargs):
825         """
826         Method for checking the statistics
827
828         :return:
829            VNF statistics
830         """
831         cmd = 'p {0} stats'.format(self.APP_WORD)
832         out = self.vnf_execute(cmd)
833         return out
834
835     def collect_kpi(self):
836         # we can't get KPIs if the VNF is down
837         check_if_process_failed(self._vnf_process)
838         stats = self.get_stats()
839         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
840         if m:
841             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
842             result["collect_stats"] = self.resource_helper.collect_kpi()
843         else:
844             result = {
845                 "packets_in": 0,
846                 "packets_fwd": 0,
847                 "packets_dropped": 0,
848             }
849         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
850         return result
851
852
853 class SampleVNFTrafficGen(GenericTrafficGen):
854     """ Class providing file-like API for generic traffic generator """
855
856     APP_NAME = 'Sample'
857     RUN_WAIT = 1
858
859     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
860         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
861         self.bin_path = get_nsb_option('bin_path', '')
862
863         self.scenario_helper = ScenarioHelper(self.name)
864         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
865
866         if setup_env_helper_type is None:
867             setup_env_helper_type = SetupEnvHelper
868
869         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
870                                                   self.ssh_helper,
871                                                   self.scenario_helper)
872
873         if resource_helper_type is None:
874             resource_helper_type = ClientResourceHelper
875
876         self.resource_helper = resource_helper_type(self.setup_helper)
877
878         self.runs_traffic = True
879         self.traffic_finished = False
880         self._tg_process = None
881         self._traffic_process = None
882
883     def _start_server(self):
884         # we can't share ssh paramiko objects to force new connection
885         self.ssh_helper.drop_connection()
886
887     def instantiate(self, scenario_cfg, context_cfg):
888         self.scenario_helper.scenario_cfg = scenario_cfg
889         self.resource_helper.setup()
890         # must generate_cfg after DPDK bind because we need port number
891         self.resource_helper.generate_cfg()
892
893         LOG.info("Starting %s server...", self.APP_NAME)
894         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
895         self._tg_process = Process(name=name, target=self._start_server)
896         self._tg_process.start()
897
898     def _check_status(self):
899         raise NotImplementedError
900
901     def _wait_for_process(self):
902         while True:
903             if not self._tg_process.is_alive():
904                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
905             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
906             time.sleep(1)
907             status = self._check_status()
908             if status == 0:
909                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
910                 return self._tg_process.exitcode
911
912     def _traffic_runner(self, traffic_profile):
913         # always drop connections first thing in new processes
914         # so we don't get paramiko errors
915         self.ssh_helper.drop_connection()
916         LOG.info("Starting %s client...", self.APP_NAME)
917         self.resource_helper.run_traffic(traffic_profile)
918
919     def run_traffic(self, traffic_profile):
920         """ Generate traffic on the wire according to the given params.
921         Method is non-blocking, returns immediately when traffic process
922         is running. Mandatory.
923
924         :param traffic_profile:
925         :return: True/False
926         """
927         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
928                                     os.getpid())
929         self._traffic_process = Process(name=name, target=self._traffic_runner,
930                                         args=(traffic_profile,))
931         self._traffic_process.start()
932         # Wait for traffic process to start
933         while self.resource_helper.client_started.value == 0:
934             time.sleep(self.RUN_WAIT)
935             # what if traffic process takes a few seconds to start?
936             if not self._traffic_process.is_alive():
937                 break
938
939         return self._traffic_process.is_alive()
940
941     def collect_kpi(self):
942         # check if the tg processes have exited
943         for proc in (self._tg_process, self._traffic_process):
944             check_if_process_failed(proc)
945         result = self.resource_helper.collect_kpi()
946         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
947         return result
948
949     def terminate(self):
950         """ After this method finishes, all traffic processes should stop. Mandatory.
951
952         :return: True/False
953         """
954         self.traffic_finished = True
955         # we must kill client before we kill the server, or the client will raise exception
956         if self._traffic_process is not None:
957             # be proper and try to join before terminating
958             LOG.debug("joining before terminate %s", self._traffic_process.name)
959             self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
960             self._traffic_process.terminate()
961         if self._tg_process is not None:
962             # be proper and try to join before terminating
963             LOG.debug("joining before terminate %s", self._tg_process.name)
964             self._tg_process.join(PROCESS_JOIN_TIMEOUT)
965             self._tg_process.terminate()
966         # no terminate children here because we share processes with vnf