Workload reporting 77/4377/1
author     mbeierl <mark.beierl@emc.com>
           Fri, 11 Dec 2015 20:31:17 +0000 (15:31 -0500)
committer  mbeierl <mark.beierl@emc.com>
           Fri, 11 Dec 2015 20:31:17 +0000 (15:31 -0500)
Use a local db to track the start/end times of each workload run so
that, at reporting time, we can go back to the carbon db and
summarize values based on the raw data.
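
At reporting time the recorded timestamps bound a Graphite render
query against the raw samples. The query issued by this change has
the following shape (127.0.0.1:8000 is the local graphite-web
endpoint assumed by the code; job id, workload, and times are filled
in from the jobs table):

    http://127.0.0.1:8000/render/?target=*.<job_id>.<workload>.jobs.1.*.clat.mean&format=json&from=<start>&until=<end>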

Change-Id: Ie62afd339fd1c15d82bc56c93c7cba5bd4f90fe2
JIRA: STORPERF-29
Signed-off-by: mbeierl <mark.beierl@emc.com>
storperf/carbon/emitter.py
storperf/db/__init__.py [new file with mode: 0644]
storperf/db/job_db.py [new file with mode: 0644]
storperf/fio/fio_invoker.py
storperf/main.py
storperf/test_executor.py
storperf/tests/db_tests/job_db_test.py [new file with mode: 0644]
storperf/workloads/_ssd_preconditioning.py
storperf/workloads/_warm_up.py

diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py
index e949238..8a9f734 100644
@@ -6,9 +6,8 @@
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
-import logging
-
 import calendar
+import logging
 import socket
 import time
 
diff --git a/storperf/db/__init__.py b/storperf/db/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py
new file mode 100644
index 0000000..a65fa78
--- /dev/null
@@ -0,0 +1,200 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from sqlite3 import OperationalError
+import calendar
+import logging
+import sqlite3
+import time
+import uuid
+
+import requests
+
+
+class JobDB(object):
+
+    db_name = "StorPerf.db"
+
+    def __init__(self):
+        """
+        Creates the StorPerf.db and jobs tables on demand
+        """
+
+        self.logger = logging.getLogger(__name__)
+        self.logger.debug("Connecting to " + JobDB.db_name)
+        self.db = sqlite3.connect(JobDB.db_name)
+        self.job_id = None
+
+        cursor = self.db.cursor()
+        try:
+            cursor.execute('''CREATE TABLE jobs
+            (job_id text,
+            workload text,
+            start text,
+            end text)''')
+            self.logger.debug("Created job table")
+        except OperationalError:
+            self.logger.debug("Job table exists")
+
+        cursor.execute('SELECT * FROM jobs')
+
+    def create_job_id(self):
+        """
+        Returns a job id that is guaranteed to be unique in this
+        StorPerf instance.
+        """
+        cursor = self.db.cursor()
+
+        self.job_id = str(uuid.uuid4())
+        row = cursor.execute(
+            "select * from jobs where job_id = ?", (self.job_id,))
+
+        while (row.fetchone() is not None):
+            self.logger.info("Duplicate job id found, regenerating")
+            self.job_id = str(uuid.uuid4())
+            row = cursor.execute(
+                "select * from jobs where job_id = ?", (self.job_id,))
+
+        cursor.execute(
+            "insert into jobs(job_id) values (?)", (self.job_id,))
+        self.logger.debug("Reserved job id " + self.job_id)
+        self.db.commit()
+
+    def start_workload(self, workload_name):
+        """
+        Records the start time for the given workload
+        """
+        if (self.job_id is None):
+            self.create_job_id()
+
+        cursor = self.db.cursor()
+        now = str(calendar.timegm(time.gmtime()))
+
+        row = cursor.execute(
+            """select * from jobs
+                       where job_id = ?
+                       and workload = ?""",
+            (self.job_id, workload_name,))
+
+        if (row.fetchone() is None):
+            cursor.execute(
+                """insert into jobs
+                           (job_id,
+                           workload,
+                           start)
+                           values (?, ?, ?)""",
+                (self.job_id,
+                 workload_name,
+                 now,))
+        else:
+            self.logger.warn("Duplicate start time for workload "
+                             + workload_name)
+            cursor.execute(
+                """update jobs set
+                           job_id = ?,
+                           start = ?
+                           where workload = ?""",
+                (self.job_id,
+                 now,
+                 workload_name,))
+
+        self.db.commit()
+
+    def end_workload(self, workload_name):
+        """
+        Records the end time for the given workload
+        """
+        if (self.job_id is None):
+            self.create_job_id()
+
+        cursor = self.db.cursor()
+        now = str(calendar.timegm(time.gmtime()))
+
+        row = cursor.execute(
+            """select * from jobs
+                       where job_id = ?
+                       and workload = ?""",
+            (self.job_id, workload_name,))
+
+        if (row.fetchone() is None):
+            self.logger.warn("No start time recorded for workload "
+                             + workload_name)
+            cursor.execute(
+                """insert into jobs
+                           (job_id,
+                           workload,
+                           start,
+                           end)
+                           values (?, ?, ?, ?)""",
+                (self.job_id,
+                 workload_name,
+                 now,
+                 now))
+        else:
+            cursor.execute(
+                """update jobs set
+                           job_id = ?,
+                           end = ?
+                           where workload = ?""",
+                (self.job_id,
+                 now,
+                 workload_name,))
+
+        self.db.commit()
+
+    def fetch_results(self, workload_prefix=""):
+        if (workload_prefix is None):
+            workload_prefix = ""
+
+        workload_prefix = workload_prefix + "%"
+
+        stats = ()
+
+        start_time = str(calendar.timegm(time.gmtime()))
+        end_time = "0"
+
+        self.logger.debug("Workload like: " + workload_prefix)
+
+        cursor = self.db.cursor()
+        cursor.execute("""select start, end, workload
+            from jobs where workload like ?""",
+                       (workload_prefix,))
+
+        while (True):
+            row = cursor.fetchone()
+            if (row is None):
+                break
+
+            start_time = str(row[0])
+            end_time = str(row[1])
+            workload = str(row[2])
+
+            # for most of these stats, we just want the final one
+            # as that is cumulative average or whatever for the whole
+            # run
+
+            self.logger.info("workload=" + workload +
+                             " start=" + start_time + " end=" + end_time)
+
+            request = 'http://127.0.0.1:8000/render/?target=*.' + self.job_id + \
+                '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \
+                start_time + "&until=" + end_time
+
+            self.logger.debug("Fetching: " + request)
+
+            response = requests.get(request)
+
+            if (response.status_code == 200):
+                data = response.json()
+                stats = stats + (data,)
+            else:
+                self.logger.warn("Carbon query failed with status %s",
+                                 response.status_code)
+
+        return stats
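
Note: a minimal usage sketch of the new JobDB ("rand_read" is a
hypothetical workload name; times are stored as epoch seconds):

    from db.job_db import JobDB

    job_db = JobDB()                      # creates StorPerf.db on demand
    job_db.create_job_id()                # reserves a unique UUID
    job_db.start_workload("rand_read")    # records the start time
    # ... run fio ...
    job_db.end_workload("rand_read")      # records the end time
    stats = job_db.fetch_results("rand")  # prefix match against carbon
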
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index be1b37c..0b13349 100644
@@ -7,11 +7,10 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-import subprocess
 import json
-from threading import Thread
-
 import logging
+import subprocess
+from threading import Thread
 
 
 class FIOInvoker(object):
@@ -28,30 +27,33 @@ class FIOInvoker(object):
 
     def stdout_handler(self):
         self.json_body = ""
-        for line in iter(self.fio_process.stdout.readline, b''):
-            if line.startswith("fio"):
-                line = ""
-                continue
-            self.json_body += line
-            try:
-                if line == "}\n":
-                    self.logger.debug(
-                        "Have a json snippet: %s", self.json_body)
-                    json_metric = json.loads(self.json_body)
-                    self.json_body = ""
-
-                    for event_listener in self.event_listeners:
-                        event_listener(json_metric)
-
-            except Exception, e:
-                self.logger.error("Error parsing JSON: %s", e)
-                pass
+        try:
+            for line in iter(self.fio_process.stdout.readline, b''):
+                if line.startswith("fio"):
+                    line = ""
+                    continue
+                self.json_body += line
+                try:
+                    if line == "}\n":
+                        self.logger.debug(
+                            "Have a json snippet: %s", self.json_body)
+                        json_metric = json.loads(self.json_body)
+                        self.json_body = ""
+
+                        for event_listener in self.event_listeners:
+                            event_listener(json_metric)
+
+                except Exception, e:
+                    self.logger.error("Error parsing JSON: %s", e)
+                    pass
+        except ValueError:
+            pass  # We might have read from a closed stream; ignore it
 
         self.fio_process.stdout.close()
 
     def stderr_handler(self):
         for line in iter(self.fio_process.stderr.readline, b''):
-            print line
+            self.logger.error("FIO Error: %s", line)
 
         self.fio_process.stderr.close()
 
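
Note: stdout_handler feeds each decoded fio JSON snippet to every
registered listener; a listener is any callable taking the decoded
dict. A sketch (the key layout follows fio's JSON output and is an
assumption here):

    def on_metric(json_metric):
        # one periodic fio status snippet
        print json_metric["jobs"][0]["read"]["iops"]

    invoker = FIOInvoker()
    invoker.register(on_metric)
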
diff --git a/storperf/main.py b/storperf/main.py
index 2423b99..11357f4 100644
@@ -52,18 +52,22 @@ def main(argv=None):
     setup_logging()
     test_executor = TestExecutor()
     verbose = False
+    debug = False
     workloads = None
+    report = None
 
     if argv is None:
         argv = sys.argv
     try:
         try:
-            opts, args = getopt.getopt(argv[1:], "t:w:scvh",
+            opts, args = getopt.getopt(argv[1:], "t:w:r:scvdh",
                                        ["target=",
                                         "workload=",
+                                        "report=",
                                         "nossd",
                                         "nowarm",
                                         "verbose",
+                                        "debug",
                                         "help",
                                         ])
         except getopt.error, msg:
@@ -75,14 +79,21 @@
                 return 0
             elif o in ("-t", "--target"):
                 test_executor.filename = a
             elif o in ("-v", "--verbose"):
                 verbose = True
+            elif o in ("-d", "--debug"):
+                debug = True
             elif o in ("-s", "--nossd"):
                 test_executor.precondition = False
             elif o in ("-c", "--nowarm"):
                 test_executor.warm = False
             elif o in ("-w", "--workload"):
                 workloads = a.split(",")
+            elif o in ("-r", "--report"):
+                report = a
+
+        if (debug):
+            logging.getLogger().setLevel(logging.DEBUG)
 
         test_executor.register_workloads(workloads)
 
@@ -98,7 +111,10 @@ def main(argv=None):
     if (verbose):
         test_executor.register(event)
 
-    test_executor.execute()
+    if (report is not None):
+        print test_executor.fetch_results(report, workloads)
+    else:
+        test_executor.execute()
 
 if __name__ == "__main__":
     sys.exit(main())
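
Note: with the new flags, a run and a later report look roughly like
this (device path and job id are illustrative):

    # execute the registered workloads against a target
    python storperf/main.py -t /dev/vdb

    # afterwards, summarize a finished job from the carbon data
    # (-d enables debug logging)
    python storperf/main.py -r <job-id> -d
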
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 5fb88d4..462f06b 100644
@@ -9,15 +9,15 @@
 
 import imp
 import logging
-import os
-import socket
-
 from os import listdir
+import os
 from os.path import isfile, join
+import socket
 
-from fio.fio_invoker import FIOInvoker
-from carbon.emitter import CarbonMetricTransmitter
 from carbon.converter import JSONToCarbon
+from carbon.emitter import CarbonMetricTransmitter
+from db.job_db import JobDB
+from fio.fio_invoker import FIOInvoker
 
 
 class UnknownWorkload(Exception):
@@ -37,8 +37,8 @@ class TestExecutor(object):
         self.event_listeners = set()
         self.metrics_converter = JSONToCarbon()
         self.metrics_emitter = CarbonMetricTransmitter()
-        self.prefix = socket.getfqdn()
-        self.job_id = None
+        self.prefix = None
+        self.job_db = JobDB()
 
     def register(self, event_listener):
         self.event_listeners.add(event_listener)
@@ -50,6 +50,7 @@ class TestExecutor(object):
         carbon_metrics = self.metrics_converter.convert_to_dictionary(
             metric,
             self.prefix)
+
         read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"]
         write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"]
         read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"]
@@ -66,7 +67,7 @@ class TestExecutor(object):
 
     def register_workloads(self, workloads):
 
-        if (workloads is None or workloads.length() == 0):
+        if (workloads is None or len(workloads) == 0):
             workload_dir = os.path.normpath(
                 os.path.join(os.path.dirname(__file__), "workloads"))
 
@@ -112,26 +113,53 @@ class TestExecutor(object):
             return imp.load_source(mname, no_ext + '.py')
         return None
 
-    def create_job_id(self):
-        return 1234
-
     def execute(self):
-        if (self.job_id is None):
-            self.job_id = self.create_job_id()
+
+        shortname = socket.getfqdn().split('.')[0]
 
         invoker = FIOInvoker()
         invoker.register(self.event)
-
-        for numjobs in [1, 2, 4]:
-
-            for workload_module in self.workload_modules:
-                constructor = getattr(workload_module, "__name__")
-                constructorMethod = getattr(workload_module, constructor)
-                self.logger.debug(
-                    "Found constructor: " + str(constructorMethod))
-                workload = constructorMethod()
-                workload.filename = self.filename
-                workload.invoker = invoker
-                workload.options['iodepth'] = str(numjobs)
-                self.logger.info("Executing workload: " + constructor)
-                workload.execute()
+        self.job_db.create_job_id()
+        self.logger.info("Starting job " + self.job_db.job_id)
+
+        for workload_module in self.workload_modules:
+
+            workload_name = getattr(workload_module, "__name__")
+            constructorMethod = getattr(workload_module, workload_name)
+            self.logger.debug(
+                "Found workload: " + str(constructorMethod))
+            workload = constructorMethod()
+            workload.filename = self.filename
+            workload.invoker = invoker
+
+            if (workload_name.startswith("_")):
+                iodepths = [2, ]
+                blocksizes = [4096, ]
+            else:
+                iodepths = [1, 16, 128]
+                blocksizes = [4096, 65536, 1048576]
+
+            for blocksize in blocksizes:
+                for iodepth in iodepths:
+
+                    full_workload_name = workload_name + \
+                        ".queue-depth." + str(iodepth) + \
+                        ".block-size." + str(blocksize)
+
+                    workload.options['iodepth'] = str(iodepth)
+                    workload.options['bs'] = str(blocksize)
+                    self.logger.info(
+                        "Executing workload: " + full_workload_name)
+
+                    self.prefix = shortname + "." + self.job_db.job_id + \
+                        "." + full_workload_name
+
+                    self.job_db.start_workload(full_workload_name)
+                    workload.execute()
+                    self.job_db.end_workload(full_workload_name)
+
+        self.logger.info("Finished job " + self.job_db.job_id)
+
+    def fetch_results(self, job, workload_name=""):
+        self.job_db.job_id = job
+        return self.job_db.fetch_results(workload_name)
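
Note: the carbon prefix now encodes host, job id, and workload
parameters, so every combination lands in its own metric series. For
the hidden _warm_up workload (fixed queue depth 2, block size 4096),
the read latency series on a host named node1 (illustrative) would be:

    node1.<job_id>._warm_up.queue-depth.2.block-size.4096.jobs.1.read.lat.mean
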
diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py
new file mode 100644
index 0000000..d9b10a2
--- /dev/null
@@ -0,0 +1,174 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import unittest
+
+import mock
+
+from db.job_db import JobDB
+
+
+class JobDBTest(unittest.TestCase):
+
+    def setUp(self):
+        JobDB.db_name = ":memory:"
+        self.job = JobDB()
+
+    @mock.patch("uuid.uuid4")
+    def test_create_job(self, mock_uuid):
+        expected = "ABCDE-12345"
+        mock_uuid.side_effect = (expected,)
+
+        self.job.create_job_id()
+
+        actual = self.job.job_id
+
+        self.assertEqual(
+            expected, actual, "Did not expect: " + str(actual))
+
+    @mock.patch("uuid.uuid4")
+    def test_duplicate_job_generated(self, mock_uuid):
+        duplicate = "EDCBA-12345"
+        expected = "EDCBA-54321"
+
+        mock_uuid.side_effect = (duplicate, duplicate, expected,)
+
+        self.job.create_job_id()
+        self.job.create_job_id()
+
+        actual = self.job.job_id
+
+        self.assertEqual(
+            expected, actual, "Did not expect: " + str(actual))
+
+    @mock.patch("uuid.uuid4")
+    @mock.patch("calendar.timegm")
+    def test_start_job(self, mock_calendar, mock_uuid):
+        job_id = "ABCDE-12345"
+        start_time = "12345"
+        mock_calendar.side_effect = (start_time,)
+        mock_uuid.side_effect = (job_id,)
+        workload_name = "Workload"
+
+        cursor = self.job.db.cursor()
+
+        row = cursor.execute(
+            """select * from jobs
+                       where job_id = ?
+                       and workload = ?""",
+            (job_id, workload_name,))
+
+        self.assertEqual(None,
+                         row.fetchone(),
+                         "Should not have been a row in the db")
+
+        self.job.start_workload(workload_name)
+
+        cursor.execute(
+            """select job_id, workload, start from jobs
+                       where job_id = ?
+                       and workload = ?""",
+            (job_id, workload_name,))
+
+        row = cursor.fetchone()
+
+        self.assertNotEqual(None, row, "Should be a row in the db")
+        self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
+        self.assertEqual(
+            workload_name, row[1], "Did not expect " + str(row[1]))
+        self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
+
+    @mock.patch("uuid.uuid4")
+    @mock.patch("calendar.timegm")
+    def test_end_job(self, mock_calendar, mock_uuid):
+        job_id = "ABCDE-12345"
+        start_time = "12345"
+        end_time = "54321"
+        mock_calendar.side_effect = (start_time, end_time,)
+        mock_uuid.side_effect = (job_id,)
+        workload_name = "Workload"
+
+        self.job.start_workload(workload_name)
+        self.job.end_workload(workload_name)
+
+        cursor = self.job.db.cursor()
+        cursor.execute(
+            """select job_id, workload, start, end from jobs
+                       where job_id = ?
+                       and workload = ?""",
+            (job_id, workload_name,))
+
+        row = cursor.fetchone()
+
+        self.assertNotEqual(None, row, "Should be a row in the db")
+        self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
+        self.assertEqual(
+            workload_name, row[1], "Did not expect " + str(row[1]))
+        self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
+        self.assertEqual(end_time, row[3], "Did not expect " + str(row[3]))
+
+    @mock.patch("uuid.uuid4")
+    @mock.patch("calendar.timegm")
+    def test_duplicate_start_job(self, mock_calendar, mock_uuid):
+        job_id = "ABCDE-12345"
+        start_time_1 = "12345"
+        start_time_2 = "12346"
+
+        mock_calendar.side_effect = (start_time_1, start_time_2)
+        mock_uuid.side_effect = (job_id,)
+        workload_name = "Workload"
+
+        cursor = self.job.db.cursor()
+
+        self.job.start_workload(workload_name)
+        self.job.start_workload(workload_name)
+
+        cursor.execute(
+            """select job_id, workload, start from jobs
+                       where job_id = ?
+                       and workload = ?""",
+            (job_id, workload_name,))
+
+        row = cursor.fetchone()
+
+        self.assertNotEqual(None, row, "Should be a row in the db")
+        self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
+        self.assertEqual(
+            workload_name, row[1], "Did not expect " + str(row[1]))
+        self.assertEqual(start_time_2, row[2], "Did not expect " + str(row[2]))
+
+    @mock.patch("uuid.uuid4")
+    @mock.patch("calendar.timegm")
+    def test_end_job_without_start(self, mock_calendar, mock_uuid):
+        job_id = "ABCDE-12345"
+        start_time = "12345"
+        end_time = "54321"
+        mock_calendar.side_effect = (start_time, end_time,)
+        mock_uuid.side_effect = (job_id,)
+        workload_name = "Workload"
+
+        self.job.end_workload(workload_name)
+
+        cursor = self.job.db.cursor()
+        cursor.execute(
+            """select job_id, workload, start, end from jobs
+                       where job_id = ?
+                       and workload = ?""",
+            (job_id, workload_name,))
+
+        row = cursor.fetchone()
+
+        self.assertNotEqual(None, row, "Should be a row in the db")
+        self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
+        self.assertEqual(
+            workload_name, row[1], "Did not expect " + str(row[1]))
+        # The start time is set to the same time as end if it was never set
+        # before
+        self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
+        self.assertEqual(start_time, row[3], "Did not expect " + str(row[3]))
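
Note: the tests point JobDB.db_name at ":memory:", so each case gets
a throwaway database and needs no cleanup. With the mock package
installed they should run under any unittest-compatible runner, e.g.
(run from the storperf directory so that the db package imports; the
runner choice is an assumption):

    python -m unittest discover -s tests -p "*_test.py"
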
diff --git a/storperf/workloads/_ssd_preconditioning.py b/storperf/workloads/_ssd_preconditioning.py
index 66d5fa1..e1e8bef 100644
@@ -13,4 +13,5 @@ class _ssd_preconditioning(_base_workload._base_workload):
 
     def setup(self):
         self.options['name'] = 'ssd_preconditioning'
-        self.options['rw'] = 'write'
+        self.options['rw'] = 'randwrite'
+        self.options['loops'] = '1'
diff --git a/storperf/workloads/_warm_up.py b/storperf/workloads/_warm_up.py
index 8eaa2f1..27667ca 100644
@@ -12,6 +12,6 @@ from workloads import _base_workload
 class _warm_up(_base_workload._base_workload):
 
     def setup(self):
-        self.options['name'] = 'ssd_preconditioning'
-        self.options['rw'] = 'randwrite'
-        self.options['loops'] = '4'
+        self.options['name'] = 'warm_up'
+        self.options['rw'] = 'write'
+        self.options['loops'] = '1'
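
Note: these options map directly onto fio job parameters; the warm-up
is now a single sequential write pass, roughly equivalent to this fio
job (filename, iodepth, and the remaining options come from the base
workload):

    [warm_up]
    rw=write
    loops=1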