// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "include/scope_guard.h" #include "common/Throttle.h" #include "common/perf_counters.h" // re-include our assert to clobber the system one; fix dout: #include "include/assert.h" #define dout_subsys ceph_subsys_throttle #undef dout_prefix #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") " enum { l_throttle_first = 532430, l_throttle_val, l_throttle_max, l_throttle_get_started, l_throttle_get, l_throttle_get_sum, l_throttle_get_or_fail_fail, l_throttle_get_or_fail_success, l_throttle_take, l_throttle_take_sum, l_throttle_put, l_throttle_put_sum, l_throttle_wait, l_throttle_last, }; Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_perf) : cct(cct), name(n), logger(NULL), max(m), lock("Throttle::lock"), use_perf(_use_perf) { assert(m >= 0); if (!use_perf) return; if (cct->_conf->throttler_perf_counter) { PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last); b.add_u64(l_throttle_val, "val", "Currently available throttle"); b.add_u64(l_throttle_max, "max", "Max value for throttle"); b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait"); b.add_u64_counter(l_throttle_get, "get", "Gets"); b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data"); b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail"); b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail"); b.add_u64_counter(l_throttle_take, "take", "Takes"); b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data"); b.add_u64_counter(l_throttle_put, "put", "Puts"); b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data"); b.add_time_avg(l_throttle_wait, "wait", "Waiting latency"); logger = b.create_perf_counters(); cct->get_perfcounters_collection()->add(logger); logger->set(l_throttle_max, max); } } Throttle::~Throttle() { { Mutex::Locker l(lock); assert(cond.empty()); } if (!use_perf) return; if (logger) { cct->get_perfcounters_collection()->remove(logger); delete logger; } } void Throttle::_reset_max(int64_t m) { assert(lock.is_locked()); if (static_cast(max) == m) return; if (!cond.empty()) cond.front()->SignalOne(); if (logger) logger->set(l_throttle_max, m); max = m; } bool Throttle::_wait(int64_t c) { utime_t start; bool waited = false; if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters. { auto cv = cond.insert(cond.end(), new Cond); auto w = make_scope_guard([this, cv]() { delete *cv; cond.erase(cv); }); waited = true; ldout(cct, 2) << "_wait waiting..." << dendl; if (logger) start = ceph_clock_now(); do { (*cv)->Wait(lock); } while ((_should_wait(c) || cv != cond.begin())); ldout(cct, 2) << "_wait finished waiting" << dendl; if (logger) { utime_t dur = ceph_clock_now() - start; logger->tinc(l_throttle_wait, dur); } } // wake up the next guy if (!cond.empty()) cond.front()->SignalOne(); } return waited; } bool Throttle::wait(int64_t m) { if (0 == max && 0 == m) { return false; } Mutex::Locker l(lock); if (m) { assert(m > 0); _reset_max(m); } ldout(cct, 10) << "wait" << dendl; return _wait(0); } int64_t Throttle::take(int64_t c) { if (0 == max) { return 0; } assert(c >= 0); ldout(cct, 10) << "take " << c << dendl; { Mutex::Locker l(lock); count += c; } if (logger) { logger->inc(l_throttle_take); logger->inc(l_throttle_take_sum, c); logger->set(l_throttle_val, count); } return count; } bool Throttle::get(int64_t c, int64_t m) { if (0 == max && 0 == m) { return false; } assert(c >= 0); ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl; if (logger) { logger->inc(l_throttle_get_started); } bool waited = false; { Mutex::Locker l(lock); if (m) { assert(m > 0); _reset_max(m); } waited = _wait(c); count += c; } if (logger) { logger->inc(l_throttle_get); logger->inc(l_throttle_get_sum, c); logger->set(l_throttle_val, count); } return waited; } /* Returns true if it successfully got the requested amount, * or false if it would block. */ bool Throttle::get_or_fail(int64_t c) { if (0 == max) { return true; } assert (c >= 0); Mutex::Locker l(lock); if (_should_wait(c) || !cond.empty()) { ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl; if (logger) { logger->inc(l_throttle_get_or_fail_fail); } return false; } else { ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl; count += c; if (logger) { logger->inc(l_throttle_get_or_fail_success); logger->inc(l_throttle_get); logger->inc(l_throttle_get_sum, c); logger->set(l_throttle_val, count); } return true; } } int64_t Throttle::put(int64_t c) { if (0 == max) { return 0; } assert(c >= 0); ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl; Mutex::Locker l(lock); if (c) { if (!cond.empty()) cond.front()->SignalOne(); assert(static_cast(count) >= c); // if count goes negative, we failed somewhere! count -= c; if (logger) { logger->inc(l_throttle_put); logger->inc(l_throttle_put_sum, c); logger->set(l_throttle_val, count); } } return count; } void Throttle::reset() { Mutex::Locker l(lock); if (!cond.empty()) cond.front()->SignalOne(); count = 0; if (logger) { logger->set(l_throttle_val, 0); } } enum { l_backoff_throttle_first = l_throttle_last + 1, l_backoff_throttle_val, l_backoff_throttle_max, l_backoff_throttle_get, l_backoff_throttle_get_sum, l_backoff_throttle_take, l_backoff_throttle_take_sum, l_backoff_throttle_put, l_backoff_throttle_put_sum, l_backoff_throttle_wait, l_backoff_throttle_last, }; BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigned expected_concurrency, bool _use_perf) : cct(cct), name(n), logger(NULL), conds(expected_concurrency),///< [in] determines size of conds use_perf(_use_perf) { if (!use_perf) return; if (cct->_conf->throttler_perf_counter) { PerfCountersBuilder b(cct, string("throttle-") + name, l_backoff_throttle_first, l_backoff_throttle_last); b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle"); b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle"); b.add_u64_counter(l_backoff_throttle_get, "get", "Gets"); b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data"); b.add_u64_counter(l_backoff_throttle_take, "take", "Takes"); b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data"); b.add_u64_counter(l_backoff_throttle_put, "put", "Puts"); b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data"); b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency"); logger = b.create_perf_counters(); cct->get_perfcounters_collection()->add(logger); logger->set(l_backoff_throttle_max, max); } } BackoffThrottle::~BackoffThrottle() { { locker l(lock); assert(waiters.empty()); } if (!use_perf) return; if (logger) { cct->get_perfcounters_collection()->remove(logger); delete logger; } } bool BackoffThrottle::set_params( double _low_threshhold, double _high_threshhold, double _expected_throughput, double _high_multiple, double _max_multiple, uint64_t _throttle_max, ostream *errstream) { bool valid = true; if (_low_threshhold > _high_threshhold) { valid = false; if (errstream) { *errstream << "low_threshhold (" << _low_threshhold << ") > high_threshhold (" << _high_threshhold << ")" << std::endl; } } if (_high_multiple > _max_multiple) { valid = false; if (errstream) { *errstream << "_high_multiple (" << _high_multiple << ") > _max_multiple (" << _max_multiple << ")" << std::endl; } } if (_low_threshhold > 1 || _low_threshhold < 0) { valid = false; if (errstream) { *errstream << "invalid low_threshhold (" << _low_threshhold << ")" << std::endl; } } if (_high_threshhold > 1 || _high_threshhold < 0) { valid = false; if (errstream) { *errstream << "invalid high_threshhold (" << _high_threshhold << ")" << std::endl; } } if (_max_multiple < 0) { valid = false; if (errstream) { *errstream << "invalid _max_multiple (" << _max_multiple << ")" << std::endl; } } if (_high_multiple < 0) { valid = false; if (errstream) { *errstream << "invalid _high_multiple (" << _high_multiple << ")" << std::endl; } } if (_expected_throughput < 0) { valid = false; if (errstream) { *errstream << "invalid _expected_throughput(" << _expected_throughput << ")" << std::endl; } } if (!valid) return false; locker l(lock); low_threshhold = _low_threshhold; high_threshhold = _high_threshhold; high_delay_per_count = _high_multiple / _expected_throughput; max_delay_per_count = _max_multiple / _expected_throughput; max = _throttle_max; if (logger) logger->set(l_backoff_throttle_max, max); if (high_threshhold - low_threshhold > 0) { s0 = high_delay_per_count / (high_threshhold - low_threshhold); } else { low_threshhold = high_threshhold; s0 = 0; } if (1 - high_threshhold > 0) { s1 = (max_delay_per_count - high_delay_per_count) / (1 - high_threshhold); } else { high_threshhold = 1; s1 = 0; } _kick_waiters(); return true; } std::chrono::duration BackoffThrottle::_get_delay(uint64_t c) const { if (max == 0) return std::chrono::duration(0); double r = ((double)current) / ((double)max); if (r < low_threshhold) { return std::chrono::duration(0); } else if (r < high_threshhold) { return c * std::chrono::duration( (r - low_threshhold) * s0); } else { return c * std::chrono::duration( high_delay_per_count + ((r - high_threshhold) * s1)); } } std::chrono::duration BackoffThrottle::get(uint64_t c) { locker l(lock); auto delay = _get_delay(c); if (logger) { logger->inc(l_backoff_throttle_get); logger->inc(l_backoff_throttle_get_sum, c); } // fast path if (delay == std::chrono::duration(0) && waiters.empty() && ((max == 0) || (current == 0) || ((current + c) <= max))) { current += c; if (logger) { logger->set(l_backoff_throttle_val, current); } return std::chrono::duration(0); } auto ticket = _push_waiter(); utime_t wait_from = ceph_clock_now(); bool waited = false; while (waiters.begin() != ticket) { (*ticket)->wait(l); waited = true; } auto start = std::chrono::system_clock::now(); delay = _get_delay(c); while (true) { if (!((max == 0) || (current == 0) || (current + c) <= max)) { (*ticket)->wait(l); waited = true; } else if (delay > std::chrono::duration(0)) { (*ticket)->wait_for(l, delay); waited = true; } else { break; } assert(ticket == waiters.begin()); delay = _get_delay(c) - (std::chrono::system_clock::now() - start); } waiters.pop_front(); _kick_waiters(); current += c; if (logger) { logger->set(l_backoff_throttle_val, current); if (waited) { logger->tinc(l_backoff_throttle_wait, ceph_clock_now() - wait_from); } } return std::chrono::system_clock::now() - start; } uint64_t BackoffThrottle::put(uint64_t c) { locker l(lock); assert(current >= c); current -= c; _kick_waiters(); if (logger) { logger->inc(l_backoff_throttle_put); logger->inc(l_backoff_throttle_put_sum, c); logger->set(l_backoff_throttle_val, current); } return current; } uint64_t BackoffThrottle::take(uint64_t c) { locker l(lock); current += c; if (logger) { logger->inc(l_backoff_throttle_take); logger->inc(l_backoff_throttle_take_sum, c); logger->set(l_backoff_throttle_val, current); } return current; } uint64_t BackoffThrottle::get_current() { locker l(lock); return current; } uint64_t BackoffThrottle::get_max() { locker l(lock); return max; } SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent) : m_lock("SimpleThrottle"), m_max(max), m_current(0), m_ret(0), m_ignore_enoent(ignore_enoent) { } SimpleThrottle::~SimpleThrottle() { Mutex::Locker l(m_lock); assert(m_current == 0); assert(waiters == 0); } void SimpleThrottle::start_op() { Mutex::Locker l(m_lock); while (m_max == m_current) { waiters++; m_cond.Wait(m_lock); waiters--; } ++m_current; } void SimpleThrottle::end_op(int r) { Mutex::Locker l(m_lock); --m_current; if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent)) m_ret = r; m_cond.Signal(); } bool SimpleThrottle::pending_error() const { Mutex::Locker l(m_lock); return (m_ret < 0); } int SimpleThrottle::wait_for_ret() { Mutex::Locker l(m_lock); while (m_current > 0) { waiters++; m_cond.Wait(m_lock); waiters--; } return m_ret; } void C_OrderedThrottle::finish(int r) { m_ordered_throttle->finish_op(m_tid, r); } OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent) : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0), m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) { } OrderedThrottle::~OrderedThrottle() { Mutex::Locker locker(m_lock); assert(waiters == 0); } C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) { assert(on_finish != NULL); Mutex::Locker locker(m_lock); uint64_t tid = m_next_tid++; m_tid_result[tid] = Result(on_finish); C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid); complete_pending_ops(); while (m_max == m_current) { ++waiters; m_cond.Wait(m_lock); --waiters; complete_pending_ops(); } ++m_current; return ctx; } void OrderedThrottle::end_op(int r) { Mutex::Locker locker(m_lock); assert(m_current > 0); if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) { m_ret_val = r; } --m_current; m_cond.Signal(); } void OrderedThrottle::finish_op(uint64_t tid, int r) { Mutex::Locker locker(m_lock); TidResult::iterator it = m_tid_result.find(tid); assert(it != m_tid_result.end()); it->second.finished = true; it->second.ret_val = r; m_cond.Signal(); } bool OrderedThrottle::pending_error() const { Mutex::Locker locker(m_lock); return (m_ret_val < 0); } int OrderedThrottle::wait_for_ret() { Mutex::Locker locker(m_lock); complete_pending_ops(); while (m_current > 0) { ++waiters; m_cond.Wait(m_lock); --waiters; complete_pending_ops(); } return m_ret_val; } void OrderedThrottle::complete_pending_ops() { assert(m_lock.is_locked()); while (true) { TidResult::iterator it = m_tid_result.begin(); if (it == m_tid_result.end() || it->first != m_complete_tid || !it->second.finished) { break; } Result result = it->second; m_tid_result.erase(it); m_lock.Unlock(); result.on_finish->complete(result.ret_val); m_lock.Lock(); ++m_complete_tid; } }