1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 #include "JournalingObjectStore.h"
5 #include "common/errno.h"
6 #include "common/debug.h"
8 #define dout_context cct
9 #define dout_subsys ceph_subsys_journal
11 #define dout_prefix *_dout << "journal "
// Start journaling for this store. In this excerpt only the level-10 debug
// log is visible.
// NOTE(review): the embedded numbering skips 16 and 18+, so the rest of the
// body (e.g. anything that starts helper threads) is not shown here.
15 void JournalingObjectStore::journal_start()
17 dout(10) << "journal_start" << dendl;
// Stop journaling for this store: logs at level 10, then calls
// finisher.wait_for_empty(), presumably so queued completion callbacks
// drain before shutdown continues (confirm against Finisher).
// NOTE(review): lines after 24 are not shown in this excerpt.
21 void JournalingObjectStore::journal_stop()
23 dout(10) << "journal_stop" << dendl;
24 finisher.wait_for_empty();
28 // journal_replay() leaves the journal writeable; this closes that out.
29 void JournalingObjectStore::journal_write_close()
// Visible effect in this excerpt: the apply manager is reset.
// NOTE(review): lines 30-35 are missing here; any journal close/teardown
// they perform cannot be confirmed from this listing.
36 apply_manager.reset();
// Replay journal entries newer than the backing store's last committed op.
//
// fs_op_seq: sequence number of the last op the backing store has committed.
//   If the journal_replay_from config value is non-zero, it overrides this:
//   fs_op_seq becomes journal_replay_from - 1, so replay begins at
//   journal_replay_from.
// Returns: int status. Failures of journal->open() are logged with
//   cpp_strerror. NOTE(review): the return statements themselves are not
//   visible in this excerpt.
//
// Visible steps:
//   1. seed apply_manager (init_seq) and submit_manager (set_op_seq);
//   2. open the journal with op_seq;
//   3. read entries via read_entry(); old seqs are logged as skipped, the
//      rest are decoded and applied with do_transactions(), bracketed by
//      op_apply_start/op_apply_finish;
//   4. publish op_seq to submit_manager, make the journal writeable, and
//      report committed_thru(fs_op_seq) to the journal.
// NOTE(review): the loop header, braces and several statements are absent
// from this listing (the embedded line numbers skip).
39 int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
41 dout(10) << "journal_replay fs op_seq " << fs_op_seq << dendl;
// Config override: force replay to start at journal_replay_from.
43 if (cct->_conf->journal_replay_from) {
44 dout(0) << "journal_replay forcing replay from "
45 << cct->_conf->journal_replay_from
46 << " instead of " << fs_op_seq << dendl;
47 // the previous op is the last one committed
48 fs_op_seq = cct->_conf->journal_replay_from - 1;
// op_seq tracks the last op known to be applied; it starts at fs_op_seq.
51 uint64_t op_seq = fs_op_seq;
52 apply_manager.init_seq(fs_op_seq);
55 submit_manager.set_op_seq(op_seq);
59 int err = journal->open(op_seq);
61 dout(3) << "journal_replay open failed with "
62 << cpp_strerror(err) << dendl;
// Expected next sequence number. NOTE(review): read_entry() may update seq
// to the seq actually read; its signature is not visible here — confirm.
73 uint64_t seq = op_seq + 1;
74 if (!journal->read_entry(bl, seq)) {
75 dout(3) << "journal_replay: end of journal, done." << dendl;
80 dout(3) << "journal_replay: skipping old op seq " << seq << " <= " << op_seq << dendl;
// Replayed entries must be contiguous with the last applied op.
83 assert(op_seq == seq-1);
85 dout(3) << "journal_replay: applying op seq " << seq << dendl;
// Decode the entry payload (bl) into one or more Transactions.
86 bufferlist::iterator p = bl.begin();
87 vector<ObjectStore::Transaction> tls;
89 tls.emplace_back(Transaction(p));
// Bracket the apply so ApplyManager counts it as an in-flight op.
92 apply_manager.op_apply_start(seq);
93 int r = do_transactions(tls, seq);
94 apply_manager.op_apply_finish(seq);
// NOTE(review): the update of op_seq after applying is not visible here.
99 dout(3) << "journal_replay: r = " << r << ", op_seq now " << op_seq << dendl;
// count is declared/incremented in lines not shown in this excerpt.
103 dout(3) << "journal_replay: total = " << count << dendl;
// New submissions continue numbering after the last replayed op.
107 submit_manager.set_op_seq(op_seq);
109 // done reading, make writeable.
110 err = journal->make_writeable();
// Tell the journal everything through fs_op_seq is already committed.
115 journal->committed_thru(fs_op_seq);
121 // ------------------------------------
// Record that applying op `op` is starting.
// Takes apply_lock, which commit_start(), commit_started() and
// op_apply_finish() also take. It may wait on blocked_cond; the wait
// condition is not visible in this excerpt (presumably "a commit has applies
// blocked" — confirm). It logs the open_ops increment; the increment itself
// is not shown.
// Asserts op > committed_seq: an op being applied must be newer than
// anything already committed.
// Returns uint64_t. NOTE(review): the return statement is not shown here.
// NOTE(review): committed_seq is read here under apply_lock, but
// commit_finish() writes it while holding only com_lock (as far as the
// visible lines show). Check whether this is a data race.
123 uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
125 Mutex::Locker l(apply_lock);
127 dout(10) << "op_apply_start blocked, waiting" << dendl;
// Woken by blocked_cond.Signal() in commit_started()/op_apply_finish().
128 blocked_cond.Wait(apply_lock);
130 dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> "
131 << (open_ops+1) << dendl;
133 assert(op > committed_seq);
// Record that applying op `op` has finished.
// Under apply_lock it logs the open_ops decrement and the max_applied_seq
// update, signals blocked_cond to wake a waiting commit_start(), and raises
// max_applied_seq to op if op is larger.
// NOTE(review): the open_ops decrement and the condition guarding Signal()
// are not visible in this excerpt. If open_ops is an unsigned type, the
// `open_ops >= 0` assert is always true; its type is not shown here.
138 void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
140 Mutex::Locker l(apply_lock);
141 dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> "
142 << (open_ops-1) << ", max_applied_seq " << max_applied_seq << " -> "
143 << MAX(op, max_applied_seq) << dendl;
145 assert(open_ops >= 0);
147 // signal a blocked commit_start
149 blocked_cond.Signal();
152 // there can be multiple applies in flight; track the max value we
153 // note. note that we can't _read_ this value and learn anything
154 // meaningful unless/until we've quiesced all in-flight applies.
155 if (op > max_applied_seq)
156 max_applied_seq = op;
// Allocate the next submission sequence number by pre-incrementing op_seq,
// and log it.
// NOTE(review): locking (lines 160-161) and the return (164+) are not shown
// in this excerpt. Presumably it returns `op` — confirm.
159 uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
162 uint64_t op = ++op_seq;
163 dout(10) << "op_submit_start " << op << dendl;
// Mark submission of op `op` as finished. Submissions must finish in
// sequence order: op must equal op_submitted + 1. Otherwise it logs at
// level 0 and asserts.
// NOTE(review): the update of op_submitted and any locking are not visible
// in this excerpt.
167 void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op)
169 dout(10) << "op_submit_finish " << op << dendl;
170 if (op != op_submitted + 1) {
171 dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1)
172 << ", OUT OF ORDER" << dendl;
173 assert(0 == "out of order op_submit_finish");
180 // ------------------------------------------
// Register callback c to run once op `op` is committed.
// Under com_lock it appends c to commit_waiters[op]; operator[] creates the
// entry if absent. commit_finish() later hands every waiter list whose key
// is <= committing_seq to the finisher.
182 void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c)
184 Mutex::Locker l(com_lock);
186 commit_waiters[op].push_back(c);
// Begin a commit: quiesce in-flight applies and choose committing_seq.
// Under apply_lock it waits on blocked_cond until open_ops reaches 0. Then,
// also holding com_lock (lock order: apply_lock, then com_lock), it either:
//   - sees max_applied_seq == committed_seq: nothing to commit; it asserts no
//     commit waiters are pending; or
//   - sets committing_seq = max_applied_seq.
// Finally it calls journal->commit_start(committing_seq).
// Returns bool. NOTE(review): the return value, the `blocked` flag handling
// implied by the log messages, and the scope braces (the second Locker `l`
// must live in a nested scope) are not visible in this excerpt.
189 bool JournalingObjectStore::ApplyManager::commit_start()
194 Mutex::Locker l(apply_lock);
195 dout(10) << "commit_start max_applied_seq " << max_applied_seq
196 << ", open_ops " << open_ops << dendl;
// max_applied_seq is only meaningful once every in-flight apply has finished.
198 while (open_ops > 0) {
199 dout(10) << "commit_start waiting for " << open_ops
200 << " open ops to drain" << dendl;
// Woken by op_apply_finish()'s blocked_cond.Signal().
201 blocked_cond.Wait(apply_lock);
203 assert(open_ops == 0);
204 dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
206 Mutex::Locker l(com_lock);
207 if (max_applied_seq == committed_seq) {
208 dout(10) << "commit_start nothing to do" << dendl;
210 assert(commit_waiters.empty());
214 committing_seq = max_applied_seq;
216 dout(10) << "commit_start committing " << committing_seq
217 << ", still blocked" << dendl;
223 journal->commit_start(committing_seq); // tell the journal too
// Called once the commit chosen by commit_start() is under way. Under
// apply_lock it logs and signals blocked_cond, waking an op_apply_start()
// waiter.
// NOTE(review): the log statement's continuation (line 233) and any
// blocked-flag reset are missing from this excerpt.
228 void JournalingObjectStore::ApplyManager::commit_started()
230 Mutex::Locker l(apply_lock);
231 // allow new ops. (underlying fs should now be committing all prior ops)
232 dout(10) << "commit_started committing " << committing_seq << ", unblocking"
235 blocked_cond.Signal();
// Complete a commit. Under com_lock it reports committed_thru(committing_seq)
// to the journal, records committed_seq = committing_seq, and queues every
// commit-waiter list keyed <= committing_seq on the finisher, removing each
// from commit_waiters.
// NOTE(review): any null check on `journal` before line 244 is not visible
// here. committed_seq is written under com_lock only, while
// op_apply_start() reads it under apply_lock; verify the synchronization.
238 void JournalingObjectStore::ApplyManager::commit_finish()
240 Mutex::Locker l(com_lock);
241 dout(10) << "commit_finish thru " << committing_seq << dendl;
244 journal->committed_thru(committing_seq);
246 committed_seq = committing_seq;
// commit_waiters is an ordered map, so waiters are visited in ascending seq.
248 map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
249 while (p != commit_waiters.end() &&
250 p->first <= committing_seq) {
251 finisher.queue(p->second);
// Post-increment advances p before the erased node's iterator is invalidated.
252 commit_waiters.erase(p++);
256 void JournalingObjectStore::_op_journal_transactions(
257 bufferlist& tbl, uint32_t orig_len, uint64_t op,
258 Context *onjournal, TrackedOpRef osd_op)
261 dout(10) << "op_journal_transactions " << op << " reqid_t "
262 << (static_cast<OpRequest *>(osd_op.get()))->get_reqid() << dendl;
264 dout(10) << "op_journal_transactions " << op << dendl;
266 if (journal && journal->is_writeable()) {
267 journal->submit_entry(op, tbl, orig_len, onjournal, osd_op);
268 } else if (onjournal) {
269 apply_manager.add_waiter(op, onjournal);