X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fdmclock%2Fsrc%2Fdmclock_server.h;fp=src%2Fceph%2Fsrc%2Fdmclock%2Fsrc%2Fdmclock_server.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=2c9940dc6c17f8af7ff6c9b990951a7672ddb547;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git

diff --git a/src/ceph/src/dmclock/src/dmclock_server.h b/src/ceph/src/dmclock/src/dmclock_server.h
deleted file mode 100644
index 2c9940d..0000000
--- a/src/ceph/src/dmclock/src/dmclock_server.h
+++ /dev/null
@@ -1,1601 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-/*
- * Copyright (C) 2017 Red Hat Inc.
- */
-
-
-#pragma once
-
-/* COMPILATION OPTIONS
- *
- * By default we include an optimization over the originally published
- * dmclock algorithm, using not the values of rho and delta that were
- * sent in with a request but instead the most recent rho and delta
- * values from the request's client. To restore the algorithm's
- * original behavior, define DO_NOT_DELAY_TAG_CALC (i.e., compiler
- * argument -DDO_NOT_DELAY_TAG_CALC).
- *
- * The prop_heap does not seem to be necessary. The only thing it
- * would help with is quickly finding the minimum proportion/priority
- * when an idle client became active. To have the code maintain the
- * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
- * -DUSE_PROP_HEAP).
- */
-
-#include <assert.h>
-
-#include <cmath>
-#include <memory>
-#include <map>
-#include <deque>
-#include <queue>
-#include <atomic>
-#include <mutex>
-#include <condition_variable>
-#include <thread>
-#include <iostream>
-#include <sstream>
-#include <limits>
-
-#include <boost/variant.hpp>
-
-#include "indirect_intrusive_heap.h"
-#include "run_every.h"
-#include "dmclock_util.h"
-#include "dmclock_recs.h"
-
-#ifdef PROFILE
-#include "profile.h"
-#endif
-
-
-namespace crimson {
-
-  namespace dmclock {
-
-    namespace c = crimson;
-
-    constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
-      std::numeric_limits<double>::infinity() :
-      std::numeric_limits<double>::max();
-    constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
-      -std::numeric_limits<double>::infinity() :
-      std::numeric_limits<double>::lowest();
-    constexpr uint tag_modulo = 1000000;
-
-    struct ClientInfo {
-      const double reservation;  // minimum
-      const double weight;       // proportional
-      const double limit;        // maximum
-
-      // multiplicative inverses of above, which we use in calculations
-      // and don't want to recalculate repeatedly
-      const double reservation_inv;
-      const double weight_inv;
-      const double limit_inv;
-
-      // order parameters -- min, "normal", max
-      ClientInfo(double _reservation, double _weight, double _limit) :
-        reservation(_reservation),
-        weight(_weight),
-        limit(_limit),
-        reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
-        weight_inv(     0.0 == weight ? 0.0 : 1.0 / weight),
-        limit_inv(      0.0 == limit ?
0.0 : 1.0 / limit) - { - // empty - } - - - friend std::ostream& operator<<(std::ostream& out, - const ClientInfo& client) { - out << - "{ ClientInfo:: r:" << client.reservation << - " w:" << std::fixed << client.weight << - " l:" << std::fixed << client.limit << - " 1/r:" << std::fixed << client.reservation_inv << - " 1/w:" << std::fixed << client.weight_inv << - " 1/l:" << std::fixed << client.limit_inv << - " }"; - return out; - } - }; // class ClientInfo - - - struct RequestTag { - double reservation; - double proportion; - double limit; - bool ready; // true when within limit -#ifndef DO_NOT_DELAY_TAG_CALC - Time arrival; -#endif - - RequestTag(const RequestTag& prev_tag, - const ClientInfo& client, - const uint32_t delta, - const uint32_t rho, - const Time time, - const double cost = 0.0) : - reservation(cost + tag_calc(time, - prev_tag.reservation, - client.reservation_inv, - rho, - true)), - proportion(tag_calc(time, - prev_tag.proportion, - client.weight_inv, - delta, - true)), - limit(tag_calc(time, - prev_tag.limit, - client.limit_inv, - delta, - false)), - ready(false) -#ifndef DO_NOT_DELAY_TAG_CALC - , arrival(time) -#endif - { - assert(reservation < max_tag || proportion < max_tag); - } - - RequestTag(const RequestTag& prev_tag, - const ClientInfo& client, - const ReqParams req_params, - const Time time, - const double cost = 0.0) : - RequestTag(prev_tag, client, req_params.delta, req_params.rho, time, cost) - { /* empty */ } - - RequestTag(double _res, double _prop, double _lim, const Time _arrival) : - reservation(_res), - proportion(_prop), - limit(_lim), - ready(false) -#ifndef DO_NOT_DELAY_TAG_CALC - , arrival(_arrival) -#endif - { - assert(reservation < max_tag || proportion < max_tag); - } - - RequestTag(const RequestTag& other) : - reservation(other.reservation), - proportion(other.proportion), - limit(other.limit), - ready(other.ready) -#ifndef DO_NOT_DELAY_TAG_CALC - , arrival(other.arrival) -#endif - { - // empty - } - - static std::string format_tag_change(double before, double after) { - if (before == after) { - return std::string("same"); - } else { - std::stringstream ss; - ss << format_tag(before) << "=>" << format_tag(after); - return ss.str(); - } - } - - static std::string format_tag(double value) { - if (max_tag == value) { - return std::string("max"); - } else if (min_tag == value) { - return std::string("min"); - } else { - return format_time(value, tag_modulo); - } - } - - private: - - static double tag_calc(const Time time, - double prev, - double increment, - uint32_t dist_req_val, - bool extreme_is_high) { - if (0.0 == increment) { - return extreme_is_high ? max_tag : min_tag; - } else { - if (0 != dist_req_val) { - increment *= dist_req_val; - } - return std::max(time, prev + increment); - } - } - - friend std::ostream& operator<<(std::ostream& out, - const RequestTag& tag) { - out << - "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") << - " r:" << format_tag(tag.reservation) << - " p:" << format_tag(tag.proportion) << - " l:" << format_tag(tag.limit) << -#if 0 // try to resolve this to make sure Time is operator<<'able. 
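
The tag_calc() logic above is the core of the dmClock tagging rule: each of a request's three tags is the later of the current time and the client's previous tag advanced by a per-request increment, where the increment is the reciprocal of the reservation (scaled by rho) for the reservation tag, and the reciprocals of the weight and limit (scaled by delta) for the other two. The short standalone sketch below replays that arithmetic with invented client parameters; only the max(now, prev + increment * count) rule is taken from the header.

#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  // hypothetical client: reservation 100 IOPS, weight 2, limit 500 IOPS
  const double reservation_inv = 1.0 / 100.0;
  const double weight_inv      = 1.0 / 2.0;
  const double limit_inv       = 1.0 / 500.0;

  // previous tags, all equal to the time of the client's last request
  double r_tag = 1000.0, p_tag = 1000.0, l_tag = 1000.0;
  double now = 1000.0;   // arrival time of the new request (seconds)

  // rho = reservation-phase responses and delta = all responses this client
  // observed from other servers since its last request (the distributed
  // correction terms carried in ReqParams)
  uint32_t rho = 3, delta = 5;

  r_tag = std::max(now, r_tag + reservation_inv * rho);   // 1000.03
  p_tag = std::max(now, p_tag + weight_inv * delta);      // 1002.5
  l_tag = std::max(now, l_tag + limit_inv * delta);       // 1000.01
  // a zero reservation, weight, or limit would instead pin the tag to
  // max_tag or min_tag, exactly as tag_calc() does when increment == 0

  std::cout << "r=" << r_tag << " p=" << p_tag << " l=" << l_tag << '\n';
  return 0;
}
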
-#ifndef DO_NOT_DELAY_TAG_CALC - " arrival:" << tag.arrival << -#endif -#endif - " }"; - return out; - } - }; // class RequestTag - - - // C is client identifier type, R is request type, B is heap - // branching factor - template - class PriorityQueueBase { - // we don't want to include gtest.h just for FRIEND_TEST - friend class dmclock_server_client_idle_erase_Test; - - public: - - using RequestRef = std::unique_ptr; - - protected: - - using TimePoint = decltype(std::chrono::steady_clock::now()); - using Duration = std::chrono::milliseconds; - using MarkPoint = std::pair; - - enum class ReadyOption {ignore, lowers, raises}; - - // forward decl for friend decls - template - struct ClientCompare; - - class ClientReq { - friend PriorityQueueBase; - - RequestTag tag; - C client_id; - RequestRef request; - - public: - - ClientReq(const RequestTag& _tag, - const C& _client_id, - RequestRef&& _request) : - tag(_tag), - client_id(_client_id), - request(std::move(_request)) - { - // empty - } - - friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) { - out << "{ ClientReq:: tag:" << c.tag << " client:" << - c.client_id << " }"; - return out; - } - }; // class ClientReq - - public: - - // NOTE: ClientRec is in the "public" section for compatibility - // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1 - // ClientRec could be "protected" with no issue. [See comments - // associated with function submit_top_request.] - class ClientRec { - friend PriorityQueueBase; - - C client; - RequestTag prev_tag; - std::deque requests; - - // amount added from the proportion tag as a result of - // an idle client becoming unidle - double prop_delta = 0.0; - - c::IndIntruHeapData reserv_heap_data; - c::IndIntruHeapData lim_heap_data; - c::IndIntruHeapData ready_heap_data; -#if USE_PROP_HEAP - c::IndIntruHeapData prop_heap_data; -#endif - - public: - - ClientInfo info; - bool idle; - Counter last_tick; - uint32_t cur_rho; - uint32_t cur_delta; - - ClientRec(C _client, - const ClientInfo& _info, - Counter current_tick) : - client(_client), - prev_tag(0.0, 0.0, 0.0, TimeZero), - info(_info), - idle(true), - last_tick(current_tick), - cur_rho(1), - cur_delta(1) - { - // empty - } - - inline const RequestTag& get_req_tag() const { - return prev_tag; - } - - static inline void assign_unpinned_tag(double& lhs, const double rhs) { - if (rhs != max_tag && rhs != min_tag) { - lhs = rhs; - } - } - - inline void update_req_tag(const RequestTag& _prev, - const Counter& _tick) { - assign_unpinned_tag(prev_tag.reservation, _prev.reservation); - assign_unpinned_tag(prev_tag.limit, _prev.limit); - assign_unpinned_tag(prev_tag.proportion, _prev.proportion); - last_tick = _tick; - } - - inline void add_request(const RequestTag& tag, - const C& client_id, - RequestRef&& request) { - requests.emplace_back(ClientReq(tag, client_id, std::move(request))); - } - - inline const ClientReq& next_request() const { - return requests.front(); - } - - inline ClientReq& next_request() { - return requests.front(); - } - - inline void pop_request() { - requests.pop_front(); - } - - inline bool has_request() const { - return !requests.empty(); - } - - inline size_t request_count() const { - return requests.size(); - } - - // NB: because a deque is the underlying structure, this - // operation might be expensive - bool remove_by_req_filter_fw(std::function filter_accum) { - bool any_removed = false; - for (auto i = requests.begin(); - i != requests.end(); - /* no inc */) { - if (filter_accum(std::move(*i->request))) { - 
any_removed = true; - i = requests.erase(i); - } else { - ++i; - } - } - return any_removed; - } - - // NB: because a deque is the underlying structure, this - // operation might be expensive - bool remove_by_req_filter_bw(std::function filter_accum) { - bool any_removed = false; - for (auto i = requests.rbegin(); - i != requests.rend(); - /* no inc */) { - if (filter_accum(std::move(*i->request))) { - any_removed = true; - i = decltype(i){ requests.erase(std::next(i).base()) }; - } else { - ++i; - } - } - return any_removed; - } - - inline bool - remove_by_req_filter(std::function filter_accum, - bool visit_backwards) { - if (visit_backwards) { - return remove_by_req_filter_bw(filter_accum); - } else { - return remove_by_req_filter_fw(filter_accum); - } - } - - friend std::ostream& - operator<<(std::ostream& out, - const typename PriorityQueueBase::ClientRec& e) { - out << "{ ClientRec::" << - " client:" << e.client << - " prev_tag:" << e.prev_tag << - " req_count:" << e.requests.size() << - " top_req:"; - if (e.has_request()) { - out << e.next_request(); - } else { - out << "none"; - } - out << " }"; - - return out; - } - }; // class ClientRec - - using ClientRecRef = std::shared_ptr; - - // when we try to get the next request, we'll be in one of three - // situations -- we'll have one to return, have one that can - // fire in the future, or not have any - enum class NextReqType { returning, future, none }; - - // specifies which queue next request will get popped from - enum class HeapId { reservation, ready }; - - // this is returned from next_req to tell the caller the situation - struct NextReq { - NextReqType type; - union { - HeapId heap_id; - Time when_ready; - }; - }; - - - // a function that can be called to look up client information - using ClientInfoFunc = std::function; - - - bool empty() const { - DataGuard g(data_mtx); - return (resv_heap.empty() || ! 
resv_heap.top().has_request()); - } - - - size_t client_count() const { - DataGuard g(data_mtx); - return resv_heap.size(); - } - - - size_t request_count() const { - DataGuard g(data_mtx); - size_t total = 0; - for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) { - total += i->request_count(); - } - return total; - } - - - bool remove_by_req_filter(std::function filter_accum, - bool visit_backwards = false) { - bool any_removed = false; - DataGuard g(data_mtx); - for (auto i : client_map) { - bool modified = - i.second->remove_by_req_filter(filter_accum, visit_backwards); - if (modified) { - resv_heap.adjust(*i.second); - limit_heap.adjust(*i.second); - ready_heap.adjust(*i.second); -#if USE_PROP_HEAP - prop_heap.adjust(*i.second); -#endif - any_removed = true; - } - } - return any_removed; - } - - - // use as a default value when no accumulator is provide - static void request_sink(R&& req) { - // do nothing - } - - - void remove_by_client(const C& client, - bool reverse = false, - std::function accum = request_sink) { - DataGuard g(data_mtx); - - auto i = client_map.find(client); - - if (i == client_map.end()) return; - - if (reverse) { - for (auto j = i->second->requests.rbegin(); - j != i->second->requests.rend(); - ++j) { - accum(std::move(*j->request)); - } - } else { - for (auto j = i->second->requests.begin(); - j != i->second->requests.end(); - ++j) { - accum(std::move(*j->request)); - } - } - - i->second->requests.clear(); - - resv_heap.adjust(*i->second); - limit_heap.adjust(*i->second); - ready_heap.adjust(*i->second); -#if USE_PROP_HEAP - prop_heap.adjust(*i->second); -#endif - } - - - uint get_heap_branching_factor() const { - return B; - } - - - friend std::ostream& operator<<(std::ostream& out, - const PriorityQueueBase& q) { - std::lock_guard guard(q.data_mtx); - - out << "{ PriorityQueue::"; - for (const auto& c : q.client_map) { - out << " { client:" << c.first << ", record:" << *c.second << - " }"; - } - if (!q.resv_heap.empty()) { - const auto& resv = q.resv_heap.top(); - out << " { reservation_top:" << resv << " }"; - const auto& ready = q.ready_heap.top(); - out << " { ready_top:" << ready << " }"; - const auto& limit = q.limit_heap.top(); - out << " { limit_top:" << limit << " }"; - } else { - out << " HEAPS-EMPTY"; - } - out << " }"; - - return out; - } - - // for debugging - void display_queues(std::ostream& out, - bool show_res = true, - bool show_lim = true, - bool show_ready = true, - bool show_prop = true) const { - auto filter = [](const ClientRec& e)->bool { return true; }; - DataGuard g(data_mtx); - if (show_res) { - resv_heap.display_sorted(out << "RESER:", filter); - } - if (show_lim) { - limit_heap.display_sorted(out << "LIMIT:", filter); - } - if (show_ready) { - ready_heap.display_sorted(out << "READY:", filter); - } -#if USE_PROP_HEAP - if (show_prop) { - prop_heap.display_sorted(out << "PROPO:", filter); - } -#endif - } // display_queues - - - protected: - - // The ClientCompare functor is essentially doing a precedes? - // operator, returning true if and only if the first parameter - // must precede the second parameter. If the second must precede - // the first, or if they are equivalent, false should be - // returned. The reason for this behavior is that it will be - // called to test if two items are out of order and if true is - // returned it will reverse the items. Therefore false is the - // default return when it doesn't matter to prevent unnecessary - // re-ordering. 
- // - // The template is supporting variations in sorting based on the - // heap in question and allowing these variations to be handled - // at compile-time. - // - // tag_field determines which tag is being used for comparison - // - // ready_opt determines how the ready flag influences the sort - // - // use_prop_delta determines whether the proportional delta is - // added in for comparison - template - struct ClientCompare { - bool operator()(const ClientRec& n1, const ClientRec& n2) const { - if (n1.has_request()) { - if (n2.has_request()) { - const auto& t1 = n1.next_request().tag; - const auto& t2 = n2.next_request().tag; - if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) { - // if we don't care about ready or the ready values are the same - if (use_prop_delta) { - return (t1.*tag_field + n1.prop_delta) < - (t2.*tag_field + n2.prop_delta); - } else { - return t1.*tag_field < t2.*tag_field; - } - } else if (ReadyOption::raises == ready_opt) { - // use_ready == true && the ready fields are different - return t1.ready; - } else { - return t2.ready; - } - } else { - // n1 has request but n2 does not - return true; - } - } else if (n2.has_request()) { - // n2 has request but n1 does not - return false; - } else { - // both have none; keep stable w false - return false; - } - } - }; - - ClientInfoFunc client_info_f; - - mutable std::mutex data_mtx; - using DataGuard = std::lock_guard; - - // stable mapping between client ids and client queues - std::map client_map; - - c::IndIntruHeap, - B> resv_heap; -#if USE_PROP_HEAP - c::IndIntruHeap, - B> prop_heap; -#endif - c::IndIntruHeap, - B> limit_heap; - c::IndIntruHeap, - B> ready_heap; - - // if all reservations are met and all other requestes are under - // limit, this will allow the request next in terms of - // proportion to still get issued - bool allow_limit_break; - - std::atomic_bool finishing; - - // every request creates a tick - Counter tick = 0; - - // performance data collection - size_t reserv_sched_count = 0; - size_t prop_sched_count = 0; - size_t limit_break_sched_count = 0; - - Duration idle_age; - Duration erase_age; - Duration check_time; - std::deque clean_mark_points; - - // NB: All threads declared at end, so they're destructed first! 
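
ClientCompare above shows how one comparator template serves all of the heaps: the tag to sort on is chosen at compile time through a pointer-to-member non-type template parameter, while ready_opt and use_prop_delta adjust the ordering for the ready and proportional heaps. The reduced standalone sketch below demonstrates just the pointer-to-member selection; the Tag struct and values are invented, and only the idiom mirrors the header.

#include <algorithm>
#include <iostream>
#include <vector>

// invented stand-in for RequestTag, holding only the three tag fields
struct Tag {
  double reservation;
  double proportion;
  double limit;
};

// one comparator template serves every ordering; the field to compare is
// fixed when the comparator type is instantiated, as ClientCompare does
template <double Tag::*field>
struct TagCompare {
  bool operator()(const Tag& a, const Tag& b) const {
    return a.*field < b.*field;   // true iff a must precede b
  }
};

int main() {
  std::vector<Tag> tags = { {3.0, 9.0, 1.0}, {1.0, 7.0, 5.0}, {2.0, 8.0, 3.0} };

  auto by_resv = *std::min_element(tags.begin(), tags.end(),
                                   TagCompare<&Tag::reservation>{});
  auto by_prop = *std::min_element(tags.begin(), tags.end(),
                                   TagCompare<&Tag::proportion>{});

  std::cout << "lowest reservation tag: " << by_resv.reservation << '\n'
            << "lowest proportion tag:  " << by_prop.proportion  << '\n';
  return 0;
}
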
- - std::unique_ptr cleaning_job; - - - // COMMON constructor that others feed into; we can accept three - // different variations of durations - template - PriorityQueueBase(ClientInfoFunc _client_info_f, - std::chrono::duration _idle_age, - std::chrono::duration _erase_age, - std::chrono::duration _check_time, - bool _allow_limit_break) : - client_info_f(_client_info_f), - allow_limit_break(_allow_limit_break), - finishing(false), - idle_age(std::chrono::duration_cast(_idle_age)), - erase_age(std::chrono::duration_cast(_erase_age)), - check_time(std::chrono::duration_cast(_check_time)) - { - assert(_erase_age >= _idle_age); - assert(_check_time < _idle_age); - cleaning_job = - std::unique_ptr( - new RunEvery(check_time, - std::bind(&PriorityQueueBase::do_clean, this))); - } - - - ~PriorityQueueBase() { - finishing = true; - } - - - // data_mtx must be held by caller - void do_add_request(RequestRef&& request, - const C& client_id, - const ReqParams& req_params, - const Time time, - const double cost = 0.0) { - ++tick; - - // this pointer will help us create a reference to a shared - // pointer, no matter which of two codepaths we take - ClientRec* temp_client; - - auto client_it = client_map.find(client_id); - if (client_map.end() != client_it) { - temp_client = &(*client_it->second); // address of obj of shared_ptr - } else { - ClientInfo info = client_info_f(client_id); - ClientRecRef client_rec = - std::make_shared(client_id, info, tick); - resv_heap.push(client_rec); -#if USE_PROP_HEAP - prop_heap.push(client_rec); -#endif - limit_heap.push(client_rec); - ready_heap.push(client_rec); - client_map[client_id] = client_rec; - temp_client = &(*client_rec); // address of obj of shared_ptr - } - - // for convenience, we'll create a reference to the shared pointer - ClientRec& client = *temp_client; - - if (client.idle) { - // We need to do an adjustment so that idle clients compete - // fairly on proportional tags since those tags may have - // drifted from real-time. Either use the lowest existing - // proportion tag -- O(1) -- or the client with the lowest - // previous proportion tag -- O(n) where n = # clients. - // - // So we don't have to maintain a propotional queue that - // keeps the minimum on proportional tag alone (we're - // instead using a ready queue), we'll have to check each - // client. - // - // The alternative would be to maintain a proportional queue - // (define USE_PROP_TAG) and do an O(1) operation here. - - // Was unable to confirm whether equality testing on - // std::numeric_limits::max() is guaranteed, so - // we'll use a compile-time calculated trigger that is one - // third the max, which should be much larger than any - // expected organic value. 
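
The comment just above describes how a client coming out of the idle state is folded back into proportional competition: rather than letting its stale proportion tag (or a tag computed from wall-clock time) undercut clients whose tags have drifted well ahead of real time, the queue records a prop_delta so that the waking client's effective proportion starts at the lowest proportion tag held by an active client. The standalone sketch below walks through that adjustment with invented numbers; the variable names are hypothetical.

#include <algorithm>
#include <iostream>

int main() {
  double now = 5000.0;                 // arrival time of the waking client's request
  double lowest_active_prop = 5042.7;  // smallest proportion tag among non-idle
                                       // clients, found by a scan like the loop
                                       // in do_add_request()

  // recorded on the waking client; added to its proportion tag whenever the
  // ready heap compares it against other clients (use_prop_delta == true)
  double prop_delta = lowest_active_prop - now;          // 42.7

  // its first new proportion tag is computed from "now" as usual...
  double stale_prev_prop = 3100.0;                        // drifted while idle
  double new_prop_tag = std::max(now, stale_prev_prop);   // 5000.0

  // ...and the delta lifts it up to the lowest active proportion tag, so the
  // waking client competes fairly instead of starving the active clients
  double effective = new_prop_tag + prop_delta;           // 5042.7

  std::cout << "prop_delta=" << prop_delta
            << " effective proportion tag=" << effective << '\n';
  return 0;
}
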
- constexpr double lowest_prop_tag_trigger = - std::numeric_limits::max() / 3.0; - - double lowest_prop_tag = std::numeric_limits::max(); - for (auto const &c : client_map) { - // don't use ourselves (or anything else that might be - // listed as idle) since we're now in the map - if (!c.second->idle) { - double p; - // use either lowest proportion tag or previous proportion tag - if (c.second->has_request()) { - p = c.second->next_request().tag.proportion + - c.second->prop_delta; - } else { - p = c.second->get_req_tag().proportion + c.second->prop_delta; - } - - if (p < lowest_prop_tag) { - lowest_prop_tag = p; - } - } - } - - // if this conditional does not fire, it - if (lowest_prop_tag < lowest_prop_tag_trigger) { - client.prop_delta = lowest_prop_tag - time; - } - client.idle = false; - } // if this client was idle - -#ifndef DO_NOT_DELAY_TAG_CALC - RequestTag tag(0, 0, 0, time); - - if (!client.has_request()) { - tag = RequestTag(client.get_req_tag(), - client.info, - req_params, - time, - cost); - - // copy tag to previous tag for client - client.update_req_tag(tag, tick); - } -#else - RequestTag tag(client.get_req_tag(), client.info, req_params, time, cost); - // copy tag to previous tag for client - client.update_req_tag(tag, tick); -#endif - - client.add_request(tag, client.client, std::move(request)); - if (1 == client.requests.size()) { - // NB: can the following 4 calls to adjust be changed - // promote? Can adding a request ever demote a client in the - // heaps? - resv_heap.adjust(client); - limit_heap.adjust(client); - ready_heap.adjust(client); -#if USE_PROP_HEAP - prop_heap.adjust(client); -#endif - } - - client.cur_rho = req_params.rho; - client.cur_delta = req_params.delta; - - resv_heap.adjust(client); - limit_heap.adjust(client); - ready_heap.adjust(client); -#if USE_PROP_HEAP - prop_heap.adjust(client); -#endif - } // add_request - - - // data_mtx should be held when called; top of heap should have - // a ready request - template - void pop_process_request(IndIntruHeap& heap, - std::function process) { - // gain access to data - ClientRec& top = heap.top(); - - RequestRef request = std::move(top.next_request().request); -#ifndef DO_NOT_DELAY_TAG_CALC - RequestTag tag = top.next_request().tag; -#endif - - // pop request and adjust heaps - top.pop_request(); - -#ifndef DO_NOT_DELAY_TAG_CALC - if (top.has_request()) { - ClientReq& next_first = top.next_request(); - next_first.tag = RequestTag(tag, top.info, - top.cur_delta, top.cur_rho, - next_first.tag.arrival); - - // copy tag to previous tag for client - top.update_req_tag(next_first.tag, tick); - } -#endif - - resv_heap.demote(top); - limit_heap.adjust(top); -#if USE_PROP_HEAP - prop_heap.demote(top); -#endif - ready_heap.demote(top); - - // process - process(top.client, request); - } // pop_process_request - - - // data_mtx should be held when called - void reduce_reservation_tags(ClientRec& client) { - for (auto& r : client.requests) { - r.tag.reservation -= client.info.reservation_inv; - -#ifndef DO_NOT_DELAY_TAG_CALC - // reduce only for front tag. 
because next tags' value are invalid - break; -#endif - } - // don't forget to update previous tag - client.prev_tag.reservation -= client.info.reservation_inv; - resv_heap.promote(client); - } - - - // data_mtx should be held when called - void reduce_reservation_tags(const C& client_id) { - auto client_it = client_map.find(client_id); - - // means the client was cleaned from map; should never happen - // as long as cleaning times are long enough - assert(client_map.end() != client_it); - reduce_reservation_tags(*client_it->second); - } - - - // data_mtx should be held when called - NextReq do_next_request(Time now) { - NextReq result; - - // if reservation queue is empty, all are empty (i.e., no active clients) - if(resv_heap.empty()) { - result.type = NextReqType::none; - return result; - } - - // try constraint (reservation) based scheduling - - auto& reserv = resv_heap.top(); - if (reserv.has_request() && - reserv.next_request().tag.reservation <= now) { - result.type = NextReqType::returning; - result.heap_id = HeapId::reservation; - return result; - } - - // no existing reservations before now, so try weight-based - // scheduling - - // all items that are within limit are eligible based on - // priority - auto limits = &limit_heap.top(); - while (limits->has_request() && - !limits->next_request().tag.ready && - limits->next_request().tag.limit <= now) { - limits->next_request().tag.ready = true; - ready_heap.promote(*limits); - limit_heap.demote(*limits); - - limits = &limit_heap.top(); - } - - auto& readys = ready_heap.top(); - if (readys.has_request() && - readys.next_request().tag.ready && - readys.next_request().tag.proportion < max_tag) { - result.type = NextReqType::returning; - result.heap_id = HeapId::ready; - return result; - } - - // if nothing is schedulable by reservation or - // proportion/weight, and if we allow limit break, try to - // schedule something with the lowest proportion tag or - // alternatively lowest reservation tag. - if (allow_limit_break) { - if (readys.has_request() && - readys.next_request().tag.proportion < max_tag) { - result.type = NextReqType::returning; - result.heap_id = HeapId::ready; - return result; - } else if (reserv.has_request() && - reserv.next_request().tag.reservation < max_tag) { - result.type = NextReqType::returning; - result.heap_id = HeapId::reservation; - return result; - } - } - - // nothing scheduled; make sure we re-run when next - // reservation item or next limited item comes up - - Time next_call = TimeMax; - if (resv_heap.top().has_request()) { - next_call = - min_not_0_time(next_call, - resv_heap.top().next_request().tag.reservation); - } - if (limit_heap.top().has_request()) { - const auto& next = limit_heap.top().next_request(); - assert(!next.tag.ready || max_tag == next.tag.proportion); - next_call = min_not_0_time(next_call, next.tag.limit); - } - if (next_call < TimeMax) { - result.type = NextReqType::future; - result.when_ready = next_call; - return result; - } else { - result.type = NextReqType::none; - return result; - } - } // do_next_request - - - // if possible is not zero and less than current then return it; - // otherwise return current; the idea is we're trying to find - // the minimal time but ignoring zero - static inline const Time& min_not_0_time(const Time& current, - const Time& possible) { - return TimeZero == possible ? current : std::min(current, possible); - } - - - /* - * This is being called regularly by RunEvery. 
Every time it's - * called it notes the time and delta counter (mark point) in a - * deque. It also looks at the deque to find the most recent - * mark point that is older than clean_age. It then walks the - * map and delete all server entries that were last used before - * that mark point. - */ - void do_clean() { - TimePoint now = std::chrono::steady_clock::now(); - DataGuard g(data_mtx); - clean_mark_points.emplace_back(MarkPoint(now, tick)); - - // first erase the super-old client records - - Counter erase_point = 0; - auto point = clean_mark_points.front(); - while (point.first <= now - erase_age) { - erase_point = point.second; - clean_mark_points.pop_front(); - point = clean_mark_points.front(); - } - - Counter idle_point = 0; - for (auto i : clean_mark_points) { - if (i.first <= now - idle_age) { - idle_point = i.second; - } else { - break; - } - } - - if (erase_point > 0 || idle_point > 0) { - for (auto i = client_map.begin(); i != client_map.end(); /* empty */) { - auto i2 = i++; - if (erase_point && i2->second->last_tick <= erase_point) { - delete_from_heaps(i2->second); - client_map.erase(i2); - } else if (idle_point && i2->second->last_tick <= idle_point) { - i2->second->idle = true; - } - } // for - } // if - } // do_clean - - - // data_mtx must be held by caller - template - void delete_from_heap(ClientRecRef& client, - c::IndIntruHeap& heap) { - auto i = heap.rfind(client); - heap.remove(i); - } - - - // data_mtx must be held by caller - void delete_from_heaps(ClientRecRef& client) { - delete_from_heap(client, resv_heap); -#if USE_PROP_HEAP - delete_from_heap(client, prop_heap); -#endif - delete_from_heap(client, limit_heap); - delete_from_heap(client, ready_heap); - } - }; // class PriorityQueueBase - - - template - class PullPriorityQueue : public PriorityQueueBase { - using super = PriorityQueueBase; - - public: - - // When a request is pulled, this is the return type. - struct PullReq { - struct Retn { - C client; - typename super::RequestRef request; - PhaseType phase; - }; - - typename super::NextReqType type; - boost::variant data; - - bool is_none() const { return type == super::NextReqType::none; } - - bool is_retn() const { return type == super::NextReqType::returning; } - Retn& get_retn() { - return boost::get(data); - } - - bool is_future() const { return type == super::NextReqType::future; } - Time getTime() const { return boost::get
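
The PullReq variant above is what a pulling server loops on: none means nothing is queued, future carries the time at which the next request becomes eligible, and returning carries the client, the request, and the dmClock phase it was scheduled under. The usage sketch below is hedged: the PullPriorityQueue constructor, add_request(), and pull_request() calls assumed here come from the remainder of dmclock_server.h (not shown in this excerpt), and ClientId, IoRequest, and all numbers are invented for illustration.

#include <iostream>
#include <string>
#include "dmclock_server.h"

namespace dmc = crimson::dmclock;

using ClientId = int;
struct IoRequest { std::string op; };

int main() {
  // reservation / weight / limit; a real server would look these up per client
  dmc::ClientInfo info(100.0, 2.0, 500.0);
  auto client_info_f = [&](const ClientId&) -> dmc::ClientInfo { return info; };

  using Queue = dmc::PullPriorityQueue<ClientId, IoRequest>;
  Queue pq(client_info_f, false /* allow_limit_break */);

  // delta/rho would normally be echoed back by clients in ReqParams
  pq.add_request(IoRequest{"read"}, 42, dmc::ReqParams(1, 1));

  auto pr = pq.pull_request();
  if (pr.is_retn()) {
    auto& retn = pr.get_retn();
    std::cout << "serving client " << retn.client
              << " op " << retn.request->op << " in phase "
              << (retn.phase == dmc::PhaseType::reservation ? "reservation"
                                                            : "priority")
              << '\n';
  } else if (pr.is_future()) {
    std::cout << "next request eligible at " << pr.getTime() << '\n';
  } else {
    std::cout << "queue empty\n";
  }

  // drain anything still queued for client 42 via the accumulator form of
  // remove_by_client() shown earlier in the header
  pq.remove_by_client(42, false,
                      [](IoRequest&& r) { std::cout << "dropped " << r.op << '\n'; });
  return 0;
}
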