add test result/dashboard related unittests in testAPI and refactor its response
[releng.git] / utils / test / result_collection_api / tests / unit / fake_pymongo.py
1 from bson.objectid import ObjectId
2 from concurrent.futures import ThreadPoolExecutor
3
4
5 __author__ = 'serena'
6
7
8 class MemCursor(object):
9     def __init__(self, collection):
10         self.collection = collection
11         self.count = len(self.collection)
12
13     def _is_next_exist(self):
14         return self.count != 0
15
16     @property
17     def fetch_next(self):
18         with ThreadPoolExecutor(max_workers=2) as executor:
19             result = executor.submit(self._is_next_exist)
20         return result
21
22     def next_object(self):
23         self.count -= 1
24         return self.collection.pop()
25
26
27 class MemDb(object):
28
29     def __init__(self):
30         self.contents = []
31         pass
32
33     def _find_one(self, spec_or_id=None, *args):
34         if spec_or_id is not None and not isinstance(spec_or_id, dict):
35             spec_or_id = {"_id": spec_or_id}
36         cursor = self._find(spec_or_id, *args)
37         for result in cursor:
38             return result
39         return None
40
41     def find_one(self, spec_or_id=None, *args):
42         with ThreadPoolExecutor(max_workers=2) as executor:
43             result = executor.submit(self._find_one, spec_or_id, *args)
44         return result
45
46     def _insert(self, doc_or_docs, check_keys=True):
47
48         docs = doc_or_docs
49         return_one = False
50         if isinstance(docs, dict):
51             return_one = True
52             docs = [docs]
53
54         ids = []
55         for doc in docs:
56             if '_id' not in doc:
57                 doc['_id'] = str(ObjectId())
58             if not check_keys or not self._find_one(doc['_id']):
59                 ids.append(doc['_id'])
60                 self.contents.append(doc_or_docs)
61
62         if len(ids) == 0:
63             return None
64         if return_one:
65             return ids[0]
66         else:
67             return ids
68
69     def insert(self, doc_or_docs, check_keys=True):
70         with ThreadPoolExecutor(max_workers=2) as executor:
71             result = executor.submit(self._insert, doc_or_docs, check_keys)
72         return result
73
74     @staticmethod
75     def _in(content, *args):
76         for arg in args:
77             for k, v in arg.iteritems():
78                 if k != 'creation_date' and content.get(k, None) != v:
79                     return False
80
81         return True
82
83     def _find(self, *args):
84         res = []
85         for content in self.contents:
86             if self._in(content, *args):
87                 res.append(content)
88
89         return res
90
91     def find(self, *args):
92         return MemCursor(self._find(*args))
93
94     def _update(self, spec, document):
95         updated = False
96         for index in range(len(self.contents)):
97             content = self.contents[index]
98             if self._in(content, spec):
99                 for k, v in document.iteritems():
100                     updated = True
101                     content[k] = v
102             self.contents[index] = content
103         return updated
104
105     def update(self, spec, document):
106         with ThreadPoolExecutor(max_workers=2) as executor:
107             result = executor.submit(self._update, spec, document)
108         return result
109
110     def _remove(self, spec_or_id=None):
111         if spec_or_id is None:
112             self.contents = []
113         if not isinstance(spec_or_id, dict):
114             spec_or_id = {'_id': spec_or_id}
115         for index in range(len(self.contents)):
116             content = self.contents[index]
117             if self._in(content, spec_or_id):
118                 del self.contents[index]
119                 return True
120         return False
121
122     def remove(self, spec_or_id=None):
123         with ThreadPoolExecutor(max_workers=2) as executor:
124             result = executor.submit(self._remove, spec_or_id)
125         return result
126
127     def clear(self):
128         self._remove()
129
130 pods = MemDb()
131 projects = MemDb()
132 testcases = MemDb()
133 results = MemDb()