Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / PGBackend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013,2014 Inktank Storage, Inc.
7  * Copyright (C) 2013,2014 Cloudwatt <libre.licensing@cloudwatt.com>
8  *
9  * Author: Loic Dachary <loic@dachary.org>
10  *
11  * This is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Lesser General Public
13  * License version 2.1, as published by the Free Software
14  * Foundation.  See file COPYING.
15  *
16  */
17
18
19 #include "common/errno.h"
20 #include "common/scrub_types.h"
21 #include "ReplicatedBackend.h"
22 #include "ScrubStore.h"
23 #include "ECBackend.h"
24 #include "PGBackend.h"
25 #include "OSD.h"
26 #include "erasure-code/ErasureCodePlugin.h"
27 #include "OSDMap.h"
28 #include "PGLog.h"
29 #include "common/LogClient.h"
30 #include "messages/MOSDPGRecoveryDelete.h"
31 #include "messages/MOSDPGRecoveryDeleteReply.h"
32
33 #define dout_context cct
34 #define dout_subsys ceph_subsys_osd
35 #define DOUT_PREFIX_ARGS this
36 #undef dout_prefix
37 #define dout_prefix _prefix(_dout, this)
38 static ostream& _prefix(std::ostream *_dout, PGBackend *pgb) {
39   return *_dout << pgb->get_parent()->gen_dbg_prefix();
40 }
41
42 void PGBackend::recover_delete_object(const hobject_t &oid, eversion_t v,
43                                       RecoveryHandle *h)
44 {
45   assert(get_parent()->get_actingbackfill_shards().size() > 0);
46   for (const auto& shard : get_parent()->get_actingbackfill_shards()) {
47     if (shard == get_parent()->whoami_shard())
48       continue;
49     if (get_parent()->get_shard_missing(shard).is_missing(oid)) {
50       dout(20) << __func__ << " will remove " << oid << " " << v << " from "
51                << shard << dendl;
52       h->deletes[shard].push_back(make_pair(oid, v));
53       get_parent()->begin_peer_recover(shard, oid);
54     }
55   }
56 }
57
58 void PGBackend::send_recovery_deletes(int prio,
59                                       const map<pg_shard_t, vector<pair<hobject_t, eversion_t> > > &deletes)
60 {
61   epoch_t min_epoch = get_parent()->get_last_peering_reset_epoch();
62   for (const auto& p : deletes) {
63     const auto& shard = p.first;
64     const auto& objects = p.second;
65     ConnectionRef con = get_parent()->get_con_osd_cluster(
66       shard.osd,
67       get_osdmap()->get_epoch());
68     if (!con)
69       continue;
70     auto it = objects.begin();
71     while (it != objects.end()) {
72       uint64_t cost = 0;
73       uint64_t deletes = 0;
74       spg_t target_pg = spg_t(get_parent()->get_info().pgid.pgid, shard.shard);
75       MOSDPGRecoveryDelete *msg =
76         new MOSDPGRecoveryDelete(get_parent()->whoami_shard(),
77                                  target_pg,
78                                  get_osdmap()->get_epoch(),
79                                  min_epoch);
80       msg->set_priority(prio);
81
82       while (it != objects.end() &&
83              cost < cct->_conf->osd_max_push_cost &&
84              deletes < cct->_conf->osd_max_push_objects) {
85         dout(20) << __func__ << ": sending recovery delete << " << it->first
86                  << " " << it->second << " to osd." << shard << dendl;
87         msg->objects.push_back(*it);
88         cost += cct->_conf->osd_push_per_object_cost;
89         ++deletes;
90         ++it;
91       }
92
93       msg->set_cost(cost);
94       get_parent()->send_message_osd_cluster(msg, con);
95     }
96   }
97 }
98
99 bool PGBackend::handle_message(OpRequestRef op)
100 {
101   switch (op->get_req()->get_type()) {
102   case MSG_OSD_PG_RECOVERY_DELETE:
103     handle_recovery_delete(op);
104     return true;
105
106   case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
107     handle_recovery_delete_reply(op);
108     return true;
109
110   default:
111     break;
112   }
113
114   return _handle_message(op);
115 }
116
117 void PGBackend::handle_recovery_delete(OpRequestRef op)
118 {
119   const MOSDPGRecoveryDelete *m = static_cast<const MOSDPGRecoveryDelete *>(op->get_req());
120   assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE);
121   dout(20) << __func__ << " " << op << dendl;
122
123   op->mark_started();
124
125   C_GatherBuilder gather(cct);
126   for (const auto &p : m->objects) {
127     get_parent()->remove_missing_object(p.first, p.second, gather.new_sub());
128   }
129
130   MOSDPGRecoveryDeleteReply *reply = new MOSDPGRecoveryDeleteReply;
131   reply->from = get_parent()->whoami_shard();
132   reply->set_priority(m->get_priority());
133   reply->pgid = spg_t(get_parent()->get_info().pgid.pgid, m->from.shard);
134   reply->map_epoch = m->map_epoch;
135   reply->min_epoch = m->min_epoch;
136   reply->objects = m->objects;
137   ConnectionRef conn = m->get_connection();
138
139   gather.set_finisher(new FunctionContext(
140     [=](int r) {
141       if (r != -EAGAIN) {
142         get_parent()->send_message_osd_cluster(reply, conn.get());
143       } else {
144         reply->put();
145       }
146     }));
147   gather.activate();
148 }
149
150 void PGBackend::handle_recovery_delete_reply(OpRequestRef op)
151 {
152   const MOSDPGRecoveryDeleteReply *m = static_cast<const MOSDPGRecoveryDeleteReply *>(op->get_req());
153   assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE_REPLY);
154   dout(20) << __func__ << " " << op << dendl;
155
156   for (const auto &p : m->objects) {
157     ObjectRecoveryInfo recovery_info;
158     hobject_t oid = p.first;
159     recovery_info.version = p.second;
160     get_parent()->on_peer_recover(m->from, oid, recovery_info);
161     bool peers_recovered = true;
162     for (const auto& shard : get_parent()->get_actingbackfill_shards()) {
163       if (shard == get_parent()->whoami_shard())
164         continue;
165       if (get_parent()->get_shard_missing(shard).is_missing(oid)) {
166         dout(20) << __func__ << " " << oid << " still missing on at least "
167                  << shard << dendl;
168         peers_recovered = false;
169         break;
170       }
171     }
172     if (peers_recovered && !get_parent()->get_local_missing().is_missing(oid)) {
173       dout(20) << __func__ << " completed recovery, local_missing = "
174                << get_parent()->get_local_missing() << dendl;
175       object_stat_sum_t stat_diff;
176       stat_diff.num_objects_recovered = 1;
177       get_parent()->on_global_recover(p.first, stat_diff, true);
178     }
179   }
180 }
181
182 void PGBackend::rollback(
183   const pg_log_entry_t &entry,
184   ObjectStore::Transaction *t)
185 {
186
187   struct RollbackVisitor : public ObjectModDesc::Visitor {
188     const hobject_t &hoid;
189     PGBackend *pg;
190     ObjectStore::Transaction t;
191     RollbackVisitor(
192       const hobject_t &hoid,
193       PGBackend *pg) : hoid(hoid), pg(pg) {}
194     void append(uint64_t old_size) override {
195       ObjectStore::Transaction temp;
196       pg->rollback_append(hoid, old_size, &temp);
197       temp.append(t);
198       temp.swap(t);
199     }
200     void setattrs(map<string, boost::optional<bufferlist> > &attrs) override {
201       ObjectStore::Transaction temp;
202       pg->rollback_setattrs(hoid, attrs, &temp);
203       temp.append(t);
204       temp.swap(t);
205     }
206     void rmobject(version_t old_version) override {
207       ObjectStore::Transaction temp;
208       pg->rollback_stash(hoid, old_version, &temp);
209       temp.append(t);
210       temp.swap(t);
211     }
212     void try_rmobject(version_t old_version) override {
213       ObjectStore::Transaction temp;
214       pg->rollback_try_stash(hoid, old_version, &temp);
215       temp.append(t);
216       temp.swap(t);
217     }
218     void create() override {
219       ObjectStore::Transaction temp;
220       pg->rollback_create(hoid, &temp);
221       temp.append(t);
222       temp.swap(t);
223     }
224     void update_snaps(const set<snapid_t> &snaps) override {
225       ObjectStore::Transaction temp;
226       pg->get_parent()->pgb_set_object_snap_mapping(hoid, snaps, &temp);
227       temp.append(t);
228       temp.swap(t);
229     }
230     void rollback_extents(
231       version_t gen,
232       const vector<pair<uint64_t, uint64_t> > &extents) override {
233       ObjectStore::Transaction temp;
234       pg->rollback_extents(gen, extents, hoid, &temp);
235       temp.append(t);
236       temp.swap(t);
237     }
238   };
239
240   assert(entry.mod_desc.can_rollback());
241   RollbackVisitor vis(entry.soid, this);
242   entry.mod_desc.visit(&vis);
243   t->append(vis.t);
244 }
245
246 struct Trimmer : public ObjectModDesc::Visitor {
247   const hobject_t &soid;
248   PGBackend *pg;
249   ObjectStore::Transaction *t;
250   Trimmer(
251     const hobject_t &soid,
252     PGBackend *pg,
253     ObjectStore::Transaction *t)
254     : soid(soid), pg(pg), t(t) {}
255   void rmobject(version_t old_version) override {
256     pg->trim_rollback_object(
257       soid,
258       old_version,
259       t);
260   }
261   // try_rmobject defaults to rmobject
262   void rollback_extents(
263     version_t gen,
264     const vector<pair<uint64_t, uint64_t> > &extents) override {
265     pg->trim_rollback_object(
266       soid,
267       gen,
268       t);
269   }
270 };
271
272 void PGBackend::rollforward(
273   const pg_log_entry_t &entry,
274   ObjectStore::Transaction *t)
275 {
276   auto dpp = get_parent()->get_dpp();
277   ldpp_dout(dpp, 20) << __func__ << ": entry=" << entry << dendl;
278   if (!entry.can_rollback())
279     return;
280   Trimmer trimmer(entry.soid, this, t);
281   entry.mod_desc.visit(&trimmer);
282 }
283
284 void PGBackend::trim(
285   const pg_log_entry_t &entry,
286   ObjectStore::Transaction *t)
287 {
288   if (!entry.can_rollback())
289     return;
290   Trimmer trimmer(entry.soid, this, t);
291   entry.mod_desc.visit(&trimmer);
292 }
293
294 void PGBackend::try_stash(
295   const hobject_t &hoid,
296   version_t v,
297   ObjectStore::Transaction *t)
298 {
299   t->try_rename(
300     coll,
301     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
302     ghobject_t(hoid, v, get_parent()->whoami_shard().shard));
303 }
304
305 void PGBackend::remove(
306   const hobject_t &hoid,
307   ObjectStore::Transaction *t) {
308   assert(!hoid.is_temp());
309   t->remove(
310     coll,
311     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
312   get_parent()->pgb_clear_object_snap_mapping(hoid, t);
313 }
314
315 void PGBackend::on_change_cleanup(ObjectStore::Transaction *t)
316 {
317   dout(10) << __func__ << dendl;
318   // clear temp
319   for (set<hobject_t>::iterator i = temp_contents.begin();
320        i != temp_contents.end();
321        ++i) {
322     dout(10) << __func__ << ": Removing oid "
323              << *i << " from the temp collection" << dendl;
324     t->remove(
325       coll,
326       ghobject_t(*i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
327   }
328   temp_contents.clear();
329 }
330
331 int PGBackend::objects_list_partial(
332   const hobject_t &begin,
333   int min,
334   int max,
335   vector<hobject_t> *ls,
336   hobject_t *next)
337 {
338   assert(ls);
339   // Starts with the smallest generation to make sure the result list
340   // has the marker object (it might have multiple generations
341   // though, which would be filtered).
342   ghobject_t _next;
343   if (!begin.is_min())
344     _next = ghobject_t(begin, 0, get_parent()->whoami_shard().shard);
345   ls->reserve(max);
346   int r = 0;
347
348   if (min > max)
349     min = max;
350
351   while (!_next.is_max() && ls->size() < (unsigned)min) {
352     vector<ghobject_t> objects;
353     r = store->collection_list(
354       ch,
355       _next,
356       ghobject_t::get_max(),
357       max - ls->size(),
358       &objects,
359       &_next);
360     if (r != 0) {
361       derr << __func__ << " list collection " << ch << " got: " << cpp_strerror(r) << dendl;
362       break;
363     }
364     for (vector<ghobject_t>::iterator i = objects.begin();
365          i != objects.end();
366          ++i) {
367       if (i->is_pgmeta() || i->hobj.is_temp()) {
368         continue;
369       }
370       if (i->is_no_gen()) {
371         ls->push_back(i->hobj);
372       }
373     }
374   }
375   if (r == 0)
376     *next = _next.hobj;
377   return r;
378 }
379
380 int PGBackend::objects_list_range(
381   const hobject_t &start,
382   const hobject_t &end,
383   snapid_t seq,
384   vector<hobject_t> *ls,
385   vector<ghobject_t> *gen_obs)
386 {
387   assert(ls);
388   vector<ghobject_t> objects;
389   int r = store->collection_list(
390     ch,
391     ghobject_t(start, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
392     ghobject_t(end, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
393     INT_MAX,
394     &objects,
395     NULL);
396   ls->reserve(objects.size());
397   for (vector<ghobject_t>::iterator i = objects.begin();
398        i != objects.end();
399        ++i) {
400     if (i->is_pgmeta() || i->hobj.is_temp()) {
401       continue;
402     }
403     if (i->is_no_gen()) {
404       ls->push_back(i->hobj);
405     } else if (gen_obs) {
406       gen_obs->push_back(*i);
407     }
408   }
409   return r;
410 }
411
412 int PGBackend::objects_get_attr(
413   const hobject_t &hoid,
414   const string &attr,
415   bufferlist *out)
416 {
417   bufferptr bp;
418   int r = store->getattr(
419     ch,
420     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
421     attr.c_str(),
422     bp);
423   if (r >= 0 && out) {
424     out->clear();
425     out->push_back(std::move(bp));
426   }
427   return r;
428 }
429
430 int PGBackend::objects_get_attrs(
431   const hobject_t &hoid,
432   map<string, bufferlist> *out)
433 {
434   return store->getattrs(
435     ch,
436     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
437     *out);
438 }
439
440 void PGBackend::rollback_setattrs(
441   const hobject_t &hoid,
442   map<string, boost::optional<bufferlist> > &old_attrs,
443   ObjectStore::Transaction *t) {
444   map<string, bufferlist> to_set;
445   assert(!hoid.is_temp());
446   for (map<string, boost::optional<bufferlist> >::iterator i = old_attrs.begin();
447        i != old_attrs.end();
448        ++i) {
449     if (i->second) {
450       to_set[i->first] = i->second.get();
451     } else {
452       t->rmattr(
453         coll,
454         ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
455         i->first);
456     }
457   }
458   t->setattrs(
459     coll,
460     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
461     to_set);
462 }
463
464 void PGBackend::rollback_append(
465   const hobject_t &hoid,
466   uint64_t old_size,
467   ObjectStore::Transaction *t) {
468   assert(!hoid.is_temp());
469   t->truncate(
470     coll,
471     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
472     old_size);
473 }
474
475 void PGBackend::rollback_stash(
476   const hobject_t &hoid,
477   version_t old_version,
478   ObjectStore::Transaction *t) {
479   assert(!hoid.is_temp());
480   t->remove(
481     coll,
482     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
483   t->collection_move_rename(
484     coll,
485     ghobject_t(hoid, old_version, get_parent()->whoami_shard().shard),
486     coll,
487     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
488 }
489
490 void PGBackend::rollback_try_stash(
491   const hobject_t &hoid,
492   version_t old_version,
493   ObjectStore::Transaction *t) {
494   assert(!hoid.is_temp());
495   t->remove(
496     coll,
497     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
498   t->try_rename(
499     coll,
500     ghobject_t(hoid, old_version, get_parent()->whoami_shard().shard),
501     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
502 }
503
504 void PGBackend::rollback_extents(
505   version_t gen,
506   const vector<pair<uint64_t, uint64_t> > &extents,
507   const hobject_t &hoid,
508   ObjectStore::Transaction *t) {
509   auto shard = get_parent()->whoami_shard().shard;
510   for (auto &&extent: extents) {
511     t->clone_range(
512       coll,
513       ghobject_t(hoid, gen, shard),
514       ghobject_t(hoid, ghobject_t::NO_GEN, shard),
515       extent.first,
516       extent.second,
517       extent.first);
518   }
519   t->remove(
520     coll,
521     ghobject_t(hoid, gen, shard));
522 }
523
524 void PGBackend::trim_rollback_object(
525   const hobject_t &hoid,
526   version_t old_version,
527   ObjectStore::Transaction *t) {
528   assert(!hoid.is_temp());
529   t->remove(
530     coll, ghobject_t(hoid, old_version, get_parent()->whoami_shard().shard));
531 }
532
533 PGBackend *PGBackend::build_pg_backend(
534   const pg_pool_t &pool,
535   const OSDMapRef curmap,
536   Listener *l,
537   coll_t coll,
538   ObjectStore::CollectionHandle &ch,
539   ObjectStore *store,
540   CephContext *cct)
541 {
542   switch (pool.type) {
543   case pg_pool_t::TYPE_REPLICATED: {
544     return new ReplicatedBackend(l, coll, ch, store, cct);
545   }
546   case pg_pool_t::TYPE_ERASURE: {
547     ErasureCodeInterfaceRef ec_impl;
548     ErasureCodeProfile profile = curmap->get_erasure_code_profile(pool.erasure_code_profile);
549     assert(profile.count("plugin"));
550     stringstream ss;
551     ceph::ErasureCodePluginRegistry::instance().factory(
552       profile.find("plugin")->second,
553       cct->_conf->get_val<std::string>("erasure_code_dir"),
554       profile,
555       &ec_impl,
556       &ss);
557     assert(ec_impl);
558     return new ECBackend(
559       l,
560       coll,
561       ch,
562       store,
563       cct,
564       ec_impl,
565       pool.stripe_width);
566   }
567   default:
568     ceph_abort();
569     return NULL;
570   }
571 }
572
573 /*
574  * pg lock may or may not be held
575  */
576 void PGBackend::be_scan_list(
577   ScrubMap &map, const vector<hobject_t> &ls, bool deep, uint32_t seed,
578   ThreadPool::TPHandle &handle)
579 {
580   dout(10) << __func__ << " scanning " << ls.size() << " objects"
581            << (deep ? " deeply" : "") << dendl;
582   int i = 0;
583   for (vector<hobject_t>::const_iterator p = ls.begin();
584        p != ls.end();
585        ++p, i++) {
586     handle.reset_tp_timeout();
587     hobject_t poid = *p;
588
589     struct stat st;
590     int r = store->stat(
591       ch,
592       ghobject_t(
593         poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
594       &st,
595       true);
596     if (r == 0) {
597       ScrubMap::object &o = map.objects[poid];
598       o.size = st.st_size;
599       assert(!o.negative);
600       store->getattrs(
601         ch,
602         ghobject_t(
603           poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
604         o.attrs);
605
606       // calculate the CRC32 on deep scrubs
607       if (deep) {
608         be_deep_scrub(*p, seed, o, handle);
609       }
610
611       dout(25) << __func__ << "  " << poid << dendl;
612     } else if (r == -ENOENT) {
613       dout(25) << __func__ << "  " << poid << " got " << r
614                << ", skipping" << dendl;
615     } else if (r == -EIO) {
616       dout(25) << __func__ << "  " << poid << " got " << r
617                << ", stat_error" << dendl;
618       ScrubMap::object &o = map.objects[poid];
619       o.stat_error = true;
620     } else {
621       derr << __func__ << " got: " << cpp_strerror(r) << dendl;
622       ceph_abort();
623     }
624   }
625 }
626
627 bool PGBackend::be_compare_scrub_objects(
628   pg_shard_t auth_shard,
629   const ScrubMap::object &auth,
630   const object_info_t& auth_oi,
631   const ScrubMap::object &candidate,
632   shard_info_wrapper &shard_result,
633   inconsistent_obj_wrapper &obj_result,
634   ostream &errorstream)
635 {
636   enum { CLEAN, FOUND_ERROR } error = CLEAN;
637   if (candidate.stat_error) {
638     assert(shard_result.has_stat_error());
639     error = FOUND_ERROR;
640     errorstream << "candidate had a stat error";
641   }
642   if (candidate.read_error || candidate.ec_hash_mismatch || candidate.ec_size_mismatch) {
643     error = FOUND_ERROR;
644     errorstream << "candidate had a read error";
645   }
646   if (auth.digest_present && candidate.digest_present) {
647     if (auth.digest != candidate.digest) {
648       if (error != CLEAN)
649         errorstream << ", ";
650       error = FOUND_ERROR;
651       errorstream << "data_digest 0x" << std::hex << candidate.digest
652                   << " != data_digest 0x" << auth.digest << std::dec
653                   << " from shard " << auth_shard;
654       obj_result.set_data_digest_mismatch();
655     }
656   }
657   if (auth.omap_digest_present && candidate.omap_digest_present) {
658     if (auth.omap_digest != candidate.omap_digest) {
659       if (error != CLEAN)
660         errorstream << ", ";
661       error = FOUND_ERROR;
662       errorstream << "omap_digest 0x" << std::hex << candidate.omap_digest
663                   << " != omap_digest 0x" << auth.omap_digest << std::dec
664                   << " from shard " << auth_shard;
665       obj_result.set_omap_digest_mismatch();
666     }
667   }
668   if (parent->get_pool().is_replicated()) {
669     if (auth_oi.is_data_digest() && candidate.digest_present) {
670       if (auth_oi.data_digest != candidate.digest) {
671         if (error != CLEAN)
672           errorstream << ", ";
673         error = FOUND_ERROR;
674         errorstream << "data_digest 0x" << std::hex << candidate.digest
675                     << " != data_digest 0x" << auth_oi.data_digest << std::dec
676                     << " from auth oi " << auth_oi;
677         shard_result.set_data_digest_mismatch_oi();
678       }
679     }
680     if (auth_oi.is_omap_digest() && candidate.omap_digest_present) {
681       if (auth_oi.omap_digest != candidate.omap_digest) {
682         if (error != CLEAN)
683           errorstream << ", ";
684         error = FOUND_ERROR;
685         errorstream << "omap_digest 0x" << std::hex << candidate.omap_digest
686                     << " != omap_digest 0x" << auth_oi.omap_digest << std::dec
687                     << " from auth oi " << auth_oi;
688         shard_result.set_omap_digest_mismatch_oi();
689       }
690     }
691   }
692   if (candidate.stat_error)
693     return error == FOUND_ERROR;
694   uint64_t oi_size = be_get_ondisk_size(auth_oi.size);
695   if (oi_size != candidate.size) {
696     if (error != CLEAN)
697       errorstream << ", ";
698     error = FOUND_ERROR;
699     errorstream << "size " << candidate.size
700                 << " != size " << oi_size
701                 << " from auth oi " << auth_oi;
702     shard_result.set_size_mismatch_oi();
703   }
704   if (auth.size != candidate.size) {
705     if (error != CLEAN)
706       errorstream << ", ";
707     error = FOUND_ERROR;
708     errorstream << "size " << candidate.size
709                 << " != size " << auth.size
710                 << " from shard " << auth_shard;
711     obj_result.set_size_mismatch();
712   }
713   for (map<string,bufferptr>::const_iterator i = auth.attrs.begin();
714        i != auth.attrs.end();
715        ++i) {
716     // We check system keys seperately
717     if (i->first == OI_ATTR || i->first == SS_ATTR)
718       continue;
719     if (!candidate.attrs.count(i->first)) {
720       if (error != CLEAN)
721         errorstream << ", ";
722       error = FOUND_ERROR;
723       errorstream << "attr name mismatch '" << i->first << "'";
724       obj_result.set_attr_name_mismatch();
725     } else if (candidate.attrs.find(i->first)->second.cmp(i->second)) {
726       if (error != CLEAN)
727         errorstream << ", ";
728       error = FOUND_ERROR;
729       errorstream << "attr value mismatch '" << i->first << "'";
730       obj_result.set_attr_value_mismatch();
731     }
732   }
733   for (map<string,bufferptr>::const_iterator i = candidate.attrs.begin();
734        i != candidate.attrs.end();
735        ++i) {
736     // We check system keys seperately
737     if (i->first == OI_ATTR || i->first == SS_ATTR)
738       continue;
739     if (!auth.attrs.count(i->first)) {
740       if (error != CLEAN)
741         errorstream << ", ";
742       error = FOUND_ERROR;
743       errorstream << "attr name mismatch '" << i->first << "'";
744       obj_result.set_attr_name_mismatch();
745     }
746   }
747   return error == FOUND_ERROR;
748 }
749
750 static int dcount(const object_info_t &oi)
751 {
752   int count = 0;
753   if (oi.is_data_digest())
754     count++;
755   if (oi.is_omap_digest())
756     count++;
757   return count;
758 }
759
760 map<pg_shard_t, ScrubMap *>::const_iterator
761   PGBackend::be_select_auth_object(
762   const hobject_t &obj,
763   const map<pg_shard_t,ScrubMap*> &maps,
764   object_info_t *auth_oi,
765   map<pg_shard_t, shard_info_wrapper> &shard_map,
766   inconsistent_obj_wrapper &object_error)
767 {
768   eversion_t auth_version;
769   bufferlist first_bl;
770
771   // Create list of shards with primary last so it will be auth copy all
772   // other things being equal.
773   list<pg_shard_t> shards;
774   for (map<pg_shard_t, ScrubMap *>::const_iterator j = maps.begin();
775        j != maps.end();
776        ++j) {
777     if (j->first == get_parent()->whoami_shard())
778       continue;
779     shards.push_back(j->first);
780   }
781   shards.push_back(get_parent()->whoami_shard());
782
783   map<pg_shard_t, ScrubMap *>::const_iterator auth = maps.end();
784   for (auto &l : shards) {
785     map<pg_shard_t, ScrubMap *>::const_iterator j = maps.find(l);
786     map<hobject_t, ScrubMap::object>::iterator i =
787       j->second->objects.find(obj);
788     if (i == j->second->objects.end()) {
789       continue;
790     }
791     string error_string;
792     auto& shard_info = shard_map[j->first];
793     if (j->first == get_parent()->whoami_shard())
794       shard_info.primary = true;
795     if (i->second.read_error) {
796       shard_info.set_read_error();
797       error_string += " read_error";
798     }
799     if (i->second.ec_hash_mismatch) {
800       shard_info.set_ec_hash_mismatch();
801       error_string += " ec_hash_mismatch";
802     }
803     if (i->second.ec_size_mismatch) {
804       shard_info.set_ec_size_mismatch();
805       error_string += " ec_size_mismatch";
806     }
807
808     object_info_t oi;
809     bufferlist bl;
810     map<string, bufferptr>::iterator k;
811     SnapSet ss;
812     bufferlist ss_bl;
813
814     if (i->second.stat_error) {
815       shard_info.set_stat_error();
816       error_string += " stat_error";
817       // With stat_error no further checking
818       // We don't need to also see a missing_object_info_attr
819       goto out;
820     }
821
822     // We won't pick an auth copy if the snapset is missing or won't decode.
823     if (obj.is_head() || obj.is_snapdir()) {
824       k = i->second.attrs.find(SS_ATTR);
825       if (k == i->second.attrs.end()) {
826         shard_info.set_ss_attr_missing();
827         error_string += " ss_attr_missing";
828       } else {
829         ss_bl.push_back(k->second);
830         try {
831           bufferlist::iterator bliter = ss_bl.begin();
832           ::decode(ss, bliter);
833         } catch (...) {
834           // invalid snapset, probably corrupt
835           shard_info.set_ss_attr_corrupted();
836           error_string += " ss_attr_corrupted";
837         }
838       }
839     }
840
841     k = i->second.attrs.find(OI_ATTR);
842     if (k == i->second.attrs.end()) {
843       // no object info on object, probably corrupt
844       shard_info.set_oi_attr_missing();
845       error_string += " oi_attr_missing";
846       goto out;
847     }
848     bl.push_back(k->second);
849     try {
850       bufferlist::iterator bliter = bl.begin();
851       ::decode(oi, bliter);
852     } catch (...) {
853       // invalid object info, probably corrupt
854       shard_info.set_oi_attr_corrupted();
855       error_string += " oi_attr_corrupted";
856       goto out;
857     }
858
859     // This is automatically corrected in PG::_repair_oinfo_oid()
860     assert(oi.soid == obj);
861
862     if (first_bl.length() == 0) {
863       first_bl.append(bl);
864     } else if (!object_error.has_object_info_inconsistency() && !bl.contents_equal(first_bl)) {
865       object_error.set_object_info_inconsistency();
866       error_string += " object_info_inconsistency";
867     }
868
869     if (i->second.size != be_get_ondisk_size(oi.size)) {
870       dout(5) << __func__ << " size " << i->second.size << " oi size " << oi.size << dendl;
871       shard_info.set_obj_size_oi_mismatch();
872       error_string += " obj_size_oi_mismatch";
873     }
874
875     // Don't use this particular shard due to previous errors
876     // XXX: For now we can't pick one shard for repair and another's object info or snapset
877     if (shard_info.errors)
878       goto out;
879
880     if (auth_version == eversion_t() || oi.version > auth_version ||
881         (oi.version == auth_version && dcount(oi) > dcount(*auth_oi))) {
882       auth = j;
883       *auth_oi = oi;
884       auth_version = oi.version;
885     }
886
887 out:
888     // Check error_string because some errors already generated messages
889     if (error_string != "") {
890       dout(10) << __func__ << ": error(s) osd " << j->first
891                << " for obj " << obj
892                << "," << error_string
893                << dendl;
894     }
895     // Keep scanning other shards
896   }
897   dout(10) << __func__ << ": selecting osd " << auth->first
898            << " for obj " << obj
899            << " with oi " << *auth_oi
900            << dendl;
901   return auth;
902 }
903
904 void PGBackend::be_compare_scrubmaps(
905   const map<pg_shard_t,ScrubMap*> &maps,
906   bool repair,
907   map<hobject_t, set<pg_shard_t>> &missing,
908   map<hobject_t, set<pg_shard_t>> &inconsistent,
909   map<hobject_t, list<pg_shard_t>> &authoritative,
910   map<hobject_t, pair<uint32_t,uint32_t>> &missing_digest,
911   int &shallow_errors, int &deep_errors,
912   Scrub::Store *store,
913   const spg_t& pgid,
914   const vector<int> &acting,
915   ostream &errorstream)
916 {
917   map<hobject_t,ScrubMap::object>::const_iterator i;
918   map<pg_shard_t, ScrubMap *>::const_iterator j;
919   set<hobject_t> master_set;
920   utime_t now = ceph_clock_now();
921
922   // Construct master set
923   for (j = maps.begin(); j != maps.end(); ++j) {
924     for (i = j->second->objects.begin(); i != j->second->objects.end(); ++i) {
925       master_set.insert(i->first);
926     }
927   }
928
929   // Check maps against master set and each other
930   for (set<hobject_t>::const_iterator k = master_set.begin();
931        k != master_set.end();
932        ++k) {
933     object_info_t auth_oi;
934     map<pg_shard_t, shard_info_wrapper> shard_map;
935
936     inconsistent_obj_wrapper object_error{*k};
937
938     map<pg_shard_t, ScrubMap *>::const_iterator auth =
939       be_select_auth_object(*k, maps, &auth_oi, shard_map, object_error);
940
941     list<pg_shard_t> auth_list;
942     set<pg_shard_t> object_errors;
943     if (auth == maps.end()) {
944       object_error.set_version(0);
945       object_error.set_auth_missing(*k, maps, shard_map, shallow_errors,
946         deep_errors, get_parent()->whoami_shard());
947       if (object_error.has_deep_errors())
948         ++deep_errors;
949       else if (object_error.has_shallow_errors())
950         ++shallow_errors;
951       store->add_object_error(k->pool, object_error);
952       errorstream << pgid.pgid << " soid " << *k
953                   << ": failed to pick suitable object info\n";
954       continue;
955     }
956     object_error.set_version(auth_oi.user_version);
957     ScrubMap::object& auth_object = auth->second->objects[*k];
958     set<pg_shard_t> cur_missing;
959     set<pg_shard_t> cur_inconsistent;
960
961     for (j = maps.begin(); j != maps.end(); ++j) {
962       if (j == auth)
963         shard_map[auth->first].selected_oi = true;
964       if (j->second->objects.count(*k)) {
965         shard_map[j->first].set_object(j->second->objects[*k]);
966         // Compare
967         stringstream ss;
968         bool found = be_compare_scrub_objects(auth->first,
969                                    auth_object,
970                                    auth_oi,
971                                    j->second->objects[*k],
972                                    shard_map[j->first],
973                                    object_error,
974                                    ss);
975         // Some errors might have already been set in be_select_auth_object()
976         if (shard_map[j->first].errors != 0) {
977           cur_inconsistent.insert(j->first);
978           if (shard_map[j->first].has_deep_errors())
979             ++deep_errors;
980           else
981             ++shallow_errors;
982           // Only true if be_compare_scrub_objects() found errors and put something
983           // in ss.
984           if (found)
985             errorstream << pgid << " shard " << j->first << ": soid " << *k
986                       << " " << ss.str() << "\n";
987         } else if (found) {
988           // Track possible shard to use as authoritative, if needed
989           // There are errors, without identifying the shard
990           object_errors.insert(j->first);
991         } else {
992           // XXX: The auth shard might get here that we don't know
993           // that it has the "correct" data.
994           auth_list.push_back(j->first);
995         }
996       } else {
997         cur_missing.insert(j->first);
998         shard_map[j->first].set_missing();
999         shard_map[j->first].primary = (j->first == get_parent()->whoami_shard());
1000         // Can't have any other errors if there is no information available
1001         ++shallow_errors;
1002         errorstream << pgid << " shard " << j->first << " missing " << *k
1003                     << "\n";
1004       }
1005       object_error.add_shard(j->first, shard_map[j->first]);
1006     }
1007
1008     if (auth_list.empty()) {
1009       if (object_errors.empty()) {
1010         errorstream << pgid.pgid << " soid " << *k
1011                   << ": failed to pick suitable auth object\n";
1012         goto out;
1013       }
1014       // Object errors exist and nothing in auth_list
1015       // Prefer the auth shard otherwise take first from list.
1016       pg_shard_t shard;
1017       if (object_errors.count(auth->first)) {
1018         shard = auth->first;
1019       } else {
1020         shard = *(object_errors.begin());
1021       }
1022       auth_list.push_back(shard);
1023       object_errors.erase(shard);
1024     }
1025     // At this point auth_list is populated, so we add the object errors shards
1026     // as inconsistent.
1027     cur_inconsistent.insert(object_errors.begin(), object_errors.end());
1028     if (!cur_missing.empty()) {
1029       missing[*k] = cur_missing;
1030     }
1031     if (!cur_inconsistent.empty()) {
1032       inconsistent[*k] = cur_inconsistent;
1033     }
1034     if (!cur_inconsistent.empty() || !cur_missing.empty()) {
1035       authoritative[*k] = auth_list;
1036     } else if (parent->get_pool().is_replicated()) {
1037       enum {
1038         NO = 0,
1039         MAYBE = 1,
1040         FORCE = 2,
1041       } update = NO;
1042
1043       if (auth_object.digest_present && auth_object.omap_digest_present &&
1044           (!auth_oi.is_data_digest() || !auth_oi.is_omap_digest())) {
1045         dout(20) << __func__ << " missing digest on " << *k << dendl;
1046         update = MAYBE;
1047       }
1048       if (auth_object.digest_present && auth_object.omap_digest_present &&
1049           cct->_conf->osd_debug_scrub_chance_rewrite_digest &&
1050           (((unsigned)rand() % 100) >
1051            cct->_conf->osd_debug_scrub_chance_rewrite_digest)) {
1052         dout(20) << __func__ << " randomly updating digest on " << *k << dendl;
1053         update = MAYBE;
1054       }
1055
1056       // recorded digest != actual digest?
1057       if (auth_oi.is_data_digest() && auth_object.digest_present &&
1058           auth_oi.data_digest != auth_object.digest) {
1059         assert(shard_map[auth->first].has_data_digest_mismatch_oi());
1060         errorstream << pgid << " recorded data digest 0x"
1061                     << std::hex << auth_oi.data_digest << " != on disk 0x"
1062                     << auth_object.digest << std::dec << " on " << auth_oi.soid
1063                     << "\n";
1064         if (repair)
1065           update = FORCE;
1066       }
1067       if (auth_oi.is_omap_digest() && auth_object.omap_digest_present &&
1068           auth_oi.omap_digest != auth_object.omap_digest) {
1069         assert(shard_map[auth->first].has_omap_digest_mismatch_oi());
1070         errorstream << pgid << " recorded omap digest 0x"
1071                     << std::hex << auth_oi.omap_digest << " != on disk 0x"
1072                     << auth_object.omap_digest << std::dec
1073                     << " on " << auth_oi.soid << "\n";
1074         if (repair)
1075           update = FORCE;
1076       }
1077
1078       if (update != NO) {
1079         utime_t age = now - auth_oi.local_mtime;
1080         if (update == FORCE ||
1081             age > cct->_conf->osd_deep_scrub_update_digest_min_age) {
1082           dout(20) << __func__ << " will update digest on " << *k << dendl;
1083           missing_digest[*k] = make_pair(auth_object.digest,
1084                                          auth_object.omap_digest);
1085         } else {
1086           dout(20) << __func__ << " missing digest but age " << age
1087                    << " < " << cct->_conf->osd_deep_scrub_update_digest_min_age
1088                    << " on " << *k << dendl;
1089         }
1090       }
1091     }
1092 out:
1093     if (object_error.has_deep_errors())
1094       ++deep_errors;
1095     else if (object_error.has_shallow_errors())
1096       ++shallow_errors;
1097     if (object_error.errors || object_error.union_shards.errors) {
1098       store->add_object_error(k->pool, object_error);
1099     }
1100   }
1101 }