1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
16 from __future__ import absolute_import
24 from collections import Mapping
25 from multiprocessing import Queue, Value, Process
27 from six.moves import cStringIO
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common import exceptions as y_exceptions
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
34 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
35 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
38 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.utils import get_nsb_option
42 from trex_stl_lib.trex_stl_client import STLClient
43 from trex_stl_lib.trex_stl_client import LoggerApi
44 from trex_stl_lib.trex_stl_exceptions import STLError
46 from yardstick.ssh import AutoConnectSSH
48 DPDK_VERSION = "dpdk-16.07"
50 LOG = logging.getLogger(__name__)
54 DEFAULT_VNF_TIMEOUT = 3600
55 PROCESS_JOIN_TIMEOUT = 3
58 class VnfSshHelper(AutoConnectSSH):
60 def __init__(self, node, bin_path, wait=None):
62 kwargs = self.args_from_node(self.node)
64 kwargs.setdefault('wait', wait)
66 super(VnfSshHelper, self).__init__(**kwargs)
67 self.bin_path = bin_path
71 # must return static class name, anything else refers to the calling class
72 # i.e. the subclass, not the superclass
76 # this copy constructor is different from SSH classes, since it uses node
77 return self.get_class()(self.node, self.bin_path)
79 def upload_config_file(self, prefix, content):
80 cfg_file = os.path.join(REMOTE_TMP, prefix)
82 file_obj = cStringIO(content)
83 self.put_file_obj(file_obj, cfg_file)
86 def join_bin_path(self, *args):
87 return os.path.join(self.bin_path, *args)
89 def provision_tool(self, tool_path=None, tool_file=None):
91 tool_path = self.bin_path
92 return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
95 class SetupEnvHelper(object):
97 CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
98 CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
99 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
100 PIPELINE_COMMAND = ''
103 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
104 super(SetupEnvHelper, self).__init__()
105 self.vnfd_helper = vnfd_helper
106 self.ssh_helper = ssh_helper
107 self.scenario_helper = scenario_helper
109 def build_config(self):
110 raise NotImplementedError
112 def setup_vnf_environment(self):
119 raise NotImplementedError
122 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
125 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
128 def _update_packet_type(ip_pipeline_cfg, traffic_options):
129 match_str = 'pkt_type = ipv4'
130 replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
131 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
132 return pipeline_config_str
135 def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
136 traffic_type = traffic_options['traffic_type']
138 if traffic_options['vnf_type'] is not cls.APP_NAME:
139 match_str = 'traffic_type = 4'
140 replace_str = 'traffic_type = {0}'.format(traffic_type)
142 elif traffic_type == 4:
143 match_str = 'pkt_type = ipv4'
144 replace_str = 'pkt_type = ipv4'
147 match_str = 'pkt_type = ipv4'
148 replace_str = 'pkt_type = ipv6'
150 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
151 return pipeline_config_str
153 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
154 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
155 self.all_ports = None
156 self.bound_pci = None
158 self.used_drivers = None
159 self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
161 def _setup_hugepages(self):
162 cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
163 hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
166 '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
167 self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
169 if hugepages == "2048kB":
174 self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
176 def build_config(self):
177 vnf_cfg = self.scenario_helper.vnf_cfg
178 task_path = self.scenario_helper.task_path
180 lb_count = vnf_cfg.get('lb_count', 3)
181 lb_config = vnf_cfg.get('lb_config', 'SW')
182 worker_config = vnf_cfg.get('worker_config', '1C/1T')
183 worker_threads = vnf_cfg.get('worker_threads', 3)
185 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
187 'traffic_type': traffic_type,
188 'pkt_type': 'ipv%s' % traffic_type,
189 'vnf_type': self.VNF_TYPE,
192 config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
193 config_basename = posixpath.basename(self.CFG_CONFIG)
194 script_basename = posixpath.basename(self.CFG_SCRIPT)
195 multiport = MultiPortConfig(self.scenario_helper.topology,
206 multiport.generate_config()
207 with open(self.CFG_CONFIG) as handle:
208 new_config = handle.read()
210 new_config = self._update_traffic_type(new_config, traffic_options)
211 new_config = self._update_packet_type(new_config, traffic_options)
213 self.ssh_helper.upload_config_file(config_basename, new_config)
214 self.ssh_helper.upload_config_file(script_basename,
215 multiport.generate_script(self.vnfd_helper))
217 LOG.info("Provision and start the %s", self.APP_NAME)
218 self._build_pipeline_kwargs()
219 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
221 def _build_pipeline_kwargs(self):
222 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
223 # count the number of actual ports in the list of pairs
224 # remove duplicate ports
225 # this is really a mapping from LINK ID to DPDK PMD ID
226 # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
227 # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
228 ports = self.vnfd_helper.port_pairs.all_ports
229 port_nums = self.vnfd_helper.port_nums(ports)
230 # create mask from all the dpdk port numbers
231 ports_mask_hex = hex(sum(2 ** num for num in port_nums))
232 self.pipeline_kwargs = {
233 'cfg_file': self.CFG_CONFIG,
234 'script': self.CFG_SCRIPT,
235 'port_mask_hex': ports_mask_hex,
236 'tool_path': tool_path,
239 def setup_vnf_environment(self):
241 self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
243 # bind before _setup_resources so we can use dpdk_port_num
244 self._detect_and_bind_drivers()
245 resource = self._setup_resources()
249 # pkill is not matching, debug with pgrep
250 self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
251 self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
252 # have to use exact match
253 # try using killall to match
254 self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
256 def _setup_dpdk(self):
257 """ setup dpdk environment needed for vnf to run """
259 self._setup_hugepages()
260 self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
262 exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
266 dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
267 dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
268 exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
270 self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
272 def get_collectd_options(self):
273 options = self.scenario_helper.all_options.get("collectd", {})
274 # override with specific node settings
275 options.update(self.scenario_helper.options.get("collectd", {}))
278 def _setup_resources(self):
279 # what is this magic? how do we know which socket is for which port?
280 # what about quad-socket?
281 if any(v[5] == "0" for v in self.bound_pci):
286 # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
287 # this won't work because we don't have DPDK port numbers yet
288 ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
289 port_names = (intf["name"] for intf in ports)
290 collectd_options = self.get_collectd_options()
291 plugins = collectd_options.get("plugins", {})
292 # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
293 return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
294 plugins=plugins, interval=collectd_options.get("interval"),
295 timeout=self.scenario_helper.timeout)
297 def _detect_and_bind_drivers(self):
298 interfaces = self.vnfd_helper.interfaces
300 self.dpdk_bind_helper.read_status()
301 self.dpdk_bind_helper.save_used_drivers()
303 self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
305 sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
306 for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
308 intf = next(v for v in interfaces
309 if vpci == v['virtual-interface']['vpci'])
311 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
312 except: # pylint: disable=bare-except
316 def get_local_iface_name_by_vpci(self, vpci):
317 find_net_cmd = self.FIND_NET_CMD.format(vpci)
318 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
324 self.dpdk_bind_helper.rebind_drivers()
327 class ResourceHelper(object):
330 MAKE_INSTALL = 'cd {0} && make && sudo make install'
331 RESOURCE_WORD = 'sample'
335 def __init__(self, setup_helper):
336 super(ResourceHelper, self).__init__()
338 self.setup_helper = setup_helper
339 self.ssh_helper = setup_helper.ssh_helper
342 self.resource = self.setup_helper.setup_vnf_environment()
344 def generate_cfg(self):
347 def _collect_resource_kpi(self):
349 status = self.resource.check_if_sa_running("collectd")[0]
351 result = self.resource.amqp_collect_nfvi_kpi()
353 result = {"core": result}
356 def start_collect(self):
357 self.resource.initiate_systemagent(self.ssh_helper.bin_path)
358 self.resource.start()
359 self.resource.amqp_process_for_nfvi_kpi()
361 def stop_collect(self):
365 def collect_kpi(self):
366 return self._collect_resource_kpi()
369 class ClientResourceHelper(ResourceHelper):
376 def __init__(self, setup_helper):
377 super(ClientResourceHelper, self).__init__(setup_helper)
378 self.vnfd_helper = setup_helper.vnfd_helper
379 self.scenario_helper = setup_helper.scenario_helper
382 self.client_started = Value('i', 0)
383 self.all_ports = None
384 self._queue = Queue()
386 self._terminated = Value('i', 0)
388 def _build_ports(self):
389 self.networks = self.vnfd_helper.port_pairs.networks
390 self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
391 self.downlink_ports = \
392 self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
393 self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
395 def port_num(self, intf):
396 # by default return port num
397 return self.vnfd_helper.port_num(intf)
399 def get_stats(self, *args, **kwargs):
401 return self.client.get_stats(*args, **kwargs)
403 LOG.exception("TRex client not connected")
406 def generate_samples(self, ports, key=None, default=None):
407 # needs to be used ports
408 last_result = self.get_stats(ports)
409 key_value = last_result.get(key, default)
411 if not isinstance(last_result, Mapping): # added for mock unit test
412 self._terminated.value = 1
416 # recalculate port for interface and see if it matches ports provided
417 for intf in self.vnfd_helper.interfaces:
419 port = self.vnfd_helper.port_num(name)
421 xe_value = last_result.get(port, {})
423 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
424 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
425 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
426 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
427 "in_packets": int(xe_value.get("ipackets", 0)),
428 "out_packets": int(xe_value.get("opackets", 0)),
431 samples[name][key] = key_value
434 def _run_traffic_once(self, traffic_profile):
435 traffic_profile.execute_traffic(self)
436 self.client_started.value = 1
437 time.sleep(self.RUN_DURATION)
438 samples = self.generate_samples(traffic_profile.ports)
439 time.sleep(self.QUEUE_WAIT_TIME)
440 self._queue.put(samples)
442 def run_traffic(self, traffic_profile):
443 # if we don't do this we can hang waiting for the queue to drain
444 # have to do this in the subprocess
445 self._queue.cancel_join_thread()
446 # fixme: fix passing correct trex config file,
447 # instead of searching the default path
450 self.client = self._connect()
451 self.client.reset(ports=self.all_ports)
452 self.client.remove_all_streams(self.all_ports) # remove all streams
453 traffic_profile.register_generator(self)
455 while self._terminated.value == 0:
456 self._run_traffic_once(traffic_profile)
458 self.client.stop(self.all_ports)
459 self.client.disconnect()
460 self._terminated.value = 0
462 if self._terminated.value:
463 LOG.debug("traffic generator is stopped")
464 return # return if trex/tg server is stopped.
468 self._terminated.value = 1 # stop client
470 def clear_stats(self, ports=None):
472 ports = self.all_ports
473 self.client.clear_stats(ports=ports)
475 def start(self, ports=None, *args, **kwargs):
476 # pylint: disable=keyword-arg-before-vararg
477 # NOTE(ralonsoh): defining keyworded arguments before variable
478 # positional arguments is a bug. This function definition doesn't work
479 # in Python 2, although it works in Python 3. Reference:
480 # https://www.python.org/dev/peps/pep-3102/
482 ports = self.all_ports
483 self.client.start(ports=ports, *args, **kwargs)
485 def collect_kpi(self):
486 if not self._queue.empty():
487 kpi = self._queue.get()
488 self._result.update(kpi)
489 LOG.debug('Got KPIs from _queue for %s %s',
490 self.scenario_helper.name, self.RESOURCE_WORD)
493 def _connect(self, client=None):
495 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
496 server=self.vnfd_helper.mgmt_interface["ip"],
497 verbose_level=LoggerApi.VERBOSE_QUIET)
499 # try to connect with 5s intervals, 30s max
505 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
510 class Rfc2544ResourceHelper(object):
512 DEFAULT_CORRELATED_TRAFFIC = False
513 DEFAULT_LATENCY = False
514 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
516 def __init__(self, scenario_helper):
517 super(Rfc2544ResourceHelper, self).__init__()
518 self.scenario_helper = scenario_helper
519 self._correlated_traffic = None
520 self.iteration = Value('i', 0)
523 self._tolerance_low = None
524 self._tolerance_high = None
528 if self._rfc2544 is None:
529 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
533 def tolerance_low(self):
534 if self._tolerance_low is None:
535 self.get_rfc_tolerance()
536 return self._tolerance_low
539 def tolerance_high(self):
540 if self._tolerance_high is None:
541 self.get_rfc_tolerance()
542 return self._tolerance_high
545 def correlated_traffic(self):
546 if self._correlated_traffic is None:
547 self._correlated_traffic = \
548 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
550 return self._correlated_traffic
554 if self._latency is None:
555 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
558 def get_rfc2544(self, name, default=None):
559 return self.rfc2544.get(name, default)
561 def get_rfc_tolerance(self):
562 tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
563 tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
564 self._tolerance_low = next(tolerance_iter)
565 self._tolerance_high = next(tolerance_iter, self.tolerance_low)
568 class SampleVNFDeployHelper(object):
570 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
571 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
572 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
574 def __init__(self, vnfd_helper, ssh_helper):
575 super(SampleVNFDeployHelper, self).__init__()
576 self.ssh_helper = ssh_helper
577 self.vnfd_helper = vnfd_helper
579 def deploy_vnfs(self, app_name):
580 vnf_bin = self.ssh_helper.join_bin_path(app_name)
581 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
585 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
586 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
588 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
589 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
591 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
593 http_proxy = os.environ.get('http_proxy', '')
594 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
596 self.ssh_helper.execute(cmd)
597 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
598 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
599 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
602 class ScenarioHelper(object):
607 'worker_config': '1C/1T',
611 def __init__(self, name):
613 self.scenario_cfg = None
617 return self.scenario_cfg['task_path']
621 return self.scenario_cfg.get('nodes')
624 def all_options(self):
625 return self.scenario_cfg.get('options', {})
629 return self.all_options.get(self.name, {})
633 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
637 return self.scenario_cfg['topology']
641 return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
644 class SampleVNF(GenericVNF):
645 """ Class providing file-like API for generic VNF implementation """
647 VNF_PROMPT = "pipeline>"
649 WAIT_TIME_FOR_SCRIPT = 10
650 APP_NAME = "SampleVNF"
651 # we run the VNF interactively, so the ssh command will timeout after this long
653 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
654 super(SampleVNF, self).__init__(name, vnfd)
655 self.bin_path = get_nsb_option('bin_path', '')
657 self.scenario_helper = ScenarioHelper(self.name)
658 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
660 if setup_env_helper_type is None:
661 setup_env_helper_type = SetupEnvHelper
663 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
665 self.scenario_helper)
667 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
669 if resource_helper_type is None:
670 resource_helper_type = ResourceHelper
672 self.resource_helper = resource_helper_type(self.setup_helper)
674 self.context_cfg = None
675 self.nfvi_context = None
676 self.pipeline_kwargs = {}
677 self.uplink_ports = None
678 self.downlink_ports = None
679 # NOTE(esm): make QueueFileWrapper invert-able so that we
680 # never have to manage the queues
683 self.queue_wrapper = None
685 self.used_drivers = {}
686 self.vnf_port_pairs = None
687 self._vnf_process = None
689 def _build_ports(self):
690 self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
691 self.networks = self._port_pairs.networks
692 self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
693 self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
694 self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
696 def _get_route_data(self, route_index, route_type):
697 route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
698 for _ in range(route_index):
700 return next(route_iter, {}).get(route_type, '')
702 def _get_port0localip6(self):
703 return_value = self._get_route_data(0, 'network')
704 LOG.info("_get_port0localip6 : %s", return_value)
707 def _get_port1localip6(self):
708 return_value = self._get_route_data(1, 'network')
709 LOG.info("_get_port1localip6 : %s", return_value)
712 def _get_port0prefixlen6(self):
713 return_value = self._get_route_data(0, 'netmask')
714 LOG.info("_get_port0prefixlen6 : %s", return_value)
717 def _get_port1prefixlen6(self):
718 return_value = self._get_route_data(1, 'netmask')
719 LOG.info("_get_port1prefixlen6 : %s", return_value)
722 def _get_port0gateway6(self):
723 return_value = self._get_route_data(0, 'network')
724 LOG.info("_get_port0gateway6 : %s", return_value)
727 def _get_port1gateway6(self):
728 return_value = self._get_route_data(1, 'network')
729 LOG.info("_get_port1gateway6 : %s", return_value)
732 def _start_vnf(self):
733 self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
734 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
735 self._vnf_process = Process(name=name, target=self._run)
736 self._vnf_process.start()
738 def _vnf_up_post(self):
741 def instantiate(self, scenario_cfg, context_cfg):
742 self.scenario_helper.scenario_cfg = scenario_cfg
743 self.context_cfg = context_cfg
744 self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
745 # self.nfvi_context = None
747 # vnf deploy is unsupported, use ansible playbooks
748 if self.scenario_helper.options.get("vnf_deploy", False):
749 self.deploy_helper.deploy_vnfs(self.APP_NAME)
750 self.resource_helper.setup()
753 def wait_for_instantiate(self):
755 time.sleep(self.WAIT_TIME) # Give some time for config to load
757 if not self._vnf_process.is_alive():
758 raise RuntimeError("%s VNF process died." % self.APP_NAME)
760 # NOTE(esm): move to QueueFileWrapper
761 while self.q_out.qsize() > 0:
762 buf.append(self.q_out.get())
763 message = ''.join(buf)
764 if self.VNF_PROMPT in message:
765 LOG.info("%s VNF is up and running.", self.APP_NAME)
767 self.queue_wrapper.clear()
768 self.resource_helper.start_collect()
769 return self._vnf_process.exitcode
771 if "PANIC" in message:
772 raise RuntimeError("Error starting %s VNF." %
775 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
776 time.sleep(self.WAIT_TIME_FOR_SCRIPT)
777 # Send ENTER to display a new prompt in case the prompt text was corrupted
778 # by other VNF output
779 self.q_in.put('\r\n')
781 def _build_run_kwargs(self):
783 'stdin': self.queue_wrapper,
784 'stdout': self.queue_wrapper,
785 'keep_stdin_open': True,
787 'timeout': self.scenario_helper.timeout,
790 def _build_config(self):
791 return self.setup_helper.build_config()
794 # we can't share ssh paramiko objects to force new connection
795 self.ssh_helper.drop_connection()
796 cmd = self._build_config()
797 # kill before starting
798 self.setup_helper.kill_vnf()
801 self._build_run_kwargs()
802 self.ssh_helper.run(cmd, **self.run_kwargs)
804 def vnf_execute(self, cmd, wait_time=2):
805 """ send cmd to vnf process """
807 LOG.info("%s command: %s", self.APP_NAME, cmd)
808 self.q_in.put("{}\r\n".format(cmd))
809 time.sleep(wait_time)
811 while self.q_out.qsize() > 0:
812 output.append(self.q_out.get())
813 return "".join(output)
815 def _tear_down(self):
819 self.vnf_execute("quit")
820 self.setup_helper.kill_vnf()
822 self.resource_helper.stop_collect()
823 if self._vnf_process is not None:
824 # be proper and join first before we kill
825 LOG.debug("joining before terminate %s", self._vnf_process.name)
826 self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
827 self._vnf_process.terminate()
828 # no terminate children here because we share processes with tg
830 def get_stats(self, *args, **kwargs): # pylint: disable=unused-argument
831 """Method for checking the statistics
833 This method could be overridden in children classes.
835 :return: VNF statistics
837 cmd = 'p {0} stats'.format(self.APP_WORD)
838 out = self.vnf_execute(cmd)
841 def collect_kpi(self):
842 # we can't get KPIs if the VNF is down
843 check_if_process_failed(self._vnf_process)
844 stats = self.get_stats()
845 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
847 result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
848 result["collect_stats"] = self.resource_helper.collect_kpi()
853 "packets_dropped": 0,
855 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
858 def scale(self, flavor=""):
859 """The SampleVNF base class doesn't provide the 'scale' feature"""
860 raise y_exceptions.FunctionNotImplemented(
861 function_name='scale', class_name='SampleVNFTrafficGen')
864 class SampleVNFTrafficGen(GenericTrafficGen):
865 """ Class providing file-like API for generic traffic generator """
870 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
871 super(SampleVNFTrafficGen, self).__init__(name, vnfd)
872 self.bin_path = get_nsb_option('bin_path', '')
874 self.scenario_helper = ScenarioHelper(self.name)
875 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
877 if setup_env_helper_type is None:
878 setup_env_helper_type = SetupEnvHelper
880 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
882 self.scenario_helper)
884 if resource_helper_type is None:
885 resource_helper_type = ClientResourceHelper
887 self.resource_helper = resource_helper_type(self.setup_helper)
889 self.runs_traffic = True
890 self.traffic_finished = False
891 self._tg_process = None
892 self._traffic_process = None
894 def _start_server(self):
895 # we can't share ssh paramiko objects to force new connection
896 self.ssh_helper.drop_connection()
898 def instantiate(self, scenario_cfg, context_cfg):
899 self.scenario_helper.scenario_cfg = scenario_cfg
900 self.resource_helper.setup()
901 # must generate_cfg after DPDK bind because we need port number
902 self.resource_helper.generate_cfg()
904 LOG.info("Starting %s server...", self.APP_NAME)
905 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
906 self._tg_process = Process(name=name, target=self._start_server)
907 self._tg_process.start()
909 def _check_status(self):
910 raise NotImplementedError
912 def _wait_for_process(self):
914 if not self._tg_process.is_alive():
915 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
916 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
918 status = self._check_status()
920 LOG.info("%s TG Server is up and running.", self.APP_NAME)
921 return self._tg_process.exitcode
923 def _traffic_runner(self, traffic_profile):
924 # always drop connections first thing in new processes
925 # so we don't get paramiko errors
926 self.ssh_helper.drop_connection()
927 LOG.info("Starting %s client...", self.APP_NAME)
928 self.resource_helper.run_traffic(traffic_profile)
930 def run_traffic(self, traffic_profile):
931 """ Generate traffic on the wire according to the given params.
932 Method is non-blocking, returns immediately when traffic process
933 is running. Mandatory.
935 :param traffic_profile:
938 name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
940 self._traffic_process = Process(name=name, target=self._traffic_runner,
941 args=(traffic_profile,))
942 self._traffic_process.start()
943 # Wait for traffic process to start
944 while self.resource_helper.client_started.value == 0:
945 time.sleep(self.RUN_WAIT)
946 # what if traffic process takes a few seconds to start?
947 if not self._traffic_process.is_alive():
950 return self._traffic_process.is_alive()
952 def collect_kpi(self):
953 # check if the tg processes have exited
954 for proc in (self._tg_process, self._traffic_process):
955 check_if_process_failed(proc)
956 result = self.resource_helper.collect_kpi()
957 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
961 """ After this method finishes, all traffic processes should stop. Mandatory.
965 self.traffic_finished = True
966 # we must kill client before we kill the server, or the client will raise exception
967 if self._traffic_process is not None:
968 # be proper and try to join before terminating
969 LOG.debug("joining before terminate %s", self._traffic_process.name)
970 self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
971 self._traffic_process.terminate()
972 if self._tg_process is not None:
973 # be proper and try to join before terminating
974 LOG.debug("joining before terminate %s", self._tg_process.name)
975 self._tg_process.join(PROCESS_JOIN_TIMEOUT)
976 self._tg_process.terminate()
977 # no terminate children here because we share processes with vnf
979 def scale(self, flavor=""):
980 """A traffic generator VFN doesn't provide the 'scale' feature"""
981 raise y_exceptions.FunctionNotImplemented(
982 function_name='scale', class_name='SampleVNFTrafficGen')