Merge "Apex: updates for post-danube"
[releng.git] / utils / test / testapi / opnfv_testapi / tests / unit / fake_pymongo.py
1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from bson.objectid import ObjectId
10 from concurrent.futures import ThreadPoolExecutor
11 from operator import itemgetter
12
13
14 def thread_execute(method, *args, **kwargs):
15         with ThreadPoolExecutor(max_workers=2) as executor:
16             result = executor.submit(method, *args, **kwargs)
17         return result
18
19
20 class MemCursor(object):
21     def __init__(self, collection):
22         self.collection = collection
23         self.length = len(self.collection)
24         self.sorted = []
25
26     def _is_next_exist(self):
27         return self.length != 0
28
29     @property
30     def fetch_next(self):
31         return thread_execute(self._is_next_exist)
32
33     def next_object(self):
34         self.length -= 1
35         return self.collection.pop()
36
37     def sort(self, key_or_list):
38         for k, v in key_or_list.iteritems():
39             if v == -1:
40                 reverse = True
41             else:
42                 reverse = False
43
44             self.collection = sorted(self.collection,
45                                      key=itemgetter(k), reverse=reverse)
46         return self
47
48     def limit(self, limit):
49         if limit != 0 and limit < len(self.collection):
50             self.collection = self.collection[0: limit]
51             self.length = limit
52         return self
53
54     def skip(self, skip):
55         if skip < self.length and (skip > 0):
56             self.collection = self.collection[self.length - skip: -1]
57             self.length -= skip
58         elif skip >= self.length:
59             self.collection = []
60             self.length = 0
61         return self
62
63     def _count(self):
64         return self.length
65
66     def count(self):
67         return thread_execute(self._count)
68
69
70 class MemDb(object):
71
72     def __init__(self, name):
73         self.name = name
74         self.contents = []
75         pass
76
77     def _find_one(self, spec_or_id=None, *args):
78         if spec_or_id is not None and not isinstance(spec_or_id, dict):
79             spec_or_id = {"_id": spec_or_id}
80         if '_id' in spec_or_id:
81             spec_or_id['_id'] = str(spec_or_id['_id'])
82         cursor = self._find(spec_or_id, *args)
83         for result in cursor:
84             return result
85         return None
86
87     def find_one(self, spec_or_id=None, *args):
88         return thread_execute(self._find_one, spec_or_id, *args)
89
90     def _insert(self, doc_or_docs, check_keys=True):
91
92         docs = doc_or_docs
93         return_one = False
94         if isinstance(docs, dict):
95             return_one = True
96             docs = [docs]
97
98         if check_keys:
99             for doc in docs:
100                 self._check_keys(doc)
101
102         ids = []
103         for doc in docs:
104             if '_id' not in doc:
105                 doc['_id'] = str(ObjectId())
106             if not self._find_one(doc['_id']):
107                 ids.append(doc['_id'])
108                 self.contents.append(doc_or_docs)
109
110         if len(ids) == 0:
111             return None
112         if return_one:
113             return ids[0]
114         else:
115             return ids
116
117     def insert(self, doc_or_docs, check_keys=True):
118         return thread_execute(self._insert, doc_or_docs, check_keys)
119
120     @staticmethod
121     def _compare_date(spec, value):
122         gte = True
123         lt = False
124         for k, v in spec.iteritems():
125             if k == '$gte' and value < v:
126                 gte = False
127             elif k == '$lt' and value < v:
128                 lt = True
129         return gte and lt
130
131     def _in(self, content, *args):
132         if self.name == 'scenarios':
133             return self._in_scenarios(content, *args)
134         else:
135             return self._in_others(content, *args)
136
137     def _in_scenarios_installer(self, installer, content):
138         hit = False
139         for s_installer in content['installers']:
140             if installer == s_installer['installer']:
141                 hit = True
142
143         return hit
144
145     def _in_scenarios_version(self, version, content):
146         hit = False
147         for s_installer in content['installers']:
148             for s_version in s_installer['versions']:
149                 if version == s_version['version']:
150                     hit = True
151         return hit
152
153     def _in_scenarios_project(self, project, content):
154         hit = False
155         for s_installer in content['installers']:
156             for s_version in s_installer['versions']:
157                 for s_project in s_version['projects']:
158                     if project == s_project['project']:
159                         hit = True
160
161         return hit
162
163     def _in_scenarios(self, content, *args):
164         for arg in args:
165             for k, v in arg.iteritems():
166                 if k == 'installers':
167                     for inner in v.values():
168                         for i_k, i_v in inner.iteritems():
169                             if i_k == 'installer':
170                                 return self._in_scenarios_installer(i_v,
171                                                                     content)
172                             elif i_k == 'versions.version':
173                                 return self._in_scenarios_version(i_v,
174                                                                   content)
175                             elif i_k == 'versions.projects.project':
176                                 return self._in_scenarios_project(i_v,
177                                                                   content)
178                 elif content.get(k, None) != v:
179                     return False
180
181         return True
182
183     def _in_others(self, content, *args):
184         for arg in args:
185             for k, v in arg.iteritems():
186                 if k == 'start_date':
187                     if not MemDb._compare_date(v, content.get(k)):
188                         return False
189                 elif k == 'trust_indicator.current':
190                     if content.get('trust_indicator').get('current') != v:
191                         return False
192                 elif content.get(k, None) != v:
193                     return False
194
195         return True
196
197     def _find(self, *args):
198         res = []
199         for content in self.contents:
200             if self._in(content, *args):
201                 res.append(content)
202
203         return res
204
205     def find(self, *args):
206         return MemCursor(self._find(*args))
207
208     def _aggregate(self, *args, **kwargs):
209         res = self.contents
210         print args
211         for arg in args[0]:
212             for k, v in arg.iteritems():
213                 if k == '$match':
214                     res = self._find(v)
215         cursor = MemCursor(res)
216         for arg in args[0]:
217             for k, v in arg.iteritems():
218                 if k == '$sort':
219                     cursor = cursor.sort(v)
220                 elif k == '$skip':
221                     cursor = cursor.skip(v)
222                 elif k == '$limit':
223                     cursor = cursor.limit(v)
224         return cursor
225
226     def aggregate(self, *args, **kwargs):
227         return self._aggregate(*args, **kwargs)
228
229     def _update(self, spec, document, check_keys=True):
230         updated = False
231
232         if check_keys:
233             self._check_keys(document)
234
235         for index in range(len(self.contents)):
236             content = self.contents[index]
237             if self._in(content, spec):
238                 for k, v in document.iteritems():
239                     updated = True
240                     content[k] = v
241             self.contents[index] = content
242         return updated
243
244     def update(self, spec, document, check_keys=True):
245         return thread_execute(self._update, spec, document, check_keys)
246
247     def _remove(self, spec_or_id=None):
248         if spec_or_id is None:
249             self.contents = []
250         if not isinstance(spec_or_id, dict):
251             spec_or_id = {'_id': spec_or_id}
252         for index in range(len(self.contents)):
253             content = self.contents[index]
254             if self._in(content, spec_or_id):
255                 del self.contents[index]
256                 return True
257         return False
258
259     def remove(self, spec_or_id=None):
260         return thread_execute(self._remove, spec_or_id)
261
262     def clear(self):
263         self._remove()
264
265     def _check_keys(self, doc):
266         for key in doc.keys():
267             if '.' in key:
268                 raise NameError('key {} must not contain .'.format(key))
269             if key.startswith('$'):
270                 raise NameError('key {} must not start with $'.format(key))
271             if isinstance(doc.get(key), dict):
272                 self._check_keys(doc.get(key))
273
274
275 def __getattr__(name):
276     return globals()[name]
277
278
279 pods = MemDb('pods')
280 projects = MemDb('projects')
281 testcases = MemDb('testcases')
282 results = MemDb('results')
283 scenarios = MemDb('scenarios')
284 tokens = MemDb('tokens')