1 # Copyright (c) 2016-2019 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from multiprocessing import Queue, Process, JoinableQueue
25 from yardstick.common import utils
26 from yardstick.common import exceptions
27 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
28 from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
29 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
30 from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
33 LOG = logging.getLogger(__name__)
35 WAIT_AFTER_CFG_LOAD = 10
37 WAIT_PROTOCOLS_STARTED = 360
40 class IxiaBasicScenario(object):
41 """Ixia Basic scenario for flow from port to port"""
43 def __init__(self, client, context_cfg, ixia_cfg):
46 self.context_cfg = context_cfg
47 self.ixia_cfg = ixia_cfg
49 self._uplink_vports = None
50 self._downlink_vports = None
52 def apply_config(self):
55 def run_protocols(self):
58 def stop_protocols(self):
61 def create_traffic_model(self, traffic_profile=None):
62 # pylint: disable=unused-argument
63 vports = self.client.get_vports()
64 self._uplink_vports = vports[::2]
65 self._downlink_vports = vports[1::2]
66 self.client.create_traffic_model(self._uplink_vports,
67 self._downlink_vports)
70 class IxiaL3Scenario(IxiaBasicScenario):
71 """Ixia scenario for L3 flow between static ip's"""
73 def _add_static_ips(self):
74 vports = self.client.get_vports()
75 uplink_intf_vport = [(self.client.get_static_interface(vport), vport)
76 for vport in vports[::2]]
77 downlink_intf_vport = [(self.client.get_static_interface(vport), vport)
78 for vport in vports[1::2]]
80 for index in range(len(uplink_intf_vport)):
81 intf, vport = uplink_intf_vport[index]
83 iprange = self.ixia_cfg['flow'].get('src_ip')[index]
84 start_ip = utils.get_ip_range_start(iprange)
85 count = utils.get_ip_range_count(iprange)
86 self.client.add_static_ipv4(intf, vport, start_ip, count, '32')
88 raise exceptions.IncorrectFlowOption(
89 option="src_ip", link="uplink_{}".format(index))
91 intf, vport = downlink_intf_vport[index]
93 iprange = self.ixia_cfg['flow'].get('dst_ip')[index]
94 start_ip = utils.get_ip_range_start(iprange)
95 count = utils.get_ip_range_count(iprange)
96 self.client.add_static_ipv4(intf, vport, start_ip, count, '32')
98 raise exceptions.IncorrectFlowOption(
99 option="dst_ip", link="downlink_{}".format(index))
101 def _add_interfaces(self):
102 vports = self.client.get_vports()
103 uplink_vports = (vport for vport in vports[::2])
104 downlink_vports = (vport for vport in vports[1::2])
106 ix_node = next(node for _, node in self.context_cfg['nodes'].items()
107 if node['role'] == 'IxNet')
109 for intf in ix_node['interfaces'].values():
110 ip = intf.get('local_ip')
111 mac = intf.get('local_mac')
114 gateway = next(route.get('gateway')
115 for route in ix_node.get('routing_table')
116 if route.get('if') == intf.get('ifname'))
117 except StopIteration:
118 LOG.debug("Gateway not provided")
120 if 'uplink' in intf.get('vld_id'):
121 self.client.add_interface(next(uplink_vports),
124 self.client.add_interface(next(downlink_vports),
127 def apply_config(self):
128 self._add_interfaces()
129 self._add_static_ips()
131 def create_traffic_model(self, traffic_profile=None):
132 # pylint: disable=unused-argument
133 vports = self.client.get_vports()
134 self._uplink_vports = vports[::2]
135 self._downlink_vports = vports[1::2]
137 uplink_endpoints = [port + '/protocols/static'
138 for port in self._uplink_vports]
139 downlink_endpoints = [port + '/protocols/static'
140 for port in self._downlink_vports]
142 self.client.create_ipv4_traffic_model(uplink_endpoints,
146 class IxiaPppoeClientScenario(object):
147 def __init__(self, client, context_cfg, ixia_cfg):
151 self._uplink_vports = None
152 self._downlink_vports = None
154 self._access_topologies = []
155 self._core_topologies = []
157 self._context_cfg = context_cfg
158 self._ixia_cfg = ixia_cfg
160 self.device_groups = []
162 def apply_config(self):
163 vports = self.client.get_vports()
164 self._uplink_vports = vports[::2]
165 self._downlink_vports = vports[1::2]
166 self._fill_ixia_config()
167 self._apply_access_network_config()
168 self._apply_core_network_config()
170 def create_traffic_model(self, traffic_profile):
171 endpoints_id_pairs = self._get_endpoints_src_dst_id_pairs(
172 traffic_profile.full_profile)
173 endpoints_obj_pairs = \
174 self._get_endpoints_src_dst_obj_pairs(endpoints_id_pairs)
175 uplink_endpoints = endpoints_obj_pairs[::2]
176 downlink_endpoints = endpoints_obj_pairs[1::2]
177 self.client.create_ipv4_traffic_model(uplink_endpoints,
180 def run_protocols(self):
181 LOG.info('PPPoE Scenario - Start Protocols')
182 self.client.start_protocols()
183 utils.wait_until_true(
184 lambda: self.client.is_protocols_running(self.protocols),
185 timeout=WAIT_PROTOCOLS_STARTED, sleep=2)
187 def stop_protocols(self):
188 LOG.info('PPPoE Scenario - Stop Protocols')
189 self.client.stop_protocols()
191 def _get_intf_addr(self, intf):
192 """Retrieve interface IP address and mask
194 :param intf: could be the string which represents IP address
195 with mask (e.g 192.168.10.2/24) or a dictionary with the host
196 name and the port (e.g. {'tg__0': 'xe1'})
197 :return: (tuple) pair of ip address and mask
199 if isinstance(intf, six.string_types):
200 ip, mask = tuple(intf.split('/'))
203 node_name, intf_name = next(iter(intf.items()))
204 node = self._context_cfg["nodes"].get(node_name, {})
205 interface = node.get("interfaces", {})[intf_name]
206 ip = interface["local_ip"]
207 mask = interface["netmask"]
208 ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)),
210 return ip, ipaddr.prefixlen
213 def _get_endpoints_src_dst_id_pairs(flows_params):
214 """Get list of flows src/dst port pairs
216 Create list of flows src/dst port pairs based on traffic profile
217 flows data. Each uplink/downlink pair in traffic profile represents
218 specific flows between the pair of ports.
220 Example ('port' key represents port on which flow will be created):
240 Result list: ['xe0', 'xe1', 'xe2', 'xe3']
242 Result list means that the following flows pairs will be created:
243 - uplink 0: port xe0 <-> port xe1
244 - downlink 0: port xe1 <-> port xe0
245 - uplink 1: port xe2 <-> port xe3
246 - downlink 1: port xe3 <-> port xe2
248 :param flows_params: ordered dict of traffic profile flows params
249 :return: (list) list of flows src/dst ports
251 if len(flows_params) % 2:
252 raise RuntimeError('Number of uplink/downlink pairs'
253 ' in traffic profile is not equal')
255 for flow in flows_params:
256 port = flows_params[flow]['ipv4'].get('port')
259 endpoint_pairs.append(port)
260 return endpoint_pairs
262 def _get_endpoints_src_dst_obj_pairs(self, endpoints_id_pairs):
263 """Create list of uplink/downlink device groups pairs
265 Based on traffic profile options, create list of uplink/downlink
266 device groups pairs between which flow groups will be created:
268 1. In case uplink/downlink flows in traffic profile doesn't have
269 specified 'port' key, flows will be created between each device
270 group on access port and device group on corresponding core port.
272 Device groups created on access port xe0: dg1, dg2, dg3
273 Device groups created on core port xe1: dg4
274 Flows will be created between:
282 2. In case uplink/downlink flows in traffic profile have specified
283 'port' key, flows will be created between device groups on this
285 E.g., for the following traffic profile
294 Flows will be created between:
295 Port xe0 (dg1) -> Port xe1 (dg1)
296 Port xe1 (dg1) -> Port xe0 (dg1)
297 Port xe0 (dg2) -> Port xe3 (dg1)
298 Port xe3 (dg3) -> Port xe0 (dg1)
300 :param endpoints_id_pairs: (list) List of uplink/downlink flows ports
302 :return: (list) list of uplink/downlink device groups descriptors pairs
304 pppoe = self._ixia_cfg['pppoe_client']
305 sessions_per_port = pppoe['sessions_per_port']
306 sessions_per_svlan = pppoe['sessions_per_svlan']
307 svlan_count = int(sessions_per_port / sessions_per_svlan)
309 uplink_ports = [p['tg__0'] for p in self._ixia_cfg['flow']['src_ip']]
310 downlink_ports = [p['tg__0'] for p in self._ixia_cfg['flow']['dst_ip']]
311 uplink_port_topology_map = zip(uplink_ports, self._access_topologies)
312 downlink_port_topology_map = zip(downlink_ports, self._core_topologies)
314 port_to_dev_group_mapping = {}
315 for port, topology in uplink_port_topology_map:
316 topology_dgs = self.client.get_topology_device_groups(topology)
317 port_to_dev_group_mapping[port] = topology_dgs
318 for port, topology in downlink_port_topology_map:
319 topology_dgs = self.client.get_topology_device_groups(topology)
320 port_to_dev_group_mapping[port] = topology_dgs
322 uplink_endpoints = endpoints_id_pairs[::2]
323 downlink_endpoints = endpoints_id_pairs[1::2]
325 uplink_dev_groups = []
326 group_up = [uplink_endpoints[i:i + svlan_count]
327 for i in range(0, len(uplink_endpoints), svlan_count)]
329 for group in group_up:
330 for i, port in enumerate(group):
331 uplink_dev_groups.append(port_to_dev_group_mapping[port][i])
333 downlink_dev_groups = []
334 for port in downlink_endpoints:
335 downlink_dev_groups.append(port_to_dev_group_mapping[port][0])
337 endpoint_obj_pairs = []
338 [endpoint_obj_pairs.extend([up, down])
339 for up, down in zip(uplink_dev_groups, downlink_dev_groups)]
341 if not endpoint_obj_pairs:
342 for up, down in zip(uplink_ports, downlink_ports):
343 uplink_dev_groups = port_to_dev_group_mapping[up]
344 downlink_dev_groups = \
345 port_to_dev_group_mapping[down] * len(uplink_dev_groups)
346 [endpoint_obj_pairs.extend(list(i))
347 for i in zip(uplink_dev_groups, downlink_dev_groups)]
348 return endpoint_obj_pairs
350 def _fill_ixia_config(self):
351 pppoe = self._ixia_cfg["pppoe_client"]
352 ipv4 = self._ixia_cfg["ipv4_client"]
354 _ip = [self._get_intf_addr(intf)[0] for intf in pppoe["ip"]]
355 self._ixia_cfg["pppoe_client"]["ip"] = _ip
357 _ip = [self._get_intf_addr(intf)[0] for intf in ipv4["gateway_ip"]]
358 self._ixia_cfg["ipv4_client"]["gateway_ip"] = _ip
360 addrs = [self._get_intf_addr(intf) for intf in ipv4["ip"]]
361 _ip = [addr[0] for addr in addrs]
362 _prefix = [addr[1] for addr in addrs]
364 self._ixia_cfg["ipv4_client"]["ip"] = _ip
365 self._ixia_cfg["ipv4_client"]["prefix"] = _prefix
367 def _apply_access_network_config(self):
368 pppoe = self._ixia_cfg["pppoe_client"]
369 sessions_per_port = pppoe['sessions_per_port']
370 sessions_per_svlan = pppoe['sessions_per_svlan']
371 svlan_count = int(sessions_per_port / sessions_per_svlan)
373 # add topology per uplink port (access network)
374 for access_tp_id, vport in enumerate(self._uplink_vports):
375 name = 'Topology access {}'.format(access_tp_id)
376 tp = self.client.add_topology(name, vport)
377 self._access_topologies.append(tp)
378 # add device group per svlan
379 for dg_id in range(svlan_count):
380 s_vlan_id = int(pppoe['s_vlan']) + dg_id + access_tp_id * svlan_count
381 s_vlan = ixnet_api.Vlan(vlan_id=s_vlan_id)
382 c_vlan = ixnet_api.Vlan(vlan_id=pppoe['c_vlan'], vlan_id_step=1)
383 name = 'SVLAN {}'.format(s_vlan_id)
384 dg = self.client.add_device_group(tp, name, sessions_per_svlan)
385 self.device_groups.append(dg)
386 # add ethernet layer to device group
387 ethernet = self.client.add_ethernet(dg, 'Ethernet')
388 self.protocols.append(ethernet)
389 self.client.add_vlans(ethernet, [s_vlan, c_vlan])
390 # add ppp over ethernet
391 if 'pap_user' in pppoe:
392 ppp = self.client.add_pppox_client(ethernet, 'pap',
394 pppoe['pap_password'])
396 ppp = self.client.add_pppox_client(ethernet, 'chap',
398 pppoe['chap_password'])
399 self.protocols.append(ppp)
401 def _apply_core_network_config(self):
402 ipv4 = self._ixia_cfg["ipv4_client"]
403 sessions_per_port = ipv4['sessions_per_port']
404 sessions_per_vlan = ipv4['sessions_per_vlan']
405 vlan_count = int(sessions_per_port / sessions_per_vlan)
407 # add topology per downlink port (core network)
408 for core_tp_id, vport in enumerate(self._downlink_vports):
409 name = 'Topology core {}'.format(core_tp_id)
410 tp = self.client.add_topology(name, vport)
411 self._core_topologies.append(tp)
412 # add device group per vlan
413 for dg_id in range(vlan_count):
414 name = 'Core port {}'.format(core_tp_id)
415 dg = self.client.add_device_group(tp, name, sessions_per_vlan)
416 self.device_groups.append(dg)
417 # add ethernet layer to device group
418 ethernet = self.client.add_ethernet(dg, 'Ethernet')
419 self.protocols.append(ethernet)
421 vlan_id = int(ipv4['vlan']) + dg_id + core_tp_id * vlan_count
422 vlan = ixnet_api.Vlan(vlan_id=vlan_id)
423 self.client.add_vlans(ethernet, [vlan])
425 gw_ip = ipv4['gateway_ip'][core_tp_id]
426 # use gw addr to generate ip addr from the same network
427 ip_addr = ipaddress.IPv4Address(gw_ip) + 1
428 ipv4_obj = self.client.add_ipv4(ethernet, name='ipv4',
431 prefix=ipv4['prefix'][core_tp_id],
433 self.protocols.append(ipv4_obj)
435 bgp_peer_obj = self.client.add_bgp(ipv4_obj,
436 dut_ip=ipv4["bgp"]["dut_ip"],
437 local_as=ipv4["bgp"]["as_number"],
438 bgp_type=ipv4["bgp"].get("bgp_type"))
439 self.protocols.append(bgp_peer_obj)
442 class IxiaRfc2544Helper(Rfc2544ResourceHelper):
445 return self.latency and self.iteration.value > 10
448 class IxiaResourceHelper(ClientResourceHelper):
450 LATENCY_TIME_SLEEP = 120
452 def __init__(self, setup_helper, rfc_helper_type=None):
453 super(IxiaResourceHelper, self).__init__(setup_helper)
454 self.scenario_helper = setup_helper.scenario_helper
456 self._ixia_scenarios = {
457 "IxiaBasic": IxiaBasicScenario,
458 "IxiaL3": IxiaL3Scenario,
459 "IxiaPppoeClient": IxiaPppoeClientScenario,
462 self.client = ixnet_api.IxNextgen()
464 if rfc_helper_type is None:
465 rfc_helper_type = IxiaRfc2544Helper
467 self.rfc_helper = rfc_helper_type(self.scenario_helper)
468 self.uplink_ports = None
469 self.downlink_ports = None
470 self.context_cfg = None
471 self._ix_scenario = None
474 def _connect(self, client=None):
475 self.client.connect(self.vnfd_helper)
477 def get_stats(self, *args, **kwargs):
478 return self.client.get_statistics()
481 super(IxiaResourceHelper, self).setup()
482 self._init_ix_scenario()
484 def stop_collect(self):
485 self._ix_scenario.stop_protocols()
486 self._terminated.value = 1
488 def generate_samples(self, ports, duration):
489 stats = self.get_stats()
492 # this is not DPDK port num, but this is whatever number we gave
493 # when we selected ports and programmed the profile
494 for port_num in ports:
496 # reverse lookup port name from port_num so the stats dict is descriptive
497 intf = self.vnfd_helper.find_interface_by_port(port_num)
498 port_name = intf['name']
499 avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num]
500 min_latency = stats['Store-Forward_Min_latency_ns'][port_num]
501 max_latency = stats['Store-Forward_Max_latency_ns'][port_num]
502 samples[port_name] = {
503 'rx_throughput_kps': float(stats['Rx_Rate_Kbps'][port_num]),
504 'tx_throughput_kps': float(stats['Tx_Rate_Kbps'][port_num]),
505 'rx_throughput_mbps': float(stats['Rx_Rate_Mbps'][port_num]),
506 'tx_throughput_mbps': float(stats['Tx_Rate_Mbps'][port_num]),
507 'in_packets': int(stats['Valid_Frames_Rx'][port_num]),
508 'out_packets': int(stats['Frames_Tx'][port_num]),
509 'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration,
510 'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration,
511 'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0),
512 'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0),
513 'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0)
520 def _init_ix_scenario(self):
521 ixia_config = self.scenario_helper.scenario_cfg.get('ixia_config', 'IxiaBasic')
523 if ixia_config in self._ixia_scenarios:
524 scenario_type = self._ixia_scenarios[ixia_config]
526 self._ix_scenario = scenario_type(self.client, self.context_cfg,
527 self.scenario_helper.scenario_cfg['options'])
530 "IXIA config type '{}' not supported".format(ixia_config))
532 def _initialize_client(self, traffic_profile):
533 """Initialize the IXIA IxNetwork client and configure the server"""
534 self.client.clear_config()
535 self.client.assign_ports()
536 self._ix_scenario.apply_config()
537 self._ix_scenario.create_traffic_model(traffic_profile)
539 def run_traffic(self, traffic_profile):
540 if self._terminated.value:
543 min_tol = self.rfc_helper.tolerance_low
544 max_tol = self.rfc_helper.tolerance_high
545 precision = self.rfc_helper.tolerance_precision
546 resolution = self.rfc_helper.resolution
547 default = "00:00:00:00:00:00"
550 traffic_profile.update_traffic_profile(self)
551 self._initialize_client(traffic_profile)
554 for port_name in self.vnfd_helper.port_pairs.all_ports:
555 intf = self.vnfd_helper.find_interface(name=port_name)
556 virt_intf = intf["virtual-interface"]
557 # we only know static traffic id by reading the json
558 # this is used by _get_ixia_trafficrofile
559 port_num = self.vnfd_helper.port_num(intf)
560 mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
561 mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)
563 self._ix_scenario.run_protocols()
566 while not self._terminated.value:
567 first_run = traffic_profile.execute_traffic(self, self.client,
569 self.client_started.value = 1
570 # pylint: disable=unnecessary-lambda
571 utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
572 timeout=traffic_profile.config.duration * 2)
573 samples = self.generate_samples(traffic_profile.ports,
574 traffic_profile.config.duration)
576 completed, samples = traffic_profile.get_drop_percentage(
577 samples, min_tol, max_tol, precision, resolution,
579 self._queue.put(samples)
582 self._terminated.value = 1
584 except Exception: # pylint: disable=broad-except
585 LOG.exception('Run Traffic terminated')
587 self._ix_scenario.stop_protocols()
588 self.client_started.value = 0
589 self._terminated.value = 1
591 def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover
592 LOG.info("Ixia resource_helper run_test")
593 if self._terminated.value:
596 min_tol = self.rfc_helper.tolerance_low
597 max_tol = self.rfc_helper.tolerance_high
598 precision = self.rfc_helper.tolerance_precision
599 resolution = self.rfc_helper.resolution
600 default = "00:00:00:00:00:00"
603 traffic_profile.update_traffic_profile(self)
604 self._initialize_client(traffic_profile)
607 for port_name in self.vnfd_helper.port_pairs.all_ports:
608 intf = self.vnfd_helper.find_interface(name=port_name)
609 virt_intf = intf["virtual-interface"]
610 # we only know static traffic id by reading the json
611 # this is used by _get_ixia_trafficrofile
612 port_num = self.vnfd_helper.port_num(intf)
613 mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
614 mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)
616 self._ix_scenario.run_protocols()
620 self.rfc_helper.iteration.value = 0
621 self.client_started.value = 1
622 while completed is False and not self._terminated.value:
623 LOG.info("Wait for task ...")
626 task = tasks_queue.get(True, 5)
627 except moves.queue.Empty:
630 if task != 'RUN_TRAFFIC':
633 self.rfc_helper.iteration.value += 1
634 LOG.info("Got %s task, start iteration %d", task,
635 self.rfc_helper.iteration.value)
636 first_run = traffic_profile.execute_traffic(self, self.client,
638 # pylint: disable=unnecessary-lambda
639 utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
640 timeout=traffic_profile.config.duration * 2)
641 samples = self.generate_samples(traffic_profile.ports,
642 traffic_profile.config.duration)
644 completed, samples = traffic_profile.get_drop_percentage(
645 samples, min_tol, max_tol, precision, resolution,
647 self._queue.put(samples)
650 LOG.debug("IxiaResourceHelper::run_test - test completed")
651 results_queue.put('COMPLETE')
653 results_queue.put('CONTINUE')
654 tasks_queue.task_done()
656 except Exception: # pylint: disable=broad-except
657 LOG.exception('Run Traffic terminated')
659 self._ix_scenario.stop_protocols()
660 self.client_started.value = 0
661 LOG.debug("IxiaResourceHelper::run_test done")
664 class IxiaTrafficGen(SampleVNFTrafficGen):
668 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
669 if resource_helper_type is None:
670 resource_helper_type = IxiaResourceHelper
672 super(IxiaTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
673 resource_helper_type)
674 self._ixia_traffic_gen = None
675 self.ixia_file_name = ''
676 self.vnf_port_pairs = []
677 self._traffic_process = None
678 self._tasks_queue = JoinableQueue()
679 self._result_queue = Queue()
681 def _check_status(self):
685 self.resource_helper.stop_collect()
686 super(IxiaTrafficGen, self).terminate()
688 def _test_runner(self, traffic_profile, tasks, results):
689 self.resource_helper.run_test(traffic_profile, tasks, results)
691 def _init_traffic_process(self, traffic_profile):
692 name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
693 traffic_profile.__class__.__name__,
695 self._traffic_process = Process(name=name, target=self._test_runner,
697 traffic_profile, self._tasks_queue,
700 self._traffic_process.start()
701 while self.resource_helper.client_started.value == 0:
703 if not self._traffic_process.is_alive():
706 def run_traffic_once(self, traffic_profile):
707 if self.resource_helper.client_started.value == 0:
708 self._init_traffic_process(traffic_profile)
710 # continue test - run next iteration
711 LOG.info("Run next iteration ...")
712 self._tasks_queue.put('RUN_TRAFFIC')
714 def wait_on_traffic(self):
715 self._tasks_queue.join()
716 result = self._result_queue.get()