Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / dmclock / src / dmclock_server.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5  * Copyright (C) 2017 Red Hat Inc.
6  */
7
8
9 #pragma once
10
11 /* COMPILATION OPTIONS
12  *
13  * By default we include an optimization over the originally published
14  * dmclock algorithm using not the values of rho and delta that were
15  * sent in with a request but instead the most recent rho and delta
16  * values from the request's client. To restore the algorithm's
17  * original behavior, define DO_NOT_DELAY_TAG_CALC (i.e., compiler
18  * argument -DDO_NOT_DELAY_TAG_CALC).
19  *
20  * The prop_heap does not seem to be necessary. The only thing it
21  * would help with is quickly finding the minimum proportion/priority
22  * when an idle client became active. To have the code maintain the
23  * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
24  * -DUSE_PROP_HEAP).
25  */
26
27 #include <assert.h>
28
29 #include <cmath>
30 #include <memory>
31 #include <map>
32 #include <deque>
33 #include <queue>
34 #include <atomic>
35 #include <mutex>
36 #include <condition_variable>
37 #include <thread>
38 #include <iostream>
39 #include <sstream>
40 #include <limits>
41
42 #include <boost/variant.hpp>
43
44 #include "indirect_intrusive_heap.h"
45 #include "run_every.h"
46 #include "dmclock_util.h"
47 #include "dmclock_recs.h"
48
49 #ifdef PROFILE
50 #include "profile.h"
51 #endif
52
53
54 namespace crimson {
55
56   namespace dmclock {
57
58     namespace c = crimson;
59
60     constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
61       std::numeric_limits<double>::infinity() :
62       std::numeric_limits<double>::max();
63     constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
64       -std::numeric_limits<double>::infinity() :
65       std::numeric_limits<double>::lowest();
66     constexpr uint tag_modulo = 1000000;
67
68     struct ClientInfo {
69       const double reservation;  // minimum
70       const double weight;       // proportional
71       const double limit;        // maximum
72
73       // multiplicative inverses of above, which we use in calculations
74       // and don't want to recalculate repeatedly
75       const double reservation_inv;
76       const double weight_inv;
77       const double limit_inv;
78
79       // order parameters -- min, "normal", max
80       ClientInfo(double _reservation, double _weight, double _limit) :
81         reservation(_reservation),
82         weight(_weight),
83         limit(_limit),
84         reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
85         weight_inv(     0.0 == weight      ? 0.0 : 1.0 / weight),
86         limit_inv(      0.0 == limit       ? 0.0 : 1.0 / limit)
87       {
88         // empty
89       }
90
91
92       friend std::ostream& operator<<(std::ostream& out,
93                                       const ClientInfo& client) {
94         out <<
95           "{ ClientInfo:: r:" << client.reservation <<
96           " w:" << std::fixed << client.weight <<
97           " l:" << std::fixed << client.limit <<
98           " 1/r:" << std::fixed << client.reservation_inv <<
99           " 1/w:" << std::fixed << client.weight_inv <<
100           " 1/l:" << std::fixed << client.limit_inv <<
101           " }";
102         return out;
103       }
104     }; // class ClientInfo
105
106
107     struct RequestTag {
108       double reservation;
109       double proportion;
110       double limit;
111       bool   ready; // true when within limit
112 #ifndef DO_NOT_DELAY_TAG_CALC
113       Time   arrival;
114 #endif
115
116       RequestTag(const RequestTag& prev_tag,
117                  const ClientInfo& client,
118                  const uint32_t delta,
119                  const uint32_t rho,
120                  const Time time,
121                  const double cost = 0.0) :
122         reservation(cost + tag_calc(time,
123                                     prev_tag.reservation,
124                                     client.reservation_inv,
125                                     rho,
126                                     true)),
127         proportion(tag_calc(time,
128                             prev_tag.proportion,
129                             client.weight_inv,
130                             delta,
131                             true)),
132         limit(tag_calc(time,
133                        prev_tag.limit,
134                        client.limit_inv,
135                        delta,
136                        false)),
137         ready(false)
138 #ifndef DO_NOT_DELAY_TAG_CALC
139         , arrival(time)
140 #endif
141       {
142         assert(reservation < max_tag || proportion < max_tag);
143       }
144
145       RequestTag(const RequestTag& prev_tag,
146                  const ClientInfo& client,
147                  const ReqParams req_params,
148                  const Time time,
149                  const double cost = 0.0) :
150         RequestTag(prev_tag, client, req_params.delta, req_params.rho, time, cost)
151       { /* empty */ }
152
153       RequestTag(double _res, double _prop, double _lim, const Time _arrival) :
154         reservation(_res),
155         proportion(_prop),
156         limit(_lim),
157         ready(false)
158 #ifndef DO_NOT_DELAY_TAG_CALC
159         , arrival(_arrival)
160 #endif
161       {
162         assert(reservation < max_tag || proportion < max_tag);
163       }
164
165       RequestTag(const RequestTag& other) :
166         reservation(other.reservation),
167         proportion(other.proportion),
168         limit(other.limit),
169         ready(other.ready)
170 #ifndef DO_NOT_DELAY_TAG_CALC
171         , arrival(other.arrival)
172 #endif
173       {
174         // empty
175       }
176
177       static std::string format_tag_change(double before, double after) {
178         if (before == after) {
179           return std::string("same");
180         } else {
181           std::stringstream ss;
182           ss << format_tag(before) << "=>" << format_tag(after);
183           return ss.str();
184         }
185       }
186
187       static std::string format_tag(double value) {
188         if (max_tag == value) {
189           return std::string("max");
190         } else if (min_tag == value) {
191           return std::string("min");
192         } else {
193           return format_time(value, tag_modulo);
194         }
195       }
196
197     private:
198
199       static double tag_calc(const Time time,
200                              double prev,
201                              double increment,
202                              uint32_t dist_req_val,
203                              bool extreme_is_high) {
204         if (0.0 == increment) {
205           return extreme_is_high ? max_tag : min_tag;
206         } else {
207           if (0 != dist_req_val) {
208             increment *= dist_req_val;
209           }
210           return std::max(time, prev + increment);
211         }
212       }
213
214       friend std::ostream& operator<<(std::ostream& out,
215                                       const RequestTag& tag) {
216         out <<
217           "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
218           " r:" << format_tag(tag.reservation) <<
219           " p:" << format_tag(tag.proportion) <<
220           " l:" << format_tag(tag.limit) <<
221 #if 0 // try to resolve this to make sure Time is operator<<'able.
222 #ifndef DO_NOT_DELAY_TAG_CALC
223           " arrival:" << tag.arrival <<
224 #endif
225 #endif
226           " }";
227         return out;
228       }
229     }; // class RequestTag
230
231
232     // C is client identifier type, R is request type, B is heap
233     // branching factor
234     template<typename C, typename R, uint B>
235     class PriorityQueueBase {
236       // we don't want to include gtest.h just for FRIEND_TEST
237       friend class dmclock_server_client_idle_erase_Test;
238
239     public:
240
241       using RequestRef = std::unique_ptr<R>;
242
243     protected:
244
245       using TimePoint = decltype(std::chrono::steady_clock::now());
246       using Duration = std::chrono::milliseconds;
247       using MarkPoint = std::pair<TimePoint,Counter>;
248
249       enum class ReadyOption {ignore, lowers, raises};
250
251       // forward decl for friend decls
252       template<double RequestTag::*, ReadyOption, bool>
253       struct ClientCompare;
254
255       class ClientReq {
256         friend PriorityQueueBase;
257
258         RequestTag tag;
259         C          client_id;
260         RequestRef request;
261
262       public:
263
264         ClientReq(const RequestTag& _tag,
265                   const C&          _client_id,
266                   RequestRef&&      _request) :
267           tag(_tag),
268           client_id(_client_id),
269           request(std::move(_request))
270         {
271           // empty
272         }
273
274         friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
275           out << "{ ClientReq:: tag:" << c.tag << " client:" <<
276             c.client_id << " }";
277           return out;
278         }
279       }; // class ClientReq
280
281     public:
282
283       // NOTE: ClientRec is in the "public" section for compatibility
284       // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
285       // ClientRec could be "protected" with no issue. [See comments
286       // associated with function submit_top_request.]
287       class ClientRec {
288         friend PriorityQueueBase<C,R,B>;
289
290         C                     client;
291         RequestTag            prev_tag;
292         std::deque<ClientReq> requests;
293
294         // amount added from the proportion tag as a result of
295         // an idle client becoming unidle
296         double                prop_delta = 0.0;
297
298         c::IndIntruHeapData   reserv_heap_data;
299         c::IndIntruHeapData   lim_heap_data;
300         c::IndIntruHeapData   ready_heap_data;
301 #if USE_PROP_HEAP
302         c::IndIntruHeapData   prop_heap_data;
303 #endif
304
305       public:
306
307         ClientInfo            info;
308         bool                  idle;
309         Counter               last_tick;
310         uint32_t              cur_rho;
311         uint32_t              cur_delta;
312
313         ClientRec(C _client,
314                   const ClientInfo& _info,
315                   Counter current_tick) :
316           client(_client),
317           prev_tag(0.0, 0.0, 0.0, TimeZero),
318           info(_info),
319           idle(true),
320           last_tick(current_tick),
321           cur_rho(1),
322           cur_delta(1)
323         {
324           // empty
325         }
326
327         inline const RequestTag& get_req_tag() const {
328           return prev_tag;
329         }
330
331         static inline void assign_unpinned_tag(double& lhs, const double rhs) {
332           if (rhs != max_tag && rhs != min_tag) {
333             lhs = rhs;
334           }
335         }
336
337         inline void update_req_tag(const RequestTag& _prev,
338                                    const Counter& _tick) {
339           assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
340           assign_unpinned_tag(prev_tag.limit, _prev.limit);
341           assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
342           last_tick = _tick;
343         }
344
345         inline void add_request(const RequestTag& tag,
346                                 const C&          client_id,
347                                 RequestRef&&      request) {
348           requests.emplace_back(ClientReq(tag, client_id, std::move(request)));
349         }
350
351         inline const ClientReq& next_request() const {
352           return requests.front();
353         }
354
355         inline ClientReq& next_request() {
356           return requests.front();
357         }
358
359         inline void pop_request() {
360           requests.pop_front();
361         }
362
363         inline bool has_request() const {
364           return !requests.empty();
365         }
366
367         inline size_t request_count() const {
368           return requests.size();
369         }
370
371         // NB: because a deque is the underlying structure, this
372         // operation might be expensive
373         bool remove_by_req_filter_fw(std::function<bool(R&&)> filter_accum) {
374           bool any_removed = false;
375           for (auto i = requests.begin();
376                i != requests.end();
377                /* no inc */) {
378             if (filter_accum(std::move(*i->request))) {
379               any_removed = true;
380               i = requests.erase(i);
381             } else {
382               ++i;
383             }
384           }
385           return any_removed;
386         }
387
388         // NB: because a deque is the underlying structure, this
389         // operation might be expensive
390         bool remove_by_req_filter_bw(std::function<bool(R&&)> filter_accum) {
391           bool any_removed = false;
392           for (auto i = requests.rbegin();
393                i != requests.rend();
394                /* no inc */) {
395             if (filter_accum(std::move(*i->request))) {
396               any_removed = true;
397               i = decltype(i){ requests.erase(std::next(i).base()) };
398             } else {
399               ++i;
400             }
401           }
402           return any_removed;
403         }
404
405         inline bool
406         remove_by_req_filter(std::function<bool(R&&)> filter_accum,
407                              bool visit_backwards) {
408           if (visit_backwards) {
409             return remove_by_req_filter_bw(filter_accum);
410           } else {
411             return remove_by_req_filter_fw(filter_accum);
412           }
413         }
414
415         friend std::ostream&
416         operator<<(std::ostream& out,
417                    const typename PriorityQueueBase<C,R,B>::ClientRec& e) {
418           out << "{ ClientRec::" <<
419             " client:" << e.client <<
420             " prev_tag:" << e.prev_tag <<
421             " req_count:" << e.requests.size() <<
422             " top_req:";
423           if (e.has_request()) {
424             out << e.next_request();
425           } else {
426             out << "none";
427           }
428           out << " }";
429
430           return out;
431         }
432       }; // class ClientRec
433
434       using ClientRecRef = std::shared_ptr<ClientRec>;
435
436       // when we try to get the next request, we'll be in one of three
437       // situations -- we'll have one to return, have one that can
438       // fire in the future, or not have any
439       enum class NextReqType { returning, future, none };
440
441       // specifies which queue next request will get popped from
442       enum class HeapId { reservation, ready };
443
444       // Returned from next_req to tell the caller which situation applies.
445       struct NextReq {
446         NextReqType type; // one of returning, future, none
447         union { // anonymous union: the two members below share storage
448           HeapId    heap_id; // presumably meaningful when type is returning -- TODO confirm
449           Time      when_ready; // presumably meaningful when type is future -- TODO confirm
450         };
451       };
452
453
454       // a function that can be called to look up client information
455       using ClientInfoFunc = std::function<ClientInfo(const C&)>;
456
457
458       bool empty() const {
459         DataGuard g(data_mtx);
460         return (resv_heap.empty() || ! resv_heap.top().has_request());
461       }
462
463
464       size_t client_count() const {
465         DataGuard g(data_mtx);
466         return resv_heap.size();
467       }
468
469
470       size_t request_count() const {
471         DataGuard g(data_mtx);
472         size_t total = 0;
473         for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
474           total += i->request_count();
475         }
476         return total;
477       }
478
479
480       bool remove_by_req_filter(std::function<bool(R&&)> filter_accum,
481                                 bool visit_backwards = false) {
482         bool any_removed = false;
483         DataGuard g(data_mtx);
484         for (auto i : client_map) {
485           bool modified =
486             i.second->remove_by_req_filter(filter_accum, visit_backwards);
487           if (modified) {
488             resv_heap.adjust(*i.second);
489             limit_heap.adjust(*i.second);
490             ready_heap.adjust(*i.second);
491 #if USE_PROP_HEAP
492             prop_heap.adjust(*i.second);
493 #endif
494             any_removed = true;
495           }
496         }
497         return any_removed;
498       }
499
500
501       // use as a default value when no accumulator is provide
502       static void request_sink(R&& req) {
503         // do nothing
504       }
505
506
507       void remove_by_client(const C& client,
508                             bool reverse = false,
509                             std::function<void (R&&)> accum = request_sink) {
510         DataGuard g(data_mtx);
511
512         auto i = client_map.find(client);
513
514         if (i == client_map.end()) return;
515
516         if (reverse) {
517           for (auto j = i->second->requests.rbegin();
518                j != i->second->requests.rend();
519                ++j) {
520             accum(std::move(*j->request));
521           }
522         } else {
523           for (auto j = i->second->requests.begin();
524                j != i->second->requests.end();
525                ++j) {
526             accum(std::move(*j->request));
527           }
528         }
529
530         i->second->requests.clear();
531
532         resv_heap.adjust(*i->second);
533         limit_heap.adjust(*i->second);
534         ready_heap.adjust(*i->second);
535 #if USE_PROP_HEAP
536         prop_heap.adjust(*i->second);
537 #endif
538       }
539
540
541       uint get_heap_branching_factor() const {
542         return B;
543       }
544
545
546       friend std::ostream& operator<<(std::ostream& out,
547                                       const PriorityQueueBase& q) {
548         std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);
549
550         out << "{ PriorityQueue::";
551         for (const auto& c : q.client_map) {
552           out << "  { client:" << c.first << ", record:" << *c.second <<
553             " }";
554         }
555         if (!q.resv_heap.empty()) {
556           const auto& resv = q.resv_heap.top();
557           out << " { reservation_top:" << resv << " }";
558           const auto& ready = q.ready_heap.top();
559           out << " { ready_top:" << ready << " }";
560           const auto& limit = q.limit_heap.top();
561           out << " { limit_top:" << limit << " }";
562         } else {
563           out << " HEAPS-EMPTY";
564         }
565         out << " }";
566
567         return out;
568       }
569
570       // for debugging
571       void display_queues(std::ostream& out,
572                           bool show_res = true,
573                           bool show_lim = true,
574                           bool show_ready = true,
575                           bool show_prop = true) const {
576         auto filter = [](const ClientRec& e)->bool { return true; };
577         DataGuard g(data_mtx);
578         if (show_res) {
579           resv_heap.display_sorted(out << "RESER:", filter);
580         }
581         if (show_lim) {
582           limit_heap.display_sorted(out << "LIMIT:", filter);
583         }
584         if (show_ready) {
585           ready_heap.display_sorted(out << "READY:", filter);
586         }
587 #if USE_PROP_HEAP
588         if (show_prop) {
589           prop_heap.display_sorted(out << "PROPO:", filter);
590         }
591 #endif
592       } // display_queues
593
594
595     protected:
596
597       // The ClientCompare functor is essentially doing a precedes?
598       // operator, returning true if and only if the first parameter
599       // must precede the second parameter. If the second must precede
600       // the first, or if they are equivalent, false should be
601       // returned. The reason for this behavior is that it will be
602       // called to test if two items are out of order and if true is
603       // returned it will reverse the items. Therefore false is the
604       // default return when it doesn't matter to prevent unnecessary
605       // re-ordering.
606       //
607       // The template is supporting variations in sorting based on the
608       // heap in question and allowing these variations to be handled
609       // at compile-time.
610       //
611       // tag_field determines which tag is being used for comparison
612       //
613       // ready_opt determines how the ready flag influences the sort
614       //
615       // use_prop_delta determines whether the proportional delta is
616       // added in for comparison
617       template<double RequestTag::*tag_field,
618                ReadyOption ready_opt,
619                bool use_prop_delta>
620       struct ClientCompare {
621         bool operator()(const ClientRec& n1, const ClientRec& n2) const {
622           if (n1.has_request()) {
623             if (n2.has_request()) {
624               const auto& t1 = n1.next_request().tag;
625               const auto& t2 = n2.next_request().tag;
626               if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
627                 // if we don't care about ready or the ready values are the same
628                 if (use_prop_delta) {
629                   return (t1.*tag_field + n1.prop_delta) <
630                     (t2.*tag_field + n2.prop_delta);
631                 } else {
632                   return t1.*tag_field < t2.*tag_field;
633                 }
634               } else if (ReadyOption::raises == ready_opt) {
635                 // use_ready == true && the ready fields are different
636                 return t1.ready;
637               } else {
638                 return t2.ready;
639               }
640             } else {
641               // n1 has request but n2 does not
642               return true;
643             }
644           } else if (n2.has_request()) {
645             // n2 has request but n1 does not
646             return false;
647           } else {
648             // both have none; keep stable w false
649             return false;
650           }
651         }
652       };
653
654       ClientInfoFunc       client_info_f;
655
656       mutable std::mutex data_mtx;
657       using DataGuard = std::lock_guard<decltype(data_mtx)>;
658
659       // stable mapping between client ids and client queues
660       std::map<C,ClientRecRef> client_map;
661
662       c::IndIntruHeap<ClientRecRef,
663                       ClientRec,
664                       &ClientRec::reserv_heap_data,
665                       ClientCompare<&RequestTag::reservation,
666                                     ReadyOption::ignore,
667                                     false>,
668                       B> resv_heap;
669 #if USE_PROP_HEAP
670       c::IndIntruHeap<ClientRecRef,
671                       ClientRec,
672                       &ClientRec::prop_heap_data,
673                       ClientCompare<&RequestTag::proportion,
674                                     ReadyOption::ignore,
675                                     true>,
676                       B> prop_heap;
677 #endif
678       c::IndIntruHeap<ClientRecRef,
679                       ClientRec,
680                       &ClientRec::lim_heap_data,
681                       ClientCompare<&RequestTag::limit,
682                                     ReadyOption::lowers,
683                                     false>,
684                       B> limit_heap;
685       c::IndIntruHeap<ClientRecRef,
686                       ClientRec,
687                       &ClientRec::ready_heap_data,
688                       ClientCompare<&RequestTag::proportion,
689                                     ReadyOption::raises,
690                                     true>,
691                       B> ready_heap;
692
693       // if all reservations are met and all other requests are under
694       // limit, this will allow the request next in terms of
695       // proportion to still get issued
696       bool             allow_limit_break;
697
698       std::atomic_bool finishing;
699
700       // every request creates a tick
701       Counter tick = 0;
702
703       // performance data collection
704       size_t reserv_sched_count = 0;
705       size_t prop_sched_count = 0;
706       size_t limit_break_sched_count = 0;
707
708       Duration                  idle_age;
709       Duration                  erase_age;
710       Duration                  check_time;
711       std::deque<MarkPoint>     clean_mark_points;
712
713       // NB: All threads declared at end, so they're destructed first!
714
715       std::unique_ptr<RunEvery> cleaning_job;
716
717
718       // COMMON constructor that others feed into; we can accept three
719       // different variations of durations
720       template<typename Rep, typename Per>
721       PriorityQueueBase(ClientInfoFunc _client_info_f,
722                         std::chrono::duration<Rep,Per> _idle_age,
723                         std::chrono::duration<Rep,Per> _erase_age,
724                         std::chrono::duration<Rep,Per> _check_time,
725                         bool _allow_limit_break) :
726         client_info_f(_client_info_f),
727         allow_limit_break(_allow_limit_break),
728         finishing(false),
729         idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
730         erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
731         check_time(std::chrono::duration_cast<Duration>(_check_time))
732       {
733         assert(_erase_age >= _idle_age);
734         assert(_check_time < _idle_age);
735         cleaning_job =
736           std::unique_ptr<RunEvery>(
737             new RunEvery(check_time,
738                          std::bind(&PriorityQueueBase::do_clean, this)));
739       }
740
741
742       ~PriorityQueueBase() {
743         finishing = true;
744       }
745
746
747       // data_mtx must be held by caller
748       void do_add_request(RequestRef&& request,
749                           const C& client_id,
750                           const ReqParams& req_params,
751                           const Time time,
752                           const double cost = 0.0) {
753         ++tick;
754
755         // this pointer will help us create a reference to a shared
756         // pointer, no matter which of two codepaths we take
757         ClientRec* temp_client;
758
759         auto client_it = client_map.find(client_id);
760         if (client_map.end() != client_it) {
761           temp_client = &(*client_it->second); // address of obj of shared_ptr
762         } else {
763           ClientInfo info = client_info_f(client_id);
764           ClientRecRef client_rec =
765             std::make_shared<ClientRec>(client_id, info, tick);
766           resv_heap.push(client_rec);
767 #if USE_PROP_HEAP
768           prop_heap.push(client_rec);
769 #endif
770           limit_heap.push(client_rec);
771           ready_heap.push(client_rec);
772           client_map[client_id] = client_rec;
773           temp_client = &(*client_rec); // address of obj of shared_ptr
774         }
775
776         // for convenience, we'll create a reference to the shared pointer
777         ClientRec& client = *temp_client;
778
779         if (client.idle) {
780           // We need to do an adjustment so that idle clients compete
781           // fairly on proportional tags since those tags may have
782           // drifted from real-time. Either use the lowest existing
783           // proportion tag -- O(1) -- or the client with the lowest
784           // previous proportion tag -- O(n) where n = # clients.
785           //
786           // So we don't have to maintain a propotional queue that
787           // keeps the minimum on proportional tag alone (we're
788           // instead using a ready queue), we'll have to check each
789           // client.
790           //
791           // The alternative would be to maintain a proportional queue
792           // (define USE_PROP_TAG) and do an O(1) operation here.
793
794           // Was unable to confirm whether equality testing on
795           // std::numeric_limits<double>::max() is guaranteed, so
796           // we'll use a compile-time calculated trigger that is one
797           // third the max, which should be much larger than any
798           // expected organic value.
799           constexpr double lowest_prop_tag_trigger =
800             std::numeric_limits<double>::max() / 3.0;
801
802           double lowest_prop_tag = std::numeric_limits<double>::max();
803           for (auto const &c : client_map) {
804             // don't use ourselves (or anything else that might be
805             // listed as idle) since we're now in the map
806             if (!c.second->idle) {
807               double p;
808               // use either lowest proportion tag or previous proportion tag
809               if (c.second->has_request()) {
810                 p = c.second->next_request().tag.proportion +
811                   c.second->prop_delta;
812               } else {
813                 p = c.second->get_req_tag().proportion + c.second->prop_delta;
814               }
815
816               if (p < lowest_prop_tag) {
817                 lowest_prop_tag = p;
818               }
819             }
820           }
821
822           // if this conditional does not fire, it
823           if (lowest_prop_tag < lowest_prop_tag_trigger) {
824             client.prop_delta = lowest_prop_tag - time;
825           }
826           client.idle = false;
827         } // if this client was idle
828
829 #ifndef DO_NOT_DELAY_TAG_CALC
830         RequestTag tag(0, 0, 0, time);
831
832         if (!client.has_request()) {
833           tag = RequestTag(client.get_req_tag(),
834                            client.info,
835                            req_params,
836                            time,
837                            cost);
838
839           // copy tag to previous tag for client
840           client.update_req_tag(tag, tick);
841         }
842 #else
843         RequestTag tag(client.get_req_tag(), client.info, req_params, time, cost);
844         // copy tag to previous tag for client
845         client.update_req_tag(tag, tick);
846 #endif
847
848         client.add_request(tag, client.client, std::move(request));
849         if (1 == client.requests.size()) {
850           // NB: can the following 4 calls to adjust be changed
851           // promote? Can adding a request ever demote a client in the
852           // heaps?
853           resv_heap.adjust(client);
854           limit_heap.adjust(client);
855           ready_heap.adjust(client);
856 #if USE_PROP_HEAP
857           prop_heap.adjust(client);
858 #endif
859         }
860
861         client.cur_rho = req_params.rho;
862         client.cur_delta = req_params.delta;
863
864         resv_heap.adjust(client);
865         limit_heap.adjust(client);
866         ready_heap.adjust(client);
867 #if USE_PROP_HEAP
868         prop_heap.adjust(client);
869 #endif
870       } // add_request
871
872
873       // data_mtx should be held when called; top of heap should have
874       // a ready request
875       template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
876       void pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
877                                std::function<void(const C& client,
878                                                   RequestRef& request)> process) {
879         // gain access to data
880         ClientRec& top = heap.top();
881
882         RequestRef request = std::move(top.next_request().request);
883 #ifndef DO_NOT_DELAY_TAG_CALC
884         RequestTag tag = top.next_request().tag;
885 #endif
886
887         // pop request and adjust heaps
888         top.pop_request();
889
890 #ifndef DO_NOT_DELAY_TAG_CALC
891         if (top.has_request()) {
892           ClientReq& next_first = top.next_request();
893           next_first.tag = RequestTag(tag, top.info,
894                                       top.cur_delta, top.cur_rho,
895                                       next_first.tag.arrival);
896
897           // copy tag to previous tag for client
898           top.update_req_tag(next_first.tag, tick);
899         }
900 #endif
901
902         resv_heap.demote(top);
903         limit_heap.adjust(top);
904 #if USE_PROP_HEAP
905         prop_heap.demote(top);
906 #endif
907         ready_heap.demote(top);
908
909         // process
910         process(top.client, request);
911       } // pop_process_request
912
913
914       // data_mtx should be held when called
915       void reduce_reservation_tags(ClientRec& client) {
916         for (auto& r : client.requests) {
917           r.tag.reservation -= client.info.reservation_inv;
918
919 #ifndef DO_NOT_DELAY_TAG_CALC
920           // reduce only for front tag. because next tags' value are invalid
921           break;
922 #endif
923         }
924         // don't forget to update previous tag
925         client.prev_tag.reservation -= client.info.reservation_inv;
926         resv_heap.promote(client);
927       }
928
929
930       // data_mtx should be held when called
931       void reduce_reservation_tags(const C& client_id) {
932         auto client_it = client_map.find(client_id);
933
934         // means the client was cleaned from map; should never happen
935         // as long as cleaning times are long enough
936         assert(client_map.end() != client_it);
937         reduce_reservation_tags(*client_it->second);
938       }
939
940
941       // data_mtx should be held when called
942       NextReq do_next_request(Time now) {
943         NextReq result;
944
945         // if reservation queue is empty, all are empty (i.e., no active clients)
946         if(resv_heap.empty()) {
947           result.type = NextReqType::none;
948           return result;
949         }
950
951         // try constraint (reservation) based scheduling
952
953         auto& reserv = resv_heap.top();
954         if (reserv.has_request() &&
955             reserv.next_request().tag.reservation <= now) {
956           result.type = NextReqType::returning;
957           result.heap_id = HeapId::reservation;
958           return result;
959         }
960
961         // no existing reservations before now, so try weight-based
962         // scheduling
963
964         // all items that are within limit are eligible based on
965         // priority
966         auto limits = &limit_heap.top();
967         while (limits->has_request() &&
968                !limits->next_request().tag.ready &&
969                limits->next_request().tag.limit <= now) {
970           limits->next_request().tag.ready = true;
971           ready_heap.promote(*limits);
972           limit_heap.demote(*limits);
973
974           limits = &limit_heap.top();
975         }
976
977         auto& readys = ready_heap.top();
978         if (readys.has_request() &&
979             readys.next_request().tag.ready &&
980             readys.next_request().tag.proportion < max_tag) {
981           result.type = NextReqType::returning;
982           result.heap_id = HeapId::ready;
983           return result;
984         }
985
986         // if nothing is schedulable by reservation or
987         // proportion/weight, and if we allow limit break, try to
988         // schedule something with the lowest proportion tag or
989         // alternatively lowest reservation tag.
990         if (allow_limit_break) {
991           if (readys.has_request() &&
992               readys.next_request().tag.proportion < max_tag) {
993             result.type = NextReqType::returning;
994             result.heap_id = HeapId::ready;
995             return result;
996           } else if (reserv.has_request() &&
997                      reserv.next_request().tag.reservation < max_tag) {
998             result.type = NextReqType::returning;
999             result.heap_id = HeapId::reservation;
1000             return result;
1001           }
1002         }
1003
1004         // nothing scheduled; make sure we re-run when next
1005         // reservation item or next limited item comes up
1006
1007         Time next_call = TimeMax;
1008         if (resv_heap.top().has_request()) {
1009           next_call =
1010             min_not_0_time(next_call,
1011                            resv_heap.top().next_request().tag.reservation);
1012         }
1013         if (limit_heap.top().has_request()) {
1014           const auto& next = limit_heap.top().next_request();
1015           assert(!next.tag.ready || max_tag == next.tag.proportion);
1016           next_call = min_not_0_time(next_call, next.tag.limit);
1017         }
1018         if (next_call < TimeMax) {
1019           result.type = NextReqType::future;
1020           result.when_ready = next_call;
1021           return result;
1022         } else {
1023           result.type = NextReqType::none;
1024           return result;
1025         }
1026       } // do_next_request
1027
1028
1029       // if possible is not zero and less than current then return it;
1030       // otherwise return current; the idea is we're trying to find
1031       // the minimal time but ignoring zero
1032       static inline const Time& min_not_0_time(const Time& current,
1033                                                const Time& possible) {
1034         return TimeZero == possible ? current : std::min(current, possible);
1035       }
1036
1037
      /*
       * This is called regularly by RunEvery. Every time it's called
       * it records the current time and tick counter (a mark point) in
       * a deque. It then looks through the deque for the most recent
       * mark point that is older than erase_age, walks the client map,
       * and deletes all client records that were last used at or
       * before that mark point. Client records last used at or before
       * the most recent mark point older than idle_age are marked idle
       * instead.
       */
1046       void do_clean() {
1047         TimePoint now = std::chrono::steady_clock::now();
1048         DataGuard g(data_mtx);
1049         clean_mark_points.emplace_back(MarkPoint(now, tick));
1050
1051         // first erase the super-old client records
1052
1053         Counter erase_point = 0;
1054         auto point = clean_mark_points.front();
1055         while (point.first <= now - erase_age) {
1056           erase_point = point.second;
1057           clean_mark_points.pop_front();
1058           point = clean_mark_points.front();
1059         }
1060
1061         Counter idle_point = 0;
1062         for (auto i : clean_mark_points) {
1063           if (i.first <= now - idle_age) {
1064             idle_point = i.second;
1065           } else {
1066             break;
1067           }
1068         }
1069
1070         if (erase_point > 0 || idle_point > 0) {
1071           for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
1072             auto i2 = i++;
1073             if (erase_point && i2->second->last_tick <= erase_point) {
1074               delete_from_heaps(i2->second);
1075               client_map.erase(i2);
1076             } else if (idle_point && i2->second->last_tick <= idle_point) {
1077               i2->second->idle = true;
1078             }
1079           } // for
1080         } // if
1081       } // do_clean
1082
1083
1084       // data_mtx must be held by caller
1085       template<IndIntruHeapData ClientRec::*C1,typename C2>
1086       void delete_from_heap(ClientRecRef& client,
1087                             c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
1088         auto i = heap.rfind(client);
1089         heap.remove(i);
1090       }
1091
1092
1093       // data_mtx must be held by caller
1094       void delete_from_heaps(ClientRecRef& client) {
1095         delete_from_heap(client, resv_heap);
1096 #if USE_PROP_HEAP
1097         delete_from_heap(client, prop_heap);
1098 #endif
1099         delete_from_heap(client, limit_heap);
1100         delete_from_heap(client, ready_heap);
1101       }
1102     }; // class PriorityQueueBase
1103
1104
1105     template<typename C, typename R, uint B=2>
1106     class PullPriorityQueue : public PriorityQueueBase<C,R,B> {
1107       using super = PriorityQueueBase<C,R,B>;
1108
1109     public:
1110
1111       // When a request is pulled, this is the return type.
1112       struct PullReq {
1113         struct Retn {
1114           C                           client;
1115           typename super::RequestRef  request;
1116           PhaseType                   phase;
1117         };
1118
1119         typename super::NextReqType   type;
1120         boost::variant<Retn,Time>     data;
1121
1122         bool is_none() const { return type == super::NextReqType::none; }
1123
1124         bool is_retn() const { return type == super::NextReqType::returning; }
1125         Retn& get_retn() {
1126           return boost::get<Retn>(data);
1127         }
1128
1129         bool is_future() const { return type == super::NextReqType::future; }
1130         Time getTime() const { return boost::get<Time>(data); }
1131       };
1132
1133
1134 #ifdef PROFILE
1135       ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
1136       ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1137 #endif
1138
1139       template<typename Rep, typename Per>
1140       PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1141                         std::chrono::duration<Rep,Per> _idle_age,
1142                         std::chrono::duration<Rep,Per> _erase_age,
1143                         std::chrono::duration<Rep,Per> _check_time,
1144                         bool _allow_limit_break = false) :
1145         super(_client_info_f,
1146               _idle_age, _erase_age, _check_time,
1147               _allow_limit_break)
1148       {
1149         // empty
1150       }
1151
1152
1153       // pull convenience constructor
1154       PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1155                         bool _allow_limit_break = false) :
1156         PullPriorityQueue(_client_info_f,
1157                           std::chrono::minutes(10),
1158                           std::chrono::minutes(15),
1159                           std::chrono::minutes(6),
1160                           _allow_limit_break)
1161       {
1162         // empty
1163       }
1164
1165
1166       inline void add_request(R&& request,
1167                               const C& client_id,
1168                               const ReqParams& req_params,
1169                               double addl_cost = 0.0) {
1170         add_request(typename super::RequestRef(new R(std::move(request))),
1171                     client_id,
1172                     req_params,
1173                     get_time(),
1174                     addl_cost);
1175       }
1176
1177
1178       inline void add_request(R&& request,
1179                               const C& client_id,
1180                               double addl_cost = 0.0) {
1181         static const ReqParams null_req_params;
1182         add_request(typename super::RequestRef(new R(std::move(request))),
1183                     client_id,
1184                     null_req_params,
1185                     get_time(),
1186                     addl_cost);
1187       }
1188
1189
1190
1191       inline void add_request_time(R&& request,
1192                                    const C& client_id,
1193                                    const ReqParams& req_params,
1194                                    const Time time,
1195                                    double addl_cost = 0.0) {
1196         add_request(typename super::RequestRef(new R(std::move(request))),
1197                     client_id,
1198                     req_params,
1199                     time,
1200                     addl_cost);
1201       }
1202
1203
1204       inline void add_request(typename super::RequestRef&& request,
1205                               const C& client_id,
1206                               const ReqParams& req_params,
1207                               double addl_cost = 0.0) {
1208         add_request(request, req_params, client_id, get_time(), addl_cost);
1209       }
1210
1211
1212       inline void add_request(typename super::RequestRef&& request,
1213                               const C& client_id,
1214                               double addl_cost = 0.0) {
1215         static const ReqParams null_req_params;
1216         add_request(request, null_req_params, client_id, get_time(), addl_cost);
1217       }
1218
1219
1220       // this does the work; the versions above provide alternate interfaces
1221       void add_request(typename super::RequestRef&& request,
1222                        const C&                     client_id,
1223                        const ReqParams&             req_params,
1224                        const Time                   time,
1225                        double                       addl_cost = 0.0) {
1226         typename super::DataGuard g(this->data_mtx);
1227 #ifdef PROFILE
1228         add_request_timer.start();
1229 #endif
1230         super::do_add_request(std::move(request),
1231                               client_id,
1232                               req_params,
1233                               time,
1234                               addl_cost);
1235         // no call to schedule_request for pull version
1236 #ifdef PROFILE
1237         add_request_timer.stop();
1238 #endif
1239       }
1240
1241
1242       inline PullReq pull_request() {
1243         return pull_request(get_time());
1244       }
1245
1246
1247       PullReq pull_request(Time now) {
1248         PullReq result;
1249         typename super::DataGuard g(this->data_mtx);
1250 #ifdef PROFILE
1251         pull_request_timer.start();
1252 #endif
1253
1254         typename super::NextReq next = super::do_next_request(now);
1255         result.type = next.type;
1256         switch(next.type) {
1257         case super::NextReqType::none:
1258           return result;
1259         case super::NextReqType::future:
1260           result.data = next.when_ready;
1261           return result;
1262         case super::NextReqType::returning:
1263           // to avoid nesting, break out and let code below handle this case
1264           break;
1265         default:
1266           assert(false);
1267         }
1268
1269         // we'll only get here if we're returning an entry
1270
1271         auto process_f =
1272           [&] (PullReq& pull_result, PhaseType phase) ->
1273           std::function<void(const C&,
1274                              typename super::RequestRef&)> {
1275           return [&pull_result, phase](const C& client,
1276                                        typename super::RequestRef& request) {
1277             pull_result.data =
1278             typename PullReq::Retn{client, std::move(request), phase};
1279           };
1280         };
1281
1282         switch(next.heap_id) {
1283         case super::HeapId::reservation:
1284           super::pop_process_request(this->resv_heap,
1285                                      process_f(result, PhaseType::reservation));
1286           ++this->reserv_sched_count;
1287           break;
1288         case super::HeapId::ready:
1289           super::pop_process_request(this->ready_heap,
1290                                      process_f(result, PhaseType::priority));
1291           { // need to use retn temporarily
1292             auto& retn = boost::get<typename PullReq::Retn>(result.data);
1293             super::reduce_reservation_tags(retn.client);
1294           }
1295           ++this->prop_sched_count;
1296           break;
1297         default:
1298           assert(false);
1299         }
1300
1301 #ifdef PROFILE
1302         pull_request_timer.stop();
1303 #endif
1304         return result;
1305       } // pull_request
1306
1307
1308     protected:
1309
1310
1311       // data_mtx should be held when called; unfortunately this
1312       // function has to be repeated in both push & pull
1313       // specializations
1314       typename super::NextReq next_request() {
1315         return next_request(get_time());
1316       }
1317     }; // class PullPriorityQueue
1318
1319
1320     // PUSH version
1321     template<typename C, typename R, uint B=2>
1322     class PushPriorityQueue : public PriorityQueueBase<C,R,B> {
1323
1324     protected:
1325
1326       using super = PriorityQueueBase<C,R,B>;
1327
1328     public:
1329
1330       // a function to see whether the server can handle another request
1331       using CanHandleRequestFunc = std::function<bool(void)>;
1332
1333       // a function to submit a request to the server; the second
1334       // parameter is a callback when it's completed
1335       using HandleRequestFunc =
1336         std::function<void(const C&,typename super::RequestRef,PhaseType)>;
1337
1338     protected:
1339
1340       CanHandleRequestFunc can_handle_f;
1341       HandleRequestFunc    handle_f;
1342       // for handling timed scheduling
1343       std::mutex  sched_ahead_mtx;
1344       std::condition_variable sched_ahead_cv;
1345       Time sched_ahead_when = TimeZero;
1346
1347 #ifdef PROFILE
1348     public:
1349       ProfileTimer<std::chrono::nanoseconds> add_request_timer;
1350       ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
1351     protected:
1352 #endif
1353
1354       // NB: threads declared last, so constructed last and destructed first
1355
1356       std::thread sched_ahead_thd;
1357
1358     public:
1359
1360       // push full constructor
1361       template<typename Rep, typename Per>
1362       PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1363                         CanHandleRequestFunc _can_handle_f,
1364                         HandleRequestFunc _handle_f,
1365                         std::chrono::duration<Rep,Per> _idle_age,
1366                         std::chrono::duration<Rep,Per> _erase_age,
1367                         std::chrono::duration<Rep,Per> _check_time,
1368                         bool _allow_limit_break = false) :
1369         super(_client_info_f,
1370               _idle_age, _erase_age, _check_time,
1371               _allow_limit_break)
1372       {
1373         can_handle_f = _can_handle_f;
1374         handle_f = _handle_f;
1375         sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
1376       }
1377
1378
1379       // push convenience constructor
1380       PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
1381                         CanHandleRequestFunc _can_handle_f,
1382                         HandleRequestFunc _handle_f,
1383                         bool _allow_limit_break = false) :
1384         PushPriorityQueue(_client_info_f,
1385                           _can_handle_f,
1386                           _handle_f,
1387                           std::chrono::minutes(10),
1388                           std::chrono::minutes(15),
1389                           std::chrono::minutes(6),
1390                           _allow_limit_break)
1391       {
1392         // empty
1393       }
1394
1395
1396       ~PushPriorityQueue() {
1397         this->finishing = true;
1398         sched_ahead_cv.notify_one();
1399         sched_ahead_thd.join();
1400       }
1401
1402     public:
1403
1404       inline void add_request(R&& request,
1405                               const C& client_id,
1406                               const ReqParams& req_params,
1407                               double addl_cost = 0.0) {
1408         add_request(typename super::RequestRef(new R(std::move(request))),
1409                     client_id,
1410                     req_params,
1411                     get_time(),
1412                     addl_cost);
1413       }
1414
1415
1416       inline void add_request(typename super::RequestRef&& request,
1417                               const C& client_id,
1418                               const ReqParams& req_params,
1419                               double addl_cost = 0.0) {
1420         add_request(request, req_params, client_id, get_time(), addl_cost);
1421       }
1422
1423
1424       inline void add_request_time(const R& request,
1425                                    const C& client_id,
1426                                    const ReqParams& req_params,
1427                                    const Time time,
1428                                    double addl_cost = 0.0) {
1429         add_request(typename super::RequestRef(new R(request)),
1430                     client_id,
1431                     req_params,
1432                     time,
1433                     addl_cost);
1434       }
1435
1436
1437       void add_request(typename super::RequestRef&& request,
1438                        const C& client_id,
1439                        const ReqParams& req_params,
1440                        const Time time,
1441                        double addl_cost = 0.0) {
1442         typename super::DataGuard g(this->data_mtx);
1443 #ifdef PROFILE
1444         add_request_timer.start();
1445 #endif
1446         super::do_add_request(std::move(request),
1447                               client_id,
1448                               req_params,
1449                               time,
1450                               addl_cost);
1451         schedule_request();
1452 #ifdef PROFILE
1453         add_request_timer.stop();
1454 #endif
1455       }
1456
1457
1458       void request_completed() {
1459         typename super::DataGuard g(this->data_mtx);
1460 #ifdef PROFILE
1461         request_complete_timer.start();
1462 #endif
1463         schedule_request();
1464 #ifdef PROFILE
1465         request_complete_timer.stop();
1466 #endif
1467       }
1468
1469     protected:
1470
1471       // data_mtx should be held when called; furthermore, the heap
1472       // should not be empty and the top element of the heap should
1473       // not be already handled
1474       //
1475       // NOTE: the use of "super::ClientRec" in either the template
1476       // construct or as a parameter to submit_top_request generated
1477       // a compiler error in g++ 4.8.4, when ClientRec was
1478       // "protected" rather than "public". By g++ 6.3.1 this was not
1479       // an issue. But for backwards compatibility
1480       // PriorityQueueBase::ClientRec is public.
1481       template<typename C1,
1482                IndIntruHeapData super::ClientRec::*C2,
1483                typename C3,
1484                uint B4>
1485       C submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
1486                            PhaseType phase) {
1487         C client_result;
1488         super::pop_process_request(heap,
1489                                    [this, phase, &client_result]
1490                                    (const C& client,
1491                                     typename super::RequestRef& request) {
1492                                      client_result = client;
1493                                      handle_f(client, std::move(request), phase);
1494                                    });
1495         return client_result;
1496       }
1497
1498
1499       // data_mtx should be held when called
1500       void submit_request(typename super::HeapId heap_id) {
1501         C client;
1502         switch(heap_id) {
1503         case super::HeapId::reservation:
1504           // don't need to note client
1505           (void) submit_top_request(this->resv_heap, PhaseType::reservation);
1506           // unlike the other two cases, we do not reduce reservation
1507           // tags here
1508           ++this->reserv_sched_count;
1509           break;
1510         case super::HeapId::ready:
1511           client = submit_top_request(this->ready_heap, PhaseType::priority);
1512           super::reduce_reservation_tags(client);
1513           ++this->prop_sched_count;
1514           break;
1515         default:
1516           assert(false);
1517         }
1518       } // submit_request
1519
1520
1521       // data_mtx should be held when called; unfortunately this
1522       // function has to be repeated in both push & pull
1523       // specializations
1524       typename super::NextReq next_request() {
1525         return next_request(get_time());
1526       }
1527
1528
1529       // data_mtx should be held when called; overrides member
1530       // function in base class to add check for whether a request can
1531       // be pushed to the server
1532       typename super::NextReq next_request(Time now) {
1533         if (!can_handle_f()) {
1534           typename super::NextReq result;
1535           result.type = super::NextReqType::none;
1536           return result;
1537         } else {
1538           return super::do_next_request(now);
1539         }
1540       } // next_request
1541
1542
1543       // data_mtx should be held when called
1544       void schedule_request() {
1545         typename super::NextReq next_req = next_request();
1546         switch (next_req.type) {
1547         case super::NextReqType::none:
1548           return;
1549         case super::NextReqType::future:
1550           sched_at(next_req.when_ready);
1551           break;
1552         case super::NextReqType::returning:
1553           submit_request(next_req.heap_id);
1554           break;
1555         default:
1556           assert(false);
1557         }
1558       }
1559
1560
1561       // this is the thread that handles running schedule_request at
1562       // future times when nothing can be scheduled immediately
1563       void run_sched_ahead() {
1564         std::unique_lock<std::mutex> l(sched_ahead_mtx);
1565
1566         while (!this->finishing) {
1567           if (TimeZero == sched_ahead_when) {
1568             sched_ahead_cv.wait(l);
1569           } else {
1570             Time now;
1571             while (!this->finishing && (now = get_time()) < sched_ahead_when) {
1572               long microseconds_l = long(1 + 1000000 * (sched_ahead_when - now));
1573               auto microseconds = std::chrono::microseconds(microseconds_l);
1574               sched_ahead_cv.wait_for(l, microseconds);
1575             }
1576             sched_ahead_when = TimeZero;
1577             if (this->finishing) return;
1578
1579             l.unlock();
1580             if (!this->finishing) {
1581               typename super::DataGuard g(this->data_mtx);
1582               schedule_request();
1583             }
1584             l.lock();
1585           }
1586         }
1587       }
1588
1589
1590       void sched_at(Time when) {
1591         std::lock_guard<std::mutex> l(sched_ahead_mtx);
1592         if (this->finishing) return;
1593         if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
1594           sched_ahead_when = when;
1595           sched_ahead_cv.notify_one();
1596         }
1597       }
1598     }; // class PushPriorityQueue
1599
1600   } // namespace dmclock
1601 } // namespace crimson