1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
16 from collections import Mapping
18 from multiprocessing import Queue, Value, Process
22 from six.moves import cStringIO
26 from trex_stl_lib.trex_stl_client import LoggerApi
27 from trex_stl_lib.trex_stl_client import STLClient
28 from trex_stl_lib.trex_stl_exceptions import STLError
29 from yardstick.benchmark.contexts.base import Context
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common import exceptions as y_exceptions
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
34 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
39 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.ssh import AutoConnectSSH
44 DPDK_VERSION = "dpdk-16.07"
46 LOG = logging.getLogger(__name__)
50 DEFAULT_VNF_TIMEOUT = 3600
51 PROCESS_JOIN_TIMEOUT = 3
54 class VnfSshHelper(AutoConnectSSH):
56 def __init__(self, node, bin_path, wait=None):
58 kwargs = self.args_from_node(self.node)
60 kwargs.setdefault('wait', wait)
62 super(VnfSshHelper, self).__init__(**kwargs)
63 self.bin_path = bin_path
67 # must return static class name, anything else refers to the calling class
68 # i.e. the subclass, not the superclass
72 # this copy constructor is different from SSH classes, since it uses node
73 return self.get_class()(self.node, self.bin_path)
75 def upload_config_file(self, prefix, content):
76 cfg_file = os.path.join(REMOTE_TMP, prefix)
78 file_obj = cStringIO(content)
79 self.put_file_obj(file_obj, cfg_file)
82 def join_bin_path(self, *args):
83 return os.path.join(self.bin_path, *args)
85 def provision_tool(self, tool_path=None, tool_file=None):
87 tool_path = self.bin_path
88 return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
91 class SetupEnvHelper(object):
93 CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
94 CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
95 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
99 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
100 super(SetupEnvHelper, self).__init__()
101 self.vnfd_helper = vnfd_helper
102 self.ssh_helper = ssh_helper
103 self.scenario_helper = scenario_helper
105 def build_config(self):
106 raise NotImplementedError
108 def setup_vnf_environment(self):
115 raise NotImplementedError
118 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
121 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
124 def _update_packet_type(ip_pipeline_cfg, traffic_options):
125 match_str = 'pkt_type = ipv4'
126 replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
127 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
128 return pipeline_config_str
131 def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
132 traffic_type = traffic_options['traffic_type']
134 if traffic_options['vnf_type'] is not cls.APP_NAME:
135 match_str = 'traffic_type = 4'
136 replace_str = 'traffic_type = {0}'.format(traffic_type)
138 elif traffic_type == 4:
139 match_str = 'pkt_type = ipv4'
140 replace_str = 'pkt_type = ipv4'
143 match_str = 'pkt_type = ipv4'
144 replace_str = 'pkt_type = ipv6'
146 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
147 return pipeline_config_str
149 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
150 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
151 self.all_ports = None
152 self.bound_pci = None
154 self.used_drivers = None
155 self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
157 def _setup_hugepages(self):
158 cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
159 hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
162 '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
163 self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
165 if hugepages == "2048kB":
170 self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
172 def build_config(self):
173 vnf_cfg = self.scenario_helper.vnf_cfg
174 task_path = self.scenario_helper.task_path
176 lb_count = vnf_cfg.get('lb_count', 3)
177 lb_config = vnf_cfg.get('lb_config', 'SW')
178 worker_config = vnf_cfg.get('worker_config', '1C/1T')
179 worker_threads = vnf_cfg.get('worker_threads', 3)
181 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
183 'traffic_type': traffic_type,
184 'pkt_type': 'ipv%s' % traffic_type,
185 'vnf_type': self.VNF_TYPE,
188 config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
189 config_basename = posixpath.basename(self.CFG_CONFIG)
190 script_basename = posixpath.basename(self.CFG_SCRIPT)
191 multiport = MultiPortConfig(self.scenario_helper.topology,
202 multiport.generate_config()
203 with open(self.CFG_CONFIG) as handle:
204 new_config = handle.read()
206 new_config = self._update_traffic_type(new_config, traffic_options)
207 new_config = self._update_packet_type(new_config, traffic_options)
209 self.ssh_helper.upload_config_file(config_basename, new_config)
210 self.ssh_helper.upload_config_file(script_basename,
211 multiport.generate_script(self.vnfd_helper))
213 LOG.info("Provision and start the %s", self.APP_NAME)
214 self._build_pipeline_kwargs()
215 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
217 def _build_pipeline_kwargs(self):
218 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
219 # count the number of actual ports in the list of pairs
220 # remove duplicate ports
221 # this is really a mapping from LINK ID to DPDK PMD ID
222 # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
223 # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
224 ports = self.vnfd_helper.port_pairs.all_ports
225 port_nums = self.vnfd_helper.port_nums(ports)
226 # create mask from all the dpdk port numbers
227 ports_mask_hex = hex(sum(2 ** num for num in port_nums))
228 self.pipeline_kwargs = {
229 'cfg_file': self.CFG_CONFIG,
230 'script': self.CFG_SCRIPT,
231 'port_mask_hex': ports_mask_hex,
232 'tool_path': tool_path,
235 def setup_vnf_environment(self):
237 self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
239 # bind before _setup_resources so we can use dpdk_port_num
240 self._detect_and_bind_drivers()
241 resource = self._setup_resources()
245 # pkill is not matching, debug with pgrep
246 self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
247 self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
248 # have to use exact match
249 # try using killall to match
250 self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
252 def _setup_dpdk(self):
253 """Setup DPDK environment needed for VNF to run"""
254 self._setup_hugepages()
255 self.ssh_helper.execute('sudo modprobe uio && sudo modprobe igb_uio')
256 exit_status = self.ssh_helper.execute('lsmod | grep -i igb_uio')[0]
258 raise y_exceptions.DPDKSetupDriverError()
260 def get_collectd_options(self):
261 options = self.scenario_helper.all_options.get("collectd", {})
262 # override with specific node settings
263 options.update(self.scenario_helper.options.get("collectd", {}))
266 def _setup_resources(self):
267 # what is this magic? how do we know which socket is for which port?
268 # what about quad-socket?
269 if any(v[5] == "0" for v in self.bound_pci):
274 # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
275 # this won't work because we don't have DPDK port numbers yet
276 ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
277 port_names = (intf["name"] for intf in ports)
278 collectd_options = self.get_collectd_options()
279 plugins = collectd_options.get("plugins", {})
280 # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
281 return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
282 plugins=plugins, interval=collectd_options.get("interval"),
283 timeout=self.scenario_helper.timeout)
285 def _detect_and_bind_drivers(self):
286 interfaces = self.vnfd_helper.interfaces
288 self.dpdk_bind_helper.read_status()
289 self.dpdk_bind_helper.save_used_drivers()
291 self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
293 sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
294 for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
296 intf = next(v for v in interfaces
297 if vpci == v['virtual-interface']['vpci'])
299 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
300 except: # pylint: disable=bare-except
304 def get_local_iface_name_by_vpci(self, vpci):
305 find_net_cmd = self.FIND_NET_CMD.format(vpci)
306 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
312 self.dpdk_bind_helper.rebind_drivers()
315 class ResourceHelper(object):
318 MAKE_INSTALL = 'cd {0} && make && sudo make install'
319 RESOURCE_WORD = 'sample'
323 def __init__(self, setup_helper):
324 super(ResourceHelper, self).__init__()
326 self.setup_helper = setup_helper
327 self.ssh_helper = setup_helper.ssh_helper
330 self.resource = self.setup_helper.setup_vnf_environment()
332 def generate_cfg(self):
335 def _collect_resource_kpi(self):
337 status = self.resource.check_if_system_agent_running("collectd")[0]
339 result = self.resource.amqp_collect_nfvi_kpi()
341 result = {"core": result}
344 def start_collect(self):
345 self.resource.initiate_systemagent(self.ssh_helper.bin_path)
346 self.resource.start()
347 self.resource.amqp_process_for_nfvi_kpi()
349 def stop_collect(self):
353 def collect_kpi(self):
354 return self._collect_resource_kpi()
357 class ClientResourceHelper(ResourceHelper):
364 def __init__(self, setup_helper):
365 super(ClientResourceHelper, self).__init__(setup_helper)
366 self.vnfd_helper = setup_helper.vnfd_helper
367 self.scenario_helper = setup_helper.scenario_helper
370 self.client_started = Value('i', 0)
371 self.all_ports = None
372 self._queue = Queue()
374 self._terminated = Value('i', 0)
376 def _build_ports(self):
377 self.networks = self.vnfd_helper.port_pairs.networks
378 self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
379 self.downlink_ports = \
380 self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
381 self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
383 def port_num(self, intf):
384 # by default return port num
385 return self.vnfd_helper.port_num(intf)
387 def get_stats(self, *args, **kwargs):
389 return self.client.get_stats(*args, **kwargs)
391 LOG.exception("TRex client not connected")
394 def generate_samples(self, ports, key=None, default=None):
395 # needs to be used ports
396 last_result = self.get_stats(ports)
397 key_value = last_result.get(key, default)
399 if not isinstance(last_result, Mapping): # added for mock unit test
400 self._terminated.value = 1
404 # recalculate port for interface and see if it matches ports provided
405 for intf in self.vnfd_helper.interfaces:
407 port = self.vnfd_helper.port_num(name)
409 xe_value = last_result.get(port, {})
411 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
412 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
413 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
414 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
415 "in_packets": int(xe_value.get("ipackets", 0)),
416 "out_packets": int(xe_value.get("opackets", 0)),
419 samples[name][key] = key_value
422 def _run_traffic_once(self, traffic_profile):
423 traffic_profile.execute_traffic(self)
424 self.client_started.value = 1
425 time.sleep(self.RUN_DURATION)
426 samples = self.generate_samples(traffic_profile.ports)
427 time.sleep(self.QUEUE_WAIT_TIME)
428 self._queue.put(samples)
430 def run_traffic(self, traffic_profile):
431 # if we don't do this we can hang waiting for the queue to drain
432 # have to do this in the subprocess
433 self._queue.cancel_join_thread()
434 # fixme: fix passing correct trex config file,
435 # instead of searching the default path
438 self.client = self._connect()
439 self.client.reset(ports=self.all_ports)
440 self.client.remove_all_streams(self.all_ports) # remove all streams
441 traffic_profile.register_generator(self)
443 while self._terminated.value == 0:
444 self._run_traffic_once(traffic_profile)
446 self.client.stop(self.all_ports)
447 self.client.disconnect()
448 self._terminated.value = 0
450 if self._terminated.value:
451 LOG.debug("traffic generator is stopped")
452 return # return if trex/tg server is stopped.
456 self._terminated.value = 1 # stop client
458 def clear_stats(self, ports=None):
460 ports = self.all_ports
461 self.client.clear_stats(ports=ports)
463 def start(self, ports=None, *args, **kwargs):
464 # pylint: disable=keyword-arg-before-vararg
465 # NOTE(ralonsoh): defining keyworded arguments before variable
466 # positional arguments is a bug. This function definition doesn't work
467 # in Python 2, although it works in Python 3. Reference:
468 # https://www.python.org/dev/peps/pep-3102/
470 ports = self.all_ports
471 self.client.start(ports=ports, *args, **kwargs)
473 def collect_kpi(self):
474 if not self._queue.empty():
475 kpi = self._queue.get()
476 self._result.update(kpi)
477 LOG.debug('Got KPIs from _queue for %s %s',
478 self.scenario_helper.name, self.RESOURCE_WORD)
481 def _connect(self, client=None):
483 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
484 server=self.vnfd_helper.mgmt_interface["ip"],
485 verbose_level=LoggerApi.VERBOSE_QUIET)
487 # try to connect with 5s intervals, 30s max
493 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
498 class Rfc2544ResourceHelper(object):
500 DEFAULT_CORRELATED_TRAFFIC = False
501 DEFAULT_LATENCY = False
502 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
504 def __init__(self, scenario_helper):
505 super(Rfc2544ResourceHelper, self).__init__()
506 self.scenario_helper = scenario_helper
507 self._correlated_traffic = None
508 self.iteration = Value('i', 0)
511 self._tolerance_low = None
512 self._tolerance_high = None
516 if self._rfc2544 is None:
517 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
521 def tolerance_low(self):
522 if self._tolerance_low is None:
523 self.get_rfc_tolerance()
524 return self._tolerance_low
527 def tolerance_high(self):
528 if self._tolerance_high is None:
529 self.get_rfc_tolerance()
530 return self._tolerance_high
533 def correlated_traffic(self):
534 if self._correlated_traffic is None:
535 self._correlated_traffic = \
536 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
538 return self._correlated_traffic
542 if self._latency is None:
543 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
546 def get_rfc2544(self, name, default=None):
547 return self.rfc2544.get(name, default)
549 def get_rfc_tolerance(self):
550 tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
551 tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
552 self._tolerance_low = next(tolerance_iter)
553 self._tolerance_high = next(tolerance_iter, self.tolerance_low)
556 class SampleVNFDeployHelper(object):
558 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
559 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
560 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
562 def __init__(self, vnfd_helper, ssh_helper):
563 super(SampleVNFDeployHelper, self).__init__()
564 self.ssh_helper = ssh_helper
565 self.vnfd_helper = vnfd_helper
567 def deploy_vnfs(self, app_name):
568 vnf_bin = self.ssh_helper.join_bin_path(app_name)
569 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
573 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
574 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
576 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
577 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
579 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
581 http_proxy = os.environ.get('http_proxy', '')
582 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
584 self.ssh_helper.execute(cmd)
585 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
586 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
587 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
590 class ScenarioHelper(object):
595 'worker_config': '1C/1T',
599 def __init__(self, name):
601 self.scenario_cfg = None
605 return self.scenario_cfg['task_path']
609 return self.scenario_cfg.get('nodes')
612 def all_options(self):
613 return self.scenario_cfg.get('options', {})
617 return self.all_options.get(self.name, {})
621 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
625 return self.scenario_cfg['topology']
629 return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
632 class SampleVNF(GenericVNF):
633 """ Class providing file-like API for generic VNF implementation """
635 VNF_PROMPT = "pipeline>"
637 WAIT_TIME_FOR_SCRIPT = 10
638 APP_NAME = "SampleVNF"
639 # we run the VNF interactively, so the ssh command will timeout after this long
641 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
642 super(SampleVNF, self).__init__(name, vnfd)
643 self.bin_path = get_nsb_option('bin_path', '')
645 self.scenario_helper = ScenarioHelper(self.name)
646 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
648 if setup_env_helper_type is None:
649 setup_env_helper_type = SetupEnvHelper
651 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
653 self.scenario_helper)
655 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
657 if resource_helper_type is None:
658 resource_helper_type = ResourceHelper
660 self.resource_helper = resource_helper_type(self.setup_helper)
662 self.context_cfg = None
663 self.nfvi_context = None
664 self.pipeline_kwargs = {}
665 self.uplink_ports = None
666 self.downlink_ports = None
667 # NOTE(esm): make QueueFileWrapper invert-able so that we
668 # never have to manage the queues
671 self.queue_wrapper = None
673 self.used_drivers = {}
674 self.vnf_port_pairs = None
675 self._vnf_process = None
677 def _build_ports(self):
678 self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
679 self.networks = self._port_pairs.networks
680 self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
681 self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
682 self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
684 def _get_route_data(self, route_index, route_type):
685 route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
686 for _ in range(route_index):
688 return next(route_iter, {}).get(route_type, '')
690 def _get_port0localip6(self):
691 return_value = self._get_route_data(0, 'network')
692 LOG.info("_get_port0localip6 : %s", return_value)
695 def _get_port1localip6(self):
696 return_value = self._get_route_data(1, 'network')
697 LOG.info("_get_port1localip6 : %s", return_value)
700 def _get_port0prefixlen6(self):
701 return_value = self._get_route_data(0, 'netmask')
702 LOG.info("_get_port0prefixlen6 : %s", return_value)
705 def _get_port1prefixlen6(self):
706 return_value = self._get_route_data(1, 'netmask')
707 LOG.info("_get_port1prefixlen6 : %s", return_value)
710 def _get_port0gateway6(self):
711 return_value = self._get_route_data(0, 'network')
712 LOG.info("_get_port0gateway6 : %s", return_value)
715 def _get_port1gateway6(self):
716 return_value = self._get_route_data(1, 'network')
717 LOG.info("_get_port1gateway6 : %s", return_value)
720 def _start_vnf(self):
721 self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
722 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
723 self._vnf_process = Process(name=name, target=self._run)
724 self._vnf_process.start()
726 def _vnf_up_post(self):
729 def instantiate(self, scenario_cfg, context_cfg):
730 self.scenario_helper.scenario_cfg = scenario_cfg
731 self.context_cfg = context_cfg
732 self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
733 # self.nfvi_context = None
735 # vnf deploy is unsupported, use ansible playbooks
736 if self.scenario_helper.options.get("vnf_deploy", False):
737 self.deploy_helper.deploy_vnfs(self.APP_NAME)
738 self.resource_helper.setup()
741 def wait_for_instantiate(self):
743 time.sleep(self.WAIT_TIME) # Give some time for config to load
745 if not self._vnf_process.is_alive():
746 raise RuntimeError("%s VNF process died." % self.APP_NAME)
748 # NOTE(esm): move to QueueFileWrapper
749 while self.q_out.qsize() > 0:
750 buf.append(self.q_out.get())
751 message = ''.join(buf)
752 if self.VNF_PROMPT in message:
753 LOG.info("%s VNF is up and running.", self.APP_NAME)
755 self.queue_wrapper.clear()
756 self.resource_helper.start_collect()
757 return self._vnf_process.exitcode
759 if "PANIC" in message:
760 raise RuntimeError("Error starting %s VNF." %
763 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
764 time.sleep(self.WAIT_TIME_FOR_SCRIPT)
765 # Send ENTER to display a new prompt in case the prompt text was corrupted
766 # by other VNF output
767 self.q_in.put('\r\n')
769 def _build_run_kwargs(self):
771 'stdin': self.queue_wrapper,
772 'stdout': self.queue_wrapper,
773 'keep_stdin_open': True,
775 'timeout': self.scenario_helper.timeout,
778 def _build_config(self):
779 return self.setup_helper.build_config()
782 # we can't share ssh paramiko objects to force new connection
783 self.ssh_helper.drop_connection()
784 cmd = self._build_config()
785 # kill before starting
786 self.setup_helper.kill_vnf()
789 self._build_run_kwargs()
790 self.ssh_helper.run(cmd, **self.run_kwargs)
792 def vnf_execute(self, cmd, wait_time=2):
793 """ send cmd to vnf process """
795 LOG.info("%s command: %s", self.APP_NAME, cmd)
796 self.q_in.put("{}\r\n".format(cmd))
797 time.sleep(wait_time)
799 while self.q_out.qsize() > 0:
800 output.append(self.q_out.get())
801 return "".join(output)
803 def _tear_down(self):
807 self.vnf_execute("quit")
808 self.setup_helper.kill_vnf()
810 self.resource_helper.stop_collect()
811 if self._vnf_process is not None:
812 # be proper and join first before we kill
813 LOG.debug("joining before terminate %s", self._vnf_process.name)
814 self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
815 self._vnf_process.terminate()
816 # no terminate children here because we share processes with tg
818 def get_stats(self, *args, **kwargs): # pylint: disable=unused-argument
819 """Method for checking the statistics
821 This method could be overridden in children classes.
823 :return: VNF statistics
825 cmd = 'p {0} stats'.format(self.APP_WORD)
826 out = self.vnf_execute(cmd)
829 def collect_kpi(self):
830 # we can't get KPIs if the VNF is down
831 check_if_process_failed(self._vnf_process)
832 stats = self.get_stats()
833 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
835 result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
836 result["collect_stats"] = self.resource_helper.collect_kpi()
841 "packets_dropped": 0,
843 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
846 def scale(self, flavor=""):
847 """The SampleVNF base class doesn't provide the 'scale' feature"""
848 raise y_exceptions.FunctionNotImplemented(
849 function_name='scale', class_name='SampleVNFTrafficGen')
852 class SampleVNFTrafficGen(GenericTrafficGen):
853 """ Class providing file-like API for generic traffic generator """
858 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
859 super(SampleVNFTrafficGen, self).__init__(name, vnfd)
860 self.bin_path = get_nsb_option('bin_path', '')
862 self.scenario_helper = ScenarioHelper(self.name)
863 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
865 if setup_env_helper_type is None:
866 setup_env_helper_type = SetupEnvHelper
868 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
870 self.scenario_helper)
872 if resource_helper_type is None:
873 resource_helper_type = ClientResourceHelper
875 self.resource_helper = resource_helper_type(self.setup_helper)
877 self.runs_traffic = True
878 self.traffic_finished = False
879 self._tg_process = None
880 self._traffic_process = None
882 def _start_server(self):
883 # we can't share ssh paramiko objects to force new connection
884 self.ssh_helper.drop_connection()
886 def instantiate(self, scenario_cfg, context_cfg):
887 self.scenario_helper.scenario_cfg = scenario_cfg
888 self.resource_helper.setup()
889 # must generate_cfg after DPDK bind because we need port number
890 self.resource_helper.generate_cfg()
892 LOG.info("Starting %s server...", self.APP_NAME)
893 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
894 self._tg_process = Process(name=name, target=self._start_server)
895 self._tg_process.start()
897 def _check_status(self):
898 raise NotImplementedError
900 def _wait_for_process(self):
902 if not self._tg_process.is_alive():
903 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
904 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
906 status = self._check_status()
908 LOG.info("%s TG Server is up and running.", self.APP_NAME)
909 return self._tg_process.exitcode
911 def _traffic_runner(self, traffic_profile):
912 # always drop connections first thing in new processes
913 # so we don't get paramiko errors
914 self.ssh_helper.drop_connection()
915 LOG.info("Starting %s client...", self.APP_NAME)
916 self.resource_helper.run_traffic(traffic_profile)
918 def run_traffic(self, traffic_profile):
919 """ Generate traffic on the wire according to the given params.
920 Method is non-blocking, returns immediately when traffic process
921 is running. Mandatory.
923 :param traffic_profile:
926 name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
928 self._traffic_process = Process(name=name, target=self._traffic_runner,
929 args=(traffic_profile,))
930 self._traffic_process.start()
931 # Wait for traffic process to start
932 while self.resource_helper.client_started.value == 0:
933 time.sleep(self.RUN_WAIT)
934 # what if traffic process takes a few seconds to start?
935 if not self._traffic_process.is_alive():
938 return self._traffic_process.is_alive()
940 def collect_kpi(self):
941 # check if the tg processes have exited
942 for proc in (self._tg_process, self._traffic_process):
943 check_if_process_failed(proc)
944 result = self.resource_helper.collect_kpi()
945 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
949 """ After this method finishes, all traffic processes should stop. Mandatory.
953 self.traffic_finished = True
954 # we must kill client before we kill the server, or the client will raise exception
955 if self._traffic_process is not None:
956 # be proper and try to join before terminating
957 LOG.debug("joining before terminate %s", self._traffic_process.name)
958 self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
959 self._traffic_process.terminate()
960 if self._tg_process is not None:
961 # be proper and try to join before terminating
962 LOG.debug("joining before terminate %s", self._tg_process.name)
963 self._tg_process.join(PROCESS_JOIN_TIMEOUT)
964 self._tg_process.terminate()
965 # no terminate children here because we share processes with vnf
967 def scale(self, flavor=""):
968 """A traffic generator VFN doesn't provide the 'scale' feature"""
969 raise y_exceptions.FunctionNotImplemented(
970 function_name='scale', class_name='SampleVNFTrafficGen')