args:
                 ARCH: ${ARCH}
         ports:
-            - "8000:8000"
+            - "8001:8000"
         env_file: ${ENV_FILE}
         volumes:
             - ${CARBON_DIR}:/opt/graphite/storage/whisper
             - ./storperf-master/:/storperf
+        links:
+            - storperf-graphite
 
     storperf-reporting:
         container_name: "storperf-reporting"
             args:
                 ARCH: ${ARCH}
 
+    storperf-graphite:
+        container_name: "storperf-graphite"
+        build: storperf-graphite
+        ports:
+            - "8080:8080"
+
     storperf-httpfrontend:
         container_name: "storperf-httpfrontend"
         build:
         links:
             - storperf-master
             - storperf-reporting
-            - storperf-swaggerui
\ No newline at end of file
+            - storperf-swaggerui
+            - storperf-graphite
 
--- /dev/null
+##############################################################################
+# Copyright (c) 2017 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+#   Docker container for StorPerf Graphite
+#
+# Build:
+#    $ docker build -t opnfv/storperf-graphite:tag .
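+#
+# Run (a sketch; docker-compose normally starts this container and the
+# HTTP frontend proxies /graphite/ to it):
+#    $ docker run -p 8080:8080 opnfv/storperf-graphite:tag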
+##
+
+# From https://github.com/SchweizerischeBundesbahnen/docker-graphite
+
+ARG ARCH=x86_64
+ARG ALPINE_VERSION=v3.5
+FROM multiarch/alpine:$ARCH-$ALPINE_VERSION
+
+# Install base packages and Python tooling
+RUN apk add --no-cache \
+  bash \
+  ca-certificates \
+  nginx \
+  openssl \
+  py2-pip \
+  supervisor \
+  tini \
+  && pip install \
+  supervisor-stdout \
+  gunicorn
+
+# Install graphite
+ENV GRAPHITE_ROOT /opt/graphite
+
+RUN apk add --no-cache \
+  alpine-sdk \
+  fontconfig \
+  libffi \
+  libffi-dev \
+  python-dev \
+  py-cairo \
+  && export PYTHONPATH="/opt/graphite/lib/:/opt/graphite/webapp/" \
+  && pip install https://github.com/graphite-project/whisper/tarball/master \
+  && pip install https://github.com/graphite-project/carbon/tarball/master \
+  && pip install https://github.com/graphite-project/graphite-web/tarball/master \
+  && apk del \
+  alpine-sdk \
+  python-dev \
+  libffi-dev
+
+EXPOSE 8080
+EXPOSE 2003
+EXPOSE 2004
+EXPOSE 7002
+
+VOLUME ["/opt/graphite/conf", "/opt/graphite/storage"]
+
+COPY run.sh /run.sh
+COPY etc/ /etc/
+COPY conf/ /opt/graphite/conf.example/
+
+# Enable tiny init
+ENTRYPOINT ["/sbin/tini", "--"]
+CMD ["/bin/bash", "/run.sh"]
 
 [cache]
-LOCAL_DATA_DIR = /var/lib/graphite/storage/whisper/
+LOCAL_DATA_DIR = /opt/graphite/storage/whisper/
 
 # Specify the user to drop privileges to
 # If this is blank carbon runs as the user that invokes it
 # This user must have write access to the local data directory
-USER = 
+USER =
 
 # Limit the size of the cache to avoid swapping or becoming CPU bound.
 # Sorting and serving cache queries get more expensive as the cache grows.
 LINE_RECEIVER_INTERFACE = 0.0.0.0
 LINE_RECEIVER_PORT = 2003
 
-ENABLE_UDP_LISTENER = True
-UDP_RECEIVER_INTERFACE = 0.0.0.0
-UDP_RECEIVER_PORT = 2003
-
 PICKLE_RECEIVER_INTERFACE = 0.0.0.0
 PICKLE_RECEIVER_PORT = 2004
 
 CACHE_QUERY_INTERFACE = 0.0.0.0
 CACHE_QUERY_PORT = 7002
 
+# By default, carbon-cache will log every whisper update and cache hit. This
+# can be excessive and degrade performance if the logs are stored on the same
+# volume as the whisper data.
 LOG_UPDATES = False
+LOG_CACHE_HITS = False
+ENABLE_LOGROTATION = True
+LOG_LISTENER_CONNECTIONS = False
 
 # Enable AMQP if you want to receive metrics using an AMQP broker
 # ENABLE_AMQP = False
 # NOTE: you cannot run both a cache and a relay on the same server
 # with the default configuration; you have to specify distinct
 # interfaces and ports for the listeners.
-
-[relay]
-LINE_RECEIVER_INTERFACE = 0.0.0.0
-LINE_RECEIVER_PORT = 2003
-
-PICKLE_RECEIVER_INTERFACE = 0.0.0.0
-PICKLE_RECEIVER_PORT = 2004
-
-CACHE_SERVERS = server1, server2, server3
-MAX_QUEUE_SIZE = 10000
 
--- /dev/null
+# flake8: noqa
+# Edit this file to override the default graphite settings; do not edit settings.py
+
+# Turn on debugging and restart the webapp if you ever see an "Internal Server Error" page
+# DEBUG = True
+
+# Set your local timezone (django will try to figure this out automatically)
+TIME_ZONE = 'Europe/Zurich'
+
+# Secret key for django
+SECRET_KEY = '%%SECRET_KEY%%'
+
+URL_PREFIX = "/graphite/"
 
--- /dev/null
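+# Whisper aggregation rules: the first pattern that matches a metric
+# name wins. xFilesFactor is the fraction of datapoints in an interval
+# that must be non-null for the rollup to be kept; a metric ending in
+# .count therefore aggregates by sum and tolerates sparse data.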
+[min]
+pattern = \.lower$
+xFilesFactor = 0.1
+aggregationMethod = min
+
+[max]
+pattern = \.upper(_\d+)?$
+xFilesFactor = 0.1
+aggregationMethod = max
+
+[sum]
+pattern = \.sum$
+xFilesFactor = 0
+aggregationMethod = sum
+
+[count]
+pattern = \.count$
+xFilesFactor = 0
+aggregationMethod = sum
+
+[count_legacy]
+pattern = ^stats_counts.*
+xFilesFactor = 0
+aggregationMethod = sum
+
+[default_average]
+pattern = .*
+xFilesFactor = 0.3
+aggregationMethod = average
 
--- /dev/null
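+# Retention format is <precision>:<duration>; 60s:14d keeps one
+# datapoint per minute for 14 days for every metric.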
+[default]
+pattern = .*
+retentions = 60s:14d
 
--- /dev/null
+server {
+  listen      8080;
+  server_name graphite;
+  charset     utf-8;
+  # Django admin media.
+  location /graphite/static/admin/ {
+    alias /usr/lib/python2.7/site-packages/django/contrib/admin/static/admin/;
+  }
+
+  # Your project's static media.
+  location /graphite/static/ {
+    alias /opt/graphite/webapp/content/;
+  }
+
+  # Finally, send all non-media requests to the Django server.
+  location / {
+    proxy_pass                 http://127.0.0.1:8000;
+    proxy_set_header           X-Real-IP   $remote_addr;
+    proxy_set_header           X-Forwarded-For  $proxy_add_x_forwarded_for;
+    proxy_set_header           X-Forwarded-Proto  $scheme;
+    proxy_set_header           X-Forwarded-Server  $host;
+    proxy_set_header           X-Forwarded-Host  $host;
+    proxy_set_header           Host  $host;
+
+    client_max_body_size       10m;
+    client_body_buffer_size    128k;
+
+    proxy_connect_timeout      90;
+    proxy_send_timeout         90;
+    proxy_read_timeout         90;
+
+    proxy_buffer_size          4k;
+    proxy_buffers              4 32k;
+    proxy_busy_buffers_size    64k;
+    proxy_temp_file_write_size 64k;
+  }
+}
 
--- /dev/null
+worker_processes  1;
+pid /var/run/nginx.pid;
+daemon off;
+
+events {
+  worker_connections 1024;
+  use epoll;
+}
+
+http {
+  include       mime.types;
+  default_type  application/octet-stream;
+
+  sendfile        on;
+  keepalive_timeout  65;
+
+  gzip  on;
+
+  include /etc/nginx/conf.d/*;
+}
 
--- /dev/null
+[program:carbon-cache]
+autostart = true
+autorestart = true
+stdout_events_enabled = true
+stderr_events_enabled = true
+stdout_logfile_maxbytes = 1MB
+stdout_logfile_backups = 0
+stderr_logfile_maxbytes = 1MB
+stderr_logfile_backups = 0
+
+command = /opt/graphite/bin/carbon-cache.py --pidfile /var/run/carbon-cache-a.pid --debug start
 
--- /dev/null
+[program:graphite-webapp]
+autostart = true
+autorestart = true
+stdout_events_enabled = true
+stderr_events_enabled = true
+stdout_logfile_maxbytes = 1MB
+stdout_logfile_backups = 0
+stderr_logfile_maxbytes = 1MB
+stderr_logfile_backups = 0
+
+directory = /opt/graphite/webapp
+environment = PYTHONPATH='/opt/graphite/webapp'
+command = /usr/bin/gunicorn -b127.0.0.1:8000 -w2 graphite.wsgi
 
--- /dev/null
+[program:nginx]
+autostart = true
+autorestart = true
+stdout_events_enabled = true
+stderr_events_enabled = true
+stdout_logfile_maxbytes = 1MB
+stdout_logfile_backups = 0
+stderr_logfile_maxbytes = 1MB
+stderr_logfile_backups = 0
+
+command = /usr/sbin/nginx -c /etc/nginx/nginx.conf
 
--- /dev/null
+[unix_http_server]
+file=/run/supervisord.sock
+
+[supervisord]
+user = root
+nodaemon = true
+logfile_maxbytes = 10MB
+logfile_backups = 0
+pidfile = /tmp/supervisord.pid
+logfile = /tmp/supervisord.log
+environment = GRAPHITE_STORAGE_DIR='/opt/graphite/storage',GRAPHITE_CONF_DIR='/opt/graphite/conf'
+
+[rpcinterface:supervisor]
+supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
+
+[supervisorctl]
+serverurl=unix:///run/supervisord.sock
+
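+# Forward each program's output to supervisord's own stdout (via the
+# supervisor-stdout package) so it shows up in `docker logs`.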
+[eventlistener:stdout]
+command = supervisor_stdout
+buffer_size = 100
+events = PROCESS_LOG
+result_handler = supervisor_stdout:event_handler
+
+[include]
+files = /etc/supervisor.d/*.ini
 
--- /dev/null
+#!/bin/bash
+
+whisper_dir="/opt/graphite/storage/whisper/"
+webapp_dir="/opt/graphite/storage/log/webapp/"
+
+cd /opt/graphite
+
+if [ -d "$whisper_dir" ]; then
+    echo "Whisper directory already exists"
+else
+    echo "...creating missing whisper dir"
+    mkdir -p "$whisper_dir"
+fi
+
+if [ -d "$webapp_dir" ]; then
+    echo "Webapp directory already exists"
+else
+    echo "...creating missing webapp dir"
+    mkdir -p "$webapp_dir"
+fi
+
+if [ ! -f /opt/graphite/conf/local_settings.py ]; then
+  echo "Creating default config for graphite-web..."
+  cp /opt/graphite/conf.example/local_settings.py /opt/graphite/conf/local_settings.py
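+  # Generate a random 100-character Django SECRET_KEY and substitute it
+  # for the %%SECRET_KEY%% placeholder in the copied config.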
+  RANDOM_STRING=$(python -c 'import random; import string; print "".join([random.SystemRandom().choice(string.digits + string.letters) for i in range(100)])')
+  sed "s/%%SECRET_KEY%%/${RANDOM_STRING}/" -i /opt/graphite/conf/local_settings.py
+fi
+
+if [ ! -L /opt/graphite/webapp/graphite/local_settings.py ]; then
+  echo "Creating symbolic link for local_settings.py in graphite-web..."
+  ln -s /opt/graphite/conf/local_settings.py /opt/graphite/webapp/graphite/local_settings.py
+fi
+
+sed "s/%%CLUSTER_SERVERS%%/${CLUSTER_SERVERS}/" -i /opt/graphite/conf/local_settings.py
+
+if [ ! -f /opt/graphite/conf/carbon.conf ]; then
+  echo "Creating default config for carbon..."
+  cp /opt/graphite/conf.example/carbon.conf /opt/graphite/conf/carbon.conf
+fi
+
+if [ ! -f /opt/graphite/conf/storage-schemas.conf ]; then
+  echo "Creating default storage schema for carbon..."
+  cp /opt/graphite/conf.example/storage-schemas.conf /opt/graphite/conf/storage-schemas.conf
+fi
+
+if [ ! -f /opt/graphite/conf/storage-aggregation.conf ]; then
+  echo "Creating default storage schema for carbon..."
+  cp /opt/graphite/conf.example/storage-aggregation.conf /opt/graphite/conf/storage-aggregation.conf
+fi
+
+if [ ! -f /opt/graphite/storage/graphite.db ]; then
+  echo "Creating database..."
+  PYTHONPATH=$GRAPHITE_ROOT/webapp django-admin.py migrate --settings=graphite.settings --run-syncdb --noinput
+  chown nginx:nginx /opt/graphite/storage/graphite.db
+  # Automagically create a Django superuser with a default login
+  script="from django.contrib.auth.models import User;
+
+username = 'admin';
+password = 'admin';
+email = 'admin@example.com';
+
+if User.objects.filter(username=username).count()==0:
+    User.objects.create_superuser(username, email, password);
+    print('Superuser created.');
+else:
+    print('Superuser creation skipped.');
+
+"
+  printf "$script" | PYTHONPATH=$GRAPHITE_ROOT/webapp django-admin.py shell --settings=graphite.settings
+fi
+
+exec supervisord -c /etc/supervisord.conf
 
         }
 
         location /graphite/ {
-            proxy_pass http://storperf-master:8000;
+            proxy_pass http://storperf-graphite:8080;
             proxy_set_header Host $host:$proxy_port;
         }
 
 
 
 EXPOSE 5000
 
-# Install Graphite
-# Everything from here down will be removed once Graphite/Carbon gets broken
-# out into its own container.
-
-RUN apk --no-cache add --update \
-    python \
-    py-pip \
-    python-dev \
-    alpine-sdk \
-    py-tz \
-    nginx \
-    cairo \
-    supervisor
-
-RUN deluser xfs
-
-RUN pip install \
-    gunicorn==17.5 \
-    Django==1.6.11 \
-    django-tagging==0.3.1 \
-    cairocffi \
-    constants \
-    zope.interface
-
-RUN adduser -S -g www-data -u 33 www-data
-
-RUN pip install whisper==0.9.15
-RUN pip install --install-option="--prefix=/var/lib/graphite" --install-option="--install-lib=/var/lib/graphite/lib" carbon==0.9.15
-RUN pip install --install-option="--prefix=/var/lib/graphite" --install-option="--install-lib=/var/lib/graphite/webapp" graphite-web==0.9.15
-
-ADD graphite/nginx.conf /etc/nginx/nginx.conf
-ADD graphite/local_settings.py /var/lib/graphite/webapp/graphite/local_settings.py
-ADD graphite/carbon.conf /var/lib/graphite/conf/carbon.conf
-ADD graphite/storage-schemas.conf /var/lib/graphite/conf/storage-schemas.conf
-RUN mkdir -p /opt/graphite/storage
-RUN ln -s /var/lib/graphite/storage/whisper /opt/graphite/storage/whisper
-RUN touch /var/lib/graphite/storage/graphite.db /var/lib/graphite/storage/index
-RUN chown -R www-data /var/lib/graphite/storage
-RUN chmod 0775 /var/lib/graphite/storage /var/lib/graphite/storage/whisper
-RUN chmod 0664 /var/lib/graphite/storage/graphite.db
-
-RUN cd /var/lib/graphite/webapp/graphite && python manage.py syncdb --noinput
-ADD graphite/initial_data.json /var/lib/graphite/webapp/graphite/initial_data.json
-RUN cd /var/lib/graphite/webapp/graphite && python manage.py syncdb --noinput
-
-RUN mkdir -p /var/log/supervisor
-
-COPY ./supervisord.conf /etc/supervisor/conf.d/supervisord.conf
-
-EXPOSE 8000
-
 # Entry point
-
-CMD ["/usr/bin/supervisord"]
+CMD [ "python", "./rest_server.py" ]
 
+++ /dev/null
-[
-  {
-    "pk": 1,
-    "model": "auth.user",
-    "fields": {
-      "username": "admin",
-      "first_name": "",
-      "last_name": "",
-      "is_active": true,
-      "is_superuser": true,
-      "is_staff": true,
-      "last_login": "2011-09-20 17:02:14",
-      "groups": [],
-      "user_permissions": [],
-      "password": "sha1$1b11b$edeb0a67a9622f1f2cfeabf9188a711f5ac7d236",
-      "email": "root@example.com",
-      "date_joined": "2011-09-20 17:02:14"
-    }
-  }
-]
 
+++ /dev/null
-TIME_ZONE = 'UTC'
 
+++ /dev/null
-daemon off;
-user www-data;
-worker_processes 1;
-pid /var/run/nginx.pid;
-
-events {
-  worker_connections 1024;
-}
-
-http {
-  sendfile on;
-  tcp_nopush on;
-  tcp_nodelay on;
-  keepalive_timeout 65;
-  types_hash_max_size 2048;
-  server_tokens off;
-
-  server_names_hash_bucket_size 32;
-
-  include /etc/nginx/mime.types;
-  default_type application/octet-stream;
-
-  access_log /var/log/nginx/access.log;
-  error_log /var/log/nginx/error.log;
-
-  gzip on;
-  gzip_disable "msie6";
-
-  server {
-    listen 8000 default_server;
-    server_name _;
-
-    open_log_file_cache max=1000 inactive=20s min_uses=2 valid=1m;
-
-    location / {
-        proxy_pass                 http://127.0.0.1:8080;
-        proxy_set_header           X-Real-IP   $remote_addr;
-        proxy_set_header           X-Forwarded-For  $proxy_add_x_forwarded_for;
-        proxy_set_header           X-Forwarded-Proto  $scheme;
-        proxy_set_header           X-Forwarded-Server  $host;
-        proxy_set_header           X-Forwarded-Host  $http_host;
-        proxy_set_header           Host  $http_host;
-
-        client_max_body_size       10m;
-        client_body_buffer_size    128k;
-
-        proxy_connect_timeout      90;
-        proxy_send_timeout         90;
-        proxy_read_timeout         90;
-
-        proxy_buffer_size          4k;
-        proxy_buffers              4 32k;
-        proxy_busy_buffers_size    64k;
-        proxy_temp_file_write_size 64k;
-    }
-
-    add_header Access-Control-Allow-Origin "*";
-    add_header Access-Control-Allow-Methods "GET, OPTIONS";
-    add_header Access-Control-Allow-Headers "origin, authorization, accept";
-
-    location /content {
-      alias /var/lib/graphite/webapp/content;
-    }
-
-    location /media {
-      alias /usr/share/pyshared/django/contrib/admin/media;
-    }
-  }
-}
 
+++ /dev/null
-[carbon]
-pattern = ^carbon\..*
-retentions = 1m:31d,10m:1y,1h:5y
-
-[default]
-pattern = .*
-retentions = 10s:8d,1m:31d,10m:1y,1h:5y
 
 import sys
 
 from flask import abort, Flask, request, jsonify
+from flask_cors import CORS
 from flask_restful import Resource, Api, fields
 from flask_restful_swagger import swagger
 
-from flask_cors import CORS
 from storperf.storperf_master import StorPerfMaster
 
+
 app = Flask(__name__, static_url_path="")
 CORS(app)
 api = swagger.docs(Api(app), apiVersion='1.0')
 
 import socket
 import time
 
+from storperf.db.graphite_db import GraphiteDB
+
 
 class CarbonMetricTransmitter():
 
-    carbon_host = '127.0.0.1'
-    carbon_port = 2003
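+    # Every endpoint listed here receives each metrics batch, so both a
+    # local carbon and the storperf-graphite container can be fed.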
+    carbon_servers = [('127.0.0.1', 2003),
+                      ('storperf-graphite', 2003)]
 
     def __init__(self):
         self.logger = logging.getLogger(__name__)
+        self.graphite_db = GraphiteDB()
+        self.commit_markers = {}
 
-    def transmit_metrics(self, metrics):
-        if 'timestamp' in metrics:
-            metrics.pop('timestamp')
+    def transmit_metrics(self, metrics, commit_marker):
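+        # Record when this batch was sent; confirm_commit() later checks
+        # that graphite stored a matching sentinel for this marker.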
         timestamp = str(calendar.timegm(time.gmtime()))
+        self.commit_markers[commit_marker] = int(timestamp)
+
+        carbon_socket = None
+
+        for host, port in self.carbon_servers:
+            try:
+                carbon_socket = socket.socket(socket.AF_INET,
+                                              socket.SOCK_STREAM)
+                carbon_socket.connect((host, port))
+
+                for key, value in metrics.items():
+                    try:
+                        float(value)
+                        message = "%s %s %s\n" \
+                            % (key, value, timestamp)
+                        self.logger.debug("Metric: " + message.strip())
+                        carbon_socket.send(message)
+                    except ValueError:
+                        self.logger.debug("Ignoring non numeric metric %s %s"
+                                          % (key, value))
+
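+                # Write a sentinel datapoint whose value is its own
+                # timestamp; once it can be read back through the render
+                # API, carbon has committed this batch to storage.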
+                message = "%s.commit-marker %s %s\n" \
+                    % (commit_marker, timestamp, timestamp)
+                carbon_socket.send(message)
+                self.logger.debug("Marker %s" % message.strip())
+                self.logger.info("Sent metrics to %s:%s with timestamp %s"
+                                 % (host, port, timestamp))
+
+            except Exception as e:
+                self.logger.error("While notifying carbon %s:%s %s"
+                                  % (host, port, e))
+
+            if carbon_socket is not None:
+                carbon_socket.close()
+
+    def confirm_commit(self, commit_marker):
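+        # Fetch the sentinel written by transmit_metrics(); the commit
+        # is confirmed once a datapoint carrying the recorded timestamp
+        # appears in graphite.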
+        marker_timestamp = self.commit_markers[commit_marker]
+        request = "%s.commit-marker&from=%s" \
+            % (commit_marker, marker_timestamp - 60)
+        marker_data = self.graphite_db.fetch_item(request)
+        self.logger.debug("Marker data %s" % marker_data)
+        fetched_timestamps = self.parse_timestamp(marker_data)
 
-        carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        carbon_socket.connect((self.carbon_host, self.carbon_port))
+        return marker_timestamp in fetched_timestamps
 
-        for key, metric in metrics.items():
-            message = key + " " + metric + " " + timestamp
-            self.logger.debug("Metric: " + message)
-            carbon_socket.send(message + '\n')
+    def parse_timestamp(self, marker_data):
+        timestamps = []
+        if (type(marker_data) is list and
+                len(marker_data) > 0):
+            datapoints = marker_data[0]['datapoints']
+            for datapoint in datapoints:
+                try:
+                    timestamps.append(int(datapoint[0]))
+                except Exception:
+                    pass
 
-        carbon_socket.close()
-        self.logger.info("Sent metrics to carbon with timestamp %s"
-                         % timestamp)
+        return timestamps
 
 
 import json
 import logging
-
 import requests
 
-from storperf.db.job_db import JobDB
-
 
 class GraphiteDB(object):
 
+    graphite_host = "storperf-graphite"
+    graphite_port = 8080
+
     def __init__(self):
-        """
-        """
-        self._job_db = JobDB()
         self.logger = logging.getLogger(__name__)
 
+    def fetch_item(self, target):
+
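+        # Hit graphite-web's render API and return the decoded JSON
+        # result, or None on a non-200 response.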
+        result = None
+        request = ("http://%s:%s/graphite/render/?format=json&target=%s"
+                   % (self.graphite_host, self.graphite_port, target))
+        self.logger.debug("Calling %s" % (request))
+
+        response = requests.get(request)
+        if (response.status_code == 200):
+            result = json.loads(response.content)
+
+        return result
+
     def fetch_series(self, workload, metric, io_type, time, duration):
 
         series = []
         end = time
         start = end - duration
 
-        request = ("http://127.0.0.1:8000/render/?target="
+        request = ("http://%s:%s/graphite/render/?target="
                    "averageSeries(%s.*.jobs.1.%s.%s)"
                    "&format=json"
                    "&from=%s"
-                   "&until=%s" %
-                   (workload, io_type, metric,
-                    start, end))
+                   "&until=%s"
+                   % (self.graphite_host, self.graphite_port,
+                      workload, io_type, metric,
+                      start, end))
         self.logger.debug("Calling %s" % (request))
 
         response = requests.get(request)
 
 import logging
 import os
 import socket
-from storperf.db.configuration_db import ConfigurationDB
-from storperf.db.job_db import JobDB
-from storperf.test_executor import TestExecutor
 from threading import Thread
 from time import sleep
 
 from cinderclient import client as cinderclient
-import heatclient.client as heatclient
 from keystoneauth1 import loading
 from keystoneauth1 import session
 import paramiko
 from scp import SCPClient
 
+import heatclient.client as heatclient
+from storperf.db.configuration_db import ConfigurationDB
+from storperf.db.job_db import JobDB
+from storperf.test_executor import TestExecutor
+
 
 class ParameterError(Exception):
     """ """
             str(self._test_executor.workload_modules))
 
     def get_logs(self, lines=None):
-        LOG_DIR = '/var/log/supervisor/storperf-webapp.log'
+        LOG_DIR = './storperf.log'
 
         if isinstance(lines, int):
             logs = []
 
 import os
 from os.path import isfile, join
 import sched
+from threading import Thread
+from time import sleep
+import time
+
 from storperf.carbon.converter import Converter
 from storperf.carbon.emitter import CarbonMetricTransmitter
 from storperf.db.job_db import JobDB
 from storperf.fio.fio_invoker import FIOInvoker
 from storperf.utilities.data_handler import DataHandler
 from storperf.utilities.thread_gate import ThreadGate
-from threading import Thread
-import time
 
 
 class UnknownWorkload(Exception):
             metric,
             callback_id)
 
-        self.metrics_emitter.transmit_metrics(carbon_metrics)
+        self.metrics_emitter.transmit_metrics(carbon_metrics, callback_id)
+
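+        # Give carbon up to 10 seconds to commit this batch so the data
+        # handler only reads fully persisted samples.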
+        commit_count = 10
+        while (commit_count > 0 and
+               not self.metrics_emitter.confirm_commit(callback_id)):
+            self.logger.info("Waiting 1 more second for commit")
+            sleep(1)
+            commit_count -= 1
 
         if self._thread_gate.report(callback_id):
             self.broadcast_event()
                     if self._terminated:
                         return
                     self.current_workload = (
-                        "%s.%s.queue-depth.%s.block-size.%s" %
-                        (self.job_db.job_id,
-                         workload_name,
-                         iodepth,
-                         blocksize))
+                        "%s.%s.queue-depth.%s.block-size.%s"
+                        % (self.job_db.job_id,
+                           workload_name,
+                           iodepth,
+                           blocksize))
 
                     self.logger.info("Starting run %s" % self.current_workload)
                     self.workload_status[self.current_workload] = "Running"
                     if not scheduler.empty():
                         try:
                             scheduler.cancel(event)
-                        except:
+                        except ValueError:
                             pass
 
-                    self.logger.info("Completed run %s" %
-                                     self.current_workload)
+                    self.logger.info("Completed run %s"
+                                     % self.current_workload)
                     self.workload_status[self.current_workload] = "Completed"
                     self._workload_executors = []
                     self.current_workload = None
 
 
 import logging
 import os
+import time
+
 from storperf.db import test_results_db
 from storperf.db.graphite_db import GraphiteDB
 from storperf.db.job_db import JobDB
 from storperf.utilities import dictionary
 from storperf.utilities import math as math
 from storperf.utilities import steady_state as SteadyState
-from time import sleep
-import time
 
 
 class DataHandler(object):
         # A bit of a hack here as Carbon might not be finished storing the
         # data we just sent to it
         now = int(time.time())
-        backtime = 60 * (executor.steady_state_samples + 2)
+        backtime = 60 * (executor.steady_state_samples + 1)
         data_series = graphite_db.fetch_series(workload,
                                                metric,
                                                io_type,
                                                now,
                                                backtime)
-        most_recent_time = now
-        if len(data_series) > 0:
-            most_recent_time = data_series[-1][0]
-
-        delta = now - most_recent_time
-        self.logger.debug("Last update to graphite was %s ago" % delta)
-
-        while (delta < 5 or (delta > 60 and delta < 120)):
-            sleep(5)
-            data_series = graphite_db.fetch_series(workload,
-                                                   metric,
-                                                   io_type,
-                                                   now,
-                                                   backtime)
-            if len(data_series) > 0:
-                most_recent_time = data_series[-1][0]
-            delta = time.time() - most_recent_time
-            self.logger.debug("Last update to graphite was %s ago" % delta)
-
         return data_series
 
     def _convert_timestamps_to_samples(self, executor, series):
                                                               build_tag,
                                                               payload)
                 executor.result_url = response['href']
-            except:
-                self.logger.exception("Error pushing results into Database")
+            except Exception as e:
+                self.logger.exception(
+                    "Error pushing results into Database: %s", e)
 
 
     @property
     def fullname(self):
-        return ("%s.%s.queue-depth.%s.block-size.%s.%s" %
-                (str(self.id),
-                 self.__class__.__name__,
-                 str(self.options['iodepth']),
-                 str(self.options['bs']),
-                 str(self.remote_host).replace(".", "-")))
+        return ("%s.%s.queue-depth.%s.block-size.%s.%s"
+                % (str(self.id),
+                   self.__class__.__name__,
+                   str(self.options['iodepth']),
+                   str(self.options['bs']),
+                   str(self.remote_host).replace(".", "-")))
 
+++ /dev/null
-[supervisord]
-nodaemon = true
-environment = GRAPHITE_STORAGE_DIR='/var/lib/graphite/storage',GRAPHITE_CONF_DIR='/var/lib/graphite/conf'
-
-[program:nginx]
-command = /usr/sbin/nginx
-stdout_logfile = /var/log/supervisor/%(program_name)s.log
-stderr_logfile = /var/log/supervisor/%(program_name)s.log
-autorestart = true
-
-[program:carbon-cache]
-user = www-data
-command = /var/lib/graphite/bin/carbon-cache.py --debug start
-stdout_logfile = /var/log/supervisor/%(program_name)s.log
-stderr_logfile = /var/log/supervisor/%(program_name)s.log
-autorestart = true
-
-[program:graphite-webapp]
-user = www-data
-directory = /var/lib/graphite/webapp
-environment = PYTHONPATH='/var/lib/graphite/webapp'
-command = /usr/bin/gunicorn_django -b127.0.0.1:8080 -w2 graphite/settings.py
-stdout_logfile = /var/log/supervisor/%(program_name)s.log
-stderr_logfile = /var/log/supervisor/%(program_name)s.log
-autorestart = true
-
-[program:storperf-webapp]
-user = root
-directory = /storperf/
-command = /usr/bin/python rest_server.py
-stdout_logfile = /var/log/supervisor/%(program_name)s.log
-stderr_logfile = /var/log/supervisor/%(program_name)s.log
-autorestart = true
 
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-import SocketServer
 import json
-from storperf.carbon import converter
-from storperf.carbon.emitter import CarbonMetricTransmitter
-import threading
-from time import sleep, strptime
+from time import strptime
 import unittest
 
 import mock
 
+from storperf.carbon import converter
+from storperf.carbon.emitter import CarbonMetricTransmitter
+
+
+addresses = []
+data = []
+connect_exception = []
+send_exception = []
+
+
+class MockSocket(object):
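+    # Stand-in for socket.socket: records connect() targets and sent
+    # payloads in the module-level lists above, and raises any queued
+    # exception to simulate connect or send failures.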
 
-class MetricsHandler(SocketServer.BaseRequestHandler):
+    def __init__(self, *args):
+        pass
 
-    def handle(self):
-        # Echo the back to the client
-        CarbonMetricTransmitterTest.response = self.request.recv(1024)
-        return
+    def connect(self, address):
+        if len(connect_exception) != 0:
+            raise connect_exception[0]
+        addresses.append(address)
 
+    def send(self, datum):
+        if len(send_exception) != 0:
+            raise send_exception[0]
+        data.append(datum)
 
-class MetricsServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
-    pass
+    def close(self):
+        pass
 
 
 class CarbonMetricTransmitterTest(unittest.TestCase):
-    response = None
+    listen_port = 0
 
     def setUp(self):
+        del addresses[:]
+        del data[:]
+        del connect_exception[:]
+        del send_exception[:]
+
+    @mock.patch("socket.socket")
+    @mock.patch("time.gmtime")
+    def test_transmit_metrics(self, mock_time, mock_socket):
+
+        mock_socket.side_effect = MockSocket
+
+        mock_time.return_value = strptime("30 Nov 00", "%d %b %y")
+
+        testconv = converter.Converter()
+        json_object = json.loads(
+            """{"timestamp" : "975542400", "key":123.0 }""")
+        result = testconv.convert_json_to_flat(json_object, "host.run-name")
 
-        address = ('localhost', 0)
-        server = MetricsServer(address, MetricsHandler)
-        ip, self.listen_port = server.server_address
+        emitter = CarbonMetricTransmitter()
+        emitter.carbon_port = self.listen_port
+        emitter.transmit_metrics(result, None)
 
-        t = threading.Thread(target=server.serve_forever)
-        t.setDaemon(True)
-        t.start()
+        self.assertEqual("host.run-name.key 123.0 975542400\n",
+                         data[1],
+                         data[1])
 
+    @mock.patch("socket.socket")
     @mock.patch("time.gmtime")
-    def test_transmit_metrics(self, mock_time):
+    def test_skip_non_numeric_metrics(self, mock_time, mock_socket):
+
+        mock_socket.side_effect = MockSocket
 
         mock_time.return_value = strptime("30 Nov 00", "%d %b %y")
 
 
         emitter = CarbonMetricTransmitter()
         emitter.carbon_port = self.listen_port
-        emitter.transmit_metrics(result)
+        emitter.transmit_metrics(result, None)
+
+        self.assertEqual("None.commit-marker 975542400 975542400\n",
+                         data[1],
+                         data[1])
+
+    @mock.patch("socket.socket")
+    def test_connect_fails(self, mock_socket):
+
+        mock_socket.side_effect = MockSocket
+        connect_exception.append(Exception("Mock connection error"))
+
+        testconv = converter.Converter()
+        json_object = json.loads(
+            """{"timestamp" : "975542400", "key":"value" }""")
+        result = testconv.convert_json_to_flat(json_object, "host.run-name")
+
+        emitter = CarbonMetricTransmitter()
+        emitter.carbon_port = self.listen_port
+        emitter.transmit_metrics(result, None)
+
+        self.assertEqual(0,
+                         len(data),
+                         len(data))
 
-        count = 0
+    @mock.patch("socket.socket")
+    def test_send_fails(self, mock_socket):
 
-        while (CarbonMetricTransmitterTest.response is None and count < 10):
-            count += 1
-            sleep(0.1)
+        mock_socket.side_effect = MockSocket
+        send_exception.append(Exception("Mock send error"))
+
+        testconv = converter.Converter()
+        json_object = json.loads(
+            """{"timestamp" : "975542400", "key":"value" }""")
+        result = testconv.convert_json_to_flat(json_object, "host.run-name")
+
+        emitter = CarbonMetricTransmitter()
+        emitter.carbon_port = self.listen_port
+        emitter.transmit_metrics(result, None)
+
+        self.assertEqual(0,
+                         len(data),
+                         len(data))
+
+    @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item")
+    def test_confirm_commit(self, mock_graphite_db):
+        graphite_return = json.loads("""[
+          {"target":
+           "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker",
+           "datapoints": [[1503078366.0, 1503078370]]}]
+           """)
+        mock_graphite_db.return_value = graphite_return
+
+        commit_marker = "commit-marker"
+
+        emitter = CarbonMetricTransmitter()
+        emitter.commit_markers[commit_marker] = 1503078366
+
+        committed = emitter.confirm_commit(commit_marker)
+        self.assertTrue(committed)
+
+    @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item")
+    def test_confirm_multiple_commits(self, mock_graphite_db):
+        graphite_return = json.loads("""[
+          {"target":
+           "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker",
+           "datapoints": [
+             [1503078300.0, 1503078350],
+             [1503078366.0, 1503078360]]}]
+           """)
+        mock_graphite_db.return_value = graphite_return
+
+        commit_marker = "commit-marker"
+
+        emitter = CarbonMetricTransmitter()
+        emitter.commit_markers[commit_marker] = 1503078366
+
+        committed = emitter.confirm_commit(commit_marker)
+        self.assertTrue(committed)
+
+    @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item")
+    def test_empty_commit(self, mock_graphite_db):
+        graphite_return = json.loads("[]")
+        mock_graphite_db.return_value = graphite_return
+
+        commit_marker = "commit-marker"
+
+        emitter = CarbonMetricTransmitter()
+        emitter.commit_markers[commit_marker] = 1503078366
+
+        committed = emitter.confirm_commit(commit_marker)
+        self.assertFalse(committed)
+
+    @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item")
+    def test_badtimestamp_commit(self, mock_graphite_db):
+        graphite_return = json.loads("""[
+          {"target":
+           "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker",
+           "datapoints": [[1234, 1503078370]]}]
+           """)
+        mock_graphite_db.return_value = graphite_return
+
+        commit_marker = "commit-marker"
+
+        emitter = CarbonMetricTransmitter()
+        emitter.commit_markers[commit_marker] = 1503078366
+
+        committed = emitter.confirm_commit(commit_marker)
+        self.assertFalse(committed)
+
+    def test_timestamp_parse(self):
+        emitter = CarbonMetricTransmitter()
+        result = json.loads("""[
+          {"target":
+           "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker",
+           "datapoints": [[1503078366.0, 1503078370]]}]
+           """)
+        timestamps = emitter.parse_timestamp(result)
+        self.assertEqual(1503078366, timestamps[0], timestamps[0])
 
-        self.assertEqual("host.run-name.key value 975542400\n",
-                         CarbonMetricTransmitterTest.response,
-                         CarbonMetricTransmitterTest.response)
 
 if __name__ == '__main__':
     unittest.main()
 
 ##############################################################################
 
 import os
-from storperf.utilities.data_handler import DataHandler
 import unittest
 
 import mock
 
+from storperf.utilities.data_handler import DataHandler
+
 
 class MockGraphiteDB(object):
 
         mock_graphite_db.return_value = expected
         mock_time.return_value = expected[-1][0] + 10
 
-        self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
-                                 ("job_id",
-                                  "rw",
-                                  8,
-                                  8192))
+        self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s"
+                                 % ("job_id",
+                                    "rw",
+                                    8,
+                                    8192))
 
         actual = self.data_handler._lookup_prior_data(self, 'read', 'iops')
         self.assertEqual(expected, actual)
     @mock.patch("time.time")
     @mock.patch("storperf.db.test_results_db.push_results_to_db")
     @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
-    @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads")
-    def test_non_terminated_report(self, mock_job_db, mock_graphite_db,
+    @mock.patch("storperf.db.job_db.JobDB.fetch_workloads")
+    def test_non_terminated_report(self, mock_job_db,
+                                   mock_graphite_db,
                                    mock_results_db, mock_time):
         self._terminated = False
         mock_results_db.side_effect = self.push_results_to_db
         expected_range = 17.78
         expected_average = 212.49777777777774
 
-        self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
-                                 ("job_id",
-                                  "rw",
-                                  8,
-                                  8192))
-
+        self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s"
+                                 % ("job_id",
+                                    "rw",
+                                    8,
+                                    8192))
         mock_job_db.return_value = [[self.current_workload, 4804559000, None]]
 
         self.data_handler.data_event(self)
     @mock.patch("time.time")
     @mock.patch("storperf.db.test_results_db.push_results_to_db")
     @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
-    @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads")
+    @mock.patch("storperf.db.job_db.JobDB.fetch_workloads")
     def test_report_that_causes_termination(self,
                                             mock_job_db,
                                             mock_graphite_db,
         expected_range = 17.78
         expected_average = 209.2135
 
-        self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
-                                 ("job_id",
-                                  "rw",
-                                  8,
-                                  8192))
-
+        self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s"
+                                 % ("job_id",
+                                    "rw",
+                                    8,
+                                    8192))
         mock_job_db.return_value = [[self.current_workload, 4804559000, None]]
 
         self.data_handler.data_event(self)