yardstick/network_services/vnf_generic/vnf/sample_vnf.py
1 # Copyright (c) 2016-2019 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import decimal
17 from multiprocessing import Queue, Value, Process, JoinableQueue
18 import os
19 import posixpath
20 import re
21 import subprocess
22 import time
23
24
25 from trex_stl_lib.trex_stl_client import LoggerApi
26 from trex_stl_lib.trex_stl_client import STLClient
27 from trex_stl_lib.trex_stl_exceptions import STLError
28 from yardstick.benchmark.contexts.base import Context
29 from yardstick.common import exceptions as y_exceptions
30 from yardstick.common.process import check_if_process_failed
31 from yardstick.common import utils
32 from yardstick.common import yaml_loader
33 from yardstick.network_services import constants
34 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
39 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
42 from yardstick.benchmark.contexts.node import NodeContext
43
44 LOG = logging.getLogger(__name__)
45
46
47 class SetupEnvHelper(object):
48
49     CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
50     CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
51     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
52     PIPELINE_COMMAND = ''
53     VNF_TYPE = "SAMPLE"
54
55     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
56         super(SetupEnvHelper, self).__init__()
57         self.vnfd_helper = vnfd_helper
58         self.ssh_helper = ssh_helper
59         self.scenario_helper = scenario_helper
60         self.collectd_options = {}
61
62     def build_config(self):
63         raise NotImplementedError
64
65     def setup_vnf_environment(self):
66         pass
67
68     def kill_vnf(self):
69         pass
70
71     def tear_down(self):
72         raise NotImplementedError
73
74
75 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
76
77     APP_NAME = 'DpdkVnf'
78     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
79     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
80
81     @staticmethod
82     def _update_packet_type(ip_pipeline_cfg, traffic_options):
83         match_str = 'pkt_type = ipv4'
84         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
85         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
86         return pipeline_config_str
87
88     @classmethod
89     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
90         traffic_type = traffic_options['traffic_type']
91
92         if traffic_options['vnf_type'] != cls.APP_NAME:
93             match_str = 'traffic_type = 4'
94             replace_str = 'traffic_type = {0}'.format(traffic_type)
95
96         elif traffic_type == 4:
97             match_str = 'pkt_type = ipv4'
98             replace_str = 'pkt_type = ipv4'
99
100         else:
101             match_str = 'pkt_type = ipv4'
102             replace_str = 'pkt_type = ipv6'
103
104         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
105         return pipeline_config_str
106
107     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
108         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
109         self.all_ports = None
110         self.bound_pci = None
111         self.socket = None
112         self.used_drivers = None
113         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
114
115     def build_config(self):
116         vnf_cfg = self.scenario_helper.vnf_cfg
117         task_path = self.scenario_helper.task_path
118
119         config_file = vnf_cfg.get('file')
120         lb_count = vnf_cfg.get('lb_count', 3)
121         lb_config = vnf_cfg.get('lb_config', 'SW')
122         worker_config = vnf_cfg.get('worker_config', '1C/1T')
123         worker_threads = vnf_cfg.get('worker_threads', 3)
124
125         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
126         traffic_options = {
127             'traffic_type': traffic_type,
128             'pkt_type': 'ipv%s' % traffic_type,
129             'vnf_type': self.VNF_TYPE,
130         }
131
132         # read actions/rules from file
133         acl_options = None
134         acl_file_name = self.scenario_helper.options.get('rules')
135         if acl_file_name:
136             with utils.open_relative_file(acl_file_name, task_path) as infile:
137                 acl_options = yaml_loader.yaml_load(infile)
138
139         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
140                                                   task_path)
141         config_basename = posixpath.basename(self.CFG_CONFIG)
142         script_basename = posixpath.basename(self.CFG_SCRIPT)
143         multiport = MultiPortConfig(self.scenario_helper.topology,
144                                     config_tpl_cfg,
145                                     config_basename,
146                                     self.vnfd_helper,
147                                     self.VNF_TYPE,
148                                     lb_count,
149                                     worker_threads,
150                                     worker_config,
151                                     lb_config,
152                                     self.socket)
153
154         multiport.generate_config()
155         if config_file:
156             with utils.open_relative_file(config_file, task_path) as infile:
157                 new_config = ['[EAL]']
158                 vpci = []
159                 for port in self.vnfd_helper.port_pairs.all_ports:
160                     interface = self.vnfd_helper.find_interface(name=port)
161                     vpci.append(interface['virtual-interface']["vpci"])
162                 new_config.extend('w = {0}'.format(item) for item in vpci)
163                 new_config = '\n'.join(new_config) + '\n' + infile.read()
164         else:
165             with open(self.CFG_CONFIG) as handle:
166                 new_config = handle.read()
167             new_config = self._update_traffic_type(new_config, traffic_options)
168             new_config = self._update_packet_type(new_config, traffic_options)
169         self.ssh_helper.upload_config_file(config_basename, new_config)
170         self.ssh_helper.upload_config_file(script_basename,
171             multiport.generate_script(self.vnfd_helper,
172                                       self.get_flows_config(acl_options)))
173
174         LOG.info("Provision and start the %s", self.APP_NAME)
175         self._build_pipeline_kwargs()
176         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
177
178     def get_flows_config(self, options=None): # pylint: disable=unused-argument
179         """No actions/rules (flows) by default"""
180         return None
181
182     def _build_pipeline_kwargs(self, cfg_file=None, script=None):
183         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
184         # count the number of actual ports in the list of pairs
185         # remove duplicate ports
186         # this is really a mapping from LINK ID to DPDK PMD ID
187         # e.g. 0b0110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
188         #      0b1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
189         ports = self.vnfd_helper.port_pairs.all_ports
190         port_nums = self.vnfd_helper.port_nums(ports)
191         # create mask from all the dpdk port numbers
192         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
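        # For example, DPDK port numbers [0, 2] give 2**0 + 2**2 = 5, i.e. a
        # port mask of '0x5' (illustrative values, not taken from this file).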
193
194         vnf_cfg = self.scenario_helper.vnf_cfg
195         lb_config = vnf_cfg.get('lb_config', 'SW')
196         worker_threads = vnf_cfg.get('worker_threads', 3)
197         hwlb = ''
198         if lb_config == 'HW':
199             hwlb = ' --hwlb %s' % worker_threads
200
201         self.pipeline_kwargs = {
202             'cfg_file': cfg_file if cfg_file else self.CFG_CONFIG,
203             'script': script if script else self.CFG_SCRIPT,
204             'port_mask_hex': ports_mask_hex,
205             'tool_path': tool_path,
206             'hwlb': hwlb,
207         }
208
209     def setup_vnf_environment(self):
210         self._setup_dpdk()
211         self.kill_vnf()
212         # bind before _setup_resources so we can use dpdk_port_num
213         self._detect_and_bind_drivers()
214         resource = self._setup_resources()
215         return resource
216
217     def kill_vnf(self):
218         # pkill does not reliably match; debug by logging pgrep/ps output
219         self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
220         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
221         # killall requires an exact process name match,
222         # so try killall to terminate the VNF
223         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
224
225     def _setup_dpdk(self):
226         """Setup DPDK environment needed for VNF to run"""
227         hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
228         utils.setup_hugepages(self.ssh_helper, hugepages_gb * 1024 * 1024)
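        # hugepages_gb is converted to kB (e.g. 16 GB -> 16777216 kB), which is
        # presumably the unit expected by utils.setup_hugepages()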
229         self.dpdk_bind_helper.load_dpdk_driver()
230
231         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
232         if exit_status == 0:
233             return
234         else:
235             LOG.critical("DPDK Driver not installed")
236             return
237
238     def _setup_resources(self):
239         # Heuristic: guess the CPU socket from the PCI address (character 5,
240         # the first digit of the bus number); quad-socket systems are not handled.
241         if any(v[5] == "0" for v in self.bound_pci):
242             self.socket = 0
243         else:
244             self.socket = 1
245
246         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
247         # this won't work because we don't have DPDK port numbers yet
248         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
249         port_names = (intf["name"] for intf in ports)
250         plugins = self.collectd_options.get("plugins", {})
251         interval = self.collectd_options.get("interval")
252         # the timeout must match the VNF timeout, otherwise KPI collection stops before the VNF does
253         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
254                                plugins=plugins, interval=interval,
255                                timeout=self.scenario_helper.timeout)
256
257     def _check_interface_fields(self):
258         num_nodes = len(self.scenario_helper.nodes)
259         # OpenStack instance creation time is probably proportional to the number
260         # of instances
261         timeout = 120 * num_nodes
262         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
263                              self.ssh_helper, timeout)
264         dpdk_node.check()
265
266     def _detect_and_bind_drivers(self):
267         interfaces = self.vnfd_helper.interfaces
268
269         self._check_interface_fields()
270         # check for bound after probe
271         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
272
273         self.dpdk_bind_helper.read_status()
274         self.dpdk_bind_helper.save_used_drivers()
275
276         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
277
278         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
279         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
280             try:
281                 intf = next(v for v in interfaces
282                             if vpci == v['virtual-interface']['vpci'])
283                 # force to int
284                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
285             except:  # pylint: disable=bare-except
286                 pass
287         time.sleep(2)
288
289     def get_local_iface_name_by_vpci(self, vpci):
290         find_net_cmd = self.FIND_NET_CMD.format(vpci)
291         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
292         if exit_status == 0:
293             return stdout
294         return None
295
296     def tear_down(self):
297         self.dpdk_bind_helper.rebind_drivers()
298
299
300 class ResourceHelper(object):
301
302     COLLECT_KPI = ''
303     MAKE_INSTALL = 'cd {0} && make && sudo make install'
304     RESOURCE_WORD = 'sample'
305
306     COLLECT_MAP = {}
307
308     def __init__(self, setup_helper):
309         super(ResourceHelper, self).__init__()
310         self.resource = None
311         self.setup_helper = setup_helper
312         self.ssh_helper = setup_helper.ssh_helper
313         self._enable = True
314
315     def setup(self):
316         self.resource = self.setup_helper.setup_vnf_environment()
317
318     def generate_cfg(self):
319         pass
320
321     def update_from_context(self, context, attr_name):
322         """Disable the resource helper in case of a baremetal (Node) context.
323
324         Also update the corresponding node's collectd options in the context.
325         """
326         if isinstance(context, NodeContext):
327             self._enable = False
328             context.update_collectd_options_for_node(self.setup_helper.collectd_options,
329                                                      attr_name)
330
331     def _collect_resource_kpi(self):
332         result = {}
333         status = self.resource.check_if_system_agent_running("collectd")[0]
334         if status == 0 and self._enable:
335             result = self.resource.amqp_collect_nfvi_kpi()
336
337         result = {"core": result}
338         return result
339
340     def start_collect(self):
341         if self._enable:
342             self.resource.initiate_systemagent(self.ssh_helper.bin_path)
343             self.resource.start()
344             self.resource.amqp_process_for_nfvi_kpi()
345
346     def stop_collect(self):
347         if self.resource and self._enable:
348             self.resource.stop()
349
350     def collect_kpi(self):
351         return self._collect_resource_kpi()
352
353
354 class ClientResourceHelper(ResourceHelper):
355
356     RUN_DURATION = 60
357     QUEUE_WAIT_TIME = 5
358     SYNC_PORT = 1
359     ASYNC_PORT = 2
360
361     def __init__(self, setup_helper):
362         super(ClientResourceHelper, self).__init__(setup_helper)
363         self.vnfd_helper = setup_helper.vnfd_helper
364         self.scenario_helper = setup_helper.scenario_helper
365
366         self.client = None
367         self.client_started = Value('i', 0)
368         self.all_ports = None
369         self._queue = Queue()
370         self._result = {}
371         self._terminated = Value('i', 0)
372
373     def _build_ports(self):
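        # resolve logical port names from the topology into DPDK port numbers
        # for the uplink, downlink and combined port sets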
374         self.networks = self.vnfd_helper.port_pairs.networks
375         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
376         self.downlink_ports = \
377             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
378         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
379
380     def port_num(self, intf):
381         # by default return port num
382         return self.vnfd_helper.port_num(intf)
383
384     def get_stats(self, *args, **kwargs):
385         try:
386             return self.client.get_stats(*args, **kwargs)
387         except STLError:
388             LOG.error('TRex client not connected')
389             return {}
390
391     def _get_samples(self, ports, port_pg_id=False):
392         raise NotImplementedError()
393
394     def _run_traffic_once(self, traffic_profile):
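        # NOTE: run_traffic() below stops looping once this method returns a
        # truthy value; this base implementation returns None, so traffic keeps
        # running until terminate() is called.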
395         traffic_profile.execute_traffic(self)
396         self.client_started.value = 1
397         time.sleep(self.RUN_DURATION)
398         samples = self._get_samples(traffic_profile.ports)
399         time.sleep(self.QUEUE_WAIT_TIME)
400         self._queue.put(samples)
401
402     def run_traffic(self, traffic_profile):
403         # cancel_join_thread() has to be called in the subprocess, otherwise
404         # we can hang waiting for the queue to drain on exit
405         self._queue.cancel_join_thread()
406         # fixme: fix passing correct trex config file,
407         # instead of searching the default path
408         try:
409             self._build_ports()
410             self.client = self._connect()
411             if self.client is None:
412                 LOG.critical("Failure to Connect ... unable to continue")
413                 return
414
415             self.client.reset(ports=self.all_ports)
416             self.client.remove_all_streams(self.all_ports)  # remove all streams
417             traffic_profile.register_generator(self)
418
419             while self._terminated.value == 0:
420                 if self._run_traffic_once(traffic_profile):
421                     self._terminated.value = 1
422
423             self.client.stop(self.all_ports)
424             self.client.disconnect()
425             self._terminated.value = 0
426         except STLError:
427             if self._terminated.value:
428                 LOG.debug("traffic generator is stopped")
429                 return  # return if trex/tg server is stopped.
430             raise
431
432     def terminate(self):
433         self._terminated.value = 1  # stop client
434
435     def clear_stats(self, ports=None):
436         if ports is None:
437             ports = self.all_ports
438         self.client.clear_stats(ports=ports)
439
440     def start(self, ports=None, *args, **kwargs):
441         # pylint: disable=keyword-arg-before-vararg
442         # NOTE(ralonsoh): defining keyworded arguments before variable
443         # positional arguments is a bug. This function definition doesn't work
444         # in Python 2, although it works in Python 3. Reference:
445         # https://www.python.org/dev/peps/pep-3102/
446         if ports is None:
447             ports = self.all_ports
448         self.client.start(ports=ports, *args, **kwargs)
449
450     def collect_kpi(self):
451         if not self._queue.empty():
452             kpi = self._queue.get()
453             self._result.update(kpi)
454             LOG.debug('Got KPIs from _queue for %s %s',
455                       self.scenario_helper.name, self.RESOURCE_WORD)
456         return self._result
457
458     def _connect(self, client=None):
459         if client is None:
460             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
461                                server=self.vnfd_helper.mgmt_interface["ip"],
462                                verbose_level=LoggerApi.VERBOSE_QUIET)
463
464         # try to connect with 5s intervals
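        # (up to 6 attempts; each attempt polls is_connected() for up to 6 seconds
        # before disconnecting and retrying, and waits 5 seconds after an STLError)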
465         for idx in range(6):
466             try:
467                 client.connect()
468                 for idx2 in range(6):
469                     if client.is_connected():
470                         return client
471                     LOG.info("Waiting to confirm connection %s .. Attempt %s",
472                              idx, idx2)
473                     time.sleep(1)
474                 client.disconnect(stop_traffic=True, release_ports=True)
475             except STLError:
476                 LOG.info("Unable to connect to TRex server. Attempt %s", idx)
477                 time.sleep(5)
478
479         if client.is_connected():
480             return client
481         else:
482             LOG.critical("Connection failure. TRex username: %s server: %s",
483                          self.vnfd_helper.mgmt_interface["user"],
484                          self.vnfd_helper.mgmt_interface["ip"])
485             return None
486
487 class Rfc2544ResourceHelper(object):
488
489     DEFAULT_CORRELATED_TRAFFIC = False
490     DEFAULT_LATENCY = False
491     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
492     DEFAULT_RESOLUTION = '0.1'
493
494     def __init__(self, scenario_helper):
495         super(Rfc2544ResourceHelper, self).__init__()
496         self.scenario_helper = scenario_helper
497         self._correlated_traffic = None
498         self.iteration = Value('i', 0)
499         self._latency = None
500         self._rfc2544 = None
501         self._tolerance_low = None
502         self._tolerance_high = None
503         self._tolerance_precision = None
504         self._resolution = None
505
506     @property
507     def rfc2544(self):
508         if self._rfc2544 is None:
509             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
510         return self._rfc2544
511
512     @property
513     def tolerance_low(self):
514         if self._tolerance_low is None:
515             self.get_rfc_tolerance()
516         return self._tolerance_low
517
518     @property
519     def tolerance_high(self):
520         if self._tolerance_high is None:
521             self.get_rfc_tolerance()
522         return self._tolerance_high
523
524     @property
525     def tolerance_precision(self):
526         if self._tolerance_precision is None:
527             self.get_rfc_tolerance()
528         return self._tolerance_precision
529
530     @property
531     def correlated_traffic(self):
532         if self._correlated_traffic is None:
533             self._correlated_traffic = \
534                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
535
536         return self._correlated_traffic
537
538     @property
539     def latency(self):
540         if self._latency is None:
541             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
542         return self._latency
543
544     @property
545     def resolution(self):
546         if self._resolution is None:
547             self._resolution = float(self.get_rfc2544('resolution',
548                                                 self.DEFAULT_RESOLUTION))
549         return self._resolution
550
551     def get_rfc2544(self, name, default=None):
552         return self.rfc2544.get(name, default)
553
554     def get_rfc_tolerance(self):
555         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
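        # e.g. an allowed_drop_rate of '0.1 - 0.2' yields tolerance_low 0.1,
        # tolerance_high 0.2 and tolerance_precision 1 (decimal places of the
        # high value); the default '0.0001 - 0.0001' yields 0.0001/0.0001/4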
556         tolerance_iter = iter(sorted(
557             decimal.Decimal(t.strip()) for t in tolerance_str.split('-')))
558         tolerance_low = next(tolerance_iter)
559         tolerance_high = next(tolerance_iter, tolerance_low)
560         self._tolerance_precision = abs(tolerance_high.as_tuple().exponent)
561         self._tolerance_high = float(tolerance_high)
562         self._tolerance_low = float(tolerance_low)
563
564
565 class SampleVNFDeployHelper(object):
566
567     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
568     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
569     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
570
571     def __init__(self, vnfd_helper, ssh_helper):
572         super(SampleVNFDeployHelper, self).__init__()
573         self.ssh_helper = ssh_helper
574         self.vnfd_helper = vnfd_helper
575
576     def deploy_vnfs(self, app_name):
577         vnf_bin = self.ssh_helper.join_bin_path(app_name)
578         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
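        # a zero exit status means the binary is already deployed, so skip the build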
579         if not exit_status:
580             return
581
582         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
583         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
584         time.sleep(2)
585         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
586         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
587
588         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
589         time.sleep(2)
590         http_proxy = os.environ.get('http_proxy', '')
591         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
592         LOG.debug(cmd)
593         self.ssh_helper.execute(cmd)
594         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
595         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
596         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
597
598
599 class ScenarioHelper(object):
600
601     DEFAULT_VNF_CFG = {
602         'lb_config': 'SW',
603         'lb_count': 1,
604         'worker_config': '1C/1T',
605         'worker_threads': 1,
606     }
607
608     def __init__(self, name):
609         self.name = name
610         self.scenario_cfg = None
611
612     @property
613     def task_path(self):
614         return self.scenario_cfg['task_path']
615
616     @property
617     def nodes(self):
618         return self.scenario_cfg.get('nodes')
619
620     @property
621     def all_options(self):
622         return self.scenario_cfg.get('options', {})
623
624     @property
625     def options(self):
626         return self.all_options.get(self.name, {})
627
628     @property
629     def vnf_cfg(self):
630         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
631
632     @property
633     def topology(self):
634         return self.scenario_cfg['topology']
635
636     @property
637     def timeout(self):
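        # return the longer of the runner 'duration' and the configured VNF timeout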
638         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
639             self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
640         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
641         return test_duration if test_duration > test_timeout else test_timeout
642
643 class SampleVNF(GenericVNF):
644     """ Class providing file-like API for generic VNF implementation """
645
646     VNF_PROMPT = "pipeline>"
647     WAIT_TIME = 1
648     WAIT_TIME_FOR_SCRIPT = 10
649     APP_NAME = "SampleVNF"
650     # we run the VNF interactively, so the ssh command will timeout after this long
651
652     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
653         super(SampleVNF, self).__init__(name, vnfd)
654         self.bin_path = get_nsb_option('bin_path', '')
655
656         self.scenario_helper = ScenarioHelper(self.name)
657         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
658
659         if setup_env_helper_type is None:
660             setup_env_helper_type = SetupEnvHelper
661
662         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
663                                                   self.ssh_helper,
664                                                   self.scenario_helper)
665
666         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
667
668         if resource_helper_type is None:
669             resource_helper_type = ResourceHelper
670
671         self.resource_helper = resource_helper_type(self.setup_helper)
672
673         self.context_cfg = None
674         self.pipeline_kwargs = {}
675         self.uplink_ports = None
676         self.downlink_ports = None
677         # NOTE(esm): make QueueFileWrapper invert-able so that we
678         #            never have to manage the queues
679         self.q_in = Queue()
680         self.q_out = Queue()
681         self.queue_wrapper = None
682         self.run_kwargs = {}
683         self.used_drivers = {}
684         self.vnf_port_pairs = None
685         self._vnf_process = None
686
687     def _start_vnf(self):
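        # QueueFileWrapper presents q_in/q_out as a file-like object that is wired
        # to the interactive VNF console as stdin/stdout (see _build_run_kwargs)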
688         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
689         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
690         self._vnf_process = Process(name=name, target=self._run)
691         self._vnf_process.start()
692
693     def _vnf_up_post(self):
694         pass
695
696     def instantiate(self, scenario_cfg, context_cfg):
697         self._update_collectd_options(scenario_cfg, context_cfg)
698         self.scenario_helper.scenario_cfg = scenario_cfg
699         self.context_cfg = context_cfg
700         self.resource_helper.update_from_context(
701             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
702             self.scenario_helper.nodes[self.name]
703         )
704
705         # vnf deploy is unsupported, use ansible playbooks
706         if self.scenario_helper.options.get("vnf_deploy", False):
707             self.deploy_helper.deploy_vnfs(self.APP_NAME)
708         self.resource_helper.setup()
709         self._start_vnf()
710
711     def _update_collectd_options(self, scenario_cfg, context_cfg):
712         """Update collectd configuration options
713
714         This function retrieves all collectd options contained in the test case
715         definition and builds a single dictionary combining them. The following fragment
716         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
717         ---
718         schema: yardstick:task:0.1
719         scenarios:
720         - type: NSPerf
721           nodes:
722             tg__0: trafficgen_0.yardstick
723             vnf__0: vnf_0.yardstick
724           options:
725             collectd:
726               <options>  # COLLECTD priority 3
727             vnf__0:
728               collectd:
729                 plugins:
730                     load
731                 <options> # COLLECTD priority 2
732         context:
733           type: Node
734           name: yardstick
735           nfvi_type: baremetal
736           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
737         """
738         scenario_options = scenario_cfg.get('options', {})
739         generic_options = scenario_options.get('collectd', {})
740         scenario_node_options = scenario_options.get(self.name, {})\
741             .get('collectd', {})
742         context_node_options = context_cfg.get('nodes', {})\
743             .get(self.name, {}).get('collectd', {})
744
745         options = generic_options
746         self._update_options(options, scenario_node_options)
747         self._update_options(options, context_node_options)
748
749         self.setup_helper.collectd_options = options
750
751     def _update_options(self, options, additional_options):
752         """Update collectd options and plugins dictionary"""
753         for k, v in additional_options.items():
754             if isinstance(v, dict) and k in options:
755                 options[k].update(v)
756             else:
757                 options[k] = v
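        # Example (shallow, one-level merge): merging {'plugins': {'dpdkstat': {}}}
        # into {'interval': 25, 'plugins': {'ovs_stats': {}}} gives
        # {'interval': 25, 'plugins': {'ovs_stats': {}, 'dpdkstat': {}}}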
758
759     def wait_for_instantiate(self):
760         buf = []
761         time.sleep(self.WAIT_TIME)  # Give some time for config to load
762         while True:
763             if not self._vnf_process.is_alive():
764                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
765
766             # NOTE(esm): move to QueueFileWrapper
767             while self.q_out.qsize() > 0:
768                 buf.append(self.q_out.get())
769                 message = ''.join(buf)
770                 if self.VNF_PROMPT in message:
771                     LOG.info("%s VNF is up and running.", self.APP_NAME)
772                     self._vnf_up_post()
773                     self.queue_wrapper.clear()
774                     return self._vnf_process.exitcode
775
776                 if "PANIC" in message:
777                     raise RuntimeError("Error starting %s VNF." %
778                                        self.APP_NAME)
779
780             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
781             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
782             # Send ENTER to display a new prompt in case the prompt text was corrupted
783             # by other VNF output
784             self.q_in.put('\r\n')
785
786     def wait_for_initialize(self):
787         buf = []
788         vnf_prompt_found = False
789         prompt_command = '\r\n'
790         script_name = 'non_existent_script_name'
791         done_string = 'Cannot open file "{}"'.format(script_name)
792         time.sleep(self.WAIT_TIME)  # Give some time for config to load
793         while True:
794             if not self._vnf_process.is_alive():
795                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
796             while self.q_out.qsize() > 0:
797                 buf.append(self.q_out.get())
798                 message = ''.join(buf)
799
800                 if self.VNF_PROMPT in message and not vnf_prompt_found:
801                     # Getting the VNF prompt does not mean the VNF is up and
802                     # running/initialized completely. But we can run an
803                     # additional (any) VNF command and wait for it to complete,
804                     # as it will only finish at the end of the VNF
805                     # initialization. So this approach can be used to
806                     # identify that the VNF is completely initialized.
807                     LOG.info("Got %s VNF prompt.", self.APP_NAME)
808                     prompt_command = "run {}\r\n".format(script_name)
809                     self.q_in.put(prompt_command)
810                     # Trim the buffer since we are no longer interested in
811                     # finding the VNF prompt
812                     prompt_pos = message.find(self.VNF_PROMPT)
813                     buf = [message[prompt_pos + len(self.VNF_PROMPT):]]
814                     vnf_prompt_found = True
815                     continue
816
817                 if done_string in message:
818                     LOG.info("%s VNF is up and running.", self.APP_NAME)
819                     self._vnf_up_post()
820                     self.queue_wrapper.clear()
821                     return self._vnf_process.exitcode
822
823                 if "PANIC" in message:
824                     raise RuntimeError("Error starting %s VNF." %
825                                        self.APP_NAME)
826
827             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
828             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
829             # Send command again to display the expected prompt in case the
830             # expected text was corrupted by other VNF output
831             self.q_in.put(prompt_command)
832
833     def start_collect(self):
834         self.resource_helper.start_collect()
835
836     def stop_collect(self):
837         self.resource_helper.stop_collect()
838
839     def _build_run_kwargs(self):
840         self.run_kwargs = {
841             'stdin': self.queue_wrapper,
842             'stdout': self.queue_wrapper,
843             'keep_stdin_open': True,
844             'pty': True,
845             'timeout': self.scenario_helper.timeout,
846         }
847
848     def _build_config(self):
849         return self.setup_helper.build_config()
850
851     def _run(self):
852         # paramiko SSH objects cannot be shared across processes, so force a new connection
853         self.ssh_helper.drop_connection()
854         cmd = self._build_config()
855         # kill before starting
856         self.setup_helper.kill_vnf()
857
858         LOG.debug(cmd)
859         self._build_run_kwargs()
860         self.ssh_helper.run(cmd, **self.run_kwargs)
861
862     def vnf_execute(self, cmd, wait_time=2):
863         """ send cmd to vnf process """
864
865         LOG.info("%s command: %s", self.APP_NAME, cmd)
866         self.q_in.put("{}\r\n".format(cmd))
867         time.sleep(wait_time)
868         output = []
869         while self.q_out.qsize() > 0:
870             output.append(self.q_out.get())
871         return "".join(output)
872
873     def _tear_down(self):
874         pass
875
876     def terminate(self):
877         self.vnf_execute("quit")
878         self.setup_helper.kill_vnf()
879         self._tear_down()
880         self.resource_helper.stop_collect()
881         if self._vnf_process is not None:
882             # be proper and join first before we kill
883             LOG.debug("joining before terminate %s", self._vnf_process.name)
884             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
885             self._vnf_process.terminate()
886         # do not terminate child processes here because we share processes with the tg
887
888     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
889         """Method for checking the statistics
890
891         This method could be overridden in children classes.
892
893         :return: VNF statistics
894         """
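        # APP_WORD (like COLLECT_KPI and COLLECT_MAP used in collect_kpi below) is
        # expected to be defined by the concrete SampleVNF subclass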
895         cmd = 'p {0} stats'.format(self.APP_WORD)
896         out = self.vnf_execute(cmd)
897         return out
898
899     def collect_kpi(self):
900         # we can't get KPIs if the VNF is down
901         check_if_process_failed(self._vnf_process, 0.01)
902         stats = self.get_stats()
903         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
904         physical_node = Context.get_physical_node_from_server(
905             self.scenario_helper.nodes[self.name])
906
907         result = {"physical_node": physical_node}
908         if m:
909             result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
910             result["collect_stats"] = self.resource_helper.collect_kpi()
911         else:
912             result.update({"packets_in": 0,
913                            "packets_fwd": 0,
914                            "packets_dropped": 0})
915
916         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
917         return result
918
919     def scale(self, flavor=""):
920         """The SampleVNF base class doesn't provide the 'scale' feature"""
921         raise y_exceptions.FunctionNotImplemented(
922             function_name='scale', class_name='SampleVNF')
923
924
925 class SampleVNFTrafficGen(GenericTrafficGen):
926     """ Class providing file-like API for generic traffic generator """
927
928     APP_NAME = 'Sample'
929     RUN_WAIT = 1
930
931     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
932         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
933         self.bin_path = get_nsb_option('bin_path', '')
934
935         self.scenario_helper = ScenarioHelper(self.name)
936         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
937
938         if setup_env_helper_type is None:
939             setup_env_helper_type = SetupEnvHelper
940
941         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
942                                                   self.ssh_helper,
943                                                   self.scenario_helper)
944
945         if resource_helper_type is None:
946             resource_helper_type = ClientResourceHelper
947
948         self.resource_helper = resource_helper_type(self.setup_helper)
949
950         self.runs_traffic = True
951         self.traffic_finished = False
952         self._tg_process = None
953         self._traffic_process = None
954         self._tasks_queue = JoinableQueue()
955         self._result_queue = Queue()
956
957     def _test_runner(self, traffic_profile, tasks, results):
958         self.resource_helper.run_test(traffic_profile, tasks, results)
959
960     def _init_traffic_process(self, traffic_profile):
961         name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
962                                     traffic_profile.__class__.__name__,
963                                     os.getpid())
964         self._traffic_process = Process(name=name, target=self._test_runner,
965                                         args=(
966                                         traffic_profile, self._tasks_queue,
967                                         self._result_queue))
968
969         self._traffic_process.start()
970         while self.resource_helper.client_started.value == 0:
971             time.sleep(1)
972             if not self._traffic_process.is_alive():
973                 break
974
975     def run_traffic_once(self, traffic_profile):
976         if self.resource_helper.client_started.value == 0:
977             self._init_traffic_process(traffic_profile)
978
979         # continue test - run next iteration
980         LOG.info("Run next iteration ...")
981         self._tasks_queue.put('RUN_TRAFFIC')
982
983     def wait_on_traffic(self):
984         self._tasks_queue.join()
985         result = self._result_queue.get()
986         return result
987
988     def _start_server(self):
989         # paramiko SSH objects cannot be shared across processes, so force a new connection
990         self.ssh_helper.drop_connection()
991
992     def instantiate(self, scenario_cfg, context_cfg):
993         self.scenario_helper.scenario_cfg = scenario_cfg
994         self.resource_helper.update_from_context(
995             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
996             self.scenario_helper.nodes[self.name]
997         )
998
999         self.resource_helper.context_cfg = context_cfg
1000
1001         self.resource_helper.setup()
1002         # generate_cfg() must run after the DPDK bind because it needs the DPDK port numbers
1003         self.resource_helper.generate_cfg()
1004
1005         LOG.info("Starting %s server...", self.APP_NAME)
1006         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
1007         self._tg_process = Process(name=name, target=self._start_server)
1008         self._tg_process.start()
1009
1010     def _check_status(self):
1011         raise NotImplementedError
1012
1013     def _wait_for_process(self):
1014         while True:
1015             if not self._tg_process.is_alive():
1016                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
1017             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
1018             time.sleep(1)
1019             status = self._check_status()
1020             if status == 0:
1021                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
1022                 return self._tg_process.exitcode
1023
1024     def _traffic_runner(self, traffic_profile):
1025         # always drop connections first thing in new processes
1026         # so we don't get paramiko errors
1027         self.ssh_helper.drop_connection()
1028         LOG.info("Starting %s client...", self.APP_NAME)
1029         self.resource_helper.run_traffic(traffic_profile)
1030
1031     def run_traffic(self, traffic_profile):
1032         """ Generate traffic on the wire according to the given params.
1033         Method is non-blocking, returns immediately when traffic process
1034         is running. Mandatory.
1035
1036         :param traffic_profile:
1037         :return: True/False
1038         """
1039         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
1040                                     os.getpid())
1041         self._traffic_process = Process(name=name, target=self._traffic_runner,
1042                                         args=(traffic_profile,))
1043         self._traffic_process.start()
1044         # Wait for traffic process to start
1045         while self.resource_helper.client_started.value == 0:
1046             time.sleep(self.RUN_WAIT)
1047             # what if traffic process takes a few seconds to start?
1048             if not self._traffic_process.is_alive():
1049                 break
1050
1051         return self._traffic_process.is_alive()
1052
1053     def collect_kpi(self):
1054         # check if the tg processes have exited
1055         physical_node = Context.get_physical_node_from_server(
1056             self.scenario_helper.nodes[self.name])
1057
1058         result = {"physical_node": physical_node}
1059         for proc in (self._tg_process, self._traffic_process):
1060             check_if_process_failed(proc)
1061
1062         result["collect_stats"] = self.resource_helper.collect_kpi()
1063         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
1064         return result
1065
1066     def terminate(self):
1067         """ After this method finishes, all traffic processes should stop. Mandatory.
1068
1069         :return: True/False
1070         """
1071         self.traffic_finished = True
1072         # we must kill the client before we kill the server, or the client will raise an exception
1073         if self._traffic_process is not None:
1074             # be proper and try to join before terminating
1075             LOG.debug("joining before terminate %s", self._traffic_process.name)
1076             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
1077             self._traffic_process.terminate()
1078         if self._tg_process is not None:
1079             # be proper and try to join before terminating
1080             LOG.debug("joining before terminate %s", self._tg_process.name)
1081             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
1082             self._tg_process.terminate()
1083         # do not terminate child processes here because we share processes with the vnf
1084
1085     def scale(self, flavor=""):
1086         """A traffic generator VNF doesn't provide the 'scale' feature"""
1087         raise y_exceptions.FunctionNotImplemented(
1088             function_name='scale', class_name='SampleVNFTrafficGen')