1 # Copyright (c) 2016-2019 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
24 from multiprocessing import Queue, Process, JoinableQueue
26 from yardstick.common import utils
27 from yardstick.common import exceptions
28 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
29 from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
30 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
31 from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
34 LOG = logging.getLogger(__name__)
36 WAIT_AFTER_CFG_LOAD = 10
38 WAIT_PROTOCOLS_STARTED = 420
41 class IxiaBasicScenario(object):
42 """Ixia Basic scenario for flow from port to port"""
def __init__(self, client, context_cfg, ixia_cfg):
    """Store the IxNetwork client and the scenario configuration.

    :param client: IxNetwork API client used by every scenario method
    :param context_cfg: context configuration
    :param ixia_cfg: IXIA scenario options
    """
    # The client has to be bound here: create_traffic_model() and the
    # statistics helper dereference self.client.
    self.client = client
    self.context_cfg = context_cfg
    self.ixia_cfg = ixia_cfg

    # Filled in by create_traffic_model() from the client's vports.
    self._uplink_vports = None
    self._downlink_vports = None
def apply_config(self):
    """No-op hook; IxiaL3Scenario overrides it to push configuration."""
    return None
def run_protocols(self):
    """No-op hook: this scenario does nothing when protocols are started."""
    return None
def stop_protocols(self):
    """No-op hook: this scenario does nothing when protocols are stopped."""
    return None
def create_traffic_model(self, traffic_profile=None):
    """Create the traffic model between uplink and downlink vports.

    The vports reported by the client are split by position: even
    indexes become uplink vports, odd indexes become downlink vports.

    :param traffic_profile: unused by this scenario
    """
    # pylint: disable=unused-argument
    all_vports = self.client.get_vports()
    uplink, downlink = all_vports[::2], all_vports[1::2]
    self._uplink_vports = uplink
    self._downlink_vports = downlink
    self.client.create_traffic_model(uplink, downlink)
def _get_stats(self):
    """Return the statistics reported by the IxNetwork client."""
    return self.client.get_statistics()
73 def generate_samples(self, resource_helper, ports, duration):
74 stats = self._get_stats()
77 # this is not DPDK port num, but this is whatever number we gave
78 # when we selected ports and programmed the profile
79 for port_num in ports:
81 # reverse lookup port name from port_num so the stats dict is descriptive
82 intf = resource_helper.vnfd_helper.find_interface_by_port(port_num)
83 port_name = intf['name']
84 avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num]
85 min_latency = stats['Store-Forward_Min_latency_ns'][port_num]
86 max_latency = stats['Store-Forward_Max_latency_ns'][port_num]
87 samples[port_name] = {
88 'rx_throughput_kps': float(stats['Rx_Rate_Kbps'][port_num]),
89 'tx_throughput_kps': float(stats['Tx_Rate_Kbps'][port_num]),
90 'rx_throughput_mbps': float(stats['Rx_Rate_Mbps'][port_num]),
91 'tx_throughput_mbps': float(stats['Tx_Rate_Mbps'][port_num]),
92 'in_packets': int(stats['Valid_Frames_Rx'][port_num]),
93 'out_packets': int(stats['Frames_Tx'][port_num]),
94 'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration,
95 'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration,
96 'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0),
97 'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0),
98 'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0)
def update_tracking_options(self):
    """No-op hook: this scenario sets no flow tracking options."""
    return None
def get_tc_rfc2544_options(self):
    """Return the test-case RFC2544 options; this scenario has none.

    :return: None
    """
    return None
112 class IxiaL3Scenario(IxiaBasicScenario):
113 """Ixia scenario for L3 flow between static ip's"""
115 def _add_static_ips(self):
116 vports = self.client.get_vports()
117 uplink_intf_vport = [(self.client.get_static_interface(vport), vport)
118 for vport in vports[::2]]
119 downlink_intf_vport = [(self.client.get_static_interface(vport), vport)
120 for vport in vports[1::2]]
122 for index in range(len(uplink_intf_vport)):
123 intf, vport = uplink_intf_vport[index]
125 iprange = self.ixia_cfg['flow'].get('src_ip')[index]
126 start_ip = utils.get_ip_range_start(iprange)
127 count = utils.get_ip_range_count(iprange)
128 self.client.add_static_ipv4(intf, vport, start_ip, count, '32')
130 raise exceptions.IncorrectFlowOption(
131 option="src_ip", link="uplink_{}".format(index))
133 intf, vport = downlink_intf_vport[index]
135 iprange = self.ixia_cfg['flow'].get('dst_ip')[index]
136 start_ip = utils.get_ip_range_start(iprange)
137 count = utils.get_ip_range_count(iprange)
138 self.client.add_static_ipv4(intf, vport, start_ip, count, '32')
140 raise exceptions.IncorrectFlowOption(
141 option="dst_ip", link="downlink_{}".format(index))
143 def _add_interfaces(self):
144 vports = self.client.get_vports()
145 uplink_vports = (vport for vport in vports[::2])
146 downlink_vports = (vport for vport in vports[1::2])
148 ix_node = next(node for _, node in self.context_cfg['nodes'].items()
149 if node['role'] == 'IxNet')
151 for intf in ix_node['interfaces'].values():
152 ip = intf.get('local_ip')
153 mac = intf.get('local_mac')
156 gateway = next(route.get('gateway')
157 for route in ix_node.get('routing_table')
158 if route.get('if') == intf.get('ifname'))
159 except StopIteration:
160 LOG.debug("Gateway not provided")
162 if 'uplink' in intf.get('vld_id'):
163 self.client.add_interface(next(uplink_vports),
166 self.client.add_interface(next(downlink_vports),
def apply_config(self):
    """Configure the L3 scenario: add interfaces first, then static IPs."""
    for configure_step in (self._add_interfaces, self._add_static_ips):
        configure_step()
def create_traffic_model(self, traffic_profile=None):
    """Create the IPv4 traffic model between static endpoints of the vports.

    Even-indexed vports are used as uplink, odd-indexed as downlink. The
    endpoint of each vport is its '/protocols/static' child.

    :param traffic_profile: unused by this scenario
    """
    # pylint: disable=unused-argument
    vports = self.client.get_vports()
    self._uplink_vports = vports[::2]
    self._downlink_vports = vports[1::2]

    endpoint_suffix = '/protocols/static'
    uplink_endpoints = [port + endpoint_suffix
                        for port in self._uplink_vports]
    downlink_endpoints = [port + endpoint_suffix
                          for port in self._downlink_vports]

    # Both directions must be handed over; the model is built between
    # the uplink and the downlink endpoint lists.
    self.client.create_ipv4_traffic_model(uplink_endpoints,
                                          downlink_endpoints)
188 class IxiaPppoeClientScenario(object):
def __init__(self, client, context_cfg, ixia_cfg):
    """Store the IxNetwork client, the configuration and empty state.

    :param client: IxNetwork API client used by every scenario method
    :param context_cfg: context configuration
    :param ixia_cfg: IXIA scenario options
    """
    # apply_config(), run_protocols() and the statistics helpers all
    # go through self.client, so it must be bound here.
    self.client = client

    # Set by apply_config() from the client's vports.
    self._uplink_vports = None
    self._downlink_vports = None

    # Topologies created per uplink (access) / downlink (core) vport.
    self._access_topologies = []
    self._core_topologies = []

    self._context_cfg = context_cfg
    self._ixia_cfg = ixia_cfg

    # Protocol objects are appended while the network config is applied
    # and later passed to the client by run_protocols().
    self.protocols = []
    self.device_groups = []
def apply_config(self):
    """Split the vports and apply the access and core network config.

    Even-indexed vports become uplink vports, odd-indexed ones downlink
    vports. The IXIA config is filled in before the access and then the
    core network configuration is applied.
    """
    all_vports = self.client.get_vports()
    self._uplink_vports, self._downlink_vports = (all_vports[::2],
                                                  all_vports[1::2])
    self._fill_ixia_config()
    self._apply_access_network_config()
    self._apply_core_network_config()
def create_traffic_model(self, traffic_profile):
    """Create the IPv4 traffic model for the PPPoE scenario.

    When the traffic profile flows yield device group pairs, traffic is
    created between those pairs (even positions are uplink endpoints,
    odd positions downlink endpoints). Otherwise traffic is created
    between the access and the core topologies.

    :param traffic_profile: traffic profile; its full_profile attribute
        is used to look up the flow ports
    """
    endpoints_id_pairs = self._get_endpoints_src_dst_id_pairs(
        traffic_profile.full_profile)
    endpoints_obj_pairs = \
        self._get_endpoints_src_dst_obj_pairs(endpoints_id_pairs)
    if endpoints_obj_pairs:
        uplink_endpoints = endpoints_obj_pairs[::2]
        downlink_endpoints = endpoints_obj_pairs[1::2]
    else:
        # Topologies are only a fallback; they must not overwrite the
        # device group endpoints selected above.
        uplink_endpoints = self._access_topologies
        downlink_endpoints = self._core_topologies
    self.client.create_ipv4_traffic_model(uplink_endpoints,
                                          downlink_endpoints)
def run_protocols(self):
    """Start the protocols and wait until the client reports them running.

    The wait is delegated to utils.wait_until_true with a timeout of
    WAIT_PROTOCOLS_STARTED and sleep=2.
    """
    LOG.info('PPPoE Scenario - Start Protocols')
    self.client.start_protocols()

    def protocols_running():
        # Evaluated on every poll so the current state is checked.
        return self.client.is_protocols_running(self.protocols)

    utils.wait_until_true(protocols_running,
                          timeout=WAIT_PROTOCOLS_STARTED, sleep=2)
def stop_protocols(self):
    """Log the action and ask the IxNetwork client to stop the protocols."""
    LOG.info('PPPoE Scenario - Stop Protocols')
    ixia_client = self.client
    ixia_client.stop_protocols()
237 def _get_intf_addr(self, intf):
238 """Retrieve interface IP address and mask
240 :param intf: could be the string which represents IP address
241 with mask (e.g 192.168.10.2/24) or a dictionary with the host
242 name and the port (e.g. {'tg__0': 'xe1'})
243 :return: (tuple) pair of ip address and mask
245 if isinstance(intf, six.string_types):
246 ip, mask = tuple(intf.split('/'))
249 node_name, intf_name = next(iter(intf.items()))
250 node = self._context_cfg["nodes"].get(node_name, {})
251 interface = node.get("interfaces", {})[intf_name]
252 ip = interface["local_ip"]
253 mask = interface["netmask"]
254 ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)),
256 return ip, ipaddr.prefixlen
259 def _get_endpoints_src_dst_id_pairs(flows_params):
260 """Get list of flows src/dst port pairs
262 Create list of flows src/dst port pairs based on traffic profile
263 flows data. Each uplink/downlink pair in traffic profile represents
264 specific flows between the pair of ports.
266 Example ('port' key represents port on which flow will be created):
286 Result list: ['xe0', 'xe1', 'xe2', 'xe3']
288 Result list means that the following flows pairs will be created:
289 - uplink 0: port xe0 <-> port xe1
290 - downlink 0: port xe1 <-> port xe0
291 - uplink 1: port xe2 <-> port xe3
292 - downlink 1: port xe3 <-> port xe2
294 :param flows_params: ordered dict of traffic profile flows params
295 :return: (list) list of flows src/dst ports
297 if len(flows_params) % 2:
298 raise RuntimeError('Number of uplink/downlink pairs'
299 ' in traffic profile is not equal')
301 for flow in flows_params:
302 port = flows_params[flow]['ipv4'].get('port')
305 endpoint_pairs.append(port)
306 return endpoint_pairs
def _get_endpoints_src_dst_obj_pairs(self, endpoints_id_pairs):
    """Create list of uplink/downlink device groups pairs

    Translate the flat list of flow ports (uplink port, downlink port,
    uplink port, ...) into a flat list of device group descriptors in
    the same interleaved order.

    Ports are resolved through the topologies: the 'tg__0' ports listed
    under flow 'src_ip' are matched positionally with the access
    topologies, those under 'dst_ip' with the core topologies, and each
    port is mapped to the device groups of its topology.

    Uplink ports are consumed in chunks of svlan_count entries
    (sessions_per_port / sessions_per_svlan); the n-th port of a chunk
    is mapped to the n-th device group of that port. Every downlink
    port is mapped to the first device group of that port.

    :param endpoints_id_pairs: (list) List of uplink/downlink flows ports
    :return: (list) list of uplink/downlink device groups descriptors
        pairs; empty when no flow ports are given
    """
    pppoe_cfg = self._ixia_cfg['pppoe_client']
    port_sessions = pppoe_cfg['sessions_per_port']
    svlan_sessions = pppoe_cfg['sessions_per_svlan']
    svlan_count = int(port_sessions / svlan_sessions)

    uplink_ports = [entry['tg__0']
                    for entry in self._ixia_cfg['flow']['src_ip']]
    downlink_ports = [entry['tg__0']
                      for entry in self._ixia_cfg['flow']['dst_ip']]

    # Access topologies are queried first, then core topologies, so a
    # port present in both lists ends up with its core device groups.
    dev_groups_by_port = {}
    for ports, topologies in ((uplink_ports, self._access_topologies),
                              (downlink_ports, self._core_topologies)):
        for port, topology in zip(ports, topologies):
            dev_groups_by_port[port] = \
                self.client.get_topology_device_groups(topology)

    uplink_ids = endpoints_id_pairs[::2]
    downlink_ids = endpoints_id_pairs[1::2]

    uplink_dev_groups = []
    for start in range(0, len(uplink_ids), svlan_count):
        chunk = uplink_ids[start:start + svlan_count]
        for position, port in enumerate(chunk):
            uplink_dev_groups.append(dev_groups_by_port[port][position])

    downlink_dev_groups = [dev_groups_by_port[port][0]
                           for port in downlink_ids]

    # Interleave again: uplink group, downlink group, uplink group, ...
    endpoint_obj_pairs = []
    for pair in zip(uplink_dev_groups, downlink_dev_groups):
        endpoint_obj_pairs.extend(pair)

    return endpoint_obj_pairs
def _fill_ixia_config(self):
    """Replace interface references in the IXIA config with addresses.

    Each entry of pppoe_client 'ip', ipv4_client 'gateway_ip' and
    ipv4_client 'ip' is resolved with _get_intf_addr(). The lists are
    overwritten in place with the resolved IP addresses, and the second
    element of every resolved ipv4_client 'ip' entry is stored under
    ipv4_client 'prefix'.
    """
    ixia_cfg = self._ixia_cfg
    resolve = self._get_intf_addr

    pppoe_ips = [resolve(intf)[0] for intf in ixia_cfg["pppoe_client"]["ip"]]
    ipv4_cfg = ixia_cfg["ipv4_client"]
    ixia_cfg["pppoe_client"]["ip"] = pppoe_ips

    gateway_ips = [resolve(intf)[0] for intf in ipv4_cfg["gateway_ip"]]
    ixia_cfg["ipv4_client"]["gateway_ip"] = gateway_ips

    resolved = [resolve(intf) for intf in ipv4_cfg["ip"]]
    ixia_cfg["ipv4_client"]["ip"] = [pair[0] for pair in resolved]
    ixia_cfg["ipv4_client"]["prefix"] = [pair[1] for pair in resolved]
402 def _apply_access_network_config(self):
403 pppoe = self._ixia_cfg["pppoe_client"]
404 sessions_per_port = pppoe['sessions_per_port']
405 sessions_per_svlan = pppoe['sessions_per_svlan']
406 svlan_count = int(sessions_per_port / sessions_per_svlan)
408 # add topology per uplink port (access network)
409 for access_tp_id, vport in enumerate(self._uplink_vports):
410 name = 'Topology access {}'.format(access_tp_id)
411 tp = self.client.add_topology(name, vport)
412 self._access_topologies.append(tp)
413 # add device group per svlan
414 for dg_id in range(svlan_count):
415 s_vlan_id = int(pppoe['s_vlan']) + dg_id + access_tp_id * svlan_count
416 s_vlan = ixnet_api.Vlan(vlan_id=s_vlan_id)
417 c_vlan = ixnet_api.Vlan(vlan_id=pppoe['c_vlan'], vlan_id_step=1)
418 name = 'SVLAN {}'.format(s_vlan_id)
419 dg = self.client.add_device_group(tp, name, sessions_per_svlan)
420 self.device_groups.append(dg)
421 # add ethernet layer to device group
422 ethernet = self.client.add_ethernet(dg, 'Ethernet')
423 self.protocols.append(ethernet)
424 self.client.add_vlans(ethernet, [s_vlan, c_vlan])
425 # add ppp over ethernet
426 if 'pap_user' in pppoe:
427 ppp = self.client.add_pppox_client(ethernet, 'pap',
429 pppoe['pap_password'])
431 ppp = self.client.add_pppox_client(ethernet, 'chap',
433 pppoe['chap_password'])
434 self.protocols.append(ppp)
436 def _apply_core_network_config(self):
437 ipv4 = self._ixia_cfg["ipv4_client"]
438 sessions_per_port = ipv4['sessions_per_port']
439 sessions_per_vlan = ipv4['sessions_per_vlan']
440 vlan_count = int(sessions_per_port / sessions_per_vlan)
442 # add topology per downlink port (core network)
443 for core_tp_id, vport in enumerate(self._downlink_vports):
444 name = 'Topology core {}'.format(core_tp_id)
445 tp = self.client.add_topology(name, vport)
446 self._core_topologies.append(tp)
447 # add device group per vlan
448 for dg_id in range(vlan_count):
449 name = 'Core port {}'.format(core_tp_id)
450 dg = self.client.add_device_group(tp, name, sessions_per_vlan)
451 self.device_groups.append(dg)
452 # add ethernet layer to device group
453 ethernet = self.client.add_ethernet(dg, 'Ethernet')
454 self.protocols.append(ethernet)
456 vlan_id = int(ipv4['vlan']) + dg_id + core_tp_id * vlan_count
457 vlan = ixnet_api.Vlan(vlan_id=vlan_id)
458 self.client.add_vlans(ethernet, [vlan])
460 gw_ip = ipv4['gateway_ip'][core_tp_id]
461 # use gw addr to generate ip addr from the same network
462 ip_addr = ipaddress.IPv4Address(gw_ip) + 1
463 ipv4_obj = self.client.add_ipv4(ethernet, name='ipv4',
466 prefix=ipv4['prefix'][core_tp_id],
468 self.protocols.append(ipv4_obj)
470 bgp_peer_obj = self.client.add_bgp(ipv4_obj,
471 dut_ip=ipv4["bgp"]["dut_ip"],
472 local_as=ipv4["bgp"]["as_number"],
473 bgp_type=ipv4["bgp"].get("bgp_type"))
474 self.protocols.append(bgp_peer_obj)
476 def update_tracking_options(self):
479 'tos': {'precedence': 'ipv4Precedence0'},
480 'dscp': {'defaultPHB': 'ipv4DefaultPhb0',
481 'selectorPHB': 'ipv4ClassSelectorPhb0',
482 'assuredPHB': 'ipv4AssuredForwardingPhb0',
483 'expeditedPHB': 'ipv4ExpeditedForwardingPhb0'}
486 prio_trackby_key = 'ipv4Precedence0'
489 priority = list(self._ixia_cfg['priority'])[0]
490 if priority == 'raw':
491 prio_trackby_key = priority_map[priority]
492 elif priority in ['tos', 'dscp']:
493 priority_type = list(self._ixia_cfg['priority'][priority])[0]
494 prio_trackby_key = priority_map[priority][priority_type]
498 tracking_options = ['flowGroup0', 'vlanVlanId0', prio_trackby_key]
499 self.client.set_flow_tracking(tracking_options)
def get_tc_rfc2544_options(self):
    """Return the 'rfc2544' entry of the IXIA config, or None if absent."""
    ixia_cfg = self._ixia_cfg
    return ixia_cfg.get('rfc2544')
def _get_stats(self):
    """Return the PPPoE scenario statistics reported by the client."""
    pppoe_stats = self.client.get_pppoe_scenario_statistics()
    return pppoe_stats
def get_flow_id_data(stats, flow_id, key):
    """Return the mean of a statistic over all entries of one flow.

    :param stats: (list) flow statistics dicts, each carrying an 'id'
    :param flow_id: flow id whose entries are averaged
    :param key: name of the statistic; its values are converted to float
    :return: (float) mean value of the statistic
    :raises ZeroDivisionError: if no entry matches flow_id
    """
    total = 0
    matched = 0
    for flow in stats:
        if flow['id'] != flow_id:
            continue
        total += float(flow.get(key))
        matched += 1
    return total / matched
def get_priority_flows_stats(self, samples, duration):
    """Aggregate flow statistics per IP priority.

    Frame counters are summed over the flows of a priority; every
    latency value (avg, min and max alike) is the mean of the per-flow
    values of that priority.

    :param samples: (list) per-flow statistics dicts; each must provide
        'IP_Priority', 'Tx_Frames', 'Rx_Frames' and the three
        'Store-Forward_*_latency_ns' values
    :param duration: value the frame counters are divided by to obtain
        the throughput
    :return: (dict) priority -> dict with packet counters, throughput
        rounded to 3 digits and latency values
    """
    # Group the flows once instead of rescanning `samples` for every
    # priority and every metric.
    flows_by_priority = {}
    for flow in samples:
        flows_by_priority.setdefault(flow['IP_Priority'], []).append(flow)

    results = {}
    for priority, flows in flows_by_priority.items():
        prio_flows_num = len(flows)
        tx_frames = sum(int(flow['Tx_Frames']) for flow in flows)
        rx_frames = sum(int(flow['Rx_Frames']) for flow in flows)
        avg_latency_ns = sum(
            int(flow['Store-Forward_Avg_latency_ns'])
            for flow in flows) / prio_flows_num
        min_latency_ns = sum(
            int(flow['Store-Forward_Min_latency_ns'])
            for flow in flows) / prio_flows_num
        max_latency_ns = sum(
            int(flow['Store-Forward_Max_latency_ns'])
            for flow in flows) / prio_flows_num
        tx_throughput = float(tx_frames) / duration
        rx_throughput = float(rx_frames) / duration
        results[priority] = {
            'in_packets': rx_frames,
            'out_packets': tx_frames,
            'RxThroughput': round(rx_throughput, 3),
            'TxThroughput': round(tx_throughput, 3),
            'avg_latency_ns': utils.safe_cast(avg_latency_ns, int, 0),
            'min_latency_ns': utils.safe_cast(min_latency_ns, int, 0),
            'max_latency_ns': utils.safe_cast(max_latency_ns, int, 0)
        }

    return results
546 def generate_samples(self, resource_helper, ports, duration):
548 stats = self._get_stats()
550 ports_stats = stats['port_statistics']
551 flows_stats = stats['flow_statistic']
552 pppoe_subs_per_port = stats['pppox_client_per_port']
554 # Get sorted list of ixia ports names
555 ixia_port_names = sorted([data['port_name'] for data in ports_stats])
557 # Set 'port_id' key for ports stats items
558 for item in ports_stats:
559 port_id = item.pop('port_name').split('-')[-1].strip()
560 item['port_id'] = int(port_id)
562 # Set 'id' key for flows stats items
563 for item in flows_stats:
564 flow_id = item.pop('Flow_Group').split('-')[1].strip()
565 item['id'] = int(flow_id)
567 # Set 'port_id' key for pppoe subs per port stats
568 for item in pppoe_subs_per_port:
569 port_id = item.pop('subs_port').split('-')[-1].strip()
570 item['port_id'] = int(port_id)
572 # Map traffic flows to ports
573 port_flow_map = collections.defaultdict(set)
574 for item in flows_stats:
575 tx_port = item.pop('Tx_Port')
576 tx_port_index = ixia_port_names.index(tx_port)
577 port_flow_map[tx_port_index].update([item['id']])
580 ports_stats = sorted(ports_stats, key=lambda k: k['port_id'])
582 # Get priority flows stats
583 prio_flows_stats = self.get_priority_flows_stats(flows_stats, duration)
584 samples['priority_stats'] = prio_flows_stats
586 # this is not DPDK port num, but this is whatever number we gave
587 # when we selected ports and programmed the profile
588 for port_num in ports:
590 # reverse lookup port name from port_num so the stats dict is descriptive
591 intf = resource_helper.vnfd_helper.find_interface_by_port(port_num)
592 port_name = intf['name']
593 port_id = ports_stats[port_num]['port_id']
595 [port_data for port_data in pppoe_subs_per_port
596 if port_data.get('port_id') == port_id]
599 sum([float(self.get_flow_id_data(
600 flows_stats, flow, 'Store-Forward_Avg_latency_ns'))
601 for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num])
603 sum([float(self.get_flow_id_data(
604 flows_stats, flow, 'Store-Forward_Min_latency_ns'))
605 for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num])
607 sum([float(self.get_flow_id_data(
608 flows_stats, flow, 'Store-Forward_Max_latency_ns'))
609 for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num])
611 samples[port_name] = {
612 'rx_throughput_kps': float(ports_stats[port_num]['Rx_Rate_Kbps']),
613 'tx_throughput_kps': float(ports_stats[port_num]['Tx_Rate_Kbps']),
614 'rx_throughput_mbps': float(ports_stats[port_num]['Rx_Rate_Mbps']),
615 'tx_throughput_mbps': float(ports_stats[port_num]['Tx_Rate_Mbps']),
616 'in_packets': int(ports_stats[port_num]['Valid_Frames_Rx']),
617 'out_packets': int(ports_stats[port_num]['Frames_Tx']),
618 'RxThroughput': float(ports_stats[port_num]['Valid_Frames_Rx']) / duration,
619 'TxThroughput': float(ports_stats[port_num]['Frames_Tx']) / duration,
620 'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0),
621 'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0),
622 'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0)
626 samples[port_name].update(
627 {'sessions_up': int(port_subs_stats[0]['Sessions_Up']),
628 'sessions_down': int(port_subs_stats[0]['Sessions_Down']),
629 'sessions_not_started': int(port_subs_stats[0]['Sessions_Not_Started']),
630 'sessions_total': int(port_subs_stats[0]['Sessions_Total'])}
639 class IxiaRfc2544Helper(Rfc2544ResourceHelper):
642 return self.latency and self.iteration.value > 10
645 class IxiaResourceHelper(ClientResourceHelper):
647 LATENCY_TIME_SLEEP = 120
649 def __init__(self, setup_helper, rfc_helper_type=None):
650 super(IxiaResourceHelper, self).__init__(setup_helper)
651 self.scenario_helper = setup_helper.scenario_helper
653 self._ixia_scenarios = {
654 "IxiaBasic": IxiaBasicScenario,
655 "IxiaL3": IxiaL3Scenario,
656 "IxiaPppoeClient": IxiaPppoeClientScenario,
659 self.client = ixnet_api.IxNextgen()
661 if rfc_helper_type is None:
662 rfc_helper_type = IxiaRfc2544Helper
664 self.rfc_helper = rfc_helper_type(self.scenario_helper)
665 self.uplink_ports = None
666 self.downlink_ports = None
667 self.context_cfg = None
668 self._ix_scenario = None
def _connect(self, client=None):
    """Connect the helper's own IxNetwork client using its VNFD helper.

    :param client: not used; the connection is always made through
        self.client
    """
    ixia_client = self.client
    ixia_client.connect(self.vnfd_helper)
def setup(self):
    """Run the parent setup, then select the IXIA scenario to use."""
    super(IxiaResourceHelper, self).setup()
    self._init_ix_scenario()
def stop_collect(self):
    """Stop the scenario protocols and raise the terminated flag."""
    scenario = self._ix_scenario
    scenario.stop_protocols()
    self._terminated.value = 1
def generate_samples(self, ports, duration):
    """Delegate sample generation to the active IXIA scenario.

    :param ports: ports to generate samples for
    :param duration: traffic duration passed on to the scenario
    :return: whatever the scenario's generate_samples() returns
    """
    scenario = self._ix_scenario
    samples = scenario.generate_samples(self, ports, duration)
    return samples
685 def _init_ix_scenario(self):
686 ixia_config = self.scenario_helper.scenario_cfg.get('ixia_config', 'IxiaBasic')
688 if ixia_config in self._ixia_scenarios:
689 scenario_type = self._ixia_scenarios[ixia_config]
691 self._ix_scenario = scenario_type(self.client, self.context_cfg,
692 self.scenario_helper.scenario_cfg['options'])
695 "IXIA config type '{}' not supported".format(ixia_config))
def _initialize_client(self, traffic_profile):
    """Initialize the IXIA IxNetwork client and configure the server

    The client configuration is cleared and the ports are assigned
    before the scenario applies its config and builds the traffic model.

    :param traffic_profile: traffic profile handed to the scenario's
        create_traffic_model()
    """
    ixia_client = self.client
    ixia_client.clear_config()
    ixia_client.assign_ports()

    scenario = self._ix_scenario
    scenario.apply_config()
    scenario.create_traffic_model(traffic_profile)
def update_tracking_options(self):
    """Ask the active IXIA scenario to update its tracking options."""
    scenario = self._ix_scenario
    scenario.update_tracking_options()
707 def run_traffic(self, traffic_profile):
708 if self._terminated.value:
711 min_tol = self.rfc_helper.tolerance_low
712 max_tol = self.rfc_helper.tolerance_high
713 precision = self.rfc_helper.tolerance_precision
714 resolution = self.rfc_helper.resolution
715 default = "00:00:00:00:00:00"
718 traffic_profile.update_traffic_profile(self)
719 self._initialize_client(traffic_profile)
722 for port_name in self.vnfd_helper.port_pairs.all_ports:
723 intf = self.vnfd_helper.find_interface(name=port_name)
724 virt_intf = intf["virtual-interface"]
725 # we only know static traffic id by reading the json
726 # this is used by _get_ixia_trafficrofile
727 port_num = self.vnfd_helper.port_num(intf)
728 mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
729 mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)
731 self._ix_scenario.run_protocols()
734 while not self._terminated.value:
735 first_run = traffic_profile.execute_traffic(self, self.client,
737 self.client_started.value = 1
738 # pylint: disable=unnecessary-lambda
739 utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
740 timeout=traffic_profile.config.duration * 2)
741 rfc2544_opts = self._ix_scenario.get_tc_rfc2544_options()
742 samples = self.generate_samples(traffic_profile.ports,
743 traffic_profile.config.duration)
745 completed, samples = traffic_profile.get_drop_percentage(
746 samples, min_tol, max_tol, precision, resolution,
747 first_run=first_run, tc_rfc2544_opts=rfc2544_opts)
748 self._queue.put(samples)
751 self._terminated.value = 1
753 except Exception: # pylint: disable=broad-except
754 LOG.exception('Run Traffic terminated')
756 self._ix_scenario.stop_protocols()
757 self.client_started.value = 0
758 self._terminated.value = 1
760 def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover
761 LOG.info("Ixia resource_helper run_test")
762 if self._terminated.value:
765 min_tol = self.rfc_helper.tolerance_low
766 max_tol = self.rfc_helper.tolerance_high
767 precision = self.rfc_helper.tolerance_precision
768 resolution = self.rfc_helper.resolution
769 default = "00:00:00:00:00:00"
772 traffic_profile.update_traffic_profile(self)
773 self._initialize_client(traffic_profile)
776 for port_name in self.vnfd_helper.port_pairs.all_ports:
777 intf = self.vnfd_helper.find_interface(name=port_name)
778 virt_intf = intf["virtual-interface"]
779 # we only know static traffic id by reading the json
780 # this is used by _get_ixia_trafficrofile
781 port_num = self.vnfd_helper.port_num(intf)
782 mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
783 mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)
785 self._ix_scenario.run_protocols()
789 self.rfc_helper.iteration.value = 0
790 self.client_started.value = 1
791 while completed is False and not self._terminated.value:
792 LOG.info("Wait for task ...")
795 task = tasks_queue.get(True, 5)
796 except moves.queue.Empty:
799 if task != 'RUN_TRAFFIC':
802 self.rfc_helper.iteration.value += 1
803 LOG.info("Got %s task, start iteration %d", task,
804 self.rfc_helper.iteration.value)
805 first_run = traffic_profile.execute_traffic(self, self.client,
807 # pylint: disable=unnecessary-lambda
808 utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
809 timeout=traffic_profile.config.duration * 2)
810 samples = self.generate_samples(traffic_profile.ports,
811 traffic_profile.config.duration)
813 completed, samples = traffic_profile.get_drop_percentage(
814 samples, min_tol, max_tol, precision, resolution,
816 samples['Iteration'] = self.rfc_helper.iteration.value
817 self._queue.put(samples)
820 LOG.debug("IxiaResourceHelper::run_test - test completed")
821 results_queue.put('COMPLETE')
823 results_queue.put('CONTINUE')
824 tasks_queue.task_done()
826 except Exception: # pylint: disable=broad-except
827 LOG.exception('Run Traffic terminated')
829 self._ix_scenario.stop_protocols()
830 self.client_started.value = 0
831 LOG.debug("IxiaResourceHelper::run_test done")
834 class IxiaTrafficGen(SampleVNFTrafficGen):
def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
    """Initialize the IXIA traffic generator.

    :param name: passed to the parent constructor
    :param vnfd: passed to the parent constructor
    :param setup_env_helper_type: passed to the parent constructor
    :param resource_helper_type: resource helper class; defaults to
        IxiaResourceHelper when None
    """
    helper_type = (IxiaResourceHelper if resource_helper_type is None
                   else resource_helper_type)
    super(IxiaTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
                                         helper_type)

    self._ixia_traffic_gen = None
    self.ixia_file_name = ''
    self.vnf_port_pairs = []

    # Traffic process handle and the queues it is handed.
    self._traffic_process = None
    self._tasks_queue = JoinableQueue()
    self._result_queue = Queue()
851 def _check_status(self):
855 self.resource_helper.stop_collect()
856 super(IxiaTrafficGen, self).terminate()
def _test_runner(self, traffic_profile, tasks, results):
    """Run the test through the resource helper's run_test().

    :param traffic_profile: traffic profile to execute
    :param tasks: tasks queue handed to run_test()
    :param results: results queue handed to run_test()
    """
    helper = self.resource_helper
    helper.run_test(traffic_profile, tasks, results)
861 def _init_traffic_process(self, traffic_profile):
862 name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
863 traffic_profile.__class__.__name__,
865 self._traffic_process = Process(name=name, target=self._test_runner,
867 traffic_profile, self._tasks_queue,
870 self._traffic_process.start()
871 while self.resource_helper.client_started.value == 0:
873 if not self._traffic_process.is_alive():
def run_traffic_once(self, traffic_profile):
    """Request one traffic iteration.

    The traffic process is initialized first when the resource helper's
    client_started flag is still 0; a 'RUN_TRAFFIC' task is then put on
    the tasks queue in every case.

    :param traffic_profile: traffic profile used to initialize the
        traffic process
    """
    started_flag = self.resource_helper.client_started
    if started_flag.value == 0:
        self._init_traffic_process(traffic_profile)

    # continue test - run next iteration
    LOG.info("Run next iteration ...")
    tasks_queue = self._tasks_queue
    tasks_queue.put('RUN_TRAFFIC')
884 def wait_on_traffic(self):
885 self._tasks_queue.join()
886 result = self._result_queue.get()