1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
41 #include "include/types.h"
42 #include "include/Context.h"
44 #include "common/Thread.h"
45 #include "common/Cond.h"
47 #include "LogSegment.h"
63 #include "common/Finisher.h"
// Count kept in units of events (per the trailing comment); get_num_events()
// returns this value.
70 int num_events; // in events
76 // Log position which is persistent *and* for which
77 // submit_entry wait_for_safe callbacks have already
// Thread subclass for replay: entry() forwards to _replay_thread() on the
// MDLog pointer supplied at construction.
// NOTE(review): the `log` member declaration, entry()'s return statement and
// the closing braces are not visible in this excerpt.
88 class ReplayThread : public Thread {
// Stores the MDLog pointer; `explicit` rules out implicit conversion from MDLog*.
91 explicit ReplayThread(MDLog *l) : log(l) {}
// Entry point overriding the base class's virtual entry().
92 void* entry() override {
93 log->_replay_thread();
// Replay bookkeeping. The MDLog constructor initialises this flag to false.
97 bool already_replayed;
// Give the replay helper classes access to the enclosing class's non-public members.
99 friend class ReplayThread;
100 friend class C_MDL_Replay;
// Contexts held for replay; presumably completed when replay finishes --
// TODO confirm against the implementation file.
102 list<MDSInternalContextBase*> waitfor_replay;
104 void _replay(); // old way
// Invoked from ReplayThread::entry().
105 void _replay_thread(); // new way
107 // Journal recovery/rewrite logic
// Thread subclass whose entry() calls _recovery_thread() on the MDLog pointer,
// passing along the completion context stored on the thread object.
// NOTE(review): the `log` member, entry()'s return statement and the closing
// braces are not visible in this excerpt.
108 class RecoveryThread : public Thread {
// Context handed to _recovery_thread(); NULL until set_completion() is called.
110 MDSInternalContextBase *completion;
// Replaces the stored completion context (no ownership handling is visible here).
112 void set_completion(MDSInternalContextBase *c) {completion = c;}
113 explicit RecoveryThread(MDLog *l) : log(l), completion(NULL) {}
// Entry point overriding the base class's virtual entry().
114 void* entry() override {
115 log->_recovery_thread(completion);
// Body of the recovery thread; receives the context stored via
// RecoveryThread::set_completion(). Defined outside this excerpt.
119 void _recovery_thread(MDSInternalContextBase *completion);
// Takes a journal pointer, the old Journaler and a completion context;
// behaviour is defined outside this excerpt.
120 void _reformat_journal(JournalPointer const &jp, Journaler *old_journal, MDSInternalContextBase *completion);
// Segment bookkeeping. `segments` is an ordered map looked up by sequence
// number (see get_segment()); begin() is therefore the lowest key and rbegin()
// the highest, which get_oldest_segment() and get_current_segment() rely on.
123 map<uint64_t,LogSegment*> segments;
// expiry_done() reports true only when both of these sets are empty.
124 set<LogSegment*> expiring_segments;
125 set<LogSegment*> expired_segments;
// Element type of the per-segment lists in pending_events.
// NOTE(review): the field declarations (le, fin, flush) and the closing brace
// are not visible in this excerpt.
130 struct PendingEvent {
// Initialises le, fin and flush from the arguments; flush defaults to false.
134 PendingEvent(LogEvent *e, MDSContext *c, bool f=false) : le(e), fin(c), flush(f) {}
// Initialised to 0 by the MDLog constructor.
137 int64_t mdsmap_up_features;
138 map<uint64_t,list<PendingEvent> > pending_events; // log segment -> event list
// Setter for safe_pos. Constructs a Mutex::Locker on submit_mutex and asserts
// that the new position is not less than the current safe_pos.
// NOTE(review): the assignment itself and the braces are not visible in this
// excerpt.
142 void set_safe_pos(uint64_t pos)
144 Mutex::Locker l(submit_mutex);
// safe_pos must never move backwards.
145 assert(pos >= safe_pos);
148 friend class MDSLogContextBase;
// Body of the submit thread; invoked from SubmitThread::entry().
150 void _submit_thread();
// Thread subclass whose entry() forwards to _submit_thread() on the MDLog
// pointer supplied at construction.
// NOTE(review): the `log` member, entry()'s return statement and the closing
// braces are not visible in this excerpt.
151 class SubmitThread : public Thread {
154 explicit SubmitThread(MDLog *l) : log(l) {}
155 void* entry() override {
156 log->_submit_thread();
// Lets SubmitThread reach the enclosing class's non-public members.
160 friend class SubmitThread;
// Read-only access to expiring_segments. Returns a const reference to the
// member itself, not a copy.
163 const std::set<LogSegment*> &get_expiring_segments() const
165 return expiring_segments;
// These classes are granted access to the enclosing class's non-public members.
170 friend class ESubtreeMap;
171 friend class MDCache;
// Returns the highest key currently in `segments`.
// Precondition (asserted): at least one segment exists.
173 uint64_t get_last_segment_seq() const {
174 assert(!segments.empty());
// rbegin() of an ordered map is the entry with the largest key.
175 return segments.rbegin()->first;
// Returns the LogSegment stored under the lowest key in `segments`.
// NOTE(review): unlike get_last_segment_seq() and get_current_segment(), there
// is no emptiness assert here; dereferencing begin() of an empty map is
// undefined behaviour, so callers must ensure a segment exists.
177 LogSegment *get_oldest_segment() {
178 return segments.begin()->second;
// Starts from the lowest-keyed entry of `segments`.
// NOTE(review): the statements that act on `p` are not visible in this
// excerpt, so the exact removal and cleanup steps cannot be described here.
180 void remove_oldest_segment() {
181 map<uint64_t, LogSegment*>::iterator p = segments.begin();
// Defined outside this excerpt.
187 void create_logger();
// Maps an inode number to a set of inode numbers; the precise meaning of key
// and values is not shown here -- TODO confirm against users of this member.
190 map<inodeno_t, set<inodeno_t> > pending_exports;
// Takes a bitmask of I/O hint flags; defined outside this excerpt.
192 void set_write_iohint(unsigned iohint_flags);
// Constructs an MDLog bound to the given MDSRank (stored in `mds`).
// Only part of the initialiser list is visible in this excerpt. The visible
// entries clear the replay flag, pass `this` to recovery_thread, zero the
// event counters and the feature mask, and give submit_mutex its name.
195 explicit MDLog(MDSRank *m) : mds(m),
203 already_replayed(false),
204 recovery_thread(this),
205 event_seq(0), expiring_events(0), expired_events(0),
206 mdsmap_up_features(0),
207 submit_mutex("MDLog::submit_mutex"),
// Underscore-prefixed workers. The public wrappers that follow create a
// Mutex::Locker on submit_mutex before calling the first two.
215 void _start_new_segment();
216 void _prepare_new_segment();
// Called by journal_segment_subtree_map() with the caller's onsync context.
217 void _journal_segment_subtree_map(MDSInternalContextBase *onsync);
// Public wrapper: creates a scoped Mutex::Locker on submit_mutex, then calls
// _start_new_segment().
219 void start_new_segment() {
220 Mutex::Locker l(submit_mutex);
221 _start_new_segment();
// Public wrapper: creates a scoped Mutex::Locker on submit_mutex, then calls
// _prepare_new_segment().
223 void prepare_new_segment() {
224 Mutex::Locker l(submit_mutex);
225 _prepare_new_segment();
// Public wrapper around _journal_segment_subtree_map(); onsync defaults to NULL.
// NOTE(review): submit_mutex is released with an explicit Unlock(), whereas
// start_new_segment() and prepare_new_segment() use a scoped Mutex::Locker.
// The matching acquire is not visible in this excerpt. If the worker can exit
// early (e.g. by throwing), the manual Unlock() would be skipped; a Locker
// would make the three wrappers consistent.
227 void journal_segment_subtree_map(MDSInternalContextBase *onsync=NULL) {
229 _journal_segment_subtree_map(onsync);
230 submit_mutex.Unlock();
// Returns the LogSegment stored under the highest key in `segments`, or NULL
// when there are no segments. This is the non-asserting counterpart of
// get_current_segment().
235 LogSegment *peek_current_segment() {
236 return segments.empty() ? NULL : segments.rbegin()->second;
// Returns the LogSegment stored under the highest key in `segments`.
// Precondition (asserted): at least one segment exists. Use
// peek_current_segment() when the map may be empty.
239 LogSegment *get_current_segment() {
240 assert(!segments.empty());
241 return segments.rbegin()->second;
// Looks up the segment stored under `seq` and returns it when present.
// NOTE(review): the not-found path is not visible in this excerpt.
// NOTE(review): count() followed by operator[] searches the map twice; a
// single find() would do one lookup and could never insert a default entry.
244 LogSegment *get_segment(log_segment_seq_t seq) {
245 if (segments.count(seq))
246 return segments[seq];
// True when `segments` holds at least one entry (the logical negation of empty()).
250 bool have_any_segments() const {
251 return !segments.empty();
// Returns num_events.
// NOTE(review): num_events is declared as int, so the return converts signed
// to size_t implicitly; a negative counter would surface as a huge value.
256 size_t get_num_events() const { return num_events; }
// Number of entries currently in `segments`.
257 size_t get_num_segments() const { return segments.size(); }
// Position getters, defined outside this excerpt; presumably they report
// journal offsets -- TODO confirm against the implementation file.
259 uint64_t get_read_pos() const;
260 uint64_t get_write_pos() const;
261 uint64_t get_safe_pos() const;
// Returns the `journaler` member pointer as-is (no null check, non-const access).
262 Journaler *get_journaler() { return journaler; }
// True when `segments` has no entries.
263 bool empty() const { return segments.empty(); }
// Returns the `capped` flag.
265 bool is_capped() const { return capped; }
// Defined outside this excerpt; presumably wakes the submit thread --
// TODO confirm against the implementation file.
268 void kick_submitter();
// Worker, defined outside this excerpt.
275 void _start_entry(LogEvent *e);
// Public wrapper that creates a scoped Mutex::Locker on submit_mutex.
// NOTE(review): the rest of the body is not visible in this excerpt;
// presumably it forwards to _start_entry(e) -- TODO confirm.
276 void start_entry(LogEvent *e) {
277 Mutex::Locker l(submit_mutex);
// Both defined outside this excerpt.
280 void cancel_entry(LogEvent *e);
// Worker taking the event plus an optional completion context.
281 void _submit_entry(LogEvent *e, MDSLogContextBase *c);
// Public submit wrapper; `c` defaults to a null context.
// Creates a scoped Mutex::Locker on submit_mutex and signals submit_cond
// before returning.
// NOTE(review): the statement between the lock and the signal is not visible
// in this excerpt; presumably it is the call to _submit_entry(e, c) -- TODO
// confirm.
282 void submit_entry(LogEvent *e, MDSLogContextBase *c = 0) {
283 Mutex::Locker l(submit_mutex);
285 submit_cond.Signal();
// Combined start-and-submit wrapper; `c` defaults to a null context.
// Creates a scoped Mutex::Locker on submit_mutex and signals submit_cond
// before returning.
// NOTE(review): the statements between the lock and the signal are not visible
// in this excerpt; presumably they call _start_entry(e) and then
// _submit_entry(e, c) -- TODO confirm.
287 void start_submit_entry(LogEvent *e, MDSLogContextBase *c = 0) {
288 Mutex::Locker l(submit_mutex);
291 submit_cond.Signal();
// True while cur_event is non-null.
293 bool entry_is_open() const { return cur_event != NULL; }
// Takes a context; defined outside this excerpt. Presumably the context is
// completed once the log is safe -- TODO confirm against the implementation.
295 void wait_for_safe( MDSInternalContextBase *c );
// True when the `unflushed` counter is zero.
297 bool is_flushed() const {
298 return unflushed == 0;
// Segment expiry and trimming interface; all defined outside this excerpt.
// The op_prio arguments are presumably I/O priorities -- TODO confirm.
302 void try_expire(LogSegment *ls, int op_prio);
303 void _maybe_expired(LogSegment *ls, int op_prio);
304 void _expired(LogSegment *ls);
305 void _trim_expired_segments();
// Helper classes granted access to the enclosing class's non-public members.
307 friend class C_MaybeExpiredSegment;
308 friend class C_MDL_Flushed;
311 void trim_expired_segments();
// max defaults to -1; its meaning is defined by the implementation.
312 void trim(int max=-1);
// True when both expiring_segments and expired_segments are empty, i.e. no
// segment is tracked in either set.
314 bool expiry_done() const
316 return expiring_segments.empty() && expired_segments.empty();
// Journal lifecycle entry points; all defined outside this excerpt. Each takes
// a context argument, presumably completed when the operation finishes --
// TODO confirm against the implementation file.
320 void write_head(MDSInternalContextBase *onfinish);
323 void create(MDSInternalContextBase *onfinish); // fresh, empty log!
324 void open(MDSInternalContextBase *onopen); // append() or replay() to follow!
325 void reopen(MDSInternalContextBase *onopen);
327 void replay(MDSInternalContextBase *onfinish);
329 void standby_trim_segments();
// Writes replay status through the supplied Formatter; const, so it does not
// modify the object's non-mutable members.
331 void dump_replay_status(Formatter *f) const;