1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from bson.objectid import ObjectId
10 from concurrent.futures import ThreadPoolExecutor
11 from operator import itemgetter
def thread_execute(method, *args, **kwargs):
    """Run ``method(*args, **kwargs)`` on a worker thread.

    The executor is used as a context manager, so leaving the ``with``
    block shuts it down and waits for the submitted call to finish. The
    future handed back is therefore already settled: ``result()`` returns
    (or re-raises) immediately.

    :param method: callable to execute
    :param args: positional arguments forwarded to ``method``
    :param kwargs: keyword arguments forwarded to ``method``
    :return: the ``concurrent.futures.Future`` wrapping the call
    """
    with ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(method, *args, **kwargs)
    # The wrappers in this file all do ``return thread_execute(...)``, so the
    # future has to be handed back rather than dropped on the floor.
    return future
class MemCursor(object):
    """Cursor over a plain list of documents.

    Exposes the Motor-style pair ``fetch_next`` / ``next_object`` plus the
    chainable ``sort`` and ``limit`` modifiers.
    """

    def __init__(self, collection):
        """Wrap ``collection`` (a list of dict documents).

        The list is kept by reference; ``next_object`` pops from it.
        """
        self.collection = collection
        self.count = len(self.collection)

    def _is_next_exist(self):
        """Return True while at least one document is left to fetch."""
        # Ask the list itself instead of the ``count`` snapshot, which can
        # drift from the real length once the list is truncated or popped.
        return len(self.collection) != 0

    @property
    def fetch_next(self):
        """Result of ``thread_execute`` for the "is there a next doc" check.

        NOTE(review): modelled on Motor's ``fetch_next`` property, the
        counterpart of ``next_object``; confirm the accessor name against
        callers.
        """
        return thread_execute(self._is_next_exist)

    def next_object(self):
        """Remove and return the document at the tail of the list.

        :raises IndexError: if no documents are left
        """
        doc = self.collection.pop()
        self.count = len(self.collection)
        return doc

    def sort(self, key_or_list):
        """Sort the documents by the first ``(key, direction)`` pair.

        :param key_or_list: list of ``(key, direction)`` pairs; only the
            first pair is used, and a direction of ``-1`` means descending.
            ``None`` or an empty list leaves the order untouched.
        :return: this cursor, so calls can be chained
        :raises KeyError: if a document lacks the sort key
        """
        # Guard before indexing: the sort spec is optional.
        if key_or_list:
            key = key_or_list[0][0]
            descending = key_or_list[0][1] == -1
            self.collection = sorted(self.collection,
                                     key=itemgetter(key),
                                     reverse=descending)
        return self

    def limit(self, limit):
        """Keep at most ``limit`` leading documents; ``0`` means no limit.

        :return: this cursor, so calls can be chained
        """
        if limit != 0 and limit < len(self.collection):
            self.collection = self.collection[0:limit]
            # Keep the bookkeeping in step with the truncated list.
            self.count = len(self.collection)
        return self
def _find_one(self, spec_or_id=None, *args):
    """Return the first document matching ``spec_or_id``, or ``None``.

    :param spec_or_id: a query dict, a bare ``_id`` value, or ``None`` to
        match with an empty query
    :param args: extra query dicts forwarded to ``_find``
    :return: the first item produced by ``_find``, or ``None`` when it
        produces nothing
    """
    if spec_or_id is None:
        # An empty query instead of None, so the '_id' test below is safe.
        spec = {}
    elif isinstance(spec_or_id, dict):
        # Work on a copy so the caller's dict is not rewritten.
        spec = dict(spec_or_id)
    else:
        spec = {"_id": spec_or_id}

    if '_id' in spec:
        # The lookup compares ids in their string form.
        spec['_id'] = str(spec['_id'])

    matches = self._find(spec, *args)
    return next(iter(matches), None)
def find_one(self, spec_or_id=None, *args):
    """Run ``_find_one`` through ``thread_execute``.

    All arguments are forwarded unchanged; the return value is whatever
    ``thread_execute`` returns.
    """
    worker = self._find_one
    return thread_execute(worker, spec_or_id, *args)
75 def _insert(self, doc_or_docs, check_keys=True):
79 if isinstance(docs, dict):
90 doc['_id'] = str(ObjectId())
91 if not self._find_one(doc['_id']):
92 ids.append(doc['_id'])
93 self.contents.append(doc_or_docs)
def insert(self, doc_or_docs, check_keys=True):
    """Run ``_insert`` through ``thread_execute``.

    ``doc_or_docs`` and ``check_keys`` are forwarded positionally; the
    return value is whatever ``thread_execute`` returns.
    """
    worker = self._insert
    return thread_execute(worker, doc_or_docs, check_keys)
106 def _compare_date(spec, value):
107 for k, v in spec.iteritems():
108 if k == '$gte' and value >= v:
113 def _in(content, *args):
115 for k, v in arg.iteritems():
116 if k == 'start_date':
117 if not MemDb._compare_date(v, content.get(k)):
119 elif k == 'trust_indicator.current':
120 if content.get('trust_indicator').get('current') != v:
122 elif content.get(k, None) != v:
def _find(self, *args):
    """Return the stored documents accepted by ``_in`` for the given queries.

    :param args: query dicts, forwarded unchanged to ``_in``
    :return: a new list holding every matching entry of ``self.contents``,
        in storage order (``find`` hands this list to ``MemCursor``)
    """
    return [content for content in self.contents
            if self._in(content, *args)]
def find(self, *args):
    """Wrap the result of ``_find(*args)`` in a ``MemCursor`` and return it."""
    matches = self._find(*args)
    return MemCursor(matches)
138 def _update(self, spec, document, check_keys=True):
142 self._check_keys(document)
144 for index in range(len(self.contents)):
145 content = self.contents[index]
146 if self._in(content, spec):
147 for k, v in document.iteritems():
150 self.contents[index] = content
def update(self, spec, document, check_keys=True):
    """Run ``_update`` through ``thread_execute``.

    ``spec``, ``document`` and ``check_keys`` are forwarded positionally;
    the return value is whatever ``thread_execute`` returns.
    """
    worker = self._update
    return thread_execute(worker, spec, document, check_keys)
156 def _remove(self, spec_or_id=None):
157 if spec_or_id is None:
159 if not isinstance(spec_or_id, dict):
160 spec_or_id = {'_id': spec_or_id}
161 for index in range(len(self.contents)):
162 content = self.contents[index]
163 if self._in(content, spec_or_id):
164 del self.contents[index]
def remove(self, spec_or_id=None):
    """Run ``_remove`` through ``thread_execute``.

    ``spec_or_id`` is forwarded unchanged; the return value is whatever
    ``thread_execute`` returns.
    """
    worker = self._remove
    return thread_execute(worker, spec_or_id)
def _check_keys(self, doc):
    """Validate the keys of ``doc``, descending into nested dicts.

    :param doc: dict whose keys are checked
    :raises NameError: if any key contains ``.`` or starts with ``$``
    """
    for key in doc.keys():
        # Only reject keys that actually contain a dot.
        if '.' in key:
            raise NameError('key {} must not contain .'.format(key))
        if key.startswith('$'):
            raise NameError('key {} must not start with $'.format(key))
        value = doc.get(key)
        if isinstance(value, dict):
            self._check_keys(value)
def __getattr__(name):
    """Resolve ``name`` against this module's globals.

    :param name: attribute name being looked up
    :return: the module-level object bound to ``name``
    :raises AttributeError: if no such global exists
    """
    try:
        return globals()[name]
    except KeyError:
        # Attribute hooks must signal a miss with AttributeError; a leaked
        # KeyError breaks hasattr() and getattr(obj, name, default).
        raise AttributeError(
            'module {!r} has no attribute {!r}'.format(__name__, name))