X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_asio_frontend.cc;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_asio_frontend.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=ee6be6256a5ba69b588c612a517b4d190eeee5db;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_asio_frontend.cc b/src/ceph/src/rgw/rgw_asio_frontend.cc deleted file mode 100644 index ee6be62..0000000 --- a/src/ceph/src/rgw/rgw_asio_frontend.cc +++ /dev/null @@ -1,315 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include -#include -#include -#include - -#include -#include - -#include -#include -#include -#include - -#include "rgw_asio_frontend.h" -#include "rgw_asio_client.h" - -#define dout_subsys ceph_subsys_rgw - -#undef dout_prefix -#define dout_prefix (*_dout << "asio: ") - -namespace { - -class Pauser { - std::mutex mutex; - std::condition_variable cond_ready; // signaled on ready==true - std::condition_variable cond_paused; // signaled on waiters==thread_count - bool ready{false}; - int waiters{0}; - public: - template - void pause(int thread_count, Func&& func); - void unpause(); - void wait(); -}; - -template -void Pauser::pause(int thread_count, Func&& func) -{ - std::unique_lock lock(mutex); - ready = false; - lock.unlock(); - - func(); - - // wait for all threads to pause - lock.lock(); - cond_paused.wait(lock, [=] { return waiters == thread_count; }); -} - -void Pauser::unpause() -{ - std::lock_guard lock(mutex); - ready = true; - cond_ready.notify_all(); -} - -void Pauser::wait() -{ - std::unique_lock lock(mutex); - ++waiters; - cond_paused.notify_one(); // notify pause() that we're waiting - cond_ready.wait(lock, [this] { return ready; }); // wait for unpause() - --waiters; -} - -using tcp = boost::asio::ip::tcp; - -// coroutine to handle a client connection to completion -static void handle_connection(RGWProcessEnv& env, tcp::socket socket, - boost::asio::yield_context yield) -{ - auto cct = env.store->ctx(); - boost::system::error_code ec; - - beast::flat_streambuf buffer{1024}; - - // read messages from the socket until eof - for (;;) { - // parse the header - rgw::asio::parser_type parser; - do { - auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]); - buffer.consume(bytes); - } while (!ec && !parser.got_header()); - - if (ec == boost::asio::error::connection_reset || - ec == boost::asio::error::eof) { - return; - } - if (ec) { - auto& message = parser.get(); - ldout(cct, 1) << "read failed: " << ec.message() << dendl; - ldout(cct, 1) << "====== req done http_status=400 ======" << dendl; - beast::http::response response; - response.status = 400; - response.reason = "Bad Request"; - response.version = message.version == 10 ? 10 : 11; - beast::http::prepare(response); - beast::http::async_write(socket, std::move(response), yield[ec]); - // ignore ec - return; - } - - // process the request - RGWRequest req{env.store->get_new_req_id()}; - - rgw::asio::ClientIO real_client{socket, parser, buffer}; - - auto real_client_io = rgw::io::add_reordering( - rgw::io::add_buffering(cct, - rgw::io::add_chunking( - rgw::io::add_conlen_controlling( - &real_client)))); - RGWRestfulIO client(cct, &real_client_io); - process_request(env.store, env.rest, &req, env.uri_prefix, - *env.auth_registry, &client, env.olog); - - if (real_client.get_conn_close()) { - return; - } - } -} - -class AsioFrontend { - RGWProcessEnv env; - boost::asio::io_service service; - - tcp::acceptor acceptor; - tcp::socket peer_socket; - - std::vector threads; - Pauser pauser; - std::atomic going_down{false}; - - CephContext* ctx() const { return env.store->ctx(); } - - void accept(boost::system::error_code ec); - - public: - AsioFrontend(const RGWProcessEnv& env) - : env(env), acceptor(service), peer_socket(service) {} - - int init(); - int run(); - void stop(); - void join(); - void pause(); - void unpause(RGWRados* store, rgw_auth_registry_ptr_t); -}; - -int AsioFrontend::init() -{ - auto ep = tcp::endpoint{tcp::v4(), static_cast(env.port)}; - ldout(ctx(), 4) << "frontend listening on " << ep << dendl; - - boost::system::error_code ec; - acceptor.open(ep.protocol(), ec); - if (ec) { - lderr(ctx()) << "failed to open socket: " << ec.message() << dendl; - return -ec.value(); - } - acceptor.set_option(tcp::acceptor::reuse_address(true)); - acceptor.bind(ep, ec); - if (ec) { - lderr(ctx()) << "failed to bind address " << ep << - ": " << ec.message() << dendl; - return -ec.value(); - } - acceptor.listen(boost::asio::socket_base::max_connections); - acceptor.async_accept(peer_socket, - [this] (boost::system::error_code ec) { - return accept(ec); - }); - return 0; -} - -void AsioFrontend::accept(boost::system::error_code ec) -{ - if (!acceptor.is_open()) { - return; - } else if (ec == boost::asio::error::operation_aborted) { - return; - } else if (ec) { - throw ec; - } - auto socket = std::move(peer_socket); - // spawn a coroutine to handle the connection - boost::asio::spawn(service, - [&] (boost::asio::yield_context yield) { - handle_connection(env, std::move(socket), yield); - }); - acceptor.async_accept(peer_socket, - [this] (boost::system::error_code ec) { - return accept(ec); - }); -} - -int AsioFrontend::run() -{ - auto cct = ctx(); - const int thread_count = cct->_conf->rgw_thread_pool_size; - threads.reserve(thread_count); - - ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl; - - for (int i = 0; i < thread_count; i++) { - threads.emplace_back([=] { - for (;;) { - service.run(); - if (going_down) { - break; - } - pauser.wait(); - } - }); - } - return 0; -} - -void AsioFrontend::stop() -{ - ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl; - - going_down = true; - - boost::system::error_code ec; - acceptor.close(ec); // unblock the run() threads -} - -void AsioFrontend::join() -{ - if (!going_down) { - stop(); - } - ldout(ctx(), 4) << "frontend joining threads..." << dendl; - for (auto& thread : threads) { - thread.join(); - } - ldout(ctx(), 4) << "frontend done" << dendl; -} - -void AsioFrontend::pause() -{ - ldout(ctx(), 4) << "frontend pausing threads..." << dendl; - pauser.pause(threads.size(), [=] { - // stop accepting but leave the port open - boost::system::error_code ec; - acceptor.cancel(ec); - }); - ldout(ctx(), 4) << "frontend paused" << dendl; -} - -void AsioFrontend::unpause(RGWRados* const store, - rgw_auth_registry_ptr_t auth_registry) -{ - env.store = store; - env.auth_registry = std::move(auth_registry); - ldout(ctx(), 4) << "frontend unpaused" << dendl; - service.reset(); - acceptor.async_accept(peer_socket, - [this] (boost::system::error_code ec) { - return accept(ec); - }); - pauser.unpause(); -} - -} // anonymous namespace - -class RGWAsioFrontend::Impl : public AsioFrontend { - public: - Impl(const RGWProcessEnv& env) : AsioFrontend(env) {} -}; - -RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env) - : impl(new Impl(env)) -{ -} - -RGWAsioFrontend::~RGWAsioFrontend() = default; - -int RGWAsioFrontend::init() -{ - return impl->init(); -} - -int RGWAsioFrontend::run() -{ - return impl->run(); -} - -void RGWAsioFrontend::stop() -{ - impl->stop(); -} - -void RGWAsioFrontend::join() -{ - impl->join(); -} - -void RGWAsioFrontend::pause_for_new_config() -{ - impl->pause(); -} - -void RGWAsioFrontend::unpause_with_new_config( - RGWRados* const store, - rgw_auth_registry_ptr_t auth_registry -) { - impl->unpause(store, std::move(auth_registry)); -}