X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcommon%2FThrottle.cc;fp=src%2Fceph%2Fsrc%2Fcommon%2FThrottle.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=1d84be68d4f183641f951b9510697f56d04ebe9d;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/common/Throttle.cc b/src/ceph/src/common/Throttle.cc deleted file mode 100644 index 1d84be6..0000000 --- a/src/ceph/src/common/Throttle.cc +++ /dev/null @@ -1,680 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "include/scope_guard.h" - -#include "common/Throttle.h" -#include "common/perf_counters.h" - -// re-include our assert to clobber the system one; fix dout: -#include "include/assert.h" - -#define dout_subsys ceph_subsys_throttle - -#undef dout_prefix -#define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") " - -enum { - l_throttle_first = 532430, - l_throttle_val, - l_throttle_max, - l_throttle_get_started, - l_throttle_get, - l_throttle_get_sum, - l_throttle_get_or_fail_fail, - l_throttle_get_or_fail_success, - l_throttle_take, - l_throttle_take_sum, - l_throttle_put, - l_throttle_put_sum, - l_throttle_wait, - l_throttle_last, -}; - -Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_perf) - : cct(cct), name(n), logger(NULL), - max(m), - lock("Throttle::lock"), - use_perf(_use_perf) -{ - assert(m >= 0); - - if (!use_perf) - return; - - if (cct->_conf->throttler_perf_counter) { - PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last); - b.add_u64(l_throttle_val, "val", "Currently available throttle"); - b.add_u64(l_throttle_max, "max", "Max value for throttle"); - b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait"); - b.add_u64_counter(l_throttle_get, "get", "Gets"); - b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data"); - b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail"); - b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail"); - b.add_u64_counter(l_throttle_take, "take", "Takes"); - b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data"); - b.add_u64_counter(l_throttle_put, "put", "Puts"); - b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data"); - b.add_time_avg(l_throttle_wait, "wait", "Waiting latency"); - - logger = b.create_perf_counters(); - cct->get_perfcounters_collection()->add(logger); - logger->set(l_throttle_max, max); - } -} - -Throttle::~Throttle() -{ - { - Mutex::Locker l(lock); - assert(cond.empty()); - } - - if (!use_perf) - return; - - if (logger) { - cct->get_perfcounters_collection()->remove(logger); - delete logger; - } -} - -void Throttle::_reset_max(int64_t m) -{ - assert(lock.is_locked()); - if (static_cast(max) == m) - return; - if (!cond.empty()) - cond.front()->SignalOne(); - if (logger) - logger->set(l_throttle_max, m); - max = m; -} - -bool Throttle::_wait(int64_t c) -{ - utime_t start; - bool waited = false; - if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters. - { - auto cv = cond.insert(cond.end(), new Cond); - auto w = make_scope_guard([this, cv]() { - delete *cv; - cond.erase(cv); - }); - waited = true; - ldout(cct, 2) << "_wait waiting..." << dendl; - if (logger) - start = ceph_clock_now(); - - do { - (*cv)->Wait(lock); - } while ((_should_wait(c) || cv != cond.begin())); - - ldout(cct, 2) << "_wait finished waiting" << dendl; - if (logger) { - utime_t dur = ceph_clock_now() - start; - logger->tinc(l_throttle_wait, dur); - } - } - // wake up the next guy - if (!cond.empty()) - cond.front()->SignalOne(); - } - return waited; -} - -bool Throttle::wait(int64_t m) -{ - if (0 == max && 0 == m) { - return false; - } - - Mutex::Locker l(lock); - if (m) { - assert(m > 0); - _reset_max(m); - } - ldout(cct, 10) << "wait" << dendl; - return _wait(0); -} - -int64_t Throttle::take(int64_t c) -{ - if (0 == max) { - return 0; - } - assert(c >= 0); - ldout(cct, 10) << "take " << c << dendl; - { - Mutex::Locker l(lock); - count += c; - } - if (logger) { - logger->inc(l_throttle_take); - logger->inc(l_throttle_take_sum, c); - logger->set(l_throttle_val, count); - } - return count; -} - -bool Throttle::get(int64_t c, int64_t m) -{ - if (0 == max && 0 == m) { - return false; - } - - assert(c >= 0); - ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl; - if (logger) { - logger->inc(l_throttle_get_started); - } - bool waited = false; - { - Mutex::Locker l(lock); - if (m) { - assert(m > 0); - _reset_max(m); - } - waited = _wait(c); - count += c; - } - if (logger) { - logger->inc(l_throttle_get); - logger->inc(l_throttle_get_sum, c); - logger->set(l_throttle_val, count); - } - return waited; -} - -/* Returns true if it successfully got the requested amount, - * or false if it would block. - */ -bool Throttle::get_or_fail(int64_t c) -{ - if (0 == max) { - return true; - } - - assert (c >= 0); - Mutex::Locker l(lock); - if (_should_wait(c) || !cond.empty()) { - ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl; - if (logger) { - logger->inc(l_throttle_get_or_fail_fail); - } - return false; - } else { - ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl; - count += c; - if (logger) { - logger->inc(l_throttle_get_or_fail_success); - logger->inc(l_throttle_get); - logger->inc(l_throttle_get_sum, c); - logger->set(l_throttle_val, count); - } - return true; - } -} - -int64_t Throttle::put(int64_t c) -{ - if (0 == max) { - return 0; - } - - assert(c >= 0); - ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl; - Mutex::Locker l(lock); - if (c) { - if (!cond.empty()) - cond.front()->SignalOne(); - assert(static_cast(count) >= c); // if count goes negative, we failed somewhere! - count -= c; - if (logger) { - logger->inc(l_throttle_put); - logger->inc(l_throttle_put_sum, c); - logger->set(l_throttle_val, count); - } - } - return count; -} - -void Throttle::reset() -{ - Mutex::Locker l(lock); - if (!cond.empty()) - cond.front()->SignalOne(); - count = 0; - if (logger) { - logger->set(l_throttle_val, 0); - } -} - -enum { - l_backoff_throttle_first = l_throttle_last + 1, - l_backoff_throttle_val, - l_backoff_throttle_max, - l_backoff_throttle_get, - l_backoff_throttle_get_sum, - l_backoff_throttle_take, - l_backoff_throttle_take_sum, - l_backoff_throttle_put, - l_backoff_throttle_put_sum, - l_backoff_throttle_wait, - l_backoff_throttle_last, -}; - -BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigned expected_concurrency, bool _use_perf) - : cct(cct), name(n), logger(NULL), - conds(expected_concurrency),///< [in] determines size of conds - use_perf(_use_perf) -{ - if (!use_perf) - return; - - if (cct->_conf->throttler_perf_counter) { - PerfCountersBuilder b(cct, string("throttle-") + name, l_backoff_throttle_first, l_backoff_throttle_last); - b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle"); - b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle"); - b.add_u64_counter(l_backoff_throttle_get, "get", "Gets"); - b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data"); - b.add_u64_counter(l_backoff_throttle_take, "take", "Takes"); - b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data"); - b.add_u64_counter(l_backoff_throttle_put, "put", "Puts"); - b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data"); - b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency"); - - logger = b.create_perf_counters(); - cct->get_perfcounters_collection()->add(logger); - logger->set(l_backoff_throttle_max, max); - } -} - -BackoffThrottle::~BackoffThrottle() -{ - { - locker l(lock); - assert(waiters.empty()); - } - - if (!use_perf) - return; - - if (logger) { - cct->get_perfcounters_collection()->remove(logger); - delete logger; - } -} - -bool BackoffThrottle::set_params( - double _low_threshhold, - double _high_threshhold, - double _expected_throughput, - double _high_multiple, - double _max_multiple, - uint64_t _throttle_max, - ostream *errstream) -{ - bool valid = true; - if (_low_threshhold > _high_threshhold) { - valid = false; - if (errstream) { - *errstream << "low_threshhold (" << _low_threshhold - << ") > high_threshhold (" << _high_threshhold - << ")" << std::endl; - } - } - - if (_high_multiple > _max_multiple) { - valid = false; - if (errstream) { - *errstream << "_high_multiple (" << _high_multiple - << ") > _max_multiple (" << _max_multiple - << ")" << std::endl; - } - } - - if (_low_threshhold > 1 || _low_threshhold < 0) { - valid = false; - if (errstream) { - *errstream << "invalid low_threshhold (" << _low_threshhold << ")" - << std::endl; - } - } - - if (_high_threshhold > 1 || _high_threshhold < 0) { - valid = false; - if (errstream) { - *errstream << "invalid high_threshhold (" << _high_threshhold << ")" - << std::endl; - } - } - - if (_max_multiple < 0) { - valid = false; - if (errstream) { - *errstream << "invalid _max_multiple (" - << _max_multiple << ")" - << std::endl; - } - } - - if (_high_multiple < 0) { - valid = false; - if (errstream) { - *errstream << "invalid _high_multiple (" - << _high_multiple << ")" - << std::endl; - } - } - - if (_expected_throughput < 0) { - valid = false; - if (errstream) { - *errstream << "invalid _expected_throughput(" - << _expected_throughput << ")" - << std::endl; - } - } - - if (!valid) - return false; - - locker l(lock); - low_threshhold = _low_threshhold; - high_threshhold = _high_threshhold; - high_delay_per_count = _high_multiple / _expected_throughput; - max_delay_per_count = _max_multiple / _expected_throughput; - max = _throttle_max; - - if (logger) - logger->set(l_backoff_throttle_max, max); - - if (high_threshhold - low_threshhold > 0) { - s0 = high_delay_per_count / (high_threshhold - low_threshhold); - } else { - low_threshhold = high_threshhold; - s0 = 0; - } - - if (1 - high_threshhold > 0) { - s1 = (max_delay_per_count - high_delay_per_count) - / (1 - high_threshhold); - } else { - high_threshhold = 1; - s1 = 0; - } - - _kick_waiters(); - return true; -} - -std::chrono::duration BackoffThrottle::_get_delay(uint64_t c) const -{ - if (max == 0) - return std::chrono::duration(0); - - double r = ((double)current) / ((double)max); - if (r < low_threshhold) { - return std::chrono::duration(0); - } else if (r < high_threshhold) { - return c * std::chrono::duration( - (r - low_threshhold) * s0); - } else { - return c * std::chrono::duration( - high_delay_per_count + ((r - high_threshhold) * s1)); - } -} - -std::chrono::duration BackoffThrottle::get(uint64_t c) -{ - locker l(lock); - auto delay = _get_delay(c); - - if (logger) { - logger->inc(l_backoff_throttle_get); - logger->inc(l_backoff_throttle_get_sum, c); - } - - // fast path - if (delay == std::chrono::duration(0) && - waiters.empty() && - ((max == 0) || (current == 0) || ((current + c) <= max))) { - current += c; - - if (logger) { - logger->set(l_backoff_throttle_val, current); - } - - return std::chrono::duration(0); - } - - auto ticket = _push_waiter(); - utime_t wait_from = ceph_clock_now(); - bool waited = false; - - while (waiters.begin() != ticket) { - (*ticket)->wait(l); - waited = true; - } - - auto start = std::chrono::system_clock::now(); - delay = _get_delay(c); - while (true) { - if (!((max == 0) || (current == 0) || (current + c) <= max)) { - (*ticket)->wait(l); - waited = true; - } else if (delay > std::chrono::duration(0)) { - (*ticket)->wait_for(l, delay); - waited = true; - } else { - break; - } - assert(ticket == waiters.begin()); - delay = _get_delay(c) - (std::chrono::system_clock::now() - start); - } - waiters.pop_front(); - _kick_waiters(); - - current += c; - - if (logger) { - logger->set(l_backoff_throttle_val, current); - if (waited) { - logger->tinc(l_backoff_throttle_wait, ceph_clock_now() - wait_from); - } - } - - return std::chrono::system_clock::now() - start; -} - -uint64_t BackoffThrottle::put(uint64_t c) -{ - locker l(lock); - assert(current >= c); - current -= c; - _kick_waiters(); - - if (logger) { - logger->inc(l_backoff_throttle_put); - logger->inc(l_backoff_throttle_put_sum, c); - logger->set(l_backoff_throttle_val, current); - } - - return current; -} - -uint64_t BackoffThrottle::take(uint64_t c) -{ - locker l(lock); - current += c; - - if (logger) { - logger->inc(l_backoff_throttle_take); - logger->inc(l_backoff_throttle_take_sum, c); - logger->set(l_backoff_throttle_val, current); - } - - return current; -} - -uint64_t BackoffThrottle::get_current() -{ - locker l(lock); - return current; -} - -uint64_t BackoffThrottle::get_max() -{ - locker l(lock); - return max; -} - -SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent) - : m_lock("SimpleThrottle"), - m_max(max), - m_current(0), - m_ret(0), - m_ignore_enoent(ignore_enoent) -{ -} - -SimpleThrottle::~SimpleThrottle() -{ - Mutex::Locker l(m_lock); - assert(m_current == 0); - assert(waiters == 0); -} - -void SimpleThrottle::start_op() -{ - Mutex::Locker l(m_lock); - while (m_max == m_current) { - waiters++; - m_cond.Wait(m_lock); - waiters--; - } - ++m_current; -} - -void SimpleThrottle::end_op(int r) -{ - Mutex::Locker l(m_lock); - --m_current; - if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent)) - m_ret = r; - m_cond.Signal(); -} - -bool SimpleThrottle::pending_error() const -{ - Mutex::Locker l(m_lock); - return (m_ret < 0); -} - -int SimpleThrottle::wait_for_ret() -{ - Mutex::Locker l(m_lock); - while (m_current > 0) { - waiters++; - m_cond.Wait(m_lock); - waiters--; - } - return m_ret; -} - -void C_OrderedThrottle::finish(int r) { - m_ordered_throttle->finish_op(m_tid, r); -} - -OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent) - : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0), - m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) { -} - -OrderedThrottle::~OrderedThrottle() { - Mutex::Locker locker(m_lock); - assert(waiters == 0); -} - -C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) { - assert(on_finish != NULL); - - Mutex::Locker locker(m_lock); - uint64_t tid = m_next_tid++; - m_tid_result[tid] = Result(on_finish); - C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid); - - complete_pending_ops(); - while (m_max == m_current) { - ++waiters; - m_cond.Wait(m_lock); - --waiters; - complete_pending_ops(); - } - ++m_current; - - return ctx; -} - -void OrderedThrottle::end_op(int r) { - Mutex::Locker locker(m_lock); - assert(m_current > 0); - - if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) { - m_ret_val = r; - } - --m_current; - m_cond.Signal(); -} - -void OrderedThrottle::finish_op(uint64_t tid, int r) { - Mutex::Locker locker(m_lock); - - TidResult::iterator it = m_tid_result.find(tid); - assert(it != m_tid_result.end()); - - it->second.finished = true; - it->second.ret_val = r; - m_cond.Signal(); -} - -bool OrderedThrottle::pending_error() const { - Mutex::Locker locker(m_lock); - return (m_ret_val < 0); -} - -int OrderedThrottle::wait_for_ret() { - Mutex::Locker locker(m_lock); - complete_pending_ops(); - - while (m_current > 0) { - ++waiters; - m_cond.Wait(m_lock); - --waiters; - complete_pending_ops(); - } - return m_ret_val; -} - -void OrderedThrottle::complete_pending_ops() { - assert(m_lock.is_locked()); - - while (true) { - TidResult::iterator it = m_tid_result.begin(); - if (it == m_tid_result.end() || it->first != m_complete_tid || - !it->second.finished) { - break; - } - - Result result = it->second; - m_tid_result.erase(it); - - m_lock.Unlock(); - result.on_finish->complete(result.ret_val); - m_lock.Lock(); - - ++m_complete_tid; - } -}