1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corporation and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
def load_json_file(full_path):
    """Load and parse a JSON document from a file.

    :param full_path: path of the JSON file to read
    :return: the Python object produced by ``json.load``
    :raises Exception: if ``full_path`` is not an existing regular file
    """
    if not os.path.isfile(full_path):
        raise Exception('File(%s) does not exist' % full_path)

    with open(full_path, 'r') as json_file:
        return json.load(json_file)
def write_json_file(full_path, data):
    """Serialize ``data`` as JSON and write it to a file.

    The file is created when missing and truncated otherwise.

    :param full_path: path of the file to write
    :param data: JSON-serializable object to store
    :return: None
    :raises TypeError: if ``data`` cannot be serialized by ``json.dumps``
    """
    # Serialize before opening the file so that unserializable data raises
    # without truncating whatever the file already contains.
    payload = json.dumps(data)

    with open(full_path, 'w+') as json_file:
        json_file.write(payload)
# Search the file at `full_path` for text matching the pattern `regex`.
# Raises Exception when `full_path` is not an existing regular file.
# NOTE(review): the statement that binds `line` and the function's return
# path are not visible in this excerpt -- confirm against the full file.
38 def match_rep_in_file(regex, full_path):
39 if not os.path.isfile(full_path):
40 raise Exception('File(%s) does not exist' % full_path)
42 with open(full_path, 'r') as file:
# re.search is used, so the pattern may match anywhere inside `line`,
# not only at its start.
44 result = re.search(regex, line)
def get_doctor_test_root_dir():
    """Return the parent of the directory that contains this module.

    Symbolic links in the module path are resolved before the two
    directory levels are stripped.

    :return: absolute path of the parent directory
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.dirname(module_dir)
# Wrapper around paramiko.SSHClient that connects when it is constructed.
# NOTE(review): several original lines of this class are not visible in this
# excerpt (end of the __init__ signature, guards around the log calls, the
# raise/return statements, the SFTP transfer itself). The comments below
# describe only the statements that are shown.
56 class SSHClient(object):
# Create the paramiko client and connect to `ip` with the given credentials.
# `allow_agent` is used below but its declaration is not visible here --
# presumably the last parameter of this signature; confirm.
57 def __init__(self, ip, username, password=None, pkey=None,
58 key_filename=None, log=None, look_for_keys=False,
60 self.client = paramiko.SSHClient()
# NOTE(review): per the paramiko documentation WarningPolicy warns about an
# unknown host key but still accepts it -- confirm this is acceptable for
# the hosts this client talks to.
61 self.client.set_missing_host_key_policy(paramiko.WarningPolicy())
62 self.client.connect(ip, username=username, password=password,
63 pkey=pkey, key_filename=key_filename,
64 look_for_keys=look_for_keys,
65 allow_agent=allow_agent)
# Run `command` on the connected host and collect its standard output.
# `raise_enabled` selects whether a non-zero exit status takes the failure
# path below.
71 def ssh(self, command, raise_enabled=True):
73 self.log.info("Executing: %s" % command)
74 stdin, stdout, stderr = self.client.exec_command(command)
# Waits for the remote command to finish and yields its exit status.
# NOTE(review): the paramiko docs warn that calling recv_exit_status before
# reading a large amount of output can hang -- confirm expected output sizes.
75 ret = stdout.channel.recv_exit_status()
# stdout is read as bytes; each line is decoded as UTF-8 and appended to
# `output` (its initialisation is not visible in this excerpt).
77 for line in stdout.read().splitlines():
78 output.append(line.decode('utf-8'))
# Failure path: taken only for a non-zero exit status with raise_enabled set.
79 if ret and raise_enabled:
81 self.log.info("*** FAILED to run command %s (%s)"
# Presumably the message of an exception raised here -- the raise statement
# itself is not visible; confirm.
84 "Unable to run \ncommand: %s\nret: %s"
87 self.log.info("*** SUCCESSFULLY run command %s" % command)
# Copy a file between `source` and `dest` over an SFTP session.
# `method` defaults to 'put'; how it is interpreted is not visible here.
90 def scp(self, source, dest, method='put'):
92 self.log.info("Copy %s -> %s" % (source, dest))
93 ftp = self.client.open_sftp()
# Runs commands on the local machine through subprocess; its method names
# (`ssh`, `scp`) mirror those of SSHClient in this file.
# NOTE(review): the `def` line of the command-running method, the body of
# the except handler, the return statement and the tail of the scp call are
# not visible in this excerpt.
101 class LocalSSH(object):
# `log` must provide an `info` method; it is used immediately below.
103 def __init__(self, log):
105 self.log.info('Init local ssh client')
# Default output, replaced below when the command succeeds.
109 output = "%s failed!!!" % cmd
# NOTE(review): `cmd` is passed to the shell as a single string (shell=True),
# so it must come from a trusted source -- confirm with callers.
# universal_newlines=True makes check_output return text instead of bytes.
111 output = subprocess.check_output((cmd), shell=True,
112 universal_newlines=True)
# check_output raises CalledProcessError when the command exits non-zero;
# what the handler does is not visible here.
113 except subprocess.CalledProcessError:
# Copy `src_file` to `dst_file` by running the `cp` command.
# NOTE(review): the paths are interpolated into the command string unquoted;
# paths with spaces or shell metacharacters would misbehave if this runs
# through a shell -- the remaining call arguments are not visible; confirm.
117 def scp(self, src_file, dst_file):
118 return subprocess.check_output("cp %s %s" % (src_file, dst_file),
# NOTE(review): the enclosing definition is not visible in this excerpt --
# presumably a decorator that receives `func`; confirm against the full file.
123 from threading import Thread
124 from functools import wraps
# Wrapper that builds a Thread targeting `func` with the caller's positional
# and keyword arguments.
127 def async_func(*args, **kwargs):
128 thread = Thread(target=func, args=args, kwargs=kwargs)
# NOTE(review): starting the thread and the wrapper's return value are not
# shown here.