Fix multiple workload runs
author    mbeierl <mark.beierl@dell.com>
Fri, 20 Jan 2017 21:05:24 +0000 (16:05 -0500)
committer mbeierl <mark.beierl@dell.com>
Fri, 20 Jan 2017 21:05:24 +0000 (16:05 -0500)
Change reporting so that each workload in a single job execution is
reported separately instead of overwriting the previous workload's
results.

Change the daily job to use a single run containing multiple workloads.

Change-Id: I8e350350ae13d2272b584af7a60ad269de160587
JIRA: STORPERF-98
Signed-off-by: mbeierl <mark.beierl@dell.com>
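
For context, a minimal sketch of the bug being fixed (illustrative
dicts, not the exact StorPerf metadata): the old path stored every
workload's metrics under the same key, so each run clobbered the one
before it.

    # Illustrative only: the old reporting path stored every workload's
    # metrics under the same key, so each run overwrote the previous one.
    metadata = {}
    for workload, metrics in [('ws', {'lat.mean': 17}),
                              ('rw', {'lat.mean': 42})]:
        metadata['report_data'] = metrics  # old: 'rw' clobbers 'ws'

    # The fix keys results by workload so every run survives the job.
    metadata['report_data'] = {}
    for workload, metrics in [('ws', {'lat.mean': 17}),
                              ('rw', {'lat.mean': 42})]:
        metadata['report_data'][workload] = metrics  # one entry per workload
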
ci/daily.sh
storperf/test_executor.py
storperf/utilities/data_handler.py
storperf/utilities/thread_gate.py
tests/utilities_tests/data_handler_test.py

diff --git a/ci/daily.sh b/ci/daily.sh
index e3b64cc..1e77d67 100755
--- a/ci/daily.sh
+++ b/ci/daily.sh
@@ -96,32 +96,22 @@ do
     | awk '/Status/ {print $2}' | sed 's/"//g'`
 done
 
-for WORKLOAD in ws wr rs rr rw
+export QUEUE_DEPTH=1,2,8
+export BLOCK_SIZE=2048,8192,16384
+export WORKLOAD=ws,wr,rs,rr,rw
+export SCENARIO_NAME="${CINDER_BACKEND}_${WORKLOAD}"
+
+JOB=`$WORKSPACE/ci/start_job.sh \
+    | awk '/job_id/ {print $2}' | sed 's/"//g'`
+JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
+    | awk '/Status/ {print $2}' | sed 's/"//g'`
+while [ "$JOB_STATUS" != "Completed" ]
 do
-    for BLOCK_SIZE in 2048 8192 16384
-    do
-        for QUEUE_DEPTH in 1 2 8
-        do
-            export QUEUE_DEPTH
-            export BLOCK_SIZE
-            export WORKLOAD
-            export SCENARIO_NAME="${CINDER_BACKEND}_${WORKLOAD}"
-
-            JOB=`$WORKSPACE/ci/start_job.sh \
-                | awk '/job_id/ {print $2}' | sed 's/"//g'`
-            JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
-                | awk '/Status/ {print $2}' | sed 's/"//g'`
-            while [ "$JOB_STATUS" != "Completed" ]
-            do
-                sleep 60
-                JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
-                    | awk '/Status/ {print $2}' | sed 's/"//g'`
-            done
-        done
-    done
+    sleep 60
+    JOB_STATUS=`curl -s -X GET "http://127.0.0.1:5000/api/v1.0/jobs?id=$JOB&type=status" \
+        | awk '/Status/ {print $2}' | sed 's/"//g'`
 done
 
-
 echo "Deleting stack for cleanup"
 curl -X DELETE --header 'Accept: application/json' 'http://127.0.0.1:5000/api/v1.0/configurations'
 
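With the nested loops removed, the comma-separated exports above hand
the entire parameter matrix to StorPerf in one job submission, and the
server iterates the combinations itself. A rough sketch of that
expansion, assuming a plain cross product (the real iteration lives in
StorPerf's test execution, not in this script):

    # Rough sketch of how the comma-separated parameters expand into
    # individual runs; the actual expansion happens server-side.
    from itertools import product

    queue_depths = "1,2,8".split(",")
    block_sizes = "2048,8192,16384".split(",")
    workloads = "ws,wr,rs,rr,rw".split(",")

    for w, qd, bs in product(workloads, queue_depths, block_sizes):
        # 5 workloads x 3 queue depths x 3 block sizes = 45 runs,
        # all reported under a single job id.
        print("%s.queue-depth.%s.block-size.%s" % (w, qd, bs))
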
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 9c9393f..cda6c78 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -177,9 +177,9 @@ class TestExecutor(object):
     def terminate(self):
         self._terminated = True
         self.end_time = time.time()
-        return self._terminate_current_run()
+        return self.terminate_current_run()
 
-    def _terminate_current_run(self):
+    def terminate_current_run(self):
         self.logger.info("Terminating current run")
         terminated_hosts = []
         for workload in self._workload_executors:
@@ -243,7 +243,7 @@ class TestExecutor(object):
                     if self.deadline is not None \
                             and not workload_name.startswith("_"):
                         event = scheduler.enter(self.deadline * 60, 1,
-                                                self._terminate_current_run,
+                                                self.terminate_current_run,
                                                 ())
                         t = Thread(target=scheduler.run, args=())
                         t.start()
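
The rename from _terminate_current_run to terminate_current_run makes
the per-workload stop part of the executor's public interface, so the
data handler can end just the run that reached steady state while the
job continues with the remaining combinations. A simplified sketch of
the two termination paths (the real TestExecutor carries much more
state):

    # Simplified sketch, not the real class.
    class ExecutorSketch(object):
        def terminate(self):
            # Ends the whole job: nothing further is scheduled.
            self._terminated = True
            return self.terminate_current_run()

        def terminate_current_run(self):
            # Public since this change: stops only the workload that is
            # currently running; the job moves on to the next one.
            terminated_hosts = []
            return terminated_hosts
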
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
index ebc1bfd..0aae3b1 100644
--- a/storperf/utilities/data_handler.py
+++ b/storperf/utilities/data_handler.py
@@ -17,6 +17,7 @@ from storperf.utilities import math as math
 from storperf.utilities import steady_state as SteadyState
 from time import sleep
 import time
+import json
 
 
 class DataHandler(object):
@@ -61,13 +62,21 @@ class DataHandler(object):
                     if not steady:
                         steady_state = False
 
-            executor.metadata['report_data'] = metrics
-            executor.metadata['steady_state'] = steady_state
+            workload = '.'.join(executor.current_workload.split('.')[1:6])
+
+            if 'report_data' not in executor.metadata:
+                executor.metadata['report_data'] = {}
+
+            if 'steady_state' not in executor.metadata:
+                executor.metadata['steady_state'] = {}
+
+            executor.metadata['report_data'][workload] = metrics
+            executor.metadata['steady_state'][workload] = steady_state
 
             workload_name = executor.current_workload.split('.')[1]
 
             if steady_state and not workload_name.startswith('_'):
-                executor.terminate()
+                executor.terminate_current_run()
 
     def _lookup_prior_data(self, executor, metric, io_type):
         workload = executor.current_workload
@@ -112,7 +121,7 @@ class DataHandler(object):
         duration = latest_timestamp - earliest_timestamp
         if (duration < 60 * self.samples):
             self.logger.debug("Only %s minutes of samples, ignoring" %
-                              (duration / 60,))
+                              ((duration / 60 + 1),))
             return False
 
         return SteadyState.steady_state(data_series)
@@ -160,6 +169,6 @@ class DataHandler(object):
                                                    scenario,
                                                    criteria,
                                                    build_tag,
-                                                   payload)
+                                                   json.dumps(payload))
             except:
                 self.logger.exception("Error pushing results into Database")
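
After these changes a job's metadata accumulates one entry per workload
under both keys. Roughly, with invented values (the keys follow the
'<workload>.queue-depth.<qd>.block-size.<bs>' pattern that
'.'.join(executor.current_workload.split('.')[1:6]) produces):

    import json

    # Invented values, for shape only.
    metadata = {
        'report_data': {
            'rw.queue-depth.8.block-size.8192': {
                'lat.mean': {'read': {'slope': 0.3, 'range': 9.0,
                                      'average': 42.5,
                                      'series': [41, 42, 44]}},
            },
            'ws.queue-depth.1.block-size.2048': {
                'lat.mean': {'write': {'slope': 0.1, 'range': 4.2,
                                       'average': 12.8,
                                       'series': [11, 13, 14]}},
            },
        },
        'steady_state': {
            'rw.queue-depth.8.block-size.8192': True,
            'ws.queue-depth.1.block-size.2048': False,
        },
    }

    # json.dumps serializes the nested dict before the push to the
    # results database, which appears to expect a string payload.
    payload = json.dumps(metadata)
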
diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py
index 295b8be..38acbb1 100644
--- a/storperf/utilities/thread_gate.py
+++ b/storperf/utilities/thread_gate.py
@@ -12,6 +12,7 @@ number of callers.
 """
 import logging
 import time
+from threading import Lock
 
 
 class FailureToReportException(Exception):
@@ -26,6 +27,7 @@ class ThreadGate(object):
         self._timeout = timeout
         self._registrants = {}
         self._creation_time = time.time()
+        self._lock = Lock()
 
     """
     Calling this method returns a true or false, indicating that enough
@@ -33,31 +35,33 @@ class ThreadGate(object):
     """
 
     def report(self, gate_id):
-        now = time.time()
-        self._registrants[gate_id] = now
-        ready = True
-        self.logger.debug("Gate report for %s", gate_id)
-
-        total_missing = self._gate_size - len(self._registrants)
-        if total_missing > 0:
-            self.logger.debug("Not all registrants have reported in")
-            time_since_creation = now - self._creation_time
-            if (time_since_creation > (self._timeout * 2)):
-                self.logger.error("%s registrant(s) have never reported in",
-                                  total_missing)
-                raise FailureToReportException
-            return False
-
-        for k, v in self._registrants.items():
-            time_since_last_report = now - v
-            if time_since_last_report > self._timeout:
-                self.logger.debug("Registrant %s last reported %s ago",
-                                  k, time_since_last_report)
-                ready = False
-
-        self.logger.debug("Gate pass? %s", ready)
-
-        if ready:
-            self._registrants.clear()
-
-        return ready
+        with self._lock:
+            now = time.time()
+            self._registrants[gate_id] = now
+            ready = True
+            self.logger.debug("Gate report for %s", gate_id)
+
+            total_missing = self._gate_size - len(self._registrants)
+            if total_missing > 0:
+                self.logger.debug("Not all registrants have reported in")
+                time_since_creation = now - self._creation_time
+                if (time_since_creation > (self._timeout * 2)):
+                    self.logger.error(
+                        "%s registrant(s) have never reported in",
+                        total_missing)
+                    raise FailureToReportException
+                return False
+
+            for k, v in self._registrants.items():
+                time_since_last_report = now - v
+                if time_since_last_report > self._timeout:
+                    self.logger.debug("Registrant %s last reported %s ago",
+                                      k, time_since_last_report)
+                    ready = False
+
+            self.logger.debug("Gate pass? %s", ready)
+
+            if ready:
+                self._registrants.clear()
+
+            return ready
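
Serializing report() behind a single Lock means the read-check-clear
sequence on _registrants can no longer interleave across threads. A
minimal usage sketch, assuming a ThreadGate(gate_size, timeout)
constructor (parameter names guessed from the attributes above):

    from threading import Thread

    from storperf.utilities.thread_gate import ThreadGate

    # Assumes ThreadGate(gate_size, timeout); names guessed from the
    # _gate_size and _timeout attributes in the diff.
    gate = ThreadGate(2, 60)
    results = []

    def worker(gate_id):
        # With the lock held for the whole report, the first caller
        # sees a missing registrant and gets False; the second sees a
        # full gate, clears it, and gets True.
        results.append(gate.report(gate_id))

    threads = [Thread(target=worker, args=(i,)) for i in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    assert sorted(results) == [False, True]
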
diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py
index b175c87..8115c6d 100644
--- a/tests/utilities_tests/data_handler_test.py
+++ b/tests/utilities_tests/data_handler_test.py
@@ -57,6 +57,9 @@ class DataHandlerTest(unittest.TestCase):
     def terminate(self):
         self._terminated = True
 
+    def terminate_current_run(self):
+        self._terminated = True
+
     @mock.patch("time.time")
     @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
     @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
@@ -163,18 +166,22 @@ class DataHandlerTest(unittest.TestCase):
         self.assertEqual(False, self._terminated)
 
         self.assertEqual(expected_slope, self.metadata['report_data']
+                         ['rw.queue-depth.8.block-size.8192']
                          ['lat.mean']
                          ['read']
                          ['slope'])
         self.assertEqual(expected_range, self.metadata['report_data']
+                         ['rw.queue-depth.8.block-size.8192']
                          ['lat.mean']
                          ['read']
                          ['range'])
         self.assertEqual(expected_average, self.metadata['report_data']
+                         ['rw.queue-depth.8.block-size.8192']
                          ['lat.mean']
                          ['read']
                          ['average'])
         self.assertEqual(series, self.metadata['report_data']
+                         ['rw.queue-depth.8.block-size.8192']
                          ['lat.mean']
                          ['read']
                          ['series'])
@@ -211,18 +218,22 @@ class DataHandlerTest(unittest.TestCase):
         self.data_handler.data_event(self)
 
         self.assertEqual(expected_slope, self.metadata['report_data']
+                         ['rw.queue-depth.8.block-size.8192']
                          ['lat.mean']
                          ['read']
                          ['slope'])
         self.assertEqual(expected_range, self.metadata['report_data']
+                         ['rw.queue-depth.8.block-size.8192']
                          ['lat.mean']
                          ['read']
                          ['range'])
         self.assertEqual(expected_average, self.metadata['report_data']
+                         ['rw.queue-depth.8.block-size.8192']
                          ['lat.mean']
                          ['read']
                          ['average'])
         self.assertEqual(series, self.metadata['report_data']
+                         ['rw.queue-depth.8.block-size.8192']
                          ['lat.mean']
                          ['read']
                          ['series'])