X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmds%2FMDLog.cc;fp=src%2Fceph%2Fsrc%2Fmds%2FMDLog.cc;h=806974a1d0b7dc0d6f1d87c96ecce7738183ad68;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/mds/MDLog.cc b/src/ceph/src/mds/MDLog.cc new file mode 100644 index 0000000..806974a --- /dev/null +++ b/src/ceph/src/mds/MDLog.cc @@ -0,0 +1,1464 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "MDSRank.h" +#include "MDLog.h" +#include "MDCache.h" +#include "LogEvent.h" +#include "MDSContext.h" + +#include "osdc/Journaler.h" +#include "mds/JournalPointer.h" + +#include "common/entity_name.h" +#include "common/perf_counters.h" +#include "common/Cond.h" + +#include "events/ESubtreeMap.h" + +#include "common/config.h" +#include "common/errno.h" +#include "include/assert.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mds +#undef dout_prefix +#define dout_prefix *_dout << "mds." 
<< mds->get_nodeid() << ".log " + +// cons/des +MDLog::~MDLog() +{ + if (journaler) { delete journaler; journaler = 0; } + if (logger) { + g_ceph_context->get_perfcounters_collection()->remove(logger); + delete logger; + logger = 0; + } +} + + +void MDLog::create_logger() +{ + PerfCountersBuilder plb(g_ceph_context, "mds_log", l_mdl_first, l_mdl_last); + + plb.add_u64_counter(l_mdl_evadd, "evadd", + "Events submitted", "subm", PerfCountersBuilder::PRIO_INTERESTING); + plb.add_u64_counter(l_mdl_evex, "evex", "Total expired events"); + plb.add_u64_counter(l_mdl_evtrm, "evtrm", "Trimmed events"); + plb.add_u64(l_mdl_ev, "ev", + "Events", "evts", PerfCountersBuilder::PRIO_INTERESTING); + plb.add_u64(l_mdl_evexg, "evexg", "Expiring events"); + plb.add_u64(l_mdl_evexd, "evexd", "Current expired events"); + + plb.add_u64_counter(l_mdl_segadd, "segadd", "Segments added"); + plb.add_u64_counter(l_mdl_segex, "segex", "Total expired segments"); + plb.add_u64_counter(l_mdl_segtrm, "segtrm", "Trimmed segments"); + plb.add_u64(l_mdl_seg, "seg", + "Segments", "segs", PerfCountersBuilder::PRIO_INTERESTING); + plb.add_u64(l_mdl_segexg, "segexg", "Expiring segments"); + plb.add_u64(l_mdl_segexd, "segexd", "Current expired segments"); + + plb.add_u64(l_mdl_expos, "expos", "Journaler xpire position"); + plb.add_u64(l_mdl_wrpos, "wrpos", "Journaler write position"); + plb.add_u64(l_mdl_rdpos, "rdpos", "Journaler read position"); + plb.add_time_avg(l_mdl_jlat, "jlat", "Journaler flush latency"); + + plb.add_u64_counter(l_mdl_replayed, "replayed", "Events replayed"); + + // logger + logger = plb.create_perf_counters(); + g_ceph_context->get_perfcounters_collection()->add(logger); +} + +void MDLog::set_write_iohint(unsigned iohint_flags) +{ + journaler->set_write_iohint(iohint_flags); +} + +class C_MDL_WriteError : public MDSIOContextBase { + protected: + MDLog *mdlog; + MDSRank *get_mds() override {return mdlog->mds;} + + void finish(int r) override { + MDSRank *mds = get_mds(); + // 
assume journal is reliable, so don't choose action based on + // g_conf->mds_action_on_write_error. + if (r == -EBLACKLISTED) { + derr << "we have been blacklisted (fenced), respawning..." << dendl; + mds->respawn(); + } else { + derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl; + // Although it's possible that this could be something transient, + // it's severe and scary, so disable this rank until an administrator + // intervenes. + mds->clog->error() << "Unhandled journal write error on MDS rank " << + mds->get_nodeid() << ": " << cpp_strerror(r) << ", shutting down."; + mds->damaged(); + ceph_abort(); // damaged should never return + } + } + + public: + explicit C_MDL_WriteError(MDLog *m) : mdlog(m) {} +}; + + +void MDLog::write_head(MDSInternalContextBase *c) +{ + Context *fin = NULL; + if (c != NULL) { + fin = new C_IO_Wrapper(mds, c); + } + journaler->write_head(fin); +} + +uint64_t MDLog::get_read_pos() const +{ + return journaler->get_read_pos(); +} + +uint64_t MDLog::get_write_pos() const +{ + return journaler->get_write_pos(); +} + +uint64_t MDLog::get_safe_pos() const +{ + return journaler->get_write_safe_pos(); +} + + + +void MDLog::create(MDSInternalContextBase *c) +{ + dout(5) << "create empty log" << dendl; + + C_GatherBuilder gather(g_ceph_context); + // This requires an OnFinisher wrapper because Journaler will call back the completion for write_head inside its own lock + // XXX but should maybe that be handled inside Journaler? 
+ gather.set_finisher(new C_IO_Wrapper(mds, c)); + + // The inode of the default Journaler we will create + ino = MDS_INO_LOG_OFFSET + mds->get_nodeid(); + + // Instantiate Journaler and start async write to RADOS + assert(journaler == NULL); + journaler = new Journaler("mdlog", ino, mds->mdsmap->get_metadata_pool(), + CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, + l_mdl_jlat, mds->finisher); + assert(journaler->is_readonly()); + journaler->set_write_error_handler(new C_MDL_WriteError(this)); + journaler->set_writeable(); + journaler->create(&mds->mdcache->default_log_layout, g_conf->mds_journal_format); + journaler->write_head(gather.new_sub()); + + // Async write JournalPointer to RADOS + JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool()); + jp.front = ino; + jp.back = 0; + jp.save(mds->objecter, gather.new_sub()); + + gather.activate(); + + logger->set(l_mdl_expos, journaler->get_expire_pos()); + logger->set(l_mdl_wrpos, journaler->get_write_pos()); + + submit_thread.create("md_submit"); +} + +void MDLog::open(MDSInternalContextBase *c) +{ + dout(5) << "open discovering log bounds" << dendl; + + assert(!recovery_thread.is_started()); + recovery_thread.set_completion(c); + recovery_thread.create("md_recov_open"); + + submit_thread.create("md_submit"); + // either append() or replay() will follow. +} + +/** + * Final part of reopen() procedure, after recovery_thread + * has done its thing we call append() + */ +class C_ReopenComplete : public MDSInternalContext { + MDLog *mdlog; + MDSInternalContextBase *on_complete; +public: + C_ReopenComplete(MDLog *mdlog_, MDSInternalContextBase *on_complete_) : MDSInternalContext(mdlog_->mds), mdlog(mdlog_), on_complete(on_complete_) {} + void finish(int r) override { + mdlog->append(); + on_complete->complete(r); + } +}; + +/** + * Given that open() has been called in the past, go through the journal + * recovery procedure again, potentially reformatting the journal if it + * was in an old format. 
+ */ +void MDLog::reopen(MDSInternalContextBase *c) +{ + dout(5) << "reopen" << dendl; + + // Because we will call append() at the completion of this, check that we have already + // read the whole journal. + assert(journaler != NULL); + assert(journaler->get_read_pos() == journaler->get_write_pos()); + + delete journaler; + journaler = NULL; + + // recovery_thread was started at some point in the past. Although + // it has called it's completion if we made it back here, it might + // still not have been cleaned up: join it. + recovery_thread.join(); + + recovery_thread.set_completion(new C_ReopenComplete(this, c)); + recovery_thread.create("md_recov_reopen"); +} + +void MDLog::append() +{ + dout(5) << "append positioning at end and marking writeable" << dendl; + journaler->set_read_pos(journaler->get_write_pos()); + journaler->set_expire_pos(journaler->get_write_pos()); + + journaler->set_writeable(); + + logger->set(l_mdl_expos, journaler->get_write_pos()); +} + + + +// ------------------------------------------------- + +void MDLog::_start_entry(LogEvent *e) +{ + assert(submit_mutex.is_locked_by_me()); + + assert(cur_event == NULL); + cur_event = e; + + event_seq++; + + EMetaBlob *metablob = e->get_metablob(); + if (metablob) { + metablob->event_seq = event_seq; + metablob->last_subtree_map = get_last_segment_seq(); + } +} + +void MDLog::cancel_entry(LogEvent *le) +{ + assert(le == cur_event); + cur_event = NULL; + delete le; +} + +void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c) +{ + assert(submit_mutex.is_locked_by_me()); + assert(!mds->is_any_replay()); + assert(!capped); + + assert(le == cur_event); + cur_event = NULL; + + // let the event register itself in the segment + assert(!segments.empty()); + LogSegment *ls = segments.rbegin()->second; + ls->num_events++; + + le->_segment = ls; + le->update_segment(); + le->set_stamp(ceph_clock_now()); + + mdsmap_up_features = mds->mdsmap->get_up_features(); + 
pending_events[ls->seq].push_back(PendingEvent(le, c)); + num_events++; + + if (logger) { + logger->inc(l_mdl_evadd); + logger->set(l_mdl_ev, num_events); + } + + unflushed++; + + uint64_t period = journaler->get_layout_period(); + // start a new segment? + if (le->get_type() == EVENT_SUBTREEMAP || + (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) { + // avoid infinite loop when ESubtreeMap is very large. + // do not insert ESubtreeMap among EImportFinish events that finish + // disambiguate imports. Because the ESubtreeMap reflects the subtree + // state when all EImportFinish events are replayed. + } else if (ls->end/period != ls->offset/period || + ls->num_events >= g_conf->mds_log_events_per_segment) { + dout(10) << "submit_entry also starting new segment: last = " + << ls->seq << "/" << ls->offset << ", event seq = " << event_seq << dendl; + _start_new_segment(); + } else if (g_conf->mds_debug_subtrees && + le->get_type() != EVENT_SUBTREEMAP_TEST) { + // debug: journal this every time to catch subtree replay bugs. + // use a different event id so it doesn't get interpreted as a + // LogSegment boundary on replay. 
+ LogEvent *sle = mds->mdcache->create_subtree_map(); + sle->set_type(EVENT_SUBTREEMAP_TEST); + _submit_entry(sle, NULL); + } +} + +/** + * Invoked on the flush after each entry submitted + */ +class C_MDL_Flushed : public MDSLogContextBase { +protected: + MDLog *mdlog; + MDSRank *get_mds() override {return mdlog->mds;} + MDSInternalContextBase *wrapped; + + void finish(int r) override { + if (wrapped) + wrapped->complete(r); + } + +public: + C_MDL_Flushed(MDLog *m, MDSInternalContextBase *w) + : mdlog(m), wrapped(w) {} + C_MDL_Flushed(MDLog *m, uint64_t wp) : mdlog(m), wrapped(NULL) { + set_write_pos(wp); + } +}; + +void MDLog::_submit_thread() +{ + dout(10) << "_submit_thread start" << dendl; + + submit_mutex.Lock(); + + while (!mds->is_daemon_stopping()) { + if (g_conf->mds_log_pause) { + submit_cond.Wait(submit_mutex); + continue; + } + + map >::iterator it = pending_events.begin(); + if (it == pending_events.end()) { + submit_cond.Wait(submit_mutex); + continue; + } + + if (it->second.empty()) { + pending_events.erase(it); + continue; + } + + int64_t features = mdsmap_up_features; + PendingEvent data = it->second.front(); + it->second.pop_front(); + + submit_mutex.Unlock(); + + if (data.le) { + LogEvent *le = data.le; + LogSegment *ls = le->_segment; + // encode it, with event type + bufferlist bl; + le->encode_with_header(bl, features); + + uint64_t write_pos = journaler->get_write_pos(); + + le->set_start_off(write_pos); + if (le->get_type() == EVENT_SUBTREEMAP) + ls->offset = write_pos; + + dout(5) << "_submit_thread " << write_pos << "~" << bl.length() + << " : " << *le << dendl; + + // journal it. + const uint64_t new_write_pos = journaler->append_entry(bl); // bl is destroyed. 
+ ls->end = new_write_pos; + + MDSLogContextBase *fin; + if (data.fin) { + fin = dynamic_cast(data.fin); + assert(fin); + fin->set_write_pos(new_write_pos); + } else { + fin = new C_MDL_Flushed(this, new_write_pos); + } + + journaler->wait_for_flush(fin); + + if (data.flush) + journaler->flush(); + + if (logger) + logger->set(l_mdl_wrpos, ls->end); + + delete le; + } else { + if (data.fin) { + MDSInternalContextBase* fin = + dynamic_cast(data.fin); + assert(fin); + C_MDL_Flushed *fin2 = new C_MDL_Flushed(this, fin); + fin2->set_write_pos(journaler->get_write_pos()); + journaler->wait_for_flush(fin2); + } + if (data.flush) + journaler->flush(); + } + + submit_mutex.Lock(); + if (data.flush) + unflushed = 0; + else if (data.le) + unflushed++; + } + + submit_mutex.Unlock(); +} + +void MDLog::wait_for_safe(MDSInternalContextBase *c) +{ + submit_mutex.Lock(); + + bool no_pending = true; + if (!pending_events.empty()) { + pending_events.rbegin()->second.push_back(PendingEvent(NULL, c)); + no_pending = false; + submit_cond.Signal(); + } + + submit_mutex.Unlock(); + + if (no_pending && c) + journaler->wait_for_flush(new C_IO_Wrapper(mds, c)); +} + +void MDLog::flush() +{ + submit_mutex.Lock(); + + bool do_flush = unflushed > 0; + unflushed = 0; + if (!pending_events.empty()) { + pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true)); + do_flush = false; + submit_cond.Signal(); + } + + submit_mutex.Unlock(); + + if (do_flush) + journaler->flush(); +} + +void MDLog::kick_submitter() +{ + Mutex::Locker l(submit_mutex); + submit_cond.Signal(); +} + +void MDLog::cap() +{ + dout(5) << "cap" << dendl; + capped = true; +} + +void MDLog::shutdown() +{ + assert(mds->mds_lock.is_locked_by_me()); + + dout(5) << "shutdown" << dendl; + if (submit_thread.is_started()) { + assert(mds->is_daemon_stopping()); + + if (submit_thread.am_self()) { + // Called suicide from the thread: trust it to do no work after + // returning from suicide, and subsequently respect 
mds->is_daemon_stopping() + // and fall out of its loop. + } else { + mds->mds_lock.Unlock(); + // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else + // picking it up will do anything with it. + + submit_mutex.Lock(); + submit_cond.Signal(); + submit_mutex.Unlock(); + + mds->mds_lock.Lock(); + + submit_thread.join(); + } + } + + // Replay thread can be stuck inside e.g. Journaler::wait_for_readable, + // so we need to shutdown the journaler first. + if (journaler) { + journaler->shutdown(); + } + + if (replay_thread.is_started() && !replay_thread.am_self()) { + mds->mds_lock.Unlock(); + replay_thread.join(); + mds->mds_lock.Lock(); + } + + if (recovery_thread.is_started() && !recovery_thread.am_self()) { + mds->mds_lock.Unlock(); + recovery_thread.join(); + mds->mds_lock.Lock(); + } +} + + +// ----------------------------- +// segments + +void MDLog::_start_new_segment() +{ + _prepare_new_segment(); + _journal_segment_subtree_map(NULL); +} + +void MDLog::_prepare_new_segment() +{ + assert(submit_mutex.is_locked_by_me()); + + uint64_t seq = event_seq + 1; + dout(7) << __func__ << " seq " << seq << dendl; + + segments[seq] = new LogSegment(seq); + + logger->inc(l_mdl_segadd); + logger->set(l_mdl_seg, segments.size()); + + // Adjust to next stray dir + dout(10) << "Advancing to next stray directory on mds " << mds->get_nodeid() + << dendl; + mds->mdcache->advance_stray(); +} + +void MDLog::_journal_segment_subtree_map(MDSInternalContextBase *onsync) +{ + assert(submit_mutex.is_locked_by_me()); + + dout(7) << __func__ << dendl; + ESubtreeMap *sle = mds->mdcache->create_subtree_map(); + sle->event_seq = get_last_segment_seq(); + + _submit_entry(sle, new C_MDL_Flushed(this, onsync)); +} + +void MDLog::trim(int m) +{ + unsigned max_segments = g_conf->mds_log_max_segments; + int max_events = g_conf->mds_log_max_events; + if (m >= 0) + max_events = m; + + if (mds->mdcache->is_readonly()) { + dout(10) << "trim, ignoring read-only FS" << dendl; + return; 
+ } + + // Clamp max_events to not be smaller than events per segment + if (max_events > 0 && max_events <= g_conf->mds_log_events_per_segment) { + max_events = g_conf->mds_log_events_per_segment + 1; + } + + submit_mutex.Lock(); + + // trim! + dout(10) << "trim " + << segments.size() << " / " << max_segments << " segments, " + << num_events << " / " << max_events << " events" + << ", " << expiring_segments.size() << " (" << expiring_events << ") expiring" + << ", " << expired_segments.size() << " (" << expired_events << ") expired" + << dendl; + + if (segments.empty()) { + submit_mutex.Unlock(); + return; + } + + // hack: only trim for a few seconds at a time + utime_t stop = ceph_clock_now(); + stop += 2.0; + + map::iterator p = segments.begin(); + while (p != segments.end() && + ((max_events >= 0 && + num_events - expiring_events - expired_events > max_events) || + (segments.size() - expiring_segments.size() - expired_segments.size() > max_segments))) { + + if (stop < ceph_clock_now()) + break; + + int num_expiring_segments = (int)expiring_segments.size(); + if (num_expiring_segments >= g_conf->mds_log_max_expiring) + break; + + int op_prio = CEPH_MSG_PRIO_LOW + + (CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) * + num_expiring_segments / g_conf->mds_log_max_expiring; + + // look at first segment + LogSegment *ls = p->second; + assert(ls); + ++p; + + if (pending_events.count(ls->seq) || + ls->end > safe_pos) { + dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe " + << journaler->get_write_safe_pos() << " < end " << ls->end << dendl; + break; + } + if (expiring_segments.count(ls)) { + dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset + << ", " << ls->num_events << " events" << dendl; + } else if (expired_segments.count(ls)) { + dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset + << ", " << ls->num_events << " events" << dendl; + } else { + assert(expiring_segments.count(ls) == 
0); + expiring_segments.insert(ls); + expiring_events += ls->num_events; + submit_mutex.Unlock(); + + uint64_t last_seq = ls->seq; + try_expire(ls, op_prio); + + submit_mutex.Lock(); + p = segments.lower_bound(last_seq + 1); + } + } + + // discard expired segments and unlock submit_mutex + _trim_expired_segments(); +} + +class C_MaybeExpiredSegment : public MDSInternalContext { + MDLog *mdlog; + LogSegment *ls; + int op_prio; + public: + C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) : + MDSInternalContext(mdl->mds), mdlog(mdl), ls(s), op_prio(p) {} + void finish(int res) override { + if (res < 0) + mdlog->mds->handle_write_error(res); + mdlog->_maybe_expired(ls, op_prio); + } +}; + +/** + * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest + * segment. + */ +int MDLog::trim_all() +{ + submit_mutex.Lock(); + + dout(10) << __func__ << ": " + << segments.size() + << "/" << expiring_segments.size() + << "/" << expired_segments.size() << dendl; + + uint64_t last_seq = 0; + if (!segments.empty()) + last_seq = get_last_segment_seq(); + + map::iterator p = segments.begin(); + while (p != segments.end() && + p->first < last_seq && p->second->end <= safe_pos) { + LogSegment *ls = p->second; + ++p; + + // Caller should have flushed journaler before calling this + if (pending_events.count(ls->seq)) { + dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl; + submit_mutex.Unlock(); + return -EAGAIN; + } + + if (expiring_segments.count(ls)) { + dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset + << ", " << ls->num_events << " events" << dendl; + } else if (expired_segments.count(ls)) { + dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset + << ", " << ls->num_events << " events" << dendl; + } else { + assert(expiring_segments.count(ls) == 0); + expiring_segments.insert(ls); + expiring_events += ls->num_events; + submit_mutex.Unlock(); + + uint64_t next_seq = 
ls->seq + 1; + try_expire(ls, CEPH_MSG_PRIO_DEFAULT); + + submit_mutex.Lock(); + p = segments.lower_bound(next_seq); + } + } + + _trim_expired_segments(); + + return 0; +} + + +void MDLog::try_expire(LogSegment *ls, int op_prio) +{ + MDSGatherBuilder gather_bld(g_ceph_context); + ls->try_to_expire(mds, gather_bld, op_prio); + + if (gather_bld.has_subs()) { + dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl; + gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio)); + gather_bld.activate(); + } else { + dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl; + submit_mutex.Lock(); + assert(expiring_segments.count(ls)); + expiring_segments.erase(ls); + expiring_events -= ls->num_events; + _expired(ls); + submit_mutex.Unlock(); + } + + logger->set(l_mdl_segexg, expiring_segments.size()); + logger->set(l_mdl_evexg, expiring_events); +} + +void MDLog::_maybe_expired(LogSegment *ls, int op_prio) +{ + if (mds->mdcache->is_readonly()) { + dout(10) << "_maybe_expired, ignoring read-only FS" << dendl; + return; + } + + dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset + << ", " << ls->num_events << " events" << dendl; + try_expire(ls, op_prio); +} + +void MDLog::_trim_expired_segments() +{ + assert(submit_mutex.is_locked_by_me()); + + // trim expired segments? 
+ bool trimmed = false; + while (!segments.empty()) { + LogSegment *ls = segments.begin()->second; + if (!expired_segments.count(ls)) { + dout(10) << "_trim_expired_segments waiting for " << ls->seq << "/" << ls->offset + << " to expire" << dendl; + break; + } + + dout(10) << "_trim_expired_segments trimming expired " + << ls->seq << "/0x" << std::hex << ls->offset << std::dec << dendl; + expired_events -= ls->num_events; + expired_segments.erase(ls); + num_events -= ls->num_events; + + // this was the oldest segment, adjust expire pos + if (journaler->get_expire_pos() < ls->end) { + journaler->set_expire_pos(ls->end); + logger->set(l_mdl_expos, ls->end); + } else { + logger->set(l_mdl_expos, ls->offset); + } + + logger->inc(l_mdl_segtrm); + logger->inc(l_mdl_evtrm, ls->num_events); + + segments.erase(ls->seq); + delete ls; + trimmed = true; + } + + submit_mutex.Unlock(); + + if (trimmed) + journaler->write_head(0); +} + +void MDLog::trim_expired_segments() +{ + submit_mutex.Lock(); + _trim_expired_segments(); +} + +void MDLog::_expired(LogSegment *ls) +{ + assert(submit_mutex.is_locked_by_me()); + + dout(5) << "_expired segment " << ls->seq << "/" << ls->offset + << ", " << ls->num_events << " events" << dendl; + + if (!capped && ls == peek_current_segment()) { + dout(5) << "_expired not expiring " << ls->seq << "/" << ls->offset + << ", last one and !capped" << dendl; + } else { + // expired. 
+ expired_segments.insert(ls); + expired_events += ls->num_events; + + // Trigger all waiters + for (std::list::iterator i = ls->expiry_waiters.begin(); + i != ls->expiry_waiters.end(); ++i) { + (*i)->complete(0); + } + ls->expiry_waiters.clear(); + + logger->inc(l_mdl_evex, ls->num_events); + logger->inc(l_mdl_segex); + } + + logger->set(l_mdl_ev, num_events); + logger->set(l_mdl_evexd, expired_events); + logger->set(l_mdl_seg, segments.size()); + logger->set(l_mdl_segexd, expired_segments.size()); +} + + + +void MDLog::replay(MDSInternalContextBase *c) +{ + assert(journaler->is_active()); + assert(journaler->is_readonly()); + + // empty? + if (journaler->get_read_pos() == journaler->get_write_pos()) { + dout(10) << "replay - journal empty, done." << dendl; + mds->mdcache->trim(); + if (c) { + c->complete(0); + } + return; + } + + // add waiter + if (c) + waitfor_replay.push_back(c); + + // go! + dout(10) << "replay start, from " << journaler->get_read_pos() + << " to " << journaler->get_write_pos() << dendl; + + assert(num_events == 0 || already_replayed); + if (already_replayed) { + // Ensure previous instance of ReplayThread is joined before + // we create another one + replay_thread.join(); + } + already_replayed = true; + + replay_thread.create("md_log_replay"); +} + + +/** + * Resolve the JournalPointer object to a journal file, and + * instantiate a Journaler object. This may re-write the journal + * if the journal in RADOS appears to be in an old format. + * + * This is a separate thread because of the way it is initialized from inside + * the mds lock, which is also the global objecter lock -- rather than split + * it up into hard-to-read async operations linked up by contexts, + * + * When this function completes, the `journaler` attribute will be set to + * a Journaler instance using the latest available serialization format. 
+ */ +void MDLog::_recovery_thread(MDSInternalContextBase *completion) +{ + assert(journaler == NULL); + if (g_conf->mds_journal_format > JOURNAL_FORMAT_MAX) { + dout(0) << "Configuration value for mds_journal_format is out of bounds, max is " + << JOURNAL_FORMAT_MAX << dendl; + + // Oh dear, something unreadable in the store for this rank: require + // operator intervention. + mds->damaged(); + ceph_abort(); // damaged should not return + } + + // First, read the pointer object. + // If the pointer object is not present, then create it with + // front = default ino and back = null + JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool()); + const int read_result = jp.load(mds->objecter); + if (read_result == -ENOENT) { + inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid(); + jp.front = default_log_ino; + int write_result = jp.save(mds->objecter); + // Nothing graceful we can do for this + assert(write_result >= 0); + } else if (read_result == -EBLACKLISTED) { + derr << "Blacklisted during JournalPointer read! Respawning..." << dendl; + mds->respawn(); + ceph_abort(); // Should be unreachable because respawn calls execv + } else if (read_result != 0) { + mds->clog->error() << "failed to read JournalPointer: " << read_result + << " (" << cpp_strerror(read_result) << ")"; + mds->damaged_unlocked(); + ceph_abort(); // Should be unreachable because damaged() calls respawn() + } + + // If the back pointer is non-null, that means that a journal + // rewrite failed part way through. Erase the back journal + // to clean up. 
+ if (jp.back) { + if (mds->is_standby_replay()) { + dout(1) << "Journal " << jp.front << " is being rewritten, " + << "cannot replay in standby until an active MDS completes rewrite" << dendl; + Mutex::Locker l(mds->mds_lock); + if (mds->is_daemon_stopping()) { + return; + } + completion->complete(-EAGAIN); + return; + } + dout(1) << "Erasing journal " << jp.back << dendl; + C_SaferCond erase_waiter; + Journaler back("mdlog", jp.back, mds->mdsmap->get_metadata_pool(), + CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, + mds->finisher); + + // Read all about this journal (header + extents) + C_SaferCond recover_wait; + back.recover(&recover_wait); + int recovery_result = recover_wait.wait(); + if (recovery_result == -EBLACKLISTED) { + derr << "Blacklisted during journal recovery! Respawning..." << dendl; + mds->respawn(); + ceph_abort(); // Should be unreachable because respawn calls execv + } else if (recovery_result != 0) { + // Journaler.recover succeeds if no journal objects are present: an error + // means something worse like a corrupt header, which we can't handle here. + mds->clog->error() << "Error recovering journal " << jp.front << ": " + << cpp_strerror(recovery_result); + mds->damaged_unlocked(); + assert(recovery_result == 0); // Unreachable because damaged() calls respawn() + } + + // We could read journal, so we can erase it. + back.erase(&erase_waiter); + int erase_result = erase_waiter.wait(); + + // If we are successful, or find no data, we can update the JournalPointer to + // reflect that the back journal is gone. 
+ if (erase_result != 0 && erase_result != -ENOENT) { + derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl; + } else { + dout(1) << "Successfully erased journal, updating journal pointer" << dendl; + jp.back = 0; + int write_result = jp.save(mds->objecter); + // Nothing graceful we can do for this + assert(write_result >= 0); + } + } + + /* Read the header from the front journal */ + Journaler *front_journal = new Journaler("mdlog", jp.front, + mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, + logger, l_mdl_jlat, mds->finisher); + + // Assign to ::journaler so that we can be aborted by ::shutdown while + // waiting for journaler recovery + { + Mutex::Locker l(mds->mds_lock); + journaler = front_journal; + } + + C_SaferCond recover_wait; + front_journal->recover(&recover_wait); + dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl; + int recovery_result = recover_wait.wait(); + dout(4) << "Journal " << jp.front << " recovered." << dendl; + + if (recovery_result == -EBLACKLISTED) { + derr << "Blacklisted during journal recovery! Respawning..." << dendl; + mds->respawn(); + ceph_abort(); // Should be unreachable because respawn calls execv + } else if (recovery_result != 0) { + mds->clog->error() << "Error recovering journal " << jp.front << ": " + << cpp_strerror(recovery_result); + mds->damaged_unlocked(); + assert(recovery_result == 0); // Unreachable because damaged() calls respawn() + } + + /* Check whether the front journal format is acceptable or needs re-write */ + if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) { + dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format() + << ", does this MDS daemon require upgrade?" 
<< dendl; + { + Mutex::Locker l(mds->mds_lock); + if (mds->is_daemon_stopping()) { + journaler = NULL; + delete front_journal; + return; + } + completion->complete(-EINVAL); + } + } else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf->mds_journal_format) { + /* The journal is of configured format, or we are in standbyreplay and will + * tolerate replaying old journals until we have to go active. Use front_journal as + * our journaler attribute and complete */ + dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl; + journaler->set_write_error_handler(new C_MDL_WriteError(this)); + { + Mutex::Locker l(mds->mds_lock); + if (mds->is_daemon_stopping()) { + return; + } + completion->complete(0); + } + } else { + /* Hand off to reformat routine, which will ultimately set the + * completion when it has done its thing */ + dout(1) << "Journal " << jp.front << " has old format " + << front_journal->get_stream_format() << ", it will now be updated" << dendl; + _reformat_journal(jp, front_journal, completion); + } +} + +/** + * Blocking rewrite of the journal to a new file, followed by + * swap of journal pointer to point to the new one. + * + * We write the new journal to the 'back' journal from the JournalPointer, + * swapping pointers to make that one the front journal only when we have + * safely completed. + */ +void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, MDSInternalContextBase *completion) +{ + assert(!jp_in.is_null()); + assert(completion != NULL); + assert(old_journal != NULL); + + JournalPointer jp = jp_in; + + /* Set JournalPointer.back to the location we will write the new journal */ + inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid(); + inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid(); + jp.back = (jp.front == primary_ino ? 
secondary_ino : primary_ino); + int write_result = jp.save(mds->objecter); + assert(write_result == 0); + + /* Create the new Journaler file */ + Journaler *new_journal = new Journaler("mdlog", jp.back, + mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, mds->finisher); + dout(4) << "Writing new journal header " << jp.back << dendl; + file_layout_t new_layout = old_journal->get_layout(); + new_journal->set_writeable(); + new_journal->create(&new_layout, g_conf->mds_journal_format); + + /* Write the new journal header to RADOS */ + C_SaferCond write_head_wait; + new_journal->write_head(&write_head_wait); + write_head_wait.wait(); + + // Read in the old journal, and whenever we have readable events, + // write them to the new journal. + int r = 0; + + // In old format journals before event_seq was introduced, the serialized + // offset of a SubtreeMap message in the log is used as the unique ID for + // a log segment. Because we change serialization, this will end up changing + // for us, so we have to explicitly update the fields that point back to that + // log segment. + std::map segment_pos_rewrite; + + // The logic in here borrowed from replay_thread expects mds_lock to be held, + // e.g. between checking readable and doing wait_for_readable so that journaler + // state doesn't change in between. 
+ uint32_t events_transcribed = 0; + while (1) { + while (!old_journal->is_readable() && + old_journal->get_read_pos() < old_journal->get_write_pos() && + !old_journal->get_error()) { + + // Issue a journal prefetch + C_SaferCond readable_waiter; + old_journal->wait_for_readable(&readable_waiter); + + // Wait for a journal prefetch to complete + readable_waiter.wait(); + } + if (old_journal->get_error()) { + r = old_journal->get_error(); + dout(0) << "_replay journaler got error " << r << ", aborting" << dendl; + break; + } + + if (!old_journal->is_readable() && + old_journal->get_read_pos() == old_journal->get_write_pos()) + break; + + // Read one serialized LogEvent + assert(old_journal->is_readable()); + bufferlist bl; + uint64_t le_pos = old_journal->get_read_pos(); + bool r = old_journal->try_read_entry(bl); + if (!r && old_journal->get_error()) + continue; + assert(r); + + // Update segment_pos_rewrite + LogEvent *le = LogEvent::decode(bl); + if (le) { + bool modified = false; + + if (le->get_type() == EVENT_SUBTREEMAP || + le->get_type() == EVENT_RESETJOURNAL) { + ESubtreeMap *sle = dynamic_cast(le); + if (sle == NULL || sle->event_seq == 0) { + // A non-explicit event seq: the effective sequence number + // of this segment is it's position in the old journal and + // the new effective sequence number will be its position + // in the new journal. 
+ segment_pos_rewrite[le_pos] = new_journal->get_write_pos(); + dout(20) << __func__ << " discovered segment seq mapping " + << le_pos << " -> " << new_journal->get_write_pos() << dendl; + } + } else { + event_seq++; + } + + // Rewrite segment references if necessary + EMetaBlob *blob = le->get_metablob(); + if (blob) { + modified = blob->rewrite_truncate_finish(mds, segment_pos_rewrite); + } + + // Zero-out expire_pos in subtreemap because offsets have changed + // (expire_pos is just an optimization so it's safe to eliminate it) + if (le->get_type() == EVENT_SUBTREEMAP + || le->get_type() == EVENT_SUBTREEMAP_TEST) { + ESubtreeMap *sle = dynamic_cast(le); + assert(sle != NULL); + dout(20) << __func__ << " zeroing expire_pos in subtreemap event at " + << le_pos << " seq=" << sle->event_seq << dendl; + sle->expire_pos = 0; + modified = true; + } + + if (modified) { + bl.clear(); + le->encode_with_header(bl, mds->mdsmap->get_up_features()); + } + + delete le; + } else { + // Failure from LogEvent::decode, our job is to change the journal wrapper, + // not validate the contents, so pass it through. + dout(1) << __func__ << " transcribing un-decodable LogEvent at old position " + << old_journal->get_read_pos() << ", new position " << new_journal->get_write_pos() + << dendl; + } + + // Write (buffered, synchronous) one serialized LogEvent + events_transcribed += 1; + new_journal->append_entry(bl); + } + + dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl; + C_SaferCond flush_waiter; + new_journal->flush(&flush_waiter); + flush_waiter.wait(); + + // If failed to rewrite journal, leave the part written journal + // as garbage to be cleaned up next startup. 
+ assert(r == 0); + + /* Now that the new journal is safe, we can flip the pointers */ + inodeno_t const tmp = jp.front; + jp.front = jp.back; + jp.back = tmp; + write_result = jp.save(mds->objecter); + assert(write_result == 0); + + /* Delete the old journal to free space */ + dout(1) << "New journal flushed, erasing old journal" << dendl; + C_SaferCond erase_waiter; + old_journal->erase(&erase_waiter); + int erase_result = erase_waiter.wait(); + assert(erase_result == 0); + { + Mutex::Locker l(mds->mds_lock); + if (mds->is_daemon_stopping()) { + delete new_journal; + return; + } + assert(journaler == old_journal); + journaler = NULL; + delete old_journal; + } + + /* Update the pointer to reflect we're back in clean single journal state. */ + jp.back = 0; + write_result = jp.save(mds->objecter); + assert(write_result == 0); + + /* Reset the Journaler object to its default state */ + dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl; + { + Mutex::Locker l(mds->mds_lock); + if (mds->is_daemon_stopping()) { + delete new_journal; + return; + } + journaler = new_journal; + journaler->set_readonly(); + journaler->set_write_error_handler(new C_MDL_WriteError(this)); + } + + /* Trigger completion */ + { + Mutex::Locker l(mds->mds_lock); + if (mds->is_daemon_stopping()) { + return; + } + completion->complete(0); + } +} + + +// i am a separate thread +void MDLog::_replay_thread() +{ + dout(10) << "_replay_thread start" << dendl; + + // loop + int r = 0; + while (1) { + // wait for read? 
+ while (!journaler->is_readable() && + journaler->get_read_pos() < journaler->get_write_pos() && + !journaler->get_error()) { + C_SaferCond readable_waiter; + journaler->wait_for_readable(&readable_waiter); + r = readable_waiter.wait(); + } + if (journaler->get_error()) { + r = journaler->get_error(); + dout(0) << "_replay journaler got error " << r << ", aborting" << dendl; + if (r == -ENOENT) { + if (mds->is_standby_replay()) { + // journal has been trimmed by somebody else + r = -EAGAIN; + } else { + mds->clog->error() << "missing journal object"; + mds->damaged_unlocked(); + ceph_abort(); // Should be unreachable because damaged() calls respawn() + } + } else if (r == -EINVAL) { + if (journaler->get_read_pos() < journaler->get_expire_pos()) { + // this should only happen if you're following somebody else + if(journaler->is_readonly()) { + dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl; + r = -EAGAIN; + } else { + mds->clog->error() << "invalid journaler offsets"; + mds->damaged_unlocked(); + ceph_abort(); // Should be unreachable because damaged() calls respawn() + } + } else { + /* re-read head and check it + * Given that replay happens in a separate thread and + * the MDS is going to either shut down or restart when + * we return this error, doing it synchronously is fine + * -- as long as we drop the main mds lock--. */ + C_SaferCond reread_fin; + journaler->reread_head(&reread_fin); + int err = reread_fin.wait(); + if (err) { + if (err == -ENOENT && mds->is_standby_replay()) { + r = -EAGAIN; + dout(1) << "Journal header went away while in standby replay, journal rewritten?" 
+ << dendl; + break; + } else { + dout(0) << "got error while reading head: " << cpp_strerror(err) + << dendl; + + mds->clog->error() << "error reading journal header"; + mds->damaged_unlocked(); + ceph_abort(); // Should be unreachable because damaged() calls + // respawn() + } + } + standby_trim_segments(); + if (journaler->get_read_pos() < journaler->get_expire_pos()) { + dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl; + r = -EAGAIN; + } + } + } + break; + } + + if (!journaler->is_readable() && + journaler->get_read_pos() == journaler->get_write_pos()) + break; + + assert(journaler->is_readable() || mds->is_daemon_stopping()); + + // read it + uint64_t pos = journaler->get_read_pos(); + bufferlist bl; + bool r = journaler->try_read_entry(bl); + if (!r && journaler->get_error()) + continue; + assert(r); + + // unpack event + LogEvent *le = LogEvent::decode(bl); + if (!le) { + dout(0) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos() + << " -- unable to decode event" << dendl; + dout(0) << "dump of unknown or corrupt event:\n"; + bl.hexdump(*_dout); + *_dout << dendl; + + mds->clog->error() << "corrupt journal event at " << pos << "~" + << bl.length() << " / " + << journaler->get_write_pos(); + if (g_conf->mds_log_skip_corrupt_events) { + continue; + } else { + mds->damaged_unlocked(); + ceph_abort(); // Should be unreachable because damaged() calls + // respawn() + } + + } + le->set_start_off(pos); + + // new segment? + if (le->get_type() == EVENT_SUBTREEMAP || + le->get_type() == EVENT_RESETJOURNAL) { + ESubtreeMap *sle = dynamic_cast(le); + if (sle && sle->event_seq > 0) + event_seq = sle->event_seq; + else + event_seq = pos; + segments[event_seq] = new LogSegment(event_seq, pos); + logger->set(l_mdl_seg, segments.size()); + } else { + event_seq++; + } + + // have we seen an import map yet? 
if (segments.empty()) {
      // No subtree map seen yet, so there is no segment to attach the event
      // to; it is logged and skipped.
      dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
               << " " << le->get_stamp() << " -- waiting for subtree_map. (skipping " << *le << ")" << dendl;
    } else {
      dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
               << " " << le->get_stamp() << ": " << *le << dendl;
      le->_segment = get_current_segment();  // replay may need this
      le->_segment->num_events++;
      le->_segment->end = journaler->get_read_pos();
      num_events++;

      {
        // Apply the event under mds_lock; bail out quietly if the daemon is
        // already stopping.
        Mutex::Locker l(mds->mds_lock);
        if (mds->is_daemon_stopping()) {
          return;
        }
        logger->inc(l_mdl_replayed);
        le->replay(mds);
      }
    }
    delete le;

    logger->set(l_mdl_rdpos, pos);
  }

  // done!
  if (r == 0) {
    // Clean completion: the read position caught up with the write position.
    assert(journaler->get_read_pos() == journaler->get_write_pos());
    dout(10) << "_replay - complete, " << num_events
             << " events" << dendl;

    logger->set(l_mdl_expos, journaler->get_expire_pos());
  }

  safe_pos = journaler->get_write_safe_pos();

  dout(10) << "_replay_thread kicking waiters" << dendl;
  {
    Mutex::Locker l(mds->mds_lock);
    if (mds->is_daemon_stopping()) {
      return;
    }
    // Wake everything queued on replay, propagating the final status r
    // (0 for success, -EAGAIN for standby-replay retry conditions, etc.).
    finish_contexts(g_ceph_context, waitfor_replay, r);
  }

  dout(10) << "_replay_thread finish" << dendl;
}

// Drop log segments that the active MDS has already expired, while this
// rank is following the journal in standby replay. The newest segment is
// always kept; expired segments are handed to MDCache for trimming first.
void MDLog::standby_trim_segments()
{
  dout(10) << "standby_trim_segments" << dendl;
  uint64_t expire_pos = journaler->get_expire_pos();
  dout(10) << " expire_pos=" << expire_pos << dendl;
  bool removed_segment = false;
  while (have_any_segments()) {
    LogSegment *seg = get_oldest_segment();
    dout(10) << " segment seq=" << seg->seq << " " << seg->offset <<
      "~" << seg->end - seg->offset << dendl;

    // A segment is only removable once it lies entirely below expire_pos.
    if (seg->end > expire_pos) {
      dout(10) << " won't remove, not expired!" << dendl;
      break;
    }

    // Always retain at least one segment.
    if (segments.size() == 1) {
      dout(10) << " won't remove, last segment!" << dendl;
      break;
    }

    dout(10) << " removing segment" << dendl;
    mds->mdcache->standby_trim_segment(seg);
    remove_oldest_segment();
    removed_segment = true;
  }

  if (removed_segment) {
    // Segments were dropped; give the cache a chance to shrink too.
    dout(20) << " calling mdcache->trim!" << dendl;
    mds->mdcache->trim();
  } else {
    dout(20) << " removed no segments!" << dendl;
  }
}

// Emit replay progress (journal positions, event/segment counts) for
// "ceph daemon ... status"-style introspection. journaler may still be
// NULL early in startup, hence the guards.
void MDLog::dump_replay_status(Formatter *f) const
{
  f->open_object_section("replay_status");
  f->dump_unsigned("journal_read_pos", journaler ? journaler->get_read_pos() : 0);
  f->dump_unsigned("journal_write_pos", journaler ? journaler->get_write_pos() : 0);
  f->dump_unsigned("journal_expire_pos", journaler ? journaler->get_expire_pos() : 0);
  f->dump_unsigned("num_events", get_num_events());
  f->dump_unsigned("num_segments", get_num_segments());
  f->close_section();
}