1 # Copyright (c) 2016-2019 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from multiprocessing import Queue, Process, JoinableQueue
25 from yardstick.common import utils
26 from yardstick.common import exceptions
27 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
28 from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
29 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
30 from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
33 LOG = logging.getLogger(__name__)
35 WAIT_AFTER_CFG_LOAD = 10
37 WAIT_PROTOCOLS_STARTED = 360
40 class IxiaBasicScenario(object):
41 """Ixia Basic scenario for flow from port to port"""
43 def __init__(self, client, context_cfg, ixia_cfg):
46 self.context_cfg = context_cfg
47 self.ixia_cfg = ixia_cfg
49 self._uplink_vports = None
50 self._downlink_vports = None
52 def apply_config(self):
55 def run_protocols(self):
58 def stop_protocols(self):
61 def create_traffic_model(self, traffic_profile=None):
62 # pylint: disable=unused-argument
63 vports = self.client.get_vports()
64 self._uplink_vports = vports[::2]
65 self._downlink_vports = vports[1::2]
66 self.client.create_traffic_model(self._uplink_vports,
67 self._downlink_vports)
70 class IxiaL3Scenario(IxiaBasicScenario):
71 """Ixia scenario for L3 flow between static ip's"""
73 def _add_static_ips(self):
74 vports = self.client.get_vports()
75 uplink_intf_vport = [(self.client.get_static_interface(vport), vport)
76 for vport in vports[::2]]
77 downlink_intf_vport = [(self.client.get_static_interface(vport), vport)
78 for vport in vports[1::2]]
80 for index in range(len(uplink_intf_vport)):
81 intf, vport = uplink_intf_vport[index]
83 iprange = self.ixia_cfg['flow'].get('src_ip')[index]
84 start_ip = utils.get_ip_range_start(iprange)
85 count = utils.get_ip_range_count(iprange)
86 self.client.add_static_ipv4(intf, vport, start_ip, count, '32')
88 raise exceptions.IncorrectFlowOption(
89 option="src_ip", link="uplink_{}".format(index))
91 intf, vport = downlink_intf_vport[index]
93 iprange = self.ixia_cfg['flow'].get('dst_ip')[index]
94 start_ip = utils.get_ip_range_start(iprange)
95 count = utils.get_ip_range_count(iprange)
96 self.client.add_static_ipv4(intf, vport, start_ip, count, '32')
98 raise exceptions.IncorrectFlowOption(
99 option="dst_ip", link="downlink_{}".format(index))
101 def _add_interfaces(self):
102 vports = self.client.get_vports()
103 uplink_vports = (vport for vport in vports[::2])
104 downlink_vports = (vport for vport in vports[1::2])
106 ix_node = next(node for _, node in self.context_cfg['nodes'].items()
107 if node['role'] == 'IxNet')
109 for intf in ix_node['interfaces'].values():
110 ip = intf.get('local_ip')
111 mac = intf.get('local_mac')
114 gateway = next(route.get('gateway')
115 for route in ix_node.get('routing_table')
116 if route.get('if') == intf.get('ifname'))
117 except StopIteration:
118 LOG.debug("Gateway not provided")
120 if 'uplink' in intf.get('vld_id'):
121 self.client.add_interface(next(uplink_vports),
124 self.client.add_interface(next(downlink_vports),
127 def apply_config(self):
128 self._add_interfaces()
129 self._add_static_ips()
131 def create_traffic_model(self, traffic_profile=None):
132 # pylint: disable=unused-argument
133 vports = self.client.get_vports()
134 self._uplink_vports = vports[::2]
135 self._downlink_vports = vports[1::2]
137 uplink_endpoints = [port + '/protocols/static'
138 for port in self._uplink_vports]
139 downlink_endpoints = [port + '/protocols/static'
140 for port in self._downlink_vports]
142 self.client.create_ipv4_traffic_model(uplink_endpoints,
146 class IxiaPppoeClientScenario(object):
147 def __init__(self, client, context_cfg, ixia_cfg):
151 self._uplink_vports = None
152 self._downlink_vports = None
154 self._access_topologies = []
155 self._core_topologies = []
157 self._context_cfg = context_cfg
158 self._ixia_cfg = ixia_cfg
160 self.device_groups = []
162 def apply_config(self):
163 vports = self.client.get_vports()
164 self._uplink_vports = vports[::2]
165 self._downlink_vports = vports[1::2]
166 self._fill_ixia_config()
167 self._apply_access_network_config()
168 self._apply_core_network_config()
170 def create_traffic_model(self, traffic_profile):
171 endpoints_id_pairs = self._get_endpoints_src_dst_id_pairs(
172 traffic_profile.full_profile)
173 endpoints_obj_pairs = \
174 self._get_endpoints_src_dst_obj_pairs(endpoints_id_pairs)
175 uplink_endpoints = endpoints_obj_pairs[::2]
176 downlink_endpoints = endpoints_obj_pairs[1::2]
177 self.client.create_ipv4_traffic_model(uplink_endpoints,
180 def run_protocols(self):
181 LOG.info('PPPoE Scenario - Start Protocols')
182 self.client.start_protocols()
183 utils.wait_until_true(
184 lambda: self.client.is_protocols_running(self.protocols),
185 timeout=WAIT_PROTOCOLS_STARTED, sleep=2)
187 def stop_protocols(self):
188 LOG.info('PPPoE Scenario - Stop Protocols')
189 self.client.stop_protocols()
191 def _get_intf_addr(self, intf):
192 """Retrieve interface IP address and mask
194 :param intf: could be the string which represents IP address
195 with mask (e.g 192.168.10.2/24) or a dictionary with the host
196 name and the port (e.g. {'tg__0': 'xe1'})
197 :return: (tuple) pair of ip address and mask
199 if isinstance(intf, six.string_types):
200 ip, mask = tuple(intf.split('/'))
203 node_name, intf_name = next(iter(intf.items()))
204 node = self._context_cfg["nodes"].get(node_name, {})
205 interface = node.get("interfaces", {})[intf_name]
206 ip = interface["local_ip"]
207 mask = interface["netmask"]
208 ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)),
210 return ip, ipaddr.prefixlen
213 def _get_endpoints_src_dst_id_pairs(flows_params):
214 """Get list of flows src/dst port pairs
216 Create list of flows src/dst port pairs based on traffic profile
217 flows data. Each uplink/downlink pair in traffic profile represents
218 specific flows between the pair of ports.
220 Example ('port' key represents port on which flow will be created):
240 Result list: ['xe0', 'xe1', 'xe2', 'xe3']
242 Result list means that the following flows pairs will be created:
243 - uplink 0: port xe0 <-> port xe1
244 - downlink 0: port xe1 <-> port xe0
245 - uplink 1: port xe2 <-> port xe3
246 - downlink 1: port xe3 <-> port xe2
248 :param flows_params: ordered dict of traffic profile flows params
249 :return: (list) list of flows src/dst ports
251 if len(flows_params) % 2:
252 raise RuntimeError('Number of uplink/downlink pairs'
253 ' in traffic profile is not equal')
255 for flow in flows_params:
256 port = flows_params[flow]['ipv4'].get('port')
259 endpoint_pairs.append(port)
260 return endpoint_pairs
262 def _get_endpoints_src_dst_obj_pairs(self, endpoints_id_pairs):
263 """Create list of uplink/downlink device groups pairs
265 Based on traffic profile options, create list of uplink/downlink
266 device groups pairs between which flow groups will be created:
268 1. In case uplink/downlink flows in traffic profile doesn't have
269 specified 'port' key, flows will be created between each device
270 group on access port and device group on corresponding core port.
272 Device groups created on access port xe0: dg1, dg2, dg3
273 Device groups created on core port xe1: dg4
274 Flows will be created between:
282 2. In case uplink/downlink flows in traffic profile have specified
283 'port' key, flows will be created between device groups on this
285 E.g., for the following traffic profile
294 Flows will be created between:
295 Port xe0 (dg1) -> Port xe1 (dg1)
296 Port xe1 (dg1) -> Port xe0 (dg1)
297 Port xe0 (dg2) -> Port xe3 (dg1)
298 Port xe3 (dg3) -> Port xe0 (dg1)
300 :param endpoints_id_pairs: (list) List of uplink/downlink flows ports
302 :return: (list) list of uplink/downlink device groups descriptors pairs
304 pppoe = self._ixia_cfg['pppoe_client']
305 sessions_per_port = pppoe['sessions_per_port']
306 sessions_per_svlan = pppoe['sessions_per_svlan']
307 svlan_count = int(sessions_per_port / sessions_per_svlan)
309 uplink_ports = [p['tg__0'] for p in self._ixia_cfg['flow']['src_ip']]
310 downlink_ports = [p['tg__0'] for p in self._ixia_cfg['flow']['dst_ip']]
311 uplink_port_topology_map = zip(uplink_ports, self._access_topologies)
312 downlink_port_topology_map = zip(downlink_ports, self._core_topologies)
314 port_to_dev_group_mapping = {}
315 for port, topology in uplink_port_topology_map:
316 topology_dgs = self.client.get_topology_device_groups(topology)
317 port_to_dev_group_mapping[port] = topology_dgs
318 for port, topology in downlink_port_topology_map:
319 topology_dgs = self.client.get_topology_device_groups(topology)
320 port_to_dev_group_mapping[port] = topology_dgs
322 uplink_endpoints = endpoints_id_pairs[::2]
323 downlink_endpoints = endpoints_id_pairs[1::2]
325 uplink_dev_groups = []
326 group_up = [uplink_endpoints[i:i + svlan_count]
327 for i in range(0, len(uplink_endpoints), svlan_count)]
329 for group in group_up:
330 for i, port in enumerate(group):
331 uplink_dev_groups.append(port_to_dev_group_mapping[port][i])
333 downlink_dev_groups = []
334 for port in downlink_endpoints:
335 downlink_dev_groups.append(port_to_dev_group_mapping[port][0])
337 endpoint_obj_pairs = []
338 [endpoint_obj_pairs.extend([up, down])
339 for up, down in zip(uplink_dev_groups, downlink_dev_groups)]
341 if not endpoint_obj_pairs:
342 for up, down in zip(uplink_ports, downlink_ports):
343 uplink_dev_groups = port_to_dev_group_mapping[up]
344 downlink_dev_groups = \
345 port_to_dev_group_mapping[down] * len(uplink_dev_groups)
346 [endpoint_obj_pairs.extend(list(i))
347 for i in zip(uplink_dev_groups, downlink_dev_groups)]
348 return endpoint_obj_pairs
350 def _fill_ixia_config(self):
351 pppoe = self._ixia_cfg["pppoe_client"]
352 ipv4 = self._ixia_cfg["ipv4_client"]
354 _ip = [self._get_intf_addr(intf)[0] for intf in pppoe["ip"]]
355 self._ixia_cfg["pppoe_client"]["ip"] = _ip
357 _ip = [self._get_intf_addr(intf)[0] for intf in ipv4["gateway_ip"]]
358 self._ixia_cfg["ipv4_client"]["gateway_ip"] = _ip
360 addrs = [self._get_intf_addr(intf) for intf in ipv4["ip"]]
361 _ip = [addr[0] for addr in addrs]
362 _prefix = [addr[1] for addr in addrs]
364 self._ixia_cfg["ipv4_client"]["ip"] = _ip
365 self._ixia_cfg["ipv4_client"]["prefix"] = _prefix
367 def _apply_access_network_config(self):
368 pppoe = self._ixia_cfg["pppoe_client"]
369 sessions_per_port = pppoe['sessions_per_port']
370 sessions_per_svlan = pppoe['sessions_per_svlan']
371 svlan_count = int(sessions_per_port / sessions_per_svlan)
373 # add topology per uplink port (access network)
374 for access_tp_id, vport in enumerate(self._uplink_vports):
375 name = 'Topology access {}'.format(access_tp_id)
376 tp = self.client.add_topology(name, vport)
377 self._access_topologies.append(tp)
378 # add device group per svlan
379 for dg_id in range(svlan_count):
380 s_vlan_id = int(pppoe['s_vlan']) + dg_id + access_tp_id * svlan_count
381 s_vlan = ixnet_api.Vlan(vlan_id=s_vlan_id)
382 c_vlan = ixnet_api.Vlan(vlan_id=pppoe['c_vlan'], vlan_id_step=1)
383 name = 'SVLAN {}'.format(s_vlan_id)
384 dg = self.client.add_device_group(tp, name, sessions_per_svlan)
385 self.device_groups.append(dg)
386 # add ethernet layer to device group
387 ethernet = self.client.add_ethernet(dg, 'Ethernet')
388 self.protocols.append(ethernet)
389 self.client.add_vlans(ethernet, [s_vlan, c_vlan])
390 # add ppp over ethernet
391 if 'pap_user' in pppoe:
392 ppp = self.client.add_pppox_client(ethernet, 'pap',
394 pppoe['pap_password'])
396 ppp = self.client.add_pppox_client(ethernet, 'chap',
398 pppoe['chap_password'])
399 self.protocols.append(ppp)
401 def _apply_core_network_config(self):
402 ipv4 = self._ixia_cfg["ipv4_client"]
403 sessions_per_port = ipv4['sessions_per_port']
404 sessions_per_vlan = ipv4['sessions_per_vlan']
405 vlan_count = int(sessions_per_port / sessions_per_vlan)
407 # add topology per downlink port (core network)
408 for core_tp_id, vport in enumerate(self._downlink_vports):
409 name = 'Topology core {}'.format(core_tp_id)
410 tp = self.client.add_topology(name, vport)
411 self._core_topologies.append(tp)
412 # add device group per vlan
413 for dg_id in range(vlan_count):
414 name = 'Core port {}'.format(core_tp_id)
415 dg = self.client.add_device_group(tp, name, sessions_per_vlan)
416 self.device_groups.append(dg)
417 # add ethernet layer to device group
418 ethernet = self.client.add_ethernet(dg, 'Ethernet')
419 self.protocols.append(ethernet)
421 vlan_id = int(ipv4['vlan']) + dg_id + core_tp_id * vlan_count
422 vlan = ixnet_api.Vlan(vlan_id=vlan_id)
423 self.client.add_vlans(ethernet, [vlan])
425 gw_ip = ipv4['gateway_ip'][core_tp_id]
426 # use gw addr to generate ip addr from the same network
427 ip_addr = ipaddress.IPv4Address(gw_ip) + 1
428 ipv4_obj = self.client.add_ipv4(ethernet, name='ipv4',
431 prefix=ipv4['prefix'][core_tp_id],
433 self.protocols.append(ipv4_obj)
435 bgp_peer_obj = self.client.add_bgp(ipv4_obj,
436 dut_ip=ipv4["bgp"]["dut_ip"],
437 local_as=ipv4["bgp"]["as_number"],
438 bgp_type=ipv4["bgp"].get("bgp_type"))
439 self.protocols.append(bgp_peer_obj)
442 class IxiaRfc2544Helper(Rfc2544ResourceHelper):
445 return self.latency and self.iteration.value > 10
448 class IxiaResourceHelper(ClientResourceHelper):
450 LATENCY_TIME_SLEEP = 120
452 def __init__(self, setup_helper, rfc_helper_type=None):
453 super(IxiaResourceHelper, self).__init__(setup_helper)
454 self.scenario_helper = setup_helper.scenario_helper
456 self._ixia_scenarios = {
457 "IxiaBasic": IxiaBasicScenario,
458 "IxiaL3": IxiaL3Scenario,
459 "IxiaPppoeClient": IxiaPppoeClientScenario,
462 self.client = ixnet_api.IxNextgen()
464 if rfc_helper_type is None:
465 rfc_helper_type = IxiaRfc2544Helper
467 self.rfc_helper = rfc_helper_type(self.scenario_helper)
468 self.uplink_ports = None
469 self.downlink_ports = None
470 self.context_cfg = None
471 self._ix_scenario = None
474 def _connect(self, client=None):
475 self.client.connect(self.vnfd_helper)
477 def get_stats(self, *args, **kwargs):
478 return self.client.get_statistics()
481 super(IxiaResourceHelper, self).setup()
482 self._init_ix_scenario()
484 def stop_collect(self):
485 self._ix_scenario.stop_protocols()
486 self._terminated.value = 1
488 def generate_samples(self, ports, duration):
489 stats = self.get_stats()
492 # this is not DPDK port num, but this is whatever number we gave
493 # when we selected ports and programmed the profile
494 for port_num in ports:
496 # reverse lookup port name from port_num so the stats dict is descriptive
497 intf = self.vnfd_helper.find_interface_by_port(port_num)
498 port_name = intf['name']
499 avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num]
500 min_latency = stats['Store-Forward_Min_latency_ns'][port_num]
501 max_latency = stats['Store-Forward_Max_latency_ns'][port_num]
502 samples[port_name] = {
503 'rx_throughput_kps': float(stats['Rx_Rate_Kbps'][port_num]),
504 'tx_throughput_kps': float(stats['Tx_Rate_Kbps'][port_num]),
505 'rx_throughput_mbps': float(stats['Rx_Rate_Mbps'][port_num]),
506 'tx_throughput_mbps': float(stats['Tx_Rate_Mbps'][port_num]),
507 'in_packets': int(stats['Valid_Frames_Rx'][port_num]),
508 'out_packets': int(stats['Frames_Tx'][port_num]),
509 'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration,
510 'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration,
511 'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0),
512 'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0),
513 'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0)
520 def _init_ix_scenario(self):
521 ixia_config = self.scenario_helper.scenario_cfg.get('ixia_config', 'IxiaBasic')
523 if ixia_config in self._ixia_scenarios:
524 scenario_type = self._ixia_scenarios[ixia_config]
526 self._ix_scenario = scenario_type(self.client, self.context_cfg,
527 self.scenario_helper.scenario_cfg['options'])
530 "IXIA config type '{}' not supported".format(ixia_config))
532 def _initialize_client(self, traffic_profile):
533 """Initialize the IXIA IxNetwork client and configure the server"""
534 self.client.clear_config()
535 self.client.assign_ports()
536 self._ix_scenario.apply_config()
537 self._ix_scenario.create_traffic_model(traffic_profile)
539 def run_traffic(self, traffic_profile):
540 if self._terminated.value:
543 min_tol = self.rfc_helper.tolerance_low
544 max_tol = self.rfc_helper.tolerance_high
545 precision = self.rfc_helper.tolerance_precision
546 default = "00:00:00:00:00:00"
549 traffic_profile.update_traffic_profile(self)
550 self._initialize_client(traffic_profile)
553 for port_name in self.vnfd_helper.port_pairs.all_ports:
554 intf = self.vnfd_helper.find_interface(name=port_name)
555 virt_intf = intf["virtual-interface"]
556 # we only know static traffic id by reading the json
557 # this is used by _get_ixia_trafficrofile
558 port_num = self.vnfd_helper.port_num(intf)
559 mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
560 mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)
562 self._ix_scenario.run_protocols()
565 while not self._terminated.value:
566 first_run = traffic_profile.execute_traffic(
567 self, self.client, mac)
568 self.client_started.value = 1
569 # pylint: disable=unnecessary-lambda
570 utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
571 timeout=traffic_profile.config.duration * 2)
572 samples = self.generate_samples(traffic_profile.ports,
573 traffic_profile.config.duration)
575 completed, samples = traffic_profile.get_drop_percentage(
576 samples, min_tol, max_tol, precision, first_run=first_run)
577 self._queue.put(samples)
580 self._terminated.value = 1
582 except Exception: # pylint: disable=broad-except
583 LOG.exception('Run Traffic terminated')
585 self._ix_scenario.stop_protocols()
586 self.client_started.value = 0
587 self._terminated.value = 1
589 def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover
590 LOG.info("Ixia resource_helper run_test")
591 if self._terminated.value:
594 min_tol = self.rfc_helper.tolerance_low
595 max_tol = self.rfc_helper.tolerance_high
596 precision = self.rfc_helper.tolerance_precision
597 default = "00:00:00:00:00:00"
600 traffic_profile.update_traffic_profile(self)
601 self._initialize_client(traffic_profile)
604 for port_name in self.vnfd_helper.port_pairs.all_ports:
605 intf = self.vnfd_helper.find_interface(name=port_name)
606 virt_intf = intf["virtual-interface"]
607 # we only know static traffic id by reading the json
608 # this is used by _get_ixia_trafficrofile
609 port_num = self.vnfd_helper.port_num(intf)
610 mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
611 mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)
613 self._ix_scenario.run_protocols()
617 self.client_started.value = 1
618 while completed is False and not self._terminated.value:
619 LOG.info("Wait for task ...")
622 task = tasks_queue.get(True, 5)
623 except moves.queue.Empty:
626 if task != 'RUN_TRAFFIC':
629 LOG.info("Got %s task", task)
630 first_run = traffic_profile.execute_traffic(
631 self, self.client, mac)
632 # pylint: disable=unnecessary-lambda
633 utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
634 timeout=traffic_profile.config.duration * 2)
635 samples = self.generate_samples(traffic_profile.ports,
636 traffic_profile.config.duration)
638 completed, samples = traffic_profile.get_drop_percentage(
639 samples, min_tol, max_tol, precision, first_run=first_run)
640 self._queue.put(samples)
643 LOG.debug("IxiaResourceHelper::run_test - test completed")
644 results_queue.put('COMPLETE')
646 results_queue.put('CONTINUE')
647 tasks_queue.task_done()
649 except Exception: # pylint: disable=broad-except
650 LOG.exception('Run Traffic terminated')
652 self._ix_scenario.stop_protocols()
653 self.client_started.value = 0
654 LOG.debug("IxiaResourceHelper::run_test done")
656 def collect_kpi(self):
657 self.rfc_helper.iteration.value += 1
658 return super(IxiaResourceHelper, self).collect_kpi()
661 class IxiaTrafficGen(SampleVNFTrafficGen):
665 def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
666 if resource_helper_type is None:
667 resource_helper_type = IxiaResourceHelper
669 super(IxiaTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
670 resource_helper_type)
671 self._ixia_traffic_gen = None
672 self.ixia_file_name = ''
673 self.vnf_port_pairs = []
674 self._traffic_process = None
675 self._tasks_queue = JoinableQueue()
676 self._result_queue = Queue()
678 def _check_status(self):
682 self.resource_helper.stop_collect()
683 super(IxiaTrafficGen, self).terminate()
685 def _test_runner(self, traffic_profile, tasks, results):
686 self.resource_helper.run_test(traffic_profile, tasks, results)
688 def _init_traffic_process(self, traffic_profile):
689 name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
690 traffic_profile.__class__.__name__,
692 self._traffic_process = Process(name=name, target=self._test_runner,
694 traffic_profile, self._tasks_queue,
697 self._traffic_process.start()
698 while self.resource_helper.client_started.value == 0:
700 if not self._traffic_process.is_alive():
703 def run_traffic_once(self, traffic_profile):
704 if self.resource_helper.client_started.value == 0:
705 self._init_traffic_process(traffic_profile)
707 # continue test - run next iteration
708 LOG.info("Run next iteration ...")
709 self._tasks_queue.put('RUN_TRAFFIC')
711 def wait_on_traffic(self):
712 self._tasks_queue.join()
713 result = self._result_queue.get()