bugfix: pagination crash due to memory limitation
[releng.git] / utils / test / testapi / opnfv_testapi / tests / unit / fake_pymongo.py
1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from bson.objectid import ObjectId
10 from concurrent.futures import ThreadPoolExecutor
11 from operator import itemgetter
12
13
14 def thread_execute(method, *args, **kwargs):
15         with ThreadPoolExecutor(max_workers=2) as executor:
16             result = executor.submit(method, *args, **kwargs)
17         return result
18
19
20 class MemCursor(object):
21     def __init__(self, collection):
22         self.collection = collection
23         self.length = len(self.collection)
24         self.sorted = []
25
26     def _is_next_exist(self):
27         return self.length != 0
28
29     @property
30     def fetch_next(self):
31         return thread_execute(self._is_next_exist)
32
33     def next_object(self):
34         self.length -= 1
35         return self.collection.pop()
36
37     def sort(self, key_or_list):
38         for k, v in key_or_list.iteritems():
39             if v == -1:
40                 reverse = True
41             else:
42                 reverse = False
43
44             self.collection = sorted(self.collection,
45                                      key=itemgetter(k), reverse=reverse)
46         return self
47
48     def limit(self, limit):
49         if limit != 0 and limit < len(self.collection):
50             self.collection = self.collection[0: limit]
51             self.length = limit
52         return self
53
54     def skip(self, skip):
55         if skip < self.length and (skip > 0):
56             self.collection = self.collection[self.length - skip: -1]
57             self.length -= skip
58         elif skip >= self.length:
59             self.collection = []
60             self.length = 0
61         return self
62
63     def _count(self):
64         return self.length
65
66     def count(self):
67         return thread_execute(self._count)
68
69
70 class MemDb(object):
71
72     def __init__(self, name):
73         self.name = name
74         self.contents = []
75         pass
76
77     def _find_one(self, spec_or_id=None, *args):
78         if spec_or_id is not None and not isinstance(spec_or_id, dict):
79             spec_or_id = {"_id": spec_or_id}
80         if '_id' in spec_or_id:
81             spec_or_id['_id'] = str(spec_or_id['_id'])
82         cursor = self._find(spec_or_id, *args)
83         for result in cursor:
84             return result
85         return None
86
87     def find_one(self, spec_or_id=None, *args):
88         return thread_execute(self._find_one, spec_or_id, *args)
89
90     def _insert(self, doc_or_docs, check_keys=True):
91
92         docs = doc_or_docs
93         return_one = False
94         if isinstance(docs, dict):
95             return_one = True
96             docs = [docs]
97
98         if check_keys:
99             for doc in docs:
100                 self._check_keys(doc)
101
102         ids = []
103         for doc in docs:
104             if '_id' not in doc:
105                 doc['_id'] = str(ObjectId())
106             if not self._find_one(doc['_id']):
107                 ids.append(doc['_id'])
108                 self.contents.append(doc_or_docs)
109
110         if len(ids) == 0:
111             return None
112         if return_one:
113             return ids[0]
114         else:
115             return ids
116
117     def insert(self, doc_or_docs, check_keys=True):
118         return thread_execute(self._insert, doc_or_docs, check_keys)
119
120     @staticmethod
121     def _compare_date(spec, value):
122         for k, v in spec.iteritems():
123             if k == '$gte' and value >= v:
124                 return True
125         return False
126
127     def _in(self, content, *args):
128         if self.name == 'scenarios':
129             return self._in_scenarios(content, *args)
130         else:
131             return self._in_others(content, *args)
132
133     def _in_scenarios_installer(self, installer, content):
134         hit = False
135         for s_installer in content['installers']:
136             if installer == s_installer['installer']:
137                 hit = True
138
139         return hit
140
141     def _in_scenarios_version(self, version, content):
142         hit = False
143         for s_installer in content['installers']:
144             for s_version in s_installer['versions']:
145                 if version == s_version['version']:
146                     hit = True
147         return hit
148
149     def _in_scenarios_project(self, project, content):
150         hit = False
151         for s_installer in content['installers']:
152             for s_version in s_installer['versions']:
153                 for s_project in s_version['projects']:
154                     if project == s_project['project']:
155                         hit = True
156
157         return hit
158
159     def _in_scenarios(self, content, *args):
160         for arg in args:
161             for k, v in arg.iteritems():
162                 if k == 'installers':
163                     for inner in v.values():
164                         for i_k, i_v in inner.iteritems():
165                             if i_k == 'installer':
166                                 return self._in_scenarios_installer(i_v,
167                                                                     content)
168                             elif i_k == 'versions.version':
169                                 return self._in_scenarios_version(i_v,
170                                                                   content)
171                             elif i_k == 'versions.projects.project':
172                                 return self._in_scenarios_project(i_v,
173                                                                   content)
174                 elif content.get(k, None) != v:
175                     return False
176
177         return True
178
179     def _in_others(self, content, *args):
180         for arg in args:
181             for k, v in arg.iteritems():
182                 if k == 'start_date':
183                     if not MemDb._compare_date(v, content.get(k)):
184                         return False
185                 elif k == 'trust_indicator.current':
186                     if content.get('trust_indicator').get('current') != v:
187                         return False
188                 elif content.get(k, None) != v:
189                     return False
190
191         return True
192
193     def _find(self, *args):
194         res = []
195         for content in self.contents:
196             if self._in(content, *args):
197                 res.append(content)
198
199         return res
200
201     def find(self, *args):
202         return MemCursor(self._find(*args))
203
204     def _aggregate(self, *args, **kwargs):
205         res = self.contents
206         print args
207         for arg in args[0]:
208             for k, v in arg.iteritems():
209                 if k == '$match':
210                     res = self._find(v)
211         cursor = MemCursor(res)
212         for arg in args[0]:
213             for k, v in arg.iteritems():
214                 if k == '$sort':
215                     cursor = cursor.sort(v)
216                 elif k == '$skip':
217                     cursor = cursor.skip(v)
218                 elif k == '$limit':
219                     cursor = cursor.limit(v)
220         return cursor
221
222     def aggregate(self, *args, **kwargs):
223         return self._aggregate(*args, **kwargs)
224
225     def _update(self, spec, document, check_keys=True):
226         updated = False
227
228         if check_keys:
229             self._check_keys(document)
230
231         for index in range(len(self.contents)):
232             content = self.contents[index]
233             if self._in(content, spec):
234                 for k, v in document.iteritems():
235                     updated = True
236                     content[k] = v
237             self.contents[index] = content
238         return updated
239
240     def update(self, spec, document, check_keys=True):
241         return thread_execute(self._update, spec, document, check_keys)
242
243     def _remove(self, spec_or_id=None):
244         if spec_or_id is None:
245             self.contents = []
246         if not isinstance(spec_or_id, dict):
247             spec_or_id = {'_id': spec_or_id}
248         for index in range(len(self.contents)):
249             content = self.contents[index]
250             if self._in(content, spec_or_id):
251                 del self.contents[index]
252                 return True
253         return False
254
255     def remove(self, spec_or_id=None):
256         return thread_execute(self._remove, spec_or_id)
257
258     def clear(self):
259         self._remove()
260
261     def _check_keys(self, doc):
262         for key in doc.keys():
263             if '.' in key:
264                 raise NameError('key {} must not contain .'.format(key))
265             if key.startswith('$'):
266                 raise NameError('key {} must not start with $'.format(key))
267             if isinstance(doc.get(key), dict):
268                 self._check_keys(doc.get(key))
269
270
271 def __getattr__(name):
272     return globals()[name]
273
274
275 pods = MemDb('pods')
276 projects = MemDb('projects')
277 testcases = MemDb('testcases')
278 results = MemDb('results')
279 scenarios = MemDb('scenarios')
280 tokens = MemDb('tokens')