project-ize testAPI
[releng.git] / utils / test / result_collection_api / opnfv_testapi / tests / unit / fake_pymongo.py
1 from bson.objectid import ObjectId
2 from concurrent.futures import ThreadPoolExecutor
3
4
5 __author__ = 'serena'
6
7
8 def thread_execute(method, *args, **kwargs):
9         with ThreadPoolExecutor(max_workers=2) as executor:
10             result = executor.submit(method, *args, **kwargs)
11         return result
12
13
14 class MemCursor(object):
15     def __init__(self, collection):
16         self.collection = collection
17         self.count = len(self.collection)
18
19     def _is_next_exist(self):
20         return self.count != 0
21
22     @property
23     def fetch_next(self):
24         return thread_execute(self._is_next_exist)
25
26     def next_object(self):
27         self.count -= 1
28         return self.collection.pop()
29
30
31 class MemDb(object):
32
33     def __init__(self):
34         self.contents = []
35         pass
36
37     def _find_one(self, spec_or_id=None, *args):
38         if spec_or_id is not None and not isinstance(spec_or_id, dict):
39             spec_or_id = {"_id": spec_or_id}
40         if '_id' in spec_or_id:
41             spec_or_id['_id'] = str(spec_or_id['_id'])
42         cursor = self._find(spec_or_id, *args)
43         for result in cursor:
44             return result
45         return None
46
47     def find_one(self, spec_or_id=None, *args):
48         return thread_execute(self._find_one, spec_or_id, *args)
49
50     def _insert(self, doc_or_docs, check_keys=True):
51
52         docs = doc_or_docs
53         return_one = False
54         if isinstance(docs, dict):
55             return_one = True
56             docs = [docs]
57
58         ids = []
59         for doc in docs:
60             if '_id' not in doc:
61                 doc['_id'] = str(ObjectId())
62             if not check_keys or not self._find_one(doc['_id']):
63                 ids.append(doc['_id'])
64                 self.contents.append(doc_or_docs)
65
66         if len(ids) == 0:
67             return None
68         if return_one:
69             return ids[0]
70         else:
71             return ids
72
73     def insert(self, doc_or_docs, check_keys=True):
74         return thread_execute(self._insert, doc_or_docs, check_keys)
75
76     @staticmethod
77     def _compare_date(spec, value):
78         for k, v in spec.iteritems():
79             if k == '$gte' and value >= v:
80                 return True
81         return False
82
83     @staticmethod
84     def _in(content, *args):
85         for arg in args:
86             for k, v in arg.iteritems():
87                 if k == 'start_date':
88                     if not MemDb._compare_date(v, content.get(k)):
89                         return False
90                 elif k == 'trust_indicator':
91                     if float(content.get(k)) != float(v):
92                         return False
93                 elif content.get(k, None) != v:
94                     return False
95
96         return True
97
98     def _find(self, *args):
99         res = []
100         for content in self.contents:
101             if self._in(content, *args):
102                 res.append(content)
103
104         return res
105
106     def find(self, *args):
107         return MemCursor(self._find(*args))
108
109     def _update(self, spec, document):
110         updated = False
111         for index in range(len(self.contents)):
112             content = self.contents[index]
113             if self._in(content, spec):
114                 for k, v in document.iteritems():
115                     updated = True
116                     content[k] = v
117             self.contents[index] = content
118         return updated
119
120     def update(self, spec, document):
121         return thread_execute(self._update, spec, document)
122
123     def _remove(self, spec_or_id=None):
124         if spec_or_id is None:
125             self.contents = []
126         if not isinstance(spec_or_id, dict):
127             spec_or_id = {'_id': spec_or_id}
128         for index in range(len(self.contents)):
129             content = self.contents[index]
130             if self._in(content, spec_or_id):
131                 del self.contents[index]
132                 return True
133         return False
134
135     def remove(self, spec_or_id=None):
136         return thread_execute(self._remove, spec_or_id)
137
138     def clear(self):
139         self._remove()
140
141 pods = MemDb()
142 projects = MemDb()
143 testcases = MemDb()
144 results = MemDb()