Fix security issues of eval-s in testapi
[releng.git] / utils / test / result_collection_api / opnfv_testapi / tests / unit / fake_pymongo.py
index ef9c719..3dd87e6 100644 (file)
@@ -80,11 +80,15 @@ class MemDb(object):
             return_one = True
             docs = [docs]
 
+        if check_keys:
+            for doc in docs:
+                self._check_keys(doc)
+
         ids = []
         for doc in docs:
             if '_id' not in doc:
                 doc['_id'] = str(ObjectId())
-            if not check_keys or not self._find_one(doc['_id']):
+            if not self._find_one(doc['_id']):
                 ids.append(doc['_id'])
                 self.contents.append(doc_or_docs)
 
@@ -112,8 +116,8 @@ class MemDb(object):
                 if k == 'start_date':
                     if not MemDb._compare_date(v, content.get(k)):
                         return False
-                elif k == 'trust_indicator':
-                    if float(content.get(k)) != float(v):
+                elif k == 'trust_indicator.current':
+                    if content.get('trust_indicator').get('current') != v:
                         return False
                 elif content.get(k, None) != v:
                     return False
@@ -131,8 +135,12 @@ class MemDb(object):
     def find(self, *args):
         return MemCursor(self._find(*args))
 
-    def _update(self, spec, document):
+    def _update(self, spec, document, check_keys=True):
         updated = False
+
+        if check_keys:
+            self._check_keys(document)
+
         for index in range(len(self.contents)):
             content = self.contents[index]
             if self._in(content, spec):
@@ -142,8 +150,8 @@ class MemDb(object):
             self.contents[index] = content
         return updated
 
-    def update(self, spec, document):
-        return thread_execute(self._update, spec, document)
+    def update(self, spec, document, check_keys=True):
+        return thread_execute(self._update, spec, document, check_keys)
 
     def _remove(self, spec_or_id=None):
         if spec_or_id is None:
@@ -163,6 +171,20 @@ class MemDb(object):
     def clear(self):
         self._remove()
 
+    def _check_keys(self, doc):
+        for key in doc.keys():
+            if '.' in key:
+                raise NameError('key {} must not contain .'.format(key))
+            if key.startswith('$'):
+                raise NameError('key {} must not start with $'.format(key))
+            if isinstance(doc.get(key), dict):
+                self._check_keys(doc.get(key))
+
+
+def __getattr__(name):
+    return globals()[name]
+
+
 pods = MemDb()
 projects = MemDb()
 testcases = MemDb()