initial code repo
[stor4nfv.git] / src / ceph / src / common / Throttle.h
diff --git a/src/ceph/src/common/Throttle.h b/src/ceph/src/common/Throttle.h
new file mode 100644 (file)
index 0000000..efc5ba0
--- /dev/null
@@ -0,0 +1,337 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_THROTTLE_H
+#define CEPH_THROTTLE_H
+
+#include <map>
+#include <list>
+#include <chrono>
+#include <atomic>
+#include <iostream>
+#include <condition_variable>
+#include <stdexcept>
+
+#include "Cond.h"
+#include "include/Context.h"
+
+class CephContext;
+class PerfCounters;
+
+/**
+ * @class Throttle
+ * Throttles the maximum number of active requests.
+ *
+ * This class defines the maximum number of slots currently taken away. The
+ * excessive requests for more of them are delayed, until some slots are put
+ * back, so @p get_current() drops below the limit after fulfills the requests.
+ */
+class Throttle {
+  CephContext *cct;
+  const std::string name;
+  PerfCounters *logger;
+  std::atomic<unsigned> count = { 0 }, max = { 0 };
+  Mutex lock;
+  list<Cond*> cond;
+  const bool use_perf;
+  bool shutting_down = false;
+  Cond shutdown_clear;
+
+public:
+  Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
+  ~Throttle();
+
+private:
+  void _reset_max(int64_t m);
+  bool _should_wait(int64_t c) const {
+    int64_t m = max;
+    int64_t cur = count;
+    return
+      m &&
+      ((c <= m && cur + c > m) || // normally stay under max
+       (c >= m && cur > m));     // except for large c
+  }
+
+  bool _wait(int64_t c);
+
+public:
+  /**
+   * gets the number of currently taken slots
+   * @returns the number of taken slots
+   */
+  int64_t get_current() const {
+    return count;
+  }
+
+  /**
+   * get the max number of slots
+   * @returns the max number of slots
+   */
+  int64_t get_max() const { return max; }
+
+  /**
+   * return true if past midpoint
+   */
+  bool past_midpoint() const {
+    return count >= max / 2;
+  }
+
+  /**
+   * set the new max number, and wait until the number of taken slots drains
+   * and drops below this limit.
+   *
+   * @param m the new max number
+   * @returns true if this method is blocked, false it it returns immediately
+   */
+  bool wait(int64_t m = 0);
+
+  /**
+   * take the specified number of slots from the stock regardless the throttling
+   * @param c number of slots to take
+   * @returns the total number of taken slots
+   */
+  int64_t take(int64_t c = 1);
+
+  /**
+   * get the specified amount of slots from the stock, but will wait if the
+   * total number taken by consumer would exceed the maximum number.
+   * @param c number of slots to get
+   * @param m new maximum number to set, ignored if it is 0
+   * @returns true if this request is blocked due to the throttling, false 
+   * otherwise
+   */
+  bool get(int64_t c = 1, int64_t m = 0);
+
+  /**
+   * the unblocked version of @p get()
+   * @returns true if it successfully got the requested amount,
+   * or false if it would block.
+   */
+  bool get_or_fail(int64_t c = 1);
+
+  /**
+   * put slots back to the stock
+   * @param c number of slots to return
+   * @returns number of requests being hold after this
+   */
+  int64_t put(int64_t c = 1);
+   /**
+   * reset the zero to the stock
+   */
+  void reset();
+
+  bool should_wait(int64_t c) const {
+    return _should_wait(c);
+  }
+  void reset_max(int64_t m) {
+    Mutex::Locker l(lock);
+    _reset_max(m);
+  }
+};
+
+/**
+ * BackoffThrottle
+ *
+ * Creates a throttle which gradually induces delays when get() is called
+ * based on params low_threshhold, high_threshhold, expected_throughput,
+ * high_multiple, and max_multiple.
+ *
+ * In [0, low_threshhold), we want no delay.
+ *
+ * In [low_threshhold, high_threshhold), delays should be injected based
+ * on a line from 0 at low_threshhold to
+ * high_multiple * (1/expected_throughput) at high_threshhold.
+ *
+ * In [high_threshhold, 1), we want delays injected based on a line from
+ * (high_multiple * (1/expected_throughput)) at high_threshhold to
+ * (high_multiple * (1/expected_throughput)) +
+ * (max_multiple * (1/expected_throughput)) at 1.
+ *
+ * Let the current throttle ratio (current/max) be r, low_threshhold be l,
+ * high_threshhold be h, high_delay (high_multiple / expected_throughput) be e,
+ * and max_delay (max_muliple / expected_throughput) be m.
+ *
+ * delay = 0, r \in [0, l)
+ * delay = (r - l) * (e / (h - l)), r \in [l, h)
+ * delay = e + (r - h)((m - e)/(1 - h))
+ */
+class BackoffThrottle {
+  CephContext *cct;
+  const std::string name;
+  PerfCounters *logger;
+
+  std::mutex lock;
+  using locker = std::unique_lock<std::mutex>;
+
+  unsigned next_cond = 0;
+
+  /// allocated once to avoid constantly allocating new ones
+  vector<std::condition_variable> conds;
+
+  const bool use_perf;
+
+  /// pointers into conds
+  list<std::condition_variable*> waiters;
+
+  std::list<std::condition_variable*>::iterator _push_waiter() {
+    unsigned next = next_cond++;
+    if (next_cond == conds.size())
+      next_cond = 0;
+    return waiters.insert(waiters.end(), &(conds[next]));
+  }
+
+  void _kick_waiters() {
+    if (!waiters.empty())
+      waiters.front()->notify_all();
+  }
+
+  /// see above, values are in [0, 1].
+  double low_threshhold = 0;
+  double high_threshhold = 1;
+
+  /// see above, values are in seconds
+  double high_delay_per_count = 0;
+  double max_delay_per_count = 0;
+
+  /// Filled in in set_params
+  double s0 = 0; ///< e / (h - l), l != h, 0 otherwise
+  double s1 = 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise
+
+  /// max
+  uint64_t max = 0;
+  uint64_t current = 0;
+
+  std::chrono::duration<double> _get_delay(uint64_t c) const;
+
+public:
+  /**
+   * set_params
+   *
+   * Sets params.  If the params are invalid, returns false
+   * and populates errstream (if non-null) with a user compreshensible
+   * explanation.
+   */
+  bool set_params(
+    double low_threshhold,
+    double high_threshhold,
+    double expected_throughput,
+    double high_multiple,
+    double max_multiple,
+    uint64_t throttle_max,
+    ostream *errstream);
+
+  std::chrono::duration<double> get(uint64_t c = 1);
+  std::chrono::duration<double> wait() {
+    return get(0);
+  }
+  uint64_t put(uint64_t c = 1);
+  uint64_t take(uint64_t c = 1);
+  uint64_t get_current();
+  uint64_t get_max();
+
+  BackoffThrottle(CephContext *cct, const std::string& n,
+    unsigned expected_concurrency, ///< [in] determines size of conds
+    bool _use_perf = true);
+  ~BackoffThrottle();
+};
+
+
+/**
+ * @class SimpleThrottle
+ * This is a simple way to bound the number of concurrent operations.
+ *
+ * It tracks the first error encountered, and makes it available
+ * when all requests are complete. wait_for_ret() should be called
+ * before the instance is destroyed.
+ *
+ * Re-using the same instance isn't safe if you want to check each set
+ * of operations for errors, since the return value is not reset.
+ */
+class SimpleThrottle {
+public:
+  SimpleThrottle(uint64_t max, bool ignore_enoent);
+  ~SimpleThrottle();
+  void start_op();
+  void end_op(int r);
+  bool pending_error() const;
+  int wait_for_ret();
+private:
+  mutable Mutex m_lock;
+  Cond m_cond;
+  uint64_t m_max;
+  uint64_t m_current;
+  int m_ret;
+  bool m_ignore_enoent;
+  uint32_t waiters = 0;
+};
+
+
+class OrderedThrottle;
+
+class C_OrderedThrottle : public Context {
+public:
+  C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
+    : m_ordered_throttle(ordered_throttle), m_tid(tid) {
+  }
+
+protected:
+  void finish(int r) override;
+
+private:
+  OrderedThrottle *m_ordered_throttle;
+  uint64_t m_tid;
+};
+
+/**
+ * @class OrderedThrottle
+ * Throttles the maximum number of active requests and completes them in order
+ *
+ * Operations can complete out-of-order but their associated Context callback
+ * will completed in-order during invokation of start_op() and wait_for_ret()
+ */
+class OrderedThrottle {
+public:
+  OrderedThrottle(uint64_t max, bool ignore_enoent);
+  ~OrderedThrottle();
+
+  C_OrderedThrottle *start_op(Context *on_finish);
+  void end_op(int r);
+
+  bool pending_error() const;
+  int wait_for_ret();
+
+protected:
+  friend class C_OrderedThrottle;
+
+  void finish_op(uint64_t tid, int r);
+
+private:
+  struct Result {
+    bool finished;
+    int ret_val;
+    Context *on_finish;
+
+    Result(Context *_on_finish = NULL)
+      : finished(false), ret_val(0), on_finish(_on_finish) {
+    }
+  };
+
+  typedef std::map<uint64_t, Result> TidResult;
+
+  mutable Mutex m_lock;
+  Cond m_cond;
+  uint64_t m_max;
+  uint64_t m_current;
+  int m_ret_val;
+  bool m_ignore_enoent;
+
+  uint64_t m_next_tid;
+  uint64_t m_complete_tid;
+
+  TidResult m_tid_result;
+
+  void complete_pending_ops();
+  uint32_t waiters = 0;
+};
+
+#endif