1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from yardstick.common import exceptions
20 from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
26 TRAFFIC_PARAMETERS = {
34 'framesize': {'64B': '25', '256B': '75'}
41 'dstip': '152.16.40.20',
42 'srcip': '152.16.100.20',
50 'traffic_type': 'continuous'
59 'framesize': {'128B': '35', '1024B': '65'}
75 'traffic_type': 'continuous'
80 class TestIxNextgen(unittest.TestCase):
83 self.ixnet = mock.Mock()
84 self.ixnet.execute = mock.Mock()
85 self.ixnet.getRoot.return_value = 'my_root'
87 def test_get_config(self):
91 'external-interface': [
92 {'virtual-interface': {'vpci': '0000:07:00.1'}},
93 {'virtual-interface': {'vpci': '0001:08:01.2'}}
100 'dut_result_dir': 'test2',
102 'ixchassis': 'test4',
112 'cards': ['0000', '0001'],
113 'ports': ['07', '08'],
114 'output_dir': 'test2',
119 result = ixnet_api.IxNextgen.get_config(tg_cfg)
120 self.assertEqual(result, expected)
122 def test__get_config_element_by_flow_group_name(self):
123 ixnet_gen = ixnet_api.IxNextgen()
124 ixnet_gen._ixnet = self.ixnet
125 ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
127 ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_01'
128 output = ixnet_gen._get_config_element_by_flow_group_name(
130 self.assertEqual('traffic_item/configElement:flow_group_01', output)
132 def test__get_config_element_by_flow_group_name_no_match(self):
133 ixnet_gen = ixnet_api.IxNextgen()
134 ixnet_gen._ixnet = self.ixnet
135 ixnet_gen._ixnet.getList.side_effect = [['traffic_item'],
137 ixnet_gen._ixnet.getAttribute.return_value = 'flow_group_02'
138 output = ixnet_gen._get_config_element_by_flow_group_name(
140 self.assertIsNone(output)
def test__get_stack_item(self):
    """Check that ``_get_stack_item`` filters the stack by protocol.

    The config element lookup is patched to return 'cfg_element' and the
    mocked ``getList`` yields 'tcp1', 'tcp2' and 'udp'; requesting
    ``PROTO_TCP`` is expected to return only the two 'tcp' entries.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    self.ixnet.getList.return_value = ['tcp1', 'tcp2', 'udp']
    cfg_element_patch = mock.patch.object(
        generator, '_get_config_element_by_flow_group_name',
        return_value='cfg_element')
    with cfg_element_patch:
        stack_items = generator._get_stack_item(mock.ANY,
                                                ixnet_api.PROTO_TCP)
    self.assertEqual(['tcp1', 'tcp2'], stack_items)
153 def test__get_stack_item_no_config_element(self):
154 ixnet_gen = ixnet_api.IxNextgen()
155 ixnet_gen._ixnet = self.ixnet
156 with mock.patch.object(
157 ixnet_gen, '_get_config_element_by_flow_group_name',
159 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
160 ixnet_gen._get_stack_item(mock.ANY, mock.ANY)
def test__get_field_in_stack_item(self):
    """Check that a field listed by ``getList`` is returned when requested.

    The mocked ``getList`` reports 'field1' and 'field2'; looking up
    'field2' must give back 'field2'.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    self.ixnet.getList.return_value = ['field1', 'field2']
    found_field = generator._get_field_in_stack_item(mock.ANY, 'field2')
    self.assertEqual('field2', found_field)
def test__get_field_in_stack_item_no_field_present(self):
    """Check that requesting a field not listed by ``getList`` raises.

    The mocked ``getList`` reports only 'field1' and 'field2', so asking
    for 'field3' must raise ``IxNetworkFieldNotPresentInStackItem``.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    self.ixnet.getList.return_value = ['field1', 'field2']
    self.assertRaises(
        exceptions.IxNetworkFieldNotPresentInStackItem,
        generator._get_field_in_stack_item, mock.ANY, 'field3')
def test__parse_framesize(self):
    """Check the entries ``_parse_framesize`` builds from a size/weight map.

    Both an upper-case ('64B') and a lower-case ('512b') size suffix are
    supplied; each is expected to produce a ``[size, size, weight]`` entry
    made of integers.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    parsed = generator._parse_framesize({'64B': '75', '512b': '25'})
    self.assertEqual(2, len(parsed))
    for expected_entry in ([64, 64, 75], [512, 512, 25]):
        self.assertIn(expected_entry, parsed)
185 @mock.patch.object(IxNetwork, 'IxNet')
186 def test_connect(self, mock_ixnet):
187 mock_ixnet.return_value = self.ixnet
188 ixnet_gen = ixnet_api.IxNextgen()
189 with mock.patch.object(ixnet_gen, 'get_config') as mock_config:
190 mock_config.return_value = {'machine': 'machine_fake',
193 ixnet_gen.connect(mock.ANY)
195 self.ixnet.connect.assert_called_once_with(
196 'machine_fake', '-port', 'port_fake', '-version', '12345')
197 mock_config.assert_called_once()
199 def test_connect_invalid_config_no_machine(self):
200 ixnet_gen = ixnet_api.IxNextgen()
201 ixnet_gen._ixnet = self.ixnet
202 ixnet_gen.get_config = mock.Mock(return_value={
205 self.assertRaises(KeyError, ixnet_gen.connect, mock.ANY)
206 self.ixnet.connect.assert_not_called()
208 def test_connect_invalid_config_no_port(self):
209 ixnet_gen = ixnet_api.IxNextgen()
210 ixnet_gen._ixnet = self.ixnet
211 ixnet_gen.get_config = mock.Mock(return_value={
212 'machine': 'machine_fake',
214 self.assertRaises(KeyError, ixnet_gen.connect, mock.ANY)
215 self.ixnet.connect.assert_not_called()
def test_connect_invalid_config_no_version(self):
    """Check that ``connect`` raises ``KeyError`` without a 'version' key.

    ``get_config`` is replaced by a mock returning only 'machine' and
    'port'; the mocked client must never be asked to connect.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    incomplete_config = {'machine': 'machine_fake',
                         'port': 'port_fake'}
    generator.get_config = mock.Mock(return_value=incomplete_config)
    with self.assertRaises(KeyError):
        generator.connect(mock.ANY)
    self.ixnet.connect.assert_not_called()
def test_connect_no_config(self):
    """Check that ``connect`` raises ``KeyError`` for an empty config.

    ``get_config`` is replaced by a mock returning an empty dict; the
    mocked client must never be asked to connect.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    empty_config = {}
    generator.get_config = mock.Mock(return_value=empty_config)
    with self.assertRaises(KeyError):
        generator.connect(mock.ANY)
    self.ixnet.connect.assert_not_called()
def test_clear_config(self):
    """Check that ``clear_config`` issues exactly one 'newConfig' execute."""
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    generator.clear_config()
    execute_mock = self.ixnet.execute
    execute_mock.assert_called_once_with('newConfig')
239 @mock.patch.object(ixnet_api, 'log')
240 def test_assign_ports_2_ports(self, *args):
241 self.ixnet.getAttribute.side_effect = ['up', 'down']
243 'chassis': '1.1.1.1',
246 ixnet_gen = ixnet_api.IxNextgen()
247 ixnet_gen._ixnet = self.ixnet
248 ixnet_gen._cfg = config
250 self.assertIsNone(ixnet_gen.assign_ports())
251 self.assertEqual(self.ixnet.execute.call_count, 2)
252 self.assertEqual(self.ixnet.commit.call_count, 4)
253 self.assertEqual(self.ixnet.getAttribute.call_count, 2)
255 @mock.patch.object(ixnet_api, 'log')
256 def test_assign_ports_port_down(self, mock_log):
257 self.ixnet.getAttribute.return_value = 'down'
259 'chassis': '1.1.1.1',
262 ixnet_gen = ixnet_api.IxNextgen()
263 ixnet_gen._ixnet = self.ixnet
264 ixnet_gen._cfg = config
265 ixnet_gen.assign_ports()
266 mock_log.warning.assert_called()
268 def test_assign_ports_no_config(self):
269 ixnet_gen = ixnet_api.IxNextgen()
270 ixnet_gen._ixnet = self.ixnet
272 self.assertRaises(KeyError, ixnet_gen.assign_ports)
def test__create_traffic_item(self):
    """Check the client calls made by ``_create_traffic_item``.

    The mocked client returns 'my_new_traffic_item' from ``add`` and
    ['my_traffic_item_id'] from ``remapIds``. The test verifies the traffic
    item is added under 'my_root/traffic', named 'RFC2544' with raw traffic
    type, remapped, and that tracking is configured on the remapped id,
    with two commits overall.
    """
    ixnet_gen = ixnet_api.IxNextgen()
    ixnet_gen._ixnet = self.ixnet
    self.ixnet.add.return_value = 'my_new_traffic_item'
    self.ixnet.remapIds.return_value = ['my_traffic_item_id']

    ixnet_gen._create_traffic_item()
    self.ixnet.add.assert_called_once_with(
        'my_root/traffic', 'trafficItem')
    self.ixnet.setMultiAttribute.assert_called_once_with(
        'my_new_traffic_item', '-name', 'RFC2544', '-trafficType', 'raw')
    self.assertEqual(2, self.ixnet.commit.call_count)
    self.ixnet.remapIds.assert_called_once_with('my_new_traffic_item')
    # Assert on the mock instead of calling it: invoking
    # ``self.ixnet.setAttribute(...)`` directly only records another call
    # and can never fail, so the tracking setup was not being verified.
    self.ixnet.setAttribute.assert_called_once_with(
        'my_traffic_item_id/tracking', '-trackBy', 'trafficGroupId0')
def test__create_flow_groups(self):
    """Check the endpoint sets built by ``_create_flow_groups``.

    The mocked ``getList`` returns one traffic item and then the entries
    '1' and '2'; ``add`` hands back 'endp1' and 'endp2'. Each endpoint set
    is expected to be named after its source entry, with the other entry
    as destination.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    client = generator.ixnet
    client.getList.side_effect = [['traffic_item'], ['1', '2']]
    client.add.side_effect = ['endp1', 'endp2']

    generator._create_flow_groups()

    add_endpoint_set = mock.call('traffic_item', 'endpointSet')
    client.add.assert_has_calls([add_endpoint_set, add_endpoint_set])
    expected_attribute_calls = [
        mock.call('endp1', '-name', '1', '-sources', ['1/protocols'],
                  '-destinations', ['2/protocols']),
        mock.call('endp2', '-name', '2', '-sources', ['2/protocols'],
                  '-destinations', ['1/protocols']),
    ]
    client.setMultiAttribute.assert_has_calls(expected_attribute_calls)
def test__append_protocol_to_stack(self):
    """Check the 'append' execute issued when adding a protocol.

    The protocol template path is expected to be built from the mocked
    root ('my_root') and the protocol name.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet

    # NOTE: the method name is spelled "procotol" in the API under test.
    generator._append_procotol_to_stack('my_protocol', 'prev_element')

    expected_template = 'my_root/traffic/protocolTemplate:"my_protocol"'
    self.ixnet.execute.assert_called_with(
        'append', 'prev_element', expected_template)
314 def test__setup_config_elements(self):
315 ixnet_gen = ixnet_api.IxNextgen()
316 ixnet_gen._ixnet = self.ixnet
317 ixnet_gen.ixnet.getList.side_effect = [['traffic_item'],
319 with mock.patch.object(ixnet_gen, '_append_procotol_to_stack') as \
321 ixnet_gen._setup_config_elements()
322 mock_append_proto.assert_has_calls([
323 mock.call(ixnet_api.PROTO_UDP, 'cfg_element/stack:"ethernet-1"'),
324 mock.call(ixnet_api.PROTO_IPV4, 'cfg_element/stack:"ethernet-1"')])
325 ixnet_gen.ixnet.setAttribute.assert_has_calls([
326 mock.call('cfg_element/frameRateDistribution', '-portDistribution',
328 mock.call('cfg_element/frameRateDistribution',
329 '-streamDistribution', 'splitRateEvenly')])
@mock.patch.object(ixnet_api.IxNextgen, '_create_traffic_item')
@mock.patch.object(ixnet_api.IxNextgen, '_create_flow_groups')
@mock.patch.object(ixnet_api.IxNextgen, '_setup_config_elements')
def test_create_traffic_model(self, mock__setup_config_elements,
                              mock__create_flow_groups,
                              mock__create_traffic_item):
    """Check that ``create_traffic_model`` calls each patched step once."""
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet

    generator.create_traffic_model()

    patched_steps = (mock__create_traffic_item,
                     mock__create_flow_groups,
                     mock__setup_config_elements)
    for patched_step in patched_steps:
        patched_step.assert_called_once()
345 def test__update_frame_mac(self):
346 ixnet_gen = ixnet_api.IxNextgen()
347 ixnet_gen._ixnet = self.ixnet
348 with mock.patch.object(ixnet_gen, '_get_field_in_stack_item') as \
350 mock_get_field.return_value = 'field_descriptor'
351 ixnet_gen._update_frame_mac('ethernet_descriptor', 'field', 'mac')
352 mock_get_field.assert_called_once_with('ethernet_descriptor', 'field')
353 ixnet_gen.ixnet.setMultiAttribute(
354 'field_descriptor', '-singleValue', 'mac', '-fieldValue', 'mac',
355 '-valueType', 'singleValue')
356 ixnet_gen.ixnet.commit.assert_called_once()
358 def test_update_frame(self):
359 ixnet_gen = ixnet_api.IxNextgen()
360 ixnet_gen._ixnet = self.ixnet
361 with mock.patch.object(
362 ixnet_gen, '_get_config_element_by_flow_group_name',
363 return_value='cfg_element'), \
364 mock.patch.object(ixnet_gen, '_update_frame_mac') as \
366 mock.patch.object(ixnet_gen, '_get_stack_item') as \
368 mock_get_stack_item.side_effect = [['item1'], ['item2'],
369 ['item3'], ['item4']]
370 ixnet_gen.update_frame(TRAFFIC_PARAMETERS)
372 self.assertEqual(6, len(ixnet_gen.ixnet.setMultiAttribute.mock_calls))
373 self.assertEqual(4, len(mock_update_frame.mock_calls))
374 ixnet_gen.ixnet.setMultiAttribute.assert_has_calls(
375 [mock.call('cfg_element/frameRate', '-rate', 10000.5,
376 '-type', 'framesPerSecond'),
377 mock.call('cfg_element/frameRate', '-rate', 75.2, '-type',
381 def test_update_frame_flow_not_present(self):
382 ixnet_gen = ixnet_api.IxNextgen()
383 ixnet_gen._ixnet = self.ixnet
384 with mock.patch.object(
385 ixnet_gen, '_get_config_element_by_flow_group_name',
387 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
388 ixnet_gen.update_frame(TRAFFIC_PARAMETERS)
390 def test_get_statistics(self):
391 ixnet_gen = ixnet_api.IxNextgen()
392 port_statistics = '::ixNet::OBJ-/statistics/view:"Port Statistics"'
393 flow_statistics = '::ixNet::OBJ-/statistics/view:"Flow Statistics"'
394 with mock.patch.object(ixnet_gen, '_build_stats_map') as \
396 ixnet_gen.get_statistics()
398 mock_build_stats.assert_has_calls([
399 mock.call(port_statistics, ixnet_gen.PORT_STATS_NAME_MAP),
400 mock.call(flow_statistics, ixnet_gen.LATENCY_NAME_MAP)])
402 def test__update_ipv4_address(self):
403 ixnet_gen = ixnet_api.IxNextgen()
404 ixnet_gen._ixnet = self.ixnet
405 with mock.patch.object(ixnet_gen, '_get_field_in_stack_item',
406 return_value='field_desc'):
407 ixnet_gen._update_ipv4_address(mock.ANY, mock.ANY, '192.168.1.1',
409 ixnet_gen.ixnet.setMultiAttribute.assert_called_once_with(
410 'field_desc', '-seed', 100, '-fixedBits', '192.168.1.1',
411 '-randomMask', '0.0.0.63', '-valueType', 'random',
414 def test_update_ip_packet(self):
415 ixnet_gen = ixnet_api.IxNextgen()
416 ixnet_gen._ixnet = self.ixnet
417 with mock.patch.object(ixnet_gen, '_update_ipv4_address') as \
419 mock.patch.object(ixnet_gen, '_get_stack_item'), \
420 mock.patch.object(ixnet_gen,
421 '_get_config_element_by_flow_group_name', return_value='celm'):
422 ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
424 self.assertEqual(4, len(mock_update_add.mock_calls))
426 def test_update_ip_packet_exception_no_config_element(self):
427 ixnet_gen = ixnet_api.IxNextgen()
428 ixnet_gen._ixnet = self.ixnet
429 with mock.patch.object(ixnet_gen,
430 '_get_config_element_by_flow_group_name',
432 with self.assertRaises(exceptions.IxNetworkFlowNotPresent):
433 ixnet_gen.update_ip_packet(TRAFFIC_PARAMETERS)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic(self, mock_ixnextgen_get_traffic_state):
    """Check ``start_traffic`` when the first reported state is 'stopped'.

    With the patched state sequence 'stopped', 'started', 'started',
    'started', the call must return None, query ``getList`` once and
    issue three ``execute`` calls.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    self.ixnet.getList.return_value = [0]
    reported_states = ['stopped', 'started', 'started', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states

    self.assertIsNone(generator.start_traffic())
    self.ixnet.getList.assert_called_once()
    self.assertEqual(3, self.ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_traffic_running(
        self, mock_ixnextgen_get_traffic_state):
    """Check ``start_traffic`` when the first reported state is 'started'.

    With the patched state sequence 'started', 'stopped', 'started', the
    call must return None, query ``getList`` once and issue four
    ``execute`` calls.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    self.ixnet.getList.return_value = [0]
    reported_states = ['started', 'stopped', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states

    self.assertIsNone(generator.start_traffic())
    self.ixnet.getList.assert_called_once()
    self.assertEqual(4, self.ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_wait_for_traffic_to_stop(
        self, mock_ixnextgen_get_traffic_state):
    """Check ``start_traffic`` when 'started' is reported several times first.

    With the patched state sequence 'started' (three times), 'stopped',
    'started', the call must return None, query ``getList`` once and
    issue four ``execute`` calls.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    self.ixnet.getList.return_value = [0]
    reported_states = ['started', 'started', 'started', 'stopped',
                       'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states

    self.assertIsNone(generator.start_traffic())
    self.ixnet.getList.assert_called_once()
    self.assertEqual(4, self.ixnet.execute.call_count)
@mock.patch.object(ixnet_api.IxNextgen, '_get_traffic_state')
def test_start_traffic_wait_for_traffic_start(
        self, mock_ixnextgen_get_traffic_state):
    """Check ``start_traffic`` when 'stopped' is reported several times first.

    With the patched state sequence 'stopped' (three times), 'started',
    the call must return None, query ``getList`` once and issue three
    ``execute`` calls.
    """
    generator = ixnet_api.IxNextgen()
    generator._ixnet = self.ixnet
    self.ixnet.getList.return_value = [0]
    reported_states = ['stopped', 'stopped', 'stopped', 'started']
    mock_ixnextgen_get_traffic_state.side_effect = reported_states

    self.assertIsNone(generator.start_traffic())
    self.ixnet.getList.assert_called_once()
    self.assertEqual(3, self.ixnet.execute.call_count)