1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/debug.h"
5 #include "common/errno.h"
// Logging setup for this file: selects the context and the rbd subsystem,
// and defines the prefix streamed in front of each message -- the class
// name, the address of the Server object (this) and the name of the calling
// function (__func__).
// NOTE(review): presumably read by the dout()/derr macros from
// common/debug.h that are used throughout this file -- confirm.
10 #define dout_context g_ceph_context
11 #define dout_subsys ceph_subsys_rbd
13 #define dout_prefix *_dout << "rbd::ggate::Server: " << this \
14 << " " << __func__ << ": "
// Constructs a Server for the given driver and image.
//
// The initializer list stores drv and image, gives m_lock a descriptive
// name, and binds the two thread objects to their entry points
// (reader_entry / writer_entry). The threads themselves are created through
// create() elsewhere in this file.
19 Server::Server(Driver *drv, librbd::Image& image)
20 : m_drv(drv), m_image(image), m_lock("rbd::ggate::Server::m_lock"),
21 m_reader_thread(this, &Server::reader_entry),
22 m_writer_thread(this, &Server::writer_entry) {
31 dout(20) << "entering run loop" << dendl;
34 Mutex::Locker locker(m_lock);
36 m_cond.WaitInterval(m_lock, utime_t(1, 0));
40 dout(20) << "exiting run loop" << dendl;
48 m_reader_thread.create("rbd_reader");
49 m_writer_thread.create("rbd_writer");
57 Mutex::Locker locker(m_lock);
61 m_reader_thread.join();
62 m_writer_thread.join();
// Registers ctx as a pending request: logs the context pointer and, with
// m_lock held, appends ctx->item to the m_io_pending list.
67 void Server::io_start(IOContext *ctx) {
68 dout(20) << ctx << dendl;
// m_io_pending is shared with other methods that also take m_lock.
70 Mutex::Locker locker(m_lock);
71 m_io_pending.push_back(&ctx->item);
// Marks ctx as finished: with m_lock held, unlinks ctx->item from the list
// it is currently on and appends it to m_io_finished, where
// wait_io_finish() picks entries up from the front.
74 void Server::io_finish(IOContext *ctx) {
75 dout(20) << ctx << dendl;
77 Mutex::Locker locker(m_lock);
// The context must already be linked into a list.
// NOTE(review): presumably m_io_pending via io_start() -- confirm.
78 assert(ctx->item.is_on_list());
80 ctx->item.remove_myself();
81 m_io_finished.push_back(&ctx->item);
// Takes the entry at the front of m_io_finished (the oldest one, since
// io_finish() appends at the back) and removes it from the list, all under
// m_lock.
//
// NOTE(review): the wait inside the loop, the handling of the empty case
// and the return statements are not visible in this excerpt -- confirm what
// is returned when the list is still empty.
85 Server::IOContext *Server::wait_io_finish() {
88 Mutex::Locker locker(m_lock);
// Keep looping while there is nothing to hand out and no stop was requested.
90 while (m_io_finished.empty() && !m_stopping) {
// The list can still be empty here if the loop condition failed because of
// m_stopping rather than because an entry arrived.
94 if (m_io_finished.empty()) {
98 IOContext *ret = m_io_finished.front();
99 m_io_finished.pop_front();
// Cleans up the request lists once the reader thread is no longer started
// (asserted below). With m_lock held, one loop runs while m_io_pending
// still has entries and another drains m_io_finished, destroying each
// context it removes.
// NOTE(review): the body of the first loop is not visible in this excerpt.
104 void Server::wait_clean() {
107 assert(!m_reader_thread.is_started());
109 Mutex::Locker locker(m_lock);
111 while (!m_io_pending.empty()) {
115 while (!m_io_finished.empty()) {
// Taking ownership in a unique_ptr deletes the context when free_ctx goes
// out of scope.
116 ceph::unique_ptr<IOContext> free_ctx(m_io_finished.front());
117 m_io_finished.pop_front();
// Completion callback passed to librbd::RBD::AioCompletion in
// reader_entry().
//
// cb  - the completion handle, cast back to librbd::RBD::AioCompletion*.
// arg - the IOContext* that was given to the AioCompletion constructor.
//
// Reads the operation's return value, forwards it to handle_aio() on the
// Server that owns the context, then releases the completion.
121 void Server::aio_callback(librbd::completion_t cb, void *arg) {
122 librbd::RBD::AioCompletion *aio_completion =
123 reinterpret_cast<librbd::RBD::AioCompletion*>(cb);
125 IOContext *ctx = reinterpret_cast<IOContext *>(arg);
126 int r = aio_completion->get_return_value();
128 ctx->server->handle_aio(ctx, r);
// The return value has already been read above, so the completion can be
// released now.
129 aio_completion->release();
// Translates the result r of an asynchronous image operation into the
// error field of ctx->req (via set_error()).
//
// Visible outcomes:
//  - an out-of-bounds error is masked: it is logged and the request's
//    buffer is cleared;
//  - an error is stored as -r;
//  - a Read whose result differs from the requested length gets its buffer
//    zero-padded by (length - r) bytes and is reported as success (0);
//  - otherwise the request is reported as success (0).
// NOTE(review): the conditions selecting the first two outcomes are not
// visible in this excerpt -- confirm against the full source.
132 void Server::handle_aio(IOContext *ctx, int r) {
133 dout(20) << ctx << ": r=" << r << dendl;
136 // if shrinking an image, a pagecache writeback might reference
137 // extents outside of the range of the new image extents
138 dout(5) << "masking IO out-of-bounds error" << dendl;
139 ctx->req->bl.clear();
// r is negated before being stored on the request.
144 ctx->req->set_error(-r);
// Short read: extend the buffer with zeros so it covers the requested length.
145 } else if ((ctx->req->get_cmd() == Request::Read) &&
146 r != static_cast<int>(ctx->req->get_length())) {
147 int pad_byte_count = static_cast<int> (ctx->req->get_length()) - r;
148 ctx->req->bl.append_zero(pad_byte_count);
149 dout(20) << ctx << ": pad byte count: " << pad_byte_count << dendl;
150 ctx->req->set_error(0);
152 ctx->req->set_error(0);
// Entry point of the reader thread (bound to m_reader_thread in the
// constructor).
//
// Until m_stopping is set, each iteration allocates a fresh IOContext,
// receives a request from the driver into it and dispatches the request to
// the matching asynchronous librbd::Image call. aio_callback() is
// registered as the completion callback, with the context as its argument.
// NOTE(review): several lines (error-path bodies, break statements, the
// remaining arguments of aio_write/aio_read) are not visible in this
// excerpt.
157 void Server::reader_entry() {
160 while (!m_stopping) {
// Owned by unique_ptr until the request has been received successfully.
161 ceph::unique_ptr<IOContext> ctx(new IOContext(this));
163 dout(20) << "waiting for ggate request" << dendl;
165 int r = m_drv->recv(&ctx->req);
// -ECANCELED is not logged as an error.
167 if (r != -ECANCELED) {
168 derr << "recv: " << cpp_strerror(r) << dendl;
170 Mutex::Locker locker(m_lock);
// Ownership leaves the unique_ptr here: the raw pointer is handed to the
// AioCompletion below and comes back through aio_callback().
176 IOContext *pctx = ctx.release();
178 dout(20) << pctx << ": start: " << *pctx << dendl;
181 librbd::RBD::AioCompletion *c =
182 new librbd::RBD::AioCompletion(pctx, aio_callback);
// Dispatch on the request command.
183 switch (pctx->req->get_cmd())
185 case rbd::ggate::Request::Write:
186 m_image.aio_write(pctx->req->get_offset(), pctx->req->get_length(),
189 case rbd::ggate::Request::Read:
190 m_image.aio_read(pctx->req->get_offset(), pctx->req->get_length(),
193 case rbd::ggate::Request::Flush:
194 m_image.aio_flush(c);
196 case rbd::ggate::Request::Discard:
197 m_image.aio_discard(pctx->req->get_offset(), pctx->req->get_length(), c);
200 derr << pctx << ": invalid request command: " << pctx->req->get_cmd()
203 Mutex::Locker locker(m_lock);
209 dout(20) << "terminated" << dendl;
// Entry point of the writer thread (bound to m_writer_thread in the
// constructor).
//
// Until m_stopping is set, each iteration obtains a finished context from
// wait_io_finish() and sends its request back through the driver. Send
// failures are logged with derr.
// NOTE(review): the conditions guarding the "no io requests" and send-error
// paths, and what happens under m_lock after a send error, are not visible
// in this excerpt.
212 void Server::writer_entry() {
215 while (!m_stopping) {
216 dout(20) << "waiting for io request" << dendl;
// The raw pointer returned by wait_io_finish() is owned by the unique_ptr
// from here on, so the context is destroyed when ctx goes out of scope.
218 ceph::unique_ptr<IOContext> ctx(wait_io_finish());
220 dout(20) << "no io requests, terminating" << dendl;
224 dout(20) << ctx.get() << ": got: " << *ctx << dendl;
226 int r = m_drv->send(ctx->req);
228 derr << ctx.get() << ": send: " << cpp_strerror(r) << dendl;
229 Mutex::Locker locker(m_lock);
234 dout(20) << ctx.get() << " finish" << dendl;
236 dout(20) << "terminated" << dendl;
// Streams a bracketed one-line summary of the request held by ctx:
// "[" + request id, a command-dependent label, then
// "<offset>~<length> <error>]".
// Commands not matched by any case are printed as " Unknow(<cmd>) ".
// NOTE(review): the labels written for the known commands and the return
// statement are not visible in this excerpt.
239 std::ostream &operator<<(std::ostream &os, const Server::IOContext &ctx) {
241 os << "[" << ctx.req->get_id();
243 switch (ctx.req->get_cmd())
245 case rbd::ggate::Request::Write:
248 case rbd::ggate::Request::Read:
251 case rbd::ggate::Request::Flush:
254 case rbd::ggate::Request::Discard:
258 os << " Unknow(" << ctx.req->get_cmd() << ") ";
262 os << ctx.req->get_offset() << "~" << ctx.req->get_length() << " "
263 << ctx.req->get_error() << "]";