Normalize data series 57/28757/1
author mbeierl <mark.beierl@dell.com>
Wed, 15 Feb 2017 21:38:29 +0000 (16:38 -0500)
committer mbeierl <mark.beierl@dell.com>
Wed, 15 Feb 2017 21:38:29 +0000 (16:38 -0500)
Changes the data series from using seconds to samples
so that slope is not artificially flattened.

Change-Id: Idf87926a47c2ba67e66e2254d3572adad7a81b44
JIRA: STORPERF-106
Signed-off-by: mbeierl <mark.beierl@dell.com>
storperf/utilities/data_handler.py
tests/utilities_tests/data_handler_test.py

index b62d37b..e38f502 100644 (file)
@@ -14,6 +14,7 @@ import time
 
 from storperf.db import test_results_db
 from storperf.db.graphite_db import GraphiteDB
+from storperf.db.job_db import JobDB
 from storperf.utilities import data_treatment as DataTreatment
 from storperf.utilities import dictionary
 from storperf.utilities import math as math
@@ -25,6 +26,7 @@ class DataHandler(object):
     def __init__(self):
         self.logger = logging.getLogger(__name__)
         self.samples = 10
+        self.job_db = JobDB()
 
     """
     """
@@ -43,6 +45,8 @@ class DataHandler(object):
                     metrics[metric][io_type] = {}
 
                     series = self._lookup_prior_data(executor, metric, io_type)
+                    series = self._convert_timestamps_to_samples(
+                        executor, series)
                     steady = self._evaluate_prior_data(series)
 
                     self.logger.debug("Steady state for %s %s: %s"
@@ -112,11 +116,26 @@ class DataHandler(object):
 
         return data_series
 
+    def _convert_timestamps_to_samples(self, executor, series):
+        workload_record = self.job_db.fetch_workloads(
+            executor.current_workload)
+        start_time = int(workload_record[0][1])
+
+        normalized_series = []
+
+        for item in series:
+            elapsed = (item[0] - start_time)
+            sample_number = (elapsed / 60) + 1
+            normalized_series.append([sample_number, item[1]])
+
+        return normalized_series
+
     def _evaluate_prior_data(self, data_series):
         self.logger.debug("Data series: %s" % data_series)
-        if len(data_series) == 0:
-            return False
         number_of_samples = len(data_series)
+
+        if number_of_samples == 0:
+            return False
         if (number_of_samples < self.samples):
             self.logger.debug("Only %s samples, ignoring" % number_of_samples)
             return False
@@ -124,7 +143,6 @@ class DataHandler(object):
         return SteadyState.steady_state(data_series)
 
     def _push_to_db(self, executor):
-
         pod_name = dictionary.get_key_from_dict(executor.metadata,
                                                 'pod_name',
                                                 'Unknown')
index 90df0f6..93b0b97 100644 (file)
@@ -150,8 +150,9 @@ class DataHandlerTest(unittest.TestCase):
     @mock.patch("time.time")
     @mock.patch("storperf.db.test_results_db.push_results_to_db")
     @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
-    def test_non_terminated_report(self, mock_graphite_db, mock_results_db,
-                                   mock_time):
+    @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads")
+    def test_non_terminated_report(self, mock_job_db, mock_graphite_db,
+                                   mock_results_db, mock_time):
         self._terminated = False
         mock_results_db.side_effect = self.push_results_to_db
         series = \
@@ -166,7 +167,7 @@ class DataHandlerTest(unittest.TestCase):
              [1480456050, 217.75]]
         mock_graphite_db.return_value = series
         mock_time.return_value = series[-1][0] + 10
-        expected_slope = 0.1185333530108134
+        expected_slope = 11.48297119140625
         expected_range = 17.78
         expected_average = 212.49777777777774
 
@@ -176,6 +177,8 @@ class DataHandlerTest(unittest.TestCase):
                                   8,
                                   8192))
 
+        mock_job_db.return_value = [[self.current_workload, 4804559000, None]]
+
         self.data_handler.data_event(self)
         self.assertEqual(False, self.pushed)
         self.assertEqual(False, self._terminated)
@@ -195,17 +198,14 @@ class DataHandlerTest(unittest.TestCase):
                          ['lat.mean']
                          ['read']
                          ['average'])
-        self.assertEqual(series, self.metadata['report_data']
-                         ['rw.queue-depth.8.block-size.8192']
-                         ['lat.mean']
-                         ['read']
-                         ['series'])
 
     @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
     @mock.patch("time.time")
     @mock.patch("storperf.db.test_results_db.push_results_to_db")
     @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+    @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads")
     def test_report_that_causes_termination(self,
+                                            mock_job_db,
                                             mock_graphite_db,
                                             mock_results_db,
                                             mock_time):
@@ -221,10 +221,20 @@ class DataHandlerTest(unittest.TestCase):
                   [4804560300, 219.37],
                   [4804560400, 219.28],
                   [4804560500, 217.75]]
+        report_data = [[2, 205.345],
+                       [4, 201.59],
+                       [6, 205.76],
+                       [7, 205.76],
+                       [9, 205.76],
+                       [11, 205.76],
+                       [12, 205.76],
+                       [22, 219.37],
+                       [24, 219.28],
+                       [26, 217.75]]
         mock_graphite_db.return_value = series
         mock_time.return_value = 4804560500 + 10
 
-        expected_slope = 0.01266822319352225
+        expected_slope = 0.7318639667704995
         expected_range = 17.78
         expected_average = 209.2135
 
@@ -234,6 +244,8 @@ class DataHandlerTest(unittest.TestCase):
                                   8,
                                   8192))
 
+        mock_job_db.return_value = [[self.current_workload, 4804559000, None]]
+
         self.data_handler.data_event(self)
 
         self.assertEqual(expected_slope, self.metadata['report_data']
@@ -251,7 +263,7 @@ class DataHandlerTest(unittest.TestCase):
                          ['lat.mean']
                          ['read']
                          ['average'])
-        self.assertEqual(series, self.metadata['report_data']
+        self.assertEqual(report_data, self.metadata['report_data']
                          ['rw.queue-depth.8.block-size.8192']
                          ['lat.mean']
                          ['read']