Fix security issues of eval-s in testapi
[releng.git] / utils / test / result_collection_api / opnfv_testapi / tests / unit / fake_pymongo.py
index 9b4d120..3dd87e6 100644 (file)
@@ -8,6 +8,7 @@
 ##############################################################################
 from bson.objectid import ObjectId
 from concurrent.futures import ThreadPoolExecutor
+from operator import itemgetter
 
 
 def thread_execute(method, *args, **kwargs):
@@ -20,6 +21,7 @@ class MemCursor(object):
     def __init__(self, collection):
         self.collection = collection
         self.count = len(self.collection)
+        self.sorted = []
 
     def _is_next_exist(self):
         return self.count != 0
@@ -32,10 +34,22 @@ class MemCursor(object):
         self.count -= 1
         return self.collection.pop()
 
-    def sort(self, key_or_list, direction=None):
+    def sort(self, key_or_list):
+        key = key_or_list[0][0]
+        if key_or_list[0][1] == -1:
+            reverse = True
+        else:
+            reverse = False
+
+        if key_or_list is not None:
+            self.collection = sorted(self.collection,
+                                     key=itemgetter(key), reverse=reverse)
         return self
 
     def limit(self, limit):
+        if limit != 0 and limit < len(self.collection):
+            self.collection = self.collection[0:limit]
+            self.count = limit
         return self
 
 
@@ -66,11 +80,15 @@ class MemDb(object):
             return_one = True
             docs = [docs]
 
+        if check_keys:
+            for doc in docs:
+                self._check_keys(doc)
+
         ids = []
         for doc in docs:
             if '_id' not in doc:
                 doc['_id'] = str(ObjectId())
-            if not check_keys or not self._find_one(doc['_id']):
+            if not self._find_one(doc['_id']):
                 ids.append(doc['_id'])
                 self.contents.append(doc_or_docs)
 
@@ -98,8 +116,8 @@ class MemDb(object):
                 if k == 'start_date':
                     if not MemDb._compare_date(v, content.get(k)):
                         return False
-                elif k == 'trust_indicator':
-                    if float(content.get(k)) != float(v):
+                elif k == 'trust_indicator.current':
+                    if content.get('trust_indicator').get('current') != v:
                         return False
                 elif content.get(k, None) != v:
                     return False
@@ -117,8 +135,12 @@ class MemDb(object):
     def find(self, *args):
         return MemCursor(self._find(*args))
 
-    def _update(self, spec, document):
+    def _update(self, spec, document, check_keys=True):
         updated = False
+
+        if check_keys:
+            self._check_keys(document)
+
         for index in range(len(self.contents)):
             content = self.contents[index]
             if self._in(content, spec):
@@ -128,8 +150,8 @@ class MemDb(object):
             self.contents[index] = content
         return updated
 
-    def update(self, spec, document):
-        return thread_execute(self._update, spec, document)
+    def update(self, spec, document, check_keys=True):
+        return thread_execute(self._update, spec, document, check_keys)
 
     def _remove(self, spec_or_id=None):
         if spec_or_id is None:
@@ -149,6 +171,20 @@ class MemDb(object):
     def clear(self):
         self._remove()
 
+    def _check_keys(self, doc):
+        for key in doc.keys():
+            if '.' in key:
+                raise NameError('key {} must not contain .'.format(key))
+            if key.startswith('$'):
+                raise NameError('key {} must not start with $'.format(key))
+            if isinstance(doc.get(key), dict):
+                self._check_keys(doc.get(key))
+
+
+def __getattr__(name):
+    return globals()[name]
+
+
 pods = MemDb()
 projects = MemDb()
 testcases = MemDb()