1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/FutureImpl.h"
5 #include "journal/Utils.h"
9 FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid,
11 : RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid),
12 m_commit_tid(commit_tid),
13 m_lock("FutureImpl::m_lock", false, false), m_safe(false),
14 m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
15 m_consistent_ack(this) {
18 void FutureImpl::init(const FutureImplPtr &prev_future) {
19 // chain ourself to the prior future (if any) to that we known when the
20 // journal is consistent
22 m_prev_future = prev_future;
23 m_prev_future->wait(&m_consistent_ack);
25 m_consistent_ack.complete(0);
29 void FutureImpl::flush(Context *on_safe) {
32 FlushHandlers flush_handlers;
33 FutureImplPtr prev_future;
35 Mutex::Locker locker(m_lock);
36 complete = (m_safe && m_consistent);
38 if (on_safe != nullptr) {
39 m_contexts.push_back(on_safe);
42 prev_future = prepare_flush(&flush_handlers, m_lock);
46 // instruct prior futures to flush as well
48 prev_future = prev_future->prepare_flush(&flush_handlers);
51 if (complete && on_safe != NULL) {
52 on_safe->complete(m_return_value);
53 } else if (!flush_handlers.empty()) {
54 // attached to journal object -- instruct it to flush all entries through
55 // this one. possible to become detached while lock is released, so flush
56 // will be re-requested by the object if it doesn't own the future
57 for (auto &pair : flush_handlers) {
58 pair.first->flush(pair.second);
63 FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
64 Mutex::Locker locker(m_lock);
65 return prepare_flush(flush_handlers, m_lock);
68 FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
70 assert(m_lock.is_locked());
72 if (m_flush_state == FLUSH_STATE_NONE) {
73 m_flush_state = FLUSH_STATE_REQUESTED;
75 if (m_flush_handler && flush_handlers->count(m_flush_handler) == 0) {
76 flush_handlers->insert({m_flush_handler, this});
82 void FutureImpl::wait(Context *on_safe) {
83 assert(on_safe != NULL);
85 Mutex::Locker locker(m_lock);
86 if (!m_safe || !m_consistent) {
87 m_contexts.push_back(on_safe);
92 on_safe->complete(m_return_value);
95 bool FutureImpl::is_complete() const {
96 Mutex::Locker locker(m_lock);
97 return m_safe && m_consistent;
100 int FutureImpl::get_return_value() const {
101 Mutex::Locker locker(m_lock);
102 assert(m_safe && m_consistent);
103 return m_return_value;
106 bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
107 Mutex::Locker locker(m_lock);
108 assert(!m_flush_handler);
109 m_flush_handler = flush_handler;
110 return m_flush_state != FLUSH_STATE_NONE;
113 void FutureImpl::safe(int r) {
117 if (m_return_value == 0) {
121 m_flush_handler.reset();
129 void FutureImpl::consistent(int r) {
131 assert(!m_consistent);
133 m_prev_future.reset();
134 if (m_return_value == 0) {
145 void FutureImpl::finish_unlock() {
146 assert(m_lock.is_locked());
147 assert(m_safe && m_consistent);
150 contexts.swap(m_contexts);
153 for (Contexts::iterator it = contexts.begin();
154 it != contexts.end(); ++it) {
155 (*it)->complete(m_return_value);
159 std::ostream &operator<<(std::ostream &os, const FutureImpl &future) {
160 os << "Future[tag_tid=" << future.m_tag_tid << ", "
161 << "entry_tid=" << future.m_entry_tid << ", "
162 << "commit_tid=" << future.m_commit_tid << "]";
166 void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p) {
170 void intrusive_ptr_release(FutureImpl::FlushHandler *p) {
174 } // namespace journal