Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / ReplicatedBackend.cc
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include "common/errno.h"
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDSubOpReply.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPushReply.h"
#include "common/EventTrace.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
  return *_dout << pgb->get_parent()->gen_dbg_prefix();
}

namespace {
class PG_SendMessageOnConn: public Context {
  PGBackend::Listener *pg;
  Message *reply;
  ConnectionRef conn;
  public:
  PG_SendMessageOnConn(
    PGBackend::Listener *pg,
    Message *reply,
    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
  void finish(int) override {
    pg->send_message_osd_cluster(reply, conn.get());
  }
};

class PG_RecoveryQueueAsync : public Context {
  PGBackend::Listener *pg;
  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
  public:
  PG_RecoveryQueueAsync(
    PGBackend::Listener *pg,
    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
  void finish(int) override {
    pg->schedule_recovery_work(c.release());
  }
};
}

struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_applied(rm);
  }
};

struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};

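// Account a completed subop in the OSD perf counters: overall subop
// count/latency, plus per-type (write/push/pull) latency and inbound bytes.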
static void log_subop_stats(
  PerfCounters *logger,
  OpRequestRef op, int subop)
{
  utime_t now = ceph_clock_now();
  utime_t latency = now;
  latency -= op->get_req()->get_recv_stamp();


  logger->inc(l_osd_sop);
  logger->tinc(l_osd_sop_lat, latency);
  logger->inc(subop);

  if (subop != l_osd_sop_pull) {
    uint64_t inb = op->get_req()->get_data().length();
    logger->inc(l_osd_sop_inb, inb);
    if (subop == l_osd_sop_w) {
      logger->inc(l_osd_sop_w_inb, inb);
      logger->tinc(l_osd_sop_w_lat, latency);
    } else if (subop == l_osd_sop_push) {
      logger->inc(l_osd_sop_push_inb, inb);
      logger->tinc(l_osd_sop_push_lat, latency);
    } else
      assert("unsupported subop" == 0);
  } else {
    logger->tinc(l_osd_sop_pull_lat, latency);
  }
}

ReplicatedBackend::ReplicatedBackend(
  PGBackend::Listener *pg,
  coll_t coll,
  ObjectStore::CollectionHandle &c,
  ObjectStore *store,
  CephContext *cct) :
  PGBackend(cct, pg, store, coll, c) {}

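// Issue every push, pull, and recovery delete accumulated in the recovery
// handle, then free the handle.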
void ReplicatedBackend::run_recovery_op(
  PGBackend::RecoveryHandle *_h,
  int priority)
{
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  send_pushes(priority, h->pushes);
  send_pulls(priority, h->pulls);
  send_recovery_deletes(priority, h->deletes);
  delete h;
}

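// Recover a single object: if the local (primary) copy is missing, queue a
// pull; otherwise queue pushes to the replicas that still need it.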
int ReplicatedBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h
  )
{
  dout(10) << __func__ << ": " << hoid << dendl;
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  if (get_parent()->get_local_missing().is_missing(hoid)) {
    assert(!obc);
    // pull
    prepare_pull(
      v,
      hoid,
      head,
      h);
  } else {
    assert(obc);
    int started = start_pushes(
      hoid,
      obc,
      h);
    if (started < 0) {
      pushing[hoid].clear();
      return started;
    }
  }
  return 0;
}

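// Cancel any in-flight pulls from peers that the new osdmap marks down.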
void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  for(map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
      i != pull_from_peer.end();
      ) {
    if (osdmap->is_down(i->first.osd)) {
      dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
               << ", osdmap has it marked down" << dendl;
      for (set<hobject_t>::iterator j = i->second.begin();
           j != i->second.end();
           ++j) {
        get_parent()->cancel_pull(*j);
        clear_pull(pulling.find(*j), false);
      }
      pull_from_peer.erase(i++);
    } else {
      ++i;
    }
  }
}

bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PULL:
    return true;
  default:
    return false;
  }
}

bool ReplicatedBackend::_handle_message(
  OpRequestRef op
  )
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PUSH:
    do_push(op);
    return true;

  case MSG_OSD_PG_PULL:
    do_pull(op);
    return true;

  case MSG_OSD_PG_PUSH_REPLY:
    do_push_reply(op);
    return true;

  case MSG_OSD_SUBOP: {
    const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
    if (m->ops.size() == 0) {
      assert(0);
    }
    break;
  }

  case MSG_OSD_REPOP: {
    do_repop(op);
    return true;
  }

  case MSG_OSD_REPOPREPLY: {
    do_repop_reply(op);
    return true;
  }

  default:
    break;
  }
  return false;
}

void ReplicatedBackend::clear_recovery_state()
{
  // clear pushing/pulling maps
  for (auto &&i: pushing) {
    for (auto &&j: i.second) {
      get_parent()->release_locks(j.second.lock_manager);
    }
  }
  pushing.clear();

  for (auto &&i: pulling) {
    get_parent()->release_locks(i.second.lock_manager);
  }
  pulling.clear();
  pull_from_peer.clear();
}

void ReplicatedBackend::on_change()
{
  dout(10) << __func__ << dendl;
  for (map<ceph_tid_t, InProgressOp>::iterator i = in_progress_ops.begin();
       i != in_progress_ops.end();
       in_progress_ops.erase(i++)) {
    if (i->second.on_commit)
      delete i->second.on_commit;
    if (i->second.on_applied)
      delete i->second.on_applied;
  }
  clear_recovery_state();
}

void ReplicatedBackend::on_flushed()
{
}

int ReplicatedBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
}

struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
  int r;
  Context *c;
  AsyncReadCallback(int r, Context *c) : r(r), c(c) {}
  void finish(ThreadPool::TPHandle&) override {
    c->complete(r);
    c = NULL;
  }
  ~AsyncReadCallback() override {
    delete c;
  }
};
void ReplicatedBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  // There is no fast read implementation for the replicated backend yet
  assert(!fast_read);

  int r = 0;
  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                 pair<bufferlist*, Context*> > >::const_iterator i =
           to_read.begin();
       i != to_read.end() && r >= 0;
       ++i) {
    int _r = store->read(ch, ghobject_t(hoid), i->first.get<0>(),
                         i->first.get<1>(), *(i->second.first),
                         i->first.get<2>());
    if (i->second.second) {
      get_parent()->schedule_recovery_work(
        get_parent()->bless_gencontext(
          new AsyncReadCallback(_r, i->second.second)));
    }
    if (_r < 0)
      r = _r;
  }
  get_parent()->schedule_recovery_work(
    get_parent()->bless_gencontext(
      new AsyncReadCallback(r, on_complete)));
}

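// Completion contexts used to notify the backend when the local transaction
// for an in-progress replicated op commits to disk or is applied in memory.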
class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};

class C_OSD_OnOpApplied : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_applied(op);
  }
};

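// Flatten a PGTransaction into an ObjectStore::Transaction, recording which
// temp objects the transaction creates (added) and deletes (removed).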
void generate_transaction(
  PGTransactionUPtr &pgt,
  const coll_t &coll,
  bool legacy_log_entries,
  vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction *t,
  set<hobject_t> *added,
  set<hobject_t> *removed)
{
  assert(t);
  assert(added);
  assert(removed);

  for (auto &&le: log_entries) {
    le.mark_unrollbackable();
    auto oiter = pgt->op_map.find(le.soid);
    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
      ::encode(oiter->second.updated_snaps->second, bl);
      le.snaps.swap(bl);
      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
    }
  }

  pgt->safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
      const hobject_t &oid = obj_op.first;
      const ghobject_t goid =
        ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
      const PGTransaction::ObjectOperation &op = obj_op.second;

      if (oid.is_temp()) {
        if (op.is_fresh_object()) {
          added->insert(oid);
        } else if (op.is_delete()) {
          removed->insert(oid);
        }
      }

      if (op.delete_first) {
        t->remove(coll, goid);
      }

      match(
        op.init_type,
        [&](const PGTransaction::ObjectOperation::Init::None &) {
        },
        [&](const PGTransaction::ObjectOperation::Init::Create &op) {
          t->touch(coll, goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
          t->clone(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
          assert(op.source.is_temp());
          t->collection_move_rename(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            coll,
            goid);
        });

      if (op.truncate) {
        t->truncate(coll, goid, op.truncate->first);
        if (op.truncate->first != op.truncate->second)
          t->truncate(coll, goid, op.truncate->second);
      }

      if (!op.attr_updates.empty()) {
        map<string, bufferlist> attrs;
        for (auto &&p: op.attr_updates) {
          if (p.second)
            attrs[p.first] = *(p.second);
          else
            t->rmattr(coll, goid, p.first);
        }
        t->setattrs(coll, goid, attrs);
      }

      if (op.clear_omap)
        t->omap_clear(coll, goid);
      if (op.omap_header)
        t->omap_setheader(coll, goid, *(op.omap_header));

      for (auto &&up: op.omap_updates) {
        using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
        switch (up.first) {
        case UpdateType::Remove:
          t->omap_rmkeys(coll, goid, up.second);
          break;
        case UpdateType::Insert:
          t->omap_setkeys(coll, goid, up.second);
          break;
        }
      }

      // updated_snaps doesn't matter since we marked unrollbackable

      if (op.alloc_hint) {
        auto &hint = *(op.alloc_hint);
        t->set_alloc_hint(
          coll,
          goid,
          hint.expected_object_size,
          hint.expected_write_size,
          hint.flags);
      }

      for (auto &&extent: op.buffer_updates) {
        using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
        match(
          extent.get_val(),
          [&](const BufferUpdate::Write &op) {
            t->write(
              coll,
              goid,
              extent.get_off(),
              extent.get_len(),
              op.buffer);
          },
          [&](const BufferUpdate::Zero &op) {
            t->zero(
              coll,
              goid,
              extent.get_off(),
              extent.get_len());
          },
          [&](const BufferUpdate::CloneRange &op) {
            assert(op.len == extent.get_len());
            t->clone_range(
              coll,
              ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
              goid,
              op.offset,
              extent.get_len(),
              extent.get_off());
          });
      }
    });
}

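// Primary-side entry point for a replicated write: build the local
// transaction, register the in-progress op (waiting on all acting/backfill
// shards), send the rep ops to the replicas, and queue the local commit.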
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
    log_entries,
    &op_t,
    &added,
    &removed);
  assert(added.size() <= 1);
  assert(removed.size() <= 1);

  assert(!in_progress_ops.count(tid));
  InProgressOp &op = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(
        tid, on_all_commit, on_all_acked,
        orig_op, at_version)
      )
    ).first->second;

  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());

  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    at_version,
    true,
    op_t);

  op_t.register_on_applied_sync(on_local_applied_sync);
  op_t.register_on_applied(
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
}

void ReplicatedBackend::op_applied(
  InProgressOp *op)
{
  FUNCTRACE();
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_APPLIED_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_applied");
    op->op->pg_trace.event("op applied");
  }

  op->waiting_for_applied.erase(get_parent()->whoami_shard());
  parent->op_applied(op->v);

  if (op->waiting_for_applied.empty()) {
    op->on_applied->complete(0);
    op->on_applied = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

void ReplicatedBackend::op_commit(
  InProgressOp *op)
{
  FUNCTRACE();
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_commit");
    op->op->pg_trace.event("op commit");
  }

  op->waiting_for_commit.erase(get_parent()->whoami_shard());

  if (op->waiting_for_commit.empty()) {
    op->on_commit->complete(0);
    op->on_commit = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
  assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  if (in_progress_ops.count(rep_tid)) {
    map<ceph_tid_t, InProgressOp>::iterator iter =
      in_progress_ops.find(rep_tid);
    InProgressOp &ip_op = iter->second;
    const MOSDOp *m = NULL;
    if (ip_op.op)
      m = static_cast<const MOSDOp *>(ip_op.op->get_req());

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;

    // oh, good.

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_commit_rec from " << from;
        ip_op.op->mark_event_string(ss.str());
        ip_op.op->pg_trace.event("sub_op_commit_rec");
      }
    } else {
      assert(ip_op.waiting_for_applied.count(from));
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_applied_rec from " << from;
        ip_op.op->mark_event_string(ss.str());
        ip_op.op->pg_trace.event("sub_op_applied_rec");
      }
    }
    ip_op.waiting_for_applied.erase(from);

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    if (ip_op.waiting_for_applied.empty() &&
        ip_op.on_applied) {
      ip_op.on_applied->complete(0);
      ip_op.on_applied = 0;
    }
    if (ip_op.waiting_for_commit.empty() &&
        ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit= 0;
    }
    if (ip_op.done()) {
      assert(!ip_op.on_commit && !ip_op.on_applied);
      in_progress_ops.erase(iter);
    }
  }
}

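// Deep scrub: CRC the object data in osd_deep_scrub_stride chunks, then the
// omap header and every omap key/value pair, recording the digests in the
// scrub map entry.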
void ReplicatedBackend::be_deep_scrub(
  const hobject_t &poid,
  uint32_t seed,
  ScrubMap::object &o,
  ThreadPool::TPHandle &handle)
{
  dout(10) << __func__ << " " << poid << " seed "
           << std::hex << seed << std::dec << dendl;
  bufferhash h(seed), oh(seed);
  bufferlist bl, hdrbl;
  int r;
  __u64 pos = 0;

  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;

  while (true) {
    handle.reset_tp_timeout();
    r = store->read(
          ch,
          ghobject_t(
            poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
          pos,
          cct->_conf->osd_deep_scrub_stride, bl,
          fadvise_flags);
    if (r <= 0)
      break;

    h << bl;
    pos += bl.length();
    bl.clear();
  }
  if (r == -EIO) {
    dout(25) << __func__ << "  " << poid << " got "
             << r << " on read, read_error" << dendl;
    o.read_error = true;
    return;
  }
  o.digest = h.digest();
  o.digest_present = true;

  bl.clear();
  r = store->omap_get_header(
    coll,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    &hdrbl, true);
  // NOTE: bobtail to giant, we would crc the head as (len, head).
  // that changes at the same time we start using a non-zero seed.
  if (r == 0 && hdrbl.length()) {
    dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length())
             << dendl;
    if (seed == 0) {
      // legacy
      bufferlist bl;
      ::encode(hdrbl, bl);
      oh << bl;
    } else {
      oh << hdrbl;
    }
  } else if (r == -EIO) {
    dout(25) << __func__ << "  " << poid << " got "
             << r << " on omap header read, read_error" << dendl;
    o.read_error = true;
    return;
  }

  ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
    coll,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  assert(iter);
  for (iter->seek_to_first(); iter->status() == 0 && iter->valid();
    iter->next(false)) {
    handle.reset_tp_timeout();

    dout(25) << "CRC key " << iter->key() << " value:\n";
    iter->value().hexdump(*_dout);
    *_dout << dendl;

    ::encode(iter->key(), bl);
    ::encode(iter->value(), bl);
    oh << bl;
    bl.clear();
  }

  if (iter->status() < 0) {
    dout(25) << __func__ << "  " << poid
             << " on omap scan, db status error" << dendl;
    o.read_error = true;
    return;
  }

  // Store final calculated CRC32 of omap header & key/values
  o.omap_digest = oh.digest();
  o.omap_digest_present = true;
  dout(20) << __func__ << "  " << poid << " omap_digest "
           << std::hex << o.omap_digest << std::dec << dendl;
}

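// Replica side of recovery: apply a batch of incoming pushes in one
// transaction and reply to the primary when it completes.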
void ReplicatedBackend::_do_push(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PushReplyOp> replies;
  ObjectStore::Transaction t;
  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
    ceph_abort();
  }
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    replies.push_back(PushReplyOp());
    handle_push(from, *i, &(replies.back()), &t);
  }

  MOSDPGPushReply *reply = new MOSDPGPushReply;
  reply->from = get_parent()->whoami_shard();
  reply->set_priority(m->get_priority());
  reply->pgid = get_info().pgid;
  reply->map_epoch = m->map_epoch;
  reply->min_epoch = m->min_epoch;
  reply->replies.swap(replies);
  reply->compute_cost(cct);

  t.register_on_complete(
    new PG_SendMessageOnConn(
      get_parent(), reply, m->get_connection()));

  get_parent()->queue_transaction(std::move(t));
}

struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<ReplicatedBackend::pull_complete_info> to_continue;
  int priority;
  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
    : bc(bc), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) override {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (auto &&i: to_continue) {
      auto j = bc->pulling.find(i.hoid);
      assert(j != bc->pulling.end());
      ObjectContextRef obc = j->second.obc;
      bc->clear_pull(j, false /* already did it */);
      int started = bc->start_pushes(i.hoid, obc, h);
      if (started < 0) {
        bc->pushing[i.hoid].clear();
        bc->get_parent()->primary_failed(i.hoid);
        bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
      } else if (!started) {
        bc->get_parent()->on_global_recover(
          i.hoid, i.stat, false);
      }
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }
};

void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PullOp> replies(1);

  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing pull response (push): " << ss.str() << dendl;
    ceph_abort();
  }

  ObjectStore::Transaction t;
  list<pull_complete_info> to_continue;
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
        this,
        m->get_priority());
    c->to_continue.swap(to_continue);
    t.register_on_complete(
      new PG_RecoveryQueueAsync(
        get_parent(),
        get_parent()->bless_gencontext(c)));
  }
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->set_pulls(&replies);
    reply->compute_cost(cct);

    t.register_on_complete(
      new PG_SendMessageOnConn(
        get_parent(), reply, m->get_connection()));
  }

  get_parent()->queue_transaction(std::move(t));
}

void ReplicatedBackend::do_pull(OpRequestRef op)
{
  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
  assert(m->get_type() == MSG_OSD_PG_PULL);
  pg_shard_t from = m->from;

  map<pg_shard_t, vector<PushOp> > replies;
  vector<PullOp> pulls;
  m->take_pulls(&pulls);
  for (auto& i : pulls) {
    replies[from].push_back(PushOp());
    handle_pull(from, i, &(replies[from].back()));
  }
  send_pushes(m->get_priority(), replies);
}

void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
  const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
  pg_shard_t from = m->from;

  vector<PushOp> replies(1);
  for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
       i != m->replies.end();
       ++i) {
    bool more = handle_push_reply(from, *i, &(replies.back()));
    if (more)
      replies.push_back(PushOp());
  }
  replies.erase(replies.end() - 1);

  map<pg_shard_t, vector<PushOp> > _replies;
  _replies[from].swap(replies);
  send_pushes(m->get_priority(), _replies);
}

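// Build the MOSDRepOp for one replica. If the object is beyond the peer's
// backfill progress, ship an empty transaction so the replica still logs the
// update without applying any data.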
Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap()->get_epoch(),
    parent->get_last_peering_reset_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    dout(10) << "issue_repop shipping empty opt to osd." << peer
             <<", object " << soid
             << " beyond MAX(last_backfill_started "
             << ", pinfo.last_backfill "
             << pinfo.last_backfill << ")" << dendl;
    ObjectStore::Transaction t;
    ::encode(t, wr->get_data());
  } else {
    ::encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  ::encode(log_entries, wr->logbl);

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;
  wr->pg_roll_forward_to = pg_roll_forward_to;

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}

void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  if (op->op)
    op->op->pg_trace.event("issue replication ops");

  if (parent->get_actingbackfill_shards().size() > 1) {
    ostringstream ss;
    set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
    replicas.erase(parent->whoami_shard());
    ss << "waiting for subops from " << replicas;
    if (op->op)
      op->op->mark_sub_op_sent(ss.str());
  }
  for (set<pg_shard_t>::const_iterator i =
         parent->get_actingbackfill_shards().begin();
       i != parent->get_actingbackfill_shards().end();
       ++i) {
    if (*i == parent->whoami_shard()) continue;
    pg_shard_t peer = *i;
    const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;

    Message *wr;
    wr = generate_subop(
      soid,
      at_version,
      tid,
      reqid,
      pg_trim_to,
      pg_roll_forward_to,
      new_temp_oid,
      discard_temp_oid,
      log_entries,
      hset_hist,
      op_t,
      peer,
      pinfo);
    if (op->op)
      wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
    get_parent()->send_message_osd_cluster(
      peer.osd, wr, get_osdmap()->get_epoch());
  }
}

// sub op modify
void ReplicatedBackend::do_repop(OpRequestRef op)
{
  static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
  int msg_type = m->get_type();
  assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << __func__ << " " << soid
           << " v " << m->version
           << (m->logbl.length() ? " (transaction)" : " (parallel exec)")
           << " " << m->logbl.length()
           << dendl;

  // sanity checks
  assert(m->map_epoch >= get_info().history.same_interval_since);

  // we better not be missing this.
  assert(!parent->get_log().get_missing().is_missing(soid));

  int ackerosd = m->get_source().num();

  op->mark_started();

  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap()->get_epoch();

  assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  bufferlist::iterator p = const_cast<bufferlist&>(m->get_data()).begin();
  ::decode(rm->opt, p);

  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
               << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = const_cast<bufferlist&>(m->logbl).begin();
  ::decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }
  parent->update_stats(m->pg_stats);
  parent->log_operation(
    log,
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->pg_roll_forward_to,
    update_snaps,
    rm->localt);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  rm->localt.register_on_applied(
    parent->bless_context(
      new C_OSD_RepModifyApply(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
}

void ReplicatedBackend::repop_applied(RepModifyRef rm)
{
  rm->op->mark_event("sub_op_applied");
  rm->applied = true;
  rm->op->pg_trace.event("sub_op_applied");

  dout(10) << __func__ << " on " << rm << " op "
           << *rm->op->get_req() << dendl;
  const Message *m = rm->op->get_req();
  const MOSDRepOp *req = static_cast<const MOSDRepOp*>(m);
  eversion_t version = req->version;

  // send ack to acker only if we haven't sent a commit already
  if (!rm->committed) {
    Message *ack = new MOSDRepOpReply(
      req, parent->whoami_shard(),
      0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK);
    ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
    ack->trace = rm->op->pg_trace;
    get_parent()->send_message_osd_cluster(
      rm->ackerosd, ack, get_osdmap()->get_epoch());
  }

  parent->op_applied(version);
}

void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->op->pg_trace.event("sub_op_commit");
  rm->committed = true;

  // send commit.
  const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
  assert(m->get_type() == MSG_OSD_REPOP);
  dout(10) << __func__ << " on op " << *m
           << ", sending commit to osd." << rm->ackerosd
           << dendl;
  assert(get_osdmap()->is_up(rm->ackerosd));

  get_parent()->update_last_complete_ondisk(rm->last_complete);

  MOSDRepOpReply *reply = new MOSDRepOpReply(
    m,
    get_parent()->whoami_shard(),
    0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
  reply->set_last_complete_ondisk(rm->last_complete);
  reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
  reply->trace = rm->op->pg_trace;
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, reply, get_osdmap()->get_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}


// ===========================================================

void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }


  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);

  for (int j=snapset.clones.size()-1; j>=0; j--) {
    hobject_t c = head;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
             << " overlap " << prev << dendl;
  }


  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
           << "  data_subset " << data_subset
           << "  clone_subsets " << clone_subsets << dendl;
}

void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  unsigned i;
  for (i=0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j=i-1; j>=0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
               << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
             << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }


  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
           << "  data_subset " << data_subset
           << "  clone_subsets " << clone_subsets << dendl;
}

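// Primary-side setup for pulling one missing object: pick a random peer that
// has it, compute which byte ranges can be cloned from local clones rather
// than copied over the wire, and register the pull as in progress.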
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  assert(get_parent()->get_local_missing().get_items().count(soid));
  eversion_t _v = get_parent()->get_local_missing().get_items().find(
    soid)->second.need;
  assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  assert(q != missing_loc.end());
  assert(!q->second.empty());

  // pick a pullee
  vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
  random_shuffle(shuffle.begin(), shuffle.end());
  vector<pg_shard_t>::iterator p = shuffle.begin();
  assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
          << " v " << v
          << " on osds " << q->second
          << " from osd." << fromshard
          << dendl;

  assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    assert(pmissing.get_items().find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
             << " at version " << pmissing.get_items().find(soid)->second.have
             << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    assert(get_parent()->get_log().get_log().objects.count(soid) &&
           (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
            pg_log_entry_t::LOST_REVERT) &&
           (get_parent()->get_log().get_log().objects.find(
             soid)->second->reverting_to ==
            v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    assert(!get_parent()->get_local_missing().is_missing(
             soid.get_head()) ||
           !get_parent()->get_local_missing().is_missing(
             soid.get_snapdir()));
    assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    recovery_info.size = ((uint64_t)-1);
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = false;
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}

/*
 * intelligently push an object to a replica.  make use of existing
 * clones/heads and dup data ranges where possible.
 */
int ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
           << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base the push off of clones that succeed/precede soid;
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }
    hobject_t snapdir = head;
    snapdir.snap = CEPH_SNAPDIR;
    if (get_parent()->get_local_missing().is_missing(snapdir)) {
      dout(15) << "push_to_replica missing snapdir " << snapdir
               << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    assert(pi != get_parent()->get_shard_info().end());
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this partially on the replica's clones?
    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  return prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}

int ReplicatedBackend::prep_push(ObjectContextRef obc,
                             const hobject_t& soid, pg_shard_t peer,
                             PushOp *pop, bool cache_dont_need)
{
  interval_set<uint64_t> data_subset;
  if (obc->obs.oi.size)
    data_subset.insert(0, obc->obs.oi.size);
  map<hobject_t, interval_set<uint64_t>> clone_subsets;

  return prep_push(obc, soid, peer,
            obc->obs.oi.version, data_subset, clone_subsets,
            pop, cache_dont_need, ObcLockManager());
}

int ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  PushOp *pop,
  bool cache_dont_need,
  ObcLockManager &&lock_manager)
{
  get_parent()->begin_peer_recover(peer, soid);
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.ss = pop->recovery_info.ss;
  pi.recovery_info.version = version;
  pi.lock_manager = std::move(lock_manager);

  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
                        pi.recovery_progress,
                        &new_progress,
                        pop,
                        &(pi.stat), cache_dont_need);
  if (r < 0)
    return r;
  pi.recovery_progress = new_progress;
  return 0;
}

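// Apply one chunk of pushed object data on the receiving side. The first
// chunk (re)creates the target, via a temp object unless the push is both
// first and complete; the final chunk moves the temp object into place and
// fills in any ranges cloned from local clones.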
1605 void ReplicatedBackend::submit_push_data(
1606   const ObjectRecoveryInfo &recovery_info,
1607   bool first,
1608   bool complete,
1609   bool cache_dont_need,
1610   const interval_set<uint64_t> &intervals_included,
1611   bufferlist data_included,
1612   bufferlist omap_header,
1613   const map<string, bufferlist> &attrs,
1614   const map<string, bufferlist> &omap_entries,
1615   ObjectStore::Transaction *t)
1616 {
1617   hobject_t target_oid;
1618   if (first && complete) {
1619     target_oid = recovery_info.soid;
1620   } else {
1621     target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
1622                                                         recovery_info.version);
1623     if (first) {
1624       dout(10) << __func__ << ": Adding oid "
1625                << target_oid << " in the temp collection" << dendl;
1626       add_temp_obj(target_oid);
1627     }
1628   }
1629
1630   if (first) {
1631     t->remove(coll, ghobject_t(target_oid));
1632     t->touch(coll, ghobject_t(target_oid));
1633     t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
1634     if (omap_header.length()) 
1635       t->omap_setheader(coll, ghobject_t(target_oid), omap_header);
1636
1637     bufferlist bv = attrs.at(OI_ATTR);
1638     object_info_t oi(bv);
1639     t->set_alloc_hint(coll, ghobject_t(target_oid),
1640                       oi.expected_object_size,
1641                       oi.expected_write_size,
1642                       oi.alloc_hint_flags);
1643   }
1644   uint64_t off = 0;
1645   uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
1646   if (cache_dont_need)
1647     fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
1648   for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
1649        p != intervals_included.end();
1650        ++p) {
1651     bufferlist bit;
1652     bit.substr_of(data_included, off, p.get_len());
1653     t->write(coll, ghobject_t(target_oid),
1654              p.get_start(), p.get_len(), bit, fadvise_flags);
1655     off += p.get_len();
1656   }
1657
1658   if (!omap_entries.empty())
1659     t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
1660   if (!attrs.empty())
1661     t->setattrs(coll, ghobject_t(target_oid), attrs);
1662
1663   if (complete) {
1664     if (!first) {
1665       dout(10) << __func__ << ": Removing oid "
1666                << target_oid << " from the temp collection" << dendl;
1667       clear_temp_obj(target_oid);
1668       t->remove(coll, ghobject_t(recovery_info.soid));
1669       t->collection_move_rename(coll, ghobject_t(target_oid),
1670                                 coll, ghobject_t(recovery_info.soid));
1671     }
1672
1673     submit_push_complete(recovery_info, t);
1674   }
1675 }
1676
1677 void ReplicatedBackend::submit_push_complete(
1678   const ObjectRecoveryInfo &recovery_info,
1679   ObjectStore::Transaction *t)
1680 {
1681   for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
1682          recovery_info.clone_subset.begin();
1683        p != recovery_info.clone_subset.end();
1684        ++p) {
1685     for (interval_set<uint64_t>::const_iterator q = p->second.begin();
1686          q != p->second.end();
1687          ++q) {
1688       dout(15) << " clone_range " << p->first << " "
1689                << q.get_start() << "~" << q.get_len() << dendl;
1690       t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
1691                      q.get_start(), q.get_len(), q.get_start());
1692     }
1693   }
1694 }
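// clone_subset maps a local clone to the byte ranges it shares with the
// object being recovered, so those ranges are cloned locally rather than
// re-sent over the wire. For illustration, an entry such as
//   { clone_oid -> [0~4096, 8192~4096] }
// becomes two clone_range calls copying those extents from clone_oid
// into recovery_info.soid at the same offsets.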
1695
1696 ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
1697   const ObjectRecoveryInfo& recovery_info,
1698   SnapSetContext *ssc,
1699   ObcLockManager &manager)
1700 {
1701   if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
1702     return recovery_info;
1703   ObjectRecoveryInfo new_info = recovery_info;
1704   new_info.copy_subset.clear();
1705   new_info.clone_subset.clear();
1706   assert(ssc);
1707   get_parent()->release_locks(manager); // might already have locks
1708   calc_clone_subsets(
1709     ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
1710     get_info().last_backfill,
1711     new_info.copy_subset, new_info.clone_subset,
1712     manager);
1713   return new_info;
1714 }
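// Non-clone objects (snap == 0 or snap >= CEPH_NOSNAP) are returned
// unchanged; only clones need their copy/clone subsets recomputed, since
// the primary may not have had the SnapSet in hand when the pull was
// first prepared.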
1715
1716 bool ReplicatedBackend::handle_pull_response(
1717   pg_shard_t from, const PushOp &pop, PullOp *response,
1718   list<pull_complete_info> *to_continue,
1719   ObjectStore::Transaction *t)
1720 {
1721   interval_set<uint64_t> data_included = pop.data_included;
1722   bufferlist data;
1723   data = pop.data;
1724   dout(10) << "handle_pull_response "
1725            << pop.recovery_info
1726            << pop.after_progress
1727            << " data.size() is " << data.length()
1728            << " data_included: " << data_included
1729            << dendl;
1730   if (pop.version == eversion_t()) {
1731     // replica doesn't have it!
1732     _failed_pull(from, pop.soid);
1733     return false;
1734   }
1735
1736   const hobject_t &hoid = pop.soid;
1737   assert((data_included.empty() && data.length() == 0) ||
1738          (!data_included.empty() && data.length() > 0));
1739
1740   auto piter = pulling.find(hoid);
1741   if (piter == pulling.end()) {
1742     return false;
1743   }
1744
1745   PullInfo &pi = piter->second;
1746   if (pi.recovery_info.size == (uint64_t(-1))) {
1747     pi.recovery_info.size = pop.recovery_info.size;
1748     pi.recovery_info.copy_subset.intersection_of(
1749       pop.recovery_info.copy_subset);
1750   }
1751   // If the primary lacked the object info, it did not know the version either
1752   if (pi.recovery_info.version == eversion_t()) {
1753     pi.recovery_info.version = pop.version;
1754   }
1755
1756   bool first = pi.recovery_progress.first;
1757   if (first) {
1758     // The attrs still reference the original bufferlist (decoded from
1759     // the MOSDPGPush message), which is much larger than the attrs
1760     // themselves. If the obc caches them (get_obc may cache the attrs),
1761     // the whole original bufferlist cannot be freed until the obc is
1762     // evicted from the obc cache, so rebuild the bufferlists before
1763     // caching them.
1764     auto attrset = pop.attrset;
1765     for (auto& a : attrset) {
1766       a.second.rebuild();
1767     }
1768     pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
1769     pi.recovery_info.oi = pi.obc->obs.oi;
1770     pi.recovery_info = recalc_subsets(
1771       pi.recovery_info,
1772       pi.obc->ssc,
1773       pi.lock_manager);
1774   }
1775
1776
1777   interval_set<uint64_t> usable_intervals;
1778   bufferlist usable_data;
1779   trim_pushed_data(pi.recovery_info.copy_subset,
1780                    data_included,
1781                    data,
1782                    &usable_intervals,
1783                    &usable_data);
1784   data_included = usable_intervals;
1785   data.claim(usable_data);
1786
1787
1788   pi.recovery_progress = pop.after_progress;
1789
1790   dout(10) << "new recovery_info " << pi.recovery_info
1791            << ", new progress " << pi.recovery_progress
1792            << dendl;
1793
1794   bool complete = pi.is_complete();
1795
1796   submit_push_data(pi.recovery_info, first,
1797                    complete, pi.cache_dont_need,
1798                    data_included, data,
1799                    pop.omap_header,
1800                    pop.attrset,
1801                    pop.omap_entries,
1802                    t);
1803
1804   pi.stat.num_keys_recovered += pop.omap_entries.size();
1805   pi.stat.num_bytes_recovered += data.length();
1806
1807   if (complete) {
1808     pi.stat.num_objects_recovered++;
1809     clear_pull_from(piter);
1810     to_continue->push_back({hoid, pi.stat});
1811     get_parent()->on_local_recover(
1812       hoid, pi.recovery_info, pi.obc, false, t);
1813     return false;
1814   } else {
1815     response->soid = pop.soid;
1816     response->recovery_info = pi.recovery_info;
1817     response->recovery_progress = pi.recovery_progress;
1818     return true;
1819   }
1820 }
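// Return contract: true means another round trip is needed and *response
// carries the updated cursor back to the source; false means the pull is
// finished (queued on to_continue), failed (the source did not have the
// object), or no longer tracked in the pulling map.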
1821
1822 void ReplicatedBackend::handle_push(
1823   pg_shard_t from, const PushOp &pop, PushReplyOp *response,
1824   ObjectStore::Transaction *t)
1825 {
1826   dout(10) << "handle_push "
1827            << pop.recovery_info
1828            << pop.after_progress
1829            << dendl;
1830   bufferlist data;
1831   data = pop.data;
1832   bool first = pop.before_progress.first;
1833   bool complete = pop.after_progress.data_complete &&
1834     pop.after_progress.omap_complete;
1835
1836   response->soid = pop.recovery_info.soid;
1837   submit_push_data(pop.recovery_info,
1838                    first,
1839                    complete,
1840                    true, // cache_dont_need: this path only runs on a replica
1841                    pop.data_included,
1842                    data,
1843                    pop.omap_header,
1844                    pop.attrset,
1845                    pop.omap_entries,
1846                    t);
1847
1848   if (complete)
1849     get_parent()->on_local_recover(
1850       pop.recovery_info.soid,
1851       pop.recovery_info,
1852       ObjectContextRef(), // ok: no obc needed, we are a replica
1853       false,
1854       t);
1855 }
1856
1857 void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
1858 {
1859   for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
1860        i != pushes.end();
1861        ++i) {
1862     ConnectionRef con = get_parent()->get_con_osd_cluster(
1863       i->first.osd,
1864       get_osdmap()->get_epoch());
1865     if (!con)
1866       continue;
1867     vector<PushOp>::iterator j = i->second.begin();
1868     while (j != i->second.end()) {
1869       uint64_t cost = 0;
1870       uint64_t npushes = 0; // avoid shadowing the 'pushes' parameter
1871       MOSDPGPush *msg = new MOSDPGPush();
1872       msg->from = get_parent()->whoami_shard();
1873       msg->pgid = get_parent()->primary_spg_t();
1874       msg->map_epoch = get_osdmap()->get_epoch();
1875       msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1876       msg->set_priority(prio);
1877       for (;
1878            (j != i->second.end() &&
1879             cost < cct->_conf->osd_max_push_cost &&
1880             npushes < cct->_conf->osd_max_push_objects);
1881            ++j) {
1882         dout(20) << __func__ << ": sending push " << *j
1883                  << " to osd." << i->first << dendl;
1884         cost += j->cost(cct);
1885         npushes += 1;
1886         msg->pushes.push_back(*j);
1887       }
1888       msg->set_cost(cost);
1889       get_parent()->send_message_osd_cluster(msg, con);
1890     }
1891   }
1892 }
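// A peer's PushOps are split across as many MOSDPGPush messages as it
// takes to keep each message under both osd_max_push_cost and
// osd_max_push_objects; the while loop keeps emitting messages on the
// same connection until the vector is drained.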
1893
1894 void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
1895 {
1896   for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
1897        i != pulls.end();
1898        ++i) {
1899     ConnectionRef con = get_parent()->get_con_osd_cluster(
1900       i->first.osd,
1901       get_osdmap()->get_epoch());
1902     if (!con)
1903       continue;
1904     dout(20) << __func__ << ": sending pulls " << i->second
1905              << " to osd." << i->first << dendl;
1906     MOSDPGPull *msg = new MOSDPGPull();
1907     msg->from = get_parent()->whoami_shard();
1908     msg->set_priority(prio);
1909     msg->pgid = get_parent()->primary_spg_t();
1910     msg->map_epoch = get_osdmap()->get_epoch();
1911     msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1912     msg->set_pulls(&i->second);
1913     msg->compute_cost(cct);
1914     get_parent()->send_message_osd_cluster(msg, con);
1915   }
1916 }
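// Unlike pushes, all pulls bound for one peer travel in a single
// MOSDPGPull: a PullOp describes what is wanted but carries no object
// data, so there is no payload size to cap.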
1917
1918 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
1919                                      const ObjectRecoveryProgress &progress,
1920                                      ObjectRecoveryProgress *out_progress,
1921                                      PushOp *out_op,
1922                                      object_stat_sum_t *stat,
1923                                      bool cache_dont_need)
1924 {
1925   ObjectRecoveryProgress _new_progress;
1926   if (!out_progress)
1927     out_progress = &_new_progress;
1928   ObjectRecoveryProgress &new_progress = *out_progress;
1929   new_progress = progress;
1930
1931   dout(7) << __func__ << " " << recovery_info.soid
1932           << " v " << recovery_info.version
1933           << " size " << recovery_info.size
1934           << " recovery_info: " << recovery_info
1935           << dendl;
1936
1937   eversion_t v = recovery_info.version;
1938   if (progress.first) {
1939     int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header);
1940     if (r < 0) {
1941       dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
1942       return r;
1943     }
1944     r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
1945     if (r < 0) {
1946       dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
1947       return r;
1948     }
1949
1950     // Sanity check: the OI_ATTR we are about to push must decode
1951     bufferlist bv = out_op->attrset[OI_ATTR];
1952     object_info_t oi;
1953     try {
1954       bufferlist::iterator bliter = bv.begin();
1955       ::decode(oi, bliter);
1956     } catch (...) {
1957       dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
1958       return -EINVAL;
1959     }
1960
1961     // If requestor didn't know the version, use ours
1962     if (v == eversion_t()) {
1963       v = oi.version;
1964     } else if (oi.version != v) {
1965       get_parent()->clog_error() << get_info().pgid << " push "
1966                                  << recovery_info.soid << " v "
1967                                  << recovery_info.version
1968                                  << " failed because local copy is "
1969                                  << oi.version;
1970       return -EINVAL;
1971     }
1972
1973     new_progress.first = false;
1974   }
1975   // Once we provide the version subsequent requests will have it, so
1976   // at this point it must be known.
1977   assert(v != eversion_t());
1978
1979   uint64_t available = cct->_conf->osd_recovery_max_chunk;
1980   if (!progress.omap_complete) {
1981     ObjectMap::ObjectMapIterator iter =
1982       store->get_omap_iterator(coll,
1983                                ghobject_t(recovery_info.soid));
1984     assert(iter);
1985     for (iter->lower_bound(progress.omap_recovered_to);
1986          iter->valid();
1987          iter->next(false)) {
1988       if (!out_op->omap_entries.empty() &&
1989           ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
1990             out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
1991            available <= iter->key().size() + iter->value().length()))
1992         break;
1993       out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
1994
1995       if ((iter->key().size() + iter->value().length()) <= available)
1996         available -= (iter->key().size() + iter->value().length());
1997       else
1998         available = 0;
1999     }
2000     if (!iter->valid())
2001       new_progress.omap_complete = true;
2002     else
2003       new_progress.omap_recovered_to = iter->key();
2004   }
2005
2006   if (available > 0) {
2007     if (!recovery_info.copy_subset.empty()) {
2008       interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
2009       map<uint64_t, uint64_t> m;
2010       int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
2011                             copy_subset.range_end(), m);
2012       if (r >= 0)  {
2013         interval_set<uint64_t> fiemap_included(m);
2014         copy_subset.intersection_of(fiemap_included);
2015       } else {
2016         // intersection of copy_subset and empty interval_set would be empty anyway
2017         copy_subset.clear();
2018       }
2019
2020       out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
2021                                     available);
2022       if (out_op->data_included.empty()) // zero filled section, skip to end!
2023         new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
2024       else
2025         new_progress.data_recovered_to = out_op->data_included.range_end();
2026     }
2027   } else {
2028     out_op->data_included.clear();
2029   }
2030
2031   for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
2032        p != out_op->data_included.end();
2033        ++p) {
2034     bufferlist bit;
2035     int r = store->read(ch, ghobject_t(recovery_info.soid),
2036                 p.get_start(), p.get_len(), bit,
2037                 cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
2038     if (cct->_conf->osd_debug_random_push_read_error &&
2039         (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
2040       dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
2041       r = -EIO;
2042     }
2043     if (r < 0) {
2044       return r;
2045     }
2046     if (p.get_len() != bit.length()) {
2047       dout(10) << " extent " << p.get_start() << "~" << p.get_len()
2048                << " is actually " << p.get_start() << "~" << bit.length()
2049                << dendl;
2050       interval_set<uint64_t>::iterator save = p++;
2051       if (bit.length() == 0)
2052         out_op->data_included.erase(save); // remove this empty interval
2053       else
2054         save.set_len(bit.length());
2055       // Remove any other intervals present
2056       while (p != out_op->data_included.end()) {
2057         interval_set<uint64_t>::iterator save = p++;
2058         out_op->data_included.erase(save);
2059       }
2060       new_progress.data_complete = true;
2061       out_op->data.claim_append(bit);
2062       break;
2063     }
2064     out_op->data.claim_append(bit);
2065   }
2066
2067   if (new_progress.is_complete(recovery_info)) {
2068     new_progress.data_complete = true;
2069     if (stat)
2070       stat->num_objects_recovered++;
2071   }
2072
2073   if (stat) {
2074     stat->num_keys_recovered += out_op->omap_entries.size();
2075     stat->num_bytes_recovered += out_op->data.length();
2076   }
2077
2078   get_parent()->get_logger()->inc(l_osd_push);
2079   get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
2080
2081   // send
2082   out_op->version = v;
2083   out_op->soid = recovery_info.soid;
2084   out_op->recovery_info = recovery_info;
2085   out_op->after_progress = new_progress;
2086   out_op->before_progress = progress;
2087   return 0;
2088 }
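// Chunking sketch: each call packs at most osd_recovery_max_chunk bytes,
// spending the budget on omap entries first and object data second. The
// cursors in ObjectRecoveryProgress (omap_recovered_to and
// data_recovered_to) let the next call resume where this one stopped.
// fiemap trims unallocated holes out of data_included; the target
// recreates those regions as zeros via the initial truncate in
// submit_push_data.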
2089
2090 void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
2091 {
2092   op->recovery_info.version = eversion_t();
2093   op->version = eversion_t();
2094   op->soid = soid;
2095 }
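// A blank PushOp (version == eversion_t()) tells the puller "I do not
// have this object"; handle_pull_response treats that as a failed pull.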
2096
2097 bool ReplicatedBackend::handle_push_reply(
2098   pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
2099 {
2100   const hobject_t &soid = op.soid;
2101   if (pushing.count(soid) == 0) {
2102     dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2103              << ", or anybody else"
2104              << dendl;
2105     return false;
2106   } else if (pushing[soid].count(peer) == 0) {
2107     dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2108              << dendl;
2109     return false;
2110   } else {
2111     PushInfo *pi = &pushing[soid][peer];
2112     bool error = pushing[soid].begin()->second.recovery_progress.error;
2113
2114     if (!pi->recovery_progress.data_complete && !error) {
2115       dout(10) << " pushing more from "
2116                << pi->recovery_progress.data_recovered_to
2117                << " of " << pi->recovery_info.copy_subset << dendl;
2118       ObjectRecoveryProgress new_progress;
2119       int r = build_push_op(
2120         pi->recovery_info,
2121         pi->recovery_progress, &new_progress, reply,
2122         &(pi->stat));
2123       // Handle the case of a read error right after we wrote, which is
2124       // hopefully extremely rare.
2125       if (r < 0) {
2126         dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;
2127
2128         error = true;
2129         goto done;
2130       }
2131       pi->recovery_progress = new_progress;
2132       return true;
2133     } else {
2134       // done!
2135 done:
2136       if (!error)
2137         get_parent()->on_peer_recover(peer, soid, pi->recovery_info);
2138
2139       get_parent()->release_locks(pi->lock_manager);
2140       object_stat_sum_t stat = pi->stat;
2141       eversion_t v = pi->recovery_info.version;
2142       pushing[soid].erase(peer);
2143       pi = NULL;
2144
2145       if (pushing[soid].empty()) {
2146         if (!error)
2147           get_parent()->on_global_recover(soid, stat, false);
2148         else
2149           get_parent()->on_primary_error(soid, v);
2150         pushing.erase(soid);
2151       } else {
2152         // This looks weird, but the current peer's entry was just erased,
2153         // so park the error on a remaining entry while we collect more acks.
2154         if (error)
2155           pushing[soid].begin()->second.recovery_progress.error = true;
2156         dout(10) << "pushed " << soid << ", still waiting for push ack from "
2157                  << pushing[soid].size() << " others" << dendl;
2158       }
2159       return false;
2160     }
2161   }
2162 }
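// Return contract: true means *reply holds the next chunk to send to
// this peer; false means this peer needs nothing more. Note the
// error-parking trick: the acking peer's entry is erased, so a pending
// error is stashed on the first remaining entry's recovery_progress
// until the last ack arrives, at which point on_primary_error is called
// instead of on_global_recover.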
2163
2164 void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
2165 {
2166   const hobject_t &soid = op.soid;
2167   struct stat st;
2168   int r = store->stat(ch, ghobject_t(soid), &st);
2169   if (r != 0) {
2170     get_parent()->clog_error() << get_info().pgid << " "
2171                                << peer << " tried to pull " << soid
2172                                << " but got " << cpp_strerror(-r);
2173     prep_push_op_blank(soid, reply);
2174   } else {
2175     ObjectRecoveryInfo &recovery_info = op.recovery_info;
2176     ObjectRecoveryProgress &progress = op.recovery_progress;
2177     if (progress.first && recovery_info.size == ((uint64_t)-1)) {
2178       // Adjust size and copy_subset
2179       recovery_info.size = st.st_size;
2180       recovery_info.copy_subset.clear();
2181       if (st.st_size)
2182         recovery_info.copy_subset.insert(0, st.st_size);
2183       assert(recovery_info.clone_subset.empty());
2184     }
2185
2186     r = build_push_op(recovery_info, progress, nullptr, reply);
2187     if (r < 0)
2188       prep_push_op_blank(soid, reply);
2189   }
2190 }
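// A recovery_info.size of (uint64_t)-1 marks a pull whose requester did
// not know the object size; the first reply fills it in from stat() and
// widens copy_subset to cover the whole object.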
2191
2192 /**
2193  * trim received data to remove what we don't want
2194  *
2195  * @param copy_subset intervals we want
2196  * @param intervals_received intervals we got
2197  * @param data_received data we got
2198  * @param intervals_usable intervals we want to keep
2199  * @param data_usable matching data we want to keep
2200  */
2201 void ReplicatedBackend::trim_pushed_data(
2202   const interval_set<uint64_t> &copy_subset,
2203   const interval_set<uint64_t> &intervals_received,
2204   bufferlist data_received,
2205   interval_set<uint64_t> *intervals_usable,
2206   bufferlist *data_usable)
2207 {
2208   if (intervals_received.subset_of(copy_subset)) {
2209     *intervals_usable = intervals_received;
2210     *data_usable = data_received;
2211     return;
2212   }
2213
2214   intervals_usable->intersection_of(copy_subset,
2215                                     intervals_received);
2216
2217   uint64_t off = 0;
2218   for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
2219        p != intervals_received.end();
2220        ++p) {
2221     interval_set<uint64_t> x;
2222     x.insert(p.get_start(), p.get_len());
2223     x.intersection_of(copy_subset);
2224     for (interval_set<uint64_t>::const_iterator q = x.begin();
2225          q != x.end();
2226          ++q) {
2227       bufferlist sub;
2228       uint64_t data_off = off + (q.get_start() - p.get_start());
2229       sub.substr_of(data_received, data_off, q.get_len());
2230       data_usable->claim_append(sub);
2231     }
2232     off += p.get_len();
2233   }
2234 }
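// Worked example: with copy_subset = [0~4096] and intervals_received =
// [0~8192], the intersection keeps [0~4096] and the inner loop copies
// only the first 4096 bytes of data_received into data_usable; the rest
// of the pushed payload is dropped.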
2235
2236 void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
2237 {
2238   dout(20) << __func__ << ": " << soid << " from " << from << dendl;
2239   list<pg_shard_t> fl = { from };
2240   get_parent()->failed_push(fl, soid);
2241
2242   clear_pull(pulling.find(soid));
2243 }
2244
2245 void ReplicatedBackend::clear_pull_from(
2246   map<hobject_t, PullInfo>::iterator piter)
2247 {
2248   auto from = piter->second.from;
2249   pull_from_peer[from].erase(piter->second.soid);
2250   if (pull_from_peer[from].empty())
2251     pull_from_peer.erase(from);
2252 }
2253
2254 void ReplicatedBackend::clear_pull(
2255   map<hobject_t, PullInfo>::iterator piter,
2256   bool clear_pull_from_peer)
2257 {
2258   if (clear_pull_from_peer) {
2259     clear_pull_from(piter);
2260   }
2261   get_parent()->release_locks(piter->second.lock_manager);
2262   pulling.erase(piter);
2263 }
2264
2265 int ReplicatedBackend::start_pushes(
2266   const hobject_t &soid,
2267   ObjectContextRef obc,
2268   RPGHandle *h)
2269 {
2270   list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
2271
2272   dout(20) << __func__ << " soid " << soid << dendl;
2273   // who needs it?
2274   assert(get_parent()->get_actingbackfill_shards().size() > 0);
2275   for (set<pg_shard_t>::iterator i =
2276          get_parent()->get_actingbackfill_shards().begin();
2277        i != get_parent()->get_actingbackfill_shards().end();
2278        ++i) {
2279     if (*i == get_parent()->whoami_shard()) continue;
2280     pg_shard_t peer = *i;
2281     map<pg_shard_t, pg_missing_t>::const_iterator j =
2282       get_parent()->get_shard_missing().find(peer);
2283     assert(j != get_parent()->get_shard_missing().end());
2284     if (j->second.is_missing(soid)) {
2285       shards.push_back(j);
2286     }
2287   }
2288
2289   // If more than one read will occur, ignore any cache_dont_need hint
2290   bool cache = shards.size() == 1 ? h->cache_dont_need : false;
2291
2292   for (auto j : shards) {
2293     pg_shard_t peer = j->first;
2294     h->pushes[peer].push_back(PushOp());
2295     int r = prep_push_to_replica(obc, soid, peer,
2296             &(h->pushes[peer].back()), cache);
2297     if (r < 0) {
2298       // Back out all failed reads
2299       for (auto k : shards) {
2300         pg_shard_t p = k->first;
2301         dout(10) << __func__ << " clean up peer " << p << dendl;
2302         h->pushes[p].pop_back();
2303         if (p == peer) break;
2304       }
2305       return r;
2306     }
2307   }
2308   return shards.size();
2309 }
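// Returns the number of peers a PushOp was queued for. On a read error
// from prep_push_to_replica, every PushOp queued during this call (up to
// and including the failing peer's) is popped again, so the caller sees
// either a fully prepared set of pushes or none at all.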