--- /dev/null
+
+
+from threading import Thread, Event, Lock
+import datetime
+import time
+
+
+class GetterThread(Thread):
+ def __init__(self, view):
+ super(GetterThread, self).__init__()
+ self._view = view
+ self.event = Event()
+
+ def run(self):
+ try:
+ t0 = time.time()
+ val = self._view._get()
+ t1 = time.time()
+ except:
+ self._view.log.exception("Error while calling _get:")
+ # TODO: separate channel for passing back error
+ self._view.value = None
+ self._view.value_when = None
+ self._view.getter_thread = None
+ else:
+ with self._view.lock:
+ self._view.latency = t1 - t0
+ self._view.value = val
+ self._view.value_when = datetime.datetime.now()
+ self._view.getter_thread = None
+
+ self.event.set()
+
+
+class RemoteViewCache(object):
+ """
+ A cached view of something we have to fetch remotely, e.g. the output
+ of a 'tell' command to a daemon, or something we read from RADOS like
+ and rbd image's status.
+
+ There are two reasons this wrapper exists:
+ * To reduce load on remote entities when we frequently fetch something
+ (i.e. 10 page loads in one second should not translate into 10 separate
+ remote fetches)
+ * To handle the blocking nature of some remote operations
+ (i.e. if RADOS isn't responding, we need a predictable way to either
+ return no data, or return stale data, when something from RADOS is
+ requested)
+
+ Note that relying on timeouts in underlying libraries isn't wise here:
+ if something is really slow, we would like to return a 'stale' state
+ to a GUI quickly, but we should let our underlying request to the cluster
+ run to completion so that we get some data even if it's slow, and so that
+ a polling caller doesn't end up firing off a large number of requests to
+ the cluster because each one timed out.
+
+ Subclasses may override _init and must override _get
+ """
+
+ def __init__(self, module_inst):
+ self.initialized = False
+
+ self.log = module_inst.log
+
+ self.getter_thread = None
+
+ # Consider data within 1s old to be sufficiently fresh
+ self.stale_period = 1.0
+
+ # Return stale data if
+ self.timeout = 5
+
+ self.event = Event()
+ self.value_when = None
+ self.value = None
+ self.latency = 0
+ self.lock = Lock()
+
+ self._module = module_inst
+
+ def init(self):
+ self._init()
+ self.initialized = True
+
+ VALUE_OK = 0
+ VALUE_STALE = 1
+ VALUE_NONE = 2
+
+ def get(self):
+ """
+ If data less than `stale_period` old is available, return it
+ immediately.
+ If an attempt to fetch data does not complete within `timeout`, then
+ return the most recent data available, with a status to indicate that
+ it is stale.
+
+ Initialization does not count towards the timeout, so the first call
+ on one of these objects during the process lifetime may be slower
+ than subsequent calls.
+
+ :return: 2-tuple of value status code, value
+ """
+ with self.lock:
+ if not self.initialized:
+ self.init()
+
+ now = datetime.datetime.now()
+ if self.value_when and now - self.value_when < datetime.timedelta(
+ seconds=self.stale_period):
+ return self.VALUE_OK, self.value
+
+ if self.getter_thread is None:
+ self.getter_thread = GetterThread(self)
+ self.getter_thread.start()
+
+ ev = self.getter_thread.event
+
+ success = ev.wait(timeout=self.timeout)
+
+ with self.lock:
+ if success:
+ # We fetched the data within the timeout
+ return self.VALUE_OK, self.value
+ elif self.value_when is not None:
+ # We have some data, but it doesn't meet freshness requirements
+ return self.VALUE_STALE, self.value
+ else:
+ # We have no data, not even stale data
+ return self.VALUE_NONE, None
+
+ def _init(self):
+ pass
+
+ def _get(self):
+ raise NotImplementedError()