Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / ceph_objectstore_tool.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013 Inktank
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <boost/program_options/variables_map.hpp>
16 #include <boost/program_options/parsers.hpp>
17 #include <boost/scoped_ptr.hpp>
18 #include <boost/optional.hpp>
19
20 #include <stdlib.h>
21
22 #include "common/Formatter.h"
23 #include "common/errno.h"
24 #include "common/ceph_argparse.h"
25
26 #include "global/global_init.h"
27
28 #include "os/ObjectStore.h"
29 #include "os/filestore/FileJournal.h"
30 #include "os/filestore/FileStore.h"
31 #ifdef HAVE_LIBFUSE
32 #include "os/FuseStore.h"
33 #endif
34
35 #include "osd/PGLog.h"
36 #include "osd/OSD.h"
37 #include "osd/PG.h"
38
39 #include "json_spirit/json_spirit_value.h"
40 #include "json_spirit/json_spirit_reader.h"
41
42 #include "rebuild_mondb.h"
43 #include "ceph_objectstore_tool.h"
44 #include "include/compat.h"
45 #include "include/util.h"
46
47 namespace po = boost::program_options;
48 using namespace std;
49
50 #ifdef INTERNAL_TEST
51 CompatSet get_test_compat_set() {
52   CompatSet::FeatureSet ceph_osd_feature_compat;
53   CompatSet::FeatureSet ceph_osd_feature_ro_compat;
54   CompatSet::FeatureSet ceph_osd_feature_incompat;
55   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
56   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO);
57   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC);
58   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC);
59   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES);
60   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL);
61   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO);
62   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO);
63   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG);
64 #ifdef INTERNAL_TEST2
65   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER);
66   ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SHARDS);
67 #endif
68   return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
69                    ceph_osd_feature_incompat);
70 }
71 #endif
72
73 const ssize_t max_read = 1024 * 1024;
74 const int fd_none = INT_MIN;
75 bool outistty;
76 bool dry_run;
77
78 struct action_on_object_t {
79   virtual ~action_on_object_t() {}
80   virtual int call(ObjectStore *store, coll_t coll, ghobject_t &ghobj, object_info_t &oi) = 0;
81 };
82
83 int _action_on_all_objects_in_pg(ObjectStore *store, coll_t coll, action_on_object_t &action, bool debug)
84 {
85   unsigned LIST_AT_A_TIME = 100;
86   ghobject_t next;
87   while (!next.is_max()) {
88     vector<ghobject_t> list;
89     int r = store->collection_list(
90                                    coll,
91                                    next,
92                                    ghobject_t::get_max(),
93                                    LIST_AT_A_TIME,
94                                    &list,
95                                    &next);
96     if (r < 0) {
97       cerr << "Error listing collection: " << coll << ", "
98            << cpp_strerror(r) << std::endl;
99       return r;
100     }
101     for (vector<ghobject_t>::iterator obj = list.begin();
102          obj != list.end();
103          ++obj) {
104       if (obj->is_pgmeta())
105         continue;
106       object_info_t oi;
107       if (coll != coll_t::meta()) {
108         bufferlist attr;
109         r = store->getattr(coll, *obj, OI_ATTR, attr);
110         if (r < 0) {
111           cerr << "Error getting attr on : " << make_pair(coll, *obj) << ", "
112                << cpp_strerror(r) << std::endl;
113           continue;
114         }
115         bufferlist::iterator bp = attr.begin();
116         try {
117           ::decode(oi, bp);
118         } catch (...) {
119           r = -EINVAL;
120           cerr << "Error getting attr on : " << make_pair(coll, *obj) << ", "
121                << cpp_strerror(r) << std::endl;
122           continue;
123         }
124       }
125       r = action.call(store, coll, *obj, oi);
126       if (r < 0)
127         return r;
128     }
129   }
130   return 0;
131 }
132
133 int action_on_all_objects_in_pg(ObjectStore *store, string pgidstr, action_on_object_t &action, bool debug)
134 {
135   spg_t pgid;
136   // Scan collections in case this is an ec pool but no shard specified
137   unsigned scanned = 0;
138   int r = 0;
139   vector<coll_t> colls_to_check;
140   vector<coll_t> candidates;
141   r = store->list_collections(candidates);
142   if (r < 0) {
143     cerr << "Error listing collections: " << cpp_strerror(r) << std::endl;
144     return r;
145   }
146   pgid.parse(pgidstr.c_str());
147   for (vector<coll_t>::iterator i = candidates.begin();
148        i != candidates.end();
149        ++i) {
150     spg_t cand_pgid;
151     if (!i->is_pg(&cand_pgid))
152       continue;
153
154     // If an exact match or treat no shard as any shard
155     if (cand_pgid == pgid ||
156         (pgid.is_no_shard() && pgid.pgid == cand_pgid.pgid)) {
157       colls_to_check.push_back(*i);
158     }
159   }
160
161   if (debug)
162     cerr << colls_to_check.size() << " pgs to scan" << std::endl;
163   for (vector<coll_t>::iterator i = colls_to_check.begin();
164        i != colls_to_check.end();
165        ++i, ++scanned) {
166     if (debug)
167       cerr << "Scanning " << *i << ", " << scanned << "/"
168            << colls_to_check.size() << " completed" << std::endl;
169     r = _action_on_all_objects_in_pg(store, *i, action, debug);
170     if (r < 0)
171       break;
172   }
173   return r;
174 }
175
176 int action_on_all_objects_in_exact_pg(ObjectStore *store, coll_t coll, action_on_object_t &action, bool debug)
177 {
178   int r = _action_on_all_objects_in_pg(store, coll, action, debug);
179   return r;
180 }
181
182 int _action_on_all_objects(ObjectStore *store, action_on_object_t &action, bool debug)
183 {
184   unsigned scanned = 0;
185   int r = 0;
186   vector<coll_t> colls_to_check;
187   vector<coll_t> candidates;
188   r = store->list_collections(candidates);
189   if (r < 0) {
190     cerr << "Error listing collections: " << cpp_strerror(r) << std::endl;
191     return r;
192   }
193   for (vector<coll_t>::iterator i = candidates.begin();
194        i != candidates.end();
195        ++i) {
196     if (i->is_pg()) {
197       colls_to_check.push_back(*i);
198     }
199   }
200
201   if (debug)
202     cerr << colls_to_check.size() << " pgs to scan" << std::endl;
203   for (vector<coll_t>::iterator i = colls_to_check.begin();
204        i != colls_to_check.end();
205        ++i, ++scanned) {
206     if (debug)
207       cerr << "Scanning " << *i << ", " << scanned << "/"
208            << colls_to_check.size() << " completed" << std::endl;
209     r = _action_on_all_objects_in_pg(store, *i, action, debug);
210     if (r < 0)
211       return r;
212   }
213   return 0;
214 }
215
216 int action_on_all_objects(ObjectStore *store, action_on_object_t &action, bool debug)
217 {
218   int r = _action_on_all_objects(store, action, debug);
219   return r;
220 }
221
222 struct pgid_object_list {
223   list<pair<coll_t, ghobject_t> > _objects;
224
225   void insert(coll_t coll, ghobject_t &ghobj) {
226     _objects.push_back(make_pair(coll, ghobj));
227   }
228
229   void dump(Formatter *f, bool human_readable) const {
230     if (!human_readable)
231       f->open_array_section("pgid_objects");
232     for (list<pair<coll_t, ghobject_t> >::const_iterator i = _objects.begin();
233          i != _objects.end();
234          ++i) {
235       f->open_array_section("pgid_object");
236       spg_t pgid;
237       bool is_pg = i->first.is_pg(&pgid);
238       if (is_pg)
239         f->dump_string("pgid", stringify(pgid));
240       if (!is_pg || !human_readable)
241         f->dump_string("coll", i->first.to_str());
242       f->open_object_section("ghobject");
243       i->second.dump(f);
244       f->close_section();
245       f->close_section();
246       if (human_readable) {
247         f->flush(cout);
248         cout << std::endl;
249       }
250     }
251     if (!human_readable) {
252       f->close_section();
253       f->flush(cout);
254       cout << std::endl;
255     }
256   }
257 };
258
259 struct lookup_ghobject : public action_on_object_t {
260   pgid_object_list _objects;
261   const string _name;
262   const boost::optional<std::string> _namespace;
263   bool _need_snapset;
264
265   lookup_ghobject(const string& name, const boost::optional<std::string>& nspace, bool need_snapset = false) : _name(name),
266                   _namespace(nspace), _need_snapset(need_snapset) { }
267
268   int call(ObjectStore *store, coll_t coll, ghobject_t &ghobj, object_info_t &oi) override {
269     if (_need_snapset && !ghobj.hobj.has_snapset())
270       return 0;
271     if ((_name.length() == 0 || ghobj.hobj.oid.name == _name) &&
272         (!_namespace || ghobj.hobj.nspace == _namespace))
273       _objects.insert(coll, ghobj);
274     return 0;
275   }
276
277   int size() const {
278     return _objects._objects.size();
279   }
280
281   pair<coll_t, ghobject_t> pop() {
282      pair<coll_t, ghobject_t> front = _objects._objects.front();
283      _objects._objects.pop_front();
284      return front;
285   }
286
287   void dump(Formatter *f, bool human_readable) const {
288     _objects.dump(f, human_readable);
289   }
290 };
291
292 ghobject_t infos_oid = OSD::make_infos_oid();
293 ghobject_t log_oid;
294 ghobject_t biginfo_oid;
295
296 int file_fd = fd_none;
297 bool debug;
298 super_header sh;
299 uint64_t testalign;
300
301 static int get_fd_data(int fd, bufferlist &bl)
302 {
303   uint64_t total = 0;
304   do {
305     ssize_t bytes = bl.read_fd(fd, max_read);
306     if (bytes < 0) {
307       cerr << "read_fd error " << cpp_strerror(bytes) << std::endl;
308       return bytes;
309     }
310
311     if (bytes == 0)
312       break;
313
314     total += bytes;
315   } while(true);
316
317   assert(bl.length() == total);
318   return 0;
319 }
320
321 int get_log(ObjectStore *fs, __u8 struct_ver,
322             coll_t coll, spg_t pgid, const pg_info_t &info,
323             PGLog::IndexedLog &log, pg_missing_t &missing)
324 {
325   try {
326     ostringstream oss;
327     assert(struct_ver > 0);
328     PGLog::read_log_and_missing(fs, coll,
329                     struct_ver >= 8 ? coll : coll_t::meta(),
330                     struct_ver >= 8 ? pgid.make_pgmeta_oid() : log_oid,
331                     info, log, missing,
332                                 struct_ver < 9,
333                                 oss,
334                     g_ceph_context->_conf->osd_ignore_stale_divergent_priors);
335     if (debug && oss.str().size())
336       cerr << oss.str() << std::endl;
337   }
338   catch (const buffer::error &e) {
339     cerr << "read_log_and_missing threw exception error " << e.what() << std::endl;
340     return -EFAULT;
341   }
342   return 0;
343 }
344
345 void dump_log(Formatter *formatter, ostream &out, pg_log_t &log,
346               pg_missing_t &missing)
347 {
348   formatter->open_object_section("op_log");
349   formatter->open_object_section("pg_log_t");
350   log.dump(formatter);
351   formatter->close_section();
352   formatter->flush(out);
353   formatter->open_object_section("pg_missing_t");
354   missing.dump(formatter);
355   formatter->close_section();
356   formatter->flush(out);
357   formatter->open_object_section("map");
358   formatter->close_section();
359   formatter->close_section();
360   formatter->flush(out);
361 }
362
363 //Based on part of OSD::load_pgs()
364 int finish_remove_pgs(ObjectStore *store)
365 {
366   vector<coll_t> ls;
367   int r = store->list_collections(ls);
368   if (r < 0) {
369     cerr << "finish_remove_pgs: failed to list pgs: " << cpp_strerror(r)
370       << std::endl;
371     return r;
372   }
373
374   for (vector<coll_t>::iterator it = ls.begin();
375        it != ls.end();
376        ++it) {
377     spg_t pgid;
378
379     if (it->is_temp(&pgid) ||
380        (it->is_pg(&pgid) && PG::_has_removal_flag(store, pgid))) {
381       cout << "finish_remove_pgs " << *it << " removing " << pgid << std::endl;
382       OSD::recursive_remove_collection(g_ceph_context, store, pgid, *it);
383       continue;
384     }
385
386     //cout << "finish_remove_pgs ignoring unrecognized " << *it << std::endl;
387   }
388   return 0;
389 }
390
391 #pragma GCC diagnostic ignored "-Wpragmas"
392 #pragma GCC diagnostic push
393 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
394
395 int mark_pg_for_removal(ObjectStore *fs, spg_t pgid, ObjectStore::Transaction *t)
396 {
397   pg_info_t info(pgid);
398   coll_t coll(pgid);
399   ghobject_t pgmeta_oid(info.pgid.make_pgmeta_oid());
400
401   bufferlist bl;
402   epoch_t map_epoch = 0;
403   int r = PG::peek_map_epoch(fs, pgid, &map_epoch, &bl);
404   if (r < 0)
405     cerr << __func__ << " warning: peek_map_epoch reported error" << std::endl;
406   PastIntervals past_intervals;
407   __u8 struct_v;
408   r = PG::read_info(fs, pgid, coll, bl, info, past_intervals, struct_v);
409   if (r < 0) {
410     cerr << __func__ << " error on read_info " << cpp_strerror(r) << std::endl;
411     return r;
412   }
413   assert(struct_v >= 8);
414   // new omap key
415   cout << "setting '_remove' omap key" << std::endl;
416   map<string,bufferlist> values;
417   ::encode((char)1, values["_remove"]);
418   t->omap_setkeys(coll, pgmeta_oid, values);
419   return 0;
420 }
421
422 #pragma GCC diagnostic pop
423 #pragma GCC diagnostic warning "-Wpragmas"
424
425 int initiate_new_remove_pg(ObjectStore *store, spg_t r_pgid,
426                            ObjectStore::Sequencer &osr)
427 {
428   if (!dry_run)
429     finish_remove_pgs(store);
430   if (!store->collection_exists(coll_t(r_pgid)))
431     return -ENOENT;
432
433   cout << " marking collection for removal" << std::endl;
434   if (dry_run)
435     return 0;
436   ObjectStore::Transaction rmt;
437   int r = mark_pg_for_removal(store, r_pgid, &rmt);
438   if (r < 0) {
439     return r;
440   }
441   store->apply_transaction(&osr, std::move(rmt));
442   finish_remove_pgs(store);
443   return r;
444 }
445
446 int write_info(ObjectStore::Transaction &t, epoch_t epoch, pg_info_t &info,
447     PastIntervals &past_intervals)
448 {
449   //Empty for this
450   coll_t coll(info.pgid);
451   ghobject_t pgmeta_oid(info.pgid.make_pgmeta_oid());
452   map<string,bufferlist> km;
453   pg_info_t last_written_info;
454   int ret = PG::_prepare_write_info(
455     g_ceph_context,
456     &km, epoch,
457     info,
458     last_written_info,
459     past_intervals,
460     true, true, false);
461   if (ret) cerr << "Failed to write info" << std::endl;
462   t.omap_setkeys(coll, pgmeta_oid, km);
463   return ret;
464 }
465
466 typedef map<eversion_t, hobject_t> divergent_priors_t;
467
468 int write_pg(ObjectStore::Transaction &t, epoch_t epoch, pg_info_t &info,
469              pg_log_t &log, PastIntervals &past_intervals,
470              divergent_priors_t &divergent,
471              pg_missing_t &missing)
472 {
473   int ret = write_info(t, epoch, info, past_intervals);
474   if (ret)
475     return ret;
476   coll_t coll(info.pgid);
477   map<string,bufferlist> km;
478
479   if (!divergent.empty()) {
480     assert(missing.get_items().empty());
481     PGLog::write_log_and_missing_wo_missing(
482       t, &km, log, coll, info.pgid.make_pgmeta_oid(), divergent, true);
483   } else {
484     pg_missing_tracker_t tmissing(missing);
485     bool rebuilt_missing_set_with_deletes = missing.may_include_deletes;
486     PGLog::write_log_and_missing(
487       t, &km, log, coll, info.pgid.make_pgmeta_oid(), tmissing, true,
488       &rebuilt_missing_set_with_deletes);
489   }
490   t.omap_setkeys(coll, info.pgid.make_pgmeta_oid(), km);
491   return 0;
492 }
493
494 const int OMAP_BATCH_SIZE = 25;
495 void get_omap_batch(ObjectMap::ObjectMapIterator &iter, map<string, bufferlist> &oset)
496 {
497   oset.clear();
498   for (int count = OMAP_BATCH_SIZE; count && iter->valid(); --count, iter->next()) {
499     oset.insert(pair<string, bufferlist>(iter->key(), iter->value()));
500   }
501 }
502
503 int ObjectStoreTool::export_file(ObjectStore *store, coll_t cid, ghobject_t &obj)
504 {
505   struct stat st;
506   mysize_t total;
507   footer ft;
508
509   int ret = store->stat(cid, obj, &st);
510   if (ret < 0)
511     return ret;
512
513   cerr << "Read " << obj << std::endl;
514
515   total = st.st_size;
516   if (debug)
517     cerr << "size=" << total << std::endl;
518
519   object_begin objb(obj);
520
521   {
522     bufferptr bp;
523     bufferlist bl;
524     ret = store->getattr(cid, obj, OI_ATTR, bp);
525     if (ret < 0) {
526       cerr << "getattr failure object_info " << ret << std::endl;
527       return ret;
528     }
529     bl.push_back(bp);
530     decode(objb.oi, bl);
531     if (debug)
532       cerr << "object_info: " << objb.oi << std::endl;
533   }
534
535   // NOTE: we include whiteouts, lost, etc.
536
537   ret = write_section(TYPE_OBJECT_BEGIN, objb, file_fd);
538   if (ret < 0)
539     return ret;
540
541   uint64_t offset = 0;
542   bufferlist rawdatabl;
543   while(total > 0) {
544     rawdatabl.clear();
545     mysize_t len = max_read;
546     if (len > total)
547       len = total;
548
549     ret = store->read(cid, obj, offset, len, rawdatabl);
550     if (ret < 0)
551       return ret;
552     if (ret == 0)
553       return -EINVAL;
554
555     data_section dblock(offset, len, rawdatabl);
556     if (debug)
557       cerr << "data section offset=" << offset << " len=" << len << std::endl;
558
559     total -= ret;
560     offset += ret;
561
562     ret = write_section(TYPE_DATA, dblock, file_fd);
563     if (ret) return ret;
564   }
565
566   //Handle attrs for this object
567   map<string,bufferptr> aset;
568   ret = store->getattrs(cid, obj, aset);
569   if (ret) return ret;
570   attr_section as(aset);
571   ret = write_section(TYPE_ATTRS, as, file_fd);
572   if (ret)
573     return ret;
574
575   if (debug) {
576     cerr << "attrs size " << aset.size() << std::endl;
577   }
578
579   //Handle omap information
580   bufferlist hdrbuf;
581   ret = store->omap_get_header(cid, obj, &hdrbuf, true);
582   if (ret < 0) {
583     cerr << "omap_get_header: " << cpp_strerror(ret) << std::endl;
584     return ret;
585   }
586
587   omap_hdr_section ohs(hdrbuf);
588   ret = write_section(TYPE_OMAP_HDR, ohs, file_fd);
589   if (ret)
590     return ret;
591
592   ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(cid, obj);
593   if (!iter) {
594     ret = -ENOENT;
595     cerr << "omap_get_iterator: " << cpp_strerror(ret) << std::endl;
596     return ret;
597   }
598   iter->seek_to_first();
599   int mapcount = 0;
600   map<string, bufferlist> out;
601   while(iter->valid()) {
602     get_omap_batch(iter, out);
603
604     if (out.empty()) break;
605
606     mapcount += out.size();
607     omap_section oms(out);
608     ret = write_section(TYPE_OMAP, oms, file_fd);
609     if (ret)
610       return ret;
611   }
612   if (debug)
613     cerr << "omap map size " << mapcount << std::endl;
614
615   ret = write_simple(TYPE_OBJECT_END, file_fd);
616   if (ret)
617     return ret;
618
619   return 0;
620 }
621
622 int ObjectStoreTool::export_files(ObjectStore *store, coll_t coll)
623 {
624   ghobject_t next;
625
626   while (!next.is_max()) {
627     vector<ghobject_t> objects;
628     int r = store->collection_list(coll, next, ghobject_t::get_max(), 300,
629       &objects, &next);
630     if (r < 0)
631       return r;
632     for (vector<ghobject_t>::iterator i = objects.begin();
633          i != objects.end();
634          ++i) {
635       assert(!i->hobj.is_meta());
636       if (i->is_pgmeta() || i->hobj.is_temp()) {
637         continue;
638       }
639       r = export_file(store, coll, *i);
640       if (r < 0)
641         return r;
642     }
643   }
644   return 0;
645 }
646
647 int set_inc_osdmap(ObjectStore *store, epoch_t e, bufferlist& bl, bool force,
648                    ObjectStore::Sequencer &osr) {
649   OSDMap::Incremental inc;
650   bufferlist::iterator it = bl.begin();
651   inc.decode(it);
652   if (e == 0) {
653     e = inc.epoch;
654   } else if (e != inc.epoch) {
655     cerr << "incremental.epoch mismatch: "
656          << inc.epoch << " != " << e << std::endl;
657     if (force) {
658       cerr << "But will continue anyway." << std::endl;
659     } else {
660       return -EINVAL;
661     }
662   }
663   const ghobject_t inc_oid = OSD::get_inc_osdmap_pobject_name(e);
664   if (!store->exists(coll_t::meta(), inc_oid)) {
665     cerr << "inc-osdmap (" << inc_oid << ") does not exist." << std::endl;
666     if (!force) {
667       return -ENOENT;
668     }
669     cout << "Creating a new epoch." << std::endl;
670   }
671   if (dry_run)
672     return 0;
673   ObjectStore::Transaction t;
674   t.write(coll_t::meta(), inc_oid, 0, bl.length(), bl);
675   t.truncate(coll_t::meta(), inc_oid, bl.length());
676   int ret = store->apply_transaction(&osr, std::move(t));
677   if (ret) {
678     cerr << "Failed to set inc-osdmap (" << inc_oid << "): " << ret << std::endl;
679   } else {
680     cout << "Wrote inc-osdmap." << inc.epoch << std::endl;
681   }
682   return ret;
683 }
684
685 int get_inc_osdmap(ObjectStore *store, epoch_t e, bufferlist& bl)
686 {
687   if (store->read(coll_t::meta(),
688                   OSD::get_inc_osdmap_pobject_name(e),
689                   0, 0, bl) < 0) {
690     return -ENOENT;
691   }
692   return 0;
693 }
694
695 int set_osdmap(ObjectStore *store, epoch_t e, bufferlist& bl, bool force,
696                ObjectStore::Sequencer &osr) {
697   OSDMap osdmap;
698   osdmap.decode(bl);
699   if (e == 0) {
700     e = osdmap.get_epoch();
701   } else if (e != osdmap.get_epoch()) {
702     cerr << "osdmap.epoch mismatch: "
703          << e << " != " << osdmap.get_epoch() << std::endl;
704     if (force) {
705       cerr << "But will continue anyway." << std::endl;
706     } else {
707       return -EINVAL;
708     }
709   }
710   const ghobject_t full_oid = OSD::get_osdmap_pobject_name(e);
711   if (!store->exists(coll_t::meta(), full_oid)) {
712     cerr << "osdmap (" << full_oid << ") does not exist." << std::endl;
713     if (!force) {
714       return -ENOENT;
715     }
716     cout << "Creating a new epoch." << std::endl;
717   }
718   if (dry_run)
719     return 0;
720   ObjectStore::Transaction t;
721   t.write(coll_t::meta(), full_oid, 0, bl.length(), bl);
722   t.truncate(coll_t::meta(), full_oid, bl.length());
723   int ret = store->apply_transaction(&osr, std::move(t));
724   if (ret) {
725     cerr << "Failed to set osdmap (" << full_oid << "): " << ret << std::endl;
726   } else {
727     cout << "Wrote osdmap." << osdmap.get_epoch() << std::endl;
728   }
729   return ret;
730 }
731
732 int get_osdmap(ObjectStore *store, epoch_t e, OSDMap &osdmap, bufferlist& bl)
733 {
734   bool found = store->read(
735       coll_t::meta(), OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
736   if (!found) {
737     cerr << "Can't find OSDMap for pg epoch " << e << std::endl;
738     return -ENOENT;
739   }
740   osdmap.decode(bl);
741   if (debug)
742     cerr << osdmap << std::endl;
743   return 0;
744 }
745
746 int add_osdmap(ObjectStore *store, metadata_section &ms)
747 {
748   return get_osdmap(store, ms.map_epoch, ms.osdmap, ms.osdmap_bl);
749 }
750
751 int ObjectStoreTool::do_export(ObjectStore *fs, coll_t coll, spg_t pgid,
752     pg_info_t &info, epoch_t map_epoch, __u8 struct_ver,
753     const OSDSuperblock& superblock,
754     PastIntervals &past_intervals)
755 {
756   PGLog::IndexedLog log;
757   pg_missing_t missing;
758
759   cerr << "Exporting " << pgid << std::endl;
760
761   int ret = get_log(fs, struct_ver, coll, pgid, info, log, missing);
762   if (ret > 0)
763       return ret;
764
765   if (debug) {
766     Formatter *formatter = Formatter::create("json-pretty");
767     assert(formatter);
768     dump_log(formatter, cerr, log, missing);
769     delete formatter;
770   }
771   write_super();
772
773   pg_begin pgb(pgid, superblock);
774   // Special case: If replicated pg don't require the importing OSD to have shard feature
775   if (pgid.is_no_shard()) {
776     pgb.superblock.compat_features.incompat.remove(CEPH_OSD_FEATURE_INCOMPAT_SHARDS);
777   }
778   ret = write_section(TYPE_PG_BEGIN, pgb, file_fd);
779   if (ret)
780     return ret;
781
782   // The metadata_section is now before files, so import can detect
783   // errors and abort without wasting time.
784   metadata_section ms(
785     struct_ver,
786     map_epoch,
787     info,
788     log,
789     past_intervals,
790     missing);
791   ret = add_osdmap(fs, ms);
792   if (ret)
793     return ret;
794   ret = write_section(TYPE_PG_METADATA, ms, file_fd);
795   if (ret)
796     return ret;
797
798   ret = export_files(fs, coll);
799   if (ret) {
800     cerr << "export_files error " << ret << std::endl;
801     return ret;
802   }
803
804   ret = write_simple(TYPE_PG_END, file_fd);
805   if (ret)
806     return ret;
807
808   return 0;
809 }
810
811 int get_data(ObjectStore *store, coll_t coll, ghobject_t hoid,
812     ObjectStore::Transaction *t, bufferlist &bl)
813 {
814   bufferlist::iterator ebliter = bl.begin();
815   data_section ds;
816   ds.decode(ebliter);
817
818   if (debug)
819     cerr << "\tdata: offset " << ds.offset << " len " << ds.len << std::endl;
820   t->write(coll, hoid, ds.offset, ds.len,  ds.databl);
821   return 0;
822 }
823
824 int get_attrs(
825   ObjectStore *store, coll_t coll, ghobject_t hoid,
826   ObjectStore::Transaction *t, bufferlist &bl,
827   OSDriver &driver, SnapMapper &snap_mapper)
828 {
829   bufferlist::iterator ebliter = bl.begin();
830   attr_section as;
831   as.decode(ebliter);
832
833   if (debug)
834     cerr << "\tattrs: len " << as.data.size() << std::endl;
835   t->setattrs(coll, hoid, as.data);
836
837   // This could have been handled in the caller if we didn't need to
838   // support exports that didn't include object_info_t in object_begin.
839   if (hoid.generation == ghobject_t::NO_GEN) {
840     if (hoid.hobj.snap < CEPH_MAXSNAP) {
841       map<string,bufferlist>::iterator mi = as.data.find(OI_ATTR);
842       if (mi != as.data.end()) {
843         object_info_t oi(mi->second);
844
845         if (debug)
846           cerr << "object_info " << oi << std::endl;
847
848         OSDriver::OSTransaction _t(driver.get_transaction(t));
849         set<snapid_t> oi_snaps(oi.legacy_snaps.begin(), oi.legacy_snaps.end());
850         if (!oi_snaps.empty()) {
851           if (debug)
852             cerr << "\tsetting legacy snaps " << oi_snaps << std::endl;
853           snap_mapper.add_oid(hoid.hobj, oi_snaps, &_t);
854         }
855       }
856     } else {
857       if (hoid.hobj.is_head()) {
858         map<string,bufferlist>::iterator mi = as.data.find(SS_ATTR);
859         if (mi != as.data.end()) {
860           SnapSet snapset;
861           auto p = mi->second.begin();
862           snapset.decode(p);
863           cout << "snapset " << snapset << std::endl;
864           if (!snapset.is_legacy()) {
865             for (auto& p : snapset.clone_snaps) {
866               ghobject_t clone = hoid;
867               clone.hobj.snap = p.first;
868               set<snapid_t> snaps(p.second.begin(), p.second.end());
869               if (!store->exists(coll, clone)) {
870                 // no clone, skip.  this is probably a cache pool.  this works
871                 // because we use a separate transaction per object and clones
872                 // come before head in the archive.
873                 if (debug)
874                   cerr << "\tskipping missing " << clone << " (snaps "
875                        << snaps << ")" << std::endl;
876                 continue;
877               }
878               if (debug)
879                 cerr << "\tsetting " << clone.hobj << " snaps " << snaps
880                      << std::endl;
881               OSDriver::OSTransaction _t(driver.get_transaction(t));
882               assert(!snaps.empty());
883               snap_mapper.add_oid(clone.hobj, snaps, &_t);
884             }
885           }
886         } else {
887           cerr << "missing SS_ATTR on " << hoid << std::endl;
888         }
889       }
890     }
891   }
892
893   return 0;
894 }
895
896 int get_omap_hdr(ObjectStore *store, coll_t coll, ghobject_t hoid,
897     ObjectStore::Transaction *t, bufferlist &bl)
898 {
899   bufferlist::iterator ebliter = bl.begin();
900   omap_hdr_section oh;
901   oh.decode(ebliter);
902
903   if (debug)
904     cerr << "\tomap header: " << string(oh.hdr.c_str(), oh.hdr.length())
905       << std::endl;
906   t->omap_setheader(coll, hoid, oh.hdr);
907   return 0;
908 }
909
910 int get_omap(ObjectStore *store, coll_t coll, ghobject_t hoid,
911     ObjectStore::Transaction *t, bufferlist &bl)
912 {
913   bufferlist::iterator ebliter = bl.begin();
914   omap_section os;
915   os.decode(ebliter);
916
917   if (debug)
918     cerr << "\tomap: size " << os.omap.size() << std::endl;
919   t->omap_setkeys(coll, hoid, os.omap);
920   return 0;
921 }
922
923 int ObjectStoreTool::get_object(ObjectStore *store, coll_t coll,
924                                 bufferlist &bl, OSDMap &curmap,
925                                 bool *skipped_objects,
926                                 ObjectStore::Sequencer &osr)
927 {
928   ObjectStore::Transaction tran;
929   ObjectStore::Transaction *t = &tran;
930   bufferlist::iterator ebliter = bl.begin();
931   object_begin ob;
932   ob.decode(ebliter);
933   OSDriver driver(
934     store,
935     coll_t(),
936     OSD::make_snapmapper_oid());
937   spg_t pg;
938   coll.is_pg_prefix(&pg);
939   SnapMapper mapper(g_ceph_context, &driver, 0, 0, 0, pg.shard);
940
941   if (ob.hoid.hobj.is_temp()) {
942     cerr << "ERROR: Export contains temporary object '" << ob.hoid << "'" << std::endl;
943     return -EFAULT;
944   }
945   assert(g_ceph_context);
946   if (ob.hoid.hobj.nspace != g_ceph_context->_conf->osd_hit_set_namespace) {
947     object_t oid = ob.hoid.hobj.oid;
948     object_locator_t loc(ob.hoid.hobj);
949     pg_t raw_pgid = curmap.object_locator_to_pg(oid, loc);
950     pg_t pgid = curmap.raw_pg_to_pg(raw_pgid);
951
952     spg_t coll_pgid;
953     if (coll.is_pg(&coll_pgid) == false) {
954       cerr << "INTERNAL ERROR: Bad collection during import" << std::endl;
955       return -EFAULT;
956     }
957     if (coll_pgid.shard != ob.hoid.shard_id) {
958       cerr << "INTERNAL ERROR: Importing shard " << coll_pgid.shard
959         << " but object shard is " << ob.hoid.shard_id << std::endl;
960       return -EFAULT;
961     }
962
963     if (coll_pgid.pgid != pgid) {
964       cerr << "Skipping object '" << ob.hoid << "' which belongs in pg " << pgid << std::endl;
965       *skipped_objects = true;
966       skip_object(bl);
967       return 0;
968     }
969   }
970
971   if (!dry_run)
972     t->touch(coll, ob.hoid);
973
974   cout << "Write " << ob.hoid << std::endl;
975
976   bufferlist ebl;
977   bool done = false;
978   while(!done) {
979     sectiontype_t type;
980     int ret = read_section(&type, &ebl);
981     if (ret)
982       return ret;
983
984     //cout << "\tdo_object: Section type " << hex << type << dec << std::endl;
985     //cout << "\t\tsection size " << ebl.length() << std::endl;
986     if (type >= END_OF_TYPES) {
987       cout << "Skipping unknown object section type" << std::endl;
988       continue;
989     }
990     switch(type) {
991     case TYPE_DATA:
992       if (dry_run) break;
993       ret = get_data(store, coll, ob.hoid, t, ebl);
994       if (ret) return ret;
995       break;
996     case TYPE_ATTRS:
997       if (dry_run) break;
998       ret = get_attrs(store, coll, ob.hoid, t, ebl, driver, mapper);
999       if (ret) return ret;
1000       break;
1001     case TYPE_OMAP_HDR:
1002       if (dry_run) break;
1003       ret = get_omap_hdr(store, coll, ob.hoid, t, ebl);
1004       if (ret) return ret;
1005       break;
1006     case TYPE_OMAP:
1007       if (dry_run) break;
1008       ret = get_omap(store, coll, ob.hoid, t, ebl);
1009       if (ret) return ret;
1010       break;
1011     case TYPE_OBJECT_END:
1012       done = true;
1013       break;
1014     default:
1015       cerr << "Unknown section type " << type << std::endl;
1016       return -EFAULT;
1017     }
1018   }
1019   if (!dry_run)
1020     store->apply_transaction(&osr, std::move(*t));
1021   return 0;
1022 }
1023
1024 int get_pg_metadata(ObjectStore *store, bufferlist &bl, metadata_section &ms,
1025     const OSDSuperblock& sb, OSDMap& curmap, spg_t pgid)
1026 {
1027   bufferlist::iterator ebliter = bl.begin();
1028   ms.decode(ebliter);
1029   spg_t old_pgid = ms.info.pgid;
1030   ms.info.pgid = pgid;
1031
1032 #if DIAGNOSTIC
1033   Formatter *formatter = new JSONFormatter(true);
1034   cout << "export pgid " << old_pgid << std::endl;
1035   cout << "struct_v " << (int)ms.struct_ver << std::endl;
1036   cout << "map epoch " << ms.map_epoch << std::endl;
1037
1038   formatter->open_object_section("importing OSDMap");
1039   ms.osdmap.dump(formatter);
1040   formatter->close_section();
1041   formatter->flush(cout);
1042   cout << std::endl;
1043
1044   cout << "osd current epoch " << sb.current_epoch << std::endl;
1045   formatter->open_object_section("current OSDMap");
1046   curmap.dump(formatter);
1047   formatter->close_section();
1048   formatter->flush(cout);
1049   cout << std::endl;
1050
1051   formatter->open_object_section("info");
1052   ms.info.dump(formatter);
1053   formatter->close_section();
1054   formatter->flush(cout);
1055   cout << std::endl;
1056
1057   formatter->open_object_section("log");
1058   ms.log.dump(formatter);
1059   formatter->close_section();
1060   formatter->flush(cout);
1061   cout << std::endl;
1062
1063   formatter->flush(cout);
1064   cout << std::endl;
1065 #endif
1066
1067   if (ms.osdmap.get_epoch() != 0 && ms.map_epoch != ms.osdmap.get_epoch()) {
1068     cerr << "FATAL: Invalid OSDMap epoch in export data" << std::endl;
1069     return -EFAULT;
1070   }
1071
1072   if (ms.map_epoch > sb.current_epoch) {
1073     cerr << "ERROR: Export PG's map_epoch " << ms.map_epoch << " > OSD's epoch " << sb.current_epoch << std::endl;
1074     cerr << "The OSD you are using is older than the exported PG" << std::endl;
1075     cerr << "Either use another OSD or join selected OSD to cluster to update it first" << std::endl;
1076     return -EINVAL;
1077   }
1078
1079   // Pool verified to exist for call to get_pg_num().
1080   unsigned new_pg_num = curmap.get_pg_num(pgid.pgid.pool());
1081
1082   if (pgid.pgid.ps() >= new_pg_num) {
1083     cerr << "Illegal pgid, the seed is larger than current pg_num" << std::endl;
1084     return -EINVAL;
1085   }
1086
1087   // Old exports didn't include OSDMap, see if we have a copy locally
1088   if (ms.osdmap.get_epoch() == 0) {
1089     OSDMap findmap;
1090     bufferlist findmap_bl;
1091     int ret = get_osdmap(store, ms.map_epoch, findmap, findmap_bl);
1092     if (ret == 0) {
1093       ms.osdmap.deepish_copy_from(findmap);
1094     } else {
1095       cerr << "WARNING: No OSDMap in old export,"
1096            " some objects may be ignored due to a split" << std::endl;
1097     }
1098   }
1099
1100   // Make sure old_pg_num is 0 in the unusual case that OSDMap not in export
1101   // nor can we find a local copy.
1102   unsigned old_pg_num = 0;
1103   if (ms.osdmap.get_epoch() != 0)
1104     old_pg_num = ms.osdmap.get_pg_num(pgid.pgid.pool());
1105
1106   if (debug) {
1107     cerr << "old_pg_num " << old_pg_num << std::endl;
1108     cerr << "new_pg_num " << new_pg_num << std::endl;
1109     cerr << ms.osdmap << std::endl;
1110     cerr << curmap << std::endl;
1111   }
1112
1113   // If we have managed to have a good OSDMap we can do these checks
1114   if (old_pg_num) {
1115     if (old_pgid.pgid.ps() >= old_pg_num) {
1116       cerr << "FATAL: pgid invalid for original map epoch" << std::endl;
1117       return -EFAULT;
1118     }
1119     if (pgid.pgid.ps() >= old_pg_num) {
1120       cout << "NOTICE: Post split pgid specified" << std::endl;
1121     } else {
1122       spg_t parent(pgid);
1123       if (parent.is_split(old_pg_num, new_pg_num, NULL)) {
1124             cerr << "WARNING: Split occurred, some objects may be ignored" << std::endl;
1125       }
1126     }
1127   }
1128
1129   if (debug) {
1130     cerr << "Import pgid " << ms.info.pgid << std::endl;
1131     cerr << "Clearing past_intervals " << ms.past_intervals << std::endl;
1132     cerr << "Zero same_interval_since " << ms.info.history.same_interval_since << std::endl;
1133   }
1134
1135   // Let osd recompute past_intervals and same_interval_since
1136   ms.past_intervals.clear();
1137   ms.info.history.same_interval_since =  0;
1138
1139   if (debug)
1140     cerr << "Changing pg epoch " << ms.map_epoch << " to " << sb.current_epoch << std::endl;
1141
1142   ms.map_epoch = sb.current_epoch;
1143
1144   return 0;
1145 }
1146
1147 // out: pg_log_t that only has entries that apply to import_pgid using curmap
1148 // reject: Entries rejected from "in" are in the reject.log.  Other fields not set.
1149 void filter_divergent_priors(spg_t import_pgid, const OSDMap &curmap,
1150   const string &hit_set_namespace, const divergent_priors_t &in,
1151   divergent_priors_t &out, divergent_priors_t &reject)
1152 {
1153   out.clear();
1154   reject.clear();
1155
1156   for (divergent_priors_t::const_iterator i = in.begin();
1157        i != in.end(); ++i) {
1158
1159     // Reject divergent priors for temporary objects
1160     if (i->second.is_temp()) {
1161       reject.insert(*i);
1162       continue;
1163     }
1164
1165     if (i->second.nspace != hit_set_namespace) {
1166       object_t oid = i->second.oid;
1167       object_locator_t loc(i->second);
1168       pg_t raw_pgid = curmap.object_locator_to_pg(oid, loc);
1169       pg_t pgid = curmap.raw_pg_to_pg(raw_pgid);
1170
1171       if (import_pgid.pgid == pgid) {
1172         out.insert(*i);
1173       } else {
1174         reject.insert(*i);
1175       }
1176     } else {
1177       out.insert(*i);
1178     }
1179   }
1180 }
1181
1182 int ObjectStoreTool::do_import(ObjectStore *store, OSDSuperblock& sb,
1183                                bool force, std::string pgidstr,
1184                                ObjectStore::Sequencer &osr)
1185 {
1186   bufferlist ebl;
1187   pg_info_t info;
1188   PGLog::IndexedLog log;
1189   bool skipped_objects = false;
1190
1191   if (!dry_run)
1192     finish_remove_pgs(store);
1193
1194   int ret = read_super();
1195   if (ret)
1196     return ret;
1197
1198   if (sh.magic != super_header::super_magic) {
1199     cerr << "Invalid magic number" << std::endl;
1200     return -EFAULT;
1201   }
1202
1203   if (sh.version > super_header::super_ver) {
1204     cerr << "Can't handle export format version=" << sh.version << std::endl;
1205     return -EINVAL;
1206   }
1207
1208   //First section must be TYPE_PG_BEGIN
1209   sectiontype_t type;
1210   ret = read_section(&type, &ebl);
1211   if (ret)
1212     return ret;
1213   if (type == TYPE_POOL_BEGIN) {
1214     cerr << "Pool exports cannot be imported into a PG" << std::endl;
1215     return -EINVAL;
1216   } else if (type != TYPE_PG_BEGIN) {
1217     cerr << "Invalid first section type " << type << std::endl;
1218     return -EFAULT;
1219   }
1220
1221   bufferlist::iterator ebliter = ebl.begin();
1222   pg_begin pgb;
1223   pgb.decode(ebliter);
1224   spg_t pgid = pgb.pgid;
1225   spg_t orig_pgid = pgid;
1226
1227   if (pgidstr.length()) {
1228     spg_t user_pgid;
1229
1230     bool ok = user_pgid.parse(pgidstr.c_str());
1231     // This succeeded in main() already
1232     assert(ok);
1233     if (pgid != user_pgid) {
1234       if (pgid.pool() != user_pgid.pool()) {
1235         cerr << "Can't specify a different pgid pool, must be " << pgid.pool() << std::endl;
1236         return -EINVAL;
1237       }
1238       if (pgid.is_no_shard() && !user_pgid.is_no_shard()) {
1239         cerr << "Can't specify a sharded pgid with a non-sharded export" << std::endl;
1240         return -EINVAL;
1241       }
1242       // Get shard from export information if not specified
1243       if (!pgid.is_no_shard() && user_pgid.is_no_shard()) {
1244         user_pgid.shard = pgid.shard;
1245       }
1246       if (pgid.shard != user_pgid.shard) {
1247         cerr << "Can't specify a different shard, must be " << pgid.shard << std::endl;
1248         return -EINVAL;
1249       }
1250       pgid = user_pgid;
1251     }
1252   }
1253
1254   if (!pgb.superblock.cluster_fsid.is_zero()
1255       && pgb.superblock.cluster_fsid != sb.cluster_fsid) {
1256     cerr << "Export came from different cluster with fsid "
1257          << pgb.superblock.cluster_fsid << std::endl;
1258     return -EINVAL;
1259   }
1260
1261   if (debug) {
1262     cerr << "Exported features: " << pgb.superblock.compat_features << std::endl;
1263   }
1264
1265   // Special case: Old export has SHARDS incompat feature on replicated pg, remove it
1266   if (pgid.is_no_shard())
1267     pgb.superblock.compat_features.incompat.remove(CEPH_OSD_FEATURE_INCOMPAT_SHARDS);
1268
1269   if (sb.compat_features.compare(pgb.superblock.compat_features) == -1) {
1270     CompatSet unsupported = sb.compat_features.unsupported(pgb.superblock.compat_features);
1271
1272     cerr << "Export has incompatible features set " << unsupported << std::endl;
1273
1274     // Let them import if they specify the --force option
1275     if (!force)
1276         return 11;  // Positive return means exit status
1277   }
1278
1279   // Don't import if pool no longer exists
1280   OSDMap curmap;
1281   bufferlist bl;
1282   ret = get_osdmap(store, sb.current_epoch, curmap, bl);
1283   if (ret) {
1284     cerr << "Can't find local OSDMap" << std::endl;
1285     return ret;
1286   }
1287   if (!curmap.have_pg_pool(pgid.pgid.m_pool)) {
1288     cerr << "Pool " << pgid.pgid.m_pool << " no longer exists" << std::endl;
1289     // Special exit code for this error, used by test code
1290     return 10;  // Positive return means exit status
1291   }
1292
1293   ghobject_t pgmeta_oid = pgid.make_pgmeta_oid();
1294   log_oid = OSD::make_pg_log_oid(pgid);
1295   biginfo_oid = OSD::make_pg_biginfo_oid(pgid);
1296
1297   //Check for PG already present.
1298   coll_t coll(pgid);
1299   if (store->collection_exists(coll)) {
1300     cerr << "pgid " << pgid << " already exists" << std::endl;
1301     return -EEXIST;
1302   }
1303
1304   if (!dry_run) {
1305     ObjectStore::Transaction t;
1306     PG::_create(t, pgid,
1307                 pgid.get_split_bits(curmap.get_pg_pool(pgid.pool())->get_pg_num()));
1308     PG::_init(t, pgid, NULL);
1309
1310     // mark this coll for removal until we're done
1311     map<string,bufferlist> values;
1312     ::encode((char)1, values["_remove"]);
1313     t.omap_setkeys(coll, pgid.make_pgmeta_oid(), values);
1314
1315     store->apply_transaction(&osr, std::move(t));
1316   }
1317
1318   cout << "Importing pgid " << pgid;
1319   if (orig_pgid != pgid) {
1320     cout << " exported as " << orig_pgid;
1321   }
1322   cout << std::endl;
1323
1324   bool done = false;
1325   bool found_metadata = false;
1326   metadata_section ms;
1327   while(!done) {
1328     ret = read_section(&type, &ebl);
1329     if (ret)
1330       return ret;
1331
1332     //cout << "do_import: Section type " << hex << type << dec << std::endl;
1333     if (type >= END_OF_TYPES) {
1334       cout << "Skipping unknown section type" << std::endl;
1335       continue;
1336     }
1337     switch(type) {
1338     case TYPE_OBJECT_BEGIN:
1339       ret = get_object(store, coll, ebl, curmap, &skipped_objects, osr);
1340       if (ret) return ret;
1341       break;
1342     case TYPE_PG_METADATA:
1343       ret = get_pg_metadata(store, ebl, ms, sb, curmap, pgid);
1344       if (ret) return ret;
1345       found_metadata = true;
1346       break;
1347     case TYPE_PG_END:
1348       done = true;
1349       break;
1350     default:
1351       cerr << "Unknown section type " << type << std::endl;
1352       return -EFAULT;
1353     }
1354   }
1355
1356   if (!found_metadata) {
1357     cerr << "Missing metadata section" << std::endl;
1358     return -EFAULT;
1359   }
1360
1361   ObjectStore::Transaction t;
1362   if (!dry_run) {
1363     pg_log_t newlog, reject;
1364     pg_log_t::filter_log(pgid, curmap, g_ceph_context->_conf->osd_hit_set_namespace,
1365       ms.log, newlog, reject);
1366     if (debug) {
1367       for (list<pg_log_entry_t>::iterator i = newlog.log.begin();
1368            i != newlog.log.end(); ++i)
1369         cerr << "Keeping log entry " << *i << std::endl;
1370       for (list<pg_log_entry_t>::iterator i = reject.log.begin();
1371            i != reject.log.end(); ++i)
1372         cerr << "Skipping log entry " << *i << std::endl;
1373     }
1374
1375     divergent_priors_t newdp, rejectdp;
1376     filter_divergent_priors(pgid, curmap, g_ceph_context->_conf->osd_hit_set_namespace,
1377       ms.divergent_priors, newdp, rejectdp);
1378     ms.divergent_priors = newdp;
1379     if (debug) {
1380       for (divergent_priors_t::iterator i = newdp.begin();
1381            i != newdp.end(); ++i)
1382         cerr << "Keeping divergent_prior " << *i << std::endl;
1383       for (divergent_priors_t::iterator i = rejectdp.begin();
1384            i != rejectdp.end(); ++i)
1385         cerr << "Skipping divergent_prior " << *i << std::endl;
1386     }
1387
1388     ms.missing.filter_objects([&](const hobject_t &obj) {
1389         if (obj.nspace == g_ceph_context->_conf->osd_hit_set_namespace)
1390           return false;
1391         assert(!obj.is_temp());
1392         object_t oid = obj.oid;
1393         object_locator_t loc(obj);
1394         pg_t raw_pgid = curmap.object_locator_to_pg(oid, loc);
1395         pg_t _pgid = curmap.raw_pg_to_pg(raw_pgid);
1396
1397         return pgid.pgid != _pgid;
1398       });
1399
1400
1401     if (debug) {
1402       pg_missing_t missing;
1403       Formatter *formatter = Formatter::create("json-pretty");
1404       dump_log(formatter, cerr, newlog, ms.missing);
1405       delete formatter;
1406     }
1407
1408     // Just like a split invalidate stats since the object count is changed
1409     if (skipped_objects)
1410       ms.info.stats.stats_invalid = true;
1411
1412     ret = write_pg(
1413       t,
1414       ms.map_epoch,
1415       ms.info,
1416       newlog,
1417       ms.past_intervals,
1418       ms.divergent_priors,
1419       ms.missing);
1420     if (ret) return ret;
1421   }
1422
1423   // done, clear removal flag
1424   if (debug)
1425     cerr << "done, clearing removal flag" << std::endl;
1426
1427   if (!dry_run) {
1428     set<string> remove;
1429     remove.insert("_remove");
1430     t.omap_rmkeys(coll, pgid.make_pgmeta_oid(), remove);
1431     store->apply_transaction(&osr, std::move(t));
1432   }
1433
1434   return 0;
1435 }
1436
1437 int do_list(ObjectStore *store, string pgidstr, string object, boost::optional<std::string> nspace,
1438             Formatter *formatter, bool debug, bool human_readable, bool head)
1439 {
1440   int r;
1441   lookup_ghobject lookup(object, nspace, head);
1442   if (pgidstr.length() > 0) {
1443     r = action_on_all_objects_in_pg(store, pgidstr, lookup, debug);
1444   } else {
1445     r = action_on_all_objects(store, lookup, debug);
1446   }
1447   if (r)
1448     return r;
1449   lookup.dump(formatter, human_readable);
1450   formatter->flush(cout);
1451   return 0;
1452 }
1453
1454 int do_meta(ObjectStore *store, string object, Formatter *formatter, bool debug, bool human_readable)
1455 {
1456   int r;
1457   boost::optional<std::string> nspace; // Not specified
1458   lookup_ghobject lookup(object, nspace);
1459   r = action_on_all_objects_in_exact_pg(store, coll_t::meta(), lookup, debug);
1460   if (r)
1461     return r;
1462   lookup.dump(formatter, human_readable);
1463   formatter->flush(cout);
1464   return 0;
1465 }
1466
1467 int remove_object(coll_t coll, ghobject_t &ghobj,
1468   SnapMapper &mapper,
1469   MapCacher::Transaction<std::string, bufferlist> *_t,
1470   ObjectStore::Transaction *t)
1471 {
1472   int r = mapper.remove_oid(ghobj.hobj, _t);
1473   if (r < 0 && r != -ENOENT) {
1474     cerr << "remove_oid returned " << cpp_strerror(r) << std::endl;
1475     return r;
1476   }
1477
1478   t->remove(coll, ghobj);
1479   return 0;
1480 }
1481
1482 int get_snapset(ObjectStore *store, coll_t coll, ghobject_t &ghobj, SnapSet &ss, bool silent);
1483
1484 int do_remove_object(ObjectStore *store, coll_t coll,
1485                      ghobject_t &ghobj, bool all, bool force,
1486                      ObjectStore::Sequencer &osr)
1487 {
1488   spg_t pg;
1489   coll.is_pg_prefix(&pg);
1490   OSDriver driver(
1491     store,
1492     coll_t(),
1493     OSD::make_snapmapper_oid());
1494   SnapMapper mapper(g_ceph_context, &driver, 0, 0, 0, pg.shard);
1495   struct stat st;
1496
1497   int r = store->stat(coll, ghobj, &st);
1498   if (r < 0) {
1499     cerr << "remove: " << cpp_strerror(r) << std::endl;
1500     return r;
1501   }
1502
1503   SnapSet ss;
1504   if (ghobj.hobj.has_snapset()) {
1505     r = get_snapset(store, coll, ghobj, ss, false);
1506     if (r < 0) {
1507       cerr << "Can't get snapset error " << cpp_strerror(r) << std::endl;
1508       return r;
1509     }
1510     if (!ss.snaps.empty() && !all) {
1511       if (force) {
1512         cout << "WARNING: only removing "
1513              << (ghobj.hobj.is_head() ? "head" : "snapdir")
1514              << " with snapshots present" << std::endl;
1515         ss.snaps.clear();
1516       } else {
1517         cerr << "Snapshots are present, use removeall to delete everything" << std::endl;
1518         return -EINVAL;
1519       }
1520     }
1521   }
1522
1523   ObjectStore::Transaction t;
1524   OSDriver::OSTransaction _t(driver.get_transaction(&t));
1525
1526   cout << "remove " << ghobj << std::endl;
1527
1528   if (!dry_run) {
1529     r = remove_object(coll, ghobj, mapper, &_t, &t);
1530     if (r < 0)
1531       return r;
1532   }
1533
1534   ghobject_t snapobj = ghobj;
1535   for (vector<snapid_t>::iterator i = ss.snaps.begin() ;
1536        i != ss.snaps.end() ; ++i) {
1537     snapobj.hobj.snap = *i;
1538     cout << "remove " << snapobj << std::endl;
1539     if (!dry_run) {
1540       r = remove_object(coll, snapobj, mapper, &_t, &t);
1541       if (r < 0)
1542         return r;
1543     }
1544   }
1545
1546   if (!dry_run)
1547     store->apply_transaction(&osr, std::move(t));
1548
1549   return 0;
1550 }
1551
1552 int do_list_attrs(ObjectStore *store, coll_t coll, ghobject_t &ghobj)
1553 {
1554   map<string,bufferptr> aset;
1555   int r = store->getattrs(coll, ghobj, aset);
1556   if (r < 0) {
1557     cerr << "getattrs: " << cpp_strerror(r) << std::endl;
1558     return r;
1559   }
1560
1561   for (map<string,bufferptr>::iterator i = aset.begin();i != aset.end(); ++i) {
1562     string key(i->first);
1563     if (outistty)
1564       key = cleanbin(key);
1565     cout << key << std::endl;
1566   }
1567   return 0;
1568 }
1569
1570 int do_list_omap(ObjectStore *store, coll_t coll, ghobject_t &ghobj)
1571 {
1572   ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(coll, ghobj);
1573   if (!iter) {
1574     cerr << "omap_get_iterator: " << cpp_strerror(ENOENT) << std::endl;
1575     return -ENOENT;
1576   }
1577   iter->seek_to_first();
1578   map<string, bufferlist> oset;
1579   while(iter->valid()) {
1580     get_omap_batch(iter, oset);
1581
1582     for (map<string,bufferlist>::iterator i = oset.begin();i != oset.end(); ++i) {
1583       string key(i->first);
1584       if (outistty)
1585         key = cleanbin(key);
1586       cout << key << std::endl;
1587     }
1588   }
1589   return 0;
1590 }
1591
1592 int do_get_bytes(ObjectStore *store, coll_t coll, ghobject_t &ghobj, int fd)
1593 {
1594   struct stat st;
1595   mysize_t total;
1596
1597   int ret = store->stat(coll, ghobj, &st);
1598   if (ret < 0) {
1599     cerr << "get-bytes: " << cpp_strerror(ret) << std::endl;
1600     return ret;
1601   }
1602
1603   total = st.st_size;
1604   if (debug)
1605     cerr << "size=" << total << std::endl;
1606
1607   uint64_t offset = 0;
1608   bufferlist rawdatabl;
1609   while(total > 0) {
1610     rawdatabl.clear();
1611     mysize_t len = max_read;
1612     if (len > total)
1613       len = total;
1614
1615     ret = store->read(coll, ghobj, offset, len, rawdatabl);
1616     if (ret < 0)
1617       return ret;
1618     if (ret == 0)
1619       return -EINVAL;
1620
1621     if (debug)
1622       cerr << "data section offset=" << offset << " len=" << len << std::endl;
1623
1624     total -= ret;
1625     offset += ret;
1626
1627     ret = write(fd, rawdatabl.c_str(), ret);
1628     if (ret == -1) {
1629       perror("write");
1630       return -errno;
1631     }
1632   }
1633
1634   return 0;
1635 }
1636
1637 int do_set_bytes(ObjectStore *store, coll_t coll,
1638                  ghobject_t &ghobj, int fd,
1639                  ObjectStore::Sequencer &osr)
1640 {
1641   ObjectStore::Transaction tran;
1642   ObjectStore::Transaction *t = &tran;
1643
1644   if (debug)
1645     cerr << "Write " << ghobj << std::endl;
1646
1647   if (!dry_run) {
1648     t->touch(coll, ghobj);
1649     t->truncate(coll, ghobj, 0);
1650   }
1651
1652   uint64_t offset = 0;
1653   bufferlist rawdatabl;
1654   do {
1655     rawdatabl.clear();
1656     ssize_t bytes = rawdatabl.read_fd(fd, max_read);
1657     if (bytes < 0) {
1658       cerr << "read_fd error " << cpp_strerror(bytes) << std::endl;
1659       return bytes;
1660     }
1661
1662     if (bytes == 0)
1663       break;
1664
1665     if (debug)
1666       cerr << "\tdata: offset " << offset << " bytes " << bytes << std::endl;
1667     if (!dry_run)
1668       t->write(coll, ghobj, offset, bytes,  rawdatabl);
1669
1670     offset += bytes;
1671     // XXX: Should we apply_transaction() every once in a while for very large files
1672   } while(true);
1673
1674   if (!dry_run)
1675     store->apply_transaction(&osr, std::move(*t));
1676   return 0;
1677 }
1678
1679 int do_get_attr(ObjectStore *store, coll_t coll, ghobject_t &ghobj, string key)
1680 {
1681   bufferptr bp;
1682
1683   int r = store->getattr(coll, ghobj, key.c_str(), bp);
1684   if (r < 0) {
1685     cerr << "getattr: " << cpp_strerror(r) << std::endl;
1686     return r;
1687   }
1688
1689   string value(bp.c_str(), bp.length());
1690   if (outistty) {
1691     value = cleanbin(value);
1692     value.push_back('\n');
1693   }
1694   cout << value;
1695
1696   return 0;
1697 }
1698
1699 int do_set_attr(ObjectStore *store, coll_t coll,
1700                 ghobject_t &ghobj, string key, int fd,
1701                 ObjectStore::Sequencer &osr)
1702 {
1703   ObjectStore::Transaction tran;
1704   ObjectStore::Transaction *t = &tran;
1705   bufferlist bl;
1706
1707   if (debug)
1708     cerr << "Setattr " << ghobj << std::endl;
1709
1710   int ret = get_fd_data(fd, bl);
1711   if (ret < 0)
1712     return ret;
1713
1714   if (dry_run)
1715     return 0;
1716
1717   t->touch(coll, ghobj);
1718
1719   t->setattr(coll, ghobj, key,  bl);
1720
1721   store->apply_transaction(&osr, std::move(*t));
1722   return 0;
1723 }
1724
1725 int do_rm_attr(ObjectStore *store, coll_t coll,
1726                ghobject_t &ghobj, string key,
1727                ObjectStore::Sequencer &osr)
1728 {
1729   ObjectStore::Transaction tran;
1730   ObjectStore::Transaction *t = &tran;
1731
1732   if (debug)
1733     cerr << "Rmattr " << ghobj << std::endl;
1734
1735   if (dry_run)
1736     return 0;
1737
1738   t->rmattr(coll, ghobj, key);
1739
1740   store->apply_transaction(&osr, std::move(*t));
1741   return 0;
1742 }
1743
1744 int do_get_omap(ObjectStore *store, coll_t coll, ghobject_t &ghobj, string key)
1745 {
1746   set<string> keys;
1747   map<string, bufferlist> out;
1748
1749   keys.insert(key);
1750
1751   int r = store->omap_get_values(coll, ghobj, keys, &out);
1752   if (r < 0) {
1753     cerr << "omap_get_values: " << cpp_strerror(r) << std::endl;
1754     return r;
1755   }
1756
1757   if (out.empty()) {
1758     cerr << "Key not found" << std::endl;
1759     return -ENOENT;
1760   }
1761
1762   assert(out.size() == 1);
1763
1764   bufferlist bl = out.begin()->second;
1765   string value(bl.c_str(), bl.length());
1766   if (outistty) {
1767     value = cleanbin(value);
1768     value.push_back('\n');
1769   }
1770   cout << value;
1771
1772   return 0;
1773 }
1774
1775 int do_set_omap(ObjectStore *store, coll_t coll,
1776                 ghobject_t &ghobj, string key, int fd,
1777                 ObjectStore::Sequencer &osr)
1778 {
1779   ObjectStore::Transaction tran;
1780   ObjectStore::Transaction *t = &tran;
1781   map<string, bufferlist> attrset;
1782   bufferlist valbl;
1783
1784   if (debug)
1785     cerr << "Set_omap " << ghobj << std::endl;
1786
1787   int ret = get_fd_data(fd, valbl);
1788   if (ret < 0)
1789     return ret;
1790
1791   attrset.insert(pair<string, bufferlist>(key, valbl));
1792
1793   if (dry_run)
1794     return 0;
1795
1796   t->touch(coll, ghobj);
1797
1798   t->omap_setkeys(coll, ghobj, attrset);
1799
1800   store->apply_transaction(&osr, std::move(*t));
1801   return 0;
1802 }
1803
1804 int do_rm_omap(ObjectStore *store, coll_t coll,
1805                ghobject_t &ghobj, string key,
1806                ObjectStore::Sequencer &osr)
1807 {
1808   ObjectStore::Transaction tran;
1809   ObjectStore::Transaction *t = &tran;
1810   set<string> keys;
1811
1812   keys.insert(key);
1813
1814   if (debug)
1815     cerr << "Rm_omap " << ghobj << std::endl;
1816
1817   if (dry_run)
1818     return 0;
1819
1820   t->omap_rmkeys(coll, ghobj, keys);
1821
1822   store->apply_transaction(&osr, std::move(*t));
1823   return 0;
1824 }
1825
1826 int do_get_omaphdr(ObjectStore *store, coll_t coll, ghobject_t &ghobj)
1827 {
1828   bufferlist hdrbl;
1829
1830   int r = store->omap_get_header(coll, ghobj, &hdrbl, true);
1831   if (r < 0) {
1832     cerr << "omap_get_header: " << cpp_strerror(r) << std::endl;
1833     return r;
1834   }
1835
1836   string header(hdrbl.c_str(), hdrbl.length());
1837   if (outistty) {
1838     header = cleanbin(header);
1839     header.push_back('\n');
1840   }
1841   cout << header;
1842
1843   return 0;
1844 }
1845
1846 int do_set_omaphdr(ObjectStore *store, coll_t coll,
1847                    ghobject_t &ghobj, int fd,
1848                    ObjectStore::Sequencer &osr)
1849 {
1850   ObjectStore::Transaction tran;
1851   ObjectStore::Transaction *t = &tran;
1852   bufferlist hdrbl;
1853
1854   if (debug)
1855     cerr << "Omap_setheader " << ghobj << std::endl;
1856
1857   int ret = get_fd_data(fd, hdrbl);
1858   if (ret)
1859     return ret;
1860
1861   if (dry_run)
1862     return 0;
1863
1864   t->touch(coll, ghobj);
1865
1866   t->omap_setheader(coll, ghobj, hdrbl);
1867
1868   store->apply_transaction(&osr, std::move(*t));
1869   return 0;
1870 }
1871
1872 struct do_fix_lost : public action_on_object_t {
1873   ObjectStore::Sequencer *osr;
1874
1875   explicit do_fix_lost(ObjectStore::Sequencer *_osr) : osr(_osr) {}
1876
1877   int call(ObjectStore *store, coll_t coll,
1878                    ghobject_t &ghobj, object_info_t &oi) override {
1879     if (oi.is_lost()) {
1880       cout << coll << "/" << ghobj << " is lost";
1881       if (!dry_run)
1882         cout << ", fixing";
1883       cout << std::endl;
1884       if (dry_run)
1885         return 0;
1886       oi.clear_flag(object_info_t::FLAG_LOST);
1887       bufferlist bl;
1888       ::encode(oi, bl, -1);  /* fixme: using full features */
1889       ObjectStore::Transaction t;
1890       t.setattr(coll, ghobj, OI_ATTR, bl);
1891       int r = store->apply_transaction(osr, std::move(t));
1892       if (r < 0) {
1893         cerr << "Error getting fixing attr on : " << make_pair(coll, ghobj)
1894              << ", "
1895              << cpp_strerror(r) << std::endl;
1896         return r;
1897       }
1898     }
1899     return 0;
1900   }
1901 };
1902
1903 int get_snapset(ObjectStore *store, coll_t coll, ghobject_t &ghobj, SnapSet &ss, bool silent = false)
1904 {
1905   bufferlist attr;
1906   int r = store->getattr(coll, ghobj, SS_ATTR, attr);
1907   if (r < 0) {
1908     if (!silent)
1909       cerr << "Error getting snapset on : " << make_pair(coll, ghobj) << ", "
1910            << cpp_strerror(r) << std::endl;
1911     return r;
1912   }
1913   bufferlist::iterator bp = attr.begin();
1914   try {
1915     ::decode(ss, bp);
1916   } catch (...) {
1917     r = -EINVAL;
1918     cerr << "Error decoding snapset on : " << make_pair(coll, ghobj) << ", "
1919          << cpp_strerror(r) << std::endl;
1920     return r;
1921   }
1922   return 0;
1923 }
1924
1925 int print_obj_info(ObjectStore *store, coll_t coll, ghobject_t &ghobj, Formatter* formatter)
1926 {
1927   int r = 0;
1928   formatter->open_object_section("obj");
1929   formatter->open_object_section("id");
1930   ghobj.dump(formatter);
1931   formatter->close_section();
1932
1933   bufferlist attr;
1934   int gr = store->getattr(coll, ghobj, OI_ATTR, attr);
1935   if (gr < 0) {
1936     r = gr;
1937     cerr << "Error getting attr on : " << make_pair(coll, ghobj) << ", "
1938        << cpp_strerror(r) << std::endl;
1939   } else {
1940     object_info_t oi;
1941     bufferlist::iterator bp = attr.begin();
1942     try {
1943       ::decode(oi, bp);
1944       formatter->open_object_section("info");
1945       oi.dump(formatter);
1946       formatter->close_section();
1947     } catch (...) {
1948       r = -EINVAL;
1949       cerr << "Error decoding attr on : " << make_pair(coll, ghobj) << ", "
1950            << cpp_strerror(r) << std::endl;
1951     }
1952   }
1953   struct stat st;
1954   int sr =  store->stat(coll, ghobj, &st, true);
1955   if (sr < 0) {
1956     r = sr;
1957     cerr << "Error stat on : " << make_pair(coll, ghobj) << ", "
1958          << cpp_strerror(r) << std::endl;
1959   } else {
1960     formatter->open_object_section("stat");
1961     formatter->dump_int("size", st.st_size);
1962     formatter->dump_int("blksize", st.st_blksize);
1963     formatter->dump_int("blocks", st.st_blocks);
1964     formatter->dump_int("nlink", st.st_nlink);
1965     formatter->close_section();
1966   }
1967
1968   if (ghobj.hobj.has_snapset()) {
1969     SnapSet ss;
1970     int snr = get_snapset(store, coll, ghobj, ss);
1971     if (snr < 0) {
1972       r = snr;
1973     } else {
1974       formatter->open_object_section("SnapSet");
1975       ss.dump(formatter);
1976       formatter->close_section();
1977     }
1978   }
1979   formatter->close_section();
1980   formatter->flush(cout);
1981   cout << std::endl;
1982   return r;
1983 }
1984
1985 int set_size(ObjectStore *store, coll_t coll, ghobject_t &ghobj, uint64_t setsize, Formatter* formatter,
1986              ObjectStore::Sequencer &osr, bool corrupt)
1987 {
1988   if (ghobj.hobj.is_snapdir()) {
1989     cerr << "Can't set the size of a snapdir" << std::endl;
1990     return -EINVAL;
1991   }
1992   bufferlist attr;
1993   int r = store->getattr(coll, ghobj, OI_ATTR, attr);
1994   if (r < 0) {
1995     cerr << "Error getting attr on : " << make_pair(coll, ghobj) << ", "
1996        << cpp_strerror(r) << std::endl;
1997     return r;
1998   }
1999   object_info_t oi;
2000   bufferlist::iterator bp = attr.begin();
2001   try {
2002     ::decode(oi, bp);
2003   } catch (...) {
2004     r = -EINVAL;
2005     cerr << "Error getting attr on : " << make_pair(coll, ghobj) << ", "
2006          << cpp_strerror(r) << std::endl;
2007     return r;
2008   }
2009   struct stat st;
2010   r =  store->stat(coll, ghobj, &st, true);
2011   if (r < 0) {
2012     cerr << "Error stat on : " << make_pair(coll, ghobj) << ", "
2013          << cpp_strerror(r) << std::endl;
2014   }
2015   ghobject_t head(ghobj);
2016   SnapSet ss;
2017   bool found_head = true;
2018   map<snapid_t, uint64_t>::iterator csi;
2019   bool is_snap = ghobj.hobj.is_snap();
2020   if (is_snap) {
2021     head.hobj = head.hobj.get_head();
2022     r = get_snapset(store, coll, head, ss, true);
2023     if (r < 0 && r != -ENOENT) {
2024       // Requested get_snapset() silent, so if not -ENOENT show error
2025       cerr << "Error getting snapset on : " << make_pair(coll, head) << ", "
2026            << cpp_strerror(r) << std::endl;
2027       return r;
2028     }
2029     if (r == -ENOENT) {
2030       head.hobj = head.hobj.get_snapdir();
2031       r = get_snapset(store, coll, head, ss);
2032       if (r < 0)
2033         return r;
2034       found_head = false;
2035     } else {
2036       found_head = true;
2037     }
2038     csi = ss.clone_size.find(ghobj.hobj.snap);
2039     if (csi == ss.clone_size.end()) {
2040       cerr << "SnapSet is missing clone_size for snap " << ghobj.hobj.snap << std::endl;
2041       return -EINVAL;
2042     }
2043   }
2044   if ((uint64_t)st.st_size == setsize && oi.size == setsize
2045        && (!is_snap || csi->second == setsize)) {
2046     cout << "Size of object is already " << setsize << std::endl;
2047     return 0;
2048   }
2049   cout << "Setting size to " << setsize << ", stat size " << st.st_size
2050        << ", obj info size " << oi.size;
2051   if (is_snap) {
2052     cout << ", " << (found_head ? "head" : "snapdir")
2053          << " clone_size " << csi->second;
2054     csi->second = setsize;
2055   }
2056   cout << std::endl;
2057   if (!dry_run) {
2058     attr.clear();
2059     oi.size = setsize;
2060     ::encode(oi, attr, -1);  /* fixme: using full features */
2061     ObjectStore::Transaction t;
2062     t.setattr(coll, ghobj, OI_ATTR, attr);
2063     // Only modify object info if we want to corrupt it
2064     if (!corrupt)
2065       t.truncate(coll, ghobj, setsize);
2066     if (is_snap) {
2067       bufferlist snapattr;
2068       snapattr.clear();
2069       ::encode(ss, snapattr);
2070       t.setattr(coll, head, SS_ATTR, snapattr);
2071     }
2072     r = store->apply_transaction(&osr, std::move(t));
2073     if (r < 0) {
2074       cerr << "Error writing object info: " << make_pair(coll, ghobj) << ", "
2075          << cpp_strerror(r) << std::endl;
2076       return r;
2077     }
2078   }
2079   return 0;
2080 }
2081
2082 int clear_snapset(ObjectStore *store, coll_t coll, ghobject_t &ghobj,
2083                   string arg, ObjectStore::Sequencer &osr)
2084 {
2085   SnapSet ss;
2086   int ret = get_snapset(store, coll, ghobj, ss);
2087   if (ret < 0)
2088     return ret;
2089
2090   // Use "head" to set head_exists incorrectly
2091   if (arg == "corrupt" || arg == "head")
2092     ss.head_exists = !ghobj.hobj.is_head();
2093   else if (ss.head_exists != ghobj.hobj.is_head()) {
2094     cerr << "Correcting head_exists, set to "
2095          << (ghobj.hobj.is_head() ? "true" : "false") << std::endl;
2096     ss.head_exists = ghobj.hobj.is_head();
2097   }
2098   // Use "corrupt" to clear entire SnapSet
2099   // Use "seq" to just corrupt SnapSet.seq
2100   if (arg == "corrupt" || arg == "seq")
2101     ss.seq = 0;
2102   // Use "snaps" to just clear SnapSet.snaps
2103   if (arg == "corrupt" || arg == "snaps")
2104     ss.snaps.clear();
2105   // By default just clear clone, clone_overlap and clone_size
2106   if (arg == "corrupt")
2107     arg = "";
2108   if (arg == "" || arg == "clones")
2109     ss.clones.clear();
2110   if (arg == "" || arg == "clone_overlap")
2111     ss.clone_overlap.clear();
2112   if (arg == "" || arg == "clone_size")
2113     ss.clone_size.clear();
2114   // Break all clone sizes by adding 1
2115   if (arg == "size") {
2116     for (map<snapid_t, uint64_t>::iterator i = ss.clone_size.begin();
2117          i != ss.clone_size.end(); ++i)
2118       ++(i->second);
2119   }
2120
2121   if (!dry_run) {
2122     bufferlist bl;
2123     ::encode(ss, bl);
2124     ObjectStore::Transaction t;
2125     t.setattr(coll, ghobj, SS_ATTR, bl);
2126     int r = store->apply_transaction(&osr, std::move(t));
2127     if (r < 0) {
2128       cerr << "Error setting snapset on : " << make_pair(coll, ghobj) << ", "
2129            << cpp_strerror(r) << std::endl;
2130       return r;
2131     }
2132   }
2133   return 0;
2134 }
2135
2136 vector<snapid_t>::iterator find(vector<snapid_t> &v, snapid_t clid)
2137 {
2138   return std::find(v.begin(), v.end(), clid);
2139 }
2140
2141 map<snapid_t, interval_set<uint64_t> >::iterator
2142 find(map<snapid_t, interval_set<uint64_t> > &m, snapid_t clid)
2143 {
2144   return m.find(clid);
2145 }
2146
2147 map<snapid_t, uint64_t>::iterator find(map<snapid_t, uint64_t> &m,
2148                                        snapid_t clid)
2149 {
2150   return m.find(clid);
2151 }
2152
2153 template<class T>
2154 int remove_from(T &mv, string name, snapid_t cloneid, bool force)
2155 {
2156   typename T::iterator i = find(mv, cloneid);
2157   if (i != mv.end()) {
2158     mv.erase(i);
2159   } else {
2160     cerr << "Clone " << cloneid << " doesn't exist in " << name;
2161     if (force) {
2162       cerr << " (ignored)" << std::endl;
2163       return 0;
2164     }
2165     cerr << std::endl;
2166     return -EINVAL;
2167   }
2168   return 0;
2169 }
2170
2171 int remove_clone(ObjectStore *store, coll_t coll, ghobject_t &ghobj, snapid_t cloneid, bool force,
2172                      ObjectStore::Sequencer &osr)
2173 {
2174   // XXX: Don't allow this if in a cache tier or former cache tier
2175   // bool allow_incomplete_clones() const {
2176   //    return cache_mode != CACHEMODE_NONE || has_flag(FLAG_INCOMPLETE_CLONES);
2177
2178   SnapSet snapset;
2179   int ret = get_snapset(store, coll, ghobj, snapset);
2180   if (ret < 0)
2181     return ret;
2182
2183   // Derived from trim_object()
2184   // ...from snapset
2185   vector<snapid_t>::iterator p;
2186   for (p = snapset.clones.begin(); p != snapset.clones.end(); ++p)
2187     if (*p == cloneid)
2188       break;
2189   if (p == snapset.clones.end()) {
2190     cerr << "Clone " << cloneid << " not present";
2191     return -ENOENT;
2192   }
2193   if (p != snapset.clones.begin()) {
2194     // not the oldest... merge overlap into next older clone
2195     vector<snapid_t>::iterator n = p - 1;
2196     hobject_t prev_coid = ghobj.hobj;
2197     prev_coid.snap = *n;
2198     //bool adjust_prev_bytes = is_present_clone(prev_coid);
2199
2200     //if (adjust_prev_bytes)
2201     //  ctx->delta_stats.num_bytes -= snapset.get_clone_bytes(*n);
2202
2203     snapset.clone_overlap[*n].intersection_of(
2204         snapset.clone_overlap[*p]);
2205
2206     //if (adjust_prev_bytes)
2207     //  ctx->delta_stats.num_bytes += snapset.get_clone_bytes(*n);
2208   }
2209
2210   ret = remove_from(snapset.clones, "clones", cloneid, force);
2211   if (ret) return ret;
2212   ret = remove_from(snapset.clone_overlap, "clone_overlap", cloneid, force);
2213   if (ret) return ret;
2214   ret = remove_from(snapset.clone_size, "clone_size", cloneid, force);
2215   if (ret) return ret;
2216
2217   if (dry_run)
2218     return 0;
2219
2220   bufferlist bl;
2221   ::encode(snapset, bl);
2222   ObjectStore::Transaction t;
2223   t.setattr(coll, ghobj, SS_ATTR, bl);
2224   int r = store->apply_transaction(&osr, std::move(t));
2225   if (r < 0) {
2226     cerr << "Error setting snapset on : " << make_pair(coll, ghobj) << ", "
2227          << cpp_strerror(r) << std::endl;
2228     return r;
2229   }
2230   cout << "Removal of clone " << cloneid << " complete" << std::endl;
2231   cout << "Use pg repair after OSD restarted to correct stat information" << std::endl;
2232   return 0;
2233 }
2234
2235 int dup(string srcpath, ObjectStore *src, string dstpath, ObjectStore *dst)
2236 {
2237   cout << "dup from " << src->get_type() << ": " << srcpath << "\n"
2238        << "      to " << dst->get_type() << ": " << dstpath
2239        << std::endl;
2240   ObjectStore::Sequencer osr("dup");
2241   int num, i;
2242   vector<coll_t> collections;
2243   int r;
2244
2245   r = src->mount();
2246   if (r < 0) {
2247     cerr << "failed to mount src: " << cpp_strerror(r) << std::endl;
2248     return r;
2249   }
2250   r = dst->mount();
2251   if (r < 0) {
2252     cerr << "failed to mount dst: " << cpp_strerror(r) << std::endl;
2253     goto out_src;
2254   }
2255
2256   if (src->get_fsid() != dst->get_fsid()) {
2257     cerr << "src fsid " << src->get_fsid() << " != dest " << dst->get_fsid()
2258          << std::endl;
2259     goto out;
2260   }
2261   cout << "fsid " << src->get_fsid() << std::endl;
2262
2263   // make sure dst is empty
2264   r = dst->list_collections(collections);
2265   if (r < 0) {
2266     cerr << "error listing collections on dst: " << cpp_strerror(r) << std::endl;
2267     goto out;
2268   }
2269   if (!collections.empty()) {
2270     cerr << "destination store is not empty" << std::endl;
2271     goto out;
2272   }
2273
2274   r = src->list_collections(collections);
2275   if (r < 0) {
2276     cerr << "error listing collections on src: " << cpp_strerror(r) << std::endl;
2277     goto out;
2278   }
2279
2280   num = collections.size();
2281   cout << num << " collections" << std::endl;
2282   i = 1;
2283   for (auto cid : collections) {
2284     cout << i++ << "/" << num << " " << cid << std::endl;
2285     {
2286       ObjectStore::Transaction t;
2287       int bits = src->collection_bits(cid);
2288       if (bits < 0) {
2289         if (src->get_type() == "filestore" && cid.is_meta()) {
2290           bits = 0;
2291         } else {
2292           cerr << "cannot get bit count for collection " << cid << ": "
2293                << cpp_strerror(bits) << std::endl;
2294           goto out;
2295         }
2296       }
2297       t.create_collection(cid, bits);
2298       dst->apply_transaction(&osr, std::move(t));
2299     }
2300
2301     ghobject_t pos;
2302     uint64_t n = 0;
2303     uint64_t bytes = 0, keys = 0;
2304     while (true) {
2305       vector<ghobject_t> ls;
2306       r = src->collection_list(cid, pos, ghobject_t::get_max(), 1000, &ls, &pos);
2307       if (r < 0) {
2308         cerr << "collection_list on " << cid << " from " << pos << " got: "
2309              << cpp_strerror(r) << std::endl;
2310         goto out;
2311       }
2312       if (ls.empty()) {
2313         break;
2314       }
2315       
2316       for (auto& oid : ls) {
2317         //cout << "  " << cid << " " << oid << std::endl;
2318         if (n % 100 == 0) {
2319           cout << "  " << std::setw(16) << n << " objects, "
2320                << std::setw(16) << bytes << " bytes, "
2321                << std::setw(16) << keys << " keys"
2322                << std::setw(1) << "\r" << std::flush;
2323         }
2324         n++;
2325
2326         ObjectStore::Transaction t;
2327         t.touch(cid, oid);
2328
2329         map<string,bufferptr> attrs;
2330         src->getattrs(cid, oid, attrs);
2331         if (!attrs.empty()) {
2332           t.setattrs(cid, oid, attrs);
2333         }
2334
2335         bufferlist bl;
2336         src->read(cid, oid, 0, 0, bl);
2337         if (bl.length()) {
2338           t.write(cid, oid, 0, bl.length(), bl);
2339           bytes += bl.length();
2340         }
2341
2342         bufferlist header;
2343         map<string,bufferlist> omap;
2344         src->omap_get(cid, oid, &header, &omap);
2345         if (header.length()) {
2346           t.omap_setheader(cid, oid, header);
2347           ++keys;
2348         }
2349         if (!omap.empty()) {
2350           keys += omap.size();
2351           t.omap_setkeys(cid, oid, omap);
2352         }
2353
2354         dst->apply_transaction(&osr, std::move(t));
2355       }
2356     }
2357     cout << "  " << std::setw(16) << n << " objects, "
2358          << std::setw(16) << bytes << " bytes, "
2359          << std::setw(16) << keys << " keys"
2360          << std::setw(1) << std::endl;
2361   }
2362
2363   // keyring
2364   cout << "keyring" << std::endl;
2365   {
2366     bufferlist bl;
2367     string s = srcpath + "/keyring";
2368     string err;
2369     r = bl.read_file(s.c_str(), &err);
2370     if (r < 0) {
2371       cerr << "failed to copy " << s << ": " << err << std::endl;
2372     } else {
2373       string d = dstpath + "/keyring";
2374       bl.write_file(d.c_str(), 0600);
2375     }
2376   }
2377
2378   // osd metadata
2379   cout << "duping osd metadata" << std::endl;
2380   {
2381     for (auto k : {"magic", "whoami", "ceph_fsid", "fsid"}) {
2382       string val;
2383       src->read_meta(k, &val);
2384       dst->write_meta(k, val);
2385     }
2386   }
2387
2388   dst->write_meta("ready", "ready");
2389
2390   cout << "done." << std::endl;
2391   r = 0;
2392  out:
2393   dst->umount();
2394  out_src:
2395   src->umount();
2396   return r;
2397 }
2398
2399 void usage(po::options_description &desc)
2400 {
2401     cerr << std::endl;
2402     cerr << desc << std::endl;
2403     cerr << std::endl;
2404     cerr << "Positional syntax:" << std::endl;
2405     cerr << std::endl;
2406     cerr << "ceph-objectstore-tool ... <object> (get|set)-bytes [file]" << std::endl;
2407     cerr << "ceph-objectstore-tool ... <object> set-(attr|omap) <key> [file]" << std::endl;
2408     cerr << "ceph-objectstore-tool ... <object> (get|rm)-(attr|omap) <key>" << std::endl;
2409     cerr << "ceph-objectstore-tool ... <object> get-omaphdr" << std::endl;
2410     cerr << "ceph-objectstore-tool ... <object> set-omaphdr [file]" << std::endl;
2411     cerr << "ceph-objectstore-tool ... <object> list-attrs" << std::endl;
2412     cerr << "ceph-objectstore-tool ... <object> list-omap" << std::endl;
2413     cerr << "ceph-objectstore-tool ... <object> remove|removeall" << std::endl;
2414     cerr << "ceph-objectstore-tool ... <object> dump" << std::endl;
2415     cerr << "ceph-objectstore-tool ... <object> set-size" << std::endl;
2416     cerr << "ceph-objectstore-tool ... <object> remove-clone-metadata <cloneid>" << std::endl;
2417     cerr << std::endl;
2418     cerr << "<object> can be a JSON object description as displayed" << std::endl;
2419     cerr << "by --op list." << std::endl;
2420     cerr << "<object> can be an object name which will be looked up in all" << std::endl;
2421     cerr << "the OSD's PGs." << std::endl;
2422     cerr << "<object> can be the empty string ('') which with a provided pgid " << std::endl;
2423     cerr << "specifies the pgmeta object" << std::endl;
2424     cerr << std::endl;
2425     cerr << "The optional [file] argument will read stdin or write stdout" << std::endl;
2426     cerr << "if not specified or if '-' specified." << std::endl;
2427 }
2428
2429 bool ends_with(const string& check, const string& ending)
2430 {
2431     return check.size() >= ending.size() && check.rfind(ending) == (check.size() - ending.size());
2432 }
2433
2434 // Based on FileStore::dump_journal(), set-up enough to only dump
2435 int mydump_journal(Formatter *f, string journalpath, bool m_journal_dio)
2436 {
2437   int r;
2438
2439   if (!journalpath.length())
2440     return -EINVAL;
2441
2442   FileJournal *journal = new FileJournal(g_ceph_context, uuid_d(), NULL, NULL,
2443                                          journalpath.c_str(), m_journal_dio);
2444   r = journal->_fdump(*f, false);
2445   delete journal;
2446   return r;
2447 }
2448
2449 int apply_layout_settings(ObjectStore *os, const OSDSuperblock &superblock,
2450                           const string &pool_name, const spg_t &pgid, bool dry_run)
2451 {
2452   int r = 0;
2453
2454   FileStore *fs = dynamic_cast<FileStore*>(os);
2455   if (!fs) {
2456     cerr << "Nothing to do for non-filestore backend" << std::endl;
2457     return 0; // making this return success makes testing easier
2458   }
2459
2460   OSDMap curmap;
2461   bufferlist bl;
2462   r = get_osdmap(os, superblock.current_epoch, curmap, bl);
2463   if (r) {
2464     cerr << "Can't find local OSDMap: " << cpp_strerror(r) << std::endl;
2465     return r;
2466   }
2467
2468   int64_t poolid = -1;
2469   if (pool_name.length()) {
2470     poolid = curmap.lookup_pg_pool_name(pool_name);
2471     if (poolid < 0) {
2472       cerr << "Couldn't find pool " << pool_name << ": " << cpp_strerror(poolid)
2473            << std::endl;
2474       return poolid;
2475     }
2476   }
2477
2478   vector<coll_t> collections, filtered_colls;
2479   r = os->list_collections(collections);
2480   if (r < 0) {
2481     cerr << "Error listing collections: " << cpp_strerror(r) << std::endl;
2482     return r;
2483   }
2484
2485   for (auto const &coll : collections) {
2486     spg_t coll_pgid;
2487     if (coll.is_pg(&coll_pgid) &&
2488         ((poolid >= 0 && coll_pgid.pool() == (uint64_t)poolid) ||
2489          coll_pgid == pgid)) {
2490       filtered_colls.push_back(coll);
2491     }
2492   }
2493
2494   size_t done = 0, total = filtered_colls.size();
2495   for (auto const &coll : filtered_colls) {
2496     if (dry_run) {
2497       cerr << "Would apply layout settings to " << coll << std::endl;
2498     } else {
2499       cerr << "Finished " << done << "/" << total << " collections" << "\r";
2500       r = fs->apply_layout_settings(coll);
2501       if (r < 0) {
2502         cerr << "Error applying layout settings to " << coll << std::endl;
2503         return r;
2504       }
2505     }
2506     ++done;
2507   }
2508
2509   cerr << "Finished " << total << "/" << total << " collections" << "\r" << std::endl;
2510   return r;
2511 }
2512
2513 int main(int argc, char **argv)
2514 {
2515   string dpath, jpath, pgidstr, op, file, mountpoint, mon_store_path, object;
2516   string target_data_path, fsid;
2517   string objcmd, arg1, arg2, type, format, argnspace, pool;
2518   boost::optional<std::string> nspace;
2519   spg_t pgid;
2520   unsigned epoch = 0;
2521   ghobject_t ghobj;
2522   bool human_readable;
2523   bool force;
2524   Formatter *formatter;
2525   bool head;
2526
2527   po::options_description desc("Allowed options");
2528   desc.add_options()
2529     ("help", "produce help message")
2530     ("type", po::value<string>(&type),
2531      "Arg is one of [bluestore, filestore (default), memstore]")
2532     ("data-path", po::value<string>(&dpath),
2533      "path to object store, mandatory")
2534     ("journal-path", po::value<string>(&jpath),
2535      "path to journal, use if tool can't find it")
2536     ("pgid", po::value<string>(&pgidstr),
2537      "PG id, mandatory for info, log, remove, export, export-remove, rm-past-intervals, mark-complete, and mandatory for apply-layout-settings if --pool is not specified")
2538     ("pool", po::value<string>(&pool),
2539      "Pool name, mandatory for apply-layout-settings if --pgid is not specified")
2540     ("op", po::value<string>(&op),
2541      "Arg is one of [info, log, remove, mkfs, fsck, repair, fuse, dup, export, export-remove, import, list, fix-lost, list-pgs, rm-past-intervals, dump-journal, dump-super, meta-list, "
2542      "get-osdmap, set-osdmap, get-inc-osdmap, set-inc-osdmap, mark-complete, apply-layout-settings, update-mon-db]")
2543     ("epoch", po::value<unsigned>(&epoch),
2544      "epoch# for get-osdmap and get-inc-osdmap, the current epoch in use if not specified")
2545     ("file", po::value<string>(&file),
2546      "path of file to export, export-remove, import, get-osdmap, set-osdmap, get-inc-osdmap or set-inc-osdmap")
2547     ("mon-store-path", po::value<string>(&mon_store_path),
2548      "path of monstore to update-mon-db")
2549     ("fsid", po::value<string>(&fsid),
2550      "fsid for new store created by mkfs")
2551     ("target-data-path", po::value<string>(&target_data_path),
2552      "path of target object store (for --op dup)")
2553     ("mountpoint", po::value<string>(&mountpoint),
2554      "fuse mountpoint")
2555     ("format", po::value<string>(&format)->default_value("json-pretty"),
2556      "Output format which may be json, json-pretty, xml, xml-pretty")
2557     ("debug", "Enable diagnostic output to stderr")
2558     ("force", "Ignore some types of errors and proceed with operation - USE WITH CAUTION: CORRUPTION POSSIBLE NOW OR IN THE FUTURE")
2559     ("skip-journal-replay", "Disable journal replay")
2560     ("skip-mount-omap", "Disable mounting of omap")
2561     ("head", "Find head/snapdir when searching for objects by name")
2562     ("dry-run", "Don't modify the objectstore")
2563     ("namespace", po::value<string>(&argnspace), "Specify namespace when searching for objects")
2564     ;
2565
2566   po::options_description positional("Positional options");
2567   positional.add_options()
2568     ("object", po::value<string>(&object), "'' for pgmeta_oid, object name or ghobject in json")
2569     ("objcmd", po::value<string>(&objcmd), "command [(get|set)-bytes, (get|set|rm)-(attr|omap), (get|set)-omaphdr, list-attrs, list-omap, remove]")
2570     ("arg1", po::value<string>(&arg1), "arg1 based on cmd")
2571     ("arg2", po::value<string>(&arg2), "arg2 based on cmd")
2572     ("test-align", po::value<uint64_t>(&testalign)->default_value(0), "hidden align option for testing")
2573     ;
2574
2575   po::options_description all("All options");
2576   all.add(desc).add(positional);
2577
2578   po::positional_options_description pd;
2579   pd.add("object", 1).add("objcmd", 1).add("arg1", 1).add("arg2", 1);
2580
2581   vector<string> ceph_option_strings;
2582   po::variables_map vm;
2583   try {
2584     po::parsed_options parsed =
2585       po::command_line_parser(argc, argv).options(all).allow_unregistered().positional(pd).run();
2586     po::store( parsed, vm);
2587     po::notify(vm);
2588     ceph_option_strings = po::collect_unrecognized(parsed.options,
2589                                                    po::include_positional);
2590   } catch(po::error &e) {
2591     std::cerr << e.what() << std::endl;
2592     return 1;
2593   }
2594
2595   if (vm.count("help")) {
2596     usage(all);
2597     return 1;
2598   }
2599
2600   debug = (vm.count("debug") > 0);
2601
2602   force = (vm.count("force") > 0);
2603
2604   if (vm.count("namespace"))
2605     nspace = argnspace;
2606
2607   dry_run = (vm.count("dry-run") > 0);
2608
2609   osflagbits_t flags = 0;
2610   if (dry_run || vm.count("skip-journal-replay"))
2611     flags |= SKIP_JOURNAL_REPLAY;
2612   if (vm.count("skip-mount-omap"))
2613     flags |= SKIP_MOUNT_OMAP;
2614   if (op == "update-mon-db")
2615     flags |= SKIP_JOURNAL_REPLAY;
2616
2617   head = (vm.count("head") > 0);
2618
2619   vector<const char *> ceph_options;
2620   env_to_vec(ceph_options);
2621   ceph_options.reserve(ceph_options.size() + ceph_option_strings.size());
2622   for (vector<string>::iterator i = ceph_option_strings.begin();
2623        i != ceph_option_strings.end();
2624        ++i) {
2625     ceph_options.push_back(i->c_str());
2626   }
2627
2628   char fn[PATH_MAX];
2629   snprintf(fn, sizeof(fn), "%s/type", dpath.c_str());
2630   int fd = ::open(fn, O_RDONLY);
2631   if (fd >= 0) {
2632     bufferlist bl;
2633     bl.read_fd(fd, 64);
2634     if (bl.length()) {
2635       string dp_type = string(bl.c_str(), bl.length() - 1);  // drop \n
2636       if (vm.count("type") && dp_type != "" && type != dp_type)
2637         cerr << "WARNING: Ignoring type \"" << type << "\" - found data-path type \""
2638              << dp_type << "\"" << std::endl;
2639       type = dp_type;
2640       //cout << "object store type is " << type << std::endl;
2641     }
2642     ::close(fd);
2643   }
2644   if (!vm.count("type") && type == "") {
2645     type = "filestore";
2646   }
2647   if (!vm.count("data-path") &&
2648      !(op == "dump-journal" && type == "filestore")) {
2649     cerr << "Must provide --data-path" << std::endl;
2650     usage(desc);
2651     return 1;
2652   }
2653   if (type == "filestore" && !vm.count("journal-path")) {
2654     jpath = dpath + "/journal";
2655   }
2656   if (!vm.count("op") && !vm.count("object")) {
2657     cerr << "Must provide --op or object command..." << std::endl;
2658     usage(desc);
2659     return 1;
2660   }
2661   if (op != "list" &&
2662       vm.count("op") && vm.count("object")) {
2663     cerr << "Can't specify both --op and object command syntax" << std::endl;
2664     usage(desc);
2665     return 1;
2666   }
2667   if (op == "apply-layout-settings" && !(vm.count("pool") ^ vm.count("pgid"))) {
2668     cerr << "apply-layout-settings requires either --pool or --pgid"
2669          << std::endl;
2670     usage(desc);
2671     return 1;
2672   }
2673   if (op != "list" && vm.count("object") && !vm.count("objcmd")) {
2674     cerr << "Invalid syntax, missing command" << std::endl;
2675     usage(desc);
2676     return 1;
2677   }
2678   if (op == "fuse" && mountpoint.length() == 0) {
2679     cerr << "Missing fuse mountpoint" << std::endl;
2680     usage(desc);
2681     return 1;
2682   }
2683   outistty = isatty(STDOUT_FILENO);
2684
2685   file_fd = fd_none;
2686   if ((op == "export" || op == "export-remove" || op == "get-osdmap" || op == "get-inc-osdmap") && !dry_run) {
2687     if (!vm.count("file") || file == "-") {
2688       if (outistty) {
2689         cerr << "stdout is a tty and no --file filename specified" << std::endl;
2690         return 1;
2691       }
2692       file_fd = STDOUT_FILENO;
2693     } else {
2694       file_fd = open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC, 0666);
2695     }
2696   } else if (op == "import" || op == "set-osdmap" || op == "set-inc-osdmap") {
2697     if (!vm.count("file") || file == "-") {
2698       if (isatty(STDIN_FILENO)) {
2699         cerr << "stdin is a tty and no --file filename specified" << std::endl;
2700         return 1;
2701       }
2702       file_fd = STDIN_FILENO;
2703     } else {
2704       file_fd = open(file.c_str(), O_RDONLY);
2705     }
2706   }
2707
2708   ObjectStoreTool tool = ObjectStoreTool(file_fd, dry_run);
2709
2710   if (vm.count("file") && file_fd == fd_none && !dry_run) {
2711     cerr << "--file option only applies to import, export, export-remove, "
2712          << "get-osdmap, set-osdmap, get-inc-osdmap or set-inc-osdmap" << std::endl;
2713     return 1;
2714   }
2715
2716   if (file_fd != fd_none && file_fd < 0) {
2717     string err = string("file: ") + file;
2718     perror(err.c_str());
2719     return 1;
2720   }
2721
2722   auto cct = global_init(
2723     NULL, ceph_options, CEPH_ENTITY_TYPE_OSD,
2724     CODE_ENVIRONMENT_UTILITY_NODOUT, 0);
2725     //CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
2726   common_init_finish(g_ceph_context);
2727   g_conf = g_ceph_context->_conf;
2728   if (debug) {
2729     g_conf->set_val_or_die("log_to_stderr", "true");
2730     g_conf->set_val_or_die("err_to_stderr", "true");
2731   }
2732   g_conf->apply_changes(NULL);
2733
2734   // Special list handling.  Treating pretty_format as human readable,
2735   // with one object per line and not an enclosing array.
2736   human_readable = ends_with(format, "-pretty");
2737   if ((op == "list" || op == "meta-list") && human_readable) {
2738     // Remove -pretty from end of format which we know is there
2739     format = format.substr(0, format.size() - strlen("-pretty"));
2740   }
2741
2742   formatter = Formatter::create(format);
2743   if (formatter == NULL) {
2744     cerr << "unrecognized format: " << format << std::endl;
2745     return 1;
2746   }
2747
2748   // Special handling for filestore journal, so we can dump it without mounting
2749   if (op == "dump-journal" && type == "filestore") {
2750     int ret = mydump_journal(formatter, jpath, g_conf->journal_dio);
2751     if (ret < 0) {
2752       cerr << "journal-path: " << jpath << ": "
2753            << cpp_strerror(ret) << std::endl;
2754       return 1;
2755     }
2756     formatter->flush(cout);
2757     return 0;
2758   }
2759
2760   //Verify that data-path really exists
2761   struct stat st;
2762   if (::stat(dpath.c_str(), &st) == -1) {
2763     string err = string("data-path: ") + dpath;
2764     perror(err.c_str());
2765     return 1;
2766   }
2767
2768   if (pgidstr.length() && !pgid.parse(pgidstr.c_str())) {
2769     cerr << "Invalid pgid '" << pgidstr << "' specified" << std::endl;
2770     return 1;
2771   }
2772
2773   //Verify that the journal-path really exists
2774   if (type == "filestore") {
2775     if (::stat(jpath.c_str(), &st) == -1) {
2776       string err = string("journal-path: ") + jpath;
2777       perror(err.c_str());
2778       return 1;
2779     }
2780     if (S_ISDIR(st.st_mode)) {
2781       cerr << "journal-path: " << jpath << ": "
2782            << cpp_strerror(EISDIR) << std::endl;
2783       return 1;
2784     }
2785   }
2786
2787   ObjectStore *fs = ObjectStore::create(g_ceph_context, type, dpath, jpath, flags);
2788   if (fs == NULL) {
2789     cerr << "Unable to create store of type " << type << std::endl;
2790     return 1;
2791   }
2792
2793   if (op == "fsck" || op == "fsck-deep") {
2794     int r = fs->fsck(op == "fsck-deep");
2795     if (r < 0) {
2796       cerr << "fsck failed: " << cpp_strerror(r) << std::endl;
2797       return 1;
2798     }
2799     if (r > 0) {
2800       cerr << "fsck found " << r << " errors" << std::endl;
2801       return 1;
2802     }
2803     cout << "fsck found no errors" << std::endl;
2804     return 0;
2805   }
2806   if (op == "repair" || op == "repair-deep") {
2807     int r = fs->repair(op == "repair-deep");
2808     if (r < 0) {
2809       cerr << "repair failed: " << cpp_strerror(r) << std::endl;
2810       return 1;
2811     }
2812     if (r > 0) {
2813       cerr << "repair found " << r << " errors" << std::endl;
2814       return 1;
2815     }
2816     cout << "repair found no errors" << std::endl;
2817     return 0;
2818   }
2819   if (op == "mkfs") {
2820     if (fsid.length()) {
2821       uuid_d f;
2822       bool r = f.parse(fsid.c_str());
2823       if (!r) {
2824         cerr << "failed to parse uuid '" << fsid << "'" << std::endl;
2825         return 1;
2826       }
2827       fs->set_fsid(f);
2828     }
2829     int r = fs->mkfs();
2830     if (r < 0) {
2831       cerr << "mkfs failed: " << cpp_strerror(r) << std::endl;
2832       return 1;
2833     }
2834     return 0;
2835   }
2836   if (op == "dup") {
2837     string target_type;
2838     char fn[PATH_MAX];
2839     snprintf(fn, sizeof(fn), "%s/type", target_data_path.c_str());
2840     int fd = ::open(fn, O_RDONLY);
2841     if (fd < 0) {
2842       cerr << "Unable to open " << target_data_path << "/type" << std::endl;
2843       exit(1);
2844     }
2845     bufferlist bl;
2846     bl.read_fd(fd, 64);
2847     if (bl.length()) {
2848       target_type = string(bl.c_str(), bl.length() - 1);  // drop \n
2849     }
2850     ::close(fd);
2851     ObjectStore *targetfs = ObjectStore::create(
2852       g_ceph_context, target_type,
2853       target_data_path, "", 0);
2854     if (targetfs == NULL) {
2855       cerr << "Unable to open store of type " << target_type << std::endl;
2856       return 1;
2857     }
2858     int r = dup(dpath, fs, target_data_path, targetfs);
2859     if (r < 0) {
2860       cerr << "dup failed: " << cpp_strerror(r) << std::endl;
2861       return 1;
2862     }
2863     return 0;
2864   }
2865
2866   ObjectStore::Sequencer *osr = new ObjectStore::Sequencer(__func__);
2867   int ret = fs->mount();
2868   if (ret < 0) {
2869     if (ret == -EBUSY) {
2870       cerr << "OSD has the store locked" << std::endl;
2871     } else {
2872       cerr << "Mount failed with '" << cpp_strerror(ret) << "'" << std::endl;
2873     }
2874     return 1;
2875   }
2876
2877   if (op == "fuse") {
2878 #ifdef HAVE_LIBFUSE
2879     FuseStore fuse(fs, mountpoint);
2880     cout << "mounting fuse at " << mountpoint << " ..." << std::endl;
2881     int r = fuse.main();
2882     if (r < 0) {
2883       cerr << "failed to mount fuse: " << cpp_strerror(r) << std::endl;
2884       return 1;
2885     }
2886 #else
2887     cerr << "fuse support not enabled" << std::endl;
2888 #endif
2889     return 0;
2890   }
2891
2892   vector<coll_t> ls;
2893   vector<coll_t>::iterator it;
2894   CompatSet supported;
2895
2896 #ifdef INTERNAL_TEST
2897   supported = get_test_compat_set();
2898 #else
2899   supported = OSD::get_osd_compat_set();
2900 #endif
2901
2902   bufferlist bl;
2903   OSDSuperblock superblock;
2904   bufferlist::iterator p;
2905   ret = fs->read(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, 0, bl);
2906   if (ret < 0) {
2907     cerr << "Failure to read OSD superblock: " << cpp_strerror(ret) << std::endl;
2908     goto out;
2909   }
2910
2911   p = bl.begin();
2912   ::decode(superblock, p);
2913
2914   if (debug) {
2915     cerr << "Cluster fsid=" << superblock.cluster_fsid << std::endl;
2916   }
2917
2918   if (debug) {
2919     cerr << "Supported features: " << supported << std::endl;
2920     cerr << "On-disk features: " << superblock.compat_features << std::endl;
2921   }
2922   if (supported.compare(superblock.compat_features) == -1) {
2923     CompatSet unsupported = supported.unsupported(superblock.compat_features);
2924     cerr << "On-disk OSD incompatible features set "
2925       << unsupported << std::endl;
2926     ret = -EINVAL;
2927     goto out;
2928   }
2929
2930   if (op == "apply-layout-settings") {
2931     ret = apply_layout_settings(fs, superblock, pool, pgid, dry_run);
2932     goto out;
2933   }
2934
2935   if (op != "list" && vm.count("object")) {
2936     // Special case: Create pgmeta_oid if empty string specified
2937     // This can't conflict with any actual object names.
2938     if (object == "") {
2939       ghobj = pgid.make_pgmeta_oid();
2940     } else {
2941     json_spirit::Value v;
2942     try {
2943       if (!json_spirit::read(object, v) ||
2944           (v.type() != json_spirit::array_type && v.type() != json_spirit::obj_type)) {
2945         // Special: Need head/snapdir so set even if user didn't specify
2946         if (vm.count("objcmd") && (objcmd == "remove-clone-metadata"))
2947           head = true;
2948         lookup_ghobject lookup(object, nspace, head);
2949         if (action_on_all_objects(fs, lookup, debug)) {
2950           throw std::runtime_error("Internal error");
2951         } else {
2952           if (lookup.size() != 1) {
2953             stringstream ss;
2954             if (lookup.size() == 0)
2955               ss << "No object id '" << object << "' found or invalid JSON specified";
2956             else
2957               ss << "Found " << lookup.size() << " objects with id '" << object
2958                  << "', please use a JSON spec from --op list instead";
2959             throw std::runtime_error(ss.str());
2960           }
2961           pair<coll_t, ghobject_t> found = lookup.pop();
2962           pgidstr = found.first.to_str();
2963           pgid.parse(pgidstr.c_str());
2964           ghobj = found.second;
2965         }
2966       } else {
2967         stringstream ss;
2968         if (pgidstr.length() == 0 && v.type() != json_spirit::array_type) {
2969           ss << "Without --pgid the object '" << object
2970              << "' must be a JSON array";
2971           throw std::runtime_error(ss.str());
2972         }
2973         if (v.type() == json_spirit::array_type) {
2974           json_spirit::Array array = v.get_array();
2975           if (array.size() != 2) {
2976             ss << "Object '" << object
2977                << "' must be a JSON array with 2 elements";
2978             throw std::runtime_error(ss.str());
2979           }
2980           vector<json_spirit::Value>::iterator i = array.begin();
2981           assert(i != array.end());
2982           if (i->type() != json_spirit::str_type) {
2983             ss << "Object '" << object
2984                << "' must be a JSON array with the first element a string";
2985             throw std::runtime_error(ss.str());
2986           }
2987           string object_pgidstr = i->get_str();
2988           if (object_pgidstr != "meta") {
2989             spg_t object_pgid;
2990             object_pgid.parse(object_pgidstr.c_str());
2991             if (pgidstr.length() > 0) {
2992               if (object_pgid != pgid) {
2993                 ss << "object '" << object
2994                    << "' has a pgid different from the --pgid="
2995                    << pgidstr << " option";
2996                 throw std::runtime_error(ss.str());
2997               }
2998             } else {
2999               pgidstr = object_pgidstr;
3000               pgid = object_pgid;
3001             }
3002           } else {
3003             pgidstr = object_pgidstr;
3004           }
3005           ++i;
3006           v = *i;
3007         }
3008         try {
3009           ghobj.decode(v);
3010         } catch (std::runtime_error& e) {
3011           ss << "Decode object JSON error: " << e.what();
3012           throw std::runtime_error(ss.str());
3013         }
3014         if (pgidstr != "meta" && (uint64_t)pgid.pgid.m_pool != (uint64_t)ghobj.hobj.pool) {
3015           cerr << "Object pool and pgid pool don't match" << std::endl;
3016           ret = 1;
3017           goto out;
3018         }
3019       }
3020     } catch (std::runtime_error& e) {
3021       cerr << e.what() << std::endl;
3022       ret = 1;
3023       goto out;
3024     }
3025     }
3026   }
3027
3028   // The ops which require --pgid option are checked here and
3029   // mentioned in the usage for --pgid.
3030   if ((op == "info" || op == "log" || op == "remove" || op == "export"
3031       || op == "export-remove" || op == "rm-past-intervals" || op == "mark-complete") &&
3032       pgidstr.length() == 0) {
3033     cerr << "Must provide pgid" << std::endl;
3034     usage(desc);
3035     ret = 1;
3036     goto out;
3037   }
3038
3039   if (op == "import") {
3040
3041     try {
3042       ret = tool.do_import(fs, superblock, force, pgidstr, *osr);
3043     }
3044     catch (const buffer::error &e) {
3045       cerr << "do_import threw exception error " << e.what() << std::endl;
3046       ret = -EFAULT;
3047     }
3048     if (ret == -EFAULT) {
3049       cerr << "Corrupt input for import" << std::endl;
3050     }
3051     if (ret == 0)
3052       cout << "Import successful" << std::endl;
3053     goto out;
3054   } else if (op == "dump-journal-mount") {
3055     // Undocumented feature to dump journal with mounted fs
3056     // This doesn't support the format option, but it uses the
3057     // ObjectStore::dump_journal() and mounts to get replay to run.
3058     ret = fs->dump_journal(cout);
3059     if (ret) {
3060       if (ret == -EOPNOTSUPP) {
3061         cerr << "Object store type \"" << type << "\" doesn't support journal dump" << std::endl;
3062       } else {
3063         cerr << "Journal dump failed with error " << cpp_strerror(ret) << std::endl;
3064       }
3065     }
3066     goto out;
3067   } else if (op == "get-osdmap") {
3068     bufferlist bl;
3069     OSDMap osdmap;
3070     if (epoch == 0) {
3071       epoch = superblock.current_epoch;
3072     }
3073     ret = get_osdmap(fs, epoch, osdmap, bl);
3074     if (ret) {
3075       cerr << "Failed to get osdmap#" << epoch << ": "
3076            << cpp_strerror(ret) << std::endl;
3077       goto out;
3078     }
3079     ret = bl.write_fd(file_fd);
3080     if (ret) {
3081       cerr << "Failed to write to " << file << ": " << cpp_strerror(ret) << std::endl;
3082     } else {
3083       cout << "osdmap#" << epoch << " exported." << std::endl;
3084     }
3085     goto out;
3086   } else if (op == "set-osdmap") {
3087     bufferlist bl;
3088     ret = get_fd_data(file_fd, bl);
3089     if (ret < 0) {
3090       cerr << "Failed to read osdmap " << cpp_strerror(ret) << std::endl;
3091     } else {
3092       ret = set_osdmap(fs, epoch, bl, force, *osr);
3093     }
3094     goto out;
3095   } else if (op == "get-inc-osdmap") {
3096     bufferlist bl;
3097     if (epoch == 0) {
3098       epoch = superblock.current_epoch;
3099     }
3100     ret = get_inc_osdmap(fs, epoch, bl);
3101     if (ret < 0) {
3102       cerr << "Failed to get incremental osdmap# " << epoch << ": "
3103            << cpp_strerror(ret) << std::endl;
3104       goto out;
3105     }
3106     ret = bl.write_fd(file_fd);
3107     if (ret) {
3108       cerr << "Failed to write to " << file << ": " << cpp_strerror(ret) << std::endl;
3109     } else {
3110       cout << "inc-osdmap#" << epoch << " exported." << std::endl;
3111     }
3112     goto out;
3113   } else if (op == "set-inc-osdmap") {
3114     bufferlist bl;
3115     ret = get_fd_data(file_fd, bl);
3116     if (ret < 0) {
3117       cerr << "Failed to read incremental osdmap  " << cpp_strerror(ret) << std::endl;
3118       goto out;
3119     } else {
3120       ret = set_inc_osdmap(fs, epoch, bl, force, *osr);
3121     }
3122     goto out;
3123   } else if (op == "update-mon-db") {
3124     if (!vm.count("mon-store-path")) {
3125       cerr << "Please specify the path to monitor db to update" << std::endl;
3126       ret = -EINVAL;
3127     } else {
3128       ret = update_mon_db(*fs, superblock, dpath + "/keyring", mon_store_path);
3129     }
3130     goto out;
3131   }
3132
3133   log_oid = OSD::make_pg_log_oid(pgid);
3134   biginfo_oid = OSD::make_pg_biginfo_oid(pgid);
3135
3136   if (op == "remove") {
3137     if (!force && !dry_run) {
3138       cerr << "Please use export-remove or you must use --force option" << std::endl;
3139       ret = -EINVAL;
3140       goto out;
3141     }
3142     ret = initiate_new_remove_pg(fs, pgid, *osr);
3143     if (ret < 0) {
3144       cerr << "PG '" << pgid << "' not found" << std::endl;
3145       goto out;
3146     }
3147     cout << "Remove successful" << std::endl;
3148     goto out;
3149   }
3150
3151   if (op == "fix-lost") {
3152     boost::scoped_ptr<action_on_object_t> action;
3153     action.reset(new do_fix_lost(osr));
3154     if (pgidstr.length())
3155       ret = action_on_all_objects_in_exact_pg(fs, coll_t(pgid), *action, debug);
3156     else
3157       ret = action_on_all_objects(fs, *action, debug);
3158     goto out;
3159   }
3160
3161   if (op == "list") {
3162     ret = do_list(fs, pgidstr, object, nspace, formatter, debug,
3163                   human_readable, head);
3164     if (ret < 0) {
3165       cerr << "do_list failed: " << cpp_strerror(ret) << std::endl;
3166     }
3167     goto out;
3168   }
3169
3170   if (op == "dump-super") {
3171     formatter->open_object_section("superblock");
3172     superblock.dump(formatter);
3173     formatter->close_section();
3174     formatter->flush(cout);
3175     cout << std::endl;
3176     goto out;
3177   }
3178
3179   if (op == "meta-list") {
3180     ret = do_meta(fs, object, formatter, debug, human_readable);
3181     if (ret < 0) {
3182       cerr << "do_meta failed: " << cpp_strerror(ret) << std::endl;
3183     }
3184     goto out;
3185   }
3186
3187   ret = fs->list_collections(ls);
3188   if (ret < 0) {
3189     cerr << "failed to list pgs: " << cpp_strerror(ret) << std::endl;
3190     goto out;
3191   }
3192
3193   if (debug && op == "list-pgs")
3194     cout << "Performing list-pgs operation" << std::endl;
3195
3196   // Find pg
3197   for (it = ls.begin(); it != ls.end(); ++it) {
3198     spg_t tmppgid;
3199
3200     if (pgidstr == "meta") {
3201       if (it->to_str() == "meta")
3202         break;
3203       else
3204         continue;
3205     }
3206
3207     if (!it->is_pg(&tmppgid)) {
3208       continue;
3209     }
3210
3211     if (it->is_temp(&tmppgid)) {
3212       continue;
3213     }
3214
3215     if (op != "list-pgs" && tmppgid != pgid) {
3216       continue;
3217     }
3218
3219     if (op != "list-pgs") {
3220       //Found!
3221       break;
3222     }
3223
3224     cout << tmppgid << std::endl;
3225   }
3226
3227   if (op == "list-pgs") {
3228     ret = 0;
3229     goto out;
3230   }
3231
3232   // If not an object command nor any of the ops handled below, then output this usage
3233   // before complaining about a bad pgid
3234   if (!vm.count("objcmd") && op != "export" && op != "export-remove" && op != "info" && op != "log" && op != "rm-past-intervals" && op != "mark-complete") {
3235     cerr << "Must provide --op (info, log, remove, mkfs, fsck, repair, export, export-remove, import, list, fix-lost, list-pgs, rm-past-intervals, dump-journal, dump-super, meta-list, "
3236       "get-osdmap, set-osdmap, get-inc-osdmap, set-inc-osdmap, mark-complete)"
3237          << std::endl;
3238     usage(desc);
3239     ret = 1;
3240     goto out;
3241   }
3242   epoch_t map_epoch;
3243 // The following code for export, info, log require omap or !skip-mount-omap
3244   if (it != ls.end()) {
3245
3246     coll_t coll = *it;
3247
3248     if (vm.count("objcmd")) {
3249       ret = 0;
3250       if (objcmd == "remove" || objcmd == "removeall") {
3251         bool all = (objcmd == "removeall");
3252         ret = do_remove_object(fs, coll, ghobj, all, force, *osr);
3253         goto out;
3254       } else if (objcmd == "list-attrs") {
3255         ret = do_list_attrs(fs, coll, ghobj);
3256         goto out;
3257       } else if (objcmd == "list-omap") {
3258         ret = do_list_omap(fs, coll, ghobj);
3259         goto out;
3260       } else if (objcmd == "get-bytes" || objcmd == "set-bytes") {
3261         if (objcmd == "get-bytes") {
3262           int fd;
3263           if (vm.count("arg1") == 0 || arg1 == "-") {
3264             fd = STDOUT_FILENO;
3265           } else {
3266             fd = open(arg1.c_str(), O_WRONLY|O_TRUNC|O_CREAT|O_EXCL|O_LARGEFILE, 0666);
3267             if (fd == -1) {
3268               cerr << "open " << arg1 << " " << cpp_strerror(errno) << std::endl;
3269               ret = 1;
3270               goto out;
3271             }
3272           }
3273           ret = do_get_bytes(fs, coll, ghobj, fd);
3274           if (fd != STDOUT_FILENO)
3275             close(fd);
3276         } else {
3277           int fd;
3278           if (vm.count("arg1") == 0 || arg1 == "-") {
3279             // Since read_fd() doesn't handle ^D from a tty stdin, don't allow it.
3280             if (isatty(STDIN_FILENO)) {
3281                 cerr << "stdin is a tty and no file specified" << std::endl;
3282                 ret = 1;
3283                 goto out;
3284             }
3285             fd = STDIN_FILENO;
3286           } else {
3287             fd = open(arg1.c_str(), O_RDONLY|O_LARGEFILE, 0666);
3288             if (fd == -1) {
3289               cerr << "open " << arg1 << " " << cpp_strerror(errno) << std::endl;
3290               ret = 1;
3291               goto out;
3292             }
3293           }
3294           ret = do_set_bytes(fs, coll, ghobj, fd, *osr);
3295           if (fd != STDIN_FILENO)
3296             close(fd);
3297         }
3298         goto out;
3299       } else if (objcmd == "get-attr") {
3300         if (vm.count("arg1") == 0) {
3301           usage(desc);
3302           ret = 1;
3303           goto out;
3304         }
3305         ret = do_get_attr(fs, coll, ghobj, arg1);
3306         goto out;
3307       } else if (objcmd == "set-attr") {
3308         if (vm.count("arg1") == 0) {
3309           usage(desc);
3310           ret = 1;
3311         }
3312
3313         int fd;
3314         if (vm.count("arg2") == 0 || arg2 == "-") {
3315           // Since read_fd() doesn't handle ^D from a tty stdin, don't allow it.
3316           if (isatty(STDIN_FILENO)) {
3317             cerr << "stdin is a tty and no file specified" << std::endl;
3318             ret = 1;
3319             goto out;
3320           }
3321           fd = STDIN_FILENO;
3322         } else {
3323           fd = open(arg2.c_str(), O_RDONLY|O_LARGEFILE, 0666);
3324           if (fd == -1) {
3325             cerr << "open " << arg2 << " " << cpp_strerror(errno) << std::endl;
3326             ret = 1;
3327             goto out;
3328           }
3329         }
3330         ret = do_set_attr(fs, coll, ghobj, arg1, fd, *osr);
3331         if (fd != STDIN_FILENO)
3332           close(fd);
3333         goto out;
3334       } else if (objcmd == "rm-attr") {
3335         if (vm.count("arg1") == 0) {
3336           usage(desc);
3337           ret = 1;
3338           goto out;
3339         }
3340         ret = do_rm_attr(fs, coll, ghobj, arg1, *osr);
3341         goto out;
3342       } else if (objcmd == "get-omap") {
3343         if (vm.count("arg1") == 0) {
3344           usage(desc);
3345           ret = 1;
3346           goto out;
3347         }
3348         ret = do_get_omap(fs, coll, ghobj, arg1);
3349         goto out;
3350       } else if (objcmd == "set-omap") {
3351         if (vm.count("arg1") == 0) {
3352           usage(desc);
3353           ret = 1;
3354           goto out;
3355         }
3356         int fd;
3357         if (vm.count("arg2") == 0 || arg2 == "-") {
3358           // Since read_fd() doesn't handle ^D from a tty stdin, don't allow it.
3359           if (isatty(STDIN_FILENO)) {
3360             cerr << "stdin is a tty and no file specified" << std::endl;
3361             ret = 1;
3362             goto out;
3363           }
3364           fd = STDIN_FILENO;
3365         } else {
3366           fd = open(arg2.c_str(), O_RDONLY|O_LARGEFILE, 0666);
3367           if (fd == -1) {
3368             cerr << "open " << arg2 << " " << cpp_strerror(errno) << std::endl;
3369             ret = 1;
3370             goto out;
3371           }
3372         }
3373         ret = do_set_omap(fs, coll, ghobj, arg1, fd, *osr);
3374         if (fd != STDIN_FILENO)
3375           close(fd);
3376         goto out;
3377       } else if (objcmd == "rm-omap") {
3378         if (vm.count("arg1") == 0) {
3379           usage(desc);
3380           ret = 1;
3381           goto out;
3382         }
3383         ret = do_rm_omap(fs, coll, ghobj, arg1, *osr);
3384         goto out;
3385       } else if (objcmd == "get-omaphdr") {
3386         if (vm.count("arg1")) {
3387           usage(desc);
3388           ret = 1;
3389           goto out;
3390         }
3391         ret = do_get_omaphdr(fs, coll, ghobj);
3392         goto out;
3393       } else if (objcmd == "set-omaphdr") {
3394         // Extra arg
3395         if (vm.count("arg2")) {
3396           usage(desc);
3397           ret = 1;
3398           goto out;
3399         }
3400         int fd;
3401         if (vm.count("arg1") == 0 || arg1 == "-") {
3402           // Since read_fd() doesn't handle ^D from a tty stdin, don't allow it.
3403           if (isatty(STDIN_FILENO)) {
3404             cerr << "stdin is a tty and no file specified" << std::endl;
3405             ret = 1;
3406             goto out;
3407           }
3408           fd = STDIN_FILENO;
3409         } else {
3410           fd = open(arg1.c_str(), O_RDONLY|O_LARGEFILE, 0666);
3411           if (fd == -1) {
3412             cerr << "open " << arg1 << " " << cpp_strerror(errno) << std::endl;
3413             ret = 1;
3414             goto out;
3415           }
3416         }
3417         ret = do_set_omaphdr(fs, coll, ghobj, fd, *osr);
3418         if (fd != STDIN_FILENO)
3419           close(fd);
3420         goto out;
3421       } else if (objcmd == "dump") {
3422         // There should not be any other arguments
3423         if (vm.count("arg1") || vm.count("arg2")) {
3424           usage(desc);
3425           ret = 1;
3426           goto out;
3427         }
3428         ret = print_obj_info(fs, coll, ghobj, formatter);
3429         goto out;
3430       } else if (objcmd == "set-size" || objcmd == "corrupt-size") {
3431         // Undocumented testing feature
3432         bool corrupt = (objcmd == "corrupt-size");
3433         // Extra arg
3434         if (vm.count("arg1") == 0 || vm.count("arg2")) {
3435           usage(desc);
3436           ret = 1;
3437           goto out;
3438         }
3439         if (arg1.length() == 0 || !isdigit(arg1.c_str()[0])) {
3440           cerr << "Invalid size '" << arg1 << "' specified" << std::endl;
3441           ret = 1;
3442           goto out;
3443         }
3444         uint64_t size = atoll(arg1.c_str());
3445         ret = set_size(fs, coll, ghobj, size, formatter, *osr, corrupt);
3446         goto out;
3447       } else if (objcmd == "clear-snapset") {
3448         // UNDOCUMENTED: For testing zap SnapSet
3449         // IGNORE extra args since not in usage anyway
3450         if (!ghobj.hobj.has_snapset()) {
3451           cerr << "'" << objcmd << "' requires a head or snapdir object" << std::endl;
3452           ret = 1;
3453           goto out;
3454         }
3455         ret = clear_snapset(fs, coll, ghobj, arg1, *osr);
3456         goto out;
3457       } else if (objcmd == "remove-clone-metadata") {
3458         // Extra arg
3459         if (vm.count("arg1") == 0 || vm.count("arg2")) {
3460           usage(desc);
3461           ret = 1;
3462           goto out;
3463         }
3464         if (!ghobj.hobj.has_snapset()) {
3465           cerr << "'" << objcmd << "' requires a head or snapdir object" << std::endl;
3466           ret = 1;
3467           goto out;
3468         }
3469         if (arg1.length() == 0 || !isdigit(arg1.c_str()[0])) {
3470           cerr << "Invalid cloneid '" << arg1 << "' specified" << std::endl;
3471           ret = 1;
3472           goto out;
3473         }
3474         snapid_t cloneid = atoi(arg1.c_str());
3475         ret = remove_clone(fs, coll, ghobj, cloneid, force, *osr);
3476         goto out;
3477       }
3478       cerr << "Unknown object command '" << objcmd << "'" << std::endl;
3479       usage(desc);
3480       ret = 1;
3481       goto out;
3482     }
3483
3484     bufferlist bl;
3485     map_epoch = 0;
3486     ret = PG::peek_map_epoch(fs, pgid, &map_epoch, &bl);
3487     if (ret < 0)
3488       cerr << "peek_map_epoch reports error" << std::endl;
3489     if (debug)
3490       cerr << "map_epoch " << map_epoch << std::endl;
3491
3492     pg_info_t info(pgid);
3493     PastIntervals past_intervals;
3494     __u8 struct_ver;
3495     ret = PG::read_info(fs, pgid, coll, bl, info, past_intervals,
3496                       struct_ver);
3497     if (ret < 0) {
3498       cerr << "read_info error " << cpp_strerror(ret) << std::endl;
3499       goto out;
3500     }
3501     if (struct_ver < PG::compat_struct_v) {
3502       cerr << "PG is too old to upgrade, use older Ceph version" << std::endl;
3503       ret = -EFAULT;
3504       goto out;
3505     }
3506     if (debug)
3507       cerr << "struct_v " << (int)struct_ver << std::endl;
3508
3509     if (op == "export" || op == "export-remove") {
3510       ret = tool.do_export(fs, coll, pgid, info, map_epoch, struct_ver, superblock, past_intervals);
3511       if (ret == 0) {
3512         cerr << "Export successful" << std::endl;
3513         if (op == "export-remove") {
3514           ret = initiate_new_remove_pg(fs, pgid, *osr);
3515           // Export succeeded, so pgid is there
3516           assert(ret == 0);
3517           cerr << "Remove successful" << std::endl;
3518         }
3519       }
3520     } else if (op == "info") {
3521       formatter->open_object_section("info");
3522       info.dump(formatter);
3523       formatter->close_section();
3524       formatter->flush(cout);
3525       cout << std::endl;
3526     } else if (op == "log") {
3527       PGLog::IndexedLog log;
3528       pg_missing_t missing;
3529       ret = get_log(fs, struct_ver, coll, pgid, info, log, missing);
3530       if (ret < 0)
3531           goto out;
3532
3533       dump_log(formatter, cout, log, missing);
3534     } else if (op == "rm-past-intervals") {
3535       ObjectStore::Transaction tran;
3536       ObjectStore::Transaction *t = &tran;
3537
3538       if (struct_ver < PG::compat_struct_v) {
3539         cerr << "Can't remove past-intervals, version mismatch " << (int)struct_ver
3540           << " (pg)  < compat " << (int)PG::compat_struct_v << " (tool)"
3541           << std::endl;
3542         ret = -EFAULT;
3543         goto out;
3544       }
3545
3546       cout << "Remove past-intervals " << past_intervals << std::endl;
3547
3548       past_intervals.clear();
3549       if (dry_run) {
3550         ret = 0;
3551         goto out;
3552       }
3553       ret = write_info(*t, map_epoch, info, past_intervals);
3554
3555       if (ret == 0) {
3556         fs->apply_transaction(osr, std::move(*t));
3557         cout << "Removal succeeded" << std::endl;
3558       }
3559     } else if (op == "mark-complete") {
3560       ObjectStore::Transaction tran;
3561       ObjectStore::Transaction *t = &tran;
3562
3563       if (struct_ver < PG::compat_struct_v) {
3564         cerr << "Can't mark-complete, version mismatch " << (int)struct_ver
3565              << " (pg)  < compat " << (int)PG::compat_struct_v << " (tool)"
3566              << std::endl;
3567         ret = 1;
3568         goto out;
3569       }
3570
3571       cout << "Marking complete " << std::endl;
3572
3573       info.last_update = eversion_t(superblock.current_epoch, info.last_update.version + 1);
3574       info.last_backfill = hobject_t::get_max();
3575       info.last_epoch_started = superblock.current_epoch;
3576       info.history.last_epoch_started = superblock.current_epoch;
3577       info.history.last_epoch_clean = superblock.current_epoch;
3578       past_intervals.clear();
3579
3580       if (!dry_run) {
3581         ret = write_info(*t, map_epoch, info, past_intervals);
3582         if (ret != 0)
3583           goto out;
3584         fs->apply_transaction(osr, std::move(*t));
3585       }
3586       cout << "Marking complete succeeded" << std::endl;
3587     } else {
3588       assert(!"Should have already checked for valid --op");
3589     }
3590   } else {
3591     cerr << "PG '" << pgid << "' not found" << std::endl;
3592     ret = -ENOENT;
3593   }
3594
3595 out:
3596   int r = fs->umount();
3597   delete osr;
3598   if (r < 0) {
3599     cerr << "umount failed: " << cpp_strerror(r) << std::endl;
3600     // If no previous error, then use umount() error
3601     if (ret == 0)
3602       ret = r;
3603   }
3604
3605   if (dry_run) {
3606     // Export output can go to stdout, so put this message on stderr
3607     if (op == "export")
3608       cerr << "dry-run: Nothing changed" << std::endl;
3609     else
3610       cout << "dry-run: Nothing changed" << std::endl;
3611   }
3612
3613   if (ret < 0)
3614     ret = 1;
3615   return ret;
3616 }