[stor4nfv.git] / src / ceph / src / dmclock / sim / src / sim_client.h
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

/*
 * Copyright (C) 2016 Red Hat Inc.
 */


#pragma once


#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <vector>
#include <deque>
#include <functional>   // std::function (SubmitFunc, ServerSelectFunc, ClientAccumFunc)
#include <cassert>      // assert in run_req
#include <iostream>

#include "sim_recs.h"


namespace crimson {
  namespace qos_simulation {

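    // tag types used to select which CliInst constructor is invoked
    // (request burst vs. wait)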
    struct req_op_t {};
    struct wait_op_t {};
    constexpr struct req_op_t req_op {};
    constexpr struct wait_op_t wait_op {};


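    // a CliInst is a single client instruction: either sleep for a fixed
    // duration or issue a burst of requests at a target rate with a bound
    // on how many may be outstanding at once; the two payloads share a union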
    enum class CliOp { req, wait };
    struct CliInst {
      CliOp op;
      union {
        std::chrono::milliseconds wait_time;
        struct {
          uint32_t count;
          std::chrono::microseconds time_bw_reqs;
          uint16_t max_outstanding;
        } req_params;
      } args;

      // D is a duration type
      template<typename D>
      CliInst(wait_op_t, D duration) :
        op(CliOp::wait)
      {
        args.wait_time =
          std::chrono::duration_cast<std::chrono::milliseconds>(duration);
      }

      CliInst(req_op_t,
              uint32_t count, double ops_per_sec, uint16_t max_outstanding) :
        op(CliOp::req)
      {
        args.req_params.count = count;
        args.req_params.max_outstanding = max_outstanding;
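        // convert the ops/sec goal into the gap between consecutive
        // requests, rounded to the nearest microsecond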
        uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000);
        args.req_params.time_bw_reqs = std::chrono::microseconds(us);
      }
    };


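    // given a seed (in run_req, the index of the request being issued),
    // select the server that request will be sent to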
    using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>;


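    // template parameters: SvcTrk is the client-side service tracker
    // (queried for request parameters and informed of responses), ReqPm and
    // RespPm are the per-request and per-response parameter types exchanged
    // with the servers, and Accum accumulates response statistics via the
    // ClientAccumFunc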
    template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum>
    class SimulatedClient {
    public:

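      // wall-clock time spent in, and number of, calls into the service
      // tracker: get_req_params on the request path and track_resp on the
      // response path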
      struct InternalStats {
        std::mutex mtx;
        std::chrono::nanoseconds track_resp_time;
        std::chrono::nanoseconds get_req_params_time;
        uint32_t track_resp_count;
        uint32_t get_req_params_count;

        InternalStats() :
          track_resp_time(0),
          get_req_params_time(0),
          track_resp_count(0),
          get_req_params_count(0)
        {
          // empty
        }
      };

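      // callback used to submit a request to a server, along with the
      // issuing client's id and the request parameters obtained from the
      // service tracker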
      using SubmitFunc =
        std::function<void(const ServerId&,
                           TestRequest&&,
                           const ClientId&,
                           const ReqPm&)>;

      using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>;

      typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;

      static TimePoint now() { return std::chrono::steady_clock::now(); }

    protected:

      struct RespQueueItem {
        TestResponse response;
        ServerId     server_id;
        RespPm       resp_params;
      };

      const ClientId id;
      const SubmitFunc submit_f;
      const ServerSelectFunc server_select_f;
      const ClientAccumFunc accum_f;

      std::vector<CliInst> instructions;

      SvcTrk service_tracker;

      // TODO: use lock rather than atomic???
      std::atomic_ulong        outstanding_ops;
      std::atomic_bool         requests_complete;

      std::deque<RespQueueItem> resp_queue;

      std::mutex               mtx_req;
      std::condition_variable  cv_req;

      std::mutex               mtx_resp;
      std::condition_variable  cv_resp;

      using RespGuard = std::lock_guard<decltype(mtx_resp)>;
      using Lock = std::unique_lock<std::mutex>;

      // data collection

      std::vector<TimePoint>   op_times;
      Accum                    accumulator;
      InternalStats            internal_stats;

      std::thread              thd_req;
      std::thread              thd_resp;

    public:

      SimulatedClient(ClientId _id,
                      const SubmitFunc& _submit_f,
                      const ServerSelectFunc& _server_select_f,
                      const ClientAccumFunc& _accum_f,
                      const std::vector<CliInst>& _instrs) :
        id(_id),
        submit_f(_submit_f),
        server_select_f(_server_select_f),
        accum_f(_accum_f),
        instructions(_instrs),
        service_tracker(),
        outstanding_ops(0),
        requests_complete(false)
      {
        size_t op_count = 0;
        for (auto i : instructions) {
          if (CliOp::req == i.op) {
            op_count += i.args.req_params.count;
          }
        }
        op_times.reserve(op_count);

        thd_resp = std::thread(&SimulatedClient::run_resp, this);
        thd_req = std::thread(&SimulatedClient::run_req, this);
      }


      SimulatedClient(ClientId _id,
                      const SubmitFunc& _submit_f,
                      const ServerSelectFunc& _server_select_f,
                      const ClientAccumFunc& _accum_f,
                      uint16_t _ops_to_run,
                      double _iops_goal,
                      uint16_t _outstanding_ops_allowed) :
        SimulatedClient(_id,
                        _submit_f, _server_select_f, _accum_f,
                        {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}})
      {
        // empty
      }


      SimulatedClient(const SimulatedClient&) = delete;
      SimulatedClient(SimulatedClient&&) = delete;
      SimulatedClient& operator=(const SimulatedClient&) = delete;
      SimulatedClient& operator=(SimulatedClient&&) = delete;

      virtual ~SimulatedClient() {
        wait_until_done();
      }

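      // called from the simulation's server/transport side to deliver a
      // response; the response is queued and handled by the response thread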
      void receive_response(const TestResponse& resp,
                            const ServerId& server_id,
                            const RespPm& resp_params) {
        RespGuard g(mtx_resp);
        resp_queue.push_back(RespQueueItem{resp, server_id, resp_params});
        cv_resp.notify_one();
      }

      const std::vector<TimePoint>& get_op_times() const { return op_times; }

      void wait_until_done() {
        if (thd_req.joinable()) thd_req.join();
        if (thd_resp.joinable()) thd_resp.join();
      }

      const Accum& get_accumulator() const { return accumulator; }

      const InternalStats& get_internal_stats() const { return internal_stats; }

    protected:

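      // request thread: walks the instruction list, sleeping for wait
      // instructions and, for request instructions, issuing requests while
      // honoring both the max_outstanding cap and the inter-request gap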
      void run_req() {
        size_t ops_count = 0;
        for (auto i : instructions) {
          if (CliOp::wait == i.op) {
            std::this_thread::sleep_for(i.args.wait_time);
          } else if (CliOp::req == i.op) {
            Lock l(mtx_req);
            for (uint64_t o = 0; o < i.args.req_params.count; ++o) {
              while (outstanding_ops >= i.args.req_params.max_outstanding) {
                cv_req.wait(l);
              }

              l.unlock();
              auto now = std::chrono::steady_clock::now();
              const ServerId& server = server_select_f(o);

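              // time the call into the service tracker that produces this
              // request's parameters, and bump the matching call counter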
              ReqPm rp =
                time_stats_w_return<decltype(internal_stats.get_req_params_time),
                                    ReqPm>(internal_stats.mtx,
                                           internal_stats.get_req_params_time,
                                           [&]() -> ReqPm {
                                             return service_tracker.get_req_params(server);
                                           });
              count_stats(internal_stats.mtx,
                          internal_stats.get_req_params_count);

              submit_f(server,
                       TestRequest{server, static_cast<uint32_t>(o), 12},
                       id, rp);
              ++outstanding_ops;
              l.lock(); // lock for return to top of loop

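              // pace requests: wait (tolerating spurious wakeups) until the
              // next send time before issuing the following request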
              auto delay_time = now + i.args.req_params.time_bw_reqs;
              while (std::chrono::steady_clock::now() < delay_time) {
                cv_req.wait_until(l, delay_time);
              } // while
            } // for
            ops_count += i.args.req_params.count;
          } else {
            assert(false);
          }
        } // for loop

        requests_complete = true;

        // all requests made, thread ends
      }


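      // response thread: drains resp_queue, recording completion times,
      // feeding the accumulator, and informing the service tracker; wakes
      // the request thread whenever an outstanding slot frees up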
      void run_resp() {
        std::chrono::milliseconds delay(1000);
        int op = 0;

        Lock l(mtx_resp);

        // since the following code would otherwise be repeated (except for
        // the call to notify_one) in the two loops below, let's avoid
        // repetition and define it once
        const auto proc_resp = [this, &op, &l](const bool notify_req_cv) {
          if (!resp_queue.empty()) {
            RespQueueItem item = resp_queue.front();
            resp_queue.pop_front();

            l.unlock();

            // data collection

            op_times.push_back(now());
            accum_f(accumulator, item.resp_params);

            // processing

#if 0 // not needed
            TestResponse& resp = item.response;
#endif

            time_stats(internal_stats.mtx,
                       internal_stats.track_resp_time,
                       [&](){
                         service_tracker.track_resp(item.server_id, item.resp_params);
                       });
            count_stats(internal_stats.mtx,
                        internal_stats.track_resp_count);

            --outstanding_ops;
            if (notify_req_cv) {
              cv_req.notify_one();
            }

            l.lock();
          }
        };

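        // phase 1: while the request thread is still issuing requests,
        // process responses and notify it whenever an op completes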
        while(!requests_complete.load()) {
          while(resp_queue.empty() && !requests_complete.load()) {
            cv_resp.wait_for(l, delay);
          }
          proc_resp(true);
        }

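        // phase 2: all requests have been issued; drain responses for the
        // ops that are still outstanding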
        while(outstanding_ops.load() > 0) {
          while(resp_queue.empty() && outstanding_ops.load() > 0) {
            cv_resp.wait_for(l, delay);
          }
          proc_resp(false); // don't call notify_one as all requests are complete
        }

        // all responses received, thread ends
      }
    }; // class SimulatedClient


  }; // namespace qos_simulation
}; // namespace crimson