lfiles = glob.glob(os.path.join(
self.src_dir, 'results', 'cnf-testsuite-results-*.yml'))
results = max(lfiles, key=os.path.getmtime)
- with open(os.path.join(self.src_dir, 'results', results)) as yfile:
+ with open(os.path.join(
+ self.src_dir, 'results', results), encoding='utf-8') as yfile:
self.details = yaml.safe_load(yfile)
msg = prettytable.PrettyTable(
header_style='upper', padding_width=5,
for deployment in self.deployment_list:
with open(pkg_resources.resource_filename(
'functest_kubernetes',
- 'ims/{}-depl.yaml'.format(deployment))) as yfile:
+ 'ims/{}-depl.yaml'.format(deployment)),
+ encoding='utf-8') as yfile:
template = Template(yfile.read())
body = yaml.safe_load(template.render(
dockerhub_repo=os.getenv(
for service in self.deployment_list:
with open(pkg_resources.resource_filename(
'functest_kubernetes',
- 'ims/{}-svc.yaml'.format(service))) as yfile:
+ 'ims/{}-svc.yaml'.format(service)),
+ encoding='utf-8') as yfile:
body = yaml.safe_load(yfile)
resp = self.corev1.create_namespaced_service(
body=body, namespace=self.namespace)
cmd_line, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, env=env) as process:
boutput = process.stdout.read()
- with open(os.path.join(self.res_dir, 'e2e.log'), 'wb') as foutput:
+ with open(os.path.join(
+ self.res_dir, 'e2e.log'), 'w', encoding='utf-8') as foutput:
foutput.write(boutput)
grp = re.search(
r'^(FAIL|SUCCESS)!.* ([0-9]+) Passed \| ([0-9]+) Failed \|'
"GcrReleaseRegistry": "{}/gke-release".format(gcr_repo),
"MicrosoftRegistry": "mcr.microsoft.com",
}
- with open("{}/repositories.yml".format(self.res_dir), 'w') as file:
+ with open("{}/repositories.yml".format(
+ self.res_dir), 'w', encoding='utf-8') as file:
yaml.dump(repo_list, file)
result['existing@kubernetes']['message'])
return
with open(pkg_resources.resource_filename(
- 'functest_kubernetes', 'rally/all-in-one.yaml')) as file:
+ 'functest_kubernetes', 'rally/all-in-one.yaml'),
+ encoding='utf-8') as file:
template = Template(file.read())
task = yaml.safe_load(template.render(
concurrency=kwargs.get("concurrency", self.concurrency),
self.res_dir, "{}.html".format(self.case_name)))
if "files" in result:
for path in result["files"]:
- with open(path, "w+") as output:
+ with open(path, "w+", encoding='utf-8') as output:
output.write(result["files"][path])
result = rapi.task.export(
[task_instance["uuid"]], "junit-xml",
self.res_dir, "{}.xml".format(self.case_name)))
if "files" in result:
for path in result["files"]:
- with open(path, "w+") as output:
+ with open(path, "w+", encoding='utf-8') as output:
output.write(result["files"][path])
self.stop_time = time.time()
self.__logger.debug("create_namespace: %s", api_response)
with open(pkg_resources.resource_filename(
"functest_kubernetes",
- "security/{}.yaml".format(self.job_name))) as yfile:
+ "security/{}.yaml".format(self.job_name)),
+ encoding='utf-8') as yfile:
template = Template(yfile.read())
body = yaml.safe_load(template.render(
dockerhub_repo=os.getenv("DOCKERHUB_REPO",