initial code repo
[stor4nfv.git] / src / ceph / src / dmclock / src / dmclock_client.h
diff --git a/src/ceph/src/dmclock/src/dmclock_client.h b/src/ceph/src/dmclock/src/dmclock_client.h
new file mode 100644 (file)
index 0000000..92f4cf8
--- /dev/null
@@ -0,0 +1,193 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2017 Red Hat Inc.
+ */
+
+
+#pragma once
+
+#include <map>
+#include <deque>
+#include <chrono>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+
+#include "run_every.h"
+#include "dmclock_util.h"
+#include "dmclock_recs.h"
+
+
+namespace crimson {
+  namespace dmclock {
+    struct ServerInfo {
+      Counter   delta_prev_req;
+      Counter   rho_prev_req;
+      uint32_t  my_delta;
+      uint32_t  my_rho;
+
+      ServerInfo(Counter _delta_prev_req,
+                Counter _rho_prev_req) :
+       delta_prev_req(_delta_prev_req),
+       rho_prev_req(_rho_prev_req),
+       my_delta(0),
+       my_rho(0)
+      {
+       // empty
+      }
+
+      inline void req_update(Counter delta, Counter rho) {
+       delta_prev_req = delta;
+       rho_prev_req = rho;
+       my_delta = 0;
+       my_rho = 0;
+      }
+
+      inline void resp_update(PhaseType phase) {
+       ++my_delta;
+       if (phase == PhaseType::reservation) ++my_rho;
+      }
+    };
+
+
+    // S is server identifier type
+    template<typename S>
+    class ServiceTracker {
+      // we don't want to include gtest.h just for FRIEND_TEST
+      friend class dmclock_client_server_erase_Test;
+
+      using TimePoint = decltype(std::chrono::steady_clock::now());
+      using Duration = std::chrono::milliseconds;
+      using MarkPoint = std::pair<TimePoint,Counter>;
+
+      Counter                 delta_counter; // # reqs completed
+      Counter                 rho_counter;   // # reqs completed via reservation
+      std::map<S,ServerInfo>  server_map;
+      mutable std::mutex      data_mtx;      // protects Counters and map
+
+      using DataGuard = std::lock_guard<decltype(data_mtx)>;
+
+      // clean config
+
+      std::deque<MarkPoint>     clean_mark_points;
+      Duration                  clean_age;     // age at which ServerInfo cleaned
+
+      // NB: All threads declared at end, so they're destructed firs!
+
+      std::unique_ptr<RunEvery> cleaning_job;
+
+
+    public:
+
+      // we have to start the counters at 1, as 0 is used in the
+      // cleaning process
+      template<typename Rep, typename Per>
+      ServiceTracker(std::chrono::duration<Rep,Per> _clean_every,
+                    std::chrono::duration<Rep,Per> _clean_age) :
+       delta_counter(1),
+       rho_counter(1),
+       clean_age(std::chrono::duration_cast<Duration>(_clean_age))
+      {
+       cleaning_job =
+         std::unique_ptr<RunEvery>(
+           new RunEvery(_clean_every,
+                        std::bind(&ServiceTracker::do_clean, this)));
+      }
+
+
+      // the reason we're overloading the constructor rather than
+      // using default values for the arguments is so that callers
+      // have to either use all defaults or specify all timings; with
+      // default arguments they could specify some without others
+      ServiceTracker() :
+       ServiceTracker(std::chrono::minutes(5), std::chrono::minutes(10))
+      {
+       // empty
+      }
+
+
+      /*
+       * Incorporates the RespParams received into the various counter.
+       */
+      void track_resp(const S& server_id, const PhaseType& phase) {
+       DataGuard g(data_mtx);
+
+       auto it = server_map.find(server_id);
+       if (server_map.end() == it) {
+         // this code can only run if a request did not precede the
+         // response or if the record was cleaned up b/w when
+         // the request was made and now
+         ServerInfo si(delta_counter, rho_counter);
+         si.resp_update(phase);
+         server_map.emplace(server_id, si);
+       } else {
+         it->second.resp_update(phase);
+       }
+
+       ++delta_counter;
+       if (PhaseType::reservation == phase) {
+         ++rho_counter;
+       }
+      }
+
+
+      /*
+       * Returns the ReqParams for the given server.
+       */
+      ReqParams get_req_params(const S& server) {
+       DataGuard g(data_mtx);
+       auto it = server_map.find(server);
+       if (server_map.end() == it) {
+         server_map.emplace(server, ServerInfo(delta_counter, rho_counter));
+         return ReqParams(1, 1);
+       } else {
+         Counter delta =
+           1 + delta_counter - it->second.delta_prev_req - it->second.my_delta;
+         Counter rho =
+           1 + rho_counter - it->second.rho_prev_req - it->second.my_rho;
+
+         it->second.req_update(delta_counter, rho_counter);
+
+         return ReqParams(uint32_t(delta), uint32_t(rho));
+       }
+      }
+
+    private:
+
+      /*
+       * This is being called regularly by RunEvery. Every time it's
+       * called it notes the time and delta counter (mark point) in a
+       * deque. It also looks at the deque to find the most recent
+       * mark point that is older than clean_age. It then walks the
+       * map and delete all server entries that were last used before
+       * that mark point.
+       */
+      void do_clean() {
+       TimePoint now = std::chrono::steady_clock::now();
+       DataGuard g(data_mtx);
+       clean_mark_points.emplace_back(MarkPoint(now, delta_counter));
+
+       Counter earliest = 0;
+       auto point = clean_mark_points.front();
+       while (point.first <= now - clean_age) {
+         earliest = point.second;
+         clean_mark_points.pop_front();
+         point = clean_mark_points.front();
+       }
+
+       if (earliest > 0) {
+         for (auto i = server_map.begin();
+              i != server_map.end();
+              /* empty */) {
+           auto i2 = i++;
+           if (i2->second.delta_prev_req <= earliest) {
+             server_map.erase(i2);
+           }
+         }
+       }
+      } // do_clean
+    }; // class ServiceTracker
+  }
+}