3494280fad5ea2d2c713e32782a00ba3d21b2f09
[releng.git] / utils / test / result_collection_api / tests / unit / fake_pymongo.py
1 from bson.objectid import ObjectId
2 from concurrent.futures import ThreadPoolExecutor
3
4
5 __author__ = 'serena'
6
7
8 class MemCursor(object):
9     def __init__(self, collection):
10         self.collection = collection
11         self.count = len(self.collection)
12
13     def _is_next_exist(self):
14         return self.count != 0
15
16     @property
17     def fetch_next(self):
18         with ThreadPoolExecutor(max_workers=2) as executor:
19             result = executor.submit(self._is_next_exist)
20         return result
21
22     def next_object(self):
23         self.count -= 1
24         return self.collection.pop()
25
26
27 class MemDb(object):
28
29     def __init__(self):
30         self.contents = []
31         pass
32
33     def _find_one(self, spec_or_id=None, *args):
34         if spec_or_id is not None and not isinstance(spec_or_id, dict):
35             spec_or_id = {"_id": spec_or_id}
36         if '_id' in spec_or_id:
37             spec_or_id['_id'] = str(spec_or_id['_id'])
38         cursor = self._find(spec_or_id, *args)
39         for result in cursor:
40             return result
41         return None
42
43     def find_one(self, spec_or_id=None, *args):
44         with ThreadPoolExecutor(max_workers=2) as executor:
45             result = executor.submit(self._find_one, spec_or_id, *args)
46         return result
47
48     def _insert(self, doc_or_docs, check_keys=True):
49
50         docs = doc_or_docs
51         return_one = False
52         if isinstance(docs, dict):
53             return_one = True
54             docs = [docs]
55
56         ids = []
57         for doc in docs:
58             if '_id' not in doc:
59                 doc['_id'] = str(ObjectId())
60             if not check_keys or not self._find_one(doc['_id']):
61                 ids.append(doc['_id'])
62                 self.contents.append(doc_or_docs)
63
64         if len(ids) == 0:
65             return None
66         if return_one:
67             return ids[0]
68         else:
69             return ids
70
71     def insert(self, doc_or_docs, check_keys=True):
72         with ThreadPoolExecutor(max_workers=2) as executor:
73             result = executor.submit(self._insert, doc_or_docs, check_keys)
74         return result
75
76     @staticmethod
77     def _in(content, *args):
78         for arg in args:
79             for k, v in arg.iteritems():
80                 if k != 'creation_date' and content.get(k, None) != v:
81                     return False
82
83         return True
84
85     def _find(self, *args):
86         res = []
87         for content in self.contents:
88             if self._in(content, *args):
89                 res.append(content)
90
91         return res
92
93     def find(self, *args):
94         return MemCursor(self._find(*args))
95
96     def _update(self, spec, document):
97         updated = False
98         for index in range(len(self.contents)):
99             content = self.contents[index]
100             if self._in(content, spec):
101                 for k, v in document.iteritems():
102                     updated = True
103                     content[k] = v
104             self.contents[index] = content
105         return updated
106
107     def update(self, spec, document):
108         with ThreadPoolExecutor(max_workers=2) as executor:
109             result = executor.submit(self._update, spec, document)
110         return result
111
112     def _remove(self, spec_or_id=None):
113         if spec_or_id is None:
114             self.contents = []
115         if not isinstance(spec_or_id, dict):
116             spec_or_id = {'_id': spec_or_id}
117         for index in range(len(self.contents)):
118             content = self.contents[index]
119             if self._in(content, spec_or_id):
120                 del self.contents[index]
121                 return True
122         return False
123
124     def remove(self, spec_or_id=None):
125         with ThreadPoolExecutor(max_workers=2) as executor:
126             result = executor.submit(self._remove, spec_or_id)
127         return result
128
129     def clear(self):
130         self._remove()
131
132 pods = MemDb()
133 projects = MemDb()
134 testcases = MemDb()
135 results = MemDb()