remove ceph code
[stor4nfv.git] / src / ceph / src / osd / ECBackend.cc
diff --git a/src/ceph/src/osd/ECBackend.cc b/src/ceph/src/osd/ECBackend.cc
deleted file mode 100644 (file)
index 40be6a3..0000000
+++ /dev/null
@@ -1,2468 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2013 Inktank Storage, Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include <iostream>
-#include <sstream>
-
-#include "ECBackend.h"
-#include "messages/MOSDPGPush.h"
-#include "messages/MOSDPGPushReply.h"
-#include "messages/MOSDECSubOpWrite.h"
-#include "messages/MOSDECSubOpWriteReply.h"
-#include "messages/MOSDECSubOpRead.h"
-#include "messages/MOSDECSubOpReadReply.h"
-#include "ECMsgTypes.h"
-
-#include "PrimaryLogPG.h"
-
-#define dout_context cct
-#define dout_subsys ceph_subsys_osd
-#define DOUT_PREFIX_ARGS this
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, this)
-static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
-  return *_dout << pgb->get_parent()->gen_dbg_prefix();
-}
-
-struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
-  list<ECBackend::RecoveryOp> ops;
-};
-
-ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
-  switch (rhs.pipeline_state) {
-  case ECBackend::pipeline_state_t::CACHE_VALID:
-    return lhs << "CACHE_VALID";
-  case ECBackend::pipeline_state_t::CACHE_INVALID:
-    return lhs << "CACHE_INVALID";
-  default:
-    assert(0 == "invalid pipeline state");
-  }
-  return lhs; // unreachable
-}
-
-static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
-{
-  lhs << "[";
-  for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
-       i != rhs.end();
-       ++i) {
-    if (i != rhs.begin())
-      lhs << ", ";
-    lhs << make_pair(i->first, i->second.length());
-  }
-  return lhs << "]";
-}
-
-static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
-{
-  lhs << "[";
-  for (map<int, bufferlist>::const_iterator i = rhs.begin();
-       i != rhs.end();
-       ++i) {
-    if (i != rhs.begin())
-      lhs << ", ";
-    lhs << make_pair(i->first, i->second.length());
-  }
-  return lhs << "]";
-}
-
-static ostream &operator<<(
-  ostream &lhs,
-  const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
-{
-  return lhs << "(" << rhs.get<0>() << ", "
-            << rhs.get<1>() << ", " << rhs.get<2>() << ")";
-}
-
-ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
-{
-  return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
-            << ", need=" << rhs.need
-            << ", want_attrs=" << rhs.want_attrs
-            << ")";
-}
-
-ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
-{
-  lhs << "read_result_t(r=" << rhs.r
-      << ", errors=" << rhs.errors;
-  if (rhs.attrs) {
-    lhs << ", attrs=" << rhs.attrs.get();
-  } else {
-    lhs << ", noattrs";
-  }
-  return lhs << ", returned=" << rhs.returned << ")";
-}
-
-ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
-{
-  lhs << "ReadOp(tid=" << rhs.tid;
-  if (rhs.op && rhs.op->get_req()) {
-    lhs << ", op=";
-    rhs.op->get_req()->print(lhs);
-  }
-  return lhs << ", to_read=" << rhs.to_read
-            << ", complete=" << rhs.complete
-            << ", priority=" << rhs.priority
-            << ", obj_to_source=" << rhs.obj_to_source
-            << ", source_to_obj=" << rhs.source_to_obj
-            << ", in_progress=" << rhs.in_progress << ")";
-}
-
-void ECBackend::ReadOp::dump(Formatter *f) const
-{
-  f->dump_unsigned("tid", tid);
-  if (op && op->get_req()) {
-    f->dump_stream("op") << *(op->get_req());
-  }
-  f->dump_stream("to_read") << to_read;
-  f->dump_stream("complete") << complete;
-  f->dump_int("priority", priority);
-  f->dump_stream("obj_to_source") << obj_to_source;
-  f->dump_stream("source_to_obj") << source_to_obj;
-  f->dump_stream("in_progress") << in_progress;
-}
-
-ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
-{
-  lhs << "Op(" << rhs.hoid
-      << " v=" << rhs.version
-      << " tt=" << rhs.trim_to
-      << " tid=" << rhs.tid
-      << " reqid=" << rhs.reqid;
-  if (rhs.client_op && rhs.client_op->get_req()) {
-    lhs << " client_op=";
-    rhs.client_op->get_req()->print(lhs);
-  }
-  lhs << " roll_forward_to=" << rhs.roll_forward_to
-      << " temp_added=" << rhs.temp_added
-      << " temp_cleared=" << rhs.temp_cleared
-      << " pending_read=" << rhs.pending_read
-      << " remote_read=" << rhs.remote_read
-      << " remote_read_result=" << rhs.remote_read_result
-      << " pending_apply=" << rhs.pending_apply
-      << " pending_commit=" << rhs.pending_commit
-      << " plan.to_read=" << rhs.plan.to_read
-      << " plan.will_write=" << rhs.plan.will_write
-      << ")";
-  return lhs;
-}
-
-ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
-{
-  return lhs << "RecoveryOp("
-            << "hoid=" << rhs.hoid
-            << " v=" << rhs.v
-            << " missing_on=" << rhs.missing_on
-            << " missing_on_shards=" << rhs.missing_on_shards
-            << " recovery_info=" << rhs.recovery_info
-            << " recovery_progress=" << rhs.recovery_progress
-            << " obc refcount=" << rhs.obc.use_count()
-            << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
-            << " waiting_on_pushes=" << rhs.waiting_on_pushes
-            << " extent_requested=" << rhs.extent_requested
-            << ")";
-}
-
-void ECBackend::RecoveryOp::dump(Formatter *f) const
-{
-  f->dump_stream("hoid") << hoid;
-  f->dump_stream("v") << v;
-  f->dump_stream("missing_on") << missing_on;
-  f->dump_stream("missing_on_shards") << missing_on_shards;
-  f->dump_stream("recovery_info") << recovery_info;
-  f->dump_stream("recovery_progress") << recovery_progress;
-  f->dump_stream("state") << tostr(state);
-  f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
-  f->dump_stream("extent_requested") << extent_requested;
-}
-
-ECBackend::ECBackend(
-  PGBackend::Listener *pg,
-  coll_t coll,
-  ObjectStore::CollectionHandle &ch,
-  ObjectStore *store,
-  CephContext *cct,
-  ErasureCodeInterfaceRef ec_impl,
-  uint64_t stripe_width)
-  : PGBackend(cct, pg, store, coll, ch),
-    ec_impl(ec_impl),
-    sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
-  assert((ec_impl->get_data_chunk_count() *
-         ec_impl->get_chunk_size(stripe_width)) == stripe_width);
-}
-
-PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
-{
-  return new ECRecoveryHandle;
-}
-
-void ECBackend::_failed_push(const hobject_t &hoid,
-  pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
-{
-  ECBackend::read_result_t &res = in.second;
-  dout(10) << __func__ << ": Read error " << hoid << " r="
-          << res.r << " errors=" << res.errors << dendl;
-  dout(10) << __func__ << ": canceling recovery op for obj " << hoid
-          << dendl;
-  assert(recovery_ops.count(hoid));
-  recovery_ops.erase(hoid);
-
-  list<pg_shard_t> fl;
-  for (auto&& i : res.errors) {
-    fl.push_back(i.first);
-  }
-  get_parent()->failed_push(fl, hoid);
-}
-
-struct OnRecoveryReadComplete :
-  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
-  ECBackend *pg;
-  hobject_t hoid;
-  set<int> want;
-  OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
-    : pg(pg), hoid(hoid) {}
-  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
-    ECBackend::read_result_t &res = in.second;
-    if (!(res.r == 0 && res.errors.empty())) {
-        pg->_failed_push(hoid, in);
-        return;
-    }
-    assert(res.returned.size() == 1);
-    pg->handle_recovery_read_complete(
-      hoid,
-      res.returned.back(),
-      res.attrs,
-      in.first);
-  }
-};
-
-struct RecoveryMessages {
-  map<hobject_t,
-      ECBackend::read_request_t> reads;
-  void read(
-    ECBackend *ec,
-    const hobject_t &hoid, uint64_t off, uint64_t len,
-    const set<pg_shard_t> &need,
-    bool attrs) {
-    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
-    to_read.push_back(boost::make_tuple(off, len, 0));
-    assert(!reads.count(hoid));
-    reads.insert(
-      make_pair(
-       hoid,
-       ECBackend::read_request_t(
-         to_read,
-         need,
-         attrs,
-         new OnRecoveryReadComplete(
-           ec,
-           hoid))));
-  }
-
-  map<pg_shard_t, vector<PushOp> > pushes;
-  map<pg_shard_t, vector<PushReplyOp> > push_replies;
-  ObjectStore::Transaction t;
-  RecoveryMessages() {}
-  ~RecoveryMessages(){}
-};
-
-void ECBackend::handle_recovery_push(
-  const PushOp &op,
-  RecoveryMessages *m)
-{
-  ostringstream ss;
-  if (get_parent()->check_failsafe_full(ss)) {
-    dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
-    ceph_abort();
-  }
-
-  bool oneshot = op.before_progress.first && op.after_progress.data_complete;
-  ghobject_t tobj;
-  if (oneshot) {
-    tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
-                     get_parent()->whoami_shard().shard);
-  } else {
-    tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
-                                                            op.version),
-                     ghobject_t::NO_GEN,
-                     get_parent()->whoami_shard().shard);
-    if (op.before_progress.first) {
-      dout(10) << __func__ << ": Adding oid "
-              << tobj.hobj << " in the temp collection" << dendl;
-      add_temp_obj(tobj.hobj);
-    }
-  }
-
-  if (op.before_progress.first) {
-    m->t.remove(coll, tobj);
-    m->t.touch(coll, tobj);
-  }
-
-  if (!op.data_included.empty()) {
-    uint64_t start = op.data_included.range_start();
-    uint64_t end = op.data_included.range_end();
-    assert(op.data.length() == (end - start));
-
-    m->t.write(
-      coll,
-      tobj,
-      start,
-      op.data.length(),
-      op.data);
-  } else {
-    assert(op.data.length() == 0);
-  }
-
-  if (op.before_progress.first) {
-    assert(op.attrset.count(string("_")));
-    m->t.setattrs(
-      coll,
-      tobj,
-      op.attrset);
-  }
-
-  if (op.after_progress.data_complete && !oneshot) {
-    dout(10) << __func__ << ": Removing oid "
-            << tobj.hobj << " from the temp collection" << dendl;
-    clear_temp_obj(tobj.hobj);
-    m->t.remove(coll, ghobject_t(
-       op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
-    m->t.collection_move_rename(
-      coll, tobj,
-      coll, ghobject_t(
-       op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
-  }
-  if (op.after_progress.data_complete) {
-    if ((get_parent()->pgb_is_primary())) {
-      assert(recovery_ops.count(op.soid));
-      assert(recovery_ops[op.soid].obc);
-      get_parent()->on_local_recover(
-       op.soid,
-       op.recovery_info,
-       recovery_ops[op.soid].obc,
-       false,
-       &m->t);
-    } else {
-      get_parent()->on_local_recover(
-       op.soid,
-       op.recovery_info,
-       ObjectContextRef(),
-       false,
-       &m->t);
-    }
-  }
-  m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
-  m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
-}
-
-void ECBackend::handle_recovery_push_reply(
-  const PushReplyOp &op,
-  pg_shard_t from,
-  RecoveryMessages *m)
-{
-  if (!recovery_ops.count(op.soid))
-    return;
-  RecoveryOp &rop = recovery_ops[op.soid];
-  assert(rop.waiting_on_pushes.count(from));
-  rop.waiting_on_pushes.erase(from);
-  continue_recovery_op(rop, m);
-}
-
-void ECBackend::handle_recovery_read_complete(
-  const hobject_t &hoid,
-  boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
-  boost::optional<map<string, bufferlist> > attrs,
-  RecoveryMessages *m)
-{
-  dout(10) << __func__ << ": returned " << hoid << " "
-          << "(" << to_read.get<0>()
-          << ", " << to_read.get<1>()
-          << ", " << to_read.get<2>()
-          << ")"
-          << dendl;
-  assert(recovery_ops.count(hoid));
-  RecoveryOp &op = recovery_ops[hoid];
-  assert(op.returned_data.empty());
-  map<int, bufferlist*> target;
-  for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
-       i != op.missing_on_shards.end();
-       ++i) {
-    target[*i] = &(op.returned_data[*i]);
-  }
-  map<int, bufferlist> from;
-  for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
-      i != to_read.get<2>().end();
-      ++i) {
-    from[i->first.shard].claim(i->second);
-  }
-  dout(10) << __func__ << ": " << from << dendl;
-  int r = ECUtil::decode(sinfo, ec_impl, from, target);
-  assert(r == 0);
-  if (attrs) {
-    op.xattrs.swap(*attrs);
-
-    if (!op.obc) {
-      // attrs only reference the origin bufferlist (decode from
-      // ECSubReadReply message) whose size is much greater than attrs
-      // in recovery. If obc cache it (get_obc maybe cache the attr),
-      // this causes the whole origin bufferlist would not be free
-      // until obc is evicted from obc cache. So rebuild the
-      // bufferlist before cache it.
-      for (map<string, bufferlist>::iterator it = op.xattrs.begin();
-           it != op.xattrs.end();
-           ++it) {
-        it->second.rebuild();
-      }
-      // Need to remove ECUtil::get_hinfo_key() since it should not leak out
-      // of the backend (see bug #12983)
-      map<string, bufferlist> sanitized_attrs(op.xattrs);
-      sanitized_attrs.erase(ECUtil::get_hinfo_key());
-      op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
-      assert(op.obc);
-      op.recovery_info.size = op.obc->obs.oi.size;
-      op.recovery_info.oi = op.obc->obs.oi;
-    }
-
-    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
-    if (op.obc->obs.oi.size > 0) {
-      assert(op.xattrs.count(ECUtil::get_hinfo_key()));
-      bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin();
-      ::decode(hinfo, bp);
-    }
-    op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
-  }
-  assert(op.xattrs.size());
-  assert(op.obc);
-  continue_recovery_op(op, m);
-}
-
-struct SendPushReplies : public Context {
-  PGBackend::Listener *l;
-  epoch_t epoch;
-  map<int, MOSDPGPushReply*> replies;
-  SendPushReplies(
-    PGBackend::Listener *l,
-    epoch_t epoch,
-    map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
-    replies.swap(in);
-  }
-  void finish(int) override {
-    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
-        i != replies.end();
-        ++i) {
-      l->send_message_osd_cluster(i->first, i->second, epoch);
-    }
-    replies.clear();
-  }
-  ~SendPushReplies() override {
-    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
-        i != replies.end();
-        ++i) {
-      i->second->put();
-    }
-    replies.clear();
-  }
-};
-
-void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
-{
-  for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
-       i != m.pushes.end();
-       m.pushes.erase(i++)) {
-    MOSDPGPush *msg = new MOSDPGPush();
-    msg->set_priority(priority);
-    msg->map_epoch = get_parent()->get_epoch();
-    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
-    msg->from = get_parent()->whoami_shard();
-    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
-    msg->pushes.swap(i->second);
-    msg->compute_cost(cct);
-    get_parent()->send_message(
-      i->first.osd,
-      msg);
-  }
-  map<int, MOSDPGPushReply*> replies;
-  for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
-        m.push_replies.begin();
-       i != m.push_replies.end();
-       m.push_replies.erase(i++)) {
-    MOSDPGPushReply *msg = new MOSDPGPushReply();
-    msg->set_priority(priority);
-    msg->map_epoch = get_parent()->get_epoch();
-    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
-    msg->from = get_parent()->whoami_shard();
-    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
-    msg->replies.swap(i->second);
-    msg->compute_cost(cct);
-    replies.insert(make_pair(i->first.osd, msg));
-  }
-
-  if (!replies.empty()) {
-    (m.t).register_on_complete(
-       get_parent()->bless_context(
-         new SendPushReplies(
-           get_parent(),
-           get_parent()->get_epoch(),
-           replies)));
-    get_parent()->queue_transaction(std::move(m.t));
-  } 
-
-  if (m.reads.empty())
-    return;
-  start_read_op(
-    priority,
-    m.reads,
-    OpRequestRef(),
-    false, true);
-}
-
-void ECBackend::continue_recovery_op(
-  RecoveryOp &op,
-  RecoveryMessages *m)
-{
-  dout(10) << __func__ << ": continuing " << op << dendl;
-  while (1) {
-    switch (op.state) {
-    case RecoveryOp::IDLE: {
-      // start read
-      op.state = RecoveryOp::READING;
-      assert(!op.recovery_progress.data_complete);
-      set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
-      uint64_t from = op.recovery_progress.data_recovered_to;
-      uint64_t amount = get_recovery_chunk_size();
-
-      if (op.recovery_progress.first && op.obc) {
-       /* We've got the attrs and the hinfo, might as well use them */
-       op.hinfo = get_hash_info(op.hoid);
-       assert(op.hinfo);
-       op.xattrs = op.obc->attr_cache;
-       ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
-      }
-
-      set<pg_shard_t> to_read;
-      int r = get_min_avail_to_read_shards(
-       op.hoid, want, true, false, &to_read);
-      if (r != 0) {
-       // we must have lost a recovery source
-       assert(!op.recovery_progress.first);
-       dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
-                << dendl;
-       get_parent()->cancel_pull(op.hoid);
-       recovery_ops.erase(op.hoid);
-       return;
-      }
-      m->read(
-       this,
-       op.hoid,
-       op.recovery_progress.data_recovered_to,
-       amount,
-       to_read,
-       op.recovery_progress.first && !op.obc);
-      op.extent_requested = make_pair(
-       from,
-       amount);
-      dout(10) << __func__ << ": IDLE return " << op << dendl;
-      return;
-    }
-    case RecoveryOp::READING: {
-      // read completed, start write
-      assert(op.xattrs.size());
-      assert(op.returned_data.size());
-      op.state = RecoveryOp::WRITING;
-      ObjectRecoveryProgress after_progress = op.recovery_progress;
-      after_progress.data_recovered_to += op.extent_requested.second;
-      after_progress.first = false;
-      if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
-       after_progress.data_recovered_to =
-         sinfo.logical_to_next_stripe_offset(
-           op.obc->obs.oi.size);
-       after_progress.data_complete = true;
-      }
-      for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
-          mi != op.missing_on.end();
-          ++mi) {
-       assert(op.returned_data.count(mi->shard));
-       m->pushes[*mi].push_back(PushOp());
-       PushOp &pop = m->pushes[*mi].back();
-       pop.soid = op.hoid;
-       pop.version = op.v;
-       pop.data = op.returned_data[mi->shard];
-       dout(10) << __func__ << ": before_progress=" << op.recovery_progress
-                << ", after_progress=" << after_progress
-                << ", pop.data.length()=" << pop.data.length()
-                << ", size=" << op.obc->obs.oi.size << dendl;
-       assert(
-         pop.data.length() ==
-         sinfo.aligned_logical_offset_to_chunk_offset(
-           after_progress.data_recovered_to -
-           op.recovery_progress.data_recovered_to)
-         );
-       if (pop.data.length())
-         pop.data_included.insert(
-           sinfo.aligned_logical_offset_to_chunk_offset(
-             op.recovery_progress.data_recovered_to),
-           pop.data.length()
-           );
-       if (op.recovery_progress.first) {
-         pop.attrset = op.xattrs;
-       }
-       pop.recovery_info = op.recovery_info;
-       pop.before_progress = op.recovery_progress;
-       pop.after_progress = after_progress;
-       if (*mi != get_parent()->primary_shard())
-         get_parent()->begin_peer_recover(
-           *mi,
-           op.hoid);
-      }
-      op.returned_data.clear();
-      op.waiting_on_pushes = op.missing_on;
-      op.recovery_progress = after_progress;
-      dout(10) << __func__ << ": READING return " << op << dendl;
-      return;
-    }
-    case RecoveryOp::WRITING: {
-      if (op.waiting_on_pushes.empty()) {
-       if (op.recovery_progress.data_complete) {
-         op.state = RecoveryOp::COMPLETE;
-         for (set<pg_shard_t>::iterator i = op.missing_on.begin();
-              i != op.missing_on.end();
-              ++i) {
-           if (*i != get_parent()->primary_shard()) {
-             dout(10) << __func__ << ": on_peer_recover on " << *i
-                      << ", obj " << op.hoid << dendl;
-             get_parent()->on_peer_recover(
-               *i,
-               op.hoid,
-               op.recovery_info);
-           }
-         }
-         object_stat_sum_t stat;
-         stat.num_bytes_recovered = op.recovery_info.size;
-         stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
-         stat.num_objects_recovered = 1;
-         get_parent()->on_global_recover(op.hoid, stat, false);
-         dout(10) << __func__ << ": WRITING return " << op << dendl;
-         recovery_ops.erase(op.hoid);
-         return;
-       } else {
-         op.state = RecoveryOp::IDLE;
-         dout(10) << __func__ << ": WRITING continue " << op << dendl;
-         continue;
-       }
-      }
-      return;
-    }
-    // should never be called once complete
-    case RecoveryOp::COMPLETE:
-    default: {
-      ceph_abort();
-    };
-    }
-  }
-}
-
-void ECBackend::run_recovery_op(
-  RecoveryHandle *_h,
-  int priority)
-{
-  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
-  RecoveryMessages m;
-  for (list<RecoveryOp>::iterator i = h->ops.begin();
-       i != h->ops.end();
-       ++i) {
-    dout(10) << __func__ << ": starting " << *i << dendl;
-    assert(!recovery_ops.count(i->hoid));
-    RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
-    continue_recovery_op(op, &m);
-  }
-
-  dispatch_recovery_messages(m, priority);
-  send_recovery_deletes(priority, h->deletes);
-  delete _h;
-}
-
-int ECBackend::recover_object(
-  const hobject_t &hoid,
-  eversion_t v,
-  ObjectContextRef head,
-  ObjectContextRef obc,
-  RecoveryHandle *_h)
-{
-  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
-  h->ops.push_back(RecoveryOp());
-  h->ops.back().v = v;
-  h->ops.back().hoid = hoid;
-  h->ops.back().obc = obc;
-  h->ops.back().recovery_info.soid = hoid;
-  h->ops.back().recovery_info.version = v;
-  if (obc) {
-    h->ops.back().recovery_info.size = obc->obs.oi.size;
-    h->ops.back().recovery_info.oi = obc->obs.oi;
-  }
-  if (hoid.is_snap()) {
-    if (obc) {
-      assert(obc->ssc);
-      h->ops.back().recovery_info.ss = obc->ssc->snapset;
-    } else if (head) {
-      assert(head->ssc);
-      h->ops.back().recovery_info.ss = head->ssc->snapset;
-    } else {
-      assert(0 == "neither obc nor head set for a snap object");
-    }
-  }
-  h->ops.back().recovery_progress.omap_complete = true;
-  for (set<pg_shard_t>::const_iterator i =
-        get_parent()->get_actingbackfill_shards().begin();
-       i != get_parent()->get_actingbackfill_shards().end();
-       ++i) {
-    dout(10) << "checking " << *i << dendl;
-    if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
-      h->ops.back().missing_on.insert(*i);
-      h->ops.back().missing_on_shards.insert(i->shard);
-    }
-  }
-  dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
-  return 0;
-}
-
-bool ECBackend::can_handle_while_inactive(
-  OpRequestRef _op)
-{
-  return false;
-}
-
-bool ECBackend::_handle_message(
-  OpRequestRef _op)
-{
-  dout(10) << __func__ << ": " << *_op->get_req() << dendl;
-  int priority = _op->get_req()->get_priority();
-  switch (_op->get_req()->get_type()) {
-  case MSG_OSD_EC_WRITE: {
-    // NOTE: this is non-const because handle_sub_write modifies the embedded
-    // ObjectStore::Transaction in place (and then std::move's it).  It does
-    // not conflict with ECSubWrite's operator<<.
-    MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
-      _op->get_nonconst_req());
-    handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
-    return true;
-  }
-  case MSG_OSD_EC_WRITE_REPLY: {
-    const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
-      _op->get_req());
-    handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
-    return true;
-  }
-  case MSG_OSD_EC_READ: {
-    const MOSDECSubOpRead *op = static_cast<const MOSDECSubOpRead*>(_op->get_req());
-    MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
-    reply->pgid = get_parent()->primary_spg_t();
-    reply->map_epoch = get_parent()->get_epoch();
-    reply->min_epoch = get_parent()->get_interval_start_epoch();
-    handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
-    reply->trace = _op->pg_trace;
-    get_parent()->send_message_osd_cluster(
-      op->op.from.osd, reply, get_parent()->get_epoch());
-    return true;
-  }
-  case MSG_OSD_EC_READ_REPLY: {
-    // NOTE: this is non-const because handle_sub_read_reply steals resulting
-    // buffers.  It does not conflict with ECSubReadReply operator<<.
-    MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
-      _op->get_nonconst_req());
-    RecoveryMessages rm;
-    handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
-    dispatch_recovery_messages(rm, priority);
-    return true;
-  }
-  case MSG_OSD_PG_PUSH: {
-    const MOSDPGPush *op = static_cast<const MOSDPGPush *>(_op->get_req());
-    RecoveryMessages rm;
-    for (vector<PushOp>::const_iterator i = op->pushes.begin();
-        i != op->pushes.end();
-        ++i) {
-      handle_recovery_push(*i, &rm);
-    }
-    dispatch_recovery_messages(rm, priority);
-    return true;
-  }
-  case MSG_OSD_PG_PUSH_REPLY: {
-    const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
-      _op->get_req());
-    RecoveryMessages rm;
-    for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
-        i != op->replies.end();
-        ++i) {
-      handle_recovery_push_reply(*i, op->from, &rm);
-    }
-    dispatch_recovery_messages(rm, priority);
-    return true;
-  }
-  default:
-    return false;
-  }
-  return false;
-}
-
-struct SubWriteCommitted : public Context {
-  ECBackend *pg;
-  OpRequestRef msg;
-  ceph_tid_t tid;
-  eversion_t version;
-  eversion_t last_complete;
-  const ZTracer::Trace trace;
-  SubWriteCommitted(
-    ECBackend *pg,
-    OpRequestRef msg,
-    ceph_tid_t tid,
-    eversion_t version,
-    eversion_t last_complete,
-    const ZTracer::Trace &trace)
-    : pg(pg), msg(msg), tid(tid),
-      version(version), last_complete(last_complete), trace(trace) {}
-  void finish(int) override {
-    if (msg)
-      msg->mark_event("sub_op_committed");
-    pg->sub_write_committed(tid, version, last_complete, trace);
-  }
-};
-void ECBackend::sub_write_committed(
-  ceph_tid_t tid, eversion_t version, eversion_t last_complete,
-  const ZTracer::Trace &trace) {
-  if (get_parent()->pgb_is_primary()) {
-    ECSubWriteReply reply;
-    reply.tid = tid;
-    reply.last_complete = last_complete;
-    reply.committed = true;
-    reply.from = get_parent()->whoami_shard();
-    handle_sub_write_reply(
-      get_parent()->whoami_shard(),
-      reply, trace);
-  } else {
-    get_parent()->update_last_complete_ondisk(last_complete);
-    MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
-    r->pgid = get_parent()->primary_spg_t();
-    r->map_epoch = get_parent()->get_epoch();
-    r->min_epoch = get_parent()->get_interval_start_epoch();
-    r->op.tid = tid;
-    r->op.last_complete = last_complete;
-    r->op.committed = true;
-    r->op.from = get_parent()->whoami_shard();
-    r->set_priority(CEPH_MSG_PRIO_HIGH);
-    r->trace = trace;
-    r->trace.event("sending sub op commit");
-    get_parent()->send_message_osd_cluster(
-      get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
-  }
-}
-
-struct SubWriteApplied : public Context {
-  ECBackend *pg;
-  OpRequestRef msg;
-  ceph_tid_t tid;
-  eversion_t version;
-  const ZTracer::Trace trace;
-  SubWriteApplied(
-    ECBackend *pg,
-    OpRequestRef msg,
-    ceph_tid_t tid,
-    eversion_t version,
-    const ZTracer::Trace &trace)
-    : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {}
-  void finish(int) override {
-    if (msg)
-      msg->mark_event("sub_op_applied");
-    pg->sub_write_applied(tid, version, trace);
-  }
-};
-void ECBackend::sub_write_applied(
-  ceph_tid_t tid, eversion_t version,
-  const ZTracer::Trace &trace) {
-  parent->op_applied(version);
-  if (get_parent()->pgb_is_primary()) {
-    ECSubWriteReply reply;
-    reply.from = get_parent()->whoami_shard();
-    reply.tid = tid;
-    reply.applied = true;
-    handle_sub_write_reply(
-      get_parent()->whoami_shard(),
-      reply, trace);
-  } else {
-    MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
-    r->pgid = get_parent()->primary_spg_t();
-    r->map_epoch = get_parent()->get_epoch();
-    r->min_epoch = get_parent()->get_interval_start_epoch();
-    r->op.from = get_parent()->whoami_shard();
-    r->op.tid = tid;
-    r->op.applied = true;
-    r->set_priority(CEPH_MSG_PRIO_HIGH);
-    r->trace = trace;
-    r->trace.event("sending sub op apply");
-    get_parent()->send_message_osd_cluster(
-      get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
-  }
-}
-
-void ECBackend::handle_sub_write(
-  pg_shard_t from,
-  OpRequestRef msg,
-  ECSubWrite &op,
-  const ZTracer::Trace &trace,
-  Context *on_local_applied_sync)
-{
-  if (msg)
-    msg->mark_started();
-  trace.event("handle_sub_write");
-  assert(!get_parent()->get_log().get_missing().is_missing(op.soid));
-  if (!get_parent()->pgb_is_primary())
-    get_parent()->update_stats(op.stats);
-  ObjectStore::Transaction localt;
-  if (!op.temp_added.empty()) {
-    add_temp_objs(op.temp_added);
-  }
-  if (op.backfill) {
-    for (set<hobject_t>::iterator i = op.temp_removed.begin();
-        i != op.temp_removed.end();
-        ++i) {
-      dout(10) << __func__ << ": removing object " << *i
-              << " since we won't get the transaction" << dendl;
-      localt.remove(
-       coll,
-       ghobject_t(
-         *i,
-         ghobject_t::NO_GEN,
-         get_parent()->whoami_shard().shard));
-    }
-  }
-  clear_temp_objs(op.temp_removed);
-  get_parent()->log_operation(
-    op.log_entries,
-    op.updated_hit_set_history,
-    op.trim_to,
-    op.roll_forward_to,
-    !op.backfill,
-    localt);
-
-  PrimaryLogPG *_rPG = dynamic_cast<PrimaryLogPG *>(get_parent());
-  if (_rPG && !_rPG->is_undersized() &&
-      (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count())
-    op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
-
-  if (on_local_applied_sync) {
-    dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl;
-    localt.register_on_applied_sync(on_local_applied_sync);
-  }
-  localt.register_on_commit(
-    get_parent()->bless_context(
-      new SubWriteCommitted(
-       this, msg, op.tid,
-       op.at_version,
-       get_parent()->get_info().last_complete, trace)));
-  localt.register_on_applied(
-    get_parent()->bless_context(
-      new SubWriteApplied(this, msg, op.tid, op.at_version, trace)));
-  vector<ObjectStore::Transaction> tls;
-  tls.reserve(2);
-  tls.push_back(std::move(op.t));
-  tls.push_back(std::move(localt));
-  get_parent()->queue_transactions(tls, msg);
-}
-
-void ECBackend::handle_sub_read(
-  pg_shard_t from,
-  const ECSubRead &op,
-  ECSubReadReply *reply,
-  const ZTracer::Trace &trace)
-{
-  trace.event("handle sub read");
-  shard_id_t shard = get_parent()->whoami_shard().shard;
-  for(auto i = op.to_read.begin();
-      i != op.to_read.end();
-      ++i) {
-    int r = 0;
-    ECUtil::HashInfoRef hinfo;
-    if (!get_parent()->get_pool().allows_ecoverwrites()) {
-      hinfo = get_hash_info(i->first);
-      if (!hinfo) {
-       r = -EIO;
-       get_parent()->clog_error() << "Corruption detected: object " << i->first
-                                   << " is missing hash_info";
-       dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
-       goto error;
-      }
-    }
-    for (auto j = i->second.begin(); j != i->second.end(); ++j) {
-      bufferlist bl;
-      r = store->read(
-       ch,
-       ghobject_t(i->first, ghobject_t::NO_GEN, shard),
-       j->get<0>(),
-       j->get<1>(),
-       bl, j->get<2>());
-      if (r < 0) {
-       get_parent()->clog_error() << "Error " << r
-                                  << " reading object "
-                                  << i->first;
-       dout(5) << __func__ << ": Error " << r
-               << " reading " << i->first << dendl;
-       goto error;
-      } else {
-        dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
-       reply->buffers_read[i->first].push_back(
-         make_pair(
-           j->get<0>(),
-           bl)
-         );
-      }
-
-      if (!get_parent()->get_pool().allows_ecoverwrites()) {
-       // This shows that we still need deep scrub because large enough files
-       // are read in sections, so the digest check here won't be done here.
-       // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
-       // the state of our chunk in case other chunks could substitute.
-       assert(hinfo->has_chunk_hash());
-       if ((bl.length() == hinfo->get_total_chunk_size()) &&
-           (j->get<0>() == 0)) {
-         dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
-         bufferhash h(-1);
-         h << bl;
-         if (h.digest() != hinfo->get_chunk_hash(shard)) {
-           get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
-                                      << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
-           dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
-                   << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
-           r = -EIO;
-           goto error;
-         }
-       }
-      }
-    }
-    continue;
-error:
-    // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
-    // the state of our chunk in case other chunks could substitute.
-    reply->buffers_read.erase(i->first);
-    reply->errors[i->first] = r;
-  }
-  for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
-       i != op.attrs_to_read.end();
-       ++i) {
-    dout(10) << __func__ << ": fulfilling attr request on "
-            << *i << dendl;
-    if (reply->errors.count(*i))
-      continue;
-    int r = store->getattrs(
-      ch,
-      ghobject_t(
-       *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-      reply->attrs_read[*i]);
-    if (r < 0) {
-      reply->buffers_read.erase(*i);
-      reply->errors[*i] = r;
-    }
-  }
-  reply->from = get_parent()->whoami_shard();
-  reply->tid = op.tid;
-}
-
-void ECBackend::handle_sub_write_reply(
-  pg_shard_t from,
-  const ECSubWriteReply &op,
-  const ZTracer::Trace &trace)
-{
-  map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
-  assert(i != tid_to_op_map.end());
-  if (op.committed) {
-    trace.event("sub write committed");
-    assert(i->second.pending_commit.count(from));
-    i->second.pending_commit.erase(from);
-    if (from != get_parent()->whoami_shard()) {
-      get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
-    }
-  }
-  if (op.applied) {
-    trace.event("sub write applied");
-    assert(i->second.pending_apply.count(from));
-    i->second.pending_apply.erase(from);
-  }
-
-  if (i->second.pending_apply.empty() && i->second.on_all_applied) {
-    dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
-    i->second.on_all_applied->complete(0);
-    i->second.on_all_applied = 0;
-    i->second.trace.event("ec write all applied");
-  }
-  if (i->second.pending_commit.empty() && i->second.on_all_commit) {
-    dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
-    i->second.on_all_commit->complete(0);
-    i->second.on_all_commit = 0;
-    i->second.trace.event("ec write all committed");
-  }
-  check_ops();
-}
-
-void ECBackend::handle_sub_read_reply(
-  pg_shard_t from,
-  ECSubReadReply &op,
-  RecoveryMessages *m,
-  const ZTracer::Trace &trace)
-{
-  trace.event("ec sub read reply");
-  dout(10) << __func__ << ": reply " << op << dendl;
-  map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
-  if (iter == tid_to_read_map.end()) {
-    //canceled
-    dout(20) << __func__ << ": dropped " << op << dendl;
-    return;
-  }
-  ReadOp &rop = iter->second;
-  for (auto i = op.buffers_read.begin();
-       i != op.buffers_read.end();
-       ++i) {
-    assert(!op.errors.count(i->first));        // If attribute error we better not have sent a buffer
-    if (!rop.to_read.count(i->first)) {
-      // We canceled this read! @see filter_read_op
-      dout(20) << __func__ << " to_read skipping" << dendl;
-      continue;
-    }
-    list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
-      rop.to_read.find(i->first)->second.to_read.begin();
-    list<
-      boost::tuple<
-       uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
-      rop.complete[i->first].returned.begin();
-    for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
-        j != i->second.end();
-        ++j, ++req_iter, ++riter) {
-      assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
-      assert(riter != rop.complete[i->first].returned.end());
-      pair<uint64_t, uint64_t> adjusted =
-       sinfo.aligned_offset_len_to_chunk(
-         make_pair(req_iter->get<0>(), req_iter->get<1>()));
-      assert(adjusted.first == j->first);
-      riter->get<2>()[from].claim(j->second);
-    }
-  }
-  for (auto i = op.attrs_read.begin();
-       i != op.attrs_read.end();
-       ++i) {
-    assert(!op.errors.count(i->first));        // if read error better not have sent an attribute
-    if (!rop.to_read.count(i->first)) {
-      // We canceled this read! @see filter_read_op
-      dout(20) << __func__ << " to_read skipping" << dendl;
-      continue;
-    }
-    rop.complete[i->first].attrs = map<string, bufferlist>();
-    (*(rop.complete[i->first].attrs)).swap(i->second);
-  }
-  for (auto i = op.errors.begin();
-       i != op.errors.end();
-       ++i) {
-    rop.complete[i->first].errors.insert(
-      make_pair(
-       from,
-       i->second));
-    dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
-  }
-
-  map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
-                                       shard_to_read_map.find(from);
-  assert(siter != shard_to_read_map.end());
-  assert(siter->second.count(op.tid));
-  siter->second.erase(op.tid);
-
-  assert(rop.in_progress.count(from));
-  rop.in_progress.erase(from);
-  unsigned is_complete = 0;
-  // For redundant reads check for completion as each shard comes in,
-  // or in a non-recovery read check for completion once all the shards read.
-  // TODO: It would be nice if recovery could send more reads too
-  if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) {
-    for (map<hobject_t, read_result_t>::const_iterator iter =
-        rop.complete.begin();
-      iter != rop.complete.end();
-      ++iter) {
-      set<int> have;
-      for (map<pg_shard_t, bufferlist>::const_iterator j =
-          iter->second.returned.front().get<2>().begin();
-        j != iter->second.returned.front().get<2>().end();
-        ++j) {
-        have.insert(j->first.shard);
-        dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
-      }
-      set<int> want_to_read, dummy_minimum;
-      get_want_to_read_shards(&want_to_read);
-      int err;
-      if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
-       dout(20) << __func__ << " minimum_to_decode failed" << dendl;
-        if (rop.in_progress.empty()) {
-         // If we don't have enough copies and we haven't sent reads for all shards
-         // we can send the rest of the reads, if any.
-         if (!rop.do_redundant_reads) {
-           int r = send_all_remaining_reads(iter->first, rop);
-           if (r == 0) {
-             // We added to in_progress and not incrementing is_complete
-             continue;
-           }
-           // Couldn't read any additional shards so handle as completed with errors
-         }
-         // We don't want to confuse clients / RBD with objectstore error
-         // values in particular ENOENT.  We may have different error returns
-         // from different shards, so we'll return minimum_to_decode() error
-         // (usually EIO) to reader.  It is likely an error here is due to a
-         // damaged pg.
-         rop.complete[iter->first].r = err;
-         ++is_complete;
-       }
-      } else {
-        assert(rop.complete[iter->first].r == 0);
-       if (!rop.complete[iter->first].errors.empty()) {
-         if (cct->_conf->osd_read_ec_check_for_errors) {
-           dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
-           err = rop.complete[iter->first].errors.begin()->second;
-            rop.complete[iter->first].r = err;
-         } else {
-           get_parent()->clog_warn() << "Error(s) ignored for "
-                                      << iter->first << " enough copies available";
-           dout(10) << __func__ << " Error(s) ignored for " << iter->first
-                    << " enough copies available" << dendl;
-           rop.complete[iter->first].errors.clear();
-         }
-       }
-       ++is_complete;
-      }
-    }
-  }
-  if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
-    dout(20) << __func__ << " Complete: " << rop << dendl;
-    rop.trace.event("ec read complete");
-    complete_read_op(rop, m);
-  } else {
-    dout(10) << __func__ << " readop not complete: " << rop << dendl;
-  }
-}
-
-void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
-{
-  map<hobject_t, read_request_t>::iterator reqiter =
-    rop.to_read.begin();
-  map<hobject_t, read_result_t>::iterator resiter =
-    rop.complete.begin();
-  assert(rop.to_read.size() == rop.complete.size());
-  for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
-    if (reqiter->second.cb) {
-      pair<RecoveryMessages *, read_result_t &> arg(
-       m, resiter->second);
-      reqiter->second.cb->complete(arg);
-      reqiter->second.cb = NULL;
-    }
-  }
-  tid_to_read_map.erase(rop.tid);
-}
-
-struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
-  ECBackend *ec;
-  ceph_tid_t tid;
-  FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
-  void finish(ThreadPool::TPHandle &handle) override {
-    auto ropiter = ec->tid_to_read_map.find(tid);
-    assert(ropiter != ec->tid_to_read_map.end());
-    int priority = ropiter->second.priority;
-    RecoveryMessages rm;
-    ec->complete_read_op(ropiter->second, &rm);
-    ec->dispatch_recovery_messages(rm, priority);
-  }
-};
-
-void ECBackend::filter_read_op(
-  const OSDMapRef& osdmap,
-  ReadOp &op)
-{
-  set<hobject_t> to_cancel;
-  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
-       i != op.source_to_obj.end();
-       ++i) {
-    if (osdmap->is_down(i->first.osd)) {
-      to_cancel.insert(i->second.begin(), i->second.end());
-      op.in_progress.erase(i->first);
-      continue;
-    }
-  }
-
-  if (to_cancel.empty())
-    return;
-
-  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
-       i != op.source_to_obj.end();
-       ) {
-    for (set<hobject_t>::iterator j = i->second.begin();
-        j != i->second.end();
-        ) {
-      if (to_cancel.count(*j))
-       i->second.erase(j++);
-      else
-       ++j;
-    }
-    if (i->second.empty()) {
-      op.source_to_obj.erase(i++);
-    } else {
-      assert(!osdmap->is_down(i->first.osd));
-      ++i;
-    }
-  }
-
-  for (set<hobject_t>::iterator i = to_cancel.begin();
-       i != to_cancel.end();
-       ++i) {
-    get_parent()->cancel_pull(*i);
-
-    assert(op.to_read.count(*i));
-    read_request_t &req = op.to_read.find(*i)->second;
-    dout(10) << __func__ << ": canceling " << req
-            << "  for obj " << *i << dendl;
-    assert(req.cb);
-    delete req.cb;
-    req.cb = NULL;
-
-    op.to_read.erase(*i);
-    op.complete.erase(*i);
-    recovery_ops.erase(*i);
-  }
-
-  if (op.in_progress.empty()) {
-    get_parent()->schedule_recovery_work(
-      get_parent()->bless_gencontext(
-       new FinishReadOp(this, op.tid)));
-  }
-}
-
-void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
-{
-  set<ceph_tid_t> tids_to_filter;
-  for (map<pg_shard_t, set<ceph_tid_t> >::iterator 
-       i = shard_to_read_map.begin();
-       i != shard_to_read_map.end();
-       ) {
-    if (osdmap->is_down(i->first.osd)) {
-      tids_to_filter.insert(i->second.begin(), i->second.end());
-      shard_to_read_map.erase(i++);
-    } else {
-      ++i;
-    }
-  }
-  for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
-       i != tids_to_filter.end();
-       ++i) {
-    map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
-    assert(j != tid_to_read_map.end());
-    filter_read_op(osdmap, j->second);
-  }
-}
-
-void ECBackend::on_change()
-{
-  dout(10) << __func__ << dendl;
-
-  completed_to = eversion_t();
-  committed_to = eversion_t();
-  pipeline_state.clear();
-  waiting_reads.clear();
-  waiting_state.clear();
-  waiting_commit.clear();
-  for (auto &&op: tid_to_op_map) {
-    cache.release_write_pin(op.second.pin);
-  }
-  tid_to_op_map.clear();
-
-  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
-       i != tid_to_read_map.end();
-       ++i) {
-    dout(10) << __func__ << ": cancelling " << i->second << dendl;
-    for (map<hobject_t, read_request_t>::iterator j =
-          i->second.to_read.begin();
-        j != i->second.to_read.end();
-        ++j) {
-      delete j->second.cb;
-      j->second.cb = 0;
-    }
-  }
-  tid_to_read_map.clear();
-  in_progress_client_reads.clear();
-  shard_to_read_map.clear();
-  clear_recovery_state();
-}
-
-void ECBackend::clear_recovery_state()
-{
-  recovery_ops.clear();
-}
-
-void ECBackend::on_flushed()
-{
-}
-
-void ECBackend::dump_recovery_info(Formatter *f) const
-{
-  f->open_array_section("recovery_ops");
-  for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
-       i != recovery_ops.end();
-       ++i) {
-    f->open_object_section("op");
-    i->second.dump(f);
-    f->close_section();
-  }
-  f->close_section();
-  f->open_array_section("read_ops");
-  for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
-       i != tid_to_read_map.end();
-       ++i) {
-    f->open_object_section("read_op");
-    i->second.dump(f);
-    f->close_section();
-  }
-  f->close_section();
-}
-
-void ECBackend::submit_transaction(
-  const hobject_t &hoid,
-  const object_stat_sum_t &delta_stats,
-  const eversion_t &at_version,
-  PGTransactionUPtr &&t,
-  const eversion_t &trim_to,
-  const eversion_t &roll_forward_to,
-  const vector<pg_log_entry_t> &log_entries,
-  boost::optional<pg_hit_set_history_t> &hset_history,
-  Context *on_local_applied_sync,
-  Context *on_all_applied,
-  Context *on_all_commit,
-  ceph_tid_t tid,
-  osd_reqid_t reqid,
-  OpRequestRef client_op
-  )
-{
-  assert(!tid_to_op_map.count(tid));
-  Op *op = &(tid_to_op_map[tid]);
-  op->hoid = hoid;
-  op->delta_stats = delta_stats;
-  op->version = at_version;
-  op->trim_to = trim_to;
-  op->roll_forward_to = MAX(roll_forward_to, committed_to);
-  op->log_entries = log_entries;
-  std::swap(op->updated_hit_set_history, hset_history);
-  op->on_local_applied_sync = on_local_applied_sync;
-  op->on_all_applied = on_all_applied;
-  op->on_all_commit = on_all_commit;
-  op->tid = tid;
-  op->reqid = reqid;
-  op->client_op = client_op;
-  if (client_op)
-    op->trace = client_op->pg_trace;
-  
-  dout(10) << __func__ << ": op " << *op << " starting" << dendl;
-  start_rmw(op, std::move(t));
-  dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
-}
-
-void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
-  if (!waiting_state.empty()) {
-    waiting_state.back().on_write.emplace_back(std::move(cb));
-  } else if (!waiting_reads.empty()) {
-    waiting_reads.back().on_write.emplace_back(std::move(cb));
-  } else {
-    // Nothing earlier in the pipeline, just call it
-    cb();
-  }
-}
-
-int ECBackend::get_min_avail_to_read_shards(
-  const hobject_t &hoid,
-  const set<int> &want,
-  bool for_recovery,
-  bool do_redundant_reads,
-  set<pg_shard_t> *to_read)
-{
-  // Make sure we don't do redundant reads for recovery
-  assert(!for_recovery || !do_redundant_reads);
-
-  set<int> have;
-  map<shard_id_t, pg_shard_t> shards;
-
-  for (set<pg_shard_t>::const_iterator i =
-        get_parent()->get_acting_shards().begin();
-       i != get_parent()->get_acting_shards().end();
-       ++i) {
-    dout(10) << __func__ << ": checking acting " << *i << dendl;
-    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
-    if (!missing.is_missing(hoid)) {
-      assert(!have.count(i->shard));
-      have.insert(i->shard);
-      assert(!shards.count(i->shard));
-      shards.insert(make_pair(i->shard, *i));
-    }
-  }
-
-  if (for_recovery) {
-    for (set<pg_shard_t>::const_iterator i =
-          get_parent()->get_backfill_shards().begin();
-        i != get_parent()->get_backfill_shards().end();
-        ++i) {
-      if (have.count(i->shard)) {
-       assert(shards.count(i->shard));
-       continue;
-      }
-      dout(10) << __func__ << ": checking backfill " << *i << dendl;
-      assert(!shards.count(i->shard));
-      const pg_info_t &info = get_parent()->get_shard_info(*i);
-      const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
-      if (hoid < info.last_backfill &&
-         !missing.is_missing(hoid)) {
-       have.insert(i->shard);
-       shards.insert(make_pair(i->shard, *i));
-      }
-    }
-
-    map<hobject_t, set<pg_shard_t>>::const_iterator miter =
-      get_parent()->get_missing_loc_shards().find(hoid);
-    if (miter != get_parent()->get_missing_loc_shards().end()) {
-      for (set<pg_shard_t>::iterator i = miter->second.begin();
-          i != miter->second.end();
-          ++i) {
-       dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
-       auto m = get_parent()->maybe_get_shard_missing(*i);
-       if (m) {
-         assert(!(*m).is_missing(hoid));
-       }
-       have.insert(i->shard);
-       shards.insert(make_pair(i->shard, *i));
-      }
-    }
-  }
-
-  set<int> need;
-  int r = ec_impl->minimum_to_decode(want, have, &need);
-  if (r < 0)
-    return r;
-
-  if (do_redundant_reads) {
-      need.swap(have);
-  } 
-
-  if (!to_read)
-    return 0;
-
-  for (set<int>::iterator i = need.begin();
-       i != need.end();
-       ++i) {
-    assert(shards.count(shard_id_t(*i)));
-    to_read->insert(shards[shard_id_t(*i)]);
-  }
-  return 0;
-}
-
-int ECBackend::get_remaining_shards(
-  const hobject_t &hoid,
-  const set<int> &avail,
-  set<pg_shard_t> *to_read)
-{
-  set<int> need;
-  map<shard_id_t, pg_shard_t> shards;
-
-  for (set<pg_shard_t>::const_iterator i =
-        get_parent()->get_acting_shards().begin();
-       i != get_parent()->get_acting_shards().end();
-       ++i) {
-    dout(10) << __func__ << ": checking acting " << *i << dendl;
-    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
-    if (!missing.is_missing(hoid)) {
-      assert(!need.count(i->shard));
-      need.insert(i->shard);
-      assert(!shards.count(i->shard));
-      shards.insert(make_pair(i->shard, *i));
-    }
-  }
-
-  if (!to_read)
-    return 0;
-
-  for (set<int>::iterator i = need.begin();
-       i != need.end();
-       ++i) {
-    assert(shards.count(shard_id_t(*i)));
-    if (avail.find(*i) == avail.end())
-      to_read->insert(shards[shard_id_t(*i)]);
-  }
-  return 0;
-}
-
-void ECBackend::start_read_op(
-  int priority,
-  map<hobject_t, read_request_t> &to_read,
-  OpRequestRef _op,
-  bool do_redundant_reads,
-  bool for_recovery)
-{
-  ceph_tid_t tid = get_parent()->get_tid();
-  assert(!tid_to_read_map.count(tid));
-  auto &op = tid_to_read_map.emplace(
-    tid,
-    ReadOp(
-      priority,
-      tid,
-      do_redundant_reads,
-      for_recovery,
-      _op,
-      std::move(to_read))).first->second;
-  dout(10) << __func__ << ": starting " << op << dendl;
-  if (_op) {
-    op.trace = _op->pg_trace;
-    op.trace.event("start ec read");
-  }
-  do_read_op(op);
-}
-
-void ECBackend::do_read_op(ReadOp &op)
-{
-  int priority = op.priority;
-  ceph_tid_t tid = op.tid;
-
-  dout(10) << __func__ << ": starting read " << op << dendl;
-
-  map<pg_shard_t, ECSubRead> messages;
-  for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
-       i != op.to_read.end();
-       ++i) {
-    bool need_attrs = i->second.want_attrs;
-    for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
-        j != i->second.need.end();
-        ++j) {
-      if (need_attrs) {
-       messages[*j].attrs_to_read.insert(i->first);
-       need_attrs = false;
-      }
-      op.obj_to_source[i->first].insert(*j);
-      op.source_to_obj[*j].insert(i->first);
-    }
-    for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
-          i->second.to_read.begin();
-        j != i->second.to_read.end();
-        ++j) {
-      pair<uint64_t, uint64_t> chunk_off_len =
-       sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
-      for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
-          k != i->second.need.end();
-          ++k) {
-       messages[*k].to_read[i->first].push_back(
-         boost::make_tuple(
-           chunk_off_len.first,
-           chunk_off_len.second,
-           j->get<2>()));
-      }
-      assert(!need_attrs);
-    }
-  }
-
-  for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
-       i != messages.end();
-       ++i) {
-    op.in_progress.insert(i->first);
-    shard_to_read_map[i->first].insert(op.tid);
-    i->second.tid = tid;
-    MOSDECSubOpRead *msg = new MOSDECSubOpRead;
-    msg->set_priority(priority);
-    msg->pgid = spg_t(
-      get_parent()->whoami_spg_t().pgid,
-      i->first.shard);
-    msg->map_epoch = get_parent()->get_epoch();
-    msg->min_epoch = get_parent()->get_interval_start_epoch();
-    msg->op = i->second;
-    msg->op.from = get_parent()->whoami_shard();
-    msg->op.tid = tid;
-    if (op.trace) {
-      // initialize a child span for this shard
-      msg->trace.init("ec sub read", nullptr, &op.trace);
-      msg->trace.keyval("shard", i->first.shard.id);
-    }
-    get_parent()->send_message_osd_cluster(
-      i->first.osd,
-      msg,
-      get_parent()->get_epoch());
-  }
-  dout(10) << __func__ << ": started " << op << dendl;
-}
-
-ECUtil::HashInfoRef ECBackend::get_hash_info(
-  const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
-{
-  dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
-  ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
-  if (!ref) {
-    dout(10) << __func__ << ": not in cache " << hoid << dendl;
-    struct stat st;
-    int r = store->stat(
-      ch,
-      ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-      &st);
-    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
-    // XXX: What does it mean if there is no object on disk?
-    if (r >= 0) {
-      dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
-      bufferlist bl;
-      if (attrs) {
-       map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
-       if (k == attrs->end()) {
-         dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
-       } else {
-         bl.push_back(k->second);
-       }
-      } else {
-       r = store->getattr(
-         ch,
-         ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-         ECUtil::get_hinfo_key(),
-         bl);
-       if (r < 0) {
-         dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
-         bl.clear(); // just in case
-       }
-      }
-      if (bl.length() > 0) {
-       bufferlist::iterator bp = bl.begin();
-       ::decode(hinfo, bp);
-       if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
-         dout(0) << __func__ << ": Mismatch of total_chunk_size "
-                              << hinfo.get_total_chunk_size() << dendl;
-         return ECUtil::HashInfoRef();
-       }
-      } else if (st.st_size > 0) { // If empty object and no hinfo, create it
-       return ECUtil::HashInfoRef();
-      }
-    }
-    ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
-  }
-  return ref;
-}
-
-void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
-{
-  assert(op);
-
-  op->plan = ECTransaction::get_write_plan(
-    sinfo,
-    std::move(t),
-    [&](const hobject_t &i) {
-      ECUtil::HashInfoRef ref = get_hash_info(i, false);
-      if (!ref) {
-       derr << __func__ << ": get_hash_info(" << i << ")"
-            << " returned a null pointer and there is no "
-            << " way to recover from such an error in this "
-            << " context" << dendl;
-       ceph_abort();
-      }
-      return ref;
-    },
-    get_parent()->get_dpp());
-
-  dout(10) << __func__ << ": " << *op << dendl;
-
-  waiting_state.push_back(*op);
-  check_ops();
-}
-
-bool ECBackend::try_state_to_reads()
-{
-  if (waiting_state.empty())
-    return false;
-
-  Op *op = &(waiting_state.front());
-  if (op->requires_rmw() && pipeline_state.cache_invalid()) {
-    assert(get_parent()->get_pool().allows_ecoverwrites());
-    dout(20) << __func__ << ": blocking " << *op
-            << " because it requires an rmw and the cache is invalid "
-            << pipeline_state
-            << dendl;
-    return false;
-  }
-
-  if (op->invalidates_cache()) {
-    dout(20) << __func__ << ": invalidating cache after this op"
-            << dendl;
-    pipeline_state.invalidate();
-    op->using_cache = false;
-  } else {
-    op->using_cache = pipeline_state.caching_enabled();
-  }
-
-  waiting_state.pop_front();
-  waiting_reads.push_back(*op);
-
-  if (op->using_cache) {
-    cache.open_write_pin(op->pin);
-
-    extent_set empty;
-    for (auto &&hpair: op->plan.will_write) {
-      auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
-      const extent_set &to_read_plan =
-       to_read_plan_iter == op->plan.to_read.end() ?
-       empty :
-       to_read_plan_iter->second;
-
-      extent_set remote_read = cache.reserve_extents_for_rmw(
-       hpair.first,
-       op->pin,
-       hpair.second,
-       to_read_plan);
-
-      extent_set pending_read = to_read_plan;
-      pending_read.subtract(remote_read);
-
-      if (!remote_read.empty()) {
-       op->remote_read[hpair.first] = std::move(remote_read);
-      }
-      if (!pending_read.empty()) {
-       op->pending_read[hpair.first] = std::move(pending_read);
-      }
-    }
-  } else {
-    op->remote_read = op->plan.to_read;
-  }
-
-  dout(10) << __func__ << ": " << *op << dendl;
-
-  if (!op->remote_read.empty()) {
-    assert(get_parent()->get_pool().allows_ecoverwrites());
-    objects_read_async_no_cache(
-      op->remote_read,
-      [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
-       for (auto &&i: results) {
-         op->remote_read_result.emplace(i.first, i.second.second);
-       }
-       check_ops();
-      });
-  }
-
-  return true;
-}
-
-bool ECBackend::try_reads_to_commit()
-{
-  if (waiting_reads.empty())
-    return false;
-  Op *op = &(waiting_reads.front());
-  if (op->read_in_progress())
-    return false;
-  waiting_reads.pop_front();
-  waiting_commit.push_back(*op);
-
-  dout(10) << __func__ << ": starting commit on " << *op << dendl;
-  dout(20) << __func__ << ": " << cache << dendl;
-
-  get_parent()->apply_stats(
-    op->hoid,
-    op->delta_stats);
-
-  if (op->using_cache) {
-    for (auto &&hpair: op->pending_read) {
-      op->remote_read_result[hpair.first].insert(
-       cache.get_remaining_extents_for_rmw(
-         hpair.first,
-         op->pin,
-         hpair.second));
-    }
-    op->pending_read.clear();
-  } else {
-    assert(op->pending_read.empty());
-  }
-
-  map<shard_id_t, ObjectStore::Transaction> trans;
-  for (set<pg_shard_t>::const_iterator i =
-        get_parent()->get_actingbackfill_shards().begin();
-       i != get_parent()->get_actingbackfill_shards().end();
-       ++i) {
-    trans[i->shard];
-  }
-
-  op->trace.event("start ec write");
-
-  map<hobject_t,extent_map> written;
-  if (op->plan.t) {
-    ECTransaction::generate_transactions(
-      op->plan,
-      ec_impl,
-      get_parent()->get_info().pgid.pgid,
-      (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
-      sinfo,
-      op->remote_read_result,
-      op->log_entries,
-      &written,
-      &trans,
-      &(op->temp_added),
-      &(op->temp_cleared),
-      get_parent()->get_dpp());
-  }
-
-  dout(20) << __func__ << ": " << cache << dendl;
-  dout(20) << __func__ << ": written: " << written << dendl;
-  dout(20) << __func__ << ": op: " << *op << dendl;
-
-  if (!get_parent()->get_pool().allows_ecoverwrites()) {
-    for (auto &&i: op->log_entries) {
-      if (i.requires_kraken()) {
-       derr << __func__ << ": log entry " << i << " requires kraken"
-            << " but overwrites are not enabled!" << dendl;
-       ceph_abort();
-      }
-    }
-  }
-
-  map<hobject_t,extent_set> written_set;
-  for (auto &&i: written) {
-    written_set[i.first] = i.second.get_interval_set();
-  }
-  dout(20) << __func__ << ": written_set: " << written_set << dendl;
-  assert(written_set == op->plan.will_write);
-
-  if (op->using_cache) {
-    for (auto &&hpair: written) {
-      dout(20) << __func__ << ": " << hpair << dendl;
-      cache.present_rmw_update(hpair.first, op->pin, hpair.second);
-    }
-  }
-  op->remote_read.clear();
-  op->remote_read_result.clear();
-
-  dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
-  ObjectStore::Transaction empty;
-  bool should_write_local = false;
-  ECSubWrite local_write_op;
-  for (set<pg_shard_t>::const_iterator i =
-        get_parent()->get_actingbackfill_shards().begin();
-       i != get_parent()->get_actingbackfill_shards().end();
-       ++i) {
-    op->pending_apply.insert(*i);
-    op->pending_commit.insert(*i);
-    map<shard_id_t, ObjectStore::Transaction>::iterator iter =
-      trans.find(i->shard);
-    assert(iter != trans.end());
-    bool should_send = get_parent()->should_send_op(*i, op->hoid);
-    const pg_stat_t &stats =
-      should_send ?
-      get_info().stats :
-      parent->get_shard_info().find(*i)->second.stats;
-
-    ECSubWrite sop(
-      get_parent()->whoami_shard(),
-      op->tid,
-      op->reqid,
-      op->hoid,
-      stats,
-      should_send ? iter->second : empty,
-      op->version,
-      op->trim_to,
-      op->roll_forward_to,
-      op->log_entries,
-      op->updated_hit_set_history,
-      op->temp_added,
-      op->temp_cleared,
-      !should_send);
-
-    ZTracer::Trace trace;
-    if (op->trace) {
-      // initialize a child span for this shard
-      trace.init("ec sub write", nullptr, &op->trace);
-      trace.keyval("shard", i->shard.id);
-    }
-
-    if (*i == get_parent()->whoami_shard()) {
-      should_write_local = true;
-      local_write_op.claim(sop);
-    } else {
-      MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
-      r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
-      r->map_epoch = get_parent()->get_epoch();
-      r->min_epoch = get_parent()->get_interval_start_epoch();
-      r->trace = trace;
-      get_parent()->send_message_osd_cluster(
-       i->osd, r, get_parent()->get_epoch());
-    }
-  }
-  if (should_write_local) {
-      handle_sub_write(
-       get_parent()->whoami_shard(),
-       op->client_op,
-       local_write_op,
-       op->trace,
-       op->on_local_applied_sync);
-      op->on_local_applied_sync = 0;
-  }
-
-  for (auto i = op->on_write.begin();
-       i != op->on_write.end();
-       op->on_write.erase(i++)) {
-    (*i)();
-  }
-
-  return true;
-}
-
-bool ECBackend::try_finish_rmw()
-{
-  if (waiting_commit.empty())
-    return false;
-  Op *op = &(waiting_commit.front());
-  if (op->write_in_progress())
-    return false;
-  waiting_commit.pop_front();
-
-  dout(10) << __func__ << ": " << *op << dendl;
-  dout(20) << __func__ << ": " << cache << dendl;
-
-  if (op->roll_forward_to > completed_to)
-    completed_to = op->roll_forward_to;
-  if (op->version > committed_to)
-    committed_to = op->version;
-
-  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
-    if (op->version > get_parent()->get_log().get_can_rollback_to() &&
-       waiting_reads.empty() &&
-       waiting_commit.empty()) {
-      // submit a dummy transaction to kick the rollforward
-      auto tid = get_parent()->get_tid();
-      Op *nop = &(tid_to_op_map[tid]);
-      nop->hoid = op->hoid;
-      nop->trim_to = op->trim_to;
-      nop->roll_forward_to = op->version;
-      nop->tid = tid;
-      nop->reqid = op->reqid;
-      waiting_reads.push_back(*nop);
-    }
-  }
-
-  if (op->using_cache) {
-    cache.release_write_pin(op->pin);
-  }
-  tid_to_op_map.erase(op->tid);
-
-  if (waiting_reads.empty() &&
-      waiting_commit.empty()) {
-    pipeline_state.clear();
-    dout(20) << __func__ << ": clearing pipeline_state "
-            << pipeline_state
-            << dendl;
-  }
-  return true;
-}
-
-void ECBackend::check_ops()
-{
-  while (try_state_to_reads() ||
-        try_reads_to_commit() ||
-        try_finish_rmw());
-}
-
-int ECBackend::objects_read_sync(
-  const hobject_t &hoid,
-  uint64_t off,
-  uint64_t len,
-  uint32_t op_flags,
-  bufferlist *bl)
-{
-  return -EOPNOTSUPP;
-}
-
-void ECBackend::objects_read_async(
-  const hobject_t &hoid,
-  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-             pair<bufferlist*, Context*> > > &to_read,
-  Context *on_complete,
-  bool fast_read)
-{
-  map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
-    reads;
-
-  uint32_t flags = 0;
-  extent_set es;
-  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-        pair<bufferlist*, Context*> > >::const_iterator i =
-        to_read.begin();
-       i != to_read.end();
-       ++i) {
-    pair<uint64_t, uint64_t> tmp =
-      sinfo.offset_len_to_stripe_bounds(
-       make_pair(i->first.get<0>(), i->first.get<1>()));
-
-    extent_set esnew;
-    esnew.insert(tmp.first, tmp.second);
-    es.union_of(esnew);
-    flags |= i->first.get<2>();
-  }
-
-  if (!es.empty()) {
-    auto &offsets = reads[hoid];
-    for (auto j = es.begin();
-        j != es.end();
-        ++j) {
-      offsets.push_back(
-       boost::make_tuple(
-         j.get_start(),
-         j.get_len(),
-         flags));
-    }
-  }
-
-  struct cb {
-    ECBackend *ec;
-    hobject_t hoid;
-    list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-             pair<bufferlist*, Context*> > > to_read;
-    unique_ptr<Context> on_complete;
-    cb(const cb&) = delete;
-    cb(cb &&) = default;
-    cb(ECBackend *ec,
-       const hobject_t &hoid,
-       const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-                  pair<bufferlist*, Context*> > > &to_read,
-       Context *on_complete)
-      : ec(ec),
-       hoid(hoid),
-       to_read(to_read),
-       on_complete(on_complete) {}
-    void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
-      auto dpp = ec->get_parent()->get_dpp();
-      ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
-                        << dendl;
-      ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
-                        << dendl;
-
-      auto &got = results[hoid];
-
-      int r = 0;
-      for (auto &&read: to_read) {
-       if (got.first < 0) {
-         if (read.second.second) {
-           read.second.second->complete(got.first);
-         }
-         if (r == 0)
-           r = got.first;
-       } else {
-         assert(read.second.first);
-         uint64_t offset = read.first.get<0>();
-         uint64_t length = read.first.get<1>();
-         auto range = got.second.get_containing_range(offset, length);
-         assert(range.first != range.second);
-         assert(range.first.get_off() <= offset);
-         assert(
-           (offset + length) <=
-           (range.first.get_off() + range.first.get_len()));
-         read.second.first->substr_of(
-           range.first.get_val(),
-           offset - range.first.get_off(),
-           length);
-         if (read.second.second) {
-           read.second.second->complete(length);
-           read.second.second = nullptr;
-         }
-       }
-      }
-      to_read.clear();
-      if (on_complete) {
-       on_complete.release()->complete(r);
-      }
-    }
-    ~cb() {
-      for (auto &&i: to_read) {
-       delete i.second.second;
-      }
-      to_read.clear();
-    }
-  };
-  objects_read_and_reconstruct(
-    reads,
-    fast_read,
-    make_gen_lambda_context<
-      map<hobject_t,pair<int, extent_map> > &&, cb>(
-       cb(this,
-          hoid,
-          to_read,
-          on_complete)));
-}
-
-struct CallClientContexts :
-  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
-  hobject_t hoid;
-  ECBackend *ec;
-  ECBackend::ClientAsyncReadStatus *status;
-  list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
-  CallClientContexts(
-    hobject_t hoid,
-    ECBackend *ec,
-    ECBackend::ClientAsyncReadStatus *status,
-    const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
-    : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
-  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
-    ECBackend::read_result_t &res = in.second;
-    extent_map result;
-    if (res.r != 0)
-      goto out;
-    assert(res.returned.size() == to_read.size());
-    assert(res.r == 0);
-    assert(res.errors.empty());
-    for (auto &&read: to_read) {
-      pair<uint64_t, uint64_t> adjusted =
-       ec->sinfo.offset_len_to_stripe_bounds(
-         make_pair(read.get<0>(), read.get<1>()));
-      assert(res.returned.front().get<0>() == adjusted.first &&
-            res.returned.front().get<1>() == adjusted.second);
-      map<int, bufferlist> to_decode;
-      bufferlist bl;
-      for (map<pg_shard_t, bufferlist>::iterator j =
-            res.returned.front().get<2>().begin();
-          j != res.returned.front().get<2>().end();
-          ++j) {
-       to_decode[j->first.shard].claim(j->second);
-      }
-      int r = ECUtil::decode(
-       ec->sinfo,
-       ec->ec_impl,
-       to_decode,
-       &bl);
-      if (r < 0) {
-        res.r = r;
-        goto out;
-      }
-      bufferlist trimmed;
-      trimmed.substr_of(
-       bl,
-       read.get<0>() - adjusted.first,
-       MIN(read.get<1>(),
-           bl.length() - (read.get<0>() - adjusted.first)));
-      result.insert(
-       read.get<0>(), trimmed.length(), std::move(trimmed));
-      res.returned.pop_front();
-    }
-out:
-    status->complete_object(hoid, res.r, std::move(result));
-    ec->kick_reads();
-  }
-};
-
-void ECBackend::objects_read_and_reconstruct(
-  const map<hobject_t,
-    std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
-  > &reads,
-  bool fast_read,
-  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
-{
-  in_progress_client_reads.emplace_back(
-    reads.size(), std::move(func));
-  if (!reads.size()) {
-    kick_reads();
-    return;
-  }
-
-  set<int> want_to_read;
-  get_want_to_read_shards(&want_to_read);
-    
-  map<hobject_t, read_request_t> for_read_op;
-  for (auto &&to_read: reads) {
-    set<pg_shard_t> shards;
-    int r = get_min_avail_to_read_shards(
-      to_read.first,
-      want_to_read,
-      false,
-      fast_read,
-      &shards);
-    assert(r == 0);
-
-    CallClientContexts *c = new CallClientContexts(
-      to_read.first,
-      this,
-      &(in_progress_client_reads.back()),
-      to_read.second);
-    for_read_op.insert(
-      make_pair(
-       to_read.first,
-       read_request_t(
-         to_read.second,
-         shards,
-         false,
-         c)));
-  }
-
-  start_read_op(
-    CEPH_MSG_PRIO_DEFAULT,
-    for_read_op,
-    OpRequestRef(),
-    fast_read, false);
-  return;
-}
-
-
-int ECBackend::send_all_remaining_reads(
-  const hobject_t &hoid,
-  ReadOp &rop)
-{
-  set<int> already_read;
-  const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
-  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
-    already_read.insert(i->shard);
-  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
-  set<pg_shard_t> shards;
-  int r = get_remaining_shards(hoid, already_read, &shards);
-  if (r)
-    return r;
-  if (shards.empty())
-    return -EIO;
-
-  dout(10) << __func__ << " Read remaining shards " << shards << dendl;
-
-  // TODOSAM: this doesn't seem right
-  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
-    rop.to_read.find(hoid)->second.to_read;
-  GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
-    rop.to_read.find(hoid)->second.cb;
-
-  map<hobject_t, read_request_t> for_read_op;
-  for_read_op.insert(
-    make_pair(
-      hoid,
-      read_request_t(
-       offsets,
-       shards,
-       false,
-       c)));
-
-  rop.to_read.swap(for_read_op);
-  do_read_op(rop);
-  return 0;
-}
-
-int ECBackend::objects_get_attrs(
-  const hobject_t &hoid,
-  map<string, bufferlist> *out)
-{
-  int r = store->getattrs(
-    ch,
-    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-    *out);
-  if (r < 0)
-    return r;
-
-  for (map<string, bufferlist>::iterator i = out->begin();
-       i != out->end();
-       ) {
-    if (ECUtil::is_hinfo_key_string(i->first))
-      out->erase(i++);
-    else
-      ++i;
-  }
-  return r;
-}
-
-void ECBackend::rollback_append(
-  const hobject_t &hoid,
-  uint64_t old_size,
-  ObjectStore::Transaction *t)
-{
-  assert(old_size % sinfo.get_stripe_width() == 0);
-  t->truncate(
-    coll,
-    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-    sinfo.aligned_logical_offset_to_chunk_offset(
-      old_size));
-}
-
-void ECBackend::be_deep_scrub(
-  const hobject_t &poid,
-  uint32_t seed,
-  ScrubMap::object &o,
-  ThreadPool::TPHandle &handle) {
-  bufferhash h(-1); // we always used -1
-  int r;
-  uint64_t stride = cct->_conf->osd_deep_scrub_stride;
-  if (stride % sinfo.get_chunk_size())
-    stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
-  uint64_t pos = 0;
-
-  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
-
-  while (true) {
-    bufferlist bl;
-    handle.reset_tp_timeout();
-    r = store->read(
-      ch,
-      ghobject_t(
-       poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-      pos,
-      stride, bl,
-      fadvise_flags);
-    if (r < 0)
-      break;
-    if (bl.length() % sinfo.get_chunk_size()) {
-      r = -EIO;
-      break;
-    }
-    pos += r;
-    h << bl;
-    if ((unsigned)r < stride)
-      break;
-  }
-
-  if (r == -EIO) {
-    dout(0) << "_scan_list  " << poid << " got "
-           << r << " on read, read_error" << dendl;
-    o.read_error = true;
-    return;
-  }
-
-  ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
-  if (!hinfo) {
-    dout(0) << "_scan_list  " << poid << " could not retrieve hash info" << dendl;
-    o.read_error = true;
-    o.digest_present = false;
-    return;
-  } else {
-    if (!get_parent()->get_pool().allows_ecoverwrites()) {
-      assert(hinfo->has_chunk_hash());
-      if (hinfo->get_total_chunk_size() != pos) {
-       dout(0) << "_scan_list  " << poid << " got incorrect size on read" << dendl;
-       o.ec_size_mismatch = true;
-       return;
-      }
-
-      if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
-       dout(0) << "_scan_list  " << poid << " got incorrect hash on read" << dendl;
-       o.ec_hash_mismatch = true;
-       return;
-      }
-
-      /* We checked above that we match our own stored hash.  We cannot
-       * send a hash of the actual object, so instead we simply send
-       * our locally stored hash of shard 0 on the assumption that if
-       * we match our chunk hash and our recollection of the hash for
-       * chunk 0 matches that of our peers, there is likely no corruption.
-       */
-      o.digest = hinfo->get_chunk_hash(0);
-      o.digest_present = true;
-    } else {
-      /* Hack! We must be using partial overwrites, and partial overwrites
-       * don't support deep-scrub yet
-       */
-      o.digest = 0;
-      o.digest_present = true;
-    }
-  }
-
-  o.omap_digest = seed;
-  o.omap_digest_present = true;
-}