Support Custom Workloads 71/59571/2
authormbeierl <mark.beierl@dell.com>
Thu, 5 Jul 2018 19:22:43 +0000 (15:22 -0400)
committermbeierl <mark.beierl@dell.com>
Fri, 6 Jul 2018 20:40:31 +0000 (16:40 -0400)
Refactors interaction with test_executor to clean up the
tight coupling.

Adds ability to specify custom workloads.

Change-Id: Idbadcec1f42714e96c5f96d1e45c05982a531503
JIRA: STORPERF-246
Co-Authored-By: Ameed.Ashour.Ext@Nokia.com
Signed-off-by: mbeierl <mark.beierl@dell.com>
docker/storperf-master/Dockerfile
docker/storperf-master/rest_server.py
docker/storperf-master/storperf/fio/fio_invoker.py
docker/storperf-master/storperf/storperf_master.py
docker/storperf-master/storperf/test_executor.py
docker/storperf-master/storperf/utilities/data_handler.py
docker/storperf-master/storperf/workloads/_base_workload.py
docker/storperf-master/storperf/workloads/_custom_workload.py [new file with mode: 0644]
docker/storperf-master/storperf/workloads/_ssd_preconditioning.py [deleted file]
docs/testing/user/test-usage.rst

index fec3931..c95e3ca 100644 (file)
@@ -21,7 +21,7 @@ FROM multiarch/alpine:$ARCH-$ALPINE_VERSION as storperf-builder
 
 RUN ulimit -n 1024
 
-LABEL version="5.0" description="OPNFV Storperf Docker container"
+LABEL version="7.0" description="OPNFV Storperf Docker container"
 
 ARG BRANCH=master
 
@@ -30,6 +30,7 @@ ENV repos_dir /home/opnfv/repos
 RUN apk --no-cache add --update \
     git \
     alpine-sdk \
+    coreutils \
     linux-headers \
     libaio \
     libaio-dev \
@@ -38,8 +39,8 @@ RUN apk --no-cache add --update \
 # Third party git fetches
 RUN git config --global http.sslVerify false
 RUN git clone http://git.kernel.dk/fio.git ${repos_dir}/fio
-RUN cd ${repos_dir}/fio && git checkout tags/fio-2.99
-RUN cd ${repos_dir}/fio && EXTFLAGS="-static" make install
+RUN cd ${repos_dir}/fio && git checkout tags/fio-3.7
+RUN cd ${repos_dir}/fio && EXTFLAGS="-static" make -j $(grep -c ^processor /proc/cpuinfo) install
 
 # Build StorPerf
 
index 939e91f..9d9b7fa 100644 (file)
@@ -350,10 +350,10 @@ for any single test iteration.
                 storperf.queue_depths = request.json['queue_depths']
             if ('block_sizes' in request.json):
                 storperf.block_sizes = request.json['block_sizes']
+            storperf.workloads = None
+            storperf.custom_workloads = None
             if ('workload' in request.json):
                 storperf.workloads = request.json['workload']
-            else:
-                storperf.workloads = None
             if ('metadata' in request.json):
                 metadata = request.json['metadata']
             else:
@@ -378,13 +378,98 @@ for any single test iteration.
     )
     def delete(self):
         self.logger.info("Threads: %s" % sys._current_frames())
-        print sys._current_frames()
         try:
             return jsonify({'Slaves': storperf.terminate_workloads()})
         except Exception as e:
             abort(400, str(e))
 
 
+@swagger.model
+class WorkloadV2Model:
+    resource_fields = {
+        'target': fields.String,
+        'deadline': fields.Integer,
+        "steady_state_samples": fields.Integer,
+        'workloads': fields.Nested,
+        'queue_depths': fields.String,
+        'block_sizes': fields.String
+    }
+
+
+class Job_v2(Resource):
+
+    """Job API"""
+
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+
+    @swagger.operation(
+        parameters=[
+            {
+                "name": "body",
+                "description": """Start execution of a workload with the
+following parameters:
+
+"target": The target device to profile",
+
+"deadline": if specified, the maximum duration in minutes
+for any single test iteration.
+
+"workloads":if specified, the workload to run. Defaults to all.
+                """,
+                "required": True,
+                "type": "WorkloadV2Model",
+                "paramType": "body"
+            }
+        ],
+        type=WorkloadResponseModel.__name__,
+        responseMessages=[
+            {
+                "code": 200,
+                "message": "Job submitted"
+            },
+            {
+                "code": 400,
+                "message": "Missing configuration data"
+            }
+        ]
+    )
+    def post(self):
+        if not request.json:
+            abort(400, "ERROR: Missing configuration data")
+
+        self.logger.info(request.json)
+
+        try:
+            if ('target' in request.json):
+                storperf.filename = request.json['target']
+            if ('deadline' in request.json):
+                storperf.deadline = request.json['deadline']
+            if ('steady_state_samples' in request.json):
+                storperf.steady_state_samples = request.json[
+                    'steady_state_samples']
+            if ('queue_depths' in request.json):
+                storperf.queue_depths = request.json['queue_depths']
+            if ('block_sizes' in request.json):
+                storperf.block_sizes = request.json['block_sizes']
+            storperf.workloads = None
+            storperf.custom_workloads = None
+            if ('workloads' in request.json):
+                storperf.custom_workloads = request.json['workloads']
+            if ('metadata' in request.json):
+                metadata = request.json['metadata']
+            else:
+                metadata = {}
+
+            job_id = storperf.execute_workloads(metadata)
+
+            return jsonify({'job_id': job_id})
+
+        except Exception as e:
+            self.logger.exception(e)
+            abort(400, str(e))
+
+
 @swagger.model
 class QuotaModel:
 
@@ -433,6 +518,7 @@ def setup_logging(default_path='logging.json',
 api.add_resource(Configure, "/api/v1.0/configurations")
 api.add_resource(Quota, "/api/v1.0/quotas")
 api.add_resource(Job, "/api/v1.0/jobs")
+api.add_resource(Job_v2, "/api/v2.0/jobs")
 api.add_resource(Logs, "/api/v1.0/logs")
 
 if __name__ == "__main__":
index 0360ea2..a361eec 100644 (file)
@@ -23,6 +23,7 @@ class FIOInvoker(object):
         self.callback_id = None
         self.terminated = False
         self.metadata = var_dict
+        self.stderr = []
 
     @property
     def remote_host(self):
@@ -60,13 +61,13 @@ class FIOInvoker(object):
                                         "Event listener callback")
                                     event_listener(
                                         self.callback_id, json_metric)
-                                except Exception, e:
+                                except Exception as e:
                                     self.logger.exception(
                                         "Notifying listener %s: %s",
                                         self.callback_id, e)
                                 self.logger.debug(
                                     "Event listener callback complete")
-                except Exception, e:
+                except Exception as e:
                     self.logger.error("Error parsing JSON: %s", e)
         except IOError:
             pass  # We might have read from the closed socket, ignore it
@@ -78,6 +79,7 @@ class FIOInvoker(object):
         self.logger.debug("Started")
         for line in iter(stderr.readline, b''):
             self.logger.error("FIO Error: %s", line.rstrip())
+            self.stderr.append(line.rstrip())
 
             # Sometime, FIO gets stuck and will give us this message:
             # fio: job 'sequential_read' hasn't exited in 60 seconds,
@@ -125,6 +127,9 @@ class FIOInvoker(object):
         self.logger.debug("Joining stdout handler")
         tout.join()
         self.logger.debug("Ended")
+        if exit_status != 0:
+            return self.stderr
+        return None
 
     def terminate(self):
         self.logger.debug("Terminating fio on " + self.remote_host)
index 7a1444e..1025789 100644 (file)
@@ -59,7 +59,7 @@ class StorPerfMaster(object):
                                              self.stack_settings)
         self.username = None
         self.password = None
-        self._test_executor = TestExecutor()
+        self._test_executor = None
         self._agent_count = 1
         self._agent_image = "Ubuntu 14.04"
         self._agent_flavor = "storperf"
@@ -72,6 +72,13 @@ class StorPerfMaster(object):
         self._last_snaps_check_time = None
         self._slave_addresses = []
         self._thread_pool = worker_pool(20)
+        self._filename = None
+        self._deadline = None
+        self._steady_state_samples = 10
+        self._queue_depths = [1, 4, 8]
+        self._block_sizes = [512, 4096, 16384]
+        self._workload_modules = []
+        self._custom_workloads = []
 
     @property
     def volume_count(self):
@@ -245,57 +252,68 @@ class StorPerfMaster(object):
 
     @property
     def filename(self):
-        return self._test_executor.filename
+        return self._filename
 
     @filename.setter
     def filename(self, value):
-        self._test_executor.filename = value
+        self._filename = value
 
     @property
     def deadline(self):
-        return self._test_executor.deadline
+        return self._deadline
 
     @deadline.setter
     def deadline(self, value):
-        self._test_executor.deadline = value
+        self._deadline = value
 
     @property
     def steady_state_samples(self):
-        return self._test_executor.steady_state_samples
+        return self._steady_state_samples
 
     @steady_state_samples.setter
     def steady_state_samples(self, value):
-        self._test_executor.steady_state_samples = value
+        self._steady_state_samples = value
 
     @property
     def queue_depths(self):
-        return self._test_executor.queue_depths
+        return self._queue_depths
 
     @queue_depths.setter
     def queue_depths(self, value):
-        self._test_executor.queue_depths = value
+        self._queue_depths = value
 
     @property
     def block_sizes(self):
-        return self._test_executor.block_sizes
+        return self._block_sizes
 
     @block_sizes.setter
     def block_sizes(self, value):
-        self._test_executor.block_sizes = value
-
-    @property
-    def is_stack_created(self):
-        return (self.stack_id is not None and
-                (self.heat_stack.get_status() == u'CREATE_COMPLETE' or
-                 self.heat_stack.get_status() == u'UPDATE_COMPLETE'))
+        self._block_sizes = value
 
     @property
     def workloads(self):
-        return str(self._test_executor.workload_modules)
+        return self._workload_modules
 
     @workloads.setter
     def workloads(self, value):
-        self._test_executor.register_workloads(value)
+        executor = TestExecutor()
+        executor.register_workloads(value)
+        self._workload_modules = value
+
+    @property
+    def custom_workloads(self):
+        return self._custom_workloads
+
+    @custom_workloads.setter
+    def custom_workloads(self, value):
+        self.logger.info("Custom workloads = %s" % value)
+        self._custom_workloads = value
+
+    @property
+    def is_stack_created(self):
+        return (self.stack_id is not None and
+                (self.heat_stack.get_status() == u'CREATE_COMPLETE' or
+                 self.heat_stack.get_status() == u'UPDATE_COMPLETE'))
 
     def get_logs(self, lines=None):
         LOG_DIR = './storperf.log'
@@ -358,14 +376,29 @@ class StorPerfMaster(object):
             self.heat_stack.clean()
         return stack_id
 
+    def executor_event(self, executor):
+        if executor.terminated:
+            self._test_executor = None
+
     def execute_workloads(self, metadata={}):
+        if (self._test_executor is not None and
+                (not self._test_executor.terminated and
+                 self._test_executor.job_id is not None)):
+            raise Exception("ERROR: Job {} is already running".format(
+                self._test_executor.job_id))
+
         if (self.stack_id is None):
             raise ParameterError("ERROR: Stack does not exist")
 
-        if (not self._test_executor.terminated and
-                self._test_executor.job_id is not None):
-            raise Exception("ERROR: Job {} is already running".format(
-                self._test_executor.job_id))
+        self._test_executor = TestExecutor()
+        self._test_executor.register(self.executor_event)
+        self._test_executor.register_workloads(self._workload_modules)
+        self._test_executor.custom_workloads = self.custom_workloads
+        self._test_executor.block_sizes = self._block_sizes
+        self._test_executor.filename = self._filename
+        self._test_executor.deadline = self._deadline
+        self._test_executor.steady_state_samples = self._steady_state_samples
+        self._test_executor.queue_depths = self._queue_depths
 
         slaves = self._slave_addresses
 
@@ -397,10 +430,14 @@ class StorPerfMaster(object):
         return job_id
 
     def terminate_workloads(self):
-        return self._test_executor.terminate()
+        if self._test_executor is not None:
+            return self._test_executor.terminate()
+        else:
+            return True
 
     def fetch_results(self, job_id):
-        if self._test_executor.job_db.job_id == job_id:
+        if (self._test_executor is not None and
+                self._test_executor.job_db.job_id == job_id):
             return self._test_executor.metadata['details']['metrics']
 
         workload_params = self.job_db.fetch_workload_params(job_id)
@@ -413,7 +450,19 @@ class StorPerfMaster(object):
         return self.job_db.fetch_workload_params(job_id)
 
     def fetch_job_status(self, job_id):
-        return self._test_executor.execution_status(job_id)
+        results = {}
+
+        if (self._test_executor is not None and
+                self._test_executor.job_id == job_id):
+            results['Status'] = 'Running'
+            results['Workloads'] = self._test_executor.workload_status
+        else:
+            jobs = self.job_db.fetch_jobs()
+            for job in jobs:
+                if job == job_id:
+                    results['Status'] = "Completed"
+
+        return results
 
     def fetch_all_jobs(self, metrics_type):
         job_list = self.job_db.fetch_jobs()
index 2ed6a9e..38e052e 100644 (file)
@@ -11,6 +11,7 @@ import copy
 import imp
 import json
 import logging
+from multiprocessing.pool import ThreadPool
 from os import listdir
 import os
 from os.path import isfile, join
@@ -25,17 +26,23 @@ from storperf.db.job_db import JobDB
 from storperf.fio.fio_invoker import FIOInvoker
 from storperf.utilities.data_handler import DataHandler
 from storperf.utilities.thread_gate import ThreadGate
+from storperf.workloads._custom_workload import _custom_workload
 
 
 class UnknownWorkload(Exception):
     pass
 
 
+class InvalidWorkloadName(Exception):
+    pass
+
+
 class TestExecutor(object):
 
     def __init__(self):
         self.logger = logging.getLogger(__name__)
         self.workload_modules = []
+        self._custom_workloads = {}
         self.filename = None
         self.deadline = None
         self.steady_state_samples = 10
@@ -43,7 +50,6 @@ class TestExecutor(object):
         self.end_time = None
         self.current_workload = None
         self.workload_status = {}
-        self.result_url = None
         self._queue_depths = [1, 4, 8]
         self._block_sizes = [512, 4096, 16384]
         self.event_listeners = set()
@@ -97,6 +103,16 @@ class TestExecutor(object):
         self.logger.debug("Set volume count to: " + str(volume_count))
         self._volume_count = volume_count
 
+    @property
+    def custom_workloads(self):
+        return self._custom_workloads
+
+    @custom_workloads.setter
+    def custom_workloads(self, custom_workloads):
+        self.logger.debug("Set custom workloads to: %s " %
+                          custom_workloads)
+        self._custom_workloads = custom_workloads
+
     @property
     def queue_depths(self):
         return ','.join(self._queue_depths)
@@ -152,7 +168,7 @@ class TestExecutor(object):
                 self.logger.debug("Notifying event listener %s",
                                   event_listener)
                 event_listener(self)
-            except Exception, e:
+            except Exception as e:
                 self.logger.exception("While notifying listener %s", e)
 
     def register_workloads(self, workloads):
@@ -185,7 +201,7 @@ class TestExecutor(object):
                         "ERROR: Unknown workload: " + workload)
                 if workload_module not in self.workload_modules:
                     self.workload_modules.append(workload_module)
-            except ImportError, err:
+            except ImportError as err:
                 raise UnknownWorkload("ERROR: " + str(err))
 
     def load_from_file(self, uri):
@@ -193,23 +209,26 @@ class TestExecutor(object):
         path, fname = os.path.split(uri)
         mname, _ = os.path.splitext(fname)
         no_ext = os.path.join(path, mname)
-        self.logger.debug("Looking for: " + no_ext)
         if os.path.exists(no_ext + '.pyc'):
-            self.logger.debug("Loading compiled: " + mname + " from " + no_ext)
             return imp.load_compiled(mname, no_ext + '.pyc')
         if os.path.exists(no_ext + '.py'):
-            self.logger.debug("Compiling: " + mname + " from " + no_ext)
             return imp.load_source(mname, no_ext + '.py')
         return None
 
     def execute(self, metadata):
         self.job_db.create_job_id()
+        try:
+            self.test_params()
+        except Exception as e:
+            self.terminate()
+            raise e
         self.job_db.record_workload_params(metadata)
         self._setup_metadata(metadata)
         self._workload_thread = Thread(target=self.execute_workloads,
                                        args=(),
                                        name="Workload thread")
         self._workload_thread.start()
+        # seems to be hanging here
         return self.job_db.job_id
 
     def terminate(self):
@@ -225,38 +244,61 @@ class TestExecutor(object):
             terminated_hosts.append(workload.remote_host)
         return terminated_hosts
 
-    def execution_status(self, job_id):
-
-        result = {}
-        status = "Completed"
-
-        if self.job_db.job_id == job_id and self._terminated is False:
-            status = "Running"
-
-            result['Status'] = status
-            result['Workloads'] = self.workload_status
-            result['TestResultURL'] = self.result_url
-
-        else:
-            jobs = self.job_db.fetch_jobs()
-            self.logger.info("Jobs")
-            self.logger.info(jobs)
-            for job in jobs:
-                if self.job_db.job_id == job_id and self._terminated is False:
-                    status = "Running"
-                    result['Status'] = status
-                    result['Workloads'] = self.workload_status
-                    result['TestResultURL'] = self.result_url
-                else:
-                    result[job] = {}
-                    result[job]['Status'] = "Completed"
-
-        return result
+    def test_params(self):
+        workloads = self._create_workload_matrix()
+        for current_workload in workloads:
+            workload = current_workload['workload']
+            self.logger.info("Testing FIO parameters for %s"
+                             % current_workload)
+            result = self._execute_workload(current_workload,
+                                            workload,
+                                            parse_only=True)
+            if result:
+                message = result[0]
+                self.logger.error("FIO parameter validation failed")
+                raise Exception("Workload parameter validation failed %s"
+                                % message)
+        pass
+
+    def _execute_workload(self, current_workload, workload, parse_only=False):
+        workload.options['iodepth'] = str(current_workload['queue-depth'])
+        workload.options['bs'] = str(current_workload['blocksize'])
+        slave_threads = []
+        thread_pool = ThreadPool(processes=len(self.slaves) *
+                                 self.volume_count)
+
+        for slave in self.slaves:
+            volume_number = 0
+            while volume_number < self.volume_count:
+                slave_workload = copy.copy(current_workload['workload'])
+                slave_workload.remote_host = slave
+                last_char_of_filename = chr(
+                    ord(slave_workload.filename[-1:]) + volume_number)
+                slave_workload.filename = ("%s%s" %
+                                           (slave_workload.filename[:-1],
+                                            last_char_of_filename))
+                self.logger.debug("Device to profile on %s: %s" %
+                                  (slave, slave_workload.filename))
+                self._workload_executors.append(slave_workload)
+
+                worker = thread_pool.apply_async(
+                    self.execute_on_node, (slave_workload, parse_only))
+                slave_threads.append(worker)
+                volume_number += 1
+
+        final_result = None
+        for slave_thread in slave_threads:
+            self.logger.debug("Waiting on %s" % slave_thread)
+            result = slave_thread.get()
+            self.logger.debug("Done waiting for %s, exit status %s" %
+                              (slave_thread, result))
+            if result:
+                final_result = result
+        return final_result
 
     def execute_workloads(self):
         self._terminated = False
         self.logger.info("Starting job %s" % (self.job_db.job_id))
-        self.event_listeners.clear()
         data_handler = DataHandler()
         self.register(data_handler.data_event)
 
@@ -267,12 +309,13 @@ class TestExecutor(object):
         workloads = self._create_workload_matrix()
 
         for current_workload in workloads:
+            if self._terminated:
+                continue
+
             workload = current_workload['workload']
             self._thread_gate = ThreadGate(len(self.slaves),
                                            workload.options['status-interval'])
 
-            if self._terminated:
-                return
             self.current_workload = current_workload['name']
 
             self.logger.info("Starting run %s" % self.current_workload)
@@ -287,34 +330,7 @@ class TestExecutor(object):
                 t = Thread(target=scheduler.run, args=())
                 t.start()
 
-            workload.options['iodepth'] = str(current_workload['queue-depth'])
-            workload.options['bs'] = str(current_workload['blocksize'])
-
-            slave_threads = []
-            for slave in self.slaves:
-                volume_number = 0
-                while volume_number < self.volume_count:
-                    slave_workload = copy.copy(current_workload['workload'])
-                    slave_workload.remote_host = slave
-                    last_char_of_filename = chr(ord(
-                        slave_workload.filename[-1:]) + volume_number)
-                    slave_workload.filename = "%s%s" % \
-                        (slave_workload.filename[:-1], last_char_of_filename)
-                    self.logger.debug("Device to profile: %s" %
-                                      slave_workload.filename)
-                    self._workload_executors.append(slave_workload)
-                    t = Thread(target=self.execute_on_node,
-                               args=(slave_workload,),
-                               name="%s worker" % slave)
-                    t.daemon = False
-                    t.start()
-                    slave_threads.append(t)
-                    volume_number += 1
-
-            for slave_thread in slave_threads:
-                self.logger.debug("Waiting on %s" % slave_thread)
-                slave_thread.join()
-                self.logger.debug("Done waiting for %s" % slave_thread)
+            self._execute_workload(current_workload, workload)
 
             if not scheduler.empty():
                 try:
@@ -337,59 +353,106 @@ class TestExecutor(object):
         report = {'report': json.dumps(self.metadata)}
         self.job_db.record_workload_params(report)
         self.job_db.job_id = None
-        if self.result_url is not None:
-            self.logger.info("Results can be found at %s" % self.result_url)
 
     def _create_workload_matrix(self):
         workloads = []
 
-        for workload_module in self.workload_modules:
-            workload_name = getattr(workload_module, "__name__")
-
-            constructorMethod = getattr(workload_module, workload_name)
-            workload = constructorMethod()
-            if (self.filename is not None):
-                workload.filename = self.filename
-            workload.id = self.job_db.job_id
-
-            if (workload_name.startswith("_")):
-                iodepths = [8, ]
-                blocksizes = [16384, ]
-            else:
-                iodepths = self._queue_depths
-                blocksizes = self._block_sizes
+        if self._custom_workloads:
+            for workload_name in self._custom_workloads.iterkeys():
+                if not workload_name.isalnum():
+                    raise InvalidWorkloadName(
+                        "Workload name must be alphanumeric only: %s" %
+                        workload_name)
+                workload = _custom_workload()
+                workload.options['name'] = workload_name
+                workload.name = workload_name
+                if (self.filename is not None):
+                    workload.filename = self.filename
+                workload.id = self.job_db.job_id
+
+                workload_params = self._custom_workloads[workload_name]
+                for param, value in workload_params.iteritems():
+                    if param == "readwrite":
+                        param = "rw"
+                    if param in workload.fixed_options:
+                        self.logger.warn("Skipping fixed option %s" % param)
+                        continue
+                    workload.options[param] = value
+
+                for blocksize in self._block_sizes:
+                    for iodepth in self._queue_depths:
+
+                        name = '%s.%s.queue-depth.%s.block-size.%s' % \
+                            (self.job_db.job_id, workload_name, iodepth,
+                             blocksize)
+                        self.workload_status[name] = "Pending"
+
+                        workload.options['bs'] = blocksize
+                        workload.options['iodepth'] = iodepth
+
+                        parameters = {'queue-depth': iodepth,
+                                      'blocksize': blocksize,
+                                      'name': name,
+                                      'workload_name': workload_name,
+                                      'status': 'Pending',
+                                      'workload': workload}
+
+                        self.logger.info("Workload %s=%s" %
+                                         (name, workload.options))
+
+                        workloads.append(parameters)
+        else:
+            for workload_module in self.workload_modules:
+                workload_name = getattr(workload_module, "__name__")
+
+                constructorMethod = getattr(workload_module, workload_name)
+                workload = constructorMethod()
+                if (self.filename is not None):
+                    workload.filename = self.filename
+                workload.id = self.job_db.job_id
+
+                if (workload_name.startswith("_")):
+                    iodepths = [8, ]
+                    blocksizes = [16384, ]
+                else:
+                    iodepths = self._queue_depths
+                    blocksizes = self._block_sizes
 
-            for blocksize in blocksizes:
-                for iodepth in iodepths:
+                for blocksize in blocksizes:
+                    for iodepth in iodepths:
 
-                    name = '%s.%s.queue-depth.%s.block-size.%s' % \
-                        (self.job_db.job_id, workload_name, iodepth, blocksize)
-                    self.workload_status[name] = "Pending"
+                        name = '%s.%s.queue-depth.%s.block-size.%s' % \
+                            (self.job_db.job_id, workload_name, iodepth,
+                             blocksize)
+                        self.workload_status[name] = "Pending"
 
-                    parameters = {'queue-depth': iodepth,
-                                  'blocksize': blocksize,
-                                  'name': name,
-                                  'workload_name': workload_name,
-                                  'status': 'Pending',
-                                  'workload': workload}
+                        parameters = {'queue-depth': iodepth,
+                                      'blocksize': blocksize,
+                                      'name': name,
+                                      'workload_name': workload_name,
+                                      'status': 'Pending',
+                                      'workload': workload}
 
-                    self.logger.info("Workload %s=%s" % (name, parameters))
+                        self.logger.info("Workload %s=%s" % (name, parameters))
 
-                    workloads.append(parameters)
+                        workloads.append(parameters)
 
         return workloads
 
-    def execute_on_node(self, workload):
+    def execute_on_node(self, workload, parse_only=False):
 
         invoker = FIOInvoker(self.metadata)
-        invoker.register(self.event)
         workload.invoker = invoker
 
         self.logger.info("Starting " + workload.fullname)
 
-        self.job_db.start_workload(workload)
-        workload.execute()
-        self.job_db.end_workload(workload)
-        invoker.unregister(self.event)
+        if not parse_only:
+            invoker.register(self.event)
+            self.job_db.start_workload(workload)
+        result = workload.execute(parse_only)
+        if not parse_only:
+            self.job_db.end_workload(workload)
+            invoker.unregister(self.event)
 
         self.logger.info("Ended " + workload.fullname)
+        return result
index b85517f..c7d70a7 100644 (file)
@@ -151,14 +151,12 @@ class DataHandler(object):
         test_db = os.environ.get('TEST_DB_URL')
         if test_db is not None:
             self.logger.info("Pushing results to %s" % (test_db))
-            try:
-                response = test_results_db.push_results_to_db(
-                    test_db,
-                    executor.metadata,
-                    self.logger)
-                executor.result_url = response['href']
-            except Exception:
-                self.logger.exception("Error pushing results into Database")
+            response = test_results_db.push_results_to_db(
+                test_db,
+                executor.metadata,
+                self.logger)
+            if response:
+                self.logger.info("Results reference: %s" % response['href'])
 
     def _determine_criteria(self, metadata):
         steady_state = True
index c2c7b7b..9b04314 100644 (file)
@@ -30,8 +30,9 @@ class _base_workload(object):
         self.invoker = None
         self.remote_host = None
         self.id = None
+        self.name = self.__class__.__name__
 
-    def execute(self):
+    def execute(self, parse_only=False):
         if self.invoker is None:
             raise ValueError("No invoker has been set")
 
@@ -55,7 +56,10 @@ class _base_workload(object):
         for key, value in self.options.iteritems():
             args.append('--' + key + "=" + value)
 
-        self.invoker.execute(args)
+        if parse_only:
+            args.append('--parse-only')
+
+        return self.invoker.execute(args)
 
     def terminate(self):
         if self.invoker is not None:
@@ -74,11 +78,11 @@ class _base_workload(object):
 
     @property
     def fullname(self):
-        host_file = self.remote_host+"."+self.filename
+        host_file = self.remote_host + "." + self.filename
         host_file = host_file.replace(".", "-").replace("/", "-")
         return ("%s.%s.queue-depth.%s.block-size.%s.%s"
                 % (str(self.id),
-                   self.__class__.__name__,
+                   self.name,
                    str(self.options['iodepth']),
                    str(self.options['bs']),
                    host_file))
diff --git a/docker/storperf-master/storperf/workloads/_custom_workload.py b/docker/storperf-master/storperf/workloads/_custom_workload.py
new file mode 100644 (file)
index 0000000..9e0100d
--- /dev/null
@@ -0,0 +1,36 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import logging
+from storperf.workloads import _base_workload
+
+
+class _custom_workload(_base_workload._base_workload):
+
+    def __init__(self):
+        self.logger = logging.getLogger(self.__class__.__name__)
+        self.default_filesize = "1G"
+        self.filename = '/dev/vdb'
+        self.fixed_options = {
+            'loops': '200',
+            'output-format': 'json',
+            'status-interval': '60'
+        }
+        self.options = {
+            'ioengine': 'libaio',
+            'direct': '1',
+            'numjobs': '1',
+            'rw': 'read',
+            'bs': '64k',
+            'iodepth': '1'
+        }
+        self.options.update(self.fixed_options)
+        self.invoker = None
+        self.remote_host = None
+        self.id = None
diff --git a/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py b/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py
deleted file mode 100644 (file)
index cce3c31..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 EMC and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from storperf.workloads import _base_workload
-
-
-class _ssd_preconditioning(_base_workload._base_workload):
-
-    def setup(self):
-        self.options['name'] = 'ssd_preconditioning'
-        self.options['rw'] = 'randwrite'
-        self.options['loops'] = '1'
index 78bee4e..ef54b6b 100644 (file)
@@ -37,6 +37,7 @@ Configure The Environment
 The following pieces of information are required to prepare the environment:
 
 - The number of VMs/Cinder volumes to create.
+- The Cinder volume type (optional) to create
 - The Glance image that holds the VM operating system to use.
 - The OpenStack flavor to use when creating the VMs.
 - The name of the public network that agents will use.
@@ -66,20 +67,21 @@ takes a JSON payload as follows.
 
 .. code-block:: json
 
-   {
-     "agent_count": int,
-     "agent_flavor": string
-     "agent_image": string,
-     "public_network": string,
-     "volume_size": int,
-     "volume_count": int,
-     "availability_zone": string,
-     "username": string,
-     "password": string
-   }
+       {
+         "agent_count": int,
+         "agent_flavor": "string",
+         "agent_image": "string",
+         "availability_zone": "string",
+         "password": "string",
+         "public_network": "string",
+         "username": "string",
+         "volume_count": int,
+         "volume_size": int,
+         "volume_type": "string"
+       }
 
 This call will block until the stack is created, at which point it will return
-the OpenStack heat stack id.
+the OpenStack heat stack id as well as the IP addresses of the slave agents.
 
 Initialize the Cinder Volumes
 =============================
@@ -128,12 +130,95 @@ rr
 rs
    Read, Sequential.  100% read of sequential blocks of data
 rw
-   Read / Write Mix, Random.  70% random read, 30% random write
+   Read / Write Mix, Sequential.  70% random read, 30% random write
 wr
    Write, Random.  100% write of random blocks
 ws
    Write, Sequential.  100% write of sequential blocks.
 
+Custom Workload Types
+~~~~~~~~~~~~~~~~~~~~~
+New in Gambia (7.0), you can specify custom workload parameters for StorPerf
+to pass on to FIO.  This is available in the /api/v2.0/jobs API, and takes
+a different format than the default v1.0 API.
+
+The format is as follows:
+
+.. code-block:: json
+
+  "workloads": {
+    "name": {
+       "fio argument": "fio value"
+    }
+  }
+
+The name is used the same way the 'rr', 'rs', 'rw', etc is used, but can be
+any arbitrary alphanumeric string.  This is for you to identify the job later.
+Following the name is a series of arguments to pass on to FIO.  The most
+important on of these is the actual I/O operation to perform.  From the `FIO
+manual`__, there are a number of different workloads:
+
+.. _FIO_IOP: http://git.kernel.dk/cgit/fio/tree/HOWTO#n985
+__ FIO_IOP_
+
+* read
+* write
+* trim
+* randread
+* etc
+
+This is an example of how the original 'ws' workload looks in the new format:
+
+.. code-block:: json
+
+  "workloads": {
+    "ws": {
+       "rw": "write"
+    }
+  }
+
+Using this format, it is now possible to initiate any combination of IO
+workload type.  For example, a mix of 60% reads and 40% writes scattered
+randomly throughout the volume being profiled would be:
+
+.. code-block:: json
+
+  "workloads": {
+    "6040randrw": {
+       "rw": "randrw",
+        "rwmixread": "60"
+    }
+  }
+
+Additional arguments can be added as needed.  Here is an example of random
+writes, with 25% duplicated blocks, followed by a second run of 75/25% mixed
+reads and writes.  This can be used to test the deduplication capabilities
+of the underlying storage driver.
+
+.. code-block:: json
+
+  "workloads": {
+    "dupwrite": {
+       "rw": "randwrite",
+        "dedupe_percentage": "25"
+    },
+    "7525randrw": {
+       "rw": "randrw",
+        "rwmixread": "75",
+        "dedupe_percentage": "25"
+    }
+  }
+
+There is no limit on the number of workloads and additional FIO arguments
+that can be specified.
+
+Note that as in v1.0, the list of workloads will be iterated over with the
+block sizes and queue depths specified.
+
+StorPerf will also do a verification of the arguments given prior to returning
+a Job ID from the ReST call.  If an argument fails validation, the error
+will be returned in the payload of the response.
+
 Block Sizes
 ~~~~~~~~~~~
 A comma delimited list of the different block sizes to use when reading and