src/ceph/src/osd/PGLog.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
8  *
9  * Author: Loic Dachary <loic@dachary.org>
10  *
11  * This is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Lesser General Public
13  * License version 2.1, as published by the Free Software 
14  * Foundation.  See file COPYING.
15  * 
16  */
17 #pragma once
18
19 // re-include our assert to clobber boost's
20 #include "include/assert.h"
21 #include "osd_types.h"
22 #include "os/ObjectStore.h"
23 #include <list>
24 using namespace std;
25
26 #define PGLOG_INDEXED_OBJECTS          (1 << 0)
27 #define PGLOG_INDEXED_CALLER_OPS       (1 << 1)
28 #define PGLOG_INDEXED_EXTRA_CALLER_OPS (1 << 2)
29 #define PGLOG_INDEXED_DUPS             (1 << 3)
30 #define PGLOG_INDEXED_ALL              (PGLOG_INDEXED_OBJECTS | \
31                                         PGLOG_INDEXED_CALLER_OPS | \
32                                         PGLOG_INDEXED_EXTRA_CALLER_OPS | \
33                                         PGLOG_INDEXED_DUPS)
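// The PGLOG_INDEXED_* flags above form the bitmask kept in
// IndexedLog::indexed_data, recording which of the in-memory indexes
// (objects, caller_ops, extra_caller_ops, dup_index) are currently built.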
34
35 class CephContext;
36
37 struct PGLog : DoutPrefixProvider {
38   DoutPrefixProvider *prefix_provider;
39   string gen_prefix() const override {
40     return prefix_provider ? prefix_provider->gen_prefix() : "";
41   }
42   unsigned get_subsys() const override {
43     return prefix_provider ? prefix_provider->get_subsys() :
44       (unsigned)ceph_subsys_osd;
45   }
46   CephContext *get_cct() const override {
47     return cct;
48   }
49
50   ////////////////////////////// sub classes //////////////////////////////
51   struct LogEntryHandler {
52     virtual void rollback(
53       const pg_log_entry_t &entry) = 0;
54     virtual void rollforward(
55       const pg_log_entry_t &entry) = 0;
56     virtual void trim(
57       const pg_log_entry_t &entry) = 0;
58     virtual void remove(
59       const hobject_t &hoid) = 0;
60     virtual void try_stash(
61       const hobject_t &hoid,
62       version_t v) = 0;
63     virtual ~LogEntryHandler() {}
64   };
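
  // A minimal sketch of a handler (illustrative only; NoopHandler is a
  // hypothetical name, not part of this header):
  //
  //   struct NoopHandler : public PGLog::LogEntryHandler {
  //     void rollback(const pg_log_entry_t &) override {}
  //     void rollforward(const pg_log_entry_t &) override {}
  //     void trim(const pg_log_entry_t &) override {}
  //     void remove(const hobject_t &) override {}
  //     void try_stash(const hobject_t &, version_t) override {}
  //   };
  //
  // Callers such as merge_log() and roll_forward() take a LogEntryHandler*
  // so the PG can apply the corresponding side effects on the object store
  // as entries are rolled back, rolled forward, or trimmed.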
65
66   /* Exceptions */
67   class read_log_and_missing_error : public buffer::error {
68   public:
69     explicit read_log_and_missing_error(const char *what) {
70       snprintf(buf, sizeof(buf), "read_log_and_missing_error: %s", what);
71     }
72     const char *what() const throw () override {
73       return buf;
74     }
75   private:
76     char buf[512];
77   };
78
79 public:
80   /**
81    * IndexedLog - adds an in-memory index of the log, by oid,
82    * plus some methods to manipulate it all.
83    */
84   struct IndexedLog : public pg_log_t {
85     mutable ceph::unordered_map<hobject_t,pg_log_entry_t*> objects;  // ptrs into log.  be careful!
86     mutable ceph::unordered_map<osd_reqid_t,pg_log_entry_t*> caller_ops;
87     mutable ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*> extra_caller_ops;
88     mutable ceph::unordered_map<osd_reqid_t,pg_log_dup_t*> dup_index;
89
90     // recovery pointers
91     list<pg_log_entry_t>::iterator complete_to; // not inclusive of referenced item
92     version_t last_requested = 0;               // last object requested by primary
93
94     //
95   private:
96     mutable __u16 indexed_data = 0;
97     /**
98      * rollback_info_trimmed_to_riter points to the first log entry <=
99      * rollback_info_trimmed_to
100      *
101      * It's a reverse_iterator because rend() is a natural representation for
102      * tail, and rbegin() works nicely for head.
103      */
104     mempool::osd_pglog::list<pg_log_entry_t>::reverse_iterator
105       rollback_info_trimmed_to_riter;
106
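    /**
     * advance_can_rollback_to - advance both rollback bounds to 'to' (they
     * never move backward) and invoke f on every log entry newly covered by
     * rollback_info_trimmed_to, i.e. on entries whose rollback info is being
     * given up.
     */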
107     template <typename F>
108     void advance_can_rollback_to(eversion_t to, F &&f) {
109       if (to > can_rollback_to)
110         can_rollback_to = to;
111
112       if (to > rollback_info_trimmed_to)
113         rollback_info_trimmed_to = to;
114
115       while (rollback_info_trimmed_to_riter != log.rbegin()) {
116         --rollback_info_trimmed_to_riter;
117         if (rollback_info_trimmed_to_riter->version > rollback_info_trimmed_to) {
118           ++rollback_info_trimmed_to_riter;
119           break;
120         }
121         f(*rollback_info_trimmed_to_riter);
122       }
123     }
124
125     void reset_rollback_info_trimmed_to_riter() {
126       rollback_info_trimmed_to_riter = log.rbegin();
127       while (rollback_info_trimmed_to_riter != log.rend() &&
128              rollback_info_trimmed_to_riter->version > rollback_info_trimmed_to)
129         ++rollback_info_trimmed_to_riter;
130     }
131
132     // indexes objects, caller ops and extra caller ops
133   public:
134     IndexedLog() :
135       complete_to(log.end()),
136       last_requested(0),
137       indexed_data(0),
138       rollback_info_trimmed_to_riter(log.rbegin())
139     { }
140
141     template <typename... Args>
142     IndexedLog(Args&&... args) :
143       pg_log_t(std::forward<Args>(args)...),
144       complete_to(log.end()),
145       last_requested(0),
146       indexed_data(0),
147       rollback_info_trimmed_to_riter(log.rbegin())
148     {
149       reset_rollback_info_trimmed_to_riter();
150       index();
151     }
152
153     IndexedLog(const IndexedLog &rhs) :
154       pg_log_t(rhs),
155       complete_to(log.end()),
156       last_requested(rhs.last_requested),
157       indexed_data(0),
158       rollback_info_trimmed_to_riter(log.rbegin())
159     {
160       reset_rollback_info_trimmed_to_riter();
161       index(rhs.indexed_data);
162     }
163
164     IndexedLog &operator=(const IndexedLog &rhs) {
165       this->~IndexedLog();
166       new (this) IndexedLog(rhs);
167       return *this;
168     }
169
170     void trim_rollback_info_to(eversion_t to, LogEntryHandler *h) {
171       advance_can_rollback_to(
172         to,
173         [&](pg_log_entry_t &entry) {
174           h->trim(entry);
175         });
176     }
177     void roll_forward_to(eversion_t to, LogEntryHandler *h) {
178       advance_can_rollback_to(
179         to,
180         [&](pg_log_entry_t &entry) {
181           h->rollforward(entry);
182         });
183     }
184
185     void skip_can_rollback_to_to_head() {
186       advance_can_rollback_to(head, [&](const pg_log_entry_t &entry) {});
187     }
188
189     mempool::osd_pglog::list<pg_log_entry_t> rewind_from_head(eversion_t newhead) {
190       auto divergent = pg_log_t::rewind_from_head(newhead);
191       index();
192       reset_rollback_info_trimmed_to_riter();
193       return divergent;
194     }
195
196     template <typename T>
197     void scan_log_after(
198       const eversion_t &bound, ///< [in] scan entries > bound
199       T &&f) const {
200       auto iter = log.rbegin();
201       while (iter != log.rend() && iter->version > bound)
202         ++iter;
203
204       while (true) {
205         if (iter == log.rbegin())
206           break;
207         f(*(--iter));
208       }
209     }
210
211     /****/
212     void claim_log_and_clear_rollback_info(const pg_log_t& o) {
213       // we must have already trimmed the old entries
214       assert(rollback_info_trimmed_to == head);
215       assert(rollback_info_trimmed_to_riter == log.rbegin());
216
217       *this = IndexedLog(o);
218
219       skip_can_rollback_to_to_head();
220       index();
221     }
222
223     void split_out_child(
224       pg_t child_pgid,
225       unsigned split_bits,
226       IndexedLog *target);
227
228     void zero() {
229       // we must have already trimmed the old entries
230       assert(rollback_info_trimmed_to == head);
231       assert(rollback_info_trimmed_to_riter == log.rbegin());
232
233       unindex();
234       pg_log_t::clear();
235       rollback_info_trimmed_to_riter = log.rbegin();
236       reset_recovery_pointers();
237     }
238     void clear() {
239       skip_can_rollback_to_to_head();
240       zero();
241     }
242     void reset_recovery_pointers() {
243       complete_to = log.end();
244       last_requested = 0;
245     }
246
247     bool logged_object(const hobject_t& oid) const {
248       if (!(indexed_data & PGLOG_INDEXED_OBJECTS)) {
249          index_objects();
250       }
251       return objects.count(oid);
252     }
253
254     bool logged_req(const osd_reqid_t &r) const {
255       if (!(indexed_data & PGLOG_INDEXED_CALLER_OPS)) {
256         index_caller_ops();
257       }
258       if (!caller_ops.count(r)) {
259         if (!(indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS)) {
260           index_extra_caller_ops();
261         }
262         return extra_caller_ops.count(r);
263       }
264       return true;
265     }
266
267     bool get_request(
268       const osd_reqid_t &r,
269       eversion_t *version,
270       version_t *user_version,
271       int *return_code) const
272     {
273       assert(version);
274       assert(user_version);
275       assert(return_code);
276       ceph::unordered_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p;
277       if (!(indexed_data & PGLOG_INDEXED_CALLER_OPS)) {
278         index_caller_ops();
279       }
280       p = caller_ops.find(r);
281       if (p != caller_ops.end()) {
282         *version = p->second->version;
283         *user_version = p->second->user_version;
284         *return_code = p->second->return_code;
285         return true;
286       }
287
288       // warning: we will return *a* request for this reqid, but not
289       // necessarily the most recent.
290       if (!(indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS)) {
291         index_extra_caller_ops();
292       }
293       p = extra_caller_ops.find(r);
294       if (p != extra_caller_ops.end()) {
295         for (auto i = p->second->extra_reqids.begin();
296              i != p->second->extra_reqids.end();
297              ++i) {
298           if (i->first == r) {
299             *version = p->second->version;
300             *user_version = i->second;
301             *return_code = p->second->return_code;
302             return true;
303           }
304         }
305         assert(0 == "in extra_caller_ops but not extra_reqids");
306       }
307
308       if (!(indexed_data & PGLOG_INDEXED_DUPS)) {
309         index_dups();
310       }
311       auto q = dup_index.find(r);
312       if (q != dup_index.end()) {
313         *version = q->second->version;
314         *user_version = q->second->user_version;
315         *return_code = q->second->return_code;
316         return true;
317       }
318
319       return false;
320     }
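
    // Illustrative use (a sketch; 'log' stands for an IndexedLog such as the
    // one returned by PGLog::get_log(), and 'reqid' for the osd_reqid_t of an
    // incoming client op):
    //
    //   eversion_t replay_version;
    //   version_t user_version;
    //   int return_code = 0;
    //   if (log.get_request(reqid, &replay_version, &user_version,
    //                       &return_code)) {
    //     // duplicate op: reply with the previously recorded result
    //   }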
321
322     /// get a (bounded) list of recent reqids for the given object
323     void get_object_reqids(const hobject_t& oid, unsigned max,
324                            mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *pls) const {
325        // make sure object is present at least once before we do an
326        // O(n) search.
327       if (!(indexed_data & PGLOG_INDEXED_OBJECTS)) {
328         index_objects();
329       }
330       if (objects.count(oid) == 0)
331         return;
332       for (list<pg_log_entry_t>::const_reverse_iterator i = log.rbegin();
333            i != log.rend();
334            ++i) {
335         if (i->soid == oid) {
336           if (i->reqid_is_indexed())
337             pls->push_back(make_pair(i->reqid, i->user_version));
338           pls->insert(pls->end(), i->extra_reqids.begin(), i->extra_reqids.end());
339           if (pls->size() >= max) {
340             if (pls->size() > max) {
341               pls->resize(max);
342             }
343             return;
344           }
345         }
346       }
347     }
348
349     void index(__u16 to_index = PGLOG_INDEXED_ALL) const {
350       // if to_index is 0 there is no need to run any of this code (especially
351       // the loop below); this can happen via the copy constructor for
352       // IndexedLog (and indirectly through the assignment operator)
353       if (!to_index) return;
354
355       if (to_index & PGLOG_INDEXED_OBJECTS)
356         objects.clear();
357       if (to_index & PGLOG_INDEXED_CALLER_OPS)
358         caller_ops.clear();
359       if (to_index & PGLOG_INDEXED_EXTRA_CALLER_OPS)
360         extra_caller_ops.clear();
361       if (to_index & PGLOG_INDEXED_DUPS) {
362         dup_index.clear();
363         for (auto& i : dups) {
364           dup_index[i.reqid] = const_cast<pg_log_dup_t*>(&i);
365         }
366       }
367
368       constexpr __u16 any_log_entry_index =
369         PGLOG_INDEXED_OBJECTS |
370         PGLOG_INDEXED_CALLER_OPS |
371         PGLOG_INDEXED_EXTRA_CALLER_OPS;
372
373       if (to_index & any_log_entry_index) {
374         for (list<pg_log_entry_t>::const_iterator i = log.begin();
375              i != log.end();
376              ++i) {
377           if (to_index & PGLOG_INDEXED_OBJECTS) {
378             if (i->object_is_indexed()) {
379               objects[i->soid] = const_cast<pg_log_entry_t*>(&(*i));
380             }
381           }
382
383           if (to_index & PGLOG_INDEXED_CALLER_OPS) {
384             if (i->reqid_is_indexed()) {
385               caller_ops[i->reqid] = const_cast<pg_log_entry_t*>(&(*i));
386             }
387           }
388
389           if (to_index & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
390             for (auto j = i->extra_reqids.begin();
391                  j != i->extra_reqids.end();
392                  ++j) {
393               extra_caller_ops.insert(
394                 make_pair(j->first, const_cast<pg_log_entry_t*>(&(*i))));
395             }
396           }
397         }
398       }
399
400       indexed_data |= to_index;
401     }
402
403     void index_objects() const {
404       index(PGLOG_INDEXED_OBJECTS);
405     }
406
407     void index_caller_ops() const {
408       index(PGLOG_INDEXED_CALLER_OPS);
409     }
410
411     void index_extra_caller_ops() const {
412       index(PGLOG_INDEXED_EXTRA_CALLER_OPS);
413     }
414
415     void index_dups() const {
416       index(PGLOG_INDEXED_DUPS);
417     }
418
419     void index(pg_log_entry_t& e) {
420       if ((indexed_data & PGLOG_INDEXED_OBJECTS) && e.object_is_indexed()) {
421         if (objects.count(e.soid) == 0 ||
422             objects[e.soid]->version < e.version)
423           objects[e.soid] = &e;
424       }
425       if (indexed_data & PGLOG_INDEXED_CALLER_OPS) {
426         // divergent merge_log indexes new before unindexing old
427         if (e.reqid_is_indexed()) {
428           caller_ops[e.reqid] = &e;
429         }
430       }
431       if (indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
432         for (auto j = e.extra_reqids.begin();
433              j != e.extra_reqids.end();
434              ++j) {
435           extra_caller_ops.insert(make_pair(j->first, &e));
436         }
437       }
438     }
439
440     void unindex() {
441       objects.clear();
442       caller_ops.clear();
443       extra_caller_ops.clear();
444       dup_index.clear();
445       indexed_data = 0;
446     }
447
448     void unindex(const pg_log_entry_t& e) {
449       // NOTE: this only works if we remove from the _tail_ of the log!
450       if (indexed_data & PGLOG_INDEXED_OBJECTS) {
451         if (objects.count(e.soid) && objects[e.soid]->version == e.version)
452           objects.erase(e.soid);
453       }
454       if (e.reqid_is_indexed()) {
455         if (indexed_data & PGLOG_INDEXED_CALLER_OPS) {
456           // divergent merge_log indexes new before unindexing old
457           if (caller_ops.count(e.reqid) && caller_ops[e.reqid] == &e)
458             caller_ops.erase(e.reqid);
459         }
460       }
461       if (indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
462         for (auto j = e.extra_reqids.begin();
463              j != e.extra_reqids.end();
464              ++j) {
465           for (ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*>::iterator k =
466                  extra_caller_ops.find(j->first);
467                k != extra_caller_ops.end() && k->first == j->first;
468                ++k) {
469             if (k->second == &e) {
470               extra_caller_ops.erase(k);
471               break;
472             }
473           }
474         }
475       }
476     }
477
478     void index(pg_log_dup_t& e) {
479       if (indexed_data & PGLOG_INDEXED_DUPS) {
480         dup_index[e.reqid] = &e;
481       }
482     }
483
484     void unindex(const pg_log_dup_t& e) {
485       if (indexed_data & PGLOG_INDEXED_DUPS) {
486         auto i = dup_index.find(e.reqid);
487         if (i != dup_index.end()) {
488           dup_index.erase(i);
489         }
490       }
491     }
492
493     // actors
494     void add(const pg_log_entry_t& e, bool applied = true) {
495       if (!applied) {
496         assert(get_can_rollback_to() == head);
497       }
498
499       // make sure our buffers don't pin bigger buffers
500       e.mod_desc.trim_bl();
501
502       // add to log
503       log.push_back(e);
504
505       // riter previously pointed to the previous entry
506       if (rollback_info_trimmed_to_riter == log.rbegin())
507         ++rollback_info_trimmed_to_riter;
508
509       assert(e.version > head);
510       assert(head.version == 0 || e.version.version > head.version);
511       head = e.version;
512
513       // to our index
514       if ((indexed_data & PGLOG_INDEXED_OBJECTS) && e.object_is_indexed()) {
515         objects[e.soid] = &(log.back());
516       }
517       if (indexed_data & PGLOG_INDEXED_CALLER_OPS) {
518         if (e.reqid_is_indexed()) {
519           caller_ops[e.reqid] = &(log.back());
520         }
521       }
522
523       if (indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
524         for (auto j = e.extra_reqids.begin();
525              j != e.extra_reqids.end();
526              ++j) {
527           extra_caller_ops.insert(make_pair(j->first, &(log.back())));
528         }
529       }
530
531       if (!applied) {
532         skip_can_rollback_to_to_head();
533       }
534     } // add
535
536     void trim(
537       CephContext* cct,
538       eversion_t s,
539       set<eversion_t> *trimmed,
540       set<string>* trimmed_dups,
541       eversion_t *write_from_dups);
542
543     ostream& print(ostream& out) const;
544   }; // IndexedLog
545
546
547 protected:
548   //////////////////// data members ////////////////////
549
550   pg_missing_tracker_t missing;
551   IndexedLog  log;
552
553   eversion_t dirty_to;         ///< must clear/writeout all keys <= dirty_to
554   eversion_t dirty_from;       ///< must clear/writeout all keys >= dirty_from
555   eversion_t writeout_from;    ///< must writeout keys >= writeout_from
556   set<eversion_t> trimmed;     ///< must clear keys in trimmed
557   eversion_t dirty_to_dups;    ///< must clear/writeout all dups <= dirty_to_dups
558   eversion_t dirty_from_dups;  ///< must clear/writeout all dups >= dirty_from_dups
559   eversion_t write_from_dups;  ///< must write keys >= write_from_dups
560   set<string> trimmed_dups;    ///< must clear keys in trimmed_dups
561   CephContext *cct;
562   bool pg_log_debug;
563   /// Log is clean on [dirty_to, dirty_from)
564   bool touched_log;
565   bool clear_divergent_priors;
566   bool rebuilt_missing_with_deletes = false;
567
568   void mark_dirty_to(eversion_t to) {
569     if (to > dirty_to)
570       dirty_to = to;
571   }
572   void mark_dirty_from(eversion_t from) {
573     if (from < dirty_from)
574       dirty_from = from;
575   }
576   void mark_writeout_from(eversion_t from) {
577     if (from < writeout_from)
578       writeout_from = from;
579   }
580   void mark_dirty_to_dups(eversion_t to) {
581     if (to > dirty_to_dups)
582       dirty_to_dups = to;
583   }
584   void mark_dirty_from_dups(eversion_t from) {
585     if (from < dirty_from_dups)
586       dirty_from_dups = from;
587   }
588 public:
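  /// true if log/missing state has accumulated here that still needs to be
  /// persisted via write_log_and_missing()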
589   bool is_dirty() const {
590     return !touched_log ||
591       (dirty_to != eversion_t()) ||
592       (dirty_from != eversion_t::max()) ||
593       (writeout_from != eversion_t::max()) ||
594       !(trimmed.empty()) ||
595       !missing.is_clean() ||
596       !(trimmed_dups.empty()) ||
597       (dirty_to_dups != eversion_t()) ||
598       (dirty_from_dups != eversion_t::max()) ||
599       (write_from_dups != eversion_t::max()) ||
600       rebuilt_missing_with_deletes;
601   }
602   void mark_log_for_rewrite() {
603     mark_dirty_to(eversion_t::max());
604     mark_dirty_from(eversion_t());
605     mark_dirty_to_dups(eversion_t::max());
606     mark_dirty_from_dups(eversion_t());
607     touched_log = false;
608   }
609   bool get_rebuilt_missing_with_deletes() const {
610     return rebuilt_missing_with_deletes;
611   }
612 protected:
613
614   /// DEBUG
615   set<string> log_keys_debug;
616   static void clear_after(set<string> *log_keys_debug, const string &lb) {
617     if (!log_keys_debug)
618       return;
619     for (set<string>::iterator i = log_keys_debug->lower_bound(lb);
620          i != log_keys_debug->end();
621          log_keys_debug->erase(i++));
622   }
623   static void clear_up_to(set<string> *log_keys_debug, const string &ub) {
624     if (!log_keys_debug)
625       return;
626     for (set<string>::iterator i = log_keys_debug->begin();
627          i != log_keys_debug->end() && *i < ub;
628          log_keys_debug->erase(i++));
629   }
630
631   void check();
632   void undirty() {
633     dirty_to = eversion_t();
634     dirty_from = eversion_t::max();
635     touched_log = true;
636     trimmed.clear();
637     trimmed_dups.clear();
638     writeout_from = eversion_t::max();
639     check();
640     missing.flush();
641     dirty_to_dups = eversion_t();
642     dirty_from_dups = eversion_t::max();
643     write_from_dups = eversion_t::max();
644   }
645 public:
646
647   // cppcheck-suppress noExplicitConstructor
648   PGLog(CephContext *cct, DoutPrefixProvider *dpp = nullptr) :
649     prefix_provider(dpp),
650     dirty_from(eversion_t::max()),
651     writeout_from(eversion_t::max()),
652     dirty_from_dups(eversion_t::max()),
653     write_from_dups(eversion_t::max()),
654     cct(cct),
655     pg_log_debug(!(cct && !(cct->_conf->osd_debug_pg_log_writeout))),
656     touched_log(false),
657     clear_divergent_priors(false)
658   { }
659
660   void reset_backfill();
661
662   void clear();
663
664   //////////////////// get or set missing ////////////////////
665
666   const pg_missing_tracker_t& get_missing() const { return missing; }
667   void revise_have(hobject_t oid, eversion_t have) {
668     missing.revise_have(oid, have);
669   }
670
671   void missing_add(const hobject_t& oid, eversion_t need, eversion_t have) {
672     missing.add(oid, need, have, false);
673   }
674
675   //////////////////// get or set log ////////////////////
676
677   const IndexedLog &get_log() const { return log; }
678
679   const eversion_t &get_tail() const { return log.tail; }
680
681   void set_tail(eversion_t tail) { log.tail = tail; }
682
683   const eversion_t &get_head() const { return log.head; }
684
685   void set_head(eversion_t head) { log.head = head; }
686
687   void set_last_requested(version_t last_requested) {
688     log.last_requested = last_requested;
689   }
690
691   void index() { log.index(); }
692
693   void unindex() { log.unindex(); }
694
695   void add(const pg_log_entry_t& e, bool applied = true) {
696     mark_writeout_from(e.version);
697     log.add(e, applied);
698   }
699
700   void reset_recovery_pointers() { log.reset_recovery_pointers(); }
701
702   static void clear_info_log(
703     spg_t pgid,
704     ObjectStore::Transaction *t);
705
706   void trim(
707     eversion_t trim_to,
708     pg_info_t &info);
709
710   void roll_forward_to(
711     eversion_t roll_forward_to,
712     LogEntryHandler *h) {
713     log.roll_forward_to(
714       roll_forward_to,
715       h);
716   }
717
718   eversion_t get_can_rollback_to() const {
719     return log.get_can_rollback_to();
720   }
721
722   void roll_forward(LogEntryHandler *h) {
723     roll_forward_to(
724       log.head,
725       h);
726   }
727
728   //////////////////// get or set log & missing ////////////////////
729
730   void reset_backfill_claim_log(const pg_log_t &o, LogEntryHandler *h) {
731     log.trim_rollback_info_to(log.head, h);
732     log.claim_log_and_clear_rollback_info(o);
733     missing.clear();
734     mark_dirty_to(eversion_t::max());
735     mark_dirty_to_dups(eversion_t::max());
736   }
737
738   void split_into(
739       pg_t child_pgid,
740       unsigned split_bits,
741       PGLog *opg_log) {
742     log.split_out_child(child_pgid, split_bits, &opg_log->log);
743     missing.split_into(child_pgid, split_bits, &(opg_log->missing));
744     opg_log->mark_dirty_to(eversion_t::max());
745     opg_log->mark_dirty_to_dups(eversion_t::max());
746     mark_dirty_to(eversion_t::max());
747     mark_dirty_to_dups(eversion_t::max());
748     if (missing.may_include_deletes)
749       opg_log->rebuilt_missing_with_deletes = true;
750   }
751
752   void recover_got(hobject_t oid, eversion_t v, pg_info_t &info) {
753     if (missing.is_missing(oid, v)) {
754       missing.got(oid, v);
755
756       // raise last_complete?
757       if (missing.get_items().empty()) {
758         log.complete_to = log.log.end();
759         info.last_complete = info.last_update;
760       }
761       while (log.complete_to != log.log.end()) {
762         if (missing.get_items().at(
763               missing.get_rmissing().begin()->second
764               ).need <= log.complete_to->version)
765           break;
766         if (info.last_complete < log.complete_to->version)
767           info.last_complete = log.complete_to->version;
768         ++log.complete_to;
769       }
770     }
771
772     assert(log.get_can_rollback_to() >= v);
773   }
774
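  /**
   * reset_complete_to - reposition log.complete_to at the first log entry
   * whose version is not older than the oldest missing item's need and, if
   * info is non-null, set info->last_complete to the version of the entry
   * just before it (or to eversion_t() if there is no such entry).
   */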
775   void reset_complete_to(pg_info_t *info) {
776     log.complete_to = log.log.begin();
777     while (!missing.get_items().empty() && log.complete_to->version <
778            missing.get_items().at(
779              missing.get_rmissing().begin()->second
780              ).need) {
781       assert(log.complete_to != log.log.end());
782       ++log.complete_to;
783     }
784     assert(log.complete_to != log.log.end());
785     if (log.complete_to == log.log.begin()) {
786       if (info)
787         info->last_complete = eversion_t();
788     } else {
789       --log.complete_to;
790       if (info)
791         info->last_complete = log.complete_to->version;
792       ++log.complete_to;
793     }
794   }
795
796   void activate_not_complete(pg_info_t &info) {
797     reset_complete_to(&info);
798     log.last_requested = 0;
799   }
800
801   void proc_replica_log(pg_info_t &oinfo,
802                         const pg_log_t &olog,
803                         pg_missing_t& omissing, pg_shard_t from) const;
804
805   void rebuild_missing_set_with_deletes(ObjectStore *store,
806                                         coll_t pg_coll,
807                                         const pg_info_t &info);
808
809 protected:
810   static void split_by_object(
811     mempool::osd_pglog::list<pg_log_entry_t> &entries,
812     map<hobject_t, mempool::osd_pglog::list<pg_log_entry_t>> *out_entries) {
813     while (!entries.empty()) {
814       auto &out_list = (*out_entries)[entries.front().soid];
815       out_list.splice(out_list.end(), entries, entries.begin());
816     }
817   }
818
819   /**
820    * _merge_object_divergent_entries
821    *
822    * There are 5 distinct cases:
823    * 1) There is a more recent update: in this case we assume we adjusted the
824    *    store and missing during merge_log
825    * 2) The first entry in the divergent sequence is a create.  This might
826    *    either be because the object is a clone or because prior_version is
827    *    eversion_t().  In this case the object does not exist and we must
828    *    adjust missing and the store to match.
829    * 3) We are currently missing the object.  In this case, we adjust the
830    *    missing to our prior_version taking care to add a divergent_prior
831    *    if necessary
832    * 4) We can rollback all of the entries.  In this case, we do so using
833    *    the rollbacker and return -- the object does not go into missing.
834    * 5) We cannot rollback at least 1 of the entries.  In this case, we
835    *    clear the object out of the store and add a missing entry at
836    *    prior_version taking care to add a divergent_prior if
837    *    necessary.
838    */
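  /*
   * A worked example (hypothetical versions): suppose the divergent entries
   * for hoid are [10'5 (prior_version 10'4), 10'6 (prior_version 10'5)].  If
   * the authoritative log still has an entry for hoid at version >= 10'5,
   * that is case 1.  Otherwise, if the first entry is not a create (case 2)
   * and hoid is not currently missing (case 3), then case 4 applies when
   * every divergent entry can_rollback() with version > olog_can_rollback_to,
   * and both entries are rolled back; failing that, case 5 removes the
   * object and records it as missing at prior_version 10'4.
   */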
839   template <typename missing_type>
840   static void _merge_object_divergent_entries(
841     const IndexedLog &log,               ///< [in] log to merge against
842     const hobject_t &hoid,               ///< [in] object we are merging
843     const mempool::osd_pglog::list<pg_log_entry_t> &orig_entries, ///< [in] entries for hoid to merge
844     const pg_info_t &info,              ///< [in] info for merging entries
845     eversion_t olog_can_rollback_to,     ///< [in] rollback boundary
846     missing_type &missing,               ///< [in,out] missing to adjust, use
847     LogEntryHandler *rollbacker,         ///< [in] optional rollbacker object
848     const DoutPrefixProvider *dpp        ///< [in] logging provider
849     ) {
850     ldpp_dout(dpp, 20) << __func__ << ": merging hoid " << hoid
851                        << " entries: " << orig_entries << dendl;
852
853     if (hoid > info.last_backfill) {
854       ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid << " after last_backfill"
855                          << dendl;
856       return;
857     }
858
859     // entries is non-empty
860     assert(!orig_entries.empty());
861     // strip out and ignore ERROR entries
862     mempool::osd_pglog::list<pg_log_entry_t> entries;
863     eversion_t last;
864     bool seen_non_error = false;
865     for (list<pg_log_entry_t>::const_iterator i = orig_entries.begin();
866          i != orig_entries.end();
867          ++i) {
868       // all entries are on hoid
869       assert(i->soid == hoid);
870       // if we have not seen a non-error entry before this one and this
871       // entry is not an error, then this is the first non-error entry
872       bool first_non_error = ! seen_non_error && ! i->is_error();
873       if (! i->is_error() ) {
874         // we have now seen a non-error entry
875         seen_non_error = true;
876       }
877
878       // No need to check the first entry, since its prior_version is not
879       // available in the list.
880       // No need to check if the prior_version is the minimal version.
881       // No need to check the first non-error entry, since the leading error
882       // entries are not its prior version.
883       if (i != orig_entries.begin() && i->prior_version != eversion_t() &&
884           ! first_non_error) {
885         // in increasing order of version
886         assert(i->version > last);
887         // prior_version correct (unless it is an ERROR entry)
888         assert(i->prior_version == last || i->is_error());
889       }
890       if (i->is_error()) {
891         ldpp_dout(dpp, 20) << __func__ << ": ignoring " << *i << dendl;
892       } else {
893         ldpp_dout(dpp, 20) << __func__ << ": keeping " << *i << dendl;
894         entries.push_back(*i);
895         last = i->version;
896       }
897     }
898     if (entries.empty()) {
899       ldpp_dout(dpp, 10) << __func__ << ": no non-ERROR entries" << dendl;
900       return;
901     }
902
903     const eversion_t prior_version = entries.begin()->prior_version;
904     const eversion_t first_divergent_update = entries.begin()->version;
905     const eversion_t last_divergent_update = entries.rbegin()->version;
906     const bool object_not_in_store =
907       !missing.is_missing(hoid) &&
908       entries.rbegin()->is_delete();
909     ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
910                        << " prior_version: " << prior_version
911                        << " first_divergent_update: " << first_divergent_update
912                        << " last_divergent_update: " << last_divergent_update
913                        << dendl;
914
915     ceph::unordered_map<hobject_t, pg_log_entry_t*>::const_iterator objiter =
916       log.objects.find(hoid);
917     if (objiter != log.objects.end() &&
918         objiter->second->version >= first_divergent_update) {
919       /// Case 1)
920       ldpp_dout(dpp, 10) << __func__ << ": more recent entry found: "
921                          << *objiter->second << ", already merged" << dendl;
922
923       assert(objiter->second->version > last_divergent_update);
924
925       // ensure missing has been updated appropriately
926       if (objiter->second->is_update() ||
927           (missing.may_include_deletes && objiter->second->is_delete())) {
928         assert(missing.is_missing(hoid) &&
929                missing.get_items().at(hoid).need == objiter->second->version);
930       } else {
931         assert(!missing.is_missing(hoid));
932       }
933       missing.revise_have(hoid, eversion_t());
934       if (rollbacker) {
935         if (!object_not_in_store) {
936           rollbacker->remove(hoid);
937         }
938         for (auto &&i: entries) {
939           rollbacker->trim(i);
940         }
941       }
942       return;
943     }
944
945     ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
946                        << " has no more recent entries in log" << dendl;
947     if (prior_version == eversion_t() || entries.front().is_clone()) {
948       /// Case 2)
949       ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
950                          << " prior_version or op type indicates creation,"
951                          << " deleting"
952                          << dendl;
953       if (missing.is_missing(hoid))
954         missing.rm(missing.get_items().find(hoid));
955       if (rollbacker) {
956         if (!object_not_in_store) {
957           rollbacker->remove(hoid);
958         }
959         for (auto &&i: entries) {
960           rollbacker->trim(i);
961         }
962       }
963       return;
964     }
965
966     if (missing.is_missing(hoid)) {
967       /// Case 3)
968       ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
969                          << " missing, " << missing.get_items().at(hoid)
970                          << " adjusting" << dendl;
971
972       if (missing.get_items().at(hoid).have == prior_version) {
973         ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
974                            << " missing.have is prior_version " << prior_version
975                            << " removing from missing" << dendl;
976         missing.rm(missing.get_items().find(hoid));
977       } else {
978         ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
979                            << " missing.have is " << missing.get_items().at(hoid).have
980                            << ", adjusting" << dendl;
981         missing.revise_need(hoid, prior_version, false);
982         if (prior_version <= info.log_tail) {
983           ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
984                              << " prior_version " << prior_version
985                              << " <= info.log_tail "
986                              << info.log_tail << dendl;
987         }
988       }
989       if (rollbacker) {
990         for (auto &&i: entries) {
991           rollbacker->trim(i);
992         }
993       }
994       return;
995     }
996
997     ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
998                        << " must be rolled back or recovered,"
999                        << " attempting to rollback"
1000                        << dendl;
1001     bool can_rollback = true;
1002     /// Distinguish between 4) and 5)
1003     for (list<pg_log_entry_t>::const_reverse_iterator i = entries.rbegin();
1004          i != entries.rend();
1005          ++i) {
1006       if (!i->can_rollback() || i->version <= olog_can_rollback_to) {
1007         ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid << " cannot rollback "
1008                            << *i << dendl;
1009         can_rollback = false;
1010         break;
1011       }
1012     }
1013
1014     if (can_rollback) {
1015       /// Case 4)
1016       for (list<pg_log_entry_t>::const_reverse_iterator i = entries.rbegin();
1017            i != entries.rend();
1018            ++i) {
1019         assert(i->can_rollback() && i->version > olog_can_rollback_to);
1020         ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
1021                            << " rolling back " << *i << dendl;
1022         if (rollbacker)
1023           rollbacker->rollback(*i);
1024       }
1025       ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
1026                          << " rolled back" << dendl;
1027       return;
1028     } else {
1029       /// Case 5)
1030       ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid << " cannot roll back, "
1031                          << "removing and adding to missing" << dendl;
1032       if (rollbacker) {
1033         if (!object_not_in_store)
1034           rollbacker->remove(hoid);
1035         for (auto &&i: entries) {
1036           rollbacker->trim(i);
1037         }
1038       }
1039       missing.add(hoid, prior_version, eversion_t(), false);
1040       if (prior_version <= info.log_tail) {
1041         ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
1042                            << " prior_version " << prior_version
1043                            << " <= info.log_tail "
1044                            << info.log_tail << dendl;
1045       }
1046     }
1047   }
1048
1049   /// Merge all entries using above
1050   template <typename missing_type>
1051   static void _merge_divergent_entries(
1052     const IndexedLog &log,               ///< [in] log to merge against
1053     mempool::osd_pglog::list<pg_log_entry_t> &entries,       ///< [in] entries to merge
1054     const pg_info_t &oinfo,              ///< [in] info for merging entries
1055     eversion_t olog_can_rollback_to,     ///< [in] rollback boundary
1056     missing_type &omissing,              ///< [in,out] missing to adjust, use
1057     LogEntryHandler *rollbacker,         ///< [in] optional rollbacker object
1058     const DoutPrefixProvider *dpp        ///< [in] logging provider
1059     ) {
1060     map<hobject_t, mempool::osd_pglog::list<pg_log_entry_t> > split;
1061     split_by_object(entries, &split);
1062     for (map<hobject_t, mempool::osd_pglog::list<pg_log_entry_t>>::iterator i = split.begin();
1063          i != split.end();
1064          ++i) {
1065       _merge_object_divergent_entries(
1066         log,
1067         i->first,
1068         i->second,
1069         oinfo,
1070         olog_can_rollback_to,
1071         omissing,
1072         rollbacker,
1073         dpp);
1074     }
1075   }
1076
1077   /**
1078    * Exists for use in TestPGLog for simply testing single divergent log
1079    * cases
1080    */
1081   void merge_old_entry(
1082     ObjectStore::Transaction& t,
1083     const pg_log_entry_t& oe,
1084     const pg_info_t& info,
1085     LogEntryHandler *rollbacker) {
1086     mempool::osd_pglog::list<pg_log_entry_t> entries;
1087     entries.push_back(oe);
1088     _merge_object_divergent_entries(
1089       log,
1090       oe.soid,
1091       entries,
1092       info,
1093       log.get_can_rollback_to(),
1094       missing,
1095       rollbacker,
1096       this);
1097   }
1098
1099   bool merge_log_dups(const pg_log_t& olog);
1100
1101 public:
1102
1103   void rewind_divergent_log(eversion_t newhead,
1104                             pg_info_t &info,
1105                             LogEntryHandler *rollbacker,
1106                             bool &dirty_info,
1107                             bool &dirty_big_info);
1108
1109   void merge_log(pg_info_t &oinfo,
1110                  pg_log_t &olog,
1111                  pg_shard_t from,
1112                  pg_info_t &info, LogEntryHandler *rollbacker,
1113                  bool &dirty_info, bool &dirty_big_info);
1114
1115   template <typename missing_type>
1116   static bool append_log_entries_update_missing(
1117     const hobject_t &last_backfill,
1118     bool last_backfill_bitwise,
1119     const mempool::osd_pglog::list<pg_log_entry_t> &entries,
1120     bool maintain_rollback,
1121     IndexedLog *log,
1122     missing_type &missing,
1123     LogEntryHandler *rollbacker,
1124     const DoutPrefixProvider *dpp) {
1125     bool invalidate_stats = false;
1126     if (log && !entries.empty()) {
1127       assert(log->head < entries.begin()->version);
1128     }
1129     for (list<pg_log_entry_t>::const_iterator p = entries.begin();
1130          p != entries.end();
1131          ++p) {
1132       invalidate_stats = invalidate_stats || !p->is_error();
1133       if (log) {
1134         ldpp_dout(dpp, 20) << "update missing, append " << *p << dendl;
1135         log->add(*p);
1136       }
1137       if (p->soid <= last_backfill &&
1138           !p->is_error()) {
1139         if (missing.may_include_deletes) {
1140           missing.add_next_event(*p);
1141         } else {
1142           if (p->is_delete()) {
1143             missing.rm(p->soid, p->version);
1144           } else {
1145             missing.add_next_event(*p);
1146           }
1147           if (rollbacker) {
1148             // hack to match PG::mark_all_unfound_lost
1149             if (maintain_rollback && p->is_lost_delete() && p->can_rollback()) {
1150               rollbacker->try_stash(p->soid, p->version.version);
1151             } else if (p->is_delete()) {
1152               rollbacker->remove(p->soid);
1153             }
1154           }
1155         }
1156       }
1157     }
1158     return invalidate_stats;
1159   }
1160   bool append_new_log_entries(
1161     const hobject_t &last_backfill,
1162     bool last_backfill_bitwise,
1163     const mempool::osd_pglog::list<pg_log_entry_t> &entries,
1164     LogEntryHandler *rollbacker) {
1165     bool invalidate_stats = append_log_entries_update_missing(
1166       last_backfill,
1167       last_backfill_bitwise,
1168       entries,
1169       true,
1170       &log,
1171       missing,
1172       rollbacker,
1173       this);
1174     if (!entries.empty()) {
1175       mark_writeout_from(entries.begin()->version);
1176       if (entries.begin()->is_lost_delete()) {
1177         // hack: since lost deletes queue recovery directly, and don't
1178         // go through activate_not_complete() again, our complete_to
1179         // iterator may still point at log.end(). Reset it to point
1180         // before these new lost_delete entries.  This only occurs
1181         // when lost+delete entries are initially added, which is
1182         // always in a list of solely lost_delete entries, so it is
1183         // sufficient to check whether the first entry is a
1184         // lost_delete
1185         reset_complete_to(nullptr);
1186       }
1187     }
1188     return invalidate_stats;
1189   }
1190
1191   void write_log_and_missing(
1192     ObjectStore::Transaction& t,
1193     map<string,bufferlist> *km,
1194     const coll_t& coll,
1195     const ghobject_t &log_oid,
1196     bool require_rollback);
1197
1198   static void write_log_and_missing_wo_missing(
1199     ObjectStore::Transaction& t,
1200     map<string,bufferlist>* km,
1201     pg_log_t &log,
1202     const coll_t& coll,
1203     const ghobject_t &log_oid, map<eversion_t, hobject_t> &divergent_priors,
1204     bool require_rollback);
1205
1206   static void write_log_and_missing(
1207     ObjectStore::Transaction& t,
1208     map<string,bufferlist>* km,
1209     pg_log_t &log,
1210     const coll_t& coll,
1211     const ghobject_t &log_oid,
1212     const pg_missing_tracker_t &missing,
1213     bool require_rollback,
1214     bool *rebuilt_missing_set_with_deletes);
1215
1216   static void _write_log_and_missing_wo_missing(
1217     ObjectStore::Transaction& t,
1218     map<string,bufferlist>* km,
1219     pg_log_t &log,
1220     const coll_t& coll, const ghobject_t &log_oid,
1221     map<eversion_t, hobject_t> &divergent_priors,
1222     eversion_t dirty_to,
1223     eversion_t dirty_from,
1224     eversion_t writeout_from,
1225     const set<eversion_t> &trimmed,
1226     const set<string> &trimmed_dups,
1227     bool dirty_divergent_priors,
1228     bool touch_log,
1229     bool require_rollback,
1230     eversion_t dirty_to_dups,
1231     eversion_t dirty_from_dups,
1232     eversion_t write_from_dups,
1233     set<string> *log_keys_debug
1234     );
1235
1236   static void _write_log_and_missing(
1237     ObjectStore::Transaction& t,
1238     map<string,bufferlist>* km,
1239     pg_log_t &log,
1240     const coll_t& coll, const ghobject_t &log_oid,
1241     eversion_t dirty_to,
1242     eversion_t dirty_from,
1243     eversion_t writeout_from,
1244     const set<eversion_t> &trimmed,
1245     const set<string> &trimmed_dups,
1246     const pg_missing_tracker_t &missing,
1247     bool touch_log,
1248     bool require_rollback,
1249     bool clear_divergent_priors,
1250     eversion_t dirty_to_dups,
1251     eversion_t dirty_from_dups,
1252     eversion_t write_from_dups,
1253     bool *rebuilt_missing_with_deletes,
1254     set<string> *log_keys_debug
1255     );
1256
1257   void read_log_and_missing(
1258     ObjectStore *store,
1259     coll_t pg_coll,
1260     coll_t log_coll,
1261     ghobject_t log_oid,
1262     const pg_info_t &info,
1263     bool force_rebuild_missing,
1264     ostringstream &oss,
1265     bool tolerate_divergent_missing_log,
1266     bool debug_verify_stored_missing = false
1267     ) {
1268     return read_log_and_missing(
1269       store, pg_coll, log_coll, log_oid, info,
1270       log, missing, force_rebuild_missing, oss,
1271       tolerate_divergent_missing_log,
1272       &clear_divergent_priors,
1273       this,
1274       (pg_log_debug ? &log_keys_debug : nullptr),
1275       debug_verify_stored_missing);
1276   }
1277
1278   template <typename missing_type>
1279   static void read_log_and_missing(
1280     ObjectStore *store,
1281     coll_t pg_coll,
1282     coll_t log_coll,
1283     ghobject_t log_oid,
1284     const pg_info_t &info,
1285     IndexedLog &log,
1286     missing_type &missing,
1287     bool force_rebuild_missing,
1288     ostringstream &oss,
1289     bool tolerate_divergent_missing_log,
1290     bool *clear_divergent_priors = nullptr,
1291     const DoutPrefixProvider *dpp = nullptr,
1292     set<string> *log_keys_debug = nullptr,
1293     bool debug_verify_stored_missing = false
1294     ) {
1295     ldpp_dout(dpp, 20) << "read_log_and_missing coll " << pg_coll
1296                        << " log_oid " << log_oid << dendl;
1297
1298     // legacy?
1299     struct stat st;
1300     int r = store->stat(log_coll, log_oid, &st);
1301     assert(r == 0);
1302     assert(st.st_size == 0);
1303
1304     // will get overridden below if it had been recorded
1305     eversion_t on_disk_can_rollback_to = info.last_update;
1306     eversion_t on_disk_rollback_info_trimmed_to = eversion_t();
1307     ObjectMap::ObjectMapIterator p = store->get_omap_iterator(log_coll, log_oid);
1308     map<eversion_t, hobject_t> divergent_priors;
1309     bool must_rebuild = force_rebuild_missing;
1310     missing.may_include_deletes = false;
1311     list<pg_log_entry_t> entries;
1312     list<pg_log_dup_t> dups;
1313     if (p) {
1314       for (p->seek_to_first(); p->valid() ; p->next(false)) {
1315         // non-log pgmeta_oid keys are prefixed with _; skip those
1316         if (p->key()[0] == '_')
1317           continue;
1318         bufferlist bl = p->value();  // copy bufferlist before creating iterator
1319         bufferlist::iterator bp = bl.begin();
1320         if (p->key() == "divergent_priors") {
1321           ::decode(divergent_priors, bp);
1322           ldpp_dout(dpp, 20) << "read_log_and_missing " << divergent_priors.size()
1323                              << " divergent_priors" << dendl;
1324           must_rebuild = true;
1325           debug_verify_stored_missing = false;
1326         } else if (p->key() == "can_rollback_to") {
1327           ::decode(on_disk_can_rollback_to, bp);
1328         } else if (p->key() == "rollback_info_trimmed_to") {
1329           ::decode(on_disk_rollback_info_trimmed_to, bp);
1330         } else if (p->key() == "may_include_deletes_in_missing") {
1331           missing.may_include_deletes = true;
1332         } else if (p->key().substr(0, 7) == string("missing")) {
1333           hobject_t oid;
1334           pg_missing_item item;
1335           ::decode(oid, bp);
1336           ::decode(item, bp);
1337           if (item.is_delete()) {
1338             assert(missing.may_include_deletes);
1339           }
1340           missing.add(oid, item.need, item.have, item.is_delete());
1341         } else if (p->key().substr(0, 4) == string("dup_")) {
1342           pg_log_dup_t dup;
1343           ::decode(dup, bp);
1344           if (!dups.empty()) {
1345             assert(dups.back().version < dup.version);
1346           }
1347           dups.push_back(dup);
1348         } else {
1349           pg_log_entry_t e;
1350           e.decode_with_checksum(bp);
1351           ldpp_dout(dpp, 20) << "read_log_and_missing " << e << dendl;
1352           if (!entries.empty()) {
1353             pg_log_entry_t last_e(entries.back());
1354             assert(last_e.version.version < e.version.version);
1355             assert(last_e.version.epoch <= e.version.epoch);
1356           }
1357           entries.push_back(e);
1358           if (log_keys_debug)
1359             log_keys_debug->insert(e.get_key_name());
1360         }
1361       }
1362     }
1363     log = IndexedLog(
1364       info.last_update,
1365       info.log_tail,
1366       on_disk_can_rollback_to,
1367       on_disk_rollback_info_trimmed_to,
1368       std::move(entries),
1369       std::move(dups));
1370
1371     if (must_rebuild || debug_verify_stored_missing) {
1372       // build missing
1373       if (debug_verify_stored_missing || info.last_complete < info.last_update) {
1374         ldpp_dout(dpp, 10)
1375           << "read_log_and_missing checking for missing items over interval ("
1376           << info.last_complete
1377           << "," << info.last_update << "]" << dendl;
1378
1379         set<hobject_t> did;
1380         set<hobject_t> checked;
1381         set<hobject_t> skipped;
1382         for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin();
1383              i != log.log.rend();
1384              ++i) {
1385           if (!debug_verify_stored_missing && i->version <= info.last_complete) break;
1386           if (i->soid > info.last_backfill)
1387             continue;
1388           if (i->is_error())
1389             continue;
1390           if (did.count(i->soid)) continue;
1391           did.insert(i->soid);
1392
1393           if (!missing.may_include_deletes && i->is_delete())
1394             continue;
1395
1396           bufferlist bv;
1397           int r = store->getattr(
1398             pg_coll,
1399             ghobject_t(i->soid, ghobject_t::NO_GEN, info.pgid.shard),
1400             OI_ATTR,
1401             bv);
1402           if (r >= 0) {
1403             object_info_t oi(bv);
1404             if (oi.version < i->version) {
1405               ldpp_dout(dpp, 15) << "read_log_and_missing  missing " << *i
1406                                  << " (have " << oi.version << ")" << dendl;
1407               if (debug_verify_stored_missing) {
1408                 auto miter = missing.get_items().find(i->soid);
1409                 assert(miter != missing.get_items().end());
1410                 assert(miter->second.need == i->version);
1411                 // the 'have' version is reset if an object is deleted,
1412                 // then created again
1413                 assert(miter->second.have == oi.version || miter->second.have == eversion_t());
1414                 checked.insert(i->soid);
1415               } else {
1416                 missing.add(i->soid, i->version, oi.version, i->is_delete());
1417               }
1418             }
1419           } else {
1420             ldpp_dout(dpp, 15) << "read_log_and_missing  missing " << *i << dendl;
1421             if (debug_verify_stored_missing) {
1422               auto miter = missing.get_items().find(i->soid);
1423               if (i->is_delete()) {
1424                 assert(miter == missing.get_items().end() ||
1425                        (miter->second.need == i->version &&
1426                         miter->second.have == eversion_t()));
1427               } else {
1428                 assert(miter != missing.get_items().end());
1429                 assert(miter->second.need == i->version);
1430                 assert(miter->second.have == eversion_t());
1431               }
1432               checked.insert(i->soid);
1433             } else {
1434               missing.add(i->soid, i->version, eversion_t(), i->is_delete());
1435             }
1436           }
1437         }
1438         if (debug_verify_stored_missing) {
1439           for (auto &&i: missing.get_items()) {
1440             if (checked.count(i.first))
1441               continue;
1442             if (i.first > info.last_backfill) {
1443               ldpp_dout(dpp, -1) << __func__ << ": invalid missing set entry "
1444                                 << "found before last_backfill: "
1445                                 << i.first << " " << i.second
1446                                 << " last_backfill = " << info.last_backfill
1447                                 << dendl;
1448               assert(0 == "invalid missing set entry found");
1449             }
1450             bufferlist bv;
1451             int r = store->getattr(
1452               pg_coll,
1453               ghobject_t(i.first, ghobject_t::NO_GEN, info.pgid.shard),
1454               OI_ATTR,
1455               bv);
1456             if (r >= 0) {
1457               object_info_t oi(bv);
1458               assert(oi.version == i.second.have);
1459             } else {
1460               assert(i.second.is_delete() || eversion_t() == i.second.have);
1461             }
1462           }
1463         } else {
1464           assert(must_rebuild);
1465           for (map<eversion_t, hobject_t>::reverse_iterator i =
1466                  divergent_priors.rbegin();
1467                i != divergent_priors.rend();
1468                ++i) {
1469             if (i->first <= info.last_complete) break;
1470             if (i->second > info.last_backfill)
1471               continue;
1472             if (did.count(i->second)) continue;
1473             did.insert(i->second);
1474             bufferlist bv;
1475             int r = store->getattr(
1476               pg_coll,
1477               ghobject_t(i->second, ghobject_t::NO_GEN, info.pgid.shard),
1478               OI_ATTR,
1479               bv);
1480             if (r >= 0) {
1481               object_info_t oi(bv);
1482               /**
1483                  * 1) we see this entry in the divergent priors mapping
1484                  * 2) we didn't see an entry for this object in the log
1485                  *
1486                  * From 1 & 2 we know that either the object does not exist
1487                  * or it is at the version specified in the divergent_priors
1488                  * map since the object would have been deleted atomically
1489                  * with the addition of the divergent_priors entry, an older
1490                  * version would not have been recovered, and a newer version
1491                  * would show up in the log above.
1492                  */
1493                 /**
1494                  * Unfortunately the assessment above is incorrect because of
1495                  * http://tracker.ceph.com/issues/17916 (we were incorrectly
1496                  * not removing the divergent_priors set from disk state!),
1497                  * so let's check that.
1498                  */
1499               if (oi.version > i->first && tolerate_divergent_missing_log) {
1500                 ldpp_dout(dpp, 0) << "read_log divergent_priors entry (" << *i
1501                                   << ") inconsistent with disk state (" <<  oi
1502                                   << "), assuming it is tracker.ceph.com/issues/17916"
1503                                   << dendl;
1504               } else {
1505                 assert(oi.version == i->first);
1506               }
1507             } else {
1508               ldpp_dout(dpp, 15) << "read_log_and_missing  missing " << *i << dendl;
1509               missing.add(i->second, i->first, eversion_t(), false);
1510             }
1511           }
1512         }
1513         if (clear_divergent_priors)
1514           (*clear_divergent_priors) = true;
1515       }
1516     }
1517
1518     if (!must_rebuild) {
1519       if (clear_divergent_priors)
1520         (*clear_divergent_priors) = false;
1521       missing.flush();
1522     }
1523     ldpp_dout(dpp, 10) << "read_log_and_missing done" << dendl;
1524   } // static read_log_and_missing
1525 }; // struct PGLog