import logging
import logging.config
import os
+from storperf.db.job_db import JobDB
+from storperf.plot.barchart import Barchart
+from storperf.storperf_master import StorPerfMaster
+import sys
from flask import abort, Flask, request, jsonify, send_from_directory
from flask_restful import Resource, Api, fields
from flask_restful_swagger import swagger
-from storperf.db.job_db import JobDB
-from storperf.plot.barchart import Barchart
-from storperf.storperf_master import StorPerfMaster
-
app = Flask(__name__, static_url_path="")
api = swagger.docs(Api(app), apiVersion='1.0')
]
)
def delete(self):
+ self.logger.info("Threads: %s" % sys._current_frames())
+ print sys._current_frames()
try:
return jsonify({'Slaves': storperf.terminate_workloads()})
except Exception as e:
def transmit_metrics(self, metrics):
if 'timestamp' in metrics:
- timestamp = metrics.pop('timestamp')
- else:
- timestamp = str(calendar.timegm(time.gmtime()))
+ metrics.pop('timestamp')
+ timestamp = str(calendar.timegm(time.gmtime()))
carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
carbon_socket.connect((self.carbon_host, self.carbon_port))
-from storperf.db.job_db import JobDB
import calendar
import json
import logging
import requests
+from storperf.db.job_db import JobDB
+
class GraphiteDB(object):
self._job_db = JobDB()
self.logger = logging.getLogger(__name__)
+ def fetch_series(self, workload, metric, io_type, time, duration):
+
+ series = []
+ end = time
+ start = end - duration
+
+ request = ("http://127.0.0.1:8000/render/?target="
+ "averageSeries(%s.*.jobs.1.%s.%s)"
+ "&format=json"
+ "&from=%s"
+ "&until=%s" %
+ (workload, io_type, metric,
+ start, end))
+ self.logger.debug("Calling %s" % (request))
+
+ response = requests.get(request)
+ if (response.status_code == 200):
+ series = self._series_results(json.loads(response.content))
+
+ return series
+
def fetch_averages(self, workload):
workload_executions = self._job_db.fetch_workloads(workload)
return average
+ def _series_results(self, results):
+
+ series = []
+
+ for item in results:
+ datapoints = item['datapoints']
+ for datapoint in datapoints:
+ if datapoint[0] is not None:
+ series.append([datapoint[1], datapoint[0]])
+
+ return series
+
def make_fullname_pattern(self, workload):
parts = workload.split('.')
wildcards_needed = 7 - len(parts)
headers = {'Content-Type': 'application/json'}
try:
if logger:
- logger.debug("Pushing results to %s" % (url))
+ logger.info("Pushing results to %s" % (url))
+ logger.debug("Parameters: %s" % params)
r = requests.post(url, data=json.dumps(params), headers=headers)
if logger:
logger.debug(r)
for event_listener in self.event_listeners:
try:
+ self.logger.debug(
+ "Event listener callback")
event_listener(self.callback_id, json_metric)
except Exception, e:
self.logger.exception(
"Notifying listener %s: %s",
self.callback_id, e)
- self.logger.info(
+ self.logger.debug(
"Event listener callback complete")
except Exception, e:
self.logger.error("Error parsing JSON: %s", e)
self.logger.debug("Finished")
def execute(self, args=[]):
-
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.remote_host, username='storperf',
command = "sudo ./fio " + ' '.join(args)
self.logger.debug("Remote command: %s" % command)
- (_, stdout, stderr) = ssh.exec_command(command)
+
+ chan = ssh.get_transport().open_session(timeout=None)
+ chan.settimeout(None)
+ chan.exec_command(command)
+ stdout = chan.makefile('r', -1)
+ stderr = chan.makefile_stderr('r', -1)
tout = Thread(target=self.stdout_handler, args=(stdout,),
name="%s stdout" % self._remote_host)
terr.start()
self.logger.info("Started fio on " + self.remote_host)
+ exit_status = chan.recv_exit_status()
+ self.logger.info("Finished fio on %s with exit code %s" %
+ (self.remote_host, exit_status))
+
+ stdout.close()
+ stderr.close()
+
+ self.logger.debug("Joining stderr handler")
terr.join()
+ self.logger.debug("Joining stdout handler")
tout.join()
- self.logger.info("Finished fio on " + self.remote_host)
+ self.logger.debug("Ended")
def terminate(self):
self.logger.debug("Terminating fio on " + self.remote_host)
},
"loggers": {
- "my_module": {
- "level": "ERROR",
- "handlers": ["console"],
- "propagate": "no"
+ "": {
+ "level": "INFO",
+ "handlers": ["console", "file_handler", "error_file_handler"]
+ },
+ "storperf": {
+ "level": "DEBUG"
+ },
+ "storperf.carbon.emitter": {
+ "level": "INFO"
}
- },
-
- "root": {
- "level": "WARN",
- "handlers": ["console", "file_handler", "error_file_handler"]
- },
-
- "storperf": {
- "level": "DEBUG",
- "handlers": ["console", "file_handler", "error_file_handler"]
- },
-
- "storperf.carbon.emitter": {
- "level": "INFO",
- "handlers": ["console", "file_handler", "error_file_handler"]
}
-
}
\ No newline at end of file
(_, stdout, stderr) = ssh.exec_command(cmd)
for line in stdout.readlines():
- logger.debug(line.decode('utf-8').strip())
+ logger.debug(line.strip())
for line in stderr.readlines():
- logger.error(line.decode('utf-8').strip())
+ logger.error(line.strip())
def _make_parameters(self):
heat_parameters = {}
from storperf.carbon.emitter import CarbonMetricTransmitter
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
+from storperf.utilities.data_handler import DataHandler
from storperf.utilities.thread_gate import ThreadGate
-from utilities.data_handler import DataHandler
class UnknownWorkload(Exception):
self.metadata = {}
self.start_time = None
self.end_time = None
+ self.current_workload = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
try:
event_listener(self)
except Exception, e:
- self.logger.error("Notifying listener: %s", e)
+ self.logger.exception("While notifying listener %s", e)
def register_workloads(self, workloads):
self.workload_modules = []
self.job_db.record_workload_params(metadata)
self.metadata = metadata
self._workload_thread = Thread(target=self.execute_workloads,
- args=())
+ args=(),
+ name="Workload thread")
self._workload_thread.start()
return self.job_db.job_id
def terminate(self):
self._terminated = True
- return self.terminate_current_run()
+ self.end_time = time.time()
+ return self._terminate_current_run()
- def terminate_current_run(self):
+ def _terminate_current_run(self):
self.logger.info("Terminating current run")
terminated_hosts = []
for workload in self._workload_executors:
for blocksize in blocksizes:
for iodepth in iodepths:
- scheduler = sched.scheduler(time.time, time.sleep)
if self._terminated:
return
+ self.current_workload = (
+ "%s.%s.queue-depth.%s.block-size.%s" %
+ (self.job_db.job_id,
+ workload_name,
+ iodepth,
+ blocksize))
+ self.logger.info("Starting run %s" % self.current_workload)
+
+ scheduler = sched.scheduler(time.time, time.sleep)
if self.deadline is not None \
and not workload_name.startswith("_"):
event = scheduler.enter(self.deadline * 60, 1,
- self.terminate_current_run, ())
+ self._terminate_current_run,
+ ())
t = Thread(target=scheduler.run, args=())
t.start()
self._workload_executors.append(slave_workload)
t = Thread(target=self.execute_on_node,
- args=(slave_workload,))
+ args=(slave_workload,),
+ name="%s worker" % slave)
t.daemon = False
t.start()
slave_threads.append(t)
for slave_thread in slave_threads:
+ self.logger.debug("Waiting on %s" % slave_thread)
slave_thread.join()
+ self.logger.debug("Done waiting for %s" % slave_thread)
if not scheduler.empty():
try:
except:
pass
+ self.logger.info("Completed run %s" %
+ self.current_workload)
self._workload_executors = []
+ self.current_workload = None
self.logger.info("Completed workload %s" % (workload_name))
self.logger.info("Completed job %s" % (self.job_db.job_id))
import logging
import os
+from time import sleep
+import time
from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
+from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import dictionary
+from storperf.utilities import math as math
+from storperf.utilities import steady_state as SteadyState
class DataHandler(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
+ self.samples = 11
"""
"""
def data_event(self, executor):
- self.logger.info("Event received")
-
- # Data lookup
+ self.logger.debug("Event received")
if executor.terminated:
self._push_to_db(executor)
+ else:
+ steady_state = True
+ metrics = {}
+ for metric in ('lat.mean', 'iops', 'bw'):
+ metrics[metric] = {}
+ for io_type in ('read', 'write'):
+ metrics[metric][io_type] = {}
+
+ series = self._lookup_prior_data(executor, metric, io_type)
+ steady = self._evaluate_prior_data(series)
+
+ self.logger.debug("Steady state for %s %s: %s"
+ % (io_type, metric, steady))
+
+ metrics[metric][io_type]['series'] = series
+ metrics[metric][io_type]['steady_state'] = steady
+ treated_data = DataTreatment.data_treatment(series)
+
+ metrics[metric][io_type]['slope'] = \
+ math.slope(treated_data['slope_data'])
+ metrics[metric][io_type]['range'] = \
+ math.range_value(treated_data['range_data'])
+ metrics[metric][io_type]['average'] = \
+ math.average(treated_data['average_data'])
+
+ if not steady:
+ steady_state = False
+
+ executor.metadata['report_data'] = metrics
+ executor.metadata['steady_state'] = steady_state
+
+ if steady_state:
+ executor.terminate()
+
+ def _lookup_prior_data(self, executor, metric, io_type):
+ workload = executor.current_workload
+ graphite_db = GraphiteDB()
+
+ # A bit of a hack here as Carbon might not be finished storing the
+ # data we just sent to it
+ now = int(time.time())
+ backtime = 60 * (self.samples + 2)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ most_recent_time = now
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+
+ delta = now - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ while (delta < 5 or (delta > 60 and delta < 120)):
+ sleep(5)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+ delta = time.time() - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ return data_series
+
+ def _evaluate_prior_data(self, data_series):
+ self.logger.debug("Data series: %s" % data_series)
+ if len(data_series) == 0:
+ return False
+ earliest_timestamp = data_series[0][0]
+ latest_timestamp = data_series[-1][0]
+ duration = latest_timestamp - earliest_timestamp
+ if (duration < 60 * self.samples):
+ self.logger.debug("Only %s minutes of samples, ignoring" %
+ (duration / 60,))
+ return False
+
+ return SteadyState.steady_state(data_series)
def _push_to_db(self, executor):
test_db = os.environ.get('TEST_DB_URL')
sum_yi_xi += xi * yi
sum_yi += yi
- beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / \
- (sum_xi**2 - m * sum_xi_sq) # The slope
+ over = (sum_xi**2 - m * sum_xi_sq)
+ if over == 0:
+ beta2 = None # Infinite slope
+ else:
+ beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over # The slope
# beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if
# needed
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+import logging
+
from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import math as math
has been reached with the data that is passed to it.
"""
+ logger = logging.getLogger('storperf.utilities.steady_state')
+
# Pre conditioning the data to match the algorithms
treated_data = DataTreatment.data_treatment(data_series)
steady_state = range_condition and slope_condition
+ logger.debug("Range %s < %s?" % (abs(range_value),
+ (0.20 * abs(average_value))))
+ logger.debug("Slope %s < %s?" % (abs(slope_value),
+ (0.10 * abs(average_value))))
+ logger.debug("Steady State? %s" % steady_state)
else:
steady_state = False
k, time_since_last_report)
ready = False
+ self.logger.debug("Gate pass? %s", ready)
+
+ if ready:
+ self._registrants.clear()
+
return ready
'bs': '64k',
'iodepth': '1',
'numjobs': '1',
- 'loops': '1',
+ 'loops': '20',
'output-format': 'json',
'status-interval': '60'
}
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from storperf.carbon import converter
-from storperf.carbon.emitter import CarbonMetricTransmitter
-from time import sleep
import SocketServer
import json
+from storperf.carbon import converter
+from storperf.carbon.emitter import CarbonMetricTransmitter
import threading
+from time import sleep, strptime
import unittest
+import mock
+
class MetricsHandler(SocketServer.BaseRequestHandler):
t.setDaemon(True)
t.start()
- def test_transmit_metrics(self):
+ @mock.patch("time.gmtime")
+ def test_transmit_metrics(self, mock_time):
+
+ mock_time.return_value = strptime("30 Nov 00", "%d %b %y")
testconv = converter.Converter()
- json_object = json.loads("""{"timestamp" : "12345", "key":"value" }""")
+ json_object = json.loads(
+ """{"timestamp" : "975542400", "key":"value" }""")
result = testconv.convert_json_to_flat(json_object, "host.run-name")
emitter = CarbonMetricTransmitter()
count += 1
sleep(0.1)
- self.assertEqual("host.run-name.key value 12345\n",
+ self.assertEqual("host.run-name.key value 975542400\n",
CarbonMetricTransmitterTest.response,
CarbonMetricTransmitterTest.response)
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from storperf.db.graphite_db import GraphiteDB
import unittest
+import mock
+
+from storperf.db.graphite_db import GraphiteDB
+
+
+class MockResponse():
+
+ def __init__(self):
+ self.content = ""
+ self.status_code = 200
+
class GraphiteDBTest(unittest.TestCase):
# self.graphdb.fetch_averages(u'32d31724-fac1-44f3-9033-ca8e00066a36')
pass
+ @mock.patch("requests.get")
+ def test_fetch_series(self, mock_requests):
+
+ response = MockResponse()
+ response.content = """
+[
+ {
+ "datapoints": [
+ [null,1480455880],
+ [null,1480455890],
+ [null,1480455900],
+ [205.345,1480455910],
+ [201.59,1480455920],
+ [205.76,1480455930],
+ [null,1480455940],
+ [null,1480455950],
+ [null,1480455960],
+ [215.655,1480455970],
+ [214.16,1480455980],
+ [213.57,1480455990],
+ [null,1480456000],
+ [null,1480456010],
+ [null,1480456020],
+ [219.37,1480456030],
+ [219.28,1480456040],
+ [217.75,1480456050],
+ [null,1480456060]
+ ],
+ "target":"averageSeries(.8192.*.jobs.1.write.iops)"
+ }
+]"""
+ expected = [[1480455910, 205.345],
+ [1480455920, 201.59],
+ [1480455930, 205.76],
+ [1480455970, 215.655],
+ [1480455980, 214.16],
+ [1480455990, 213.57],
+ [1480456030, 219.37],
+ [1480456040, 219.28],
+ [1480456050, 217.75]]
+
+ mock_requests.side_effect = (response, )
+
+ actual = self.graphdb.fetch_series("workload", "iops",
+ "write", 0, 600)
+ self.assertEqual(expected, actual)
+
def fetch_workloads(self, workload):
workloads = [[u'32d31724-fac1-44f3-9033-ca8e00066a36.'
u'_warm_up.queue-depth.32.block-size.8192.10-9-15-151',
##############################################################################
import os
+from storperf.utilities.data_handler import DataHandler
import unittest
import mock
-from storperf.utilities.data_handler import DataHandler
-
class MockGraphiteDB(object):
def __init__(self):
self.called = False
+ self.series = []
def fetch_averages(self, job_id):
self.called = True
return None
+ def fetch_series(self, job_id, timeframe):
+ return self.series
+
class DataHandlerTest(unittest.TestCase):
mock.job_id = "1"
self.job_db = mock
self.pushed = False
+ self.current_workload = None
pass
@property
self.pushed = True
pass
- def test_not_terminated_report(self):
- self.data_handler.data_event(self)
+ def terminate(self):
+ self._terminated = True
+
+ @mock.patch("time.time")
+ @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+ @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+ def test_lookup_prior_data(self, mock_graphite_db, mock_time):
+ self._terminated = False
+ expected = [[1480455910, 205.345],
+ [1480455920, 201.59],
+ [1480455930, 205.76],
+ [1480455970, 215.655],
+ [1480455980, 214.16],
+ [1480455990, 213.57],
+ [1480456030, 219.37],
+ [1480456040, 219.28],
+ [1480456050, 217.75]]
+ mock_graphite_db.return_value = expected
+ mock_time.return_value = expected[-1][0] + 10
+
+ self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
+ ("job_id",
+ "rw",
+ 8,
+ 8192))
+
+ actual = self.data_handler._lookup_prior_data(self, 'read', 'iops')
+ self.assertEqual(expected, actual)
+
+ def test_short_sample(self):
+ series = [[1480455910, 205.345],
+ [1480455920, 201.59],
+ [1480455930, 205.76],
+ [1480455970, 215.655],
+ [1480455980, 214.16],
+ [1480455990, 213.57],
+ [1480456030, 219.37],
+ [1480456040, 219.28],
+ [1480456050, 217.75]]
+
+ actual = self.data_handler._evaluate_prior_data(series)
+ self.assertEqual(False, actual)
+
+ def test_long_not_steady_sample(self):
+ series = [[4804559100, 205345],
+ [4804559200, 20159],
+ [4804559300, 20576],
+ [4804560300, 21937],
+ [4804560400, 21928],
+ [4804560500, 21775]]
+ actual = self.data_handler._evaluate_prior_data(series)
+ self.assertEqual(False, actual)
+
+ def test_long_steady_sample(self):
+ series = [[4804559100, 205.345],
+ [4804559200, 201.59],
+ [4804559300, 205.76],
+ [4804560300, 219.37],
+ [4804560400, 219.28],
+ [4804560500, 217.75]]
+ actual = self.data_handler._evaluate_prior_data(series)
+ self.assertEqual(True, actual)
@mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
@mock.patch("storperf.db.test_results_db.push_results_to_db")
self.assertEqual(True, self.pushed)
@mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+ @mock.patch("time.time")
@mock.patch("storperf.db.test_results_db.push_results_to_db")
- @mock.patch("storperf.utilities.data_handler.GraphiteDB")
- def test_non_terminated_report(self, mock_graphite_db, mock_results_db):
+ @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+ def test_non_terminated_report(self, mock_graphite_db, mock_results_db,
+ mock_time):
self._terminated = False
mock_results_db.side_effect = self.push_results_to_db
- mock_graphite_db.side_effect = MockGraphiteDB
+ series = \
+ [[1480455910, 205.345],
+ [1480455920, 201.59],
+ [1480455930, 205.76],
+ [1480455970, 215.655],
+ [1480455980, 214.16],
+ [1480455990, 213.57],
+ [1480456030, 219.37],
+ [1480456040, 219.28],
+ [1480456050, 217.75]]
+ mock_graphite_db.return_value = series
+ mock_time.return_value = series[-1][0] + 10
+ expected_slope = 0.1185333530108134
+ expected_range = 17.78
+ expected_average = 212.49777777777774
+
+ self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
+ ("job_id",
+ "rw",
+ 8,
+ 8192))
self.data_handler.data_event(self)
self.assertEqual(False, self.pushed)
+ self.assertEqual(False, self._terminated)
+
+ self.assertEqual(expected_slope, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['slope'])
+ self.assertEqual(expected_range, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['range'])
+ self.assertEqual(expected_average, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['average'])
+ self.assertEqual(series, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['series'])
+
+ @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+ @mock.patch("time.time")
+ @mock.patch("storperf.db.test_results_db.push_results_to_db")
+ @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+ def test_report_that_causes_termination(self,
+ mock_graphite_db,
+ mock_results_db,
+ mock_time):
+ self._terminated = False
+ mock_results_db.side_effect = self.push_results_to_db
+ series = [[4804559100, 205.345],
+ [4804559200, 201.59],
+ [4804559300, 205.76],
+ [4804560300, 219.37],
+ [4804560400, 219.28],
+ [4804560500, 217.75]]
+ mock_graphite_db.return_value = series
+ mock_time.return_value = 4804560500 + 10
+
+ expected_slope = 0.011830471529818998
+ expected_range = 17.78
+ expected_average = 211.51583333333335
+
+ self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
+ ("job_id",
+ "rw",
+ 8,
+ 8192))
+
+ self.data_handler.data_event(self)
+
+ self.assertEqual(expected_slope, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['slope'])
+ self.assertEqual(expected_range, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['range'])
+ self.assertEqual(expected_average, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['average'])
+ self.assertEqual(series, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['series'])
+ self.assertEqual(True, self._terminated)
+
+ self.assertEqual(False, self.pushed)
+ self.assertEqual(True, self._terminated)
expected = 1.5
actual = Slope.slope([[0.0, 0], [1, 1], [2, 3]])
self.assertEqual(expected, actual)
+
+ def test_infinte_slope(self):
+ expected = None
+ actual = Slope.slope([[1480623510, 1295.87], [1480623520, 1380.79]])
+ self.assertEqual(expected, actual)