Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / journal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "common/config.h"
16 #include "osdc/Journaler.h"
17 #include "events/ESubtreeMap.h"
18 #include "events/ESession.h"
19 #include "events/ESessions.h"
20
21 #include "events/EMetaBlob.h"
22 #include "events/EResetJournal.h"
23 #include "events/ENoOp.h"
24
25 #include "events/EUpdate.h"
26 #include "events/ESlaveUpdate.h"
27 #include "events/EOpen.h"
28 #include "events/ECommitted.h"
29
30 #include "events/EExport.h"
31 #include "events/EImportStart.h"
32 #include "events/EImportFinish.h"
33 #include "events/EFragment.h"
34
35 #include "events/ETableClient.h"
36 #include "events/ETableServer.h"
37
38 #include "include/stringify.h"
39
40 #include "LogSegment.h"
41
42 #include "MDSRank.h"
43 #include "MDLog.h"
44 #include "MDCache.h"
45 #include "Server.h"
46 #include "Migrator.h"
47 #include "Mutation.h"
48
49 #include "InoTable.h"
50 #include "MDSTableClient.h"
51 #include "MDSTableServer.h"
52
53 #include "Locker.h"
54
55 #define dout_context g_ceph_context
56 #define dout_subsys ceph_subsys_mds
57 #undef dout_prefix
58 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".journal "
59
60
61 // -----------------------
62 // LogSegment
63
64 void LogSegment::try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int op_prio)
65 {
66   set<CDir*> commit;
67
68   dout(6) << "LogSegment(" << seq << "/" << offset << ").try_to_expire" << dendl;
69
70   assert(g_conf->mds_kill_journal_expire_at != 1);
71
72   // commit dirs
73   for (elist<CDir*>::iterator p = new_dirfrags.begin(); !p.end(); ++p) {
74     dout(20) << " new_dirfrag " << **p << dendl;
75     assert((*p)->is_auth());
76     commit.insert(*p);
77   }
78   for (elist<CDir*>::iterator p = dirty_dirfrags.begin(); !p.end(); ++p) {
79     dout(20) << " dirty_dirfrag " << **p << dendl;
80     assert((*p)->is_auth());
81     commit.insert(*p);
82   }
83   for (elist<CDentry*>::iterator p = dirty_dentries.begin(); !p.end(); ++p) {
84     dout(20) << " dirty_dentry " << **p << dendl;
85     assert((*p)->is_auth());
86     commit.insert((*p)->get_dir());
87   }
88   for (elist<CInode*>::iterator p = dirty_inodes.begin(); !p.end(); ++p) {
89     dout(20) << " dirty_inode " << **p << dendl;
90     assert((*p)->is_auth());
91     if ((*p)->is_base()) {
92       (*p)->store(gather_bld.new_sub());
93     } else
94       commit.insert((*p)->get_parent_dn()->get_dir());
95   }
96
97   if (!commit.empty()) {
98     for (set<CDir*>::iterator p = commit.begin();
99          p != commit.end();
100          ++p) {
101       CDir *dir = *p;
102       assert(dir->is_auth());
103       if (dir->can_auth_pin()) {
104         dout(15) << "try_to_expire committing " << *dir << dendl;
105         dir->commit(0, gather_bld.new_sub(), false, op_prio);
106       } else {
107         dout(15) << "try_to_expire waiting for unfreeze on " << *dir << dendl;
108         dir->add_waiter(CDir::WAIT_UNFREEZE, gather_bld.new_sub());
109       }
110     }
111   }
112
113   // master ops with possibly uncommitted slaves
114   for (set<metareqid_t>::iterator p = uncommitted_masters.begin();
115        p != uncommitted_masters.end();
116        ++p) {
117     dout(10) << "try_to_expire waiting for slaves to ack commit on " << *p << dendl;
118     mds->mdcache->wait_for_uncommitted_master(*p, gather_bld.new_sub());
119   }
120
121   // uncommitted fragments
122   for (set<dirfrag_t>::iterator p = uncommitted_fragments.begin();
123        p != uncommitted_fragments.end();
124        ++p) {
125     dout(10) << "try_to_expire waiting for uncommitted fragment " << *p << dendl;
126     mds->mdcache->wait_for_uncommitted_fragment(*p, gather_bld.new_sub());
127   }
128
129   // nudge scatterlocks
130   for (elist<CInode*>::iterator p = dirty_dirfrag_dir.begin(); !p.end(); ++p) {
131     CInode *in = *p;
132     dout(10) << "try_to_expire waiting for dirlock flush on " << *in << dendl;
133     mds->locker->scatter_nudge(&in->filelock, gather_bld.new_sub());
134   }
135   for (elist<CInode*>::iterator p = dirty_dirfrag_dirfragtree.begin(); !p.end(); ++p) {
136     CInode *in = *p;
137     dout(10) << "try_to_expire waiting for dirfragtreelock flush on " << *in << dendl;
138     mds->locker->scatter_nudge(&in->dirfragtreelock, gather_bld.new_sub());
139   }
140   for (elist<CInode*>::iterator p = dirty_dirfrag_nest.begin(); !p.end(); ++p) {
141     CInode *in = *p;
142     dout(10) << "try_to_expire waiting for nest flush on " << *in << dendl;
143     mds->locker->scatter_nudge(&in->nestlock, gather_bld.new_sub());
144   }
145
146   assert(g_conf->mds_kill_journal_expire_at != 2);
147
148   // open files and snap inodes 
149   if (!open_files.empty()) {
150     assert(!mds->mdlog->is_capped()); // hmm FIXME
151     EOpen *le = 0;
152     LogSegment *ls = mds->mdlog->get_current_segment();
153     assert(ls != this);
154     elist<CInode*>::iterator p = open_files.begin(member_offset(CInode, item_open_file));
155     while (!p.end()) {
156       CInode *in = *p;
157       ++p;
158       if (in->last == CEPH_NOSNAP && in->is_auth() &&
159           !in->is_ambiguous_auth() && in->is_any_caps()) {
160         if (in->is_any_caps_wanted()) {
161           dout(20) << "try_to_expire requeueing open file " << *in << dendl;
162           if (!le) {
163             le = new EOpen(mds->mdlog);
164             mds->mdlog->start_entry(le);
165           }
166           le->add_clean_inode(in);
167           ls->open_files.push_back(&in->item_open_file);
168         } else {
169           // drop inodes that aren't wanted
170           dout(20) << "try_to_expire not requeueing and delisting unwanted file " << *in << dendl;
171           in->item_open_file.remove_myself();
172         }
173       } else if (in->last != CEPH_NOSNAP && !in->client_snap_caps.empty()) {
174         // journal snap inodes that need flush. This simplify the mds failover hanlding
175         dout(20) << "try_to_expire requeueing snap needflush inode " << *in << dendl;
176         if (!le) {
177           le = new EOpen(mds->mdlog);
178           mds->mdlog->start_entry(le);
179         }
180         le->add_clean_inode(in);
181         ls->open_files.push_back(&in->item_open_file);
182       } else {
183         /*
184          * we can get a capless inode here if we replay an open file, the client fails to
185          * reconnect it, but does REPLAY an open request (that adds it to the logseg).  AFAICS
186          * it's ok for the client to replay an open on a file it doesn't have in it's cache
187          * anymore.
188          *
189          * this makes the mds less sensitive to strict open_file consistency, although it does
190          * make it easier to miss subtle problems.
191          */
192         dout(20) << "try_to_expire not requeueing and delisting capless file " << *in << dendl;
193         in->item_open_file.remove_myself();
194       }
195     }
196     if (le) {
197       mds->mdlog->submit_entry(le);
198       mds->mdlog->wait_for_safe(gather_bld.new_sub());
199       dout(10) << "try_to_expire waiting for open files to rejournal" << dendl;
200     }
201   }
202
203   assert(g_conf->mds_kill_journal_expire_at != 3);
204
205   // backtraces to be stored/updated
206   for (elist<CInode*>::iterator p = dirty_parent_inodes.begin(); !p.end(); ++p) {
207     CInode *in = *p;
208     assert(in->is_auth());
209     if (in->can_auth_pin()) {
210       dout(15) << "try_to_expire waiting for storing backtrace on " << *in << dendl;
211       in->store_backtrace(gather_bld.new_sub(), op_prio);
212     } else {
213       dout(15) << "try_to_expire waiting for unfreeze on " << *in << dendl;
214       in->add_waiter(CInode::WAIT_UNFREEZE, gather_bld.new_sub());
215     }
216   }
217
218   assert(g_conf->mds_kill_journal_expire_at != 4);
219
220   // slave updates
221   for (elist<MDSlaveUpdate*>::iterator p = slave_updates.begin(member_offset(MDSlaveUpdate,
222                                                                              item));
223        !p.end(); ++p) {
224     MDSlaveUpdate *su = *p;
225     dout(10) << "try_to_expire waiting on slave update " << su << dendl;
226     assert(su->waiter == 0);
227     su->waiter = gather_bld.new_sub();
228   }
229
230   // idalloc
231   if (inotablev > mds->inotable->get_committed_version()) {
232     dout(10) << "try_to_expire saving inotable table, need " << inotablev
233               << ", committed is " << mds->inotable->get_committed_version()
234               << " (" << mds->inotable->get_committing_version() << ")"
235               << dendl;
236     mds->inotable->save(gather_bld.new_sub(), inotablev);
237   }
238
239   // sessionmap
240   if (sessionmapv > mds->sessionmap.get_committed()) {
241     dout(10) << "try_to_expire saving sessionmap, need " << sessionmapv 
242               << ", committed is " << mds->sessionmap.get_committed()
243               << " (" << mds->sessionmap.get_committing() << ")"
244               << dendl;
245     mds->sessionmap.save(gather_bld.new_sub(), sessionmapv);
246   }
247
248   // updates to sessions for completed_requests
249   mds->sessionmap.save_if_dirty(touched_sessions, &gather_bld);
250   touched_sessions.clear();
251
252   // pending commit atids
253   for (map<int, ceph::unordered_set<version_t> >::iterator p = pending_commit_tids.begin();
254        p != pending_commit_tids.end();
255        ++p) {
256     MDSTableClient *client = mds->get_table_client(p->first);
257     assert(client);
258     for (ceph::unordered_set<version_t>::iterator q = p->second.begin();
259          q != p->second.end();
260          ++q) {
261       dout(10) << "try_to_expire " << get_mdstable_name(p->first) << " transaction " << *q 
262                << " pending commit (not yet acked), waiting" << dendl;
263       assert(!client->has_committed(*q));
264       client->wait_for_ack(*q, gather_bld.new_sub());
265     }
266   }
267   
268   // table servers
269   for (map<int, version_t>::iterator p = tablev.begin();
270        p != tablev.end();
271        ++p) {
272     MDSTableServer *server = mds->get_table_server(p->first);
273     assert(server);
274     if (p->second > server->get_committed_version()) {
275       dout(10) << "try_to_expire waiting for " << get_mdstable_name(p->first) 
276                << " to save, need " << p->second << dendl;
277       server->save(gather_bld.new_sub());
278     }
279   }
280
281   // truncating
282   for (set<CInode*>::iterator p = truncating_inodes.begin();
283        p != truncating_inodes.end();
284        ++p) {
285     dout(10) << "try_to_expire waiting for truncate of " << **p << dendl;
286     (*p)->add_waiter(CInode::WAIT_TRUNC, gather_bld.new_sub());
287   }
288   
289   if (gather_bld.has_subs()) {
290     dout(6) << "LogSegment(" << seq << "/" << offset << ").try_to_expire waiting" << dendl;
291     mds->mdlog->flush();
292   } else {
293     assert(g_conf->mds_kill_journal_expire_at != 5);
294     dout(6) << "LogSegment(" << seq << "/" << offset << ").try_to_expire success" << dendl;
295   }
296 }
297
298
299 // -----------------------
300 // EMetaBlob
301
302 EMetaBlob::EMetaBlob(MDLog *mdlog) : opened_ino(0), renamed_dirino(0),
303                                      inotablev(0), sessionmapv(0), allocated_ino(0),
304                                      last_subtree_map(0), event_seq(0)
305 { }
306
307 void EMetaBlob::add_dir_context(CDir *dir, int mode)
308 {
309   MDSRank *mds = dir->cache->mds;
310
311   list<CDentry*> parents;
312
313   // it may be okay not to include the maybe items, if
314   //  - we journaled the maybe child inode in this segment
315   //  - that subtree turns out to be unambiguously auth
316   list<CDentry*> maybe;
317   bool maybenot = false;
318
319   while (true) {
320     // already have this dir?  (we must always add in order)
321     if (lump_map.count(dir->dirfrag())) {
322       dout(20) << "EMetaBlob::add_dir_context(" << dir << ") have lump " << dir->dirfrag() << dendl;
323       break;
324     }
325
326     // stop at root/stray
327     CInode *diri = dir->get_inode();
328     CDentry *parent = diri->get_projected_parent_dn();
329
330     if (mode == TO_AUTH_SUBTREE_ROOT) {
331       // subtree root?
332       if (dir->is_subtree_root()) {
333         // match logic in MDCache::create_subtree_map()
334         if (dir->get_dir_auth().first == mds->get_nodeid()) {
335           mds_authority_t parent_auth = parent ? parent->authority() : CDIR_AUTH_UNDEF;
336           if (parent_auth.first == dir->get_dir_auth().first) {
337             if (parent_auth.second == CDIR_AUTH_UNKNOWN &&
338                 !dir->is_ambiguous_dir_auth() &&
339                 !dir->state_test(CDir::STATE_EXPORTBOUND) &&
340                 !dir->state_test(CDir::STATE_AUXSUBTREE) &&
341                 !diri->state_test(CInode::STATE_AMBIGUOUSAUTH)) {
342               dout(0) << "EMetaBlob::add_dir_context unexpected subtree " << *dir << dendl;
343               assert(0);
344             }
345             dout(20) << "EMetaBlob::add_dir_context(" << dir << ") ambiguous or transient subtree " << dendl;
346           } else {
347             // it's an auth subtree, we don't need maybe (if any), and we're done.
348             dout(20) << "EMetaBlob::add_dir_context(" << dir << ") reached unambig auth subtree, don't need " << maybe
349                      << " at " << *dir << dendl;
350             maybe.clear();
351             break;
352           }
353         } else {
354           dout(20) << "EMetaBlob::add_dir_context(" << dir << ") reached ambig or !auth subtree, need " << maybe
355                    << " at " << *dir << dendl;
356           // we need the maybe list after all!
357           parents.splice(parents.begin(), maybe);
358           maybenot = false;
359         }
360       }
361
362       // was the inode journaled in this blob?
363       if (event_seq && diri->last_journaled == event_seq) {
364         dout(20) << "EMetaBlob::add_dir_context(" << dir << ") already have diri this blob " << *diri << dendl;
365         break;
366       }
367
368       // have we journaled this inode since the last subtree map?
369       if (!maybenot && last_subtree_map && diri->last_journaled >= last_subtree_map) {
370         dout(20) << "EMetaBlob::add_dir_context(" << dir << ") already have diri in this segment (" 
371                  << diri->last_journaled << " >= " << last_subtree_map << "), setting maybenot flag "
372                  << *diri << dendl;
373         maybenot = true;
374       }
375     }
376
377     if (!parent)
378       break;
379
380     if (maybenot) {
381       dout(25) << "EMetaBlob::add_dir_context(" << dir << ")      maybe " << *parent << dendl;
382       maybe.push_front(parent);
383     } else {
384       dout(25) << "EMetaBlob::add_dir_context(" << dir << ") definitely " << *parent << dendl;
385       parents.push_front(parent);
386     }
387     
388     dir = parent->get_dir();
389   }
390   
391   parents.splice(parents.begin(), maybe);
392
393   dout(20) << "EMetaBlob::add_dir_context final: " << parents << dendl;
394   for (list<CDentry*>::iterator p = parents.begin(); p != parents.end(); ++p) {
395     assert((*p)->get_projected_linkage()->is_primary());
396     add_dentry(*p, false);
397   }
398 }
399
400 void EMetaBlob::update_segment(LogSegment *ls)
401 {
402   // dirty inode mtimes
403   // -> handled directly by Server.cc, replay()
404
405   // alloc table update?
406   if (inotablev)
407     ls->inotablev = inotablev;
408   if (sessionmapv)
409     ls->sessionmapv = sessionmapv;
410
411   // truncated inodes
412   // -> handled directly by Server.cc
413
414   // client requests
415   //  note the newest request per client
416   //if (!client_reqs.empty())
417     //    ls->last_client_tid[client_reqs.rbegin()->client] = client_reqs.rbegin()->tid);
418 }
419
420 // EMetaBlob::fullbit
421
422 void EMetaBlob::fullbit::encode(bufferlist& bl, uint64_t features) const {
423   ENCODE_START(8, 5, bl);
424   ::encode(dn, bl);
425   ::encode(dnfirst, bl);
426   ::encode(dnlast, bl);
427   ::encode(dnv, bl);
428   ::encode(inode, bl, features);
429   ::encode(xattrs, bl);
430   if (inode.is_symlink())
431     ::encode(symlink, bl);
432   if (inode.is_dir()) {
433     ::encode(dirfragtree, bl);
434     ::encode(snapbl, bl);
435   }
436   ::encode(state, bl);
437   if (old_inodes.empty()) {
438     ::encode(false, bl);
439   } else {
440     ::encode(true, bl);
441     ::encode(old_inodes, bl, features);
442   }
443   if (!inode.is_dir())
444     ::encode(snapbl, bl);
445   ::encode(oldest_snap, bl);
446   ENCODE_FINISH(bl);
447 }
448
449 void EMetaBlob::fullbit::decode(bufferlist::iterator &bl) {
450   DECODE_START_LEGACY_COMPAT_LEN(7, 5, 5, bl);
451   ::decode(dn, bl);
452   ::decode(dnfirst, bl);
453   ::decode(dnlast, bl);
454   ::decode(dnv, bl);
455   ::decode(inode, bl);
456   ::decode(xattrs, bl);
457   if (inode.is_symlink())
458     ::decode(symlink, bl);
459   if (inode.is_dir()) {
460     ::decode(dirfragtree, bl);
461     ::decode(snapbl, bl);
462     if ((struct_v == 2) || (struct_v == 3)) {
463       bool dir_layout_exists;
464       ::decode(dir_layout_exists, bl);
465       if (dir_layout_exists) {
466         __u8 dir_struct_v;
467         ::decode(dir_struct_v, bl); // default_file_layout version
468         ::decode(inode.layout, bl); // and actual layout, that we care about
469       }
470     }
471   }
472   if (struct_v >= 6) {
473     ::decode(state, bl);
474   } else {
475     bool dirty;
476     ::decode(dirty, bl);
477     state = dirty ? EMetaBlob::fullbit::STATE_DIRTY : 0;
478   }
479
480   if (struct_v >= 3) {
481     bool old_inodes_present;
482     ::decode(old_inodes_present, bl);
483     if (old_inodes_present) {
484       ::decode(old_inodes, bl);
485     }
486   }
487   if (!inode.is_dir()) {
488     if (struct_v >= 7)
489       ::decode(snapbl, bl);
490   }
491   if (struct_v >= 8)
492     ::decode(oldest_snap, bl);
493   else
494     oldest_snap = CEPH_NOSNAP;
495
496   DECODE_FINISH(bl);
497 }
498
499 void EMetaBlob::fullbit::dump(Formatter *f) const
500 {
501   f->dump_string("dentry", dn);
502   f->dump_stream("snapid.first") << dnfirst;
503   f->dump_stream("snapid.last") << dnlast;
504   f->dump_int("dentry version", dnv);
505   f->open_object_section("inode");
506   inode.dump(f);
507   f->close_section(); // inode
508   f->open_object_section("xattrs");
509   for (map<string, bufferptr>::const_iterator iter = xattrs.begin();
510       iter != xattrs.end(); ++iter) {
511     string s(iter->second.c_str(), iter->second.length());
512     f->dump_string(iter->first.c_str(), s);
513   }
514   f->close_section(); // xattrs
515   if (inode.is_symlink()) {
516     f->dump_string("symlink", symlink);
517   }
518   if (inode.is_dir()) {
519     f->dump_stream("frag tree") << dirfragtree;
520     f->dump_string("has_snapbl", snapbl.length() ? "true" : "false");
521     if (inode.has_layout()) {
522       f->open_object_section("file layout policy");
523       // FIXME
524       f->dump_string("layout", "the layout exists");
525       f->close_section(); // file layout policy
526     }
527   }
528   f->dump_string("state", state_string());
529   if (!old_inodes.empty()) {
530     f->open_array_section("old inodes");
531     for (old_inodes_t::const_iterator iter = old_inodes.begin();
532          iter != old_inodes.end();
533          ++iter) {
534       f->open_object_section("inode");
535       f->dump_int("snapid", iter->first);
536       iter->second.dump(f);
537       f->close_section(); // inode
538     }
539     f->close_section(); // old inodes
540   }
541 }
542
543 void EMetaBlob::fullbit::generate_test_instances(list<EMetaBlob::fullbit*>& ls)
544 {
545   inode_t inode;
546   fragtree_t fragtree;
547   map<string,bufferptr> empty_xattrs;
548   bufferlist empty_snapbl;
549   fullbit *sample = new fullbit("/testdn", 0, 0, 0,
550                                 inode, fragtree, empty_xattrs, "", 0, empty_snapbl,
551                                 false, NULL);
552   ls.push_back(sample);
553 }
554
555 void EMetaBlob::fullbit::update_inode(MDSRank *mds, CInode *in)
556 {
557   in->inode = inode;
558   in->xattrs = xattrs;
559   in->maybe_export_pin();
560   if (in->inode.is_dir()) {
561     if (!(in->dirfragtree == dirfragtree)) {
562       dout(10) << "EMetaBlob::fullbit::update_inode dft " << in->dirfragtree << " -> "
563                << dirfragtree << " on " << *in << dendl;
564       in->dirfragtree = dirfragtree;
565       in->force_dirfrags();
566       if (in->has_dirfrags() && in->authority() == CDIR_AUTH_UNDEF) {
567         list<CDir*> ls;
568         in->get_nested_dirfrags(ls);
569         for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
570           CDir *dir = *p;
571           if (dir->get_num_any() == 0 &&
572               mds->mdcache->can_trim_non_auth_dirfrag(dir)) {
573             dout(10) << " closing empty non-auth dirfrag " << *dir << dendl;
574             in->close_dirfrag(dir->get_frag());
575           }
576         }
577       }
578     }
579   } else if (in->inode.is_symlink()) {
580     in->symlink = symlink;
581   }
582   in->old_inodes = old_inodes;
583   if (!in->old_inodes.empty()) {
584     snapid_t min_first = in->old_inodes.rbegin()->first + 1;
585     if (min_first > in->first)
586       in->first = min_first;
587   }
588
589   /*
590    * we can do this before linking hte inode bc the split_at would
591    * be a no-op.. we have no children (namely open snaprealms) to
592    * divy up
593    */
594   in->oldest_snap = oldest_snap;
595   in->decode_snap_blob(snapbl);
596
597   /*
598    * In case there was anything malformed in the journal that we are
599    * replaying, do sanity checks on the inodes we're replaying and
600    * go damaged instead of letting any trash into a live cache
601    */
602   if (in->is_file()) {
603     // Files must have valid layouts with a pool set
604     if (in->inode.layout.pool_id == -1 || !in->inode.layout.is_valid()) {
605       dout(0) << "EMetaBlob.replay invalid layout on ino " << *in
606               << ": " << in->inode.layout << dendl;
607       std::ostringstream oss;
608       oss << "Invalid layout for inode 0x" << std::hex << in->inode.ino
609           << std::dec << " in journal";
610       mds->clog->error() << oss.str();
611       mds->damaged();
612       ceph_abort();  // Should be unreachable because damaged() calls respawn()
613     }
614   }
615 }
616
617 // EMetaBlob::remotebit
618
619 void EMetaBlob::remotebit::encode(bufferlist& bl) const
620 {
621   ENCODE_START(2, 2, bl);
622   ::encode(dn, bl);
623   ::encode(dnfirst, bl);
624   ::encode(dnlast, bl);
625   ::encode(dnv, bl);
626   ::encode(ino, bl);
627   ::encode(d_type, bl);
628   ::encode(dirty, bl);
629   ENCODE_FINISH(bl);
630 }
631
632 void EMetaBlob::remotebit::decode(bufferlist::iterator &bl)
633 {
634   DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
635   ::decode(dn, bl);
636   ::decode(dnfirst, bl);
637   ::decode(dnlast, bl);
638   ::decode(dnv, bl);
639   ::decode(ino, bl);
640   ::decode(d_type, bl);
641   ::decode(dirty, bl);
642   DECODE_FINISH(bl);
643 }
644
645 void EMetaBlob::remotebit::dump(Formatter *f) const
646 {
647   f->dump_string("dentry", dn);
648   f->dump_int("snapid.first", dnfirst);
649   f->dump_int("snapid.last", dnlast);
650   f->dump_int("dentry version", dnv);
651   f->dump_int("inodeno", ino);
652   uint32_t type = DTTOIF(d_type) & S_IFMT; // convert to type entries
653   string type_string;
654   switch(type) {
655   case S_IFREG:
656     type_string = "file"; break;
657   case S_IFLNK:
658     type_string = "symlink"; break;
659   case S_IFDIR:
660     type_string = "directory"; break;
661   case S_IFIFO:
662     type_string = "fifo"; break;
663   case S_IFCHR:
664     type_string = "chr"; break;
665   case S_IFBLK:
666     type_string = "blk"; break;
667   case S_IFSOCK:
668     type_string = "sock"; break;
669   default:
670     assert (0 == "unknown d_type!");
671   }
672   f->dump_string("d_type", type_string);
673   f->dump_string("dirty", dirty ? "true" : "false");
674 }
675
676 void EMetaBlob::remotebit::
677 generate_test_instances(list<EMetaBlob::remotebit*>& ls)
678 {
679   remotebit *remote = new remotebit("/test/dn", 0, 10, 15, 1, IFTODT(S_IFREG), false);
680   ls.push_back(remote);
681 }
682
683 // EMetaBlob::nullbit
684
685 void EMetaBlob::nullbit::encode(bufferlist& bl) const
686 {
687   ENCODE_START(2, 2, bl);
688   ::encode(dn, bl);
689   ::encode(dnfirst, bl);
690   ::encode(dnlast, bl);
691   ::encode(dnv, bl);
692   ::encode(dirty, bl);
693   ENCODE_FINISH(bl);
694 }
695
696 void EMetaBlob::nullbit::decode(bufferlist::iterator &bl)
697 {
698   DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
699   ::decode(dn, bl);
700   ::decode(dnfirst, bl);
701   ::decode(dnlast, bl);
702   ::decode(dnv, bl);
703   ::decode(dirty, bl);
704   DECODE_FINISH(bl);
705 }
706
707 void EMetaBlob::nullbit::dump(Formatter *f) const
708 {
709   f->dump_string("dentry", dn);
710   f->dump_int("snapid.first", dnfirst);
711   f->dump_int("snapid.last", dnlast);
712   f->dump_int("dentry version", dnv);
713   f->dump_string("dirty", dirty ? "true" : "false");
714 }
715
716 void EMetaBlob::nullbit::generate_test_instances(list<nullbit*>& ls)
717 {
718   nullbit *sample = new nullbit("/test/dentry", 0, 10, 15, false);
719   nullbit *sample2 = new nullbit("/test/dirty", 10, 20, 25, true);
720   ls.push_back(sample);
721   ls.push_back(sample2);
722 }
723
724 // EMetaBlob::dirlump
725
726 void EMetaBlob::dirlump::encode(bufferlist& bl, uint64_t features) const
727 {
728   ENCODE_START(2, 2, bl);
729   ::encode(fnode, bl);
730   ::encode(state, bl);
731   ::encode(nfull, bl);
732   ::encode(nremote, bl);
733   ::encode(nnull, bl);
734   _encode_bits(features);
735   ::encode(dnbl, bl);
736   ENCODE_FINISH(bl);
737 }
738
739 void EMetaBlob::dirlump::decode(bufferlist::iterator &bl)
740 {
741   DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl)
742   ::decode(fnode, bl);
743   ::decode(state, bl);
744   ::decode(nfull, bl);
745   ::decode(nremote, bl);
746   ::decode(nnull, bl);
747   ::decode(dnbl, bl);
748   dn_decoded = false;      // don't decode bits unless we need them.
749   DECODE_FINISH(bl);
750 }
751
752 void EMetaBlob::dirlump::dump(Formatter *f) const
753 {
754   if (!dn_decoded) {
755     dirlump *me = const_cast<dirlump*>(this);
756     me->_decode_bits();
757   }
758   f->open_object_section("fnode");
759   fnode.dump(f);
760   f->close_section(); // fnode
761   f->dump_string("state", state_string());
762   f->dump_int("nfull", nfull);
763   f->dump_int("nremote", nremote);
764   f->dump_int("nnull", nnull);
765
766   f->open_array_section("full bits");
767   for (list<ceph::shared_ptr<fullbit> >::const_iterator
768       iter = dfull.begin(); iter != dfull.end(); ++iter) {
769     f->open_object_section("fullbit");
770     (*iter)->dump(f);
771     f->close_section(); // fullbit
772   }
773   f->close_section(); // full bits
774   f->open_array_section("remote bits");
775   for (list<remotebit>::const_iterator
776       iter = dremote.begin(); iter != dremote.end(); ++iter) {
777     f->open_object_section("remotebit");
778     (*iter).dump(f);
779     f->close_section(); // remotebit
780   }
781   f->close_section(); // remote bits
782   f->open_array_section("null bits");
783   for (list<nullbit>::const_iterator
784       iter = dnull.begin(); iter != dnull.end(); ++iter) {
785     f->open_object_section("null bit");
786     (*iter).dump(f);
787     f->close_section(); // null bit
788   }
789   f->close_section(); // null bits
790 }
791
792 void EMetaBlob::dirlump::generate_test_instances(list<dirlump*>& ls)
793 {
794   ls.push_back(new dirlump());
795 }
796
797 /**
798  * EMetaBlob proper
799  */
800 void EMetaBlob::encode(bufferlist& bl, uint64_t features) const
801 {
802   ENCODE_START(8, 5, bl);
803   ::encode(lump_order, bl);
804   ::encode(lump_map, bl, features);
805   ::encode(roots, bl, features);
806   ::encode(table_tids, bl);
807   ::encode(opened_ino, bl);
808   ::encode(allocated_ino, bl);
809   ::encode(used_preallocated_ino, bl);
810   ::encode(preallocated_inos, bl);
811   ::encode(client_name, bl);
812   ::encode(inotablev, bl);
813   ::encode(sessionmapv, bl);
814   ::encode(truncate_start, bl);
815   ::encode(truncate_finish, bl);
816   ::encode(destroyed_inodes, bl);
817   ::encode(client_reqs, bl);
818   ::encode(renamed_dirino, bl);
819   ::encode(renamed_dir_frags, bl);
820   {
821     // make MDSRank use v6 format happy
822     int64_t i = -1;
823     bool b = false;
824     ::encode(i, bl);
825     ::encode(b, bl);
826   }
827   ::encode(client_flushes, bl);
828   ENCODE_FINISH(bl);
829 }
830 void EMetaBlob::decode(bufferlist::iterator &bl)
831 {
832   DECODE_START_LEGACY_COMPAT_LEN(7, 5, 5, bl);
833   ::decode(lump_order, bl);
834   ::decode(lump_map, bl);
835   if (struct_v >= 4) {
836     ::decode(roots, bl);
837   } else {
838     bufferlist rootbl;
839     ::decode(rootbl, bl);
840     if (rootbl.length()) {
841       bufferlist::iterator p = rootbl.begin();
842       roots.push_back(ceph::shared_ptr<fullbit>(new fullbit(p)));
843     }
844   }
845   ::decode(table_tids, bl);
846   ::decode(opened_ino, bl);
847   ::decode(allocated_ino, bl);
848   ::decode(used_preallocated_ino, bl);
849   ::decode(preallocated_inos, bl);
850   ::decode(client_name, bl);
851   ::decode(inotablev, bl);
852   ::decode(sessionmapv, bl);
853   ::decode(truncate_start, bl);
854   ::decode(truncate_finish, bl);
855   ::decode(destroyed_inodes, bl);
856   if (struct_v >= 2) {
857     ::decode(client_reqs, bl);
858   } else {
859     list<metareqid_t> r;
860     ::decode(r, bl);
861     while (!r.empty()) {
862         client_reqs.push_back(pair<metareqid_t,uint64_t>(r.front(), 0));
863         r.pop_front();
864     }
865   }
866   if (struct_v >= 3) {
867     ::decode(renamed_dirino, bl);
868     ::decode(renamed_dir_frags, bl);
869   }
870   if (struct_v >= 6) {
871     // ignore
872     int64_t i;
873     bool b;
874     ::decode(i, bl);
875     ::decode(b, bl);
876   }
877   if (struct_v >= 8) {
878     ::decode(client_flushes, bl);
879   }
880   DECODE_FINISH(bl);
881 }
882
883
884 /**
885  * Get all inodes touched by this metablob.  Includes the 'bits' within
886  * dirlumps, and the inodes of the dirs themselves.
887  */
888 void EMetaBlob::get_inodes(
889     std::set<inodeno_t> &inodes) const
890 {
891   // For all dirlumps in this metablob
892   for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
893     // Record inode of dirlump
894     inodeno_t const dir_ino = i->first.ino;
895     inodes.insert(dir_ino);
896
897     // Decode dirlump bits
898     dirlump const &dl = i->second;
899     dl._decode_bits();
900
901     // Record inodes of fullbits
902     list<ceph::shared_ptr<fullbit> > const &fb_list = dl.get_dfull();
903     for (list<ceph::shared_ptr<fullbit> >::const_iterator
904         iter = fb_list.begin(); iter != fb_list.end(); ++iter) {
905       inodes.insert((*iter)->inode.ino);
906     }
907
908     // Record inodes of remotebits
909     list<remotebit> const &rb_list = dl.get_dremote();
910     for (list<remotebit>::const_iterator
911         iter = rb_list.begin(); iter != rb_list.end(); ++iter) {
912       inodes.insert(iter->ino);
913     }
914   }
915 }
916
917
918 /**
919  * Get a map of dirfrag to set of dentries in that dirfrag which are
920  * touched in this operation.
921  */
922 void EMetaBlob::get_dentries(std::map<dirfrag_t, std::set<std::string> > &dentries) const
923 {
924   for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
925     dirlump const &dl = i->second;
926     dirfrag_t const &df = i->first;
927
928     // Get all bits
929     dl._decode_bits();
930     list<ceph::shared_ptr<fullbit> > const &fb_list = dl.get_dfull();
931     list<nullbit> const &nb_list = dl.get_dnull();
932     list<remotebit> const &rb_list = dl.get_dremote();
933
934     // For all bits, store dentry
935     for (list<ceph::shared_ptr<fullbit> >::const_iterator
936         iter = fb_list.begin(); iter != fb_list.end(); ++iter) {
937       dentries[df].insert((*iter)->dn);
938
939     }
940     for (list<nullbit>::const_iterator
941         iter = nb_list.begin(); iter != nb_list.end(); ++iter) {
942       dentries[df].insert(iter->dn);
943     }
944     for (list<remotebit>::const_iterator
945         iter = rb_list.begin(); iter != rb_list.end(); ++iter) {
946       dentries[df].insert(iter->dn);
947     }
948   }
949 }
950
951
952
953 /**
954  * Calculate all paths that we can infer are touched by this metablob.  Only uses
955  * information local to this metablob so it may only be the path within the
956  * subtree.
957  */
958 void EMetaBlob::get_paths(
959     std::vector<std::string> &paths) const
960 {
961   // Each dentry has a 'location' which is a 2-tuple of parent inode and dentry name
962   typedef std::pair<inodeno_t, std::string> Location;
963
964   // Whenever we see a dentry within a dirlump, we remember it as a child of
965   // the dirlump's inode
966   std::map<inodeno_t, std::list<std::string> > children;
967
968   // Whenever we see a location for an inode, remember it: this allows us to
969   // build a path given an inode
970   std::map<inodeno_t, Location> ino_locations;
971
972   // Special case: operations on root inode populate roots but not dirlumps
973   if (lump_map.empty() && !roots.empty()) {
974     paths.push_back("/");
975     return;
976   }
977
978   // First pass
979   // ==========
980   // Build a tiny local metadata cache for the path structure in this metablob
981   for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
982     inodeno_t const dir_ino = i->first.ino;
983     dirlump const &dl = i->second;
984     dl._decode_bits();
985
986     list<ceph::shared_ptr<fullbit> > const &fb_list = dl.get_dfull();
987     list<nullbit> const &nb_list = dl.get_dnull();
988     list<remotebit> const &rb_list = dl.get_dremote();
989
990     for (list<ceph::shared_ptr<fullbit> >::const_iterator
991         iter = fb_list.begin(); iter != fb_list.end(); ++iter) {
992       std::string const &dentry = (*iter)->dn;
993       children[dir_ino].push_back(dentry);
994       ino_locations[(*iter)->inode.ino] = Location(dir_ino, dentry);
995     }
996
997     for (list<nullbit>::const_iterator
998         iter = nb_list.begin(); iter != nb_list.end(); ++iter) {
999       std::string const &dentry = iter->dn;
1000       children[dir_ino].push_back(dentry);
1001     }
1002
1003     for (list<remotebit>::const_iterator
1004         iter = rb_list.begin(); iter != rb_list.end(); ++iter) {
1005       std::string const &dentry = iter->dn;
1006       children[dir_ino].push_back(dentry);
1007     }
1008   }
1009
1010   std::vector<Location> leaf_locations;
1011
1012   // Second pass
1013   // ===========
1014   // Output paths for all childless nodes in the metablob
1015   for (std::map<dirfrag_t, dirlump>::const_iterator i = lump_map.begin(); i != lump_map.end(); ++i) {
1016     inodeno_t const dir_ino = i->first.ino;
1017     dirlump const &dl = i->second;
1018     dl._decode_bits();
1019
1020     list<ceph::shared_ptr<fullbit> > const &fb_list = dl.get_dfull();
1021     for (list<ceph::shared_ptr<fullbit> >::const_iterator
1022         iter = fb_list.begin(); iter != fb_list.end(); ++iter) {
1023       std::string const &dentry = (*iter)->dn;
1024       children[dir_ino].push_back(dentry);
1025       ino_locations[(*iter)->inode.ino] = Location(dir_ino, dentry);
1026       if (children.find((*iter)->inode.ino) == children.end()) {
1027         leaf_locations.push_back(Location(dir_ino, dentry));
1028
1029       }
1030     }
1031
1032     list<nullbit> const &nb_list = dl.get_dnull();
1033     for (list<nullbit>::const_iterator
1034         iter = nb_list.begin(); iter != nb_list.end(); ++iter) {
1035       std::string const &dentry = iter->dn;
1036       leaf_locations.push_back(Location(dir_ino, dentry));
1037     }
1038
1039     list<remotebit> const &rb_list = dl.get_dremote();
1040     for (list<remotebit>::const_iterator
1041         iter = rb_list.begin(); iter != rb_list.end(); ++iter) {
1042       std::string const &dentry = iter->dn;
1043       leaf_locations.push_back(Location(dir_ino, dentry));
1044     }
1045   }
1046
1047   // For all the leaf locations identified, generate paths
1048   for (std::vector<Location>::iterator i = leaf_locations.begin(); i != leaf_locations.end(); ++i) {
1049     Location const &loc = *i;
1050     std::string path = loc.second;
1051     inodeno_t ino = loc.first;
1052     while(ino_locations.find(ino) != ino_locations.end()) {
1053       Location const &loc = ino_locations[ino];
1054       if (!path.empty()) {
1055         path = loc.second + "/" + path;
1056       } else {
1057         path = loc.second + path;
1058       }
1059       ino = loc.first;
1060     }
1061
1062     paths.push_back(path);
1063   }
1064 }
1065
1066
1067 void EMetaBlob::dump(Formatter *f) const
1068 {
1069   f->open_array_section("lumps");
1070   for (list<dirfrag_t>::const_iterator i = lump_order.begin();
1071        i != lump_order.end(); ++i) {
1072     f->open_object_section("lump");
1073     f->open_object_section("dirfrag");
1074     f->dump_stream("dirfrag") << *i;
1075     f->close_section(); // dirfrag
1076     f->open_object_section("dirlump");
1077     lump_map.at(*i).dump(f);
1078     f->close_section(); // dirlump
1079     f->close_section(); // lump
1080   }
1081   f->close_section(); // lumps
1082   
1083   f->open_array_section("roots");
1084   for (list<ceph::shared_ptr<fullbit> >::const_iterator i = roots.begin();
1085        i != roots.end(); ++i) {
1086     f->open_object_section("root");
1087     (*i)->dump(f);
1088     f->close_section(); // root
1089   }
1090   f->close_section(); // roots
1091
1092   f->open_array_section("tableclient tranactions");
1093   for (list<pair<__u8,version_t> >::const_iterator i = table_tids.begin();
1094        i != table_tids.end(); ++i) {
1095     f->open_object_section("transaction");
1096     f->dump_int("tid", i->first);
1097     f->dump_int("version", i->second);
1098     f->close_section(); // transaction
1099   }
1100   f->close_section(); // tableclient transactions
1101   
1102   f->dump_int("renamed directory inodeno", renamed_dirino);
1103   
1104   f->open_array_section("renamed directory fragments");
1105   for (list<frag_t>::const_iterator i = renamed_dir_frags.begin();
1106        i != renamed_dir_frags.end(); ++i) {
1107     f->dump_int("frag", *i);
1108   }
1109   f->close_section(); // renamed directory fragments
1110
1111   f->dump_int("inotable version", inotablev);
1112   f->dump_int("SessionMap version", sessionmapv);
1113   f->dump_int("allocated ino", allocated_ino);
1114   
1115   f->dump_stream("preallocated inos") << preallocated_inos;
1116   f->dump_int("used preallocated ino", used_preallocated_ino);
1117
1118   f->open_object_section("client name");
1119   client_name.dump(f);
1120   f->close_section(); // client name
1121
1122   f->open_array_section("inodes starting a truncate");
1123   for(list<inodeno_t>::const_iterator i = truncate_start.begin();
1124       i != truncate_start.end(); ++i) {
1125     f->dump_int("inodeno", *i);
1126   }
1127   f->close_section(); // truncate inodes
1128   f->open_array_section("inodes finishing a truncated");
1129   for(map<inodeno_t,uint64_t>::const_iterator i = truncate_finish.begin();
1130       i != truncate_finish.end(); ++i) {
1131     f->open_object_section("inode+segment");
1132     f->dump_int("inodeno", i->first);
1133     f->dump_int("truncate starting segment", i->second);
1134     f->close_section(); // truncated inode
1135   }
1136   f->close_section(); // truncate finish inodes
1137
1138   f->open_array_section("destroyed inodes");
1139   for(vector<inodeno_t>::const_iterator i = destroyed_inodes.begin();
1140       i != destroyed_inodes.end(); ++i) {
1141     f->dump_int("inodeno", *i);
1142   }
1143   f->close_section(); // destroyed inodes
1144
1145   f->open_array_section("client requests");
1146   for(list<pair<metareqid_t,uint64_t> >::const_iterator i = client_reqs.begin();
1147       i != client_reqs.end(); ++i) {
1148     f->open_object_section("Client request");
1149     f->dump_stream("request ID") << i->first;
1150     f->dump_int("oldest request on client", i->second);
1151     f->close_section(); // request
1152   }
1153   f->close_section(); // client requests
1154 }
1155
1156 void EMetaBlob::generate_test_instances(list<EMetaBlob*>& ls)
1157 {
1158   ls.push_back(new EMetaBlob());
1159 }
1160
1161 void EMetaBlob::replay(MDSRank *mds, LogSegment *logseg, MDSlaveUpdate *slaveup)
1162 {
1163   dout(10) << "EMetaBlob.replay " << lump_map.size() << " dirlumps by " << client_name << dendl;
1164
1165   assert(logseg);
1166
1167   assert(g_conf->mds_kill_journal_replay_at != 1);
1168
1169   for (list<ceph::shared_ptr<fullbit> >::iterator p = roots.begin(); p != roots.end(); ++p) {
1170     CInode *in = mds->mdcache->get_inode((*p)->inode.ino);
1171     bool isnew = in ? false:true;
1172     if (!in)
1173       in = new CInode(mds->mdcache, false);
1174     (*p)->update_inode(mds, in);
1175
1176     if (isnew)
1177       mds->mdcache->add_inode(in);
1178     if ((*p)->is_dirty()) in->_mark_dirty(logseg);
1179     dout(10) << "EMetaBlob.replay " << (isnew ? " added root ":" updated root ") << *in << dendl;    
1180   }
1181
1182   CInode *renamed_diri = 0;
1183   CDir *olddir = 0;
1184   if (renamed_dirino) {
1185     renamed_diri = mds->mdcache->get_inode(renamed_dirino);
1186     if (renamed_diri)
1187       dout(10) << "EMetaBlob.replay renamed inode is " << *renamed_diri << dendl;
1188     else
1189       dout(10) << "EMetaBlob.replay don't have renamed ino " << renamed_dirino << dendl;
1190
1191     int nnull = 0;
1192     for (list<dirfrag_t>::iterator lp = lump_order.begin(); lp != lump_order.end(); ++lp) {
1193       dirlump &lump = lump_map[*lp];
1194       if (lump.nnull) {
1195         dout(10) << "EMetaBlob.replay found null dentry in dir " << *lp << dendl;
1196         nnull += lump.nnull;
1197       }
1198     }
1199     assert(nnull <= 1);
1200   }
1201
1202   // keep track of any inodes we unlink and don't relink elsewhere
1203   map<CInode*, CDir*> unlinked;
1204   set<CInode*> linked;
1205
1206   // walk through my dirs (in order!)
1207   for (list<dirfrag_t>::iterator lp = lump_order.begin();
1208        lp != lump_order.end();
1209        ++lp) {
1210     dout(10) << "EMetaBlob.replay dir " << *lp << dendl;
1211     dirlump &lump = lump_map[*lp];
1212
1213     // the dir 
1214     CDir *dir = mds->mdcache->get_force_dirfrag(*lp, true);
1215     if (!dir) {
1216       // hmm.  do i have the inode?
1217       CInode *diri = mds->mdcache->get_inode((*lp).ino);
1218       if (!diri) {
1219         if (MDS_INO_IS_MDSDIR(lp->ino)) {
1220           assert(MDS_INO_MDSDIR(mds->get_nodeid()) != lp->ino);
1221           diri = mds->mdcache->create_system_inode(lp->ino, S_IFDIR|0755);
1222           diri->state_clear(CInode::STATE_AUTH);
1223           dout(10) << "EMetaBlob.replay created base " << *diri << dendl;
1224         } else {
1225           dout(0) << "EMetaBlob.replay missing dir ino  " << (*lp).ino << dendl;
1226           mds->clog->error() << "failure replaying journal (EMetaBlob)";
1227           mds->damaged();
1228           ceph_abort();  // Should be unreachable because damaged() calls respawn()
1229         }
1230       }
1231
1232       // create the dirfrag
1233       dir = diri->get_or_open_dirfrag(mds->mdcache, (*lp).frag);
1234
1235       if (MDS_INO_IS_BASE(lp->ino))
1236         mds->mdcache->adjust_subtree_auth(dir, CDIR_AUTH_UNDEF);
1237
1238       dout(10) << "EMetaBlob.replay added dir " << *dir << dendl;  
1239     }
1240     dir->set_version( lump.fnode.version );
1241     dir->fnode = lump.fnode;
1242
1243     if (lump.is_importing()) {
1244       dir->state_set(CDir::STATE_AUTH);
1245       dir->state_clear(CDir::STATE_COMPLETE);
1246     }
1247     if (lump.is_dirty()) {
1248       dir->_mark_dirty(logseg);
1249
1250       if (!(dir->fnode.rstat == dir->fnode.accounted_rstat)) {
1251         dout(10) << "EMetaBlob.replay      dirty nestinfo on " << *dir << dendl;
1252         mds->locker->mark_updated_scatterlock(&dir->inode->nestlock);
1253         logseg->dirty_dirfrag_nest.push_back(&dir->inode->item_dirty_dirfrag_nest);
1254       } else {
1255         dout(10) << "EMetaBlob.replay      clean nestinfo on " << *dir << dendl;
1256       }
1257       if (!(dir->fnode.fragstat == dir->fnode.accounted_fragstat)) {
1258         dout(10) << "EMetaBlob.replay      dirty fragstat on " << *dir << dendl;
1259         mds->locker->mark_updated_scatterlock(&dir->inode->filelock);
1260         logseg->dirty_dirfrag_dir.push_back(&dir->inode->item_dirty_dirfrag_dir);
1261       } else {
1262         dout(10) << "EMetaBlob.replay      clean fragstat on " << *dir << dendl;
1263       }
1264     }
1265     if (lump.is_dirty_dft()) {
1266       dout(10) << "EMetaBlob.replay      dirty dirfragtree on " << *dir << dendl;
1267       dir->state_set(CDir::STATE_DIRTYDFT);
1268       mds->locker->mark_updated_scatterlock(&dir->inode->dirfragtreelock);
1269       logseg->dirty_dirfrag_dirfragtree.push_back(&dir->inode->item_dirty_dirfrag_dirfragtree);
1270     }
1271     if (lump.is_new())
1272       dir->mark_new(logseg);
1273     if (lump.is_complete())
1274       dir->mark_complete();
1275     
1276     dout(10) << "EMetaBlob.replay updated dir " << *dir << dendl;  
1277
1278     // decode bits
1279     lump._decode_bits();
1280
1281     // full dentry+inode pairs
1282     for (list<ceph::shared_ptr<fullbit> >::const_iterator pp = lump.get_dfull().begin();
1283          pp != lump.get_dfull().end();
1284          ++pp) {
1285       ceph::shared_ptr<fullbit> p = *pp;
1286       CDentry *dn = dir->lookup_exact_snap(p->dn, p->dnlast);
1287       if (!dn) {
1288         dn = dir->add_null_dentry(p->dn, p->dnfirst, p->dnlast);
1289         dn->set_version(p->dnv);
1290         if (p->is_dirty()) dn->_mark_dirty(logseg);
1291         dout(10) << "EMetaBlob.replay added (full) " << *dn << dendl;
1292       } else {
1293         dn->set_version(p->dnv);
1294         if (p->is_dirty()) dn->_mark_dirty(logseg);
1295         dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *dn << dendl;
1296         dn->first = p->dnfirst;
1297         assert(dn->last == p->dnlast);
1298       }
1299       if (lump.is_importing())
1300         dn->state_set(CDentry::STATE_AUTH);
1301
1302       CInode *in = mds->mdcache->get_inode(p->inode.ino, p->dnlast);
1303       if (!in) {
1304         in = new CInode(mds->mdcache, dn->is_auth(), p->dnfirst, p->dnlast);
1305         p->update_inode(mds, in);
1306         mds->mdcache->add_inode(in);
1307         if (!dn->get_linkage()->is_null()) {
1308           if (dn->get_linkage()->is_primary()) {
1309             unlinked[dn->get_linkage()->get_inode()] = dir;
1310             stringstream ss;
1311             ss << "EMetaBlob.replay FIXME had dentry linked to wrong inode " << *dn
1312                << " " << *dn->get_linkage()->get_inode() << " should be " << p->inode.ino;
1313             dout(0) << ss.str() << dendl;
1314             mds->clog->warn(ss);
1315           }
1316           dir->unlink_inode(dn, false);
1317         }
1318         if (unlinked.count(in))
1319           linked.insert(in);
1320         dir->link_primary_inode(dn, in);
1321         dout(10) << "EMetaBlob.replay added " << *in << dendl;
1322       } else {
1323         in->first = p->dnfirst;
1324         p->update_inode(mds, in);
1325         if (dn->get_linkage()->get_inode() != in && in->get_parent_dn()) {
1326           dout(10) << "EMetaBlob.replay unlinking " << *in << dendl;
1327           unlinked[in] = in->get_parent_dir();
1328           in->get_parent_dir()->unlink_inode(in->get_parent_dn());
1329         }
1330         if (dn->get_linkage()->get_inode() != in) {
1331           if (!dn->get_linkage()->is_null()) { // note: might be remote.  as with stray reintegration.
1332             if (dn->get_linkage()->is_primary()) {
1333               unlinked[dn->get_linkage()->get_inode()] = dir;
1334               stringstream ss;
1335               ss << "EMetaBlob.replay FIXME had dentry linked to wrong inode " << *dn
1336                  << " " << *dn->get_linkage()->get_inode() << " should be " << p->inode.ino;
1337               dout(0) << ss.str() << dendl;
1338               mds->clog->warn(ss);
1339             }
1340             dir->unlink_inode(dn, false);
1341           }
1342           if (unlinked.count(in))
1343             linked.insert(in);
1344           dir->link_primary_inode(dn, in);
1345           dout(10) << "EMetaBlob.replay linked " << *in << dendl;
1346         } else {
1347           dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *in << dendl;
1348         }
1349         assert(in->first == p->dnfirst ||
1350                (in->is_multiversion() && in->first > p->dnfirst));
1351       }
1352       if (p->is_dirty())
1353         in->_mark_dirty(logseg);
1354       if (p->is_dirty_parent())
1355         in->_mark_dirty_parent(logseg, p->is_dirty_pool());
1356       if (p->need_snapflush())
1357         logseg->open_files.push_back(&in->item_open_file);
1358       if (dn->is_auth())
1359         in->state_set(CInode::STATE_AUTH);
1360       else
1361         in->state_clear(CInode::STATE_AUTH);
1362       assert(g_conf->mds_kill_journal_replay_at != 2);
1363     }
1364
1365     // remote dentries
1366     for (list<remotebit>::const_iterator p = lump.get_dremote().begin();
1367          p != lump.get_dremote().end();
1368          ++p) {
1369       CDentry *dn = dir->lookup_exact_snap(p->dn, p->dnlast);
1370       if (!dn) {
1371         dn = dir->add_remote_dentry(p->dn, p->ino, p->d_type, p->dnfirst, p->dnlast);
1372         dn->set_version(p->dnv);
1373         if (p->dirty) dn->_mark_dirty(logseg);
1374         dout(10) << "EMetaBlob.replay added " << *dn << dendl;
1375       } else {
1376         if (!dn->get_linkage()->is_null()) {
1377           dout(10) << "EMetaBlob.replay unlinking " << *dn << dendl;
1378           if (dn->get_linkage()->is_primary()) {
1379             unlinked[dn->get_linkage()->get_inode()] = dir;
1380             stringstream ss;
1381             ss << "EMetaBlob.replay FIXME had dentry linked to wrong inode " << *dn
1382                << " " << *dn->get_linkage()->get_inode() << " should be remote " << p->ino;
1383             dout(0) << ss.str() << dendl;
1384           }
1385           dir->unlink_inode(dn, false);
1386         }
1387         dir->link_remote_inode(dn, p->ino, p->d_type);
1388         dn->set_version(p->dnv);
1389         if (p->dirty) dn->_mark_dirty(logseg);
1390         dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *dn << dendl;
1391         dn->first = p->dnfirst;
1392         assert(dn->last == p->dnlast);
1393       }
1394       if (lump.is_importing())
1395         dn->state_set(CDentry::STATE_AUTH);
1396     }
1397
1398     // null dentries
1399     for (list<nullbit>::const_iterator p = lump.get_dnull().begin();
1400          p != lump.get_dnull().end();
1401          ++p) {
1402       CDentry *dn = dir->lookup_exact_snap(p->dn, p->dnlast);
1403       if (!dn) {
1404         dn = dir->add_null_dentry(p->dn, p->dnfirst, p->dnlast);
1405         dn->set_version(p->dnv);
1406         if (p->dirty) dn->_mark_dirty(logseg);
1407         dout(10) << "EMetaBlob.replay added (nullbit) " << *dn << dendl;
1408       } else {
1409         dn->first = p->dnfirst;
1410         if (!dn->get_linkage()->is_null()) {
1411           dout(10) << "EMetaBlob.replay unlinking " << *dn << dendl;
1412           CInode *in = dn->get_linkage()->get_inode();
1413           // For renamed inode, We may call CInode::force_dirfrag() later.
1414           // CInode::force_dirfrag() doesn't work well when inode is detached
1415           // from the hierarchy.
1416           if (!renamed_diri || renamed_diri != in) {
1417             if (dn->get_linkage()->is_primary())
1418               unlinked[in] = dir;
1419             dir->unlink_inode(dn);
1420           }
1421         }
1422         dn->set_version(p->dnv);
1423         if (p->dirty) dn->_mark_dirty(logseg);
1424         dout(10) << "EMetaBlob.replay had " << *dn << dendl;
1425         assert(dn->last == p->dnlast);
1426       }
1427       olddir = dir;
1428       if (lump.is_importing())
1429         dn->state_set(CDentry::STATE_AUTH);
1430
1431       // Make null dentries the first things we trim
1432       dout(10) << "EMetaBlob.replay pushing to bottom of lru " << *dn << dendl;
1433     }
1434   }
1435
1436   assert(g_conf->mds_kill_journal_replay_at != 3);
1437
1438   if (renamed_dirino) {
1439     if (renamed_diri) {
1440       assert(unlinked.count(renamed_diri));
1441       assert(linked.count(renamed_diri));
1442       olddir = unlinked[renamed_diri];
1443     } else {
1444       // we imported a diri we haven't seen before
1445       renamed_diri = mds->mdcache->get_inode(renamed_dirino);
1446       assert(renamed_diri);  // it was in the metablob
1447     }
1448
1449     if (olddir) {
1450       if (olddir->authority() != CDIR_AUTH_UNDEF &&
1451           renamed_diri->authority() == CDIR_AUTH_UNDEF) {
1452         assert(slaveup); // auth to non-auth, must be slave prepare
1453         list<frag_t> leaves;
1454         renamed_diri->dirfragtree.get_leaves(leaves);
1455         for (list<frag_t>::iterator p = leaves.begin(); p != leaves.end(); ++p) {
1456           CDir *dir = renamed_diri->get_dirfrag(*p);
1457           assert(dir);
1458           if (dir->get_dir_auth() == CDIR_AUTH_UNDEF)
1459             // preserve subtree bound until slave commit
1460             slaveup->olddirs.insert(dir->inode);
1461           else
1462             dir->state_set(CDir::STATE_AUTH);
1463         }
1464       }
1465
1466       mds->mdcache->adjust_subtree_after_rename(renamed_diri, olddir, false);
1467       
1468       // see if we can discard the subtree we renamed out of
1469       CDir *root = mds->mdcache->get_subtree_root(olddir);
1470       if (root->get_dir_auth() == CDIR_AUTH_UNDEF) {
1471         if (slaveup) // preserve the old dir until slave commit
1472           slaveup->olddirs.insert(olddir->inode);
1473         else
1474           mds->mdcache->try_trim_non_auth_subtree(root);
1475       }
1476     }
1477
1478     // if we are the srci importer, we'll also have some dirfrags we have to open up...
1479     if (renamed_diri->authority() != CDIR_AUTH_UNDEF) {
1480       for (list<frag_t>::iterator p = renamed_dir_frags.begin(); p != renamed_dir_frags.end(); ++p) {
1481         CDir *dir = renamed_diri->get_dirfrag(*p);
1482         if (dir) {
1483           // we already had the inode before, and we already adjusted this subtree accordingly.
1484           dout(10) << " already had+adjusted rename import bound " << *dir << dendl;
1485           assert(olddir); 
1486           continue;
1487         }
1488         dir = renamed_diri->get_or_open_dirfrag(mds->mdcache, *p);
1489         dout(10) << " creating new rename import bound " << *dir << dendl;
1490         dir->state_clear(CDir::STATE_AUTH);
1491         mds->mdcache->adjust_subtree_auth(dir, CDIR_AUTH_UNDEF);
1492       }
1493     }
1494
1495     // rename may overwrite an empty directory and move it into stray dir.
1496     unlinked.erase(renamed_diri);
1497     for (map<CInode*, CDir*>::iterator p = unlinked.begin(); p != unlinked.end(); ++p) {
1498       if (!linked.count(p->first))
1499         continue;
1500       assert(p->first->is_dir());
1501       mds->mdcache->adjust_subtree_after_rename(p->first, p->second, false);
1502     }
1503   }
1504
1505   if (!unlinked.empty()) {
1506     for (set<CInode*>::iterator p = linked.begin(); p != linked.end(); ++p)
1507       unlinked.erase(*p);
1508     dout(10) << " unlinked set contains " << unlinked << dendl;
1509     for (map<CInode*, CDir*>::iterator p = unlinked.begin(); p != unlinked.end(); ++p) {
1510       if (slaveup) // preserve unlinked inodes until slave commit
1511         slaveup->unlinked.insert(p->first);
1512       else
1513         mds->mdcache->remove_inode_recursive(p->first);
1514     }
1515   }
1516
1517   // table client transactions
1518   for (list<pair<__u8,version_t> >::iterator p = table_tids.begin();
1519        p != table_tids.end();
1520        ++p) {
1521     dout(10) << "EMetaBlob.replay noting " << get_mdstable_name(p->first)
1522              << " transaction " << p->second << dendl;
1523     MDSTableClient *client = mds->get_table_client(p->first);
1524     if (client)
1525       client->got_journaled_agree(p->second, logseg);
1526   }
1527
1528   // opened ino?
1529   if (opened_ino) {
1530     CInode *in = mds->mdcache->get_inode(opened_ino);
1531     assert(in);
1532     dout(10) << "EMetaBlob.replay noting opened inode " << *in << dendl;
1533     logseg->open_files.push_back(&in->item_open_file);
1534   }
1535
1536   // allocated_inos
1537   if (inotablev) {
1538     if (mds->inotable->get_version() >= inotablev) {
1539       dout(10) << "EMetaBlob.replay inotable tablev " << inotablev
1540                << " <= table " << mds->inotable->get_version() << dendl;
1541     } else {
1542       dout(10) << "EMetaBlob.replay inotable v " << inotablev
1543                << " - 1 == table " << mds->inotable->get_version()
1544                << " allocated+used " << allocated_ino
1545                << " prealloc " << preallocated_inos
1546                << dendl;
1547       if (allocated_ino)
1548         mds->inotable->replay_alloc_id(allocated_ino);
1549       if (preallocated_inos.size())
1550         mds->inotable->replay_alloc_ids(preallocated_inos);
1551
1552       // [repair bad inotable updates]
1553       if (inotablev > mds->inotable->get_version()) {
1554         mds->clog->error() << "journal replay inotablev mismatch "
1555             << mds->inotable->get_version() << " -> " << inotablev;
1556         mds->inotable->force_replay_version(inotablev);
1557       }
1558
1559       assert(inotablev == mds->inotable->get_version());
1560     }
1561   }
1562   if (sessionmapv) {
1563     if (mds->sessionmap.get_version() >= sessionmapv) {
1564       dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
1565                << " <= table " << mds->sessionmap.get_version() << dendl;
1566     } else if (mds->sessionmap.get_version() + 2 >= sessionmapv) {
1567       dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
1568                << " -(1|2) == table " << mds->sessionmap.get_version()
1569                << " prealloc " << preallocated_inos
1570                << " used " << used_preallocated_ino
1571                << dendl;
1572       Session *session = mds->sessionmap.get_session(client_name);
1573       if (session) {
1574         dout(20) << " (session prealloc " << session->info.prealloc_inos << ")" << dendl;
1575         if (used_preallocated_ino) {
1576           if (!session->info.prealloc_inos.empty()) {
1577             inodeno_t next = session->next_ino();
1578             inodeno_t i = session->take_ino(used_preallocated_ino);
1579             if (next != i)
1580               mds->clog->warn() << " replayed op " << client_reqs << " used ino " << i
1581                                << " but session next is " << next;
1582             assert(i == used_preallocated_ino);
1583             session->info.used_inos.clear();
1584           }
1585           mds->sessionmap.replay_dirty_session(session);
1586         }
1587         if (!preallocated_inos.empty()) {
1588           session->info.prealloc_inos.insert(preallocated_inos);
1589           mds->sessionmap.replay_dirty_session(session);
1590         }
1591
1592       } else {
1593         dout(10) << "EMetaBlob.replay no session for " << client_name << dendl;
1594         if (used_preallocated_ino) {
1595           mds->sessionmap.replay_advance_version();
1596         }
1597         if (!preallocated_inos.empty())
1598           mds->sessionmap.replay_advance_version();
1599       }
1600       assert(sessionmapv == mds->sessionmap.get_version());
1601     } else {
1602       mds->clog->error() << "journal replay sessionmap v " << sessionmapv
1603                         << " -(1|2) > table " << mds->sessionmap.get_version();
1604       assert(g_conf->mds_wipe_sessions);
1605       mds->sessionmap.wipe();
1606       mds->sessionmap.set_version(sessionmapv);
1607     }
1608   }
1609
1610   // truncating inodes
1611   for (list<inodeno_t>::iterator p = truncate_start.begin();
1612        p != truncate_start.end();
1613        ++p) {
1614     CInode *in = mds->mdcache->get_inode(*p);
1615     assert(in);
1616     mds->mdcache->add_recovered_truncate(in, logseg);
1617   }
1618   for (map<inodeno_t,uint64_t>::iterator p = truncate_finish.begin();
1619        p != truncate_finish.end();
1620        ++p) {
1621     LogSegment *ls = mds->mdlog->get_segment(p->second);
1622     if (ls) {
1623       CInode *in = mds->mdcache->get_inode(p->first);
1624       assert(in);
1625       mds->mdcache->remove_recovered_truncate(in, ls);
1626     }
1627   }
1628
1629   // destroyed inodes
1630   for (vector<inodeno_t>::iterator p = destroyed_inodes.begin();
1631        p != destroyed_inodes.end();
1632        ++p) {
1633     CInode *in = mds->mdcache->get_inode(*p);
1634     if (in) {
1635       dout(10) << "EMetaBlob.replay destroyed " << *p << ", dropping " << *in << dendl;
1636       CDentry *parent = in->get_parent_dn();
1637       mds->mdcache->remove_inode(in);
1638       if (parent) {
1639         dout(10) << "EMetaBlob.replay unlinked from dentry " << *parent << dendl;
1640         assert(parent->get_linkage()->is_null());
1641       }
1642     } else {
1643       dout(10) << "EMetaBlob.replay destroyed " << *p << ", not in cache" << dendl;
1644     }
1645   }
1646
1647   // client requests
1648   for (list<pair<metareqid_t, uint64_t> >::iterator p = client_reqs.begin();
1649        p != client_reqs.end();
1650        ++p) {
1651     if (p->first.name.is_client()) {
1652       dout(10) << "EMetaBlob.replay request " << p->first << " trim_to " << p->second << dendl;
1653       inodeno_t created = allocated_ino ? allocated_ino : used_preallocated_ino;
1654       // if we allocated an inode, there should be exactly one client request id.
1655       assert(created == inodeno_t() || client_reqs.size() == 1);
1656
1657       Session *session = mds->sessionmap.get_session(p->first.name);
1658       if (session) {
1659         session->add_completed_request(p->first.tid, created);
1660         if (p->second)
1661           session->trim_completed_requests(p->second);
1662       }
1663     }
1664   }
1665
1666   // client flushes
1667   for (list<pair<metareqid_t, uint64_t> >::iterator p = client_flushes.begin();
1668        p != client_flushes.end();
1669        ++p) {
1670     if (p->first.name.is_client()) {
1671       dout(10) << "EMetaBlob.replay flush " << p->first << " trim_to " << p->second << dendl;
1672       Session *session = mds->sessionmap.get_session(p->first.name);
1673       if (session) {
1674         session->add_completed_flush(p->first.tid);
1675         if (p->second)
1676           session->trim_completed_flushes(p->second);
1677       }
1678     }
1679   }
1680
1681   // update segment
1682   update_segment(logseg);
1683
1684   assert(g_conf->mds_kill_journal_replay_at != 4);
1685 }
1686
1687 // -----------------------
1688 // ESession
1689
1690 void ESession::update_segment()
1691 {
1692   _segment->sessionmapv = cmapv;
1693   if (inos.size() && inotablev)
1694     _segment->inotablev = inotablev;
1695 }
1696
1697 void ESession::replay(MDSRank *mds)
1698 {
1699   if (mds->sessionmap.get_version() >= cmapv) {
1700     dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version() 
1701              << " >= " << cmapv << ", noop" << dendl;
1702   } else {
1703     dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version()
1704              << " < " << cmapv << " " << (open ? "open":"close") << " " << client_inst << dendl;
1705     Session *session;
1706     if (open) {
1707       session = mds->sessionmap.get_or_add_session(client_inst);
1708       mds->sessionmap.set_state(session, Session::STATE_OPEN);
1709       session->set_client_metadata(client_metadata);
1710       dout(10) << " opened session " << session->info.inst << dendl;
1711     } else {
1712       session = mds->sessionmap.get_session(client_inst.name);
1713       if (session) { // there always should be a session, but there's a bug
1714         if (session->connection == NULL) {
1715           dout(10) << " removed session " << session->info.inst << dendl;
1716           mds->sessionmap.remove_session(session);
1717           session = NULL;
1718         } else {
1719           session->clear();    // the client has reconnected; keep the Session, but reset
1720           dout(10) << " reset session " << session->info.inst << " (they reconnected)" << dendl;
1721         }
1722       } else {
1723         mds->clog->error() << "replayed stray Session close event for " << client_inst
1724                           << " from time " << stamp << ", ignoring";
1725       }
1726     }
1727     if (session) {
1728       mds->sessionmap.replay_dirty_session(session);
1729     } else {
1730       mds->sessionmap.replay_advance_version();
1731     }
1732     assert(mds->sessionmap.get_version() == cmapv);
1733   }
1734   
1735   if (inos.size() && inotablev) {
1736     if (mds->inotable->get_version() >= inotablev) {
1737       dout(10) << "ESession.replay inotable " << mds->inotable->get_version()
1738                << " >= " << inotablev << ", noop" << dendl;
1739     } else {
1740       dout(10) << "ESession.replay inotable " << mds->inotable->get_version()
1741                << " < " << inotablev << " " << (open ? "add":"remove") << dendl;
1742       assert(!open);  // for now
1743       mds->inotable->replay_release_ids(inos);
1744       assert(mds->inotable->get_version() == inotablev);
1745     }
1746   }
1747
1748   update_segment();
1749 }
1750
1751 void ESession::encode(bufferlist &bl, uint64_t features) const
1752 {
1753   ENCODE_START(4, 3, bl);
1754   ::encode(stamp, bl);
1755   ::encode(client_inst, bl, features);
1756   ::encode(open, bl);
1757   ::encode(cmapv, bl);
1758   ::encode(inos, bl);
1759   ::encode(inotablev, bl);
1760   ::encode(client_metadata, bl);
1761   ENCODE_FINISH(bl);
1762 }
1763
1764 void ESession::decode(bufferlist::iterator &bl)
1765 {
1766   DECODE_START_LEGACY_COMPAT_LEN(4, 3, 3, bl);
1767   if (struct_v >= 2)
1768     ::decode(stamp, bl);
1769   ::decode(client_inst, bl);
1770   ::decode(open, bl);
1771   ::decode(cmapv, bl);
1772   ::decode(inos, bl);
1773   ::decode(inotablev, bl);
1774   if (struct_v >= 4) {
1775     ::decode(client_metadata, bl);
1776   }
1777   DECODE_FINISH(bl);
1778 }
1779
1780 void ESession::dump(Formatter *f) const
1781 {
1782   f->dump_stream("client instance") << client_inst;
1783   f->dump_string("open", open ? "true" : "false");
1784   f->dump_int("client map version", cmapv);
1785   f->dump_stream("inos") << inos;
1786   f->dump_int("inotable version", inotablev);
1787   f->open_object_section("client_metadata");
1788   for (map<string, string>::const_iterator i = client_metadata.begin();
1789       i != client_metadata.end(); ++i) {
1790     f->dump_string(i->first.c_str(), i->second);
1791   }
1792   f->close_section();  // client_metadata
1793 }
1794
1795 void ESession::generate_test_instances(list<ESession*>& ls)
1796 {
1797   ls.push_back(new ESession);
1798 }
1799
1800 // -----------------------
1801 // ESessions
1802
1803 void ESessions::encode(bufferlist &bl, uint64_t features) const
1804 {
1805   ENCODE_START(1, 1, bl);
1806   ::encode(client_map, bl, features);
1807   ::encode(cmapv, bl);
1808   ::encode(stamp, bl);
1809   ENCODE_FINISH(bl);
1810 }
1811
1812 void ESessions::decode_old(bufferlist::iterator &bl)
1813 {
1814   ::decode(client_map, bl);
1815   ::decode(cmapv, bl);
1816   if (!bl.end())
1817     ::decode(stamp, bl);
1818 }
1819
1820 void ESessions::decode_new(bufferlist::iterator &bl)
1821 {
1822   DECODE_START(1, bl);
1823   ::decode(client_map, bl);
1824   ::decode(cmapv, bl);
1825   if (!bl.end())
1826     ::decode(stamp, bl);
1827   DECODE_FINISH(bl);
1828 }
1829
1830 void ESessions::dump(Formatter *f) const
1831 {
1832   f->dump_int("client map version", cmapv);
1833
1834   f->open_array_section("client map");
1835   for (map<client_t,entity_inst_t>::const_iterator i = client_map.begin();
1836        i != client_map.end(); ++i) {
1837     f->open_object_section("client");
1838     f->dump_int("client id", i->first.v);
1839     f->dump_stream("client entity") << i->second;
1840     f->close_section(); // client
1841   }
1842   f->close_section(); // client map
1843 }
1844
1845 void ESessions::generate_test_instances(list<ESessions*>& ls)
1846 {
1847   ls.push_back(new ESessions());
1848 }
1849
1850 void ESessions::update_segment()
1851 {
1852   _segment->sessionmapv = cmapv;
1853 }
1854
1855 void ESessions::replay(MDSRank *mds)
1856 {
1857   if (mds->sessionmap.get_version() >= cmapv) {
1858     dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version()
1859              << " >= " << cmapv << ", noop" << dendl;
1860   } else {
1861     dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version()
1862              << " < " << cmapv << dendl;
1863     mds->sessionmap.open_sessions(client_map);
1864     assert(mds->sessionmap.get_version() == cmapv);
1865     mds->sessionmap.set_projected(mds->sessionmap.get_version());
1866   }
1867   update_segment();
1868 }
1869
1870
1871 // -----------------------
1872 // ETableServer
1873
1874 void ETableServer::encode(bufferlist& bl, uint64_t features) const
1875 {
1876   ENCODE_START(3, 3, bl);
1877   ::encode(stamp, bl);
1878   ::encode(table, bl);
1879   ::encode(op, bl);
1880   ::encode(reqid, bl);
1881   ::encode(bymds, bl);
1882   ::encode(mutation, bl);
1883   ::encode(tid, bl);
1884   ::encode(version, bl);
1885   ENCODE_FINISH(bl);
1886 }
1887
1888 void ETableServer::decode(bufferlist::iterator &bl)
1889 {
1890   DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
1891   if (struct_v >= 2)
1892     ::decode(stamp, bl);
1893   ::decode(table, bl);
1894   ::decode(op, bl);
1895   ::decode(reqid, bl);
1896   ::decode(bymds, bl);
1897   ::decode(mutation, bl);
1898   ::decode(tid, bl);
1899   ::decode(version, bl);
1900   DECODE_FINISH(bl);
1901 }
1902
1903 void ETableServer::dump(Formatter *f) const
1904 {
1905   f->dump_int("table id", table);
1906   f->dump_int("op", op);
1907   f->dump_int("request id", reqid);
1908   f->dump_int("by mds", bymds);
1909   f->dump_int("tid", tid);
1910   f->dump_int("version", version);
1911 }
1912
1913 void ETableServer::generate_test_instances(list<ETableServer*>& ls)
1914 {
1915   ls.push_back(new ETableServer());
1916 }
1917
1918
1919 void ETableServer::update_segment()
1920 {
1921   _segment->tablev[table] = version;
1922 }
1923
1924 void ETableServer::replay(MDSRank *mds)
1925 {
1926   MDSTableServer *server = mds->get_table_server(table);
1927   if (!server)
1928     return;
1929
1930   if (server->get_version() >= version) {
1931     dout(10) << "ETableServer.replay " << get_mdstable_name(table)
1932              << " " << get_mdstableserver_opname(op)
1933              << " event " << version
1934              << " <= table " << server->get_version() << dendl;
1935     return;
1936   }
1937   
1938   dout(10) << " ETableServer.replay " << get_mdstable_name(table)
1939            << " " << get_mdstableserver_opname(op)
1940            << " event " << version << " - 1 == table " << server->get_version() << dendl;
1941   assert(version-1 == server->get_version());
1942
1943   switch (op) {
1944   case TABLESERVER_OP_PREPARE:
1945     server->_prepare(mutation, reqid, bymds);
1946     server->_note_prepare(bymds, reqid);
1947     break;
1948   case TABLESERVER_OP_COMMIT:
1949     server->_commit(tid);
1950     server->_note_commit(tid);
1951     break;
1952   case TABLESERVER_OP_ROLLBACK:
1953     server->_rollback(tid);
1954     server->_note_rollback(tid);
1955     break;
1956   case TABLESERVER_OP_SERVER_UPDATE:
1957     server->_server_update(mutation);
1958     break;
1959   default:
1960     mds->clog->error() << "invalid tableserver op in ETableServer";
1961     mds->damaged();
1962     ceph_abort();  // Should be unreachable because damaged() calls respawn()
1963   }
1964   
1965   assert(version == server->get_version());
1966   update_segment();
1967 }
1968
1969
1970 // ---------------------
1971 // ETableClient
1972
1973 void ETableClient::encode(bufferlist& bl, uint64_t features) const
1974 {
1975   ENCODE_START(3, 3, bl);
1976   ::encode(stamp, bl);
1977   ::encode(table, bl);
1978   ::encode(op, bl);
1979   ::encode(tid, bl);
1980   ENCODE_FINISH(bl);
1981 }
1982
1983 void ETableClient::decode(bufferlist::iterator &bl)
1984 {
1985   DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
1986   if (struct_v >= 2)
1987     ::decode(stamp, bl);
1988   ::decode(table, bl);
1989   ::decode(op, bl);
1990   ::decode(tid, bl);
1991   DECODE_FINISH(bl);
1992 }
1993
1994 void ETableClient::dump(Formatter *f) const
1995 {
1996   f->dump_int("table", table);
1997   f->dump_int("op", op);
1998   f->dump_int("tid", tid);
1999 }
2000
2001 void ETableClient::generate_test_instances(list<ETableClient*>& ls)
2002 {
2003   ls.push_back(new ETableClient());
2004 }
2005
2006 void ETableClient::replay(MDSRank *mds)
2007 {
2008   dout(10) << " ETableClient.replay " << get_mdstable_name(table)
2009            << " op " << get_mdstableserver_opname(op)
2010            << " tid " << tid << dendl;
2011     
2012   MDSTableClient *client = mds->get_table_client(table);
2013   if (!client)
2014     return;
2015
2016   assert(op == TABLESERVER_OP_ACK);
2017   client->got_journaled_ack(tid);
2018 }
2019
2020
2021 // -----------------------
2022 // ESnap
2023 /*
2024 void ESnap::update_segment()
2025 {
2026   _segment->tablev[TABLE_SNAP] = version;
2027 }
2028
2029 void ESnap::replay(MDSRank *mds)
2030 {
2031   if (mds->snaptable->get_version() >= version) {
2032     dout(10) << "ESnap.replay event " << version
2033              << " <= table " << mds->snaptable->get_version() << dendl;
2034     return;
2035   } 
2036   
2037   dout(10) << " ESnap.replay event " << version
2038            << " - 1 == table " << mds->snaptable->get_version() << dendl;
2039   assert(version-1 == mds->snaptable->get_version());
2040
2041   if (create) {
2042     version_t v;
2043     snapid_t s = mds->snaptable->create(snap.dirino, snap.name, snap.stamp, &v);
2044     assert(s == snap.snapid);
2045   } else {
2046     mds->snaptable->remove(snap.snapid);
2047   }
2048
2049   assert(version == mds->snaptable->get_version());
2050 }
2051 */
2052
2053
2054
2055 // -----------------------
2056 // EUpdate
2057
2058 void EUpdate::encode(bufferlist &bl, uint64_t features) const
2059 {
2060   ENCODE_START(4, 4, bl);
2061   ::encode(stamp, bl);
2062   ::encode(type, bl);
2063   ::encode(metablob, bl, features);
2064   ::encode(client_map, bl);
2065   ::encode(cmapv, bl);
2066   ::encode(reqid, bl);
2067   ::encode(had_slaves, bl);
2068   ENCODE_FINISH(bl);
2069 }
2070  
2071 void EUpdate::decode(bufferlist::iterator &bl)
2072 {
2073   DECODE_START_LEGACY_COMPAT_LEN(4, 4, 4, bl);
2074   if (struct_v >= 2)
2075     ::decode(stamp, bl);
2076   ::decode(type, bl);
2077   ::decode(metablob, bl);
2078   ::decode(client_map, bl);
2079   if (struct_v >= 3)
2080     ::decode(cmapv, bl);
2081   ::decode(reqid, bl);
2082   ::decode(had_slaves, bl);
2083   DECODE_FINISH(bl);
2084 }
2085
2086 void EUpdate::dump(Formatter *f) const
2087 {
2088   f->open_object_section("metablob");
2089   metablob.dump(f);
2090   f->close_section(); // metablob
2091
2092   f->dump_string("type", type);
2093   f->dump_int("client map length", client_map.length());
2094   f->dump_int("client map version", cmapv);
2095   f->dump_stream("reqid") << reqid;
2096   f->dump_string("had slaves", had_slaves ? "true" : "false");
2097 }
2098
2099 void EUpdate::generate_test_instances(list<EUpdate*>& ls)
2100 {
2101   ls.push_back(new EUpdate());
2102 }
2103
2104
2105 void EUpdate::update_segment()
2106 {
2107   metablob.update_segment(_segment);
2108
2109   if (client_map.length())
2110     _segment->sessionmapv = cmapv;
2111
2112   if (had_slaves)
2113     _segment->uncommitted_masters.insert(reqid);
2114 }
2115
2116 void EUpdate::replay(MDSRank *mds)
2117 {
2118   metablob.replay(mds, _segment);
2119   
2120   if (had_slaves) {
2121     dout(10) << "EUpdate.replay " << reqid << " had slaves, expecting a matching ECommitted" << dendl;
2122     _segment->uncommitted_masters.insert(reqid);
2123     set<mds_rank_t> slaves;
2124     mds->mdcache->add_uncommitted_master(reqid, _segment, slaves, true);
2125   }
2126   
2127   if (client_map.length()) {
2128     if (mds->sessionmap.get_version() >= cmapv) {
2129       dout(10) << "EUpdate.replay sessionmap v " << cmapv
2130                << " <= table " << mds->sessionmap.get_version() << dendl;
2131     } else {
2132       dout(10) << "EUpdate.replay sessionmap " << mds->sessionmap.get_version()
2133                << " < " << cmapv << dendl;
2134       // open client sessions?
2135       map<client_t,entity_inst_t> cm;
2136       bufferlist::iterator blp = client_map.begin();
2137       ::decode(cm, blp);
2138       mds->sessionmap.open_sessions(cm);
2139
2140       assert(mds->sessionmap.get_version() == cmapv);
2141       mds->sessionmap.set_projected(mds->sessionmap.get_version());
2142     }
2143   }
2144   update_segment();
2145 }
2146
2147
2148 // ------------------------
2149 // EOpen
2150
2151 void EOpen::encode(bufferlist &bl, uint64_t features) const {
2152   ENCODE_START(4, 3, bl);
2153   ::encode(stamp, bl);
2154   ::encode(metablob, bl, features);
2155   ::encode(inos, bl);
2156   ::encode(snap_inos, bl);
2157   ENCODE_FINISH(bl);
2158
2159
2160 void EOpen::decode(bufferlist::iterator &bl) {
2161   DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
2162   if (struct_v >= 2)
2163     ::decode(stamp, bl);
2164   ::decode(metablob, bl);
2165   ::decode(inos, bl);
2166   if (struct_v >= 4)
2167     ::decode(snap_inos, bl);
2168   DECODE_FINISH(bl);
2169 }
2170
2171 void EOpen::dump(Formatter *f) const
2172 {
2173   f->open_object_section("metablob");
2174   metablob.dump(f);
2175   f->close_section(); // metablob
2176   f->open_array_section("inos involved");
2177   for (vector<inodeno_t>::const_iterator i = inos.begin();
2178        i != inos.end(); ++i) {
2179     f->dump_int("ino", *i);
2180   }
2181   f->close_section(); // inos
2182 }
2183
2184 void EOpen::generate_test_instances(list<EOpen*>& ls)
2185 {
2186   ls.push_back(new EOpen());
2187   ls.push_back(new EOpen());
2188   ls.back()->add_ino(0);
2189 }
2190
2191 void EOpen::update_segment()
2192 {
2193   // ??
2194 }
2195
2196 void EOpen::replay(MDSRank *mds)
2197 {
2198   dout(10) << "EOpen.replay " << dendl;
2199   metablob.replay(mds, _segment);
2200
2201   // note which segments inodes belong to, so we don't have to start rejournaling them
2202   for (const auto &ino : inos) {
2203     CInode *in = mds->mdcache->get_inode(ino);
2204     if (!in) {
2205       dout(0) << "EOpen.replay ino " << ino << " not in metablob" << dendl;
2206       assert(in);
2207     }
2208     _segment->open_files.push_back(&in->item_open_file);
2209   }
2210   for (const auto &vino : snap_inos) {
2211     CInode *in = mds->mdcache->get_inode(vino);
2212     if (!in) {
2213       dout(0) << "EOpen.replay ino " << vino << " not in metablob" << dendl;
2214       assert(in);
2215     }
2216     _segment->open_files.push_back(&in->item_open_file);
2217   }
2218 }
2219
2220
2221 // -----------------------
2222 // ECommitted
2223
2224 void ECommitted::replay(MDSRank *mds)
2225 {
2226   if (mds->mdcache->uncommitted_masters.count(reqid)) {
2227     dout(10) << "ECommitted.replay " << reqid << dendl;
2228     mds->mdcache->uncommitted_masters[reqid].ls->uncommitted_masters.erase(reqid);
2229     mds->mdcache->uncommitted_masters.erase(reqid);
2230   } else {
2231     dout(10) << "ECommitted.replay " << reqid << " -- didn't see original op" << dendl;
2232   }
2233 }
2234
2235 void ECommitted::encode(bufferlist& bl, uint64_t features) const
2236 {
2237   ENCODE_START(3, 3, bl);
2238   ::encode(stamp, bl);
2239   ::encode(reqid, bl);
2240   ENCODE_FINISH(bl);
2241
2242
2243 void ECommitted::decode(bufferlist::iterator& bl)
2244 {
2245   DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
2246   if (struct_v >= 2)
2247     ::decode(stamp, bl);
2248   ::decode(reqid, bl);
2249   DECODE_FINISH(bl);
2250 }
2251
2252 void ECommitted::dump(Formatter *f) const {
2253   f->dump_stream("stamp") << stamp;
2254   f->dump_stream("reqid") << reqid;
2255 }
2256
2257 void ECommitted::generate_test_instances(list<ECommitted*>& ls)
2258 {
2259   ls.push_back(new ECommitted);
2260   ls.push_back(new ECommitted);
2261   ls.back()->stamp = utime_t(1, 2);
2262   ls.back()->reqid = metareqid_t(entity_name_t::CLIENT(123), 456);
2263 }
2264
2265 // -----------------------
2266 // ESlaveUpdate
2267
2268 void link_rollback::encode(bufferlist &bl) const
2269 {
2270   ENCODE_START(2, 2, bl);
2271   ::encode(reqid, bl);
2272   ::encode(ino, bl);
2273   ::encode(was_inc, bl);
2274   ::encode(old_ctime, bl);
2275   ::encode(old_dir_mtime, bl);
2276   ::encode(old_dir_rctime, bl);
2277   ENCODE_FINISH(bl);
2278 }
2279
2280 void link_rollback::decode(bufferlist::iterator &bl)
2281 {
2282   DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
2283   ::decode(reqid, bl);
2284   ::decode(ino, bl);
2285   ::decode(was_inc, bl);
2286   ::decode(old_ctime, bl);
2287   ::decode(old_dir_mtime, bl);
2288   ::decode(old_dir_rctime, bl);
2289   DECODE_FINISH(bl);
2290 }
2291
2292 void link_rollback::dump(Formatter *f) const
2293 {
2294   f->dump_stream("metareqid") << reqid;
2295   f->dump_int("ino", ino);
2296   f->dump_string("was incremented", was_inc ? "true" : "false");
2297   f->dump_stream("old_ctime") << old_ctime;
2298   f->dump_stream("old_dir_mtime") << old_dir_mtime;
2299   f->dump_stream("old_dir_rctime") << old_dir_rctime;
2300 }
2301
2302 void link_rollback::generate_test_instances(list<link_rollback*>& ls)
2303 {
2304   ls.push_back(new link_rollback());
2305 }
2306
2307 void rmdir_rollback::encode(bufferlist& bl) const
2308 {
2309   ENCODE_START(2, 2, bl);
2310   ::encode(reqid, bl);
2311   ::encode(src_dir, bl);
2312   ::encode(src_dname, bl);
2313   ::encode(dest_dir, bl);
2314   ::encode(dest_dname, bl);
2315   ENCODE_FINISH(bl);
2316 }
2317
2318 void rmdir_rollback::decode(bufferlist::iterator& bl)
2319 {
2320   DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
2321   ::decode(reqid, bl);
2322   ::decode(src_dir, bl);
2323   ::decode(src_dname, bl);
2324   ::decode(dest_dir, bl);
2325   ::decode(dest_dname, bl);
2326   DECODE_FINISH(bl);
2327 }
2328
2329 void rmdir_rollback::dump(Formatter *f) const
2330 {
2331   f->dump_stream("metareqid") << reqid;
2332   f->dump_stream("source directory") << src_dir;
2333   f->dump_string("source dname", src_dname);
2334   f->dump_stream("destination directory") << dest_dir;
2335   f->dump_string("destination dname", dest_dname);
2336 }
2337
2338 void rmdir_rollback::generate_test_instances(list<rmdir_rollback*>& ls)
2339 {
2340   ls.push_back(new rmdir_rollback());
2341 }
2342
2343 void rename_rollback::drec::encode(bufferlist &bl) const
2344 {
2345   ENCODE_START(2, 2, bl);
2346   ::encode(dirfrag, bl);
2347   ::encode(dirfrag_old_mtime, bl);
2348   ::encode(dirfrag_old_rctime, bl);
2349   ::encode(ino, bl);
2350   ::encode(remote_ino, bl);
2351   ::encode(dname, bl);
2352   ::encode(remote_d_type, bl);
2353   ::encode(old_ctime, bl);
2354   ENCODE_FINISH(bl);
2355 }
2356
2357 void rename_rollback::drec::decode(bufferlist::iterator &bl)
2358 {
2359   DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
2360   ::decode(dirfrag, bl);
2361   ::decode(dirfrag_old_mtime, bl);
2362   ::decode(dirfrag_old_rctime, bl);
2363   ::decode(ino, bl);
2364   ::decode(remote_ino, bl);
2365   ::decode(dname, bl);
2366   ::decode(remote_d_type, bl);
2367   ::decode(old_ctime, bl);
2368   DECODE_FINISH(bl);
2369 }
2370
2371 void rename_rollback::drec::dump(Formatter *f) const
2372 {
2373   f->dump_stream("directory fragment") << dirfrag;
2374   f->dump_stream("directory old mtime") << dirfrag_old_mtime;
2375   f->dump_stream("directory old rctime") << dirfrag_old_rctime;
2376   f->dump_int("ino", ino);
2377   f->dump_int("remote ino", remote_ino);
2378   f->dump_string("dname", dname);
2379   uint32_t type = DTTOIF(remote_d_type) & S_IFMT; // convert to type entries
2380   string type_string;
2381   switch(type) {
2382   case S_IFREG:
2383     type_string = "file"; break;
2384   case S_IFLNK:
2385     type_string = "symlink"; break;
2386   case S_IFDIR:
2387     type_string = "directory"; break;
2388   default:
2389     type_string = "UNKNOWN-" + stringify((int)type); break;
2390   }
2391   f->dump_string("remote dtype", type_string);
2392   f->dump_stream("old ctime") << old_ctime;
2393 }
2394
2395 void rename_rollback::drec::generate_test_instances(list<drec*>& ls)
2396 {
2397   ls.push_back(new drec());
2398   ls.back()->remote_d_type = IFTODT(S_IFREG);
2399 }
2400
2401 void rename_rollback::encode(bufferlist &bl) const
2402 {
2403   ENCODE_START(2, 2, bl);
2404   ::encode(reqid, bl);
2405   encode(orig_src, bl);
2406   encode(orig_dest, bl);
2407   encode(stray, bl);
2408   ::encode(ctime, bl);
2409   ENCODE_FINISH(bl);
2410 }
2411
2412 void rename_rollback::decode(bufferlist::iterator &bl)
2413 {
2414   DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
2415   ::decode(reqid, bl);
2416   decode(orig_src, bl);
2417   decode(orig_dest, bl);
2418   decode(stray, bl);
2419   ::decode(ctime, bl);
2420   DECODE_FINISH(bl);
2421 }
2422
2423 void rename_rollback::dump(Formatter *f) const
2424 {
2425   f->dump_stream("request id") << reqid;
2426   f->open_object_section("original src drec");
2427   orig_src.dump(f);
2428   f->close_section(); // original src drec
2429   f->open_object_section("original dest drec");
2430   orig_dest.dump(f);
2431   f->close_section(); // original dest drec
2432   f->open_object_section("stray drec");
2433   stray.dump(f);
2434   f->close_section(); // stray drec
2435   f->dump_stream("ctime") << ctime;
2436 }
2437
2438 void rename_rollback::generate_test_instances(list<rename_rollback*>& ls)
2439 {
2440   ls.push_back(new rename_rollback());
2441   ls.back()->orig_src.remote_d_type = IFTODT(S_IFREG);
2442   ls.back()->orig_dest.remote_d_type = IFTODT(S_IFREG);
2443   ls.back()->stray.remote_d_type = IFTODT(S_IFREG);
2444 }
2445
2446 void ESlaveUpdate::encode(bufferlist &bl, uint64_t features) const
2447 {
2448   ENCODE_START(3, 3, bl);
2449   ::encode(stamp, bl);
2450   ::encode(type, bl);
2451   ::encode(reqid, bl);
2452   ::encode(master, bl);
2453   ::encode(op, bl);
2454   ::encode(origop, bl);
2455   ::encode(commit, bl, features);
2456   ::encode(rollback, bl);
2457   ENCODE_FINISH(bl);
2458
2459
2460 void ESlaveUpdate::decode(bufferlist::iterator &bl)
2461 {
2462   DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
2463   if (struct_v >= 2)
2464     ::decode(stamp, bl);
2465   ::decode(type, bl);
2466   ::decode(reqid, bl);
2467   ::decode(master, bl);
2468   ::decode(op, bl);
2469   ::decode(origop, bl);
2470   ::decode(commit, bl);
2471   ::decode(rollback, bl);
2472   DECODE_FINISH(bl);
2473 }
2474
2475 void ESlaveUpdate::dump(Formatter *f) const
2476 {
2477   f->open_object_section("metablob");
2478   commit.dump(f);
2479   f->close_section(); // metablob
2480
2481   f->dump_int("rollback length", rollback.length());
2482   f->dump_string("type", type);
2483   f->dump_stream("metareqid") << reqid;
2484   f->dump_int("master", master);
2485   f->dump_int("op", op);
2486   f->dump_int("original op", origop);
2487 }
2488
2489 void ESlaveUpdate::generate_test_instances(list<ESlaveUpdate*>& ls)
2490 {
2491   ls.push_back(new ESlaveUpdate());
2492 }
2493
2494
2495 void ESlaveUpdate::replay(MDSRank *mds)
2496 {
2497   MDSlaveUpdate *su;
2498   switch (op) {
2499   case ESlaveUpdate::OP_PREPARE:
2500     dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds." << master 
2501              << ": applying commit, saving rollback info" << dendl;
2502     su = new MDSlaveUpdate(origop, rollback, _segment->slave_updates);
2503     commit.replay(mds, _segment, su);
2504     mds->mdcache->add_uncommitted_slave_update(reqid, master, su);
2505     break;
2506
2507   case ESlaveUpdate::OP_COMMIT:
2508     su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
2509     if (su) {
2510       dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master << dendl;
2511       mds->mdcache->finish_uncommitted_slave_update(reqid, master);
2512     } else {
2513       dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master 
2514                << ": ignoring, no previously saved prepare" << dendl;
2515     }
2516     break;
2517
2518   case ESlaveUpdate::OP_ROLLBACK:
2519     dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds." << master
2520              << ": applying rollback commit blob" << dendl;
2521     commit.replay(mds, _segment);
2522     su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
2523     if (su)
2524       mds->mdcache->finish_uncommitted_slave_update(reqid, master);
2525     break;
2526
2527   default:
2528     mds->clog->error() << "invalid op in ESlaveUpdate";
2529     mds->damaged();
2530     ceph_abort();  // Should be unreachable because damaged() calls respawn()
2531   }
2532 }
2533
2534
2535 // -----------------------
2536 // ESubtreeMap
2537
2538 void ESubtreeMap::encode(bufferlist& bl, uint64_t features) const
2539 {
2540   ENCODE_START(6, 5, bl);
2541   ::encode(stamp, bl);
2542   ::encode(metablob, bl, features);
2543   ::encode(subtrees, bl);
2544   ::encode(ambiguous_subtrees, bl);
2545   ::encode(expire_pos, bl);
2546   ::encode(event_seq, bl);
2547   ENCODE_FINISH(bl);
2548 }
2549  
2550 void ESubtreeMap::decode(bufferlist::iterator &bl)
2551 {
2552   DECODE_START_LEGACY_COMPAT_LEN(6, 5, 5, bl);
2553   if (struct_v >= 2)
2554     ::decode(stamp, bl);
2555   ::decode(metablob, bl);
2556   ::decode(subtrees, bl);
2557   if (struct_v >= 4)
2558     ::decode(ambiguous_subtrees, bl);
2559   if (struct_v >= 3)
2560     ::decode(expire_pos, bl);
2561   if (struct_v >= 6)
2562     ::decode(event_seq, bl);
2563   DECODE_FINISH(bl);
2564 }
2565
2566 void ESubtreeMap::dump(Formatter *f) const
2567 {
2568   f->open_object_section("metablob");
2569   metablob.dump(f);
2570   f->close_section(); // metablob
2571   
2572   f->open_array_section("subtrees");
2573   for(map<dirfrag_t,vector<dirfrag_t> >::const_iterator i = subtrees.begin();
2574       i != subtrees.end(); ++i) {
2575     f->open_object_section("tree");
2576     f->dump_stream("root dirfrag") << i->first;
2577     for (vector<dirfrag_t>::const_iterator j = i->second.begin();
2578          j != i->second.end(); ++j) {
2579       f->dump_stream("bound dirfrag") << *j;
2580     }
2581     f->close_section(); // tree
2582   }
2583   f->close_section(); // subtrees
2584
2585   f->open_array_section("ambiguous subtrees");
2586   for(set<dirfrag_t>::const_iterator i = ambiguous_subtrees.begin();
2587       i != ambiguous_subtrees.end(); ++i) {
2588     f->dump_stream("dirfrag") << *i;
2589   }
2590   f->close_section(); // ambiguous subtrees
2591
2592   f->dump_int("expire position", expire_pos);
2593 }
2594
2595 void ESubtreeMap::generate_test_instances(list<ESubtreeMap*>& ls)
2596 {
2597   ls.push_back(new ESubtreeMap());
2598 }
2599
2600 void ESubtreeMap::replay(MDSRank *mds) 
2601 {
2602   if (expire_pos && expire_pos > mds->mdlog->journaler->get_expire_pos())
2603     mds->mdlog->journaler->set_expire_pos(expire_pos);
2604
2605   // suck up the subtree map?
2606   if (mds->mdcache->is_subtrees()) {
2607     dout(10) << "ESubtreeMap.replay -- i already have import map; verifying" << dendl;
2608     int errors = 0;
2609
2610     for (map<dirfrag_t, vector<dirfrag_t> >::iterator p = subtrees.begin();
2611          p != subtrees.end();
2612          ++p) {
2613       CDir *dir = mds->mdcache->get_dirfrag(p->first);
2614       if (!dir) {
2615         mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2616                           << " subtree root " << p->first << " not in cache";
2617         ++errors;
2618         continue;
2619       }
2620       
2621       if (!mds->mdcache->is_subtree(dir)) {
2622         mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2623                           << " subtree root " << p->first << " not a subtree in cache";
2624         ++errors;
2625         continue;
2626       }
2627       if (dir->get_dir_auth().first != mds->get_nodeid()) {
2628         mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2629                           << " subtree root " << p->first
2630                           << " is not mine in cache (it's " << dir->get_dir_auth() << ")";
2631         ++errors;
2632         continue;
2633       }
2634
2635       for (vector<dirfrag_t>::iterator q = p->second.begin(); q != p->second.end(); ++q)
2636         mds->mdcache->get_force_dirfrag(*q, true);
2637
2638       set<CDir*> bounds;
2639       mds->mdcache->get_subtree_bounds(dir, bounds);
2640       for (vector<dirfrag_t>::iterator q = p->second.begin(); q != p->second.end(); ++q) {
2641         CDir *b = mds->mdcache->get_dirfrag(*q);
2642         if (!b) {
2643           mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2644                             << " subtree " << p->first << " bound " << *q << " not in cache";
2645         ++errors;
2646           continue;
2647         }
2648         if (bounds.count(b) == 0) {
2649           mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2650                             << " subtree " << p->first << " bound " << *q << " not a bound in cache";
2651         ++errors;
2652           continue;
2653         }
2654         bounds.erase(b);
2655       }
2656       for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q) {
2657         mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2658                           << " subtree " << p->first << " has extra bound in cache " << (*q)->dirfrag();
2659         ++errors;
2660       }
2661       
2662       if (ambiguous_subtrees.count(p->first)) {
2663         if (!mds->mdcache->have_ambiguous_import(p->first)) {
2664           mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2665                             << " subtree " << p->first << " is ambiguous but is not in our cache";
2666           ++errors;
2667         }
2668       } else {
2669         if (mds->mdcache->have_ambiguous_import(p->first)) {
2670           mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2671                             << " subtree " << p->first << " is not ambiguous but is in our cache";
2672           ++errors;
2673         }
2674       }
2675     }
2676     
2677     list<CDir*> subs;
2678     mds->mdcache->list_subtrees(subs);
2679     for (list<CDir*>::iterator p = subs.begin(); p != subs.end(); ++p) {
2680       CDir *dir = *p;
2681       if (dir->get_dir_auth().first != mds->get_nodeid())
2682         continue;
2683       if (subtrees.count(dir->dirfrag()) == 0) {
2684         mds->clog->error() << " replayed ESubtreeMap at " << get_start_off()
2685                           << " does not include cache subtree " << dir->dirfrag();
2686         ++errors;
2687       }
2688     }
2689
2690     if (errors) {
2691       dout(0) << "journal subtrees: " << subtrees << dendl;
2692       dout(0) << "journal ambig_subtrees: " << ambiguous_subtrees << dendl;
2693       mds->mdcache->show_subtrees();
2694       assert(!g_conf->mds_debug_subtrees || errors == 0);
2695     }
2696     return;
2697   }
2698
2699   dout(10) << "ESubtreeMap.replay -- reconstructing (auth) subtree spanning tree" << dendl;
2700   
2701   // first, stick the spanning tree in my cache
2702   //metablob.print(*_dout);
2703   metablob.replay(mds, _segment);
2704   
2705   // restore import/export maps
2706   for (map<dirfrag_t, vector<dirfrag_t> >::iterator p = subtrees.begin();
2707        p != subtrees.end();
2708        ++p) {
2709     CDir *dir = mds->mdcache->get_dirfrag(p->first);
2710     assert(dir);
2711     if (ambiguous_subtrees.count(p->first)) {
2712       // ambiguous!
2713       mds->mdcache->add_ambiguous_import(p->first, p->second);
2714       mds->mdcache->adjust_bounded_subtree_auth(dir, p->second,
2715                                                 mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
2716     } else {
2717       // not ambiguous
2718       mds->mdcache->adjust_bounded_subtree_auth(dir, p->second, mds->get_nodeid());
2719     }
2720   }
2721
2722   mds->mdcache->recalc_auth_bits(true);
2723
2724   mds->mdcache->show_subtrees();
2725 }
2726
2727
2728
2729 // -----------------------
2730 // EFragment
2731
2732 void EFragment::replay(MDSRank *mds)
2733 {
2734   dout(10) << "EFragment.replay " << op_name(op) << " " << ino << " " << basefrag << " by " << bits << dendl;
2735
2736   list<CDir*> resultfrags;
2737   list<MDSInternalContextBase*> waiters;
2738   list<frag_t> old_frags;
2739
2740   // in may be NULL if it wasn't in our cache yet.  if it's a prepare
2741   // it will be once we replay the metablob , but first we need to
2742   // refragment anything we already have in the cache.
2743   CInode *in = mds->mdcache->get_inode(ino);
2744
2745   switch (op) {
2746   case OP_PREPARE:
2747     mds->mdcache->add_uncommitted_fragment(dirfrag_t(ino, basefrag), bits, orig_frags, _segment, &rollback);
2748
2749     if (in)
2750       mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true);
2751     break;
2752
2753   case OP_ROLLBACK:
2754     if (in) {
2755       in->dirfragtree.get_leaves_under(basefrag, old_frags);
2756       if (orig_frags.empty()) {
2757         // old format EFragment
2758         mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true);
2759       } else {
2760         for (list<frag_t>::iterator p = orig_frags.begin(); p != orig_frags.end(); ++p)
2761           mds->mdcache->force_dir_fragment(in, *p);
2762       }
2763     }
2764     mds->mdcache->rollback_uncommitted_fragment(dirfrag_t(ino, basefrag), old_frags);
2765     break;
2766
2767   case OP_COMMIT:
2768   case OP_FINISH:
2769     mds->mdcache->finish_uncommitted_fragment(dirfrag_t(ino, basefrag), op);
2770     break;
2771
2772   default:
2773     ceph_abort();
2774   }
2775
2776   metablob.replay(mds, _segment);
2777   if (in && g_conf->mds_debug_frag)
2778     in->verify_dirfrags();
2779 }
2780
2781 void EFragment::encode(bufferlist &bl, uint64_t features) const {
2782   ENCODE_START(5, 4, bl);
2783   ::encode(stamp, bl);
2784   ::encode(op, bl);
2785   ::encode(ino, bl);
2786   ::encode(basefrag, bl);
2787   ::encode(bits, bl);
2788   ::encode(metablob, bl, features);
2789   ::encode(orig_frags, bl);
2790   ::encode(rollback, bl);
2791   ENCODE_FINISH(bl);
2792 }
2793
2794 void EFragment::decode(bufferlist::iterator &bl) {
2795   DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
2796   if (struct_v >= 2)
2797     ::decode(stamp, bl);
2798   if (struct_v >= 3)
2799     ::decode(op, bl);
2800   ::decode(ino, bl);
2801   ::decode(basefrag, bl);
2802   ::decode(bits, bl);
2803   ::decode(metablob, bl);
2804   if (struct_v >= 5) {
2805     ::decode(orig_frags, bl);
2806     ::decode(rollback, bl);
2807   }
2808   DECODE_FINISH(bl);
2809 }
2810
2811 void EFragment::dump(Formatter *f) const
2812 {
2813   /*f->open_object_section("Metablob");
2814   metablob.dump(f); // sadly we don't have this; dunno if we'll get it
2815   f->close_section();*/
2816   f->dump_string("op", op_name(op));
2817   f->dump_stream("ino") << ino;
2818   f->dump_stream("base frag") << basefrag;
2819   f->dump_int("bits", bits);
2820 }
2821
2822 void EFragment::generate_test_instances(list<EFragment*>& ls)
2823 {
2824   ls.push_back(new EFragment);
2825   ls.push_back(new EFragment);
2826   ls.back()->op = OP_PREPARE;
2827   ls.back()->ino = 1;
2828   ls.back()->bits = 5;
2829 }
2830
2831 void dirfrag_rollback::encode(bufferlist &bl) const
2832 {
2833   ENCODE_START(1, 1, bl);
2834   ::encode(fnode, bl);
2835   ENCODE_FINISH(bl);
2836 }
2837
2838 void dirfrag_rollback::decode(bufferlist::iterator &bl)
2839 {
2840   DECODE_START(1, bl);
2841   ::decode(fnode, bl);
2842   DECODE_FINISH(bl);
2843 }
2844
2845
2846
2847 // =========================================================================
2848
2849 // -----------------------
2850 // EExport
2851
2852 void EExport::replay(MDSRank *mds)
2853 {
2854   dout(10) << "EExport.replay " << base << dendl;
2855   metablob.replay(mds, _segment);
2856   
2857   CDir *dir = mds->mdcache->get_dirfrag(base);
2858   assert(dir);
2859   
2860   set<CDir*> realbounds;
2861   for (set<dirfrag_t>::iterator p = bounds.begin();
2862        p != bounds.end();
2863        ++p) {
2864     CDir *bd = mds->mdcache->get_dirfrag(*p);
2865     assert(bd);
2866     realbounds.insert(bd);
2867   }
2868
2869   // adjust auth away
2870   mds->mdcache->adjust_bounded_subtree_auth(dir, realbounds, CDIR_AUTH_UNDEF);
2871
2872   mds->mdcache->try_trim_non_auth_subtree(dir);
2873 }
2874
2875 void EExport::encode(bufferlist& bl, uint64_t features) const
2876 {
2877   ENCODE_START(4, 3, bl);
2878   ::encode(stamp, bl);
2879   ::encode(metablob, bl, features);
2880   ::encode(base, bl);
2881   ::encode(bounds, bl);
2882   ::encode(target, bl);
2883   ENCODE_FINISH(bl);
2884 }
2885
2886 void EExport::decode(bufferlist::iterator &bl)
2887 {
2888   DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
2889   if (struct_v >= 2)
2890     ::decode(stamp, bl);
2891   ::decode(metablob, bl);
2892   ::decode(base, bl);
2893   ::decode(bounds, bl);
2894   if (struct_v >= 4)
2895     ::decode(target, bl);
2896   DECODE_FINISH(bl);
2897 }
2898
2899 void EExport::dump(Formatter *f) const
2900 {
2901   f->dump_float("stamp", (double)stamp);
2902   /*f->open_object_section("Metablob");
2903   metablob.dump(f); // sadly we don't have this; dunno if we'll get it
2904   f->close_section();*/
2905   f->dump_stream("base dirfrag") << base;
2906   f->open_array_section("bounds dirfrags");
2907   for (set<dirfrag_t>::const_iterator i = bounds.begin();
2908       i != bounds.end(); ++i) {
2909     f->dump_stream("dirfrag") << *i;
2910   }
2911   f->close_section(); // bounds dirfrags
2912 }
2913
2914 void EExport::generate_test_instances(list<EExport*>& ls)
2915 {
2916   EExport *sample = new EExport();
2917   ls.push_back(sample);
2918 }
2919
2920
2921 // -----------------------
2922 // EImportStart
2923
2924 void EImportStart::update_segment()
2925 {
2926   _segment->sessionmapv = cmapv;
2927 }
2928
2929 void EImportStart::replay(MDSRank *mds)
2930 {
2931   dout(10) << "EImportStart.replay " << base << " bounds " << bounds << dendl;
2932   //metablob.print(*_dout);
2933   metablob.replay(mds, _segment);
2934
2935   // put in ambiguous import list
2936   mds->mdcache->add_ambiguous_import(base, bounds);
2937
2938   // set auth partially to us so we don't trim it
2939   CDir *dir = mds->mdcache->get_dirfrag(base);
2940   assert(dir);
2941
2942   set<CDir*> realbounds;
2943   for (vector<dirfrag_t>::iterator p = bounds.begin();
2944        p != bounds.end();
2945        ++p) {
2946     CDir *bd = mds->mdcache->get_dirfrag(*p);
2947     assert(bd);
2948     if (!bd->is_subtree_root())
2949       bd->state_clear(CDir::STATE_AUTH);
2950     realbounds.insert(bd);
2951   }
2952
2953   mds->mdcache->adjust_bounded_subtree_auth(dir, realbounds,
2954                                             mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
2955
2956   // open client sessions?
2957   if (mds->sessionmap.get_version() >= cmapv) {
2958     dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version() 
2959              << " >= " << cmapv << ", noop" << dendl;
2960   } else {
2961     dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version() 
2962              << " < " << cmapv << dendl;
2963     map<client_t,entity_inst_t> cm;
2964     bufferlist::iterator blp = client_map.begin();
2965     ::decode(cm, blp);
2966     mds->sessionmap.open_sessions(cm);
2967     assert(mds->sessionmap.get_version() == cmapv);
2968     mds->sessionmap.set_projected(mds->sessionmap.get_version());
2969   }
2970   update_segment();
2971 }
2972
2973 void EImportStart::encode(bufferlist &bl, uint64_t features) const {
2974   ENCODE_START(4, 3, bl);
2975   ::encode(stamp, bl);
2976   ::encode(base, bl);
2977   ::encode(metablob, bl, features);
2978   ::encode(bounds, bl);
2979   ::encode(cmapv, bl);
2980   ::encode(client_map, bl);
2981   ::encode(from, bl);
2982   ENCODE_FINISH(bl);
2983 }
2984
2985 void EImportStart::decode(bufferlist::iterator &bl) {
2986   DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
2987   if (struct_v >= 2)
2988     ::decode(stamp, bl);
2989   ::decode(base, bl);
2990   ::decode(metablob, bl);
2991   ::decode(bounds, bl);
2992   ::decode(cmapv, bl);
2993   ::decode(client_map, bl);
2994   if (struct_v >= 4)
2995     ::decode(from, bl);
2996   DECODE_FINISH(bl);
2997 }
2998
2999 void EImportStart::dump(Formatter *f) const
3000 {
3001   f->dump_stream("base dirfrag") << base;
3002   f->open_array_section("boundary dirfrags");
3003   for (vector<dirfrag_t>::const_iterator iter = bounds.begin();
3004       iter != bounds.end(); ++iter) {
3005     f->dump_stream("frag") << *iter;
3006   }
3007   f->close_section();
3008 }
3009
3010 void EImportStart::generate_test_instances(list<EImportStart*>& ls)
3011 {
3012   ls.push_back(new EImportStart);
3013 }
3014
3015 // -----------------------
3016 // EImportFinish
3017
3018 void EImportFinish::replay(MDSRank *mds)
3019 {
3020   if (mds->mdcache->have_ambiguous_import(base)) {
3021     dout(10) << "EImportFinish.replay " << base << " success=" << success << dendl;
3022     if (success) {
3023       mds->mdcache->finish_ambiguous_import(base);
3024     } else {
3025       CDir *dir = mds->mdcache->get_dirfrag(base);
3026       assert(dir);
3027       vector<dirfrag_t> bounds;
3028       mds->mdcache->get_ambiguous_import_bounds(base, bounds);
3029       mds->mdcache->adjust_bounded_subtree_auth(dir, bounds, CDIR_AUTH_UNDEF);
3030       mds->mdcache->cancel_ambiguous_import(dir);
3031       mds->mdcache->try_trim_non_auth_subtree(dir);
3032    }
3033   } else {
3034     // this shouldn't happen unless this is an old journal
3035     dout(10) << "EImportFinish.replay " << base << " success=" << success
3036              << " on subtree not marked as ambiguous" 
3037              << dendl;
3038     mds->clog->error() << "failure replaying journal (EImportFinish)";
3039     mds->damaged();
3040     ceph_abort();  // Should be unreachable because damaged() calls respawn()
3041   }
3042 }
3043
3044 void EImportFinish::encode(bufferlist& bl, uint64_t features) const
3045 {
3046   ENCODE_START(3, 3, bl);
3047   ::encode(stamp, bl);
3048   ::encode(base, bl);
3049   ::encode(success, bl);
3050   ENCODE_FINISH(bl);
3051 }
3052
3053 void EImportFinish::decode(bufferlist::iterator &bl)
3054 {
3055   DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
3056   if (struct_v >= 2)
3057     ::decode(stamp, bl);
3058   ::decode(base, bl);
3059   ::decode(success, bl);
3060   DECODE_FINISH(bl);
3061 }
3062
3063 void EImportFinish::dump(Formatter *f) const
3064 {
3065   f->dump_stream("base dirfrag") << base;
3066   f->dump_string("success", success ? "true" : "false");
3067 }
3068 void EImportFinish::generate_test_instances(list<EImportFinish*>& ls)
3069 {
3070   ls.push_back(new EImportFinish);
3071   ls.push_back(new EImportFinish);
3072   ls.back()->success = true;
3073 }
3074
3075
3076 // ------------------------
3077 // EResetJournal
3078
3079 void EResetJournal::encode(bufferlist& bl, uint64_t features) const
3080 {
3081   ENCODE_START(2, 2, bl);
3082   ::encode(stamp, bl);
3083   ENCODE_FINISH(bl);
3084 }
3085  
3086 void EResetJournal::decode(bufferlist::iterator &bl)
3087 {
3088   DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
3089   ::decode(stamp, bl);
3090   DECODE_FINISH(bl);
3091 }
3092
3093 void EResetJournal::dump(Formatter *f) const
3094 {
3095   f->dump_stream("timestamp") << stamp;
3096 }
3097
3098 void EResetJournal::generate_test_instances(list<EResetJournal*>& ls)
3099 {
3100   ls.push_back(new EResetJournal());
3101 }
3102
3103 void EResetJournal::replay(MDSRank *mds)
3104 {
3105   dout(1) << "EResetJournal" << dendl;
3106
3107   mds->sessionmap.wipe();
3108   mds->inotable->replay_reset();
3109
3110   if (mds->mdsmap->get_root() == mds->get_nodeid()) {
3111     CDir *rootdir = mds->mdcache->get_root()->get_or_open_dirfrag(mds->mdcache, frag_t());
3112     mds->mdcache->adjust_subtree_auth(rootdir, mds->get_nodeid());   
3113   }
3114
3115   CDir *mydir = mds->mdcache->get_myin()->get_or_open_dirfrag(mds->mdcache, frag_t());
3116   mds->mdcache->adjust_subtree_auth(mydir, mds->get_nodeid());   
3117
3118   mds->mdcache->recalc_auth_bits(true);
3119
3120   mds->mdcache->show_subtrees();
3121 }
3122
3123
3124 void ENoOp::encode(bufferlist &bl, uint64_t features) const
3125 {
3126   ENCODE_START(2, 2, bl);
3127   ::encode(pad_size, bl);
3128   uint8_t const pad = 0xff;
3129   for (unsigned int i = 0; i < pad_size; ++i) {
3130     ::encode(pad, bl);
3131   }
3132   ENCODE_FINISH(bl);
3133 }
3134
3135
3136 void ENoOp::decode(bufferlist::iterator &bl)
3137 {
3138   DECODE_START(2, bl);
3139   ::decode(pad_size, bl);
3140   if (bl.get_remaining() != pad_size) {
3141     // This is spiritually an assertion, but expressing in a way that will let
3142     // journal debug tools catch it and recognise a malformed entry.
3143     throw buffer::end_of_buffer();
3144   } else {
3145     bl.advance(pad_size);
3146   }
3147   DECODE_FINISH(bl);
3148 }
3149
3150
3151 void ENoOp::replay(MDSRank *mds)
3152 {
3153   dout(4) << "ENoOp::replay, " << pad_size << " bytes skipped in journal" << dendl;
3154 }
3155
3156 /**
3157  * If re-formatting an old journal that used absolute log position
3158  * references as segment sequence numbers, use this function to update
3159  * it.
3160  *
3161  * @param mds
3162  * MDSRank instance, just used for logging
3163  * @param old_to_new
3164  * Map of old journal segment sequence numbers to new journal segment sequence numbers
3165  *
3166  * @return
3167  * True if the event was modified.
3168  */
3169 bool EMetaBlob::rewrite_truncate_finish(MDSRank const *mds,
3170     std::map<log_segment_seq_t, log_segment_seq_t> const &old_to_new)
3171 {
3172   bool modified = false;
3173   map<inodeno_t, log_segment_seq_t> new_trunc_finish;
3174   for (std::map<inodeno_t, log_segment_seq_t>::iterator i = truncate_finish.begin();
3175       i != truncate_finish.end(); ++i) {
3176     if (old_to_new.count(i->second)) {
3177       dout(20) << __func__ << " applying segment seq mapping "
3178         << i->second << " -> " << old_to_new.find(i->second)->second << dendl;
3179       new_trunc_finish[i->first] = old_to_new.find(i->second)->second;
3180       modified = true;
3181     } else {
3182       dout(20) << __func__ << " no segment seq mapping found for "
3183         << i->second << dendl;
3184       new_trunc_finish[i->first] = i->second;
3185     }
3186   }
3187   truncate_finish = new_trunc_finish;
3188
3189   return modified;
3190 }