1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
16 from __future__ import absolute_import
24 from collections import Mapping
26 from multiprocessing import Queue, Value, Process
28 from six.moves import cStringIO
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
32 from yardstick.network_services.helpers.cpu import CpuSysCores
33 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
34 from yardstick.network_services.nfvi.resource import ResourceProfile
35 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
36 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
37 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
38 from yardstick.network_services.utils import get_nsb_option
40 from trex_stl_lib.trex_stl_client import STLClient
41 from trex_stl_lib.trex_stl_client import LoggerApi
42 from trex_stl_lib.trex_stl_exceptions import STLError
44 from yardstick.ssh import AutoConnectSSH
46 DPDK_VERSION = "dpdk-16.07"
48 LOG = logging.getLogger(__name__)
54 class VnfSshHelper(AutoConnectSSH):
56 def __init__(self, node, bin_path, wait=None):
58 kwargs = self.args_from_node(self.node)
60 kwargs.setdefault('wait', wait)
62 super(VnfSshHelper, self).__init__(**kwargs)
63 self.bin_path = bin_path
67 # must return static class name, anything else refers to the calling class
68 # i.e. the subclass, not the superclass
72 # this copy constructor is different from SSH classes, since it uses node
73 return self.get_class()(self.node, self.bin_path)
75 def upload_config_file(self, prefix, content):
76 cfg_file = os.path.join(REMOTE_TMP, prefix)
78 file_obj = cStringIO(content)
79 self.put_file_obj(file_obj, cfg_file)
def join_bin_path(self, *args):
    """Join path components onto this helper's ``bin_path``.

    :param args: path components appended after ``self.bin_path``
    :return: ``os.path.join`` of ``self.bin_path`` followed by ``args``
    """
    components = (self.bin_path,) + args
    return os.path.join(*components)
85 def provision_tool(self, tool_path=None, tool_file=None):
87 tool_path = self.bin_path
88 return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
91 class SetupEnvHelper(object):
93 CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
94 CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
96 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
    """Store the collaborating helpers on the instance.

    :param vnfd_helper: kept as ``self.vnfd_helper``
    :param ssh_helper: kept as ``self.ssh_helper``
    :param scenario_helper: kept as ``self.scenario_helper``
    """
    super(SetupEnvHelper, self).__init__()
    self.vnfd_helper, self.ssh_helper, self.scenario_helper = (
        vnfd_helper,
        ssh_helper,
        scenario_helper,
    )
106 def _get_ports_gateway(self, name):
107 routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
108 for route in routing_table:
109 if name == route['if']:
110 return route['gateway']
def build_config(self):
    """Placeholder with no base implementation; always raises.

    :raises NotImplementedError: unconditionally
    """
    raise NotImplementedError()
116 def setup_vnf_environment(self):
123 raise NotImplementedError
126 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
129 DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
130 DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
131 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
136 DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")
139 def _update_packet_type(ip_pipeline_cfg, traffic_options):
140 match_str = 'pkt_type = ipv4'
141 replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
142 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
143 return pipeline_config_str
146 def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
147 traffic_type = traffic_options['traffic_type']
149 if traffic_options['vnf_type'] is not cls.APP_NAME:
150 match_str = 'traffic_type = 4'
151 replace_str = 'traffic_type = {0}'.format(traffic_type)
153 elif traffic_type == 4:
154 match_str = 'pkt_type = ipv4'
155 replace_str = 'pkt_type = ipv4'
158 match_str = 'pkt_type = ipv4'
159 replace_str = 'pkt_type = ipv6'
161 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
162 return pipeline_config_str
164 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
165 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
166 self.all_ports = None
167 self.bound_pci = None
168 self._dpdk_nic_bind = None
170 self.used_drivers = None
def dpdk_nic_bind(self):
    """Return the provisioned ``dpdk-devbind.py`` tool, provisioning on demand.

    The value returned by ``ssh_helper.provision_tool`` is cached in
    ``self._dpdk_nic_bind``; provisioning is repeated only while the cached
    value is None.
    """
    cached = self._dpdk_nic_bind
    if cached is not None:
        return cached
    cached = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
    self._dpdk_nic_bind = cached
    return cached
178 def _setup_hugepages(self):
179 cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
180 hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
183 '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
184 self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
186 if hugepages == "2048kB":
191 self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
193 def _get_dpdk_port_num(self, name):
194 interface = self.vnfd_helper.find_interface(name=name)
195 return interface['virtual-interface']['dpdk_port_num']
197 def build_config(self):
198 vnf_cfg = self.scenario_helper.vnf_cfg
199 task_path = self.scenario_helper.task_path
201 lb_count = vnf_cfg.get('lb_count', 3)
202 lb_config = vnf_cfg.get('lb_config', 'SW')
203 worker_config = vnf_cfg.get('worker_config', '1C/1T')
204 worker_threads = vnf_cfg.get('worker_threads', 3)
206 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
208 'traffic_type': traffic_type,
209 'pkt_type': 'ipv%s' % traffic_type,
210 'vnf_type': self.VNF_TYPE,
213 config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
214 config_basename = posixpath.basename(self.CFG_CONFIG)
215 script_basename = posixpath.basename(self.CFG_SCRIPT)
216 multiport = MultiPortConfig(self.scenario_helper.topology,
219 self.vnfd_helper.interfaces,
227 multiport.generate_config()
228 with open(self.CFG_CONFIG) as handle:
229 new_config = handle.read()
231 new_config = self._update_traffic_type(new_config, traffic_options)
232 new_config = self._update_packet_type(new_config, traffic_options)
234 self.ssh_helper.upload_config_file(config_basename, new_config)
235 self.ssh_helper.upload_config_file(script_basename,
236 multiport.generate_script(self.vnfd_helper))
237 self.all_ports = multiport.port_pair_list
239 LOG.info("Provision and start the %s", self.APP_NAME)
240 self._build_pipeline_kwargs()
241 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
243 def _build_pipeline_kwargs(self):
244 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
245 ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
246 self.pipeline_kwargs = {
247 'cfg_file': self.CFG_CONFIG,
248 'script': self.CFG_SCRIPT,
249 'ports_len_hex': ports_len_hex,
250 'tool_path': tool_path,
253 def _get_app_cpu(self):
257 vnf_cfg = self.scenario_helper.vnf_cfg
258 sys_obj = CpuSysCores(self.ssh_helper)
259 self.sys_cpu = sys_obj.get_core_socket()
260 num_core = int(vnf_cfg["worker_threads"])
261 if vnf_cfg.get("lb_config", "SW") == 'HW':
262 num_core += self.HW_DEFAULT_CORE
264 num_core += self.SW_DEFAULT_CORE
265 app_cpu = self.sys_cpu[str(self.socket)][:num_core]
268 def _get_cpu_sibling_list(self, cores=None):
270 cores = self._get_app_cpu()
271 sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
272 awk_template = "awk -F: '{ print $1 }' < %s"
273 sys_path = "/sys/devices/system/cpu/"
277 sys_cmd = sys_cmd_template % (sys_path, core)
278 cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
279 cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
285 def _validate_cpu_cfg(self):
286 return self._get_cpu_sibling_list()
288 def _find_used_drivers(self):
289 cmd = "{0} -s".format(self.dpdk_nic_bind)
290 rc, dpdk_status, _ = self.ssh_helper.execute(cmd)
292 self.used_drivers = {
293 vpci: (index, driver)
294 for index, (vpci, driver)
295 in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
296 if any(b.endswith(vpci) for b in self.bound_pci)
299 def setup_vnf_environment(self):
301 resource = self._setup_resources()
303 self._detect_and_bind_drivers()
307 # have to use exact match
308 self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)
310 def _setup_dpdk(self):
311 """ setup dpdk environment needed for vnf to run """
313 self._setup_hugepages()
314 self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
316 exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
320 dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
321 dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
322 exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
324 self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
326 def _setup_resources(self):
327 interfaces = self.vnfd_helper.interfaces
328 self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
330 # what is this magic? how do we know which socket is for which port?
331 # what about quad-socket?
332 if any(v[5] == "0" for v in self.bound_pci):
337 cores = self._validate_cpu_cfg()
338 return ResourceProfile(self.vnfd_helper.mgmt_interface,
339 interfaces=self.vnfd_helper.interfaces, cores=cores)
341 def _detect_and_bind_drivers(self):
342 interfaces = self.vnfd_helper.interfaces
344 self._find_used_drivers()
345 for vpci, (index, _) in self.used_drivers.items():
347 intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
348 except StopIteration:
351 intf1['dpdk_port_num'] = index
353 for vpci in self.bound_pci:
354 self._bind_dpdk('igb_uio', vpci)
357 # debug dump after binding
358 self.ssh_helper.execute("sudo {} -s".format(self.dpdk_nic_bind))
def rebind_drivers(self, force=True):
    """Bind every recorded PCI address back to its recorded driver.

    If ``self.used_drivers`` is empty or None it is first refreshed with
    ``_find_used_drivers``.

    :param force: forwarded to ``_bind_dpdk`` for each address
    """
    nothing_recorded = not self.used_drivers
    if nothing_recorded:
        self._find_used_drivers()

    for pci_address, index_and_driver in self.used_drivers.items():
        _, recorded_driver = index_and_driver
        self._bind_dpdk(recorded_driver, pci_address, force)
366 def _bind_dpdk(self, driver, vpci, force=True):
371 cmd = self.DPDK_BIND_CMD.format(force=force,
372 dpdk_nic_bind=self.dpdk_nic_bind,
375 self.ssh_helper.execute(cmd)
377 def _detect_and_bind_dpdk(self, vpci, driver):
378 find_net_cmd = self.FIND_NET_CMD.format(vpci)
379 exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
383 self._bind_dpdk(driver, vpci)
384 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
390 def _bind_kernel_devices(self):
391 # only used by PingSetupEnvHelper?
392 for intf in self.vnfd_helper.interfaces:
393 vi = intf["virtual-interface"]
394 stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
395 if stdout is not None:
396 vi["local_iface_name"] = posixpath.basename(stdout)
399 for vpci, (_, driver) in self.used_drivers.items():
400 self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
405 class ResourceHelper(object):
408 MAKE_INSTALL = 'cd {0} && make && sudo make install'
409 RESOURCE_WORD = 'sample'
413 def __init__(self, setup_helper):
414 super(ResourceHelper, self).__init__()
416 self.setup_helper = setup_helper
417 self.ssh_helper = setup_helper.ssh_helper
420 self.resource = self.setup_helper.setup_vnf_environment()
422 def generate_cfg(self):
425 def _collect_resource_kpi(self):
427 status = self.resource.check_if_sa_running("collectd")[0]
429 result = self.resource.amqp_collect_nfvi_kpi()
431 result = {"core": result}
def start_collect(self):
    """Start resource collection on ``self.resource``.

    Calls, in order, ``initiate_systemagent`` (with the SSH helper's
    ``bin_path``), ``start`` and ``amqp_process_for_nfvi_kpi``.
    """
    resource = self.resource
    resource.initiate_systemagent(self.ssh_helper.bin_path)
    resource.start()
    resource.amqp_process_for_nfvi_kpi()
439 def stop_collect(self):
def collect_kpi(self):
    """Return whatever ``_collect_resource_kpi`` reports."""
    resource_kpi = self._collect_resource_kpi()
    return resource_kpi
447 class ClientResourceHelper(ResourceHelper):
454 def __init__(self, setup_helper):
455 super(ClientResourceHelper, self).__init__(setup_helper)
456 self.vnfd_helper = setup_helper.vnfd_helper
457 self.scenario_helper = setup_helper.scenario_helper
460 self.client_started = Value('i', 0)
462 self._queue = Queue()
464 self._terminated = Value('i', 0)
465 self._vpci_ascending = None
467 def _build_ports(self):
468 self.my_ports = [0, 1]
470 def get_stats(self, *args, **kwargs):
472 return self.client.get_stats(*args, **kwargs)
474 LOG.exception("TRex client not connected")
477 def generate_samples(self, key=None, default=None):
478 last_result = self.get_stats(self.my_ports)
479 key_value = last_result.get(key, default)
481 if not isinstance(last_result, Mapping): # added for mock unit test
482 self._terminated.value = 1
486 for vpci_idx, vpci in enumerate(self._vpci_ascending):
487 name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
488 # fixme: VNFDs KPIs values needs to be mapped to TRex structure
489 xe_value = last_result.get(vpci_idx, {})
491 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
492 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
493 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
494 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
495 "in_packets": int(xe_value.get("ipackets", 0)),
496 "out_packets": int(xe_value.get("opackets", 0)),
499 samples[name][key] = key_value
502 def _run_traffic_once(self, traffic_profile):
503 traffic_profile.execute(self)
504 self.client_started.value = 1
505 time.sleep(self.RUN_DURATION)
506 samples = self.generate_samples()
507 time.sleep(self.QUEUE_WAIT_TIME)
508 self._queue.put(samples)
510 def run_traffic(self, traffic_profile):
511 # fixme: fix passing correct trex config file,
512 # instead of searching the default path
515 self.client = self._connect()
516 self.client.reset(ports=self.my_ports)
517 self.client.remove_all_streams(self.my_ports) # remove all streams
518 traffic_profile.register_generator(self)
520 while self._terminated.value == 0:
521 self._run_traffic_once(traffic_profile)
523 self.client.stop(self.my_ports)
524 self.client.disconnect()
525 self._terminated.value = 0
527 if self._terminated.value:
528 LOG.debug("traffic generator is stopped")
529 return # return if trex/tg server is stopped.
533 self._terminated.value = 1 # stop client
535 def clear_stats(self, ports=None):
537 ports = self.my_ports
538 self.client.clear_stats(ports=ports)
540 def start(self, ports=None, *args, **kwargs):
542 ports = self.my_ports
543 self.client.start(ports=ports, *args, **kwargs)
545 def collect_kpi(self):
546 if not self._queue.empty():
547 kpi = self._queue.get()
548 self._result.update(kpi)
549 LOG.debug("Got KPIs from _queue for {0} {1}".format(
550 self.scenario_helper.name, self.RESOURCE_WORD))
553 def _connect(self, client=None):
555 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
556 server=self.vnfd_helper.mgmt_interface["ip"],
557 verbose_level=LoggerApi.VERBOSE_QUIET)
559 # try to connect with 5s intervals, 30s max
565 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
570 class Rfc2544ResourceHelper(object):
572 DEFAULT_CORRELATED_TRAFFIC = False
573 DEFAULT_LATENCY = False
574 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
576 def __init__(self, scenario_helper):
577 super(Rfc2544ResourceHelper, self).__init__()
578 self.scenario_helper = scenario_helper
579 self._correlated_traffic = None
580 self.iteration = Value('i', 0)
583 self._tolerance_low = None
584 self._tolerance_high = None
588 if self._rfc2544 is None:
589 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
def tolerance_low(self):
    """Return the lower tolerance bound, computing it on first access.

    When ``self._tolerance_low`` is still None, ``get_rfc_tolerance`` is
    called to fill in the bounds.
    """
    if self._tolerance_low is not None:
        return self._tolerance_low
    self.get_rfc_tolerance()
    return self._tolerance_low
def tolerance_high(self):
    """Return the upper tolerance bound, computing it on first access.

    When ``self._tolerance_high`` is still None, ``get_rfc_tolerance`` is
    called to fill in the bounds.
    """
    if self._tolerance_high is not None:
        return self._tolerance_high
    self.get_rfc_tolerance()
    return self._tolerance_high
def correlated_traffic(self):
    """Return the ``correlated_traffic`` RFC2544 option, cached after first read.

    Falls back to ``DEFAULT_CORRELATED_TRAFFIC`` when the option is absent.
    """
    if self._correlated_traffic is not None:
        return self._correlated_traffic
    self._correlated_traffic = self.get_rfc2544(
        'correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
    return self._correlated_traffic
614 if self._latency is None:
615 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
def get_rfc2544(self, name, default=None):
    """Fetch one entry from the ``rfc2544`` options mapping.

    :param name: key to look up in ``self.rfc2544``
    :param default: value returned when ``name`` is missing
    :return: ``self.rfc2544.get(name, default)``
    """
    rfc_options = self.rfc2544
    return rfc_options.get(name, default)
def get_rfc_tolerance(self):
    """Parse the ``allowed_drop_rate`` option into low/high tolerance bounds.

    The option (default ``DEFAULT_TOLERANCE``) is split on ``'-'``, each part
    is converted to float and the values are sorted ascending. The smallest
    becomes ``self._tolerance_low``; the next one, or the low bound when only
    one value was given, becomes ``self._tolerance_high``.
    """
    tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
    ordered = sorted(float(part.strip()) for part in tolerance_str.split('-'))

    self._tolerance_low = ordered[0]
    # Read back through the attribute, as the fallback for a single value.
    fallback = self.tolerance_low
    self._tolerance_high = ordered[1] if len(ordered) > 1 else fallback
628 class SampleVNFDeployHelper(object):
630 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
631 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
632 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
def __init__(self, vnfd_helper, ssh_helper):
    """Store the helpers used while deploying the sample VNF.

    :param vnfd_helper: kept as ``self.vnfd_helper``
    :param ssh_helper: kept as ``self.ssh_helper``
    """
    super(SampleVNFDeployHelper, self).__init__()
    self.ssh_helper, self.vnfd_helper = ssh_helper, vnfd_helper
639 DISABLE_DEPLOY = True
641 def deploy_vnfs(self, app_name):
642 # temp disable for now
643 if self.DISABLE_DEPLOY:
646 vnf_bin = self.ssh_helper.join_bin_path(app_name)
647 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
651 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
652 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
654 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
655 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
657 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
659 http_proxy = os.environ.get('http_proxy', '')
660 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
662 self.ssh_helper.execute(cmd)
663 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
664 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
665 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
668 class ScenarioHelper(object):
673 'worker_config': '1C/1T',
677 def __init__(self, name):
679 self.scenario_cfg = None
683 return self.scenario_cfg['task_path']
687 return self.scenario_cfg.get('nodes')
def all_options(self):
    """Return the ``'options'`` entry of the scenario config, or ``{}`` if absent."""
    scenario_cfg = self.scenario_cfg
    return scenario_cfg.get('options', {})
695 return self.all_options.get(self.name, {})
699 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
703 return self.scenario_cfg['topology']
706 class SampleVNF(GenericVNF):
707 """ Class providing file-like API for generic VNF implementation """
709 VNF_PROMPT = "pipeline>"
712 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
713 super(SampleVNF, self).__init__(name, vnfd)
714 self.bin_path = get_nsb_option('bin_path', '')
716 self.scenario_helper = ScenarioHelper(self.name)
717 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
719 if setup_env_helper_type is None:
720 setup_env_helper_type = SetupEnvHelper
722 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
724 self.scenario_helper)
726 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
728 if resource_helper_type is None:
729 resource_helper_type = ResourceHelper
731 self.resource_helper = resource_helper_type(self.setup_helper)
733 self.all_ports = None
734 self.context_cfg = None
735 self.nfvi_context = None
736 self.pipeline_kwargs = {}
737 self.priv_ports = None
738 self.pub_ports = None
739 # TODO(esm): make QueueFileWrapper invert-able so that we
740 # never have to manage the queues
743 self.queue_wrapper = None
745 self.tg_port_pairs = None
746 self.used_drivers = {}
747 self.vnf_port_pairs = None
748 self._vnf_process = None
750 def _get_route_data(self, route_index, route_type):
751 route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
752 for _ in range(route_index):
754 return next(route_iter, {}).get(route_type, '')
756 def _get_port0localip6(self):
757 return_value = self._get_route_data(0, 'network')
758 LOG.info("_get_port0localip6 : %s", return_value)
761 def _get_port1localip6(self):
762 return_value = self._get_route_data(1, 'network')
763 LOG.info("_get_port1localip6 : %s", return_value)
766 def _get_port0prefixlen6(self):
767 return_value = self._get_route_data(0, 'netmask')
768 LOG.info("_get_port0prefixlen6 : %s", return_value)
771 def _get_port1prefixlen6(self):
772 return_value = self._get_route_data(1, 'netmask')
773 LOG.info("_get_port1prefixlen6 : %s", return_value)
776 def _get_port0gateway6(self):
777 return_value = self._get_route_data(0, 'network')
778 LOG.info("_get_port0gateway6 : %s", return_value)
781 def _get_port1gateway6(self):
782 return_value = self._get_route_data(1, 'network')
783 LOG.info("_get_port1gateway6 : %s", return_value)
def _start_vnf(self):
    """Create the queue wrapper and launch ``self._run`` in a child process.

    ``self.queue_wrapper`` wraps ``q_in``/``q_out`` with ``VNF_PROMPT``; the
    started ``Process`` is kept in ``self._vnf_process``.
    """
    self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
    vnf_process = Process(target=self._run)
    self._vnf_process = vnf_process
    vnf_process.start()
791 def _vnf_up_post(self):
794 def instantiate(self, scenario_cfg, context_cfg):
795 self.scenario_helper.scenario_cfg = scenario_cfg
796 self.context_cfg = context_cfg
797 self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
798 # self.nfvi_context = None
800 self.deploy_helper.deploy_vnfs(self.APP_NAME)
801 self.resource_helper.setup()
804 def wait_for_instantiate(self):
806 time.sleep(self.WAIT_TIME) # Give some time for config to load
808 if not self._vnf_process.is_alive():
809 raise RuntimeError("%s VNF process died." % self.APP_NAME)
811 # TODO(esm): move to QueueFileWrapper
812 while self.q_out.qsize() > 0:
813 buf.append(self.q_out.get())
814 message = ''.join(buf)
815 if self.VNF_PROMPT in message:
816 LOG.info("%s VNF is up and running.", self.APP_NAME)
818 self.queue_wrapper.clear()
819 self.resource_helper.start_collect()
820 return self._vnf_process.exitcode
822 if "PANIC" in message:
823 raise RuntimeError("Error starting %s VNF." %
826 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
829 def _build_run_kwargs(self):
831 'stdin': self.queue_wrapper,
832 'stdout': self.queue_wrapper,
833 'keep_stdin_open': True,
837 def _build_config(self):
838 return self.setup_helper.build_config()
841 # we can't share ssh paramiko objects to force new connection
842 self.ssh_helper.drop_connection()
843 cmd = self._build_config()
844 # kill before starting
845 self.setup_helper.kill_vnf()
848 self._build_run_kwargs()
849 self.ssh_helper.run(cmd, **self.run_kwargs)
851 def vnf_execute(self, cmd, wait_time=2):
852 """ send cmd to vnf process """
854 LOG.info("%s command: %s", self.APP_NAME, cmd)
855 self.q_in.put("{}\r\n".format(cmd))
856 time.sleep(wait_time)
858 while self.q_out.qsize() > 0:
859 output.append(self.q_out.get())
860 return "".join(output)
862 def _tear_down(self):
866 self.vnf_execute("quit")
867 if self._vnf_process:
868 self._vnf_process.terminate()
869 self.setup_helper.kill_vnf()
871 self.resource_helper.stop_collect()
873 def get_stats(self, *args, **kwargs):
875 Method for checking the statistics
880 cmd = 'p {0} stats'.format(self.APP_WORD)
881 out = self.vnf_execute(cmd)
884 def collect_kpi(self):
885 stats = self.get_stats()
886 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
888 result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
889 result["collect_stats"] = self.resource_helper.collect_kpi()
894 "packets_dropped": 0,
896 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
900 class SampleVNFTrafficGen(GenericTrafficGen):
901 """ Class providing file-like API for generic traffic generator """
906 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
907 super(SampleVNFTrafficGen, self).__init__(name, vnfd)
908 self.bin_path = get_nsb_option('bin_path', '')
909 self.name = "tgen__1" # name in topology file
911 self.scenario_helper = ScenarioHelper(self.name)
912 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
914 if setup_env_helper_type is None:
915 setup_env_helper_type = SetupEnvHelper
917 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
919 self.scenario_helper)
921 if resource_helper_type is None:
922 resource_helper_type = ClientResourceHelper
924 self.resource_helper = resource_helper_type(self.setup_helper)
926 self.runs_traffic = True
927 self.traffic_finished = False
928 self.tg_port_pairs = None
929 self._tg_process = None
930 self._traffic_process = None
932 def _start_server(self):
933 # we can't share ssh paramiko objects to force new connection
934 self.ssh_helper.drop_connection()
def instantiate(self, scenario_cfg, context_cfg):
    """Prepare the traffic generator and launch its server process.

    :param scenario_cfg: stored on ``self.scenario_helper.scenario_cfg``
    :param context_cfg: accepted for interface compatibility; not used here
    """
    self.scenario_helper.scenario_cfg = scenario_cfg

    resource_helper = self.resource_helper
    resource_helper.generate_cfg()
    resource_helper.setup()

    LOG.info("Starting %s server...", self.APP_NAME)
    server_process = Process(target=self._start_server)
    self._tg_process = server_process
    server_process.start()
def wait_for_instantiate(self):
    """Return the result of ``_wait_for_process``.

    Subclasses are expected to override this.
    """
    outcome = self._wait_for_process()
    return outcome
949 def _check_status(self):
950 raise NotImplementedError
952 def _wait_for_process(self):
954 if not self._tg_process.is_alive():
955 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
956 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
958 status = self._check_status()
960 LOG.info("%s TG Server is up and running.", self.APP_NAME)
961 return self._tg_process.exitcode
def _traffic_runner(self, traffic_profile):
    """Run the traffic client; target of the traffic process.

    :param traffic_profile: forwarded to ``resource_helper.run_traffic``
    """
    # A new process must not reuse the parent's connection, otherwise
    # paramiko raises errors, so drop it before doing anything else.
    self.ssh_helper.drop_connection()

    LOG.info("Starting %s client...", self.APP_NAME)
    runner = self.resource_helper
    runner.run_traffic(traffic_profile)
970 def run_traffic(self, traffic_profile):
971 """ Generate traffic on the wire according to the given params.
972 Method is non-blocking, returns immediately when traffic process
973 is running. Mandatory.
975 :param traffic_profile:
978 self._traffic_process = Process(target=self._traffic_runner,
979 args=(traffic_profile,))
980 self._traffic_process.start()
981 # Wait for traffic process to start
982 while self.resource_helper.client_started.value == 0:
983 time.sleep(self.RUN_WAIT)
984 # what if traffic process takes a few seconds to start?
985 if not self._traffic_process.is_alive():
988 return self._traffic_process.is_alive()
990 def listen_traffic(self, traffic_profile):
991 """ Listen to traffic with the given parameters.
992 Method is non-blocking, returns immediately when traffic process
993 is running. Optional.
995 :param traffic_profile:
1000 def verify_traffic(self, traffic_profile):
1001 """ Verify captured traffic after it has ended. Optional.
1003 :param traffic_profile:
1008 def collect_kpi(self):
1009 result = self.resource_helper.collect_kpi()
1010 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
1013 def terminate(self):
1014 """ After this method finishes, all traffic processes should stop. Mandatory.
1018 self.traffic_finished = True
1019 if self._traffic_process is not None:
1020 self._traffic_process.terminate()