diff --git a/src/ceph/src/rgw/rgw_asio_frontend.cc b/src/ceph/src/rgw/rgw_asio_frontend.cc
new file mode 100644
index 0000000..ee6be62
--- /dev/null
+++ b/src/ceph/src/rgw/rgw_asio_frontend.cc
@@ -0,0 +1,315 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <boost/asio.hpp>
+#include <boost/asio/spawn.hpp>
+
+#include <beast/core/placeholders.hpp>
+#include <beast/http/read.hpp>
+#include <beast/http/string_body.hpp>
+#include <beast/http/write.hpp>
+
+#include "rgw_asio_frontend.h"
+#include "rgw_asio_client.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+#undef dout_prefix
+#define dout_prefix (*_dout << "asio: ")
+
+namespace {
+
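+// handshake between pause() and the worker threads: pause() blocks until
+// all thread_count threads are parked in wait(), and unpause() releases
+// them once the new configuration is in place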
+class Pauser {
+  std::mutex mutex;
+  std::condition_variable cond_ready; // signaled on ready==true
+  std::condition_variable cond_paused; // signaled on waiters==thread_count
+  bool ready{false};
+  int waiters{0};
+ public:
+  template <typename Func>
+  void pause(int thread_count, Func&& func);
+  void unpause();
+  void wait();
+};
+
+template <typename Func>
+void Pauser::pause(int thread_count, Func&& func)
+{
+  std::unique_lock<std::mutex> lock(mutex);
+  ready = false;
+  lock.unlock();
+
+  func();
+
+  // wait for all threads to pause
+  lock.lock();
+  cond_paused.wait(lock, [=] { return waiters == thread_count; });
+}
+
+void Pauser::unpause()
+{
+  std::lock_guard<std::mutex> lock(mutex);
+  ready = true;
+  cond_ready.notify_all();
+}
+
+void Pauser::wait()
+{
+  std::unique_lock<std::mutex> lock(mutex);
+  ++waiters;
+  cond_paused.notify_one(); // notify pause() that we're waiting
+  cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
+  --waiters;
+}
+
+using tcp = boost::asio::ip::tcp;
+
+// coroutine to handle a client connection to completion
+static void handle_connection(RGWProcessEnv& env, tcp::socket socket,
+                              boost::asio::yield_context yield)
+{
+  auto cct = env.store->ctx();
+  boost::system::error_code ec;
+
+  beast::flat_streambuf buffer{1024};
+
+  // read messages from the socket until eof
+  for (;;) {
+    // parse the header
+    rgw::asio::parser_type parser;
+    do {
+      auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]);
+      buffer.consume(bytes);
+    } while (!ec && !parser.got_header());
+
+    if (ec == boost::asio::error::connection_reset ||
+        ec == boost::asio::error::eof) {
+      return;
+    }
+    if (ec) {
+      auto& message = parser.get();
+      ldout(cct, 1) << "read failed: " << ec.message() << dendl;
+      ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
+      beast::http::response<beast::http::string_body> response;
+      response.status = 400;
+      response.reason = "Bad Request";
+      response.version = message.version == 10 ? 10 : 11;
+      beast::http::prepare(response);
+      beast::http::async_write(socket, std::move(response), yield[ec]);
+      // ignore ec
+      return;
+    }
+
+    // process the request
+    RGWRequest req{env.store->get_new_req_id()};
+
+    rgw::asio::ClientIO real_client{socket, parser, buffer};
+
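+    // wrap the raw client in the standard output filters, innermost to
+    // outermost: Content-Length control, chunked encoding, buffering,
+    // and header reordering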
+    auto real_client_io = rgw::io::add_reordering(
+                            rgw::io::add_buffering(cct,
+                              rgw::io::add_chunking(
+                                rgw::io::add_conlen_controlling(
+                                  &real_client))));
+    RGWRestfulIO client(cct, &real_client_io);
+    process_request(env.store, env.rest, &req, env.uri_prefix,
+                    *env.auth_registry, &client, env.olog);
+
+    if (real_client.get_conn_close()) {
+      return;
+    }
+  }
+}
+
+class AsioFrontend {
+  RGWProcessEnv env;
+  boost::asio::io_service service;
+
+  tcp::acceptor acceptor;
+  tcp::socket peer_socket;
+
+  std::vector<std::thread> threads;
+  Pauser pauser;
+  std::atomic<bool> going_down{false};
+
+  CephContext* ctx() const { return env.store->ctx(); }
+
+  void accept(boost::system::error_code ec);
+
+ public:
+  AsioFrontend(const RGWProcessEnv& env)
+    : env(env), acceptor(service), peer_socket(service) {}
+
+  int init();
+  int run();
+  void stop();
+  void join();
+  void pause();
+  void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
+};
+
+int AsioFrontend::init()
+{
+  auto ep = tcp::endpoint{tcp::v4(), static_cast<unsigned short>(env.port)};
+  ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
+
+  boost::system::error_code ec;
+  acceptor.open(ep.protocol(), ec);
+  if (ec) {
+    lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
+    return -ec.value();
+  }
+  acceptor.set_option(tcp::acceptor::reuse_address(true));
+  acceptor.bind(ep, ec);
+  if (ec) {
+    lderr(ctx()) << "failed to bind address " << ep <<
+        ": " << ec.message() << dendl;
+    return -ec.value();
+  }
+  acceptor.listen(boost::asio::socket_base::max_connections);
+  acceptor.async_accept(peer_socket,
+                        [this] (boost::system::error_code ec) {
+                          return accept(ec);
+                        });
+  return 0;
+}
+
+void AsioFrontend::accept(boost::system::error_code ec)
+{
+  if (!acceptor.is_open()) {
+    return;
+  } else if (ec == boost::asio::error::operation_aborted) {
+    return;
+  } else if (ec) {
+    throw boost::system::system_error(ec);
+  }
+  auto socket = std::move(peer_socket);
+  // spawn a coroutine to handle the connection
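+  // note: spawn() dispatches the coroutine on this thread, so the
+  // reference-captured socket is moved into handle_connection before
+  // this handler returns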
+  boost::asio::spawn(service,
+                     [&] (boost::asio::yield_context yield) {
+                       handle_connection(env, std::move(socket), yield);
+                     });
+  acceptor.async_accept(peer_socket,
+                        [this] (boost::system::error_code ec) {
+                          return accept(ec);
+                        });
+}
+
+int AsioFrontend::run()
+{
+  auto cct = ctx();
+  const int thread_count = cct->_conf->rgw_thread_pool_size;
+  threads.reserve(thread_count);
+
+  ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
+
+  for (int i = 0; i < thread_count; i++) {
+    threads.emplace_back([=] {
+      for (;;) {
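+        // run() returns once the io_service is stopped or out of work,
+        // e.g. after stop() closes the acceptor or pause() cancels it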
+        service.run();
+        if (going_down) {
+          break;
+        }
+        pauser.wait();
+      }
+    });
+  }
+  return 0;
+}
+
+void AsioFrontend::stop()
+{
+  ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
+
+  going_down = true;
+
+  boost::system::error_code ec;
+  acceptor.close(ec); // unblock the run() threads
+}
+
+void AsioFrontend::join()
+{
+  if (!going_down) {
+    stop();
+  }
+  ldout(ctx(), 4) << "frontend joining threads..." << dendl;
+  for (auto& thread : threads) {
+    thread.join();
+  }
+  ldout(ctx(), 4) << "frontend done" << dendl;
+}
+
+void AsioFrontend::pause()
+{
+  ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
+  pauser.pause(threads.size(), [=] {
+    // stop accepting but leave the port open
+    boost::system::error_code ec;
+    acceptor.cancel(ec);
+  });
+  ldout(ctx(), 4) << "frontend paused" << dendl;
+}
+
+void AsioFrontend::unpause(RGWRados* const store,
+                           rgw_auth_registry_ptr_t auth_registry)
+{
+  env.store = store;
+  env.auth_registry = std::move(auth_registry);
+  ldout(ctx(), 4) << "frontend unpaused" << dendl;
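+  // clear the io_service's stopped state so the worker threads' next
+  // call to run() can make progress again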
+  service.reset();
+  acceptor.async_accept(peer_socket,
+                        [this] (boost::system::error_code ec) {
+                          return accept(ec);
+                        });
+  pauser.unpause();
+}
+
+} // anonymous namespace
+
+class RGWAsioFrontend::Impl : public AsioFrontend {
+ public:
+  Impl(const RGWProcessEnv& env) : AsioFrontend(env) {}
+};
+
+RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env)
+  : impl(new Impl(env))
+{
+}
+
+RGWAsioFrontend::~RGWAsioFrontend() = default;
+
+int RGWAsioFrontend::init()
+{
+  return impl->init();
+}
+
+int RGWAsioFrontend::run()
+{
+  return impl->run();
+}
+
+void RGWAsioFrontend::stop()
+{
+  impl->stop();
+}
+
+void RGWAsioFrontend::join()
+{
+  impl->join();
+}
+
+void RGWAsioFrontend::pause_for_new_config()
+{
+  impl->pause();
+}
+
+void RGWAsioFrontend::unpause_with_new_config(
+  RGWRados* const store,
+  rgw_auth_registry_ptr_t auth_registry
+) {
+  impl->unpause(store, std::move(auth_registry));
+}
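
For reference, a minimal self-contained sketch of the pause/unpause handshake implemented by the Pauser above, with plain worker threads standing in for the io_service run() loops (the toy workload and output are illustrative only, not part of the patch):

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// condensed copy of the Pauser above
class Pauser {
  std::mutex mutex;
  std::condition_variable cond_ready;   // signaled on ready==true
  std::condition_variable cond_paused;  // signaled on waiters==thread_count
  bool ready{false};
  int waiters{0};
 public:
  template <typename Func>
  void pause(int thread_count, Func&& func) {
    { std::lock_guard<std::mutex> l(mutex); ready = false; }
    func(); // e.g. cancel the acceptor so the threads drain into wait()
    std::unique_lock<std::mutex> lock(mutex);
    cond_paused.wait(lock, [&] { return waiters == thread_count; });
  }
  void unpause() {
    std::lock_guard<std::mutex> l(mutex);
    ready = true;
    cond_ready.notify_all();
  }
  void wait() {
    std::unique_lock<std::mutex> lock(mutex);
    ++waiters;
    cond_paused.notify_one(); // tell pause() we're parked
    cond_ready.wait(lock, [&] { return ready; });
    --waiters;
  }
};

int main() {
  constexpr int thread_count = 4;
  Pauser pauser;
  std::atomic<bool> going_down{false};

  std::vector<std::thread> threads;
  for (int i = 0; i < thread_count; i++) {
    threads.emplace_back([&] {
      for (;;) {
        // a real worker would call service.run() here
        if (going_down) break;
        pauser.wait(); // park until unpause()
      }
    });
  }

  pauser.pause(thread_count, [] {
    std::cout << "acceptor cancelled, waiting for workers to park\n";
  });
  std::cout << "all workers paused; safe to swap config\n";

  going_down = true; // workers exit on their next loop iteration
  pauser.unpause();
  for (auto& t : threads) t.join();
}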