Steady State Metrics
author mbeierl <mark.beierl@dell.com>
Fri, 17 Feb 2017 22:43:39 +0000 (17:43 -0500)
committer mbeierl <mark.beierl@dell.com>
Fri, 17 Feb 2017 22:43:54 +0000 (17:43 -0500)
Changes the overall value of all metrics to be based on the
calculated steady state values instead of the average for the
entire run.
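
For context, the steady state decision that feeds these overall values
can be sketched with the same helpers the data handler calls
(storperf.utilities.math). This is a minimal sketch only; the 10%/20%
tolerances are assumed from SNIA PTS-style criteria and are not
defined in this patch:

    from storperf.utilities import math

    def is_steady(samples):
        # A window of samples is treated as steady when the best-fit
        # slope and the min/max excursion are both small relative to
        # the window average (tolerances are assumptions, see above).
        average = math.average(samples)
        if average == 0:
            return False
        return (abs(math.slope(samples)) <= 0.10 * abs(average) and
                math.range_value(samples) <= 0.20 * abs(average))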

Change-Id: I121929d5fe2dd43df7f289b82e9f5291c9ea9aab
JIRA: STORPERF-107
Signed-off-by: mbeierl <mark.beierl@dell.com>
ci/start_job.sh
rest_server.py
storperf/db/graphite_db.py
storperf/db/job_db.py
storperf/storperf_master.py
storperf/test_executor.py
storperf/utilities/data_handler.py
tests/utilities_tests/data_handler_test.py

diff --git a/ci/start_job.sh b/ci/start_job.sh
index 1a71735..b40abc9 100755
@@ -13,7 +13,7 @@ cat << EOF > body.json
    "block_sizes": "${BLOCK_SIZE}",
    "nowarm": "string",
    "nossd": "string",
-   "deadline": 1200,
+   "deadline": 20,
    "queue_depths": "${QUEUE_DEPTH}",
    "workload": "${WORKLOAD}",
     "metadata": {
diff --git a/rest_server.py b/rest_server.py
index 5c84fdb..20c1783 100644
@@ -7,20 +7,19 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-import io
 import json
-import logging
 import logging.config
 import os
-from storperf.db.job_db import JobDB
-from storperf.plot.barchart import Barchart
-from storperf.storperf_master import StorPerfMaster
 import sys
 
 from flask import abort, Flask, request, jsonify, send_from_directory
 from flask_restful import Resource, Api, fields
 from flask_restful_swagger import swagger
 
+from storperf.db.job_db import JobDB
+from storperf.plot.barchart import Barchart
+from storperf.storperf_master import StorPerfMaster
+
 
 app = Flask(__name__, static_url_path="")
 api = swagger.docs(Api(app), apiVersion='1.0')
@@ -37,7 +36,6 @@ def send_swagger(path):
 def results_page(job_id):
 
     job_db = JobDB()
-    params = {}
 
     params = job_db.fetch_workload_params(job_id)
 
diff --git a/storperf/db/graphite_db.py b/storperf/db/graphite_db.py
index 8be91c8..c8a2d35 100644
@@ -1,7 +1,14 @@
-import calendar
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
 import json
 import logging
-import time
 
 import requests
 
@@ -37,106 +44,6 @@ class GraphiteDB(object):
 
         return series
 
-    def fetch_averages(self, workload):
-        workload_executions = self._job_db.fetch_workloads(workload)
-
-        # Create a map of job runs
-        workload_names = {}
-        for workload_execution in workload_executions:
-            name = '.'.join(workload_execution[0].split('.')[0:6])
-            if name in workload_names:
-                workload_record = workload_names[name]
-                start = workload_record[0]
-                end = workload_record[1]
-            else:
-                start = None
-                end = None
-
-            if start is None or workload_execution[1] < start:
-                start = workload_execution[1]
-
-            if end is None or workload_execution[2] > end:
-                end = workload_execution[2]
-
-            workload_names[name] = [start, end]
-
-        averages = {}
-
-        for io_type in ['read', 'write']:
-            for workload_name, times in workload_names.iteritems():
-                workload_pattern = self.make_fullname_pattern(workload_name)
-                short_name = '.'.join(workload_name.split('.')[1:6])
-                start = times[0]
-                end = times[1]
-
-                if end is None:
-                    end = str(calendar.timegm(time.gmtime()))
-                averages[short_name + ".duration"] = \
-                    (int(end) - int(start))
-
-                key = short_name + "." + io_type
-
-                request = ("http://127.0.0.1:8000/render/?target="
-                           "averageSeries(%s.jobs.1.%s.lat.mean)"
-                           "&format=json"
-                           "&from=%s"
-                           "&until=%s" %
-                           (workload_pattern, io_type, start, end))
-                self.logger.debug("Calling %s" % (request))
-
-                response = requests.get(request)
-                if (response.status_code == 200):
-                    averages[key + ".latency"] = \
-                        self._average_results(json.loads(response.content))
-
-                request = ("http://127.0.0.1:8000/render/?target="
-                           "averageSeries(%s.jobs.1.%s.bw)"
-                           "&format=json"
-                           "&from=%s"
-                           "&until=%s" %
-                           (workload_pattern, io_type, start, end))
-                self.logger.debug("Calling %s" % (request))
-
-                response = requests.get(request)
-                if (response.status_code == 200):
-                    averages[key + ".throughput"] = \
-                        self._average_results(json.loads(response.content))
-
-                request = ("http://127.0.0.1:8000/render/?target="
-                           "averageSeries(%s.jobs.1.%s.iops)"
-                           "&format=json"
-                           "&from=%s"
-                           "&until=%s" %
-                           (workload_pattern, io_type, start, end))
-                self.logger.debug("Calling %s" % (request))
-
-                response = requests.get(request)
-                if (response.status_code == 200):
-                    averages[key + ".iops"] = \
-                        self._average_results(json.loads(response.content))
-
-        return averages
-
-    def _average_results(self, results):
-
-        for item in results:
-            datapoints = item['datapoints']
-
-            total = 0
-            count = 0
-
-            for datapoint in datapoints:
-                if datapoint[0] is not None:
-                    total += datapoint[0]
-                    count += 1
-
-            if count > 0:
-                average = total / count
-            else:
-                average = total
-
-        return average
-
     def _series_results(self, results):
 
         series = []
diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py
index 05160ec..eabcb54 100644
@@ -58,9 +58,9 @@ class JobDB(object):
                 cursor.execute('''CREATE TABLE job_summary
                 (job_id text,
                 summary text)''')
-                self.logger.debug("Created job table")
+                self.logger.debug("Created job summary table")
             except OperationalError:
-                self.logger.debug("Job table exists")
+                self.logger.debug("Job summary table exists")
 
             cursor.execute('SELECT * FROM jobs')
             cursor.execute('SELECT * FROM job_params')
@@ -250,6 +250,5 @@ class JobDB(object):
                 if (row is None):
                     break
                 params[row[0]] = row[1]
-
             db.close()
         return params
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index 7bbadae..6d9625c 100644
@@ -8,6 +8,7 @@
 ##############################################################################
 
 from datetime import datetime
+import json
 import logging
 import os
 import socket
@@ -22,7 +23,6 @@ from scp import SCPClient
 
 import heatclient.client as heatclient
 from storperf.db.configuration_db import ConfigurationDB
-from storperf.db.graphite_db import GraphiteDB
 from storperf.db.job_db import JobDB
 from storperf.test_executor import TestExecutor
 
@@ -323,8 +323,14 @@ class StorPerfMaster(object):
         return self._test_executor.terminate()
 
     def fetch_results(self, job_id):
-        graphite_db = GraphiteDB()
-        return graphite_db.fetch_averages(job_id)
+        if self._test_executor.job_db.job_id == job_id:
+            return self._test_executor.metadata['metrics']
+
+        workload_params = self.job_db.fetch_workload_params(job_id)
+        if 'report' in workload_params:
+            report = json.loads(workload_params['report'])
+            return report['metrics']
+        return {}
 
     def fetch_metadata(self, job_id):
         return self.job_db.fetch_workload_params(job_id)
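
The new fetch_results above prefers the live executor's in-memory
metrics and only falls back to the report persisted when the run
ended. A rough standalone equivalent of that fallback, assuming
job_id names a completed run:

    import json
    from storperf.db.job_db import JobDB

    # Reload the metrics that TestExecutor saved under the 'report'
    # workload param at the end of the run (empty dict if absent).
    params = JobDB().fetch_workload_params(job_id)
    metrics = json.loads(params['report'])['metrics'] if 'report' in params else {}
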
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index d46e0c7..6b6316c 100644
@@ -9,6 +9,7 @@
 
 import copy
 import imp
+import json
 import logging
 from os import listdir
 import os
@@ -170,6 +171,7 @@ class TestExecutor(object):
         self.job_db.create_job_id()
         self.job_db.record_workload_params(metadata)
         self.metadata = metadata
+        self.metadata['metrics'] = {}
         self._workload_thread = Thread(target=self.execute_workloads,
                                        args=(),
                                        name="Workload thread")
@@ -310,8 +312,11 @@ class TestExecutor(object):
 
         self.end_time = time.time()
         self._terminated = True
+        report = {'report': json.dumps(self.metadata)}
+        self.job_db.record_workload_params(report)
         self.broadcast_event()
         self.unregister(data_handler.data_event)
+        self.job_db.job_id = None
 
     def execute_on_node(self, workload):
 
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
index a4c9ae4..d95d6fa 100644
@@ -37,6 +37,10 @@ class DataHandler(object):
         if executor.terminated:
             self._push_to_db(executor)
         else:
+            workload = '.'.join(executor.current_workload.split('.')[1:6])
+            if 'metrics' not in executor.metadata:
+                executor.metadata['metrics'] = {}
+
             steady_state = True
             metrics = {}
             for metric in ('lat.mean', 'iops', 'bw'):
@@ -60,14 +64,15 @@ class DataHandler(object):
                         math.slope(treated_data['slope_data'])
                     metrics[metric][io_type]['range'] = \
                         math.range_value(treated_data['range_data'])
-                    metrics[metric][io_type]['average'] = \
-                        math.average(treated_data['average_data'])
+                    average = math.average(treated_data['average_data'])
+                    metrics[metric][io_type]['average'] = average
+
+                    metrics_key = '%s.%s.%s' % (workload, io_type, metric)
+                    executor.metadata['metrics'][metrics_key] = average
 
                     if not steady:
                         steady_state = False
 
-            workload = '.'.join(executor.current_workload.split('.')[1:6])
-
             if 'report_data' not in executor.metadata:
                 executor.metadata['report_data'] = {}
 
@@ -168,9 +173,7 @@ class DataHandler(object):
 
         payload['timestart'] = executor.start_time
         payload['duration'] = duration
-        graphite_db = GraphiteDB()
-        payload['metrics'] = graphite_db.fetch_averages(
-            executor.job_db.job_id)
+
         if steady_state:
             criteria = 'PASS'
         else:
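
Each steady state average recorded above lands in
executor.metadata['metrics'] under a flat dotted key. Illustrative
only, with a hypothetical fully qualified workload name:

    # Hypothetical name; split elements 1..5 form the short workload.
    name = 'jobid.rw.queue-depth.2.block-size.8192'
    workload = '.'.join(name.split('.')[1:6])
    metrics_key = '%s.%s.%s' % (workload, 'read', 'lat.mean')
    # -> 'rw.queue-depth.2.block-size.8192.read.lat.mean'
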
diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py
index 333bed0..4630d54 100644
@@ -18,13 +18,8 @@ from storperf.utilities.data_handler import DataHandler
 class MockGraphiteDB(object):
 
     def __init__(self):
-        self.called = False
         self.series = []
 
-    def fetch_averages(self, job_id):
-        self.called = True
-        return None
-
     def fetch_series(self, job_id, timeframe):
         return self.series