Job run lifecycle rework 37/12637/3
author Mark Beierl <mark.beierl@emc.com>
Mon, 25 Apr 2016 13:55:03 +0000 (09:55 -0400)
committer Mark Beierl <mark.beierl@emc.com>
Mon, 25 Apr 2016 14:51:31 +0000 (10:51 -0400)
Change the way slave jobs are managed so that they run in step
with each other and the overall thread running them can be
tracked. This lays the groundwork for STORPERF-20 and
STORPERF-44.

JIRA: STORPERF-33 STORPERF-43

Change-Id: Iaff48a2823ba85d6512e9782fd9091a72639835c
Signed-off-by: Mark Beierl <mark.beierl@emc.com>
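
The heart of the rework is the fan-out/join pattern in storperf/test_executor.py
below: a single tracked thread walks the workload matrix, and for each
(workload, block size, queue depth) combination it starts one thread per slave
and joins them all before moving on, which is what keeps the slaves in step.
A minimal sketch of that pattern, with a hypothetical run_in_step helper
standing in for the real execute_workloads/execute_on_node pair:

    from threading import Thread
    import copy

    def run_in_step(workload, slaves):
        # Fan out: one thread per slave, each with its own shallow copy
        # of the workload so remote_host can differ per thread.
        threads = []
        for slave in slaves:
            clone = copy.copy(workload)
            clone.remote_host = slave
            t = Thread(target=clone.execute)
            t.daemon = False
            t.start()
            threads.append(t)
        # Join: nothing advances to the next cell of the test matrix
        # until every slave has finished this one.
        for t in threads:
            t.join()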
ci/verify.sh
cli.py
rest_server.py
storperf/carbon/emitter.py
storperf/db/job_db.py
storperf/fio/fio_invoker.py
storperf/storperf_master.py
storperf/test_executor.py
storperf/tests/db_tests/job_db_test.py
storperf/tests/workload_tests/workload_subclass_test.py [new file with mode: 0644]
storperf/workloads/_base_workload.py

diff --git a/ci/verify.sh b/ci/verify.sh
index 2f67e94..70ecb6a 100755
--- a/ci/verify.sh
+++ b/ci/verify.sh
@@ -19,6 +19,8 @@ virtualenv $WORKSPACE/storperf_venv
 source $WORKSPACE/storperf_venv/bin/activate
 
 pip install setuptools
+pip install autoflake==0.6.6
+pip install autopep8==1.2.2
 pip install coverage==4.0.3
 pip install flask==0.10
 pip install flask-restful==0.3.5
@@ -27,6 +29,7 @@ pip install flake8==2.5.4
 pip install html2text==2016.1.8
 pip install mock==1.3.0
 pip install nose==1.3.7
+pip install pysqlite==2.8.2
 pip install python-cinderclient==1.6.0
 pip install python-glanceclient==1.1.0
 pip install python-heatclient==0.8.0
diff --git a/cli.py b/cli.py
index 560d77d..5595314 100644
--- a/cli.py
+++ b/cli.py
@@ -10,7 +10,6 @@
 """
 
 from storperf.storperf_master import StorPerfMaster
-from storperf.test_executor import UnknownWorkload
 from threading import Thread
 import cPickle
 import getopt
@@ -18,13 +17,10 @@ import json
 import logging
 import logging.config
 import logging.handlers
-import os
 import socket
 import struct
 import sys
-import time
 
-import html2text
 import requests
 
 
@@ -49,7 +45,7 @@ class LogRecordStreamHandler(object):
             while True:
                 datagram = self.socket.recv(8192)
                 chunk = datagram[0:4]
-                slen = struct.unpack(">L", chunk)[0]
+                struct.unpack(">L", chunk)[0]
                 chunk = datagram[4:]
                 obj = cPickle.loads(chunk)
                 record = logging.makeLogRecord(obj)
diff --git a/rest_server.py b/rest_server.py
index 073004a..ffb750e 100644
--- a/rest_server.py
+++ b/rest_server.py
@@ -62,7 +62,6 @@ class Configure(Resource):
             storperf.delete_stack()
         except Exception as e:
             abort(400, str(e))
-        pass
 
 
 class StartJob(Resource):
@@ -87,6 +86,9 @@ class StartJob(Resource):
                 storperf.workloads = request.json['workload']
             else:
                 storperf.workloads = None
+            # Add block size, queue depth, number of passes here.
+            if ('workload' in request.json):
+                storperf.workloads = request.json['workload']
 
             job_id = storperf.execute_workloads()
 
diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py
index 8a9f734..6104fd4 100644
--- a/storperf/carbon/emitter.py
+++ b/storperf/carbon/emitter.py
@@ -26,12 +26,12 @@ class CarbonMetricTransmitter():
         else:
             timestamp = str(calendar.timegm(time.gmtime()))
 
-        self.carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.carbon_socket.connect((self.carbon_host, self.carbon_port))
+        carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        carbon_socket.connect((self.carbon_host, self.carbon_port))
 
         for key, metric in metrics.items():
             message = key + " " + metric + " " + timestamp
             self.logger.debug("Metric: " + message)
-            self.carbon_socket.send(message + '\n')
+            carbon_socket.send(message + '\n')
 
-        self.carbon_socket.close()
+        carbon_socket.close()
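
Making the socket a local variable means each transmit_metrics call owns its
own connection, so the per-slave threads introduced by this change cannot
clobber a shared self.carbon_socket mid-send. The wire format is the Carbon
plaintext protocol: one "path value timestamp" line per metric. A minimal
standalone sketch of the same idea, using a hypothetical send_metrics helper
(Python 2 string semantics, like the rest of the tree):

    import calendar
    import socket
    import time

    def send_metrics(metrics, host, port):
        # One short-lived connection per call; safe under concurrency.
        timestamp = str(calendar.timegm(time.gmtime()))
        carbon = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        carbon.connect((host, port))
        try:
            for key, value in metrics.items():
                carbon.send(key + " " + value + " " + timestamp + "\n")
        finally:
            carbon.close()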
diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py
index 57c82cb..8aa4c11 100644
--- a/storperf/db/job_db.py
+++ b/storperf/db/job_db.py
@@ -76,11 +76,13 @@ class JobDB(object):
             db.commit()
             db.close()
 
-    def start_workload(self, workload_name):
+    def start_workload(self, workload):
         """
         Records the start time for the given workload
         """
 
+        workload_name = workload.fullname
+
         if (self.job_id is None):
             self.create_job_id()
 
@@ -122,13 +124,15 @@ class JobDB(object):
             db.commit()
             db.close()
 
-    def end_workload(self, workload_name):
+    def end_workload(self, workload):
         """
         Records the end time for the given workload
         """
         if (self.job_id is None):
             self.create_job_id()
 
+        workload_name = workload.fullname
+
         with db_mutex:
 
             db = sqlite3.connect(JobDB.db_name)
@@ -174,8 +178,6 @@ class JobDB(object):
 
         workload_prefix = workload_prefix + "%"
 
-        stats = ()
-
         start_time = str(calendar.timegm(time.gmtime()))
         end_time = "0"
 
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index 5e30a76..fad2546 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -8,7 +8,6 @@
 ##############################################################################
 
 from threading import Thread
-import cmd
 import json
 import logging
 import subprocess
@@ -48,17 +47,17 @@ class FIOInvoker(object):
                 self.json_body += line
                 try:
                     if line == "}\n":
-                        self.logger.debug(
-                            "Have a json snippet: %s", self.json_body)
                         json_metric = json.loads(self.json_body)
                         self.json_body = ""
 
                         for event_listener in self.event_listeners:
-                            event_listener(self.callback_id, json_metric)
-
+                            try:
+                                event_listener(self.callback_id, json_metric)
+                            except Exception, e:
+                                self.logger.error("Notifying listener %s: %s",
+                                                  self.callback_id, e)
                 except Exception, e:
                     self.logger.error("Error parsing JSON: %s", e)
-                    pass
         except ValueError:
             pass  # We might have read from the closed socket, ignore it
 
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index a467aef..b4fef7f 100644
--- a/storperf/storperf_master.py
+++ b/storperf/storperf_master.py
@@ -202,7 +202,6 @@ class StorPerfMaster(object):
             parameters=self._make_parameters())
 
         self.stack_id = stack['stack']['id']
-        pass
 
     def validate_stack(self):
         self._attach_to_openstack()
@@ -232,8 +231,6 @@ class StorPerfMaster(object):
         self._heat_client.stacks.delete(stack_id=self.stack_id)
         self.stack_id = None
 
-        pass
-
     def execute_workloads(self):
 
         if (self.stack_id is None):
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 734b514..497d17c 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -14,6 +14,7 @@ from storperf.carbon.emitter import CarbonMetricTransmitter
 from storperf.db.job_db import JobDB
 from storperf.fio.fio_invoker import FIOInvoker
 from threading import Thread
+import copy
 import imp
 import logging
 import os
@@ -37,6 +38,7 @@ class TestExecutor(object):
         self.prefix = None
         self.job_db = JobDB()
         self._slaves = []
+        self._workload_thread = None
 
     @property
     def slaves(self):
@@ -58,22 +60,9 @@ class TestExecutor(object):
             metric,
             callback_id)
 
-        read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"]
-        write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"]
-        read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"]
-        write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"]
-
-        message = "Average Latency us Read/Write: " + read_latency \
-            + "/" + write_latency + " IOPS r/w: " + \
-            read_iops + "/" + write_iops
-
-        for event_listener in self.event_listeners:
-            event_listener(message)
-
         self.metrics_emitter.transmit_metrics(carbon_metrics)
 
     def register_workloads(self, workloads):
-
         if (workloads is None or len(workloads) == 0):
             workload_dir = os.path.normpath(
                 os.path.join(os.path.dirname(__file__), "workloads"))
@@ -124,66 +113,59 @@ class TestExecutor(object):
         return None
 
     def execute(self):
-
         self.job_db.create_job_id()
-        for slave in self.slaves:
-            t = Thread(target=self.execute_on_node, args=(slave,))
-            t.daemon = False
-            t.start()
-
+        self._workload_thread = Thread(target=self.execute_workloads, args=())
+        self._workload_thread.start()
         return self.job_db.job_id
 
-    def execute_on_node(self, remote_host):
-
-        logger = logging.getLogger(__name__ + ":" + remote_host)
-
-        invoker = FIOInvoker()
-        invoker.remote_host = remote_host
-        invoker.register(self.event)
-
-        logger.info(
-            "Starting job " + self.job_db.job_id + " on " + remote_host)
-
+    def execute_workloads(self):
         for workload_module in self.workload_modules:
-
             workload_name = getattr(workload_module, "__name__")
             constructorMethod = getattr(workload_module, workload_name)
-            logger.debug(
-                "Found workload: " + str(constructorMethod))
             workload = constructorMethod()
             if (self.filename is not None):
                 workload.filename = self.filename
-            workload.invoker = invoker
 
             if (workload_name.startswith("_")):
                 iodepths = [2, ]
-                blocksizes = [65536, ]
+                blocksizes = [8192, ]
             else:
                 iodepths = [1, 16, 128]
-                blocksizes = [4096, 65536, 1048576]
+                blocksizes = [8192, 4096, 512]
+
+            workload.id = self.job_db.job_id
 
             for blocksize in blocksizes:
                 for iodepth in iodepths:
-
-                    full_workload_name = workload_name + \
-                        ".host." + remote_host + \
-                        ".queue-depth." + str(iodepth) + \
-                        ".block-size." + str(blocksize)
-
                     workload.options['iodepth'] = str(iodepth)
                     workload.options['bs'] = str(blocksize)
-                    self.logger.info(
-                        "Executing workload: " + full_workload_name)
 
-                    invoker.callback_id = self.job_db.job_id + \
-                        "." + full_workload_name
+                    slave_threads = []
+                    for slave in self.slaves:
+                        slave_workload = copy.copy(workload)
+                        slave_workload.remote_host = slave
+                        t = Thread(target=self.execute_on_node,
+                                   args=(slave_workload,))
+                        t.daemon = False
+                        t.start()
+                        slave_threads.append(t)
+
+                    for slave_thread in slave_threads:
+                        slave_thread.join()
+
+    def execute_on_node(self, workload):
+
+        invoker = FIOInvoker()
+        invoker.register(self.event)
+        workload.invoker = invoker
+
+        self.logger.info("Starting " + workload.fullname)
 
-                    self.job_db.start_workload(full_workload_name)
-                    workload.execute()
-                    self.job_db.end_workload(full_workload_name)
+        self.job_db.start_workload(workload)
+        workload.execute()
+        self.job_db.end_workload(workload)
 
-        logger.info(
-            "Finished job " + self.job_db.job_id + " on " + remote_host)
+        self.logger.info("Ended " + workload.fullname)
 
     def fetch_results(self, job, workload_name=""):
         self.job_db.job_id = job
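
With this hunk, execute() no longer spawns one thread per slave up front: it
returns the job id immediately and hands the whole matrix to a single
_workload_thread, which is the handle STORPERF-20 and STORPERF-44 can later
use for status tracking. A sketch of how a caller might drive the new
lifecycle, assuming an already configured TestExecutor (and reaching into the
private _workload_thread attribute purely for illustration):

    executor = TestExecutor()
    executor.slaves = ["192.168.0.10", "192.168.0.11"]
    executor.register_workloads(None)   # discover the bundled workloads
    job_id = executor.execute()         # returns at once
    executor._workload_thread.join()    # wait for the full matrix to finish
    results = executor.fetch_results(job_id)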
diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py
index 4620412..92b1482 100644
--- a/storperf/tests/db_tests/job_db_test.py
+++ b/storperf/tests/db_tests/job_db_test.py
@@ -8,6 +8,7 @@
 ##############################################################################
 
 from storperf.db.job_db import JobDB
+from storperf.workloads.rr import rr
 import os
 import sqlite3
 import unittest
@@ -60,7 +61,7 @@ class JobDBTest(unittest.TestCase):
         start_time = "12345"
         mock_calendar.side_effect = (start_time,)
         mock_uuid.side_effect = (job_id,)
-        workload_name = "Workload"
+        workload = rr()
 
         db = sqlite3.connect(JobDB.db_name)
         cursor = db.cursor()
@@ -69,26 +70,26 @@ class JobDBTest(unittest.TestCase):
             """select * from jobs
                        where job_id = ?
                        and workload = ?""",
-            (job_id, workload_name,))
+            (job_id, workload.fullname,))
 
         self.assertEqual(None,
                          row.fetchone(),
                          "Should not have been a row in the db")
 
-        self.job.start_workload(workload_name)
+        self.job.start_workload(workload)
 
         cursor.execute(
             """select job_id, workload, start from jobs
                        where job_id = ?
                        and workload = ?""",
-            (job_id, workload_name,))
+            (job_id, workload.fullname,))
 
         row = cursor.fetchone()
 
         self.assertNotEqual(None, row, "Should be a row in the db")
         self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
         self.assertEqual(
-            workload_name, row[1], "Did not expect " + str(row[1]))
+            workload.fullname, row[1], "Did not expect " + str(row[1]))
         self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
 
     @mock.patch("uuid.uuid4")
@@ -99,10 +100,10 @@ class JobDBTest(unittest.TestCase):
         end_time = "54321"
         mock_calendar.side_effect = (start_time, end_time,)
         mock_uuid.side_effect = (job_id,)
-        workload_name = "Workload"
+        workload = rr()
 
-        self.job.start_workload(workload_name)
-        self.job.end_workload(workload_name)
+        self.job.start_workload(workload)
+        self.job.end_workload(workload)
 
         db = sqlite3.connect(JobDB.db_name)
         cursor = db.cursor()
@@ -110,14 +111,14 @@ class JobDBTest(unittest.TestCase):
             """select job_id, workload, start, end from jobs
                        where job_id = ?
                        and workload = ?""",
-            (job_id, workload_name,))
+            (job_id, workload.fullname,))
 
         row = cursor.fetchone()
 
         self.assertNotEqual(None, row, "Should be a row in the db")
         self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
         self.assertEqual(
-            workload_name, row[1], "Did not expect " + str(row[1]))
+            workload.fullname, row[1], "Did not expect " + str(row[1]))
         self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
         self.assertEqual(end_time, row[3], "Did not expect " + str(row[3]))
 
@@ -130,26 +131,26 @@ class JobDBTest(unittest.TestCase):
 
         mock_calendar.side_effect = (start_time_1, start_time_2)
         mock_uuid.side_effect = (job_id,)
-        workload_name = "Workload"
+        workload = rr()
 
         db = sqlite3.connect(JobDB.db_name)
         cursor = db.cursor()
 
-        self.job.start_workload(workload_name)
-        self.job.start_workload(workload_name)
+        self.job.start_workload(workload)
+        self.job.start_workload(workload)
 
         cursor.execute(
             """select job_id, workload, start from jobs
                        where job_id = ?
                        and workload = ?""",
-            (job_id, workload_name,))
+            (job_id, workload.fullname,))
 
         row = cursor.fetchone()
 
         self.assertNotEqual(None, row, "Should be a row in the db")
         self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
         self.assertEqual(
-            workload_name, row[1], "Did not expect " + str(row[1]))
+            workload.fullname, row[1], "Did not expect " + str(row[1]))
         self.assertEqual(start_time_2, row[2], "Did not expect " + str(row[2]))
 
     @mock.patch("uuid.uuid4")
@@ -160,9 +161,9 @@ class JobDBTest(unittest.TestCase):
         end_time = "54321"
         mock_calendar.side_effect = (start_time, end_time,)
         mock_uuid.side_effect = (job_id,)
-        workload_name = "Workload"
+        workload = rr()
 
-        self.job.end_workload(workload_name)
+        self.job.end_workload(workload)
 
         db = sqlite3.connect(JobDB.db_name)
         cursor = db.cursor()
@@ -170,14 +171,14 @@ class JobDBTest(unittest.TestCase):
             """select job_id, workload, start, end from jobs
                        where job_id = ?
                        and workload = ?""",
-            (job_id, workload_name,))
+            (job_id, workload.fullname,))
 
         row = cursor.fetchone()
 
         self.assertNotEqual(None, row, "Should be a row in the db")
         self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
         self.assertEqual(
-            workload_name, row[1], "Did not expect " + str(row[1]))
+            workload.fullname, row[1], "Did not expect " + str(row[1]))
         # The start time is set to the same time as end if it was never set
         # before
         self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
diff --git a/storperf/tests/workload_tests/workload_subclass_test.py b/storperf/tests/workload_tests/workload_subclass_test.py
new file mode 100644
index 0000000..97b6b46
--- /dev/null
+++ b/storperf/tests/workload_tests/workload_subclass_test.py
@@ -0,0 +1,54 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+from storperf.workloads.rr import rr
+from storperf.workloads.rs import rs
+from storperf.workloads.rw import rw
+from storperf.workloads.wr import wr
+from storperf.workloads.ws import ws
+
+
+class WorkloadSubclassTest(unittest.TestCase):
+
+    def setUp(self):
+        pass
+
+    def test_local_name(self):
+        workload = rr()
+        self.assertEqual(workload.fullname,
+                         "None.rr.None.queue-depth.1.block-size.64k",
+                         workload.fullname)
+
+    def test_remote_name(self):
+        workload = rw()
+        workload.remote_host = "192.168.0.1"
+        self.assertEqual(workload.fullname,
+                         "None.rw.192-168-0-1.queue-depth.1.block-size.64k",
+                         workload.fullname)
+
+    def test_blocksize(self):
+        workload = rs()
+        workload.options["bs"] = "4k"
+        self.assertEqual(workload.fullname,
+                         "None.rs.None.queue-depth.1.block-size.4k",
+                         workload.fullname)
+
+    def test_queue_depth(self):
+        workload = wr()
+        workload.options["iodepth"] = "8"
+        self.assertEqual(workload.fullname,
+                         "None.wr.None.queue-depth.8.block-size.64k",
+                         workload.fullname)
+
+    def test_id(self):
+        workload = ws()
+        workload.id = "workloadid"
+        self.assertEqual(workload.fullname,
+                         "workloadid.ws.None.queue-depth.1.block-size.64k",
+                         workload.fullname)
diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py
index f7c14ad..4eccc08 100644
--- a/storperf/workloads/_base_workload.py
+++ b/storperf/workloads/_base_workload.py
@@ -13,7 +13,7 @@ import logging
 class _base_workload(object):
 
     def __init__(self):
-        self.logger = logging.getLogger(__name__)
+        self.logger = logging.getLogger(self.__class__.__name__)
         self.default_filesize = "100%"
         self.filename = '/dev/vdb'
         self.options = {
@@ -25,12 +25,19 @@ class _base_workload(object):
             'numjobs': '1',
             'loops': '2',
             'output-format': 'json',
-            'status-interval': '600'
+            'status-interval': '60'
         }
         self.invoker = None
+        self.remote_host = None
+        self.id = None
 
     def execute(self):
+        if self.invoker is None:
+            raise ValueError("No invoker has been set")
+
         args = []
+        self.invoker.remote_host = self.remote_host
+        self.invoker.callback_id = self.fullname
 
         if self.filename.startswith("/dev"):
             self.options['size'] = "100%"
@@ -52,3 +59,19 @@ class _base_workload(object):
 
     def setup(self):
         pass
+
+    @property
+    def remote_host(self):
+        return str(self._remote_host)
+
+    @remote_host.setter
+    def remote_host(self, value):
+        self._remote_host = value
+
+    @property
+    def fullname(self):
+        return str(self.id) + "." + \
+            self.__class__.__name__ + "." + \
+            str(self.remote_host).replace(".", "-") + \
+            ".queue-depth." + str(self.options['iodepth']) + \
+            ".block-size." + str(self.options['bs'])
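
The new fullname property gives every run a single flat identity — job id,
workload class, slave host (dots mapped to dashes), queue depth, block size —
which serves as both the invoker's Carbon callback_id and the job_db workload
key. A quick illustration in the style of the tests above, with a hypothetical
job id:

    from storperf.workloads.rr import rr

    workload = rr()
    workload.id = "job-1234"            # hypothetical job id
    workload.remote_host = "192.168.0.1"
    workload.options["iodepth"] = "16"
    workload.options["bs"] = "8192"

    # -> job-1234.rr.192-168-0-1.queue-depth.16.block-size.8192
    print(workload.fullname)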