Merge "Apex: Add jinja2 package check and install"
[releng.git] / utils / test / result_collection_api / opnfv_testapi / tests / unit / fake_pymongo.py
1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from bson.objectid import ObjectId
10 from concurrent.futures import ThreadPoolExecutor
11
12
13 def thread_execute(method, *args, **kwargs):
14         with ThreadPoolExecutor(max_workers=2) as executor:
15             result = executor.submit(method, *args, **kwargs)
16         return result
17
18
19 class MemCursor(object):
20     def __init__(self, collection):
21         self.collection = collection
22         self.count = len(self.collection)
23
24     def _is_next_exist(self):
25         return self.count != 0
26
27     @property
28     def fetch_next(self):
29         return thread_execute(self._is_next_exist)
30
31     def next_object(self):
32         self.count -= 1
33         return self.collection.pop()
34
35
36 class MemDb(object):
37
38     def __init__(self):
39         self.contents = []
40         pass
41
42     def _find_one(self, spec_or_id=None, *args):
43         if spec_or_id is not None and not isinstance(spec_or_id, dict):
44             spec_or_id = {"_id": spec_or_id}
45         if '_id' in spec_or_id:
46             spec_or_id['_id'] = str(spec_or_id['_id'])
47         cursor = self._find(spec_or_id, *args)
48         for result in cursor:
49             return result
50         return None
51
52     def find_one(self, spec_or_id=None, *args):
53         return thread_execute(self._find_one, spec_or_id, *args)
54
55     def _insert(self, doc_or_docs, check_keys=True):
56
57         docs = doc_or_docs
58         return_one = False
59         if isinstance(docs, dict):
60             return_one = True
61             docs = [docs]
62
63         ids = []
64         for doc in docs:
65             if '_id' not in doc:
66                 doc['_id'] = str(ObjectId())
67             if not check_keys or not self._find_one(doc['_id']):
68                 ids.append(doc['_id'])
69                 self.contents.append(doc_or_docs)
70
71         if len(ids) == 0:
72             return None
73         if return_one:
74             return ids[0]
75         else:
76             return ids
77
78     def insert(self, doc_or_docs, check_keys=True):
79         return thread_execute(self._insert, doc_or_docs, check_keys)
80
81     @staticmethod
82     def _compare_date(spec, value):
83         for k, v in spec.iteritems():
84             if k == '$gte' and value >= v:
85                 return True
86         return False
87
88     @staticmethod
89     def _in(content, *args):
90         for arg in args:
91             for k, v in arg.iteritems():
92                 if k == 'start_date':
93                     if not MemDb._compare_date(v, content.get(k)):
94                         return False
95                 elif k == 'trust_indicator':
96                     if float(content.get(k)) != float(v):
97                         return False
98                 elif content.get(k, None) != v:
99                     return False
100
101         return True
102
103     def _find(self, *args):
104         res = []
105         for content in self.contents:
106             if self._in(content, *args):
107                 res.append(content)
108
109         return res
110
111     def find(self, *args):
112         return MemCursor(self._find(*args))
113
114     def _update(self, spec, document):
115         updated = False
116         for index in range(len(self.contents)):
117             content = self.contents[index]
118             if self._in(content, spec):
119                 for k, v in document.iteritems():
120                     updated = True
121                     content[k] = v
122             self.contents[index] = content
123         return updated
124
125     def update(self, spec, document):
126         return thread_execute(self._update, spec, document)
127
128     def _remove(self, spec_or_id=None):
129         if spec_or_id is None:
130             self.contents = []
131         if not isinstance(spec_or_id, dict):
132             spec_or_id = {'_id': spec_or_id}
133         for index in range(len(self.contents)):
134             content = self.contents[index]
135             if self._in(content, spec_or_id):
136                 del self.contents[index]
137                 return True
138         return False
139
140     def remove(self, spec_or_id=None):
141         return thread_execute(self._remove, spec_or_id)
142
143     def clear(self):
144         self._remove()
145
146 pods = MemDb()
147 projects = MemDb()
148 testcases = MemDb()
149 results = MemDb()