1 from copy import deepcopy
2 from datetime import datetime
7 from opnfv_testapi.common import constants
8 import opnfv_testapi.resources.scenario_models as models
9 import test_base as base
# Common base for the scenario API test classes in this file: it holds the
# endpoint configuration, two request payloads and a few shared helpers.
# NOTE(review): this listing is a lossy extract -- indentation is gone and the
# embedded line numbers show gaps -- so several headers/bodies are not visible.
12 class TestScenarioBase(base.TestBase):
# NOTE(review): the `def` line enclosing the statements below is not visible;
# presumably `def setUp(self):`, given the super() call -- confirm.
14 super(TestScenarioBase, self).setUp()
# Model classes for a single scenario and for a scenario list (presumably used
# by base.TestBase to decode responses -- verify in test_base).
15 self.get_res = models.Scenario
16 self.list_res = models.Scenarios
# URL path of the resource under test.
17 self.basePath = '/api/v1/scenarios'
# Request payloads loaded from JSON files located next to this module
# (see _load_request, which resolves the name against dirname(__file__)).
18 self.req_d = self._load_request('scenario-c1.json')
19 self.req_2 = self._load_request('scenario-c2.json')
# NOTE(review): the body of assert_body is not visible in this extract.
24 def assert_body(self, project, req=None):
# Resolve `f_req` relative to this module's directory and open it for reading.
# NOTE(review): the signature has no `self`, so a @staticmethod decorator is
# presumably elided; the lines that parse and return the content are not
# visible either -- confirm against the full file.
28 def _load_request(f_req):
29 abs_file = os.path.join(os.path.dirname(__file__), f_req)
30 with open(abs_file, 'r') as f:
# Create a scenario from `req` and return the last '/'-separated segment of the
# returned resource's href; callers store it as the scenario name.
35 def create_return_name(self, req):
36 _, res = self.create(req)
37 return res.href.split('/')[-1]
# Validate a fetched scenario: status must be HTTP_OK and the server-assigned
# `_id` and `creation_date` must be set.
# NOTE(review): embedded numbering jumps 40 -> 43, so some handling of `req`
# (it defaults to None) is presumably elided -- confirm.
39 def assert_res(self, code, scenario, req=None):
40 self.assertEqual(code, constants.HTTP_OK)
43 self.assertIsNotNone(scenario._id)
44 self.assertIsNotNone(scenario.creation_date)
# NOTE(review): the result of this comparison is discarded, so a mismatch can
# never fail the test; it probably should be wrapped in an assertion.
46 scenario == models.Scenario.from_dict(req)
# NOTE(review): body not visible. Callers pass 'key=value' strings and hand the
# result to _query_and_assert as the query.
49 def _set_query(*args):
# GET the scenario called `name` and validate the reply with assert_res.
55 def _get_and_assert(self, name, req=None):
56 code, body = self.get(name)
57 self.assert_res(code, body, req)
class TestScenarioCreate(TestScenarioBase):
    """Creation tests for the scenarios endpoint.

    Covers rejected requests (no body, empty name, ``None`` name), a
    successful creation and a duplicate creation.  ``create``, ``create_d``
    and ``assert_create_body`` are inherited helpers (presumably from
    ``base.TestBase`` -- they are not defined in this file's visible code).
    """

    def test_withoutBody(self):
        # No request object at all -> the API must answer 400.
        (code, body) = self.create()
        self.assertEqual(code, constants.HTTP_BAD_REQUEST)

    def test_emptyName(self):
        # An empty string is treated the same as a missing name.
        req_empty = models.ScenarioCreateRequest('')
        (code, body) = self.create(req_empty)
        self.assertEqual(code, constants.HTTP_BAD_REQUEST)
        self.assertIn('name missing', body)

    def test_noneName(self):
        # A None name must be reported as missing as well.
        req_none = models.ScenarioCreateRequest(None)
        (code, body) = self.create(req_none)
        self.assertEqual(code, constants.HTTP_BAD_REQUEST)
        self.assertIn('name missing', body)

    def test_success(self):
        # First creation of the default request succeeds.
        (code, body) = self.create_d()
        self.assertEqual(code, constants.HTTP_OK)
        self.assert_create_body(body)

    def test_alreadyExist(self):
        # Seed the scenario first: a single create_d() is exactly what
        # test_success expects to return HTTP_OK, so the FORBIDDEN answer
        # can only come from a second, duplicate creation.
        self.create_d()
        (code, body) = self.create_d()
        self.assertEqual(code, constants.HTTP_FORBIDDEN)
        self.assertIn('already exists', body)
# Read/query tests: two scenarios (req_d and req_2) are created up front and
# then fetched by name or through query strings.
89 class TestScenarioGet(TestScenarioBase):
# NOTE(review): the enclosing `def` line is not visible in this extract;
# presumably `def setUp(self):`, given the super() call -- confirm.
91 super(TestScenarioGet, self).setUp()
# Names of the stored scenarios (last href segment returned by create).
92 self.scenario_1 = self.create_return_name(self.req_d)
93 self.scenario_2 = self.create_return_name(self.req_2)
# Fetch the first scenario by name and validate it against req_d.
95 def test_getByName(self):
96 self._get_and_assert(self.scenario_1, self.req_d)
# No query at all: both stored scenarios are expected back.
98 def test_getAll(self):
99 self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
# Filter by name: only req_d is expected.
101 def test_queryName(self):
102 query = self._set_query('name=nosdn-nofeature-ha')
103 self._query_and_assert(query, reqs=[self.req_d])
# Filter by installer: only req_d is expected.
105 def test_queryInstaller(self):
106 query = self._set_query('installer=apex')
107 self._query_and_assert(query, reqs=[self.req_d])
# Filter by version: only req_d is expected.
109 def test_queryVersion(self):
110 query = self._set_query('version=master')
111 self._query_and_assert(query, reqs=[self.req_d])
# Filter by project: both scenarios are expected.
113 def test_queryProject(self):
114 query = self._set_query('project=functest')
115 self._query_and_assert(query, reqs=[self.req_d, self.req_2])
# Several filters combined: only req_d is expected.
# NOTE(review): the remaining _set_query arguments (embedded lines 119-122)
# are not visible in this extract.
117 def test_queryCombination(self):
118 query = self._set_query('name=nosdn-nofeature-ha',
123 self._query_and_assert(query, reqs=[self.req_d])
# Run a list query and validate the reply: status HTTP_OK, result count, and
# each returned scenario whose name matches a request is checked by assert_res.
# NOTE(review): the lines that branch on `found` (the empty-result check vs.
# the len(reqs) check are presumably alternatives) and the loop that binds
# `req` are not visible in this extract -- confirm against the full file.
125 def _query_and_assert(self, query, found=True, reqs=None):
126 code, body = self.query(query)
128 self.assertEqual(code, constants.HTTP_OK)
129 self.assertEqual(0, len(body.scenarios))
131 self.assertEqual(len(reqs), len(body.scenarios))
133 for scenario in body.scenarios:
134 if req['name'] == scenario.name:
135 self.assert_res(code, scenario, req)
# Update tests. The test methods below take a `scenario` argument and return
# an `(update request, expected scenario)` pair, so they are meant to run
# through the `_execute` / `_update` decorators defined in this class.
# NOTE(review): this listing is a lossy extract; only two decorator lines are
# visible and most ScenarioUpdateRequest calls have keyword arguments elided.
138 class TestScenarioUpdate(TestScenarioBase):
# NOTE(review): the enclosing `def` line is not visible; presumably
# `def setUp(self):`, given the super() call -- confirm.
140 super(TestScenarioUpdate, self).setUp()
# Names of the two scenarios stored before every test; updates are sent to
# self.scenario.
141 self.scenario = self.create_return_name(self.req_d)
142 self.scenario_2 = self.create_return_name(self.req_2)
# Decorator: calls the wrapped method with a deep copy of req_d (so the shared
# payload is never mutated) and passes the returned pair to _update_and_assert.
# NOTE(review): the inner wrapper's `def` and the `return` lines are elided.
144 def _execute(set_update):
145 @functools.wraps(set_update)
147 update, scenario = set_update(self, deepcopy(self.req_d))
148 self._update_and_assert(update, scenario)
# Decorator factory: `expected` is the name of a checker method on the test
# case (e.g. '_forbidden', '_bad_request'); after sending the update, that
# method is looked up with getattr and called with (status code, scenario).
# NOTE(review): the inner wrapper's `def` and the `return` lines are elided.
151 def _update(expected):
152 def _update(set_update):
153 @functools.wraps(set_update)
155 update, scenario = set_update(self, deepcopy(self.req_d))
156 code, body = self.update(update, self.scenario)
157 getattr(self, expected)(code, scenario)
# Rename the scenario; the expected document carries the new name.
162 def test_renameScenario(self, scenario):
163 new_name = 'nosdn-nofeature-noha'
164 scenario['name'] = new_name
165 update_req = models.ScenarioUpdateRequest(field='name',
168 term={'name': new_name})
169 return update_req, scenario
# Renaming to the name of the second stored scenario must be refused.
171 @_update('_forbidden')
172 def test_renameScenario_exist(self, scenario):
173 new_name = self.scenario_2
174 scenario['name'] = new_name
175 update_req = models.ScenarioUpdateRequest(field='name',
178 term={'name': new_name})
179 return update_req, scenario
# A rename request expected to be answered with a bad-request status.
# NOTE(review): the rest of the request arguments is not visible here.
181 @_update('_bad_request')
182 def test_renameScenario_noName(self, scenario):
183 new_name = self.scenario_2
184 scenario['name'] = new_name
185 update_req = models.ScenarioUpdateRequest(field='name',
189 return update_req, scenario
# Add a 'daisy' installer with no versions to the expected document.
192 def test_addInstaller(self, scenario):
193 add = models.ScenarioInstaller(installer='daisy', versions=list())
194 scenario['installers'].append(add.format())
195 update = models.ScenarioUpdateRequest(field='installer',
199 return update, scenario
# Remove the 'apex' installer from the expected document.
# NOTE(review): relies on Python 2 `filter` returning a list; on Python 3 it
# returns a lazy iterator.
202 def test_deleteInstaller(self, scenario):
203 scenario['installers'] = filter(lambda f: f['installer'] != 'apex',
204 scenario['installers'])
206 update = models.ScenarioUpdateRequest(field='installer',
208 locate={'installer': 'apex'})
209 return update, scenario
# Add a 'danube' version (no projects) under the first installer.
212 def test_addVersion(self, scenario):
213 add = models.ScenarioVersion(version='danube', projects=list())
214 scenario['installers'][0]['versions'].append(add.format())
215 update = models.ScenarioUpdateRequest(field='version',
217 locate={'installer': 'apex'},
219 return update, scenario
# Remove the 'master' version from the first installer.
# NOTE(review): same Python 2 `filter` list semantics as above.
222 def test_deleteVersion(self, scenario):
223 scenario['installers'][0]['versions'] = filter(
224 lambda f: f['version'] != 'master',
225 scenario['installers'][0]['versions'])
227 update = models.ScenarioUpdateRequest(field='version',
229 locate={'installer': 'apex',
230 'version': 'master'})
231 return update, scenario
# Change the owner of the first installer's first version to 'lucy'.
234 def test_changeOwner(self, scenario):
235 scenario['installers'][0]['versions'][0]['owner'] = 'lucy'
237 update = models.ScenarioUpdateRequest(field='owner',
239 locate={'installer': 'apex',
240 'version': 'master'},
241 term={'owner': 'lucy'})
242 return update, scenario
# Add a 'qtip' project under the first installer's first version.
245 def test_addProject(self, scenario):
246 add = models.ScenarioProject(project='qtip').format()
247 scenario['installers'][0]['versions'][0]['projects'].append(add)
248 update = models.ScenarioUpdateRequest(field='project',
250 locate={'installer': 'apex',
251 'version': 'master'},
253 return update, scenario
# Remove the 'functest' project from the first installer's first version.
# NOTE(review): same Python 2 `filter` list semantics as above.
256 def test_deleteProject(self, scenario):
257 scenario['installers'][0]['versions'][0]['projects'] = filter(
258 lambda f: f['project'] != 'functest',
259 scenario['installers'][0]['versions'][0]['projects'])
261 update = models.ScenarioUpdateRequest(field='project',
266 'project': 'functest'})
267 return update, scenario
# Add three customs to the 'functest' project; the expected list afterwards
# has four entries.
# NOTE(review): `filter(...)[0]` only works on Python 2 (a Python 3 filter
# object is not subscriptable); how `add` is passed on is not visible here.
270 def test_addCustoms(self, scenario):
271 add = ['odl', 'parser', 'vping_ssh']
272 projects = scenario['installers'][0]['versions'][0]['projects']
273 functest = filter(lambda f: f['project'] == 'functest', projects)[0]
274 functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh']
275 update = models.ScenarioUpdateRequest(field='customs',
280 'project': 'functest'},
282 return update, scenario
# Delete customs from the 'functest' project; only 'healthcheck' is expected
# to remain.
# NOTE(review): `filter(...)[0]` is Python 2 only, as above.
285 def test_deleteCustoms(self, scenario):
286 projects = scenario['installers'][0]['versions'][0]['projects']
287 functest = filter(lambda f: f['project'] == 'functest', projects)[0]
288 functest['customs'] = ['healthcheck']
289 update = models.ScenarioUpdateRequest(field='customs',
294 'project': 'functest'},
296 return update, scenario
# Append a score entry, dated now, to the 'functest' project.
# NOTE(review): `filter(...)[0]` is Python 2 only, as above.
299 def test_addScore(self, scenario):
300 add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
301 projects = scenario['installers'][0]['versions'][0]['projects']
302 functest = filter(lambda f: f['project'] == 'functest', projects)[0]
303 functest['scores'].append(add.format())
304 update = models.ScenarioUpdateRequest(field='score',
309 'project': 'functest'},
311 return update, scenario
# Append a trust-indicator entry, dated now, to the 'functest' project.
# NOTE(review): `filter(...)[0]` is Python 2 only, as above.
314 def test_addTi(self, scenario):
315 add = models.ScenarioTI(date=str(datetime.now()), status='gold')
316 projects = scenario['installers'][0]['versions'][0]['projects']
317 functest = filter(lambda f: f['project'] == 'functest', projects)[0]
318 functest['trust_indicators'].append(add.format())
319 update = models.ScenarioUpdateRequest(field='trust_indicator',
324 'project': 'functest'},
326 return update, scenario
# Send `update_req` to self.scenario, require HTTP_OK, then re-fetch the
# scenario (under `name`, or self.scenario when `name` is falsy).
# NOTE(review): the remaining arguments of the final call are not visible.
328 def _update_and_assert(self, update_req, new_scenario, name=None):
329 code, _ = self.update(update_req, self.scenario)
330 self.assertEqual(code, constants.HTTP_OK)
331 self._get_and_assert(_none_default(name, self.scenario),
# Checker for `_update`: the update succeeded and the stored document, fetched
# under the expected document's name, passes assert_res.
334 def _success(self, status, new_scenario):
335 self.assertEqual(status, constants.HTTP_OK)
336 self._get_and_assert(new_scenario.get('name'), new_scenario)
# Checker for `_update`: the update was refused with HTTP_FORBIDDEN.
338 def _forbidden(self, status, new_scenario):
339 self.assertEqual(status, constants.HTTP_FORBIDDEN)
# Checker for `_update`: the update was rejected with HTTP_BAD_REQUEST.
341 def _bad_request(self, status, new_scenario):
342 self.assertEqual(status, constants.HTTP_BAD_REQUEST)
class TestScenarioDelete(TestScenarioBase):
    """Deletion tests for the scenarios endpoint."""

    def test_notFound(self):
        # Deleting a name that was never created must answer 404.
        status, _unused_body = self.delete('notFound')
        self.assertEqual(status, constants.HTTP_NOT_FOUND)

    def test_success(self):
        # Store a scenario, remove it, then confirm it can no longer be read.
        name = self.create_return_name(self.req_d)

        delete_status, _ = self.delete(name)
        self.assertEqual(delete_status, constants.HTTP_OK)

        get_status, _ = self.get(name)
        self.assertEqual(get_status, constants.HTTP_NOT_FOUND)
def _none_default(check, default):
    """Return ``check`` when it is truthy, otherwise ``default``.

    Despite the name, every falsy value (``None``, ``''``, ``0``, an empty
    container) selects the fallback, not only ``None``.
    """
    return check or default