1 # Copyright (c) 2016-2019 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from multiprocessing import Queue, Value, Process, JoinableQueue
25 from trex_stl_lib.trex_stl_client import LoggerApi
26 from trex_stl_lib.trex_stl_client import STLClient
27 from trex_stl_lib.trex_stl_exceptions import STLError
28 from yardstick.benchmark.contexts.base import Context
29 from yardstick.common import exceptions as y_exceptions
30 from yardstick.common.process import check_if_process_failed
31 from yardstick.common import utils
32 from yardstick.common import yaml_loader
33 from yardstick.network_services import constants
34 from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode
35 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
36 from yardstick.network_services.nfvi.resource import ResourceProfile
37 from yardstick.network_services.utils import get_nsb_option
38 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
39 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
40 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
41 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
42 from yardstick.benchmark.contexts.node import NodeContext
44 LOG = logging.getLogger(__name__)
class SetupEnvHelper(object):
    """Base helper that prepares a VNF host environment.

    Concrete subclasses implement :meth:`build_config` and
    :meth:`setup_vnf_environment`.
    """

    # Remote paths where the generated config/script files are uploaded.
    CFG_CONFIG = os.path.join(constants.REMOTE_TMP, "sample_config")
    CFG_SCRIPT = os.path.join(constants.REMOTE_TMP, "sample_script")
    DEFAULT_CONFIG_TPL_CFG = "sample.cfg"

    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Store the collaborators used by concrete setup helpers."""
        super(SetupEnvHelper, self).__init__()
        self.vnfd_helper = vnfd_helper
        self.ssh_helper = ssh_helper
        self.scenario_helper = scenario_helper
        # populated later (see SampleVNF._update_collectd_options)
        self.collectd_options = {}

    def build_config(self):
        # Subclasses must return the command line used to start the VNF.
        raise NotImplementedError

    def setup_vnf_environment(self):
        # Subclasses must prepare the target environment (e.g. DPDK binding).
        raise NotImplementedError
class DpdkVnfSetupEnvHelper(SetupEnvHelper):
    """Setup helper for DPDK-based sample VNFs (NIC binding, config build)."""

    # Shell command used to resolve a PCI address to its netdev name.
    FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
    NR_HUGEPAGES_PATH = '/proc/sys/vm/nr_hugepages'
82 def _update_packet_type(ip_pipeline_cfg, traffic_options):
83 match_str = 'pkt_type = ipv4'
84 replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
85 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
86 return pipeline_config_str
89 def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
90 traffic_type = traffic_options['traffic_type']
92 if traffic_options['vnf_type'] is not cls.APP_NAME:
93 match_str = 'traffic_type = 4'
94 replace_str = 'traffic_type = {0}'.format(traffic_type)
96 elif traffic_type == 4:
97 match_str = 'pkt_type = ipv4'
98 replace_str = 'pkt_type = ipv4'
101 match_str = 'pkt_type = ipv4'
102 replace_str = 'pkt_type = ipv6'
104 pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
105 return pipeline_config_str
    def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
        """Create the DPDK setup helper and its DpdkBindHelper."""
        super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
        self.all_ports = None
        # PCI addresses bound for DPDK, filled in by _detect_and_bind_drivers()
        self.bound_pci = None
        self.used_drivers = None
        self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
    def build_config(self):
        """Generate and upload the VNF config/script; return the start command.

        Reads scenario options, renders the multi-port ip_pipeline
        configuration, patches traffic/packet type directives and uploads
        both files to the target before formatting the pipeline command.
        """
        vnf_cfg = self.scenario_helper.vnf_cfg
        task_path = self.scenario_helper.task_path

        config_file = vnf_cfg.get('file')
        lb_count = vnf_cfg.get('lb_count', 3)
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_config = vnf_cfg.get('worker_config', '1C/1T')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
        # NOTE(review): the 'traffic_options = {' opening line appears to be
        # missing from this excerpt; the next three lines are its entries.
            'traffic_type': traffic_type,
            'pkt_type': 'ipv%s' % traffic_type,
            'vnf_type': self.VNF_TYPE,

        # read actions/rules from file
        acl_file_name = self.scenario_helper.options.get('rules')
        with utils.open_relative_file(acl_file_name, task_path) as infile:
            acl_options = yaml_loader.yaml_load(infile)

        # NOTE(review): continuation argument(s) of this call are missing.
        config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG,
        config_basename = posixpath.basename(self.CFG_CONFIG)
        script_basename = posixpath.basename(self.CFG_SCRIPT)
        # NOTE(review): most MultiPortConfig constructor arguments are
        # missing from this excerpt.
        multiport = MultiPortConfig(self.scenario_helper.topology,
        multiport.generate_config()
        # NOTE(review): an if/else around the next two 'with' blocks
        # (user-supplied vs generated config) appears to be dropped.
        with utils.open_relative_file(config_file, task_path) as infile:
            new_config = ['[EAL]']
            # NOTE(review): 'vpci = []' initialisation is not visible here.
            for port in self.vnfd_helper.port_pairs.all_ports:
                interface = self.vnfd_helper.find_interface(name=port)
                vpci.append(interface['virtual-interface']["vpci"])
            # whitelist each bound PCI device in the EAL section
            new_config.extend('w = {0}'.format(item) for item in vpci)
            new_config = '\n'.join(new_config) + '\n' + infile.read()
        with open(self.CFG_CONFIG) as handle:
            new_config = handle.read()
        new_config = self._update_traffic_type(new_config, traffic_options)
        new_config = self._update_packet_type(new_config, traffic_options)
        self.ssh_helper.upload_config_file(config_basename, new_config)
        self.ssh_helper.upload_config_file(script_basename,
                                           multiport.generate_script(self.vnfd_helper,
                                               self.get_flows_config(acl_options)))

        LOG.info("Provision and start the %s", self.APP_NAME)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
    def get_flows_config(self, options=None):  # pylint: disable=unused-argument
        """No actions/rules (flows) by default"""
        # Base implementation ignores *options* and implicitly returns None;
        # subclasses override to provide flow configuration content.
    def _build_pipeline_kwargs(self, cfg_file=None, script=None):
        """Populate self.pipeline_kwargs used to format PIPELINE_COMMAND.

        :param cfg_file: optional override for the remote config file path
        :param script: optional override for the remote script path
        """
        tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
        # count the number of actual ports in the list of pairs
        # remove duplicate ports
        # this is really a mapping from LINK ID to DPDK PMD ID
        # e.g. 0x110 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_2
        # 0x1010 maps LINK0 -> PMD_ID_1, LINK1 -> PMD_ID_3
        ports = self.vnfd_helper.port_pairs.all_ports
        port_nums = self.vnfd_helper.port_nums(ports)
        # create mask from all the dpdk port numbers
        ports_mask_hex = hex(sum(2 ** num for num in port_nums))

        vnf_cfg = self.scenario_helper.vnf_cfg
        lb_config = vnf_cfg.get('lb_config', 'SW')
        worker_threads = vnf_cfg.get('worker_threads', 3)

        # hardware load balancing takes an extra --hwlb switch
        if lb_config == 'HW':
            hwlb = ' --hwlb %s' % worker_threads
        # NOTE(review): the branch initialising hwlb otherwise, the 'hwlb'
        # dict entry and the closing brace below are not visible here.

        self.pipeline_kwargs = {
            'cfg_file': cfg_file if cfg_file else self.CFG_CONFIG,
            'script': script if script else self.CFG_SCRIPT,
            'port_mask_hex': ports_mask_hex,
            'tool_path': tool_path,
    def setup_vnf_environment(self):
        """Bind DPDK drivers and build the resource profile for this VNF."""
        # bind before _setup_resources so we can use dpdk_port_num
        self._detect_and_bind_drivers()
        resource = self._setup_resources()
        # NOTE(review): a 'return resource' (and calls preceding the bind,
        # e.g. DPDK setup / kill) appear dropped from this excerpt; the
        # statements below belong to a 'def kill_vnf(self):' whose header
        # is also not visible here.
        # pkill is not matching, debug with pgrep
        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
        # have to use exact match
        # try using killall to match
        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
    def _setup_dpdk(self):
        """Setup DPDK environment needed for VNF to run"""
        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
        # setup_hugepages expects KiB, hence GiB * 1024 * 1024
        utils.setup_hugepages(self.ssh_helper, hugepages_gb * 1024 * 1024)
        self.dpdk_bind_helper.load_dpdk_driver()

        exit_status = self.dpdk_bind_helper.check_dpdk_driver()
        # NOTE(review): the guard on exit_status before this log line is
        # not visible in this excerpt.
        LOG.critical("DPDK Driver not installed")
    def _setup_resources(self):
        """Build and return a ResourceProfile for collectd KPI collection."""
        # what is this magic? how do we know which socket is for which port?
        # what about quad-socket?
        # NOTE(review): the bodies of this branch and its counterpart
        # (selecting CPU socket list from the PCI address) are not visible.
        if any(v[5] == "0" for v in self.bound_pci):

        # implicit ordering, presumably by DPDK port num, so pre-sort by port_num
        # this won't work because we don't have DPDK port numbers yet
        ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num)
        port_names = (intf["name"] for intf in ports)
        plugins = self.collectd_options.get("plugins", {})
        interval = self.collectd_options.get("interval")
        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
        return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names,
                               plugins=plugins, interval=interval,
                               timeout=self.scenario_helper.timeout)
    def _check_interface_fields(self):
        """Validate the interface definitions on the target via DpdkNode."""
        num_nodes = len(self.scenario_helper.nodes)
        # OpenStack instance creation time is probably proportional to the number
        # of nodes, so scale the timeout accordingly
        timeout = 120 * num_nodes
        dpdk_node = DpdkNode(self.scenario_helper.name, self.vnfd_helper.interfaces,
                             self.ssh_helper, timeout)
        # NOTE(review): the validation call on dpdk_node is not visible in
        # this excerpt -- TODO confirm against the full file.
    def _detect_and_bind_drivers(self):
        """Probe interfaces, bind them to igb_uio, assign DPDK port numbers."""
        interfaces = self.vnfd_helper.interfaces
        self._check_interface_fields()
        # check for bound after probe
        self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]

        self.dpdk_bind_helper.read_status()
        self.dpdk_bind_helper.save_used_drivers()

        self.dpdk_bind_helper.bind(self.bound_pci, 'igb_uio')

        # DPDK port numbers follow the sorted bound-PCI-address order
        sorted_dpdk_pci_addresses = sorted(self.dpdk_bind_helper.dpdk_bound_pci_addresses)
        for dpdk_port_num, vpci in enumerate(sorted_dpdk_pci_addresses):
            # NOTE(review): the 'try:' opening this protected region is not
            # visible in this excerpt.
            intf = next(v for v in interfaces
                        if vpci == v['virtual-interface']['vpci'])
            intf['virtual-interface']['dpdk_port_num'] = int(dpdk_port_num)
            except:  # pylint: disable=bare-except
            # NOTE(review): handler body not visible.  Bare except is a
            # code smell; presumably it skips addresses with no matching
            # interface -- TODO confirm against the full file.
    def get_local_iface_name_by_vpci(self, vpci):
        """Look up the local netdev name for *vpci* via a sysfs find."""
        find_net_cmd = self.FIND_NET_CMD.format(vpci)
        exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
        # NOTE(review): this method's result handling and the header of the
        # teardown method that owns the next statement are not visible here.
        self.dpdk_bind_helper.rebind_drivers()
class ResourceHelper(object):
    """Wraps NFVi resource KPI collection (collectd) for a VNF."""

    MAKE_INSTALL = 'cd {0} && make && sudo make install'
    RESOURCE_WORD = 'sample'

    def __init__(self, setup_helper):
        """Store the setup helper and its ssh helper."""
        super(ResourceHelper, self).__init__()
        # NOTE(review): initialisation of self._enable (read below) is not
        # visible in this excerpt.
        self.setup_helper = setup_helper
        self.ssh_helper = setup_helper.ssh_helper

        # NOTE(review): this statement belongs to a 'def setup(self):'
        # whose header is not visible in this excerpt.
        self.resource = self.setup_helper.setup_vnf_environment()

    def generate_cfg(self):
        """Hook for subclasses; the base implementation does nothing."""

    def update_from_context(self, context, attr_name):
        """Disable resource helper in case of baremetal context.

        And update appropriate node collectd options in context.
        """
        if isinstance(context, NodeContext):
            # NOTE(review): continuation argument(s) of this call and the
            # disabling assignment are not visible in this excerpt.
            context.update_collectd_options_for_node(self.setup_helper.collectd_options,

    def _collect_resource_kpi(self):
        """Collect core NFVi KPIs via AMQP when collectd is running."""
        # NOTE(review): result initialisation is not visible here.
        status = self.resource.check_if_system_agent_running("collectd")[0]
        if status == 0 and self._enable:
            result = self.resource.amqp_collect_nfvi_kpi()
            result = {"core": result}
        # NOTE(review): 'return result' is not visible here.

    def start_collect(self):
        """Start the system agent and begin AMQP KPI collection."""
        self.resource.initiate_systemagent(self.ssh_helper.bin_path)
        self.resource.start()
        self.resource.amqp_process_for_nfvi_kpi()

    def stop_collect(self):
        """Stop the resource collector if one was started and enabled."""
        if self.resource and self._enable:
            # NOTE(review): the stop call in this branch is not visible here.

    def collect_kpi(self):
        """Collect and return resource KPIs."""
        return self._collect_resource_kpi()
class ClientResourceHelper(ResourceHelper):
    """ResourceHelper that also drives a TRex stateless traffic client."""

    def __init__(self, setup_helper):
        super(ClientResourceHelper, self).__init__(setup_helper)
        self.vnfd_helper = setup_helper.vnfd_helper
        self.scenario_helper = setup_helper.scenario_helper

        # shared with the traffic subprocess to signal client start-up
        self.client_started = Value('i', 0)
        self.all_ports = None
        self._queue = Queue()
        # NOTE(review): initialisation of self._result and self.client is
        # not visible in this excerpt.
        self._terminated = Value('i', 0)

    def _build_ports(self):
        """Cache DPDK port-number lists for uplink/downlink/all ports."""
        self.networks = self.vnfd_helper.port_pairs.networks
        self.uplink_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.uplink_ports)
        self.downlink_ports = \
            self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
        self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)

    def port_num(self, intf):
        # by default return port num
        return self.vnfd_helper.port_num(intf)

    def get_stats(self, *args, **kwargs):
        """Proxy to the TRex client's get_stats; logs when not connected."""
        # NOTE(review): the surrounding try/except (presumably STLError)
        # and the fallback return are not visible in this excerpt.
        return self.client.get_stats(*args, **kwargs)
        LOG.error('TRex client not connected')

    def _get_samples(self, ports, port_pg_id=False):
        # Subclasses must return KPI samples for the given ports.
        raise NotImplementedError()

    def _run_traffic_once(self, traffic_profile):
        """Run one traffic iteration and queue the collected samples."""
        traffic_profile.execute_traffic(self)
        self.client_started.value = 1
        time.sleep(self.RUN_DURATION)
        samples = self._get_samples(traffic_profile.ports)
        time.sleep(self.QUEUE_WAIT_TIME)
        self._queue.put(samples)

    def run_traffic(self, traffic_profile):
        """Connect to TRex and run traffic until terminated."""
        # if we don't do this we can hang waiting for the queue to drain
        # have to do this in the subprocess
        self._queue.cancel_join_thread()
        # fixme: fix passing correct trex config file,
        # instead of searching the default path
        # NOTE(review): the try: wrapping this body and a _build_ports()
        # call appear dropped from this excerpt.
        self.client = self._connect()
        if self.client is None:
            LOG.critical("Failure to Connect ... unable to continue")
            # NOTE(review): early 'return' is not visible here.

        self.client.reset(ports=self.all_ports)
        self.client.remove_all_streams(self.all_ports)  # remove all streams
        traffic_profile.register_generator(self)

        while self._terminated.value == 0:
            if self._run_traffic_once(traffic_profile):
                self._terminated.value = 1

        self.client.stop(self.all_ports)
        self.client.disconnect()
        self._terminated.value = 0
        # NOTE(review): the next three lines belong to an exception handler
        # whose 'except ...:' header is not visible in this excerpt.
        if self._terminated.value:
            LOG.debug("traffic generator is stopped")
            return  # return if trex/tg server is stopped.
        # NOTE(review): the following statement belongs to a terminate()
        # method whose header is not visible in this excerpt.
        self._terminated.value = 1  # stop client

    def clear_stats(self, ports=None):
        """Clear TRex statistics, defaulting to all ports."""
        # NOTE(review): the 'if ports is None:' guard is not visible here.
            ports = self.all_ports
        self.client.clear_stats(ports=ports)

    def start(self, ports=None, *args, **kwargs):
        """Start TRex traffic on *ports* (defaults to all ports)."""
        # pylint: disable=keyword-arg-before-vararg
        # NOTE(ralonsoh): defining keyworded arguments before variable
        # positional arguments is a bug. This function definition doesn't work
        # in Python 2, although it works in Python 3. Reference:
        # https://www.python.org/dev/peps/pep-3102/
        # NOTE(review): the 'if ports is None:' guard is not visible here.
            ports = self.all_ports
        self.client.start(ports=ports, *args, **kwargs)

    def collect_kpi(self):
        """Drain queued samples into the cached result and return it."""
        if not self._queue.empty():
            kpi = self._queue.get()
            self._result.update(kpi)
            LOG.debug('Got KPIs from _queue for %s %s',
                      self.scenario_helper.name, self.RESOURCE_WORD)
        # NOTE(review): 'return self._result' is not visible here.

    def _connect(self, client=None):
        """Create and connect an STLClient; log critically on failure.

        NOTE(review): the outer retry loop ('for idx in range(...)'), the
        try/except around connect and several returns/sleeps are not
        visible in this excerpt.
        """
        client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
                           server=self.vnfd_helper.mgmt_interface["ip"],
                           verbose_level=LoggerApi.VERBOSE_QUIET)

        # try to connect with 5s intervals
        for idx2 in range(6):
            if client.is_connected():
            LOG.info("Waiting to confirm connection %s .. Attempt %s",
            client.disconnect(stop_traffic=True, release_ports=True)
        LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
        if client.is_connected():
        LOG.critical("Connection failure ..TRex username: %s server: %s",
                     self.vnfd_helper.mgmt_interface["user"],
                     self.vnfd_helper.mgmt_interface["ip"])
class Rfc2544ResourceHelper(object):
    """Parses RFC2544 scenario options (tolerance, latency, resolution).

    NOTE(review): several '@property' decorators and def headers are not
    visible in this excerpt; dangling statements below belong to those
    dropped property definitions.
    """

    DEFAULT_CORRELATED_TRAFFIC = False
    DEFAULT_LATENCY = False
    DEFAULT_TOLERANCE = '0.0001 - 0.0001'
    DEFAULT_RESOLUTION = '0.1'

    def __init__(self, scenario_helper):
        super(Rfc2544ResourceHelper, self).__init__()
        self.scenario_helper = scenario_helper
        self._correlated_traffic = None
        # shared iteration counter (multiprocessing-safe)
        self.iteration = Value('i', 0)
        # NOTE(review): initialisation of self._latency / self._rfc2544 is
        # not visible in this excerpt.
        self._tolerance_low = None
        self._tolerance_high = None
        self._tolerance_precision = None
        self._resolution = None

    # NOTE(review): '@property def rfc2544(self):' header and its return
    # are not visible; the next two lines are that property's lazy load.
        if self._rfc2544 is None:
            self._rfc2544 = self.scenario_helper.all_options['rfc2544']

    def tolerance_low(self):
        # lazily derived from 'allowed_drop_rate'
        if self._tolerance_low is None:
            self.get_rfc_tolerance()
        return self._tolerance_low

    def tolerance_high(self):
        # lazily derived from 'allowed_drop_rate'
        if self._tolerance_high is None:
            self.get_rfc_tolerance()
        return self._tolerance_high

    def tolerance_precision(self):
        # lazily derived from 'allowed_drop_rate'
        if self._tolerance_precision is None:
            self.get_rfc_tolerance()
        return self._tolerance_precision

    def correlated_traffic(self):
        # lazily read 'correlated_traffic' option
        if self._correlated_traffic is None:
            self._correlated_traffic = \
                self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)

        return self._correlated_traffic

    # NOTE(review): 'def latency(self):' header and return not visible here.
        if self._latency is None:
            self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)

    def resolution(self):
        """Iteration resolution as a float."""
        if self._resolution is None:
            self._resolution = float(self.get_rfc2544('resolution',
                                                      self.DEFAULT_RESOLUTION))
        return self._resolution

    def get_rfc2544(self, name, default=None):
        """Read a single option from the scenario's rfc2544 section."""
        return self.rfc2544.get(name, default)

    def get_rfc_tolerance(self):
        """Parse 'allowed_drop_rate' ("low - high") into low/high/precision."""
        tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
        tolerance_iter = iter(sorted(
            decimal.Decimal(t.strip()) for t in tolerance_str.split('-')))
        tolerance_low = next(tolerance_iter)
        # a single value means low == high
        tolerance_high = next(tolerance_iter, tolerance_low)
        # precision = number of decimal places of the high bound
        self._tolerance_precision = abs(tolerance_high.as_tuple().exponent)
        self._tolerance_high = float(tolerance_high)
        self._tolerance_low = float(tolerance_low)
class SampleVNFDeployHelper(object):
    """Clones the OPNFV samplevnf repo and builds/installs a VNF binary."""

    SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
    REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
    SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)

    def __init__(self, vnfd_helper, ssh_helper):
        super(SampleVNFDeployHelper, self).__init__()
        self.ssh_helper = ssh_helper
        self.vnfd_helper = vnfd_helper

    def deploy_vnfs(self, app_name):
        """Build *app_name* from the samplevnf repo on the target node.

        NOTE(review): some lines (e.g. an early return when the binary
        already exists) are not visible in this excerpt.
        """
        vnf_bin = self.ssh_helper.join_bin_path(app_name)
        exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]

        # clone a fresh copy locally, then push it to the target
        subprocess.check_output(["rm", "-rf", self.REPO_NAME])
        subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
        self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
        self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)

        # build remotely, honouring any local http proxy
        build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
        http_proxy = os.environ.get('http_proxy', '')
        cmd = "sudo -E %s -s -p='%s'" % (build_script, http_proxy)
        self.ssh_helper.execute(cmd)
        vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
        self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
        self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
class ScenarioHelper(object):
    """Convenience accessors over the scenario configuration dict.

    NOTE(review): several '@property'/'def' headers are not visible in this
    excerpt; the dangling 'return' statements below belong to those dropped
    property definitions (task_path, nodes, options, vnf_cfg, topology,
    timeout).
    """

    # NOTE(review): the rest of the default VNF config dict (opener, other
    # entries, closing brace) is not visible in this excerpt.
        'worker_config': '1C/1T',

    def __init__(self, name):
        # NOTE(review): 'self.name = name' is not visible here.
        self.scenario_cfg = None

    # task_path property (header not visible):
        return self.scenario_cfg['task_path']
    # nodes property (header not visible):
        return self.scenario_cfg.get('nodes')

    def all_options(self):
        # full scenario 'options' section
        return self.scenario_cfg.get('options', {})

    # options property (header not visible): per-VNF options
        return self.all_options.get(self.name, {})
    # vnf_cfg property (header not visible): falls back to the default dict
        return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
    # topology property (header not visible):
        return self.scenario_cfg['topology']

    # timeout property (header not visible):
        test_duration = self.scenario_cfg.get('runner', {}).get('duration',
            self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT))
        test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
        # use whichever of runner duration / VNF timeout is longer
        return test_duration if test_duration > test_timeout else test_timeout
class SampleVNF(GenericVNF):
    """ Class providing file-like API for generic VNF implementation """

    VNF_PROMPT = "pipeline>"
    WAIT_TIME_FOR_SCRIPT = 10
    APP_NAME = "SampleVNF"
    # we run the VNF interactively, so the ssh command will timeout after this long
    # NOTE(review): further class constants (e.g. WAIT_TIME) are read below
    # but are not visible in this excerpt.

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        super(SampleVNF, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)

        if setup_env_helper_type is None:
            setup_env_helper_type = SetupEnvHelper

        # NOTE(review): the ssh_helper argument line of this call is not
        # visible in this excerpt.
        self.setup_helper = setup_env_helper_type(self.vnfd_helper,
                                                  self.scenario_helper)

        self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)

        if resource_helper_type is None:
            resource_helper_type = ResourceHelper

        self.resource_helper = resource_helper_type(self.setup_helper)

        self.context_cfg = None
        self.pipeline_kwargs = {}
        self.uplink_ports = None
        self.downlink_ports = None
        # NOTE(esm): make QueueFileWrapper invert-able so that we
        # never have to manage the queues
        # NOTE(review): initialisation of self.q_in / self.q_out is not
        # visible in this excerpt.
        self.queue_wrapper = None
        self.used_drivers = {}
        self.vnf_port_pairs = None
        self._vnf_process = None

    def _start_vnf(self):
        """Spawn the subprocess that runs the VNF interactively over SSH."""
        self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
        name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._vnf_process = Process(name=name, target=self._run)
        self._vnf_process.start()

    def _vnf_up_post(self):
        """Hook invoked after the VNF is up; base implementation is a no-op."""

    def instantiate(self, scenario_cfg, context_cfg):
        """Prepare the VNF: collectd options, context, optional deploy, setup."""
        self._update_collectd_options(scenario_cfg, context_cfg)
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg
        # NOTE(review): the closing parenthesis of this call is not visible.
        self.resource_helper.update_from_context(
            Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
            self.scenario_helper.nodes[self.name]

        # vnf deploy is unsupported, use ansible playbooks
        if self.scenario_helper.options.get("vnf_deploy", False):
            self.deploy_helper.deploy_vnfs(self.APP_NAME)
        self.resource_helper.setup()

    def _update_collectd_options(self, scenario_cfg, context_cfg):
        """Update collectd configuration options

        This function retrieves all collectd options contained in the test case
        definition builds a single dictionary combining them. The following fragment
        represents a test case with the collectd options and priorities (1 highest, 3 lowest):

        schema: yardstick:task:0.1
            tg__0: trafficgen_0.yardstick
            vnf__0: vnf_0.yardstick
            <options>    # COLLECTD priority 3
            <options>    # COLLECTD priority 2
            file: /etc/yardstick/nodes/pod_ixia.yaml    # COLLECTD priority 1
        """
        scenario_options = scenario_cfg.get('options', {})
        generic_options = scenario_options.get('collectd', {})
        # NOTE(review): the ".get('collectd', {})" continuation of the next
        # statement is not visible in this excerpt.
        scenario_node_options = scenario_options.get(self.name, {})\
        context_node_options = context_cfg.get('nodes', {})\
            .get(self.name, {}).get('collectd', {})

        # merge in increasing priority order
        options = generic_options
        self._update_options(options, scenario_node_options)
        self._update_options(options, context_node_options)

        self.setup_helper.collectd_options = options

    def _update_options(self, options, additional_options):
        """Update collectd options and plugins dictionary"""
        for k, v in additional_options.items():
            if isinstance(v, dict) and k in options:
                # NOTE(review): the nested-merge body and the else branch
                # are not visible in this excerpt.

    def wait_for_instantiate(self):
        """Wait for the VNF prompt to confirm the process is running."""
        # NOTE(review): 'buf = []' and the enclosing retry loop are not
        # visible in this excerpt.
        time.sleep(self.WAIT_TIME)  # Give some time for config to load

        if not self._vnf_process.is_alive():
            raise RuntimeError("%s VNF process died." % self.APP_NAME)

        # NOTE(esm): move to QueueFileWrapper
        while self.q_out.qsize() > 0:
            buf.append(self.q_out.get())
        message = ''.join(buf)
        if self.VNF_PROMPT in message:
            LOG.info("%s VNF is up and running.", self.APP_NAME)
            self.queue_wrapper.clear()
            return self._vnf_process.exitcode

        if "PANIC" in message:
            # NOTE(review): the continuation argument of this raise is not
            # visible in this excerpt.
            raise RuntimeError("Error starting %s VNF." %

        LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
        time.sleep(self.WAIT_TIME_FOR_SCRIPT)
        # Send ENTER to display a new prompt in case the prompt text was corrupted
        # by other VNF output
        self.q_in.put('\r\n')

    def wait_for_initialize(self):
        """Wait until the VNF has fully initialized, not merely prompted."""
        # NOTE(review): 'buf = []' and the enclosing retry loop are not
        # visible in this excerpt.
        vnf_prompt_found = False
        prompt_command = '\r\n'
        script_name = 'non_existent_script_name'
        # running a non-existent script yields this error only once the VNF
        # has finished initializing
        done_string = 'Cannot open file "{}"'.format(script_name)
        time.sleep(self.WAIT_TIME)  # Give some time for config to load

        if not self._vnf_process.is_alive():
            raise RuntimeError("%s VNF process died." % self.APP_NAME)
        while self.q_out.qsize() > 0:
            buf.append(self.q_out.get())
        message = ''.join(buf)

        if self.VNF_PROMPT in message and not vnf_prompt_found:
            # Once we got the VNF prompt, it doesn't mean that the VNF is
            # up and running/initialized completely. But we can run
            # an additional (any) VNF command and wait for it to complete
            # as it will be finished ONLY at the end of the VNF
            # initialization. So, this approach can be used to
            # identify that the VNF is completely initialized.
            LOG.info("Got %s VNF prompt.", self.APP_NAME)
            prompt_command = "run {}\r\n".format(script_name)
            self.q_in.put(prompt_command)
            # Cut the buffer since we are not interesting to find
            # the VNF prompt anymore
            prompt_pos = message.find(self.VNF_PROMPT)
            buf = [message[prompt_pos + len(self.VNF_PROMPT):]]
            vnf_prompt_found = True

        if done_string in message:
            LOG.info("%s VNF is up and running.", self.APP_NAME)
            self.queue_wrapper.clear()
            return self._vnf_process.exitcode

        if "PANIC" in message:
            # NOTE(review): the continuation argument of this raise is not
            # visible in this excerpt.
            raise RuntimeError("Error starting %s VNF." %

        LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
        time.sleep(self.WAIT_TIME_FOR_SCRIPT)
        # Send command again to display the expected prompt in case the
        # expected text was corrupted by other VNF output
        self.q_in.put(prompt_command)

    def start_collect(self):
        self.resource_helper.start_collect()

    def stop_collect(self):
        self.resource_helper.stop_collect()

    def _build_run_kwargs(self):
        # NOTE(review): the 'self.run_kwargs = {' opener and closing brace
        # are not visible in this excerpt.
            'stdin': self.queue_wrapper,
            'stdout': self.queue_wrapper,
            'keep_stdin_open': True,
            'timeout': self.scenario_helper.timeout,

    def _build_config(self):
        return self.setup_helper.build_config()

    # NOTE(review): the 'def _run(self):' header owning the following
    # statements is not visible in this excerpt.
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()
        cmd = self._build_config()
        # kill before starting
        self.setup_helper.kill_vnf()

        self._build_run_kwargs()
        self.ssh_helper.run(cmd, **self.run_kwargs)

    def vnf_execute(self, cmd, wait_time=2):
        """ send cmd to vnf process """
        LOG.info("%s command: %s", self.APP_NAME, cmd)
        self.q_in.put("{}\r\n".format(cmd))
        time.sleep(wait_time)
        # NOTE(review): 'output = []' initialisation is not visible here.
        while self.q_out.qsize() > 0:
            output.append(self.q_out.get())
        return "".join(output)

    def _tear_down(self):
        # NOTE(review): this hook's original body and the 'def terminate'
        # header owning the following statements are not visible here.
        self.vnf_execute("quit")
        self.setup_helper.kill_vnf()
        self.resource_helper.stop_collect()
        if self._vnf_process is not None:
            # be proper and join first before we kill
            LOG.debug("joining before terminate %s", self._vnf_process.name)
            self._vnf_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._vnf_process.terminate()
        # no terminate children here because we share processes with tg

    def get_stats(self, *args, **kwargs):  # pylint: disable=unused-argument
        """Method for checking the statistics

        This method could be overridden in children classes.

        :return: VNF statistics
        """
        cmd = 'p {0} stats'.format(self.APP_WORD)
        out = self.vnf_execute(cmd)
        # NOTE(review): 'return out' is not visible in this excerpt.

    def collect_kpi(self):
        """Parse VNF stats into KPI dict, including resource collect_stats."""
        # we can't get KPIs if the VNF is down
        check_if_process_failed(self._vnf_process, 0.01)
        stats = self.get_stats()
        m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
        physical_node = Context.get_physical_node_from_server(
            self.scenario_helper.nodes[self.name])

        result = {"physical_node": physical_node}
        # NOTE(review): the 'if m:' / 'else:' structure around the next two
        # updates and the final 'return result' are not visible here.
        result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()})
        result["collect_stats"] = self.resource_helper.collect_kpi()
        result.update({"packets_in": 0,
                       "packets_dropped": 0})

        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
919 def scale(self, flavor=""):
920 """The SampleVNF base class doesn't provide the 'scale' feature"""
921 raise y_exceptions.FunctionNotImplemented(
922 function_name='scale', class_name='SampleVNFTrafficGen')
class SampleVNFTrafficGen(GenericTrafficGen):
    """ Class providing file-like API for generic traffic generator """
    # NOTE(review): class constants read below (APP_NAME, RUN_WAIT, ...)
    # are not visible in this excerpt.

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
        self.bin_path = get_nsb_option('bin_path', '')

        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)

        if setup_env_helper_type is None:
            setup_env_helper_type = SetupEnvHelper

        # NOTE(review): the ssh_helper argument line of this call is not
        # visible in this excerpt.
        self.setup_helper = setup_env_helper_type(self.vnfd_helper,
                                                  self.scenario_helper)

        if resource_helper_type is None:
            resource_helper_type = ClientResourceHelper

        self.resource_helper = resource_helper_type(self.setup_helper)

        self.runs_traffic = True
        self.traffic_finished = False
        self._tg_process = None
        self._traffic_process = None
        self._tasks_queue = JoinableQueue()
        self._result_queue = Queue()

    def _test_runner(self, traffic_profile, tasks, results):
        # executed inside the traffic subprocess
        self.resource_helper.run_test(traffic_profile, tasks, results)

    def _init_traffic_process(self, traffic_profile):
        """Spawn the traffic subprocess and wait until the client starts.

        NOTE(review): several continuation lines (process args, sleeps,
        the liveness-failure handling) are not visible in this excerpt.
        """
        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
                                    traffic_profile.__class__.__name__,
        self._traffic_process = Process(name=name, target=self._test_runner,
                                        traffic_profile, self._tasks_queue,
        self._traffic_process.start()
        while self.resource_helper.client_started.value == 0:
            if not self._traffic_process.is_alive():

    def run_traffic_once(self, traffic_profile):
        """Trigger one traffic iteration (non-blocking)."""
        if self.resource_helper.client_started.value == 0:
            self._init_traffic_process(traffic_profile)

        # continue test - run next iteration
        LOG.info("Run next iteration ...")
        self._tasks_queue.put('RUN_TRAFFIC')

    def wait_on_traffic(self):
        """Block until the current iteration finishes."""
        self._tasks_queue.join()
        result = self._result_queue.get()
        # NOTE(review): 'return result' is not visible in this excerpt.

    def _start_server(self):
        # we can't share ssh paramiko objects to force new connection
        self.ssh_helper.drop_connection()

    def instantiate(self, scenario_cfg, context_cfg):
        """Set up resources and launch the TG server process."""
        self.scenario_helper.scenario_cfg = scenario_cfg
        # NOTE(review): the closing parenthesis of this call is not visible.
        self.resource_helper.update_from_context(
            Context.get_context_from_server(self.scenario_helper.nodes[self.name]),
            self.scenario_helper.nodes[self.name]

        self.resource_helper.context_cfg = context_cfg

        self.resource_helper.setup()
        # must generate_cfg after DPDK bind because we need port number
        self.resource_helper.generate_cfg()

        LOG.info("Starting %s server...", self.APP_NAME)
        name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
        self._tg_process = Process(name=name, target=self._start_server)
        self._tg_process.start()

    def _check_status(self):
        # Subclasses report whether the TG server is up.
        raise NotImplementedError

    def _wait_for_process(self):
        """Poll until the TG server process reports up; raise if it dies."""
        # NOTE(review): the enclosing polling loop, sleeps and the guard on
        # 'status' are not visible in this excerpt.
        if not self._tg_process.is_alive():
            raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
        LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
        status = self._check_status()
        LOG.info("%s TG Server is up and running.", self.APP_NAME)
        return self._tg_process.exitcode

    def _traffic_runner(self, traffic_profile):
        # always drop connections first thing in new processes
        # so we don't get paramiko errors
        self.ssh_helper.drop_connection()
        LOG.info("Starting %s client...", self.APP_NAME)
        self.resource_helper.run_traffic(traffic_profile)

    def run_traffic(self, traffic_profile):
        """ Generate traffic on the wire according to the given params.
        Method is non-blocking, returns immediately when traffic process
        is running. Mandatory.

        :param traffic_profile:
        """
        # NOTE(review): the continuation (os.getpid()) of this format call
        # is not visible in this excerpt.
        name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
        self._traffic_process = Process(name=name, target=self._traffic_runner,
                                        args=(traffic_profile,))
        self._traffic_process.start()
        # Wait for traffic process to start
        while self.resource_helper.client_started.value == 0:
            time.sleep(self.RUN_WAIT)
            # what if traffic process takes a few seconds to start?
            if not self._traffic_process.is_alive():
                # NOTE(review): loop-exit statement not visible here.

        return self._traffic_process.is_alive()

    def collect_kpi(self):
        """Verify TG processes are alive and collect resource KPIs."""
        # check if the tg processes have exited
        physical_node = Context.get_physical_node_from_server(
            self.scenario_helper.nodes[self.name])

        result = {"physical_node": physical_node}
        for proc in (self._tg_process, self._traffic_process):
            check_if_process_failed(proc)

        result["collect_stats"] = self.resource_helper.collect_kpi()
        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        # NOTE(review): 'return result' is not visible in this excerpt.

    def terminate(self):
        """ After this method finishes, all traffic processes should stop. Mandatory.
        """
        self.traffic_finished = True
        # we must kill client before we kill the server, or the client will raise exception
        if self._traffic_process is not None:
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", self._traffic_process.name)
            self._traffic_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._traffic_process.terminate()
        if self._tg_process is not None:
            # be proper and try to join before terminating
            LOG.debug("joining before terminate %s", self._tg_process.name)
            self._tg_process.join(constants.PROCESS_JOIN_TIMEOUT)
            self._tg_process.terminate()
        # no terminate children here because we share processes with vnf

    def scale(self, flavor=""):
        """A traffic generator VNF doesn't provide the 'scale' feature"""
        raise y_exceptions.FunctionNotImplemented(
            function_name='scale', class_name='SampleVNFTrafficGen')