Merge "[docker] Use image IDs instead of tags"
[releng.git] / utils / test / testapi / opnfv_testapi / tests / unit / fake_pymongo.py
1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from bson.objectid import ObjectId
10 from concurrent.futures import ThreadPoolExecutor
11 from operator import itemgetter
12
13
14 def thread_execute(method, *args, **kwargs):
15         with ThreadPoolExecutor(max_workers=2) as executor:
16             result = executor.submit(method, *args, **kwargs)
17         return result
18
19
20 class MemCursor(object):
21     def __init__(self, collection):
22         self.collection = collection
23         self.count = len(self.collection)
24         self.sorted = []
25
26     def _is_next_exist(self):
27         return self.count != 0
28
29     @property
30     def fetch_next(self):
31         return thread_execute(self._is_next_exist)
32
33     def next_object(self):
34         self.count -= 1
35         return self.collection.pop()
36
37     def sort(self, key_or_list):
38         key = key_or_list[0][0]
39         if key_or_list[0][1] == -1:
40             reverse = True
41         else:
42             reverse = False
43
44         if key_or_list is not None:
45             self.collection = sorted(self.collection,
46                                      key=itemgetter(key), reverse=reverse)
47         return self
48
49     def limit(self, limit):
50         if limit != 0 and limit < len(self.collection):
51             self.collection = self.collection[0:limit]
52             self.count = limit
53         return self
54
55
56 class MemDb(object):
57
58     def __init__(self, name):
59         self.name = name
60         self.contents = []
61         pass
62
63     def _find_one(self, spec_or_id=None, *args):
64         if spec_or_id is not None and not isinstance(spec_or_id, dict):
65             spec_or_id = {"_id": spec_or_id}
66         if '_id' in spec_or_id:
67             spec_or_id['_id'] = str(spec_or_id['_id'])
68         cursor = self._find(spec_or_id, *args)
69         for result in cursor:
70             return result
71         return None
72
73     def find_one(self, spec_or_id=None, *args):
74         return thread_execute(self._find_one, spec_or_id, *args)
75
76     def _insert(self, doc_or_docs, check_keys=True):
77
78         docs = doc_or_docs
79         return_one = False
80         if isinstance(docs, dict):
81             return_one = True
82             docs = [docs]
83
84         if check_keys:
85             for doc in docs:
86                 self._check_keys(doc)
87
88         ids = []
89         for doc in docs:
90             if '_id' not in doc:
91                 doc['_id'] = str(ObjectId())
92             if not self._find_one(doc['_id']):
93                 ids.append(doc['_id'])
94                 self.contents.append(doc_or_docs)
95
96         if len(ids) == 0:
97             return None
98         if return_one:
99             return ids[0]
100         else:
101             return ids
102
103     def insert(self, doc_or_docs, check_keys=True):
104         return thread_execute(self._insert, doc_or_docs, check_keys)
105
106     @staticmethod
107     def _compare_date(spec, value):
108         for k, v in spec.iteritems():
109             if k == '$gte' and value >= v:
110                 return True
111         return False
112
113     def _in(self, content, *args):
114         if self.name == 'scenarios':
115             return self._in_scenarios(content, *args)
116         else:
117             return self._in_others(content, *args)
118
119     def _in_scenarios_installer(self, installer, content):
120         hit = False
121         for s_installer in content['installers']:
122             if installer == s_installer['installer']:
123                 hit = True
124
125         return hit
126
127     def _in_scenarios_version(self, version, content):
128         hit = False
129         for s_installer in content['installers']:
130             for s_version in s_installer['versions']:
131                 if version == s_version['version']:
132                     hit = True
133         return hit
134
135     def _in_scenarios_project(self, project, content):
136         hit = False
137         for s_installer in content['installers']:
138             for s_version in s_installer['versions']:
139                 for s_project in s_version['projects']:
140                     if project == s_project['project']:
141                         hit = True
142
143         return hit
144
145     def _in_scenarios(self, content, *args):
146         for arg in args:
147             for k, v in arg.iteritems():
148                 if k == 'installers':
149                     for inner in v.values():
150                         for i_k, i_v in inner.iteritems():
151                             if i_k == 'installer':
152                                 return self._in_scenarios_installer(i_v,
153                                                                     content)
154                             elif i_k == 'versions.version':
155                                 return self._in_scenarios_version(i_v,
156                                                                   content)
157                             elif i_k == 'versions.projects.project':
158                                 return self._in_scenarios_project(i_v,
159                                                                   content)
160                 elif content.get(k, None) != v:
161                     return False
162
163         return True
164
165     def _in_others(self, content, *args):
166         for arg in args:
167             for k, v in arg.iteritems():
168                 if k == 'start_date':
169                     if not MemDb._compare_date(v, content.get(k)):
170                         return False
171                 elif k == 'trust_indicator.current':
172                     if content.get('trust_indicator').get('current') != v:
173                         return False
174                 elif content.get(k, None) != v:
175                     return False
176
177         return True
178
179     def _find(self, *args):
180         res = []
181         for content in self.contents:
182             if self._in(content, *args):
183                 res.append(content)
184
185         return res
186
187     def find(self, *args):
188         return MemCursor(self._find(*args))
189
190     def _update(self, spec, document, check_keys=True):
191         updated = False
192
193         if check_keys:
194             self._check_keys(document)
195
196         for index in range(len(self.contents)):
197             content = self.contents[index]
198             if self._in(content, spec):
199                 for k, v in document.iteritems():
200                     updated = True
201                     content[k] = v
202             self.contents[index] = content
203         return updated
204
205     def update(self, spec, document, check_keys=True):
206         return thread_execute(self._update, spec, document, check_keys)
207
208     def _remove(self, spec_or_id=None):
209         if spec_or_id is None:
210             self.contents = []
211         if not isinstance(spec_or_id, dict):
212             spec_or_id = {'_id': spec_or_id}
213         for index in range(len(self.contents)):
214             content = self.contents[index]
215             if self._in(content, spec_or_id):
216                 del self.contents[index]
217                 return True
218         return False
219
220     def remove(self, spec_or_id=None):
221         return thread_execute(self._remove, spec_or_id)
222
223     def clear(self):
224         self._remove()
225
226     def _check_keys(self, doc):
227         for key in doc.keys():
228             if '.' in key:
229                 raise NameError('key {} must not contain .'.format(key))
230             if key.startswith('$'):
231                 raise NameError('key {} must not start with $'.format(key))
232             if isinstance(doc.get(key), dict):
233                 self._check_keys(doc.get(key))
234
235
236 def __getattr__(name):
237     return globals()[name]
238
239
240 pods = MemDb('pods')
241 projects = MemDb('projects')
242 testcases = MemDb('testcases')
243 results = MemDb('results')
244 scenarios = MemDb('scenarios')
245 tokens = MemDb('tokens')