1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
16 from collections import Mapping
18 from multiprocessing import Queue, Value, Process
26 from six.moves import cStringIO
28 from trex_stl_lib.trex_stl_client import LoggerApi
29 from trex_stl_lib.trex_stl_client import STLClient
30 from trex_stl_lib.trex_stl_exceptions import STLError
31 from yardstick.benchmark.contexts.base import Context
32 from yardstick.common import exceptions as y_exceptions
33 from yardstick.common.process import check_if_process_failed
34 from yardstick.common import utils
35 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
36 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
37 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
38 from yardstick.network_services.nfvi.resource import ResourceProfile
39 from yardstick.network_services.utils import get_nsb_option
40 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
41 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
42 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
43 from yardstick.ssh import AutoConnectSSH
46 DPDK_VERSION = "dpdk-16.07"
48 LOG = logging.getLogger(__name__)
52 DEFAULT_VNF_TIMEOUT = 3600
53 PROCESS_JOIN_TIMEOUT = 3
56 class VnfSshHelper(AutoConnectSSH):
58 def __init__(self, node, bin_path, wait=None):
60 kwargs = self.args_from_node(self.node)
62 kwargs.setdefault('wait', wait)
64 super(VnfSshHelper, self).__init__(**kwargs)
65 self.bin_path = bin_path
69 # must return static class name, anything else refers to the calling class
70 # i.e. the subclass, not the superclass
74 # this copy constructor is different from SSH classes, since it uses node
75 return self.get_class()(self.node, self.bin_path)
77 def upload_config_file(self, prefix, content):
78 cfg_file = os.path.join(REMOTE_TMP, prefix)
80 file_obj = cStringIO(content)
81 self.put_file_obj(file_obj, cfg_file)
84 def join_bin_path(self, *args):
85 return os.path.join(self.bin_path, *args)
87 def provision_tool(self, tool_path=None, tool_file=None):
89 tool_path = self.bin_path
90 return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
93 class SetupEnvHelper(object):
95 CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
96 CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
97 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
101 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
102 super(SetupEnvHelper, self).__init__()
103 self.vnfd_helper = vnfd_helper
104 self.ssh_helper = ssh_helper
105 self.scenario_helper = scenario_helper
107 def build_config(self):
108 raise NotImplementedError
110 def setup_vnf_environment(self):
117 raise NotImplementedError
120 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
123 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
124 NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
125 HUGEPAGES_KB = 1024 * 1024 * 16
128 def _update_packet_type(ip_pipeline_cfg, traffic_options):
129 match_str = 'pkt_type = ipv4'
130 replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
131 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
132 return pipeline_config_str
135 def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
136 traffic_type = traffic_options['traffic_type']
138 if traffic_options['vnf_type'] is not cls.APP_NAME:
139 match_str = 'traffic_type = 4'
140 replace_str = 'traffic_type = {0}'.format(traffic_type)
142 elif traffic_type == 4:
143 match_str = 'pkt_type = ipv4'
144 replace_str = 'pkt_type = ipv4'
147 match_str = 'pkt_type = ipv4'
148 replace_str = 'pkt_type = ipv6'
150 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
151 return pipeline_config_str
153 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
154 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
155 self.all_ports = None
156 self.bound_pci = None
158 self.used_drivers = None
159 self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
161 def _setup_hugepages(self):
162 meminfo = utils.read_meminfo(self.ssh_helper)
163 hp_size_kb = int(meminfo['Hugepagesize'])
164 nr_hugepages = int(abs(self.HUGEPAGES_KB / hp_size_kb))
165 self.ssh_helper.execute('echo %s | sudo tee %s' %
166 (nr_hugepages, self.NR_HUGEPAGES_PATH))
168 self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
169 nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
170 LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
171 hp_size_kb, nr_hugepages, nr_hugepages_set)
173 def build_config(self):
174 vnf_cfg = self.scenario_helper.vnf_cfg
175 task_path = self.scenario_helper.task_path
177 config_file = vnf_cfg.get('file')
178 lb_count = vnf_cfg.get('lb_count', 3)
179 lb_config = vnf_cfg.get('lb_config', 'SW')
180 worker_config = vnf_cfg.get('worker_config', '1C/1T')
181 worker_threads = vnf_cfg.get('worker_threads', 3)
183 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
185 'traffic_type': traffic_type,
186 'pkt_type': 'ipv%s' % traffic_type,
187 'vnf_type': self.VNF_TYPE,
190 config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
192 config_basename = posixpath.basename(self.CFG_CONFIG)
193 script_basename = posixpath.basename(self.CFG_SCRIPT)
194 multiport = MultiPortConfig(self.scenario_helper.topology,
205 multiport.generate_config()
207 with utils.open_relative_file(config_file, task_path) as infile:
208 new_config = ['[EAL]']
210 for port in self.vnfd_helper.port_pairs.all_ports:
211 interface = self.vnfd_helper.find_interface(name=port)
212 vpci.append(interface['virtual-interface']["vpci"])
213 new_config.extend('w = {0}'.format(item) for item in vpci)
214 new_config = '\n'.join(new_config) + '\n' + infile.read()
216 with open(self.CFG_CONFIG) as handle:
217 new_config = handle.read()
218 new_config = self._update_traffic_type(new_config, traffic_options)
219 new_config = self._update_packet_type(new_config, traffic_options)
220 self.ssh_helper.upload_config_file(config_basename, new_config)
221 self.ssh_helper.upload_config_file(script_basename,
222 multiport.generate_script(self.vnfd_helper))
224 LOG.info("Provision and start the %s", self.APP_NAME)
225 self._build_pipeline_kwargs()
226 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
228 def _build_pipeline_kwargs(self):
229 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
230 # count the number of actual ports in the list of pairs
231 # remove duplicate ports
232 # this is really a mapping from LINK ID to DPDK PMD ID
233 # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
234 # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
235 ports = self.vnfd_helper.port_pairs.all_ports
236 port_nums = self.vnfd_helper.port_nums(ports)
237 # create mask from all the dpdk port numbers
238 ports_mask_hex = hex(sum(2 ** num for num in port_nums))
239 self.pipeline_kwargs = {
240 'cfg_file': self.CFG_CONFIG,
241 'script': self.CFG_SCRIPT,
242 'port_mask_hex': ports_mask_hex,
243 'tool_path': tool_path,
246 def setup_vnf_environment(self):
248 self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces]
250 # bind before _setup_resources so we can use dpdk_port_num
251 self._detect_and_bind_drivers()
252 resource = self._setup_resources()
256 # pkill is not matching, debug with pgrep
257 self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
258 self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
259 # have to use exact match
260 # try using killall to match
261 self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
263 def _setup_dpdk(self):
264 """Setup DPDK environment needed for VNF to run"""
265 self._setup_hugepages()
266 self.ssh_helper.execute('sudo modprobe uio && sudo modprobe igb_uio')
267 exit_status = self.ssh_helper.execute('lsmod | grep -i igb_uio')[0]
269 raise y_exceptions.DPDKSetupDriverError()
271 def get_collectd_options(self):
272 options = self.scenario_helper.all_options.get("collectd", {})
273 # override with specific node settings
274 options.update(self.scenario_helper.options.get("collectd", {}))
277 def _setup_resources(self):
278 # what is this magic? how do we know which socket is for which port?
279 # what about quad-socket?
280 if any(v[5] == "0" for v in self.bound_pci):
285 # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
286 # this won't work because we don't have DPDK port numbers yet
287 ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
288 port_names = (intf["name"] for intf in ports)
289 collectd_options = self.get_collectd_options()
290 plugins = collectd_options.get("plugins", {})
291 # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
292 return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
293 plugins=plugins, interval=collectd_options.get("interval"),
294 timeout=self.scenario_helper.timeout)
296 def _detect_and_bind_drivers(self):
297 interfaces = self.vnfd_helper.interfaces
299 self.dpdk_bind_helper.read_status()
300 self.dpdk_bind_helper.save_used_drivers()
302 self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
304 sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
305 for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
307 intf = next(v for v in interfaces
308 if vpci == v['virtual-interface']['vpci'])
310 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
311 except: # pylint: disable=bare-except
315 def get_local_iface_name_by_vpci(self, vpci):
316 find_net_cmd = self.FIND_NET_CMD.format(vpci)
317 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
323 self.dpdk_bind_helper.rebind_drivers()
326 class ResourceHelper(object):
329 MAKE_INSTALL = 'cd {0} && make && sudo make install'
330 RESOURCE_WORD = 'sample'
334 def __init__(self, setup_helper):
335 super(ResourceHelper, self).__init__()
337 self.setup_helper = setup_helper
338 self.ssh_helper = setup_helper.ssh_helper
341 self.resource = self.setup_helper.setup_vnf_environment()
343 def generate_cfg(self):
346 def _collect_resource_kpi(self):
348 status = self.resource.check_if_system_agent_running("collectd")[0]
350 result = self.resource.amqp_collect_nfvi_kpi()
352 result = {"core": result}
355 def start_collect(self):
356 self.resource.initiate_systemagent(self.ssh_helper.bin_path)
357 self.resource.start()
358 self.resource.amqp_process_for_nfvi_kpi()
360 def stop_collect(self):
364 def collect_kpi(self):
365 return self._collect_resource_kpi()
368 class ClientResourceHelper(ResourceHelper):
375 def __init__(self, setup_helper):
376 super(ClientResourceHelper, self).__init__(setup_helper)
377 self.vnfd_helper = setup_helper.vnfd_helper
378 self.scenario_helper = setup_helper.scenario_helper
381 self.client_started = Value('i', 0)
382 self.all_ports = None
383 self._queue = Queue()
385 self._terminated = Value('i', 0)
387 def _build_ports(self):
388 self.networks = self.vnfd_helper.port_pairs.networks
389 self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
390 self.downlink_ports = \
391 self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
392 self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
394 def port_num(self, intf):
395 # by default return port num
396 return self.vnfd_helper.port_num(intf)
398 def get_stats(self, *args, **kwargs):
400 return self.client.get_stats(*args, **kwargs)
402 LOG.exception("TRex client not connected")
405 def generate_samples(self, ports, key=None, default=None):
406 # needs to be used ports
407 last_result = self.get_stats(ports)
408 key_value = last_result.get(key, default)
410 if not isinstance(last_result, Mapping): # added for mock unit test
411 self._terminated.value = 1
415 # recalculate port for interface and see if it matches ports provided
416 for intf in self.vnfd_helper.interfaces:
418 port = self.vnfd_helper.port_num(name)
420 xe_value = last_result.get(port, {})
422 "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
423 "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
424 "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
425 "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
426 "in_packets": int(xe_value.get("ipackets", 0)),
427 "out_packets": int(xe_value.get("opackets", 0)),
430 samples[name][key] = key_value
433 def _run_traffic_once(self, traffic_profile):
434 traffic_profile.execute_traffic(self)
435 self.client_started.value = 1
436 time.sleep(self.RUN_DURATION)
437 samples = self.generate_samples(traffic_profile.ports)
438 time.sleep(self.QUEUE_WAIT_TIME)
439 self._queue.put(samples)
441 def run_traffic(self, traffic_profile):
442 # if we don't do this we can hang waiting for the queue to drain
443 # have to do this in the subprocess
444 self._queue.cancel_join_thread()
445 # fixme: fix passing correct trex config file,
446 # instead of searching the default path
449 self.client = self._connect()
450 self.client.reset(ports=self.all_ports)
451 self.client.remove_all_streams(self.all_ports) # remove all streams
452 traffic_profile.register_generator(self)
454 while self._terminated.value == 0:
455 self._run_traffic_once(traffic_profile)
457 self.client.stop(self.all_ports)
458 self.client.disconnect()
459 self._terminated.value = 0
461 if self._terminated.value:
462 LOG.debug("traffic generator is stopped")
463 return # return if trex/tg server is stopped.
467 self._terminated.value = 1 # stop client
469 def clear_stats(self, ports=None):
471 ports = self.all_ports
472 self.client.clear_stats(ports=ports)
474 def start(self, ports=None, *args, **kwargs):
475 # pylint: disable=keyword-arg-before-vararg
476 # NOTE(ralonsoh): defining keyworded arguments before variable
477 # positional arguments is a bug. This function definition doesn't work
478 # in Python 2, although it works in Python 3. Reference:
479 # https://www.python.org/dev/peps/pep-3102/
481 ports = self.all_ports
482 self.client.start(ports=ports, *args, **kwargs)
484 def collect_kpi(self):
485 if not self._queue.empty():
486 kpi = self._queue.get()
487 self._result.update(kpi)
488 LOG.debug('Got KPIs from _queue for %s %s',
489 self.scenario_helper.name, self.RESOURCE_WORD)
492 def _connect(self, client=None):
494 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
495 server=self.vnfd_helper.mgmt_interface["ip"],
496 verbose_level=LoggerApi.VERBOSE_QUIET)
498 # try to connect with 5s intervals, 30s max
504 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
509 class Rfc2544ResourceHelper(object):
511 DEFAULT_CORRELATED_TRAFFIC = False
512 DEFAULT_LATENCY = False
513 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
515 def __init__(self, scenario_helper):
516 super(Rfc2544ResourceHelper, self).__init__()
517 self.scenario_helper = scenario_helper
518 self._correlated_traffic = None
519 self.iteration = Value('i', 0)
522 self._tolerance_low = None
523 self._tolerance_high = None
527 if self._rfc2544 is None:
528 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
532 def tolerance_low(self):
533 if self._tolerance_low is None:
534 self.get_rfc_tolerance()
535 return self._tolerance_low
538 def tolerance_high(self):
539 if self._tolerance_high is None:
540 self.get_rfc_tolerance()
541 return self._tolerance_high
544 def correlated_traffic(self):
545 if self._correlated_traffic is None:
546 self._correlated_traffic = \
547 self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
549 return self._correlated_traffic
553 if self._latency is None:
554 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
557 def get_rfc2544(self, name, default=None):
558 return self.rfc2544.get(name, default)
560 def get_rfc_tolerance(self):
561 tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
562 tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
563 self._tolerance_low = next(tolerance_iter)
564 self._tolerance_high = next(tolerance_iter, self.tolerance_low)
567 class SampleVNFDeployHelper(object):
569 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
570 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
571 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
573 def __init__(self, vnfd_helper, ssh_helper):
574 super(SampleVNFDeployHelper, self).__init__()
575 self.ssh_helper = ssh_helper
576 self.vnfd_helper = vnfd_helper
578 def deploy_vnfs(self, app_name):
579 vnf_bin = self.ssh_helper.join_bin_path(app_name)
580 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
584 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
585 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
587 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
588 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
590 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
592 http_proxy = os.environ.get('http_proxy', '')
593 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
595 self.ssh_helper.execute(cmd)
596 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
597 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
598 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
601 class ScenarioHelper(object):
606 'worker_config': '1C/1T',
610 def __init__(self, name):
612 self.scenario_cfg = None
616 return self.scenario_cfg['task_path']
620 return self.scenario_cfg.get('nodes')
623 def all_options(self):
624 return self.scenario_cfg.get('options', {})
628 return self.all_options.get(self.name, {})
632 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
636 return self.scenario_cfg['topology']
640 return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
643 class SampleVNF(GenericVNF):
644 """ Class providing file-like API for generic VNF implementation """
646 VNF_PROMPT = "pipeline>"
648 WAIT_TIME_FOR_SCRIPT = 10
649 APP_NAME = "SampleVNF"
650 # we run the VNF interactively, so the ssh command will timeout after this long
652 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
653 super(SampleVNF, self).__init__(name, vnfd)
654 self.bin_path = get_nsb_option('bin_path', '')
656 self.scenario_helper = ScenarioHelper(self.name)
657 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
659 if setup_env_helper_type is None:
660 setup_env_helper_type = SetupEnvHelper
662 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
664 self.scenario_helper)
666 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
668 if resource_helper_type is None:
669 resource_helper_type = ResourceHelper
671 self.resource_helper = resource_helper_type(self.setup_helper)
673 self.context_cfg = None
674 self.nfvi_context = None
675 self.pipeline_kwargs = {}
676 self.uplink_ports = None
677 self.downlink_ports = None
678 # NOTE(esm): make QueueFileWrapper invert-able so that we
679 # never have to manage the queues
682 self.queue_wrapper = None
684 self.used_drivers = {}
685 self.vnf_port_pairs = None
686 self._vnf_process = None
688 def _build_ports(self):
689 self._port_pairs = PortPairs(self.vnfd_helper.interfaces)
690 self.networks = self._port_pairs.networks
691 self.uplink_ports = self.vnfd_helper.port_nums(self._port_pairs.uplink_ports)
692 self.downlink_ports = self.vnfd_helper.port_nums(self._port_pairs.downlink_ports)
693 self.my_ports = self.vnfd_helper.port_nums(self._port_pairs.all_ports)
695 def _get_route_data(self, route_index, route_type):
696 route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
697 for _ in range(route_index):
699 return next(route_iter, {}).get(route_type, '')
701 def _get_port0localip6(self):
702 return_value = self._get_route_data(0, 'network')
703 LOG.info("_get_port0localip6 : %s", return_value)
706 def _get_port1localip6(self):
707 return_value = self._get_route_data(1, 'network')
708 LOG.info("_get_port1localip6 : %s", return_value)
711 def _get_port0prefixlen6(self):
712 return_value = self._get_route_data(0, 'netmask')
713 LOG.info("_get_port0prefixlen6 : %s", return_value)
716 def _get_port1prefixlen6(self):
717 return_value = self._get_route_data(1, 'netmask')
718 LOG.info("_get_port1prefixlen6 : %s", return_value)
721 def _get_port0gateway6(self):
722 return_value = self._get_route_data(0, 'network')
723 LOG.info("_get_port0gateway6 : %s", return_value)
726 def _get_port1gateway6(self):
727 return_value = self._get_route_data(1, 'network')
728 LOG.info("_get_port1gateway6 : %s", return_value)
731 def _start_vnf(self):
732 self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
733 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
734 self._vnf_process = Process(name=name, target=self._run)
735 self._vnf_process.start()
737 def _vnf_up_post(self):
740 def instantiate(self, scenario_cfg, context_cfg):
741 self.scenario_helper.scenario_cfg = scenario_cfg
742 self.context_cfg = context_cfg
743 self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
744 # self.nfvi_context = None
746 # vnf deploy is unsupported, use ansible playbooks
747 if self.scenario_helper.options.get("vnf_deploy", False):
748 self.deploy_helper.deploy_vnfs(self.APP_NAME)
749 self.resource_helper.setup()
752 def wait_for_instantiate(self):
754 time.sleep(self.WAIT_TIME) # Give some time for config to load
756 if not self._vnf_process.is_alive():
757 raise RuntimeError("%s VNF process died." % self.APP_NAME)
759 # NOTE(esm): move to QueueFileWrapper
760 while self.q_out.qsize() > 0:
761 buf.append(self.q_out.get())
762 message = ''.join(buf)
763 if self.VNF_PROMPT in message:
764 LOG.info("%s VNF is up and running.", self.APP_NAME)
766 self.queue_wrapper.clear()
767 self.resource_helper.start_collect()
768 return self._vnf_process.exitcode
770 if "PANIC" in message:
771 raise RuntimeError("Error starting %s VNF." %
774 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
775 time.sleep(self.WAIT_TIME_FOR_SCRIPT)
776 # Send ENTER to display a new prompt in case the prompt text was corrupted
777 # by other VNF output
778 self.q_in.put('\r\n')
780 def _build_run_kwargs(self):
782 'stdin': self.queue_wrapper,
783 'stdout': self.queue_wrapper,
784 'keep_stdin_open': True,
786 'timeout': self.scenario_helper.timeout,
789 def _build_config(self):
790 return self.setup_helper.build_config()
793 # we can't share ssh paramiko objects to force new connection
794 self.ssh_helper.drop_connection()
795 cmd = self._build_config()
796 # kill before starting
797 self.setup_helper.kill_vnf()
800 self._build_run_kwargs()
801 self.ssh_helper.run(cmd, **self.run_kwargs)
803 def vnf_execute(self, cmd, wait_time=2):
804 """ send cmd to vnf process """
806 LOG.info("%s command: %s", self.APP_NAME, cmd)
807 self.q_in.put("{}\r\n".format(cmd))
808 time.sleep(wait_time)
810 while self.q_out.qsize() > 0:
811 output.append(self.q_out.get())
812 return "".join(output)
814 def _tear_down(self):
818 self.vnf_execute("quit")
819 self.setup_helper.kill_vnf()
821 self.resource_helper.stop_collect()
822 if self._vnf_process is not None:
823 # be proper and join first before we kill
824 LOG.debug("joining before terminate %s", self._vnf_process.name)
825 self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
826 self._vnf_process.terminate()
827 # no terminate children here because we share processes with tg
829 def get_stats(self, *args, **kwargs): # pylint: disable=unused-argument
830 """Method for checking the statistics
832 This method could be overridden in children classes.
834 :return: VNF statistics
836 cmd = 'p {0} stats'.format(self.APP_WORD)
837 out = self.vnf_execute(cmd)
840 def collect_kpi(self):
841 # we can't get KPIs if the VNF is down
842 check_if_process_failed(self._vnf_process)
843 stats = self.get_stats()
844 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
846 result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
847 result["collect_stats"] = self.resource_helper.collect_kpi()
852 "packets_dropped": 0,
854 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
857 def scale(self, flavor=""):
858 """The SampleVNF base class doesn't provide the 'scale' feature"""
859 raise y_exceptions.FunctionNotImplemented(
860 function_name='scale', class_name='SampleVNFTrafficGen')
863 class SampleVNFTrafficGen(GenericTrafficGen):
864 """ Class providing file-like API for generic traffic generator """
869 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
870 super(SampleVNFTrafficGen, self).__init__(name, vnfd)
871 self.bin_path = get_nsb_option('bin_path', '')
873 self.scenario_helper = ScenarioHelper(self.name)
874 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
876 if setup_env_helper_type is None:
877 setup_env_helper_type = SetupEnvHelper
879 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
881 self.scenario_helper)
883 if resource_helper_type is None:
884 resource_helper_type = ClientResourceHelper
886 self.resource_helper = resource_helper_type(self.setup_helper)
888 self.runs_traffic = True
889 self.traffic_finished = False
890 self._tg_process = None
891 self._traffic_process = None
893 def _start_server(self):
894 # we can't share ssh paramiko objects to force new connection
895 self.ssh_helper.drop_connection()
897 def instantiate(self, scenario_cfg, context_cfg):
898 self.scenario_helper.scenario_cfg = scenario_cfg
899 self.resource_helper.setup()
900 # must generate_cfg after DPDK bind because we need port number
901 self.resource_helper.generate_cfg()
903 LOG.info("Starting %s server...", self.APP_NAME)
904 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
905 self._tg_process = Process(name=name, target=self._start_server)
906 self._tg_process.start()
908 def _check_status(self):
909 raise NotImplementedError
911 def _wait_for_process(self):
913 if not self._tg_process.is_alive():
914 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
915 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
917 status = self._check_status()
919 LOG.info("%s TG Server is up and running.", self.APP_NAME)
920 return self._tg_process.exitcode
922 def _traffic_runner(self, traffic_profile):
923 # always drop connections first thing in new processes
924 # so we don't get paramiko errors
925 self.ssh_helper.drop_connection()
926 LOG.info("Starting %s client...", self.APP_NAME)
927 self.resource_helper.run_traffic(traffic_profile)
929 def run_traffic(self, traffic_profile):
930 """ Generate traffic on the wire according to the given params.
931 Method is non-blocking, returns immediately when traffic process
932 is running. Mandatory.
934 :param traffic_profile:
937 name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
939 self._traffic_process = Process(name=name, target=self._traffic_runner,
940 args=(traffic_profile,))
941 self._traffic_process.start()
942 # Wait for traffic process to start
943 while self.resource_helper.client_started.value == 0:
944 time.sleep(self.RUN_WAIT)
945 # what if traffic process takes a few seconds to start?
946 if not self._traffic_process.is_alive():
949 return self._traffic_process.is_alive()
951 def collect_kpi(self):
952 # check if the tg processes have exited
953 for proc in (self._tg_process, self._traffic_process):
954 check_if_process_failed(proc)
955 result = self.resource_helper.collect_kpi()
956 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
960 """ After this method finishes, all traffic processes should stop. Mandatory.
964 self.traffic_finished = True
965 # we must kill client before we kill the server, or the client will raise exception
966 if self._traffic_process is not None:
967 # be proper and try to join before terminating
968 LOG.debug("joining before terminate %s", self._traffic_process.name)
969 self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
970 self._traffic_process.terminate()
971 if self._tg_process is not None:
972 # be proper and try to join before terminating
973 LOG.debug("joining before terminate %s", self._tg_process.name)
974 self._tg_process.join(PROCESS_JOIN_TIMEOUT)
975 self._tg_process.terminate()
976 # no terminate children here because we share processes with vnf
978 def scale(self, flavor=""):
979 """A traffic generator VFN doesn't provide the 'scale' feature"""
980 raise y_exceptions.FunctionNotImplemented(
981 function_name='scale', class_name='SampleVNFTrafficGen')