1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from copy import deepcopy
21 from yardstick.common import exceptions
22 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
28 TRAFFIC_PARAMETERS = {
36 'framesize': {'64B': '25', '256B': '75'},
46 'dstip': '152.16.40.20',
47 'srcip': '152.16.100.20',
50 'priority': {'raw': '0x01'}
60 'traffic_type': 'continuous'
69 'framesize': {'128B': '35', '1024B': '65'},
83 'priority': {'raw': '0x01'}
93 'traffic_type': 'continuous'
98 class TestIxNextgen(unittest.TestCase):
101 self.ixnet = mock.Mock()
102 self.ixnet.execute = mock.Mock()
103 self.ixnet.getRoot.return_value = 'my_root'
104 self.ixnet_gen = ixnet_api.IxNextgen()
105 self.ixnet_gen._ixnet = self.ixnet
106 self._mock_log = mock.patch.object(ixnet_api.log, 'info')
107 self.mock_log = self._mock_log.start()
108 self.addCleanup(self._stop_mocks)
110 def _stop_mocks(self):
113 def test_get_config(self):
117 'external-interface': [
118 {'virtual-interface': {'vpci': '0000:07:00.1'}},
119 {'virtual-interface': {'vpci': '0001:08:01.2'}}
126 'dut_result_dir': 'test2',
128 'ixchassis': 'test4',
138 'cards': ['0000', '0001'],
139 'ports': ['07', '08'],
140 'output_dir': 'test2',
145 result = ixnet_api.IxNextgen.get_config(tg_cfg)
146 self.assertEqual(result, expected)
148 def test__get_config_element_by_flow_group_name(self):
149 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
151 self.ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_01'
152 output = self.ixnet_gen._get_config_element_by_flow_group_name(
154 self.assertEqual('traffic_item/configElement:flow_group_01', output)
156 def test__get_config_element_by_flow_group_name_no_match(self):
157 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
159 self.ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_02'
160 output = self.ixnet_gen._get_config_element_by_flow_group_name(
162 self.assertIsNone(output)
def test__get_stack_item(self):
    # With PROTO_TCP requested, only 'tcp1' and 'tcp2' are expected back
    # out of the three stack entries listed by the mocked client.
    stack_entries = ['tcp1', 'tcp2', 'udp']
    self.ixnet_gen._ixnet.getList.return_value = stack_entries
    cfg_element_patch = mock.patch.object(
        self.ixnet_gen, '_get_config_element_by_flow_group_name',
        return_value='cfg_element')
    with cfg_element_patch:
        tcp_entries = self.ixnet_gen._get_stack_item(mock.ANY,
                                                     ixnet_api.PROTO_TCP)
    self.assertEqual(['tcp1', 'tcp2'], tcp_entries)
173 def test__get_stack_item_no_config_element(self):
174 with mock.patch.object(
175 self.ixnet_gen, '_get_config_element_by_flow_group_name',
177 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
178 self.ixnet_gen._get_stack_item(mock.ANY, mock.ANY)
def test__get_field_in_stack_item(self):
    # The requested field is among those listed by the mocked client.
    wanted = 'field2'
    self.ixnet_gen._ixnet.getList.return_value = ['field1', wanted]
    found = self.ixnet_gen._get_field_in_stack_item(mock.ANY, wanted)
    self.assertEqual(wanted, found)
def test__get_field_in_stack_item_no_field_present(self):
    # 'field3' is not in the mocked field list, so the lookup must raise.
    available_fields = ['field1', 'field2']
    self.ixnet_gen._ixnet.getList.return_value = available_fields
    self.assertRaises(exceptions.IxNetworkFieldNotPresentInStackItem,
                      self.ixnet_gen._get_field_in_stack_item,
                      mock.ANY, 'field3')
def test__parse_framesize(self):
    # Two entries go in; the result must hold exactly the two expected
    # triplets, in any order.
    parsed = self.ixnet_gen._parse_framesize({'64B': '75', '512b': '25'})
    self.assertEqual(2, len(parsed))
    for expected_entry in ([64, 64, 75], [512, 512, 25]):
        self.assertIn(expected_entry, parsed)
def test_add_topology(self):
    # The topology is added under the mocked root ('my_root'), then named
    # and bound to the vports in a single setMultiAttribute call.
    ixnet = self.ixnet_gen.ixnet
    ixnet.add.return_value = 'obj'

    self.ixnet_gen.add_topology('topology 1', 'vports')

    ixnet.add.assert_called_once_with('my_root', 'topology')
    ixnet.setMultiAttribute.assert_called_once_with(
        'obj', '-name', 'topology 1', '-vports', 'vports')
    ixnet.commit.assert_called_once()
205 def test_add_device_group(self):
206 self.ixnet_gen.ixnet.add.return_value = 'obj'
207 self.ixnet_gen.add_device_group('topology', 'device group 1', '1')
208 self.ixnet_gen.ixnet.add.assert_called_once_with('topology',
210 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
211 'obj', '-name', 'device group 1', '-multiplier', '1')
212 self.ixnet_gen.ixnet.commit.assert_called_once()
214 def test_add_ethernet(self):
215 self.ixnet_gen.ixnet.add.return_value = 'obj'
216 self.ixnet_gen.add_ethernet('device_group', 'ethernet 1')
217 self.ixnet_gen.ixnet.add.assert_called_once_with('device_group',
219 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
220 'obj', '-name', 'ethernet 1')
221 self.ixnet_gen.ixnet.commit.assert_called_once()
223 def test_add_vlans_single(self):
225 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
226 self.ixnet_gen.ixnet.getList.return_value = ['vlan1', 'vlan2']
227 vlan1 = ixnet_api.Vlan(vlan_id=100, tp_id='ethertype88a8', prio=2)
228 vlan2 = ixnet_api.Vlan(vlan_id=101, tp_id='ethertype88a8', prio=3)
229 self.ixnet_gen.add_vlans(obj, [vlan1, vlan2])
230 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('ethernet',
232 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
234 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
236 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
238 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
240 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
241 'attr/singleValue', '-value', 'ethertype88a8')
242 self.assertEqual(self.ixnet.commit.call_count, 2)
244 def test_add_vlans_increment(self):
246 self.ixnet_gen.ixnet.add.return_value = 'obj'
247 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
248 self.ixnet_gen.ixnet.getList.return_value = ['vlan1']
249 vlan = ixnet_api.Vlan(vlan_id=100, vlan_id_step=1, prio=3, prio_step=2)
250 self.ixnet_gen.add_vlans(obj, [vlan])
251 self.ixnet.setMultiAttribute.assert_any_call('obj', '-start', 100,
253 '-direction', 'increment')
254 self.ixnet.setMultiAttribute.assert_any_call('obj', '-start', 3,
256 '-direction', 'increment')
258 self.assertEqual(self.ixnet.commit.call_count, 2)
260 def test_add_vlans_invalid(self):
262 self.assertRaises(RuntimeError, self.ixnet_gen.add_vlans, 'obj', vlans)
264 def test_add_ipv4(self):
265 self.ixnet_gen.ixnet.add.return_value = 'obj'
266 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1')
267 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1', 'ipv4')
268 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
271 self.assertEqual(self.ixnet.commit.call_count, 2)
273 def test_add_ipv4_single(self):
274 self.ixnet_gen.ixnet.add.return_value = 'obj'
275 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
276 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1', addr='100.1.1.100',
277 prefix='24', gateway='100.1.1.200')
278 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1', 'ipv4')
279 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
282 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
283 'attr/singleValue', '-value', '100.1.1.100')
284 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
285 'attr/singleValue', '-value', '24')
286 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
287 'attr/singleValue', '-value', '100.1.1.200')
289 self.assertEqual(self.ixnet.commit.call_count, 2)
291 def test_add_ipv4_counter(self):
292 self.ixnet_gen.ixnet.add.return_value = 'obj'
293 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
294 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1',
297 addr_direction='increment',
299 gateway='100.1.1.200',
301 gw_direction='increment')
302 self.ixnet_gen.ixnet.add.assert_any_call('ethernet 1', 'ipv4')
303 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
306 self.ixnet_gen.ixnet.add.assert_any_call('attr', 'counter')
307 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('obj', '-start',
312 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
313 'attr/singleValue', '-value', '24')
314 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('obj', '-start',
319 self.assertEqual(self.ixnet.commit.call_count, 2)
321 def test_add_pppox_client(self):
322 self.ixnet_gen.ixnet.add.return_value = 'obj'
323 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
324 self.ixnet_gen.add_pppox_client('ethernet 1', 'pap', 'user', 'pwd')
325 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1',
328 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
329 'attr/singleValue', '-value', 'pap')
330 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
331 'attr/singleValue', '-value', 'user')
332 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
333 'attr/singleValue', '-value', 'pwd')
335 self.assertEqual(self.ixnet.commit.call_count, 2)
def test_add_pppox_client_invalid_auth(self):
    # An unsupported authentication type must raise and must not write
    # any attribute through the client.
    ixnet = self.ixnet_gen.ixnet
    ixnet.add.return_value = 'obj'
    ixnet.getAttribute.return_value = 'attr'

    with self.assertRaises(NotImplementedError):
        self.ixnet_gen.add_pppox_client('ethernet 1', 'invalid_auth',
                                        'user', 'pwd')

    ixnet.setMultiAttribute.assert_not_called()
345 def test_add_bgp(self):
346 self.ixnet_gen.ixnet.add.return_value = 'obj'
347 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
348 self.ixnet_gen.add_bgp(ipv4='ipv4 1',
352 self.ixnet_gen.ixnet.add.assert_called_once_with('ipv4 1', 'bgpIpv4Peer')
353 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
354 'attr/singleValue', '-value', '10.0.0.1')
355 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
356 'attr/singleValue', '-value', 65000)
357 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
358 'attr/singleValue', '-value', 'external')
360 def test_add_interface(self):
361 self.ixnet_gen.ixnet.add.return_value = 'obj'
362 self.ixnet_gen.add_interface(vport='vport',
364 mac='00:00:00:00:00:00',
366 self.ixnet_gen.ixnet.add.assert_any_call('vport', 'interface')
367 self.ixnet_gen.ixnet.add.assert_any_call('obj', 'ipv4')
368 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
369 'obj/ethernet', '-macAddress', '00:00:00:00:00:00')
370 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
371 'obj', '-ip', '10.0.0.2')
372 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
373 'obj', '-gateway', '10.0.0.1')
374 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
375 'obj', '-enabled', 'true')
377 def test_add_static_ipv4(self):
378 self.ixnet_gen.ixnet.add.return_value = 'obj'
379 self.ixnet_gen.add_static_ipv4(iface='iface',
384 self.ixnet_gen.ixnet.add.assert_called_once_with(
385 'vport/protocols/static', 'ip')
386 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
387 'obj', '-protocolInterface', 'iface',
388 '-ipStart', '10.0.0.0',
393 @mock.patch.object(IxNetwork, 'IxNet')
394 def test_connect(self, mock_ixnet):
395 mock_ixnet.return_value = self.ixnet
396 with mock.patch.object(self.ixnet_gen, 'get_config') as mock_config:
397 mock_config.return_value = {'machine': 'machine_fake',
400 self.ixnet_gen.connect(mock.ANY)
402 self.ixnet.connect.assert_called_once_with(
403 'machine_fake', '-port', 'port_fake', '-version', '12345')
404 mock_config.assert_called_once()
406 def test_connect_invalid_config_no_machine(self):
407 self.ixnet_gen.get_config = mock.Mock(return_value={
410 self.assertRaises(KeyError, self.ixnet_gen.connect, mock.ANY)
411 self.ixnet.connect.assert_not_called()
413 def test_connect_invalid_config_no_port(self):
414 self.ixnet_gen.get_config = mock.Mock(return_value={
415 'machine': 'machine_fake',
417 self.assertRaises(KeyError, self.ixnet_gen.connect, mock.ANY)
418 self.ixnet.connect.assert_not_called()
def test_connect_invalid_config_no_version(self):
    # The stubbed configuration deliberately lacks the 'version' key.
    incomplete_cfg = {'machine': 'machine_fake', 'port': 'port_fake'}
    self.ixnet_gen.get_config = mock.Mock(return_value=incomplete_cfg)
    with self.assertRaises(KeyError):
        self.ixnet_gen.connect(mock.ANY)
    self.ixnet.connect.assert_not_called()
def test_connect_no_config(self):
    # An empty configuration must raise KeyError before any connection
    # attempt reaches the client.
    empty_cfg = {}
    self.ixnet_gen.get_config = mock.Mock(return_value=empty_cfg)
    with self.assertRaises(KeyError):
        self.ixnet_gen.connect(mock.ANY)
    self.ixnet.connect.assert_not_called()
def test_clear_config(self):
    # Clearing the configuration must issue exactly one 'newConfig' command.
    execute = self.ixnet.execute
    self.ixnet_gen.clear_config()
    execute.assert_called_once_with('newConfig')
436 @mock.patch.object(ixnet_api, 'log')
437 def test_assign_ports_2_ports(self, *args):
438 self.ixnet.getAttribute.side_effect = ['up', 'down']
440 'chassis': '1.1.1.1',
443 self.ixnet_gen._cfg = config
445 self.assertIsNone(self.ixnet_gen.assign_ports())
446 self.assertEqual(self.ixnet.execute.call_count, 1)
447 self.assertEqual(self.ixnet.commit.call_count, 3)
448 self.assertEqual(self.ixnet.getAttribute.call_count, 2)
450 @mock.patch.object(ixnet_api, 'log')
451 def test_assign_ports_port_down(self, mock_log):
452 self.ixnet.getAttribute.return_value = 'down'
454 'chassis': '1.1.1.1',
457 self.ixnet_gen._cfg = config
458 self.ixnet_gen.assign_ports()
459 mock_log.warning.assert_called()
def test_assign_ports_no_config(self):
    # With an empty stored configuration, assign_ports must raise KeyError.
    self.ixnet_gen._cfg = {}
    with self.assertRaises(KeyError):
        self.ixnet_gen.assign_ports()
def test__create_traffic_item(self):
    # Checks the client calls issued while creating the 'RFC2544' traffic
    # item: the add under '<root>/traffic', the naming, the id remap and
    # the tracking attribute set on the remapped id.
    self.ixnet.add.return_value = 'my_new_traffic_item'
    self.ixnet.remapIds.return_value = ['my_traffic_item_id']

    self.ixnet_gen._create_traffic_item()

    self.ixnet.add.assert_called_once_with(
        'my_root/traffic', 'trafficItem')
    self.ixnet.setMultiAttribute.assert_called_once_with(
        'my_new_traffic_item', '-name', 'RFC2544', '-trafficType', 'raw')
    self.assertEqual(2, self.ixnet.commit.call_count)
    self.ixnet.remapIds.assert_called_once_with('my_new_traffic_item')
    # Assert on the mock rather than calling it: a bare call to
    # self.ixnet.setAttribute(...) always succeeds and verifies nothing.
    self.ixnet.setAttribute.assert_called_once_with(
        'my_traffic_item_id/tracking', '-trackBy', 'trafficGroupId0')
479 def test__create_flow_groups(self):
480 uplink_endpoints = ['up_endp1', 'up_endp2']
481 downlink_endpoints = ['down_endp1', 'down_endp2']
482 self.ixnet_gen.ixnet.getList.side_effect = [['traffic_item'], ['1', '2']]
483 self.ixnet_gen.ixnet.add.side_effect = ['endp1', 'endp2', 'endp3',
485 self.ixnet_gen._create_flow_groups(uplink_endpoints, downlink_endpoints)
486 self.ixnet_gen.ixnet.add.assert_has_calls([
487 mock.call('traffic_item', 'endpointSet'),
488 mock.call('traffic_item', 'endpointSet')])
489 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
490 mock.call('endp1', '-name', '1', '-sources', ['up_endp1'],
491 '-destinations', ['down_endp1']),
492 mock.call('endp2', '-name', '2', '-sources', ['down_endp1'],
493 '-destinations', ['up_endp1']),
494 mock.call('endp3', '-name', '3', '-sources', ['up_endp2'],
495 '-destinations', ['down_endp2']),
496 mock.call('endp4', '-name', '4', '-sources', ['down_endp2'],
497 '-destinations', ['up_endp2'])])
def test__append_protocol_to_stack(self):
    # The expected template path starts with the mocked root ('my_root').
    expected_template = 'my_root/traffic/protocolTemplate:"my_protocol"'
    self.ixnet_gen._append_procotol_to_stack('my_protocol', 'prev_element')
    self.ixnet.execute.assert_called_with(
        'append', 'prev_element', expected_template)
506 def test__setup_config_elements(self):
507 self.ixnet_gen.ixnet.getList.side_effect = [['traffic_item'],
509 with mock.patch.object(self.ixnet_gen, '_append_procotol_to_stack') as \
511 self.ixnet_gen._setup_config_elements()
512 mock_append_proto.assert_has_calls([
513 mock.call(ixnet_api.PROTO_UDP, 'cfg_element/stack:"ethernet-1"'),
514 mock.call(ixnet_api.PROTO_IPV4, 'cfg_element/stack:"ethernet-1"')])
515 self.ixnet_gen.ixnet.setAttribute.assert_has_calls([
516 mock.call('cfg_element/frameRateDistribution', '-portDistribution',
518 mock.call('cfg_element/frameRateDistribution',
519 '-streamDistribution', 'splitRateEvenly')])
521 @mock.patch.object(ixnet_api.IxNextgen, '_create_traffic_item')
522 @mock.patch.object(ixnet_api.IxNextgen, '_create_flow_groups')
523 @mock.patch.object(ixnet_api.IxNextgen, '_setup_config_elements')
524 def test_create_traffic_model(self, mock__setup_config_elements,
525 mock__create_flow_groups,
526 mock__create_traffic_item):
527 uplink_ports = ['port1', 'port3']
528 downlink_ports = ['port2', 'port4']
529 uplink_endpoints = ['port1/protocols', 'port3/protocols']
530 downlink_endpoints = ['port2/protocols', 'port4/protocols']
531 self.ixnet_gen.create_traffic_model(uplink_ports, downlink_ports)
532 mock__create_traffic_item.assert_called_once_with('raw')
533 mock__create_flow_groups.assert_called_once_with(uplink_endpoints,
535 mock__setup_config_elements.assert_called_once()
537 @mock.patch.object(ixnet_api.IxNextgen, '_create_traffic_item')
538 @mock.patch.object(ixnet_api.IxNextgen, '_create_flow_groups')
539 @mock.patch.object(ixnet_api.IxNextgen, '_setup_config_elements')
540 def test_create_ipv4_traffic_model(self, mock__setup_config_elements,
541 mock__create_flow_groups,
542 mock__create_traffic_item):
543 uplink_topologies = ['up1', 'up3']
544 downlink_topologies = ['down2', 'down4']
545 self.ixnet_gen.create_ipv4_traffic_model(uplink_topologies,
547 mock__create_traffic_item.assert_called_once_with('ipv4')
548 mock__create_flow_groups.assert_called_once_with(uplink_topologies,
550 mock__setup_config_elements.assert_called_once_with(False)
def test__update_frame_mac(self):
    # The field descriptor is resolved through _get_field_in_stack_item
    # (patched here) and the MAC is then written with one
    # setMultiAttribute call followed by a single commit.
    with mock.patch.object(
            self.ixnet_gen, '_get_field_in_stack_item') as mock_get_field:
        mock_get_field.return_value = 'field_descriptor'
        self.ixnet_gen._update_frame_mac('ethernet_descriptor', 'field',
                                         'mac')

    mock_get_field.assert_called_once_with('ethernet_descriptor', 'field')
    # Assert on the mock rather than calling it: invoking
    # setMultiAttribute(...) directly would record a new call and check
    # nothing about what the method under test did.
    self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
        'field_descriptor', '-singleValue', 'mac', '-fieldValue', 'mac',
        '-valueType', 'singleValue')
    self.ixnet_gen.ixnet.commit.assert_called_once()
563 def test_update_frame(self):
564 with mock.patch.object(
565 self.ixnet_gen, '_get_config_element_by_flow_group_name',
566 return_value='cfg_element'), \
567 mock.patch.object(self.ixnet_gen, '_update_frame_mac') as \
569 mock.patch.object(self.ixnet_gen, '_get_stack_item') as \
571 mock_get_stack_item.side_effect = [['item1'], ['item2'],
572 ['item3'], ['item4']]
573 self.ixnet_gen.update_frame(TRAFFIC_PARAMETERS, 50)
575 self.assertEqual(6, len(self.ixnet_gen.ixnet.setMultiAttribute.mock_calls))
576 self.assertEqual(4, len(mock_update_frame.mock_calls))
578 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
579 mock.call('cfg_element/transmissionControl',
580 '-type', 'continuous', '-duration', 50)
583 def test_update_frame_qinq(self):
584 with mock.patch.object(self.ixnet_gen,
585 '_get_config_element_by_flow_group_name',
586 return_value='cfg_element'), \
587 mock.patch.object(self.ixnet_gen, '_update_frame_mac'),\
588 mock.patch.object(self.ixnet_gen, '_get_stack_item',
589 return_value='item'), \
590 mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
591 return_value='field'):
593 traffic_parameters = deepcopy(TRAFFIC_PARAMETERS)
594 traffic_parameters[UPLINK]['outer_l2']['QinQ'] = {
595 'S-VLAN': {'id': 128,
598 'C-VLAN': {'id': 512,
603 self.ixnet_gen.update_frame(traffic_parameters, 50)
605 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
606 mock.call('field', '-auto', 'false', '-singleValue', '0x88a8',
607 '-fieldValue', '0x88a8', '-valueType', 'singleValue'),
608 mock.call('field', '-auto', 'false', '-singleValue', 1,
609 '-fieldValue', 1, '-valueType', 'singleValue'),
610 mock.call('field', '-auto', 'false', '-singleValue', 128,
611 '-fieldValue', 128, '-valueType', 'singleValue'),
612 mock.call('field', '-auto', 'false', '-singleValue', 512,
613 '-fieldValue', 512, '-valueType', 'singleValue'),
614 mock.call('field', '-auto', 'false', '-singleValue', 2,
615 '-fieldValue', 2, '-valueType', 'singleValue')
618 def test_update_frame_flow_not_present(self):
619 with mock.patch.object(
620 self.ixnet_gen, '_get_config_element_by_flow_group_name',
622 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
623 self.ixnet_gen.update_frame(TRAFFIC_PARAMETERS, 40)
625 def test_get_statistics(self):
626 with mock.patch.object(self.ixnet_gen, '_build_stats_map') as \
628 self.ixnet_gen.get_statistics()
630 mock_build_stats.assert_has_calls([
631 mock.call(self.ixnet_gen.PORT_STATISTICS,
632 self.ixnet_gen.PORT_STATS_NAME_MAP),
633 mock.call(self.ixnet_gen.FLOW_STATISTICS,
634 self.ixnet_gen.LATENCY_NAME_MAP)])
def test__set_flow_tracking(self):
    # The tracking attribute is expected on '<traffic item>/tracking',
    # followed by exactly one commit.
    tracked_fields = ['vlanVlanId0']
    self.ixnet_gen._ixnet.getList.return_value = ['traffic_item']

    self.ixnet_gen._set_flow_tracking(track_by=tracked_fields)

    self.ixnet_gen.ixnet.setAttribute.assert_called_once_with(
        'traffic_item/tracking', '-trackBy', ['vlanVlanId0'])
    self.assertEqual(self.ixnet.commit.call_count, 1)
643 def test__set_egress_flow_tracking(self):
644 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
646 self.ixnet_gen._set_egress_flow_tracking(encapsulation='Ethernet',
647 offset='IPv4 TOS Precedence')
648 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
649 'traffic_item', '-egressEnabled', True)
650 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
651 'encapsulation', '-encapsulation', 'Ethernet')
652 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
653 'encapsulation', '-offset', 'IPv4 TOS Precedence')
654 self.assertEqual(self.ixnet.commit.call_count, 2)
656 def test_get_pppoe_scenario_statistics(self):
657 with mock.patch.object(self.ixnet_gen, '_build_stats_map') as \
659 self.ixnet_gen.get_pppoe_scenario_statistics()
661 mock_build_stats.assert_any_call(self.ixnet_gen.PORT_STATISTICS,
662 self.ixnet_gen.PORT_STATS_NAME_MAP)
663 mock_build_stats.assert_any_call(self.ixnet_gen.FLOW_STATISTICS,
664 self.ixnet_gen.LATENCY_NAME_MAP)
665 mock_build_stats.assert_any_call(self.ixnet_gen.PPPOX_CLIENT_PER_PORT,
666 self.ixnet_gen.PPPOX_CLIENT_PER_PORT_NAME_MAP)
668 def test__update_ipv4_address(self):
669 with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
670 return_value='field_desc'):
671 self.ixnet_gen._update_ipv4_address(mock.ANY, mock.ANY, '192.168.1.1',
673 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
674 'field_desc', '-seed', 100, '-fixedBits', '192.168.1.1',
675 '-randomMask', '0.0.0.63', '-valueType', 'random',
def test__update_ipv4_priority_raw(self):
    # A 'raw' priority value must be handed to _set_priority_field as is.
    priority = {'raw': '0x01'}
    set_priority = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    field_lookup = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_lookup:
        self.ixnet_gen._update_ipv4_priority('field_desc', priority)

    set_priority.assert_called_once_with('field_desc', priority['raw'])
def test__update_ipv4_priority_dscp(self):
    # For a DSCP 'defaultPHB' priority, the PHB value list must be handed
    # to _set_priority_field.
    priority = {'dscp': {'defaultPHB': [0, 1, 2, 3]}}
    set_priority = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    field_lookup = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_lookup:
        self.ixnet_gen._update_ipv4_priority('field_desc', priority)

    set_priority.assert_called_once_with(
        'field_desc', priority['dscp']['defaultPHB'])
def test__update_ipv4_priority_tos(self):
    # For a TOS 'precedence' priority, the precedence value list must be
    # handed to _set_priority_field.
    priority = {'tos': {'precedence': [0, 4, 7]}}
    set_priority = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    field_lookup = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_lookup:
        self.ixnet_gen._update_ipv4_priority('field_desc', priority)

    set_priority.assert_called_once_with(
        'field_desc', priority['tos']['precedence'])
def test__update_ipv4_priority_wrong_priority_type(self):
    # An unknown priority type ('test') must not set any priority field.
    priority = {'test': [0, 4, 7]}
    set_priority = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    field_lookup = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_lookup:
        self.ixnet_gen._update_ipv4_priority('field_desc', priority)

    set_priority.assert_not_called()
def test__update_ipv4_priority_not_supported_dscp_class(self):
    # An unknown DSCP class ('testPHB') must neither look up a field nor
    # set a priority value.
    priority = {'dscp': {'testPHB': [0, 4, 7]}}
    set_priority = mock.Mock()
    get_field = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    self.ixnet_gen._get_field_in_stack_item = get_field

    self.ixnet_gen._update_ipv4_priority('field_desc', priority)

    set_priority.assert_not_called()
    get_field.assert_not_called()
def test__update_ipv4_priority_not_supported_tos_field(self):
    # An unknown TOS field ('test') must neither look up a field nor set
    # a priority value.
    priority = {'tos': {'test': [0, 4, 7]}}
    set_priority = mock.Mock()
    get_field = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    self.ixnet_gen._get_field_in_stack_item = get_field

    self.ixnet_gen._update_ipv4_priority('field_desc', priority)

    set_priority.assert_not_called()
    get_field.assert_not_called()
733 def test__set_priority_field_list_value(self):
735 self.ixnet_gen._set_priority_field('field_desc', value)
736 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
738 '-valueList', [1, 4, 7],
739 '-activeFieldChoice', 'true',
740 '-valueType', 'valueList')
742 def test__set_priority_field_single_value(self):
744 self.ixnet_gen._set_priority_field('field_desc', value)
745 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
747 '-activeFieldChoice', 'true',
750 def test__update_udp_port(self):
751 with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
752 return_value='field_desc'):
753 self.ixnet_gen._update_udp_port(mock.ANY, mock.ANY, 1234,
756 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
762 '-valueType', 'random',
765 def test_update_ip_packet(self):
766 with mock.patch.object(self.ixnet_gen, '_update_ipv4_address') as \
768 mock.patch.object(self.ixnet_gen, '_get_stack_item'), \
769 mock.patch.object(self.ixnet_gen,
770 '_get_config_element_by_flow_group_name', return_value='celm'), \
771 mock.patch.object(self.ixnet_gen, '_update_ipv4_priority') as \
772 mock_update_priority:
773 self.ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
775 self.assertEqual(4, len(mock_update_add.mock_calls))
776 self.assertEqual(2, len(mock_update_priority.mock_calls))
778 def test_update_ip_packet_exception_no_config_element(self):
779 with mock.patch.object(self.ixnet_gen,
780 '_get_config_element_by_flow_group_name',
782 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
783 self.ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
785 def test_update_l4(self):
786 with mock.patch.object(self.ixnet_gen, '_update_udp_port') as \
788 mock.patch.object(self.ixnet_gen, '_get_stack_item'), \
789 mock.patch.object(self.ixnet_gen,
790 '_get_config_element_by_flow_group_name', return_value='celm'):
791 self.ixnet_gen.update_l4(TRAFFIC_PARAMETERS)
793 self.assertEqual(4, len(mock_update_udp.mock_calls))
795 def test_update_l4_exception_no_config_element(self):
796 with mock.patch.object(self.ixnet_gen,
797 '_get_config_element_by_flow_group_name',
799 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
800 self.ixnet_gen.update_l4(TRAFFIC_PARAMETERS)
802 def test_update_l4_exception_no_supported_proto(self):
803 traffic_parameters = {
807 'proto': 'unsupported',
814 with mock.patch.object(self.ixnet_gen,
815 '_get_config_element_by_flow_group_name',
816 return_value='celm'):
817 with self.assertRaises(exceptions.IXIAUnsupportedProtocol):
818 self.ixnet_gen.update_l4(traffic_parameters)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic(self, mock_ixnextgen_get_traffic_state):
    # The traffic state is reported as 'stopped' once, then 'started'.
    # Three execute calls are expected in this scenario.
    reported_states = ['stopped', 'started', 'started', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states
    self.ixnet_gen._ixnet.getList.return_value = [0]

    self.assertIsNone(self.ixnet_gen.start_traffic())

    self.ixnet.getList.assert_called_once()
    self.assertEqual(3, self.ixnet_gen._ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_traffic_running(
        self, mock_ixnextgen_get_traffic_state):
    # The traffic state is first reported as 'started'. Four execute
    # calls are expected in this scenario.
    reported_states = ['started', 'stopped', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states
    self.ixnet_gen._ixnet.getList.return_value = [0]

    self.assertIsNone(self.ixnet_gen.start_traffic())

    self.ixnet.getList.assert_called_once()
    self.assertEqual(4, self.ixnet_gen._ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_wait_for_traffic_to_stop(
        self, mock_ixnextgen_get_traffic_state):
    # The state stays 'started' for several polls before 'stopped' is
    # reported. Four execute calls are still expected.
    reported_states = ['started', 'started', 'started', 'stopped',
                       'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states
    self.ixnet_gen._ixnet.getList.return_value = [0]

    self.assertIsNone(self.ixnet_gen.start_traffic())

    self.ixnet.getList.assert_called_once()
    self.assertEqual(4, self.ixnet_gen._ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_wait_for_traffic_start(
        self, mock_ixnextgen_get_traffic_state):
    # The state stays 'stopped' for several polls before 'started' is
    # reported. Three execute calls are still expected.
    reported_states = ['stopped', 'stopped', 'stopped', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states
    self.ixnet_gen._ixnet.getList.return_value = [0]

    self.assertIsNone(self.ixnet_gen.start_traffic())

    self.ixnet.getList.assert_called_once()
    self.assertEqual(3, self.ixnet_gen._ixnet.execute.call_count)
868 def test__get_protocol_status(self):
869 self.ixnet.getAttribute.return_value = ['up']
870 self.ixnet_gen._get_protocol_status('ipv4')
871 self.ixnet.getAttribute.assert_called_once_with('ipv4',
@mock.patch.object(ixnet_api.IxNextgen, '_get_protocol_status')
def test_is_protocols_running(self, mock_ixnextgen_get_protocol_status):
    # Every status lookup reports 'up', so the check must be truthy.
    protocols = ['ethernet', 'ipv4']
    mock_ixnextgen_get_protocol_status.return_value = ['up', 'up']
    self.assertTrue(self.ixnet_gen.is_protocols_running(protocols))
@mock.patch.object(ixnet_api.IxNextgen, '_get_protocol_status')
def test_is_protocols_stopped(self, mock_ixnextgen_get_protocol_status):
    # Every status lookup reports 'down', so the check must be falsy.
    protocols = ['ethernet', 'ipv4']
    mock_ixnextgen_get_protocol_status.return_value = ['down', 'down']
    self.assertFalse(self.ixnet_gen.is_protocols_running(protocols))
def test_start_protocols(self):
    # Starting protocols must issue exactly one 'startAllProtocols' command.
    execute = self.ixnet.execute
    self.ixnet_gen.start_protocols()
    execute.assert_called_once_with('startAllProtocols')
def test_stop_protocols(self):
    # Stopping protocols must issue exactly one 'stopAllProtocols' command.
    execute = self.ixnet.execute
    self.ixnet_gen.stop_protocols()
    execute.assert_called_once_with('stopAllProtocols')
def test_get_vports(self):
    # The vport list must be requested from the root reported by the client.
    root = 'root'
    self.ixnet_gen._ixnet.getRoot.return_value = root
    self.ixnet_gen.get_vports()
    self.ixnet.getList.assert_called_once_with(root, 'vport')