Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / Throttle.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_THROTTLE_H
5 #define CEPH_THROTTLE_H
6
7 #include <map>
8 #include <list>
9 #include <chrono>
10 #include <atomic>
11 #include <iostream>
12 #include <condition_variable>
13 #include <stdexcept>
14
15 #include "Cond.h"
16 #include "include/Context.h"
17
18 class CephContext;
19 class PerfCounters;
20
21 /**
22  * @class Throttle
23  * Throttles the maximum number of active requests.
24  *
25  * This class defines the maximum number of slots currently taken away. The
26  * excessive requests for more of them are delayed, until some slots are put
27  * back, so @p get_current() drops below the limit after fulfills the requests.
28  */
29 class Throttle {
30   CephContext *cct;
31   const std::string name;
32   PerfCounters *logger;
33   std::atomic<unsigned> count = { 0 }, max = { 0 };
34   Mutex lock;
35   list<Cond*> cond;
36   const bool use_perf;
37   bool shutting_down = false;
38   Cond shutdown_clear;
39
40 public:
41   Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
42   ~Throttle();
43
44 private:
45   void _reset_max(int64_t m);
46   bool _should_wait(int64_t c) const {
47     int64_t m = max;
48     int64_t cur = count;
49     return
50       m &&
51       ((c <= m && cur + c > m) || // normally stay under max
52        (c >= m && cur > m));     // except for large c
53   }
54
55   bool _wait(int64_t c);
56
57 public:
58   /**
59    * gets the number of currently taken slots
60    * @returns the number of taken slots
61    */
62   int64_t get_current() const {
63     return count;
64   }
65
66   /**
67    * get the max number of slots
68    * @returns the max number of slots
69    */
70   int64_t get_max() const { return max; }
71
72   /**
73    * return true if past midpoint
74    */
75   bool past_midpoint() const {
76     return count >= max / 2;
77   }
78
79   /**
80    * set the new max number, and wait until the number of taken slots drains
81    * and drops below this limit.
82    *
83    * @param m the new max number
84    * @returns true if this method is blocked, false it it returns immediately
85    */
86   bool wait(int64_t m = 0);
87
88   /**
89    * take the specified number of slots from the stock regardless the throttling
90    * @param c number of slots to take
91    * @returns the total number of taken slots
92    */
93   int64_t take(int64_t c = 1);
94
95   /**
96    * get the specified amount of slots from the stock, but will wait if the
97    * total number taken by consumer would exceed the maximum number.
98    * @param c number of slots to get
99    * @param m new maximum number to set, ignored if it is 0
100    * @returns true if this request is blocked due to the throttling, false 
101    * otherwise
102    */
103   bool get(int64_t c = 1, int64_t m = 0);
104
105   /**
106    * the unblocked version of @p get()
107    * @returns true if it successfully got the requested amount,
108    * or false if it would block.
109    */
110   bool get_or_fail(int64_t c = 1);
111
112   /**
113    * put slots back to the stock
114    * @param c number of slots to return
115    * @returns number of requests being hold after this
116    */
117   int64_t put(int64_t c = 1);
118    /**
119    * reset the zero to the stock
120    */
121   void reset();
122
123   bool should_wait(int64_t c) const {
124     return _should_wait(c);
125   }
126   void reset_max(int64_t m) {
127     Mutex::Locker l(lock);
128     _reset_max(m);
129   }
130 };
131
132 /**
133  * BackoffThrottle
134  *
135  * Creates a throttle which gradually induces delays when get() is called
136  * based on params low_threshhold, high_threshhold, expected_throughput,
137  * high_multiple, and max_multiple.
138  *
139  * In [0, low_threshhold), we want no delay.
140  *
141  * In [low_threshhold, high_threshhold), delays should be injected based
142  * on a line from 0 at low_threshhold to
143  * high_multiple * (1/expected_throughput) at high_threshhold.
144  *
145  * In [high_threshhold, 1), we want delays injected based on a line from
146  * (high_multiple * (1/expected_throughput)) at high_threshhold to
147  * (high_multiple * (1/expected_throughput)) +
148  * (max_multiple * (1/expected_throughput)) at 1.
149  *
150  * Let the current throttle ratio (current/max) be r, low_threshhold be l,
151  * high_threshhold be h, high_delay (high_multiple / expected_throughput) be e,
152  * and max_delay (max_muliple / expected_throughput) be m.
153  *
154  * delay = 0, r \in [0, l)
155  * delay = (r - l) * (e / (h - l)), r \in [l, h)
156  * delay = e + (r - h)((m - e)/(1 - h))
157  */
158 class BackoffThrottle {
159   CephContext *cct;
160   const std::string name;
161   PerfCounters *logger;
162
163   std::mutex lock;
164   using locker = std::unique_lock<std::mutex>;
165
166   unsigned next_cond = 0;
167
168   /// allocated once to avoid constantly allocating new ones
169   vector<std::condition_variable> conds;
170
171   const bool use_perf;
172
173   /// pointers into conds
174   list<std::condition_variable*> waiters;
175
176   std::list<std::condition_variable*>::iterator _push_waiter() {
177     unsigned next = next_cond++;
178     if (next_cond == conds.size())
179       next_cond = 0;
180     return waiters.insert(waiters.end(), &(conds[next]));
181   }
182
183   void _kick_waiters() {
184     if (!waiters.empty())
185       waiters.front()->notify_all();
186   }
187
188   /// see above, values are in [0, 1].
189   double low_threshhold = 0;
190   double high_threshhold = 1;
191
192   /// see above, values are in seconds
193   double high_delay_per_count = 0;
194   double max_delay_per_count = 0;
195
196   /// Filled in in set_params
197   double s0 = 0; ///< e / (h - l), l != h, 0 otherwise
198   double s1 = 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise
199
200   /// max
201   uint64_t max = 0;
202   uint64_t current = 0;
203
204   std::chrono::duration<double> _get_delay(uint64_t c) const;
205
206 public:
207   /**
208    * set_params
209    *
210    * Sets params.  If the params are invalid, returns false
211    * and populates errstream (if non-null) with a user compreshensible
212    * explanation.
213    */
214   bool set_params(
215     double low_threshhold,
216     double high_threshhold,
217     double expected_throughput,
218     double high_multiple,
219     double max_multiple,
220     uint64_t throttle_max,
221     ostream *errstream);
222
223   std::chrono::duration<double> get(uint64_t c = 1);
224   std::chrono::duration<double> wait() {
225     return get(0);
226   }
227   uint64_t put(uint64_t c = 1);
228   uint64_t take(uint64_t c = 1);
229   uint64_t get_current();
230   uint64_t get_max();
231
232   BackoffThrottle(CephContext *cct, const std::string& n,
233     unsigned expected_concurrency, ///< [in] determines size of conds
234     bool _use_perf = true);
235   ~BackoffThrottle();
236 };
237
238
239 /**
240  * @class SimpleThrottle
241  * This is a simple way to bound the number of concurrent operations.
242  *
243  * It tracks the first error encountered, and makes it available
244  * when all requests are complete. wait_for_ret() should be called
245  * before the instance is destroyed.
246  *
247  * Re-using the same instance isn't safe if you want to check each set
248  * of operations for errors, since the return value is not reset.
249  */
250 class SimpleThrottle {
251 public:
252   SimpleThrottle(uint64_t max, bool ignore_enoent);
253   ~SimpleThrottle();
254   void start_op();
255   void end_op(int r);
256   bool pending_error() const;
257   int wait_for_ret();
258 private:
259   mutable Mutex m_lock;
260   Cond m_cond;
261   uint64_t m_max;
262   uint64_t m_current;
263   int m_ret;
264   bool m_ignore_enoent;
265   uint32_t waiters = 0;
266 };
267
268
269 class OrderedThrottle;
270
271 class C_OrderedThrottle : public Context {
272 public:
273   C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
274     : m_ordered_throttle(ordered_throttle), m_tid(tid) {
275   }
276
277 protected:
278   void finish(int r) override;
279
280 private:
281   OrderedThrottle *m_ordered_throttle;
282   uint64_t m_tid;
283 };
284
285 /**
286  * @class OrderedThrottle
287  * Throttles the maximum number of active requests and completes them in order
288  *
289  * Operations can complete out-of-order but their associated Context callback
290  * will completed in-order during invokation of start_op() and wait_for_ret()
291  */
292 class OrderedThrottle {
293 public:
294   OrderedThrottle(uint64_t max, bool ignore_enoent);
295   ~OrderedThrottle();
296
297   C_OrderedThrottle *start_op(Context *on_finish);
298   void end_op(int r);
299
300   bool pending_error() const;
301   int wait_for_ret();
302
303 protected:
304   friend class C_OrderedThrottle;
305
306   void finish_op(uint64_t tid, int r);
307
308 private:
309   struct Result {
310     bool finished;
311     int ret_val;
312     Context *on_finish;
313
314     Result(Context *_on_finish = NULL)
315       : finished(false), ret_val(0), on_finish(_on_finish) {
316     }
317   };
318
319   typedef std::map<uint64_t, Result> TidResult;
320
321   mutable Mutex m_lock;
322   Cond m_cond;
323   uint64_t m_max;
324   uint64_t m_current;
325   int m_ret_val;
326   bool m_ignore_enoent;
327
328   uint64_t m_next_tid;
329   uint64_t m_complete_tid;
330
331   TidResult m_tid_result;
332
333   void complete_pending_ops();
334   uint32_t waiters = 0;
335 };
336
337 #endif