1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from trex_stl_lib import api as Pkt
18 from trex_stl_lib import trex_stl_client
19 from trex_stl_lib import trex_stl_packet_builder_scapy
20 from trex_stl_lib import trex_stl_streams
22 from yardstick.network_services.traffic_profile import rfc2544
23 from yardstick.tests.unit import base
26 class TestRFC2544Profile(base.BaseUnitTestCase):
28 "schema": "isb:traffic_profile:0.1",
30 "description": "Fixed traffic profile to run UDP traffic",
32 "traffic_type": "FixedTraffic",
37 PROFILE = {'description': 'Traffic profile to run RFC2544 latency',
39 'traffic_profile': {'traffic_type': 'RFC2544Profile',
45 {'64B': '100', '1518B': '0',
46 '128B': '0', '1400B': '0',
47 '256B': '0', '373b': '0',
50 {'dstip4': '1.1.1.1-1.15.255.255',
52 'srcip4': '90.90.1.1-90.105.255.255',
53 'dscp': 0, 'ttl': 32, 'count': 1},
56 'dsrport': '1234', 'count': 1}}},
61 {'64B': '100', '1518B': '0',
62 '128B': '0', '1400B': '0',
63 '256B': '0', '373b': '0',
66 {'dstip4': '9.9.1.1-90.105.255.255',
68 'srcip4': '1.1.1.1-1.15.255.255',
69 'dscp': 0, 'ttl': 32, 'count': 1},
72 'srcport': '1234', 'count': 1}}},
73 'schema': 'isb:traffic_profile:0.1'}
75 def test___init__(self):
76 rfc2544_profile = rfc2544.RFC2544Profile(self.TRAFFIC_PROFILE)
77 self.assertEqual(rfc2544_profile.max_rate, rfc2544_profile.rate)
78 self.assertEqual(0, rfc2544_profile.min_rate)
80 def test_stop_traffic(self):
81 rfc2544_profile = rfc2544.RFC2544Profile(self.TRAFFIC_PROFILE)
82 mock_generator = mock.Mock()
83 rfc2544_profile.stop_traffic(traffic_generator=mock_generator)
84 mock_generator.client.stop.assert_called_once()
85 mock_generator.client.reset.assert_called_once()
86 mock_generator.client.remove_all_streams.assert_called_once()
88 def test_execute_traffic(self):
89 rfc2544_profile = rfc2544.RFC2544Profile(self.TRAFFIC_PROFILE)
90 mock_generator = mock.Mock()
91 mock_generator.networks = {
92 'downlink_0': ['xe0', 'xe1'],
93 'uplink_0': ['xe2', 'xe3'],
95 mock_generator.port_num.side_effect = [10, 20, 30, 40]
96 mock_generator.rfc2544_helper.correlated_traffic = False
97 rfc2544_profile.params = {
98 'downlink_0': 'profile1',
99 'uplink_0': 'profile2'}
101 with mock.patch.object(rfc2544_profile, '_create_profile') as \
103 rfc2544_profile.execute_traffic(traffic_generator=mock_generator)
104 mock_create_profile.assert_has_calls([
105 mock.call('profile1', rfc2544_profile.rate, mock.ANY, False),
106 mock.call('profile1', rfc2544_profile.rate, mock.ANY, False),
107 mock.call('profile2', rfc2544_profile.rate, mock.ANY, False),
108 mock.call('profile2', rfc2544_profile.rate, mock.ANY, False)])
109 mock_generator.client.add_streams.assert_has_calls([
110 mock.call(mock.ANY, ports=[10]),
111 mock.call(mock.ANY, ports=[20]),
112 mock.call(mock.ANY, ports=[30]),
113 mock.call(mock.ANY, ports=[40])])
114 mock_generator.client.start(ports=[10, 20, 30, 40],
115 duration=rfc2544_profile.config.duration,
118 @mock.patch.object(trex_stl_streams, 'STLProfile')
119 def test__create_profile(self, mock_stl_profile):
120 rfc2544_profile = rfc2544.RFC2544Profile(self.TRAFFIC_PROFILE)
121 port_pg_id = mock.ANY
122 profile_data = {'packetid_1': {'outer_l2': {'framesize': 'imix_info'}}}
124 with mock.patch.object(rfc2544_profile, '_create_imix_data') as \
126 mock.patch.object(rfc2544_profile, '_create_vm') as \
128 mock.patch.object(rfc2544_profile, '_create_streams') as \
130 mock_create_imix.return_value = 'imix_data'
131 mock_create_streams.return_value = ['stream1']
132 rfc2544_profile._create_profile(profile_data, rate, port_pg_id,
135 mock_create_imix.assert_called_once_with('imix_info')
136 mock_create_vm.assert_called_once_with(
137 {'outer_l2': {'framesize': 'imix_info'}})
138 mock_create_streams.assert_called_once_with('imix_data', 100,
140 mock_stl_profile.assert_called_once_with(['stream1'])
142 def test__create_imix_data(self):
143 rfc2544_profile = rfc2544.RFC2544Profile(self.TRAFFIC_PROFILE)
144 data = {'64B': 50, '128B': 50}
145 self.assertEqual({'64': 50.0, '128': 50.0},
146 rfc2544_profile._create_imix_data(data))
147 data = {'64B': 1, '128b': 3}
148 self.assertEqual({'64': 25.0, '128': 75.0},
149 rfc2544_profile._create_imix_data(data))
151 self.assertEqual({}, rfc2544_profile._create_imix_data(data))
153 def test__create_vm(self):
154 packet = {'outer_l2': 'l2_definition'}
155 rfc2544_profile = rfc2544.RFC2544Profile(self.TRAFFIC_PROFILE)
156 with mock.patch.object(rfc2544_profile, '_set_outer_l2_fields') as \
158 rfc2544_profile._create_vm(packet)
159 mock_l2_fileds.assert_called_once_with('l2_definition')
161 @mock.patch.object(trex_stl_packet_builder_scapy, 'STLPktBuilder',
162 return_value='packet')
163 def test__create_single_packet(self, mock_pktbuilder):
165 rfc2544_profile = rfc2544.RFC2544Profile(self.TRAFFIC_PROFILE)
166 rfc2544_profile.ether_packet = Pkt.Eth()
167 rfc2544_profile.ip_packet = Pkt.IP()
168 rfc2544_profile.udp_packet = Pkt.UDP()
169 rfc2544_profile.trex_vm = 'trex_vm'
170 base_pkt = (rfc2544_profile.ether_packet / rfc2544_profile.ip_packet /
171 rfc2544_profile.udp_packet)
172 pad = (size - len(base_pkt)) * 'x'
173 output = rfc2544_profile._create_single_packet(size=size)
174 mock_pktbuilder.assert_called_once_with(pkt=base_pkt / pad,
176 self.assertEqual(output, 'packet')
178 @mock.patch.object(trex_stl_packet_builder_scapy, 'STLPktBuilder',
179 return_value='packet')
180 def test__create_single_packet_qinq(self, mock_pktbuilder):
182 rfc2544_profile = rfc2544.RFC2544Profile(self.TRAFFIC_PROFILE)
183 rfc2544_profile.ether_packet = Pkt.Eth()
184 rfc2544_profile.ip_packet = Pkt.IP()
185 rfc2544_profile.udp_packet = Pkt.UDP()
186 rfc2544_profile.trex_vm = 'trex_vm'
187 rfc2544_profile.qinq = True
188 rfc2544_profile.qinq_packet = Pkt.Dot1Q(vlan=1) / Pkt.Dot1Q(vlan=2)
189 base_pkt = (rfc2544_profile.ether_packet /
190 rfc2544_profile.qinq_packet / rfc2544_profile.ip_packet /
191 rfc2544_profile.udp_packet)
192 pad = (size - len(base_pkt)) * 'x'
193 output = rfc2544_profile._create_single_packet(size=size)
194 mock_pktbuilder.assert_called_once_with(pkt=base_pkt / pad,
196 self.assertEqual(output, 'packet')
198 @mock.patch.object(trex_stl_streams, 'STLFlowLatencyStats')
199 @mock.patch.object(trex_stl_streams, 'STLTXCont')
200 @mock.patch.object(trex_stl_client, 'STLStream')
201 def test__create_streams(self, mock_stream, mock_txcont, mock_latency):
202 imix_data = {'64': 25, '512': 75}
204 port_pg_id = rfc2544.PortPgIDMap()
205 port_pg_id.add_port(10)
206 mock_stream.side_effect = ['stream1', 'stream2']
207 mock_txcont.side_effect = ['txcont1', 'txcont2']
208 mock_latency.side_effect = ['latency1', 'latency2']
209 rfc2544_profile = rfc2544.RFC2544Profile(self.TRAFFIC_PROFILE)
210 with mock.patch.object(rfc2544_profile, '_create_single_packet'):
211 output = rfc2544_profile._create_streams(imix_data, rate,
213 self.assertEqual(['stream1', 'stream2'], output)
214 mock_latency.assert_has_calls([
215 mock.call(pg_id=1), mock.call(pg_id=2)])
216 mock_txcont.assert_has_calls([
217 mock.call(percentage=float(25 * 35) / 100),
218 mock.call(percentage=float(75 * 35) / 100)], any_order=True)
220 def test_get_drop_percentage(self):
221 rfc2544_profile = rfc2544.RFC2544Profile(self.TRAFFIC_PROFILE)
223 {'xe1': {'tx_throughput_fps': 100,
224 'rx_throughput_fps': 101,
227 'xe2': {'tx_throughput_fps': 200,
228 'rx_throughput_fps': 201,
230 'in_packets': 4010}},
231 {'xe1': {'tx_throughput_fps': 106,
232 'rx_throughput_fps': 108,
235 'latency': 'Latency1'},
236 'xe2': {'tx_throughput_fps': 203,
237 'rx_throughput_fps': 215,
240 'latency': 'Latency2'}}
242 completed, output = rfc2544_profile.get_drop_percentage(
243 samples, 0, 0, False)
244 expected = {'DropPercentage': 0.3963,
245 'Latency': {'xe1': 'Latency1', 'xe2': 'Latency2'},
246 'RxThroughput': 312.5,
247 'TxThroughput': 304.5,
248 'CurrentDropPercentage': 0.3963,
251 self.assertEqual(expected, output)
252 self.assertFalse(completed)
255 class PortPgIDMapTestCase(base.BaseUnitTestCase):
257 def test_add_port(self):
258 port_pg_id_map = rfc2544.PortPgIDMap()
259 port_pg_id_map.add_port(10)
260 self.assertEqual(10, port_pg_id_map._last_port)
261 self.assertEqual([], port_pg_id_map._port_pg_id_map[10])
263 def test_get_pg_ids(self):
264 port_pg_id_map = rfc2544.PortPgIDMap()
265 port_pg_id_map.add_port(10)
266 port_pg_id_map.increase_pg_id()
267 port_pg_id_map.increase_pg_id()
268 port_pg_id_map.add_port(20)
269 port_pg_id_map.increase_pg_id()
270 self.assertEqual([1, 2], port_pg_id_map.get_pg_ids(10))
271 self.assertEqual([3], port_pg_id_map.get_pg_ids(20))
272 self.assertEqual([], port_pg_id_map.get_pg_ids(30))
274 def test_increase_pg_id_no_port(self):
275 port_pg_id_map = rfc2544.PortPgIDMap()
276 self.assertIsNone(port_pg_id_map.increase_pg_id())
278 def test_increase_pg_id_last_port(self):
279 port_pg_id_map = rfc2544.PortPgIDMap()
280 port_pg_id_map.add_port(10)
281 self.assertEqual(1, port_pg_id_map.increase_pg_id())
282 self.assertEqual([1], port_pg_id_map.get_pg_ids(10))
283 self.assertEqual(10, port_pg_id_map._last_port)
285 def test_increase_pg_id(self):
286 port_pg_id_map = rfc2544.PortPgIDMap()
287 port_pg_id_map.add_port(10)
288 port_pg_id_map.increase_pg_id()
289 self.assertEqual(2, port_pg_id_map.increase_pg_id(port=20))
290 self.assertEqual([1], port_pg_id_map.get_pg_ids(10))
291 self.assertEqual([2], port_pg_id_map.get_pg_ids(20))
292 self.assertEqual(20, port_pg_id_map._last_port)