Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_asio_frontend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <condition_variable>
5 #include <mutex>
6 #include <thread>
7 #include <vector>
8
9 #include <boost/asio.hpp>
10 #include <boost/asio/spawn.hpp>
11
12 #include <beast/core/placeholders.hpp>
13 #include <beast/http/read.hpp>
14 #include <beast/http/string_body.hpp>
15 #include <beast/http/write.hpp>
16
17 #include "rgw_asio_frontend.h"
18 #include "rgw_asio_client.h"
19
20 #define dout_subsys ceph_subsys_rgw
21
22 #undef dout_prefix
23 #define dout_prefix (*_dout << "asio: ")
24
25 namespace {
26
27 class Pauser {
28   std::mutex mutex;
29   std::condition_variable cond_ready; // signaled on ready==true
30   std::condition_variable cond_paused; // signaled on waiters==thread_count
31   bool ready{false};
32   int waiters{0};
33  public:
34   template <typename Func>
35   void pause(int thread_count, Func&& func);
36   void unpause();
37   void wait();
38 };
39
40 template <typename Func>
41 void Pauser::pause(int thread_count, Func&& func)
42 {
43   std::unique_lock<std::mutex> lock(mutex);
44   ready = false;
45   lock.unlock();
46
47   func();
48
49   // wait for all threads to pause
50   lock.lock();
51   cond_paused.wait(lock, [=] { return waiters == thread_count; });
52 }
53
54 void Pauser::unpause()
55 {
56   std::lock_guard<std::mutex> lock(mutex);
57   ready = true;
58   cond_ready.notify_all();
59 }
60
61 void Pauser::wait()
62 {
63   std::unique_lock<std::mutex> lock(mutex);
64   ++waiters;
65   cond_paused.notify_one(); // notify pause() that we're waiting
66   cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
67   --waiters;
68 }
69
70 using tcp = boost::asio::ip::tcp;
71
72 // coroutine to handle a client connection to completion
73 static void handle_connection(RGWProcessEnv& env, tcp::socket socket,
74                               boost::asio::yield_context yield)
75 {
76   auto cct = env.store->ctx();
77   boost::system::error_code ec;
78
79   beast::flat_streambuf buffer{1024};
80
81   // read messages from the socket until eof
82   for (;;) {
83     // parse the header
84     rgw::asio::parser_type parser;
85     do {
86       auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]);
87       buffer.consume(bytes);
88     } while (!ec && !parser.got_header());
89
90     if (ec == boost::asio::error::connection_reset ||
91         ec == boost::asio::error::eof) {
92       return;
93     }
94     if (ec) {
95       auto& message = parser.get();
96       ldout(cct, 1) << "read failed: " << ec.message() << dendl;
97       ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
98       beast::http::response<beast::http::string_body> response;
99       response.status = 400;
100       response.reason = "Bad Request";
101       response.version = message.version == 10 ? 10 : 11;
102       beast::http::prepare(response);
103       beast::http::async_write(socket, std::move(response), yield[ec]);
104       // ignore ec
105       return;
106     }
107
108     // process the request
109     RGWRequest req{env.store->get_new_req_id()};
110
111     rgw::asio::ClientIO real_client{socket, parser, buffer};
112
113     auto real_client_io = rgw::io::add_reordering(
114                             rgw::io::add_buffering(cct,
115                               rgw::io::add_chunking(
116                                 rgw::io::add_conlen_controlling(
117                                   &real_client))));
118     RGWRestfulIO client(cct, &real_client_io);
119     process_request(env.store, env.rest, &req, env.uri_prefix,
120                     *env.auth_registry, &client, env.olog);
121
122     if (real_client.get_conn_close()) {
123       return;
124     }
125   }
126 }
127
128 class AsioFrontend {
129   RGWProcessEnv env;
130   boost::asio::io_service service;
131
132   tcp::acceptor acceptor;
133   tcp::socket peer_socket;
134
135   std::vector<std::thread> threads;
136   Pauser pauser;
137   std::atomic<bool> going_down{false};
138
139   CephContext* ctx() const { return env.store->ctx(); }
140
141   void accept(boost::system::error_code ec);
142
143  public:
144   AsioFrontend(const RGWProcessEnv& env)
145     : env(env), acceptor(service), peer_socket(service) {}
146
147   int init();
148   int run();
149   void stop();
150   void join();
151   void pause();
152   void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
153 };
154
155 int AsioFrontend::init()
156 {
157   auto ep = tcp::endpoint{tcp::v4(), static_cast<unsigned short>(env.port)};
158   ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
159
160   boost::system::error_code ec;
161   acceptor.open(ep.protocol(), ec);
162   if (ec) {
163     lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
164     return -ec.value();
165   }
166   acceptor.set_option(tcp::acceptor::reuse_address(true));
167   acceptor.bind(ep, ec);
168   if (ec) {
169     lderr(ctx()) << "failed to bind address " << ep <<
170         ": " << ec.message() << dendl;
171     return -ec.value();
172   }
173   acceptor.listen(boost::asio::socket_base::max_connections);
174   acceptor.async_accept(peer_socket,
175                         [this] (boost::system::error_code ec) {
176                           return accept(ec);
177                         });
178   return 0;
179 }
180
181 void AsioFrontend::accept(boost::system::error_code ec)
182 {
183   if (!acceptor.is_open()) {
184     return;
185   } else if (ec == boost::asio::error::operation_aborted) {
186     return;
187   } else if (ec) {
188     throw ec;
189   }
190   auto socket = std::move(peer_socket);
191   // spawn a coroutine to handle the connection
192   boost::asio::spawn(service,
193                      [&] (boost::asio::yield_context yield) {
194                        handle_connection(env, std::move(socket), yield);
195                      });
196   acceptor.async_accept(peer_socket,
197                         [this] (boost::system::error_code ec) {
198                           return accept(ec);
199                         });
200 }
201
202 int AsioFrontend::run()
203 {
204   auto cct = ctx();
205   const int thread_count = cct->_conf->rgw_thread_pool_size;
206   threads.reserve(thread_count);
207
208   ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
209
210   for (int i = 0; i < thread_count; i++) {
211     threads.emplace_back([=] {
212       for (;;) {
213         service.run();
214         if (going_down) {
215           break;
216         }
217         pauser.wait();
218       }
219     });
220   }
221   return 0;
222 }
223
224 void AsioFrontend::stop()
225 {
226   ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
227
228   going_down = true;
229
230   boost::system::error_code ec;
231   acceptor.close(ec); // unblock the run() threads
232 }
233
234 void AsioFrontend::join()
235 {
236   if (!going_down) {
237     stop();
238   }
239   ldout(ctx(), 4) << "frontend joining threads..." << dendl;
240   for (auto& thread : threads) {
241     thread.join();
242   }
243   ldout(ctx(), 4) << "frontend done" << dendl;
244 }
245
246 void AsioFrontend::pause()
247 {
248   ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
249   pauser.pause(threads.size(), [=] {
250     // stop accepting but leave the port open
251     boost::system::error_code ec;
252     acceptor.cancel(ec);
253   });
254   ldout(ctx(), 4) << "frontend paused" << dendl;
255 }
256
257 void AsioFrontend::unpause(RGWRados* const store,
258                            rgw_auth_registry_ptr_t auth_registry)
259 {
260   env.store = store;
261   env.auth_registry = std::move(auth_registry);
262   ldout(ctx(), 4) << "frontend unpaused" << dendl;
263   service.reset();
264   acceptor.async_accept(peer_socket,
265                         [this] (boost::system::error_code ec) {
266                           return accept(ec);
267                         });
268   pauser.unpause();
269 }
270
271 } // anonymous namespace
272
273 class RGWAsioFrontend::Impl : public AsioFrontend {
274  public:
275   Impl(const RGWProcessEnv& env) : AsioFrontend(env) {}
276 };
277
278 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env)
279   : impl(new Impl(env))
280 {
281 }
282
283 RGWAsioFrontend::~RGWAsioFrontend() = default;
284
285 int RGWAsioFrontend::init()
286 {
287   return impl->init();
288 }
289
290 int RGWAsioFrontend::run()
291 {
292   return impl->run();
293 }
294
295 void RGWAsioFrontend::stop()
296 {
297   impl->stop();
298 }
299
300 void RGWAsioFrontend::join()
301 {
302   impl->join();
303 }
304
305 void RGWAsioFrontend::pause_for_new_config()
306 {
307   impl->pause();
308 }
309
310 void RGWAsioFrontend::unpause_with_new_config(
311   RGWRados* const store,
312   rgw_auth_registry_ptr_t auth_registry
313 ) {
314   impl->unpause(store, std::move(auth_registry));
315 }