Configure ACL via static file
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19
20 import os
21 import posixpath
22 import re
23 import subprocess
24 import time
25
26 import six
27
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.common import exceptions as y_exceptions
33 from yardstick.common.process import check_if_process_failed
34 from yardstick.common import utils
35 from yardstick.common import yaml_loader
36 from yardstick.network_services import constants
37 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
38 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
39 from yardstick.network_services.nfvi.resource import ResourceProfile
40 from yardstick.network_services.utils import get_nsb_option
41 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
42 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
43 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
44 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
45
46
47 LOG = logging.getLogger(__name__)
48
49
class SetupEnvHelper(object):
    """Base helper describing how a sample VNF environment is prepared.

    ``build_config`` and ``tear_down`` must be supplied by subclasses;
    ``setup_vnf_environment`` and ``kill_vnf`` do nothing at this level.
    """

    VNF_TYPE = "SAMPLE"
    PIPELINE_COMMAND = ''
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    # generated config/script file locations under constants.REMOTE_TMP
    CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
    CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Keep references to the collaborating helpers.

        :param vnfd_helper: helper wrapping the VNF descriptor
        :param ssh_helper: helper used to reach the VNF host
        :param scenario_helper: helper exposing the scenario configuration
        """
        super(SetupEnvHelper, self).__init__()
        # filled in later by the owning VNF; empty until then
        self.collectd_options = {}
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def build_config(self):
        """Build the VNF configuration; must be overridden."""
        raise NotImplementedError()

    def setup_vnf_environment(self):
        """Prepare the environment; nothing to do in the base class."""
        return None

    def kill_vnf(self):
        """Stop a running VNF; nothing to do in the base class."""
        return None

    def tear_down(self):
        """Undo the environment setup; must be overridden."""
        raise NotImplementedError()
76
77
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based sample VNFs.

    Configures hugepages, binds the VNF interfaces to the DPDK driver,
    generates and uploads the VNF config/script files and builds the command
    line used to launch the VNF.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Return the config text with 'pkt_type = ipv4' replaced.

        :param ip_pipeline_cfg: (str) pipeline configuration text
        :param traffic_options: (dict) must contain 'pkt_type'
        :return: (str) the updated configuration text
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        return ip_pipeline_cfg.replace(match_str, replace_str)

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Return the config text adjusted for the requested traffic type.

        :param ip_pipeline_cfg: (str) pipeline configuration text
        :param traffic_options: (dict) must contain 'traffic_type' and
                                'vnf_type'
        :return: (str) the updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # Compare by value: an identity test ("is not") between two strings
        # only gives the expected answer when both happen to be interned.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        return ip_pipeline_cfg.replace(match_str, replace_str)

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Write the wanted number of hugepages on the VNF host and log it.

        The amount of memory comes from the 'hugepages_gb' scenario option
        (16 when not given) and is divided by the host 'Hugepagesize' value.
        """
        meminfo = utils.read_meminfo(self.ssh_helper)
        hp_size_kb = int(meminfo['Hugepagesize'])
        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
        # GB -> kB, then number of pages of hp_size_kb each
        nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
        self.ssh_helper.execute('echo %s | sudo tee %s' %
                                (nr_hugepages, self.NR_HUGEPAGES_PATH))
        # read the value back: what was accepted may differ from the request
        hp = six.BytesIO()
        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
        LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
                 hp_size_kb, nr_hugepages, nr_hugepages_set)

    def build_config(self):
        """Generate and upload the VNF config and script files.

        When the VNF configuration names a static 'file', its content is
        uploaded prefixed with an '[EAL]' section listing the VNF PCI
        addresses; otherwise the generated configuration is used after
        updating its traffic and packet type.

        :return: (str) the command line that starts the VNF
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        config_file = vnf_cfg.get('file')
        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        # read actions/rules from file
        acl_options = None
        acl_file_name = self.scenario_helper.options.get('rules')
        if acl_file_name:
            with utils.open_relative_file(acl_file_name, task_path) as infile:
                acl_options = yaml_loader.yaml_load(infile)

        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
                                                  task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        if config_file:
            with utils.open_relative_file(config_file, task_path) as infile:
                new_config = ['[EAL]']
                vpci = []
                for port in self.vnfd_helper.port_pairs.all_ports:
                    interface = self.vnfd_helper.find_interface(name=port)
                    vpci.append(interface['virtual-interface']["vpci"])
                new_config.extend('w = {0}'.format(item) for item in vpci)
                new_config = '\n'.join(new_config) + '\n' + infile.read()
        else:
            with open(self.CFG_CONFIG) as handle:
                new_config = handle.read()
            new_config = self._update_traffic_type(new_config, traffic_options)
            new_config = self._update_packet_type(new_config, traffic_options)
        self.ssh_helper.upload_config_file(config_basename, new_config)
        script = multiport.generate_script(self.vnfd_helper,
                                           self.get_flows_config(acl_options))
        self.ssh_helper.upload_config_file(script_basename, script)

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def get_flows_config(self, options=None): # pylint: disable=unused-argument
        """No actions/rules (flows) by default"""
        return None

    def _build_pipeline_kwargs(self):
        """Populate ``self.pipeline_kwargs`` used to format PIPELINE_COMMAND."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))

        vnf_cfg = self.scenario_helper.vnf_cfg
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_threads = vnf_cfg.get('worker_threads', 3)
        hwlb = ''
        if lb_config == 'HW':
            hwlb = ' --hwlb %s' % worker_threads

        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
            'hwlb': hwlb,
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, bind the interfaces and return the resource profile."""
        self._setup_dpdk()
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill any running instance of APP_NAME on the VNF host."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run"""
        self._setup_hugepages()
        self.dpdk_bind_helper.load_dpdk_driver()

        # NOTE(review): the returned status is not acted upon here;
        # presumably non-zero means the driver is not loaded -- confirm
        # before adding error handling.
        self.dpdk_bind_helper.check_dpdk_driver()

    def _setup_resources(self):
        """Select the socket and build the ResourceProfile for KPI collection."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        plugins = self.collectd_options.get("plugins", {})
        interval = self.collectd_options.get("interval")
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=interval,
                               timeout=self.scenario_helper.timeout)

    def _check_interface_fields(self):
        """Run the DpdkNode check on the VNF interfaces."""
        num_nodes = len(self.scenario_helper.nodes)
        # OpenStack instance creation time is probably proportional to the number
        # of instances
        timeout = 120 * num_nodes
        dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
                             self.ssh_helper, timeout)
        dpdk_node.check()

    def _detect_and_bind_drivers(self):
        """Bind the VNF interfaces to 'igb_uio' and record DPDK port numbers.

        The DPDK port number of an interface is its position in the sorted
        list of DPDK-bound PCI addresses.
        """
        interfaces = self.vnfd_helper.interfaces

        self._check_interface_fields()
        # check for bound after probe
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            intf = next((v for v in interfaces
                         if vpci == v['virtual-interface']['vpci']), None)
            if intf is None:
                # a DPDK-bound address that is not one of this VNF's
                # interfaces: nothing to record, keep going
                continue
            # force to int
            intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the output of FIND_NET_CMD for ``vpci`` or None on failure."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the interfaces through the DPDK bind helper."""
        self.dpdk_bind_helper.rebind_drivers()
310
311
class ResourceHelper(object):
    """Owns the resource object returned by the environment setup and
    exposes KPI collection on top of it."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember the setup helper and reuse its ssh helper."""
        super(ResourceHelper, self).__init__()
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        # assigned by setup()
        self.resource = None

    def setup(self):
        """Prepare the VNF environment and keep the resulting resource."""
        env_resource = self.setup_helper.setup_vnf_environment()
        self.resource = env_resource

    def generate_cfg(self):
        """Nothing to generate in the base helper."""
        return None

    def _collect_resource_kpi(self):
        """Return ``{"core": <kpis>}``; KPIs stay empty unless the collectd
        check reports status 0."""
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if agent_status == 0:
            nfvi_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            nfvi_kpi = {}
        return {"core": nfvi_kpi}

    def start_collect(self):
        """Initiate the system agent, start the resource and its AMQP process."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource when there is one."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs."""
        return self._collect_resource_kpi()
352
353
class ClientResourceHelper(ResourceHelper):
    """Resource helper driving a traffic generator through an STLClient."""

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Cache the networks and the uplink/downlink/all port numbers."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        """Return the port number of ``intf`` as given by the VNFD helper."""
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or ``{}`` when the client raises STLError."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.error('TRex client not connected')
            return {}

    def generate_samples(self, ports, key=None, default=None):
        """Build per-interface samples from the latest client statistics.

        :param ports: port numbers to report on
        :param key: optional extra statistics key copied into every sample
        :param default: value used when ``key`` is missing from the statistics
        :return: (dict) interface name -> sample dict; ``{}`` when the
                 statistics are not a mapping (the helper is then flagged as
                 terminated)
        """
        # needs to be used ports
        last_result = self.get_stats(ports)

        # validate before the first .get(): the guard is useless if the
        # lookup below runs on a non-mapping first
        if not isinstance(last_result, Mapping):  # added for mock unit test
            self._terminated.value = 1
            return {}

        key_value = last_result.get(key, default)

        samples = {}
        # recalculate port for interface and see if it matches ports provided
        for intf in self.vnfd_helper.interfaces:
            name = intf["name"]
            port = self.vnfd_helper.port_num(name)
            if port in ports:
                xe_value = last_result.get(port, {})
                samples[name] = {
                    "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
                    "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
                    "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
                    "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
                    "in_packets": int(xe_value.get("ipackets", 0)),
                    "out_packets": int(xe_value.get("opackets", 0)),
                }
                if key:
                    samples[name][key] = key_value
        return samples

    def _run_traffic_once(self, traffic_profile):
        """Run the profile for RUN_DURATION seconds and queue the samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self.generate_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect the client and run the profile until terminated.

        An STLError raised after ``terminate()`` was requested is treated as
        a normal stop; any other STLError is re-raised.
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

    def terminate(self):
        """Ask the traffic loop to stop."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear the client statistics for ``ports`` (all ports by default)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on ``ports`` (all ports by default)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set, if any, into the result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Return a client after trying to connect it.

        A new STLClient is created from the management interface when no
        client is given. The client is returned even when every connection
        attempt failed.
        """
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
493
494
class Rfc2544ResourceHelper(object):
    """Lazy accessors for the 'rfc2544' section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The 'rfc2544' options dict, read once from the scenario options."""
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """The 'correlated_traffic' option (DEFAULT_CORRELATED_TRAFFIC if unset)."""
        if self._correlated_traffic is None:
            self._correlated_traffic = \
                self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)

        return self._correlated_traffic

    @property
    def latency(self):
        """The 'latency' option (DEFAULT_LATENCY if unset)."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` of the 'rfc2544' section or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' into the low/high tolerance bounds.

        Accepted forms: a "<low> - <high>" string, a single-value string, or
        a bare number (what YAML yields for ``allowed_drop_rate: 0.0001``).
        A single value is used for both bounds.
        """
        tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        if isinstance(tolerance, (int, float)):
            # a number has no .split(); use it as both bounds
            bounds = [float(tolerance)]
        else:
            bounds = sorted(float(t.strip()) for t in tolerance.split('-'))
        self._tolerance_low = bounds[0]
        self._tolerance_high = bounds[1] if len(bounds) > 1 else bounds[0]
551
552
class SampleVNFDeployHelper(object):
    """Clones the samplevnf repository locally, copies it to the VNF host
    and builds the requested application there."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` on the VNF host unless already there.

        Nothing is done when ``which <binary>`` succeeds on the host.
        """
        ssh = self.ssh_helper
        target_bin = ssh.join_bin_path(app_name)
        which_status = ssh.execute("which %s" % target_bin)[0]
        if not which_status:
            # binary already present on the host
            return

        # fresh local clone
        for local_cmd in (["rm", "-rf", self.REPO_NAME],
                          ["git", "clone", self.SAMPLE_VNF_REPO]):
            subprocess.check_output(local_cmd)
        time.sleep(2)

        # replace the copy on the host
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        builder = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (builder, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        # install the freshly built binary into the bin path
        built_bin = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_bin, target_bin))
585
586
class ScenarioHelper(object):
    """Read-only view over the scenario configuration of one named node."""

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        # scenario_cfg is assigned later by the owner of this helper
        self.scenario_cfg = None
        self.name = name

    @property
    def task_path(self):
        """The 'task_path' entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry of the scenario configuration, None if absent."""
        cfg = self.scenario_cfg
        return cfg.get('nodes')

    @property
    def all_options(self):
        """The whole 'options' section, ``{}`` if absent."""
        cfg = self.scenario_cfg
        return cfg.get('options', {})

    @property
    def options(self):
        """The options stored under this helper's name, ``{}`` if absent."""
        every_option = self.all_options
        return every_option.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The 'vnf_config' of this node, DEFAULT_VNF_CFG if absent."""
        own_options = self.options
        return own_options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The 'topology' entry of the scenario configuration."""
        cfg = self.scenario_cfg
        return cfg['topology']

    @property
    def timeout(self):
        """The larger of the runner duration and the configured timeout.

        The runner duration falls back to the configured timeout, which
        itself falls back to constants.DEFAULT_VNF_TIMEOUT.
        """
        runner_cfg = self.scenario_cfg.get('runner', {})
        fallback = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
        duration = runner_cfg.get('duration', fallback)
        configured = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
        if duration > configured:
            return duration
        return configured
630
631 class SampleVNF(GenericVNF):
632     """ Class providing file-like API for generic VNF implementation """
633
634     VNF_PROMPT = "pipeline>"
635     WAIT_TIME = 1
636     WAIT_TIME_FOR_SCRIPT = 10
637     APP_NAME = "SampleVNF"
638     # we run the VNF interactively, so the ssh command will timeout after this long
639
640     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
641         super(SampleVNF, self).__init__(name, vnfd)
642         self.bin_path = get_nsb_option('bin_path', '')
643
644         self.scenario_helper = ScenarioHelper(self.name)
645         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
646
647         if setup_env_helper_type is None:
648             setup_env_helper_type = SetupEnvHelper
649
650         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
651                                                   self.ssh_helper,
652                                                   self.scenario_helper)
653
654         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
655
656         if resource_helper_type is None:
657             resource_helper_type = ResourceHelper
658
659         self.resource_helper = resource_helper_type(self.setup_helper)
660
661         self.context_cfg = None
662         self.nfvi_context = None
663         self.pipeline_kwargs = {}
664         self.uplink_ports = None
665         self.downlink_ports = None
666         # NOTE(esm): make QueueFileWrapper invert-able so that we
667         #            never have to manage the queues
668         self.q_in = Queue()
669         self.q_out = Queue()
670         self.queue_wrapper = None
671         self.run_kwargs = {}
672         self.used_drivers = {}
673         self.vnf_port_pairs = None
674         self._vnf_process = None
675
676     def _start_vnf(self):
677         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
678         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
679         self._vnf_process = Process(name=name, target=self._run)
680         self._vnf_process.start()
681
682     def _vnf_up_post(self):
683         pass
684
685     def instantiate(self, scenario_cfg, context_cfg):
686         self._update_collectd_options(scenario_cfg, context_cfg)
687         self.scenario_helper.scenario_cfg = scenario_cfg
688         self.context_cfg = context_cfg
689         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
690         # self.nfvi_context = None
691
692         # vnf deploy is unsupported, use ansible playbooks
693         if self.scenario_helper.options.get("vnf_deploy", False):
694             self.deploy_helper.deploy_vnfs(self.APP_NAME)
695         self.resource_helper.setup()
696         self._start_vnf()
697
698     def _update_collectd_options(self, scenario_cfg, context_cfg):
699         """Update collectd configuration options
700         This function retrieves all collectd options contained in the test case
701
702         definition builds a single dictionary combining them. The following fragment
703         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
704         ---
705         schema: yardstick:task:0.1
706         scenarios:
707         - type: NSPerf
708           nodes:
709             tg__0: trafficgen_1.yardstick
710             vnf__0: vnf.yardstick
711           options:
712             collectd:
713               <options>  # COLLECTD priority 3
714             vnf__0:
715               collectd:
716                 plugins:
717                     load
718                 <options> # COLLECTD priority 2
719         context:
720           type: Node
721           name: yardstick
722           nfvi_type: baremetal
723           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
724         """
725         scenario_options = scenario_cfg.get('options', {})
726         generic_options = scenario_options.get('collectd', {})
727         scenario_node_options = scenario_options.get(self.name, {})\
728             .get('collectd', {})
729         context_node_options = context_cfg.get('nodes', {})\
730             .get(self.name, {}).get('collectd', {})
731
732         options = generic_options
733         self._update_options(options, scenario_node_options)
734         self._update_options(options, context_node_options)
735
736         self.setup_helper.collectd_options = options
737
738     def _update_options(self, options, additional_options):
739         """Update collectd options and plugins dictionary"""
740         for k, v in additional_options.items():
741             if isinstance(v, dict) and k in options:
742                 options[k].update(v)
743             else:
744                 options[k] = v
745
746     def wait_for_instantiate(self):
747         buf = []
748         time.sleep(self.WAIT_TIME)  # Give some time for config to load
749         while True:
750             if not self._vnf_process.is_alive():
751                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
752
753             # NOTE(esm): move to QueueFileWrapper
754             while self.q_out.qsize() > 0:
755                 buf.append(self.q_out.get())
756                 message = ''.join(buf)
757                 if self.VNF_PROMPT in message:
758                     LOG.info("%s VNF is up and running.", self.APP_NAME)
759                     self._vnf_up_post()
760                     self.queue_wrapper.clear()
761                     return self._vnf_process.exitcode
762
763                 if "PANIC" in message:
764                     raise RuntimeError("Error starting %s VNF." %
765                                        self.APP_NAME)
766
767             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
768             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
769             # Send ENTER to display a new prompt in case the prompt text was corrupted
770             # by other VNF output
771             self.q_in.put('\r\n')
772
773     def start_collect(self):
774         self.resource_helper.start_collect()
775
776     def stop_collect(self):
777         self.resource_helper.stop_collect()
778
779     def _build_run_kwargs(self):
780         self.run_kwargs = {
781             'stdin': self.queue_wrapper,
782             'stdout': self.queue_wrapper,
783             'keep_stdin_open': True,
784             'pty': True,
785             'timeout': self.scenario_helper.timeout,
786         }
787
788     def _build_config(self):
789         return self.setup_helper.build_config()
790
791     def _run(self):
792         # we can't share ssh paramiko objects to force new connection
793         self.ssh_helper.drop_connection()
794         cmd = self._build_config()
795         # kill before starting
796         self.setup_helper.kill_vnf()
797
798         LOG.debug(cmd)
799         self._build_run_kwargs()
800         self.ssh_helper.run(cmd, **self.run_kwargs)
801
802     def vnf_execute(self, cmd, wait_time=2):
803         """ send cmd to vnf process """
804
805         LOG.info("%s command: %s", self.APP_NAME, cmd)
806         self.q_in.put("{}\r\n".format(cmd))
807         time.sleep(wait_time)
808         output = []
809         while self.q_out.qsize() > 0:
810             output.append(self.q_out.get())
811         return "".join(output)
812
813     def _tear_down(self):
814         pass
815
816     def terminate(self):
817         self.vnf_execute("quit")
818         self.setup_helper.kill_vnf()
819         self._tear_down()
820         self.resource_helper.stop_collect()
821         if self._vnf_process is not None:
822             # be proper and join first before we kill
823             LOG.debug("joining before terminate %s", self._vnf_process.name)
824             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
825             self._vnf_process.terminate()
826         # no terminate children here because we share processes with tg
827
828     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
829         """Method for checking the statistics
830
831         This method could be overridden in children classes.
832
833         :return: VNF statistics
834         """
835         cmd = 'p {0} stats'.format(self.APP_WORD)
836         out = self.vnf_execute(cmd)
837         return out
838
839     def collect_kpi(self):
840         # we can't get KPIs if the VNF is down
841         check_if_process_failed(self._vnf_process, 0.01)
842         stats = self.get_stats()
843         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
844         if m:
845             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
846             result["collect_stats"] = self.resource_helper.collect_kpi()
847         else:
848             result = {
849                 "packets_in": 0,
850                 "packets_fwd": 0,
851                 "packets_dropped": 0,
852             }
853         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
854         return result
855
856     def scale(self, flavor=""):
857         """The SampleVNF base class doesn't provide the 'scale' feature"""
858         raise y_exceptions.FunctionNotImplemented(
859             function_name='scale', class_name='SampleVNFTrafficGen')
860
861
862 class SampleVNFTrafficGen(GenericTrafficGen):
863     """ Class providing file-like API for generic traffic generator """
864
865     APP_NAME = 'Sample'
866     RUN_WAIT = 1
867
868     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
869         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
870         self.bin_path = get_nsb_option('bin_path', '')
871
872         self.scenario_helper = ScenarioHelper(self.name)
873         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
874
875         if setup_env_helper_type is None:
876             setup_env_helper_type = SetupEnvHelper
877
878         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
879                                                   self.ssh_helper,
880                                                   self.scenario_helper)
881
882         if resource_helper_type is None:
883             resource_helper_type = ClientResourceHelper
884
885         self.resource_helper = resource_helper_type(self.setup_helper)
886
887         self.runs_traffic = True
888         self.traffic_finished = False
889         self._tg_process = None
890         self._traffic_process = None
891
892     def _start_server(self):
893         # we can't share ssh paramiko objects to force new connection
894         self.ssh_helper.drop_connection()
895
896     def instantiate(self, scenario_cfg, context_cfg):
897         self.scenario_helper.scenario_cfg = scenario_cfg
898         self.resource_helper.setup()
899         # must generate_cfg after DPDK bind because we need port number
900         self.resource_helper.generate_cfg()
901
902         LOG.info("Starting %s server...", self.APP_NAME)
903         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
904         self._tg_process = Process(name=name, target=self._start_server)
905         self._tg_process.start()
906
907     def _check_status(self):
908         raise NotImplementedError
909
910     def _wait_for_process(self):
911         while True:
912             if not self._tg_process.is_alive():
913                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
914             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
915             time.sleep(1)
916             status = self._check_status()
917             if status == 0:
918                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
919                 return self._tg_process.exitcode
920
921     def _traffic_runner(self, traffic_profile):
922         # always drop connections first thing in new processes
923         # so we don't get paramiko errors
924         self.ssh_helper.drop_connection()
925         LOG.info("Starting %s client...", self.APP_NAME)
926         self.resource_helper.run_traffic(traffic_profile)
927
928     def run_traffic(self, traffic_profile):
929         """ Generate traffic on the wire according to the given params.
930         Method is non-blocking, returns immediately when traffic process
931         is running. Mandatory.
932
933         :param traffic_profile:
934         :return: True/False
935         """
936         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
937                                     os.getpid())
938         self._traffic_process = Process(name=name, target=self._traffic_runner,
939                                         args=(traffic_profile,))
940         self._traffic_process.start()
941         # Wait for traffic process to start
942         while self.resource_helper.client_started.value == 0:
943             time.sleep(self.RUN_WAIT)
944             # what if traffic process takes a few seconds to start?
945             if not self._traffic_process.is_alive():
946                 break
947
948         return self._traffic_process.is_alive()
949
950     def collect_kpi(self):
951         # check if the tg processes have exited
952         for proc in (self._tg_process, self._traffic_process):
953             check_if_process_failed(proc)
954         result = self.resource_helper.collect_kpi()
955         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
956         return result
957
958     def terminate(self):
959         """ After this method finishes, all traffic processes should stop. Mandatory.
960
961         :return: True/False
962         """
963         self.traffic_finished = True
964         # we must kill client before we kill the server, or the client will raise exception
965         if self._traffic_process is not None:
966             # be proper and try to join before terminating
967             LOG.debug("joining before terminate %s", self._traffic_process.name)
968             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
969             self._traffic_process.terminate()
970         if self._tg_process is not None:
971             # be proper and try to join before terminating
972             LOG.debug("joining before terminate %s", self._tg_process.name)
973             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
974             self._tg_process.terminate()
975         # no terminate children here because we share processes with vnf
976
977     def scale(self, flavor=""):
978         """A traffic generator VFN doesn't provide the 'scale' feature"""
979         raise y_exceptions.FunctionNotImplemented(
980             function_name='scale', class_name='SampleVNFTrafficGen')