diff --git a/src/ceph/src/osd/ECBackend.cc b/src/ceph/src/osd/ECBackend.cc
new file mode 100644
index 0000000..40be6a3
--- /dev/null
@@ -0,0 +1,2468 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <iostream>
+#include <sstream>
+
+#include "ECBackend.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDECSubOpWrite.h"
+#include "messages/MOSDECSubOpWriteReply.h"
+#include "messages/MOSDECSubOpRead.h"
+#include "messages/MOSDECSubOpReadReply.h"
+#include "ECMsgTypes.h"
+
+#include "PrimaryLogPG.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+#define DOUT_PREFIX_ARGS this
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
+  return *_dout << pgb->get_parent()->gen_dbg_prefix();
+}
+
+struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
+  list<ECBackend::RecoveryOp> ops;
+};
+
+ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
+  switch (rhs.pipeline_state) {
+  case ECBackend::pipeline_state_t::CACHE_VALID:
+    return lhs << "CACHE_VALID";
+  case ECBackend::pipeline_state_t::CACHE_INVALID:
+    return lhs << "CACHE_INVALID";
+  default:
+    assert(0 == "invalid pipeline state");
+  }
+  return lhs; // unreachable
+}
+
+static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
+{
+  lhs << "[";
+  for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
+       i != rhs.end();
+       ++i) {
+    if (i != rhs.begin())
+      lhs << ", ";
+    lhs << make_pair(i->first, i->second.length());
+  }
+  return lhs << "]";
+}
+
+static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
+{
+  lhs << "[";
+  for (map<int, bufferlist>::const_iterator i = rhs.begin();
+       i != rhs.end();
+       ++i) {
+    if (i != rhs.begin())
+      lhs << ", ";
+    lhs << make_pair(i->first, i->second.length());
+  }
+  return lhs << "]";
+}
+
+static ostream &operator<<(
+  ostream &lhs,
+  const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
+{
+  return lhs << "(" << rhs.get<0>() << ", "
+            << rhs.get<1>() << ", " << rhs.get<2>() << ")";
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
+{
+  return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
+            << ", need=" << rhs.need
+            << ", want_attrs=" << rhs.want_attrs
+            << ")";
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
+{
+  lhs << "read_result_t(r=" << rhs.r
+      << ", errors=" << rhs.errors;
+  if (rhs.attrs) {
+    lhs << ", attrs=" << rhs.attrs.get();
+  } else {
+    lhs << ", noattrs";
+  }
+  return lhs << ", returned=" << rhs.returned << ")";
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
+{
+  lhs << "ReadOp(tid=" << rhs.tid;
+  if (rhs.op && rhs.op->get_req()) {
+    lhs << ", op=";
+    rhs.op->get_req()->print(lhs);
+  }
+  return lhs << ", to_read=" << rhs.to_read
+            << ", complete=" << rhs.complete
+            << ", priority=" << rhs.priority
+            << ", obj_to_source=" << rhs.obj_to_source
+            << ", source_to_obj=" << rhs.source_to_obj
+            << ", in_progress=" << rhs.in_progress << ")";
+}
+
+void ECBackend::ReadOp::dump(Formatter *f) const
+{
+  f->dump_unsigned("tid", tid);
+  if (op && op->get_req()) {
+    f->dump_stream("op") << *(op->get_req());
+  }
+  f->dump_stream("to_read") << to_read;
+  f->dump_stream("complete") << complete;
+  f->dump_int("priority", priority);
+  f->dump_stream("obj_to_source") << obj_to_source;
+  f->dump_stream("source_to_obj") << source_to_obj;
+  f->dump_stream("in_progress") << in_progress;
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
+{
+  lhs << "Op(" << rhs.hoid
+      << " v=" << rhs.version
+      << " tt=" << rhs.trim_to
+      << " tid=" << rhs.tid
+      << " reqid=" << rhs.reqid;
+  if (rhs.client_op && rhs.client_op->get_req()) {
+    lhs << " client_op=";
+    rhs.client_op->get_req()->print(lhs);
+  }
+  lhs << " roll_forward_to=" << rhs.roll_forward_to
+      << " temp_added=" << rhs.temp_added
+      << " temp_cleared=" << rhs.temp_cleared
+      << " pending_read=" << rhs.pending_read
+      << " remote_read=" << rhs.remote_read
+      << " remote_read_result=" << rhs.remote_read_result
+      << " pending_apply=" << rhs.pending_apply
+      << " pending_commit=" << rhs.pending_commit
+      << " plan.to_read=" << rhs.plan.to_read
+      << " plan.will_write=" << rhs.plan.will_write
+      << ")";
+  return lhs;
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
+{
+  return lhs << "RecoveryOp("
+            << "hoid=" << rhs.hoid
+            << " v=" << rhs.v
+            << " missing_on=" << rhs.missing_on
+            << " missing_on_shards=" << rhs.missing_on_shards
+            << " recovery_info=" << rhs.recovery_info
+            << " recovery_progress=" << rhs.recovery_progress
+            << " obc refcount=" << rhs.obc.use_count()
+            << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
+            << " waiting_on_pushes=" << rhs.waiting_on_pushes
+            << " extent_requested=" << rhs.extent_requested
+            << ")";
+}
+
+void ECBackend::RecoveryOp::dump(Formatter *f) const
+{
+  f->dump_stream("hoid") << hoid;
+  f->dump_stream("v") << v;
+  f->dump_stream("missing_on") << missing_on;
+  f->dump_stream("missing_on_shards") << missing_on_shards;
+  f->dump_stream("recovery_info") << recovery_info;
+  f->dump_stream("recovery_progress") << recovery_progress;
+  f->dump_stream("state") << tostr(state);
+  f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
+  f->dump_stream("extent_requested") << extent_requested;
+}
+
+ECBackend::ECBackend(
+  PGBackend::Listener *pg,
+  coll_t coll,
+  ObjectStore::CollectionHandle &ch,
+  ObjectStore *store,
+  CephContext *cct,
+  ErasureCodeInterfaceRef ec_impl,
+  uint64_t stripe_width)
+  : PGBackend(cct, pg, store, coll, ch),
+    ec_impl(ec_impl),
+    sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
+  assert((ec_impl->get_data_chunk_count() *
+         ec_impl->get_chunk_size(stripe_width)) == stripe_width);
+}
+
+PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
+{
+  return new ECRecoveryHandle;
+}
+
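+// A recovery read returned an error from one or more shards: drop the
+// in-flight recovery op for this object and report the failing shards to the
+// parent PG so they can be recorded as failed pushes.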
+void ECBackend::_failed_push(const hobject_t &hoid,
+  pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
+{
+  ECBackend::read_result_t &res = in.second;
+  dout(10) << __func__ << ": Read error " << hoid << " r="
+          << res.r << " errors=" << res.errors << dendl;
+  dout(10) << __func__ << ": canceling recovery op for obj " << hoid
+          << dendl;
+  assert(recovery_ops.count(hoid));
+  recovery_ops.erase(hoid);
+
+  list<pg_shard_t> fl;
+  for (auto&& i : res.errors) {
+    fl.push_back(i.first);
+  }
+  get_parent()->failed_push(fl, hoid);
+}
+
+struct OnRecoveryReadComplete :
+  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
+  ECBackend *pg;
+  hobject_t hoid;
+  set<int> want;
+  OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
+    : pg(pg), hoid(hoid) {}
+  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
+    ECBackend::read_result_t &res = in.second;
+    if (!(res.r == 0 && res.errors.empty())) {
+        pg->_failed_push(hoid, in);
+        return;
+    }
+    assert(res.returned.size() == 1);
+    pg->handle_recovery_read_complete(
+      hoid,
+      res.returned.back(),
+      res.attrs,
+      in.first);
+  }
+};
+
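+// Accumulates everything generated while processing a batch of recovery
+// events: recovery reads to issue, per-shard PushOps and PushReplyOps, and a
+// local ObjectStore::Transaction.  Flushed by dispatch_recovery_messages().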
+struct RecoveryMessages {
+  map<hobject_t,
+      ECBackend::read_request_t> reads;
+  void read(
+    ECBackend *ec,
+    const hobject_t &hoid, uint64_t off, uint64_t len,
+    const set<pg_shard_t> &need,
+    bool attrs) {
+    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
+    to_read.push_back(boost::make_tuple(off, len, 0));
+    assert(!reads.count(hoid));
+    reads.insert(
+      make_pair(
+       hoid,
+       ECBackend::read_request_t(
+         to_read,
+         need,
+         attrs,
+         new OnRecoveryReadComplete(
+           ec,
+           hoid))));
+  }
+
+  map<pg_shard_t, vector<PushOp> > pushes;
+  map<pg_shard_t, vector<PushReplyOp> > push_replies;
+  ObjectStore::Transaction t;
+  RecoveryMessages() {}
+  ~RecoveryMessages(){}
+};
+
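+// Apply an incoming PushOp to the local store.  An object that arrives in a
+// single push is written directly to its final ghobject; otherwise data is
+// staged in a temp object that is renamed into place when the final push
+// completes.  A PushReplyOp for the primary is queued in either case.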
+void ECBackend::handle_recovery_push(
+  const PushOp &op,
+  RecoveryMessages *m)
+{
+  ostringstream ss;
+  if (get_parent()->check_failsafe_full(ss)) {
+    dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
+    ceph_abort();
+  }
+
+  bool oneshot = op.before_progress.first && op.after_progress.data_complete;
+  ghobject_t tobj;
+  if (oneshot) {
+    tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
+                     get_parent()->whoami_shard().shard);
+  } else {
+    tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
+                                                            op.version),
+                     ghobject_t::NO_GEN,
+                     get_parent()->whoami_shard().shard);
+    if (op.before_progress.first) {
+      dout(10) << __func__ << ": Adding oid "
+              << tobj.hobj << " in the temp collection" << dendl;
+      add_temp_obj(tobj.hobj);
+    }
+  }
+
+  if (op.before_progress.first) {
+    m->t.remove(coll, tobj);
+    m->t.touch(coll, tobj);
+  }
+
+  if (!op.data_included.empty()) {
+    uint64_t start = op.data_included.range_start();
+    uint64_t end = op.data_included.range_end();
+    assert(op.data.length() == (end - start));
+
+    m->t.write(
+      coll,
+      tobj,
+      start,
+      op.data.length(),
+      op.data);
+  } else {
+    assert(op.data.length() == 0);
+  }
+
+  if (op.before_progress.first) {
+    assert(op.attrset.count(string("_")));
+    m->t.setattrs(
+      coll,
+      tobj,
+      op.attrset);
+  }
+
+  if (op.after_progress.data_complete && !oneshot) {
+    dout(10) << __func__ << ": Removing oid "
+            << tobj.hobj << " from the temp collection" << dendl;
+    clear_temp_obj(tobj.hobj);
+    m->t.remove(coll, ghobject_t(
+       op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+    m->t.collection_move_rename(
+      coll, tobj,
+      coll, ghobject_t(
+       op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+  }
+  if (op.after_progress.data_complete) {
+    if ((get_parent()->pgb_is_primary())) {
+      assert(recovery_ops.count(op.soid));
+      assert(recovery_ops[op.soid].obc);
+      get_parent()->on_local_recover(
+       op.soid,
+       op.recovery_info,
+       recovery_ops[op.soid].obc,
+       false,
+       &m->t);
+    } else {
+      get_parent()->on_local_recover(
+       op.soid,
+       op.recovery_info,
+       ObjectContextRef(),
+       false,
+       &m->t);
+    }
+  }
+  m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
+  m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
+}
+
+void ECBackend::handle_recovery_push_reply(
+  const PushReplyOp &op,
+  pg_shard_t from,
+  RecoveryMessages *m)
+{
+  if (!recovery_ops.count(op.soid))
+    return;
+  RecoveryOp &rop = recovery_ops[op.soid];
+  assert(rop.waiting_on_pushes.count(from));
+  rop.waiting_on_pushes.erase(from);
+  continue_recovery_op(rop, m);
+}
+
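+// A recovery read for hoid has completed.  Decode the shards missing on the
+// targets from the returned chunks, rebuild the object's attrs/obc/hinfo on
+// the first pass if we did not already have them, and advance the recovery
+// state machine.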
+void ECBackend::handle_recovery_read_complete(
+  const hobject_t &hoid,
+  boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
+  boost::optional<map<string, bufferlist> > attrs,
+  RecoveryMessages *m)
+{
+  dout(10) << __func__ << ": returned " << hoid << " "
+          << "(" << to_read.get<0>()
+          << ", " << to_read.get<1>()
+          << ", " << to_read.get<2>()
+          << ")"
+          << dendl;
+  assert(recovery_ops.count(hoid));
+  RecoveryOp &op = recovery_ops[hoid];
+  assert(op.returned_data.empty());
+  map<int, bufferlist*> target;
+  for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
+       i != op.missing_on_shards.end();
+       ++i) {
+    target[*i] = &(op.returned_data[*i]);
+  }
+  map<int, bufferlist> from;
+  for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
+      i != to_read.get<2>().end();
+      ++i) {
+    from[i->first.shard].claim(i->second);
+  }
+  dout(10) << __func__ << ": " << from << dendl;
+  int r = ECUtil::decode(sinfo, ec_impl, from, target);
+  assert(r == 0);
+  if (attrs) {
+    op.xattrs.swap(*attrs);
+
+    if (!op.obc) {
+      // The attrs still reference the original bufferlist (decoded from the
+      // ECSubReadReply message), which is much larger than the attrs
+      // themselves.  If the obc caches them as-is (get_obc may cache the
+      // attrs), the whole original bufferlist cannot be freed until the obc
+      // is evicted from the obc cache.  Rebuild the bufferlists before
+      // caching them to drop that reference.
+      for (map<string, bufferlist>::iterator it = op.xattrs.begin();
+           it != op.xattrs.end();
+           ++it) {
+        it->second.rebuild();
+      }
+      // Need to remove ECUtil::get_hinfo_key() since it should not leak out
+      // of the backend (see bug #12983)
+      map<string, bufferlist> sanitized_attrs(op.xattrs);
+      sanitized_attrs.erase(ECUtil::get_hinfo_key());
+      op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
+      assert(op.obc);
+      op.recovery_info.size = op.obc->obs.oi.size;
+      op.recovery_info.oi = op.obc->obs.oi;
+    }
+
+    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
+    if (op.obc->obs.oi.size > 0) {
+      assert(op.xattrs.count(ECUtil::get_hinfo_key()));
+      bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin();
+      ::decode(hinfo, bp);
+    }
+    op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
+  }
+  assert(op.xattrs.size());
+  assert(op.obc);
+  continue_recovery_op(op, m);
+}
+
+struct SendPushReplies : public Context {
+  PGBackend::Listener *l;
+  epoch_t epoch;
+  map<int, MOSDPGPushReply*> replies;
+  SendPushReplies(
+    PGBackend::Listener *l,
+    epoch_t epoch,
+    map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
+    replies.swap(in);
+  }
+  void finish(int) override {
+    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
+        i != replies.end();
+        ++i) {
+      l->send_message_osd_cluster(i->first, i->second, epoch);
+    }
+    replies.clear();
+  }
+  ~SendPushReplies() override {
+    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
+        i != replies.end();
+        ++i) {
+      i->second->put();
+    }
+    replies.clear();
+  }
+};
+
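+// Flush a RecoveryMessages batch: send the queued MOSDPGPush messages, queue
+// the accumulated transaction with the push replies sent once it completes,
+// and start a read op for any outstanding recovery reads.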
+void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
+{
+  for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
+       i != m.pushes.end();
+       m.pushes.erase(i++)) {
+    MOSDPGPush *msg = new MOSDPGPush();
+    msg->set_priority(priority);
+    msg->map_epoch = get_parent()->get_epoch();
+    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
+    msg->from = get_parent()->whoami_shard();
+    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
+    msg->pushes.swap(i->second);
+    msg->compute_cost(cct);
+    get_parent()->send_message(
+      i->first.osd,
+      msg);
+  }
+  map<int, MOSDPGPushReply*> replies;
+  for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
+        m.push_replies.begin();
+       i != m.push_replies.end();
+       m.push_replies.erase(i++)) {
+    MOSDPGPushReply *msg = new MOSDPGPushReply();
+    msg->set_priority(priority);
+    msg->map_epoch = get_parent()->get_epoch();
+    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
+    msg->from = get_parent()->whoami_shard();
+    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
+    msg->replies.swap(i->second);
+    msg->compute_cost(cct);
+    replies.insert(make_pair(i->first.osd, msg));
+  }
+
+  if (!replies.empty()) {
+    (m.t).register_on_complete(
+       get_parent()->bless_context(
+         new SendPushReplies(
+           get_parent(),
+           get_parent()->get_epoch(),
+           replies)));
+    get_parent()->queue_transaction(std::move(m.t));
+  } 
+
+  if (m.reads.empty())
+    return;
+  start_read_op(
+    priority,
+    m.reads,
+    OpRequestRef(),
+    false, true);
+}
+
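+// Drive the per-object recovery state machine:
+//   IDLE    -> issue reads for the next chunk of the object (READING)
+//   READING -> decode finished, queue PushOps to the missing shards (WRITING)
+//   WRITING -> all pushes acked; either loop back to IDLE for the next chunk
+//              or mark the object globally recovered (COMPLETE)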
+void ECBackend::continue_recovery_op(
+  RecoveryOp &op,
+  RecoveryMessages *m)
+{
+  dout(10) << __func__ << ": continuing " << op << dendl;
+  while (1) {
+    switch (op.state) {
+    case RecoveryOp::IDLE: {
+      // start read
+      op.state = RecoveryOp::READING;
+      assert(!op.recovery_progress.data_complete);
+      set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
+      uint64_t from = op.recovery_progress.data_recovered_to;
+      uint64_t amount = get_recovery_chunk_size();
+
+      if (op.recovery_progress.first && op.obc) {
+       /* We've got the attrs and the hinfo, might as well use them */
+       op.hinfo = get_hash_info(op.hoid);
+       assert(op.hinfo);
+       op.xattrs = op.obc->attr_cache;
+       ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
+      }
+
+      set<pg_shard_t> to_read;
+      int r = get_min_avail_to_read_shards(
+       op.hoid, want, true, false, &to_read);
+      if (r != 0) {
+       // we must have lost a recovery source
+       assert(!op.recovery_progress.first);
+       dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
+                << dendl;
+       get_parent()->cancel_pull(op.hoid);
+       recovery_ops.erase(op.hoid);
+       return;
+      }
+      m->read(
+       this,
+       op.hoid,
+       op.recovery_progress.data_recovered_to,
+       amount,
+       to_read,
+       op.recovery_progress.first && !op.obc);
+      op.extent_requested = make_pair(
+       from,
+       amount);
+      dout(10) << __func__ << ": IDLE return " << op << dendl;
+      return;
+    }
+    case RecoveryOp::READING: {
+      // read completed, start write
+      assert(op.xattrs.size());
+      assert(op.returned_data.size());
+      op.state = RecoveryOp::WRITING;
+      ObjectRecoveryProgress after_progress = op.recovery_progress;
+      after_progress.data_recovered_to += op.extent_requested.second;
+      after_progress.first = false;
+      if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
+       after_progress.data_recovered_to =
+         sinfo.logical_to_next_stripe_offset(
+           op.obc->obs.oi.size);
+       after_progress.data_complete = true;
+      }
+      for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
+          mi != op.missing_on.end();
+          ++mi) {
+       assert(op.returned_data.count(mi->shard));
+       m->pushes[*mi].push_back(PushOp());
+       PushOp &pop = m->pushes[*mi].back();
+       pop.soid = op.hoid;
+       pop.version = op.v;
+       pop.data = op.returned_data[mi->shard];
+       dout(10) << __func__ << ": before_progress=" << op.recovery_progress
+                << ", after_progress=" << after_progress
+                << ", pop.data.length()=" << pop.data.length()
+                << ", size=" << op.obc->obs.oi.size << dendl;
+       assert(
+         pop.data.length() ==
+         sinfo.aligned_logical_offset_to_chunk_offset(
+           after_progress.data_recovered_to -
+           op.recovery_progress.data_recovered_to)
+         );
+       if (pop.data.length())
+         pop.data_included.insert(
+           sinfo.aligned_logical_offset_to_chunk_offset(
+             op.recovery_progress.data_recovered_to),
+           pop.data.length()
+           );
+       if (op.recovery_progress.first) {
+         pop.attrset = op.xattrs;
+       }
+       pop.recovery_info = op.recovery_info;
+       pop.before_progress = op.recovery_progress;
+       pop.after_progress = after_progress;
+       if (*mi != get_parent()->primary_shard())
+         get_parent()->begin_peer_recover(
+           *mi,
+           op.hoid);
+      }
+      op.returned_data.clear();
+      op.waiting_on_pushes = op.missing_on;
+      op.recovery_progress = after_progress;
+      dout(10) << __func__ << ": READING return " << op << dendl;
+      return;
+    }
+    case RecoveryOp::WRITING: {
+      if (op.waiting_on_pushes.empty()) {
+       if (op.recovery_progress.data_complete) {
+         op.state = RecoveryOp::COMPLETE;
+         for (set<pg_shard_t>::iterator i = op.missing_on.begin();
+              i != op.missing_on.end();
+              ++i) {
+           if (*i != get_parent()->primary_shard()) {
+             dout(10) << __func__ << ": on_peer_recover on " << *i
+                      << ", obj " << op.hoid << dendl;
+             get_parent()->on_peer_recover(
+               *i,
+               op.hoid,
+               op.recovery_info);
+           }
+         }
+         object_stat_sum_t stat;
+         stat.num_bytes_recovered = op.recovery_info.size;
+         stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
+         stat.num_objects_recovered = 1;
+         get_parent()->on_global_recover(op.hoid, stat, false);
+         dout(10) << __func__ << ": WRITING return " << op << dendl;
+         recovery_ops.erase(op.hoid);
+         return;
+       } else {
+         op.state = RecoveryOp::IDLE;
+         dout(10) << __func__ << ": WRITING continue " << op << dendl;
+         continue;
+       }
+      }
+      return;
+    }
+    // should never be called once complete
+    case RecoveryOp::COMPLETE:
+    default: {
+      ceph_abort();
+    };
+    }
+  }
+}
+
+void ECBackend::run_recovery_op(
+  RecoveryHandle *_h,
+  int priority)
+{
+  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
+  RecoveryMessages m;
+  for (list<RecoveryOp>::iterator i = h->ops.begin();
+       i != h->ops.end();
+       ++i) {
+    dout(10) << __func__ << ": starting " << *i << dendl;
+    assert(!recovery_ops.count(i->hoid));
+    RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
+    continue_recovery_op(op, &m);
+  }
+
+  dispatch_recovery_messages(m, priority);
+  send_recovery_deletes(priority, h->deletes);
+  delete _h;
+}
+
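+// Queue a RecoveryOp for hoid on the supplied handle: record its version,
+// object context and snapset, and work out which acting/backfill shards are
+// missing the object.  The op is actually started by run_recovery_op().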
+int ECBackend::recover_object(
+  const hobject_t &hoid,
+  eversion_t v,
+  ObjectContextRef head,
+  ObjectContextRef obc,
+  RecoveryHandle *_h)
+{
+  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
+  h->ops.push_back(RecoveryOp());
+  h->ops.back().v = v;
+  h->ops.back().hoid = hoid;
+  h->ops.back().obc = obc;
+  h->ops.back().recovery_info.soid = hoid;
+  h->ops.back().recovery_info.version = v;
+  if (obc) {
+    h->ops.back().recovery_info.size = obc->obs.oi.size;
+    h->ops.back().recovery_info.oi = obc->obs.oi;
+  }
+  if (hoid.is_snap()) {
+    if (obc) {
+      assert(obc->ssc);
+      h->ops.back().recovery_info.ss = obc->ssc->snapset;
+    } else if (head) {
+      assert(head->ssc);
+      h->ops.back().recovery_info.ss = head->ssc->snapset;
+    } else {
+      assert(0 == "neither obc nor head set for a snap object");
+    }
+  }
+  h->ops.back().recovery_progress.omap_complete = true;
+  for (set<pg_shard_t>::const_iterator i =
+        get_parent()->get_actingbackfill_shards().begin();
+       i != get_parent()->get_actingbackfill_shards().end();
+       ++i) {
+    dout(10) << "checking " << *i << dendl;
+    if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
+      h->ops.back().missing_on.insert(*i);
+      h->ops.back().missing_on_shards.insert(i->shard);
+    }
+  }
+  dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
+  return 0;
+}
+
+bool ECBackend::can_handle_while_inactive(
+  OpRequestRef _op)
+{
+  return false;
+}
+
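+// Dispatch EC sub-op and push messages.  Write and read sub-ops are handled
+// inline; read replies and push (reply) messages accumulate RecoveryMessages
+// that are dispatched before returning.  Returns false for unknown types.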
+bool ECBackend::_handle_message(
+  OpRequestRef _op)
+{
+  dout(10) << __func__ << ": " << *_op->get_req() << dendl;
+  int priority = _op->get_req()->get_priority();
+  switch (_op->get_req()->get_type()) {
+  case MSG_OSD_EC_WRITE: {
+    // NOTE: this is non-const because handle_sub_write modifies the embedded
+    // ObjectStore::Transaction in place (and then std::move's it).  It does
+    // not conflict with ECSubWrite's operator<<.
+    MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
+      _op->get_nonconst_req());
+    handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
+    return true;
+  }
+  case MSG_OSD_EC_WRITE_REPLY: {
+    const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
+      _op->get_req());
+    handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
+    return true;
+  }
+  case MSG_OSD_EC_READ: {
+    const MOSDECSubOpRead *op = static_cast<const MOSDECSubOpRead*>(_op->get_req());
+    MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
+    reply->pgid = get_parent()->primary_spg_t();
+    reply->map_epoch = get_parent()->get_epoch();
+    reply->min_epoch = get_parent()->get_interval_start_epoch();
+    handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
+    reply->trace = _op->pg_trace;
+    get_parent()->send_message_osd_cluster(
+      op->op.from.osd, reply, get_parent()->get_epoch());
+    return true;
+  }
+  case MSG_OSD_EC_READ_REPLY: {
+    // NOTE: this is non-const because handle_sub_read_reply steals resulting
+    // buffers.  It does not conflict with ECSubReadReply operator<<.
+    MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
+      _op->get_nonconst_req());
+    RecoveryMessages rm;
+    handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
+    dispatch_recovery_messages(rm, priority);
+    return true;
+  }
+  case MSG_OSD_PG_PUSH: {
+    const MOSDPGPush *op = static_cast<const MOSDPGPush *>(_op->get_req());
+    RecoveryMessages rm;
+    for (vector<PushOp>::const_iterator i = op->pushes.begin();
+        i != op->pushes.end();
+        ++i) {
+      handle_recovery_push(*i, &rm);
+    }
+    dispatch_recovery_messages(rm, priority);
+    return true;
+  }
+  case MSG_OSD_PG_PUSH_REPLY: {
+    const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
+      _op->get_req());
+    RecoveryMessages rm;
+    for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
+        i != op->replies.end();
+        ++i) {
+      handle_recovery_push_reply(*i, op->from, &rm);
+    }
+    dispatch_recovery_messages(rm, priority);
+    return true;
+  }
+  default:
+    return false;
+  }
+  return false;
+}
+
+struct SubWriteCommitted : public Context {
+  ECBackend *pg;
+  OpRequestRef msg;
+  ceph_tid_t tid;
+  eversion_t version;
+  eversion_t last_complete;
+  const ZTracer::Trace trace;
+  SubWriteCommitted(
+    ECBackend *pg,
+    OpRequestRef msg,
+    ceph_tid_t tid,
+    eversion_t version,
+    eversion_t last_complete,
+    const ZTracer::Trace &trace)
+    : pg(pg), msg(msg), tid(tid),
+      version(version), last_complete(last_complete), trace(trace) {}
+  void finish(int) override {
+    if (msg)
+      msg->mark_event("sub_op_committed");
+    pg->sub_write_committed(tid, version, last_complete, trace);
+  }
+};
+void ECBackend::sub_write_committed(
+  ceph_tid_t tid, eversion_t version, eversion_t last_complete,
+  const ZTracer::Trace &trace) {
+  if (get_parent()->pgb_is_primary()) {
+    ECSubWriteReply reply;
+    reply.tid = tid;
+    reply.last_complete = last_complete;
+    reply.committed = true;
+    reply.from = get_parent()->whoami_shard();
+    handle_sub_write_reply(
+      get_parent()->whoami_shard(),
+      reply, trace);
+  } else {
+    get_parent()->update_last_complete_ondisk(last_complete);
+    MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
+    r->pgid = get_parent()->primary_spg_t();
+    r->map_epoch = get_parent()->get_epoch();
+    r->min_epoch = get_parent()->get_interval_start_epoch();
+    r->op.tid = tid;
+    r->op.last_complete = last_complete;
+    r->op.committed = true;
+    r->op.from = get_parent()->whoami_shard();
+    r->set_priority(CEPH_MSG_PRIO_HIGH);
+    r->trace = trace;
+    r->trace.event("sending sub op commit");
+    get_parent()->send_message_osd_cluster(
+      get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
+  }
+}
+
+struct SubWriteApplied : public Context {
+  ECBackend *pg;
+  OpRequestRef msg;
+  ceph_tid_t tid;
+  eversion_t version;
+  const ZTracer::Trace trace;
+  SubWriteApplied(
+    ECBackend *pg,
+    OpRequestRef msg,
+    ceph_tid_t tid,
+    eversion_t version,
+    const ZTracer::Trace &trace)
+    : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {}
+  void finish(int) override {
+    if (msg)
+      msg->mark_event("sub_op_applied");
+    pg->sub_write_applied(tid, version, trace);
+  }
+};
+void ECBackend::sub_write_applied(
+  ceph_tid_t tid, eversion_t version,
+  const ZTracer::Trace &trace) {
+  parent->op_applied(version);
+  if (get_parent()->pgb_is_primary()) {
+    ECSubWriteReply reply;
+    reply.from = get_parent()->whoami_shard();
+    reply.tid = tid;
+    reply.applied = true;
+    handle_sub_write_reply(
+      get_parent()->whoami_shard(),
+      reply, trace);
+  } else {
+    MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
+    r->pgid = get_parent()->primary_spg_t();
+    r->map_epoch = get_parent()->get_epoch();
+    r->min_epoch = get_parent()->get_interval_start_epoch();
+    r->op.from = get_parent()->whoami_shard();
+    r->op.tid = tid;
+    r->op.applied = true;
+    r->set_priority(CEPH_MSG_PRIO_HIGH);
+    r->trace = trace;
+    r->trace.event("sending sub op apply");
+    get_parent()->send_message_osd_cluster(
+      get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
+  }
+}
+
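+// Apply an EC sub-write on this shard: log the op, then queue the shard's
+// transaction together with a local transaction whose commit and apply
+// callbacks send the corresponding replies back to the primary.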
+void ECBackend::handle_sub_write(
+  pg_shard_t from,
+  OpRequestRef msg,
+  ECSubWrite &op,
+  const ZTracer::Trace &trace,
+  Context *on_local_applied_sync)
+{
+  if (msg)
+    msg->mark_started();
+  trace.event("handle_sub_write");
+  assert(!get_parent()->get_log().get_missing().is_missing(op.soid));
+  if (!get_parent()->pgb_is_primary())
+    get_parent()->update_stats(op.stats);
+  ObjectStore::Transaction localt;
+  if (!op.temp_added.empty()) {
+    add_temp_objs(op.temp_added);
+  }
+  if (op.backfill) {
+    for (set<hobject_t>::iterator i = op.temp_removed.begin();
+        i != op.temp_removed.end();
+        ++i) {
+      dout(10) << __func__ << ": removing object " << *i
+              << " since we won't get the transaction" << dendl;
+      localt.remove(
+       coll,
+       ghobject_t(
+         *i,
+         ghobject_t::NO_GEN,
+         get_parent()->whoami_shard().shard));
+    }
+  }
+  clear_temp_objs(op.temp_removed);
+  get_parent()->log_operation(
+    op.log_entries,
+    op.updated_hit_set_history,
+    op.trim_to,
+    op.roll_forward_to,
+    !op.backfill,
+    localt);
+
+  PrimaryLogPG *_rPG = dynamic_cast<PrimaryLogPG *>(get_parent());
+  if (_rPG && !_rPG->is_undersized() &&
+      (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count())
+    op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
+  if (on_local_applied_sync) {
+    dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl;
+    localt.register_on_applied_sync(on_local_applied_sync);
+  }
+  localt.register_on_commit(
+    get_parent()->bless_context(
+      new SubWriteCommitted(
+       this, msg, op.tid,
+       op.at_version,
+       get_parent()->get_info().last_complete, trace)));
+  localt.register_on_applied(
+    get_parent()->bless_context(
+      new SubWriteApplied(this, msg, op.tid, op.at_version, trace)));
+  vector<ObjectStore::Transaction> tls;
+  tls.reserve(2);
+  tls.push_back(std::move(op.t));
+  tls.push_back(std::move(localt));
+  get_parent()->queue_transactions(tls, msg);
+}
+
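+// Service an ECSubRead on this shard: read the requested extents from the
+// local store, verify the whole-chunk hash against the stored hinfo when
+// ec overwrites are disabled, and collect any requested xattrs.  Per-object
+// errors are reported in the reply rather than failing the whole request.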
+void ECBackend::handle_sub_read(
+  pg_shard_t from,
+  const ECSubRead &op,
+  ECSubReadReply *reply,
+  const ZTracer::Trace &trace)
+{
+  trace.event("handle sub read");
+  shard_id_t shard = get_parent()->whoami_shard().shard;
+  for(auto i = op.to_read.begin();
+      i != op.to_read.end();
+      ++i) {
+    int r = 0;
+    ECUtil::HashInfoRef hinfo;
+    if (!get_parent()->get_pool().allows_ecoverwrites()) {
+      hinfo = get_hash_info(i->first);
+      if (!hinfo) {
+       r = -EIO;
+       get_parent()->clog_error() << "Corruption detected: object " << i->first
+                                   << " is missing hash_info";
+       dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
+       goto error;
+      }
+    }
+    for (auto j = i->second.begin(); j != i->second.end(); ++j) {
+      bufferlist bl;
+      r = store->read(
+       ch,
+       ghobject_t(i->first, ghobject_t::NO_GEN, shard),
+       j->get<0>(),
+       j->get<1>(),
+       bl, j->get<2>());
+      if (r < 0) {
+       get_parent()->clog_error() << "Error " << r
+                                  << " reading object "
+                                  << i->first;
+       dout(5) << __func__ << ": Error " << r
+               << " reading " << i->first << dendl;
+       goto error;
+      } else {
+        dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
+       reply->buffers_read[i->first].push_back(
+         make_pair(
+           j->get<0>(),
+           bl)
+         );
+      }
+
+      if (!get_parent()->get_pool().allows_ecoverwrites()) {
+       // This shows that we still need deep scrub: large objects are read in
+       // sections, so the digest check below only covers full-chunk reads
+       // starting at offset 0.
+       // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
+       // the state of our chunk in case other chunks could substitute.
+       assert(hinfo->has_chunk_hash());
+       if ((bl.length() == hinfo->get_total_chunk_size()) &&
+           (j->get<0>() == 0)) {
+         dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
+         bufferhash h(-1);
+         h << bl;
+         if (h.digest() != hinfo->get_chunk_hash(shard)) {
+           get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
+                                      << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
+           dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
+                   << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
+           r = -EIO;
+           goto error;
+         }
+       }
+      }
+    }
+    continue;
+error:
+    // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
+    // the state of our chunk in case other chunks could substitute.
+    reply->buffers_read.erase(i->first);
+    reply->errors[i->first] = r;
+  }
+  for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
+       i != op.attrs_to_read.end();
+       ++i) {
+    dout(10) << __func__ << ": fulfilling attr request on "
+            << *i << dendl;
+    if (reply->errors.count(*i))
+      continue;
+    int r = store->getattrs(
+      ch,
+      ghobject_t(
+       *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      reply->attrs_read[*i]);
+    if (r < 0) {
+      reply->buffers_read.erase(*i);
+      reply->errors[*i] = r;
+    }
+  }
+  reply->from = get_parent()->whoami_shard();
+  reply->tid = op.tid;
+}
+
+void ECBackend::handle_sub_write_reply(
+  pg_shard_t from,
+  const ECSubWriteReply &op,
+  const ZTracer::Trace &trace)
+{
+  map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
+  assert(i != tid_to_op_map.end());
+  if (op.committed) {
+    trace.event("sub write committed");
+    assert(i->second.pending_commit.count(from));
+    i->second.pending_commit.erase(from);
+    if (from != get_parent()->whoami_shard()) {
+      get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
+    }
+  }
+  if (op.applied) {
+    trace.event("sub write applied");
+    assert(i->second.pending_apply.count(from));
+    i->second.pending_apply.erase(from);
+  }
+
+  if (i->second.pending_apply.empty() && i->second.on_all_applied) {
+    dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
+    i->second.on_all_applied->complete(0);
+    i->second.on_all_applied = 0;
+    i->second.trace.event("ec write all applied");
+  }
+  if (i->second.pending_commit.empty() && i->second.on_all_commit) {
+    dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
+    i->second.on_all_commit->complete(0);
+    i->second.on_all_commit = 0;
+    i->second.trace.event("ec write all committed");
+  }
+  check_ops();
+}
+
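+// Fold a shard's ECSubReadReply into the matching in-flight ReadOp: record
+// the claimed buffers, attrs and errors per object, then check whether enough
+// shards have answered to decode.  On failure the remaining shards may be
+// read before the op is completed with an error.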
+void ECBackend::handle_sub_read_reply(
+  pg_shard_t from,
+  ECSubReadReply &op,
+  RecoveryMessages *m,
+  const ZTracer::Trace &trace)
+{
+  trace.event("ec sub read reply");
+  dout(10) << __func__ << ": reply " << op << dendl;
+  map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
+  if (iter == tid_to_read_map.end()) {
+    //canceled
+    dout(20) << __func__ << ": dropped " << op << dendl;
+    return;
+  }
+  ReadOp &rop = iter->second;
+  for (auto i = op.buffers_read.begin();
+       i != op.buffers_read.end();
+       ++i) {
+    assert(!op.errors.count(i->first)); // a shard that reported an error for this object must not have sent a buffer
+    if (!rop.to_read.count(i->first)) {
+      // We canceled this read! @see filter_read_op
+      dout(20) << __func__ << " to_read skipping" << dendl;
+      continue;
+    }
+    list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
+      rop.to_read.find(i->first)->second.to_read.begin();
+    list<
+      boost::tuple<
+       uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
+      rop.complete[i->first].returned.begin();
+    for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
+        j != i->second.end();
+        ++j, ++req_iter, ++riter) {
+      assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
+      assert(riter != rop.complete[i->first].returned.end());
+      pair<uint64_t, uint64_t> adjusted =
+       sinfo.aligned_offset_len_to_chunk(
+         make_pair(req_iter->get<0>(), req_iter->get<1>()));
+      assert(adjusted.first == j->first);
+      riter->get<2>()[from].claim(j->second);
+    }
+  }
+  for (auto i = op.attrs_read.begin();
+       i != op.attrs_read.end();
+       ++i) {
+    assert(!op.errors.count(i->first)); // a shard that reported a read error must not have sent attrs
+    if (!rop.to_read.count(i->first)) {
+      // We canceled this read! @see filter_read_op
+      dout(20) << __func__ << " to_read skipping" << dendl;
+      continue;
+    }
+    rop.complete[i->first].attrs = map<string, bufferlist>();
+    (*(rop.complete[i->first].attrs)).swap(i->second);
+  }
+  for (auto i = op.errors.begin();
+       i != op.errors.end();
+       ++i) {
+    rop.complete[i->first].errors.insert(
+      make_pair(
+       from,
+       i->second));
+    dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
+  }
+
+  map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
+                                       shard_to_read_map.find(from);
+  assert(siter != shard_to_read_map.end());
+  assert(siter->second.count(op.tid));
+  siter->second.erase(op.tid);
+
+  assert(rop.in_progress.count(from));
+  rop.in_progress.erase(from);
+  unsigned is_complete = 0;
+  // For redundant reads, check for completion as each shard comes in;
+  // for a non-recovery read, check for completion once all shards have replied.
+  // TODO: It would be nice if recovery could send more reads too
+  if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) {
+    for (map<hobject_t, read_result_t>::const_iterator iter =
+        rop.complete.begin();
+      iter != rop.complete.end();
+      ++iter) {
+      set<int> have;
+      for (map<pg_shard_t, bufferlist>::const_iterator j =
+          iter->second.returned.front().get<2>().begin();
+        j != iter->second.returned.front().get<2>().end();
+        ++j) {
+        have.insert(j->first.shard);
+        dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
+      }
+      set<int> want_to_read, dummy_minimum;
+      get_want_to_read_shards(&want_to_read);
+      int err;
+      if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
+       dout(20) << __func__ << " minimum_to_decode failed" << dendl;
+        if (rop.in_progress.empty()) {
+         // If we don't have enough copies and we haven't sent reads for all shards
+         // we can send the rest of the reads, if any.
+         if (!rop.do_redundant_reads) {
+           int r = send_all_remaining_reads(iter->first, rop);
+           if (r == 0) {
+             // We added to in_progress, so do not increment is_complete
+             continue;
+           }
+           // Couldn't read any additional shards so handle as completed with errors
+         }
+         // We don't want to confuse clients / RBD with objectstore error
+         // values, in particular ENOENT.  We may have different error returns
+         // from different shards, so we return the minimum_to_decode() error
+         // (usually EIO) to the reader.  An error here is most likely due to
+         // a damaged pg.
+         rop.complete[iter->first].r = err;
+         ++is_complete;
+       }
+      } else {
+        assert(rop.complete[iter->first].r == 0);
+       if (!rop.complete[iter->first].errors.empty()) {
+         if (cct->_conf->osd_read_ec_check_for_errors) {
+           dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
+           err = rop.complete[iter->first].errors.begin()->second;
+            rop.complete[iter->first].r = err;
+         } else {
+           get_parent()->clog_warn() << "Error(s) ignored for "
+                                      << iter->first << " enough copies available";
+           dout(10) << __func__ << " Error(s) ignored for " << iter->first
+                    << " enough copies available" << dendl;
+           rop.complete[iter->first].errors.clear();
+         }
+       }
+       ++is_complete;
+      }
+    }
+  }
+  if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
+    dout(20) << __func__ << " Complete: " << rop << dendl;
+    rop.trace.event("ec read complete");
+    complete_read_op(rop, m);
+  } else {
+    dout(10) << __func__ << " readop not complete: " << rop << dendl;
+  }
+}
+
+void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
+{
+  map<hobject_t, read_request_t>::iterator reqiter =
+    rop.to_read.begin();
+  map<hobject_t, read_result_t>::iterator resiter =
+    rop.complete.begin();
+  assert(rop.to_read.size() == rop.complete.size());
+  for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
+    if (reqiter->second.cb) {
+      pair<RecoveryMessages *, read_result_t &> arg(
+       m, resiter->second);
+      reqiter->second.cb->complete(arg);
+      reqiter->second.cb = NULL;
+    }
+  }
+  tid_to_read_map.erase(rop.tid);
+}
+
+struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
+  ECBackend *ec;
+  ceph_tid_t tid;
+  FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
+  void finish(ThreadPool::TPHandle &handle) override {
+    auto ropiter = ec->tid_to_read_map.find(tid);
+    assert(ropiter != ec->tid_to_read_map.end());
+    int priority = ropiter->second.priority;
+    RecoveryMessages rm;
+    ec->complete_read_op(ropiter->second, &rm);
+    ec->dispatch_recovery_messages(rm, priority);
+  }
+};
+
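+// Strip a ReadOp of any objects whose source shards went down in the new
+// osdmap: cancel the corresponding pulls and delete their callbacks, and if
+// nothing is left in flight schedule FinishReadOp to complete what remains.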
+void ECBackend::filter_read_op(
+  const OSDMapRef& osdmap,
+  ReadOp &op)
+{
+  set<hobject_t> to_cancel;
+  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
+       i != op.source_to_obj.end();
+       ++i) {
+    if (osdmap->is_down(i->first.osd)) {
+      to_cancel.insert(i->second.begin(), i->second.end());
+      op.in_progress.erase(i->first);
+      continue;
+    }
+  }
+
+  if (to_cancel.empty())
+    return;
+
+  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
+       i != op.source_to_obj.end();
+       ) {
+    for (set<hobject_t>::iterator j = i->second.begin();
+        j != i->second.end();
+        ) {
+      if (to_cancel.count(*j))
+       i->second.erase(j++);
+      else
+       ++j;
+    }
+    if (i->second.empty()) {
+      op.source_to_obj.erase(i++);
+    } else {
+      assert(!osdmap->is_down(i->first.osd));
+      ++i;
+    }
+  }
+
+  for (set<hobject_t>::iterator i = to_cancel.begin();
+       i != to_cancel.end();
+       ++i) {
+    get_parent()->cancel_pull(*i);
+
+    assert(op.to_read.count(*i));
+    read_request_t &req = op.to_read.find(*i)->second;
+    dout(10) << __func__ << ": canceling " << req
+            << "  for obj " << *i << dendl;
+    assert(req.cb);
+    delete req.cb;
+    req.cb = NULL;
+
+    op.to_read.erase(*i);
+    op.complete.erase(*i);
+    recovery_ops.erase(*i);
+  }
+
+  if (op.in_progress.empty()) {
+    get_parent()->schedule_recovery_work(
+      get_parent()->bless_gencontext(
+       new FinishReadOp(this, op.tid)));
+  }
+}
+
+void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
+{
+  set<ceph_tid_t> tids_to_filter;
+  for (map<pg_shard_t, set<ceph_tid_t> >::iterator 
+       i = shard_to_read_map.begin();
+       i != shard_to_read_map.end();
+       ) {
+    if (osdmap->is_down(i->first.osd)) {
+      tids_to_filter.insert(i->second.begin(), i->second.end());
+      shard_to_read_map.erase(i++);
+    } else {
+      ++i;
+    }
+  }
+  for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
+       i != tids_to_filter.end();
+       ++i) {
+    map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
+    assert(j != tid_to_read_map.end());
+    filter_read_op(osdmap, j->second);
+  }
+}
+
+void ECBackend::on_change()
+{
+  dout(10) << __func__ << dendl;
+
+  completed_to = eversion_t();
+  committed_to = eversion_t();
+  pipeline_state.clear();
+  waiting_reads.clear();
+  waiting_state.clear();
+  waiting_commit.clear();
+  for (auto &&op: tid_to_op_map) {
+    cache.release_write_pin(op.second.pin);
+  }
+  tid_to_op_map.clear();
+
+  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
+       i != tid_to_read_map.end();
+       ++i) {
+    dout(10) << __func__ << ": cancelling " << i->second << dendl;
+    for (map<hobject_t, read_request_t>::iterator j =
+          i->second.to_read.begin();
+        j != i->second.to_read.end();
+        ++j) {
+      delete j->second.cb;
+      j->second.cb = 0;
+    }
+  }
+  tid_to_read_map.clear();
+  in_progress_client_reads.clear();
+  shard_to_read_map.clear();
+  clear_recovery_state();
+}
+
+void ECBackend::clear_recovery_state()
+{
+  recovery_ops.clear();
+}
+
+void ECBackend::on_flushed()
+{
+}
+
+void ECBackend::dump_recovery_info(Formatter *f) const
+{
+  f->open_array_section("recovery_ops");
+  for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
+       i != recovery_ops.end();
+       ++i) {
+    f->open_object_section("op");
+    i->second.dump(f);
+    f->close_section();
+  }
+  f->close_section();
+  f->open_array_section("read_ops");
+  for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
+       i != tid_to_read_map.end();
+       ++i) {
+    f->open_object_section("read_op");
+    i->second.dump(f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
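+// Entry point for a write: record the op in tid_to_op_map with its log
+// entries, trim/roll-forward targets and completion callbacks, then hand the
+// transaction to start_rmw() which plans the required reads and writes.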
+void ECBackend::submit_transaction(
+  const hobject_t &hoid,
+  const object_stat_sum_t &delta_stats,
+  const eversion_t &at_version,
+  PGTransactionUPtr &&t,
+  const eversion_t &trim_to,
+  const eversion_t &roll_forward_to,
+  const vector<pg_log_entry_t> &log_entries,
+  boost::optional<pg_hit_set_history_t> &hset_history,
+  Context *on_local_applied_sync,
+  Context *on_all_applied,
+  Context *on_all_commit,
+  ceph_tid_t tid,
+  osd_reqid_t reqid,
+  OpRequestRef client_op
+  )
+{
+  assert(!tid_to_op_map.count(tid));
+  Op *op = &(tid_to_op_map[tid]);
+  op->hoid = hoid;
+  op->delta_stats = delta_stats;
+  op->version = at_version;
+  op->trim_to = trim_to;
+  op->roll_forward_to = MAX(roll_forward_to, committed_to);
+  op->log_entries = log_entries;
+  std::swap(op->updated_hit_set_history, hset_history);
+  op->on_local_applied_sync = on_local_applied_sync;
+  op->on_all_applied = on_all_applied;
+  op->on_all_commit = on_all_commit;
+  op->tid = tid;
+  op->reqid = reqid;
+  op->client_op = client_op;
+  if (client_op)
+    op->trace = client_op->pg_trace;
+  
+  dout(10) << __func__ << ": op " << *op << " starting" << dendl;
+  start_rmw(op, std::move(t));
+  dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
+}
+
+void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
+  if (!waiting_state.empty()) {
+    waiting_state.back().on_write.emplace_back(std::move(cb));
+  } else if (!waiting_reads.empty()) {
+    waiting_reads.back().on_write.emplace_back(std::move(cb));
+  } else {
+    // Nothing earlier in the pipeline, just call it
+    cb();
+  }
+}
+
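+// Compute the minimal set of shards to read for hoid: gather the shards that
+// have the object (acting, plus backfill and missing_loc when recovering),
+// ask the plugin for a minimum decodable subset, and optionally expand that
+// to every available shard when redundant reads are requested.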
+int ECBackend::get_min_avail_to_read_shards(
+  const hobject_t &hoid,
+  const set<int> &want,
+  bool for_recovery,
+  bool do_redundant_reads,
+  set<pg_shard_t> *to_read)
+{
+  // Make sure we don't do redundant reads for recovery
+  assert(!for_recovery || !do_redundant_reads);
+
+  set<int> have;
+  map<shard_id_t, pg_shard_t> shards;
+
+  for (set<pg_shard_t>::const_iterator i =
+        get_parent()->get_acting_shards().begin();
+       i != get_parent()->get_acting_shards().end();
+       ++i) {
+    dout(10) << __func__ << ": checking acting " << *i << dendl;
+    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+    if (!missing.is_missing(hoid)) {
+      assert(!have.count(i->shard));
+      have.insert(i->shard);
+      assert(!shards.count(i->shard));
+      shards.insert(make_pair(i->shard, *i));
+    }
+  }
+
+  if (for_recovery) {
+    for (set<pg_shard_t>::const_iterator i =
+          get_parent()->get_backfill_shards().begin();
+        i != get_parent()->get_backfill_shards().end();
+        ++i) {
+      if (have.count(i->shard)) {
+       assert(shards.count(i->shard));
+       continue;
+      }
+      dout(10) << __func__ << ": checking backfill " << *i << dendl;
+      assert(!shards.count(i->shard));
+      const pg_info_t &info = get_parent()->get_shard_info(*i);
+      const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+      if (hoid < info.last_backfill &&
+         !missing.is_missing(hoid)) {
+       have.insert(i->shard);
+       shards.insert(make_pair(i->shard, *i));
+      }
+    }
+
+    map<hobject_t, set<pg_shard_t>>::const_iterator miter =
+      get_parent()->get_missing_loc_shards().find(hoid);
+    if (miter != get_parent()->get_missing_loc_shards().end()) {
+      for (set<pg_shard_t>::iterator i = miter->second.begin();
+          i != miter->second.end();
+          ++i) {
+       dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
+       auto m = get_parent()->maybe_get_shard_missing(*i);
+       if (m) {
+         assert(!(*m).is_missing(hoid));
+       }
+       have.insert(i->shard);
+       shards.insert(make_pair(i->shard, *i));
+      }
+    }
+  }
+
+  set<int> need;
+  int r = ec_impl->minimum_to_decode(want, have, &need);
+  if (r < 0)
+    return r;
+
+  if (do_redundant_reads) {
+      need.swap(have);
+  } 
+
+  if (!to_read)
+    return 0;
+
+  for (set<int>::iterator i = need.begin();
+       i != need.end();
+       ++i) {
+    assert(shards.count(shard_id_t(*i)));
+    to_read->insert(shards[shard_id_t(*i)]);
+  }
+  return 0;
+}
+
+int ECBackend::get_remaining_shards(
+  const hobject_t &hoid,
+  const set<int> &avail,
+  set<pg_shard_t> *to_read)
+{
+  set<int> need;
+  map<shard_id_t, pg_shard_t> shards;
+
+  for (set<pg_shard_t>::const_iterator i =
+        get_parent()->get_acting_shards().begin();
+       i != get_parent()->get_acting_shards().end();
+       ++i) {
+    dout(10) << __func__ << ": checking acting " << *i << dendl;
+    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+    if (!missing.is_missing(hoid)) {
+      assert(!need.count(i->shard));
+      need.insert(i->shard);
+      assert(!shards.count(i->shard));
+      shards.insert(make_pair(i->shard, *i));
+    }
+  }
+
+  if (!to_read)
+    return 0;
+
+  for (set<int>::iterator i = need.begin();
+       i != need.end();
+       ++i) {
+    assert(shards.count(shard_id_t(*i)));
+    if (avail.find(*i) == avail.end())
+      to_read->insert(shards[shard_id_t(*i)]);
+  }
+  return 0;
+}
+
+void ECBackend::start_read_op(
+  int priority,
+  map<hobject_t, read_request_t> &to_read,
+  OpRequestRef _op,
+  bool do_redundant_reads,
+  bool for_recovery)
+{
+  ceph_tid_t tid = get_parent()->get_tid();
+  assert(!tid_to_read_map.count(tid));
+  auto &op = tid_to_read_map.emplace(
+    tid,
+    ReadOp(
+      priority,
+      tid,
+      do_redundant_reads,
+      for_recovery,
+      _op,
+      std::move(to_read))).first->second;
+  dout(10) << __func__ << ": starting " << op << dendl;
+  if (_op) {
+    op.trace = _op->pg_trace;
+    op.trace.event("start ec read");
+  }
+  do_read_op(op);
+}
+
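+// Translate a ReadOp into per-shard ECSubRead messages: convert each logical
+// extent to chunk offsets, request attrs from a single shard per object, and
+// record which shards are in progress so replies can be matched up later.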
+void ECBackend::do_read_op(ReadOp &op)
+{
+  int priority = op.priority;
+  ceph_tid_t tid = op.tid;
+
+  dout(10) << __func__ << ": starting read " << op << dendl;
+
+  map<pg_shard_t, ECSubRead> messages;
+  for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
+       i != op.to_read.end();
+       ++i) {
+    bool need_attrs = i->second.want_attrs;
+    for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
+        j != i->second.need.end();
+        ++j) {
+      if (need_attrs) {
+       messages[*j].attrs_to_read.insert(i->first);
+       need_attrs = false;
+      }
+      op.obj_to_source[i->first].insert(*j);
+      op.source_to_obj[*j].insert(i->first);
+    }
+    for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
+          i->second.to_read.begin();
+        j != i->second.to_read.end();
+        ++j) {
+      pair<uint64_t, uint64_t> chunk_off_len =
+       sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
+      for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
+          k != i->second.need.end();
+          ++k) {
+       messages[*k].to_read[i->first].push_back(
+         boost::make_tuple(
+           chunk_off_len.first,
+           chunk_off_len.second,
+           j->get<2>()));
+      }
+      assert(!need_attrs);
+    }
+  }
+
+  for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
+       i != messages.end();
+       ++i) {
+    op.in_progress.insert(i->first);
+    shard_to_read_map[i->first].insert(op.tid);
+    i->second.tid = tid;
+    MOSDECSubOpRead *msg = new MOSDECSubOpRead;
+    msg->set_priority(priority);
+    msg->pgid = spg_t(
+      get_parent()->whoami_spg_t().pgid,
+      i->first.shard);
+    msg->map_epoch = get_parent()->get_epoch();
+    msg->min_epoch = get_parent()->get_interval_start_epoch();
+    msg->op = i->second;
+    msg->op.from = get_parent()->whoami_shard();
+    msg->op.tid = tid;
+    if (op.trace) {
+      // initialize a child span for this shard
+      msg->trace.init("ec sub read", nullptr, &op.trace);
+      msg->trace.keyval("shard", i->first.shard.id);
+    }
+    get_parent()->send_message_osd_cluster(
+      i->first.osd,
+      msg,
+      get_parent()->get_epoch());
+  }
+  dout(10) << __func__ << ": started " << op << dendl;
+}
+
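+// Look up the cached HashInfo for hoid, falling back to reading the hinfo
+// xattr from the store (or from the supplied attrs).  Returns a null ref when
+// the object has data but no hinfo, or when checks are requested and the
+// recorded total chunk size does not match the on-disk size.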
+ECUtil::HashInfoRef ECBackend::get_hash_info(
+  const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
+{
+  dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
+  ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
+  if (!ref) {
+    dout(10) << __func__ << ": not in cache " << hoid << dendl;
+    struct stat st;
+    int r = store->stat(
+      ch,
+      ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      &st);
+    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
+    // XXX: What does it mean if there is no object on disk?
+    if (r >= 0) {
+      dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
+      bufferlist bl;
+      if (attrs) {
+       map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
+       if (k == attrs->end()) {
+         dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
+       } else {
+         bl.push_back(k->second);
+       }
+      } else {
+       r = store->getattr(
+         ch,
+         ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+         ECUtil::get_hinfo_key(),
+         bl);
+       if (r < 0) {
+         dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
+         bl.clear(); // just in case
+       }
+      }
+      if (bl.length() > 0) {
+       bufferlist::iterator bp = bl.begin();
+       ::decode(hinfo, bp);
+       if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
+         dout(0) << __func__ << ": Mismatch of total_chunk_size "
+                              << hinfo.get_total_chunk_size() << dendl;
+         return ECUtil::HashInfoRef();
+       }
+      } else if (st.st_size > 0) { // non-empty object with no hinfo is an error; an empty object just gets a fresh hinfo below
+       return ECUtil::HashInfoRef();
+      }
+    }
+    ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
+  }
+  return ref;
+}
+
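+// Entry point for a client write: compute the read/modify/write plan for
+// the transaction and queue the Op on waiting_state before driving the
+// pipeline via check_ops().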
+void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
+{
+  assert(op);
+
+  op->plan = ECTransaction::get_write_plan(
+    sinfo,
+    std::move(t),
+    [&](const hobject_t &i) {
+      ECUtil::HashInfoRef ref = get_hash_info(i, false);
+      if (!ref) {
+       derr << __func__ << ": get_hash_info(" << i << ")"
+            << " returned a null pointer and there is no"
+            << " way to recover from such an error in this"
+            << " context" << dendl;
+       ceph_abort();
+      }
+      return ref;
+    },
+    get_parent()->get_dpp());
+
+  dout(10) << __func__ << ": " << *op << dendl;
+
+  waiting_state.push_back(*op);
+  check_ops();
+}
+
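+// Pipeline stage 1: move the front of waiting_state to waiting_reads.
+// Ops that require an rmw while the extent cache is invalid stay queued;
+// otherwise reserve cache extents (when caching is enabled) and kick off
+// any remote reads still needed to satisfy the write plan.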
+bool ECBackend::try_state_to_reads()
+{
+  if (waiting_state.empty())
+    return false;
+
+  Op *op = &(waiting_state.front());
+  if (op->requires_rmw() && pipeline_state.cache_invalid()) {
+    assert(get_parent()->get_pool().allows_ecoverwrites());
+    dout(20) << __func__ << ": blocking " << *op
+            << " because it requires an rmw and the cache is invalid "
+            << pipeline_state
+            << dendl;
+    return false;
+  }
+
+  if (op->invalidates_cache()) {
+    dout(20) << __func__ << ": invalidating cache after this op"
+            << dendl;
+    pipeline_state.invalidate();
+    op->using_cache = false;
+  } else {
+    op->using_cache = pipeline_state.caching_enabled();
+  }
+
+  waiting_state.pop_front();
+  waiting_reads.push_back(*op);
+
+  if (op->using_cache) {
+    cache.open_write_pin(op->pin);
+
+    extent_set empty;
+    for (auto &&hpair: op->plan.will_write) {
+      auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
+      const extent_set &to_read_plan =
+       to_read_plan_iter == op->plan.to_read.end() ?
+       empty :
+       to_read_plan_iter->second;
+
+      extent_set remote_read = cache.reserve_extents_for_rmw(
+       hpair.first,
+       op->pin,
+       hpair.second,
+       to_read_plan);
+
+      extent_set pending_read = to_read_plan;
+      pending_read.subtract(remote_read);
+
+      if (!remote_read.empty()) {
+       op->remote_read[hpair.first] = std::move(remote_read);
+      }
+      if (!pending_read.empty()) {
+       op->pending_read[hpair.first] = std::move(pending_read);
+      }
+    }
+  } else {
+    op->remote_read = op->plan.to_read;
+  }
+
+  dout(10) << __func__ << ": " << *op << dendl;
+
+  if (!op->remote_read.empty()) {
+    assert(get_parent()->get_pool().allows_ecoverwrites());
+    objects_read_async_no_cache(
+      op->remote_read,
+      [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
+       for (auto &&i: results) {
+         op->remote_read_result.emplace(i.first, i.second.second);
+       }
+       check_ops();
+      });
+  }
+
+  return true;
+}
+
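+// Pipeline stage 2: once the front op's reads have completed, generate the
+// per-shard transactions, dispatch ECSubWrite messages to all
+// acting/backfill shards, and apply the local shard's write directly via
+// handle_sub_write.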
+bool ECBackend::try_reads_to_commit()
+{
+  if (waiting_reads.empty())
+    return false;
+  Op *op = &(waiting_reads.front());
+  if (op->read_in_progress())
+    return false;
+  waiting_reads.pop_front();
+  waiting_commit.push_back(*op);
+
+  dout(10) << __func__ << ": starting commit on " << *op << dendl;
+  dout(20) << __func__ << ": " << cache << dendl;
+
+  get_parent()->apply_stats(
+    op->hoid,
+    op->delta_stats);
+
+  if (op->using_cache) {
+    for (auto &&hpair: op->pending_read) {
+      op->remote_read_result[hpair.first].insert(
+       cache.get_remaining_extents_for_rmw(
+         hpair.first,
+         op->pin,
+         hpair.second));
+    }
+    op->pending_read.clear();
+  } else {
+    assert(op->pending_read.empty());
+  }
+
+  map<shard_id_t, ObjectStore::Transaction> trans;
+  for (set<pg_shard_t>::const_iterator i =
+        get_parent()->get_actingbackfill_shards().begin();
+       i != get_parent()->get_actingbackfill_shards().end();
+       ++i) {
+    trans[i->shard];
+  }
+
+  op->trace.event("start ec write");
+
+  map<hobject_t,extent_map> written;
+  if (op->plan.t) {
+    ECTransaction::generate_transactions(
+      op->plan,
+      ec_impl,
+      get_parent()->get_info().pgid.pgid,
+      (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
+      sinfo,
+      op->remote_read_result,
+      op->log_entries,
+      &written,
+      &trans,
+      &(op->temp_added),
+      &(op->temp_cleared),
+      get_parent()->get_dpp());
+  }
+
+  dout(20) << __func__ << ": " << cache << dendl;
+  dout(20) << __func__ << ": written: " << written << dendl;
+  dout(20) << __func__ << ": op: " << *op << dendl;
+
+  if (!get_parent()->get_pool().allows_ecoverwrites()) {
+    for (auto &&i: op->log_entries) {
+      if (i.requires_kraken()) {
+       derr << __func__ << ": log entry " << i << " requires kraken"
+            << " but overwrites are not enabled!" << dendl;
+       ceph_abort();
+      }
+    }
+  }
+
+  map<hobject_t,extent_set> written_set;
+  for (auto &&i: written) {
+    written_set[i.first] = i.second.get_interval_set();
+  }
+  dout(20) << __func__ << ": written_set: " << written_set << dendl;
+  assert(written_set == op->plan.will_write);
+
+  if (op->using_cache) {
+    for (auto &&hpair: written) {
+      dout(20) << __func__ << ": " << hpair << dendl;
+      cache.present_rmw_update(hpair.first, op->pin, hpair.second);
+    }
+  }
+  op->remote_read.clear();
+  op->remote_read_result.clear();
+
+  dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
+  ObjectStore::Transaction empty;
+  bool should_write_local = false;
+  ECSubWrite local_write_op;
+  for (set<pg_shard_t>::const_iterator i =
+        get_parent()->get_actingbackfill_shards().begin();
+       i != get_parent()->get_actingbackfill_shards().end();
+       ++i) {
+    op->pending_apply.insert(*i);
+    op->pending_commit.insert(*i);
+    map<shard_id_t, ObjectStore::Transaction>::iterator iter =
+      trans.find(i->shard);
+    assert(iter != trans.end());
+    bool should_send = get_parent()->should_send_op(*i, op->hoid);
+    const pg_stat_t &stats =
+      should_send ?
+      get_info().stats :
+      parent->get_shard_info().find(*i)->second.stats;
+
+    ECSubWrite sop(
+      get_parent()->whoami_shard(),
+      op->tid,
+      op->reqid,
+      op->hoid,
+      stats,
+      should_send ? iter->second : empty,
+      op->version,
+      op->trim_to,
+      op->roll_forward_to,
+      op->log_entries,
+      op->updated_hit_set_history,
+      op->temp_added,
+      op->temp_cleared,
+      !should_send);
+
+    ZTracer::Trace trace;
+    if (op->trace) {
+      // initialize a child span for this shard
+      trace.init("ec sub write", nullptr, &op->trace);
+      trace.keyval("shard", i->shard.id);
+    }
+
+    if (*i == get_parent()->whoami_shard()) {
+      should_write_local = true;
+      local_write_op.claim(sop);
+    } else {
+      MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
+      r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
+      r->map_epoch = get_parent()->get_epoch();
+      r->min_epoch = get_parent()->get_interval_start_epoch();
+      r->trace = trace;
+      get_parent()->send_message_osd_cluster(
+       i->osd, r, get_parent()->get_epoch());
+    }
+  }
+  if (should_write_local) {
+      handle_sub_write(
+       get_parent()->whoami_shard(),
+       op->client_op,
+       local_write_op,
+       op->trace,
+       op->on_local_applied_sync);
+      op->on_local_applied_sync = 0;
+  }
+
+  for (auto i = op->on_write.begin();
+       i != op->on_write.end();
+       op->on_write.erase(i++)) {
+    (*i)();
+  }
+
+  return true;
+}
+
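+// Pipeline stage 3: retire the front of waiting_commit once its writes have
+// completed, advancing completed_to/committed_to and, post-kraken, queueing
+// a dummy op to roll the log forward if the pipeline is otherwise empty.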
+bool ECBackend::try_finish_rmw()
+{
+  if (waiting_commit.empty())
+    return false;
+  Op *op = &(waiting_commit.front());
+  if (op->write_in_progress())
+    return false;
+  waiting_commit.pop_front();
+
+  dout(10) << __func__ << ": " << *op << dendl;
+  dout(20) << __func__ << ": " << cache << dendl;
+
+  if (op->roll_forward_to > completed_to)
+    completed_to = op->roll_forward_to;
+  if (op->version > committed_to)
+    committed_to = op->version;
+
+  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
+    if (op->version > get_parent()->get_log().get_can_rollback_to() &&
+       waiting_reads.empty() &&
+       waiting_commit.empty()) {
+      // submit a dummy transaction to kick the rollforward
+      auto tid = get_parent()->get_tid();
+      Op *nop = &(tid_to_op_map[tid]);
+      nop->hoid = op->hoid;
+      nop->trim_to = op->trim_to;
+      nop->roll_forward_to = op->version;
+      nop->tid = tid;
+      nop->reqid = op->reqid;
+      waiting_reads.push_back(*nop);
+    }
+  }
+
+  if (op->using_cache) {
+    cache.release_write_pin(op->pin);
+  }
+  tid_to_op_map.erase(op->tid);
+
+  if (waiting_reads.empty() &&
+      waiting_commit.empty()) {
+    pipeline_state.clear();
+    dout(20) << __func__ << ": clearing pipeline_state "
+            << pipeline_state
+            << dendl;
+  }
+  return true;
+}
+
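+// Drive the write pipeline: keep advancing ops through the three stages
+// above until no further progress can be made.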
+void ECBackend::check_ops()
+{
+  while (try_state_to_reads() ||
+        try_reads_to_commit() ||
+        try_finish_rmw());
+}
+
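+// Synchronous reads are not supported on EC pools; reads must go through
+// the asynchronous reconstruct path below.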
+int ECBackend::objects_read_sync(
+  const hobject_t &hoid,
+  uint64_t off,
+  uint64_t len,
+  uint32_t op_flags,
+  bufferlist *bl)
+{
+  return -EOPNOTSUPP;
+}
+
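+// Client-facing async read: round each requested extent out to stripe
+// bounds, issue a single reconstructing read for the object, then use the
+// cb functor below to copy the decoded data back into the caller's buffers
+// and fire the per-extent and final completion contexts.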
+void ECBackend::objects_read_async(
+  const hobject_t &hoid,
+  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+             pair<bufferlist*, Context*> > > &to_read,
+  Context *on_complete,
+  bool fast_read)
+{
+  map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
+    reads;
+
+  uint32_t flags = 0;
+  extent_set es;
+  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+        pair<bufferlist*, Context*> > >::const_iterator i =
+        to_read.begin();
+       i != to_read.end();
+       ++i) {
+    pair<uint64_t, uint64_t> tmp =
+      sinfo.offset_len_to_stripe_bounds(
+       make_pair(i->first.get<0>(), i->first.get<1>()));
+
+    extent_set esnew;
+    esnew.insert(tmp.first, tmp.second);
+    es.union_of(esnew);
+    flags |= i->first.get<2>();
+  }
+
+  if (!es.empty()) {
+    auto &offsets = reads[hoid];
+    for (auto j = es.begin();
+        j != es.end();
+        ++j) {
+      offsets.push_back(
+       boost::make_tuple(
+         j.get_start(),
+         j.get_len(),
+         flags));
+    }
+  }
+
+  struct cb {
+    ECBackend *ec;
+    hobject_t hoid;
+    list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+             pair<bufferlist*, Context*> > > to_read;
+    unique_ptr<Context> on_complete;
+    cb(const cb&) = delete;
+    cb(cb &&) = default;
+    cb(ECBackend *ec,
+       const hobject_t &hoid,
+       const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+                  pair<bufferlist*, Context*> > > &to_read,
+       Context *on_complete)
+      : ec(ec),
+       hoid(hoid),
+       to_read(to_read),
+       on_complete(on_complete) {}
+    void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
+      auto dpp = ec->get_parent()->get_dpp();
+      ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
+                        << dendl;
+      ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
+                        << dendl;
+
+      auto &got = results[hoid];
+
+      int r = 0;
+      for (auto &&read: to_read) {
+       if (got.first < 0) {
+         if (read.second.second) {
+           read.second.second->complete(got.first);
+         }
+         if (r == 0)
+           r = got.first;
+       } else {
+         assert(read.second.first);
+         uint64_t offset = read.first.get<0>();
+         uint64_t length = read.first.get<1>();
+         auto range = got.second.get_containing_range(offset, length);
+         assert(range.first != range.second);
+         assert(range.first.get_off() <= offset);
+         assert(
+           (offset + length) <=
+           (range.first.get_off() + range.first.get_len()));
+         read.second.first->substr_of(
+           range.first.get_val(),
+           offset - range.first.get_off(),
+           length);
+         if (read.second.second) {
+           read.second.second->complete(length);
+           read.second.second = nullptr;
+         }
+       }
+      }
+      to_read.clear();
+      if (on_complete) {
+       on_complete.release()->complete(r);
+      }
+    }
+    ~cb() {
+      for (auto &&i: to_read) {
+       delete i.second.second;
+      }
+      to_read.clear();
+    }
+  };
+  objects_read_and_reconstruct(
+    reads,
+    fast_read,
+    make_gen_lambda_context<
+      map<hobject_t,pair<int, extent_map> > &&, cb>(
+       cb(this,
+          hoid,
+          to_read,
+          on_complete)));
+}
+
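+// Completion context for a reconstructing client read: decodes the shard
+// buffers returned for each requested extent, trims the result back to the
+// caller's logical range, and hands the assembled extent_map to the
+// ClientAsyncReadStatus.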
+struct CallClientContexts :
+  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
+  hobject_t hoid;
+  ECBackend *ec;
+  ECBackend::ClientAsyncReadStatus *status;
+  list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
+  CallClientContexts(
+    hobject_t hoid,
+    ECBackend *ec,
+    ECBackend::ClientAsyncReadStatus *status,
+    const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
+    : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
+  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
+    ECBackend::read_result_t &res = in.second;
+    extent_map result;
+    if (res.r != 0)
+      goto out;
+    assert(res.returned.size() == to_read.size());
+    assert(res.r == 0);
+    assert(res.errors.empty());
+    for (auto &&read: to_read) {
+      pair<uint64_t, uint64_t> adjusted =
+       ec->sinfo.offset_len_to_stripe_bounds(
+         make_pair(read.get<0>(), read.get<1>()));
+      assert(res.returned.front().get<0>() == adjusted.first &&
+            res.returned.front().get<1>() == adjusted.second);
+      map<int, bufferlist> to_decode;
+      bufferlist bl;
+      for (map<pg_shard_t, bufferlist>::iterator j =
+            res.returned.front().get<2>().begin();
+          j != res.returned.front().get<2>().end();
+          ++j) {
+       to_decode[j->first.shard].claim(j->second);
+      }
+      int r = ECUtil::decode(
+       ec->sinfo,
+       ec->ec_impl,
+       to_decode,
+       &bl);
+      if (r < 0) {
+        res.r = r;
+        goto out;
+      }
+      bufferlist trimmed;
+      trimmed.substr_of(
+       bl,
+       read.get<0>() - adjusted.first,
+       MIN(read.get<1>(),
+           bl.length() - (read.get<0>() - adjusted.first)));
+      result.insert(
+       read.get<0>(), trimmed.length(), std::move(trimmed));
+      res.returned.pop_front();
+    }
+out:
+    status->complete_object(hoid, res.r, std::move(result));
+    ec->kick_reads();
+  }
+};
+
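+// Map each object's requested extents onto the minimum set of shards able
+// to serve them, attach a CallClientContexts completion, and start a single
+// ReadOp covering all objects in the batch.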
+void ECBackend::objects_read_and_reconstruct(
+  const map<hobject_t,
+    std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+  > &reads,
+  bool fast_read,
+  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
+{
+  in_progress_client_reads.emplace_back(
+    reads.size(), std::move(func));
+  if (!reads.size()) {
+    kick_reads();
+    return;
+  }
+
+  set<int> want_to_read;
+  get_want_to_read_shards(&want_to_read);
+
+  map<hobject_t, read_request_t> for_read_op;
+  for (auto &&to_read: reads) {
+    set<pg_shard_t> shards;
+    int r = get_min_avail_to_read_shards(
+      to_read.first,
+      want_to_read,
+      false,
+      fast_read,
+      &shards);
+    assert(r == 0);
+
+    CallClientContexts *c = new CallClientContexts(
+      to_read.first,
+      this,
+      &(in_progress_client_reads.back()),
+      to_read.second);
+    for_read_op.insert(
+      make_pair(
+       to_read.first,
+       read_request_t(
+         to_read.second,
+         shards,
+         false,
+         c)));
+  }
+
+  start_read_op(
+    CEPH_MSG_PRIO_DEFAULT,
+    for_read_op,
+    OpRequestRef(),
+    fast_read, false);
+  return;
+}
+
+
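+// On a read error, retry by reading every shard not already contacted for
+// hoid so the data can be reconstructed from the remaining chunks.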
+int ECBackend::send_all_remaining_reads(
+  const hobject_t &hoid,
+  ReadOp &rop)
+{
+  set<int> already_read;
+  const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
+  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
+    already_read.insert(i->shard);
+  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
+  set<pg_shard_t> shards;
+  int r = get_remaining_shards(hoid, already_read, &shards);
+  if (r)
+    return r;
+  if (shards.empty())
+    return -EIO;
+
+  dout(10) << __func__ << " Read remaining shards " << shards << dendl;
+
+  // TODOSAM: this doesn't seem right
+  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
+    rop.to_read.find(hoid)->second.to_read;
+  GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
+    rop.to_read.find(hoid)->second.cb;
+
+  map<hobject_t, read_request_t> for_read_op;
+  for_read_op.insert(
+    make_pair(
+      hoid,
+      read_request_t(
+       offsets,
+       shards,
+       false,
+       c)));
+
+  rop.to_read.swap(for_read_op);
+  do_read_op(rop);
+  return 0;
+}
+
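+// Fetch the object's xattrs from the local shard, filtering out the
+// internal hinfo key so it is never exposed to callers.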
+int ECBackend::objects_get_attrs(
+  const hobject_t &hoid,
+  map<string, bufferlist> *out)
+{
+  int r = store->getattrs(
+    ch,
+    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+    *out);
+  if (r < 0)
+    return r;
+
+  for (map<string, bufferlist>::iterator i = out->begin();
+       i != out->end();
+       ) {
+    if (ECUtil::is_hinfo_key_string(i->first))
+      out->erase(i++);
+    else
+      ++i;
+  }
+  return r;
+}
+
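+// Roll back an append by truncating the local shard back to the chunk
+// offset corresponding to the old (stripe-aligned) logical size.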
+void ECBackend::rollback_append(
+  const hobject_t &hoid,
+  uint64_t old_size,
+  ObjectStore::Transaction *t)
+{
+  assert(old_size % sinfo.get_stripe_width() == 0);
+  t->truncate(
+    coll,
+    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+    sinfo.aligned_logical_offset_to_chunk_offset(
+      old_size));
+}
+
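+// Deep scrub for an EC shard: read the chunk data in osd_deep_scrub_stride
+// sized (chunk-aligned) steps, hash it, and compare the result against the
+// size and chunk hash recorded in the object's hinfo (skipped when ec
+// overwrites are enabled, since they do not yet support deep scrub).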
+void ECBackend::be_deep_scrub(
+  const hobject_t &poid,
+  uint32_t seed,
+  ScrubMap::object &o,
+  ThreadPool::TPHandle &handle) {
+  bufferhash h(-1); // we always used -1
+  int r;
+  uint64_t stride = cct->_conf->osd_deep_scrub_stride;
+  if (stride % sinfo.get_chunk_size())
+    stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
+  uint64_t pos = 0;
+
+  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
+
+  while (true) {
+    bufferlist bl;
+    handle.reset_tp_timeout();
+    r = store->read(
+      ch,
+      ghobject_t(
+       poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      pos,
+      stride, bl,
+      fadvise_flags);
+    if (r < 0)
+      break;
+    if (bl.length() % sinfo.get_chunk_size()) {
+      r = -EIO;
+      break;
+    }
+    pos += r;
+    h << bl;
+    if ((unsigned)r < stride)
+      break;
+  }
+
+  if (r == -EIO) {
+    dout(0) << "_scan_list  " << poid << " got "
+           << r << " on read, read_error" << dendl;
+    o.read_error = true;
+    return;
+  }
+
+  ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
+  if (!hinfo) {
+    dout(0) << "_scan_list  " << poid << " could not retrieve hash info" << dendl;
+    o.read_error = true;
+    o.digest_present = false;
+    return;
+  } else {
+    if (!get_parent()->get_pool().allows_ecoverwrites()) {
+      assert(hinfo->has_chunk_hash());
+      if (hinfo->get_total_chunk_size() != pos) {
+       dout(0) << "_scan_list  " << poid << " got incorrect size on read" << dendl;
+       o.ec_size_mismatch = true;
+       return;
+      }
+
+      if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
+       dout(0) << "_scan_list  " << poid << " got incorrect hash on read" << dendl;
+       o.ec_hash_mismatch = true;
+       return;
+      }
+
+      /* We checked above that we match our own stored hash.  We cannot
+       * send a hash of the actual object, so instead we simply send
+       * our locally stored hash of shard 0 on the assumption that if
+       * we match our chunk hash and our recollection of the hash for
+       * chunk 0 matches that of our peers, there is likely no corruption.
+       */
+      o.digest = hinfo->get_chunk_hash(0);
+      o.digest_present = true;
+    } else {
+      /* Hack! We must be using partial overwrites, and partial overwrites
+       * don't support deep-scrub yet
+       */
+      o.digest = 0;
+      o.digest_present = true;
+    }
+  }
+
+  o.omap_digest = seed;
+  o.omap_digest_present = true;
+}