debug = False
report = None
erase = False
+ terminate = False
options = {}
storperf = StorPerfMaster()
argv = sys.argv
try:
try:
- opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdh",
+ opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdTh",
["target=",
"workload=",
"report=",
"nowarm",
"verbose",
"debug",
+ "terminate",
"help",
])
except getopt.error, msg:
report = a
elif o in ("-e", "--erase"):
erase = True
+ elif o in ("-T", "--terminate"):
+ terminate = True
elif o in ("-f", "--configure"):
configuration = dict(x.split('=') for x in a.split(','))
raise Usage(content['message'])
return 0
+ if (terminate):
+ response = requests.delete(
+ 'http://127.0.0.1:5000/api/v1.0/job')
+ if (response.status_code == 400):
+ content = json.loads(response.content)
+ raise Usage(content['message'])
+ return 0
+
if (configuration is not None):
response = requests.post(
'http://127.0.0.1:5000/api/v1.0/configure', json=configuration)
else:
print "Calling start..."
response = requests.post(
- 'http://127.0.0.1:5000/api/v1.0/start', json=options)
+ 'http://127.0.0.1:5000/api/v1.0/job', json=options)
if (response.status_code == 400):
content = json.loads(response.content)
raise Usage(content['message'])
abort(400, str(e))
-class StartJob(Resource):
+class Job(Resource):
def __init__(self):
self.logger = logging.getLogger(__name__)
except Exception as e:
abort(400, str(e))
+ def delete(self):
+ try:
+ storperf.terminate_workloads()
+ return True
+ except Exception as e:
+ abort(400, str(e))
+
class Quota(Resource):
api.add_resource(Configure, "/api/v1.0/configure")
api.add_resource(Quota, "/api/v1.0/quota")
-api.add_resource(StartJob, "/api/v1.0/start")
+api.add_resource(Job, "/api/v1.0/job")
if __name__ == "__main__":
setup_logging()
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from threading import Thread
import json
import logging
import subprocess
+from threading import Thread
class FIOInvoker(object):
stderr=subprocess.PIPE)
t = Thread(target=self.stdout_handler, args=())
- t.daemon = False
+ t.daemon = True
t.start()
t = Thread(target=self.stderr_handler, args=())
- t.daemon = False
+ t.daemon = True
t.start()
+ self.logger.debug("Started fio on " + self.remote_host)
t.join()
+ self.logger.debug("Finished fio on " + self.remote_host)
+
+ def terminate(self):
+ self.logger.debug("Terminating fio on " + self.remote_host)
+ cmd = ['ssh', '-o', 'StrictHostKeyChecking=no',
+ '-i', 'storperf/resources/ssh/storperf_rsa',
+ 'storperf@' + self.remote_host,
+ 'sudo', 'killall', '-9', 'fio']
+
+ kill_process = subprocess.Popen(cmd,
+ universal_newlines=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ for line in iter(kill_process.stdout.readline, b''):
+ self.logger.debug("FIO Termination: " + line)
+
+ kill_process.stdout.close()
+
+ for line in iter(kill_process.stderr.readline, b''):
+ self.logger.debug("FIO Termination: " + line)
+
+ kill_process.stderr.close()
self.stack_id = None
def execute_workloads(self):
-
if (self.stack_id is None):
raise ParameterError("ERROR: Stack does not exist")
self._test_executor.slaves = slaves
return self._test_executor.execute()
+ def terminate_workloads(self):
+ return self._test_executor.terminate()
+
def _setup_slave(self, slave):
logger = logging.getLogger(__name__ + ":" + slave)
self.prefix = None
self.job_db = JobDB()
self._slaves = []
+ self._terminated = False
+ self._workload_executors = []
self._workload_thread = None
@property
self._workload_thread.start()
return self.job_db.job_id
+ def terminate(self):
+ self._terminated = True
+ for workload in self._workload_executors:
+ workload.terminate()
+
def execute_workloads(self):
+ self._terminated = False
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
constructorMethod = getattr(workload_module, workload_name)
for blocksize in blocksizes:
for iodepth in iodepths:
+ if self._terminated:
+ return
+
workload.options['iodepth'] = str(iodepth)
workload.options['bs'] = str(blocksize)
for slave in self.slaves:
slave_workload = copy.copy(workload)
slave_workload.remote_host = slave
+
+ self._workload_executors.append(slave_workload)
+
t = Thread(target=self.execute_on_node,
args=(slave_workload,))
t.daemon = False
for slave_thread in slave_threads:
slave_thread.join()
+ self._workload_executors = []
+
def execute_on_node(self, workload):
invoker = FIOInvoker()
self.invoker.execute(args)
+ def terminate(self):
+ if self.invoker is not None:
+ self.invoker.terminate()
+
def setup(self):
pass