yardstick/network_services/vnf_generic/vnf/sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import decimal
17 from multiprocessing import Queue, Value, Process
18 import os
19 import posixpath
20 import re
21 import uuid
22 import subprocess
23 import time
24
25 import six
26
27 from trex_stl_lib.trex_stl_client import LoggerApi
28 from trex_stl_lib.trex_stl_client import STLClient
29 from trex_stl_lib.trex_stl_exceptions import STLError
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.common import exceptions as y_exceptions
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.common import utils
34 from yardstick.common import yaml_loader
35 from yardstick.network_services import constants
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
37 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
38 from yardstick.network_services.nfvi.resource import ResourceProfile
39 from yardstick.network_services.utils import get_nsb_option
40 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
41 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
42 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
43 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
44 from yardstick.benchmark.contexts.node import NodeContext
45
46 LOG = logging.getLogger(__name__)
47
48
49 class SetupEnvHelper(object):
50
51     CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
52     CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
53     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
54     PIPELINE_COMMAND = ''
55     VNF_TYPE = "SAMPLE"
56
57     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
58         super(SetupEnvHelper, self).__init__()
59         self.vnfd_helper = vnfd_helper
60         self.ssh_helper = ssh_helper
61         self.scenario_helper = scenario_helper
62         self.collectd_options = {}
63
64     def build_config(self):
65         raise NotImplementedError
66
67     def setup_vnf_environment(self):
68         pass
69
70     def kill_vnf(self):
71         pass
72
73     def tear_down(self):
74         raise NotImplementedError
75
76
77 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
78
79     APP_NAME = 'DpdkVnf'
80     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
81     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
82
83     @staticmethod
84     def _update_packet_type(ip_pipeline_cfg, traffic_options):
85         match_str = 'pkt_type = ipv4'
86         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
87         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
88         return pipeline_config_str
89
90     @classmethod
91     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
92         traffic_type = traffic_options['traffic_type']
93
94         if traffic_options['vnf_type'] != cls.APP_NAME:
95             match_str = 'traffic_type = 4'
96             replace_str = 'traffic_type = {0}'.format(traffic_type)
97
98         elif traffic_type == 4:
99             match_str = 'pkt_type = ipv4'
100             replace_str = 'pkt_type = ipv4'
101
102         else:
103             match_str = 'pkt_type = ipv4'
104             replace_str = 'pkt_type = ipv6'
105
106         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
107         return pipeline_config_str
108
109     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
110         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
111         self.all_ports = None
112         self.bound_pci = None
113         self.socket = None
114         self.used_drivers = None
115         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
116
117     def _setup_hugepages(self):
118         meminfo = utils.read_meminfo(self.ssh_helper)
119         hp_size_kb = int(meminfo['Hugepagesize'])
120         hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
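        # convert the requested amount (GiB) into a page count: GiB -> kB, divided by the hugepage size reported in /proc/meminfo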
121         nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
122         self.ssh_helper.execute('echo %s | sudo tee %s' %
123                                 (nr_hugepages, self.NR_HUGEPAGES_PATH))
124         hp = six.BytesIO()
125         self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
126         nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
127         LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
128                  hp_size_kb, nr_hugepages, nr_hugepages_set)
129
130     def build_config(self):
131         vnf_cfg = self.scenario_helper.vnf_cfg
132         task_path = self.scenario_helper.task_path
133
134         config_file = vnf_cfg.get('file')
135         lb_count = vnf_cfg.get('lb_count', 3)
136         lb_config = vnf_cfg.get('lb_config', 'SW')
137         worker_config = vnf_cfg.get('worker_config', '1C/1T')
138         worker_threads = vnf_cfg.get('worker_threads', 3)
139
140         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
141         traffic_options = {
142             'traffic_type': traffic_type,
143             'pkt_type': 'ipv%s' % traffic_type,
144             'vnf_type': self.VNF_TYPE,
145         }
146
147         # read actions/rules from file
148         acl_options = None
149         acl_file_name = self.scenario_helper.options.get('rules')
150         if acl_file_name:
151             with utils.open_relative_file(acl_file_name, task_path) as infile:
152                 acl_options = yaml_loader.yaml_load(infile)
153
154         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
155                                                   task_path)
156         config_basename = posixpath.basename(self.CFG_CONFIG)
157         script_basename = posixpath.basename(self.CFG_SCRIPT)
158         multiport = MultiPortConfig(self.scenario_helper.topology,
159                                     config_tpl_cfg,
160                                     config_basename,
161                                     self.vnfd_helper,
162                                     self.VNF_TYPE,
163                                     lb_count,
164                                     worker_threads,
165                                     worker_config,
166                                     lb_config,
167                                     self.socket)
168
169         multiport.generate_config()
170         if config_file:
171             with utils.open_relative_file(config_file, task_path) as infile:
172                 new_config = ['[EAL]']
173                 vpci = []
174                 for port in self.vnfd_helper.port_pairs.all_ports:
175                     interface = self.vnfd_helper.find_interface(name=port)
176                     vpci.append(interface['virtual-interface']["vpci"])
177                 new_config.extend('w = {0}'.format(item) for item in vpci)
178                 new_config = '\n'.join(new_config) + '\n' + infile.read()
179         else:
180             with open(self.CFG_CONFIG) as handle:
181                 new_config = handle.read()
182             new_config = self._update_traffic_type(new_config, traffic_options)
183             new_config = self._update_packet_type(new_config, traffic_options)
184         self.ssh_helper.upload_config_file(config_basename, new_config)
185         self.ssh_helper.upload_config_file(script_basename,
186             multiport.generate_script(self.vnfd_helper,
187                                       self.get_flows_config(acl_options)))
188
189         LOG.info("Provision and start the %s", self.APP_NAME)
190         self._build_pipeline_kwargs()
191         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
192
193     def get_flows_config(self, options=None): # pylint: disable=unused-argument
194         """No actions/rules (flows) by default"""
195         return None
196
197     def _build_pipeline_kwargs(self):
198         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
199         # count the number of actual ports in the list of pairs
200         # remove duplicate ports
201         # this is really a mapping from LINK ID to DPDK PMD ID
202         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
203         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
204         ports = self.vnfd_helper.port_pairs.all_ports
205         port_nums = self.vnfd_helper.port_nums(ports)
206         # create mask from all the dpdk port numbers
207         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
208
209         vnf_cfg = self.scenario_helper.vnf_cfg
210         lb_config = vnf_cfg.get('lb_config', 'SW')
211         worker_threads = vnf_cfg.get('worker_threads', 3)
212         hwlb = ''
213         if lb_config == 'HW':
214             hwlb = ' --hwlb %s' % worker_threads
215
216         self.pipeline_kwargs = {
217             'cfg_file': self.CFG_CONFIG,
218             'script': self.CFG_SCRIPT,
219             'port_mask_hex': ports_mask_hex,
220             'tool_path': tool_path,
221             'hwlb': hwlb,
222         }
223
224     def setup_vnf_environment(self):
225         self._setup_dpdk()
226         self.kill_vnf()
227         # bind before _setup_resources so we can use dpdk_port_num
228         self._detect_and_bind_drivers()
229         resource = self._setup_resources()
230         return resource
231
232     def kill_vnf(self):
233         # pkill is not matching, debug with pgrep
234         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
235         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
236         # have to use exact match
237         # try using killall to match
238         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
239
240     def _setup_dpdk(self):
241         """Setup DPDK environment needed for VNF to run"""
242         self._setup_hugepages()
243         self.dpdk_bind_helper.load_dpdk_driver()
244
245         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
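        # NOTE: a non-zero exit status (DPDK driver not loaded) is not handled here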
246         if exit_status == 0:
247             return
248
249     def _setup_resources(self):
250         # what is this magic?  how do we know which socket is for which port?
251         # what about quad-socket?
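        # heuristic: character 5 of the PCI address (first digit of the bus number) decides between socket 0 and socket 1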
252         if any(v[5] == "0" for v in self.bound_pci):
253             self.socket = 0
254         else:
255             self.socket = 1
256
257         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
258         # this won't work because we don't have DPDK port numbers yet
259         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
260         port_names = (intf["name"] for intf in ports)
261         plugins = self.collectd_options.get("plugins", {})
262         interval = self.collectd_options.get("interval")
263         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
264         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
265                                plugins=plugins, interval=interval,
266                                timeout=self.scenario_helper.timeout)
267
268     def _check_interface_fields(self):
269         num_nodes = len(self.scenario_helper.nodes)
270         # OpenStack instance creation time is probably proportional to the number
271         # of instances
272         timeout = 120 * num_nodes
273         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
274                              self.ssh_helper, timeout)
275         dpdk_node.check()
276
277     def _detect_and_bind_drivers(self):
278         interfaces = self.vnfd_helper.interfaces
279
280         self._check_interface_fields()
281         # check for bound after probe
282         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
283
284         self.dpdk_bind_helper.read_status()
285         self.dpdk_bind_helper.save_used_drivers()
286
287         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
288
289         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
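        # assumes DPDK assigns port numbers in ascending PCI address order, so the index in the sorted list is used as dpdk_port_num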
290         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
291             try:
292                 intf = next(v for v in interfaces
293                             if vpci == v['virtual-interface']['vpci'])
294                 # force to int
295                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
296             except:  # pylint: disable=bare-except
297                 pass
298         time.sleep(2)
299
300     def get_local_iface_name_by_vpci(self, vpci):
301         find_net_cmd = self.FIND_NET_CMD.format(vpci)
302         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
303         if exit_status == 0:
304             return stdout
305         return None
306
307     def tear_down(self):
308         self.dpdk_bind_helper.rebind_drivers()
309
310
311 class ResourceHelper(object):
312
313     COLLECT_KPI = ''
314     MAKE_INSTALL = 'cd {0} && make && sudo make install'
315     RESOURCE_WORD = 'sample'
316
317     COLLECT_MAP = {}
318
319     def __init__(self, setup_helper):
320         super(ResourceHelper, self).__init__()
321         self.resource = None
322         self.setup_helper = setup_helper
323         self.ssh_helper = setup_helper.ssh_helper
324         self._enable = True
325
326     def setup(self):
327         self.resource = self.setup_helper.setup_vnf_environment()
328
329     def generate_cfg(self):
330         pass
331
332     def update_from_context(self, context, attr_name):
333         """Disable the resource helper for baremetal (Node) contexts.
334
335         Also update the appropriate node collectd options in the context.
336         """
337         if isinstance(context, NodeContext):
338             self._enable = False
339             context.update_collectd_options_for_node(self.setup_helper.collectd_options,
340                                                      attr_name)
341
342     def _collect_resource_kpi(self):
343         result = {}
344         status = self.resource.check_if_system_agent_running("collectd")[0]
345         if status == 0 and self._enable:
346             result = self.resource.amqp_collect_nfvi_kpi()
347
348         result = {"core": result}
349         return result
350
351     def start_collect(self):
352         if self._enable:
353             self.resource.initiate_systemagent(self.ssh_helper.bin_path)
354             self.resource.start()
355             self.resource.amqp_process_for_nfvi_kpi()
356
357     def stop_collect(self):
358         if self.resource and self._enable:
359             self.resource.stop()
360
361     def collect_kpi(self):
362         return self._collect_resource_kpi()
363
364
365 class ClientResourceHelper(ResourceHelper):
366
367     RUN_DURATION = 60
368     QUEUE_WAIT_TIME = 5
369     SYNC_PORT = 1
370     ASYNC_PORT = 2
371
372     def __init__(self, setup_helper):
373         super(ClientResourceHelper, self).__init__(setup_helper)
374         self.vnfd_helper = setup_helper.vnfd_helper
375         self.scenario_helper = setup_helper.scenario_helper
376
377         self.client = None
378         self.client_started = Value('i', 0)
379         self.all_ports = None
380         self._queue = Queue()
381         self._result = {}
382         self._terminated = Value('i', 0)
383
384     def _build_ports(self):
385         self.networks = self.vnfd_helper.port_pairs.networks
386         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
387         self.downlink_ports = \
388             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
389         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
390
391     def port_num(self, intf):
392         # by default return port num
393         return self.vnfd_helper.port_num(intf)
394
395     def get_stats(self, *args, **kwargs):
396         try:
397             return self.client.get_stats(*args, **kwargs)
398         except STLError:
399             LOG.error('TRex client not connected')
400             return {}
401
402     def _get_samples(self, ports, port_pg_id=False):
403         raise NotImplementedError()
404
405     def _run_traffic_once(self, traffic_profile):
406         traffic_profile.execute_traffic(self)
407         self.client_started.value = 1
408         time.sleep(self.RUN_DURATION)
409         samples = self._get_samples(traffic_profile.ports)
410         time.sleep(self.QUEUE_WAIT_TIME)
411         self._queue.put(samples)
412
413     def run_traffic(self, traffic_profile, mq_producer):
414         # if we don't do this we can hang waiting for the queue to drain
415         # have to do this in the subprocess
416         self._queue.cancel_join_thread()
417         # fixme: fix passing correct trex config file,
418         # instead of searching the default path
419         mq_producer.tg_method_started()
420         try:
421             self._build_ports()
422             self.client = self._connect()
423             self.client.reset(ports=self.all_ports)
424             self.client.remove_all_streams(self.all_ports)  # remove all streams
425             traffic_profile.register_generator(self)
426
427             iteration_index = 0
428             while self._terminated.value == 0:
429                 iteration_index += 1
430                 if self._run_traffic_once(traffic_profile):
431                     self._terminated.value = 1
432                 mq_producer.tg_method_iteration(iteration_index)
433
434             self.client.stop(self.all_ports)
435             self.client.disconnect()
436             self._terminated.value = 0
437         except STLError:
438             if self._terminated.value:
439                 LOG.debug("traffic generator is stopped")
440                 return  # return if trex/tg server is stopped.
441             raise
442
443         mq_producer.tg_method_finished()
444
445     def terminate(self):
446         self._terminated.value = 1  # stop client
447
448     def clear_stats(self, ports=None):
449         if ports is None:
450             ports = self.all_ports
451         self.client.clear_stats(ports=ports)
452
453     def start(self, ports=None, *args, **kwargs):
454         # pylint: disable=keyword-arg-before-vararg
455         # NOTE(ralonsoh): defining keyworded arguments before variable
456         # positional arguments is a bug. This function definition doesn't work
457         # in Python 2, although it works in Python 3. Reference:
458         # https://www.python.org/dev/peps/pep-3102/
459         if ports is None:
460             ports = self.all_ports
461         self.client.start(ports=ports, *args, **kwargs)
462
463     def collect_kpi(self):
464         if not self._queue.empty():
465             kpi = self._queue.get()
466             self._result.update(kpi)
467             LOG.debug('Got KPIs from _queue for %s %s',
468                       self.scenario_helper.name, self.RESOURCE_WORD)
469         return self._result
470
471     def _connect(self, client=None):
472         if client is None:
473             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
474                                server=self.vnfd_helper.mgmt_interface["ip"],
475                                verbose_level=LoggerApi.VERBOSE_QUIET)
476
477         # try to connect with 5s intervals, 30s max
478         for idx in range(6):
479             try:
480                 client.connect()
481                 break
482             except STLError:
483                 LOG.info("Unable to connect to TRex server, attempt %s", idx)
484                 time.sleep(5)
485         return client
486
487
488 class Rfc2544ResourceHelper(object):
489
490     DEFAULT_CORRELATED_TRAFFIC = False
491     DEFAULT_LATENCY = False
492     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
493
494     def __init__(self, scenario_helper):
495         super(Rfc2544ResourceHelper, self).__init__()
496         self.scenario_helper = scenario_helper
497         self._correlated_traffic = None
498         self.iteration = Value('i', 0)
499         self._latency = None
500         self._rfc2544 = None
501         self._tolerance_low = None
502         self._tolerance_high = None
503         self._tolerance_precision = None
504
505     @property
506     def rfc2544(self):
507         if self._rfc2544 is None:
508             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
509         return self._rfc2544
510
511     @property
512     def tolerance_low(self):
513         if self._tolerance_low is None:
514             self.get_rfc_tolerance()
515         return self._tolerance_low
516
517     @property
518     def tolerance_high(self):
519         if self._tolerance_high is None:
520             self.get_rfc_tolerance()
521         return self._tolerance_high
522
523     @property
524     def tolerance_precision(self):
525         if self._tolerance_precision is None:
526             self.get_rfc_tolerance()
527         return self._tolerance_precision
528
529     @property
530     def correlated_traffic(self):
531         if self._correlated_traffic is None:
532             self._correlated_traffic = \
533                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
534
535         return self._correlated_traffic
536
537     @property
538     def latency(self):
539         if self._latency is None:
540             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
541         return self._latency
542
543     def get_rfc2544(self, name, default=None):
544         return self.rfc2544.get(name, default)
545
546     def get_rfc_tolerance(self):
547         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
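        # 'allowed_drop_rate' is a range string such as '0.0001 - 0.0001'; keep the low/high bounds and the decimal precision of the upper bound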
548         tolerance_iter = iter(sorted(
549             decimal.Decimal(t.strip()) for t in tolerance_str.split('-')))
550         tolerance_low = next(tolerance_iter)
551         tolerance_high = next(tolerance_iter, tolerance_low)
552         self._tolerance_precision = abs(tolerance_high.as_tuple().exponent)
553         self._tolerance_high = float(tolerance_high)
554         self._tolerance_low = float(tolerance_low)
555
556
557 class SampleVNFDeployHelper(object):
558
559     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
560     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
561     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
562
563     def __init__(self, vnfd_helper, ssh_helper):
564         super(SampleVNFDeployHelper, self).__init__()
565         self.ssh_helper = ssh_helper
566         self.vnfd_helper = vnfd_helper
567
568     def deploy_vnfs(self, app_name):
569         vnf_bin = self.ssh_helper.join_bin_path(app_name)
570         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
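        # a zero exit status from 'which' means the binary is already deployed, so skip the clone/build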
571         if not exit_status:
572             return
573
574         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
575         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
576         time.sleep(2)
577         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
578         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
579
580         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
581         time.sleep(2)
582         http_proxy = os.environ.get('http_proxy', '')
583         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
584         LOG.debug(cmd)
585         self.ssh_helper.execute(cmd)
586         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
587         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
588         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
589
590
591 class ScenarioHelper(object):
592
593     DEFAULT_VNF_CFG = {
594         'lb_config': 'SW',
595         'lb_count': 1,
596         'worker_config': '1C/1T',
597         'worker_threads': 1,
598     }
599
600     def __init__(self, name):
601         self.name = name
602         self.scenario_cfg = None
603
604     @property
605     def task_path(self):
606         return self.scenario_cfg['task_path']
607
608     @property
609     def nodes(self):
610         return self.scenario_cfg.get('nodes')
611
612     @property
613     def all_options(self):
614         return self.scenario_cfg.get('options', {})
615
616     @property
617     def options(self):
618         return self.all_options.get(self.name, {})
619
620     @property
621     def vnf_cfg(self):
622         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
623
624     @property
625     def topology(self):
626         return self.scenario_cfg['topology']
627
628     @property
629     def timeout(self):
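        # return the larger of the runner 'duration' and the scenario 'timeout' option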
630         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
631             self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
632         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
633         return test_duration if test_duration > test_timeout else test_timeout
634
635
636 class SampleVNF(GenericVNF):
637     """ Class providing file-like API for generic VNF implementation """
638
639     VNF_PROMPT = "pipeline>"
640     WAIT_TIME = 1
641     WAIT_TIME_FOR_SCRIPT = 10
642     APP_NAME = "SampleVNF"
643     # we run the VNF interactively, so the ssh command will timeout after this long
644
645     def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
646                  resource_helper_type=None):
647         super(SampleVNF, self).__init__(name, vnfd, task_id)
648         self.bin_path = get_nsb_option('bin_path', '')
649
650         self.scenario_helper = ScenarioHelper(self.name)
651         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
652
653         if setup_env_helper_type is None:
654             setup_env_helper_type = SetupEnvHelper
655
656         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
657                                                   self.ssh_helper,
658                                                   self.scenario_helper)
659
660         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
661
662         if resource_helper_type is None:
663             resource_helper_type = ResourceHelper
664
665         self.resource_helper = resource_helper_type(self.setup_helper)
666
667         self.context_cfg = None
668         self.pipeline_kwargs = {}
669         self.uplink_ports = None
670         self.downlink_ports = None
671         # NOTE(esm): make QueueFileWrapper invert-able so that we
672         #            never have to manage the queues
673         self.q_in = Queue()
674         self.q_out = Queue()
675         self.queue_wrapper = None
676         self.run_kwargs = {}
677         self.used_drivers = {}
678         self.vnf_port_pairs = None
679         self._vnf_process = None
680
681     def _start_vnf(self):
682         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
683         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
684         self._vnf_process = Process(name=name, target=self._run)
685         self._vnf_process.start()
686
687     def _vnf_up_post(self):
688         pass
689
690     def instantiate(self, scenario_cfg, context_cfg):
691         self._update_collectd_options(scenario_cfg, context_cfg)
692         self.scenario_helper.scenario_cfg = scenario_cfg
693         self.context_cfg = context_cfg
694         self.resource_helper.update_from_context(
695             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
696             self.scenario_helper.nodes[self.name]
697         )
698
699         # vnf deploy is unsupported, use ansible playbooks
700         if self.scenario_helper.options.get("vnf_deploy", False):
701             self.deploy_helper.deploy_vnfs(self.APP_NAME)
702         self.resource_helper.setup()
703         self._start_vnf()
704
705     def _update_collectd_options(self, scenario_cfg, context_cfg):
706         """Update collectd configuration options.
707
708         This function retrieves all collectd options contained in the test case
709         definition and builds a single dictionary combining them. The following fragment
710         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
711         ---
712         schema: yardstick:task:0.1
713         scenarios:
714         - type: NSPerf
715           nodes:
716             tg__0: trafficgen_1.yardstick
717             vnf__0: vnf.yardstick
718           options:
719             collectd:
720               <options>  # COLLECTD priority 3
721             vnf__0:
722               collectd:
723                 plugins:
724                     load
725                 <options> # COLLECTD priority 2
726         context:
727           type: Node
728           name: yardstick
729           nfvi_type: baremetal
730           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
731         """
732         scenario_options = scenario_cfg.get('options', {})
733         generic_options = scenario_options.get('collectd', {})
734         scenario_node_options = scenario_options.get(self.name, {})\
735             .get('collectd', {})
736         context_node_options = context_cfg.get('nodes', {})\
737             .get(self.name, {}).get('collectd', {})
738
739         options = generic_options
740         self._update_options(options, scenario_node_options)
741         self._update_options(options, context_node_options)
742
743         self.setup_helper.collectd_options = options
744
745     def _update_options(self, options, additional_options):
746         """Update collectd options and plugins dictionary"""
747         for k, v in additional_options.items():
748             if isinstance(v, dict) and k in options:
749                 options[k].update(v)
750             else:
751                 options[k] = v
752
753     def wait_for_instantiate(self):
754         buf = []
755         time.sleep(self.WAIT_TIME)  # Give some time for config to load
756         while True:
757             if not self._vnf_process.is_alive():
758                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
759
760             # NOTE(esm): move to QueueFileWrapper
761             while self.q_out.qsize() > 0:
762                 buf.append(self.q_out.get())
763                 message = ''.join(buf)
764                 if self.VNF_PROMPT in message:
765                     LOG.info("%s VNF is up and running.", self.APP_NAME)
766                     self._vnf_up_post()
767                     self.queue_wrapper.clear()
768                     return self._vnf_process.exitcode
769
770                 if "PANIC" in message:
771                     raise RuntimeError("Error starting %s VNF." %
772                                        self.APP_NAME)
773
774             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
775             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
776             # Send ENTER to display a new prompt in case the prompt text was corrupted
777             # by other VNF output
778             self.q_in.put('\r\n')
779
780     def start_collect(self):
781         self.resource_helper.start_collect()
782
783     def stop_collect(self):
784         self.resource_helper.stop_collect()
785
786     def _build_run_kwargs(self):
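        # the same QueueFileWrapper instance is used for stdin and stdout so the parent process can drive the interactive VNF through q_in/q_out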
787         self.run_kwargs = {
788             'stdin': self.queue_wrapper,
789             'stdout': self.queue_wrapper,
790             'keep_stdin_open': True,
791             'pty': True,
792             'timeout': self.scenario_helper.timeout,
793         }
794
795     def _build_config(self):
796         return self.setup_helper.build_config()
797
798     def _run(self):
799         # we can't share ssh paramiko objects to force new connection
800         self.ssh_helper.drop_connection()
801         cmd = self._build_config()
802         # kill before starting
803         self.setup_helper.kill_vnf()
804
805         LOG.debug(cmd)
806         self._build_run_kwargs()
807         self.ssh_helper.run(cmd, **self.run_kwargs)
808
809     def vnf_execute(self, cmd, wait_time=2):
810         """ send cmd to vnf process """
811
812         LOG.info("%s command: %s", self.APP_NAME, cmd)
813         self.q_in.put("{}\r\n".format(cmd))
814         time.sleep(wait_time)
815         output = []
816         while self.q_out.qsize() > 0:
817             output.append(self.q_out.get())
818         return "".join(output)
819
820     def _tear_down(self):
821         pass
822
823     def terminate(self):
824         self.vnf_execute("quit")
825         self.setup_helper.kill_vnf()
826         self._tear_down()
827         self.resource_helper.stop_collect()
828         if self._vnf_process is not None:
829             # be proper and join first before we kill
830             LOG.debug("joining before terminate %s", self._vnf_process.name)
831             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
832             self._vnf_process.terminate()
833         # no terminate children here because we share processes with tg
834
835     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
836         """Method for checking the statistics
837
838         This method could be overridden in children classes.
839
840         :return: VNF statistics
841         """
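        # APP_WORD is expected to be defined by concrete SampleVNF subclasses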
842         cmd = 'p {0} stats'.format(self.APP_WORD)
843         out = self.vnf_execute(cmd)
844         return out
845
846     def collect_kpi(self):
847         # we can't get KPIs if the VNF is down
848         check_if_process_failed(self._vnf_process, 0.01)
849         stats = self.get_stats()
850         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
851         physical_node = Context.get_physical_node_from_server(
852             self.scenario_helper.nodes[self.name])
853
854         result = {"physical_node": physical_node}
855         if m:
856             result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
857             result["collect_stats"] = self.resource_helper.collect_kpi()
858         else:
859             result.update({"packets_in": 0,
860                            "packets_fwd": 0,
861                            "packets_dropped": 0})
862
863         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
864         return result
865
866     def scale(self, flavor=""):
867         """The SampleVNF base class doesn't provide the 'scale' feature"""
868         raise y_exceptions.FunctionNotImplemented(
869             function_name='scale', class_name='SampleVNF')
870
871
872 class SampleVNFTrafficGen(GenericTrafficGen):
873     """ Class providing file-like API for generic traffic generator """
874
875     APP_NAME = 'Sample'
876     RUN_WAIT = 1
877
878     def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
879                  resource_helper_type=None):
880         super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
881         self.bin_path = get_nsb_option('bin_path', '')
882
883         self.scenario_helper = ScenarioHelper(self.name)
884         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
885
886         if setup_env_helper_type is None:
887             setup_env_helper_type = SetupEnvHelper
888
889         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
890                                                   self.ssh_helper,
891                                                   self.scenario_helper)
892
893         if resource_helper_type is None:
894             resource_helper_type = ClientResourceHelper
895
896         self.resource_helper = resource_helper_type(self.setup_helper)
897
898         self.runs_traffic = True
899         self.traffic_finished = False
900         self._tg_process = None
901         self._traffic_process = None
902
903     def _start_server(self):
904         # we can't share ssh paramiko objects to force new connection
905         self.ssh_helper.drop_connection()
906
907     def instantiate(self, scenario_cfg, context_cfg):
908         self.scenario_helper.scenario_cfg = scenario_cfg
909         self.resource_helper.update_from_context(
910             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
911             self.scenario_helper.nodes[self.name]
912         )
913
914         self.resource_helper.setup()
915         # must generate_cfg after DPDK bind because we need port number
916         self.resource_helper.generate_cfg()
917
918         LOG.info("Starting %s server...", self.APP_NAME)
919         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
920         self._tg_process = Process(name=name, target=self._start_server)
921         self._tg_process.start()
922
923     def _check_status(self):
924         raise NotImplementedError
925
926     def _wait_for_process(self):
927         while True:
928             if not self._tg_process.is_alive():
929                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
930             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
931             time.sleep(1)
932             status = self._check_status()
933             if status == 0:
934                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
935                 return self._tg_process.exitcode
936
937     def _traffic_runner(self, traffic_profile, mq_id):
938         # always drop connections first thing in new processes
939         # so we don't get paramiko errors
940         self.ssh_helper.drop_connection()
941         LOG.info("Starting %s client...", self.APP_NAME)
942         self._mq_producer = self._setup_mq_producer(mq_id)
943         self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
944
945     def run_traffic(self, traffic_profile):
946         """ Generate traffic on the wire according to the given params.
947         Method is non-blocking, returns immediately when traffic process
948         is running. Mandatory.
949
950         :param traffic_profile:
951         :return: True/False
952         """
953         name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
954                                     traffic_profile.__class__.__name__,
955                                     os.getpid())
956         self._traffic_process = Process(
957             name=name, target=self._traffic_runner,
958             args=(traffic_profile, uuid.uuid1().int))
959         self._traffic_process.start()
960         # Wait for traffic process to start
961         while self.resource_helper.client_started.value == 0:
962             time.sleep(self.RUN_WAIT)
963             # what if traffic process takes a few seconds to start?
964             if not self._traffic_process.is_alive():
965                 break
966
967     def collect_kpi(self):
968         # check if the tg processes have exited
969         physical_node = Context.get_physical_node_from_server(
970             self.scenario_helper.nodes[self.name])
971
972         result = {"physical_node": physical_node}
973         for proc in (self._tg_process, self._traffic_process):
974             check_if_process_failed(proc)
975
976         result["collect_stats"] = self.resource_helper.collect_kpi()
977         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
978         return result
979
980     def terminate(self):
981         """ After this method finishes, all traffic processes should stop. Mandatory.
982
983         :return: True/False
984         """
985         self.traffic_finished = True
986         # we must kill the client before we kill the server, or the client will raise an exception
987         if self._traffic_process is not None:
988             # be proper and try to join before terminating
989             LOG.debug("joining before terminate %s", self._traffic_process.name)
990             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
991             self._traffic_process.terminate()
992         if self._tg_process is not None:
993             # be proper and try to join before terminating
994             LOG.debug("joining before terminate %s", self._tg_process.name)
995             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
996             self._tg_process.terminate()
997         # no terminate children here because we share processes with vnf
998
999     def scale(self, flavor=""):
1000         """A traffic generator VNF doesn't provide the 'scale' feature"""
1001         raise y_exceptions.FunctionNotImplemented(
1002             function_name='scale', class_name='SampleVNFTrafficGen')