Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / MDLog.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "MDSRank.h"
16 #include "MDLog.h"
17 #include "MDCache.h"
18 #include "LogEvent.h"
19 #include "MDSContext.h"
20
21 #include "osdc/Journaler.h"
22 #include "mds/JournalPointer.h"
23
24 #include "common/entity_name.h"
25 #include "common/perf_counters.h"
26 #include "common/Cond.h"
27
28 #include "events/ESubtreeMap.h"
29
30 #include "common/config.h"
31 #include "common/errno.h"
32 #include "include/assert.h"
33
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_mds
36 #undef dout_prefix
37 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".log "
38
39 // cons/des
40 MDLog::~MDLog()
41 {
42   if (journaler) { delete journaler; journaler = 0; }
43   if (logger) {
44     g_ceph_context->get_perfcounters_collection()->remove(logger);
45     delete logger;
46     logger = 0;
47   }
48 }
49
50
51 void MDLog::create_logger()
52 {
53   PerfCountersBuilder plb(g_ceph_context, "mds_log", l_mdl_first, l_mdl_last);
54
55   plb.add_u64_counter(l_mdl_evadd, "evadd",
56       "Events submitted", "subm", PerfCountersBuilder::PRIO_INTERESTING);
57   plb.add_u64_counter(l_mdl_evex, "evex", "Total expired events");
58   plb.add_u64_counter(l_mdl_evtrm, "evtrm", "Trimmed events");
59   plb.add_u64(l_mdl_ev, "ev",
60       "Events", "evts", PerfCountersBuilder::PRIO_INTERESTING);
61   plb.add_u64(l_mdl_evexg, "evexg", "Expiring events");
62   plb.add_u64(l_mdl_evexd, "evexd", "Current expired events");
63
64   plb.add_u64_counter(l_mdl_segadd, "segadd", "Segments added");
65   plb.add_u64_counter(l_mdl_segex, "segex", "Total expired segments");
66   plb.add_u64_counter(l_mdl_segtrm, "segtrm", "Trimmed segments");
67   plb.add_u64(l_mdl_seg, "seg",
68       "Segments", "segs", PerfCountersBuilder::PRIO_INTERESTING);
69   plb.add_u64(l_mdl_segexg, "segexg", "Expiring segments");
70   plb.add_u64(l_mdl_segexd, "segexd", "Current expired segments");
71
72   plb.add_u64(l_mdl_expos, "expos", "Journaler xpire position");
73   plb.add_u64(l_mdl_wrpos, "wrpos", "Journaler  write position");
74   plb.add_u64(l_mdl_rdpos, "rdpos", "Journaler  read position");
75   plb.add_time_avg(l_mdl_jlat, "jlat", "Journaler flush latency");
76
77   plb.add_u64_counter(l_mdl_replayed, "replayed", "Events replayed");
78
79   // logger
80   logger = plb.create_perf_counters();
81   g_ceph_context->get_perfcounters_collection()->add(logger);
82 }
83
84 void MDLog::set_write_iohint(unsigned iohint_flags)
85 {
86   journaler->set_write_iohint(iohint_flags);
87 }
88
89 class C_MDL_WriteError : public MDSIOContextBase {
90   protected:
91   MDLog *mdlog;
92   MDSRank *get_mds() override {return mdlog->mds;}
93
94   void finish(int r) override {
95     MDSRank *mds = get_mds();
96     // assume journal is reliable, so don't choose action based on
97     // g_conf->mds_action_on_write_error.
98     if (r == -EBLACKLISTED) {
99       derr << "we have been blacklisted (fenced), respawning..." << dendl;
100       mds->respawn();
101     } else {
102       derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl;
103       // Although it's possible that this could be something transient,
104       // it's severe and scary, so disable this rank until an administrator
105       // intervenes.
106       mds->clog->error() << "Unhandled journal write error on MDS rank " <<
107         mds->get_nodeid() << ": " << cpp_strerror(r) << ", shutting down.";
108       mds->damaged();
109       ceph_abort();  // damaged should never return
110     }
111   }
112
113   public:
114   explicit C_MDL_WriteError(MDLog *m) : mdlog(m) {}
115 };
116
117
118 void MDLog::write_head(MDSInternalContextBase *c) 
119 {
120   Context *fin = NULL;
121   if (c != NULL) {
122     fin = new C_IO_Wrapper(mds, c);
123   }
124   journaler->write_head(fin);
125 }
126
127 uint64_t MDLog::get_read_pos() const
128 {
129   return journaler->get_read_pos(); 
130 }
131
132 uint64_t MDLog::get_write_pos() const
133 {
134   return journaler->get_write_pos(); 
135 }
136
137 uint64_t MDLog::get_safe_pos() const
138 {
139   return journaler->get_write_safe_pos(); 
140 }
141
142
143
144 void MDLog::create(MDSInternalContextBase *c)
145 {
146   dout(5) << "create empty log" << dendl;
147
148   C_GatherBuilder gather(g_ceph_context);
149   // This requires an OnFinisher wrapper because Journaler will call back the completion for write_head inside its own lock
150   // XXX but should maybe that be handled inside Journaler?
151   gather.set_finisher(new C_IO_Wrapper(mds, c));
152
153   // The inode of the default Journaler we will create
154   ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
155
156   // Instantiate Journaler and start async write to RADOS
157   assert(journaler == NULL);
158   journaler = new Journaler("mdlog", ino, mds->mdsmap->get_metadata_pool(),
159                             CEPH_FS_ONDISK_MAGIC, mds->objecter, logger,
160                             l_mdl_jlat, mds->finisher);
161   assert(journaler->is_readonly());
162   journaler->set_write_error_handler(new C_MDL_WriteError(this));
163   journaler->set_writeable();
164   journaler->create(&mds->mdcache->default_log_layout, g_conf->mds_journal_format);
165   journaler->write_head(gather.new_sub());
166
167   // Async write JournalPointer to RADOS
168   JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
169   jp.front = ino;
170   jp.back = 0;
171   jp.save(mds->objecter, gather.new_sub());
172
173   gather.activate();
174
175   logger->set(l_mdl_expos, journaler->get_expire_pos());
176   logger->set(l_mdl_wrpos, journaler->get_write_pos());
177
178   submit_thread.create("md_submit");
179 }
180
181 void MDLog::open(MDSInternalContextBase *c)
182 {
183   dout(5) << "open discovering log bounds" << dendl;
184
185   assert(!recovery_thread.is_started());
186   recovery_thread.set_completion(c);
187   recovery_thread.create("md_recov_open");
188
189   submit_thread.create("md_submit");
190   // either append() or replay() will follow.
191 }
192
193 /**
194  * Final part of reopen() procedure, after recovery_thread
195  * has done its thing we call append()
196  */
197 class C_ReopenComplete : public MDSInternalContext {
198   MDLog *mdlog;
199   MDSInternalContextBase *on_complete;
200 public:
201   C_ReopenComplete(MDLog *mdlog_, MDSInternalContextBase *on_complete_) : MDSInternalContext(mdlog_->mds), mdlog(mdlog_), on_complete(on_complete_) {}
202   void finish(int r) override {
203     mdlog->append();
204     on_complete->complete(r);
205   }
206 };
207
208 /**
209  * Given that open() has been called in the past, go through the journal
210  * recovery procedure again, potentially reformatting the journal if it
211  * was in an old format.
212  */
213 void MDLog::reopen(MDSInternalContextBase *c)
214 {
215   dout(5) << "reopen" << dendl;
216
217   // Because we will call append() at the completion of this, check that we have already
218   // read the whole journal.
219   assert(journaler != NULL);
220   assert(journaler->get_read_pos() == journaler->get_write_pos());
221
222   delete journaler;
223   journaler = NULL;
224
225   // recovery_thread was started at some point in the past.  Although
226   // it has called it's completion if we made it back here, it might
227   // still not have been cleaned up: join it.
228   recovery_thread.join();
229
230   recovery_thread.set_completion(new C_ReopenComplete(this, c));
231   recovery_thread.create("md_recov_reopen");
232 }
233
234 void MDLog::append()
235 {
236   dout(5) << "append positioning at end and marking writeable" << dendl;
237   journaler->set_read_pos(journaler->get_write_pos());
238   journaler->set_expire_pos(journaler->get_write_pos());
239   
240   journaler->set_writeable();
241
242   logger->set(l_mdl_expos, journaler->get_write_pos());
243 }
244
245
246
247 // -------------------------------------------------
248
249 void MDLog::_start_entry(LogEvent *e)
250 {
251   assert(submit_mutex.is_locked_by_me());
252
253   assert(cur_event == NULL);
254   cur_event = e;
255
256   event_seq++;
257
258   EMetaBlob *metablob = e->get_metablob();
259   if (metablob) {
260     metablob->event_seq = event_seq;
261     metablob->last_subtree_map = get_last_segment_seq();
262   }
263 }
264
265 void MDLog::cancel_entry(LogEvent *le)
266 {
267   assert(le == cur_event);
268   cur_event = NULL;
269   delete le;
270 }
271
272 void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
273 {
274   assert(submit_mutex.is_locked_by_me());
275   assert(!mds->is_any_replay());
276   assert(!capped);
277
278   assert(le == cur_event);
279   cur_event = NULL;
280
281   // let the event register itself in the segment
282   assert(!segments.empty());
283   LogSegment *ls = segments.rbegin()->second;
284   ls->num_events++;
285
286   le->_segment = ls;
287   le->update_segment();
288   le->set_stamp(ceph_clock_now());
289
290   mdsmap_up_features = mds->mdsmap->get_up_features();
291   pending_events[ls->seq].push_back(PendingEvent(le, c));
292   num_events++;
293
294   if (logger) {
295     logger->inc(l_mdl_evadd);
296     logger->set(l_mdl_ev, num_events);
297   }
298
299   unflushed++;
300   
301   uint64_t period = journaler->get_layout_period();
302   // start a new segment?
303   if (le->get_type() == EVENT_SUBTREEMAP ||
304       (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
305     // avoid infinite loop when ESubtreeMap is very large.
306     // do not insert ESubtreeMap among EImportFinish events that finish
307     // disambiguate imports. Because the ESubtreeMap reflects the subtree
308     // state when all EImportFinish events are replayed.
309   } else if (ls->end/period != ls->offset/period ||
310              ls->num_events >= g_conf->mds_log_events_per_segment) {
311     dout(10) << "submit_entry also starting new segment: last = "
312              << ls->seq  << "/" << ls->offset << ", event seq = " << event_seq << dendl;
313     _start_new_segment();
314   } else if (g_conf->mds_debug_subtrees &&
315              le->get_type() != EVENT_SUBTREEMAP_TEST) {
316     // debug: journal this every time to catch subtree replay bugs.
317     // use a different event id so it doesn't get interpreted as a
318     // LogSegment boundary on replay.
319     LogEvent *sle = mds->mdcache->create_subtree_map();
320     sle->set_type(EVENT_SUBTREEMAP_TEST);
321     _submit_entry(sle, NULL);
322   }
323 }
324
325 /**
326  * Invoked on the flush after each entry submitted
327  */
328 class C_MDL_Flushed : public MDSLogContextBase {
329 protected:
330   MDLog *mdlog;
331   MDSRank *get_mds() override {return mdlog->mds;}
332   MDSInternalContextBase *wrapped;
333
334   void finish(int r) override {
335     if (wrapped)
336       wrapped->complete(r);
337   }
338
339 public:
340   C_MDL_Flushed(MDLog *m, MDSInternalContextBase *w)
341     : mdlog(m), wrapped(w) {}
342   C_MDL_Flushed(MDLog *m, uint64_t wp) : mdlog(m), wrapped(NULL) {
343     set_write_pos(wp);
344   }
345 };
346
347 void MDLog::_submit_thread()
348 {
349   dout(10) << "_submit_thread start" << dendl;
350
351   submit_mutex.Lock();
352
353   while (!mds->is_daemon_stopping()) {
354     if (g_conf->mds_log_pause) {
355       submit_cond.Wait(submit_mutex);
356       continue;
357     }
358
359     map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
360     if (it == pending_events.end()) {
361       submit_cond.Wait(submit_mutex);
362       continue;
363     }
364
365     if (it->second.empty()) {
366       pending_events.erase(it);
367       continue;
368     }
369
370     int64_t features = mdsmap_up_features;
371     PendingEvent data = it->second.front();
372     it->second.pop_front();
373
374     submit_mutex.Unlock();
375
376     if (data.le) {
377       LogEvent *le = data.le;
378       LogSegment *ls = le->_segment;
379       // encode it, with event type
380       bufferlist bl;
381       le->encode_with_header(bl, features);
382
383       uint64_t write_pos = journaler->get_write_pos();
384
385       le->set_start_off(write_pos);
386       if (le->get_type() == EVENT_SUBTREEMAP)
387         ls->offset = write_pos;
388
389       dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
390               << " : " << *le << dendl;
391
392       // journal it.
393       const uint64_t new_write_pos = journaler->append_entry(bl);  // bl is destroyed.
394       ls->end = new_write_pos;
395
396       MDSLogContextBase *fin;
397       if (data.fin) {
398         fin = dynamic_cast<MDSLogContextBase*>(data.fin);
399         assert(fin);
400         fin->set_write_pos(new_write_pos);
401       } else {
402         fin = new C_MDL_Flushed(this, new_write_pos);
403       }
404
405       journaler->wait_for_flush(fin);
406
407       if (data.flush)
408         journaler->flush();
409
410       if (logger)
411         logger->set(l_mdl_wrpos, ls->end);
412
413       delete le;
414     } else {
415       if (data.fin) {
416         MDSInternalContextBase* fin =
417                 dynamic_cast<MDSInternalContextBase*>(data.fin);
418         assert(fin);
419         C_MDL_Flushed *fin2 = new C_MDL_Flushed(this, fin);
420         fin2->set_write_pos(journaler->get_write_pos());
421         journaler->wait_for_flush(fin2);
422       }
423       if (data.flush)
424         journaler->flush();
425     }
426
427     submit_mutex.Lock();
428     if (data.flush)
429       unflushed = 0;
430     else if (data.le)
431       unflushed++;
432   }
433
434   submit_mutex.Unlock();
435 }
436
437 void MDLog::wait_for_safe(MDSInternalContextBase *c)
438 {
439   submit_mutex.Lock();
440
441   bool no_pending = true;
442   if (!pending_events.empty()) {
443     pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
444     no_pending = false;
445     submit_cond.Signal();
446   }
447
448   submit_mutex.Unlock();
449
450   if (no_pending && c)
451     journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
452 }
453
454 void MDLog::flush()
455 {
456   submit_mutex.Lock();
457
458   bool do_flush = unflushed > 0;
459   unflushed = 0;
460   if (!pending_events.empty()) {
461     pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
462     do_flush = false;
463     submit_cond.Signal();
464   }
465
466   submit_mutex.Unlock();
467
468   if (do_flush)
469     journaler->flush();
470 }
471
472 void MDLog::kick_submitter()
473 {
474   Mutex::Locker l(submit_mutex);
475   submit_cond.Signal();
476 }
477
478 void MDLog::cap()
479
480   dout(5) << "cap" << dendl;
481   capped = true;
482 }
483
484 void MDLog::shutdown()
485 {
486   assert(mds->mds_lock.is_locked_by_me());
487
488   dout(5) << "shutdown" << dendl;
489   if (submit_thread.is_started()) {
490     assert(mds->is_daemon_stopping());
491
492     if (submit_thread.am_self()) {
493       // Called suicide from the thread: trust it to do no work after
494       // returning from suicide, and subsequently respect mds->is_daemon_stopping()
495       // and fall out of its loop.
496     } else {
497       mds->mds_lock.Unlock();
498       // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
499       // picking it up will do anything with it.
500    
501       submit_mutex.Lock();
502       submit_cond.Signal();
503       submit_mutex.Unlock();
504
505       mds->mds_lock.Lock();
506
507       submit_thread.join();
508     }
509   }
510
511   // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
512   // so we need to shutdown the journaler first.
513   if (journaler) {
514     journaler->shutdown();
515   }
516
517   if (replay_thread.is_started() && !replay_thread.am_self()) {
518     mds->mds_lock.Unlock();
519     replay_thread.join();
520     mds->mds_lock.Lock();
521   }
522
523   if (recovery_thread.is_started() && !recovery_thread.am_self()) {
524     mds->mds_lock.Unlock();
525     recovery_thread.join();
526     mds->mds_lock.Lock();
527   }
528 }
529
530
531 // -----------------------------
532 // segments
533
534 void MDLog::_start_new_segment()
535 {
536   _prepare_new_segment();
537   _journal_segment_subtree_map(NULL);
538 }
539
540 void MDLog::_prepare_new_segment()
541 {
542   assert(submit_mutex.is_locked_by_me());
543
544   uint64_t seq = event_seq + 1;
545   dout(7) << __func__ << " seq " << seq << dendl;
546
547   segments[seq] = new LogSegment(seq);
548
549   logger->inc(l_mdl_segadd);
550   logger->set(l_mdl_seg, segments.size());
551
552   // Adjust to next stray dir
553   dout(10) << "Advancing to next stray directory on mds " << mds->get_nodeid() 
554            << dendl;
555   mds->mdcache->advance_stray();
556 }
557
558 void MDLog::_journal_segment_subtree_map(MDSInternalContextBase *onsync)
559 {
560   assert(submit_mutex.is_locked_by_me());
561
562   dout(7) << __func__ << dendl;
563   ESubtreeMap *sle = mds->mdcache->create_subtree_map();
564   sle->event_seq = get_last_segment_seq();
565
566   _submit_entry(sle, new C_MDL_Flushed(this, onsync));
567 }
568
569 void MDLog::trim(int m)
570 {
571   unsigned max_segments = g_conf->mds_log_max_segments;
572   int max_events = g_conf->mds_log_max_events;
573   if (m >= 0)
574     max_events = m;
575
576   if (mds->mdcache->is_readonly()) {
577     dout(10) << "trim, ignoring read-only FS" <<  dendl;
578     return;
579   }
580
581   // Clamp max_events to not be smaller than events per segment
582   if (max_events > 0 && max_events <= g_conf->mds_log_events_per_segment) {
583     max_events = g_conf->mds_log_events_per_segment + 1;
584   }
585
586   submit_mutex.Lock();
587
588   // trim!
589   dout(10) << "trim " 
590            << segments.size() << " / " << max_segments << " segments, " 
591            << num_events << " / " << max_events << " events"
592            << ", " << expiring_segments.size() << " (" << expiring_events << ") expiring"
593            << ", " << expired_segments.size() << " (" << expired_events << ") expired"
594            << dendl;
595
596   if (segments.empty()) {
597     submit_mutex.Unlock();
598     return;
599   }
600
601   // hack: only trim for a few seconds at a time
602   utime_t stop = ceph_clock_now();
603   stop += 2.0;
604
605   map<uint64_t,LogSegment*>::iterator p = segments.begin();
606   while (p != segments.end() &&
607          ((max_events >= 0 &&
608            num_events - expiring_events - expired_events > max_events) ||
609           (segments.size() - expiring_segments.size() - expired_segments.size() > max_segments))) {
610     
611     if (stop < ceph_clock_now())
612       break;
613
614     int num_expiring_segments = (int)expiring_segments.size();
615     if (num_expiring_segments >= g_conf->mds_log_max_expiring)
616       break;
617
618     int op_prio = CEPH_MSG_PRIO_LOW +
619                   (CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) *
620                   num_expiring_segments / g_conf->mds_log_max_expiring;
621
622     // look at first segment
623     LogSegment *ls = p->second;
624     assert(ls);
625     ++p;
626     
627     if (pending_events.count(ls->seq) ||
628         ls->end > safe_pos) {
629       dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
630               << journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
631       break;
632     }
633     if (expiring_segments.count(ls)) {
634       dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
635               << ", " << ls->num_events << " events" << dendl;
636     } else if (expired_segments.count(ls)) {
637       dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
638               << ", " << ls->num_events << " events" << dendl;
639     } else {
640       assert(expiring_segments.count(ls) == 0);
641       expiring_segments.insert(ls);
642       expiring_events += ls->num_events;
643       submit_mutex.Unlock();
644
645       uint64_t last_seq = ls->seq;
646       try_expire(ls, op_prio);
647
648       submit_mutex.Lock();
649       p = segments.lower_bound(last_seq + 1);
650     }
651   }
652
653   // discard expired segments and unlock submit_mutex
654   _trim_expired_segments();
655 }
656
657 class C_MaybeExpiredSegment : public MDSInternalContext {
658   MDLog *mdlog;
659   LogSegment *ls;
660   int op_prio;
661   public:
662   C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) :
663     MDSInternalContext(mdl->mds), mdlog(mdl), ls(s), op_prio(p) {}
664   void finish(int res) override {
665     if (res < 0)
666       mdlog->mds->handle_write_error(res);
667     mdlog->_maybe_expired(ls, op_prio);
668   }
669 };
670
671 /**
672  * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
673  * segment.
674  */
675 int MDLog::trim_all()
676 {
677   submit_mutex.Lock();
678
679   dout(10) << __func__ << ": "
680            << segments.size()
681            << "/" << expiring_segments.size()
682            << "/" << expired_segments.size() << dendl;
683
684   uint64_t last_seq = 0;
685   if (!segments.empty())
686     last_seq = get_last_segment_seq();
687
688   map<uint64_t,LogSegment*>::iterator p = segments.begin();
689   while (p != segments.end() &&
690          p->first < last_seq && p->second->end <= safe_pos) {
691     LogSegment *ls = p->second;
692     ++p;
693
694     // Caller should have flushed journaler before calling this
695     if (pending_events.count(ls->seq)) {
696       dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
697       submit_mutex.Unlock();
698       return -EAGAIN;
699     }
700
701     if (expiring_segments.count(ls)) {
702       dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
703               << ", " << ls->num_events << " events" << dendl;
704     } else if (expired_segments.count(ls)) {
705       dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
706               << ", " << ls->num_events << " events" << dendl;
707     } else {
708       assert(expiring_segments.count(ls) == 0);
709       expiring_segments.insert(ls);
710       expiring_events += ls->num_events;
711       submit_mutex.Unlock();
712
713       uint64_t next_seq = ls->seq + 1;
714       try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
715
716       submit_mutex.Lock();
717       p = segments.lower_bound(next_seq);
718     }
719   }
720
721   _trim_expired_segments();
722
723   return 0;
724 }
725
726
727 void MDLog::try_expire(LogSegment *ls, int op_prio)
728 {
729   MDSGatherBuilder gather_bld(g_ceph_context);
730   ls->try_to_expire(mds, gather_bld, op_prio);
731
732   if (gather_bld.has_subs()) {
733     dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl;
734     gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
735     gather_bld.activate();
736   } else {
737     dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
738     submit_mutex.Lock();
739     assert(expiring_segments.count(ls));
740     expiring_segments.erase(ls);
741     expiring_events -= ls->num_events;
742     _expired(ls);
743     submit_mutex.Unlock();
744   }
745   
746   logger->set(l_mdl_segexg, expiring_segments.size());
747   logger->set(l_mdl_evexg, expiring_events);
748 }
749
750 void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
751 {
752   if (mds->mdcache->is_readonly()) {
753     dout(10) << "_maybe_expired, ignoring read-only FS" <<  dendl;
754     return;
755   }
756
757   dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset
758            << ", " << ls->num_events << " events" << dendl;
759   try_expire(ls, op_prio);
760 }
761
762 void MDLog::_trim_expired_segments()
763 {
764   assert(submit_mutex.is_locked_by_me());
765
766   // trim expired segments?
767   bool trimmed = false;
768   while (!segments.empty()) {
769     LogSegment *ls = segments.begin()->second;
770     if (!expired_segments.count(ls)) {
771       dout(10) << "_trim_expired_segments waiting for " << ls->seq << "/" << ls->offset
772                << " to expire" << dendl;
773       break;
774     }
775     
776     dout(10) << "_trim_expired_segments trimming expired "
777              << ls->seq << "/0x" << std::hex << ls->offset << std::dec << dendl;
778     expired_events -= ls->num_events;
779     expired_segments.erase(ls);
780     num_events -= ls->num_events;
781       
782     // this was the oldest segment, adjust expire pos
783     if (journaler->get_expire_pos() < ls->end) {
784       journaler->set_expire_pos(ls->end);
785       logger->set(l_mdl_expos, ls->end);
786     } else {
787       logger->set(l_mdl_expos, ls->offset);
788     }
789     
790     logger->inc(l_mdl_segtrm);
791     logger->inc(l_mdl_evtrm, ls->num_events);
792     
793     segments.erase(ls->seq);
794     delete ls;
795     trimmed = true;
796   }
797
798   submit_mutex.Unlock();
799
800   if (trimmed)
801     journaler->write_head(0);
802 }
803
804 void MDLog::trim_expired_segments()
805 {
806   submit_mutex.Lock();
807   _trim_expired_segments();
808 }
809
810 void MDLog::_expired(LogSegment *ls)
811 {
812   assert(submit_mutex.is_locked_by_me());
813
814   dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
815           << ", " << ls->num_events << " events" << dendl;
816
817   if (!capped && ls == peek_current_segment()) {
818     dout(5) << "_expired not expiring " << ls->seq << "/" << ls->offset
819             << ", last one and !capped" << dendl;
820   } else {
821     // expired.
822     expired_segments.insert(ls);
823     expired_events += ls->num_events;
824
825     // Trigger all waiters
826     for (std::list<MDSInternalContextBase*>::iterator i = ls->expiry_waiters.begin();
827         i != ls->expiry_waiters.end(); ++i) {
828       (*i)->complete(0);
829     }
830     ls->expiry_waiters.clear();
831     
832     logger->inc(l_mdl_evex, ls->num_events);
833     logger->inc(l_mdl_segex);
834   }
835
836   logger->set(l_mdl_ev, num_events);
837   logger->set(l_mdl_evexd, expired_events);
838   logger->set(l_mdl_seg, segments.size());
839   logger->set(l_mdl_segexd, expired_segments.size());
840 }
841
842
843
844 void MDLog::replay(MDSInternalContextBase *c)
845 {
846   assert(journaler->is_active());
847   assert(journaler->is_readonly());
848
849   // empty?
850   if (journaler->get_read_pos() == journaler->get_write_pos()) {
851     dout(10) << "replay - journal empty, done." << dendl;
852     mds->mdcache->trim();
853     if (c) {
854       c->complete(0);
855     }
856     return;
857   }
858
859   // add waiter
860   if (c)
861     waitfor_replay.push_back(c);
862
863   // go!
864   dout(10) << "replay start, from " << journaler->get_read_pos()
865            << " to " << journaler->get_write_pos() << dendl;
866
867   assert(num_events == 0 || already_replayed);
868   if (already_replayed) {
869     // Ensure previous instance of ReplayThread is joined before
870     // we create another one
871     replay_thread.join();
872   }
873   already_replayed = true;
874
875   replay_thread.create("md_log_replay");
876 }
877
878
879 /**
880  * Resolve the JournalPointer object to a journal file, and
881  * instantiate a Journaler object.  This may re-write the journal
882  * if the journal in RADOS appears to be in an old format.
883  *
884  * This is a separate thread because of the way it is initialized from inside
885  * the mds lock, which is also the global objecter lock -- rather than split
886  * it up into hard-to-read async operations linked up by contexts, 
887  *
888  * When this function completes, the `journaler` attribute will be set to
889  * a Journaler instance using the latest available serialization format.
890  */
891 void MDLog::_recovery_thread(MDSInternalContextBase *completion)
892 {
893   assert(journaler == NULL);
894   if (g_conf->mds_journal_format > JOURNAL_FORMAT_MAX) {
895       dout(0) << "Configuration value for mds_journal_format is out of bounds, max is "
896               << JOURNAL_FORMAT_MAX << dendl;
897
898       // Oh dear, something unreadable in the store for this rank: require
899       // operator intervention.
900       mds->damaged();
901       ceph_abort();  // damaged should not return
902   }
903
904   // First, read the pointer object.
905   // If the pointer object is not present, then create it with
906   // front = default ino and back = null
907   JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
908   const int read_result = jp.load(mds->objecter);
909   if (read_result == -ENOENT) {
910     inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
911     jp.front = default_log_ino;
912     int write_result = jp.save(mds->objecter);
913     // Nothing graceful we can do for this
914     assert(write_result >= 0);
915   } else if (read_result == -EBLACKLISTED) {
916     derr << "Blacklisted during JournalPointer read!  Respawning..." << dendl;
917     mds->respawn();
918     ceph_abort(); // Should be unreachable because respawn calls execv
919   } else if (read_result != 0) {
920     mds->clog->error() << "failed to read JournalPointer: " << read_result
921                        << " (" << cpp_strerror(read_result) << ")";
922     mds->damaged_unlocked();
923     ceph_abort();  // Should be unreachable because damaged() calls respawn()
924   }
925
926   // If the back pointer is non-null, that means that a journal
927   // rewrite failed part way through.  Erase the back journal
928   // to clean up.
929   if (jp.back) {
930     if (mds->is_standby_replay()) {
931       dout(1) << "Journal " << jp.front << " is being rewritten, "
932         << "cannot replay in standby until an active MDS completes rewrite" << dendl;
933       Mutex::Locker l(mds->mds_lock);
934       if (mds->is_daemon_stopping()) {
935         return;
936       }
937       completion->complete(-EAGAIN);
938       return;
939     }
940     dout(1) << "Erasing journal " << jp.back << dendl;
941     C_SaferCond erase_waiter;
942     Journaler back("mdlog", jp.back, mds->mdsmap->get_metadata_pool(),
943         CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat,
944         mds->finisher);
945
946     // Read all about this journal (header + extents)
947     C_SaferCond recover_wait;
948     back.recover(&recover_wait);
949     int recovery_result = recover_wait.wait();
950     if (recovery_result == -EBLACKLISTED) {
951       derr << "Blacklisted during journal recovery!  Respawning..." << dendl;
952       mds->respawn();
953       ceph_abort(); // Should be unreachable because respawn calls execv
954     } else if (recovery_result != 0) {
955       // Journaler.recover succeeds if no journal objects are present: an error
956       // means something worse like a corrupt header, which we can't handle here.
957       mds->clog->error() << "Error recovering journal " << jp.front << ": "
958         << cpp_strerror(recovery_result);
959       mds->damaged_unlocked();
960       assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
961     }
962
963     // We could read journal, so we can erase it.
964     back.erase(&erase_waiter);
965     int erase_result = erase_waiter.wait();
966
967     // If we are successful, or find no data, we can update the JournalPointer to
968     // reflect that the back journal is gone.
969     if (erase_result != 0 && erase_result != -ENOENT) {
970       derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl;
971     } else {
972       dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
973       jp.back = 0;
974       int write_result = jp.save(mds->objecter);
975       // Nothing graceful we can do for this
976       assert(write_result >= 0);
977     }
978   }
979
980   /* Read the header from the front journal */
981   Journaler *front_journal = new Journaler("mdlog", jp.front,
982       mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter,
983       logger, l_mdl_jlat, mds->finisher);
984
985   // Assign to ::journaler so that we can be aborted by ::shutdown while
986   // waiting for journaler recovery
987   {
988     Mutex::Locker l(mds->mds_lock);
989     journaler = front_journal;
990   }
991
992   C_SaferCond recover_wait;
993   front_journal->recover(&recover_wait);
994   dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
995   int recovery_result = recover_wait.wait();
996   dout(4) << "Journal " << jp.front << " recovered." << dendl;
997
998   if (recovery_result == -EBLACKLISTED) {
999     derr << "Blacklisted during journal recovery!  Respawning..." << dendl;
1000     mds->respawn();
1001     ceph_abort(); // Should be unreachable because respawn calls execv
1002   } else if (recovery_result != 0) {
1003     mds->clog->error() << "Error recovering journal " << jp.front << ": "
1004       << cpp_strerror(recovery_result);
1005     mds->damaged_unlocked();
1006     assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
1007   }
1008
1009   /* Check whether the front journal format is acceptable or needs re-write */
1010   if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) {
1011     dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format()
1012             << ", does this MDS daemon require upgrade?" << dendl;
1013     {
1014       Mutex::Locker l(mds->mds_lock);
1015       if (mds->is_daemon_stopping()) {
1016         journaler = NULL;
1017         delete front_journal;
1018         return;
1019       }
1020       completion->complete(-EINVAL);
1021     }
1022   } else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf->mds_journal_format) {
1023     /* The journal is of configured format, or we are in standbyreplay and will
1024      * tolerate replaying old journals until we have to go active. Use front_journal as
1025      * our journaler attribute and complete */
1026     dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl;
1027     journaler->set_write_error_handler(new C_MDL_WriteError(this));
1028     {
1029       Mutex::Locker l(mds->mds_lock);
1030       if (mds->is_daemon_stopping()) {
1031         return;
1032       }
1033       completion->complete(0);
1034     }
1035   } else {
1036     /* Hand off to reformat routine, which will ultimately set the
1037      * completion when it has done its thing */
1038     dout(1) << "Journal " << jp.front << " has old format "
1039       << front_journal->get_stream_format() << ", it will now be updated" << dendl;
1040     _reformat_journal(jp, front_journal, completion);
1041   }
1042 }
1043
1044 /**
1045  * Blocking rewrite of the journal to a new file, followed by
1046  * swap of journal pointer to point to the new one.
1047  *
1048  * We write the new journal to the 'back' journal from the JournalPointer,
1049  * swapping pointers to make that one the front journal only when we have
1050  * safely completed.
1051  */
1052 void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, MDSInternalContextBase *completion)
1053 {
1054   assert(!jp_in.is_null());
1055   assert(completion != NULL);
1056   assert(old_journal != NULL);
1057
1058   JournalPointer jp = jp_in;
1059
1060   /* Set JournalPointer.back to the location we will write the new journal */
1061   inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
1062   inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
1063   jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
1064   int write_result = jp.save(mds->objecter);
1065   assert(write_result == 0);
1066
1067   /* Create the new Journaler file */
1068   Journaler *new_journal = new Journaler("mdlog", jp.back,
1069       mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, mds->finisher);
1070   dout(4) << "Writing new journal header " << jp.back << dendl;
1071   file_layout_t new_layout = old_journal->get_layout();
1072   new_journal->set_writeable();
1073   new_journal->create(&new_layout, g_conf->mds_journal_format);
1074
1075   /* Write the new journal header to RADOS */
1076   C_SaferCond write_head_wait;
1077   new_journal->write_head(&write_head_wait);
1078   write_head_wait.wait();
1079
1080   // Read in the old journal, and whenever we have readable events,
1081   // write them to the new journal.
1082   int r = 0;
1083
1084   // In old format journals before event_seq was introduced, the serialized
1085   // offset of a SubtreeMap message in the log is used as the unique ID for
1086   // a log segment.  Because we change serialization, this will end up changing
1087   // for us, so we have to explicitly update the fields that point back to that
1088   // log segment.
1089   std::map<log_segment_seq_t, log_segment_seq_t> segment_pos_rewrite;
1090
1091   // The logic in here borrowed from replay_thread expects mds_lock to be held,
1092   // e.g. between checking readable and doing wait_for_readable so that journaler
1093   // state doesn't change in between.
1094   uint32_t events_transcribed = 0;
1095   while (1) {
1096     while (!old_journal->is_readable() &&
1097            old_journal->get_read_pos() < old_journal->get_write_pos() &&
1098            !old_journal->get_error()) {
1099
1100       // Issue a journal prefetch
1101       C_SaferCond readable_waiter;
1102       old_journal->wait_for_readable(&readable_waiter);
1103
1104       // Wait for a journal prefetch to complete
1105       readable_waiter.wait();
1106     }
1107     if (old_journal->get_error()) {
1108       r = old_journal->get_error();
1109       dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1110       break;
1111     }
1112
1113     if (!old_journal->is_readable() &&
1114         old_journal->get_read_pos() == old_journal->get_write_pos())
1115       break;
1116
1117     // Read one serialized LogEvent
1118     assert(old_journal->is_readable());
1119     bufferlist bl;
1120     uint64_t le_pos = old_journal->get_read_pos();
1121     bool r = old_journal->try_read_entry(bl);
1122     if (!r && old_journal->get_error())
1123       continue;
1124     assert(r);
1125
1126     // Update segment_pos_rewrite
1127     LogEvent *le = LogEvent::decode(bl);
1128     if (le) {
1129       bool modified = false;
1130
1131       if (le->get_type() == EVENT_SUBTREEMAP ||
1132           le->get_type() == EVENT_RESETJOURNAL) {
1133         ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1134         if (sle == NULL || sle->event_seq == 0) {
1135           // A non-explicit event seq: the effective sequence number 
1136           // of this segment is it's position in the old journal and
1137           // the new effective sequence number will be its position
1138           // in the new journal.
1139           segment_pos_rewrite[le_pos] = new_journal->get_write_pos();
1140           dout(20) << __func__ << " discovered segment seq mapping "
1141             << le_pos << " -> " << new_journal->get_write_pos() << dendl;
1142         }
1143       } else {
1144         event_seq++;
1145       }
1146
1147       // Rewrite segment references if necessary
1148       EMetaBlob *blob = le->get_metablob();
1149       if (blob) {
1150         modified = blob->rewrite_truncate_finish(mds, segment_pos_rewrite);
1151       }
1152
1153       // Zero-out expire_pos in subtreemap because offsets have changed
1154       // (expire_pos is just an optimization so it's safe to eliminate it)
1155       if (le->get_type() == EVENT_SUBTREEMAP
1156           || le->get_type() == EVENT_SUBTREEMAP_TEST) {
1157         ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1158         assert(sle != NULL);
1159         dout(20) << __func__ << " zeroing expire_pos in subtreemap event at "
1160           << le_pos << " seq=" << sle->event_seq << dendl;
1161         sle->expire_pos = 0;
1162         modified = true;
1163       }
1164
1165       if (modified) {
1166         bl.clear();
1167         le->encode_with_header(bl, mds->mdsmap->get_up_features());
1168       }
1169
1170       delete le;
1171     } else {
1172       // Failure from LogEvent::decode, our job is to change the journal wrapper,
1173       // not validate the contents, so pass it through.
1174       dout(1) << __func__ << " transcribing un-decodable LogEvent at old position "
1175         << old_journal->get_read_pos() << ", new position " << new_journal->get_write_pos()
1176         << dendl;
1177     }
1178
1179     // Write (buffered, synchronous) one serialized LogEvent
1180     events_transcribed += 1;
1181     new_journal->append_entry(bl);
1182   }
1183
1184   dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
1185   C_SaferCond flush_waiter;
1186   new_journal->flush(&flush_waiter);
1187   flush_waiter.wait();
1188
1189   // If failed to rewrite journal, leave the part written journal
1190   // as garbage to be cleaned up next startup.
1191   assert(r == 0);
1192
1193   /* Now that the new journal is safe, we can flip the pointers */
1194   inodeno_t const tmp = jp.front;
1195   jp.front = jp.back;
1196   jp.back = tmp;
1197   write_result = jp.save(mds->objecter);
1198   assert(write_result == 0);
1199
1200   /* Delete the old journal to free space */
1201   dout(1) << "New journal flushed, erasing old journal" << dendl;
1202   C_SaferCond erase_waiter;
1203   old_journal->erase(&erase_waiter);
1204   int erase_result = erase_waiter.wait();
1205   assert(erase_result == 0);
1206   {
1207     Mutex::Locker l(mds->mds_lock);
1208     if (mds->is_daemon_stopping()) {
1209       delete new_journal;
1210       return;
1211     }
1212     assert(journaler == old_journal);
1213     journaler = NULL;
1214     delete old_journal;
1215   }
1216
1217   /* Update the pointer to reflect we're back in clean single journal state. */
1218   jp.back = 0;
1219   write_result = jp.save(mds->objecter);
1220   assert(write_result == 0);
1221
1222   /* Reset the Journaler object to its default state */
1223   dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
1224   {
1225     Mutex::Locker l(mds->mds_lock);
1226     if (mds->is_daemon_stopping()) {
1227       delete new_journal;
1228       return;
1229     }
1230     journaler = new_journal;
1231     journaler->set_readonly();
1232     journaler->set_write_error_handler(new C_MDL_WriteError(this));
1233   }
1234
1235   /* Trigger completion */
1236   {
1237     Mutex::Locker l(mds->mds_lock);
1238     if (mds->is_daemon_stopping()) {
1239       return;
1240     }
1241     completion->complete(0);
1242   }
1243 }
1244
1245
1246 // i am a separate thread
1247 void MDLog::_replay_thread()
1248 {
1249   dout(10) << "_replay_thread start" << dendl;
1250
1251   // loop
1252   int r = 0;
1253   while (1) {
1254     // wait for read?
1255     while (!journaler->is_readable() &&
1256            journaler->get_read_pos() < journaler->get_write_pos() &&
1257            !journaler->get_error()) {
1258       C_SaferCond readable_waiter;
1259       journaler->wait_for_readable(&readable_waiter);
1260       r = readable_waiter.wait();
1261     }
1262     if (journaler->get_error()) {
1263       r = journaler->get_error();
1264       dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1265       if (r == -ENOENT) {
1266         if (mds->is_standby_replay()) {
1267           // journal has been trimmed by somebody else
1268           r = -EAGAIN;
1269         } else {
1270           mds->clog->error() << "missing journal object";
1271           mds->damaged_unlocked();
1272           ceph_abort();  // Should be unreachable because damaged() calls respawn()
1273         }
1274       } else if (r == -EINVAL) {
1275         if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1276           // this should only happen if you're following somebody else
1277           if(journaler->is_readonly()) {
1278             dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
1279             r = -EAGAIN;
1280           } else {
1281             mds->clog->error() << "invalid journaler offsets";
1282             mds->damaged_unlocked();
1283             ceph_abort();  // Should be unreachable because damaged() calls respawn()
1284           }
1285         } else {
1286           /* re-read head and check it
1287            * Given that replay happens in a separate thread and
1288            * the MDS is going to either shut down or restart when
1289            * we return this error, doing it synchronously is fine
1290            * -- as long as we drop the main mds lock--. */
1291           C_SaferCond reread_fin;
1292           journaler->reread_head(&reread_fin);
1293           int err = reread_fin.wait();
1294           if (err) {
1295             if (err == -ENOENT && mds->is_standby_replay()) {
1296               r = -EAGAIN;
1297               dout(1) << "Journal header went away while in standby replay, journal rewritten?"
1298                       << dendl;
1299               break;
1300             } else {
1301                 dout(0) << "got error while reading head: " << cpp_strerror(err)
1302                         << dendl;
1303
1304                 mds->clog->error() << "error reading journal header";
1305                 mds->damaged_unlocked();
1306                 ceph_abort();  // Should be unreachable because damaged() calls
1307                             // respawn()
1308             }
1309           }
1310           standby_trim_segments();
1311           if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1312             dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
1313             r = -EAGAIN;
1314           }
1315         }
1316       }
1317       break;
1318     }
1319
1320     if (!journaler->is_readable() &&
1321         journaler->get_read_pos() == journaler->get_write_pos())
1322       break;
1323     
1324     assert(journaler->is_readable() || mds->is_daemon_stopping());
1325     
1326     // read it
1327     uint64_t pos = journaler->get_read_pos();
1328     bufferlist bl;
1329     bool r = journaler->try_read_entry(bl);
1330     if (!r && journaler->get_error())
1331       continue;
1332     assert(r);
1333     
1334     // unpack event
1335     LogEvent *le = LogEvent::decode(bl);
1336     if (!le) {
1337       dout(0) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos() 
1338               << " -- unable to decode event" << dendl;
1339       dout(0) << "dump of unknown or corrupt event:\n";
1340       bl.hexdump(*_dout);
1341       *_dout << dendl;
1342
1343       mds->clog->error() << "corrupt journal event at " << pos << "~"
1344                          << bl.length() << " / "
1345                          << journaler->get_write_pos();
1346       if (g_conf->mds_log_skip_corrupt_events) {
1347         continue;
1348       } else {
1349         mds->damaged_unlocked();
1350         ceph_abort();  // Should be unreachable because damaged() calls
1351                     // respawn()
1352       }
1353
1354     }
1355     le->set_start_off(pos);
1356
1357     // new segment?
1358     if (le->get_type() == EVENT_SUBTREEMAP ||
1359         le->get_type() == EVENT_RESETJOURNAL) {
1360       ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1361       if (sle && sle->event_seq > 0)
1362         event_seq = sle->event_seq;
1363       else
1364         event_seq = pos;
1365       segments[event_seq] = new LogSegment(event_seq, pos);
1366       logger->set(l_mdl_seg, segments.size());
1367     } else {
1368       event_seq++;
1369     }
1370
1371     // have we seen an import map yet?
1372     if (segments.empty()) {
1373       dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos() 
1374                << " " << le->get_stamp() << " -- waiting for subtree_map.  (skipping " << *le << ")" << dendl;
1375     } else {
1376       dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos() 
1377                << " " << le->get_stamp() << ": " << *le << dendl;
1378       le->_segment = get_current_segment();    // replay may need this
1379       le->_segment->num_events++;
1380       le->_segment->end = journaler->get_read_pos();
1381       num_events++;
1382
1383       {
1384         Mutex::Locker l(mds->mds_lock);
1385         if (mds->is_daemon_stopping()) {
1386           return;
1387         }
1388         logger->inc(l_mdl_replayed);
1389         le->replay(mds);
1390       }
1391     }
1392     delete le;
1393
1394     logger->set(l_mdl_rdpos, pos);
1395   }
1396
1397   // done!
1398   if (r == 0) {
1399     assert(journaler->get_read_pos() == journaler->get_write_pos());
1400     dout(10) << "_replay - complete, " << num_events
1401              << " events" << dendl;
1402
1403     logger->set(l_mdl_expos, journaler->get_expire_pos());
1404   }
1405
1406   safe_pos = journaler->get_write_safe_pos();
1407
1408   dout(10) << "_replay_thread kicking waiters" << dendl;
1409   {
1410     Mutex::Locker l(mds->mds_lock);
1411     if (mds->is_daemon_stopping()) {
1412       return;
1413     }
1414     finish_contexts(g_ceph_context, waitfor_replay, r);  
1415   }
1416
1417   dout(10) << "_replay_thread finish" << dendl;
1418 }
1419
1420 void MDLog::standby_trim_segments()
1421 {
1422   dout(10) << "standby_trim_segments" << dendl;
1423   uint64_t expire_pos = journaler->get_expire_pos();
1424   dout(10) << " expire_pos=" << expire_pos << dendl;
1425   bool removed_segment = false;
1426   while (have_any_segments()) {
1427     LogSegment *seg = get_oldest_segment();
1428     dout(10) << " segment seq=" << seg->seq << " " << seg->offset <<
1429       "~" << seg->end - seg->offset << dendl;
1430
1431     if (seg->end > expire_pos) {
1432       dout(10) << " won't remove, not expired!" << dendl;
1433       break;
1434     }
1435
1436     if (segments.size() == 1) {
1437       dout(10) << " won't remove, last segment!" << dendl;
1438       break;
1439     }
1440
1441     dout(10) << " removing segment" << dendl;
1442     mds->mdcache->standby_trim_segment(seg);
1443     remove_oldest_segment();
1444     removed_segment = true;
1445   }
1446
1447   if (removed_segment) {
1448     dout(20) << " calling mdcache->trim!" << dendl;
1449     mds->mdcache->trim();
1450   } else {
1451     dout(20) << " removed no segments!" << dendl;
1452   }
1453 }
1454
1455 void MDLog::dump_replay_status(Formatter *f) const
1456 {
1457   f->open_object_section("replay_status");
1458   f->dump_unsigned("journal_read_pos", journaler ? journaler->get_read_pos() : 0);
1459   f->dump_unsigned("journal_write_pos", journaler ? journaler->get_write_pos() : 0);
1460   f->dump_unsigned("journal_expire_pos", journaler ? journaler->get_expire_pos() : 0);
1461   f->dump_unsigned("num_events", get_num_events());
1462   f->dump_unsigned("num_segments", get_num_segments());
1463   f->close_section();
1464 }