1 # Copyright (c) 2018-2019 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from copy import deepcopy
21 from collections import OrderedDict
23 from yardstick.common import exceptions
24 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
30 TRAFFIC_PARAMETERS = {
38 'framesize': {'64B': '25', '256B': '75'},
48 'dstip': '152.16.40.20',
49 'srcip': '152.16.100.20',
52 'priority': {'raw': '0x01'}
62 'traffic_type': 'continuous'
71 'framesize': {'128B': '35', '1024B': '65'},
85 'priority': {'raw': '0x01'}
95 'traffic_type': 'continuous'
100 class TestIxNextgen(unittest.TestCase):
103 self.ixnet = mock.Mock()
104 self.ixnet.execute = mock.Mock()
105 self.ixnet.getRoot.return_value = 'my_root'
106 self.ixnet_gen = ixnet_api.IxNextgen()
107 self.ixnet_gen._ixnet = self.ixnet
108 self._mock_log = mock.patch.object(ixnet_api.log, 'info')
109 self.mock_log = self._mock_log.start()
110 self.addCleanup(self._stop_mocks)
112 def _stop_mocks(self):
115 def test_get_config(self):
119 'external-interface': [
120 {'virtual-interface': {'vpci': '0000:07:00.1'}},
121 {'virtual-interface': {'vpci': '0001:08:01.2'}}
128 'dut_result_dir': 'test2',
130 'ixchassis': 'test4',
140 'cards': ['0000', '0001'],
141 'ports': ['07', '08'],
142 'output_dir': 'test2',
147 result = ixnet_api.IxNextgen.get_config(tg_cfg)
148 self.assertEqual(result, expected)
150 def test__get_config_element_by_flow_group_name(self):
151 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
153 self.ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_01'
154 output = self.ixnet_gen._get_config_element_by_flow_group_name(
156 self.assertEqual('traffic_item/configElement:flow_group_01', output)
158 def test__get_config_element_by_flow_group_name_no_match(self):
159 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
161 self.ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_02'
162 output = self.ixnet_gen._get_config_element_by_flow_group_name(
164 self.assertIsNone(output)
def test__get_stack_item(self):
    # With a config element available and getList returning two TCP
    # entries plus one UDP entry, asking for PROTO_TCP is expected to
    # yield only the two TCP entries.
    self.ixnet_gen._ixnet.getList.return_value = ['tcp1', 'tcp2', 'udp']
    cfg_element_patch = mock.patch.object(
        self.ixnet_gen, '_get_config_element_by_flow_group_name',
        return_value='cfg_element')
    with cfg_element_patch:
        stack_items = self.ixnet_gen._get_stack_item(mock.ANY,
                                                     ixnet_api.PROTO_TCP)
    self.assertEqual(['tcp1', 'tcp2'], stack_items)
175 def test__get_stack_item_no_config_element(self):
176 with mock.patch.object(
177 self.ixnet_gen, '_get_config_element_by_flow_group_name',
179 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
180 self.ixnet_gen._get_stack_item(mock.ANY, mock.ANY)
def test__get_field_in_stack_item(self):
    # The requested field is one of the entries reported by getList, so
    # the lookup is expected to hand it back.
    available_fields = ['field1', 'field2']
    self.ixnet_gen._ixnet.getList.return_value = available_fields
    found = self.ixnet_gen._get_field_in_stack_item(mock.ANY, 'field2')
    self.assertEqual('field2', found)
def test__get_field_in_stack_item_no_field_present(self):
    # 'field3' is not among the entries reported by getList; the lookup
    # must raise IxNetworkFieldNotPresentInStackItem.
    self.ixnet_gen._ixnet.getList.return_value = ['field1', 'field2']
    self.assertRaises(exceptions.IxNetworkFieldNotPresentInStackItem,
                      self.ixnet_gen._get_field_in_stack_item,
                      mock.ANY, 'field3')
def test__parse_framesize(self):
    # Two frame-size entries (one with an upper-case 'B' suffix, one with
    # lower-case 'b') must produce two [size, size, weight] triplets.
    parsed = self.ixnet_gen._parse_framesize({'64B': '75', '512b': '25'})
    self.assertEqual(2, len(parsed))
    for expected_entry in ([64, 64, 75], [512, 512, 25]):
        self.assertIn(expected_entry, parsed)
def test_add_topology(self):
    # add_topology is expected to create a 'topology' node under the root
    # ('my_root' from the fixture), name it, bind the vports and commit
    # exactly once.
    ixnet = self.ixnet_gen.ixnet
    ixnet.add.return_value = 'obj'

    self.ixnet_gen.add_topology('topology 1', 'vports')

    ixnet.add.assert_called_once_with('my_root', 'topology')
    ixnet.setMultiAttribute.assert_called_once_with(
        'obj', '-name', 'topology 1', '-vports', 'vports')
    ixnet.commit.assert_called_once()
207 def test_add_device_group(self):
208 self.ixnet_gen.ixnet.add.return_value = 'obj'
209 self.ixnet_gen.add_device_group('topology', 'device group 1', '1')
210 self.ixnet_gen.ixnet.add.assert_called_once_with('topology',
212 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
213 'obj', '-name', 'device group 1', '-multiplier', '1')
214 self.ixnet_gen.ixnet.commit.assert_called_once()
216 def test_add_ethernet(self):
217 self.ixnet_gen.ixnet.add.return_value = 'obj'
218 self.ixnet_gen.add_ethernet('device_group', 'ethernet 1')
219 self.ixnet_gen.ixnet.add.assert_called_once_with('device_group',
221 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
222 'obj', '-name', 'ethernet 1')
223 self.ixnet_gen.ixnet.commit.assert_called_once()
225 def test_add_vlans_single(self):
227 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
228 self.ixnet_gen.ixnet.getList.return_value = ['vlan1', 'vlan2']
229 vlan1 = ixnet_api.Vlan(vlan_id=100, tp_id='ethertype88a8', prio=2)
230 vlan2 = ixnet_api.Vlan(vlan_id=101, tp_id='ethertype88a8', prio=3)
231 self.ixnet_gen.add_vlans(obj, [vlan1, vlan2])
232 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('ethernet',
234 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
236 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
238 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
240 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('attr/singleValue',
242 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
243 'attr/singleValue', '-value', 'ethertype88a8')
244 self.assertEqual(self.ixnet.commit.call_count, 2)
246 def test_add_vlans_increment(self):
248 self.ixnet_gen.ixnet.add.return_value = 'obj'
249 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
250 self.ixnet_gen.ixnet.getList.return_value = ['vlan1']
251 vlan = ixnet_api.Vlan(vlan_id=100, vlan_id_step=1, prio=3, prio_step=2)
252 self.ixnet_gen.add_vlans(obj, [vlan])
253 self.ixnet.setMultiAttribute.assert_any_call('obj', '-start', 100,
255 '-direction', 'increment')
256 self.ixnet.setMultiAttribute.assert_any_call('obj', '-start', 3,
258 '-direction', 'increment')
260 self.assertEqual(self.ixnet.commit.call_count, 2)
262 def test_add_vlans_invalid(self):
264 self.assertRaises(RuntimeError, self.ixnet_gen.add_vlans, 'obj', vlans)
266 def test_add_ipv4(self):
267 self.ixnet_gen.ixnet.add.return_value = 'obj'
268 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1')
269 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1', 'ipv4')
270 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
273 self.assertEqual(self.ixnet.commit.call_count, 2)
275 def test_add_ipv4_single(self):
276 self.ixnet_gen.ixnet.add.return_value = 'obj'
277 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
278 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1', addr='100.1.1.100',
279 prefix='24', gateway='100.1.1.200')
280 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1', 'ipv4')
281 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
284 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
285 'attr/singleValue', '-value', '100.1.1.100')
286 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
287 'attr/singleValue', '-value', '24')
288 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
289 'attr/singleValue', '-value', '100.1.1.200')
291 self.assertEqual(self.ixnet.commit.call_count, 2)
293 def test_add_ipv4_counter(self):
294 self.ixnet_gen.ixnet.add.return_value = 'obj'
295 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
296 self.ixnet_gen.add_ipv4('ethernet 1', name='ipv4 1',
299 addr_direction='increment',
301 gateway='100.1.1.200',
303 gw_direction='increment')
304 self.ixnet_gen.ixnet.add.assert_any_call('ethernet 1', 'ipv4')
305 self.ixnet_gen.ixnet.setAttribute.assert_called_once_with('obj',
308 self.ixnet_gen.ixnet.add.assert_any_call('attr', 'counter')
309 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('obj', '-start',
314 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
315 'attr/singleValue', '-value', '24')
316 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call('obj', '-start',
321 self.assertEqual(self.ixnet.commit.call_count, 2)
323 def test_add_pppox_client(self):
324 self.ixnet_gen.ixnet.add.return_value = 'obj'
325 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
326 self.ixnet_gen.add_pppox_client('ethernet 1', 'pap', 'user', 'pwd')
327 self.ixnet_gen.ixnet.add.assert_called_once_with('ethernet 1',
330 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
331 'attr/singleValue', '-value', 'pap')
332 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
333 'attr/singleValue', '-value', 'user')
334 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
335 'attr/singleValue', '-value', 'pwd')
337 self.assertEqual(self.ixnet.commit.call_count, 2)
def test_add_pppox_client_invalid_auth(self):
    # An unknown authentication type must raise NotImplementedError and
    # must not result in any setMultiAttribute call.
    ixnet = self.ixnet_gen.ixnet
    ixnet.add.return_value = 'obj'
    ixnet.getAttribute.return_value = 'attr'

    with self.assertRaises(NotImplementedError):
        self.ixnet_gen.add_pppox_client('ethernet 1', 'invalid_auth',
                                        'user', 'pwd')

    ixnet.setMultiAttribute.assert_not_called()
347 def test_add_bgp(self):
348 self.ixnet_gen.ixnet.add.return_value = 'obj'
349 self.ixnet_gen.ixnet.getAttribute.return_value = 'attr'
350 self.ixnet_gen.add_bgp(ipv4='ipv4 1',
354 self.ixnet_gen.ixnet.add.assert_called_once_with('ipv4 1', 'bgpIpv4Peer')
355 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
356 'attr/singleValue', '-value', '10.0.0.1')
357 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
358 'attr/singleValue', '-value', 65000)
359 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
360 'attr/singleValue', '-value', 'external')
362 def test_add_interface(self):
363 self.ixnet_gen.ixnet.add.return_value = 'obj'
364 self.ixnet_gen.add_interface(vport='vport',
366 mac='00:00:00:00:00:00',
368 self.ixnet_gen.ixnet.add.assert_any_call('vport', 'interface')
369 self.ixnet_gen.ixnet.add.assert_any_call('obj', 'ipv4')
370 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
371 'obj/ethernet', '-macAddress', '00:00:00:00:00:00')
372 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
373 'obj', '-ip', '10.0.0.2')
374 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
375 'obj', '-gateway', '10.0.0.1')
376 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
377 'obj', '-enabled', 'true')
379 def test_add_static_ipv4(self):
380 self.ixnet_gen.ixnet.add.return_value = 'obj'
381 self.ixnet_gen.add_static_ipv4(iface='iface',
386 self.ixnet_gen.ixnet.add.assert_called_once_with(
387 'vport/protocols/static', 'ip')
388 self.ixnet_gen.ixnet.setMultiAttribute.assert_any_call(
389 'obj', '-protocolInterface', 'iface',
390 '-ipStart', '10.0.0.0',
395 @mock.patch.object(IxNetwork, 'IxNet')
396 def test_connect(self, mock_ixnet):
397 mock_ixnet.return_value = self.ixnet
398 with mock.patch.object(self.ixnet_gen, 'get_config') as mock_config:
399 mock_config.return_value = {'machine': 'machine_fake',
402 self.ixnet_gen.connect(mock.ANY)
404 self.ixnet.connect.assert_called_once_with(
405 'machine_fake', '-port', 'port_fake', '-version', '12345')
406 mock_config.assert_called_once()
408 def test_connect_invalid_config_no_machine(self):
409 self.ixnet_gen.get_config = mock.Mock(return_value={
412 self.assertRaises(KeyError, self.ixnet_gen.connect, mock.ANY)
413 self.ixnet.connect.assert_not_called()
415 def test_connect_invalid_config_no_port(self):
416 self.ixnet_gen.get_config = mock.Mock(return_value={
417 'machine': 'machine_fake',
419 self.assertRaises(KeyError, self.ixnet_gen.connect, mock.ANY)
420 self.ixnet.connect.assert_not_called()
def test_connect_invalid_config_no_version(self):
    # The stubbed configuration provides 'machine' and 'port' but no
    # 'version' entry: connect must raise KeyError before reaching the
    # client's connect call.
    partial_cfg = {'machine': 'machine_fake', 'port': 'port_fake'}
    self.ixnet_gen.get_config = mock.Mock(return_value=partial_cfg)
    with self.assertRaises(KeyError):
        self.ixnet_gen.connect(mock.ANY)
    self.ixnet.connect.assert_not_called()
def test_connect_no_config(self):
    # An empty configuration dict must make connect raise KeyError without
    # ever calling the client's connect.
    empty_cfg = {}
    self.ixnet_gen.get_config = mock.Mock(return_value=empty_cfg)
    with self.assertRaises(KeyError):
        self.ixnet_gen.connect(mock.ANY)
    self.ixnet.connect.assert_not_called()
def test_clear_config(self):
    # clear_config must issue exactly one execute call: 'newConfig'.
    self.ixnet_gen.clear_config()

    execute_mock = self.ixnet.execute
    execute_mock.assert_called_once_with('newConfig')
438 @mock.patch.object(ixnet_api, 'log')
439 def test_assign_ports_2_ports(self, *args):
440 self.ixnet.getAttribute.side_effect = ['up', 'down']
442 'chassis': '1.1.1.1',
445 self.ixnet_gen._cfg = config
447 self.assertIsNone(self.ixnet_gen.assign_ports())
448 self.assertEqual(self.ixnet.execute.call_count, 1)
449 self.assertEqual(self.ixnet.commit.call_count, 3)
450 self.assertEqual(self.ixnet.getAttribute.call_count, 2)
452 @mock.patch.object(ixnet_api, 'log')
453 def test_assign_ports_port_down(self, mock_log):
454 self.ixnet.getAttribute.return_value = 'down'
456 'chassis': '1.1.1.1',
459 self.ixnet_gen._cfg = config
460 self.ixnet_gen.assign_ports()
461 mock_log.warning.assert_called()
def test_assign_ports_no_config(self):
    # With an empty stored configuration, assign_ports must raise
    # KeyError.
    self.ixnet_gen._cfg = {}
    with self.assertRaises(KeyError):
        self.ixnet_gen.assign_ports()
def test__create_traffic_item(self):
    # _create_traffic_item is expected to add a 'trafficItem' under
    # '<root>/traffic', name it 'RFC2544' with traffic type 'raw', remap
    # its id and enable tracking on the remapped item, committing twice.
    self.ixnet.add.return_value = 'my_new_traffic_item'
    self.ixnet.remapIds.return_value = ['my_traffic_item_id']

    self.ixnet_gen._create_traffic_item()

    self.ixnet.add.assert_called_once_with(
        'my_root/traffic', 'trafficItem')
    self.ixnet.setMultiAttribute.assert_called_once_with(
        'my_new_traffic_item', '-name', 'RFC2544', '-trafficType', 'raw')
    self.assertEqual(2, self.ixnet.commit.call_count)
    self.ixnet.remapIds.assert_called_once_with('my_new_traffic_item')
    # Assert on the recorded call. Previously this line invoked the mock
    # itself, which always succeeds and therefore verified nothing.
    self.ixnet.setAttribute.assert_called_once_with(
        'my_traffic_item_id/tracking', '-trackBy', 'trafficGroupId0')
481 def test__create_flow_groups(self):
482 uplink_endpoints = ['up_endp1', 'up_endp2']
483 downlink_endpoints = ['down_endp1', 'down_endp2']
484 self.ixnet_gen.ixnet.getList.side_effect = [['traffic_item'], ['1', '2']]
485 self.ixnet_gen.ixnet.add.side_effect = ['endp1', 'endp2', 'endp3',
487 self.ixnet_gen._create_flow_groups(uplink_endpoints, downlink_endpoints)
488 self.ixnet_gen.ixnet.add.assert_has_calls([
489 mock.call('traffic_item', 'endpointSet'),
490 mock.call('traffic_item', 'endpointSet')])
491 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
492 mock.call('endp1', '-name', '1', '-sources', ['up_endp1'],
493 '-destinations', ['down_endp1']),
494 mock.call('endp2', '-name', '2', '-sources', ['down_endp1'],
495 '-destinations', ['up_endp1']),
496 mock.call('endp3', '-name', '3', '-sources', ['up_endp2'],
497 '-destinations', ['down_endp2']),
498 mock.call('endp4', '-name', '4', '-sources', ['down_endp2'],
499 '-destinations', ['up_endp2'])])
501 def test__append_protocol_to_stack(self):
503 self.ixnet_gen._append_procotol_to_stack('my_protocol', 'prev_element')
504 self.ixnet.execute.assert_called_with(
505 'append', 'prev_element',
506 'my_root/traffic/protocolTemplate:"my_protocol"')
508 def test__setup_config_elements(self):
509 self.ixnet_gen.ixnet.getList.side_effect = [['traffic_item'],
511 with mock.patch.object(self.ixnet_gen, '_append_procotol_to_stack') as \
513 self.ixnet_gen._setup_config_elements()
514 mock_append_proto.assert_has_calls([
515 mock.call(ixnet_api.PROTO_UDP, 'cfg_element/stack:"ethernet-1"'),
516 mock.call(ixnet_api.PROTO_IPV4, 'cfg_element/stack:"ethernet-1"')])
517 self.ixnet_gen.ixnet.setAttribute.assert_has_calls([
518 mock.call('cfg_element/frameRateDistribution', '-portDistribution',
520 mock.call('cfg_element/frameRateDistribution',
521 '-streamDistribution', 'splitRateEvenly')])
523 @mock.patch.object(ixnet_api.IxNextgen, '_create_traffic_item')
524 @mock.patch.object(ixnet_api.IxNextgen, '_create_flow_groups')
525 @mock.patch.object(ixnet_api.IxNextgen, '_setup_config_elements')
526 def test_create_traffic_model(self, mock__setup_config_elements,
527 mock__create_flow_groups,
528 mock__create_traffic_item):
529 uplink_ports = ['port1', 'port3']
530 downlink_ports = ['port2', 'port4']
531 uplink_endpoints = ['port1/protocols', 'port3/protocols']
532 downlink_endpoints = ['port2/protocols', 'port4/protocols']
533 self.ixnet_gen.create_traffic_model(uplink_ports, downlink_ports)
534 mock__create_traffic_item.assert_called_once_with('raw')
535 mock__create_flow_groups.assert_called_once_with(uplink_endpoints,
537 mock__setup_config_elements.assert_called_once()
539 @mock.patch.object(ixnet_api.IxNextgen, '_create_traffic_item')
540 @mock.patch.object(ixnet_api.IxNextgen, '_create_flow_groups')
541 @mock.patch.object(ixnet_api.IxNextgen, '_setup_config_elements')
542 def test_create_ipv4_traffic_model(self, mock__setup_config_elements,
543 mock__create_flow_groups,
544 mock__create_traffic_item):
545 uplink_topologies = ['up1', 'up3']
546 downlink_topologies = ['down2', 'down4']
547 self.ixnet_gen.create_ipv4_traffic_model(uplink_topologies,
549 mock__create_traffic_item.assert_called_once_with('ipv4')
550 mock__create_flow_groups.assert_called_once_with(uplink_topologies,
552 mock__setup_config_elements.assert_called_once_with(False)
def test__update_frame_mac(self):
    # _update_frame_mac is expected to resolve the field descriptor via
    # _get_field_in_stack_item, write the MAC as a single value on that
    # descriptor and commit once.
    with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item') as \
            mock_get_field:
        mock_get_field.return_value = 'field_descriptor'
        self.ixnet_gen._update_frame_mac('ethernet_descriptor', 'field',
                                         'mac')
    mock_get_field.assert_called_once_with('ethernet_descriptor', 'field')
    # Assert on the recorded call. Previously this statement invoked the
    # mock directly, so the attribute update was never actually checked.
    self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
        'field_descriptor', '-singleValue', 'mac', '-fieldValue', 'mac',
        '-valueType', 'singleValue')
    self.ixnet_gen.ixnet.commit.assert_called_once()
565 def test_update_frame(self):
566 with mock.patch.object(
567 self.ixnet_gen, '_get_config_element_by_flow_group_name',
568 return_value='cfg_element'), \
569 mock.patch.object(self.ixnet_gen, '_update_frame_mac') as \
571 mock.patch.object(self.ixnet_gen, '_get_stack_item') as \
573 mock_get_stack_item.side_effect = [['item1'], ['item2'],
574 ['item3'], ['item4']]
575 self.ixnet_gen.update_frame(TRAFFIC_PARAMETERS, 50)
577 self.assertEqual(6, len(self.ixnet_gen.ixnet.setMultiAttribute.mock_calls))
578 self.assertEqual(4, len(mock_update_frame.mock_calls))
580 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
581 mock.call('cfg_element/transmissionControl',
582 '-type', 'continuous', '-duration', 50)
585 def test_update_frame_qinq(self):
586 with mock.patch.object(self.ixnet_gen,
587 '_get_config_element_by_flow_group_name',
588 return_value='cfg_element'), \
589 mock.patch.object(self.ixnet_gen, '_update_frame_mac'),\
590 mock.patch.object(self.ixnet_gen, '_get_stack_item',
591 return_value='item'), \
592 mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
593 return_value='field'):
595 traffic_parameters = deepcopy(TRAFFIC_PARAMETERS)
596 traffic_parameters[UPLINK]['outer_l2']['QinQ'] = {
597 'S-VLAN': {'id': 128,
600 'C-VLAN': {'id': 512,
605 self.ixnet_gen.update_frame(traffic_parameters, 50)
607 self.ixnet_gen.ixnet.setMultiAttribute.assert_has_calls([
608 mock.call('field', '-auto', 'false', '-singleValue', '0x88a8',
609 '-fieldValue', '0x88a8', '-valueType', 'singleValue'),
610 mock.call('field', '-auto', 'false', '-singleValue', 1,
611 '-fieldValue', 1, '-valueType', 'singleValue'),
612 mock.call('field', '-auto', 'false', '-singleValue', 128,
613 '-fieldValue', 128, '-valueType', 'singleValue'),
614 mock.call('field', '-auto', 'false', '-singleValue', 512,
615 '-fieldValue', 512, '-valueType', 'singleValue'),
616 mock.call('field', '-auto', 'false', '-singleValue', 2,
617 '-fieldValue', 2, '-valueType', 'singleValue')
620 def test_update_frame_flow_not_present(self):
621 with mock.patch.object(
622 self.ixnet_gen, '_get_config_element_by_flow_group_name',
624 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
625 self.ixnet_gen.update_frame(TRAFFIC_PARAMETERS, 40)
627 def test_get_statistics(self):
628 with mock.patch.object(self.ixnet_gen, '_build_stats_map') as \
630 self.ixnet_gen.get_statistics()
632 mock_build_stats.assert_has_calls([
633 mock.call(self.ixnet_gen.PORT_STATISTICS,
634 self.ixnet_gen.PORT_STATS_NAME_MAP),
635 mock.call(self.ixnet_gen.FLOW_STATISTICS,
636 self.ixnet_gen.LATENCY_NAME_MAP)])
def test_set_flow_tracking(self):
    # With a single traffic item reported by getList, set_flow_tracking
    # must set '-trackBy' on '<item>/tracking' once and commit once.
    self.ixnet_gen._ixnet.getList.return_value = ['traffic_item']

    self.ixnet_gen.set_flow_tracking(track_by=['vlanVlanId0'])

    set_attribute = self.ixnet_gen.ixnet.setAttribute
    set_attribute.assert_called_once_with(
        'traffic_item/tracking', '-trackBy', ['vlanVlanId0'])
    self.assertEqual(self.ixnet.commit.call_count, 1)
645 def test__set_egress_flow_tracking(self):
646 self.ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
648 self.ixnet_gen._set_egress_flow_tracking(encapsulation='Ethernet',
649 offset='IPv4 TOS Precedence')
650 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
651 'traffic_item', '-egressEnabled', True)
652 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
653 'encapsulation', '-encapsulation', 'Ethernet')
654 self.ixnet_gen.ixnet.setAttribute.assert_any_call(
655 'encapsulation', '-offset', 'IPv4 TOS Precedence')
656 self.assertEqual(self.ixnet.commit.call_count, 2)
658 def test__get_view_page_stats(self):
660 {'header1': 'row1_1', 'header2': 'row1_2'},
661 {'header1': 'row2_1', 'header2': 'row2_2'}
663 self.ixnet_gen._ixnet.getAttribute.side_effect = [
664 ['header1', 'header2'],
666 [['row1_1', 'row1_2']],
667 [['row2_1', 'row2_2']]
670 stats = self.ixnet_gen._get_view_page_stats('view_obj')
671 self.assertListEqual(stats, expected_stats)
673 @mock.patch.object(ixnet_api.IxNextgen, '_get_view_page_stats')
674 def test_get_pppoe_scenario_statistics(self, mock_get_view):
676 pattern = re.compile('Flow 2')
679 'port_statistics': [{
680 'port_1': 'port_stat1',
681 'port_2': 'port_stat2'
684 'flow_1': 'flow_stat1',
685 'flow_2': 'flow_stat2'
687 'pppox_client_per_port': [{
688 'sub_1': 'sub_stat1',
693 pppoe_scenario_stats = OrderedDict([
694 ('port_statistics', 'view_obj'),
695 ('flow_statistic', 'view_obj'),
696 ('pppox_client_per_port', 'view_obj')
699 pppoe_scenario_stats_map = {
700 'port_statistics': {'port_1': 'Port 1',
702 'flow_statistic': {'flow_1': 'Flow 1',
704 'pppox_client_per_port': {'sub_1': 'Sub 1',
709 port_stats = [{'Port 1': 'port_stat1',
710 'Port 2': 'port_stat2',
711 'Port 3': 'port_stat3'}]
712 flows_stats = [{'Flow 1': 'flow_stat1',
713 'Flow 2': 'flow_stat2',
714 'Flow 3': 'flow_stat3'}]
715 pppoe_sub_stats = [{'Sub 1': 'sub_stat1',
716 'Sub 2': 'sub_stat2',
717 'Sub 3': 'sub_stat3'}]
719 mock_get_view.side_effect = [port_stats, flows_stats, pppoe_sub_stats]
720 self.ixnet_gen._ixnet.getAttribute.return_value = '1'
722 with mock.patch.multiple(ixnet_api.IxNextgen,
723 PPPOE_SCENARIO_STATS=pppoe_scenario_stats,
724 PPPOE_SCENARIO_STATS_MAP=pppoe_scenario_stats_map):
725 stats = self.ixnet_gen.get_pppoe_scenario_statistics()
726 self.assertDictEqual(stats, expected_stats)
727 self.assertEqual(self.ixnet_gen.ixnet.getAttribute.call_count, 6)
728 self.ixnet_gen.ixnet.setAttribute.assert_not_called()
730 def test__update_ipv4_address(self):
731 with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
732 return_value='field_desc'):
733 self.ixnet_gen._update_ipv4_address(mock.ANY, mock.ANY, '192.168.1.1',
735 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
736 'field_desc', '-seed', 100, '-fixedBits', '192.168.1.1',
737 '-randomMask', '0.0.0.63', '-valueType', 'random',
def test__update_ipv4_priority_raw(self):
    # A 'raw' priority must be forwarded unchanged to _set_priority_field
    # together with the field descriptor.
    priority = {'raw': '0x01'}
    set_priority = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    field_patch = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_patch:
        self.ixnet_gen._update_ipv4_priority('field_desc', priority)

    set_priority.assert_called_once_with('field_desc', priority['raw'])
def test__update_ipv4_priority_dscp(self):
    # A 'dscp' priority using the 'defaultPHB' class must forward that
    # class's value list to _set_priority_field.
    priority = {'dscp': {'defaultPHB': [0, 1, 2, 3]}}
    set_priority = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    field_patch = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_patch:
        self.ixnet_gen._update_ipv4_priority('field_desc', priority)

    set_priority.assert_called_once_with(
        'field_desc', priority['dscp']['defaultPHB'])
def test__update_ipv4_priority_tos(self):
    # A 'tos' priority using the 'precedence' field must forward that
    # field's value list to _set_priority_field.
    priority = {'tos': {'precedence': [0, 4, 7]}}
    set_priority = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    field_patch = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_patch:
        self.ixnet_gen._update_ipv4_priority('field_desc', priority)

    set_priority.assert_called_once_with(
        'field_desc', priority['tos']['precedence'])
def test__update_ipv4_priority_wrong_priority_type(self):
    # A priority dict keyed by an unknown type ('test') must not reach
    # _set_priority_field at all.
    priority = {'test': [0, 4, 7]}
    set_priority = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    field_patch = mock.patch.object(
        self.ixnet_gen, '_get_field_in_stack_item',
        return_value='field_desc')
    with field_patch:
        self.ixnet_gen._update_ipv4_priority('field_desc', priority)

    set_priority.assert_not_called()
def test__update_ipv4_priority_not_supported_dscp_class(self):
    # A 'dscp' priority with an unknown class ('testPHB') must trigger
    # neither a field lookup nor a priority update.
    set_priority = mock.Mock()
    get_field = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    self.ixnet_gen._get_field_in_stack_item = get_field

    self.ixnet_gen._update_ipv4_priority(
        'field_desc', {'dscp': {'testPHB': [0, 4, 7]}})

    set_priority.assert_not_called()
    get_field.assert_not_called()
def test__update_ipv4_priority_not_supported_tos_field(self):
    # A 'tos' priority with an unknown field name ('test') must trigger
    # neither a field lookup nor a priority update.
    set_priority = mock.Mock()
    get_field = mock.Mock()
    self.ixnet_gen._set_priority_field = set_priority
    self.ixnet_gen._get_field_in_stack_item = get_field

    self.ixnet_gen._update_ipv4_priority(
        'field_desc', {'tos': {'test': [0, 4, 7]}})

    set_priority.assert_not_called()
    get_field.assert_not_called()
795 def test__set_priority_field_list_value(self):
797 self.ixnet_gen._set_priority_field('field_desc', value)
798 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
800 '-valueList', [1, 4, 7],
801 '-activeFieldChoice', 'true',
802 '-valueType', 'valueList')
804 def test__set_priority_field_single_value(self):
806 self.ixnet_gen._set_priority_field('field_desc', value)
807 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
809 '-activeFieldChoice', 'true',
812 def test__update_udp_port(self):
813 with mock.patch.object(self.ixnet_gen, '_get_field_in_stack_item',
814 return_value='field_desc'):
815 self.ixnet_gen._update_udp_port(mock.ANY, mock.ANY, 1234,
818 self.ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
824 '-valueType', 'random',
827 def test_update_ip_packet(self):
828 with mock.patch.object(self.ixnet_gen, '_update_ipv4_address') as \
830 mock.patch.object(self.ixnet_gen, '_get_stack_item'), \
831 mock.patch.object(self.ixnet_gen,
832 '_get_config_element_by_flow_group_name', return_value='celm'), \
833 mock.patch.object(self.ixnet_gen, '_update_ipv4_priority') as \
834 mock_update_priority:
835 self.ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
837 self.assertEqual(4, len(mock_update_add.mock_calls))
838 self.assertEqual(2, len(mock_update_priority.mock_calls))
840 def test_update_ip_packet_exception_no_config_element(self):
841 with mock.patch.object(self.ixnet_gen,
842 '_get_config_element_by_flow_group_name',
844 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
845 self.ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
847 def test_update_l4(self):
848 with mock.patch.object(self.ixnet_gen, '_update_udp_port') as \
850 mock.patch.object(self.ixnet_gen, '_get_stack_item'), \
851 mock.patch.object(self.ixnet_gen,
852 '_get_config_element_by_flow_group_name', return_value='celm'):
853 self.ixnet_gen.update_l4(TRAFFIC_PARAMETERS)
855 self.assertEqual(4, len(mock_update_udp.mock_calls))
857 def test_update_l4_exception_no_config_element(self):
858 with mock.patch.object(self.ixnet_gen,
859 '_get_config_element_by_flow_group_name',
861 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
862 self.ixnet_gen.update_l4(TRAFFIC_PARAMETERS)
864 def test_update_l4_exception_no_supported_proto(self):
865 traffic_parameters = {
869 'proto': 'unsupported',
876 with mock.patch.object(self.ixnet_gen,
877 '_get_config_element_by_flow_group_name',
878 return_value='celm'):
879 with self.assertRaises(exceptions.IXIAUnsupportedProtocol):
880 self.ixnet_gen.update_l4(traffic_parameters)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic(self, mock_ixnextgen_get_traffic_state):
    # Reported traffic state sequence: stopped, then started. The call
    # must return None, query getList once and issue three execute calls.
    reported_states = ['stopped', 'started', 'started', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states
    self.ixnet_gen._ixnet.getList.return_value = [0]

    self.assertIsNone(self.ixnet_gen.start_traffic())

    self.ixnet.getList.assert_called_once()
    self.assertEqual(3, self.ixnet_gen._ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_traffic_running(
        self, mock_ixnextgen_get_traffic_state):
    # Reported traffic state sequence: started, stopped, started. The
    # call must return None, query getList once and issue four execute
    # calls.
    reported_states = ['started', 'stopped', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states
    self.ixnet_gen._ixnet.getList.return_value = [0]

    self.assertIsNone(self.ixnet_gen.start_traffic())

    self.ixnet.getList.assert_called_once()
    self.assertEqual(4, self.ixnet_gen._ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_wait_for_traffic_to_stop(
        self, mock_ixnextgen_get_traffic_state):
    # The state stays 'started' for three reads before turning 'stopped'
    # and then 'started' again. The call must return None, query getList
    # once and issue four execute calls.
    reported_states = ['started', 'started', 'started', 'stopped',
                       'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states
    self.ixnet_gen._ixnet.getList.return_value = [0]

    self.assertIsNone(self.ixnet_gen.start_traffic())

    self.ixnet.getList.assert_called_once()
    self.assertEqual(4, self.ixnet_gen._ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_wait_for_traffic_start(
        self, mock_ixnextgen_get_traffic_state):
    # The state stays 'stopped' for three reads before turning 'started'.
    # The call must return None, query getList once and issue three
    # execute calls.
    reported_states = ['stopped', 'stopped', 'stopped', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states
    self.ixnet_gen._ixnet.getList.return_value = [0]

    self.assertIsNone(self.ixnet_gen.start_traffic())

    self.ixnet.getList.assert_called_once()
    self.assertEqual(3, self.ixnet_gen._ixnet.execute.call_count)
930 def test__get_protocol_status(self):
931 self.ixnet.getAttribute.return_value = ['up']
932 self.ixnet_gen._get_protocol_status('ipv4')
933 self.ixnet.getAttribute.assert_called_once_with('ipv4',
@mock.patch.object(ixnet_api.IxNextgen, '_get_protocol_status')
def test_is_protocols_running(self, mock_ixnextgen_get_protocol_status):
    # Every protocol status is reported as 'up': the check must be truthy.
    mock_ixnextgen_get_protocol_status.return_value = ['up', 'up']
    protocols = ['ethernet', 'ipv4']
    self.assertTrue(self.ixnet_gen.is_protocols_running(protocols))
@mock.patch.object(ixnet_api.IxNextgen, '_get_protocol_status')
def test_is_protocols_stopped(self, mock_ixnextgen_get_protocol_status):
    # Every protocol status is reported as 'down': the check must be
    # falsy.
    mock_ixnextgen_get_protocol_status.return_value = ['down', 'down']
    protocols = ['ethernet', 'ipv4']
    self.assertFalse(self.ixnet_gen.is_protocols_running(protocols))
def test_start_protocols(self):
    # start_protocols must issue exactly one execute call:
    # 'startAllProtocols'.
    self.ixnet_gen.start_protocols()

    execute_mock = self.ixnet.execute
    execute_mock.assert_called_once_with('startAllProtocols')
def test_stop_protocols(self):
    # stop_protocols must issue exactly one execute call:
    # 'stopAllProtocols'.
    self.ixnet_gen.stop_protocols()

    execute_mock = self.ixnet.execute
    execute_mock.assert_called_once_with('stopAllProtocols')
def test_get_vports(self):
    # get_vports must list the 'vport' children of the root object
    # exactly once.
    root_obj = 'root'
    self.ixnet_gen._ixnet.getRoot.return_value = root_obj

    self.ixnet_gen.get_vports()

    self.ixnet.getList.assert_called_once_with('root', 'vport')