DB Access Mutex 49/10649/1 brahmaputra.1.0
authorMark Beierl <mark.beierl@emc.com>
Wed, 24 Feb 2016 22:04:14 +0000 (23:04 +0100)
committerMark Beierl <mark.beierl@emc.com>
Wed, 24 Feb 2016 22:06:45 +0000 (22:06 +0000)
Addition of a mutex to prevent mutiple, simultaneous DB accesses as SQLite is single threaded

Change-Id: Iad2cd94015f7fb604dc6b60636db939099f52757
JIRA: STORPERF-32
Signed-off-by: Mark Beierl <mark.beierl@emc.com>
(cherry picked from commit eed2ece373aede00f43e2655dca451266d4e9c37)

storperf/db/configuration_db.py
storperf/db/job_db.py

index 649c186..700588d 100644 (file)
@@ -8,74 +8,88 @@
 ##############################################################################
 
 from _sqlite3 import OperationalError
+from threading import Lock
 import logging
 import sqlite3
 
+db_mutex = Lock()
+
 
 class ConfigurationDB(object):
 
-    db_name = "StorPerf.db"
+    db_name = "StorPerfConfig.db"
 
     def __init__(self):
         """
-        Creates the StorPerf.db and configuration tables on demand
+        Creates the StorPerfConfig.db and configuration tables on demand
         """
 
         self.logger = logging.getLogger(__name__)
         self.logger.debug("Connecting to " + ConfigurationDB.db_name)
-        db = sqlite3.connect(ConfigurationDB.db_name)
-
-        cursor = db.cursor()
-        try:
-            cursor.execute('''CREATE TABLE configuration
-            (configuration_name text,
-            key text,
-            value text)''')
-            self.logger.debug("Created configuration table")
-        except OperationalError:
-            self.logger.debug("Configuration table exists")
-
-        cursor.execute('SELECT * FROM configuration')
+        with db_mutex:
+            db = sqlite3.connect(ConfigurationDB.db_name)
+
+            cursor = db.cursor()
+            try:
+                cursor.execute('''CREATE TABLE configuration
+                (configuration_name text,
+                key text,
+                value text)''')
+                self.logger.debug("Created configuration table")
+            except OperationalError:
+                self.logger.debug("Configuration table exists")
+
+            cursor.execute('SELECT * FROM configuration')
+            db.commit()
+            db.close()
 
     def delete_configuration_value(self, configuration_name, key):
         """Deletes the value associated with the given key
         """
 
-        db = sqlite3.connect(ConfigurationDB.db_name)
-        cursor = db.cursor()
+        with db_mutex:
+            db = sqlite3.connect(ConfigurationDB.db_name)
+            cursor = db.cursor()
 
-        cursor.execute(
-            "delete from configuration where configuration_name=? and key=?",
-            (configuration_name, key))
+            cursor.execute(
+                "delete from configuration where configuration_name=? and key=?",
+                (configuration_name, key))
 
-        self.logger.debug("Deleted " + configuration_name + ":" + key)
+            self.logger.debug("Deleted " + configuration_name + ":" + key)
 
-        db.commit()
+            db.commit()
+            db.close()
 
     def get_configuration_value(self, configuration_name, key):
         """Returns a string representation of the value stored
         with this key under the given configuration name.
         """
 
-        db = sqlite3.connect(ConfigurationDB.db_name)
-        cursor = db.cursor()
+        with db_mutex:
+            db = sqlite3.connect(ConfigurationDB.db_name)
+            cursor = db.cursor()
 
-        cursor.execute(
-            """select value from configuration
-                       where configuration_name = ?
-                       and key = ?""",
-            (configuration_name, key,))
+            cursor.execute(
+                """select value from configuration
+                           where configuration_name = ?
+                           and key = ?""",
+                (configuration_name, key,))
 
-        row = cursor.fetchone()
+            row = cursor.fetchone()
 
-        if (row is None):
-            self.logger.debug(
-                configuration_name + ":" + key + " does not exist")
-            return None
-        else:
-            self.logger.debug(
-                configuration_name + ":" + key + " is " + str(row[0]))
-            return str(row[0])
+            return_value = None
+
+            if (row is None):
+                self.logger.debug(
+                    configuration_name + ":" + key + " does not exist")
+            else:
+                self.logger.debug(
+                    configuration_name + ":" + key + " is " + str(row[0]))
+                return_value = str(row[0])
+
+            db.close()
+
+            return return_value
 
     def set_configuration_value(self, configuration_name, key, value):
         """Updates or creates the key under the given configuration
@@ -85,19 +99,22 @@ class ConfigurationDB(object):
         if (value is None):
             return self.delete_configuration_value(configuration_name, key)
 
-        value = str(value)
+        with db_mutex:
+            value = str(value)
 
-        db = sqlite3.connect(ConfigurationDB.db_name)
-        cursor = db.cursor()
+            db = sqlite3.connect(ConfigurationDB.db_name)
+            cursor = db.cursor()
 
-        cursor.execute(
-            "delete from configuration where configuration_name=? and key=?",
-            (configuration_name, key))
+            cursor.execute(
+                "delete from configuration where configuration_name=? and key=?",
+                (configuration_name, key))
 
-        cursor.execute(
-            """insert into configuration(configuration_name, key, value)
-             values (?,?,?)""", (configuration_name, key, value))
+            cursor.execute(
+                """insert into configuration(configuration_name, key, value)
+                 values (?,?,?)""", (configuration_name, key, value))
 
-        self.logger.debug(configuration_name + ":" + key + " set to " + value)
+            self.logger.debug(
+                configuration_name + ":" + key + " set to " + value)
 
-        db.commit()
+            db.commit()
+            db.close()
index bec8d3f..f24ccf4 100644 (file)
@@ -8,6 +8,7 @@
 ##############################################################################
 
 from _sqlite3 import OperationalError
+from threading import Lock
 import calendar
 import logging
 import sqlite3
@@ -17,140 +18,152 @@ import uuid
 import requests
 
 
+db_mutex = Lock()
+
+
 class JobDB(object):
 
-    db_name = "StorPerf.db"
+    db_name = "StorPerfJob.db"
 
     def __init__(self):
         """
-        Creates the StorPerf.db and jobs tables on demand
+        Creates the StorPerfJob.db and jobs tables on demand
         """
 
         self.logger = logging.getLogger(__name__)
         self.logger.debug("Connecting to " + JobDB.db_name)
         self.job_id = None
 
-        db = sqlite3.connect(JobDB.db_name)
-        cursor = db.cursor()
-        try:
-            cursor.execute('''CREATE TABLE jobs
-            (job_id text,
-            workload text,
-            start text,
-            end text)''')
-            self.logger.debug("Created job table")
-        except OperationalError:
-            self.logger.debug("Job table exists")
-
-        cursor.execute('SELECT * FROM jobs')
+        with db_mutex:
+            db = sqlite3.connect(JobDB.db_name)
+            cursor = db.cursor()
+            try:
+                cursor.execute('''CREATE TABLE jobs
+                (job_id text,
+                workload text,
+                start text,
+                end text)''')
+                self.logger.debug("Created job table")
+            except OperationalError:
+                self.logger.debug("Job table exists")
+
+            cursor.execute('SELECT * FROM jobs')
+            db.commit()
+            db.close()
 
     def create_job_id(self):
         """
         Returns a job id that is guaranteed to be unique in this
         StorPerf instance.
         """
-        db = sqlite3.connect(JobDB.db_name)
-        cursor = db.cursor()
+        with db_mutex:
+            db = sqlite3.connect(JobDB.db_name)
+            cursor = db.cursor()
 
-        self.job_id = str(uuid.uuid4())
-        row = cursor.execute(
-            "select * from jobs where job_id = ?", (self.job_id,))
-
-        while (row.fetchone() is not None):
-            self.logger.info("Duplicate job id found, regenerating")
             self.job_id = str(uuid.uuid4())
             row = cursor.execute(
                 "select * from jobs where job_id = ?", (self.job_id,))
 
-        cursor.execute(
-            "insert into jobs(job_id) values (?)", (self.job_id,))
-        self.logger.debug("Reserved job id " + self.job_id)
-        db.commit()
+            while (row.fetchone() is not None):
+                self.logger.info("Duplicate job id found, regenerating")
+                self.job_id = str(uuid.uuid4())
+                row = cursor.execute(
+                    "select * from jobs where job_id = ?", (self.job_id,))
+
+            cursor.execute(
+                "insert into jobs(job_id) values (?)", (self.job_id,))
+            self.logger.debug("Reserved job id " + self.job_id)
+            db.commit()
+            db.close()
 
     def start_workload(self, workload_name):
         """
         Records the start time for the given workload
         """
-        if (self.job_id is None):
-            self.create_job_id()
-
-        db = sqlite3.connect(JobDB.db_name)
-        cursor = db.cursor()
-
-        now = str(calendar.timegm(time.gmtime()))
+        with db_mutex:
+            if (self.job_id is None):
+                self.create_job_id()
 
-        row = cursor.execute(
-            """select * from jobs
-                       where job_id = ?
-                       and workload = ?""",
-            (self.job_id, workload_name,))
+            db = sqlite3.connect(JobDB.db_name)
+            cursor = db.cursor()
 
-        if (row.fetchone() is None):
-            cursor.execute(
-                """insert into jobs
-                           (job_id,
-                           workload,
-                           start)
-                           values (?, ?, ?)""",
-                (self.job_id,
-                 workload_name,
-                 now,))
-        else:
-            self.logger.warn("Duplicate start time for workload "
-                             + workload_name)
-            cursor.execute(
-                """update jobs set
-                           job_id = ?,
-                           start = ?
-                           where workload = ?""",
-                (self.job_id,
-                 now,
-                 workload_name,))
+            now = str(calendar.timegm(time.gmtime()))
 
-        db.commit()
+            row = cursor.execute(
+                """select * from jobs
+                           where job_id = ?
+                           and workload = ?""",
+                (self.job_id, workload_name,))
+
+            if (row.fetchone() is None):
+                cursor.execute(
+                    """insert into jobs
+                               (job_id,
+                               workload,
+                               start)
+                               values (?, ?, ?)""",
+                    (self.job_id,
+                     workload_name,
+                     now,))
+            else:
+                self.logger.warn("Duplicate start time for workload "
+                                 + workload_name)
+                cursor.execute(
+                    """update jobs set
+                               job_id = ?,
+                               start = ?
+                               where workload = ?""",
+                    (self.job_id,
+                     now,
+                     workload_name,))
+
+            db.commit()
+            db.close()
 
     def end_workload(self, workload_name):
         """
         Records the end time for the given workload
         """
-        if (self.job_id is None):
-            self.create_job_id()
-
-        db = sqlite3.connect(JobDB.db_name)
-        cursor = db.cursor()
-        now = str(calendar.timegm(time.gmtime()))
-
-        row = cursor.execute(
-            """select * from jobs
-                       where job_id = ?
-                       and workload = ?""",
-            (self.job_id, workload_name,))
-
-        if (row.fetchone() is None):
-            self.logger.warn("No start time recorded for workload "
-                             + workload_name)
-            cursor.execute(
-                """insert into jobs
-                           (job_id,
-                           workload,
-                           start,
-                           end)
-                           values (?, ?, ?, ?)""",
-                (self.job_id,
-                 workload_name,
-                 now,
-                 now))
-        else:
-            cursor.execute(
-                """update jobs set
-                           job_id = ?,
-                           end = ?
-                           where workload = ?""",
-                (self.job_id,
-                 now,
-                 workload_name,))
+        with db_mutex:
+            if (self.job_id is None):
+                self.create_job_id()
 
-        db.commit()
+            db = sqlite3.connect(JobDB.db_name)
+            cursor = db.cursor()
+            now = str(calendar.timegm(time.gmtime()))
+
+            row = cursor.execute(
+                """select * from jobs
+                           where job_id = ?
+                           and workload = ?""",
+                (self.job_id, workload_name,))
+
+            if (row.fetchone() is None):
+                self.logger.warn("No start time recorded for workload "
+                                 + workload_name)
+                cursor.execute(
+                    """insert into jobs
+                               (job_id,
+                               workload,
+                               start,
+                               end)
+                               values (?, ?, ?, ?)""",
+                    (self.job_id,
+                     workload_name,
+                     now,
+                     now))
+            else:
+                cursor.execute(
+                    """update jobs set
+                               job_id = ?,
+                               end = ?
+                               where workload = ?""",
+                    (self.job_id,
+                     now,
+                     workload_name,))
+
+            db.commit()
+            db.close()
 
     def fetch_results(self, workload_prefix=""):
         if (workload_prefix is None):
@@ -165,36 +178,39 @@ class JobDB(object):
 
         self.logger.debug("Workload like: " + workload_prefix)
 
-        db = sqlite3.connect(JobDB.db_name)
-        cursor = db.cursor()
-        cursor.execute("""select start, end, workload
-            from jobs where workload like ?""",
-                       (workload_prefix,))
+        with db_mutex:
 
-        while (True):
-            row = cursor.fetchone()
-            if (row is None):
-                break
+            db = sqlite3.connect(JobDB.db_name)
+            cursor = db.cursor()
+            cursor.execute("""select start, end, workload
+                from jobs where workload like ?""",
+                           (workload_prefix,))
 
-            start_time = str(row[0])
-            end_time = str(row[1])
-            workload = str(row[2])
+            while (True):
+                row = cursor.fetchone()
+                if (row is None):
+                    break
 
-            # for most of these stats, we just want the final one
-            # as that is cumulative average or whatever for the whole
-            # run
+                start_time = str(row[0])
+                end_time = str(row[1])
+                workload = str(row[2])
 
-            self.logger.info("workload=" + workload +
-                             "start=" + start_time + " end=" + end_time)
+                # for most of these stats, we just want the final one
+                # as that is cumulative average or whatever for the whole
+                # run
 
-            request = 'http://127.0.0.1:8000/render/?target=*.' + self.job_id + \
-                '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \
-                start_time + "&until=" + end_time
+                self.logger.info("workload=" + workload +
+                                 "start=" + start_time + " end=" + end_time)
 
-            response = requests.get(request)
+                request = 'http://127.0.0.1:8000/render/?target=*.' + self.job_id + \
+                    '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \
+                    start_time + "&until=" + end_time
 
-            if (response.status_code == 200):
-                data = response.json()
-                print data
-            else:
-                pass
+                response = requests.get(request)
+
+                if (response.status_code == 200):
+                    data = response.json()
+                    print data
+                else:
+                    pass
+            db.close()