1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
16 from collections import Mapping
18 from multiprocessing import Queue, Value, Process
22 from six.moves import cStringIO
26 from trex_stl_lib.trex_stl_client import LoggerApi
27 from trex_stl_lib.trex_stl_client import STLClient
28 from trex_stl_lib.trex_stl_exceptions import STLError
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common import exceptions as y_exceptions
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
34 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.ssh import AutoConnectSSH
44 DPDK_VERSION = "dpdk-16.07"
46 LOG = logging.getLogger(__name__)
50 DEFAULT_VNF_TIMEOUT = 3600
51 PROCESS_JOIN_TIMEOUT = 3
54 class VnfSshHelper(AutoConnectSSH):
56 def __init__(self, node, bin_path, wait=None):
58 kwargs = self.args_from_node(self.node)
60 kwargs.setdefault('wait', wait)
62 super(VnfSshHelper, self).__init__(**kwargs)
63 self.bin_path = bin_path
67 # must return static class name, anything else refers to the calling class
68 # i.e. the subclass, not the superclass
72 # this copy constructor is different from SSH classes, since it uses node
73 return self.get_class()(self.node, self.bin_path)
75 def upload_config_file(self, prefix, content):
76 cfg_file = os.path.join(REMOTE_TMP, prefix)
78 file_obj = cStringIO(content)
79 self.put_file_obj(file_obj, cfg_file)
def join_bin_path(self, *args):
    """Join the given path components onto this helper's ``bin_path``.

    :param args: path components appended after ``self.bin_path``
    :return: the joined path
    """
    components = (self.bin_path,) + args
    return os.path.join(*components)
85 def provision_tool(self, tool_path=None, tool_file=None):
87 tool_path = self.bin_path
88 return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
91 class SetupEnvHelper(object):
93 CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
94 CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
95 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
    """Keep references to the helpers this environment helper works with.

    :param vnfd_helper: stored as ``self.vnfd_helper``
    :param ssh_helper: stored as ``self.ssh_helper``
    :param scenario_helper: stored as ``self.scenario_helper``
    """
    super(SetupEnvHelper, self).__init__()
    self.scenario_helper = scenario_helper
    self.ssh_helper = ssh_helper
    self.vnfd_helper = vnfd_helper
def build_config(self):
    """Build the configuration; must be provided by subclasses.

    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError()
108 def setup_vnf_environment(self):
115 raise NotImplementedError
118 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
121 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
def _update_packet_type(ip_pipeline_cfg, traffic_options):
    """Replace the default ipv4 packet type in a pipeline config string.

    :param ip_pipeline_cfg: pipeline configuration text
    :param traffic_options: mapping providing the 'pkt_type' value to write
    :return: the configuration text with every 'pkt_type = ipv4' replaced
    """
    wanted = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
    return ip_pipeline_cfg.replace('pkt_type = ipv4', wanted)
def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
    """Rewrite the traffic or packet type setting of a pipeline config string.

    When the 'vnf_type' option differs from ``cls.APP_NAME`` the
    'traffic_type = 4' setting is rewritten with the requested traffic type;
    otherwise the 'pkt_type = ipv4' setting is kept for traffic type 4 and
    switched to ipv6 for any other traffic type.

    :param ip_pipeline_cfg: pipeline configuration text
    :param traffic_options: mapping providing 'traffic_type' and 'vnf_type'
    :return: the configuration text after the replacement
    """
    traffic_type = traffic_options['traffic_type']

    # compare by value: whether two equal strings are the same object is an
    # interpreter detail, so an identity test can pick the wrong branch
    if traffic_options['vnf_type'] != cls.APP_NAME:
        match_str = 'traffic_type = 4'
        replace_str = 'traffic_type = {0}'.format(traffic_type)
    elif traffic_type == 4:
        # replacement equals the match, so the text is left unchanged
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = ipv4'
    else:
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = ipv6'

    return ip_pipeline_cfg.replace(match_str, replace_str)
149 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
150 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
151 self.all_ports = None
152 self.bound_pci = None
154 self.used_drivers = None
155 self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
157 def _setup_hugepages(self):
158 cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
159 hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
162 '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
163 self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
165 if hugepages == "2048kB":
170 self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
172 def build_config(self):
173 vnf_cfg = self.scenario_helper.vnf_cfg
174 task_path = self.scenario_helper.task_path
176 lb_count = vnf_cfg.get('lb_count', 3)
177 lb_config = vnf_cfg.get('lb_config', 'SW')
178 worker_config = vnf_cfg.get('worker_config', '1C/1T')
179 worker_threads = vnf_cfg.get('worker_threads', 3)
181 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
183 'traffic_type': traffic_type,
184 'pkt_type': 'ipv%s' % traffic_type,
185 'vnf_type': self.VNF_TYPE,
188 config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
189 config_basename = posixpath.basename(self.CFG_CONFIG)
190 script_basename = posixpath.basename(self.CFG_SCRIPT)
191 multiport = MultiPortConfig(self.scenario_helper.topology,
202 multiport.generate_config()
203 with open(self.CFG_CONFIG) as handle:
204 new_config = handle.read()
206 new_config = self._update_traffic_type(new_config, traffic_options)
207 new_config = self._update_packet_type(new_config, traffic_options)
209 self.ssh_helper.upload_config_file(config_basename, new_config)
210 self.ssh_helper.upload_config_file(script_basename,
211 multiport.generate_script(self.vnfd_helper))
213 LOG.info("Provision and start the %s", self.APP_NAME)
214 self._build_pipeline_kwargs()
215 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
def _build_pipeline_kwargs(self):
    """Populate ``self.pipeline_kwargs`` with the pipeline command arguments.

    Provisions the application binary through the ssh helper and builds a
    hexadecimal mask from the port numbers of all ports.
    """
    provisioned_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
    # count the number of actual ports in the list of pairs
    # remove duplicate ports
    # this is really a mapping from LINK ID to DPDK PMD ID
    # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
    #      0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
    every_port = self.vnfd_helper.port_pairs.all_ports
    dpdk_port_nums = self.vnfd_helper.port_nums(every_port)
    # one bit per dpdk port number
    mask = sum(2 ** num for num in dpdk_port_nums)
    self.pipeline_kwargs = dict(
        cfg_file=self.CFG_CONFIG,
        script=self.CFG_SCRIPT,
        port_mask_hex=hex(mask),
        tool_path=provisioned_path,
    )
235 def setup_vnf_environment(self):
237 self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
239 # bind before _setup_resources so we can use dpdk_port_num
240 self._detect_and_bind_drivers()
241 resource = self._setup_resources()
245 # pkill is not matching, debug with pgrep
246 self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
247 self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
248 # have to use exact match
249 # try using killall to match
250 self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
252 def _setup_dpdk(self):
253 """ setup dpdk environment needed for vnf to run """
255 self._setup_hugepages()
256 self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
258 exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
262 dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
263 dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
264 exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
266 self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
268 def get_collectd_options(self):
269 options = self.scenario_helper.all_options.get("collectd", {})
270 # override with specific node settings
271 options.update(self.scenario_helper.options.get("collectd", {}))
274 def _setup_resources(self):
275 # what is this magic? how do we know which socket is for which port?
276 # what about quad-socket?
277 if any(v[5] == "0" for v in self.bound_pci):
282 # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
283 # this won't work because we don't have DPDK port numbers yet
284 ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
285 port_names = (intf["name"] for intf in ports)
286 collectd_options = self.get_collectd_options()
287 plugins = collectd_options.get("plugins", {})
288 # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
289 return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
290 plugins=plugins, interval=collectd_options.get("interval"),
291 timeout=self.scenario_helper.timeout)
293 def _detect_and_bind_drivers(self):
294 interfaces = self.vnfd_helper.interfaces
296 self.dpdk_bind_helper.read_status()
297 self.dpdk_bind_helper.save_used_drivers()
299 self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
301 sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
302 for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
304 intf = next(v for v in interfaces
305 if vpci == v['virtual-interface']['vpci'])
307 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
308 except: # pylint: disable=bare-except
312 def get_local_iface_name_by_vpci(self, vpci):
313 find_net_cmd = self.FIND_NET_CMD.format(vpci)
314 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
320 self.dpdk_bind_helper.rebind_drivers()
323 class ResourceHelper(object):
326 MAKE_INSTALL = 'cd {0} && make && sudo make install'
327 RESOURCE_WORD = 'sample'
331 def __init__(self, setup_helper):
332 super(ResourceHelper, self).__init__()
334 self.setup_helper = setup_helper
335 self.ssh_helper = setup_helper.ssh_helper
338 self.resource = self.setup_helper.setup_vnf_environment()
340 def generate_cfg(self):
343 def _collect_resource_kpi(self):
345 status = self.resource.check_if_system_agent_running("collectd")[0]
347 result = self.resource.amqp_collect_nfvi_kpi()
349 result = {"core": result}
def start_collect(self):
    """Start resource collection on ``self.resource``.

    Initiates the system agent with the ssh helper's ``bin_path``, starts
    the resource and then starts its AMQP NFVI KPI processing.
    """
    agent_bin_path = self.ssh_helper.bin_path
    self.resource.initiate_systemagent(agent_bin_path)
    self.resource.start()
    self.resource.amqp_process_for_nfvi_kpi()
357 def stop_collect(self):
def collect_kpi(self):
    """Return the KPIs produced by ``_collect_resource_kpi``."""
    resource_kpi = self._collect_resource_kpi()
    return resource_kpi
365 class ClientResourceHelper(ResourceHelper):
372 def __init__(self, setup_helper):
373 super(ClientResourceHelper, self).__init__(setup_helper)
374 self.vnfd_helper = setup_helper.vnfd_helper
375 self.scenario_helper = setup_helper.scenario_helper
378 self.client_started = Value('i', 0)
379 self.all_ports = None
380 self._queue = Queue()
382 self._terminated = Value('i', 0)
def _build_ports(self):
    """Store networks and port numbers taken from the vnfd helper.

    Sets ``networks``, ``uplink_ports``, ``downlink_ports`` and
    ``all_ports`` from the vnfd helper's port pairs.
    """
    vnfd = self.vnfd_helper
    self.networks = vnfd.port_pairs.networks
    self.uplink_ports = vnfd.port_nums(vnfd.port_pairs.uplink_ports)
    self.downlink_ports = vnfd.port_nums(vnfd.port_pairs.downlink_ports)
    self.all_ports = vnfd.port_nums(vnfd.port_pairs.all_ports)
def port_num(self, intf):
    """Return the port number of ``intf`` as reported by the vnfd helper.

    :param intf: interface passed through to ``vnfd_helper.port_num``
    :return: whatever ``vnfd_helper.port_num`` returns for ``intf``
    """
    # default behaviour: delegate straight to the vnfd helper
    lookup = self.vnfd_helper.port_num
    return lookup(intf)
395 def get_stats(self, *args, **kwargs):
397 return self.client.get_stats(*args, **kwargs)
399 LOG.exception("TRex client not connected")
402 def generate_samples(self, ports, key=None, default=None):
403 # needs to be used ports
404 last_result = self.get_stats(ports)
405 key_value = last_result.get(key, default)
407 if not isinstance(last_result, Mapping): # added for mock unit test
408 self._terminated.value = 1
412 # recalculate port for interface and see if it matches ports provided
413 for intf in self.vnfd_helper.interfaces:
415 port = self.vnfd_helper.port_num(name)
417 xe_value = last_result.get(port, {})
419 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
420 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
421 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
422 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
423 "in_packets": int(xe_value.get("ipackets", 0)),
424 "out_packets": int(xe_value.get("opackets", 0)),
427 samples[name][key] = key_value
def _run_traffic_once(self, traffic_profile):
    """Run one traffic iteration and queue the resulting samples.

    :param traffic_profile: profile whose ``execute_traffic`` is invoked
        with this helper and whose ``ports`` are sampled afterwards
    """
    traffic_profile.execute_traffic(self)
    # flag the client as started once traffic has been kicked off
    self.client_started.value = 1
    time.sleep(self.RUN_DURATION)
    collected = self.generate_samples(traffic_profile.ports)
    time.sleep(self.QUEUE_WAIT_TIME)
    self._queue.put(collected)
438 def run_traffic(self, traffic_profile):
439 # if we don't do this we can hang waiting for the queue to drain
440 # have to do this in the subprocess
441 self._queue.cancel_join_thread()
442 # fixme: fix passing correct trex config file,
443 # instead of searching the default path
446 self.client = self._connect()
447 self.client.reset(ports=self.all_ports)
448 self.client.remove_all_streams(self.all_ports) # remove all streams
449 traffic_profile.register_generator(self)
451 while self._terminated.value == 0:
452 self._run_traffic_once(traffic_profile)
454 self.client.stop(self.all_ports)
455 self.client.disconnect()
456 self._terminated.value = 0
458 if self._terminated.value:
459 LOG.debug("traffic generator is stopped")
460 return # return if trex/tg server is stopped.
464 self._terminated.value = 1 # stop client
466 def clear_stats(self, ports=None):
468 ports = self.all_ports
469 self.client.clear_stats(ports=ports)
471 def start(self, ports=None, *args, **kwargs):
472 # pylint: disable=keyword-arg-before-vararg
473 # NOTE(ralonsoh): defining keyworded arguments before variable
474 # positional arguments is a bug. This function definition doesn't work
475 # in Python 2, although it works in Python 3. Reference:
476 # https://www.python.org/dev/peps/pep-3102/
478 ports = self.all_ports
479 self.client.start(ports=ports, *args, **kwargs)
481 def collect_kpi(self):
482 if not self._queue.empty():
483 kpi = self._queue.get()
484 self._result.update(kpi)
485 LOG.debug('Got KPIs from _queue for %s %s',
486 self.scenario_helper.name, self.RESOURCE_WORD)
489 def _connect(self, client=None):
491 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
492 server=self.vnfd_helper.mgmt_interface["ip"],
493 verbose_level=LoggerApi.VERBOSE_QUIET)
495 # try to connect with 5s intervals, 30s max
501 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
506 class Rfc2544ResourceHelper(object):
508 DEFAULT_CORRELATED_TRAFFIC = False
509 DEFAULT_LATENCY = False
510 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
512 def __init__(self, scenario_helper):
513 super(Rfc2544ResourceHelper, self).__init__()
514 self.scenario_helper = scenario_helper
515 self._correlated_traffic = None
516 self.iteration = Value('i', 0)
519 self._tolerance_low = None
520 self._tolerance_high = None
524 if self._rfc2544 is None:
525 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
def tolerance_low(self):
    """Return the lower tolerance bound, computing it on first access."""
    if self._tolerance_low is not None:
        return self._tolerance_low
    # not computed yet: get_rfc_tolerance fills in the cached value
    self.get_rfc_tolerance()
    return self._tolerance_low
def tolerance_high(self):
    """Return the upper tolerance bound, computing it on first access."""
    if self._tolerance_high is not None:
        return self._tolerance_high
    # not computed yet: get_rfc_tolerance fills in the cached value
    self.get_rfc_tolerance()
    return self._tolerance_high
def correlated_traffic(self):
    """Return the 'correlated_traffic' rfc2544 option, cached after lookup.

    Falls back to ``DEFAULT_CORRELATED_TRAFFIC`` when the option is absent.
    """
    cached = self._correlated_traffic
    if cached is None:
        cached = self.get_rfc2544('correlated_traffic',
                                  self.DEFAULT_CORRELATED_TRAFFIC)
        self._correlated_traffic = cached
    return cached
550 if self._latency is None:
551 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
def get_rfc2544(self, name, default=None):
    """Look up one entry of the rfc2544 options.

    :param name: option key
    :param default: value returned when ``name`` is not present
    :return: the option value or ``default``
    """
    rfc_options = self.rfc2544
    return rfc_options.get(name, default)
def get_rfc_tolerance(self):
    """Parse the 'allowed_drop_rate' option into low and high tolerances.

    The option is a '-' separated string of numbers; the values are sorted
    and the two smallest become the low and high bound. With a single
    value, the high bound equals the low bound.
    """
    raw = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
    bounds = [float(piece.strip()) for piece in raw.split('-')]
    bounds.sort()
    self._tolerance_low = bounds[0]
    fallback = self.tolerance_low
    self._tolerance_high = bounds[1] if len(bounds) > 1 else fallback
564 class SampleVNFDeployHelper(object):
566 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
567 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
568 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
def __init__(self, vnfd_helper, ssh_helper):
    """Keep references to the helpers used for deployment.

    :param vnfd_helper: stored as ``self.vnfd_helper``
    :param ssh_helper: stored as ``self.ssh_helper``
    """
    super(SampleVNFDeployHelper, self).__init__()
    self.vnfd_helper = vnfd_helper
    self.ssh_helper = ssh_helper
575 def deploy_vnfs(self, app_name):
576 vnf_bin = self.ssh_helper.join_bin_path(app_name)
577 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
581 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
582 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
584 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
585 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
587 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
589 http_proxy = os.environ.get('http_proxy', '')
590 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
592 self.ssh_helper.execute(cmd)
593 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
594 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
595 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
598 class ScenarioHelper(object):
603 'worker_config': '1C/1T',
607 def __init__(self, name):
609 self.scenario_cfg = None
613 return self.scenario_cfg['task_path']
617 return self.scenario_cfg.get('nodes')
def all_options(self):
    """Return the 'options' entry of the scenario config, or an empty dict."""
    scenario_options = self.scenario_cfg.get('options', {})
    return scenario_options
625 return self.all_options.get(self.name, {})
629 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
633 return self.scenario_cfg['topology']
637 return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
640 class SampleVNF(GenericVNF):
641 """ Class providing file-like API for generic VNF implementation """
643 VNF_PROMPT = "pipeline>"
645 WAIT_TIME_FOR_SCRIPT = 10
646 APP_NAME = "SampleVNF"
647 # we run the VNF interactively, so the ssh command will timeout after this long
649 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
650 super(SampleVNF, self).__init__(name, vnfd)
651 self.bin_path = get_nsb_option('bin_path', '')
653 self.scenario_helper = ScenarioHelper(self.name)
654 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
656 if setup_env_helper_type is None:
657 setup_env_helper_type = SetupEnvHelper
659 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
661 self.scenario_helper)
663 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
665 if resource_helper_type is None:
666 resource_helper_type = ResourceHelper
668 self.resource_helper = resource_helper_type(self.setup_helper)
670 self.context_cfg = None
671 self.nfvi_context = None
672 self.pipeline_kwargs = {}
673 self.uplink_ports = None
674 self.downlink_ports = None
675 # NOTE(esm): make QueueFileWrapper invert-able so that we
676 # never have to manage the queues
679 self.queue_wrapper = None
681 self.used_drivers = {}
682 self.vnf_port_pairs = None
683 self._vnf_process = None
def _build_ports(self):
    """Build the port pairs from the vnfd interfaces and store port numbers.

    Sets ``_port_pairs``, ``networks``, ``uplink_ports``,
    ``downlink_ports`` and ``my_ports``.
    """
    pairs = PortPairs(self.vnfd_helper.interfaces)
    self._port_pairs = pairs
    self.networks = pairs.networks
    vnfd = self.vnfd_helper
    self.uplink_ports = vnfd.port_nums(pairs.uplink_ports)
    self.downlink_ports = vnfd.port_nums(pairs.downlink_ports)
    self.my_ports = vnfd.port_nums(pairs.all_ports)
692 def _get_route_data(self, route_index, route_type):
693 route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
694 for _ in range(route_index):
696 return next(route_iter, {}).get(route_type, '')
698 def _get_port0localip6(self):
699 return_value = self._get_route_data(0, 'network')
700 LOG.info("_get_port0localip6 : %s", return_value)
703 def _get_port1localip6(self):
704 return_value = self._get_route_data(1, 'network')
705 LOG.info("_get_port1localip6 : %s", return_value)
708 def _get_port0prefixlen6(self):
709 return_value = self._get_route_data(0, 'netmask')
710 LOG.info("_get_port0prefixlen6 : %s", return_value)
713 def _get_port1prefixlen6(self):
714 return_value = self._get_route_data(1, 'netmask')
715 LOG.info("_get_port1prefixlen6 : %s", return_value)
718 def _get_port0gateway6(self):
719 return_value = self._get_route_data(0, 'network')
720 LOG.info("_get_port0gateway6 : %s", return_value)
723 def _get_port1gateway6(self):
724 return_value = self._get_route_data(1, 'network')
725 LOG.info("_get_port1gateway6 : %s", return_value)
def _start_vnf(self):
    """Create the queue wrapper and start ``self._run`` in a new process.

    The process name combines the VNF name, the application name and the
    pid of the calling process.
    """
    self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
    process_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
    vnf_process = Process(name=process_name, target=self._run)
    # keep the handle on the instance before starting the process
    self._vnf_process = vnf_process
    vnf_process.start()
734 def _vnf_up_post(self):
737 def instantiate(self, scenario_cfg, context_cfg):
738 self.scenario_helper.scenario_cfg = scenario_cfg
739 self.context_cfg = context_cfg
740 self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
741 # self.nfvi_context = None
743 # vnf deploy is unsupported, use ansible playbooks
744 if self.scenario_helper.options.get("vnf_deploy", False):
745 self.deploy_helper.deploy_vnfs(self.APP_NAME)
746 self.resource_helper.setup()
749 def wait_for_instantiate(self):
751 time.sleep(self.WAIT_TIME) # Give some time for config to load
753 if not self._vnf_process.is_alive():
754 raise RuntimeError("%s VNF process died." % self.APP_NAME)
756 # NOTE(esm): move to QueueFileWrapper
757 while self.q_out.qsize() > 0:
758 buf.append(self.q_out.get())
759 message = ''.join(buf)
760 if self.VNF_PROMPT in message:
761 LOG.info("%s VNF is up and running.", self.APP_NAME)
763 self.queue_wrapper.clear()
764 self.resource_helper.start_collect()
765 return self._vnf_process.exitcode
767 if "PANIC" in message:
768 raise RuntimeError("Error starting %s VNF." %
771 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
772 time.sleep(self.WAIT_TIME_FOR_SCRIPT)
773 # Send ENTER to display a new prompt in case the prompt text was corrupted
774 # by other VNF output
775 self.q_in.put('\r\n')
777 def _build_run_kwargs(self):
779 'stdin': self.queue_wrapper,
780 'stdout': self.queue_wrapper,
781 'keep_stdin_open': True,
783 'timeout': self.scenario_helper.timeout,
def _build_config(self):
    """Return the result of the setup helper's ``build_config``."""
    built = self.setup_helper.build_config()
    return built
790 # we can't share ssh paramiko objects to force new connection
791 self.ssh_helper.drop_connection()
792 cmd = self._build_config()
793 # kill before starting
794 self.setup_helper.kill_vnf()
797 self._build_run_kwargs()
798 self.ssh_helper.run(cmd, **self.run_kwargs)
800 def vnf_execute(self, cmd, wait_time=2):
801 """ send cmd to vnf process """
803 LOG.info("%s command: %s", self.APP_NAME, cmd)
804 self.q_in.put("{}\r\n".format(cmd))
805 time.sleep(wait_time)
807 while self.q_out.qsize() > 0:
808 output.append(self.q_out.get())
809 return "".join(output)
811 def _tear_down(self):
815 self.vnf_execute("quit")
816 self.setup_helper.kill_vnf()
818 self.resource_helper.stop_collect()
819 if self._vnf_process is not None:
820 # be proper and join first before we kill
821 LOG.debug("joining before terminate %s", self._vnf_process.name)
822 self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
823 self._vnf_process.terminate()
824 # no terminate children here because we share processes with tg
826 def get_stats(self, *args, **kwargs): # pylint: disable=unused-argument
827 """Method for checking the statistics
829 This method could be overridden in children classes.
831 :return: VNF statistics
833 cmd = 'p {0} stats'.format(self.APP_WORD)
834 out = self.vnf_execute(cmd)
837 def collect_kpi(self):
838 # we can't get KPIs if the VNF is down
839 check_if_process_failed(self._vnf_process)
840 stats = self.get_stats()
841 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
843 result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
844 result["collect_stats"] = self.resource_helper.collect_kpi()
849 "packets_dropped": 0,
851 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
def scale(self, flavor=""):
    """The SampleVNF base class doesn't provide the 'scale' feature

    :param flavor: unused
    :raises FunctionNotImplemented: always
    """
    # report this class, not the traffic generator, as the one lacking it
    raise y_exceptions.FunctionNotImplemented(
        function_name='scale', class_name='SampleVNF')
860 class SampleVNFTrafficGen(GenericTrafficGen):
861 """ Class providing file-like API for generic traffic generator """
866 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
867 super(SampleVNFTrafficGen, self).__init__(name, vnfd)
868 self.bin_path = get_nsb_option('bin_path', '')
870 self.scenario_helper = ScenarioHelper(self.name)
871 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
873 if setup_env_helper_type is None:
874 setup_env_helper_type = SetupEnvHelper
876 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
878 self.scenario_helper)
880 if resource_helper_type is None:
881 resource_helper_type = ClientResourceHelper
883 self.resource_helper = resource_helper_type(self.setup_helper)
885 self.runs_traffic = True
886 self.traffic_finished = False
887 self._tg_process = None
888 self._traffic_process = None
def _start_server(self):
    """Drop the ssh connection held by the ssh helper."""
    # we can't share ssh paramiko objects to force new connection
    ssh = self.ssh_helper
    ssh.drop_connection()
def instantiate(self, scenario_cfg, context_cfg):
    """Set up resources and start the server in a new process.

    :param scenario_cfg: stored on the scenario helper
    :param context_cfg: not used by this method
    """
    self.scenario_helper.scenario_cfg = scenario_cfg
    helper = self.resource_helper
    helper.setup()
    # must generate_cfg after DPDK bind because we need port number
    helper.generate_cfg()

    LOG.info("Starting %s server...", self.APP_NAME)
    process_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
    tg_process = Process(name=process_name, target=self._start_server)
    # keep the handle on the instance before starting the process
    self._tg_process = tg_process
    tg_process.start()
def _check_status(self):
    """Check the server status; must be provided by subclasses.

    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError()
908 def _wait_for_process(self):
910 if not self._tg_process.is_alive():
911 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
912 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
914 status = self._check_status()
916 LOG.info("%s TG Server is up and running.", self.APP_NAME)
917 return self._tg_process.exitcode
def _traffic_runner(self, traffic_profile):
    """Process target that runs traffic through the resource helper.

    :param traffic_profile: passed on to ``resource_helper.run_traffic``
    """
    # always drop connections first thing in new processes
    # so we don't get paramiko errors
    ssh = self.ssh_helper
    ssh.drop_connection()
    LOG.info("Starting %s client...", self.APP_NAME)
    runner = self.resource_helper
    runner.run_traffic(traffic_profile)
926 def run_traffic(self, traffic_profile):
927 """ Generate traffic on the wire according to the given params.
928 Method is non-blocking, returns immediately when traffic process
929 is running. Mandatory.
931 :param traffic_profile:
934 name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
936 self._traffic_process = Process(name=name, target=self._traffic_runner,
937 args=(traffic_profile,))
938 self._traffic_process.start()
939 # Wait for traffic process to start
940 while self.resource_helper.client_started.value == 0:
941 time.sleep(self.RUN_WAIT)
942 # what if traffic process takes a few seconds to start?
943 if not self._traffic_process.is_alive():
946 return self._traffic_process.is_alive()
948 def collect_kpi(self):
949 # check if the tg processes have exited
950 for proc in (self._tg_process, self._traffic_process):
951 check_if_process_failed(proc)
952 result = self.resource_helper.collect_kpi()
953 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
957 """ After this method finishes, all traffic processes should stop. Mandatory.
961 self.traffic_finished = True
962 # we must kill client before we kill the server, or the client will raise exception
963 if self._traffic_process is not None:
964 # be proper and try to join before terminating
965 LOG.debug("joining before terminate %s", self._traffic_process.name)
966 self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
967 self._traffic_process.terminate()
968 if self._tg_process is not None:
969 # be proper and try to join before terminating
970 LOG.debug("joining before terminate %s", self._tg_process.name)
971 self._tg_process.join(PROCESS_JOIN_TIMEOUT)
972 self._tg_process.terminate()
973 # no terminate children here because we share processes with vnf
def scale(self, flavor=""):
    """A traffic generator VNF doesn't provide the 'scale' feature

    :param flavor: unused
    :raises FunctionNotImplemented: always
    """
    error = y_exceptions.FunctionNotImplemented(
        function_name='scale', class_name='SampleVNFTrafficGen')
    raise error