X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=utils%2Ftest%2Ftestapi%2Fopnfv_testapi%2Ftests%2Funit%2Ffake_pymongo.py;h=0ca83df6223ad235785e146620efe0d47323116a;hb=b271f2e15333b4a37bcef9a75ac645dfaa93942b;hp=d86d8eadf5e8a7e5a6a4c59117985f0bcf74b191;hpb=7408371048bdb55a119e384925477a7bc527ab01;p=releng.git diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py index d86d8eadf..0ca83df62 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py @@ -6,9 +6,10 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from operator import itemgetter + from bson.objectid import ObjectId from concurrent.futures import ThreadPoolExecutor -from operator import itemgetter def thread_execute(method, *args, **kwargs): @@ -20,42 +21,57 @@ def thread_execute(method, *args, **kwargs): class MemCursor(object): def __init__(self, collection): self.collection = collection - self.count = len(self.collection) + self.length = len(self.collection) self.sorted = [] def _is_next_exist(self): - return self.count != 0 + return self.length != 0 @property def fetch_next(self): return thread_execute(self._is_next_exist) def next_object(self): - self.count -= 1 + self.length -= 1 return self.collection.pop() def sort(self, key_or_list): - key = key_or_list[0][0] - if key_or_list[0][1] == -1: - reverse = True - else: - reverse = False + for k, v in key_or_list.iteritems(): + if v == -1: + reverse = True + else: + reverse = False - if key_or_list is not None: self.collection = sorted(self.collection, - key=itemgetter(key), reverse=reverse) + key=itemgetter(k), reverse=reverse) return self def limit(self, limit): if limit != 0 and limit < len(self.collection): - self.collection = self.collection[0:limit] - self.count = limit + self.collection = self.collection[0: limit] + self.length = limit + return self + + def skip(self, skip): + if skip < self.length and (skip > 0): + self.collection = self.collection[self.length - skip: -1] + self.length -= skip + elif skip >= self.length: + self.collection = [] + self.length = 0 return self + def _count(self): + return self.length + + def count(self): + return thread_execute(self._count) + class MemDb(object): - def __init__(self): + def __init__(self, name): + self.name = name self.contents = [] pass @@ -104,13 +120,68 @@ class MemDb(object): @staticmethod def _compare_date(spec, value): + gte = True + lt = False for k, v in spec.iteritems(): - if k == '$gte' and value >= v: - return True - return False + if k == '$gte' and value < v: + gte = False + elif k == '$lt' and value < v: + lt = True + return gte and lt + + def _in(self, content, *args): + if self.name == 'scenarios': + return self._in_scenarios(content, *args) + else: + return self._in_others(content, *args) + + def _in_scenarios_installer(self, installer, content): + hit = False + for s_installer in content['installers']: + if installer == s_installer['installer']: + hit = True + + return hit + + def _in_scenarios_version(self, version, content): + hit = False + for s_installer in content['installers']: + for s_version in s_installer['versions']: + if version == s_version['version']: + hit = True + return hit + + def _in_scenarios_project(self, project, content): + hit = False + for s_installer in content['installers']: + for s_version in s_installer['versions']: + for s_project in s_version['projects']: + if project == s_project['project']: + hit = True + + return hit + + def _in_scenarios(self, content, *args): + for arg in args: + for k, v in arg.iteritems(): + if k == 'installers': + for inner in v.values(): + for i_k, i_v in inner.iteritems(): + if i_k == 'installer': + return self._in_scenarios_installer(i_v, + content) + elif i_k == 'versions.version': + return self._in_scenarios_version(i_v, + content) + elif i_k == 'versions.projects.project': + return self._in_scenarios_project(i_v, + content) + elif content.get(k, None) != v: + return False - @staticmethod - def _in(content, *args): + return True + + def _in_others(self, content, *args): for arg in args: for k, v in arg.iteritems(): if k == 'start_date': @@ -119,9 +190,8 @@ class MemDb(object): elif k == 'trust_indicator.current': if content.get('trust_indicator').get('current') != v: return False - elif content.get(k, None) != v: + elif not isinstance(v, dict) and content.get(k, None) != v: return False - return True def _find(self, *args): @@ -135,6 +205,27 @@ class MemDb(object): def find(self, *args): return MemCursor(self._find(*args)) + def _aggregate(self, *args, **kwargs): + res = self.contents + print args + for arg in args[0]: + for k, v in arg.iteritems(): + if k == '$match': + res = self._find(v) + cursor = MemCursor(res) + for arg in args[0]: + for k, v in arg.iteritems(): + if k == '$sort': + cursor = cursor.sort(v) + elif k == '$skip': + cursor = cursor.skip(v) + elif k == '$limit': + cursor = cursor.limit(v) + return cursor + + def aggregate(self, *args, **kwargs): + return self._aggregate(*args, **kwargs) + def _update(self, spec, document, check_keys=True): updated = False @@ -185,8 +276,9 @@ def __getattr__(name): return globals()[name] -pods = MemDb() -projects = MemDb() -testcases = MemDb() -results = MemDb() -scenarios = MemDb() +pods = MemDb('pods') +projects = MemDb('projects') +testcases = MemDb('testcases') +results = MemDb('results') +scenarios = MemDb('scenarios') +tokens = MemDb('tokens')