Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / Throttle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/scope_guard.h"
5
6 #include "common/Throttle.h"
7 #include "common/perf_counters.h"
8
9 // re-include our assert to clobber the system one; fix dout:
10 #include "include/assert.h"
11
12 #define dout_subsys ceph_subsys_throttle
13
14 #undef dout_prefix
15 #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
16
17 enum {
18   l_throttle_first = 532430,
19   l_throttle_val,
20   l_throttle_max,
21   l_throttle_get_started,
22   l_throttle_get,
23   l_throttle_get_sum,
24   l_throttle_get_or_fail_fail,
25   l_throttle_get_or_fail_success,
26   l_throttle_take,
27   l_throttle_take_sum,
28   l_throttle_put,
29   l_throttle_put_sum,
30   l_throttle_wait,
31   l_throttle_last,
32 };
33
34 Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_perf)
35   : cct(cct), name(n), logger(NULL),
36     max(m),
37     lock("Throttle::lock"),
38     use_perf(_use_perf)
39 {
40   assert(m >= 0);
41
42   if (!use_perf)
43     return;
44
45   if (cct->_conf->throttler_perf_counter) {
46     PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last);
47     b.add_u64(l_throttle_val, "val", "Currently available throttle");
48     b.add_u64(l_throttle_max, "max", "Max value for throttle");
49     b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait");
50     b.add_u64_counter(l_throttle_get, "get", "Gets");
51     b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data");
52     b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail");
53     b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail");
54     b.add_u64_counter(l_throttle_take, "take", "Takes");
55     b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data");
56     b.add_u64_counter(l_throttle_put, "put", "Puts");
57     b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data");
58     b.add_time_avg(l_throttle_wait, "wait", "Waiting latency");
59
60     logger = b.create_perf_counters();
61     cct->get_perfcounters_collection()->add(logger);
62     logger->set(l_throttle_max, max);
63   }
64 }
65
66 Throttle::~Throttle()
67 {
68   {
69     Mutex::Locker l(lock);
70     assert(cond.empty());
71   }
72
73   if (!use_perf)
74     return;
75
76   if (logger) {
77     cct->get_perfcounters_collection()->remove(logger);
78     delete logger;
79   }
80 }
81
82 void Throttle::_reset_max(int64_t m)
83 {
84   assert(lock.is_locked());
85   if (static_cast<int64_t>(max) == m)
86     return;
87   if (!cond.empty())
88     cond.front()->SignalOne();
89   if (logger)
90     logger->set(l_throttle_max, m);
91   max = m;
92 }
93
94 bool Throttle::_wait(int64_t c)
95 {
96   utime_t start;
97   bool waited = false;
98   if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
99     {
100       auto cv = cond.insert(cond.end(), new Cond);
101       auto w = make_scope_guard([this, cv]() {
102           delete *cv;
103           cond.erase(cv);
104         });
105       waited = true;
106       ldout(cct, 2) << "_wait waiting..." << dendl;
107       if (logger)
108         start = ceph_clock_now();
109
110       do {
111         (*cv)->Wait(lock);
112       } while ((_should_wait(c) || cv != cond.begin()));
113
114       ldout(cct, 2) << "_wait finished waiting" << dendl;
115       if (logger) {
116         utime_t dur = ceph_clock_now() - start;
117         logger->tinc(l_throttle_wait, dur);
118       }
119     }
120     // wake up the next guy
121     if (!cond.empty())
122       cond.front()->SignalOne();
123   }
124   return waited;
125 }
126
127 bool Throttle::wait(int64_t m)
128 {
129   if (0 == max && 0 == m) {
130     return false;
131   }
132
133   Mutex::Locker l(lock);
134   if (m) {
135     assert(m > 0);
136     _reset_max(m);
137   }
138   ldout(cct, 10) << "wait" << dendl;
139   return _wait(0);
140 }
141
142 int64_t Throttle::take(int64_t c)
143 {
144   if (0 == max) {
145     return 0;
146   }
147   assert(c >= 0);
148   ldout(cct, 10) << "take " << c << dendl;
149   {
150     Mutex::Locker l(lock);
151     count += c;
152   }
153   if (logger) {
154     logger->inc(l_throttle_take);
155     logger->inc(l_throttle_take_sum, c);
156     logger->set(l_throttle_val, count);
157   }
158   return count;
159 }
160
161 bool Throttle::get(int64_t c, int64_t m)
162 {
163   if (0 == max && 0 == m) {
164     return false;
165   }
166
167   assert(c >= 0);
168   ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
169   if (logger) {
170     logger->inc(l_throttle_get_started);
171   }
172   bool waited = false;
173   {
174     Mutex::Locker l(lock);
175     if (m) {
176       assert(m > 0);
177       _reset_max(m);
178     }
179     waited = _wait(c);
180     count += c;
181   }
182   if (logger) {
183     logger->inc(l_throttle_get);
184     logger->inc(l_throttle_get_sum, c);
185     logger->set(l_throttle_val, count);
186   }
187   return waited;
188 }
189
190 /* Returns true if it successfully got the requested amount,
191  * or false if it would block.
192  */
193 bool Throttle::get_or_fail(int64_t c)
194 {
195   if (0 == max) {
196     return true;
197   }
198
199   assert (c >= 0);
200   Mutex::Locker l(lock);
201   if (_should_wait(c) || !cond.empty()) {
202     ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
203     if (logger) {
204       logger->inc(l_throttle_get_or_fail_fail);
205     }
206     return false;
207   } else {
208     ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
209     count += c;
210     if (logger) {
211       logger->inc(l_throttle_get_or_fail_success);
212       logger->inc(l_throttle_get);
213       logger->inc(l_throttle_get_sum, c);
214       logger->set(l_throttle_val, count);
215     }
216     return true;
217   }
218 }
219
220 int64_t Throttle::put(int64_t c)
221 {
222   if (0 == max) {
223     return 0;
224   }
225
226   assert(c >= 0);
227   ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl;
228   Mutex::Locker l(lock);
229   if (c) {
230     if (!cond.empty())
231       cond.front()->SignalOne();
232     assert(static_cast<int64_t>(count) >= c); // if count goes negative, we failed somewhere!
233     count -= c;
234     if (logger) {
235       logger->inc(l_throttle_put);
236       logger->inc(l_throttle_put_sum, c);
237       logger->set(l_throttle_val, count);
238     }
239   }
240   return count;
241 }
242
243 void Throttle::reset()
244 {
245   Mutex::Locker l(lock);
246   if (!cond.empty())
247     cond.front()->SignalOne();
248   count = 0;
249   if (logger) {
250     logger->set(l_throttle_val, 0);
251   }
252 }
253
254 enum {
255   l_backoff_throttle_first = l_throttle_last + 1,
256   l_backoff_throttle_val,
257   l_backoff_throttle_max,
258   l_backoff_throttle_get,
259   l_backoff_throttle_get_sum,
260   l_backoff_throttle_take,
261   l_backoff_throttle_take_sum,
262   l_backoff_throttle_put,
263   l_backoff_throttle_put_sum,
264   l_backoff_throttle_wait,
265   l_backoff_throttle_last,
266 };
267
268 BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigned expected_concurrency, bool _use_perf)
269   : cct(cct), name(n), logger(NULL),
270     conds(expected_concurrency),///< [in] determines size of conds
271     use_perf(_use_perf)
272 {
273   if (!use_perf)
274     return;
275
276   if (cct->_conf->throttler_perf_counter) {
277     PerfCountersBuilder b(cct, string("throttle-") + name, l_backoff_throttle_first, l_backoff_throttle_last);
278     b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle");
279     b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle");
280     b.add_u64_counter(l_backoff_throttle_get, "get", "Gets");
281     b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data");
282     b.add_u64_counter(l_backoff_throttle_take, "take", "Takes");
283     b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data");
284     b.add_u64_counter(l_backoff_throttle_put, "put", "Puts");
285     b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data");
286     b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency");
287
288     logger = b.create_perf_counters();
289     cct->get_perfcounters_collection()->add(logger);
290     logger->set(l_backoff_throttle_max, max);
291   }
292 }
293
294 BackoffThrottle::~BackoffThrottle()
295 {
296   {
297     locker l(lock);
298     assert(waiters.empty());
299   }
300
301   if (!use_perf)
302     return;
303
304   if (logger) {
305     cct->get_perfcounters_collection()->remove(logger);
306     delete logger;
307   }
308 }
309
310 bool BackoffThrottle::set_params(
311   double _low_threshhold,
312   double _high_threshhold,
313   double _expected_throughput,
314   double _high_multiple,
315   double _max_multiple,
316   uint64_t _throttle_max,
317   ostream *errstream)
318 {
319   bool valid = true;
320   if (_low_threshhold > _high_threshhold) {
321     valid = false;
322     if (errstream) {
323       *errstream << "low_threshhold (" << _low_threshhold
324                  << ") > high_threshhold (" << _high_threshhold
325                  << ")" << std::endl;
326     }
327   }
328
329   if (_high_multiple > _max_multiple) {
330     valid = false;
331     if (errstream) {
332       *errstream << "_high_multiple (" << _high_multiple
333                  << ") > _max_multiple (" << _max_multiple
334                  << ")" << std::endl;
335     }
336   }
337
338   if (_low_threshhold > 1 || _low_threshhold < 0) {
339     valid = false;
340     if (errstream) {
341       *errstream << "invalid low_threshhold (" << _low_threshhold << ")"
342                  << std::endl;
343     }
344   }
345
346   if (_high_threshhold > 1 || _high_threshhold < 0) {
347     valid = false;
348     if (errstream) {
349       *errstream << "invalid high_threshhold (" << _high_threshhold << ")"
350                  << std::endl;
351     }
352   }
353
354   if (_max_multiple < 0) {
355     valid = false;
356     if (errstream) {
357       *errstream << "invalid _max_multiple ("
358                  << _max_multiple << ")"
359                  << std::endl;
360     }
361   }
362
363   if (_high_multiple < 0) {
364     valid = false;
365     if (errstream) {
366       *errstream << "invalid _high_multiple ("
367                  << _high_multiple << ")"
368                  << std::endl;
369     }
370   }
371
372   if (_expected_throughput < 0) {
373     valid = false;
374     if (errstream) {
375       *errstream << "invalid _expected_throughput("
376                  << _expected_throughput << ")"
377                  << std::endl;
378     }
379   }
380
381   if (!valid)
382     return false;
383
384   locker l(lock);
385   low_threshhold = _low_threshhold;
386   high_threshhold = _high_threshhold;
387   high_delay_per_count = _high_multiple / _expected_throughput;
388   max_delay_per_count = _max_multiple / _expected_throughput;
389   max = _throttle_max;
390
391   if (logger)
392     logger->set(l_backoff_throttle_max, max);
393
394   if (high_threshhold - low_threshhold > 0) {
395     s0 = high_delay_per_count / (high_threshhold - low_threshhold);
396   } else {
397     low_threshhold = high_threshhold;
398     s0 = 0;
399   }
400
401   if (1 - high_threshhold > 0) {
402     s1 = (max_delay_per_count - high_delay_per_count)
403       / (1 - high_threshhold);
404   } else {
405     high_threshhold = 1;
406     s1 = 0;
407   }
408
409   _kick_waiters();
410   return true;
411 }
412
413 std::chrono::duration<double> BackoffThrottle::_get_delay(uint64_t c) const
414 {
415   if (max == 0)
416     return std::chrono::duration<double>(0);
417
418   double r = ((double)current) / ((double)max);
419   if (r < low_threshhold) {
420     return std::chrono::duration<double>(0);
421   } else if (r < high_threshhold) {
422     return c * std::chrono::duration<double>(
423       (r - low_threshhold) * s0);
424   } else {
425     return c * std::chrono::duration<double>(
426       high_delay_per_count + ((r - high_threshhold) * s1));
427   }
428 }
429
430 std::chrono::duration<double> BackoffThrottle::get(uint64_t c)
431 {
432   locker l(lock);
433   auto delay = _get_delay(c);
434
435   if (logger) {
436     logger->inc(l_backoff_throttle_get);
437     logger->inc(l_backoff_throttle_get_sum, c);
438   }
439
440   // fast path
441   if (delay == std::chrono::duration<double>(0) &&
442       waiters.empty() &&
443       ((max == 0) || (current == 0) || ((current + c) <= max))) {
444     current += c;
445
446     if (logger) {
447       logger->set(l_backoff_throttle_val, current);
448     }
449
450     return std::chrono::duration<double>(0);
451   }
452
453   auto ticket = _push_waiter();
454   utime_t wait_from = ceph_clock_now();
455   bool waited = false;
456
457   while (waiters.begin() != ticket) {
458     (*ticket)->wait(l);
459     waited = true;
460   }
461
462   auto start = std::chrono::system_clock::now();
463   delay = _get_delay(c);
464   while (true) {
465     if (!((max == 0) || (current == 0) || (current + c) <= max)) {
466       (*ticket)->wait(l);
467       waited = true;
468     } else if (delay > std::chrono::duration<double>(0)) {
469       (*ticket)->wait_for(l, delay);
470       waited = true;
471     } else {
472       break;
473     }
474     assert(ticket == waiters.begin());
475     delay = _get_delay(c) - (std::chrono::system_clock::now() - start);
476   }
477   waiters.pop_front();
478   _kick_waiters();
479
480   current += c;
481
482   if (logger) {
483     logger->set(l_backoff_throttle_val, current);
484     if (waited) {
485       logger->tinc(l_backoff_throttle_wait, ceph_clock_now() - wait_from);
486     }
487   }
488
489   return std::chrono::system_clock::now() - start;
490 }
491
492 uint64_t BackoffThrottle::put(uint64_t c)
493 {
494   locker l(lock);
495   assert(current >= c);
496   current -= c;
497   _kick_waiters();
498
499   if (logger) {
500     logger->inc(l_backoff_throttle_put);
501     logger->inc(l_backoff_throttle_put_sum, c);
502     logger->set(l_backoff_throttle_val, current);
503   }
504
505   return current;
506 }
507
508 uint64_t BackoffThrottle::take(uint64_t c)
509 {
510   locker l(lock);
511   current += c;
512
513   if (logger) {
514     logger->inc(l_backoff_throttle_take);
515     logger->inc(l_backoff_throttle_take_sum, c);
516     logger->set(l_backoff_throttle_val, current);
517   }
518
519   return current;
520 }
521
522 uint64_t BackoffThrottle::get_current()
523 {
524   locker l(lock);
525   return current;
526 }
527
528 uint64_t BackoffThrottle::get_max()
529 {
530   locker l(lock);
531   return max;
532 }
533
534 SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
535   : m_lock("SimpleThrottle"),
536     m_max(max),
537     m_current(0),
538     m_ret(0),
539     m_ignore_enoent(ignore_enoent)
540 {
541 }
542
543 SimpleThrottle::~SimpleThrottle()
544 {
545   Mutex::Locker l(m_lock);
546   assert(m_current == 0);
547   assert(waiters == 0);
548 }
549
550 void SimpleThrottle::start_op()
551 {
552   Mutex::Locker l(m_lock);
553   while (m_max == m_current) {
554     waiters++;
555     m_cond.Wait(m_lock);
556     waiters--;
557   }
558   ++m_current;
559 }
560
561 void SimpleThrottle::end_op(int r)
562 {
563   Mutex::Locker l(m_lock);
564   --m_current;
565   if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent))
566     m_ret = r;
567   m_cond.Signal();
568 }
569
570 bool SimpleThrottle::pending_error() const
571 {
572   Mutex::Locker l(m_lock);
573   return (m_ret < 0);
574 }
575
576 int SimpleThrottle::wait_for_ret()
577 {
578   Mutex::Locker l(m_lock);
579   while (m_current > 0) {
580     waiters++;
581     m_cond.Wait(m_lock);
582     waiters--;
583   }
584   return m_ret;
585 }
586
587 void C_OrderedThrottle::finish(int r) {
588   m_ordered_throttle->finish_op(m_tid, r);
589 }
590
591 OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
592   : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0),
593     m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
594 }
595
596 OrderedThrottle::~OrderedThrottle() {
597   Mutex::Locker locker(m_lock);
598   assert(waiters == 0);
599 }
600
601 C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
602   assert(on_finish != NULL);
603
604   Mutex::Locker locker(m_lock);
605   uint64_t tid = m_next_tid++;
606   m_tid_result[tid] = Result(on_finish);
607   C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid);
608
609   complete_pending_ops();
610   while (m_max == m_current) {
611     ++waiters;
612     m_cond.Wait(m_lock);
613     --waiters;
614     complete_pending_ops();
615   }
616   ++m_current;
617
618   return ctx;
619 }
620
621 void OrderedThrottle::end_op(int r) {
622   Mutex::Locker locker(m_lock);
623   assert(m_current > 0);
624
625   if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
626     m_ret_val = r;
627   }
628   --m_current;
629   m_cond.Signal();
630 }
631
632 void OrderedThrottle::finish_op(uint64_t tid, int r) {
633   Mutex::Locker locker(m_lock);
634
635   TidResult::iterator it = m_tid_result.find(tid);
636   assert(it != m_tid_result.end());
637
638   it->second.finished = true;
639   it->second.ret_val = r;
640   m_cond.Signal();
641 }
642
643 bool OrderedThrottle::pending_error() const {
644   Mutex::Locker locker(m_lock);
645   return (m_ret_val < 0);
646 }
647
648 int OrderedThrottle::wait_for_ret() {
649   Mutex::Locker locker(m_lock);
650   complete_pending_ops();
651
652   while (m_current > 0) {
653     ++waiters;
654     m_cond.Wait(m_lock);
655     --waiters;
656     complete_pending_ops();
657   }
658   return m_ret_val;
659 }
660
661 void OrderedThrottle::complete_pending_ops() {
662   assert(m_lock.is_locked());
663
664   while (true) {
665     TidResult::iterator it = m_tid_result.begin();
666     if (it == m_tid_result.end() || it->first != m_complete_tid ||
667         !it->second.finished) {
668       break;
669     }
670
671     Result result = it->second;
672     m_tid_result.erase(it);
673
674     m_lock.Unlock();
675     result.on_finish->complete(result.ret_val);
676     m_lock.Lock();
677
678     ++m_complete_tid;
679   }
680 }