Add rate 'resolution' option for IXIA rfc2544 test
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / tg_rfc2544_ixia.py
1 # Copyright (c) 2016-2019 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import ipaddress
16 import logging
17 import six
18 import os
19 import time
20
21 from six import moves
22
23 from multiprocessing import Queue, Process, JoinableQueue
24
25 from yardstick.common import utils
26 from yardstick.common import exceptions
27 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
28 from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
29 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
30 from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
31
32
33 LOG = logging.getLogger(__name__)
34
35 WAIT_AFTER_CFG_LOAD = 10
36 WAIT_FOR_TRAFFIC = 30
37 WAIT_PROTOCOLS_STARTED = 360
38
39
class IxiaBasicScenario(object):
    """Ixia Basic scenario for flow from port to port

    No interfaces or protocols are configured by this scenario; it only
    splits the client's vports into uplink/downlink and asks the client to
    build the traffic model between them.
    """

    def __init__(self, client, context_cfg, ixia_cfg):
        """Keep references to the IxNetwork client and both configurations.

        :param client: IxNetwork client object used to talk to IXIA
        :param context_cfg: context configuration (stored as is)
        :param ixia_cfg: scenario options (stored as is)
        """
        # Filled in by create_traffic_model()
        self._uplink_vports = None
        self._downlink_vports = None

        self.ixia_cfg = ixia_cfg
        self.context_cfg = context_cfg
        self.client = client

    def apply_config(self):
        """Nothing to configure for the basic scenario."""
        return None

    def run_protocols(self):
        """The basic scenario has no protocols to start."""
        return None

    def stop_protocols(self):
        """The basic scenario has no protocols to stop."""
        return None

    def create_traffic_model(self, traffic_profile=None):
        """Create the traffic model between uplink and downlink vports.

        Even-indexed vports are treated as uplink, odd-indexed as downlink.

        :param traffic_profile: accepted for interface compatibility, unused
        """
        # pylint: disable=unused-argument
        all_vports = self.client.get_vports()
        uplink, downlink = all_vports[::2], all_vports[1::2]
        self._uplink_vports = uplink
        self._downlink_vports = downlink
        self.client.create_traffic_model(uplink, downlink)
68
69
class IxiaL3Scenario(IxiaBasicScenario):
    """Ixia scenario for L3 flow between static ip's"""

    def _add_static_ip_range(self, intf, vport, option, link, index):
        """Add the static IPv4 range configured for one end of a link.

        :param intf: static interface object returned by the client
        :param vport: vport the interface belongs to
        :param option: flow option holding the ranges ('src_ip' or 'dst_ip')
        :param link: link name used in the error ('uplink' or 'downlink')
        :param index: position of the range inside the option list
        :raises IncorrectFlowOption: if the option is missing, has no entry
            for ``index``, or an IndexError is raised while the range is
            resolved and applied
        """
        try:
            # A missing option behaves like an empty list so that it is
            # reported as an incorrect flow option, not an opaque TypeError.
            iprange = (self.ixia_cfg['flow'].get(option) or [])[index]
            start_ip = utils.get_ip_range_start(iprange)
            count = utils.get_ip_range_count(iprange)
            self.client.add_static_ipv4(intf, vport, start_ip, count, '32')
        except IndexError:
            raise exceptions.IncorrectFlowOption(
                option=option, link="{}_{}".format(link, index))

    def _add_static_ips(self):
        """Add static IPv4 ranges on every uplink/downlink vport pair.

        Uplink vports (even index) use the 'src_ip' flow option, downlink
        vports (odd index) use 'dst_ip'; the n-th pair uses the n-th entry.
        """
        vports = self.client.get_vports()
        uplink_intf_vport = [(self.client.get_static_interface(vport), vport)
                             for vport in vports[::2]]
        downlink_intf_vport = [(self.client.get_static_interface(vport), vport)
                               for vport in vports[1::2]]

        for index, (intf, vport) in enumerate(uplink_intf_vport):
            self._add_static_ip_range(intf, vport, 'src_ip', 'uplink', index)

            intf, vport = downlink_intf_vport[index]
            self._add_static_ip_range(intf, vport, 'dst_ip', 'downlink', index)

    def _add_interfaces(self):
        """Add one IXIA interface per interface of the 'IxNet' node.

        Interfaces whose 'vld_id' contains 'uplink' consume the next uplink
        vport, all others the next downlink vport. The gateway is taken from
        the node routing table entry matching the interface name, if any.
        """
        vports = self.client.get_vports()
        uplink_vports = iter(vports[::2])
        downlink_vports = iter(vports[1::2])

        ix_node = next(node for node in self.context_cfg['nodes'].values()
                       if node['role'] == 'IxNet')
        # The routing table is optional; without it no gateway is configured
        routes = ix_node.get('routing_table') or []

        for intf in ix_node['interfaces'].values():
            ip = intf.get('local_ip')
            mac = intf.get('local_mac')
            gateway = None
            try:
                gateway = next(route.get('gateway') for route in routes
                               if route.get('if') == intf.get('ifname'))
            except StopIteration:
                LOG.debug("Gateway not provided")

            if 'uplink' in intf.get('vld_id'):
                self.client.add_interface(next(uplink_vports),
                                          ip, mac, gateway)
            else:
                self.client.add_interface(next(downlink_vports),
                                          ip, mac, gateway)

    def apply_config(self):
        """Configure interfaces first, then the static IP ranges."""
        self._add_interfaces()
        self._add_static_ips()

    def create_traffic_model(self, traffic_profile=None):
        """Create the IPv4 traffic model between static protocol endpoints.

        :param traffic_profile: accepted for interface compatibility, unused
        """
        # pylint: disable=unused-argument
        vports = self.client.get_vports()
        self._uplink_vports = vports[::2]
        self._downlink_vports = vports[1::2]

        uplink_endpoints = [port + '/protocols/static'
                            for port in self._uplink_vports]
        downlink_endpoints = [port + '/protocols/static'
                              for port in self._downlink_vports]

        self.client.create_ipv4_traffic_model(uplink_endpoints,
                                              downlink_endpoints)
144
145
146 class IxiaPppoeClientScenario(object):
147     def __init__(self, client, context_cfg, ixia_cfg):
148
149         self.client = client
150
151         self._uplink_vports = None
152         self._downlink_vports = None
153
154         self._access_topologies = []
155         self._core_topologies = []
156
157         self._context_cfg = context_cfg
158         self._ixia_cfg = ixia_cfg
159         self.protocols = []
160         self.device_groups = []
161
162     def apply_config(self):
163         vports = self.client.get_vports()
164         self._uplink_vports = vports[::2]
165         self._downlink_vports = vports[1::2]
166         self._fill_ixia_config()
167         self._apply_access_network_config()
168         self._apply_core_network_config()
169
170     def create_traffic_model(self, traffic_profile):
171         endpoints_id_pairs = self._get_endpoints_src_dst_id_pairs(
172             traffic_profile.full_profile)
173         endpoints_obj_pairs = \
174             self._get_endpoints_src_dst_obj_pairs(endpoints_id_pairs)
175         uplink_endpoints = endpoints_obj_pairs[::2]
176         downlink_endpoints = endpoints_obj_pairs[1::2]
177         self.client.create_ipv4_traffic_model(uplink_endpoints,
178                                               downlink_endpoints)
179
180     def run_protocols(self):
181         LOG.info('PPPoE Scenario - Start Protocols')
182         self.client.start_protocols()
183         utils.wait_until_true(
184             lambda: self.client.is_protocols_running(self.protocols),
185             timeout=WAIT_PROTOCOLS_STARTED, sleep=2)
186
187     def stop_protocols(self):
188         LOG.info('PPPoE Scenario - Stop Protocols')
189         self.client.stop_protocols()
190
191     def _get_intf_addr(self, intf):
192         """Retrieve interface IP address and mask
193
194         :param intf: could be the string which represents IP address
195         with mask (e.g 192.168.10.2/24) or a dictionary with the host
196         name and the port (e.g. {'tg__0': 'xe1'})
197         :return: (tuple) pair of ip address and mask
198         """
199         if isinstance(intf, six.string_types):
200             ip, mask = tuple(intf.split('/'))
201             return ip, int(mask)
202
203         node_name, intf_name = next(iter(intf.items()))
204         node = self._context_cfg["nodes"].get(node_name, {})
205         interface = node.get("interfaces", {})[intf_name]
206         ip = interface["local_ip"]
207         mask = interface["netmask"]
208         ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)),
209                                       strict=False)
210         return ip, ipaddr.prefixlen
211
212     @staticmethod
213     def _get_endpoints_src_dst_id_pairs(flows_params):
214         """Get list of flows src/dst port pairs
215
216         Create list of flows src/dst port pairs based on traffic profile
217         flows data. Each uplink/downlink pair in traffic profile represents
218         specific flows between the pair of ports.
219
220         Example ('port' key represents port on which flow will be created):
221
222         Input flows data:
223         uplink_0:
224           ipv4:
225             id: 1
226             port: xe0
227         downlink_0:
228           ipv4:
229             id: 2
230             port: xe1
231         uplink_1:
232           ipv4:
233             id: 3
234             port: xe2
235         downlink_1:
236           ipv4:
237             id: 4
238             port: xe3
239
240         Result list: ['xe0', 'xe1', 'xe2', 'xe3']
241
242         Result list means that the following flows pairs will be created:
243         - uplink 0: port xe0 <-> port xe1
244         - downlink 0: port xe1 <-> port xe0
245         - uplink 1: port xe2 <-> port xe3
246         - downlink 1: port xe3 <-> port xe2
247
248         :param flows_params: ordered dict of traffic profile flows params
249         :return: (list) list of flows src/dst ports
250         """
251         if len(flows_params) % 2:
252             raise RuntimeError('Number of uplink/downlink pairs'
253                                ' in traffic profile is not equal')
254         endpoint_pairs = []
255         for flow in flows_params:
256             port = flows_params[flow]['ipv4'].get('port')
257             if port is None:
258                 continue
259             endpoint_pairs.append(port)
260         return endpoint_pairs
261
262     def _get_endpoints_src_dst_obj_pairs(self, endpoints_id_pairs):
263         """Create list of uplink/downlink device groups pairs
264
265         Based on traffic profile options, create list of uplink/downlink
266         device groups pairs between which flow groups will be created:
267
268         1. In case uplink/downlink flows in traffic profile doesn't have
269            specified 'port' key, flows will be created between each device
270            group on access port and device group on corresponding core port.
271            E.g.:
272            Device groups created on access port xe0: dg1, dg2, dg3
273            Device groups created on core port xe1: dg4
274            Flows will be created between:
275            dg1 -> dg4
276            dg4 -> dg1
277            dg2 -> dg4
278            dg4 -> dg2
279            dg3 -> dg4
280            dg4 -> dg3
281
282         2. In case uplink/downlink flows in traffic profile have specified
283            'port' key, flows will be created between device groups on this
284            port.
285            E.g., for the following traffic profile
286            uplink_0:
287              port: xe0
288            downlink_0:
289              port: xe1
290            uplink_1:
291              port: xe0
292            downlink_0:
293              port: xe3
294            Flows will be created between:
295            Port xe0 (dg1) -> Port xe1 (dg1)
296            Port xe1 (dg1) -> Port xe0 (dg1)
297            Port xe0 (dg2) -> Port xe3 (dg1)
298            Port xe3 (dg3) -> Port xe0 (dg1)
299
300         :param endpoints_id_pairs: (list) List of uplink/downlink flows ports
301          pairs
302         :return: (list) list of uplink/downlink device groups descriptors pairs
303         """
304         pppoe = self._ixia_cfg['pppoe_client']
305         sessions_per_port = pppoe['sessions_per_port']
306         sessions_per_svlan = pppoe['sessions_per_svlan']
307         svlan_count = int(sessions_per_port / sessions_per_svlan)
308
309         uplink_ports = [p['tg__0'] for p in self._ixia_cfg['flow']['src_ip']]
310         downlink_ports = [p['tg__0'] for p in self._ixia_cfg['flow']['dst_ip']]
311         uplink_port_topology_map = zip(uplink_ports, self._access_topologies)
312         downlink_port_topology_map = zip(downlink_ports, self._core_topologies)
313
314         port_to_dev_group_mapping = {}
315         for port, topology in uplink_port_topology_map:
316             topology_dgs = self.client.get_topology_device_groups(topology)
317             port_to_dev_group_mapping[port] = topology_dgs
318         for port, topology in downlink_port_topology_map:
319             topology_dgs = self.client.get_topology_device_groups(topology)
320             port_to_dev_group_mapping[port] = topology_dgs
321
322         uplink_endpoints = endpoints_id_pairs[::2]
323         downlink_endpoints = endpoints_id_pairs[1::2]
324
325         uplink_dev_groups = []
326         group_up = [uplink_endpoints[i:i + svlan_count]
327                     for i in range(0, len(uplink_endpoints), svlan_count)]
328
329         for group in group_up:
330             for i, port in enumerate(group):
331                 uplink_dev_groups.append(port_to_dev_group_mapping[port][i])
332
333         downlink_dev_groups = []
334         for port in downlink_endpoints:
335             downlink_dev_groups.append(port_to_dev_group_mapping[port][0])
336
337         endpoint_obj_pairs = []
338         [endpoint_obj_pairs.extend([up, down])
339          for up, down in zip(uplink_dev_groups, downlink_dev_groups)]
340
341         if not endpoint_obj_pairs:
342             for up, down in zip(uplink_ports, downlink_ports):
343                 uplink_dev_groups = port_to_dev_group_mapping[up]
344                 downlink_dev_groups = \
345                     port_to_dev_group_mapping[down] * len(uplink_dev_groups)
346                 [endpoint_obj_pairs.extend(list(i))
347                  for i in zip(uplink_dev_groups, downlink_dev_groups)]
348         return endpoint_obj_pairs
349
350     def _fill_ixia_config(self):
351         pppoe = self._ixia_cfg["pppoe_client"]
352         ipv4 = self._ixia_cfg["ipv4_client"]
353
354         _ip = [self._get_intf_addr(intf)[0] for intf in pppoe["ip"]]
355         self._ixia_cfg["pppoe_client"]["ip"] = _ip
356
357         _ip = [self._get_intf_addr(intf)[0] for intf in ipv4["gateway_ip"]]
358         self._ixia_cfg["ipv4_client"]["gateway_ip"] = _ip
359
360         addrs = [self._get_intf_addr(intf) for intf in ipv4["ip"]]
361         _ip = [addr[0] for addr in addrs]
362         _prefix = [addr[1] for addr in addrs]
363
364         self._ixia_cfg["ipv4_client"]["ip"] = _ip
365         self._ixia_cfg["ipv4_client"]["prefix"] = _prefix
366
367     def _apply_access_network_config(self):
368         pppoe = self._ixia_cfg["pppoe_client"]
369         sessions_per_port = pppoe['sessions_per_port']
370         sessions_per_svlan = pppoe['sessions_per_svlan']
371         svlan_count = int(sessions_per_port / sessions_per_svlan)
372
373         # add topology per uplink port (access network)
374         for access_tp_id, vport in enumerate(self._uplink_vports):
375             name = 'Topology access {}'.format(access_tp_id)
376             tp = self.client.add_topology(name, vport)
377             self._access_topologies.append(tp)
378             # add device group per svlan
379             for dg_id in range(svlan_count):
380                 s_vlan_id = int(pppoe['s_vlan']) + dg_id + access_tp_id * svlan_count
381                 s_vlan = ixnet_api.Vlan(vlan_id=s_vlan_id)
382                 c_vlan = ixnet_api.Vlan(vlan_id=pppoe['c_vlan'], vlan_id_step=1)
383                 name = 'SVLAN {}'.format(s_vlan_id)
384                 dg = self.client.add_device_group(tp, name, sessions_per_svlan)
385                 self.device_groups.append(dg)
386                 # add ethernet layer to device group
387                 ethernet = self.client.add_ethernet(dg, 'Ethernet')
388                 self.protocols.append(ethernet)
389                 self.client.add_vlans(ethernet, [s_vlan, c_vlan])
390                 # add ppp over ethernet
391                 if 'pap_user' in pppoe:
392                     ppp = self.client.add_pppox_client(ethernet, 'pap',
393                                                        pppoe['pap_user'],
394                                                        pppoe['pap_password'])
395                 else:
396                     ppp = self.client.add_pppox_client(ethernet, 'chap',
397                                                        pppoe['chap_user'],
398                                                        pppoe['chap_password'])
399                 self.protocols.append(ppp)
400
401     def _apply_core_network_config(self):
402         ipv4 = self._ixia_cfg["ipv4_client"]
403         sessions_per_port = ipv4['sessions_per_port']
404         sessions_per_vlan = ipv4['sessions_per_vlan']
405         vlan_count = int(sessions_per_port / sessions_per_vlan)
406
407         # add topology per downlink port (core network)
408         for core_tp_id, vport in enumerate(self._downlink_vports):
409             name = 'Topology core {}'.format(core_tp_id)
410             tp = self.client.add_topology(name, vport)
411             self._core_topologies.append(tp)
412             # add device group per vlan
413             for dg_id in range(vlan_count):
414                 name = 'Core port {}'.format(core_tp_id)
415                 dg = self.client.add_device_group(tp, name, sessions_per_vlan)
416                 self.device_groups.append(dg)
417                 # add ethernet layer to device group
418                 ethernet = self.client.add_ethernet(dg, 'Ethernet')
419                 self.protocols.append(ethernet)
420                 if 'vlan' in ipv4:
421                     vlan_id = int(ipv4['vlan']) + dg_id + core_tp_id * vlan_count
422                     vlan = ixnet_api.Vlan(vlan_id=vlan_id)
423                     self.client.add_vlans(ethernet, [vlan])
424                 # add ipv4 layer
425                 gw_ip = ipv4['gateway_ip'][core_tp_id]
426                 # use gw addr to generate ip addr from the same network
427                 ip_addr = ipaddress.IPv4Address(gw_ip) + 1
428                 ipv4_obj = self.client.add_ipv4(ethernet, name='ipv4',
429                                                 addr=ip_addr,
430                                                 addr_step='0.0.0.1',
431                                                 prefix=ipv4['prefix'][core_tp_id],
432                                                 gateway=gw_ip)
433                 self.protocols.append(ipv4_obj)
434                 if ipv4.get("bgp"):
435                     bgp_peer_obj = self.client.add_bgp(ipv4_obj,
436                                                        dut_ip=ipv4["bgp"]["dut_ip"],
437                                                        local_as=ipv4["bgp"]["as_number"],
438                                                        bgp_type=ipv4["bgp"].get("bgp_type"))
439                     self.protocols.append(bgp_peer_obj)
440
441
class IxiaRfc2544Helper(Rfc2544ResourceHelper):
    """RFC2544 helper with an IXIA specific completion check."""

    def is_done(self):
        """Tell whether the test counts as done.

        :return: the (falsy) ``latency`` value when latency is not set,
            otherwise whether more than 10 iterations have been run
        """
        latency = self.latency
        if not latency:
            return latency
        return self.iteration.value > 10
446
447
448 class IxiaResourceHelper(ClientResourceHelper):
449
450     LATENCY_TIME_SLEEP = 120
451
452     def __init__(self, setup_helper, rfc_helper_type=None):
453         super(IxiaResourceHelper, self).__init__(setup_helper)
454         self.scenario_helper = setup_helper.scenario_helper
455
456         self._ixia_scenarios = {
457             "IxiaBasic": IxiaBasicScenario,
458             "IxiaL3": IxiaL3Scenario,
459             "IxiaPppoeClient": IxiaPppoeClientScenario,
460         }
461
462         self.client = ixnet_api.IxNextgen()
463
464         if rfc_helper_type is None:
465             rfc_helper_type = IxiaRfc2544Helper
466
467         self.rfc_helper = rfc_helper_type(self.scenario_helper)
468         self.uplink_ports = None
469         self.downlink_ports = None
470         self.context_cfg = None
471         self._ix_scenario = None
472         self._connect()
473
474     def _connect(self, client=None):
475         self.client.connect(self.vnfd_helper)
476
477     def get_stats(self, *args, **kwargs):
478         return self.client.get_statistics()
479
480     def setup(self):
481         super(IxiaResourceHelper, self).setup()
482         self._init_ix_scenario()
483
484     def stop_collect(self):
485         self._ix_scenario.stop_protocols()
486         self._terminated.value = 1
487
488     def generate_samples(self, ports, duration):
489         stats = self.get_stats()
490
491         samples = {}
492         # this is not DPDK port num, but this is whatever number we gave
493         # when we selected ports and programmed the profile
494         for port_num in ports:
495             try:
496                 # reverse lookup port name from port_num so the stats dict is descriptive
497                 intf = self.vnfd_helper.find_interface_by_port(port_num)
498                 port_name = intf['name']
499                 avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num]
500                 min_latency = stats['Store-Forward_Min_latency_ns'][port_num]
501                 max_latency = stats['Store-Forward_Max_latency_ns'][port_num]
502                 samples[port_name] = {
503                     'rx_throughput_kps': float(stats['Rx_Rate_Kbps'][port_num]),
504                     'tx_throughput_kps': float(stats['Tx_Rate_Kbps'][port_num]),
505                     'rx_throughput_mbps': float(stats['Rx_Rate_Mbps'][port_num]),
506                     'tx_throughput_mbps': float(stats['Tx_Rate_Mbps'][port_num]),
507                     'in_packets': int(stats['Valid_Frames_Rx'][port_num]),
508                     'out_packets': int(stats['Frames_Tx'][port_num]),
509                     'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration,
510                     'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration,
511                     'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0),
512                     'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0),
513                     'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0)
514                 }
515             except IndexError:
516                 pass
517
518         return samples
519
520     def _init_ix_scenario(self):
521         ixia_config = self.scenario_helper.scenario_cfg.get('ixia_config', 'IxiaBasic')
522
523         if ixia_config in self._ixia_scenarios:
524             scenario_type = self._ixia_scenarios[ixia_config]
525
526             self._ix_scenario = scenario_type(self.client, self.context_cfg,
527                                               self.scenario_helper.scenario_cfg['options'])
528         else:
529             raise RuntimeError(
530                 "IXIA config type '{}' not supported".format(ixia_config))
531
532     def _initialize_client(self, traffic_profile):
533         """Initialize the IXIA IxNetwork client and configure the server"""
534         self.client.clear_config()
535         self.client.assign_ports()
536         self._ix_scenario.apply_config()
537         self._ix_scenario.create_traffic_model(traffic_profile)
538
    def run_traffic(self, traffic_profile):
        """Run traffic iterations until the profile reports completion.

        Returns immediately when the helper is already terminated. Otherwise
        the client is configured, the scenario protocols are started and
        traffic is executed in a loop. After each iteration the samples
        returned by the profile's drop percentage evaluation are put on the
        helper queue. The loop ends when the profile reports completion or
        the terminated flag gets set.

        Any exception raised inside the loop is logged and ends the run; the
        protocols are stopped and the flags are reset afterwards in both
        cases.

        :param traffic_profile: traffic profile to execute
        """
        if self._terminated.value:
            return

        # RFC2544 search parameters, passed through to get_drop_percentage()
        min_tol = self.rfc_helper.tolerance_low
        max_tol = self.rfc_helper.tolerance_high
        precision = self.rfc_helper.tolerance_precision
        resolution = self.rfc_helper.resolution
        # MAC used when an interface does not define local_mac/dst_mac
        default = "00:00:00:00:00:00"

        self._build_ports()
        traffic_profile.update_traffic_profile(self)
        self._initialize_client(traffic_profile)

        # "src_mac_<port_num>"/"dst_mac_<port_num>" -> MAC address, one pair
        # per port
        mac = {}
        for port_name in self.vnfd_helper.port_pairs.all_ports:
            intf = self.vnfd_helper.find_interface(name=port_name)
            virt_intf = intf["virtual-interface"]
            # we only know static traffic id by reading the json
            # this is used by _get_ixia_trafficrofile (sic; presumably the
            # traffic profile's IXIA profile builder - TODO confirm the name)
            port_num = self.vnfd_helper.port_num(intf)
            mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
            mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)

        self._ix_scenario.run_protocols()

        try:
            while not self._terminated.value:
                first_run = traffic_profile.execute_traffic(self, self.client,
                                                            mac)
                self.client_started.value = 1
                # Wait for the traffic to stop, for at most twice the
                # configured duration
                # pylint: disable=unnecessary-lambda
                utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
                                      timeout=traffic_profile.config.duration * 2)
                samples = self.generate_samples(traffic_profile.ports,
                                                traffic_profile.config.duration)

                completed, samples = traffic_profile.get_drop_percentage(
                    samples, min_tol, max_tol, precision, resolution,
                    first_run=first_run)
                self._queue.put(samples)

                if completed:
                    self._terminated.value = 1

        except Exception:  # pylint: disable=broad-except
            # Logged only: the cleanup below must run whatever went wrong
            LOG.exception('Run Traffic terminated')

        self._ix_scenario.stop_protocols()
        self.client_started.value = 0
        self._terminated.value = 1
590
    def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover
        """Run one traffic iteration per 'RUN_TRAFFIC' task from the queue.

        Returns immediately when the helper is already terminated. Otherwise
        the client is configured, the scenario protocols are started and the
        method waits for tasks. For every 'RUN_TRAFFIC' task one iteration
        is executed, its samples are put on the helper queue and 'COMPLETE'
        or 'CONTINUE' is put on ``results_queue``. The loop ends after a
        completed iteration or when the terminated flag gets set.

        Unlike run_traffic(), the terminated flag is not set on exit.

        :param traffic_profile: traffic profile to execute
        :param tasks_queue: joinable queue delivering the tasks
        :param results_queue: queue receiving 'COMPLETE' or 'CONTINUE'
        :param args: unused
        """
        LOG.info("Ixia resource_helper run_test")
        if self._terminated.value:
            return

        # RFC2544 search parameters, passed through to get_drop_percentage()
        min_tol = self.rfc_helper.tolerance_low
        max_tol = self.rfc_helper.tolerance_high
        precision = self.rfc_helper.tolerance_precision
        resolution = self.rfc_helper.resolution
        # MAC used when an interface does not define local_mac/dst_mac
        default = "00:00:00:00:00:00"

        self._build_ports()
        traffic_profile.update_traffic_profile(self)
        self._initialize_client(traffic_profile)

        # "src_mac_<port_num>"/"dst_mac_<port_num>" -> MAC address, one pair
        # per port
        mac = {}
        for port_name in self.vnfd_helper.port_pairs.all_ports:
            intf = self.vnfd_helper.find_interface(name=port_name)
            virt_intf = intf["virtual-interface"]
            # we only know static traffic id by reading the json
            # this is used by _get_ixia_trafficrofile (sic; presumably the
            # traffic profile's IXIA profile builder - TODO confirm the name)
            port_num = self.vnfd_helper.port_num(intf)
            mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
            mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)

        self._ix_scenario.run_protocols()

        try:
            completed = False
            self.rfc_helper.iteration.value = 0
            # Set before the first task is handled, so a caller polling this
            # flag can proceed as soon as the setup above is done
            self.client_started.value = 1
            while completed is False and not self._terminated.value:
                LOG.info("Wait for task ...")

                try:
                    # Block for at most 5 seconds so the terminated flag is
                    # re-checked regularly
                    task = tasks_queue.get(True, 5)
                except moves.queue.Empty:
                    continue
                else:
                    # NOTE(review): an ignored task is never acknowledged
                    # with task_done(); a join() on the queue would keep
                    # waiting for it - confirm no other task type is sent
                    if task != 'RUN_TRAFFIC':
                        continue

                self.rfc_helper.iteration.value += 1
                LOG.info("Got %s task, start iteration %d", task,
                         self.rfc_helper.iteration.value)
                first_run = traffic_profile.execute_traffic(self, self.client,
                                                            mac)
                # Wait for the traffic to stop, for at most twice the
                # configured duration
                # pylint: disable=unnecessary-lambda
                utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
                                      timeout=traffic_profile.config.duration * 2)
                samples = self.generate_samples(traffic_profile.ports,
                                                traffic_profile.config.duration)

                completed, samples = traffic_profile.get_drop_percentage(
                    samples, min_tol, max_tol, precision, resolution,
                    first_run=first_run)
                self._queue.put(samples)

                if completed:
                    LOG.debug("IxiaResourceHelper::run_test - test completed")
                    results_queue.put('COMPLETE')
                else:
                    results_queue.put('CONTINUE')
                tasks_queue.task_done()

        except Exception:  # pylint: disable=broad-except
            # NOTE(review): when an iteration raises, neither task_done() nor
            # a result is issued for the fetched task, so a caller blocked in
            # tasks_queue.join() / results_queue.get() is not released
            LOG.exception('Run Traffic terminated')

        self._ix_scenario.stop_protocols()
        self.client_started.value = 0
        LOG.debug("IxiaResourceHelper::run_test done")
662
663
class IxiaTrafficGen(SampleVNFTrafficGen):
    """IXIA traffic generator running its iterations in a child process.

    Iterations are requested through a joinable task queue and their
    outcome is read back from a result queue.
    """

    APP_NAME = 'Ixia'

    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
        """Initialize the generator; IxiaResourceHelper is the default helper.

        :param name: name of the traffic generator
        :param vnfd: VNF descriptor passed to the parent class
        :param setup_env_helper_type: setup helper class passed to the parent
        :param resource_helper_type: resource helper class; IxiaResourceHelper
            is used when not given
        """
        helper_type = resource_helper_type
        if helper_type is None:
            helper_type = IxiaResourceHelper

        super(IxiaTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
                                             helper_type)
        self._ixia_traffic_gen = None
        self.ixia_file_name = ''
        self.vnf_port_pairs = []
        # Child process running the iterations, created on first use
        self._traffic_process = None
        self._tasks_queue = JoinableQueue()
        self._result_queue = Queue()

    def _check_status(self):
        """No status check is done for IXIA."""
        return None

    def terminate(self):
        """Stop the resource helper collection, then terminate the parent."""
        helper = self.resource_helper
        helper.stop_collect()
        super(IxiaTrafficGen, self).terminate()

    def _test_runner(self, traffic_profile, tasks, results):
        """Entry point of the traffic process: delegate to the helper.

        :param traffic_profile: traffic profile to execute
        :param tasks: queue delivering the tasks
        :param results: queue receiving the iteration results
        """
        self.resource_helper.run_test(traffic_profile, tasks, results)

    def _init_traffic_process(self, traffic_profile):
        """Start the traffic process and wait until its client has started.

        The wait also ends when the process is found dead.

        :param traffic_profile: traffic profile handed to the process
        """
        process_name = '{}-{}-{}-{}'.format(
            self.name, self.APP_NAME, traffic_profile.__class__.__name__,
            os.getpid())
        runner_args = (traffic_profile, self._tasks_queue, self._result_queue)
        self._traffic_process = Process(name=process_name,
                                        target=self._test_runner,
                                        args=runner_args)

        self._traffic_process.start()
        while True:
            if self.resource_helper.client_started.value != 0:
                break
            time.sleep(1)
            if not self._traffic_process.is_alive():
                break

    def run_traffic_once(self, traffic_profile):
        """Request one traffic iteration, starting the process if needed.

        :param traffic_profile: traffic profile to execute
        """
        client_started = self.resource_helper.client_started.value
        if client_started == 0:
            self._init_traffic_process(traffic_profile)

        # continue test - run next iteration
        LOG.info("Run next iteration ...")
        self._tasks_queue.put('RUN_TRAFFIC')

    def wait_on_traffic(self):
        """Wait until all queued tasks are done and return the next result.

        :return: the item read from the result queue
        """
        self._tasks_queue.join()
        return self._result_queue.get()