Merge "update Readme.rst to be consistent with current implementation"
[releng.git] / utils / test / testapi / opnfv_testapi / tests / unit / fake_pymongo.py
1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from operator import itemgetter
10
11 from bson.objectid import ObjectId
12 from concurrent.futures import ThreadPoolExecutor
13
14
15 def thread_execute(method, *args, **kwargs):
16         with ThreadPoolExecutor(max_workers=2) as executor:
17             result = executor.submit(method, *args, **kwargs)
18         return result
19
20
21 class MemCursor(object):
22     def __init__(self, collection):
23         self.collection = collection
24         self.length = len(self.collection)
25         self.sorted = []
26
27     def _is_next_exist(self):
28         return self.length != 0
29
30     @property
31     def fetch_next(self):
32         return thread_execute(self._is_next_exist)
33
34     def next_object(self):
35         self.length -= 1
36         return self.collection.pop()
37
38     def sort(self, key_or_list):
39         for k, v in key_or_list.iteritems():
40             if v == -1:
41                 reverse = True
42             else:
43                 reverse = False
44
45             self.collection = sorted(self.collection,
46                                      key=itemgetter(k), reverse=reverse)
47         return self
48
49     def limit(self, limit):
50         if limit != 0 and limit < len(self.collection):
51             self.collection = self.collection[0: limit]
52             self.length = limit
53         return self
54
55     def skip(self, skip):
56         if skip < self.length and (skip > 0):
57             self.collection = self.collection[self.length - skip: -1]
58             self.length -= skip
59         elif skip >= self.length:
60             self.collection = []
61             self.length = 0
62         return self
63
64     def _count(self):
65         return self.length
66
67     def count(self):
68         return thread_execute(self._count)
69
70
71 class MemDb(object):
72
73     def __init__(self, name):
74         self.name = name
75         self.contents = []
76         pass
77
78     def _find_one(self, spec_or_id=None, *args):
79         if spec_or_id is not None and not isinstance(spec_or_id, dict):
80             spec_or_id = {"_id": spec_or_id}
81         if '_id' in spec_or_id:
82             spec_or_id['_id'] = str(spec_or_id['_id'])
83         cursor = self._find(spec_or_id, *args)
84         for result in cursor:
85             return result
86         return None
87
88     def find_one(self, spec_or_id=None, *args):
89         return thread_execute(self._find_one, spec_or_id, *args)
90
91     def _insert(self, doc_or_docs, check_keys=True):
92
93         docs = doc_or_docs
94         return_one = False
95         if isinstance(docs, dict):
96             return_one = True
97             docs = [docs]
98
99         if check_keys:
100             for doc in docs:
101                 self._check_keys(doc)
102
103         ids = []
104         for doc in docs:
105             if '_id' not in doc:
106                 doc['_id'] = str(ObjectId())
107             if not self._find_one(doc['_id']):
108                 ids.append(doc['_id'])
109                 self.contents.append(doc_or_docs)
110
111         if len(ids) == 0:
112             return None
113         if return_one:
114             return ids[0]
115         else:
116             return ids
117
118     def insert(self, doc_or_docs, check_keys=True):
119         return thread_execute(self._insert, doc_or_docs, check_keys)
120
121     @staticmethod
122     def _compare_date(spec, value):
123         gte = True
124         lt = False
125         for k, v in spec.iteritems():
126             if k == '$gte' and value < v:
127                 gte = False
128             elif k == '$lt' and value < v:
129                 lt = True
130         return gte and lt
131
132     def _in(self, content, *args):
133         if self.name == 'scenarios':
134             return self._in_scenarios(content, *args)
135         else:
136             return self._in_others(content, *args)
137
138     def _in_scenarios_installer(self, installer, content):
139         hit = False
140         for s_installer in content['installers']:
141             if installer == s_installer['installer']:
142                 hit = True
143
144         return hit
145
146     def _in_scenarios_version(self, version, content):
147         hit = False
148         for s_installer in content['installers']:
149             for s_version in s_installer['versions']:
150                 if version == s_version['version']:
151                     hit = True
152         return hit
153
154     def _in_scenarios_project(self, project, content):
155         hit = False
156         for s_installer in content['installers']:
157             for s_version in s_installer['versions']:
158                 for s_project in s_version['projects']:
159                     if project == s_project['project']:
160                         hit = True
161
162         return hit
163
164     def _in_scenarios(self, content, *args):
165         for arg in args:
166             for k, v in arg.iteritems():
167                 if k == 'installers':
168                     for inner in v.values():
169                         for i_k, i_v in inner.iteritems():
170                             if i_k == 'installer':
171                                 return self._in_scenarios_installer(i_v,
172                                                                     content)
173                             elif i_k == 'versions.version':
174                                 return self._in_scenarios_version(i_v,
175                                                                   content)
176                             elif i_k == 'versions.projects.project':
177                                 return self._in_scenarios_project(i_v,
178                                                                   content)
179                 elif content.get(k, None) != v:
180                     return False
181
182         return True
183
184     def _in_others(self, content, *args):
185         for arg in args:
186             for k, v in arg.iteritems():
187                 if k == 'start_date':
188                     if not MemDb._compare_date(v, content.get(k)):
189                         return False
190                 elif k == 'trust_indicator.current':
191                     if content.get('trust_indicator').get('current') != v:
192                         return False
193                 elif not isinstance(v, dict) and content.get(k, None) != v:
194                     return False
195         return True
196
197     def _find(self, *args):
198         res = []
199         for content in self.contents:
200             if self._in(content, *args):
201                 res.append(content)
202
203         return res
204
205     def find(self, *args):
206         return MemCursor(self._find(*args))
207
208     def _aggregate(self, *args, **kwargs):
209         res = self.contents
210         print args
211         for arg in args[0]:
212             for k, v in arg.iteritems():
213                 if k == '$match':
214                     res = self._find(v)
215         cursor = MemCursor(res)
216         for arg in args[0]:
217             for k, v in arg.iteritems():
218                 if k == '$sort':
219                     cursor = cursor.sort(v)
220                 elif k == '$skip':
221                     cursor = cursor.skip(v)
222                 elif k == '$limit':
223                     cursor = cursor.limit(v)
224         return cursor
225
226     def aggregate(self, *args, **kwargs):
227         return self._aggregate(*args, **kwargs)
228
229     def _update(self, spec, document, check_keys=True):
230         updated = False
231
232         if check_keys:
233             self._check_keys(document)
234
235         for index in range(len(self.contents)):
236             content = self.contents[index]
237             if self._in(content, spec):
238                 for k, v in document.iteritems():
239                     updated = True
240                     content[k] = v
241             self.contents[index] = content
242         return updated
243
244     def update(self, spec, document, check_keys=True):
245         return thread_execute(self._update, spec, document, check_keys)
246
247     def _remove(self, spec_or_id=None):
248         if spec_or_id is None:
249             self.contents = []
250         if not isinstance(spec_or_id, dict):
251             spec_or_id = {'_id': spec_or_id}
252         for index in range(len(self.contents)):
253             content = self.contents[index]
254             if self._in(content, spec_or_id):
255                 del self.contents[index]
256                 return True
257         return False
258
259     def remove(self, spec_or_id=None):
260         return thread_execute(self._remove, spec_or_id)
261
262     def clear(self):
263         self._remove()
264
265     def _check_keys(self, doc):
266         for key in doc.keys():
267             if '.' in key:
268                 raise NameError('key {} must not contain .'.format(key))
269             if key.startswith('$'):
270                 raise NameError('key {} must not start with $'.format(key))
271             if isinstance(doc.get(key), dict):
272                 self._check_keys(doc.get(key))
273
274
275 def __getattr__(name):
276     return globals()[name]
277
278
279 pods = MemDb('pods')
280 projects = MemDb('projects')
281 testcases = MemDb('testcases')
282 results = MemDb('results')
283 scenarios = MemDb('scenarios')
284 tokens = MemDb('tokens')