Merge "Get node IPs and IDs according to env"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from __future__ import absolute_import
17
18 import posixpath
19 import time
20 import logging
21 import os
22 import re
23 import subprocess
24 from collections import Mapping
25
26 from multiprocessing import Queue, Value, Process
27
28 from six.moves import cStringIO
29
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
34 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
35 from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
38 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.utils import get_nsb_option
41
42 from trex_stl_lib.trex_stl_client import STLClient
43 from trex_stl_lib.trex_stl_client import LoggerApi
44 from trex_stl_lib.trex_stl_exceptions import STLError
45
46 from yardstick.ssh import AutoConnectSSH
47
# Name of the DPDK directory that is looked up under the remote bin path.
DPDK_VERSION = "dpdk-16.07"

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


# Directory used for the generated configuration and script files.
REMOTE_TMP = "/tmp"
54
55
class VnfSshHelper(AutoConnectSSH):
    """SSH helper built from a node description that also tracks a bin path."""

    def __init__(self, node, bin_path, wait=None):
        """Derive the connection arguments from ``node`` and initialise the helper.

        :param node: node description handed to ``args_from_node``
        :param bin_path: directory used by ``join_bin_path``/``provision_tool``
        :param wait: optional default for the ``wait`` connection argument
        """
        self.node = node
        ssh_kwargs = self.args_from_node(self.node)
        if wait:
            # only a default: a 'wait' already derived from the node wins
            ssh_kwargs.setdefault('wait', wait)

        super(VnfSshHelper, self).__init__(**ssh_kwargs)
        self.bin_path = bin_path

    @staticmethod
    def get_class():
        # must return the static class name; anything else would refer to the
        # calling class, i.e. the subclass, not the superclass
        return VnfSshHelper

    def copy(self):
        # this copy constructor differs from the SSH classes: it is node based
        helper_cls = self.get_class()
        return helper_cls(self.node, self.bin_path)

    def upload_config_file(self, prefix, content):
        """Upload ``content`` as ``REMOTE_TMP/<prefix>`` and return that path."""
        remote_path = os.path.join(REMOTE_TMP, prefix)
        LOG.debug(content)
        self.put_file_obj(cStringIO(content), remote_path)
        return remote_path

    def join_bin_path(self, *args):
        """Return ``args`` joined onto the bin path."""
        return os.path.join(self.bin_path, *args)

    def provision_tool(self, tool_path=None, tool_file=None):
        """Provision ``tool_file``; ``tool_path`` defaults to the bin path."""
        effective_path = self.bin_path if tool_path is None else tool_path
        return super(VnfSshHelper, self).provision_tool(effective_path, tool_file)
91
92
class SetupEnvHelper(object):
    """Base helper holding the objects needed to set up a VNF environment."""

    CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
    CORES = []
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper

    def _get_ports_gateway(self, name):
        """Return the gateway of the first route whose 'if' is ``name``, else None."""
        routes = self.vnfd_helper.vdu0.get('routing_table', [])
        matching_gateways = (route['gateway'] for route in routes if name == route['if'])
        return next(matching_gateways, None)

    def build_config(self):
        """Must be provided by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Nothing to set up in the base helper."""

    def kill_vnf(self):
        """Nothing to kill in the base helper."""

    def tear_down(self):
        """Must be provided by subclasses."""
        raise NotImplementedError
126
127
128 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
129
130     APP_NAME = 'DpdkVnf'
131     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
132
133     HW_DEFAULT_CORE = 3
134     SW_DEFAULT_CORE = 2
135
136     @staticmethod
137     def _update_packet_type(ip_pipeline_cfg, traffic_options):
138         match_str = 'pkt_type = ipv4'
139         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
140         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
141         return pipeline_config_str
142
143     @classmethod
144     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
145         traffic_type = traffic_options['traffic_type']
146
147         if traffic_options['vnf_type'] is not cls.APP_NAME:
148             match_str = 'traffic_type = 4'
149             replace_str = 'traffic_type = {0}'.format(traffic_type)
150
151         elif traffic_type == 4:
152             match_str = 'pkt_type = ipv4'
153             replace_str = 'pkt_type = ipv4'
154
155         else:
156             match_str = 'pkt_type = ipv4'
157             replace_str = 'pkt_type = ipv6'
158
159         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
160         return pipeline_config_str
161
162     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
163         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
164         self.all_ports = None
165         self.bound_pci = None
166         self.socket = None
167         self.used_drivers = None
168         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
169
170     def _setup_hugepages(self):
171         cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
172         hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
173
174         memory_path = \
175             '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
176         self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
177
178         if hugepages == "2048kB":
179             pages = 8192
180         else:
181             pages = 16
182
183         self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
184
185     def build_config(self):
186         vnf_cfg = self.scenario_helper.vnf_cfg
187         task_path = self.scenario_helper.task_path
188
189         lb_count = vnf_cfg.get('lb_count', 3)
190         lb_config = vnf_cfg.get('lb_config', 'SW')
191         worker_config = vnf_cfg.get('worker_config', '1C/1T')
192         worker_threads = vnf_cfg.get('worker_threads', 3)
193
194         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
195         traffic_options = {
196             'traffic_type': traffic_type,
197             'pkt_type': 'ipv%s' % traffic_type,
198             'vnf_type': self.VNF_TYPE,
199         }
200
201         config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
202         config_basename = posixpath.basename(self.CFG_CONFIG)
203         script_basename = posixpath.basename(self.CFG_SCRIPT)
204         multiport = MultiPortConfig(self.scenario_helper.topology,
205                                     config_tpl_cfg,
206                                     config_basename,
207                                     self.vnfd_helper,
208                                     self.VNF_TYPE,
209                                     lb_count,
210                                     worker_threads,
211                                     worker_config,
212                                     lb_config,
213                                     self.socket)
214
215         multiport.generate_config()
216         with open(self.CFG_CONFIG) as handle:
217             new_config = handle.read()
218
219         new_config = self._update_traffic_type(new_config, traffic_options)
220         new_config = self._update_packet_type(new_config, traffic_options)
221
222         self.ssh_helper.upload_config_file(config_basename, new_config)
223         self.ssh_helper.upload_config_file(script_basename,
224                                            multiport.generate_script(self.vnfd_helper))
225
226         LOG.info("Provision and start the %s", self.APP_NAME)
227         self._build_pipeline_kwargs()
228         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
229
230     def _build_pipeline_kwargs(self):
231         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
232         # count the number of actual ports in the list of pairs
233         # remove duplicate ports
234         # this is really a mapping from LINK ID to DPDK PMD ID
235         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
236         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
237         ports = self.vnfd_helper.port_pairs.all_ports
238         port_nums = self.vnfd_helper.port_nums(ports)
239         # create mask from all the dpdk port numbers
240         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
241         self.pipeline_kwargs = {
242             'cfg_file': self.CFG_CONFIG,
243             'script': self.CFG_SCRIPT,
244             'port_mask_hex': ports_mask_hex,
245             'tool_path': tool_path,
246         }
247
248     def _get_app_cpu(self):
249         if self.CORES:
250             return self.CORES
251
252         vnf_cfg = self.scenario_helper.vnf_cfg
253         sys_obj = CpuSysCores(self.ssh_helper)
254         self.sys_cpu = sys_obj.get_core_socket()
255         num_core = int(vnf_cfg["worker_threads"])
256         if vnf_cfg.get("lb_config", "SW") == 'HW':
257             num_core += self.HW_DEFAULT_CORE
258         else:
259             num_core += self.SW_DEFAULT_CORE
260         app_cpu = self.sys_cpu[str(self.socket)][:num_core]
261         return app_cpu
262
263     def _get_cpu_sibling_list(self, cores=None):
264         if cores is None:
265             cores = self._get_app_cpu()
266         sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
267         awk_template = "awk -F: '{ print $1 }' < %s"
268         sys_path = "/sys/devices/system/cpu/"
269         cpu_topology = []
270         try:
271             for core in cores:
272                 sys_cmd = sys_cmd_template % (sys_path, core)
273                 cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
274                 cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
275
276             return cpu_topology
277         except Exception:
278             return []
279
280     def _validate_cpu_cfg(self):
281         return self._get_cpu_sibling_list()
282
283     def setup_vnf_environment(self):
284         self._setup_dpdk()
285         self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
286         self.kill_vnf()
287         # bind before _setup_resources so we can use dpdk_port_num
288         self._detect_and_bind_drivers()
289         resource = self._setup_resources()
290         return resource
291
292     def kill_vnf(self):
293         # have to use exact match
294         self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)
295
296     def _setup_dpdk(self):
297         """ setup dpdk environment needed for vnf to run """
298
299         self._setup_hugepages()
300         self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
301
302         exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
303         if exit_status == 0:
304             return
305
306         dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
307         dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
308         exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
309         if exit_status != 0:
310             self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
311
312     def get_collectd_options(self):
313         options = self.scenario_helper.all_options.get("collectd", {})
314         # override with specific node settings
315         options.update(self.scenario_helper.options.get("collectd", {}))
316         return options
317
318     def _setup_resources(self):
319         # what is this magic?  how do we know which socket is for which port?
320         # what about quad-socket?
321         if any(v[5] == "0" for v in self.bound_pci):
322             self.socket = 0
323         else:
324             self.socket = 1
325
326         cores = self._validate_cpu_cfg()
327         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
328         # this won't work because we don't have DPDK port numbers yet
329         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
330         port_names = (intf["name"] for intf in ports)
331         collectd_options = self.get_collectd_options()
332         plugins = collectd_options.get("plugins", {})
333         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores,
334                                plugins=plugins, interval=collectd_options.get("interval"))
335
336     def _detect_and_bind_drivers(self):
337         interfaces = self.vnfd_helper.interfaces
338
339         self.dpdk_bind_helper.read_status()
340         self.dpdk_bind_helper.save_used_drivers()
341
342         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
343
344         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
345         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
346             try:
347                 intf = next(v for v in interfaces
348                             if vpci == v['virtual-interface']['vpci'])
349                 # force to int
350                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
351             except:
352                 pass
353         time.sleep(2)
354
355     def get_local_iface_name_by_vpci(self, vpci):
356         find_net_cmd = self.FIND_NET_CMD.format(vpci)
357         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
358         if exit_status == 0:
359             return stdout
360         return None
361
362     def tear_down(self):
363         self.dpdk_bind_helper.rebind_drivers()
364
365
class ResourceHelper(object):
    """Drives KPI collection through the resource returned by the setup helper."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Set up the VNF environment and keep the returned resource object."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """The base helper generates no configuration."""

    def _collect_resource_kpi(self):
        """Return ``{"core": kpis}``; kpis is ``{}`` when the collectd check is falsy."""
        collectd_status = self.resource.check_if_sa_running("collectd")[0]
        kpis = self.resource.amqp_collect_nfvi_kpi() if collectd_status else {}
        return {"core": kpis}

    def start_collect(self):
        """Initiate the system agent, start the resource and its AMQP processing."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource, when there is one."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
406
407
408 class ClientResourceHelper(ResourceHelper):
409
410     RUN_DURATION = 60
411     QUEUE_WAIT_TIME = 5
412     SYNC_PORT = 1
413     ASYNC_PORT = 2
414
415     def __init__(self, setup_helper):
416         super(ClientResourceHelper, self).__init__(setup_helper)
417         self.vnfd_helper = setup_helper.vnfd_helper
418         self.scenario_helper = setup_helper.scenario_helper
419
420         self.client = None
421         self.client_started = Value('i', 0)
422         self.all_ports = None
423         self._queue = Queue()
424         self._result = {}
425         self._terminated = Value('i', 0)
426         self._vpci_ascending = None
427
428     def _build_ports(self):
429         self.networks = self.vnfd_helper.port_pairs.networks
430         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
431         self.downlink_ports = \
432             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
433         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
434
435     def get_stats(self, *args, **kwargs):
436         try:
437             return self.client.get_stats(*args, **kwargs)
438         except STLError:
439             LOG.exception("TRex client not connected")
440             return {}
441
442     def generate_samples(self, ports, key=None, default=None):
443         # needs to be used ports
444         last_result = self.get_stats(ports)
445         key_value = last_result.get(key, default)
446
447         if not isinstance(last_result, Mapping):  # added for mock unit test
448             self._terminated.value = 1
449             return {}
450
451         samples = {}
452         # recalculate port for interface and see if it matches ports provided
453         for intf in self.vnfd_helper.interfaces:
454             name = intf["name"]
455             port = self.vnfd_helper.port_num(name)
456             if port in ports:
457                 xe_value = last_result.get(port, {})
458                 samples[name] = {
459                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
460                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
461                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
462                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
463                     "in_packets": int(xe_value.get("ipackets", 0)),
464                     "out_packets": int(xe_value.get("opackets", 0)),
465                 }
466                 if key:
467                     samples[name][key] = key_value
468         return samples
469
470     def _run_traffic_once(self, traffic_profile):
471         traffic_profile.execute_traffic(self)
472         self.client_started.value = 1
473         time.sleep(self.RUN_DURATION)
474         samples = self.generate_samples(traffic_profile.ports)
475         time.sleep(self.QUEUE_WAIT_TIME)
476         self._queue.put(samples)
477
478     def run_traffic(self, traffic_profile):
479         # fixme: fix passing correct trex config file,
480         # instead of searching the default path
481         try:
482             self._build_ports()
483             self.client = self._connect()
484             self.client.reset(ports=self.all_ports)
485             self.client.remove_all_streams(self.all_ports)  # remove all streams
486             traffic_profile.register_generator(self)
487
488             while self._terminated.value == 0:
489                 self._run_traffic_once(traffic_profile)
490
491             self.client.stop(self.all_ports)
492             self.client.disconnect()
493             self._terminated.value = 0
494         except STLError:
495             if self._terminated.value:
496                 LOG.debug("traffic generator is stopped")
497                 return  # return if trex/tg server is stopped.
498             raise
499
500     def terminate(self):
501         self._terminated.value = 1  # stop client
502
503     def clear_stats(self, ports=None):
504         if ports is None:
505             ports = self.all_ports
506         self.client.clear_stats(ports=ports)
507
508     def start(self, ports=None, *args, **kwargs):
509         if ports is None:
510             ports = self.all_ports
511         self.client.start(ports=ports, *args, **kwargs)
512
513     def collect_kpi(self):
514         if not self._queue.empty():
515             kpi = self._queue.get()
516             self._result.update(kpi)
517             LOG.debug("Got KPIs from _queue for {0} {1}".format(
518                 self.scenario_helper.name, self.RESOURCE_WORD))
519         return self._result
520
521     def _connect(self, client=None):
522         if client is None:
523             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
524                                server=self.vnfd_helper.mgmt_interface["ip"],
525                                verbose_level=LoggerApi.VERBOSE_QUIET)
526
527         # try to connect with 5s intervals, 30s max
528         for idx in range(6):
529             try:
530                 client.connect()
531                 break
532             except STLError:
533                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
534                 time.sleep(5)
535         return client
536
537
class Rfc2544ResourceHelper(object):
    """Lazy, cached access to the 'rfc2544' section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        # caches filled on first access by the properties below
        self._rfc2544 = None
        self._correlated_traffic = None
        self._latency = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The 'rfc2544' options of the scenario (looked up once)."""
        if self._rfc2544 is None:
            all_options = self.scenario_helper.all_options
            self._rfc2544 = all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound parsed from 'allowed_drop_rate'."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound parsed from 'allowed_drop_rate'."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The 'correlated_traffic' option, or its class default."""
        if self._correlated_traffic is None:
            default = self.DEFAULT_CORRELATED_TRAFFIC
            self._correlated_traffic = self.get_rfc2544('correlated_traffic', default)

        return self._correlated_traffic

    @property
    def latency(self):
        """The 'latency' option, or its class default."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` of the 'rfc2544' section, or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' ("low - high") into the two tolerance bounds.

        The '-' separated values are sorted; a single value is used for both
        bounds.
        """
        tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(part.strip()) for part in tolerance_str.split('-'))
        self._tolerance_low = bounds[0]
        fallback = self.tolerance_low
        self._tolerance_high = bounds[1] if len(bounds) > 1 else fallback
594
595
class SampleVNFDeployHelper(object):
    """Clones, uploads and builds the samplevnf repository for a VNF binary."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
    DISABLE_DEPLOY = True

    def __init__(self, vnfd_helper, ssh_helper):
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` on the node unless it is already there.

        Does nothing while ``DISABLE_DEPLOY`` is set, or when ``which`` finds
        the binary under the bin path.
        """
        # temp disable for now
        if self.DISABLE_DEPLOY:
            return

        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        if not ssh.execute("which %s" % vnf_bin)[0]:
            return

        # fresh local clone, then replace the copy on the node
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        http_proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        # copy the built binary into the bin path
        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_binary, vnf_bin))
634
635
class ScenarioHelper(object):
    """Read-only views on the scenario configuration of one named node."""

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        # scenario_cfg is assigned later, before any property is read
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The mandatory 'task_path' entry of the scenario."""
        return self.scenario_cfg['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry of the scenario, or None."""
        return self.scenario_cfg.get('nodes')

    @property
    def all_options(self):
        """The 'options' entry of the scenario, or an empty dict."""
        return self.scenario_cfg.get('options', {})

    @property
    def options(self):
        """The options registered under this helper's name, or an empty dict."""
        scenario_options = self.all_options
        return scenario_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The node's 'vnf_config', or ``DEFAULT_VNF_CFG`` when it has none."""
        node_options = self.options
        return node_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The mandatory 'topology' entry of the scenario."""
        return self.scenario_cfg['topology']
672
673
674 class SampleVNF(GenericVNF):
675     """ Class providing file-like API for generic VNF implementation """
676
677     VNF_PROMPT = "pipeline>"
678     WAIT_TIME = 1
679
680     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
681         super(SampleVNF, self).__init__(name, vnfd)
682         self.bin_path = get_nsb_option('bin_path', '')
683
684         self.scenario_helper = ScenarioHelper(self.name)
685         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
686
687         if setup_env_helper_type is None:
688             setup_env_helper_type = SetupEnvHelper
689
690         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
691                                                   self.ssh_helper,
692                                                   self.scenario_helper)
693
694         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
695
696         if resource_helper_type is None:
697             resource_helper_type = ResourceHelper
698
699         self.resource_helper = resource_helper_type(self.setup_helper)
700
701         self.context_cfg = None
702         self.nfvi_context = None
703         self.pipeline_kwargs = {}
704         self.uplink_ports = None
705         self.downlink_ports = None
706         # TODO(esm): make QueueFileWrapper invert-able so that we
707         #            never have to manage the queues
708         self.q_in = Queue()
709         self.q_out = Queue()
710         self.queue_wrapper = None
711         self.run_kwargs = {}
712         self.used_drivers = {}
713         self.vnf_port_pairs = None
714         self._vnf_process = None
715
716     def _build_ports(self):
717         self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
718         self.networks = self._port_pairs.networks
719         self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
720         self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
721         self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
722
723     def _get_route_data(self, route_index, route_type):
724         route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
725         for _ in range(route_index):
726             next(route_iter, '')
727         return next(route_iter, {}).get(route_type, '')
728
729     def _get_port0localip6(self):
730         return_value = self._get_route_data(0, 'network')
731         LOG.info("_get_port0localip6 : %s", return_value)
732         return return_value
733
734     def _get_port1localip6(self):
735         return_value = self._get_route_data(1, 'network')
736         LOG.info("_get_port1localip6 : %s", return_value)
737         return return_value
738
739     def _get_port0prefixlen6(self):
740         return_value = self._get_route_data(0, 'netmask')
741         LOG.info("_get_port0prefixlen6 : %s", return_value)
742         return return_value
743
744     def _get_port1prefixlen6(self):
745         return_value = self._get_route_data(1, 'netmask')
746         LOG.info("_get_port1prefixlen6 : %s", return_value)
747         return return_value
748
749     def _get_port0gateway6(self):
750         return_value = self._get_route_data(0, 'network')
751         LOG.info("_get_port0gateway6 : %s", return_value)
752         return return_value
753
754     def _get_port1gateway6(self):
755         return_value = self._get_route_data(1, 'network')
756         LOG.info("_get_port1gateway6 : %s", return_value)
757         return return_value
758
759     def _start_vnf(self):
760         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
761         self._vnf_process = Process(target=self._run)
762         self._vnf_process.start()
763
764     def _vnf_up_post(self):
765         pass
766
767     def instantiate(self, scenario_cfg, context_cfg):
768         self.scenario_helper.scenario_cfg = scenario_cfg
769         self.context_cfg = context_cfg
770         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
771         # self.nfvi_context = None
772
773         self.deploy_helper.deploy_vnfs(self.APP_NAME)
774         self.resource_helper.setup()
775         self._start_vnf()
776
777     def wait_for_instantiate(self):
778         buf = []
779         time.sleep(self.WAIT_TIME)  # Give some time for config to load
780         while True:
781             if not self._vnf_process.is_alive():
782                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
783
784             # TODO(esm): move to QueueFileWrapper
785             while self.q_out.qsize() > 0:
786                 buf.append(self.q_out.get())
787                 message = ''.join(buf)
788                 if self.VNF_PROMPT in message:
789                     LOG.info("%s VNF is up and running.", self.APP_NAME)
790                     self._vnf_up_post()
791                     self.queue_wrapper.clear()
792                     self.resource_helper.start_collect()
793                     return self._vnf_process.exitcode
794
795                 if "PANIC" in message:
796                     raise RuntimeError("Error starting %s VNF." %
797                                        self.APP_NAME)
798
799             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
800             time.sleep(1)
801             # Send ENTER to display a new prompt in case the prompt text was corrupted
802             # by other VNF output
803             self.q_in.put('\r\n')
804
805     def _build_run_kwargs(self):
806         self.run_kwargs = {
807             'stdin': self.queue_wrapper,
808             'stdout': self.queue_wrapper,
809             'keep_stdin_open': True,
810             'pty': True,
811         }
812
813     def _build_config(self):
814         return self.setup_helper.build_config()
815
816     def _run(self):
817         # we can't share ssh paramiko objects to force new connection
818         self.ssh_helper.drop_connection()
819         cmd = self._build_config()
820         # kill before starting
821         self.setup_helper.kill_vnf()
822
823         LOG.debug(cmd)
824         self._build_run_kwargs()
825         self.ssh_helper.run(cmd, **self.run_kwargs)
826
827     def vnf_execute(self, cmd, wait_time=2):
828         """ send cmd to vnf process """
829
830         LOG.info("%s command: %s", self.APP_NAME, cmd)
831         self.q_in.put("{}\r\n".format(cmd))
832         time.sleep(wait_time)
833         output = []
834         while self.q_out.qsize() > 0:
835             output.append(self.q_out.get())
836         return "".join(output)
837
838     def _tear_down(self):
839         pass
840
841     def terminate(self):
842         self.vnf_execute("quit")
843         if self._vnf_process:
844             self._vnf_process.terminate()
845         self.setup_helper.kill_vnf()
846         self._tear_down()
847         self.resource_helper.stop_collect()
848
849     def get_stats(self, *args, **kwargs):
850         """
851         Method for checking the statistics
852
853         :return:
854            VNF statistics
855         """
856         cmd = 'p {0} stats'.format(self.APP_WORD)
857         out = self.vnf_execute(cmd)
858         return out
859
860     def collect_kpi(self):
861         stats = self.get_stats()
862         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
863         if m:
864             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
865             result["collect_stats"] = self.resource_helper.collect_kpi()
866         else:
867             result = {
868                 "packets_in": 0,
869                 "packets_fwd": 0,
870                 "packets_dropped": 0,
871             }
872         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
873         return result
874
875
876 class SampleVNFTrafficGen(GenericTrafficGen):
877     """ Class providing file-like API for generic traffic generator """
878
879     APP_NAME = 'Sample'
880     RUN_WAIT = 1
881
882     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
883         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
884         self.bin_path = get_nsb_option('bin_path', '')
885         self.name = "tgen__1"  # name in topology file
886
887         self.scenario_helper = ScenarioHelper(self.name)
888         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
889
890         if setup_env_helper_type is None:
891             setup_env_helper_type = SetupEnvHelper
892
893         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
894                                                   self.ssh_helper,
895                                                   self.scenario_helper)
896
897         if resource_helper_type is None:
898             resource_helper_type = ClientResourceHelper
899
900         self.resource_helper = resource_helper_type(self.setup_helper)
901
902         self.runs_traffic = True
903         self.traffic_finished = False
904         self._tg_process = None
905         self._traffic_process = None
906
907     def _start_server(self):
908         # we can't share ssh paramiko objects to force new connection
909         self.ssh_helper.drop_connection()
910
911     def instantiate(self, scenario_cfg, context_cfg):
912         self.scenario_helper.scenario_cfg = scenario_cfg
913         self.resource_helper.generate_cfg()
914         self.resource_helper.setup()
915
916         LOG.info("Starting %s server...", self.APP_NAME)
917         self._tg_process = Process(target=self._start_server)
918         self._tg_process.start()
919
920     def wait_for_instantiate(self):
921         # overridden by subclasses
922         return self._wait_for_process()
923
924     def _check_status(self):
925         raise NotImplementedError
926
927     def _wait_for_process(self):
928         while True:
929             if not self._tg_process.is_alive():
930                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
931             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
932             time.sleep(1)
933             status = self._check_status()
934             if status == 0:
935                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
936                 return self._tg_process.exitcode
937
938     def _traffic_runner(self, traffic_profile):
939         # always drop connections first thing in new processes
940         # so we don't get paramiko errors
941         self.ssh_helper.drop_connection()
942         LOG.info("Starting %s client...", self.APP_NAME)
943         self.resource_helper.run_traffic(traffic_profile)
944
945     def run_traffic(self, traffic_profile):
946         """ Generate traffic on the wire according to the given params.
947         Method is non-blocking, returns immediately when traffic process
948         is running. Mandatory.
949
950         :param traffic_profile:
951         :return: True/False
952         """
953         self._traffic_process = Process(target=self._traffic_runner,
954                                         args=(traffic_profile,))
955         self._traffic_process.start()
956         # Wait for traffic process to start
957         while self.resource_helper.client_started.value == 0:
958             time.sleep(self.RUN_WAIT)
959             # what if traffic process takes a few seconds to start?
960             if not self._traffic_process.is_alive():
961                 break
962
963         return self._traffic_process.is_alive()
964
965     def listen_traffic(self, traffic_profile):
966         """ Listen to traffic with the given parameters.
967         Method is non-blocking, returns immediately when traffic process
968         is running. Optional.
969
970         :param traffic_profile:
971         :return: True/False
972         """
973         pass
974
975     def verify_traffic(self, traffic_profile):
976         """ Verify captured traffic after it has ended. Optional.
977
978         :param traffic_profile:
979         :return: dict
980         """
981         pass
982
983     def collect_kpi(self):
984         result = self.resource_helper.collect_kpi()
985         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
986         return result
987
988     def terminate(self):
989         """ After this method finishes, all traffic processes should stop. Mandatory.
990
991         :return: True/False
992         """
993         self.traffic_finished = True
994         if self._traffic_process is not None:
995             self._traffic_process.terminate()