Steady state detection (Gerrit change 45/25445/3)
author Mark Beierl <mark.beierl@dell.com>
Sat, 3 Dec 2016 03:25:48 +0000 (22:25 -0500)
committer mbeierl <mark.beierl@dell.com>
Mon, 5 Dec 2016 18:11:08 +0000 (13:11 -0500)
Detection of steady state after 10+ samples of data

Change-Id: I29368b06188c6370d17b3d567fece6486d171235
JIRA: STORPERF-72 STORPERF-73
Signed-off-by: Mark Beierl <mark.beierl@dell.com>
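
In outline: a new DataHandler listens for test events, reads the freshest samples for latency, IOPS, and bandwidth back out of Graphite after each reporting interval, and computes slope, range, and average over each series. Once a series spans at least 60 * samples seconds (samples defaults to 11) and every metric is steady, the executor terminates the run early instead of waiting for the deadline.
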
17 files changed:
rest_server.py
storperf/carbon/emitter.py
storperf/db/graphite_db.py
storperf/db/test_results_db.py
storperf/fio/fio_invoker.py
storperf/logging.json
storperf/storperf_master.py
storperf/test_executor.py
storperf/utilities/data_handler.py
storperf/utilities/math.py
storperf/utilities/steady_state.py
storperf/utilities/thread_gate.py
storperf/workloads/_base_workload.py
tests/carbon_tests/emitter_test.py
tests/db_tests/graphite_db_test.py
tests/utilities_tests/data_handler_test.py
tests/utilities_tests/math_slope_test.py

diff --git a/rest_server.py b/rest_server.py
index d852bbb..40b9b77 100644 (file)
@@ -12,15 +12,15 @@ import json
 import logging
 import logging.config
 import os
+from storperf.db.job_db import JobDB
+from storperf.plot.barchart import Barchart
+from storperf.storperf_master import StorPerfMaster
+import sys
 
 from flask import abort, Flask, request, jsonify, send_from_directory
 from flask_restful import Resource, Api, fields
 from flask_restful_swagger import swagger
 
-from storperf.db.job_db import JobDB
-from storperf.plot.barchart import Barchart
-from storperf.storperf_master import StorPerfMaster
-
 
 app = Flask(__name__, static_url_path="")
 api = swagger.docs(Api(app), apiVersion='1.0')
@@ -372,6 +372,8 @@ prior to running any further tests,
         ]
     )
     def delete(self):
+        self.logger.info("Threads: %s" % sys._current_frames())
+        print sys._current_frames()
         try:
             return jsonify({'Slaves': storperf.terminate_workloads()})
         except Exception as e:
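
sys._current_frames() returns a mapping of live thread ids to their current stack frames, so dumping it on DELETE shows which worker threads are still alive when a termination request arrives.
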
diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py
index c9af8a6..e23dc79 100644 (file)
@@ -22,9 +22,8 @@ class CarbonMetricTransmitter():
 
     def transmit_metrics(self, metrics):
         if 'timestamp' in metrics:
-            timestamp = metrics.pop('timestamp')
-        else:
-            timestamp = str(calendar.timegm(time.gmtime()))
+            metrics.pop('timestamp')
+        timestamp = str(calendar.timegm(time.gmtime()))
 
         carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         carbon_socket.connect((self.carbon_host, self.carbon_port))
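
With this change the transmitter ignores any timestamp embedded in the metrics and stamps every batch at transmission time. A minimal sketch of the timestamping expression, matching the code above:

    import calendar
    import time

    # Current UTC time as integer epoch seconds, the format Carbon expects.
    timestamp = str(calendar.timegm(time.gmtime()))

The updated emitter_test below mocks time.gmtime to pin this value.
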
diff --git a/storperf/db/graphite_db.py b/storperf/db/graphite_db.py
index c44d2aa..8be91c8 100644 (file)
@@ -1,4 +1,3 @@
-from storperf.db.job_db import JobDB
 import calendar
 import json
 import logging
@@ -6,6 +5,8 @@ import time
 
 import requests
 
+from storperf.db.job_db import JobDB
+
 
 class GraphiteDB(object):
 
@@ -15,6 +16,27 @@ class GraphiteDB(object):
         self._job_db = JobDB()
         self.logger = logging.getLogger(__name__)
 
+    def fetch_series(self, workload, metric, io_type, time, duration):
+
+        series = []
+        end = time
+        start = end - duration
+
+        request = ("http://127.0.0.1:8000/render/?target="
+                   "averageSeries(%s.*.jobs.1.%s.%s)"
+                   "&format=json"
+                   "&from=%s"
+                   "&until=%s" %
+                   (workload, io_type, metric,
+                    start, end))
+        self.logger.debug("Calling %s" % (request))
+
+        response = requests.get(request)
+        if (response.status_code == 200):
+            series = self._series_results(json.loads(response.content))
+
+        return series
+
     def fetch_averages(self, workload):
         workload_executions = self._job_db.fetch_workloads(workload)
 
@@ -115,6 +137,18 @@ class GraphiteDB(object):
 
         return average
 
+    def _series_results(self, results):
+
+        series = []
+
+        for item in results:
+            datapoints = item['datapoints']
+            for datapoint in datapoints:
+                if datapoint[0] is not None:
+                    series.append([datapoint[1], datapoint[0]])
+
+        return series
+
     def make_fullname_pattern(self, workload):
         parts = workload.split('.')
         wildcards_needed = 7 - len(parts)
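
fetch_series asks the local Graphite render API for an averaged series over the window [time - duration, time] and flattens the JSON response into [timestamp, value] pairs, with _series_results dropping null datapoints. A hedged usage sketch (the workload prefix is illustrative):

    from storperf.db.graphite_db import GraphiteDB
    import time

    graphite = GraphiteDB()
    # Last 10 minutes of averaged write IOPS for one (hypothetical) run.
    series = graphite.fetch_series(
        "job-id.rw.queue-depth.8.block-size.8192",  # hypothetical prefix
        "iops", "write", int(time.time()), 600)
    # series is ordered [[timestamp, value], ...]
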
diff --git a/storperf/db/test_results_db.py b/storperf/db/test_results_db.py
index 4ee7a52..049b0b8 100644 (file)
@@ -44,7 +44,8 @@ def push_results_to_db(db_url, project, case_name,
     headers = {'Content-Type': 'application/json'}
     try:
         if logger:
-            logger.debug("Pushing results to %s" % (url))
+            logger.info("Pushing results to %s" % (url))
+            logger.debug("Parameters: %s" % params)
         r = requests.post(url, data=json.dumps(params), headers=headers)
         if logger:
             logger.debug(r)
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index 315b243..2febf25 100644 (file)
@@ -54,12 +54,14 @@ class FIOInvoker(object):
 
                         for event_listener in self.event_listeners:
                             try:
+                                self.logger.debug(
+                                    "Event listener callback")
                                 event_listener(self.callback_id, json_metric)
                             except Exception, e:
                                 self.logger.exception(
                                     "Notifying listener %s: %s",
                                     self.callback_id, e)
-                            self.logger.info(
+                            self.logger.debug(
                                 "Event listener callback complete")
                 except Exception, e:
                     self.logger.error("Error parsing JSON: %s", e)
@@ -78,7 +80,6 @@ class FIOInvoker(object):
         self.logger.debug("Finished")
 
     def execute(self, args=[]):
-
         ssh = paramiko.SSHClient()
         ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
         ssh.connect(self.remote_host, username='storperf',
@@ -87,7 +88,12 @@ class FIOInvoker(object):
 
         command = "sudo ./fio " + ' '.join(args)
         self.logger.debug("Remote command: %s" % command)
-        (_, stdout, stderr) = ssh.exec_command(command)
+
+        chan = ssh.get_transport().open_session(timeout=None)
+        chan.settimeout(None)
+        chan.exec_command(command)
+        stdout = chan.makefile('r', -1)
+        stderr = chan.makefile_stderr('r', -1)
 
         tout = Thread(target=self.stdout_handler, args=(stdout,),
                       name="%s stdout" % self._remote_host)
@@ -100,9 +106,18 @@ class FIOInvoker(object):
         terr.start()
 
         self.logger.info("Started fio on " + self.remote_host)
+        exit_status = chan.recv_exit_status()
+        self.logger.info("Finished fio on %s with exit code %s" %
+                         (self.remote_host, exit_status))
+
+        stdout.close()
+        stderr.close()
+
+        self.logger.debug("Joining stderr handler")
         terr.join()
+        self.logger.debug("Joining stdout handler")
         tout.join()
-        self.logger.info("Finished fio on " + self.remote_host)
+        self.logger.debug("Ended")
 
     def terminate(self):
         self.logger.debug("Terminating fio on " + self.remote_host)
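
Replacing ssh.exec_command with a raw channel is what lets the invoker block on the remote exit status before joining its output-handler threads. A minimal paramiko sketch of that pattern, assuming a reachable host and configured key:

    import paramiko

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect("slave-host", username="storperf")  # hypothetical host

    chan = ssh.get_transport().open_session()
    chan.exec_command("sudo ./fio --version")
    stdout = chan.makefile('r', -1)        # remote stdout as a file object
    exit_status = chan.recv_exit_status()  # blocks until the command exits
    print(stdout.read())
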
diff --git a/storperf/logging.json b/storperf/logging.json
index 6d6026e..74df494 100644 (file)
     },
 
     "loggers": {
-        "my_module": {
-            "level": "ERROR",
-            "handlers": ["console"],
-            "propagate": "no"
+        "": {
+            "level": "INFO",
+            "handlers": ["console", "file_handler", "error_file_handler"]
+        },
+        "storperf": {
+            "level": "DEBUG"
+        },
+        "storperf.carbon.emitter": {
+            "level": "INFO"
         }
-    },
-
-    "root": {
-        "level": "WARN",
-        "handlers": ["console", "file_handler", "error_file_handler"]
-    },
-
-    "storperf": {
-        "level": "DEBUG",
-        "handlers": ["console", "file_handler", "error_file_handler"]
-    },
-
-    "storperf.carbon.emitter": {
-        "level": "INFO",
-        "handlers": ["console", "file_handler", "error_file_handler"]
     }
-
 }
\ No newline at end of file
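
Attaching the handlers once to the root logger ("") and letting storperf and storperf.carbon.emitter set only levels relies on propagation: records bubble up to the root's console and file handlers and are emitted exactly once, instead of each named logger carrying a duplicate handler list.
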
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index 35cba72..2975afd 100644 (file)
@@ -378,9 +378,9 @@ class StorPerfMaster(object):
         (_, stdout, stderr) = ssh.exec_command(cmd)
 
         for line in stdout.readlines():
-            logger.debug(line.decode('utf-8').strip())
+            logger.debug(line.strip())
         for line in stderr.readlines():
-            logger.error(line.decode('utf-8').strip())
+            logger.error(line.strip())
 
     def _make_parameters(self):
         heat_parameters = {}
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 530ce80..8350e43 100644 (file)
@@ -21,8 +21,8 @@ from storperf.carbon.converter import Converter
 from storperf.carbon.emitter import CarbonMetricTransmitter
 from storperf.db.job_db import JobDB
 from storperf.fio.fio_invoker import FIOInvoker
+from storperf.utilities.data_handler import DataHandler
 from storperf.utilities.thread_gate import ThreadGate
-from utilities.data_handler import DataHandler
 
 
 class UnknownWorkload(Exception):
@@ -41,6 +41,7 @@ class TestExecutor(object):
         self.metadata = {}
         self.start_time = None
         self.end_time = None
+        self.current_workload = None
         self._queue_depths = [1, 4, 8]
         self._block_sizes = [512, 4096, 16384]
         self.event_listeners = set()
@@ -106,7 +107,7 @@ class TestExecutor(object):
             try:
                 event_listener(self)
             except Exception, e:
-                self.logger.error("Notifying listener: %s", e)
+                self.logger.exception("While notifying listener %s", e)
 
     def register_workloads(self, workloads):
         self.workload_modules = []
@@ -166,15 +167,17 @@ class TestExecutor(object):
         self.job_db.record_workload_params(metadata)
         self.metadata = metadata
         self._workload_thread = Thread(target=self.execute_workloads,
-                                       args=())
+                                       args=(),
+                                       name="Workload thread")
         self._workload_thread.start()
         return self.job_db.job_id
 
     def terminate(self):
         self._terminated = True
-        return self.terminate_current_run()
+        self.end_time = time.time()
+        return self._terminate_current_run()
 
-    def terminate_current_run(self):
+    def _terminate_current_run(self):
         self.logger.info("Terminating current run")
         terminated_hosts = []
         for workload in self._workload_executors:
@@ -222,14 +225,23 @@ class TestExecutor(object):
             for blocksize in blocksizes:
                 for iodepth in iodepths:
 
-                    scheduler = sched.scheduler(time.time, time.sleep)
                     if self._terminated:
                         return
+                    self.current_workload = (
+                        "%s.%s.queue-depth.%s.block-size.%s" %
+                        (self.job_db.job_id,
+                         workload_name,
+                         iodepth,
+                         blocksize))
 
+                    self.logger.info("Starting run %s" % self.current_workload)
+
+                    scheduler = sched.scheduler(time.time, time.sleep)
                     if self.deadline is not None \
                             and not workload_name.startswith("_"):
                         event = scheduler.enter(self.deadline * 60, 1,
-                                                self.terminate_current_run, ())
+                                                self._terminate_current_run,
+                                                ())
                         t = Thread(target=scheduler.run, args=())
                         t.start()
 
@@ -244,13 +256,16 @@ class TestExecutor(object):
                         self._workload_executors.append(slave_workload)
 
                         t = Thread(target=self.execute_on_node,
-                                   args=(slave_workload,))
+                                   args=(slave_workload,),
+                                   name="%s worker" % slave)
                         t.daemon = False
                         t.start()
                         slave_threads.append(t)
 
                     for slave_thread in slave_threads:
+                        self.logger.debug("Waiting on %s" % slave_thread)
                         slave_thread.join()
+                        self.logger.debug("Done waiting for %s" % slave_thread)
 
                     if not scheduler.empty():
                         try:
@@ -258,7 +273,10 @@ class TestExecutor(object):
                         except:
                             pass
 
+                    self.logger.info("Completed run %s" %
+                                     self.current_workload)
                     self._workload_executors = []
+                    self.current_workload = None
 
             self.logger.info("Completed workload %s" % (workload_name))
         self.logger.info("Completed job %s" % (self.job_db.job_id))
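
The new current_workload string doubles as the Carbon metric prefix, e.g. job_id.rw.queue-depth.8.block-size.8192 (the form the data handler tests build), which is exactly what graphite_db.fetch_series later expands into its averageSeries target.
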
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
index 03c764c..ebf715a 100644 (file)
 
 import logging
 import os
+from time import sleep
+import time
 
 from storperf.db import test_results_db
 from storperf.db.graphite_db import GraphiteDB
+from storperf.utilities import data_treatment as DataTreatment
 from storperf.utilities import dictionary
+from storperf.utilities import math as math
+from storperf.utilities import steady_state as SteadyState
 
 
 class DataHandler(object):
 
     def __init__(self):
         self.logger = logging.getLogger(__name__)
+        self.samples = 11
 
     """
     """
 
     def data_event(self, executor):
-        self.logger.info("Event received")
-
-        # Data lookup
+        self.logger.debug("Event received")
 
         if executor.terminated:
             self._push_to_db(executor)
+        else:
+            steady_state = True
+            metrics = {}
+            for metric in ('lat.mean', 'iops', 'bw'):
+                metrics[metric] = {}
+                for io_type in ('read', 'write'):
+                    metrics[metric][io_type] = {}
+
+                    series = self._lookup_prior_data(executor, metric, io_type)
+                    steady = self._evaluate_prior_data(series)
+
+                    self.logger.debug("Steady state for %s %s: %s"
+                                      % (io_type, metric, steady))
+
+                    metrics[metric][io_type]['series'] = series
+                    metrics[metric][io_type]['steady_state'] = steady
+                    treated_data = DataTreatment.data_treatment(series)
+
+                    metrics[metric][io_type]['slope'] = \
+                        math.slope(treated_data['slope_data'])
+                    metrics[metric][io_type]['range'] = \
+                        math.range_value(treated_data['range_data'])
+                    metrics[metric][io_type]['average'] = \
+                        math.average(treated_data['average_data'])
+
+                    if not steady:
+                        steady_state = False
+
+            executor.metadata['report_data'] = metrics
+            executor.metadata['steady_state'] = steady_state
+
+            if steady_state:
+                executor.terminate()
+
+    def _lookup_prior_data(self, executor, metric, io_type):
+        workload = executor.current_workload
+        graphite_db = GraphiteDB()
+
+        # A bit of a hack here as Carbon might not be finished storing the
+        # data we just sent to it
+        now = int(time.time())
+        backtime = 60 * (self.samples + 2)
+        data_series = graphite_db.fetch_series(workload,
+                                               metric,
+                                               io_type,
+                                               now,
+                                               backtime)
+        most_recent_time = now
+        if len(data_series) > 0:
+            most_recent_time = data_series[-1][0]
+
+        delta = now - most_recent_time
+        self.logger.debug("Last update to graphite was %s ago" % delta)
+
+        while (delta < 5 or (delta > 60 and delta < 120)):
+            sleep(5)
+            data_series = graphite_db.fetch_series(workload,
+                                                   metric,
+                                                   io_type,
+                                                   now,
+                                                   backtime)
+            if len(data_series) > 0:
+                most_recent_time = data_series[-1][0]
+            delta = time.time() - most_recent_time
+            self.logger.debug("Last update to graphite was %s ago" % delta)
+
+        return data_series
+
+    def _evaluate_prior_data(self, data_series):
+        self.logger.debug("Data series: %s" % data_series)
+        if len(data_series) == 0:
+            return False
+        earliest_timestamp = data_series[0][0]
+        latest_timestamp = data_series[-1][0]
+        duration = latest_timestamp - earliest_timestamp
+        if (duration < 60 * self.samples):
+            self.logger.debug("Only %s minutes of samples, ignoring" %
+                              (duration / 60,))
+            return False
+
+        return SteadyState.steady_state(data_series)
 
     def _push_to_db(self, executor):
         test_db = os.environ.get('TEST_DB_URL')
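
_lookup_prior_data keeps re-polling while the newest datapoint is under 5 seconds old (Carbon may still be flushing what was just sent, per the comment above) or between 60 and 120 seconds stale (the current one-minute interval has not landed yet). Note that inside the loop delta is recomputed against time.time() rather than the frozen now, so the wait eventually expires even if Graphite stops returning new points.
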
diff --git a/storperf/utilities/math.py b/storperf/utilities/math.py
index a11ec19..4ddddca 100644 (file)
@@ -52,8 +52,11 @@ def slope(data_series):
             sum_yi_xi += xi * yi
             sum_yi += yi
 
-        beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / \
-            (sum_xi**2 - m * sum_xi_sq)  # The slope
+        over = (sum_xi**2 - m * sum_xi_sq)
+        if over == 0:
+            beta2 = None  # Infinite slope
+        else:
+            beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over  # The slope
         # beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if
         # needed
 
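
The guard covers a degenerate denominator in the least-squares slope. In the code's notation (m points, xi the first column, yi the second):

    beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / (sum_xi**2 - m * sum_xi_sq)

which is the ordinary least-squares slope (m*Sxy - Sx*Sy) / (m*Sxx - Sx**2) with numerator and denominator both negated. With epoch-second x values the two denominator terms are on the order of 10**18 and differ only by (delta_x)**2, so double-precision cancellation can round the difference to exactly zero; returning None instead of dividing by it is what the new test_infinte_slope case at the bottom of this change exercises.
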
diff --git a/storperf/utilities/steady_state.py b/storperf/utilities/steady_state.py
index 233bc78..0bbe21e 100644 (file)
@@ -6,6 +6,8 @@
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
+import logging
+
 from storperf.utilities import data_treatment as DataTreatment
 from storperf.utilities import math as math
 
@@ -22,6 +24,8 @@ def steady_state(data_series):
     has been reached with the data that is passed to it.
     """
 
+    logger = logging.getLogger('storperf.utilities.steady_state')
+
     # Pre conditioning the data to match the algorithms
     treated_data = DataTreatment.data_treatment(data_series)
 
@@ -39,6 +43,11 @@ def steady_state(data_series):
 
         steady_state = range_condition and slope_condition
 
+        logger.debug("Range %s < %s?" % (abs(range_value),
+                                         (0.20 * abs(average_value))))
+        logger.debug("Slope %s < %s?" % (abs(slope_value),
+                                         (0.10 * abs(average_value))))
+        logger.debug("Steady State? %s" % steady_state)
     else:
         steady_state = False
 
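
The criterion itself, as the new debug lines spell out: a series is steady when its range stays within 20% of its average and its slope within 10%. A standalone sketch of that decision (the sample values are taken from the steady series in the data handler tests):

    def is_steady(slope_value, range_value, average_value):
        # Range within 20% and slope within 10% of the series average.
        range_ok = abs(range_value) < 0.20 * abs(average_value)
        slope_ok = abs(slope_value) < 0.10 * abs(average_value)
        return range_ok and slope_ok

    print(is_steady(0.0118, 17.78, 211.5))  # True
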
diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py
index b0dde50..295b8be 100644 (file)
@@ -55,4 +55,9 @@ class ThreadGate(object):
                                   k, time_since_last_report)
                 ready = False
 
+        self.logger.debug("Gate pass? %s", ready)
+
+        if ready:
+            self._registrants.clear()
+
         return ready
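
Clearing the registrants on a successful pass re-arms the gate, so the next reporting interval again requires a fresh report from every slave before data handling proceeds.
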
diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py
index 874e99c..3896cc6 100644 (file)
@@ -23,7 +23,7 @@ class _base_workload(object):
             'bs': '64k',
             'iodepth': '1',
             'numjobs': '1',
-            'loops': '1',
+            'loops': '20',
             'output-format': 'json',
             'status-interval': '60'
         }
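
With status-interval 60 unchanged, raising loops to 20 keeps fio producing one-minute reports long enough for steady-state detection to accumulate its 10+ samples and stop the run itself, rather than the workload finishing after a single pass.
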
diff --git a/tests/carbon_tests/emitter_test.py b/tests/carbon_tests/emitter_test.py
index fe19ed2..7f61049 100644 (file)
@@ -7,14 +7,16 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-from storperf.carbon import converter
-from storperf.carbon.emitter import CarbonMetricTransmitter
-from time import sleep
 import SocketServer
 import json
+from storperf.carbon import converter
+from storperf.carbon.emitter import CarbonMetricTransmitter
 import threading
+from time import sleep, strptime
 import unittest
 
+import mock
+
 
 class MetricsHandler(SocketServer.BaseRequestHandler):
 
@@ -42,10 +44,14 @@ class CarbonMetricTransmitterTest(unittest.TestCase):
         t.setDaemon(True)
         t.start()
 
-    def test_transmit_metrics(self):
+    @mock.patch("time.gmtime")
+    def test_transmit_metrics(self, mock_time):
+
+        mock_time.return_value = strptime("30 Nov 00", "%d %b %y")
 
         testconv = converter.Converter()
-        json_object = json.loads("""{"timestamp" : "12345", "key":"value" }""")
+        json_object = json.loads(
+            """{"timestamp" : "975542400", "key":"value" }""")
         result = testconv.convert_json_to_flat(json_object, "host.run-name")
 
         emitter = CarbonMetricTransmitter()
@@ -58,7 +64,7 @@ class CarbonMetricTransmitterTest(unittest.TestCase):
             count += 1
             sleep(0.1)
 
-        self.assertEqual("host.run-name.key value 12345\n",
+        self.assertEqual("host.run-name.key value 975542400\n",
                          CarbonMetricTransmitterTest.response,
                          CarbonMetricTransmitterTest.response)
 
diff --git a/tests/db_tests/graphite_db_test.py b/tests/db_tests/graphite_db_test.py
index e13545b..d4c6fb6 100644 (file)
@@ -7,9 +7,19 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-from storperf.db.graphite_db import GraphiteDB
 import unittest
 
+import mock
+
+from storperf.db.graphite_db import GraphiteDB
+
+
+class MockResponse():
+
+    def __init__(self):
+        self.content = ""
+        self.status_code = 200
+
 
 class GraphiteDBTest(unittest.TestCase):
 
@@ -32,6 +42,53 @@ class GraphiteDBTest(unittest.TestCase):
         # self.graphdb.fetch_averages(u'32d31724-fac1-44f3-9033-ca8e00066a36')
         pass
 
+    @mock.patch("requests.get")
+    def test_fetch_series(self, mock_requests):
+
+        response = MockResponse()
+        response.content = """
+[
+    {
+        "datapoints": [
+            [null,1480455880],
+            [null,1480455890],
+            [null,1480455900],
+            [205.345,1480455910],
+            [201.59,1480455920],
+            [205.76,1480455930],
+            [null,1480455940],
+            [null,1480455950],
+            [null,1480455960],
+            [215.655,1480455970],
+            [214.16,1480455980],
+            [213.57,1480455990],
+            [null,1480456000],
+            [null,1480456010],
+            [null,1480456020],
+            [219.37,1480456030],
+            [219.28,1480456040],
+            [217.75,1480456050],
+            [null,1480456060]
+        ],
+        "target":"averageSeries(.8192.*.jobs.1.write.iops)"
+    }
+]"""
+        expected = [[1480455910, 205.345],
+                    [1480455920, 201.59],
+                    [1480455930, 205.76],
+                    [1480455970, 215.655],
+                    [1480455980, 214.16],
+                    [1480455990, 213.57],
+                    [1480456030, 219.37],
+                    [1480456040, 219.28],
+                    [1480456050, 217.75]]
+
+        mock_requests.side_effect = (response, )
+
+        actual = self.graphdb.fetch_series("workload", "iops",
+                                           "write", 0, 600)
+        self.assertEqual(expected, actual)
+
     def fetch_workloads(self, workload):
         workloads = [[u'32d31724-fac1-44f3-9033-ca8e00066a36.'
                       u'_warm_up.queue-depth.32.block-size.8192.10-9-15-151',
diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py
index 482b98e..b175c87 100644 (file)
@@ -8,22 +8,25 @@
 ##############################################################################
 
 import os
+from storperf.utilities.data_handler import DataHandler
 import unittest
 
 import mock
 
-from storperf.utilities.data_handler import DataHandler
-
 
 class MockGraphiteDB(object):
 
     def __init__(self):
         self.called = False
+        self.series = []
 
     def fetch_averages(self, job_id):
         self.called = True
         return None
 
+    def fetch_series(self, job_id, timeframe):
+        return self.series
+
 
 class DataHandlerTest(unittest.TestCase):
 
@@ -40,6 +43,7 @@ class DataHandlerTest(unittest.TestCase):
         mock.job_id = "1"
         self.job_db = mock
         self.pushed = False
+        self.current_workload = None
         pass
 
     @property
@@ -50,8 +54,68 @@ class DataHandlerTest(unittest.TestCase):
         self.pushed = True
         pass
 
-    def test_not_terminated_report(self):
-        self.data_handler.data_event(self)
+    def terminate(self):
+        self._terminated = True
+
+    @mock.patch("time.time")
+    @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+    @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+    def test_lookup_prior_data(self, mock_graphite_db, mock_time):
+        self._terminated = False
+        expected = [[1480455910, 205.345],
+                    [1480455920, 201.59],
+                    [1480455930, 205.76],
+                    [1480455970, 215.655],
+                    [1480455980, 214.16],
+                    [1480455990, 213.57],
+                    [1480456030, 219.37],
+                    [1480456040, 219.28],
+                    [1480456050, 217.75]]
+        mock_graphite_db.return_value = expected
+        mock_time.return_value = expected[-1][0] + 10
+
+        self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
+                                 ("job_id",
+                                  "rw",
+                                  8,
+                                  8192))
+
+        actual = self.data_handler._lookup_prior_data(self, 'read', 'iops')
+        self.assertEqual(expected, actual)
+
+    def test_short_sample(self):
+        series = [[1480455910, 205.345],
+                  [1480455920, 201.59],
+                  [1480455930, 205.76],
+                  [1480455970, 215.655],
+                  [1480455980, 214.16],
+                  [1480455990, 213.57],
+                  [1480456030, 219.37],
+                  [1480456040, 219.28],
+                  [1480456050, 217.75]]
+
+        actual = self.data_handler._evaluate_prior_data(series)
+        self.assertEqual(False, actual)
+
+    def test_long_not_steady_sample(self):
+        series = [[4804559100, 205345],
+                  [4804559200, 20159],
+                  [4804559300, 20576],
+                  [4804560300, 21937],
+                  [4804560400, 21928],
+                  [4804560500, 21775]]
+        actual = self.data_handler._evaluate_prior_data(series)
+        self.assertEqual(False, actual)
+
+    def test_long_steady_sample(self):
+        series = [[4804559100, 205.345],
+                  [4804559200, 201.59],
+                  [4804559300, 205.76],
+                  [4804560300, 219.37],
+                  [4804560400, 219.28],
+                  [4804560500, 217.75]]
+        actual = self.data_handler._evaluate_prior_data(series)
+        self.assertEqual(True, actual)
 
     @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
     @mock.patch("storperf.db.test_results_db.push_results_to_db")
@@ -65,12 +129,104 @@ class DataHandlerTest(unittest.TestCase):
         self.assertEqual(True, self.pushed)
 
     @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+    @mock.patch("time.time")
     @mock.patch("storperf.db.test_results_db.push_results_to_db")
-    @mock.patch("storperf.utilities.data_handler.GraphiteDB")
-    def test_non_terminated_report(self, mock_graphite_db, mock_results_db):
+    @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+    def test_non_terminated_report(self, mock_graphite_db, mock_results_db,
+                                   mock_time):
         self._terminated = False
         mock_results_db.side_effect = self.push_results_to_db
-        mock_graphite_db.side_effect = MockGraphiteDB
+        series = \
+            [[1480455910, 205.345],
+             [1480455920, 201.59],
+             [1480455930, 205.76],
+             [1480455970, 215.655],
+             [1480455980, 214.16],
+             [1480455990, 213.57],
+             [1480456030, 219.37],
+             [1480456040, 219.28],
+             [1480456050, 217.75]]
+        mock_graphite_db.return_value = series
+        mock_time.return_value = series[-1][0] + 10
+        expected_slope = 0.1185333530108134
+        expected_range = 17.78
+        expected_average = 212.49777777777774
+
+        self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
+                                 ("job_id",
+                                  "rw",
+                                  8,
+                                  8192))
 
         self.data_handler.data_event(self)
         self.assertEqual(False, self.pushed)
+        self.assertEqual(False, self._terminated)
+
+        self.assertEqual(expected_slope, self.metadata['report_data']
+                         ['lat.mean']
+                         ['read']
+                         ['slope'])
+        self.assertEqual(expected_range, self.metadata['report_data']
+                         ['lat.mean']
+                         ['read']
+                         ['range'])
+        self.assertEqual(expected_average, self.metadata['report_data']
+                         ['lat.mean']
+                         ['read']
+                         ['average'])
+        self.assertEqual(series, self.metadata['report_data']
+                         ['lat.mean']
+                         ['read']
+                         ['series'])
+
+    @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+    @mock.patch("time.time")
+    @mock.patch("storperf.db.test_results_db.push_results_to_db")
+    @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+    def test_report_that_causes_termination(self,
+                                            mock_graphite_db,
+                                            mock_results_db,
+                                            mock_time):
+        self._terminated = False
+        mock_results_db.side_effect = self.push_results_to_db
+        series = [[4804559100, 205.345],
+                  [4804559200, 201.59],
+                  [4804559300, 205.76],
+                  [4804560300, 219.37],
+                  [4804560400, 219.28],
+                  [4804560500, 217.75]]
+        mock_graphite_db.return_value = series
+        mock_time.return_value = 4804560500 + 10
+
+        expected_slope = 0.011830471529818998
+        expected_range = 17.78
+        expected_average = 211.51583333333335
+
+        self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
+                                 ("job_id",
+                                  "rw",
+                                  8,
+                                  8192))
+
+        self.data_handler.data_event(self)
+
+        self.assertEqual(expected_slope, self.metadata['report_data']
+                         ['lat.mean']
+                         ['read']
+                         ['slope'])
+        self.assertEqual(expected_range, self.metadata['report_data']
+                         ['lat.mean']
+                         ['read']
+                         ['range'])
+        self.assertEqual(expected_average, self.metadata['report_data']
+                         ['lat.mean']
+                         ['read']
+                         ['average'])
+        self.assertEqual(series, self.metadata['report_data']
+                         ['lat.mean']
+                         ['read']
+                         ['series'])
+        self.assertEqual(True, self._terminated)
+
+        self.assertEqual(False, self.pushed)
+        self.assertEqual(True, self._terminated)
diff --git a/tests/utilities_tests/math_slope_test.py b/tests/utilities_tests/math_slope_test.py
index 6c05aa4..24d5cd7 100644 (file)
@@ -65,3 +65,8 @@ class MathSlopeTest(unittest.TestCase):
         expected = 1.5
         actual = Slope.slope([[0.0, 0], [1, 1], [2, 3]])
         self.assertEqual(expected, actual)
+
+    def test_infinte_slope(self):
+        expected = None
+        actual = Slope.slope([[1480623510, 1295.87], [1480623520, 1380.79]])
+        self.assertEqual(expected, actual)