1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
16 from collections import Mapping
18 from multiprocessing import Queue, Value, Process
26 from six.moves import cStringIO
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
33 from yardstick.common import exceptions as y_exceptions
34 from yardstick.common import utils
35 from yardstick.common.process import check_if_process_failed
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
37 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
38 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
39 from yardstick.network_services.nfvi.resource import ResourceProfile
40 from yardstick.network_services.utils import get_nsb_option
41 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
42 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
43 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
44 from yardstick.ssh import AutoConnectSSH
47 DPDK_VERSION = "dpdk-16.07"
49 LOG = logging.getLogger(__name__)
53 DEFAULT_VNF_TIMEOUT = 3600
54 PROCESS_JOIN_TIMEOUT = 3
57 class VnfSshHelper(AutoConnectSSH):
59 def __init__(self, node, bin_path, wait=None):
61 kwargs = self.args_from_node(self.node)
63 kwargs.setdefault('wait', wait)
65 super(VnfSshHelper, self).__init__(**kwargs)
66 self.bin_path = bin_path
70 # must return static class name, anything else refers to the calling class
71 # i.e. the subclass, not the superclass
75 # this copy constructor is different from SSH classes, since it uses node
76 return self.get_class()(self.node, self.bin_path)
78 def upload_config_file(self, prefix, content):
79 cfg_file = os.path.join(REMOTE_TMP, prefix)
81 file_obj = cStringIO(content)
82 self.put_file_obj(file_obj, cfg_file)
85 def join_bin_path(self, *args):
86 return os.path.join(self.bin_path, *args)
88 def provision_tool(self, tool_path=None, tool_file=None):
90 tool_path = self.bin_path
91 return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
94 class SetupEnvHelper(object):
96 CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
97 CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
98 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
102 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
103 super(SetupEnvHelper, self).__init__()
104 self.vnfd_helper = vnfd_helper
105 self.ssh_helper = ssh_helper
106 self.scenario_helper = scenario_helper
108 def build_config(self):
109 raise NotImplementedError
111 def setup_vnf_environment(self):
118 raise NotImplementedError
121 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
124 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
125 NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
126 HUGEPAGES_KB = 1024 * 1024 * 16
129 def _update_packet_type(ip_pipeline_cfg, traffic_options):
130 match_str = 'pkt_type = ipv4'
131 replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
132 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
133 return pipeline_config_str
136 def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
137 traffic_type = traffic_options['traffic_type']
139 if traffic_options['vnf_type'] is not cls.APP_NAME:
140 match_str = 'traffic_type = 4'
141 replace_str = 'traffic_type = {0}'.format(traffic_type)
143 elif traffic_type == 4:
144 match_str = 'pkt_type = ipv4'
145 replace_str = 'pkt_type = ipv4'
148 match_str = 'pkt_type = ipv4'
149 replace_str = 'pkt_type = ipv6'
151 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
152 return pipeline_config_str
154 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
155 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
156 self.all_ports = None
157 self.bound_pci = None
159 self.used_drivers = None
160 self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
162 def _setup_hugepages(self):
163 meminfo = utils.read_meminfo(self.ssh_helper)
164 hp_size_kb = int(meminfo['Hugepagesize'])
165 nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
166 self.ssh_helper.execute('echo %s | sudo tee %s' %
167 (nr_hugepages, self.NR_HUGEPAGES_PATH))
169 self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
170 nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
171 LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
172 hp_size_kb, nr_hugepages, nr_hugepages_set)
174 def build_config(self):
175 vnf_cfg = self.scenario_helper.vnf_cfg
176 task_path = self.scenario_helper.task_path
178 lb_count = vnf_cfg.get('lb_count', 3)
179 lb_config = vnf_cfg.get('lb_config', 'SW')
180 worker_config = vnf_cfg.get('worker_config', '1C/1T')
181 worker_threads = vnf_cfg.get('worker_threads', 3)
183 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
185 'traffic_type': traffic_type,
186 'pkt_type': 'ipv%s' % traffic_type,
187 'vnf_type': self.VNF_TYPE,
190 config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
191 config_basename = posixpath.basename(self.CFG_CONFIG)
192 script_basename = posixpath.basename(self.CFG_SCRIPT)
193 multiport = MultiPortConfig(self.scenario_helper.topology,
204 multiport.generate_config()
205 with open(self.CFG_CONFIG) as handle:
206 new_config = handle.read()
208 new_config = self._update_traffic_type(new_config, traffic_options)
209 new_config = self._update_packet_type(new_config, traffic_options)
211 self.ssh_helper.upload_config_file(config_basename, new_config)
212 self.ssh_helper.upload_config_file(script_basename,
213 multiport.generate_script(self.vnfd_helper))
215 LOG.info("Provision and start the %s", self.APP_NAME)
216 self._build_pipeline_kwargs()
217 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
219 def _build_pipeline_kwargs(self):
220 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
221 # count the number of actual ports in the list of pairs
222 # remove duplicate ports
223 # this is really a mapping from LINK ID to DPDK PMD ID
224 # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
225 # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
226 ports = self.vnfd_helper.port_pairs.all_ports
227 port_nums = self.vnfd_helper.port_nums(ports)
228 # create mask from all the dpdk port numbers
229 ports_mask_hex = hex(sum(2 ** num for num in port_nums))
230 self.pipeline_kwargs = {
231 'cfg_file': self.CFG_CONFIG,
232 'script': self.CFG_SCRIPT,
233 'port_mask_hex': ports_mask_hex,
234 'tool_path': tool_path,
237 def setup_vnf_environment(self):
239 self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
241 # bind before _setup_resources so we can use dpdk_port_num
242 self._detect_and_bind_drivers()
243 resource = self._setup_resources()
247 # pkill is not matching, debug with pgrep
248 self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
249 self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
250 # have to use exact match
251 # try using killall to match
252 self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
254 def _setup_dpdk(self):
255 """Setup DPDK environment needed for VNF to run"""
256 self._setup_hugepages()
257 self.ssh_helper.execute('sudo modprobe uio && sudo modprobe igb_uio')
258 exit_status = self.ssh_helper.execute('lsmod | grep -i igb_uio')[0]
260 raise y_exceptions.DPDKSetupDriverError()
262 def get_collectd_options(self):
263 options = self.scenario_helper.all_options.get("collectd", {})
264 # override with specific node settings
265 options.update(self.scenario_helper.options.get("collectd", {}))
268 def _setup_resources(self):
269 # what is this magic? how do we know which socket is for which port?
270 # what about quad-socket?
271 if any(v[5] == "0" for v in self.bound_pci):
276 # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
277 # this won't work because we don't have DPDK port numbers yet
278 ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
279 port_names = (intf["name"] for intf in ports)
280 collectd_options = self.get_collectd_options()
281 plugins = collectd_options.get("plugins", {})
282 # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
283 return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
284 plugins=plugins, interval=collectd_options.get("interval"),
285 timeout=self.scenario_helper.timeout)
287 def _detect_and_bind_drivers(self):
288 interfaces = self.vnfd_helper.interfaces
290 self.dpdk_bind_helper.read_status()
291 self.dpdk_bind_helper.save_used_drivers()
293 self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
295 sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
296 for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
298 intf = next(v for v in interfaces
299 if vpci == v['virtual-interface']['vpci'])
301 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
302 except: # pylint: disable=bare-except
306 def get_local_iface_name_by_vpci(self, vpci):
307 find_net_cmd = self.FIND_NET_CMD.format(vpci)
308 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
314 self.dpdk_bind_helper.rebind_drivers()
317 class ResourceHelper(object):
320 MAKE_INSTALL = 'cd {0} && make && sudo make install'
321 RESOURCE_WORD = 'sample'
325 def __init__(self, setup_helper):
326 super(ResourceHelper, self).__init__()
328 self.setup_helper = setup_helper
329 self.ssh_helper = setup_helper.ssh_helper
332 self.resource = self.setup_helper.setup_vnf_environment()
334 def generate_cfg(self):
337 def _collect_resource_kpi(self):
339 status = self.resource.check_if_system_agent_running("collectd")[0]
341 result = self.resource.amqp_collect_nfvi_kpi()
343 result = {"core": result}
346 def start_collect(self):
347 self.resource.initiate_systemagent(self.ssh_helper.bin_path)
348 self.resource.start()
349 self.resource.amqp_process_for_nfvi_kpi()
351 def stop_collect(self):
355 def collect_kpi(self):
356 return self._collect_resource_kpi()
359 class ClientResourceHelper(ResourceHelper):
366 def __init__(self, setup_helper):
367 super(ClientResourceHelper, self).__init__(setup_helper)
368 self.vnfd_helper = setup_helper.vnfd_helper
369 self.scenario_helper = setup_helper.scenario_helper
372 self.client_started = Value('i', 0)
373 self.all_ports = None
374 self._queue = Queue()
376 self._terminated = Value('i', 0)
378 def _build_ports(self):
379 self.networks = self.vnfd_helper.port_pairs.networks
380 self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
381 self.downlink_ports = \
382 self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
383 self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
385 def port_num(self, intf):
386 # by default return port num
387 return self.vnfd_helper.port_num(intf)
389 def get_stats(self, *args, **kwargs):
391 return self.client.get_stats(*args, **kwargs)
393 LOG.exception("TRex client not connected")
396 def generate_samples(self, ports, key=None, default=None):
397 # needs to be used ports
398 last_result = self.get_stats(ports)
399 key_value = last_result.get(key, default)
401 if not isinstance(last_result, Mapping): # added for mock unit test
402 self._terminated.value = 1
406 # recalculate port for interface and see if it matches ports provided
407 for intf in self.vnfd_helper.interfaces:
409 port = self.vnfd_helper.port_num(name)
411 xe_value = last_result.get(port, {})
413 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
414 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
415 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
416 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
417 "in_packets": int(xe_value.get("ipackets", 0)),
418 "out_packets": int(xe_value.get("opackets", 0)),
421 samples[name][key] = key_value
424 def _run_traffic_once(self, traffic_profile):
425 traffic_profile.execute_traffic(self)
426 self.client_started.value = 1
427 time.sleep(self.RUN_DURATION)
428 samples = self.generate_samples(traffic_profile.ports)
429 time.sleep(self.QUEUE_WAIT_TIME)
430 self._queue.put(samples)
432 def run_traffic(self, traffic_profile):
433 # if we don't do this we can hang waiting for the queue to drain
434 # have to do this in the subprocess
435 self._queue.cancel_join_thread()
436 # fixme: fix passing correct trex config file,
437 # instead of searching the default path
440 self.client = self._connect()
441 self.client.reset(ports=self.all_ports)
442 self.client.remove_all_streams(self.all_ports) # remove all streams
443 traffic_profile.register_generator(self)
445 while self._terminated.value == 0:
446 self._run_traffic_once(traffic_profile)
448 self.client.stop(self.all_ports)
449 self.client.disconnect()
450 self._terminated.value = 0
452 if self._terminated.value:
453 LOG.debug("traffic generator is stopped")
454 return # return if trex/tg server is stopped.
458 self._terminated.value = 1 # stop client
460 def clear_stats(self, ports=None):
462 ports = self.all_ports
463 self.client.clear_stats(ports=ports)
465 def start(self, ports=None, *args, **kwargs):
466 # pylint: disable=keyword-arg-before-vararg
467 # NOTE(ralonsoh): defining keyworded arguments before variable
468 # positional arguments is a bug. This function definition doesn't work
469 # in Python 2, although it works in Python 3. Reference:
470 # https://www.python.org/dev/peps/pep-3102/
472 ports = self.all_ports
473 self.client.start(ports=ports, *args, **kwargs)
475 def collect_kpi(self):
476 if not self._queue.empty():
477 kpi = self._queue.get()
478 self._result.update(kpi)
479 LOG.debug('Got KPIs from _queue for %s %s',
480 self.scenario_helper.name, self.RESOURCE_WORD)
483 def _connect(self, client=None):
485 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
486 server=self.vnfd_helper.mgmt_interface["ip"],
487 verbose_level=LoggerApi.VERBOSE_QUIET)
489 # try to connect with 5s intervals, 30s max
495 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
500 class Rfc2544ResourceHelper(object):
502 DEFAULT_CORRELATED_TRAFFIC = False
503 DEFAULT_LATENCY = False
504 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
506 def __init__(self, scenario_helper):
507 super(Rfc2544ResourceHelper, self).__init__()
508 self.scenario_helper = scenario_helper
509 self._correlated_traffic = None
510 self.iteration = Value('i', 0)
513 self._tolerance_low = None
514 self._tolerance_high = None
518 if self._rfc2544 is None:
519 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
523 def tolerance_low(self):
524 if self._tolerance_low is None:
525 self.get_rfc_tolerance()
526 return self._tolerance_low
529 def tolerance_high(self):
530 if self._tolerance_high is None:
531 self.get_rfc_tolerance()
532 return self._tolerance_high
535 def correlated_traffic(self):
536 if self._correlated_traffic is None:
537 self._correlated_traffic = \
538 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
540 return self._correlated_traffic
544 if self._latency is None:
545 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
548 def get_rfc2544(self, name, default=None):
549 return self.rfc2544.get(name, default)
551 def get_rfc_tolerance(self):
552 tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
553 tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
554 self._tolerance_low = next(tolerance_iter)
555 self._tolerance_high = next(tolerance_iter, self.tolerance_low)
558 class SampleVNFDeployHelper(object):
560 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
561 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
562 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
564 def __init__(self, vnfd_helper, ssh_helper):
565 super(SampleVNFDeployHelper, self).__init__()
566 self.ssh_helper = ssh_helper
567 self.vnfd_helper = vnfd_helper
569 def deploy_vnfs(self, app_name):
570 vnf_bin = self.ssh_helper.join_bin_path(app_name)
571 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
575 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
576 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
578 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
579 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
581 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
583 http_proxy = os.environ.get('http_proxy', '')
584 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
586 self.ssh_helper.execute(cmd)
587 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
588 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
589 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
592 class ScenarioHelper(object):
597 'worker_config': '1C/1T',
601 def __init__(self, name):
603 self.scenario_cfg = None
607 return self.scenario_cfg['task_path']
611 return self.scenario_cfg.get('nodes')
614 def all_options(self):
615 return self.scenario_cfg.get('options', {})
619 return self.all_options.get(self.name, {})
623 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
627 return self.scenario_cfg['topology']
631 return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
634 class SampleVNF(GenericVNF):
635 """ Class providing file-like API for generic VNF implementation """
637 VNF_PROMPT = "pipeline>"
639 WAIT_TIME_FOR_SCRIPT = 10
640 APP_NAME = "SampleVNF"
641 # we run the VNF interactively, so the ssh command will timeout after this long
643 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
644 super(SampleVNF, self).__init__(name, vnfd)
645 self.bin_path = get_nsb_option('bin_path', '')
647 self.scenario_helper = ScenarioHelper(self.name)
648 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
650 if setup_env_helper_type is None:
651 setup_env_helper_type = SetupEnvHelper
653 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
655 self.scenario_helper)
657 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
659 if resource_helper_type is None:
660 resource_helper_type = ResourceHelper
662 self.resource_helper = resource_helper_type(self.setup_helper)
664 self.context_cfg = None
665 self.nfvi_context = None
666 self.pipeline_kwargs = {}
667 self.uplink_ports = None
668 self.downlink_ports = None
669 # NOTE(esm): make QueueFileWrapper invert-able so that we
670 # never have to manage the queues
673 self.queue_wrapper = None
675 self.used_drivers = {}
676 self.vnf_port_pairs = None
677 self._vnf_process = None
679 def _build_ports(self):
680 self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
681 self.networks = self._port_pairs.networks
682 self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
683 self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
684 self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
686 def _get_route_data(self, route_index, route_type):
687 route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
688 for _ in range(route_index):
690 return next(route_iter, {}).get(route_type, '')
692 def _get_port0localip6(self):
693 return_value = self._get_route_data(0, 'network')
694 LOG.info("_get_port0localip6 : %s", return_value)
697 def _get_port1localip6(self):
698 return_value = self._get_route_data(1, 'network')
699 LOG.info("_get_port1localip6 : %s", return_value)
702 def _get_port0prefixlen6(self):
703 return_value = self._get_route_data(0, 'netmask')
704 LOG.info("_get_port0prefixlen6 : %s", return_value)
707 def _get_port1prefixlen6(self):
708 return_value = self._get_route_data(1, 'netmask')
709 LOG.info("_get_port1prefixlen6 : %s", return_value)
712 def _get_port0gateway6(self):
713 return_value = self._get_route_data(0, 'network')
714 LOG.info("_get_port0gateway6 : %s", return_value)
717 def _get_port1gateway6(self):
718 return_value = self._get_route_data(1, 'network')
719 LOG.info("_get_port1gateway6 : %s", return_value)
722 def _start_vnf(self):
723 self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
724 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
725 self._vnf_process = Process(name=name, target=self._run)
726 self._vnf_process.start()
728 def _vnf_up_post(self):
731 def instantiate(self, scenario_cfg, context_cfg):
732 self.scenario_helper.scenario_cfg = scenario_cfg
733 self.context_cfg = context_cfg
734 self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
735 # self.nfvi_context = None
737 # vnf deploy is unsupported, use ansible playbooks
738 if self.scenario_helper.options.get("vnf_deploy", False):
739 self.deploy_helper.deploy_vnfs(self.APP_NAME)
740 self.resource_helper.setup()
743 def wait_for_instantiate(self):
745 time.sleep(self.WAIT_TIME) # Give some time for config to load
747 if not self._vnf_process.is_alive():
748 raise RuntimeError("%s VNF process died." % self.APP_NAME)
750 # NOTE(esm): move to QueueFileWrapper
751 while self.q_out.qsize() > 0:
752 buf.append(self.q_out.get())
753 message = ''.join(buf)
754 if self.VNF_PROMPT in message:
755 LOG.info("%s VNF is up and running.", self.APP_NAME)
757 self.queue_wrapper.clear()
758 self.resource_helper.start_collect()
759 return self._vnf_process.exitcode
761 if "PANIC" in message:
762 raise RuntimeError("Error starting %s VNF." %
765 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
766 time.sleep(self.WAIT_TIME_FOR_SCRIPT)
767 # Send ENTER to display a new prompt in case the prompt text was corrupted
768 # by other VNF output
769 self.q_in.put('\r\n')
771 def _build_run_kwargs(self):
773 'stdin': self.queue_wrapper,
774 'stdout': self.queue_wrapper,
775 'keep_stdin_open': True,
777 'timeout': self.scenario_helper.timeout,
780 def _build_config(self):
781 return self.setup_helper.build_config()
784 # we can't share ssh paramiko objects to force new connection
785 self.ssh_helper.drop_connection()
786 cmd = self._build_config()
787 # kill before starting
788 self.setup_helper.kill_vnf()
791 self._build_run_kwargs()
792 self.ssh_helper.run(cmd, **self.run_kwargs)
794 def vnf_execute(self, cmd, wait_time=2):
795 """ send cmd to vnf process """
797 LOG.info("%s command: %s", self.APP_NAME, cmd)
798 self.q_in.put("{}\r\n".format(cmd))
799 time.sleep(wait_time)
801 while self.q_out.qsize() > 0:
802 output.append(self.q_out.get())
803 return "".join(output)
805 def _tear_down(self):
809 self.vnf_execute("quit")
810 self.setup_helper.kill_vnf()
812 self.resource_helper.stop_collect()
813 if self._vnf_process is not None:
814 # be proper and join first before we kill
815 LOG.debug("joining before terminate %s", self._vnf_process.name)
816 self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
817 self._vnf_process.terminate()
818 # no terminate children here because we share processes with tg
820 def get_stats(self, *args, **kwargs): # pylint: disable=unused-argument
821 """Method for checking the statistics
823 This method could be overridden in children classes.
825 :return: VNF statistics
827 cmd = 'p {0} stats'.format(self.APP_WORD)
828 out = self.vnf_execute(cmd)
831 def collect_kpi(self):
832 # we can't get KPIs if the VNF is down
833 check_if_process_failed(self._vnf_process)
834 stats = self.get_stats()
835 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
837 result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
838 result["collect_stats"] = self.resource_helper.collect_kpi()
843 "packets_dropped": 0,
845 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
848 def scale(self, flavor=""):
849 """The SampleVNF base class doesn't provide the 'scale' feature"""
850 raise y_exceptions.FunctionNotImplemented(
851 function_name='scale', class_name='SampleVNFTrafficGen')
854 class SampleVNFTrafficGen(GenericTrafficGen):
855 """ Class providing file-like API for generic traffic generator """
860 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
861 super(SampleVNFTrafficGen, self).__init__(name, vnfd)
862 self.bin_path = get_nsb_option('bin_path', '')
864 self.scenario_helper = ScenarioHelper(self.name)
865 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
867 if setup_env_helper_type is None:
868 setup_env_helper_type = SetupEnvHelper
870 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
872 self.scenario_helper)
874 if resource_helper_type is None:
875 resource_helper_type = ClientResourceHelper
877 self.resource_helper = resource_helper_type(self.setup_helper)
879 self.runs_traffic = True
880 self.traffic_finished = False
881 self._tg_process = None
882 self._traffic_process = None
884 def _start_server(self):
885 # we can't share ssh paramiko objects to force new connection
886 self.ssh_helper.drop_connection()
888 def instantiate(self, scenario_cfg, context_cfg):
889 self.scenario_helper.scenario_cfg = scenario_cfg
890 self.resource_helper.setup()
891 # must generate_cfg after DPDK bind because we need port number
892 self.resource_helper.generate_cfg()
894 LOG.info("Starting %s server...", self.APP_NAME)
895 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
896 self._tg_process = Process(name=name, target=self._start_server)
897 self._tg_process.start()
899 def _check_status(self):
900 raise NotImplementedError
902 def _wait_for_process(self):
904 if not self._tg_process.is_alive():
905 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
906 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
908 status = self._check_status()
910 LOG.info("%s TG Server is up and running.", self.APP_NAME)
911 return self._tg_process.exitcode
913 def _traffic_runner(self, traffic_profile):
914 # always drop connections first thing in new processes
915 # so we don't get paramiko errors
916 self.ssh_helper.drop_connection()
917 LOG.info("Starting %s client...", self.APP_NAME)
918 self.resource_helper.run_traffic(traffic_profile)
920 def run_traffic(self, traffic_profile):
921 """ Generate traffic on the wire according to the given params.
922 Method is non-blocking, returns immediately when traffic process
923 is running. Mandatory.
925 :param traffic_profile:
928 name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
930 self._traffic_process = Process(name=name, target=self._traffic_runner,
931 args=(traffic_profile,))
932 self._traffic_process.start()
933 # Wait for traffic process to start
934 while self.resource_helper.client_started.value == 0:
935 time.sleep(self.RUN_WAIT)
936 # what if traffic process takes a few seconds to start?
937 if not self._traffic_process.is_alive():
940 return self._traffic_process.is_alive()
942 def collect_kpi(self):
943 # check if the tg processes have exited
944 for proc in (self._tg_process, self._traffic_process):
945 check_if_process_failed(proc)
946 result = self.resource_helper.collect_kpi()
947 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
951 """ After this method finishes, all traffic processes should stop. Mandatory.
955 self.traffic_finished = True
956 # we must kill client before we kill the server, or the client will raise exception
957 if self._traffic_process is not None:
958 # be proper and try to join before terminating
959 LOG.debug("joining before terminate %s", self._traffic_process.name)
960 self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
961 self._traffic_process.terminate()
962 if self._tg_process is not None:
963 # be proper and try to join before terminating
964 LOG.debug("joining before terminate %s", self._tg_process.name)
965 self._tg_process.join(PROCESS_JOIN_TIMEOUT)
966 self._tg_process.terminate()
967 # no terminate children here because we share processes with vnf
969 def scale(self, flavor=""):
970 """A traffic generator VFN doesn't provide the 'scale' feature"""
971 raise y_exceptions.FunctionNotImplemented(
972 function_name='scale', class_name='SampleVNFTrafficGen')