1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_THROTTLE_H
5 #define CEPH_THROTTLE_H
12 #include <condition_variable>
16 #include "include/Context.h"
23 * Throttles the maximum number of active requests.
25 * This class defines the maximum number of slots that may be taken at once.
26 * Requests for more slots than that are delayed until enough slots are put
27 * back, so that @p get_current() drops below the limit after the requests are fulfilled.
31 const std::string name;
33 std::atomic<unsigned> count = { 0 }, max = { 0 };
37 bool shutting_down = false;
41 Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
45 void _reset_max(int64_t m);
46 bool _should_wait(int64_t c) const {
51 ((c <= m && cur + c > m) || // normally stay under max
52 (c >= m && cur > m)); // except for large c
55 bool _wait(int64_t c);
59 * gets the number of currently taken slots
60 * @returns the number of taken slots
62 int64_t get_current() const {
67 * get the max number of slots
68 * @returns the max number of slots
70 int64_t get_max() const { return max; } // implicit atomic load of max (std::atomic<unsigned>), widened to int64_t
73 * return true if past midpoint
75 bool past_midpoint() const {
76 return count >= max / 2;
80 * set the new max number, and wait until the number of taken slots drains
81 * and drops below this limit.
83 * @param m the new max number
84 * @returns true if this method is blocked, false if it returns immediately
86 bool wait(int64_t m = 0);
89 * take the specified number of slots from the stock regardless of the throttling
90 * @param c number of slots to take
91 * @returns the total number of taken slots
93 int64_t take(int64_t c = 1);
96 * get the specified amount of slots from the stock, but will wait if the
97 * total number taken by consumer would exceed the maximum number.
98 * @param c number of slots to get
99 * @param m new maximum number to set, ignored if it is 0
100 * @returns true if this request is blocked due to the throttling, false
103 bool get(int64_t c = 1, int64_t m = 0);
106 * the non-blocking version of @p get()
107 * @returns true if it successfully got the requested amount,
108 * or false if it would block.
110 bool get_or_fail(int64_t c = 1);
113 * put slots back to the stock
114 * @param c number of slots to return
115 * @returns number of requests being held after this
117 int64_t put(int64_t c = 1);
119 * reset the zero to the stock
123 bool should_wait(int64_t c) const {
124 return _should_wait(c);
126 void reset_max(int64_t m) {
127 Mutex::Locker l(lock);
135 * Creates a throttle which gradually induces delays when get() is called
136 * based on params low_threshhold, high_threshhold, expected_throughput,
137 * high_multiple, and max_multiple.
139 * In [0, low_threshhold), we want no delay.
141 * In [low_threshhold, high_threshhold), delays should be injected based
142 * on a line from 0 at low_threshhold to
143 * high_multiple * (1/expected_throughput) at high_threshhold.
145 * In [high_threshhold, 1), we want delays injected based on a line from
146 * (high_multiple * (1/expected_throughput)) at high_threshhold to
147 * (high_multiple * (1/expected_throughput)) +
148 * (max_multiple * (1/expected_throughput)) at 1.
150 * Let the current throttle ratio (current/max) be r, low_threshhold be l,
151 * high_threshhold be h, high_delay (high_multiple / expected_throughput) be e,
152 * and max_delay (max_multiple / expected_throughput) be m.
154 * delay = 0, r \in [0, l)
155 * delay = (r - l) * (e / (h - l)), r \in [l, h)
156 * delay = e + (r - h) * ((m - e) / (1 - h)), r \in [h, 1)
158 class BackoffThrottle {
160 const std::string name;
161 PerfCounters *logger;
164 using locker = std::unique_lock<std::mutex>;
166 unsigned next_cond = 0;
168 /// allocated once to avoid constantly allocating new ones
169 vector<std::condition_variable> conds;
173 /// pointers into conds
174 list<std::condition_variable*> waiters;
176 std::list<std::condition_variable*>::iterator _push_waiter() {
177 unsigned next = next_cond++;
178 if (next_cond == conds.size())
180 return waiters.insert(waiters.end(), &(conds[next]));
183 void _kick_waiters() {
184 if (!waiters.empty())
185 waiters.front()->notify_all();
188 /// see above, values are in [0, 1].
189 double low_threshhold = 0;
190 double high_threshhold = 1;
192 /// see above, values are in seconds
193 double high_delay_per_count = 0;
194 double max_delay_per_count = 0;
196 /// Filled in in set_params
197 double s0 = 0; ///< e / (h - l), l != h, 0 otherwise
198 double s1 = 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise
202 uint64_t current = 0;
204 std::chrono::duration<double> _get_delay(uint64_t c) const;
210 * Sets params. If the params are invalid, returns false
211 * and populates errstream (if non-null) with a user-comprehensible
215 double low_threshhold,
216 double high_threshhold,
217 double expected_throughput,
218 double high_multiple,
220 uint64_t throttle_max,
223 std::chrono::duration<double> get(uint64_t c = 1);
224 std::chrono::duration<double> wait() {
227 uint64_t put(uint64_t c = 1);
228 uint64_t take(uint64_t c = 1);
229 uint64_t get_current();
232 BackoffThrottle(CephContext *cct, const std::string& n,
233 unsigned expected_concurrency, ///< [in] determines size of conds
234 bool _use_perf = true);
240 * @class SimpleThrottle
241 * This is a simple way to bound the number of concurrent operations.
243 * It tracks the first error encountered, and makes it available
244 * when all requests are complete. wait_for_ret() should be called
245 * before the instance is destroyed.
247 * Re-using the same instance isn't safe if you want to check each set
248 * of operations for errors, since the return value is not reset.
250 class SimpleThrottle {
252 SimpleThrottle(uint64_t max, bool ignore_enoent);
256 bool pending_error() const;
259 mutable Mutex m_lock;
264 bool m_ignore_enoent;
265 uint32_t waiters = 0;
269 class OrderedThrottle;
271 class C_OrderedThrottle : public Context {
273 C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
274 : m_ordered_throttle(ordered_throttle), m_tid(tid) {
278 void finish(int r) override;
281 OrderedThrottle *m_ordered_throttle;
286 * @class OrderedThrottle
287 * Throttles the maximum number of active requests and completes them in order
289 * Operations can complete out-of-order but their associated Context callback
290 * will be completed in order during invocation of start_op() and wait_for_ret()
292 class OrderedThrottle {
294 OrderedThrottle(uint64_t max, bool ignore_enoent);
297 C_OrderedThrottle *start_op(Context *on_finish);
300 bool pending_error() const;
304 friend class C_OrderedThrottle;
306 void finish_op(uint64_t tid, int r);
314 Result(Context *_on_finish = NULL)
315 : finished(false), ret_val(0), on_finish(_on_finish) {
319 typedef std::map<uint64_t, Result> TidResult;
321 mutable Mutex m_lock;
326 bool m_ignore_enoent;
329 uint64_t m_complete_tid;
331 TidResult m_tid_result;
333 void complete_pending_ops();
334 uint32_t waiters = 0;