initial code repo
[stor4nfv.git] / src / ceph / src / osd / PrimaryLogPG.h
diff --git a/src/ceph/src/osd/PrimaryLogPG.h b/src/ceph/src/osd/PrimaryLogPG.h
new file mode 100644 (file)
index 0000000..df2a45f
--- /dev/null
@@ -0,0 +1,1837 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef CEPH_REPLICATEDPG_H
+#define CEPH_REPLICATEDPG_H
+
+#include <boost/tuple/tuple.hpp>
+#include "include/assert.h" 
+#include "PG.h"
+#include "Watch.h"
+#include "TierAgentState.h"
+#include "messages/MOSDOpReply.h"
+#include "common/Checksummer.h"
+#include "common/sharedptr_registry.hpp"
+#include "ReplicatedBackend.h"
+#include "PGTransaction.h"
+
+class CopyFromCallback;
+class PromoteCallback;
+
+class PrimaryLogPG;
+class PGLSFilter;
+class HitSet;
+struct TierAgentState;
+class MOSDOp;
+class MOSDOpReply;
+class OSDService;
+
+void intrusive_ptr_add_ref(PrimaryLogPG *pg);
+void intrusive_ptr_release(PrimaryLogPG *pg);
+uint64_t get_with_id(PrimaryLogPG *pg);
+void put_with_id(PrimaryLogPG *pg, uint64_t id);
+
+#ifdef PG_DEBUG_REFS
+  typedef TrackedIntPtr<PrimaryLogPG> PrimaryLogPGRef;
+#else
+  typedef boost::intrusive_ptr<PrimaryLogPG> PrimaryLogPGRef;
+#endif
+
+struct inconsistent_snapset_wrapper;
+
+class PrimaryLogPG : public PG, public PGBackend::Listener {
+  friend class OSD;
+  friend class Watch;
+
+public:
+  MEMPOOL_CLASS_HELPERS();
+
+  /*
+   * state associated with a copy operation
+   */
+  struct OpContext;
+  class CopyCallback;
+
+  /**
+   * CopyResults stores the object metadata of interest to a copy initiator.
+   */
+  struct CopyResults {
+    ceph::real_time mtime; ///< the copy source's mtime
+    uint64_t object_size; ///< the copied object's size
+    bool started_temp_obj; ///< true if the callback needs to delete temp object
+    hobject_t temp_oid;    ///< temp object (if any)
+
+    /**
+     * Function to fill in transaction; if non-empty the callback
+     * must execute it before any other accesses to the object
+     * (in order to complete the copy).
+     */
+    std::function<void(PGTransaction *)> fill_in_final_tx;
+
+    version_t user_version; ///< The copy source's user version
+    bool should_requeue;  ///< op should be requeued on cancel
+    vector<snapid_t> snaps;  ///< src's snaps (if clone)
+    snapid_t snap_seq;       ///< src's snap_seq (if head)
+    librados::snap_set_t snapset; ///< src snapset (if head)
+    bool mirror_snapset;
+    bool has_omap;
+    uint32_t flags;    // object_copy_data_t::FLAG_*
+    // digest fields start out as -1 (see ctor below); they are only
+    // meaningful when the corresponding FLAG_*_DIGEST bit is set in 'flags'
+    uint32_t source_data_digest, source_omap_digest;
+    uint32_t data_digest, omap_digest;
+    mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
+    map<string, bufferlist> attrs; // xattrs
+    uint64_t truncate_seq;
+    uint64_t truncate_size;
+    /// whether 'data_digest' is valid, per the source-provided flags
+    bool is_data_digest() {
+      return flags & object_copy_data_t::FLAG_DATA_DIGEST;
+    }
+    /// whether 'omap_digest' is valid, per the source-provided flags
+    bool is_omap_digest() {
+      return flags & object_copy_data_t::FLAG_OMAP_DIGEST;
+    }
+    CopyResults()
+      : object_size(0), started_temp_obj(false),
+       user_version(0),
+       should_requeue(false), mirror_snapset(false),
+       has_omap(false),
+       flags(0),
+       source_data_digest(-1), source_omap_digest(-1),
+       data_digest(-1), omap_digest(-1),
+       truncate_seq(0), truncate_size(0)
+    {}
+  };
+
+  // State for one in-flight copy: 'cb' is fired with CopyCallbackResults
+  // on completion (see CopyCallback below); 'cursor'/'temp_cursor' track
+  // progress through the source object's attrs, omap and data.
+  struct CopyOp {
+    CopyCallback *cb;
+    ObjectContextRef obc;
+    hobject_t src;
+    object_locator_t oloc;
+    unsigned flags;
+    bool mirror_snapset;
+
+    CopyResults results;
+
+    // Objecter tids of outstanding requests (0 when none, per ctor)
+    ceph_tid_t objecter_tid;
+    ceph_tid_t objecter_tid2;
+
+    object_copy_cursor_t cursor;
+    map<string,bufferlist> attrs;
+    bufferlist data;
+    bufferlist omap_header;
+    bufferlist omap_data;
+    int rval;   ///< result of the copy; starts at -1 (not yet complete)
+
+    object_copy_cursor_t temp_cursor;
+
+    /*
+     * For CopyOp the process is:
+     * step1: read the data(attr/omap/data) from the source object
+     * step2: handle those data(w/ those data create a new object)
+     * src_obj_fadvise_flags used in step1;
+     * dest_obj_fadvise_flags used in step2
+     */
+    unsigned src_obj_fadvise_flags;
+    unsigned dest_obj_fadvise_flags;
+
+    CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
+          object_locator_t l,
+           version_t v,
+          unsigned f,
+          bool ms,
+          unsigned src_obj_fadvise_flags,
+          unsigned dest_obj_fadvise_flags)
+      : cb(cb_), obc(_obc), src(s), oloc(l), flags(f),
+       mirror_snapset(ms),
+       objecter_tid(0),
+       objecter_tid2(0),
+       rval(-1),
+       src_obj_fadvise_flags(src_obj_fadvise_flags),
+       dest_obj_fadvise_flags(dest_obj_fadvise_flags)
+    {
+      // seed the results with what the caller already knows
+      results.user_version = v;
+      results.mirror_snapset = mirror_snapset;
+    }
+  };
+  typedef ceph::shared_ptr<CopyOp> CopyOpRef;
+
+  /**
+   * The CopyCallback class defines an interface for completions to the
+   * copy_start code. Users of the copy infrastructure must implement
+   * one and give an instance of the class to start_copy.
+   *
+   * The implementer is responsible for making sure that the CopyCallback
+   * can associate itself with the correct copy operation.
+   */
+  typedef boost::tuple<int, CopyResults*> CopyCallbackResults;
+
+  friend class CopyFromCallback;
+  friend class CopyFromFinisher;
+  friend class PromoteCallback;
+
+  // State for a read forwarded elsewhere on the client's behalf: the
+  // originating op, the forwarded OSDOps, and cancellation state.
+  struct ProxyReadOp {
+    OpRequestRef op;
+    hobject_t soid;
+    ceph_tid_t objecter_tid;   ///< tid of the forwarded request (0 = none)
+    vector<OSDOp> &ops;        ///< reference into the originating op's ops
+    version_t user_version;
+    int data_offset;
+    bool canceled;              ///< true if canceled
+
+    ProxyReadOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops)
+      : op(_op), soid(oid),
+        objecter_tid(0), ops(_ops),
+       user_version(0), data_offset(0),
+       canceled(false) { }
+  };
+  typedef ceph::shared_ptr<ProxyReadOp> ProxyReadOpRef;
+
+  // State for a write forwarded elsewhere on the client's behalf;
+  // 'ctx' and 'sent_reply' manage the eventual client reply.
+  struct ProxyWriteOp {
+    OpContext *ctx;            ///< null until set up (see ctor)
+    OpRequestRef op;
+    hobject_t soid;
+    ceph_tid_t objecter_tid;   ///< tid of the forwarded request (0 = none)
+    vector<OSDOp> &ops;        ///< reference into the originating op's ops
+    version_t user_version;
+    bool sent_reply;
+    utime_t mtime;
+    bool canceled;
+    osd_reqid_t reqid;
+
+    ProxyWriteOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops, osd_reqid_t _reqid)
+      : ctx(NULL), op(_op), soid(oid),
+        objecter_tid(0), ops(_ops),
+       user_version(0), sent_reply(false),
+       canceled(false),
+        reqid(_reqid) { }
+  };
+  typedef ceph::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;
+
+  // State for an in-progress flush of an object via copy-from; ops that
+  // arrive for the same object mid-flush accumulate on dup_ops.
+  struct FlushOp {
+    ObjectContextRef obc;       ///< obc we are flushing
+    OpRequestRef op;            ///< initiating op
+    list<OpRequestRef> dup_ops; ///< bandwagon jumpers
+    version_t flushed_version;  ///< user version we are flushing
+    ceph_tid_t objecter_tid;    ///< copy-from request tid
+    int rval;                   ///< copy-from result
+    bool blocking;              ///< whether we are blocking updates
+    bool removal;               ///< we are removing the backend object
+    boost::optional<std::function<void()>> on_flush; ///< callback, may be null
+
+    FlushOp()
+      : flushed_version(0), objecter_tid(0), rval(0),
+       blocking(false), removal(false) {}
+    // on_flush must have been consumed (fired or cleared) before teardown
+    ~FlushOp() { assert(!on_flush); }
+  };
+  typedef ceph::shared_ptr<FlushOp> FlushOpRef;
+
+  boost::scoped_ptr<PGBackend> pgbackend;  ///< owning; accessor below does not transfer ownership
+  PGBackend *get_pgbackend() override {
+    return pgbackend.get();
+  }
+
+  /// Listener methods
+  DoutPrefixProvider *get_dpp() override {
+    return this;
+  }
+
+  void on_local_recover(
+    const hobject_t &oid,
+    const ObjectRecoveryInfo &recovery_info,
+    ObjectContextRef obc,
+    bool is_delete,
+    ObjectStore::Transaction *t
+    ) override;
+  void on_peer_recover(
+    pg_shard_t peer,
+    const hobject_t &oid,
+    const ObjectRecoveryInfo &recovery_info
+    ) override;
+  void begin_peer_recover(
+    pg_shard_t peer,
+    const hobject_t oid) override;
+  void on_global_recover(
+    const hobject_t &oid,
+    const object_stat_sum_t &stat_diff,
+    bool is_delete) override;
+  void failed_push(const list<pg_shard_t> &from, const hobject_t &soid) override;
+  void primary_failed(const hobject_t &soid) override;
+  bool primary_error(const hobject_t& soid, eversion_t v) override;
+  void cancel_pull(const hobject_t &soid) override;
+  void apply_stats(
+    const hobject_t &soid,
+    const object_stat_sum_t &delta_stats) override;
+  void on_primary_error(const hobject_t &oid, eversion_t v) override;
+  void remove_missing_object(const hobject_t &oid,
+                            eversion_t v,
+                            Context *on_complete) override;
+
+  template<class T> class BlessedGenContext;
+  class BlessedContext;
+  Context *bless_context(Context *c) override;
+
+  GenContext<ThreadPool::TPHandle&> *bless_gencontext(
+    GenContext<ThreadPool::TPHandle&> *c) override;
+    
+  // Messaging/transaction plumbing for the backend: these adapt
+  // PGBackend::Listener calls onto the owning OSDService and ObjectStore.
+  void send_message(int to_osd, Message *m) override {
+    osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
+  }
+  void queue_transaction(ObjectStore::Transaction&& t,
+                        OpRequestRef op) override {
+    osd->store->queue_transaction(osr.get(), std::move(t), 0, 0, 0, op);
+  }
+  void queue_transactions(vector<ObjectStore::Transaction>& tls,
+                         OpRequestRef op) override {
+    osd->store->queue_transactions(osr.get(), tls, 0, 0, 0, op, NULL);
+  }
+  epoch_t get_epoch() const override {
+    return get_osdmap()->get_epoch();
+  }
+  epoch_t get_interval_start_epoch() const override {
+    return info.history.same_interval_since;
+  }
+  epoch_t get_last_peering_reset_epoch() const override {
+    return get_last_peering_reset();
+  }
+  // Shard-set accessors forwarded to PG state.
+  const set<pg_shard_t> &get_actingbackfill_shards() const override {
+    return actingbackfill;
+  }
+  const set<pg_shard_t> &get_acting_shards() const override {
+    return actingset;
+  }
+  const set<pg_shard_t> &get_backfill_shards() const override {
+    return backfill_targets;
+  }
+
+  std::string gen_dbg_prefix() const override { return gen_prefix(); }
+  
+  const map<hobject_t, set<pg_shard_t>>
+    &get_missing_loc_shards() const override {
+    return missing_loc.get_missing_locs();
+  }
+  const map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
+    return peer_missing;
+  }
+  using PGBackend::Listener::get_shard_missing;
+  const map<pg_shard_t, pg_info_t> &get_shard_info() const override {
+    return peer_info;
+  }
+  using PGBackend::Listener::get_shard_info;  
+  const pg_missing_tracker_t &get_local_missing() const override {
+    return pg_log.get_missing();
+  }
+  const PGLog &get_log() const override {
+    return pg_log;
+  }
+  bool pgb_is_primary() const override {
+    return is_primary();
+  }
+  OSDMapRef pgb_get_osdmap() const override {
+    return get_osdmap();
+  }
+  const pg_info_t &get_info() const override {
+    return info;
+  }
+  const pg_pool_t &get_pool() const override {
+    return pool.info;
+  }
+
+  // May create the object context (can_create=true) if it does not exist.
+  ObjectContextRef get_obc(
+    const hobject_t &hoid,
+    const map<string, bufferlist> &attrs) override {
+    return get_object_context(hoid, true, &attrs);
+  }
+
+  // Non-blocking: returns false if the object is missing, has no obc,
+  // or the read lock cannot be acquired immediately.
+  bool try_lock_for_read(
+    const hobject_t &hoid,
+    ObcLockManager &manager) override {
+    if (is_missing_object(hoid))
+      return false;
+    auto obc = get_object_context(hoid, false, nullptr);
+    if (!obc)
+      return false;
+    return manager.try_get_read_lock(hoid, obc);
+  }
+
+  void release_locks(ObcLockManager &manager) override {
+    release_object_locks(manager);
+  }
+
+  void pgb_set_object_snap_mapping(
+    const hobject_t &soid,
+    const set<snapid_t> &snaps,
+    ObjectStore::Transaction *t) override {
+    return update_object_snap_mapping(t, soid, snaps);
+  }
+  void pgb_clear_object_snap_mapping(
+    const hobject_t &soid,
+    ObjectStore::Transaction *t) override {
+    return clear_object_snap_mapping(t, soid);
+  }
+
+  // Append log entries for a transaction; any new hit-set history is
+  // folded into our info before the log is appended.
+  void log_operation(
+    const vector<pg_log_entry_t> &logv,
+    const boost::optional<pg_hit_set_history_t> &hset_history,
+    const eversion_t &trim_to,
+    const eversion_t &roll_forward_to,
+    bool transaction_applied,
+    ObjectStore::Transaction &t) override {
+    if (hset_history) {
+      info.hit_set = *hset_history;
+    }
+    append_log(logv, trim_to, roll_forward_to, t, transaction_applied);
+  }
+
+  struct C_OSD_OnApplied;
+  void op_applied(
+    const eversion_t &applied_version) override;
+
+  // Decide whether a replicated op must be sent to 'peer'.  Ops for
+  // objects past a backfill target's last_backfill (and past
+  // last_backfill_started) are not sent — the assert confirms the only
+  // peers we skip are backfill targets.
+  bool should_send_op(
+    pg_shard_t peer,
+    const hobject_t &hoid) override {
+    if (peer == get_primary())
+      return true;
+    assert(peer_info.count(peer));
+    bool should_send =
+      hoid.pool != (int64_t)info.pgid.pool() ||
+      hoid <= last_backfill_started ||
+      hoid <= peer_info[peer].last_backfill;
+    if (!should_send)
+      assert(is_backfill_targets(peer));
+    return should_send;
+  }
+  
+  void update_peer_last_complete_ondisk(
+    pg_shard_t fromosd,
+    eversion_t lcod) override {
+    peer_last_complete_ondisk[fromosd] = lcod;
+  }
+
+  void update_last_complete_ondisk(
+    eversion_t lcod) override {
+    last_complete_ondisk = lcod;
+  }
+
+  void update_stats(
+    const pg_stat_t &stat) override {
+    info.stats = stat;
+  }
+
+  void schedule_recovery_work(
+    GenContext<ThreadPool::TPHandle&> *c) override;
+
+  pg_shard_t whoami_shard() const override {
+    return pg_whoami;
+  }
+  spg_t primary_spg_t() const override {
+    return spg_t(info.pgid.pgid, primary.shard);
+  }
+  pg_shard_t primary_shard() const override {
+    return primary;
+  }
+  uint64_t min_peer_features() const override {
+    return get_min_peer_features();
+  }
+
+  void send_message_osd_cluster(
+    int peer, Message *m, epoch_t from_epoch) override;
+  void send_message_osd_cluster(
+    Message *m, Connection *con) override;
+  void send_message_osd_cluster(
+    Message *m, const ConnectionRef& con) override;
+  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) override;
+  entity_name_t get_cluster_msgr_name() override {
+    return osd->get_cluster_msgr_name();
+  }
+
+  PerfCounters *get_logger() override;
+
+  ceph_tid_t get_tid() override { return osd->get_tid(); }
+
+  LogClientTemp clog_error() override { return osd->clog->error(); }
+  LogClientTemp clog_warn() override { return osd->clog->warn(); }
+
+  // Parameters describing one watch to tear down on an object.
+  struct watch_disconnect_t {
+    uint64_t cookie;
+    entity_name_t name;
+    bool send_disconnect;
+    watch_disconnect_t(uint64_t c, entity_name_t n, bool sd)
+      : cookie(c), name(n), send_disconnect(sd) {}
+  };
+  void complete_disconnect_watches(
+    ObjectContextRef obc,
+    const list<watch_disconnect_t> &to_disconnect);
+
+  // Interface for a deferred portion of an op's execution; instances are
+  // stored per-subop in OpContext::op_finishers.
+  struct OpFinisher {
+    virtual ~OpFinisher() {
+    }
+
+    virtual int execute() = 0;
+  };
+
+  /*
+   * Capture all object state associated with an in-progress read or write.
+   */
+  struct OpContext {
+    OpRequestRef op;
+    osd_reqid_t reqid;
+    vector<OSDOp> *ops;
+
+    const ObjectState *obs; // Old objectstate
+    const SnapSet *snapset; // Old snapset
+
+    ObjectState new_obs;  // resulting ObjectState
+    SnapSet new_snapset;  // resulting SnapSet (in case of a write)
+    //pg_stat_t new_stats;  // resulting Stats
+    object_stat_sum_t delta_stats;
+
+    bool modify;          // (force) modification (even if op_t is empty)
+    bool user_modify;     // user-visible modification
+    bool undirty;         // user explicitly un-dirtying this object
+    bool cache_evict;     ///< true if this is a cache eviction
+    bool ignore_cache;    ///< true if IGNORE_CACHE flag is set
+    bool ignore_log_op_stats;  // don't log op stats
+    bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
+
+    // side effects
+    list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
+    list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
+    list<notify_info_t> notifies;
+    struct NotifyAck {
+      boost::optional<uint64_t> watch_cookie;
+      uint64_t notify_id;
+      bufferlist reply_bl;
+      explicit NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
+      NotifyAck(uint64_t notify_id, uint64_t cookie, bufferlist& rbl)
+       : watch_cookie(cookie), notify_id(notify_id) {
+       reply_bl.claim(rbl);
+      }
+    };
+    list<NotifyAck> notify_acks;
+
+    uint64_t bytes_written, bytes_read;
+
+    utime_t mtime;
+    SnapContext snapc;           // writer snap context
+    eversion_t at_version;       // pg's current version pointer
+    version_t user_at_version;   // pg's current user version pointer
+
+    int current_osd_subop_num;
+
+    PGTransactionUPtr op_t;
+    vector<pg_log_entry_t> log;
+    boost::optional<pg_hit_set_history_t> updated_hset_history;
+
+    interval_set<uint64_t> modified_ranges;
+    ObjectContextRef obc;
+    ObjectContextRef clone_obc;    // if we created a clone
+    ObjectContextRef snapset_obc;  // if we created/deleted a snapdir
+
+    // FIXME: we may want to kill this msgr hint off at some point!
+    boost::optional<int> data_off = boost::none;
+
+    MOSDOpReply *reply;
+
+    PrimaryLogPG *pg;
+
+    int num_read;    ///< count read ops
+    int num_write;   ///< count update ops
+
+    mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;
+
+    hobject_t new_temp_oid, discard_temp_oid;  ///< temp objects we should start/stop tracking
+
+    list<std::function<void()>> on_applied;
+    list<std::function<void()>> on_committed;
+    list<std::function<void()>> on_finish;
+    list<std::function<void()>> on_success;
+    template <typename F>
+    void register_on_finish(F &&f) {
+      on_finish.emplace_back(std::forward<F>(f));
+    }
+    template <typename F>
+    void register_on_success(F &&f) {
+      on_success.emplace_back(std::forward<F>(f));
+    }
+    template <typename F>
+    void register_on_applied(F &&f) {
+      on_applied.emplace_back(std::forward<F>(f));
+    }
+    template <typename F>
+    void register_on_commit(F &&f) {
+      on_committed.emplace_back(std::forward<F>(f));
+    }
+
+    bool sent_reply;
+
+    // pending async reads <off, len, op_flags> -> <outbl, outr>
+    list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
+             pair<bufferlist*, Context*> > > pending_async_reads;
+    int inflightreads;
+    friend struct OnReadComplete;
+    void start_async_reads(PrimaryLogPG *pg);
+    void finish_read(PrimaryLogPG *pg);
+    bool async_reads_complete() {
+      return inflightreads == 0;
+    }
+
+    ObjectContext::RWState::State lock_type;
+    ObcLockManager lock_manager;
+
+    std::map<int, std::unique_ptr<OpFinisher>> op_finishers;
+
+    OpContext(const OpContext& other);
+    const OpContext& operator=(const OpContext& other);
+
+    OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>* _ops,
+             ObjectContextRef& obc,
+             PrimaryLogPG *_pg) :
+      op(_op), reqid(_reqid), ops(_ops),
+      obs(&obc->obs),
+      snapset(0),
+      new_obs(obs->oi, obs->exists),
+      modify(false), user_modify(false), undirty(false), cache_evict(false),
+      ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
+      bytes_written(0), bytes_read(0), user_at_version(0),
+      current_osd_subop_num(0),
+      obc(obc),
+      reply(NULL), pg(_pg),
+      num_read(0),
+      num_write(0),
+      sent_reply(false),
+      inflightreads(0),
+      lock_type(ObjectContext::RWState::RWNONE) {
+      if (obc->ssc) {
+       new_snapset = obc->ssc->snapset;
+       snapset = &obc->ssc->snapset;
+      }
+    }
+    OpContext(OpRequestRef _op, osd_reqid_t _reqid,
+              vector<OSDOp>* _ops, PrimaryLogPG *_pg) :
+      op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
+      modify(false), user_modify(false), undirty(false), cache_evict(false),
+      ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
+      bytes_written(0), bytes_read(0), user_at_version(0),
+      current_osd_subop_num(0),
+      reply(NULL), pg(_pg),
+      num_read(0),
+      num_write(0),
+      inflightreads(0),
+      lock_type(ObjectContext::RWState::RWNONE) {}
+    void reset_obs(ObjectContextRef obc) {
+      new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
+      if (obc->ssc) {
+       new_snapset = obc->ssc->snapset;
+       snapset = &obc->ssc->snapset;
+      }
+    }
+    ~OpContext() {
+      assert(!op_t);
+      if (reply)
+       reply->put();
+      for (list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
+                    pair<bufferlist*, Context*> > >::iterator i =
+            pending_async_reads.begin();
+          i != pending_async_reads.end();
+          pending_async_reads.erase(i++)) {
+       delete i->second.second;
+      }
+    }
+    uint64_t get_features() {
+      if (op && op->get_req()) {
+        return op->get_req()->get_connection()->get_features();
+      }
+      return -1ull;
+    }
+  };
+  using OpContextUPtr = std::unique_ptr<OpContext>;
+  friend struct OpContext;
+
+  /*
+   * State on the PG primary associated with the replicated mutation
+   */
+  class RepGather {
+  public:
+    hobject_t hoid;
+    OpRequestRef op;
+    xlist<RepGather*>::item queue_item;
+    int nref;   ///< manual refcount; see get()/put()
+
+    eversion_t v;
+    int r = 0;
+
+    ceph_tid_t rep_tid;
+
+    bool rep_aborted, rep_done;
+
+    bool all_applied;
+    bool all_committed;
+    const bool applies_with_commit;
+    
+    utime_t   start;
+    
+    eversion_t          pg_local_last_complete;
+
+    ObcLockManager lock_manager;
+
+    // callbacks, moved here from the originating OpContext (first ctor)
+    list<std::function<void()>> on_applied;
+    list<std::function<void()>> on_committed;
+    list<std::function<void()>> on_success;
+    list<std::function<void()>> on_finish;
+    
+    // Build from an OpContext: steals its lock manager and callback lists.
+    RepGather(
+      OpContext *c, ceph_tid_t rt,
+      eversion_t lc,
+      bool applies_with_commit) :
+      hoid(c->obc->obs.oi.soid),
+      op(c->op),
+      queue_item(this),
+      nref(1),
+      rep_tid(rt), 
+      rep_aborted(false), rep_done(false),
+      all_applied(false), all_committed(false),
+      applies_with_commit(applies_with_commit),
+      pg_local_last_complete(lc),
+      lock_manager(std::move(c->lock_manager)),
+      on_applied(std::move(c->on_applied)),
+      on_committed(std::move(c->on_committed)),
+      on_success(std::move(c->on_success)),
+      on_finish(std::move(c->on_finish)) {}
+
+    // Build without an OpContext (log-entry updates; see
+    // submit_log_entries); any on_complete is queued on on_success.
+    RepGather(
+      ObcLockManager &&manager,
+      OpRequestRef &&o,
+      boost::optional<std::function<void(void)> > &&on_complete,
+      ceph_tid_t rt,
+      eversion_t lc,
+      bool applies_with_commit,
+      int r) :
+      op(o),
+      queue_item(this),
+      nref(1),
+      r(r),
+      rep_tid(rt),
+      rep_aborted(false), rep_done(false),
+      all_applied(false), all_committed(false),
+      applies_with_commit(applies_with_commit),
+      pg_local_last_complete(lc),
+      lock_manager(std::move(manager)) {
+      if (on_complete) {
+       on_success.push_back(std::move(*on_complete));
+      }
+    }
+
+    RepGather *get() {
+      nref++;
+      return this;
+    }
+    // Drop a reference; deletes this on the last put.  All on_applied
+    // callbacks must have been consumed by then (asserted).
+    void put() {
+      assert(nref > 0);
+      if (--nref == 0) {
+       assert(on_applied.empty());
+       delete this;
+       //generic_dout(0) << "deleting " << this << dendl;
+      }
+    }
+  };
+
+
+protected:
+
+  /**
+   * Grabs locks for OpContext, should be cleaned up in close_op_ctx
+   *
+   * @param ctx [in,out] ctx to get locks for
+   * @return true on success, false if we are queued
+   */
+  bool get_rw_locks(bool write_ordered, OpContext *ctx) {
+    /* If snapset_obc, !obc->obs->exists and we will always take the
+     * snapdir lock *before* the head lock.  Since all callers will do
+     * this (read or write) if we get the first we will be guaranteed
+     * to get the second.
+     */
+    // Pick the lock mode: EXCL for write-ordered ops that also read,
+    // WRITE for pure writes, READ otherwise.
+    if (write_ordered && ctx->op->may_read()) {
+      ctx->lock_type = ObjectContext::RWState::RWEXCL;
+    } else if (write_ordered) {
+      ctx->lock_type = ObjectContext::RWState::RWWRITE;
+    } else {
+      assert(ctx->op->may_read());
+      ctx->lock_type = ObjectContext::RWState::RWREAD;
+    }
+
+    // Snapdir lock first, per the ordering invariant above.
+    if (ctx->snapset_obc) {
+      assert(!ctx->obc->obs.exists);
+      if (!ctx->lock_manager.get_lock_type(
+           ctx->lock_type,
+           ctx->snapset_obc->obs.oi.soid,
+           ctx->snapset_obc,
+           ctx->op)) {
+       ctx->lock_type = ObjectContext::RWState::RWNONE;
+       return false;
+      }
+    }
+    if (ctx->lock_manager.get_lock_type(
+         ctx->lock_type,
+         ctx->obc->obs.oi.soid,
+         ctx->obc,
+         ctx->op)) {
+      return true;
+    } else {
+      // head lock failed; by the invariant above we cannot have taken
+      // the snapdir lock in this call
+      assert(!ctx->snapset_obc);
+      ctx->lock_type = ObjectContext::RWState::RWNONE;
+      return false;
+    }
+  }
+
+  /**
+   * Cleans up OpContext
+   *
+   * @param ctx [in] ctx to clean up
+   */
+  void close_op_ctx(OpContext *ctx);
+
+  /**
+   * Releases locks
+   *
+   * @param manager [in] manager with locks to release
+   */
+  void release_object_locks(
+    ObcLockManager &lock_manager) {
+    list<pair<hobject_t, list<OpRequestRef> > > to_req;
+    bool requeue_recovery = false;
+    bool requeue_snaptrim = false;
+    lock_manager.put_locks(
+      &to_req,
+      &requeue_recovery,
+      &requeue_snaptrim);
+    // dropping the locks may unblock recovery or snap trimming
+    if (requeue_recovery)
+      queue_recovery();
+    if (requeue_snaptrim)
+      snap_trimmer_machine.process_event(TrimWriteUnblocked());
+
+    // requeue any ops that were waiting on the released locks
+    if (!to_req.empty()) {
+      // requeue at front of scrub blocking queue if we are blocked by scrub
+      for (auto &&p: to_req) {
+       if (scrubber.write_blocked_by_scrub(p.first.get_head())) {
+         waiting_for_scrub.splice(
+           waiting_for_scrub.begin(),
+           p.second,
+           p.second.begin(),
+           p.second.end());
+       } else {
+         requeue_ops(p.second);
+       }
+      }
+    }
+  }
+
+  // replica ops
+  // [primary|tail]
+  xlist<RepGather*> repop_queue;
+
+  friend class C_OSD_RepopApplied;
+  friend class C_OSD_RepopCommit;
+  void repop_all_applied(RepGather *repop);
+  void repop_all_committed(RepGather *repop);
+  void eval_repop(RepGather*);
+  void issue_repop(RepGather *repop, OpContext *ctx);
+  RepGather *new_repop(
+    OpContext *ctx,
+    ObjectContextRef obc,
+    ceph_tid_t rep_tid);
+  boost::intrusive_ptr<RepGather> new_repop(
+    eversion_t version,
+    int r,
+    ObcLockManager &&manager,
+    OpRequestRef &&op,
+    boost::optional<std::function<void(void)> > &&on_complete);
+  void remove_repop(RepGather *repop);
+
+  OpContextUPtr simple_opc_create(ObjectContextRef obc);
+  void simple_opc_submit(OpContextUPtr ctx);
+
+  /**
+   * Merge entries atomically into all actingbackfill osds
+   * adjusting missing and recovery state as necessary.
+   *
+   * Also used to store error log entries for dup detection.
+   */
+  void submit_log_entries(
+    const mempool::osd_pglog::list<pg_log_entry_t> &entries,
+    ObcLockManager &&manager,
+    boost::optional<std::function<void(void)> > &&on_complete,
+    OpRequestRef op = OpRequestRef(),
+    int r = 0);
+  struct LogUpdateCtx {
+    boost::intrusive_ptr<RepGather> repop;
+    set<pg_shard_t> waiting_on;
+  };
+  void cancel_log_updates();
+  map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
+
+
+  // hot/cold tracking
+  HitSetRef hit_set;        ///< currently accumulating HitSet
+  utime_t hit_set_start_stamp;    ///< time the current HitSet started recording
+
+
+  void hit_set_clear();     ///< discard any HitSet state
+  void hit_set_setup();     ///< initialize HitSet state
+  void hit_set_create();    ///< create a new HitSet
+  void hit_set_persist();   ///< persist hit info
+  bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
+  void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets
+  void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in memory HitSets
+  void hit_set_remove_all();
+
+  hobject_t get_hit_set_current_object(utime_t stamp);
+  hobject_t get_hit_set_archive_object(utime_t start,
+                                      utime_t end,
+                                      bool using_gmt);
+
+  // agent
+  boost::scoped_ptr<TierAgentState> agent_state;
+
+  void agent_setup();       ///< initialize agent state
+  bool agent_work(int max) override ///< entry point to do some agent work
+  {
+    // delegate: use the same budget for total work and the flush quota
+    return agent_work(max, max);
+  }
+  bool agent_work(int max, int agent_flush_quota) override;
+  bool agent_maybe_flush(ObjectContextRef& obc);  ///< maybe flush
+  bool agent_maybe_evict(ObjectContextRef& obc, bool after_flush);  ///< maybe evict
+
+  void agent_load_hit_sets();  ///< load HitSets, if needed
+
+  /// estimate object atime and temperature
+  ///
+  /// @param oid [in] object name
+  /// @param temperature [out] relative temperature (# consider both access time and frequency)
+  void agent_estimate_temp(const hobject_t& oid, int *temperature);
+
+  /// stop the agent
+  void agent_stop() override;
+  void agent_delay() override;
+
+  /// clear agent state
+  void agent_clear() override;
+
+  /// choose (new) agent mode(s), returns true if op is requeued
+  bool agent_choose_mode(bool restart = false, OpRequestRef op = OpRequestRef());
+  void agent_choose_mode_restart() override;
+
+  /// true if we can send an ondisk/commit for v
+  bool already_complete(eversion_t v);
+  /// true if we can send an ack for v
+  bool already_ack(eversion_t v);
+
+  // projected object info
+  SharedLRU<hobject_t, ObjectContext> object_contexts;
+  // map from oid.snapdir() to SnapSetContext *
+  map<hobject_t, SnapSetContext*> snapset_contexts;
+  Mutex snapset_contexts_lock;
+
+  // debug order that client ops are applied
+  map<hobject_t, map<client_t, ceph_tid_t>> debug_op_order;
+
+  void populate_obc_watchers(ObjectContextRef obc);
+  void check_blacklisted_obc_watchers(ObjectContextRef obc);
+  void check_blacklisted_watchers() override;
+  void get_watchers(list<obj_watch_item_t> &pg_watchers) override;
+  void get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers);
+public:
+  void handle_watch_timeout(WatchRef watch);
+protected:
+
+  ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc);
+  ObjectContextRef get_object_context(
+    const hobject_t& soid,
+    bool can_create,
+    const map<string, bufferlist> *attrs = 0
+    );
+
+  void context_registry_on_change();
+  void object_context_destructor_callback(ObjectContext *obc);
+  class C_PG_ObjectContext;
+
+  int find_object_context(const hobject_t& oid,
+                         ObjectContextRef *pobc,
+                         bool can_create,
+                         bool map_snapid_to_clone=false,
+                         hobject_t *missing_oid=NULL);
+
+  void add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *stat);
+
+  void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc);
+
+  SnapSetContext *get_snapset_context(
+    const hobject_t& oid,
+    bool can_create,
+    const map<string, bufferlist> *attrs = 0,
+    bool oid_existed = true //indicate this oid whether exsited in backend
+    );
+  // Locking wrapper around _register_snapset_context().
+  void register_snapset_context(SnapSetContext *ssc) {
+    Mutex::Locker l(snapset_contexts_lock);
+    _register_snapset_context(ssc);
+  }
+  // Caller must hold snapset_contexts_lock (asserted).  Idempotent:
+  // already-registered contexts are left alone.
+  void _register_snapset_context(SnapSetContext *ssc) {
+    assert(snapset_contexts_lock.is_locked());
+    if (!ssc->registered) {
+      assert(snapset_contexts.count(ssc->oid) == 0);
+      ssc->registered = true;
+      snapset_contexts[ssc->oid] = ssc;
+    }
+  }
+  void put_snapset_context(SnapSetContext *ssc);
+
+  /// objects with recovery in progress, kept alive via their ObjectContext
+  map<hobject_t, ObjectContextRef> recovering;
+
+  /*
+   * Backfill
+   *
+   * peer_info[backfill_target].last_backfill == info.last_backfill on the peer.
+   *
+   * objects prior to peer_info[backfill_target].last_backfill
+   *   - are on the peer
+   *   - are included in the peer stats
+   *
+   * objects \in (last_backfill, last_backfill_started]
+   *   - are on the peer or are in backfills_in_flight
+   *   - are not included in pg stats (yet)
+   *   - have their stats in pending_backfill_updates on the primary
+   */
+  set<hobject_t> backfills_in_flight;
+  map<hobject_t, pg_stat_t> pending_backfill_updates;
+
+  void dump_recovery_info(Formatter *f) const override {
+    f->open_array_section("backfill_targets");
+    for (set<pg_shard_t>::const_iterator p = backfill_targets.begin();
+        p != backfill_targets.end(); ++p)
+      f->dump_stream("replica") << *p;
+    f->close_section();
+    f->open_array_section("waiting_on_backfill");
+    for (set<pg_shard_t>::const_iterator p = waiting_on_backfill.begin();
+        p != waiting_on_backfill.end(); ++p)
+      f->dump_stream("osd") << *p;
+    f->close_section();
+    f->dump_stream("last_backfill_started") << last_backfill_started;
+    {
+      f->open_object_section("backfill_info");
+      backfill_info.dump(f);
+      f->close_section();
+    }
+    {
+      f->open_array_section("peer_backfill_info");
+      for (map<pg_shard_t, BackfillInterval>::const_iterator pbi =
+            peer_backfill_info.begin();
+          pbi != peer_backfill_info.end(); ++pbi) {
+        f->dump_stream("osd") << pbi->first;
+        f->open_object_section("BackfillInterval");
+          pbi->second.dump(f);
+        f->close_section();
+      }
+      f->close_section();
+    }
+    {
+      f->open_array_section("backfills_in_flight");
+      for (set<hobject_t>::const_iterator i = backfills_in_flight.begin();
+          i != backfills_in_flight.end();
+          ++i) {
+       f->dump_stream("object") << *i;
+      }
+      f->close_section();
+    }
+    {
+      f->open_array_section("recovering");
+      for (map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
+          i != recovering.end();
+          ++i) {
+       f->dump_stream("object") << i->first;
+      }
+      f->close_section();
+    }
+    {
+      f->open_object_section("pg_backend");
+      pgbackend->dump_recovery_info(f);
+      f->close_section();
+    }
+  }
+
+  /// last backfill operation started
+  hobject_t last_backfill_started;
+  bool new_backfill;  ///< NOTE(review): presumably marks a freshly (re)started backfill — confirm in .cc
+
+  int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
+                                PGBackend::RecoveryHandle *h);
+  int prep_object_replica_deletes(const hobject_t& soid, eversion_t v,
+                                 PGBackend::RecoveryHandle *h);
+
+  void finish_degraded_object(const hobject_t& oid);
+
+  // Cancels/resets pulls from peer
+  void check_recovery_sources(const OSDMapRef& map) override ;
+
+  int recover_missing(
+    const hobject_t& oid,
+    eversion_t v,
+    int priority,
+    PGBackend::RecoveryHandle *h);
+
+  // low level ops
+
+  /// low-level helper to create clone coid of head; fills *poi (impl in .cc)
+  void _make_clone(
+    OpContext *ctx,
+    PGTransaction* t,
+    ObjectContextRef obc,
+    const hobject_t& head, const hobject_t& coid,
+    object_info_t *poi);
+  void execute_ctx(OpContext *ctx);
+  void finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc=true);
+  void reply_ctx(OpContext *ctx, int err);
+  void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv);
+  void make_writeable(OpContext *ctx);
+  void log_op_stats(OpContext *ctx);
+
+  /// update size/usage stats for a write of [offset, offset+length)
+  void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
+                                  interval_set<uint64_t>& modified, uint64_t offset,
+                                  uint64_t length, bool write_full=false);
+  void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);
+
+
+  /// outcome of tier/cache handling for an incoming op
+  enum class cache_result_t {
+    NOOP,              ///< cache layer took no action
+    BLOCKED_FULL,      ///< blocked: cache full
+    BLOCKED_PROMOTE,   ///< blocked behind a promote
+    HANDLED_PROXY,     ///< op proxied
+    HANDLED_REDIRECT,  ///< client redirected
+  };
+  cache_result_t maybe_handle_cache_detail(OpRequestRef op,
+                                          bool write_ordered,
+                                          ObjectContextRef obc, int r,
+                                          hobject_t missing_oid,
+                                          bool must_promote,
+                                          bool in_hit_set,
+                                          ObjectContextRef *promote_obc);
+  cache_result_t maybe_handle_manifest_detail(OpRequestRef op,
+                                                    bool write_ordered,
+                                                    ObjectContextRef obc);
+  /// Returns true iff manifest handling consumed the op (detail != NOOP).
+  bool maybe_handle_manifest(OpRequestRef op,
+                             bool write_ordered,
+                             ObjectContextRef obc) {
+    return maybe_handle_manifest_detail(op, write_ordered, obc) !=
+      cache_result_t::NOOP;
+  }
+
+  /**
+   * This helper function is called from do_op if the ObjectContext lookup fails.
+   * @returns true if the caching code is handling the Op, false otherwise.
+   */
+  bool maybe_handle_cache(OpRequestRef op,
+                         bool write_ordered,
+                         ObjectContextRef obc, int r,
+                         const hobject_t& missing_oid,
+                         bool must_promote,
+                         bool in_hit_set = false) {
+    return cache_result_t::NOOP != maybe_handle_cache_detail(
+      op,
+      write_ordered,
+      obc,
+      r,
+      missing_oid,
+      must_promote,
+      in_hit_set,
+      nullptr);
+  }
+
+  /**
+   * This helper function checks if a promotion is needed.
+   */
+  bool maybe_promote(ObjectContextRef obc,
+                    const hobject_t& missing_oid,
+                    const object_locator_t& oloc,
+                    bool in_hit_set,
+                    uint32_t recency,
+                    OpRequestRef promote_op,
+                    ObjectContextRef *promote_obc = nullptr);
+  /**
+   * This helper function tells the client to redirect their request elsewhere.
+   */
+  void do_cache_redirect(OpRequestRef op);
+  /**
+   * This function attempts to start a promote.  Either it succeeds,
+   * or places op on a wait list.  If op is null, failure means that
+   * this is a noop.  If a future user wants to be able to distinguish
+   * these cases, a return value should be added.
+   */
+  void promote_object(
+    ObjectContextRef obc,            ///< [optional] obc
+    const hobject_t& missing_object, ///< oid (if !obc)
+    const object_locator_t& oloc,    ///< locator for obc|oid
+    OpRequestRef op,                 ///< [optional] client op
+    ObjectContextRef *promote_obc = nullptr ///< [optional] new obc for object
+    );
+
+  int prepare_transaction(OpContext *ctx);
+  /// client ops whose async reads are still outstanding
+  list<pair<OpRequestRef, OpContext*> > in_progress_async_reads;
+  void complete_read_ctx(int result, OpContext *ctx);
+  
+  // pg on-disk content
+  void check_local() override;
+
+  void _clear_recovery_state() override;
+
+  bool start_recovery_ops(
+    uint64_t max,
+    ThreadPool::TPHandle &handle, uint64_t *started) override;
+
+  uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
+  uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle);
+  hobject_t earliest_peer_backfill() const;
+  bool all_peer_done() const;
+  /**
+   * @param work_started will be set to true if recover_backfill got anywhere
+   * @returns the number of operations started
+   */
+  uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
+                           bool *work_started);
+
+  /**
+   * scan a (hash) range of objects in the current pg
+   *
+   * @begin first item should be >= this value
+   * @min return at least this many items, unless we are done
+   * @max return no more than this many items
+   * @bi [out] resulting map of objects to eversion_t's
+   */
+  void scan_range(
+    int min, int max, BackfillInterval *bi,
+    ThreadPool::TPHandle &handle
+    );
+
+  /// Update a hash range to reflect changes since the last scan
+  void update_range(
+    BackfillInterval *bi,        ///< [in,out] interval to update
+    ThreadPool::TPHandle &handle ///< [in] tp handle
+    );
+
+  int prep_backfill_object_push(
+    hobject_t oid, eversion_t v, ObjectContextRef obc,
+    vector<pg_shard_t> peers,
+    PGBackend::RecoveryHandle *h);
+  void send_remove_op(const hobject_t& oid, eversion_t v, pg_shard_t peer);
+
+
+  // completion callbacks for recovery/push stages (defined in the .cc)
+  class C_OSD_OndiskWriteUnlock;
+  class C_OSD_AppliedRecoveredObject;
+  class C_OSD_CommittedPushedObject;
+  class C_OSD_AppliedRecoveredObjectReplica;
+  void sub_op_remove(OpRequestRef op);
+
+  void _applied_recovered_object(ObjectContextRef obc);
+  void _applied_recovered_object_replica();
+  void _committed_pushed_object(epoch_t epoch, eversion_t lc);
+  void recover_got(hobject_t oid, eversion_t v);
+
+  // -- copyfrom --
+  map<hobject_t, CopyOpRef> copy_ops;  ///< in-flight copy ops, by oid
+
+  int do_copy_get(OpContext *ctx, bufferlist::iterator& bp, OSDOp& op,
+                 ObjectContextRef& obc);
+  int finish_copy_get();
+
+  /// fill in a COPY_GET reply for a non-existent source object
+  void fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
+                              OSDOp& osd_op);
+
+  /**
+   * To copy an object, call start_copy.
+   *
+   * @param cb: The CopyCallback to be activated when the copy is complete
+   * @param obc: The ObjectContext we are copying into
+   * @param src: The source object
+   * @param oloc: the source object locator
+   * @param version: the version of the source object to copy (0 for any)
+   */
+  void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
+                 object_locator_t oloc, version_t version, unsigned flags,
+                 bool mirror_snapset, unsigned src_obj_fadvise_flags,
+                 unsigned dest_obj_fadvise_flags);
+  void process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r);
+  void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
+  /// Chunk size for copy ops: osd_copyfrom_max_chunk, rounded up to the
+  /// pool's required alignment when the pool needs aligned appends.
+  uint64_t get_copy_chunk_size() const {
+    uint64_t chunk = cct->_conf->osd_copyfrom_max_chunk;
+    if (pool.info.requires_aligned_append()) {
+      const uint64_t align = pool.info.required_alignment();
+      const uint64_t rem = chunk % align;
+      if (rem)
+       chunk += align - rem;
+    }
+    return chunk;
+  }
+  void _copy_some(ObjectContextRef obc, CopyOpRef cop);
+  void finish_copyfrom(CopyFromCallback *cb);
+  void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
+  void cancel_copy(CopyOpRef cop, bool requeue);
+  void cancel_copy_ops(bool requeue);  ///< cancel all in-flight copies
+
+  friend struct C_Copyfrom;
+
+  // -- flush --
+  map<hobject_t, FlushOpRef> flush_ops;  ///< in-flight flush ops, by oid
+
+  /// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
+  int start_flush(
+    OpRequestRef op, ObjectContextRef obc,
+    bool blocking, hobject_t *pmissing,
+    boost::optional<std::function<void()>> &&on_flush);
+  void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
+  int try_flush_mark_clean(FlushOpRef fop);
+  void cancel_flush(FlushOpRef fop, bool requeue);
+  void cancel_flush_ops(bool requeue);
+
+  /// @return false if the clone has been evicted
+  bool is_present_clone(hobject_t coid);
+
+  friend struct C_Flush;
+
+  // -- scrub --
+  bool _range_available_for_scrub(
+    const hobject_t &begin, const hobject_t &end) override;
+  void scrub_snapshot_metadata(
+    ScrubMap &map,
+    const std::map<hobject_t, pair<uint32_t, uint32_t>> &missing_digest) override;
+  void _scrub_clear_state() override;
+  void _scrub_finish() override;
+  object_stat_collection_t scrub_cstat;  ///< stats accumulated during scrub
+
+  void _split_into(pg_t child_pgid, PG *child,
+                   unsigned split_bits) override;
+  void apply_and_flush_repops(bool requeue);
+
+  void calc_trim_to() override;
+  // xattr comparison helpers for CMPXATTR ops
+  int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
+  int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);
+
+  // -- checksum --
+  int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::iterator *bl_it);
+  int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
+                      bufferlist::iterator *init_value_bl_it,
+                      const bufferlist &read_bl);
+
+  friend class C_ChecksumRead;
+
+  int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
+  int finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl);
+
+  friend class C_ExtentCmpRead;
+
+  int do_read(OpContext *ctx, OSDOp& osd_op);
+  int do_sparse_read(OpContext *ctx, OSDOp& osd_op);
+  int do_writesame(OpContext *ctx, OSDOp& osd_op);
+
+  bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
+  int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
+
+  /// client ops queued behind an in-flight proxied op, by oid
+  map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
+  void kick_proxy_ops_blocked(hobject_t& soid);
+  void cancel_proxy_ops(bool requeue);
+
+  // -- proxyread --
+  map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
+
+  void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
+  void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
+  void cancel_proxy_read(ProxyReadOpRef prdop);
+
+  friend struct C_ProxyRead;
+
+  // -- proxywrite --
+  map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
+
+  void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL);
+  void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
+  void cancel_proxy_write(ProxyWriteOpRef pwop);
+
+  friend struct C_ProxyWrite_Commit;
+public:
+  PrimaryLogPG(OSDService *o, OSDMapRef curmap,
+              const PGPool &_pool, spg_t p);
+  ~PrimaryLogPG() override {}
+
+  /// handle an admin/tell command directed at this pg
+  int do_command(
+    cmdmap_t cmdmap,
+    ostream& ss,
+    bufferlist& idata,
+    bufferlist& odata,
+    ConnectionRef conn,
+    ceph_tid_t tid) override;
+
+  // -- request entry points --
+  void do_request(
+    OpRequestRef& op,
+    ThreadPool::TPHandle &handle) override;
+  void do_op(OpRequestRef& op) override;
+  /// record a failed write so its error reply can be delivered (see .cc)
+  void record_write_error(OpRequestRef op, const hobject_t &soid,
+                         MOSDOpReply *orig_reply, int r);
+  void do_pg_op(OpRequestRef op);
+  void do_sub_op(OpRequestRef op) override;
+  void do_sub_op_reply(OpRequestRef op) override;
+  void do_scan(
+    OpRequestRef op,
+    ThreadPool::TPHandle &handle) override;
+  void do_backfill(OpRequestRef op) override;
+  void do_backfill_remove(OpRequestRef op);
+
+  void handle_backoff(OpRequestRef& op);
+
+  // -- snap trimming entry points --
+  int trim_object(bool first, const hobject_t &coid, OpContextUPtr *ctxp);
+  void snap_trimmer(epoch_t e) override;
+  void kick_snap_trim() override;
+  void snap_trimmer_scrub_complete() override;
+  int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
+
+  // tmap (legacy) handling
+  int _get_tmap(OpContext *ctx, bufferlist *header, bufferlist *vals);
+  int do_tmap2omap(OpContext *ctx, unsigned flags);
+  int do_tmapup(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op);
+  int do_tmapup_slow(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, bufferlist& bl);
+
+  void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
+private:
+  int do_scrub_ls(MOSDOp *op, OSDOp *osd_op);
+  hobject_t earliest_backfill() const;
+  bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
+
+  uint64_t temp_seq; ///< last id for naming temp objects
+  /// generate a new temp object name
+  hobject_t generate_temp_object(const hobject_t& target);
+  /// generate a new temp object name (for recovery)
+  hobject_t get_temp_recovery_object(const hobject_t& target,
+                                    eversion_t version) override;
+  /// Recovery op priority: the per-pool override when set (> 0), otherwise
+  /// the global osd_recovery_op_priority config value.
+  int get_recovery_op_priority() const {
+    int pool_pri = 0;
+    pool.info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pool_pri);
+    if (pool_pri > 0)
+      return pool_pri;
+    return cct->_conf->osd_recovery_op_priority;
+  }
+  /// scrub helper: report missing clone/head details to the cluster log
+  void log_missing(unsigned missing,
+                       const boost::optional<hobject_t> &head,
+                       LogChannelRef clog,
+                       const spg_t &pgid,
+                       const char *func,
+                       const char *mode,
+                       bool allow_incomplete_clones);
+  /// scrub helper: walk expected clones up to target, recording snapset
+  /// inconsistencies in snap_error (see scrub_snapshot_metadata in the .cc)
+  unsigned process_clones_to(const boost::optional<hobject_t> &head,
+    const boost::optional<SnapSet> &snapset,
+    LogChannelRef clog,
+    const spg_t &pgid,
+    const char *mode,
+    bool allow_incomplete_clones,
+    boost::optional<snapid_t> target,
+    vector<snapid_t>::reverse_iterator *curclone,
+    inconsistent_snapset_wrapper &snap_error);
+public:
+  coll_t get_coll() {
+    return coll;
+  }
+  /// Split store collections for PG split: create the child collection,
+  /// move the objects selected by split_bits/seed into it, then initialize
+  /// the child's pg metadata — all queued on transaction t, in that order.
+  void split_colls(
+    spg_t child,
+    int split_bits,
+    int seed,
+    const pg_pool_t *pool,
+    ObjectStore::Transaction *t) override {
+    coll_t target = coll_t(child);
+    PG::_create(*t, child, split_bits);
+    t->split_collection(
+      coll,
+      split_bits,
+      seed,
+      target);
+    PG::_init(*t, child, pool);
+  }
+private:
+
+  // Events consumed by the snap-trimmer state machine below.
+  struct DoSnapWork : boost::statechart::event< DoSnapWork > {
+    DoSnapWork() : boost::statechart::event < DoSnapWork >() {}
+  };
+  struct KickTrim : boost::statechart::event< KickTrim > {
+    KickTrim() : boost::statechart::event < KickTrim >() {}
+  };
+  struct RepopsComplete : boost::statechart::event< RepopsComplete > {
+    RepopsComplete() : boost::statechart::event < RepopsComplete >() {}
+  };
+  struct ScrubComplete : boost::statechart::event< ScrubComplete > {
+    ScrubComplete() : boost::statechart::event < ScrubComplete >() {}
+  };
+  struct TrimWriteUnblocked : boost::statechart::event< TrimWriteUnblocked > {
+    TrimWriteUnblocked() : boost::statechart::event < TrimWriteUnblocked >() {}
+  };
+  struct Reset : boost::statechart::event< Reset > {
+    Reset() : boost::statechart::event< Reset >() {}
+  };
+  struct SnapTrimReserved : boost::statechart::event< SnapTrimReserved > {
+    SnapTrimReserved() : boost::statechart::event< SnapTrimReserved >() {}
+  };
+  struct SnapTrimTimerReady : boost::statechart::event< SnapTrimTimerReady > {
+    SnapTrimTimerReady() : boost::statechart::event< SnapTrimTimerReady >() {}
+  };
+
+  struct NotTrimming;
+  /// state machine coordinating background snapshot trimming for this pg
+  struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > {
+    PrimaryLogPG *pg;
+    explicit SnapTrimmer(PrimaryLogPG *pg) : pg(pg) {}
+    void log_enter(const char *state_name);
+    void log_exit(const char *state_name, utime_t duration);
+    /// trim only while the pg is clean, no scrub is active, and snaps are queued
+    bool can_trim() {
+      return pg->is_clean() && !pg->scrubber.active && !pg->snap_trimq.empty();
+    }
+  } snap_trimmer_machine;
+
+  struct WaitReservation;
+  /// Parent state for all actively-trimming substates.  Leaving Trimming
+  /// (for any reason) cancels the snap_reserver reservation and clears the
+  /// SNAPTRIM pg state flag.
+  struct Trimming : boost::statechart::state< Trimming, SnapTrimmer, WaitReservation >, NamedState {
+    typedef boost::mpl::list <
+      boost::statechart::custom_reaction< KickTrim >,
+      boost::statechart::transition< Reset, NotTrimming >
+      > reactions;
+
+    set<hobject_t> in_flight;   ///< objects with trimming work in flight
+    snapid_t snap_to_trim;      ///< snap currently being trimmed
+
+    explicit Trimming(my_context ctx)
+      : my_base(ctx),
+       NamedState(context< SnapTrimmer >().pg, "Trimming") {
+      context< SnapTrimmer >().log_enter(state_name);
+      assert(context< SnapTrimmer >().can_trim());
+      assert(in_flight.empty());
+    }
+    void exit() {
+      context< SnapTrimmer >().log_exit(state_name, enter_time);
+      auto *pg = context< SnapTrimmer >().pg;
+      pg->osd->snap_reserver.cancel_reservation(pg->get_pgid());
+      pg->state_clear(PG_STATE_SNAPTRIM);
+      pg->publish_stats_to_osd();
+    }
+    // already trimming: extra kicks are no-ops
+    boost::statechart::result react(const KickTrim&) {
+      return discard_event();
+    }
+  };
+
+  /* SnapTrimmerStates */
+  /// Trimming substate: optionally sleep osd_snap_trim_sleep seconds between
+  /// batches; SnapTrimTimerReady resumes (or aborts if trimming is no longer
+  /// allowed).
+  struct WaitTrimTimer : boost::statechart::state< WaitTrimTimer, Trimming >, NamedState {
+    typedef boost::mpl::list <
+      boost::statechart::custom_reaction< SnapTrimTimerReady >
+      > reactions;
+    Context *wakeup = nullptr;  ///< pending timer event, if one was scheduled
+    explicit WaitTrimTimer(my_context ctx)
+      : my_base(ctx),
+       NamedState(context< SnapTrimmer >().pg, "Trimming/WaitTrimTimer") {
+      context< SnapTrimmer >().log_enter(state_name);
+      assert(context<Trimming>().in_flight.empty());
+      struct OnTimer : Context {
+       PrimaryLogPGRef pg;
+       epoch_t epoch;
+       OnTimer(PrimaryLogPGRef pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
+       void finish(int) override {
+         pg->lock();
+         // ignore the wakeup if the pg was reset while we slept
+         if (!pg->pg_has_reset_since(epoch))
+           pg->snap_trimmer_machine.process_event(SnapTrimTimerReady());
+         pg->unlock();
+       }
+      };
+      auto *pg = context< SnapTrimmer >().pg;
+      if (pg->cct->_conf->osd_snap_trim_sleep > 0) {
+       Mutex::Locker l(pg->osd->snap_sleep_lock);
+       wakeup = pg->osd->snap_sleep_timer.add_event_after(
+         pg->cct->_conf->osd_snap_trim_sleep,
+         new OnTimer{pg, pg->get_osdmap()->get_epoch()});
+      } else {
+       // no sleep configured: continue immediately
+       post_event(SnapTrimTimerReady());
+      }
+    }
+    void exit() {
+      context< SnapTrimmer >().log_exit(state_name, enter_time);
+      auto *pg = context< SnapTrimmer >().pg;
+      if (wakeup) {
+       // cancel a still-pending timer so it cannot fire after we leave
+       Mutex::Locker l(pg->osd->snap_sleep_lock);
+       pg->osd->snap_sleep_timer.cancel_event(wakeup);
+       wakeup = nullptr;
+      }
+    }
+    boost::statechart::result react(const SnapTrimTimerReady &) {
+      wakeup = nullptr;
+      if (!context< SnapTrimmer >().can_trim()) {
+       post_event(KickTrim());
+       return transit< NotTrimming >();
+      } else {
+       return transit< AwaitAsyncWork >();
+      }
+    }
+  };
+
+  struct WaitRWLock : boost::statechart::state< WaitRWLock, Trimming >, NamedState {
+    typedef boost::mpl::list <
+      boost::statechart::custom_reaction< TrimWriteUnblocked >
+      > reactions;
+    explicit WaitRWLock(my_context ctx)
+      : my_base(ctx),
+       NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRWLock") {
+      context< SnapTrimmer >().log_enter(state_name);
+      assert(context<Trimming>().in_flight.empty());
+    }
+    void exit() {
+      context< SnapTrimmer >().log_exit(state_name, enter_time);
+    }
+    boost::statechart::result react(const TrimWriteUnblocked&) {
+      if (!context< SnapTrimmer >().can_trim()) {
+       post_event(KickTrim());
+       return transit< NotTrimming >();
+      } else {
+       return transit< AwaitAsyncWork >();
+      }
+    }
+  };
+
+  struct WaitRepops : boost::statechart::state< WaitRepops, Trimming >, NamedState {
+    typedef boost::mpl::list <
+      boost::statechart::custom_reaction< RepopsComplete >
+      > reactions;
+    explicit WaitRepops(my_context ctx)
+      : my_base(ctx),
+       NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRepops") {
+      context< SnapTrimmer >().log_enter(state_name);
+      assert(!context<Trimming>().in_flight.empty());
+    }
+    void exit() {
+      context< SnapTrimmer >().log_exit(state_name, enter_time);
+    }
+    boost::statechart::result react(const RepopsComplete&) {
+      if (!context< SnapTrimmer >().can_trim()) {
+       post_event(KickTrim());
+       return transit< NotTrimming >();
+      } else {
+       return transit< WaitTrimTimer >();
+      }
+    }
+  };
+
+  /// Trimming substate: reacts to DoSnapWork to perform the actual trim
+  /// work; constructor and reaction are defined out of line (in the .cc).
+  struct AwaitAsyncWork : boost::statechart::state< AwaitAsyncWork, Trimming >, NamedState {
+    typedef boost::mpl::list <
+      boost::statechart::custom_reaction< DoSnapWork >
+      > reactions;
+    explicit AwaitAsyncWork(my_context ctx);
+    void exit() {
+      context< SnapTrimmer >().log_exit(state_name, enter_time);
+    }
+    boost::statechart::result react(const DoSnapWork&);
+  };
+
+  struct WaitReservation : boost::statechart::state< WaitReservation, Trimming >, NamedState {
+    /* WaitReservation is a sub-state of trimming simply so that exiting Trimming
+     * always cancels the reservation */
+    typedef boost::mpl::list <
+      boost::statechart::custom_reaction< SnapTrimReserved >
+      > reactions;
+    /// callback run by snap_reserver once our reservation is granted
+    struct ReservationCB : public Context {
+      PrimaryLogPGRef pg;
+      bool canceled;
+      ReservationCB(PrimaryLogPG *pg) : pg(pg), canceled(false) {}
+      void finish(int) override {
+       pg->lock();
+       // deliver the event only if the state didn't cancel us first
+       if (!canceled)
+         pg->snap_trimmer_machine.process_event(SnapTrimReserved());
+       pg->unlock();
+      }
+      void cancel() {
+       assert(pg->is_locked());
+       assert(!canceled);
+       canceled = true;
+      }
+    };
+    ReservationCB *pending = nullptr;  ///< reservation callback; canceled on exit if still set
+
+    explicit WaitReservation(my_context ctx)
+      : my_base(ctx),
+       NamedState(context< SnapTrimmer >().pg, "Trimming/WaitReservation") {
+      context< SnapTrimmer >().log_enter(state_name);
+      assert(context<Trimming>().in_flight.empty());
+      auto *pg = context< SnapTrimmer >().pg;
+      pending = new ReservationCB(pg);
+      pg->osd->snap_reserver.request_reservation(
+       pg->get_pgid(),
+       pending,
+       0);
+      pg->state_set(PG_STATE_SNAPTRIM_WAIT);
+      pg->publish_stats_to_osd();
+    }
+    boost::statechart::result react(const SnapTrimReserved&);
+    void exit() {
+      context< SnapTrimmer >().log_exit(state_name, enter_time);
+      // if the grant hasn't arrived yet, make sure it is ignored when it does
+      if (pending)
+       pending->cancel();
+      pending = nullptr;
+      auto *pg = context< SnapTrimmer >().pg;
+      pg->state_clear(PG_STATE_SNAPTRIM_WAIT);
+      pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
+      pg->publish_stats_to_osd();
+    }
+  };
+
+  /// Waiting for an active scrub to finish before trimming can (re)start.
+  struct WaitScrub : boost::statechart::state< WaitScrub, SnapTrimmer >, NamedState {
+    typedef boost::mpl::list <
+      boost::statechart::custom_reaction< ScrubComplete >,
+      boost::statechart::custom_reaction< KickTrim >,
+      boost::statechart::transition< Reset, NotTrimming >
+      > reactions;
+    explicit WaitScrub(my_context ctx)
+      : my_base(ctx),
+       NamedState(context< SnapTrimmer >().pg, "Trimming/WaitScrub") {
+      context< SnapTrimmer >().log_enter(state_name);
+    }
+    void exit() {
+      context< SnapTrimmer >().log_exit(state_name, enter_time);
+    }
+    boost::statechart::result react(const ScrubComplete&) {
+      post_event(KickTrim());
+      return transit< NotTrimming >();
+    }
+    // already waiting: further kicks are no-ops
+    boost::statechart::result react(const KickTrim&) {
+      return discard_event();
+    }
+  };
+
+  /// Idle state: no trimming underway; KickTrim may start a new round
+  /// (reaction defined out of line, in the .cc).
+  struct NotTrimming : boost::statechart::state< NotTrimming, SnapTrimmer >, NamedState {
+    typedef boost::mpl::list <
+      boost::statechart::custom_reaction< KickTrim >,
+      boost::statechart::transition< Reset, NotTrimming >
+      > reactions;
+    explicit NotTrimming(my_context ctx);
+    void exit();
+    boost::statechart::result react(const KickTrim&);
+  };
+
+  int _verify_no_head_clones(const hobject_t& soid,
+                            const SnapSet& ss);
+  // Create a new local object in ctx when needed; whiteout and no-change
+  // cases create nothing.  NOTE(review): returns void — an older comment
+  // claiming a bool return was stale.
+  void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
+  int _delete_oid(OpContext *ctx, bool no_whiteout, bool try_no_whiteout);
+  int _rollback_to(OpContext *ctx, ceph_osd_op& op);
+public:
+  bool is_missing_object(const hobject_t& oid) const;
+  /// unreadable == missing locally, or not readable from the acting set
+  bool is_unreadable_object(const hobject_t &oid) const {
+    return is_missing_object(oid) ||
+      !missing_loc.readable_with_acting(oid, actingset);
+  }
+  void maybe_kick_recovery(const hobject_t &soid);
+  void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);
+  void wait_for_all_missing(OpRequestRef op);
+
+  bool is_degraded_or_backfilling_object(const hobject_t& oid);
+  void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
+
+  // -- queue an op until its blocking condition clears --
+  void block_write_on_full_cache(
+    const hobject_t& oid, OpRequestRef op);
+  void block_for_clean(
+    const hobject_t& oid, OpRequestRef op);
+  void block_write_on_snap_rollback(
+    const hobject_t& oid, ObjectContextRef obc, OpRequestRef op);
+  void block_write_on_degraded_snap(const hobject_t& oid, OpRequestRef op);
+
+  bool maybe_await_blocked_snapset(const hobject_t &soid, OpRequestRef op);
+  void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
+  void kick_object_context_blocked(ObjectContextRef obc);
+
+  void maybe_force_recovery();
+
+  void mark_all_unfound_lost(
+    int what,
+    ConnectionRef con,
+    ceph_tid_t tid);
+  eversion_t pick_newest_available(const hobject_t& oid);
+
+  void do_update_log_missing(
+    OpRequestRef &op);
+
+  void do_update_log_missing_reply(
+    OpRequestRef &op);
+
+  // -- peering/lifecycle callbacks (invoked by the PG base class) --
+  void on_role_change() override;
+  void on_pool_change() override;
+  void _on_new_interval() override;
+  void clear_async_reads();
+  void on_change(ObjectStore::Transaction *t) override;
+  void on_activate() override;
+  void on_flushed() override;
+  void on_removal(ObjectStore::Transaction *t) override;
+  void on_shutdown() override;
+  bool check_failsafe_full(ostream &ss) override;
+  bool check_osdmap_full(const set<pg_shard_t> &missing_on) override;
+  int rep_repair_primary_object(const hobject_t& soid, OpRequestRef op);
+
+  // attr cache handling
+  void setattr_maybe_cache(
+    ObjectContextRef obc,
+    OpContext *op,
+    PGTransaction *t,
+    const string &key,
+    bufferlist &val);
+  void setattrs_maybe_cache(
+    ObjectContextRef obc,
+    OpContext *op,
+    PGTransaction *t,
+    map<string, bufferlist> &attrs);
+  void rmattr_maybe_cache(
+    ObjectContextRef obc,
+    OpContext *op,
+    PGTransaction *t,
+    const string &key);
+  int getattr_maybe_cache(
+    ObjectContextRef obc,
+    const string &key,
+    bufferlist *val);
+  int getattrs_maybe_cache(
+    ObjectContextRef obc,
+    map<string, bufferlist> *out,
+    bool user_only = false);
+};
+
+/// Debug formatter for RepGather (in-flight replicated op state).
+inline ostream& operator<<(ostream& out, const PrimaryLogPG::RepGather& repop)
+{
+  return out << "repgather(" << &repop
+            << " " << repop.v
+            << " rep_tid=" << repop.rep_tid
+            << " committed?=" << repop.all_committed
+            << " applied?=" << repop.all_applied
+            << " r=" << repop.r
+            << ")";
+}
+
+/// Debug formatter for a proxied write op.  Prints the address of the
+/// ProxyWriteOp itself via pwop.get(); the previous "&pwop" printed the
+/// address of the caller's smart-pointer reference, which identifies
+/// nothing useful (contrast the RepGather overload, which prints the
+/// object's own address).
+inline ostream& operator<<(ostream& out,
+                          const PrimaryLogPG::ProxyWriteOpRef& pwop)
+{
+  out << "proxywrite(" << pwop.get()
+      << " " << pwop->user_version
+      << " pwop_tid=" << pwop->objecter_tid;
+  if (pwop->ctx->op)
+    out << " op=" << *(pwop->ctx->op->get_req());
+  out << ")";
+  return out;
+}
+
+/// intrusive_ptr refcounting hooks for RepGather (defined out of line)
+void intrusive_ptr_add_ref(PrimaryLogPG::RepGather *repop);
+void intrusive_ptr_release(PrimaryLogPG::RepGather *repop);
+
+
+#endif