X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fjournal%2FFutureImpl.cc;fp=src%2Fceph%2Fsrc%2Fjournal%2FFutureImpl.cc;h=e46a3c9cbf3723b04603e2425148f294d8304a82;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/journal/FutureImpl.cc b/src/ceph/src/journal/FutureImpl.cc new file mode 100644 index 0000000..e46a3c9 --- /dev/null +++ b/src/ceph/src/journal/FutureImpl.cc @@ -0,0 +1,174 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/FutureImpl.h" +#include "journal/Utils.h" + +namespace journal { + +FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid, + uint64_t commit_tid) + : RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid), + m_commit_tid(commit_tid), + m_lock("FutureImpl::m_lock", false, false), m_safe(false), + m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE), + m_consistent_ack(this) { +} + +void FutureImpl::init(const FutureImplPtr &prev_future) { + // chain ourself to the prior future (if any) to that we known when the + // journal is consistent + if (prev_future) { + m_prev_future = prev_future; + m_prev_future->wait(&m_consistent_ack); + } else { + m_consistent_ack.complete(0); + } +} + +void FutureImpl::flush(Context *on_safe) { + + bool complete; + FlushHandlers flush_handlers; + FutureImplPtr prev_future; + { + Mutex::Locker locker(m_lock); + complete = (m_safe && m_consistent); + if (!complete) { + if (on_safe != nullptr) { + m_contexts.push_back(on_safe); + } + + prev_future = prepare_flush(&flush_handlers, m_lock); + } + } + + // instruct prior futures to flush as well + while (prev_future) { + prev_future = prev_future->prepare_flush(&flush_handlers); + } + + if (complete && on_safe != NULL) { + on_safe->complete(m_return_value); + } else if (!flush_handlers.empty()) { + // attached to journal object -- instruct it to flush all entries through + // this one. possible to become detached while lock is released, so flush + // will be re-requested by the object if it doesn't own the future + for (auto &pair : flush_handlers) { + pair.first->flush(pair.second); + } + } +} + +FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) { + Mutex::Locker locker(m_lock); + return prepare_flush(flush_handlers, m_lock); +} + +FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers, + Mutex &lock) { + assert(m_lock.is_locked()); + + if (m_flush_state == FLUSH_STATE_NONE) { + m_flush_state = FLUSH_STATE_REQUESTED; + + if (m_flush_handler && flush_handlers->count(m_flush_handler) == 0) { + flush_handlers->insert({m_flush_handler, this}); + } + } + return m_prev_future; +} + +void FutureImpl::wait(Context *on_safe) { + assert(on_safe != NULL); + { + Mutex::Locker locker(m_lock); + if (!m_safe || !m_consistent) { + m_contexts.push_back(on_safe); + return; + } + } + + on_safe->complete(m_return_value); +} + +bool FutureImpl::is_complete() const { + Mutex::Locker locker(m_lock); + return m_safe && m_consistent; +} + +int FutureImpl::get_return_value() const { + Mutex::Locker locker(m_lock); + assert(m_safe && m_consistent); + return m_return_value; +} + +bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) { + Mutex::Locker locker(m_lock); + assert(!m_flush_handler); + m_flush_handler = flush_handler; + return m_flush_state != FLUSH_STATE_NONE; +} + +void FutureImpl::safe(int r) { + m_lock.Lock(); + assert(!m_safe); + m_safe = true; + if (m_return_value == 0) { + m_return_value = r; + } + + m_flush_handler.reset(); + if (m_consistent) { + finish_unlock(); + } else { + m_lock.Unlock(); + } +} + +void FutureImpl::consistent(int r) { + m_lock.Lock(); + assert(!m_consistent); + m_consistent = true; + m_prev_future.reset(); + if (m_return_value == 0) { + m_return_value = r; + } + + if (m_safe) { + finish_unlock(); + } else { + m_lock.Unlock(); + } +} + +void FutureImpl::finish_unlock() { + assert(m_lock.is_locked()); + assert(m_safe && m_consistent); + + Contexts contexts; + contexts.swap(m_contexts); + + m_lock.Unlock(); + for (Contexts::iterator it = contexts.begin(); + it != contexts.end(); ++it) { + (*it)->complete(m_return_value); + } +} + +std::ostream &operator<<(std::ostream &os, const FutureImpl &future) { + os << "Future[tag_tid=" << future.m_tag_tid << ", " + << "entry_tid=" << future.m_entry_tid << ", " + << "commit_tid=" << future.m_commit_tid << "]"; + return os; +} + +void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p) { + p->get(); +} + +void intrusive_ptr_release(FutureImpl::FlushHandler *p) { + p->put(); +} + +} // namespace journal