X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_ggate%2FServer.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_ggate%2FServer.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=6fde848dbc2bb41d6cf634ff748f8f53bd3d416c;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_ggate/Server.cc b/src/ceph/src/tools/rbd_ggate/Server.cc deleted file mode 100644 index 6fde848..0000000 --- a/src/ceph/src/tools/rbd_ggate/Server.cc +++ /dev/null @@ -1,270 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "common/debug.h" -#include "common/errno.h" -#include "Driver.h" -#include "Server.h" -#include "Request.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rbd -#undef dout_prefix -#define dout_prefix *_dout << "rbd::ggate::Server: " << this \ - << " " << __func__ << ": " - -namespace rbd { -namespace ggate { - -Server::Server(Driver *drv, librbd::Image& image) - : m_drv(drv), m_image(image), m_lock("rbd::ggate::Server::m_lock"), - m_reader_thread(this, &Server::reader_entry), - m_writer_thread(this, &Server::writer_entry) { -} - -void Server::run() { - dout(10) << dendl; - - int r = start(); - assert(r == 0); - - dout(20) << "entering run loop" << dendl; - - { - Mutex::Locker locker(m_lock); - while (!m_stopping) { - m_cond.WaitInterval(m_lock, utime_t(1, 0)); - } - } - - dout(20) << "exiting run loop" << dendl; - - stop(); -} - -int Server::start() { - dout(10) << dendl; - - m_reader_thread.create("rbd_reader"); - m_writer_thread.create("rbd_writer"); - return 0; -} - -void Server::stop() { - dout(10) << dendl; - - { - Mutex::Locker locker(m_lock); - assert(m_stopping); - } - - m_reader_thread.join(); - m_writer_thread.join(); - - wait_clean(); -} - -void Server::io_start(IOContext *ctx) { - dout(20) << ctx << dendl; - - Mutex::Locker locker(m_lock); - m_io_pending.push_back(&ctx->item); -} - -void Server::io_finish(IOContext *ctx) { - dout(20) << ctx << dendl; - - Mutex::Locker locker(m_lock); - assert(ctx->item.is_on_list()); - - ctx->item.remove_myself(); - m_io_finished.push_back(&ctx->item); - m_cond.Signal(); -} - -Server::IOContext *Server::wait_io_finish() { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - while (m_io_finished.empty() && !m_stopping) { - m_cond.Wait(m_lock); - } - - if (m_io_finished.empty()) { - return nullptr; - } - - IOContext *ret = m_io_finished.front(); - m_io_finished.pop_front(); - - return ret; -} - -void Server::wait_clean() { - dout(20) << dendl; - - assert(!m_reader_thread.is_started()); - - Mutex::Locker locker(m_lock); - - while (!m_io_pending.empty()) { - m_cond.Wait(m_lock); - } - - while (!m_io_finished.empty()) { - ceph::unique_ptr free_ctx(m_io_finished.front()); - m_io_finished.pop_front(); - } -} - -void Server::aio_callback(librbd::completion_t cb, void *arg) { - librbd::RBD::AioCompletion *aio_completion = - reinterpret_cast(cb); - - IOContext *ctx = reinterpret_cast(arg); - int r = aio_completion->get_return_value(); - - ctx->server->handle_aio(ctx, r); - aio_completion->release(); -} - -void Server::handle_aio(IOContext *ctx, int r) { - dout(20) << ctx << ": r=" << r << dendl; - - if (r == -EINVAL) { - // if shrinking an image, a pagecache writeback might reference - // extents outside of the range of the new image extents - dout(5) << "masking IO out-of-bounds error" << dendl; - ctx->req->bl.clear(); - r = 0; - } - - if (r < 0) { - ctx->req->set_error(-r); - } else if ((ctx->req->get_cmd() == Request::Read) && - r != static_cast(ctx->req->get_length())) { - int pad_byte_count = static_cast (ctx->req->get_length()) - r; - ctx->req->bl.append_zero(pad_byte_count); - dout(20) << ctx << ": pad byte count: " << pad_byte_count << dendl; - ctx->req->set_error(0); - } else { - ctx->req->set_error(0); - } - io_finish(ctx); -} - -void Server::reader_entry() { - dout(20) << dendl; - - while (!m_stopping) { - ceph::unique_ptr ctx(new IOContext(this)); - - dout(20) << "waiting for ggate request" << dendl; - - int r = m_drv->recv(&ctx->req); - if (r < 0) { - if (r != -ECANCELED) { - derr << "recv: " << cpp_strerror(r) << dendl; - } - Mutex::Locker locker(m_lock); - m_stopping = true; - m_cond.Signal(); - return; - } - - IOContext *pctx = ctx.release(); - - dout(20) << pctx << ": start: " << *pctx << dendl; - - io_start(pctx); - librbd::RBD::AioCompletion *c = - new librbd::RBD::AioCompletion(pctx, aio_callback); - switch (pctx->req->get_cmd()) - { - case rbd::ggate::Request::Write: - m_image.aio_write(pctx->req->get_offset(), pctx->req->get_length(), - pctx->req->bl, c); - break; - case rbd::ggate::Request::Read: - m_image.aio_read(pctx->req->get_offset(), pctx->req->get_length(), - pctx->req->bl, c); - break; - case rbd::ggate::Request::Flush: - m_image.aio_flush(c); - break; - case rbd::ggate::Request::Discard: - m_image.aio_discard(pctx->req->get_offset(), pctx->req->get_length(), c); - break; - default: - derr << pctx << ": invalid request command: " << pctx->req->get_cmd() - << dendl; - c->release(); - Mutex::Locker locker(m_lock); - m_stopping = true; - m_cond.Signal(); - return; - } - } - dout(20) << "terminated" << dendl; -} - -void Server::writer_entry() { - dout(20) << dendl; - - while (!m_stopping) { - dout(20) << "waiting for io request" << dendl; - - ceph::unique_ptr ctx(wait_io_finish()); - if (!ctx) { - dout(20) << "no io requests, terminating" << dendl; - return; - } - - dout(20) << ctx.get() << ": got: " << *ctx << dendl; - - int r = m_drv->send(ctx->req); - if (r < 0) { - derr << ctx.get() << ": send: " << cpp_strerror(r) << dendl; - Mutex::Locker locker(m_lock); - m_stopping = true; - m_cond.Signal(); - return; - } - dout(20) << ctx.get() << " finish" << dendl; - } - dout(20) << "terminated" << dendl; -} - -std::ostream &operator<<(std::ostream &os, const Server::IOContext &ctx) { - - os << "[" << ctx.req->get_id(); - - switch (ctx.req->get_cmd()) - { - case rbd::ggate::Request::Write: - os << " Write "; - break; - case rbd::ggate::Request::Read: - os << " Read "; - break; - case rbd::ggate::Request::Flush: - os << " Flush "; - break; - case rbd::ggate::Request::Discard: - os << " Discard "; - break; - default: - os << " Unknow(" << ctx.req->get_cmd() << ") "; - break; - } - - os << ctx.req->get_offset() << "~" << ctx.req->get_length() << " " - << ctx.req->get_error() << "]"; - - return os; -} - -} // namespace ggate -} // namespace rbd -