// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include #include #include #include #include #include #include #include #include #include #include "rgw_asio_frontend.h" #include "rgw_asio_client.h" #define dout_subsys ceph_subsys_rgw #undef dout_prefix #define dout_prefix (*_dout << "asio: ") namespace { class Pauser { std::mutex mutex; std::condition_variable cond_ready; // signaled on ready==true std::condition_variable cond_paused; // signaled on waiters==thread_count bool ready{false}; int waiters{0}; public: template void pause(int thread_count, Func&& func); void unpause(); void wait(); }; template void Pauser::pause(int thread_count, Func&& func) { std::unique_lock lock(mutex); ready = false; lock.unlock(); func(); // wait for all threads to pause lock.lock(); cond_paused.wait(lock, [=] { return waiters == thread_count; }); } void Pauser::unpause() { std::lock_guard lock(mutex); ready = true; cond_ready.notify_all(); } void Pauser::wait() { std::unique_lock lock(mutex); ++waiters; cond_paused.notify_one(); // notify pause() that we're waiting cond_ready.wait(lock, [this] { return ready; }); // wait for unpause() --waiters; } using tcp = boost::asio::ip::tcp; // coroutine to handle a client connection to completion static void handle_connection(RGWProcessEnv& env, tcp::socket socket, boost::asio::yield_context yield) { auto cct = env.store->ctx(); boost::system::error_code ec; beast::flat_streambuf buffer{1024}; // read messages from the socket until eof for (;;) { // parse the header rgw::asio::parser_type parser; do { auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]); buffer.consume(bytes); } while (!ec && !parser.got_header()); if (ec == boost::asio::error::connection_reset || ec == boost::asio::error::eof) { return; } if (ec) { auto& message = parser.get(); ldout(cct, 1) << "read failed: " << ec.message() << dendl; ldout(cct, 1) << "====== req done http_status=400 ======" << dendl; beast::http::response response; response.status = 400; response.reason = "Bad Request"; response.version = message.version == 10 ? 10 : 11; beast::http::prepare(response); beast::http::async_write(socket, std::move(response), yield[ec]); // ignore ec return; } // process the request RGWRequest req{env.store->get_new_req_id()}; rgw::asio::ClientIO real_client{socket, parser, buffer}; auto real_client_io = rgw::io::add_reordering( rgw::io::add_buffering(cct, rgw::io::add_chunking( rgw::io::add_conlen_controlling( &real_client)))); RGWRestfulIO client(cct, &real_client_io); process_request(env.store, env.rest, &req, env.uri_prefix, *env.auth_registry, &client, env.olog); if (real_client.get_conn_close()) { return; } } } class AsioFrontend { RGWProcessEnv env; boost::asio::io_service service; tcp::acceptor acceptor; tcp::socket peer_socket; std::vector threads; Pauser pauser; std::atomic going_down{false}; CephContext* ctx() const { return env.store->ctx(); } void accept(boost::system::error_code ec); public: AsioFrontend(const RGWProcessEnv& env) : env(env), acceptor(service), peer_socket(service) {} int init(); int run(); void stop(); void join(); void pause(); void unpause(RGWRados* store, rgw_auth_registry_ptr_t); }; int AsioFrontend::init() { auto ep = tcp::endpoint{tcp::v4(), static_cast(env.port)}; ldout(ctx(), 4) << "frontend listening on " << ep << dendl; boost::system::error_code ec; acceptor.open(ep.protocol(), ec); if (ec) { lderr(ctx()) << "failed to open socket: " << ec.message() << dendl; return -ec.value(); } acceptor.set_option(tcp::acceptor::reuse_address(true)); acceptor.bind(ep, ec); if (ec) { lderr(ctx()) << "failed to bind address " << ep << ": " << ec.message() << dendl; return -ec.value(); } acceptor.listen(boost::asio::socket_base::max_connections); acceptor.async_accept(peer_socket, [this] (boost::system::error_code ec) { return accept(ec); }); return 0; } void AsioFrontend::accept(boost::system::error_code ec) { if (!acceptor.is_open()) { return; } else if (ec == boost::asio::error::operation_aborted) { return; } else if (ec) { throw ec; } auto socket = std::move(peer_socket); // spawn a coroutine to handle the connection boost::asio::spawn(service, [&] (boost::asio::yield_context yield) { handle_connection(env, std::move(socket), yield); }); acceptor.async_accept(peer_socket, [this] (boost::system::error_code ec) { return accept(ec); }); } int AsioFrontend::run() { auto cct = ctx(); const int thread_count = cct->_conf->rgw_thread_pool_size; threads.reserve(thread_count); ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl; for (int i = 0; i < thread_count; i++) { threads.emplace_back([=] { for (;;) { service.run(); if (going_down) { break; } pauser.wait(); } }); } return 0; } void AsioFrontend::stop() { ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl; going_down = true; boost::system::error_code ec; acceptor.close(ec); // unblock the run() threads } void AsioFrontend::join() { if (!going_down) { stop(); } ldout(ctx(), 4) << "frontend joining threads..." << dendl; for (auto& thread : threads) { thread.join(); } ldout(ctx(), 4) << "frontend done" << dendl; } void AsioFrontend::pause() { ldout(ctx(), 4) << "frontend pausing threads..." << dendl; pauser.pause(threads.size(), [=] { // stop accepting but leave the port open boost::system::error_code ec; acceptor.cancel(ec); }); ldout(ctx(), 4) << "frontend paused" << dendl; } void AsioFrontend::unpause(RGWRados* const store, rgw_auth_registry_ptr_t auth_registry) { env.store = store; env.auth_registry = std::move(auth_registry); ldout(ctx(), 4) << "frontend unpaused" << dendl; service.reset(); acceptor.async_accept(peer_socket, [this] (boost::system::error_code ec) { return accept(ec); }); pauser.unpause(); } } // anonymous namespace class RGWAsioFrontend::Impl : public AsioFrontend { public: Impl(const RGWProcessEnv& env) : AsioFrontend(env) {} }; RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env) : impl(new Impl(env)) { } RGWAsioFrontend::~RGWAsioFrontend() = default; int RGWAsioFrontend::init() { return impl->init(); } int RGWAsioFrontend::run() { return impl->run(); } void RGWAsioFrontend::stop() { impl->stop(); } void RGWAsioFrontend::join() { impl->join(); } void RGWAsioFrontend::pause_for_new_config() { impl->pause(); } void RGWAsioFrontend::unpause_with_new_config( RGWRados* const store, rgw_auth_registry_ptr_t auth_registry ) { impl->unpause(store, std::move(auth_registry)); }