X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Flibrbd%2Fio%2FAioCompletion.cc;fp=src%2Fceph%2Fsrc%2Flibrbd%2Fio%2FAioCompletion.cc;h=1dc6280959c2e261c7505374088da5bc98adec52;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/librbd/io/AioCompletion.cc b/src/ceph/src/librbd/io/AioCompletion.cc new file mode 100644 index 0000000..1dc6280 --- /dev/null +++ b/src/ceph/src/librbd/io/AioCompletion.cc @@ -0,0 +1,211 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/AioCompletion.h" +#include + +#include "common/ceph_context.h" +#include "common/dout.h" +#include "common/errno.h" +#include "common/perf_counters.h" +#include "common/WorkQueue.h" + +#include "librbd/ImageCtx.h" +#include "librbd/internal.h" + +#include "librbd/Journal.h" + +#ifdef WITH_LTTNG +#include "tracing/librbd.h" +#else +#define tracepoint(...) +#endif + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::AioCompletion: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +int AioCompletion::wait_for_complete() { + tracepoint(librbd, aio_wait_for_complete_enter, this); + lock.Lock(); + while (state != AIO_STATE_COMPLETE) + cond.Wait(lock); + lock.Unlock(); + tracepoint(librbd, aio_wait_for_complete_exit, 0); + return 0; +} + +void AioCompletion::finalize(ssize_t rval) +{ + assert(lock.is_locked()); + assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + ldout(cct, 20) << "r=" << rval << dendl; + if (rval >= 0 && aio_type == AIO_TYPE_READ) { + read_result.assemble_result(cct); + } +} + +void AioCompletion::complete() { + assert(lock.is_locked()); + assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + tracepoint(librbd, aio_complete_enter, this, rval); + utime_t elapsed; + elapsed = ceph_clock_now() - start_time; + switch (aio_type) { + case AIO_TYPE_GENERIC: + case AIO_TYPE_OPEN: + case AIO_TYPE_CLOSE: + break; + case AIO_TYPE_READ: + ictx->perfcounter->tinc(l_librbd_rd_latency, elapsed); break; + case AIO_TYPE_WRITE: + ictx->perfcounter->tinc(l_librbd_wr_latency, elapsed); break; + case AIO_TYPE_DISCARD: + ictx->perfcounter->tinc(l_librbd_discard_latency, elapsed); break; + case AIO_TYPE_FLUSH: + ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break; + case AIO_TYPE_WRITESAME: + ictx->perfcounter->tinc(l_librbd_ws_latency, elapsed); break; + case AIO_TYPE_COMPARE_AND_WRITE: + ictx->perfcounter->tinc(l_librbd_cmp_latency, elapsed); break; + default: + lderr(cct) << "completed invalid aio_type: " << aio_type << dendl; + break; + } + + // inform the journal that the op has successfully committed + if (journal_tid != 0) { + assert(ictx->journal != NULL); + ictx->journal->commit_io_event(journal_tid, rval); + } + + state = AIO_STATE_CALLBACK; + if (complete_cb) { + lock.Unlock(); + complete_cb(rbd_comp, complete_arg); + lock.Lock(); + } + + if (ictx && event_notify && ictx->event_socket.is_valid()) { + ictx->completed_reqs_lock.Lock(); + ictx->completed_reqs.push_back(&m_xlist_item); + ictx->completed_reqs_lock.Unlock(); + ictx->event_socket.notify(); + } + + state = AIO_STATE_COMPLETE; + cond.Signal(); + + // note: possible for image to be closed after op marked finished + if (async_op.started()) { + async_op.finish_op(); + } + tracepoint(librbd, aio_complete_exit); +} + +void AioCompletion::init_time(ImageCtx *i, aio_type_t t) { + Mutex::Locker locker(lock); + if (ictx == nullptr) { + ictx = i; + aio_type = t; + start_time = ceph_clock_now(); + } +} + +void AioCompletion::start_op(bool ignore_type) { + Mutex::Locker locker(lock); + assert(ictx != nullptr); + assert(!async_op.started()); + if (state == AIO_STATE_PENDING && + (ignore_type || aio_type != AIO_TYPE_FLUSH)) { + async_op.start_op(*ictx); + } +} + +void AioCompletion::fail(int r) +{ + lock.Lock(); + assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + lderr(cct) << cpp_strerror(r) << dendl; + assert(pending_count == 0); + rval = r; + complete(); + put_unlock(); +} + +void AioCompletion::set_request_count(uint32_t count) { + lock.Lock(); + assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + ldout(cct, 20) << "pending=" << count << dendl; + assert(pending_count == 0); + pending_count = count; + lock.Unlock(); + + // if no pending requests, completion will fire now + unblock(); +} + +void AioCompletion::complete_request(ssize_t r) +{ + lock.Lock(); + assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + if (rval >= 0) { + if (r < 0 && r != -EEXIST) + rval = r; + else if (r > 0) + rval += r; + } + assert(pending_count); + int count = --pending_count; + + ldout(cct, 20) << "cb=" << complete_cb << ", " + << "pending=" << pending_count << dendl; + if (!count && blockers == 0) { + finalize(rval); + complete(); + } + put_unlock(); +} + +void AioCompletion::associate_journal_event(uint64_t tid) { + Mutex::Locker l(lock); + assert(state == AIO_STATE_PENDING); + journal_tid = tid; +} + +bool AioCompletion::is_complete() { + tracepoint(librbd, aio_is_complete_enter, this); + bool done; + { + Mutex::Locker l(lock); + done = this->state == AIO_STATE_COMPLETE; + } + tracepoint(librbd, aio_is_complete_exit, done); + return done; +} + +ssize_t AioCompletion::get_return_value() { + tracepoint(librbd, aio_get_return_value_enter, this); + lock.Lock(); + ssize_t r = rval; + lock.Unlock(); + tracepoint(librbd, aio_get_return_value_exit, r); + return r; +} + +} // namespace io +} // namespace librbd