remove ceph code
[stor4nfv.git] / src / ceph / src / rgw / rgw_asio_frontend.cc
diff --git a/src/ceph/src/rgw/rgw_asio_frontend.cc b/src/ceph/src/rgw/rgw_asio_frontend.cc
deleted file mode 100644 (file)
index ee6be62..0000000
+++ /dev/null
@@ -1,315 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include <condition_variable>
-#include <mutex>
-#include <thread>
-#include <vector>
-
-#include <boost/asio.hpp>
-#include <boost/asio/spawn.hpp>
-
-#include <beast/core/placeholders.hpp>
-#include <beast/http/read.hpp>
-#include <beast/http/string_body.hpp>
-#include <beast/http/write.hpp>
-
-#include "rgw_asio_frontend.h"
-#include "rgw_asio_client.h"
-
-#define dout_subsys ceph_subsys_rgw
-
-#undef dout_prefix
-#define dout_prefix (*_dout << "asio: ")
-
-namespace {
-
-class Pauser {
-  std::mutex mutex;
-  std::condition_variable cond_ready; // signaled on ready==true
-  std::condition_variable cond_paused; // signaled on waiters==thread_count
-  bool ready{false};
-  int waiters{0};
- public:
-  template <typename Func>
-  void pause(int thread_count, Func&& func);
-  void unpause();
-  void wait();
-};
-
-template <typename Func>
-void Pauser::pause(int thread_count, Func&& func)
-{
-  std::unique_lock<std::mutex> lock(mutex);
-  ready = false;
-  lock.unlock();
-
-  func();
-
-  // wait for all threads to pause
-  lock.lock();
-  cond_paused.wait(lock, [=] { return waiters == thread_count; });
-}
-
-void Pauser::unpause()
-{
-  std::lock_guard<std::mutex> lock(mutex);
-  ready = true;
-  cond_ready.notify_all();
-}
-
-void Pauser::wait()
-{
-  std::unique_lock<std::mutex> lock(mutex);
-  ++waiters;
-  cond_paused.notify_one(); // notify pause() that we're waiting
-  cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
-  --waiters;
-}
-
-using tcp = boost::asio::ip::tcp;
-
-// coroutine to handle a client connection to completion
-static void handle_connection(RGWProcessEnv& env, tcp::socket socket,
-                              boost::asio::yield_context yield)
-{
-  auto cct = env.store->ctx();
-  boost::system::error_code ec;
-
-  beast::flat_streambuf buffer{1024};
-
-  // read messages from the socket until eof
-  for (;;) {
-    // parse the header
-    rgw::asio::parser_type parser;
-    do {
-      auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]);
-      buffer.consume(bytes);
-    } while (!ec && !parser.got_header());
-
-    if (ec == boost::asio::error::connection_reset ||
-        ec == boost::asio::error::eof) {
-      return;
-    }
-    if (ec) {
-      auto& message = parser.get();
-      ldout(cct, 1) << "read failed: " << ec.message() << dendl;
-      ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
-      beast::http::response<beast::http::string_body> response;
-      response.status = 400;
-      response.reason = "Bad Request";
-      response.version = message.version == 10 ? 10 : 11;
-      beast::http::prepare(response);
-      beast::http::async_write(socket, std::move(response), yield[ec]);
-      // ignore ec
-      return;
-    }
-
-    // process the request
-    RGWRequest req{env.store->get_new_req_id()};
-
-    rgw::asio::ClientIO real_client{socket, parser, buffer};
-
-    auto real_client_io = rgw::io::add_reordering(
-                            rgw::io::add_buffering(cct,
-                              rgw::io::add_chunking(
-                                rgw::io::add_conlen_controlling(
-                                  &real_client))));
-    RGWRestfulIO client(cct, &real_client_io);
-    process_request(env.store, env.rest, &req, env.uri_prefix,
-                    *env.auth_registry, &client, env.olog);
-
-    if (real_client.get_conn_close()) {
-      return;
-    }
-  }
-}
-
-class AsioFrontend {
-  RGWProcessEnv env;
-  boost::asio::io_service service;
-
-  tcp::acceptor acceptor;
-  tcp::socket peer_socket;
-
-  std::vector<std::thread> threads;
-  Pauser pauser;
-  std::atomic<bool> going_down{false};
-
-  CephContext* ctx() const { return env.store->ctx(); }
-
-  void accept(boost::system::error_code ec);
-
- public:
-  AsioFrontend(const RGWProcessEnv& env)
-    : env(env), acceptor(service), peer_socket(service) {}
-
-  int init();
-  int run();
-  void stop();
-  void join();
-  void pause();
-  void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
-};
-
-int AsioFrontend::init()
-{
-  auto ep = tcp::endpoint{tcp::v4(), static_cast<unsigned short>(env.port)};
-  ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
-
-  boost::system::error_code ec;
-  acceptor.open(ep.protocol(), ec);
-  if (ec) {
-    lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
-    return -ec.value();
-  }
-  acceptor.set_option(tcp::acceptor::reuse_address(true));
-  acceptor.bind(ep, ec);
-  if (ec) {
-    lderr(ctx()) << "failed to bind address " << ep <<
-        ": " << ec.message() << dendl;
-    return -ec.value();
-  }
-  acceptor.listen(boost::asio::socket_base::max_connections);
-  acceptor.async_accept(peer_socket,
-                        [this] (boost::system::error_code ec) {
-                          return accept(ec);
-                        });
-  return 0;
-}
-
-void AsioFrontend::accept(boost::system::error_code ec)
-{
-  if (!acceptor.is_open()) {
-    return;
-  } else if (ec == boost::asio::error::operation_aborted) {
-    return;
-  } else if (ec) {
-    throw ec;
-  }
-  auto socket = std::move(peer_socket);
-  // spawn a coroutine to handle the connection
-  boost::asio::spawn(service,
-                     [&] (boost::asio::yield_context yield) {
-                       handle_connection(env, std::move(socket), yield);
-                     });
-  acceptor.async_accept(peer_socket,
-                        [this] (boost::system::error_code ec) {
-                          return accept(ec);
-                        });
-}
-
-int AsioFrontend::run()
-{
-  auto cct = ctx();
-  const int thread_count = cct->_conf->rgw_thread_pool_size;
-  threads.reserve(thread_count);
-
-  ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
-
-  for (int i = 0; i < thread_count; i++) {
-    threads.emplace_back([=] {
-      for (;;) {
-        service.run();
-        if (going_down) {
-          break;
-        }
-        pauser.wait();
-      }
-    });
-  }
-  return 0;
-}
-
-void AsioFrontend::stop()
-{
-  ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
-
-  going_down = true;
-
-  boost::system::error_code ec;
-  acceptor.close(ec); // unblock the run() threads
-}
-
-void AsioFrontend::join()
-{
-  if (!going_down) {
-    stop();
-  }
-  ldout(ctx(), 4) << "frontend joining threads..." << dendl;
-  for (auto& thread : threads) {
-    thread.join();
-  }
-  ldout(ctx(), 4) << "frontend done" << dendl;
-}
-
-void AsioFrontend::pause()
-{
-  ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
-  pauser.pause(threads.size(), [=] {
-    // stop accepting but leave the port open
-    boost::system::error_code ec;
-    acceptor.cancel(ec);
-  });
-  ldout(ctx(), 4) << "frontend paused" << dendl;
-}
-
-void AsioFrontend::unpause(RGWRados* const store,
-                           rgw_auth_registry_ptr_t auth_registry)
-{
-  env.store = store;
-  env.auth_registry = std::move(auth_registry);
-  ldout(ctx(), 4) << "frontend unpaused" << dendl;
-  service.reset();
-  acceptor.async_accept(peer_socket,
-                        [this] (boost::system::error_code ec) {
-                          return accept(ec);
-                        });
-  pauser.unpause();
-}
-
-} // anonymous namespace
-
-class RGWAsioFrontend::Impl : public AsioFrontend {
- public:
-  Impl(const RGWProcessEnv& env) : AsioFrontend(env) {}
-};
-
-RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env)
-  : impl(new Impl(env))
-{
-}
-
-RGWAsioFrontend::~RGWAsioFrontend() = default;
-
-int RGWAsioFrontend::init()
-{
-  return impl->init();
-}
-
-int RGWAsioFrontend::run()
-{
-  return impl->run();
-}
-
-void RGWAsioFrontend::stop()
-{
-  impl->stop();
-}
-
-void RGWAsioFrontend::join()
-{
-  impl->join();
-}
-
-void RGWAsioFrontend::pause_for_new_config()
-{
-  impl->pause();
-}
-
-void RGWAsioFrontend::unpause_with_new_config(
-  RGWRados* const store,
-  rgw_auth_registry_ptr_t auth_registry
-) {
-  impl->unpause(store, std::move(auth_registry));
-}