.coverage
 .settings
 storperf.egg-info
+*.db
 
 #!/bin/bash
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
 
 echo "Creating a docker image from the current working directory..."
 
 
 docker build -t opnfv/storperf:dev .
 
-rm Dockerfile
+rm -f Dockerfile
 
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
 from setuptools import setup, find_packages
 
 
                       "flake8>=2.5.1",
                       "flask>=0.10.1",
                       "flask-restful>=0.3.5",
+                      "html2text>=2016.1.8",
                       "mock>=1.3",
                       "pyyaml>=3.11",
                       "python-cinderclient>=1.5.0",
 
+++ /dev/null
-heat_template_version: 2013-05-23
-
-parameters:
-  flavor:
-    type: string
-    default: m1.small
-  image:
-    type: string
-    default: ubuntu-server
-  key_name:
-    type: string
-  public_net_id:
-    type: string
-    default: public
-  username:
-    type: string
-    default: storperf
-
-resources:
-  storperf_manager:
-    type: "OS::Nova::Server"
-    properties:
-      name: storperf-manager
-      image: { get_param: image }
-      flavor: { get_param: flavor }
-      key_name: { get_param: key_name }
-      networks:
-        - port: { get_resource: storperf_manager_port }
-      user_data: { get_resource: storperf_manager_config }
-      user_data_format: RAW
-
-  storperf_manager_config:
-    type: "OS::Heat::CloudConfig"
-    properties:
-      cloud_config:
-        users:
-        - name: { get_param: username }
-          groups: users
-          shell: /bin/bash
-          sudo: "ALL=(ALL) NOPASSWD:ALL"
-          ssh_authorized_keys:
-          - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07
-        - default
-        package_update: true
-        package_upgrade: true
-        packages:
-        - fio
-        - python
-        - rsync
-        - graphite-carbon
-        - graphite-web
-        - apache2
-        - libapache2-mod-wsgi
-        - curl
-
-  storperf_manager_port:
-    type: "OS::Neutron::Port"
-    properties:
-      network_id: { get_resource: storperf_agent_net }
-      security_groups:
-        - { get_resource: storperf_security_group }
-
-  storperf_manager_ip:
-    type: "OS::Neutron::FloatingIP"
-    properties:
-      floating_network_id: { get_param: public_net_id }
-      port_id: { get_resource: storperf_manager_port }
-
-  storperf_agent_net:
-    type: "OS::Neutron::Net"
-    properties:
-      name: storperf-agent-network
-
-  storperf_agent_subnet:
-    type: "OS::Neutron::Subnet"
-    properties:
-      name: StorPerf-Agent-Subnet
-      network_id: { get_resource: storperf_agent_net }
-      cidr: 192.168.101.0/24
-      gateway_ip: 192.168.101.1
-      enable_dhcp: true
-      allocation_pools:
-        - start: "192.168.101.2"
-          end: "192.168.101.250"
-
-  storperf_security_group:
-    type: OS::Neutron::SecurityGroup
-    properties:
-      description: Neutron security group rules
-      name: storperf_security_group
-      rules:
-      - remote_ip_prefix: 0.0.0.0/0
-        protocol: tcp
-        direction: ingress
-      - remote_ip_prefix: 0.0.0.0/0
-        protocol: icmp
-        direction: ingress
-
-  router:
-    type: OS::Neutron::Router
-
-  router_gateway:
-    type: OS::Neutron::RouterGateway
-    properties:
-      router_id: { get_resource: router }
-      network_id: { get_param: public_net_id }
-
-  router_interface:
-    type: OS::Neutron::RouterInterface
-    properties:
-      router_id: { get_resource: router }
-      subnet_id: { get_resource: storperf_agent_subnet }
-
-outputs:
-  public_ip:
-    description: Floating IP address in public network
-    value: { get_attr: [ storperf_manager_ip, floating_ip_address ] }
 
+++ /dev/null
-PYTHONPATH="`pwd`/storperf":"`pwd`/tests" nosetests --with-xunit .
 
--- /dev/null
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+"""
+"""
+
+from storperf.storperf_master import StorPerfMaster
+from storperf.test_executor import UnknownWorkload
+from threading import Thread
+import cPickle
+import getopt
+import json
+import logging
+import logging.config
+import logging.handlers
+import os
+import socket
+import struct
+import sys
+import time
+
+import html2text
+import requests
+
+
+class Usage(Exception):
+    pass
+
+
+def event(event_string):
+    logging.getLogger(__name__).info(event_string)
+
+
+class LogRecordStreamHandler(object):
+
+    def __init__(self):
+        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        self.socket.bind((
+            'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT))
+        self.level = logging.INFO
+
+    def read_logs(self):
+        try:
+            while True:
+                datagram = self.socket.recv(8192)
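+                # DatagramHandler's wire format: a 4-byte big-endian length
+                # prefix followed by a pickled dict of LogRecord attributes.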
+                chunk = datagram[0:4]
+                slen = struct.unpack(">L", chunk)[0]
+                chunk = datagram[4:]
+                obj = cPickle.loads(chunk)
+                record = logging.makeLogRecord(obj)
+                if (record.levelno >= self.level):
+                    logger = logging.getLogger(record.name)
+                    logger.handle(record)
+
+        except Exception as e:
+            print "ERROR: " + str(e)
+        finally:
+            self.socket.close()
+
+
+def main(argv=None):
+    verbose = False
+    debug = False
+    report = None
+    erase = False
+    options = {}
+
+    storperf = StorPerfMaster()
+
+    if argv is None:
+        argv = sys.argv
+    try:
+        try:
+            opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdh",
+                                       ["target=",
+                                        "workload=",
+                                        "report=",
+                                        "configure=",
+                                        "erase",
+                                        "nossd",
+                                        "nowarm",
+                                        "verbose",
+                                        "debug",
+                                        "help",
+                                        ])
+        except getopt.error, msg:
+            raise Usage(msg)
+
+        configuration = None
+        options['workload'] = None
+
+        for o, a in opts:
+            if o in ("-h", "--help"):
+                print __doc__
+                return 0
+            elif o in ("-t", "--target"):
+                options['target'] = a
+            elif o in ("-v", "--verbose"):
+                verbose = True
+            elif o in ("-d", "--debug"):
+                debug = True
+            elif o in ("-s", "--nossd"):
+                options['nossd'] = True
+            elif o in ("-c", "--nowarm"):
+                options['nowarm'] = True
+            elif o in ("-w", "--workload"):
+                options['workload'] = a
+            elif o in ("-r", "--report"):
+                report = a
+            elif o in ("-e", "--erase"):
+                erase = True
+            elif o in ("-f", "--configure"):
+                configuration = dict(x.split('=') for x in a.split(','))
+
+        if (debug) or (verbose):
+            udpserver = LogRecordStreamHandler()
+
+            if (debug):
+                udpserver.level = logging.DEBUG
+
+            logging.basicConfig(format="%(asctime)s - %(name)s - " +
+                                "%(levelname)s - %(message)s")
+
+            t = Thread(target=udpserver.read_logs, args=())
+            t.setDaemon(True)
+            t.start()
+
+        if (erase):
+            response = requests.delete(
+                'http://127.0.0.1:5000/api/v1.0/configure')
+            if (response.status_code == 400):
+                content = json.loads(response.content)
+                raise Usage(content['message'])
+            return 0
+
+        if (configuration is not None):
+            response = requests.post(
+                'http://127.0.0.1:5000/api/v1.0/configure', json=configuration)
+            if (response.status_code == 400):
+                content = json.loads(response.content)
+                raise Usage(content['message'])
+
+        if (report is not None):
+            print storperf.fetch_results(report, options['workload'])
+        else:
+            print "Calling start..."
+            response = requests.post(
+                'http://127.0.0.1:5000/api/v1.0/start', json=options)
+            if (response.status_code == 400):
+                content = json.loads(response.content)
+                raise Usage(content['message'])
+
+            content = json.loads(response.content)
+            print "Started job id: " + content['job_id']
+
+    except Usage as e:
+        print >> sys.stderr, str(e)
+        print >> sys.stderr, "For help use --help"
+        return 2
+
+    except Exception as e:
+        print >> sys.stderr, str(e)
+        return 2
+
+
+if __name__ == "__main__":
+    sys.exit(main())
 
-########################################
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
 #   Docker container for STORPERF
-########################################
+#
 # Purpose: docker image for Storperf project
 #
 # Maintained by Jose Lausuch, Mark Beierl
 #    $ docker build -t opnfv/storperf:tag .
 #
 # Execution:
-#    $ docker run -t -i opnfv/storperf /bin/bash
+#    $ docker run -t opnfv/storperf /bin/bash
 #
 
 
 python-pip \
 --no-install-recommends
 
+
+# Allow root SSH access with 'storperf' as the password
+
+RUN echo 'root:storperf' | chpasswd
+RUN sed -i 's/PermitRootLogin without-password/PermitRootLogin yes/' /etc/ssh/sshd_config
+
+# SSH login fix. Otherwise user is kicked off after login
+RUN sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd
+
+ENV NOTVISIBLE "in users profile"
+RUN echo "export VISIBLE=now" >> /etc/profile
+RUN mkdir -p /var/run/sshd
+
 RUN mkdir -p ${repos_dir}
 RUN mkdir -p /root/.ssh
 RUN chmod 700 /root/.ssh
 RUN git clone https://gerrit.opnfv.org/gerrit/releng ${repos_dir}/releng
 RUN git clone http://git.kernel.dk/fio.git ${repos_dir}/fio
 RUN cd ${repos_dir}/fio && git checkout tags/fio-2.2.10
-RUN cd ${repos_dir}/fio && make -j 4 install
+RUN cd ${repos_dir}/fio && make -j 6 install
 RUN puppet module install gdsoperations-graphite
 
+RUN chmod 600 ${repos_dir}/storperf/storperf/resources/ssh/storperf_rsa
+
+RUN pip install -r ${repos_dir}/storperf/docker/requirements.pip
+
 COPY storperf.pp /etc/puppet/manifests/storperf.pp
 RUN puppet apply /etc/puppet/manifests/storperf.pp
 
-#Let others connect to Graphite if they want our data
+# Open access to SSH if desired
+EXPOSE 22
+
+# Graphite
 EXPOSE 8000
 
+# REST API
+EXPOSE 5000
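+
+# For example, a possible invocation that publishes all three exposed
+# services (host ports are arbitrary):
+#   $ docker run -t -p 2222:22 -p 8000:8000 -p 5000:5000 opnfv/storperf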
+
 COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf
 CMD ["/usr/bin/supervisord"]
 
--- /dev/null
+pyyaml==3.10
+python-neutronclient==2.6.0
+python-heatclient==0.8.0
+python-novaclient==2.28.1
+python-glanceclient==1.1.0
+python-cinderclient==1.4.0
+python-keystoneclient==1.6.0
+flask>=0.10
+flask-restful>=0.3.5
+html2text==2016.1.8
\ No newline at end of file
 
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
 class { 'python':
   pip        => true,
   dev        => true,
 
 stdout_logfile = /var/log/supervisor/%(program_name)s.log
 stderr_logfile = /var/log/supervisor/%(program_name)s.log
 autorestart = true
+
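+; sshd must run in the foreground (-D) so supervisord can manage it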
+[program:sshd]
+user = root
+command = /usr/sbin/sshd -D
+stdout_logfile = /var/log/supervisor/%(program_name)s.log
+stderr_logfile = /var/log/supervisor/%(program_name)s.log
+autorestart = true
+
+[program:storperf-webapp]
+user = root
+directory = /home/opnfv/repos/storperf
+command = /usr/bin/python /home/opnfv/repos/storperf/rest_server.py
+stdout_logfile = /var/log/supervisor/%(program_name)s.log
+stderr_logfile = /var/log/supervisor/%(program_name)s.log
+autorestart = true
+
 
--- /dev/null
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from storperf.storperf_master import StorPerfMaster
+import json
+import logging
+import logging.config
+import logging.handlers
+import os
+
+from flask import abort, Flask, request, jsonify
+from flask_restful import Resource, Api
+
+
+app = Flask(__name__)
+api = Api(app)
+storperf = StorPerfMaster()
+
+
+class Configure(Resource):
+
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+
+    def get(self):
+        return jsonify({'agent_count': storperf.agent_count,
+                        'agent_network': storperf.agent_network,
+                        'volume_size': storperf.volume_size,
+                        'stack_created': storperf.is_stack_created,
+                        'stack_id': storperf.stack_id})
+
+    def post(self):
+        if not request.json:
+            abort(400, "ERROR: No data specified")
+
+        try:
+            if ('agent_count' in request.json):
+                storperf.agent_count = request.json['agent_count']
+            if ('agent_network' in request.json):
+                storperf.agent_network = request.json['agent_network']
+            if ('volume_size' in request.json):
+                storperf.volume_size = request.json['volume_size']
+
+            storperf.validate_stack()
+            storperf.create_stack()
+
+            return jsonify({'agent_count': storperf.agent_count,
+                            'agent_network': storperf.agent_network,
+                            'volume_size': storperf.volume_size,
+                            'stack_id': storperf.stack_id})
+
+        except Exception as e:
+            abort(400, str(e))
+
+    def delete(self):
+        try:
+            storperf.delete_stack()
+        except Exception as e:
+            abort(400, str(e))
+
+
+class StartJob(Resource):
+
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+
+    def post(self):
+        if not request.json:
+            abort(400, "ERROR: Missing configuration data")
+
+        self.logger.info(request.json)
+
+        try:
+            if ('target' in request.json):
+                storperf.filename = request.json['target']
+            if ('nossd' in request.json):
+                storperf.precondition = False
+            if ('nowarm' in request.json):
+                storperf.warm_up = False
+            if ('workload' in request.json):
+                storperf.workloads = request.json['workload']
+
+            job_id = storperf.execute_workloads()
+
+            return jsonify({'job_id': job_id})
+
+        except Exception as e:
+            abort(400, str(e))
+
+
+class Quota(Resource):
+
+    def get(self):
+        quota = storperf.get_volume_quota()
+        return jsonify({'quota': quota})
+
+
+def setup_logging(default_path='storperf/logging.json',
+                  default_level=logging.INFO, env_key='LOG_CFG'):
+    """Setup logging configuration
+    """
+
+    path = default_path
+    value = os.getenv(env_key, None)
+    if value:
+        path = value
+    if os.path.exists(path):
+        with open(path, 'rt') as f:
+            config = json.load(f)
+        logging.config.dictConfig(config)
+    else:
+        logging.basicConfig(level=default_level)
+
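+    # Mirror every record over UDP so the CLI's LogRecordStreamHandler can
+    # display live logs when run with -v or -d.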
+    socketHandler = logging.handlers.DatagramHandler(
+        'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT)
+    rootLogger = logging.getLogger('')
+    rootLogger.addHandler(socketHandler)
+
+
+api.add_resource(Configure, "/api/v1.0/configure")
+api.add_resource(Quota, "/api/v1.0/quota")
+api.add_resource(StartJob, "/api/v1.0/start")
+
+if __name__ == "__main__":
+    setup_logging()
+    logging.getLogger("storperf").setLevel(logging.DEBUG)
+
+    app.run(host='0.0.0.0', debug=True)
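+
+# A sample session against a running instance (hypothetical values):
+#   curl http://127.0.0.1:5000/api/v1.0/quota
+#   curl -X POST -H "Content-Type: application/json" \
+#       -d '{"agent_count": 2, "volume_size": 2}' \
+#       http://127.0.0.1:5000/api/v1.0/configure
+#   curl -X POST -H "Content-Type: application/json" \
+#       -d '{"workload": "rw"}' http://127.0.0.1:5000/api/v1.0/start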
 
--- /dev/null
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
 
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
 
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
 
--- /dev/null
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from sqlite3 import OperationalError
+import logging
+import sqlite3
+
+
+class ConfigurationDB(object):
+
+    db_name = "StorPerf.db"
+
+    def __init__(self):
+        """
+        Creates the StorPerf.db and configuration tables on demand
+        """
+
+        self.logger = logging.getLogger(__name__)
+        self.logger.debug("Connecting to " + ConfigurationDB.db_name)
+        db = sqlite3.connect(ConfigurationDB.db_name)
+
+        cursor = db.cursor()
+        try:
+            cursor.execute('''CREATE TABLE configuration
+            (configuration_name text,
+            key text,
+            value text)''')
+            self.logger.debug("Created configuration table")
+        except OperationalError:
+            self.logger.debug("Configuration table exists")
+
+        cursor.execute('SELECT * FROM configuration')
+
+    def delete_configuration_value(self, configuration_name, key):
+        """Deletes the value associated with the given key
+        """
+
+        db = sqlite3.connect(ConfigurationDB.db_name)
+        cursor = db.cursor()
+
+        cursor.execute(
+            "delete from configuration where configuration_name=? and key=?",
+            (configuration_name, key))
+
+        self.logger.debug("Deleted " + configuration_name + ":" + key)
+
+        db.commit()
+
+    def get_configuration_value(self, configuration_name, key):
+        """Returns a string representation of the value stored
+        with this key under the given configuration name.
+        """
+
+        db = sqlite3.connect(ConfigurationDB.db_name)
+        cursor = db.cursor()
+
+        cursor.execute(
+            """select value from configuration
+                       where configuration_name = ?
+                       and key = ?""",
+            (configuration_name, key,))
+
+        row = cursor.fetchone()
+
+        if (row is None):
+            self.logger.debug(
+                configuration_name + ":" + key + " does not exist")
+            return None
+        else:
+            self.logger.debug(
+                configuration_name + ":" + key + " is " + str(row[0]))
+            return str(row[0])
+
+    def set_configuration_value(self, configuration_name, key, value):
+        """Updates or creates the key under the given configuration
+        name so that it holds the value specified.
+        """
+
+        if (value is None):
+            return self.delete_configuration_value(configuration_name, key)
+
+        value = str(value)
+
+        db = sqlite3.connect(ConfigurationDB.db_name)
+        cursor = db.cursor()
+
+        cursor.execute(
+            "delete from configuration where configuration_name=? and key=?",
+            (configuration_name, key))
+
+        cursor.execute(
+            """insert into configuration(configuration_name, key, value)
+             values (?,?,?)""", (configuration_name, key, value))
+
+        self.logger.debug(configuration_name + ":" + key + " set to " + value)
+
+        db.commit()
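+
+# A minimal usage sketch (hypothetical values); values are stored and
+# returned as strings:
+#   db = ConfigurationDB()
+#   db.set_configuration_value('stack', 'agent_count', 2)
+#   db.get_configuration_value('stack', 'agent_count')  # -> "2"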
 
 
         self.logger = logging.getLogger(__name__)
         self.logger.debug("Connecting to " + JobDB.db_name)
-        self.db = sqlite3.connect(JobDB.db_name)
         self.job_id = None
 
-        cursor = self.db.cursor()
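+        # sqlite3 connections cannot be shared between threads by default,
+        # so open a short-lived connection per operation instead of caching
+        # one on the instance.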
+        db = sqlite3.connect(JobDB.db_name)
+        cursor = db.cursor()
         try:
             cursor.execute('''CREATE TABLE jobs
             (job_id text,
         Returns a job id that is guaranteed to be unique in this
         StorPerf instance.
         """
-        cursor = self.db.cursor()
+        db = sqlite3.connect(JobDB.db_name)
+        cursor = db.cursor()
 
         self.job_id = str(uuid.uuid4())
         row = cursor.execute(
         cursor.execute(
             "insert into jobs(job_id) values (?)", (self.job_id,))
         self.logger.debug("Reserved job id " + self.job_id)
-        self.db.commit()
+        db.commit()
 
     def start_workload(self, workload_name):
         """
         if (self.job_id is None):
             self.create_job_id()
 
-        cursor = self.db.cursor()
+        db = sqlite3.connect(JobDB.db_name)
+        cursor = db.cursor()
+
         now = str(calendar.timegm(time.gmtime()))
 
         row = cursor.execute(
                  now,
                  workload_name,))
 
-        self.db.commit()
+        db.commit()
 
     def end_workload(self, workload_name):
         """
         if (self.job_id is None):
             self.create_job_id()
 
-        cursor = self.db.cursor()
+        db = sqlite3.connect(JobDB.db_name)
+        cursor = db.cursor()
         now = str(calendar.timegm(time.gmtime()))
 
         row = cursor.execute(
                  now,
                  workload_name,))
 
-        self.db.commit()
+        db.commit()
 
     def fetch_results(self, workload_prefix=""):
         if (workload_prefix is None):
 
         self.logger.debug("Workload like: " + workload_prefix)
 
-        cursor = self.db.cursor()
+        db = sqlite3.connect(JobDB.db_name)
+        cursor = db.cursor()
         cursor.execute("""select start, end, workload
             from jobs where workload like ?""",
                        (workload_prefix,))
                 '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \
                 start_time + "&until=" + end_time
 
-            print '\n\t' + request + '\n'
-
             response = requests.get(request)
 
             if (response.status_code == 200):
 
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
 
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
+from threading import Thread
 import json
 import logging
 import subprocess
-from threading import Thread
 
 
 class FIOInvoker(object):
     def __init__(self):
         self.logger = logging.getLogger(__name__)
         self.event_listeners = set()
+        self.event_callback_ids = set()
+        self._remote_host = None
+        self.callback_id = None
+
+    @property
+    def remote_host(self):
+        return self._remote_host
+
+    @remote_host.setter
+    def remote_host(self, value):
+        self._remote_host = value
+        self.logger = logging.getLogger(__name__ + ":" + value)
 
     def register(self, event_listener):
         self.event_listeners.add(event_listener)
                         self.json_body = ""
 
                         for event_listener in self.event_listeners:
-                            event_listener(json_metric)
+                            event_listener(self.callback_id, json_metric)
 
                 except Exception, e:
                     self.logger.error("Error parsing JSON: %s", e)
         self.fio_process.stderr.close()
 
     def execute(self, args=[]):
-        for arg in args:
-            self.logger.debug("FIO arg: " + arg)
-
-        self.fio_process = subprocess.Popen(['fio'] + args,
+        self.logger.debug("FIO args " + str(args))
+
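+        # Run fio locally unless a remote host is set, in which case wrap
+        # the same argument list in an ssh invocation that runs the slave's
+        # copy of ./fio as the stock 'ubuntu' cloud-image user.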
+        if (self.remote_host is None):
+            cmd = "fio"
+        else:
+            cmd = "ssh"
+            additional_args = ['-o', 'StrictHostKeyChecking=no',
+                               '-i', 'storperf/resources/ssh/storperf_rsa',
+                               'ubuntu@' + self.remote_host, "./fio"]
+            args = additional_args + args
+
+        self.fio_process = subprocess.Popen([cmd] + args,
                                             universal_newlines=True,
                                             stdout=subprocess.PIPE,
                                             stderr=subprocess.PIPE)
 
             "stream": "ext://sys.stdout"
         },
 
-        "info_file_handler": {
+        "file_handler": {
             "class": "logging.handlers.RotatingFileHandler",
-            "level": "INFO",
+            "level": "DEBUG",
             "formatter": "simple",
-            "filename": "info.log",
+            "filename": "storperf.log",
             "maxBytes": 10485760,
             "backupCount": 20,
             "encoding": "utf8"
     },
 
     "root": {
-        "level": "INFO",
-        "handlers": ["console", "info_file_handler", "error_file_handler"]
+        "level": "WARN",
+        "handlers": ["console", "file_handler", "error_file_handler"]
+    },
+
+    "storperf": {
+        "level": "DEBUG",
+        "handlers": ["console", "file_handler", "error_file_handler"]
     }
 }
\ No newline at end of file
 
+++ /dev/null
-##############################################################################
-# Copyright (c) 2015 EMC and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import getopt
-import json
-import logging.config
-import os
-import sys
-
-from test_executor import TestExecutor, UnknownWorkload
-
-"""
-"""
-
-
-class Usage(Exception):
-
-    def __init__(self, msg):
-        self.msg = msg
-
-
-def setup_logging(
-    default_path='storperf/logging.json',
-    default_level=logging.INFO,
-    env_key='LOG_CFG'
-):
-    """Setup logging configuration
-
-    """
-    path = default_path
-    value = os.getenv(env_key, None)
-    if value:
-        path = value
-    if os.path.exists(path):
-        with open(path, 'rt') as f:
-            config = json.load(f)
-        logging.config.dictConfig(config)
-    else:
-        logging.basicConfig(level=default_level)
-
-
-def event(event_string):
-    logging.getLogger(__name__).info(event_string)
-
-
-def main(argv=None):
-    setup_logging()
-    test_executor = TestExecutor()
-    verbose = False
-    debug = False
-    workloads = None
-    report = None
-
-    if argv is None:
-        argv = sys.argv
-    try:
-        try:
-            opts, args = getopt.getopt(argv[1:], "t:w:r:scvdh",
-                                       ["target=",
-                                        "workload=",
-                                        "report=",
-                                        "nossd",
-                                        "nowarm",
-                                        "verbose",
-                                        "debug",
-                                        "help",
-                                        ])
-        except getopt.error, msg:
-            raise Usage(msg)
-
-        for o, a in opts:
-            if o in ("-h", "--help"):
-                print __doc__
-                return 0
-            elif o in ("-t", "--target"):
-                test_executor.filename = a
-            elif o in ("-t", "--target"):
-                report = a
-            elif o in ("-v", "--verbose"):
-                verbose = True
-            elif o in ("-d", "--debug"):
-                debug = True
-            elif o in ("-s", "--nossd"):
-                test_executor.precondition = False
-            elif o in ("-c", "--nowarm"):
-                test_executor.warm = False
-            elif o in ("-w", "--workload"):
-                workloads = a.split(",")
-            elif o in ("-r", "--report"):
-                report = a
-
-        if (debug):
-            logging.getLogger().setLevel(logging.DEBUG)
-
-        test_executor.register_workloads(workloads)
-
-    except Usage, err:
-        print >> sys.stderr, err.msg
-        print >> sys.stderr, "for help use --help"
-        return 2
-    except UnknownWorkload, err:
-        print >> sys.stderr, err.msg
-        print >> sys.stderr, "for help use --help"
-        return 2
-
-    if (verbose):
-        test_executor.register(event)
-
-    if (report is not None):
-        print test_executor.fetch_results(report, workloads)
-    else:
-        test_executor.execute()
-
-if __name__ == "__main__":
-    sys.exit(main())
 
--- /dev/null
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+heat_template_version: 2013-05-23
+
+parameters:
+  agent_network:
+    type: string
+    constraints:
+        - custom_constraint: neutron.network
+  flavor:
+    type: string
+    default: "StorPerf Agent"
+  key_name:
+    type: string
+    default: StorPerf
+  volume_size:
+    type: number
+    description: Size of the volume to be created.
+    default: 1
+    constraints:
+      - range: { min: 1, max: 1024 }
+        description: must be between 1 and 1024 GB.
+  agent_count:
+    type: number
+    default: 1
+    constraints:
+      - range: { min: 1, max: 512 }
+        description: must be between 1 and 512 agents.
+
+
+resources:
+  slaves:
+    type: OS::Heat::ResourceGroup
+    properties:
+      count: {get_param: agent_count}
+      resource_def: {
+        type: "storperf-agent.yaml",
+        properties: {
+          agent_network: {get_param: agent_network},
+          flavor: {get_param: flavor},
+          key_name: {get_param: key_name},
+          volume_size: {get_param: volume_size}
+        }
+      }
+
+outputs:
+  slave_ips: {
+      description: "Slave addresses",
+      value: { get_attr: [ slaves, storperf_agent_ip] }
+  }
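+
+# StorPerfMaster passes this template to Heat together with
+# storperf-agent.yaml via the 'files' argument. A roughly equivalent manual
+# invocation with the legacy Heat CLI might look like (hypothetical values):
+#   heat stack-create StorPerfAgentGroup -f agent-group.yaml \
+#     -P "agent_network=<network-id>;agent_count=2;volume_size=1"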
 
--- /dev/null
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+heat_template_version: 2013-05-23
+
+parameters:
+  flavor:
+    type: string
+    default: m1.small
+  image:
+    type: string
+    default: 'StorPerf Agent'
+  key_name:
+    type: string
+    default: StorPerf
+  username:
+    type: string
+    default: storperf
+  volume_size:
+    type: number
+    description: Size of the volume to be created.
+    default: 1
+    constraints:
+      - range: { min: 1, max: 1024 }
+        description: must be between 1 and 1024 GB.
+  agent_network:
+    type: string
+    constraints:
+        - custom_constraint: neutron.network
+
+resources:
+
+  storperf_agent:
+    type: "OS::Nova::Server"
+    properties:
+      name: storperf-agent
+      image: { get_param: image }
+      flavor: { get_param: flavor }
+      key_name: { get_param: key_name }
+      networks:
+        - port: { get_resource: storperf_agent_port }
+      user_data: { get_resource: storperf_agent_config }
+      user_data_format: RAW
+
+  storperf_agent_config:
+    type: "OS::Heat::CloudConfig"
+    properties:
+      cloud_config:
+        users:
+        - name: { get_param: username }
+          groups: users
+          shell: /bin/bash
+          sudo: "ALL=(ALL) NOPASSWD:ALL"
+          ssh_authorized_keys:
+          - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07
+        - default
+        package_update: false
+        package_upgrade: false
+        manage_etc_hosts: localhost
+
+  storperf_agent_port:
+    type: "OS::Neutron::Port"
+    properties:
+      network_id: { get_param: agent_network }
+      security_groups:
+        - { get_resource: storperf_security_group }
+
+  storperf_security_group:
+    type: OS::Neutron::SecurityGroup
+    properties:
+      description: Neutron security group rules
+      name: StorPerf-Security-Group
+      rules:
+      - remote_ip_prefix: 0.0.0.0/0
+        protocol: tcp
+        direction: ingress
+      - remote_ip_prefix: 0.0.0.0/0
+        protocol: icmp
+        direction: ingress
+
+  agent_volume:
+    type: OS::Cinder::Volume
+    properties:
+      size: { get_param: volume_size }
+
+  agent_volume_att:
+    type: OS::Cinder::VolumeAttachment
+    properties:
+      instance_uuid: { get_resource: storperf_agent }
+      volume_id: { get_resource: agent_volume}
+
+outputs:
+  storperf_agent_ip:
+    description: The IP address of the agent on the StorPerf network
+    value: { get_attr: [ storperf_agent, first_address ] }
\ No newline at end of file
 
--- /dev/null
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from threading import Thread
+import logging
+import os
+import subprocess
+
+from db.configuration_db import ConfigurationDB
+from test_executor import TestExecutor
+import cinderclient.v2 as cinderclient
+import heatclient.client as heatclient
+import keystoneclient.v2_0 as ksclient
+
+
+class ParameterError(Exception):
+    """ """
+
+
+class StorPerfMaster(object):
+
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+
+        self.configuration_db = ConfigurationDB()
+
+        template_file = open("storperf/resources/hot/agent-group.yaml")
+        self._agent_group_hot = template_file.read()
+        template_file = open("storperf/resources/hot/storperf-agent.yaml")
+        self._agent_resource_hot = template_file.read()
+        self._hot_files = {
+            'storperf-agent.yaml': self._agent_resource_hot
+        }
+        self.logger.debug(
+            "Loaded agent-group template as: " + self._agent_group_hot)
+        self.logger.debug(
+            "Loaded agent-resource template as: " + self._agent_resource_hot)
+
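+        # Standard OpenStack credentials, as exported by an openrc file.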
+        self._username = os.environ.get('OS_USERNAME')
+        self._password = os.environ.get('OS_PASSWORD')
+        self._tenant_name = os.environ.get('OS_TENANT_NAME')
+        self._project_name = os.environ.get('OS_PROJECT_NAME')
+        self._auth_url = os.environ.get('OS_AUTH_URL')
+
+        self._cinder_client = None
+        self._heat_client = None
+        self._test_executor = TestExecutor()
+
+    @property
+    def volume_size(self):
+        value = self.configuration_db.get_configuration_value(
+            'stack',
+            'volume_size')
+        if (value is None):
+            self.volume_size = 1
+            value = 1
+        return int(value)
+
+    @volume_size.setter
+    def volume_size(self, value):
+        if (self.stack_id is not None):
+            raise ParameterError(
+                "ERROR: Cannot change volume size after stack is created")
+
+        self.configuration_db.set_configuration_value(
+            'stack',
+            'volume_size',
+            value)
+
+    @property
+    def agent_count(self):
+        value = self.configuration_db.get_configuration_value(
+            'stack',
+            'agent_count')
+
+        if (value is None):
+            self.agent_count = 1
+            value = 1
+        return int(value)
+
+    @agent_count.setter
+    def agent_count(self, value):
+        if (self.stack_id is not None):
+            raise ParameterError(
+                "ERROR: Cannot change agent count after stack is created")
+
+        self.configuration_db.set_configuration_value(
+            'stack',
+            'agent_count',
+            value)
+
+    @property
+    def agent_network(self):
+        return self.configuration_db.get_configuration_value(
+            'stack',
+            'agent_network')
+
+    @agent_network.setter
+    def agent_network(self, value):
+        if (self.stack_id is not None):
+            raise ParameterError(
+                "ERROR: Cannot change agent network after stack is created")
+
+        self.configuration_db.set_configuration_value(
+            'stack',
+            'agent_network',
+            value)
+
+    @property
+    def stack_id(self):
+        return self.configuration_db.get_configuration_value(
+            'stack',
+            'stack_id')
+
+    @stack_id.setter
+    def stack_id(self, value):
+        self.configuration_db.set_configuration_value(
+            'stack',
+            'stack_id',
+            value)
+
+    @property
+    def volume_quota(self):
+        self._attach_to_openstack()
+        quotas = self._cinder_client.quotas.get(self._tenant_name)
+        return int(quotas.volumes)
+
+    @property
+    def filename(self):
+        return self._test_executor.filename
+
+    @filename.setter
+    def filename(self, value):
+        self._test_executor.filename = value
+
+    @property
+    def precondition(self):
+        return self._test_executor.precondition
+
+    @precondition.setter
+    def precondition(self, value):
+        self._test_executor.precondition = value
+
+    @property
+    def warm_up(self):
+        return self._test_executor.warm
+
+    @warm_up.setter
+    def warm_up(self, value):
+        self._test_executor.warm = value
+
+    @property
+    def is_stack_created(self):
+        if (self.stack_id is not None):
+            self._attach_to_openstack()
+
+            stack = self._heat_client.stacks.get(self.stack_id)
+            status = getattr(stack, 'stack_status')
+
+            self.logger.info("Status=" + status)
+            if (status == u'CREATE_COMPLETE'):
+                return True
+
+        return False
+
+    @property
+    def workloads(self):
+        return self.configuration_db.get_configuration_value(
+            'workload',
+            'workloads')
+
+    @workloads.setter
+    def workloads(self, value):
+        self._test_executor.register_workloads(value)
+
+        self.configuration_db.set_configuration_value(
+            'workload',
+            'workloads',
+            str(self._test_executor.workload_modules))
+
+    def create_stack(self):
+        if (self.stack_id is not None):
+            raise ParameterError("ERROR: Stack has already been created")
+
+        self._attach_to_openstack()
+        if (self.agent_count > self.volume_quota):
+            message = "ERROR: Volume quota too low: " + \
+                str(self.agent_count) + " > " + str(self.volume_quota)
+            raise ParameterError(message)
+
+        stack = self._heat_client.stacks.create(
+            stack_name="StorPerfAgentGroup",
+            template=self._agent_group_hot,
+            files=self._hot_files,
+            parameters=self._make_parameters())
+
+        self.stack_id = stack['stack']['id']
+
+    def validate_stack(self):
+        self._attach_to_openstack()
+        if (self.agent_count > self.volume_quota):
+            message = "ERROR: Volume quota too low: " + \
+                str(self.agent_count) + " > " + str(self.volume_quota)
+            self.logger.error(message)
+            raise ParameterError(message)
+
+        self._heat_client.stacks.preview(
+            stack_name="StorPerfAgentGroup",
+            template=self._agent_group_hot,
+            files=self._hot_files,
+            parameters=self._make_parameters())
+        return True
+
+    def wait_for_stack_creation(self):
+
+        pass
+
+    def delete_stack(self):
+        if (self.stack_id is None):
+            raise ParameterError("ERROR: Stack does not exist")
+
+        self._attach_to_openstack()
+
+        self._heat_client.stacks.delete(stack_id=self.stack_id)
+        self.stack_id = None
+
+    def execute_workloads(self):
+
+        if (self.stack_id is None):
+            raise ParameterError("ERROR: Stack does not exist")
+
+        self._attach_to_openstack()
+
+        stack = self._heat_client.stacks.get(self.stack_id)
+        outputs = getattr(stack, 'outputs')
+        slaves = outputs[0]['output_value']
+
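+        # Prepare every slave in parallel and wait for all of them before
+        # starting the workloads.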
+        setup_threads = []
+
+        for slave in slaves:
+            t = Thread(target=self._setup_slave, args=(slave,))
+            setup_threads.append(t)
+            t.start()
+
+        for thread in setup_threads:
+            thread.join()
+
+        self._test_executor.slaves = slaves
+        return self._test_executor.execute()
+
+    def _setup_slave(self, slave):
+        logger = logging.getLogger(__name__ + ":" + slave)
+
+        logger.info("Initializing slave at " + slave)
+
+        # Copy the fio binary and its libaio dependency to the slave, then
+        # install libaio into the system library path. Each command uses
+        # the project's SSH key and the stock 'ubuntu' account.
+        commands = [
+            ['scp', '-o', 'StrictHostKeyChecking=no',
+             '-i', 'storperf/resources/ssh/storperf_rsa',
+             '/lib/x86_64-linux-gnu/libaio.so.1',
+             'ubuntu@' + slave + ":"],
+            ['scp', '-o', 'StrictHostKeyChecking=no',
+             '-i', 'storperf/resources/ssh/storperf_rsa',
+             '/usr/local/bin/fio',
+             'ubuntu@' + slave + ":"],
+            ['ssh', '-o', 'StrictHostKeyChecking=no',
+             '-i', 'storperf/resources/ssh/storperf_rsa',
+             'ubuntu@' + slave,
+             'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1'],
+        ]
+
+        for args in commands:
+            logger.debug(args)
+            proc = subprocess.Popen(args,
+                                    universal_newlines=True,
+                                    stdout=subprocess.PIPE,
+                                    stderr=subprocess.PIPE)
+
+            (stdout, stderr) = proc.communicate()
+            if (len(stdout) > 0):
+                logger.debug(stdout.decode('utf-8').strip())
+            if (len(stderr) > 0):
+                logger.error(stderr.decode('utf-8').strip())
+
+    def _make_parameters(self):
+        heat_parameters = {}
+        heat_parameters['agent_network'] = self.agent_network
+        heat_parameters['agent_count'] = self.agent_count
+        heat_parameters['volume_size'] = self.volume_size
+        return heat_parameters
+
+    def _attach_to_openstack(self):
+
+        if (self._cinder_client is None):
+            self._cinder_client = cinderclient.Client(
+                self._username, self._password, self._project_name,
+                self._auth_url, service_type='volumev2')
+            self._cinder_client.authenticate()
+
+        if (self._heat_client is None):
+            self._keystone_client = ksclient.Client(
+                auth_url=self._auth_url,
+                username=self._username,
+                password=self._password,
+                tenant_name=self._tenant_name)
+            heat_endpoint = self._keystone_client.service_catalog.url_for(
+                service_type='orchestration')
+            self._heat_client = heatclient.Client(
+                '1', endpoint=heat_endpoint,
+                token=self._keystone_client.auth_token)
 
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
+from os import listdir
+from os.path import isfile, join
+from storperf.carbon.converter import JSONToCarbon
+from storperf.carbon.emitter import CarbonMetricTransmitter
+from storperf.db.job_db import JobDB
+from storperf.fio.fio_invoker import FIOInvoker
+from threading import Thread
 import imp
 import logging
-from os import listdir
 import os
-from os.path import isfile, join
-import socket
-
-from carbon.converter import JSONToCarbon
-from carbon.emitter import CarbonMetricTransmitter
-from db.job_db import JobDB
-from fio.fio_invoker import FIOInvoker
 
 
 class UnknownWorkload(Exception):
-
-    def __init__(self, msg):
-        self.msg = msg
+    pass
 
 
 class TestExecutor(object):
     def __init__(self):
         self.logger = logging.getLogger(__name__)
         self.workload_modules = []
-        self.filename = "storperf.dat"
+        self.filename = None
         self.precondition = True
         self.warm = True
         self.event_listeners = set()
         self.metrics_emitter = CarbonMetricTransmitter()
         self.prefix = None
         self.job_db = JobDB()
+        self._slaves = []
+
+    @property
+    def slaves(self):
+        return self._slaves
+
+    @slaves.setter
+    def slaves(self, slaves):
+        self.logger.debug("Set slaves to: " + str(slaves))
+        self._slaves = slaves
 
     def register(self, event_listener):
         self.event_listeners.add(event_listener)
     def unregister(self, event_listener):
         self.event_listeners.discard(event_listener)
 
-    def event(self, metric):
+    def event(self, callback_id, metric):
         carbon_metrics = self.metrics_converter.convert_to_dictionary(
             metric,
-            self.prefix)
+            callback_id)
 
-        read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"]
-        write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"]
-        read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"]
-        write_iops = carbon_metrics[self.prefix + ".jobs.1.write.iops"]
+        read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"]
+        write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"]
+        read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"]
+        write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"]
 
         message = "Average Latency us Read/Write: " + read_latency \
             + "/" + write_latency + " IOPS r/w: " + \
             workloads = []
 
             for filename in workload_files:
-                mname, ext = os.path.splitext(filename)
+                mname, _ = os.path.splitext(filename)
                 if (not mname.startswith('_')):
                     workloads.append(mname)
+        else:
+            workloads = workloads.split(',')
 
         if (self.warm is True):
             workloads.insert(0, "_warm_up")
                                                       workload + ".py")
                 self.logger.debug("Found: " + str(workload_module))
                 if(workload_module is None):
-                    raise UnknownWorkload("Unknown workload: " + workload)
+                    raise UnknownWorkload(
+                        "ERROR: Unknown workload: " + workload)
                 self.workload_modules.append(workload_module)
             except ImportError, err:
-                raise UnknownWorkload(err)
+                raise UnknownWorkload("ERROR: " + str(err))
 
     def load_from_file(self, uri):
         uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri))
         path, fname = os.path.split(uri)
-        mname, ext = os.path.splitext(fname)
+        mname, _ = os.path.splitext(fname)
         no_ext = os.path.join(path, mname)
         self.logger.debug("Looking for: " + no_ext)
         if os.path.exists(no_ext + '.pyc'):
 
     def execute(self):
 
-        shortname = socket.getfqdn().split('.')[0]
+        self.job_db.create_job_id()
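+        # Fan out one worker thread per slave; the job id is returned
+        # immediately while the workloads run asynchronously.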
+        for slave in self.slaves:
+            t = Thread(target=self.execute_on_node, args=(slave,))
+            t.daemon = False
+            t.start()
+
+        return self.job_db.job_id
+
+    def execute_on_node(self, remote_host):
+
+        logger = logging.getLogger(__name__ + ":" + remote_host)
 
         invoker = FIOInvoker()
+        invoker.remote_host = remote_host
         invoker.register(self.event)
-        self.job_db.create_job_id()
-        self.logger.info("Starting job " + self.job_db.job_id)
+
+        logger.info(
+            "Starting job " + self.job_db.job_id + " on " + remote_host)
 
         for workload_module in self.workload_modules:
 
             workload_name = getattr(workload_module, "__name__")
             constructorMethod = getattr(workload_module, workload_name)
-            self.logger.debug(
+            logger.debug(
                 "Found workload: " + str(constructorMethod))
             workload = constructorMethod()
-            workload.filename = self.filename
+            if (self.filename is not None):
+                workload.filename = self.filename
             workload.invoker = invoker
 
             if (workload_name.startswith("_")):
                 for iodepth in iodepths:
 
                     full_workload_name = workload_name + \
+                        ".host." + remote_host + \
                         ".queue-depth." + str(iodepth) + \
                         ".block-size." + str(blocksize)
 
                     self.logger.info(
                         "Executing workload: " + full_workload_name)
 
-                    self.prefix = shortname + "." + self.job_db.job_id + \
+                    invoker.callback_id = self.job_db.job_id + \
                         "." + full_workload_name
 
                     self.job_db.start_workload(full_workload_name)
                     workload.execute()
                     self.job_db.end_workload(full_workload_name)
 
-        self.logger.info("Finished job " + self.job_db.job_id)
+        logger.info(
+            "Finished job " + self.job_db.job_id + " on " + remote_host)
 
     def fetch_results(self, job, workload_name=""):
         self.job_db.job_id = job
 
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
 
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-import unittest
-
-import json
+from storperf.carbon import converter
+from storperf.carbon.emitter import CarbonMetricTransmitter
+from time import sleep
 import SocketServer
+import json
 import threading
-from time import sleep
-
-from carbon import converter
-from carbon.emitter import CarbonMetricTransmitter
+import unittest
 
 
 class MetricsHandler(SocketServer.BaseRequestHandler):
 
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-import unittest
+from storperf.carbon.converter import JSONToCarbon
 import json
-
-from carbon.converter import JSONToCarbon
+import unittest
 
 
 class JSONToCarbonTest(unittest.TestCase):
 
--- /dev/null
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from storperf.db.configuration_db import ConfigurationDB
+import os
+import unittest
+
+
+class ConfigurationDBTest(unittest.TestCase):
+
+    def setUp(self):
+        ConfigurationDB.db_name = __name__ + ".db"
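+        # Remove any database left over from a previous run so each test
+        # starts against a clean file.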
+        try:
+            os.remove(ConfigurationDB.db_name)
+        except OSError:
+            pass
+
+        self.config_db = ConfigurationDB()
+
+    def test_create_key(self):
+        expected = "ABCDE-12345"
+
+        self.config_db.set_configuration_value(
+            "test", "key", expected)
+
+        actual = self.config_db.get_configuration_value(
+            "test", "key")
+
+        self.assertEqual(
+            expected, actual, "Did not expect: " + str(actual))
+
+    def test_update_key(self):
+        expected = "ABCDE-12345"
+
+        self.config_db.set_configuration_value(
+            "test", "key", "initial_value")
+
+        self.config_db.set_configuration_value(
+            "test", "key", expected)
+
+        actual = self.config_db.get_configuration_value(
+            "test", "key")
+
+        self.assertEqual(
+            expected, actual, "Did not expect: " + str(actual))
+
+    def test_deleted_key(self):
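+        # Reading back a deleted key should return None.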
+        expected = None
+
+        self.config_db.set_configuration_value(
+            "test", "key", "initial_value")
+
+        self.config_db.delete_configuration_value(
+            "test", "key")
+
+        actual = self.config_db.get_configuration_value(
+            "test", "key")
+
+        self.assertEqual(
+            expected, actual, "Did not expect: " + str(actual))
 
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
+from storperf.db.job_db import JobDB
+import os
+import sqlite3
 import unittest
 
 import mock
 
-from db.job_db import JobDB
-
 
 class JobDBTest(unittest.TestCase):
 
     def setUp(self):
-        JobDB.db_name = ":memory:"
+
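+        # These tests verify state through separate sqlite3 connections, so
+        # the DB must live on disk: a ":memory:" database is visible only to
+        # the connection that created it.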
+        JobDB.db_name = __name__ + ".db"
+        try:
+            os.remove(JobDB.db_name)
+        except OSError:
+            pass
         self.job = JobDB()
 
     @mock.patch("uuid.uuid4")
         mock_uuid.side_effect = (job_id,)
         workload_name = "Workload"
 
-        cursor = self.job.db.cursor()
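+        # Inspect the table through an independent connection to confirm the
+        # rows were actually committed.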
+        db = sqlite3.connect(JobDB.db_name)
+        cursor = db.cursor()
 
         row = cursor.execute(
             """select * from jobs
         self.job.start_workload(workload_name)
         self.job.end_workload(workload_name)
 
-        cursor = self.job.db.cursor()
+        db = sqlite3.connect(JobDB.db_name)
+        cursor = db.cursor()
         cursor.execute(
             """select job_id, workload, start, end from jobs
                        where job_id = ?
         mock_uuid.side_effect = (job_id,)
         workload_name = "Workload"
 
-        cursor = self.job.db.cursor()
+        db = sqlite3.connect(JobDB.db_name)
+        cursor = db.cursor()
 
         self.job.start_workload(workload_name)
         self.job.start_workload(workload_name)
 
         self.job.end_workload(workload_name)
 
-        cursor = self.job.db.cursor()
+        db = sqlite3.connect(JobDB.db_name)
+        cursor = db.cursor()
         cursor.execute(
             """select job_id, workload, start, end from jobs
                        where job_id = ?
 
--- /dev/null
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from storperf.db.configuration_db import ConfigurationDB
+from storperf.storperf_master import StorPerfMaster
+import os
+import unittest
+
+
+class StorPerfMasterTest(unittest.TestCase):
+
+    def setUp(self):
+        ConfigurationDB.db_name = __name__ + ".db"
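+        # Point the shared ConfigurationDB at a throwaway file and clear out
+        # any stale copy before constructing the master.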
+        try:
+            os.remove(ConfigurationDB.db_name)
+        except OSError:
+            pass
+        self.storperf = StorPerfMaster()
+
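+    # Each test round-trips one StorPerfMaster property and checks that the
+    # value is returned intact from the underlying configuration store.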
+    def test_agent_count(self):
+        expected = 10
+
+        self.storperf.agent_count = expected
+        actual = self.storperf.agent_count
+
+        self.assertEqual(
+            expected, actual, "Did not expect: " + str(actual))
+
+    def test_volume_size(self):
+        expected = 20
+
+        self.storperf.volume_size = expected
+        actual = self.storperf.volume_size
+
+        self.assertEqual(
+            expected, actual, "Did not expect: " + str(actual))
+
+    def test_agent_network(self):
+        expected = "ABCDEF"
+
+        self.storperf.agent_network = expected
+        actual = self.storperf.agent_network
+
+        self.assertEqual(
+            expected, actual, "Did not expect: " + str(actual))
 
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
 
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
 
 
 class _ssd_preconditioning(_base_workload._base_workload):
 
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
 
 
 class _warm_up(_base_workload._base_workload):
 
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
 
 
 class rr(_base_workload._base_workload):
 
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
 
 
 class rs(_base_workload._base_workload):
 
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
 
 
 class rw(_base_workload._base_workload):
 
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
 
 
 class wr(_base_workload._base_workload):
 
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
 
 
 class ws(_base_workload._base_workload):