1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from yardstick.common import exceptions
20 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
26 TRAFFIC_PARAMETERS = {
33 'framesize': {'64B': '25', '256B': '75'}
38 'dstip4': '152.16.40.20',
40 'srcip4': '152.16.100.20',
45 'dstip4': '152.16.40.20',
47 'srcip4': '152.16.100.20',
53 'dstip4': '152.16.100.20',
55 'srcip4': '152.16.40.20',
62 'traffic_type': 'continuous'
70 'framesize': {'128B': '35', '1024B': '65'}
75 'dstip4': '152.16.100.20',
77 'srcip4': '152.16.40.20',
83 'dstip4': '152.16.100.20',
85 'srcip4': '152.16.40.20',
91 'dstip4': '152.16.100.20',
93 'srcip4': '152.16.40.20',
100 'traffic_type': 'continuous'
105 class TestIxNextgen(unittest.TestCase):
108 self.ixnet = mock.Mock()
109 self.ixnet.execute = mock.Mock()
110 self.ixnet.getRoot.return_value = 'my_root'
112 def test_get_config(self):
116 'external-interface': [
117 {'virtual-interface': {'vpci': '0000:07:00.1'}},
118 {'virtual-interface': {'vpci': '0001:08:01.2'}}
125 'dut_result_dir': 'test2',
127 'ixchassis': 'test4',
137 'cards': ['0000', '0001'],
138 'ports': ['07', '08'],
139 'output_dir': 'test2',
144 result = ixnet_api.IxNextgen.get_config(tg_cfg)
145 self.assertEqual(result, expected)
147 def test__get_config_element_by_flow_group_name(self):
148 ixnet_gen = ixnet_api.IxNextgen()
149 ixnet_gen._ixnet = self.ixnet
150 ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
152 ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_01'
153 output = ixnet_gen._get_config_element_by_flow_group_name(
155 self.assertEqual('traffic_item/configElement:flow_group_01', output)
157 def test__get_config_element_by_flow_group_name_no_match(self):
158 ixnet_gen = ixnet_api.IxNextgen()
159 ixnet_gen._ixnet = self.ixnet
160 ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
162 ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_02'
163 output = ixnet_gen._get_config_element_by_flow_group_name(
165 self.assertIsNone(output)
167 def test__get_stack_item(self):
168 ixnet_gen = ixnet_api.IxNextgen()
169 ixnet_gen._ixnet = self.ixnet
170 ixnet_gen._ixnet.getList.return_value = ['tcp1', 'tcp2', 'udp']
171 with mock.patch.object(
172 ixnet_gen, '_get_config_element_by_flow_group_name') as \
173 mock_get_cfg_element:
174 mock_get_cfg_element.return_value = 'cfg_element'
175 output = ixnet_gen._get_stack_item(mock.ANY, ixnet_api.PROTO_TCP)
176 self.assertEqual(['tcp1', 'tcp2'], output)
178 def test__get_stack_item_no_config_element(self):
179 ixnet_gen = ixnet_api.IxNextgen()
180 ixnet_gen._ixnet = self.ixnet
181 with mock.patch.object(
182 ixnet_gen, '_get_config_element_by_flow_group_name',
184 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
185 ixnet_gen._get_stack_item(mock.ANY, mock.ANY)
187 def test__get_field_in_stack_item(self):
188 ixnet_gen = ixnet_api.IxNextgen()
189 ixnet_gen._ixnet = self.ixnet
190 ixnet_gen._ixnet.getList.return_value = ['field1', 'field2']
191 output = ixnet_gen._get_field_in_stack_item(mock.ANY, 'field2')
192 self.assertEqual('field2', output)
194 def test__get_field_in_stack_item_no_field_present(self):
195 ixnet_gen = ixnet_api.IxNextgen()
196 ixnet_gen._ixnet = self.ixnet
197 ixnet_gen._ixnet.getList.return_value = ['field1', 'field2']
198 with self.assertRaises(exceptions.IxNetworkFieldNotPresentInStackItem):
199 ixnet_gen._get_field_in_stack_item(mock.ANY, 'field3')
201 def test__parse_framesize(self):
202 ixnet_gen = ixnet_api.IxNextgen()
203 ixnet_gen._ixnet = self.ixnet
204 framesize = {'64B': '75', '512b': '25'}
205 output = ixnet_gen._parse_framesize(framesize)
206 self.assertEqual(2, len(output))
207 self.assertIn([64, 64, 75], output)
208 self.assertIn([512, 512, 25], output)
210 @mock.patch.object(IxNetwork, 'IxNet')
211 def test_connect(self, mock_ixnet):
212 mock_ixnet.return_value = self.ixnet
213 ixnet_gen = ixnet_api.IxNextgen()
214 with mock.patch.object(ixnet_gen, 'get_config') as mock_config:
215 mock_config.return_value = {'machine': 'machine_fake',
218 ixnet_gen.connect(mock.ANY)
220 self.ixnet.connect.assert_called_once_with(
221 'machine_fake', '-port', 'port_fake', '-version', '12345')
222 mock_config.assert_called_once()
224 def test_connect_invalid_config_no_machine(self):
225 ixnet_gen = ixnet_api.IxNextgen()
226 ixnet_gen._ixnet = self.ixnet
227 ixnet_gen.get_config = mock.Mock(return_value={
230 self.assertRaises(KeyError, ixnet_gen.connect, mock.ANY)
231 self.ixnet.connect.assert_not_called()
233 def test_connect_invalid_config_no_port(self):
234 ixnet_gen = ixnet_api.IxNextgen()
235 ixnet_gen._ixnet = self.ixnet
236 ixnet_gen.get_config = mock.Mock(return_value={
237 'machine': 'machine_fake',
239 self.assertRaises(KeyError, ixnet_gen.connect, mock.ANY)
240 self.ixnet.connect.assert_not_called()
242 def test_connect_invalid_config_no_version(self):
243 ixnet_gen = ixnet_api.IxNextgen()
244 ixnet_gen._ixnet = self.ixnet
245 ixnet_gen.get_config = mock.Mock(return_value={
246 'machine': 'machine_fake',
247 'port': 'port_fake'})
248 self.assertRaises(KeyError, ixnet_gen.connect, mock.ANY)
249 self.ixnet.connect.assert_not_called()
251 def test_connect_no_config(self):
252 ixnet_gen = ixnet_api.IxNextgen()
253 ixnet_gen._ixnet = self.ixnet
254 ixnet_gen.get_config = mock.Mock(return_value={})
255 self.assertRaises(KeyError, ixnet_gen.connect, mock.ANY)
256 self.ixnet.connect.assert_not_called()
258 def test_clear_config(self):
259 ixnet_gen = ixnet_api.IxNextgen()
260 ixnet_gen._ixnet = self.ixnet
261 ixnet_gen.clear_config()
262 self.ixnet.execute.assert_called_once_with('newConfig')
264 @mock.patch.object(ixnet_api, 'log')
265 def test_assign_ports_2_ports(self, *args):
266 self.ixnet.getAttribute.side_effect = ['up', 'down']
268 'chassis': '1.1.1.1',
271 ixnet_gen = ixnet_api.IxNextgen()
272 ixnet_gen._ixnet = self.ixnet
273 ixnet_gen._cfg = config
275 self.assertIsNone(ixnet_gen.assign_ports())
276 self.assertEqual(self.ixnet.execute.call_count, 2)
277 self.assertEqual(self.ixnet.commit.call_count, 4)
278 self.assertEqual(self.ixnet.getAttribute.call_count, 2)
280 @mock.patch.object(ixnet_api, 'log')
281 def test_assign_ports_port_down(self, mock_log):
282 self.ixnet.getAttribute.return_value = 'down'
284 'chassis': '1.1.1.1',
287 ixnet_gen = ixnet_api.IxNextgen()
288 ixnet_gen._ixnet = self.ixnet
289 ixnet_gen._cfg = config
290 ixnet_gen.assign_ports()
291 mock_log.warning.assert_called()
293 def test_assign_ports_no_config(self):
294 ixnet_gen = ixnet_api.IxNextgen()
295 ixnet_gen._ixnet = self.ixnet
297 self.assertRaises(KeyError, ixnet_gen.assign_ports)
299 def test__create_traffic_item(self):
300 ixnet_gen = ixnet_api.IxNextgen()
301 ixnet_gen._ixnet = self.ixnet
302 self.ixnet.add.return_value = 'my_new_traffic_item'
303 self.ixnet.remapIds.return_value = ['my_traffic_item_id']
305 ixnet_gen._create_traffic_item()
306 self.ixnet.add.assert_called_once_with(
307 'my_root/traffic', 'trafficItem')
308 self.ixnet.setMultiAttribute.assert_called_once_with(
309 'my_new_traffic_item', '-name', 'RFC2544', '-trafficType', 'raw')
310 self.assertEqual(2, self.ixnet.commit.call_count)
311 self.ixnet.remapIds.assert_called_once_with('my_new_traffic_item')
312 self.ixnet.setAttribute('my_traffic_item_id/tracking',
313 '-trackBy', 'trafficGroupId0')
315 def test__create_flow_groups(self):
316 ixnet_gen = ixnet_api.IxNextgen()
317 ixnet_gen._ixnet = self.ixnet
318 ixnet_gen.ixnet.getList.side_effect = [['traffic_item'], ['1', '2']]
319 ixnet_gen.ixnet.add.side_effect = ['endp1', 'endp2']
320 ixnet_gen._create_flow_groups()
321 ixnet_gen.ixnet.add.assert_has_calls([
322 mock.call('traffic_item', 'endpointSet'),
323 mock.call('traffic_item', 'endpointSet')])
324 ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
325 mock.call('endp1', '-name', '1', '-sources', ['1/protocols'],
326 '-destinations', ['2/protocols']),
327 mock.call('endp2', '-name', '2', '-sources', ['2/protocols'],
328 '-destinations', ['1/protocols'])])
330 def test__append_protocol_to_stack(self):
331 ixnet_gen = ixnet_api.IxNextgen()
332 ixnet_gen._ixnet = self.ixnet
334 ixnet_gen._append_procotol_to_stack('my_protocol', 'prev_element')
335 self.ixnet.execute.assert_called_with(
336 'append', 'prev_element',
337 'my_root/traffic/protocolTemplate:"my_protocol"')
339 def test__setup_config_elements(self):
340 ixnet_gen = ixnet_api.IxNextgen()
341 ixnet_gen._ixnet = self.ixnet
342 ixnet_gen.ixnet.getList.side_effect = [['traffic_item'],
344 with mock.patch.object(ixnet_gen, '_append_procotol_to_stack') as \
346 ixnet_gen._setup_config_elements()
347 mock_append_proto.assert_has_calls([
348 mock.call(ixnet_api.PROTO_UDP, 'cfg_element/stack:"ethernet-1"'),
349 mock.call(ixnet_api.PROTO_IPV4, 'cfg_element/stack:"ethernet-1"')])
350 ixnet_gen.ixnet.setAttribute.assert_has_calls([
351 mock.call('cfg_element/frameRateDistribution', '-portDistribution',
353 mock.call('cfg_element/frameRateDistribution',
354 '-streamDistribution', 'splitRateEvenly')])
356 @mock.patch.object(ixnet_api.IxNextgen, '_create_traffic_item')
357 @mock.patch.object(ixnet_api.IxNextgen, '_create_flow_groups')
358 @mock.patch.object(ixnet_api.IxNextgen, '_setup_config_elements')
359 def test_create_traffic_model(self, mock__setup_config_elements,
360 mock__create_flow_groups,
361 mock__create_traffic_item):
362 ixnet_gen = ixnet_api.IxNextgen()
363 ixnet_gen._ixnet = self.ixnet
365 ixnet_gen.create_traffic_model()
366 mock__create_traffic_item.assert_called_once()
367 mock__create_flow_groups.assert_called_once()
368 mock__setup_config_elements.assert_called_once()
370 def test__update_frame_mac(self):
371 ixnet_gen = ixnet_api.IxNextgen()
372 ixnet_gen._ixnet = self.ixnet
373 with mock.patch.object(ixnet_gen, '_get_field_in_stack_item') as \
375 mock_get_field.return_value = 'field_descriptor'
376 ixnet_gen._update_frame_mac('ethernet_descriptor', 'field', 'mac')
377 mock_get_field.assert_called_once_with('ethernet_descriptor', 'field')
378 ixnet_gen.ixnet.setMultiAttribute(
379 'field_descriptor', '-singleValue', 'mac', '-fieldValue', 'mac',
380 '-valueType', 'singleValue')
381 ixnet_gen.ixnet.commit.assert_called_once()
383 def test_update_frame(self):
384 ixnet_gen = ixnet_api.IxNextgen()
385 ixnet_gen._ixnet = self.ixnet
386 with mock.patch.object(
387 ixnet_gen, '_get_config_element_by_flow_group_name',
388 return_value='cfg_element'), \
389 mock.patch.object(ixnet_gen, '_update_frame_mac') as \
391 mock.patch.object(ixnet_gen, '_get_stack_item') as \
393 mock_get_stack_item.side_effect = [['item1'], ['item2'],
394 ['item3'], ['item4']]
395 ixnet_gen.update_frame(TRAFFIC_PARAMETERS)
397 self.assertEqual(6, len(ixnet_gen.ixnet.setMultiAttribute.mock_calls))
398 self.assertEqual(4, len(mock_update_frame.mock_calls))
400 def test_update_frame_flow_not_present(self):
401 ixnet_gen = ixnet_api.IxNextgen()
402 ixnet_gen._ixnet = self.ixnet
403 with mock.patch.object(
404 ixnet_gen, '_get_config_element_by_flow_group_name',
406 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
407 ixnet_gen.update_frame(TRAFFIC_PARAMETERS)
409 def test_get_statistics(self):
410 ixnet_gen = ixnet_api.IxNextgen()
411 port_statistics = '::ixNet::OBJ-/statistics/view:"Port Statistics"'
412 flow_statistics = '::ixNet::OBJ-/statistics/view:"Flow Statistics"'
413 with mock.patch.object(ixnet_gen, '_build_stats_map') as \
415 ixnet_gen.get_statistics()
417 mock_build_stats.assert_has_calls([
418 mock.call(port_statistics, ixnet_gen.PORT_STATS_NAME_MAP),
419 mock.call(flow_statistics, ixnet_gen.LATENCY_NAME_MAP)])
421 def test__update_ipv4_address(self):
422 ixnet_gen = ixnet_api.IxNextgen()
423 ixnet_gen._ixnet = self.ixnet
424 with mock.patch.object(ixnet_gen, '_get_field_in_stack_item',
425 return_value='field_desc'):
426 ixnet_gen._update_ipv4_address(mock.ANY, mock.ANY, '192.168.1.1',
427 100, '255.255.255.0', 25)
428 ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
429 'field_desc', '-seed', 100, '-fixedBits', '192.168.1.1',
430 '-randomMask', '255.255.255.0', '-valueType', 'random',
433 def test_update_ip_packet(self):
434 ixnet_gen = ixnet_api.IxNextgen()
435 ixnet_gen._ixnet = self.ixnet
436 with mock.patch.object(ixnet_gen, '_update_ipv4_address') as \
438 mock.patch.object(ixnet_gen, '_get_stack_item'), \
439 mock.patch.object(ixnet_gen,
440 '_get_config_element_by_flow_group_name', return_value='celm'):
441 ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
443 self.assertEqual(4, len(mock_update_add.mock_calls))
445 def test_update_ip_packet_exception_no_config_element(self):
446 ixnet_gen = ixnet_api.IxNextgen()
447 ixnet_gen._ixnet = self.ixnet
448 with mock.patch.object(ixnet_gen,
449 '_get_config_element_by_flow_group_name',
451 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
452 ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
454 @mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
455 def test_start_traffic(self, mock_ixnextgen_get_traffic_state):
456 ixnet_gen = ixnet_api.IxNextgen()
457 ixnet_gen._ixnet = self.ixnet
458 ixnet_gen._ixnet.getList.return_value = [0]
460 mock_ixnextgen_get_traffic_state.side_effect = [
461 'stopped', 'started', 'started', 'started']
463 result = ixnet_gen.start_traffic()
464 self.assertIsNone(result)
465 self.ixnet.getList.assert_called_once()
466 self.assertEqual(3, ixnet_gen._ixnet.execute.call_count)
468 @mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
469 def test_start_traffic_traffic_running(
470 self, mock_ixnextgen_get_traffic_state):
471 ixnet_gen = ixnet_api.IxNextgen()
472 ixnet_gen._ixnet = self.ixnet
473 ixnet_gen._ixnet.getList.return_value = [0]
474 mock_ixnextgen_get_traffic_state.side_effect = [
475 'started', 'stopped', 'started']
477 result = ixnet_gen.start_traffic()
478 self.assertIsNone(result)
479 self.ixnet.getList.assert_called_once()
480 self.assertEqual(4, ixnet_gen._ixnet.execute.call_count)
482 @mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
483 def test_start_traffic_wait_for_traffic_to_stop(
484 self, mock_ixnextgen_get_traffic_state):
485 ixnet_gen = ixnet_api.IxNextgen()
486 ixnet_gen._ixnet = self.ixnet
487 ixnet_gen._ixnet.getList.return_value = [0]
488 mock_ixnextgen_get_traffic_state.side_effect = [
489 'started', 'started', 'started', 'stopped', 'started']
491 result = ixnet_gen.start_traffic()
492 self.assertIsNone(result)
493 self.ixnet.getList.assert_called_once()
494 self.assertEqual(4, ixnet_gen._ixnet.execute.call_count)
496 @mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
497 def test_start_traffic_wait_for_traffic_start(
498 self, mock_ixnextgen_get_traffic_state):
499 ixnet_gen = ixnet_api.IxNextgen()
500 ixnet_gen._ixnet = self.ixnet
501 ixnet_gen._ixnet.getList.return_value = [0]
502 mock_ixnextgen_get_traffic_state.side_effect = [
503 'stopped', 'stopped', 'stopped', 'started']
505 result = ixnet_gen.start_traffic()
506 self.assertIsNone(result)
507 self.ixnet.getList.assert_called_once()
508 self.assertEqual(3, ixnet_gen._ixnet.execute.call_count)