1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from oslo_serialization import jsonutils
20 from yardstick.common import exceptions
21 from yardstick.network_services.traffic_profile import http_ixload
22 from yardstick.network_services.traffic_profile.http_ixload import \
23 join_non_strings, validate_non_string_sequence
class TestJoinNonStrings(unittest.TestCase):
    """Exercise the two sequence helpers imported from ``http_ixload``."""

    def test_validate_non_string_sequence(self):
        """Check the pass-through, ``None``, default and raising outcomes."""
        # A list comes back equal to what was passed in.
        self.assertEqual(validate_non_string_sequence([1, 2, 3]), [1, 2, 3])

        # A string and a bare integer both give None when no fallback is set.
        for rejected in ('123', 1):
            self.assertIsNone(validate_non_string_sequence(rejected))

        # The fallback may be given positionally or as ``default``.
        self.assertEqual(validate_non_string_sequence(1, 2), 2)
        self.assertEqual(validate_non_string_sequence(1, default=2), 2)

        # With ``raise_exc`` the integer input raises that exception type.
        self.assertRaises(RuntimeError, validate_non_string_sequence, 1,
                          raise_exc=RuntimeError)

    def test_join_non_strings(self):
        """Check joining of nothing, scalars, a list and a plain string."""
        # Each entry pairs the positional arguments with the expected result.
        expectations = (
            ((':',), ''),
            ((':', 'a'), 'a'),
            ((':', 'a', 2, 'c'), 'a:2:c'),
            ((':', ['a', 2, 'c']), 'a:2:c'),
            ((':', 'abc'), 'abc'),
        )
        for call_args, joined in expectations:
            self.assertEqual(join_non_strings(*call_args), joined)
47 class TestIxLoadTrafficGen(unittest.TestCase):
52 "remote_server": "REMOTE_SERVER",
53 "ixload_cfg": "IXLOAD_CFG",
54 "result_dir": "RESULT_DIR",
55 "ixia_chassis": "IXIA_CHASSIS",
62 "ip": {"address": "address",
64 "subnet_prefix": "subnet_prefix",
69 def test_parse_run_test(self):
72 "remote_server": "REMOTE_SERVER",
73 "ixload_cfg": "IXLOAD_CFG",
74 "result_dir": "RESULT_DIR",
75 "ixia_chassis": "IXIA_CHASSIS",
82 j = jsonutils.dump_as_bytes(test_input)
83 ixload = http_ixload.IXLOADHttpTest(j)
84 self.assertDictEqual(ixload.test_input, test_input)
85 self.assertIsNone(ixload.parse_run_test())
86 self.assertEqual(ixload.ports_to_reassign, [
87 ["IXIA_CHASSIS", "CARD", 1],
88 ["IXIA_CHASSIS", "CARD", 2],
89 ["IXIA_CHASSIS", "CARD", 3],
91 self.assertEqual({}, ixload.links_param)
93 def test_format_ports_for_reassignment(self):
95 ["IXIA_CHASSIS", "CARD", 1],
96 ["IXIA_CHASSIS", "CARD", 2],
97 ["IXIA_CHASSIS", "CARD", 3],
99 formatted = http_ixload.IXLOADHttpTest.format_ports_for_reassignment(ports)
100 self.assertEqual(formatted, [
101 "IXIA_CHASSIS;CARD;1",
102 "IXIA_CHASSIS;CARD;2",
103 "IXIA_CHASSIS;CARD;3",
106 def test_reassign_ports(self):
109 "remote_server": "REMOTE_SERVER",
110 "ixload_cfg": "IXLOAD_CFG",
111 "result_dir": "RESULT_DIR",
112 "ixia_chassis": "IXIA_CHASSIS",
119 j = jsonutils.dump_as_bytes(test_input)
120 ixload = http_ixload.IXLOADHttpTest(j)
121 repository = mock.Mock()
122 test = mock.MagicMock()
123 test.setPorts = mock.Mock()
124 ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
125 ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
126 self.assertIsNone(ixload.reassign_ports(test, repository, ports_to_reassign))
128 def test_reassign_ports_error(self):
131 "remote_server": "REMOTE_SERVER",
132 "ixload_cfg": "IXLOAD_CFG",
133 "result_dir": "RESULT_DIR",
134 "ixia_chassis": "IXIA_CHASSIS",
141 j = jsonutils.dump_as_bytes(test_input)
142 ixload = http_ixload.IXLOADHttpTest(j)
143 repository = mock.Mock()
145 ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
146 ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
147 ixload.ix_load = mock.MagicMock()
148 ixload.ix_load.delete = mock.Mock()
149 ixload.ix_load.disconnect = mock.Mock()
150 with self.assertRaises(Exception):
151 ixload.reassign_ports(test, repository, ports_to_reassign)
153 def test_stat_collector(self):
156 http_ixload.IXLOADHttpTest.stat_collector(*args))
158 def test_IxL_StatCollectorCommand(self):
159 args = [[0, 1, 2, 3], [0, 1, 2, 3]]
161 http_ixload.IXLOADHttpTest.IxL_StatCollectorCommand(*args))
163 def test_set_results_dir(self):
164 test_stat_collector = mock.MagicMock()
165 test_stat_collector.setResultDir = mock.Mock()
166 results_on_windows = "c:/Results"
168 http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector,
def test_set_results_dir_error(self):
    """Passing an empty string as the collector must raise ``Exception``."""
    # A plain str stands in for the collector object, so the call is
    # expected to fail instead of succeeding as it does with a mock.
    bad_collector = ""
    windows_dir = "c:/Results"
    with self.assertRaises(Exception):
        http_ixload.IXLOADHttpTest.set_results_dir(bad_collector,
                                                   windows_dir)
177 def test_load_config_file(self):
180 "remote_server": "REMOTE_SERVER",
181 "ixload_cfg": "IXLOAD_CFG",
182 "result_dir": "RESULT_DIR",
183 "ixia_chassis": "IXIA_CHASSIS",
190 j = jsonutils.dump_as_bytes(test_input)
191 ixload = http_ixload.IXLOADHttpTest(j)
192 ixload.ix_load = mock.MagicMock()
193 ixload.ix_load.new = mock.Mock(return_value="")
194 self.assertIsNotNone(ixload.load_config_file("ixload.cfg"))
196 def test_load_config_file_error(self):
199 "remote_server": "REMOTE_SERVER",
200 "ixload_cfg": "IXLOAD_CFG",
201 "result_dir": "RESULT_DIR",
202 "ixia_chassis": "IXIA_CHASSIS",
209 j = jsonutils.dump_as_bytes(test_input)
210 ixload = http_ixload.IXLOADHttpTest(j)
211 ixload.ix_load = "test"
212 with self.assertRaises(Exception):
213 ixload.load_config_file("ixload.cfg")
215 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
216 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
217 def test_start_http_test_connect_error(self, mock_ixload_type, *args):
220 "remote_server": "REMOTE_SERVER",
221 "ixload_cfg": "IXLOAD_CFG",
222 "result_dir": "RESULT_DIR",
223 "ixia_chassis": "IXIA_CHASSIS",
231 j = jsonutils.dump_as_bytes(test_input)
233 mock_ixload_type.return_value.connect.side_effect = RuntimeError
235 ixload = http_ixload.IXLOADHttpTest(j)
236 ixload.results_on_windows = 'windows_result_dir'
237 ixload.result_dir = 'my_result_dir'
239 with self.assertRaises(RuntimeError):
240 ixload.start_http_test()
242 def test_update_config(self):
243 net_taraffic_0 = mock.Mock()
244 net_taraffic_0.name = "HTTP client@uplink_0"
245 net_taraffic_1 = mock.Mock()
246 net_taraffic_1.name = "HTTP client@uplink_1"
248 community_list = [net_taraffic_0, net_taraffic_1, Exception]
249 ixload = http_ixload.IXLOADHttpTest(
250 jsonutils.dump_as_bytes(self.test_input))
252 ixload.links_param = {"uplink_0": {"ip": {},
255 ixload.test = mock.Mock()
256 ixload.test.communityList = community_list
258 ixload.update_network_param = mock.Mock()
259 ixload.update_http_client_param = mock.Mock()
261 ixload.update_config()
263 ixload.update_network_param.assert_called_once_with(net_taraffic_0, {})
264 ixload.update_http_client_param.assert_called_once_with(net_taraffic_0,
def test_update_network_mac_address(self):
    """Check the "auto" MAC, explicit MAC and invalid-plugin cases."""
    l1_plugin = mock.MagicMock()
    traffic = mock.Mock()
    traffic.network.getL1Plugin.return_value = l1_plugin

    serialized_input = jsonutils.dump_as_bytes(self.test_input)
    ixload = http_ixload.IXLOADHttpTest(serialized_input)

    # Walk the mocked plugin tree down to the first IP range entry.
    ip_range = l1_plugin.childrenList[0].childrenList[0].rangeList[0]

    # "auto" is expected to configure autoMacGeneration=True on the range.
    ixload.update_network_mac_address(traffic, "auto")
    ip_range.config.assert_called_once_with(autoMacGeneration=True)

    # An explicit address is expected to configure autoMacGeneration=False
    # and to set the MAC on the related "MacRange".
    explicit_mac = "00:00:00:00:00:01"
    ixload.update_network_mac_address(traffic, explicit_mac)
    ip_range.config.assert_called_with(autoMacGeneration=False)
    mac_range = ip_range.getLowerRelatedRange("MacRange")
    mac_range.config.assert_called_once_with(mac=explicit_mac)

    # getL1Plugin now returns the Exception class rather than a plugin
    # mock; the method is expected to raise InvalidRxfFile.
    traffic.network.getL1Plugin.return_value = Exception
    self.assertRaises(exceptions.InvalidRxfFile,
                      ixload.update_network_mac_address, traffic, "auto")
294 def test_update_network_address(self):
295 ethernet = mock.MagicMock()
296 net_traffic = mock.Mock()
297 net_traffic.network.getL1Plugin.return_value = ethernet
299 ixload = http_ixload.IXLOADHttpTest(
300 jsonutils.dump_as_bytes(self.test_input))
302 ix_net_l2_ethernet_plugin = ethernet.childrenList[0]
303 ix_net_ip_v4_v6_plugin = ix_net_l2_ethernet_plugin.childrenList[0]
304 ix_net_ip_v4_v6_range = ix_net_ip_v4_v6_plugin.rangeList[0]
306 ixload.update_network_address(net_traffic, "address", "gateway",
308 ix_net_ip_v4_v6_range.config.assert_called_once_with(
311 gatewayAddress="gateway")
313 net_traffic.network.getL1Plugin.return_value = Exception
315 with self.assertRaises(exceptions.InvalidRxfFile):
316 ixload.update_network_address(net_traffic, "address", "gateway",
319 def test_update_network_param(self):
320 net_traffic = mock.Mock()
322 ixload = http_ixload.IXLOADHttpTest(
323 jsonutils.dump_as_bytes(self.test_input))
325 ixload.update_network_address = mock.Mock()
326 ixload.update_network_mac_address = mock.Mock()
328 param = {"address": "address",
329 "gateway": "gateway",
330 "subnet_prefix": "subnet_prefix",
334 ixload.update_network_param(net_traffic, param)
336 ixload.update_network_address.assert_called_once_with(net_traffic,
341 ixload.update_network_mac_address.assert_called_once_with(
345 def test_update_http_client_param(self):
346 net_traffic = mock.Mock()
348 ixload = http_ixload.IXLOADHttpTest(
349 jsonutils.dump_as_bytes(self.test_input))
351 ixload.update_page_size = mock.Mock()
352 ixload.update_user_count = mock.Mock()
354 param = {"page_object": "page_object",
355 "simulated_users": "simulated_users"}
357 ixload.update_http_client_param(net_traffic, param)
359 ixload.update_page_size.assert_called_once_with(net_traffic,
361 ixload.update_user_count.assert_called_once_with(net_traffic,
def test_update_page_size(self):
    """Check the page object is configured, and that no activity fails."""
    http_activity = mock.MagicMock()
    traffic = mock.Mock()

    serialized_input = jsonutils.dump_as_bytes(self.test_input)
    ixload = http_ixload.IXLOADHttpTest(serialized_input)

    # With one activity, its first agent action receives the page object.
    traffic.activityList = [http_activity]
    http_command = http_activity.agent.actionList[0]
    ixload.update_page_size(traffic, "page_object")
    http_command.config.assert_called_once_with(pageObject="page_object")

    # An empty activity list is expected to raise InvalidRxfFile.
    traffic.activityList = []
    self.assertRaises(exceptions.InvalidRxfFile,
                      ixload.update_page_size, traffic, "page_object")
def test_update_user_count(self):
    """Check the user objective is configured, and that no activity fails."""
    http_activity = mock.MagicMock()
    traffic = mock.Mock()

    serialized_input = jsonutils.dump_as_bytes(self.test_input)
    ixload = http_ixload.IXLOADHttpTest(serialized_input)

    # With one activity, it is configured with the requested user count.
    user_count = 123
    traffic.activityList = [http_activity]
    ixload.update_user_count(traffic, user_count)
    http_activity.config.assert_called_once_with(userObjectiveValue=123)

    # An empty activity list is expected to raise InvalidRxfFile.
    traffic.activityList = []
    self.assertRaises(exceptions.InvalidRxfFile,
                      ixload.update_user_count, traffic, user_count)
396 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
397 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
398 def test_start_http_test(self, *args):
401 "remote_server": "REMOTE_SERVER",
402 "ixload_cfg": "IXLOAD_CFG",
403 "result_dir": "RESULT_DIR",
404 "ixia_chassis": "IXIA_CHASSIS",
412 j = jsonutils.dump_as_bytes(test_input)
414 ixload = http_ixload.IXLOADHttpTest(j)
415 ixload.results_on_windows = 'windows_result_dir'
416 ixload.result_dir = 'my_result_dir'
417 ixload.load_config_file = mock.MagicMock()
419 self.assertIsNone(ixload.start_http_test())
421 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
422 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
423 def test_start_http_test_reassign_error(self, *args):
426 "remote_server": "REMOTE_SERVER",
427 "ixload_cfg": "IXLOAD_CFG",
428 "result_dir": "RESULT_DIR",
429 "ixia_chassis": "IXIA_CHASSIS",
437 j = jsonutils.dump_as_bytes(test_input)
439 ixload = http_ixload.IXLOADHttpTest(j)
440 ixload.load_config_file = mock.MagicMock()
442 reassign_ports = mock.Mock(side_effect=RuntimeError)
443 ixload.reassign_ports = reassign_ports
444 ixload.results_on_windows = 'windows_result_dir'
445 ixload.result_dir = 'my_result_dir'
447 ixload.start_http_test()
448 reassign_ports.assert_called_once()
@mock.patch("yardstick.network_services.traffic_profile.http_ixload.IXLOADHttpTest")
def test_main(self, *args):
    """Call ``http_ixload.main`` while ``IXLOADHttpTest`` is patched."""
    # ``*args`` receives the mock injected by the patch decorator; it is
    # not inspected here.
    argv = ["1", "2", "3"]
    http_ixload.main(argv)