X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=utils%2Ftest%2Ftestapi%2Fopnfv_testapi%2Ftests%2Funit%2Ffake_pymongo.py;h=3320a866ac457d2402e4123d15a3caf0aeb2d7c2;hb=6c5fcb955cada2b6323742a07ba4d38edd08ff67;hp=d86d8eadf5e8a7e5a6a4c59117985f0bcf74b191;hpb=76e722eca7e73a1a920ff5fb1fa2dc2107753276;p=releng.git diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py index d86d8eadf..3320a866a 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py @@ -6,9 +6,12 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import re + +from operator import itemgetter + from bson.objectid import ObjectId from concurrent.futures import ThreadPoolExecutor -from operator import itemgetter def thread_execute(method, *args, **kwargs): @@ -20,42 +23,57 @@ def thread_execute(method, *args, **kwargs): class MemCursor(object): def __init__(self, collection): self.collection = collection - self.count = len(self.collection) + self.length = len(self.collection) self.sorted = [] def _is_next_exist(self): - return self.count != 0 + return self.length != 0 @property def fetch_next(self): return thread_execute(self._is_next_exist) def next_object(self): - self.count -= 1 + self.length -= 1 return self.collection.pop() def sort(self, key_or_list): - key = key_or_list[0][0] - if key_or_list[0][1] == -1: - reverse = True - else: - reverse = False + for k, v in key_or_list.iteritems(): + if v == -1: + reverse = True + else: + reverse = False - if key_or_list is not None: self.collection = sorted(self.collection, - key=itemgetter(key), reverse=reverse) + key=itemgetter(k), reverse=reverse) return self def limit(self, limit): if limit != 0 and limit < len(self.collection): - self.collection = self.collection[0:limit] - self.count = limit + self.collection = self.collection[0: limit] + self.length = limit + return self + + def skip(self, skip): + if skip < self.length and (skip > 0): + self.collection = self.collection[self.length - skip: -1] + self.length -= skip + elif skip >= self.length: + self.collection = [] + self.length = 0 return self + def _count(self): + return self.length + + def count(self): + return thread_execute(self._count) + class MemDb(object): - def __init__(self): + def __init__(self, name): + self.name = name self.contents = [] pass @@ -104,13 +122,68 @@ class MemDb(object): @staticmethod def _compare_date(spec, value): + gte = True + lt = False for k, v in spec.iteritems(): - if k == '$gte' and value >= v: - return True - return False + if k == '$gte' and value < v: + gte = False + elif k == '$lt' and value < v: + lt = True + return gte and lt + + def _in(self, content, *args): + if self.name == 'scenarios': + return self._in_scenarios(content, *args) + else: + return self._in_others(content, *args) + + def _in_scenarios_installer(self, installer, content): + hit = False + for s_installer in content['installers']: + if installer == s_installer['installer']: + hit = True + + return hit + + def _in_scenarios_version(self, version, content): + hit = False + for s_installer in content['installers']: + for s_version in s_installer['versions']: + if version == s_version['version']: + hit = True + return hit + + def _in_scenarios_project(self, project, content): + hit = False + for s_installer in content['installers']: + for s_version in s_installer['versions']: + for s_project in s_version['projects']: + if project == s_project['project']: + hit = True + + return hit + + def _in_scenarios(self, content, *args): + for arg in args: + for k, v in arg.iteritems(): + if k == 'installers': + for inner in v.values(): + for i_k, i_v in inner.iteritems(): + if i_k == 'installer': + return self._in_scenarios_installer(i_v, + content) + elif i_k == 'versions.version': + return self._in_scenarios_version(i_v, + content) + elif i_k == 'versions.projects.project': + return self._in_scenarios_project(i_v, + content) + elif content.get(k, None) != v: + return False - @staticmethod - def _in(content, *args): + return True + + def _in_others(self, content, *args): for arg in args: for k, v in arg.iteritems(): if k == 'start_date': @@ -119,9 +192,13 @@ class MemDb(object): elif k == 'trust_indicator.current': if content.get('trust_indicator').get('current') != v: return False - elif content.get(k, None) != v: - return False - + elif not isinstance(v, dict): + if isinstance(v, re._pattern_type): + if v.match(content.get(k, None)) is None: + return False + else: + if content.get(k, None) != v: + return False return True def _find(self, *args): @@ -129,12 +206,32 @@ class MemDb(object): for content in self.contents: if self._in(content, *args): res.append(content) - return res def find(self, *args): return MemCursor(self._find(*args)) + def _aggregate(self, *args, **kwargs): + res = self.contents + print args + for arg in args[0]: + for k, v in arg.iteritems(): + if k == '$match': + res = self._find(v) + cursor = MemCursor(res) + for arg in args[0]: + for k, v in arg.iteritems(): + if k == '$sort': + cursor = cursor.sort(v) + elif k == '$skip': + cursor = cursor.skip(v) + elif k == '$limit': + cursor = cursor.limit(v) + return cursor + + def aggregate(self, *args, **kwargs): + return self._aggregate(*args, **kwargs) + def _update(self, spec, document, check_keys=True): updated = False @@ -185,8 +282,9 @@ def __getattr__(name): return globals()[name] -pods = MemDb() -projects = MemDb() -testcases = MemDb() -results = MemDb() -scenarios = MemDb() +pods = MemDb('pods') +projects = MemDb('projects') +testcases = MemDb('testcases') +results = MemDb('results') +scenarios = MemDb('scenarios') +tokens = MemDb('tokens')