1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
11 from operator import itemgetter
13 from bson.objectid import ObjectId
14 from concurrent.futures import ThreadPoolExecutor
# Submit ``method(*args, **kwargs)`` to a short-lived ThreadPoolExecutor
# limited to two workers. Leaving the ``with`` block shuts the executor down,
# which waits for the submitted call to finish.
# ``result`` is the Future returned by ``executor.submit``.
# NOTE(review): no return statement is visible here, yet callers in this file
# return the value of thread_execute() -- presumably the Future (or its
# result) is handed back; confirm.
17 def thread_execute(method, *args, **kwargs):
18 with ThreadPoolExecutor(max_workers=2) as executor:
19 result = executor.submit(method, *args, **kwargs)
# In-memory cursor over a list of documents. It offers sort/limit/skip style
# narrowing and accessors that are dispatched through thread_execute.
# NOTE(review): presumably a test double for a MongoDB cursor (this module
# imports bson.ObjectId) -- confirm against the callers.
23 class MemCursor(object):
# Keeps a reference to the caller's list (no copy is made) and caches its
# length at construction time.
24 def __init__(self, collection):
25 self.collection = collection
26 self.length = len(self.collection)
# True while the cached length is non-zero. This reads self.length rather
# than len(self.collection), so it is only correct if every mutator keeps the
# cached value in step with the list -- TODO confirm.
29 def _is_next_exist(self):
30 return self.length != 0
# Evaluates _is_next_exist through thread_execute and returns what that gives.
34 return thread_execute(self._is_next_exist)
# list.pop() without an index removes and returns the LAST element, so
# documents are handed out starting from the end of the list.
36 def next_object(self):
38 return self.collection.pop()
# key_or_list must provide iteritems() (the Python 2 dict API) yielding
# (field, direction) pairs.
40 def sort(self, key_or_list):
41 for k, v in key_or_list.iteritems():
# Every pair re-sorts the whole list on that single field. sorted() is stable,
# so the pair iterated last ends up as the primary ordering. itemgetter(k)
# raises KeyError for a document that lacks field k.
# NOTE(review): `reverse` is presumably derived from the direction v -- confirm.
47 self.collection = sorted(self.collection,
48 key=itemgetter(k), reverse=reverse)
# Truncates to the first `limit` documents. A limit of 0, or one that is not
# smaller than the current list, leaves the list untouched.
51 def limit(self, limit):
52 if limit != 0 and limit < len(self.collection):
53 self.collection = self.collection[0: limit]
# Skip handling: `skip` is compared against the cached self.length.
# NOTE(review): the slice below keeps indices [length - skip, last), i.e. it
# always drops the final element and keeps at most skip - 1 documents. That
# does not obviously match "discard the first `skip` documents" -- verify.
58 if skip < self.length and (skip > 0):
59 self.collection = self.collection[self.length - skip: -1]
# A skip at or beyond the cached length is handled as its own case.
61 elif skip >= self.length:
# Evaluates self._count through thread_execute and returns what that gives.
70 return thread_execute(self._count)
# Constructor; takes the collection name.
# NOTE(review): other methods read self.name (compared with 'scenarios') and
# use self.contents as a list (append, indexing, del), so both are presumably
# initialised here -- confirm.
75 def __init__(self, name):
# Look up documents for a query dict or a bare id.
# A non-dict, non-None argument is treated as an id and wrapped as
# {"_id": value}. Any '_id' in the query is converted with str() before the
# query and the extra args are passed to self._find.
# Side effect: when the caller passes a dict containing '_id', that dict is
# modified in place (its '_id' becomes a string).
# NOTE(review): with the default spec_or_id=None the `'_id' in spec_or_id`
# test raises TypeError, because only the first `if` guards against None.
# NOTE(review): what is returned from `cursor` is not visible here -- confirm.
80 def _find_one(self, spec_or_id=None, *args):
81 if spec_or_id is not None and not isinstance(spec_or_id, dict):
82 spec_or_id = {"_id": spec_or_id}
83 if '_id' in spec_or_id:
84 spec_or_id['_id'] = str(spec_or_id['_id'])
85 cursor = self._find(spec_or_id, *args)
def find_one(self, spec_or_id=None, *args):
    """Run ``_find_one`` through ``thread_execute``.

    ``spec_or_id`` and any extra positional arguments are forwarded
    unchanged; the return value is whatever ``thread_execute`` returns.
    """
    forwarded = (spec_or_id,) + args
    return thread_execute(self._find_one, *forwarded)
# Insert one document (a dict) or several documents.
# Visible behaviour: a dict input is detected with isinstance; documents are
# passed to self._check_keys; a new '_id' is generated as str(ObjectId());
# a document is stored, and its id recorded in `ids`, only when
# self._find_one finds nothing for that id.
# NOTE(review): `docs`, `ids` and the loops binding `doc` are set up on lines
# not visible here; the conditions guarding key checking and id generation
# (presumably `check_keys` and a missing '_id') need confirming.
# NOTE(review): the append stores `doc_or_docs`, not the current `doc`. If a
# list of documents is passed, the whole list would be appended once per new
# document instead of each document -- verify; `doc` looks intended.
93 def _insert(self, doc_or_docs, check_keys=True):
97 if isinstance(docs, dict):
103 self._check_keys(doc)
108 doc['_id'] = str(ObjectId())
109 if not self._find_one(doc['_id']):
110 ids.append(doc['_id'])
111 self.contents.append(doc_or_docs)
def insert(self, doc_or_docs, check_keys=True):
    """Run ``_insert`` through ``thread_execute``.

    Both arguments are forwarded positionally and unchanged; the return
    value is whatever ``thread_execute`` returns.
    """
    worker = self._insert
    return thread_execute(worker, doc_or_docs, check_keys)
# Evaluate a date-range spec against `value`.
# Takes no `self`; it is invoked elsewhere in this file as
# MemDb._compare_date(spec, value).
# `spec` must provide iteritems() (Python 2 dict API) yielding
# (operator, bound) pairs. Only '$gte' and '$lt' are examined:
#   - the '$gte' branch fires when value is BELOW the bound, i.e. when the
#     lower-bound condition is violated;
#   - the '$lt' branch fires when value is below the bound, i.e. when the
#     upper-bound condition is satisfied.
# NOTE(review): what each branch records and what is returned is not visible
# here -- confirm.
124 def _compare_date(spec, value):
127 for k, v in spec.iteritems():
128 if k == '$gte' and value < v:
130 elif k == '$lt' and value < v:
def _in(self, content, *args):
    """Dispatch the match test for ``content`` by collection name.

    The ``'scenarios'`` collection is matched with ``_in_scenarios``; every
    other collection with ``_in_others``. ``args`` are forwarded untouched
    and the chosen matcher's result is returned as is.
    """
    if self.name == 'scenarios':
        matcher = self._in_scenarios
    else:
        matcher = self._in_others
    return matcher(content, *args)
# Scan content['installers'] for an entry whose 'installer' value equals
# `installer`.
# Requires content to have an 'installers' key and every entry to have an
# 'installer' key; a missing key raises KeyError.
# NOTE(review): the action taken on a match and the fall-through result are
# not visible here (presumably a boolean) -- confirm.
140 def _in_scenarios_installer(self, installer, content):
142 for s_installer in content['installers']:
143 if installer == s_installer['installer']:
# Scan every version of every installer in `content` for one whose 'version'
# value equals `version`.
# Walks content['installers'][*]['versions'][*]; a missing 'installers',
# 'versions' or 'version' key raises KeyError.
# NOTE(review): the action taken on a match and the fall-through result are
# not visible here (presumably a boolean) -- confirm.
148 def _in_scenarios_version(self, version, content):
150 for s_installer in content['installers']:
151 for s_version in s_installer['versions']:
152 if version == s_version['version']:
# Scan every project of every version of every installer in `content` for
# one whose 'project' value equals `project`.
# Walks content['installers'][*]['versions'][*]['projects'][*]; any missing
# key along that path raises KeyError.
# NOTE(review): the action taken on a match and the fall-through result are
# not visible here (presumably a boolean) -- confirm.
156 def _in_scenarios_project(self, project, content):
158 for s_installer in content['installers']:
159 for s_version in s_installer['versions']:
160 for s_project in s_version['projects']:
161 if project == s_project['project']:
# Match a scenario document against query dict(s).
# For each (key, value) pair of a query dict `arg`:
#   - key 'installers': value must be a dict; each of its values is itself a
#     dict whose keys select a nested matcher:
#       'installer'                 -> _in_scenarios_installer
#       'versions.version'          -> _in_scenarios_version
#       'versions.projects.project' -> _in_scenarios_project
#     The first recognised nested key RETURNS that matcher's result
#     immediately, so no further query keys are examined after it.
#   - any other key: compared by equality with content.get(key, None).
# Uses iteritems(), i.e. the Python 2 dict API.
# NOTE(review): `arg` is presumably bound by a loop over `args`; the second
# argument of each nested matcher call (presumably `content`), the result of
# a failed equality test and the final return value are not visible here --
# confirm.
166 def _in_scenarios(self, content, *args):
168 for k, v in arg.iteritems():
169 if k == 'installers':
170 for inner in v.values():
171 for i_k, i_v in inner.iteritems():
172 if i_k == 'installer':
173 return self._in_scenarios_installer(i_v,
175 elif i_k == 'versions.version':
176 return self._in_scenarios_version(i_v,
178 elif i_k == 'versions.projects.project':
179 return self._in_scenarios_project(i_v,
181 elif content.get(k, None) != v:
# Match a document from any non-scenario collection against query dict(s).
# For each (key, value) pair of a query dict `arg`:
#   - 'start_date': value is a range spec checked with MemDb._compare_date
#     against content.get('start_date').
#   - 'trust_indicator.current': compared with
#     content['trust_indicator']['current'].
#   - any other key with a non-dict value: a compiled regex is applied with
#     .match() to the field; anything else is compared by equality.
# Uses iteritems(), i.e. the Python 2 dict API.
# NOTE(review): content.get('trust_indicator') returns None when the field is
# absent, so the chained .get('current') would raise AttributeError.
# NOTE(review): v.match(content.get(k, None)) raises TypeError when the field
# is absent, because match() is given None.
# NOTE(review): re._pattern_type is a private name that was removed in
# Python 3.7, and no `import re` is visible among the imports here -- confirm.
# NOTE(review): `arg` is presumably bound by a loop over `args`; the results
# of failed checks and the final return value are not visible -- confirm.
186 def _in_others(self, content, *args):
188 for k, v in arg.iteritems():
189 if k == 'start_date':
190 if not MemDb._compare_date(v, content.get(k)):
192 elif k == 'trust_indicator.current':
193 if content.get('trust_indicator').get('current') != v:
195 elif not isinstance(v, dict):
196 if isinstance(v, re._pattern_type):
197 if v.match(content.get(k, None)) is None:
200 if content.get(k, None) != v:
# Linear scan of self.contents, testing each stored document with
# self._in(content, *args).
# NOTE(review): how matches are collected and returned is not visible here.
# find() wraps the result in MemCursor, which calls len() on it, so it is
# presumably a list of the matching documents -- confirm.
204 def _find(self, *args):
206 for content in self.contents:
207 if self._in(content, *args):
def find(self, *args):
    """Return a ``MemCursor`` wrapping the result of ``_find(*args)``."""
    matches = self._find(*args)
    return MemCursor(matches)
# Emulate an aggregation over this collection in two passes over the
# stage dict(s):
#   1. a pass that produces `res`, which is then wrapped in a MemCursor;
#   2. a pass that applies cursor.sort / cursor.skip / cursor.limit with the
#      stage value, rebinding `cursor` to each call's return value (so those
#      methods are expected to return a cursor).
# Uses iteritems(), i.e. the Python 2 dict API.
# NOTE(review): the stage keys selecting each operation are not visible here
# (presumably '$match' for the first pass and '$sort' / '$skip' / '$limit'
# for the second); nor is how `arg` is bound or what is returned -- confirm.
214 def _aggregate(self, *args, **kwargs):
218 for k, v in arg.iteritems():
221 cursor = MemCursor(res)
223 for k, v in arg.iteritems():
225 cursor = cursor.sort(v)
227 cursor = cursor.skip(v)
229 cursor = cursor.limit(v)
def aggregate(self, *args, **kwargs):
    """Forward to ``_aggregate`` and return its result unchanged.

    Unlike the other public wrappers in this file, the call is made
    directly rather than through ``thread_execute``.
    """
    outcome = self._aggregate(*args, **kwargs)
    return outcome
# Update stored documents that match `spec` using the fields of `document`.
# `document` is passed to self._check_keys; the collection is then walked by
# index, and for each entry matching self._in(content, spec) the pairs of
# `document` are iterated and the entry is written back to the same slot.
# Uses iteritems(), i.e. the Python 2 dict API.
# NOTE(review): whether the key check is conditional on `check_keys`, what is
# done with each (k, v) pair (presumably copied into `content`), whether the
# loop stops at the first match, and the return value are not visible here
# -- confirm.
235 def _update(self, spec, document, check_keys=True):
239 self._check_keys(document)
241 for index in range(len(self.contents)):
242 content = self.contents[index]
243 if self._in(content, spec):
244 for k, v in document.iteritems():
247 self.contents[index] = content
def update(self, spec, document, check_keys=True):
    """Run ``_update`` through ``thread_execute``.

    All three arguments are forwarded positionally and unchanged; the
    return value is whatever ``thread_execute`` returns.
    """
    forwarded = (spec, document, check_keys)
    return thread_execute(self._update, *forwarded)
# Remove stored document(s) selected by a query dict or a bare id.
# A non-dict argument is treated as an id and wrapped as {'_id': value}.
# The collection is walked by index and an entry matching
# self._in(content, spec_or_id) is deleted in place.
# NOTE(review): the handling of spec_or_id=None is not visible here
# (presumably it clears the collection) -- confirm.
# NOTE(review): range(len(self.contents)) is computed once, so continuing the
# loop after a `del` would skip an entry and eventually raise IndexError;
# presumably the method returns right after the first deletion -- confirm.
# NOTE(review): unlike _find_one, a bare id is not converted with str() here,
# so a non-string id would not equal a stored string '_id' -- verify.
253 def _remove(self, spec_or_id=None):
254 if spec_or_id is None:
256 if not isinstance(spec_or_id, dict):
257 spec_or_id = {'_id': spec_or_id}
258 for index in range(len(self.contents)):
259 content = self.contents[index]
260 if self._in(content, spec_or_id):
261 del self.contents[index]
def remove(self, spec_or_id=None):
    """Run ``_remove`` through ``thread_execute``.

    ``spec_or_id`` is forwarded unchanged; the return value is whatever
    ``thread_execute`` returns.
    """
    worker = self._remove
    return thread_execute(worker, spec_or_id)
# Validate the keys of `doc`, recursing into values that are dicts.
# Raises NameError for a key that starts with '$', and (per its message) for
# a key that contains '.'.
# Only dict values are descended into; dicts nested inside lists are not
# inspected.
# NOTE(review): the condition guarding the first raise is not visible here
# (presumably a test for '.' in the key) -- confirm.
271 def _check_keys(self, doc):
272 for key in doc.keys():
274 raise NameError('key {} must not contain .'.format(key))
275 if key.startswith('$'):
276 raise NameError('key {} must not start with $'.format(key))
277 if isinstance(doc.get(key), dict):
278 self._check_keys(doc.get(key))
def __getattr__(name):
    """Return the module-level object called ``name``.

    Args:
        name: attribute name to look up in this module's globals.

    Returns:
        The object bound to ``name`` at module level.

    Raises:
        AttributeError: if no module-level object has that name.
    """
    try:
        return globals()[name]
    except KeyError:
        # As a module-level hook (PEP 562) this is only consulted for names
        # that are NOT in the module, and the attribute protocol --
        # getattr(module, name, default), hasattr() -- only recognises
        # AttributeError. Letting the KeyError escape would break both.
        raise AttributeError(
            'module {!r} has no attribute {!r}'.format(__name__, name))
# Module-level in-memory collections: one MemDb instance per collection,
# each bound to a variable with the same name as the collection it holds.
# The 'scenarios' name matters: MemDb._in switches to scenario-specific
# matching when the collection name is 'scenarios'.
286 projects = MemDb('projects')
287 testcases = MemDb('testcases')
288 results = MemDb('results')
289 scenarios = MemDb('scenarios')
290 tokens = MemDb('tokens')