##############################################################################
# Copyright (c) 2017 ZTE Corporation and others.
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
def load_json_file(full_path):
    """Load and return the JSON document stored at ``full_path``.

    :param full_path: path of the JSON file to read
    :return: the decoded JSON value
    :raises Exception: if ``full_path`` is not an existing regular file
    """
    if not os.path.isfile(full_path):
        raise Exception('File(%s) does not exist' % full_path)

    with open(full_path, 'r') as json_file:
        return json.load(json_file)
def write_json_file(full_path, data):
    """Serialize ``data`` as JSON and write it to ``full_path``.

    The file is created if it does not exist and overwritten otherwise.

    :param full_path: path of the file to write
    :param data: JSON-serializable value to store
    :raises TypeError: if ``data`` cannot be serialized by ``json.dumps``
    """
    # Serialize before opening the file: opening for writing truncates it,
    # so a serialization failure must happen first to leave any existing
    # content untouched.
    payload = json.dumps(data)

    with open(full_path, 'w') as json_file:
        json_file.write(payload)
def match_rep_in_file(regex, full_path):
    """Search the file at ``full_path`` line by line for ``regex``.

    NOTE(review): the loop header and the statements following the
    per-line ``re.search`` were missing from the reviewed text. Returning
    the first match object (``None`` when no line matches) is an
    assumption -- confirm against callers.

    :param regex: pattern handed to ``re.search`` for every line
    :param full_path: path of the text file to scan
    :return: the match object of the first matching line, or ``None``
    :raises Exception: if ``full_path`` is not an existing regular file
    """
    if not os.path.isfile(full_path):
        raise Exception('File(%s) does not exist' % full_path)

    with open(full_path, 'r') as text_file:
        for line in text_file:
            result = re.search(regex, line)
            if result:
                return result

    return None
def get_doctor_test_root_dir():
    """Return the parent of the directory that contains this module.

    Symbolic links in the module's path are resolved before the directory
    components are taken.

    :return: absolute path of the parent directory
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.dirname(module_dir)
class SSHClient(object):
    """Small wrapper around ``paramiko.SSHClient`` for running commands and
    copying files on a remote host.

    NOTE(review): parts of this class were missing from the reviewed text
    (the end of the ``__init__`` signature, the tail of ``ssh`` and the
    body of ``scp``). The ``allow_agent=False`` default, the
    ``(ret, output)`` return value of ``ssh`` and the put/get handling in
    ``scp`` (including the ``ValueError`` for any other method) are
    reconstructed assumptions -- confirm against callers.
    """

    def __init__(self, ip, username, password=None, pkey=None,
                 key_filename=None, log=None, look_for_keys=False,
                 allow_agent=False):
        """Open an SSH connection to ``ip``.

        :param ip: host to connect to
        :param username: login name
        :param password: optional password
        :param pkey: optional private key object handed to paramiko
        :param key_filename: optional private key file name(s)
        :param log: optional logger; when falsy nothing is logged
        :param look_for_keys: forwarded to ``paramiko.SSHClient.connect``
        :param allow_agent: forwarded to ``paramiko.SSHClient.connect``
        """
        self.log = log
        self.client = paramiko.SSHClient()
        # Unknown host keys are accepted with a warning, not rejected.
        self.client.set_missing_host_key_policy(paramiko.WarningPolicy())
        self.client.connect(ip, username=username, password=password,
                            pkey=pkey, key_filename=key_filename,
                            look_for_keys=look_for_keys,
                            allow_agent=allow_agent)

    def close(self):
        """Close the underlying SSH connection, if one was created."""
        client = getattr(self, 'client', None)
        if client is not None:
            client.close()

    def __del__(self):
        # Release the connection when the wrapper is garbage collected.
        self.close()

    def _info(self, message):
        """Send ``message`` to the logger when one was supplied."""
        if self.log:
            self.log.info(message)

    def ssh(self, command):
        """Run ``command`` on the remote host and collect its stdout.

        :param command: command line to execute remotely
        :return: tuple ``(ret, output)`` with the exit status and the list
                 of UTF-8 decoded stdout lines
        :raises Exception: if the command exits with a non-zero status
        """
        self._info("Executing: %s" % command)
        _stdin, stdout, _stderr = self.client.exec_command(command)
        # Drain stdout before asking for the exit status: waiting for the
        # status first can hang when the remote output is larger than the
        # channel window.
        raw_output = stdout.read()
        ret = stdout.channel.recv_exit_status()
        output = [line.decode('utf-8') for line in raw_output.splitlines()]
        if ret:
            self._info("*** FAILED to run command %s (%s)"
                       % (command, ret))
            raise Exception(
                "Unable to run \ncommand: %s\nret: %s"
                % (command, ret))
        self._info("*** SUCCESSFULLY run command %s" % command)
        return ret, output

    def scp(self, source, dest, method='put'):
        """Copy a file over SFTP.

        :param source: path to copy from (local for ``'put'``, remote for
                       ``'get'``)
        :param dest: path to copy to (remote for ``'put'``, local for
                     ``'get'``)
        :param method: ``'put'`` to upload or ``'get'`` to download
        :raises ValueError: if ``method`` is neither ``'put'`` nor ``'get'``
        """
        if method not in ('put', 'get'):
            raise ValueError("Unsupported scp method: %s" % method)
        self._info("Copy %s -> %s" % (source, dest))
        ftp = self.client.open_sftp()
        try:
            if method == 'put':
                ftp.put(source, dest)
            else:
                ftp.get(source, dest)
        finally:
            # Always release the SFTP session, even if the transfer fails.
            ftp.close()
101 from threading import Thread
102 from functools import wraps
105 def async_func(*args, **kwargs):
106 thread = Thread(target=func, args=args, kwargs=kwargs)