X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcommon%2FThrottle.h;fp=src%2Fceph%2Fsrc%2Fcommon%2FThrottle.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=efc5ba037ae80e0b918641ff0702979118a684c6;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/common/Throttle.h b/src/ceph/src/common/Throttle.h deleted file mode 100644 index efc5ba0..0000000 --- a/src/ceph/src/common/Throttle.h +++ /dev/null @@ -1,337 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef CEPH_THROTTLE_H -#define CEPH_THROTTLE_H - -#include -#include -#include -#include -#include -#include -#include - -#include "Cond.h" -#include "include/Context.h" - -class CephContext; -class PerfCounters; - -/** - * @class Throttle - * Throttles the maximum number of active requests. - * - * This class defines the maximum number of slots currently taken away. The - * excessive requests for more of them are delayed, until some slots are put - * back, so @p get_current() drops below the limit after fulfills the requests. - */ -class Throttle { - CephContext *cct; - const std::string name; - PerfCounters *logger; - std::atomic count = { 0 }, max = { 0 }; - Mutex lock; - list cond; - const bool use_perf; - bool shutting_down = false; - Cond shutdown_clear; - -public: - Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true); - ~Throttle(); - -private: - void _reset_max(int64_t m); - bool _should_wait(int64_t c) const { - int64_t m = max; - int64_t cur = count; - return - m && - ((c <= m && cur + c > m) || // normally stay under max - (c >= m && cur > m)); // except for large c - } - - bool _wait(int64_t c); - -public: - /** - * gets the number of currently taken slots - * @returns the number of taken slots - */ - int64_t get_current() const { - return count; - } - - /** - * get the max number of slots - * @returns the max number of slots - */ - int64_t get_max() const { return max; } - - /** - * return true if past midpoint - */ - bool past_midpoint() const { - return count >= max / 2; - } - - /** - * set the new max number, and wait until the number of taken slots drains - * and drops below this limit. - * - * @param m the new max number - * @returns true if this method is blocked, false it it returns immediately - */ - bool wait(int64_t m = 0); - - /** - * take the specified number of slots from the stock regardless the throttling - * @param c number of slots to take - * @returns the total number of taken slots - */ - int64_t take(int64_t c = 1); - - /** - * get the specified amount of slots from the stock, but will wait if the - * total number taken by consumer would exceed the maximum number. - * @param c number of slots to get - * @param m new maximum number to set, ignored if it is 0 - * @returns true if this request is blocked due to the throttling, false - * otherwise - */ - bool get(int64_t c = 1, int64_t m = 0); - - /** - * the unblocked version of @p get() - * @returns true if it successfully got the requested amount, - * or false if it would block. - */ - bool get_or_fail(int64_t c = 1); - - /** - * put slots back to the stock - * @param c number of slots to return - * @returns number of requests being hold after this - */ - int64_t put(int64_t c = 1); - /** - * reset the zero to the stock - */ - void reset(); - - bool should_wait(int64_t c) const { - return _should_wait(c); - } - void reset_max(int64_t m) { - Mutex::Locker l(lock); - _reset_max(m); - } -}; - -/** - * BackoffThrottle - * - * Creates a throttle which gradually induces delays when get() is called - * based on params low_threshhold, high_threshhold, expected_throughput, - * high_multiple, and max_multiple. - * - * In [0, low_threshhold), we want no delay. - * - * In [low_threshhold, high_threshhold), delays should be injected based - * on a line from 0 at low_threshhold to - * high_multiple * (1/expected_throughput) at high_threshhold. - * - * In [high_threshhold, 1), we want delays injected based on a line from - * (high_multiple * (1/expected_throughput)) at high_threshhold to - * (high_multiple * (1/expected_throughput)) + - * (max_multiple * (1/expected_throughput)) at 1. - * - * Let the current throttle ratio (current/max) be r, low_threshhold be l, - * high_threshhold be h, high_delay (high_multiple / expected_throughput) be e, - * and max_delay (max_muliple / expected_throughput) be m. - * - * delay = 0, r \in [0, l) - * delay = (r - l) * (e / (h - l)), r \in [l, h) - * delay = e + (r - h)((m - e)/(1 - h)) - */ -class BackoffThrottle { - CephContext *cct; - const std::string name; - PerfCounters *logger; - - std::mutex lock; - using locker = std::unique_lock; - - unsigned next_cond = 0; - - /// allocated once to avoid constantly allocating new ones - vector conds; - - const bool use_perf; - - /// pointers into conds - list waiters; - - std::list::iterator _push_waiter() { - unsigned next = next_cond++; - if (next_cond == conds.size()) - next_cond = 0; - return waiters.insert(waiters.end(), &(conds[next])); - } - - void _kick_waiters() { - if (!waiters.empty()) - waiters.front()->notify_all(); - } - - /// see above, values are in [0, 1]. - double low_threshhold = 0; - double high_threshhold = 1; - - /// see above, values are in seconds - double high_delay_per_count = 0; - double max_delay_per_count = 0; - - /// Filled in in set_params - double s0 = 0; ///< e / (h - l), l != h, 0 otherwise - double s1 = 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise - - /// max - uint64_t max = 0; - uint64_t current = 0; - - std::chrono::duration _get_delay(uint64_t c) const; - -public: - /** - * set_params - * - * Sets params. If the params are invalid, returns false - * and populates errstream (if non-null) with a user compreshensible - * explanation. - */ - bool set_params( - double low_threshhold, - double high_threshhold, - double expected_throughput, - double high_multiple, - double max_multiple, - uint64_t throttle_max, - ostream *errstream); - - std::chrono::duration get(uint64_t c = 1); - std::chrono::duration wait() { - return get(0); - } - uint64_t put(uint64_t c = 1); - uint64_t take(uint64_t c = 1); - uint64_t get_current(); - uint64_t get_max(); - - BackoffThrottle(CephContext *cct, const std::string& n, - unsigned expected_concurrency, ///< [in] determines size of conds - bool _use_perf = true); - ~BackoffThrottle(); -}; - - -/** - * @class SimpleThrottle - * This is a simple way to bound the number of concurrent operations. - * - * It tracks the first error encountered, and makes it available - * when all requests are complete. wait_for_ret() should be called - * before the instance is destroyed. - * - * Re-using the same instance isn't safe if you want to check each set - * of operations for errors, since the return value is not reset. - */ -class SimpleThrottle { -public: - SimpleThrottle(uint64_t max, bool ignore_enoent); - ~SimpleThrottle(); - void start_op(); - void end_op(int r); - bool pending_error() const; - int wait_for_ret(); -private: - mutable Mutex m_lock; - Cond m_cond; - uint64_t m_max; - uint64_t m_current; - int m_ret; - bool m_ignore_enoent; - uint32_t waiters = 0; -}; - - -class OrderedThrottle; - -class C_OrderedThrottle : public Context { -public: - C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid) - : m_ordered_throttle(ordered_throttle), m_tid(tid) { - } - -protected: - void finish(int r) override; - -private: - OrderedThrottle *m_ordered_throttle; - uint64_t m_tid; -}; - -/** - * @class OrderedThrottle - * Throttles the maximum number of active requests and completes them in order - * - * Operations can complete out-of-order but their associated Context callback - * will completed in-order during invokation of start_op() and wait_for_ret() - */ -class OrderedThrottle { -public: - OrderedThrottle(uint64_t max, bool ignore_enoent); - ~OrderedThrottle(); - - C_OrderedThrottle *start_op(Context *on_finish); - void end_op(int r); - - bool pending_error() const; - int wait_for_ret(); - -protected: - friend class C_OrderedThrottle; - - void finish_op(uint64_t tid, int r); - -private: - struct Result { - bool finished; - int ret_val; - Context *on_finish; - - Result(Context *_on_finish = NULL) - : finished(false), ret_val(0), on_finish(_on_finish) { - } - }; - - typedef std::map TidResult; - - mutable Mutex m_lock; - Cond m_cond; - uint64_t m_max; - uint64_t m_current; - int m_ret_val; - bool m_ignore_enoent; - - uint64_t m_next_tid; - uint64_t m_complete_tid; - - TidResult m_tid_result; - - void complete_pending_ops(); - uint32_t waiters = 0; -}; - -#endif