@swagger.model()
class TestResult(object):
- def __init__(self):
- self._id = None
- self.case_name = None
- self.project_name = None
- self.pod_name = None
- self.installer = None
- self.version = None
- self.start_date = None
- self.stop_date = None
- self.details = None
- self.build_tag = None
- self.scenario = None
- self.criteria = None
- self.trust_indicator = None
+ def __init__(self, _id=None, case_name=None, project_name=None,
+ pod_name=None, installer=None, version=None,
+ start_date=None, stop_date=None, details=None,
+ build_tag=None, scenario=None, criteria=None,
+ trust_indicator=None):
+ self._id = _id
+ self.case_name = case_name
+ self.project_name = project_name
+ self.pod_name = pod_name
+ self.installer = installer
+ self.version = version
+ self.start_date = start_date
+ self.stop_date = stop_date
+ self.details = details
+ self.build_tag = build_tag
+ self.scenario = scenario
+ self.criteria = criteria
+ self.trust_indicator = trust_indicator
@staticmethod
def from_dict(a_dict):
@swagger.model()
class TestResults(object):
"""
- @ptype testcases: C{list} of L{TestResult}
+ @property results:
+ @ptype results: C{list} of L{TestResult}
"""
- def __init__(self, results=list()):
- self.results = results
+ def __init__(self):
+ self.results = list()
@staticmethod
def from_dict(a_dict):
__author__ = 'serena'
+def thread_execute(method, *args, **kwargs):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(method, *args, **kwargs)
+ return result
+
+
class MemCursor(object):
def __init__(self, collection):
self.collection = collection
@property
def fetch_next(self):
- with ThreadPoolExecutor(max_workers=2) as executor:
- result = executor.submit(self._is_next_exist)
- return result
+ return thread_execute(self._is_next_exist)
def next_object(self):
self.count -= 1
return None
def find_one(self, spec_or_id=None, *args):
- with ThreadPoolExecutor(max_workers=2) as executor:
- result = executor.submit(self._find_one, spec_or_id, *args)
- return result
+ return thread_execute(self._find_one, spec_or_id, *args)
def _insert(self, doc_or_docs, check_keys=True):
return ids
def insert(self, doc_or_docs, check_keys=True):
- with ThreadPoolExecutor(max_workers=2) as executor:
- result = executor.submit(self._insert, doc_or_docs, check_keys)
- return result
+ return thread_execute(self._insert, doc_or_docs, check_keys)
@staticmethod
def _in(content, *args):
return updated
def update(self, spec, document):
- with ThreadPoolExecutor(max_workers=2) as executor:
- result = executor.submit(self._update, spec, document)
- return result
+ return thread_execute(self._update, spec, document)
def _remove(self, spec_or_id=None):
if spec_or_id is None:
return False
def remove(self, spec_or_id=None):
- with ThreadPoolExecutor(max_workers=2) as executor:
- result = executor.submit(self._remove, spec_or_id)
- return result
+ return thread_execute(self._remove, spec_or_id)
def clear(self):
self._remove()