1 # Copyright (c) 2017-2019 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from oslo_serialization import jsonutils
20 from yardstick.network_services.traffic_profile import http_ixload
21 from yardstick.network_services.traffic_profile.http_ixload import \
22 join_non_strings, validate_non_string_sequence
25 class TestJoinNonStrings(unittest.TestCase):
27 def test_validate_non_string_sequence(self):
28 self.assertEqual(validate_non_string_sequence([1, 2, 3]), [1, 2, 3])
29 self.assertIsNone(validate_non_string_sequence('123'))
30 self.assertIsNone(validate_non_string_sequence(1))
32 self.assertEqual(validate_non_string_sequence(1, 2), 2)
33 self.assertEqual(validate_non_string_sequence(1, default=2), 2)
35 with self.assertRaises(RuntimeError):
36 validate_non_string_sequence(1, raise_exc=RuntimeError)
38 def test_join_non_strings(self):
39 self.assertEqual(join_non_strings(':'), '')
40 self.assertEqual(join_non_strings(':', 'a'), 'a')
41 self.assertEqual(join_non_strings(':', 'a', 2, 'c'), 'a:2:c')
42 self.assertEqual(join_non_strings(':', ['a', 2, 'c']), 'a:2:c')
43 self.assertEqual(join_non_strings(':', 'abc'), 'abc')
46 class TestIxLoadTrafficGen(unittest.TestCase):
51 "remote_server": "REMOTE_SERVER",
52 "ixload_cfg": "IXLOAD_CFG",
53 "result_dir": "RESULT_DIR",
54 "ixia_chassis": "IXIA_CHASSIS",
61 "ip": {"address": "address",
63 "subnet_prefix": "subnet_prefix",
68 def test_parse_run_test(self):
71 "remote_server": "REMOTE_SERVER",
72 "ixload_cfg": "IXLOAD_CFG",
73 "result_dir": "RESULT_DIR",
74 "ixia_chassis": "IXIA_CHASSIS",
81 j = jsonutils.dump_as_bytes(test_input)
82 ixload = http_ixload.IXLOADHttpTest(j)
83 self.assertDictEqual(ixload.test_input, test_input)
84 self.assertIsNone(ixload.parse_run_test())
85 self.assertEqual(ixload.ports_to_reassign, [
86 ["IXIA_CHASSIS", "CARD", 1],
87 ["IXIA_CHASSIS", "CARD", 2],
88 ["IXIA_CHASSIS", "CARD", 3],
90 self.assertEqual({}, ixload.links_param)
92 def test_format_ports_for_reassignment(self):
94 ["IXIA_CHASSIS", "CARD", 1],
95 ["IXIA_CHASSIS", "CARD", 2],
96 ["IXIA_CHASSIS", "CARD", 3],
98 formatted = http_ixload.IXLOADHttpTest.format_ports_for_reassignment(ports)
99 self.assertEqual(formatted, [
100 "IXIA_CHASSIS;CARD;1",
101 "IXIA_CHASSIS;CARD;2",
102 "IXIA_CHASSIS;CARD;3",
105 def test_reassign_ports(self):
108 "remote_server": "REMOTE_SERVER",
109 "ixload_cfg": "IXLOAD_CFG",
110 "result_dir": "RESULT_DIR",
111 "ixia_chassis": "IXIA_CHASSIS",
118 j = jsonutils.dump_as_bytes(test_input)
119 ixload = http_ixload.IXLOADHttpTest(j)
120 repository = mock.Mock()
121 test = mock.MagicMock()
122 test.setPorts = mock.Mock()
123 ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
124 ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
125 self.assertIsNone(ixload.reassign_ports(test, repository, ports_to_reassign))
127 def test_reassign_ports_error(self):
130 "remote_server": "REMOTE_SERVER",
131 "ixload_cfg": "IXLOAD_CFG",
132 "result_dir": "RESULT_DIR",
133 "ixia_chassis": "IXIA_CHASSIS",
140 j = jsonutils.dump_as_bytes(test_input)
141 ixload = http_ixload.IXLOADHttpTest(j)
142 repository = mock.Mock()
144 ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
145 ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
146 ixload.ix_load = mock.MagicMock()
147 ixload.ix_load.delete = mock.Mock()
148 ixload.ix_load.disconnect = mock.Mock()
149 with self.assertRaises(Exception):
150 ixload.reassign_ports(test, repository, ports_to_reassign)
152 def test_stat_collector(self):
155 http_ixload.IXLOADHttpTest.stat_collector(*args))
157 def test_IxL_StatCollectorCommand(self):
158 args = [[0, 1, 2, 3], [0, 1, 2, 3]]
160 http_ixload.IXLOADHttpTest.IxL_StatCollectorCommand(*args))
162 def test_set_results_dir(self):
163 test_stat_collector = mock.MagicMock()
164 test_stat_collector.setResultDir = mock.Mock()
165 results_on_windows = "c:/Results"
167 http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector,
170 def test_set_results_dir_error(self):
171 test_stat_collector = ""
172 results_on_windows = "c:/Results"
173 with self.assertRaises(Exception):
174 http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector, results_on_windows)
176 def test_load_config_file(self):
179 "remote_server": "REMOTE_SERVER",
180 "ixload_cfg": "IXLOAD_CFG",
181 "result_dir": "RESULT_DIR",
182 "ixia_chassis": "IXIA_CHASSIS",
189 j = jsonutils.dump_as_bytes(test_input)
190 ixload = http_ixload.IXLOADHttpTest(j)
191 ixload.ix_load = mock.MagicMock()
192 ixload.ix_load.new = mock.Mock(return_value="")
193 self.assertIsNotNone(ixload.load_config_file("ixload.cfg"))
195 def test_load_config_file_error(self):
198 "remote_server": "REMOTE_SERVER",
199 "ixload_cfg": "IXLOAD_CFG",
200 "result_dir": "RESULT_DIR",
201 "ixia_chassis": "IXIA_CHASSIS",
208 j = jsonutils.dump_as_bytes(test_input)
209 ixload = http_ixload.IXLOADHttpTest(j)
210 ixload.ix_load = "test"
211 with self.assertRaises(Exception):
212 ixload.load_config_file("ixload.cfg")
214 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
215 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
216 def test_start_http_test_connect_error(self, mock_ixload_type, *args):
219 "remote_server": "REMOTE_SERVER",
220 "ixload_cfg": "IXLOAD_CFG",
221 "result_dir": "RESULT_DIR",
222 "ixia_chassis": "IXIA_CHASSIS",
230 j = jsonutils.dump_as_bytes(test_input)
232 mock_ixload_type.return_value.connect.side_effect = RuntimeError
234 ixload = http_ixload.IXLOADHttpTest(j)
235 ixload.results_on_windows = 'windows_result_dir'
236 ixload.result_dir = 'my_result_dir'
238 with self.assertRaises(RuntimeError):
239 ixload.start_http_test()
241 def test_update_config(self):
242 net_taraffic_0 = mock.Mock()
243 net_taraffic_0.name = "HTTP client@uplink_0"
244 net_taraffic_1 = mock.Mock()
245 net_taraffic_1.name = "HTTP client@uplink_1"
247 community_list = [net_taraffic_0, net_taraffic_1, Exception]
248 ixload = http_ixload.IXLOADHttpTest(
249 jsonutils.dump_as_bytes(self.test_input))
251 ixload.links_param = {"uplink_0": {"ip": {},
254 ixload.test = mock.Mock()
255 ixload.test.communityList = community_list
257 ixload.update_network_param = mock.Mock()
258 ixload.update_http_client_param = mock.Mock()
260 ixload.update_config()
262 ixload.update_network_param.assert_called_once_with(net_taraffic_0, {})
263 ixload.update_http_client_param.assert_called_once_with(net_taraffic_0,
266 def test_update_network_mac_address(self):
267 ethernet = mock.MagicMock()
268 net_traffic = mock.Mock()
269 net_traffic.network.getL1Plugin.return_value = ethernet
271 ixload = http_ixload.IXLOADHttpTest(
272 jsonutils.dump_as_bytes(self.test_input))
274 ix_net_l2_ethernet_plugin = ethernet.childrenList[0]
275 ix_net_ip_v4_v6_plugin = ix_net_l2_ethernet_plugin.childrenList[0]
276 ix_net_ip_v4_v6_range = ix_net_ip_v4_v6_plugin.rangeList[0]
278 ixload.update_network_mac_address(net_traffic, "auto")
279 ix_net_ip_v4_v6_range.config.assert_called_once_with(
280 autoMacGeneration=True)
282 ixload.update_network_mac_address(net_traffic, "00:00:00:00:00:01")
283 ix_net_ip_v4_v6_range.config.assert_called_with(
284 autoMacGeneration=False)
285 mac_range = ix_net_ip_v4_v6_range.getLowerRelatedRange("MacRange")
286 mac_range.config.assert_called_once_with(mac="00:00:00:00:00:01")
288 net_traffic.network.getL1Plugin.return_value = Exception
290 with self.assertRaises(http_ixload.InvalidRxfFile):
291 ixload.update_network_mac_address(net_traffic, "auto")
293 def test_update_network_address(self):
294 ethernet = mock.MagicMock()
295 net_traffic = mock.Mock()
296 net_traffic.network.getL1Plugin.return_value = ethernet
298 ixload = http_ixload.IXLOADHttpTest(
299 jsonutils.dump_as_bytes(self.test_input))
301 ix_net_l2_ethernet_plugin = ethernet.childrenList[0]
302 ix_net_ip_v4_v6_plugin = ix_net_l2_ethernet_plugin.childrenList[0]
303 ix_net_ip_v4_v6_range = ix_net_ip_v4_v6_plugin.rangeList[0]
305 ixload.update_network_address(net_traffic, "address", "gateway",
307 ix_net_ip_v4_v6_range.config.assert_called_once_with(
310 gatewayAddress="gateway")
312 net_traffic.network.getL1Plugin.return_value = Exception
314 with self.assertRaises(http_ixload.InvalidRxfFile):
315 ixload.update_network_address(net_traffic, "address", "gateway",
318 def test_update_network_param(self):
319 net_traffic = mock.Mock()
321 ixload = http_ixload.IXLOADHttpTest(
322 jsonutils.dump_as_bytes(self.test_input))
324 ixload.update_network_address = mock.Mock()
325 ixload.update_network_mac_address = mock.Mock()
327 param = {"address": "address",
328 "gateway": "gateway",
329 "subnet_prefix": "subnet_prefix",
333 ixload.update_network_param(net_traffic, param)
335 ixload.update_network_address.assert_called_once_with(net_traffic,
340 ixload.update_network_mac_address.assert_called_once_with(
344 def test_update_http_client_param(self):
345 net_traffic = mock.Mock()
347 ixload = http_ixload.IXLOADHttpTest(
348 jsonutils.dump_as_bytes(self.test_input))
350 ixload.update_page_size = mock.Mock()
351 ixload.update_user_count = mock.Mock()
353 param = {"page_object": "page_object",
354 "simulated_users": "simulated_users"}
356 ixload.update_http_client_param(net_traffic, param)
358 ixload.update_page_size.assert_called_once_with(net_traffic,
360 ixload.update_user_count.assert_called_once_with(net_traffic,
363 def test_update_page_size(self):
364 activity = mock.MagicMock()
365 net_traffic = mock.Mock()
367 ixload = http_ixload.IXLOADHttpTest(
368 jsonutils.dump_as_bytes(self.test_input))
370 net_traffic.activityList = [activity]
371 ix_http_command = activity.agent.actionList[0]
372 ixload.update_page_size(net_traffic, "page_object")
373 ix_http_command.config.assert_called_once_with(
374 pageObject="page_object")
376 net_traffic.activityList = []
377 with self.assertRaises(http_ixload.InvalidRxfFile):
378 ixload.update_page_size(net_traffic, "page_object")
380 def test_update_user_count(self):
381 activity = mock.MagicMock()
382 net_traffic = mock.Mock()
384 ixload = http_ixload.IXLOADHttpTest(
385 jsonutils.dump_as_bytes(self.test_input))
387 net_traffic.activityList = [activity]
388 ixload.update_user_count(net_traffic, 123)
389 activity.config.assert_called_once_with(userObjectiveValue=123)
391 net_traffic.activityList = []
392 with self.assertRaises(http_ixload.InvalidRxfFile):
393 ixload.update_user_count(net_traffic, 123)
395 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
396 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
397 def test_start_http_test(self, *args):
400 "remote_server": "REMOTE_SERVER",
401 "ixload_cfg": "IXLOAD_CFG",
402 "result_dir": "RESULT_DIR",
403 "ixia_chassis": "IXIA_CHASSIS",
411 j = jsonutils.dump_as_bytes(test_input)
413 ixload = http_ixload.IXLOADHttpTest(j)
414 ixload.results_on_windows = 'windows_result_dir'
415 ixload.result_dir = 'my_result_dir'
416 ixload.load_config_file = mock.MagicMock()
418 self.assertIsNone(ixload.start_http_test())
420 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
421 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
422 def test_start_http_test_reassign_error(self, *args):
425 "remote_server": "REMOTE_SERVER",
426 "ixload_cfg": "IXLOAD_CFG",
427 "result_dir": "RESULT_DIR",
428 "ixia_chassis": "IXIA_CHASSIS",
436 j = jsonutils.dump_as_bytes(test_input)
438 ixload = http_ixload.IXLOADHttpTest(j)
439 ixload.load_config_file = mock.MagicMock()
441 reassign_ports = mock.Mock(side_effect=RuntimeError)
442 ixload.reassign_ports = reassign_ports
443 ixload.results_on_windows = 'windows_result_dir'
444 ixload.result_dir = 'my_result_dir'
446 ixload.start_http_test()
447 reassign_ports.assert_called_once()
449 @mock.patch("yardstick.network_services.traffic_profile.http_ixload.IXLOADHttpTest")
450 def test_main(self, *args):
451 args = ["1", "2", "3"]
452 http_ixload.main(args)