Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / journal / FutureImpl.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/FutureImpl.h"
5 #include "journal/Utils.h"
6
7 namespace journal {
8
9 FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid,
10                        uint64_t commit_tid)
11   : RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid),
12     m_commit_tid(commit_tid),
13     m_lock("FutureImpl::m_lock", false, false), m_safe(false),
14     m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
15     m_consistent_ack(this) {
16 }
17
18 void FutureImpl::init(const FutureImplPtr &prev_future) {
19   // chain ourself to the prior future (if any) to that we known when the
20   // journal is consistent
21   if (prev_future) {
22     m_prev_future = prev_future;
23     m_prev_future->wait(&m_consistent_ack);
24   } else {
25     m_consistent_ack.complete(0);
26   }
27 }
28
29 void FutureImpl::flush(Context *on_safe) {
30
31   bool complete;
32   FlushHandlers flush_handlers;
33   FutureImplPtr prev_future;
34   {
35     Mutex::Locker locker(m_lock);
36     complete = (m_safe && m_consistent);
37     if (!complete) {
38       if (on_safe != nullptr) {
39         m_contexts.push_back(on_safe);
40       }
41
42       prev_future = prepare_flush(&flush_handlers, m_lock);
43     }
44   }
45
46   // instruct prior futures to flush as well
47   while (prev_future) {
48     prev_future = prev_future->prepare_flush(&flush_handlers);
49   }
50
51   if (complete && on_safe != NULL) {
52     on_safe->complete(m_return_value);
53   } else if (!flush_handlers.empty()) {
54     // attached to journal object -- instruct it to flush all entries through
55     // this one.  possible to become detached while lock is released, so flush
56     // will be re-requested by the object if it doesn't own the future
57     for (auto &pair : flush_handlers) {
58       pair.first->flush(pair.second);
59     }
60   }
61 }
62
63 FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
64   Mutex::Locker locker(m_lock);
65   return prepare_flush(flush_handlers, m_lock);
66 }
67
68 FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
69                                         Mutex &lock) {
70   assert(m_lock.is_locked());
71
72   if (m_flush_state == FLUSH_STATE_NONE) {
73     m_flush_state = FLUSH_STATE_REQUESTED;
74
75     if (m_flush_handler && flush_handlers->count(m_flush_handler) == 0) {
76       flush_handlers->insert({m_flush_handler, this});
77     }
78   }
79   return m_prev_future;
80 }
81
82 void FutureImpl::wait(Context *on_safe) {
83   assert(on_safe != NULL);
84   {
85     Mutex::Locker locker(m_lock);
86     if (!m_safe || !m_consistent) {
87       m_contexts.push_back(on_safe);
88       return;
89     }
90   }
91
92   on_safe->complete(m_return_value);
93 }
94
95 bool FutureImpl::is_complete() const {
96   Mutex::Locker locker(m_lock);
97   return m_safe && m_consistent;
98 }
99
100 int FutureImpl::get_return_value() const {
101   Mutex::Locker locker(m_lock);
102   assert(m_safe && m_consistent);
103   return m_return_value;
104 }
105
106 bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
107   Mutex::Locker locker(m_lock);
108   assert(!m_flush_handler);
109   m_flush_handler = flush_handler;
110   return m_flush_state != FLUSH_STATE_NONE;
111 }
112
113 void FutureImpl::safe(int r) {
114   m_lock.Lock();
115   assert(!m_safe);
116   m_safe = true;
117   if (m_return_value == 0) {
118     m_return_value = r;
119   }
120
121   m_flush_handler.reset();
122   if (m_consistent) {
123     finish_unlock();
124   } else {
125     m_lock.Unlock();
126   }
127 }
128
129 void FutureImpl::consistent(int r) {
130   m_lock.Lock();
131   assert(!m_consistent);
132   m_consistent = true;
133   m_prev_future.reset();
134   if (m_return_value == 0) {
135     m_return_value = r;
136   }
137
138   if (m_safe) {
139     finish_unlock();
140   } else {
141     m_lock.Unlock();
142   }
143 }
144
145 void FutureImpl::finish_unlock() {
146   assert(m_lock.is_locked());
147   assert(m_safe && m_consistent);
148
149   Contexts contexts;
150   contexts.swap(m_contexts);
151
152   m_lock.Unlock();
153   for (Contexts::iterator it = contexts.begin();
154        it != contexts.end(); ++it) {
155     (*it)->complete(m_return_value);
156   }
157 }
158
159 std::ostream &operator<<(std::ostream &os, const FutureImpl &future) {
160   os << "Future[tag_tid=" << future.m_tag_tid << ", "
161      << "entry_tid=" << future.m_entry_tid << ", "
162      << "commit_tid=" << future.m_commit_tid << "]";
163   return os;
164 }
165
166 void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p) {
167   p->get();
168 }
169
170 void intrusive_ptr_release(FutureImpl::FlushHandler *p) {
171   p->put();
172 }
173
174 } // namespace journal