--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2017 Red Hat Inc.
+ */
+
+
+#pragma once
+
+#include <map>
+#include <deque>
+#include <chrono>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+
+#include "run_every.h"
+#include "dmclock_util.h"
+#include "dmclock_recs.h"
+
+
+namespace crimson {
+ namespace dmclock {
+ struct ServerInfo {
+ Counter delta_prev_req;
+ Counter rho_prev_req;
+ uint32_t my_delta;
+ uint32_t my_rho;
+
+ ServerInfo(Counter _delta_prev_req,
+ Counter _rho_prev_req) :
+ delta_prev_req(_delta_prev_req),
+ rho_prev_req(_rho_prev_req),
+ my_delta(0),
+ my_rho(0)
+ {
+ // empty
+ }
+
+ inline void req_update(Counter delta, Counter rho) {
+ delta_prev_req = delta;
+ rho_prev_req = rho;
+ my_delta = 0;
+ my_rho = 0;
+ }
+
+ inline void resp_update(PhaseType phase) {
+ ++my_delta;
+ if (phase == PhaseType::reservation) ++my_rho;
+ }
+ };
+
+
+ // S is server identifier type
+ template<typename S>
+ class ServiceTracker {
+ // we don't want to include gtest.h just for FRIEND_TEST
+ friend class dmclock_client_server_erase_Test;
+
+ using TimePoint = decltype(std::chrono::steady_clock::now());
+ using Duration = std::chrono::milliseconds;
+ using MarkPoint = std::pair<TimePoint,Counter>;
+
+ Counter delta_counter; // # reqs completed
+ Counter rho_counter; // # reqs completed via reservation
+ std::map<S,ServerInfo> server_map;
+ mutable std::mutex data_mtx; // protects Counters and map
+
+ using DataGuard = std::lock_guard<decltype(data_mtx)>;
+
+ // clean config
+
+ std::deque<MarkPoint> clean_mark_points;
+ Duration clean_age; // age at which ServerInfo cleaned
+
+ // NB: All threads declared at end, so they're destructed firs!
+
+ std::unique_ptr<RunEvery> cleaning_job;
+
+
+ public:
+
+ // we have to start the counters at 1, as 0 is used in the
+ // cleaning process
+ template<typename Rep, typename Per>
+ ServiceTracker(std::chrono::duration<Rep,Per> _clean_every,
+ std::chrono::duration<Rep,Per> _clean_age) :
+ delta_counter(1),
+ rho_counter(1),
+ clean_age(std::chrono::duration_cast<Duration>(_clean_age))
+ {
+ cleaning_job =
+ std::unique_ptr<RunEvery>(
+ new RunEvery(_clean_every,
+ std::bind(&ServiceTracker::do_clean, this)));
+ }
+
+
+ // the reason we're overloading the constructor rather than
+ // using default values for the arguments is so that callers
+ // have to either use all defaults or specify all timings; with
+ // default arguments they could specify some without others
+ ServiceTracker() :
+ ServiceTracker(std::chrono::minutes(5), std::chrono::minutes(10))
+ {
+ // empty
+ }
+
+
+ /*
+ * Incorporates the RespParams received into the various counter.
+ */
+ void track_resp(const S& server_id, const PhaseType& phase) {
+ DataGuard g(data_mtx);
+
+ auto it = server_map.find(server_id);
+ if (server_map.end() == it) {
+ // this code can only run if a request did not precede the
+ // response or if the record was cleaned up b/w when
+ // the request was made and now
+ ServerInfo si(delta_counter, rho_counter);
+ si.resp_update(phase);
+ server_map.emplace(server_id, si);
+ } else {
+ it->second.resp_update(phase);
+ }
+
+ ++delta_counter;
+ if (PhaseType::reservation == phase) {
+ ++rho_counter;
+ }
+ }
+
+
+ /*
+ * Returns the ReqParams for the given server.
+ */
+ ReqParams get_req_params(const S& server) {
+ DataGuard g(data_mtx);
+ auto it = server_map.find(server);
+ if (server_map.end() == it) {
+ server_map.emplace(server, ServerInfo(delta_counter, rho_counter));
+ return ReqParams(1, 1);
+ } else {
+ Counter delta =
+ 1 + delta_counter - it->second.delta_prev_req - it->second.my_delta;
+ Counter rho =
+ 1 + rho_counter - it->second.rho_prev_req - it->second.my_rho;
+
+ it->second.req_update(delta_counter, rho_counter);
+
+ return ReqParams(uint32_t(delta), uint32_t(rho));
+ }
+ }
+
+ private:
+
+ /*
+ * This is being called regularly by RunEvery. Every time it's
+ * called it notes the time and delta counter (mark point) in a
+ * deque. It also looks at the deque to find the most recent
+ * mark point that is older than clean_age. It then walks the
+ * map and delete all server entries that were last used before
+ * that mark point.
+ */
+ void do_clean() {
+ TimePoint now = std::chrono::steady_clock::now();
+ DataGuard g(data_mtx);
+ clean_mark_points.emplace_back(MarkPoint(now, delta_counter));
+
+ Counter earliest = 0;
+ auto point = clean_mark_points.front();
+ while (point.first <= now - clean_age) {
+ earliest = point.second;
+ clean_mark_points.pop_front();
+ point = clean_mark_points.front();
+ }
+
+ if (earliest > 0) {
+ for (auto i = server_map.begin();
+ i != server_map.end();
+ /* empty */) {
+ auto i2 = i++;
+ if (i2->second.delta_prev_req <= earliest) {
+ server_map.erase(i2);
+ }
+ }
+ }
+ } // do_clean
+ }; // class ServiceTracker
+ }
+}