Merge "Fix exception handling ixload"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import decimal
17 from multiprocessing import Queue, Value, Process
18 import os
19 import posixpath
20 import re
21 import subprocess
22 import time
23
24
25 from trex_stl_lib.trex_stl_client import LoggerApi
26 from trex_stl_lib.trex_stl_client import STLClient
27 from trex_stl_lib.trex_stl_exceptions import STLError
28 from yardstick.benchmark.contexts.base import Context
29 from yardstick.common import exceptions as y_exceptions
30 from yardstick.common.process import check_if_process_failed
31 from yardstick.common import utils
32 from yardstick.common import yaml_loader
33 from yardstick.network_services import constants
34 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
39 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
42 from yardstick.benchmark.contexts.node import NodeContext
43
44 LOG = logging.getLogger(__name__)
45
46
class SetupEnvHelper(object):
    """Base class for helpers that prepare the environment of a sample VNF.

    ``build_config`` and ``tear_down`` must be provided by subclasses;
    ``setup_vnf_environment`` and ``kill_vnf`` do nothing in this class.
    """

    # paths of the generated config / script, built under constants.REMOTE_TMP
    CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helper objects shared with the owning VNF.

        :param vnfd_helper: VNF descriptor helper
        :param ssh_helper: SSH helper used to reach the VNF
        :param scenario_helper: accessor for the scenario configuration
        """
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper
        # filled in later by the owning VNF (see SampleVNF._update_collectd_options)
        self.collectd_options = {}

    def build_config(self):
        """Build the VNF configuration; must be implemented by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the VNF environment; no-op in the base class."""

    def kill_vnf(self):
        """Stop a running VNF; no-op in the base class."""

    def tear_down(self):
        """Undo the environment setup; must be implemented by subclasses."""
        raise NotImplementedError
73
74
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment setup helper for DPDK based sample VNFs.

    Generates and uploads the VNF configuration and script files, binds the
    VNF interfaces to the ``igb_uio`` driver through ``DpdkBindHelper`` and
    creates the ``ResourceProfile`` returned by ``setup_vnf_environment``.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace the default packet type in the pipeline configuration.

        :param ip_pipeline_cfg: (str) pipeline configuration text
        :param traffic_options: (dict) must contain the 'pkt_type' key
        :return: (str) configuration with 'pkt_type = ipv4' replaced by the
                 configured packet type
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Replace the traffic/packet type in the pipeline configuration.

        :param ip_pipeline_cfg: (str) pipeline configuration text
        :param traffic_options: (dict) must contain the 'traffic_type' and
                                'vnf_type' keys
        :return: (str) updated configuration text
        """
        traffic_type = traffic_options['traffic_type']

        # Compare by value: an identity test ("is not") between two strings
        # only works by accident when both happen to be the same object.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Initialise the helper and its ``DpdkBindHelper``.

        :param vnfd_helper: VNF descriptor helper
        :param ssh_helper: SSH helper used to reach the VNF
        :param scenario_helper: accessor for the scenario configuration
        """
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def build_config(self):
        """Generate and upload the VNF config and script files.

        When the scenario provides a 'file' entry in the VNF config, that
        file is prefixed with an '[EAL]' section listing the vpci of every
        port; otherwise the content of ``CFG_CONFIG`` is used with the
        traffic and packet type patched in.

        :return: (str) ``PIPELINE_COMMAND`` formatted with the pipeline kwargs
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        config_file = vnf_cfg.get('file')
        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        # read actions/rules from file
        acl_options = None
        acl_file_name = self.scenario_helper.options.get('rules')
        if acl_file_name:
            with utils.open_relative_file(acl_file_name, task_path) as infile:
                acl_options = yaml_loader.yaml_load(infile)

        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
                                                  task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        if config_file:
            with utils.open_relative_file(config_file, task_path) as infile:
                new_config = ['[EAL]']
                vpci = []
                for port in self.vnfd_helper.port_pairs.all_ports:
                    interface = self.vnfd_helper.find_interface(name=port)
                    vpci.append(interface['virtual-interface']["vpci"])
                new_config.extend('w = {0}'.format(item) for item in vpci)
                new_config = '\n'.join(new_config) + '\n' + infile.read()
        else:
            with open(self.CFG_CONFIG) as handle:
                new_config = handle.read()
            new_config = self._update_traffic_type(new_config, traffic_options)
            new_config = self._update_packet_type(new_config, traffic_options)
        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(
            script_basename,
            multiport.generate_script(self.vnfd_helper,
                                      self.get_flows_config(acl_options)))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def get_flows_config(self, options=None):  # pylint: disable=unused-argument
        """No actions/rules (flows) by default"""
        return None

    def _build_pipeline_kwargs(self, cfg_file=None, script=None):
        """Populate ``self.pipeline_kwargs`` used to format the VNF command.

        :param cfg_file: config file path; ``CFG_CONFIG`` when not given
        :param script: script file path; ``CFG_SCRIPT`` when not given
        """
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))

        vnf_cfg = self.scenario_helper.vnf_cfg
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_threads = vnf_cfg.get('worker_threads', 3)
        hwlb = ''
        if lb_config == 'HW':
            hwlb = ' --hwlb %s' % worker_threads

        self.pipeline_kwargs = {
            'cfg_file': cfg_file if cfg_file else self.CFG_CONFIG,
            'script': script if script else self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
            'hwlb': hwlb,
        }

    def setup_vnf_environment(self):
        """Set up DPDK, kill any running VNF and bind the interfaces.

        :return: the ``ResourceProfile`` created by ``_setup_resources``
        """
        self._setup_dpdk()
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill the running VNF application (``APP_NAME``) with killall."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run"""
        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
        utils.setup_hugepages(self.ssh_helper, hugepages_gb * 1024 * 1024)
        self.dpdk_bind_helper.load_dpdk_driver()

        # The original ended in a no-op "if exit_status == 0: return"; keep
        # the check but make a non-zero status visible instead of ignoring it.
        exit_status = self.dpdk_bind_helper.check_dpdk_driver()
        if exit_status != 0:
            LOG.warning("check_dpdk_driver returned exit status %s", exit_status)

    def _setup_resources(self):
        """Select the socket and build the ``ResourceProfile`` for KPIs.

        :return: ``ResourceProfile`` for the management interface
        """
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        plugins = self.collectd_options.get("plugins", {})
        interval = self.collectd_options.get("interval")
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=interval,
                               timeout=self.scenario_helper.timeout)

    def _check_interface_fields(self):
        """Run ``DpdkNode.check`` with a timeout scaled by the node count."""
        num_nodes = len(self.scenario_helper.nodes)
        # OpenStack instance creation time is probably proportional to the number
        # of instances
        timeout = 120 * num_nodes
        dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
                             self.ssh_helper, timeout)
        dpdk_node.check()

    def _detect_and_bind_drivers(self):
        """Bind the VNF interfaces to 'igb_uio' and record DPDK port numbers.

        Each interface whose vpci appears in the sorted list of DPDK bound
        PCI addresses gets its position in that list stored as
        ``dpdk_port_num``.
        """
        interfaces = self.vnfd_helper.interfaces

        self._check_interface_fields()
        # check for bound after probe
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            intf = next((v for v in interfaces
                         if vpci == v['virtual-interface']['vpci']), None)
            if intf is None:
                # no interface of this VNF uses this bound address; skip it
                # (this is the only case the former bare "except" had to cover)
                continue
            # force to int
            intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Look up a network interface name under /sys/class/net by vpci.

        :param vpci: PCI address inserted into ``FIND_NET_CMD``
        :return: command stdout when the command exits with 0, else None
        """
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the interfaces through ``DpdkBindHelper.rebind_drivers``."""
        self.dpdk_bind_helper.rebind_drivers()
295
296
class ResourceHelper(object):
    """Collects resource (NFVi) KPIs through the profile built at setup."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Keep the setup helper and reuse its SSH helper.

        :param setup_helper: environment setup helper of the owning VNF
        """
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        # switched off by update_from_context() for NodeContext
        self._enable = True

    def setup(self):
        """Set up the VNF environment and keep the returned resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Generate a configuration; nothing to do in this class."""

    def update_from_context(self, context, attr_name):
        """Disable resource helper in case of baremetal context.

        And update appropriate node collectd options in context
        """
        if not isinstance(context, NodeContext):
            return
        self._enable = False
        context.update_collectd_options_for_node(
            self.setup_helper.collectd_options, attr_name)

    def _collect_resource_kpi(self):
        """Return the NFVi KPIs wrapped under the "core" key.

        :return: (dict) {"core": kpis}; kpis is empty when collectd is not
                 reported as running or the helper is disabled
        """
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if agent_status == 0 and self._enable:
            core_kpis = self.resource.amqp_collect_nfvi_kpi()
        else:
            core_kpis = {}
        return {"core": core_kpis}

    def start_collect(self):
        """Start the system agent and the KPI collection, when enabled."""
        if not self._enable:
            return
        self.resource.initiate_systemagent(self.ssh_helper.bin_path)
        self.resource.start()
        self.resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource, when one exists and the helper is enabled."""
        if not (self.resource and self._enable):
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs (see ``_collect_resource_kpi``)."""
        return self._collect_resource_kpi()
349
350
class ClientResourceHelper(ResourceHelper):
    """Resource helper that drives traffic through a TRex ``STLClient``."""

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Initialise client state and the shared queue/flags.

        :param setup_helper: environment setup helper of the owning VNF
        """
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers."""
        helper = self.vnfd_helper
        self.networks = helper.port_pairs.networks
        self.uplink_ports = helper.port_nums(helper.port_pairs.uplink_ports)
        self.downlink_ports = helper.port_nums(helper.port_pairs.downlink_ports)
        self.all_ports = helper.port_nums(helper.port_pairs.all_ports)

    def port_num(self, intf):
        """Return the port number of ``intf`` as given by the VNFD helper."""
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or {} when the client raises STLError."""
        try:
            stats = self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.error('TRex client not connected')
            stats = {}
        return stats

    def _get_samples(self, ports, port_pg_id=False):
        """Collect samples for ``ports``; must be implemented by subclasses."""
        raise NotImplementedError()

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the collected samples.

        :param traffic_profile: profile whose ``execute_traffic`` is invoked
        """
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        collected = self._get_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(collected)

    def run_traffic(self, traffic_profile):
        """Connect the client and run traffic until terminated.

        An STLError raised after ``terminate()`` was requested is treated as
        a normal stop; otherwise it is re-raised.

        :param traffic_profile: traffic profile to run
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            # remove all streams
            self.client.remove_all_streams(self.all_ports)
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                finished = self._run_traffic_once(traffic_profile)
                if finished:
                    self._terminated.value = 1

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if not self._terminated.value:
                raise
            # return if trex/tg server is stopped.
            LOG.debug("traffic generator is stopped")
            return

    def terminate(self):
        """Ask the traffic loop in ``run_traffic`` to stop."""
        # stop client
        self._terminated.value = 1

    def clear_stats(self, ports=None):
        """Clear the client statistics for ``ports`` (all ports when None)."""
        target_ports = self.all_ports if ports is None else ports
        self.client.clear_stats(ports=target_ports)

    def start(self, ports=None, *args, **kwargs):
        """Start the client on ``ports`` (all ports when None)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        target_ports = self.all_ports if ports is None else ports
        self.client.start(ports=target_ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge one queued sample set, if any, into the result and return it."""
        if self._queue.empty():
            return self._result
        self._result.update(self._queue.get())
        LOG.debug('Got KPIs from _queue for %s %s',
                  self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect ``client`` (a new ``STLClient`` when None) with retries.

        :return: the client, whether or not a connection attempt succeeded
        """
        if client is None:
            mgmt = self.vnfd_helper.mgmt_interface
            client = STLClient(username=mgmt["user"],
                               server=mgmt["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for attempt in range(6):
            try:
                client.connect()
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", attempt)
                time.sleep(5)
            else:
                break
        return client
466
467
class Rfc2544ResourceHelper(object):
    """Lazy accessors for the 'rfc2544' section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Start with every cached value unset.

        :param scenario_helper: object exposing ``all_options``
        """
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None
        self._tolerance_precision = None

    @property
    def rfc2544(self):
        """The 'rfc2544' options dictionary, read once and cached."""
        if self._rfc2544 is not None:
            return self._rfc2544
        self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate, as a float."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate, as a float."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def tolerance_precision(self):
        """Absolute decimal exponent of the upper tolerance bound."""
        if self._tolerance_precision is None:
            self.get_rfc_tolerance()
        return self._tolerance_precision

    @property
    def correlated_traffic(self):
        """The 'correlated_traffic' option, cached after the first read."""
        if self._correlated_traffic is None:
            self._correlated_traffic = self.get_rfc2544(
                'correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
        return self._correlated_traffic

    @property
    def latency(self):
        """The 'latency' option, cached after the first read."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return option ``name`` from the rfc2544 section, or ``default``."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' into low/high bounds and precision.

        The option is a '-' separated string of decimals; a single value is
        used for both bounds.
        """
        raw_tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(decimal.Decimal(part.strip())
                        for part in raw_tolerance.split('-'))
        lower = bounds[0]
        # str.split() always yields at least one item, so only the second
        # bound may be missing
        upper = bounds[1] if len(bounds) > 1 else lower
        self._tolerance_precision = abs(upper.as_tuple().exponent)
        self._tolerance_high = float(upper)
        self._tolerance_low = float(lower)
535
536
class SampleVNFDeployHelper(object):
    """Clones, uploads and builds the samplevnf repository on the VNF host."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Keep the helpers used during deployment.

        :param vnfd_helper: VNF descriptor (helper)
        :param ssh_helper: SSH helper used to reach the VNF
        """
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` unless ``which`` already finds it.

        :param app_name: name of the VNF application binary
        """
        remote_bin = self.ssh_helper.join_bin_path(app_name)
        which_status = self.ssh_helper.execute("which %s" % remote_bin)[0]
        if not which_status:
            # binary already present, nothing to deploy
            return

        # fresh local clone of the repository
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        # replace the remote copy with the local clone
        self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        self.ssh_helper.execute(build_cmd)

        # copy the built binary into the bin path
        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
        self.ssh_helper.execute("sudo cp %s %s" % (built_binary, remote_bin))
569
570
class ScenarioHelper(object):
    """Accessors over the scenario configuration for one named node.

    ``scenario_cfg`` is None until the owner assigns it; every property
    reads from it on each access.
    """

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """:param name: node name used to select the per-node options"""
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """The 'task_path' entry of the scenario configuration."""
        return self.scenario_cfg['task_path']

    @property
    def nodes(self):
        """The 'nodes' entry of the scenario configuration, or None."""
        return self.scenario_cfg.get('nodes')

    @property
    def all_options(self):
        """The whole 'options' section, or {} when absent."""
        return self.scenario_cfg.get('options', {})

    @property
    def options(self):
        """The options of this node (keyed by ``name``), or {} when absent."""
        return self.all_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The node's 'vnf_config' options, or ``DEFAULT_VNF_CFG``."""
        return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The 'topology' entry of the scenario configuration."""
        return self.scenario_cfg['topology']

    @property
    def timeout(self):
        """The runner duration when it exceeds the node timeout, else the timeout.

        The node timeout defaults to ``constants.DEFAULT_VNF_TIMEOUT`` and is
        also the fallback for a missing runner duration.
        """
        test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
        runner_cfg = self.scenario_cfg.get('runner', {})
        test_duration = runner_cfg.get('duration', test_timeout)
        if test_duration > test_timeout:
            return test_duration
        return test_timeout
614
615 class SampleVNF(GenericVNF):
616     """ Class providing file-like API for generic VNF implementation """
617
618     VNF_PROMPT = "pipeline>"
619     WAIT_TIME = 1
620     WAIT_TIME_FOR_SCRIPT = 10
621     APP_NAME = "SampleVNF"
622     # we run the VNF interactively, so the ssh command will timeout after this long
623
624     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
625         super(SampleVNF, self).__init__(name, vnfd)
626         self.bin_path = get_nsb_option('bin_path', '')
627
628         self.scenario_helper = ScenarioHelper(self.name)
629         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
630
631         if setup_env_helper_type is None:
632             setup_env_helper_type = SetupEnvHelper
633
634         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
635                                                   self.ssh_helper,
636                                                   self.scenario_helper)
637
638         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
639
640         if resource_helper_type is None:
641             resource_helper_type = ResourceHelper
642
643         self.resource_helper = resource_helper_type(self.setup_helper)
644
645         self.context_cfg = None
646         self.pipeline_kwargs = {}
647         self.uplink_ports = None
648         self.downlink_ports = None
649         # NOTE(esm): make QueueFileWrapper invert-able so that we
650         #            never have to manage the queues
651         self.q_in = Queue()
652         self.q_out = Queue()
653         self.queue_wrapper = None
654         self.run_kwargs = {}
655         self.used_drivers = {}
656         self.vnf_port_pairs = None
657         self._vnf_process = None
658
659     def _start_vnf(self):
660         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
661         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
662         self._vnf_process = Process(name=name, target=self._run)
663         self._vnf_process.start()
664
665     def _vnf_up_post(self):
666         pass
667
668     def instantiate(self, scenario_cfg, context_cfg):
669         self._update_collectd_options(scenario_cfg, context_cfg)
670         self.scenario_helper.scenario_cfg = scenario_cfg
671         self.context_cfg = context_cfg
672         self.resource_helper.update_from_context(
673             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
674             self.scenario_helper.nodes[self.name]
675         )
676
677         # vnf deploy is unsupported, use ansible playbooks
678         if self.scenario_helper.options.get("vnf_deploy", False):
679             self.deploy_helper.deploy_vnfs(self.APP_NAME)
680         self.resource_helper.setup()
681         self._start_vnf()
682
683     def _update_collectd_options(self, scenario_cfg, context_cfg):
684         """Update collectd configuration options
685         This function retrieves all collectd options contained in the test case
686
687         definition builds a single dictionary combining them. The following fragment
688         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
689         ---
690         schema: yardstick:task:0.1
691         scenarios:
692         - type: NSPerf
693           nodes:
694             tg__0: trafficgen_1.yardstick
695             vnf__0: vnf.yardstick
696           options:
697             collectd:
698               <options>  # COLLECTD priority 3
699             vnf__0:
700               collectd:
701                 plugins:
702                     load
703                 <options> # COLLECTD priority 2
704         context:
705           type: Node
706           name: yardstick
707           nfvi_type: baremetal
708           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
709         """
710         scenario_options = scenario_cfg.get('options', {})
711         generic_options = scenario_options.get('collectd', {})
712         scenario_node_options = scenario_options.get(self.name, {})\
713             .get('collectd', {})
714         context_node_options = context_cfg.get('nodes', {})\
715             .get(self.name, {}).get('collectd', {})
716
717         options = generic_options
718         self._update_options(options, scenario_node_options)
719         self._update_options(options, context_node_options)
720
721         self.setup_helper.collectd_options = options
722
723     def _update_options(self, options, additional_options):
724         """Update collectd options and plugins dictionary"""
725         for k, v in additional_options.items():
726             if isinstance(v, dict) and k in options:
727                 options[k].update(v)
728             else:
729                 options[k] = v
730
731     def wait_for_instantiate(self):
732         buf = []
733         time.sleep(self.WAIT_TIME)  # Give some time for config to load
734         while True:
735             if not self._vnf_process.is_alive():
736                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
737
738             # NOTE(esm): move to QueueFileWrapper
739             while self.q_out.qsize() > 0:
740                 buf.append(self.q_out.get())
741                 message = ''.join(buf)
742                 if self.VNF_PROMPT in message:
743                     LOG.info("%s VNF is up and running.", self.APP_NAME)
744                     self._vnf_up_post()
745                     self.queue_wrapper.clear()
746                     return self._vnf_process.exitcode
747
748                 if "PANIC" in message:
749                     raise RuntimeError("Error starting %s VNF." %
750                                        self.APP_NAME)
751
752             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
753             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
754             # Send ENTER to display a new prompt in case the prompt text was corrupted
755             # by other VNF output
756             self.q_in.put('\r\n')
757
758     def start_collect(self):
759         self.resource_helper.start_collect()
760
761     def stop_collect(self):
762         self.resource_helper.stop_collect()
763
764     def _build_run_kwargs(self):
765         self.run_kwargs = {
766             'stdin': self.queue_wrapper,
767             'stdout': self.queue_wrapper,
768             'keep_stdin_open': True,
769             'pty': True,
770             'timeout': self.scenario_helper.timeout,
771         }
772
773     def _build_config(self):
774         return self.setup_helper.build_config()
775
776     def _run(self):
777         # we can't share ssh paramiko objects to force new connection
778         self.ssh_helper.drop_connection()
779         cmd = self._build_config()
780         # kill before starting
781         self.setup_helper.kill_vnf()
782
783         LOG.debug(cmd)
784         self._build_run_kwargs()
785         self.ssh_helper.run(cmd, **self.run_kwargs)
786
787     def vnf_execute(self, cmd, wait_time=2):
788         """ send cmd to vnf process """
789
790         LOG.info("%s command: %s", self.APP_NAME, cmd)
791         self.q_in.put("{}\r\n".format(cmd))
792         time.sleep(wait_time)
793         output = []
794         while self.q_out.qsize() > 0:
795             output.append(self.q_out.get())
796         return "".join(output)
797
798     def _tear_down(self):
799         pass
800
801     def terminate(self):
802         self.vnf_execute("quit")
803         self.setup_helper.kill_vnf()
804         self._tear_down()
805         self.resource_helper.stop_collect()
806         if self._vnf_process is not None:
807             # be proper and join first before we kill
808             LOG.debug("joining before terminate %s", self._vnf_process.name)
809             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
810             self._vnf_process.terminate()
811         # no terminate children here because we share processes with tg
812
813     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
814         """Method for checking the statistics
815
816         This method could be overridden in children classes.
817
818         :return: VNF statistics
819         """
820         cmd = 'p {0} stats'.format(self.APP_WORD)
821         out = self.vnf_execute(cmd)
822         return out
823
824     def collect_kpi(self):
825         # we can't get KPIs if the VNF is down
826         check_if_process_failed(self._vnf_process, 0.01)
827         stats = self.get_stats()
828         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
829         physical_node = Context.get_physical_node_from_server(
830             self.scenario_helper.nodes[self.name])
831
832         result = {"physical_node": physical_node}
833         if m:
834             result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
835             result["collect_stats"] = self.resource_helper.collect_kpi()
836         else:
837             result.update({"packets_in": 0,
838                            "packets_fwd": 0,
839                            "packets_dropped": 0})
840
841         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
842         return result
843
844     def scale(self, flavor=""):
845         """The SampleVNF base class doesn't provide the 'scale' feature"""
846         raise y_exceptions.FunctionNotImplemented(
847             function_name='scale', class_name='SampleVNFTrafficGen')
848
849
850 class SampleVNFTrafficGen(GenericTrafficGen):
851     """ Class providing file-like API for generic traffic generator """
852
853     APP_NAME = 'Sample'
854     RUN_WAIT = 1
855
856     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
857         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
858         self.bin_path = get_nsb_option('bin_path', '')
859
860         self.scenario_helper = ScenarioHelper(self.name)
861         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
862
863         if setup_env_helper_type is None:
864             setup_env_helper_type = SetupEnvHelper
865
866         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
867                                                   self.ssh_helper,
868                                                   self.scenario_helper)
869
870         if resource_helper_type is None:
871             resource_helper_type = ClientResourceHelper
872
873         self.resource_helper = resource_helper_type(self.setup_helper)
874
875         self.runs_traffic = True
876         self.traffic_finished = False
877         self._tg_process = None
878         self._traffic_process = None
879
880     def _start_server(self):
881         # we can't share ssh paramiko objects to force new connection
882         self.ssh_helper.drop_connection()
883
884     def instantiate(self, scenario_cfg, context_cfg):
885         self.scenario_helper.scenario_cfg = scenario_cfg
886         self.resource_helper.update_from_context(
887             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
888             self.scenario_helper.nodes[self.name]
889         )
890
891         self.resource_helper.context_cfg = context_cfg
892
893         self.resource_helper.setup()
894         # must generate_cfg after DPDK bind because we need port number
895         self.resource_helper.generate_cfg()
896
897         LOG.info("Starting %s server...", self.APP_NAME)
898         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
899         self._tg_process = Process(name=name, target=self._start_server)
900         self._tg_process.start()
901
902     def _check_status(self):
903         raise NotImplementedError
904
905     def _wait_for_process(self):
906         while True:
907             if not self._tg_process.is_alive():
908                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
909             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
910             time.sleep(1)
911             status = self._check_status()
912             if status == 0:
913                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
914                 return self._tg_process.exitcode
915
916     def _traffic_runner(self, traffic_profile):
917         # always drop connections first thing in new processes
918         # so we don't get paramiko errors
919         self.ssh_helper.drop_connection()
920         LOG.info("Starting %s client...", self.APP_NAME)
921         self.resource_helper.run_traffic(traffic_profile)
922
923     def run_traffic(self, traffic_profile):
924         """ Generate traffic on the wire according to the given params.
925         Method is non-blocking, returns immediately when traffic process
926         is running. Mandatory.
927
928         :param traffic_profile:
929         :return: True/False
930         """
931         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
932                                     os.getpid())
933         self._traffic_process = Process(name=name, target=self._traffic_runner,
934                                         args=(traffic_profile,))
935         self._traffic_process.start()
936         # Wait for traffic process to start
937         while self.resource_helper.client_started.value == 0:
938             time.sleep(self.RUN_WAIT)
939             # what if traffic process takes a few seconds to start?
940             if not self._traffic_process.is_alive():
941                 break
942
943         return self._traffic_process.is_alive()
944
945     def collect_kpi(self):
946         # check if the tg processes have exited
947         physical_node = Context.get_physical_node_from_server(
948             self.scenario_helper.nodes[self.name])
949
950         result = {"physical_node": physical_node}
951         for proc in (self._tg_process, self._traffic_process):
952             check_if_process_failed(proc)
953
954         result["collect_stats"] = self.resource_helper.collect_kpi()
955         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
956         return result
957
958     def terminate(self):
959         """ After this method finishes, all traffic processes should stop. Mandatory.
960
961         :return: True/False
962         """
963         self.traffic_finished = True
964         # we must kill client before we kill the server, or the client will raise exception
965         if self._traffic_process is not None:
966             # be proper and try to join before terminating
967             LOG.debug("joining before terminate %s", self._traffic_process.name)
968             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
969             self._traffic_process.terminate()
970         if self._tg_process is not None:
971             # be proper and try to join before terminating
972             LOG.debug("joining before terminate %s", self._tg_process.name)
973             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
974             self._tg_process.terminate()
975         # no terminate children here because we share processes with vnf
976
977     def scale(self, flavor=""):
978         """A traffic generator VFN doesn't provide the 'scale' feature"""
979         raise y_exceptions.FunctionNotImplemented(
980             function_name='scale', class_name='SampleVNFTrafficGen')