Merge "Replace cinder delete volume with shade client."
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19
20 import os
21 import posixpath
22 import re
23 import subprocess
24 import time
25
26 import six
27
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.common import exceptions as y_exceptions
33 from yardstick.common.process import check_if_process_failed
34 from yardstick.common import utils
35 from yardstick.network_services import constants
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
37 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
38 from yardstick.network_services.nfvi.resource import ResourceProfile
39 from yardstick.network_services.utils import get_nsb_option
40 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
41 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
42 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
43 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
44
45
46 LOG = logging.getLogger(__name__)
47
48
49 class SetupEnvHelper(object):
50
51     CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
52     CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
53     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
54     PIPELINE_COMMAND = ''
55     VNF_TYPE = "SAMPLE"
56
57     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
58         super(SetupEnvHelper, self).__init__()
59         self.vnfd_helper = vnfd_helper
60         self.ssh_helper = ssh_helper
61         self.scenario_helper = scenario_helper
62         self.collectd_options = {}
63
64     def build_config(self):
65         raise NotImplementedError
66
67     def setup_vnf_environment(self):
68         pass
69
70     def kill_vnf(self):
71         pass
72
73     def tear_down(self):
74         raise NotImplementedError
75
76
77 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
78
79     APP_NAME = 'DpdkVnf'
80     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
81     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
82
83     @staticmethod
84     def _update_packet_type(ip_pipeline_cfg, traffic_options):
85         match_str = 'pkt_type = ipv4'
86         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
87         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
88         return pipeline_config_str
89
90     @classmethod
91     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
92         traffic_type = traffic_options['traffic_type']
93
94         if traffic_options['vnf_type'] is not cls.APP_NAME:
95             match_str = 'traffic_type = 4'
96             replace_str = 'traffic_type = {0}'.format(traffic_type)
97
98         elif traffic_type == 4:
99             match_str = 'pkt_type = ipv4'
100             replace_str = 'pkt_type = ipv4'
101
102         else:
103             match_str = 'pkt_type = ipv4'
104             replace_str = 'pkt_type = ipv6'
105
106         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
107         return pipeline_config_str
108
109     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
110         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
111         self.all_ports = None
112         self.bound_pci = None
113         self.socket = None
114         self.used_drivers = None
115         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
116
117     def _setup_hugepages(self):
118         meminfo = utils.read_meminfo(self.ssh_helper)
119         hp_size_kb = int(meminfo['Hugepagesize'])
120         hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
121         nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
122         self.ssh_helper.execute('echo %s | sudo tee %s' %
123                                 (nr_hugepages, self.NR_HUGEPAGES_PATH))
124         hp = six.BytesIO()
125         self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
126         nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
127         LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
128                  hp_size_kb, nr_hugepages, nr_hugepages_set)
129
130     def build_config(self):
131         vnf_cfg = self.scenario_helper.vnf_cfg
132         task_path = self.scenario_helper.task_path
133
134         config_file = vnf_cfg.get('file')
135         lb_count = vnf_cfg.get('lb_count', 3)
136         lb_config = vnf_cfg.get('lb_config', 'SW')
137         worker_config = vnf_cfg.get('worker_config', '1C/1T')
138         worker_threads = vnf_cfg.get('worker_threads', 3)
139
140         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
141         traffic_options = {
142             'traffic_type': traffic_type,
143             'pkt_type': 'ipv%s' % traffic_type,
144             'vnf_type': self.VNF_TYPE,
145         }
146
147         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
148                                                   task_path)
149         config_basename = posixpath.basename(self.CFG_CONFIG)
150         script_basename = posixpath.basename(self.CFG_SCRIPT)
151         multiport = MultiPortConfig(self.scenario_helper.topology,
152                                     config_tpl_cfg,
153                                     config_basename,
154                                     self.vnfd_helper,
155                                     self.VNF_TYPE,
156                                     lb_count,
157                                     worker_threads,
158                                     worker_config,
159                                     lb_config,
160                                     self.socket)
161
162         multiport.generate_config()
163         if config_file:
164             with utils.open_relative_file(config_file, task_path) as infile:
165                 new_config = ['[EAL]']
166                 vpci = []
167                 for port in self.vnfd_helper.port_pairs.all_ports:
168                     interface = self.vnfd_helper.find_interface(name=port)
169                     vpci.append(interface['virtual-interface']["vpci"])
170                 new_config.extend('w = {0}'.format(item) for item in vpci)
171                 new_config = '\n'.join(new_config) + '\n' + infile.read()
172         else:
173             with open(self.CFG_CONFIG) as handle:
174                 new_config = handle.read()
175             new_config = self._update_traffic_type(new_config, traffic_options)
176             new_config = self._update_packet_type(new_config, traffic_options)
177         self.ssh_helper.upload_config_file(config_basename, new_config)
178         self.ssh_helper.upload_config_file(script_basename,
179                                            multiport.generate_script(self.vnfd_helper))
180
181         LOG.info("Provision and start the %s", self.APP_NAME)
182         self._build_pipeline_kwargs()
183         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
184
185     def _build_pipeline_kwargs(self):
186         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
187         # count the number of actual ports in the list of pairs
188         # remove duplicate ports
189         # this is really a mapping from LINK ID to DPDK PMD ID
190         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
191         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
192         ports = self.vnfd_helper.port_pairs.all_ports
193         port_nums = self.vnfd_helper.port_nums(ports)
194         # create mask from all the dpdk port numbers
195         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
196         self.pipeline_kwargs = {
197             'cfg_file': self.CFG_CONFIG,
198             'script': self.CFG_SCRIPT,
199             'port_mask_hex': ports_mask_hex,
200             'tool_path': tool_path,
201         }
202
203     def setup_vnf_environment(self):
204         self._setup_dpdk()
205         self.kill_vnf()
206         # bind before _setup_resources so we can use dpdk_port_num
207         self._detect_and_bind_drivers()
208         resource = self._setup_resources()
209         return resource
210
211     def kill_vnf(self):
212         # pkill is not matching, debug with pgrep
213         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
214         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
215         # have to use exact match
216         # try using killall to match
217         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
218
219     def _setup_dpdk(self):
220         """Setup DPDK environment needed for VNF to run"""
221         self._setup_hugepages()
222         self.dpdk_bind_helper.load_dpdk_driver()
223
224         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
225         if exit_status == 0:
226             return
227
228     def _setup_resources(self):
229         # what is this magic?  how do we know which socket is for which port?
230         # what about quad-socket?
231         if any(v[5] == "0" for v in self.bound_pci):
232             self.socket = 0
233         else:
234             self.socket = 1
235
236         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
237         # this won't work because we don't have DPDK port numbers yet
238         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
239         port_names = (intf["name"] for intf in ports)
240         plugins = self.collectd_options.get("plugins", {})
241         interval = self.collectd_options.get("interval")
242         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
243         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
244                                plugins=plugins, interval=interval,
245                                timeout=self.scenario_helper.timeout)
246
247     def _check_interface_fields(self):
248         num_nodes = len(self.scenario_helper.nodes)
249         # OpenStack instance creation time is probably proportional to the number
250         # of instances
251         timeout = 120 * num_nodes
252         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
253                              self.ssh_helper, timeout)
254         dpdk_node.check()
255
256     def _detect_and_bind_drivers(self):
257         interfaces = self.vnfd_helper.interfaces
258
259         self._check_interface_fields()
260         # check for bound after probe
261         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
262
263         self.dpdk_bind_helper.read_status()
264         self.dpdk_bind_helper.save_used_drivers()
265
266         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
267
268         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
269         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
270             try:
271                 intf = next(v for v in interfaces
272                             if vpci == v['virtual-interface']['vpci'])
273                 # force to int
274                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
275             except:  # pylint: disable=bare-except
276                 pass
277         time.sleep(2)
278
279     def get_local_iface_name_by_vpci(self, vpci):
280         find_net_cmd = self.FIND_NET_CMD.format(vpci)
281         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
282         if exit_status == 0:
283             return stdout
284         return None
285
286     def tear_down(self):
287         self.dpdk_bind_helper.rebind_drivers()
288
289
290 class ResourceHelper(object):
291
292     COLLECT_KPI = ''
293     MAKE_INSTALL = 'cd {0} && make && sudo make install'
294     RESOURCE_WORD = 'sample'
295
296     COLLECT_MAP = {}
297
298     def __init__(self, setup_helper):
299         super(ResourceHelper, self).__init__()
300         self.resource = None
301         self.setup_helper = setup_helper
302         self.ssh_helper = setup_helper.ssh_helper
303
304     def setup(self):
305         self.resource = self.setup_helper.setup_vnf_environment()
306
307     def generate_cfg(self):
308         pass
309
310     def _collect_resource_kpi(self):
311         result = {}
312         status = self.resource.check_if_system_agent_running("collectd")[0]
313         if status == 0:
314             result = self.resource.amqp_collect_nfvi_kpi()
315
316         result = {"core": result}
317         return result
318
319     def start_collect(self):
320         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
321         self.resource.start()
322         self.resource.amqp_process_for_nfvi_kpi()
323
324     def stop_collect(self):
325         if self.resource:
326             self.resource.stop()
327
328     def collect_kpi(self):
329         return self._collect_resource_kpi()
330
331
332 class ClientResourceHelper(ResourceHelper):
333
334     RUN_DURATION = 60
335     QUEUE_WAIT_TIME = 5
336     SYNC_PORT = 1
337     ASYNC_PORT = 2
338
339     def __init__(self, setup_helper):
340         super(ClientResourceHelper, self).__init__(setup_helper)
341         self.vnfd_helper = setup_helper.vnfd_helper
342         self.scenario_helper = setup_helper.scenario_helper
343
344         self.client = None
345         self.client_started = Value('i', 0)
346         self.all_ports = None
347         self._queue = Queue()
348         self._result = {}
349         self._terminated = Value('i', 0)
350
351     def _build_ports(self):
352         self.networks = self.vnfd_helper.port_pairs.networks
353         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
354         self.downlink_ports = \
355             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
356         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
357
358     def port_num(self, intf):
359         # by default return port num
360         return self.vnfd_helper.port_num(intf)
361
362     def get_stats(self, *args, **kwargs):
363         try:
364             return self.client.get_stats(*args, **kwargs)
365         except STLError:
366             LOG.error('TRex client not connected')
367             return {}
368
369     def generate_samples(self, ports, key=None, default=None):
370         # needs to be used ports
371         last_result = self.get_stats(ports)
372         key_value = last_result.get(key, default)
373
374         if not isinstance(last_result, Mapping):  # added for mock unit test
375             self._terminated.value = 1
376             return {}
377
378         samples = {}
379         # recalculate port for interface and see if it matches ports provided
380         for intf in self.vnfd_helper.interfaces:
381             name = intf["name"]
382             port = self.vnfd_helper.port_num(name)
383             if port in ports:
384                 xe_value = last_result.get(port, {})
385                 samples[name] = {
386                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
387                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
388                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
389                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
390                     "in_packets": int(xe_value.get("ipackets", 0)),
391                     "out_packets": int(xe_value.get("opackets", 0)),
392                 }
393                 if key:
394                     samples[name][key] = key_value
395         return samples
396
397     def _run_traffic_once(self, traffic_profile):
398         traffic_profile.execute_traffic(self)
399         self.client_started.value = 1
400         time.sleep(self.RUN_DURATION)
401         samples = self.generate_samples(traffic_profile.ports)
402         time.sleep(self.QUEUE_WAIT_TIME)
403         self._queue.put(samples)
404
405     def run_traffic(self, traffic_profile):
406         # if we don't do this we can hang waiting for the queue to drain
407         # have to do this in the subprocess
408         self._queue.cancel_join_thread()
409         # fixme: fix passing correct trex config file,
410         # instead of searching the default path
411         try:
412             self._build_ports()
413             self.client = self._connect()
414             self.client.reset(ports=self.all_ports)
415             self.client.remove_all_streams(self.all_ports)  # remove all streams
416             traffic_profile.register_generator(self)
417
418             while self._terminated.value == 0:
419                 self._run_traffic_once(traffic_profile)
420
421             self.client.stop(self.all_ports)
422             self.client.disconnect()
423             self._terminated.value = 0
424         except STLError:
425             if self._terminated.value:
426                 LOG.debug("traffic generator is stopped")
427                 return  # return if trex/tg server is stopped.
428             raise
429
430     def terminate(self):
431         self._terminated.value = 1  # stop client
432
433     def clear_stats(self, ports=None):
434         if ports is None:
435             ports = self.all_ports
436         self.client.clear_stats(ports=ports)
437
438     def start(self, ports=None, *args, **kwargs):
439         # pylint: disable=keyword-arg-before-vararg
440         # NOTE(ralonsoh): defining keyworded arguments before variable
441         # positional arguments is a bug. This function definition doesn't work
442         # in Python 2, although it works in Python 3. Reference:
443         # https://www.python.org/dev/peps/pep-3102/
444         if ports is None:
445             ports = self.all_ports
446         self.client.start(ports=ports, *args, **kwargs)
447
448     def collect_kpi(self):
449         if not self._queue.empty():
450             kpi = self._queue.get()
451             self._result.update(kpi)
452             LOG.debug('Got KPIs from _queue for %s %s',
453                       self.scenario_helper.name, self.RESOURCE_WORD)
454         return self._result
455
456     def _connect(self, client=None):
457         if client is None:
458             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
459                                server=self.vnfd_helper.mgmt_interface["ip"],
460                                verbose_level=LoggerApi.VERBOSE_QUIET)
461
462         # try to connect with 5s intervals, 30s max
463         for idx in range(6):
464             try:
465                 client.connect()
466                 break
467             except STLError:
468                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
469                 time.sleep(5)
470         return client
471
472
473 class Rfc2544ResourceHelper(object):
474
475     DEFAULT_CORRELATED_TRAFFIC = False
476     DEFAULT_LATENCY = False
477     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
478
479     def __init__(self, scenario_helper):
480         super(Rfc2544ResourceHelper, self).__init__()
481         self.scenario_helper = scenario_helper
482         self._correlated_traffic = None
483         self.iteration = Value('i', 0)
484         self._latency = None
485         self._rfc2544 = None
486         self._tolerance_low = None
487         self._tolerance_high = None
488
489     @property
490     def rfc2544(self):
491         if self._rfc2544 is None:
492             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
493         return self._rfc2544
494
495     @property
496     def tolerance_low(self):
497         if self._tolerance_low is None:
498             self.get_rfc_tolerance()
499         return self._tolerance_low
500
501     @property
502     def tolerance_high(self):
503         if self._tolerance_high is None:
504             self.get_rfc_tolerance()
505         return self._tolerance_high
506
507     @property
508     def correlated_traffic(self):
509         if self._correlated_traffic is None:
510             self._correlated_traffic = \
511                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
512
513         return self._correlated_traffic
514
515     @property
516     def latency(self):
517         if self._latency is None:
518             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
519         return self._latency
520
521     def get_rfc2544(self, name, default=None):
522         return self.rfc2544.get(name, default)
523
524     def get_rfc_tolerance(self):
525         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
526         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
527         self._tolerance_low = next(tolerance_iter)
528         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
529
530
531 class SampleVNFDeployHelper(object):
532
533     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
534     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
535     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
536
537     def __init__(self, vnfd_helper, ssh_helper):
538         super(SampleVNFDeployHelper, self).__init__()
539         self.ssh_helper = ssh_helper
540         self.vnfd_helper = vnfd_helper
541
542     def deploy_vnfs(self, app_name):
543         vnf_bin = self.ssh_helper.join_bin_path(app_name)
544         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
545         if not exit_status:
546             return
547
548         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
549         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
550         time.sleep(2)
551         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
552         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
553
554         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
555         time.sleep(2)
556         http_proxy = os.environ.get('http_proxy', '')
557         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
558         LOG.debug(cmd)
559         self.ssh_helper.execute(cmd)
560         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
561         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
562         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
563
564
565 class ScenarioHelper(object):
566
567     DEFAULT_VNF_CFG = {
568         'lb_config': 'SW',
569         'lb_count': 1,
570         'worker_config': '1C/1T',
571         'worker_threads': 1,
572     }
573
574     def __init__(self, name):
575         self.name = name
576         self.scenario_cfg = None
577
578     @property
579     def task_path(self):
580         return self.scenario_cfg['task_path']
581
582     @property
583     def nodes(self):
584         return self.scenario_cfg.get('nodes')
585
586     @property
587     def all_options(self):
588         return self.scenario_cfg.get('options', {})
589
590     @property
591     def options(self):
592         return self.all_options.get(self.name, {})
593
594     @property
595     def vnf_cfg(self):
596         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
597
598     @property
599     def topology(self):
600         return self.scenario_cfg['topology']
601
602     @property
603     def timeout(self):
604         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
605             self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
606         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
607         return test_duration if test_duration > test_timeout else test_timeout
608
609 class SampleVNF(GenericVNF):
610     """ Class providing file-like API for generic VNF implementation """
611
612     VNF_PROMPT = "pipeline>"
613     WAIT_TIME = 1
614     WAIT_TIME_FOR_SCRIPT = 10
615     APP_NAME = "SampleVNF"
616     # we run the VNF interactively, so the ssh command will timeout after this long
617
618     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
619         super(SampleVNF, self).__init__(name, vnfd)
620         self.bin_path = get_nsb_option('bin_path', '')
621
622         self.scenario_helper = ScenarioHelper(self.name)
623         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
624
625         if setup_env_helper_type is None:
626             setup_env_helper_type = SetupEnvHelper
627
628         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
629                                                   self.ssh_helper,
630                                                   self.scenario_helper)
631
632         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
633
634         if resource_helper_type is None:
635             resource_helper_type = ResourceHelper
636
637         self.resource_helper = resource_helper_type(self.setup_helper)
638
639         self.context_cfg = None
640         self.nfvi_context = None
641         self.pipeline_kwargs = {}
642         self.uplink_ports = None
643         self.downlink_ports = None
644         # NOTE(esm): make QueueFileWrapper invert-able so that we
645         #            never have to manage the queues
646         self.q_in = Queue()
647         self.q_out = Queue()
648         self.queue_wrapper = None
649         self.run_kwargs = {}
650         self.used_drivers = {}
651         self.vnf_port_pairs = None
652         self._vnf_process = None
653
654     def _start_vnf(self):
655         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
656         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
657         self._vnf_process = Process(name=name, target=self._run)
658         self._vnf_process.start()
659
660     def _vnf_up_post(self):
661         pass
662
663     def instantiate(self, scenario_cfg, context_cfg):
664         self._update_collectd_options(scenario_cfg, context_cfg)
665         self.scenario_helper.scenario_cfg = scenario_cfg
666         self.context_cfg = context_cfg
667         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
668         # self.nfvi_context = None
669
670         # vnf deploy is unsupported, use ansible playbooks
671         if self.scenario_helper.options.get("vnf_deploy", False):
672             self.deploy_helper.deploy_vnfs(self.APP_NAME)
673         self.resource_helper.setup()
674         self._start_vnf()
675
676     def _update_collectd_options(self, scenario_cfg, context_cfg):
677         """Update collectd configuration options
678         This function retrieves all collectd options contained in the test case
679
680         definition builds a single dictionary combining them. The following fragment
681         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
682         ---
683         schema: yardstick:task:0.1
684         scenarios:
685         - type: NSPerf
686           nodes:
687             tg__0: trafficgen_1.yardstick
688             vnf__0: vnf.yardstick
689           options:
690             collectd:
691               <options>  # COLLECTD priority 3
692             vnf__0:
693               collectd:
694                 plugins:
695                     load
696                 <options> # COLLECTD priority 2
697         context:
698           type: Node
699           name: yardstick
700           nfvi_type: baremetal
701           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
702         """
703         scenario_options = scenario_cfg.get('options', {})
704         generic_options = scenario_options.get('collectd', {})
705         scenario_node_options = scenario_options.get(self.name, {})\
706             .get('collectd', {})
707         context_node_options = context_cfg.get('nodes', {})\
708             .get(self.name, {}).get('collectd', {})
709
710         options = generic_options
711         self._update_options(options, scenario_node_options)
712         self._update_options(options, context_node_options)
713
714         self.setup_helper.collectd_options = options
715
716     def _update_options(self, options, additional_options):
717         """Update collectd options and plugins dictionary"""
718         for k, v in additional_options.items():
719             if isinstance(v, dict) and k in options:
720                 options[k].update(v)
721             else:
722                 options[k] = v
723
724     def wait_for_instantiate(self):
725         buf = []
726         time.sleep(self.WAIT_TIME)  # Give some time for config to load
727         while True:
728             if not self._vnf_process.is_alive():
729                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
730
731             # NOTE(esm): move to QueueFileWrapper
732             while self.q_out.qsize() > 0:
733                 buf.append(self.q_out.get())
734                 message = ''.join(buf)
735                 if self.VNF_PROMPT in message:
736                     LOG.info("%s VNF is up and running.", self.APP_NAME)
737                     self._vnf_up_post()
738                     self.queue_wrapper.clear()
739                     return self._vnf_process.exitcode
740
741                 if "PANIC" in message:
742                     raise RuntimeError("Error starting %s VNF." %
743                                        self.APP_NAME)
744
745             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
746             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
747             # Send ENTER to display a new prompt in case the prompt text was corrupted
748             # by other VNF output
749             self.q_in.put('\r\n')
750
751     def start_collect(self):
752         self.resource_helper.start_collect()
753
754     def stop_collect(self):
755         self.resource_helper.stop_collect()
756
757     def _build_run_kwargs(self):
758         self.run_kwargs = {
759             'stdin': self.queue_wrapper,
760             'stdout': self.queue_wrapper,
761             'keep_stdin_open': True,
762             'pty': True,
763             'timeout': self.scenario_helper.timeout,
764         }
765
766     def _build_config(self):
767         return self.setup_helper.build_config()
768
769     def _run(self):
770         # we can't share ssh paramiko objects to force new connection
771         self.ssh_helper.drop_connection()
772         cmd = self._build_config()
773         # kill before starting
774         self.setup_helper.kill_vnf()
775
776         LOG.debug(cmd)
777         self._build_run_kwargs()
778         self.ssh_helper.run(cmd, **self.run_kwargs)
779
780     def vnf_execute(self, cmd, wait_time=2):
781         """ send cmd to vnf process """
782
783         LOG.info("%s command: %s", self.APP_NAME, cmd)
784         self.q_in.put("{}\r\n".format(cmd))
785         time.sleep(wait_time)
786         output = []
787         while self.q_out.qsize() > 0:
788             output.append(self.q_out.get())
789         return "".join(output)
790
791     def _tear_down(self):
792         pass
793
794     def terminate(self):
795         self.vnf_execute("quit")
796         self.setup_helper.kill_vnf()
797         self._tear_down()
798         self.resource_helper.stop_collect()
799         if self._vnf_process is not None:
800             # be proper and join first before we kill
801             LOG.debug("joining before terminate %s", self._vnf_process.name)
802             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
803             self._vnf_process.terminate()
804         # no terminate children here because we share processes with tg
805
806     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
807         """Method for checking the statistics
808
809         This method could be overridden in children classes.
810
811         :return: VNF statistics
812         """
813         cmd = 'p {0} stats'.format(self.APP_WORD)
814         out = self.vnf_execute(cmd)
815         return out
816
817     def collect_kpi(self):
818         # we can't get KPIs if the VNF is down
819         check_if_process_failed(self._vnf_process)
820         stats = self.get_stats()
821         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
822         if m:
823             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
824             result["collect_stats"] = self.resource_helper.collect_kpi()
825         else:
826             result = {
827                 "packets_in": 0,
828                 "packets_fwd": 0,
829                 "packets_dropped": 0,
830             }
831         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
832         return result
833
834     def scale(self, flavor=""):
835         """The SampleVNF base class doesn't provide the 'scale' feature"""
836         raise y_exceptions.FunctionNotImplemented(
837             function_name='scale', class_name='SampleVNFTrafficGen')
838
839
840 class SampleVNFTrafficGen(GenericTrafficGen):
841     """ Class providing file-like API for generic traffic generator """
842
843     APP_NAME = 'Sample'
844     RUN_WAIT = 1
845
846     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
847         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
848         self.bin_path = get_nsb_option('bin_path', '')
849
850         self.scenario_helper = ScenarioHelper(self.name)
851         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
852
853         if setup_env_helper_type is None:
854             setup_env_helper_type = SetupEnvHelper
855
856         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
857                                                   self.ssh_helper,
858                                                   self.scenario_helper)
859
860         if resource_helper_type is None:
861             resource_helper_type = ClientResourceHelper
862
863         self.resource_helper = resource_helper_type(self.setup_helper)
864
865         self.runs_traffic = True
866         self.traffic_finished = False
867         self._tg_process = None
868         self._traffic_process = None
869
870     def _start_server(self):
871         # we can't share ssh paramiko objects to force new connection
872         self.ssh_helper.drop_connection()
873
874     def instantiate(self, scenario_cfg, context_cfg):
875         self.scenario_helper.scenario_cfg = scenario_cfg
876         self.resource_helper.setup()
877         # must generate_cfg after DPDK bind because we need port number
878         self.resource_helper.generate_cfg()
879
880         LOG.info("Starting %s server...", self.APP_NAME)
881         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
882         self._tg_process = Process(name=name, target=self._start_server)
883         self._tg_process.start()
884
885     def _check_status(self):
886         raise NotImplementedError
887
888     def _wait_for_process(self):
889         while True:
890             if not self._tg_process.is_alive():
891                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
892             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
893             time.sleep(1)
894             status = self._check_status()
895             if status == 0:
896                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
897                 return self._tg_process.exitcode
898
899     def _traffic_runner(self, traffic_profile):
900         # always drop connections first thing in new processes
901         # so we don't get paramiko errors
902         self.ssh_helper.drop_connection()
903         LOG.info("Starting %s client...", self.APP_NAME)
904         self.resource_helper.run_traffic(traffic_profile)
905
906     def run_traffic(self, traffic_profile):
907         """ Generate traffic on the wire according to the given params.
908         Method is non-blocking, returns immediately when traffic process
909         is running. Mandatory.
910
911         :param traffic_profile:
912         :return: True/False
913         """
914         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
915                                     os.getpid())
916         self._traffic_process = Process(name=name, target=self._traffic_runner,
917                                         args=(traffic_profile,))
918         self._traffic_process.start()
919         # Wait for traffic process to start
920         while self.resource_helper.client_started.value == 0:
921             time.sleep(self.RUN_WAIT)
922             # what if traffic process takes a few seconds to start?
923             if not self._traffic_process.is_alive():
924                 break
925
926         return self._traffic_process.is_alive()
927
928     def collect_kpi(self):
929         # check if the tg processes have exited
930         for proc in (self._tg_process, self._traffic_process):
931             check_if_process_failed(proc)
932         result = self.resource_helper.collect_kpi()
933         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
934         return result
935
936     def terminate(self):
937         """ After this method finishes, all traffic processes should stop. Mandatory.
938
939         :return: True/False
940         """
941         self.traffic_finished = True
942         # we must kill client before we kill the server, or the client will raise exception
943         if self._traffic_process is not None:
944             # be proper and try to join before terminating
945             LOG.debug("joining before terminate %s", self._traffic_process.name)
946             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
947             self._traffic_process.terminate()
948         if self._tg_process is not None:
949             # be proper and try to join before terminating
950             LOG.debug("joining before terminate %s", self._tg_process.name)
951             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
952             self._tg_process.terminate()
953         # no terminate children here because we share processes with vnf
954
955     def scale(self, flavor=""):
956         """A traffic generator VFN doesn't provide the 'scale' feature"""
957         raise y_exceptions.FunctionNotImplemented(
958             function_name='scale', class_name='SampleVNFTrafficGen')