Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / cephfs / DataScan.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2015 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "include/compat.h"
16 #include "common/errno.h"
17 #include "common/ceph_argparse.h"
18 #include <fstream>
19 #include "include/util.h"
20
21 #include "mds/CInode.h"
22 #include "cls/cephfs/cls_cephfs_client.h"
23
24 #include "PgFiles.h"
25 #include "DataScan.h"
26 #include "include/compat.h"
27
28 #define dout_context g_ceph_context
29 #define dout_subsys ceph_subsys_mds
30 #undef dout_prefix
31 #define dout_prefix *_dout << "datascan." << __func__ << ": "
32
33 void DataScan::usage()
34 {
35   std::cout << "Usage: \n"
36     << "  cephfs-data-scan init [--force-init]\n"
37     << "  cephfs-data-scan scan_extents [--force-pool] [--worker_n N --worker_m M] <data pool name>\n"
38     << "  cephfs-data-scan scan_inodes [--force-pool] [--force-corrupt] [--worker_n N --worker_m M] <data pool name>\n"
39     << "  cephfs-data-scan pg_files <path> <pg id> [<pg id>...]\n"
40     << "  cephfs-data-scan scan_links\n"
41     << "\n"
42     << "    --force-corrupt: overrite apparently corrupt structures\n"
43     << "    --force-init: write root inodes even if they exist\n"
44     << "    --force-pool: use data pool even if it is not in FSMap\n"
45     << "    --worker_m: Maximum number of workers\n"
46     << "    --worker_n: Worker number, range 0-(worker_m-1)\n"
47     << "\n"
48     << "  cephfs-data-scan scan_frags [--force-corrupt]\n"
49     << "  cephfs-data-scan cleanup <data pool name>\n"
50     << std::endl;
51
52   generic_client_usage();
53 }
54
55 bool DataScan::parse_kwarg(
56     const std::vector<const char*> &args,
57     std::vector<const char *>::const_iterator &i,
58     int *r)
59 {
60   if (i + 1 == args.end()) {
61     return false;
62   }
63
64   const std::string arg(*i);
65   const std::string val(*(i + 1));
66
67   if (arg == std::string("--output-dir")) {
68     if (driver != NULL) {
69       derr << "Unexpected --output-dir: output already selected!" << dendl;
70       *r = -EINVAL;
71       return false;
72     }
73     dout(4) << "Using local file output to '" << val << "'" << dendl;
74     driver = new LocalFileDriver(val, data_io);
75     return true;
76   } else if (arg == std::string("--worker_n")) {
77     std::string err;
78     n = strict_strtoll(val.c_str(), 10, &err);
79     if (!err.empty()) {
80       std::cerr << "Invalid worker number '" << val << "'" << std::endl;
81       *r = -EINVAL;
82       return false;
83     }
84     return true;
85   } else if (arg == std::string("--worker_m")) {
86     std::string err;
87     m = strict_strtoll(val.c_str(), 10, &err);
88     if (!err.empty()) {
89       std::cerr << "Invalid worker count '" << val << "'" << std::endl;
90       *r = -EINVAL;
91       return false;
92     }
93     return true;
94   } else if (arg == std::string("--filter-tag")) {
95     filter_tag = val;
96     dout(10) << "Applying tag filter: '" << filter_tag << "'" << dendl;
97     return true;
98   } else if (arg == std::string("--filesystem")) {
99     std::shared_ptr<const Filesystem> fs;
100     *r = fsmap->parse_filesystem(val, &fs);
101     if (*r != 0) {
102       std::cerr << "Invalid filesystem '" << val << "'" << std::endl;
103       return false;
104     }
105     fscid = fs->fscid;
106     return true;
107   } else if (arg == std::string("--alternate-pool")) {
108     metadata_pool_name = val;
109     return true;
110   } else {
111     return false;
112   }
113 }
114
115 bool DataScan::parse_arg(
116     const std::vector<const char*> &args,
117     std::vector<const char *>::const_iterator &i)
118 {
119   const std::string arg(*i);
120   if (arg == "--force-pool") {
121     force_pool = true;
122     return true;
123   } else if (arg == "--force-corrupt") {
124     force_corrupt = true;
125     return true;
126   } else if (arg == "--force-init") {
127     force_init = true;
128     return true;
129   } else {
130     return false;
131   }
132 }
133
134 int DataScan::main(const std::vector<const char*> &args)
135 {
136   // Parse args
137   // ==========
138   if (args.size() < 1) {
139     usage();
140     return -EINVAL;
141   }
142
143   // Common RADOS init: open metadata pool
144   // =====================================
145   librados::Rados rados;
146   int r = rados.init_with_context(g_ceph_context);
147   if (r < 0) {
148     derr << "RADOS unavailable" << dendl;
149     return r;
150   }
151
152   std::string const &command = args[0];
153   std::string data_pool_name;
154
155   std::string pg_files_path;
156   std::set<pg_t> pg_files_pgs;
157
158   // Consume any known --key val or --flag arguments
159   for (std::vector<const char *>::const_iterator i = args.begin() + 1;
160        i != args.end(); ++i) {
161     if (parse_kwarg(args, i, &r)) {
162       // Skip the kwarg value field
163       ++i;
164       continue;
165     } else if (r) {
166       return r;
167     }
168
169     if (parse_arg(args, i)) {
170       continue;
171     }
172
173     // Trailing positional argument
174     if (i + 1 == args.end() &&
175         (command == "scan_inodes"
176          || command == "scan_extents"
177          || command == "cleanup")) {
178       data_pool_name = *i;
179       continue;
180     }
181
182     if (command == "pg_files") {
183       if (i == args.begin() + 1) {
184         pg_files_path = *i;
185         continue;
186       } else {
187         pg_t pg;
188         bool parsed = pg.parse(*i);
189         if (!parsed) {
190           std::cerr << "Invalid PG '" << *i << "'" << std::endl;
191           return -EINVAL;
192         } else {
193           pg_files_pgs.insert(pg);
194           continue;
195         }
196       }
197
198     }
199
200     // Fall through: unhandled
201     std::cerr << "Unknown argument '" << *i << "'" << std::endl;
202     return -EINVAL;
203   }
204
205   // If caller didn't specify a namespace, try to pick
206   // one if only one exists
207   if (fscid == FS_CLUSTER_ID_NONE) {
208     if (fsmap->filesystem_count() == 1) {
209       fscid = fsmap->get_filesystem()->fscid;
210     } else {
211       std::cerr << "Specify a filesystem with --filesystem" << std::endl;
212       return -EINVAL;
213     }
214   }
215   auto fs =  fsmap->get_filesystem(fscid);
216   assert(fs != nullptr);
217
218   // Default to output to metadata pool
219   if (driver == NULL) {
220     driver = new MetadataDriver();
221     driver->set_force_corrupt(force_corrupt);
222     driver->set_force_init(force_init);
223     dout(4) << "Using metadata pool output" << dendl;
224   }
225
226   dout(4) << "connecting to RADOS..." << dendl;
227   r = rados.connect();
228   if (r < 0) {
229     std::cerr << "couldn't connect to cluster: " << cpp_strerror(r)
230               << std::endl;
231     return r;
232   }
233
234   r = driver->init(rados, metadata_pool_name, fsmap, fscid);
235   if (r < 0) {
236     return r;
237   }
238
239   if (command == "pg_files") {
240     auto pge = PgFiles(objecter, pg_files_pgs);
241     pge.init();
242     return pge.scan_path(pg_files_path);
243   }
244
245   // Initialize data_io for those commands that need it
246   if (command == "scan_inodes" ||
247       command == "scan_extents" ||
248       command == "cleanup") {
249     if (data_pool_name.empty()) {
250       std::cerr << "Data pool not specified" << std::endl;
251       usage();
252       return -EINVAL;
253     }
254
255     data_pool_id = rados.pool_lookup(data_pool_name.c_str());
256     if (data_pool_id < 0) {
257       std::cerr << "Data pool '" << data_pool_name << "' not found!" << std::endl;
258       return -ENOENT;
259     } else {
260       dout(4) << "data pool '" << data_pool_name
261         << "' has ID " << data_pool_id << dendl;
262     }
263
264     if (!fs->mds_map.is_data_pool(data_pool_id)) {
265       std::cerr << "Warning: pool '" << data_pool_name << "' is not a "
266         "CephFS data pool!" << std::endl;
267       if (!force_pool) {
268         std::cerr << "Use --force-pool to continue" << std::endl;
269         return -EINVAL;
270       }
271     }
272
273     dout(4) << "opening data pool '" << data_pool_name << "'" << dendl;
274     r = rados.ioctx_create(data_pool_name.c_str(), data_io);
275     if (r != 0) {
276       return r;
277     }
278   }
279
280   // Initialize metadata_io from MDSMap for scan_frags
281   if (command == "scan_frags" || command == "scan_links") {
282     const auto fs = fsmap->get_filesystem(fscid);
283     if (fs == nullptr) {
284       std::cerr << "Filesystem id " << fscid << " does not exist" << std::endl;
285       return -ENOENT;
286     }
287     int64_t const metadata_pool_id = fs->mds_map.get_metadata_pool();
288
289     dout(4) << "resolving metadata pool " << metadata_pool_id << dendl;
290     int r = rados.pool_reverse_lookup(metadata_pool_id, &metadata_pool_name);
291     if (r < 0) {
292       std::cerr << "Pool " << metadata_pool_id
293         << " identified in MDS map not found in RADOS!" << std::endl;
294       return r;
295     }
296
297     r = rados.ioctx_create(metadata_pool_name.c_str(), metadata_io);
298     if (r != 0) {
299       return r;
300     }
301   }
302
303   // Finally, dispatch command
304   if (command == "scan_inodes") {
305     return scan_inodes();
306   } else if (command == "scan_extents") {
307     return scan_extents();
308   } else if (command == "scan_frags") {
309     return scan_frags();
310   } else if (command == "scan_links") {
311     return scan_links();
312   } else if (command == "cleanup") {
313     return cleanup();
314   } else if (command == "init") {
315     return driver->init_roots(fs->mds_map.get_first_data_pool());
316   } else {
317     std::cerr << "Unknown command '" << command << "'" << std::endl;
318     return -EINVAL;
319   }
320 }
321
322 int MetadataDriver::inject_unlinked_inode(
323     inodeno_t inono, int mode, int64_t data_pool_id)
324 {
325   const object_t oid = InodeStore::get_object_name(inono, frag_t(), ".inode");
326
327   // Skip if exists
328   bool already_exists = false;
329   int r = root_exists(inono, &already_exists);
330   if (r) {
331     return r;
332   }
333   if (already_exists && !force_init) {
334     std::cerr << "Inode 0x" << std::hex << inono << std::dec << " already"
335                " exists, skipping create.  Use --force-init to overwrite"
336                " the existing object." << std::endl;
337     return 0;
338   }
339
340   // Compose
341   InodeStore inode;
342   inode.inode.ino = inono;
343   inode.inode.version = 1;
344   inode.inode.xattr_version = 1;
345   inode.inode.mode = 0500 | mode;
346   // Fake dirstat.nfiles to 1, so that the directory doesn't appear to be empty
347   // (we won't actually give the *correct* dirstat here though)
348   inode.inode.dirstat.nfiles = 1;
349
350   inode.inode.ctime =
351     inode.inode.mtime = ceph_clock_now();
352   inode.inode.nlink = 1;
353   inode.inode.truncate_size = -1ull;
354   inode.inode.truncate_seq = 1;
355   inode.inode.uid = g_conf->mds_root_ino_uid;
356   inode.inode.gid = g_conf->mds_root_ino_gid;
357
358   // Force layout to default: should we let users override this so that
359   // they don't have to mount the filesystem to correct it?
360   inode.inode.layout = file_layout_t::get_default();
361   inode.inode.layout.pool_id = data_pool_id;
362   inode.inode.dir_layout.dl_dir_hash = g_conf->mds_default_dir_hash;
363
364   // Assume that we will get our stats wrong, and that we may
365   // be ignoring dirfrags that exist
366   inode.damage_flags |= (DAMAGE_STATS | DAMAGE_RSTATS | DAMAGE_FRAGTREE);
367
368   // Serialize
369   bufferlist inode_bl;
370   ::encode(std::string(CEPH_FS_ONDISK_MAGIC), inode_bl);
371   inode.encode(inode_bl, CEPH_FEATURES_SUPPORTED_DEFAULT);
372
373   // Write
374   r = metadata_io.write_full(oid.name, inode_bl);
375   if (r != 0) {
376     derr << "Error writing '" << oid.name << "': " << cpp_strerror(r) << dendl;
377     return r;
378   }
379
380   return r;
381 }
382
383 int MetadataDriver::root_exists(inodeno_t ino, bool *result)
384 {
385   object_t oid = InodeStore::get_object_name(ino, frag_t(), ".inode");
386   uint64_t size;
387   time_t mtime;
388   int r = metadata_io.stat(oid.name, &size, &mtime);
389   if (r == -ENOENT) {
390     *result = false;
391     return 0;
392   } else if (r < 0) {
393     return r;
394   }
395
396   *result = true;
397   return 0;
398 }
399
400 int MetadataDriver::init_roots(int64_t data_pool_id)
401 {
402   int r = 0;
403   r = inject_unlinked_inode(MDS_INO_ROOT, S_IFDIR|0755, data_pool_id);
404   if (r != 0) {
405     return r;
406   }
407   r = inject_unlinked_inode(MDS_INO_MDSDIR(0), S_IFDIR, data_pool_id);
408   if (r != 0) {
409     return r;
410   }
411   bool created = false;
412   r = find_or_create_dirfrag(MDS_INO_MDSDIR(0), frag_t(), &created);
413   if (r != 0) {
414     return r;
415   }
416
417   return 0;
418 }
419
420 int MetadataDriver::check_roots(bool *result)
421 {
422   int r;
423   r = root_exists(MDS_INO_ROOT, result);
424   if (r != 0) {
425     return r;
426   }
427   if (!*result) {
428     return 0;
429   }
430
431   r = root_exists(MDS_INO_MDSDIR(0), result);
432   if (r != 0) {
433     return r;
434   }
435   if (!*result) {
436     return 0;
437   }
438
439   return 0;
440 }
441
442 /**
443  * Stages:
444  *
445  * SERIAL init
446  *  0. Create root inodes if don't exist
447  * PARALLEL scan_extents
448  *  1. Size and mtime recovery: scan ALL objects, and update 0th
449  *   objects with max size and max mtime seen.
450  * PARALLEL scan_inodes
451  *  2. Inode recovery: scan ONLY 0th objects, and inject metadata
452  *   into dirfrag OMAPs, creating blank dirfrags as needed.  No stats
453  *   or rstats at this stage.  Inodes without backtraces go into
454  *   lost+found
455  * TODO: SERIAL "recover stats"
456  *  3. Dirfrag statistics: depth first traverse into metadata tree,
457  *    rebuilding dir sizes.
458  * TODO PARALLEL "clean up"
459  *  4. Cleanup; go over all 0th objects (and dirfrags if we tagged
460  *   anything onto them) and remove any of the xattrs that we
461  *   used for accumulating.
462  */
463
464
465 int parse_oid(const std::string &oid, uint64_t *inode_no, uint64_t *obj_id)
466 {
467   if (oid.find(".") == std::string::npos || oid.find(".") == oid.size() - 1) {
468     return -EINVAL;
469   }
470
471   std::string err;
472   std::string inode_str = oid.substr(0, oid.find("."));
473   *inode_no = strict_strtoll(inode_str.c_str(), 16, &err);
474   if (!err.empty()) {
475     return -EINVAL;
476   }
477
478   std::string pos_string = oid.substr(oid.find(".") + 1);
479   *obj_id = strict_strtoll(pos_string.c_str(), 16, &err);
480   if (!err.empty()) {
481     return -EINVAL;
482   }
483
484   return 0;
485 }
486
487
488 int DataScan::scan_extents()
489 {
490   return forall_objects(data_io, false, [this](
491         std::string const &oid,
492         uint64_t obj_name_ino,
493         uint64_t obj_name_offset) -> int
494   {
495     // Read size
496     uint64_t size;
497     time_t mtime;
498     int r = data_io.stat(oid, &size, &mtime);
499     dout(10) << "handling object " << obj_name_ino
500              << "." << obj_name_offset << dendl;
501     if (r != 0) {
502       dout(4) << "Cannot stat '" << oid << "': skipping" << dendl;
503       return r;
504     }
505
506     // I need to keep track of
507     //  * The highest object ID seen
508     //  * The size of the highest object ID seen
509     //  * The largest object seen
510     //
511     //  Given those things, I can later infer the object chunking
512     //  size, the offset of the last object (chunk size * highest ID seen)
513     //  and the actual size (offset of last object + size of highest ID seen)
514     //
515     //  This logic doesn't take account of striping.
516     r = ClsCephFSClient::accumulate_inode_metadata(
517         data_io,
518         obj_name_ino,
519         obj_name_offset,
520         size,
521         mtime);
522     if (r < 0) {
523       derr << "Failed to accumulate metadata data from '"
524         << oid << "': " << cpp_strerror(r) << dendl;
525       return r;
526     }
527
528     return r;
529   });
530 }
531
532 int DataScan::probe_filter(librados::IoCtx &ioctx)
533 {
534   bufferlist filter_bl;
535   ClsCephFSClient::build_tag_filter("test", &filter_bl);
536   librados::ObjectCursor range_i;
537   librados::ObjectCursor range_end;
538
539   std::vector<librados::ObjectItem> tmp_result;
540   librados::ObjectCursor tmp_next;
541   int r = ioctx.object_list(ioctx.object_list_begin(), ioctx.object_list_end(),
542                             1, filter_bl, &tmp_result, &tmp_next);
543
544   return r >= 0;
545 }
546
547 int DataScan::forall_objects(
548     librados::IoCtx &ioctx,
549     bool untagged_only,
550     std::function<int(std::string, uint64_t, uint64_t)> handler
551     )
552 {
553   librados::ObjectCursor range_i;
554   librados::ObjectCursor range_end;
555   ioctx.object_list_slice(
556       ioctx.object_list_begin(),
557       ioctx.object_list_end(),
558       n,
559       m,
560       &range_i,
561       &range_end);
562
563
564   bufferlist filter_bl;
565
566   bool legacy_filtering = false;
567   if (untagged_only) {
568     // probe to deal with older OSDs that don't support
569     // the cephfs pgls filtering mode
570     legacy_filtering = !probe_filter(ioctx);
571     if (!legacy_filtering) {
572       ClsCephFSClient::build_tag_filter(filter_tag, &filter_bl);
573     }
574   }
575
576   int r = 0;
577   while(range_i < range_end) {
578     std::vector<librados::ObjectItem> result;
579     int r = ioctx.object_list(range_i, range_end, 1,
580                                 filter_bl, &result, &range_i);
581     if (r < 0) {
582       derr << "Unexpected error listing objects: " << cpp_strerror(r) << dendl;
583       return r;
584     }
585
586     for (const auto &i : result) {
587       const std::string &oid = i.oid;
588       uint64_t obj_name_ino = 0;
589       uint64_t obj_name_offset = 0;
590       r = parse_oid(oid, &obj_name_ino, &obj_name_offset);
591       if (r != 0) {
592         dout(4) << "Bad object name '" << oid << "', skipping" << dendl;
593         continue;
594       }
595
596       if (untagged_only && legacy_filtering) {
597         dout(20) << "Applying filter to " << oid << dendl;
598
599         // We are only interested in 0th objects during this phase: we touched
600         // the other objects during scan_extents
601         if (obj_name_offset != 0) {
602           dout(20) << "Non-zeroth object" << dendl;
603           continue;
604         }
605
606         bufferlist scrub_tag_bl;
607         int r = ioctx.getxattr(oid, "scrub_tag", scrub_tag_bl);
608         if (r >= 0) {
609           std::string read_tag;
610           bufferlist::iterator q = scrub_tag_bl.begin();
611           try {
612             ::decode(read_tag, q);
613             if (read_tag == filter_tag) {
614               dout(20) << "skipping " << oid << " because it has the filter_tag"
615                        << dendl;
616               continue;
617             }
618           } catch (const buffer::error &err) {
619           }
620           dout(20) << "read non-matching tag '" << read_tag << "'" << dendl;
621         } else {
622           dout(20) << "no tag read (" << r << ")" << dendl;
623         }
624
625       } else if (untagged_only) {
626         assert(obj_name_offset == 0);
627         dout(20) << "OSD matched oid " << oid << dendl;
628       }
629
630       int this_oid_r = handler(oid, obj_name_ino, obj_name_offset);
631       if (r == 0 && this_oid_r < 0) {
632         r = this_oid_r;
633       }
634     }
635   }
636
637   return r;
638 }
639
640 int DataScan::scan_inodes()
641 {
642   bool roots_present;
643   int r = driver->check_roots(&roots_present);
644   if (r != 0) {
645     derr << "Unexpected error checking roots: '"
646       << cpp_strerror(r) << "'" << dendl;
647     return r;
648   }
649
650   if (!roots_present) {
651     std::cerr << "Some or all system inodes are absent.  Run 'init' from "
652       "one node before running 'scan_inodes'" << std::endl;
653     return -EIO;
654   }
655
656   return forall_objects(data_io, true, [this](
657         std::string const &oid,
658         uint64_t obj_name_ino,
659         uint64_t obj_name_offset) -> int
660   {
661     int r = 0;
662
663     dout(10) << "handling object "
664              << std::hex << obj_name_ino << "." << obj_name_offset << std::dec
665              << dendl;
666
667     AccumulateResult accum_res;
668     inode_backtrace_t backtrace;
669     file_layout_t loaded_layout = file_layout_t::get_default();
670     r = ClsCephFSClient::fetch_inode_accumulate_result(
671         data_io, oid, &backtrace, &loaded_layout, &accum_res);
672
673     if (r == -EINVAL) {
674       dout(4) << "Accumulated metadata missing from '"
675               << oid << ", did you run scan_extents?" << dendl;
676       return r;
677     } else if (r < 0) {
678       dout(4) << "Unexpected error loading accumulated metadata from '"
679               << oid << "': " << cpp_strerror(r) << dendl;
680       // FIXME: this creates situation where if a client has a corrupt
681       // backtrace/layout, we will fail to inject it.  We should (optionally)
682       // proceed if the backtrace/layout is corrupt but we have valid
683       // accumulated metadata.
684       return r;
685     }
686
687     const time_t file_mtime = accum_res.max_mtime;
688     uint64_t file_size = 0;
689     bool have_backtrace = !(backtrace.ancestors.empty());
690
691     // This is the layout we will use for injection, populated either
692     // from loaded_layout or from best guesses
693     file_layout_t guessed_layout;
694     guessed_layout.pool_id = data_pool_id;
695
696     // Calculate file_size, guess the layout
697     if (accum_res.ceiling_obj_index > 0) {
698       uint32_t chunk_size = file_layout_t::get_default().object_size;
699       // When there are multiple objects, the largest object probably
700       // indicates the chunk size.  But not necessarily, because files
701       // can be sparse.  Only make this assumption if size seen
702       // is a power of two, as chunk sizes typically are.
703       if ((accum_res.max_obj_size & (accum_res.max_obj_size - 1)) == 0) {
704         chunk_size = accum_res.max_obj_size;
705       }
706
707       if (loaded_layout.pool_id == -1) {
708         // If no stashed layout was found, guess it
709         guessed_layout.object_size = chunk_size;
710         guessed_layout.stripe_unit = chunk_size;
711         guessed_layout.stripe_count = 1;
712       } else if (!loaded_layout.is_valid() ||
713           loaded_layout.object_size < accum_res.max_obj_size) {
714         // If the max size seen exceeds what the stashed layout claims, then
715         // disbelieve it.  Guess instead.  Same for invalid layouts on disk.
716         dout(4) << "bogus xattr layout on 0x" << std::hex << obj_name_ino
717                 << std::dec << ", ignoring in favour of best guess" << dendl;
718         guessed_layout.object_size = chunk_size;
719         guessed_layout.stripe_unit = chunk_size;
720         guessed_layout.stripe_count = 1;
721       } else {
722         // We have a stashed layout that we can't disprove, so apply it
723         guessed_layout = loaded_layout;
724         dout(20) << "loaded layout from xattr:"
725           << " os: " << guessed_layout.object_size
726           << " sc: " << guessed_layout.stripe_count
727           << " su: " << guessed_layout.stripe_unit
728           << dendl;
729         // User might have transplanted files from a pool with a different
730         // ID, so whatever the loaded_layout says, we'll force the injected
731         // layout to point to the pool we really read from
732         guessed_layout.pool_id = data_pool_id;
733       }
734
735       if (guessed_layout.stripe_count == 1) {
736         // Unstriped file: simple chunking
737         file_size = guessed_layout.object_size * accum_res.ceiling_obj_index
738                     + accum_res.ceiling_obj_size;
739       } else {
740         // Striped file: need to examine the last stripe_count objects
741         // in the file to determine the size.
742
743         // How many complete (i.e. not last stripe) objects?
744         uint64_t complete_objs = 0;
745         if (accum_res.ceiling_obj_index > guessed_layout.stripe_count - 1) {
746           complete_objs = (accum_res.ceiling_obj_index / guessed_layout.stripe_count) * guessed_layout.stripe_count;
747         } else {
748           complete_objs = 0;
749         }
750
751         // How many potentially-short objects (i.e. last stripe set) objects?
752         uint64_t partial_objs = accum_res.ceiling_obj_index + 1 - complete_objs;
753
754         dout(10) << "calculating striped size from complete objs: "
755                  << complete_objs << ", partial objs: " << partial_objs
756                  << dendl;
757
758         // Maximum amount of data that may be in the incomplete objects
759         uint64_t incomplete_size = 0;
760
761         // For each short object, calculate the max file size within it
762         // and accumulate the maximum
763         for (uint64_t i = complete_objs; i < complete_objs + partial_objs; ++i) {
764           char buf[60];
765           snprintf(buf, sizeof(buf), "%llx.%08llx",
766               (long long unsigned)obj_name_ino, (long long unsigned)i);
767
768           uint64_t osize(0);
769           time_t omtime(0);
770           r = data_io.stat(std::string(buf), &osize, &omtime);
771           if (r == 0) {
772             if (osize > 0) {
773               // Upper bound within this object
774               uint64_t upper_size = (osize - 1) / guessed_layout.stripe_unit
775                 * (guessed_layout.stripe_unit * guessed_layout.stripe_count)
776                 + (i % guessed_layout.stripe_count)
777                 * guessed_layout.stripe_unit + (osize - 1)
778                 % guessed_layout.stripe_unit + 1;
779               incomplete_size = MAX(incomplete_size, upper_size);
780             }
781           } else if (r == -ENOENT) {
782             // Absent object, treat as size 0 and ignore.
783           } else {
784             // Unexpected error, carry r to outer scope for handling.
785             break;
786           }
787         }
788         if (r != 0 && r != -ENOENT) {
789           derr << "Unexpected error checking size of ino 0x" << std::hex
790                << obj_name_ino << std::dec << ": " << cpp_strerror(r) << dendl;
791           return r;
792         }
793         file_size = complete_objs * guessed_layout.object_size
794                     + incomplete_size;
795       }
796     } else {
797       file_size = accum_res.ceiling_obj_size;
798       if (loaded_layout.pool_id < 0
799           || loaded_layout.object_size < accum_res.max_obj_size) {
800         // No layout loaded, or inconsistent layout, use default
801         guessed_layout = file_layout_t::get_default();
802         guessed_layout.pool_id = data_pool_id;
803       } else {
804         guessed_layout = loaded_layout;
805       }
806     }
807
808     // Santity checking backtrace ino against object name
809     if (have_backtrace && backtrace.ino != obj_name_ino) {
810       dout(4) << "Backtrace ino 0x" << std::hex << backtrace.ino
811         << " doesn't match object name ino 0x" << obj_name_ino
812         << std::dec << dendl;
813       have_backtrace = false;
814     }
815
816     InodeStore dentry;
817     build_file_dentry(obj_name_ino, file_size, file_mtime, guessed_layout, &dentry);
818
819     // Inject inode to the metadata pool
820     if (have_backtrace) {
821       inode_backpointer_t root_bp = *(backtrace.ancestors.rbegin());
822       if (MDS_INO_IS_MDSDIR(root_bp.dirino)) {
823         /* Special case for strays: even if we have a good backtrace,
824          * don't put it in the stray dir, because while that would technically
825          * give it linkage it would still be invisible to the user */
826         r = driver->inject_lost_and_found(obj_name_ino, dentry);
827         if (r < 0) {
828           dout(4) << "Error injecting 0x" << std::hex << backtrace.ino
829             << std::dec << " into lost+found: " << cpp_strerror(r) << dendl;
830           if (r == -EINVAL) {
831             dout(4) << "Use --force-corrupt to overwrite structures that "
832                        "appear to be corrupt" << dendl;
833           }
834         }
835       } else {
836         /* Happy case: we will inject a named dentry for this inode */
837         r = driver->inject_with_backtrace(backtrace, dentry);
838         if (r < 0) {
839           dout(4) << "Error injecting 0x" << std::hex << backtrace.ino
840             << std::dec << " with backtrace: " << cpp_strerror(r) << dendl;
841           if (r == -EINVAL) {
842             dout(4) << "Use --force-corrupt to overwrite structures that "
843                        "appear to be corrupt" << dendl;
844           }
845         }
846       }
847     } else {
848       /* Backtrace-less case: we will inject a lost+found dentry */
849       r = driver->inject_lost_and_found(
850           obj_name_ino, dentry);
851       if (r < 0) {
852         dout(4) << "Error injecting 0x" << std::hex << obj_name_ino
853           << std::dec << " into lost+found: " << cpp_strerror(r) << dendl;
854         if (r == -EINVAL) {
855           dout(4) << "Use --force-corrupt to overwrite structures that "
856                      "appear to be corrupt" << dendl;
857         }
858       }
859     }
860
861     return r;
862   });
863 }
864
865 int DataScan::cleanup()
866 {
867   // We are looking for only zeroth object
868   //
869   return forall_objects(data_io, true, [this](
870         std::string const &oid,
871         uint64_t obj_name_ino,
872         uint64_t obj_name_offset) -> int
873       {
874       int r = 0;
875       r = ClsCephFSClient::delete_inode_accumulate_result(data_io, oid);
876       if (r < 0) {
877       dout(4) << "Error deleting accumulated metadata from '"
878       << oid << "': " << cpp_strerror(r) << dendl;
879       }
880       return r;
881       });
882 }
883
884 bool DataScan::valid_ino(inodeno_t ino) const
885 {
886   return (ino >= inodeno_t((1ull << 40)))
887     || (MDS_INO_IS_STRAY(ino))
888     || (MDS_INO_IS_MDSDIR(ino))
889     || ino == MDS_INO_ROOT
890     || ino == MDS_INO_CEPH;
891 }
892
893 int DataScan::scan_links()
894 {
895   MetadataDriver *metadata_driver = dynamic_cast<MetadataDriver*>(driver);
896   if (!metadata_driver) {
897     derr << "Unexpected --output-dir option for scan_links" << dendl;
898     return -EINVAL;
899   }
900
901   interval_set<inodeno_t> used_inos;
902   map<inodeno_t, int> remote_links;
903
904   struct link_info_t {
905     inodeno_t dirino;
906     frag_t frag;
907     string name;
908     version_t version;
909     int nlink;
910     bool is_dir;
911     link_info_t() : version(0), nlink(0), is_dir(false) {}
912     link_info_t(inodeno_t di, frag_t df, const string& n, const inode_t i) :
913       dirino(di), frag(df), name(n),
914       version(i.version), nlink(i.nlink), is_dir(S_IFDIR & i.mode) {}
915     dirfrag_t dirfrag() const {
916       return dirfrag_t(dirino, frag);
917     }
918   };
919   map<inodeno_t, list<link_info_t> > dup_primaries;
920   map<inodeno_t, link_info_t> bad_nlink_inos;
921
922   map<dirfrag_t, set<string> > to_remove;
923
924   enum {
925     SCAN_INOS = 1,
926     CHECK_LINK,
927   };
928
929   for (int step = SCAN_INOS; step <= CHECK_LINK; step++) {
930     const librados::NObjectIterator it_end = metadata_io.nobjects_end();
931     for (auto it = metadata_io.nobjects_begin(); it != it_end; ++it) {
932       const std::string oid = it->get_oid();
933
934       uint64_t dir_ino = 0;
935       uint64_t frag_id = 0;
936       int r = parse_oid(oid, &dir_ino, &frag_id);
937       if (r == -EINVAL) {
938         dout(10) << "Not a dirfrag: '" << oid << "'" << dendl;
939         continue;
940       } else {
941         // parse_oid can only do 0 or -EINVAL
942         assert(r == 0);
943       }
944
945       if (!valid_ino(dir_ino)) {
946         dout(10) << "Not a dirfrag (invalid ino): '" << oid << "'" << dendl;
947         continue;
948       }
949
950       std::map<std::string, bufferlist> items;
951       r = metadata_io.omap_get_vals(oid, "", (uint64_t)-1, &items);
952       if (r < 0) {
953         derr << "Error getting omap from '" << oid << "': " << cpp_strerror(r) << dendl;
954         return r;
955       }
956
957       for (auto& p : items) {
958         bufferlist::iterator q = p.second.begin();
959         string dname;
960         snapid_t last;
961         dentry_key_t::decode_helper(p.first, dname, last);
962
963         if (last != CEPH_NOSNAP)
964           continue;
965
966         try {
967           snapid_t dnfirst;
968           ::decode(dnfirst, q);
969           char dentry_type;
970           ::decode(dentry_type, q);
971           if (dentry_type == 'I') {
972             InodeStore inode;
973             inode.decode_bare(q);
974             inodeno_t ino = inode.inode.ino;
975
976             if (step == SCAN_INOS) {
977               if (used_inos.contains(ino, 1)) {
978                 dup_primaries[ino].size();
979               } else {
980                 used_inos.insert(ino);
981               }
982             } else if (step == CHECK_LINK) {
983               auto q = dup_primaries.find(ino);
984               if (q != dup_primaries.end()) {
985                 q->second.push_back(link_info_t(dir_ino, frag_id, dname, inode.inode));
986               } else {
987                 int nlink = 0;
988                 auto r = remote_links.find(ino);
989                 if (r != remote_links.end())
990                   nlink = r->second;
991                 if (!MDS_INO_IS_STRAY(dir_ino))
992                   nlink++;
993                 if (inode.inode.nlink != nlink) {
994                   derr << "Bad nlink on " << ino << " expected " << nlink
995                        << " has " << inode.inode.nlink << dendl;
996                   bad_nlink_inos[ino] = link_info_t(dir_ino, frag_id, dname, inode.inode);
997                   bad_nlink_inos[ino].nlink = nlink;
998                 }
999               }
1000             }
1001           } else if (dentry_type == 'L') {
1002             inodeno_t ino;
1003             unsigned char d_type;
1004             ::decode(ino, q);
1005             ::decode(d_type, q);
1006
1007             if (step == SCAN_INOS) {
1008               remote_links[ino]++;
1009             } else if (step == CHECK_LINK) {
1010               if (!used_inos.contains(ino, 1)) {
1011                 derr << "Bad remote link dentry 0x" << std::hex << dir_ino
1012                      << std::dec << "/" << dname
1013                      << ", ino " << ino << " not found" << dendl;
1014                 std::string key;
1015                 dentry_key_t dn_key(CEPH_NOSNAP, dname.c_str());
1016                 dn_key.encode(key);
1017                 to_remove[dirfrag_t(dir_ino, frag_id)].insert(key);
1018               }
1019             }
1020           } else {
1021             derr << "Invalid tag char '" << dentry_type << "' dentry 0x" << dir_ino
1022                  << std::dec << "/" << dname << dendl;
1023             return -EINVAL;
1024           }
1025         } catch (const buffer::error &err) {
1026           derr << "Error decoding dentry 0x" << std::hex << dir_ino
1027                << std::dec << "/" << dname << dendl;
1028           return -EINVAL;
1029         }
1030       }
1031     }
1032   }
1033   used_inos.clear();
1034
1035   for (auto& p : dup_primaries) {
1036     link_info_t newest;
1037     for (auto& q : p.second) {
1038       if (q.version > newest.version) {
1039         newest = q;
1040       } else if (q.version == newest.version &&
1041                  !MDS_INO_IS_STRAY(q.dirino) &&
1042                  MDS_INO_IS_STRAY(newest.dirino)) {
1043         newest = q;
1044       }
1045     }
1046
1047     for (auto& q : p.second) {
1048       // in the middle of dir fragmentation?
1049       if (newest.dirino == q.dirino && newest.name == q.name)
1050         continue;
1051
1052       std::string key;
1053       dentry_key_t dn_key(CEPH_NOSNAP, q.name.c_str());
1054       dn_key.encode(key);
1055       to_remove[q.dirfrag()].insert(key);
1056       derr << "Remove duplicated ino 0x" << p.first << " from "
1057            << q.dirfrag() << "/" << q.name << dendl;
1058     }
1059
1060     int nlink = 0;
1061     auto q = remote_links.find(p.first);
1062     if (q != remote_links.end())
1063       nlink = q->second;
1064     if (!MDS_INO_IS_STRAY(newest.dirino))
1065       nlink++;
1066
1067     if (nlink != newest.nlink) {
1068       derr << "Bad nlink on " << p.first << " expected " << nlink
1069            << " has " << newest.nlink << dendl;
1070       bad_nlink_inos[p.first] = newest;
1071       bad_nlink_inos[p.first].nlink = nlink;
1072     }
1073   }
1074   dup_primaries.clear();
1075   remote_links.clear();
1076
1077   for (auto& p : to_remove) {
1078     object_t frag_oid = InodeStore::get_object_name(p.first.ino, p.first.frag, "");
1079
1080     int r = metadata_io.omap_rm_keys(frag_oid.name, p.second);
1081     if (r != 0) {
1082       derr << "Error removing duplicated dentries from " << p.first << dendl;
1083       return r;
1084     }
1085   }
1086   to_remove.clear();
1087
1088   for (auto &p : bad_nlink_inos) {
1089     InodeStore inode;
1090     int r = read_dentry(p.second.dirino, p.second.frag, p.second.name, &inode);
1091     if (r < 0) {
1092       derr << "Unexpected error reading dentry "
1093            << p.second.dirfrag() << "/" << p.second.name
1094            << ": " << cpp_strerror(r) << dendl;
1095       return r;
1096     }
1097
1098     if (inode.inode.ino != p.first || inode.inode.version != p.second.version)
1099       continue;
1100
1101     inode.inode.nlink = p.second.nlink;
1102     r = metadata_driver->inject_linkage(p.second.dirino, p.second.name, p.second.frag, inode);
1103     if (r < 0)
1104       return r;
1105   }
1106
1107   return 0;
1108 }
1109
1110 int DataScan::scan_frags()
1111 {
1112   bool roots_present;
1113   int r = driver->check_roots(&roots_present);
1114   if (r != 0) {
1115     derr << "Unexpected error checking roots: '"
1116       << cpp_strerror(r) << "'" << dendl;
1117     return r;
1118   }
1119
1120   if (!roots_present) {
1121     std::cerr << "Some or all system inodes are absent.  Run 'init' from "
1122       "one node before running 'scan_inodes'" << std::endl;
1123     return -EIO;
1124   }
1125
1126   return forall_objects(metadata_io, true, [this](
1127         std::string const &oid,
1128         uint64_t obj_name_ino,
1129         uint64_t obj_name_offset) -> int
1130   {
1131     int r = 0;
1132     r = parse_oid(oid, &obj_name_ino, &obj_name_offset);
1133     if (r != 0) {
1134       dout(4) << "Bad object name '" << oid << "', skipping" << dendl;
1135       return r;
1136     }
1137
1138     if (obj_name_ino < (1ULL << 40)) {
1139       // FIXME: we're skipping stray dirs here: if they're
1140       // orphaned then we should be resetting them some other
1141       // way
1142       dout(10) << "Skipping system ino " << obj_name_ino << dendl;
1143       return 0;
1144     }
1145
1146     AccumulateResult accum_res;
1147     inode_backtrace_t backtrace;
1148
1149     // Default to inherit layout (i.e. no explicit layout on dir) which is
1150     // expressed as a zeroed layout struct (see inode_t::has_layout)
1151     file_layout_t loaded_layout;
1152
1153     int parent_r = 0;
1154     bufferlist parent_bl;
1155     int layout_r = 0;
1156     bufferlist layout_bl;
1157     bufferlist op_bl;
1158
1159     librados::ObjectReadOperation op;
1160     op.getxattr("parent", &parent_bl, &parent_r);
1161     op.getxattr("layout", &layout_bl, &layout_r);
1162     r = metadata_io.operate(oid, &op, &op_bl);
1163     if (r != 0 && r != -ENODATA) {
1164       derr << "Unexpected error reading backtrace: " << cpp_strerror(parent_r) << dendl;
1165       return r;
1166     }
1167
1168     if (parent_r != -ENODATA) {
1169       try {
1170         bufferlist::iterator q = parent_bl.begin();
1171         backtrace.decode(q);
1172       } catch (buffer::error &e) {
1173         dout(4) << "Corrupt backtrace on '" << oid << "': " << e << dendl;
1174         if (!force_corrupt) {
1175           return -EINVAL;
1176         } else {
1177           // Treat backtrace as absent: we'll inject into lost+found
1178           backtrace = inode_backtrace_t();
1179         }
1180       }
1181     }
1182
1183     if (layout_r != -ENODATA) {
1184       try {
1185         bufferlist::iterator q = layout_bl.begin();
1186         ::decode(loaded_layout, q);
1187       } catch (buffer::error &e) {
1188         dout(4) << "Corrupt layout on '" << oid << "': " << e << dendl;
1189         if (!force_corrupt) {
1190           return -EINVAL;
1191         }
1192       }
1193     }
1194
1195     bool have_backtrace = !(backtrace.ancestors.empty());
1196
1197     // Santity checking backtrace ino against object name
1198     if (have_backtrace && backtrace.ino != obj_name_ino) {
1199       dout(4) << "Backtrace ino 0x" << std::hex << backtrace.ino
1200         << " doesn't match object name ino 0x" << obj_name_ino
1201         << std::dec << dendl;
1202       have_backtrace = false;
1203     }
1204
1205     uint64_t fnode_version = 0;
1206     fnode_t fnode;
1207     r = read_fnode(obj_name_ino, frag_t(), &fnode, &fnode_version);
1208     if (r == -EINVAL) {
1209       derr << "Corrupt fnode on " << oid << dendl;
1210       if (force_corrupt) {
1211         fnode.fragstat.mtime = 0;
1212         fnode.fragstat.nfiles = 1;
1213         fnode.fragstat.nsubdirs = 0;
1214         fnode.accounted_fragstat = fnode.fragstat;
1215       } else {
1216         return r;
1217       }
1218     }
1219
1220     InodeStore dentry;
1221     build_dir_dentry(obj_name_ino, fnode.accounted_fragstat,
1222                 loaded_layout, &dentry);
1223
1224     // Inject inode to the metadata pool
1225     if (have_backtrace) {
1226       inode_backpointer_t root_bp = *(backtrace.ancestors.rbegin());
1227       if (MDS_INO_IS_MDSDIR(root_bp.dirino)) {
1228         /* Special case for strays: even if we have a good backtrace,
1229          * don't put it in the stray dir, because while that would technically
1230          * give it linkage it would still be invisible to the user */
1231         r = driver->inject_lost_and_found(obj_name_ino, dentry);
1232         if (r < 0) {
1233           dout(4) << "Error injecting 0x" << std::hex << backtrace.ino
1234             << std::dec << " into lost+found: " << cpp_strerror(r) << dendl;
1235           if (r == -EINVAL) {
1236             dout(4) << "Use --force-corrupt to overwrite structures that "
1237                        "appear to be corrupt" << dendl;
1238           }
1239         }
1240       } else {
1241         /* Happy case: we will inject a named dentry for this inode */
1242         r = driver->inject_with_backtrace(backtrace, dentry);
1243         if (r < 0) {
1244           dout(4) << "Error injecting 0x" << std::hex << backtrace.ino
1245             << std::dec << " with backtrace: " << cpp_strerror(r) << dendl;
1246           if (r == -EINVAL) {
1247             dout(4) << "Use --force-corrupt to overwrite structures that "
1248                        "appear to be corrupt" << dendl;
1249           }
1250         }
1251       }
1252     } else {
1253       /* Backtrace-less case: we will inject a lost+found dentry */
1254       r = driver->inject_lost_and_found(
1255           obj_name_ino, dentry);
1256       if (r < 0) {
1257         dout(4) << "Error injecting 0x" << std::hex << obj_name_ino
1258           << std::dec << " into lost+found: " << cpp_strerror(r) << dendl;
1259         if (r == -EINVAL) {
1260           dout(4) << "Use --force-corrupt to overwrite structures that "
1261                      "appear to be corrupt" << dendl;
1262         }
1263       }
1264     }
1265
1266     return r;
1267   });
1268 }
1269
1270 int MetadataTool::read_fnode(
1271     inodeno_t ino, frag_t frag, fnode_t *fnode,
1272     uint64_t *last_version)
1273 {
1274   assert(fnode != NULL);
1275
1276   object_t frag_oid = InodeStore::get_object_name(ino, frag, "");
1277   bufferlist fnode_bl;
1278   int r = metadata_io.omap_get_header(frag_oid.name, &fnode_bl);
1279   *last_version = metadata_io.get_last_version();
1280   if (r < 0) {
1281     return r;
1282   }
1283
1284   bufferlist::iterator old_fnode_iter = fnode_bl.begin();
1285   try {
1286     (*fnode).decode(old_fnode_iter);
1287   } catch (const buffer::error &err) {
1288     return -EINVAL;
1289   }
1290
1291   return 0;
1292 }
1293
1294 int MetadataTool::read_dentry(inodeno_t parent_ino, frag_t frag,
1295                 const std::string &dname, InodeStore *inode)
1296 {
1297   assert(inode != NULL);
1298
1299
1300   std::string key;
1301   dentry_key_t dn_key(CEPH_NOSNAP, dname.c_str());
1302   dn_key.encode(key);
1303
1304   std::set<std::string> keys;
1305   keys.insert(key);
1306   std::map<std::string, bufferlist> vals;
1307   object_t frag_oid = InodeStore::get_object_name(parent_ino, frag, "");
1308   int r = metadata_io.omap_get_vals_by_keys(frag_oid.name, keys, &vals);  
1309   dout(20) << "oid=" << frag_oid.name
1310            << " dname=" << dname
1311            << " frag=" << frag
1312            << ", r=" << r << dendl;
1313   if (r < 0) {
1314     return r;
1315   }
1316
1317   if (vals.find(key) == vals.end()) {
1318     dout(20) << key << " not found in result" << dendl;
1319     return -ENOENT;
1320   }
1321
1322   try {
1323     bufferlist::iterator q = vals[key].begin();
1324     snapid_t dnfirst;
1325     ::decode(dnfirst, q);
1326     char dentry_type;
1327     ::decode(dentry_type, q);
1328     if (dentry_type == 'I') {
1329       inode->decode_bare(q);
1330       return 0;
1331     } else {
1332       dout(20) << "dentry type '" << dentry_type << "': cannot"
1333                   "read an inode out of that" << dendl;
1334       return -EINVAL;
1335     }
1336   } catch (const buffer::error &err) {
1337     dout(20) << "encoding error in dentry 0x" << std::hex << parent_ino
1338              << std::dec << "/" << dname << dendl;
1339     return -EINVAL;
1340   }
1341
1342   return 0;
1343 }
1344
1345 int MetadataDriver::inject_lost_and_found(
1346     inodeno_t ino, const InodeStore &dentry)
1347 {
1348   // Create lost+found if doesn't exist
1349   bool created = false;
1350   int r = find_or_create_dirfrag(CEPH_INO_ROOT, frag_t(), &created);
1351   if (r < 0) {
1352     return r;
1353   }
1354   InodeStore lf_ino;
1355   r = read_dentry(CEPH_INO_ROOT, frag_t(), "lost+found", &lf_ino);
1356   if (r == -ENOENT || r == -EINVAL) {
1357     if (r == -EINVAL && !force_corrupt) {
1358       return r;
1359     }
1360
1361     // To have a directory not specify a layout, give it zeros (see
1362     // inode_t::has_layout)
1363     file_layout_t inherit_layout;
1364
1365     // Construct LF inode
1366     frag_info_t fragstat;
1367     fragstat.nfiles = 1,
1368     build_dir_dentry(CEPH_INO_LOST_AND_FOUND, fragstat, inherit_layout, &lf_ino);
1369
1370     // Inject link to LF inode in the root dir
1371     r = inject_linkage(CEPH_INO_ROOT, "lost+found", frag_t(), lf_ino);
1372     if (r < 0) {
1373       return r;
1374     }
1375   } else {
1376     if (!(lf_ino.inode.mode & S_IFDIR)) {
1377       derr << "lost+found exists but is not a directory!" << dendl;
1378       // In this case we error out, and the user should do something about
1379       // this problem.
1380       return -EINVAL;
1381     }
1382   }
1383
1384   r = find_or_create_dirfrag(CEPH_INO_LOST_AND_FOUND, frag_t(), &created);
1385   if (r < 0) {
1386     return r;
1387   }
1388
1389   InodeStore recovered_ino;
1390
1391
1392   const std::string dname = lost_found_dname(ino);
1393
1394   // Write dentry into lost+found dirfrag
1395   return inject_linkage(lf_ino.inode.ino, dname, frag_t(), dentry);
1396 }
1397
1398
1399 int MetadataDriver::get_frag_of(
1400     inodeno_t dirino,
1401     const std::string &target_dname,
1402     frag_t *result_ft)
1403 {
1404   object_t root_frag_oid = InodeStore::get_object_name(dirino, frag_t(), "");
1405
1406   dout(20) << "dirino=" << dirino << " target_dname=" << target_dname << dendl;
1407
1408   // Find and load fragtree if existing dirfrag
1409   // ==========================================
1410   bool have_backtrace = false; 
1411   bufferlist parent_bl;
1412   int r = metadata_io.getxattr(root_frag_oid.name, "parent", parent_bl);
1413   if (r == -ENODATA) {
1414     dout(10) << "No backtrace on '" << root_frag_oid << "'" << dendl;
1415   } else if (r < 0) {
1416     dout(4) << "Unexpected error on '" << root_frag_oid << "': "
1417       << cpp_strerror(r) << dendl;
1418     return r;
1419   }
1420
1421   // Deserialize backtrace
1422   inode_backtrace_t backtrace;
1423   if (parent_bl.length()) {
1424     try {
1425       bufferlist::iterator q = parent_bl.begin();
1426       backtrace.decode(q);
1427       have_backtrace = true;
1428     } catch (buffer::error &e) {
1429       dout(4) << "Corrupt backtrace on '" << root_frag_oid << "': " << e << dendl;
1430     }
1431   }
1432
1433   if (!(have_backtrace && backtrace.ancestors.size())) {
1434     // Can't work out fragtree without a backtrace
1435     dout(4) << "No backtrace on '" << root_frag_oid
1436             << "': cannot determine fragtree" << dendl;
1437     return -ENOENT;
1438   }
1439
1440   // The parentage of dirino
1441   const inode_backpointer_t &bp = *(backtrace.ancestors.begin());
1442
1443   // The inode of dirino's parent
1444   const inodeno_t parent_ino = bp.dirino;
1445
1446   // The dname of dirino in its parent.
1447   const std::string &parent_dname = bp.dname;
1448
1449   dout(20) << "got backtrace parent " << parent_ino << "/"
1450            << parent_dname << dendl;
1451
1452   // The primary dentry for dirino
1453   InodeStore existing_dentry;
1454
1455   // See if we can find ourselves in dirfrag zero of the parent: this
1456   // is a fast path that avoids needing to go further up the tree
1457   // if the parent isn't fragmented (worst case we would have to
1458   // go all the way to the root)
1459   r = read_dentry(parent_ino, frag_t(), parent_dname, &existing_dentry);
1460   if (r >= 0) {
1461     // Great, fast path: return the fragtree from here
1462     if (existing_dentry.inode.ino != dirino) {
1463       dout(4) << "Unexpected inode in dentry! 0x" << std::hex
1464               << existing_dentry.inode.ino
1465               << " vs expected 0x" << dirino << std::dec << dendl;
1466       return -ENOENT;
1467     }
1468     dout(20) << "fast path, fragtree is "
1469              << existing_dentry.dirfragtree << dendl;
1470     *result_ft = existing_dentry.pick_dirfrag(target_dname);
1471     dout(20) << "frag is " << *result_ft << dendl;
1472     return 0;
1473   } else if (r != -ENOENT) {
1474     // Dentry not present in 0th frag, must read parent's fragtree
1475     frag_t parent_frag;
1476     r = get_frag_of(parent_ino, parent_dname, &parent_frag);
1477     if (r == 0) {
1478       // We have the parent fragtree, so try again to load our dentry
1479       r = read_dentry(parent_ino, parent_frag, parent_dname, &existing_dentry);
1480       if (r >= 0) {
1481         // Got it!
1482         *result_ft = existing_dentry.pick_dirfrag(target_dname);
1483         dout(20) << "resolved via parent, frag is " << *result_ft << dendl;
1484         return 0;
1485       } else {
1486         if (r == -EINVAL || r == -ENOENT) {
1487           return -ENOENT;  // dentry missing or corrupt, so frag is missing
1488         } else {
1489           return r;
1490         }
1491       }
1492     } else {
1493       // Couldn't resolve parent fragtree, so can't find ours.
1494       return r;
1495     }
1496   } else if (r == -EINVAL) {
1497     // Unreadable dentry, can't know the fragtree.
1498     return -ENOENT;
1499   } else {
1500     // Unexpected error, raise it
1501     return r;
1502   }
1503 }
1504
1505
1506 int MetadataDriver::inject_with_backtrace(
1507     const inode_backtrace_t &backtrace, const InodeStore &dentry)
1508     
1509 {
1510
1511   // On dirfrags
1512   // ===========
1513   // In order to insert something into a directory, we first (ideally)
1514   // need to know the fragtree for the directory.  Sometimes we can't
1515   // get that, in which case we just go ahead and insert it into
1516   // fragment zero for a good chance of that being the right thing
1517   // anyway (most moderate-sized dirs aren't fragmented!)
1518
1519   // On ancestry
1520   // ===========
1521   // My immediate ancestry should be correct, so if we can find that
1522   // directory's dirfrag then go inject it there.  This works well
1523   // in the case that this inode's dentry was somehow lost and we
1524   // are recreating it, because the rest of the hierarchy
1525   // will probably still exist.
1526   //
1527   // It's more of a "better than nothing" approach when rebuilding
1528   // a whole tree, as backtraces will in general not be up to date
1529   // beyond the first parent, if anything in the trace was ever
1530   // moved after the file was created.
1531
1532   // On inode numbers
1533   // ================
1534   // The backtrace tells us inodes for each of the parents.  If we are
1535   // creating those parent dirfrags, then there is a risk that somehow
1536   // the inode indicated here was also used for data (not a dirfrag) at
1537   // some stage.  That would be a zany situation, and we don't check
1538   // for it here, because to do so would require extra IOs for everything
1539   // we inject, and anyway wouldn't guarantee that the inode number
1540   // wasn't in use in some dentry elsewhere in the metadata tree that
1541   // just happened not to have any data objects.
1542
1543   // On multiple workers touching the same traces
1544   // ============================================
1545   // When creating linkage for a directory, *only* create it if we are
1546   // also creating the object.  That way, we might not manage to get the
1547   // *right* linkage for a directory, but at least we won't multiply link
1548   // it.  We assume that if a root dirfrag exists for a directory, then
1549   // it is linked somewhere (i.e. that the metadata pool is not already
1550   // inconsistent).
1551   //
1552   // Making sure *that* is true is someone else's job!  Probably someone
1553   // who is not going to run in parallel, so that they can self-consistently
1554   // look at versions and move things around as they go.
1555   // Note this isn't 100% safe: if we die immediately after creating dirfrag
1556   // object, next run will fail to create linkage for the dirfrag object
1557   // and leave it orphaned.
1558
1559   inodeno_t ino = backtrace.ino;
1560   dout(10) << "  inode: 0x" << std::hex << ino << std::dec << dendl;
1561   for (std::vector<inode_backpointer_t>::const_iterator i = backtrace.ancestors.begin();
1562       i != backtrace.ancestors.end(); ++i) {
1563     const inode_backpointer_t &backptr = *i;
1564     dout(10) << "  backptr: 0x" << std::hex << backptr.dirino << std::dec
1565       << "/" << backptr.dname << dendl;
1566
1567     // Examine root dirfrag for parent
1568     const inodeno_t parent_ino = backptr.dirino;
1569     const std::string dname = backptr.dname;
1570
1571     frag_t fragment;
1572     int r = get_frag_of(parent_ino, dname, &fragment);
1573     if (r == -ENOENT) {
1574       // Don't know fragment, fall back to assuming root
1575       dout(20) << "don't know fragment for 0x" << std::hex <<
1576         parent_ino << std::dec << "/" << dname << ", will insert to root"
1577         << dendl;
1578     }
1579
1580     // Find or create dirfrag
1581     // ======================
1582     bool created_dirfrag;
1583     r = find_or_create_dirfrag(parent_ino, fragment, &created_dirfrag);
1584     if (r < 0) {
1585       return r;
1586     }
1587
1588     // Check if dentry already exists
1589     // ==============================
1590     InodeStore existing_dentry;
1591     r = read_dentry(parent_ino, fragment, dname, &existing_dentry);
1592     bool write_dentry = false;
1593     if (r == -ENOENT || r == -EINVAL) {
1594       if (r == -EINVAL && !force_corrupt) {
1595         return r;
1596       }
1597       // Missing or corrupt dentry
1598       write_dentry = true;
1599     } else if (r < 0) {
1600       derr << "Unexpected error reading dentry 0x" << std::hex
1601         << parent_ino << std::dec << "/"
1602         << dname << ": " << cpp_strerror(r) << dendl;
1603       break;
1604     } else {
1605       // Dentry already present, does it link to me?
1606       if (existing_dentry.inode.ino == ino) {
1607         dout(20) << "Dentry 0x" << std::hex
1608           << parent_ino << std::dec << "/"
1609           << dname << " already exists and points to me" << dendl;
1610       } else {
1611         derr << "Dentry 0x" << std::hex
1612           << parent_ino << std::dec << "/"
1613           << dname << " already exists but points to 0x"
1614           << std::hex << existing_dentry.inode.ino << std::dec << dendl;
1615         // Fall back to lost+found!
1616         return inject_lost_and_found(backtrace.ino, dentry);
1617       }
1618     }
1619
1620     // Inject linkage
1621     // ==============
1622
1623     if (write_dentry) {
1624       if (i == backtrace.ancestors.begin()) {
1625         // This is the linkage for the file of interest
1626         dout(10) << "Linking inode 0x" << std::hex << ino
1627           << " at 0x" << parent_ino << "/" << dname << std::dec
1628           << " with size=" << dentry.inode.size << " bytes" << dendl;
1629
1630         r = inject_linkage(parent_ino, dname, fragment, dentry);
1631       } else {
1632         // This is the linkage for an ancestor directory
1633         InodeStore ancestor_dentry;
1634         ancestor_dentry.inode.mode = 0755 | S_IFDIR;
1635
1636         // Set nfiles to something non-zero, to fool any other code
1637         // that tries to ignore 'empty' directories.  This won't be
1638         // accurate, but it should avoid functional issues.
1639
1640         ancestor_dentry.inode.dirstat.nfiles = 1;
1641         ancestor_dentry.inode.dir_layout.dl_dir_hash =
1642                                                g_conf->mds_default_dir_hash;
1643
1644         ancestor_dentry.inode.nlink = 1;
1645         ancestor_dentry.inode.ino = ino;
1646         ancestor_dentry.inode.uid = g_conf->mds_root_ino_uid;
1647         ancestor_dentry.inode.gid = g_conf->mds_root_ino_gid;
1648         ancestor_dentry.inode.version = 1;
1649         ancestor_dentry.inode.backtrace_version = 1;
1650         r = inject_linkage(parent_ino, dname, fragment, ancestor_dentry);
1651       }
1652
1653       if (r < 0) {
1654         return r;
1655       }
1656     }
1657
1658     if (!created_dirfrag) {
1659       // If the parent dirfrag already existed, then stop traversing the
1660       // backtrace: assume that the other ancestors already exist too.  This
1661       // is an assumption rather than a truth, but it's a convenient way
1662       // to avoid the risk of creating multiply-linked directories while
1663       // injecting data.  If there are in fact missing ancestors, this
1664       // should be fixed up using a separate tool scanning the metadata
1665       // pool.
1666       break;
1667     } else {
1668       // Proceed up the backtrace, creating parents
1669       ino = parent_ino;
1670     }
1671   }
1672
1673   return 0;
1674 }
1675
1676 int MetadataDriver::find_or_create_dirfrag(
1677     inodeno_t ino,
1678     frag_t fragment,
1679     bool *created)
1680 {
1681   assert(created != NULL);
1682
1683   fnode_t existing_fnode;
1684   *created = false;
1685
1686   uint64_t read_version = 0;
1687   int r = read_fnode(ino, fragment, &existing_fnode, &read_version);
1688   dout(10) << "read_version = " << read_version << dendl;
1689
1690   if (r == -ENOENT || r == -EINVAL) {
1691     if (r == -EINVAL && !force_corrupt) {
1692       return r;
1693     }
1694
1695     // Missing or corrupt fnode, create afresh
1696     bufferlist fnode_bl;
1697     fnode_t blank_fnode;
1698     blank_fnode.version = 1;
1699     // mark it as non-empty
1700     blank_fnode.fragstat.nfiles = 1;
1701     blank_fnode.accounted_fragstat = blank_fnode.fragstat;
1702     blank_fnode.damage_flags |= (DAMAGE_STATS | DAMAGE_RSTATS);
1703     blank_fnode.encode(fnode_bl);
1704
1705
1706     librados::ObjectWriteOperation op;
1707
1708     if (read_version) {
1709       assert(r == -EINVAL);
1710       // Case A: We must assert that the version isn't changed since we saw the object
1711       // was unreadable, to avoid the possibility of two data-scan processes
1712       // both creating the frag.
1713       op.assert_version(read_version);
1714     } else {
1715       assert(r == -ENOENT);
1716       // Case B: The object didn't exist in read_fnode, so while creating it we must
1717       // use an exclusive create to correctly populate *creating with
1718       // whether we created it ourselves or someone beat us to it.
1719       op.create(true);
1720     }
1721
1722     object_t frag_oid = InodeStore::get_object_name(ino, fragment, "");
1723     op.omap_set_header(fnode_bl);
1724     r = metadata_io.operate(frag_oid.name, &op);
1725     if (r == -EOVERFLOW || r == -EEXIST) {
1726       // Someone else wrote it (see case A above)
1727       dout(10) << "Dirfrag creation race: 0x" << std::hex
1728         << ino << " " << fragment << std::dec << dendl;
1729       *created = false;
1730       return 0;
1731     } else if (r < 0) {
1732       // We were unable to create or write it, error out
1733       derr << "Failed to create dirfrag 0x" << std::hex
1734         << ino << std::dec << ": " << cpp_strerror(r) << dendl;
1735       return r;
1736     } else {
1737       // Success: the dirfrag object now exists with a value header
1738       dout(10) << "Created dirfrag: 0x" << std::hex
1739         << ino << std::dec << dendl;
1740       *created = true;
1741     }
1742   } else if (r < 0) {
1743     derr << "Unexpected error reading dirfrag 0x" << std::hex
1744       << ino << std::dec << " : " << cpp_strerror(r) << dendl;
1745     return r;
1746   } else {
1747     dout(20) << "Dirfrag already exists: 0x" << std::hex
1748       << ino << " " << fragment << std::dec << dendl;
1749   }
1750
1751   return 0;
1752 }
1753
1754 int MetadataDriver::inject_linkage(
1755     inodeno_t dir_ino, const std::string &dname,
1756     const frag_t fragment, const InodeStore &inode)
1757 {
1758   // We have no information about snapshots, so everything goes
1759   // in as CEPH_NOSNAP
1760   snapid_t snap = CEPH_NOSNAP;
1761
1762   object_t frag_oid = InodeStore::get_object_name(dir_ino, fragment, "");
1763
1764   std::string key;
1765   dentry_key_t dn_key(snap, dname.c_str());
1766   dn_key.encode(key);
1767
1768   bufferlist dentry_bl;
1769   ::encode(snap, dentry_bl);
1770   ::encode('I', dentry_bl);
1771   inode.encode_bare(dentry_bl, CEPH_FEATURES_SUPPORTED_DEFAULT);
1772
1773   // Write out
1774   std::map<std::string, bufferlist> vals;
1775   vals[key] = dentry_bl;
1776   int r = metadata_io.omap_set(frag_oid.name, vals);
1777   if (r != 0) {
1778     derr << "Error writing dentry 0x" << std::hex
1779       << dir_ino << std::dec << "/"
1780       << dname << ": " << cpp_strerror(r) << dendl;
1781     return r;
1782   } else {
1783     dout(20) << "Injected dentry 0x" << std::hex
1784       << dir_ino << "/" << dname << " pointing to 0x"
1785       << inode.inode.ino << std::dec << dendl;
1786     return 0;
1787   }
1788 }
1789
1790
1791 int MetadataDriver::init(
1792   librados::Rados &rados, std::string &metadata_pool_name, const FSMap *fsmap,
1793   fs_cluster_id_t fscid)
1794 {
1795   if (metadata_pool_name.empty()) {
1796     auto fs =  fsmap->get_filesystem(fscid);
1797     assert(fs != nullptr);
1798     int64_t const metadata_pool_id = fs->mds_map.get_metadata_pool();
1799
1800     dout(4) << "resolving metadata pool " << metadata_pool_id << dendl;
1801     int r = rados.pool_reverse_lookup(metadata_pool_id, &metadata_pool_name);
1802     if (r < 0) {
1803       derr << "Pool " << metadata_pool_id
1804            << " identified in MDS map not found in RADOS!" << dendl;
1805       return r;
1806     }
1807     dout(4) << "found metadata pool '" << metadata_pool_name << "'" << dendl;
1808   } else {
1809     dout(4) << "forcing metadata pool '" << metadata_pool_name << "'" << dendl;
1810   }
1811   return rados.ioctx_create(metadata_pool_name.c_str(), metadata_io);
1812 }
1813
1814 int LocalFileDriver::init(
1815   librados::Rados &rados, std::string &metadata_pool_name, const FSMap *fsmap,
1816   fs_cluster_id_t fscid)
1817 {
1818   return 0;
1819 }
1820
1821 int LocalFileDriver::inject_data(
1822     const std::string &file_path,
1823     uint64_t size,
1824     uint32_t chunk_size,
1825     inodeno_t ino)
1826 {
1827   // Scrape the file contents out of the data pool and into the
1828   // local filesystem
1829   std::fstream f;
1830   f.open(file_path.c_str(), std::fstream::out | std::fstream::binary);
1831
1832   for (uint64_t offset = 0; offset < size; offset += chunk_size) {
1833     bufferlist bl;
1834
1835     char buf[32];
1836     snprintf(buf, sizeof(buf),
1837         "%llx.%08llx",
1838         (unsigned long long)ino,
1839         (unsigned long long)(offset / chunk_size));
1840     std::string oid(buf);
1841
1842     int r = data_io.read(oid, bl, chunk_size, 0);
1843
1844     if (r <= 0 && r != -ENOENT) {
1845       derr << "error reading data object '" << oid << "': "
1846         << cpp_strerror(r) << dendl;
1847       f.close();
1848       return r;
1849     } else if (r >=0) {
1850       
1851       f.seekp(offset);
1852       bl.write_stream(f);
1853     }
1854   }
1855   f.close();
1856
1857   return 0;
1858 }
1859
1860
1861 int LocalFileDriver::inject_with_backtrace(
1862     const inode_backtrace_t &bt,
1863     const InodeStore &dentry)
1864 {
1865   std::string path_builder = path;
1866
1867   // Iterate through backtrace creating directory parents
1868   std::vector<inode_backpointer_t>::const_reverse_iterator i;
1869   for (i = bt.ancestors.rbegin();
1870       i != bt.ancestors.rend(); ++i) {
1871
1872     const inode_backpointer_t &backptr = *i;
1873     path_builder += "/";
1874     path_builder += backptr.dname;
1875
1876     // Last entry is the filename itself
1877     bool is_file = (i + 1 == bt.ancestors.rend());
1878     if (is_file) {
1879       // FIXME: inject_data won't cope with interesting (i.e. striped)
1880       // layouts (need a librados-compatible Filer to read these)
1881       inject_data(path_builder, dentry.inode.size,
1882                   dentry.inode.layout.object_size, bt.ino);
1883     } else {
1884       int r = mkdir(path_builder.c_str(), 0755);
1885       if (r != 0 && r != -EPERM) {
1886         derr << "error creating directory: '" << path_builder << "': "
1887           << cpp_strerror(r) << dendl;
1888         return r;
1889       }
1890     }
1891   }
1892
1893   return 0;
1894 }
1895
1896 int LocalFileDriver::inject_lost_and_found(
1897     inodeno_t ino,
1898     const InodeStore &dentry)
1899 {
1900   std::string lf_path = path + "/lost+found";
1901   int r = mkdir(lf_path.c_str(), 0755);
1902   if (r != 0 && r != -EPERM) {
1903     derr << "error creating directory: '" << lf_path << "': "
1904       << cpp_strerror(r) << dendl;
1905     return r;
1906   }
1907   
1908   std::string file_path = lf_path + "/" + lost_found_dname(ino);
1909   return inject_data(file_path, dentry.inode.size,
1910                      dentry.inode.layout.object_size, ino);
1911 }
1912
1913 int LocalFileDriver::init_roots(int64_t data_pool_id)
1914 {
1915   // Ensure that the path exists and is a directory
1916   bool exists;
1917   int r = check_roots(&exists);
1918   if (r != 0) {
1919     return r;
1920   }
1921
1922   if (exists) {
1923     return 0;
1924   } else {
1925     return ::mkdir(path.c_str(), 0755);
1926   }
1927 }
1928
1929 int LocalFileDriver::check_roots(bool *result)
1930 {
1931   // Check if the path exists and is a directory
1932   DIR *d = ::opendir(path.c_str());
1933   if (d == NULL) {
1934     *result = false;
1935   } else {
1936     int r = closedir(d);
1937     if (r != 0) {
1938       // Weird, but maybe possible with e.g. stale FD on NFS mount?
1939       *result = false;
1940     } else {
1941       *result = true;
1942     }
1943   }
1944
1945   return 0;
1946 }
1947
1948 void MetadataTool::build_file_dentry(
1949     inodeno_t ino, uint64_t file_size, time_t file_mtime,
1950     const file_layout_t &layout, InodeStore *out)
1951 {
1952   assert(out != NULL);
1953
1954   out->inode.mode = 0500 | S_IFREG;
1955   out->inode.size = file_size;
1956   out->inode.max_size_ever = file_size;
1957   out->inode.mtime.tv.tv_sec = file_mtime;
1958   out->inode.atime.tv.tv_sec = file_mtime;
1959   out->inode.ctime.tv.tv_sec = file_mtime;
1960
1961   out->inode.layout = layout;
1962
1963   out->inode.truncate_seq = 1;
1964   out->inode.truncate_size = -1ull;
1965
1966   out->inode.inline_data.version = CEPH_INLINE_NONE;
1967
1968   out->inode.nlink = 1;
1969   out->inode.ino = ino;
1970   out->inode.version = 1;
1971   out->inode.backtrace_version = 1;
1972   out->inode.uid = g_conf->mds_root_ino_uid;
1973   out->inode.gid = g_conf->mds_root_ino_gid;
1974 }
1975
1976 void MetadataTool::build_dir_dentry(
1977     inodeno_t ino, const frag_info_t &fragstat,
1978     const file_layout_t &layout, InodeStore *out)
1979 {
1980   assert(out != NULL);
1981
1982   out->inode.mode = 0755 | S_IFDIR;
1983   out->inode.dirstat = fragstat;
1984   out->inode.mtime.tv.tv_sec = fragstat.mtime;
1985   out->inode.atime.tv.tv_sec = fragstat.mtime;
1986   out->inode.ctime.tv.tv_sec = fragstat.mtime;
1987
1988   out->inode.layout = layout;
1989   out->inode.dir_layout.dl_dir_hash = g_conf->mds_default_dir_hash;
1990
1991   out->inode.truncate_seq = 1;
1992   out->inode.truncate_size = -1ull;
1993
1994   out->inode.inline_data.version = CEPH_INLINE_NONE;
1995
1996   out->inode.nlink = 1;
1997   out->inode.ino = ino;
1998   out->inode.version = 1;
1999   out->inode.backtrace_version = 1;
2000   out->inode.uid = g_conf->mds_root_ino_uid;
2001   out->inode.gid = g_conf->mds_root_ino_gid;
2002 }
2003