1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Copyright (C) 2017 Red Hat Inc.
11 /* COMPILATION OPTIONS
13 * By default we include an optimization over the originally published
14 * dmclock algorithm using not the values of rho and delta that were
15 * sent in with a request but instead the most recent rho and delta
16 * values from the requests's client. To restore the algorithm's
17 * original behavior, define DO_NOT_DELAY_TAG_CALC (i.e., compiler
18 * argument -DDO_NOT_DELAY_TAG_CALC).
20 * The prop_heap does not seem to be necessary. The only thing it
21 * would help with is quickly finding the mininum proportion/prioity
22 * when an idle client became active. To have the code maintain the
23 * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
36 #include <condition_variable>
42 #include <boost/variant.hpp>
44 #include "indirect_intrusive_heap.h"
45 #include "run_every.h"
46 #include "dmclock_util.h"
47 #include "dmclock_recs.h"
58 namespace c = crimson;
// Sentinel tag values: on IEEE-754 platforms use +/-infinity,
// otherwise fall back to the extreme finite doubles.  A tag pinned
// at max_tag/min_tag means "no constraint in this dimension" (see
// tag_calc, which emits these when the rate parameter is zero).
60 constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
61 std::numeric_limits<double>::infinity() :
62 std::numeric_limits<double>::max();
63 constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
64 -std::numeric_limits<double>::infinity() :
65 std::numeric_limits<double>::lowest();
// Modulus handed to format_time when displaying tags, so only the
// low-order portion of the (large) time value is printed.
66 constexpr uint tag_modulo = 1000000;
// Per-client QoS parameters of the dmClock algorithm: a minimum
// service rate (reservation), a proportional share (weight), and a
// maximum service rate (limit).  Immutable once constructed.
// NOTE(review): this listing is gapped -- e.g. the init-list entries
// for weight/limit (orig lines 82-83) are not visible here.
69 const double reservation; // minimum
70 const double weight; // proportional
71 const double limit; // maximum
73 // multiplicative inverses of above, which we use in calculations
74 // and don't want to recalculate repeatedly
75 const double reservation_inv;
76 const double weight_inv;
77 const double limit_inv;
79 // order parameters -- min, "normal", max
80 ClientInfo(double _reservation, double _weight, double _limit) :
81 reservation(_reservation),
// A parameter of 0.0 yields an inverse of 0.0 rather than a division
// by zero; tag_calc treats a 0.0 increment as "unconstrained".
84 reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
85 weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight),
86 limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit)
// Debug/logging output of all six stored values.
92 friend std::ostream& operator<<(std::ostream& out,
93 const ClientInfo& client) {
95 "{ ClientInfo:: r:" << client.reservation <<
96 " w:" << std::fixed << client.weight <<
97 " l:" << std::fixed << client.limit <<
98 " 1/r:" << std::fixed << client.reservation_inv <<
99 " 1/w:" << std::fixed << client.weight_inv <<
100 " 1/l:" << std::fixed << client.limit_inv <<
104 }; // class ClientInfo
// The three dmClock scheduling tags (reservation, proportion, limit)
// attached to a single request, plus a ready flag maintained by the
// scheduler.  When DO_NOT_DELAY_TAG_CALC is NOT defined the tag also
// records the request's arrival time so the tag can be (re)computed
// later from the client's most recent rho/delta values.
// NOTE(review): listing is gapped; several constructor lines
// (e.g. orig 119-120, 125-137, 151-161) are not visible here.
111 bool ready; // true when within limit
112 #ifndef DO_NOT_DELAY_TAG_CALC
// Chain a new tag off the client's previous tag, using explicit
// delta/rho distributed-counter values.
116 RequestTag(const RequestTag& prev_tag,
117 const ClientInfo& client,
118 const uint32_t delta,
121 const double cost = 0.0) :
122 reservation(cost + tag_calc(time,
123 prev_tag.reservation,
124 client.reservation_inv,
127 proportion(tag_calc(time,
138 #ifndef DO_NOT_DELAY_TAG_CALC
// At least one of reservation/proportion must be constrained,
// otherwise the request could never be scheduled.
142 assert(reservation < max_tag || proportion < max_tag);
// Convenience overload taking delta/rho bundled in ReqParams.
145 RequestTag(const RequestTag& prev_tag,
146 const ClientInfo& client,
147 const ReqParams req_params,
149 const double cost = 0.0) :
150 RequestTag(prev_tag, client, req_params.delta, req_params.rho, time, cost)
// Construct directly from explicit tag values (used e.g. for the
// zeroed previous tag of a brand-new client).
153 RequestTag(double _res, double _prop, double _lim, const Time _arrival) :
158 #ifndef DO_NOT_DELAY_TAG_CALC
162 assert(reservation < max_tag || proportion < max_tag);
// Copy constructor; arrival is only carried in delayed-tag builds.
165 RequestTag(const RequestTag& other) :
166 reservation(other.reservation),
167 proportion(other.proportion),
170 #ifndef DO_NOT_DELAY_TAG_CALC
171 , arrival(other.arrival)
// Render "same" or "before=>after" for logging tag adjustments.
177 static std::string format_tag_change(double before, double after) {
178 if (before == after) {
179 return std::string("same");
181 std::stringstream ss;
182 ss << format_tag(before) << "=>" << format_tag(after);
// Render a single tag, mapping the sentinels to "max"/"min" and
// otherwise printing the time reduced modulo tag_modulo.
187 static std::string format_tag(double value) {
188 if (max_tag == value) {
189 return std::string("max");
190 } else if (min_tag == value) {
191 return std::string("min");
193 return format_time(value, tag_modulo);
// Core tag formula.  A zero increment (i.e. a zero rate parameter)
// pins the tag at the appropriate extreme so it never wins (or never
// loses) the comparison.  A non-zero distributed-request count
// (rho or delta) scales the increment per the dmClock paper.  The
// result is monotonic: never earlier than the current time.
199 static double tag_calc(const Time time,
202 uint32_t dist_req_val,
203 bool extreme_is_high) {
204 if (0.0 == increment) {
205 return extreme_is_high ? max_tag : min_tag;
207 if (0 != dist_req_val) {
208 increment *= dist_req_val;
210 return std::max(time, prev + increment);
// Debug/logging output of the tag's fields.
214 friend std::ostream& operator<<(std::ostream& out,
215 const RequestTag& tag) {
217 "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
218 " r:" << format_tag(tag.reservation) <<
219 " p:" << format_tag(tag.proportion) <<
220 " l:" << format_tag(tag.limit) <<
221 #if 0 // try to resolve this to make sure Time is operator<<'able.
222 #ifndef DO_NOT_DELAY_TAG_CALC
223 " arrival:" << tag.arrival <<
229 }; // class RequestTag
// Common machinery shared by the pull- and push-style queues below:
// per-client bookkeeping (ClientRec), per-request bookkeeping
// (ClientReq), and the four intrusive heaps keyed on the different
// tag fields.
232 // C is client identifier type, R is request type, B is heap
234 template<typename C, typename R, uint B>
235 class PriorityQueueBase {
236 // we don't want to include gtest.h just for FRIEND_TEST
237 friend class dmclock_server_client_idle_erase_Test;
// The queue owns requests via unique_ptr until they are handed back
// to the caller (pull) or to the handle function (push).
241 using RequestRef = std::unique_ptr<R>;
245 using TimePoint = decltype(std::chrono::steady_clock::now());
246 using Duration = std::chrono::milliseconds;
// (steady-clock time, tick counter) pair recorded by do_clean.
247 using MarkPoint = std::pair<TimePoint,Counter>;
249 enum class ReadyOption {ignore, lowers, raises};
251 // forward decl for friend decls
252 template<double RequestTag::*, ReadyOption, bool>
253 struct ClientCompare;
256 friend PriorityQueueBase;
// A queued request: its tag, the owning client's id, and ownership
// of the request object itself.
264 ClientReq(const RequestTag& _tag,
266 RequestRef&& _request) :
268 client_id(_client_id),
269 request(std::move(_request))
274 friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
275 out << "{ ClientReq:: tag:" << c.tag << " client:" <<
279 }; // class ClientReq
283 // NOTE: ClientRec is in the "public" section for compatibility
284 // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
285 // ClientRec could be "protected" with no issue. [See comments
286 // associated with function submit_top_request.]
288 friend PriorityQueueBase<C,R,B>;
// FIFO of this client's pending requests.
292 std::deque<ClientReq> requests;
294 // amount added from the proportion tag as a result of
295 // an idle client becoming unidle
296 double prop_delta = 0.0;
// Intrusive-heap bookkeeping -- one slot per heap this record
// participates in.
298 c::IndIntruHeapData reserv_heap_data;
299 c::IndIntruHeapData lim_heap_data;
300 c::IndIntruHeapData ready_heap_data;
302 c::IndIntruHeapData prop_heap_data;
// Construct with zeroed previous tags at the current tick.
314 const ClientInfo& _info,
315 Counter current_tick) :
317 prev_tag(0.0, 0.0, 0.0, TimeZero),
320 last_tick(current_tick),
327 inline const RequestTag& get_req_tag() const {
// Copy a tag field only if it is not pinned at a sentinel extreme;
// pinned values must stay pinned rather than be overwritten.
331 static inline void assign_unpinned_tag(double& lhs, const double rhs) {
332 if (rhs != max_tag && rhs != min_tag) {
// Record _prev as this client's previous tag (unpinned fields only)
// and note the tick of last activity.
337 inline void update_req_tag(const RequestTag& _prev,
338 const Counter& _tick) {
339 assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
340 assign_unpinned_tag(prev_tag.limit, _prev.limit);
341 assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
345 inline void add_request(const RequestTag& tag,
347 RequestRef&& request) {
348 requests.emplace_back(ClientReq(tag, client_id, std::move(request)));
351 inline const ClientReq& next_request() const {
352 return requests.front();
355 inline ClientReq& next_request() {
356 return requests.front();
359 inline void pop_request() {
360 requests.pop_front();
363 inline bool has_request() const {
364 return !requests.empty();
367 inline size_t request_count() const {
368 return requests.size();
371 // NB: because a deque is the underlying structure, this
372 // operation might be expensive
// Forward sweep: each request matched by the filter is moved out
// (consumed by filter_accum) and erased; returns whether anything
// was removed.
373 bool remove_by_req_filter_fw(std::function<bool(R&&)> filter_accum) {
374 bool any_removed = false;
375 for (auto i = requests.begin();
378 if (filter_accum(std::move(*i->request))) {
380 i = requests.erase(i);
388 // NB: because a deque is the underlying structure, this
389 // operation might be expensive
// Same as above but visiting requests back-to-front; note the
// reverse-iterator/erase dance to keep the iterator valid.
390 bool remove_by_req_filter_bw(std::function<bool(R&&)> filter_accum) {
391 bool any_removed = false;
392 for (auto i = requests.rbegin();
393 i != requests.rend();
395 if (filter_accum(std::move(*i->request))) {
397 i = decltype(i){ requests.erase(std::next(i).base()) };
// Dispatch to the forward or backward variant.
406 remove_by_req_filter(std::function<bool(R&&)> filter_accum,
407 bool visit_backwards) {
408 if (visit_backwards) {
409 return remove_by_req_filter_bw(filter_accum);
411 return remove_by_req_filter_fw(filter_accum);
416 operator<<(std::ostream& out,
417 const typename PriorityQueueBase<C,R,B>::ClientRec& e) {
418 out << "{ ClientRec::" <<
419 " client:" << e.client <<
420 " prev_tag:" << e.prev_tag <<
421 " req_count:" << e.requests.size() <<
423 if (e.has_request()) {
424 out << e.next_request();
432 }; // class ClientRec
// Client records are shared so the same record can sit in up to
// four intrusive heaps simultaneously.
434 using ClientRecRef = std::shared_ptr<ClientRec>;
436 // when we try to get the next request, we'll be in one of three
437 // situations -- we'll have one to return, have one that can
438 // fire in the future, or not have any
439 enum class NextReqType { returning, future, none };
441 // specifies which queue next request will get popped from
442 enum class HeapId { reservation, ready };
444 // this is returned from next_req to tell the caller the situation
454 // a function that can be called to look up client information
455 using ClientInfoFunc = std::function<ClientInfo(const C&)>;
// All public accessors below lock data_mtx via DataGuard, so they
// are safe to call concurrently with the scheduling paths.
459 DataGuard g(data_mtx);
460 return (resv_heap.empty() || ! resv_heap.top().has_request());
// Number of known (possibly idle) clients.
464 size_t client_count() const {
465 DataGuard g(data_mtx);
466 return resv_heap.size();
// Total queued requests across all clients.
470 size_t request_count() const {
471 DataGuard g(data_mtx);
473 for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
474 total += i->request_count();
// Remove requests matching the filter from every client, then
// re-adjust every heap since front requests (the heap keys) may
// have changed.
480 bool remove_by_req_filter(std::function<bool(R&&)> filter_accum,
481 bool visit_backwards = false) {
482 bool any_removed = false;
483 DataGuard g(data_mtx);
484 for (auto i : client_map) {
486 i.second->remove_by_req_filter(filter_accum, visit_backwards);
488 resv_heap.adjust(*i.second);
489 limit_heap.adjust(*i.second);
490 ready_heap.adjust(*i.second);
492 prop_heap.adjust(*i.second);
501 // use as a default value when no accumulator is provide
// Discards the request (unique_ptr parameter destroys it).
502 static void request_sink(R&& req) {
// Drain all of one client's requests into accum (front-to-back or
// back-to-front per `reverse`), then fix up the heaps.
507 void remove_by_client(const C& client,
508 bool reverse = false,
509 std::function<void (R&&)> accum = request_sink) {
510 DataGuard g(data_mtx);
512 auto i = client_map.find(client);
// Unknown client: nothing to remove.
514 if (i == client_map.end()) return;
517 for (auto j = i->second->requests.rbegin();
518 j != i->second->requests.rend();
520 accum(std::move(*j->request));
523 for (auto j = i->second->requests.begin();
524 j != i->second->requests.end();
526 accum(std::move(*j->request));
530 i->second->requests.clear();
532 resv_heap.adjust(*i->second);
533 limit_heap.adjust(*i->second);
534 ready_heap.adjust(*i->second);
536 prop_heap.adjust(*i->second);
// Expose the compile-time heap fan-out (template parameter B).
541 uint get_heap_branching_factor() const {
// Debug dump of the whole queue: every client record plus the top
// of each non-empty heap.
546 friend std::ostream& operator<<(std::ostream& out,
547 const PriorityQueueBase& q) {
548 std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);
550 out << "{ PriorityQueue::";
551 for (const auto& c : q.client_map) {
552 out << " { client:" << c.first << ", record:" << *c.second <<
555 if (!q.resv_heap.empty()) {
556 const auto& resv = q.resv_heap.top();
557 out << " { reservation_top:" << resv << " }";
558 const auto& ready = q.ready_heap.top();
559 out << " { ready_top:" << ready << " }";
560 const auto& limit = q.limit_heap.top();
561 out << " { limit_top:" << limit << " }";
563 out << " HEAPS-EMPTY";
// Dump each heap in sorted order, individually selectable.
571 void display_queues(std::ostream& out,
572 bool show_res = true,
573 bool show_lim = true,
574 bool show_ready = true,
575 bool show_prop = true) const {
576 auto filter = [](const ClientRec& e)->bool { return true; };
577 DataGuard g(data_mtx);
579 resv_heap.display_sorted(out << "RESER:", filter);
582 limit_heap.display_sorted(out << "LIMIT:", filter);
585 ready_heap.display_sorted(out << "READY:", filter);
589 prop_heap.display_sorted(out << "PROPO:", filter);
597 // The ClientCompare functor is essentially doing a precedes?
598 // operator, returning true if and only if the first parameter
599 // must precede the second parameter. If the second must precede
600 // the first, or if they are equivalent, false should be
601 // returned. The reason for this behavior is that it will be
602 // called to test if two items are out of order and if true is
603 // returned it will reverse the items. Therefore false is the
604 // default return when it doesn't matter to prevent unnecessary
607 // The template is supporting variations in sorting based on the
608 // heap in question and allowing these variations to be handled
611 // tag_field determines which tag is being used for comparison
613 // ready_opt determines how the ready flag influences the sort
615 // use_prop_delta determines whether the proportional delta is
616 // added in for comparison
617 template<double RequestTag::*tag_field,
618 ReadyOption ready_opt,
620 struct ClientCompare {
621 bool operator()(const ClientRec& n1, const ClientRec& n2) const {
622 if (n1.has_request()) {
623 if (n2.has_request()) {
624 const auto& t1 = n1.next_request().tag;
625 const auto& t2 = n2.next_request().tag;
626 if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
627 // if we don't care about ready or the ready values are the same
628 if (use_prop_delta) {
629 return (t1.*tag_field + n1.prop_delta) <
630 (t2.*tag_field + n2.prop_delta);
632 return t1.*tag_field < t2.*tag_field;
634 } else if (ReadyOption::raises == ready_opt) {
635 // use_ready == true && the ready fields are different
641 // n1 has request but n2 does not
644 } else if (n2.has_request()) {
645 // n2 has request but n1 does not
648 // both have none; keep stable w false
// Callback to fetch a client's QoS parameters on first contact.
654 ClientInfoFunc client_info_f;
// Guards all mutable queue state; every public entry point and the
// cleaning thread take this lock.
656 mutable std::mutex data_mtx;
657 using DataGuard = std::lock_guard<decltype(data_mtx)>;
659 // stable mapping between client ids and client queues
660 std::map<C,ClientRecRef> client_map;
// Four intrusive heaps over the SAME client records, each ordered
// by a different tag field / ready policy (see ClientCompare).
662 c::IndIntruHeap<ClientRecRef,
664 &ClientRec::reserv_heap_data,
665 ClientCompare<&RequestTag::reservation,
670 c::IndIntruHeap<ClientRecRef,
672 &ClientRec::prop_heap_data,
673 ClientCompare<&RequestTag::proportion,
678 c::IndIntruHeap<ClientRecRef,
680 &ClientRec::lim_heap_data,
681 ClientCompare<&RequestTag::limit,
685 c::IndIntruHeap<ClientRecRef,
687 &ClientRec::ready_heap_data,
688 ClientCompare<&RequestTag::proportion,
693 // if all reservations are met and all other requestes are under
694 // limit, this will allow the request next in terms of
695 // proportion to still get issued
696 bool allow_limit_break;
// Set during shutdown so background threads can exit.
698 std::atomic_bool finishing;
700 // every request creates a tick
703 // performance data collection
704 size_t reserv_sched_count = 0;
705 size_t prop_sched_count = 0;
706 size_t limit_break_sched_count = 0;
// (time, tick) marks used by do_clean to age out clients.
711 std::deque<MarkPoint> clean_mark_points;
713 // NB: All threads declared at end, so they're destructed first!
715 std::unique_ptr<RunEvery> cleaning_job;
718 // COMMON constructor that others feed into; we can accept three
719 // different variations of durations
720 template<typename Rep, typename Per>
721 PriorityQueueBase(ClientInfoFunc _client_info_f,
722 std::chrono::duration<Rep,Per> _idle_age,
723 std::chrono::duration<Rep,Per> _erase_age,
724 std::chrono::duration<Rep,Per> _check_time,
725 bool _allow_limit_break) :
726 client_info_f(_client_info_f),
727 allow_limit_break(_allow_limit_break),
// All three ages are normalized to milliseconds internally.
729 idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
730 erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
731 check_time(std::chrono::duration_cast<Duration>(_check_time))
// Sanity: a client must become idle before it can be erased, and
// the cleaning period must be finer than the idle age.
733 assert(_erase_age >= _idle_age);
734 assert(_check_time < _idle_age);
// Start the periodic cleaning job (runs do_clean every check_time).
736 std::unique_ptr<RunEvery>(
737 new RunEvery(check_time,
738 std::bind(&PriorityQueueBase::do_clean, this)));
742 ~PriorityQueueBase() {
747 // data_mtx must be held by caller
// Core admission path shared by push & pull queues: find or create
// the ClientRec, re-anchor a formerly-idle client's proportion tag,
// compute the new request's tag, enqueue it, and re-adjust heaps.
// NOTE(review): listing is gapped; some body lines are not visible.
748 void do_add_request(RequestRef&& request,
750 const ReqParams& req_params,
752 const double cost = 0.0) {
755 // this pointer will help us create a reference to a shared
756 // pointer, no matter which of two codepaths we take
757 ClientRec* temp_client;
759 auto client_it = client_map.find(client_id);
760 if (client_map.end() != client_it) {
761 temp_client = &(*client_it->second); // address of obj of shared_ptr
// First contact with this client: look up its QoS parameters and
// insert a fresh record into the map and all heaps.
763 ClientInfo info = client_info_f(client_id);
764 ClientRecRef client_rec =
765 std::make_shared<ClientRec>(client_id, info, tick);
766 resv_heap.push(client_rec);
768 prop_heap.push(client_rec);
770 limit_heap.push(client_rec);
771 ready_heap.push(client_rec);
772 client_map[client_id] = client_rec;
773 temp_client = &(*client_rec); // address of obj of shared_ptr
776 // for convenience, we'll create a reference to the shared pointer
777 ClientRec& client = *temp_client;
780 // We need to do an adjustment so that idle clients compete
781 // fairly on proportional tags since those tags may have
782 // drifted from real-time. Either use the lowest existing
783 // proportion tag -- O(1) -- or the client with the lowest
784 // previous proportion tag -- O(n) where n = # clients.
786 // So we don't have to maintain a propotional queue that
787 // keeps the minimum on proportional tag alone (we're
788 // instead using a ready queue), we'll have to check each
791 // The alternative would be to maintain a proportional queue
792 // (define USE_PROP_TAG) and do an O(1) operation here.
794 // Was unable to confirm whether equality testing on
795 // std::numeric_limits<double>::max() is guaranteed, so
796 // we'll use a compile-time calculated trigger that is one
797 // third the max, which should be much larger than any
798 // expected organic value.
799 constexpr double lowest_prop_tag_trigger =
800 std::numeric_limits<double>::max() / 3.0;
// O(n) scan for the minimum effective proportion tag among
// non-idle clients (trade-off documented above).
802 double lowest_prop_tag = std::numeric_limits<double>::max();
803 for (auto const &c : client_map) {
804 // don't use ourselves (or anything else that might be
805 // listed as idle) since we're now in the map
806 if (!c.second->idle) {
808 // use either lowest proportion tag or previous proportion tag
809 if (c.second->has_request()) {
810 p = c.second->next_request().tag.proportion +
811 c.second->prop_delta;
813 p = c.second->get_req_tag().proportion + c.second->prop_delta;
816 if (p < lowest_prop_tag) {
822 // if this conditional does not fire, it
// Only re-anchor if we actually found an organic (non-sentinel)
// minimum; see the trigger rationale above.
823 if (lowest_prop_tag < lowest_prop_tag_trigger) {
824 client.prop_delta = lowest_prop_tag - time;
827 } // if this client was idle
829 #ifndef DO_NOT_DELAY_TAG_CALC
// Delayed-tag-calc variant: only the queue-front request carries a
// fully computed tag; later requests are tagged when they reach
// the front (see pop_process_request).
830 RequestTag tag(0, 0, 0, time);
832 if (!client.has_request()) {
833 tag = RequestTag(client.get_req_tag(),
839 // copy tag to previous tag for client
840 client.update_req_tag(tag, tick);
// Original (non-delayed) variant: tag computed immediately from
// the rho/delta sent with the request.
843 RequestTag tag(client.get_req_tag(), client.info, req_params, time, cost);
844 // copy tag to previous tag for client
845 client.update_req_tag(tag, tick);
848 client.add_request(tag, client.client, std::move(request));
// A new front request changes this client's heap keys.
849 if (1 == client.requests.size()) {
850 // NB: can the following 4 calls to adjust be changed
851 // promote? Can adding a request ever demote a client in the
853 resv_heap.adjust(client);
854 limit_heap.adjust(client);
855 ready_heap.adjust(client);
857 prop_heap.adjust(client);
// Remember most recent rho/delta for the delayed tag calculation.
861 client.cur_rho = req_params.rho;
862 client.cur_delta = req_params.delta;
864 resv_heap.adjust(client);
865 limit_heap.adjust(client);
866 ready_heap.adjust(client);
868 prop_heap.adjust(client);
873 // data_mtx should be held when called; top of heap should have
// Pop the front request of the client atop `heap`, hand it to
// `process`, and — in delayed-tag-calc builds — compute the tag of
// the client's new front request from the just-popped tag and the
// client's latest rho/delta.  Finally re-position the client in
// every heap (demote: its next request can only be later).
875 template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
876 void pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
877 std::function<void(const C& client,
878 RequestRef& request)> process) {
879 // gain access to data
880 ClientRec& top = heap.top();
882 RequestRef request = std::move(top.next_request().request);
883 #ifndef DO_NOT_DELAY_TAG_CALC
884 RequestTag tag = top.next_request().tag;
887 // pop request and adjust heaps
890 #ifndef DO_NOT_DELAY_TAG_CALC
891 if (top.has_request()) {
892 ClientReq& next_first = top.next_request();
893 next_first.tag = RequestTag(tag, top.info,
894 top.cur_delta, top.cur_rho,
895 next_first.tag.arrival);
897 // copy tag to previous tag for client
898 top.update_req_tag(next_first.tag, tick);
902 resv_heap.demote(top);
903 limit_heap.adjust(top);
905 prop_heap.demote(top);
907 ready_heap.demote(top);
// Invoke the caller's handler outside of the heap manipulation.
910 process(top.client, request);
911 } // pop_process_request
914 // data_mtx should be held when called
// After a proportion-phase dispatch, subtract one reservation
// increment from the client's queued reservation tags so the
// client is not double-charged against its reservation.
915 void reduce_reservation_tags(ClientRec& client) {
916 for (auto& r : client.requests) {
917 r.tag.reservation -= client.info.reservation_inv;
919 #ifndef DO_NOT_DELAY_TAG_CALC
920 // reduce only for front tag. because next tags' value are invalid
924 // don't forget to update previous tag
925 client.prev_tag.reservation -= client.info.reservation_inv;
// Reservation tags only got smaller, so promote suffices.
926 resv_heap.promote(client);
930 // data_mtx should be held when called
// Id-based convenience overload; the client must still be mapped.
931 void reduce_reservation_tags(const C& client_id) {
932 auto client_it = client_map.find(client_id);
934 // means the client was cleaned from map; should never happen
935 // as long as cleaning times are long enough
936 assert(client_map.end() != client_it);
937 reduce_reservation_tags(*client_it->second);
941 // data_mtx should be held when called
// Decide what to schedule next, in dmClock priority order:
// 1) a reservation-tagged request whose tag is <= now;
// 2) otherwise promote limit-eligible requests to "ready" and take
//    the best ready request by proportion tag;
// 3) otherwise, if allow_limit_break, take the best request by
//    proportion or reservation tag regardless of limit;
// 4) otherwise report the earliest future time to retry, or none.
942 NextReq do_next_request(Time now) {
945 // if reservation queue is empty, all are empty (i.e., no active clients)
946 if(resv_heap.empty()) {
947 result.type = NextReqType::none;
951 // try constraint (reservation) based scheduling
953 auto& reserv = resv_heap.top();
954 if (reserv.has_request() &&
955 reserv.next_request().tag.reservation <= now) {
956 result.type = NextReqType::returning;
957 result.heap_id = HeapId::reservation;
961 // no existing reservations before now, so try weight-based
964 // all items that are within limit are eligible based on
// Mark every request whose limit tag has passed as ready, moving
// its client up in the ready heap and down in the limit heap.
966 auto limits = &limit_heap.top();
967 while (limits->has_request() &&
968 !limits->next_request().tag.ready &&
969 limits->next_request().tag.limit <= now) {
970 limits->next_request().tag.ready = true;
971 ready_heap.promote(*limits);
972 limit_heap.demote(*limits);
974 limits = &limit_heap.top();
977 auto& readys = ready_heap.top();
978 if (readys.has_request() &&
979 readys.next_request().tag.ready &&
980 readys.next_request().tag.proportion < max_tag) {
981 result.type = NextReqType::returning;
982 result.heap_id = HeapId::ready;
986 // if nothing is schedulable by reservation or
987 // proportion/weight, and if we allow limit break, try to
988 // schedule something with the lowest proportion tag or
989 // alternatively lowest reservation tag.
990 if (allow_limit_break) {
991 if (readys.has_request() &&
992 readys.next_request().tag.proportion < max_tag) {
993 result.type = NextReqType::returning;
994 result.heap_id = HeapId::ready;
996 } else if (reserv.has_request() &&
997 reserv.next_request().tag.reservation < max_tag) {
998 result.type = NextReqType::returning;
999 result.heap_id = HeapId::reservation;
1004 // nothing scheduled; make sure we re-run when next
1005 // reservation item or next limited item comes up
1007 Time next_call = TimeMax;
1008 if (resv_heap.top().has_request()) {
1010 min_not_0_time(next_call,
1011 resv_heap.top().next_request().tag.reservation);
1013 if (limit_heap.top().has_request()) {
1014 const auto& next = limit_heap.top().next_request();
1015 assert(!next.tag.ready || max_tag == next.tag.proportion);
1016 next_call = min_not_0_time(next_call, next.tag.limit);
1018 if (next_call < TimeMax) {
1019 result.type = NextReqType::future;
1020 result.when_ready = next_call;
1023 result.type = NextReqType::none;
1026 } // do_next_request
1029 // if possible is not zero and less than current then return it;
1030 // otherwise return current; the idea is we're trying to find
1031 // the minimal time but ignoring zero
1032 static inline const Time& min_not_0_time(const Time& current,
1033 const Time& possible) {
1034 return TimeZero == possible ? current : std::min(current, possible);
1039 * This is being called regularly by RunEvery. Every time it's
1040 * called it notes the time and delta counter (mark point) in a
1041 * deque. It also looks at the deque to find the most recent
1042 * mark point that is older than clean_age. It then walks the
1043 * map and delete all server entries that were last used before
1047 TimePoint now = std::chrono::steady_clock::now();
1048 DataGuard g(data_mtx);
1049 clean_mark_points.emplace_back(MarkPoint(now, tick));
1051 // first erase the super-old client records
// Advance erase_point to the tick of the newest mark older than
// erase_age, consuming the marks as we go.
1053 Counter erase_point = 0;
1054 auto point = clean_mark_points.front();
1055 while (point.first <= now - erase_age) {
1056 erase_point = point.second;
1057 clean_mark_points.pop_front();
1058 point = clean_mark_points.front();
// Likewise find the idle threshold tick (marks are retained here).
1061 Counter idle_point = 0;
1062 for (auto i : clean_mark_points) {
1063 if (i.first <= now - idle_age) {
1064 idle_point = i.second;
// Erase clients untouched since erase_point; merely flag as idle
// those untouched since idle_point.
1070 if (erase_point > 0 || idle_point > 0) {
1071 for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
1073 if (erase_point && i2->second->last_tick <= erase_point) {
1074 delete_from_heaps(i2->second);
1075 client_map.erase(i2);
1076 } else if (idle_point && i2->second->last_tick <= idle_point) {
1077 i2->second->idle = true;
1084 // data_mtx must be held by caller
// Locate (reverse find) and remove one client record from a
// single intrusive heap.
1085 template<IndIntruHeapData ClientRec::*C1,typename C2>
1086 void delete_from_heap(ClientRecRef& client,
1087 c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
1088 auto i = heap.rfind(client);
1093 // data_mtx must be held by caller
// Remove a client from all heaps it participates in.
1094 void delete_from_heaps(ClientRecRef& client) {
1095 delete_from_heap(client, resv_heap);
1097 delete_from_heap(client, prop_heap);
1099 delete_from_heap(client, limit_heap);
1100 delete_from_heap(client, ready_heap);
1102 }; // class PriorityQueueBase
// Pull-style queue: the caller asks for the next request via
// pull_request(); nothing is dispatched spontaneously.
1105 template<typename C, typename R, uint B=2>
1106 class PullPriorityQueue : public PriorityQueueBase<C,R,B> {
1107 using super = PriorityQueueBase<C,R,B>;
1111 // When a request is pulled, this is the return type.
1115 typename super::RequestRef request;
// A pull yields either a Retn (request ready), a Time (come back
// later), or nothing (queue empty) -- discriminated by `type`.
1119 typename super::NextReqType type;
1120 boost::variant<Retn,Time> data;
1122 bool is_none() const { return type == super::NextReqType::none; }
1124 bool is_retn() const { return type == super::NextReqType::returning; }
1126 return boost::get<Retn>(data);
1129 bool is_future() const { return type == super::NextReqType::future; }
1130 Time getTime() const { return boost::get<Time>(data); }
// Performance instrumentation for the two hot entry points.
1135 ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
1136 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
// Full constructor; forwards ages to the common base ctor.
1139 template<typename Rep, typename Per>
1140 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1141 std::chrono::duration<Rep,Per> _idle_age,
1142 std::chrono::duration<Rep,Per> _erase_age,
1143 std::chrono::duration<Rep,Per> _check_time,
1144 bool _allow_limit_break = false) :
1145 super(_client_info_f,
1146 _idle_age, _erase_age, _check_time,
1153 // pull convenience constructor
// Defaults: idle 10 min, erase 15 min, clean every 6 min.
1154 PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1155 bool _allow_limit_break = false) :
1156 PullPriorityQueue(_client_info_f,
1157 std::chrono::minutes(10),
1158 std::chrono::minutes(15),
1159 std::chrono::minutes(6),
// add_request overloads: take R by value or RequestRef, with or
// without explicit ReqParams / time; all funnel to the final one.
1166 inline void add_request(R&& request,
1168 const ReqParams& req_params,
1169 double addl_cost = 0.0) {
1170 add_request(typename super::RequestRef(new R(std::move(request))),
1178 inline void add_request(R&& request,
1180 double addl_cost = 0.0) {
1181 static const ReqParams null_req_params;
1182 add_request(typename super::RequestRef(new R(std::move(request))),
1191 inline void add_request_time(R&& request,
1193 const ReqParams& req_params,
1195 double addl_cost = 0.0) {
1196 add_request(typename super::RequestRef(new R(std::move(request))),
1204 inline void add_request(typename super::RequestRef&& request,
1206 const ReqParams& req_params,
1207 double addl_cost = 0.0) {
1208 add_request(request, req_params, client_id, get_time(), addl_cost);
1212 inline void add_request(typename super::RequestRef&& request,
1214 double addl_cost = 0.0) {
1215 static const ReqParams null_req_params;
1216 add_request(request, null_req_params, client_id, get_time(), addl_cost);
1220 // this does the work; the versions above provide alternate interfaces
1221 void add_request(typename super::RequestRef&& request,
1223 const ReqParams& req_params,
1225 double addl_cost = 0.0) {
1226 typename super::DataGuard g(this->data_mtx);
1228 add_request_timer.start();
1230 super::do_add_request(std::move(request),
1235 // no call to schedule_request for pull version
1237 add_request_timer.stop();
// Pull using the current wall-clock time.
1242 inline PullReq pull_request() {
1243 return pull_request(get_time());
// Pull relative to an explicit `now`: run do_next_request, and if
// it returned a request, pop it from the winning heap.  A ready
// (proportion-phase) dispatch also reduces the client's queued
// reservation tags so it is not double-charged.
1247 PullReq pull_request(Time now) {
1249 typename super::DataGuard g(this->data_mtx);
1251 pull_request_timer.start();
1254 typename super::NextReq next = super::do_next_request(now);
1255 result.type = next.type;
1257 case super::NextReqType::none:
1259 case super::NextReqType::future:
1260 result.data = next.when_ready;
1262 case super::NextReqType::returning:
1263 // to avoid nesting, break out and let code below handle this case
1269 // we'll only get here if we're returning an entry
// Factory for the pop_process_request callback: captures the
// result slot and records client/request/phase into it.
1272 [&] (PullReq& pull_result, PhaseType phase) ->
1273 std::function<void(const C&,
1274 typename super::RequestRef&)> {
1275 return [&pull_result, phase](const C& client,
1276 typename super::RequestRef& request) {
1278 typename PullReq::Retn{client, std::move(request), phase};
1282 switch(next.heap_id) {
1283 case super::HeapId::reservation:
1284 super::pop_process_request(this->resv_heap,
1285 process_f(result, PhaseType::reservation));
1286 ++this->reserv_sched_count;
1288 case super::HeapId::ready:
1289 super::pop_process_request(this->ready_heap,
1290 process_f(result, PhaseType::priority));
1291 { // need to use retn temporarily
1292 auto& retn = boost::get<typename PullReq::Retn>(result.data);
1293 super::reduce_reservation_tags(retn.client);
1295 ++this->prop_sched_count;
1302 pull_request_timer.stop();
1311 // data_mtx should be held when called; unfortunately this
1312 // function has to be repeated in both push & pull
1314 typename super::NextReq next_request() {
1315 return next_request(get_time());
1317 }; // class PullPriorityQueue
1321 template<typename C, typename R, uint B=2>
1322 class PushPriorityQueue : public PriorityQueueBase<C,R,B> {
1326 using super = PriorityQueueBase<C,R,B>;
1330 // a function to see whether the server can handle another request
1331 using CanHandleRequestFunc = std::function<bool(void)>;
1333 // a function to submit a request to the server; the second
1334 // parameter is a callback when it's completed
1335 using HandleRequestFunc =
1336 std::function<void(const C&,typename super::RequestRef,PhaseType)>;
1340 CanHandleRequestFunc can_handle_f;
1341 HandleRequestFunc handle_f;
1342 // for handling timed scheduling
1343 std::mutex sched_ahead_mtx;
1344 std::condition_variable sched_ahead_cv;
1345 Time sched_ahead_when = TimeZero;
1349 ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1350 ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
1354 // NB: threads declared last, so constructed last and destructed first
1356 std::thread sched_ahead_thd;
1360 // push full constructor
1361 template<typename Rep, typename Per>
1362 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1363 CanHandleRequestFunc _can_handle_f,
1364 HandleRequestFunc _handle_f,
1365 std::chrono::duration<Rep,Per> _idle_age,
1366 std::chrono::duration<Rep,Per> _erase_age,
1367 std::chrono::duration<Rep,Per> _check_time,
1368 bool _allow_limit_break = false) :
1369 super(_client_info_f,
1370 _idle_age, _erase_age, _check_time,
1373 can_handle_f = _can_handle_f;
1374 handle_f = _handle_f;
1375 sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
1379 // push convenience constructor
1380 PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1381 CanHandleRequestFunc _can_handle_f,
1382 HandleRequestFunc _handle_f,
1383 bool _allow_limit_break = false) :
1384 PushPriorityQueue(_client_info_f,
1387 std::chrono::minutes(10),
1388 std::chrono::minutes(15),
1389 std::chrono::minutes(6),
1396 ~PushPriorityQueue() {
1397 this->finishing = true;
1398 sched_ahead_cv.notify_one();
1399 sched_ahead_thd.join();
1404 inline void add_request(R&& request,
1406 const ReqParams& req_params,
1407 double addl_cost = 0.0) {
1408 add_request(typename super::RequestRef(new R(std::move(request))),
// Convenience overload: supplies the current time and delegates to the
// full add_request.
// NOTE(review): a parameter line (presumably the client id) is elided
// between 1416 and 1418 -- confirm in the full file.
1416 inline void add_request(typename super::RequestRef&& request,
1418 const ReqParams& req_params,
1419 double addl_cost = 0.0) {
// NOTE(review): `request` is an rvalue-reference parameter but is
// forwarded here as an lvalue; the target overload's signature lines
// are elided -- confirm whether a std::move is intended/missing.
1420 add_request(request, req_params, client_id, get_time(), addl_cost);
// Convenience overload: copies the request (new R(request)) rather than
// moving it, for callers that retain the original.
// NOTE(review): parameter lines (client id and, per the name, an
// explicit time) are elided around 1425/1427 -- confirm in the full file.
1424 inline void add_request_time(const R& request,
1426 const ReqParams& req_params,
1428 double addl_cost = 0.0) {
1429 add_request(typename super::RequestRef(new R(request)),
// Full add_request: under the data mutex, hands the request to the base
// class's do_add_request.
// NOTE(review): parameter lines (client id and time) are elided at
// 1438/1440, as are do_add_request's remaining arguments after 1446 --
// confirm in the full file.
1437 void add_request(typename super::RequestRef&& request,
1439 const ReqParams& req_params,
1441 double addl_cost = 0.0) {
1442 typename super::DataGuard g(this->data_mtx);
// Profiling instrumentation; brackets the do_add_request call.
1444 add_request_timer.start();
1446 super::do_add_request(std::move(request),
1453 add_request_timer.stop();
// Called by the server when it finishes a request; the freed capacity
// is used to schedule more work.
// NOTE(review): the actual completion/scheduling statements between the
// timer start and stop (lines 1462-1464) are elided -- confirm in the
// full file.
1458 void request_completed() {
1459 typename super::DataGuard g(this->data_mtx);
1461 request_complete_timer.start();
1465 request_complete_timer.stop();
1471 // data_mtx should be held when called; furthermore, the heap
1472 // should not be empty and the top element of the heap should
1473 // not be already handled
1475 // NOTE: the use of "super::ClientRec" in either the template
1476 // construct or as a parameter to submit_top_request generated
1477 // a compiler error in g++ 4.8.4, when ClientRec was
1478 // "protected" rather than "public". By g++ 6.3.1 this was not
1479 // an issue. But for backwards compatibility
1480 // PriorityQueueBase::ClientRec is public.
// Pops the top request of the given heap via pop_process_request and
// pushes it to the server through handle_f; returns the client it
// belonged to so the caller can adjust that client's tags.
1481 template<typename C1,
1482 IndIntruHeapData super::ClientRec::*C2,
1485 C submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
1488 super::pop_process_request(heap,
// The lambda captures client_result by reference so the popped
// request's client can be returned after pop_process_request finishes.
1489 [this, phase, &client_result]
1491 typename super::RequestRef& request) {
1492 client_result = client;
// Hand the request to the server, transferring ownership.
1493 handle_f(client, std::move(request), phase);
1495 return client_result;
1499 // data_mtx should be held when called
// Dispatches on which heap produced the next request and submits its
// top element, tagging it with the matching phase.
// NOTE(review): the switch statement itself, break statements, and any
// default case are elided from this listing -- confirm in the full file.
1500 void submit_request(typename super::HeapId heap_id) {
1503 case super::HeapId::reservation:
1504 // don't need to note client
1505 (void) submit_top_request(this->resv_heap, PhaseType::reservation);
1506 // unlike the other two cases, we do not reduce reservation
1508 ++this->reserv_sched_count;
1510 case super::HeapId::ready:
1511 client = submit_top_request(this->ready_heap, PhaseType::priority);
// A priority-phase grant also counts against the client's reservation.
1512 super::reduce_reservation_tags(client);
1513 ++this->prop_sched_count;
1521 // data_mtx should be held when called; unfortunately this
1522 // function has to be repeated in both push & pull
// Convenience wrapper: evaluate the next request as of now.
1524 typename super::NextReq next_request() {
1525 return next_request(get_time());
1529 // data_mtx should be held when called; overrides member
1530 // function in base class to add check for whether a request can
1531 // be pushed to the server
1532 typename super::NextReq next_request(Time now) {
// If the server has no capacity, report "none" without consulting the
// heaps at all.
1533 if (!can_handle_f()) {
1534 typename super::NextReq result;
1535 result.type = super::NextReqType::none;
// NOTE(review): the `return result;` / else-branch lines are elided
// here -- confirm in the full file.
1538 return super::do_next_request(now);
1543 // data_mtx should be held when called
// Acts on the scheduler's verdict: nothing to do, a future time to
// wake up at, or a request ready to submit now.
// NOTE(review): break statements and any default case are elided from
// this listing -- confirm in the full file.
1544 void schedule_request() {
1545 typename super::NextReq next_req = next_request();
1546 switch (next_req.type) {
1547 case super::NextReqType::none:
1549 case super::NextReqType::future:
// Delegate the future wakeup to the sched-ahead thread.
1550 sched_at(next_req.when_ready);
1552 case super::NextReqType::returning:
1553 submit_request(next_req.heap_id);
1561 // this is the thread that handles running schedule_request at
1562 // future times when nothing can be scheduled immediately
1563 void run_sched_ahead() {
1564 std::unique_lock<std::mutex> l(sched_ahead_mtx);
1566 while (!this->finishing) {
// No wakeup scheduled (TimeZero sentinel): block until sched_at or the
// destructor notifies.
1567 if (TimeZero == sched_ahead_when) {
1568 sched_ahead_cv.wait(l);
// Otherwise sleep until the scheduled time, re-checking after each
// wakeup since sched_at may move the target or finishing may be set.
1571 while (!this->finishing && (now = get_time()) < sched_ahead_when) {
// +1 rounds the microsecond count up so we never wait_for(0) and spin.
// NOTE(review): a very distant sched_ahead_when could overflow `long`
// here -- confirm whether callers bound it.
1572 long microseconds_l = long(1 + 1000000 * (sched_ahead_when - now));
1573 auto microseconds = std::chrono::microseconds(microseconds_l);
1574 sched_ahead_cv.wait_for(l, microseconds);
// Reset the sentinel before running, so sched_at can arm a new wakeup.
1576 sched_ahead_when = TimeZero;
1577 if (this->finishing) return;
// NOTE(review): lines elided around here -- presumably the mutex is
// released before taking data_mtx, and schedule_request() is called
// under the guard below; confirm lock ordering in the full file.
1580 if (!this->finishing) {
1581 typename super::DataGuard g(this->data_mtx);
// Asks the sched-ahead thread to run schedule_request at time `when`.
// Only moves the wakeup earlier (or arms it if unarmed); a later
// request than the current target is already covered.
1590 void sched_at(Time when) {
1591 std::lock_guard<std::mutex> l(sched_ahead_mtx);
1592 if (this->finishing) return;
1593 if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
1594 sched_ahead_when = when;
1595 sched_ahead_cv.notify_one();
1598 }; // class PushPriorityQueue
1600 } // namespace dmclock
1601 } // namespace crimson