Cancel Job API 83/12683/2
authorMark Beierl <mark.beierl@emc.com>
Tue, 26 Apr 2016 17:18:23 +0000 (13:18 -0400)
committerMark Beierl <mark.beierl@emc.com>
Tue, 26 Apr 2016 17:42:25 +0000 (13:42 -0400)
Add the ability to terminate a running job via the API

JIRA: STORPERF-20

Change-Id: I73a701cff9712207f5e14cfcc6b8fb7e0ab59aed
Signed-off-by: Mark Beierl <mark.beierl@emc.com>
cli.py
rest_server.py
storperf/fio/fio_invoker.py
storperf/storperf_master.py
storperf/test_executor.py
storperf/workloads/_base_workload.py

diff --git a/cli.py b/cli.py
index 5595314..1f20e31 100644 (file)
--- a/cli.py
+++ b/cli.py
@@ -64,6 +64,7 @@ def main(argv=None):
     debug = False
     report = None
     erase = False
+    terminate = False
     options = {}
 
     storperf = StorPerfMaster()
@@ -72,7 +73,7 @@ def main(argv=None):
         argv = sys.argv
     try:
         try:
-            opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdh",
+            opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdTh",
                                        ["target=",
                                         "workload=",
                                         "report=",
@@ -82,6 +83,7 @@ def main(argv=None):
                                         "nowarm",
                                         "verbose",
                                         "debug",
+                                        "terminate",
                                         "help",
                                         ])
         except getopt.error, msg:
@@ -110,6 +112,8 @@ def main(argv=None):
                 report = a
             elif o in ("-e", "--erase"):
                 erase = True
+            elif o in ("-T", "--terminate"):
+                terminate = True
             elif o in ("-f", "--configure"):
                 configuration = dict(x.split('=') for x in a.split(','))
 
@@ -134,6 +138,14 @@ def main(argv=None):
                 raise Usage(content['message'])
             return 0
 
+        if (terminate):
+            response = requests.delete(
+                'http://127.0.0.1:5000/api/v1.0/job')
+            if (response.status_code == 400):
+                content = json.loads(response.content)
+                raise Usage(content['message'])
+            return 0
+
         if (configuration is not None):
             response = requests.post(
                 'http://127.0.0.1:5000/api/v1.0/configure', json=configuration)
@@ -146,7 +158,7 @@ def main(argv=None):
         else:
             print "Calling start..."
             response = requests.post(
-                'http://127.0.0.1:5000/api/v1.0/start', json=options)
+                'http://127.0.0.1:5000/api/v1.0/job', json=options)
             if (response.status_code == 400):
                 content = json.loads(response.content)
                 raise Usage(content['message'])
index ffb750e..1194ab5 100644 (file)
@@ -64,7 +64,7 @@ class Configure(Resource):
             abort(400, str(e))
 
 
-class StartJob(Resource):
+class Job(Resource):
 
     def __init__(self):
         self.logger = logging.getLogger(__name__)
@@ -97,6 +97,13 @@ class StartJob(Resource):
         except Exception as e:
             abort(400, str(e))
 
+    def delete(self):
+        try:
+            storperf.terminate_workloads()
+            return True
+        except Exception as e:
+            abort(400, str(e))
+
 
 class Quota(Resource):
 
@@ -129,7 +136,7 @@ def setup_logging(default_path='storperf/logging.json',
 
 api.add_resource(Configure, "/api/v1.0/configure")
 api.add_resource(Quota, "/api/v1.0/quota")
-api.add_resource(StartJob, "/api/v1.0/start")
+api.add_resource(Job, "/api/v1.0/job")
 
 if __name__ == "__main__":
     setup_logging()
index fad2546..4f39eb7 100644 (file)
@@ -7,10 +7,10 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-from threading import Thread
 import json
 import logging
 import subprocess
+from threading import Thread
 
 
 class FIOInvoker(object):
@@ -88,11 +88,35 @@ class FIOInvoker(object):
                                             stderr=subprocess.PIPE)
 
         t = Thread(target=self.stdout_handler, args=())
-        t.daemon = False
+        t.daemon = True
         t.start()
 
         t = Thread(target=self.stderr_handler, args=())
-        t.daemon = False
+        t.daemon = True
         t.start()
 
+        self.logger.debug("Started fio on " + self.remote_host)
         t.join()
+        self.logger.debug("Finished fio on " + self.remote_host)
+
+    def terminate(self):
+        self.logger.debug("Terminating fio on " + self.remote_host)
+        cmd = ['ssh', '-o', 'StrictHostKeyChecking=no',
+               '-i', 'storperf/resources/ssh/storperf_rsa',
+               'storperf@' + self.remote_host,
+               'sudo', 'killall', '-9', 'fio']
+
+        kill_process = subprocess.Popen(cmd,
+                                        universal_newlines=True,
+                                        stdout=subprocess.PIPE,
+                                        stderr=subprocess.PIPE)
+
+        for line in iter(kill_process.stdout.readline, b''):
+            self.logger.debug("FIO Termination: " + line)
+
+        kill_process.stdout.close()
+
+        for line in iter(kill_process.stderr.readline, b''):
+            self.logger.debug("FIO Termination: " + line)
+
+        kill_process.stderr.close()
index b4fef7f..c684ce6 100644 (file)
@@ -232,7 +232,6 @@ class StorPerfMaster(object):
         self.stack_id = None
 
     def execute_workloads(self):
-
         if (self.stack_id is None):
             raise ParameterError("ERROR: Stack does not exist")
 
@@ -255,6 +254,9 @@ class StorPerfMaster(object):
         self._test_executor.slaves = slaves
         return self._test_executor.execute()
 
+    def terminate_workloads(self):
+        return self._test_executor.terminate()
+
     def _setup_slave(self, slave):
         logger = logging.getLogger(__name__ + ":" + slave)
 
index 497d17c..aa8a415 100644 (file)
@@ -38,6 +38,8 @@ class TestExecutor(object):
         self.prefix = None
         self.job_db = JobDB()
         self._slaves = []
+        self._terminated = False
+        self._workload_executors = []
         self._workload_thread = None
 
     @property
@@ -118,7 +120,13 @@ class TestExecutor(object):
         self._workload_thread.start()
         return self.job_db.job_id
 
+    def terminate(self):
+        self._terminated = True
+        for workload in self._workload_executors:
+            workload.terminate()
+
     def execute_workloads(self):
+        self._terminated = False
         for workload_module in self.workload_modules:
             workload_name = getattr(workload_module, "__name__")
             constructorMethod = getattr(workload_module, workload_name)
@@ -137,6 +145,9 @@ class TestExecutor(object):
 
             for blocksize in blocksizes:
                 for iodepth in iodepths:
+                    if self._terminated:
+                        return
+
                     workload.options['iodepth'] = str(iodepth)
                     workload.options['bs'] = str(blocksize)
 
@@ -144,6 +155,9 @@ class TestExecutor(object):
                     for slave in self.slaves:
                         slave_workload = copy.copy(workload)
                         slave_workload.remote_host = slave
+
+                        self._workload_executors.append(slave_workload)
+
                         t = Thread(target=self.execute_on_node,
                                    args=(slave_workload,))
                         t.daemon = False
@@ -153,6 +167,8 @@ class TestExecutor(object):
                     for slave_thread in slave_threads:
                         slave_thread.join()
 
+                    self._workload_executors = []
+
     def execute_on_node(self, workload):
 
         invoker = FIOInvoker()
index 4eccc08..050a15c 100644 (file)
@@ -57,6 +57,10 @@ class _base_workload(object):
 
         self.invoker.execute(args)
 
+    def terminate(self):
+        if self.invoker is not None:
+            self.invoker.terminate()
+
     def setup(self):
         pass