1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from copy import deepcopy
21 from yardstick.common import exceptions
22 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
28 TRAFFIC_PARAMETERS = {
36 'framesize': {'64B': '25', '256B': '75'},
46 'dstip': '152.16.40.20',
47 'srcip': '152.16.100.20',
50 'priority': {'raw': '0x01'}
60 'traffic_type': 'continuous'
69 'framesize': {'128B': '35', '1024B': '65'},
83 'priority': {'raw': '0x01'}
93 'traffic_type': 'continuous'
98 class TestIxNextgen(unittest.TestCase):
101 self.ixnet = mock.Mock()
102 self.ixnet.execute = mock.Mock()
103 self.ixnet.getRoot.return_value = 'my_root'
104 self.ixnet_gen = ixnet_api.IxNextgen()
105 self.ixnet_gen._ixnet = self.ixnet
107 def test_get_config(self):
111 'external-interface': [
112 {'virtual-interface': {'vpci': '0000:07:00.1'}},
113 {'virtual-interface': {'vpci': '0001:08:01.2'}}
120 'dut_result_dir': 'test2',
122 'ixchassis': 'test4',
132 'cards': ['0000', '0001'],
133 'ports': ['07', '08'],
134 'output_dir': 'test2',
139 result = ixnet_api.IxNextgen.get_config(tg_cfg)
140 self.assertEqual(result, expected)
142 def test__get_config_element_by_flow_group_name(self):
143 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
145 self.ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_01'
146 output = self.ixnet_gen._get_config_element_by_flow_group_name(
148 self.assertEqual('traffic_item/configElement:flow_group_01', output)
150 def test__get_config_element_by_flow_group_name_no_match(self):
151 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
153 self.ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_02'
154 output = self.ixnet_gen._get_config_element_by_flow_group_name(
156 self.assertIsNone(output)
def test__get_stack_item(self):
    # With three stack entries reported by getList(), asking for PROTO_TCP
    # is expected to yield just the two 'tcp*' entries.
    self.ixnet_gen._ixnet.getList.return_value = ['tcp1', 'tcp2', 'udp']
    cfg_element_patch = mock.patch.object(
        self.ixnet_gen, '_get_config_element_by_flow_group_name',
        return_value='cfg_element')
    with cfg_element_patch:
        stack_items = self.ixnet_gen._get_stack_item(
            mock.ANY, ixnet_api.PROTO_TCP)
    self.assertEqual(['tcp1', 'tcp2'], stack_items)
167 def test__get_stack_item_no_config_element(self):
168 with mock.patch.object(
169 self.ixnet_gen, '_get_config_element_by_flow_group_name',
171 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
172 self.ixnet_gen._get_stack_item(mock.ANY, mock.ANY)
def test__get_field_in_stack_item(self):
    # A field name present in the list reported by getList() is returned.
    available_fields = ['field1', 'field2']
    self.ixnet_gen._ixnet.getList.return_value = available_fields
    found = self.ixnet_gen._get_field_in_stack_item(mock.ANY, 'field2')
    self.assertEqual('field2', found)
def test__get_field_in_stack_item_no_field_present(self):
    # Requesting a field that getList() does not report must raise
    # IxNetworkFieldNotPresentInStackItem.
    self.ixnet_gen._ixnet.getList.return_value = ['field1', 'field2']
    self.assertRaises(
        exceptions.IxNetworkFieldNotPresentInStackItem,
        self.ixnet_gen._get_field_in_stack_item, mock.ANY, 'field3')
def test__parse_framesize(self):
    # Both '64B' and '512b' (lower-case suffix) must be parsed into
    # [size, size, weight] triples with integer values.
    parsed = self.ixnet_gen._parse_framesize({'64B': '75', '512b': '25'})
    self.assertEqual(2, len(parsed))
    for expected_entry in ([64, 64, 75], [512, 512, 25]):
        self.assertIn(expected_entry, parsed)
def test_add_topology(self):
    # add_topology() should add a 'topology' node under the mocked root
    # ('my_root'), set its name and vports, and commit exactly once.
    ixnet = self.ixnet_gen.ixnet
    ixnet.add.return_value = 'obj'

    self.ixnet_gen.add_topology('topology 1', 'vports')

    ixnet.add.assert_called_once_with('my_root', 'topology')
    ixnet.setMultiAttribute.assert_called_once_with(
        'obj', '-name', 'topology 1', '-vports', 'vports')
    ixnet.commit.assert_called_once()
199 def test_add_device_group(self):
200 self.ixnet_gen.ixnet.add.return_value = 'obj'
201 self.ixnet_gen.add_device_group('topology', 'device group 1', '1')
202 self.ixnet_gen.ixnet.add.assert_called_once_with('topology',
204 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
205 'obj', '-name', 'device group 1', '-multiplier', '1')
206 self.ixnet_gen.ixnet.commit.assert_called_once()
208 def test_add_ethernet(self):
209 self.ixnet_gen.ixnet.add.return_value = 'obj'
210 self.ixnet_gen.add_ethernet('device_group', 'ethernet 1')
211 self.ixnet_gen.ixnet.add.assert_called_once_with('device_group',
213 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
214 'obj', '-name', 'ethernet 1')
215 self.ixnet_gen.ixnet.commit.assert_called_once()
217 def test_add_vlans_single(self):
219 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
220 self.ixnet_gen.ixnet.getList.return_value = ['vlan1', 'vlan2']
221 vlan1 = ixnet_api.Vlan(vlan_id=100, tp_id='ethertype88a8', prio=2)
222 vlan2 = ixnet_api.Vlan(vlan_id=101, tp_id='ethertype88a8', prio=3)
223 self.ixnet_gen.add_vlans(obj, [vlan1, vlan2])
224 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('ethernet',
226 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
228 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
230 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
232 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
234 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
235 'attr/singleValue', '-value', 'ethertype88a8')
236 self.assertEqual(self.ixnet.commit.call_count, 2)
238 def test_add_vlans_increment(self):
240 self.ixnet_gen.ixnet.add.return_value = 'obj'
241 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
242 self.ixnet_gen.ixnet.getList.return_value = ['vlan1']
243 vlan = ixnet_api.Vlan(vlan_id=100, vlan_id_step=1, prio=3, prio_step=2)
244 self.ixnet_gen.add_vlans(obj, [vlan])
245 self.ixnet.setMultiAttribute.assert_any_call('obj', '-start', 100,
247 '-direction', 'increment')
248 self.ixnet.setMultiAttribute.assert_any_call('obj', '-start', 3,
250 '-direction', 'increment')
252 self.assertEqual(self.ixnet.commit.call_count, 2)
254 def test_add_vlans_invalid(self):
256 self.assertRaises(RuntimeError, self.ixnet_gen.add_vlans, 'obj', vlans)
258 def test_add_ipv4(self):
259 self.ixnet_gen.ixnet.add.return_value = 'obj'
260 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1')
261 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1', 'ipv4')
262 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
265 self.assertEqual(self.ixnet.commit.call_count, 2)
267 def test_add_ipv4_single(self):
268 self.ixnet_gen.ixnet.add.return_value = 'obj'
269 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
270 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1', addr='100.1.1.100',
271 prefix='24', gateway='100.1.1.200')
272 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1', 'ipv4')
273 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
276 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
277 'attr/singleValue', '-value', '100.1.1.100')
278 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
279 'attr/singleValue', '-value', '24')
280 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
281 'attr/singleValue', '-value', '100.1.1.200')
283 self.assertEqual(self.ixnet.commit.call_count, 2)
285 def test_add_ipv4_counter(self):
286 self.ixnet_gen.ixnet.add.return_value = 'obj'
287 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
288 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1',
291 addr_direction='increment',
293 gateway='100.1.1.200',
295 gw_direction='increment')
296 self.ixnet_gen.ixnet.add.assert_any_call('ethernet 1', 'ipv4')
297 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
300 self.ixnet_gen.ixnet.add.assert_any_call('attr', 'counter')
301 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('obj', '-start',
306 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
307 'attr/singleValue', '-value', '24')
308 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('obj', '-start',
313 self.assertEqual(self.ixnet.commit.call_count, 2)
315 def test_add_pppox_client(self):
316 self.ixnet_gen.ixnet.add.return_value = 'obj'
317 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
318 self.ixnet_gen.add_pppox_client('ethernet 1', 'pap', 'user', 'pwd')
319 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1',
322 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
323 'attr/singleValue', '-value', 'pap')
324 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
325 'attr/singleValue', '-value', 'user')
326 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
327 'attr/singleValue', '-value', 'pwd')
329 self.assertEqual(self.ixnet.commit.call_count, 2)
def test_add_pppox_client_invalid_auth(self):
    # An unknown authentication type must raise NotImplementedError and
    # leave setMultiAttribute untouched.
    ixnet = self.ixnet_gen.ixnet
    ixnet.add.return_value = 'obj'
    ixnet.getAttribute.return_value = 'attr'
    with self.assertRaises(NotImplementedError):
        self.ixnet_gen.add_pppox_client(
            'ethernet 1', 'invalid_auth', 'user', 'pwd')

    ixnet.setMultiAttribute.assert_not_called()
339 def test_add_bgp(self):
340 self.ixnet_gen.ixnet.add.return_value = 'obj'
341 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
342 self.ixnet_gen.add_bgp(ipv4='ipv4 1',
346 self.ixnet_gen.ixnet.add.assert_called_once_with('ipv4 1', 'bgpIpv4Peer')
347 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
348 'attr/singleValue', '-value', '10.0.0.1')
349 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
350 'attr/singleValue', '-value', 65000)
351 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
352 'attr/singleValue', '-value', 'external')
354 @mock.patch.object(IxNetwork, 'IxNet')
355 def test_connect(self, mock_ixnet):
356 mock_ixnet.return_value = self.ixnet
357 with mock.patch.object(self.ixnet_gen, 'get_config') as mock_config:
358 mock_config.return_value = {'machine': 'machine_fake',
361 self.ixnet_gen.connect(mock.ANY)
363 self.ixnet.connect.assert_called_once_with(
364 'machine_fake', '-port', 'port_fake', '-version', '12345')
365 mock_config.assert_called_once()
367 def test_connect_invalid_config_no_machine(self):
368 self.ixnet_gen.get_config = mock.Mock(return_value={
371 self.assertRaises(KeyError, self.ixnet_gen.connect, mock.ANY)
372 self.ixnet.connect.assert_not_called()
374 def test_connect_invalid_config_no_port(self):
375 self.ixnet_gen.get_config = mock.Mock(return_value={
376 'machine': 'machine_fake',
378 self.assertRaises(KeyError, self.ixnet_gen.connect, mock.ANY)
379 self.ixnet.connect.assert_not_called()
def test_connect_invalid_config_no_version(self):
    # The stubbed configuration has 'machine' and 'port' but no 'version':
    # connect() must raise KeyError without contacting IxNetwork.
    incomplete_cfg = {'machine': 'machine_fake', 'port': 'port_fake'}
    self.ixnet_gen.get_config = mock.Mock(return_value=incomplete_cfg)
    with self.assertRaises(KeyError):
        self.ixnet_gen.connect(mock.ANY)
    self.ixnet.connect.assert_not_called()
def test_connect_no_config(self):
    # An empty configuration dict must make connect() raise KeyError
    # before any connection attempt.
    empty_cfg = {}
    self.ixnet_gen.get_config = mock.Mock(return_value=empty_cfg)
    with self.assertRaises(KeyError):
        self.ixnet_gen.connect(mock.ANY)
    self.ixnet.connect.assert_not_called()
def test_clear_config(self):
    # clear_config() must issue exactly one execute('newConfig') call.
    self.ixnet_gen.clear_config()
    execute = self.ixnet.execute
    execute.assert_called_once_with('newConfig')
397 @mock.patch.object(ixnet_api, 'log')
398 def test_assign_ports_2_ports(self, *args):
399 self.ixnet.getAttribute.side_effect = ['up', 'down']
401 'chassis': '1.1.1.1',
404 self.ixnet_gen._cfg = config
406 self.assertIsNone(self.ixnet_gen.assign_ports())
407 self.assertEqual(self.ixnet.execute.call_count, 1)
408 self.assertEqual(self.ixnet.commit.call_count, 3)
409 self.assertEqual(self.ixnet.getAttribute.call_count, 2)
411 @mock.patch.object(ixnet_api, 'log')
412 def test_assign_ports_port_down(self, mock_log):
413 self.ixnet.getAttribute.return_value = 'down'
415 'chassis': '1.1.1.1',
418 self.ixnet_gen._cfg = config
419 self.ixnet_gen.assign_ports()
420 mock_log.warning.assert_called()
def test_assign_ports_no_config(self):
    # With an empty stored configuration, assign_ports() must raise KeyError.
    self.ixnet_gen._cfg = {}
    with self.assertRaises(KeyError):
        self.ixnet_gen.assign_ports()
def test__create_traffic_item(self):
    # _create_traffic_item() is expected to add a 'trafficItem' under
    # '<root>/traffic' (root is mocked as 'my_root'), name it 'RFC2544' with
    # traffic type 'raw', remap its id, and enable tracking on the remapped
    # item, committing twice overall.
    self.ixnet.add.return_value = 'my_new_traffic_item'
    self.ixnet.remapIds.return_value = ['my_traffic_item_id']

    self.ixnet_gen._create_traffic_item()

    self.ixnet.add.assert_called_once_with(
        'my_root/traffic', 'trafficItem')
    self.ixnet.setMultiAttribute.assert_called_once_with(
        'my_new_traffic_item', '-name', 'RFC2544', '-trafficType', 'raw')
    self.assertEqual(2, self.ixnet.commit.call_count)
    self.ixnet.remapIds.assert_called_once_with('my_new_traffic_item')
    # This has to be an assert_* call: invoking the mock directly would
    # always succeed and verify nothing about the tracking setup.
    self.ixnet.setAttribute.assert_called_once_with(
        'my_traffic_item_id/tracking', '-trackBy', 'trafficGroupId0')
440 def test__create_flow_groups(self):
441 uplink_endpoints = ['up_endp1', 'up_endp2']
442 downlink_endpoints = ['down_endp1', 'down_endp2']
443 self.ixnet_gen.ixnet.getList.side_effect = [['traffic_item'], ['1', '2']]
444 self.ixnet_gen.ixnet.add.side_effect = ['endp1', 'endp2', 'endp3',
446 self.ixnet_gen._create_flow_groups(uplink_endpoints, downlink_endpoints)
447 self.ixnet_gen.ixnet.add.assert_has_calls([
448 mock.call('traffic_item', 'endpointSet'),
449 mock.call('traffic_item', 'endpointSet')])
450 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
451 mock.call('endp1', '-name', '1', '-sources', ['up_endp1'],
452 '-destinations', ['down_endp1']),
453 mock.call('endp2', '-name', '2', '-sources', ['down_endp1'],
454 '-destinations', ['up_endp1']),
455 mock.call('endp3', '-name', '3', '-sources', ['up_endp2'],
456 '-destinations', ['down_endp2']),
457 mock.call('endp4', '-name', '4', '-sources', ['down_endp2'],
458 '-destinations', ['up_endp2'])])
def test__append_protocol_to_stack(self):
    # The protocol template path is built from the mocked root ('my_root').
    # NOTE: '_append_procotol_to_stack' (sic) is the name used by the API
    # under test, so the spelling is kept as is.
    expected_template = 'my_root/traffic/protocolTemplate:"my_protocol"'

    self.ixnet_gen._append_procotol_to_stack('my_protocol', 'prev_element')

    self.ixnet.execute.assert_called_with(
        'append', 'prev_element', expected_template)
467 def test__setup_config_elements(self):
468 self.ixnet_gen.ixnet.getList.side_effect = [['traffic_item'],
470 with mock.patch.object(self.ixnet_gen, '_append_procotol_to_stack') as \
472 self.ixnet_gen._setup_config_elements()
473 mock_append_proto.assert_has_calls([
474 mock.call(ixnet_api.PROTO_UDP, 'cfg_element/stack:"ethernet-1"'),
475 mock.call(ixnet_api.PROTO_IPV4, 'cfg_element/stack:"ethernet-1"')])
476 self.ixnet_gen.ixnet.setAttribute.assert_has_calls([
477 mock.call('cfg_element/frameRateDistribution', '-portDistribution',
479 mock.call('cfg_element/frameRateDistribution',
480 '-streamDistribution', 'splitRateEvenly')])
482 @mock.patch.object(ixnet_api.IxNextgen, '_create_traffic_item')
483 @mock.patch.object(ixnet_api.IxNextgen, '_create_flow_groups')
484 @mock.patch.object(ixnet_api.IxNextgen, '_setup_config_elements')
485 def test_create_traffic_model(self, mock__setup_config_elements,
486 mock__create_flow_groups,
487 mock__create_traffic_item):
488 uplink_ports = ['port1', 'port3']
489 downlink_ports = ['port2', 'port4']
490 uplink_endpoints = ['port1/protocols', 'port3/protocols']
491 downlink_endpoints = ['port2/protocols', 'port4/protocols']
492 self.ixnet_gen.create_traffic_model(uplink_ports, downlink_ports)
493 mock__create_traffic_item.assert_called_once_with('raw')
494 mock__create_flow_groups.assert_called_once_with(uplink_endpoints,
496 mock__setup_config_elements.assert_called_once()
498 @mock.patch.object(ixnet_api.IxNextgen, '_create_traffic_item')
499 @mock.patch.object(ixnet_api.IxNextgen, '_create_flow_groups')
500 @mock.patch.object(ixnet_api.IxNextgen, '_setup_config_elements')
501 def test_create_ipv4_traffic_model(self, mock__setup_config_elements,
502 mock__create_flow_groups,
503 mock__create_traffic_item):
504 uplink_topologies = ['up1', 'up3']
505 downlink_topologies = ['down2', 'down4']
506 self.ixnet_gen.create_ipv4_traffic_model(uplink_topologies,
508 mock__create_traffic_item.assert_called_once_with('ipv4')
509 mock__create_flow_groups.assert_called_once_with(uplink_topologies,
511 mock__setup_config_elements.assert_called_once_with(False)
def test__update_frame_mac(self):
    # _update_frame_mac() must look up the field descriptor inside the
    # Ethernet stack item, write the MAC as a single value and commit once.
    with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item') as \
            mock_get_field:
        mock_get_field.return_value = 'field_descriptor'
        self.ixnet_gen._update_frame_mac('ethernet_descriptor', 'field', 'mac')

    mock_get_field.assert_called_once_with('ethernet_descriptor', 'field')
    # Assert on the mock instead of calling it: a direct call to
    # setMultiAttribute() would pass regardless of what the code under
    # test did.
    self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
        'field_descriptor', '-singleValue', 'mac', '-fieldValue', 'mac',
        '-valueType', 'singleValue')
    self.ixnet_gen.ixnet.commit.assert_called_once()
524 def test_update_frame(self):
525 with mock.patch.object(
526 self.ixnet_gen, '_get_config_element_by_flow_group_name',
527 return_value='cfg_element'), \
528 mock.patch.object(self.ixnet_gen, '_update_frame_mac') as \
530 mock.patch.object(self.ixnet_gen, '_get_stack_item') as \
532 mock_get_stack_item.side_effect = [['item1'], ['item2'],
533 ['item3'], ['item4']]
534 self.ixnet_gen.update_frame(TRAFFIC_PARAMETERS, 50)
536 self.assertEqual(6, len(self.ixnet_gen.ixnet.setMultiAttribute.mock_calls))
537 self.assertEqual(4, len(mock_update_frame.mock_calls))
539 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
540 mock.call('cfg_element/transmissionControl',
541 '-type', 'continuous', '-duration', 50)
544 def test_update_frame_qinq(self):
545 with mock.patch.object(self.ixnet_gen,
546 '_get_config_element_by_flow_group_name',
547 return_value='cfg_element'), \
548 mock.patch.object(self.ixnet_gen, '_update_frame_mac'),\
549 mock.patch.object(self.ixnet_gen, '_get_stack_item',
550 return_value='item'), \
551 mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
552 return_value='field'):
554 traffic_parameters = deepcopy(TRAFFIC_PARAMETERS)
555 traffic_parameters[UPLINK]['outer_l2']['QinQ'] = {
556 'S-VLAN': {'id': 128,
559 'C-VLAN': {'id': 512,
564 self.ixnet_gen.update_frame(traffic_parameters, 50)
566 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
567 mock.call('field', '-auto', 'false', '-singleValue', '0x88a8',
568 '-fieldValue', '0x88a8', '-valueType', 'singleValue'),
569 mock.call('field', '-auto', 'false', '-singleValue', 1,
570 '-fieldValue', 1, '-valueType', 'singleValue'),
571 mock.call('field', '-auto', 'false', '-singleValue', 128,
572 '-fieldValue', 128, '-valueType', 'singleValue'),
573 mock.call('field', '-auto', 'false', '-singleValue', 512,
574 '-fieldValue', 512, '-valueType', 'singleValue'),
575 mock.call('field', '-auto', 'false', '-singleValue', 2,
576 '-fieldValue', 2, '-valueType', 'singleValue')
579 def test_update_frame_flow_not_present(self):
580 with mock.patch.object(
581 self.ixnet_gen, '_get_config_element_by_flow_group_name',
583 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
584 self.ixnet_gen.update_frame(TRAFFIC_PARAMETERS, 40)
586 def test_get_statistics(self):
587 with mock.patch.object(self.ixnet_gen, '_build_stats_map') as \
589 self.ixnet_gen.get_statistics()
591 mock_build_stats.assert_has_calls([
592 mock.call(self.ixnet_gen.PORT_STATISTICS,
593 self.ixnet_gen.PORT_STATS_NAME_MAP),
594 mock.call(self.ixnet_gen.FLOW_STATISTICS,
595 self.ixnet_gen.LATENCY_NAME_MAP)])
def test__set_flow_tracking(self):
    # The '-trackBy' list must be applied to the tracking node of the
    # traffic item reported by getList(), followed by a single commit.
    self.ixnet_gen._ixnet.getList.return_value = ['traffic_item']

    self.ixnet_gen._set_flow_tracking(track_by=['vlanVlanId0'])

    set_attribute = self.ixnet_gen.ixnet.setAttribute
    set_attribute.assert_called_once_with(
        'traffic_item/tracking', '-trackBy', ['vlanVlanId0'])
    self.assertEqual(self.ixnet.commit.call_count, 1)
604 def test__set_egress_flow_tracking(self):
605 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
607 self.ixnet_gen._set_egress_flow_tracking(encapsulation='Ethernet',
608 offset='IPv4 TOS Precedence')
609 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
610 'traffic_item', '-egressEnabled', True)
611 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
612 'encapsulation', '-encapsulation', 'Ethernet')
613 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
614 'encapsulation', '-offset', 'IPv4 TOS Precedence')
615 self.assertEqual(self.ixnet.commit.call_count, 2)
617 def test_get_pppoe_scenario_statistics(self):
618 with mock.patch.object(self.ixnet_gen, '_build_stats_map') as \
620 self.ixnet_gen.get_pppoe_scenario_statistics()
622 mock_build_stats.assert_any_call(self.ixnet_gen.PORT_STATISTICS,
623 self.ixnet_gen.PORT_STATS_NAME_MAP)
624 mock_build_stats.assert_any_call(self.ixnet_gen.FLOW_STATISTICS,
625 self.ixnet_gen.LATENCY_NAME_MAP)
626 mock_build_stats.assert_any_call(self.ixnet_gen.PPPOX_CLIENT_PER_PORT,
627 self.ixnet_gen.PPPOX_CLIENT_PER_PORT_NAME_MAP)
629 def test__update_ipv4_address(self):
630 with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
631 return_value='field_desc'):
632 self.ixnet_gen._update_ipv4_address(mock.ANY, mock.ANY, '192.168.1.1',
634 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
635 'field_desc', '-seed', 100, '-fixedBits', '192.168.1.1',
636 '-randomMask', '0.0.0.63', '-valueType', 'random',
def test__update_ipv4_priority_raw(self):
    # A 'raw' priority value must be handed to _set_priority_field together
    # with the field descriptor.
    raw_priority = {'raw': '0x01'}
    set_field = mock.Mock()
    self.ixnet_gen._set_priority_field = set_field
    field_patch = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_patch:
        self.ixnet_gen._update_ipv4_priority('field_desc', raw_priority)

    set_field.assert_called_once_with('field_desc', raw_priority['raw'])
def test__update_ipv4_priority_dscp(self):
    # For a 'dscp' priority, the list stored under the per-hop-behaviour
    # key ('defaultPHB') is what reaches _set_priority_field.
    dscp_priority = {'dscp': {'defaultPHB': [0, 1, 2, 3]}}
    set_field = mock.Mock()
    self.ixnet_gen._set_priority_field = set_field
    field_patch = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_patch:
        self.ixnet_gen._update_ipv4_priority('field_desc', dscp_priority)

    set_field.assert_called_once_with(
        'field_desc', dscp_priority['dscp']['defaultPHB'])
def test__update_ipv4_priority_tos(self):
    # For a 'tos' priority, the 'precedence' list is what reaches
    # _set_priority_field.
    tos_priority = {'tos': {'precedence': [0, 4, 7]}}
    set_field = mock.Mock()
    self.ixnet_gen._set_priority_field = set_field
    field_patch = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_patch:
        self.ixnet_gen._update_ipv4_priority('field_desc', tos_priority)

    set_field.assert_called_once_with(
        'field_desc', tos_priority['tos']['precedence'])
def test__update_ipv4_priority_wrong_priority_type(self):
    # A priority dict keyed by an unknown type ('test') must not trigger
    # any call to _set_priority_field.
    unknown_priority = {'test': [0, 4, 7]}
    set_field = mock.Mock()
    self.ixnet_gen._set_priority_field = set_field
    field_patch = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_patch:
        self.ixnet_gen._update_ipv4_priority('field_desc', unknown_priority)

    self.assertEqual(set_field.call_count, 0)
678 def test__set_priority_field_list_value(self):
680 self.ixnet_gen._set_priority_field('field_desc', value)
681 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
683 '-valueList', [1, 4, 7],
684 '-activeFieldChoice', 'true',
685 '-valueType', 'valueList')
687 def test__set_priority_field_single_value(self):
689 self.ixnet_gen._set_priority_field('field_desc', value)
690 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
692 '-activeFieldChoice', 'true',
695 def test__update_udp_port(self):
696 with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
697 return_value='field_desc'):
698 self.ixnet_gen._update_udp_port(mock.ANY, mock.ANY, 1234,
701 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
707 '-valueType', 'random',
710 def test_update_ip_packet(self):
711 with mock.patch.object(self.ixnet_gen, '_update_ipv4_address') as \
713 mock.patch.object(self.ixnet_gen, '_get_stack_item'), \
714 mock.patch.object(self.ixnet_gen,
715 '_get_config_element_by_flow_group_name', return_value='celm'), \
716 mock.patch.object(self.ixnet_gen, '_update_ipv4_priority') as \
717 mock_update_priority:
718 self.ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
720 self.assertEqual(4, len(mock_update_add.mock_calls))
721 self.assertEqual(2, len(mock_update_priority.mock_calls))
723 def test_update_ip_packet_exception_no_config_element(self):
724 with mock.patch.object(self.ixnet_gen,
725 '_get_config_element_by_flow_group_name',
727 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
728 self.ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
730 def test_update_l4(self):
731 with mock.patch.object(self.ixnet_gen, '_update_udp_port') as \
733 mock.patch.object(self.ixnet_gen, '_get_stack_item'), \
734 mock.patch.object(self.ixnet_gen,
735 '_get_config_element_by_flow_group_name', return_value='celm'):
736 self.ixnet_gen.update_l4(TRAFFIC_PARAMETERS)
738 self.assertEqual(4, len(mock_update_udp.mock_calls))
740 def test_update_l4_exception_no_config_element(self):
741 with mock.patch.object(self.ixnet_gen,
742 '_get_config_element_by_flow_group_name',
744 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
745 self.ixnet_gen.update_l4(TRAFFIC_PARAMETERS)
747 def test_update_l4_exception_no_supported_proto(self):
748 traffic_parameters = {
752 'proto': 'unsupported',
759 with mock.patch.object(self.ixnet_gen,
760 '_get_config_element_by_flow_group_name',
761 return_value='celm'):
762 with self.assertRaises(exceptions.IXIAUnsupportedProtocol):
763 self.ixnet_gen.update_l4(traffic_parameters)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic(self, mock_ixnextgen_get_traffic_state):
    # Traffic state sequence: 'stopped' on the first poll, then 'started'.
    # The test expects one getList() call and three execute() calls.
    self.ixnet_gen._ixnet.getList.return_value = [0]
    polled_states = ['stopped', 'started', 'started', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = polled_states

    self.assertIsNone(self.ixnet_gen.start_traffic())
    self.ixnet.getList.assert_called_once()
    self.assertEqual(3, self.ixnet_gen._ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_traffic_running(
        self, mock_ixnextgen_get_traffic_state):
    # Traffic is already 'started' on the first poll; the test expects one
    # more execute() call (four in total) than in the idle case.
    self.ixnet_gen._ixnet.getList.return_value = [0]
    polled_states = ['started', 'stopped', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = polled_states

    self.assertIsNone(self.ixnet_gen.start_traffic())
    self.ixnet.getList.assert_called_once()
    self.assertEqual(4, self.ixnet_gen._ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_wait_for_traffic_to_stop(
        self, mock_ixnextgen_get_traffic_state):
    # Traffic stays 'started' for three polls before reporting 'stopped'
    # and then 'started' again; four execute() calls are expected.
    self.ixnet_gen._ixnet.getList.return_value = [0]
    polled_states = ['started', 'started', 'started', 'stopped', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = polled_states

    self.assertIsNone(self.ixnet_gen.start_traffic())
    self.ixnet.getList.assert_called_once()
    self.assertEqual(4, self.ixnet_gen._ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_wait_for_traffic_start(
        self, mock_ixnextgen_get_traffic_state):
    # Traffic reports 'stopped' three times before finally 'started';
    # three execute() calls are expected.
    self.ixnet_gen._ixnet.getList.return_value = [0]
    polled_states = ['stopped', 'stopped', 'stopped', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = polled_states

    self.assertIsNone(self.ixnet_gen.start_traffic())
    self.ixnet.getList.assert_called_once()
    self.assertEqual(3, self.ixnet_gen._ixnet.execute.call_count)
813 def test__get_protocol_status(self):
814 self.ixnet.getAttribute.return_value = ['up']
815 self.ixnet_gen._get_protocol_status('ipv4')
816 self.ixnet.getAttribute.assert_called_once_with('ipv4',
@mock.patch.object(ixnet_api.IxNextgen, '_get_protocol_status')
def test_is_protocols_running(self, mock_ixnextgen_get_protocol_status):
    # Every protocol reports 'up', so is_protocols_running() must be truthy.
    mock_ixnextgen_get_protocol_status.return_value = 'up'
    protocols = ['ethernet', 'ipv4']
    self.assertTrue(self.ixnet_gen.is_protocols_running(protocols))
@mock.patch.object(ixnet_api.IxNextgen, '_get_protocol_status')
def test_is_protocols_stopped(self, mock_ixnextgen_get_protocol_status):
    # Every protocol reports 'down', so is_protocols_running() must be falsy.
    mock_ixnextgen_get_protocol_status.return_value = 'down'
    protocols = ['ethernet', 'ipv4']
    self.assertFalse(self.ixnet_gen.is_protocols_running(protocols))
def test_start_protocols(self):
    # start_protocols() must issue exactly one 'startAllProtocols' command.
    self.ixnet_gen.start_protocols()
    execute = self.ixnet.execute
    execute.assert_called_once_with('startAllProtocols')
def test_stop_protocols(self):
    # stop_protocols() must issue exactly one 'stopAllProtocols' command.
    self.ixnet_gen.stop_protocols()
    execute = self.ixnet.execute
    execute.assert_called_once_with('stopAllProtocols')
def test_get_vports(self):
    # get_vports() must query the 'vport' children of the root object once.
    root = 'root'
    self.ixnet_gen._ixnet.getRoot.return_value = root

    self.ixnet_gen.get_vports()

    self.ixnet.getList.assert_called_once_with('root', 'vport')