X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcommon%2FThrottle.cc;fp=src%2Fceph%2Fsrc%2Fcommon%2FThrottle.cc;h=1d84be68d4f183641f951b9510697f56d04ebe9d;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/common/Throttle.cc b/src/ceph/src/common/Throttle.cc new file mode 100644 index 0000000..1d84be6 --- /dev/null +++ b/src/ceph/src/common/Throttle.cc @@ -0,0 +1,680 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/scope_guard.h" + +#include "common/Throttle.h" +#include "common/perf_counters.h" + +// re-include our assert to clobber the system one; fix dout: +#include "include/assert.h" + +#define dout_subsys ceph_subsys_throttle + +#undef dout_prefix +#define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") " + +enum { + l_throttle_first = 532430, + l_throttle_val, + l_throttle_max, + l_throttle_get_started, + l_throttle_get, + l_throttle_get_sum, + l_throttle_get_or_fail_fail, + l_throttle_get_or_fail_success, + l_throttle_take, + l_throttle_take_sum, + l_throttle_put, + l_throttle_put_sum, + l_throttle_wait, + l_throttle_last, +}; + +Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_perf) + : cct(cct), name(n), logger(NULL), + max(m), + lock("Throttle::lock"), + use_perf(_use_perf) +{ + assert(m >= 0); + + if (!use_perf) + return; + + if (cct->_conf->throttler_perf_counter) { + PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last); + b.add_u64(l_throttle_val, "val", "Currently available throttle"); + b.add_u64(l_throttle_max, "max", "Max value for throttle"); + b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait"); + b.add_u64_counter(l_throttle_get, "get", "Gets"); + b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data"); + b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail"); + b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail"); + b.add_u64_counter(l_throttle_take, "take", "Takes"); + b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data"); + b.add_u64_counter(l_throttle_put, "put", "Puts"); + b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data"); + b.add_time_avg(l_throttle_wait, "wait", "Waiting latency"); + + logger = b.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + logger->set(l_throttle_max, max); + } +} + +Throttle::~Throttle() +{ + { + Mutex::Locker l(lock); + assert(cond.empty()); + } + + if (!use_perf) + return; + + if (logger) { + cct->get_perfcounters_collection()->remove(logger); + delete logger; + } +} + +void Throttle::_reset_max(int64_t m) +{ + assert(lock.is_locked()); + if (static_cast(max) == m) + return; + if (!cond.empty()) + cond.front()->SignalOne(); + if (logger) + logger->set(l_throttle_max, m); + max = m; +} + +bool Throttle::_wait(int64_t c) +{ + utime_t start; + bool waited = false; + if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters. + { + auto cv = cond.insert(cond.end(), new Cond); + auto w = make_scope_guard([this, cv]() { + delete *cv; + cond.erase(cv); + }); + waited = true; + ldout(cct, 2) << "_wait waiting..." << dendl; + if (logger) + start = ceph_clock_now(); + + do { + (*cv)->Wait(lock); + } while ((_should_wait(c) || cv != cond.begin())); + + ldout(cct, 2) << "_wait finished waiting" << dendl; + if (logger) { + utime_t dur = ceph_clock_now() - start; + logger->tinc(l_throttle_wait, dur); + } + } + // wake up the next guy + if (!cond.empty()) + cond.front()->SignalOne(); + } + return waited; +} + +bool Throttle::wait(int64_t m) +{ + if (0 == max && 0 == m) { + return false; + } + + Mutex::Locker l(lock); + if (m) { + assert(m > 0); + _reset_max(m); + } + ldout(cct, 10) << "wait" << dendl; + return _wait(0); +} + +int64_t Throttle::take(int64_t c) +{ + if (0 == max) { + return 0; + } + assert(c >= 0); + ldout(cct, 10) << "take " << c << dendl; + { + Mutex::Locker l(lock); + count += c; + } + if (logger) { + logger->inc(l_throttle_take); + logger->inc(l_throttle_take_sum, c); + logger->set(l_throttle_val, count); + } + return count; +} + +bool Throttle::get(int64_t c, int64_t m) +{ + if (0 == max && 0 == m) { + return false; + } + + assert(c >= 0); + ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl; + if (logger) { + logger->inc(l_throttle_get_started); + } + bool waited = false; + { + Mutex::Locker l(lock); + if (m) { + assert(m > 0); + _reset_max(m); + } + waited = _wait(c); + count += c; + } + if (logger) { + logger->inc(l_throttle_get); + logger->inc(l_throttle_get_sum, c); + logger->set(l_throttle_val, count); + } + return waited; +} + +/* Returns true if it successfully got the requested amount, + * or false if it would block. + */ +bool Throttle::get_or_fail(int64_t c) +{ + if (0 == max) { + return true; + } + + assert (c >= 0); + Mutex::Locker l(lock); + if (_should_wait(c) || !cond.empty()) { + ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl; + if (logger) { + logger->inc(l_throttle_get_or_fail_fail); + } + return false; + } else { + ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl; + count += c; + if (logger) { + logger->inc(l_throttle_get_or_fail_success); + logger->inc(l_throttle_get); + logger->inc(l_throttle_get_sum, c); + logger->set(l_throttle_val, count); + } + return true; + } +} + +int64_t Throttle::put(int64_t c) +{ + if (0 == max) { + return 0; + } + + assert(c >= 0); + ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl; + Mutex::Locker l(lock); + if (c) { + if (!cond.empty()) + cond.front()->SignalOne(); + assert(static_cast(count) >= c); // if count goes negative, we failed somewhere! + count -= c; + if (logger) { + logger->inc(l_throttle_put); + logger->inc(l_throttle_put_sum, c); + logger->set(l_throttle_val, count); + } + } + return count; +} + +void Throttle::reset() +{ + Mutex::Locker l(lock); + if (!cond.empty()) + cond.front()->SignalOne(); + count = 0; + if (logger) { + logger->set(l_throttle_val, 0); + } +} + +enum { + l_backoff_throttle_first = l_throttle_last + 1, + l_backoff_throttle_val, + l_backoff_throttle_max, + l_backoff_throttle_get, + l_backoff_throttle_get_sum, + l_backoff_throttle_take, + l_backoff_throttle_take_sum, + l_backoff_throttle_put, + l_backoff_throttle_put_sum, + l_backoff_throttle_wait, + l_backoff_throttle_last, +}; + +BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigned expected_concurrency, bool _use_perf) + : cct(cct), name(n), logger(NULL), + conds(expected_concurrency),///< [in] determines size of conds + use_perf(_use_perf) +{ + if (!use_perf) + return; + + if (cct->_conf->throttler_perf_counter) { + PerfCountersBuilder b(cct, string("throttle-") + name, l_backoff_throttle_first, l_backoff_throttle_last); + b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle"); + b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle"); + b.add_u64_counter(l_backoff_throttle_get, "get", "Gets"); + b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data"); + b.add_u64_counter(l_backoff_throttle_take, "take", "Takes"); + b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data"); + b.add_u64_counter(l_backoff_throttle_put, "put", "Puts"); + b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data"); + b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency"); + + logger = b.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + logger->set(l_backoff_throttle_max, max); + } +} + +BackoffThrottle::~BackoffThrottle() +{ + { + locker l(lock); + assert(waiters.empty()); + } + + if (!use_perf) + return; + + if (logger) { + cct->get_perfcounters_collection()->remove(logger); + delete logger; + } +} + +bool BackoffThrottle::set_params( + double _low_threshhold, + double _high_threshhold, + double _expected_throughput, + double _high_multiple, + double _max_multiple, + uint64_t _throttle_max, + ostream *errstream) +{ + bool valid = true; + if (_low_threshhold > _high_threshhold) { + valid = false; + if (errstream) { + *errstream << "low_threshhold (" << _low_threshhold + << ") > high_threshhold (" << _high_threshhold + << ")" << std::endl; + } + } + + if (_high_multiple > _max_multiple) { + valid = false; + if (errstream) { + *errstream << "_high_multiple (" << _high_multiple + << ") > _max_multiple (" << _max_multiple + << ")" << std::endl; + } + } + + if (_low_threshhold > 1 || _low_threshhold < 0) { + valid = false; + if (errstream) { + *errstream << "invalid low_threshhold (" << _low_threshhold << ")" + << std::endl; + } + } + + if (_high_threshhold > 1 || _high_threshhold < 0) { + valid = false; + if (errstream) { + *errstream << "invalid high_threshhold (" << _high_threshhold << ")" + << std::endl; + } + } + + if (_max_multiple < 0) { + valid = false; + if (errstream) { + *errstream << "invalid _max_multiple (" + << _max_multiple << ")" + << std::endl; + } + } + + if (_high_multiple < 0) { + valid = false; + if (errstream) { + *errstream << "invalid _high_multiple (" + << _high_multiple << ")" + << std::endl; + } + } + + if (_expected_throughput < 0) { + valid = false; + if (errstream) { + *errstream << "invalid _expected_throughput(" + << _expected_throughput << ")" + << std::endl; + } + } + + if (!valid) + return false; + + locker l(lock); + low_threshhold = _low_threshhold; + high_threshhold = _high_threshhold; + high_delay_per_count = _high_multiple / _expected_throughput; + max_delay_per_count = _max_multiple / _expected_throughput; + max = _throttle_max; + + if (logger) + logger->set(l_backoff_throttle_max, max); + + if (high_threshhold - low_threshhold > 0) { + s0 = high_delay_per_count / (high_threshhold - low_threshhold); + } else { + low_threshhold = high_threshhold; + s0 = 0; + } + + if (1 - high_threshhold > 0) { + s1 = (max_delay_per_count - high_delay_per_count) + / (1 - high_threshhold); + } else { + high_threshhold = 1; + s1 = 0; + } + + _kick_waiters(); + return true; +} + +std::chrono::duration BackoffThrottle::_get_delay(uint64_t c) const +{ + if (max == 0) + return std::chrono::duration(0); + + double r = ((double)current) / ((double)max); + if (r < low_threshhold) { + return std::chrono::duration(0); + } else if (r < high_threshhold) { + return c * std::chrono::duration( + (r - low_threshhold) * s0); + } else { + return c * std::chrono::duration( + high_delay_per_count + ((r - high_threshhold) * s1)); + } +} + +std::chrono::duration BackoffThrottle::get(uint64_t c) +{ + locker l(lock); + auto delay = _get_delay(c); + + if (logger) { + logger->inc(l_backoff_throttle_get); + logger->inc(l_backoff_throttle_get_sum, c); + } + + // fast path + if (delay == std::chrono::duration(0) && + waiters.empty() && + ((max == 0) || (current == 0) || ((current + c) <= max))) { + current += c; + + if (logger) { + logger->set(l_backoff_throttle_val, current); + } + + return std::chrono::duration(0); + } + + auto ticket = _push_waiter(); + utime_t wait_from = ceph_clock_now(); + bool waited = false; + + while (waiters.begin() != ticket) { + (*ticket)->wait(l); + waited = true; + } + + auto start = std::chrono::system_clock::now(); + delay = _get_delay(c); + while (true) { + if (!((max == 0) || (current == 0) || (current + c) <= max)) { + (*ticket)->wait(l); + waited = true; + } else if (delay > std::chrono::duration(0)) { + (*ticket)->wait_for(l, delay); + waited = true; + } else { + break; + } + assert(ticket == waiters.begin()); + delay = _get_delay(c) - (std::chrono::system_clock::now() - start); + } + waiters.pop_front(); + _kick_waiters(); + + current += c; + + if (logger) { + logger->set(l_backoff_throttle_val, current); + if (waited) { + logger->tinc(l_backoff_throttle_wait, ceph_clock_now() - wait_from); + } + } + + return std::chrono::system_clock::now() - start; +} + +uint64_t BackoffThrottle::put(uint64_t c) +{ + locker l(lock); + assert(current >= c); + current -= c; + _kick_waiters(); + + if (logger) { + logger->inc(l_backoff_throttle_put); + logger->inc(l_backoff_throttle_put_sum, c); + logger->set(l_backoff_throttle_val, current); + } + + return current; +} + +uint64_t BackoffThrottle::take(uint64_t c) +{ + locker l(lock); + current += c; + + if (logger) { + logger->inc(l_backoff_throttle_take); + logger->inc(l_backoff_throttle_take_sum, c); + logger->set(l_backoff_throttle_val, current); + } + + return current; +} + +uint64_t BackoffThrottle::get_current() +{ + locker l(lock); + return current; +} + +uint64_t BackoffThrottle::get_max() +{ + locker l(lock); + return max; +} + +SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent) + : m_lock("SimpleThrottle"), + m_max(max), + m_current(0), + m_ret(0), + m_ignore_enoent(ignore_enoent) +{ +} + +SimpleThrottle::~SimpleThrottle() +{ + Mutex::Locker l(m_lock); + assert(m_current == 0); + assert(waiters == 0); +} + +void SimpleThrottle::start_op() +{ + Mutex::Locker l(m_lock); + while (m_max == m_current) { + waiters++; + m_cond.Wait(m_lock); + waiters--; + } + ++m_current; +} + +void SimpleThrottle::end_op(int r) +{ + Mutex::Locker l(m_lock); + --m_current; + if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent)) + m_ret = r; + m_cond.Signal(); +} + +bool SimpleThrottle::pending_error() const +{ + Mutex::Locker l(m_lock); + return (m_ret < 0); +} + +int SimpleThrottle::wait_for_ret() +{ + Mutex::Locker l(m_lock); + while (m_current > 0) { + waiters++; + m_cond.Wait(m_lock); + waiters--; + } + return m_ret; +} + +void C_OrderedThrottle::finish(int r) { + m_ordered_throttle->finish_op(m_tid, r); +} + +OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent) + : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0), + m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) { +} + +OrderedThrottle::~OrderedThrottle() { + Mutex::Locker locker(m_lock); + assert(waiters == 0); +} + +C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) { + assert(on_finish != NULL); + + Mutex::Locker locker(m_lock); + uint64_t tid = m_next_tid++; + m_tid_result[tid] = Result(on_finish); + C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid); + + complete_pending_ops(); + while (m_max == m_current) { + ++waiters; + m_cond.Wait(m_lock); + --waiters; + complete_pending_ops(); + } + ++m_current; + + return ctx; +} + +void OrderedThrottle::end_op(int r) { + Mutex::Locker locker(m_lock); + assert(m_current > 0); + + if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) { + m_ret_val = r; + } + --m_current; + m_cond.Signal(); +} + +void OrderedThrottle::finish_op(uint64_t tid, int r) { + Mutex::Locker locker(m_lock); + + TidResult::iterator it = m_tid_result.find(tid); + assert(it != m_tid_result.end()); + + it->second.finished = true; + it->second.ret_val = r; + m_cond.Signal(); +} + +bool OrderedThrottle::pending_error() const { + Mutex::Locker locker(m_lock); + return (m_ret_val < 0); +} + +int OrderedThrottle::wait_for_ret() { + Mutex::Locker locker(m_lock); + complete_pending_ops(); + + while (m_current > 0) { + ++waiters; + m_cond.Wait(m_lock); + --waiters; + complete_pending_ops(); + } + return m_ret_val; +} + +void OrderedThrottle::complete_pending_ops() { + assert(m_lock.is_locked()); + + while (true) { + TidResult::iterator it = m_tid_result.begin(); + if (it == m_tid_result.end() || it->first != m_complete_tid || + !it->second.finished) { + break; + } + + Result result = it->second; + m_tid_result.erase(it); + + m_lock.Unlock(); + result.on_finish->complete(result.ret_val); + m_lock.Lock(); + + ++m_complete_tid; + } +}