#
import os
import re
+import sys
from functest2Dashboard import format_functest_for_dashboard, \
check_functest_case_exist
from yardstick2Dashboard import format_yardstick_for_dashboard, \
# - check_<Project>_case_exist
-def check_dashboard_ready_project(test_project, path):
+def check_dashboard_ready_project(test_project):
# Check that the first param corresponds to a project
# for whoch dashboard processing is available
- subdirectories = os.listdir(path)
- for testfile in subdirectories:
- m = re.search('^(.*)(2Dashboard.py)$', testfile)
- if m:
- if (m.group(1) == test_project):
- return True
- return False
+ # print("test_project: %s" % test_project)
+ project_module = 'dashboard.'+test_project + '2Dashboard'
+ return True if project_module in sys.modules else False
def check_dashboard_ready_case(project, case):
return eval(cmd)
-def get_dashboard_cases(path):
+def get_dashboard_cases():
# Retrieve all the test cases that could provide
# Dashboard ready graphs
# look in the releng repo
# search all the project2Dashboard.py files
# we assume that dashboard processing of project <Project>
# is performed in the <Project>2Dashboard.py file
- dashboard_test_cases = []
- subdirectories = os.listdir(path)
- for testfile in subdirectories:
- m = re.search('^(.*)(2Dashboard.py)$', testfile)
- if m:
- dashboard_test_cases.append(m.group(1))
+ modules = []
+ cp = re.compile('dashboard.*2Dashboard')
+ for module in sys.modules:
+ if re.match(cp, module):
+ modules.append(module)
- return dashboard_test_cases
+ return modules
def get_dashboard_result(project, case, results):
# feng.xiaowei@zte.com.cn refactor test_project to project 5-19-2016
# feng.xiaowei@zte.com.cn refactor response body 5-19-2016
# feng.xiaowei@zte.com.cn refactor pod/project response info 5-19-2016
-# feng.xiaowei@zte.com.cn refactor case related handler 5-20-2016
+# feng.xiaowei@zte.com.cn refactor testcase related handler 5-20-2016
+# feng.xiaowei@zte.com.cn refactor result related handler 5-23-2016
+# feng.xiaowei@zte.com.cn refactor dashboard related handler 5-24-2016
##############################################################################
import json
+from datetime import datetime, timedelta
from tornado.web import RequestHandler, asynchronous, HTTPError
from tornado import gen
-from datetime import datetime, timedelta
-from models import TestResult, CreateResponse
+from models import CreateResponse
+from resources.result_models import TestResult
from resources.testcase_models import Testcase
from resources.project_models import Project
from resources.pod_models import Pod
from common.constants import DEFAULT_REPRESENTATION, HTTP_BAD_REQUEST, \
HTTP_NOT_FOUND, HTTP_FORBIDDEN
from common.config import prepare_put_request
-
-from dashboard.dashboard_utils import get_dashboard_cases, \
- check_dashboard_ready_project, check_dashboard_ready_case, \
- get_dashboard_result
+from dashboard.dashboard_utils import check_dashboard_ready_project, \
+ check_dashboard_ready_case, get_dashboard_result
def format_data(data, cls):
self.db = self.settings["db"]
def prepare(self):
- if not (self.request.method == "GET" or self.request.method == "DELETE"):
+ if self.request.method != "GET" and self.request.method != "DELETE":
if self.request.headers.get("Content-Type") is not None:
if self.request.headers["Content-Type"].startswith(
DEFAULT_REPRESENTATION):
self.set_header("Content-Type", DEFAULT_REPRESENTATION)
self.finish()
+ def _create_response(self, resource):
+ href = self.request.full_url() + '/' + resource
+ return CreateResponse(href=href).format()
+
class VersionHandler(RequestHandler):
""" Display a message for the API version """
pod.creation_date = datetime.now()
yield self.db.pods.insert(pod.format())
-
- res = CreateResponse(self.request.full_url() + '/{}'.format(pod.name))
- self.finish_request(res.format())
+ self.finish_request(self._create_response(pod.name))
@asynchronous
@gen.coroutine
project.creation_date = datetime.now()
yield self.db.projects.insert(project.format())
-
- res = CreateResponse(self.request.full_url() + '/{}'.format(project.name))
- self.finish_request(res.format())
-
+ self.finish_request(self._create_response(project.name))
@asynchronous
@gen.coroutine
testcase.creation_date = datetime.now()
yield self.db.testcases.insert(testcase.format())
- res = CreateResponse(self.request.full_url() + '/{}'.format(testcase.name))
- self.finish_request(res.format())
+ self.finish_request(self._create_response(testcase.name))
@asynchronous
@gen.coroutine
=> get results with optional filters
"""
- project_arg = self.get_query_argument("project", None)
- case_arg = self.get_query_argument("case", None)
- pod_arg = self.get_query_argument("pod", None)
- version_arg = self.get_query_argument("version", None)
- installer_arg = self.get_query_argument("installer", None)
- build_tag_arg = self.get_query_argument("build_tag", None)
- scenario_arg = self.get_query_argument("scenario", None)
- criteria_arg = self.get_query_argument("criteria", None)
- period_arg = self.get_query_argument("period", None)
- trust_indicator_arg = self.get_query_argument("trust_indicator", None)
-
# prepare request
- get_request = dict()
- if result_id is None:
+ query = dict()
+ if result_id is not None:
+ query["_id"] = result_id
+ answer = yield self.db.results.find_one(query)
+ if answer is None:
+ raise HTTPError(HTTP_NOT_FOUND,
+ "test result {} Not Exist".format(result_id))
+ else:
+ answer = format_data(answer, TestResult)
+ else:
+ pod_arg = self.get_query_argument("pod", None)
+ project_arg = self.get_query_argument("project", None)
+ case_arg = self.get_query_argument("case", None)
+ version_arg = self.get_query_argument("version", None)
+ installer_arg = self.get_query_argument("installer", None)
+ build_tag_arg = self.get_query_argument("build_tag", None)
+ scenario_arg = self.get_query_argument("scenario", None)
+ criteria_arg = self.get_query_argument("criteria", None)
+ period_arg = self.get_query_argument("period", None)
+ trust_indicator_arg = self.get_query_argument("trust_indicator",
+ None)
+
if project_arg is not None:
- get_request["project_name"] = project_arg
+ query["project_name"] = project_arg
if case_arg is not None:
- get_request["case_name"] = case_arg
+ query["case_name"] = case_arg
if pod_arg is not None:
- get_request["pod_name"] = pod_arg
+ query["pod_name"] = pod_arg
if version_arg is not None:
- get_request["version"] = version_arg
+ query["version"] = version_arg
if installer_arg is not None:
- get_request["installer"] = installer_arg
+ query["installer"] = installer_arg
if build_tag_arg is not None:
- get_request["build_tag"] = build_tag_arg
+ query["build_tag"] = build_tag_arg
if scenario_arg is not None:
- get_request["scenario"] = scenario_arg
+ query["scenario"] = scenario_arg
if criteria_arg is not None:
- get_request["criteria_tag"] = criteria_arg
+ query["criteria_tag"] = criteria_arg
if trust_indicator_arg is not None:
- get_request["trust_indicator_arg"] = trust_indicator_arg
+ query["trust_indicator_arg"] = trust_indicator_arg
if period_arg is not None:
try:
if period_arg > 0:
period = datetime.now() - timedelta(days=period_arg)
obj = {"$gte": str(period)}
- get_request["creation_date"] = obj
- else:
- get_request["_id"] = result_id
+ query["creation_date"] = obj
- print get_request
- res = []
- # fetching results
- cursor = self.db.test_results.find(get_request)
- while (yield cursor.fetch_next):
- test_result = TestResult.test_result_from_dict(
- cursor.next_object())
- res.append(test_result.format_http())
-
- # building meta object
- meta = dict()
- meta["total"] = len(res)
+ res = []
+ cursor = self.db.results.find(query)
+ while (yield cursor.fetch_next):
+ res.append(format_data(cursor.next_object(), TestResult))
+ answer = {'results': res}
- # final response object
- answer = dict()
- answer["test_results"] = res
- answer["meta"] = meta
self.finish_request(answer)
@asynchronous
# check for request payload
if self.json_args is None:
- raise HTTPError(HTTP_BAD_REQUEST)
+ raise HTTPError(HTTP_BAD_REQUEST, 'no payload')
+
+ result = TestResult.from_dict(self.json_args)
- # check for missing parameters in the request payload
- if self.json_args.get("project_name") is None:
- raise HTTPError(HTTP_BAD_REQUEST)
- if self.json_args.get("case_name") is None:
- raise HTTPError(HTTP_BAD_REQUEST)
# check for pod_name instead of id,
# keeping id for current implementations
- if self.json_args.get("pod_name") is None:
- raise HTTPError(HTTP_BAD_REQUEST)
+ if result.pod_name is None:
+ raise HTTPError(HTTP_BAD_REQUEST, 'pod is not provided')
- # TODO : replace checks with jsonschema
- # check for project
- mongo_dict = yield self.db.projects.find_one(
- {"name": self.json_args.get("project_name")})
- if mongo_dict is None:
- raise HTTPError(HTTP_NOT_FOUND,
- "Could not find project [{}] "
- .format(self.json_args.get("project_name")))
+ # check for missing parameters in the request payload
+ if result.project_name is None:
+ raise HTTPError(HTTP_BAD_REQUEST, 'project is not provided')
- # check for case
- mongo_dict = yield self.db.testcases.find_one(
- {"name": self.json_args.get("case_name")})
- if mongo_dict is None:
- raise HTTPError(HTTP_NOT_FOUND,
- "Could not find case [{}] "
- .format(self.json_args.get("case_name")))
+ if result.case_name is None:
+ raise HTTPError(HTTP_BAD_REQUEST, 'testcase is not provided')
+ # TODO : replace checks with jsonschema
# check for pod
- mongo_dict = yield self.db.pods.find_one(
- {"name": self.json_args.get("pod_name")})
- if mongo_dict is None:
+ the_pod = yield self.db.pods.find_one({"name": result.pod_name})
+ if the_pod is None:
raise HTTPError(HTTP_NOT_FOUND,
"Could not find POD [{}] "
.format(self.json_args.get("pod_name")))
+ # check for project
+ the_project = yield self.db.projects.find_one(
+ {"name": result.project_name})
+ if the_project is None:
+ raise HTTPError(HTTP_NOT_FOUND, "Could not find project [{}] "
+ .format(result.project_name))
+
+ # check for testcase
+ the_testcase = yield self.db.testcases.find_one(
+ {"name": result.case_name})
+ if the_testcase is None:
+ raise HTTPError(HTTP_NOT_FOUND,
+ "Could not find testcase [{}] "
+ .format(result.case_name))
+
# convert payload to object
- test_result = TestResult.test_result_from_dict(self.json_args)
- test_result.creation_date = datetime.now()
+ result.creation_date = datetime.now()
- future = self.db.test_results.insert(test_result.format(),
- check_keys=False)
- result = yield future
- test_result._id = result
+ _id = yield self.db.results.insert(result.format(), check_keys=False)
- self.finish_request(test_result.format_http())
+ self.finish_request(self._create_response(_id))
class DashboardHandler(GenericApiHandler):
@asynchronous
@gen.coroutine
- def get(self, result_id=None):
+ def get(self):
"""
Retrieve dashboard ready result(s) for a test project
Available filters for this request are :
period_arg = self.get_query_argument("period", None)
# prepare request
- get_request = dict()
+ query = dict()
- # /dashboard?project=<>&pod=<>...
- if (result_id is None):
- if project_arg is not None:
- get_request["project_name"] = project_arg
+ if project_arg is not None:
+ query["project_name"] = project_arg
- if case_arg is not None:
- get_request["case_name"] = case_arg
+ if case_arg is not None:
+ query["case_name"] = case_arg
- if pod_arg is not None:
- get_request["pod_name"] = pod_arg
+ if pod_arg is not None:
+ query["pod_name"] = pod_arg
- if version_arg is not None:
- get_request["version"] = version_arg
+ if version_arg is not None:
+ query["version"] = version_arg
- if installer_arg is not None:
- get_request["installer"] = installer_arg
+ if installer_arg is not None:
+ query["installer"] = installer_arg
- if period_arg is not None:
- try:
- period_arg = int(period_arg)
- except:
- raise HTTPError(HTTP_BAD_REQUEST)
- if period_arg > 0:
- period = datetime.now() - timedelta(days=period_arg)
- obj = {"$gte": str(period)}
- get_request["creation_date"] = obj
- else:
- get_request["_id"] = result_id
-
- dashboard = []
+ if period_arg is not None:
+ try:
+ period_arg = int(period_arg)
+ except:
+ raise HTTPError(HTTP_BAD_REQUEST)
+ if period_arg > 0:
+ period = datetime.now() - timedelta(days=period_arg)
+ obj = {"$gte": str(period)}
+ query["creation_date"] = obj
# on /dashboard retrieve the list of projects and testcases
# ready for dashboard
if project_arg is None:
- raise HTTPError(HTTP_NOT_FOUND,
- "error:Project name missing")
- elif check_dashboard_ready_project(project_arg, "./dashboard"):
- res = []
-
- if case_arg is None:
- raise HTTPError(
- HTTP_NOT_FOUND,
- "error:Test case missing for project " + project_arg)
-
- # special case of status for project
- if case_arg == "status":
- del get_request["case_name"]
- # retention time to be agreed
- # last five days by default?
- # TODO move to DB
- period = datetime.now() - timedelta(days=5)
- get_request["creation_date"] = {"$gte": period}
-
- # fetching results
- cursor = self.db.test_results.find(get_request)
- while (yield cursor.fetch_next):
- test_result = TestResult.test_result_from_dict(
- cursor.next_object())
- res.append(test_result.format_http())
+ raise HTTPError(HTTP_NOT_FOUND, "Project name missing")
- if check_dashboard_ready_case(project_arg, case_arg):
- dashboard = get_dashboard_result(project_arg, case_arg, res)
- else:
- raise HTTPError(
- HTTP_NOT_FOUND,
- "error:" + case_arg +
- " test case not case dashboard ready on project " +
- project_arg)
+ if not check_dashboard_ready_project(project_arg):
+ raise HTTPError(HTTP_NOT_FOUND,
+ 'Project [{}] not dashboard ready'
+ .format(project_arg))
- else:
- dashboard.append(
- {"error": "Project not recognized or not dashboard ready"})
- dashboard.append(
- {"Dashboard-ready-projects":
- get_dashboard_cases("./dashboard")})
+ if case_arg is None:
raise HTTPError(
HTTP_NOT_FOUND,
- "error: no dashboard ready data for this project")
+ 'Test case missing for project [{}]'.format(project_arg))
- # fetching results
- # cursor = self.db.test_results.find(get_request)
- # while (yield cursor.fetch_next):
- # test_result = TestResult.test_result_from_dict(
- # cursor.next_object())
- # res.append(test_result.format_http())
+ if not check_dashboard_ready_case(project_arg, case_arg):
+ raise HTTPError(
+ HTTP_NOT_FOUND,
+ 'Test case [{}] not dashboard ready for project [{}]'
+ .format(case_arg, project_arg))
- # building meta object
- meta = dict()
+ # special case of status for project
+ res = []
+ if case_arg != "status":
+ cursor = self.db.results.find(query)
+ while (yield cursor.fetch_next):
+ result = TestResult.from_dict(cursor.next_object())
+ res.append(result.format_http())
# final response object
- answer = dict()
- answer["dashboard"] = dashboard
- answer["meta"] = meta
- self.finish_request(answer)
+ self.finish_request(get_dashboard_result(project_arg, case_arg, res))
# feng.xiaowei@zte.com.cn mv TestProject to project_models.py 5-19-2016\r
# feng.xiaowei@zte.com.cn delete meta class 5-19-2016\r
# feng.xiaowei@zte.com.cn add CreateResponse 5-19-2016\r
-# feng.xiaowei@zte.com.cn mv TestCase to testcase_models.py 5-20-2016\r
+# feng.xiaowei@zte.com.cn mv TestCase to testcase_models.py 5-20-2016\r
+# feng.xiaowei@zte.com.cn mv TestResut to result_models.py 5-23-2016\r
##############################################################################\r
\r
\r
\r
def format(self):\r
return {'href': self.href}\r
-\r
-\r
-class TestResult:\r
- """ Describes a test result"""\r
-\r
- def __init__(self):\r
- self._id = None\r
- self.case_name = None\r
- self.project_name = None\r
- self.pod_name = None\r
- self.installer = None\r
- self.version = None\r
- self.description = None\r
- self.creation_date = None\r
- self.details = None\r
- self.build_tag = None\r
- self.scenario = None\r
- self.criteria = None\r
- self.trust_indicator = None\r
-\r
- @staticmethod\r
- def test_result_from_dict(test_result_dict):\r
-\r
- if test_result_dict is None:\r
- return None\r
-\r
- t = TestResult()\r
- t._id = test_result_dict.get('_id')\r
- t.case_name = test_result_dict.get('case_name')\r
- t.pod_name = test_result_dict.get('pod_name')\r
- t.project_name = test_result_dict.get('project_name')\r
- t.description = test_result_dict.get('description')\r
- t.creation_date = str(test_result_dict.get('creation_date'))\r
- t.details = test_result_dict.get('details')\r
- t.version = test_result_dict.get('version')\r
- t.installer = test_result_dict.get('installer')\r
- t.build_tag = test_result_dict.get('build_tag')\r
- t.scenario = test_result_dict.get('scenario')\r
- t.criteria = test_result_dict.get('criteria')\r
- # 0 < trust indicator < 1\r
- # if bad value => set this indicator to 0\r
- if test_result_dict.get('trust_indicator') is not None:\r
- if isinstance(test_result_dict.get('trust_indicator'),\r
- (int, long, float)):\r
- if test_result_dict.get('trust_indicator') < 0:\r
- t.trust_indicator = 0\r
- elif test_result_dict.get('trust_indicator') > 1:\r
- t.trust_indicator = 1\r
- else:\r
- t.trust_indicator = test_result_dict.get('trust_indicator')\r
- else:\r
- t.trust_indicator = 0\r
- else:\r
- t.trust_indicator = 0\r
- return t\r
-\r
- def format(self):\r
- return {\r
- "case_name": self.case_name,\r
- "project_name": self.project_name,\r
- "pod_name": self.pod_name,\r
- "description": self.description,\r
- "creation_date": str(self.creation_date),\r
- "version": self.version,\r
- "installer": self.installer,\r
- "details": self.details,\r
- "build_tag": self.build_tag,\r
- "scenario": self.scenario,\r
- "criteria": self.criteria,\r
- "trust_indicator": self.trust_indicator\r
- }\r
-\r
- def format_http(self):\r
- return {\r
- "_id": str(self._id),\r
- "case_name": self.case_name,\r
- "project_name": self.project_name,\r
- "pod_name": self.pod_name,\r
- "description": self.description,\r
- "creation_date": str(self.creation_date),\r
- "version": self.version,\r
- "installer": self.installer,\r
- "details": self.details,\r
- "build_tag": self.build_tag,\r
- "scenario": self.scenario,\r
- "criteria": self.criteria,\r
- "trust_indicator": self.trust_indicator\r
- }\r
--- /dev/null
+
+class ResultCreateRequest(object):
+ def __init__(self,
+ pod_name=None,
+ project_name=None,
+ case_name=None,
+ installer=None,
+ version=None,
+ description=None,
+ details=None,
+ build_tag=None,
+ scenario=None,
+ criteria=None,
+ trust_indicator=None):
+ self.pod_name = pod_name
+ self.project_name = project_name
+ self.case_name = case_name
+ self.installer = installer
+ self.version = version
+ self.description = description
+ self.details = details
+ self.build_tag = build_tag
+ self.scenario = scenario
+ self.criteria = criteria
+ self.trust_indicator = trust_indicator
+
+ def format(self):
+ return {
+ "pod_name": self.pod_name,
+ "project_name": self.project_name,
+ "case_name": self.case_name,
+ "installer": self.installer,
+ "version": self.version,
+ "description": self.description,
+ "details": self.details,
+ "build_tag": self.build_tag,
+ "scenario": self.scenario,
+ "criteria": self.criteria,
+ "trust_indicator": self.trust_indicator
+ }
+
+
+class TestResult:
+ """ Describes a test result"""
+
+ def __init__(self):
+ self._id = None
+ self.case_name = None
+ self.project_name = None
+ self.pod_name = None
+ self.installer = None
+ self.version = None
+ self.description = None
+ self.creation_date = None
+ self.details = None
+ self.build_tag = None
+ self.scenario = None
+ self.criteria = None
+ self.trust_indicator = None
+
+ @staticmethod
+ def from_dict(a_dict):
+
+ if a_dict is None:
+ return None
+
+ t = TestResult()
+ t._id = a_dict.get('_id')
+ t.case_name = a_dict.get('case_name')
+ t.pod_name = a_dict.get('pod_name')
+ t.project_name = a_dict.get('project_name')
+ t.description = a_dict.get('description')
+ t.creation_date = str(a_dict.get('creation_date'))
+ t.details = a_dict.get('details')
+ t.version = a_dict.get('version')
+ t.installer = a_dict.get('installer')
+ t.build_tag = a_dict.get('build_tag')
+ t.scenario = a_dict.get('scenario')
+ t.criteria = a_dict.get('criteria')
+ # 0 < trust indicator < 1
+ # if bad value => set this indicator to 0
+ t.trust_indicator = a_dict.get('trust_indicator')
+ if t.trust_indicator is not None:
+ if isinstance(t.trust_indicator, (int, long, float)):
+ if t.trust_indicator < 0:
+ t.trust_indicator = 0
+ elif t.trust_indicator > 1:
+ t.trust_indicator = 1
+ else:
+ t.trust_indicator = 0
+ else:
+ t.trust_indicator = 0
+ return t
+
+ def format(self):
+ return {
+ "case_name": self.case_name,
+ "project_name": self.project_name,
+ "pod_name": self.pod_name,
+ "description": self.description,
+ "creation_date": str(self.creation_date),
+ "version": self.version,
+ "installer": self.installer,
+ "details": self.details,
+ "build_tag": self.build_tag,
+ "scenario": self.scenario,
+ "criteria": self.criteria,
+ "trust_indicator": self.trust_indicator
+ }
+
+ def format_http(self):
+ return {
+ "_id": str(self._id),
+ "case_name": self.case_name,
+ "project_name": self.project_name,
+ "pod_name": self.pod_name,
+ "description": self.description,
+ "creation_date": str(self.creation_date),
+ "version": self.version,
+ "installer": self.installer,
+ "details": self.details,
+ "build_tag": self.build_tag,
+ "scenario": self.scenario,
+ "criteria": self.criteria,
+ "trust_indicator": self.trust_indicator
+ }
+
+
+class TestResults(object):
+ def __init__(self, results=list()):
+ self.results = results
+
+ @staticmethod
+ def from_dict(a_dict):
+ if a_dict is None:
+ return None
+
+ res = TestResults()
+ for result in a_dict.get('results'):
+ res.results.append(TestResult.from_dict(result))
+ return res
from bson.objectid import ObjectId
from concurrent.futures import ThreadPoolExecutor
+
__author__ = 'serena'
result = executor.submit(self._find_one, spec_or_id, *args)
return result
- def _insert(self, doc_or_docs):
+ def _insert(self, doc_or_docs, check_keys=True):
docs = doc_or_docs
return_one = False
ids = []
for doc in docs:
if '_id' not in doc:
- doc['_id'] = ObjectId()
- if not self._find_one(doc['_id']):
+ doc['_id'] = str(ObjectId())
+ if not check_keys or not self._find_one(doc['_id']):
ids.append(doc['_id'])
self.contents.append(doc_or_docs)
else:
return ids
- def insert(self, doc_or_docs):
+ def insert(self, doc_or_docs, check_keys=True):
with ThreadPoolExecutor(max_workers=2) as executor:
- result = executor.submit(self._insert, doc_or_docs)
+ result = executor.submit(self._insert, doc_or_docs, check_keys)
return result
@staticmethod
def _in(content, *args):
for arg in args:
for k, v in arg.iteritems():
- if content.get(k, None) != v:
+ if k != 'creation_date' and content.get(k, None) != v:
return False
return True
pods = MemDb()
projects = MemDb()
testcases = MemDb()
-test_results = MemDb()
+results = MemDb()
return self.create(self.req_e, *args)
def create(self, req=None, *args):
+ return self.create_help(self.basePath, req, *args)
+
+ def create_help(self, uri, req, *args):
if req:
req = req.format()
-
- res = self.fetch(self._get_uri(*args),
+ res = self.fetch(self._update_uri(uri, *args),
method='POST',
body=json.dumps(req),
headers=self.headers)
return self._get_return(res, self.create_res)
- def create_help(self, uri, req, cls):
- res = self.fetch(uri,
- method='POST',
- body=json.dumps(req.format()),
- headers=self.headers)
-
- return self._get_return(res, cls)
-
def get(self, *args):
res = self.fetch(self._get_uri(*args),
method='GET',
def inner():
new_args, num = self._get_valid_args(*args)
- return self.get_res if num != self._need_arg_num() else self.list_res
+ return self.get_res \
+ if num != self._need_arg_num(self.basePath) else self.list_res
return self._get_return(res, inner())
+ def query(self, query):
+ res = self.fetch(self._get_query_uri(query),
+ method='GET',
+ headers=self.headers)
+ return self._get_return(res, self.list_res)
+
def update(self, new=None, *args):
if new:
new = new.format()
new_args = tuple(['%s' % arg for arg in args if arg is not None])
return new_args, len(new_args)
- def _need_arg_num(self):
- return self.basePath.count('%s')
+ def _need_arg_num(self, uri):
+ return uri.count('%s')
+
+ def _get_query_uri(self, query):
+ return self.basePath + '?' + query
def _get_uri(self, *args):
+ return self._update_uri(self.basePath, *args)
+
+ def _update_uri(self, uri, *args):
+ r_uri = uri
new_args, num = self._get_valid_args(*args)
- uri = self.basePath
- if num != self._need_arg_num():
- uri += '/%s'
+ if num != self._need_arg_num(uri):
+ r_uri += '/%s'
- return uri % tuple(['%s' % arg for arg in new_args])
+ return r_uri % tuple(['%s' % arg for arg in new_args])
def _get_return(self, res, cls):
code = res.code
@staticmethod
def _get_return_body(code, body, cls):
- return cls.from_dict(json.loads(body)) if code < 300 else body
+ return cls.from_dict(json.loads(body)) if code < 300 and cls else body
+
+ def assert_href(self, body):
+ self.assertIn(self.basePath, body.href)
def assert_create_body(self, body, req=None, *args):
if not req:
fake_pymongo.pods.clear()
fake_pymongo.projects.clear()
fake_pymongo.testcases.clear()
- fake_pymongo.test_results.clear()
+ fake_pymongo.results.clear()
--- /dev/null
+import unittest
+
+from test_result import TestResultBase
+from common.constants import HTTP_NOT_FOUND, HTTP_OK
+
+__author__ = '__serena__'
+
+
+class TestDashboardBase(TestResultBase):
+ def setUp(self):
+ super(TestDashboardBase, self).setUp()
+ self.basePath = '/dashboard'
+ self.create_help('/results', self.req_d)
+ self.create_help('/results', self.req_d)
+ self.list_res = None
+
+
+class TestDashboardQuery(TestDashboardBase):
+ def test_projectMissing(self):
+ code, body = self.query(self._set_query(project='missing'))
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Project name missing', body)
+
+ def test_projectNotReady(self):
+ code, body = self.query(self._set_query(project='notReadyProject'))
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Project [notReadyProject] not dashboard ready', body)
+
+ def test_testcaseMissing(self):
+ code, body = self.query(self._set_query(case='missing'))
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Test case missing for project [{}]'
+ .format(self.project),
+ body)
+
+ def test_testcaseNotReady(self):
+ code, body = self.query(self._set_query(case='notReadyCase'))
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn(
+ 'Test case [notReadyCase] not dashboard ready for project [%s]'
+ % self.project,
+ body)
+
+ def test_success(self):
+ code, body = self.query(self._set_query())
+ self.assertEqual(code, HTTP_OK)
+ self.assertIn('{"description": "vPing results for Dashboard"}', body)
+
+ def test_caseIsStatus(self):
+ code, body = self.query(self._set_query(case='status'))
+ self.assertEqual(code, HTTP_OK)
+ self.assertIn('{"description": "Functest status"}', body)
+
+ def _set_query(self, project=None, case=None):
+ uri = ''
+ for k, v in list(locals().iteritems()):
+ if k == 'self' or k == 'uri':
+ continue
+ if v is None:
+ v = eval('self.' + k)
+ if v != 'missing':
+ uri += '{}={}&'.format(k, v)
+ uri += 'pod={}&'.format(self.pod)
+ uri += 'version={}&'.format(self.version)
+ uri += 'installer={}&'.format(self.installer)
+ uri += 'period={}&'.format(5)
+ return uri[0:-1]
+
+
+if __name__ == '__main__':
+ unittest.main()
def setUp(self):
super(MyTest, self).setUp()
self.db = fake_pymongo
+ self.addCleanup(self._clear)
self.io_loop.run_sync(self.fixture_setup)
def get_app(self):
def test_find_one(self):
user = yield self.db.pods.find_one({'name': 'test1'})
self.assertEqual(user, self.test1)
+ self.db.pods.remove()
@gen_test
def test_find(self):
user = yield self.db.pods.find_one({'_id': '1'})
self.assertIsNone(user)
+ @gen_test
+ def test_insert_check_keys(self):
+ yield self.db.pods.insert({'_id': '1', 'name': 'test1'},
+ check_keys=False)
+ cursor = self.db.pods.find({'_id': '1'})
+ names = []
+ while (yield cursor.fetch_next):
+ ob = cursor.next_object()
+ names.append(ob.get('name'))
+ self.assertItemsEqual(names, ['test1', 'test1'])
+
+ def _clear(self):
+ self.db.pods.clear()
+
if __name__ == '__main__':
unittest.main()
--- /dev/null
+import unittest
+
+from test_base import TestBase
+from resources.pod_models import PodCreateRequest
+from resources.project_models import ProjectCreateRequest
+from resources.testcase_models import TestcaseCreateRequest
+from resources.result_models import ResultCreateRequest, \
+ TestResult, TestResults
+from common.constants import HTTP_OK, HTTP_BAD_REQUEST, HTTP_NOT_FOUND
+
+
+__author__ = '__serena__'
+
+
+class Details(object):
+ def __init__(self, timestart=None, duration=None, status=None):
+ self.timestart = timestart
+ self.duration = duration
+ self.status = status
+
+ def format(self):
+ return {
+ "timestart": self.timestart,
+ "duration": self.duration,
+ "status": self.status
+ }
+
+ @staticmethod
+ def from_dict(a_dict):
+
+ if a_dict is None:
+ return None
+
+ t = Details()
+ t.timestart = a_dict.get('timestart')
+ t.duration = a_dict.get('duration')
+ t.status = a_dict.get('status')
+ return t
+
+
+class TestResultBase(TestBase):
+ def setUp(self):
+ self.pod = 'zte-pod1'
+ self.project = 'functest'
+ self.case = 'vPing'
+ self.installer = 'fuel'
+ self.version = 'C'
+ self.build_tag = 'v3.0'
+ self.scenario = 'odl-l2'
+ self.criteria = '10s'
+ self.trust_indicator = 0.7
+ super(TestResultBase, self).setUp()
+ self.details = Details(timestart='0', duration='9s', status='OK')
+ self.req_d = ResultCreateRequest(pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ description='vping use ssh',
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria,
+ trust_indicator=self.trust_indicator)
+ self.get_res = TestResult
+ self.list_res = TestResults
+ self.basePath = '/results'
+ self.req_pod = PodCreateRequest(self.pod, 'metal', 'zte pod 1')
+ self.req_project = ProjectCreateRequest(self.project, 'vping test')
+ self.req_testcase = TestcaseCreateRequest('/cases/vping',
+ self.case,
+ 'vping-ssh test')
+ self.create_help('/pods', self.req_pod)
+ self.create_help('/projects', self.req_project)
+ self.create_help('/projects/%s/cases', self.req_testcase, self.project)
+
+ def assert_res(self, code, result):
+ self.assertEqual(code, HTTP_OK)
+ req = self.req_d
+ self.assertEqual(result.pod_name, req.pod_name)
+ self.assertEqual(result.project_name, req.project_name)
+ self.assertEqual(result.case_name, req.case_name)
+ self.assertEqual(result.installer, req.installer)
+ self.assertEqual(result.version, req.version)
+ self.assertEqual(result.description, req.description)
+ details_req = Details.from_dict(req.details)
+ details_res = Details.from_dict(result.details)
+ self.assertEqual(details_res.duration, details_req.duration)
+ self.assertEqual(details_res.timestart, details_req.timestart)
+ self.assertEqual(details_res.status, details_req.status)
+ self.assertEqual(result.build_tag, req.build_tag)
+ self.assertEqual(result.scenario, req.scenario)
+ self.assertEqual(result.criteria, req.criteria)
+ self.assertEqual(result.trust_indicator, req.trust_indicator)
+ self.assertIsNotNone(result.creation_date)
+ self.assertIsNotNone(result._id)
+
+
+class TestResultCreate(TestResultBase):
+ def test_nobody(self):
+ (code, body) = self.create(None)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('no payload', body)
+
+ def test_podNotProvided(self):
+ req = self.req_d
+ req.pod_name = None
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('pod is not provided', body)
+
+ def test_projectNotProvided(self):
+ req = self.req_d
+ req.project_name = None
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('project is not provided', body)
+
+ def test_testcaseNotProvided(self):
+ req = self.req_d
+ req.case_name = None
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('testcase is not provided', body)
+
+ def test_noPod(self):
+ req = self.req_d
+ req.pod_name = 'notExistPod'
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Could not find POD', body)
+
+ def test_noProject(self):
+ req = self.req_d
+ req.project_name = 'notExistProject'
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Could not find project', body)
+
+ def test_noTestcase(self):
+ req = self.req_d
+ req.case_name = 'notExistTestcase'
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Could not find testcase', body)
+
+ def test_success(self):
+ (code, body) = self.create_d()
+ self.assertEqual(code, HTTP_OK)
+ self.assert_href(body)
+
+
+class TestResultGet(TestResultBase):
+ def test_getOne(self):
+ _, res = self.create_d()
+ _id = res.href.split('/')[-1]
+ code, body = self.get(_id)
+ self.assert_res(code, body)
+
+ def test_queryPod(self):
+ self._query_and_assert(self._set_query('pod'))
+
+ def test_queryProject(self):
+ self._query_and_assert(self._set_query('project'))
+
+ def test_queryTestcase(self):
+ self._query_and_assert(self._set_query('case'))
+
+ def test_queryVersion(self):
+ self._query_and_assert(self._set_query('version'))
+
+ def test_queryInstaller(self):
+ self._query_and_assert(self._set_query('installer'))
+
+ def test_queryBuildTag(self):
+ self._query_and_assert(self._set_query('build_tag'))
+
+ def test_queryScenario(self):
+ self._query_and_assert(self._set_query('scenario'))
+
+ def test_queryTrustIndicator(self):
+ self._query_and_assert(self._set_query('trust_indicator'))
+
+ def test_queryCriteria(self):
+ self._query_and_assert(self._set_query('criteria'))
+
+ def test_queryPeriod(self):
+ self._query_and_assert(self._set_query('period=1'))
+
+ def test_combination(self):
+ self._query_and_assert(self._set_query('pod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1'))
+
+ def test_notFound(self):
+ self._query_and_assert(self._set_query('pod=notExistPod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1'),
+ found=False)
+
+ def _query_and_assert(self, query, found=True):
+ _, res = self.create_d()
+ code, body = self.query(query)
+ if not found:
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(0, len(body.results))
+ else:
+ for result in body.results:
+ self.assert_res(code, result)
+
+ def _set_query(self, *args):
+ uri = ''
+ for arg in args:
+ if '=' in arg:
+ uri += arg + '&'
+ else:
+ uri += '{}={}&'.format(arg, eval('self.' + arg))
+ return uri[0: -1]
+
+if __name__ == '__main__':
+ unittest.main()
from test_base import TestBase
from resources.testcase_models import TestcaseCreateRequest, \
Testcase, Testcases, TestcaseUpdateRequest
-from resources.project_models import ProjectCreateRequest, Project
+from resources.project_models import ProjectCreateRequest
from common.constants import HTTP_OK, HTTP_BAD_REQUEST, \
HTTP_FORBIDDEN, HTTP_NOT_FOUND
class TestCaseBase(TestBase):
def setUp(self):
super(TestCaseBase, self).setUp()
- self.req_d = TestcaseCreateRequest('/cases/vping_1', 'vping_1', 'vping-ssh test')
- self.req_e = TestcaseCreateRequest('/cases/doctor_1', 'doctor_1', 'create doctor')
- self.update_d = TestcaseUpdateRequest('vping_1', 'vping-ssh test', 'functest')
- self.update_e = TestcaseUpdateRequest('doctor_1', 'create doctor', 'functest')
+ self.req_d = TestcaseCreateRequest('/cases/vping_1',
+ 'vping_1',
+ 'vping-ssh test')
+ self.req_e = TestcaseCreateRequest('/cases/doctor_1',
+ 'doctor_1',
+ 'create doctor')
+ self.update_d = TestcaseUpdateRequest('vping_1',
+ 'vping-ssh test',
+ 'functest')
+ self.update_e = TestcaseUpdateRequest('doctor_1',
+ 'create doctor',
+ 'functest')
self.get_res = Testcase
self.list_res = Testcases
self.update_res = Testcase
def create_project(self):
req_p = ProjectCreateRequest('functest', 'vping-ssh test')
- self.create_help('/projects', req_p, Project)
+ self.create_help('/projects', req_p)
self.project = req_p.name
def create_d(self):