Merge "Add NSB test descriptors for vIPSEC testcase"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / tg_rfc2544_ixia.py
1 # Copyright (c) 2016-2019 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import ipaddress
16 import logging
17 import six
18 import collections
19 import os
20 import time
21
22 from six import moves
23
24 from multiprocessing import Queue, Process, JoinableQueue
25
26 from yardstick.common import utils
27 from yardstick.common import exceptions
28 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
29 from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
30 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
31 from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
32
33
34 LOG = logging.getLogger(__name__)
35
36 WAIT_AFTER_CFG_LOAD = 10
37 WAIT_FOR_TRAFFIC = 30
38 WAIT_PROTOCOLS_STARTED = 420
39
40
41 class IxiaBasicScenario(object):
42     """Ixia Basic scenario for flow from port to port"""
43
44     def __init__(self, client, context_cfg, ixia_cfg):
45
46         self.client = client
47         self.context_cfg = context_cfg
48         self.ixia_cfg = ixia_cfg
49
50         self._uplink_vports = None
51         self._downlink_vports = None
52
53     def apply_config(self):
54         pass
55
56     def run_protocols(self):
57         pass
58
59     def stop_protocols(self):
60         pass
61
62     def create_traffic_model(self, traffic_profile=None):
63         # pylint: disable=unused-argument
64         vports = self.client.get_vports()
65         self._uplink_vports = vports[::2]
66         self._downlink_vports = vports[1::2]
67         self.client.create_traffic_model(self._uplink_vports,
68                                          self._downlink_vports)
69
70     def _get_stats(self):
71         return self.client.get_statistics()
72
73     def generate_samples(self, resource_helper, ports, duration):
74         stats = self._get_stats()
75
76         samples = {}
77         # this is not DPDK port num, but this is whatever number we gave
78         # when we selected ports and programmed the profile
79         for port_num in ports:
80             try:
81                 # reverse lookup port name from port_num so the stats dict is descriptive
82                 intf = resource_helper.vnfd_helper.find_interface_by_port(port_num)
83                 port_name = intf['name']
84                 avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num]
85                 min_latency = stats['Store-Forward_Min_latency_ns'][port_num]
86                 max_latency = stats['Store-Forward_Max_latency_ns'][port_num]
87                 samples[port_name] = {
88                     'rx_throughput_kps': float(stats['Rx_Rate_Kbps'][port_num]),
89                     'tx_throughput_kps': float(stats['Tx_Rate_Kbps'][port_num]),
90                     'rx_throughput_mbps': float(stats['Rx_Rate_Mbps'][port_num]),
91                     'tx_throughput_mbps': float(stats['Tx_Rate_Mbps'][port_num]),
92                     'in_packets': int(stats['Valid_Frames_Rx'][port_num]),
93                     'out_packets': int(stats['Frames_Tx'][port_num]),
94                     'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration,
95                     'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration,
96                     'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0),
97                     'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0),
98                     'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0)
99                 }
100             except IndexError:
101                 pass
102
103         return samples
104
105     def update_tracking_options(self):
106         pass
107
108     def get_tc_rfc2544_options(self):
109         pass
110
111
112 class IxiaL3Scenario(IxiaBasicScenario):
113     """Ixia scenario for L3 flow between static ip's"""
114
115     def _add_static_ips(self):
116         vports = self.client.get_vports()
117         uplink_intf_vport = [(self.client.get_static_interface(vport), vport)
118                              for vport in vports[::2]]
119         downlink_intf_vport = [(self.client.get_static_interface(vport), vport)
120                                for vport in vports[1::2]]
121
122         for index in range(len(uplink_intf_vport)):
123             intf, vport = uplink_intf_vport[index]
124             try:
125                 iprange = self.ixia_cfg['flow'].get('src_ip')[index]
126                 start_ip = utils.get_ip_range_start(iprange)
127                 count = utils.get_ip_range_count(iprange)
128                 self.client.add_static_ipv4(intf, vport, start_ip, count, '32')
129             except IndexError:
130                 raise exceptions.IncorrectFlowOption(
131                     option="src_ip", link="uplink_{}".format(index))
132
133             intf, vport = downlink_intf_vport[index]
134             try:
135                 iprange = self.ixia_cfg['flow'].get('dst_ip')[index]
136                 start_ip = utils.get_ip_range_start(iprange)
137                 count = utils.get_ip_range_count(iprange)
138                 self.client.add_static_ipv4(intf, vport, start_ip, count, '32')
139             except IndexError:
140                 raise exceptions.IncorrectFlowOption(
141                     option="dst_ip", link="downlink_{}".format(index))
142
143     def _add_interfaces(self):
144         vports = self.client.get_vports()
145         uplink_vports = (vport for vport in vports[::2])
146         downlink_vports = (vport for vport in vports[1::2])
147
148         ix_node = next(node for _, node in self.context_cfg['nodes'].items()
149                        if node['role'] == 'IxNet')
150
151         for intf in ix_node['interfaces'].values():
152             ip = intf.get('local_ip')
153             mac = intf.get('local_mac')
154             gateway = None
155             try:
156                 gateway = next(route.get('gateway')
157                                for route in ix_node.get('routing_table')
158                                if route.get('if') == intf.get('ifname'))
159             except StopIteration:
160                 LOG.debug("Gateway not provided")
161
162             if 'uplink' in intf.get('vld_id'):
163                 self.client.add_interface(next(uplink_vports),
164                                           ip, mac, gateway)
165             else:
166                 self.client.add_interface(next(downlink_vports),
167                                           ip, mac, gateway)
168
169     def apply_config(self):
170         self._add_interfaces()
171         self._add_static_ips()
172
173     def create_traffic_model(self, traffic_profile=None):
174         # pylint: disable=unused-argument
175         vports = self.client.get_vports()
176         self._uplink_vports = vports[::2]
177         self._downlink_vports = vports[1::2]
178
179         uplink_endpoints = [port + '/protocols/static'
180                             for port in self._uplink_vports]
181         downlink_endpoints = [port + '/protocols/static'
182                               for port in self._downlink_vports]
183
184         self.client.create_ipv4_traffic_model(uplink_endpoints,
185                                               downlink_endpoints)
186
187
188 class IxiaPppoeClientScenario(object):
189     def __init__(self, client, context_cfg, ixia_cfg):
190
191         self.client = client
192
193         self._uplink_vports = None
194         self._downlink_vports = None
195
196         self._access_topologies = []
197         self._core_topologies = []
198
199         self._context_cfg = context_cfg
200         self._ixia_cfg = ixia_cfg
201         self.protocols = []
202         self.device_groups = []
203
204     def apply_config(self):
205         vports = self.client.get_vports()
206         self._uplink_vports = vports[::2]
207         self._downlink_vports = vports[1::2]
208         self._fill_ixia_config()
209         self._apply_access_network_config()
210         self._apply_core_network_config()
211
212     def create_traffic_model(self, traffic_profile):
213         endpoints_id_pairs = self._get_endpoints_src_dst_id_pairs(
214             traffic_profile.full_profile)
215         endpoints_obj_pairs = \
216             self._get_endpoints_src_dst_obj_pairs(endpoints_id_pairs)
217         if endpoints_obj_pairs:
218             uplink_endpoints = endpoints_obj_pairs[::2]
219             downlink_endpoints = endpoints_obj_pairs[1::2]
220         else:
221             uplink_endpoints = self._access_topologies
222             downlink_endpoints = self._core_topologies
223         self.client.create_ipv4_traffic_model(uplink_endpoints,
224                                               downlink_endpoints)
225
226     def run_protocols(self):
227         LOG.info('PPPoE Scenario - Start Protocols')
228         self.client.start_protocols()
229         utils.wait_until_true(
230             lambda: self.client.is_protocols_running(self.protocols),
231             timeout=WAIT_PROTOCOLS_STARTED, sleep=2)
232
233     def stop_protocols(self):
234         LOG.info('PPPoE Scenario - Stop Protocols')
235         self.client.stop_protocols()
236
237     def _get_intf_addr(self, intf):
238         """Retrieve interface IP address and mask
239
240         :param intf: could be the string which represents IP address
241         with mask (e.g 192.168.10.2/24) or a dictionary with the host
242         name and the port (e.g. {'tg__0': 'xe1'})
243         :return: (tuple) pair of ip address and mask
244         """
245         if isinstance(intf, six.string_types):
246             ip, mask = tuple(intf.split('/'))
247             return ip, int(mask)
248
249         node_name, intf_name = next(iter(intf.items()))
250         node = self._context_cfg["nodes"].get(node_name, {})
251         interface = node.get("interfaces", {})[intf_name]
252         ip = interface["local_ip"]
253         mask = interface["netmask"]
254         ipaddr = ipaddress.ip_network(six.text_type('{}/{}'.format(ip, mask)),
255                                       strict=False)
256         return ip, ipaddr.prefixlen
257
258     @staticmethod
259     def _get_endpoints_src_dst_id_pairs(flows_params):
260         """Get list of flows src/dst port pairs
261
262         Create list of flows src/dst port pairs based on traffic profile
263         flows data. Each uplink/downlink pair in traffic profile represents
264         specific flows between the pair of ports.
265
266         Example ('port' key represents port on which flow will be created):
267
268         Input flows data:
269         uplink_0:
270           ipv4:
271             id: 1
272             port: xe0
273         downlink_0:
274           ipv4:
275             id: 2
276             port: xe1
277         uplink_1:
278           ipv4:
279             id: 3
280             port: xe2
281         downlink_1:
282           ipv4:
283             id: 4
284             port: xe3
285
286         Result list: ['xe0', 'xe1', 'xe2', 'xe3']
287
288         Result list means that the following flows pairs will be created:
289         - uplink 0: port xe0 <-> port xe1
290         - downlink 0: port xe1 <-> port xe0
291         - uplink 1: port xe2 <-> port xe3
292         - downlink 1: port xe3 <-> port xe2
293
294         :param flows_params: ordered dict of traffic profile flows params
295         :return: (list) list of flows src/dst ports
296         """
297         if len(flows_params) % 2:
298             raise RuntimeError('Number of uplink/downlink pairs'
299                                ' in traffic profile is not equal')
300         endpoint_pairs = []
301         for flow in flows_params:
302             port = flows_params[flow]['ipv4'].get('port')
303             if port is None:
304                 continue
305             endpoint_pairs.append(port)
306         return endpoint_pairs
307
308     def _get_endpoints_src_dst_obj_pairs(self, endpoints_id_pairs):
309         """Create list of uplink/downlink device groups pairs
310
311         Based on traffic profile options, create list of uplink/downlink
312         device groups pairs between which flow groups will be created:
313
314         1. In case uplink/downlink flows in traffic profile doesn't have
315            specified 'port' key, flows will be created between topologies
316            on corresponding access and core port.
317            E.g.:
318            Access topology on xe0: topology1
319            Core topology on xe1: topology2
320            Flows will be created between:
321            topology1 -> topology2
322            topology2 -> topology1
323
324         2. In case uplink/downlink flows in traffic profile have specified
325            'port' key, flows will be created between device groups on this
326            port.
327            E.g., for the following traffic profile
328            uplink_0:
329              port: xe0
330            downlink_0:
331              port: xe1
332            uplink_1:
333              port: xe0
334            downlink_0:
335              port: xe3
336            Flows will be created between:
337            Port xe0 (dg1) -> Port xe1 (dg1)
338            Port xe1 (dg1) -> Port xe0 (dg1)
339            Port xe0 (dg2) -> Port xe3 (dg1)
340            Port xe3 (dg3) -> Port xe0 (dg1)
341
342         :param endpoints_id_pairs: (list) List of uplink/downlink flows ports
343          pairs
344         :return: (list) list of uplink/downlink device groups descriptors pairs
345         """
346         pppoe = self._ixia_cfg['pppoe_client']
347         sessions_per_port = pppoe['sessions_per_port']
348         sessions_per_svlan = pppoe['sessions_per_svlan']
349         svlan_count = int(sessions_per_port / sessions_per_svlan)
350
351         uplink_ports = [p['tg__0'] for p in self._ixia_cfg['flow']['src_ip']]
352         downlink_ports = [p['tg__0'] for p in self._ixia_cfg['flow']['dst_ip']]
353         uplink_port_topology_map = zip(uplink_ports, self._access_topologies)
354         downlink_port_topology_map = zip(downlink_ports, self._core_topologies)
355
356         port_to_dev_group_mapping = {}
357         for port, topology in uplink_port_topology_map:
358             topology_dgs = self.client.get_topology_device_groups(topology)
359             port_to_dev_group_mapping[port] = topology_dgs
360         for port, topology in downlink_port_topology_map:
361             topology_dgs = self.client.get_topology_device_groups(topology)
362             port_to_dev_group_mapping[port] = topology_dgs
363
364         uplink_endpoints = endpoints_id_pairs[::2]
365         downlink_endpoints = endpoints_id_pairs[1::2]
366
367         uplink_dev_groups = []
368         group_up = [uplink_endpoints[i:i + svlan_count]
369                     for i in range(0, len(uplink_endpoints), svlan_count)]
370
371         for group in group_up:
372             for i, port in enumerate(group):
373                 uplink_dev_groups.append(port_to_dev_group_mapping[port][i])
374
375         downlink_dev_groups = []
376         for port in downlink_endpoints:
377             downlink_dev_groups.append(port_to_dev_group_mapping[port][0])
378
379         endpoint_obj_pairs = []
380         [endpoint_obj_pairs.extend([up, down])
381          for up, down in zip(uplink_dev_groups, downlink_dev_groups)]
382
383         return endpoint_obj_pairs
384
385     def _fill_ixia_config(self):
386         pppoe = self._ixia_cfg["pppoe_client"]
387         ipv4 = self._ixia_cfg["ipv4_client"]
388
389         _ip = [self._get_intf_addr(intf)[0] for intf in pppoe["ip"]]
390         self._ixia_cfg["pppoe_client"]["ip"] = _ip
391
392         _ip = [self._get_intf_addr(intf)[0] for intf in ipv4["gateway_ip"]]
393         self._ixia_cfg["ipv4_client"]["gateway_ip"] = _ip
394
395         addrs = [self._get_intf_addr(intf) for intf in ipv4["ip"]]
396         _ip = [addr[0] for addr in addrs]
397         _prefix = [addr[1] for addr in addrs]
398
399         self._ixia_cfg["ipv4_client"]["ip"] = _ip
400         self._ixia_cfg["ipv4_client"]["prefix"] = _prefix
401
402     def _apply_access_network_config(self):
403         pppoe = self._ixia_cfg["pppoe_client"]
404         sessions_per_port = pppoe['sessions_per_port']
405         sessions_per_svlan = pppoe['sessions_per_svlan']
406         svlan_count = int(sessions_per_port / sessions_per_svlan)
407
408         # add topology per uplink port (access network)
409         for access_tp_id, vport in enumerate(self._uplink_vports):
410             name = 'Topology access {}'.format(access_tp_id)
411             tp = self.client.add_topology(name, vport)
412             self._access_topologies.append(tp)
413             # add device group per svlan
414             for dg_id in range(svlan_count):
415                 s_vlan_id = int(pppoe['s_vlan']) + dg_id + access_tp_id * svlan_count
416                 s_vlan = ixnet_api.Vlan(vlan_id=s_vlan_id)
417                 c_vlan = ixnet_api.Vlan(vlan_id=pppoe['c_vlan'], vlan_id_step=1)
418                 name = 'SVLAN {}'.format(s_vlan_id)
419                 dg = self.client.add_device_group(tp, name, sessions_per_svlan)
420                 self.device_groups.append(dg)
421                 # add ethernet layer to device group
422                 ethernet = self.client.add_ethernet(dg, 'Ethernet')
423                 self.protocols.append(ethernet)
424                 self.client.add_vlans(ethernet, [s_vlan, c_vlan])
425                 # add ppp over ethernet
426                 if 'pap_user' in pppoe:
427                     ppp = self.client.add_pppox_client(ethernet, 'pap',
428                                                        pppoe['pap_user'],
429                                                        pppoe['pap_password'])
430                 else:
431                     ppp = self.client.add_pppox_client(ethernet, 'chap',
432                                                        pppoe['chap_user'],
433                                                        pppoe['chap_password'])
434                 self.protocols.append(ppp)
435
436     def _apply_core_network_config(self):
437         ipv4 = self._ixia_cfg["ipv4_client"]
438         sessions_per_port = ipv4['sessions_per_port']
439         sessions_per_vlan = ipv4['sessions_per_vlan']
440         vlan_count = int(sessions_per_port / sessions_per_vlan)
441
442         # add topology per downlink port (core network)
443         for core_tp_id, vport in enumerate(self._downlink_vports):
444             name = 'Topology core {}'.format(core_tp_id)
445             tp = self.client.add_topology(name, vport)
446             self._core_topologies.append(tp)
447             # add device group per vlan
448             for dg_id in range(vlan_count):
449                 name = 'Core port {}'.format(core_tp_id)
450                 dg = self.client.add_device_group(tp, name, sessions_per_vlan)
451                 self.device_groups.append(dg)
452                 # add ethernet layer to device group
453                 ethernet = self.client.add_ethernet(dg, 'Ethernet')
454                 self.protocols.append(ethernet)
455                 if 'vlan' in ipv4:
456                     vlan_id = int(ipv4['vlan']) + dg_id + core_tp_id * vlan_count
457                     vlan = ixnet_api.Vlan(vlan_id=vlan_id)
458                     self.client.add_vlans(ethernet, [vlan])
459                 # add ipv4 layer
460                 gw_ip = ipv4['gateway_ip'][core_tp_id]
461                 # use gw addr to generate ip addr from the same network
462                 ip_addr = ipaddress.IPv4Address(gw_ip) + 1
463                 ipv4_obj = self.client.add_ipv4(ethernet, name='ipv4',
464                                                 addr=ip_addr,
465                                                 addr_step='0.0.0.1',
466                                                 prefix=ipv4['prefix'][core_tp_id],
467                                                 gateway=gw_ip)
468                 self.protocols.append(ipv4_obj)
469                 if ipv4.get("bgp"):
470                     bgp_peer_obj = self.client.add_bgp(ipv4_obj,
471                                                        dut_ip=ipv4["bgp"]["dut_ip"],
472                                                        local_as=ipv4["bgp"]["as_number"],
473                                                        bgp_type=ipv4["bgp"].get("bgp_type"))
474                     self.protocols.append(bgp_peer_obj)
475
476     def update_tracking_options(self):
477         priority_map = {
478             'raw': 'ipv4Raw0',
479             'tos': {'precedence': 'ipv4Precedence0'},
480             'dscp': {'defaultPHB': 'ipv4DefaultPhb0',
481                      'selectorPHB': 'ipv4ClassSelectorPhb0',
482                      'assuredPHB': 'ipv4AssuredForwardingPhb0',
483                      'expeditedPHB': 'ipv4ExpeditedForwardingPhb0'}
484         }
485
486         prio_trackby_key = 'ipv4Precedence0'
487
488         try:
489             priority = list(self._ixia_cfg['priority'])[0]
490             if priority == 'raw':
491                 prio_trackby_key = priority_map[priority]
492             elif priority in ['tos', 'dscp']:
493                 priority_type = list(self._ixia_cfg['priority'][priority])[0]
494                 prio_trackby_key = priority_map[priority][priority_type]
495         except KeyError:
496             pass
497
498         tracking_options = ['flowGroup0', 'vlanVlanId0', prio_trackby_key]
499         self.client.set_flow_tracking(tracking_options)
500
501     def get_tc_rfc2544_options(self):
502         return self._ixia_cfg.get('rfc2544')
503
504     def _get_stats(self):
505         return self.client.get_pppoe_scenario_statistics()
506
507     @staticmethod
508     def get_flow_id_data(stats, flow_id, key):
509         result = [float(flow.get(key)) for flow in stats if flow['id'] == flow_id]
510         return sum(result) / len(result)
511
512     def get_priority_flows_stats(self, samples, duration):
513         results = {}
514         priorities = set([flow['IP_Priority'] for flow in samples])
515         for priority in priorities:
516             tx_frames = sum(
517                 [int(flow['Tx_Frames']) for flow in samples
518                  if flow['IP_Priority'] == priority])
519             rx_frames = sum(
520                 [int(flow['Rx_Frames']) for flow in samples
521                  if flow['IP_Priority'] == priority])
522             prio_flows_num = len([flow for flow in samples
523                                   if flow['IP_Priority'] == priority])
524             avg_latency_ns = sum(
525                 [int(flow['Store-Forward_Avg_latency_ns']) for flow in samples
526                  if flow['IP_Priority'] == priority]) / prio_flows_num
527             min_latency_ns = sum(
528                 [int(flow['Store-Forward_Min_latency_ns']) for flow in samples
529                  if flow['IP_Priority'] == priority]) / prio_flows_num
530             max_latency_ns = sum(
531                 [int(flow['Store-Forward_Max_latency_ns']) for flow in samples
532                  if flow['IP_Priority'] == priority]) / prio_flows_num
533             tx_throughput = float(tx_frames) / duration
534             rx_throughput = float(rx_frames) / duration
535             results[priority] = {
536                 'in_packets': rx_frames,
537                 'out_packets': tx_frames,
538                 'RxThroughput': round(rx_throughput, 3),
539                 'TxThroughput': round(tx_throughput, 3),
540                 'avg_latency_ns': utils.safe_cast(avg_latency_ns, int, 0),
541                 'min_latency_ns': utils.safe_cast(min_latency_ns, int, 0),
542                 'max_latency_ns': utils.safe_cast(max_latency_ns, int, 0)
543             }
544         return results
545
546     def generate_samples(self, resource_helper, ports, duration):
547
548         stats = self._get_stats()
549         samples = {}
550         ports_stats = stats['port_statistics']
551         flows_stats = stats['flow_statistic']
552         pppoe_subs_per_port = stats['pppox_client_per_port']
553
554         # Get sorted list of ixia ports names
555         ixia_port_names = sorted([data['port_name'] for data in ports_stats])
556
557         # Set 'port_id' key for ports stats items
558         for item in ports_stats:
559             port_id = item.pop('port_name').split('-')[-1].strip()
560             item['port_id'] = int(port_id)
561
562         # Set 'id' key for flows stats items
563         for item in flows_stats:
564             flow_id = item.pop('Flow_Group').split('-')[1].strip()
565             item['id'] = int(flow_id)
566
567         # Set 'port_id' key for pppoe subs per port stats
568         for item in pppoe_subs_per_port:
569             port_id = item.pop('subs_port').split('-')[-1].strip()
570             item['port_id'] = int(port_id)
571
572         # Map traffic flows to ports
573         port_flow_map = collections.defaultdict(set)
574         for item in flows_stats:
575             tx_port = item.pop('Tx_Port')
576             tx_port_index = ixia_port_names.index(tx_port)
577             port_flow_map[tx_port_index].update([item['id']])
578
579         # Sort ports stats
580         ports_stats = sorted(ports_stats, key=lambda k: k['port_id'])
581
582         # Get priority flows stats
583         prio_flows_stats = self.get_priority_flows_stats(flows_stats, duration)
584         samples['priority_stats'] = prio_flows_stats
585
586         # this is not DPDK port num, but this is whatever number we gave
587         # when we selected ports and programmed the profile
588         for port_num in ports:
589             try:
590                 # reverse lookup port name from port_num so the stats dict is descriptive
591                 intf = resource_helper.vnfd_helper.find_interface_by_port(port_num)
592                 port_name = intf['name']
593                 port_id = ports_stats[port_num]['port_id']
594                 port_subs_stats = \
595                     [port_data for port_data in pppoe_subs_per_port
596                      if port_data.get('port_id') == port_id]
597
598                 avg_latency = \
599                     sum([float(self.get_flow_id_data(
600                         flows_stats, flow, 'Store-Forward_Avg_latency_ns'))
601                         for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num])
602                 min_latency = \
603                     sum([float(self.get_flow_id_data(
604                         flows_stats, flow, 'Store-Forward_Min_latency_ns'))
605                         for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num])
606                 max_latency = \
607                     sum([float(self.get_flow_id_data(
608                         flows_stats, flow, 'Store-Forward_Max_latency_ns'))
609                         for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num])
610
611                 samples[port_name] = {
612                     'rx_throughput_kps': float(ports_stats[port_num]['Rx_Rate_Kbps']),
613                     'tx_throughput_kps': float(ports_stats[port_num]['Tx_Rate_Kbps']),
614                     'rx_throughput_mbps': float(ports_stats[port_num]['Rx_Rate_Mbps']),
615                     'tx_throughput_mbps': float(ports_stats[port_num]['Tx_Rate_Mbps']),
616                     'in_packets': int(ports_stats[port_num]['Valid_Frames_Rx']),
617                     'out_packets': int(ports_stats[port_num]['Frames_Tx']),
618                     'RxThroughput': float(ports_stats[port_num]['Valid_Frames_Rx']) / duration,
619                     'TxThroughput': float(ports_stats[port_num]['Frames_Tx']) / duration,
620                     'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0),
621                     'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0),
622                     'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0)
623                 }
624
625                 if port_subs_stats:
626                     samples[port_name].update(
627                         {'sessions_up': int(port_subs_stats[0]['Sessions_Up']),
628                          'sessions_down': int(port_subs_stats[0]['Sessions_Down']),
629                          'sessions_not_started': int(port_subs_stats[0]['Sessions_Not_Started']),
630                          'sessions_total': int(port_subs_stats[0]['Sessions_Total'])}
631                     )
632
633             except IndexError:
634                 pass
635
636         return samples
637
638
639 class IxiaRfc2544Helper(Rfc2544ResourceHelper):
640
641     def is_done(self):
642         return self.latency and self.iteration.value > 10
643
644
645 class IxiaResourceHelper(ClientResourceHelper):
646
647     LATENCY_TIME_SLEEP = 120
648
649     def __init__(self, setup_helper, rfc_helper_type=None):
650         super(IxiaResourceHelper, self).__init__(setup_helper)
651         self.scenario_helper = setup_helper.scenario_helper
652
653         self._ixia_scenarios = {
654             "IxiaBasic": IxiaBasicScenario,
655             "IxiaL3": IxiaL3Scenario,
656             "IxiaPppoeClient": IxiaPppoeClientScenario,
657         }
658
659         self.client = ixnet_api.IxNextgen()
660
661         if rfc_helper_type is None:
662             rfc_helper_type = IxiaRfc2544Helper
663
664         self.rfc_helper = rfc_helper_type(self.scenario_helper)
665         self.uplink_ports = None
666         self.downlink_ports = None
667         self.context_cfg = None
668         self._ix_scenario = None
669         self._connect()
670
671     def _connect(self, client=None):
672         self.client.connect(self.vnfd_helper)
673
674     def setup(self):
675         super(IxiaResourceHelper, self).setup()
676         self._init_ix_scenario()
677
678     def stop_collect(self):
679         self._ix_scenario.stop_protocols()
680         self._terminated.value = 1
681
682     def generate_samples(self, ports, duration):
683         return self._ix_scenario.generate_samples(self, ports, duration)
684
685     def _init_ix_scenario(self):
686         ixia_config = self.scenario_helper.scenario_cfg.get('ixia_config', 'IxiaBasic')
687
688         if ixia_config in self._ixia_scenarios:
689             scenario_type = self._ixia_scenarios[ixia_config]
690
691             self._ix_scenario = scenario_type(self.client, self.context_cfg,
692                                               self.scenario_helper.scenario_cfg['options'])
693         else:
694             raise RuntimeError(
695                 "IXIA config type '{}' not supported".format(ixia_config))
696
697     def _initialize_client(self, traffic_profile):
698         """Initialize the IXIA IxNetwork client and configure the server"""
699         self.client.clear_config()
700         self.client.assign_ports()
701         self._ix_scenario.apply_config()
702         self._ix_scenario.create_traffic_model(traffic_profile)
703
704     def update_tracking_options(self):
705         self._ix_scenario.update_tracking_options()
706
707     def run_traffic(self, traffic_profile):
708         if self._terminated.value:
709             return
710
711         min_tol = self.rfc_helper.tolerance_low
712         max_tol = self.rfc_helper.tolerance_high
713         precision = self.rfc_helper.tolerance_precision
714         resolution = self.rfc_helper.resolution
715         default = "00:00:00:00:00:00"
716
717         self._build_ports()
718         traffic_profile.update_traffic_profile(self)
719         self._initialize_client(traffic_profile)
720
721         mac = {}
722         for port_name in self.vnfd_helper.port_pairs.all_ports:
723             intf = self.vnfd_helper.find_interface(name=port_name)
724             virt_intf = intf["virtual-interface"]
725             # we only know static traffic id by reading the json
726             # this is used by _get_ixia_trafficrofile
727             port_num = self.vnfd_helper.port_num(intf)
728             mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
729             mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)
730
731         self._ix_scenario.run_protocols()
732
733         try:
734             while not self._terminated.value:
735                 first_run = traffic_profile.execute_traffic(self, self.client,
736                                                             mac)
737                 self.client_started.value = 1
738                 # pylint: disable=unnecessary-lambda
739                 utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
740                                       timeout=traffic_profile.config.duration * 2)
741                 rfc2544_opts = self._ix_scenario.get_tc_rfc2544_options()
742                 samples = self.generate_samples(traffic_profile.ports,
743                                                 traffic_profile.config.duration)
744
745                 completed, samples = traffic_profile.get_drop_percentage(
746                     samples, min_tol, max_tol, precision, resolution,
747                     first_run=first_run, tc_rfc2544_opts=rfc2544_opts)
748                 self._queue.put(samples)
749
750                 if completed:
751                     self._terminated.value = 1
752
753         except Exception:  # pylint: disable=broad-except
754             LOG.exception('Run Traffic terminated')
755
756         self._ix_scenario.stop_protocols()
757         self.client_started.value = 0
758         self._terminated.value = 1
759
760     def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover
761         LOG.info("Ixia resource_helper run_test")
762         if self._terminated.value:
763             return
764
765         min_tol = self.rfc_helper.tolerance_low
766         max_tol = self.rfc_helper.tolerance_high
767         precision = self.rfc_helper.tolerance_precision
768         resolution = self.rfc_helper.resolution
769         default = "00:00:00:00:00:00"
770
771         self._build_ports()
772         traffic_profile.update_traffic_profile(self)
773         self._initialize_client(traffic_profile)
774
775         mac = {}
776         for port_name in self.vnfd_helper.port_pairs.all_ports:
777             intf = self.vnfd_helper.find_interface(name=port_name)
778             virt_intf = intf["virtual-interface"]
779             # we only know static traffic id by reading the json
780             # this is used by _get_ixia_trafficrofile
781             port_num = self.vnfd_helper.port_num(intf)
782             mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default)
783             mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default)
784
785         self._ix_scenario.run_protocols()
786
787         try:
788             completed = False
789             self.rfc_helper.iteration.value = 0
790             self.client_started.value = 1
791             while completed is False and not self._terminated.value:
792                 LOG.info("Wait for task ...")
793
794                 try:
795                     task = tasks_queue.get(True, 5)
796                 except moves.queue.Empty:
797                     continue
798                 else:
799                     if task != 'RUN_TRAFFIC':
800                         continue
801
802                 self.rfc_helper.iteration.value += 1
803                 LOG.info("Got %s task, start iteration %d", task,
804                          self.rfc_helper.iteration.value)
805                 first_run = traffic_profile.execute_traffic(self, self.client,
806                                                             mac)
807                 # pylint: disable=unnecessary-lambda
808                 utils.wait_until_true(lambda: self.client.is_traffic_stopped(),
809                                       timeout=traffic_profile.config.duration * 2)
810                 samples = self.generate_samples(traffic_profile.ports,
811                                                 traffic_profile.config.duration)
812
813                 completed, samples = traffic_profile.get_drop_percentage(
814                     samples, min_tol, max_tol, precision, resolution,
815                     first_run=first_run)
816                 self._queue.put(samples)
817
818                 if completed:
819                     LOG.debug("IxiaResourceHelper::run_test - test completed")
820                     results_queue.put('COMPLETE')
821                 else:
822                     results_queue.put('CONTINUE')
823                 tasks_queue.task_done()
824
825         except Exception:  # pylint: disable=broad-except
826             LOG.exception('Run Traffic terminated')
827
828         self._ix_scenario.stop_protocols()
829         self.client_started.value = 0
830         LOG.debug("IxiaResourceHelper::run_test done")
831
832
833 class IxiaTrafficGen(SampleVNFTrafficGen):
834
835     APP_NAME = 'Ixia'
836
837     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
838         if resource_helper_type is None:
839             resource_helper_type = IxiaResourceHelper
840
841         super(IxiaTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
842                                              resource_helper_type)
843         self._ixia_traffic_gen = None
844         self.ixia_file_name = ''
845         self.vnf_port_pairs = []
846         self._traffic_process = None
847         self._tasks_queue = JoinableQueue()
848         self._result_queue = Queue()
849
850     def _check_status(self):
851         pass
852
853     def terminate(self):
854         self.resource_helper.stop_collect()
855         super(IxiaTrafficGen, self).terminate()
856
857     def _test_runner(self, traffic_profile, tasks, results):
858         self.resource_helper.run_test(traffic_profile, tasks, results)
859
860     def _init_traffic_process(self, traffic_profile):
861         name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
862                                     traffic_profile.__class__.__name__,
863                                     os.getpid())
864         self._traffic_process = Process(name=name, target=self._test_runner,
865                                         args=(
866                                         traffic_profile, self._tasks_queue,
867                                         self._result_queue))
868
869         self._traffic_process.start()
870         while self.resource_helper.client_started.value == 0:
871             time.sleep(1)
872             if not self._traffic_process.is_alive():
873                 break
874
875     def run_traffic_once(self, traffic_profile):
876         if self.resource_helper.client_started.value == 0:
877             self._init_traffic_process(traffic_profile)
878
879         # continue test - run next iteration
880         LOG.info("Run next iteration ...")
881         self._tasks_queue.put('RUN_TRAFFIC')
882
883     def wait_on_traffic(self):
884         self._tasks_queue.join()
885         result = self._result_queue.get()
886         return result