Enable traffic generator PID in "NSPerf" scenario setup
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 from multiprocessing import Queue, Value, Process
17 import os
18 import posixpath
19 import re
20 import uuid
21 import subprocess
22 import time
23
24 import six
25
26 from trex_stl_lib.trex_stl_client import LoggerApi
27 from trex_stl_lib.trex_stl_client import STLClient
28 from trex_stl_lib.trex_stl_exceptions import STLError
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.common import exceptions as y_exceptions
31 from yardstick.common.process import check_if_process_failed
32 from yardstick.common import utils
33 from yardstick.common import yaml_loader
34 from yardstick.network_services import constants
35 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
36 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
37 from yardstick.network_services.nfvi.resource import ResourceProfile
38 from yardstick.network_services.utils import get_nsb_option
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
41 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
42 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
43 from yardstick.benchmark.contexts.node import NodeContext
44
45 LOG = logging.getLogger(__name__)
46
47
class SetupEnvHelper(object):
    """Base class for VNF environment-setup helpers.

    Subclasses implement config generation and node preparation for a
    specific VNF type; this base provides no-op/abstract defaults.
    """

    # remote paths where the generated config and script files are uploaded
    CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the collaborators injected by the owning SampleVNF."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper
        # merged collectd options, filled in by SampleVNF._update_collectd_options
        self.collectd_options = {}

    def build_config(self):
        """Build the VNF start command; must be overridden."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Prepare the target node; no-op by default."""
        pass

    def kill_vnf(self):
        """Stop a running VNF process; no-op by default."""
        pass

    def tear_down(self):
        """Undo environment changes; must be overridden."""
        raise NotImplementedError
74
75
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Setup helper for DPDK-based sample VNFs.

    Prepares hugepages, binds NICs to the DPDK driver, generates and
    uploads the VNF configuration/script files, and builds the command
    line used to start the VNF application on the remote node.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace the default 'ipv4' packet type with the configured one."""
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Update the traffic/packet type settings in the pipeline config.

        Bug fix: the VNF-type check previously used ``is not``, which
        compares object identity; two equal strings can still fail that
        test.  ``!=`` compares by value, which is what is intended here.
        """
        traffic_type = traffic_options['traffic_type']

        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            # IPv4 is already the template default; this replacement is a no-op
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Initialize state and the DPDK bind helper for this node."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None       # vpci addresses bound to the DPDK driver
        self.socket = None          # NUMA socket inferred in _setup_resources
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        """Claim hugepages on the remote node (default 16 GB) and log the result."""
        meminfo = utils.read_meminfo(self.ssh_helper)
        hp_size_kb = int(meminfo['Hugepagesize'])
        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
        nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
        self.ssh_helper.execute('echo %s | sudo tee %s' %
                                (nr_hugepages, self.NR_HUGEPAGES_PATH))
        hp = six.BytesIO()
        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
        LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
                 hp_size_kb, nr_hugepages, nr_hugepages_set)

    def build_config(self):
        """Generate and upload the VNF config/script files.

        :return: the VNF start command (PIPELINE_COMMAND with kwargs filled)
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        config_file = vnf_cfg.get('file')
        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        # read actions/rules from file
        acl_options = None
        acl_file_name = self.scenario_helper.options.get('rules')
        if acl_file_name:
            with utils.open_relative_file(acl_file_name, task_path) as infile:
                acl_options = yaml_loader.yaml_load(infile)

        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
                                                  task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        if config_file:
            # user-supplied config: prepend an [EAL] section whitelisting
            # the vpci of every port in the topology
            with utils.open_relative_file(config_file, task_path) as infile:
                new_config = ['[EAL]']
                vpci = []
                for port in self.vnfd_helper.port_pairs.all_ports:
                    interface = self.vnfd_helper.find_interface(name=port)
                    vpci.append(interface['virtual-interface']["vpci"])
                new_config.extend('w = {0}'.format(item) for item in vpci)
                new_config = '\n'.join(new_config) + '\n' + infile.read()
        else:
            # generated config: patch in the requested traffic/packet types
            with open(self.CFG_CONFIG) as handle:
                new_config = handle.read()
            new_config = self._update_traffic_type(new_config, traffic_options)
            new_config = self._update_packet_type(new_config, traffic_options)
        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
            multiport.generate_script(self.vnfd_helper,
                                      self.get_flows_config(acl_options)))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def get_flows_config(self, options=None):  # pylint: disable=unused-argument
        """No actions/rules (flows) by default"""
        return None

    def _build_pipeline_kwargs(self):
        """Assemble the substitution kwargs for PIPELINE_COMMAND."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))

        vnf_cfg = self.scenario_helper.vnf_cfg
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_threads = vnf_cfg.get('worker_threads', 3)
        hwlb = ''
        if lb_config == 'HW':
            hwlb = ' --hwlb %s' % worker_threads

        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
            'hwlb': hwlb,
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, kill stale VNFs, bind drivers and build resources."""
        self._setup_dpdk()
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Kill any running instance of the VNF application on the node."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run"""
        self._setup_hugepages()
        self.dpdk_bind_helper.load_dpdk_driver()
        # The returned status was previously checked but never acted upon
        # (both branches returned None); keep the call for its probe effect.
        self.dpdk_bind_helper.check_dpdk_driver()

    def _setup_resources(self):
        """Create the ResourceProfile used for NFVi KPI collection.

        NOTE(review): the NUMA socket is inferred from the 6th character of
        the bound PCI addresses, distinguishing only socket 0 from
        "anything else"; this won't hold for quad-socket hosts — confirm.
        """
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        plugins = self.collectd_options.get("plugins", {})
        interval = self.collectd_options.get("interval")
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=interval,
                               timeout=self.scenario_helper.timeout)

    def _check_interface_fields(self):
        """Probe/validate the VNF interfaces on the node."""
        num_nodes = len(self.scenario_helper.nodes)
        # OpenStack instance creation time is probably proportional to the number
        # of instances
        timeout = 120 * num_nodes
        dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
                             self.ssh_helper, timeout)
        dpdk_node.check()

    def _detect_and_bind_drivers(self):
        """Bind all VNF interfaces to igb_uio and record their DPDK port numbers."""
        interfaces = self.vnfd_helper.interfaces

        self._check_interface_fields()
        # check for bound after probe
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            # Previously a bare "except:" swallowed every error here; only a
            # missing interface (StopIteration from next()) is expected.
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
            except StopIteration:
                continue
            # force to int
            intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the kernel interface name for *vpci*, or None if not found."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind the interfaces to their original (kernel) drivers."""
        self.dpdk_bind_helper.rebind_drivers()
308
309
class ResourceHelper(object):
    """Manages NFVi (collectd) KPI collection for a VNF node."""

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        """Remember the setup helper; collection starts out enabled."""
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        self._enable = True

    def setup(self):
        """Prepare the VNF environment and keep the resulting resource profile."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for subclasses that generate configuration; no-op here."""
        pass

    def update_from_context(self, context, attr_name):
        """Disable resource helper in case of baremetal context.

        And update appropriate node collectd options in context
        """
        if not isinstance(context, NodeContext):
            return
        self._enable = False
        context.update_collectd_options_for_node(
            self.setup_helper.collectd_options, attr_name)

    def _collect_resource_kpi(self):
        """Fetch NFVi KPIs when enabled and the collectd agent is running."""
        core_result = {}
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if self._enable and agent_status == 0:
            core_result = self.resource.amqp_collect_nfvi_kpi()
        return {"core": core_result}

    def start_collect(self):
        """Spin up the system agent and the AMQP KPI consumer when enabled."""
        if not self._enable:
            return
        self.resource.initiate_systemagent(self.ssh_helper.bin_path)
        self.resource.start()
        self.resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource collector if one exists and we are enabled."""
        if self.resource and self._enable:
            self.resource.stop()

    def collect_kpi(self):
        """Return the most recent NFVi KPIs."""
        return self._collect_resource_kpi()
362
363
class ClientResourceHelper(ResourceHelper):
    """Resource helper that drives a TRex stateless client.

    run_traffic() executes in a separate process; samples travel back to
    the parent through a multiprocessing Queue, and start/stop state is
    shared via multiprocessing Values.
    """

    RUN_DURATION = 60      # seconds to let traffic run before sampling
    QUEUE_WAIT_TIME = 5    # settle time before queueing the samples
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        # shared with the parent process, hence multiprocessing.Value/Queue
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        # resolve port names to port numbers, split by traffic direction
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Proxy to the TRex client's get_stats; empty dict when not connected."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.error('TRex client not connected')
            return {}

    def _get_samples(self, ports, port_pg_id=False):
        """Collect KPI samples for *ports*; implemented by subclasses."""
        raise NotImplementedError()

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and put the collected samples on the queue."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self._get_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile, mq_producer):
        """Main loop of the traffic subprocess.

        Connects to TRex, repeats traffic iterations until terminate()
        flips the shared flag, then stops and disconnects; MQ producer
        notifications bracket the run.
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        mq_producer.tg_method_started()
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            iteration_index = 0
            while self._terminated.value == 0:
                iteration_index += 1
                self._run_traffic_once(traffic_profile)
                mq_producer.tg_method_iteration(iteration_index)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

        mq_producer.tg_method_finished()

    def terminate(self):
        """Signal the traffic subprocess to stop after its current iteration."""
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear TRex statistics on *ports* (all ports when None)."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on *ports* (all ports when None)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Merge any newly queued samples into the running result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Connect a (possibly caller-supplied) STLClient, retrying on failure."""
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
484
485
class Rfc2544ResourceHelper(object):
    """Lazy, cached accessor for the RFC2544 options of a scenario."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        # caches resolved on first property access
        self._correlated_traffic = None
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        """The scenario's (required) 'rfc2544' option block, cached."""
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        """Lower bound of the allowed drop rate."""
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        """Upper bound of the allowed drop rate."""
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        """Whether correlated traffic is requested (default False)."""
        if self._correlated_traffic is None:
            self._correlated_traffic = self.get_rfc2544(
                'correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
        return self._correlated_traffic

    @property
    def latency(self):
        """Whether latency measurement is requested (default False)."""
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Look up *name* in the rfc2544 options, with *default* fallback."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' ("low - high") into the two cached bounds."""
        bounds_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(float(token.strip()) for token in bounds_str.split('-'))
        self._tolerance_low = bounds[0]
        # a single value means low == high
        self._tolerance_high = bounds[-1]
542
543
class SampleVNFDeployHelper(object):
    """Clones and builds the OPNFV samplevnf repository on the target node.

    Deployment is skipped entirely when the VNF binary is already present.
    """

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build and install *app_name* on the remote node, unless present.

        Clones the samplevnf repo locally, copies it to the node, runs the
        build script there, and installs the resulting binary into the ssh
        helper's bin path.
        """
        vnf_bin = self.ssh_helper.join_bin_path(app_name)
        exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
        # exit status 0 means `which` found the binary: nothing to deploy
        if not exit_status:
            return

        # fresh local clone of the repo (removes any stale copy first)
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        # build on the remote node, forwarding any local http proxy setting
        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        http_proxy = os.environ.get('http_proxy', '')
        cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
        LOG.debug(cmd)
        self.ssh_helper.execute(cmd)
        vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
        self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
576
577
class ScenarioHelper(object):
    """Read-only view over the scenario configuration for one VNF node."""

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        # name of the node this helper describes, e.g. 'vnf__0'
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        """Path of the task definition file (required key)."""
        return self.scenario_cfg['task_path']

    @property
    def nodes(self):
        """Node-name mapping of the scenario, or None when absent."""
        return self.scenario_cfg.get('nodes')

    @property
    def all_options(self):
        """The whole 'options' section of the scenario (empty dict default)."""
        return self.scenario_cfg.get('options', {})

    @property
    def options(self):
        """Options specific to this node."""
        return self.all_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        """This node's 'vnf_config', falling back to DEFAULT_VNF_CFG."""
        return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        """Topology file of the scenario (required key)."""
        return self.scenario_cfg['topology']

    @property
    def timeout(self):
        """The larger of the runner duration and the configured VNF timeout."""
        test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
        test_duration = self.scenario_cfg.get('runner', {}).get('duration',
                                                               test_timeout)
        return max(test_duration, test_timeout)
621
622 class SampleVNF(GenericVNF):
623     """ Class providing file-like API for generic VNF implementation """
624
625     VNF_PROMPT = "pipeline>"
626     WAIT_TIME = 1
627     WAIT_TIME_FOR_SCRIPT = 10
628     APP_NAME = "SampleVNF"
629     # we run the VNF interactively, so the ssh command will timeout after this long
630
    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Wire up the helper objects for this VNF instance.

        :param name: node name of this VNF in the scenario (e.g. 'vnf__0')
        :param vnfd: raw VNF descriptor dict
        :param setup_env_helper_type: SetupEnvHelper subclass (base by default)
        :param resource_helper_type: ResourceHelper subclass (base by default)
        """
        super(SampleVNF, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)

        if setup_env_helper_type is None:
            setup_env_helper_type = SetupEnvHelper

        self.setup_helper = setup_env_helper_type(self.vnfd_helper,
                                                  self.ssh_helper,
                                                  self.scenario_helper)

        # NOTE(review): the raw 'vnfd' dict is passed here while the other
        # helpers receive self.vnfd_helper; harmless as long as the deploy
        # helper never dereferences it — confirm.
        self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)

        if resource_helper_type is None:
            resource_helper_type = ResourceHelper

        self.resource_helper = resource_helper_type(self.setup_helper)

        self.context_cfg = None
        self.pipeline_kwargs = {}
        self.uplink_ports = None
        self.downlink_ports = None
        # NOTE(esm): make QueueFileWrapper invert-able so that we
        #            never have to manage the queues
        self.q_in = Queue()
        self.q_out = Queue()
        self.queue_wrapper = None
        self.run_kwargs = {}
        self.used_drivers = {}
        self.vnf_port_pairs = None
        self._vnf_process = None
665
666     def _start_vnf(self):
667         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
668         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
669         self._vnf_process = Process(name=name, target=self._run)
670         self._vnf_process.start()
671
    def _vnf_up_post(self):
        """Hook run right after the VNF prompt is detected; no-op by default."""
        pass
674
675     def instantiate(self, scenario_cfg, context_cfg):
676         self._update_collectd_options(scenario_cfg, context_cfg)
677         self.scenario_helper.scenario_cfg = scenario_cfg
678         self.context_cfg = context_cfg
679         self.resource_helper.update_from_context(
680             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
681             self.scenario_helper.nodes[self.name]
682         )
683
684         # vnf deploy is unsupported, use ansible playbooks
685         if self.scenario_helper.options.get("vnf_deploy", False):
686             self.deploy_helper.deploy_vnfs(self.APP_NAME)
687         self.resource_helper.setup()
688         self._start_vnf()
689
690     def _update_collectd_options(self, scenario_cfg, context_cfg):
691         """Update collectd configuration options
692         This function retrieves all collectd options contained in the test case
693
694         definition builds a single dictionary combining them. The following fragment
695         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
696         ---
697         schema: yardstick:task:0.1
698         scenarios:
699         - type: NSPerf
700           nodes:
701             tg__0: trafficgen_1.yardstick
702             vnf__0: vnf.yardstick
703           options:
704             collectd:
705               <options>  # COLLECTD priority 3
706             vnf__0:
707               collectd:
708                 plugins:
709                     load
710                 <options> # COLLECTD priority 2
711         context:
712           type: Node
713           name: yardstick
714           nfvi_type: baremetal
715           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
716         """
717         scenario_options = scenario_cfg.get('options', {})
718         generic_options = scenario_options.get('collectd', {})
719         scenario_node_options = scenario_options.get(self.name, {})\
720             .get('collectd', {})
721         context_node_options = context_cfg.get('nodes', {})\
722             .get(self.name, {}).get('collectd', {})
723
724         options = generic_options
725         self._update_options(options, scenario_node_options)
726         self._update_options(options, context_node_options)
727
728         self.setup_helper.collectd_options = options
729
730     def _update_options(self, options, additional_options):
731         """Update collectd options and plugins dictionary"""
732         for k, v in additional_options.items():
733             if isinstance(v, dict) and k in options:
734                 options[k].update(v)
735             else:
736                 options[k] = v
737
    def wait_for_instantiate(self):
        """Block until the VNF prompt appears in its console output.

        :return: the VNF process exit code (None while it is still running)
        :raises RuntimeError: if the VNF process dies or prints PANIC
        """
        buf = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # NOTE(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                buf.append(self.q_out.get())
                message = ''.join(buf)
                if self.VNF_PROMPT in message:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    return self._vnf_process.exitcode

                if "PANIC" in message:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(self.WAIT_TIME_FOR_SCRIPT)
            # Send ENTER to display a new prompt in case the prompt text was corrupted
            # by other VNF output
            self.q_in.put('\r\n')
764
    def start_collect(self):
        """Begin NFVi KPI collection through the resource helper."""
        self.resource_helper.start_collect()
767
    def stop_collect(self):
        """Stop NFVi KPI collection through the resource helper."""
        self.resource_helper.stop_collect()
770
771     def _build_run_kwargs(self):
772         self.run_kwargs = {
773             'stdin': self.queue_wrapper,
774             'stdout': self.queue_wrapper,
775             'keep_stdin_open': True,
776             'pty': True,
777             'timeout': self.scenario_helper.timeout,
778         }
779
780     def _build_config(self):
781         return self.setup_helper.build_config()
782
    def _run(self):
        """Build the VNF start command and execute it over SSH.

        Kills any stale VNF instance before launching a new one.
        """
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()
        cmd = self._build_config()
        # kill before starting
        self.setup_helper.kill_vnf()

        LOG.debug(cmd)
        self._build_run_kwargs()
        self.ssh_helper.run(cmd, **self.run_kwargs)
793
794     def vnf_execute(self, cmd, wait_time=2):
795         """ send cmd to vnf process """
796
797         LOG.info("%s command: %s", self.APP_NAME, cmd)
798         self.q_in.put("{}\r\n".format(cmd))
799         time.sleep(wait_time)
800         output = []
801         while self.q_out.qsize() > 0:
802             output.append(self.q_out.get())
803         return "".join(output)
804
    def _tear_down(self):
        """Extension point called from ``terminate``; no-op here."""
        pass
807
    def terminate(self):
        """Shut the VNF down: quit its CLI, kill the remote app,
        stop collection and reap the local child process."""
        self.vnf_execute("quit")
        self.setup_helper.kill_vnf()
        self._tear_down()
        self.resource_helper.stop_collect()
        if self._vnf_process is not None:
            # be proper and join first before we kill
            LOG.debug("joining before terminate %s", self._vnf_process.name)
            self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._vnf_process.terminate()
        # no terminate children here because we share processes with tg
819
820     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
821         """Method for checking the statistics
822
823         This method could be overridden in children classes.
824
825         :return: VNF statistics
826         """
827         cmd = 'p {0} stats'.format(self.APP_WORD)
828         out = self.vnf_execute(cmd)
829         return out
830
    def collect_kpi(self):
        """Collect KPIs by parsing the VNF's stats output.

        :return: dict with the physical node and either the parsed
            packet counters plus resource-helper stats (on a regex match)
            or zeroed packet counters (no match)
        :raises: via ``check_if_process_failed`` if the VNF process exited
        """
        # we can't get KPIs if the VNF is down
        check_if_process_failed(self._vnf_process, 0.01)
        stats = self.get_stats()
        m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
        physical_node = Context.get_physical_node_from_server(
            self.scenario_helper.nodes[self.name])

        result = {"physical_node": physical_node}
        if m:
            # COLLECT_MAP maps result keys to regex group names/indexes.
            result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
            result["collect_stats"] = self.resource_helper.collect_kpi()
        else:
            result.update({"packets_in": 0,
                           "packets_fwd": 0,
                           "packets_dropped": 0})

        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        return result
850
851     def scale(self, flavor=""):
852         """The SampleVNF base class doesn't provide the 'scale' feature"""
853         raise y_exceptions.FunctionNotImplemented(
854             function_name='scale', class_name='SampleVNFTrafficGen')
855
856
class SampleVNFTrafficGen(GenericTrafficGen):
    """ Class providing file-like API for generic traffic generator """

    # Label used in log messages and child-process names.
    APP_NAME = 'Sample'
    # Polling interval (seconds) while waiting for the traffic client to start.
    RUN_WAIT = 1
862
    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Build the TG's helper objects and process bookkeeping.

        :param name: node name of this traffic generator
        :param vnfd: VNF descriptor dict
        :param setup_env_helper_type: SetupEnvHelper subclass, or None for
            the default SetupEnvHelper
        :param resource_helper_type: ClientResourceHelper subclass, or None
            for the default ClientResourceHelper
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        # wait=True: block until the SSH connection is established
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        if setup_env_helper_type is None:
            setup_env_helper_type = SetupEnvHelper

        self.setup_helper = setup_env_helper_type(self.vnfd_helper,
                                                  self.ssh_helper,
                                                  self.scenario_helper)

        if resource_helper_type is None:
            resource_helper_type = ClientResourceHelper

        self.resource_helper = resource_helper_type(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        # server process (started in instantiate)
        self._tg_process = None
        # traffic client process (started in run_traffic)
        self._traffic_process = None
886
887     def _start_server(self):
888         # we can't share ssh paramiko objects to force new connection
889         self.ssh_helper.drop_connection()
890
    def instantiate(self, scenario_cfg, context_cfg):
        """Prepare the traffic generator and start its server process.

        :param scenario_cfg: scenario configuration dict
        :param context_cfg: context configuration (unused here)
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.resource_helper.update_from_context(
            Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
            self.scenario_helper.nodes[self.name]
        )

        self.resource_helper.setup()
        # must generate_cfg after DPDK bind because we need port number
        self.resource_helper.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._tg_process = Process(name=name, target=self._start_server)
        self._tg_process.start()
906
    def _check_status(self):
        # Subclasses must report server readiness; 0 means "up"
        # (see _wait_for_process).
        raise NotImplementedError
909
910     def _wait_for_process(self):
911         while True:
912             if not self._tg_process.is_alive():
913                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
914             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
915             time.sleep(1)
916             status = self._check_status()
917             if status == 0:
918                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
919                 return self._tg_process.exitcode
920
    def _traffic_runner(self, traffic_profile, mq_id):
        """Entry point of the traffic client child process.

        :param traffic_profile: profile object driving traffic generation
        :param mq_id: id passed to ``_setup_mq_producer``
        """
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self._mq_producer = self._setup_mq_producer(mq_id)
        self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
928
    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.
        Method is non-blocking, returns immediately when traffic process
        is running. Mandatory.

        :param traffic_profile: profile object driving traffic generation
        :return: None
        """
        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
                                    traffic_profile.__class__.__name__,
                                    os.getpid())
        # uuid1().int is used as the message-queue producer id
        self._traffic_process = Process(
            name=name, target=self._traffic_runner,
            args=(traffic_profile, uuid.uuid1().int))
        self._traffic_process.start()
        # Wait for traffic process to start
        while self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not self._traffic_process.is_alive():
                break
950
951     def collect_kpi(self):
952         # check if the tg processes have exited
953         physical_node = Context.get_physical_node_from_server(
954             self.scenario_helper.nodes[self.name])
955
956         result = {"physical_node": physical_node}
957         for proc in (self._tg_process, self._traffic_process):
958             check_if_process_failed(proc)
959
960         result["collect_stats"] = self.resource_helper.collect_kpi()
961         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
962         return result
963
    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        :return: None
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        if self._traffic_process is not None:
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", self._traffic_process.name)
            self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._traffic_process.terminate()
        if self._tg_process is not None:
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", self._tg_process.name)
            self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._tg_process.terminate()
        # no terminate children here because we share processes with vnf
982
    def scale(self, flavor=""):
        """A traffic generator VNF doesn't provide the 'scale' feature.

        :param flavor: requested flavor name (unused)
        :raises y_exceptions.FunctionNotImplemented: always
        """
        raise y_exceptions.FunctionNotImplemented(
            function_name='scale', class_name='SampleVNFTrafficGen')