Merge "Add NSB PROX Baremetal dashboard for L3"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 from collections import Mapping
17 import logging
18 from multiprocessing import Queue, Value, Process
19
20 import os
21 import posixpath
22 import re
23 import subprocess
24 import time
25
26 import six
27
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.common import exceptions as y_exceptions
33 from yardstick.common.process import check_if_process_failed
34 from yardstick.common import utils
35 from yardstick.network_services import constants
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
37 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
38 from yardstick.network_services.nfvi.resource import ResourceProfile
39 from yardstick.network_services.utils import get_nsb_option
40 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
41 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
42 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
43 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
44
45
46 LOG = logging.getLogger(__name__)
47
48
49 class SetupEnvHelper(object):
50
51     CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
52     CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
53     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
54     PIPELINE_COMMAND = ''
55     VNF_TYPE = "SAMPLE"
56
57     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
58         super(SetupEnvHelper, self).__init__()
59         self.vnfd_helper = vnfd_helper
60         self.ssh_helper = ssh_helper
61         self.scenario_helper = scenario_helper
62         self.collectd_options = {}
63
64     def build_config(self):
65         raise NotImplementedError
66
67     def setup_vnf_environment(self):
68         pass
69
70     def kill_vnf(self):
71         pass
72
73     def tear_down(self):
74         raise NotImplementedError
75
76
77 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
78
79     APP_NAME = 'DpdkVnf'
80     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
81     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
82
83     @staticmethod
84     def _update_packet_type(ip_pipeline_cfg, traffic_options):
85         match_str = 'pkt_type = ipv4'
86         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
87         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
88         return pipeline_config_str
89
90     @classmethod
91     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
92         traffic_type = traffic_options['traffic_type']
93
94         if traffic_options['vnf_type'] is not cls.APP_NAME:
95             match_str = 'traffic_type = 4'
96             replace_str = 'traffic_type = {0}'.format(traffic_type)
97
98         elif traffic_type == 4:
99             match_str = 'pkt_type = ipv4'
100             replace_str = 'pkt_type = ipv4'
101
102         else:
103             match_str = 'pkt_type = ipv4'
104             replace_str = 'pkt_type = ipv6'
105
106         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
107         return pipeline_config_str
108
109     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
110         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
111         self.all_ports = None
112         self.bound_pci = None
113         self.socket = None
114         self.used_drivers = None
115         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
116
117     def _setup_hugepages(self):
118         meminfo = utils.read_meminfo(self.ssh_helper)
119         hp_size_kb = int(meminfo['Hugepagesize'])
120         hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
121         nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
122         self.ssh_helper.execute('echo %s | sudo tee %s' %
123                                 (nr_hugepages, self.NR_HUGEPAGES_PATH))
124         hp = six.BytesIO()
125         self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
126         nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
127         LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
128                  hp_size_kb, nr_hugepages, nr_hugepages_set)
129
130     def build_config(self):
131         vnf_cfg = self.scenario_helper.vnf_cfg
132         task_path = self.scenario_helper.task_path
133
134         config_file = vnf_cfg.get('file')
135         lb_count = vnf_cfg.get('lb_count', 3)
136         lb_config = vnf_cfg.get('lb_config', 'SW')
137         worker_config = vnf_cfg.get('worker_config', '1C/1T')
138         worker_threads = vnf_cfg.get('worker_threads', 3)
139
140         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
141         traffic_options = {
142             'traffic_type': traffic_type,
143             'pkt_type': 'ipv%s' % traffic_type,
144             'vnf_type': self.VNF_TYPE,
145         }
146
147         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
148                                                   task_path)
149         config_basename = posixpath.basename(self.CFG_CONFIG)
150         script_basename = posixpath.basename(self.CFG_SCRIPT)
151         multiport = MultiPortConfig(self.scenario_helper.topology,
152                                     config_tpl_cfg,
153                                     config_basename,
154                                     self.vnfd_helper,
155                                     self.VNF_TYPE,
156                                     lb_count,
157                                     worker_threads,
158                                     worker_config,
159                                     lb_config,
160                                     self.socket)
161
162         multiport.generate_config()
163         if config_file:
164             with utils.open_relative_file(config_file, task_path) as infile:
165                 new_config = ['[EAL]']
166                 vpci = []
167                 for port in self.vnfd_helper.port_pairs.all_ports:
168                     interface = self.vnfd_helper.find_interface(name=port)
169                     vpci.append(interface['virtual-interface']["vpci"])
170                 new_config.extend('w = {0}'.format(item) for item in vpci)
171                 new_config = '\n'.join(new_config) + '\n' + infile.read()
172         else:
173             with open(self.CFG_CONFIG) as handle:
174                 new_config = handle.read()
175             new_config = self._update_traffic_type(new_config, traffic_options)
176             new_config = self._update_packet_type(new_config, traffic_options)
177         self.ssh_helper.upload_config_file(config_basename, new_config)
178         self.ssh_helper.upload_config_file(script_basename,
179                                            multiport.generate_script(self.vnfd_helper))
180
181         LOG.info("Provision and start the %s", self.APP_NAME)
182         self._build_pipeline_kwargs()
183         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
184
185     def _build_pipeline_kwargs(self):
186         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
187         # count the number of actual ports in the list of pairs
188         # remove duplicate ports
189         # this is really a mapping from LINK ID to DPDK PMD ID
190         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
191         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
192         ports = self.vnfd_helper.port_pairs.all_ports
193         port_nums = self.vnfd_helper.port_nums(ports)
194         # create mask from all the dpdk port numbers
195         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
196
197         vnf_cfg = self.scenario_helper.vnf_cfg
198         lb_config = vnf_cfg.get('lb_config', 'SW')
199         worker_threads = vnf_cfg.get('worker_threads', 3)
200         hwlb = ''
201         if lb_config == 'HW':
202             hwlb = ' --hwlb %s' % worker_threads
203
204         self.pipeline_kwargs = {
205             'cfg_file': self.CFG_CONFIG,
206             'script': self.CFG_SCRIPT,
207             'port_mask_hex': ports_mask_hex,
208             'tool_path': tool_path,
209             'hwlb': hwlb,
210         }
211
212     def setup_vnf_environment(self):
213         self._setup_dpdk()
214         self.kill_vnf()
215         # bind before _setup_resources so we can use dpdk_port_num
216         self._detect_and_bind_drivers()
217         resource = self._setup_resources()
218         return resource
219
220     def kill_vnf(self):
221         # pkill is not matching, debug with pgrep
222         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
223         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
224         # have to use exact match
225         # try using killall to match
226         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
227
228     def _setup_dpdk(self):
229         """Setup DPDK environment needed for VNF to run"""
230         self._setup_hugepages()
231         self.dpdk_bind_helper.load_dpdk_driver()
232
233         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
234         if exit_status == 0:
235             return
236
237     def _setup_resources(self):
238         # what is this magic?  how do we know which socket is for which port?
239         # what about quad-socket?
240         if any(v[5] == "0" for v in self.bound_pci):
241             self.socket = 0
242         else:
243             self.socket = 1
244
245         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
246         # this won't work because we don't have DPDK port numbers yet
247         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
248         port_names = (intf["name"] for intf in ports)
249         plugins = self.collectd_options.get("plugins", {})
250         interval = self.collectd_options.get("interval")
251         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
252         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
253                                plugins=plugins, interval=interval,
254                                timeout=self.scenario_helper.timeout)
255
256     def _check_interface_fields(self):
257         num_nodes = len(self.scenario_helper.nodes)
258         # OpenStack instance creation time is probably proportional to the number
259         # of instances
260         timeout = 120 * num_nodes
261         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
262                              self.ssh_helper, timeout)
263         dpdk_node.check()
264
265     def _detect_and_bind_drivers(self):
266         interfaces = self.vnfd_helper.interfaces
267
268         self._check_interface_fields()
269         # check for bound after probe
270         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
271
272         self.dpdk_bind_helper.read_status()
273         self.dpdk_bind_helper.save_used_drivers()
274
275         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
276
277         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
278         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
279             try:
280                 intf = next(v for v in interfaces
281                             if vpci == v['virtual-interface']['vpci'])
282                 # force to int
283                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
284             except:  # pylint: disable=bare-except
285                 pass
286         time.sleep(2)
287
288     def get_local_iface_name_by_vpci(self, vpci):
289         find_net_cmd = self.FIND_NET_CMD.format(vpci)
290         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
291         if exit_status == 0:
292             return stdout
293         return None
294
295     def tear_down(self):
296         self.dpdk_bind_helper.rebind_drivers()
297
298
299 class ResourceHelper(object):
300
301     COLLECT_KPI = ''
302     MAKE_INSTALL = 'cd {0} && make && sudo make install'
303     RESOURCE_WORD = 'sample'
304
305     COLLECT_MAP = {}
306
307     def __init__(self, setup_helper):
308         super(ResourceHelper, self).__init__()
309         self.resource = None
310         self.setup_helper = setup_helper
311         self.ssh_helper = setup_helper.ssh_helper
312
313     def setup(self):
314         self.resource = self.setup_helper.setup_vnf_environment()
315
316     def generate_cfg(self):
317         pass
318
319     def _collect_resource_kpi(self):
320         result = {}
321         status = self.resource.check_if_system_agent_running("collectd")[0]
322         if status == 0:
323             result = self.resource.amqp_collect_nfvi_kpi()
324
325         result = {"core": result}
326         return result
327
328     def start_collect(self):
329         self.resource.initiate_systemagent(self.ssh_helper.bin_path)
330         self.resource.start()
331         self.resource.amqp_process_for_nfvi_kpi()
332
333     def stop_collect(self):
334         if self.resource:
335             self.resource.stop()
336
337     def collect_kpi(self):
338         return self._collect_resource_kpi()
339
340
341 class ClientResourceHelper(ResourceHelper):
342
343     RUN_DURATION = 60
344     QUEUE_WAIT_TIME = 5
345     SYNC_PORT = 1
346     ASYNC_PORT = 2
347
348     def __init__(self, setup_helper):
349         super(ClientResourceHelper, self).__init__(setup_helper)
350         self.vnfd_helper = setup_helper.vnfd_helper
351         self.scenario_helper = setup_helper.scenario_helper
352
353         self.client = None
354         self.client_started = Value('i', 0)
355         self.all_ports = None
356         self._queue = Queue()
357         self._result = {}
358         self._terminated = Value('i', 0)
359
360     def _build_ports(self):
361         self.networks = self.vnfd_helper.port_pairs.networks
362         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
363         self.downlink_ports = \
364             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
365         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
366
367     def port_num(self, intf):
368         # by default return port num
369         return self.vnfd_helper.port_num(intf)
370
371     def get_stats(self, *args, **kwargs):
372         try:
373             return self.client.get_stats(*args, **kwargs)
374         except STLError:
375             LOG.error('TRex client not connected')
376             return {}
377
378     def generate_samples(self, ports, key=None, default=None):
379         # needs to be used ports
380         last_result = self.get_stats(ports)
381         key_value = last_result.get(key, default)
382
383         if not isinstance(last_result, Mapping):  # added for mock unit test
384             self._terminated.value = 1
385             return {}
386
387         samples = {}
388         # recalculate port for interface and see if it matches ports provided
389         for intf in self.vnfd_helper.interfaces:
390             name = intf["name"]
391             port = self.vnfd_helper.port_num(name)
392             if port in ports:
393                 xe_value = last_result.get(port, {})
394                 samples[name] = {
395                     "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
396                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
397                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
398                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
399                     "in_packets": int(xe_value.get("ipackets", 0)),
400                     "out_packets": int(xe_value.get("opackets", 0)),
401                 }
402                 if key:
403                     samples[name][key] = key_value
404         return samples
405
406     def _run_traffic_once(self, traffic_profile):
407         traffic_profile.execute_traffic(self)
408         self.client_started.value = 1
409         time.sleep(self.RUN_DURATION)
410         samples = self.generate_samples(traffic_profile.ports)
411         time.sleep(self.QUEUE_WAIT_TIME)
412         self._queue.put(samples)
413
414     def run_traffic(self, traffic_profile):
415         # if we don't do this we can hang waiting for the queue to drain
416         # have to do this in the subprocess
417         self._queue.cancel_join_thread()
418         # fixme: fix passing correct trex config file,
419         # instead of searching the default path
420         try:
421             self._build_ports()
422             self.client = self._connect()
423             self.client.reset(ports=self.all_ports)
424             self.client.remove_all_streams(self.all_ports)  # remove all streams
425             traffic_profile.register_generator(self)
426
427             while self._terminated.value == 0:
428                 self._run_traffic_once(traffic_profile)
429
430             self.client.stop(self.all_ports)
431             self.client.disconnect()
432             self._terminated.value = 0
433         except STLError:
434             if self._terminated.value:
435                 LOG.debug("traffic generator is stopped")
436                 return  # return if trex/tg server is stopped.
437             raise
438
439     def terminate(self):
440         self._terminated.value = 1  # stop client
441
442     def clear_stats(self, ports=None):
443         if ports is None:
444             ports = self.all_ports
445         self.client.clear_stats(ports=ports)
446
447     def start(self, ports=None, *args, **kwargs):
448         # pylint: disable=keyword-arg-before-vararg
449         # NOTE(ralonsoh): defining keyworded arguments before variable
450         # positional arguments is a bug. This function definition doesn't work
451         # in Python 2, although it works in Python 3. Reference:
452         # https://www.python.org/dev/peps/pep-3102/
453         if ports is None:
454             ports = self.all_ports
455         self.client.start(ports=ports, *args, **kwargs)
456
457     def collect_kpi(self):
458         if not self._queue.empty():
459             kpi = self._queue.get()
460             self._result.update(kpi)
461             LOG.debug('Got KPIs from _queue for %s %s',
462                       self.scenario_helper.name, self.RESOURCE_WORD)
463         return self._result
464
465     def _connect(self, client=None):
466         if client is None:
467             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
468                                server=self.vnfd_helper.mgmt_interface["ip"],
469                                verbose_level=LoggerApi.VERBOSE_QUIET)
470
471         # try to connect with 5s intervals, 30s max
472         for idx in range(6):
473             try:
474                 client.connect()
475                 break
476             except STLError:
477                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
478                 time.sleep(5)
479         return client
480
481
482 class Rfc2544ResourceHelper(object):
483
484     DEFAULT_CORRELATED_TRAFFIC = False
485     DEFAULT_LATENCY = False
486     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
487
488     def __init__(self, scenario_helper):
489         super(Rfc2544ResourceHelper, self).__init__()
490         self.scenario_helper = scenario_helper
491         self._correlated_traffic = None
492         self.iteration = Value('i', 0)
493         self._latency = None
494         self._rfc2544 = None
495         self._tolerance_low = None
496         self._tolerance_high = None
497
498     @property
499     def rfc2544(self):
500         if self._rfc2544 is None:
501             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
502         return self._rfc2544
503
504     @property
505     def tolerance_low(self):
506         if self._tolerance_low is None:
507             self.get_rfc_tolerance()
508         return self._tolerance_low
509
510     @property
511     def tolerance_high(self):
512         if self._tolerance_high is None:
513             self.get_rfc_tolerance()
514         return self._tolerance_high
515
516     @property
517     def correlated_traffic(self):
518         if self._correlated_traffic is None:
519             self._correlated_traffic = \
520                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
521
522         return self._correlated_traffic
523
524     @property
525     def latency(self):
526         if self._latency is None:
527             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
528         return self._latency
529
530     def get_rfc2544(self, name, default=None):
531         return self.rfc2544.get(name, default)
532
533     def get_rfc_tolerance(self):
534         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
535         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
536         self._tolerance_low = next(tolerance_iter)
537         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
538
539
540 class SampleVNFDeployHelper(object):
541
542     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
543     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
544     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
545
546     def __init__(self, vnfd_helper, ssh_helper):
547         super(SampleVNFDeployHelper, self).__init__()
548         self.ssh_helper = ssh_helper
549         self.vnfd_helper = vnfd_helper
550
551     def deploy_vnfs(self, app_name):
552         vnf_bin = self.ssh_helper.join_bin_path(app_name)
553         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
554         if not exit_status:
555             return
556
557         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
558         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
559         time.sleep(2)
560         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
561         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
562
563         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
564         time.sleep(2)
565         http_proxy = os.environ.get('http_proxy', '')
566         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
567         LOG.debug(cmd)
568         self.ssh_helper.execute(cmd)
569         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
570         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
571         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
572
573
574 class ScenarioHelper(object):
575
576     DEFAULT_VNF_CFG = {
577         'lb_config': 'SW',
578         'lb_count': 1,
579         'worker_config': '1C/1T',
580         'worker_threads': 1,
581     }
582
583     def __init__(self, name):
584         self.name = name
585         self.scenario_cfg = None
586
587     @property
588     def task_path(self):
589         return self.scenario_cfg['task_path']
590
591     @property
592     def nodes(self):
593         return self.scenario_cfg.get('nodes')
594
595     @property
596     def all_options(self):
597         return self.scenario_cfg.get('options', {})
598
599     @property
600     def options(self):
601         return self.all_options.get(self.name, {})
602
603     @property
604     def vnf_cfg(self):
605         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
606
607     @property
608     def topology(self):
609         return self.scenario_cfg['topology']
610
611     @property
612     def timeout(self):
613         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
614             self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
615         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
616         return test_duration if test_duration > test_timeout else test_timeout
617
618 class SampleVNF(GenericVNF):
619     """ Class providing file-like API for generic VNF implementation """
620
621     VNF_PROMPT = "pipeline>"
622     WAIT_TIME = 1
623     WAIT_TIME_FOR_SCRIPT = 10
624     APP_NAME = "SampleVNF"
625     # we run the VNF interactively, so the ssh command will timeout after this long
626
627     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
628         super(SampleVNF, self).__init__(name, vnfd)
629         self.bin_path = get_nsb_option('bin_path', '')
630
631         self.scenario_helper = ScenarioHelper(self.name)
632         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
633
634         if setup_env_helper_type is None:
635             setup_env_helper_type = SetupEnvHelper
636
637         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
638                                                   self.ssh_helper,
639                                                   self.scenario_helper)
640
641         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
642
643         if resource_helper_type is None:
644             resource_helper_type = ResourceHelper
645
646         self.resource_helper = resource_helper_type(self.setup_helper)
647
648         self.context_cfg = None
649         self.nfvi_context = None
650         self.pipeline_kwargs = {}
651         self.uplink_ports = None
652         self.downlink_ports = None
653         # NOTE(esm): make QueueFileWrapper invert-able so that we
654         #            never have to manage the queues
655         self.q_in = Queue()
656         self.q_out = Queue()
657         self.queue_wrapper = None
658         self.run_kwargs = {}
659         self.used_drivers = {}
660         self.vnf_port_pairs = None
661         self._vnf_process = None
662
663     def _start_vnf(self):
664         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
665         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
666         self._vnf_process = Process(name=name, target=self._run)
667         self._vnf_process.start()
668
669     def _vnf_up_post(self):
670         pass
671
672     def instantiate(self, scenario_cfg, context_cfg):
673         self._update_collectd_options(scenario_cfg, context_cfg)
674         self.scenario_helper.scenario_cfg = scenario_cfg
675         self.context_cfg = context_cfg
676         self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
677         # self.nfvi_context = None
678
679         # vnf deploy is unsupported, use ansible playbooks
680         if self.scenario_helper.options.get("vnf_deploy", False):
681             self.deploy_helper.deploy_vnfs(self.APP_NAME)
682         self.resource_helper.setup()
683         self._start_vnf()
684
685     def _update_collectd_options(self, scenario_cfg, context_cfg):
686         """Update collectd configuration options
687         This function retrieves all collectd options contained in the test case
688
689         definition builds a single dictionary combining them. The following fragment
690         represents a test case with the collectd options and priorities (1 highest, 3 lowest):
691         ---
692         schema: yardstick:task:0.1
693         scenarios:
694         - type: NSPerf
695           nodes:
696             tg__0: trafficgen_1.yardstick
697             vnf__0: vnf.yardstick
698           options:
699             collectd:
700               <options>  # COLLECTD priority 3
701             vnf__0:
702               collectd:
703                 plugins:
704                     load
705                 <options> # COLLECTD priority 2
706         context:
707           type: Node
708           name: yardstick
709           nfvi_type: baremetal
710           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
711         """
712         scenario_options = scenario_cfg.get('options', {})
713         generic_options = scenario_options.get('collectd', {})
714         scenario_node_options = scenario_options.get(self.name, {})\
715             .get('collectd', {})
716         context_node_options = context_cfg.get('nodes', {})\
717             .get(self.name, {}).get('collectd', {})
718
719         options = generic_options
720         self._update_options(options, scenario_node_options)
721         self._update_options(options, context_node_options)
722
723         self.setup_helper.collectd_options = options
724
725     def _update_options(self, options, additional_options):
726         """Update collectd options and plugins dictionary"""
727         for k, v in additional_options.items():
728             if isinstance(v, dict) and k in options:
729                 options[k].update(v)
730             else:
731                 options[k] = v
732
733     def wait_for_instantiate(self):
734         buf = []
735         time.sleep(self.WAIT_TIME)  # Give some time for config to load
736         while True:
737             if not self._vnf_process.is_alive():
738                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
739
740             # NOTE(esm): move to QueueFileWrapper
741             while self.q_out.qsize() > 0:
742                 buf.append(self.q_out.get())
743                 message = ''.join(buf)
744                 if self.VNF_PROMPT in message:
745                     LOG.info("%s VNF is up and running.", self.APP_NAME)
746                     self._vnf_up_post()
747                     self.queue_wrapper.clear()
748                     return self._vnf_process.exitcode
749
750                 if "PANIC" in message:
751                     raise RuntimeError("Error starting %s VNF." %
752                                        self.APP_NAME)
753
754             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
755             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
756             # Send ENTER to display a new prompt in case the prompt text was corrupted
757             # by other VNF output
758             self.q_in.put('\r\n')
759
760     def start_collect(self):
761         self.resource_helper.start_collect()
762
763     def stop_collect(self):
764         self.resource_helper.stop_collect()
765
766     def _build_run_kwargs(self):
767         self.run_kwargs = {
768             'stdin': self.queue_wrapper,
769             'stdout': self.queue_wrapper,
770             'keep_stdin_open': True,
771             'pty': True,
772             'timeout': self.scenario_helper.timeout,
773         }
774
775     def _build_config(self):
776         return self.setup_helper.build_config()
777
778     def _run(self):
779         # we can't share ssh paramiko objects to force new connection
780         self.ssh_helper.drop_connection()
781         cmd = self._build_config()
782         # kill before starting
783         self.setup_helper.kill_vnf()
784
785         LOG.debug(cmd)
786         self._build_run_kwargs()
787         self.ssh_helper.run(cmd, **self.run_kwargs)
788
789     def vnf_execute(self, cmd, wait_time=2):
790         """ send cmd to vnf process """
791
792         LOG.info("%s command: %s", self.APP_NAME, cmd)
793         self.q_in.put("{}\r\n".format(cmd))
794         time.sleep(wait_time)
795         output = []
796         while self.q_out.qsize() > 0:
797             output.append(self.q_out.get())
798         return "".join(output)
799
800     def _tear_down(self):
801         pass
802
803     def terminate(self):
804         self.vnf_execute("quit")
805         self.setup_helper.kill_vnf()
806         self._tear_down()
807         self.resource_helper.stop_collect()
808         if self._vnf_process is not None:
809             # be proper and join first before we kill
810             LOG.debug("joining before terminate %s", self._vnf_process.name)
811             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
812             self._vnf_process.terminate()
813         # no terminate children here because we share processes with tg
814
815     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
816         """Method for checking the statistics
817
818         This method could be overridden in children classes.
819
820         :return: VNF statistics
821         """
822         cmd = 'p {0} stats'.format(self.APP_WORD)
823         out = self.vnf_execute(cmd)
824         return out
825
826     def collect_kpi(self):
827         # we can't get KPIs if the VNF is down
828         check_if_process_failed(self._vnf_process, 0.01)
829         stats = self.get_stats()
830         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
831         if m:
832             result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
833             result["collect_stats"] = self.resource_helper.collect_kpi()
834         else:
835             result = {
836                 "packets_in": 0,
837                 "packets_fwd": 0,
838                 "packets_dropped": 0,
839             }
840         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
841         return result
842
843     def scale(self, flavor=""):
844         """The SampleVNF base class doesn't provide the 'scale' feature"""
845         raise y_exceptions.FunctionNotImplemented(
846             function_name='scale', class_name='SampleVNFTrafficGen')
847
848
849 class SampleVNFTrafficGen(GenericTrafficGen):
850     """ Class providing file-like API for generic traffic generator """
851
852     APP_NAME = 'Sample'
853     RUN_WAIT = 1
854
855     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
856         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
857         self.bin_path = get_nsb_option('bin_path', '')
858
859         self.scenario_helper = ScenarioHelper(self.name)
860         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
861
862         if setup_env_helper_type is None:
863             setup_env_helper_type = SetupEnvHelper
864
865         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
866                                                   self.ssh_helper,
867                                                   self.scenario_helper)
868
869         if resource_helper_type is None:
870             resource_helper_type = ClientResourceHelper
871
872         self.resource_helper = resource_helper_type(self.setup_helper)
873
874         self.runs_traffic = True
875         self.traffic_finished = False
876         self._tg_process = None
877         self._traffic_process = None
878
879     def _start_server(self):
880         # we can't share ssh paramiko objects to force new connection
881         self.ssh_helper.drop_connection()
882
883     def instantiate(self, scenario_cfg, context_cfg):
884         self.scenario_helper.scenario_cfg = scenario_cfg
885         self.resource_helper.setup()
886         # must generate_cfg after DPDK bind because we need port number
887         self.resource_helper.generate_cfg()
888
889         LOG.info("Starting %s server...", self.APP_NAME)
890         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
891         self._tg_process = Process(name=name, target=self._start_server)
892         self._tg_process.start()
893
894     def _check_status(self):
895         raise NotImplementedError
896
897     def _wait_for_process(self):
898         while True:
899             if not self._tg_process.is_alive():
900                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
901             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
902             time.sleep(1)
903             status = self._check_status()
904             if status == 0:
905                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
906                 return self._tg_process.exitcode
907
908     def _traffic_runner(self, traffic_profile):
909         # always drop connections first thing in new processes
910         # so we don't get paramiko errors
911         self.ssh_helper.drop_connection()
912         LOG.info("Starting %s client...", self.APP_NAME)
913         self.resource_helper.run_traffic(traffic_profile)
914
915     def run_traffic(self, traffic_profile):
916         """ Generate traffic on the wire according to the given params.
917         Method is non-blocking, returns immediately when traffic process
918         is running. Mandatory.
919
920         :param traffic_profile:
921         :return: True/False
922         """
923         name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
924                                     os.getpid())
925         self._traffic_process = Process(name=name, target=self._traffic_runner,
926                                         args=(traffic_profile,))
927         self._traffic_process.start()
928         # Wait for traffic process to start
929         while self.resource_helper.client_started.value == 0:
930             time.sleep(self.RUN_WAIT)
931             # what if traffic process takes a few seconds to start?
932             if not self._traffic_process.is_alive():
933                 break
934
935         return self._traffic_process.is_alive()
936
937     def collect_kpi(self):
938         # check if the tg processes have exited
939         for proc in (self._tg_process, self._traffic_process):
940             check_if_process_failed(proc)
941         result = self.resource_helper.collect_kpi()
942         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
943         return result
944
945     def terminate(self):
946         """ After this method finishes, all traffic processes should stop. Mandatory.
947
948         :return: True/False
949         """
950         self.traffic_finished = True
951         # we must kill client before we kill the server, or the client will raise exception
952         if self._traffic_process is not None:
953             # be proper and try to join before terminating
954             LOG.debug("joining before terminate %s", self._traffic_process.name)
955             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
956             self._traffic_process.terminate()
957         if self._tg_process is not None:
958             # be proper and try to join before terminating
959             LOG.debug("joining before terminate %s", self._tg_process.name)
960             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
961             self._tg_process.terminate()
962         # no terminate children here because we share processes with vnf
963
964     def scale(self, flavor=""):
965         """A traffic generator VFN doesn't provide the 'scale' feature"""
966         raise y_exceptions.FunctionNotImplemented(
967             function_name='scale', class_name='SampleVNFTrafficGen')