1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
16 from __future__ import absolute_import
24 from collections import Mapping
26 from multiprocessing import Queue, Value, Process
28 from six.moves import cStringIO
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
34 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
35 from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
38 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.utils import get_nsb_option
42 from trex_stl_lib.trex_stl_client import STLClient
43 from trex_stl_lib.trex_stl_client import LoggerApi
44 from trex_stl_lib.trex_stl_exceptions import STLError
46 from yardstick.ssh import AutoConnectSSH
48 DPDK_VERSION = "dpdk-16.07"
50 LOG = logging.getLogger(__name__)
56 class VnfSshHelper(AutoConnectSSH):
58 def __init__(self, node, bin_path, wait=None):
60 kwargs = self.args_from_node(self.node)
62 kwargs.setdefault('wait', wait)
64 super(VnfSshHelper, self).__init__(**kwargs)
65 self.bin_path = bin_path
69 # must return static class name, anything else refers to the calling class
70 # i.e. the subclass, not the superclass
74 # this copy constructor is different from SSH classes, since it uses node
75 return self.get_class()(self.node, self.bin_path)
77 def upload_config_file(self, prefix, content):
78 cfg_file = os.path.join(REMOTE_TMP, prefix)
80 file_obj = cStringIO(content)
81 self.put_file_obj(file_obj, cfg_file)
84 def join_bin_path(self, *args):
85 return os.path.join(self.bin_path, *args)
87 def provision_tool(self, tool_path=None, tool_file=None):
89 tool_path = self.bin_path
90 return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
93 class SetupEnvHelper(object):
95 CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
96 CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
98 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
102 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
103 super(SetupEnvHelper, self).__init__()
104 self.vnfd_helper = vnfd_helper
105 self.ssh_helper = ssh_helper
106 self.scenario_helper = scenario_helper
108 def _get_ports_gateway(self, name):
109 routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
110 for route in routing_table:
111 if name == route['if']:
112 return route['gateway']
115 def build_config(self):
116 raise NotImplementedError
118 def setup_vnf_environment(self):
125 raise NotImplementedError
128 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
131 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
137 def _update_packet_type(ip_pipeline_cfg, traffic_options):
138 match_str = 'pkt_type = ipv4'
139 replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
140 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
141 return pipeline_config_str
144 def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
145 traffic_type = traffic_options['traffic_type']
147 if traffic_options['vnf_type'] is not cls.APP_NAME:
148 match_str = 'traffic_type = 4'
149 replace_str = 'traffic_type = {0}'.format(traffic_type)
151 elif traffic_type == 4:
152 match_str = 'pkt_type = ipv4'
153 replace_str = 'pkt_type = ipv4'
156 match_str = 'pkt_type = ipv4'
157 replace_str = 'pkt_type = ipv6'
159 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
160 return pipeline_config_str
162 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
163 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
164 self.all_ports = None
165 self.bound_pci = None
167 self.used_drivers = None
168 self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
170 def _setup_hugepages(self):
171 cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
172 hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
175 '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
176 self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
178 if hugepages == "2048kB":
183 self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
185 def build_config(self):
186 vnf_cfg = self.scenario_helper.vnf_cfg
187 task_path = self.scenario_helper.task_path
189 lb_count = vnf_cfg.get('lb_count', 3)
190 lb_config = vnf_cfg.get('lb_config', 'SW')
191 worker_config = vnf_cfg.get('worker_config', '1C/1T')
192 worker_threads = vnf_cfg.get('worker_threads', 3)
194 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
196 'traffic_type': traffic_type,
197 'pkt_type': 'ipv%s' % traffic_type,
198 'vnf_type': self.VNF_TYPE,
201 config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
202 config_basename = posixpath.basename(self.CFG_CONFIG)
203 script_basename = posixpath.basename(self.CFG_SCRIPT)
204 multiport = MultiPortConfig(self.scenario_helper.topology,
215 multiport.generate_config()
216 with open(self.CFG_CONFIG) as handle:
217 new_config = handle.read()
219 new_config = self._update_traffic_type(new_config, traffic_options)
220 new_config = self._update_packet_type(new_config, traffic_options)
222 self.ssh_helper.upload_config_file(config_basename, new_config)
223 self.ssh_helper.upload_config_file(script_basename,
224 multiport.generate_script(self.vnfd_helper))
226 LOG.info("Provision and start the %s", self.APP_NAME)
227 self._build_pipeline_kwargs()
228 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
230 def _build_pipeline_kwargs(self):
231 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
232 # count the number of actual ports in the list of pairs
233 # remove duplicate ports
234 # this is really a mapping from LINK ID to DPDK PMD ID
235 # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
236 # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
237 ports = self.vnfd_helper.port_pairs.all_ports
238 port_nums = self.vnfd_helper.port_nums(ports)
239 # create mask from all the dpdk port numbers
240 ports_mask_hex = hex(sum(2 ** num for num in port_nums))
241 self.pipeline_kwargs = {
242 'cfg_file': self.CFG_CONFIG,
243 'script': self.CFG_SCRIPT,
244 'port_mask_hex': ports_mask_hex,
245 'tool_path': tool_path,
248 def _get_app_cpu(self):
252 vnf_cfg = self.scenario_helper.vnf_cfg
253 sys_obj = CpuSysCores(self.ssh_helper)
254 self.sys_cpu = sys_obj.get_core_socket()
255 num_core = int(vnf_cfg["worker_threads"])
256 if vnf_cfg.get("lb_config", "SW") == 'HW':
257 num_core += self.HW_DEFAULT_CORE
259 num_core += self.SW_DEFAULT_CORE
260 app_cpu = self.sys_cpu[str(self.socket)][:num_core]
263 def _get_cpu_sibling_list(self, cores=None):
265 cores = self._get_app_cpu()
266 sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
267 awk_template = "awk -F: '{ print $1 }' < %s"
268 sys_path = "/sys/devices/system/cpu/"
272 sys_cmd = sys_cmd_template % (sys_path, core)
273 cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
274 cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
280 def _validate_cpu_cfg(self):
281 return self._get_cpu_sibling_list()
283 def setup_vnf_environment(self):
285 resource = self._setup_resources()
287 self._detect_and_bind_drivers()
291 # have to use exact match
292 self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)
294 def _setup_dpdk(self):
295 """ setup dpdk environment needed for vnf to run """
297 self._setup_hugepages()
298 self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
300 exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
304 dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
305 dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
306 exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
308 self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
310 def _setup_resources(self):
311 interfaces = self.vnfd_helper.interfaces
312 self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
314 # what is this magic? how do we know which socket is for which port?
315 # what about quad-socket?
316 if any(v[5] == "0" for v in self.bound_pci):
321 cores = self._validate_cpu_cfg()
322 return ResourceProfile(self.vnfd_helper.mgmt_interface,
323 interfaces=self.vnfd_helper.interfaces, cores=cores)
325 def _detect_and_bind_drivers(self):
326 interfaces = self.vnfd_helper.interfaces
328 self.dpdk_bind_helper.read_status()
329 self.dpdk_bind_helper.save_used_drivers()
331 self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
333 sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
334 for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
336 intf = next(v for v in interfaces
337 if vpci == v['virtual-interface']['vpci'])
339 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
344 def get_local_iface_name_by_vpci(self, vpci):
345 find_net_cmd = self.FIND_NET_CMD.format(vpci)
346 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
352 self.dpdk_bind_helper.rebind_drivers()
355 class ResourceHelper(object):
358 MAKE_INSTALL = 'cd {0} && make && sudo make install'
359 RESOURCE_WORD = 'sample'
363 def __init__(self, setup_helper):
364 super(ResourceHelper, self).__init__()
366 self.setup_helper = setup_helper
367 self.ssh_helper = setup_helper.ssh_helper
370 self.resource = self.setup_helper.setup_vnf_environment()
372 def generate_cfg(self):
375 def _collect_resource_kpi(self):
377 status = self.resource.check_if_sa_running("collectd")[0]
379 result = self.resource.amqp_collect_nfvi_kpi()
381 result = {"core": result}
384 def start_collect(self):
385 self.resource.initiate_systemagent(self.ssh_helper.bin_path)
386 self.resource.start()
387 self.resource.amqp_process_for_nfvi_kpi()
389 def stop_collect(self):
393 def collect_kpi(self):
394 return self._collect_resource_kpi()
397 class ClientResourceHelper(ResourceHelper):
404 def __init__(self, setup_helper):
405 super(ClientResourceHelper, self).__init__(setup_helper)
406 self.vnfd_helper = setup_helper.vnfd_helper
407 self.scenario_helper = setup_helper.scenario_helper
410 self.client_started = Value('i', 0)
411 self.all_ports = None
412 self._queue = Queue()
414 self._terminated = Value('i', 0)
415 self._vpci_ascending = None
417 def _build_ports(self):
418 self.networks = self.vnfd_helper.port_pairs.networks
419 self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
420 self.downlink_ports = \
421 self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
422 self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
424 def get_stats(self, *args, **kwargs):
426 return self.client.get_stats(*args, **kwargs)
428 LOG.exception("TRex client not connected")
431 def generate_samples(self, ports, key=None, default=None):
432 # needs to be used ports
433 last_result = self.get_stats(ports)
434 key_value = last_result.get(key, default)
436 if not isinstance(last_result, Mapping): # added for mock unit test
437 self._terminated.value = 1
441 # recalculate port for interface and see if it matches ports provided
442 for intf in self.vnfd_helper.interfaces:
444 port = self.vnfd_helper.port_num(name)
446 xe_value = last_result.get(port, {})
448 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
449 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
450 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
451 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
452 "in_packets": int(xe_value.get("ipackets", 0)),
453 "out_packets": int(xe_value.get("opackets", 0)),
456 samples[name][key] = key_value
459 def _run_traffic_once(self, traffic_profile):
460 traffic_profile.execute_traffic(self)
461 self.client_started.value = 1
462 time.sleep(self.RUN_DURATION)
463 samples = self.generate_samples(traffic_profile.ports)
464 time.sleep(self.QUEUE_WAIT_TIME)
465 self._queue.put(samples)
467 def run_traffic(self, traffic_profile):
468 # fixme: fix passing correct trex config file,
469 # instead of searching the default path
472 self.client = self._connect()
473 self.client.reset(ports=self.all_ports)
474 self.client.remove_all_streams(self.all_ports) # remove all streams
475 traffic_profile.register_generator(self)
477 while self._terminated.value == 0:
478 self._run_traffic_once(traffic_profile)
480 self.client.stop(self.all_ports)
481 self.client.disconnect()
482 self._terminated.value = 0
484 if self._terminated.value:
485 LOG.debug("traffic generator is stopped")
486 return # return if trex/tg server is stopped.
490 self._terminated.value = 1 # stop client
492 def clear_stats(self, ports=None):
494 ports = self.all_ports
495 self.client.clear_stats(ports=ports)
497 def start(self, ports=None, *args, **kwargs):
499 ports = self.all_ports
500 self.client.start(ports=ports, *args, **kwargs)
502 def collect_kpi(self):
503 if not self._queue.empty():
504 kpi = self._queue.get()
505 self._result.update(kpi)
506 LOG.debug("Got KPIs from _queue for {0} {1}".format(
507 self.scenario_helper.name, self.RESOURCE_WORD))
510 def _connect(self, client=None):
512 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
513 server=self.vnfd_helper.mgmt_interface["ip"],
514 verbose_level=LoggerApi.VERBOSE_QUIET)
516 # try to connect with 5s intervals, 30s max
522 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
527 class Rfc2544ResourceHelper(object):
529 DEFAULT_CORRELATED_TRAFFIC = False
530 DEFAULT_LATENCY = False
531 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
533 def __init__(self, scenario_helper):
534 super(Rfc2544ResourceHelper, self).__init__()
535 self.scenario_helper = scenario_helper
536 self._correlated_traffic = None
537 self.iteration = Value('i', 0)
540 self._tolerance_low = None
541 self._tolerance_high = None
545 if self._rfc2544 is None:
546 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
550 def tolerance_low(self):
551 if self._tolerance_low is None:
552 self.get_rfc_tolerance()
553 return self._tolerance_low
556 def tolerance_high(self):
557 if self._tolerance_high is None:
558 self.get_rfc_tolerance()
559 return self._tolerance_high
562 def correlated_traffic(self):
563 if self._correlated_traffic is None:
564 self._correlated_traffic = \
565 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
567 return self._correlated_traffic
571 if self._latency is None:
572 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
575 def get_rfc2544(self, name, default=None):
576 return self.rfc2544.get(name, default)
578 def get_rfc_tolerance(self):
579 tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
580 tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
581 self._tolerance_low = next(tolerance_iter)
582 self._tolerance_high = next(tolerance_iter, self.tolerance_low)
585 class SampleVNFDeployHelper(object):
587 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
588 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
589 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
591 def __init__(self, vnfd_helper, ssh_helper):
592 super(SampleVNFDeployHelper, self).__init__()
593 self.ssh_helper = ssh_helper
594 self.vnfd_helper = vnfd_helper
596 DISABLE_DEPLOY = True
598 def deploy_vnfs(self, app_name):
599 # temp disable for now
600 if self.DISABLE_DEPLOY:
603 vnf_bin = self.ssh_helper.join_bin_path(app_name)
604 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
608 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
609 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
611 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
612 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
614 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
616 http_proxy = os.environ.get('http_proxy', '')
617 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
619 self.ssh_helper.execute(cmd)
620 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
621 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
622 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
625 class ScenarioHelper(object):
630 'worker_config': '1C/1T',
634 def __init__(self, name):
636 self.scenario_cfg = None
640 return self.scenario_cfg['task_path']
644 return self.scenario_cfg.get('nodes')
647 def all_options(self):
648 return self.scenario_cfg.get('options', {})
652 return self.all_options.get(self.name, {})
656 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
660 return self.scenario_cfg['topology']
663 class SampleVNF(GenericVNF):
664 """ Class providing file-like API for generic VNF implementation """
666 VNF_PROMPT = "pipeline>"
669 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
670 super(SampleVNF, self).__init__(name, vnfd)
671 self.bin_path = get_nsb_option('bin_path', '')
673 self.scenario_helper = ScenarioHelper(self.name)
674 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
676 if setup_env_helper_type is None:
677 setup_env_helper_type = SetupEnvHelper
679 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
681 self.scenario_helper)
683 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
685 if resource_helper_type is None:
686 resource_helper_type = ResourceHelper
688 self.resource_helper = resource_helper_type(self.setup_helper)
690 self.context_cfg = None
691 self.nfvi_context = None
692 self.pipeline_kwargs = {}
693 self.uplink_ports = None
694 self.downlink_ports = None
695 # TODO(esm): make QueueFileWrapper invert-able so that we
696 # never have to manage the queues
699 self.queue_wrapper = None
701 self.used_drivers = {}
702 self.vnf_port_pairs = None
703 self._vnf_process = None
705 def _build_ports(self):
706 self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
707 self.networks = self._port_pairs.networks
708 self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
709 self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
710 self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
712 def _get_route_data(self, route_index, route_type):
713 route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
714 for _ in range(route_index):
716 return next(route_iter, {}).get(route_type, '')
718 def _get_port0localip6(self):
719 return_value = self._get_route_data(0, 'network')
720 LOG.info("_get_port0localip6 : %s", return_value)
723 def _get_port1localip6(self):
724 return_value = self._get_route_data(1, 'network')
725 LOG.info("_get_port1localip6 : %s", return_value)
728 def _get_port0prefixlen6(self):
729 return_value = self._get_route_data(0, 'netmask')
730 LOG.info("_get_port0prefixlen6 : %s", return_value)
733 def _get_port1prefixlen6(self):
734 return_value = self._get_route_data(1, 'netmask')
735 LOG.info("_get_port1prefixlen6 : %s", return_value)
738 def _get_port0gateway6(self):
739 return_value = self._get_route_data(0, 'network')
740 LOG.info("_get_port0gateway6 : %s", return_value)
743 def _get_port1gateway6(self):
744 return_value = self._get_route_data(1, 'network')
745 LOG.info("_get_port1gateway6 : %s", return_value)
748 def _start_vnf(self):
749 self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
750 self._vnf_process = Process(target=self._run)
751 self._vnf_process.start()
753 def _vnf_up_post(self):
756 def instantiate(self, scenario_cfg, context_cfg):
757 self.scenario_helper.scenario_cfg = scenario_cfg
758 self.context_cfg = context_cfg
759 self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
760 # self.nfvi_context = None
762 self.deploy_helper.deploy_vnfs(self.APP_NAME)
763 self.resource_helper.setup()
766 def wait_for_instantiate(self):
768 time.sleep(self.WAIT_TIME) # Give some time for config to load
770 if not self._vnf_process.is_alive():
771 raise RuntimeError("%s VNF process died." % self.APP_NAME)
773 # TODO(esm): move to QueueFileWrapper
774 while self.q_out.qsize() > 0:
775 buf.append(self.q_out.get())
776 message = ''.join(buf)
777 if self.VNF_PROMPT in message:
778 LOG.info("%s VNF is up and running.", self.APP_NAME)
780 self.queue_wrapper.clear()
781 self.resource_helper.start_collect()
782 return self._vnf_process.exitcode
784 if "PANIC" in message:
785 raise RuntimeError("Error starting %s VNF." %
788 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
790 # Send ENTER to display a new prompt in case the prompt text was corrupted
791 # by other VNF output
792 self.q_in.put('\r\n')
794 def _build_run_kwargs(self):
796 'stdin': self.queue_wrapper,
797 'stdout': self.queue_wrapper,
798 'keep_stdin_open': True,
802 def _build_config(self):
803 return self.setup_helper.build_config()
806 # we can't share ssh paramiko objects to force new connection
807 self.ssh_helper.drop_connection()
808 cmd = self._build_config()
809 # kill before starting
810 self.setup_helper.kill_vnf()
813 self._build_run_kwargs()
814 self.ssh_helper.run(cmd, **self.run_kwargs)
816 def vnf_execute(self, cmd, wait_time=2):
817 """ send cmd to vnf process """
819 LOG.info("%s command: %s", self.APP_NAME, cmd)
820 self.q_in.put("{}\r\n".format(cmd))
821 time.sleep(wait_time)
823 while self.q_out.qsize() > 0:
824 output.append(self.q_out.get())
825 return "".join(output)
827 def _tear_down(self):
831 self.vnf_execute("quit")
832 if self._vnf_process:
833 self._vnf_process.terminate()
834 self.setup_helper.kill_vnf()
836 self.resource_helper.stop_collect()
838 def get_stats(self, *args, **kwargs):
840 Method for checking the statistics
845 cmd = 'p {0} stats'.format(self.APP_WORD)
846 out = self.vnf_execute(cmd)
849 def collect_kpi(self):
850 stats = self.get_stats()
851 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
853 result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
854 result["collect_stats"] = self.resource_helper.collect_kpi()
859 "packets_dropped": 0,
861 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
865 class SampleVNFTrafficGen(GenericTrafficGen):
866 """ Class providing file-like API for generic traffic generator """
871 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
872 super(SampleVNFTrafficGen, self).__init__(name, vnfd)
873 self.bin_path = get_nsb_option('bin_path', '')
874 self.name = "tgen__1" # name in topology file
876 self.scenario_helper = ScenarioHelper(self.name)
877 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
879 if setup_env_helper_type is None:
880 setup_env_helper_type = SetupEnvHelper
882 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
884 self.scenario_helper)
886 if resource_helper_type is None:
887 resource_helper_type = ClientResourceHelper
889 self.resource_helper = resource_helper_type(self.setup_helper)
891 self.runs_traffic = True
892 self.traffic_finished = False
893 self._tg_process = None
894 self._traffic_process = None
896 def _start_server(self):
897 # we can't share ssh paramiko objects to force new connection
898 self.ssh_helper.drop_connection()
900 def instantiate(self, scenario_cfg, context_cfg):
901 self.scenario_helper.scenario_cfg = scenario_cfg
902 self.resource_helper.generate_cfg()
903 self.resource_helper.setup()
905 LOG.info("Starting %s server...", self.APP_NAME)
906 self._tg_process = Process(target=self._start_server)
907 self._tg_process.start()
909 def wait_for_instantiate(self):
910 # overridden by subclasses
911 return self._wait_for_process()
913 def _check_status(self):
914 raise NotImplementedError
916 def _wait_for_process(self):
918 if not self._tg_process.is_alive():
919 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
920 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
922 status = self._check_status()
924 LOG.info("%s TG Server is up and running.", self.APP_NAME)
925 return self._tg_process.exitcode
927 def _traffic_runner(self, traffic_profile):
928 # always drop connections first thing in new processes
929 # so we don't get paramiko errors
930 self.ssh_helper.drop_connection()
931 LOG.info("Starting %s client...", self.APP_NAME)
932 self.resource_helper.run_traffic(traffic_profile)
934 def run_traffic(self, traffic_profile):
935 """ Generate traffic on the wire according to the given params.
936 Method is non-blocking, returns immediately when traffic process
937 is running. Mandatory.
939 :param traffic_profile:
942 self._traffic_process = Process(target=self._traffic_runner,
943 args=(traffic_profile,))
944 self._traffic_process.start()
945 # Wait for traffic process to start
946 while self.resource_helper.client_started.value == 0:
947 time.sleep(self.RUN_WAIT)
948 # what if traffic process takes a few seconds to start?
949 if not self._traffic_process.is_alive():
952 return self._traffic_process.is_alive()
954 def listen_traffic(self, traffic_profile):
955 """ Listen to traffic with the given parameters.
956 Method is non-blocking, returns immediately when traffic process
957 is running. Optional.
959 :param traffic_profile:
964 def verify_traffic(self, traffic_profile):
965 """ Verify captured traffic after it has ended. Optional.
967 :param traffic_profile:
972 def collect_kpi(self):
973 result = self.resource_helper.collect_kpi()
974 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
978 """ After this method finishes, all traffic processes should stop. Mandatory.
982 self.traffic_finished = True
983 if self._traffic_process is not None:
984 self._traffic_process.terminate()