source $WORKSPACE/storperf_venv/bin/activate
pip install setuptools
+pip install autoflake==0.6.6
+pip install autopep8==1.2.2
pip install coverage==4.0.3
pip install flask==0.10
pip install flask-restful==0.3.5
pip install html2text==2016.1.8
pip install mock==1.3.0
pip install nose==1.3.7
+pip install pysqlite==2.8.2
pip install python-cinderclient==1.6.0
pip install python-glanceclient==1.1.0
pip install python-heatclient==0.8.0
"""
from storperf.storperf_master import StorPerfMaster
-from storperf.test_executor import UnknownWorkload
from threading import Thread
import cPickle
import getopt
import logging
import logging.config
import logging.handlers
-import os
import socket
import struct
import sys
-import time
-import html2text
import requests
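# Each record arrives as a 4-byte big-endian length prefix (the format
# written by logging.handlers' makePickle) followed by the pickled
# LogRecord attribute dict.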
while True:
datagram = self.socket.recv(8192)
chunk = datagram[0:4]
- slen = struct.unpack(">L", chunk)[0]
+ struct.unpack(">L", chunk)[0]
chunk = datagram[4:]
obj = cPickle.loads(chunk)
record = logging.makeLogRecord(obj)
storperf.delete_stack()
except Exception as e:
abort(400, str(e))
- pass
class StartJob(Resource):
storperf.workloads = request.json['workload']
else:
storperf.workloads = None
+ # TODO: also accept block size, queue depth, and number of passes
+ # from the request.
+ if ('workload' in request.json):
+ storperf.workloads = request.json['workload']
job_id = storperf.execute_workloads()
else:
timestamp = str(calendar.timegm(time.gmtime()))
- self.carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.carbon_socket.connect((self.carbon_host, self.carbon_port))
+ carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ carbon_socket.connect((self.carbon_host, self.carbon_port))
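+ # Carbon's plaintext protocol: one "<metric path> <value> <timestamp>\n"
+ # line per data point.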
for key, metric in metrics.items():
message = key + " " + metric + " " + timestamp
self.logger.debug("Metric: " + message)
- self.carbon_socket.send(message + '\n')
+ carbon_socket.send(message + '\n')
- self.carbon_socket.close()
+ carbon_socket.close()
db.commit()
db.close()
- def start_workload(self, workload_name):
+ def start_workload(self, workload):
"""
Records the start time for the given workload
"""
+ workload_name = workload.fullname
+
if (self.job_id is None):
self.create_job_id()
db.commit()
db.close()
- def end_workload(self, workload_name):
+ def end_workload(self, workload):
"""
Records the end time for the given workload
"""
if (self.job_id is None):
self.create_job_id()
+ workload_name = workload.fullname
+
with db_mutex:
db = sqlite3.connect(JobDB.db_name)
workload_prefix = workload_prefix + "%"
- stats = ()
-
start_time = str(calendar.timegm(time.gmtime()))
end_time = "0"
##############################################################################
from threading import Thread
-import cmd
import json
import logging
import subprocess
self.json_body += line
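# fio emits JSON status objects whose closing brace sits alone on a
# line; use that as the end-of-snippet marker.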
try:
if line == "}\n":
- self.logger.debug(
- "Have a json snippet: %s", self.json_body)
json_metric = json.loads(self.json_body)
self.json_body = ""
for event_listener in self.event_listeners:
- event_listener(self.callback_id, json_metric)
-
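+ # Guard each listener so one failing callback cannot stop the
+ # fan-out of metrics to the remaining listeners.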
+ try:
+ event_listener(self.callback_id, json_metric)
+ except Exception as e:
+ self.logger.error("Notifying listener %s: %s",
+ self.callback_id, e)
except Exception, e:
self.logger.error("Error parsing JSON: %s", e)
- pass
except ValueError:
pass # We might have read from the closed socket, ignore it
parameters=self._make_parameters())
self.stack_id = stack['stack']['id']
- pass
def validate_stack(self):
self._attach_to_openstack()
self._heat_client.stacks.delete(stack_id=self.stack_id)
self.stack_id = None
- pass
-
def execute_workloads(self):
if (self.stack_id is None):
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
from threading import Thread
+import copy
import imp
import logging
import os
self.prefix = None
self.job_db = JobDB()
self._slaves = []
+ self._workload_thread = None
@property
def slaves(self):
metric,
callback_id)
- read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"]
- write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"]
- read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"]
- write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"]
-
- message = "Average Latency us Read/Write: " + read_latency \
- + "/" + write_latency + " IOPS r/w: " + \
- read_iops + "/" + write_iops
-
- for event_listener in self.event_listeners:
- event_listener(message)
-
self.metrics_emitter.transmit_metrics(carbon_metrics)
def register_workloads(self, workloads):
-
if (workloads is None or len(workloads) == 0):
workload_dir = os.path.normpath(
os.path.join(os.path.dirname(__file__), "workloads"))
return None
def execute(self):
-
self.job_db.create_job_id()
- for slave in self.slaves:
- t = Thread(target=self.execute_on_node, args=(slave,))
- t.daemon = False
- t.start()
-
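+ # Run the workload suite on a background thread so the caller gets
+ # the job id back immediately instead of blocking until completion.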
+ self._workload_thread = Thread(target=self.execute_workloads, args=())
+ self._workload_thread.start()
return self.job_db.job_id
- def execute_on_node(self, remote_host):
-
- logger = logging.getLogger(__name__ + ":" + remote_host)
-
- invoker = FIOInvoker()
- invoker.remote_host = remote_host
- invoker.register(self.event)
-
- logger.info(
- "Starting job " + self.job_db.job_id + " on " + remote_host)
-
+ def execute_workloads(self):
for workload_module in self.workload_modules:
-
workload_name = getattr(workload_module, "__name__")
constructorMethod = getattr(workload_module, workload_name)
- logger.debug(
- "Found workload: " + str(constructorMethod))
workload = constructorMethod()
if (self.filename is not None):
workload.filename = self.filename
- workload.invoker = invoker
if (workload_name.startswith("_")):
iodepths = [2, ]
- blocksizes = [65536, ]
+ blocksizes = [8192, ]
else:
iodepths = [1, 16, 128]
- blocksizes = [4096, 65536, 1048576]
+ blocksizes = [8192, 4096, 512]
+
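+ # Tag the workload with the job id so its fullname (and therefore
+ # its metric path) is unique to this run.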
+ workload.id = self.job_db.job_id
for blocksize in blocksizes:
for iodepth in iodepths:
-
- full_workload_name = workload_name + \
- ".host." + remote_host + \
- ".queue-depth." + str(iodepth) + \
- ".block-size." + str(blocksize)
-
workload.options['iodepth'] = str(iodepth)
workload.options['bs'] = str(blocksize)
- self.logger.info(
- "Executing workload: " + full_workload_name)
- invoker.callback_id = self.job_db.job_id + \
- "." + full_workload_name
+ slave_threads = []
+ for slave in self.slaves:
+ slave_workload = copy.copy(workload)
+ slave_workload.remote_host = slave
+ t = Thread(target=self.execute_on_node,
+ args=(slave_workload,))
+ t.daemon = False
+ t.start()
+ slave_threads.append(t)
+
+ for slave_thread in slave_threads:
+ slave_thread.join()
+
+ def execute_on_node(self, workload):
+
+ invoker = FIOInvoker()
+ invoker.register(self.event)
+ workload.invoker = invoker
+
+ self.logger.info("Starting " + workload.fullname)
- self.job_db.start_workload(full_workload_name)
- workload.execute()
- self.job_db.end_workload(full_workload_name)
+ self.job_db.start_workload(workload)
+ workload.execute()
+ self.job_db.end_workload(workload)
- logger.info(
- "Finished job " + self.job_db.job_id + " on " + remote_host)
+ self.logger.info("Ended " + workload.fullname)
def fetch_results(self, job, workload_name=""):
self.job_db.job_id = job
##############################################################################
from storperf.db.job_db import JobDB
+from storperf.workloads.rr import rr
import os
import sqlite3
import unittest
start_time = "12345"
mock_calendar.side_effect = (start_time,)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
"""select * from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
self.assertEqual(None,
row.fetchone(),
"Should not have been a row in the db")
- self.job.start_workload(workload_name)
+ self.job.start_workload(workload)
cursor.execute(
"""select job_id, workload, start from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
@mock.patch("uuid.uuid4")
end_time = "54321"
mock_calendar.side_effect = (start_time, end_time,)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
- self.job.start_workload(workload_name)
- self.job.end_workload(workload_name)
+ self.job.start_workload(workload)
+ self.job.end_workload(workload)
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
"""select job_id, workload, start, end from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
self.assertEqual(end_time, row[3], "Did not expect " + str(row[3]))
mock_calendar.side_effect = (start_time_1, start_time_2)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
- self.job.start_workload(workload_name)
- self.job.start_workload(workload_name)
+ self.job.start_workload(workload)
+ self.job.start_workload(workload)
cursor.execute(
"""select job_id, workload, start from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
self.assertEqual(start_time_2, row[2], "Did not expect " + str(row[2]))
@mock.patch("uuid.uuid4")
end_time = "54321"
mock_calendar.side_effect = (start_time, end_time,)
mock_uuid.side_effect = (job_id,)
- workload_name = "Workload"
+ workload = rr()
- self.job.end_workload(workload_name)
+ self.job.end_workload(workload)
db = sqlite3.connect(JobDB.db_name)
cursor = db.cursor()
"""select job_id, workload, start, end from jobs
where job_id = ?
and workload = ?""",
- (job_id, workload_name,))
+ (job_id, workload.fullname,))
row = cursor.fetchone()
self.assertNotEqual(None, row, "Should be a row in the db")
self.assertEqual(job_id, row[0], "Did not expect " + str(row[0]))
self.assertEqual(
- workload_name, row[1], "Did not expect " + str(row[1]))
+ workload.fullname, row[1], "Did not expect " + str(row[1]))
# The start time is set to the same time as end if it was never set
# before
self.assertEqual(start_time, row[2], "Did not expect " + str(row[2]))
--- /dev/null
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+from storperf.workloads.rr import rr
+from storperf.workloads.rs import rs
+from storperf.workloads.rw import rw
+from storperf.workloads.wr import wr
+from storperf.workloads.ws import ws
+
+
+class WorkloadSubclassTest(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def test_local_name(self):
+ workload = rr()
+ self.assertEqual(workload.fullname,
+ "None.rr.None.queue-depth.1.block-size.64k",
+ workload.fullname)
+
+ def test_remote_name(self):
+ workload = rw()
+ workload.remote_host = "192.168.0.1"
+ self.assertEqual(workload.fullname,
+ "None.rw.192-168-0-1.queue-depth.1.block-size.64k",
+ workload.fullname)
+
+ def test_blocksize(self):
+ workload = rs()
+ workload.options["bs"] = "4k"
+ self.assertEqual(workload.fullname,
+ "None.rs.None.queue-depth.1.block-size.4k",
+ workload.fullname)
+
+ def test_queue_depth(self):
+ workload = wr()
+ workload.options["iodepth"] = "8"
+ self.assertEqual(workload.fullname,
+ "None.wr.None.queue-depth.8.block-size.64k",
+ workload.fullname)
+
+ def test_id(self):
+ workload = ws()
+ workload.id = "workloadid"
+ self.assertEqual(workload.fullname,
+ "workloadid.ws.None.queue-depth.1.block-size.64k",
+ workload.fullname)
class _base_workload(object):
def __init__(self):
- self.logger = logging.getLogger(__name__)
+ self.logger = logging.getLogger(self.__class__.__name__)
self.default_filesize = "100%"
self.filename = '/dev/vdb'
self.options = {
'numjobs': '1',
'loops': '2',
'output-format': 'json',
- 'status-interval': '600'
+ 'status-interval': '60'
}
self.invoker = None
+ self.remote_host = None
+ self.id = None
def execute(self):
+ if self.invoker is None:
+ raise ValueError("No invoker has been set")
+
args = []
+ self.invoker.remote_host = self.remote_host
+ self.invoker.callback_id = self.fullname
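+ # The fullname doubles as the invoker's callback id, keying emitted
+ # metrics by job, workload, host, queue depth and block size.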
if self.filename.startswith("/dev"):
self.options['size'] = "100%"
def setup(self):
pass
+
+ @property
+ def remote_host(self):
+ return str(self._remote_host)
+
+ @remote_host.setter
+ def remote_host(self, value):
+ self._remote_host = value
+
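+ # A Graphite-safe metric name: dots in the remote host become dashes
+ # so the host stays a single path component, e.g.
+ # "<job id>.rr.192-168-0-1.queue-depth.1.block-size.64k".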
+ @property
+ def fullname(self):
+ return str(self.id) + "." + \
+ self.__class__.__name__ + "." + \
+ str(self.remote_host).replace(".", "-") + \
+ ".queue-depth." + str(self.options['iodepth']) + \
+ ".block-size." + str(self.options['bs'])