Merge changes from topics 'feat/keep_vnf', 'YARDSTICK-886'
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19 import os
20 import posixpath
21 import re
22 import subprocess
23 import time
24
25 import six
26 from six.moves import cStringIO
27
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.common import exceptions as y_exceptions
33 from yardstick.common.process import check_if_process_failed
34 from yardstick.common import utils
35 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
36 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
37 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
38 from yardstick.network_services.nfvi.resource import ResourceProfile
39 from yardstick.network_services.utils import get_nsb_option
40 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
41 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
42 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
43 from yardstick.ssh import AutoConnectSSH
44
45
# DPDK version string associated with the sample VNFs.
DPDK_VERSION = "dpdk-16.07"

# Module level logger shared by every helper in this file.
LOG = logging.getLogger(__name__)


# Remote directory that receives generated config and script files.
REMOTE_TMP = "/tmp"
# Fallback used by ScenarioHelper.timeout when the scenario sets no 'timeout'
# (presumably seconds; it is forwarded as the ssh run timeout).
DEFAULT_VNF_TIMEOUT = 60 * 60
# Seconds to wait in Process.join() before the VNF process is terminated.
PROCESS_JOIN_TIMEOUT = 3
54
55
class VnfSshHelper(AutoConnectSSH):
    """SSH helper tied to a node description and a remote binary directory."""

    def __init__(self, node, bin_path, wait=None):
        """Build the SSH helper from a node description.

        :param node: node description; the connection arguments are derived
            from it through ``args_from_node``
        :param bin_path: directory used as the default tool location
        :param wait: optional value offered as the ``wait`` connection
            argument; ignored when falsy or when the node already provides one
        """
        self.node = node
        ssh_args = self.args_from_node(self.node)
        if wait:
            ssh_args.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_args)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        """Return this class itself (never the calling subclass)."""
        # must return static class name, anything else refers to the calling class
        # i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        """Return a new helper for the same node and binary path.

        The ``wait`` argument given to the constructor is not forwarded.
        """
        # this copy constructor is different from SSH classes, since it uses node
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` to ``REMOTE_TMP/<prefix>`` and return that path."""
        LOG.debug(content)
        remote_path = os.path.join(REMOTE_TMP, prefix)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Join ``args`` onto the binary directory of this helper."""
        path_parts = (self.bin_path,) + args
        return os.path.join(*path_parts)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Delegate to the parent ``provision_tool``, defaulting to ``bin_path``."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
91
92
class SetupEnvHelper(object):
    """Base interface for helpers that prepare the environment of a VNF.

    ``build_config`` and ``tear_down`` must be provided by subclasses;
    ``setup_vnf_environment`` and ``kill_vnf`` are no-ops here.
    """

    # remote locations of the generated configuration and script
    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the three collaborating helpers."""
        super(SetupEnvHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def build_config(self):
        """Build the VNF configuration; must be overridden."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the node for the VNF; nothing to do in the base class."""
        return None

    def kill_vnf(self):
        """Stop a running VNF; nothing to do in the base class."""
        return None

    def tear_down(self):
        """Undo the environment setup; must be overridden."""
        raise NotImplementedError
118
119
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based VNFs.

    It reserves hugepages, loads the ``igb_uio`` driver, binds the VNF
    interfaces to it, generates and uploads the VNF configuration and script
    and builds the command line used to start the VNF.
    """

    APP_NAME = 'DpdkVnf'
    # shell command template; ``{}`` is replaced by a PCI address
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
    # total amount of memory to reserve as hugepages, in kB
    HUGEPAGES_KB = 1024 * 1024 * 16

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace ``pkt_type = ipv4`` with the configured packet type.

        :param ip_pipeline_cfg: configuration text
        :param traffic_options: dict providing the ``pkt_type`` key
        :return: the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Patch the traffic type (or packet type) in the configuration text.

        :param ip_pipeline_cfg: configuration text
        :param traffic_options: dict providing ``traffic_type`` and
            ``vnf_type``
        :return: the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # Compare by value: an identity test on strings only gives the
        # expected answer when both objects happen to be interned.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Reserve ``HUGEPAGES_KB`` of hugepages on the node and log the result."""
        meminfo = utils.read_meminfo(self.ssh_helper)
        hp_size_kb = int(meminfo['Hugepagesize'])
        nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
        self.ssh_helper.execute('echo %s | sudo tee %s' %
                                (nr_hugepages, self.NR_HUGEPAGES_PATH))
        # read the value back: the kernel may grant fewer pages than requested
        hp = six.BytesIO()
        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
        LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
                 hp_size_kb, nr_hugepages, nr_hugepages_set)

    def build_config(self):
        """Generate and upload the VNF config and script.

        When the scenario provides a ``file`` entry in the VNF configuration,
        that file is used, prefixed with an ``[EAL]`` section whitelisting the
        VNF PCI addresses; otherwise the locally generated ``CFG_CONFIG`` is
        read and patched with the traffic and packet type.

        :return: ``PIPELINE_COMMAND`` formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        config_file = vnf_cfg.get('file')
        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
                                                  task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        if config_file:
            with utils.open_relative_file(config_file, task_path) as infile:
                new_config = ['[EAL]']
                vpci = []
                for port in self.vnfd_helper.port_pairs.all_ports:
                    interface = self.vnfd_helper.find_interface(name=port)
                    vpci.append(interface['virtual-interface']["vpci"])
                new_config.extend('w = {0}'.format(item) for item in vpci)
                new_config = '\n'.join(new_config) + '\n' + infile.read()
        else:
            with open(self.CFG_CONFIG) as handle:
                new_config = handle.read()
            new_config = self._update_traffic_type(new_config, traffic_options)
            new_config = self._update_packet_type(new_config, traffic_options)
        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def _build_pipeline_kwargs(self):
        """Provision the VNF binary and fill ``self.pipeline_kwargs``."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
        }

    def setup_vnf_environment(self):
        """Set up DPDK, kill any running VNF, bind the NICs.

        :return: the ``ResourceProfile`` created by ``_setup_resources``
        """
        self._setup_dpdk()
        self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill every process named ``APP_NAME`` on the node."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run

        :raises DPDKSetupDriverError: when ``igb_uio`` is not listed by
            ``lsmod`` after the modprobe
        """
        self._setup_hugepages()
        self.ssh_helper.execute('sudo modprobe uio && sudo modprobe igb_uio')
        exit_status = self.ssh_helper.execute('lsmod | grep -i igb_uio')[0]
        if exit_status:
            raise y_exceptions.DPDKSetupDriverError()

    def get_collectd_options(self):
        """Return the collectd options, node settings overriding global ones.

        The result is a new dict, so the scenario configuration itself is
        never modified by the merge.
        """
        options = dict(self.scenario_helper.all_options.get("collectd", {}))
        # override with specific node settings
        options.update(self.scenario_helper.options.get("collectd", {}))
        return options

    def _setup_resources(self):
        """Pick the socket and create the ``ResourceProfile`` for KPI collection."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        collectd_options = self.get_collectd_options()
        plugins = collectd_options.get("plugins", {})
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=collectd_options.get("interval"),
                               timeout=self.scenario_helper.timeout)

    def _detect_and_bind_drivers(self):
        """Bind the VNF PCI devices to ``igb_uio`` and record DPDK port numbers.

        The DPDK port number of an interface is its position in the sorted
        list of DPDK bound PCI addresses.
        """
        interfaces = self.vnfd_helper.interfaces

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
                # force to int
                intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except (KeyError, StopIteration):
                # Best effort: a bound device that matches none of the VNF
                # interfaces is skipped. Only the expected lookup failures are
                # ignored so that interrupts and real errors still propagate.
                LOG.debug("No interface matches DPDK bound PCI address %s", vpci)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the output of ``FIND_NET_CMD`` for ``vpci`` or None on failure."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the NICs to the drivers saved before the DPDK bind."""
        self.dpdk_bind_helper.rebind_drivers()
324
325
class ResourceHelper(object):
    """Drive KPI collection through the resource built by the setup helper."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember the setup helper and reuse its SSH helper."""
        super(ResourceHelper, self).__init__()
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        # created later by setup()
        self.resource = None

    def setup(self):
        """Prepare the VNF environment and keep the returned resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for subclasses; the base class generates nothing."""
        return None

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is empty unless collectd reports status 0."""
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if agent_status == 0:
            nfvi_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            nfvi_kpi = {}
        return {"core": nfvi_kpi}

    def start_collect(self):
        """Initiate the system agent, start the resource and its AMQP KPI process."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource, if one has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
366
367
class ClientResourceHelper(ResourceHelper):
    """Resource helper for traffic generators driven through a TRex client.

    Traffic runs in ``run_traffic`` (meant for a subprocess); the samples it
    produces travel through ``_queue`` and are read back by ``collect_kpi``.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Create the queue and the shared flags used across processes."""
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        # shared flag set to 1 once traffic has been started
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        # shared flag; a non-zero value ends the run_traffic loop
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve the uplink, downlink and all port numbers from the port pairs."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        """Return the port number of interface ``intf``."""
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` when the client raises STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.exception("TRex client not connected")
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the current client statistics.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: dict mapping interface name to its sample; ``{}`` (with the
            terminated flag set) when the statistics are not a mapping
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # Validate before the first ``.get``: the statistics must be a
        # mapping before anything is read from them.
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the resulting samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic until ``terminate`` is called.

        :raises STLError: when the client fails while the helper has not been
            asked to terminate
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Ask the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics of ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on ``ports`` (all ports by default)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set, if any, into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (a new STLClient by default) with retries.

        The client is returned even when every attempt failed.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
507
508
class Rfc2544ResourceHelper(object):
    """Lazy, cached access to the ``rfc2544`` options of a scenario."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Keep the scenario helper; every option is resolved on first use."""
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        # shared integer counter, starts at 0
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``rfc2544`` section of the scenario options (cached)."""
        cached = self._rfc2544
        if cached is None:
            cached = self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return cached

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The ``correlated_traffic`` option (cached)."""
        if self._correlated_traffic is None:
            option = self.get_rfc2544('correlated_traffic',
                                      self.DEFAULT_CORRELATED_TRAFFIC)
            self._correlated_traffic = option

        return self._correlated_traffic

    @property
    def latency(self):
        """The ``latency`` option (cached)."""
        if self._latency is None:
            option = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
            self._latency = option
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` of the rfc2544 section, or ``default``."""
        section = self.rfc2544
        return section.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` ("low - high" or a single value).

        The values are sorted, so their order in the string does not matter;
        a single value is used for both bounds.
        """
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in raw_tolerance.split('-'))
        self._tolerance_low = bounds[0]
        if len(bounds) > 1:
            self._tolerance_high = bounds[1]
        else:
            self._tolerance_high = self._tolerance_low
565
566
class SampleVNFDeployHelper(object):
    """Clone, upload and build the samplevnf repository on the target node."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Store the VNFD and SSH helpers."""
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` unless ``which`` already finds it.

        The repository is cloned locally, copied to the node, built there with
        ``tools/vnf_build.sh`` and the resulting binary is copied into the
        binary directory of the SSH helper.
        """
        target_bin = self.ssh_helper.join_bin_path(app_name)
        which_status = self.ssh_helper.execute("which %s" % target_bin)[0]
        if not which_status:
            # binary already present, nothing to deploy
            return

        # fresh local clone of the repository
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)
        self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        self.ssh_helper.execute(build_cmd)

        built_bin = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
        self.ssh_helper.execute("sudo cp %s %s" % (built_bin, target_bin))
599
600
class ScenarioHelper(object):
    """Read-only view on the scenario configuration of one named node.

    ``scenario_cfg`` must be assigned before any property is read.
    """

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the node name; the scenario configuration is set later."""
        self.scenario_cfg = None
        self.name = name

    @property
    def task_path(self):
        """The mandatory ``task_path`` entry of the scenario."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The ``nodes`` entry of the scenario, or None when absent."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The ``options`` entry of the scenario, or ``{}`` when absent."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this node name, or ``{}`` when absent."""
        scenario_options = self.all_options
        return scenario_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The ``vnf_config`` of this node, or ``DEFAULT_VNF_CFG``."""
        node_options = self.options
        return node_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The mandatory ``topology`` entry of the scenario."""
        cfg = self.scenario_cfg
        return cfg['topology']

    @property
    def timeout(self):
        """The ``timeout`` of this node, or ``DEFAULT_VNF_TIMEOUT``."""
        node_options = self.options
        return node_options.get('timeout', DEFAULT_VNF_TIMEOUT)
641
642
643 class SampleVNF(GenericVNF):
644     """ Class providing file-like API for generic VNF implementation """
645
646     VNF_PROMPT = "pipeline>"
647     WAIT_TIME = 1
648     WAIT_TIME_FOR_SCRIPT = 10
649     APP_NAME = "SampleVNF"
650     # we run the VNF interactively, so the ssh command will timeout after this long
651
652     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
653         super(SampleVNF, self).__init__(name, vnfd)
654         self.bin_path = get_nsb_option('bin_path', '')
655
656         self.scenario_helper = ScenarioHelper(self.name)
657         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
658
659         if setup_env_helper_type is None:
660             setup_env_helper_type = SetupEnvHelper
661
662         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
663                                                   self.ssh_helper,
664                                                   self.scenario_helper)
665
666         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
667
668         if resource_helper_type is None:
669             resource_helper_type = ResourceHelper
670
671         self.resource_helper = resource_helper_type(self.setup_helper)
672
673         self.context_cfg = None
674         self.nfvi_context = None
675         self.pipeline_kwargs = {}
676         self.uplink_ports = None
677         self.downlink_ports = None
678         # NOTE(esm): make QueueFileWrapper invert-able so that we
679         #            never have to manage the queues
680         self.q_in = Queue()
681         self.q_out = Queue()
682         self.queue_wrapper = None
683         self.run_kwargs = {}
684         self.used_drivers = {}
685         self.vnf_port_pairs = None
686         self._vnf_process = None
687
688     def _build_ports(self):
689         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
690         self.networks = self._port_pairs.networks
691         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
692         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
693         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
694
695     def _get_route_data(self, route_index, route_type):
696         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
697         for _ in range(route_index):
698             next(route_iter, '')
699         return next(route_iter, {}).get(route_type, '')
700
701     def _get_port0localip6(self):
702         return_value = self._get_route_data(0, 'network')
703         LOG.info("_get_port0localip6 : %s", return_value)
704         return return_value
705
706     def _get_port1localip6(self):
707         return_value = self._get_route_data(1, 'network')
708         LOG.info("_get_port1localip6 : %s", return_value)
709         return return_value
710
711     def _get_port0prefixlen6(self):
712         return_value = self._get_route_data(0, 'netmask')
713         LOG.info("_get_port0prefixlen6 : %s", return_value)
714         return return_value
715
716     def _get_port1prefixlen6(self):
717         return_value = self._get_route_data(1, 'netmask')
718         LOG.info("_get_port1prefixlen6 : %s", return_value)
719         return return_value
720
721     def _get_port0gateway6(self):
722         return_value = self._get_route_data(0, 'network')
723         LOG.info("_get_port0gateway6 : %s", return_value)
724         return return_value
725
726     def _get_port1gateway6(self):
727         return_value = self._get_route_data(1, 'network')
728         LOG.info("_get_port1gateway6 : %s", return_value)
729         return return_value
730
731     def _start_vnf(self):
732         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
733         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
734         self._vnf_process = Process(name=name, target=self._run)
735         self._vnf_process.start()
736
737     def _vnf_up_post(self):
738         pass
739
740     def instantiate(self, scenario_cfg, context_cfg):
741         self.scenario_helper.scenario_cfg = scenario_cfg
742         self.context_cfg = context_cfg
743         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
744         # self.nfvi_context = None
745
746         # vnf deploy is unsupported, use ansible playbooks
747         if self.scenario_helper.options.get("vnf_deploy", False):
748             self.deploy_helper.deploy_vnfs(self.APP_NAME)
749         self.resource_helper.setup()
750         self._start_vnf()
751
    def wait_for_instantiate(self):
        """Block until the VNF prompt shows up in the VNF process output.

        Chunks read from ``q_out`` are accumulated and the whole accumulated
        text is searched after every chunk, so a prompt that arrives split
        over several chunks is still detected.

        :return: ``exitcode`` of the VNF process at the moment the prompt is seen
        :raises RuntimeError: if the VNF process is not alive, or if "PANIC"
            appears in the accumulated output
        """
        buf = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            # Checked on every outer iteration, before any output is read.
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # NOTE(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                buf.append(self.q_out.get())
                # Search everything received so far, not just the newest chunk.
                message = ''.join(buf)
                if self.VNF_PROMPT in message:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    self.resource_helper.start_collect()
                    return self._vnf_process.exitcode

                # The prompt check above runs first, so a prompt wins over "PANIC".
                if "PANIC" in message:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(self.WAIT_TIME_FOR_SCRIPT)
            # Send ENTER to display a new prompt in case the prompt text was corrupted
            # by other VNF output
            self.q_in.put('\r\n')
779
780     def _build_run_kwargs(self):
781         self.run_kwargs = {
782             'stdin': self.queue_wrapper,
783             'stdout': self.queue_wrapper,
784             'keep_stdin_open': True,
785             'pty': True,
786             'timeout': self.scenario_helper.timeout,
787         }
788
789     def _build_config(self):
790         return self.setup_helper.build_config()
791
792     def _run(self):
793         # we can't share ssh paramiko objects to force new connection
794         self.ssh_helper.drop_connection()
795         cmd = self._build_config()
796         # kill before starting
797         self.setup_helper.kill_vnf()
798
799         LOG.debug(cmd)
800         self._build_run_kwargs()
801         self.ssh_helper.run(cmd, **self.run_kwargs)
802
803     def vnf_execute(self, cmd, wait_time=2):
804         """ send cmd to vnf process """
805
806         LOG.info("%s command: %s", self.APP_NAME, cmd)
807         self.q_in.put("{}\r\n".format(cmd))
808         time.sleep(wait_time)
809         output = []
810         while self.q_out.qsize() > 0:
811             output.append(self.q_out.get())
812         return "".join(output)
813
814     def _tear_down(self):
815         pass
816
817     def terminate(self):
818         self.vnf_execute("quit")
819         self.setup_helper.kill_vnf()
820         self._tear_down()
821         self.resource_helper.stop_collect()
822         if self._vnf_process is not None:
823             # be proper and join first before we kill
824             LOG.debug("joining before terminate %s", self._vnf_process.name)
825             self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
826             self._vnf_process.terminate()
827         # no terminate children here because we share processes with tg
828
829     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
830         """Method for checking the statistics
831
832         This method could be overridden in children classes.
833
834         :return: VNF statistics
835         """
836         cmd = 'p {0} stats'.format(self.APP_WORD)
837         out = self.vnf_execute(cmd)
838         return out
839
840     def collect_kpi(self):
841         # we can't get KPIs if the VNF is down
842         check_if_process_failed(self._vnf_process)
843         stats = self.get_stats()
844         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
845         if m:
846             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
847             result["collect_stats"] = self.resource_helper.collect_kpi()
848         else:
849             result = {
850                 "packets_in": 0,
851                 "packets_fwd": 0,
852                 "packets_dropped": 0,
853             }
854         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
855         return result
856
857     def scale(self, flavor=""):
858         """The SampleVNF base class doesn't provide the 'scale' feature"""
859         raise y_exceptions.FunctionNotImplemented(
860             function_name='scale', class_name='SampleVNFTrafficGen')
861
862
863 class SampleVNFTrafficGen(GenericTrafficGen):
864     """ Class providing file-like API for generic traffic generator """
865
866     APP_NAME = 'Sample'
867     RUN_WAIT = 1
868
869     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
870         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
871         self.bin_path = get_nsb_option('bin_path', '')
872
873         self.scenario_helper = ScenarioHelper(self.name)
874         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
875
876         if setup_env_helper_type is None:
877             setup_env_helper_type = SetupEnvHelper
878
879         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
880                                                   self.ssh_helper,
881                                                   self.scenario_helper)
882
883         if resource_helper_type is None:
884             resource_helper_type = ClientResourceHelper
885
886         self.resource_helper = resource_helper_type(self.setup_helper)
887
888         self.runs_traffic = True
889         self.traffic_finished = False
890         self._tg_process = None
891         self._traffic_process = None
892
893     def _start_server(self):
894         # we can't share ssh paramiko objects to force new connection
895         self.ssh_helper.drop_connection()
896
897     def instantiate(self, scenario_cfg, context_cfg):
898         self.scenario_helper.scenario_cfg = scenario_cfg
899         self.resource_helper.setup()
900         # must generate_cfg after DPDK bind because we need port number
901         self.resource_helper.generate_cfg()
902
903         LOG.info("Starting %s server...", self.APP_NAME)
904         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
905         self._tg_process = Process(name=name, target=self._start_server)
906         self._tg_process.start()
907
908     def _check_status(self):
909         raise NotImplementedError
910
911     def _wait_for_process(self):
912         while True:
913             if not self._tg_process.is_alive():
914                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
915             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
916             time.sleep(1)
917             status = self._check_status()
918             if status == 0:
919                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
920                 return self._tg_process.exitcode
921
922     def _traffic_runner(self, traffic_profile):
923         # always drop connections first thing in new processes
924         # so we don't get paramiko errors
925         self.ssh_helper.drop_connection()
926         LOG.info("Starting %s client...", self.APP_NAME)
927         self.resource_helper.run_traffic(traffic_profile)
928
929     def run_traffic(self, traffic_profile):
930         """ Generate traffic on the wire according to the given params.
931         Method is non-blocking, returns immediately when traffic process
932         is running. Mandatory.
933
934         :param traffic_profile:
935         :return: True/False
936         """
937         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
938                                     os.getpid())
939         self._traffic_process = Process(name=name, target=self._traffic_runner,
940                                         args=(traffic_profile,))
941         self._traffic_process.start()
942         # Wait for traffic process to start
943         while self.resource_helper.client_started.value == 0:
944             time.sleep(self.RUN_WAIT)
945             # what if traffic process takes a few seconds to start?
946             if not self._traffic_process.is_alive():
947                 break
948
949         return self._traffic_process.is_alive()
950
951     def collect_kpi(self):
952         # check if the tg processes have exited
953         for proc in (self._tg_process, self._traffic_process):
954             check_if_process_failed(proc)
955         result = self.resource_helper.collect_kpi()
956         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
957         return result
958
959     def terminate(self):
960         """ After this method finishes, all traffic processes should stop. Mandatory.
961
962         :return: True/False
963         """
964         self.traffic_finished = True
965         # we must kill client before we kill the server, or the client will raise exception
966         if self._traffic_process is not None:
967             # be proper and try to join before terminating
968             LOG.debug("joining before terminate %s", self._traffic_process.name)
969             self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
970             self._traffic_process.terminate()
971         if self._tg_process is not None:
972             # be proper and try to join before terminating
973             LOG.debug("joining before terminate %s", self._tg_process.name)
974             self._tg_process.join(PROCESS_JOIN_TIMEOUT)
975             self._tg_process.terminate()
976         # no terminate children here because we share processes with vnf
977
978     def scale(self, flavor=""):
979         """A traffic generator VFN doesn't provide the 'scale' feature"""
980         raise y_exceptions.FunctionNotImplemented(
981             function_name='scale', class_name='SampleVNFTrafficGen')