Merge "joid: Disable brahmaputra jobs, enable master jobs"
[releng.git] / utils / test / result_collection_api / opnfv_testapi / tests / unit / fake_pymongo.py
1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from bson.objectid import ObjectId
10 from concurrent.futures import ThreadPoolExecutor
11
12
13 def thread_execute(method, *args, **kwargs):
14         with ThreadPoolExecutor(max_workers=2) as executor:
15             result = executor.submit(method, *args, **kwargs)
16         return result
17
18
19 class MemCursor(object):
20     def __init__(self, collection):
21         self.collection = collection
22         self.count = len(self.collection)
23
24     def _is_next_exist(self):
25         return self.count != 0
26
27     @property
28     def fetch_next(self):
29         return thread_execute(self._is_next_exist)
30
31     def next_object(self):
32         self.count -= 1
33         return self.collection.pop()
34
35     def sort(self, key_or_list, direction=None):
36         return self
37
38     def limit(self, limit):
39         return self
40
41
42 class MemDb(object):
43
44     def __init__(self):
45         self.contents = []
46         pass
47
48     def _find_one(self, spec_or_id=None, *args):
49         if spec_or_id is not None and not isinstance(spec_or_id, dict):
50             spec_or_id = {"_id": spec_or_id}
51         if '_id' in spec_or_id:
52             spec_or_id['_id'] = str(spec_or_id['_id'])
53         cursor = self._find(spec_or_id, *args)
54         for result in cursor:
55             return result
56         return None
57
58     def find_one(self, spec_or_id=None, *args):
59         return thread_execute(self._find_one, spec_or_id, *args)
60
61     def _insert(self, doc_or_docs, check_keys=True):
62
63         docs = doc_or_docs
64         return_one = False
65         if isinstance(docs, dict):
66             return_one = True
67             docs = [docs]
68
69         ids = []
70         for doc in docs:
71             if '_id' not in doc:
72                 doc['_id'] = str(ObjectId())
73             if not check_keys or not self._find_one(doc['_id']):
74                 ids.append(doc['_id'])
75                 self.contents.append(doc_or_docs)
76
77         if len(ids) == 0:
78             return None
79         if return_one:
80             return ids[0]
81         else:
82             return ids
83
84     def insert(self, doc_or_docs, check_keys=True):
85         return thread_execute(self._insert, doc_or_docs, check_keys)
86
87     @staticmethod
88     def _compare_date(spec, value):
89         for k, v in spec.iteritems():
90             if k == '$gte' and value >= v:
91                 return True
92         return False
93
94     @staticmethod
95     def _in(content, *args):
96         for arg in args:
97             for k, v in arg.iteritems():
98                 if k == 'start_date':
99                     if not MemDb._compare_date(v, content.get(k)):
100                         return False
101                 elif k == 'trust_indicator':
102                     if float(content.get(k)) != float(v):
103                         return False
104                 elif content.get(k, None) != v:
105                     return False
106
107         return True
108
109     def _find(self, *args):
110         res = []
111         for content in self.contents:
112             if self._in(content, *args):
113                 res.append(content)
114
115         return res
116
117     def find(self, *args):
118         return MemCursor(self._find(*args))
119
120     def _update(self, spec, document):
121         updated = False
122         for index in range(len(self.contents)):
123             content = self.contents[index]
124             if self._in(content, spec):
125                 for k, v in document.iteritems():
126                     updated = True
127                     content[k] = v
128             self.contents[index] = content
129         return updated
130
131     def update(self, spec, document):
132         return thread_execute(self._update, spec, document)
133
134     def _remove(self, spec_or_id=None):
135         if spec_or_id is None:
136             self.contents = []
137         if not isinstance(spec_or_id, dict):
138             spec_or_id = {'_id': spec_or_id}
139         for index in range(len(self.contents)):
140             content = self.contents[index]
141             if self._in(content, spec_or_id):
142                 del self.contents[index]
143                 return True
144         return False
145
146     def remove(self, spec_or_id=None):
147         return thread_execute(self._remove, spec_or_id)
148
149     def clear(self):
150         self._remove()
151
152 pods = MemDb()
153 projects = MemDb()
154 testcases = MemDb()
155 results = MemDb()