// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Copyright (C) 2017 Red Hat Inc. */ #pragma once #include #include #include #include #include #include #include "run_every.h" #include "dmclock_util.h" #include "dmclock_recs.h" namespace crimson { namespace dmclock { struct ServerInfo { Counter delta_prev_req; Counter rho_prev_req; uint32_t my_delta; uint32_t my_rho; ServerInfo(Counter _delta_prev_req, Counter _rho_prev_req) : delta_prev_req(_delta_prev_req), rho_prev_req(_rho_prev_req), my_delta(0), my_rho(0) { // empty } inline void req_update(Counter delta, Counter rho) { delta_prev_req = delta; rho_prev_req = rho; my_delta = 0; my_rho = 0; } inline void resp_update(PhaseType phase) { ++my_delta; if (phase == PhaseType::reservation) ++my_rho; } }; // S is server identifier type template class ServiceTracker { // we don't want to include gtest.h just for FRIEND_TEST friend class dmclock_client_server_erase_Test; using TimePoint = decltype(std::chrono::steady_clock::now()); using Duration = std::chrono::milliseconds; using MarkPoint = std::pair; Counter delta_counter; // # reqs completed Counter rho_counter; // # reqs completed via reservation std::map server_map; mutable std::mutex data_mtx; // protects Counters and map using DataGuard = std::lock_guard; // clean config std::deque clean_mark_points; Duration clean_age; // age at which ServerInfo cleaned // NB: All threads declared at end, so they're destructed firs! std::unique_ptr cleaning_job; public: // we have to start the counters at 1, as 0 is used in the // cleaning process template ServiceTracker(std::chrono::duration _clean_every, std::chrono::duration _clean_age) : delta_counter(1), rho_counter(1), clean_age(std::chrono::duration_cast(_clean_age)) { cleaning_job = std::unique_ptr( new RunEvery(_clean_every, std::bind(&ServiceTracker::do_clean, this))); } // the reason we're overloading the constructor rather than // using default values for the arguments is so that callers // have to either use all defaults or specify all timings; with // default arguments they could specify some without others ServiceTracker() : ServiceTracker(std::chrono::minutes(5), std::chrono::minutes(10)) { // empty } /* * Incorporates the RespParams received into the various counter. */ void track_resp(const S& server_id, const PhaseType& phase) { DataGuard g(data_mtx); auto it = server_map.find(server_id); if (server_map.end() == it) { // this code can only run if a request did not precede the // response or if the record was cleaned up b/w when // the request was made and now ServerInfo si(delta_counter, rho_counter); si.resp_update(phase); server_map.emplace(server_id, si); } else { it->second.resp_update(phase); } ++delta_counter; if (PhaseType::reservation == phase) { ++rho_counter; } } /* * Returns the ReqParams for the given server. */ ReqParams get_req_params(const S& server) { DataGuard g(data_mtx); auto it = server_map.find(server); if (server_map.end() == it) { server_map.emplace(server, ServerInfo(delta_counter, rho_counter)); return ReqParams(1, 1); } else { Counter delta = 1 + delta_counter - it->second.delta_prev_req - it->second.my_delta; Counter rho = 1 + rho_counter - it->second.rho_prev_req - it->second.my_rho; it->second.req_update(delta_counter, rho_counter); return ReqParams(uint32_t(delta), uint32_t(rho)); } } private: /* * This is being called regularly by RunEvery. Every time it's * called it notes the time and delta counter (mark point) in a * deque. It also looks at the deque to find the most recent * mark point that is older than clean_age. It then walks the * map and delete all server entries that were last used before * that mark point. */ void do_clean() { TimePoint now = std::chrono::steady_clock::now(); DataGuard g(data_mtx); clean_mark_points.emplace_back(MarkPoint(now, delta_counter)); Counter earliest = 0; auto point = clean_mark_points.front(); while (point.first <= now - clean_age) { earliest = point.second; clean_mark_points.pop_front(); point = clean_mark_points.front(); } if (earliest > 0) { for (auto i = server_map.begin(); i != server_map.end(); /* empty */) { auto i2 = i++; if (i2->second.delta_prev_req <= earliest) { server_map.erase(i2); } } } } // do_clean }; // class ServiceTracker } }