Create Dockerfile to create a yardstick-image of docker
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 from multiprocessing import Queue, Value, Process
17
18 import os
19 import posixpath
20 import re
21 import six
22 import subprocess
23 import time
24
25 from trex_stl_lib.trex_stl_client import LoggerApi
26 from trex_stl_lib.trex_stl_client import STLClient
27 from trex_stl_lib.trex_stl_exceptions import STLError
28 from yardstick.benchmark.contexts.base import Context
29 from yardstick.common import exceptions as y_exceptions
30 from yardstick.common.process import check_if_process_failed
31 from yardstick.common import utils
32 from yardstick.common import yaml_loader
33 from yardstick.network_services import constants
34 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
39 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
42 from yardstick.benchmark.contexts.node import NodeContext
43
44 LOG = logging.getLogger(__name__)
45
46
47 class SetupEnvHelper(object):
48
49     CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
50     CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
51     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
52     PIPELINE_COMMAND = ''
53     VNF_TYPE = "SAMPLE"
54
55     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
56         super(SetupEnvHelper, self).__init__()
57         self.vnfd_helper = vnfd_helper
58         self.ssh_helper = ssh_helper
59         self.scenario_helper = scenario_helper
60         self.collectd_options = {}
61
62     def build_config(self):
63         raise NotImplementedError
64
65     def setup_vnf_environment(self):
66         pass
67
68     def kill_vnf(self):
69         pass
70
71     def tear_down(self):
72         raise NotImplementedError
73
74
75 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
76
77     APP_NAME = 'DpdkVnf'
78     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
79     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
80
81     @staticmethod
82     def _update_packet_type(ip_pipeline_cfg, traffic_options):
83         match_str = 'pkt_type = ipv4'
84         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
85         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
86         return pipeline_config_str
87
88     @classmethod
89     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
90         traffic_type = traffic_options['traffic_type']
91
92         if traffic_options['vnf_type'] is not cls.APP_NAME:
93             match_str = 'traffic_type = 4'
94             replace_str = 'traffic_type = {0}'.format(traffic_type)
95
96         elif traffic_type == 4:
97             match_str = 'pkt_type = ipv4'
98             replace_str = 'pkt_type = ipv4'
99
100         else:
101             match_str = 'pkt_type = ipv4'
102             replace_str = 'pkt_type = ipv6'
103
104         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
105         return pipeline_config_str
106
107     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
108         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
109         self.all_ports = None
110         self.bound_pci = None
111         self.socket = None
112         self.used_drivers = None
113         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
114
115     def _setup_hugepages(self):
116         meminfo = utils.read_meminfo(self.ssh_helper)
117         hp_size_kb = int(meminfo['Hugepagesize'])
118         hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
119         nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
120         self.ssh_helper.execute('echo %s | sudo tee %s' %
121                                 (nr_hugepages, self.NR_HUGEPAGES_PATH))
122         hp = six.BytesIO()
123         self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
124         nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
125         LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
126                  hp_size_kb, nr_hugepages, nr_hugepages_set)
127
128     def build_config(self):
129         vnf_cfg = self.scenario_helper.vnf_cfg
130         task_path = self.scenario_helper.task_path
131
132         config_file = vnf_cfg.get('file')
133         lb_count = vnf_cfg.get('lb_count', 3)
134         lb_config = vnf_cfg.get('lb_config', 'SW')
135         worker_config = vnf_cfg.get('worker_config', '1C/1T')
136         worker_threads = vnf_cfg.get('worker_threads', 3)
137
138         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
139         traffic_options = {
140             'traffic_type': traffic_type,
141             'pkt_type': 'ipv%s' % traffic_type,
142             'vnf_type': self.VNF_TYPE,
143         }
144
145         # read actions/rules from file
146         acl_options = None
147         acl_file_name = self.scenario_helper.options.get('rules')
148         if acl_file_name:
149             with utils.open_relative_file(acl_file_name, task_path) as infile:
150                 acl_options = yaml_loader.yaml_load(infile)
151
152         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
153                                                   task_path)
154         config_basename = posixpath.basename(self.CFG_CONFIG)
155         script_basename = posixpath.basename(self.CFG_SCRIPT)
156         multiport = MultiPortConfig(self.scenario_helper.topology,
157                                     config_tpl_cfg,
158                                     config_basename,
159                                     self.vnfd_helper,
160                                     self.VNF_TYPE,
161                                     lb_count,
162                                     worker_threads,
163                                     worker_config,
164                                     lb_config,
165                                     self.socket)
166
167         multiport.generate_config()
168         if config_file:
169             with utils.open_relative_file(config_file, task_path) as infile:
170                 new_config = ['[EAL]']
171                 vpci = []
172                 for port in self.vnfd_helper.port_pairs.all_ports:
173                     interface = self.vnfd_helper.find_interface(name=port)
174                     vpci.append(interface['virtual-interface']["vpci"])
175                 new_config.extend('w = {0}'.format(item) for item in vpci)
176                 new_config = '\n'.join(new_config) + '\n' + infile.read()
177         else:
178             with open(self.CFG_CONFIG) as handle:
179                 new_config = handle.read()
180             new_config = self._update_traffic_type(new_config, traffic_options)
181             new_config = self._update_packet_type(new_config, traffic_options)
182         self.ssh_helper.upload_config_file(config_basename, new_config)
183         self.ssh_helper.upload_config_file(script_basename,
184             multiport.generate_script(self.vnfd_helper,
185                                       self.get_flows_config(acl_options)))
186
187         LOG.info("Provision and start the %s", self.APP_NAME)
188         self._build_pipeline_kwargs()
189         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
190
191     def get_flows_config(self, options=None): # pylint: disable=unused-argument
192         """No actions/rules (flows) by default"""
193         return None
194
195     def _build_pipeline_kwargs(self):
196         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
197         # count the number of actual ports in the list of pairs
198         # remove duplicate ports
199         # this is really a mapping from LINK ID to DPDK PMD ID
200         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
201         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
202         ports = self.vnfd_helper.port_pairs.all_ports
203         port_nums = self.vnfd_helper.port_nums(ports)
204         # create mask from all the dpdk port numbers
205         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
206
207         vnf_cfg = self.scenario_helper.vnf_cfg
208         lb_config = vnf_cfg.get('lb_config', 'SW')
209         worker_threads = vnf_cfg.get('worker_threads', 3)
210         hwlb = ''
211         if lb_config == 'HW':
212             hwlb = ' --hwlb %s' % worker_threads
213
214         self.pipeline_kwargs = {
215             'cfg_file': self.CFG_CONFIG,
216             'script': self.CFG_SCRIPT,
217             'port_mask_hex': ports_mask_hex,
218             'tool_path': tool_path,
219             'hwlb': hwlb,
220         }
221
222     def setup_vnf_environment(self):
223         self._setup_dpdk()
224         self.kill_vnf()
225         # bind before _setup_resources so we can use dpdk_port_num
226         self._detect_and_bind_drivers()
227         resource = self._setup_resources()
228         return resource
229
230     def kill_vnf(self):
231         # pkill is not matching, debug with pgrep
232         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
233         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
234         # have to use exact match
235         # try using killall to match
236         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
237
238     def _setup_dpdk(self):
239         """Setup DPDK environment needed for VNF to run"""
240         self._setup_hugepages()
241         self.dpdk_bind_helper.load_dpdk_driver()
242
243         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
244         if exit_status == 0:
245             return
246
247     def _setup_resources(self):
248         # what is this magic?  how do we know which socket is for which port?
249         # what about quad-socket?
250         if any(v[5] == "0" for v in self.bound_pci):
251             self.socket = 0
252         else:
253             self.socket = 1
254
255         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
256         # this won't work because we don't have DPDK port numbers yet
257         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
258         port_names = (intf["name"] for intf in ports)
259         plugins = self.collectd_options.get("plugins", {})
260         interval = self.collectd_options.get("interval")
261         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
262         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
263                                plugins=plugins, interval=interval,
264                                timeout=self.scenario_helper.timeout)
265
266     def _check_interface_fields(self):
267         num_nodes = len(self.scenario_helper.nodes)
268         # OpenStack instance creation time is probably proportional to the number
269         # of instances
270         timeout = 120 * num_nodes
271         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
272                              self.ssh_helper, timeout)
273         dpdk_node.check()
274
275     def _detect_and_bind_drivers(self):
276         interfaces = self.vnfd_helper.interfaces
277
278         self._check_interface_fields()
279         # check for bound after probe
280         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
281
282         self.dpdk_bind_helper.read_status()
283         self.dpdk_bind_helper.save_used_drivers()
284
285         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
286
287         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
288         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
289             try:
290                 intf = next(v for v in interfaces
291                             if vpci == v['virtual-interface']['vpci'])
292                 # force to int
293                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
294             except:  # pylint: disable=bare-except
295                 pass
296         time.sleep(2)
297
298     def get_local_iface_name_by_vpci(self, vpci):
299         find_net_cmd = self.FIND_NET_CMD.format(vpci)
300         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
301         if exit_status == 0:
302             return stdout
303         return None
304
305     def tear_down(self):
306         self.dpdk_bind_helper.rebind_drivers()
307
308
309 class ResourceHelper(object):
310
311     COLLECT_KPI = ''
312     MAKE_INSTALL = 'cd {0} && make && sudo make install'
313     RESOURCE_WORD = 'sample'
314
315     COLLECT_MAP = {}
316
317     def __init__(self, setup_helper):
318         super(ResourceHelper, self).__init__()
319         self.resource = None
320         self.setup_helper = setup_helper
321         self.ssh_helper = setup_helper.ssh_helper
322         self._enable = True
323
324     def setup(self):
325         self.resource = self.setup_helper.setup_vnf_environment()
326
327     def generate_cfg(self):
328         pass
329
330     def update_from_context(self, context, attr_name):
331         """Disable resource helper in case of baremetal context.
332
333         And update appropriate node collectd options in context
334         """
335         if isinstance(context, NodeContext):
336             self._enable = False
337             context.update_collectd_options_for_node(self.setup_helper.collectd_options,
338                                                      attr_name)
339
340     def _collect_resource_kpi(self):
341         result = {}
342         status = self.resource.check_if_system_agent_running("collectd")[0]
343         if status == 0 and self._enable:
344             result = self.resource.amqp_collect_nfvi_kpi()
345
346         result = {"core": result}
347         return result
348
349     def start_collect(self):
350         if self._enable:
351             self.resource.initiate_systemagent(self.ssh_helper.bin_path)
352             self.resource.start()
353             self.resource.amqp_process_for_nfvi_kpi()
354
355     def stop_collect(self):
356         if self.resource and self._enable:
357             self.resource.stop()
358
359     def collect_kpi(self):
360         return self._collect_resource_kpi()
361
362
363 class ClientResourceHelper(ResourceHelper):
364
365     RUN_DURATION = 60
366     QUEUE_WAIT_TIME = 5
367     SYNC_PORT = 1
368     ASYNC_PORT = 2
369
370     def __init__(self, setup_helper):
371         super(ClientResourceHelper, self).__init__(setup_helper)
372         self.vnfd_helper = setup_helper.vnfd_helper
373         self.scenario_helper = setup_helper.scenario_helper
374
375         self.client = None
376         self.client_started = Value('i', 0)
377         self.all_ports = None
378         self._queue = Queue()
379         self._result = {}
380         self._terminated = Value('i', 0)
381
382     def _build_ports(self):
383         self.networks = self.vnfd_helper.port_pairs.networks
384         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
385         self.downlink_ports = \
386             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
387         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
388
389     def port_num(self, intf):
390         # by default return port num
391         return self.vnfd_helper.port_num(intf)
392
393     def get_stats(self, *args, **kwargs):
394         try:
395             return self.client.get_stats(*args, **kwargs)
396         except STLError:
397             LOG.error('TRex client not connected')
398             return {}
399
400     def _get_samples(self, ports, port_pg_id=False):
401         raise NotImplementedError()
402
403     def _run_traffic_once(self, traffic_profile):
404         traffic_profile.execute_traffic(self)
405         self.client_started.value = 1
406         time.sleep(self.RUN_DURATION)
407         samples = self._get_samples(traffic_profile.ports)
408         time.sleep(self.QUEUE_WAIT_TIME)
409         self._queue.put(samples)
410
411     def run_traffic(self, traffic_profile):
412         # if we don't do this we can hang waiting for the queue to drain
413         # have to do this in the subprocess
414         self._queue.cancel_join_thread()
415         # fixme: fix passing correct trex config file,
416         # instead of searching the default path
417         try:
418             self._build_ports()
419             self.client = self._connect()
420             self.client.reset(ports=self.all_ports)
421             self.client.remove_all_streams(self.all_ports)  # remove all streams
422             traffic_profile.register_generator(self)
423
424             while self._terminated.value == 0:
425                 self._run_traffic_once(traffic_profile)
426
427             self.client.stop(self.all_ports)
428             self.client.disconnect()
429             self._terminated.value = 0
430         except STLError:
431             if self._terminated.value:
432                 LOG.debug("traffic generator is stopped")
433                 return  # return if trex/tg server is stopped.
434             raise
435
436     def terminate(self):
437         self._terminated.value = 1  # stop client
438
439     def clear_stats(self, ports=None):
440         if ports is None:
441             ports = self.all_ports
442         self.client.clear_stats(ports=ports)
443
444     def start(self, ports=None, *args, **kwargs):
445         # pylint: disable=keyword-arg-before-vararg
446         # NOTE(ralonsoh): defining keyworded arguments before variable
447         # positional arguments is a bug. This function definition doesn't work
448         # in Python 2, although it works in Python 3. Reference:
449         # https://www.python.org/dev/peps/pep-3102/
450         if ports is None:
451             ports = self.all_ports
452         self.client.start(ports=ports, *args, **kwargs)
453
454     def collect_kpi(self):
455         if not self._queue.empty():
456             kpi = self._queue.get()
457             self._result.update(kpi)
458             LOG.debug('Got KPIs from _queue for %s %s',
459                       self.scenario_helper.name, self.RESOURCE_WORD)
460         return self._result
461
462     def _connect(self, client=None):
463         if client is None:
464             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
465                                server=self.vnfd_helper.mgmt_interface["ip"],
466                                verbose_level=LoggerApi.VERBOSE_QUIET)
467
468         # try to connect with 5s intervals, 30s max
469         for idx in range(6):
470             try:
471                 client.connect()
472                 break
473             except STLError:
474                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
475                 time.sleep(5)
476         return client
477
478
479 class Rfc2544ResourceHelper(object):
480
481     DEFAULT_CORRELATED_TRAFFIC = False
482     DEFAULT_LATENCY = False
483     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
484
485     def __init__(self, scenario_helper):
486         super(Rfc2544ResourceHelper, self).__init__()
487         self.scenario_helper = scenario_helper
488         self._correlated_traffic = None
489         self.iteration = Value('i', 0)
490         self._latency = None
491         self._rfc2544 = None
492         self._tolerance_low = None
493         self._tolerance_high = None
494
495     @property
496     def rfc2544(self):
497         if self._rfc2544 is None:
498             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
499         return self._rfc2544
500
501     @property
502     def tolerance_low(self):
503         if self._tolerance_low is None:
504             self.get_rfc_tolerance()
505         return self._tolerance_low
506
507     @property
508     def tolerance_high(self):
509         if self._tolerance_high is None:
510             self.get_rfc_tolerance()
511         return self._tolerance_high
512
513     @property
514     def correlated_traffic(self):
515         if self._correlated_traffic is None:
516             self._correlated_traffic = \
517                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
518
519         return self._correlated_traffic
520
521     @property
522     def latency(self):
523         if self._latency is None:
524             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
525         return self._latency
526
527     def get_rfc2544(self, name, default=None):
528         return self.rfc2544.get(name, default)
529
530     def get_rfc_tolerance(self):
531         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
532         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
533         self._tolerance_low = next(tolerance_iter)
534         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
535
536
537 class SampleVNFDeployHelper(object):
538
539     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
540     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
541     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
542
543     def __init__(self, vnfd_helper, ssh_helper):
544         super(SampleVNFDeployHelper, self).__init__()
545         self.ssh_helper = ssh_helper
546         self.vnfd_helper = vnfd_helper
547
548     def deploy_vnfs(self, app_name):
549         vnf_bin = self.ssh_helper.join_bin_path(app_name)
550         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
551         if not exit_status:
552             return
553
554         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
555         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
556         time.sleep(2)
557         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
558         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
559
560         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
561         time.sleep(2)
562         http_proxy = os.environ.get('http_proxy', '')
563         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
564         LOG.debug(cmd)
565         self.ssh_helper.execute(cmd)
566         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
567         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
568         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
569
570
571 class ScenarioHelper(object):
572
573     DEFAULT_VNF_CFG = {
574         'lb_config': 'SW',
575         'lb_count': 1,
576         'worker_config': '1C/1T',
577         'worker_threads': 1,
578     }
579
580     def __init__(self, name):
581         self.name = name
582         self.scenario_cfg = None
583
584     @property
585     def task_path(self):
586         return self.scenario_cfg['task_path']
587
588     @property
589     def nodes(self):
590         return self.scenario_cfg.get('nodes')
591
592     @property
593     def all_options(self):
594         return self.scenario_cfg.get('options', {})
595
596     @property
597     def options(self):
598         return self.all_options.get(self.name, {})
599
600     @property
601     def vnf_cfg(self):
602         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
603
604     @property
605     def topology(self):
606         return self.scenario_cfg['topology']
607
608     @property
609     def timeout(self):
610         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
611             self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
612         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
613         return test_duration if test_duration > test_timeout else test_timeout
614
615 class SampleVNF(GenericVNF):
616     """ Class providing file-like API for generic VNF implementation """
617
618     VNF_PROMPT = "pipeline>"
619     WAIT_TIME = 1
620     WAIT_TIME_FOR_SCRIPT = 10
621     APP_NAME = "SampleVNF"
622     # we run the VNF interactively, so the ssh command will timeout after this long
623
624     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
625         super(SampleVNF, self).__init__(name, vnfd)
626         self.bin_path = get_nsb_option('bin_path', '')
627
628         self.scenario_helper = ScenarioHelper(self.name)
629         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
630
631         if setup_env_helper_type is None:
632             setup_env_helper_type = SetupEnvHelper
633
634         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
635                                                   self.ssh_helper,
636                                                   self.scenario_helper)
637
638         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
639
640         if resource_helper_type is None:
641             resource_helper_type = ResourceHelper
642
643         self.resource_helper = resource_helper_type(self.setup_helper)
644
645         self.context_cfg = None
646         self.pipeline_kwargs = {}
647         self.uplink_ports = None
648         self.downlink_ports = None
649         # NOTE(esm): make QueueFileWrapper invert-able so that we
650         #            never have to manage the queues
651         self.q_in = Queue()
652         self.q_out = Queue()
653         self.queue_wrapper = None
654         self.run_kwargs = {}
655         self.used_drivers = {}
656         self.vnf_port_pairs = None
657         self._vnf_process = None
658
659     def _start_vnf(self):
660         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
661         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
662         self._vnf_process = Process(name=name, target=self._run)
663         self._vnf_process.start()
664
665     def _vnf_up_post(self):
666         pass
667
668     def instantiate(self, scenario_cfg, context_cfg):
669         self._update_collectd_options(scenario_cfg, context_cfg)
670         self.scenario_helper.scenario_cfg = scenario_cfg
671         self.context_cfg = context_cfg
672         self.resource_helper.update_from_context(
673             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
674             self.scenario_helper.nodes[self.name]
675         )
676
677         # vnf deploy is unsupported, use ansible playbooks
678         if self.scenario_helper.options.get("vnf_deploy", False):
679             self.deploy_helper.deploy_vnfs(self.APP_NAME)
680         self.resource_helper.setup()
681         self._start_vnf()
682
683     def _update_collectd_options(self, scenario_cfg, context_cfg):
684         """Update collectd configuration options
685         This function retrieves all collectd options contained in the test case
686
687         definition builds a single dictionary combining them. The following fragment
688         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
689         ---
690         schema: yardstick:task:0.1
691         scenarios:
692         - type: NSPerf
693           nodes:
694             tg__0: trafficgen_1.yardstick
695             vnf__0: vnf.yardstick
696           options:
697             collectd:
698               <options>  # COLLECTD priority 3
699             vnf__0:
700               collectd:
701                 plugins:
702                     load
703                 <options> # COLLECTD priority 2
704         context:
705           type: Node
706           name: yardstick
707           nfvi_type: baremetal
708           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
709         """
710         scenario_options = scenario_cfg.get('options', {})
711         generic_options = scenario_options.get('collectd', {})
712         scenario_node_options = scenario_options.get(self.name, {})\
713             .get('collectd', {})
714         context_node_options = context_cfg.get('nodes', {})\
715             .get(self.name, {}).get('collectd', {})
716
717         options = generic_options
718         self._update_options(options, scenario_node_options)
719         self._update_options(options, context_node_options)
720
721         self.setup_helper.collectd_options = options
722
723     def _update_options(self, options, additional_options):
724         """Update collectd options and plugins dictionary"""
725         for k, v in additional_options.items():
726             if isinstance(v, dict) and k in options:
727                 options[k].update(v)
728             else:
729                 options[k] = v
730
731     def wait_for_instantiate(self):
732         buf = []
733         time.sleep(self.WAIT_TIME)  # Give some time for config to load
734         while True:
735             if not self._vnf_process.is_alive():
736                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
737
738             # NOTE(esm): move to QueueFileWrapper
739             while self.q_out.qsize() > 0:
740                 buf.append(self.q_out.get())
741                 message = ''.join(buf)
742                 if self.VNF_PROMPT in message:
743                     LOG.info("%s VNF is up and running.", self.APP_NAME)
744                     self._vnf_up_post()
745                     self.queue_wrapper.clear()
746                     return self._vnf_process.exitcode
747
748                 if "PANIC" in message:
749                     raise RuntimeError("Error starting %s VNF." %
750                                        self.APP_NAME)
751
752             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
753             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
754             # Send ENTER to display a new prompt in case the prompt text was corrupted
755             # by other VNF output
756             self.q_in.put('\r\n')
757
758     def start_collect(self):
759         self.resource_helper.start_collect()
760
761     def stop_collect(self):
762         self.resource_helper.stop_collect()
763
764     def _build_run_kwargs(self):
765         self.run_kwargs = {
766             'stdin': self.queue_wrapper,
767             'stdout': self.queue_wrapper,
768             'keep_stdin_open': True,
769             'pty': True,
770             'timeout': self.scenario_helper.timeout,
771         }
772
773     def _build_config(self):
774         return self.setup_helper.build_config()
775
776     def _run(self):
777         # we can't share ssh paramiko objects to force new connection
778         self.ssh_helper.drop_connection()
779         cmd = self._build_config()
780         # kill before starting
781         self.setup_helper.kill_vnf()
782
783         LOG.debug(cmd)
784         self._build_run_kwargs()
785         self.ssh_helper.run(cmd, **self.run_kwargs)
786
787     def vnf_execute(self, cmd, wait_time=2):
788         """ send cmd to vnf process """
789
790         LOG.info("%s command: %s", self.APP_NAME, cmd)
791         self.q_in.put("{}\r\n".format(cmd))
792         time.sleep(wait_time)
793         output = []
794         while self.q_out.qsize() > 0:
795             output.append(self.q_out.get())
796         return "".join(output)
797
798     def _tear_down(self):
799         pass
800
801     def terminate(self):
802         self.vnf_execute("quit")
803         self.setup_helper.kill_vnf()
804         self._tear_down()
805         self.resource_helper.stop_collect()
806         if self._vnf_process is not None:
807             # be proper and join first before we kill
808             LOG.debug("joining before terminate %s", self._vnf_process.name)
809             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
810             self._vnf_process.terminate()
811         # no terminate children here because we share processes with tg
812
813     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
814         """Method for checking the statistics
815
816         This method could be overridden in children classes.
817
818         :return: VNF statistics
819         """
820         cmd = 'p {0} stats'.format(self.APP_WORD)
821         out = self.vnf_execute(cmd)
822         return out
823
824     def collect_kpi(self):
825         # we can't get KPIs if the VNF is down
826         check_if_process_failed(self._vnf_process, 0.01)
827         stats = self.get_stats()
828         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
829         physical_node = Context.get_physical_node_from_server(
830             self.scenario_helper.nodes[self.name])
831
832         result = {"physical_node": physical_node}
833         if m:
834             result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
835             result["collect_stats"] = self.resource_helper.collect_kpi()
836         else:
837             result.update({"packets_in": 0,
838                            "packets_fwd": 0,
839                            "packets_dropped": 0})
840
841         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
842         return result
843
844     def scale(self, flavor=""):
845         """The SampleVNF base class doesn't provide the 'scale' feature"""
846         raise y_exceptions.FunctionNotImplemented(
847             function_name='scale', class_name='SampleVNFTrafficGen')
848
849
850 class SampleVNFTrafficGen(GenericTrafficGen):
851     """ Class providing file-like API for generic traffic generator """
852
853     APP_NAME = 'Sample'
854     RUN_WAIT = 1
855
856     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
857         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
858         self.bin_path = get_nsb_option('bin_path', '')
859
860         self.scenario_helper = ScenarioHelper(self.name)
861         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
862
863         if setup_env_helper_type is None:
864             setup_env_helper_type = SetupEnvHelper
865
866         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
867                                                   self.ssh_helper,
868                                                   self.scenario_helper)
869
870         if resource_helper_type is None:
871             resource_helper_type = ClientResourceHelper
872
873         self.resource_helper = resource_helper_type(self.setup_helper)
874
875         self.runs_traffic = True
876         self.traffic_finished = False
877         self._tg_process = None
878         self._traffic_process = None
879
880     def _start_server(self):
881         # we can't share ssh paramiko objects to force new connection
882         self.ssh_helper.drop_connection()
883
884     def instantiate(self, scenario_cfg, context_cfg):
885         self.scenario_helper.scenario_cfg = scenario_cfg
886         self.resource_helper.update_from_context(
887             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
888             self.scenario_helper.nodes[self.name]
889         )
890
891         self.resource_helper.setup()
892         # must generate_cfg after DPDK bind because we need port number
893         self.resource_helper.generate_cfg()
894
895         LOG.info("Starting %s server...", self.APP_NAME)
896         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
897         self._tg_process = Process(name=name, target=self._start_server)
898         self._tg_process.start()
899
900     def _check_status(self):
901         raise NotImplementedError
902
903     def _wait_for_process(self):
904         while True:
905             if not self._tg_process.is_alive():
906                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
907             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
908             time.sleep(1)
909             status = self._check_status()
910             if status == 0:
911                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
912                 return self._tg_process.exitcode
913
914     def _traffic_runner(self, traffic_profile):
915         # always drop connections first thing in new processes
916         # so we don't get paramiko errors
917         self.ssh_helper.drop_connection()
918         LOG.info("Starting %s client...", self.APP_NAME)
919         self.resource_helper.run_traffic(traffic_profile)
920
921     def run_traffic(self, traffic_profile):
922         """ Generate traffic on the wire according to the given params.
923         Method is non-blocking, returns immediately when traffic process
924         is running. Mandatory.
925
926         :param traffic_profile:
927         :return: True/False
928         """
929         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
930                                     os.getpid())
931         self._traffic_process = Process(name=name, target=self._traffic_runner,
932                                         args=(traffic_profile,))
933         self._traffic_process.start()
934         # Wait for traffic process to start
935         while self.resource_helper.client_started.value == 0:
936             time.sleep(self.RUN_WAIT)
937             # what if traffic process takes a few seconds to start?
938             if not self._traffic_process.is_alive():
939                 break
940
941         return self._traffic_process.is_alive()
942
943     def collect_kpi(self):
944         # check if the tg processes have exited
945         physical_node = Context.get_physical_node_from_server(
946             self.scenario_helper.nodes[self.name])
947
948         result = {"physical_node": physical_node}
949         for proc in (self._tg_process, self._traffic_process):
950             check_if_process_failed(proc)
951
952         result["collect_stats"] = self.resource_helper.collect_kpi()
953         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
954         return result
955
956     def terminate(self):
957         """ After this method finishes, all traffic processes should stop. Mandatory.
958
959         :return: True/False
960         """
961         self.traffic_finished = True
962         # we must kill client before we kill the server, or the client will raise exception
963         if self._traffic_process is not None:
964             # be proper and try to join before terminating
965             LOG.debug("joining before terminate %s", self._traffic_process.name)
966             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
967             self._traffic_process.terminate()
968         if self._tg_process is not None:
969             # be proper and try to join before terminating
970             LOG.debug("joining before terminate %s", self._tg_process.name)
971             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
972             self._tg_process.terminate()
973         # no terminate children here because we share processes with vnf
974
975     def scale(self, flavor=""):
976         """A traffic generator VFN doesn't provide the 'scale' feature"""
977         raise y_exceptions.FunctionNotImplemented(
978             function_name='scale', class_name='SampleVNFTrafficGen')