1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from bson.objectid import ObjectId
10 from concurrent.futures import ThreadPoolExecutor
11 from operator import itemgetter
# Run method(*args, **kwargs) on a short-lived thread pool of two workers.
# executor.submit() hands back a concurrent.futures.Future bound to `result`.
# NOTE(review): no `return` statement is visible in this excerpt, yet callers
# in this file do `return thread_execute(...)` -- confirm that `result` is
# returned to the caller.
14 def thread_execute(method, *args, **kwargs):
15 with ThreadPoolExecutor(max_workers=2) as executor:
16 result = executor.submit(method, *args, **kwargs)
# Cursor-like wrapper around an in-memory collection. The collection must
# support len(), pop(), slicing and sorted(); the number of items is cached
# in self.length.
20 class MemCursor(object):
# Keep a reference to `collection` and cache its length at construction time.
21 def __init__(self, collection):
22 self.collection = collection
23 self.length = len(self.collection)
# True while the cached length is non-zero.
26 def _is_next_exist(self):
27 return self.length != 0
# Thread-pool wrapper around _is_next_exist().
# NOTE(review): the `def` line owning this statement is not visible here.
31 return thread_execute(self._is_next_exist)
# Remove and return the LAST element of the collection (pop() with no index).
33 def next_object(self):
35 return self.collection.pop()
# Re-sort the collection once for every (field, direction) pair in
# key_or_list, using the field as the sort key.
# NOTE(review): dict.iteritems() exists only on Python 2, and `reverse` is
# assigned on lines that are not visible in this excerpt.
37 def sort(self, key_or_list):
38 for k, v in key_or_list.iteritems():
44 self.collection = sorted(self.collection,
45 key=itemgetter(k), reverse=reverse)
# Keep only the first `limit` items. A limit of 0, or one that is not smaller
# than the current collection size, leaves the collection untouched.
48 def limit(self, limit):
49 if limit != 0 and limit < len(self.collection):
50 self.collection = self.collection[0: limit]
# Skip handling (the owning `def` line is not visible here).
# NOTE(review): the slice [length - skip:-1] keeps the items from index
# `length - skip` up to, but excluding, the last one. Confirm this is the
# intended "skip N" behaviour given that next_object() pops from the end.
55 if skip < self.length and (skip > 0):
56 self.collection = self.collection[self.length - skip: -1]
58 elif skip >= self.length:
# Thread-pool wrapper around self._count (its `def` line and _count itself
# are not visible here).
67 return thread_execute(self._count)
# Constructor taking the collection name.
# NOTE(review): the body is not visible in this excerpt; other methods read
# self.name and self.contents, so presumably both are initialised here.
72 def __init__(self, name):
# Look up one document. A non-dict, non-None argument is treated as an id and
# wrapped as {"_id": value}; the `_id` entry is then coerced to str before the
# query is handed to self._find().
# NOTE(review): the str() coercion writes into the caller's dict in place.
# NOTE(review): if the `'_id' in spec_or_id` test is not nested under the
# None guard (indentation is lost here), the default spec_or_id=None raises
# TypeError on that line -- confirm.
# NOTE(review): what is done with `cursor` is on lines not visible here.
77 def _find_one(self, spec_or_id=None, *args):
78 if spec_or_id is not None and not isinstance(spec_or_id, dict):
79 spec_or_id = {"_id": spec_or_id}
80 if '_id' in spec_or_id:
81 spec_or_id['_id'] = str(spec_or_id['_id'])
82 cursor = self._find(spec_or_id, *args)
def find_one(self, spec_or_id=None, *args):
    """Run ``_find_one`` through ``thread_execute``.

    ``spec_or_id`` and any extra positional arguments are forwarded
    unchanged; whatever ``thread_execute`` returns is handed back to the
    caller.
    """
    forwarded = (spec_or_id,) + args
    return thread_execute(self._find_one, *forwarded)
# Insert one document (dict) or several documents into self.contents.
# `docs`, `doc` and `ids` are bound on lines not visible in this excerpt.
90 def _insert(self, doc_or_docs, check_keys=True):
# A single dict is handled differently from a sequence of documents.
94 if isinstance(docs, dict):
# Key validation; raises NameError for an offending key (see _check_keys).
100 self._check_keys(doc)
# Generate a fresh string id from a new ObjectId.
# NOTE(review): the condition guarding this assignment is not visible here.
105 doc['_id'] = str(ObjectId())
# Only store the document when no stored document already has this id.
106 if not self._find_one(doc['_id']):
107 ids.append(doc['_id'])
# NOTE(review): this appends `doc_or_docs`, not `doc`. If it runs once per
# document, a list argument would be stored whole on every iteration --
# confirm whether `doc` was intended.
108 self.contents.append(doc_or_docs)
def insert(self, doc_or_docs, check_keys=True):
    """Run ``_insert`` through ``thread_execute``.

    Both arguments are forwarded positionally and the value produced by
    ``thread_execute`` is returned as-is.
    """
    worker = self._insert
    return thread_execute(worker, doc_or_docs, check_keys)
# Evaluate a comparison spec (a dict of operator -> bound) against `value`.
# Only the '$gte' test is visible here; what it returns, and any other
# operators, are on lines outside this excerpt.
# NOTE(review): takes no `self` and is called as MemDb._compare_date(...)
# elsewhere in this file, so it is presumably a @staticmethod (the decorator
# is not visible). dict.iteritems() exists only on Python 2.
121 def _compare_date(spec, value):
122 for k, v in spec.iteritems():
123 if k == '$gte' and value >= v:
# Decide whether `content` matches the query passed in *args. The collection
# named 'scenarios' uses its own matcher; every other collection goes through
# _in_others().
127 def _in(self, content, *args):
128 if self.name == 'scenarios':
129 return self._in_scenarios(content, *args)
131 return self._in_others(content, *args)
# Scan content['installers'] for an entry whose 'installer' value equals
# `installer`. The values returned on a hit or a miss are on lines not
# visible in this excerpt.
133 def _in_scenarios_installer(self, installer, content):
135 for s_installer in content['installers']:
136 if installer == s_installer['installer']:
# Scan every installer's 'versions' list for an entry whose 'version' value
# equals `version`. The values returned on a hit or a miss are on lines not
# visible in this excerpt.
141 def _in_scenarios_version(self, version, content):
143 for s_installer in content['installers']:
144 for s_version in s_installer['versions']:
145 if version == s_version['version']:
# Scan installers -> versions -> projects for an entry whose 'project' value
# equals `project`. The values returned on a hit or a miss are on lines not
# visible in this excerpt.
149 def _in_scenarios_project(self, project, content):
151 for s_installer in content['installers']:
152 for s_version in s_installer['versions']:
153 for s_project in s_version['projects']:
154 if project == s_project['project']:
# Matcher used for the 'scenarios' collection. `arg` is bound on a line not
# visible here (presumably while iterating over *args -- confirm).
# An 'installers' criterion is expected to be a dict of dicts: each inner key
# selects one of the nested-document helpers, and the first recognised inner
# key returns that helper's result immediately. Any other criterion is
# compared against content.get(key) for equality.
# NOTE(review): dict.iteritems() exists only on Python 2. The continuation
# lines of the three helper calls are not visible in this excerpt.
159 def _in_scenarios(self, content, *args):
161 for k, v in arg.iteritems():
162 if k == 'installers':
163 for inner in v.values():
164 for i_k, i_v in inner.iteritems():
165 if i_k == 'installer':
166 return self._in_scenarios_installer(i_v,
168 elif i_k == 'versions.version':
169 return self._in_scenarios_version(i_v,
171 elif i_k == 'versions.projects.project':
172 return self._in_scenarios_project(i_v,
174 elif content.get(k, None) != v:
# Matcher used for every collection except 'scenarios'. `arg` is bound on a
# line not visible here (presumably while iterating over *args -- confirm).
# 'start_date' is checked through MemDb._compare_date, the dotted key
# 'trust_indicator.current' is resolved against the nested document, and any
# other key is compared against content.get(key) for equality.
# NOTE(review): content.get('trust_indicator') yields None when the key is
# absent, so the chained .get('current') would raise AttributeError.
# NOTE(review): dict.iteritems() exists only on Python 2.
179 def _in_others(self, content, *args):
181 for k, v in arg.iteritems():
182 if k == 'start_date':
183 if not MemDb._compare_date(v, content.get(k)):
185 elif k == 'trust_indicator.current':
186 if content.get('trust_indicator').get('current') != v:
188 elif content.get(k, None) != v:
# Walk self.contents and test every stored document with self._in().
# What is collected and returned is on lines not visible in this excerpt;
# find() wraps the return value in a MemCursor.
193 def _find(self, *args):
195 for content in self.contents:
196 if self._in(content, *args):
def find(self, *args):
    """Return a ``MemCursor`` over whatever ``_find(*args)`` produces."""
    matched = self._find(*args)
    return MemCursor(matched)
# Aggregation over this collection. The visible lines show two passes over
# the items of `arg`: after the first, the result `res` is wrapped in a
# MemCursor; the second applies sort / skip / limit to that cursor.
# `arg`, `res` and the conditions selecting each step are on lines not
# visible in this excerpt.
# NOTE(review): dict.iteritems() exists only on Python 2.
204 def _aggregate(self, *args, **kwargs):
208 for k, v in arg.iteritems():
211 cursor = MemCursor(res)
213 for k, v in arg.iteritems():
215 cursor = cursor.sort(v)
217 cursor = cursor.skip(v)
219 cursor = cursor.limit(v)
def aggregate(self, *args, **kwargs):
    """Delegate directly to ``_aggregate`` and return its result.

    Unlike the other public wrappers in this file, this call is not routed
    through ``thread_execute``.
    """
    outcome = self._aggregate(*args, **kwargs)
    return outcome
# Update stored documents that match `spec`.
# The update document is validated with _check_keys() (the guard around that
# call is not visible here; presumably `if check_keys:`). Every stored
# document for which self._in(content, spec) is true is then modified from
# the items of `document` and written back at the same index.
# NOTE(review): the per-key modification and the return value are on lines
# not visible in this excerpt. dict.iteritems() exists only on Python 2.
225 def _update(self, spec, document, check_keys=True):
229 self._check_keys(document)
231 for index in range(len(self.contents)):
232 content = self.contents[index]
233 if self._in(content, spec):
234 for k, v in document.iteritems():
237 self.contents[index] = content
def update(self, spec, document, check_keys=True):
    """Run ``_update`` through ``thread_execute``.

    All three arguments are forwarded positionally and the value produced
    by ``thread_execute`` is returned as-is.
    """
    worker = self._update
    return thread_execute(worker, spec, document, check_keys)
# Remove stored documents. A non-dict argument is treated as an id and
# wrapped as {'_id': value}; documents are then matched with self._in().
# The action taken when spec_or_id is None is on a line not visible here.
# NOTE(review): `del self.contents[index]` shortens the list while the loop
# still walks the index range computed up front. Unless the function returns
# right after the delete (not visible here), a later index can raise
# IndexError and the element following a deleted one is skipped.
243 def _remove(self, spec_or_id=None):
244 if spec_or_id is None:
246 if not isinstance(spec_or_id, dict):
247 spec_or_id = {'_id': spec_or_id}
248 for index in range(len(self.contents)):
249 content = self.contents[index]
250 if self._in(content, spec_or_id):
251 del self.contents[index]
def remove(self, spec_or_id=None):
    """Run ``_remove`` through ``thread_execute``.

    ``spec_or_id`` is forwarded unchanged and the value produced by
    ``thread_execute`` is returned as-is.
    """
    worker = self._remove
    return thread_execute(worker, spec_or_id)
# Validate the keys of `doc`, recursing into values that are dicts.
# Raises NameError for a key that starts with '$'. The first raise carries
# the "must not contain ." message; its guarding condition is not visible in
# this excerpt.
261 def _check_keys(self, doc):
262 for key in doc.keys():
264 raise NameError('key {} must not contain .'.format(key))
265 if key.startswith('$'):
266 raise NameError('key {} must not start with $'.format(key))
# Nested documents are held to the same key rules.
267 if isinstance(doc.get(key), dict):
268 self._check_keys(doc.get(key))
class _MissingName(AttributeError, KeyError):
    """Raised by ``__getattr__`` for a name this module does not define.

    It derives from both exception types on purpose: attribute-style lookups
    (``getattr`` with a default, ``hasattr``, the import machinery) only
    recognise ``AttributeError``, while existing callers that catch the
    ``KeyError`` raised by the plain ``globals()`` lookup keep working.
    """


def __getattr__(name):
    """Return the module-level object bound to *name*.

    Args:
        name: Name of a module global, e.g. one of the fake collections.

    Returns:
        The object stored under *name* in this module's globals.

    Raises:
        AttributeError: If the module defines no such name. The raised
            exception is also a ``KeyError`` for backward compatibility.
    """
    try:
        return globals()[name]
    except KeyError:
        # A bare KeyError would escape getattr()/hasattr() style lookups,
        # which only treat AttributeError as "attribute not found".
        raise _MissingName(name)
# Module-level fake collections: one MemDb per collection name, created in
# the order projects, testcases, results, scenarios, tokens.
projects, testcases, results, scenarios, tokens = map(
    MemDb,
    ('projects', 'testcases', 'results', 'scenarios', 'tokens'),
)