Data Handling Refactoring 43/25043/3
authorMark Beierl <mark.beierl@dell.com>
Mon, 28 Nov 2016 21:17:05 +0000 (16:17 -0500)
committerMark Beierl <mark.beierl@dell.com>
Wed, 30 Nov 2016 21:45:08 +0000 (21:45 +0000)
Break out test db interaction into new module and
make the push event driven instead of the sleep
that was there before.

Change-Id: I9485aba1405f6c3b4ee5000168fbc037efa87c81
JIRA: STORPERF-90
Signed-off-by: Mark Beierl <mark.beierl@dell.com>
storperf/test_executor.py
storperf/utilities/data_handler.py [new file with mode: 0644]
tests/db_tests/job_db_test.py
tests/utilities_tests/data_handler_test.py [new file with mode: 0644]

index 3c456a6..530ce80 100644 (file)
@@ -19,11 +19,10 @@ import time
 
 from storperf.carbon.converter import Converter
 from storperf.carbon.emitter import CarbonMetricTransmitter
-from storperf.db import test_results_db
-from storperf.db.graphite_db import GraphiteDB
 from storperf.db.job_db import JobDB
 from storperf.fio.fio_invoker import FIOInvoker
-from storperf.utilities import dictionary
+from storperf.utilities.thread_gate import ThreadGate
+from utilities.data_handler import DataHandler
 
 
 class UnknownWorkload(Exception):
@@ -39,7 +38,9 @@ class TestExecutor(object):
         self.precondition = True
         self.deadline = None
         self.warm = True
-        self.metadata = None
+        self.metadata = {}
+        self.start_time = None
+        self.end_time = None
         self._queue_depths = [1, 4, 8]
         self._block_sizes = [512, 4096, 16384]
         self.event_listeners = set()
@@ -51,6 +52,7 @@ class TestExecutor(object):
         self._terminated = False
         self._workload_executors = []
         self._workload_thread = None
+        self._thread_gate = None
 
     @property
     def slaves(self):
@@ -74,6 +76,10 @@ class TestExecutor(object):
     def block_sizes(self):
         return ','.join(self._block_sizes)
 
+    @property
+    def terminated(self):
+        return self._terminated
+
     @block_sizes.setter
     def block_sizes(self, block_sizes):
         self.logger.debug("Set block_sizes to: " + str(block_sizes))
@@ -92,6 +98,16 @@ class TestExecutor(object):
 
         self.metrics_emitter.transmit_metrics(carbon_metrics)
 
+        if self._thread_gate.report(callback_id):
+            self.broadcast_event()
+
+    def broadcast_event(self):
+        for event_listener in self.event_listeners:
+            try:
+                event_listener(self)
+            except Exception, e:
+                self.logger.error("Notifying listener: %s", e)
+
     def register_workloads(self, workloads):
         self.workload_modules = []
 
@@ -178,8 +194,10 @@ class TestExecutor(object):
     def execute_workloads(self):
         self._terminated = False
         self.logger.info("Starting job %s" % (self.job_db.job_id))
+        data_handler = DataHandler()
+        self.register(data_handler.data_event)
 
-        start_time = time.time()
+        self.start_time = time.time()
 
         for workload_module in self.workload_modules:
             workload_name = getattr(workload_module, "__name__")
@@ -198,6 +216,8 @@ class TestExecutor(object):
                 blocksizes = self._block_sizes
 
             workload.id = self.job_db.job_id
+            self._thread_gate = ThreadGate(len(self.slaves),
+                                           workload.options['status-interval'])
 
             for blocksize in blocksizes:
                 for iodepth in iodepths:
@@ -243,56 +263,9 @@ class TestExecutor(object):
             self.logger.info("Completed workload %s" % (workload_name))
         self.logger.info("Completed job %s" % (self.job_db.job_id))
 
-        end_time = time.time()
-        pod_name = dictionary.get_key_from_dict(self.metadata,
-                                                'pod_name',
-                                                'Unknown')
-        version = dictionary.get_key_from_dict(self.metadata,
-                                               'version',
-                                               'Unknown')
-        scenario = dictionary.get_key_from_dict(self.metadata,
-                                                'scenario_name',
-                                                'Unknown')
-        build_tag = dictionary.get_key_from_dict(self.metadata,
-                                                 'build_tag',
-                                                 'Unknown')
-        duration = end_time - start_time
-        test_db = os.environ.get('TEST_DB_URL')
-
-        if test_db is not None:
-            # I really do not like doing this.  As our threads just
-            # terminated, their final results are still being spooled
-            # off to Carbon.  Need to give that a little time to finish
-            time.sleep(5)
-            self.logger.info("Pushing results to %s" % (test_db))
-
-            payload = self.metadata
-            payload['timestart'] = start_time
-            payload['duration'] = duration
-            payload['status'] = 'OK'
-            graphite_db = GraphiteDB()
-            payload['metrics'] = graphite_db.fetch_averages(self.job_db.job_id)
-            criteria = {}
-            criteria['block_sizes'] = self.block_sizes
-            criteria['queue_depths'] = self.queue_depths
-
-            try:
-                test_results_db.push_results_to_db(test_db,
-                                                   "storperf",
-                                                   "Latency Test",
-                                                   start_time,
-                                                   end_time,
-                                                   self.logger,
-                                                   pod_name,
-                                                   version,
-                                                   scenario,
-                                                   criteria,
-                                                   build_tag,
-                                                   payload)
-            except:
-                self.logger.exception("Error pushing results into Database")
-
+        self.end_time = time.time()
         self._terminated = True
+        self.broadcast_event()
 
     def execute_on_node(self, workload):
 
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
new file mode 100644 (file)
index 0000000..03c764c
--- /dev/null
@@ -0,0 +1,79 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import logging
+import os
+
+from storperf.db import test_results_db
+from storperf.db.graphite_db import GraphiteDB
+from storperf.utilities import dictionary
+
+
+class DataHandler(object):
+
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+
+    """
+    """
+
+    def data_event(self, executor):
+        self.logger.info("Event received")
+
+        # Data lookup
+
+        if executor.terminated:
+            self._push_to_db(executor)
+
+    def _push_to_db(self, executor):
+        test_db = os.environ.get('TEST_DB_URL')
+
+        if test_db is not None:
+            pod_name = dictionary.get_key_from_dict(executor.metadata,
+                                                    'pod_name',
+                                                    'Unknown')
+            version = dictionary.get_key_from_dict(executor.metadata,
+                                                   'version',
+                                                   'Unknown')
+            scenario = dictionary.get_key_from_dict(executor.metadata,
+                                                    'scenario_name',
+                                                    'Unknown')
+            build_tag = dictionary.get_key_from_dict(executor.metadata,
+                                                     'build_tag',
+                                                     'Unknown')
+            duration = executor.end_time - executor.start_time
+
+            self.logger.info("Pushing results to %s" % (test_db))
+
+            payload = executor.metadata
+            payload['timestart'] = executor.start_time
+            payload['duration'] = duration
+            payload['status'] = 'OK'
+            graphite_db = GraphiteDB()
+            payload['metrics'] = graphite_db.fetch_averages(
+                executor.job_db.job_id)
+            criteria = {}
+            criteria['block_sizes'] = executor.block_sizes
+            criteria['queue_depths'] = executor.queue_depths
+
+            try:
+                test_results_db.push_results_to_db(test_db,
+                                                   "storperf",
+                                                   "Latency Test",
+                                                   executor.start_time,
+                                                   executor.end_time,
+                                                   self.logger,
+                                                   pod_name,
+                                                   version,
+                                                   scenario,
+                                                   criteria,
+                                                   build_tag,
+                                                   payload)
+            except:
+                self.logger.exception("Error pushing results into Database")
index fe3d9f1..ccfb9cc 100644 (file)
@@ -7,14 +7,15 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-from storperf.db.job_db import JobDB
-from storperf.workloads.rr import rr
 import os
 import sqlite3
 import unittest
 
 import mock
 
+from storperf.db.job_db import JobDB
+from storperf.workloads.rr import rr
+
 
 class JobDBTest(unittest.TestCase):
 
diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py
new file mode 100644 (file)
index 0000000..482b98e
--- /dev/null
@@ -0,0 +1,76 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import os
+import unittest
+
+import mock
+
+from storperf.utilities.data_handler import DataHandler
+
+
+class MockGraphiteDB(object):
+
+    def __init__(self):
+        self.called = False
+
+    def fetch_averages(self, job_id):
+        self.called = True
+        return None
+
+
+class DataHandlerTest(unittest.TestCase):
+
+    def setUp(self):
+        self.event_listeners = set()
+        self.data_handler = DataHandler()
+        self._terminated = False
+        self.args = None
+        self.start_time = 0
+        self.end_time = 1
+        self.metadata = {}
+        self.block_sizes = "1"
+        self.queue_depths = "1"
+        mock.job_id = "1"
+        self.job_db = mock
+        self.pushed = False
+        pass
+
+    @property
+    def terminated(self):
+        return self._terminated
+
+    def push_results_to_db(self, *args):
+        self.pushed = True
+        pass
+
+    def test_not_terminated_report(self):
+        self.data_handler.data_event(self)
+
+    @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+    @mock.patch("storperf.db.test_results_db.push_results_to_db")
+    @mock.patch("storperf.utilities.data_handler.GraphiteDB")
+    def test_terminated_report(self, mock_graphite_db, mock_results_db):
+        self._terminated = True
+        mock_results_db.side_effect = self.push_results_to_db
+        mock_graphite_db.side_effect = MockGraphiteDB
+
+        self.data_handler.data_event(self)
+        self.assertEqual(True, self.pushed)
+
+    @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+    @mock.patch("storperf.db.test_results_db.push_results_to_db")
+    @mock.patch("storperf.utilities.data_handler.GraphiteDB")
+    def test_non_terminated_report(self, mock_graphite_db, mock_results_db):
+        self._terminated = False
+        mock_results_db.side_effect = self.push_results_to_db
+        mock_graphite_db.side_effect = MockGraphiteDB
+
+        self.data_handler.data_event(self)
+        self.assertEqual(False, self.pushed)