It also fixes lots of new pylint warnings.
Change-Id: I3376aee5649596c53bc2941ad4d8e0674a2967fe
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
jobs:
build:
docker:
- - image: circleci/python:3.8
+ - image: circleci/python:3.9
steps:
- checkout
- run:
- repo: _
dport:
container: alpine
- tag: '3.13'
+ tag: '3.14'
steps:
- name: build opnfv/functest-core
containers:
FROM opnfv/functest-benchmarking
-COPY testcases.yaml /usr/lib/python3.8/site-packages/xtesting/ci/testcases.yaml
+COPY testcases.yaml /usr/lib/python3.9/site-packages/xtesting/ci/testcases.yaml
COPY blacklist.yaml /src/functest/functest/opnfv_tests/openstack/rally/blacklist.yaml
CMD ["run_tests", "-t", "all"]
cp /src/neutron/rally-jobs/task-neutron.yaml /home/opnfv/functest/data/rally/neutron/rally-jobs/ && \
rm -r /src/vmtp /src/neutron && \
apk del .build-deps
-COPY testcases.yaml /usr/lib/python3.8/site-packages/xtesting/ci/testcases.yaml
+COPY testcases.yaml /usr/lib/python3.9/site-packages/xtesting/ci/testcases.yaml
CMD ["run_tests", "-t", "all"]
-FROM alpine:3.13
+FROM alpine:3.14
ARG BRANCH=master
ARG OPENSTACK_TAG=master
sed -i -E /^tempest==+.*$/d /src/requirements/upper-constraints.txt && \
sed -i -E /^six=/d /src/requirements/upper-constraints.txt && \
sed -i -E /^distlib=/d /src/requirements/upper-constraints.txt && \
+ sed -i -E /^packaging=/d /src/requirements/upper-constraints.txt && \
case $(uname -m) in aarch*|arm*) sed -i -E /^PyNaCl=/d /src/requirements/upper-constraints.txt && apk add --no-cache py3-pynacl ;; esac && \
sed -i -E /#egg=functest/d /src/functest/upper-constraints.txt && \
pip3 install --use-deprecated=legacy-resolver --no-cache-dir --src /src -c/src/functest/upper-constraints.txt -c/src/requirements/upper-constraints.txt \
rm -r /src/requirements/.git /src/functest/.git \
/tmp/Switch-to-threading.Thread-for-Rally-tasks.patch \
/tmp/Create-new-server-in-test_create_backup.patch && \
- cp /src/functest/functest/ci/logging.ini /usr/lib/python3.8/site-packages/xtesting/ci/ && \
- cp /src/functest/functest/ci/logging.debug.ini /usr/lib/python3.8/site-packages/xtesting/ci/ && \
+ cp /src/functest/functest/ci/logging.ini /usr/lib/python3.9/site-packages/xtesting/ci/ && \
+ cp /src/functest/functest/ci/logging.debug.ini /usr/lib/python3.9/site-packages/xtesting/ci/ && \
bash -c "mkdir -p /var/lib/xtesting /home/opnfv" && \
ln -s /var/lib/xtesting /home/opnfv/functest && \
bash -c "mkdir -p /home/opnfv/functest{/conf,/data,/images,/results} /home/opnfv/repos/vnfs" && \
git checkout FETCH_HEAD) && \
rm -r /src/odl_test/.git thirdparty-requirements.txt && \
apk del .build-deps
-COPY testcases.yaml /usr/lib/python3.8/site-packages/xtesting/ci/testcases.yaml
+COPY testcases.yaml /usr/lib/python3.9/site-packages/xtesting/ci/testcases.yaml
CMD ["run_tests", "-t", "all"]
FROM opnfv/functest-smoke
-COPY testcases.yaml /usr/lib/python3.8/site-packages/xtesting/ci/testcases.yaml
+COPY testcases.yaml /usr/lib/python3.9/site-packages/xtesting/ci/testcases.yaml
COPY tempest_conf.yaml /src/functest/functest/opnfv_tests/openstack/tempest/custom_tests/tempest_conf.yaml
CMD ["run_tests", "-t", "all"]
COPY compute.txt /home/opnfv/functest/data/refstack/compute.txt
COPY object.txt /home/opnfv/functest/data/refstack/object.txt
COPY platform.txt /home/opnfv/functest/data/refstack/platform.txt
-COPY testcases.yaml /usr/lib/python3.8/site-packages/xtesting/ci/testcases.yaml
+COPY testcases.yaml /usr/lib/python3.9/site-packages/xtesting/ci/testcases.yaml
CMD ["run_tests", "-t", "all"]
/src/vims-test/build-infra/.git /src/opnfv-vnf-vyos-blueprint/.git \
/tmp/clearwater-heat-singlenet-deps.patch && \
apk del .build-deps
-COPY testcases.yaml /usr/lib/python3.8/site-packages/xtesting/ci/testcases.yaml
+COPY testcases.yaml /usr/lib/python3.9/site-packages/xtesting/ci/testcases.yaml
CMD ["run_tests", "-t", "all"]
"""Initialize Cloudify testcase object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "cloudify"
- super(Cloudify, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.cfy_client = None
def prepare(self):
- super(Cloudify, self).prepare()
+ super().prepare()
for port in self.ports:
self.cloud.create_security_group_rule(
self.sec.id, port_range_min=port, port_range_max=port,
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'vmready1'
- super(VmReady1, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.image = None
self.flavor = None
status = testcase.TestCase.EX_RUN_ERROR
try:
assert self.cloud
- assert super(VmReady1, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.image = self.publish_image()
self.flavor = self.create_flavor()
try:
assert self.orig_cloud
assert self.cloud
- super(VmReady1, self).clean()
+ super().clean()
if self.image:
self.cloud.delete_image(self.image.id)
if self.flavor:
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'vmready2'
- super(VmReady2, self).__init__(**kwargs)
+ super().__init__(**kwargs)
try:
assert self.orig_cloud
self.project = tenantnetwork.NewProject(
def clean(self):
try:
- super(VmReady2, self).clean()
+ super().clean()
assert self.project
self.project.clean()
except Exception: # pylint: disable=broad-except
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'singlevm1'
- super(SingleVm1, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.sshvm = None
self.sec = None
self.fip = None
status = testcase.TestCase.EX_RUN_ERROR
try:
assert self.cloud
- assert super(SingleVm1, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.result = 0
self.prepare()
self.cloud.delete_security_group(self.sec.id)
if self.keypair:
self.cloud.delete_keypair(self.keypair.name)
- super(SingleVm1, self).clean()
+ super().clean()
except Exception: # pylint: disable=broad-except
self.__logger.exception("Cannot clean all resources")
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'singlevm2'
- super(SingleVm2, self).__init__(**kwargs)
+ super().__init__(**kwargs)
try:
assert self.orig_cloud
self.project = tenantnetwork.NewProject(
def clean(self):
try:
- super(SingleVm2, self).clean()
+ super().clean()
assert self.project
self.project.clean()
except Exception: # pylint: disable=broad-except
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'tenantnetwork1'
- super(TenantNetwork1, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.dir_results = os.path.join(getattr(config.CONF, 'dir_results'))
self.res_dir = os.path.join(self.dir_results, self.case_name)
self.output_log_name = 'functest.log'
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'tenantnetwork2'
- super(TenantNetwork2, self).__init__(**kwargs)
+ super().__init__(**kwargs)
try:
assert self.cloud
self.project = NewProject(
def clean(self):
try:
- super(TenantNetwork2, self).clean()
+ super().clean()
assert self.project
self.project.clean()
except Exception: # pylint: disable=broad-except
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'connection_check'
- super(ConnectionCheck, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.output_log_name = 'functest.log'
self.output_debug_log_name = 'functest.debug.log'
try:
# pylint: disable=missing-docstring
-import logging
-
from six.moves import configparser
from functest.opnfv_tests.openstack.tempest import tempest
class Barbican(tempest.TempestCommon):
- __logger = logging.getLogger(__name__)
-
def configure(self, **kwargs):
- super(Barbican, self).configure(**kwargs)
+ super().configure(**kwargs)
rconfig = configparser.RawConfigParser()
rconfig.read(self.conf_file)
if not rconfig.has_section('auth'):
"""Initialize testcase."""
if "case_name" not in kwargs:
kwargs["case_name"] = "cinder_test"
- super(CinderCheck, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.logger = logging.getLogger(__name__)
self.vm2 = None
self.fip2 = None
return self._write_data() or self._read_data()
def prepare(self):
- super(CinderCheck, self).prepare()
+ super().prepare()
self.vm2 = self.boot_vm(
'{}-vm2_{}'.format(self.case_name, self.guid),
key_name=self.keypair.id,
self.cloud.delete_floating_ip(self.fip2.id)
if self.volume:
self.cloud.delete_volume(self.volume.id)
- super(CinderCheck, self).clean()
+ super().clean()
# pylint: disable=missing-docstring
-import logging
-
from six.moves import configparser
from functest.opnfv_tests.openstack.tempest import tempest
class Patrole(tempest.TempestCommon):
- __logger = logging.getLogger(__name__)
-
def configure(self, **kwargs):
- super(Patrole, self).configure(**kwargs)
+ super().configure(**kwargs)
rconfig = configparser.RawConfigParser()
rconfig.read(self.conf_file)
if not rconfig.has_section('rbac'):
def __init__(self, **kwargs):
"""Initialize RallyBase object."""
- super(RallyBase, self).__init__(**kwargs)
+ super().__init__(**kwargs)
assert self.orig_cloud
assert self.project
if self.orig_cloud.get_role("admin"):
cmd = ("rally deployment list | awk '/" +
getattr(config.CONF, 'rally_deployment_name') +
"/ {print $2}'")
- proc = subprocess.Popen(cmd, shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- deployment_uuid = proc.stdout.readline().rstrip()
+ with subprocess.Popen(
+ cmd, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT) as proc:
+ deployment_uuid = proc.stdout.readline().rstrip()
return deployment_uuid.decode("utf-8")
@staticmethod
def apply_blacklist(self, case_file_name, result_file_name):
"""Apply blacklist."""
LOGGER.debug("Applying blacklist...")
- cases_file = open(case_file_name, 'r')
- result_file = open(result_file_name, 'w')
-
- black_tests = list(set(self.excl_func() +
- self.excl_scenario()))
-
- if black_tests:
- LOGGER.debug("Blacklisted tests: %s", str(black_tests))
-
- include = True
- for cases_line in cases_file:
- if include:
- for black_tests_line in black_tests:
- if re.search(black_tests_line,
- cases_line.strip().rstrip(':')):
- include = False
- break
+ with open(case_file_name, 'r') as cases_file, open(
+ result_file_name, 'w') as result_file:
+ black_tests = list(set(self.excl_func() + self.excl_scenario()))
+ if black_tests:
+ LOGGER.debug("Blacklisted tests: %s", str(black_tests))
+
+ include = True
+ for cases_line in cases_file:
+ if include:
+ for black_tests_line in black_tests:
+ if re.search(black_tests_line,
+ cases_line.strip().rstrip(':')):
+ include = False
+ break
+ else:
+ result_file.write(str(cases_line))
else:
- result_file.write(str(cases_line))
- else:
- if cases_line.isspace():
- include = True
-
- cases_file.close()
- result_file.close()
+ if cases_line.isspace():
+ include = True
@staticmethod
def file_is_empty(file_name):
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
LOGGER.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
- json_results = open(report_json_dir).read()
+ with open(report_json_dir) as json_file:
+ json_results = json_file.read()
self._append_summary(json_results, test_name)
# parse JSON operation result
self.clean_rally_logs()
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(RallyBase, self).clean()
+ super().clean()
def is_successful(self):
"""The overall result of the test."""
if item['task_status'] is False:
return testcase.TestCase.EX_TESTCASE_FAILED
- return super(RallyBase, self).is_successful()
+ return super().is_successful()
@staticmethod
def update_rally_logs(res_dir, rally_conf='/etc/rally/rally.conf'):
"""Run testcase."""
self.start_time = time.time()
try:
- assert super(RallyBase, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.update_rally_logs(self.res_dir)
self.create_rally_deployment(environ=self.project.get_environ())
"""Initialize RallySanity object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "rally_sanity"
- super(RallySanity, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.smoke = True
self.scenario_dir = os.path.join(self.rally_scenario_dir, 'sanity')
"""Initialize RallyFull object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "rally_full"
- super(RallyFull, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.smoke = False
self.scenario_dir = os.path.join(self.rally_scenario_dir, 'full')
"""Initialize RallyJobs object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "rally_jobs"
- super(RallyJobs, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.task_file = os.path.join(self.rally_dir, 'rally_jobs.yaml')
self.task_yaml = None
def prepare_run(self, **kwargs):
"""Create resources needed by test scenarios."""
- super(RallyJobs, self).prepare_run(**kwargs)
+ super().prepare_run(**kwargs)
with open(os.path.join(self.rally_dir,
'rally_jobs.yaml'), 'r') as task_file:
self.task_yaml = yaml.safe_load(task_file)
check_console_loop = 12
def __init__(self, **kwargs):
- super(Shaker, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.role = None
def check_requirements(self):
self.project.clean()
def prepare(self):
- super(Shaker, self).prepare()
+ super().prepare()
self.cloud.create_security_group_rule(
self.sec.id, port_range_min=self.port, port_range_max=self.port,
protocol='tcp', direction='ingress')
return stdout.channel.recv_exit_status()
def clean(self):
- super(Shaker, self).clean()
+ super().clean()
if self.role:
self.orig_cloud.delete_role(self.role.id)
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'tempest'
- super(TempestCommon, self).__init__(**kwargs)
+ super().__init__(**kwargs)
assert self.orig_cloud
assert self.cloud
assert self.project
}
cmd = ["rally", "verify", "show", "--uuid", verif_id]
LOGGER.info("Showing result for a verification: '%s'.", cmd)
- proc = subprocess.Popen(cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- for line in proc.stdout:
- LOGGER.info(line.decode("utf-8").rstrip())
- new_line = line.decode("utf-8").replace(' ', '').split('|')
- if 'Tests' in new_line:
- break
- if 'Testscount' in new_line:
- result['num_tests'] = int(new_line[2])
- elif 'Success' in new_line:
- result['num_success'] = int(new_line[2])
- elif 'Skipped' in new_line:
- result['num_skipped'] = int(new_line[2])
- elif 'Failures' in new_line:
- result['num_failures'] = int(new_line[2])
+ with subprocess.Popen(
+ cmd, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT) as proc:
+ for line in proc.stdout:
+ LOGGER.info(line.decode("utf-8").rstrip())
+ new_line = line.decode("utf-8").replace(' ', '').split('|')
+ if 'Tests' in new_line:
+ break
+ if 'Testscount' in new_line:
+ result['num_tests'] = int(new_line[2])
+ elif 'Success' in new_line:
+ result['num_success'] = int(new_line[2])
+ elif 'Skipped' in new_line:
+ result['num_skipped'] = int(new_line[2])
+ elif 'Failures' in new_line:
+ result['num_failures'] = int(new_line[2])
return result
@staticmethod
cmd = ("rally verify list-verifiers | awk '/" +
getattr(config.CONF, 'tempest_verifier_name') +
"/ {print $2}'")
- proc = subprocess.Popen(cmd, shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL)
- verifier_uuid = proc.stdout.readline().rstrip()
+ with subprocess.Popen(
+ cmd, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL) as proc:
+ verifier_uuid = proc.stdout.readline().rstrip()
return verifier_uuid.decode("utf-8")
@staticmethod
os.remove(self.raw_list)
os.rename(self.list, self.raw_list)
cases_file = self.read_file(self.raw_list)
- result_file = open(self.list, 'w')
- black_tests = []
- try:
- deploy_scenario = env.get('DEPLOY_SCENARIO')
- if bool(deploy_scenario):
- # if DEPLOY_SCENARIO is set we read the file
- black_list_file = open(black_list)
- black_list_yaml = yaml.safe_load(black_list_file)
- black_list_file.close()
- for item in black_list_yaml:
- scenarios = item['scenarios']
- in_it = rally.RallyBase.in_iterable_re
- if in_it(deploy_scenario, scenarios):
- tests = item['tests']
- black_tests.extend(tests)
- except Exception: # pylint: disable=broad-except
+ with open(self.list, 'w') as result_file:
black_tests = []
- LOGGER.debug("Tempest blacklist file does not exist.")
+ try:
+ deploy_scenario = env.get('DEPLOY_SCENARIO')
+ if bool(deploy_scenario):
+ # if DEPLOY_SCENARIO is set we read the file
+ with open(black_list) as black_list_file:
+ black_list_yaml = yaml.safe_load(black_list_file)
+ black_list_file.close()
+ for item in black_list_yaml:
+ scenarios = item['scenarios']
+ in_it = rally.RallyBase.in_iterable_re
+ if in_it(deploy_scenario, scenarios):
+ tests = item['tests']
+ black_tests.extend(tests)
+ except Exception: # pylint: disable=broad-except
+ black_tests = []
+ LOGGER.debug("Tempest blacklist file does not exist.")
- for cases_line in cases_file:
- for black_tests_line in black_tests:
- if re.search(black_tests_line, cases_line):
- break
- else:
- result_file.write(str(cases_line) + '\n')
- result_file.close()
+ for cases_line in cases_file:
+ for black_tests_line in black_tests:
+ if re.search(black_tests_line, cases_line):
+ break
+ else:
+ result_file.write(str(cases_line) + '\n')
def run_verifier_tests(self, **kwargs):
"""Execute tempest test cases."""
cmd.extend(kwargs.get('option', []))
LOGGER.info("Starting Tempest test suite: '%s'.", cmd)
- f_stdout = open(
- os.path.join(self.res_dir, "tempest.log"), 'w+')
-
- proc = subprocess.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- bufsize=1)
-
- with proc.stdout:
- for line in iter(proc.stdout.readline, b''):
- if re.search(r"\} tempest\.", line.decode("utf-8")):
- LOGGER.info(line.rstrip())
- elif re.search(r'(?=\(UUID=(.*)\))', line.decode("utf-8")):
- self.verification_id = re.search(
- r'(?=\(UUID=(.*)\))', line.decode("utf-8")).group(1)
- f_stdout.write(line.decode("utf-8"))
- proc.wait()
- f_stdout.close()
+ with open(
+ os.path.join(self.res_dir, "tempest.log"), 'w+') as f_stdout:
+
+ with subprocess.Popen(
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ bufsize=1) as proc:
+
+ with proc.stdout:
+ for line in iter(proc.stdout.readline, b''):
+ if re.search(r"\} tempest\.", line.decode("utf-8")):
+ LOGGER.info(line.rstrip())
+ elif re.search(r'(?=\(UUID=(.*)\))',
+ line.decode("utf-8")):
+ self.verification_id = re.search(
+ r'(?=\(UUID=(.*)\))',
+ line.decode("utf-8")).group(1)
+ f_stdout.write(line.decode("utf-8"))
+ proc.wait()
if self.verification_id is None:
raise Exception('Verification UUID not found')
def run(self, **kwargs):
self.start_time = time.time()
try:
- assert super(TempestCommon, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
if not os.path.exists(self.res_dir):
os.makedirs(self.res_dir)
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(TempestCommon, self).clean()
+ super().clean()
def is_successful(self):
"""The overall result of the test."""
if self.tests_count and (
self.details.get("tests_number", 0) != self.tests_count):
return testcase.TestCase.EX_TESTCASE_FAILED
- return super(TempestCommon, self).is_successful()
+ return super().is_successful()
class TempestHeat(TempestCommon):
flavor_alt_disk = 4
def __init__(self, **kwargs):
- super(TempestHeat, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.user2 = self.orig_cloud.create_user(
name='{}-user2_{}'.format(self.case_name, self.project.guid),
password=self.project.password,
def configure(self, **kwargs):
assert self.user2
- super(TempestHeat, self).configure(**kwargs)
+ super().configure(**kwargs)
rconfig = configparser.RawConfigParser()
rconfig.read(self.conf_file)
if not rconfig.has_section('heat_plugin'):
"""
Cleanup all OpenStack objects. Should be called on completion.
"""
- super(TempestHeat, self).clean()
+ super().clean()
if self.user2:
self.orig_cloud.delete_user(self.user2.id)
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'vmtp'
- super(Vmtp, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.config = "{}/vmtp.conf".format(self.res_dir)
(_, self.privkey_filename) = tempfile.mkstemp()
(_, self.pubkey_filename) = tempfile.mkstemp()
status = testcase.TestCase.EX_RUN_ERROR
try:
assert self.cloud
- assert super(Vmtp, self).run(**kwargs) == self.EX_OK
+ assert super().run(**kwargs) == self.EX_OK
status = testcase.TestCase.EX_RUN_ERROR
if self.orig_cloud.get_role("admin"):
role_name = "admin"
def clean(self):
try:
assert self.cloud
- super(Vmtp, self).clean()
+ super().clean()
os.remove(self.privkey_filename)
os.remove(self.pubkey_filename)
self.cloud.delete_network("pns-internal-net_{}".format(self.guid))
"""Initialize testcase."""
if "case_name" not in kwargs:
kwargs["case_name"] = "vping_ssh"
- super(VPingSSH, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.vm2 = None
def prepare(self):
- super(VPingSSH, self).prepare()
+ super().prepare()
self.vm2 = self.boot_vm(
'{}-vm2_{}'.format(self.case_name, self.guid),
security_groups=[self.sec.id])
self.cloud.delete_server(
self.vm2, wait=True,
timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
- super(VPingSSH, self).clean()
+ super().clean()
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "vping_userdata"
- super(VPingUserdata, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.logger = logging.getLogger(__name__)
self.vm1 = None
self.vm2 = None
"""
try:
assert self.cloud
- assert super(VPingUserdata, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.result = 0
self.vm1 = self.boot_vm()
self.cloud.delete_server(
self.vm2, wait=True,
timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
- super(VPingUserdata, self).clean()
+ super().clean()
__logger = logging.getLogger(__name__)
def __init__(self, **kwargs):
- super(ODLTests, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.res_dir = os.path.join(
getattr(config.CONF, 'dir_results'), 'odl')
self.xml_file = os.path.join(self.res_dir, 'output.xml')
else:
if not self.set_robotframework_vars(odlusername, odlpassword):
return self.EX_RUN_ERROR
- return super(ODLTests, self).run(variable=variable, suites=suites)
+ return super().run(variable=variable, suites=suites)
def run(self, **kwargs):
"""Run suites in OPNFV environment
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "juju_epc"
- super(JujuEpc, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
self.case_dir = pkg_resources.resource_filename(
try:
self.config = getattr(
config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.config_file = os.path.join(self.case_dir, self.config)
self.orchestrator = dict(
requirements=functest_utils.get_parameter_from_yaml(
return not stdout.channel.recv_exit_status()
def publish_image_alt(self, name=None):
- image_alt = super(JujuEpc, self).publish_image_alt(name)
+ image_alt = super().publish_image_alt(name)
region_name = self.cloud.region_name if self.cloud.region_name else (
'RegionOne')
(_, stdout, stderr) = self.ssh.exec_command(
except OSError as ex:
if ex.errno != errno.EEXIST:
self.__logger.exception("Cannot create %s", self.res_dir)
- raise Exception
+ raise Exception from ex
self.__logger.info("ENV:\n%s", env.string())
try:
assert self._install_juju()
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(JujuEpc, self).clean()
+ super().clean()
def sig_test_format(sig_test):
"""Initialize CloudifyIms testcase object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "cloudify_ims"
- super(CloudifyIms, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/ims')
network, security group, fip, VM creation
"""
- assert super(CloudifyIms, self).execute() == 0
+ assert super().execute() == 0
start_time = time.time()
self.orig_cloud.set_network_quotas(
self.project.project.name,
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(CloudifyIms, self).clean()
+ super().clean()
"""Initialize HeatIms testcase object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "heat_ims"
- super(HeatIms, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/ims')
status = testcase.TestCase.EX_RUN_ERROR
try:
assert self.cloud
- assert super(HeatIms, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.result = 0
if not self.execute():
pass
except Exception: # pylint: disable=broad-except
self.__logger.exception("Cannot clean stack ressources")
- super(HeatIms, self).clean()
+ super().clean()
if self.role:
self.orig_cloud.delete_role(self.role.id)
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "vyos_vrouter"
- super(CloudifyVrouter, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/router')
network, security group, fip, VM creation
"""
# network creation
- super(CloudifyVrouter, self).execute()
+ super().execute()
start_time = time.time()
self.put_private_key()
self.upload_cfy_plugins(self.cop_yaml, self.cop_wgn)
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(CloudifyVrouter, self).clean()
+ super().clean()
credentials = util_info["credentials"]
self.vnf_ctrl = VnfController(util_info)
- test_cmd_map_file = open(
- os.path.join(
- self.util.vnf_data_dir, self.util.command_template_dir,
- self.util.test_cmd_map_yaml_file),
- 'r')
- self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
- test_cmd_map_file.close()
+ with open(
+ os.path.join(
+ self.util.vnf_data_dir, self.util.command_template_dir,
+ self.util.test_cmd_map_yaml_file),
+ 'r') as test_cmd_map_file:
+ self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
self.util.set_credentials(credentials["cloud"])
def write_result_data(self, result_data):
test_result = []
if not os.path.isfile(self.test_result_json_file):
- file_fd = open(self.test_result_json_file, "w")
- file_fd.close()
+ with open(self.test_result_json_file, "w") as file_fd:
+ pass
else:
- file_fd = open(self.test_result_json_file, "r")
- test_result = json.load(file_fd)
- file_fd.close()
+ with open(self.test_result_json_file, "r") as file_fd:
+ test_result = json.load(file_fd)
test_result.append(result_data)
- file_fd = open(self.test_result_json_file, "w")
- json.dump(test_result, file_fd)
- file_fd.close()
+ with open(self.test_result_json_file, "w") as file_fd:
+ json.dump(test_result, file_fd)
def output_test_result_json(self):
if os.path.isfile(self.test_result_json_file):
- file_fd = open(self.test_result_json_file, "r")
- test_result = json.load(file_fd)
- file_fd.close()
+ with open(self.test_result_json_file, "r") as file_fd:
+ test_result = json.load(file_fd)
output_json_data = json.dumps(test_result,
sort_keys=True,
indent=4)
@staticmethod
def get_test_scenario(file_path):
- test_scenario_file = open(file_path,
- 'r')
- test_scenario_yaml = yaml.safe_load(test_scenario_file)
- test_scenario_file.close()
+ with open(file_path, 'r') as test_scenario_file:
+ test_scenario_yaml = yaml.safe_load(test_scenario_file)
return test_scenario_yaml["test_scenario_list"]
def command_create_and_execute(self, ssh, test_cmd_file_path,
cmd_input_param, prompt_file_path):
- prompt_file = open(prompt_file_path,
- 'r')
- prompt = yaml.safe_load(prompt_file)
- prompt_file.close()
+ with open(prompt_file_path, 'r') as prompt_file:
+ prompt = yaml.safe_load(prompt_file)
config_mode_prompt = prompt["config_mode"]
commands = self.command_gen_from_template(test_cmd_file_path,
def config_vnf(self, source_vnf, destination_vnf, test_cmd_file_path,
parameter_file_path, prompt_file_path):
# pylint: disable=too-many-arguments
- parameter_file = open(parameter_file_path,
- 'r')
- cmd_input_param = yaml.safe_load(parameter_file)
- parameter_file.close()
+ with open(parameter_file_path, 'r') as parameter_file:
+ cmd_input_param = yaml.safe_load(parameter_file)
cmd_input_param["macaddress"] = source_vnf["data_plane_network_mac"]
cmd_input_param["source_ip"] = source_vnf["data_plane_network_ip"]
res_dict_data_list = []
- parameter_file = open(parameter_file_path,
- 'r')
- cmd_input_param = yaml.safe_load(parameter_file)
- parameter_file.close()
+ with open(parameter_file_path, 'r') as parameter_file:
+ cmd_input_param = yaml.safe_load(parameter_file)
cmd_input_param["source_ip"] = target_vnf["data_plane_network_ip"]
cmd_input_param["destination_ip"] = reference_vnf[
"data_plane_network_ip"]
- prompt_file = open(prompt_file_path,
- 'r')
- prompt = yaml.safe_load(prompt_file)
- prompt_file.close()
+ with open(prompt_file_path, 'r') as prompt_file:
+ prompt = yaml.safe_load(prompt_file)
terminal_mode_prompt = prompt["terminal_mode"]
ssh = SshClient(target_vnf["floating_ip"],
def setUp(self):
self.parser = odl.ODLParser()
- super(ODLArgParserTesting, self).setUp()
+ super().setUp()
def test_default(self):
self.assertEqual(self.parser.parse_args(), self.defaultargs)
with mock.patch('functest.opnfv_tests.openstack.tempest.'
'tempest.subprocess.Popen') as mock_popen:
- mock_stdout = mock.Mock()
- attrs = {'stdout.readline.return_value': b'test_deploy_id'}
- mock_stdout.configure_mock(**attrs)
- mock_popen.return_value = mock_stdout
+ attrs = {'return_value.__enter__.return_value.'
+ 'stdout.readline.return_value': b'test_deploy_id'}
+ mock_popen.configure_mock(**attrs)
self.assertEqual(self.tempestcommon.get_verifier_id(),
'test_deploy_id')
setattr(config.CONF, 'tempest_verifier_name', 'test_deploy_name')
with mock.patch('functest.opnfv_tests.openstack.tempest.'
'tempest.subprocess.Popen') as mock_popen:
- mock_stdout = mock.Mock()
- attrs = {'stdout.readline.return_value': b'test_deploy_id'}
- mock_stdout.configure_mock(**attrs)
- mock_popen.return_value = mock_stdout
+ attrs = {'return_value.__enter__.return_value.'
+ 'stdout.readline.return_value': b'test_deploy_id'}
+ mock_popen.configure_mock(**attrs)
self.assertEqual(rally.RallyBase.get_verifier_deployment_id(),
'test_deploy_id')
@mock.patch('six.moves.builtins.open')
def test_generate_keys2(self, *args):
- # pylint: disable=bad-continuation
with mock.patch.object(
self.testcase.cloud, "create_keypair",
side_effect=shade.OpenStackCloudException(None)) as mock_obj, \
mock.mock_open()) as mopen:
stream = six.BytesIO()
stream.write(self.cmd_readline().encode("utf-8"))
- mock_obj2 = mock.Mock()
- attrs = {'stdout': stream, 'wait.return_value': 1}
- mock_obj2.configure_mock(**attrs)
- mock_subproc_open.return_value = mock_obj2
+ attrs = {
+ 'return_value.__enter__.return_value.stdout': stream,
+ 'return_value.__enter__.return_value.wait.return_value': 1}
+ mock_subproc_open.configure_mock(**attrs)
resp = functest_utils.execute_command(
self.cmd, info=True, error_msg=self.error_msg, verbose=True,
output_file=self.output_file)
mock.mock_open()) as mopen:
stream = six.BytesIO()
stream.write(self.cmd_readline().encode("utf-8"))
- mock_obj2 = mock.Mock()
- attrs = {'stdout': stream, 'wait.return_value': 0}
- mock_obj2.configure_mock(**attrs)
- mock_subproc_open.return_value = mock_obj2
+ attrs = {
+ 'return_value.__enter__.return_value.stdout': stream,
+ 'return_value.__enter__.return_value.wait.return_value': 0}
+ mock_subproc_open.configure_mock(**attrs)
resp = functest_utils.execute_command(
self.cmd, info=True, error_msg=self.error_msg, verbose=True,
output_file=self.output_file)
as mock_subproc_open:
stream = six.BytesIO()
stream.write(self.cmd_readline().encode("utf-8"))
- mock_obj2 = mock.Mock()
- attrs = {'stdout': stream, 'wait.return_value': 0}
- mock_obj2.configure_mock(**attrs)
- mock_subproc_open.return_value = mock_obj2
+ attrs = {
+ 'return_value.__enter__.return_value.stdout': stream,
+ 'return_value.__enter__.return_value.wait.return_value': 0}
+ mock_subproc_open.configure_mock(**attrs)
resp = functest_utils.execute_command(
self.cmd, info=False, error_msg="", verbose=False,
output_file=None)
# pylint: disable=unused-argument
with mock.patch('functest.utils.functest_utils.subprocess.Popen') \
as mock_subproc_open:
+ attrs = {}
stream = six.BytesIO()
stream.write(self.cmd_readline().encode("utf-8"))
- mock_obj2 = mock.Mock()
- attrs = {'stdout': stream, 'wait.return_value': 1}
- mock_obj2.configure_mock(**attrs)
- mock_subproc_open.return_value = mock_obj2
+ attrs = {
+ 'return_value.__enter__.return_value.stdout': stream,
+ 'return_value.__enter__.return_value.wait.return_value': 1}
+ mock_subproc_open.configure_mock(**attrs)
resp = functest_utils.execute_command(
self.cmd, info=False, error_msg="", verbose=False,
output_file=None)
class Config():
def __init__(self):
try:
- # pylint: disable=bad-continuation
with open(pkg_resources.resource_filename(
'functest', 'ci/config_functest.yaml')) as yfile:
self.functest_yaml = yaml.safe_load(yfile)
except Exception as error:
- raise Exception('Parse config failed: {}'.format(str(error)))
+ raise Exception(
+ 'Parse config failed: {}'.format(str(error))) from error
@staticmethod
def _merge_dicts(dict1, dict2):
try:
self._parse(None, self.functest_yaml)
except Exception as error:
- raise Exception('Parse config failed: {}'.format(str(error)))
+ raise Exception(
+ 'Parse config failed: {}'.format(str(error))) from error
CONF = Config()
LOGGER.info(msg_exec)
else:
LOGGER.debug(msg_exec)
- popen = subprocess.Popen(
- cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- if output_file:
- ofd = open(output_file, "w")
- for line in iter(popen.stdout.readline, b''):
+ with subprocess.Popen(
+ cmd, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT) as popen:
if output_file:
- ofd.write(line.decode("utf-8"))
- else:
- line = line.decode("utf-8").replace('\n', '')
- print(line)
- sys.stdout.flush()
- if output_file:
- ofd.close()
- popen.stdout.close()
- returncode = popen.wait()
+ with open(output_file, "w") as ofd:
+ for line in iter(popen.stdout.readline, b''):
+ if output_file:
+ ofd.write(line.decode("utf-8"))
+ else:
+ line = line.decode("utf-8").replace('\n', '')
+ print(line)
+ sys.stdout.flush()
+ returncode = popen.wait()
if returncode != 0:
if verbose:
LOGGER.error(error_msg)
[tox]
-envlist = docs,pep8,pylint,yamllint,ansiblelint,bashate,bandit,py38,cover,perm
+envlist = docs,pep8,pylint,yamllint,ansiblelint,bashate,bandit,py39,cover,perm
[testenv]
pip_version = pip==20.2.4
functest/tests/unit
[testenv:docs]
-basepython = python3.8
+basepython = python3.9
commands =
doc8 \
--ignore-path api/build \
sphinx-build -W -b spelling -Dextensions=sphinxcontrib.spelling docs docs/build/spellcheck
[testenv:pep8]
-basepython = python3.8
+basepython = python3.9
commands = flake8
[testenv:pylint]
-basepython = python3.8
+basepython = python3.9
commands =
pylint \
- --ignore-imports=y --min-similarity-lines=10 \
+ --ignore-imports=y --min-similarity-lines=15 \
--disable=locally-disabled functest
[testenv:yamllint]
-basepython = python3.8
+basepython = python3.9
files =
.travis.yml
docker
yamllint -s {[testenv:yamllint]files}
[testenv:ansiblelint]
-basepython = python3.8
+basepython = python3.9
commands =
ansible-lint -x303 ansible/site.yml
commands = nosetests functest/tests/unit
[testenv:bashate]
-basepython = python3.8
+basepython = python3.9
files =
functest/opnfv_tests/openstack/cinder/write_data.sh
functest/opnfv_tests/openstack/cinder/read_data.sh
[testenv:bandit]
-basepython = python3.8
+basepython = python3.9
commands = bandit -r functest -x tests -n 5 -ll -s B601,B602
[testenv:cover]
-basepython = python3.8
+basepython = python3.9
dirs =
functest/tests/unit/odl
functest/tests/unit/openstack/vping
--cover-min-percentage 100 {[testenv:cover]dirs}
[testenv:perm]
-basepython = python3.8
+basepython = python3.9
whitelist_externals = bash
path=. -not -path './.tox/*' -not -path './.git/*' -not -path './docs/com/pres/reveal.js/*'
commands =