X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Flibrbd%2FAsyncObjectThrottle.cc;fp=src%2Fceph%2Fsrc%2Flibrbd%2FAsyncObjectThrottle.cc;h=1f09091e8f5e49573a5baeb13ebd3bf9a164fb28;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/librbd/AsyncObjectThrottle.cc b/src/ceph/src/librbd/AsyncObjectThrottle.cc new file mode 100644 index 0000000..1f09091 --- /dev/null +++ b/src/ceph/src/librbd/AsyncObjectThrottle.cc @@ -0,0 +1,105 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "librbd/AsyncObjectThrottle.h" +#include "common/RWLock.h" +#include "common/WorkQueue.h" +#include "librbd/AsyncRequest.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" + +namespace librbd +{ + +template +AsyncObjectThrottle::AsyncObjectThrottle( + const AsyncRequest* async_request, T &image_ctx, + const ContextFactory& context_factory, Context *ctx, + ProgressContext *prog_ctx, uint64_t object_no, uint64_t end_object_no) + : m_lock(util::unique_lock_name("librbd::AsyncThrottle::m_lock", this)), + m_async_request(async_request), m_image_ctx(image_ctx), + m_context_factory(context_factory), m_ctx(ctx), m_prog_ctx(prog_ctx), + m_object_no(object_no), m_end_object_no(end_object_no), m_current_ops(0), + m_ret(0) +{ +} + +template +void AsyncObjectThrottle::start_ops(uint64_t max_concurrent) { + assert(m_image_ctx.owner_lock.is_locked()); + bool complete; + { + Mutex::Locker l(m_lock); + for (uint64_t i = 0; i < max_concurrent; ++i) { + start_next_op(); + if (m_ret < 0 && m_current_ops == 0) { + break; + } + } + complete = (m_current_ops == 0); + } + if (complete) { + // avoid re-entrant callback + m_image_ctx.op_work_queue->queue(m_ctx, m_ret); + delete this; + } +} + +template +void AsyncObjectThrottle::finish_op(int r) { + bool complete; + { + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + Mutex::Locker locker(m_lock); + --m_current_ops; + if (r < 0 && r != -ENOENT && m_ret == 0) { + m_ret = r; + } + + start_next_op(); + complete = (m_current_ops == 0); + } + if (complete) { + m_ctx->complete(m_ret); + delete this; + } +} + +template +void AsyncObjectThrottle::start_next_op() { + bool done = false; + while (!done) { + if (m_async_request != NULL && m_async_request->is_canceled() && + m_ret == 0) { + // allow in-flight ops to complete, but don't start new ops + m_ret = -ERESTART; + return; + } else if (m_ret != 0 || m_object_no >= m_end_object_no) { + return; + } + + uint64_t ono = m_object_no++; + C_AsyncObjectThrottle *ctx = m_context_factory(*this, ono); + + int r = ctx->send(); + if (r < 0) { + m_ret = r; + delete ctx; + return; + } else if (r > 0) { + // op completed immediately + delete ctx; + } else { + ++m_current_ops; + done = true; + } + if (m_prog_ctx != NULL) { + m_prog_ctx->update_progress(ono, m_end_object_no); + } + } +} + +} // namespace librbd + +#ifndef TEST_F +template class librbd::AsyncObjectThrottle; +#endif