X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fos%2Ffilestore%2FJournalingObjectStore.cc;fp=src%2Fceph%2Fsrc%2Fos%2Ffilestore%2FJournalingObjectStore.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=a9c528f0482d080ccfbf287916499ce476072542;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/os/filestore/JournalingObjectStore.cc b/src/ceph/src/os/filestore/JournalingObjectStore.cc deleted file mode 100644 index a9c528f..0000000 --- a/src/ceph/src/os/filestore/JournalingObjectStore.cc +++ /dev/null @@ -1,271 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- - -#include "JournalingObjectStore.h" - -#include "common/errno.h" -#include "common/debug.h" - -#define dout_context cct -#define dout_subsys ceph_subsys_journal -#undef dout_prefix -#define dout_prefix *_dout << "journal " - - - -void JournalingObjectStore::journal_start() -{ - dout(10) << "journal_start" << dendl; - finisher.start(); -} - -void JournalingObjectStore::journal_stop() -{ - dout(10) << "journal_stop" << dendl; - finisher.wait_for_empty(); - finisher.stop(); -} - -// A journal_replay() makes journal writeable, this closes that out. -void JournalingObjectStore::journal_write_close() -{ - if (journal) { - journal->close(); - delete journal; - journal = 0; - } - apply_manager.reset(); -} - -int JournalingObjectStore::journal_replay(uint64_t fs_op_seq) -{ - dout(10) << "journal_replay fs op_seq " << fs_op_seq << dendl; - - if (cct->_conf->journal_replay_from) { - dout(0) << "journal_replay forcing replay from " - << cct->_conf->journal_replay_from - << " instead of " << fs_op_seq << dendl; - // the previous op is the last one committed - fs_op_seq = cct->_conf->journal_replay_from - 1; - } - - uint64_t op_seq = fs_op_seq; - apply_manager.init_seq(fs_op_seq); - - if (!journal) { - submit_manager.set_op_seq(op_seq); - return 0; - } - - int err = journal->open(op_seq); - if (err < 0) { - dout(3) << "journal_replay open failed with " - << cpp_strerror(err) << dendl; - delete journal; - journal = 0; - return err; - } - - replaying = true; - - int count = 0; - while (1) { - bufferlist bl; - uint64_t seq = op_seq + 1; - if (!journal->read_entry(bl, seq)) { - dout(3) << "journal_replay: end of journal, done." << dendl; - break; - } - - if (seq <= op_seq) { - dout(3) << "journal_replay: skipping old op seq " << seq << " <= " << op_seq << dendl; - continue; - } - assert(op_seq == seq-1); - - dout(3) << "journal_replay: applying op seq " << seq << dendl; - bufferlist::iterator p = bl.begin(); - vector tls; - while (!p.end()) { - tls.emplace_back(Transaction(p)); - } - - apply_manager.op_apply_start(seq); - int r = do_transactions(tls, seq); - apply_manager.op_apply_finish(seq); - - op_seq = seq; - count++; - - dout(3) << "journal_replay: r = " << r << ", op_seq now " << op_seq << dendl; - } - - if (count) - dout(3) << "journal_replay: total = " << count << dendl; - - replaying = false; - - submit_manager.set_op_seq(op_seq); - - // done reading, make writeable. - err = journal->make_writeable(); - if (err < 0) - return err; - - if (!count) - journal->committed_thru(fs_op_seq); - - return count; -} - - -// ------------------------------------ - -uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op) -{ - Mutex::Locker l(apply_lock); - while (blocked) { - dout(10) << "op_apply_start blocked, waiting" << dendl; - blocked_cond.Wait(apply_lock); - } - dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " - << (open_ops+1) << dendl; - assert(!blocked); - assert(op > committed_seq); - open_ops++; - return op; -} - -void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op) -{ - Mutex::Locker l(apply_lock); - dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> " - << (open_ops-1) << ", max_applied_seq " << max_applied_seq << " -> " - << MAX(op, max_applied_seq) << dendl; - --open_ops; - assert(open_ops >= 0); - - // signal a blocked commit_start - if (blocked) { - blocked_cond.Signal(); - } - - // there can be multiple applies in flight; track the max value we - // note. note that we can't _read_ this value and learn anything - // meaningful unless/until we've quiesced all in-flight applies. - if (op > max_applied_seq) - max_applied_seq = op; -} - -uint64_t JournalingObjectStore::SubmitManager::op_submit_start() -{ - lock.Lock(); - uint64_t op = ++op_seq; - dout(10) << "op_submit_start " << op << dendl; - return op; -} - -void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op) -{ - dout(10) << "op_submit_finish " << op << dendl; - if (op != op_submitted + 1) { - dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1) - << ", OUT OF ORDER" << dendl; - assert(0 == "out of order op_submit_finish"); - } - op_submitted = op; - lock.Unlock(); -} - - -// ------------------------------------------ - -void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c) -{ - Mutex::Locker l(com_lock); - assert(c); - commit_waiters[op].push_back(c); -} - -bool JournalingObjectStore::ApplyManager::commit_start() -{ - bool ret = false; - - { - Mutex::Locker l(apply_lock); - dout(10) << "commit_start max_applied_seq " << max_applied_seq - << ", open_ops " << open_ops << dendl; - blocked = true; - while (open_ops > 0) { - dout(10) << "commit_start waiting for " << open_ops - << " open ops to drain" << dendl; - blocked_cond.Wait(apply_lock); - } - assert(open_ops == 0); - dout(10) << "commit_start blocked, all open_ops have completed" << dendl; - { - Mutex::Locker l(com_lock); - if (max_applied_seq == committed_seq) { - dout(10) << "commit_start nothing to do" << dendl; - blocked = false; - assert(commit_waiters.empty()); - goto out; - } - - committing_seq = max_applied_seq; - - dout(10) << "commit_start committing " << committing_seq - << ", still blocked" << dendl; - } - } - ret = true; - - if (journal) - journal->commit_start(committing_seq); // tell the journal too - out: - return ret; -} - -void JournalingObjectStore::ApplyManager::commit_started() -{ - Mutex::Locker l(apply_lock); - // allow new ops. (underlying fs should now be committing all prior ops) - dout(10) << "commit_started committing " << committing_seq << ", unblocking" - << dendl; - blocked = false; - blocked_cond.Signal(); -} - -void JournalingObjectStore::ApplyManager::commit_finish() -{ - Mutex::Locker l(com_lock); - dout(10) << "commit_finish thru " << committing_seq << dendl; - - if (journal) - journal->committed_thru(committing_seq); - - committed_seq = committing_seq; - - map >::iterator p = commit_waiters.begin(); - while (p != commit_waiters.end() && - p->first <= committing_seq) { - finisher.queue(p->second); - commit_waiters.erase(p++); - } -} - -void JournalingObjectStore::_op_journal_transactions( - bufferlist& tbl, uint32_t orig_len, uint64_t op, - Context *onjournal, TrackedOpRef osd_op) -{ - if (osd_op.get()) - dout(10) << "op_journal_transactions " << op << " reqid_t " - << (static_cast(osd_op.get()))->get_reqid() << dendl; - else - dout(10) << "op_journal_transactions " << op << dendl; - - if (journal && journal->is_writeable()) { - journal->submit_entry(op, tbl, orig_len, onjournal, osd_op); - } else if (onjournal) { - apply_manager.add_waiter(op, onjournal); - } -}