1 # Copyright (c) 2016-2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from multiprocessing import Queue, Value, Process
27 from trex_stl_lib.trex_stl_client import LoggerApi
28 from trex_stl_lib.trex_stl_client import STLClient
29 from trex_stl_lib.trex_stl_exceptions import STLError
30 from yardstick.benchmark.contexts.base import Context
31 from yardstick.common import exceptions as y_exceptions
32 from yardstick.common.process import check_if_process_failed
33 from yardstick.common import utils
34 from yardstick.common import yaml_loader
35 from yardstick.network_services import constants
36 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
37 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
38 from yardstick.network_services.nfvi.resource import ResourceProfile
39 from yardstick.network_services.utils import get_nsb_option
40 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
41 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
42 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
43 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
44 from yardstick.benchmark.contexts.node import NodeContext
46 LOG = logging.getLogger(__name__)
49 class SetupEnvHelper(object):
51 CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
52 CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
53 DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
    """Remember the collaborators used by the environment setup steps.

    :param vnfd_helper: stored as ``self.vnfd_helper``
    :param ssh_helper: stored as ``self.ssh_helper``
    :param scenario_helper: stored as ``self.scenario_helper``
    """
    super(SetupEnvHelper, self).__init__()
    # collectd settings start out empty; this constructor never fills them
    self.collectd_options = dict()
    self.scenario_helper = scenario_helper
    self.ssh_helper = ssh_helper
    self.vnfd_helper = vnfd_helper
def build_config(self):
    """Build the VNF configuration.

    The base helper has no implementation of its own.

    :raises NotImplementedError: unconditionally
    """
    raise NotImplementedError()
# Environment preparation hook of the base helper.
# NOTE(review): the embedded source numbering jumps from 67 to 74 here, so the
# statements originally between this signature and the raise are not visible.
# Confirm that the raise really belongs to this method before relying on it.
67 def setup_vnf_environment(self):
74 raise NotImplementedError
77 class DpdkVnfSetupEnvHelper(SetupEnvHelper):
80 FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
81 NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
def _update_packet_type(ip_pipeline_cfg, traffic_options):
    """Swap the default packet type of a pipeline configuration.

    Every occurrence of ``pkt_type = ipv4`` is replaced by ``pkt_type = <x>``
    where ``<x>`` is ``traffic_options['pkt_type']``.

    :param ip_pipeline_cfg: (str) pipeline configuration text
    :param traffic_options: (dict) must provide the ``pkt_type`` key
    :return: (str) the rewritten configuration text
    """
    wanted = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
    return ip_pipeline_cfg.replace('pkt_type = ipv4', wanted)
def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
    """Rewrite the traffic/packet type entry of a pipeline configuration.

    * When ``traffic_options['vnf_type']`` differs from ``cls.APP_NAME`` the
      ``traffic_type = 4`` entry is replaced with the requested traffic type.
    * Otherwise the ``pkt_type = ipv4`` entry is kept for traffic type 4 and
      switched to ``pkt_type = ipv6`` for any other traffic type.

    :param ip_pipeline_cfg: (str) pipeline configuration text
    :param traffic_options: (dict) must provide ``traffic_type`` and ``vnf_type``
    :return: (str) the rewritten configuration text
    """
    traffic_type = traffic_options['traffic_type']

    # Compare by value: ``is not`` tests object identity, which is not a
    # reliable equality check for strings.
    if traffic_options['vnf_type'] != cls.APP_NAME:
        match_str = 'traffic_type = 4'
        replace_str = 'traffic_type = {0}'.format(traffic_type)
    elif traffic_type == 4:
        # nothing to change, the configuration already describes IPv4
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = ipv4'
    else:
        # explicit branch so the IPv4 case above is not overwritten
        match_str = 'pkt_type = ipv4'
        replace_str = 'pkt_type = ipv6'

    return ip_pipeline_cfg.replace(match_str, replace_str)
# Initialise the DPDK setup helper: the base-class constructor receives the
# three helpers unchanged, the port/driver bookkeeping attributes start as
# None and a DpdkBindHelper is created around the given ssh_helper.
# NOTE(review): the embedded numbering skips 113, so one original line of this
# constructor is not visible here.
109 def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
110 super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
111 self.all_ports = None
112 self.bound_pci = None
114 self.used_drivers = None
115 self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
def _setup_hugepages(self):
    """Request hugepages on the target and log how many were actually set.

    The amount of memory to reserve comes from the ``hugepages_gb`` scenario
    option (default 16). It is converted into a page count using the
    ``Hugepagesize`` value reported by ``utils.read_meminfo`` and written to
    ``NR_HUGEPAGES_PATH`` with ``sudo tee``. The same file is then read back
    so the requested and the effective counts can be logged side by side.
    """
    import io  # local import: only needed for the in-memory read-back buffer

    meminfo = utils.read_meminfo(self.ssh_helper)
    hp_size_kb = int(meminfo['Hugepagesize'])
    hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
    nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
    self.ssh_helper.execute('echo %s | sudo tee %s' %
                            (nr_hugepages, self.NR_HUGEPAGES_PATH))
    # get_file_obj() needs a writable binary file object; the value is decoded
    # afterwards, so an in-memory bytes buffer is sufficient.
    with io.BytesIO() as hp:
        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
    LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
             hp_size_kb, nr_hugepages, nr_hugepages_set)
# Build the VNF configuration and its script, upload both through
# ssh_helper.upload_config_file() and return PIPELINE_COMMAND formatted with
# self.pipeline_kwargs.
# NOTE(review): the embedded numbering has gaps throughout this method (e.g.
# 141, 145-146, 155, 159-168, 173, 179). Several statements -- among them the
# opening of the traffic_options dict, the initialisation of vpci and the
# remaining MultiPortConfig arguments -- are not visible here. The notes
# below describe the visible lines only.
130 def build_config(self):
131 vnf_cfg = self.scenario_helper.vnf_cfg
132 task_path = self.scenario_helper.task_path
# values taken from vnf_cfg; the second argument is the fallback used when the
# key is absent
134 config_file = vnf_cfg.get('file')
135 lb_count = vnf_cfg.get('lb_count', 3)
136 lb_config = vnf_cfg.get('lb_config', 'SW')
137 worker_config = vnf_cfg.get('worker_config', '1C/1T')
138 worker_threads = vnf_cfg.get('worker_threads', 3)
140 traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
# entries of the options later handed to _update_traffic_type() and
# _update_packet_type() as traffic_options
142 'traffic_type': traffic_type,
143 'pkt_type': 'ipv%s' % traffic_type,
144 'vnf_type': self.VNF_TYPE,
147 # read actions/rules from file
149 acl_file_name = self.scenario_helper.options.get('rules')
151 with utils.open_relative_file(acl_file_name, task_path) as infile:
152 acl_options = yaml_loader.yaml_load(infile)
154 config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
156 config_basename = posixpath.basename(self.CFG_CONFIG)
157 script_basename = posixpath.basename(self.CFG_SCRIPT)
158 multiport = MultiPortConfig(self.scenario_helper.topology,
169 multiport.generate_config()
# user supplied config file: prefix it with an [EAL] section holding one
# "w = <vpci>" line per port
171 with utils.open_relative_file(config_file, task_path) as infile:
172 new_config = ['[EAL]']
174 for port in self.vnfd_helper.port_pairs.all_ports:
175 interface = self.vnfd_helper.find_interface(name=port)
176 vpci.append(interface['virtual-interface']["vpci"])
177 new_config.extend('w = {0}'.format(item) for item in vpci)
178 new_config = '\n'.join(new_config) + '\n' + infile.read()
# NOTE(review): presumably the alternative branch, re-reading the file at
# CFG_CONFIG and patching its traffic/packet type -- the branching statement
# itself is not visible, confirm against the full source
180 with open(self.CFG_CONFIG) as handle:
181 new_config = handle.read()
182 new_config = self._update_traffic_type(new_config, traffic_options)
183 new_config = self._update_packet_type(new_config, traffic_options)
184 self.ssh_helper.upload_config_file(config_basename, new_config)
185 self.ssh_helper.upload_config_file(script_basename,
186 multiport.generate_script(self.vnfd_helper,
187 self.get_flows_config(acl_options)))
189 LOG.info("Provision and start the %s", self.APP_NAME)
# fills self.pipeline_kwargs, which the returned command line is formatted with
190 self._build_pipeline_kwargs()
191 return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
193 def get_flows_config(self, options=None): # pylint: disable=unused-argument
194 """No actions/rules (flows) by default"""
197 def _build_pipeline_kwargs(self):
198 tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
199 # count the number of actual ports in the list of pairs
200 # remove duplicate ports
201 # this is really a mapping from LINK ID to DPDK PMD ID
202 # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
203 # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
204 ports = self.vnfd_helper.port_pairs.all_ports
205 port_nums = self.vnfd_helper.port_nums(ports)
206 # create mask from all the dpdk port numbers
207 ports_mask_hex = hex(sum(2 ** num for num in port_nums))
209 vnf_cfg = self.scenario_helper.vnf_cfg
210 lb_config = vnf_cfg.get('lb_config', 'SW')
211 worker_threads = vnf_cfg.get('worker_threads', 3)
213 if lb_config == 'HW':
214 hwlb = ' --hwlb %s' % worker_threads
216 self.pipeline_kwargs = {
217 'cfg_file': self.CFG_CONFIG,
218 'script': self.CFG_SCRIPT,
219 'port_mask_hex': ports_mask_hex,
220 'tool_path': tool_path,
224 def setup_vnf_environment(self):
227 # bind before _setup_resources so we can use dpdk_port_num
228 self._detect_and_bind_drivers()
229 resource = self._setup_resources()
233 # pkill is not matching, debug with pgrep
234 self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME)
235 self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
236 # have to use exact match
237 # try using killall to match
238 self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
240 def _setup_dpdk(self):
241 """Setup DPDK environment needed for VNF to run"""
242 self._setup_hugepages()
243 self.dpdk_bind_helper.load_dpdk_driver()
245 exit_status = self.dpdk_bind_helper.check_dpdk_driver()
# Create the ResourceProfile returned to the caller: it is built from the
# management interface, the interface names ordered by vnfd_helper.port_num,
# the collectd plugins/interval options and the scenario timeout.
# NOTE(review): embedded lines 253-256 (the body of the `if` below) are not
# visible here.
249 def _setup_resources(self):
250 # what is this magic? how do we know which socket is for which port?
251 # what about quad-socket?
# checks whether any bound PCI address has the character "0" at index 5
252 if any(v[5] == "0" for v in self.bound_pci):
257 # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
258 # this won't work because we don't have DPDK port numbers yet
259 ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
# generator expression: the names are produced lazily and can be consumed once
260 port_names = (intf["name"] for intf in ports)
261 plugins = self.collectd_options.get("plugins", {})
# no fallback given, so interval is None when the option is absent
262 interval = self.collectd_options.get("interval")
263 # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
264 return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
265 plugins=plugins, interval=interval,
266 timeout=self.scenario_helper.timeout)
268 def _check_interface_fields(self):
269 num_nodes = len(self.scenario_helper.nodes)
270 # OpenStack instance creation time is probably proportional to the number
272 timeout = 120 * num_nodes
273 dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
274 self.ssh_helper, timeout)
# Bind the VNFD interfaces to the 'igb_uio' driver and record, on each
# interface, its index in the sorted list of DPDK-bound PCI addresses.
# NOTE(review): the embedded numbering skips 291, 294 and everything after
# 296, so the `try:` opener and the body of the bare `except` are not visible.
277 def _detect_and_bind_drivers(self):
278 interfaces = self.vnfd_helper.interfaces
280 self._check_interface_fields()
281 # check for bound after probe
282 self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
# save the current drivers before rebinding
284 self.dpdk_bind_helper.read_status()
285 self.dpdk_bind_helper.save_used_drivers()
287 self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')
# the position in the sorted address list is stored as dpdk_port_num
289 sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
290 for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
# next() raises StopIteration when no interface carries this vpci
292 intf = next(v for v in interfaces
293 if vpci == v['virtual-interface']['vpci'])
295 intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
296 except: # pylint: disable=bare-except
300 def get_local_iface_name_by_vpci(self, vpci):
301 find_net_cmd = self.FIND_NET_CMD.format(vpci)
302 exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
308 self.dpdk_bind_helper.rebind_drivers()
311 class ResourceHelper(object):
314 MAKE_INSTALL = 'cd {0} && make && sudo make install'
315 RESOURCE_WORD = 'sample'
319 def __init__(self, setup_helper):
320 super(ResourceHelper, self).__init__()
322 self.setup_helper = setup_helper
323 self.ssh_helper = setup_helper.ssh_helper
327 self.resource = self.setup_helper.setup_vnf_environment()
329 def generate_cfg(self):
332 def update_from_context(self, context, attr_name):
333 """Disable resource helper in case of baremetal context.
335 And update appropriate node collectd options in context
337 if isinstance(context, NodeContext):
339 context.update_collectd_options_for_node(self.setup_helper.collectd_options,
342 def _collect_resource_kpi(self):
344 status = self.resource.check_if_system_agent_running("collectd")[0]
345 if status == 0 and self._enable:
346 result = self.resource.amqp_collect_nfvi_kpi()
348 result = {"core": result}
351 def start_collect(self):
353 self.resource.initiate_systemagent(self.ssh_helper.bin_path)
354 self.resource.start()
355 self.resource.amqp_process_for_nfvi_kpi()
357 def stop_collect(self):
358 if self.resource and self._enable:
def collect_kpi(self):
    """Return the value produced by ``_collect_resource_kpi()``."""
    kpi = self._collect_resource_kpi()
    return kpi
365 class ClientResourceHelper(ResourceHelper):
372 def __init__(self, setup_helper):
373 super(ClientResourceHelper, self).__init__(setup_helper)
374 self.vnfd_helper = setup_helper.vnfd_helper
375 self.scenario_helper = setup_helper.scenario_helper
378 self.client_started = Value('i', 0)
379 self.all_ports = None
380 self._queue = Queue()
382 self._terminated = Value('i', 0)
def _build_ports(self):
    """Cache the network map and the port numbers of the VNFD port pairs.

    Sets ``networks``, ``uplink_ports``, ``downlink_ports`` and ``all_ports``
    on the instance; the three port attributes hold whatever
    ``vnfd_helper.port_nums`` returns for the corresponding port names.
    """
    helper = self.vnfd_helper
    to_nums = helper.port_nums
    self.networks = helper.port_pairs.networks
    self.uplink_ports = to_nums(helper.port_pairs.uplink_ports)
    self.downlink_ports = to_nums(helper.port_pairs.downlink_ports)
    self.all_ports = to_nums(helper.port_pairs.all_ports)
def port_num(self, intf):
    """Return the port number ``vnfd_helper.port_num`` reports for *intf*."""
    # default behaviour: plain delegation to the VNFD helper
    resolve = self.vnfd_helper.port_num
    return resolve(intf)
395 def get_stats(self, *args, **kwargs):
397 return self.client.get_stats(*args, **kwargs)
399 LOG.error('TRex client not connected')
def _get_samples(self, ports, port_pg_id=False):
    """Collect samples for *ports*.

    No implementation is provided here.

    :raises NotImplementedError: unconditionally
    """
    raise NotImplementedError
def _run_traffic_once(self, traffic_profile):
    """Run one traffic iteration and queue the samples it produced.

    The profile is executed against this helper, ``client_started`` is set
    to 1, and after ``RUN_DURATION`` seconds the samples for the profile's
    ports are fetched. They are put on ``_queue`` once a further
    ``QUEUE_WAIT_TIME`` seconds have passed. Nothing is returned.

    :param traffic_profile: object providing ``execute_traffic`` and ``ports``
    """
    traffic_profile.execute_traffic(self)
    # flag observed by whoever waits for the traffic client to come up
    self.client_started.value = 1
    time.sleep(self.RUN_DURATION)
    collected = self._get_samples(traffic_profile.ports)
    time.sleep(self.QUEUE_WAIT_TIME)
    self._queue.put(collected)
413 def run_traffic(self, traffic_profile, mq_producer):
414 # if we don't do this we can hang waiting for the queue to drain
415 # have to do this in the subprocess
416 self._queue.cancel_join_thread()
417 # fixme: fix passing correct trex config file,
418 # instead of searching the default path
419 mq_producer.tg_method_started()
422 self.client = self._connect()
423 self.client.reset(ports=self.all_ports)
424 self.client.remove_all_streams(self.all_ports) # remove all streams
425 traffic_profile.register_generator(self)
428 while self._terminated.value == 0:
430 if self._run_traffic_once(traffic_profile):
431 self._terminated.value = 1
432 mq_producer.tg_method_iteration(iteration_index)
434 self.client.stop(self.all_ports)
435 self.client.disconnect()
436 self._terminated.value = 0
438 if self._terminated.value:
439 LOG.debug("traffic generator is stopped")
440 return # return if trex/tg server is stopped.
443 mq_producer.tg_method_finished()
446 self._terminated.value = 1 # stop client
def clear_stats(self, ports=None):
    """Clear the client statistics of the given ports.

    :param ports: ports to clear; ``None`` (the default) selects
                  ``self.all_ports``
    """
    # only fall back to every port when the caller did not choose any, so an
    # explicit ``ports`` argument is honoured
    if ports is None:
        ports = self.all_ports
    self.client.clear_stats(ports=ports)
def start(self, ports=None, *args, **kwargs):
    """Start traffic on the given ports through the connected client.

    :param ports: ports to start; ``None`` (the default) selects
                  ``self.all_ports``
    :param args: extra positional arguments forwarded to ``client.start``
    :param kwargs: extra keyword arguments forwarded to ``client.start``
    """
    # pylint: disable=keyword-arg-before-vararg
    # NOTE(ralonsoh): defining keyworded arguments before variable
    # positional arguments is a bug. This function definition doesn't work
    # in Python 2, although it works in Python 3. Reference:
    # https://www.python.org/dev/peps/pep-3102/
    # only fall back to every port when the caller did not choose any, so an
    # explicit ``ports`` argument is honoured
    if ports is None:
        ports = self.all_ports
    self.client.start(ports=ports, *args, **kwargs)
463 def collect_kpi(self):
464 if not self._queue.empty():
465 kpi = self._queue.get()
466 self._result.update(kpi)
467 LOG.debug('Got KPIs from _queue for %s %s',
468 self.scenario_helper.name, self.RESOURCE_WORD)
471 def _connect(self, client=None):
473 client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
474 server=self.vnfd_helper.mgmt_interface["ip"],
475 verbose_level=LoggerApi.VERBOSE_QUIET)
477 # try to connect with 5s intervals, 30s max
483 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
488 class Rfc2544ResourceHelper(object):
490 DEFAULT_CORRELATED_TRAFFIC = False
491 DEFAULT_LATENCY = False
492 DEFAULT_TOLERANCE = '0.0001 - 0.0001'
494 def __init__(self, scenario_helper):
495 super(Rfc2544ResourceHelper, self).__init__()
496 self.scenario_helper = scenario_helper
497 self._correlated_traffic = None
498 self.iteration = Value('i', 0)
501 self._tolerance_low = None
502 self._tolerance_high = None
503 self._tolerance_precision = None
507 if self._rfc2544 is None:
508 self._rfc2544 = self.scenario_helper.all_options['rfc2544']
def tolerance_low(self):
    """Return the lower tolerance bound, parsing the option on first use."""
    if self._tolerance_low is not None:
        return self._tolerance_low
    # not parsed yet: get_rfc_tolerance() fills the cached bounds
    self.get_rfc_tolerance()
    return self._tolerance_low
def tolerance_high(self):
    """Return the upper tolerance bound, parsing the option on first use."""
    if self._tolerance_high is not None:
        return self._tolerance_high
    # not parsed yet: get_rfc_tolerance() fills the cached bounds
    self.get_rfc_tolerance()
    return self._tolerance_high
def tolerance_precision(self):
    """Return the tolerance precision, parsing the option on first use."""
    if self._tolerance_precision is not None:
        return self._tolerance_precision
    # not parsed yet: get_rfc_tolerance() fills the cached precision
    self.get_rfc_tolerance()
    return self._tolerance_precision
def correlated_traffic(self):
    """Return the ``correlated_traffic`` RFC2544 option.

    The value is fetched through ``get_rfc2544`` with
    ``DEFAULT_CORRELATED_TRAFFIC`` as fallback and kept on the instance, so
    the lookup is repeated only while the stored value is ``None``.
    """
    cached = self._correlated_traffic
    if cached is not None:
        return cached
    cached = self.get_rfc2544('correlated_traffic',
                              self.DEFAULT_CORRELATED_TRAFFIC)
    self._correlated_traffic = cached
    return cached
539 if self._latency is None:
540 self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
def get_rfc2544(self, name, default=None):
    """Look up one entry of the ``rfc2544`` options.

    :param name: option key
    :param default: value returned when the key is absent
    :return: the stored option or *default*
    """
    options = self.rfc2544
    return options.get(name, default)
def get_rfc_tolerance(self):
    """Parse the ``allowed_drop_rate`` option into the cached tolerance bounds.

    The option (``DEFAULT_TOLERANCE`` when unset) is split on ``-`` and the
    parts are parsed as decimals and sorted. The smallest value becomes the
    low bound and the next one the high bound; a single value serves as both.
    The precision is the absolute decimal exponent of the high bound.
    """
    raw = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
    bounds = [decimal.Decimal(part.strip()) for part in raw.split('-')]
    bounds.sort()
    lowest = bounds[0]
    # with only one value given it is used for both ends of the range
    upper = bounds[1] if len(bounds) > 1 else lowest
    self._tolerance_precision = abs(upper.as_tuple().exponent)
    self._tolerance_high = float(upper)
    self._tolerance_low = float(lowest)
557 class SampleVNFDeployHelper(object):
559 SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
560 REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
561 SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
def __init__(self, vnfd_helper, ssh_helper):
    """Keep the helpers used while deploying a sample VNF.

    :param vnfd_helper: stored as ``self.vnfd_helper``
    :param ssh_helper: stored as ``self.ssh_helper``
    """
    super(SampleVNFDeployHelper, self).__init__()
    self.vnfd_helper = vnfd_helper
    self.ssh_helper = ssh_helper
# Deploy the binary of `app_name`: clone the samplevnf repository, copy it
# through ssh_helper, run its build script there and copy the resulting
# binary to the bin path.
# NOTE(review): the embedded numbering skips 571-573, 576, 579, 581 and 584,
# so the statement that acts on `exit_status` (presumably an early return
# when the binary already exists) and any other dropped lines are not
# visible here.
568 def deploy_vnfs(self, app_name):
569 vnf_bin = self.ssh_helper.join_bin_path(app_name)
570 exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
# these two commands run through subprocess, i.e. on the machine executing
# this code, in its current working directory
574 subprocess.check_output(["rm", "-rf", self.REPO_NAME])
575 subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
# replace any previous copy on the ssh_helper side with the fresh clone
577 self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
578 self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
580 build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
# NOTE(review): the http_proxy environment value is interpolated into a
# shell command inside single quotes without escaping
582 http_proxy = os.environ.get('http_proxy', '')
583 cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
585 self.ssh_helper.execute(cmd)
586 vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
587 self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
588 self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
591 class ScenarioHelper(object):
596 'worker_config': '1C/1T',
600 def __init__(self, name):
602 self.scenario_cfg = None
606 return self.scenario_cfg['task_path']
610 return self.scenario_cfg.get('nodes')
def all_options(self):
    """Return the ``options`` mapping of the scenario configuration.

    An empty dict is returned when the scenario has no ``options`` entry.
    """
    scenario = self.scenario_cfg
    return scenario.get('options', {})
618 return self.all_options.get(self.name, {})
622 return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
626 return self.scenario_cfg['topology']
630 test_duration = self.scenario_cfg.get('runner', {}).get('duration',
631 self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
632 test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
633 return test_duration if test_duration > test_timeout else test_timeout
636 class SampleVNF(GenericVNF):
637 """ Class providing file-like API for generic VNF implementation """
639 VNF_PROMPT = "pipeline>"
641 WAIT_TIME_FOR_SCRIPT = 10
642 APP_NAME = "SampleVNF"
643 # we run the VNF interactively, so the ssh command will timeout after this long
645 def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
646 resource_helper_type=None):
647 super(SampleVNF, self).__init__(name, vnfd, task_id)
648 self.bin_path = get_nsb_option('bin_path', '')
650 self.scenario_helper = ScenarioHelper(self.name)
651 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
653 if setup_env_helper_type is None:
654 setup_env_helper_type = SetupEnvHelper
656 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
658 self.scenario_helper)
660 self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
662 if resource_helper_type is None:
663 resource_helper_type = ResourceHelper
665 self.resource_helper = resource_helper_type(self.setup_helper)
667 self.context_cfg = None
668 self.pipeline_kwargs = {}
669 self.uplink_ports = None
670 self.downlink_ports = None
671 # NOTE(esm): make QueueFileWrapper invert-able so that we
672 # never have to manage the queues
675 self.queue_wrapper = None
677 self.used_drivers = {}
678 self.vnf_port_pairs = None
679 self._vnf_process = None
def _start_vnf(self):
    """Create the queue wrapper and launch ``self._run`` in a new process.

    The process is named ``<name>-<APP_NAME>-<pid of the current process>``
    and is kept in ``self._vnf_process``.
    """
    self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
    name_parts = (self.name, self.APP_NAME, os.getpid())
    vnf_process = Process(target=self._run, name="{}-{}-{}".format(*name_parts))
    self._vnf_process = vnf_process
    vnf_process.start()
687 def _vnf_up_post(self):
690 def instantiate(self, scenario_cfg, context_cfg):
691 self._update_collectd_options(scenario_cfg, context_cfg)
692 self.scenario_helper.scenario_cfg = scenario_cfg
693 self.context_cfg = context_cfg
694 self.resource_helper.update_from_context(
695 Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
696 self.scenario_helper.nodes[self.name]
699 # vnf deploy is unsupported, use ansible playbooks
700 if self.scenario_helper.options.get("vnf_deploy", False):
701 self.deploy_helper.deploy_vnfs(self.APP_NAME)
702 self.resource_helper.setup()
705 def _update_collectd_options(self, scenario_cfg, context_cfg):
706 """Update collectd configuration options
707 This function retrieves all collectd options contained in the test case
709 definition builds a single dictionary combining them. The following fragment
710 represents a test case with the collectd options and priorities (1 highest, 3 lowest):
712 schema: yardstick:task:0.1
716 tg__0: trafficgen_1.yardstick
717 vnf__0: vnf.yardstick
720 <options> # COLLECTD priority 3
725 <options> # COLLECTD priority 2
730 file: /etc/yardstick/nodes/pod_ixia.yaml # COLLECTD priority 1
732 scenario_options = scenario_cfg.get('options', {})
733 generic_options = scenario_options.get('collectd', {})
734 scenario_node_options = scenario_options.get(self.name, {})\
736 context_node_options = context_cfg.get('nodes', {})\
737 .get(self.name, {}).get('collectd', {})
739 options = generic_options
740 self._update_options(options, scenario_node_options)
741 self._update_options(options, context_node_options)
743 self.setup_helper.collectd_options = options
745 def _update_options(self, options, additional_options):
746 """Update collectd options and plugins dictionary"""
747 for k, v in additional_options.items():
748 if isinstance(v, dict) and k in options:
# Wait until the VNF prompt shows up in the text read from q_out and return
# the exit code of the VNF process; raise RuntimeError when the process is
# no longer alive or the text contains "PANIC".
# NOTE(review): the embedded numbering skips 754, 756, 759, 766, 769 and
# 772-773, so the initialisation of `buf`, the enclosing loop header and the
# end of the second RuntimeError call are not visible here.
753 def wait_for_instantiate(self):
755 time.sleep(self.WAIT_TIME) # Give some time for config to load
757 if not self._vnf_process.is_alive():
758 raise RuntimeError("%s VNF process died." % self.APP_NAME)
760 # NOTE(esm): move to QueueFileWrapper
# drain everything currently queued and look at the accumulated text
761 while self.q_out.qsize() > 0:
762 buf.append(self.q_out.get())
763 message = ''.join(buf)
764 if self.VNF_PROMPT in message:
765 LOG.info("%s VNF is up and running.", self.APP_NAME)
767 self.queue_wrapper.clear()
768 return self._vnf_process.exitcode
770 if "PANIC" in message:
771 raise RuntimeError("Error starting %s VNF." %
774 LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
775 time.sleep(self.WAIT_TIME_FOR_SCRIPT)
776 # Send ENTER to display a new prompt in case the prompt text was corrupted
777 # by other VNF output
778 self.q_in.put('\r\n')
def start_collect(self):
    """Forward to ``resource_helper.start_collect()``."""
    helper = self.resource_helper
    helper.start_collect()
def stop_collect(self):
    """Forward to ``resource_helper.stop_collect()``."""
    helper = self.resource_helper
    helper.stop_collect()
786 def _build_run_kwargs(self):
788 'stdin': self.queue_wrapper,
789 'stdout': self.queue_wrapper,
790 'keep_stdin_open': True,
792 'timeout': self.scenario_helper.timeout,
def _build_config(self):
    """Return whatever ``setup_helper.build_config()`` produces."""
    built = self.setup_helper.build_config()
    return built
799 # we can't share ssh paramiko objects to force new connection
800 self.ssh_helper.drop_connection()
801 cmd = self._build_config()
802 # kill before starting
803 self.setup_helper.kill_vnf()
806 self._build_run_kwargs()
807 self.ssh_helper.run(cmd, **self.run_kwargs)
def vnf_execute(self, cmd, wait_time=2):
    """Send *cmd* to the VNF process and return the output it produced.

    The command is written to ``q_in`` terminated by ``\\r\\n``. After
    *wait_time* seconds everything currently available on ``q_out`` is
    drained and concatenated.

    :param cmd: (str) command line to send
    :param wait_time: seconds to wait before collecting the output
    :return: (str) the collected output, ``""`` when nothing was queued
    """
    LOG.info("%s command: %s", self.APP_NAME, cmd)
    self.q_in.put("{}\r\n".format(cmd))
    time.sleep(wait_time)
    # the accumulator must exist before the drain loop appends to it
    output = []
    while self.q_out.qsize() > 0:
        output.append(self.q_out.get())
    return "".join(output)
820 def _tear_down(self):
824 self.vnf_execute("quit")
825 self.setup_helper.kill_vnf()
827 self.resource_helper.stop_collect()
828 if self._vnf_process is not None:
829 # be proper and join first before we kill
830 LOG.debug("joining before terminate %s", self._vnf_process.name)
831 self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
832 self._vnf_process.terminate()
833 # no terminate children here because we share processes with tg
835 def get_stats(self, *args, **kwargs): # pylint: disable=unused-argument
836 """Method for checking the statistics
838 This method could be overridden in children classes.
840 :return: VNF statistics
842 cmd = 'p {0} stats'.format(self.APP_WORD)
843 out = self.vnf_execute(cmd)
846 def collect_kpi(self):
847 # we can't get KPIs if the VNF is down
848 check_if_process_failed(self._vnf_process, 0.01)
849 stats = self.get_stats()
850 m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
851 physical_node = Context.get_physical_node_from_server(
852 self.scenario_helper.nodes[self.name])
854 result = {"physical_node": physical_node}
856 result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
857 result["collect_stats"] = self.resource_helper.collect_kpi()
859 result.update({"packets_in": 0,
861 "packets_dropped": 0})
863 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
def scale(self, flavor=""):
    """The SampleVNF base class doesn't provide the 'scale' feature

    :param flavor: accepted for interface compatibility, not used
    :raises FunctionNotImplemented: always
    """
    # name this class in the error, not the traffic generator one
    raise y_exceptions.FunctionNotImplemented(
        function_name='scale', class_name='SampleVNF')
872 class SampleVNFTrafficGen(GenericTrafficGen):
873 """ Class providing file-like API for generic traffic generator """
878 def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
879 resource_helper_type=None):
880 super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
881 self.bin_path = get_nsb_option('bin_path', '')
883 self.scenario_helper = ScenarioHelper(self.name)
884 self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
886 if setup_env_helper_type is None:
887 setup_env_helper_type = SetupEnvHelper
889 self.setup_helper = setup_env_helper_type(self.vnfd_helper,
891 self.scenario_helper)
893 if resource_helper_type is None:
894 resource_helper_type = ClientResourceHelper
896 self.resource_helper = resource_helper_type(self.setup_helper)
898 self.runs_traffic = True
899 self.traffic_finished = False
900 self._tg_process = None
901 self._traffic_process = None
def _start_server(self):
    """Drop the current SSH connection of ``ssh_helper``.

    ssh paramiko objects can't be shared, so dropping the connection forces
    a new one to be opened.
    """
    ssh = self.ssh_helper
    ssh.drop_connection()
907 def instantiate(self, scenario_cfg, context_cfg):
908 self.scenario_helper.scenario_cfg = scenario_cfg
909 self.resource_helper.update_from_context(
910 Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
911 self.scenario_helper.nodes[self.name]
914 self.resource_helper.setup()
915 # must generate_cfg after DPDK bind because we need port number
916 self.resource_helper.generate_cfg()
918 LOG.info("Starting %s server...", self.APP_NAME)
919 name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
920 self._tg_process = Process(name=name, target=self._start_server)
921 self._tg_process.start()
def _check_status(self):
    """Report the status of the traffic generator server.

    No implementation is provided here.

    :raises NotImplementedError: unconditionally
    """
    raise NotImplementedError()
926 def _wait_for_process(self):
928 if not self._tg_process.is_alive():
929 raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
930 LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
932 status = self._check_status()
934 LOG.info("%s TG Server is up and running.", self.APP_NAME)
935 return self._tg_process.exitcode
def _traffic_runner(self, traffic_profile, mq_id):
    """Run the traffic client for *traffic_profile*.

    A message queue producer is created for *mq_id*, stored in
    ``self._mq_producer`` and handed to ``resource_helper.run_traffic``
    together with the profile.

    :param traffic_profile: profile passed on to the resource helper
    :param mq_id: identifier passed to ``_setup_mq_producer``
    """
    # always drop connections first thing in new processes
    # so we don't get paramiko errors
    self.ssh_helper.drop_connection()
    LOG.info("Starting %s client...", self.APP_NAME)
    producer = self._setup_mq_producer(mq_id)
    self._mq_producer = producer
    self.resource_helper.run_traffic(traffic_profile, producer)
945 def run_traffic(self, traffic_profile):
946 """ Generate traffic on the wire according to the given params.
947 Method is non-blocking, returns immediately when traffic process
948 is running. Mandatory.
950 :param traffic_profile:
953 name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
954 traffic_profile.__class__.__name__,
956 self._traffic_process = Process(
957 name=name, target=self._traffic_runner,
958 args=(traffic_profile, uuid.uuid1().int))
959 self._traffic_process.start()
960 # Wait for traffic process to start
961 while self.resource_helper.client_started.value == 0:
962 time.sleep(self.RUN_WAIT)
963 # what if traffic process takes a few seconds to start?
964 if not self._traffic_process.is_alive():
967 def collect_kpi(self):
968 # check if the tg processes have exited
969 physical_node = Context.get_physical_node_from_server(
970 self.scenario_helper.nodes[self.name])
972 result = {"physical_node": physical_node}
973 for proc in (self._tg_process, self._traffic_process):
974 check_if_process_failed(proc)
976 result["collect_stats"] = self.resource_helper.collect_kpi()
977 LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
981 """ After this method finishes, all traffic processes should stop. Mandatory.
985 self.traffic_finished = True
986 # we must kill client before we kill the server, or the client will raise exception
987 if self._traffic_process is not None:
988 # be proper and try to join before terminating
989 LOG.debug("joining before terminate %s", self._traffic_process.name)
990 self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
991 self._traffic_process.terminate()
992 if self._tg_process is not None:
993 # be proper and try to join before terminating
994 LOG.debug("joining before terminate %s", self._tg_process.name)
995 self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
996 self._tg_process.terminate()
997 # no terminate children here because we share processes with vnf
def scale(self, flavor=""):
    """A traffic generator VNF doesn't provide the 'scale' feature

    :param flavor: accepted for interface compatibility, not used
    :raises FunctionNotImplemented: always
    """
    unsupported = y_exceptions.FunctionNotImplemented(
        function_name='scale', class_name='SampleVNFTrafficGen')
    raise unsupported