1 # Copyright (c) 2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from __future__ import absolute_import
20 from oslo_serialization import jsonutils
22 from yardstick.network_services.traffic_profile import http_ixload
23 from yardstick.network_services.traffic_profile.http_ixload import \
24 join_non_strings, validate_non_string_sequence
class TestJoinNonStrings(unittest.TestCase):
    """Tests for the validate_non_string_sequence and join_non_strings helpers."""

    def test_validate_non_string_sequence(self):
        """Check the return value for sequences, strings and scalars.

        A list must come back equal to itself, a string or an int must give
        None, a supplied default (positional or keyword) must be returned for
        an int, and ``raise_exc`` must be raised for an int.
        """
        self.assertEqual(validate_non_string_sequence([1, 2, 3]), [1, 2, 3])
        for rejected in ('123', 1):
            self.assertIsNone(validate_non_string_sequence(rejected))

        fallback = 2
        self.assertEqual(validate_non_string_sequence(1, fallback), fallback)
        self.assertEqual(validate_non_string_sequence(1, default=fallback), fallback)

        self.assertRaises(RuntimeError, validate_non_string_sequence, 1,
                          raise_exc=RuntimeError)

    def test_join_non_strings(self):
        """Check join_non_strings with ':' as separator over several argument shapes."""
        # Each entry is (positional arguments after the separator, expected result).
        cases = (
            ((), ''),
            (('a',), 'a'),
            (('a', 2, 'c'), 'a:2:c'),
            ((['a', 2, 'c'],), 'a:2:c'),
            (('abc',), 'abc'),
        )
        for pieces, expected in cases:
            self.assertEqual(join_non_strings(':', *pieces), expected)
# Test case class exercising http_ixload.IXLOADHttpTest and http_ixload.main.
# NOTE(review): each line in this section begins with a bare integer and has no
# indentation; this looks like an extraction artifact (original line numbers), not
# intended code -- TODO confirm against the pristine file.
48 class TestIxLoadTrafficGen(unittest.TestCase):
# Checks that parse_run_test() returns None and leaves ports_to_reassign equal to
# three lists of the form ["IXIA_CHASSIS", "CARD", n] for n = 1, 2, 3.
# NOTE(review): the embedded numbering skips 50 -> 53, 56 -> 62 and 69 -> 72, so the
# statement that opens `test_input`, any further entries (presumably the ones that
# supply "CARD" and the port numbers) and the closing brackets are not visible here --
# verify against the original before relying on this test.
50 def test_parse_run_test(self):
53 "remote_server": "REMOTE_SERVER",
54 "ixload_cfg": "IXLOAD_CFG",
55 "result_dir": "RESULT_DIR",
56 "ixia_chassis": "IXIA_CHASSIS",
# The input mapping is serialized with jsonutils and passed to the constructor.
62 j = jsonutils.dump_as_bytes(test_input)
63 ixload = http_ixload.IXLOADHttpTest(j)
# The instance must expose a `test_input` dict equal to the one that was serialized.
64 self.assertDictEqual(ixload.test_input, test_input)
65 self.assertIsNone(ixload.parse_run_test())
66 self.assertEqual(ixload.ports_to_reassign, [
67 ["IXIA_CHASSIS", "CARD", 1],
68 ["IXIA_CHASSIS", "CARD", 2],
69 ["IXIA_CHASSIS", "CARD", 3],
def test_format_ports_for_reassignment(self):
    """Check that each port triple is rendered as one ';'-joined string.

    NOTE(review): the line opening the ``ports`` list and the two closing-bracket
    lines were absent from the reviewed text; they are restored here from the
    surrounding syntax (``ports`` is the name passed to the formatter below) --
    confirm against the pristine file.
    """
    ports = [
        ["IXIA_CHASSIS", "CARD", 1],
        ["IXIA_CHASSIS", "CARD", 2],
        ["IXIA_CHASSIS", "CARD", 3],
    ]
    # Called on the class, not an instance, so no test input needs to be built.
    formatted = http_ixload.IXLOADHttpTest.format_ports_for_reassignment(ports)
    self.assertEqual(formatted, [
        "IXIA_CHASSIS;CARD;1",
        "IXIA_CHASSIS;CARD;2",
        "IXIA_CHASSIS;CARD;3",
    ])
# Checks that reassign_ports() returns None when handed mock `test` and `repository`
# objects and a stubbed port formatter.
# NOTE(review): the embedded numbering skips 85 -> 88 and 91 -> 97, so the statement
# that opens `test_input`, any further entries and the closing bracket are not visible
# here -- presumably dropped during extraction; TODO confirm.
85 def test_reassign_ports(self):
88 "remote_server": "REMOTE_SERVER",
89 "ixload_cfg": "IXLOAD_CFG",
90 "result_dir": "RESULT_DIR",
91 "ixia_chassis": "IXIA_CHASSIS",
97 j = jsonutils.dump_as_bytes(test_input)
98 ixload = http_ixload.IXLOADHttpTest(j)
99 repository = mock.Mock()
100 test = mock.MagicMock()
# setPorts is replaced explicitly with a plain Mock on the MagicMock test object.
101 test.setPorts = mock.Mock()
102 ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
# The formatter is stubbed on the instance: it returns the single string "1;2;3"
# regardless of the two tuples passed in.
103 ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
104 self.assertIsNone(ixload.reassign_ports(test, repository, ports_to_reassign))
# Checks that reassign_ports() raises some Exception for the `test` argument used below.
# NOTE(review): the embedded numbering skips 106 -> 109 and 112 -> 118, so the statement
# that opens `test_input`, any further entries and the closing bracket are not visible
# here -- presumably dropped during extraction; TODO confirm.
106 def test_reassign_ports_error(self):
109 "remote_server": "REMOTE_SERVER",
110 "ixload_cfg": "IXLOAD_CFG",
111 "result_dir": "RESULT_DIR",
112 "ixia_chassis": "IXIA_CHASSIS",
118 j = jsonutils.dump_as_bytes(test_input)
119 ixload = http_ixload.IXLOADHttpTest(j)
120 repository = mock.Mock()
# NOTE(review): `test` is passed to reassign_ports below but has no visible assignment;
# the numbering skips 121, so it is presumably bound there to a value that makes the
# call fail -- verify against the original.
122 ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
123 ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
# ix_load and its delete/disconnect attributes are mocks, so calls to them cannot
# themselves be the source of the expected exception.
124 ixload.ix_load = mock.MagicMock()
125 ixload.ix_load.delete = mock.Mock()
126 ixload.ix_load.disconnect = mock.Mock()
# Any Exception subclass satisfies this assertion.
127 with self.assertRaises(Exception):
128 ixload.reassign_ports(test, repository, ports_to_reassign)
# Calls IXLOADHttpTest.stat_collector on the class with `args` unpacked as positional
# arguments.
# NOTE(review): the embedded numbering skips 131-132, so the definition of `args` and
# the opener of the enclosing call (the second ")" on the line below closes it) are not
# visible here; which assertion wraps the call cannot be told from this text.
130 def test_stat_collector(self):
133 http_ixload.IXLOADHttpTest.stat_collector(*args))
# Calls IXLOADHttpTest.IxL_StatCollectorCommand on the class with two positional
# arguments, each the list [0, 1, 2, 3].
# NOTE(review): the embedded numbering skips 137, so the opener of the enclosing call
# (the second ")" on the last line closes it) is not visible here; which assertion
# wraps the call cannot be told from this text.
135 def test_IxL_StatCollectorCommand(self):
136 args = [[0, 1, 2, 3], [0, 1, 2, 3]]
138 http_ixload.IXLOADHttpTest.IxL_StatCollectorCommand(*args))
# Calls IXLOADHttpTest.set_results_dir on the class with a mock stat collector.
# NOTE(review): the embedded numbering skips 144 and 146, so the opener of whatever
# wraps the call and the remaining argument(s) after `test_stat_collector,` are not
# visible here; the second argument is presumably `results_on_windows` -- TODO confirm.
140 def test_set_results_dir(self):
141 test_stat_collector = mock.MagicMock()
# setResultDir is replaced explicitly with a plain Mock on the collector.
142 test_stat_collector.setResultDir = mock.Mock()
143 results_on_windows = "c:/Results"
145 http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector,
def test_set_results_dir_error(self):
    """Check that set_results_dir raises when the collector is an empty string.

    Any ``Exception`` subclass satisfies the assertion. Presumably the call fails
    because a ``str`` lacks what a real collector provides -- confirm in http_ixload.
    """
    broken_collector, target_dir = "", "c:/Results"
    with self.assertRaises(Exception):
        http_ixload.IXLOADHttpTest.set_results_dir(broken_collector, target_dir)
# Checks that load_config_file("ixload.cfg") returns something other than None when
# ix_load is a mock.
# NOTE(review): the embedded numbering skips 154 -> 157 and 160 -> 166, so the statement
# that opens `test_input`, any further entries and the closing bracket are not visible
# here -- presumably dropped during extraction; TODO confirm.
154 def test_load_config_file(self):
157 "remote_server": "REMOTE_SERVER",
158 "ixload_cfg": "IXLOAD_CFG",
159 "result_dir": "RESULT_DIR",
160 "ixia_chassis": "IXIA_CHASSIS",
166 j = jsonutils.dump_as_bytes(test_input)
167 ixload = http_ixload.IXLOADHttpTest(j)
168 ixload.ix_load = mock.MagicMock()
# ix_load.new is stubbed to return an empty string.
169 ixload.ix_load.new = mock.Mock(return_value="")
170 self.assertIsNotNone(ixload.load_config_file("ixload.cfg"))
# Checks that load_config_file("ixload.cfg") raises some Exception when ix_load is a
# plain string instead of a client object.
# NOTE(review): the embedded numbering skips 172 -> 175 and 178 -> 184, so the statement
# that opens `test_input`, any further entries and the closing bracket are not visible
# here -- presumably dropped during extraction; TODO confirm.
172 def test_load_config_file_error(self):
175 "remote_server": "REMOTE_SERVER",
176 "ixload_cfg": "IXLOAD_CFG",
177 "result_dir": "RESULT_DIR",
178 "ixia_chassis": "IXIA_CHASSIS",
184 j = jsonutils.dump_as_bytes(test_input)
185 ixload = http_ixload.IXLOADHttpTest(j)
# A str stands in for the client; presumably the method fails when it tries to use it
# as one -- verify in http_ixload.
186 ixload.ix_load = "test"
# Any Exception subclass satisfies this assertion.
187 with self.assertRaises(Exception):
188 ixload.load_config_file("ixload.cfg")
# Checks that start_http_test() raises RuntimeError when connect() on the patched
# IxLoad object raises RuntimeError.
# Both IxLoad and StatCollectorUtils are patched in the http_ixload module. Decorators
# apply bottom-up, so the first mock argument belongs to the StatCollectorUtils patch
# and the second to the IxLoad patch.
# NOTE(review): the embedded numbering skips 192 -> 195 and 198 -> 205, so the statement
# that opens `test_input`, any further entries and the closing bracket are not visible
# here -- presumably dropped during extraction; TODO confirm.
190 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
191 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
192 def test_start_http_test_connect_error(self, mock_collector_type, mock_ixload_type):
195 "remote_server": "REMOTE_SERVER",
196 "ixload_cfg": "IXLOAD_CFG",
197 "result_dir": "RESULT_DIR",
198 "ixia_chassis": "IXIA_CHASSIS",
205 j = jsonutils.dump_as_bytes(test_input)
# Calling the patched class yields its return_value mock, i.e. the same object any
# later call to the patched IxLoad returns; its connect() is made to raise.
207 mock_ixload = mock_ixload_type()
208 mock_ixload.connect.side_effect = RuntimeError
210 ixload = http_ixload.IXLOADHttpTest(j)
211 ixload.results_on_windows = 'windows_result_dir'
212 ixload.result_dir = 'my_result_dir'
214 with self.assertRaises(RuntimeError):
215 ixload.start_http_test()
# Checks that start_http_test() returns None when IxLoad and StatCollectorUtils are
# patched in the http_ixload module and load_config_file is stubbed on the instance.
# Decorators apply bottom-up, so the first mock argument belongs to the
# StatCollectorUtils patch and the second to the IxLoad patch.
# NOTE(review): the embedded numbering skips 219 -> 222 and 225 -> 232, so the statement
# that opens `test_input`, any further entries and the closing bracket are not visible
# here -- presumably dropped during extraction; TODO confirm.
217 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
218 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
219 def test_start_http_test(self, mock_collector_type, mock_ixload_type):
222 "remote_server": "REMOTE_SERVER",
223 "ixload_cfg": "IXLOAD_CFG",
224 "result_dir": "RESULT_DIR",
225 "ixia_chassis": "IXIA_CHASSIS",
232 j = jsonutils.dump_as_bytes(test_input)
234 ixload = http_ixload.IXLOADHttpTest(j)
# These attributes are set directly on the instance before the call under test.
235 ixload.results_on_windows = 'windows_result_dir'
236 ixload.result_dir = 'my_result_dir'
237 ixload.load_config_file = mock.MagicMock()
239 self.assertIsNone(ixload.start_http_test())
# Checks that start_http_test() calls reassign_ports exactly once when reassign_ports
# raises RuntimeError.
# IxLoad and StatCollectorUtils are patched in the http_ixload module. Decorators apply
# bottom-up, so the first mock argument belongs to the StatCollectorUtils patch and the
# second to the IxLoad patch.
# NOTE(review): the embedded numbering skips 243 -> 246 and 249 -> 256, so the statement
# that opens `test_input`, any further entries and the closing bracket are not visible
# here -- presumably dropped during extraction; TODO confirm.
241 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
242 @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
243 def test_start_http_test_reassign_error(self, mock_collector_type, mock_ixload_type):
246 "remote_server": "REMOTE_SERVER",
247 "ixload_cfg": "IXLOAD_CFG",
248 "result_dir": "RESULT_DIR",
249 "ixia_chassis": "IXIA_CHASSIS",
256 j = jsonutils.dump_as_bytes(test_input)
258 ixload = http_ixload.IXLOADHttpTest(j)
259 ixload.load_config_file = mock.MagicMock()
# reassign_ports is replaced on the instance by a mock that raises RuntimeError.
261 reassign_ports = mock.Mock(side_effect=RuntimeError)
262 ixload.reassign_ports = reassign_ports
263 ixload.results_on_windows = 'windows_result_dir'
264 ixload.result_dir = 'my_result_dir'
# The call is not wrapped in assertRaises, so this test only passes if start_http_test
# does not let the RuntimeError propagate.
266 ixload.start_http_test()
267 self.assertEqual(reassign_ports.call_count, 1)
@mock.patch("yardstick.network_services.traffic_profile.http_ixload.IXLOADHttpTest")
def test_main(self, IXLOADHttpTest):
    """Smoke-test http_ixload.main with IXLOADHttpTest patched out.

    The test passes as long as main() accepts a three-item argument list
    without raising; nothing is asserted about the patched class.
    """
    http_ixload.main(["1", "2", "3"])