It mostly add encoding in open calls and leverage f-strings.
Change-Id: Ifead18fc724a452c1067dcf91dc577032edc9c59
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
     - stage: run unit tests
       script: >
         tox -e \
-          docs,pep8,pylint,yamllint,ansiblelint,bashate,bandit,py38,cover
+          docs,pep8,pylint,yamllint,bashate,bandit,py38,cover
     - stage: build functest-core images
       script: sudo -E bash build.sh
       env:
 
 RUN apk -U upgrade && \
     apk --no-cache add --update \
         python3 py3-wheel libffi openssl libjpeg-turbo py3-pip bash \
-        grep sed wget ca-certificates git openssh-client qemu-img iputils coreutils mailcap libstdc++ && \
+        grep sed wget ca-certificates git openssh-client qemu-img iputils coreutils mailcap libstdc++ \
+        libxml2 libxslt && \
     apk --no-cache add --virtual .build-deps --update \
         python3-dev build-base linux-headers libffi-dev \
-        openssl-dev libjpeg-turbo-dev rust cargo && \
+        openssl-dev libjpeg-turbo-dev rust cargo \
+        libxml2-dev libxslt-dev && \
     apk --no-cache add --update py3-distlib\>=0.3.1 \
         --repository=http://dl-cdn.alpinelinux.org/alpine/edge/main && \
     git init /src/requirements && \
 
             "sudo wget https://get.docker.com/ -O script.sh && "
             "sudo chmod +x script.sh && "
             "sudo ./script.sh && "
-            "sudo docker load -i ~/{} && "
+            "sudo docker load -i "
+            f"~/{os.path.basename(self.cloudify_archive)} && "
             "sudo docker run --name cfy_manager_local -d "
             "--restart unless-stopped -v /sys/fs/cgroup:/sys/fs/cgroup:ro "
             "--tmpfs /run --tmpfs /run/lock --security-opt seccomp:unconfined "
-            "--cap-add SYS_ADMIN --network=host {}".format(
-                os.path.basename(self.cloudify_archive),
-                self.cloudify_container))
+            f"--cap-add SYS_ADMIN --network=host {self.cloudify_container}")
         self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
         self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
         self.cfy_client = CloudifyClient(
         """Upload Cloudify plugins"""
         (_, stdout, stderr) = self.ssh.exec_command(
             "sudo docker exec cfy_manager_local "
-            "cfy plugins upload -y {} {} && "
-            "sudo docker exec cfy_manager_local cfy status".format(yaml, wgn))
+            f"cfy plugins upload -y {yaml} {wgn} && "
+            "sudo docker exec cfy_manager_local cfy status")
         self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
         self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
 
         if timeout is not None:
             if time.time() > deadline:
                 raise RuntimeError(
-                    'execution of operation {0} for deployment {1} '
-                    'timed out'.format(execution.workflow_id,
-                                       execution.deployment_id))
+                    'execution of operation {execution.workflow_id} for '
+                    'deployment {execution.deployment_id} timed out')
             # update the remaining timeout
             timeout = deadline - time.time()
 
             return execution
     raise RuntimeError('Failed to get create_deployment_environment '
                        'workflow execution.'
-                       'Available executions: {0}'.format(executions))
+                       f'Available executions: {executions}')
 
                 functest_utils.convert_ini_to_dict(
                     env.get('IMAGE_PROPERTIES')))
         extra_properties.update(
-            getattr(config.CONF, '{}_extra_properties'.format(
-                self.case_name), {}))
+            getattr(config.CONF, f'{self.case_name}_extra_properties', {}))
         image = self.cloud.create_image(
-            name if name else '{}-img_{}'.format(self.case_name, self.guid),
+            name if name else f'{self.case_name}-img_{self.guid}',
             filename=getattr(
-                config.CONF, '{}_image'.format(self.case_name),
+                config.CONF, f'{self.case_name}_image',
                 self.filename),
             meta=extra_properties,
             disk_format=getattr(
-                config.CONF, '{}_image_format'.format(self.case_name),
+                config.CONF, f'{self.case_name}_image_format',
                 self.image_format),
             visibility=getattr(
-                config.CONF, '{}_visibility'.format(self.case_name),
+                config.CONF, f'{self.case_name}_visibility',
                 self.visibility),
             wait=True)
         self.__logger.debug("image: %s", image)
                 functest_utils.convert_ini_to_dict(
                     env.get('IMAGE_PROPERTIES')))
         extra_alt_properties.update(
-            getattr(config.CONF, '{}_extra_alt_properties'.format(
-                self.case_name), {}))
+            getattr(config.CONF, f'{self.case_name}_extra_alt_properties', {}))
         image = self.cloud.create_image(
-            name if name else '{}-img_alt_{}'.format(
-                self.case_name, self.guid),
+            name if name else f'{self.case_name}-img_alt_{self.guid}',
             filename=getattr(
-                config.CONF, '{}_image_alt'.format(self.case_name),
+                config.CONF, f'{self.case_name}_image_alt',
                 self.filename_alt),
             meta=extra_alt_properties,
             disk_format=getattr(
-                config.CONF, '{}_image_alt_format'.format(self.case_name),
+                config.CONF, f'{self.case_name}_image_alt_format',
                 self.image_format),
             visibility=getattr(
-                config.CONF, '{}_visibility'.format(self.case_name),
+                config.CONF, f'{self.case_name}_visibility',
                 self.visibility),
             wait=True)
         self.__logger.debug("image: %s", image)
         """
         assert self.orig_cloud
         flavor = self.orig_cloud.create_flavor(
-            name if name else '{}-flavor_{}'.format(self.case_name, self.guid),
-            getattr(config.CONF, '{}_flavor_ram'.format(self.case_name),
+            name if name else f'{self.case_name}-flavor_{self.guid}',
+            getattr(config.CONF, f'{self.case_name}_flavor_ram',
                     self.flavor_ram),
-            getattr(config.CONF, '{}_flavor_vcpus'.format(self.case_name),
+            getattr(config.CONF, f'{self.case_name}_flavor_vcpus',
                     self.flavor_vcpus),
-            getattr(config.CONF, '{}_flavor_disk'.format(self.case_name),
+            getattr(config.CONF, f'{self.case_name}_flavor_disk',
                     self.flavor_disk))
         self.__logger.debug("flavor: %s", flavor)
         flavor_extra_specs = self.flavor_extra_specs.copy()
                     env.get('FLAVOR_EXTRA_SPECS')))
         flavor_extra_specs.update(
             getattr(config.CONF,
-                    '{}_flavor_extra_specs'.format(self.case_name), {}))
+                    f'{self.case_name}_flavor_extra_specs', {}))
         self.orig_cloud.set_flavor_specs(flavor.id, flavor_extra_specs)
         return flavor
 
         """
         assert self.orig_cloud
         flavor = self.orig_cloud.create_flavor(
-            name if name else '{}-flavor_alt_{}'.format(
-                self.case_name, self.guid),
-            getattr(config.CONF, '{}_flavor_alt_ram'.format(self.case_name),
+            name if name else f'{self.case_name}-flavor_alt_{self.guid}',
+            getattr(config.CONF, f'{self.case_name}_flavor_alt_ram',
                     self.flavor_alt_ram),
-            getattr(config.CONF, '{}_flavor_alt_vcpus'.format(self.case_name),
+            getattr(config.CONF, f'{self.case_name}_flavor_alt_vcpus',
                     self.flavor_alt_vcpus),
-            getattr(config.CONF, '{}_flavor_alt_disk'.format(self.case_name),
+            getattr(config.CONF, f'{self.case_name}_flavor_alt_disk',
                     self.flavor_alt_disk))
         self.__logger.debug("flavor: %s", flavor)
         flavor_alt_extra_specs = self.flavor_alt_extra_specs.copy()
                     env.get('FLAVOR_EXTRA_SPECS')))
         flavor_alt_extra_specs.update(
             getattr(config.CONF,
-                    '{}_flavor_alt_extra_specs'.format(self.case_name), {}))
+                    f'{self.case_name}_flavor_alt_extra_specs', {}))
         self.orig_cloud.set_flavor_specs(
             flavor.id, flavor_alt_extra_specs)
         return flavor
         """
         assert self.cloud
         vm1 = self.cloud.create_server(
-            name if name else '{}-vm_{}'.format(self.case_name, self.guid),
+            name if name else f'{self.case_name}-vm_{self.guid}',
             image=self.image.id, flavor=self.flavor.id,
             auto_ip=False,
             network=self.network.id if self.network else env.get(
         """
         assert self.cloud
         self.keypair = self.cloud.create_keypair(
-            '{}-kp_{}'.format(self.case_name, self.guid))
+            f'{self.case_name}-kp_{self.guid}')
         self.__logger.debug("keypair: %s", self.keypair)
         self.__logger.debug("private_key:\n%s", self.keypair.private_key)
-        with open(self.key_filename, 'w') as private_key_file:
+        with open(
+                self.key_filename, 'w', encoding='utf-8') as private_key_file:
             private_key_file.write(self.keypair.private_key)
         self.sec = self.cloud.create_security_group(
-            '{}-sg_{}'.format(self.case_name, self.guid),
-            'created by OPNFV Functest ({})'.format(self.case_name))
+            f'{self.case_name}-sg_{self.guid}',
+            f'created by OPNFV Functest ({self.case_name})')
         self.cloud.create_security_group_rule(
             self.sec.id, port_range_min='22', port_range_max='22',
             protocol='tcp', direction='ingress')
                     fip.floating_ip_address if fip else vm1.public_v4,
                     username=getattr(
                         config.CONF,
-                        '{}_image_user'.format(self.case_name), self.username),
+                        f'{self.case_name}_image_user', self.username),
                     key_filename=self.key_filename,
                     timeout=getattr(
                         config.CONF,
-                        '{}_vm_ssh_connect_timeout'.format(self.case_name),
+                        f'{self.case_name}_vm_ssh_connect_timeout',
                         self.ssh_connect_timeout))
                 break
             except Exception as exc:  # pylint: disable=broad-except
 
             name_or_id=self.orig_cloud.auth.get(
                 "project_domain_name", "Default"))
         self.project = self.orig_cloud.create_project(
-            name='{}-project_{}'.format(self.case_name[:18], self.guid),
-            description="Created by OPNFV Functest: {}".format(
-                self.case_name),
+            name=f'{self.case_name[:18]}-project_{self.guid}',
+            description=f"Created by OPNFV Functest: {self.case_name}",
             domain_id=self.domain.id)
         self.__logger.debug("project: %s", self.project)
         self.user = self.orig_cloud.create_user(
-            name='{}-user_{}'.format(self.case_name, self.guid),
+            name=f'{self.case_name}-user_{self.guid}',
             password=self.password,
             domain_id=self.domain.id)
         self.__logger.debug("user: %s", self.user)
             elif self.orig_cloud.get_role(self.default_member.lower()):
                 self.role_name = self.default_member.lower()
             else:
-                raise Exception("Cannot detect {}".format(self.default_member))
+                raise Exception(f"Cannot detect {self.default_member}")
         except Exception:  # pylint: disable=broad-except
             self.__logger.info("Creating default role %s", self.default_member)
             role = self.orig_cloud.create_role(self.default_member)
         if env.get('NO_TENANT_NETWORK').lower() != 'true':
             assert self.ext_net
         provider = {}
-        if hasattr(config.CONF, '{}_network_type'.format(self.case_name)):
+        if hasattr(config.CONF, f'{self.case_name}_network_type'):
             provider["network_type"] = getattr(
-                config.CONF, '{}_network_type'.format(self.case_name))
-        if hasattr(config.CONF, '{}_physical_network'.format(self.case_name)):
+                config.CONF, f'{self.case_name}_network_type')
+        if hasattr(config.CONF, f'{self.case_name}_physical_network'):
             provider["physical_network"] = getattr(
-                config.CONF, '{}_physical_network'.format(self.case_name))
-        if hasattr(config.CONF, '{}_segmentation_id'.format(self.case_name)):
+                config.CONF, f'{self.case_name}_physical_network')
+        if hasattr(config.CONF, f'{self.case_name}_segmentation_id'):
             provider["segmentation_id"] = getattr(
-                config.CONF, '{}_segmentation_id'.format(self.case_name))
+                config.CONF, f'{self.case_name}_segmentation_id')
         domain = self.orig_cloud.get_domain(
             name_or_id=self.orig_cloud.auth.get(
                 "project_domain_name", "Default"))
             self.cloud.auth['project_name'],
             domain_id=domain.id)
         self.network = self.orig_cloud.create_network(
-            '{}-net_{}'.format(self.case_name, self.guid),
+            f'{self.case_name}-net_{self.guid}',
             provider=provider, project_id=project.id,
             shared=self.shared_network)
         self.__logger.debug("network: %s", self.network)
 
         self.subnet = self.cloud.create_subnet(
             self.network.id,
-            subnet_name='{}-subnet_{}'.format(self.case_name, self.guid),
+            subnet_name=f'{self.case_name}-subnet_{self.guid}',
             cidr=getattr(
-                config.CONF, '{}_private_subnet_cidr'.format(self.case_name),
+                config.CONF, f'{self.case_name}_private_subnet_cidr',
                 self.cidr),
             enable_dhcp=True,
             dns_nameservers=[env.get('NAMESERVER')])
         self.__logger.debug("subnet: %s", self.subnet)
 
         self.router = self.cloud.create_router(
-            name='{}-router_{}'.format(self.case_name, self.guid),
+            name=f'{self.case_name}-router_{self.guid}',
             ext_gateway_net_id=self.ext_net.id if self.ext_net else None)
         self.__logger.debug("router: %s", self.router)
         self.cloud.add_router_interface(self.router, subnet_id=self.subnet.id)
 
         if not rconfig.has_section('image-feature-enabled'):
             rconfig.add_section('image-feature-enabled')
         rconfig.set('image-feature-enabled', 'api_v1', False)
-        with open(self.conf_file, 'w') as config_file:
+        with open(self.conf_file, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
         self.backup_tempest_config(self.conf_file, self.res_dir)
 
     def prepare(self):
         super().prepare()
         self.vm2 = self.boot_vm(
-            '{}-vm2_{}'.format(self.case_name, self.guid),
+            f'{self.case_name}-vm2_{self.guid}',
             key_name=self.keypair.id,
             security_groups=[self.sec.id])
         (self.fip2, self.ssh2) = self.connect(self.vm2)
         self.volume = self.cloud.create_volume(
-            name='{}-volume_{}'.format(self.case_name, self.guid), size='2',
+            name=f'{self.case_name}-volume_{self.guid}', size='2',
             timeout=self.volume_timeout, wait=True)
 
     def _write_data(self):
             return testcase.TestCase.EX_RUN_ERROR
         self.logger.debug("ssh: %s", self.ssh)
         (_, stdout, stderr) = self.ssh.exec_command(
-            "sh ~/write_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
+            f"sh ~/write_data.sh {env.get('VOLUME_DEVICE_NAME')}")
         self.logger.debug(
             "volume_write stdout: %s", stdout.read().decode("utf-8"))
         self.logger.debug(
             return testcase.TestCase.EX_RUN_ERROR
         self.logger.debug("ssh: %s", self.ssh2)
         (_, stdout, stderr) = self.ssh2.exec_command(
-            "sh ~/read_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
+            f"sh ~/read_data.sh {env.get('VOLUME_DEVICE_NAME')}")
         self.logger.debug(
             "read volume stdout: %s", stdout.read().decode("utf-8"))
         self.logger.debug(
 
         if not rconfig.has_section('rbac'):
             rconfig.add_section('rbac')
         rconfig.set('rbac', 'rbac_test_roles', kwargs.get('roles', 'admin'))
-        with open(self.conf_file, 'w') as config_file:
+        with open(self.conf_file, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
         self.backup_tempest_config(self.conf_file, self.res_dir)
 
 
     def _prepare_test_list(self, test_name):
         """Build the list of test cases to be executed."""
-        test_yaml_file_name = 'opnfv-{}.yaml'.format(test_name)
+        test_yaml_file_name = f'opnfv-{test_name}.yaml'
         scenario_file_name = os.path.join(self.rally_scenario_dir,
                                           test_yaml_file_name)
 
                                               test_yaml_file_name)
 
             if not os.path.exists(scenario_file_name):
-                raise Exception("The scenario '%s' does not exist."
-                                % scenario_file_name)
+                raise Exception(
+                    f"The scenario '{scenario_file_name}' does not exist.")
 
         LOGGER.debug('Scenario fetched from : %s', scenario_file_name)
         test_file_name = os.path.join(self.temp_dir, test_yaml_file_name)
 
         if pod_arch and pod_arch in arch_filter:
             LOGGER.info("Apply aarch64 specific to rally config...")
-            with open(RallyBase.rally_aar4_patch_path, "r") as pfile:
+            with open(
+                    RallyBase.rally_aar4_patch_path, "r",
+                    encoding='utf-8') as pfile:
                 rally_patch_conf = pfile.read()
 
             for line in fileinput.input(RallyBase.rally_conf_path):
                 rconfig.add_section('openstack')
             rconfig.set(
                 'openstack', 'keystone_default_role', env.get("NEW_USER_ROLE"))
-            with open(rally_conf, 'w') as config_file:
+            with open(rally_conf, 'w', encoding='utf-8') as config_file:
                 rconfig.write(config_file)
 
     @staticmethod
             rconfig.read(rally_conf)
             if rconfig.has_option('openstack', 'keystone_default_role'):
                 rconfig.remove_option('openstack', 'keystone_default_role')
-            with open(rally_conf, 'w') as config_file:
+            with open(rally_conf, 'w', encoding='utf-8') as config_file:
                 rconfig.write(config_file)
 
     @staticmethod
         """Exclude scenario."""
         black_tests = []
         try:
-            with open(RallyBase.blacklist_file, 'r') as black_list_file:
+            with open(
+                    RallyBase.blacklist_file, 'r',
+                    encoding='utf-8') as black_list_file:
                 black_list_yaml = yaml.safe_load(black_list_file)
 
             deploy_scenario = env.get('DEPLOY_SCENARIO')
         func_list = []
 
         try:
-            with open(RallyBase.blacklist_file, 'r') as black_list_file:
+            with open(
+                    RallyBase.blacklist_file, 'r',
+                    encoding='utf-8') as black_list_file:
                 black_list_yaml = yaml.safe_load(black_list_file)
 
             if env.get('BLOCK_MIGRATION').lower() == 'true':
     def apply_blacklist(self, case_file_name, result_file_name):
         """Apply blacklist."""
         LOGGER.debug("Applying blacklist...")
-        with open(case_file_name, 'r') as cases_file, open(
-                result_file_name, 'w') as result_file:
+        with open(case_file_name, 'r', encoding='utf-8') as cases_file, open(
+                result_file_name, 'w', encoding='utf-8') as result_file:
             black_tests = list(set(self.excl_func() + self.excl_scenario()))
             if black_tests:
                 LOGGER.debug("Blacklisted tests: %s", str(black_tests))
         LOGGER.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
 
         # save report as JSON
-        report_json_name = '{}.json'.format(test_name)
+        report_json_name = f'{test_name}.json'
         report_json_dir = os.path.join(self.results_dir, report_json_name)
         cmd = (["rally", "task", "report", "--json", "--uuid", task_id,
                 "--out", report_json_dir])
         output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
         LOGGER.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
 
-        with open(report_json_dir) as json_file:
+        with open(report_json_dir, encoding='utf-8') as json_file:
             json_results = json_file.read()
         self._append_summary(json_results, test_name)
 
             if test in self.stests:
                 self.tests.append(test)
             else:
-                raise Exception("Test name '%s' is invalid" % test)
+                raise Exception(f"Test name '{test}' is invalid")
 
         if not os.path.exists(self.task_dir):
             os.makedirs(self.task_dir)
         task = os.path.join(self.rally_dir, 'task.yaml')
         if not os.path.exists(task):
             LOGGER.error("Task file '%s' does not exist.", task)
-            raise Exception("Task file '{}' does not exist.".
-                            format(task))
+            raise Exception(f"Task file '{task}' does not exist.")
         self.task_file = os.path.join(self.task_dir, 'task.yaml')
         shutil.copyfile(task, self.task_file)
 
         task_macro = os.path.join(self.rally_dir, 'macro')
         if not os.path.exists(task_macro):
             LOGGER.error("Task macro dir '%s' does not exist.", task_macro)
-            raise Exception("Task macro dir '{}' does not exist.".
-                            format(task_macro))
+            raise Exception(f"Task macro dir '{task_macro}' does not exist.")
         macro_dir = os.path.join(self.task_dir, 'macro')
         if os.path.exists(macro_dir):
             shutil.rmtree(macro_dir)
                 success_avg = 100 * item['nb_success'] / item['nb_tests']
             except ZeroDivisionError:
                 success_avg = 0
-            success_str = str("{:0.2f}".format(success_avg)) + '%'
+            success_str = f"{success_avg:0.2f}%"
             duration_str = time.strftime("%H:%M:%S",
                                          time.gmtime(item['overall_duration']))
             res_table.add_row([item['test_name'], duration_str,
             self.result = 100 * total_nb_success / total_nb_tests
         except ZeroDivisionError:
             self.result = 100
-        success_rate = "{:0.2f}".format(self.result)
+        success_rate = f"{self.result:0.2f}"
         success_rate_str = str(success_rate) + '%'
         res_table.add_row(["", "", "", ""])
         res_table.add_row(["TOTAL:", total_duration_str, total_nb_tests,
         rconfig.set('DEFAULT', 'use_stderr', False)
         rconfig.set('DEFAULT', 'log-file', 'rally.log')
         rconfig.set('DEFAULT', 'log_dir', res_dir)
-        with open(rally_conf, 'w') as config_file:
+        with open(rally_conf, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     @staticmethod
             rconfig.remove_option('DEFAULT', 'log-file')
         if rconfig.has_option('DEFAULT', 'log_dir'):
             rconfig.remove_option('DEFAULT', 'log_dir')
-        with open(rally_conf, 'w') as config_file:
+        with open(rally_conf, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     def run(self, **kwargs):
             self.run_tests(**kwargs)
             self._generate_report()
             self.export_task(
-                "{}/{}.html".format(self.results_dir, self.case_name))
+                f"{self.results_dir}/{self.case_name}.html")
             self.export_task(
-                "{}/{}.xml".format(self.results_dir, self.case_name),
+                f"{self.results_dir}/{self.case_name}.xml",
                 export_type="junit-xml")
             res = testcase.TestCase.EX_OK
         except Exception:   # pylint: disable=broad-except
     def prepare_run(self, **kwargs):
         """Create resources needed by test scenarios."""
         super().prepare_run(**kwargs)
-        with open(os.path.join(self.rally_dir,
-                               'rally_jobs.yaml'), 'r') as task_file:
+        with open(
+                os.path.join(self.rally_dir, 'rally_jobs.yaml'),
+                'r', encoding='utf-8') as task_file:
             self.task_yaml = yaml.safe_load(task_file)
 
         for task in self.task_yaml:
             if task not in self.tests:
-                raise Exception("Test '%s' not in '%s'" %
-                                (task, self.tests))
+                raise Exception(f"Test '{task}' not in '{self.tests}'")
 
     def apply_blacklist(self, case_file_name, result_file_name):
         # pylint: disable=too-many-branches
             LOGGER.debug("Blacklisted tests: %s", str(black_tests))
 
         template = YAML(typ='jinja2')
-        with open(case_file_name, 'r') as fname:
+        with open(case_file_name, 'r', encoding='utf-8') as fname:
             cases = template.load(fname)
         if cases.get("version", 1) == 1:
             # scenarios in dictionary
                         cases['subtasks'].pop(sind)
                         break
 
-        with open(result_file_name, 'w') as fname:
+        with open(result_file_name, 'w', encoding='utf-8') as fname:
             template.dump(cases, fname)
 
     def build_task_args(self, test_name):
         task_name = self.task_yaml.get(test_name).get("task")
         task = os.path.join(jobs_dir, task_name)
         if not os.path.exists(task):
-            raise Exception("The scenario '%s' does not exist." % task)
+            raise Exception(f"The scenario '{task}' does not exist.")
         LOGGER.debug('Scenario fetched from : %s', task)
 
         if not os.path.exists(self.temp_dir):
 
 
     def _extract_refstack_data(self, refstack_list):
         yaml_data = ""
-        with open(refstack_list) as def_file:
+        with open(refstack_list, encoding='utf-8') as def_file:
             for line in def_file:
                 try:
                     grp = re.search(r'^([^\[]*)(\[.*\])\n*$', line)
-                    yaml_data = "{}\n{}: {}".format(
-                        yaml_data, grp.group(1), grp.group(2))
+                    yaml_data = f"{yaml_data}\n{grp.group(1)}: {grp.group(2)}"
                 except Exception:  # pylint: disable=broad-except
                     self.__logger.warning("Cannot parse %s", line)
         return yaml.full_load(yaml_data)
         for line in output.splitlines():
             try:
                 grp = re.search(r'^([^\[]*)(\[.*\])\n*$', line.decode("utf-8"))
-                yaml_data2 = "{}\n{}: {}".format(
-                    yaml_data2, grp.group(1), grp.group(2))
+                yaml_data2 = f"{yaml_data2}\n{grp.group(1)}: {grp.group(2)}"
             except Exception:  # pylint: disable=broad-except
                 self.__logger.warning("Cannot parse %s. skipping it", line)
         return yaml.full_load(yaml_data2)
     def generate_test_list(self, **kwargs):
         refstack_list = os.path.join(
             getattr(config.CONF, 'dir_refstack_data'),
-            "{}.txt".format(kwargs.get('target', 'compute')))
+            f"{kwargs.get('target', 'compute')}.txt")
         self.backup_tempest_config(self.conf_file, '/etc')
         refstack_data = self._extract_refstack_data(refstack_list)
         tempest_data = self._extract_tempest_data()
-        with open(self.list, 'w') as ref_file:
+        with open(self.list, 'w', encoding='utf-8') as ref_file:
             for key in refstack_data.keys():
                 try:
                     for data in tempest_data[key]:
                     else:
                         self.__logger.info("%s: ids differ. skipping it", key)
                         continue
-                    ref_file.write("{}{}\n".format(
-                        key, str(tempest_data[key]).replace(
-                            "'", "").replace(", ", ",")))
+                    value = str(tempest_data[key]).replace(
+                        "'", "").replace(", ", ",")
+                    ref_file.write(f"{key}{value}\n")
                 except Exception:  # pylint: disable=broad-except
                     self.__logger.info("%s: not found. skipping it", key)
                     continue
 
         scpc.put('/home/opnfv/functest/conf/env_file', remote_path='~/')
         if os.environ.get('OS_CACERT'):
             scpc.put(os.environ.get('OS_CACERT'), remote_path='~/os_cacert')
+        opt = 'export OS_CACERT=~/os_cacert && ' if os.environ.get(
+            'OS_CACERT') else ''
         (_, stdout, stderr) = self.ssh.exec_command(
             'source ~/env_file && '
             'export OS_INTERFACE=public && '
-            'export OS_AUTH_URL={} && '
-            'export OS_USERNAME={} && '
-            'export OS_PROJECT_NAME={} && '
-            'export OS_PROJECT_ID={} && '
+            f'export OS_AUTH_URL={endpoint} && '
+            f'export OS_USERNAME={self.project.user.name} && '
+            f'export OS_PROJECT_NAME={self.project.project.name} && '
+            f'export OS_PROJECT_ID={self.project.project.id} && '
             'unset OS_TENANT_NAME && '
             'unset OS_TENANT_ID && '
             'unset OS_ENDPOINT_TYPE && '
-            'export OS_PASSWORD="{}" && '
-            '{}'
+            f'export OS_PASSWORD="{self.project.password}" && '
+            f'{opt}'
             'env && '
-            'timeout {} shaker --debug --image-name {} --flavor-name {} '
-            '--server-endpoint {}:9000 --external-net {} --dns-nameservers {} '
+            f'timeout {self.shaker_timeout} shaker --debug '
+            f'--image-name {self.image.name} --flavor-name {self.flavor.name} '
+            f'--server-endpoint {self.fip.floating_ip_address}:9000 '
+            f'--external-net {self.ext_net.id} '
+            f"--dns-nameservers {env.get('NAMESERVER')} "
             '--scenario openstack/full_l2,'
             'openstack/full_l3_east_west,'
             'openstack/full_l3_north_south,'
             'openstack/perf_l3_north_south '
-            '--report report.html --output report.json'.format(
-                endpoint, self.project.user.name, self.project.project.name,
-                self.project.project.id, self.project.password,
-                'export OS_CACERT=~/os_cacert && ' if os.environ.get(
-                    'OS_CACERT') else '',
-                self.shaker_timeout, self.image.name, self.flavor.name,
-                self.fip.floating_ip_address, self.ext_net.id,
-                env.get('NAMESERVER')))
+            '--report report.html --output report.json')
         self.__logger.info("output:\n%s", stdout.read().decode("utf-8"))
         self.__logger.info("error:\n%s", stderr.read().decode("utf-8"))
         if not os.path.exists(self.res_dir):
         except scp.SCPException:
             self.__logger.exception("cannot get report files")
             return 1
-        with open(os.path.join(self.res_dir, 'report.json')) as json_file:
+        with open(
+                os.path.join(self.res_dir, 'report.json'),
+                encoding='utf-8') as json_file:
             data = json.load(json_file)
             for value in data["records"].values():
                 if value["status"] != "ok":
 
     @staticmethod
     def read_file(filename):
         """Read file and return content as a stripped list."""
-        with open(filename) as src:
+        with open(filename, encoding='utf-8') as src:
             return [line.strip() for line in src.readlines()]
 
     @staticmethod
         """
         return os.path.join(getattr(config.CONF, 'dir_rally_inst'),
                             'verification',
-                            'verifier-{}'.format(verifier_id),
+                            f'verifier-{verifier_id}',
                             'repo')
 
     @staticmethod
         """
         return os.path.join(getattr(config.CONF, 'dir_rally_inst'),
                             'verification',
-                            'verifier-{}'.format(verifier_id),
-                            'for-deployment-{}'.format(deployment_id))
+                            f'verifier-{verifier_id}',
+                            f'for-deployment-{deployment_id}')
 
     @staticmethod
     def update_tempest_conf_file(conf_file, rconfig):
         """Update defined paramters into tempest config file"""
-        with open(TempestCommon.tempest_conf_yaml) as yfile:
+        with open(TempestCommon.tempest_conf_yaml, encoding='utf-8') as yfile:
             conf_yaml = yaml.safe_load(yfile)
         if conf_yaml:
             sections = rconfig.sections()
                 for key, value in sub_conf.items():
                     rconfig.set(section, key, value)
 
-        with open(conf_file, 'w') as config_file:
+        with open(conf_file, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     @staticmethod
                 shutil.copyfile(
                     self.tempest_custom, self.list)
             else:
-                raise Exception("Tempest test list file %s NOT found."
-                                % self.tempest_custom)
+                raise Exception(
+                    f"Tempest test list file {self.tempest_custom} NOT found.")
         else:
             testr_mode = kwargs.get(
                 'mode', r'^tempest\.(api|scenario).*\[.*\bsmoke\b.*\]$')
-            cmd = "(cd {0}; stestr list '{1}' >{2} 2>/dev/null)".format(
-                self.verifier_repo_dir, testr_mode, self.list)
+            cmd = (f"(cd {self.verifier_repo_dir}; "
+                   f"stestr list '{testr_mode}' > {self.list} 2>/dev/null)")
             output = subprocess.check_output(cmd, shell=True)
             LOGGER.info("%s\n%s", cmd, output.decode("utf-8"))
         os.remove('/etc/tempest.conf')
             os.remove(self.raw_list)
         os.rename(self.list, self.raw_list)
         cases_file = self.read_file(self.raw_list)
-        with open(self.list, 'w') as result_file:
+        with open(self.list, 'w', encoding='utf-8') as result_file:
             black_tests = []
             try:
                 deploy_scenario = env.get('DEPLOY_SCENARIO')
                 if bool(deploy_scenario):
                     # if DEPLOY_SCENARIO is set we read the file
-                    with open(black_list) as black_list_file:
+                    with open(black_list, encoding='utf-8') as black_list_file:
                         black_list_yaml = yaml.safe_load(black_list_file)
                         black_list_file.close()
                         for item in black_list_yaml:
         LOGGER.info("Starting Tempest test suite: '%s'.", cmd)
 
         with open(
-                os.path.join(self.res_dir, "tempest.log"), 'w+') as f_stdout:
-
+                os.path.join(self.res_dir, "tempest.log"), 'w+',
+                encoding='utf-8') as f_stdout:
             with subprocess.Popen(
                     cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                     bufsize=1) as proc:
-
                 with proc.stdout:
                     for line in iter(proc.stdout.readline, b''):
                         if re.search(r"\} tempest\.", line.decode("utf-8")):
         LOGGER.info('Verification UUID: %s', self.verification_id)
 
         shutil.copy(
-            "{}/tempest.log".format(self.deployment_dir),
-            "{}/tempest.debug.log".format(self.res_dir))
+            f"{self.deployment_dir}/tempest.log",
+            f"{self.res_dir}/tempest.debug.log")
 
     def parse_verifier_result(self):
         """Parse and save test results."""
                     LOGGER.error("No test has been executed")
                     return
 
-            with open(os.path.join(self.res_dir,
-                                   "rally.log"), 'r') as logfile:
+            with open(os.path.join(self.res_dir, "rally.log"),
+                      'r', encoding='utf-8') as logfile:
                 output = logfile.read()
 
             success_testcases = []
         rconfig.read(rally_conf)
         if not rconfig.has_section('openstack'):
             rconfig.add_section('openstack')
-        rconfig.set('openstack', 'img_name_regex', '^{}$'.format(
-            self.image.name))
-        with open(rally_conf, 'w') as config_file:
+        rconfig.set('openstack', 'img_name_regex', f'^{self.image.name}$')
+        with open(rally_conf, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     def update_default_role(self, rally_conf='/etc/rally/rally.conf'):
         if not rconfig.has_section('openstack'):
             rconfig.add_section('openstack')
         rconfig.set('openstack', 'swift_operator_role', role.name)
-        with open(rally_conf, 'w') as config_file:
+        with open(rally_conf, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     @staticmethod
             rconfig.remove_option('openstack', 'img_name_regex')
         if rconfig.has_option('openstack', 'swift_operator_role'):
             rconfig.remove_option('openstack', 'swift_operator_role')
-        with open(rally_conf, 'w') as config_file:
+        with open(rally_conf, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     def update_auth_section(self):
             account_file = os.path.join(
                 getattr(config.CONF, 'dir_functest_data'), 'accounts.yaml')
             assert os.path.exists(
-                account_file), "{} doesn't exist".format(account_file)
+                account_file), f"{account_file} doesn't exist"
             rconfig.set('auth', 'test_accounts_file', account_file)
         if env.get('NO_TENANT_NETWORK').lower() == 'true':
             rconfig.set('auth', 'create_isolated_networks', False)
-        with open(self.conf_file, 'w') as config_file:
+        with open(self.conf_file, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     def update_network_section(self):
             if not rconfig.has_section('network-feature-enabled'):
                 rconfig.add_section('network-feature-enabled')
             rconfig.set('network-feature-enabled', 'floating_ips', False)
-        with open(self.conf_file, 'w') as config_file:
+        with open(self.conf_file, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     def update_compute_section(self):
         rconfig.set(
             'compute', 'fixed_network_name',
             self.network.name if self.network else env.get("EXTERNAL_NETWORK"))
-        with open(self.conf_file, 'w') as config_file:
+        with open(self.conf_file, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     def update_validation_section(self):
         rconfig.set(
             'validation', 'network_for_ssh',
             self.network.name if self.network else env.get("EXTERNAL_NETWORK"))
-        with open(self.conf_file, 'w') as config_file:
+        with open(self.conf_file, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     def update_scenario_section(self):
         rconfig = configparser.RawConfigParser()
         rconfig.read(self.conf_file)
         filename = getattr(
-            config.CONF, '{}_image'.format(self.case_name), self.filename)
+            config.CONF, f'{self.case_name}_image', self.filename)
         if not rconfig.has_section('scenario'):
             rconfig.add_section('scenario')
         rconfig.set('scenario', 'img_file', filename)
         rconfig.set('scenario', 'img_disk_format', getattr(
-            config.CONF, '{}_image_format'.format(self.case_name),
+            config.CONF, f'{self.case_name}_image_format',
             self.image_format))
         extra_properties = self.extra_properties.copy()
         if env.get('IMAGE_PROPERTIES'):
                 functest_utils.convert_ini_to_dict(
                     env.get('IMAGE_PROPERTIES')))
         extra_properties.update(
-            getattr(config.CONF, '{}_extra_properties'.format(
-                self.case_name), {}))
+            getattr(config.CONF, f'{self.case_name}_extra_properties', {}))
         rconfig.set(
             'scenario', 'img_properties',
             functest_utils.convert_dict_to_ini(extra_properties))
-        with open(self.conf_file, 'w') as config_file:
+        with open(self.conf_file, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     def update_dashboard_section(self):
             rconfig.set('dashboard', 'dashboard_url', env.get('DASHBOARD_URL'))
         else:
             rconfig.set('service_available', 'horizon', False)
-        with open(self.conf_file, 'w') as config_file:
+        with open(self.conf_file, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
 
     def configure(self, **kwargs):  # pylint: disable=unused-argument
     def __init__(self, **kwargs):
         super().__init__(**kwargs)
         self.user2 = self.orig_cloud.create_user(
-            name='{}-user2_{}'.format(self.case_name, self.project.guid),
+            name=f'{self.case_name}-user2_{self.project.guid}',
             password=self.project.password,
             domain_id=self.project.domain.id)
         self.orig_cloud.grant_role(
                 env.get("EXTERNAL_NETWORK"))
             rconfig.set(
                 'heat_plugin', 'network_for_ssh', env.get("EXTERNAL_NETWORK"))
-        with open(self.conf_file, 'w') as config_file:
+        with open(self.conf_file, 'w', encoding='utf-8') as config_file:
             rconfig.write(config_file)
         self.backup_tempest_config(self.conf_file, self.res_dir)
 
 
         if "case_name" not in kwargs:
             kwargs["case_name"] = 'vmtp'
         super().__init__(**kwargs)
-        self.config = "{}/vmtp.conf".format(self.res_dir)
+        self.config = f"{self.res_dir}/vmtp.conf"
         (_, self.privkey_filename) = tempfile.mkstemp()
         (_, self.pubkey_filename) = tempfile.mkstemp()
 
         assert self.cloud
         assert self.ext_net
         self.router = self.cloud.create_router(
-            name='{}-router_{}'.format(self.case_name, self.guid),
+            name=f'{self.case_name}-router_{self.guid}',
             ext_gateway_net_id=self.ext_net.id)
         self.__logger.debug("router: %s", self.router)
 
         Raises: Exception on error
         """
         assert self.cloud
-        name = "vmtp_{}".format(self.guid)
+        name = f"vmtp_{self.guid}"
         self.__logger.info("Creating keypair with name: '%s'", name)
         keypair = self.cloud.create_keypair(name)
         self.__logger.debug("keypair: %s", keypair)
-        with open(self.privkey_filename, 'w') as key_file:
+        with open(self.privkey_filename, 'w', encoding='utf-8') as key_file:
             key_file.write(keypair.private_key)
-        with open(self.pubkey_filename, 'w') as key_file:
+        with open(self.pubkey_filename, 'w', encoding='utf-8') as key_file:
             key_file.write(keypair.public_key)
         self.cloud.delete_keypair(keypair.id)
 
         cmd = ['vmtp', '-sc']
         output = subprocess.check_output(cmd).decode("utf-8")
         self.__logger.info("%s\n%s", " ".join(cmd), output)
-        with open(self.config, "w+") as conf:
+        with open(self.config, "w+", encoding='utf-8') as conf:
             vmtp_conf = yaml.full_load(output)
             vmtp_conf["private_key_file"] = self.privkey_filename
             vmtp_conf["public_key_file"] = self.pubkey_filename
             vmtp_conf["router_name"] = str(self.router.name)
             vmtp_conf["flavor_type"] = str(self.flavor.name)
             vmtp_conf["internal_network_name"] = [
-                "pns-internal-net_{}".format(self.guid),
-                "pns-internal-net2_{}".format(self.guid)]
-            vmtp_conf["vm_name_client"] = "TestClient_{}".format(self.guid)
-            vmtp_conf["vm_name_server"] = "TestServer_{}".format(self.guid)
-            vmtp_conf["security_group_name"] = "pns-security{}".format(
-                self.guid)
+                f"pns-internal-net_{self.guid}",
+                f"pns-internal-net2_{self.guid}"]
+            vmtp_conf["vm_name_client"] = f"TestClient_{self.guid}"
+            vmtp_conf["vm_name_server"] = f"TestServer_{self.guid}"
+            vmtp_conf["security_group_name"] = f"pns-security{self.guid}"
             vmtp_conf["dns_nameservers"] = [env.get('NAMESERVER')]
             vmtp_conf["generic_retry_count"] = self.create_server_timeout // 2
             vmtp_conf["ssh_retry_count"] = self.ssh_retry_timeout // 2
             OS_USER_DOMAIN_NAME=self.project.domain.name,
             OS_PASSWORD=self.project.password)
         if not new_env["OS_AUTH_URL"].endswith(('v3', 'v3/')):
-            new_env["OS_AUTH_URL"] = "{}/v3".format(new_env["OS_AUTH_URL"])
+            new_env["OS_AUTH_URL"] = f'{new_env["OS_AUTH_URL"]}/v3'
         try:
             del new_env['OS_TENANT_NAME']
             del new_env['OS_TENANT_ID']
         except Exception:  # pylint: disable=broad-except
             pass
-        cmd = ['vmtp', '-d', '--json', '{}/vmtp.json'.format(self.res_dir),
+        cmd = ['vmtp', '-d', '--json', f'{self.res_dir}/vmtp.json',
                '-c', self.config]
         if env.get("VMTP_HYPERVISORS"):
             hypervisors = functest_utils.convert_ini_to_list(
         output = subprocess.check_output(
             cmd, stderr=subprocess.STDOUT, env=new_env).decode("utf-8")
         self.__logger.info("%s\n%s", " ".join(cmd), output)
-        cmd = ['vmtp_genchart', '-c', '{}/vmtp.html'.format(self.res_dir),
-               '{}/vmtp.json'.format(self.res_dir)]
+        cmd = ['vmtp_genchart', '-c', f'{self.res_dir}/vmtp.html',
+               f'{self.res_dir}/vmtp.json']
         output = subprocess.check_output(
             cmd, stderr=subprocess.STDOUT).decode("utf-8")
         self.__logger.info("%s\n%s", " ".join(cmd), output)
-        with open('{}/vmtp.json'.format(self.res_dir), 'r') as res_file:
+        with open(f'{self.res_dir}/vmtp.json', 'r',
+                  encoding='utf-8') as res_file:
             self.details = json.load(res_file)
 
     def run(self, **kwargs):
             super().clean()
             os.remove(self.privkey_filename)
             os.remove(self.pubkey_filename)
-            self.cloud.delete_network("pns-internal-net_{}".format(self.guid))
-            self.cloud.delete_network("pns-internal-net2_{}".format(self.guid))
+            self.cloud.delete_network(f"pns-internal-net_{self.guid}")
+            self.cloud.delete_network(f"pns-internal-net2_{self.guid}")
         except Exception:  # pylint: disable=broad-except
             pass
 
     def prepare(self):
         super().prepare()
         self.vm2 = self.boot_vm(
-            '{}-vm2_{}'.format(self.case_name, self.guid),
+            f'{self.case_name}-vm2_{self.guid}',
             security_groups=[self.sec.id])
 
     def execute(self):
         assert self.ssh
         if not self.check_regex_in_console(self.vm2.name):
             return 1
-        (_, stdout, stderr) = self.ssh.exec_command(
-            'ping -c 1 {}'.format(
-                self.vm2.private_v4 or self.vm2.addresses[
-                    self.network.name][0].addr))
+        ip4 = self.vm2.private_v4 or self.vm2.addresses[
+            self.network.name][0].addr
+        (_, stdout, stderr) = self.ssh.exec_command(f'ping -c 1 {ip4}')
         self.__logger.info("output:\n%s", stdout.read().decode("utf-8"))
         self.__logger.info("error:\n%s", stderr.read().decode("utf-8"))
         return stdout.channel.recv_exit_status()
 
             self.result = 0
             self.vm1 = self.boot_vm()
             self.vm2 = self.boot_vm(
-                '{}-vm2_{}'.format(self.case_name, self.guid),
+                f'{self.case_name}-vm2_{self.guid}',
                 userdata=self._get_userdata())
 
             result = self._do_vping()
         :param test_ip: the IP value to substitute into the script
         :return: the bash script contents
         """
+        ip4 = self.vm1.private_v4 or self.vm1.addresses[
+            self.network.name][0].addr
         if self.vm1.private_v4 or self.vm1.addresses[
                 self.network.name][0].addr:
             return ("#!/bin/sh\n\n"
                     "while true; do\n"
-                    " ping -c 1 %s 2>&1 >/dev/null\n"
+                    f" ping -c 1 {ip4} 2>&1 >/dev/null\n"
                     " RES=$?\n"
                     " if [ \"Z$RES\" = \"Z0\" ] ; then\n"
                     "  echo 'vPing OK'\n"
                     "  echo 'vPing KO'\n"
                     " fi\n"
                     " sleep 1\n"
-                    "done\n" % str(
-                        self.vm1.private_v4 or self.vm1.addresses[
-                            self.network.name][0].addr))
+                    "done\n")
         return None
 
     def clean(self):
 
         try:
             for line in fileinput.input(cls.odl_variables_file,
                                         inplace=True):
-                print(re.sub("@{AUTH}.*",
-                             "@{{AUTH}}           {}    {}".format(
-                                 odlusername, odlpassword),
-                             line.rstrip()))
+                print(re.sub(
+                    "@{AUTH}.*",
+                    f"@{{AUTH}}           {odlusername}    {odlpassword}",
+                    line.rstrip()))
             return True
         except Exception:  # pylint: disable=broad-except
             cls.__logger.exception("Cannot set ODL creds:")
             odlusername = kwargs['odlusername']
             odlpassword = kwargs['odlpassword']
             osauthurl = kwargs['osauthurl']
-            keystoneurl = "{}://{}".format(
-                urllib.parse.urlparse(osauthurl).scheme,
-                urllib.parse.urlparse(osauthurl).netloc)
+            keystoneurl = (f"{urllib.parse.urlparse(osauthurl).scheme}://"
+                           f"{urllib.parse.urlparse(osauthurl).netloc}")
             variable = ['KEYSTONEURL:' + keystoneurl,
                         'NEUTRONURL:' + kwargs['neutronurl'],
                         'OS_AUTH_URL:"' + osauthurl + '"',
 
             'functest', 'opnfv_tests/vnf/epc')
         try:
             self.config = getattr(
-                config.CONF, 'vnf_{}_config'.format(self.case_name))
+                config.CONF, f'vnf_{self.case_name}_config')
         except Exception as exc:
             raise Exception("VNF config file not found") from exc
         self.config_file = os.path.join(self.case_dir, self.config)
         try:
             self.public_auth_url = self.get_public_auth_url(self.orig_cloud)
             if not self.public_auth_url.endswith(('v3', 'v3/')):
-                self.public_auth_url = "{}/v3".format(self.public_auth_url)
+                self.public_auth_url = f"{self.public_auth_url}/v3"
         except Exception:  # pylint: disable=broad-except
             self.public_auth_url = None
         self.sec = None
             'url': self.public_auth_url,
             'region': self.cloud.region_name if self.cloud.region_name else (
                 'RegionOne')}
-        with open(clouds_yaml, 'w') as yfile:
+        with open(clouds_yaml, 'w', encoding='utf-8') as yfile:
             yfile.write(CLOUD_TEMPLATE.format(**cloud_data))
         scpc = scp.SCPClient(self.ssh.get_transport())
         scpc.put(clouds_yaml, remote_path='~/')
                 "project_domain_name", "Default"),
             'user_domain_n': self.cloud.auth.get(
                 "user_domain_name", "Default")}
-        with open(credentials_yaml, 'w') as yfile:
+        with open(credentials_yaml, 'w', encoding='utf-8') as yfile:
             yfile.write(CREDS_TEMPLATE.format(**creds_data))
         scpc = scp.SCPClient(self.ssh.get_transport())
         scpc.put(credentials_yaml, remote_path='~/')
             'RegionOne')
         (_, stdout, stderr) = self.ssh.exec_command(
             '/snap/bin/juju metadata generate-image -d /home/ubuntu '
-            '-i {} -s xenial -r {} -u {}'.format(
-                self.image.id, region_name, self.public_auth_url))
+            f'-i {self.image.id} -s xenial -r {region_name} '
+            f'-u {self.public_auth_url}')
         self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
         self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
         return not stdout.channel.recv_exit_status()
             'RegionOne')
         (_, stdout, stderr) = self.ssh.exec_command(
             '/snap/bin/juju metadata generate-image -d /home/ubuntu '
-            '-i {} -s trusty -r {} -u {}'.format(
-                image_alt.id, region_name, self.public_auth_url))
+            f'-i {image_alt.id} -s trusty -r {region_name} '
+            f'-u {self.public_auth_url}')
         self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
         self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
         return image_alt
         region_name = self.cloud.region_name if self.cloud.region_name else (
             'RegionOne')
         (_, stdout, stderr) = self.ssh.exec_command(
-            'timeout {} '
-            '/snap/bin/juju bootstrap abot-epc/{} abot-controller '
+            f'timeout {JujuEpc.juju_timeout} '
+            f'/snap/bin/juju bootstrap abot-epc/{region_name} abot-controller '
             '--agent-version 2.3.9 --metadata-source /home/ubuntu '
             '--constraints mem=2G --bootstrap-series xenial '
-            '--config network={} '
+            f'--config network={self.network.id} '
             '--config ssl-hostname-verification=false '
-            '--config external-network={} '
+            f'--config external-network={self.ext_net.id} '
             '--config use-floating-ip=true '
             '--config use-default-secgroup=true '
-            '--debug'.format(
-                JujuEpc.juju_timeout, region_name, self.network.id,
-                self.ext_net.id))
+            '--debug')
         self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
         self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
         return not stdout.channel.recv_exit_status()
         """Check application status."""
         for i in range(10):
             (_, stdout, stderr) = self.ssh.exec_command(
-                '/snap/bin/juju status --format short {}'.format(name))
+                f'/snap/bin/juju status --format short {name}')
             output = stdout.read().decode("utf-8")
             self.__logger.debug("stdout:\n%s", output)
             self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
             if stdout.channel.recv_exit_status():
                 continue
             ret = re.search(
-                r'(?=workload:({})\))'.format(status), output)
+                rf'(?=workload:({status})\))', output)
             if ret:
                 self.__logger.info("%s workload is %s", name, status)
                 break
             return not stdout.channel.recv_exit_status()
         (_, stdout, stderr) = self.ssh.exec_command(
             'PATH=/snap/bin/:$PATH '
-            'timeout {} juju-wait'.format(JujuEpc.juju_timeout))
+            f'timeout {JujuEpc.juju_timeout} juju-wait')
         self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
         self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
         if stdout.channel.recv_exit_status():
                 return False
         scpc = scp.SCPClient(self.ssh.get_transport())
         scpc.put(
-            '{}/featureFiles'.format(self.case_dir), remote_path='~/',
+            f'{self.case_dir}/featureFiles', remote_path='~/',
             recursive=True)
         (_, stdout, stderr) = self.ssh.exec_command(
-            'timeout {} /snap/bin/juju scp -- -r -v ~/featureFiles '
-            'abot-epc-basic/0:/etc/rebaca-test-suite/'.format(
-                JujuEpc.juju_timeout))
+            f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp -- -r -v '
+            '~/featureFiles abot-epc-basic/0:/etc/rebaca-test-suite/')
         output = stdout.read().decode("utf-8")
         self.__logger.debug("stdout:\n%s", output)
         self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
         """Run test on ABoT."""
         start_time = time.time()
         (_, stdout, stderr) = self.ssh.exec_command(
-            '/snap/bin/juju run-action abot-epc-basic/0 '
-            'run tagnames={}'.format(self.details['test_vnf']['tag_name']))
+            "/snap/bin/juju run-action abot-epc-basic/0 "
+            f"run tagnames={self.details['test_vnf']['tag_name']}")
         self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
         self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
         if stdout.channel.recv_exit_status():
             return not stdout.channel.recv_exit_status()
         (_, stdout, stderr) = self.ssh.exec_command(
             'PATH=/snap/bin/:$PATH '
-            'timeout {} juju-wait'.format(JujuEpc.juju_timeout))
+            f'timeout {JujuEpc.juju_timeout} juju-wait')
         self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
         self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
         if stdout.channel.recv_exit_status():
         duration = time.time() - start_time
         self.__logger.info("Getting results from Abot node....")
         (_, stdout, stderr) = self.ssh.exec_command(
-            'timeout {} /snap/bin/juju scp -- -v abot-epc-basic/0:'
-            '/var/lib/abot-epc-basic/artifacts/TestResults.json .'.format(
-                JujuEpc.juju_timeout))
+            f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp '
+            '-- -v abot-epc-basic/0:'
+            '/var/lib/abot-epc-basic/artifacts/TestResults.json .')
         self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
         self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
         if stdout.channel.recv_exit_status():
         scpc = scp.SCPClient(self.ssh.get_transport())
         scpc.get('TestResults.json', self.res_dir)
         self.__logger.info("Parsing the Test results...")
-        res = (process_abot_test_result('{}/TestResults.json'.format(
-            self.res_dir)))
+        res = process_abot_test_result(f'{self.res_dir}/TestResults.json')
         short_result = sig_test_format(res)
         self.__logger.info(short_result)
         self.details['test_vnf'].update(
 
 def process_abot_test_result(file_path):
     """ Process ABoT Result """
-    with open(file_path) as test_result:
+    with open(file_path, encoding='utf-8') as test_result:
         data = json.load(test_result)
         res = []
         for tests in data:
 
         output_dict = {}
         self.logger.debug('Ellis IP: %s', self.ellis_ip)
         output_dict['ellis_ip'] = self.ellis_ip
-        account_url = 'http://{0}/accounts'.format(self.ellis_ip)
+        account_url = f'http://{self.ellis_ip}/accounts'
         params = {"password": "functest",
                   "full_name": "opnfv functest user",
                   "email": "functest@opnfv.org",
         number_res = self._create_ellis_account(account_url, params)
         output_dict['number'] = number_res
 
-        session_url = 'http://{0}/session'.format(self.ellis_ip)
+        session_url = f'http://{self.ellis_ip}/session'
         session_data = {
             'username': params['email'],
             'password': params['password'],
         }
         cookies = self._get_ellis_session_cookies(session_url, session_data)
 
-        number_url = 'http://{0}/accounts/{1}/numbers'.format(
-            self.ellis_ip, params['email'])
+        number_url = (
+            f"http://{self.ellis_ip}/accounts/{params['email']}/numbers")
         self.logger.debug('Create 1st calling number on Ellis')
         number_res = self._create_ellis_number(number_url, cookies)
 
                     "try %s: cannot create ellis account", iloop + 1)
                 time.sleep(30)
         raise Exception(
-            "Unable to create an account {}".format(
-                params.get('full_name')))
+            f"Unable to create an account {params.get('full_name')}")
 
     def _get_ellis_session_cookies(self, session_url, params):
         i = 15
         """
         # pylint: disable=too-many-locals,too-many-arguments
         self.logger.info('Run Clearwater live test')
-        script = ('cd {0};'
-                  'rake test[{1}] SIGNUP_CODE={2}'
-                  .format(self.test_dir,
-                          public_domain,
-                          signup_code))
+        script = (f'cd {self.test_dir};'
+                  f'rake test[{public_domain}] SIGNUP_CODE={signup_code}')
         if self.bono_ip and self.ellis_ip:
-            subscript = ' PROXY={0} ELLIS={1}'.format(
-                self.bono_ip, self.ellis_ip)
-            script = '{0}{1}'.format(script, subscript)
-        script = ('{0}{1}'.format(script, ' --trace'))
-        cmd = "/bin/bash -c '{0}'".format(script)
+            subscript = f' PROXY={self.bono_ip} ELLIS={self.ellis_ip}'
+            script = f'{script}{subscript}'
+        script = f'{script} --trace'
+        cmd = f"/bin/bash -c '{script}'"
         self.logger.debug('Live test cmd: %s', cmd)
         output_file = os.path.join(self.result_dir, "ims_test_output.txt")
         ft_utils.execute_command(cmd,
                                  error_msg='Clearwater live test failed',
                                  output_file=output_file)
 
-        with open(output_file, 'r') as ofile:
+        with open(output_file, 'r', encoding='utf-8') as ofile:
             result = ofile.read()
 
         if result != "":
 
         # Retrieve the configuration
         try:
             self.config = getattr(
-                config.CONF, 'vnf_{}_config'.format(self.case_name))
+                config.CONF, f'vnf_{self.case_name}_config')
         except Exception as exc:
             raise Exception("VNF config file not found") from exc
 
 
         # Retrieve the configuration
         try:
             self.config = getattr(
-                config.CONF, 'vnf_{}_config'.format(self.case_name))
+                config.CONF, f'vnf_{self.case_name}_config')
         except Exception as exc:
             raise Exception("VNF config file not found") from exc
 
             project=self.project.project.id,
             domain=self.project.domain.id)
         self.keypair = self.cloud.create_keypair(
-            '{}-kp_{}'.format(self.case_name, self.guid))
+            f'{self.case_name}-kp_{self.guid}')
         self.__logger.info("keypair:\n%s", self.keypair.private_key)
-        with open(self.key_filename, 'w') as private_key_file:
+        with open(
+                self.key_filename, 'w', encoding='utf-8') as private_key_file:
             private_key_file.write(self.keypair.private_key)
 
         if self.deploy_vnf() and self.test_vnf():
 
         # Retrieve the configuration
         try:
             self.config = getattr(
-                config.CONF, 'vnf_{}_config'.format(self.case_name))
+                config.CONF, f'vnf_{self.case_name}_config')
         except Exception as exc:
             raise Exception("VNF config file not found") from exc
 
 
                 os.path.join(
                     self.util.vnf_data_dir, self.util.command_template_dir,
                     self.util.test_cmd_map_yaml_file),
-                'r') as test_cmd_map_file:
+                'r', encoding='utf-8') as test_cmd_map_file:
             self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
 
         self.util.set_credentials(credentials["cloud"])
 
-        with open(self.util.test_env_config_yaml) as file_fd:
+        with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
             test_env_config_yaml = yaml.safe_load(file_fd)
         file_fd.close()
 
 
         if not os.path.exists(self.vnf_data_dir):
             os.makedirs(self.vnf_data_dir)
 
-        with open(self.test_env_config_yaml) as file_fd:
+        with open(self.test_env_config_yaml, encoding='utf-8') as file_fd:
             test_env_config_yaml = yaml.safe_load(file_fd)
         file_fd.close()
 
         return mac_address
 
     def get_blueprint_outputs(self, cfy_manager_ip, deployment_name):
-        url = "http://%s/deployments/%s/outputs" % (
-            cfy_manager_ip, deployment_name)
-
+        url = f"http://{cfy_manager_ip}/deployments/{deployment_name}/outputs"
         response = requests.get(
             url,
             auth=requests.auth.HTTPBasicAuth('admin', 'admin'),
     def write_result_data(self, result_data):
         test_result = []
         if not os.path.isfile(self.test_result_json_file):
-            with open(self.test_result_json_file, "w") as file_fd:
+            with open(
+                    self.test_result_json_file, "w",
+                    encoding="utf-8") as file_fd:
                 pass
         else:
-            with open(self.test_result_json_file, "r") as file_fd:
+            with open(
+                    self.test_result_json_file, "r",
+                    encoding="utf-8") as file_fd:
                 test_result = json.load(file_fd)
 
         test_result.append(result_data)
 
-        with open(self.test_result_json_file, "w") as file_fd:
+        with open(
+                self.test_result_json_file, "w",
+                encoding="utf-8") as file_fd:
             json.dump(test_result, file_fd)
 
     def output_test_result_json(self):
         if os.path.isfile(self.test_result_json_file):
-            with open(self.test_result_json_file, "r") as file_fd:
+            with open(
+                    self.test_result_json_file, "r",
+                    encoding="utf-8") as file_fd:
                 test_result = json.load(file_fd)
             output_json_data = json.dumps(test_result,
                                           sort_keys=True,
 
     @staticmethod
     def get_test_scenario(file_path):
-        with open(file_path, 'r') as test_scenario_file:
+        with open(file_path, "r", encoding="utf-8") as test_scenario_file:
             test_scenario_yaml = yaml.safe_load(test_scenario_file)
         return test_scenario_yaml["test_scenario_list"]
 
         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 
         self.util = Utilvnf()
-        with open(self.util.test_env_config_yaml) as file_fd:
+        with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
             test_env_config_yaml = yaml.safe_load(file_fd)
         file_fd.close()
 
 
         self.util = Utilvnf()
         self.util.set_credentials(credentials["cloud"])
 
-        with open(self.util.test_env_config_yaml) as file_fd:
+        with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
             test_env_config_yaml = yaml.safe_load(file_fd)
         file_fd.close()
 
 
     def command_create_and_execute(self, ssh, test_cmd_file_path,
                                    cmd_input_param, prompt_file_path):
-        with open(prompt_file_path, 'r') as prompt_file:
+        with open(prompt_file_path, 'r', encoding='utf-8') as prompt_file:
             prompt = yaml.safe_load(prompt_file)
         config_mode_prompt = prompt["config_mode"]
 
 
         self.util = Utilvnf()
         self.vm_controller = VmController(util_info)
 
-        with open(self.util.test_env_config_yaml) as file_fd:
+        with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
             test_env_config_yaml = yaml.safe_load(file_fd)
         file_fd.close()
 
     def config_vnf(self, source_vnf, destination_vnf, test_cmd_file_path,
                    parameter_file_path, prompt_file_path):
         # pylint: disable=too-many-arguments
-        with open(parameter_file_path, 'r') as parameter_file:
+        with open(
+                parameter_file_path, 'r', encoding='utf-8') as parameter_file:
             cmd_input_param = yaml.safe_load(parameter_file)
 
         cmd_input_param["macaddress"] = source_vnf["data_plane_network_mac"]
 
         res_dict_data_list = []
 
-        with open(parameter_file_path, 'r') as parameter_file:
+        with open(
+                parameter_file_path, 'r', encoding='utf-8') as parameter_file:
             cmd_input_param = yaml.safe_load(parameter_file)
 
         cmd_input_param["source_ip"] = target_vnf["data_plane_network_ip"]
         cmd_input_param["destination_ip"] = reference_vnf[
             "data_plane_network_ip"]
 
-        with open(prompt_file_path, 'r') as prompt_file:
+        with open(prompt_file_path, 'r', encoding='utf-8') as prompt_file:
             prompt = yaml.safe_load(prompt_file)
         terminal_mode_prompt = prompt["terminal_mode"]
 
 
     logging.disable(logging.CRITICAL)
 
     _keystone_ip = "127.0.0.1"
-    _neutron_url = u"https://127.0.0.1:9696"
-    _neutron_id = u"dummy"
+    _neutron_url = "https://127.0.0.1:9696"
+    _neutron_id = "dummy"
     _sdn_controller_ip = "127.0.0.3"
-    _os_auth_url = "http://{}:5000/v3".format(_keystone_ip)
+    _os_auth_url = f"http://{_keystone_ip}:5000/v3"
     _os_projectname = "admin"
     _os_username = "admin"
     _os_password = "admin"
         self.test = odl.ODLTests(case_name='odl', project_name='functest')
         self.defaultargs = {'odlusername': self._odl_username,
                             'odlpassword': self._odl_password,
-                            'neutronurl': "http://{}:9696".format(
-                                self._keystone_ip),
+                            'neutronurl': f"http://{self._keystone_ip}:9696",
                             'osauthurl': self._os_auth_url,
                             'osusername': self._os_username,
                             'osuserdomainname': self._os_userdomainname,
             mock_method.assert_called_once_with(
                 os.path.join(odl.ODLTests.odl_test_repo,
                              'csit/variables/Variables.robot'), inplace=True)
-            self.assertEqual(args[0].getvalue(), "{}\n".format(msg2))
+            self.assertEqual(args[0].getvalue(), f"{msg2}\n")
 
     def test_set_vars_auth_default(self):
         self._test_set_vars(
             args[0].assert_called_once_with(self.test.odl_variables_file)
         if len(args) > 1:
             variable = [
-                'KEYSTONEURL:{}://{}'.format(
-                    urllib.parse.urlparse(self._os_auth_url).scheme,
-                    urllib.parse.urlparse(self._os_auth_url).netloc),
-                'NEUTRONURL:{}'.format(self._neutron_url),
-                'OS_AUTH_URL:"{}"'.format(self._os_auth_url),
-                'OSUSERNAME:"{}"'.format(self._os_username),
-                'OSUSERDOMAINNAME:"{}"'.format(self._os_userdomainname),
-                'OSTENANTNAME:"{}"'.format(self._os_projectname),
-                'OSPROJECTDOMAINNAME:"{}"'.format(self._os_projectdomainname),
-                'OSPASSWORD:"{}"'.format(self._os_password),
-                'ODL_SYSTEM_IP:{}'.format(self._sdn_controller_ip),
-                'PORT:{}'.format(self._odl_webport),
-                'RESTCONFPORT:{}'.format(self._odl_restconfport)]
+                ('KEYSTONEURL:'
+                 f'{urllib.parse.urlparse(self._os_auth_url).scheme}://'
+                 f'{urllib.parse.urlparse(self._os_auth_url).netloc}'),
+                f'NEUTRONURL:{self._neutron_url}',
+                f'OS_AUTH_URL:"{self._os_auth_url}"',
+                f'OSUSERNAME:"{self._os_username}"',
+                f'OSUSERDOMAINNAME:"{self._os_userdomainname}"',
+                f'OSTENANTNAME:"{self._os_projectname}"',
+                f'OSPROJECTDOMAINNAME:"{self._os_projectdomainname}"',
+                f'OSPASSWORD:"{self._os_password}"',
+                f'ODL_SYSTEM_IP:{self._sdn_controller_ip}',
+                f'PORT:{self._odl_webport}',
+                f'RESTCONFPORT:{self._odl_restconfport}']
             args[1].assert_called_once_with(
                 odl.ODLTests.basic_suite_dir, odl.ODLTests.neutron_suite_dir,
                 include=[],
         self.defaultargs['odlip'] = self._sdn_controller_ip
         self.assertEqual(
             self.parser.parse_args(
-                ["--neutronurl={}".format(self._neutron_url),
-                 "--odlip={}".format(self._sdn_controller_ip)]),
+                [f"--neutronurl={self._neutron_url}",
+                 f"--odlip={self._sdn_controller_ip}"]),
             self.defaultargs)
 
     @mock.patch('sys.stderr', new_callable=six.StringIO)
     def _test_arg(self, arg, value):
         self.defaultargs[arg] = value
         self.assertEqual(
-            self.parser.parse_args(["--{}={}".format(arg, value)]),
+            self.parser.parse_args([f"--{arg}={value}"]),
             self.defaultargs)
 
     def test_odlusername(self):
 
     def test_pushtodb(self):
         self.defaultargs['pushtodb'] = True
-        self.assertEqual(self.parser.parse_args(["--{}".format('pushtodb')]),
+        self.assertEqual(self.parser.parse_args(["--pushtodb"]),
                          self.defaultargs)
 
     def test_multiple_args(self):
         self.defaultargs['odlip'] = self._sdn_controller_ip
         self.assertEqual(
             self.parser.parse_args(
-                ["--neutronurl={}".format(self._neutron_url),
-                 "--odlip={}".format(self._sdn_controller_ip)]),
+                [f"--neutronurl={self._neutron_url}",
+                 f"--odlip={self._sdn_controller_ip}"]),
             self.defaultargs)
 
 
 
             self.cinder.prepare()
         args[0].assert_called_with()
         args[1].assert_called_once_with(
-            '{}-vm2_{}'.format(self.cinder.case_name, self.cinder.guid),
+            f'{self.cinder.case_name}-vm2_{self.cinder.guid}',
             security_groups=[self.cinder.sec.id],
             key_name=self.cinder.keypair.id)
         self.cinder.cloud.create_volume.assert_not_called()
         self.cinder.prepare()
         args[0].assert_called_once_with()
         args[1].assert_called_once_with(
-            '{}-vm2_{}'.format(self.cinder.case_name, self.cinder.guid),
+            f'{self.cinder.case_name}-vm2_{self.cinder.guid}',
             security_groups=[self.cinder.sec.id],
             key_name=self.cinder.keypair.id)
         self.cinder.connect.assert_called_once_with(args[1].return_value)
         self.cinder.cloud.create_volume.assert_called_once_with(
-            name='{}-volume_{}'.format(
-                self.cinder.case_name, self.cinder.guid),
+            name=f'{self.cinder.case_name}-volume_{self.cinder.guid}',
             size='2', timeout=self.cinder.volume_timeout, wait=True)
 
     @mock.patch('scp.SCPClient.put')
         self.cinder.ssh.exec_command.return_value = (None, stdout, mock.Mock())
         self.assertEqual(self.cinder._write_data(), 0)
         self.cinder.ssh.exec_command.assert_called_once_with(
-            "sh ~/write_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
+            f"sh ~/write_data.sh {env.get('VOLUME_DEVICE_NAME')}")
         self.cinder.cloud.attach_volume.assert_called_once_with(
             self.cinder.sshvm, self.cinder.volume,
             timeout=self.cinder.volume_timeout)
         stdout.channel.recv_exit_status.return_value = 0
         self.assertEqual(self.cinder._read_data(), 0)
         self.cinder.ssh2.exec_command.assert_called_once_with(
-            "sh ~/read_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
+            f"sh ~/read_data.sh {env.get('VOLUME_DEVICE_NAME')}")
         self.cinder.cloud.attach_volume.assert_called_once_with(
             self.cinder.vm2, self.cinder.volume,
             timeout=self.cinder.volume_timeout)
 
 
     @staticmethod
     def check_scenario_file(value):
-        yaml_file = 'opnfv-{}.yaml'.format('test_file_name')
+        yaml_file = 'opnfv-test_file_name.yaml'
         if yaml_file in value:
             return False
         return True
 
     @staticmethod
     def check_temp_dir(value):
-        yaml_file = 'opnfv-{}.yaml'.format('test_file_name')
+        yaml_file = 'opnfv-test_file_name.yaml'
         if yaml_file in value:
             return True
         return False
             self, mock_method, mock_os_makedirs, mock_path_exists):
         mock_path_exists.side_effect = self.check_temp_dir
 
-        yaml_file = 'opnfv-{}.yaml'.format('test_file_name')
+        yaml_file = 'opnfv-test_file_name.yaml'
         ret_val = os.path.join(self.rally_base.temp_dir, yaml_file)
         self.assertEqual(self.rally_base._prepare_test_list('test_file_name'),
                          ret_val)
     @mock.patch('subprocess.check_output',
                 side_effect=subprocess.CalledProcessError('', ''))
     def test_export_task_ko(self, *args):
-        file_name = "{}/{}.html".format(
-            self.rally_base.results_dir, self.rally_base.case_name)
+        file_name = (f"{self.rally_base.results_dir}/"
+                     f"{self.rally_base.case_name}.html")
         with self.assertRaises(subprocess.CalledProcessError):
             self.rally_base.export_task(file_name)
         cmd = ["rally", "task", "export", "--type", "html", "--deployment",
 
     @mock.patch('subprocess.check_output', return_value=b'')
     def test_export_task(self, *args):
-        file_name = "{}/{}.html".format(
-            self.rally_base.results_dir, self.rally_base.case_name)
+        file_name = (f"{self.rally_base.results_dir}/"
+                     f"{self.rally_base.case_name}.html")
         self.assertEqual(self.rally_base.export_task(file_name), None)
         cmd = ["rally", "task", "export", "--type", "html", "--deployment",
                str(getattr(config.CONF, 'rally_deployment_name')),
     @mock.patch('subprocess.check_output',
                 side_effect=subprocess.CalledProcessError('', ''))
     def test_verify_report_ko(self, *args):
-        file_name = "{}/{}.html".format(
-            self.rally_base.results_dir, self.rally_base.case_name)
+        file_name = (f"{self.rally_base.results_dir}/"
+                     f"{self.rally_base.case_name}.html")
         with self.assertRaises(subprocess.CalledProcessError):
             self.rally_base.verify_report(file_name, "1")
         cmd = ["rally", "verify", "report", "--type", "html", "--uuid", "1",
 
     @mock.patch('subprocess.check_output', return_value=b'')
     def test_verify_report(self, *args):
-        file_name = "{}/{}.html".format(
-            self.rally_base.results_dir, self.rally_base.case_name)
+        file_name = (f"{self.rally_base.results_dir}/"
+                     f"{self.rally_base.case_name}.html")
         self.assertEqual(self.rally_base.verify_report(file_name, "1"), None)
         cmd = ["rally", "verify", "report", "--type", "html", "--uuid", "1",
                "--to", file_name]
 
             testr_mode = self.tempestcommon.mode
         verifier_repo_dir = 'test_verifier_repo_dir'
         self.tempestcommon.verifier_repo_dir = verifier_repo_dir
-        cmd = "(cd {0}; stestr list '{1}' >{2} 2>/dev/null)".format(
-            verifier_repo_dir, testr_mode, self.tempestcommon.list)
+        cmd = (f"(cd {verifier_repo_dir}; stestr list '{testr_mode}' > "
+               f"{self.tempestcommon.list} 2>/dev/null)")
         self.tempestcommon.generate_test_list(mode=testr_mode)
         args[0].assert_called_once_with(cmd, shell=True)
         args[2].assert_called_once_with('/etc/tempest.conf')
 
     def test_generate_keys1(self, *args):
         self.testcase.generate_keys()
         self.testcase.cloud.create_keypair.assert_called_once_with(
-            'vmtp_{}'.format(self.testcase.guid))
+            f'vmtp_{self.testcase.guid}')
         self.testcase.cloud.delete_keypair.assert_called_once_with('id')
-        calls = [mock.call(self.testcase.privkey_filename, 'w'),
-                 mock.call(self.testcase.pubkey_filename, 'w')]
+        calls = [mock.call(
+                    self.testcase.privkey_filename, 'w', encoding='utf-8'),
+                 mock.call(
+                    self.testcase.pubkey_filename, 'w', encoding='utf-8')]
         args[0].assert_has_calls(calls, any_order=True)
 
     @mock.patch('six.moves.builtins.open')
                 side_effect=shade.OpenStackCloudException(None)) as mock_obj, \
                 self.assertRaises(shade.OpenStackCloudException):
             self.testcase.generate_keys()
-        mock_obj.assert_called_once_with('vmtp_{}'.format(self.testcase.guid))
+        mock_obj.assert_called_once_with(f'vmtp_{self.testcase.guid}')
         args[0].assert_not_called()
 
 
 
             self.vping.prepare()
         args[0].assert_called_once_with()
         args[1].assert_called_once_with(
-            '{}-vm2_{}'.format(self.vping.case_name, self.vping.guid),
+            f'{self.vping.case_name}-vm2_{self.vping.guid}',
             security_groups=[self.vping.sec.id])
 
     @mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
         self.vping.prepare()
         args[0].assert_called_once_with()
         args[1].assert_called_once_with(
-            '{}-vm2_{}'.format(self.vping.case_name, self.vping.guid),
+            f'{self.vping.case_name}-vm2_{self.vping.guid}',
             security_groups=[self.vping.sec.id])
 
     @mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
         with self.assertRaises(ssh_exception.SSHException):
             self.vping.execute()
         self.vping.ssh.exec_command.assert_called_once_with(
-            'ping -c 1 {}'.format(self.vping.vm2.private_v4))
+            f'ping -c 1 {self.vping.vm2.private_v4}')
         args[0].assert_called_once_with('foo')
 
     @mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
             self.assertEqual(self.vping.execute(), ret)
             mock_check.assert_called_once_with('foo')
         self.vping.ssh.exec_command.assert_called_once_with(
-            'ping -c 1 {}'.format(self.vping.vm2.private_v4))
+            f'ping -c 1 {self.vping.vm2.private_v4}')
 
     def test_execute1(self):
         self._test_execute()
 
                 self.cmd, info=True, error_msg=self.error_msg, verbose=True,
                 output_file=self.output_file)
             self.assertEqual(resp, 1)
-            msg_exec = ("Executing command: '%s'" % self.cmd)
+            msg_exec = f"Executing command: '{self.cmd}'"
             mock_logger_info.assert_called_once_with(msg_exec)
-            mopen.assert_called_once_with(self.output_file, "w")
+            mopen.assert_called_once_with(
+                self.output_file, "w", encoding='utf-8')
             mock_logger_error.assert_called_once_with(self.error_msg)
 
     @mock.patch('functest.utils.functest_utils.LOGGER.info')
                 self.cmd, info=True, error_msg=self.error_msg, verbose=True,
                 output_file=self.output_file)
             self.assertEqual(resp, 0)
-            msg_exec = ("Executing command: '%s'" % self.cmd)
+            msg_exec = (f"Executing command: '{self.cmd}'")
             mock_logger_info.assert_called_once_with(msg_exec)
-            mopen.assert_called_once_with(self.output_file, "w")
+            mopen.assert_called_once_with(
+                self.output_file, "w", encoding='utf-8')
 
     @mock.patch('sys.stdout')
     def test_exec_cmd_args_missing_ok(self, stdout=None):
             mock_yaml.return_value = self.file_yaml
             functest_utils.get_parameter_from_yaml(self.parameter,
                                                    self.test_file)
-            self.assertTrue(("The parameter %s is not"
-                             " defined in config_functest.yaml" %
-                             self.parameter) in excep.exception)
+            self.assertTrue((f"The parameter {self.parameter} is not"
+                             " defined in config_functest.yaml"
+                             ) in excep.exception)
 
     def test_get_param_from_yaml_def(self):
         with mock.patch('six.moves.builtins.open', mock.mock_open()), \
 
     def __init__(self):
         try:
             with open(pkg_resources.resource_filename(
-                    'functest', 'ci/config_functest.yaml')) as yfile:
+                    'functest', 'ci/config_functest.yaml'),
+                    encoding='utf-8') as yfile:
                 self.functest_yaml = yaml.safe_load(yfile)
         except Exception as error:
             raise Exception(
-                'Parse config failed: {}'.format(str(error))) from error
+                f'Parse config failed: {str(error)}') from error
 
     @staticmethod
     def _merge_dicts(dict1, dict2):
                 yield (k, dict2[k])
 
     def patch_file(self, patch_file_path):
-        with open(patch_file_path) as yfile:
+        with open(patch_file_path, encoding='utf-8') as yfile:
             patch_file = yaml.safe_load(yfile)
 
         for key in patch_file:
     @staticmethod
     def _get_attr_further(attr_now, next):  # pylint: disable=redefined-builtin
         return attr_now if next == 'general' else (
-            '{}_{}'.format(attr_now, next) if attr_now else next)
+            f'{attr_now}_{next}' if attr_now else next)
 
     def fill(self):
         try:
             self._parse(None, self.functest_yaml)
         except Exception as error:
             raise Exception(
-                'Parse config failed: {}'.format(str(error))) from error
+                f'Parse config failed: {str(error)}') from error
 
 
 CONF = Config()
 
 def execute_command(cmd, info=False, error_msg="",
                     verbose=True, output_file=None):
     if not error_msg:
-        error_msg = ("The command '%s' failed." % cmd)
-    msg_exec = ("Executing command: '%s'" % cmd)
+        error_msg = f"The command '{cmd}' failed."
+    msg_exec = f"Executing command: '{cmd}'"
     if verbose:
         if info:
             LOGGER.info(msg_exec)
             cmd, shell=True, stdout=subprocess.PIPE,
             stderr=subprocess.STDOUT) as popen:
         if output_file:
-            with open(output_file, "w") as ofd:
+            with open(output_file, "w", encoding='utf-8') as ofd:
                 for line in iter(popen.stdout.readline, b''):
                     if output_file:
                         ofd.write(line.decode("utf-8"))
     parameter must be given in string format with dots
     Example: general.openstack.image_name
     """
-    with open(yfile) as yfd:
+    with open(yfile, encoding='utf-8') as yfd:
         file_yaml = yaml.safe_load(yfd)
     value = file_yaml
     for element in parameter.split("."):
         value = value.get(element)
         if value is None:
-            raise ValueError("The parameter %s is not defined in"
-                             " %s" % (parameter, yfile))
+            raise ValueError(f"The parameter {parameter} is not defined in"
+                             f" {yfile}")
     return value
 
 
 def convert_dict_to_ini(value):
     "Convert dict to oslo.conf input"
     assert isinstance(value, dict)
-    return ",".join("{}:{}".format(
-        key, val) for (key, val) in six.iteritems(value))
+    return ",".join(f"{key}:{val}" for (key, val) in six.iteritems(value))
 
 
 def convert_list_to_ini(value):
     "Convert list to oslo.conf input"
     assert isinstance(value, list)
-    return ",".join("{}".format(val) for val in value)
+    return ",".join(val for val in value)
 
 
 def convert_ini_to_dict(value):
 
 sphinx!=1.6.6,!=1.6.7,!=2.1.0,!=3.0.0,!=3.4.2 # BSD
 sphinx-rtd-theme
 yamllint
-ansible-lint
 doc8 # Apache-2.0
 bashate # Apache-2.0
 bandit
 
 [tox]
-envlist = docs,pep8,pylint,yamllint,ansiblelint,bashate,bandit,py39,cover,perm
+envlist = docs,pep8,pylint,yamllint,bashate,bandit,py39,cover,perm
 
 [testenv]
 pip_version = pip==20.2.4
 commands =
   yamllint -s {[testenv:yamllint]files}
 
-[testenv:ansiblelint]
-basepython = python3.9
-commands =
-  ansible-lint -x303 ansible/site.yml
-
 [testenv:py37]
 commands = nosetests functest/tests/unit
 
 
 git+https://gerrit.opnfv.org/gerrit/functest#egg=functest
 git+https://github.com/collivier/cloudify-rest-client.git@4.3.3-py3#egg=cloudify-rest-client
-robotframework===3.1.1
+robotframework===4.1.2
 robotframework-httplibrary===0.4.2
-robotframework-requests===0.5.0
-robotframework-sshlibrary===3.3.0
-ansible===2.9.2
+robotframework-requests===0.9.2
+robotframework-sshlibrary===4.1.2
 xtesting===0.93.0
 git+https://github.com/PyCQA/bandit@3d0824676974e7e2e9635c10bc4f12e261f1dbdf#egg=bandit
-bandit===1.1.0
+bandit===1.7.0
 ruamel.yaml.jinja2==0.2.2
 -e git+https://opendev.org/openstack/tempest#egg=tempest
 -e git+https://opendev.org/openstack/rally.git#egg=rally
 git+https://opendev.org/openstack/rally-openstack.git#egg=rally-openstack
 git+https://github.com/xrally/xrally-kubernetes.git#egg=xrally-kubernetes
-pylint===2.9.6
-flake8===3.7.9
+pylint===2.11.1
+flake8===4.0.1
 nose===1.3.7
-ruamel.yaml===0.15.100
+ruamel.yaml===0.17.17
 sphinxcontrib-spelling===4.3.0
-ansible-lint===4.2.0
-setuptools_scm===6.0.1
+ansible-lint===5.2.1
+setuptools_scm===6.3.2