1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Copyright (C) 2016 Red Hat Inc.
14 #include <condition_variable>
25 namespace qos_simulation {
29 constexpr struct req_op_t req_op {};
30 constexpr struct wait_op_t wait_op {};
33 enum class CliOp { req, wait };
37 std::chrono::milliseconds wait_time;
40 std::chrono::microseconds time_bw_reqs;
41 uint16_t max_outstanding;
45 // D is a duration type
47 CliInst(wait_op_t, D duration) :
51 std::chrono::duration_cast<std::chrono::milliseconds>(duration);
55 uint32_t count, double ops_per_sec, uint16_t max_outstanding) :
58 args.req_params.count = count;
59 args.req_params.max_outstanding = max_outstanding;
60 uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000);
61 args.req_params.time_bw_reqs = std::chrono::microseconds(us);
66 using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>;
69 template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum>
70 class SimulatedClient {
73 struct InternalStats {
75 std::chrono::nanoseconds track_resp_time;
76 std::chrono::nanoseconds get_req_params_time;
77 uint32_t track_resp_count;
78 uint32_t get_req_params_count;
82 get_req_params_time(0),
84 get_req_params_count(0)
91 std::function<void(const ServerId&,
96 using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>;
98 typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
100 static TimePoint now() { return std::chrono::steady_clock::now(); }
104 struct RespQueueItem {
105 TestResponse response;
111 const SubmitFunc submit_f;
112 const ServerSelectFunc server_select_f;
113 const ClientAccumFunc accum_f;
115 std::vector<CliInst> instructions;
117 SvcTrk service_tracker;
119 // TODO: use lock rather than atomic???
120 std::atomic_ulong outstanding_ops;
121 std::atomic_bool requests_complete;
123 std::deque<RespQueueItem> resp_queue;
126 std::condition_variable cv_req;
129 std::condition_variable cv_resp;
131 using RespGuard = std::lock_guard<decltype(mtx_resp)>;
132 using Lock = std::unique_lock<std::mutex>;
136 std::vector<TimePoint> op_times;
138 InternalStats internal_stats;
141 std::thread thd_resp;
145 SimulatedClient(ClientId _id,
146 const SubmitFunc& _submit_f,
147 const ServerSelectFunc& _server_select_f,
148 const ClientAccumFunc& _accum_f,
149 const std::vector<CliInst>& _instrs) :
152 server_select_f(_server_select_f),
154 instructions(_instrs),
157 requests_complete(false)
160 for (auto i : instructions) {
161 if (CliOp::req == i.op) {
162 op_count += i.args.req_params.count;
165 op_times.reserve(op_count);
167 thd_resp = std::thread(&SimulatedClient::run_resp, this);
168 thd_req = std::thread(&SimulatedClient::run_req, this);
172 SimulatedClient(ClientId _id,
173 const SubmitFunc& _submit_f,
174 const ServerSelectFunc& _server_select_f,
175 const ClientAccumFunc& _accum_f,
176 uint16_t _ops_to_run,
178 uint16_t _outstanding_ops_allowed) :
180 _submit_f, _server_select_f, _accum_f,
181 {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}})
187 SimulatedClient(const SimulatedClient&) = delete;
188 SimulatedClient(SimulatedClient&&) = delete;
189 SimulatedClient& operator=(const SimulatedClient&) = delete;
190 SimulatedClient& operator=(SimulatedClient&&) = delete;
192 virtual ~SimulatedClient() {
196 void receive_response(const TestResponse& resp,
197 const ServerId& server_id,
198 const RespPm& resp_params) {
199 RespGuard g(mtx_resp);
200 resp_queue.push_back(RespQueueItem{resp, server_id, resp_params});
201 cv_resp.notify_one();
204 const std::vector<TimePoint>& get_op_times() const { return op_times; }
206 void wait_until_done() {
207 if (thd_req.joinable()) thd_req.join();
208 if (thd_resp.joinable()) thd_resp.join();
211 const Accum& get_accumulator() const { return accumulator; }
213 const InternalStats& get_internal_stats() const { return internal_stats; }
218 size_t ops_count = 0;
219 for (auto i : instructions) {
220 if (CliOp::wait == i.op) {
221 std::this_thread::sleep_for(i.args.wait_time);
222 } else if (CliOp::req == i.op) {
224 for (uint64_t o = 0; o < i.args.req_params.count; ++o) {
225 while (outstanding_ops >= i.args.req_params.max_outstanding) {
230 auto now = std::chrono::steady_clock::now();
231 const ServerId& server = server_select_f(o);
234 time_stats_w_return<decltype(internal_stats.get_req_params_time),
235 ReqPm>(internal_stats.mtx,
236 internal_stats.get_req_params_time,
238 return service_tracker.get_req_params(server);
240 count_stats(internal_stats.mtx,
241 internal_stats.get_req_params_count);
244 TestRequest{server, static_cast<uint32_t>(o), 12},
247 l.lock(); // lock for return to top of loop
249 auto delay_time = now + i.args.req_params.time_bw_reqs;
250 while (std::chrono::steady_clock::now() < delay_time) {
251 cv_req.wait_until(l, delay_time);
254 ops_count += i.args.req_params.count;
260 requests_complete = true;
262 // all requests made, thread ends
267 std::chrono::milliseconds delay(1000);
272 // since the following code would otherwise be repeated (except for
273 // the call to notify_one) in the two loops below; let's avoid
274 // repetition and define it once.
275 const auto proc_resp = [this, &op, &l](const bool notify_req_cv) {
276 if (!resp_queue.empty()) {
277 RespQueueItem item = resp_queue.front();
278 resp_queue.pop_front();
284 op_times.push_back(now());
285 accum_f(accumulator, item.resp_params);
290 TestResponse& resp = item.response;
293 time_stats(internal_stats.mtx,
294 internal_stats.track_resp_time,
296 service_tracker.track_resp(item.server_id, item.resp_params);
298 count_stats(internal_stats.mtx,
299 internal_stats.track_resp_count);
310 while(!requests_complete.load()) {
311 while(resp_queue.empty() && !requests_complete.load()) {
312 cv_resp.wait_for(l, delay);
317 while(outstanding_ops.load() > 0) {
318 while(resp_queue.empty() && outstanding_ops.load() > 0) {
319 cv_resp.wait_for(l, delay);
321 proc_resp(false); // don't call notify_one as all requests are complete
324 // all responses received, thread ends
326 }; // class SimulatedClient
329 }; // namespace qos_simulation
330 }; // namespace crimson