1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from copy import deepcopy
21 from yardstick.common import exceptions
22 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
28 TRAFFIC_PARAMETERS = {
36 'framesize': {'64B': '25', '256B': '75'},
46 'dstip': '152.16.40.20',
47 'srcip': '152.16.100.20',
59 'traffic_type': 'continuous'
68 'framesize': {'128B': '35', '1024B': '65'},
91 'traffic_type': 'continuous'
96 class TestIxNextgen(unittest.TestCase):
99 self.ixnet = mock.Mock()
100 self.ixnet.execute = mock.Mock()
101 self.ixnet.getRoot.return_value = 'my_root'
102 self.ixnet_gen = ixnet_api.IxNextgen()
103 self.ixnet_gen._ixnet = self.ixnet
105 def test_get_config(self):
109 'external-interface': [
110 {'virtual-interface': {'vpci': '0000:07:00.1'}},
111 {'virtual-interface': {'vpci': '0001:08:01.2'}}
118 'dut_result_dir': 'test2',
120 'ixchassis': 'test4',
130 'cards': ['0000', '0001'],
131 'ports': ['07', '08'],
132 'output_dir': 'test2',
137 result = ixnet_api.IxNextgen.get_config(tg_cfg)
138 self.assertEqual(result, expected)
140 def test__get_config_element_by_flow_group_name(self):
141 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
143 self.ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_01'
144 output = self.ixnet_gen._get_config_element_by_flow_group_name(
146 self.assertEqual('traffic_item/configElement:flow_group_01', output)
148 def test__get_config_element_by_flow_group_name_no_match(self):
149 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
151 self.ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_02'
152 output = self.ixnet_gen._get_config_element_by_flow_group_name(
154 self.assertIsNone(output)
156 def test__get_stack_item(self):
157 self.ixnet_gen._ixnet.getList.return_value = ['tcp1', 'tcp2', 'udp']
158 with mock.patch.object(
159 self.ixnet_gen, '_get_config_element_by_flow_group_name') as \
160 mock_get_cfg_element:
161 mock_get_cfg_element.return_value = 'cfg_element'
162 output = self.ixnet_gen._get_stack_item(mock.ANY, ixnet_api.PROTO_TCP)
163 self.assertEqual(['tcp1', 'tcp2'], output)
165 def test__get_stack_item_no_config_element(self):
166 with mock.patch.object(
167 self.ixnet_gen, '_get_config_element_by_flow_group_name',
169 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
170 self.ixnet_gen._get_stack_item(mock.ANY, mock.ANY)
172 def test__get_field_in_stack_item(self):
173 self.ixnet_gen._ixnet.getList.return_value = ['field1', 'field2']
174 output = self.ixnet_gen._get_field_in_stack_item(mock.ANY, 'field2')
175 self.assertEqual('field2', output)
177 def test__get_field_in_stack_item_no_field_present(self):
178 self.ixnet_gen._ixnet.getList.return_value = ['field1', 'field2']
179 with self.assertRaises(exceptions.IxNetworkFieldNotPresentInStackItem):
180 self.ixnet_gen._get_field_in_stack_item(mock.ANY, 'field3')
182 def test__parse_framesize(self):
183 framesize = {'64B': '75', '512b': '25'}
184 output = self.ixnet_gen._parse_framesize(framesize)
185 self.assertEqual(2, len(output))
186 self.assertIn([64, 64, 75], output)
187 self.assertIn([512, 512, 25], output)
189 def test_add_topology(self):
190 self.ixnet_gen.ixnet.add.return_value = 'obj'
191 self.ixnet_gen.add_topology('topology 1', 'vports')
192 self.ixnet_gen.ixnet.add.assert_called_once_with('my_root', 'topology')
193 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
194 'obj', '-name', 'topology 1', '-vports', 'vports')
195 self.ixnet_gen.ixnet.commit.assert_called_once()
197 def test_add_device_group(self):
198 self.ixnet_gen.ixnet.add.return_value = 'obj'
199 self.ixnet_gen.add_device_group('topology', 'device group 1', '1')
200 self.ixnet_gen.ixnet.add.assert_called_once_with('topology',
202 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
203 'obj', '-name', 'device group 1', '-multiplier', '1')
204 self.ixnet_gen.ixnet.commit.assert_called_once()
206 def test_add_ethernet(self):
207 self.ixnet_gen.ixnet.add.return_value = 'obj'
208 self.ixnet_gen.add_ethernet('device_group', 'ethernet 1')
209 self.ixnet_gen.ixnet.add.assert_called_once_with('device_group',
211 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
212 'obj', '-name', 'ethernet 1')
213 self.ixnet_gen.ixnet.commit.assert_called_once()
215 def test_add_vlans_single(self):
217 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
218 self.ixnet_gen.ixnet.getList.return_value = ['vlan1', 'vlan2']
219 vlan1 = ixnet_api.Vlan(vlan_id=100, tp_id='ethertype88a8', prio=2)
220 vlan2 = ixnet_api.Vlan(vlan_id=101, tp_id='ethertype88a8', prio=3)
221 self.ixnet_gen.add_vlans(obj, [vlan1, vlan2])
222 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('ethernet',
224 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
226 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
228 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
230 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
232 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
233 'attr/singleValue', '-value', 'ethertype88a8')
234 self.assertEqual(self.ixnet.commit.call_count, 2)
236 def test_add_vlans_increment(self):
238 self.ixnet_gen.ixnet.add.return_value = 'obj'
239 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
240 self.ixnet_gen.ixnet.getList.return_value = ['vlan1']
241 vlan = ixnet_api.Vlan(vlan_id=100, vlan_id_step=1, prio=3, prio_step=2)
242 self.ixnet_gen.add_vlans(obj, [vlan])
243 self.ixnet.setMultiAttribute.assert_any_call('obj', '-start', 100,
245 '-direction', 'increment')
246 self.ixnet.setMultiAttribute.assert_any_call('obj', '-start', 3,
248 '-direction', 'increment')
250 self.assertEqual(self.ixnet.commit.call_count, 2)
252 def test_add_vlans_invalid(self):
254 self.assertRaises(RuntimeError, self.ixnet_gen.add_vlans, 'obj', vlans)
256 def test_add_ipv4(self):
257 self.ixnet_gen.ixnet.add.return_value = 'obj'
258 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1')
259 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1', 'ipv4')
260 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
263 self.assertEqual(self.ixnet.commit.call_count, 2)
265 def test_add_ipv4_single(self):
266 self.ixnet_gen.ixnet.add.return_value = 'obj'
267 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
268 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1', addr='100.1.1.100',
269 prefix='24', gateway='100.1.1.200')
270 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1', 'ipv4')
271 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
274 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
275 'attr/singleValue', '-value', '100.1.1.100')
276 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
277 'attr/singleValue', '-value', '24')
278 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
279 'attr/singleValue', '-value', '100.1.1.200')
281 self.assertEqual(self.ixnet.commit.call_count, 2)
283 def test_add_ipv4_counter(self):
284 self.ixnet_gen.ixnet.add.return_value = 'obj'
285 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
286 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1',
289 addr_direction='increment',
291 gateway='100.1.1.200',
293 gw_direction='increment')
294 self.ixnet_gen.ixnet.add.assert_any_call('ethernet 1', 'ipv4')
295 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
298 self.ixnet_gen.ixnet.add.assert_any_call('attr', 'counter')
299 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('obj', '-start',
304 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
305 'attr/singleValue', '-value', '24')
306 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('obj', '-start',
311 self.assertEqual(self.ixnet.commit.call_count, 2)
313 def test_add_pppox_client(self):
314 self.ixnet_gen.ixnet.add.return_value = 'obj'
315 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
316 self.ixnet_gen.add_pppox_client('ethernet 1', 'pap', 'user', 'pwd')
317 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1',
320 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
321 'attr/singleValue', '-value', 'pap')
322 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
323 'attr/singleValue', '-value', 'user')
324 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
325 'attr/singleValue', '-value', 'pwd')
327 self.assertEqual(self.ixnet.commit.call_count, 2)
329 def test_add_pppox_client_invalid_auth(self):
330 self.ixnet_gen.ixnet.add.return_value = 'obj'
331 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
332 self.assertRaises(NotImplementedError, self.ixnet_gen.add_pppox_client,
333 'ethernet 1', 'invalid_auth', 'user', 'pwd')
335 self.ixnet_gen.ixnet.setMultiAttribute.assert_not_called()
337 def test_add_bgp(self):
338 self.ixnet_gen.ixnet.add.return_value = 'obj'
339 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
340 self.ixnet_gen.add_bgp(ipv4='ipv4 1',
344 self.ixnet_gen.ixnet.add.assert_called_once_with('ipv4 1', 'bgpIpv4Peer')
345 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
346 'attr/singleValue', '-value', '10.0.0.1')
347 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
348 'attr/singleValue', '-value', 65000)
349 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
350 'attr/singleValue', '-value', 'external')
352 @mock.patch.object(IxNetwork, 'IxNet')
353 def test_connect(self, mock_ixnet):
354 mock_ixnet.return_value = self.ixnet
355 with mock.patch.object(self.ixnet_gen, 'get_config') as mock_config:
356 mock_config.return_value = {'machine': 'machine_fake',
359 self.ixnet_gen.connect(mock.ANY)
361 self.ixnet.connect.assert_called_once_with(
362 'machine_fake', '-port', 'port_fake', '-version', '12345')
363 mock_config.assert_called_once()
365 def test_connect_invalid_config_no_machine(self):
366 self.ixnet_gen.get_config = mock.Mock(return_value={
369 self.assertRaises(KeyError, self.ixnet_gen.connect, mock.ANY)
370 self.ixnet.connect.assert_not_called()
372 def test_connect_invalid_config_no_port(self):
373 self.ixnet_gen.get_config = mock.Mock(return_value={
374 'machine': 'machine_fake',
376 self.assertRaises(KeyError, self.ixnet_gen.connect, mock.ANY)
377 self.ixnet.connect.assert_not_called()
379 def test_connect_invalid_config_no_version(self):
380 self.ixnet_gen.get_config = mock.Mock(return_value={
381 'machine': 'machine_fake',
382 'port': 'port_fake'})
383 self.assertRaises(KeyError, self.ixnet_gen.connect, mock.ANY)
384 self.ixnet.connect.assert_not_called()
386 def test_connect_no_config(self):
387 self.ixnet_gen.get_config = mock.Mock(return_value={})
388 self.assertRaises(KeyError, self.ixnet_gen.connect, mock.ANY)
389 self.ixnet.connect.assert_not_called()
391 def test_clear_config(self):
392 self.ixnet_gen.clear_config()
393 self.ixnet.execute.assert_called_once_with('newConfig')
395 @mock.patch.object(ixnet_api, 'log')
396 def test_assign_ports_2_ports(self, *args):
397 self.ixnet.getAttribute.side_effect = ['up', 'down']
399 'chassis': '1.1.1.1',
402 self.ixnet_gen._cfg = config
404 self.assertIsNone(self.ixnet_gen.assign_ports())
405 self.assertEqual(self.ixnet.execute.call_count, 1)
406 self.assertEqual(self.ixnet.commit.call_count, 3)
407 self.assertEqual(self.ixnet.getAttribute.call_count, 2)
409 @mock.patch.object(ixnet_api, 'log')
410 def test_assign_ports_port_down(self, mock_log):
411 self.ixnet.getAttribute.return_value = 'down'
413 'chassis': '1.1.1.1',
416 self.ixnet_gen._cfg = config
417 self.ixnet_gen.assign_ports()
418 mock_log.warning.assert_called()
420 def test_assign_ports_no_config(self):
421 self.ixnet_gen._cfg = {}
422 self.assertRaises(KeyError, self.ixnet_gen.assign_ports)
424 def test__create_traffic_item(self):
425 self.ixnet.add.return_value = 'my_new_traffic_item'
426 self.ixnet.remapIds.return_value = ['my_traffic_item_id']
428 self.ixnet_gen._create_traffic_item()
429 self.ixnet.add.assert_called_once_with(
430 'my_root/traffic', 'trafficItem')
431 self.ixnet.setMultiAttribute.assert_called_once_with(
432 'my_new_traffic_item', '-name', 'RFC2544', '-trafficType', 'raw')
433 self.assertEqual(2, self.ixnet.commit.call_count)
434 self.ixnet.remapIds.assert_called_once_with('my_new_traffic_item')
435 self.ixnet.setAttribute('my_traffic_item_id/tracking',
436 '-trackBy', 'trafficGroupId0')
438 def test__create_flow_groups(self):
439 uplink_endpoints = ['up_endp1', 'up_endp2']
440 downlink_endpoints = ['down_endp1', 'down_endp2']
441 self.ixnet_gen.ixnet.getList.side_effect = [['traffic_item'], ['1', '2']]
442 self.ixnet_gen.ixnet.add.side_effect = ['endp1', 'endp2', 'endp3',
444 self.ixnet_gen._create_flow_groups(uplink_endpoints, downlink_endpoints)
445 self.ixnet_gen.ixnet.add.assert_has_calls([
446 mock.call('traffic_item', 'endpointSet'),
447 mock.call('traffic_item', 'endpointSet')])
448 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
449 mock.call('endp1', '-name', '1', '-sources', ['up_endp1'],
450 '-destinations', ['down_endp1']),
451 mock.call('endp2', '-name', '2', '-sources', ['down_endp1'],
452 '-destinations', ['up_endp1']),
453 mock.call('endp3', '-name', '3', '-sources', ['up_endp2'],
454 '-destinations', ['down_endp2']),
455 mock.call('endp4', '-name', '4', '-sources', ['down_endp2'],
456 '-destinations', ['up_endp2'])])
458 def test__append_protocol_to_stack(self):
460 self.ixnet_gen._append_procotol_to_stack('my_protocol', 'prev_element')
461 self.ixnet.execute.assert_called_with(
462 'append', 'prev_element',
463 'my_root/traffic/protocolTemplate:"my_protocol"')
465 def test__setup_config_elements(self):
466 self.ixnet_gen.ixnet.getList.side_effect = [['traffic_item'],
468 with mock.patch.object(self.ixnet_gen, '_append_procotol_to_stack') as \
470 self.ixnet_gen._setup_config_elements()
471 mock_append_proto.assert_has_calls([
472 mock.call(ixnet_api.PROTO_UDP, 'cfg_element/stack:"ethernet-1"'),
473 mock.call(ixnet_api.PROTO_IPV4, 'cfg_element/stack:"ethernet-1"')])
474 self.ixnet_gen.ixnet.setAttribute.assert_has_calls([
475 mock.call('cfg_element/frameRateDistribution', '-portDistribution',
477 mock.call('cfg_element/frameRateDistribution',
478 '-streamDistribution', 'splitRateEvenly')])
480 @mock.patch.object(ixnet_api.IxNextgen, '_create_traffic_item')
481 @mock.patch.object(ixnet_api.IxNextgen, '_create_flow_groups')
482 @mock.patch.object(ixnet_api.IxNextgen, '_setup_config_elements')
483 def test_create_traffic_model(self, mock__setup_config_elements,
484 mock__create_flow_groups,
485 mock__create_traffic_item):
486 uplink_ports = ['port1', 'port3']
487 downlink_ports = ['port2', 'port4']
488 uplink_endpoints = ['port1/protocols', 'port3/protocols']
489 downlink_endpoints = ['port2/protocols', 'port4/protocols']
490 self.ixnet_gen.create_traffic_model(uplink_ports, downlink_ports)
491 mock__create_traffic_item.assert_called_once_with('raw')
492 mock__create_flow_groups.assert_called_once_with(uplink_endpoints,
494 mock__setup_config_elements.assert_called_once()
496 @mock.patch.object(ixnet_api.IxNextgen, '_create_traffic_item')
497 @mock.patch.object(ixnet_api.IxNextgen, '_create_flow_groups')
498 @mock.patch.object(ixnet_api.IxNextgen, '_setup_config_elements')
499 def test_create_ipv4_traffic_model(self, mock__setup_config_elements,
500 mock__create_flow_groups,
501 mock__create_traffic_item):
502 uplink_topologies = ['up1', 'up3']
503 downlink_topologies = ['down2', 'down4']
504 self.ixnet_gen.create_ipv4_traffic_model(uplink_topologies,
506 mock__create_traffic_item.assert_called_once_with('ipv4')
507 mock__create_flow_groups.assert_called_once_with(uplink_topologies,
509 mock__setup_config_elements.assert_called_once_with(False)
511 def test__update_frame_mac(self):
512 with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item') as \
514 mock_get_field.return_value = 'field_descriptor'
515 self.ixnet_gen._update_frame_mac('ethernet_descriptor', 'field', 'mac')
516 mock_get_field.assert_called_once_with('ethernet_descriptor', 'field')
517 self.ixnet_gen.ixnet.setMultiAttribute(
518 'field_descriptor', '-singleValue', 'mac', '-fieldValue', 'mac',
519 '-valueType', 'singleValue')
520 self.ixnet_gen.ixnet.commit.assert_called_once()
522 def test_update_frame(self):
523 with mock.patch.object(
524 self.ixnet_gen, '_get_config_element_by_flow_group_name',
525 return_value='cfg_element'), \
526 mock.patch.object(self.ixnet_gen, '_update_frame_mac') as \
528 mock.patch.object(self.ixnet_gen, '_get_stack_item') as \
530 mock_get_stack_item.side_effect = [['item1'], ['item2'],
531 ['item3'], ['item4']]
532 self.ixnet_gen.update_frame(TRAFFIC_PARAMETERS, 50)
534 self.assertEqual(6, len(self.ixnet_gen.ixnet.setMultiAttribute.mock_calls))
535 self.assertEqual(4, len(mock_update_frame.mock_calls))
537 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
538 mock.call('cfg_element/transmissionControl',
539 '-type', 'continuous', '-duration', 50)
542 def test_update_frame_qinq(self):
543 with mock.patch.object(self.ixnet_gen,
544 '_get_config_element_by_flow_group_name',
545 return_value='cfg_element'), \
546 mock.patch.object(self.ixnet_gen, '_update_frame_mac'),\
547 mock.patch.object(self.ixnet_gen, '_get_stack_item',
548 return_value='item'), \
549 mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
550 return_value='field'):
552 traffic_parameters = deepcopy(TRAFFIC_PARAMETERS)
553 traffic_parameters[UPLINK]['outer_l2']['QinQ'] = {
554 'S-VLAN': {'id': 128,
557 'C-VLAN': {'id': 512,
562 self.ixnet_gen.update_frame(traffic_parameters, 50)
564 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
565 mock.call('field', '-auto', 'false', '-singleValue', '0x88a8',
566 '-fieldValue', '0x88a8', '-valueType', 'singleValue'),
567 mock.call('field', '-auto', 'false', '-singleValue', 1,
568 '-fieldValue', 1, '-valueType', 'singleValue'),
569 mock.call('field', '-auto', 'false', '-singleValue', 128,
570 '-fieldValue', 128, '-valueType', 'singleValue'),
571 mock.call('field', '-auto', 'false', '-singleValue', 512,
572 '-fieldValue', 512, '-valueType', 'singleValue'),
573 mock.call('field', '-auto', 'false', '-singleValue', 2,
574 '-fieldValue', 2, '-valueType', 'singleValue')
577 def test_update_frame_flow_not_present(self):
578 with mock.patch.object(
579 self.ixnet_gen, '_get_config_element_by_flow_group_name',
581 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
582 self.ixnet_gen.update_frame(TRAFFIC_PARAMETERS, 40)
584 def test_get_statistics(self):
585 port_statistics = '::ixNet::OBJ-/statistics/view:"Port Statistics"'
586 flow_statistics = '::ixNet::OBJ-/statistics/view:"Flow Statistics"'
587 with mock.patch.object(self.ixnet_gen, '_build_stats_map') as \
589 self.ixnet_gen.get_statistics()
591 mock_build_stats.assert_has_calls([
592 mock.call(port_statistics, self.ixnet_gen.PORT_STATS_NAME_MAP),
593 mock.call(flow_statistics, self.ixnet_gen.LATENCY_NAME_MAP)])
595 def test__update_ipv4_address(self):
596 with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
597 return_value='field_desc'):
598 self.ixnet_gen._update_ipv4_address(mock.ANY, mock.ANY, '192.168.1.1',
600 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
601 'field_desc', '-seed', 100, '-fixedBits', '192.168.1.1',
602 '-randomMask', '0.0.0.63', '-valueType', 'random',
605 def test__update_udp_port(self):
606 with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
607 return_value='field_desc'):
608 self.ixnet_gen._update_udp_port(mock.ANY, mock.ANY, 1234,
611 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
617 '-valueType', 'random',
620 def test_update_ip_packet(self):
621 with mock.patch.object(self.ixnet_gen, '_update_ipv4_address') as \
623 mock.patch.object(self.ixnet_gen, '_get_stack_item'), \
624 mock.patch.object(self.ixnet_gen,
625 '_get_config_element_by_flow_group_name', return_value='celm'):
626 self.ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
628 self.assertEqual(4, len(mock_update_add.mock_calls))
630 def test_update_ip_packet_exception_no_config_element(self):
631 with mock.patch.object(self.ixnet_gen,
632 '_get_config_element_by_flow_group_name',
634 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
635 self.ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
637 def test_update_l4(self):
638 with mock.patch.object(self.ixnet_gen, '_update_udp_port') as \
640 mock.patch.object(self.ixnet_gen, '_get_stack_item'), \
641 mock.patch.object(self.ixnet_gen,
642 '_get_config_element_by_flow_group_name', return_value='celm'):
643 self.ixnet_gen.update_l4(TRAFFIC_PARAMETERS)
645 self.assertEqual(4, len(mock_update_udp.mock_calls))
647 def test_update_l4_exception_no_config_element(self):
648 with mock.patch.object(self.ixnet_gen,
649 '_get_config_element_by_flow_group_name',
651 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
652 self.ixnet_gen.update_l4(TRAFFIC_PARAMETERS)
654 def test_update_l4_exception_no_supported_proto(self):
655 traffic_parameters = {
659 'proto': 'unsupported',
666 with mock.patch.object(self.ixnet_gen,
667 '_get_config_element_by_flow_group_name',
668 return_value='celm'):
669 with self.assertRaises(exceptions.IXIAUnsupportedProtocol):
670 self.ixnet_gen.update_l4(traffic_parameters)
672 @mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
673 def test_start_traffic(self, mock_ixnextgen_get_traffic_state):
674 self.ixnet_gen._ixnet.getList.return_value = [0]
676 mock_ixnextgen_get_traffic_state.side_effect = [
677 'stopped', 'started', 'started', 'started']
679 result = self.ixnet_gen.start_traffic()
680 self.assertIsNone(result)
681 self.ixnet.getList.assert_called_once()
682 self.assertEqual(3, self.ixnet_gen._ixnet.execute.call_count)
684 @mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
685 def test_start_traffic_traffic_running(
686 self, mock_ixnextgen_get_traffic_state):
687 self.ixnet_gen._ixnet.getList.return_value = [0]
688 mock_ixnextgen_get_traffic_state.side_effect = [
689 'started', 'stopped', 'started']
691 result = self.ixnet_gen.start_traffic()
692 self.assertIsNone(result)
693 self.ixnet.getList.assert_called_once()
694 self.assertEqual(4, self.ixnet_gen._ixnet.execute.call_count)
696 @mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
697 def test_start_traffic_wait_for_traffic_to_stop(
698 self, mock_ixnextgen_get_traffic_state):
699 self.ixnet_gen._ixnet.getList.return_value = [0]
700 mock_ixnextgen_get_traffic_state.side_effect = [
701 'started', 'started', 'started', 'stopped', 'started']
703 result = self.ixnet_gen.start_traffic()
704 self.assertIsNone(result)
705 self.ixnet.getList.assert_called_once()
706 self.assertEqual(4, self.ixnet_gen._ixnet.execute.call_count)
708 @mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
709 def test_start_traffic_wait_for_traffic_start(
710 self, mock_ixnextgen_get_traffic_state):
711 self.ixnet_gen._ixnet.getList.return_value = [0]
712 mock_ixnextgen_get_traffic_state.side_effect = [
713 'stopped', 'stopped', 'stopped', 'started']
715 result = self.ixnet_gen.start_traffic()
716 self.assertIsNone(result)
717 self.ixnet.getList.assert_called_once()
718 self.assertEqual(3, self.ixnet_gen._ixnet.execute.call_count)
720 def test__get_protocol_status(self):
721 self.ixnet.getAttribute.return_value = ['up']
722 self.ixnet_gen._get_protocol_status('ipv4')
723 self.ixnet.getAttribute.assert_called_once_with('ipv4',
726 @mock.patch.object(ixnet_api.IxNextgen, '_get_protocol_status')
727 def test_is_protocols_running(self, mock_ixnextgen_get_protocol_status):
728 mock_ixnextgen_get_protocol_status.return_value = 'up'
729 result = self.ixnet_gen.is_protocols_running(['ethernet', 'ipv4'])
730 self.assertTrue(result)
732 @mock.patch.object(ixnet_api.IxNextgen, '_get_protocol_status')
733 def test_is_protocols_stopped(self, mock_ixnextgen_get_protocol_status):
734 mock_ixnextgen_get_protocol_status.return_value = 'down'
735 result = self.ixnet_gen.is_protocols_running(['ethernet', 'ipv4'])
736 self.assertFalse(result)
738 def test_start_protocols(self):
739 self.ixnet_gen.start_protocols()
740 self.ixnet.execute.assert_called_once_with('startAllProtocols')
742 def test_stop_protocols(self):
743 self.ixnet_gen.stop_protocols()
744 self.ixnet.execute.assert_called_once_with('stopAllProtocols')
746 def test_get_vports(self):
747 self.ixnet_gen._ixnet.getRoot.return_value = 'root'
748 self.ixnet_gen.get_vports()
749 self.ixnet.getList.assert_called_once_with('root', 'vport')