Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / PGLog.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
8  *
9  * Author: Loic Dachary <loic@dachary.org>
10  *
11  * This is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Lesser General Public
13  * License version 2.1, as published by the Free Software
14  * Foundation.  See file COPYING.
15  *
16  */
17
18 #include "PGLog.h"
19 #include "include/unordered_map.h"
20 #include "common/ceph_context.h"
21
22 #define dout_context cct
23 #define dout_subsys ceph_subsys_osd
24 #undef dout_prefix
25 #define dout_prefix _prefix(_dout, this)
26
27 static ostream& _prefix(std::ostream *_dout, const PGLog *pglog)
28 {
29   return *_dout << pglog->gen_prefix();
30 }
31
32 //////////////////// PGLog::IndexedLog ////////////////////
33
34 void PGLog::IndexedLog::split_out_child(
35   pg_t child_pgid,
36   unsigned split_bits,
37   PGLog::IndexedLog *target)
38 {
39   unindex();
40   *target = IndexedLog(pg_log_t::split_out_child(child_pgid, split_bits));
41   index();
42   target->index();
43   reset_rollback_info_trimmed_to_riter();
44 }
45
46 void PGLog::IndexedLog::trim(
47   CephContext* cct,
48   eversion_t s,
49   set<eversion_t> *trimmed,
50   set<string>* trimmed_dups,
51   eversion_t *write_from_dups)
52 {
53   if (complete_to != log.end() &&
54       complete_to->version <= s) {
55     generic_dout(0) << " bad trim to " << s << " when complete_to is "
56                     << complete_to->version
57                     << " on " << *this << dendl;
58   }
59
60   assert(s <= can_rollback_to);
61
62   auto earliest_dup_version =
63     log.rbegin()->version.version < cct->_conf->osd_pg_log_dups_tracked
64     ? 0u
65     : log.rbegin()->version.version - cct->_conf->osd_pg_log_dups_tracked;
66
67   while (!log.empty()) {
68     const pg_log_entry_t &e = *log.begin();
69     if (e.version > s)
70       break;
71     generic_dout(20) << "trim " << e << dendl;
72     if (trimmed)
73       trimmed->insert(e.version);
74
75     unindex(e);         // remove from index,
76
77     // add to dup list
78     generic_dout(20) << "earliest_dup_version = " << earliest_dup_version << dendl;
79     if (e.version.version >= earliest_dup_version) {
80       if (write_from_dups != nullptr && *write_from_dups > e.version) {
81         generic_dout(20) << "updating write_from_dups from " << *write_from_dups << " to " << e.version << dendl;
82         *write_from_dups = e.version;
83       }
84       dups.push_back(pg_log_dup_t(e));
85       index(dups.back());
86       for (const auto& extra : e.extra_reqids) {
87         // note: extras have the same version as outer op
88         dups.push_back(pg_log_dup_t(e.version, extra.second,
89                                     extra.first, e.return_code));
90         index(dups.back());
91       }
92     }
93
94     if (rollback_info_trimmed_to_riter == log.rend() ||
95         e.version == rollback_info_trimmed_to_riter->version) {
96       log.pop_front();
97       rollback_info_trimmed_to_riter = log.rend();
98     } else {
99       log.pop_front();
100     }
101   }
102
103   while (!dups.empty()) {
104     const auto& e = *dups.begin();
105     if (e.version.version >= earliest_dup_version)
106       break;
107     generic_dout(20) << "trim dup " << e << dendl;
108     if (trimmed_dups)
109       trimmed_dups->insert(e.get_key_name());
110     if (indexed_data & PGLOG_INDEXED_DUPS) {
111       dup_index.erase(e.reqid);
112     }
113     dups.pop_front();
114   }
115
116   // raise tail?
117   if (tail < s)
118     tail = s;
119 }
120
121 ostream& PGLog::IndexedLog::print(ostream& out) const
122 {
123   out << *this << std::endl;
124   for (list<pg_log_entry_t>::const_iterator p = log.begin();
125        p != log.end();
126        ++p) {
127     out << *p << " " <<
128       (logged_object(p->soid) ? "indexed" : "NOT INDEXED") <<
129       std::endl;
130     assert(!p->reqid_is_indexed() || logged_req(p->reqid));
131   }
132
133   for (list<pg_log_dup_t>::const_iterator p = dups.begin();
134        p != dups.end();
135        ++p) {
136     out << *p << std::endl;
137   }
138
139   return out;
140 }
141
142 //////////////////// PGLog ////////////////////
143
144 void PGLog::reset_backfill()
145 {
146   missing.clear();
147 }
148
149 void PGLog::clear() {
150   missing.clear();
151   log.clear();
152   log_keys_debug.clear();
153   undirty();
154 }
155
156 void PGLog::clear_info_log(
157   spg_t pgid,
158   ObjectStore::Transaction *t) {
159   coll_t coll(pgid);
160   t->remove(coll, pgid.make_pgmeta_oid());
161 }
162
163 void PGLog::trim(
164   eversion_t trim_to,
165   pg_info_t &info)
166 {
167   // trim?
168   if (trim_to > log.tail) {
169     // We shouldn't be trimming the log past last_complete
170     assert(trim_to <= info.last_complete);
171
172     dout(10) << "trim " << log << " to " << trim_to << dendl;
173     log.trim(cct, trim_to, &trimmed, &trimmed_dups, &write_from_dups);
174     info.log_tail = log.tail;
175   }
176 }
177
178 void PGLog::proc_replica_log(
179   pg_info_t &oinfo,
180   const pg_log_t &olog,
181   pg_missing_t& omissing,
182   pg_shard_t from) const
183 {
184   dout(10) << "proc_replica_log for osd." << from << ": "
185            << oinfo << " " << olog << " " << omissing << dendl;
186
187   if (olog.head < log.tail) {
188     dout(10) << __func__ << ": osd." << from << " does not overlap, not looking "
189              << "for divergent objects" << dendl;
190     return;
191   }
192   if (olog.head == log.head) {
193     dout(10) << __func__ << ": osd." << from << " same log head, not looking "
194              << "for divergent objects" << dendl;
195     return;
196   }
197   assert(olog.head >= log.tail);
198
199   /*
200     basically what we're doing here is rewinding the remote log,
201     dropping divergent entries, until we find something that matches
202     our master log.  we then reset last_update to reflect the new
203     point up to which missing is accurate.
204
205     later, in activate(), missing will get wound forward again and
206     we will send the peer enough log to arrive at the same state.
207   */
208
209   for (map<hobject_t, pg_missing_item>::const_iterator i = omissing.get_items().begin();
210        i != omissing.get_items().end();
211        ++i) {
212     dout(20) << " before missing " << i->first << " need " << i->second.need
213              << " have " << i->second.have << dendl;
214   }
215
216   list<pg_log_entry_t>::const_reverse_iterator first_non_divergent =
217     log.log.rbegin();
218   while (1) {
219     if (first_non_divergent == log.log.rend())
220       break;
221     if (first_non_divergent->version <= olog.head) {
222       dout(20) << "merge_log point (usually last shared) is "
223                << *first_non_divergent << dendl;
224       break;
225     }
226     ++first_non_divergent;
227   }
228
229   /* Because olog.head >= log.tail, we know that both pgs must at least have
230    * the event represented by log.tail.  Similarly, because log.head >= olog.tail,
231    * we know that the even represented by olog.tail must be common to both logs.
232    * Furthermore, the event represented by a log tail was necessarily trimmed,
233    * thus neither olog.tail nor log.tail can be divergent. It's
234    * possible that olog/log contain no actual events between olog.head and
235    * MAX(log.tail, olog.tail), however, since they might have been split out.
236    * Thus, if we cannot find an event e such that
237    * log.tail <= e.version <= log.head, the last_update must actually be
238    * MAX(log.tail, olog.tail).
239    */
240   eversion_t limit = MAX(olog.tail, log.tail);
241   eversion_t lu =
242     (first_non_divergent == log.log.rend() ||
243      first_non_divergent->version < limit) ?
244     limit :
245     first_non_divergent->version;
246
247   IndexedLog folog(olog);
248   auto divergent = folog.rewind_from_head(lu);
249   _merge_divergent_entries(
250     folog,
251     divergent,
252     oinfo,
253     olog.get_can_rollback_to(),
254     omissing,
255     0,
256     this);
257
258   if (lu < oinfo.last_update) {
259     dout(10) << " peer osd." << from << " last_update now " << lu << dendl;
260     oinfo.last_update = lu;
261   }
262
263   if (omissing.have_missing()) {
264     eversion_t first_missing =
265       omissing.get_items().at(omissing.get_rmissing().begin()->second).need;
266     oinfo.last_complete = eversion_t();
267     list<pg_log_entry_t>::const_iterator i = olog.log.begin();
268     for (;
269          i != olog.log.end();
270          ++i) {
271       if (i->version < first_missing)
272         oinfo.last_complete = i->version;
273       else
274         break;
275     }
276   } else {
277     oinfo.last_complete = oinfo.last_update;
278   }
279 } // proc_replica_log
280
281 /**
282  * rewind divergent entries at the head of the log
283  *
284  * This rewinds entries off the head of our log that are divergent.
285  * This is used by replicas during activation.
286  *
287  * @param newhead new head to rewind to
288  */
289 void PGLog::rewind_divergent_log(eversion_t newhead,
290                                  pg_info_t &info, LogEntryHandler *rollbacker,
291                                  bool &dirty_info, bool &dirty_big_info)
292 {
293   dout(10) << "rewind_divergent_log truncate divergent future " <<
294     newhead << dendl;
295
296
297   if (info.last_complete > newhead)
298     info.last_complete = newhead;
299
300   auto divergent = log.rewind_from_head(newhead);
301   if (!divergent.empty()) {
302     mark_dirty_from(divergent.front().version);
303   }
304   for (auto &&entry: divergent) {
305     dout(10) << "rewind_divergent_log future divergent " << entry << dendl;
306   }
307   info.last_update = newhead;
308
309   _merge_divergent_entries(
310     log,
311     divergent,
312     info,
313     log.get_can_rollback_to(),
314     missing,
315     rollbacker,
316     this);
317
318   dirty_info = true;
319   dirty_big_info = true;
320 }
321
322 void PGLog::merge_log(pg_info_t &oinfo, pg_log_t &olog, pg_shard_t fromosd,
323                       pg_info_t &info, LogEntryHandler *rollbacker,
324                       bool &dirty_info, bool &dirty_big_info)
325 {
326   dout(10) << "merge_log " << olog << " from osd." << fromosd
327            << " into " << log << dendl;
328
329   // Check preconditions
330
331   // If our log is empty, the incoming log needs to have not been trimmed.
332   assert(!log.null() || olog.tail == eversion_t());
333   // The logs must overlap.
334   assert(log.head >= olog.tail && olog.head >= log.tail);
335
336   for (map<hobject_t, pg_missing_item>::const_iterator i = missing.get_items().begin();
337        i != missing.get_items().end();
338        ++i) {
339     dout(20) << "pg_missing_t sobject: " << i->first << dendl;
340   }
341
342   bool changed = false;
343
344   // extend on tail?
345   //  this is just filling in history.  it does not affect our
346   //  missing set, as that should already be consistent with our
347   //  current log.
348   eversion_t orig_tail = log.tail;
349   if (olog.tail < log.tail) {
350     dout(10) << "merge_log extending tail to " << olog.tail << dendl;
351     list<pg_log_entry_t>::iterator from = olog.log.begin();
352     list<pg_log_entry_t>::iterator to;
353     eversion_t last;
354     for (to = from;
355          to != olog.log.end();
356          ++to) {
357       if (to->version > log.tail)
358         break;
359       log.index(*to);
360       dout(15) << *to << dendl;
361       last = to->version;
362     }
363     mark_dirty_to(last);
364
365     // splice into our log.
366     log.log.splice(log.log.begin(),
367                    olog.log, from, to);
368
369     info.log_tail = log.tail = olog.tail;
370     changed = true;
371   }
372
373   if (oinfo.stats.reported_seq < info.stats.reported_seq ||   // make sure reported always increases
374       oinfo.stats.reported_epoch < info.stats.reported_epoch) {
375     oinfo.stats.reported_seq = info.stats.reported_seq;
376     oinfo.stats.reported_epoch = info.stats.reported_epoch;
377   }
378   if (info.last_backfill.is_max())
379     info.stats = oinfo.stats;
380   info.hit_set = oinfo.hit_set;
381
382   // do we have divergent entries to throw out?
383   if (olog.head < log.head) {
384     rewind_divergent_log(olog.head, info, rollbacker, dirty_info, dirty_big_info);
385     changed = true;
386   }
387
388   // extend on head?
389   if (olog.head > log.head) {
390     dout(10) << "merge_log extending head to " << olog.head << dendl;
391
392     // find start point in olog
393     list<pg_log_entry_t>::iterator to = olog.log.end();
394     list<pg_log_entry_t>::iterator from = olog.log.end();
395     eversion_t lower_bound = MAX(olog.tail, orig_tail);
396     while (1) {
397       if (from == olog.log.begin())
398         break;
399       --from;
400       dout(20) << "  ? " << *from << dendl;
401       if (from->version <= log.head) {
402         lower_bound = MAX(lower_bound, from->version);
403         ++from;
404         break;
405       }
406     }
407     dout(20) << "merge_log cut point (usually last shared) is "
408              << lower_bound << dendl;
409     mark_dirty_from(lower_bound);
410
411     auto divergent = log.rewind_from_head(lower_bound);
412     // move aside divergent items
413     for (auto &&oe: divergent) {
414       dout(10) << "merge_log divergent " << oe << dendl;
415     }
416     log.roll_forward_to(log.head, rollbacker);
417
418     mempool::osd_pglog::list<pg_log_entry_t> new_entries;
419     new_entries.splice(new_entries.end(), olog.log, from, to);
420     append_log_entries_update_missing(
421       info.last_backfill,
422       info.last_backfill_bitwise,
423       new_entries,
424       false,
425       &log,
426       missing,
427       rollbacker,
428       this);
429
430     _merge_divergent_entries(
431       log,
432       divergent,
433       info,
434       log.get_can_rollback_to(),
435       missing,
436       rollbacker,
437       this);
438
439     info.last_update = log.head = olog.head;
440
441     // We cannot rollback into the new log entries
442     log.skip_can_rollback_to_to_head();
443
444     info.last_user_version = oinfo.last_user_version;
445     info.purged_snaps = oinfo.purged_snaps;
446
447     changed = true;
448   }
449
450   // now handle dups
451   if (merge_log_dups(olog)) {
452     changed = true;
453   }
454
455   dout(10) << "merge_log result " << log << " " << missing <<
456     " changed=" << changed << dendl;
457
458   if (changed) {
459     dirty_info = true;
460     dirty_big_info = true;
461   }
462 }
463
464
465 // returns true if any changes were made to log.dups
466 bool PGLog::merge_log_dups(const pg_log_t& olog) {
467   bool changed = false;
468
469   if (!olog.dups.empty()) {
470     if (log.dups.empty()) {
471       dout(10) << "merge_log copying olog dups to log " <<
472         olog.dups.front().version << " to " <<
473         olog.dups.back().version << dendl;
474       changed = true;
475       dirty_from_dups = eversion_t();
476       dirty_to_dups = eversion_t::max();
477       // since our log.dups is empty just copy them
478       for (const auto& i : olog.dups) {
479         log.dups.push_back(i);
480         log.index(log.dups.back());
481       }
482     } else {
483       // since our log.dups is not empty try to extend on each end
484
485       if (olog.dups.back().version > log.dups.back().version) {
486         // extend the dups's tail (i.e., newer dups)
487         dout(10) << "merge_log extending dups tail to " <<
488           olog.dups.back().version << dendl;
489         changed = true;
490
491         auto log_tail_version = log.dups.back().version;
492
493         auto insert_cursor = log.dups.end();
494         eversion_t last_shared = eversion_t::max();
495         for (auto i = olog.dups.crbegin(); i != olog.dups.crend(); ++i) {
496           if (i->version <= log_tail_version) break;
497           log.dups.insert(insert_cursor, *i);
498           last_shared = i->version;
499
500           auto prev = insert_cursor;
501           --prev;
502           // be sure to pass reference of copy in log.dups
503           log.index(*prev);
504
505           --insert_cursor; // make sure we insert in reverse order
506         }
507         mark_dirty_from_dups(last_shared);
508       }
509
510       if (olog.dups.front().version < log.dups.front().version) {
511         // extend the dups's head (i.e., older dups)
512         dout(10) << "merge_log extending dups head to " <<
513           olog.dups.front().version << dendl;
514         changed = true;
515
516         eversion_t last;
517         auto insert_cursor = log.dups.begin();
518         for (auto i = olog.dups.cbegin(); i != olog.dups.cend(); ++i) {
519           if (i->version >= insert_cursor->version) break;
520           log.dups.insert(insert_cursor, *i);
521           last = i->version;
522           auto prev = insert_cursor;
523           --prev;
524           // be sure to pass address of copy in log.dups
525           log.index(*prev);
526         }
527         mark_dirty_to_dups(last);
528       }
529     }
530   }
531
532   // remove any dup entries that overlap with pglog
533   if (!log.dups.empty() && log.dups.back().version >= log.tail) {
534     dout(10) << "merge_log removed dups overlapping log entries [" <<
535       log.tail << "," << log.dups.back().version << "]" << dendl;
536     changed = true;
537
538     while (!log.dups.empty() && log.dups.back().version >= log.tail) {
539       log.unindex(log.dups.back());
540       mark_dirty_from_dups(log.dups.back().version);
541       log.dups.pop_back();
542     }
543   }
544
545   return changed;
546 }
547
548 void PGLog::check() {
549   if (!pg_log_debug)
550     return;
551   if (log.log.size() != log_keys_debug.size()) {
552     derr << "log.log.size() != log_keys_debug.size()" << dendl;
553     derr << "actual log:" << dendl;
554     for (list<pg_log_entry_t>::iterator i = log.log.begin();
555          i != log.log.end();
556          ++i) {
557       derr << "    " << *i << dendl;
558     }
559     derr << "log_keys_debug:" << dendl;
560     for (set<string>::const_iterator i = log_keys_debug.begin();
561          i != log_keys_debug.end();
562          ++i) {
563       derr << "    " << *i << dendl;
564     }
565   }
566   assert(log.log.size() == log_keys_debug.size());
567   for (list<pg_log_entry_t>::iterator i = log.log.begin();
568        i != log.log.end();
569        ++i) {
570     assert(log_keys_debug.count(i->get_key_name()));
571   }
572 }
573
574 // non-static
575 void PGLog::write_log_and_missing(
576   ObjectStore::Transaction& t,
577   map<string,bufferlist> *km,
578   const coll_t& coll,
579   const ghobject_t &log_oid,
580   bool require_rollback)
581 {
582   if (is_dirty()) {
583     dout(5) << "write_log_and_missing with: "
584              << "dirty_to: " << dirty_to
585              << ", dirty_from: " << dirty_from
586              << ", writeout_from: " << writeout_from
587              << ", trimmed: " << trimmed
588              << ", trimmed_dups: " << trimmed_dups
589              << ", clear_divergent_priors: " << clear_divergent_priors
590              << dendl;
591     _write_log_and_missing(
592       t, km, log, coll, log_oid,
593       dirty_to,
594       dirty_from,
595       writeout_from,
596       trimmed,
597       trimmed_dups,
598       missing,
599       !touched_log,
600       require_rollback,
601       clear_divergent_priors,
602       dirty_to_dups,
603       dirty_from_dups,
604       write_from_dups,
605       &rebuilt_missing_with_deletes,
606       (pg_log_debug ? &log_keys_debug : nullptr));
607     undirty();
608   } else {
609     dout(10) << "log is not dirty" << dendl;
610   }
611 }
612
613 // static
614 void PGLog::write_log_and_missing_wo_missing(
615     ObjectStore::Transaction& t,
616     map<string,bufferlist> *km,
617     pg_log_t &log,
618     const coll_t& coll, const ghobject_t &log_oid,
619     map<eversion_t, hobject_t> &divergent_priors,
620     bool require_rollback
621     )
622 {
623   _write_log_and_missing_wo_missing(
624     t, km, log, coll, log_oid,
625     divergent_priors, eversion_t::max(), eversion_t(), eversion_t(),
626     set<eversion_t>(),
627     set<string>(),
628     true, true, require_rollback,
629     eversion_t::max(), eversion_t(), eversion_t(), nullptr);
630 }
631
632 // static
633 void PGLog::write_log_and_missing(
634     ObjectStore::Transaction& t,
635     map<string,bufferlist> *km,
636     pg_log_t &log,
637     const coll_t& coll,
638     const ghobject_t &log_oid,
639     const pg_missing_tracker_t &missing,
640     bool require_rollback,
641     bool *rebuilt_missing_with_deletes)
642 {
643   _write_log_and_missing(
644     t, km, log, coll, log_oid,
645     eversion_t::max(),
646     eversion_t(),
647     eversion_t(),
648     set<eversion_t>(),
649     set<string>(),
650     missing,
651     true, require_rollback, false,
652     eversion_t::max(),
653     eversion_t(),
654     eversion_t(),
655     rebuilt_missing_with_deletes, nullptr);
656 }
657
658 // static
659 void PGLog::_write_log_and_missing_wo_missing(
660   ObjectStore::Transaction& t,
661   map<string,bufferlist> *km,
662   pg_log_t &log,
663   const coll_t& coll, const ghobject_t &log_oid,
664   map<eversion_t, hobject_t> &divergent_priors,
665   eversion_t dirty_to,
666   eversion_t dirty_from,
667   eversion_t writeout_from,
668   const set<eversion_t> &trimmed,
669   const set<string> &trimmed_dups,
670   bool dirty_divergent_priors,
671   bool touch_log,
672   bool require_rollback,
673   eversion_t dirty_to_dups,
674   eversion_t dirty_from_dups,
675   eversion_t write_from_dups,
676   set<string> *log_keys_debug
677   )
678 {
679   set<string> to_remove(trimmed_dups);
680   for (set<eversion_t>::const_iterator i = trimmed.begin();
681        i != trimmed.end();
682        ++i) {
683     to_remove.insert(i->get_key_name());
684     if (log_keys_debug) {
685       assert(log_keys_debug->count(i->get_key_name()));
686       log_keys_debug->erase(i->get_key_name());
687     }
688   }
689
690   // dout(10) << "write_log_and_missing, clearing up to " << dirty_to << dendl;
691   if (touch_log)
692     t.touch(coll, log_oid);
693   if (dirty_to != eversion_t()) {
694     t.omap_rmkeyrange(
695       coll, log_oid,
696       eversion_t().get_key_name(), dirty_to.get_key_name());
697     clear_up_to(log_keys_debug, dirty_to.get_key_name());
698   }
699   if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) {
700     // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
701     t.omap_rmkeyrange(
702       coll, log_oid,
703       dirty_from.get_key_name(), eversion_t::max().get_key_name());
704     clear_after(log_keys_debug, dirty_from.get_key_name());
705   }
706
707   for (list<pg_log_entry_t>::iterator p = log.log.begin();
708        p != log.log.end() && p->version <= dirty_to;
709        ++p) {
710     bufferlist bl(sizeof(*p) * 2);
711     p->encode_with_checksum(bl);
712     (*km)[p->get_key_name()].claim(bl);
713   }
714
715   for (list<pg_log_entry_t>::reverse_iterator p = log.log.rbegin();
716        p != log.log.rend() &&
717          (p->version >= dirty_from || p->version >= writeout_from) &&
718          p->version >= dirty_to;
719        ++p) {
720     bufferlist bl(sizeof(*p) * 2);
721     p->encode_with_checksum(bl);
722     (*km)[p->get_key_name()].claim(bl);
723   }
724
725   if (log_keys_debug) {
726     for (map<string, bufferlist>::iterator i = (*km).begin();
727          i != (*km).end();
728          ++i) {
729       if (i->first[0] == '_')
730         continue;
731       assert(!log_keys_debug->count(i->first));
732       log_keys_debug->insert(i->first);
733     }
734   }
735
736   // process dups after log_keys_debug is filled, so dups do not
737   // end up in that set
738   if (dirty_to_dups != eversion_t()) {
739     pg_log_dup_t min, dirty_to_dup;
740     dirty_to_dup.version = dirty_to_dups;
741     t.omap_rmkeyrange(
742       coll, log_oid,
743       min.get_key_name(), dirty_to_dup.get_key_name());
744   }
745   if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) {
746     pg_log_dup_t max, dirty_from_dup;
747     max.version = eversion_t::max();
748     dirty_from_dup.version = dirty_from_dups;
749     t.omap_rmkeyrange(
750       coll, log_oid,
751       dirty_from_dup.get_key_name(), max.get_key_name());
752   }
753
754   for (const auto& entry : log.dups) {
755     if (entry.version > dirty_to_dups)
756       break;
757     bufferlist bl;
758     ::encode(entry, bl);
759     (*km)[entry.get_key_name()].claim(bl);
760   }
761
762   for (list<pg_log_dup_t>::reverse_iterator p = log.dups.rbegin();
763        p != log.dups.rend() &&
764          (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
765          p->version >= dirty_to_dups;
766        ++p) {
767     bufferlist bl;
768     ::encode(*p, bl);
769     (*km)[p->get_key_name()].claim(bl);
770   }
771
772   if (dirty_divergent_priors) {
773     //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
774     ::encode(divergent_priors, (*km)["divergent_priors"]);
775   }
776   if (require_rollback) {
777     ::encode(
778       log.get_can_rollback_to(),
779       (*km)["can_rollback_to"]);
780     ::encode(
781       log.get_rollback_info_trimmed_to(),
782       (*km)["rollback_info_trimmed_to"]);
783   }
784
785   if (!to_remove.empty())
786     t.omap_rmkeys(coll, log_oid, to_remove);
787 }
788
789 // static
790 void PGLog::_write_log_and_missing(
791   ObjectStore::Transaction& t,
792   map<string,bufferlist>* km,
793   pg_log_t &log,
794   const coll_t& coll, const ghobject_t &log_oid,
795   eversion_t dirty_to,
796   eversion_t dirty_from,
797   eversion_t writeout_from,
798   const set<eversion_t> &trimmed,
799   const set<string> &trimmed_dups,
800   const pg_missing_tracker_t &missing,
801   bool touch_log,
802   bool require_rollback,
803   bool clear_divergent_priors,
804   eversion_t dirty_to_dups,
805   eversion_t dirty_from_dups,
806   eversion_t write_from_dups,
807   bool *rebuilt_missing_with_deletes, // in/out param
808   set<string> *log_keys_debug
809   ) {
810   set<string> to_remove(trimmed_dups);
811   for (set<eversion_t>::const_iterator i = trimmed.begin();
812        i != trimmed.end();
813        ++i) {
814     to_remove.insert(i->get_key_name());
815     if (log_keys_debug) {
816       assert(log_keys_debug->count(i->get_key_name()));
817       log_keys_debug->erase(i->get_key_name());
818     }
819   }
820
821   if (touch_log)
822     t.touch(coll, log_oid);
823   if (dirty_to != eversion_t()) {
824     t.omap_rmkeyrange(
825       coll, log_oid,
826       eversion_t().get_key_name(), dirty_to.get_key_name());
827     clear_up_to(log_keys_debug, dirty_to.get_key_name());
828   }
829   if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) {
830     //   dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
831     t.omap_rmkeyrange(
832       coll, log_oid,
833       dirty_from.get_key_name(), eversion_t::max().get_key_name());
834     clear_after(log_keys_debug, dirty_from.get_key_name());
835   }
836
837   for (list<pg_log_entry_t>::iterator p = log.log.begin();
838        p != log.log.end() && p->version <= dirty_to;
839        ++p) {
840     bufferlist bl(sizeof(*p) * 2);
841     p->encode_with_checksum(bl);
842     (*km)[p->get_key_name()].claim(bl);
843   }
844
845   for (list<pg_log_entry_t>::reverse_iterator p = log.log.rbegin();
846        p != log.log.rend() &&
847          (p->version >= dirty_from || p->version >= writeout_from) &&
848          p->version >= dirty_to;
849        ++p) {
850     bufferlist bl(sizeof(*p) * 2);
851     p->encode_with_checksum(bl);
852     (*km)[p->get_key_name()].claim(bl);
853   }
854
855   if (log_keys_debug) {
856     for (map<string, bufferlist>::iterator i = (*km).begin();
857          i != (*km).end();
858          ++i) {
859       if (i->first[0] == '_')
860         continue;
861       assert(!log_keys_debug->count(i->first));
862       log_keys_debug->insert(i->first);
863     }
864   }
865
866   // process dups after log_keys_debug is filled, so dups do not
867   // end up in that set
868   if (dirty_to_dups != eversion_t()) {
869     pg_log_dup_t min, dirty_to_dup;
870     dirty_to_dup.version = dirty_to_dups;
871     t.omap_rmkeyrange(
872       coll, log_oid,
873       min.get_key_name(), dirty_to_dup.get_key_name());
874   }
875   if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) {
876     pg_log_dup_t max, dirty_from_dup;
877     max.version = eversion_t::max();
878     dirty_from_dup.version = dirty_from_dups;
879     t.omap_rmkeyrange(
880       coll, log_oid,
881       dirty_from_dup.get_key_name(), max.get_key_name());
882   }
883
884   for (const auto& entry : log.dups) {
885     if (entry.version > dirty_to_dups)
886       break;
887     bufferlist bl;
888     ::encode(entry, bl);
889     (*km)[entry.get_key_name()].claim(bl);
890   }
891
892   for (list<pg_log_dup_t>::reverse_iterator p = log.dups.rbegin();
893        p != log.dups.rend() &&
894          (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
895          p->version >= dirty_to_dups;
896        ++p) {
897     bufferlist bl;
898     ::encode(*p, bl);
899     (*km)[p->get_key_name()].claim(bl);
900   }
901
902   if (clear_divergent_priors) {
903     //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
904     to_remove.insert("divergent_priors");
905   }
906   // since we encode individual missing items instead of a whole
907   // missing set, we need another key to store this bit of state
908   if (*rebuilt_missing_with_deletes) {
909     (*km)["may_include_deletes_in_missing"] = bufferlist();
910     *rebuilt_missing_with_deletes = false;
911   }
912   missing.get_changed(
913     [&](const hobject_t &obj) {
914       string key = string("missing/") + obj.to_str();
915       pg_missing_item item;
916       if (!missing.is_missing(obj, &item)) {
917         to_remove.insert(key);
918       } else {
919         uint64_t features = missing.may_include_deletes ? CEPH_FEATURE_OSD_RECOVERY_DELETES : 0;
920         ::encode(make_pair(obj, item), (*km)[key], features);
921       }
922     });
923   if (require_rollback) {
924     ::encode(
925       log.get_can_rollback_to(),
926       (*km)["can_rollback_to"]);
927     ::encode(
928       log.get_rollback_info_trimmed_to(),
929       (*km)["rollback_info_trimmed_to"]);
930   }
931
932   if (!to_remove.empty())
933     t.omap_rmkeys(coll, log_oid, to_remove);
934 }
935
936 void PGLog::rebuild_missing_set_with_deletes(ObjectStore *store,
937                                              coll_t pg_coll,
938                                              const pg_info_t &info)
939 {
940   // save entries not generated from the current log (e.g. added due
941   // to repair, EIO handling, or divergent_priors).
942   map<hobject_t, pg_missing_item> extra_missing;
943   for (const auto& p : missing.get_items()) {
944     if (!log.logged_object(p.first)) {
945       dout(20) << __func__ << " extra missing entry: " << p.first
946                << " " << p.second << dendl;
947       extra_missing[p.first] = p.second;
948     }
949   }
950   missing.clear();
951   missing.may_include_deletes = true;
952
953   // go through the log and add items that are not present or older
954   // versions on disk, just as if we were reading the log + metadata
955   // off disk originally
956   set<hobject_t> did;
957   for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin();
958        i != log.log.rend();
959        ++i) {
960     if (i->version <= info.last_complete)
961       break;
962     if (i->soid > info.last_backfill ||
963         i->is_error() ||
964         did.find(i->soid) != did.end())
965       continue;
966     did.insert(i->soid);
967
968     bufferlist bv;
969     int r = store->getattr(
970         pg_coll,
971         ghobject_t(i->soid, ghobject_t::NO_GEN, info.pgid.shard),
972         OI_ATTR,
973         bv);
974     dout(20) << __func__ << " check for log entry: " << *i << " = " << r << dendl;
975
976     if (r >= 0) {
977       object_info_t oi(bv);
978       dout(20) << __func__ << " store version = " << oi.version << dendl;
979       if (oi.version < i->version) {
980         missing.add(i->soid, i->version, oi.version, i->is_delete());
981       }
982     } else {
983       missing.add(i->soid, i->version, eversion_t(), i->is_delete());
984     }
985   }
986
987   for (const auto& p : extra_missing) {
988     missing.add(p.first, p.second.need, p.second.have, p.second.is_delete());
989   }
990   rebuilt_missing_with_deletes = true;
991 }