from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
+from storperf.db.job_db import JobDB
from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import dictionary
from storperf.utilities import math as math
def __init__(self):
self.logger = logging.getLogger(__name__)
self.samples = 10
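+ # JobDB supplies the workload start time used to normalize report series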
+ self.job_db = JobDB()
"""
"""
metrics[metric][io_type] = {}
series = self._lookup_prior_data(executor, metric, io_type)
+ series = self._convert_timestamps_to_samples(
+ executor, series)
steady = self._evaluate_prior_data(series)
self.logger.debug("Steady state for %s %s: %s"
return data_series
+ def _convert_timestamps_to_samples(self, executor, series):
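+ # Translate the absolute timestamps returned by Graphite into 1-based
+ # sample numbers (one sample per minute), measured from the start time
+ # of the current workload as recorded in the job database.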
+ workload_record = self.job_db.fetch_workloads(
+ executor.current_workload)
+ start_time = int(workload_record[0][1])
+
+ normalized_series = []
+
+ for item in series:
+ # elapsed seconds since the workload started
+ elapsed = (item[0] - start_time)
+ # one sample per minute, numbered from 1; floor division keeps the
+ # sample numbers integral
+ sample_number = (elapsed // 60) + 1
+ normalized_series.append([sample_number, item[1]])
+
+ return normalized_series
+
def _evaluate_prior_data(self, data_series):
self.logger.debug("Data series: %s" % data_series)
- if len(data_series) == 0:
- return False
number_of_samples = len(data_series)
+
+ if number_of_samples == 0:
+ return False
if (number_of_samples < self.samples):
self.logger.debug("Only %s samples, ignoring" % number_of_samples)
return False
return SteadyState.steady_state(data_series)
def _push_to_db(self, executor):
-
pod_name = dictionary.get_key_from_dict(executor.metadata,
'pod_name',
'Unknown')
@mock.patch("time.time")
@mock.patch("storperf.db.test_results_db.push_results_to_db")
@mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
- def test_non_terminated_report(self, mock_graphite_db, mock_results_db,
- mock_time):
+ @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads")
+ def test_non_terminated_report(self, mock_job_db, mock_graphite_db,
+ mock_results_db, mock_time):
self._terminated = False
mock_results_db.side_effect = self.push_results_to_db
series = \
[1480456050, 217.75]]
mock_graphite_db.return_value = series
mock_time.return_value = series[-1][0] + 10
- expected_slope = 0.1185333530108134
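+ # the expected slope is now taken over sample numbers rather than raw
+ # timestamps, hence the larger value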
+ expected_slope = 11.48297119140625
expected_range = 17.78
expected_average = 212.49777777777774
8,
8192))
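+ # fetch_workloads is mocked to return a single
+ # [workload name, start time, end time] record; its start time anchors
+ # the timestamp-to-sample conversion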
+ mock_job_db.return_value = [[self.current_workload, 4804559000, None]]
+
self.data_handler.data_event(self)
self.assertEqual(False, self.pushed)
self.assertEqual(False, self._terminated)
['lat.mean']
['read']
['average'])
- self.assertEqual(series, self.metadata['report_data']
- ['rw.queue-depth.8.block-size.8192']
- ['lat.mean']
- ['read']
- ['series'])
@mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
@mock.patch("time.time")
@mock.patch("storperf.db.test_results_db.push_results_to_db")
@mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+ @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads")
def test_report_that_causes_termination(self,
+ mock_job_db,
mock_graphite_db,
mock_results_db,
mock_time):
[4804560300, 219.37],
[4804560400, 219.28],
[4804560500, 217.75]]
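+ # the series above converted to sample numbers relative to the mocked
+ # start time of 4804559000, e.g. (4804560500 - 4804559000) // 60 + 1 = 26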
+ report_data = [[2, 205.345],
+ [4, 201.59],
+ [6, 205.76],
+ [7, 205.76],
+ [9, 205.76],
+ [11, 205.76],
+ [12, 205.76],
+ [22, 219.37],
+ [24, 219.28],
+ [26, 217.75]]
mock_graphite_db.return_value = series
mock_time.return_value = 4804560500 + 10
- expected_slope = 0.01266822319352225
+ expected_slope = 0.7318639667704995
expected_range = 17.78
expected_average = 209.2135
8,
8192))
+ mock_job_db.return_value = [[self.current_workload, 4804559000, None]]
+
self.data_handler.data_event(self)
self.assertEqual(expected_slope, self.metadata['report_data']
['lat.mean']
['read']
['average'])
- self.assertEqual(series, self.metadata['report_data']
+ self.assertEqual(report_data, self.metadata['report_data']
['rw.queue-depth.8.block-size.8192']
['lat.mean']
['read']