Merge "Cleanup EnvCommand test cases"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 from multiprocessing import Queue, Value, Process
17
18 import os
19 import posixpath
20 import re
21 import six
22 import subprocess
23 import time
24
25 from trex_stl_lib.trex_stl_client import LoggerApi
26 from trex_stl_lib.trex_stl_client import STLClient
27 from trex_stl_lib.trex_stl_exceptions import STLError
28 from yardstick.benchmark.contexts.base import Context
29 from yardstick.common import exceptions as y_exceptions
30 from yardstick.common.process import check_if_process_failed
31 from yardstick.common import utils
32 from yardstick.network_services import constants
33 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
34 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
35 from yardstick.network_services.nfvi.resource import ResourceProfile
36 from yardstick.network_services.utils import get_nsb_option
37 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
40 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
41
42
43 LOG = logging.getLogger(__name__)
44
45
46 class SetupEnvHelper(object):
47
48     CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
49     CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
50     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
51     PIPELINE_COMMAND = ''
52     VNF_TYPE = "SAMPLE"
53
54     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
55         super(SetupEnvHelper, self).__init__()
56         self.vnfd_helper = vnfd_helper
57         self.ssh_helper = ssh_helper
58         self.scenario_helper = scenario_helper
59         self.collectd_options = {}
60
61     def build_config(self):
62         raise NotImplementedError
63
64     def setup_vnf_environment(self):
65         pass
66
67     def kill_vnf(self):
68         pass
69
70     def tear_down(self):
71         raise NotImplementedError
72
73
74 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
75
76     APP_NAME = 'DpdkVnf'
77     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
78     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
79
80     @staticmethod
81     def _update_packet_type(ip_pipeline_cfg, traffic_options):
82         match_str = 'pkt_type = ipv4'
83         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
84         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
85         return pipeline_config_str
86
87     @classmethod
88     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
89         traffic_type = traffic_options['traffic_type']
90
91         if traffic_options['vnf_type'] is not cls.APP_NAME:
92             match_str = 'traffic_type = 4'
93             replace_str = 'traffic_type = {0}'.format(traffic_type)
94
95         elif traffic_type == 4:
96             match_str = 'pkt_type = ipv4'
97             replace_str = 'pkt_type = ipv4'
98
99         else:
100             match_str = 'pkt_type = ipv4'
101             replace_str = 'pkt_type = ipv6'
102
103         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
104         return pipeline_config_str
105
106     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
107         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
108         self.all_ports = None
109         self.bound_pci = None
110         self.socket = None
111         self.used_drivers = None
112         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
113
114     def _setup_hugepages(self):
115         meminfo = utils.read_meminfo(self.ssh_helper)
116         hp_size_kb = int(meminfo['Hugepagesize'])
117         hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
118         nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
119         self.ssh_helper.execute('echo %s | sudo tee %s' %
120                                 (nr_hugepages, self.NR_HUGEPAGES_PATH))
121         hp = six.BytesIO()
122         self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
123         nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
124         LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
125                  hp_size_kb, nr_hugepages, nr_hugepages_set)
126
127     def build_config(self):
128         vnf_cfg = self.scenario_helper.vnf_cfg
129         task_path = self.scenario_helper.task_path
130
131         config_file = vnf_cfg.get('file')
132         lb_count = vnf_cfg.get('lb_count', 3)
133         lb_config = vnf_cfg.get('lb_config', 'SW')
134         worker_config = vnf_cfg.get('worker_config', '1C/1T')
135         worker_threads = vnf_cfg.get('worker_threads', 3)
136
137         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
138         traffic_options = {
139             'traffic_type': traffic_type,
140             'pkt_type': 'ipv%s' % traffic_type,
141             'vnf_type': self.VNF_TYPE,
142         }
143
144         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
145                                                   task_path)
146         config_basename = posixpath.basename(self.CFG_CONFIG)
147         script_basename = posixpath.basename(self.CFG_SCRIPT)
148         multiport = MultiPortConfig(self.scenario_helper.topology,
149                                     config_tpl_cfg,
150                                     config_basename,
151                                     self.vnfd_helper,
152                                     self.VNF_TYPE,
153                                     lb_count,
154                                     worker_threads,
155                                     worker_config,
156                                     lb_config,
157                                     self.socket)
158
159         multiport.generate_config()
160         if config_file:
161             with utils.open_relative_file(config_file, task_path) as infile:
162                 new_config = ['[EAL]']
163                 vpci = []
164                 for port in self.vnfd_helper.port_pairs.all_ports:
165                     interface = self.vnfd_helper.find_interface(name=port)
166                     vpci.append(interface['virtual-interface']["vpci"])
167                 new_config.extend('w = {0}'.format(item) for item in vpci)
168                 new_config = '\n'.join(new_config) + '\n' + infile.read()
169         else:
170             with open(self.CFG_CONFIG) as handle:
171                 new_config = handle.read()
172             new_config = self._update_traffic_type(new_config, traffic_options)
173             new_config = self._update_packet_type(new_config, traffic_options)
174         self.ssh_helper.upload_config_file(config_basename, new_config)
175         self.ssh_helper.upload_config_file(script_basename,
176                                            multiport.generate_script(self.vnfd_helper))
177
178         LOG.info("Provision and start the %s", self.APP_NAME)
179         self._build_pipeline_kwargs()
180         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
181
182     def _build_pipeline_kwargs(self):
183         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
184         # count the number of actual ports in the list of pairs
185         # remove duplicate ports
186         # this is really a mapping from LINK ID to DPDK PMD ID
187         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
188         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
189         ports = self.vnfd_helper.port_pairs.all_ports
190         port_nums = self.vnfd_helper.port_nums(ports)
191         # create mask from all the dpdk port numbers
192         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
193
194         vnf_cfg = self.scenario_helper.vnf_cfg
195         lb_config = vnf_cfg.get('lb_config', 'SW')
196         worker_threads = vnf_cfg.get('worker_threads', 3)
197         hwlb = ''
198         if lb_config == 'HW':
199             hwlb = ' --hwlb %s' % worker_threads
200
201         self.pipeline_kwargs = {
202             'cfg_file': self.CFG_CONFIG,
203             'script': self.CFG_SCRIPT,
204             'port_mask_hex': ports_mask_hex,
205             'tool_path': tool_path,
206             'hwlb': hwlb,
207         }
208
209     def setup_vnf_environment(self):
210         self._setup_dpdk()
211         self.kill_vnf()
212         # bind before _setup_resources so we can use dpdk_port_num
213         self._detect_and_bind_drivers()
214         resource = self._setup_resources()
215         return resource
216
217     def kill_vnf(self):
218         # pkill is not matching, debug with pgrep
219         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
220         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
221         # have to use exact match
222         # try using killall to match
223         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
224
225     def _setup_dpdk(self):
226         """Setup DPDK environment needed for VNF to run"""
227         self._setup_hugepages()
228         self.dpdk_bind_helper.load_dpdk_driver()
229
230         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
231         if exit_status == 0:
232             return
233
234     def _setup_resources(self):
235         # what is this magic?  how do we know which socket is for which port?
236         # what about quad-socket?
237         if any(v[5] == "0" for v in self.bound_pci):
238             self.socket = 0
239         else:
240             self.socket = 1
241
242         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
243         # this won't work because we don't have DPDK port numbers yet
244         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
245         port_names = (intf["name"] for intf in ports)
246         plugins = self.collectd_options.get("plugins", {})
247         interval = self.collectd_options.get("interval")
248         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
249         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
250                                plugins=plugins, interval=interval,
251                                timeout=self.scenario_helper.timeout)
252
253     def _check_interface_fields(self):
254         num_nodes = len(self.scenario_helper.nodes)
255         # OpenStack instance creation time is probably proportional to the number
256         # of instances
257         timeout = 120 * num_nodes
258         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
259                              self.ssh_helper, timeout)
260         dpdk_node.check()
261
262     def _detect_and_bind_drivers(self):
263         interfaces = self.vnfd_helper.interfaces
264
265         self._check_interface_fields()
266         # check for bound after probe
267         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
268
269         self.dpdk_bind_helper.read_status()
270         self.dpdk_bind_helper.save_used_drivers()
271
272         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
273
274         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
275         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
276             try:
277                 intf = next(v for v in interfaces
278                             if vpci == v['virtual-interface']['vpci'])
279                 # force to int
280                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
281             except:  # pylint: disable=bare-except
282                 pass
283         time.sleep(2)
284
285     def get_local_iface_name_by_vpci(self, vpci):
286         find_net_cmd = self.FIND_NET_CMD.format(vpci)
287         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
288         if exit_status == 0:
289             return stdout
290         return None
291
292     def tear_down(self):
293         self.dpdk_bind_helper.rebind_drivers()
294
295
296 class ResourceHelper(object):
297
298     COLLECT_KPI = ''
299     MAKE_INSTALL = 'cd {0} && make && sudo make install'
300     RESOURCE_WORD = 'sample'
301
302     COLLECT_MAP = {}
303
304     def __init__(self, setup_helper):
305         super(ResourceHelper, self).__init__()
306         self.resource = None
307         self.setup_helper = setup_helper
308         self.ssh_helper = setup_helper.ssh_helper
309
310     def setup(self):
311         self.resource = self.setup_helper.setup_vnf_environment()
312
313     def generate_cfg(self):
314         pass
315
316     def _collect_resource_kpi(self):
317         result = {}
318         status = self.resource.check_if_system_agent_running("collectd")[0]
319         if status == 0:
320             result = self.resource.amqp_collect_nfvi_kpi()
321
322         result = {"core": result}
323         return result
324
325     def start_collect(self):
326         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
327         self.resource.start()
328         self.resource.amqp_process_for_nfvi_kpi()
329
330     def stop_collect(self):
331         if self.resource:
332             self.resource.stop()
333
334     def collect_kpi(self):
335         return self._collect_resource_kpi()
336
337
338 class ClientResourceHelper(ResourceHelper):
339
340     RUN_DURATION = 60
341     QUEUE_WAIT_TIME = 5
342     SYNC_PORT = 1
343     ASYNC_PORT = 2
344
345     def __init__(self, setup_helper):
346         super(ClientResourceHelper, self).__init__(setup_helper)
347         self.vnfd_helper = setup_helper.vnfd_helper
348         self.scenario_helper = setup_helper.scenario_helper
349
350         self.client = None
351         self.client_started = Value('i', 0)
352         self.all_ports = None
353         self._queue = Queue()
354         self._result = {}
355         self._terminated = Value('i', 0)
356
357     def _build_ports(self):
358         self.networks = self.vnfd_helper.port_pairs.networks
359         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
360         self.downlink_ports = \
361             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
362         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
363
364     def port_num(self, intf):
365         # by default return port num
366         return self.vnfd_helper.port_num(intf)
367
368     def get_stats(self, *args, **kwargs):
369         try:
370             return self.client.get_stats(*args, **kwargs)
371         except STLError:
372             LOG.error('TRex client not connected')
373             return {}
374
375     def _get_samples(self, ports, port_pg_id=False):
376         raise NotImplementedError()
377
378     def _run_traffic_once(self, traffic_profile):
379         traffic_profile.execute_traffic(self)
380         self.client_started.value = 1
381         time.sleep(self.RUN_DURATION)
382         samples = self._get_samples(traffic_profile.ports)
383         time.sleep(self.QUEUE_WAIT_TIME)
384         self._queue.put(samples)
385
386     def run_traffic(self, traffic_profile):
387         # if we don't do this we can hang waiting for the queue to drain
388         # have to do this in the subprocess
389         self._queue.cancel_join_thread()
390         # fixme: fix passing correct trex config file,
391         # instead of searching the default path
392         try:
393             self._build_ports()
394             self.client = self._connect()
395             self.client.reset(ports=self.all_ports)
396             self.client.remove_all_streams(self.all_ports)  # remove all streams
397             traffic_profile.register_generator(self)
398
399             while self._terminated.value == 0:
400                 self._run_traffic_once(traffic_profile)
401
402             self.client.stop(self.all_ports)
403             self.client.disconnect()
404             self._terminated.value = 0
405         except STLError:
406             if self._terminated.value:
407                 LOG.debug("traffic generator is stopped")
408                 return  # return if trex/tg server is stopped.
409             raise
410
411     def terminate(self):
412         self._terminated.value = 1  # stop client
413
414     def clear_stats(self, ports=None):
415         if ports is None:
416             ports = self.all_ports
417         self.client.clear_stats(ports=ports)
418
419     def start(self, ports=None, *args, **kwargs):
420         # pylint: disable=keyword-arg-before-vararg
421         # NOTE(ralonsoh): defining keyworded arguments before variable
422         # positional arguments is a bug. This function definition doesn't work
423         # in Python 2, although it works in Python 3. Reference:
424         # https://www.python.org/dev/peps/pep-3102/
425         if ports is None:
426             ports = self.all_ports
427         self.client.start(ports=ports, *args, **kwargs)
428
429     def collect_kpi(self):
430         if not self._queue.empty():
431             kpi = self._queue.get()
432             self._result.update(kpi)
433             LOG.debug('Got KPIs from _queue for %s %s',
434                       self.scenario_helper.name, self.RESOURCE_WORD)
435         return self._result
436
437     def _connect(self, client=None):
438         if client is None:
439             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
440                                server=self.vnfd_helper.mgmt_interface["ip"],
441                                verbose_level=LoggerApi.VERBOSE_QUIET)
442
443         # try to connect with 5s intervals, 30s max
444         for idx in range(6):
445             try:
446                 client.connect()
447                 break
448             except STLError:
449                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
450                 time.sleep(5)
451         return client
452
453
454 class Rfc2544ResourceHelper(object):
455
456     DEFAULT_CORRELATED_TRAFFIC = False
457     DEFAULT_LATENCY = False
458     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
459
460     def __init__(self, scenario_helper):
461         super(Rfc2544ResourceHelper, self).__init__()
462         self.scenario_helper = scenario_helper
463         self._correlated_traffic = None
464         self.iteration = Value('i', 0)
465         self._latency = None
466         self._rfc2544 = None
467         self._tolerance_low = None
468         self._tolerance_high = None
469
470     @property
471     def rfc2544(self):
472         if self._rfc2544 is None:
473             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
474         return self._rfc2544
475
476     @property
477     def tolerance_low(self):
478         if self._tolerance_low is None:
479             self.get_rfc_tolerance()
480         return self._tolerance_low
481
482     @property
483     def tolerance_high(self):
484         if self._tolerance_high is None:
485             self.get_rfc_tolerance()
486         return self._tolerance_high
487
488     @property
489     def correlated_traffic(self):
490         if self._correlated_traffic is None:
491             self._correlated_traffic = \
492                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
493
494         return self._correlated_traffic
495
496     @property
497     def latency(self):
498         if self._latency is None:
499             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
500         return self._latency
501
502     def get_rfc2544(self, name, default=None):
503         return self.rfc2544.get(name, default)
504
505     def get_rfc_tolerance(self):
506         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
507         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
508         self._tolerance_low = next(tolerance_iter)
509         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
510
511
512 class SampleVNFDeployHelper(object):
513
514     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
515     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
516     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
517
518     def __init__(self, vnfd_helper, ssh_helper):
519         super(SampleVNFDeployHelper, self).__init__()
520         self.ssh_helper = ssh_helper
521         self.vnfd_helper = vnfd_helper
522
523     def deploy_vnfs(self, app_name):
524         vnf_bin = self.ssh_helper.join_bin_path(app_name)
525         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
526         if not exit_status:
527             return
528
529         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
530         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
531         time.sleep(2)
532         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
533         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
534
535         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
536         time.sleep(2)
537         http_proxy = os.environ.get('http_proxy', '')
538         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
539         LOG.debug(cmd)
540         self.ssh_helper.execute(cmd)
541         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
542         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
543         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
544
545
546 class ScenarioHelper(object):
547
548     DEFAULT_VNF_CFG = {
549         'lb_config': 'SW',
550         'lb_count': 1,
551         'worker_config': '1C/1T',
552         'worker_threads': 1,
553     }
554
555     def __init__(self, name):
556         self.name = name
557         self.scenario_cfg = None
558
559     @property
560     def task_path(self):
561         return self.scenario_cfg['task_path']
562
563     @property
564     def nodes(self):
565         return self.scenario_cfg.get('nodes')
566
567     @property
568     def all_options(self):
569         return self.scenario_cfg.get('options', {})
570
571     @property
572     def options(self):
573         return self.all_options.get(self.name, {})
574
575     @property
576     def vnf_cfg(self):
577         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
578
579     @property
580     def topology(self):
581         return self.scenario_cfg['topology']
582
583     @property
584     def timeout(self):
585         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
586             self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
587         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
588         return test_duration if test_duration > test_timeout else test_timeout
589
590 class SampleVNF(GenericVNF):
591     """ Class providing file-like API for generic VNF implementation """
592
593     VNF_PROMPT = "pipeline>"
594     WAIT_TIME = 1
595     WAIT_TIME_FOR_SCRIPT = 10
596     APP_NAME = "SampleVNF"
597     # we run the VNF interactively, so the ssh command will timeout after this long
598
599     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
600         super(SampleVNF, self).__init__(name, vnfd)
601         self.bin_path = get_nsb_option('bin_path', '')
602
603         self.scenario_helper = ScenarioHelper(self.name)
604         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
605
606         if setup_env_helper_type is None:
607             setup_env_helper_type = SetupEnvHelper
608
609         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
610                                                   self.ssh_helper,
611                                                   self.scenario_helper)
612
613         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
614
615         if resource_helper_type is None:
616             resource_helper_type = ResourceHelper
617
618         self.resource_helper = resource_helper_type(self.setup_helper)
619
620         self.context_cfg = None
621         self.nfvi_context = None
622         self.pipeline_kwargs = {}
623         self.uplink_ports = None
624         self.downlink_ports = None
625         # NOTE(esm): make QueueFileWrapper invert-able so that we
626         #            never have to manage the queues
627         self.q_in = Queue()
628         self.q_out = Queue()
629         self.queue_wrapper = None
630         self.run_kwargs = {}
631         self.used_drivers = {}
632         self.vnf_port_pairs = None
633         self._vnf_process = None
634
635     def _start_vnf(self):
636         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
637         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
638         self._vnf_process = Process(name=name, target=self._run)
639         self._vnf_process.start()
640
641     def _vnf_up_post(self):
642         pass
643
644     def instantiate(self, scenario_cfg, context_cfg):
645         self._update_collectd_options(scenario_cfg, context_cfg)
646         self.scenario_helper.scenario_cfg = scenario_cfg
647         self.context_cfg = context_cfg
648         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
649         # self.nfvi_context = None
650
651         # vnf deploy is unsupported, use ansible playbooks
652         if self.scenario_helper.options.get("vnf_deploy", False):
653             self.deploy_helper.deploy_vnfs(self.APP_NAME)
654         self.resource_helper.setup()
655         self._start_vnf()
656
657     def _update_collectd_options(self, scenario_cfg, context_cfg):
658         """Update collectd configuration options
659         This function retrieves all collectd options contained in the test case
660
661         definition builds a single dictionary combining them. The following fragment
662         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
663         ---
664         schema: yardstick:task:0.1
665         scenarios:
666         - type: NSPerf
667           nodes:
668             tg__0: trafficgen_1.yardstick
669             vnf__0: vnf.yardstick
670           options:
671             collectd:
672               <options>  # COLLECTD priority 3
673             vnf__0:
674               collectd:
675                 plugins:
676                     load
677                 <options> # COLLECTD priority 2
678         context:
679           type: Node
680           name: yardstick
681           nfvi_type: baremetal
682           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
683         """
684         scenario_options = scenario_cfg.get('options', {})
685         generic_options = scenario_options.get('collectd', {})
686         scenario_node_options = scenario_options.get(self.name, {})\
687             .get('collectd', {})
688         context_node_options = context_cfg.get('nodes', {})\
689             .get(self.name, {}).get('collectd', {})
690
691         options = generic_options
692         self._update_options(options, scenario_node_options)
693         self._update_options(options, context_node_options)
694
695         self.setup_helper.collectd_options = options
696
697     def _update_options(self, options, additional_options):
698         """Update collectd options and plugins dictionary"""
699         for k, v in additional_options.items():
700             if isinstance(v, dict) and k in options:
701                 options[k].update(v)
702             else:
703                 options[k] = v
704
705     def wait_for_instantiate(self):
706         buf = []
707         time.sleep(self.WAIT_TIME)  # Give some time for config to load
708         while True:
709             if not self._vnf_process.is_alive():
710                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
711
712             # NOTE(esm): move to QueueFileWrapper
713             while self.q_out.qsize() > 0:
714                 buf.append(self.q_out.get())
715                 message = ''.join(buf)
716                 if self.VNF_PROMPT in message:
717                     LOG.info("%s VNF is up and running.", self.APP_NAME)
718                     self._vnf_up_post()
719                     self.queue_wrapper.clear()
720                     return self._vnf_process.exitcode
721
722                 if "PANIC" in message:
723                     raise RuntimeError("Error starting %s VNF." %
724                                        self.APP_NAME)
725
726             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
727             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
728             # Send ENTER to display a new prompt in case the prompt text was corrupted
729             # by other VNF output
730             self.q_in.put('\r\n')
731
732     def start_collect(self):
733         self.resource_helper.start_collect()
734
735     def stop_collect(self):
736         self.resource_helper.stop_collect()
737
738     def _build_run_kwargs(self):
739         self.run_kwargs = {
740             'stdin': self.queue_wrapper,
741             'stdout': self.queue_wrapper,
742             'keep_stdin_open': True,
743             'pty': True,
744             'timeout': self.scenario_helper.timeout,
745         }
746
747     def _build_config(self):
748         return self.setup_helper.build_config()
749
750     def _run(self):
751         # we can't share ssh paramiko objects to force new connection
752         self.ssh_helper.drop_connection()
753         cmd = self._build_config()
754         # kill before starting
755         self.setup_helper.kill_vnf()
756
757         LOG.debug(cmd)
758         self._build_run_kwargs()
759         self.ssh_helper.run(cmd, **self.run_kwargs)
760
761     def vnf_execute(self, cmd, wait_time=2):
762         """ send cmd to vnf process """
763
764         LOG.info("%s command: %s", self.APP_NAME, cmd)
765         self.q_in.put("{}\r\n".format(cmd))
766         time.sleep(wait_time)
767         output = []
768         while self.q_out.qsize() > 0:
769             output.append(self.q_out.get())
770         return "".join(output)
771
772     def _tear_down(self):
773         pass
774
775     def terminate(self):
776         self.vnf_execute("quit")
777         self.setup_helper.kill_vnf()
778         self._tear_down()
779         self.resource_helper.stop_collect()
780         if self._vnf_process is not None:
781             # be proper and join first before we kill
782             LOG.debug("joining before terminate %s", self._vnf_process.name)
783             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
784             self._vnf_process.terminate()
785         # no terminate children here because we share processes with tg
786
787     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
788         """Method for checking the statistics
789
790         This method could be overridden in children classes.
791
792         :return: VNF statistics
793         """
794         cmd = 'p {0} stats'.format(self.APP_WORD)
795         out = self.vnf_execute(cmd)
796         return out
797
798     def collect_kpi(self):
799         # we can't get KPIs if the VNF is down
800         check_if_process_failed(self._vnf_process, 0.01)
801         stats = self.get_stats()
802         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
803         if m:
804             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
805             result["collect_stats"] = self.resource_helper.collect_kpi()
806         else:
807             result = {
808                 "packets_in": 0,
809                 "packets_fwd": 0,
810                 "packets_dropped": 0,
811             }
812         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
813         return result
814
815     def scale(self, flavor=""):
816         """The SampleVNF base class doesn't provide the 'scale' feature"""
817         raise y_exceptions.FunctionNotImplemented(
818             function_name='scale', class_name='SampleVNFTrafficGen')
819
820
821 class SampleVNFTrafficGen(GenericTrafficGen):
822     """ Class providing file-like API for generic traffic generator """
823
824     APP_NAME = 'Sample'
825     RUN_WAIT = 1
826
827     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
828         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
829         self.bin_path = get_nsb_option('bin_path', '')
830
831         self.scenario_helper = ScenarioHelper(self.name)
832         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
833
834         if setup_env_helper_type is None:
835             setup_env_helper_type = SetupEnvHelper
836
837         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
838                                                   self.ssh_helper,
839                                                   self.scenario_helper)
840
841         if resource_helper_type is None:
842             resource_helper_type = ClientResourceHelper
843
844         self.resource_helper = resource_helper_type(self.setup_helper)
845
846         self.runs_traffic = True
847         self.traffic_finished = False
848         self._tg_process = None
849         self._traffic_process = None
850
851     def _start_server(self):
852         # we can't share ssh paramiko objects to force new connection
853         self.ssh_helper.drop_connection()
854
855     def instantiate(self, scenario_cfg, context_cfg):
856         self.scenario_helper.scenario_cfg = scenario_cfg
857         self.resource_helper.setup()
858         # must generate_cfg after DPDK bind because we need port number
859         self.resource_helper.generate_cfg()
860
861         LOG.info("Starting %s server...", self.APP_NAME)
862         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
863         self._tg_process = Process(name=name, target=self._start_server)
864         self._tg_process.start()
865
866     def _check_status(self):
867         raise NotImplementedError
868
869     def _wait_for_process(self):
870         while True:
871             if not self._tg_process.is_alive():
872                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
873             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
874             time.sleep(1)
875             status = self._check_status()
876             if status == 0:
877                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
878                 return self._tg_process.exitcode
879
880     def _traffic_runner(self, traffic_profile):
881         # always drop connections first thing in new processes
882         # so we don't get paramiko errors
883         self.ssh_helper.drop_connection()
884         LOG.info("Starting %s client...", self.APP_NAME)
885         self.resource_helper.run_traffic(traffic_profile)
886
887     def run_traffic(self, traffic_profile):
888         """ Generate traffic on the wire according to the given params.
889         Method is non-blocking, returns immediately when traffic process
890         is running. Mandatory.
891
892         :param traffic_profile:
893         :return: True/False
894         """
895         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
896                                     os.getpid())
897         self._traffic_process = Process(name=name, target=self._traffic_runner,
898                                         args=(traffic_profile,))
899         self._traffic_process.start()
900         # Wait for traffic process to start
901         while self.resource_helper.client_started.value == 0:
902             time.sleep(self.RUN_WAIT)
903             # what if traffic process takes a few seconds to start?
904             if not self._traffic_process.is_alive():
905                 break
906
907         return self._traffic_process.is_alive()
908
909     def collect_kpi(self):
910         # check if the tg processes have exited
911         for proc in (self._tg_process, self._traffic_process):
912             check_if_process_failed(proc)
913         result = self.resource_helper.collect_kpi()
914         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
915         return result
916
917     def terminate(self):
918         """ After this method finishes, all traffic processes should stop. Mandatory.
919
920         :return: True/False
921         """
922         self.traffic_finished = True
923         # we must kill client before we kill the server, or the client will raise exception
924         if self._traffic_process is not None:
925             # be proper and try to join before terminating
926             LOG.debug("joining before terminate %s", self._traffic_process.name)
927             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
928             self._traffic_process.terminate()
929         if self._tg_process is not None:
930             # be proper and try to join before terminating
931             LOG.debug("joining before terminate %s", self._tg_process.name)
932             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
933             self._tg_process.terminate()
934         # no terminate children here because we share processes with vnf
935
936     def scale(self, flavor=""):
937         """A traffic generator VFN doesn't provide the 'scale' feature"""
938         raise y_exceptions.FunctionNotImplemented(
939             function_name='scale', class_name='SampleVNFTrafficGen')