Merge "Improve SSH to Open/Close interactive terminal"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2019 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import decimal
17 from multiprocessing import Queue, Value, Process
18 import os
19 import posixpath
20 import re
21 import subprocess
22 import time
23
24
25 from trex_stl_lib.trex_stl_client import LoggerApi
26 from trex_stl_lib.trex_stl_client import STLClient
27 from trex_stl_lib.trex_stl_exceptions import STLError
28 from yardstick.benchmark.contexts.base import Context
29 from yardstick.common import exceptions as y_exceptions
30 from yardstick.common.process import check_if_process_failed
31 from yardstick.common import utils
32 from yardstick.common import yaml_loader
33 from yardstick.network_services import constants
34 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
39 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
42 from yardstick.benchmark.contexts.node import NodeContext
43
44 LOG = logging.getLogger(__name__)
45
46
47 class SetupEnvHelper(object):
48
49     CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
50     CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
51     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
52     PIPELINE_COMMAND = ''
53     VNF_TYPE = "SAMPLE"
54
55     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
56         super(SetupEnvHelper, self).__init__()
57         self.vnfd_helper = vnfd_helper
58         self.ssh_helper = ssh_helper
59         self.scenario_helper = scenario_helper
60         self.collectd_options = {}
61
62     def build_config(self):
63         raise NotImplementedError
64
65     def setup_vnf_environment(self):
66         pass
67
68     def kill_vnf(self):
69         pass
70
71     def tear_down(self):
72         raise NotImplementedError
73
74
75 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
76
77     APP_NAME = 'DpdkVnf'
78     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
79     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
80
81     @staticmethod
82     def _update_packet_type(ip_pipeline_cfg, traffic_options):
83         match_str = 'pkt_type = ipv4'
84         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
85         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
86         return pipeline_config_str
87
88     @classmethod
89     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
90         traffic_type = traffic_options['traffic_type']
91
92         if traffic_options['vnf_type'] is not cls.APP_NAME:
93             match_str = 'traffic_type = 4'
94             replace_str = 'traffic_type = {0}'.format(traffic_type)
95
96         elif traffic_type == 4:
97             match_str = 'pkt_type = ipv4'
98             replace_str = 'pkt_type = ipv4'
99
100         else:
101             match_str = 'pkt_type = ipv4'
102             replace_str = 'pkt_type = ipv6'
103
104         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
105         return pipeline_config_str
106
107     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
108         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
109         self.all_ports = None
110         self.bound_pci = None
111         self.socket = None
112         self.used_drivers = None
113         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
114
115     def build_config(self):
116         vnf_cfg = self.scenario_helper.vnf_cfg
117         task_path = self.scenario_helper.task_path
118
119         config_file = vnf_cfg.get('file')
120         lb_count = vnf_cfg.get('lb_count', 3)
121         lb_config = vnf_cfg.get('lb_config', 'SW')
122         worker_config = vnf_cfg.get('worker_config', '1C/1T')
123         worker_threads = vnf_cfg.get('worker_threads', 3)
124
125         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
126         traffic_options = {
127             'traffic_type': traffic_type,
128             'pkt_type': 'ipv%s' % traffic_type,
129             'vnf_type': self.VNF_TYPE,
130         }
131
132         # read actions/rules from file
133         acl_options = None
134         acl_file_name = self.scenario_helper.options.get('rules')
135         if acl_file_name:
136             with utils.open_relative_file(acl_file_name, task_path) as infile:
137                 acl_options = yaml_loader.yaml_load(infile)
138
139         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
140                                                   task_path)
141         config_basename = posixpath.basename(self.CFG_CONFIG)
142         script_basename = posixpath.basename(self.CFG_SCRIPT)
143         multiport = MultiPortConfig(self.scenario_helper.topology,
144                                     config_tpl_cfg,
145                                     config_basename,
146                                     self.vnfd_helper,
147                                     self.VNF_TYPE,
148                                     lb_count,
149                                     worker_threads,
150                                     worker_config,
151                                     lb_config,
152                                     self.socket)
153
154         multiport.generate_config()
155         if config_file:
156             with utils.open_relative_file(config_file, task_path) as infile:
157                 new_config = ['[EAL]']
158                 vpci = []
159                 for port in self.vnfd_helper.port_pairs.all_ports:
160                     interface = self.vnfd_helper.find_interface(name=port)
161                     vpci.append(interface['virtual-interface']["vpci"])
162                 new_config.extend('w = {0}'.format(item) for item in vpci)
163                 new_config = '\n'.join(new_config) + '\n' + infile.read()
164         else:
165             with open(self.CFG_CONFIG) as handle:
166                 new_config = handle.read()
167             new_config = self._update_traffic_type(new_config, traffic_options)
168             new_config = self._update_packet_type(new_config, traffic_options)
169         self.ssh_helper.upload_config_file(config_basename, new_config)
170         self.ssh_helper.upload_config_file(script_basename,
171             multiport.generate_script(self.vnfd_helper,
172                                       self.get_flows_config(acl_options)))
173
174         LOG.info("Provision and start the %s", self.APP_NAME)
175         self._build_pipeline_kwargs()
176         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
177
178     def get_flows_config(self, options=None): # pylint: disable=unused-argument
179         """No actions/rules (flows) by default"""
180         return None
181
182     def _build_pipeline_kwargs(self, cfg_file=None, script=None):
183         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
184         # count the number of actual ports in the list of pairs
185         # remove duplicate ports
186         # this is really a mapping from LINK ID to DPDK PMD ID
187         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
188         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
189         ports = self.vnfd_helper.port_pairs.all_ports
190         port_nums = self.vnfd_helper.port_nums(ports)
191         # create mask from all the dpdk port numbers
192         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
193
194         vnf_cfg = self.scenario_helper.vnf_cfg
195         lb_config = vnf_cfg.get('lb_config', 'SW')
196         worker_threads = vnf_cfg.get('worker_threads', 3)
197         hwlb = ''
198         if lb_config == 'HW':
199             hwlb = ' --hwlb %s' % worker_threads
200
201         self.pipeline_kwargs = {
202             'cfg_file': cfg_file if cfg_file else self.CFG_CONFIG,
203             'script': script if script else self.CFG_SCRIPT,
204             'port_mask_hex': ports_mask_hex,
205             'tool_path': tool_path,
206             'hwlb': hwlb,
207         }
208
209     def setup_vnf_environment(self):
210         self._setup_dpdk()
211         self.kill_vnf()
212         # bind before _setup_resources so we can use dpdk_port_num
213         self._detect_and_bind_drivers()
214         resource = self._setup_resources()
215         return resource
216
217     def kill_vnf(self):
218         # pkill is not matching, debug with pgrep
219         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
220         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
221         # have to use exact match
222         # try using killall to match
223         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
224
225     def _setup_dpdk(self):
226         """Setup DPDK environment needed for VNF to run"""
227         hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
228         utils.setup_hugepages(self.ssh_helper, hugepages_gb * 1024 * 1024)
229         self.dpdk_bind_helper.load_dpdk_driver()
230
231         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
232         if exit_status == 0:
233             return
234
235     def _setup_resources(self):
236         # what is this magic?  how do we know which socket is for which port?
237         # what about quad-socket?
238         if any(v[5] == "0" for v in self.bound_pci):
239             self.socket = 0
240         else:
241             self.socket = 1
242
243         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
244         # this won't work because we don't have DPDK port numbers yet
245         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
246         port_names = (intf["name"] for intf in ports)
247         plugins = self.collectd_options.get("plugins", {})
248         interval = self.collectd_options.get("interval")
249         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
250         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
251                                plugins=plugins, interval=interval,
252                                timeout=self.scenario_helper.timeout)
253
254     def _check_interface_fields(self):
255         num_nodes = len(self.scenario_helper.nodes)
256         # OpenStack instance creation time is probably proportional to the number
257         # of instances
258         timeout = 120 * num_nodes
259         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
260                              self.ssh_helper, timeout)
261         dpdk_node.check()
262
263     def _detect_and_bind_drivers(self):
264         interfaces = self.vnfd_helper.interfaces
265
266         self._check_interface_fields()
267         # check for bound after probe
268         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
269
270         self.dpdk_bind_helper.read_status()
271         self.dpdk_bind_helper.save_used_drivers()
272
273         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
274
275         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
276         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
277             try:
278                 intf = next(v for v in interfaces
279                             if vpci == v['virtual-interface']['vpci'])
280                 # force to int
281                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
282             except:  # pylint: disable=bare-except
283                 pass
284         time.sleep(2)
285
286     def get_local_iface_name_by_vpci(self, vpci):
287         find_net_cmd = self.FIND_NET_CMD.format(vpci)
288         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
289         if exit_status == 0:
290             return stdout
291         return None
292
293     def tear_down(self):
294         self.dpdk_bind_helper.rebind_drivers()
295
296
297 class ResourceHelper(object):
298
299     COLLECT_KPI = ''
300     MAKE_INSTALL = 'cd {0} && make && sudo make install'
301     RESOURCE_WORD = 'sample'
302
303     COLLECT_MAP = {}
304
305     def __init__(self, setup_helper):
306         super(ResourceHelper, self).__init__()
307         self.resource = None
308         self.setup_helper = setup_helper
309         self.ssh_helper = setup_helper.ssh_helper
310         self._enable = True
311
312     def setup(self):
313         self.resource = self.setup_helper.setup_vnf_environment()
314
315     def generate_cfg(self):
316         pass
317
318     def update_from_context(self, context, attr_name):
319         """Disable resource helper in case of baremetal context.
320
321         And update appropriate node collectd options in context
322         """
323         if isinstance(context, NodeContext):
324             self._enable = False
325             context.update_collectd_options_for_node(self.setup_helper.collectd_options,
326                                                      attr_name)
327
328     def _collect_resource_kpi(self):
329         result = {}
330         status = self.resource.check_if_system_agent_running("collectd")[0]
331         if status == 0 and self._enable:
332             result = self.resource.amqp_collect_nfvi_kpi()
333
334         result = {"core": result}
335         return result
336
337     def start_collect(self):
338         if self._enable:
339             self.resource.initiate_systemagent(self.ssh_helper.bin_path)
340             self.resource.start()
341             self.resource.amqp_process_for_nfvi_kpi()
342
343     def stop_collect(self):
344         if self.resource and self._enable:
345             self.resource.stop()
346
347     def collect_kpi(self):
348         return self._collect_resource_kpi()
349
350
351 class ClientResourceHelper(ResourceHelper):
352
353     RUN_DURATION = 60
354     QUEUE_WAIT_TIME = 5
355     SYNC_PORT = 1
356     ASYNC_PORT = 2
357
358     def __init__(self, setup_helper):
359         super(ClientResourceHelper, self).__init__(setup_helper)
360         self.vnfd_helper = setup_helper.vnfd_helper
361         self.scenario_helper = setup_helper.scenario_helper
362
363         self.client = None
364         self.client_started = Value('i', 0)
365         self.all_ports = None
366         self._queue = Queue()
367         self._result = {}
368         self._terminated = Value('i', 0)
369
370     def _build_ports(self):
371         self.networks = self.vnfd_helper.port_pairs.networks
372         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
373         self.downlink_ports = \
374             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
375         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
376
377     def port_num(self, intf):
378         # by default return port num
379         return self.vnfd_helper.port_num(intf)
380
381     def get_stats(self, *args, **kwargs):
382         try:
383             return self.client.get_stats(*args, **kwargs)
384         except STLError:
385             LOG.error('TRex client not connected')
386             return {}
387
388     def _get_samples(self, ports, port_pg_id=False):
389         raise NotImplementedError()
390
391     def _run_traffic_once(self, traffic_profile):
392         traffic_profile.execute_traffic(self)
393         self.client_started.value = 1
394         time.sleep(self.RUN_DURATION)
395         samples = self._get_samples(traffic_profile.ports)
396         time.sleep(self.QUEUE_WAIT_TIME)
397         self._queue.put(samples)
398
399     def run_traffic(self, traffic_profile):
400         # if we don't do this we can hang waiting for the queue to drain
401         # have to do this in the subprocess
402         self._queue.cancel_join_thread()
403         # fixme: fix passing correct trex config file,
404         # instead of searching the default path
405         try:
406             self._build_ports()
407             self.client = self._connect()
408             if self.client is None:
409                 LOG.critical("Failure to Connect ... unable to continue")
410                 return
411
412             self.client.reset(ports=self.all_ports)
413             self.client.remove_all_streams(self.all_ports)  # remove all streams
414             traffic_profile.register_generator(self)
415
416             while self._terminated.value == 0:
417                 if self._run_traffic_once(traffic_profile):
418                     self._terminated.value = 1
419
420             self.client.stop(self.all_ports)
421             self.client.disconnect()
422             self._terminated.value = 0
423         except STLError:
424             if self._terminated.value:
425                 LOG.debug("traffic generator is stopped")
426                 return  # return if trex/tg server is stopped.
427             raise
428
429     def terminate(self):
430         self._terminated.value = 1  # stop client
431
432     def clear_stats(self, ports=None):
433         if ports is None:
434             ports = self.all_ports
435         self.client.clear_stats(ports=ports)
436
437     def start(self, ports=None, *args, **kwargs):
438         # pylint: disable=keyword-arg-before-vararg
439         # NOTE(ralonsoh): defining keyworded arguments before variable
440         # positional arguments is a bug. This function definition doesn't work
441         # in Python 2, although it works in Python 3. Reference:
442         # https://www.python.org/dev/peps/pep-3102/
443         if ports is None:
444             ports = self.all_ports
445         self.client.start(ports=ports, *args, **kwargs)
446
447     def collect_kpi(self):
448         if not self._queue.empty():
449             kpi = self._queue.get()
450             self._result.update(kpi)
451             LOG.debug('Got KPIs from _queue for %s %s',
452                       self.scenario_helper.name, self.RESOURCE_WORD)
453         return self._result
454
455     def _connect(self, client=None):
456         if client is None:
457             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
458                                server=self.vnfd_helper.mgmt_interface["ip"],
459                                verbose_level=LoggerApi.VERBOSE_QUIET)
460
461         # try to connect with 5s intervals
462         for idx in range(6):
463             try:
464                 client.connect()
465                 for idx2 in range(6):
466                     if client.is_connected():
467                         return client
468                     LOG.info("Waiting to confirm connection %s .. Attempt %s",
469                              idx, idx2)
470                     time.sleep(1)
471                 client.disconnect(stop_traffic=True, release_ports=True)
472             except STLError:
473                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
474                 time.sleep(5)
475
476         if client.is_connected():
477             return client
478         else:
479             LOG.critical("Connection failure ..TRex username: %s server: %s",
480                          self.vnfd_helper.mgmt_interface["user"],
481                          self.vnfd_helper.mgmt_interface["ip"])
482             return None
483
484 class Rfc2544ResourceHelper(object):
485
486     DEFAULT_CORRELATED_TRAFFIC = False
487     DEFAULT_LATENCY = False
488     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
489     DEFAULT_RESOLUTION = '0.1'
490
491     def __init__(self, scenario_helper):
492         super(Rfc2544ResourceHelper, self).__init__()
493         self.scenario_helper = scenario_helper
494         self._correlated_traffic = None
495         self.iteration = Value('i', 0)
496         self._latency = None
497         self._rfc2544 = None
498         self._tolerance_low = None
499         self._tolerance_high = None
500         self._tolerance_precision = None
501         self._resolution = None
502
503     @property
504     def rfc2544(self):
505         if self._rfc2544 is None:
506             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
507         return self._rfc2544
508
509     @property
510     def tolerance_low(self):
511         if self._tolerance_low is None:
512             self.get_rfc_tolerance()
513         return self._tolerance_low
514
515     @property
516     def tolerance_high(self):
517         if self._tolerance_high is None:
518             self.get_rfc_tolerance()
519         return self._tolerance_high
520
521     @property
522     def tolerance_precision(self):
523         if self._tolerance_precision is None:
524             self.get_rfc_tolerance()
525         return self._tolerance_precision
526
527     @property
528     def correlated_traffic(self):
529         if self._correlated_traffic is None:
530             self._correlated_traffic = \
531                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
532
533         return self._correlated_traffic
534
535     @property
536     def latency(self):
537         if self._latency is None:
538             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
539         return self._latency
540
541     @property
542     def resolution(self):
543         if self._resolution is None:
544             self._resolution = float(self.get_rfc2544('resolution',
545                                                 self.DEFAULT_RESOLUTION))
546         return self._resolution
547
548     def get_rfc2544(self, name, default=None):
549         return self.rfc2544.get(name, default)
550
551     def get_rfc_tolerance(self):
552         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
553         tolerance_iter = iter(sorted(
554             decimal.Decimal(t.strip()) for t in tolerance_str.split('-')))
555         tolerance_low = next(tolerance_iter)
556         tolerance_high = next(tolerance_iter, tolerance_low)
557         self._tolerance_precision = abs(tolerance_high.as_tuple().exponent)
558         self._tolerance_high = float(tolerance_high)
559         self._tolerance_low = float(tolerance_low)
560
561
562 class SampleVNFDeployHelper(object):
563
564     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
565     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
566     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
567
568     def __init__(self, vnfd_helper, ssh_helper):
569         super(SampleVNFDeployHelper, self).__init__()
570         self.ssh_helper = ssh_helper
571         self.vnfd_helper = vnfd_helper
572
573     def deploy_vnfs(self, app_name):
574         vnf_bin = self.ssh_helper.join_bin_path(app_name)
575         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
576         if not exit_status:
577             return
578
579         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
580         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
581         time.sleep(2)
582         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
583         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
584
585         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
586         time.sleep(2)
587         http_proxy = os.environ.get('http_proxy', '')
588         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
589         LOG.debug(cmd)
590         self.ssh_helper.execute(cmd)
591         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
592         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
593         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
594
595
596 class ScenarioHelper(object):
597
598     DEFAULT_VNF_CFG = {
599         'lb_config': 'SW',
600         'lb_count': 1,
601         'worker_config': '1C/1T',
602         'worker_threads': 1,
603     }
604
605     def __init__(self, name):
606         self.name = name
607         self.scenario_cfg = None
608
609     @property
610     def task_path(self):
611         return self.scenario_cfg['task_path']
612
613     @property
614     def nodes(self):
615         return self.scenario_cfg.get('nodes')
616
617     @property
618     def all_options(self):
619         return self.scenario_cfg.get('options', {})
620
621     @property
622     def options(self):
623         return self.all_options.get(self.name, {})
624
625     @property
626     def vnf_cfg(self):
627         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
628
629     @property
630     def topology(self):
631         return self.scenario_cfg['topology']
632
633     @property
634     def timeout(self):
635         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
636             self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
637         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
638         return test_duration if test_duration > test_timeout else test_timeout
639
640 class SampleVNF(GenericVNF):
641     """ Class providing file-like API for generic VNF implementation """
642
643     VNF_PROMPT = "pipeline>"
644     WAIT_TIME = 1
645     WAIT_TIME_FOR_SCRIPT = 10
646     APP_NAME = "SampleVNF"
647     # we run the VNF interactively, so the ssh command will timeout after this long
648
649     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
650         super(SampleVNF, self).__init__(name, vnfd)
651         self.bin_path = get_nsb_option('bin_path', '')
652
653         self.scenario_helper = ScenarioHelper(self.name)
654         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
655
656         if setup_env_helper_type is None:
657             setup_env_helper_type = SetupEnvHelper
658
659         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
660                                                   self.ssh_helper,
661                                                   self.scenario_helper)
662
663         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
664
665         if resource_helper_type is None:
666             resource_helper_type = ResourceHelper
667
668         self.resource_helper = resource_helper_type(self.setup_helper)
669
670         self.context_cfg = None
671         self.pipeline_kwargs = {}
672         self.uplink_ports = None
673         self.downlink_ports = None
674         # NOTE(esm): make QueueFileWrapper invert-able so that we
675         #            never have to manage the queues
676         self.q_in = Queue()
677         self.q_out = Queue()
678         self.queue_wrapper = None
679         self.run_kwargs = {}
680         self.used_drivers = {}
681         self.vnf_port_pairs = None
682         self._vnf_process = None
683
684     def _start_vnf(self):
685         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
686         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
687         self._vnf_process = Process(name=name, target=self._run)
688         self._vnf_process.start()
689
690     def _vnf_up_post(self):
691         pass
692
693     def instantiate(self, scenario_cfg, context_cfg):
694         self._update_collectd_options(scenario_cfg, context_cfg)
695         self.scenario_helper.scenario_cfg = scenario_cfg
696         self.context_cfg = context_cfg
697         self.resource_helper.update_from_context(
698             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
699             self.scenario_helper.nodes[self.name]
700         )
701
702         # vnf deploy is unsupported, use ansible playbooks
703         if self.scenario_helper.options.get("vnf_deploy", False):
704             self.deploy_helper.deploy_vnfs(self.APP_NAME)
705         self.resource_helper.setup()
706         self._start_vnf()
707
708     def _update_collectd_options(self, scenario_cfg, context_cfg):
709         """Update collectd configuration options
710         This function retrieves all collectd options contained in the test case
711
712         definition builds a single dictionary combining them. The following fragment
713         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
714         ---
715         schema: yardstick:task:0.1
716         scenarios:
717         - type: NSPerf
718           nodes:
719             tg__0: trafficgen_1.yardstick
720             vnf__0: vnf.yardstick
721           options:
722             collectd:
723               <options>  # COLLECTD priority 3
724             vnf__0:
725               collectd:
726                 plugins:
727                     load
728                 <options> # COLLECTD priority 2
729         context:
730           type: Node
731           name: yardstick
732           nfvi_type: baremetal
733           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
734         """
735         scenario_options = scenario_cfg.get('options', {})
736         generic_options = scenario_options.get('collectd', {})
737         scenario_node_options = scenario_options.get(self.name, {})\
738             .get('collectd', {})
739         context_node_options = context_cfg.get('nodes', {})\
740             .get(self.name, {}).get('collectd', {})
741
742         options = generic_options
743         self._update_options(options, scenario_node_options)
744         self._update_options(options, context_node_options)
745
746         self.setup_helper.collectd_options = options
747
748     def _update_options(self, options, additional_options):
749         """Update collectd options and plugins dictionary"""
750         for k, v in additional_options.items():
751             if isinstance(v, dict) and k in options:
752                 options[k].update(v)
753             else:
754                 options[k] = v
755
756     def wait_for_instantiate(self):
757         buf = []
758         time.sleep(self.WAIT_TIME)  # Give some time for config to load
759         while True:
760             if not self._vnf_process.is_alive():
761                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
762
763             # NOTE(esm): move to QueueFileWrapper
764             while self.q_out.qsize() > 0:
765                 buf.append(self.q_out.get())
766                 message = ''.join(buf)
767                 if self.VNF_PROMPT in message:
768                     LOG.info("%s VNF is up and running.", self.APP_NAME)
769                     self._vnf_up_post()
770                     self.queue_wrapper.clear()
771                     return self._vnf_process.exitcode
772
773                 if "PANIC" in message:
774                     raise RuntimeError("Error starting %s VNF." %
775                                        self.APP_NAME)
776
777             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
778             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
779             # Send ENTER to display a new prompt in case the prompt text was corrupted
780             # by other VNF output
781             self.q_in.put('\r\n')
782
783     def wait_for_initialize(self):
784         buf = []
785         vnf_prompt_found = False
786         prompt_command = '\r\n'
787         script_name = 'non_existent_script_name'
788         done_string = 'Cannot open file "{}"'.format(script_name)
789         time.sleep(self.WAIT_TIME)  # Give some time for config to load
790         while True:
791             if not self._vnf_process.is_alive():
792                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
793             while self.q_out.qsize() > 0:
794                 buf.append(self.q_out.get())
795                 message = ''.join(buf)
796
797                 if self.VNF_PROMPT in message and not vnf_prompt_found:
798                     # Once we got VNF promt, it doesn't mean that the VNF is
799                     # up and running/initialized completely. But we can run
800                     # addition (any) VNF command and wait for it to complete
801                     # as it will be finished ONLY at the end of the VNF
802                     # initialization. So, this approach can be used to
803                     # indentify that VNF is completely initialized.
804                     LOG.info("Got %s VNF prompt.", self.APP_NAME)
805                     prompt_command = "run {}\r\n".format(script_name)
806                     self.q_in.put(prompt_command)
807                     # Cut the buffer since we are not interesting to find
808                     # the VNF prompt anymore
809                     prompt_pos = message.find(self.VNF_PROMPT)
810                     buf = [message[prompt_pos + len(self.VNF_PROMPT):]]
811                     vnf_prompt_found = True
812                     continue
813
814                 if done_string in message:
815                     LOG.info("%s VNF is up and running.", self.APP_NAME)
816                     self._vnf_up_post()
817                     self.queue_wrapper.clear()
818                     return self._vnf_process.exitcode
819
820                 if "PANIC" in message:
821                     raise RuntimeError("Error starting %s VNF." %
822                                        self.APP_NAME)
823
824             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
825             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
826             # Send command again to display the expected prompt in case the
827             # expected text was corrupted by other VNF output
828             self.q_in.put(prompt_command)
829
830     def start_collect(self):
831         self.resource_helper.start_collect()
832
833     def stop_collect(self):
834         self.resource_helper.stop_collect()
835
836     def _build_run_kwargs(self):
837         self.run_kwargs = {
838             'stdin': self.queue_wrapper,
839             'stdout': self.queue_wrapper,
840             'keep_stdin_open': True,
841             'pty': True,
842             'timeout': self.scenario_helper.timeout,
843         }
844
845     def _build_config(self):
846         return self.setup_helper.build_config()
847
848     def _run(self):
849         # we can't share ssh paramiko objects to force new connection
850         self.ssh_helper.drop_connection()
851         cmd = self._build_config()
852         # kill before starting
853         self.setup_helper.kill_vnf()
854
855         LOG.debug(cmd)
856         self._build_run_kwargs()
857         self.ssh_helper.run(cmd, **self.run_kwargs)
858
859     def vnf_execute(self, cmd, wait_time=2):
860         """ send cmd to vnf process """
861
862         LOG.info("%s command: %s", self.APP_NAME, cmd)
863         self.q_in.put("{}\r\n".format(cmd))
864         time.sleep(wait_time)
865         output = []
866         while self.q_out.qsize() > 0:
867             output.append(self.q_out.get())
868         return "".join(output)
869
870     def _tear_down(self):
871         pass
872
873     def terminate(self):
874         self.vnf_execute("quit")
875         self.setup_helper.kill_vnf()
876         self._tear_down()
877         self.resource_helper.stop_collect()
878         if self._vnf_process is not None:
879             # be proper and join first before we kill
880             LOG.debug("joining before terminate %s", self._vnf_process.name)
881             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
882             self._vnf_process.terminate()
883         # no terminate children here because we share processes with tg
884
885     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
886         """Method for checking the statistics
887
888         This method could be overridden in children classes.
889
890         :return: VNF statistics
891         """
892         cmd = 'p {0} stats'.format(self.APP_WORD)
893         out = self.vnf_execute(cmd)
894         return out
895
896     def collect_kpi(self):
897         # we can't get KPIs if the VNF is down
898         check_if_process_failed(self._vnf_process, 0.01)
899         stats = self.get_stats()
900         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
901         physical_node = Context.get_physical_node_from_server(
902             self.scenario_helper.nodes[self.name])
903
904         result = {"physical_node": physical_node}
905         if m:
906             result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
907             result["collect_stats"] = self.resource_helper.collect_kpi()
908         else:
909             result.update({"packets_in": 0,
910                            "packets_fwd": 0,
911                            "packets_dropped": 0})
912
913         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
914         return result
915
916     def scale(self, flavor=""):
917         """The SampleVNF base class doesn't provide the 'scale' feature"""
918         raise y_exceptions.FunctionNotImplemented(
919             function_name='scale', class_name='SampleVNFTrafficGen')
920
921
922 class SampleVNFTrafficGen(GenericTrafficGen):
923     """ Class providing file-like API for generic traffic generator """
924
925     APP_NAME = 'Sample'
926     RUN_WAIT = 1
927
928     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
929         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
930         self.bin_path = get_nsb_option('bin_path', '')
931
932         self.scenario_helper = ScenarioHelper(self.name)
933         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
934
935         if setup_env_helper_type is None:
936             setup_env_helper_type = SetupEnvHelper
937
938         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
939                                                   self.ssh_helper,
940                                                   self.scenario_helper)
941
942         if resource_helper_type is None:
943             resource_helper_type = ClientResourceHelper
944
945         self.resource_helper = resource_helper_type(self.setup_helper)
946
947         self.runs_traffic = True
948         self.traffic_finished = False
949         self._tg_process = None
950         self._traffic_process = None
951
952     def _start_server(self):
953         # we can't share ssh paramiko objects to force new connection
954         self.ssh_helper.drop_connection()
955
956     def instantiate(self, scenario_cfg, context_cfg):
957         self.scenario_helper.scenario_cfg = scenario_cfg
958         self.resource_helper.update_from_context(
959             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
960             self.scenario_helper.nodes[self.name]
961         )
962
963         self.resource_helper.context_cfg = context_cfg
964
965         self.resource_helper.setup()
966         # must generate_cfg after DPDK bind because we need port number
967         self.resource_helper.generate_cfg()
968
969         LOG.info("Starting %s server...", self.APP_NAME)
970         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
971         self._tg_process = Process(name=name, target=self._start_server)
972         self._tg_process.start()
973
974     def _check_status(self):
975         raise NotImplementedError
976
977     def _wait_for_process(self):
978         while True:
979             if not self._tg_process.is_alive():
980                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
981             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
982             time.sleep(1)
983             status = self._check_status()
984             if status == 0:
985                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
986                 return self._tg_process.exitcode
987
988     def _traffic_runner(self, traffic_profile):
989         # always drop connections first thing in new processes
990         # so we don't get paramiko errors
991         self.ssh_helper.drop_connection()
992         LOG.info("Starting %s client...", self.APP_NAME)
993         self.resource_helper.run_traffic(traffic_profile)
994
995     def run_traffic(self, traffic_profile):
996         """ Generate traffic on the wire according to the given params.
997         Method is non-blocking, returns immediately when traffic process
998         is running. Mandatory.
999
1000         :param traffic_profile:
1001         :return: True/False
1002         """
1003         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
1004                                     os.getpid())
1005         self._traffic_process = Process(name=name, target=self._traffic_runner,
1006                                         args=(traffic_profile,))
1007         self._traffic_process.start()
1008         # Wait for traffic process to start
1009         while self.resource_helper.client_started.value == 0:
1010             time.sleep(self.RUN_WAIT)
1011             # what if traffic process takes a few seconds to start?
1012             if not self._traffic_process.is_alive():
1013                 break
1014
1015         return self._traffic_process.is_alive()
1016
1017     def collect_kpi(self):
1018         # check if the tg processes have exited
1019         physical_node = Context.get_physical_node_from_server(
1020             self.scenario_helper.nodes[self.name])
1021
1022         result = {"physical_node": physical_node}
1023         for proc in (self._tg_process, self._traffic_process):
1024             check_if_process_failed(proc)
1025
1026         result["collect_stats"] = self.resource_helper.collect_kpi()
1027         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
1028         return result
1029
1030     def terminate(self):
1031         """ After this method finishes, all traffic processes should stop. Mandatory.
1032
1033         :return: True/False
1034         """
1035         self.traffic_finished = True
1036         # we must kill client before we kill the server, or the client will raise exception
1037         if self._traffic_process is not None:
1038             # be proper and try to join before terminating
1039             LOG.debug("joining before terminate %s", self._traffic_process.name)
1040             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
1041             self._traffic_process.terminate()
1042         if self._tg_process is not None:
1043             # be proper and try to join before terminating
1044             LOG.debug("joining before terminate %s", self._tg_process.name)
1045             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
1046             self._tg_process.terminate()
1047         # no terminate children here because we share processes with vnf
1048
1049     def scale(self, flavor=""):
1050         """A traffic generator VFN doesn't provide the 'scale' feature"""
1051         raise y_exceptions.FunctionNotImplemented(
1052             function_name='scale', class_name='SampleVNFTrafficGen')