1 # Copyright (c) 2016-2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
16 from collections import Mapping
18 from multiprocessing import Queue, Value, Process
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.common import exceptions as y_exceptions
33 from yardstick.common.process import check_if_process_failed
34 from yardstick.common import utils
35 from yardstick.network_services.constants import DEFAULT_VNF_TIMEOUT
36 from yardstick.network_services.constants import PROCESS_JOIN_TIMEOUT
37 from yardstick.network_services.constants import REMOTE_TMP
38 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
39 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
40 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
41 from yardstick.network_services.nfvi.resource import ResourceProfile
42 from yardstick.network_services.utils import get_nsb_option
43 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
44 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
45 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
46 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
49 LOG = logging.getLogger(__name__)
52 class SetupEnvHelper(object):
54 CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
55 CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
56 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
    """Keep references to the helpers used while preparing the environment.

    :param vnfd_helper: stored as ``self.vnfd_helper``
    :param ssh_helper: stored as ``self.ssh_helper``
    :param scenario_helper: stored as ``self.scenario_helper``
    """
    super(SetupEnvHelper, self).__init__()
    # The three assignments are independent of each other.
    self.scenario_helper = scenario_helper
    self.ssh_helper = ssh_helper
    self.vnfd_helper = vnfd_helper
def build_config(self):
    """Build the VNF configuration; this base version is a placeholder.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
69 def setup_vnf_environment(self):
76 raise NotImplementedError
79 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
82 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
83 NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
84 HUGEPAGES_KB = 1024 * 1024 * 16
87 def _update_packet_type(ip_pipeline_cfg, traffic_options):
88 match_str = 'pkt_type = ipv4'
89 replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
90 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
91 return pipeline_config_str
94 def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
95 traffic_type = traffic_options['traffic_type']
97 if traffic_options['vnf_type'] is not cls.APP_NAME:
98 match_str = 'traffic_type = 4'
99 replace_str = 'traffic_type = {0}'.format(traffic_type)
101 elif traffic_type == 4:
102 match_str = 'pkt_type = ipv4'
103 replace_str = 'pkt_type = ipv4'
106 match_str = 'pkt_type = ipv4'
107 replace_str = 'pkt_type = ipv6'
109 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
110 return pipeline_config_str
112 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
113 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
114 self.all_ports = None
115 self.bound_pci = None
117 self.used_drivers = None
118 self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
120 def _setup_hugepages(self):
121 meminfo = utils.read_meminfo(self.ssh_helper)
122 hp_size_kb = int(meminfo['Hugepagesize'])
123 nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
124 self.ssh_helper.execute('echo %s | sudo tee %s' %
125 (nr_hugepages, self.NR_HUGEPAGES_PATH))
127 self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
128 nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
129 LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
130 hp_size_kb, nr_hugepages, nr_hugepages_set)
132 def build_config(self):
133 vnf_cfg = self.scenario_helper.vnf_cfg
134 task_path = self.scenario_helper.task_path
136 config_file = vnf_cfg.get('file')
137 lb_count = vnf_cfg.get('lb_count', 3)
138 lb_config = vnf_cfg.get('lb_config', 'SW')
139 worker_config = vnf_cfg.get('worker_config', '1C/1T')
140 worker_threads = vnf_cfg.get('worker_threads', 3)
142 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
144 'traffic_type': traffic_type,
145 'pkt_type': 'ipv%s' % traffic_type,
146 'vnf_type': self.VNF_TYPE,
149 config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
151 config_basename = posixpath.basename(self.CFG_CONFIG)
152 script_basename = posixpath.basename(self.CFG_SCRIPT)
153 multiport = MultiPortConfig(self.scenario_helper.topology,
164 multiport.generate_config()
166 with utils.open_relative_file(config_file, task_path) as infile:
167 new_config = ['[EAL]']
169 for port in self.vnfd_helper.port_pairs.all_ports:
170 interface = self.vnfd_helper.find_interface(name=port)
171 vpci.append(interface['virtual-interface']["vpci"])
172 new_config.extend('w = {0}'.format(item) for item in vpci)
173 new_config = '\n'.join(new_config) + '\n' + infile.read()
175 with open(self.CFG_CONFIG) as handle:
176 new_config = handle.read()
177 new_config = self._update_traffic_type(new_config, traffic_options)
178 new_config = self._update_packet_type(new_config, traffic_options)
179 self.ssh_helper.upload_config_file(config_basename, new_config)
180 self.ssh_helper.upload_config_file(script_basename,
181 multiport.generate_script(self.vnfd_helper))
183 LOG.info("Provision and start the %s", self.APP_NAME)
184 self._build_pipeline_kwargs()
185 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
187 def _build_pipeline_kwargs(self):
188 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
189 # count the number of actual ports in the list of pairs
190 # remove duplicate ports
191 # this is really a mapping from LINK ID to DPDK PMD ID
192 # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
193 # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
194 ports = self.vnfd_helper.port_pairs.all_ports
195 port_nums = self.vnfd_helper.port_nums(ports)
196 # create mask from all the dpdk port numbers
197 ports_mask_hex = hex(sum(2 ** num for num in port_nums))
198 self.pipeline_kwargs = {
199 'cfg_file': self.CFG_CONFIG,
200 'script': self.CFG_SCRIPT,
201 'port_mask_hex': ports_mask_hex,
202 'tool_path': tool_path,
205 def setup_vnf_environment(self):
208 # bind before _setup_resources so we can use dpdk_port_num
209 self._detect_and_bind_drivers()
210 resource = self._setup_resources()
214 # pkill is not matching, debug with pgrep
215 self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
216 self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
217 # have to use exact match
218 # try using killall to match
219 self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
221 def _setup_dpdk(self):
222 """Setup DPDK environment needed for VNF to run"""
223 self._setup_hugepages()
224 self.dpdk_bind_helper.load_dpdk_driver()
226 exit_status = self.dpdk_bind_helper.check_dpdk_driver()
230 def get_collectd_options(self):
231 options = self.scenario_helper.all_options.get("collectd", {})
232 # override with specific node settings
233 options.update(self.scenario_helper.options.get("collectd", {}))
236 def _setup_resources(self):
237 # what is this magic? how do we know which socket is for which port?
238 # what about quad-socket?
239 if any(v[5] == "0" for v in self.bound_pci):
244 # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
245 # this won't work because we don't have DPDK port numbers yet
246 ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
247 port_names = (intf["name"] for intf in ports)
248 collectd_options = self.get_collectd_options()
249 plugins = collectd_options.get("plugins", {})
250 # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
251 return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
252 plugins=plugins, interval=collectd_options.get("interval"),
253 timeout=self.scenario_helper.timeout)
255 def _check_interface_fields(self):
256 num_nodes = len(self.scenario_helper.nodes)
257 # OpenStack instance creation time is probably proportional to the number
259 timeout = 120 * num_nodes
260 dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
261 self.ssh_helper, timeout)
264 def _detect_and_bind_drivers(self):
265 interfaces = self.vnfd_helper.interfaces
267 self._check_interface_fields()
268 # check for bound after probe
269 self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
271 self.dpdk_bind_helper.read_status()
272 self.dpdk_bind_helper.save_used_drivers()
274 self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
276 sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
277 for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
279 intf = next(v for v in interfaces
280 if vpci == v['virtual-interface']['vpci'])
282 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
283 except: # pylint: disable=bare-except
287 def get_local_iface_name_by_vpci(self, vpci):
288 find_net_cmd = self.FIND_NET_CMD.format(vpci)
289 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
295 self.dpdk_bind_helper.rebind_drivers()
298 class ResourceHelper(object):
301 MAKE_INSTALL = 'cd {0} && make && sudo make install'
302 RESOURCE_WORD = 'sample'
306 def __init__(self, setup_helper):
307 super(ResourceHelper, self).__init__()
309 self.setup_helper = setup_helper
310 self.ssh_helper = setup_helper.ssh_helper
313 self.resource = self.setup_helper.setup_vnf_environment()
315 def generate_cfg(self):
318 def _collect_resource_kpi(self):
320 status = self.resource.check_if_system_agent_running("collectd")[0]
322 result = self.resource.amqp_collect_nfvi_kpi()
324 result = {"core": result}
def start_collect(self):
    """Start KPI collection on the resource.

    Initiates the system agent with the SSH helper's ``bin_path``, starts
    the resource, then starts the AMQP processing of NFVI KPIs.
    """
    resource = self.resource
    resource.initiate_systemagent(self.ssh_helper.bin_path)
    resource.start()
    resource.amqp_process_for_nfvi_kpi()
332 def stop_collect(self):
def collect_kpi(self):
    """Return whatever ``_collect_resource_kpi`` reports."""
    resource_kpi = self._collect_resource_kpi()
    return resource_kpi
340 class ClientResourceHelper(ResourceHelper):
347 def __init__(self, setup_helper):
348 super(ClientResourceHelper, self).__init__(setup_helper)
349 self.vnfd_helper = setup_helper.vnfd_helper
350 self.scenario_helper = setup_helper.scenario_helper
353 self.client_started = Value('i', 0)
354 self.all_ports = None
355 self._queue = Queue()
357 self._terminated = Value('i', 0)
359 def _build_ports(self):
360 self.networks = self.vnfd_helper.port_pairs.networks
361 self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
362 self.downlink_ports = \
363 self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
364 self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
def port_num(self, intf):
    """Return the port number the VNFD helper reports for *intf*.

    :param intf: interface passed through to ``vnfd_helper.port_num``
    :return: the value returned by ``vnfd_helper.port_num``
    """
    # default behaviour: plain pass-through to the VNFD helper
    vnfd = self.vnfd_helper
    return vnfd.port_num(intf)
370 def get_stats(self, *args, **kwargs):
372 return self.client.get_stats(*args, **kwargs)
374 LOG.exception("TRex client not connected")
377 def generate_samples(self, ports, key=None, default=None):
378 # needs to be used ports
379 last_result = self.get_stats(ports)
380 key_value = last_result.get(key, default)
382 if not isinstance(last_result, Mapping): # added for mock unit test
383 self._terminated.value = 1
387 # recalculate port for interface and see if it matches ports provided
388 for intf in self.vnfd_helper.interfaces:
390 port = self.vnfd_helper.port_num(name)
392 xe_value = last_result.get(port, {})
394 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
395 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
396 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
397 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
398 "in_packets": int(xe_value.get("ipackets", 0)),
399 "out_packets": int(xe_value.get("opackets", 0)),
402 samples[name][key] = key_value
405 def _run_traffic_once(self, traffic_profile):
406 traffic_profile.execute_traffic(self)
407 self.client_started.value = 1
408 time.sleep(self.RUN_DURATION)
409 samples = self.generate_samples(traffic_profile.ports)
410 time.sleep(self.QUEUE_WAIT_TIME)
411 self._queue.put(samples)
413 def run_traffic(self, traffic_profile):
414 # if we don't do this we can hang waiting for the queue to drain
415 # have to do this in the subprocess
416 self._queue.cancel_join_thread()
417 # fixme: fix passing correct trex config file,
418 # instead of searching the default path
421 self.client = self._connect()
422 self.client.reset(ports=self.all_ports)
423 self.client.remove_all_streams(self.all_ports) # remove all streams
424 traffic_profile.register_generator(self)
426 while self._terminated.value == 0:
427 self._run_traffic_once(traffic_profile)
429 self.client.stop(self.all_ports)
430 self.client.disconnect()
431 self._terminated.value = 0
433 if self._terminated.value:
434 LOG.debug("traffic generator is stopped")
435 return # return if trex/tg server is stopped.
439 self._terminated.value = 1 # stop client
441 def clear_stats(self, ports=None):
443 ports = self.all_ports
444 self.client.clear_stats(ports=ports)
446 def start(self, ports=None, *args, **kwargs):
447 # pylint: disable=keyword-arg-before-vararg
448 # NOTE(ralonsoh): defining keyworded arguments before variable
449 # positional arguments is a bug. This function definition doesn't work
450 # in Python 2, although it works in Python 3. Reference:
451 # https://www.python.org/dev/peps/pep-3102/
453 ports = self.all_ports
454 self.client.start(ports=ports, *args, **kwargs)
456 def collect_kpi(self):
457 if not self._queue.empty():
458 kpi = self._queue.get()
459 self._result.update(kpi)
460 LOG.debug('Got KPIs from _queue for %s %s',
461 self.scenario_helper.name, self.RESOURCE_WORD)
464 def _connect(self, client=None):
466 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
467 server=self.vnfd_helper.mgmt_interface["ip"],
468 verbose_level=LoggerApi.VERBOSE_QUIET)
470 # try to connect with 5s intervals, 30s max
476 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
481 class Rfc2544ResourceHelper(object):
483 DEFAULT_CORRELATED_TRAFFIC = False
484 DEFAULT_LATENCY = False
485 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
487 def __init__(self, scenario_helper):
488 super(Rfc2544ResourceHelper, self).__init__()
489 self.scenario_helper = scenario_helper
490 self._correlated_traffic = None
491 self.iteration = Value('i', 0)
494 self._tolerance_low = None
495 self._tolerance_high = None
499 if self._rfc2544 is None:
500 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
def tolerance_low(self):
    """Return the lower tolerance bound, computing the bounds on first use."""
    if self._tolerance_low is not None:
        return self._tolerance_low
    # not computed yet: get_rfc_tolerance() fills in the cached bounds
    self.get_rfc_tolerance()
    return self._tolerance_low
def tolerance_high(self):
    """Return the upper tolerance bound, computing the bounds on first use."""
    if self._tolerance_high is not None:
        return self._tolerance_high
    # not computed yet: get_rfc_tolerance() fills in the cached bounds
    self.get_rfc_tolerance()
    return self._tolerance_high
def correlated_traffic(self):
    """Return the ``correlated_traffic`` RFC2544 option, cached once read.

    ``DEFAULT_CORRELATED_TRAFFIC`` is used when the option is absent.
    """
    cached = self._correlated_traffic
    if cached is None:
        cached = self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
        self._correlated_traffic = cached
    return cached
525 if self._latency is None:
526 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
def get_rfc2544(self, name, default=None):
    """Look up *name* in the ``rfc2544`` options.

    :param name: option key
    :param default: value returned when the key is absent
    :return: the option value or *default*
    """
    options = self.rfc2544
    return options.get(name, default)
def get_rfc_tolerance(self):
    """Parse the ``allowed_drop_rate`` option into low/high tolerance bounds.

    The option (default ``DEFAULT_TOLERANCE``) is split on ``-``; the parts
    are converted to floats and sorted. The smallest becomes
    ``_tolerance_low`` and the next one ``_tolerance_high``; with a single
    value both bounds are equal.
    """
    raw = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
    bounds = sorted(float(part.strip()) for part in raw.split('-'))
    self._tolerance_low = bounds[0]
    self._tolerance_high = bounds[1] if len(bounds) > 1 else self.tolerance_low
539 class SampleVNFDeployHelper(object):
541 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
542 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
543 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
def __init__(self, vnfd_helper, ssh_helper):
    """Keep references to the helpers used for deployment.

    :param vnfd_helper: stored as ``self.vnfd_helper``
    :param ssh_helper: stored as ``self.ssh_helper``
    """
    super(SampleVNFDeployHelper, self).__init__()
    # The two assignments are independent of each other.
    self.vnfd_helper = vnfd_helper
    self.ssh_helper = ssh_helper
550 def deploy_vnfs(self, app_name):
551 vnf_bin = self.ssh_helper.join_bin_path(app_name)
552 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
556 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
557 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
559 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
560 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
562 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
564 http_proxy = os.environ.get('http_proxy', '')
565 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
567 self.ssh_helper.execute(cmd)
568 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
569 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
570 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
573 class ScenarioHelper(object):
578 'worker_config': '1C/1T',
582 def __init__(self, name):
584 self.scenario_cfg = None
588 return self.scenario_cfg['task_path']
592 return self.scenario_cfg.get('nodes')
def all_options(self):
    """Return the scenario's ``options`` mapping, or an empty dict if absent."""
    scenario = self.scenario_cfg
    return scenario.get('options', {})
600 return self.all_options.get(self.name, {})
604 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
608 return self.scenario_cfg['topology']
612 test_duration = self.scenario_cfg.get('runner', {}).get('duration',
613 self.options.get('timeout', DEFAULT_VNF_TIMEOUT))
614 test_timeout = self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
615 return test_duration if test_duration > test_timeout else test_timeout
617 class SampleVNF(GenericVNF):
618 """ Class providing file-like API for generic VNF implementation """
620 VNF_PROMPT = "pipeline>"
622 WAIT_TIME_FOR_SCRIPT = 10
623 APP_NAME = "SampleVNF"
624 # we run the VNF interactively, so the ssh command will timeout after this long
626 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
627 super(SampleVNF, self).__init__(name, vnfd)
628 self.bin_path = get_nsb_option('bin_path', '')
630 self.scenario_helper = ScenarioHelper(self.name)
631 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
633 if setup_env_helper_type is None:
634 setup_env_helper_type = SetupEnvHelper
636 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
638 self.scenario_helper)
640 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
642 if resource_helper_type is None:
643 resource_helper_type = ResourceHelper
645 self.resource_helper = resource_helper_type(self.setup_helper)
647 self.context_cfg = None
648 self.nfvi_context = None
649 self.pipeline_kwargs = {}
650 self.uplink_ports = None
651 self.downlink_ports = None
652 # NOTE(esm): make QueueFileWrapper invert-able so that we
653 # never have to manage the queues
656 self.queue_wrapper = None
658 self.used_drivers = {}
659 self.vnf_port_pairs = None
660 self._vnf_process = None
def _build_ports(self):
    """Build the port pairs from the VNFD interfaces and cache port numbers.

    Sets ``_port_pairs``, ``networks``, ``uplink_ports``, ``downlink_ports``
    and ``my_ports``; the three port attributes hold the result of
    ``vnfd_helper.port_nums`` applied to the matching port-pair list.
    """
    pairs = PortPairs(self.vnfd_helper.interfaces)
    self._port_pairs = pairs
    self.networks = pairs.networks

    to_port_nums = self.vnfd_helper.port_nums
    self.uplink_ports = to_port_nums(pairs.uplink_ports)
    self.downlink_ports = to_port_nums(pairs.downlink_ports)
    self.my_ports = to_port_nums(pairs.all_ports)
669 def _get_route_data(self, route_index, route_type):
670 route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
671 for _ in range(route_index):
673 return next(route_iter, {}).get(route_type, '')
675 def _get_port0localip6(self):
676 return_value = self._get_route_data(0, 'network')
677 LOG.info("_get_port0localip6 : %s", return_value)
680 def _get_port1localip6(self):
681 return_value = self._get_route_data(1, 'network')
682 LOG.info("_get_port1localip6 : %s", return_value)
685 def _get_port0prefixlen6(self):
686 return_value = self._get_route_data(0, 'netmask')
687 LOG.info("_get_port0prefixlen6 : %s", return_value)
690 def _get_port1prefixlen6(self):
691 return_value = self._get_route_data(1, 'netmask')
692 LOG.info("_get_port1prefixlen6 : %s", return_value)
695 def _get_port0gateway6(self):
696 return_value = self._get_route_data(0, 'network')
697 LOG.info("_get_port0gateway6 : %s", return_value)
700 def _get_port1gateway6(self):
701 return_value = self._get_route_data(1, 'network')
702 LOG.info("_get_port1gateway6 : %s", return_value)
def _start_vnf(self):
    """Create the queue wrapper and launch ``_run`` in a separate process.

    The process is named ``<name>-<APP_NAME>-<pid of the caller>`` and is
    stored in ``_vnf_process``.
    """
    self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
    process_name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
    vnf_process = Process(name=process_name, target=self._run)
    self._vnf_process = vnf_process
    vnf_process.start()
711 def _vnf_up_post(self):
714 def instantiate(self, scenario_cfg, context_cfg):
715 self.scenario_helper.scenario_cfg = scenario_cfg
716 self.context_cfg = context_cfg
717 self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
718 # self.nfvi_context = None
720 # vnf deploy is unsupported, use ansible playbooks
721 if self.scenario_helper.options.get("vnf_deploy", False):
722 self.deploy_helper.deploy_vnfs(self.APP_NAME)
723 self.resource_helper.setup()
726 def wait_for_instantiate(self):
728 time.sleep(self.WAIT_TIME) # Give some time for config to load
730 if not self._vnf_process.is_alive():
731 raise RuntimeError("%s VNF process died." % self.APP_NAME)
733 # NOTE(esm): move to QueueFileWrapper
734 while self.q_out.qsize() > 0:
735 buf.append(self.q_out.get())
736 message = ''.join(buf)
737 if self.VNF_PROMPT in message:
738 LOG.info("%s VNF is up and running.", self.APP_NAME)
740 self.queue_wrapper.clear()
741 self.resource_helper.start_collect()
742 return self._vnf_process.exitcode
744 if "PANIC" in message:
745 raise RuntimeError("Error starting %s VNF." %
748 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
749 time.sleep(self.WAIT_TIME_FOR_SCRIPT)
750 # Send ENTER to display a new prompt in case the prompt text was corrupted
751 # by other VNF output
752 self.q_in.put('\r\n')
754 def _build_run_kwargs(self):
756 'stdin': self.queue_wrapper,
757 'stdout': self.queue_wrapper,
758 'keep_stdin_open': True,
760 'timeout': self.scenario_helper.timeout,
763 def _build_config(self):
764 return self.setup_helper.build_config()
767 # we can't share ssh paramiko objects to force new connection
768 self.ssh_helper.drop_connection()
769 cmd = self._build_config()
770 # kill before starting
771 self.setup_helper.kill_vnf()
774 self._build_run_kwargs()
775 self.ssh_helper.run(cmd, **self.run_kwargs)
777 def vnf_execute(self, cmd, wait_time=2):
778 """ send cmd to vnf process """
780 LOG.info("%s command: %s", self.APP_NAME, cmd)
781 self.q_in.put("{}\r\n".format(cmd))
782 time.sleep(wait_time)
784 while self.q_out.qsize() > 0:
785 output.append(self.q_out.get())
786 return "".join(output)
788 def _tear_down(self):
792 self.vnf_execute("quit")
793 self.setup_helper.kill_vnf()
795 self.resource_helper.stop_collect()
796 if self._vnf_process is not None:
797 # be proper and join first before we kill
798 LOG.debug("joining before terminate %s", self._vnf_process.name)
799 self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
800 self._vnf_process.terminate()
801 # no terminate children here because we share processes with tg
803 def get_stats(self, *args, **kwargs): # pylint: disable=unused-argument
804 """Method for checking the statistics
806 This method could be overridden in children classes.
808 :return: VNF statistics
810 cmd = 'p {0} stats'.format(self.APP_WORD)
811 out = self.vnf_execute(cmd)
814 def collect_kpi(self):
815 # we can't get KPIs if the VNF is down
816 check_if_process_failed(self._vnf_process)
817 stats = self.get_stats()
818 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
820 result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
821 result["collect_stats"] = self.resource_helper.collect_kpi()
826 "packets_dropped": 0,
828 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
def scale(self, flavor=""):
    """The SampleVNF base class doesn't provide the 'scale' feature

    :param flavor: unused
    :raises FunctionNotImplemented: always
    """
    # Name this class in the error; the previous value named the traffic
    # generator class, which made the message misleading.
    raise y_exceptions.FunctionNotImplemented(
        function_name='scale', class_name='SampleVNF')
837 class SampleVNFTrafficGen(GenericTrafficGen):
838 """ Class providing file-like API for generic traffic generator """
843 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
844 super(SampleVNFTrafficGen, self).__init__(name, vnfd)
845 self.bin_path = get_nsb_option('bin_path', '')
847 self.scenario_helper = ScenarioHelper(self.name)
848 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
850 if setup_env_helper_type is None:
851 setup_env_helper_type = SetupEnvHelper
853 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
855 self.scenario_helper)
857 if resource_helper_type is None:
858 resource_helper_type = ClientResourceHelper
860 self.resource_helper = resource_helper_type(self.setup_helper)
862 self.runs_traffic = True
863 self.traffic_finished = False
864 self._tg_process = None
865 self._traffic_process = None
867 def _start_server(self):
868 # we can't share ssh paramiko objects to force new connection
869 self.ssh_helper.drop_connection()
871 def instantiate(self, scenario_cfg, context_cfg):
872 self.scenario_helper.scenario_cfg = scenario_cfg
873 self.resource_helper.setup()
874 # must generate_cfg after DPDK bind because we need port number
875 self.resource_helper.generate_cfg()
877 LOG.info("Starting %s server...", self.APP_NAME)
878 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
879 self._tg_process = Process(name=name, target=self._start_server)
880 self._tg_process.start()
882 def _check_status(self):
883 raise NotImplementedError
885 def _wait_for_process(self):
887 if not self._tg_process.is_alive():
888 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
889 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
891 status = self._check_status()
893 LOG.info("%s TG Server is up and running.", self.APP_NAME)
894 return self._tg_process.exitcode
def _traffic_runner(self, traffic_profile):
    """Drop the SSH connection, then run traffic through the resource helper.

    :param traffic_profile: passed on to ``resource_helper.run_traffic``
    """
    # In a new process the connection is dropped before anything else,
    # which avoids paramiko errors.
    ssh = self.ssh_helper
    ssh.drop_connection()

    LOG.info("Starting %s client...", self.APP_NAME)

    runner = self.resource_helper
    runner.run_traffic(traffic_profile)
903 def run_traffic(self, traffic_profile):
904 """ Generate traffic on the wire according to the given params.
905 Method is non-blocking, returns immediately when traffic process
906 is running. Mandatory.
908 :param traffic_profile:
911 name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
913 self._traffic_process = Process(name=name, target=self._traffic_runner,
914 args=(traffic_profile,))
915 self._traffic_process.start()
916 # Wait for traffic process to start
917 while self.resource_helper.client_started.value == 0:
918 time.sleep(self.RUN_WAIT)
919 # what if traffic process takes a few seconds to start?
920 if not self._traffic_process.is_alive():
923 return self._traffic_process.is_alive()
925 def collect_kpi(self):
926 # check if the tg processes have exited
927 for proc in (self._tg_process, self._traffic_process):
928 check_if_process_failed(proc)
929 result = self.resource_helper.collect_kpi()
930 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
934 """ After this method finishes, all traffic processes should stop. Mandatory.
938 self.traffic_finished = True
939 # we must kill client before we kill the server, or the client will raise exception
940 if self._traffic_process is not None:
941 # be proper and try to join before terminating
942 LOG.debug("joining before terminate %s", self._traffic_process.name)
943 self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
944 self._traffic_process.terminate()
945 if self._tg_process is not None:
946 # be proper and try to join before terminating
947 LOG.debug("joining before terminate %s", self._tg_process.name)
948 self._tg_process.join(PROCESS_JOIN_TIMEOUT)
949 self._tg_process.terminate()
950 # no terminate children here because we share processes with vnf
def scale(self, flavor=""):
    """A traffic generator VNF doesn't provide the 'scale' feature

    :param flavor: unused
    :raises FunctionNotImplemented: always
    """
    not_implemented = y_exceptions.FunctionNotImplemented(
        function_name='scale', class_name='SampleVNFTrafficGen')
    raise not_implemented