1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from __future__ import absolute_import
21 from oslo_serialization import jsonutils
23 from yardstick.network_services.traffic_profile import http_ixload
26 class TestIxLoadTrafficGen(unittest.TestCase):
28 def test_parse_run_test(self):
31 "remote_server": "REMOTE_SERVER",
32 "ixload_cfg": "IXLOAD_CFG",
33 "result_dir": "RESULT_DIR",
34 "ixia_chassis": "IXIA_CHASSIS",
40 j = jsonutils.dump_as_bytes(test_input)
41 ixload = http_ixload.IXLOADHttpTest(j)
42 self.assertDictEqual(ixload.test_input, test_input)
43 self.assertIsNone(ixload.parse_run_test())
44 self.assertEqual(ixload.ports_to_reassign, [
45 ["IXIA_CHASSIS", "CARD", 1],
46 ["IXIA_CHASSIS", "CARD", 2],
47 ["IXIA_CHASSIS", "CARD", 3],
50 def test_format_ports_for_reassignment(self):
52 ["IXIA_CHASSIS", "CARD", 1],
53 ["IXIA_CHASSIS", "CARD", 2],
54 ["IXIA_CHASSIS", "CARD", 3],
56 formatted = http_ixload.IXLOADHttpTest.format_ports_for_reassignment(ports)
57 self.assertEqual(formatted, [
58 "IXIA_CHASSIS;CARD;1",
59 "IXIA_CHASSIS;CARD;2",
60 "IXIA_CHASSIS;CARD;3",
63 def test_reassign_ports(self):
66 "remote_server": "REMOTE_SERVER",
67 "ixload_cfg": "IXLOAD_CFG",
68 "result_dir": "RESULT_DIR",
69 "ixia_chassis": "IXIA_CHASSIS",
75 j = jsonutils.dump_as_bytes(test_input)
76 ixload = http_ixload.IXLOADHttpTest(j)
77 repository = mock.Mock()
78 test = mock.MagicMock()
79 test.setPorts = mock.Mock()
80 ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
81 ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
82 self.assertIsNone(ixload.reassign_ports(test, repository, ports_to_reassign))
84 def test_reassign_ports_error(self):
87 "remote_server": "REMOTE_SERVER",
88 "ixload_cfg": "IXLOAD_CFG",
89 "result_dir": "RESULT_DIR",
90 "ixia_chassis": "IXIA_CHASSIS",
96 j = jsonutils.dump_as_bytes(test_input)
97 ixload = http_ixload.IXLOADHttpTest(j)
98 repository = mock.Mock()
100 ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
101 ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
102 ixload.ix_load = mock.MagicMock()
103 ixload.ix_load.delete = mock.Mock()
104 ixload.ix_load.disconnect = mock.Mock()
105 with self.assertRaises(Exception):
106 ixload.reassign_ports(test, repository, ports_to_reassign)
108 def test_stat_collector(self):
111 http_ixload.IXLOADHttpTest.stat_collector(*args))
113 def test_IxL_StatCollectorCommand(self):
114 args = [[0, 1, 2, 3], [0, 1, 2, 3]]
116 http_ixload.IXLOADHttpTest.IxL_StatCollectorCommand(*args))
118 def test_set_results_dir(self):
119 test_stat_collector = mock.MagicMock()
120 test_stat_collector.setResultDir = mock.Mock()
121 results_on_windows = "c:/Results"
123 http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector,
126 def test_set_results_dir_error(self):
127 test_stat_collector = ""
128 results_on_windows = "c:/Results"
129 with self.assertRaises(Exception):
130 http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector, results_on_windows)
132 def test_load_config_file(self):
135 "remote_server": "REMOTE_SERVER",
136 "ixload_cfg": "IXLOAD_CFG",
137 "result_dir": "RESULT_DIR",
138 "ixia_chassis": "IXIA_CHASSIS",
144 j = jsonutils.dump_as_bytes(test_input)
145 ixload = http_ixload.IXLOADHttpTest(j)
146 ixload.ix_load = mock.MagicMock()
147 ixload.ix_load.new = mock.Mock(return_value="")
148 self.assertIsNotNone(ixload.load_config_file("ixload.cfg"))
150 def test_load_config_file_error(self):
153 "remote_server": "REMOTE_SERVER",
154 "ixload_cfg": "IXLOAD_CFG",
155 "result_dir": "RESULT_DIR",
156 "ixia_chassis": "IXIA_CHASSIS",
162 j = jsonutils.dump_as_bytes(test_input)
163 ixload = http_ixload.IXLOADHttpTest(j)
164 ixload.ix_load = "test"
165 with self.assertRaises(Exception):
166 ixload.load_config_file("ixload.cfg")
168 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
169 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
170 def test_start_http_test_connect_error(self, mock_collector_type, mock_ixload_type):
173 "remote_server": "REMOTE_SERVER",
174 "ixload_cfg": "IXLOAD_CFG",
175 "result_dir": "RESULT_DIR",
176 "ixia_chassis": "IXIA_CHASSIS",
183 j = jsonutils.dump_as_bytes(test_input)
185 mock_ixload = mock_ixload_type()
186 mock_ixload.connect.side_effect = RuntimeError
188 ixload = http_ixload.IXLOADHttpTest(j)
189 ixload.results_on_windows = 'windows_result_dir'
190 ixload.result_dir = 'my_result_dir'
192 with self.assertRaises(RuntimeError):
193 ixload.start_http_test()
195 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
196 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
197 def test_start_http_test(self, mock_collector_type, mock_ixload_type):
200 "remote_server": "REMOTE_SERVER",
201 "ixload_cfg": "IXLOAD_CFG",
202 "result_dir": "RESULT_DIR",
203 "ixia_chassis": "IXIA_CHASSIS",
210 j = jsonutils.dump_as_bytes(test_input)
212 ixload = http_ixload.IXLOADHttpTest(j)
213 ixload.results_on_windows = 'windows_result_dir'
214 ixload.result_dir = 'my_result_dir'
215 ixload.load_config_file = mock.MagicMock()
217 self.assertIsNone(ixload.start_http_test())
219 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
220 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
221 def test_start_http_test_reassign_error(self, mock_collector_type, mock_ixload_type):
224 "remote_server": "REMOTE_SERVER",
225 "ixload_cfg": "IXLOAD_CFG",
226 "result_dir": "RESULT_DIR",
227 "ixia_chassis": "IXIA_CHASSIS",
234 j = jsonutils.dump_as_bytes(test_input)
236 ixload = http_ixload.IXLOADHttpTest(j)
237 ixload.load_config_file = mock.MagicMock()
239 reassign_ports = mock.Mock(side_effect=RuntimeError)
240 ixload.reassign_ports = reassign_ports
241 ixload.results_on_windows = 'windows_result_dir'
242 ixload.result_dir = 'my_result_dir'
244 ixload.start_http_test()
245 self.assertEqual(reassign_ports.call_count, 1)
247 @mock.patch("yardstick.network_services.traffic_profile.http_ixload.IXLOADHttpTest")
248 def test_main(self, IXLOADHttpTest):
249 args = ["1", "2", "3"]
250 http_ixload.main(args)