1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from oslo_serialization import jsonutils
20 from yardstick.network_services.traffic_profile import http_ixload
21 from yardstick.network_services.traffic_profile.http_ixload import \
22 join_non_strings, validate_non_string_sequence
25 class TestJoinNonStrings(unittest.TestCase):
27 def test_validate_non_string_sequence(self):
28 self.assertEqual(validate_non_string_sequence([1, 2, 3]), [1, 2, 3])
29 self.assertIsNone(validate_non_string_sequence('123'))
30 self.assertIsNone(validate_non_string_sequence(1))
32 self.assertEqual(validate_non_string_sequence(1, 2), 2)
33 self.assertEqual(validate_non_string_sequence(1, default=2), 2)
35 with self.assertRaises(RuntimeError):
36 validate_non_string_sequence(1, raise_exc=RuntimeError)
38 def test_join_non_strings(self):
39 self.assertEqual(join_non_strings(':'), '')
40 self.assertEqual(join_non_strings(':', 'a'), 'a')
41 self.assertEqual(join_non_strings(':', 'a', 2, 'c'), 'a:2:c')
42 self.assertEqual(join_non_strings(':', ['a', 2, 'c']), 'a:2:c')
43 self.assertEqual(join_non_strings(':', 'abc'), 'abc')
46 class TestIxLoadTrafficGen(unittest.TestCase):
48 def test_parse_run_test(self):
51 "remote_server": "REMOTE_SERVER",
52 "ixload_cfg": "IXLOAD_CFG",
53 "result_dir": "RESULT_DIR",
54 "ixia_chassis": "IXIA_CHASSIS",
60 j = jsonutils.dump_as_bytes(test_input)
61 ixload = http_ixload.IXLOADHttpTest(j)
62 self.assertDictEqual(ixload.test_input, test_input)
63 self.assertIsNone(ixload.parse_run_test())
64 self.assertEqual(ixload.ports_to_reassign, [
65 ["IXIA_CHASSIS", "CARD", 1],
66 ["IXIA_CHASSIS", "CARD", 2],
67 ["IXIA_CHASSIS", "CARD", 3],
70 def test_format_ports_for_reassignment(self):
72 ["IXIA_CHASSIS", "CARD", 1],
73 ["IXIA_CHASSIS", "CARD", 2],
74 ["IXIA_CHASSIS", "CARD", 3],
76 formatted = http_ixload.IXLOADHttpTest.format_ports_for_reassignment(ports)
77 self.assertEqual(formatted, [
78 "IXIA_CHASSIS;CARD;1",
79 "IXIA_CHASSIS;CARD;2",
80 "IXIA_CHASSIS;CARD;3",
83 def test_reassign_ports(self):
86 "remote_server": "REMOTE_SERVER",
87 "ixload_cfg": "IXLOAD_CFG",
88 "result_dir": "RESULT_DIR",
89 "ixia_chassis": "IXIA_CHASSIS",
95 j = jsonutils.dump_as_bytes(test_input)
96 ixload = http_ixload.IXLOADHttpTest(j)
97 repository = mock.Mock()
98 test = mock.MagicMock()
99 test.setPorts = mock.Mock()
100 ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
101 ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
102 self.assertIsNone(ixload.reassign_ports(test, repository, ports_to_reassign))
104 def test_reassign_ports_error(self):
107 "remote_server": "REMOTE_SERVER",
108 "ixload_cfg": "IXLOAD_CFG",
109 "result_dir": "RESULT_DIR",
110 "ixia_chassis": "IXIA_CHASSIS",
116 j = jsonutils.dump_as_bytes(test_input)
117 ixload = http_ixload.IXLOADHttpTest(j)
118 repository = mock.Mock()
120 ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
121 ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
122 ixload.ix_load = mock.MagicMock()
123 ixload.ix_load.delete = mock.Mock()
124 ixload.ix_load.disconnect = mock.Mock()
125 with self.assertRaises(Exception):
126 ixload.reassign_ports(test, repository, ports_to_reassign)
128 def test_stat_collector(self):
131 http_ixload.IXLOADHttpTest.stat_collector(*args))
133 def test_IxL_StatCollectorCommand(self):
134 args = [[0, 1, 2, 3], [0, 1, 2, 3]]
136 http_ixload.IXLOADHttpTest.IxL_StatCollectorCommand(*args))
138 def test_set_results_dir(self):
139 test_stat_collector = mock.MagicMock()
140 test_stat_collector.setResultDir = mock.Mock()
141 results_on_windows = "c:/Results"
143 http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector,
146 def test_set_results_dir_error(self):
147 test_stat_collector = ""
148 results_on_windows = "c:/Results"
149 with self.assertRaises(Exception):
150 http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector, results_on_windows)
152 def test_load_config_file(self):
155 "remote_server": "REMOTE_SERVER",
156 "ixload_cfg": "IXLOAD_CFG",
157 "result_dir": "RESULT_DIR",
158 "ixia_chassis": "IXIA_CHASSIS",
164 j = jsonutils.dump_as_bytes(test_input)
165 ixload = http_ixload.IXLOADHttpTest(j)
166 ixload.ix_load = mock.MagicMock()
167 ixload.ix_load.new = mock.Mock(return_value="")
168 self.assertIsNotNone(ixload.load_config_file("ixload.cfg"))
170 def test_load_config_file_error(self):
173 "remote_server": "REMOTE_SERVER",
174 "ixload_cfg": "IXLOAD_CFG",
175 "result_dir": "RESULT_DIR",
176 "ixia_chassis": "IXIA_CHASSIS",
182 j = jsonutils.dump_as_bytes(test_input)
183 ixload = http_ixload.IXLOADHttpTest(j)
184 ixload.ix_load = "test"
185 with self.assertRaises(Exception):
186 ixload.load_config_file("ixload.cfg")
188 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
189 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
190 def test_start_http_test_connect_error(self, mock_ixload_type, *args):
193 "remote_server": "REMOTE_SERVER",
194 "ixload_cfg": "IXLOAD_CFG",
195 "result_dir": "RESULT_DIR",
196 "ixia_chassis": "IXIA_CHASSIS",
203 j = jsonutils.dump_as_bytes(test_input)
205 mock_ixload_type.return_value.connect.side_effect = RuntimeError
207 ixload = http_ixload.IXLOADHttpTest(j)
208 ixload.results_on_windows = 'windows_result_dir'
209 ixload.result_dir = 'my_result_dir'
211 with self.assertRaises(RuntimeError):
212 ixload.start_http_test()
214 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
215 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
216 def test_start_http_test(self, *args):
219 "remote_server": "REMOTE_SERVER",
220 "ixload_cfg": "IXLOAD_CFG",
221 "result_dir": "RESULT_DIR",
222 "ixia_chassis": "IXIA_CHASSIS",
229 j = jsonutils.dump_as_bytes(test_input)
231 ixload = http_ixload.IXLOADHttpTest(j)
232 ixload.results_on_windows = 'windows_result_dir'
233 ixload.result_dir = 'my_result_dir'
234 ixload.load_config_file = mock.MagicMock()
236 self.assertIsNone(ixload.start_http_test())
238 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
239 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
240 def test_start_http_test_reassign_error(self, *args):
243 "remote_server": "REMOTE_SERVER",
244 "ixload_cfg": "IXLOAD_CFG",
245 "result_dir": "RESULT_DIR",
246 "ixia_chassis": "IXIA_CHASSIS",
253 j = jsonutils.dump_as_bytes(test_input)
255 ixload = http_ixload.IXLOADHttpTest(j)
256 ixload.load_config_file = mock.MagicMock()
258 reassign_ports = mock.Mock(side_effect=RuntimeError)
259 ixload.reassign_ports = reassign_ports
260 ixload.results_on_windows = 'windows_result_dir'
261 ixload.result_dir = 'my_result_dir'
263 ixload.start_http_test()
264 reassign_ports.assert_called_once()
266 @mock.patch("yardstick.network_services.traffic_profile.http_ixload.IXLOADHttpTest")
267 def test_main(self, *args):
268 args = ["1", "2", "3"]
269 http_ixload.main(args)