Merge "Bugfix: HA kill process recovery has a conflict"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 from multiprocessing import Queue, Value, Process
17
18 import os
19 import posixpath
20 import re
21 import six
22 import subprocess
23 import time
24
25 from trex_stl_lib.trex_stl_client import LoggerApi
26 from trex_stl_lib.trex_stl_client import STLClient
27 from trex_stl_lib.trex_stl_exceptions import STLError
28 from yardstick.benchmark.contexts.base import Context
29 from yardstick.common import exceptions as y_exceptions
30 from yardstick.common.process import check_if_process_failed
31 from yardstick.common import utils
32 from yardstick.common import yaml_loader
33 from yardstick.network_services import constants
34 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
39 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
42
43
44 LOG = logging.getLogger(__name__)
45
46
class SetupEnvHelper(object):
    """Base class for helpers that prepare and clean up a VNF environment.

    Subclasses are expected to provide ``build_config`` and ``tear_down``;
    ``setup_vnf_environment`` and ``kill_vnf`` are optional hooks that do
    nothing here.
    """

    VNF_TYPE = "SAMPLE"
    PIPELINE_COMMAND = ''
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    # Paths of the generated configuration and script files.
    CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the collaborating helpers.

        :param vnfd_helper: helper giving access to the VNF descriptor
        :param ssh_helper: helper used to run commands on the VNF node
        :param scenario_helper: helper giving access to the scenario options
        """
        super(SetupEnvHelper, self).__init__()
        # Filled in later by the owning VNF (see ``collectd_options`` users).
        self.collectd_options = {}
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def build_config(self):
        """Build the VNF configuration; must be implemented by subclasses."""
        raise NotImplementedError()

    def setup_vnf_environment(self):
        """Prepare the environment; the base implementation does nothing."""
        return None

    def kill_vnf(self):
        """Stop a running VNF; the base implementation does nothing."""
        return None

    def tear_down(self):
        """Undo the environment setup; must be implemented by subclasses."""
        raise NotImplementedError()
73
74
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Environment helper for DPDK based sample VNFs.

    It configures hugepages, loads and binds the DPDK driver for the VNF
    interfaces, generates and uploads the VNF configuration and script files
    and builds the command line used to start the VNF.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace the default ``pkt_type = ipv4`` setting in the config text.

        :param ip_pipeline_cfg: (str) configuration file content
        :param traffic_options: (dict) must contain the key ``pkt_type``
        :return: (str) the updated configuration content
        """
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        return ip_pipeline_cfg.replace(match_str, replace_str)

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Adapt the traffic/packet type settings in the config text.

        :param ip_pipeline_cfg: (str) configuration file content
        :param traffic_options: (dict) must contain the keys ``traffic_type``
                                and ``vnf_type``
        :return: (str) the updated configuration content
        """
        traffic_type = traffic_options['traffic_type']

        # Strings must be compared by value; an identity test ("is not")
        # depends on interpreter interning and is not reliable.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            # IPv4 is already the default: the replacement is a no-op.
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        return ip_pipeline_cfg.replace(match_str, replace_str)

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the helpers and create the DPDK bind helper.

        :param vnfd_helper: helper giving access to the VNF descriptor
        :param ssh_helper: helper used to run commands on the VNF node
        :param scenario_helper: helper giving access to the scenario options
        """
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        # Populated by _build_pipeline_kwargs(); defined here so the
        # attribute always exists.
        self.pipeline_kwargs = {}
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Reserve hugepages on the VNF node and log the resulting count.

        The amount of memory comes from the scenario option ``hugepages_gb``
        (16 when not given) and is converted to a page count using the
        ``Hugepagesize`` value (kB) read from the node's meminfo.
        """
        meminfo = utils.read_meminfo(self.ssh_helper)
        hp_size_kb = int(meminfo['Hugepagesize'])
        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
        nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
        self.ssh_helper.execute('echo %s | sudo tee %s' %
                                (nr_hugepages, self.NR_HUGEPAGES_PATH))
        # Read the value back: the number actually set may differ from the
        # number requested.
        hp = six.BytesIO()
        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
        LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
                 hp_size_kb, nr_hugepages, nr_hugepages_set)

    def build_config(self):
        """Generate and upload the VNF config and script files.

        When the VNF options provide a ``file`` entry, that file is used as
        configuration, prefixed with an ``[EAL]`` section whitelisting the
        PCI addresses of all ports. Otherwise the content of
        ``self.CFG_CONFIG`` is read and its traffic/packet type is adapted.

        :return: (str) the command line used to start the VNF
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        config_file = vnf_cfg.get('file')
        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        # read actions/rules from file
        acl_options = None
        acl_file_name = self.scenario_helper.options.get('rules')
        if acl_file_name:
            with utils.open_relative_file(acl_file_name, task_path) as infile:
                acl_options = yaml_loader.yaml_load(infile)

        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
                                                  task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        if config_file:
            with utils.open_relative_file(config_file, task_path) as infile:
                new_config = ['[EAL]']
                vpci = []
                for port in self.vnfd_helper.port_pairs.all_ports:
                    interface = self.vnfd_helper.find_interface(name=port)
                    vpci.append(interface['virtual-interface']["vpci"])
                new_config.extend('w = {0}'.format(item) for item in vpci)
                new_config = '\n'.join(new_config) + '\n' + infile.read()
        else:
            # NOTE(review): presumably this file was just written by
            # multiport.generate_config() -- confirm against MultiPortConfig.
            with open(self.CFG_CONFIG) as handle:
                new_config = handle.read()
            new_config = self._update_traffic_type(new_config, traffic_options)
            new_config = self._update_packet_type(new_config, traffic_options)
        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(
            script_basename,
            multiport.generate_script(self.vnfd_helper,
                                      self.get_flows_config(acl_options)))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def get_flows_config(self, options=None):  # pylint: disable=unused-argument
        """No actions/rules (flows) by default"""
        return None

    def _build_pipeline_kwargs(self):
        """Fill ``self.pipeline_kwargs`` with the PIPELINE_COMMAND fields."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))

        vnf_cfg = self.scenario_helper.vnf_cfg
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_threads = vnf_cfg.get('worker_threads', 3)
        hwlb = ''
        if lb_config == 'HW':
            hwlb = ' --hwlb %s' % worker_threads

        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
            'hwlb': hwlb,
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, stop any running VNF, bind ports and build the profile.

        :return: the ``ResourceProfile`` created by ``_setup_resources``
        """
        self._setup_dpdk()
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        return self._setup_resources()

    def kill_vnf(self):
        """Kill the VNF application on the node using ``killall``."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run"""
        self._setup_hugepages()
        self.dpdk_bind_helper.load_dpdk_driver()
        # The returned status was never acted upon; the call is kept because
        # it runs the driver check on the node.
        self.dpdk_bind_helper.check_dpdk_driver()

    def _setup_resources(self):
        """Select the socket and create the resource profile for KPIs.

        :return: a ``ResourceProfile`` built for the management interface
        """
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        # NOTE(review): index 5 of the PCI address string is tested;
        # presumably a bus digit -- confirm the expected address format.
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        plugins = self.collectd_options.get("plugins", {})
        interval = self.collectd_options.get("interval")
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=interval,
                               timeout=self.scenario_helper.timeout)

    def _check_interface_fields(self):
        """Run the ``DpdkNode`` check on the VNF interfaces."""
        num_nodes = len(self.scenario_helper.nodes)
        # OpenStack instance creation time is probably proportional to the number
        # of instances
        timeout = 120 * num_nodes
        dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
                             self.ssh_helper, timeout)
        dpdk_node.check()

    def _detect_and_bind_drivers(self):
        """Bind the VNF interfaces to ``igb_uio`` and record DPDK port numbers.

        The DPDK port number assigned to an interface is the position of its
        PCI address in the sorted list of DPDK-bound PCI addresses.
        """
        interfaces = self.vnfd_helper.interfaces

        self._check_interface_fields()
        # check for bound after probe
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            intf = next((v for v in interfaces
                         if vpci == v['virtual-interface']['vpci']), None)
            if intf is None:
                # A DPDK-bound device that is not one of this VNF's
                # interfaces: skip it, as before, but without hiding
                # unrelated errors behind a bare "except".
                LOG.debug("No VNF interface found for DPDK bound PCI %s", vpci)
                continue
            # force to int
            intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the network interface name found for a PCI address.

        :param vpci: (str) PCI address to look for under /sys/class/net
        :return: (str) the command output, or None if the command failed
        """
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the interfaces through the DPDK bind helper."""
        self.dpdk_bind_helper.rebind_drivers()
307
308
class ResourceHelper(object):
    """Collects resource (NFVI) KPIs for a VNF through a resource profile."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Keep the setup helper and reuse its ssh helper.

        :param setup_helper: environment helper; must expose ``ssh_helper``
        """
        super(ResourceHelper, self).__init__()
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        # Assigned by setup() from setup_vnf_environment().
        self.resource = None

    def setup(self):
        """Prepare the VNF environment and keep the returned resource."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for configuration generation; nothing to do by default."""
        return None

    def _collect_resource_kpi(self):
        """Return the NFVI KPIs wrapped under the ``core`` key.

        The KPIs are only fetched when the "collectd" agent check reports
        status 0; otherwise an empty dict is reported.
        """
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if agent_status == 0:
            nfvi_kpi = self.resource.amqp_collect_nfvi_kpi()
        else:
            nfvi_kpi = {}
        return {"core": nfvi_kpi}

    def start_collect(self):
        """Start the system agent and the KPI collection."""
        resource = self.resource
        resource.initiate_systemagent(self.ssh_helper.bin_path)
        resource.start()
        resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the KPI collection when a resource has been set up."""
        if not self.resource:
            return
        self.resource.stop()

    def collect_kpi(self):
        """Return the resource KPIs (see ``_collect_resource_kpi``)."""
        return self._collect_resource_kpi()
349
350
class ClientResourceHelper(ResourceHelper):
    """Resource helper driving a traffic generator through an STL client.

    ``run_traffic`` is meant to run in a separate process: samples are handed
    back through a multiprocessing queue and the stop request is shared via
    a multiprocessing ``Value``.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        """Initialise the shared state used between the processes.

        :param setup_helper: environment helper; must expose ``vnfd_helper``
                             and ``scenario_helper``
        """
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        # Set to 1 once the first traffic run has been started.
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        # Set to 1 by terminate() to stop the traffic loop.
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve networks and uplink/downlink/all port numbers."""
        helper = self.vnfd_helper
        self.networks = helper.port_pairs.networks
        self.uplink_ports = helper.port_nums(helper.port_pairs.uplink_ports)
        self.downlink_ports = helper.port_nums(helper.port_pairs.downlink_ports)
        self.all_ports = helper.port_nums(helper.port_pairs.all_ports)

    def port_num(self, intf):
        """Return the port number of an interface (default mapping)."""
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return the client statistics, or {} if the client raises STLError."""
        try:
            stats = self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.error('TRex client not connected')
            return {}
        return stats

    def _get_samples(self, ports, port_pg_id=False):
        """Return the samples for the given ports; implemented by subclasses."""
        raise NotImplementedError()

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the collected samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        collected = self._get_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(collected)

    def run_traffic(self, traffic_profile):
        """Connect to the traffic generator and run traffic until terminated.

        An ``STLError`` is swallowed only when termination was requested;
        otherwise it is re-raised.
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            # remove all streams
            self.client.remove_all_streams(self.all_ports)
            traffic_profile.register_generator(self)

            while self._terminated.value == 0:
                self._run_traffic_once(traffic_profile)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if not self._terminated.value:
                raise
            # the trex/tg server has been stopped on purpose
            LOG.debug("traffic generator is stopped")
            return

    def terminate(self):
        """Request the traffic loop to stop."""
        self._terminated.value = 1

    def clear_stats(self, ports=None):
        """Clear the client statistics (all ports when ``ports`` is None)."""
        selected = self.all_ports if ports is None else ports
        self.client.clear_stats(ports=selected)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on the given ports (all ports when ``ports`` is None)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge queued samples, if any, into the result and return it."""
        if self._queue.empty():
            return self._result
        self._result.update(self._queue.get())
        LOG.debug('Got KPIs from _queue for %s %s',
                  self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect an STL client, creating one when none is given.

        :param client: optional client object to connect
        :return: the client; it is returned even if every attempt failed
        """
        if client is None:
            mgmt = self.vnfd_helper.mgmt_interface
            client = STLClient(username=mgmt["user"],
                               server=mgmt["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for attempt in range(6):
            try:
                client.connect()
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", attempt)
                time.sleep(5)
            else:
                break
        return client
465
466
class Rfc2544ResourceHelper(object):
    """Lazy accessor for the ``rfc2544`` section of the scenario options."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        """Store the scenario helper; option values are resolved on first use.

        :param scenario_helper: object exposing ``all_options`` (dict)
        """
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The ``rfc2544`` options dict (KeyError if the section is missing)."""
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """Value of the ``correlated_traffic`` option (default False)."""
        if self._correlated_traffic is None:
            self._correlated_traffic = \
                self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)

        return self._correlated_traffic

    @property
    def latency(self):
        """Value of the ``latency`` option (default False)."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Return one entry of the ``rfc2544`` options.

        :param name: option name
        :param default: value returned when the option is not present
        """
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse ``allowed_drop_rate`` into the low/high tolerance bounds.

        The option is normally a string such as ``'0.0001 - 0.001'``; the
        values are sorted, so their order in the string does not matter. A
        single value is used for both bounds. A plain number, which is what
        YAML yields for ``allowed_drop_rate: 0.0001``, is accepted too.

        :raises ValueError: if a part of the string is not a number
        """
        tolerance = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        if isinstance(tolerance, (int, float)):
            # Numbers have no split(); do not go through str() either, as
            # e.g. '1e-05' would be cut at the exponent's minus sign.
            bounds = [float(tolerance)]
        else:
            bounds = sorted(float(t.strip()) for t in tolerance.split('-'))
        self._tolerance_low = bounds[0]
        self._tolerance_high = bounds[1] if len(bounds) > 1 else bounds[0]
523
524
class SampleVNFDeployHelper(object):
    """Clones, uploads and builds the samplevnf sources on the VNF node."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        """Store the helpers.

        :param vnfd_helper: helper giving access to the VNF descriptor
        :param ssh_helper: helper used to run commands on the VNF node
        """
        super(SampleVNFDeployHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper

    def deploy_vnfs(self, app_name):
        """Build and install ``app_name`` on the node unless already present.

        Nothing is done when ``which`` finds the binary in the bin path.
        Otherwise the repository is cloned locally, copied to the node, built
        with ``tools/vnf_build.sh`` and the resulting binary is copied to the
        bin path.

        :param app_name: (str) name of the VNF application binary
        """
        ssh = self.ssh_helper
        vnf_bin = ssh.join_bin_path(app_name)
        which_status = ssh.execute("which %s" % vnf_bin)[0]
        if not which_status:
            # the binary is already installed
            return

        # fresh local clone of the repository
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        # replace any previous copy on the node
        ssh.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        ssh.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        proxy = os.environ.get('http_proxy', '')
        build_cmd = "sudo -E %s -s -p='%s'" % (build_script, proxy)
        LOG.debug(build_cmd)
        ssh.execute(build_cmd)

        built_binary = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        ssh.execute("sudo mkdir -p %s" % ssh.bin_path)
        ssh.execute("sudo cp %s %s" % (built_binary, vnf_bin))
557
558
class ScenarioHelper(object):
    """Read-only view on the scenario configuration of one named node.

    ``scenario_cfg`` must be assigned before any of the properties is used.
    """

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        """Remember the node name; the configuration is attached later.

        :param name: (str) node name used as key in the scenario options
        """
        self.scenario_cfg = None
        self.name = name

    @property
    def task_path(self):
        """The ``task_path`` entry of the scenario configuration."""
        return self.scenario_cfg['task_path']

    @property
    def nodes(self):
        """The ``nodes`` entry of the scenario configuration, or None."""
        return self.scenario_cfg.get('nodes')

    @property
    def all_options(self):
        """The complete ``options`` section ({} when missing)."""
        return self.scenario_cfg.get('options', {})

    @property
    def options(self):
        """The options of this node ({} when missing)."""
        return self.all_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """The node's ``vnf_config`` or the class-level default."""
        return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """The ``topology`` entry of the scenario configuration."""
        return self.scenario_cfg['topology']

    @property
    def timeout(self):
        """The larger of the runner duration and the node timeout.

        The node timeout defaults to ``constants.DEFAULT_VNF_TIMEOUT`` and
        the runner duration defaults to the node timeout.
        """
        test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
        runner_cfg = self.scenario_cfg.get('runner', {})
        test_duration = runner_cfg.get('duration', test_timeout)
        # max() keeps its first argument on ties, i.e. the timeout.
        return max(test_timeout, test_duration)
602
603 class SampleVNF(GenericVNF):
604     """ Class providing file-like API for generic VNF implementation """
605
606     VNF_PROMPT = "pipeline>"
607     WAIT_TIME = 1
608     WAIT_TIME_FOR_SCRIPT = 10
609     APP_NAME = "SampleVNF"
610     # we run the VNF interactively, so the ssh command will timeout after this long
611
612     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
613         super(SampleVNF, self).__init__(name, vnfd)
614         self.bin_path = get_nsb_option('bin_path', '')
615
616         self.scenario_helper = ScenarioHelper(self.name)
617         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
618
619         if setup_env_helper_type is None:
620             setup_env_helper_type = SetupEnvHelper
621
622         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
623                                                   self.ssh_helper,
624                                                   self.scenario_helper)
625
626         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
627
628         if resource_helper_type is None:
629             resource_helper_type = ResourceHelper
630
631         self.resource_helper = resource_helper_type(self.setup_helper)
632
633         self.context_cfg = None
634         self.nfvi_context = None
635         self.pipeline_kwargs = {}
636         self.uplink_ports = None
637         self.downlink_ports = None
638         # NOTE(esm): make QueueFileWrapper invert-able so that we
639         #            never have to manage the queues
640         self.q_in = Queue()
641         self.q_out = Queue()
642         self.queue_wrapper = None
643         self.run_kwargs = {}
644         self.used_drivers = {}
645         self.vnf_port_pairs = None
646         self._vnf_process = None
647
648     def _start_vnf(self):
649         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
650         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
651         self._vnf_process = Process(name=name, target=self._run)
652         self._vnf_process.start()
653
654     def _vnf_up_post(self):
655         pass
656
657     def instantiate(self, scenario_cfg, context_cfg):
658         self._update_collectd_options(scenario_cfg, context_cfg)
659         self.scenario_helper.scenario_cfg = scenario_cfg
660         self.context_cfg = context_cfg
661         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
662         # self.nfvi_context = None
663
664         # vnf deploy is unsupported, use ansible playbooks
665         if self.scenario_helper.options.get("vnf_deploy", False):
666             self.deploy_helper.deploy_vnfs(self.APP_NAME)
667         self.resource_helper.setup()
668         self._start_vnf()
669
670     def _update_collectd_options(self, scenario_cfg, context_cfg):
671         """Update collectd configuration options
672         This function retrieves all collectd options contained in the test case
673
674         definition builds a single dictionary combining them. The following fragment
675         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
676         ---
677         schema: yardstick:task:0.1
678         scenarios:
679         - type: NSPerf
680           nodes:
681             tg__0: trafficgen_1.yardstick
682             vnf__0: vnf.yardstick
683           options:
684             collectd:
685               <options>  # COLLECTD priority 3
686             vnf__0:
687               collectd:
688                 plugins:
689                     load
690                 <options> # COLLECTD priority 2
691         context:
692           type: Node
693           name: yardstick
694           nfvi_type: baremetal
695           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
696         """
697         scenario_options = scenario_cfg.get('options', {})
698         generic_options = scenario_options.get('collectd', {})
699         scenario_node_options = scenario_options.get(self.name, {})\
700             .get('collectd', {})
701         context_node_options = context_cfg.get('nodes', {})\
702             .get(self.name, {}).get('collectd', {})
703
704         options = generic_options
705         self._update_options(options, scenario_node_options)
706         self._update_options(options, context_node_options)
707
708         self.setup_helper.collectd_options = options
709
710     def _update_options(self, options, additional_options):
711         """Update collectd options and plugins dictionary"""
712         for k, v in additional_options.items():
713             if isinstance(v, dict) and k in options:
714                 options[k].update(v)
715             else:
716                 options[k] = v
717
718     def wait_for_instantiate(self):
719         buf = []
720         time.sleep(self.WAIT_TIME)  # Give some time for config to load
721         while True:
722             if not self._vnf_process.is_alive():
723                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
724
725             # NOTE(esm): move to QueueFileWrapper
726             while self.q_out.qsize() > 0:
727                 buf.append(self.q_out.get())
728                 message = ''.join(buf)
729                 if self.VNF_PROMPT in message:
730                     LOG.info("%s VNF is up and running.", self.APP_NAME)
731                     self._vnf_up_post()
732                     self.queue_wrapper.clear()
733                     return self._vnf_process.exitcode
734
735                 if "PANIC" in message:
736                     raise RuntimeError("Error starting %s VNF." %
737                                        self.APP_NAME)
738
739             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
740             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
741             # Send ENTER to display a new prompt in case the prompt text was corrupted
742             # by other VNF output
743             self.q_in.put('\r\n')
744
745     def start_collect(self):
746         self.resource_helper.start_collect()
747
748     def stop_collect(self):
749         self.resource_helper.stop_collect()
750
751     def _build_run_kwargs(self):
752         self.run_kwargs = {
753             'stdin': self.queue_wrapper,
754             'stdout': self.queue_wrapper,
755             'keep_stdin_open': True,
756             'pty': True,
757             'timeout': self.scenario_helper.timeout,
758         }
759
760     def _build_config(self):
761         return self.setup_helper.build_config()
762
763     def _run(self):
764         # we can't share ssh paramiko objects to force new connection
765         self.ssh_helper.drop_connection()
766         cmd = self._build_config()
767         # kill before starting
768         self.setup_helper.kill_vnf()
769
770         LOG.debug(cmd)
771         self._build_run_kwargs()
772         self.ssh_helper.run(cmd, **self.run_kwargs)
773
774     def vnf_execute(self, cmd, wait_time=2):
775         """ send cmd to vnf process """
776
777         LOG.info("%s command: %s", self.APP_NAME, cmd)
778         self.q_in.put("{}\r\n".format(cmd))
779         time.sleep(wait_time)
780         output = []
781         while self.q_out.qsize() > 0:
782             output.append(self.q_out.get())
783         return "".join(output)
784
785     def _tear_down(self):
786         pass
787
788     def terminate(self):
789         self.vnf_execute("quit")
790         self.setup_helper.kill_vnf()
791         self._tear_down()
792         self.resource_helper.stop_collect()
793         if self._vnf_process is not None:
794             # be proper and join first before we kill
795             LOG.debug("joining before terminate %s", self._vnf_process.name)
796             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
797             self._vnf_process.terminate()
798         # no terminate children here because we share processes with tg
799
800     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
801         """Method for checking the statistics
802
803         This method could be overridden in children classes.
804
805         :return: VNF statistics
806         """
807         cmd = 'p {0} stats'.format(self.APP_WORD)
808         out = self.vnf_execute(cmd)
809         return out
810
811     def collect_kpi(self):
812         # we can't get KPIs if the VNF is down
813         check_if_process_failed(self._vnf_process, 0.01)
814         stats = self.get_stats()
815         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
816         if m:
817             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
818             result["collect_stats"] = self.resource_helper.collect_kpi()
819         else:
820             result = {
821                 "packets_in": 0,
822                 "packets_fwd": 0,
823                 "packets_dropped": 0,
824             }
825         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
826         return result
827
828     def scale(self, flavor=""):
829         """The SampleVNF base class doesn't provide the 'scale' feature"""
830         raise y_exceptions.FunctionNotImplemented(
831             function_name='scale', class_name='SampleVNFTrafficGen')
832
833
class SampleVNFTrafficGen(GenericTrafficGen):
    """Traffic generator that runs its server and its traffic client in
    separate processes.
    """

    APP_NAME = 'Sample'
    RUN_WAIT = 1

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Create the scenario, SSH, setup and resource helpers.

        :param name: passed to the parent constructor
        :param vnfd: passed to the parent constructor
        :param setup_env_helper_type: setup helper class; ``SetupEnvHelper``
                                      when None
        :param resource_helper_type: resource helper class;
                                     ``ClientResourceHelper`` when None
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        # fall back to the default helper classes when none are supplied
        setup_cls = SetupEnvHelper if setup_env_helper_type is None else setup_env_helper_type
        self.setup_helper = setup_cls(self.vnfd_helper, self.ssh_helper, self.scenario_helper)

        resource_cls = (ClientResourceHelper if resource_helper_type is None
                        else resource_helper_type)
        self.resource_helper = resource_cls(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        """Target of the server process started by ``instantiate``."""
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """Prepare the resource helper and start the server process.

        :param scenario_cfg: stored on the scenario helper
        :param context_cfg: not used here
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.resource_helper.setup()
        # must generate_cfg after DPDK bind because we need port number
        self.resource_helper.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        server = Process(name="{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()),
                         target=self._start_server)
        # keep the reference on the instance before the process is started
        self._tg_process = server
        server.start()

    def _check_status(self):
        """Status check polled by ``_wait_for_process``; not implemented here."""
        raise NotImplementedError

    def _wait_for_process(self):
        """Poll ``_check_status`` once a second until it returns 0.

        :return: exit code of the server process once the status is 0
        :raises RuntimeError: if the server process is no longer alive
        """
        while self._tg_process.is_alive():
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            if self._check_status() == 0:
                LOG.info("%s TG Server is up and running.", self.APP_NAME)
                return self._tg_process.exitcode
        raise RuntimeError("%s traffic generator process died." % self.APP_NAME)

    def _traffic_runner(self, traffic_profile):
        """Target of the traffic process started by ``run_traffic``."""
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self.resource_helper.run_traffic(traffic_profile)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.
        The traffic itself runs in a separate process; this method waits
        only until the client reports it has started or the process has
        died. Mandatory.

        :param traffic_profile: profile handed to the traffic process
        :return: True if the traffic process is alive, False otherwise
        """
        proc_name = "{}-{}-{}-{}".format(self.name, self.APP_NAME,
                                         traffic_profile.__class__.__name__, os.getpid())
        client = Process(name=proc_name, target=self._traffic_runner,
                         args=(traffic_profile,))
        self._traffic_process = client
        client.start()
        # Wait for traffic process to start
        while self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not client.is_alive():
                break

        return client.is_alive()

    def collect_kpi(self):
        """Check both processes for failure, then return the resource
        helper's KPIs.
        """
        # check if the tg processes have exited
        check_if_process_failed(self._tg_process)
        check_if_process_failed(self._traffic_process)
        kpis = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, kpis)
        return kpis

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        Each existing process is joined (bounded by
        ``constants.PROCESS_JOIN_TIMEOUT``) and then terminated.
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        for proc in (self._traffic_process, self._tg_process):
            if proc is None:
                continue
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", proc.name)
            proc.join(constants.PROCESS_JOIN_TIMEOUT)
            proc.terminate()
        # no terminate children here because we share processes with vnf

    def scale(self, flavor=""):
        """A traffic generator VNF doesn't provide the 'scale' feature"""
        raise y_exceptions.FunctionNotImplemented(
            function_name='scale', class_name='SampleVNFTrafficGen')