yardstick/network_services/vnf_generic/vnf/sample_vnf.py
# Copyright (c) 2016-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from multiprocessing import Queue, Value, Process
import os
import posixpath
import re
import uuid
import subprocess
import time

import six

from trex_stl_lib.trex_stl_client import LoggerApi
from trex_stl_lib.trex_stl_client import STLClient
from trex_stl_lib.trex_stl_exceptions import STLError
from yardstick.benchmark.contexts.base import Context
from yardstick.common import exceptions as y_exceptions
from yardstick.common.process import check_if_process_failed
from yardstick.common import utils
from yardstick.common import yaml_loader
from yardstick.network_services import constants
from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
from yardstick.network_services.nfvi.resource import ResourceProfile
from yardstick.network_services.utils import get_nsb_option
from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
from yardstick.benchmark.contexts.node import NodeContext

LOG = logging.getLogger(__name__)


class SetupEnvHelper(object):

    CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper
        self.collectd_options = {}

    def build_config(self):
        raise NotImplementedError

    def setup_vnf_environment(self):
        pass

    def kill_vnf(self):
        pass

    def tear_down(self):
        raise NotImplementedError


class DpdkVnfSetupEnvHelper(SetupEnvHelper):

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        traffic_type = traffic_options['traffic_type']

        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

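    # Illustrative sketch (hypothetical config text) of the rewrites done by
    # the two helpers above: with traffic_options = {'traffic_type': 6,
    # 'pkt_type': 'ipv6', 'vnf_type': 'OtherVnf'}, _update_traffic_type()
    # rewrites 'traffic_type = 4' to 'traffic_type = 6', and
    # _update_packet_type() rewrites 'pkt_type = ipv4' to 'pkt_type = ipv6'.
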
    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def _setup_hugepages(self):
        meminfo = utils.read_meminfo(self.ssh_helper)
        hp_size_kb = int(meminfo['Hugepagesize'])
        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
        nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
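        # Worked example (hypothetical values): hugepages_gb=16 with a 2048 kB
        # hugepage size gives int(abs(16 * 1024 * 1024 / 2048)) = 8192 pages;
        # with 1 GB pages (1048576 kB) the same request needs only 16 pages.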
        self.ssh_helper.execute('echo %s | sudo tee %s' %
                                (nr_hugepages, self.NR_HUGEPAGES_PATH))
        hp = six.BytesIO()
        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
        LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
                 hp_size_kb, nr_hugepages, nr_hugepages_set)

    def build_config(self):
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        config_file = vnf_cfg.get('file')
        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        # read actions/rules from file
        acl_options = None
        acl_file_name = self.scenario_helper.options.get('rules')
        if acl_file_name:
            with utils.open_relative_file(acl_file_name, task_path) as infile:
                acl_options = yaml_loader.yaml_load(infile)

        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
                                                  task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        if config_file:
            with utils.open_relative_file(config_file, task_path) as infile:
                new_config = ['[EAL]']
                vpci = []
                for port in self.vnfd_helper.port_pairs.all_ports:
                    interface = self.vnfd_helper.find_interface(name=port)
                    vpci.append(interface['virtual-interface']["vpci"])
                new_config.extend('w = {0}'.format(item) for item in vpci)
                new_config = '\n'.join(new_config) + '\n' + infile.read()
        else:
            with open(self.CFG_CONFIG) as handle:
                new_config = handle.read()
            new_config = self._update_traffic_type(new_config, traffic_options)
            new_config = self._update_packet_type(new_config, traffic_options)
        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
            multiport.generate_script(self.vnfd_helper,
                                      self.get_flows_config(acl_options)))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def get_flows_config(self, options=None):  # pylint: disable=unused-argument
        """No actions/rules (flows) by default"""
        return None

    def _build_pipeline_kwargs(self):
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_4, LINK1 -> PMD_ID_8
        #      0x1010 maps LINK0 -> PMD_ID_4, LINK1 -> PMD_ID_12
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))
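        # Worked example (hypothetical port numbers): DPDK ports [0, 2] give
        # sum(2**0 + 2**2) = 5, so ports_mask_hex becomes '0x5'.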

        vnf_cfg = self.scenario_helper.vnf_cfg
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_threads = vnf_cfg.get('worker_threads', 3)
        hwlb = ''
        if lb_config == 'HW':
            hwlb = ' --hwlb %s' % worker_threads

        self.pipeline_kwargs = {
            'cfg_file': self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
            'hwlb': hwlb,
        }

    def setup_vnf_environment(self):
        self._setup_dpdk()
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run"""
        self._setup_hugepages()
        self.dpdk_bind_helper.load_dpdk_driver()

        exit_status = self.dpdk_bind_helper.check_dpdk_driver()
        if exit_status == 0:
            return

    def _setup_resources(self):
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        plugins = self.collectd_options.get("plugins", {})
        interval = self.collectd_options.get("interval")
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=interval,
                               timeout=self.scenario_helper.timeout)

    def _check_interface_fields(self):
        num_nodes = len(self.scenario_helper.nodes)
        # OpenStack instance creation time is probably proportional to the number
        # of instances
        timeout = 120 * num_nodes
        dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
                             self.ssh_helper, timeout)
        dpdk_node.check()

    def _detect_and_bind_drivers(self):
        interfaces = self.vnfd_helper.interfaces

        self._check_interface_fields()
        # check for bound after probe
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
                # force to int
                intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except:  # pylint: disable=bare-except
                pass
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
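        # Example (hypothetical address): for vpci '0000:05:00.0' the find
        # command resolves the /sys/class/net symlink and prints the local
        # interface name, e.g. 'ens4'.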
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        self.dpdk_bind_helper.rebind_drivers()


class ResourceHelper(object):

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        super(ResourceHelper, self).__init__()
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper
        self._enable = True

    def setup(self):
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        pass

    def update_from_context(self, context, attr_name):
        """Disable resource helper in case of baremetal context.

        And update appropriate node collectd options in context
        """
        if isinstance(context, NodeContext):
            self._enable = False
            context.update_collectd_options_for_node(self.setup_helper.collectd_options,
                                                     attr_name)

    def _collect_resource_kpi(self):
        result = {}
        status = self.resource.check_if_system_agent_running("collectd")[0]
        if status == 0 and self._enable:
            result = self.resource.amqp_collect_nfvi_kpi()

        result = {"core": result}
        return result

    def start_collect(self):
        if self._enable:
            self.resource.initiate_systemagent(self.ssh_helper.bin_path)
            self.resource.start()
            self.resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        if self.resource and self._enable:
            self.resource.stop()

    def collect_kpi(self):
        return self._collect_resource_kpi()


class ClientResourceHelper(ResourceHelper):

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
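        # Example (hypothetical topology): uplink port 'xe0' and downlink port
        # 'xe1' mapped to DPDK port numbers 0 and 1 give uplink_ports [0],
        # downlink_ports [1] and all_ports [0, 1].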

    def port_num(self, intf):
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.error('TRex client not connected')
            return {}

    def _get_samples(self, ports, port_pg_id=False):
        raise NotImplementedError()

    def _run_traffic_once(self, traffic_profile):
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self._get_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile, mq_producer):
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        mq_producer.tg_method_started()
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            iteration_index = 0
            while self._terminated.value == 0:
                iteration_index += 1
                if self._run_traffic_once(traffic_profile):
                    self._terminated.value = 1
                mq_producer.tg_method_iteration(iteration_index)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

        mq_producer.tg_method_finished()

    def terminate(self):
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client


class Rfc2544ResourceHelper(object):

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        self.iteration = Value('i', 0)
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None

    @property
    def rfc2544(self):
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def correlated_traffic(self):
        if self._correlated_traffic is None:
            self._correlated_traffic = \
                self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)

        return self._correlated_traffic

    @property
    def latency(self):
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
        self._tolerance_low = next(tolerance_iter)
        self._tolerance_high = next(tolerance_iter, self.tolerance_low)
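        # Example (hypothetical option): allowed_drop_rate '0.0001 - 0.01'
        # yields tolerance_low 0.0001 and tolerance_high 0.01; a single value
        # such as '0.2' sets both bounds to 0.2.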


class SampleVNFDeployHelper(object):

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        vnf_bin = self.ssh_helper.join_bin_path(app_name)
        exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
        if not exit_status:
            return

        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        http_proxy = os.environ.get('http_proxy', '')
        cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
        LOG.debug(cmd)
        self.ssh_helper.execute(cmd)
        vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
        self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))


class ScenarioHelper(object):

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        self.name = name
        self.scenario_cfg = None

    @property
    def task_path(self):
        return self.scenario_cfg['task_path']

    @property
    def nodes(self):
        return self.scenario_cfg.get('nodes')

    @property
    def all_options(self):
        return self.scenario_cfg.get('options', {})

    @property
    def options(self):
        return self.all_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        return self.scenario_cfg['topology']

    @property
    def timeout(self):
        test_duration = self.scenario_cfg.get('runner', {}).get('duration',
            self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
        test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
        return test_duration if test_duration > test_timeout else test_timeout
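    # Example (hypothetical values): a runner duration of 1800 s combined with
    # an options timeout of 3600 s makes the timeout property return 3600, so
    # the interactive VNF ssh session outlives the test run.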


class SampleVNF(GenericVNF):
    """ Class providing file-like API for generic VNF implementation """

    VNF_PROMPT = "pipeline>"
    WAIT_TIME = 1
    WAIT_TIME_FOR_SCRIPT = 10
    APP_NAME = "SampleVNF"
    # we run the VNF interactively, so the ssh command will timeout after this long

    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
                 resource_helper_type=None):
        super(SampleVNF, self).__init__(name, vnfd, task_id)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)

        if setup_env_helper_type is None:
            setup_env_helper_type = SetupEnvHelper

        self.setup_helper = setup_env_helper_type(self.vnfd_helper,
                                                  self.ssh_helper,
                                                  self.scenario_helper)

        self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)

        if resource_helper_type is None:
            resource_helper_type = ResourceHelper

        self.resource_helper = resource_helper_type(self.setup_helper)

        self.context_cfg = None
        self.pipeline_kwargs = {}
        self.uplink_ports = None
        self.downlink_ports = None
        # NOTE(esm): make QueueFileWrapper invert-able so that we
        #            never have to manage the queues
        self.q_in = Queue()
        self.q_out = Queue()
        self.queue_wrapper = None
        self.run_kwargs = {}
        self.used_drivers = {}
        self.vnf_port_pairs = None
        self._vnf_process = None

    def _start_vnf(self):
        self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
        name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._vnf_process = Process(name=name, target=self._run)
        self._vnf_process.start()

    def _vnf_up_post(self):
        pass

    def instantiate(self, scenario_cfg, context_cfg):
        self._update_collectd_options(scenario_cfg, context_cfg)
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg
        self.resource_helper.update_from_context(
            Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
            self.scenario_helper.nodes[self.name]
        )

        # vnf deploy is unsupported, use ansible playbooks
        if self.scenario_helper.options.get("vnf_deploy", False):
            self.deploy_helper.deploy_vnfs(self.APP_NAME)
        self.resource_helper.setup()
        self._start_vnf()

    def _update_collectd_options(self, scenario_cfg, context_cfg):
694         """Update collectd configuration options
695         This function retrieves all collectd options contained in the test case
696
697         definition builds a single dictionary combining them. The following fragment
        represents a test case with the collectd options and priorities (1 highest, 3 lowest):
        ---
        schema: yardstick:task:0.1
        scenarios:
        - type: NSPerf
          nodes:
            tg__0: trafficgen_1.yardstick
            vnf__0: vnf.yardstick
          options:
            collectd:
              <options>  # COLLECTD priority 3
            vnf__0:
              collectd:
                plugins:
                    load
                <options> # COLLECTD priority 2
        context:
          type: Node
          name: yardstick
          nfvi_type: baremetal
          file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
        """
        scenario_options = scenario_cfg.get('options', {})
        generic_options = scenario_options.get('collectd', {})
        scenario_node_options = scenario_options.get(self.name, {})\
            .get('collectd', {})
        context_node_options = context_cfg.get('nodes', {})\
            .get(self.name, {}).get('collectd', {})

        options = generic_options
        self._update_options(options, scenario_node_options)
        self._update_options(options, context_node_options)
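        # Context node options (priority 1) are applied last, so they override
        # the per-VNF scenario options (priority 2), which in turn override
        # the generic scenario-level options (priority 3).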

        self.setup_helper.collectd_options = options

    def _update_options(self, options, additional_options):
        """Update collectd options and plugins dictionary"""
        for k, v in additional_options.items():
            if isinstance(v, dict) and k in options:
                options[k].update(v)
            else:
                options[k] = v
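        # Example (hypothetical dicts): merging {'plugins': {'ram': {}}} into
        # {'interval': 5, 'plugins': {'ovs_stats': {}}} keeps 'interval' and
        # deep-merges 'plugins' into {'ovs_stats': {}, 'ram': {}}.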

    def wait_for_instantiate(self):
        buf = []
        time.sleep(self.WAIT_TIME)  # Give some time for config to load
        while True:
            if not self._vnf_process.is_alive():
                raise RuntimeError("%s VNF process died." % self.APP_NAME)

            # NOTE(esm): move to QueueFileWrapper
            while self.q_out.qsize() > 0:
                buf.append(self.q_out.get())
                message = ''.join(buf)
                if self.VNF_PROMPT in message:
                    LOG.info("%s VNF is up and running.", self.APP_NAME)
                    self._vnf_up_post()
                    self.queue_wrapper.clear()
                    return self._vnf_process.exitcode

                if "PANIC" in message:
                    raise RuntimeError("Error starting %s VNF." %
                                       self.APP_NAME)

            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
            time.sleep(self.WAIT_TIME_FOR_SCRIPT)
            # Send ENTER to display a new prompt in case the prompt text was corrupted
            # by other VNF output
            self.q_in.put('\r\n')

    def start_collect(self):
        self.resource_helper.start_collect()

    def stop_collect(self):
        self.resource_helper.stop_collect()

    def _build_run_kwargs(self):
        self.run_kwargs = {
            'stdin': self.queue_wrapper,
            'stdout': self.queue_wrapper,
            'keep_stdin_open': True,
            'pty': True,
            'timeout': self.scenario_helper.timeout,
        }

    def _build_config(self):
        return self.setup_helper.build_config()

    def _run(self):
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()
        cmd = self._build_config()
        # kill before starting
        self.setup_helper.kill_vnf()

        LOG.debug(cmd)
        self._build_run_kwargs()
        self.ssh_helper.run(cmd, **self.run_kwargs)

    def vnf_execute(self, cmd, wait_time=2):
        """ send cmd to vnf process """

        LOG.info("%s command: %s", self.APP_NAME, cmd)
        self.q_in.put("{}\r\n".format(cmd))
        time.sleep(wait_time)
        output = []
        while self.q_out.qsize() > 0:
            output.append(self.q_out.get())
        return "".join(output)

    def _tear_down(self):
        pass

    def terminate(self):
        self.vnf_execute("quit")
        self.setup_helper.kill_vnf()
        self._tear_down()
        self.resource_helper.stop_collect()
        if self._vnf_process is not None:
            # be proper and join first before we kill
            LOG.debug("joining before terminate %s", self._vnf_process.name)
            self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._vnf_process.terminate()
        # no terminate children here because we share processes with tg

    def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
        """Method for checking the statistics

        This method could be overridden in children classes.

        :return: VNF statistics
        """
        cmd = 'p {0} stats'.format(self.APP_WORD)
        out = self.vnf_execute(cmd)
        return out

    def collect_kpi(self):
        # we can't get KPIs if the VNF is down
        check_if_process_failed(self._vnf_process, 0.01)
        stats = self.get_stats()
        m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
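        # Example (hypothetical subclass values): with a COLLECT_KPI pattern
        # that captures three numeric groups and COLLECT_MAP = {'packets_in': 1,
        # 'packets_fwd': 2, 'packets_dropped': 3}, each matched group is
        # converted with int() and reported under its KPI name below.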
        physical_node = Context.get_physical_node_from_server(
            self.scenario_helper.nodes[self.name])

        result = {"physical_node": physical_node}
        if m:
            result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
            result["collect_stats"] = self.resource_helper.collect_kpi()
        else:
            result.update({"packets_in": 0,
                           "packets_fwd": 0,
                           "packets_dropped": 0})

        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        return result

    def scale(self, flavor=""):
        """The SampleVNF base class doesn't provide the 'scale' feature"""
        raise y_exceptions.FunctionNotImplemented(
            function_name='scale', class_name='SampleVNF')


class SampleVNFTrafficGen(GenericTrafficGen):
    """ Class providing file-like API for generic traffic generator """

    APP_NAME = 'Sample'
    RUN_WAIT = 1

    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
                 resource_helper_type=None):
        super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        if setup_env_helper_type is None:
            setup_env_helper_type = SetupEnvHelper

        self.setup_helper = setup_env_helper_type(self.vnfd_helper,
                                                  self.ssh_helper,
                                                  self.scenario_helper)

        if resource_helper_type is None:
            resource_helper_type = ClientResourceHelper

        self.resource_helper = resource_helper_type(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.resource_helper.update_from_context(
            Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
            self.scenario_helper.nodes[self.name]
        )

        self.resource_helper.setup()
        # must generate_cfg after DPDK bind because we need port number
        self.resource_helper.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._tg_process = Process(name=name, target=self._start_server)
        self._tg_process.start()

    def _check_status(self):
        raise NotImplementedError

    def _wait_for_process(self):
        while True:
            if not self._tg_process.is_alive():
                raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            status = self._check_status()
            if status == 0:
                LOG.info("%s TG Server is up and running.", self.APP_NAME)
                return self._tg_process.exitcode

    def _traffic_runner(self, traffic_profile, mq_id):
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self._mq_producer = self._setup_mq_producer(mq_id)
        self.resource_helper.run_traffic(traffic_profile, self._mq_producer)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.
        Method is non-blocking, returns immediately when traffic process
        is running. Mandatory.

        :param traffic_profile:
        :return: True/False
        """
        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
                                    traffic_profile.__class__.__name__,
                                    os.getpid())
        self._traffic_process = Process(
            name=name, target=self._traffic_runner,
            args=(traffic_profile, uuid.uuid1().int))
        self._traffic_process.start()
        # Wait for traffic process to start
        while self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not self._traffic_process.is_alive():
                break

    def collect_kpi(self):
        # check if the tg processes have exited
        physical_node = Context.get_physical_node_from_server(
            self.scenario_helper.nodes[self.name])

        result = {"physical_node": physical_node}
        for proc in (self._tg_process, self._traffic_process):
            check_if_process_failed(proc)

        result["collect_stats"] = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        return result

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        :return: True/False
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        if self._traffic_process is not None:
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", self._traffic_process.name)
            self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._traffic_process.terminate()
        if self._tg_process is not None:
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", self._tg_process.name)
            self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._tg_process.terminate()
        # no terminate children here because we share processes with vnf

    def scale(self, flavor=""):
988         """A traffic generator VFN doesn't provide the 'scale' feature"""
        raise y_exceptions.FunctionNotImplemented(
            function_name='scale', class_name='SampleVNFTrafficGen')