1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
16 from collections import Mapping
18 from multiprocessing import Queue, Value, Process
22 from six.moves import cStringIO
26 from trex_stl_lib.trex_stl_client import LoggerApi
27 from trex_stl_lib.trex_stl_client import STLClient
28 from trex_stl_lib.trex_stl_exceptions import STLError
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common import exceptions as y_exceptions
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
34 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.ssh import AutoConnectSSH
44 DPDK_VERSION = "dpdk-16.07"
46 LOG = logging.getLogger(__name__)
50 DEFAULT_VNF_TIMEOUT = 3600
51 PROCESS_JOIN_TIMEOUT = 3
54 class VnfSshHelper(AutoConnectSSH):
56 def __init__(self, node, bin_path, wait=None):
58 kwargs = self.args_from_node(self.node)
60 kwargs.setdefault('wait', wait)
62 super(VnfSshHelper, self).__init__(**kwargs)
63 self.bin_path = bin_path
67 # must return static class name, anything else refers to the calling class
68 # i.e. the subclass, not the superclass
72 # this copy constructor is different from SSH classes, since it uses node
73 return self.get_class()(self.node, self.bin_path)
75 def upload_config_file(self, prefix, content):
76 cfg_file = os.path.join(REMOTE_TMP, prefix)
78 file_obj = cStringIO(content)
79 self.put_file_obj(file_obj, cfg_file)
82 def join_bin_path(self, *args):
83 return os.path.join(self.bin_path, *args)
85 def provision_tool(self, tool_path=None, tool_file=None):
87 tool_path = self.bin_path
88 return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
91 class SetupEnvHelper(object):
93 CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
94 CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
95 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
99 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
100 super(SetupEnvHelper, self).__init__()
101 self.vnfd_helper = vnfd_helper
102 self.ssh_helper = ssh_helper
103 self.scenario_helper = scenario_helper
105 def build_config(self):
106 raise NotImplementedError
108 def setup_vnf_environment(self):
115 raise NotImplementedError
118 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
121 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
124 def _update_packet_type(ip_pipeline_cfg, traffic_options):
125 match_str = 'pkt_type = ipv4'
126 replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
127 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
128 return pipeline_config_str
131 def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
132 traffic_type = traffic_options['traffic_type']
134 if traffic_options['vnf_type'] is not cls.APP_NAME:
135 match_str = 'traffic_type = 4'
136 replace_str = 'traffic_type = {0}'.format(traffic_type)
138 elif traffic_type == 4:
139 match_str = 'pkt_type = ipv4'
140 replace_str = 'pkt_type = ipv4'
143 match_str = 'pkt_type = ipv4'
144 replace_str = 'pkt_type = ipv6'
146 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
147 return pipeline_config_str
149 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
150 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
151 self.all_ports = None
152 self.bound_pci = None
154 self.used_drivers = None
155 self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
157 def _setup_hugepages(self):
158 cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
159 hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
162 '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
163 self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
165 if hugepages == "2048kB":
170 self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
172 def build_config(self):
173 vnf_cfg = self.scenario_helper.vnf_cfg
174 task_path = self.scenario_helper.task_path
176 lb_count = vnf_cfg.get('lb_count', 3)
177 lb_config = vnf_cfg.get('lb_config', 'SW')
178 worker_config = vnf_cfg.get('worker_config', '1C/1T')
179 worker_threads = vnf_cfg.get('worker_threads', 3)
181 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
183 'traffic_type': traffic_type,
184 'pkt_type': 'ipv%s' % traffic_type,
185 'vnf_type': self.VNF_TYPE,
188 config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
189 config_basename = posixpath.basename(self.CFG_CONFIG)
190 script_basename = posixpath.basename(self.CFG_SCRIPT)
191 multiport = MultiPortConfig(self.scenario_helper.topology,
202 multiport.generate_config()
203 with open(self.CFG_CONFIG) as handle:
204 new_config = handle.read()
206 new_config = self._update_traffic_type(new_config, traffic_options)
207 new_config = self._update_packet_type(new_config, traffic_options)
209 self.ssh_helper.upload_config_file(config_basename, new_config)
210 self.ssh_helper.upload_config_file(script_basename,
211 multiport.generate_script(self.vnfd_helper))
213 LOG.info("Provision and start the %s", self.APP_NAME)
214 self._build_pipeline_kwargs()
215 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
217 def _build_pipeline_kwargs(self):
218 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
219 # count the number of actual ports in the list of pairs
220 # remove duplicate ports
221 # this is really a mapping from LINK ID to DPDK PMD ID
222 # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
223 # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
224 ports = self.vnfd_helper.port_pairs.all_ports
225 port_nums = self.vnfd_helper.port_nums(ports)
226 # create mask from all the dpdk port numbers
227 ports_mask_hex = hex(sum(2 ** num for num in port_nums))
228 self.pipeline_kwargs = {
229 'cfg_file': self.CFG_CONFIG,
230 'script': self.CFG_SCRIPT,
231 'port_mask_hex': ports_mask_hex,
232 'tool_path': tool_path,
235 def setup_vnf_environment(self):
237 self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
239 # bind before _setup_resources so we can use dpdk_port_num
240 self._detect_and_bind_drivers()
241 resource = self._setup_resources()
245 # pkill is not matching, debug with pgrep
246 self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
247 self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
248 # have to use exact match
249 # try using killall to match
250 self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
252 def _setup_dpdk(self):
253 """Setup DPDK environment needed for VNF to run"""
254 self._setup_hugepages()
255 self.ssh_helper.execute('sudo modprobe uio && sudo modprobe igb_uio')
256 exit_status = self.ssh_helper.execute('lsmod | grep -i igb_uio')[0]
258 raise y_exceptions.DPDKSetupDriverError()
260 def get_collectd_options(self):
261 options = self.scenario_helper.all_options.get("collectd", {})
262 # override with specific node settings
263 options.update(self.scenario_helper.options.get("collectd", {}))
266 def _setup_resources(self):
267 # what is this magic? how do we know which socket is for which port?
268 # what about quad-socket?
269 if any(v[5] == "0" for v in self.bound_pci):
274 # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
275 # this won't work because we don't have DPDK port numbers yet
276 ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
277 port_names = (intf["name"] for intf in ports)
278 collectd_options = self.get_collectd_options()
279 plugins = collectd_options.get("plugins", {})
280 # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
281 return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
282 plugins=plugins, interval=collectd_options.get("interval"),
283 timeout=self.scenario_helper.timeout)
285 def _detect_and_bind_drivers(self):
286 interfaces = self.vnfd_helper.interfaces
288 self.dpdk_bind_helper.read_status()
289 self.dpdk_bind_helper.save_used_drivers()
291 self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
293 sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
294 for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
296 intf = next(v for v in interfaces
297 if vpci == v['virtual-interface']['vpci'])
299 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
300 except: # pylint: disable=bare-except
304 def get_local_iface_name_by_vpci(self, vpci):
305 find_net_cmd = self.FIND_NET_CMD.format(vpci)
306 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
312 self.dpdk_bind_helper.rebind_drivers()
315 class ResourceHelper(object):
318 MAKE_INSTALL = 'cd {0} && make && sudo make install'
319 RESOURCE_WORD = 'sample'
323 def __init__(self, setup_helper):
324 super(ResourceHelper, self).__init__()
326 self.setup_helper = setup_helper
327 self.ssh_helper = setup_helper.ssh_helper
330 self.resource = self.setup_helper.setup_vnf_environment()
332 def generate_cfg(self):
335 def _collect_resource_kpi(self):
337 status = self.resource.check_if_system_agent_running("collectd")[0]
339 result = self.resource.amqp_collect_nfvi_kpi()
341 result = {"core": result}
344 def start_collect(self):
345 self.resource.initiate_systemagent(self.ssh_helper.bin_path)
346 self.resource.start()
347 self.resource.amqp_process_for_nfvi_kpi()
349 def stop_collect(self):
353 def collect_kpi(self):
354 return self._collect_resource_kpi()
357 class ClientResourceHelper(ResourceHelper):
364 def __init__(self, setup_helper):
365 super(ClientResourceHelper, self).__init__(setup_helper)
366 self.vnfd_helper = setup_helper.vnfd_helper
367 self.scenario_helper = setup_helper.scenario_helper
370 self.client_started = Value('i', 0)
371 self.all_ports = None
372 self._queue = Queue()
374 self._terminated = Value('i', 0)
376 def _build_ports(self):
377 self.networks = self.vnfd_helper.port_pairs.networks
378 self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
379 self.downlink_ports = \
380 self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
381 self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
383 def port_num(self, intf):
384 # by default return port num
385 return self.vnfd_helper.port_num(intf)
387 def get_stats(self, *args, **kwargs):
389 return self.client.get_stats(*args, **kwargs)
391 LOG.exception("TRex client not connected")
394 def generate_samples(self, ports, key=None, default=None):
395 # needs to be used ports
396 last_result = self.get_stats(ports)
397 key_value = last_result.get(key, default)
399 if not isinstance(last_result, Mapping): # added for mock unit test
400 self._terminated.value = 1
404 # recalculate port for interface and see if it matches ports provided
405 for intf in self.vnfd_helper.interfaces:
407 port = self.vnfd_helper.port_num(name)
409 xe_value = last_result.get(port, {})
411 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
412 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
413 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
414 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
415 "in_packets": int(xe_value.get("ipackets", 0)),
416 "out_packets": int(xe_value.get("opackets", 0)),
419 samples[name][key] = key_value
422 def _run_traffic_once(self, traffic_profile):
423 traffic_profile.execute_traffic(self)
424 self.client_started.value = 1
425 time.sleep(self.RUN_DURATION)
426 samples = self.generate_samples(traffic_profile.ports)
427 time.sleep(self.QUEUE_WAIT_TIME)
428 self._queue.put(samples)
430 def run_traffic(self, traffic_profile):
431 # if we don't do this we can hang waiting for the queue to drain
432 # have to do this in the subprocess
433 self._queue.cancel_join_thread()
434 # fixme: fix passing correct trex config file,
435 # instead of searching the default path
438 self.client = self._connect()
439 self.client.reset(ports=self.all_ports)
440 self.client.remove_all_streams(self.all_ports) # remove all streams
441 traffic_profile.register_generator(self)
443 while self._terminated.value == 0:
444 self._run_traffic_once(traffic_profile)
446 self.client.stop(self.all_ports)
447 self.client.disconnect()
448 self._terminated.value = 0
450 if self._terminated.value:
451 LOG.debug("traffic generator is stopped")
452 return # return if trex/tg server is stopped.
456 self._terminated.value = 1 # stop client
458 def clear_stats(self, ports=None):
460 ports = self.all_ports
461 self.client.clear_stats(ports=ports)
463 def start(self, ports=None, *args, **kwargs):
464 # pylint: disable=keyword-arg-before-vararg
465 # NOTE(ralonsoh): defining keyworded arguments before variable
466 # positional arguments is a bug. This function definition doesn't work
467 # in Python 2, although it works in Python 3. Reference:
468 # https://www.python.org/dev/peps/pep-3102/
470 ports = self.all_ports
471 self.client.start(ports=ports, *args, **kwargs)
473 def collect_kpi(self):
474 if not self._queue.empty():
475 kpi = self._queue.get()
476 self._result.update(kpi)
477 LOG.debug('Got KPIs from _queue for %s %s',
478 self.scenario_helper.name, self.RESOURCE_WORD)
481 def _connect(self, client=None):
483 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
484 server=self.vnfd_helper.mgmt_interface["ip"],
485 verbose_level=LoggerApi.VERBOSE_QUIET)
487 # try to connect with 5s intervals, 30s max
493 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
498 class Rfc2544ResourceHelper(object):
500 DEFAULT_CORRELATED_TRAFFIC = False
501 DEFAULT_LATENCY = False
502 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
504 def __init__(self, scenario_helper):
505 super(Rfc2544ResourceHelper, self).__init__()
506 self.scenario_helper = scenario_helper
507 self._correlated_traffic = None
508 self.iteration = Value('i', 0)
511 self._tolerance_low = None
512 self._tolerance_high = None
516 if self._rfc2544 is None:
517 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
521 def tolerance_low(self):
522 if self._tolerance_low is None:
523 self.get_rfc_tolerance()
524 return self._tolerance_low
527 def tolerance_high(self):
528 if self._tolerance_high is None:
529 self.get_rfc_tolerance()
530 return self._tolerance_high
533 def correlated_traffic(self):
534 if self._correlated_traffic is None:
535 self._correlated_traffic = \
536 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
538 return self._correlated_traffic
542 if self._latency is None:
543 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
546 def get_rfc2544(self, name, default=None):
547 return self.rfc2544.get(name, default)
549 def get_rfc_tolerance(self):
550 tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
551 tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
552 self._tolerance_low = next(tolerance_iter)
553 self._tolerance_high = next(tolerance_iter, self.tolerance_low)
556 class SampleVNFDeployHelper(object):
558 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
559 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
560 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
562 def __init__(self, vnfd_helper, ssh_helper):
563 super(SampleVNFDeployHelper, self).__init__()
564 self.ssh_helper = ssh_helper
565 self.vnfd_helper = vnfd_helper
567 def deploy_vnfs(self, app_name):
568 vnf_bin = self.ssh_helper.join_bin_path(app_name)
569 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
573 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
574 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
576 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
577 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
579 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
581 http_proxy = os.environ.get('http_proxy', '')
582 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
584 self.ssh_helper.execute(cmd)
585 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
586 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
587 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
590 class ScenarioHelper(object):
595 'worker_config': '1C/1T',
599 def __init__(self, name):
601 self.scenario_cfg = None
605 return self.scenario_cfg['task_path']
609 return self.scenario_cfg.get('nodes')
612 def all_options(self):
613 return self.scenario_cfg.get('options', {})
617 return self.all_options.get(self.name, {})
621 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
625 return self.scenario_cfg['topology']
629 test_duration = self.scenario_cfg.get('runner', {}).get('duration',
630 self.options.get('timeout', DEFAULT_VNF_TIMEOUT))
631 test_timeout = self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
632 return test_duration if test_duration > test_timeout else test_timeout
634 class SampleVNF(GenericVNF):
635 """ Class providing file-like API for generic VNF implementation """
637 VNF_PROMPT = "pipeline>"
639 WAIT_TIME_FOR_SCRIPT = 10
640 APP_NAME = "SampleVNF"
641 # we run the VNF interactively, so the ssh command will timeout after this long
643 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
644 super(SampleVNF, self).__init__(name, vnfd)
645 self.bin_path = get_nsb_option('bin_path', '')
647 self.scenario_helper = ScenarioHelper(self.name)
648 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
650 if setup_env_helper_type is None:
651 setup_env_helper_type = SetupEnvHelper
653 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
655 self.scenario_helper)
657 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
659 if resource_helper_type is None:
660 resource_helper_type = ResourceHelper
662 self.resource_helper = resource_helper_type(self.setup_helper)
664 self.context_cfg = None
665 self.nfvi_context = None
666 self.pipeline_kwargs = {}
667 self.uplink_ports = None
668 self.downlink_ports = None
669 # NOTE(esm): make QueueFileWrapper invert-able so that we
670 # never have to manage the queues
673 self.queue_wrapper = None
675 self.used_drivers = {}
676 self.vnf_port_pairs = None
677 self._vnf_process = None
679 def _build_ports(self):
680 self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
681 self.networks = self._port_pairs.networks
682 self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
683 self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
684 self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
686 def _get_route_data(self, route_index, route_type):
687 route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
688 for _ in range(route_index):
690 return next(route_iter, {}).get(route_type, '')
692 def _get_port0localip6(self):
693 return_value = self._get_route_data(0, 'network')
694 LOG.info("_get_port0localip6 : %s", return_value)
697 def _get_port1localip6(self):
698 return_value = self._get_route_data(1, 'network')
699 LOG.info("_get_port1localip6 : %s", return_value)
702 def _get_port0prefixlen6(self):
703 return_value = self._get_route_data(0, 'netmask')
704 LOG.info("_get_port0prefixlen6 : %s", return_value)
707 def _get_port1prefixlen6(self):
708 return_value = self._get_route_data(1, 'netmask')
709 LOG.info("_get_port1prefixlen6 : %s", return_value)
712 def _get_port0gateway6(self):
713 return_value = self._get_route_data(0, 'network')
714 LOG.info("_get_port0gateway6 : %s", return_value)
717 def _get_port1gateway6(self):
718 return_value = self._get_route_data(1, 'network')
719 LOG.info("_get_port1gateway6 : %s", return_value)
722 def _start_vnf(self):
723 self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
724 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
725 self._vnf_process = Process(name=name, target=self._run)
726 self._vnf_process.start()
728 def _vnf_up_post(self):
731 def instantiate(self, scenario_cfg, context_cfg):
732 self.scenario_helper.scenario_cfg = scenario_cfg
733 self.context_cfg = context_cfg
734 self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
735 # self.nfvi_context = None
737 # vnf deploy is unsupported, use ansible playbooks
738 if self.scenario_helper.options.get("vnf_deploy", False):
739 self.deploy_helper.deploy_vnfs(self.APP_NAME)
740 self.resource_helper.setup()
743 def wait_for_instantiate(self):
745 time.sleep(self.WAIT_TIME) # Give some time for config to load
747 if not self._vnf_process.is_alive():
748 raise RuntimeError("%s VNF process died." % self.APP_NAME)
750 # NOTE(esm): move to QueueFileWrapper
751 while self.q_out.qsize() > 0:
752 buf.append(self.q_out.get())
753 message = ''.join(buf)
754 if self.VNF_PROMPT in message:
755 LOG.info("%s VNF is up and running.", self.APP_NAME)
757 self.queue_wrapper.clear()
758 self.resource_helper.start_collect()
759 return self._vnf_process.exitcode
761 if "PANIC" in message:
762 raise RuntimeError("Error starting %s VNF." %
765 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
766 time.sleep(self.WAIT_TIME_FOR_SCRIPT)
767 # Send ENTER to display a new prompt in case the prompt text was corrupted
768 # by other VNF output
769 self.q_in.put('\r\n')
771 def _build_run_kwargs(self):
773 'stdin': self.queue_wrapper,
774 'stdout': self.queue_wrapper,
775 'keep_stdin_open': True,
777 'timeout': self.scenario_helper.timeout,
780 def _build_config(self):
781 return self.setup_helper.build_config()
784 # we can't share ssh paramiko objects to force new connection
785 self.ssh_helper.drop_connection()
786 cmd = self._build_config()
787 # kill before starting
788 self.setup_helper.kill_vnf()
791 self._build_run_kwargs()
792 self.ssh_helper.run(cmd, **self.run_kwargs)
794 def vnf_execute(self, cmd, wait_time=2):
795 """ send cmd to vnf process """
797 LOG.info("%s command: %s", self.APP_NAME, cmd)
798 self.q_in.put("{}\r\n".format(cmd))
799 time.sleep(wait_time)
801 while self.q_out.qsize() > 0:
802 output.append(self.q_out.get())
803 return "".join(output)
805 def _tear_down(self):
809 self.vnf_execute("quit")
810 self.setup_helper.kill_vnf()
812 self.resource_helper.stop_collect()
813 if self._vnf_process is not None:
814 # be proper and join first before we kill
815 LOG.debug("joining before terminate %s", self._vnf_process.name)
816 self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
817 self._vnf_process.terminate()
818 # no terminate children here because we share processes with tg
820 def get_stats(self, *args, **kwargs): # pylint: disable=unused-argument
821 """Method for checking the statistics
823 This method could be overridden in children classes.
825 :return: VNF statistics
827 cmd = 'p {0} stats'.format(self.APP_WORD)
828 out = self.vnf_execute(cmd)
831 def collect_kpi(self):
832 # we can't get KPIs if the VNF is down
833 check_if_process_failed(self._vnf_process)
834 stats = self.get_stats()
835 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
837 result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
838 result["collect_stats"] = self.resource_helper.collect_kpi()
843 "packets_dropped": 0,
845 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
848 def scale(self, flavor=""):
849 """The SampleVNF base class doesn't provide the 'scale' feature"""
850 raise y_exceptions.FunctionNotImplemented(
851 function_name='scale', class_name='SampleVNFTrafficGen')
854 class SampleVNFTrafficGen(GenericTrafficGen):
855 """ Class providing file-like API for generic traffic generator """
860 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
861 super(SampleVNFTrafficGen, self).__init__(name, vnfd)
862 self.bin_path = get_nsb_option('bin_path', '')
864 self.scenario_helper = ScenarioHelper(self.name)
865 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
867 if setup_env_helper_type is None:
868 setup_env_helper_type = SetupEnvHelper
870 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
872 self.scenario_helper)
874 if resource_helper_type is None:
875 resource_helper_type = ClientResourceHelper
877 self.resource_helper = resource_helper_type(self.setup_helper)
879 self.runs_traffic = True
880 self.traffic_finished = False
881 self._tg_process = None
882 self._traffic_process = None
884 def _start_server(self):
885 # we can't share ssh paramiko objects to force new connection
886 self.ssh_helper.drop_connection()
888 def instantiate(self, scenario_cfg, context_cfg):
889 self.scenario_helper.scenario_cfg = scenario_cfg
890 self.resource_helper.setup()
891 # must generate_cfg after DPDK bind because we need port number
892 self.resource_helper.generate_cfg()
894 LOG.info("Starting %s server...", self.APP_NAME)
895 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
896 self._tg_process = Process(name=name, target=self._start_server)
897 self._tg_process.start()
899 def _check_status(self):
900 raise NotImplementedError
902 def _wait_for_process(self):
904 if not self._tg_process.is_alive():
905 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
906 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
908 status = self._check_status()
910 LOG.info("%s TG Server is up and running.", self.APP_NAME)
911 return self._tg_process.exitcode
913 def _traffic_runner(self, traffic_profile):
914 # always drop connections first thing in new processes
915 # so we don't get paramiko errors
916 self.ssh_helper.drop_connection()
917 LOG.info("Starting %s client...", self.APP_NAME)
918 self.resource_helper.run_traffic(traffic_profile)
920 def run_traffic(self, traffic_profile):
921 """ Generate traffic on the wire according to the given params.
922 Method is non-blocking, returns immediately when traffic process
923 is running. Mandatory.
925 :param traffic_profile:
928 name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
930 self._traffic_process = Process(name=name, target=self._traffic_runner,
931 args=(traffic_profile,))
932 self._traffic_process.start()
933 # Wait for traffic process to start
934 while self.resource_helper.client_started.value == 0:
935 time.sleep(self.RUN_WAIT)
936 # what if traffic process takes a few seconds to start?
937 if not self._traffic_process.is_alive():
940 return self._traffic_process.is_alive()
942 def collect_kpi(self):
943 # check if the tg processes have exited
944 for proc in (self._tg_process, self._traffic_process):
945 check_if_process_failed(proc)
946 result = self.resource_helper.collect_kpi()
947 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
951 """ After this method finishes, all traffic processes should stop. Mandatory.
955 self.traffic_finished = True
956 # we must kill client before we kill the server, or the client will raise exception
957 if self._traffic_process is not None:
958 # be proper and try to join before terminating
959 LOG.debug("joining before terminate %s", self._traffic_process.name)
960 self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
961 self._traffic_process.terminate()
962 if self._tg_process is not None:
963 # be proper and try to join before terminating
964 LOG.debug("joining before terminate %s", self._tg_process.name)
965 self._tg_process.join(PROCESS_JOIN_TIMEOUT)
966 self._tg_process.terminate()
967 # no terminate children here because we share processes with vnf
969 def scale(self, flavor=""):
970 """A traffic generator VFN doesn't provide the 'scale' feature"""
971 raise y_exceptions.FunctionNotImplemented(
972 function_name='scale', class_name='SampleVNFTrafficGen')