Create a SampleVNF MQ consumer class
yardstick.git: yardstick/network_services/vnf_generic/vnf/sample_vnf.py
1 # Copyright (c) 2016-2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 from multiprocessing import Queue, Value, Process
17 import os
18 import posixpath
19 import re
20 import uuid
21 import subprocess
22 import time
23
24 import six
25
26 from trex_stl_lib.trex_stl_client import LoggerApi
27 from trex_stl_lib.trex_stl_client import STLClient
28 from trex_stl_lib.trex_stl_exceptions import STLError
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.common import exceptions as y_exceptions
31 from yardstick.common.process import check_if_process_failed
32 from yardstick.common import utils
33 from yardstick.common import yaml_loader
34 from yardstick.network_services import constants
35 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
36 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
37 from yardstick.network_services.nfvi.resource import ResourceProfile
38 from yardstick.network_services.utils import get_nsb_option
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
41 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
42 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
43 from yardstick.benchmark.contexts.node import NodeContext
44
45 LOG = logging.getLogger(__name__)
46
47
48 class SetupEnvHelper(object):
49
50     CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
51     CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
52     DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
53     PIPELINE_COMMAND = ''
54     VNF_TYPE = "SAMPLE"
55
56     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
57         super(SetupEnvHelper, self).__init__()
58         self.vnfd_helper = vnfd_helper
59         self.ssh_helper = ssh_helper
60         self.scenario_helper = scenario_helper
61         self.collectd_options = {}
62
63     def build_config(self):
64         raise NotImplementedError
65
66     def setup_vnf_environment(self):
67         pass
68
69     def kill_vnf(self):
70         pass
71
72     def tear_down(self):
73         raise NotImplementedError
74
75
76 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
77
78     APP_NAME = 'DpdkVnf'
79     FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
80     NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
81
82     @staticmethod
83     def _update_packet_type(ip_pipeline_cfg, traffic_options):
84         match_str = 'pkt_type = ipv4'
85         replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
86         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
87         return pipeline_config_str
88
89     @classmethod
90     def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
91         traffic_type = traffic_options['traffic_type']
92
93         if traffic_options['vnf_type'] != cls.APP_NAME:
94             match_str = 'traffic_type = 4'
95             replace_str = 'traffic_type = {0}'.format(traffic_type)
96
97         elif traffic_type == 4:
98             match_str = 'pkt_type = ipv4'
99             replace_str = 'pkt_type = ipv4'
100
101         else:
102             match_str = 'pkt_type = ipv4'
103             replace_str = 'pkt_type = ipv6'
104
105         pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
106         return pipeline_config_str
107
108     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
109         super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
110         self.all_ports = None
111         self.bound_pci = None
112         self.socket = None
113         self.used_drivers = None
114         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
115
116     def _setup_hugepages(self):
117         meminfo = utils.read_meminfo(self.ssh_helper)
118         hp_size_kb = int(meminfo['Hugepagesize'])
119         hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
120         nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
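        # Worked example (added for clarity, not in the original code): with the
        # default hugepages_gb of 16 and a 2048 kB hugepage size this gives
        # int(abs(16 * 1024 * 1024 / 2048)) == 8192 pages, while a 1 GB
        # (1048576 kB) hugepage size gives 16 pages.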
121         self.ssh_helper.execute('echo %s | sudo tee %s' %
122                                 (nr_hugepages, self.NR_HUGEPAGES_PATH))
123         hp = six.BytesIO()
124         self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
125         nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
126         LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
127                  hp_size_kb, nr_hugepages, nr_hugepages_set)
128
129     def build_config(self):
130         vnf_cfg = self.scenario_helper.vnf_cfg
131         task_path = self.scenario_helper.task_path
132
133         config_file = vnf_cfg.get('file')
134         lb_count = vnf_cfg.get('lb_count', 3)
135         lb_config = vnf_cfg.get('lb_config', 'SW')
136         worker_config = vnf_cfg.get('worker_config', '1C/1T')
137         worker_threads = vnf_cfg.get('worker_threads', 3)
138
139         traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
140         traffic_options = {
141             'traffic_type': traffic_type,
142             'pkt_type': 'ipv%s' % traffic_type,
143             'vnf_type': self.VNF_TYPE,
144         }
145
146         # read actions/rules from file
147         acl_options = None
148         acl_file_name = self.scenario_helper.options.get('rules')
149         if acl_file_name:
150             with utils.open_relative_file(acl_file_name, task_path) as infile:
151                 acl_options = yaml_loader.yaml_load(infile)
152
153         config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
154                                                   task_path)
155         config_basename = posixpath.basename(self.CFG_CONFIG)
156         script_basename = posixpath.basename(self.CFG_SCRIPT)
157         multiport = MultiPortConfig(self.scenario_helper.topology,
158                                     config_tpl_cfg,
159                                     config_basename,
160                                     self.vnfd_helper,
161                                     self.VNF_TYPE,
162                                     lb_count,
163                                     worker_threads,
164                                     worker_config,
165                                     lb_config,
166                                     self.socket)
167
168         multiport.generate_config()
169         if config_file:
170             with utils.open_relative_file(config_file, task_path) as infile:
171                 new_config = ['[EAL]']
172                 vpci = []
173                 for port in self.vnfd_helper.port_pairs.all_ports:
174                     interface = self.vnfd_helper.find_interface(name=port)
175                     vpci.append(interface['virtual-interface']["vpci"])
176                 new_config.extend('w = {0}'.format(item) for item in vpci)
177                 new_config = '\n'.join(new_config) + '\n' + infile.read()
178         else:
179             with open(self.CFG_CONFIG) as handle:
180                 new_config = handle.read()
181             new_config = self._update_traffic_type(new_config, traffic_options)
182             new_config = self._update_packet_type(new_config, traffic_options)
183         self.ssh_helper.upload_config_file(config_basename, new_config)
184         self.ssh_helper.upload_config_file(script_basename,
185             multiport.generate_script(self.vnfd_helper,
186                                       self.get_flows_config(acl_options)))
187
188         LOG.info("Provision and start the %s", self.APP_NAME)
189         self._build_pipeline_kwargs()
190         return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
191
192     def get_flows_config(self, options=None): # pylint: disable=unused-argument
193         """No actions/rules (flows) by default"""
194         return None
195
196     def _build_pipeline_kwargs(self):
197         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
198         # count the number of actual ports in the list of pairs
199         # remove duplicate ports
200         # this is really a mapping from LINK ID to DPDK PMD ID
201         # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
202         #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
203         ports = self.vnfd_helper.port_pairs.all_ports
204         port_nums = self.vnfd_helper.port_nums(ports)
205         # create mask from all the dpdk port numbers
206         ports_mask_hex = hex(sum(2 ** num for num in port_nums))
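        # For example (illustrative values only): DPDK port numbers [0, 1] give
        # hex(2**0 + 2**1) == '0x3', and [0, 2] give hex(2**0 + 2**2) == '0x5'.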
207
208         vnf_cfg = self.scenario_helper.vnf_cfg
209         lb_config = vnf_cfg.get('lb_config', 'SW')
210         worker_threads = vnf_cfg.get('worker_threads', 3)
211         hwlb = ''
212         if lb_config == 'HW':
213             hwlb = ' --hwlb %s' % worker_threads
214
215         self.pipeline_kwargs = {
216             'cfg_file': self.CFG_CONFIG,
217             'script': self.CFG_SCRIPT,
218             'port_mask_hex': ports_mask_hex,
219             'tool_path': tool_path,
220             'hwlb': hwlb,
221         }
222
223     def setup_vnf_environment(self):
224         self._setup_dpdk()
225         self.kill_vnf()
226         # bind before _setup_resources so we can use dpdk_port_num
227         self._detect_and_bind_drivers()
228         resource = self._setup_resources()
229         return resource
230
231     def kill_vnf(self):
232         # pkill is not matching, debug with pgrep
233         self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
234         self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
235         # have to use exact match
236         # try using killall to match
237         self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
238
239     def _setup_dpdk(self):
240         """Setup DPDK environment needed for VNF to run"""
241         self._setup_hugepages()
242         self.dpdk_bind_helper.load_dpdk_driver()
243
244         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
245         if exit_status == 0:
246             return
247
248     def _setup_resources(self):
249         # what is this magic?  how do we know which socket is for which port?
250         # what about quad-socket?
251         if any(v[5] == "0" for v in self.bound_pci):
252             self.socket = 0
253         else:
254             self.socket = 1
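        # Note (added for clarity, assuming the usual "dddd:bb:dd.f" vPCI format,
        # e.g. "0000:05:00.0"): v[5] is the first digit of the PCI bus number, so
        # any device on a low-numbered bus (0x00-0x0f) selects socket 0 here.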
255
256         # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
257         # this won't work because we don't have DPDK port numbers yet
258         ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
259         port_names = (intf["name"] for intf in ports)
260         plugins = self.collectd_options.get("plugins", {})
261         interval = self.collectd_options.get("interval")
262         # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
263         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
264                                plugins=plugins, interval=interval,
265                                timeout=self.scenario_helper.timeout)
266
267     def _check_interface_fields(self):
268         num_nodes = len(self.scenario_helper.nodes)
269         # OpenStack instance creation time is probably proportional to the number
270         # of instances
271         timeout = 120 * num_nodes
272         dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
273                              self.ssh_helper, timeout)
274         dpdk_node.check()
275
276     def _detect_and_bind_drivers(self):
277         interfaces = self.vnfd_helper.interfaces
278
279         self._check_interface_fields()
280         # check for bound after probe
281         self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
282
283         self.dpdk_bind_helper.read_status()
284         self.dpdk_bind_helper.save_used_drivers()
285
286         self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
287
288         sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
289         for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
290             try:
291                 intf = next(v for v in interfaces
292                             if vpci == v['virtual-interface']['vpci'])
293                 # force to int
294                 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
295             except:  # pylint: disable=bare-except
296                 pass
297         time.sleep(2)
298
299     def get_local_iface_name_by_vpci(self, vpci):
300         find_net_cmd = self.FIND_NET_CMD.format(vpci)
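        # e.g. for a vpci of "0000:05:00.0" (illustrative value) this renders to
        #   find /sys/class/net -lname '*0000:05:00.0*' -printf '%f'
        # which prints the kernel interface name backed by that PCI address.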
301         exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
302         if exit_status == 0:
303             return stdout
304         return None
305
306     def tear_down(self):
307         self.dpdk_bind_helper.rebind_drivers()
308
309
310 class ResourceHelper(object):
311
312     COLLECT_KPI = ''
313     MAKE_INSTALL = 'cd {0} && make && sudo make install'
314     RESOURCE_WORD = 'sample'
315
316     COLLECT_MAP = {}
317
318     def __init__(self, setup_helper):
319         super(ResourceHelper, self).__init__()
320         self.resource = None
321         self.setup_helper = setup_helper
322         self.ssh_helper = setup_helper.ssh_helper
323         self._enable = True
324
325     def setup(self):
326         self.resource = self.setup_helper.setup_vnf_environment()
327
328     def generate_cfg(self):
329         pass
330
331     def update_from_context(self, context, attr_name):
332         """Disable the resource helper for baremetal (Node) contexts.
333 
334         Also update the node's collectd options in the context accordingly.
335         """
336         if isinstance(context, NodeContext):
337             self._enable = False
338             context.update_collectd_options_for_node(self.setup_helper.collectd_options,
339                                                      attr_name)
340
341     def _collect_resource_kpi(self):
342         result = {}
343         status = self.resource.check_if_system_agent_running("collectd")[0]
344         if status == 0 and self._enable:
345             result = self.resource.amqp_collect_nfvi_kpi()
346
347         result = {"core": result}
348         return result
349
350     def start_collect(self):
351         if self._enable:
352             self.resource.initiate_systemagent(self.ssh_helper.bin_path)
353             self.resource.start()
354             self.resource.amqp_process_for_nfvi_kpi()
355
356     def stop_collect(self):
357         if self.resource and self._enable:
358             self.resource.stop()
359
360     def collect_kpi(self):
361         return self._collect_resource_kpi()
362
363
364 class ClientResourceHelper(ResourceHelper):
365
366     RUN_DURATION = 60
367     QUEUE_WAIT_TIME = 5
368     SYNC_PORT = 1
369     ASYNC_PORT = 2
370
371     def __init__(self, setup_helper):
372         super(ClientResourceHelper, self).__init__(setup_helper)
373         self.vnfd_helper = setup_helper.vnfd_helper
374         self.scenario_helper = setup_helper.scenario_helper
375
376         self.client = None
377         self.client_started = Value('i', 0)
378         self.all_ports = None
379         self._queue = Queue()
380         self._result = {}
381         self._terminated = Value('i', 0)
382
383     def _build_ports(self):
384         self.networks = self.vnfd_helper.port_pairs.networks
385         self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
386         self.downlink_ports = \
387             self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
388         self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
389
390     def port_num(self, intf):
391         # by default return port num
392         return self.vnfd_helper.port_num(intf)
393
394     def get_stats(self, *args, **kwargs):
395         try:
396             return self.client.get_stats(*args, **kwargs)
397         except STLError:
398             LOG.error('TRex client not connected')
399             return {}
400
401     def _get_samples(self, ports, port_pg_id=False):
402         raise NotImplementedError()
403
404     def _run_traffic_once(self, traffic_profile):
405         traffic_profile.execute_traffic(self)
406         self.client_started.value = 1
407         time.sleep(self.RUN_DURATION)
408         samples = self._get_samples(traffic_profile.ports)
409         time.sleep(self.QUEUE_WAIT_TIME)
410         self._queue.put(samples)
411
412     def run_traffic(self, traffic_profile, mq_producer):
413         # if we don't do this we can hang waiting for the queue to drain
414         # have to do this in the subprocess
415         self._queue.cancel_join_thread()
416         # FIXME: pass the correct TRex config file explicitly
417         # instead of searching the default path
418         mq_producer.tg_method_started()
419         try:
420             self._build_ports()
421             self.client = self._connect()
422             self.client.reset(ports=self.all_ports)
423             self.client.remove_all_streams(self.all_ports)  # remove all streams
424             traffic_profile.register_generator(self)
425
426             iteration_index = 0
427             while self._terminated.value == 0:
428                 iteration_index += 1
429                 self._run_traffic_once(traffic_profile)
430                 mq_producer.tg_method_iteration(iteration_index)
431
432             self.client.stop(self.all_ports)
433             self.client.disconnect()
434             self._terminated.value = 0
435         except STLError:
436             if self._terminated.value:
437                 LOG.debug("traffic generator is stopped")
438                 return  # return if trex/tg server is stopped.
439             raise
440
441         mq_producer.tg_method_finished()
442
443     def terminate(self):
444         self._terminated.value = 1  # stop client
445
446     def clear_stats(self, ports=None):
447         if ports is None:
448             ports = self.all_ports
449         self.client.clear_stats(ports=ports)
450
451     def start(self, ports=None, *args, **kwargs):
452         # pylint: disable=keyword-arg-before-vararg
453         # NOTE(ralonsoh): defining keyworded arguments before variable
454         # positional arguments is a bug. This function definition doesn't work
455         # in Python 2, although it works in Python 3. Reference:
456         # https://www.python.org/dev/peps/pep-3102/
457         if ports is None:
458             ports = self.all_ports
459         self.client.start(ports=ports, *args, **kwargs)
460
461     def collect_kpi(self):
462         if not self._queue.empty():
463             kpi = self._queue.get()
464             self._result.update(kpi)
465             LOG.debug('Got KPIs from _queue for %s %s',
466                       self.scenario_helper.name, self.RESOURCE_WORD)
467         return self._result
468
469     def _connect(self, client=None):
470         if client is None:
471             client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
472                                server=self.vnfd_helper.mgmt_interface["ip"],
473                                verbose_level=LoggerApi.VERBOSE_QUIET)
474
475         # try to connect with 5s intervals, 30s max
476         for idx in range(6):
477             try:
478                 client.connect()
479                 break
480             except STLError:
481                 LOG.info("Unable to connect to TRex server. Attempt %s", idx)
482                 time.sleep(5)
483         return client
484
485
486 class Rfc2544ResourceHelper(object):
487
488     DEFAULT_CORRELATED_TRAFFIC = False
489     DEFAULT_LATENCY = False
490     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
491
492     def __init__(self, scenario_helper):
493         super(Rfc2544ResourceHelper, self).__init__()
494         self.scenario_helper = scenario_helper
495         self._correlated_traffic = None
496         self.iteration = Value('i', 0)
497         self._latency = None
498         self._rfc2544 = None
499         self._tolerance_low = None
500         self._tolerance_high = None
501
502     @property
503     def rfc2544(self):
504         if self._rfc2544 is None:
505             self._rfc2544 = self.scenario_helper.all_options['rfc2544']
506         return self._rfc2544
507
508     @property
509     def tolerance_low(self):
510         if self._tolerance_low is None:
511             self.get_rfc_tolerance()
512         return self._tolerance_low
513
514     @property
515     def tolerance_high(self):
516         if self._tolerance_high is None:
517             self.get_rfc_tolerance()
518         return self._tolerance_high
519
520     @property
521     def correlated_traffic(self):
522         if self._correlated_traffic is None:
523             self._correlated_traffic = \
524                 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
525
526         return self._correlated_traffic
527
528     @property
529     def latency(self):
530         if self._latency is None:
531             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
532         return self._latency
533
534     def get_rfc2544(self, name, default=None):
535         return self.rfc2544.get(name, default)
536
537     def get_rfc_tolerance(self):
538         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
539         tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
540         self._tolerance_low = next(tolerance_iter)
541         self._tolerance_high = next(tolerance_iter, self.tolerance_low)
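        # Worked example (illustrative): an 'allowed_drop_rate' of '0.0001 - 0.01'
        # splits and sorts to [0.0001, 0.01], so tolerance_low is 0.0001 and
        # tolerance_high is 0.01; a single value such as '0.0001' yields one float
        # and the next() default makes tolerance_high equal to tolerance_low.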
542
543
544 class SampleVNFDeployHelper(object):
545
546     SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
547     REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
548     SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
549
550     def __init__(self, vnfd_helper, ssh_helper):
551         super(SampleVNFDeployHelper, self).__init__()
552         self.ssh_helper = ssh_helper
553         self.vnfd_helper = vnfd_helper
554
555     def deploy_vnfs(self, app_name):
556         vnf_bin = self.ssh_helper.join_bin_path(app_name)
557         exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
558         if not exit_status:
559             return
560
561         subprocess.check_output(["rm", "-rf", self.REPO_NAME])
562         subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
563         time.sleep(2)
564         self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
565         self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
566
567         build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
568         time.sleep(2)
569         http_proxy = os.environ.get('http_proxy', '')
570         cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
571         LOG.debug(cmd)
572         self.ssh_helper.execute(cmd)
573         vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
574         self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
575         self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
576
577
578 class ScenarioHelper(object):
579
580     DEFAULT_VNF_CFG = {
581         'lb_config': 'SW',
582         'lb_count': 1,
583         'worker_config': '1C/1T',
584         'worker_threads': 1,
585     }
586
587     def __init__(self, name):
588         self.name = name
589         self.scenario_cfg = None
590
591     @property
592     def task_path(self):
593         return self.scenario_cfg['task_path']
594
595     @property
596     def nodes(self):
597         return self.scenario_cfg.get('nodes')
598
599     @property
600     def all_options(self):
601         return self.scenario_cfg.get('options', {})
602
603     @property
604     def options(self):
605         return self.all_options.get(self.name, {})
606
607     @property
608     def vnf_cfg(self):
609         return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
610
611     @property
612     def topology(self):
613         return self.scenario_cfg['topology']
614
615     @property
616     def timeout(self):
617         test_duration = self.scenario_cfg.get('runner', {}).get('duration',
618             self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
619         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
620         return test_duration if test_duration > test_timeout else test_timeout
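        # Example (illustrative values): a runner duration of 300 s with an options
        # timeout of 3600 s returns 3600, i.e. the larger of the two always wins.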
621
622
623 class SampleVNF(GenericVNF):
624     """ Class providing file-like API for generic VNF implementation """
625
626     VNF_PROMPT = "pipeline>"
627     WAIT_TIME = 1
628     WAIT_TIME_FOR_SCRIPT = 10
629     APP_NAME = "SampleVNF"
630     # we run the VNF interactively, so the ssh command will time out after this long
631
632     def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
633                  resource_helper_type=None):
634         super(SampleVNF, self).__init__(name, vnfd, task_id)
635         self.bin_path = get_nsb_option('bin_path', '')
636
637         self.scenario_helper = ScenarioHelper(self.name)
638         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
639
640         if setup_env_helper_type is None:
641             setup_env_helper_type = SetupEnvHelper
642
643         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
644                                                   self.ssh_helper,
645                                                   self.scenario_helper)
646
647         self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
648
649         if resource_helper_type is None:
650             resource_helper_type = ResourceHelper
651
652         self.resource_helper = resource_helper_type(self.setup_helper)
653
654         self.context_cfg = None
655         self.pipeline_kwargs = {}
656         self.uplink_ports = None
657         self.downlink_ports = None
658         # NOTE(esm): make QueueFileWrapper invert-able so that we
659         #            never have to manage the queues
660         self.q_in = Queue()
661         self.q_out = Queue()
662         self.queue_wrapper = None
663         self.run_kwargs = {}
664         self.used_drivers = {}
665         self.vnf_port_pairs = None
666         self._vnf_process = None
667
668     def _start_vnf(self):
669         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
670         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
671         self._vnf_process = Process(name=name, target=self._run)
672         self._vnf_process.start()
673
674     def _vnf_up_post(self):
675         pass
676
677     def instantiate(self, scenario_cfg, context_cfg):
678         self._update_collectd_options(scenario_cfg, context_cfg)
679         self.scenario_helper.scenario_cfg = scenario_cfg
680         self.context_cfg = context_cfg
681         self.resource_helper.update_from_context(
682             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
683             self.scenario_helper.nodes[self.name]
684         )
685
686         # deploying the VNF from here is unsupported; use the Ansible playbooks instead
687         if self.scenario_helper.options.get("vnf_deploy", False):
688             self.deploy_helper.deploy_vnfs(self.APP_NAME)
689         self.resource_helper.setup()
690         self._start_vnf()
691
692     def _update_collectd_options(self, scenario_cfg, context_cfg):
693         """Update collectd configuration options
694         This function retrieves all collectd options contained in the test case
695         definition and builds a single dictionary combining them.
696         The following fragment represents a test case with the collectd options
697         and their priorities (1 = highest, 3 = lowest):
698         ---
699         schema: yardstick:task:0.1
700         scenarios:
701         - type: NSPerf
702           nodes:
703             tg__0: trafficgen_1.yardstick
704             vnf__0: vnf.yardstick
705           options:
706             collectd:
707               <options>  # COLLECTD priority 3
708             vnf__0:
709               collectd:
710                 plugins:
711                     load
712                 <options> # COLLECTD priority 2
713         context:
714           type: Node
715           name: yardstick
716           nfvi_type: baremetal
717           file: /etc/yardstick/nodes/pod_ixia.yaml  # COLLECTD priority 1
718         """
719         scenario_options = scenario_cfg.get('options', {})
720         generic_options = scenario_options.get('collectd', {})
721         scenario_node_options = scenario_options.get(self.name, {})\
722             .get('collectd', {})
723         context_node_options = context_cfg.get('nodes', {})\
724             .get(self.name, {}).get('collectd', {})
725
726         options = generic_options
727         self._update_options(options, scenario_node_options)
728         self._update_options(options, context_node_options)
729
730         self.setup_helper.collectd_options = options
731
732     def _update_options(self, options, additional_options):
733         """Update collectd options and plugins dictionary"""
734         for k, v in additional_options.items():
735             if isinstance(v, dict) and k in options:
736                 options[k].update(v)
737             else:
738                 options[k] = v
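
    # Merge sketch (illustrative values only): starting from generic options
    # {'interval': 25}, applying scenario node options {'interval': 10} and then
    # context node options {'plugins': {'load': {}}} leaves interval == 10 and adds
    # the plugins dict, because each higher-priority level overrides scalar values
    # and shallow-merges dict values over the previous one.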
739
740     def wait_for_instantiate(self):
741         buf = []
742         time.sleep(self.WAIT_TIME)  # Give some time for config to load
743         while True:
744             if not self._vnf_process.is_alive():
745                 raise RuntimeError("%s VNF process died." % self.APP_NAME)
746
747             # NOTE(esm): move to QueueFileWrapper
748             while self.q_out.qsize() > 0:
749                 buf.append(self.q_out.get())
750                 message = ''.join(buf)
751                 if self.VNF_PROMPT in message:
752                     LOG.info("%s VNF is up and running.", self.APP_NAME)
753                     self._vnf_up_post()
754                     self.queue_wrapper.clear()
755                     return self._vnf_process.exitcode
756
757                 if "PANIC" in message:
758                     raise RuntimeError("Error starting %s VNF." %
759                                        self.APP_NAME)
760
761             LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
762             time.sleep(self.WAIT_TIME_FOR_SCRIPT)
763             # Send ENTER to display a new prompt in case the prompt text was corrupted
764             # by other VNF output
765             self.q_in.put('\r\n')
766
767     def start_collect(self):
768         self.resource_helper.start_collect()
769
770     def stop_collect(self):
771         self.resource_helper.stop_collect()
772
773     def _build_run_kwargs(self):
774         self.run_kwargs = {
775             'stdin': self.queue_wrapper,
776             'stdout': self.queue_wrapper,
777             'keep_stdin_open': True,
778             'pty': True,
779             'timeout': self.scenario_helper.timeout,
780         }
781
782     def _build_config(self):
783         return self.setup_helper.build_config()
784
785     def _run(self):
786         # paramiko SSH objects can't be shared across processes, so force a new connection
787         self.ssh_helper.drop_connection()
788         cmd = self._build_config()
789         # kill before starting
790         self.setup_helper.kill_vnf()
791
792         LOG.debug(cmd)
793         self._build_run_kwargs()
794         self.ssh_helper.run(cmd, **self.run_kwargs)
795
796     def vnf_execute(self, cmd, wait_time=2):
797         """ send cmd to vnf process """
798
799         LOG.info("%s command: %s", self.APP_NAME, cmd)
800         self.q_in.put("{}\r\n".format(cmd))
801         time.sleep(wait_time)
802         output = []
803         while self.q_out.qsize() > 0:
804             output.append(self.q_out.get())
805         return "".join(output)
806
807     def _tear_down(self):
808         pass
809
810     def terminate(self):
811         self.vnf_execute("quit")
812         self.setup_helper.kill_vnf()
813         self._tear_down()
814         self.resource_helper.stop_collect()
815         if self._vnf_process is not None:
816             # be proper and join first before we kill
817             LOG.debug("joining before terminate %s", self._vnf_process.name)
818             self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
819             self._vnf_process.terminate()
820         # no terminate children here because we share processes with tg
821
822     def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
823         """Method for checking the statistics
824
825         This method could be overridden in children classes.
826
827         :return: VNF statistics
828         """
829         cmd = 'p {0} stats'.format(self.APP_WORD)
830         out = self.vnf_execute(cmd)
831         return out
832
833     def collect_kpi(self):
834         # we can't get KPIs if the VNF is down
835         check_if_process_failed(self._vnf_process, 0.01)
836         stats = self.get_stats()
837         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
838         physical_node = Context.get_physical_node_from_server(
839             self.scenario_helper.nodes[self.name])
840
841         result = {"physical_node": physical_node}
842         if m:
843             result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
844             result["collect_stats"] = self.resource_helper.collect_kpi()
845         else:
846             result.update({"packets_in": 0,
847                            "packets_fwd": 0,
848                            "packets_dropped": 0})
849
850         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
851         return result
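
    # Subclass contract (sketch with hypothetical values): a concrete VNF defines
    # COLLECT_KPI as a regex with numbered groups and COLLECT_MAP such as
    # {'packets_in': 1, 'packets_fwd': 2, 'packets_dropped': 3}, so each KPI above
    # is extracted with int(m.group(index)) from the get_stats() output.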
852
853     def scale(self, flavor=""):
854         """The SampleVNF base class doesn't provide the 'scale' feature"""
855         raise y_exceptions.FunctionNotImplemented(
856             function_name='scale', class_name='SampleVNF')
857
858
859 class SampleVNFTrafficGen(GenericTrafficGen):
860     """ Class providing file-like API for generic traffic generator """
861
862     APP_NAME = 'Sample'
863     RUN_WAIT = 1
864
865     def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
866                  resource_helper_type=None):
867         super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
868         self.bin_path = get_nsb_option('bin_path', '')
869
870         self.scenario_helper = ScenarioHelper(self.name)
871         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
872
873         if setup_env_helper_type is None:
874             setup_env_helper_type = SetupEnvHelper
875
876         self.setup_helper = setup_env_helper_type(self.vnfd_helper,
877                                                   self.ssh_helper,
878                                                   self.scenario_helper)
879
880         if resource_helper_type is None:
881             resource_helper_type = ClientResourceHelper
882
883         self.resource_helper = resource_helper_type(self.setup_helper)
884
885         self.runs_traffic = True
886         self.traffic_finished = False
887         self._tg_process = None
888         self._traffic_process = None
889
890     def _start_server(self):
891         # paramiko SSH objects can't be shared across processes, so force a new connection
892         self.ssh_helper.drop_connection()
893
894     def instantiate(self, scenario_cfg, context_cfg):
895         self.scenario_helper.scenario_cfg = scenario_cfg
896         self.resource_helper.update_from_context(
897             Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
898             self.scenario_helper.nodes[self.name]
899         )
900
901         self.resource_helper.setup()
902         # must call generate_cfg() after the DPDK bind because we need the port numbers
903         self.resource_helper.generate_cfg()
904
905         LOG.info("Starting %s server...", self.APP_NAME)
906         name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
907         self._tg_process = Process(name=name, target=self._start_server)
908         self._tg_process.start()
909
910     def _check_status(self):
911         raise NotImplementedError
912
913     def _wait_for_process(self):
914         while True:
915             if not self._tg_process.is_alive():
916                 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
917             LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
918             time.sleep(1)
919             status = self._check_status()
920             if status == 0:
921                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
922                 return self._tg_process.exitcode
923
924     def _traffic_runner(self, traffic_profile, mq_id):
925         # always drop connections first thing in new processes
926         # so we don't get paramiko errors
927         self.ssh_helper.drop_connection()
928         LOG.info("Starting %s client...", self.APP_NAME)
929         self._mq_producer = self._setup_mq_producer(mq_id)
930         self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
931
932     def run_traffic(self, traffic_profile):
933         """ Generate traffic on the wire according to the given params.
934         Method is non-blocking, returns immediately when traffic process
935         is running. Mandatory.
936
937         :param traffic_profile:
938         :return: True/False
939         """
940         name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
941                                     traffic_profile.__class__.__name__,
942                                     os.getpid())
943         self._traffic_process = Process(
944             name=name, target=self._traffic_runner,
945             args=(traffic_profile, uuid.uuid1().int))
946         self._traffic_process.start()
947         # Wait for traffic process to start
948         while self.resource_helper.client_started.value == 0:
949             time.sleep(self.RUN_WAIT)
950             # what if traffic process takes a few seconds to start?
951             if not self._traffic_process.is_alive():
952                 break
953
954     def collect_kpi(self):
955         # check if the tg processes have exited
956         physical_node = Context.get_physical_node_from_server(
957             self.scenario_helper.nodes[self.name])
958
959         result = {"physical_node": physical_node}
960         for proc in (self._tg_process, self._traffic_process):
961             check_if_process_failed(proc)
962
963         result["collect_stats"] = self.resource_helper.collect_kpi()
964         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
965         return result
966
967     def terminate(self):
968         """ After this method finishes, all traffic processes should stop. Mandatory.
969
970         :return: True/False
971         """
972         self.traffic_finished = True
973         # we must kill the client before we kill the server, or the client will raise an exception
974         if self._traffic_process is not None:
975             # be proper and try to join before terminating
976             LOG.debug("joining before terminate %s", self._traffic_process.name)
977             self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
978             self._traffic_process.terminate()
979         if self._tg_process is not None:
980             # be proper and try to join before terminating
981             LOG.debug("joining before terminate %s", self._tg_process.name)
982             self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
983             self._tg_process.terminate()
984         # no terminate children here because we share processes with vnf
985
986     def scale(self, flavor=""):
987         """A traffic generator VNF doesn't provide the 'scale' feature"""
988         raise y_exceptions.FunctionNotImplemented(
989             function_name='scale', class_name='SampleVNFTrafficGen')