Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / cephfs / JournalTool.cc
1 // -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * ceph - scalable distributed file system
5  *
6  * copyright (c) 2014 john spray <john.spray@inktank.com>
7  *
8  * this is free software; you can redistribute it and/or
9  * modify it under the terms of the gnu lesser general public
10  * license version 2.1, as published by the free software
11  * foundation.  see file copying.
12  */
13
14
15 #include <sstream>
16
17 #include "common/ceph_argparse.h"
18 #include "common/errno.h"
19 #include "osdc/Journaler.h"
20 #include "mds/mdstypes.h"
21 #include "mds/LogEvent.h"
22 #include "mds/InoTable.h"
23
24 #include "mds/events/ENoOp.h"
25 #include "mds/events/EUpdate.h"
26
27 #include "JournalScanner.h"
28 #include "EventOutput.h"
29 #include "Dumper.h"
30 #include "Resetter.h"
31
32 #include "JournalTool.h"
33
34
35 #define dout_context g_ceph_context
36 #define dout_subsys ceph_subsys_mds
37 #undef dout_prefix
38 #define dout_prefix *_dout << __func__ << ": "
39
40
41
42 void JournalTool::usage()
43 {
44   std::cout << "Usage: \n"
45     << "  cephfs-journal-tool [options] journal <command>\n"
46     << "    <command>:\n"
47     << "      inspect\n"
48     << "      import <path>\n"
49     << "      export <path>\n"
50     << "      reset [--force]\n"
51     << "  cephfs-journal-tool [options] header <get|set <field> <value>\n"
52     << "  cephfs-journal-tool [options] event <effect> <selector> <output> [special options]\n"
53     << "    <selector>:\n"
54     << "      --range=<start>..<end>\n"
55     << "      --path=<substring>\n"
56     << "      --inode=<integer>\n"
57     << "      --type=<UPDATE|OPEN|SESSION...><\n"
58     << "      --frag=<ino>.<frag> [--dname=<dentry string>]\n"
59     << "      --client=<session id integer>\n"
60     << "    <effect>: [get|recover_dentries|splice]\n"
61     << "    <output>: [summary|list|binary|json] [--path <path>]\n"
62     << "\n"
63     << "General options:\n"
64     << "  --rank=filesystem:mds-rank  Journal rank (required if multiple\n"
65     << "                              file systems, default is rank 0 on\n"
66     << "                              the only filesystem otherwise.\n"
67     << "\n"
68     << "Special options\n"
69     << "  --alternate-pool <name>     Alternative metadata pool to target\n"
70     << "                              when using recover_dentries.\n";
71
72   generic_client_usage();
73 }
74
75
76 /**
77  * Handle arguments and hand off to journal/header/event mode
78  */
79 int JournalTool::main(std::vector<const char*> &argv)
80 {
81   int r;
82
83   dout(10) << "JournalTool::main " << dendl;
84   // Common arg parsing
85   // ==================
86   if (argv.empty()) {
87     usage();
88     return -EINVAL;
89   }
90
91   std::vector<const char*>::iterator arg = argv.begin();
92
93   std::string rank_str;
94   if(!ceph_argparse_witharg(argv, arg, &rank_str, "--rank", (char*)NULL)) {
95     // Default: act on rank 0.  Will give the user an error if they
96     // try invoking this way when they have more than one filesystem.
97     rank_str = "0";
98   }
99
100   r = role_selector.parse(*fsmap, rank_str);
101   if (r != 0) {
102     derr << "Couldn't determine MDS rank." << dendl;
103     return r;
104   }
105
106   std::string mode;
107   if (arg == argv.end()) {
108     derr << "Missing mode [journal|header|event]" << dendl;
109     return -EINVAL;
110   }
111   mode = std::string(*arg);
112   arg = argv.erase(arg);
113
114   // RADOS init
115   // ==========
116   r = rados.init_with_context(g_ceph_context);
117   if (r < 0) {
118     derr << "RADOS unavailable, cannot scan filesystem journal" << dendl;
119     return r;
120   }
121
122   dout(4) << "JournalTool: connecting to RADOS..." << dendl;
123   r = rados.connect();
124   if (r < 0) {
125     derr << "couldn't connect to cluster: " << cpp_strerror(r) << dendl;
126     return r;
127   }
128  
129   auto fs = fsmap->get_filesystem(role_selector.get_ns());
130   assert(fs != nullptr);
131   int64_t const pool_id = fs->mds_map.get_metadata_pool();
132   dout(4) << "JournalTool: resolving pool " << pool_id << dendl;
133   std::string pool_name;
134   r = rados.pool_reverse_lookup(pool_id, &pool_name);
135   if (r < 0) {
136     derr << "Pool " << pool_id << " named in MDS map not found in RADOS!" << dendl;
137     return r;
138   }
139
140   dout(4) << "JournalTool: creating IoCtx.." << dendl;
141   r = rados.ioctx_create(pool_name.c_str(), input);
142   assert(r == 0);
143   output.dup(input);
144
145   // Execution
146   // =========
147   for (auto role : role_selector.get_roles()) {
148     rank = role.rank;
149     dout(4) << "Executing for rank " << rank << dendl;
150     if (mode == std::string("journal")) {
151       r = main_journal(argv);
152     } else if (mode == std::string("header")) {
153       r = main_header(argv);
154     } else if (mode == std::string("event")) {
155       r = main_event(argv);
156     } else {
157       derr << "Bad command '" << mode << "'" << dendl;
158       usage();
159       return -EINVAL;
160     }
161
162     if (r != 0) {
163       return r;
164     }
165   }
166
167   return r;
168 }
169
170
171 /**
172  * Handle arguments for 'journal' mode
173  *
174  * This is for operations that act on the journal as a whole.
175  */
176 int JournalTool::main_journal(std::vector<const char*> &argv)
177 {
178   std::string command = argv[0];
179   if (command == "inspect") {
180     return journal_inspect();
181   } else if (command == "export" || command == "import") {
182     if (argv.size() >= 2) {
183       std::string const path = argv[1];
184       return journal_export(path, command == "import");
185     } else {
186       derr << "Missing path" << dendl;
187       return -EINVAL;
188     }
189   } else if (command == "reset") {
190     bool force = false;
191     if (argv.size() == 2) {
192       if (std::string(argv[1]) == "--force") {
193         force = true;
194       } else {
195         std::cerr << "Unknown argument " << argv[1] << std::endl;
196         usage();
197         return -EINVAL;
198       }
199     } else if (argv.size() > 2) {
200       std::cerr << "Too many arguments!" << std::endl;
201       usage();
202       return -EINVAL;
203     }
204     return journal_reset(force);
205   } else {
206     derr << "Bad journal command '" << command << "'" << dendl;
207     return -EINVAL;
208   }
209 }
210
211
212 /**
213  * Parse arguments and execute for 'header' mode
214  *
215  * This is for operations that act on the header only.
216  */
217 int JournalTool::main_header(std::vector<const char*> &argv)
218 {
219   JournalFilter filter;
220   JournalScanner js(input, rank, filter);
221   int r = js.scan(false);
222   if (r < 0) {
223     std::cerr << "Unable to scan journal" << std::endl;
224     return r;
225   }
226
227   if (!js.header_present) {
228     std::cerr << "Header object not found!" << std::endl;
229     return -ENOENT;
230   } else if (!js.header_valid && js.header == NULL) {
231     // Can't do a read or a single-field write without a copy of the original
232     derr << "Header could not be read!" << dendl;
233     return -ENOENT;
234   } else {
235     assert(js.header != NULL);
236   }
237
238   if (argv.size() == 0) {
239     derr << "Invalid header command, must be [get|set]" << dendl;
240     return -EINVAL;
241   }
242   std::vector<const char *>::iterator arg = argv.begin();
243   std::string const command = *arg;
244   arg = argv.erase(arg);
245
246   if (command == std::string("get")) {
247     // Write JSON journal dump to stdout
248     JSONFormatter jf(true);
249     js.header->dump(&jf);
250     jf.flush(std::cout);
251     std::cout << std::endl;
252   } else if (command == std::string("set")) {
253     // Need two more args <key> <val>
254     if (argv.size() != 2) {
255       derr << "'set' requires two arguments <trimmed_pos|expire_pos|write_pos> <value>" << dendl;
256       return -EINVAL;
257     }
258
259     std::string const field_name = *arg;
260     arg = argv.erase(arg);
261
262     std::string const value_str = *arg;
263     arg = argv.erase(arg);
264     assert(argv.empty());
265
266     std::string parse_err;
267     uint64_t new_val = strict_strtoll(value_str.c_str(), 0, &parse_err);
268     if (!parse_err.empty()) {
269       derr << "Invalid value '" << value_str << "': " << parse_err << dendl;
270       return -EINVAL;
271     }
272
273     uint64_t *field = NULL;
274     if (field_name == "trimmed_pos") {
275       field = &(js.header->trimmed_pos);
276     } else if (field_name == "expire_pos") {
277       field = &(js.header->expire_pos);
278     } else if (field_name == "write_pos") {
279       field = &(js.header->write_pos);
280     } else {
281       derr << "Invalid field '" << field_name << "'" << dendl;
282       return -EINVAL;
283     }
284
285     std::cout << "Updating " << field_name << std::hex << " 0x" << *field << " -> 0x" << new_val << std::dec << std::endl;
286     *field = new_val;
287
288     dout(4) << "Writing object..." << dendl;
289     bufferlist header_bl;
290     ::encode(*(js.header), header_bl);
291     output.write_full(js.obj_name(0), header_bl);
292     dout(4) << "Write complete." << dendl;
293     std::cout << "Successfully updated header." << std::endl;
294   } else {
295     derr << "Bad header command '" << command << "'" << dendl;
296     return -EINVAL;
297   }
298
299   return 0;
300 }
301
302
303 /**
304  * Parse arguments and execute for 'event' mode
305  *
306  * This is for operations that act on LogEvents within the log
307  */
308 int JournalTool::main_event(std::vector<const char*> &argv)
309 {
310   int r;
311
312   std::vector<const char*>::iterator arg = argv.begin();
313
314   std::string command = *(arg++);
315   if (command != "get" && command != "splice" && command != "recover_dentries") {
316     derr << "Unknown argument '" << command << "'" << dendl;
317     usage();
318     return -EINVAL;
319   }
320
321   if (arg == argv.end()) {
322     derr << "Incomplete command line" << dendl;
323     usage();
324     return -EINVAL;
325   }
326
327   // Parse filter options
328   // ====================
329   JournalFilter filter;
330   r = filter.parse_args(argv, arg);
331   if (r) {
332     return r;
333   }
334
335   // Parse output options
336   // ====================
337   if (arg == argv.end()) {
338     derr << "Missing output command" << dendl;
339     usage();
340   }
341   std::string output_style = *(arg++);
342   if (output_style != "binary" && output_style != "json" &&
343       output_style != "summary" && output_style != "list") {
344       derr << "Unknown argument: '" << output_style << "'" << dendl;
345       usage();
346       return -EINVAL;
347   }
348
349   std::string output_path = "dump";
350   while(arg != argv.end()) {
351     std::string arg_str;
352     if (ceph_argparse_witharg(argv, arg, &arg_str, "--path", (char*)NULL)) {
353       output_path = arg_str;
354     } else if (ceph_argparse_witharg(argv, arg, &arg_str, "--alternate-pool",
355                                      nullptr)) {
356       dout(1) << "Using alternate pool " << arg_str << dendl;
357       int r = rados.ioctx_create(arg_str.c_str(), output);
358       assert(r == 0);
359       other_pool = true;
360     } else {
361       derr << "Unknown argument: '" << *arg << "'" << dendl;
362       usage();
363       return -EINVAL;
364     }
365   }
366
367   // Execute command
368   // ===============
369   JournalScanner js(input, rank, filter);
370   if (command == "get") {
371     r = js.scan();
372     if (r) {
373       derr << "Failed to scan journal (" << cpp_strerror(r) << ")" << dendl;
374       return r;
375     }
376   } else if (command == "recover_dentries") {
377     r = js.scan();
378     if (r) {
379       derr << "Failed to scan journal (" << cpp_strerror(r) << ")" << dendl;
380       return r;
381     }
382
383     bool dry_run = false;
384     if (arg != argv.end() && ceph_argparse_flag(argv, arg, "--dry_run", (char*)NULL)) {
385       dry_run = true;
386     }
387
388     /**
389      * Iterate over log entries, attempting to scavenge from each one
390      */
391     std::set<inodeno_t> consumed_inos;
392     for (JournalScanner::EventMap::iterator i = js.events.begin();
393          i != js.events.end(); ++i) {
394       LogEvent *le = i->second.log_event;
395       EMetaBlob const *mb = le->get_metablob();
396       if (mb) {
397         int scav_r = recover_dentries(*mb, dry_run, &consumed_inos);
398         if (scav_r) {
399           dout(1) << "Error processing event 0x" << std::hex << i->first << std::dec
400                   << ": " << cpp_strerror(scav_r) << ", continuing..." << dendl;
401           if (r == 0) {
402             r = scav_r;
403           }
404           // Our goal is to read all we can, so don't stop on errors, but
405           // do record them for possible later output
406           js.errors.insert(std::make_pair(i->first,
407                 JournalScanner::EventError(scav_r, cpp_strerror(r))));
408         }
409       }
410     }
411
412     /**
413      * Update InoTable to reflect any inode numbers consumed during scavenge
414      */
415     dout(4) << "consumed " << consumed_inos.size() << " inodes" << dendl;
416     if (consumed_inos.size() && !dry_run) {
417       int consume_r = consume_inos(consumed_inos);
418       if (consume_r) {
419         dout(1) << "Error updating InoTable for " << consumed_inos.size()
420                 << " consume inos: " << cpp_strerror(consume_r) << dendl;
421         if (r == 0) {
422           r = consume_r;
423         }
424       }
425     }
426
427     // Remove consumed dentries from lost+found.
428     if (other_pool && !dry_run) {
429       std::set<std::string> found;
430
431       for (auto i : consumed_inos) {
432         char s[20];
433
434         snprintf(s, sizeof(s), "%llx_head", (unsigned long long) i);
435         dout(20) << "removing " << s << dendl;
436         found.insert(std::string(s));
437       }
438
439       object_t frag_oid;
440       frag_oid = InodeStore::get_object_name(CEPH_INO_LOST_AND_FOUND,
441                                              frag_t(), "");
442       output.omap_rm_keys(frag_oid.name, found);
443     }
444   } else if (command == "splice") {
445     r = js.scan();
446     if (r) {
447       derr << "Failed to scan journal (" << cpp_strerror(r) << ")" << dendl;
448       return r;
449     }
450
451     uint64_t start, end;
452     if (filter.get_range(start, end)) {
453       // Special case for range filter: erase a numeric range in the log
454       uint64_t range = end - start;
455       int r = erase_region(js, start, range);
456       if (r) {
457         derr << "Failed to erase region 0x" << std::hex << start << "~0x" << range << std::dec
458              << ": " << cpp_strerror(r) << dendl;
459         return r;
460       }
461     } else {
462       // General case: erase a collection of individual entries in the log
463       for (JournalScanner::EventMap::iterator i = js.events.begin(); i != js.events.end(); ++i) {
464         dout(4) << "Erasing offset 0x" << std::hex << i->first << std::dec << dendl;
465
466         int r = erase_region(js, i->first, i->second.raw_size);
467         if (r) {
468           derr << "Failed to erase event 0x" << std::hex << i->first << std::dec
469                << ": " << cpp_strerror(r) << dendl;
470           return r;
471         }
472       }
473     }
474
475
476   } else {
477     derr << "Unknown argument '" << command << "'" << dendl;
478     usage();
479     return -EINVAL;
480   }
481
482   // Generate output
483   // ===============
484   EventOutput output(js, output_path);
485   int output_result = 0;
486   if (output_style == "binary") {
487       output_result = output.binary();
488   } else if (output_style == "json") {
489       output_result = output.json();
490   } else if (output_style == "summary") {
491       output.summary();
492   } else if (output_style == "list") {
493       output.list();
494   } else {
495     std::cerr << "Bad output command '" << output_style << "'" << std::endl;
496     return -EINVAL;
497   }
498
499   if (output_result != 0) {
500     std::cerr << "Error writing output: " << cpp_strerror(output_result) << std::endl;
501   }
502
503   return output_result;
504 }
505
506 /**
507  * Provide the user with information about the condition of the journal,
508  * especially indicating what range of log events is available and where
509  * any gaps or corruptions in the journal are.
510  */
511 int JournalTool::journal_inspect()
512 {
513   int r;
514
515   JournalFilter filter;
516   JournalScanner js(input, rank, filter);
517   r = js.scan();
518   if (r) {
519     std::cerr << "Failed to scan journal (" << cpp_strerror(r) << ")" << std::endl;
520     return r;
521   }
522
523   js.report(std::cout);
524
525   return 0;
526 }
527
528
529 /**
530  * Attempt to export a binary dump of the journal.
531  *
532  * This is allowed to fail if the header is malformed or there are
533  * objects inaccessible, in which case the user would have to fall
534  * back to manually listing RADOS objects and extracting them, which
535  * they can do with the ``rados`` CLI.
536  */
537 int JournalTool::journal_export(std::string const &path, bool import)
538 {
539   int r = 0;
540   JournalScanner js(input, rank);
541
542   if (!import) {
543     /*
544      * If doing an export, first check that the header is valid and
545      * no objects are missing before trying to dump
546      */
547     r = js.scan();
548     if (r < 0) {
549       derr << "Unable to scan journal, assuming badly damaged" << dendl;
550       return r;
551     }
552     if (!js.is_readable()) {
553       derr << "Journal not readable, attempt object-by-object dump with `rados`" << dendl;
554       return -EIO;
555     }
556   }
557
558   /*
559    * Assuming we can cleanly read the journal data, dump it out to a file
560    */
561   {
562     Dumper dumper;
563     r = dumper.init(mds_role_t(role_selector.get_ns(), rank));
564     if (r < 0) {
565       derr << "dumper::init failed: " << cpp_strerror(r) << dendl;
566       return r;
567     }
568     if (import) {
569       r = dumper.undump(path.c_str());
570     } else {
571       r = dumper.dump(path.c_str());
572     }
573     dumper.shutdown();
574   }
575
576   return r;
577 }
578
579
580 /**
581  * Truncate journal and insert EResetJournal
582  */
583 int JournalTool::journal_reset(bool hard)
584 {
585   int r = 0;
586   Resetter resetter;
587   r = resetter.init();
588   if (r < 0) {
589     derr << "resetter::init failed: " << cpp_strerror(r) << dendl;
590     return r;
591   }
592
593   if (hard) {
594     r = resetter.reset_hard(mds_role_t(role_selector.get_ns(), rank));
595   } else {
596     r = resetter.reset(mds_role_t(role_selector.get_ns(), rank));
597   }
598   resetter.shutdown();
599
600   return r;
601 }
602
603
604 /**
605  * Selective offline replay which only reads out dentries and writes
606  * them to the backing store iff their version is > what is currently
607  * in the backing store.
608  *
609  * In order to write dentries to the backing store, we may create the
610  * required enclosing dirfrag objects.
611  *
612  * Test this by running scavenge on an unflushed journal, then nuking
613  * it offline, then starting an MDS and seeing that the dentries are
614  * visible.
615  *
616  * @param metablob an EMetaBlob retrieved from the journal
617  * @param dry_run if true, do no writes to RADOS
618  * @param consumed_inos output, populated with any inos inserted
619  * @returns 0 on success, else negative error code
620  */
621 int JournalTool::recover_dentries(
622     EMetaBlob const &metablob,
623     bool const dry_run,
624     std::set<inodeno_t> *consumed_inos)
625 {
626   assert(consumed_inos != NULL);
627
628   int r = 0;
629
630   // Replay fullbits (dentry+inode)
631   for (list<dirfrag_t>::const_iterator lp = metablob.lump_order.begin();
632        lp != metablob.lump_order.end(); ++lp)
633   {
634     dirfrag_t const &frag = *lp;
635     EMetaBlob::dirlump const &lump = metablob.lump_map.find(frag)->second;
636     lump._decode_bits();
637     object_t frag_oid = InodeStore::get_object_name(frag.ino, frag.frag, "");
638
639     dout(4) << "inspecting lump " << frag_oid.name << dendl;
640
641
642     // We will record old fnode version for use in hard link handling
643     // If we don't read an old fnode, take version as zero and write in
644     // all hardlinks we find.
645     version_t old_fnode_version = 0;
646
647     // Update fnode in omap header of dirfrag object
648     bool write_fnode = false;
649     bufferlist old_fnode_bl;
650     r = input.omap_get_header(frag_oid.name, &old_fnode_bl);
651     if (r == -ENOENT) {
652       // Creating dirfrag from scratch
653       dout(4) << "failed to read OMAP header from directory fragment "
654         << frag_oid.name << " " << cpp_strerror(r) << dendl;
655       write_fnode = true;
656       // Note: creating the dirfrag *without* a backtrace, relying on
657       // MDS to regenerate backtraces on read or in FSCK
658     } else if (r == 0) {
659       // Conditionally update existing omap header
660       fnode_t old_fnode;
661       bufferlist::iterator old_fnode_iter = old_fnode_bl.begin();
662       try {
663         old_fnode.decode(old_fnode_iter);
664         dout(4) << "frag " << frag_oid.name << " fnode old v" <<
665           old_fnode.version << " vs new v" << lump.fnode.version << dendl;
666         old_fnode_version = old_fnode.version;
667         write_fnode = old_fnode_version < lump.fnode.version;
668       } catch (const buffer::error &err) {
669         dout(1) << "frag " << frag_oid.name
670                 << " is corrupt, overwriting" << dendl;
671         write_fnode = true;
672       }
673     } else {
674       // Unexpected error
675       dout(4) << "failed to read OMAP header from directory fragment "
676         << frag_oid.name << " " << cpp_strerror(r) << dendl;
677       return r;
678     }
679
680     if ((other_pool || write_fnode) && !dry_run) {
681       dout(4) << "writing fnode to omap header" << dendl;
682       bufferlist fnode_bl;
683       lump.fnode.encode(fnode_bl);
684       if (!other_pool || frag.ino >= MDS_INO_SYSTEM_BASE) {
685         r = output.omap_set_header(frag_oid.name, fnode_bl);
686       }
687       if (r != 0) {
688         derr << "Failed to write fnode for frag object "
689              << frag_oid.name << dendl;
690         return r;
691       }
692     }
693
694     std::set<std::string> read_keys;
695
696     // Compose list of potentially-existing dentries we would like to fetch
697     list<ceph::shared_ptr<EMetaBlob::fullbit> > const &fb_list =
698       lump.get_dfull();
699     for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::const_iterator fbi =
700         fb_list.begin(); fbi != fb_list.end(); ++fbi) {
701       EMetaBlob::fullbit const &fb = *(*fbi);
702
703       // Get a key like "foobar_head"
704       std::string key;
705       dentry_key_t dn_key(fb.dnlast, fb.dn.c_str());
706       dn_key.encode(key);
707       read_keys.insert(key);
708     }
709
710     list<EMetaBlob::remotebit> const &rb_list =
711       lump.get_dremote();
712     for (list<EMetaBlob::remotebit>::const_iterator rbi =
713         rb_list.begin(); rbi != rb_list.end(); ++rbi) {
714       EMetaBlob::remotebit const &rb = *rbi;
715
716       // Get a key like "foobar_head"
717       std::string key;
718       dentry_key_t dn_key(rb.dnlast, rb.dn.c_str());
719       dn_key.encode(key);
720       read_keys.insert(key);
721     }
722
723     list<EMetaBlob::nullbit> const &nb_list = lump.get_dnull();
724     for (auto& nb : nb_list) {
725       // Get a key like "foobar_head"
726       std::string key;
727       dentry_key_t dn_key(nb.dnlast, nb.dn.c_str());
728       dn_key.encode(key);
729       read_keys.insert(key);
730     }
731
732     // Perform bulk read of existing dentries
733     std::map<std::string, bufferlist> read_vals;
734     r = input.omap_get_vals_by_keys(frag_oid.name, read_keys, &read_vals);
735     if (r == -ENOENT && other_pool) {
736       r = output.omap_get_vals_by_keys(frag_oid.name, read_keys, &read_vals);
737     }
738     if (r != 0) {
739       derr << "unexpected error reading fragment object "
740            << frag_oid.name << ": " << cpp_strerror(r) << dendl;
741       return r;
742     }
743
744     // Compose list of dentries we will write back
745     std::map<std::string, bufferlist> write_vals;
746     for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::const_iterator fbi =
747         fb_list.begin(); fbi != fb_list.end(); ++fbi) {
748       EMetaBlob::fullbit const &fb = *(*fbi);
749
750       // Get a key like "foobar_head"
751       std::string key;
752       dentry_key_t dn_key(fb.dnlast, fb.dn.c_str());
753       dn_key.encode(key);
754
755       dout(4) << "inspecting fullbit " << frag_oid.name << "/" << fb.dn
756         << dendl;
757       bool write_dentry = false;
758       if (read_vals.find(key) == read_vals.end()) {
759         dout(4) << "dentry did not already exist, will create" << dendl;
760         write_dentry = true;
761       } else {
762         dout(4) << "dentry " << key << " existed already" << dendl;
763         dout(4) << "dentry exists, checking versions..." << dendl;
764         bufferlist &old_dentry = read_vals[key];
765         // Decode dentry+inode
766         bufferlist::iterator q = old_dentry.begin();
767
768         snapid_t dnfirst;
769         ::decode(dnfirst, q);
770         char dentry_type;
771         ::decode(dentry_type, q);
772
773         if (dentry_type == 'L') {
774           // leave write_dentry false, we have no version to
775           // compare with in a hardlink, so it's not safe to
776           // squash over it with what's in this fullbit
777           dout(10) << "Existing remote inode in slot to be (maybe) written "
778                << "by a full inode from the journal dn '" << fb.dn.c_str()
779                << "' with lump fnode version " << lump.fnode.version
780                << "vs existing fnode version " << old_fnode_version << dendl;
781           write_dentry = old_fnode_version < lump.fnode.version;
782         } else if (dentry_type == 'I') {
783           // Read out inode version to compare with backing store
784           InodeStore inode;
785           inode.decode_bare(q);
786           dout(4) << "decoded embedded inode version "
787             << inode.inode.version << " vs fullbit version "
788             << fb.inode.version << dendl;
789           if (inode.inode.version < fb.inode.version) {
790             write_dentry = true;
791           }
792         } else {
793           dout(4) << "corrupt dentry in backing store, overwriting from "
794             "journal" << dendl;
795           write_dentry = true;
796         }
797       }
798
799       if ((other_pool || write_dentry) && !dry_run) {
800         dout(4) << "writing I dentry " << key << " into frag "
801           << frag_oid.name << dendl;
802
803         // Compose: Dentry format is dnfirst, [I|L], InodeStore(bare=true)
804         bufferlist dentry_bl;
805         ::encode(fb.dnfirst, dentry_bl);
806         ::encode('I', dentry_bl);
807         encode_fullbit_as_inode(fb, true, &dentry_bl);
808
809         // Record for writing to RADOS
810         write_vals[key] = dentry_bl;
811         consumed_inos->insert(fb.inode.ino);
812       }
813     }
814
815     for (list<EMetaBlob::remotebit>::const_iterator rbi =
816         rb_list.begin(); rbi != rb_list.end(); ++rbi) {
817       EMetaBlob::remotebit const &rb = *rbi;
818
819       // Get a key like "foobar_head"
820       std::string key;
821       dentry_key_t dn_key(rb.dnlast, rb.dn.c_str());
822       dn_key.encode(key);
823
824       dout(4) << "inspecting remotebit " << frag_oid.name << "/" << rb.dn
825         << dendl;
826       bool write_dentry = false;
827       if (read_vals.find(key) == read_vals.end()) {
828         dout(4) << "dentry did not already exist, will create" << dendl;
829         write_dentry = true;
830       } else {
831         dout(4) << "dentry " << key << " existed already" << dendl;
832         dout(4) << "dentry exists, checking versions..." << dendl;
833         bufferlist &old_dentry = read_vals[key];
834         // Decode dentry+inode
835         bufferlist::iterator q = old_dentry.begin();
836
837         snapid_t dnfirst;
838         ::decode(dnfirst, q);
839         char dentry_type;
840         ::decode(dentry_type, q);
841
842         if (dentry_type == 'L') {
843           dout(10) << "Existing hardlink inode in slot to be (maybe) written "
844                << "by a remote inode from the journal dn '" << rb.dn.c_str()
845                << "' with lump fnode version " << lump.fnode.version
846                << "vs existing fnode version " << old_fnode_version << dendl;
847           write_dentry = old_fnode_version < lump.fnode.version;
848         } else if (dentry_type == 'I') {
849           dout(10) << "Existing full inode in slot to be (maybe) written "
850                << "by a remote inode from the journal dn '" << rb.dn.c_str()
851                << "' with lump fnode version " << lump.fnode.version
852                << "vs existing fnode version " << old_fnode_version << dendl;
853           write_dentry = old_fnode_version < lump.fnode.version;
854         } else {
855           dout(4) << "corrupt dentry in backing store, overwriting from "
856             "journal" << dendl;
857           write_dentry = true;
858         }
859       }
860
861       if ((other_pool || write_dentry) && !dry_run) {
862         dout(4) << "writing L dentry " << key << " into frag "
863           << frag_oid.name << dendl;
864
865         // Compose: Dentry format is dnfirst, [I|L], InodeStore(bare=true)
866         bufferlist dentry_bl;
867         ::encode(rb.dnfirst, dentry_bl);
868         ::encode('L', dentry_bl);
869         ::encode(rb.ino, dentry_bl);
870         ::encode(rb.d_type, dentry_bl);
871
872         // Record for writing to RADOS
873         write_vals[key] = dentry_bl;
874         consumed_inos->insert(rb.ino);
875       }
876     }
877
878     std::set<std::string> null_vals;
879     for (auto& nb : nb_list) {
880       std::string key;
881       dentry_key_t dn_key(nb.dnlast, nb.dn.c_str());
882       dn_key.encode(key);
883
884       dout(4) << "inspecting nullbit " << frag_oid.name << "/" << nb.dn
885         << dendl;
886
887       auto it = read_vals.find(key);
888       if (it != read_vals.end()) {
889         dout(4) << "dentry exists, will remove" << dendl;
890
891         bufferlist::iterator q = it->second.begin();
892         snapid_t dnfirst;
893         ::decode(dnfirst, q);
894         char dentry_type;
895         ::decode(dentry_type, q);
896
897         bool remove_dentry = false;
898         if (dentry_type == 'L') {
899           dout(10) << "Existing hardlink inode in slot to be (maybe) removed "
900             << "by null journal dn '" << nb.dn.c_str()
901             << "' with lump fnode version " << lump.fnode.version
902             << "vs existing fnode version " << old_fnode_version << dendl;
903           remove_dentry = old_fnode_version < lump.fnode.version;
904         } else if (dentry_type == 'I') {
905           dout(10) << "Existing full inode in slot to be (maybe) removed "
906             << "by null journal dn '" << nb.dn.c_str()
907             << "' with lump fnode version " << lump.fnode.version
908             << "vs existing fnode version " << old_fnode_version << dendl;
909           remove_dentry = old_fnode_version < lump.fnode.version;
910         } else {
911           dout(4) << "corrupt dentry in backing store, will remove" << dendl;
912           remove_dentry = true;
913         }
914
915         if (remove_dentry)
916           null_vals.insert(key);
917       }
918     }
919
920     // Write back any new/changed dentries
921     if (!write_vals.empty()) {
922       r = output.omap_set(frag_oid.name, write_vals);
923       if (r != 0) {
924         derr << "error writing dentries to " << frag_oid.name
925              << ": " << cpp_strerror(r) << dendl;
926         return r;
927       }
928     }
929
930     // remove any null dentries
931     if (!null_vals.empty()) {
932       r = output.omap_rm_keys(frag_oid.name, null_vals);
933       if (r != 0) {
934         derr << "error removing dentries from " << frag_oid.name
935           << ": " << cpp_strerror(r) << dendl;
936         return r;
937       }
938     }
939   }
940
941   /* Now that we've looked at the dirlumps, we finally pay attention to
942    * the roots (i.e. inodes without ancestry).  This is necessary in order
943    * to pick up dirstat updates on ROOT_INO.  dirstat updates are functionally
944    * important because clients use them to infer completeness
945    * of directories
946    */
947   for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::const_iterator p =
948        metablob.roots.begin(); p != metablob.roots.end(); ++p) {
949     EMetaBlob::fullbit const &fb = *(*p);
950     inodeno_t ino = fb.inode.ino;
951     dout(4) << "updating root 0x" << std::hex << ino << std::dec << dendl;
952
953     object_t root_oid = InodeStore::get_object_name(ino, frag_t(), ".inode");
954     dout(4) << "object id " << root_oid.name << dendl;
955
956     bool write_root_ino = false;
957     bufferlist old_root_ino_bl;
958     r = input.read(root_oid.name, old_root_ino_bl, (1<<22), 0);
959     if (r == -ENOENT) {
960       dout(4) << "root does not exist, will create" << dendl;
961       write_root_ino = true;
962     } else if (r >= 0) {
963       r = 0;
964       InodeStore old_inode;
965       dout(4) << "root exists, will modify (" << old_root_ino_bl.length()
966         << ")" << dendl;
967       bufferlist::iterator inode_bl_iter = old_root_ino_bl.begin(); 
968       std::string magic;
969       ::decode(magic, inode_bl_iter);
970       if (magic == CEPH_FS_ONDISK_MAGIC) {
971         dout(4) << "magic ok" << dendl;
972         old_inode.decode(inode_bl_iter);
973
974         if (old_inode.inode.version < fb.inode.version) {
975           write_root_ino = true;
976         }
977       } else {
978         dout(4) << "magic bad: '" << magic << "'" << dendl;
979         write_root_ino = true;
980       }
981     } else {
982       derr << "error reading root inode object " << root_oid.name
983             << ": " << cpp_strerror(r) << dendl;
984       return r;
985     }
986
987     if (write_root_ino && !dry_run) {
988       dout(4) << "writing root ino " << root_oid.name
989                << " version " << fb.inode.version << dendl;
990
991       // Compose: root ino format is magic,InodeStore(bare=false)
992       bufferlist new_root_ino_bl;
993       ::encode(std::string(CEPH_FS_ONDISK_MAGIC), new_root_ino_bl);
994       encode_fullbit_as_inode(fb, false, &new_root_ino_bl);
995
996       // Write to RADOS
997       r = output.write_full(root_oid.name, new_root_ino_bl);
998       if (r != 0) {
999         derr << "error writing inode object " << root_oid.name
1000               << ": " << cpp_strerror(r) << dendl;
1001         return r;
1002       }
1003     }
1004   }
1005
1006   return r;
1007 }
1008
1009
1010 /**
1011  * Erase a region of the log by overwriting it with ENoOp
1012  *
1013  */
1014 int JournalTool::erase_region(JournalScanner const &js, uint64_t const pos, uint64_t const length)
1015 {
1016   // To erase this region, we use our preamble, the encoding overhead
1017   // of an ENoOp, and our trailing start ptr.  Calculate how much padding
1018   // is needed inside the ENoOp to make up the difference.
1019   bufferlist tmp;
1020   ENoOp enoop(0);
1021   enoop.encode_with_header(tmp, CEPH_FEATURES_SUPPORTED_DEFAULT);
1022
1023   dout(4) << "erase_region " << pos << " len=" << length << dendl;
1024
1025   // FIXME: get the preamble/postamble length via JournalStream
1026   int32_t padding = length - tmp.length() - sizeof(uint32_t) - sizeof(uint64_t) - sizeof(uint64_t);
1027   dout(4) << "erase_region padding=0x" << std::hex << padding << std::dec << dendl;
1028
1029   if (padding < 0) {
1030     derr << "Erase region " << length << " too short" << dendl;
1031     return -EINVAL;
1032   }
1033
1034   // Serialize an ENoOp with the correct amount of padding
1035   enoop = ENoOp(padding);
1036   bufferlist entry;
1037   enoop.encode_with_header(entry, CEPH_FEATURES_SUPPORTED_DEFAULT);
1038   JournalStream stream(JOURNAL_FORMAT_RESILIENT);
1039
1040   // Serialize region of log stream
1041   bufferlist log_data;
1042   stream.write(entry, &log_data, pos);
1043
1044   dout(4) << "erase_region data length " << log_data.length() << dendl;
1045   assert(log_data.length() == length);
1046
1047   // Write log stream region to RADOS
1048   // FIXME: get object size somewhere common to scan_events
1049   uint32_t object_size = g_conf->mds_log_segment_size;
1050   if (object_size == 0) {
1051     // Default layout object size
1052     object_size = file_layout_t::get_default().object_size;
1053   }
1054
1055   uint64_t write_offset = pos;
1056   uint64_t obj_offset = (pos / object_size);
1057   int r = 0;
1058   while(log_data.length()) {
1059     std::string const oid = js.obj_name(obj_offset);
1060     uint32_t offset_in_obj = write_offset % object_size;
1061     uint32_t write_len = min(log_data.length(), object_size - offset_in_obj);
1062
1063     r = output.write(oid, log_data, write_len, offset_in_obj);
1064     if (r < 0) {
1065       return r;
1066     } else {
1067       dout(4) << "Wrote " << write_len << " bytes to " << oid << dendl;
1068       r = 0;
1069     }
1070      
1071     log_data.splice(0, write_len);
1072     write_offset += write_len;
1073     obj_offset++;
1074   }
1075
1076   return r;
1077 }
1078
1079 /**
1080  * Given an EMetaBlob::fullbit containing an inode, write out
1081  * the encoded inode in the format used by InodeStore (i.e. the
1082  * backing store format)
1083  *
1084  * This is a distant cousin of EMetaBlob::fullbit::update_inode, but for use
1085  * on an offline InodeStore instance.  It's way simpler, because we are just
1086  * uncritically hauling the data between structs.
1087  *
1088  * @param fb a fullbit extracted from a journal entry
1089  * @param bare if true, leave out [EN|DE]CODE_START decoration
1090  * @param out_bl output, write serialized inode to this bufferlist
1091  */
1092 void JournalTool::encode_fullbit_as_inode(
1093   const EMetaBlob::fullbit &fb,
1094   const bool bare,
1095   bufferlist *out_bl)
1096 {
1097   assert(out_bl != NULL);
1098
1099   // Compose InodeStore
1100   InodeStore new_inode;
1101   new_inode.inode = fb.inode;
1102   new_inode.xattrs = fb.xattrs;
1103   new_inode.dirfragtree = fb.dirfragtree;
1104   new_inode.snap_blob = fb.snapbl;
1105   new_inode.symlink = fb.symlink;
1106   new_inode.old_inodes = fb.old_inodes;
1107
1108   // Serialize InodeStore
1109   if (bare) {
1110     new_inode.encode_bare(*out_bl, CEPH_FEATURES_SUPPORTED_DEFAULT);
1111   } else {
1112     new_inode.encode(*out_bl, CEPH_FEATURES_SUPPORTED_DEFAULT);
1113   }
1114 }
1115
1116 /**
1117  * Given a list of inode numbers known to be in use by
1118  * inodes in the backing store, ensure that none of these
1119  * numbers are listed as free in the InoTables in the
1120  * backing store.
1121  *
1122  * Used after injecting inodes into the backing store, to
1123  * ensure that the same inode numbers are not subsequently
1124  * used for new files during ordinary operation.
1125  *
1126  * @param inos list of inode numbers to be removed from
1127  *             free lists in InoTables
1128  * @returns 0 on success, else negative error code
1129  */
1130 int JournalTool::consume_inos(const std::set<inodeno_t> &inos)
1131 {
1132   int r = 0;
1133
1134   // InoTable is a per-MDS structure, so iterate over assigned ranks
1135   auto fs = fsmap->get_filesystem(role_selector.get_ns());
1136   std::set<mds_rank_t> in_ranks;
1137   fs->mds_map.get_mds_set(in_ranks);
1138
1139   for (std::set<mds_rank_t>::iterator rank_i = in_ranks.begin();
1140       rank_i != in_ranks.end(); ++rank_i)
1141   {
1142     // Compose object name
1143     std::ostringstream oss;
1144     oss << "mds" << *rank_i << "_inotable";
1145     object_t inotable_oid = object_t(oss.str());
1146
1147     // Read object
1148     bufferlist inotable_bl;
1149     int read_r = input.read(inotable_oid.name, inotable_bl, (1<<22), 0);
1150     if (read_r < 0) {
1151       // Things are really bad if we can't read inotable.  Beyond our powers.
1152       derr << "unable to read inotable '" << inotable_oid.name << "': "
1153         << cpp_strerror(read_r) << dendl;
1154       r = r ? r : read_r;
1155       continue;
1156     }
1157
1158     // Deserialize InoTable
1159     version_t inotable_ver;
1160     bufferlist::iterator q = inotable_bl.begin();
1161     ::decode(inotable_ver, q);
1162     InoTable ino_table(NULL);
1163     ino_table.decode(q);
1164     
1165     // Update InoTable in memory
1166     bool inotable_modified = false;
1167     for (std::set<inodeno_t>::iterator i = inos.begin();
1168         i != inos.end(); ++i)
1169     {
1170       const inodeno_t ino = *i;
1171       if (ino_table.force_consume(ino)) {
1172         dout(4) << "Used ino 0x" << std::hex << ino << std::dec
1173           << " requires inotable update" << dendl;
1174         inotable_modified = true;
1175       }
1176     }
1177
1178     // Serialize and write InoTable
1179     if (inotable_modified) {
1180       inotable_ver += 1;
1181       dout(4) << "writing modified inotable version " << inotable_ver << dendl;
1182       bufferlist inotable_new_bl;
1183       ::encode(inotable_ver, inotable_new_bl);
1184       ino_table.encode_state(inotable_new_bl);
1185       int write_r = output.write_full(inotable_oid.name, inotable_new_bl);
1186       if (write_r != 0) {
1187         derr << "error writing modified inotable " << inotable_oid.name
1188           << ": " << cpp_strerror(write_r) << dendl;
1189         r = r ? r : read_r;
1190         continue;
1191       }
1192     }
1193   }
1194
1195   return r;
1196 }
1197