3 # Copyright (c) 2016-2017 Intel Corporation
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 # Unittest for yardstick.network_services.libs.ixia_libs.IxNet
20 from __future__ import absolute_import
24 from yardstick.network_services.libs.ixia_libs.IxNet.IxNet import IxNextgen
25 from yardstick.network_services.libs.ixia_libs.IxNet.IxNet import IP_VERSION_4
26 from yardstick.network_services.libs.ixia_libs.IxNet.IxNet import IP_VERSION_6
29 class TestIxNextgen(unittest.TestCase):
31 def test___init__(self):
32 ixnet_gen = IxNextgen()
33 self.assertIsNone(ixnet_gen._bidir)
35 @mock.patch("yardstick.network_services.libs.ixia_libs.IxNet.IxNet.IxNetwork")
36 @mock.patch("yardstick.network_services.libs.ixia_libs.IxNet.IxNet.sys")
37 def test_connect(self, mock_sys, mock_ix_network):
38 mock_ix_network.IxNet.return_value = mock_ixnet = mock.Mock()
40 ixnet_gen = IxNextgen()
41 ixnet_gen.get_config = mock.MagicMock()
42 ixnet_gen.get_ixnet = mock.MagicMock()
44 result = ixnet_gen._connect({"py_lib_path": "/tmp"})
45 self.assertIsNotNone(result)
46 self.assertEqual(mock_ix_network.IxNet.call_count, 1)
47 self.assertEqual(mock_ixnet.connect.call_count, 1)
49 def test_clear_ixia_config(self):
50 ixnet = mock.MagicMock()
51 ixnet.execute = mock.Mock()
53 ixnet_gen = IxNextgen(ixnet)
55 result = ixnet_gen.clear_ixia_config()
56 self.assertIsNone(result)
57 self.assertEqual(ixnet.execute.call_count, 1)
59 def test_load_ixia_profile(self):
60 ixnet = mock.MagicMock()
61 ixnet.execute = mock.Mock()
63 ixnet_gen = IxNextgen(ixnet)
65 result = ixnet_gen.load_ixia_profile({})
66 self.assertIsNone(result)
67 self.assertEqual(ixnet.execute.call_count, 1)
69 def test_load_ixia_config(self):
70 ixnet = mock.MagicMock()
71 ixnet.execute = mock.Mock()
73 ixnet_gen = IxNextgen(ixnet)
75 result = ixnet_gen.ix_load_config({})
76 self.assertIsNone(result)
77 self.assertEqual(ixnet.execute.call_count, 2)
79 @mock.patch('yardstick.network_services.libs.ixia_libs.IxNet.IxNet.log')
80 def test_ix_assign_ports(self, mock_logger):
81 ixnet = mock.MagicMock()
82 ixnet.getList.return_value = [0, 1]
83 ixnet.getAttribute.side_effect = ['up', 'down']
93 ixnet_gen = IxNextgen(ixnet)
94 ixnet_gen._cfg = config
96 result = ixnet_gen.ix_assign_ports()
97 self.assertIsNone(result)
98 self.assertEqual(ixnet.execute.call_count, 1)
99 self.assertEqual(ixnet.commit.call_count, 1)
100 self.assertEqual(ixnet.getAttribute.call_count, 2)
101 self.assertEqual(mock_logger.error.call_count, 1)
103 def test_ix_update_frame(self):
104 static_traffic_params = {
111 "dstmac": "00:00:00:00:00:03",
112 "framesPerSecond": True,
117 "srcmac": "00:00:00:00:00:01"
121 "dstip4": "152.16.40.20",
123 "srcip4": "152.16.100.20",
128 "dstip4": "152.16.40.20",
130 "srcip4": "152.16.100.20",
136 "dstip4": "152.16.100.20",
138 "srcip4": "152.16.40.20",
145 "traffic_type": "continuous"
153 "dstmac": "00:00:00:00:00:04",
154 "framesPerSecond": False,
155 "framesize": {"64B": "100"},
156 "srcmac": "00:00:00:00:00:01"
161 "dstip4": "152.16.100.20",
163 "srcip4": "152.16.40.20",
169 "dstip4": "152.16.100.20",
171 "srcip4": "152.16.40.20",
177 "dstip4": "152.16.100.20",
179 "srcip4": "152.16.40.20",
186 "traffic_type": "continuous"
190 ixnet = mock.MagicMock()
191 ixnet.remapIds.return_value = ["0"]
192 ixnet.setMultiAttribute.return_value = [1]
193 ixnet.commit.return_value = [1]
194 ixnet.getList.side_effect = [
199 "ethernet.header.destinationAddress",
200 "ethernet.header.sourceAddress",
204 ixnet_gen = IxNextgen(ixnet)
206 result = ixnet_gen.ix_update_frame(static_traffic_params)
207 self.assertIsNone(result)
208 self.assertEqual(ixnet.setMultiAttribute.call_count, 7)
209 self.assertEqual(ixnet.commit.call_count, 2)
211 def test_ix_update_udp(self):
212 ixnet = mock.MagicMock()
214 ixnet_gen = IxNextgen(ixnet)
216 result = ixnet_gen.ix_update_udp({})
217 self.assertIsNone(result)
219 def test_ix_update_tcp(self):
220 ixnet = mock.MagicMock()
221 ixnet_gen = IxNextgen(ixnet)
223 result = ixnet_gen.ix_update_tcp({})
224 self.assertIsNone(result)
226 def test_ix_start_traffic(self):
227 ixnet = mock.MagicMock()
228 ixnet.getList.return_value = [0]
229 ixnet.getAttribute.return_value = 'down'
231 ixnet_gen = IxNextgen(ixnet)
233 result = ixnet_gen.ix_start_traffic()
234 self.assertIsNone(result)
235 self.assertEqual(ixnet.getList.call_count, 1)
236 self.assertEqual(ixnet.execute.call_count, 3)
238 def test_ix_stop_traffic(self):
239 ixnet = mock.MagicMock()
240 ixnet.getList.return_value = [0]
242 ixnet_gen = IxNextgen(ixnet)
244 result = ixnet_gen.ix_stop_traffic()
245 self.assertIsNone(result)
246 self.assertEqual(ixnet.getList.call_count, 1)
247 self.assertEqual(ixnet.execute.call_count, 1)
249 def test_ix_get_statistics(self):
250 ixnet = mock.MagicMock()
251 ixnet.execute.return_value = ""
252 ixnet.getList.side_effect = [
254 '::ixNet::OBJ-/statistics/view:"Traffic Item Statistics"',
255 '::ixNet::OBJ-/statistics/view:"Port Statistics"',
258 '::ixNet::OBJ-/statistics/view:"Flow Statistics"',
262 ixnet_gen = IxNextgen(ixnet)
264 result = ixnet_gen.ix_get_statistics()
265 self.assertIsNotNone(result)
266 self.assertEqual(ixnet.getList.call_count, 1)
267 self.assertEqual(ixnet.execute.call_count, 20)
269 def test_find_view_obj_no_where(self):
270 views = ['here', 'there', 'everywhere']
271 result = IxNextgen.find_view_obj('no_where', views)
272 self.assertEqual(result, '')
274 def test_add_ip_header_v4(self):
275 static_traffic_params = {
282 "dstmac": "00:00:00:00:00:03",
283 "framesPerSecond": True,
284 "framesize": {"64B": "100"},
285 "srcmac": "00:00:00:00:00:01"
289 "dstip4": "152.16.40.20",
291 "srcip4": "152.16.100.20",
297 "dstip4": "152.16.40.20",
299 "srcip4": "152.16.100.20",
305 "dstip4": "152.16.100.20",
307 "srcip4": "152.16.40.20",
314 "traffic_type": "continuous"
322 "dstmac": "00:00:00:00:00:04",
323 "framesPerSecond": True,
324 "framesize": {"64B": "100"},
325 "srcmac": "00:00:00:00:00:01"
330 "dstip4": "152.16.100.20",
332 "srcip4": "152.16.40.20",
338 "dstip4": "152.16.100.20",
340 "srcip4": "152.16.40.20",
346 "dstip4": "152.16.100.20",
348 "srcip4": "152.16.40.20",
355 "traffic_type": "continuous"
359 ixnet = mock.MagicMock()
360 ixnet.remapIds.return_value = ["0"]
361 ixnet.setMultiAttribute.return_value = [1]
362 ixnet.commit.return_value = [1]
363 ixnet.getList.side_effect = [[1], [0], [0], ["srcIp", "dstIp"]]
365 ixnet_gen = IxNextgen(ixnet)
367 result = ixnet_gen.add_ip_header(static_traffic_params, IP_VERSION_4)
368 self.assertIsNone(result)
369 self.assertGreater(ixnet.setMultiAttribute.call_count, 0)
370 self.assertEqual(ixnet.commit.call_count, 1)
372 def test_add_ip_header_v4_nothing_to_do(self):
373 static_traffic_params = {
380 "dstmac": "00:00:00:00:00:03",
381 "framesPerSecond": True,
382 "framesize": {"64B": "100"},
383 "srcmac": "00:00:00:00:00:01"
387 "dstip4": "152.16.40.20",
389 "srcip4": "152.16.100.20",
395 "dstip4": "152.16.40.20",
397 "srcip4": "152.16.100.20",
403 "dstip4": "152.16.100.20",
405 "srcip4": "152.16.40.20",
412 "traffic_type": "continuous"
420 "dstmac": "00:00:00:00:00:04",
421 "framesPerSecond": True,
422 "framesize": {"64B": "100"},
423 "srcmac": "00:00:00:00:00:01"
428 "dstip4": "152.16.100.20",
430 "srcip4": "152.16.40.20",
436 "dstip4": "152.16.100.20",
438 "srcip4": "152.16.40.20",
444 "dstip4": "152.16.100.20",
446 "srcip4": "152.16.40.20",
453 "traffic_type": "continuous"
457 ixnet = mock.MagicMock()
458 ixnet.remapIds.return_value=["0"]
459 ixnet.setMultiAttribute.return_value = [1]
460 ixnet.commit.return_value = [1]
461 ixnet.getList.side_effect = [[1], [0, 1], [0], ["srcIp", "dstIp"]]
463 ixnet_gen = IxNextgen(ixnet)
465 result = ixnet_gen.add_ip_header(static_traffic_params, IP_VERSION_4)
466 self.assertIsNone(result)
467 self.assertGreater(ixnet.setMultiAttribute.call_count, 0)
468 self.assertEqual(ixnet.commit.call_count, 1)
470 def test_add_ip_header_v6(self):
471 static_traffic_profile = {
478 "dstmac": "00:00:00:00:00:03",
479 "framesPerSecond": True,
480 "framesize": {"64B": "100"},
481 "srcmac": "00:00:00:00:00:01"
486 "dstip4": "152.16.40.20",
488 "srcip4": "152.16.100.20",
494 "dstip4": "152.16.100.20",
496 "srcip4": "152.16.40.20",
503 "traffic_type": "continuous"
511 "dstmac": "00:00:00:00:00:04",
512 "framesPerSecond": True,
513 "framesize": {"64B": "100"},
514 "srcmac": "00:00:00:00:00:01"
519 "dstip4": "152.16.100.20",
521 "srcip4": "152.16.40.20",
527 "dstip4": "152.16.100.20",
529 "srcip4": "152.16.40.20",
536 "traffic_type": "continuous"
540 ixnet = mock.MagicMock()
541 ixnet.getList.side_effect = [[1], [1], [1], ["srcIp", "dstIp"]]
542 ixnet.remapIds.return_value = ["0"]
543 ixnet.setMultiAttribute.return_value = [1]
544 ixnet.commit.return_value = [1]
546 ixnet_gen = IxNextgen(ixnet)
548 result = ixnet_gen.add_ip_header(static_traffic_profile, IP_VERSION_6)
549 self.assertIsNone(result)
550 self.assertGreater(ixnet.setMultiAttribute.call_count, 0)
551 self.assertEqual(ixnet.commit.call_count, 1)
553 def test_add_ip_header_v6_nothing_to_do(self):
554 static_traffic_params = {
561 "dstmac": "00:00:00:00:00:03",
562 "framesPerSecond": True,
563 "framesize": {"64B": "100"},
564 "srcmac": "00:00:00:00:00:01"
568 "dstip4": "152.16.40.20",
570 "srcip4": "152.16.100.20",
576 "dstip4": "152.16.40.20",
578 "srcip4": "152.16.100.20",
585 "traffic_type": "continuous"
593 "dstmac": "00:00:00:00:00:04",
594 "framesPerSecond": True,
595 "framesize": {"64B": "100"},
596 "srcmac": "00:00:00:00:00:01"
601 "dstip4": "152.16.100.20",
603 "srcip4": "152.16.40.20",
609 "dstip4": "152.16.100.20",
611 "srcip4": "152.16.40.20",
618 "traffic_type": "continuous"
622 ixnet = mock.MagicMock()
623 ixnet.getList.side_effect = [[1], [0, 1], [1], ["srcIP", "dstIP"]]
624 ixnet.remapIds.return_value = ["0"]
625 ixnet.setMultiAttribute.return_value = [1]
626 ixnet.commit.return_value = [1]
628 ixnet_gen = IxNextgen(ixnet)
630 result = ixnet_gen.add_ip_header(static_traffic_params, IP_VERSION_6)
631 self.assertIsNone(result)
632 self.assertEqual(ixnet.setMultiAttribute.call_count, 0)
634 def test_set_random_ip_multi_attributes_bad_ip_version(self):
635 bad_ip_version = object()
636 ixnet_gen = IxNextgen(mock.Mock())
640 with self.assertRaises(ValueError):
641 ixnet_gen.set_random_ip_multi_attributes(mock1, bad_ip_version, mock2, mock3)
643 def test_get_config(self):
647 "external-interface": [
649 "virtual-interface": {
650 "vpci": "0000:07:00.1",
654 "virtual-interface": {
655 "vpci": "0001:08:01.2",
664 "dut_result_dir": "test2",
666 "ixchassis": "test4",
668 "py_lib_path": "test6",
674 'py_lib_path': 'test6',
682 'output_dir': 'test2',
687 result = IxNextgen.get_config(tg_cfg)
688 self.assertDictEqual(result, expected)
690 def test_ix_update_ether(self):
691 static_traffic_params = {
698 "dstmac": "00:00:00:00:00:03",
699 "framesPerSecond": True,
701 "srcmac": "00:00:00:00:00:01"
705 "dstip4": "152.16.40.20",
707 "srcip4": "152.16.100.20",
712 "dstip4": "152.16.40.20",
714 "srcip4": "152.16.100.20",
720 "dstip4": "152.16.100.20",
722 "srcip4": "152.16.40.20",
729 "traffic_type": "continuous"
737 "dstmac": "00:00:00:00:00:04",
738 "framesPerSecond": True,
740 "srcmac": "00:00:00:00:00:01"
745 "dstip4": "152.16.100.20",
747 "srcip4": "152.16.40.20",
753 "dstip4": "152.16.100.20",
755 "srcip4": "152.16.40.20",
761 "dstip4": "152.16.100.20",
763 "srcip4": "152.16.40.20",
770 "traffic_type": "continuous"
774 ixnet = mock.MagicMock()
775 ixnet.setMultiAttribute.return_value = [1]
776 ixnet.commit.return_value = [1]
777 ixnet.getList.side_effect = [
782 "ethernet.header.destinationAddress",
783 "ethernet.header.sourceAddress",
787 ixnet_gen = IxNextgen(ixnet)
789 result = ixnet_gen.ix_update_ether(static_traffic_params)
790 self.assertIsNone(result)
791 self.assertGreater(ixnet.setMultiAttribute.call_count, 0)
793 def test_ix_update_ether_nothing_to_do(self):
794 static_traffic_params = {
802 "dstip4": "152.16.40.20",
804 "srcip4": "152.16.100.20",
809 "dstip4": "152.16.40.20",
811 "srcip4": "152.16.100.20",
817 "dstip4": "152.16.100.20",
819 "srcip4": "152.16.40.20",
826 "traffic_type": "continuous"
836 "dstip4": "152.16.100.20",
838 "srcip4": "152.16.40.20",
844 "dstip4": "152.16.100.20",
846 "srcip4": "152.16.40.20",
852 "dstip4": "152.16.100.20",
854 "srcip4": "152.16.40.20",
861 "traffic_type": "continuous"
865 ixnet = mock.MagicMock()
866 ixnet.setMultiAttribute.return_value = [1]
867 ixnet.commit.return_value = [1]
868 ixnet.getList.side_effect=[
873 "ethernet.header.destinationAddress",
874 "ethernet.header.sourceAddress",
878 ixnet_gen = IxNextgen(ixnet)
880 result = ixnet_gen.ix_update_ether(static_traffic_params)
881 self.assertIsNone(result)
882 self.assertEqual(ixnet.setMultiAttribute.call_count, 0)