X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=utils%2Ftest%2Ftestapi%2Fopnfv_testapi%2Ftests%2Funit%2Ffake_pymongo.py;h=3320a866ac457d2402e4123d15a3caf0aeb2d7c2;hb=6c5fcb955cada2b6323742a07ba4d38edd08ff67;hp=3c4fd01a3f00325d0e3ea26acbbeefc6600e7147;hpb=6e13b028a29cd05b032fc8e12b9fe93ba3fa6936;p=releng.git diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py index 3c4fd01a3..3320a866a 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py @@ -6,9 +6,12 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import re + +from operator import itemgetter + from bson.objectid import ObjectId from concurrent.futures import ThreadPoolExecutor -from operator import itemgetter def thread_execute(method, *args, **kwargs): @@ -20,38 +23,52 @@ def thread_execute(method, *args, **kwargs): class MemCursor(object): def __init__(self, collection): self.collection = collection - self.count = len(self.collection) + self.length = len(self.collection) self.sorted = [] def _is_next_exist(self): - return self.count != 0 + return self.length != 0 @property def fetch_next(self): return thread_execute(self._is_next_exist) def next_object(self): - self.count -= 1 + self.length -= 1 return self.collection.pop() def sort(self, key_or_list): - key = key_or_list[0][0] - if key_or_list[0][1] == -1: - reverse = True - else: - reverse = False + for k, v in key_or_list.iteritems(): + if v == -1: + reverse = True + else: + reverse = False - if key_or_list is not None: self.collection = sorted(self.collection, - key=itemgetter(key), reverse=reverse) + key=itemgetter(k), reverse=reverse) return self def limit(self, limit): if limit != 0 and limit < len(self.collection): - self.collection = self.collection[0:limit] - self.count = limit + self.collection = self.collection[0: limit] + self.length = limit return self + def skip(self, skip): + if skip < self.length and (skip > 0): + self.collection = self.collection[self.length - skip: -1] + self.length -= skip + elif skip >= self.length: + self.collection = [] + self.length = 0 + return self + + def _count(self): + return self.length + + def count(self): + return thread_execute(self._count) + class MemDb(object): @@ -105,10 +122,14 @@ class MemDb(object): @staticmethod def _compare_date(spec, value): + gte = True + lt = False for k, v in spec.iteritems(): - if k == '$gte' and value >= v: - return True - return False + if k == '$gte' and value < v: + gte = False + elif k == '$lt' and value < v: + lt = True + return gte and lt def _in(self, content, *args): if self.name == 'scenarios': @@ -171,9 +192,13 @@ class MemDb(object): elif k == 'trust_indicator.current': if content.get('trust_indicator').get('current') != v: return False - elif content.get(k, None) != v: - return False - + elif not isinstance(v, dict): + if isinstance(v, re._pattern_type): + if v.match(content.get(k, None)) is None: + return False + else: + if content.get(k, None) != v: + return False return True def _find(self, *args): @@ -181,12 +206,32 @@ class MemDb(object): for content in self.contents: if self._in(content, *args): res.append(content) - return res def find(self, *args): return MemCursor(self._find(*args)) + def _aggregate(self, *args, **kwargs): + res = self.contents + print args + for arg in args[0]: + for k, v in arg.iteritems(): + if k == '$match': + res = self._find(v) + cursor = MemCursor(res) + for arg in args[0]: + for k, v in arg.iteritems(): + if k == '$sort': + cursor = cursor.sort(v) + elif k == '$skip': + cursor = cursor.skip(v) + elif k == '$limit': + cursor = cursor.limit(v) + return cursor + + def aggregate(self, *args, **kwargs): + return self._aggregate(*args, **kwargs) + def _update(self, spec, document, check_keys=True): updated = False @@ -242,3 +287,4 @@ projects = MemDb('projects') testcases = MemDb('testcases') results = MemDb('results') scenarios = MemDb('scenarios') +tokens = MemDb('tokens')