Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / ECBackend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013 Inktank Storage, Inc.
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <iostream>
16 #include <sstream>
17
18 #include "ECBackend.h"
19 #include "messages/MOSDPGPush.h"
20 #include "messages/MOSDPGPushReply.h"
21 #include "messages/MOSDECSubOpWrite.h"
22 #include "messages/MOSDECSubOpWriteReply.h"
23 #include "messages/MOSDECSubOpRead.h"
24 #include "messages/MOSDECSubOpReadReply.h"
25 #include "ECMsgTypes.h"
26
27 #include "PrimaryLogPG.h"
28
29 #define dout_context cct
30 #define dout_subsys ceph_subsys_osd
31 #define DOUT_PREFIX_ARGS this
32 #undef dout_prefix
33 #define dout_prefix _prefix(_dout, this)
34 static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
35   return *_dout << pgb->get_parent()->gen_dbg_prefix();
36 }
37
38 struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
39   list<ECBackend::RecoveryOp> ops;
40 };
41
42 ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
43   switch (rhs.pipeline_state) {
44   case ECBackend::pipeline_state_t::CACHE_VALID:
45     return lhs << "CACHE_VALID";
46   case ECBackend::pipeline_state_t::CACHE_INVALID:
47     return lhs << "CACHE_INVALID";
48   default:
49     assert(0 == "invalid pipeline state");
50   }
51   return lhs; // unreachable
52 }
53
54 static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
55 {
56   lhs << "[";
57   for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
58        i != rhs.end();
59        ++i) {
60     if (i != rhs.begin())
61       lhs << ", ";
62     lhs << make_pair(i->first, i->second.length());
63   }
64   return lhs << "]";
65 }
66
67 static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
68 {
69   lhs << "[";
70   for (map<int, bufferlist>::const_iterator i = rhs.begin();
71        i != rhs.end();
72        ++i) {
73     if (i != rhs.begin())
74       lhs << ", ";
75     lhs << make_pair(i->first, i->second.length());
76   }
77   return lhs << "]";
78 }
79
80 static ostream &operator<<(
81   ostream &lhs,
82   const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
83 {
84   return lhs << "(" << rhs.get<0>() << ", "
85              << rhs.get<1>() << ", " << rhs.get<2>() << ")";
86 }
87
88 ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
89 {
90   return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
91              << ", need=" << rhs.need
92              << ", want_attrs=" << rhs.want_attrs
93              << ")";
94 }
95
96 ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
97 {
98   lhs << "read_result_t(r=" << rhs.r
99       << ", errors=" << rhs.errors;
100   if (rhs.attrs) {
101     lhs << ", attrs=" << rhs.attrs.get();
102   } else {
103     lhs << ", noattrs";
104   }
105   return lhs << ", returned=" << rhs.returned << ")";
106 }
107
108 ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
109 {
110   lhs << "ReadOp(tid=" << rhs.tid;
111   if (rhs.op && rhs.op->get_req()) {
112     lhs << ", op=";
113     rhs.op->get_req()->print(lhs);
114   }
115   return lhs << ", to_read=" << rhs.to_read
116              << ", complete=" << rhs.complete
117              << ", priority=" << rhs.priority
118              << ", obj_to_source=" << rhs.obj_to_source
119              << ", source_to_obj=" << rhs.source_to_obj
120              << ", in_progress=" << rhs.in_progress << ")";
121 }
122
123 void ECBackend::ReadOp::dump(Formatter *f) const
124 {
125   f->dump_unsigned("tid", tid);
126   if (op && op->get_req()) {
127     f->dump_stream("op") << *(op->get_req());
128   }
129   f->dump_stream("to_read") << to_read;
130   f->dump_stream("complete") << complete;
131   f->dump_int("priority", priority);
132   f->dump_stream("obj_to_source") << obj_to_source;
133   f->dump_stream("source_to_obj") << source_to_obj;
134   f->dump_stream("in_progress") << in_progress;
135 }
136
137 ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
138 {
139   lhs << "Op(" << rhs.hoid
140       << " v=" << rhs.version
141       << " tt=" << rhs.trim_to
142       << " tid=" << rhs.tid
143       << " reqid=" << rhs.reqid;
144   if (rhs.client_op && rhs.client_op->get_req()) {
145     lhs << " client_op=";
146     rhs.client_op->get_req()->print(lhs);
147   }
148   lhs << " roll_forward_to=" << rhs.roll_forward_to
149       << " temp_added=" << rhs.temp_added
150       << " temp_cleared=" << rhs.temp_cleared
151       << " pending_read=" << rhs.pending_read
152       << " remote_read=" << rhs.remote_read
153       << " remote_read_result=" << rhs.remote_read_result
154       << " pending_apply=" << rhs.pending_apply
155       << " pending_commit=" << rhs.pending_commit
156       << " plan.to_read=" << rhs.plan.to_read
157       << " plan.will_write=" << rhs.plan.will_write
158       << ")";
159   return lhs;
160 }
161
162 ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
163 {
164   return lhs << "RecoveryOp("
165              << "hoid=" << rhs.hoid
166              << " v=" << rhs.v
167              << " missing_on=" << rhs.missing_on
168              << " missing_on_shards=" << rhs.missing_on_shards
169              << " recovery_info=" << rhs.recovery_info
170              << " recovery_progress=" << rhs.recovery_progress
171              << " obc refcount=" << rhs.obc.use_count()
172              << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
173              << " waiting_on_pushes=" << rhs.waiting_on_pushes
174              << " extent_requested=" << rhs.extent_requested
175              << ")";
176 }
177
178 void ECBackend::RecoveryOp::dump(Formatter *f) const
179 {
180   f->dump_stream("hoid") << hoid;
181   f->dump_stream("v") << v;
182   f->dump_stream("missing_on") << missing_on;
183   f->dump_stream("missing_on_shards") << missing_on_shards;
184   f->dump_stream("recovery_info") << recovery_info;
185   f->dump_stream("recovery_progress") << recovery_progress;
186   f->dump_stream("state") << tostr(state);
187   f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
188   f->dump_stream("extent_requested") << extent_requested;
189 }
190
191 ECBackend::ECBackend(
192   PGBackend::Listener *pg,
193   coll_t coll,
194   ObjectStore::CollectionHandle &ch,
195   ObjectStore *store,
196   CephContext *cct,
197   ErasureCodeInterfaceRef ec_impl,
198   uint64_t stripe_width)
199   : PGBackend(cct, pg, store, coll, ch),
200     ec_impl(ec_impl),
201     sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
202   assert((ec_impl->get_data_chunk_count() *
203           ec_impl->get_chunk_size(stripe_width)) == stripe_width);
204 }
205
206 PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
207 {
208   return new ECRecoveryHandle;
209 }
210
211 void ECBackend::_failed_push(const hobject_t &hoid,
212   pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
213 {
214   ECBackend::read_result_t &res = in.second;
215   dout(10) << __func__ << ": Read error " << hoid << " r="
216            << res.r << " errors=" << res.errors << dendl;
217   dout(10) << __func__ << ": canceling recovery op for obj " << hoid
218            << dendl;
219   assert(recovery_ops.count(hoid));
220   recovery_ops.erase(hoid);
221
222   list<pg_shard_t> fl;
223   for (auto&& i : res.errors) {
224     fl.push_back(i.first);
225   }
226   get_parent()->failed_push(fl, hoid);
227 }
228
229 struct OnRecoveryReadComplete :
230   public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
231   ECBackend *pg;
232   hobject_t hoid;
233   set<int> want;
234   OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
235     : pg(pg), hoid(hoid) {}
236   void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
237     ECBackend::read_result_t &res = in.second;
238     if (!(res.r == 0 && res.errors.empty())) {
239         pg->_failed_push(hoid, in);
240         return;
241     }
242     assert(res.returned.size() == 1);
243     pg->handle_recovery_read_complete(
244       hoid,
245       res.returned.back(),
246       res.attrs,
247       in.first);
248   }
249 };
250
251 struct RecoveryMessages {
252   map<hobject_t,
253       ECBackend::read_request_t> reads;
254   void read(
255     ECBackend *ec,
256     const hobject_t &hoid, uint64_t off, uint64_t len,
257     const set<pg_shard_t> &need,
258     bool attrs) {
259     list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
260     to_read.push_back(boost::make_tuple(off, len, 0));
261     assert(!reads.count(hoid));
262     reads.insert(
263       make_pair(
264         hoid,
265         ECBackend::read_request_t(
266           to_read,
267           need,
268           attrs,
269           new OnRecoveryReadComplete(
270             ec,
271             hoid))));
272   }
273
274   map<pg_shard_t, vector<PushOp> > pushes;
275   map<pg_shard_t, vector<PushReplyOp> > push_replies;
276   ObjectStore::Transaction t;
277   RecoveryMessages() {}
278   ~RecoveryMessages(){}
279 };
280
281 void ECBackend::handle_recovery_push(
282   const PushOp &op,
283   RecoveryMessages *m)
284 {
285   ostringstream ss;
286   if (get_parent()->check_failsafe_full(ss)) {
287     dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
288     ceph_abort();
289   }
290
291   bool oneshot = op.before_progress.first && op.after_progress.data_complete;
292   ghobject_t tobj;
293   if (oneshot) {
294     tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
295                       get_parent()->whoami_shard().shard);
296   } else {
297     tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
298                                                              op.version),
299                       ghobject_t::NO_GEN,
300                       get_parent()->whoami_shard().shard);
301     if (op.before_progress.first) {
302       dout(10) << __func__ << ": Adding oid "
303                << tobj.hobj << " in the temp collection" << dendl;
304       add_temp_obj(tobj.hobj);
305     }
306   }
307
308   if (op.before_progress.first) {
309     m->t.remove(coll, tobj);
310     m->t.touch(coll, tobj);
311   }
312
313   if (!op.data_included.empty()) {
314     uint64_t start = op.data_included.range_start();
315     uint64_t end = op.data_included.range_end();
316     assert(op.data.length() == (end - start));
317
318     m->t.write(
319       coll,
320       tobj,
321       start,
322       op.data.length(),
323       op.data);
324   } else {
325     assert(op.data.length() == 0);
326   }
327
328   if (op.before_progress.first) {
329     assert(op.attrset.count(string("_")));
330     m->t.setattrs(
331       coll,
332       tobj,
333       op.attrset);
334   }
335
336   if (op.after_progress.data_complete && !oneshot) {
337     dout(10) << __func__ << ": Removing oid "
338              << tobj.hobj << " from the temp collection" << dendl;
339     clear_temp_obj(tobj.hobj);
340     m->t.remove(coll, ghobject_t(
341         op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
342     m->t.collection_move_rename(
343       coll, tobj,
344       coll, ghobject_t(
345         op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
346   }
347   if (op.after_progress.data_complete) {
348     if ((get_parent()->pgb_is_primary())) {
349       assert(recovery_ops.count(op.soid));
350       assert(recovery_ops[op.soid].obc);
351       get_parent()->on_local_recover(
352         op.soid,
353         op.recovery_info,
354         recovery_ops[op.soid].obc,
355         false,
356         &m->t);
357     } else {
358       get_parent()->on_local_recover(
359         op.soid,
360         op.recovery_info,
361         ObjectContextRef(),
362         false,
363         &m->t);
364     }
365   }
366   m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
367   m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
368 }
369
370 void ECBackend::handle_recovery_push_reply(
371   const PushReplyOp &op,
372   pg_shard_t from,
373   RecoveryMessages *m)
374 {
375   if (!recovery_ops.count(op.soid))
376     return;
377   RecoveryOp &rop = recovery_ops[op.soid];
378   assert(rop.waiting_on_pushes.count(from));
379   rop.waiting_on_pushes.erase(from);
380   continue_recovery_op(rop, m);
381 }
382
383 void ECBackend::handle_recovery_read_complete(
384   const hobject_t &hoid,
385   boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
386   boost::optional<map<string, bufferlist> > attrs,
387   RecoveryMessages *m)
388 {
389   dout(10) << __func__ << ": returned " << hoid << " "
390            << "(" << to_read.get<0>()
391            << ", " << to_read.get<1>()
392            << ", " << to_read.get<2>()
393            << ")"
394            << dendl;
395   assert(recovery_ops.count(hoid));
396   RecoveryOp &op = recovery_ops[hoid];
397   assert(op.returned_data.empty());
398   map<int, bufferlist*> target;
399   for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
400        i != op.missing_on_shards.end();
401        ++i) {
402     target[*i] = &(op.returned_data[*i]);
403   }
404   map<int, bufferlist> from;
405   for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
406       i != to_read.get<2>().end();
407       ++i) {
408     from[i->first.shard].claim(i->second);
409   }
410   dout(10) << __func__ << ": " << from << dendl;
411   int r = ECUtil::decode(sinfo, ec_impl, from, target);
412   assert(r == 0);
413   if (attrs) {
414     op.xattrs.swap(*attrs);
415
416     if (!op.obc) {
417       // attrs only reference the origin bufferlist (decode from
418       // ECSubReadReply message) whose size is much greater than attrs
419       // in recovery. If obc cache it (get_obc maybe cache the attr),
420       // this causes the whole origin bufferlist would not be free
421       // until obc is evicted from obc cache. So rebuild the
422       // bufferlist before cache it.
423       for (map<string, bufferlist>::iterator it = op.xattrs.begin();
424            it != op.xattrs.end();
425            ++it) {
426         it->second.rebuild();
427       }
428       // Need to remove ECUtil::get_hinfo_key() since it should not leak out
429       // of the backend (see bug #12983)
430       map<string, bufferlist> sanitized_attrs(op.xattrs);
431       sanitized_attrs.erase(ECUtil::get_hinfo_key());
432       op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
433       assert(op.obc);
434       op.recovery_info.size = op.obc->obs.oi.size;
435       op.recovery_info.oi = op.obc->obs.oi;
436     }
437
438     ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
439     if (op.obc->obs.oi.size > 0) {
440       assert(op.xattrs.count(ECUtil::get_hinfo_key()));
441       bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin();
442       ::decode(hinfo, bp);
443     }
444     op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
445   }
446   assert(op.xattrs.size());
447   assert(op.obc);
448   continue_recovery_op(op, m);
449 }
450
451 struct SendPushReplies : public Context {
452   PGBackend::Listener *l;
453   epoch_t epoch;
454   map<int, MOSDPGPushReply*> replies;
455   SendPushReplies(
456     PGBackend::Listener *l,
457     epoch_t epoch,
458     map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
459     replies.swap(in);
460   }
461   void finish(int) override {
462     for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
463          i != replies.end();
464          ++i) {
465       l->send_message_osd_cluster(i->first, i->second, epoch);
466     }
467     replies.clear();
468   }
469   ~SendPushReplies() override {
470     for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
471          i != replies.end();
472          ++i) {
473       i->second->put();
474     }
475     replies.clear();
476   }
477 };
478
479 void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
480 {
481   for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
482        i != m.pushes.end();
483        m.pushes.erase(i++)) {
484     MOSDPGPush *msg = new MOSDPGPush();
485     msg->set_priority(priority);
486     msg->map_epoch = get_parent()->get_epoch();
487     msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
488     msg->from = get_parent()->whoami_shard();
489     msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
490     msg->pushes.swap(i->second);
491     msg->compute_cost(cct);
492     get_parent()->send_message(
493       i->first.osd,
494       msg);
495   }
496   map<int, MOSDPGPushReply*> replies;
497   for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
498          m.push_replies.begin();
499        i != m.push_replies.end();
500        m.push_replies.erase(i++)) {
501     MOSDPGPushReply *msg = new MOSDPGPushReply();
502     msg->set_priority(priority);
503     msg->map_epoch = get_parent()->get_epoch();
504     msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
505     msg->from = get_parent()->whoami_shard();
506     msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
507     msg->replies.swap(i->second);
508     msg->compute_cost(cct);
509     replies.insert(make_pair(i->first.osd, msg));
510   }
511
512   if (!replies.empty()) {
513     (m.t).register_on_complete(
514         get_parent()->bless_context(
515           new SendPushReplies(
516             get_parent(),
517             get_parent()->get_epoch(),
518             replies)));
519     get_parent()->queue_transaction(std::move(m.t));
520   } 
521
522   if (m.reads.empty())
523     return;
524   start_read_op(
525     priority,
526     m.reads,
527     OpRequestRef(),
528     false, true);
529 }
530
531 void ECBackend::continue_recovery_op(
532   RecoveryOp &op,
533   RecoveryMessages *m)
534 {
535   dout(10) << __func__ << ": continuing " << op << dendl;
536   while (1) {
537     switch (op.state) {
538     case RecoveryOp::IDLE: {
539       // start read
540       op.state = RecoveryOp::READING;
541       assert(!op.recovery_progress.data_complete);
542       set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
543       uint64_t from = op.recovery_progress.data_recovered_to;
544       uint64_t amount = get_recovery_chunk_size();
545
546       if (op.recovery_progress.first && op.obc) {
547         /* We've got the attrs and the hinfo, might as well use them */
548         op.hinfo = get_hash_info(op.hoid);
549         assert(op.hinfo);
550         op.xattrs = op.obc->attr_cache;
551         ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
552       }
553
554       set<pg_shard_t> to_read;
555       int r = get_min_avail_to_read_shards(
556         op.hoid, want, true, false, &to_read);
557       if (r != 0) {
558         // we must have lost a recovery source
559         assert(!op.recovery_progress.first);
560         dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
561                  << dendl;
562         get_parent()->cancel_pull(op.hoid);
563         recovery_ops.erase(op.hoid);
564         return;
565       }
566       m->read(
567         this,
568         op.hoid,
569         op.recovery_progress.data_recovered_to,
570         amount,
571         to_read,
572         op.recovery_progress.first && !op.obc);
573       op.extent_requested = make_pair(
574         from,
575         amount);
576       dout(10) << __func__ << ": IDLE return " << op << dendl;
577       return;
578     }
579     case RecoveryOp::READING: {
580       // read completed, start write
581       assert(op.xattrs.size());
582       assert(op.returned_data.size());
583       op.state = RecoveryOp::WRITING;
584       ObjectRecoveryProgress after_progress = op.recovery_progress;
585       after_progress.data_recovered_to += op.extent_requested.second;
586       after_progress.first = false;
587       if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
588         after_progress.data_recovered_to =
589           sinfo.logical_to_next_stripe_offset(
590             op.obc->obs.oi.size);
591         after_progress.data_complete = true;
592       }
593       for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
594            mi != op.missing_on.end();
595            ++mi) {
596         assert(op.returned_data.count(mi->shard));
597         m->pushes[*mi].push_back(PushOp());
598         PushOp &pop = m->pushes[*mi].back();
599         pop.soid = op.hoid;
600         pop.version = op.v;
601         pop.data = op.returned_data[mi->shard];
602         dout(10) << __func__ << ": before_progress=" << op.recovery_progress
603                  << ", after_progress=" << after_progress
604                  << ", pop.data.length()=" << pop.data.length()
605                  << ", size=" << op.obc->obs.oi.size << dendl;
606         assert(
607           pop.data.length() ==
608           sinfo.aligned_logical_offset_to_chunk_offset(
609             after_progress.data_recovered_to -
610             op.recovery_progress.data_recovered_to)
611           );
612         if (pop.data.length())
613           pop.data_included.insert(
614             sinfo.aligned_logical_offset_to_chunk_offset(
615               op.recovery_progress.data_recovered_to),
616             pop.data.length()
617             );
618         if (op.recovery_progress.first) {
619           pop.attrset = op.xattrs;
620         }
621         pop.recovery_info = op.recovery_info;
622         pop.before_progress = op.recovery_progress;
623         pop.after_progress = after_progress;
624         if (*mi != get_parent()->primary_shard())
625           get_parent()->begin_peer_recover(
626             *mi,
627             op.hoid);
628       }
629       op.returned_data.clear();
630       op.waiting_on_pushes = op.missing_on;
631       op.recovery_progress = after_progress;
632       dout(10) << __func__ << ": READING return " << op << dendl;
633       return;
634     }
635     case RecoveryOp::WRITING: {
636       if (op.waiting_on_pushes.empty()) {
637         if (op.recovery_progress.data_complete) {
638           op.state = RecoveryOp::COMPLETE;
639           for (set<pg_shard_t>::iterator i = op.missing_on.begin();
640                i != op.missing_on.end();
641                ++i) {
642             if (*i != get_parent()->primary_shard()) {
643               dout(10) << __func__ << ": on_peer_recover on " << *i
644                        << ", obj " << op.hoid << dendl;
645               get_parent()->on_peer_recover(
646                 *i,
647                 op.hoid,
648                 op.recovery_info);
649             }
650           }
651           object_stat_sum_t stat;
652           stat.num_bytes_recovered = op.recovery_info.size;
653           stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
654           stat.num_objects_recovered = 1;
655           get_parent()->on_global_recover(op.hoid, stat, false);
656           dout(10) << __func__ << ": WRITING return " << op << dendl;
657           recovery_ops.erase(op.hoid);
658           return;
659         } else {
660           op.state = RecoveryOp::IDLE;
661           dout(10) << __func__ << ": WRITING continue " << op << dendl;
662           continue;
663         }
664       }
665       return;
666     }
667     // should never be called once complete
668     case RecoveryOp::COMPLETE:
669     default: {
670       ceph_abort();
671     };
672     }
673   }
674 }
675
676 void ECBackend::run_recovery_op(
677   RecoveryHandle *_h,
678   int priority)
679 {
680   ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
681   RecoveryMessages m;
682   for (list<RecoveryOp>::iterator i = h->ops.begin();
683        i != h->ops.end();
684        ++i) {
685     dout(10) << __func__ << ": starting " << *i << dendl;
686     assert(!recovery_ops.count(i->hoid));
687     RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
688     continue_recovery_op(op, &m);
689   }
690
691   dispatch_recovery_messages(m, priority);
692   send_recovery_deletes(priority, h->deletes);
693   delete _h;
694 }
695
696 int ECBackend::recover_object(
697   const hobject_t &hoid,
698   eversion_t v,
699   ObjectContextRef head,
700   ObjectContextRef obc,
701   RecoveryHandle *_h)
702 {
703   ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
704   h->ops.push_back(RecoveryOp());
705   h->ops.back().v = v;
706   h->ops.back().hoid = hoid;
707   h->ops.back().obc = obc;
708   h->ops.back().recovery_info.soid = hoid;
709   h->ops.back().recovery_info.version = v;
710   if (obc) {
711     h->ops.back().recovery_info.size = obc->obs.oi.size;
712     h->ops.back().recovery_info.oi = obc->obs.oi;
713   }
714   if (hoid.is_snap()) {
715     if (obc) {
716       assert(obc->ssc);
717       h->ops.back().recovery_info.ss = obc->ssc->snapset;
718     } else if (head) {
719       assert(head->ssc);
720       h->ops.back().recovery_info.ss = head->ssc->snapset;
721     } else {
722       assert(0 == "neither obc nor head set for a snap object");
723     }
724   }
725   h->ops.back().recovery_progress.omap_complete = true;
726   for (set<pg_shard_t>::const_iterator i =
727          get_parent()->get_actingbackfill_shards().begin();
728        i != get_parent()->get_actingbackfill_shards().end();
729        ++i) {
730     dout(10) << "checking " << *i << dendl;
731     if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
732       h->ops.back().missing_on.insert(*i);
733       h->ops.back().missing_on_shards.insert(i->shard);
734     }
735   }
736   dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
737   return 0;
738 }
739
740 bool ECBackend::can_handle_while_inactive(
741   OpRequestRef _op)
742 {
743   return false;
744 }
745
746 bool ECBackend::_handle_message(
747   OpRequestRef _op)
748 {
749   dout(10) << __func__ << ": " << *_op->get_req() << dendl;
750   int priority = _op->get_req()->get_priority();
751   switch (_op->get_req()->get_type()) {
752   case MSG_OSD_EC_WRITE: {
753     // NOTE: this is non-const because handle_sub_write modifies the embedded
754     // ObjectStore::Transaction in place (and then std::move's it).  It does
755     // not conflict with ECSubWrite's operator<<.
756     MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
757       _op->get_nonconst_req());
758     handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
759     return true;
760   }
761   case MSG_OSD_EC_WRITE_REPLY: {
762     const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
763       _op->get_req());
764     handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
765     return true;
766   }
767   case MSG_OSD_EC_READ: {
768     const MOSDECSubOpRead *op = static_cast<const MOSDECSubOpRead*>(_op->get_req());
769     MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
770     reply->pgid = get_parent()->primary_spg_t();
771     reply->map_epoch = get_parent()->get_epoch();
772     reply->min_epoch = get_parent()->get_interval_start_epoch();
773     handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
774     reply->trace = _op->pg_trace;
775     get_parent()->send_message_osd_cluster(
776       op->op.from.osd, reply, get_parent()->get_epoch());
777     return true;
778   }
779   case MSG_OSD_EC_READ_REPLY: {
780     // NOTE: this is non-const because handle_sub_read_reply steals resulting
781     // buffers.  It does not conflict with ECSubReadReply operator<<.
782     MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
783       _op->get_nonconst_req());
784     RecoveryMessages rm;
785     handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
786     dispatch_recovery_messages(rm, priority);
787     return true;
788   }
789   case MSG_OSD_PG_PUSH: {
790     const MOSDPGPush *op = static_cast<const MOSDPGPush *>(_op->get_req());
791     RecoveryMessages rm;
792     for (vector<PushOp>::const_iterator i = op->pushes.begin();
793          i != op->pushes.end();
794          ++i) {
795       handle_recovery_push(*i, &rm);
796     }
797     dispatch_recovery_messages(rm, priority);
798     return true;
799   }
800   case MSG_OSD_PG_PUSH_REPLY: {
801     const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
802       _op->get_req());
803     RecoveryMessages rm;
804     for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
805          i != op->replies.end();
806          ++i) {
807       handle_recovery_push_reply(*i, op->from, &rm);
808     }
809     dispatch_recovery_messages(rm, priority);
810     return true;
811   }
812   default:
813     return false;
814   }
815   return false;
816 }
817
818 struct SubWriteCommitted : public Context {
819   ECBackend *pg;
820   OpRequestRef msg;
821   ceph_tid_t tid;
822   eversion_t version;
823   eversion_t last_complete;
824   const ZTracer::Trace trace;
825   SubWriteCommitted(
826     ECBackend *pg,
827     OpRequestRef msg,
828     ceph_tid_t tid,
829     eversion_t version,
830     eversion_t last_complete,
831     const ZTracer::Trace &trace)
832     : pg(pg), msg(msg), tid(tid),
833       version(version), last_complete(last_complete), trace(trace) {}
834   void finish(int) override {
835     if (msg)
836       msg->mark_event("sub_op_committed");
837     pg->sub_write_committed(tid, version, last_complete, trace);
838   }
839 };
840 void ECBackend::sub_write_committed(
841   ceph_tid_t tid, eversion_t version, eversion_t last_complete,
842   const ZTracer::Trace &trace) {
843   if (get_parent()->pgb_is_primary()) {
844     ECSubWriteReply reply;
845     reply.tid = tid;
846     reply.last_complete = last_complete;
847     reply.committed = true;
848     reply.from = get_parent()->whoami_shard();
849     handle_sub_write_reply(
850       get_parent()->whoami_shard(),
851       reply, trace);
852   } else {
853     get_parent()->update_last_complete_ondisk(last_complete);
854     MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
855     r->pgid = get_parent()->primary_spg_t();
856     r->map_epoch = get_parent()->get_epoch();
857     r->min_epoch = get_parent()->get_interval_start_epoch();
858     r->op.tid = tid;
859     r->op.last_complete = last_complete;
860     r->op.committed = true;
861     r->op.from = get_parent()->whoami_shard();
862     r->set_priority(CEPH_MSG_PRIO_HIGH);
863     r->trace = trace;
864     r->trace.event("sending sub op commit");
865     get_parent()->send_message_osd_cluster(
866       get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
867   }
868 }
869
870 struct SubWriteApplied : public Context {
871   ECBackend *pg;
872   OpRequestRef msg;
873   ceph_tid_t tid;
874   eversion_t version;
875   const ZTracer::Trace trace;
876   SubWriteApplied(
877     ECBackend *pg,
878     OpRequestRef msg,
879     ceph_tid_t tid,
880     eversion_t version,
881     const ZTracer::Trace &trace)
882     : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {}
883   void finish(int) override {
884     if (msg)
885       msg->mark_event("sub_op_applied");
886     pg->sub_write_applied(tid, version, trace);
887   }
888 };
889 void ECBackend::sub_write_applied(
890   ceph_tid_t tid, eversion_t version,
891   const ZTracer::Trace &trace) {
892   parent->op_applied(version);
893   if (get_parent()->pgb_is_primary()) {
894     ECSubWriteReply reply;
895     reply.from = get_parent()->whoami_shard();
896     reply.tid = tid;
897     reply.applied = true;
898     handle_sub_write_reply(
899       get_parent()->whoami_shard(),
900       reply, trace);
901   } else {
902     MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
903     r->pgid = get_parent()->primary_spg_t();
904     r->map_epoch = get_parent()->get_epoch();
905     r->min_epoch = get_parent()->get_interval_start_epoch();
906     r->op.from = get_parent()->whoami_shard();
907     r->op.tid = tid;
908     r->op.applied = true;
909     r->set_priority(CEPH_MSG_PRIO_HIGH);
910     r->trace = trace;
911     r->trace.event("sending sub op apply");
912     get_parent()->send_message_osd_cluster(
913       get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
914   }
915 }
916
917 void ECBackend::handle_sub_write(
918   pg_shard_t from,
919   OpRequestRef msg,
920   ECSubWrite &op,
921   const ZTracer::Trace &trace,
922   Context *on_local_applied_sync)
923 {
924   if (msg)
925     msg->mark_started();
926   trace.event("handle_sub_write");
927   assert(!get_parent()->get_log().get_missing().is_missing(op.soid));
928   if (!get_parent()->pgb_is_primary())
929     get_parent()->update_stats(op.stats);
930   ObjectStore::Transaction localt;
931   if (!op.temp_added.empty()) {
932     add_temp_objs(op.temp_added);
933   }
934   if (op.backfill) {
935     for (set<hobject_t>::iterator i = op.temp_removed.begin();
936          i != op.temp_removed.end();
937          ++i) {
938       dout(10) << __func__ << ": removing object " << *i
939                << " since we won't get the transaction" << dendl;
940       localt.remove(
941         coll,
942         ghobject_t(
943           *i,
944           ghobject_t::NO_GEN,
945           get_parent()->whoami_shard().shard));
946     }
947   }
948   clear_temp_objs(op.temp_removed);
949   get_parent()->log_operation(
950     op.log_entries,
951     op.updated_hit_set_history,
952     op.trim_to,
953     op.roll_forward_to,
954     !op.backfill,
955     localt);
956
957   PrimaryLogPG *_rPG = dynamic_cast<PrimaryLogPG *>(get_parent());
958   if (_rPG && !_rPG->is_undersized() &&
959       (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count())
960     op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
961
962   if (on_local_applied_sync) {
963     dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl;
964     localt.register_on_applied_sync(on_local_applied_sync);
965   }
966   localt.register_on_commit(
967     get_parent()->bless_context(
968       new SubWriteCommitted(
969         this, msg, op.tid,
970         op.at_version,
971         get_parent()->get_info().last_complete, trace)));
972   localt.register_on_applied(
973     get_parent()->bless_context(
974       new SubWriteApplied(this, msg, op.tid, op.at_version, trace)));
975   vector<ObjectStore::Transaction> tls;
976   tls.reserve(2);
977   tls.push_back(std::move(op.t));
978   tls.push_back(std::move(localt));
979   get_parent()->queue_transactions(tls, msg);
980 }
981
982 void ECBackend::handle_sub_read(
983   pg_shard_t from,
984   const ECSubRead &op,
985   ECSubReadReply *reply,
986   const ZTracer::Trace &trace)
987 {
988   trace.event("handle sub read");
989   shard_id_t shard = get_parent()->whoami_shard().shard;
990   for(auto i = op.to_read.begin();
991       i != op.to_read.end();
992       ++i) {
993     int r = 0;
994     ECUtil::HashInfoRef hinfo;
995     if (!get_parent()->get_pool().allows_ecoverwrites()) {
996       hinfo = get_hash_info(i->first);
997       if (!hinfo) {
998         r = -EIO;
999         get_parent()->clog_error() << "Corruption detected: object " << i->first
1000                                    << " is missing hash_info";
1001         dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
1002         goto error;
1003       }
1004     }
1005     for (auto j = i->second.begin(); j != i->second.end(); ++j) {
1006       bufferlist bl;
1007       r = store->read(
1008         ch,
1009         ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1010         j->get<0>(),
1011         j->get<1>(),
1012         bl, j->get<2>());
1013       if (r < 0) {
1014         get_parent()->clog_error() << "Error " << r
1015                                    << " reading object "
1016                                    << i->first;
1017         dout(5) << __func__ << ": Error " << r
1018                 << " reading " << i->first << dendl;
1019         goto error;
1020       } else {
1021         dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
1022         reply->buffers_read[i->first].push_back(
1023           make_pair(
1024             j->get<0>(),
1025             bl)
1026           );
1027       }
1028
1029       if (!get_parent()->get_pool().allows_ecoverwrites()) {
1030         // This shows that we still need deep scrub because large enough files
1031         // are read in sections, so the digest check here won't be done here.
1032         // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
1033         // the state of our chunk in case other chunks could substitute.
1034         assert(hinfo->has_chunk_hash());
1035         if ((bl.length() == hinfo->get_total_chunk_size()) &&
1036             (j->get<0>() == 0)) {
1037           dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
1038           bufferhash h(-1);
1039           h << bl;
1040           if (h.digest() != hinfo->get_chunk_hash(shard)) {
1041             get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
1042                                        << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
1043             dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
1044                     << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
1045             r = -EIO;
1046             goto error;
1047           }
1048         }
1049       }
1050     }
1051     continue;
1052 error:
1053     // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
1054     // the state of our chunk in case other chunks could substitute.
1055     reply->buffers_read.erase(i->first);
1056     reply->errors[i->first] = r;
1057   }
1058   for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
1059        i != op.attrs_to_read.end();
1060        ++i) {
1061     dout(10) << __func__ << ": fulfilling attr request on "
1062              << *i << dendl;
1063     if (reply->errors.count(*i))
1064       continue;
1065     int r = store->getattrs(
1066       ch,
1067       ghobject_t(
1068         *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1069       reply->attrs_read[*i]);
1070     if (r < 0) {
1071       reply->buffers_read.erase(*i);
1072       reply->errors[*i] = r;
1073     }
1074   }
1075   reply->from = get_parent()->whoami_shard();
1076   reply->tid = op.tid;
1077 }
1078
1079 void ECBackend::handle_sub_write_reply(
1080   pg_shard_t from,
1081   const ECSubWriteReply &op,
1082   const ZTracer::Trace &trace)
1083 {
1084   map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
1085   assert(i != tid_to_op_map.end());
1086   if (op.committed) {
1087     trace.event("sub write committed");
1088     assert(i->second.pending_commit.count(from));
1089     i->second.pending_commit.erase(from);
1090     if (from != get_parent()->whoami_shard()) {
1091       get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
1092     }
1093   }
1094   if (op.applied) {
1095     trace.event("sub write applied");
1096     assert(i->second.pending_apply.count(from));
1097     i->second.pending_apply.erase(from);
1098   }
1099
1100   if (i->second.pending_apply.empty() && i->second.on_all_applied) {
1101     dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
1102     i->second.on_all_applied->complete(0);
1103     i->second.on_all_applied = 0;
1104     i->second.trace.event("ec write all applied");
1105   }
1106   if (i->second.pending_commit.empty() && i->second.on_all_commit) {
1107     dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
1108     i->second.on_all_commit->complete(0);
1109     i->second.on_all_commit = 0;
1110     i->second.trace.event("ec write all committed");
1111   }
1112   check_ops();
1113 }
1114
1115 void ECBackend::handle_sub_read_reply(
1116   pg_shard_t from,
1117   ECSubReadReply &op,
1118   RecoveryMessages *m,
1119   const ZTracer::Trace &trace)
1120 {
1121   trace.event("ec sub read reply");
1122   dout(10) << __func__ << ": reply " << op << dendl;
1123   map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
1124   if (iter == tid_to_read_map.end()) {
1125     //canceled
1126     dout(20) << __func__ << ": dropped " << op << dendl;
1127     return;
1128   }
1129   ReadOp &rop = iter->second;
1130   for (auto i = op.buffers_read.begin();
1131        i != op.buffers_read.end();
1132        ++i) {
1133     assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
1134     if (!rop.to_read.count(i->first)) {
1135       // We canceled this read! @see filter_read_op
1136       dout(20) << __func__ << " to_read skipping" << dendl;
1137       continue;
1138     }
1139     list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
1140       rop.to_read.find(i->first)->second.to_read.begin();
1141     list<
1142       boost::tuple<
1143         uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
1144       rop.complete[i->first].returned.begin();
1145     for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
1146          j != i->second.end();
1147          ++j, ++req_iter, ++riter) {
1148       assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
1149       assert(riter != rop.complete[i->first].returned.end());
1150       pair<uint64_t, uint64_t> adjusted =
1151         sinfo.aligned_offset_len_to_chunk(
1152           make_pair(req_iter->get<0>(), req_iter->get<1>()));
1153       assert(adjusted.first == j->first);
1154       riter->get<2>()[from].claim(j->second);
1155     }
1156   }
1157   for (auto i = op.attrs_read.begin();
1158        i != op.attrs_read.end();
1159        ++i) {
1160     assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
1161     if (!rop.to_read.count(i->first)) {
1162       // We canceled this read! @see filter_read_op
1163       dout(20) << __func__ << " to_read skipping" << dendl;
1164       continue;
1165     }
1166     rop.complete[i->first].attrs = map<string, bufferlist>();
1167     (*(rop.complete[i->first].attrs)).swap(i->second);
1168   }
1169   for (auto i = op.errors.begin();
1170        i != op.errors.end();
1171        ++i) {
1172     rop.complete[i->first].errors.insert(
1173       make_pair(
1174         from,
1175         i->second));
1176     dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
1177   }
1178
1179   map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
1180                                         shard_to_read_map.find(from);
1181   assert(siter != shard_to_read_map.end());
1182   assert(siter->second.count(op.tid));
1183   siter->second.erase(op.tid);
1184
1185   assert(rop.in_progress.count(from));
1186   rop.in_progress.erase(from);
1187   unsigned is_complete = 0;
1188   // For redundant reads check for completion as each shard comes in,
1189   // or in a non-recovery read check for completion once all the shards read.
1190   // TODO: It would be nice if recovery could send more reads too
1191   if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) {
1192     for (map<hobject_t, read_result_t>::const_iterator iter =
1193         rop.complete.begin();
1194       iter != rop.complete.end();
1195       ++iter) {
1196       set<int> have;
1197       for (map<pg_shard_t, bufferlist>::const_iterator j =
1198           iter->second.returned.front().get<2>().begin();
1199         j != iter->second.returned.front().get<2>().end();
1200         ++j) {
1201         have.insert(j->first.shard);
1202         dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
1203       }
1204       set<int> want_to_read, dummy_minimum;
1205       get_want_to_read_shards(&want_to_read);
1206       int err;
1207       if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
1208         dout(20) << __func__ << " minimum_to_decode failed" << dendl;
1209         if (rop.in_progress.empty()) {
1210           // If we don't have enough copies and we haven't sent reads for all shards
1211           // we can send the rest of the reads, if any.
1212           if (!rop.do_redundant_reads) {
1213             int r = send_all_remaining_reads(iter->first, rop);
1214             if (r == 0) {
1215               // We added to in_progress and not incrementing is_complete
1216               continue;
1217             }
1218             // Couldn't read any additional shards so handle as completed with errors
1219           }
1220           // We don't want to confuse clients / RBD with objectstore error
1221           // values in particular ENOENT.  We may have different error returns
1222           // from different shards, so we'll return minimum_to_decode() error
1223           // (usually EIO) to reader.  It is likely an error here is due to a
1224           // damaged pg.
1225           rop.complete[iter->first].r = err;
1226           ++is_complete;
1227         }
1228       } else {
1229         assert(rop.complete[iter->first].r == 0);
1230         if (!rop.complete[iter->first].errors.empty()) {
1231           if (cct->_conf->osd_read_ec_check_for_errors) {
1232             dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
1233             err = rop.complete[iter->first].errors.begin()->second;
1234             rop.complete[iter->first].r = err;
1235           } else {
1236             get_parent()->clog_warn() << "Error(s) ignored for "
1237                                        << iter->first << " enough copies available";
1238             dout(10) << __func__ << " Error(s) ignored for " << iter->first
1239                      << " enough copies available" << dendl;
1240             rop.complete[iter->first].errors.clear();
1241           }
1242         }
1243         ++is_complete;
1244       }
1245     }
1246   }
1247   if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
1248     dout(20) << __func__ << " Complete: " << rop << dendl;
1249     rop.trace.event("ec read complete");
1250     complete_read_op(rop, m);
1251   } else {
1252     dout(10) << __func__ << " readop not complete: " << rop << dendl;
1253   }
1254 }
1255
1256 void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
1257 {
1258   map<hobject_t, read_request_t>::iterator reqiter =
1259     rop.to_read.begin();
1260   map<hobject_t, read_result_t>::iterator resiter =
1261     rop.complete.begin();
1262   assert(rop.to_read.size() == rop.complete.size());
1263   for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
1264     if (reqiter->second.cb) {
1265       pair<RecoveryMessages *, read_result_t &> arg(
1266         m, resiter->second);
1267       reqiter->second.cb->complete(arg);
1268       reqiter->second.cb = NULL;
1269     }
1270   }
1271   tid_to_read_map.erase(rop.tid);
1272 }
1273
1274 struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
1275   ECBackend *ec;
1276   ceph_tid_t tid;
1277   FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
1278   void finish(ThreadPool::TPHandle &handle) override {
1279     auto ropiter = ec->tid_to_read_map.find(tid);
1280     assert(ropiter != ec->tid_to_read_map.end());
1281     int priority = ropiter->second.priority;
1282     RecoveryMessages rm;
1283     ec->complete_read_op(ropiter->second, &rm);
1284     ec->dispatch_recovery_messages(rm, priority);
1285   }
1286 };
1287
1288 void ECBackend::filter_read_op(
1289   const OSDMapRef& osdmap,
1290   ReadOp &op)
1291 {
1292   set<hobject_t> to_cancel;
1293   for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1294        i != op.source_to_obj.end();
1295        ++i) {
1296     if (osdmap->is_down(i->first.osd)) {
1297       to_cancel.insert(i->second.begin(), i->second.end());
1298       op.in_progress.erase(i->first);
1299       continue;
1300     }
1301   }
1302
1303   if (to_cancel.empty())
1304     return;
1305
1306   for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1307        i != op.source_to_obj.end();
1308        ) {
1309     for (set<hobject_t>::iterator j = i->second.begin();
1310          j != i->second.end();
1311          ) {
1312       if (to_cancel.count(*j))
1313         i->second.erase(j++);
1314       else
1315         ++j;
1316     }
1317     if (i->second.empty()) {
1318       op.source_to_obj.erase(i++);
1319     } else {
1320       assert(!osdmap->is_down(i->first.osd));
1321       ++i;
1322     }
1323   }
1324
1325   for (set<hobject_t>::iterator i = to_cancel.begin();
1326        i != to_cancel.end();
1327        ++i) {
1328     get_parent()->cancel_pull(*i);
1329
1330     assert(op.to_read.count(*i));
1331     read_request_t &req = op.to_read.find(*i)->second;
1332     dout(10) << __func__ << ": canceling " << req
1333              << "  for obj " << *i << dendl;
1334     assert(req.cb);
1335     delete req.cb;
1336     req.cb = NULL;
1337
1338     op.to_read.erase(*i);
1339     op.complete.erase(*i);
1340     recovery_ops.erase(*i);
1341   }
1342
1343   if (op.in_progress.empty()) {
1344     get_parent()->schedule_recovery_work(
1345       get_parent()->bless_gencontext(
1346         new FinishReadOp(this, op.tid)));
1347   }
1348 }
1349
1350 void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
1351 {
1352   set<ceph_tid_t> tids_to_filter;
1353   for (map<pg_shard_t, set<ceph_tid_t> >::iterator 
1354        i = shard_to_read_map.begin();
1355        i != shard_to_read_map.end();
1356        ) {
1357     if (osdmap->is_down(i->first.osd)) {
1358       tids_to_filter.insert(i->second.begin(), i->second.end());
1359       shard_to_read_map.erase(i++);
1360     } else {
1361       ++i;
1362     }
1363   }
1364   for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
1365        i != tids_to_filter.end();
1366        ++i) {
1367     map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
1368     assert(j != tid_to_read_map.end());
1369     filter_read_op(osdmap, j->second);
1370   }
1371 }
1372
1373 void ECBackend::on_change()
1374 {
1375   dout(10) << __func__ << dendl;
1376
1377   completed_to = eversion_t();
1378   committed_to = eversion_t();
1379   pipeline_state.clear();
1380   waiting_reads.clear();
1381   waiting_state.clear();
1382   waiting_commit.clear();
1383   for (auto &&op: tid_to_op_map) {
1384     cache.release_write_pin(op.second.pin);
1385   }
1386   tid_to_op_map.clear();
1387
1388   for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
1389        i != tid_to_read_map.end();
1390        ++i) {
1391     dout(10) << __func__ << ": cancelling " << i->second << dendl;
1392     for (map<hobject_t, read_request_t>::iterator j =
1393            i->second.to_read.begin();
1394          j != i->second.to_read.end();
1395          ++j) {
1396       delete j->second.cb;
1397       j->second.cb = 0;
1398     }
1399   }
1400   tid_to_read_map.clear();
1401   in_progress_client_reads.clear();
1402   shard_to_read_map.clear();
1403   clear_recovery_state();
1404 }
1405
1406 void ECBackend::clear_recovery_state()
1407 {
1408   recovery_ops.clear();
1409 }
1410
1411 void ECBackend::on_flushed()
1412 {
1413 }
1414
1415 void ECBackend::dump_recovery_info(Formatter *f) const
1416 {
1417   f->open_array_section("recovery_ops");
1418   for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
1419        i != recovery_ops.end();
1420        ++i) {
1421     f->open_object_section("op");
1422     i->second.dump(f);
1423     f->close_section();
1424   }
1425   f->close_section();
1426   f->open_array_section("read_ops");
1427   for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
1428        i != tid_to_read_map.end();
1429        ++i) {
1430     f->open_object_section("read_op");
1431     i->second.dump(f);
1432     f->close_section();
1433   }
1434   f->close_section();
1435 }
1436
1437 void ECBackend::submit_transaction(
1438   const hobject_t &hoid,
1439   const object_stat_sum_t &delta_stats,
1440   const eversion_t &at_version,
1441   PGTransactionUPtr &&t,
1442   const eversion_t &trim_to,
1443   const eversion_t &roll_forward_to,
1444   const vector<pg_log_entry_t> &log_entries,
1445   boost::optional<pg_hit_set_history_t> &hset_history,
1446   Context *on_local_applied_sync,
1447   Context *on_all_applied,
1448   Context *on_all_commit,
1449   ceph_tid_t tid,
1450   osd_reqid_t reqid,
1451   OpRequestRef client_op
1452   )
1453 {
1454   assert(!tid_to_op_map.count(tid));
1455   Op *op = &(tid_to_op_map[tid]);
1456   op->hoid = hoid;
1457   op->delta_stats = delta_stats;
1458   op->version = at_version;
1459   op->trim_to = trim_to;
1460   op->roll_forward_to = MAX(roll_forward_to, committed_to);
1461   op->log_entries = log_entries;
1462   std::swap(op->updated_hit_set_history, hset_history);
1463   op->on_local_applied_sync = on_local_applied_sync;
1464   op->on_all_applied = on_all_applied;
1465   op->on_all_commit = on_all_commit;
1466   op->tid = tid;
1467   op->reqid = reqid;
1468   op->client_op = client_op;
1469   if (client_op)
1470     op->trace = client_op->pg_trace;
1471   
1472   dout(10) << __func__ << ": op " << *op << " starting" << dendl;
1473   start_rmw(op, std::move(t));
1474   dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1475 }
1476
1477 void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
1478   if (!waiting_state.empty()) {
1479     waiting_state.back().on_write.emplace_back(std::move(cb));
1480   } else if (!waiting_reads.empty()) {
1481     waiting_reads.back().on_write.emplace_back(std::move(cb));
1482   } else {
1483     // Nothing earlier in the pipeline, just call it
1484     cb();
1485   }
1486 }
1487
1488 int ECBackend::get_min_avail_to_read_shards(
1489   const hobject_t &hoid,
1490   const set<int> &want,
1491   bool for_recovery,
1492   bool do_redundant_reads,
1493   set<pg_shard_t> *to_read)
1494 {
1495   // Make sure we don't do redundant reads for recovery
1496   assert(!for_recovery || !do_redundant_reads);
1497
1498   set<int> have;
1499   map<shard_id_t, pg_shard_t> shards;
1500
1501   for (set<pg_shard_t>::const_iterator i =
1502          get_parent()->get_acting_shards().begin();
1503        i != get_parent()->get_acting_shards().end();
1504        ++i) {
1505     dout(10) << __func__ << ": checking acting " << *i << dendl;
1506     const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1507     if (!missing.is_missing(hoid)) {
1508       assert(!have.count(i->shard));
1509       have.insert(i->shard);
1510       assert(!shards.count(i->shard));
1511       shards.insert(make_pair(i->shard, *i));
1512     }
1513   }
1514
1515   if (for_recovery) {
1516     for (set<pg_shard_t>::const_iterator i =
1517            get_parent()->get_backfill_shards().begin();
1518          i != get_parent()->get_backfill_shards().end();
1519          ++i) {
1520       if (have.count(i->shard)) {
1521         assert(shards.count(i->shard));
1522         continue;
1523       }
1524       dout(10) << __func__ << ": checking backfill " << *i << dendl;
1525       assert(!shards.count(i->shard));
1526       const pg_info_t &info = get_parent()->get_shard_info(*i);
1527       const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1528       if (hoid < info.last_backfill &&
1529           !missing.is_missing(hoid)) {
1530         have.insert(i->shard);
1531         shards.insert(make_pair(i->shard, *i));
1532       }
1533     }
1534
1535     map<hobject_t, set<pg_shard_t>>::const_iterator miter =
1536       get_parent()->get_missing_loc_shards().find(hoid);
1537     if (miter != get_parent()->get_missing_loc_shards().end()) {
1538       for (set<pg_shard_t>::iterator i = miter->second.begin();
1539            i != miter->second.end();
1540            ++i) {
1541         dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
1542         auto m = get_parent()->maybe_get_shard_missing(*i);
1543         if (m) {
1544           assert(!(*m).is_missing(hoid));
1545         }
1546         have.insert(i->shard);
1547         shards.insert(make_pair(i->shard, *i));
1548       }
1549     }
1550   }
1551
1552   set<int> need;
1553   int r = ec_impl->minimum_to_decode(want, have, &need);
1554   if (r < 0)
1555     return r;
1556
1557   if (do_redundant_reads) {
1558       need.swap(have);
1559   } 
1560
1561   if (!to_read)
1562     return 0;
1563
1564   for (set<int>::iterator i = need.begin();
1565        i != need.end();
1566        ++i) {
1567     assert(shards.count(shard_id_t(*i)));
1568     to_read->insert(shards[shard_id_t(*i)]);
1569   }
1570   return 0;
1571 }
1572
1573 int ECBackend::get_remaining_shards(
1574   const hobject_t &hoid,
1575   const set<int> &avail,
1576   set<pg_shard_t> *to_read)
1577 {
1578   set<int> need;
1579   map<shard_id_t, pg_shard_t> shards;
1580
1581   for (set<pg_shard_t>::const_iterator i =
1582          get_parent()->get_acting_shards().begin();
1583        i != get_parent()->get_acting_shards().end();
1584        ++i) {
1585     dout(10) << __func__ << ": checking acting " << *i << dendl;
1586     const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1587     if (!missing.is_missing(hoid)) {
1588       assert(!need.count(i->shard));
1589       need.insert(i->shard);
1590       assert(!shards.count(i->shard));
1591       shards.insert(make_pair(i->shard, *i));
1592     }
1593   }
1594
1595   if (!to_read)
1596     return 0;
1597
1598   for (set<int>::iterator i = need.begin();
1599        i != need.end();
1600        ++i) {
1601     assert(shards.count(shard_id_t(*i)));
1602     if (avail.find(*i) == avail.end())
1603       to_read->insert(shards[shard_id_t(*i)]);
1604   }
1605   return 0;
1606 }
1607
1608 void ECBackend::start_read_op(
1609   int priority,
1610   map<hobject_t, read_request_t> &to_read,
1611   OpRequestRef _op,
1612   bool do_redundant_reads,
1613   bool for_recovery)
1614 {
1615   ceph_tid_t tid = get_parent()->get_tid();
1616   assert(!tid_to_read_map.count(tid));
1617   auto &op = tid_to_read_map.emplace(
1618     tid,
1619     ReadOp(
1620       priority,
1621       tid,
1622       do_redundant_reads,
1623       for_recovery,
1624       _op,
1625       std::move(to_read))).first->second;
1626   dout(10) << __func__ << ": starting " << op << dendl;
1627   if (_op) {
1628     op.trace = _op->pg_trace;
1629     op.trace.event("start ec read");
1630   }
1631   do_read_op(op);
1632 }
1633
1634 void ECBackend::do_read_op(ReadOp &op)
1635 {
1636   int priority = op.priority;
1637   ceph_tid_t tid = op.tid;
1638
1639   dout(10) << __func__ << ": starting read " << op << dendl;
1640
1641   map<pg_shard_t, ECSubRead> messages;
1642   for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
1643        i != op.to_read.end();
1644        ++i) {
1645     bool need_attrs = i->second.want_attrs;
1646     for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
1647          j != i->second.need.end();
1648          ++j) {
1649       if (need_attrs) {
1650         messages[*j].attrs_to_read.insert(i->first);
1651         need_attrs = false;
1652       }
1653       op.obj_to_source[i->first].insert(*j);
1654       op.source_to_obj[*j].insert(i->first);
1655     }
1656     for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
1657            i->second.to_read.begin();
1658          j != i->second.to_read.end();
1659          ++j) {
1660       pair<uint64_t, uint64_t> chunk_off_len =
1661         sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
1662       for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
1663            k != i->second.need.end();
1664            ++k) {
1665         messages[*k].to_read[i->first].push_back(
1666           boost::make_tuple(
1667             chunk_off_len.first,
1668             chunk_off_len.second,
1669             j->get<2>()));
1670       }
1671       assert(!need_attrs);
1672     }
1673   }
1674
1675   for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
1676        i != messages.end();
1677        ++i) {
1678     op.in_progress.insert(i->first);
1679     shard_to_read_map[i->first].insert(op.tid);
1680     i->second.tid = tid;
1681     MOSDECSubOpRead *msg = new MOSDECSubOpRead;
1682     msg->set_priority(priority);
1683     msg->pgid = spg_t(
1684       get_parent()->whoami_spg_t().pgid,
1685       i->first.shard);
1686     msg->map_epoch = get_parent()->get_epoch();
1687     msg->min_epoch = get_parent()->get_interval_start_epoch();
1688     msg->op = i->second;
1689     msg->op.from = get_parent()->whoami_shard();
1690     msg->op.tid = tid;
1691     if (op.trace) {
1692       // initialize a child span for this shard
1693       msg->trace.init("ec sub read", nullptr, &op.trace);
1694       msg->trace.keyval("shard", i->first.shard.id);
1695     }
1696     get_parent()->send_message_osd_cluster(
1697       i->first.osd,
1698       msg,
1699       get_parent()->get_epoch());
1700   }
1701   dout(10) << __func__ << ": started " << op << dendl;
1702 }
1703
1704 ECUtil::HashInfoRef ECBackend::get_hash_info(
1705   const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
1706 {
1707   dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
1708   ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
1709   if (!ref) {
1710     dout(10) << __func__ << ": not in cache " << hoid << dendl;
1711     struct stat st;
1712     int r = store->stat(
1713       ch,
1714       ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1715       &st);
1716     ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
1717     // XXX: What does it mean if there is no object on disk?
1718     if (r >= 0) {
1719       dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
1720       bufferlist bl;
1721       if (attrs) {
1722         map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
1723         if (k == attrs->end()) {
1724           dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
1725         } else {
1726           bl.push_back(k->second);
1727         }
1728       } else {
1729         r = store->getattr(
1730           ch,
1731           ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1732           ECUtil::get_hinfo_key(),
1733           bl);
1734         if (r < 0) {
1735           dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
1736           bl.clear(); // just in case
1737         }
1738       }
1739       if (bl.length() > 0) {
1740         bufferlist::iterator bp = bl.begin();
1741         ::decode(hinfo, bp);
1742         if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
1743           dout(0) << __func__ << ": Mismatch of total_chunk_size "
1744                                << hinfo.get_total_chunk_size() << dendl;
1745           return ECUtil::HashInfoRef();
1746         }
1747       } else if (st.st_size > 0) { // If empty object and no hinfo, create it
1748         return ECUtil::HashInfoRef();
1749       }
1750     }
1751     ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
1752   }
1753   return ref;
1754 }
1755
1756 void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
1757 {
1758   assert(op);
1759
1760   op->plan = ECTransaction::get_write_plan(
1761     sinfo,
1762     std::move(t),
1763     [&](const hobject_t &i) {
1764       ECUtil::HashInfoRef ref = get_hash_info(i, false);
1765       if (!ref) {
1766         derr << __func__ << ": get_hash_info(" << i << ")"
1767              << " returned a null pointer and there is no "
1768              << " way to recover from such an error in this "
1769              << " context" << dendl;
1770         ceph_abort();
1771       }
1772       return ref;
1773     },
1774     get_parent()->get_dpp());
1775
1776   dout(10) << __func__ << ": " << *op << dendl;
1777
1778   waiting_state.push_back(*op);
1779   check_ops();
1780 }
1781
1782 bool ECBackend::try_state_to_reads()
1783 {
1784   if (waiting_state.empty())
1785     return false;
1786
1787   Op *op = &(waiting_state.front());
1788   if (op->requires_rmw() && pipeline_state.cache_invalid()) {
1789     assert(get_parent()->get_pool().allows_ecoverwrites());
1790     dout(20) << __func__ << ": blocking " << *op
1791              << " because it requires an rmw and the cache is invalid "
1792              << pipeline_state
1793              << dendl;
1794     return false;
1795   }
1796
1797   if (op->invalidates_cache()) {
1798     dout(20) << __func__ << ": invalidating cache after this op"
1799              << dendl;
1800     pipeline_state.invalidate();
1801     op->using_cache = false;
1802   } else {
1803     op->using_cache = pipeline_state.caching_enabled();
1804   }
1805
1806   waiting_state.pop_front();
1807   waiting_reads.push_back(*op);
1808
1809   if (op->using_cache) {
1810     cache.open_write_pin(op->pin);
1811
1812     extent_set empty;
1813     for (auto &&hpair: op->plan.will_write) {
1814       auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
1815       const extent_set &to_read_plan =
1816         to_read_plan_iter == op->plan.to_read.end() ?
1817         empty :
1818         to_read_plan_iter->second;
1819
1820       extent_set remote_read = cache.reserve_extents_for_rmw(
1821         hpair.first,
1822         op->pin,
1823         hpair.second,
1824         to_read_plan);
1825
1826       extent_set pending_read = to_read_plan;
1827       pending_read.subtract(remote_read);
1828
1829       if (!remote_read.empty()) {
1830         op->remote_read[hpair.first] = std::move(remote_read);
1831       }
1832       if (!pending_read.empty()) {
1833         op->pending_read[hpair.first] = std::move(pending_read);
1834       }
1835     }
1836   } else {
1837     op->remote_read = op->plan.to_read;
1838   }
1839
1840   dout(10) << __func__ << ": " << *op << dendl;
1841
1842   if (!op->remote_read.empty()) {
1843     assert(get_parent()->get_pool().allows_ecoverwrites());
1844     objects_read_async_no_cache(
1845       op->remote_read,
1846       [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
1847         for (auto &&i: results) {
1848           op->remote_read_result.emplace(i.first, i.second.second);
1849         }
1850         check_ops();
1851       });
1852   }
1853
1854   return true;
1855 }
1856
1857 bool ECBackend::try_reads_to_commit()
1858 {
1859   if (waiting_reads.empty())
1860     return false;
1861   Op *op = &(waiting_reads.front());
1862   if (op->read_in_progress())
1863     return false;
1864   waiting_reads.pop_front();
1865   waiting_commit.push_back(*op);
1866
1867   dout(10) << __func__ << ": starting commit on " << *op << dendl;
1868   dout(20) << __func__ << ": " << cache << dendl;
1869
1870   get_parent()->apply_stats(
1871     op->hoid,
1872     op->delta_stats);
1873
1874   if (op->using_cache) {
1875     for (auto &&hpair: op->pending_read) {
1876       op->remote_read_result[hpair.first].insert(
1877         cache.get_remaining_extents_for_rmw(
1878           hpair.first,
1879           op->pin,
1880           hpair.second));
1881     }
1882     op->pending_read.clear();
1883   } else {
1884     assert(op->pending_read.empty());
1885   }
1886
1887   map<shard_id_t, ObjectStore::Transaction> trans;
1888   for (set<pg_shard_t>::const_iterator i =
1889          get_parent()->get_actingbackfill_shards().begin();
1890        i != get_parent()->get_actingbackfill_shards().end();
1891        ++i) {
1892     trans[i->shard];
1893   }
1894
1895   op->trace.event("start ec write");
1896
1897   map<hobject_t,extent_map> written;
1898   if (op->plan.t) {
1899     ECTransaction::generate_transactions(
1900       op->plan,
1901       ec_impl,
1902       get_parent()->get_info().pgid.pgid,
1903       (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
1904       sinfo,
1905       op->remote_read_result,
1906       op->log_entries,
1907       &written,
1908       &trans,
1909       &(op->temp_added),
1910       &(op->temp_cleared),
1911       get_parent()->get_dpp());
1912   }
1913
1914   dout(20) << __func__ << ": " << cache << dendl;
1915   dout(20) << __func__ << ": written: " << written << dendl;
1916   dout(20) << __func__ << ": op: " << *op << dendl;
1917
1918   if (!get_parent()->get_pool().allows_ecoverwrites()) {
1919     for (auto &&i: op->log_entries) {
1920       if (i.requires_kraken()) {
1921         derr << __func__ << ": log entry " << i << " requires kraken"
1922              << " but overwrites are not enabled!" << dendl;
1923         ceph_abort();
1924       }
1925     }
1926   }
1927
1928   map<hobject_t,extent_set> written_set;
1929   for (auto &&i: written) {
1930     written_set[i.first] = i.second.get_interval_set();
1931   }
1932   dout(20) << __func__ << ": written_set: " << written_set << dendl;
1933   assert(written_set == op->plan.will_write);
1934
1935   if (op->using_cache) {
1936     for (auto &&hpair: written) {
1937       dout(20) << __func__ << ": " << hpair << dendl;
1938       cache.present_rmw_update(hpair.first, op->pin, hpair.second);
1939     }
1940   }
1941   op->remote_read.clear();
1942   op->remote_read_result.clear();
1943
1944   dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1945   ObjectStore::Transaction empty;
1946   bool should_write_local = false;
1947   ECSubWrite local_write_op;
1948   for (set<pg_shard_t>::const_iterator i =
1949          get_parent()->get_actingbackfill_shards().begin();
1950        i != get_parent()->get_actingbackfill_shards().end();
1951        ++i) {
1952     op->pending_apply.insert(*i);
1953     op->pending_commit.insert(*i);
1954     map<shard_id_t, ObjectStore::Transaction>::iterator iter =
1955       trans.find(i->shard);
1956     assert(iter != trans.end());
1957     bool should_send = get_parent()->should_send_op(*i, op->hoid);
1958     const pg_stat_t &stats =
1959       should_send ?
1960       get_info().stats :
1961       parent->get_shard_info().find(*i)->second.stats;
1962
1963     ECSubWrite sop(
1964       get_parent()->whoami_shard(),
1965       op->tid,
1966       op->reqid,
1967       op->hoid,
1968       stats,
1969       should_send ? iter->second : empty,
1970       op->version,
1971       op->trim_to,
1972       op->roll_forward_to,
1973       op->log_entries,
1974       op->updated_hit_set_history,
1975       op->temp_added,
1976       op->temp_cleared,
1977       !should_send);
1978
1979     ZTracer::Trace trace;
1980     if (op->trace) {
1981       // initialize a child span for this shard
1982       trace.init("ec sub write", nullptr, &op->trace);
1983       trace.keyval("shard", i->shard.id);
1984     }
1985
1986     if (*i == get_parent()->whoami_shard()) {
1987       should_write_local = true;
1988       local_write_op.claim(sop);
1989     } else {
1990       MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
1991       r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
1992       r->map_epoch = get_parent()->get_epoch();
1993       r->min_epoch = get_parent()->get_interval_start_epoch();
1994       r->trace = trace;
1995       get_parent()->send_message_osd_cluster(
1996         i->osd, r, get_parent()->get_epoch());
1997     }
1998   }
1999   if (should_write_local) {
2000       handle_sub_write(
2001         get_parent()->whoami_shard(),
2002         op->client_op,
2003         local_write_op,
2004         op->trace,
2005         op->on_local_applied_sync);
2006       op->on_local_applied_sync = 0;
2007   }
2008
2009   for (auto i = op->on_write.begin();
2010        i != op->on_write.end();
2011        op->on_write.erase(i++)) {
2012     (*i)();
2013   }
2014
2015   return true;
2016 }
2017
2018 bool ECBackend::try_finish_rmw()
2019 {
2020   if (waiting_commit.empty())
2021     return false;
2022   Op *op = &(waiting_commit.front());
2023   if (op->write_in_progress())
2024     return false;
2025   waiting_commit.pop_front();
2026
2027   dout(10) << __func__ << ": " << *op << dendl;
2028   dout(20) << __func__ << ": " << cache << dendl;
2029
2030   if (op->roll_forward_to > completed_to)
2031     completed_to = op->roll_forward_to;
2032   if (op->version > committed_to)
2033     committed_to = op->version;
2034
2035   if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
2036     if (op->version > get_parent()->get_log().get_can_rollback_to() &&
2037         waiting_reads.empty() &&
2038         waiting_commit.empty()) {
2039       // submit a dummy transaction to kick the rollforward
2040       auto tid = get_parent()->get_tid();
2041       Op *nop = &(tid_to_op_map[tid]);
2042       nop->hoid = op->hoid;
2043       nop->trim_to = op->trim_to;
2044       nop->roll_forward_to = op->version;
2045       nop->tid = tid;
2046       nop->reqid = op->reqid;
2047       waiting_reads.push_back(*nop);
2048     }
2049   }
2050
2051   if (op->using_cache) {
2052     cache.release_write_pin(op->pin);
2053   }
2054   tid_to_op_map.erase(op->tid);
2055
2056   if (waiting_reads.empty() &&
2057       waiting_commit.empty()) {
2058     pipeline_state.clear();
2059     dout(20) << __func__ << ": clearing pipeline_state "
2060              << pipeline_state
2061              << dendl;
2062   }
2063   return true;
2064 }
2065
2066 void ECBackend::check_ops()
2067 {
2068   while (try_state_to_reads() ||
2069          try_reads_to_commit() ||
2070          try_finish_rmw());
2071 }
2072
2073 int ECBackend::objects_read_sync(
2074   const hobject_t &hoid,
2075   uint64_t off,
2076   uint64_t len,
2077   uint32_t op_flags,
2078   bufferlist *bl)
2079 {
2080   return -EOPNOTSUPP;
2081 }
2082
2083 void ECBackend::objects_read_async(
2084   const hobject_t &hoid,
2085   const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2086              pair<bufferlist*, Context*> > > &to_read,
2087   Context *on_complete,
2088   bool fast_read)
2089 {
2090   map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
2091     reads;
2092
2093   uint32_t flags = 0;
2094   extent_set es;
2095   for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2096          pair<bufferlist*, Context*> > >::const_iterator i =
2097          to_read.begin();
2098        i != to_read.end();
2099        ++i) {
2100     pair<uint64_t, uint64_t> tmp =
2101       sinfo.offset_len_to_stripe_bounds(
2102         make_pair(i->first.get<0>(), i->first.get<1>()));
2103
2104     extent_set esnew;
2105     esnew.insert(tmp.first, tmp.second);
2106     es.union_of(esnew);
2107     flags |= i->first.get<2>();
2108   }
2109
2110   if (!es.empty()) {
2111     auto &offsets = reads[hoid];
2112     for (auto j = es.begin();
2113          j != es.end();
2114          ++j) {
2115       offsets.push_back(
2116         boost::make_tuple(
2117           j.get_start(),
2118           j.get_len(),
2119           flags));
2120     }
2121   }
2122
2123   struct cb {
2124     ECBackend *ec;
2125     hobject_t hoid;
2126     list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2127               pair<bufferlist*, Context*> > > to_read;
2128     unique_ptr<Context> on_complete;
2129     cb(const cb&) = delete;
2130     cb(cb &&) = default;
2131     cb(ECBackend *ec,
2132        const hobject_t &hoid,
2133        const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2134                   pair<bufferlist*, Context*> > > &to_read,
2135        Context *on_complete)
2136       : ec(ec),
2137         hoid(hoid),
2138         to_read(to_read),
2139         on_complete(on_complete) {}
2140     void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
2141       auto dpp = ec->get_parent()->get_dpp();
2142       ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
2143                          << dendl;
2144       ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
2145                          << dendl;
2146
2147       auto &got = results[hoid];
2148
2149       int r = 0;
2150       for (auto &&read: to_read) {
2151         if (got.first < 0) {
2152           if (read.second.second) {
2153             read.second.second->complete(got.first);
2154           }
2155           if (r == 0)
2156             r = got.first;
2157         } else {
2158           assert(read.second.first);
2159           uint64_t offset = read.first.get<0>();
2160           uint64_t length = read.first.get<1>();
2161           auto range = got.second.get_containing_range(offset, length);
2162           assert(range.first != range.second);
2163           assert(range.first.get_off() <= offset);
2164           assert(
2165             (offset + length) <=
2166             (range.first.get_off() + range.first.get_len()));
2167           read.second.first->substr_of(
2168             range.first.get_val(),
2169             offset - range.first.get_off(),
2170             length);
2171           if (read.second.second) {
2172             read.second.second->complete(length);
2173             read.second.second = nullptr;
2174           }
2175         }
2176       }
2177       to_read.clear();
2178       if (on_complete) {
2179         on_complete.release()->complete(r);
2180       }
2181     }
2182     ~cb() {
2183       for (auto &&i: to_read) {
2184         delete i.second.second;
2185       }
2186       to_read.clear();
2187     }
2188   };
2189   objects_read_and_reconstruct(
2190     reads,
2191     fast_read,
2192     make_gen_lambda_context<
2193       map<hobject_t,pair<int, extent_map> > &&, cb>(
2194         cb(this,
2195            hoid,
2196            to_read,
2197            on_complete)));
2198 }
2199
2200 struct CallClientContexts :
2201   public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
2202   hobject_t hoid;
2203   ECBackend *ec;
2204   ECBackend::ClientAsyncReadStatus *status;
2205   list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
2206   CallClientContexts(
2207     hobject_t hoid,
2208     ECBackend *ec,
2209     ECBackend::ClientAsyncReadStatus *status,
2210     const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
2211     : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
2212   void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
2213     ECBackend::read_result_t &res = in.second;
2214     extent_map result;
2215     if (res.r != 0)
2216       goto out;
2217     assert(res.returned.size() == to_read.size());
2218     assert(res.r == 0);
2219     assert(res.errors.empty());
2220     for (auto &&read: to_read) {
2221       pair<uint64_t, uint64_t> adjusted =
2222         ec->sinfo.offset_len_to_stripe_bounds(
2223           make_pair(read.get<0>(), read.get<1>()));
2224       assert(res.returned.front().get<0>() == adjusted.first &&
2225              res.returned.front().get<1>() == adjusted.second);
2226       map<int, bufferlist> to_decode;
2227       bufferlist bl;
2228       for (map<pg_shard_t, bufferlist>::iterator j =
2229              res.returned.front().get<2>().begin();
2230            j != res.returned.front().get<2>().end();
2231            ++j) {
2232         to_decode[j->first.shard].claim(j->second);
2233       }
2234       int r = ECUtil::decode(
2235         ec->sinfo,
2236         ec->ec_impl,
2237         to_decode,
2238         &bl);
2239       if (r < 0) {
2240         res.r = r;
2241         goto out;
2242       }
2243       bufferlist trimmed;
2244       trimmed.substr_of(
2245         bl,
2246         read.get<0>() - adjusted.first,
2247         MIN(read.get<1>(),
2248             bl.length() - (read.get<0>() - adjusted.first)));
2249       result.insert(
2250         read.get<0>(), trimmed.length(), std::move(trimmed));
2251       res.returned.pop_front();
2252     }
2253 out:
2254     status->complete_object(hoid, res.r, std::move(result));
2255     ec->kick_reads();
2256   }
2257 };
2258
2259 void ECBackend::objects_read_and_reconstruct(
2260   const map<hobject_t,
2261     std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
2262   > &reads,
2263   bool fast_read,
2264   GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
2265 {
2266   in_progress_client_reads.emplace_back(
2267     reads.size(), std::move(func));
2268   if (!reads.size()) {
2269     kick_reads();
2270     return;
2271   }
2272
2273   set<int> want_to_read;
2274   get_want_to_read_shards(&want_to_read);
2275     
2276   map<hobject_t, read_request_t> for_read_op;
2277   for (auto &&to_read: reads) {
2278     set<pg_shard_t> shards;
2279     int r = get_min_avail_to_read_shards(
2280       to_read.first,
2281       want_to_read,
2282       false,
2283       fast_read,
2284       &shards);
2285     assert(r == 0);
2286
2287     CallClientContexts *c = new CallClientContexts(
2288       to_read.first,
2289       this,
2290       &(in_progress_client_reads.back()),
2291       to_read.second);
2292     for_read_op.insert(
2293       make_pair(
2294         to_read.first,
2295         read_request_t(
2296           to_read.second,
2297           shards,
2298           false,
2299           c)));
2300   }
2301
2302   start_read_op(
2303     CEPH_MSG_PRIO_DEFAULT,
2304     for_read_op,
2305     OpRequestRef(),
2306     fast_read, false);
2307   return;
2308 }
2309
2310
2311 int ECBackend::send_all_remaining_reads(
2312   const hobject_t &hoid,
2313   ReadOp &rop)
2314 {
2315   set<int> already_read;
2316   const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
2317   for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
2318     already_read.insert(i->shard);
2319   dout(10) << __func__ << " have/error shards=" << already_read << dendl;
2320   set<pg_shard_t> shards;
2321   int r = get_remaining_shards(hoid, already_read, &shards);
2322   if (r)
2323     return r;
2324   if (shards.empty())
2325     return -EIO;
2326
2327   dout(10) << __func__ << " Read remaining shards " << shards << dendl;
2328
2329   // TODOSAM: this doesn't seem right
2330   list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
2331     rop.to_read.find(hoid)->second.to_read;
2332   GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
2333     rop.to_read.find(hoid)->second.cb;
2334
2335   map<hobject_t, read_request_t> for_read_op;
2336   for_read_op.insert(
2337     make_pair(
2338       hoid,
2339       read_request_t(
2340         offsets,
2341         shards,
2342         false,
2343         c)));
2344
2345   rop.to_read.swap(for_read_op);
2346   do_read_op(rop);
2347   return 0;
2348 }
2349
2350 int ECBackend::objects_get_attrs(
2351   const hobject_t &hoid,
2352   map<string, bufferlist> *out)
2353 {
2354   int r = store->getattrs(
2355     ch,
2356     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2357     *out);
2358   if (r < 0)
2359     return r;
2360
2361   for (map<string, bufferlist>::iterator i = out->begin();
2362        i != out->end();
2363        ) {
2364     if (ECUtil::is_hinfo_key_string(i->first))
2365       out->erase(i++);
2366     else
2367       ++i;
2368   }
2369   return r;
2370 }
2371
2372 void ECBackend::rollback_append(
2373   const hobject_t &hoid,
2374   uint64_t old_size,
2375   ObjectStore::Transaction *t)
2376 {
2377   assert(old_size % sinfo.get_stripe_width() == 0);
2378   t->truncate(
2379     coll,
2380     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2381     sinfo.aligned_logical_offset_to_chunk_offset(
2382       old_size));
2383 }
2384
2385 void ECBackend::be_deep_scrub(
2386   const hobject_t &poid,
2387   uint32_t seed,
2388   ScrubMap::object &o,
2389   ThreadPool::TPHandle &handle) {
2390   bufferhash h(-1); // we always used -1
2391   int r;
2392   uint64_t stride = cct->_conf->osd_deep_scrub_stride;
2393   if (stride % sinfo.get_chunk_size())
2394     stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
2395   uint64_t pos = 0;
2396
2397   uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
2398
2399   while (true) {
2400     bufferlist bl;
2401     handle.reset_tp_timeout();
2402     r = store->read(
2403       ch,
2404       ghobject_t(
2405         poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2406       pos,
2407       stride, bl,
2408       fadvise_flags);
2409     if (r < 0)
2410       break;
2411     if (bl.length() % sinfo.get_chunk_size()) {
2412       r = -EIO;
2413       break;
2414     }
2415     pos += r;
2416     h << bl;
2417     if ((unsigned)r < stride)
2418       break;
2419   }
2420
2421   if (r == -EIO) {
2422     dout(0) << "_scan_list  " << poid << " got "
2423             << r << " on read, read_error" << dendl;
2424     o.read_error = true;
2425     return;
2426   }
2427
2428   ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
2429   if (!hinfo) {
2430     dout(0) << "_scan_list  " << poid << " could not retrieve hash info" << dendl;
2431     o.read_error = true;
2432     o.digest_present = false;
2433     return;
2434   } else {
2435     if (!get_parent()->get_pool().allows_ecoverwrites()) {
2436       assert(hinfo->has_chunk_hash());
2437       if (hinfo->get_total_chunk_size() != pos) {
2438         dout(0) << "_scan_list  " << poid << " got incorrect size on read" << dendl;
2439         o.ec_size_mismatch = true;
2440         return;
2441       }
2442
2443       if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
2444         dout(0) << "_scan_list  " << poid << " got incorrect hash on read" << dendl;
2445         o.ec_hash_mismatch = true;
2446         return;
2447       }
2448
2449       /* We checked above that we match our own stored hash.  We cannot
2450        * send a hash of the actual object, so instead we simply send
2451        * our locally stored hash of shard 0 on the assumption that if
2452        * we match our chunk hash and our recollection of the hash for
2453        * chunk 0 matches that of our peers, there is likely no corruption.
2454        */
2455       o.digest = hinfo->get_chunk_hash(0);
2456       o.digest_present = true;
2457     } else {
2458       /* Hack! We must be using partial overwrites, and partial overwrites
2459        * don't support deep-scrub yet
2460        */
2461       o.digest = 0;
2462       o.digest_present = true;
2463     }
2464   }
2465
2466   o.omap_digest = seed;
2467   o.omap_digest_present = true;
2468 }