remove ceph code
[stor4nfv.git] / src / ceph / src / dmclock / src / dmclock_server.h
diff --git a/src/ceph/src/dmclock/src/dmclock_server.h b/src/ceph/src/dmclock/src/dmclock_server.h
deleted file mode 100644 (file)
index 2c9940d..0000000
+++ /dev/null
@@ -1,1601 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-/*
- * Copyright (C) 2017 Red Hat Inc.
- */
-
-
-#pragma once
-
-/* COMPILATION OPTIONS
- *
- * By default we include an optimization over the originally published
- * dmclock algorithm using not the values of rho and delta that were
- * sent in with a request but instead the most recent rho and delta
- * values from the requests's client. To restore the algorithm's
- * original behavior, define DO_NOT_DELAY_TAG_CALC (i.e., compiler
- * argument -DDO_NOT_DELAY_TAG_CALC).
- *
- * The prop_heap does not seem to be necessary. The only thing it
- * would help with is quickly finding the mininum proportion/prioity
- * when an idle client became active. To have the code maintain the
- * proportional heap, define USE_PROP_HEAP (i.e., compiler argument
- * -DUSE_PROP_HEAP).
- */
-
-#include <assert.h>
-
-#include <cmath>
-#include <memory>
-#include <map>
-#include <deque>
-#include <queue>
-#include <atomic>
-#include <mutex>
-#include <condition_variable>
-#include <thread>
-#include <iostream>
-#include <sstream>
-#include <limits>
-
-#include <boost/variant.hpp>
-
-#include "indirect_intrusive_heap.h"
-#include "run_every.h"
-#include "dmclock_util.h"
-#include "dmclock_recs.h"
-
-#ifdef PROFILE
-#include "profile.h"
-#endif
-
-
-namespace crimson {
-
-  namespace dmclock {
-
-    namespace c = crimson;
-
-    constexpr double max_tag = std::numeric_limits<double>::is_iec559 ?
-      std::numeric_limits<double>::infinity() :
-      std::numeric_limits<double>::max();
-    constexpr double min_tag = std::numeric_limits<double>::is_iec559 ?
-      -std::numeric_limits<double>::infinity() :
-      std::numeric_limits<double>::lowest();
-    constexpr uint tag_modulo = 1000000;
-
-    struct ClientInfo {
-      const double reservation;  // minimum
-      const double weight;       // proportional
-      const double limit;        // maximum
-
-      // multiplicative inverses of above, which we use in calculations
-      // and don't want to recalculate repeatedly
-      const double reservation_inv;
-      const double weight_inv;
-      const double limit_inv;
-
-      // order parameters -- min, "normal", max
-      ClientInfo(double _reservation, double _weight, double _limit) :
-       reservation(_reservation),
-       weight(_weight),
-       limit(_limit),
-       reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
-       weight_inv(     0.0 == weight      ? 0.0 : 1.0 / weight),
-       limit_inv(      0.0 == limit       ? 0.0 : 1.0 / limit)
-      {
-       // empty
-      }
-
-
-      friend std::ostream& operator<<(std::ostream& out,
-                                     const ClientInfo& client) {
-       out <<
-         "{ ClientInfo:: r:" << client.reservation <<
-         " w:" << std::fixed << client.weight <<
-         " l:" << std::fixed << client.limit <<
-         " 1/r:" << std::fixed << client.reservation_inv <<
-         " 1/w:" << std::fixed << client.weight_inv <<
-         " 1/l:" << std::fixed << client.limit_inv <<
-         " }";
-       return out;
-      }
-    }; // class ClientInfo
-
-
-    struct RequestTag {
-      double reservation;
-      double proportion;
-      double limit;
-      bool   ready; // true when within limit
-#ifndef DO_NOT_DELAY_TAG_CALC
-      Time   arrival;
-#endif
-
-      RequestTag(const RequestTag& prev_tag,
-                const ClientInfo& client,
-                const uint32_t delta,
-                const uint32_t rho,
-                const Time time,
-                const double cost = 0.0) :
-       reservation(cost + tag_calc(time,
-                                   prev_tag.reservation,
-                                   client.reservation_inv,
-                                   rho,
-                                   true)),
-       proportion(tag_calc(time,
-                           prev_tag.proportion,
-                           client.weight_inv,
-                           delta,
-                           true)),
-       limit(tag_calc(time,
-                      prev_tag.limit,
-                      client.limit_inv,
-                      delta,
-                      false)),
-       ready(false)
-#ifndef DO_NOT_DELAY_TAG_CALC
-       , arrival(time)
-#endif
-      {
-       assert(reservation < max_tag || proportion < max_tag);
-      }
-
-      RequestTag(const RequestTag& prev_tag,
-                const ClientInfo& client,
-                const ReqParams req_params,
-                const Time time,
-                const double cost = 0.0) :
-       RequestTag(prev_tag, client, req_params.delta, req_params.rho, time, cost)
-      { /* empty */ }
-
-      RequestTag(double _res, double _prop, double _lim, const Time _arrival) :
-       reservation(_res),
-       proportion(_prop),
-       limit(_lim),
-       ready(false)
-#ifndef DO_NOT_DELAY_TAG_CALC
-       , arrival(_arrival)
-#endif
-      {
-       assert(reservation < max_tag || proportion < max_tag);
-      }
-
-      RequestTag(const RequestTag& other) :
-       reservation(other.reservation),
-       proportion(other.proportion),
-       limit(other.limit),
-       ready(other.ready)
-#ifndef DO_NOT_DELAY_TAG_CALC
-       , arrival(other.arrival)
-#endif
-      {
-       // empty
-      }
-
-      static std::string format_tag_change(double before, double after) {
-       if (before == after) {
-         return std::string("same");
-       } else {
-         std::stringstream ss;
-         ss << format_tag(before) << "=>" << format_tag(after);
-         return ss.str();
-       }
-      }
-
-      static std::string format_tag(double value) {
-       if (max_tag == value) {
-         return std::string("max");
-       } else if (min_tag == value) {
-         return std::string("min");
-       } else {
-         return format_time(value, tag_modulo);
-       }
-      }
-
-    private:
-
-      static double tag_calc(const Time time,
-                            double prev,
-                            double increment,
-                            uint32_t dist_req_val,
-                            bool extreme_is_high) {
-       if (0.0 == increment) {
-         return extreme_is_high ? max_tag : min_tag;
-       } else {
-         if (0 != dist_req_val) {
-           increment *= dist_req_val;
-         }
-         return std::max(time, prev + increment);
-       }
-      }
-
-      friend std::ostream& operator<<(std::ostream& out,
-                                     const RequestTag& tag) {
-       out <<
-         "{ RequestTag:: ready:" << (tag.ready ? "true" : "false") <<
-         " r:" << format_tag(tag.reservation) <<
-         " p:" << format_tag(tag.proportion) <<
-         " l:" << format_tag(tag.limit) <<
-#if 0 // try to resolve this to make sure Time is operator<<'able.
-#ifndef DO_NOT_DELAY_TAG_CALC
-         " arrival:" << tag.arrival <<
-#endif
-#endif
-         " }";
-       return out;
-      }
-    }; // class RequestTag
-
-
-    // C is client identifier type, R is request type, B is heap
-    // branching factor
-    template<typename C, typename R, uint B>
-    class PriorityQueueBase {
-      // we don't want to include gtest.h just for FRIEND_TEST
-      friend class dmclock_server_client_idle_erase_Test;
-
-    public:
-
-      using RequestRef = std::unique_ptr<R>;
-
-    protected:
-
-      using TimePoint = decltype(std::chrono::steady_clock::now());
-      using Duration = std::chrono::milliseconds;
-      using MarkPoint = std::pair<TimePoint,Counter>;
-
-      enum class ReadyOption {ignore, lowers, raises};
-
-      // forward decl for friend decls
-      template<double RequestTag::*, ReadyOption, bool>
-      struct ClientCompare;
-
-      class ClientReq {
-       friend PriorityQueueBase;
-
-       RequestTag tag;
-       C          client_id;
-       RequestRef request;
-
-      public:
-
-       ClientReq(const RequestTag& _tag,
-                 const C&          _client_id,
-                 RequestRef&&      _request) :
-         tag(_tag),
-         client_id(_client_id),
-         request(std::move(_request))
-       {
-         // empty
-       }
-
-       friend std::ostream& operator<<(std::ostream& out, const ClientReq& c) {
-         out << "{ ClientReq:: tag:" << c.tag << " client:" <<
-           c.client_id << " }";
-         return out;
-       }
-      }; // class ClientReq
-
-    public:
-
-      // NOTE: ClientRec is in the "public" section for compatibility
-      // with g++ 4.8.4, which complains if it's not. By g++ 6.3.1
-      // ClientRec could be "protected" with no issue. [See comments
-      // associated with function submit_top_request.]
-      class ClientRec {
-       friend PriorityQueueBase<C,R,B>;
-
-       C                     client;
-       RequestTag            prev_tag;
-       std::deque<ClientReq> requests;
-
-       // amount added from the proportion tag as a result of
-       // an idle client becoming unidle
-       double                prop_delta = 0.0;
-
-       c::IndIntruHeapData   reserv_heap_data;
-       c::IndIntruHeapData   lim_heap_data;
-       c::IndIntruHeapData   ready_heap_data;
-#if USE_PROP_HEAP
-       c::IndIntruHeapData   prop_heap_data;
-#endif
-
-      public:
-
-       ClientInfo            info;
-       bool                  idle;
-       Counter               last_tick;
-       uint32_t              cur_rho;
-       uint32_t              cur_delta;
-
-       ClientRec(C _client,
-                 const ClientInfo& _info,
-                 Counter current_tick) :
-         client(_client),
-         prev_tag(0.0, 0.0, 0.0, TimeZero),
-         info(_info),
-         idle(true),
-         last_tick(current_tick),
-         cur_rho(1),
-         cur_delta(1)
-       {
-         // empty
-       }
-
-       inline const RequestTag& get_req_tag() const {
-         return prev_tag;
-       }
-
-       static inline void assign_unpinned_tag(double& lhs, const double rhs) {
-         if (rhs != max_tag && rhs != min_tag) {
-           lhs = rhs;
-         }
-       }
-
-       inline void update_req_tag(const RequestTag& _prev,
-                                  const Counter& _tick) {
-         assign_unpinned_tag(prev_tag.reservation, _prev.reservation);
-         assign_unpinned_tag(prev_tag.limit, _prev.limit);
-         assign_unpinned_tag(prev_tag.proportion, _prev.proportion);
-         last_tick = _tick;
-       }
-
-       inline void add_request(const RequestTag& tag,
-                               const C&          client_id,
-                               RequestRef&&      request) {
-         requests.emplace_back(ClientReq(tag, client_id, std::move(request)));
-       }
-
-       inline const ClientReq& next_request() const {
-         return requests.front();
-       }
-
-       inline ClientReq& next_request() {
-         return requests.front();
-       }
-
-       inline void pop_request() {
-         requests.pop_front();
-       }
-
-       inline bool has_request() const {
-         return !requests.empty();
-       }
-
-       inline size_t request_count() const {
-         return requests.size();
-       }
-
-       // NB: because a deque is the underlying structure, this
-       // operation might be expensive
-       bool remove_by_req_filter_fw(std::function<bool(R&&)> filter_accum) {
-         bool any_removed = false;
-         for (auto i = requests.begin();
-              i != requests.end();
-              /* no inc */) {
-           if (filter_accum(std::move(*i->request))) {
-             any_removed = true;
-             i = requests.erase(i);
-           } else {
-             ++i;
-           }
-         }
-         return any_removed;
-       }
-
-       // NB: because a deque is the underlying structure, this
-       // operation might be expensive
-       bool remove_by_req_filter_bw(std::function<bool(R&&)> filter_accum) {
-         bool any_removed = false;
-         for (auto i = requests.rbegin();
-              i != requests.rend();
-              /* no inc */) {
-           if (filter_accum(std::move(*i->request))) {
-             any_removed = true;
-             i = decltype(i){ requests.erase(std::next(i).base()) };
-           } else {
-             ++i;
-           }
-         }
-         return any_removed;
-       }
-
-       inline bool
-       remove_by_req_filter(std::function<bool(R&&)> filter_accum,
-                            bool visit_backwards) {
-         if (visit_backwards) {
-           return remove_by_req_filter_bw(filter_accum);
-         } else {
-           return remove_by_req_filter_fw(filter_accum);
-         }
-       }
-
-       friend std::ostream&
-       operator<<(std::ostream& out,
-                  const typename PriorityQueueBase<C,R,B>::ClientRec& e) {
-         out << "{ ClientRec::" <<
-           " client:" << e.client <<
-           " prev_tag:" << e.prev_tag <<
-           " req_count:" << e.requests.size() <<
-           " top_req:";
-         if (e.has_request()) {
-           out << e.next_request();
-         } else {
-           out << "none";
-         }
-         out << " }";
-
-         return out;
-       }
-      }; // class ClientRec
-
-      using ClientRecRef = std::shared_ptr<ClientRec>;
-
-      // when we try to get the next request, we'll be in one of three
-      // situations -- we'll have one to return, have one that can
-      // fire in the future, or not have any
-      enum class NextReqType { returning, future, none };
-
-      // specifies which queue next request will get popped from
-      enum class HeapId { reservation, ready };
-
-      // this is returned from next_req to tell the caller the situation
-      struct NextReq {
-       NextReqType type;
-       union {
-         HeapId    heap_id;
-         Time      when_ready;
-       };
-      };
-
-
-      // a function that can be called to look up client information
-      using ClientInfoFunc = std::function<ClientInfo(const C&)>;
-
-
-      bool empty() const {
-       DataGuard g(data_mtx);
-       return (resv_heap.empty() || ! resv_heap.top().has_request());
-      }
-
-
-      size_t client_count() const {
-       DataGuard g(data_mtx);
-       return resv_heap.size();
-      }
-
-
-      size_t request_count() const {
-       DataGuard g(data_mtx);
-       size_t total = 0;
-       for (auto i = resv_heap.cbegin(); i != resv_heap.cend(); ++i) {
-         total += i->request_count();
-       }
-       return total;
-      }
-
-
-      bool remove_by_req_filter(std::function<bool(R&&)> filter_accum,
-                               bool visit_backwards = false) {
-       bool any_removed = false;
-       DataGuard g(data_mtx);
-       for (auto i : client_map) {
-         bool modified =
-           i.second->remove_by_req_filter(filter_accum, visit_backwards);
-         if (modified) {
-           resv_heap.adjust(*i.second);
-           limit_heap.adjust(*i.second);
-           ready_heap.adjust(*i.second);
-#if USE_PROP_HEAP
-           prop_heap.adjust(*i.second);
-#endif
-           any_removed = true;
-         }
-       }
-       return any_removed;
-      }
-
-
-      // use as a default value when no accumulator is provide
-      static void request_sink(R&& req) {
-       // do nothing
-      }
-
-
-      void remove_by_client(const C& client,
-                           bool reverse = false,
-                           std::function<void (R&&)> accum = request_sink) {
-       DataGuard g(data_mtx);
-
-       auto i = client_map.find(client);
-
-       if (i == client_map.end()) return;
-
-       if (reverse) {
-         for (auto j = i->second->requests.rbegin();
-              j != i->second->requests.rend();
-              ++j) {
-           accum(std::move(*j->request));
-         }
-       } else {
-         for (auto j = i->second->requests.begin();
-              j != i->second->requests.end();
-              ++j) {
-           accum(std::move(*j->request));
-         }
-       }
-
-       i->second->requests.clear();
-
-       resv_heap.adjust(*i->second);
-       limit_heap.adjust(*i->second);
-       ready_heap.adjust(*i->second);
-#if USE_PROP_HEAP
-       prop_heap.adjust(*i->second);
-#endif
-      }
-
-
-      uint get_heap_branching_factor() const {
-       return B;
-      }
-
-
-      friend std::ostream& operator<<(std::ostream& out,
-                                     const PriorityQueueBase& q) {
-       std::lock_guard<decltype(q.data_mtx)> guard(q.data_mtx);
-
-       out << "{ PriorityQueue::";
-       for (const auto& c : q.client_map) {
-         out << "  { client:" << c.first << ", record:" << *c.second <<
-           " }";
-       }
-       if (!q.resv_heap.empty()) {
-         const auto& resv = q.resv_heap.top();
-         out << " { reservation_top:" << resv << " }";
-         const auto& ready = q.ready_heap.top();
-         out << " { ready_top:" << ready << " }";
-         const auto& limit = q.limit_heap.top();
-         out << " { limit_top:" << limit << " }";
-       } else {
-         out << " HEAPS-EMPTY";
-       }
-       out << " }";
-
-       return out;
-      }
-
-      // for debugging
-      void display_queues(std::ostream& out,
-                         bool show_res = true,
-                         bool show_lim = true,
-                         bool show_ready = true,
-                         bool show_prop = true) const {
-       auto filter = [](const ClientRec& e)->bool { return true; };
-       DataGuard g(data_mtx);
-       if (show_res) {
-         resv_heap.display_sorted(out << "RESER:", filter);
-       }
-       if (show_lim) {
-         limit_heap.display_sorted(out << "LIMIT:", filter);
-       }
-       if (show_ready) {
-         ready_heap.display_sorted(out << "READY:", filter);
-       }
-#if USE_PROP_HEAP
-       if (show_prop) {
-         prop_heap.display_sorted(out << "PROPO:", filter);
-       }
-#endif
-      } // display_queues
-
-
-    protected:
-
-      // The ClientCompare functor is essentially doing a precedes?
-      // operator, returning true if and only if the first parameter
-      // must precede the second parameter. If the second must precede
-      // the first, or if they are equivalent, false should be
-      // returned. The reason for this behavior is that it will be
-      // called to test if two items are out of order and if true is
-      // returned it will reverse the items. Therefore false is the
-      // default return when it doesn't matter to prevent unnecessary
-      // re-ordering.
-      //
-      // The template is supporting variations in sorting based on the
-      // heap in question and allowing these variations to be handled
-      // at compile-time.
-      //
-      // tag_field determines which tag is being used for comparison
-      //
-      // ready_opt determines how the ready flag influences the sort
-      //
-      // use_prop_delta determines whether the proportional delta is
-      // added in for comparison
-      template<double RequestTag::*tag_field,
-              ReadyOption ready_opt,
-              bool use_prop_delta>
-      struct ClientCompare {
-       bool operator()(const ClientRec& n1, const ClientRec& n2) const {
-         if (n1.has_request()) {
-           if (n2.has_request()) {
-             const auto& t1 = n1.next_request().tag;
-             const auto& t2 = n2.next_request().tag;
-             if (ReadyOption::ignore == ready_opt || t1.ready == t2.ready) {
-               // if we don't care about ready or the ready values are the same
-               if (use_prop_delta) {
-                 return (t1.*tag_field + n1.prop_delta) <
-                   (t2.*tag_field + n2.prop_delta);
-               } else {
-                 return t1.*tag_field < t2.*tag_field;
-               }
-             } else if (ReadyOption::raises == ready_opt) {
-               // use_ready == true && the ready fields are different
-               return t1.ready;
-             } else {
-               return t2.ready;
-             }
-           } else {
-             // n1 has request but n2 does not
-             return true;
-           }
-         } else if (n2.has_request()) {
-           // n2 has request but n1 does not
-           return false;
-         } else {
-           // both have none; keep stable w false
-           return false;
-         }
-       }
-      };
-
-      ClientInfoFunc       client_info_f;
-
-      mutable std::mutex data_mtx;
-      using DataGuard = std::lock_guard<decltype(data_mtx)>;
-
-      // stable mapping between client ids and client queues
-      std::map<C,ClientRecRef> client_map;
-
-      c::IndIntruHeap<ClientRecRef,
-                     ClientRec,
-                     &ClientRec::reserv_heap_data,
-                     ClientCompare<&RequestTag::reservation,
-                                   ReadyOption::ignore,
-                                   false>,
-                     B> resv_heap;
-#if USE_PROP_HEAP
-      c::IndIntruHeap<ClientRecRef,
-                     ClientRec,
-                     &ClientRec::prop_heap_data,
-                     ClientCompare<&RequestTag::proportion,
-                                   ReadyOption::ignore,
-                                   true>,
-                     B> prop_heap;
-#endif
-      c::IndIntruHeap<ClientRecRef,
-                     ClientRec,
-                     &ClientRec::lim_heap_data,
-                     ClientCompare<&RequestTag::limit,
-                                   ReadyOption::lowers,
-                                   false>,
-                     B> limit_heap;
-      c::IndIntruHeap<ClientRecRef,
-                     ClientRec,
-                     &ClientRec::ready_heap_data,
-                     ClientCompare<&RequestTag::proportion,
-                                   ReadyOption::raises,
-                                   true>,
-                     B> ready_heap;
-
-      // if all reservations are met and all other requestes are under
-      // limit, this will allow the request next in terms of
-      // proportion to still get issued
-      bool             allow_limit_break;
-
-      std::atomic_bool finishing;
-
-      // every request creates a tick
-      Counter tick = 0;
-
-      // performance data collection
-      size_t reserv_sched_count = 0;
-      size_t prop_sched_count = 0;
-      size_t limit_break_sched_count = 0;
-
-      Duration                  idle_age;
-      Duration                  erase_age;
-      Duration                  check_time;
-      std::deque<MarkPoint>     clean_mark_points;
-
-      // NB: All threads declared at end, so they're destructed first!
-
-      std::unique_ptr<RunEvery> cleaning_job;
-
-
-      // COMMON constructor that others feed into; we can accept three
-      // different variations of durations
-      template<typename Rep, typename Per>
-      PriorityQueueBase(ClientInfoFunc _client_info_f,
-                       std::chrono::duration<Rep,Per> _idle_age,
-                       std::chrono::duration<Rep,Per> _erase_age,
-                       std::chrono::duration<Rep,Per> _check_time,
-                       bool _allow_limit_break) :
-       client_info_f(_client_info_f),
-       allow_limit_break(_allow_limit_break),
-       finishing(false),
-       idle_age(std::chrono::duration_cast<Duration>(_idle_age)),
-       erase_age(std::chrono::duration_cast<Duration>(_erase_age)),
-       check_time(std::chrono::duration_cast<Duration>(_check_time))
-      {
-       assert(_erase_age >= _idle_age);
-       assert(_check_time < _idle_age);
-       cleaning_job =
-         std::unique_ptr<RunEvery>(
-           new RunEvery(check_time,
-                        std::bind(&PriorityQueueBase::do_clean, this)));
-      }
-
-
-      ~PriorityQueueBase() {
-       finishing = true;
-      }
-
-
-      // data_mtx must be held by caller
-      void do_add_request(RequestRef&& request,
-                         const C& client_id,
-                         const ReqParams& req_params,
-                         const Time time,
-                         const double cost = 0.0) {
-       ++tick;
-
-       // this pointer will help us create a reference to a shared
-       // pointer, no matter which of two codepaths we take
-       ClientRec* temp_client;
-
-       auto client_it = client_map.find(client_id);
-       if (client_map.end() != client_it) {
-         temp_client = &(*client_it->second); // address of obj of shared_ptr
-       } else {
-         ClientInfo info = client_info_f(client_id);
-         ClientRecRef client_rec =
-           std::make_shared<ClientRec>(client_id, info, tick);
-         resv_heap.push(client_rec);
-#if USE_PROP_HEAP
-         prop_heap.push(client_rec);
-#endif
-         limit_heap.push(client_rec);
-         ready_heap.push(client_rec);
-         client_map[client_id] = client_rec;
-         temp_client = &(*client_rec); // address of obj of shared_ptr
-       }
-
-       // for convenience, we'll create a reference to the shared pointer
-       ClientRec& client = *temp_client;
-
-       if (client.idle) {
-         // We need to do an adjustment so that idle clients compete
-         // fairly on proportional tags since those tags may have
-         // drifted from real-time. Either use the lowest existing
-         // proportion tag -- O(1) -- or the client with the lowest
-         // previous proportion tag -- O(n) where n = # clients.
-         //
-         // So we don't have to maintain a propotional queue that
-         // keeps the minimum on proportional tag alone (we're
-         // instead using a ready queue), we'll have to check each
-         // client.
-         //
-         // The alternative would be to maintain a proportional queue
-         // (define USE_PROP_TAG) and do an O(1) operation here.
-
-         // Was unable to confirm whether equality testing on
-         // std::numeric_limits<double>::max() is guaranteed, so
-         // we'll use a compile-time calculated trigger that is one
-         // third the max, which should be much larger than any
-         // expected organic value.
-         constexpr double lowest_prop_tag_trigger =
-           std::numeric_limits<double>::max() / 3.0;
-
-         double lowest_prop_tag = std::numeric_limits<double>::max();
-         for (auto const &c : client_map) {
-           // don't use ourselves (or anything else that might be
-           // listed as idle) since we're now in the map
-           if (!c.second->idle) {
-             double p;
-             // use either lowest proportion tag or previous proportion tag
-             if (c.second->has_request()) {
-               p = c.second->next_request().tag.proportion +
-                 c.second->prop_delta;
-             } else {
-               p = c.second->get_req_tag().proportion + c.second->prop_delta;
-             }
-
-             if (p < lowest_prop_tag) {
-               lowest_prop_tag = p;
-             }
-           }
-         }
-
-         // if this conditional does not fire, it
-         if (lowest_prop_tag < lowest_prop_tag_trigger) {
-           client.prop_delta = lowest_prop_tag - time;
-         }
-         client.idle = false;
-       } // if this client was idle
-
-#ifndef DO_NOT_DELAY_TAG_CALC
-       RequestTag tag(0, 0, 0, time);
-
-       if (!client.has_request()) {
-         tag = RequestTag(client.get_req_tag(),
-                          client.info,
-                          req_params,
-                          time,
-                          cost);
-
-         // copy tag to previous tag for client
-         client.update_req_tag(tag, tick);
-       }
-#else
-       RequestTag tag(client.get_req_tag(), client.info, req_params, time, cost);
-       // copy tag to previous tag for client
-       client.update_req_tag(tag, tick);
-#endif
-
-       client.add_request(tag, client.client, std::move(request));
-       if (1 == client.requests.size()) {
-         // NB: can the following 4 calls to adjust be changed
-         // promote? Can adding a request ever demote a client in the
-         // heaps?
-         resv_heap.adjust(client);
-         limit_heap.adjust(client);
-         ready_heap.adjust(client);
-#if USE_PROP_HEAP
-         prop_heap.adjust(client);
-#endif
-       }
-
-       client.cur_rho = req_params.rho;
-       client.cur_delta = req_params.delta;
-
-       resv_heap.adjust(client);
-       limit_heap.adjust(client);
-       ready_heap.adjust(client);
-#if USE_PROP_HEAP
-       prop_heap.adjust(client);
-#endif
-      } // add_request
-
-
-      // data_mtx should be held when called; top of heap should have
-      // a ready request
-      template<typename C1, IndIntruHeapData ClientRec::*C2, typename C3>
-      void pop_process_request(IndIntruHeap<C1, ClientRec, C2, C3, B>& heap,
-                              std::function<void(const C& client,
-                                                 RequestRef& request)> process) {
-       // gain access to data
-       ClientRec& top = heap.top();
-
-       RequestRef request = std::move(top.next_request().request);
-#ifndef DO_NOT_DELAY_TAG_CALC
-       RequestTag tag = top.next_request().tag;
-#endif
-
-       // pop request and adjust heaps
-       top.pop_request();
-
-#ifndef DO_NOT_DELAY_TAG_CALC
-       if (top.has_request()) {
-         ClientReq& next_first = top.next_request();
-         next_first.tag = RequestTag(tag, top.info,
-                                     top.cur_delta, top.cur_rho,
-                                     next_first.tag.arrival);
-
-         // copy tag to previous tag for client
-         top.update_req_tag(next_first.tag, tick);
-       }
-#endif
-
-       resv_heap.demote(top);
-       limit_heap.adjust(top);
-#if USE_PROP_HEAP
-       prop_heap.demote(top);
-#endif
-       ready_heap.demote(top);
-
-       // process
-       process(top.client, request);
-      } // pop_process_request
-
-
-      // data_mtx should be held when called
-      void reduce_reservation_tags(ClientRec& client) {
-       for (auto& r : client.requests) {
-         r.tag.reservation -= client.info.reservation_inv;
-
-#ifndef DO_NOT_DELAY_TAG_CALC
-         // reduce only for front tag. because next tags' value are invalid
-         break;
-#endif
-       }
-       // don't forget to update previous tag
-       client.prev_tag.reservation -= client.info.reservation_inv;
-       resv_heap.promote(client);
-      }
-
-
-      // data_mtx should be held when called
-      void reduce_reservation_tags(const C& client_id) {
-       auto client_it = client_map.find(client_id);
-
-       // means the client was cleaned from map; should never happen
-       // as long as cleaning times are long enough
-       assert(client_map.end() != client_it);
-       reduce_reservation_tags(*client_it->second);
-      }
-
-
-      // data_mtx should be held when called
-      NextReq do_next_request(Time now) {
-       NextReq result;
-
-       // if reservation queue is empty, all are empty (i.e., no active clients)
-       if(resv_heap.empty()) {
-         result.type = NextReqType::none;
-         return result;
-       }
-
-       // try constraint (reservation) based scheduling
-
-       auto& reserv = resv_heap.top();
-       if (reserv.has_request() &&
-           reserv.next_request().tag.reservation <= now) {
-         result.type = NextReqType::returning;
-         result.heap_id = HeapId::reservation;
-         return result;
-       }
-
-       // no existing reservations before now, so try weight-based
-       // scheduling
-
-       // all items that are within limit are eligible based on
-       // priority
-       auto limits = &limit_heap.top();
-       while (limits->has_request() &&
-              !limits->next_request().tag.ready &&
-              limits->next_request().tag.limit <= now) {
-         limits->next_request().tag.ready = true;
-         ready_heap.promote(*limits);
-         limit_heap.demote(*limits);
-
-         limits = &limit_heap.top();
-       }
-
-       auto& readys = ready_heap.top();
-       if (readys.has_request() &&
-           readys.next_request().tag.ready &&
-           readys.next_request().tag.proportion < max_tag) {
-         result.type = NextReqType::returning;
-         result.heap_id = HeapId::ready;
-         return result;
-       }
-
-       // if nothing is schedulable by reservation or
-       // proportion/weight, and if we allow limit break, try to
-       // schedule something with the lowest proportion tag or
-       // alternatively lowest reservation tag.
-       if (allow_limit_break) {
-         if (readys.has_request() &&
-             readys.next_request().tag.proportion < max_tag) {
-           result.type = NextReqType::returning;
-           result.heap_id = HeapId::ready;
-           return result;
-         } else if (reserv.has_request() &&
-                    reserv.next_request().tag.reservation < max_tag) {
-           result.type = NextReqType::returning;
-           result.heap_id = HeapId::reservation;
-           return result;
-         }
-       }
-
-       // nothing scheduled; make sure we re-run when next
-       // reservation item or next limited item comes up
-
-       Time next_call = TimeMax;
-       if (resv_heap.top().has_request()) {
-         next_call =
-           min_not_0_time(next_call,
-                          resv_heap.top().next_request().tag.reservation);
-       }
-       if (limit_heap.top().has_request()) {
-         const auto& next = limit_heap.top().next_request();
-         assert(!next.tag.ready || max_tag == next.tag.proportion);
-         next_call = min_not_0_time(next_call, next.tag.limit);
-       }
-       if (next_call < TimeMax) {
-         result.type = NextReqType::future;
-         result.when_ready = next_call;
-         return result;
-       } else {
-         result.type = NextReqType::none;
-         return result;
-       }
-      } // do_next_request
-
-
-      // if possible is not zero and less than current then return it;
-      // otherwise return current; the idea is we're trying to find
-      // the minimal time but ignoring zero
-      static inline const Time& min_not_0_time(const Time& current,
-                                              const Time& possible) {
-       return TimeZero == possible ? current : std::min(current, possible);
-      }
-
-
-      /*
-       * This is being called regularly by RunEvery. Every time it's
-       * called it notes the time and delta counter (mark point) in a
-       * deque. It also looks at the deque to find the most recent
-       * mark point that is older than clean_age. It then walks the
-       * map and delete all server entries that were last used before
-       * that mark point.
-       */
-      void do_clean() {
-       TimePoint now = std::chrono::steady_clock::now();
-       DataGuard g(data_mtx);
-       clean_mark_points.emplace_back(MarkPoint(now, tick));
-
-       // first erase the super-old client records
-
-       Counter erase_point = 0;
-       auto point = clean_mark_points.front();
-       while (point.first <= now - erase_age) {
-         erase_point = point.second;
-         clean_mark_points.pop_front();
-         point = clean_mark_points.front();
-       }
-
-       Counter idle_point = 0;
-       for (auto i : clean_mark_points) {
-         if (i.first <= now - idle_age) {
-           idle_point = i.second;
-         } else {
-           break;
-         }
-       }
-
-       if (erase_point > 0 || idle_point > 0) {
-         for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
-           auto i2 = i++;
-           if (erase_point && i2->second->last_tick <= erase_point) {
-             delete_from_heaps(i2->second);
-             client_map.erase(i2);
-           } else if (idle_point && i2->second->last_tick <= idle_point) {
-             i2->second->idle = true;
-           }
-         } // for
-       } // if
-      } // do_clean
-
-
-      // data_mtx must be held by caller
-      template<IndIntruHeapData ClientRec::*C1,typename C2>
-      void delete_from_heap(ClientRecRef& client,
-                           c::IndIntruHeap<ClientRecRef,ClientRec,C1,C2,B>& heap) {
-       auto i = heap.rfind(client);
-       heap.remove(i);
-      }
-
-
-      // data_mtx must be held by caller
-      void delete_from_heaps(ClientRecRef& client) {
-       delete_from_heap(client, resv_heap);
-#if USE_PROP_HEAP
-       delete_from_heap(client, prop_heap);
-#endif
-       delete_from_heap(client, limit_heap);
-       delete_from_heap(client, ready_heap);
-      }
-    }; // class PriorityQueueBase
-
-
-    template<typename C, typename R, uint B=2>
-    class PullPriorityQueue : public PriorityQueueBase<C,R,B> {
-      using super = PriorityQueueBase<C,R,B>;
-
-    public:
-
-      // When a request is pulled, this is the return type.
-      struct PullReq {
-       struct Retn {
-         C                           client;
-         typename super::RequestRef  request;
-         PhaseType                   phase;
-       };
-
-       typename super::NextReqType   type;
-       boost::variant<Retn,Time>     data;
-
-       bool is_none() const { return type == super::NextReqType::none; }
-
-       bool is_retn() const { return type == super::NextReqType::returning; }
-       Retn& get_retn() {
-         return boost::get<Retn>(data);
-       }
-
-       bool is_future() const { return type == super::NextReqType::future; }
-       Time getTime() const { return boost::get<Time>(data); }
-      };
-
-
-#ifdef PROFILE
-      ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
-      ProfileTimer<std::chrono::nanoseconds> add_request_timer;
-#endif
-
-      template<typename Rep, typename Per>
-      PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
-                       std::chrono::duration<Rep,Per> _idle_age,
-                       std::chrono::duration<Rep,Per> _erase_age,
-                       std::chrono::duration<Rep,Per> _check_time,
-                       bool _allow_limit_break = false) :
-       super(_client_info_f,
-             _idle_age, _erase_age, _check_time,
-             _allow_limit_break)
-      {
-       // empty
-      }
-
-
-      // pull convenience constructor
-      PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
-                       bool _allow_limit_break = false) :
-       PullPriorityQueue(_client_info_f,
-                         std::chrono::minutes(10),
-                         std::chrono::minutes(15),
-                         std::chrono::minutes(6),
-                         _allow_limit_break)
-      {
-       // empty
-      }
-
-
-      inline void add_request(R&& request,
-                             const C& client_id,
-                             const ReqParams& req_params,
-                             double addl_cost = 0.0) {
-       add_request(typename super::RequestRef(new R(std::move(request))),
-                   client_id,
-                   req_params,
-                   get_time(),
-                   addl_cost);
-      }
-
-
-      inline void add_request(R&& request,
-                             const C& client_id,
-                             double addl_cost = 0.0) {
-       static const ReqParams null_req_params;
-       add_request(typename super::RequestRef(new R(std::move(request))),
-                   client_id,
-                   null_req_params,
-                   get_time(),
-                   addl_cost);
-      }
-
-
-
-      inline void add_request_time(R&& request,
-                                  const C& client_id,
-                                  const ReqParams& req_params,
-                                  const Time time,
-                                  double addl_cost = 0.0) {
-       add_request(typename super::RequestRef(new R(std::move(request))),
-                   client_id,
-                   req_params,
-                   time,
-                   addl_cost);
-      }
-
-
-      inline void add_request(typename super::RequestRef&& request,
-                             const C& client_id,
-                             const ReqParams& req_params,
-                             double addl_cost = 0.0) {
-       add_request(request, req_params, client_id, get_time(), addl_cost);
-      }
-
-
-      inline void add_request(typename super::RequestRef&& request,
-                             const C& client_id,
-                             double addl_cost = 0.0) {
-       static const ReqParams null_req_params;
-       add_request(request, null_req_params, client_id, get_time(), addl_cost);
-      }
-
-
-      // this does the work; the versions above provide alternate interfaces
-      void add_request(typename super::RequestRef&& request,
-                      const C&                     client_id,
-                      const ReqParams&             req_params,
-                      const Time                   time,
-                      double                       addl_cost = 0.0) {
-       typename super::DataGuard g(this->data_mtx);
-#ifdef PROFILE
-       add_request_timer.start();
-#endif
-       super::do_add_request(std::move(request),
-                             client_id,
-                             req_params,
-                             time,
-                             addl_cost);
-       // no call to schedule_request for pull version
-#ifdef PROFILE
-       add_request_timer.stop();
-#endif
-      }
-
-
-      inline PullReq pull_request() {
-       return pull_request(get_time());
-      }
-
-
-      PullReq pull_request(Time now) {
-       PullReq result;
-       typename super::DataGuard g(this->data_mtx);
-#ifdef PROFILE
-       pull_request_timer.start();
-#endif
-
-       typename super::NextReq next = super::do_next_request(now);
-       result.type = next.type;
-       switch(next.type) {
-       case super::NextReqType::none:
-         return result;
-       case super::NextReqType::future:
-         result.data = next.when_ready;
-         return result;
-       case super::NextReqType::returning:
-         // to avoid nesting, break out and let code below handle this case
-         break;
-       default:
-         assert(false);
-       }
-
-       // we'll only get here if we're returning an entry
-
-       auto process_f =
-         [&] (PullReq& pull_result, PhaseType phase) ->
-         std::function<void(const C&,
-                            typename super::RequestRef&)> {
-         return [&pull_result, phase](const C& client,
-                                      typename super::RequestRef& request) {
-           pull_result.data =
-           typename PullReq::Retn{client, std::move(request), phase};
-         };
-       };
-
-       switch(next.heap_id) {
-       case super::HeapId::reservation:
-         super::pop_process_request(this->resv_heap,
-                                    process_f(result, PhaseType::reservation));
-         ++this->reserv_sched_count;
-         break;
-       case super::HeapId::ready:
-         super::pop_process_request(this->ready_heap,
-                                    process_f(result, PhaseType::priority));
-         { // need to use retn temporarily
-           auto& retn = boost::get<typename PullReq::Retn>(result.data);
-           super::reduce_reservation_tags(retn.client);
-         }
-         ++this->prop_sched_count;
-         break;
-       default:
-         assert(false);
-       }
-
-#ifdef PROFILE
-       pull_request_timer.stop();
-#endif
-       return result;
-      } // pull_request
-
-
-    protected:
-
-
-      // data_mtx should be held when called; unfortunately this
-      // function has to be repeated in both push & pull
-      // specializations
-      typename super::NextReq next_request() {
-       return next_request(get_time());
-      }
-    }; // class PullPriorityQueue
-
-
-    // PUSH version
-    template<typename C, typename R, uint B=2>
-    class PushPriorityQueue : public PriorityQueueBase<C,R,B> {
-
-    protected:
-
-      using super = PriorityQueueBase<C,R,B>;
-
-    public:
-
-      // a function to see whether the server can handle another request
-      using CanHandleRequestFunc = std::function<bool(void)>;
-
-      // a function to submit a request to the server; the second
-      // parameter is a callback when it's completed
-      using HandleRequestFunc =
-       std::function<void(const C&,typename super::RequestRef,PhaseType)>;
-
-    protected:
-
-      CanHandleRequestFunc can_handle_f;
-      HandleRequestFunc    handle_f;
-      // for handling timed scheduling
-      std::mutex  sched_ahead_mtx;
-      std::condition_variable sched_ahead_cv;
-      Time sched_ahead_when = TimeZero;
-
-#ifdef PROFILE
-    public:
-      ProfileTimer<std::chrono::nanoseconds> add_request_timer;
-      ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
-    protected:
-#endif
-
-      // NB: threads declared last, so constructed last and destructed first
-
-      std::thread sched_ahead_thd;
-
-    public:
-
-      // push full constructor
-      template<typename Rep, typename Per>
-      PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
-                       CanHandleRequestFunc _can_handle_f,
-                       HandleRequestFunc _handle_f,
-                       std::chrono::duration<Rep,Per> _idle_age,
-                       std::chrono::duration<Rep,Per> _erase_age,
-                       std::chrono::duration<Rep,Per> _check_time,
-                       bool _allow_limit_break = false) :
-       super(_client_info_f,
-             _idle_age, _erase_age, _check_time,
-             _allow_limit_break)
-      {
-       can_handle_f = _can_handle_f;
-       handle_f = _handle_f;
-       sched_ahead_thd = std::thread(&PushPriorityQueue::run_sched_ahead, this);
-      }
-
-
-      // push convenience constructor
-      PushPriorityQueue(typename super::ClientInfoFunc _client_info_f,
-                       CanHandleRequestFunc _can_handle_f,
-                       HandleRequestFunc _handle_f,
-                       bool _allow_limit_break = false) :
-       PushPriorityQueue(_client_info_f,
-                         _can_handle_f,
-                         _handle_f,
-                         std::chrono::minutes(10),
-                         std::chrono::minutes(15),
-                         std::chrono::minutes(6),
-                         _allow_limit_break)
-      {
-       // empty
-      }
-
-
-      ~PushPriorityQueue() {
-       this->finishing = true;
-       sched_ahead_cv.notify_one();
-       sched_ahead_thd.join();
-      }
-
-    public:
-
-      inline void add_request(R&& request,
-                             const C& client_id,
-                             const ReqParams& req_params,
-                             double addl_cost = 0.0) {
-       add_request(typename super::RequestRef(new R(std::move(request))),
-                   client_id,
-                   req_params,
-                   get_time(),
-                   addl_cost);
-      }
-
-
-      inline void add_request(typename super::RequestRef&& request,
-                             const C& client_id,
-                             const ReqParams& req_params,
-                             double addl_cost = 0.0) {
-       add_request(request, req_params, client_id, get_time(), addl_cost);
-      }
-
-
-      inline void add_request_time(const R& request,
-                                  const C& client_id,
-                                  const ReqParams& req_params,
-                                  const Time time,
-                                  double addl_cost = 0.0) {
-       add_request(typename super::RequestRef(new R(request)),
-                   client_id,
-                   req_params,
-                   time,
-                   addl_cost);
-      }
-
-
-      void add_request(typename super::RequestRef&& request,
-                      const C& client_id,
-                      const ReqParams& req_params,
-                      const Time time,
-                      double addl_cost = 0.0) {
-       typename super::DataGuard g(this->data_mtx);
-#ifdef PROFILE
-       add_request_timer.start();
-#endif
-       super::do_add_request(std::move(request),
-                             client_id,
-                             req_params,
-                             time,
-                             addl_cost);
-       schedule_request();
-#ifdef PROFILE
-       add_request_timer.stop();
-#endif
-      }
-
-
-      void request_completed() {
-       typename super::DataGuard g(this->data_mtx);
-#ifdef PROFILE
-       request_complete_timer.start();
-#endif
-       schedule_request();
-#ifdef PROFILE
-       request_complete_timer.stop();
-#endif
-      }
-
-    protected:
-
-      // data_mtx should be held when called; furthermore, the heap
-      // should not be empty and the top element of the heap should
-      // not be already handled
-      //
-      // NOTE: the use of "super::ClientRec" in either the template
-      // construct or as a parameter to submit_top_request generated
-      // a compiler error in g++ 4.8.4, when ClientRec was
-      // "protected" rather than "public". By g++ 6.3.1 this was not
-      // an issue. But for backwards compatibility
-      // PriorityQueueBase::ClientRec is public.
-      template<typename C1,
-              IndIntruHeapData super::ClientRec::*C2,
-              typename C3,
-              uint B4>
-      C submit_top_request(IndIntruHeap<C1,typename super::ClientRec,C2,C3,B4>& heap,
-                          PhaseType phase) {
-       C client_result;
-       super::pop_process_request(heap,
-                                  [this, phase, &client_result]
-                                  (const C& client,
-                                   typename super::RequestRef& request) {
-                                    client_result = client;
-                                    handle_f(client, std::move(request), phase);
-                                  });
-       return client_result;
-      }
-
-
-      // data_mtx should be held when called
-      void submit_request(typename super::HeapId heap_id) {
-       C client;
-       switch(heap_id) {
-       case super::HeapId::reservation:
-         // don't need to note client
-         (void) submit_top_request(this->resv_heap, PhaseType::reservation);
-         // unlike the other two cases, we do not reduce reservation
-         // tags here
-         ++this->reserv_sched_count;
-         break;
-       case super::HeapId::ready:
-         client = submit_top_request(this->ready_heap, PhaseType::priority);
-         super::reduce_reservation_tags(client);
-         ++this->prop_sched_count;
-         break;
-       default:
-         assert(false);
-       }
-      } // submit_request
-
-
-      // data_mtx should be held when called; unfortunately this
-      // function has to be repeated in both push & pull
-      // specializations
-      typename super::NextReq next_request() {
-       return next_request(get_time());
-      }
-
-
-      // data_mtx should be held when called; overrides member
-      // function in base class to add check for whether a request can
-      // be pushed to the server
-      typename super::NextReq next_request(Time now) {
-       if (!can_handle_f()) {
-         typename super::NextReq result;
-         result.type = super::NextReqType::none;
-         return result;
-       } else {
-         return super::do_next_request(now);
-       }
-      } // next_request
-
-
-      // data_mtx should be held when called
-      void schedule_request() {
-       typename super::NextReq next_req = next_request();
-       switch (next_req.type) {
-       case super::NextReqType::none:
-         return;
-       case super::NextReqType::future:
-         sched_at(next_req.when_ready);
-         break;
-       case super::NextReqType::returning:
-         submit_request(next_req.heap_id);
-         break;
-       default:
-         assert(false);
-       }
-      }
-
-
-      // this is the thread that handles running schedule_request at
-      // future times when nothing can be scheduled immediately
-      void run_sched_ahead() {
-       std::unique_lock<std::mutex> l(sched_ahead_mtx);
-
-       while (!this->finishing) {
-         if (TimeZero == sched_ahead_when) {
-           sched_ahead_cv.wait(l);
-         } else {
-           Time now;
-           while (!this->finishing && (now = get_time()) < sched_ahead_when) {
-             long microseconds_l = long(1 + 1000000 * (sched_ahead_when - now));
-             auto microseconds = std::chrono::microseconds(microseconds_l);
-             sched_ahead_cv.wait_for(l, microseconds);
-           }
-           sched_ahead_when = TimeZero;
-           if (this->finishing) return;
-
-           l.unlock();
-           if (!this->finishing) {
-             typename super::DataGuard g(this->data_mtx);
-             schedule_request();
-           }
-           l.lock();
-         }
-       }
-      }
-
-
-      void sched_at(Time when) {
-       std::lock_guard<std::mutex> l(sched_ahead_mtx);
-       if (this->finishing) return;
-       if (TimeZero == sched_ahead_when || when < sched_ahead_when) {
-         sched_ahead_when = when;
-         sched_ahead_cv.notify_one();
-       }
-      }
-    }; // class PushPriorityQueue
-
-  } // namespace dmclock
-} // namespace crimson