Data Reporting Gate 11/24911/2
authorMark Beierl <mark.beierl@dell.com>
Fri, 25 Nov 2016 18:28:40 +0000 (13:28 -0500)
committerMark Beierl <mark.beierl@dell.com>
Fri, 25 Nov 2016 18:40:34 +0000 (13:40 -0500)
Module that allows passing of a gate only once all other
peers have also reported in within a specified time period

Change-Id: If4baf1d4377026c7833a6f30bfc2e36698f675e8
JIRA: STORPERF-71
Signed-off-by: Mark Beierl <mark.beierl@dell.com>
storperf/utilities/thread_gate.py [new file with mode: 0644]
tests/utilities_tests/__init__.py
tests/utilities_tests/thread_gate_test.py [new file with mode: 0644]

diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py
new file mode 100644 (file)
index 0000000..b0dde50
--- /dev/null
@@ -0,0 +1,58 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+"""
+Creates a gate object that allows synchronization between an arbitrary
+number of callers.
+"""
+import logging
+import time
+
+
+class FailureToReportException(Exception):
+    pass
+
+
+class ThreadGate(object):
+
+    def __init__(self, size, timeout=60):
+        self.logger = logging.getLogger(__name__)
+        self._gate_size = size
+        self._timeout = timeout
+        self._registrants = {}
+        self._creation_time = time.time()
+
+    """
+    Calling this method returns a true or false, indicating that enough
+    of the other registrants have reported in
+    """
+
+    def report(self, gate_id):
+        now = time.time()
+        self._registrants[gate_id] = now
+        ready = True
+        self.logger.debug("Gate report for %s", gate_id)
+
+        total_missing = self._gate_size - len(self._registrants)
+        if total_missing > 0:
+            self.logger.debug("Not all registrants have reported in")
+            time_since_creation = now - self._creation_time
+            if (time_since_creation > (self._timeout * 2)):
+                self.logger.error("%s registrant(s) have never reported in",
+                                  total_missing)
+                raise FailureToReportException
+            return False
+
+        for k, v in self._registrants.items():
+            time_since_last_report = now - v
+            if time_since_last_report > self._timeout:
+                self.logger.debug("Registrant %s last reported %s ago",
+                                  k, time_since_last_report)
+                ready = False
+
+        return ready
index 73444b6..6218fe3 100644 (file)
@@ -6,3 +6,6 @@
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
+import logging
+
+logging.basicConfig(level=logging.DEBUG)
diff --git a/tests/utilities_tests/thread_gate_test.py b/tests/utilities_tests/thread_gate_test.py
new file mode 100644 (file)
index 0000000..de8b15a
--- /dev/null
@@ -0,0 +1,57 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import time
+import unittest
+
+from storperf.utilities.thread_gate import FailureToReportException
+from storperf.utilities.thread_gate import ThreadGate
+
+
+class ThreadGateTest(unittest.TestCase):
+
+    def setUp(self):
+        pass
+
+    def test_one_one_report(self):
+        gate = ThreadGate(1)
+        self.assertEqual(True, gate.report(1))
+
+    def test_two_one_report(self):
+        gate = ThreadGate(2)
+        self.assertEqual(False, gate.report(1))
+
+    def test_two_two_reports(self):
+        gate = ThreadGate(2)
+        self.assertEqual(False, gate.report(1))
+        self.assertEqual(True, gate.report(2))
+
+    def test_two_one_duplicate_reports(self):
+        gate = ThreadGate(2)
+        self.assertEqual(False, gate.report(1))
+        self.assertEqual(False, gate.report(1))
+        self.assertEqual(True, gate.report(2))
+
+    def test_two_old_old_report(self):
+        timeout = 5
+        gate = ThreadGate(2, timeout)
+        report_time = time.time() - (timeout * 2)
+        gate._registrants[2] = report_time
+        self.assertEqual(False, gate.report(1))
+
+    def test_two_never_report(self):
+        timeout = 5
+        gate = ThreadGate(2, timeout)
+        report_time = time.time() - (timeout * 3)
+        gate._creation_time = report_time
+        try:
+            gate.report(1)
+            self.fail()
+        except FailureToReportException:
+            pass