1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
16 from collections import Mapping
18 from multiprocessing import Queue, Value, Process
26 from six.moves import cStringIO
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
33 from yardstick.common import exceptions as y_exceptions
34 from yardstick.common import utils
35 from yardstick.common.process import check_if_process_failed
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
37 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
38 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
39 from yardstick.network_services.nfvi.resource import ResourceProfile
40 from yardstick.network_services.utils import get_nsb_option
41 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
42 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
43 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
44 from yardstick.ssh import AutoConnectSSH
47 DPDK_VERSION = "dpdk-16.07"
49 LOG = logging.getLogger(__name__)
53 DEFAULT_VNF_TIMEOUT = 3600
54 PROCESS_JOIN_TIMEOUT = 3
57 class VnfSshHelper(AutoConnectSSH):
59 def __init__(self, node, bin_path, wait=None):
61 kwargs = self.args_from_node(self.node)
63 kwargs.setdefault('wait', wait)
65 super(VnfSshHelper, self).__init__(**kwargs)
66 self.bin_path = bin_path
70 # must return static class name, anything else refers to the calling class
71 # i.e. the subclass, not the superclass
75 # this copy constructor is different from SSH classes, since it uses node
76 return self.get_class()(self.node, self.bin_path)
78 def upload_config_file(self, prefix, content):
79 cfg_file = os.path.join(REMOTE_TMP, prefix)
81 file_obj = cStringIO(content)
82 self.put_file_obj(file_obj, cfg_file)
def join_bin_path(self, *args):
    """Build a path rooted at this helper's ``bin_path``.

    :param args: path components appended, in order, after ``self.bin_path``
    :return: the joined path string
    """
    components = (self.bin_path,) + args
    return os.path.join(*components)
88 def provision_tool(self, tool_path=None, tool_file=None):
90 tool_path = self.bin_path
91 return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
94 class SetupEnvHelper(object):
96 CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
97 CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
98 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
    """Keep references to the three helper objects handed to this helper.

    :param vnfd_helper: stored as ``self.vnfd_helper``
    :param ssh_helper: stored as ``self.ssh_helper``
    :param scenario_helper: stored as ``self.scenario_helper``
    """
    super(SetupEnvHelper, self).__init__()
    self.scenario_helper = scenario_helper
    self.ssh_helper = ssh_helper
    self.vnfd_helper = vnfd_helper
def build_config(self):
    """Placeholder with no base implementation.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
111 def setup_vnf_environment(self):
118 raise NotImplementedError
121 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
124 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
125 NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
126 HUGEPAGES_KB = 1024 * 1024 * 16
129 def _update_packet_type(ip_pipeline_cfg, traffic_options):
130 match_str = 'pkt_type = ipv4'
131 replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
132 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
133 return pipeline_config_str
136 def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
137 traffic_type = traffic_options['traffic_type']
139 if traffic_options['vnf_type'] is not cls.APP_NAME:
140 match_str = 'traffic_type = 4'
141 replace_str = 'traffic_type = {0}'.format(traffic_type)
143 elif traffic_type == 4:
144 match_str = 'pkt_type = ipv4'
145 replace_str = 'pkt_type = ipv4'
148 match_str = 'pkt_type = ipv4'
149 replace_str = 'pkt_type = ipv6'
151 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
152 return pipeline_config_str
154 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
155 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
156 self.all_ports = None
157 self.bound_pci = None
159 self.used_drivers = None
160 self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
162 def _setup_hugepages(self):
163 meminfo = utils.read_meminfo(self.ssh_helper)
164 hp_size_kb = int(meminfo['Hugepagesize'])
165 nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
166 self.ssh_helper.execute('echo %s | sudo tee %s' %
167 (nr_hugepages, self.NR_HUGEPAGES_PATH))
169 self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
170 nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
171 LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
172 hp_size_kb, nr_hugepages, nr_hugepages_set)
174 def build_config(self):
175 vnf_cfg = self.scenario_helper.vnf_cfg
176 task_path = self.scenario_helper.task_path
178 lb_count = vnf_cfg.get('lb_count', 3)
179 lb_config = vnf_cfg.get('lb_config', 'SW')
180 worker_config = vnf_cfg.get('worker_config', '1C/1T')
181 worker_threads = vnf_cfg.get('worker_threads', 3)
183 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
185 'traffic_type': traffic_type,
186 'pkt_type': 'ipv%s' % traffic_type,
187 'vnf_type': self.VNF_TYPE,
190 config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
191 config_basename = posixpath.basename(self.CFG_CONFIG)
192 script_basename = posixpath.basename(self.CFG_SCRIPT)
193 multiport = MultiPortConfig(self.scenario_helper.topology,
204 multiport.generate_config()
205 with open(self.CFG_CONFIG) as handle:
206 new_config = handle.read()
208 new_config = self._update_traffic_type(new_config, traffic_options)
209 new_config = self._update_packet_type(new_config, traffic_options)
211 self.ssh_helper.upload_config_file(config_basename, new_config)
212 self.ssh_helper.upload_config_file(script_basename,
213 multiport.generate_script(self.vnfd_helper))
215 LOG.info("Provision and start the %s", self.APP_NAME)
216 self._build_pipeline_kwargs()
217 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
219 def _build_pipeline_kwargs(self):
220 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
221 # count the number of actual ports in the list of pairs
222 # remove duplicate ports
223 # this is really a mapping from LINK ID to DPDK PMD ID
224 # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
225 # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
226 ports = self.vnfd_helper.port_pairs.all_ports
227 port_nums = self.vnfd_helper.port_nums(ports)
228 # create mask from all the dpdk port numbers
229 ports_mask_hex = hex(sum(2 ** num for num in port_nums))
230 self.pipeline_kwargs = {
231 'cfg_file': self.CFG_CONFIG,
232 'script': self.CFG_SCRIPT,
233 'port_mask_hex': ports_mask_hex,
234 'tool_path': tool_path,
237 def setup_vnf_environment(self):
239 self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
241 # bind before _setup_resources so we can use dpdk_port_num
242 self._detect_and_bind_drivers()
243 resource = self._setup_resources()
247 # pkill is not matching, debug with pgrep
248 self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
249 self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
250 # have to use exact match
251 # try using killall to match
252 self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
254 def _setup_dpdk(self):
255 """ setup dpdk environment needed for vnf to run """
257 self._setup_hugepages()
258 self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
260 exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
264 dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
265 dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
266 exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
268 self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
270 def get_collectd_options(self):
271 options = self.scenario_helper.all_options.get("collectd", {})
272 # override with specific node settings
273 options.update(self.scenario_helper.options.get("collectd", {}))
276 def _setup_resources(self):
277 # what is this magic? how do we know which socket is for which port?
278 # what about quad-socket?
279 if any(v[5] == "0" for v in self.bound_pci):
284 # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
285 # this won't work because we don't have DPDK port numbers yet
286 ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
287 port_names = (intf["name"] for intf in ports)
288 collectd_options = self.get_collectd_options()
289 plugins = collectd_options.get("plugins", {})
290 # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
291 return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
292 plugins=plugins, interval=collectd_options.get("interval"),
293 timeout=self.scenario_helper.timeout)
295 def _detect_and_bind_drivers(self):
296 interfaces = self.vnfd_helper.interfaces
298 self.dpdk_bind_helper.read_status()
299 self.dpdk_bind_helper.save_used_drivers()
301 self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
303 sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
304 for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
306 intf = next(v for v in interfaces
307 if vpci == v['virtual-interface']['vpci'])
309 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
310 except: # pylint: disable=bare-except
314 def get_local_iface_name_by_vpci(self, vpci):
315 find_net_cmd = self.FIND_NET_CMD.format(vpci)
316 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
322 self.dpdk_bind_helper.rebind_drivers()
325 class ResourceHelper(object):
328 MAKE_INSTALL = 'cd {0} && make && sudo make install'
329 RESOURCE_WORD = 'sample'
333 def __init__(self, setup_helper):
334 super(ResourceHelper, self).__init__()
336 self.setup_helper = setup_helper
337 self.ssh_helper = setup_helper.ssh_helper
340 self.resource = self.setup_helper.setup_vnf_environment()
342 def generate_cfg(self):
345 def _collect_resource_kpi(self):
347 status = self.resource.check_if_system_agent_running("collectd")[0]
349 result = self.resource.amqp_collect_nfvi_kpi()
351 result = {"core": result}
def start_collect(self):
    """Bring up resource collection on ``self.resource``.

    Calls, in this order: ``initiate_systemagent`` with the ssh helper's
    ``bin_path``, then ``start``, then ``amqp_process_for_nfvi_kpi``.
    """
    bin_path = self.ssh_helper.bin_path
    self.resource.initiate_systemagent(bin_path)
    self.resource.start()
    self.resource.amqp_process_for_nfvi_kpi()
359 def stop_collect(self):
def collect_kpi(self):
    """Return whatever ``_collect_resource_kpi`` reports for this helper."""
    kpi = self._collect_resource_kpi()
    return kpi
367 class ClientResourceHelper(ResourceHelper):
374 def __init__(self, setup_helper):
375 super(ClientResourceHelper, self).__init__(setup_helper)
376 self.vnfd_helper = setup_helper.vnfd_helper
377 self.scenario_helper = setup_helper.scenario_helper
380 self.client_started = Value('i', 0)
381 self.all_ports = None
382 self._queue = Queue()
384 self._terminated = Value('i', 0)
386 def _build_ports(self):
387 self.networks = self.vnfd_helper.port_pairs.networks
388 self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
389 self.downlink_ports = \
390 self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
391 self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
def port_num(self, intf):
    """Return the port number for ``intf`` as reported by the VNFD helper.

    :param intf: passed unchanged to ``vnfd_helper.port_num``
    :return: the value returned by ``vnfd_helper.port_num``
    """
    # default behaviour: plain delegation to the VNFD helper
    vnfd = self.vnfd_helper
    return vnfd.port_num(intf)
397 def get_stats(self, *args, **kwargs):
399 return self.client.get_stats(*args, **kwargs)
401 LOG.exception("TRex client not connected")
404 def generate_samples(self, ports, key=None, default=None):
405 # needs to be used ports
406 last_result = self.get_stats(ports)
407 key_value = last_result.get(key, default)
409 if not isinstance(last_result, Mapping): # added for mock unit test
410 self._terminated.value = 1
414 # recalculate port for interface and see if it matches ports provided
415 for intf in self.vnfd_helper.interfaces:
417 port = self.vnfd_helper.port_num(name)
419 xe_value = last_result.get(port, {})
421 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
422 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
423 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
424 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
425 "in_packets": int(xe_value.get("ipackets", 0)),
426 "out_packets": int(xe_value.get("opackets", 0)),
429 samples[name][key] = key_value
432 def _run_traffic_once(self, traffic_profile):
433 traffic_profile.execute_traffic(self)
434 self.client_started.value = 1
435 time.sleep(self.RUN_DURATION)
436 samples = self.generate_samples(traffic_profile.ports)
437 time.sleep(self.QUEUE_WAIT_TIME)
438 self._queue.put(samples)
440 def run_traffic(self, traffic_profile):
441 # if we don't do this we can hang waiting for the queue to drain
442 # have to do this in the subprocess
443 self._queue.cancel_join_thread()
444 # fixme: fix passing correct trex config file,
445 # instead of searching the default path
448 self.client = self._connect()
449 self.client.reset(ports=self.all_ports)
450 self.client.remove_all_streams(self.all_ports) # remove all streams
451 traffic_profile.register_generator(self)
453 while self._terminated.value == 0:
454 self._run_traffic_once(traffic_profile)
456 self.client.stop(self.all_ports)
457 self.client.disconnect()
458 self._terminated.value = 0
460 if self._terminated.value:
461 LOG.debug("traffic generator is stopped")
462 return # return if trex/tg server is stopped.
466 self._terminated.value = 1 # stop client
468 def clear_stats(self, ports=None):
470 ports = self.all_ports
471 self.client.clear_stats(ports=ports)
473 def start(self, ports=None, *args, **kwargs):
474 # pylint: disable=keyword-arg-before-vararg
475 # NOTE(ralonsoh): defining keyworded arguments before variable
476 # positional arguments is a bug. This function definition doesn't work
477 # in Python 2, although it works in Python 3. Reference:
478 # https://www.python.org/dev/peps/pep-3102/
480 ports = self.all_ports
481 self.client.start(ports=ports, *args, **kwargs)
483 def collect_kpi(self):
484 if not self._queue.empty():
485 kpi = self._queue.get()
486 self._result.update(kpi)
487 LOG.debug('Got KPIs from _queue for %s %s',
488 self.scenario_helper.name, self.RESOURCE_WORD)
491 def _connect(self, client=None):
493 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
494 server=self.vnfd_helper.mgmt_interface["ip"],
495 verbose_level=LoggerApi.VERBOSE_QUIET)
497 # try to connect with 5s intervals, 30s max
503 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
508 class Rfc2544ResourceHelper(object):
510 DEFAULT_CORRELATED_TRAFFIC = False
511 DEFAULT_LATENCY = False
512 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
514 def __init__(self, scenario_helper):
515 super(Rfc2544ResourceHelper, self).__init__()
516 self.scenario_helper = scenario_helper
517 self._correlated_traffic = None
518 self.iteration = Value('i', 0)
521 self._tolerance_low = None
522 self._tolerance_high = None
526 if self._rfc2544 is None:
527 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
def tolerance_low(self):
    """Return the cached lower tolerance bound, computing it on first use.

    When ``_tolerance_low`` is still ``None``, ``get_rfc_tolerance()`` is
    called first to fill it in.
    """
    if self._tolerance_low is not None:
        return self._tolerance_low
    self.get_rfc_tolerance()
    return self._tolerance_low
def tolerance_high(self):
    """Return the cached upper tolerance bound, computing it on first use.

    When ``_tolerance_high`` is still ``None``, ``get_rfc_tolerance()`` is
    called first to fill it in.
    """
    if self._tolerance_high is not None:
        return self._tolerance_high
    self.get_rfc_tolerance()
    return self._tolerance_high
def correlated_traffic(self):
    """Return the ``correlated_traffic`` RFC2544 option, cached after first lookup.

    The value is read through ``get_rfc2544`` with
    ``DEFAULT_CORRELATED_TRAFFIC`` as the fallback while the cache is ``None``.
    """
    value = self._correlated_traffic
    if value is None:
        value = self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
        self._correlated_traffic = value
    return value
552 if self._latency is None:
553 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
def get_rfc2544(self, name, default=None):
    """Look up one entry of ``self.rfc2544``.

    :param name: key to read
    :param default: value returned when ``name`` is absent (``None`` by default)
    :return: ``self.rfc2544.get(name, default)``
    """
    options = self.rfc2544
    return options.get(name, default)
def get_rfc_tolerance(self):
    """Parse the ``allowed_drop_rate`` option into low/high tolerance bounds.

    The option is read through ``get_rfc2544`` with ``DEFAULT_TOLERANCE`` as
    the fallback. A string is split on ``'-'`` and each part converted to
    ``float``; a bare number (int or float) is accepted as a single bound
    instead of failing on ``.split``. The values are sorted; the smallest
    becomes ``_tolerance_low`` and the next one ``_tolerance_high``. With a
    single value both bounds are equal.

    :raises ValueError: if a part of the string is not a valid float
    """
    raw = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
    if isinstance(raw, (int, float)):
        # a numeric option value has no range separator to split on
        bounds = [float(raw)]
    else:
        bounds = sorted(float(part.strip()) for part in raw.split('-'))
    self._tolerance_low = bounds[0]
    # fall back to the low bound when only one value was supplied
    self._tolerance_high = bounds[1] if len(bounds) > 1 else bounds[0]
566 class SampleVNFDeployHelper(object):
568 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
569 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
570 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
def __init__(self, vnfd_helper, ssh_helper):
    """Keep references to the VNFD and SSH helpers used for deployment.

    :param vnfd_helper: stored as ``self.vnfd_helper``
    :param ssh_helper: stored as ``self.ssh_helper``
    """
    super(SampleVNFDeployHelper, self).__init__()
    self.vnfd_helper = vnfd_helper
    self.ssh_helper = ssh_helper
577 def deploy_vnfs(self, app_name):
578 vnf_bin = self.ssh_helper.join_bin_path(app_name)
579 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
583 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
584 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
586 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
587 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
589 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
591 http_proxy = os.environ.get('http_proxy', '')
592 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
594 self.ssh_helper.execute(cmd)
595 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
596 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
597 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
600 class ScenarioHelper(object):
605 'worker_config': '1C/1T',
609 def __init__(self, name):
611 self.scenario_cfg = None
615 return self.scenario_cfg['task_path']
619 return self.scenario_cfg.get('nodes')
def all_options(self):
    """Return the ``'options'`` entry of the scenario configuration.

    A new empty dict is returned when the entry is absent.
    """
    scenario = self.scenario_cfg
    return scenario.get('options', {})
627 return self.all_options.get(self.name, {})
631 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
635 return self.scenario_cfg['topology']
639 return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
642 class SampleVNF(GenericVNF):
643 """ Class providing file-like API for generic VNF implementation """
645 VNF_PROMPT = "pipeline>"
647 WAIT_TIME_FOR_SCRIPT = 10
648 APP_NAME = "SampleVNF"
649 # we run the VNF interactively, so the ssh command will timeout after this long
651 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
652 super(SampleVNF, self).__init__(name, vnfd)
653 self.bin_path = get_nsb_option('bin_path', '')
655 self.scenario_helper = ScenarioHelper(self.name)
656 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
658 if setup_env_helper_type is None:
659 setup_env_helper_type = SetupEnvHelper
661 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
663 self.scenario_helper)
665 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
667 if resource_helper_type is None:
668 resource_helper_type = ResourceHelper
670 self.resource_helper = resource_helper_type(self.setup_helper)
672 self.context_cfg = None
673 self.nfvi_context = None
674 self.pipeline_kwargs = {}
675 self.uplink_ports = None
676 self.downlink_ports = None
677 # NOTE(esm): make QueueFileWrapper invert-able so that we
678 # never have to manage the queues
681 self.queue_wrapper = None
683 self.used_drivers = {}
684 self.vnf_port_pairs = None
685 self._vnf_process = None
def _build_ports(self):
    """Derive network and port-number attributes from the VNFD interfaces.

    Builds a ``PortPairs`` object from ``vnfd_helper.interfaces`` and sets
    ``networks``, ``uplink_ports``, ``downlink_ports`` and ``my_ports``; the
    three port attributes hold the result of ``vnfd_helper.port_nums``.
    """
    vnfd = self.vnfd_helper
    self._port_pairs = PortPairs(vnfd.interfaces)
    self.networks = self._port_pairs.networks
    self.uplink_ports = vnfd.port_nums(self._port_pairs.uplink_ports)
    self.downlink_ports = vnfd.port_nums(self._port_pairs.downlink_ports)
    self.my_ports = vnfd.port_nums(self._port_pairs.all_ports)
694 def _get_route_data(self, route_index, route_type):
695 route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
696 for _ in range(route_index):
698 return next(route_iter, {}).get(route_type, '')
700 def _get_port0localip6(self):
701 return_value = self._get_route_data(0, 'network')
702 LOG.info("_get_port0localip6 : %s", return_value)
705 def _get_port1localip6(self):
706 return_value = self._get_route_data(1, 'network')
707 LOG.info("_get_port1localip6 : %s", return_value)
710 def _get_port0prefixlen6(self):
711 return_value = self._get_route_data(0, 'netmask')
712 LOG.info("_get_port0prefixlen6 : %s", return_value)
715 def _get_port1prefixlen6(self):
716 return_value = self._get_route_data(1, 'netmask')
717 LOG.info("_get_port1prefixlen6 : %s", return_value)
720 def _get_port0gateway6(self):
721 return_value = self._get_route_data(0, 'network')
722 LOG.info("_get_port0gateway6 : %s", return_value)
725 def _get_port1gateway6(self):
726 return_value = self._get_route_data(1, 'network')
727 LOG.info("_get_port1gateway6 : %s", return_value)
def _start_vnf(self):
    """Create the queue wrapper and launch ``self._run`` in a child process.

    The process name is built from ``self.name``, ``APP_NAME`` and the
    current pid. The process is stored in ``self._vnf_process`` before it is
    started.
    """
    self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
    proc_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
    process = Process(name=proc_name, target=self._run)
    self._vnf_process = process
    process.start()
736 def _vnf_up_post(self):
739 def instantiate(self, scenario_cfg, context_cfg):
740 self.scenario_helper.scenario_cfg = scenario_cfg
741 self.context_cfg = context_cfg
742 self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
743 # self.nfvi_context = None
745 # vnf deploy is unsupported, use ansible playbooks
746 if self.scenario_helper.options.get("vnf_deploy", False):
747 self.deploy_helper.deploy_vnfs(self.APP_NAME)
748 self.resource_helper.setup()
751 def wait_for_instantiate(self):
753 time.sleep(self.WAIT_TIME) # Give some time for config to load
755 if not self._vnf_process.is_alive():
756 raise RuntimeError("%s VNF process died." % self.APP_NAME)
758 # NOTE(esm): move to QueueFileWrapper
759 while self.q_out.qsize() > 0:
760 buf.append(self.q_out.get())
761 message = ''.join(buf)
762 if self.VNF_PROMPT in message:
763 LOG.info("%s VNF is up and running.", self.APP_NAME)
765 self.queue_wrapper.clear()
766 self.resource_helper.start_collect()
767 return self._vnf_process.exitcode
769 if "PANIC" in message:
770 raise RuntimeError("Error starting %s VNF." %
773 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
774 time.sleep(self.WAIT_TIME_FOR_SCRIPT)
775 # Send ENTER to display a new prompt in case the prompt text was corrupted
776 # by other VNF output
777 self.q_in.put('\r\n')
779 def _build_run_kwargs(self):
781 'stdin': self.queue_wrapper,
782 'stdout': self.queue_wrapper,
783 'keep_stdin_open': True,
785 'timeout': self.scenario_helper.timeout,
788 def _build_config(self):
789 return self.setup_helper.build_config()
792 # we can't share ssh paramiko objects to force new connection
793 self.ssh_helper.drop_connection()
794 cmd = self._build_config()
795 # kill before starting
796 self.setup_helper.kill_vnf()
799 self._build_run_kwargs()
800 self.ssh_helper.run(cmd, **self.run_kwargs)
802 def vnf_execute(self, cmd, wait_time=2):
803 """ send cmd to vnf process """
805 LOG.info("%s command: %s", self.APP_NAME, cmd)
806 self.q_in.put("{}\r\n".format(cmd))
807 time.sleep(wait_time)
809 while self.q_out.qsize() > 0:
810 output.append(self.q_out.get())
811 return "".join(output)
813 def _tear_down(self):
817 self.vnf_execute("quit")
818 self.setup_helper.kill_vnf()
820 self.resource_helper.stop_collect()
821 if self._vnf_process is not None:
822 # be proper and join first before we kill
823 LOG.debug("joining before terminate %s", self._vnf_process.name)
824 self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
825 self._vnf_process.terminate()
826 # no terminate children here because we share processes with tg
828 def get_stats(self, *args, **kwargs): # pylint: disable=unused-argument
829 """Method for checking the statistics
831 This method could be overridden in children classes.
833 :return: VNF statistics
835 cmd = 'p {0} stats'.format(self.APP_WORD)
836 out = self.vnf_execute(cmd)
839 def collect_kpi(self):
840 # we can't get KPIs if the VNF is down
841 check_if_process_failed(self._vnf_process)
842 stats = self.get_stats()
843 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
845 result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
846 result["collect_stats"] = self.resource_helper.collect_kpi()
851 "packets_dropped": 0,
853 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
def scale(self, flavor=""):
    """The SampleVNF base class doesn't provide the 'scale' feature.

    :param flavor: not used by this implementation
    :raises FunctionNotImplemented: always
    """
    # Name this class in the error; the previous value named the traffic
    # generator class instead.
    raise y_exceptions.FunctionNotImplemented(
        function_name='scale', class_name='SampleVNF')
862 class SampleVNFTrafficGen(GenericTrafficGen):
863 """ Class providing file-like API for generic traffic generator """
868 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
869 super(SampleVNFTrafficGen, self).__init__(name, vnfd)
870 self.bin_path = get_nsb_option('bin_path', '')
872 self.scenario_helper = ScenarioHelper(self.name)
873 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
875 if setup_env_helper_type is None:
876 setup_env_helper_type = SetupEnvHelper
878 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
880 self.scenario_helper)
882 if resource_helper_type is None:
883 resource_helper_type = ClientResourceHelper
885 self.resource_helper = resource_helper_type(self.setup_helper)
887 self.runs_traffic = True
888 self.traffic_finished = False
889 self._tg_process = None
890 self._traffic_process = None
def _start_server(self):
    """Drop the current SSH connection held by ``self.ssh_helper``."""
    # paramiko SSH objects cannot be shared, so a new connection is forced
    ssh = self.ssh_helper
    ssh.drop_connection()
def instantiate(self, scenario_cfg, context_cfg):
    """Prepare the resource helper and start the server process.

    Stores ``scenario_cfg`` on the scenario helper, runs the resource
    helper's ``setup`` and ``generate_cfg``, then starts a ``Process`` that
    runs ``self._start_server``. The process is kept in ``self._tg_process``.

    :param scenario_cfg: scenario configuration handed to the scenario helper
    :param context_cfg: not used by this implementation
    """
    self.scenario_helper.scenario_cfg = scenario_cfg

    helper = self.resource_helper
    helper.setup()
    # generate_cfg must come after the DPDK bind because it needs the port number
    helper.generate_cfg()

    LOG.info("Starting %s server...", self.APP_NAME)
    proc_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
    process = Process(name=proc_name, target=self._start_server)
    self._tg_process = process
    process.start()
907 def _check_status(self):
908 raise NotImplementedError
910 def _wait_for_process(self):
912 if not self._tg_process.is_alive():
913 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
914 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
916 status = self._check_status()
918 LOG.info("%s TG Server is up and running.", self.APP_NAME)
919 return self._tg_process.exitcode
def _traffic_runner(self, traffic_profile):
    """Run ``traffic_profile`` through the resource helper.

    :param traffic_profile: passed unchanged to ``resource_helper.run_traffic``
    """
    # A new process must drop inherited connections before anything else,
    # otherwise paramiko raises errors.
    self.ssh_helper.drop_connection()
    LOG.info("Starting %s client...", self.APP_NAME)
    runner = self.resource_helper
    runner.run_traffic(traffic_profile)
928 def run_traffic(self, traffic_profile):
929 """ Generate traffic on the wire according to the given params.
930 Method is non-blocking, returns immediately when traffic process
931 is running. Mandatory.
933 :param traffic_profile:
936 name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
938 self._traffic_process = Process(name=name, target=self._traffic_runner,
939 args=(traffic_profile,))
940 self._traffic_process.start()
941 # Wait for traffic process to start
942 while self.resource_helper.client_started.value == 0:
943 time.sleep(self.RUN_WAIT)
944 # what if traffic process takes a few seconds to start?
945 if not self._traffic_process.is_alive():
948 return self._traffic_process.is_alive()
950 def collect_kpi(self):
951 # check if the tg processes have exited
952 for proc in (self._tg_process, self._traffic_process):
953 check_if_process_failed(proc)
954 result = self.resource_helper.collect_kpi()
955 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
959 """ After this method finishes, all traffic processes should stop. Mandatory.
963 self.traffic_finished = True
964 # we must kill client before we kill the server, or the client will raise exception
965 if self._traffic_process is not None:
966 # be proper and try to join before terminating
967 LOG.debug("joining before terminate %s", self._traffic_process.name)
968 self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
969 self._traffic_process.terminate()
970 if self._tg_process is not None:
971 # be proper and try to join before terminating
972 LOG.debug("joining before terminate %s", self._tg_process.name)
973 self._tg_process.join(PROCESS_JOIN_TIMEOUT)
974 self._tg_process.terminate()
975 # no terminate children here because we share processes with vnf
def scale(self, flavor=""):
    """A traffic generator VNF doesn't provide the 'scale' feature.

    :param flavor: not used by this implementation
    :raises FunctionNotImplemented: always
    """
    error = y_exceptions.FunctionNotImplemented(
        function_name='scale', class_name='SampleVNFTrafficGen')
    raise error