Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / dmclock / src / dmclock_client.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5  * Copyright (C) 2017 Red Hat Inc.
6  */
7
8
9 #pragma once
10
11 #include <map>
12 #include <deque>
13 #include <chrono>
14 #include <thread>
15 #include <mutex>
16 #include <condition_variable>
17
18 #include "run_every.h"
19 #include "dmclock_util.h"
20 #include "dmclock_recs.h"
21
22
23 namespace crimson {
24   namespace dmclock {
25     struct ServerInfo {
26       Counter   delta_prev_req;
27       Counter   rho_prev_req;
28       uint32_t  my_delta;
29       uint32_t  my_rho;
30
31       ServerInfo(Counter _delta_prev_req,
32                  Counter _rho_prev_req) :
33         delta_prev_req(_delta_prev_req),
34         rho_prev_req(_rho_prev_req),
35         my_delta(0),
36         my_rho(0)
37       {
38         // empty
39       }
40
41       inline void req_update(Counter delta, Counter rho) {
42         delta_prev_req = delta;
43         rho_prev_req = rho;
44         my_delta = 0;
45         my_rho = 0;
46       }
47
48       inline void resp_update(PhaseType phase) {
49         ++my_delta;
50         if (phase == PhaseType::reservation) ++my_rho;
51       }
52     };
53
54
55     // S is server identifier type
56     template<typename S>
57     class ServiceTracker {
58       // we don't want to include gtest.h just for FRIEND_TEST
59       friend class dmclock_client_server_erase_Test;
60
61       using TimePoint = decltype(std::chrono::steady_clock::now());
62       using Duration = std::chrono::milliseconds;
63       using MarkPoint = std::pair<TimePoint,Counter>;
64
65       Counter                 delta_counter; // # reqs completed
66       Counter                 rho_counter;   // # reqs completed via reservation
67       std::map<S,ServerInfo>  server_map;
68       mutable std::mutex      data_mtx;      // protects Counters and map
69
70       using DataGuard = std::lock_guard<decltype(data_mtx)>;
71
72       // clean config
73
74       std::deque<MarkPoint>     clean_mark_points;
75       Duration                  clean_age;     // age at which ServerInfo cleaned
76
77       // NB: All threads declared at end, so they're destructed firs!
78
79       std::unique_ptr<RunEvery> cleaning_job;
80
81
82     public:
83
84       // we have to start the counters at 1, as 0 is used in the
85       // cleaning process
86       template<typename Rep, typename Per>
87       ServiceTracker(std::chrono::duration<Rep,Per> _clean_every,
88                      std::chrono::duration<Rep,Per> _clean_age) :
89         delta_counter(1),
90         rho_counter(1),
91         clean_age(std::chrono::duration_cast<Duration>(_clean_age))
92       {
93         cleaning_job =
94           std::unique_ptr<RunEvery>(
95             new RunEvery(_clean_every,
96                          std::bind(&ServiceTracker::do_clean, this)));
97       }
98
99
100       // the reason we're overloading the constructor rather than
101       // using default values for the arguments is so that callers
102       // have to either use all defaults or specify all timings; with
103       // default arguments they could specify some without others
104       ServiceTracker() :
105         ServiceTracker(std::chrono::minutes(5), std::chrono::minutes(10))
106       {
107         // empty
108       }
109
110
111       /*
112        * Incorporates the RespParams received into the various counter.
113        */
114       void track_resp(const S& server_id, const PhaseType& phase) {
115         DataGuard g(data_mtx);
116
117         auto it = server_map.find(server_id);
118         if (server_map.end() == it) {
119           // this code can only run if a request did not precede the
120           // response or if the record was cleaned up b/w when
121           // the request was made and now
122           ServerInfo si(delta_counter, rho_counter);
123           si.resp_update(phase);
124           server_map.emplace(server_id, si);
125         } else {
126           it->second.resp_update(phase);
127         }
128
129         ++delta_counter;
130         if (PhaseType::reservation == phase) {
131           ++rho_counter;
132         }
133       }
134
135
136       /*
137        * Returns the ReqParams for the given server.
138        */
139       ReqParams get_req_params(const S& server) {
140         DataGuard g(data_mtx);
141         auto it = server_map.find(server);
142         if (server_map.end() == it) {
143           server_map.emplace(server, ServerInfo(delta_counter, rho_counter));
144           return ReqParams(1, 1);
145         } else {
146           Counter delta =
147             1 + delta_counter - it->second.delta_prev_req - it->second.my_delta;
148           Counter rho =
149             1 + rho_counter - it->second.rho_prev_req - it->second.my_rho;
150
151           it->second.req_update(delta_counter, rho_counter);
152
153           return ReqParams(uint32_t(delta), uint32_t(rho));
154         }
155       }
156
157     private:
158
159       /*
160        * This is being called regularly by RunEvery. Every time it's
161        * called it notes the time and delta counter (mark point) in a
162        * deque. It also looks at the deque to find the most recent
163        * mark point that is older than clean_age. It then walks the
164        * map and delete all server entries that were last used before
165        * that mark point.
166        */
167       void do_clean() {
168         TimePoint now = std::chrono::steady_clock::now();
169         DataGuard g(data_mtx);
170         clean_mark_points.emplace_back(MarkPoint(now, delta_counter));
171
172         Counter earliest = 0;
173         auto point = clean_mark_points.front();
174         while (point.first <= now - clean_age) {
175           earliest = point.second;
176           clean_mark_points.pop_front();
177           point = clean_mark_points.front();
178         }
179
180         if (earliest > 0) {
181           for (auto i = server_map.begin();
182                i != server_map.end();
183                /* empty */) {
184             auto i2 = i++;
185             if (i2->second.delta_prev_req <= earliest) {
186               server_map.erase(i2);
187             }
188           }
189         }
190       } // do_clean
191     }; // class ServiceTracker
192   }
193 }