// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "common/debug.h" #include "common/errno.h" #include "Driver.h" #include "Server.h" #include "Request.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd #undef dout_prefix #define dout_prefix *_dout << "rbd::ggate::Server: " << this \ << " " << __func__ << ": " namespace rbd { namespace ggate { Server::Server(Driver *drv, librbd::Image& image) : m_drv(drv), m_image(image), m_lock("rbd::ggate::Server::m_lock"), m_reader_thread(this, &Server::reader_entry), m_writer_thread(this, &Server::writer_entry) { } void Server::run() { dout(10) << dendl; int r = start(); assert(r == 0); dout(20) << "entering run loop" << dendl; { Mutex::Locker locker(m_lock); while (!m_stopping) { m_cond.WaitInterval(m_lock, utime_t(1, 0)); } } dout(20) << "exiting run loop" << dendl; stop(); } int Server::start() { dout(10) << dendl; m_reader_thread.create("rbd_reader"); m_writer_thread.create("rbd_writer"); return 0; } void Server::stop() { dout(10) << dendl; { Mutex::Locker locker(m_lock); assert(m_stopping); } m_reader_thread.join(); m_writer_thread.join(); wait_clean(); } void Server::io_start(IOContext *ctx) { dout(20) << ctx << dendl; Mutex::Locker locker(m_lock); m_io_pending.push_back(&ctx->item); } void Server::io_finish(IOContext *ctx) { dout(20) << ctx << dendl; Mutex::Locker locker(m_lock); assert(ctx->item.is_on_list()); ctx->item.remove_myself(); m_io_finished.push_back(&ctx->item); m_cond.Signal(); } Server::IOContext *Server::wait_io_finish() { dout(20) << dendl; Mutex::Locker locker(m_lock); while (m_io_finished.empty() && !m_stopping) { m_cond.Wait(m_lock); } if (m_io_finished.empty()) { return nullptr; } IOContext *ret = m_io_finished.front(); m_io_finished.pop_front(); return ret; } void Server::wait_clean() { dout(20) << dendl; assert(!m_reader_thread.is_started()); Mutex::Locker locker(m_lock); while (!m_io_pending.empty()) { m_cond.Wait(m_lock); } while (!m_io_finished.empty()) { ceph::unique_ptr free_ctx(m_io_finished.front()); m_io_finished.pop_front(); } } void Server::aio_callback(librbd::completion_t cb, void *arg) { librbd::RBD::AioCompletion *aio_completion = reinterpret_cast(cb); IOContext *ctx = reinterpret_cast(arg); int r = aio_completion->get_return_value(); ctx->server->handle_aio(ctx, r); aio_completion->release(); } void Server::handle_aio(IOContext *ctx, int r) { dout(20) << ctx << ": r=" << r << dendl; if (r == -EINVAL) { // if shrinking an image, a pagecache writeback might reference // extents outside of the range of the new image extents dout(5) << "masking IO out-of-bounds error" << dendl; ctx->req->bl.clear(); r = 0; } if (r < 0) { ctx->req->set_error(-r); } else if ((ctx->req->get_cmd() == Request::Read) && r != static_cast(ctx->req->get_length())) { int pad_byte_count = static_cast (ctx->req->get_length()) - r; ctx->req->bl.append_zero(pad_byte_count); dout(20) << ctx << ": pad byte count: " << pad_byte_count << dendl; ctx->req->set_error(0); } else { ctx->req->set_error(0); } io_finish(ctx); } void Server::reader_entry() { dout(20) << dendl; while (!m_stopping) { ceph::unique_ptr ctx(new IOContext(this)); dout(20) << "waiting for ggate request" << dendl; int r = m_drv->recv(&ctx->req); if (r < 0) { if (r != -ECANCELED) { derr << "recv: " << cpp_strerror(r) << dendl; } Mutex::Locker locker(m_lock); m_stopping = true; m_cond.Signal(); return; } IOContext *pctx = ctx.release(); dout(20) << pctx << ": start: " << *pctx << dendl; io_start(pctx); librbd::RBD::AioCompletion *c = new librbd::RBD::AioCompletion(pctx, aio_callback); switch (pctx->req->get_cmd()) { case rbd::ggate::Request::Write: m_image.aio_write(pctx->req->get_offset(), pctx->req->get_length(), pctx->req->bl, c); break; case rbd::ggate::Request::Read: m_image.aio_read(pctx->req->get_offset(), pctx->req->get_length(), pctx->req->bl, c); break; case rbd::ggate::Request::Flush: m_image.aio_flush(c); break; case rbd::ggate::Request::Discard: m_image.aio_discard(pctx->req->get_offset(), pctx->req->get_length(), c); break; default: derr << pctx << ": invalid request command: " << pctx->req->get_cmd() << dendl; c->release(); Mutex::Locker locker(m_lock); m_stopping = true; m_cond.Signal(); return; } } dout(20) << "terminated" << dendl; } void Server::writer_entry() { dout(20) << dendl; while (!m_stopping) { dout(20) << "waiting for io request" << dendl; ceph::unique_ptr ctx(wait_io_finish()); if (!ctx) { dout(20) << "no io requests, terminating" << dendl; return; } dout(20) << ctx.get() << ": got: " << *ctx << dendl; int r = m_drv->send(ctx->req); if (r < 0) { derr << ctx.get() << ": send: " << cpp_strerror(r) << dendl; Mutex::Locker locker(m_lock); m_stopping = true; m_cond.Signal(); return; } dout(20) << ctx.get() << " finish" << dendl; } dout(20) << "terminated" << dendl; } std::ostream &operator<<(std::ostream &os, const Server::IOContext &ctx) { os << "[" << ctx.req->get_id(); switch (ctx.req->get_cmd()) { case rbd::ggate::Request::Write: os << " Write "; break; case rbd::ggate::Request::Read: os << " Read "; break; case rbd::ggate::Request::Flush: os << " Flush "; break; case rbd::ggate::Request::Discard: os << " Discard "; break; default: os << " Unknow(" << ctx.req->get_cmd() << ") "; break; } os << ctx.req->get_offset() << "~" << ctx.req->get_length() << " " << ctx.req->get_error() << "]"; return os; } } // namespace ggate } // namespace rbd