Fix security issues of eval-s in testapi
[releng.git] / utils / test / result_collection_api / opnfv_testapi / tests / unit / fake_pymongo.py
1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from bson.objectid import ObjectId
10 from concurrent.futures import ThreadPoolExecutor
11 from operator import itemgetter
12
13
14 def thread_execute(method, *args, **kwargs):
15         with ThreadPoolExecutor(max_workers=2) as executor:
16             result = executor.submit(method, *args, **kwargs)
17         return result
18
19
20 class MemCursor(object):
21     def __init__(self, collection):
22         self.collection = collection
23         self.count = len(self.collection)
24         self.sorted = []
25
26     def _is_next_exist(self):
27         return self.count != 0
28
29     @property
30     def fetch_next(self):
31         return thread_execute(self._is_next_exist)
32
33     def next_object(self):
34         self.count -= 1
35         return self.collection.pop()
36
37     def sort(self, key_or_list):
38         key = key_or_list[0][0]
39         if key_or_list[0][1] == -1:
40             reverse = True
41         else:
42             reverse = False
43
44         if key_or_list is not None:
45             self.collection = sorted(self.collection,
46                                      key=itemgetter(key), reverse=reverse)
47         return self
48
49     def limit(self, limit):
50         if limit != 0 and limit < len(self.collection):
51             self.collection = self.collection[0:limit]
52             self.count = limit
53         return self
54
55
56 class MemDb(object):
57
58     def __init__(self):
59         self.contents = []
60         pass
61
62     def _find_one(self, spec_or_id=None, *args):
63         if spec_or_id is not None and not isinstance(spec_or_id, dict):
64             spec_or_id = {"_id": spec_or_id}
65         if '_id' in spec_or_id:
66             spec_or_id['_id'] = str(spec_or_id['_id'])
67         cursor = self._find(spec_or_id, *args)
68         for result in cursor:
69             return result
70         return None
71
72     def find_one(self, spec_or_id=None, *args):
73         return thread_execute(self._find_one, spec_or_id, *args)
74
75     def _insert(self, doc_or_docs, check_keys=True):
76
77         docs = doc_or_docs
78         return_one = False
79         if isinstance(docs, dict):
80             return_one = True
81             docs = [docs]
82
83         if check_keys:
84             for doc in docs:
85                 self._check_keys(doc)
86
87         ids = []
88         for doc in docs:
89             if '_id' not in doc:
90                 doc['_id'] = str(ObjectId())
91             if not self._find_one(doc['_id']):
92                 ids.append(doc['_id'])
93                 self.contents.append(doc_or_docs)
94
95         if len(ids) == 0:
96             return None
97         if return_one:
98             return ids[0]
99         else:
100             return ids
101
102     def insert(self, doc_or_docs, check_keys=True):
103         return thread_execute(self._insert, doc_or_docs, check_keys)
104
105     @staticmethod
106     def _compare_date(spec, value):
107         for k, v in spec.iteritems():
108             if k == '$gte' and value >= v:
109                 return True
110         return False
111
112     @staticmethod
113     def _in(content, *args):
114         for arg in args:
115             for k, v in arg.iteritems():
116                 if k == 'start_date':
117                     if not MemDb._compare_date(v, content.get(k)):
118                         return False
119                 elif k == 'trust_indicator.current':
120                     if content.get('trust_indicator').get('current') != v:
121                         return False
122                 elif content.get(k, None) != v:
123                     return False
124
125         return True
126
127     def _find(self, *args):
128         res = []
129         for content in self.contents:
130             if self._in(content, *args):
131                 res.append(content)
132
133         return res
134
135     def find(self, *args):
136         return MemCursor(self._find(*args))
137
138     def _update(self, spec, document, check_keys=True):
139         updated = False
140
141         if check_keys:
142             self._check_keys(document)
143
144         for index in range(len(self.contents)):
145             content = self.contents[index]
146             if self._in(content, spec):
147                 for k, v in document.iteritems():
148                     updated = True
149                     content[k] = v
150             self.contents[index] = content
151         return updated
152
153     def update(self, spec, document, check_keys=True):
154         return thread_execute(self._update, spec, document, check_keys)
155
156     def _remove(self, spec_or_id=None):
157         if spec_or_id is None:
158             self.contents = []
159         if not isinstance(spec_or_id, dict):
160             spec_or_id = {'_id': spec_or_id}
161         for index in range(len(self.contents)):
162             content = self.contents[index]
163             if self._in(content, spec_or_id):
164                 del self.contents[index]
165                 return True
166         return False
167
168     def remove(self, spec_or_id=None):
169         return thread_execute(self._remove, spec_or_id)
170
171     def clear(self):
172         self._remove()
173
174     def _check_keys(self, doc):
175         for key in doc.keys():
176             if '.' in key:
177                 raise NameError('key {} must not contain .'.format(key))
178             if key.startswith('$'):
179                 raise NameError('key {} must not start with $'.format(key))
180             if isinstance(doc.get(key), dict):
181                 self._check_keys(doc.get(key))
182
183
184 def __getattr__(name):
185     return globals()[name]
186
187
188 pods = MemDb()
189 projects = MemDb()
190 testcases = MemDb()
191 results = MemDb()