X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmds%2FMDLog.h;fp=src%2Fceph%2Fsrc%2Fmds%2FMDLog.h;h=5579e5abc007bde82ef30359d0d9ea476a437798;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/mds/MDLog.h b/src/ceph/src/mds/MDLog.h new file mode 100644 index 0000000..5579e5a --- /dev/null +++ b/src/ceph/src/mds/MDLog.h @@ -0,0 +1,334 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef CEPH_MDLOG_H +#define CEPH_MDLOG_H + +enum { + l_mdl_first = 5000, + l_mdl_evadd, + l_mdl_evex, + l_mdl_evtrm, + l_mdl_ev, + l_mdl_evexg, + l_mdl_evexd, + l_mdl_segadd, + l_mdl_segex, + l_mdl_segtrm, + l_mdl_seg, + l_mdl_segexg, + l_mdl_segexd, + l_mdl_expos, + l_mdl_wrpos, + l_mdl_rdpos, + l_mdl_jlat, + l_mdl_replayed, + l_mdl_last, +}; + +#include "include/types.h" +#include "include/Context.h" + +#include "common/Thread.h" +#include "common/Cond.h" + +#include "LogSegment.h" + +#include + +class Journaler; +class JournalPointer; +class LogEvent; +class MDSRank; +class LogSegment; +class ESubtreeMap; + +class PerfCounters; + +#include +using std::map; + +#include "common/Finisher.h" + + +class MDLog { +public: + MDSRank *mds; +protected: + int num_events; // in events + + int unflushed; + + bool capped; + + // Log position which is persistent *and* for which + // submit_entry wait_for_safe callbacks have already + // been called. + uint64_t safe_pos; + + inodeno_t ino; + Journaler *journaler; + + PerfCounters *logger; + + + // -- replay -- + class ReplayThread : public Thread { + MDLog *log; + public: + explicit ReplayThread(MDLog *l) : log(l) {} + void* entry() override { + log->_replay_thread(); + return 0; + } + } replay_thread; + bool already_replayed; + + friend class ReplayThread; + friend class C_MDL_Replay; + + list waitfor_replay; + + void _replay(); // old way + void _replay_thread(); // new way + + // Journal recovery/rewrite logic + class RecoveryThread : public Thread { + MDLog *log; + MDSInternalContextBase *completion; + public: + void set_completion(MDSInternalContextBase *c) {completion = c;} + explicit RecoveryThread(MDLog *l) : log(l), completion(NULL) {} + void* entry() override { + log->_recovery_thread(completion); + return 0; + } + } recovery_thread; + void _recovery_thread(MDSInternalContextBase *completion); + void _reformat_journal(JournalPointer const &jp, Journaler *old_journal, MDSInternalContextBase *completion); + + // -- segments -- + map segments; + set expiring_segments; + set expired_segments; + uint64_t event_seq; + int expiring_events; + int expired_events; + + struct PendingEvent { + LogEvent *le; + MDSContext *fin; + bool flush; + PendingEvent(LogEvent *e, MDSContext *c, bool f=false) : le(e), fin(c), flush(f) {} + }; + + int64_t mdsmap_up_features; + map > pending_events; // log segment -> event list + Mutex submit_mutex; + Cond submit_cond; + + void set_safe_pos(uint64_t pos) + { + Mutex::Locker l(submit_mutex); + assert(pos >= safe_pos); + safe_pos = pos; + } + friend class MDSLogContextBase; + + void _submit_thread(); + class SubmitThread : public Thread { + MDLog *log; + public: + explicit SubmitThread(MDLog *l) : log(l) {} + void* entry() override { + log->_submit_thread(); + return 0; + } + } submit_thread; + friend class SubmitThread; + +public: + const std::set &get_expiring_segments() const + { + return expiring_segments; + } +protected: + + // -- subtreemaps -- + friend class ESubtreeMap; + friend class MDCache; + + uint64_t get_last_segment_seq() const { + assert(!segments.empty()); + return segments.rbegin()->first; + } + LogSegment *get_oldest_segment() { + return segments.begin()->second; + } + void remove_oldest_segment() { + map::iterator p = segments.begin(); + delete p->second; + segments.erase(p); + } + +public: + void create_logger(); + + // replay state + map > pending_exports; + + void set_write_iohint(unsigned iohint_flags); + +public: + explicit MDLog(MDSRank *m) : mds(m), + num_events(0), + unflushed(0), + capped(false), + safe_pos(0), + journaler(0), + logger(0), + replay_thread(this), + already_replayed(false), + recovery_thread(this), + event_seq(0), expiring_events(0), expired_events(0), + mdsmap_up_features(0), + submit_mutex("MDLog::submit_mutex"), + submit_thread(this), + cur_event(NULL) { } + ~MDLog(); + + +private: + // -- segments -- + void _start_new_segment(); + void _prepare_new_segment(); + void _journal_segment_subtree_map(MDSInternalContextBase *onsync); +public: + void start_new_segment() { + Mutex::Locker l(submit_mutex); + _start_new_segment(); + } + void prepare_new_segment() { + Mutex::Locker l(submit_mutex); + _prepare_new_segment(); + } + void journal_segment_subtree_map(MDSInternalContextBase *onsync=NULL) { + submit_mutex.Lock(); + _journal_segment_subtree_map(onsync); + submit_mutex.Unlock(); + if (onsync) + flush(); + } + + LogSegment *peek_current_segment() { + return segments.empty() ? NULL : segments.rbegin()->second; + } + + LogSegment *get_current_segment() { + assert(!segments.empty()); + return segments.rbegin()->second; + } + + LogSegment *get_segment(log_segment_seq_t seq) { + if (segments.count(seq)) + return segments[seq]; + return NULL; + } + + bool have_any_segments() const { + return !segments.empty(); + } + + void flush_logger(); + + size_t get_num_events() const { return num_events; } + size_t get_num_segments() const { return segments.size(); } + + uint64_t get_read_pos() const; + uint64_t get_write_pos() const; + uint64_t get_safe_pos() const; + Journaler *get_journaler() { return journaler; } + bool empty() const { return segments.empty(); } + + bool is_capped() const { return capped; } + void cap(); + + void kick_submitter(); + void shutdown(); + + // -- events -- +private: + LogEvent *cur_event; +public: + void _start_entry(LogEvent *e); + void start_entry(LogEvent *e) { + Mutex::Locker l(submit_mutex); + _start_entry(e); + } + void cancel_entry(LogEvent *e); + void _submit_entry(LogEvent *e, MDSLogContextBase *c); + void submit_entry(LogEvent *e, MDSLogContextBase *c = 0) { + Mutex::Locker l(submit_mutex); + _submit_entry(e, c); + submit_cond.Signal(); + } + void start_submit_entry(LogEvent *e, MDSLogContextBase *c = 0) { + Mutex::Locker l(submit_mutex); + _start_entry(e); + _submit_entry(e, c); + submit_cond.Signal(); + } + bool entry_is_open() const { return cur_event != NULL; } + + void wait_for_safe( MDSInternalContextBase *c ); + void flush(); + bool is_flushed() const { + return unflushed == 0; + } + +private: + void try_expire(LogSegment *ls, int op_prio); + void _maybe_expired(LogSegment *ls, int op_prio); + void _expired(LogSegment *ls); + void _trim_expired_segments(); + + friend class C_MaybeExpiredSegment; + friend class C_MDL_Flushed; + +public: + void trim_expired_segments(); + void trim(int max=-1); + int trim_all(); + bool expiry_done() const + { + return expiring_segments.empty() && expired_segments.empty(); + }; + +private: + void write_head(MDSInternalContextBase *onfinish); + +public: + void create(MDSInternalContextBase *onfinish); // fresh, empty log! + void open(MDSInternalContextBase *onopen); // append() or replay() to follow! + void reopen(MDSInternalContextBase *onopen); + void append(); + void replay(MDSInternalContextBase *onfinish); + + void standby_trim_segments(); + + void dump_replay_status(Formatter *f) const; +}; + +#endif