Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / dmclock / sim / src / sim_server.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5  * Copyright (C) 2016 Red Hat Inc.
6  */
7
8
9 #pragma once
10
11
12 #include <thread>
13 #include <mutex>
14 #include <condition_variable>
15 #include <chrono>
16 #include <deque>
17
18 #include "sim_recs.h"
19
20
21 namespace crimson {
22   namespace qos_simulation {
23
24     template<typename Q, typename ReqPm, typename RespPm, typename Accum>
25     class SimulatedServer {
26
27       struct QueueItem {
28         ClientId                     client;
29         std::unique_ptr<TestRequest> request;
30         RespPm                       additional;
31
32         QueueItem(const ClientId&                _client,
33                   std::unique_ptr<TestRequest>&& _request,
34                   const RespPm&                  _additional) :
35           client(_client),
36           request(std::move(_request)),
37           additional(_additional)
38         {
39           // empty
40         }
41       }; // QueueItem
42
43     public:
44
45       struct InternalStats {
46         std::mutex mtx;
47         std::chrono::nanoseconds add_request_time;
48         std::chrono::nanoseconds request_complete_time;
49         uint32_t add_request_count;
50         uint32_t request_complete_count;
51
52         InternalStats() :
53           add_request_time(0),
54           request_complete_time(0),
55           add_request_count(0),
56           request_complete_count(0)
57         {
58           // empty
59         }
60       };
61
62       using ClientRespFunc = std::function<void(ClientId,
63                                                 const TestResponse&,
64                                                 const ServerId&,
65                                                 const RespPm&)>;
66
67       using ServerAccumFunc = std::function<void(Accum& accumulator,
68                                                  const RespPm& additional)>;
69
70     protected:
71
72       const ServerId                 id;
73       Q*                             priority_queue;
74       ClientRespFunc                 client_resp_f;
75       int                            iops;
76       size_t                         thread_pool_size;
77
78       bool                           finishing;
79       std::chrono::microseconds      op_time;
80
81       std::mutex                     inner_queue_mtx;
82       std::condition_variable        inner_queue_cv;
83       std::deque<QueueItem>          inner_queue;
84
85       std::thread*                   threads;
86
87       using InnerQGuard = std::lock_guard<decltype(inner_queue_mtx)>;
88       using Lock = std::unique_lock<std::mutex>;
89
90       // data collection
91
92       ServerAccumFunc accum_f;
93       Accum accumulator;
94
95       InternalStats internal_stats;
96
97     public:
98
99       using CanHandleRequestFunc = std::function<bool(void)>;
100       using HandleRequestFunc =
101         std::function<void(const ClientId&,std::unique_ptr<TestRequest>,const RespPm&)>;
102       using CreateQueueF = std::function<Q*(CanHandleRequestFunc,HandleRequestFunc)>;
103                                         
104
105       SimulatedServer(ServerId _id,
106                       int _iops,
107                       size_t _thread_pool_size,
108                       const ClientRespFunc& _client_resp_f,
109                       const ServerAccumFunc& _accum_f,
110                       CreateQueueF _create_queue_f) :
111         id(_id),
112         priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread,
113                                                  this),
114                                        std::bind(&SimulatedServer::inner_post,
115                                                  this,
116                                                  std::placeholders::_1,
117                                                  std::placeholders::_2,
118                                                  std::placeholders::_3))),
119         client_resp_f(_client_resp_f),
120         iops(_iops),
121         thread_pool_size(_thread_pool_size),
122         finishing(false),
123         accum_f(_accum_f)
124       {
125         op_time =
126           std::chrono::microseconds((int) (0.5 +
127                                            thread_pool_size * 1000000.0 / iops));
128         std::chrono::milliseconds delay(1000);
129         threads = new std::thread[thread_pool_size];
130         for (size_t i = 0; i < thread_pool_size; ++i) {
131           threads[i] = std::thread(&SimulatedServer::run, this, delay);
132         }
133       }
134
135       virtual ~SimulatedServer() {
136         Lock l(inner_queue_mtx);
137         finishing = true;
138         inner_queue_cv.notify_all();
139         l.unlock();
140
141         for (size_t i = 0; i < thread_pool_size; ++i) {
142           threads[i].join();
143         }
144
145         delete[] threads;
146
147         delete priority_queue;
148       }
149
150       void post(TestRequest&& request,
151                 const ClientId& client_id,
152                 const ReqPm& req_params)
153       {
154         time_stats(internal_stats.mtx,
155                    internal_stats.add_request_time,
156                    [&](){
157                      priority_queue->add_request(std::move(request),
158                                                  client_id, req_params);
159                    });
160         count_stats(internal_stats.mtx,
161                     internal_stats.add_request_count);
162       }
163
164       bool has_avail_thread() {
165         InnerQGuard g(inner_queue_mtx);
166         return inner_queue.size() <= thread_pool_size;
167       }
168
169       const Accum& get_accumulator() const { return accumulator; }
170       const Q& get_priority_queue() const { return *priority_queue; }
171       const InternalStats& get_internal_stats() const { return internal_stats; }
172
173     protected:
174
175       void inner_post(const ClientId& client,
176                       std::unique_ptr<TestRequest> request,
177                       const RespPm& additional) {
178         Lock l(inner_queue_mtx);
179         assert(!finishing);
180         accum_f(accumulator, additional);
181         inner_queue.emplace_back(QueueItem(client,
182                                            std::move(request),
183                                            additional));
184         inner_queue_cv.notify_one();
185       }
186
187       void run(std::chrono::milliseconds check_period) {
188         Lock l(inner_queue_mtx);
189         while(true) {
190           while(inner_queue.empty() && !finishing) {
191             inner_queue_cv.wait_for(l, check_period);
192           }
193           if (!inner_queue.empty()) {
194             auto& front = inner_queue.front();
195             auto client = front.client;
196             auto req = std::move(front.request);
197             auto additional = front.additional;
198             inner_queue.pop_front();
199
200             l.unlock();
201
202             // simulation operation by sleeping; then call function to
203             // notify server of completion
204             std::this_thread::sleep_for(op_time);
205
206             // TODO: rather than assuming this constructor exists, perhaps
207             // pass in a function that does this mapping?
208             client_resp_f(client, TestResponse{req->epoch}, id, additional);
209
210             time_stats(internal_stats.mtx,
211                        internal_stats.request_complete_time,
212                        [&](){
213                          priority_queue->request_completed();
214                        });
215             count_stats(internal_stats.mtx,
216                         internal_stats.request_complete_count);
217
218             l.lock(); // in prep for next iteration of loop
219           } else {
220             break;
221           }
222         }
223       }
224     }; // class SimulatedServer
225
226   }; // namespace qos_simulation
227 }; // namespace crimson