from datetime import datetime
import json
import logging
+import mimetypes
import os
import re
-import requests
+from urllib.parse import urlparse
+import boto3
+from boto3.s3.transfer import TransferConfig
+import botocore
import prettytable
-import six
+import requests
from xtesting.utils import decorators
from xtesting.utils import env
+from xtesting.utils import constants
__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
-@six.add_metaclass(abc.ABCMeta)
-class TestCase(object):
+class TestCase(metaclass=abc.ABCMeta):
# pylint: disable=too-many-instance-attributes
"""Base model for single test case."""
EX_TESTCASE_SKIPPED = os.EX_SOFTWARE - 3
"""requirements are unmet"""
+ EX_PUBLISH_ARTIFACTS_ERROR = os.EX_SOFTWARE - 4
+ """publish_artifacts() failed"""
+
+ dir_results = constants.RESULTS_DIR
_job_name_rule = "(dai|week)ly-(.+?)-[0-9]*"
- _headers = {'Content-Type': 'application/json'}
+ headers = {'Content-Type': 'application/json'}
__logger = logging.getLogger(__name__)
def __init__(self, **kwargs):
self.start_time = 0
self.stop_time = 0
self.is_skipped = False
+ self.output_log_name = os.path.basename(constants.LOG_PATH)
+ self.output_debug_log_name = os.path.basename(constants.DEBUG_LOG_PATH)
+ self.res_dir = os.path.join(self.dir_results, self.case_name)
def __str__(self):
try:
assert self.stop_time
if self.stop_time < self.start_time:
return "XX:XX"
- return "{0[0]:02.0f}:{0[1]:02.0f}".format(divmod(
- self.stop_time - self.start_time, 60))
+ return "{}:{}".format(
+ str(int(self.stop_time - self.start_time) // 60).zfill(2),
+ str(int(self.stop_time - self.start_time) % 60).zfill(2))
except Exception: # pylint: disable=broad-except
self.__logger.error("Please run test before getting the duration")
return "XX:XX"
data["version"] = "unknown"
req = requests.post(
url, data=json.dumps(data, sort_keys=True),
- headers=self._headers)
+ headers=self.headers)
req.raise_for_status()
- self.__logger.info(
- "The results were successfully pushed to DB")
+ if urlparse(url).scheme != "file":
+ # href must be postprocessed as OPNFV testapi is misconfigured
+ # (localhost is returned)
+ uid = re.sub(r'^.*/api/v1/results/*', '', req.json()["href"])
+ netloc = env.get('TEST_DB_EXT_URL') if env.get(
+ 'TEST_DB_EXT_URL') else env.get('TEST_DB_URL')
+ self.__logger.info(
+ "The results were successfully pushed to DB: \n\n%s\n",
+ os.path.join(netloc, uid))
except AssertionError:
self.__logger.exception(
"Please run test before publishing the results")
return TestCase.EX_PUSH_TO_DB_ERROR
return TestCase.EX_OK
+ def publish_artifacts(self): # pylint: disable=too-many-locals
+ """Push the artifacts to the S3 repository.
+
+ It allows publishing the artifacts.
+
+ It could be overriden if the common implementation is not
+ suitable.
+
+ The credentials must be configured before publishing the artifacts:
+
+ * fill ~/.aws/credentials or ~/.boto,
+ * set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in env.
+
+ The next vars must be set in env:
+
+ * S3_ENDPOINT_URL (http://127.0.0.1:9000),
+ * S3_DST_URL (s3://xtesting/prefix),
+ * HTTP_DST_URL (http://127.0.0.1/prefix).
+
+ Returns:
+ TestCase.EX_OK if artifacts were published to repository.
+ TestCase.EX_PUBLISH_ARTIFACTS_ERROR otherwise.
+ """
+ try:
+ b3resource = boto3.resource(
+ 's3', endpoint_url=os.environ["S3_ENDPOINT_URL"])
+ dst_s3_url = os.environ["S3_DST_URL"]
+ multipart_threshold = 5 * 1024 ** 5 if "google" in os.environ[
+ "S3_ENDPOINT_URL"] else 8 * 1024 * 1024
+ config = TransferConfig(multipart_threshold=multipart_threshold)
+ bucket_name = urlparse(dst_s3_url).netloc
+ try:
+ b3resource.meta.client.head_bucket(Bucket=bucket_name)
+ except botocore.exceptions.ClientError as exc:
+ error_code = exc.response['Error']['Code']
+ if error_code == '404':
+ # pylint: disable=no-member
+ b3resource.create_bucket(Bucket=bucket_name)
+ else:
+ raise exc
+ except Exception as exc: # pylint: disable=broad-except
+ raise exc
+ path = urlparse(dst_s3_url).path.strip("/")
+ dst_http_url = os.environ["HTTP_DST_URL"]
+ output_str = "\n"
+ self.details["links"] = []
+ for log_file in [self.output_log_name, self.output_debug_log_name]:
+ if os.path.exists(os.path.join(self.dir_results, log_file)):
+ abs_file = os.path.join(self.dir_results, log_file)
+ mime_type = mimetypes.guess_type(abs_file)
+ self.__logger.debug(
+ "Publishing %s %s", abs_file, mime_type)
+ # pylint: disable=no-member
+ b3resource.Bucket(bucket_name).upload_file(
+ abs_file, os.path.join(path, log_file), Config=config,
+ ExtraArgs={'ContentType': mime_type[
+ 0] or 'application/octet-stream'})
+ link = os.path.join(dst_http_url, log_file)
+ output_str += "\n{}".format(link)
+ self.details["links"].append(link)
+ for root, _, files in os.walk(self.res_dir):
+ for pub_file in files:
+ abs_file = os.path.join(root, pub_file)
+ mime_type = mimetypes.guess_type(abs_file)
+ self.__logger.debug(
+ "Publishing %s %s", abs_file, mime_type)
+ # pylint: disable=no-member
+ b3resource.Bucket(bucket_name).upload_file(
+ abs_file,
+ os.path.join(path, os.path.relpath(
+ os.path.join(root, pub_file),
+ start=self.dir_results)),
+ Config=config,
+ ExtraArgs={'ContentType': mime_type[
+ 0] or 'application/octet-stream'})
+ link = os.path.join(dst_http_url, os.path.relpath(
+ os.path.join(root, pub_file),
+ start=self.dir_results))
+ output_str += "\n{}".format(link)
+ self.details["links"].append(link)
+ self.__logger.info(
+ "All artifacts were successfully published: %s\n", output_str)
+ return TestCase.EX_OK
+ except KeyError as ex:
+ self.__logger.error("Please check env var: %s", str(ex))
+ return TestCase.EX_PUBLISH_ARTIFACTS_ERROR
+ except botocore.exceptions.NoCredentialsError:
+ self.__logger.error(
+ "Please fill ~/.aws/credentials, ~/.boto or set "
+ "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in env")
+ return TestCase.EX_PUBLISH_ARTIFACTS_ERROR
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot publish the artifacts")
+ return TestCase.EX_PUBLISH_ARTIFACTS_ERROR
+
def clean(self):
"""Clean the resources.