yardstick.git: yardstick/network_services/vnf_generic/vnf/sample_vnf.py
(viewed at commit "Cleanup unittests for test_attacker_baremetal")
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import decimal
17 from multiprocessing import Queue, Value, Process
18 import os
19 import posixpath
20 import re
21 import uuid
22 import subprocess
23 import time
24
25
26 from trex_stl_lib.trex_stl_client import LoggerApi
27 from trex_stl_lib.trex_stl_client import STLClient
28 from trex_stl_lib.trex_stl_exceptions import STLError
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.common import exceptions as y_exceptions
31 from yardstick.common.process import check_if_process_failed
32 from yardstick.common import utils
33 from yardstick.common import yaml_loader
34 from yardstick.network_services import constants
35 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
36 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
37 from yardstick.network_services.nfvi.resource import ResourceProfile
38 from yardstick.network_services.utils import get_nsb_option
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
41 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
42 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
43 from yardstick.benchmark.contexts.node import NodeContext
44
45 LOG = logging.getLogger(__name__)
46
47
class SetupEnvHelper(object):
    """Base helper that prepares the environment a sample VNF runs in.

    Concrete subclasses override the hooks below; this base only keeps
    the common references (VNFD helper, ssh helper, scenario helper)
    and the collectd options dictionary.
    """

    CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
    PIPELINE_COMMAND = ''
    VNF_TYPE = "SAMPLE"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        super(SetupEnvHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper
        self.collectd_options = {}

    def build_config(self):
        """Build the VNF configuration; must be provided by subclasses."""
        raise NotImplementedError

    def setup_vnf_environment(self):
        """Hook: prepare the target environment (no-op by default)."""
        pass

    def kill_vnf(self):
        """Hook: stop a running VNF process (no-op by default)."""
        pass

    def tear_down(self):
        """Undo the environment setup; must be provided by subclasses."""
        raise NotImplementedError
74
75
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Setup helper for DPDK-based sample VNFs.

    Builds and uploads the VNF config/script files, binds NIC ports to
    the DPDK driver and creates the collectd ResourceProfile.
    """

    APP_NAME = 'DpdkVnf'
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'

    @staticmethod
    def _update_packet_type(ip_pipeline_cfg, traffic_options):
        """Replace the default ipv4 pkt_type with the configured one."""
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    @classmethod
    def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
        """Rewrite traffic_type/pkt_type entries in the pipeline config."""
        traffic_type = traffic_options['traffic_type']

        # BUGFIX: use equality, not identity ("is not") -- comparing
        # strings with "is" only worked by CPython interning accident.
        if traffic_options['vnf_type'] != cls.APP_NAME:
            match_str = 'traffic_type = 4'
            replace_str = 'traffic_type = {0}'.format(traffic_type)

        elif traffic_type == 4:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv4'

        else:
            match_str = 'pkt_type = ipv4'
            replace_str = 'pkt_type = ipv6'

        pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
        return pipeline_config_str

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        self.bound_pci = None
        self.socket = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)

    def build_config(self):
        """Generate and upload the VNF config and script files.

        :return: the formatted pipeline command used to launch the VNF
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        config_file = vnf_cfg.get('file')
        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        traffic_options = {
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,
        }

        # read actions/rules from file
        acl_options = None
        acl_file_name = self.scenario_helper.options.get('rules')
        if acl_file_name:
            with utils.open_relative_file(acl_file_name, task_path) as infile:
                acl_options = yaml_loader.yaml_load(infile)

        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
                                                  task_path)
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        multiport = MultiPortConfig(self.scenario_helper.topology,
                                    config_tpl_cfg,
                                    config_basename,
                                    self.vnfd_helper,
                                    self.VNF_TYPE,
                                    lb_count,
                                    worker_threads,
                                    worker_config,
                                    lb_config,
                                    self.socket)

        multiport.generate_config()
        if config_file:
            # user-supplied config: prepend an [EAL] section whitelisting
            # every port's PCI address
            with utils.open_relative_file(config_file, task_path) as infile:
                new_config = ['[EAL]']
                vpci = []
                for port in self.vnfd_helper.port_pairs.all_ports:
                    interface = self.vnfd_helper.find_interface(name=port)
                    vpci.append(interface['virtual-interface']["vpci"])
                new_config.extend('w = {0}'.format(item) for item in vpci)
                new_config = '\n'.join(new_config) + '\n' + infile.read()
        else:
            # generated config: patch traffic and packet type in place
            with open(self.CFG_CONFIG) as handle:
                new_config = handle.read()
            new_config = self._update_traffic_type(new_config, traffic_options)
            new_config = self._update_packet_type(new_config, traffic_options)
        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
            multiport.generate_script(self.vnfd_helper,
                                      self.get_flows_config(acl_options)))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)

    def get_flows_config(self, options=None): # pylint: disable=unused-argument
        """No actions/rules (flows) by default"""
        return None

    def _build_pipeline_kwargs(self, cfg_file=None):
        """Populate self.pipeline_kwargs used to format PIPELINE_COMMAND."""
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))

        vnf_cfg = self.scenario_helper.vnf_cfg
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_threads = vnf_cfg.get('worker_threads', 3)
        hwlb = ''
        if lb_config == 'HW':
            hwlb = ' --hwlb %s' % worker_threads

        self.pipeline_kwargs = {
            'cfg_file': cfg_file if cfg_file else self.CFG_CONFIG,
            'script': self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
            'hwlb': hwlb,
        }

    def setup_vnf_environment(self):
        """Prepare DPDK, kill stale VNFs, bind drivers, build resources."""
        self._setup_dpdk()
        self.kill_vnf()
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        return resource

    def kill_vnf(self):
        """Best-effort kill of any running VNF instance on the node."""
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)

    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run"""
        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
        utils.setup_hugepages(self.ssh_helper, hugepages_gb * 1024 * 1024)
        self.dpdk_bind_helper.load_dpdk_driver()

        # NOTE: a non-zero status is not acted upon here; the check is
        # only informative (the early return is effectively a no-op).
        exit_status = self.dpdk_bind_helper.check_dpdk_driver()
        if exit_status == 0:
            return

    def _setup_resources(self):
        """Create the collectd ResourceProfile for KPI collection."""
        # what is this magic?  how do we know which socket is for which port?
        # what about quad-socket?
        if any(v[5] == "0" for v in self.bound_pci):
            self.socket = 0
        else:
            self.socket = 1

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        plugins = self.collectd_options.get("plugins", {})
        interval = self.collectd_options.get("interval")
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=interval,
                               timeout=self.scenario_helper.timeout)

    def _check_interface_fields(self):
        """Probe the node's interfaces, scaling the timeout by node count."""
        num_nodes = len(self.scenario_helper.nodes)
        # OpenStack instance creation time is probably proportional to the number
        # of instances
        timeout = 120 * num_nodes
        dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
                             self.ssh_helper, timeout)
        dpdk_node.check()

    def _detect_and_bind_drivers(self):
        """Bind all VNF ports to igb_uio and record their DPDK port nums."""
        interfaces = self.vnfd_helper.interfaces

        self._check_interface_fields()
        # check for bound after probe
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            try:
                intf = next(v for v in interfaces
                            if vpci == v['virtual-interface']['vpci'])
                # force to int
                intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except (StopIteration, KeyError):
                # BUGFIX: was a bare "except:" which also swallowed
                # KeyboardInterrupt/SystemExit.  Only skip when no (or a
                # malformed) interface matches this PCI address.
                pass
        time.sleep(2)

    def get_local_iface_name_by_vpci(self, vpci):
        """Return the kernel interface name for a PCI address, or None."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        if exit_status == 0:
            return stdout
        return None

    def tear_down(self):
        """Rebind ports back to their original kernel drivers."""
        self.dpdk_bind_helper.rebind_drivers()
296
297
class ResourceHelper(object):
    """Collects NFVi (collectd) KPIs for a VNF node.

    Collection can be disabled for baremetal contexts, in which case
    the collectd options are pushed to the context instead.
    """

    COLLECT_KPI = ''
    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    COLLECT_MAP = {}

    def __init__(self, setup_helper):
        super(ResourceHelper, self).__init__()
        self._enable = True
        self.resource = None
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

    def setup(self):
        """Create the resource profile via the setup helper."""
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for subclasses; nothing to generate by default."""
        pass

    def update_from_context(self, context, attr_name):
        """Disable resource helper in case of baremetal context.

        And update appropriate node collectd options in context
        """
        if not isinstance(context, NodeContext):
            return
        self._enable = False
        context.update_collectd_options_for_node(
            self.setup_helper.collectd_options, attr_name)

    def _collect_resource_kpi(self):
        """Fetch NFVi KPIs when enabled and the collectd agent is up."""
        core_kpi = {}
        agent_status = self.resource.check_if_system_agent_running("collectd")[0]
        if self._enable and agent_status == 0:
            core_kpi = self.resource.amqp_collect_nfvi_kpi()
        return {"core": core_kpi}

    def start_collect(self):
        """Start the system agent and the AMQP KPI consumer."""
        if not self._enable:
            return
        self.resource.initiate_systemagent(self.ssh_helper.bin_path)
        self.resource.start()
        self.resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop collection if it was set up and is enabled."""
        if self._enable and self.resource:
            self.resource.stop()

    def collect_kpi(self):
        return self._collect_resource_kpi()
350
351
class ClientResourceHelper(ResourceHelper):
    """Resource helper that drives a TRex stateless traffic client.

    Traffic runs in a subprocess (see run_traffic); per-iteration
    samples are shipped back to the parent through a multiprocessing
    Queue and shared Value flags.
    """

    RUN_DURATION = 60
    QUEUE_WAIT_TIME = 5
    SYNC_PORT = 1
    ASYNC_PORT = 2

    def __init__(self, setup_helper):
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        self.client = None
        # shared flags: written in the traffic subprocess, read by parent
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        self._result = {}
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Resolve uplink/downlink/all port names into DPDK port numbers."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Return TRex client stats, or {} when the client is not connected."""
        try:
            return self.client.get_stats(*args, **kwargs)
        except STLError:
            LOG.error('TRex client not connected')
            return {}

    def _get_samples(self, ports, port_pg_id=False):
        """Collect samples for the given ports; provided by subclasses."""
        raise NotImplementedError()

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the collected samples.

        NOTE(review): this implementation returns None, so the caller's
        "if self._run_traffic_once(...)" check in run_traffic never
        fires here; presumably subclasses override this to return a
        truthy completion flag -- confirm.
        """
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self._get_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile, mq_producer):
        """Traffic loop: connect, iterate until terminated, then stop.

        Runs in a subprocess; progress is reported via mq_producer.
        """
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        mq_producer.tg_method_started()
        try:
            self._build_ports()
            self.client = self._connect()
            self.client.reset(ports=self.all_ports)
            self.client.remove_all_streams(self.all_ports)  # remove all streams
            traffic_profile.register_generator(self)

            iteration_index = 0
            while self._terminated.value == 0:
                iteration_index += 1
                if self._run_traffic_once(traffic_profile):
                    self._terminated.value = 1
                mq_producer.tg_method_iteration(iteration_index)

            self.client.stop(self.all_ports)
            self.client.disconnect()
            self._terminated.value = 0
        except STLError:
            if self._terminated.value:
                LOG.debug("traffic generator is stopped")
                return  # return if trex/tg server is stopped.
            raise

        mq_producer.tg_method_finished()

    def terminate(self):
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear TRex stats; defaults to all ports."""
        if ports is None:
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start traffic on the given ports (defaults to all ports)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        if ports is None:
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Drain the latest sample from the queue into the cached result."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        return self._result

    def _connect(self, client=None):
        """Create (if needed) and connect an STLClient, retrying on error."""
        if client is None:
            client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                               server=self.vnfd_helper.mgmt_interface["ip"],
                               verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals, 30s max
        for idx in range(6):
            try:
                client.connect()
                break
            except STLError:
                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                time.sleep(5)
        return client
473
474
class Rfc2544ResourceHelper(object):
    """Lazily parsed view over the scenario's 'rfc2544' options block."""

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'

    def __init__(self, scenario_helper):
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self.iteration = Value('i', 0)
        # caches, filled on first property access
        self._correlated_traffic = None
        self._latency = None
        self._rfc2544 = None
        self._tolerance_low = None
        self._tolerance_high = None
        self._tolerance_precision = None

    @property
    def rfc2544(self):
        """The raw 'rfc2544' dict from the scenario options (cached)."""
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']
        return self._rfc2544

    @property
    def tolerance_low(self):
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    @property
    def tolerance_high(self):
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    @property
    def tolerance_precision(self):
        if self._tolerance_precision is None:
            self.get_rfc_tolerance()
        return self._tolerance_precision

    @property
    def correlated_traffic(self):
        if self._correlated_traffic is None:
            self._correlated_traffic = self.get_rfc2544(
                'correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
        return self._correlated_traffic

    @property
    def latency(self):
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
        return self._latency

    def get_rfc2544(self, name, default=None):
        """Look up *name* in the rfc2544 options with a *default* fallback."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' ('low - high') into the cached floats."""
        raw = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        bounds = sorted(decimal.Decimal(part.strip()) for part in raw.split('-'))
        low = bounds[0]
        # a single value means low == high
        high = bounds[1] if len(bounds) > 1 else bounds[0]
        # precision = number of decimal places in the high bound
        self._tolerance_precision = abs(high.as_tuple().exponent)
        self._tolerance_high = float(high)
        self._tolerance_low = float(low)
542
543
class SampleVNFDeployHelper(object):
    """Clones and builds the OPNFV samplevnf repository on a target node."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build and install *app_name* on the node unless already present."""
        vnf_bin = self.ssh_helper.join_bin_path(app_name)
        which_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
        if not which_status:
            # exit status 0: binary is already installed -> nothing to do
            return

        # make a fresh local clone, then push it to the node
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        time.sleep(2)
        self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        # run the repo's build script remotely, forwarding the proxy setting
        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        time.sleep(2)
        http_proxy = os.environ.get('http_proxy', '')
        cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
        LOG.debug(cmd)
        self.ssh_helper.execute(cmd)

        # install the freshly built binary into the yardstick bin path
        vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
        self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
576
577
class ScenarioHelper(object):
    """Read-only convenience accessors over the scenario configuration."""

    DEFAULT_VNF_CFG = {
        'lb_config': 'SW',
        'lb_count': 1,
        'worker_config': '1C/1T',
        'worker_threads': 1,
    }

    def __init__(self, name):
        self.name = name
        # assigned by the owning VNF once the scenario is instantiated
        self.scenario_cfg = None

    @property
    def task_path(self):
        return self.scenario_cfg['task_path']

    @property
    def nodes(self):
        return self.scenario_cfg.get('nodes')

    @property
    def all_options(self):
        return self.scenario_cfg.get('options', {})

    @property
    def options(self):
        """Options addressed to this VNF by name."""
        return self.all_options.get(self.name, {})

    @property
    def vnf_cfg(self):
        return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)

    @property
    def topology(self):
        return self.scenario_cfg['topology']

    @property
    def timeout(self):
        """The larger of the runner duration and the VNF timeout option."""
        vnf_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
        duration = self.scenario_cfg.get('runner', {}).get('duration', vnf_timeout)
        return duration if duration > vnf_timeout else vnf_timeout
621
622
623 class SampleVNF(GenericVNF):
624     """ Class providing file-like API for generic VNF implementation """
625
626     VNF_PROMPT = "pipeline>"
627     WAIT_TIME = 1
628     WAIT_TIME_FOR_SCRIPT = 10
629     APP_NAME = "SampleVNF"
630     # we run the VNF interactively, so the ssh command will timeout after this long
631
632     def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
633                  resource_helper_type=None):
634         super(SampleVNF, self).__init__(name, vnfd, task_id)
635         self.bin_path = get_nsb_option('bin_path', '')
636
637         self.scenario_helper = ScenarioHelper(self.name)
638         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
639
640         if setup_env_helper_type is None:
641             setup_env_helper_type = SetupEnvHelper
642
643         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
644                                                   self.ssh_helper,
645                                                   self.scenario_helper)
646
647         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
648
649         if resource_helper_type is None:
650             resource_helper_type = ResourceHelper
651
652         self.resource_helper = resource_helper_type(self.setup_helper)
653
654         self.context_cfg = None
655         self.pipeline_kwargs = {}
656         self.uplink_ports = None
657         self.downlink_ports = None
658         # NOTE(esm): make QueueFileWrapper invert-able so that we
659         #            never have to manage the queues
660         self.q_in = Queue()
661         self.q_out = Queue()
662         self.queue_wrapper = None
663         self.run_kwargs = {}
664         self.used_drivers = {}
665         self.vnf_port_pairs = None
666         self._vnf_process = None
667
668     def _start_vnf(self):
669         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
670         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
671         self._vnf_process = Process(name=name, target=self._run)
672         self._vnf_process.start()
673
674     def _vnf_up_post(self):
675         pass
676
677     def instantiate(self, scenario_cfg, context_cfg):
678         self._update_collectd_options(scenario_cfg, context_cfg)
679         self.scenario_helper.scenario_cfg = scenario_cfg
680         self.context_cfg = context_cfg
681         self.resource_helper.update_from_context(
682             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
683             self.scenario_helper.nodes[self.name]
684         )
685
686         # vnf deploy is unsupported, use ansible playbooks
687         if self.scenario_helper.options.get("vnf_deploy", False):
688             self.deploy_helper.deploy_vnfs(self.APP_NAME)
689         self.resource_helper.setup()
690         self._start_vnf()
691
692     def _update_collectd_options(self, scenario_cfg, context_cfg):
693         """Update collectd configuration options
694         This function retrieves all collectd options contained in the test case
695
696         definition builds a single dictionary combining them. The following fragment
697         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
698         ---
699         schema: yardstick:task:0.1
700         scenarios:
701         - type: NSPerf
702           nodes:
703             tg__0: trafficgen_1.yardstick
704             vnf__0: vnf.yardstick
705           options:
706             collectd:
707               <options>  # COLLECTD priority 3
708             vnf__0:
709               collectd:
710                 plugins:
711                     load
712                 <options> # COLLECTD priority 2
713         context:
714           type: Node
715           name: yardstick
716           nfvi_type: baremetal
717           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
718         """
719         scenario_options = scenario_cfg.get('options', {})
720         generic_options = scenario_options.get('collectd', {})
721         scenario_node_options = scenario_options.get(self.name, {})\
722             .get('collectd', {})
723         context_node_options = context_cfg.get('nodes', {})\
724             .get(self.name, {}).get('collectd', {})
725
726         options = generic_options
727         self._update_options(options, scenario_node_options)
728         self._update_options(options, context_node_options)
729
730         self.setup_helper.collectd_options = options
731
732     def _update_options(self, options, additional_options):
733         """Update collectd options and plugins dictionary"""
734         for k, v in additional_options.items():
735             if isinstance(v, dict) and k in options:
736                 options[k].update(v)
737             else:
738                 options[k] = v
739
740     def wait_for_instantiate(self):
741         buf = []
742         time.sleep(self.WAIT_TIME)  # Give some time for config to load
743         while True:
744             if not self._vnf_process.is_alive():
745                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
746
747             # NOTE(esm): move to QueueFileWrapper
748             while self.q_out.qsize() > 0:
749                 buf.append(self.q_out.get())
750                 message = ''.join(buf)
751                 if self.VNF_PROMPT in message:
752                     LOG.info("%s VNF is up and running.", self.APP_NAME)
753                     self._vnf_up_post()
754                     self.queue_wrapper.clear()
755                     return self._vnf_process.exitcode
756
757                 if "PANIC" in message:
758                     raise RuntimeError("Error starting %s VNF." %
759                                        self.APP_NAME)
760
761             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
762             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
763             # Send ENTER to display a new prompt in case the prompt text was corrupted
764             # by other VNF output
765             self.q_in.put('\r\n')
766
767     def start_collect(self):
768         self.resource_helper.start_collect()
769
770     def stop_collect(self):
771         self.resource_helper.stop_collect()
772
773     def _build_run_kwargs(self):
774         self.run_kwargs = {
775             'stdin': self.queue_wrapper,
776             'stdout': self.queue_wrapper,
777             'keep_stdin_open': True,
778             'pty': True,
779             'timeout': self.scenario_helper.timeout,
780         }
781
    def _build_config(self):
        """Return the VNF launch command built by the setup helper."""
        return self.setup_helper.build_config()
784
    def _run(self):
        """Launch the VNF binary on the remote node (runs in a child process).

        Drops the SSH connection first, kills any stale VNF instance, then
        starts the VNF with stdin/stdout bridged through the queue wrapper.
        """
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()
        cmd = self._build_config()
        # kill before starting
        self.setup_helper.kill_vnf()

        LOG.debug(cmd)
        self._build_run_kwargs()
        self.ssh_helper.run(cmd, **self.run_kwargs)
795
796     def vnf_execute(self, cmd, wait_time=2):
797         """ send cmd to vnf process """
798
799         LOG.info("%s command: %s", self.APP_NAME, cmd)
800         self.q_in.put("{}\r\n".format(cmd))
801         time.sleep(wait_time)
802         output = []
803         while self.q_out.qsize() > 0:
804             output.append(self.q_out.get())
805         return "".join(output)
806
    def _tear_down(self):
        """Hook for subclass-specific cleanup; no-op in the base class."""
        pass
809
    def terminate(self):
        """Shut the VNF down: quit its console, kill it, stop collection.

        Joins the VNF process (bounded by PROCESS_JOIN_TIMEOUT) before
        terminating it so a clean exit is preferred over a kill.
        """
        self.vnf_execute("quit")
        self.setup_helper.kill_vnf()
        self._tear_down()
        self.resource_helper.stop_collect()
        if self._vnf_process is not None:
            # be proper and join first before we kill
            LOG.debug("joining before terminate %s", self._vnf_process.name)
            self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._vnf_process.terminate()
        # no terminate children here because we share processes with tg
821
822     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
823         """Method for checking the statistics
824
825         This method could be overridden in children classes.
826
827         :return: VNF statistics
828         """
829         cmd = 'p {0} stats'.format(self.APP_WORD)
830         out = self.vnf_execute(cmd)
831         return out
832
833     def collect_kpi(self):
834         # we can't get KPIs if the VNF is down
835         check_if_process_failed(self._vnf_process, 0.01)
836         stats = self.get_stats()
837         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
838         physical_node = Context.get_physical_node_from_server(
839             self.scenario_helper.nodes[self.name])
840
841         result = {"physical_node": physical_node}
842         if m:
843             result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
844             result["collect_stats"] = self.resource_helper.collect_kpi()
845         else:
846             result.update({"packets_in": 0,
847                            "packets_fwd": 0,
848                            "packets_dropped": 0})
849
850         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
851         return result
852
853     def scale(self, flavor=""):
854         """The SampleVNF base class doesn't provide the 'scale' feature"""
855         raise y_exceptions.FunctionNotImplemented(
856             function_name='scale', class_name='SampleVNFTrafficGen')
857
858
class SampleVNFTrafficGen(GenericTrafficGen):
    """ Class providing file-like API for generic traffic generator """

    APP_NAME = 'Sample'  # overridden by concrete traffic generator classes
    RUN_WAIT = 1  # seconds between polls while waiting for traffic to start

    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
                 resource_helper_type=None):
        """Wire up SSH access and the setup/resource helper collaborators.

        :param name: node name of this traffic generator in the scenario
        :param vnfd: VNF descriptor dict for this node
        :param task_id: yardstick task identifier
        :param setup_env_helper_type: helper class for environment setup
            (defaults to SetupEnvHelper)
        :param resource_helper_type: helper class for traffic/resource
            handling (defaults to ClientResourceHelper)
        """
        super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        if setup_env_helper_type is None:
            setup_env_helper_type = SetupEnvHelper

        self.setup_helper = setup_env_helper_type(self.vnfd_helper,
                                                  self.ssh_helper,
                                                  self.scenario_helper)

        if resource_helper_type is None:
            resource_helper_type = ClientResourceHelper

        self.resource_helper = resource_helper_type(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        # server process (the TG application itself) and the client
        # process that drives traffic through it; created lazily by
        # instantiate() and run_traffic() respectively
        self._tg_process = None
        self._traffic_process = None

    def _start_server(self):
        """Entry point of the TG server process; subclasses extend this."""
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """Prepare the node and launch the TG server in a child process.

        :param scenario_cfg: scenario configuration dict
        :param context_cfg: context configuration dict
        """
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.resource_helper.update_from_context(
            Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
            self.scenario_helper.nodes[self.name]
        )

        self.resource_helper.context_cfg = context_cfg

        self.resource_helper.setup()
        # must generate_cfg after DPDK bind because we need port number
        self.resource_helper.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._tg_process = Process(name=name, target=self._start_server)
        self._tg_process.start()

    def _check_status(self):
        """Return 0 once the TG server is ready; subclasses must implement."""
        raise NotImplementedError

    def _wait_for_process(self):
        """Poll until the TG server process reports ready.

        :return: exit code of the server process (normally still running)
        :raises RuntimeError: the server process died before becoming ready
        """
        while True:
            if not self._tg_process.is_alive():
                raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
            LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
            time.sleep(1)
            status = self._check_status()
            if status == 0:
                LOG.info("%s TG Server is up and running.", self.APP_NAME)
                return self._tg_process.exitcode

    def _traffic_runner(self, traffic_profile, mq_id):
        """Entry point of the traffic client process.

        :param traffic_profile: profile describing the traffic to generate
        :param mq_id: identifier used to set up the message-queue producer
        """
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self._mq_producer = self._setup_mq_producer(mq_id)
        self.resource_helper.run_traffic(traffic_profile, self._mq_producer)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.
        Method is non-blocking, returns immediately when traffic process
        is running. Mandatory.

        :param traffic_profile:
        :return: True/False
        """
        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
                                    traffic_profile.__class__.__name__,
                                    os.getpid())
        self._traffic_process = Process(
            name=name, target=self._traffic_runner,
            args=(traffic_profile, uuid.uuid1().int))
        self._traffic_process.start()
        # Wait for traffic process to start
        while self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not self._traffic_process.is_alive():
                break

    def collect_kpi(self):
        """Collect TG statistics plus the physical node they came from.

        :return: dict with "physical_node" and "collect_stats" entries
        :raises: whatever check_if_process_failed raises when either the
            server or the traffic client process has failed
        """
        # check if the tg processes have exited
        physical_node = Context.get_physical_node_from_server(
            self.scenario_helper.nodes[self.name])

        result = {"physical_node": physical_node}
        for proc in (self._tg_process, self._traffic_process):
            check_if_process_failed(proc)

        result["collect_stats"] = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        return result

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.

        :return: True/False
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        if self._traffic_process is not None:
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", self._traffic_process.name)
            self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._traffic_process.terminate()
        if self._tg_process is not None:
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", self._tg_process.name)
            self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._tg_process.terminate()
        # no terminate children here because we share processes with vnf

    def scale(self, flavor=""):
        """A traffic generator VNF doesn't provide the 'scale' feature"""
        raise y_exceptions.FunctionNotImplemented(
            function_name='scale', class_name='SampleVNFTrafficGen')