1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from bson.objectid import ObjectId
10 from concurrent.futures import ThreadPoolExecutor
13 def thread_execute(method, *args, **kwargs):
14 with ThreadPoolExecutor(max_workers=2) as executor:
15 result = executor.submit(method, *args, **kwargs)
19 class MemCursor(object):
20 def __init__(self, collection):
21 self.collection = collection
22 self.count = len(self.collection)
24 def _is_next_exist(self):
25 return self.count != 0
29 return thread_execute(self._is_next_exist)
31 def next_object(self):
33 return self.collection.pop()
35 def sort(self, key_or_list, direction=None):
38 def limit(self, limit):
48 def _find_one(self, spec_or_id=None, *args):
49 if spec_or_id is not None and not isinstance(spec_or_id, dict):
50 spec_or_id = {"_id": spec_or_id}
51 if '_id' in spec_or_id:
52 spec_or_id['_id'] = str(spec_or_id['_id'])
53 cursor = self._find(spec_or_id, *args)
58 def find_one(self, spec_or_id=None, *args):
59 return thread_execute(self._find_one, spec_or_id, *args)
61 def _insert(self, doc_or_docs, check_keys=True):
65 if isinstance(docs, dict):
72 doc['_id'] = str(ObjectId())
73 if not check_keys or not self._find_one(doc['_id']):
74 ids.append(doc['_id'])
75 self.contents.append(doc_or_docs)
84 def insert(self, doc_or_docs, check_keys=True):
85 return thread_execute(self._insert, doc_or_docs, check_keys)
88 def _compare_date(spec, value):
89 for k, v in spec.iteritems():
90 if k == '$gte' and value >= v:
95 def _in(content, *args):
97 for k, v in arg.iteritems():
99 if not MemDb._compare_date(v, content.get(k)):
101 elif k == 'trust_indicator':
102 if float(content.get(k)) != float(v):
104 elif content.get(k, None) != v:
109 def _find(self, *args):
111 for content in self.contents:
112 if self._in(content, *args):
117 def find(self, *args):
118 return MemCursor(self._find(*args))
120 def _update(self, spec, document):
122 for index in range(len(self.contents)):
123 content = self.contents[index]
124 if self._in(content, spec):
125 for k, v in document.iteritems():
128 self.contents[index] = content
131 def update(self, spec, document):
132 return thread_execute(self._update, spec, document)
134 def _remove(self, spec_or_id=None):
135 if spec_or_id is None:
137 if not isinstance(spec_or_id, dict):
138 spec_or_id = {'_id': spec_or_id}
139 for index in range(len(self.contents)):
140 content = self.contents[index]
141 if self._in(content, spec_or_id):
142 del self.contents[index]
146 def remove(self, spec_or_id=None):
147 return thread_execute(self._remove, spec_or_id)