initial code repo
diff --git a/src/ceph/src/osd/OSD.h b/src/ceph/src/osd/OSD.h
new file mode 100644
index 0000000..e34bd51
--- /dev/null
@@ -0,0 +1,2465 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef CEPH_OSD_H
+#define CEPH_OSD_H
+
+#include "PG.h"
+
+#include "msg/Dispatcher.h"
+
+#include "common/Mutex.h"
+#include "common/RWLock.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "common/AsyncReserver.h"
+#include "common/ceph_context.h"
+#include "common/zipkin_trace.h"
+
+#include "mgr/MgrClient.h"
+
+#include "os/ObjectStore.h"
+#include "OSDCap.h" 
+#include "auth/KeyRing.h"
+#include "osd/ClassHandler.h"
+
+#include "include/CompatSet.h"
+
+#include "OpRequest.h"
+#include "Session.h"
+
+#include "osd/PGQueueable.h"
+
+#include <atomic>
+#include <map>
+#include <memory>
+#include "include/memory.h"
+using namespace std;
+
+#include "include/unordered_map.h"
+
+#include "common/shared_cache.hpp"
+#include "common/simple_cache.hpp"
+#include "common/sharedptr_registry.hpp"
+#include "common/WeightedPriorityQueue.h"
+#include "common/PrioritizedQueue.h"
+#include "osd/mClockOpClassQueue.h"
+#include "osd/mClockClientQueue.h"
+#include "messages/MOSDOp.h"
+#include "include/Spinlock.h"
+#include "common/EventTrace.h"
+
+#define CEPH_OSD_PROTOCOL    10 /* cluster internal */
+
+
+enum {
+  l_osd_first = 10000,
+  l_osd_op_wip,
+  l_osd_op,
+  l_osd_op_inb,
+  l_osd_op_outb,
+  l_osd_op_lat,
+  l_osd_op_process_lat,
+  l_osd_op_prepare_lat,
+  l_osd_op_r,
+  l_osd_op_r_outb,
+  l_osd_op_r_lat,
+  l_osd_op_r_lat_outb_hist,
+  l_osd_op_r_process_lat,
+  l_osd_op_r_prepare_lat,
+  l_osd_op_w,
+  l_osd_op_w_inb,
+  l_osd_op_w_lat,
+  l_osd_op_w_lat_inb_hist,
+  l_osd_op_w_process_lat,
+  l_osd_op_w_prepare_lat,
+  l_osd_op_rw,
+  l_osd_op_rw_inb,
+  l_osd_op_rw_outb,
+  l_osd_op_rw_lat,
+  l_osd_op_rw_lat_inb_hist,
+  l_osd_op_rw_lat_outb_hist,
+  l_osd_op_rw_process_lat,
+  l_osd_op_rw_prepare_lat,
+
+  l_osd_op_before_queue_op_lat,
+  l_osd_op_before_dequeue_op_lat,
+
+  l_osd_sop,
+  l_osd_sop_inb,
+  l_osd_sop_lat,
+  l_osd_sop_w,
+  l_osd_sop_w_inb,
+  l_osd_sop_w_lat,
+  l_osd_sop_pull,
+  l_osd_sop_pull_lat,
+  l_osd_sop_push,
+  l_osd_sop_push_inb,
+  l_osd_sop_push_lat,
+
+  l_osd_pull,
+  l_osd_push,
+  l_osd_push_outb,
+
+  l_osd_rop,
+
+  l_osd_loadavg,
+  l_osd_buf,
+  l_osd_history_alloc_bytes,
+  l_osd_history_alloc_num,
+  l_osd_cached_crc,
+  l_osd_cached_crc_adjusted,
+  l_osd_missed_crc,
+
+  l_osd_pg,
+  l_osd_pg_primary,
+  l_osd_pg_replica,
+  l_osd_pg_stray,
+  l_osd_hb_to,
+  l_osd_map,
+  l_osd_mape,
+  l_osd_mape_dup,
+
+  l_osd_waiting_for_map,
+
+  l_osd_map_cache_hit,
+  l_osd_map_cache_miss,
+  l_osd_map_cache_miss_low,
+  l_osd_map_cache_miss_low_avg,
+  l_osd_map_bl_cache_hit,
+  l_osd_map_bl_cache_miss,
+
+  l_osd_stat_bytes,
+  l_osd_stat_bytes_used,
+  l_osd_stat_bytes_avail,
+
+  l_osd_copyfrom,
+
+  l_osd_tier_promote,
+  l_osd_tier_flush,
+  l_osd_tier_flush_fail,
+  l_osd_tier_try_flush,
+  l_osd_tier_try_flush_fail,
+  l_osd_tier_evict,
+  l_osd_tier_whiteout,
+  l_osd_tier_dirty,
+  l_osd_tier_clean,
+  l_osd_tier_delay,
+  l_osd_tier_proxy_read,
+  l_osd_tier_proxy_write,
+
+  l_osd_agent_wake,
+  l_osd_agent_skip,
+  l_osd_agent_flush,
+  l_osd_agent_evict,
+
+  l_osd_object_ctx_cache_hit,
+  l_osd_object_ctx_cache_total,
+
+  l_osd_op_cache_hit,
+  l_osd_tier_flush_lat,
+  l_osd_tier_promote_lat,
+  l_osd_tier_r_lat,
+
+  l_osd_pg_info,
+  l_osd_pg_fastinfo,
+  l_osd_pg_biginfo,
+
+  l_osd_last,
+};
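+
+// These counters are registered when the OSD builds its "osd" PerfCounters
+// (see create_logger() below).  A rough, illustrative sketch of how such a
+// counter set is typically registered:
+//
+//   PerfCountersBuilder plb(cct, "osd", l_osd_first, l_osd_last);
+//   plb.add_u64_counter(l_osd_op, "op", "Client operations");
+//   plb.add_time_avg(l_osd_op_lat, "op_latency", "Latency of client operations");
+//   PerfCounters *logger = plb.create_perf_counters();
+//   cct->get_perfcounters_collection()->add(logger);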
+
+// RecoveryState perf counters
+enum {
+  rs_first = 20000,
+  rs_initial_latency,
+  rs_started_latency,
+  rs_reset_latency,
+  rs_start_latency,
+  rs_primary_latency,
+  rs_peering_latency,
+  rs_backfilling_latency,
+  rs_waitremotebackfillreserved_latency,
+  rs_waitlocalbackfillreserved_latency,
+  rs_notbackfilling_latency,
+  rs_repnotrecovering_latency,
+  rs_repwaitrecoveryreserved_latency,
+  rs_repwaitbackfillreserved_latency,
+  rs_reprecovering_latency,
+  rs_activating_latency,
+  rs_waitlocalrecoveryreserved_latency,
+  rs_waitremoterecoveryreserved_latency,
+  rs_recovering_latency,
+  rs_recovered_latency,
+  rs_clean_latency,
+  rs_active_latency,
+  rs_replicaactive_latency,
+  rs_stray_latency,
+  rs_getinfo_latency,
+  rs_getlog_latency,
+  rs_waitactingchange_latency,
+  rs_incomplete_latency,
+  rs_down_latency,
+  rs_getmissing_latency,
+  rs_waitupthru_latency,
+  rs_notrecovering_latency,
+  rs_last,
+};
+
+class Messenger;
+class Message;
+class MonClient;
+class PerfCounters;
+class ObjectStore;
+class FuseStore;
+class OSDMap;
+class MLog;
+class Objecter;
+
+class Watch;
+class PrimaryLogPG;
+
+class AuthAuthorizeHandlerRegistry;
+
+class TestOpsSocketHook;
+struct C_CompleteSplits;
+struct C_OpenPGs;
+class LogChannel;
+class CephContext;
+typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
+class MOSDOp;
+
+class DeletingState {
+  Mutex lock;
+  Cond cond;
+  enum {
+    QUEUED,
+    CLEARING_DIR,
+    CLEARING_WAITING,
+    DELETING_DIR,
+    DELETED_DIR,
+    CANCELED,
+  } status;
+  bool stop_deleting;
+public:
+  const spg_t pgid;
+  const PGRef old_pg_state;
+  explicit DeletingState(const pair<spg_t, PGRef> &in) :
+    lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
+    pgid(in.first), old_pg_state(in.second) {
+    }
+
+  /// transition status to CLEARING_WAITING
+  bool pause_clearing() {
+    Mutex::Locker l(lock);
+    assert(status == CLEARING_DIR);
+    if (stop_deleting) {
+      status = CANCELED;
+      cond.Signal();
+      return false;
+    }
+    status = CLEARING_WAITING;
+    return true;
+  } ///< @return false if we should cancel deletion
+
+  /// start or resume the clearing - transition the status to CLEARING_DIR
+  bool start_or_resume_clearing() {
+    Mutex::Locker l(lock);
+    assert(
+      status == QUEUED ||
+      status == DELETED_DIR ||
+      status == CLEARING_WAITING);
+    if (stop_deleting) {
+      status = CANCELED;
+      cond.Signal();
+      return false;
+    }
+    status = CLEARING_DIR;
+    return true;
+  } ///< @return false if we should cancel the deletion
+
+  /// transition status to CLEARING_DIR
+  bool resume_clearing() {
+    Mutex::Locker l(lock);
+    assert(status == CLEARING_WAITING);
+    if (stop_deleting) {
+      status = CANCELED;
+      cond.Signal();
+      return false;
+    }
+    status = CLEARING_DIR;
+    return true;
+  } ///< @return false if we should cancel deletion
+
+  /// transition status to deleting
+  bool start_deleting() {
+    Mutex::Locker l(lock);
+    assert(status == CLEARING_DIR);
+    if (stop_deleting) {
+      status = CANCELED;
+      cond.Signal();
+      return false;
+    }
+    status = DELETING_DIR;
+    return true;
+  } ///< @return false if we should cancel deletion
+
+  /// signal collection removal queued
+  void finish_deleting() {
+    Mutex::Locker l(lock);
+    assert(status == DELETING_DIR);
+    status = DELETED_DIR;
+    cond.Signal();
+  }
+
+  /// try to halt the deletion
+  bool try_stop_deletion() {
+    Mutex::Locker l(lock);
+    stop_deleting = true;
+    /**
+     * If we are in DELETING_DIR or CLEARING_DIR, there are in-progress
+     * operations we have to wait for before continuing on.  States
+     * CLEARING_WAITING and QUEUED indicate that the remover will check
+     * stop_deleting before queueing any further operations.  CANCELED
+     * indicates that the remover has already halted.  DELETED_DIR
+     * indicates that the deletion has been fully queued.
+     */
+    while (status == DELETING_DIR || status == CLEARING_DIR)
+      cond.Wait(lock);
+    return status != DELETED_DIR;
+  } ///< @return true if we don't need to recreate the collection
+};
+typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
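+
+/*
+ * Typical DeletingState lifecycle (a sketch of the transitions above):
+ *
+ *   QUEUED (or DELETED_DIR, or CLEARING_WAITING)
+ *       --start_or_resume_clearing()--> CLEARING_DIR
+ *   CLEARING_DIR     --pause_clearing()-->  CLEARING_WAITING
+ *   CLEARING_WAITING --resume_clearing()--> CLEARING_DIR
+ *   CLEARING_DIR     --start_deleting()-->  DELETING_DIR
+ *   DELETING_DIR     --finish_deleting()--> DELETED_DIR
+ *
+ * Every transition except finish_deleting() checks stop_deleting (set by
+ * try_stop_deletion()) and moves to CANCELED instead, returning false.
+ */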
+
+class OSD;
+
+class OSDService {
+public:
+  OSD *osd;
+  CephContext *cct;
+  SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
+  ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
+  SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
+  const int whoami;
+  ObjectStore *&store;
+  LogClient &log_client;
+  LogChannelRef clog;
+  PGRecoveryStats &pg_recovery_stats;
+private:
+  Messenger *&cluster_messenger;
+  Messenger *&client_messenger;
+public:
+  PerfCounters *&logger;
+  PerfCounters *&recoverystate_perf;
+  MonClient   *&monc;
+  ThreadPool::BatchWorkQueue<PG> &peering_wq;
+  GenContextWQ recovery_gen_wq;
+  ClassHandler  *&class_handler;
+
+  void enqueue_back(spg_t pgid, PGQueueable qi);
+  void enqueue_front(spg_t pgid, PGQueueable qi);
+
+  void maybe_inject_dispatch_delay() {
+    if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
+      if (rand() % 10000 <
+         g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
+       utime_t t;
+       t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
+       t.sleep();
+      }
+    }
+  }
+
+private:
+  // -- map epoch lower bound --
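+  // Each PG registers the osdmap epoch it is currently working from; the
+  // multiset mirrors those values so get_min_pg_epoch() can cheaply report
+  // the oldest epoch still in use by any PG.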
+  Mutex pg_epoch_lock;
+  multiset<epoch_t> pg_epochs;
+  map<spg_t,epoch_t> pg_epoch;
+
+public:
+  void pg_add_epoch(spg_t pgid, epoch_t epoch) {
+    Mutex::Locker l(pg_epoch_lock);
+    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
+    assert(t == pg_epoch.end());
+    pg_epoch[pgid] = epoch;
+    pg_epochs.insert(epoch);
+  }
+  void pg_update_epoch(spg_t pgid, epoch_t epoch) {
+    Mutex::Locker l(pg_epoch_lock);
+    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
+    assert(t != pg_epoch.end());
+    pg_epochs.erase(pg_epochs.find(t->second));
+    t->second = epoch;
+    pg_epochs.insert(epoch);
+  }
+  void pg_remove_epoch(spg_t pgid) {
+    Mutex::Locker l(pg_epoch_lock);
+    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
+    if (t != pg_epoch.end()) {
+      pg_epochs.erase(pg_epochs.find(t->second));
+      pg_epoch.erase(t);
+    }
+  }
+  epoch_t get_min_pg_epoch() {
+    Mutex::Locker l(pg_epoch_lock);
+    if (pg_epochs.empty())
+      return 0;
+    else
+      return *pg_epochs.begin();
+  }
+
+private:
+  // -- superblock --
+  Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
+  OSDSuperblock superblock;
+
+public:
+  OSDSuperblock get_superblock() {
+    Mutex::Locker l(publish_lock);
+    return superblock;
+  }
+  void publish_superblock(const OSDSuperblock &block) {
+    Mutex::Locker l(publish_lock);
+    superblock = block;
+  }
+
+  int get_nodeid() const { return whoami; }
+
+  std::atomic<epoch_t> max_oldest_map;
+private:
+  OSDMapRef osdmap;
+
+public:
+  OSDMapRef get_osdmap() {
+    Mutex::Locker l(publish_lock);
+    return osdmap;
+  }
+  epoch_t get_osdmap_epoch() {
+    Mutex::Locker l(publish_lock);
+    return osdmap ? osdmap->get_epoch() : 0;
+  }
+  void publish_map(OSDMapRef map) {
+    Mutex::Locker l(publish_lock);
+    osdmap = map;
+  }
+
+  /*
+   * osdmap - current published map
+   * next_osdmap - pre-published map that is about to be published.
+   *
+   * We use the next_osdmap to send messages and initiate connections,
+   * but only if the target is the same instance as the one in the map
+   * epoch the current user is working from (i.e., the result is
+   * equivalent to what is in next_osdmap).
+   *
+   * This allows the helpers to start ignoring osds that are about to
+   * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
+   * down, without worrying about reopening connections from threads
+   * working from old maps.
+   */
+private:
+  OSDMapRef next_osdmap;
+  Cond pre_publish_cond;
+
+public:
+  void pre_publish_map(OSDMapRef map) {
+    Mutex::Locker l(pre_publish_lock);
+    next_osdmap = std::move(map);
+  }
+
+  void activate_map();
+  /// map epochs reserved below
+  map<epoch_t, unsigned> map_reservations;
+
+  /// gets ref to next_osdmap and registers the epoch as reserved
+  OSDMapRef get_nextmap_reserved() {
+    Mutex::Locker l(pre_publish_lock);
+    if (!next_osdmap)
+      return OSDMapRef();
+    epoch_t e = next_osdmap->get_epoch();
+    map<epoch_t, unsigned>::iterator i =
+      map_reservations.insert(make_pair(e, 0)).first;
+    i->second++;
+    return next_osdmap;
+  }
+  /// releases reservation on map
+  void release_map(OSDMapRef osdmap) {
+    Mutex::Locker l(pre_publish_lock);
+    map<epoch_t, unsigned>::iterator i =
+      map_reservations.find(osdmap->get_epoch());
+    assert(i != map_reservations.end());
+    assert(i->second > 0);
+    if (--(i->second) == 0) {
+      map_reservations.erase(i);
+    }
+    pre_publish_cond.Signal();
+  }
+  /// blocks until there are no reserved maps prior to next_osdmap
+  void await_reserved_maps() {
+    Mutex::Locker l(pre_publish_lock);
+    assert(next_osdmap);
+    while (true) {
+      map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
+      if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
+       break;
+      } else {
+       pre_publish_cond.Wait(pre_publish_lock);
+      }
+    }
+  }
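+
+  /*
+   * A sketch of the reservation protocol implied by the helpers above:
+   *
+   *   OSDMapRef next = get_nextmap_reserved();   // may be null
+   *   if (next) {
+   *     // ... send messages / open connections against 'next' ...
+   *     release_map(next);
+   *   }
+   *
+   * await_reserved_maps() then lets the publishing side block until no
+   * reservation remains on an epoch older than next_osdmap.
+   */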
+
+private:
+  Mutex peer_map_epoch_lock;
+  map<int, epoch_t> peer_map_epoch;
+public:
+  epoch_t get_peer_epoch(int p);
+  epoch_t note_peer_epoch(int p, epoch_t e);
+  void forget_peer_epoch(int p, epoch_t e);
+
+  void send_map(class MOSDMap *m, Connection *con);
+  void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
+  MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
+                                       OSDSuperblock& superblock);
+  bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
+                        const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
+  void share_map(entity_name_t name, Connection *con, epoch_t epoch,
+                 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
+  void share_map_peer(int peer, Connection *con,
+                      OSDMapRef map = OSDMapRef());
+
+  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
+  pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
+  void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
+  void send_message_osd_cluster(Message *m, Connection *con) {
+    con->send_message(m);
+  }
+  void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
+    con->send_message(m);
+  }
+  void send_message_osd_client(Message *m, Connection *con) {
+    con->send_message(m);
+  }
+  void send_message_osd_client(Message *m, const ConnectionRef& con) {
+    con->send_message(m);
+  }
+  entity_name_t get_cluster_msgr_name() {
+    return cluster_messenger->get_myname();
+  }
+
+private:
+  // -- scrub scheduling --
+  Mutex sched_scrub_lock;
+  int scrubs_pending;
+  int scrubs_active;
+
+public:
+  struct ScrubJob {
+    CephContext* cct;
+    /// pg to be scrubbed
+    spg_t pgid;
+    /// the time the scrub is scheduled for; the scrub could be delayed if
+    /// system load is too high or it falls outside the configured scrub hours
+    utime_t sched_time;
+    /// the hard upper bound of scrub time
+    utime_t deadline;
+    ScrubJob() : cct(nullptr) {}
+    explicit ScrubJob(CephContext* cct, const spg_t& pg,
+                     const utime_t& timestamp,
+                     double pool_scrub_min_interval = 0,
+                     double pool_scrub_max_interval = 0, bool must = true);
+    /// order the jobs by sched_time
+    bool operator<(const ScrubJob& rhs) const;
+  };
+  set<ScrubJob> sched_scrub_pg;
+
+  /// @returns the scrub_reg_stamp used to unregister the scrub job
+  utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
+                      double pool_scrub_max_interval, bool must) {
+    ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
+                  must);
+    Mutex::Locker l(sched_scrub_lock);
+    sched_scrub_pg.insert(scrub);
+    return scrub.sched_time;
+  }
+  void unreg_pg_scrub(spg_t pgid, utime_t t) {
+    Mutex::Locker l(sched_scrub_lock);
+    size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
+    assert(removed);
+  }
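+
+  /*
+   * reg_pg_scrub() returns the sched_time actually recorded for the job; a
+   * caller that later wants to unregister must hand that same stamp back,
+   * e.g. (sketch):
+   *
+   *   utime_t stamp = reg_pg_scrub(pgid, t, pool_min, pool_max, must);
+   *   ...
+   *   unreg_pg_scrub(pgid, stamp);
+   */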
+  bool first_scrub_stamp(ScrubJob *out) {
+    Mutex::Locker l(sched_scrub_lock);
+    if (sched_scrub_pg.empty())
+      return false;
+    set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
+    *out = *iter;
+    return true;
+  }
+  bool next_scrub_stamp(const ScrubJob& next,
+                       ScrubJob *out) {
+    Mutex::Locker l(sched_scrub_lock);
+    if (sched_scrub_pg.empty())
+      return false;
+    set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
+    if (iter == sched_scrub_pg.cend())
+      return false;
+    ++iter;
+    if (iter == sched_scrub_pg.cend())
+      return false;
+    *out = *iter;
+    return true;
+  }
+
+  void dumps_scrub(Formatter *f) {
+    assert(f != nullptr);
+    Mutex::Locker l(sched_scrub_lock);
+
+    f->open_array_section("scrubs");
+    for (const auto &i: sched_scrub_pg) {
+      f->open_object_section("scrub");
+      f->dump_stream("pgid") << i.pgid;
+      f->dump_stream("sched_time") << i.sched_time;
+      f->dump_stream("deadline") << i.deadline;
+      f->dump_bool("forced", i.sched_time == i.deadline);
+      f->close_section();
+    }
+    f->close_section();
+  }
+
+  bool can_inc_scrubs_pending();
+  bool inc_scrubs_pending();
+  void inc_scrubs_active(bool reserved);
+  void dec_scrubs_pending();
+  void dec_scrubs_active();
+
+  void reply_op_error(OpRequestRef op, int err);
+  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
+  void handle_misdirected_op(PG *pg, OpRequestRef op);
+
+
+private:
+  // -- agent shared state --
+  Mutex agent_lock;
+  Cond agent_cond;
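+  /// tiering-agent work queue: pgs with flush/evict work, bucketed by priority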
+  map<uint64_t, set<PGRef> > agent_queue;
+  set<PGRef>::iterator agent_queue_pos;
+  bool agent_valid_iterator;
+  int agent_ops;
+  int flush_mode_high_count; // once any pg is in FLUSH_MODE_HIGH, flush objects at high speed
+  set<hobject_t> agent_oids;
+  bool agent_active;
+  struct AgentThread : public Thread {
+    OSDService *osd;
+    explicit AgentThread(OSDService *o) : osd(o) {}
+    void *entry() override {
+      osd->agent_entry();
+      return NULL;
+    }
+  } agent_thread;
+  bool agent_stop_flag;
+  Mutex agent_timer_lock;
+  SafeTimer agent_timer;
+
+public:
+  void agent_entry();
+  void agent_stop();
+
+  void _enqueue(PG *pg, uint64_t priority) {
+    if (!agent_queue.empty() &&
+       agent_queue.rbegin()->first < priority)
+      agent_valid_iterator = false;  // inserting higher-priority queue
+    set<PGRef>& nq = agent_queue[priority];
+    if (nq.empty())
+      agent_cond.Signal();
+    nq.insert(pg);
+  }
+
+  void _dequeue(PG *pg, uint64_t old_priority) {
+    set<PGRef>& oq = agent_queue[old_priority];
+    set<PGRef>::iterator p = oq.find(pg);
+    assert(p != oq.end());
+    if (p == agent_queue_pos)
+      ++agent_queue_pos;
+    oq.erase(p);
+    if (oq.empty()) {
+      if (agent_queue.rbegin()->first == old_priority)
+       agent_valid_iterator = false;
+      agent_queue.erase(old_priority);
+    }
+  }
+
+  /// enable agent for a pg
+  void agent_enable_pg(PG *pg, uint64_t priority) {
+    Mutex::Locker l(agent_lock);
+    _enqueue(pg, priority);
+  }
+
+  /// adjust priority for an enabled pg
+  void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
+    Mutex::Locker l(agent_lock);
+    assert(new_priority != old_priority);
+    _enqueue(pg, new_priority);
+    _dequeue(pg, old_priority);
+  }
+
+  /// disable agent for a pg
+  void agent_disable_pg(PG *pg, uint64_t old_priority) {
+    Mutex::Locker l(agent_lock);
+    _dequeue(pg, old_priority);
+  }
+
+  /// note start of an async (evict) op
+  void agent_start_evict_op() {
+    Mutex::Locker l(agent_lock);
+    ++agent_ops;
+  }
+
+  /// note finish or cancellation of an async (evict) op
+  void agent_finish_evict_op() {
+    Mutex::Locker l(agent_lock);
+    assert(agent_ops > 0);
+    --agent_ops;
+    agent_cond.Signal();
+  }
+
+  /// note start of an async (flush) op
+  void agent_start_op(const hobject_t& oid) {
+    Mutex::Locker l(agent_lock);
+    ++agent_ops;
+    assert(agent_oids.count(oid) == 0);
+    agent_oids.insert(oid);
+  }
+
+  /// note finish or cancellation of an async (flush) op
+  void agent_finish_op(const hobject_t& oid) {
+    Mutex::Locker l(agent_lock);
+    assert(agent_ops > 0);
+    --agent_ops;
+    assert(agent_oids.count(oid) == 1);
+    agent_oids.erase(oid);
+    agent_cond.Signal();
+  }
+
+  /// check if we are operating on an object
+  bool agent_is_active_oid(const hobject_t& oid) {
+    Mutex::Locker l(agent_lock);
+    return agent_oids.count(oid);
+  }
+
+  /// get count of active agent ops
+  int agent_get_num_ops() {
+    Mutex::Locker l(agent_lock);
+    return agent_ops;
+  }
+
+  void agent_inc_high_count() {
+    Mutex::Locker l(agent_lock);
+    flush_mode_high_count ++;
+  }
+
+  void agent_dec_high_count() {
+    Mutex::Locker l(agent_lock);
+    flush_mode_high_count --;
+  }
+
+private:
+  /// throttle promotion attempts
+  std::atomic_uint promote_probability_millis{1000}; ///< promote probability per thousand; a single word so it can be read locklessly
+  PromoteCounter promote_counter;
+  utime_t last_recalibrate;
+  unsigned long promote_max_objects, promote_max_bytes;
+
+public:
+  bool promote_throttle() {
+    // NOTE: lockless!  we rely on the probability being a single word.
+    promote_counter.attempt();
+    if ((unsigned)rand() % 1000 > promote_probability_millis)
+      return true;  // yes throttle (no promote)
+    if (promote_max_objects &&
+       promote_counter.objects > promote_max_objects)
+      return true;  // yes throttle
+    if (promote_max_bytes &&
+       promote_counter.bytes > promote_max_bytes)
+      return true;  // yes throttle
+    return false;   //  no throttle (promote)
+  }
+  void promote_finish(uint64_t bytes) {
+    promote_counter.finish(bytes);
+  }
+  void promote_throttle_recalibrate();
+
+  // -- Objecter, for tiering reads/writes from/to other OSDs --
+  Objecter *objecter;
+  Finisher objecter_finisher;
+
+  // -- Watch --
+  Mutex watch_lock;
+  SafeTimer watch_timer;
+  uint64_t next_notif_id;
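+  /// notify ids pack the current epoch in the high 32 bits and a
+  /// monotonically increasing counter in the low 32 bits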
+  uint64_t get_next_id(epoch_t cur_epoch) {
+    Mutex::Locker l(watch_lock);
+    return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
+  }
+
+  // -- Recovery/Backfill Request Scheduling --
+  Mutex recovery_request_lock;
+  SafeTimer recovery_request_timer;
+
+  // For async recovery sleep
+  bool recovery_needs_sleep = true;
+  utime_t recovery_schedule_time = utime_t();
+
+  Mutex recovery_sleep_lock;
+  SafeTimer recovery_sleep_timer;
+
+  // -- tids --
+  // for ops i issue
+  std::atomic_uint last_tid{0};
+  ceph_tid_t get_tid() {
+    return (ceph_tid_t)last_tid++;
+  }
+
+  // -- backfill_reservation --
+  Finisher reserver_finisher;
+  AsyncReserver<spg_t> local_reserver;
+  AsyncReserver<spg_t> remote_reserver;
+
+  // -- pg_temp --
+private:
+  Mutex pg_temp_lock;
+  map<pg_t, vector<int> > pg_temp_wanted;
+  map<pg_t, vector<int> > pg_temp_pending;
+  void _sent_pg_temp();
+public:
+  void queue_want_pg_temp(pg_t pgid, vector<int>& want);
+  void remove_want_pg_temp(pg_t pgid);
+  void requeue_pg_temp();
+  void send_pg_temp();
+
+  void send_pg_created(pg_t pgid);
+
+  void queue_for_peering(PG *pg);
+
+  Mutex snap_sleep_lock;
+  SafeTimer snap_sleep_timer;
+
+  Mutex scrub_sleep_lock;
+  SafeTimer scrub_sleep_timer;
+
+  AsyncReserver<spg_t> snap_reserver;
+  void queue_for_snap_trim(PG *pg);
+
+  void queue_for_scrub(PG *pg, bool with_high_priority) {
+    unsigned scrub_queue_priority = pg->scrubber.priority;
+    if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
+      scrub_queue_priority = cct->_conf->osd_client_op_priority;
+    }
+    enqueue_back(
+      pg->info.pgid,
+      PGQueueable(
+       PGScrub(pg->get_osdmap()->get_epoch()),
+       cct->_conf->osd_scrub_cost,
+       scrub_queue_priority,
+       ceph_clock_now(),
+       entity_inst_t(),
+       pg->get_osdmap()->get_epoch()));
+  }
+
+private:
+  // -- pg recovery and associated throttling --
+  Mutex recovery_lock;
+  list<pair<epoch_t, PGRef> > awaiting_throttle;
+
+  utime_t defer_recovery_until;
+  uint64_t recovery_ops_active;
+  uint64_t recovery_ops_reserved;
+  bool recovery_paused;
+#ifdef DEBUG_RECOVERY_OIDS
+  map<spg_t, set<hobject_t> > recovery_oids;
+#endif
+  bool _recover_now(uint64_t *available_pushes);
+  void _maybe_queue_recovery();
+  void _queue_for_recovery(
+    pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
+    assert(recovery_lock.is_locked_by_me());
+    enqueue_back(
+      p.second->info.pgid,
+      PGQueueable(
+       PGRecovery(p.first, reserved_pushes),
+       cct->_conf->osd_recovery_cost,
+       cct->_conf->osd_recovery_priority,
+       ceph_clock_now(),
+       entity_inst_t(),
+       p.first));
+  }
+public:
+  void start_recovery_op(PG *pg, const hobject_t& soid);
+  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
+  bool is_recovery_active();
+  void release_reserved_pushes(uint64_t pushes) {
+    Mutex::Locker l(recovery_lock);
+    assert(recovery_ops_reserved >= pushes);
+    recovery_ops_reserved -= pushes;
+    _maybe_queue_recovery();
+  }
+  void defer_recovery(float defer_for) {
+    defer_recovery_until = ceph_clock_now();
+    defer_recovery_until += defer_for;
+  }
+  void pause_recovery() {
+    Mutex::Locker l(recovery_lock);
+    recovery_paused = true;
+  }
+  bool recovery_is_paused() {
+    Mutex::Locker l(recovery_lock);
+    return recovery_paused;
+  }
+  void unpause_recovery() {
+    Mutex::Locker l(recovery_lock);
+    recovery_paused = false;
+    _maybe_queue_recovery();
+  }
+  void kick_recovery_queue() {
+    Mutex::Locker l(recovery_lock);
+    _maybe_queue_recovery();
+  }
+  void clear_queued_recovery(PG *pg) {
+    Mutex::Locker l(recovery_lock);
+    for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
+        i != awaiting_throttle.end();
+      ) {
+      if (i->second.get() == pg) {
+       awaiting_throttle.erase(i);
+       return;
+      } else {
+       ++i;
+      }
+    }
+  }
+  // delayed pg activation
+  void queue_for_recovery(PG *pg) {
+    Mutex::Locker l(recovery_lock);
+
+    if (pg->get_state() & (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL)) {
+      awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
+    } else {
+      awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
+    }
+    _maybe_queue_recovery();
+  }
+  void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
+    Mutex::Locker l(recovery_lock);
+    _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
+  }
+
+  void adjust_pg_priorities(const vector<PGRef>& pgs, int newflags);
+
+  // osd map cache (past osd maps)
+  Mutex map_cache_lock;
+  SharedLRU<epoch_t, const OSDMap> map_cache;
+  SimpleLRU<epoch_t, bufferlist> map_bl_cache;
+  SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
+
+  OSDMapRef try_get_map(epoch_t e);
+  OSDMapRef get_map(epoch_t e) {
+    OSDMapRef ret(try_get_map(e));
+    assert(ret);
+    return ret;
+  }
+  OSDMapRef add_map(OSDMap *o) {
+    Mutex::Locker l(map_cache_lock);
+    return _add_map(o);
+  }
+  OSDMapRef _add_map(OSDMap *o);
+
+  void add_map_bl(epoch_t e, bufferlist& bl) {
+    Mutex::Locker l(map_cache_lock);
+    return _add_map_bl(e, bl);
+  }
+  void pin_map_bl(epoch_t e, bufferlist &bl);
+  void _add_map_bl(epoch_t e, bufferlist& bl);
+  bool get_map_bl(epoch_t e, bufferlist& bl) {
+    Mutex::Locker l(map_cache_lock);
+    return _get_map_bl(e, bl);
+  }
+  bool _get_map_bl(epoch_t e, bufferlist& bl);
+
+  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
+    Mutex::Locker l(map_cache_lock);
+    return _add_map_inc_bl(e, bl);
+  }
+  void pin_map_inc_bl(epoch_t e, bufferlist &bl);
+  void _add_map_inc_bl(epoch_t e, bufferlist& bl);
+  bool get_inc_map_bl(epoch_t e, bufferlist& bl);
+
+  void clear_map_bl_cache_pins(epoch_t e);
+
+  void need_heartbeat_peer_update();
+
+  void pg_stat_queue_enqueue(PG *pg);
+  void pg_stat_queue_dequeue(PG *pg);
+
+  void init();
+  void final_init();  
+  void start_shutdown();
+  void shutdown_reserver();
+  void shutdown();
+
+private:
+  // split
+  Mutex in_progress_split_lock;
+  map<spg_t, spg_t> pending_splits; // child -> parent
+  map<spg_t, set<spg_t> > rev_pending_splits; // parent -> [children]
+  set<spg_t> in_progress_splits;       // child
+
+public:
+  void _start_split(spg_t parent, const set<spg_t> &children);
+  void start_split(spg_t parent, const set<spg_t> &children) {
+    Mutex::Locker l(in_progress_split_lock);
+    return _start_split(parent, children);
+  }
+  void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
+  void complete_split(const set<spg_t> &pgs);
+  void cancel_pending_splits_for_parent(spg_t parent);
+  void _cancel_pending_splits_for_parent(spg_t parent);
+  bool splitting(spg_t pgid);
+  void expand_pg_num(OSDMapRef old_map,
+                    OSDMapRef new_map);
+  void _maybe_split_pgid(OSDMapRef old_map,
+                        OSDMapRef new_map,
+                        spg_t pgid);
+  void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);
+
+  // -- stats --
+  Mutex stat_lock;
+  osd_stat_t osd_stat;
+  uint32_t seq = 0;
+
+  void update_osd_stat(vector<int>& hb_peers);
+  osd_stat_t set_osd_stat(const struct store_statfs_t &stbuf,
+                          vector<int>& hb_peers,
+                         int num_pgs);
+  osd_stat_t get_osd_stat() {
+    Mutex::Locker l(stat_lock);
+    ++seq;
+    osd_stat.up_from = up_epoch;
+    osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
+    return osd_stat;
+  }
+  uint64_t get_osd_stat_seq() {
+    Mutex::Locker l(stat_lock);
+    return osd_stat.seq;
+  }
+
+  // -- OSD Full Status --
+private:
+  friend TestOpsSocketHook;
+  mutable Mutex full_status_lock;
+  enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
+  const char *get_full_state_name(s_names s) const {
+    switch (s) {
+    case NONE: return "none";
+    case NEARFULL: return "nearfull";
+    case BACKFILLFULL: return "backfillfull";
+    case FULL: return "full";
+    case FAILSAFE: return "failsafe";
+    default: return "???";
+    }
+  }
+  s_names get_full_state(string type) const {
+    if (type == "none")
+      return NONE;
+    else if (type == "failsafe")
+      return FAILSAFE;
+    else if (type == "full")
+      return FULL;
+    else if (type == "backfillfull")
+      return BACKFILLFULL;
+    else if (type == "nearfull")
+      return NEARFULL;
+    else
+      return INVALID;
+  }
+  double cur_ratio;  ///< current utilization
+  mutable int64_t injectfull = 0;
+  s_names injectfull_state = NONE;
+  float get_failsafe_full_ratio();
+  void check_full_status(float ratio);
+  bool _check_full(s_names type, ostream &ss) const;
+public:
+  bool check_failsafe_full(ostream &ss) const;
+  bool check_full(ostream &ss) const;
+  bool check_backfill_full(ostream &ss) const;
+  bool check_nearfull(ostream &ss) const;
+  bool is_failsafe_full() const;
+  bool is_full() const;
+  bool is_backfillfull() const;
+  bool is_nearfull() const;
+  bool need_fullness_update();  ///< osdmap state needs update
+  void set_injectfull(s_names type, int64_t count);
+  bool check_osdmap_full(const set<pg_shard_t> &missing_on);
+
+
+  // -- epochs --
+private:
+  mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
+  epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
+  epoch_t up_epoch;    // _most_recent_ epoch we were marked up
+  epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
+public:
+  /**
+   * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
+   * can be NULL if you don't care about them.
+   */
+  void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
+                       epoch_t *_bind_epoch) const;
+  /**
+   * Set the boot, up, and bind epochs. Any NULL params will not be set.
+   */
+  void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
+                  const epoch_t *_bind_epoch);
+  epoch_t get_boot_epoch() const {
+    epoch_t ret;
+    retrieve_epochs(&ret, NULL, NULL);
+    return ret;
+  }
+  epoch_t get_up_epoch() const {
+    epoch_t ret;
+    retrieve_epochs(NULL, &ret, NULL);
+    return ret;
+  }
+  epoch_t get_bind_epoch() const {
+    epoch_t ret;
+    retrieve_epochs(NULL, NULL, &ret);
+    return ret;
+  }
+
+  void request_osdmap_update(epoch_t e);
+
+  // -- stopping --
+  Mutex is_stopping_lock;
+  Cond is_stopping_cond;
+  enum {
+    NOT_STOPPING,
+    PREPARING_TO_STOP,
+    STOPPING };
+  std::atomic_int state{NOT_STOPPING};
+  int get_state() {
+    return state;
+  }
+  void set_state(int s) {
+    state = s;
+  }
+  bool is_stopping() const {
+    return state == STOPPING;
+  }
+  bool is_preparing_to_stop() const {
+    return state == PREPARING_TO_STOP;
+  }
+  bool prepare_to_stop();
+  void got_stop_ack();
+
+
+#ifdef PG_DEBUG_REFS
+  Mutex pgid_lock;
+  map<spg_t, int> pgid_tracker;
+  map<spg_t, PG*> live_pgs;
+  void add_pgid(spg_t pgid, PG *pg);
+  void remove_pgid(spg_t pgid, PG *pg);
+  void dump_live_pgids();
+#endif
+
+  explicit OSDService(OSD *osd);
+  ~OSDService();
+};
+
+class OSD : public Dispatcher,
+           public md_config_obs_t {
+  /** OSD **/
+  Mutex osd_lock;                      // global lock
+  SafeTimer tick_timer;    // safe timer (osd_lock)
+
+  // Tick timer for work that does not need osd_lock
+  Mutex tick_timer_lock;
+  SafeTimer tick_timer_without_osd_lock;
+public:
+  // config observer bits
+  const char** get_tracked_conf_keys() const override;
+  void handle_conf_change(const struct md_config_t *conf,
+                          const std::set <std::string> &changed) override;
+  void update_log_config();
+  void check_config();
+
+protected:
+
+  static const double OSD_TICK_INTERVAL; // tick interval for tick_timer and tick_timer_without_osd_lock
+
+  AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
+  AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
+
+  Messenger   *cluster_messenger;
+  Messenger   *client_messenger;
+  Messenger   *objecter_messenger;
+  MonClient   *monc; // check the "monc helpers" list before accessing directly
+  MgrClient   mgrc;
+  PerfCounters      *logger;
+  PerfCounters      *recoverystate_perf;
+  ObjectStore *store;
+#ifdef HAVE_LIBFUSE
+  FuseStore *fuse_store = nullptr;
+#endif
+  LogClient log_client;
+  LogChannelRef clog;
+
+  int whoami;
+  std::string dev_path, journal_path;
+
+  bool store_is_rotational = true;
+  bool journal_is_rotational = true;
+
+  ZTracer::Endpoint trace_endpoint;
+  void create_logger();
+  void create_recoverystate_perf();
+  void tick();
+  void tick_without_osd_lock();
+  void _dispatch(Message *m);
+  void dispatch_op(OpRequestRef op);
+
+  void check_osdmap_features(ObjectStore *store);
+
+  // asok
+  friend class OSDSocketHook;
+  class OSDSocketHook *asok_hook;
+  bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss);
+
+public:
+  ClassHandler  *class_handler = nullptr;
+  int get_nodeid() { return whoami; }
+  
+  static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
+    char foo[20];
+    snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
+    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
+  }
+  static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
+    char foo[22];
+    snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
+    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
+  }
+
+  static ghobject_t make_snapmapper_oid() {
+    return ghobject_t(hobject_t(
+      sobject_t(
+       object_t("snapmapper"),
+       0)));
+  }
+
+  static ghobject_t make_pg_log_oid(spg_t pg) {
+    stringstream ss;
+    ss << "pglog_" << pg;
+    string s;
+    getline(ss, s);
+    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
+  }
+  
+  static ghobject_t make_pg_biginfo_oid(spg_t pg) {
+    stringstream ss;
+    ss << "pginfo_" << pg;
+    string s;
+    getline(ss, s);
+    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
+  }
+  static ghobject_t make_infos_oid() {
+    hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
+    return ghobject_t(oid);
+  }
+  static void recursive_remove_collection(CephContext* cct,
+                                         ObjectStore *store,
+                                         spg_t pgid,
+                                         coll_t tmp);
+
+  /**
+   * get_osd_initial_compat_set()
+   *
+   * Get the initial feature set for this OSD.  Features
+   * here are automatically upgraded.
+   *
+   * Return value: Initial osd CompatSet
+   */
+  static CompatSet get_osd_initial_compat_set();
+
+  /**
+   * get_osd_compat_set()
+   *
+   * Get all features supported by this OSD
+   *
+   * Return value: CompatSet of all supported features
+   */
+  static CompatSet get_osd_compat_set();
+  
+
+private:
+  class C_Tick;
+  class C_Tick_WithoutOSDLock;
+
+  // -- superblock --
+  OSDSuperblock superblock;
+
+  void write_superblock();
+  void write_superblock(ObjectStore::Transaction& t);
+  int read_superblock();
+
+  void clear_temp_objects();
+
+  CompatSet osd_compat;
+
+  // -- state --
+public:
+  typedef enum {
+    STATE_INITIALIZING = 1,
+    STATE_PREBOOT,
+    STATE_BOOTING,
+    STATE_ACTIVE,
+    STATE_STOPPING,
+    STATE_WAITING_FOR_HEALTHY
+  } osd_state_t;
+
+  static const char *get_state_name(int s) {
+    switch (s) {
+    case STATE_INITIALIZING: return "initializing";
+    case STATE_PREBOOT: return "preboot";
+    case STATE_BOOTING: return "booting";
+    case STATE_ACTIVE: return "active";
+    case STATE_STOPPING: return "stopping";
+    case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
+    default: return "???";
+    }
+  }
+
+private:
+  std::atomic_int state{STATE_INITIALIZING};
+  bool waiting_for_luminous_mons = false;
+
+public:
+  int get_state() const {
+    return state;
+  }
+  void set_state(int s) {
+    state = s;
+  }
+  bool is_initializing() const {
+    return state == STATE_INITIALIZING;
+  }
+  bool is_preboot() const {
+    return state == STATE_PREBOOT;
+  }
+  bool is_booting() const {
+    return state == STATE_BOOTING;
+  }
+  bool is_active() const {
+    return state == STATE_ACTIVE;
+  }
+  bool is_stopping() const {
+    return state == STATE_STOPPING;
+  }
+  bool is_waiting_for_healthy() const {
+    return state == STATE_WAITING_FOR_HEALTHY;
+  }
+
+private:
+
+  ThreadPool peering_tp;
+  ShardedThreadPool osd_op_tp;
+  ThreadPool disk_tp;
+  ThreadPool command_tp;
+
+  void set_disk_tp_priority();
+  void get_latest_osdmap();
+
+  // -- sessions --
+private:
+  void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
+  void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
+
+  Mutex session_waiting_lock;
+  set<Session*> session_waiting_for_map;
+
+  /// Caller assumes refs for included Sessions
+  void get_sessions_waiting_for_map(set<Session*> *out) {
+    Mutex::Locker l(session_waiting_lock);
+    out->swap(session_waiting_for_map);
+  }
+  void register_session_waiting_on_map(Session *session) {
+    Mutex::Locker l(session_waiting_lock);
+    if (session_waiting_for_map.insert(session).second) {
+      session->get();
+    }
+  }
+  void clear_session_waiting_on_map(Session *session) {
+    Mutex::Locker l(session_waiting_lock);
+    set<Session*>::iterator i = session_waiting_for_map.find(session);
+    if (i != session_waiting_for_map.end()) {
+      (*i)->put();
+      session_waiting_for_map.erase(i);
+    }
+  }
+  void dispatch_sessions_waiting_on_map() {
+    set<Session*> sessions_to_check;
+    get_sessions_waiting_for_map(&sessions_to_check);
+    for (set<Session*>::iterator i = sessions_to_check.begin();
+        i != sessions_to_check.end();
+        sessions_to_check.erase(i++)) {
+      (*i)->session_dispatch_lock.Lock();
+      dispatch_session_waiting(*i, osdmap);
+      (*i)->session_dispatch_lock.Unlock();
+      (*i)->put();
+    }
+  }
+  void session_handle_reset(Session *session) {
+    Mutex::Locker l(session->session_dispatch_lock);
+    clear_session_waiting_on_map(session);
+
+    session->clear_backoffs();
+
+    /* Messages have connection refs, we need to clear the
+     * connection->session->message->connection
+     * cycles which result.
+     * Bug #12338
+     */
+    session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
+  }
+
+private:
+  /**
+   * @defgroup monc helpers
+   * @{
+   * Right now we only have the one
+   */
+
+  /**
+   * Ask the Monitors for a sequence of OSDMaps.
+   *
+   * @param epoch The epoch to start with when replying
+   * @param force_request True if this request forces a new subscription to
+   * the monitors; false if an outstanding request that encompasses it is
+   * sufficient.
+   */
+  void osdmap_subscribe(version_t epoch, bool force_request);
+  /** @} monc helpers */
+
+  Mutex osdmap_subscribe_lock;
+  epoch_t latest_subscribed_epoch{0};
+
+  // -- heartbeat --
+  /// information about a heartbeat peer
+  struct HeartbeatInfo {
+    int peer;           ///< peer
+    ConnectionRef con_front;   ///< peer connection (front)
+    ConnectionRef con_back;    ///< peer connection (back)
+    utime_t first_tx;   ///< time we sent our first ping request
+    utime_t last_tx;    ///< last time we sent a ping request
+    utime_t last_rx_front;  ///< last time we got a ping reply on the front side
+    utime_t last_rx_back;   ///< last time we got a ping reply on the back side
+    epoch_t epoch;      ///< most recent epoch we wanted this peer
+
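+    /// a peer is unhealthy if either the front or back channel has gone
+    /// without a reply since 'cutoff', unless nothing can be outstanding on
+    /// that channel yet (no ping ever sent, or first ping sent after cutoff)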
+    bool is_unhealthy(utime_t cutoff) const {
+      return
+       ! ((last_rx_front > cutoff ||
+           (last_rx_front == utime_t() && (last_tx == utime_t() ||
+                                           first_tx > cutoff))) &&
+          (last_rx_back > cutoff ||
+           (last_rx_back == utime_t() && (last_tx == utime_t() ||
+                                          first_tx > cutoff))));
+    }
+    bool is_healthy(utime_t cutoff) const {
+      return last_rx_front > cutoff && last_rx_back > cutoff;
+    }
+
+  };
+  /// state attached to outgoing heartbeat connections
+  struct HeartbeatSession : public RefCountedObject {
+    int peer;
+    explicit HeartbeatSession(int p) : peer(p) {}
+  };
+  Mutex heartbeat_lock;
+  map<int, int> debug_heartbeat_drops_remaining;
+  Cond heartbeat_cond;
+  bool heartbeat_stop;
+  std::atomic_bool heartbeat_need_update;   
+  map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
+  utime_t last_mon_heartbeat;
+  Messenger *hb_front_client_messenger;
+  Messenger *hb_back_client_messenger;
+  Messenger *hb_front_server_messenger;
+  Messenger *hb_back_server_messenger;
+  utime_t last_heartbeat_resample;   ///< last time we chose random peers in waiting-for-healthy state
+  double daily_loadavg;
+  
+  void _add_heartbeat_peer(int p);
+  void _remove_heartbeat_peer(int p);
+  bool heartbeat_reset(Connection *con);
+  void maybe_update_heartbeat_peers();
+  void reset_heartbeat_peers();
+  bool heartbeat_peers_need_update() {
+    return heartbeat_need_update.load();
+  }
+  void heartbeat_set_peers_need_update() {
+    heartbeat_need_update.store(true);
+  }
+  void heartbeat_clear_peers_need_update() {
+    heartbeat_need_update.store(false);
+  }
+  void heartbeat();
+  void heartbeat_check();
+  void heartbeat_entry();
+  void need_heartbeat_peer_update();
+
+  void heartbeat_kick() {
+    Mutex::Locker l(heartbeat_lock);
+    heartbeat_cond.Signal();
+  }
+
+  struct T_Heartbeat : public Thread {
+    OSD *osd;
+    explicit T_Heartbeat(OSD *o) : osd(o) {}
+    void *entry() override {
+      osd->heartbeat_entry();
+      return 0;
+    }
+  } heartbeat_thread;
+
+public:
+  bool heartbeat_dispatch(Message *m);
+
+  struct HeartbeatDispatcher : public Dispatcher {
+    OSD *osd;
+    explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
+
+    bool ms_can_fast_dispatch_any() const override { return true; }
+    bool ms_can_fast_dispatch(const Message *m) const override {
+      switch (m->get_type()) {
+       case CEPH_MSG_PING:
+       case MSG_OSD_PING:
+          return true;
+       default:
+          return false;
+       }
+    }
+    void ms_fast_dispatch(Message *m) override {
+      osd->heartbeat_dispatch(m);
+    }
+    bool ms_dispatch(Message *m) override {
+      return osd->heartbeat_dispatch(m);
+    }
+    bool ms_handle_reset(Connection *con) override {
+      return osd->heartbeat_reset(con);
+    }
+    void ms_handle_remote_reset(Connection *con) override {}
+    bool ms_handle_refused(Connection *con) override {
+      return osd->ms_handle_refused(con);
+    }
+    bool ms_verify_authorizer(Connection *con, int peer_type,
+                             int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
+                             bool& isvalid, CryptoKey& session_key) override {
+      isvalid = true;
+      return true;
+    }
+  } heartbeat_dispatcher;
+
+private:
+  // -- waiters --
+  list<OpRequestRef> finished;
+  
+  void take_waiters(list<OpRequestRef>& ls) {
+    assert(osd_lock.is_locked());
+    finished.splice(finished.end(), ls);
+  }
+  void do_waiters();
+  
+  // -- op tracking --
+  OpTracker op_tracker;
+  void check_ops_in_flight();
+  void test_ops(std::string command, std::string args, ostream& ss);
+  friend class TestOpsSocketHook;
+  TestOpsSocketHook *test_ops_hook;
+  friend struct C_CompleteSplits;
+  friend struct C_OpenPGs;
+
+  // -- op queue --
+  enum class io_queue {
+    prioritized,
+    weightedpriority,
+    mclock_opclass,
+    mclock_client,
+  };
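+  // the queue implementation is chosen once at startup (op_queue below is
+  // const); in this tree the choice is driven by the osd_op_queue option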
+  friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+
+  const io_queue op_queue;
+  const unsigned int op_prio_cutoff;
+
+  /*
+   * The ordered op delivery chain is:
+   *
+   *   fast dispatch -> pqueue back
+   *                    pqueue front <-> to_process back
+   *                                     to_process front  -> RunVis(item)
+   *                                                      <- queue_front()
+   *
+   * The pqueue is per-shard, and to_process is per pg_slot.  Items can be
+   * pushed back up into to_process and/or pqueue while order is preserved.
+   *
+   * Multiple worker threads can operate on each shard.
+   *
+   * Under normal circumstances, num_running == to_process.size().  There are
+   * two times when that is not true: (1) when waiting_for_pg == true and
+   * to_process is accumulating requests that are waiting for the pg to be
+   * instantiated; in that case they will all get requeued together by
+   * wake_pg_waiters, and (2) when wake_pg_waiters just ran, cleared
+   * waiting_for_pg, and already requeued the items.
+   */
+  friend class PGQueueable;
+
+  class ShardedOpWQ
+    : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
+  {
+    struct ShardData {
+      Mutex sdata_lock;
+      Cond sdata_cond;
+
+      Mutex sdata_op_ordering_lock;   ///< protects all members below
+
+      OSDMapRef waiting_for_pg_osdmap;
+      struct pg_slot {
+       PGRef pg;                     ///< cached pg reference [optional]
+       list<PGQueueable> to_process; ///< order items for this slot
+       int num_running = 0;          ///< _process threads doing pg lookup/lock
+
+       /// true if pg does/did not exist. if so all new items go directly to
+       /// to_process.  cleared by prune_pg_waiters.
+       bool waiting_for_pg = false;
+
+       /// incremented by wake_pg_waiters; indicates racing _process threads
+       /// should bail out (their op has been requeued)
+       uint64_t requeue_seq = 0;
+      };
+
+      /// map of slots for each spg_t.  maintains ordering of items dequeued
+      /// from pqueue while _process thread drops shard lock to acquire the
+      /// pg lock.  slots are removed only by prune_pg_waiters.
+      unordered_map<spg_t,pg_slot> pg_slots;
+
+      /// priority queue
+      std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
+
+      void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
+       unsigned priority = item.second.get_priority();
+       unsigned cost = item.second.get_cost();
+       if (priority >= cutoff)
+         pqueue->enqueue_strict_front(
+           item.second.get_owner(),
+           priority, item);
+       else
+         pqueue->enqueue_front(
+           item.second.get_owner(),
+           priority, cost, item);
+      }
+
+      ShardData(
+       string lock_name, string ordering_lock,
+       uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
+       io_queue opqueue)
+       : sdata_lock(lock_name.c_str(), false, true, false, cct),
+         sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
+                                false, cct) {
+       if (opqueue == io_queue::weightedpriority) {
+         pqueue = std::unique_ptr
+           <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
+             new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
+               max_tok_per_prio, min_cost));
+       } else if (opqueue == io_queue::prioritized) {
+         pqueue = std::unique_ptr
+           <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
+             new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
+               max_tok_per_prio, min_cost));
+       } else if (opqueue == io_queue::mclock_opclass) {
+         pqueue = std::unique_ptr
+           <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
+       } else if (opqueue == io_queue::mclock_client) {
+         pqueue = std::unique_ptr
+           <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
+       }
+      }
+    }; // struct ShardData
+
+    vector<ShardData*> shard_list;
+    OSD *osd;
+    uint32_t num_shards;
+
+  public:
+    ShardedOpWQ(uint32_t pnum_shards,
+               OSD *o,
+               time_t ti,
+               time_t si,
+               ShardedThreadPool* tp)
+      : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
+        osd(o),
+        num_shards(pnum_shards) {
+      for (uint32_t i = 0; i < num_shards; i++) {
+       char lock_name[32] = {0};
+       snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
+       char order_lock[32] = {0};
+       snprintf(order_lock, sizeof(order_lock), "%s.%d",
+                "OSD:ShardedOpWQ:order:", i);
+       ShardData* one_shard = new ShardData(
+         lock_name, order_lock,
+         osd->cct->_conf->osd_op_pq_max_tokens_per_priority, 
+         osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
+       shard_list.push_back(one_shard);
+      }
+    }
+    ~ShardedOpWQ() override {
+      while (!shard_list.empty()) {
+       delete shard_list.back();
+       shard_list.pop_back();
+      }
+    }
+
+    /// wake any pg waiters after a PG is created/instantiated
+    void wake_pg_waiters(spg_t pgid);
+
+    /// prune ops (and possibly pg_slots) for pgs that shouldn't be here
+    void prune_pg_waiters(OSDMapRef osdmap, int whoami);
+
+    /// clear cached PGRef on pg deletion
+    void clear_pg_pointer(spg_t pgid);
+
+    /// clear pg_slots on shutdown
+    void clear_pg_slots();
+
+    /// try to do some work
+    void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
+
+    /// enqueue a new item
+    void _enqueue(pair <spg_t, PGQueueable> item) override;
+
+    /// requeue an old item (at the front of the line)
+    void _enqueue_front(pair <spg_t, PGQueueable> item) override;
+      
+    void return_waiting_threads() override {
+      for(uint32_t i = 0; i < num_shards; i++) {
+       ShardData* sdata = shard_list[i];
+       assert (NULL != sdata); 
+       sdata->sdata_lock.Lock();
+       sdata->sdata_cond.Signal();
+       sdata->sdata_lock.Unlock();
+      }
+    }
+
+    void dump(Formatter *f) {
+      for(uint32_t i = 0; i < num_shards; i++) {
+       ShardData* sdata = shard_list[i];
+       char lock_name[32] = {0};
+       snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
+       assert (NULL != sdata);
+       sdata->sdata_op_ordering_lock.Lock();
+       f->open_object_section(lock_name);
+       sdata->pqueue->dump(f);
+       f->close_section();
+       sdata->sdata_op_ordering_lock.Unlock();
+      }
+    }
+
+    /// Must be applied to ops in back-to-front queue order; out_ops then ends up front-to-back
+    struct Pred {
+      spg_t pgid;
+      list<OpRequestRef> *out_ops;
+      uint64_t reserved_pushes_to_free;
+      Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
+       : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
+      void accumulate(const PGQueueable &op) {
+       reserved_pushes_to_free += op.get_reserved_pushes();
+       if (out_ops) {
+         boost::optional<OpRequestRef> mop = op.maybe_get_op();
+         if (mop)
+           out_ops->push_front(*mop);
+       }
+      }
+      bool operator()(const pair<spg_t, PGQueueable> &op) {
+       if (op.first == pgid) {
+         accumulate(op.second);
+         return true;
+       } else {
+         return false;
+       }
+      }
+      uint64_t get_reserved_pushes_to_free() const {
+       return reserved_pushes_to_free;
+      }
+    };
+
+    bool is_shard_empty(uint32_t thread_index) override {
+      uint32_t shard_index = thread_index % num_shards; 
+      ShardData* sdata = shard_list[shard_index];
+      assert(NULL != sdata);
+      Mutex::Locker l(sdata->sdata_op_ordering_lock);
+      return sdata->pqueue->empty();
+    }
+  } op_shardedwq;
+
+
+  void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
+  void dequeue_op(
+    PGRef pg, OpRequestRef op,
+    ThreadPool::TPHandle &handle);
+
+  // -- peering queue --
+  struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
+    list<PG*> peering_queue;
+    OSD *osd;
+    set<PG*> in_use;
+    PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
+      : ThreadPool::BatchWorkQueue<PG>(
+       "OSD::PeeringWQ", ti, si, tp), osd(o) {}
+
+    void _dequeue(PG *pg) override {
+      for (list<PG*>::iterator i = peering_queue.begin();
+          i != peering_queue.end();
+          ) {
+       if (*i == pg) {
+         peering_queue.erase(i++);
+         pg->put("PeeringWQ");
+       } else {
+         ++i;
+       }
+      }
+    }
+    bool _enqueue(PG *pg) override {
+      pg->get("PeeringWQ");
+      peering_queue.push_back(pg);
+      return true;
+    }
+    bool _empty() override {
+      return peering_queue.empty();
+    }
+    void _dequeue(list<PG*> *out) override;
+    void _process(
+      const list<PG *> &pgs,
+      ThreadPool::TPHandle &handle) override {
+      assert(!pgs.empty());
+      osd->process_peering_events(pgs, handle);
+      for (list<PG *>::const_iterator i = pgs.begin();
+          i != pgs.end();
+          ++i) {
+       (*i)->put("PeeringWQ");
+      }
+    }
+    void _process_finish(const list<PG *> &pgs) override {
+      for (list<PG*>::const_iterator i = pgs.begin();
+          i != pgs.end();
+          ++i) {
+       in_use.erase(*i);
+      }
+    }
+    void _clear() override {
+      assert(peering_queue.empty());
+    }
+  } peering_wq;
+
+  void process_peering_events(
+    const list<PG*> &pg,
+    ThreadPool::TPHandle &handle);
+
+  friend class PG;
+  friend class PrimaryLogPG;
+
+
+ protected:
+
+  // -- osd map --
+  OSDMapRef       osdmap;
+  OSDMapRef get_osdmap() {
+    return osdmap;
+  }
+  epoch_t get_osdmap_epoch() const {
+    return osdmap ? osdmap->get_epoch() : 0;
+  }
+
+  utime_t         had_map_since;
+  RWLock          map_lock;
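+  /// ops waiting until a newer osdmap arrives (see wait_for_new_map())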
+  list<OpRequestRef>  waiting_for_osdmap;
+  deque<utime_t> osd_markdown_log;
+
+  friend struct send_map_on_destruct;
+
+  void wait_for_new_map(OpRequestRef op);
+  void handle_osd_map(class MOSDMap *m);
+  void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
+  void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
+  void note_down_osd(int osd);
+  void note_up_osd(int osd);
+  friend class C_OnMapCommit;
+
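+  // catch a pg up towards advance_to one map epoch at a time, collecting
+  // any children created by splits along the way in split_pgs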
+  bool advance_pg(
+    epoch_t advance_to, PG *pg,
+    ThreadPool::TPHandle &handle,
+    PG::RecoveryCtx *rctx,
+    set<PGRef> *split_pgs
+  );
+  void consume_map();
+  void activate_map();
+
+  // osd map cache (past osd maps)
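+  // thin wrappers around the shared map/bufferlist caches owned by
+  // OSDService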
+  OSDMapRef get_map(epoch_t e) {
+    return service.get_map(e);
+  }
+  OSDMapRef add_map(OSDMap *o) {
+    return service.add_map(o);
+  }
+  void add_map_bl(epoch_t e, bufferlist& bl) {
+    return service.add_map_bl(e, bl);
+  }
+  void pin_map_bl(epoch_t e, bufferlist &bl) {
+    return service.pin_map_bl(e, bl);
+  }
+  bool get_map_bl(epoch_t e, bufferlist& bl) {
+    return service.get_map_bl(e, bl);
+  }
+  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
+    return service.add_map_inc_bl(e, bl);
+  }
+  void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
+    return service.pin_map_inc_bl(e, bl);
+  }
+
+protected:
+  // -- placement groups --
+  RWLock pg_map_lock; // this lock orders *above* individual PG _locks
+  ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
+
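+  // pg creations held back, e.g. by the max-pg-per-osd limit; see
+  // maybe_wait_for_max_pg() and resume_creating_pg()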
+  std::mutex pending_creates_lock;
+  std::set<pg_t> pending_creates_from_osd;
+  unsigned pending_creates_from_mon = 0;
+
+  map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
+  PGRecoveryStats pg_recovery_stats;
+
+  PGPool _get_pool(int id, OSDMapRef createmap);
+
+  PG   *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
+  PG   *_lookup_lock_pg(spg_t pgid);
+
+public:
+  PG   *lookup_lock_pg(spg_t pgid);
+
+  int get_num_pgs() {
+    RWLock::RLocker l(pg_map_lock);
+    return pg_map.size();
+  }
+
+protected:
+  PG   *_open_lock_pg(OSDMapRef createmap,
+                     spg_t pg, bool no_lockdep_check=false);
+  enum res_result {
+    RES_PARENT,    // resurrected a parent
+    RES_SELF,      // resurrected self
+    RES_NONE       // nothing relevant is being deleted
+  };
+  res_result _try_resurrect_pg(
+    OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
+
+  PG   *_create_lock_pg(
+    OSDMapRef createmap,
+    spg_t pgid,
+    bool hold_map_lock,
+    bool backfill,
+    int role,
+    vector<int>& up, int up_primary,
+    vector<int>& acting, int acting_primary,
+    pg_history_t history,
+    const PastIntervals& pi,
+    ObjectStore::Transaction& t);
+
+  PG* _make_pg(OSDMapRef createmap, spg_t pgid);
+  void add_newly_split_pg(PG *pg,
+                         PG::RecoveryCtx *rctx);
+
+  int handle_pg_peering_evt(
+    spg_t pgid,
+    const pg_history_t& orig_history,
+    const PastIntervals& pi,
+    epoch_t epoch,
+    PG::CephPeeringEvtRef evt);
+  bool maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create);
+  void resume_creating_pg();
+
+  void load_pgs();
+  void build_past_intervals_parallel();
+
+  /// build initial pg history and intervals on create
+  void build_initial_pg_history(
+    spg_t pgid,
+    epoch_t created,
+    utime_t created_stamp,
+    pg_history_t *h,
+    PastIntervals *pi);
+
+  /// project pg history from epoch 'from' to now
+  bool project_pg_history(
+    spg_t pgid, pg_history_t& h, epoch_t from,
+    const vector<int>& lastup,
+    int lastupprimary,
+    const vector<int>& lastacting,
+    int lastactingprimary
+    ); ///< @return false if there was a map gap between from and now
+
+  // must be called, with pg->lock held, whenever a pg is added to pg_map
+  void wake_pg_waiters(PGRef pg) {
+    assert(pg->is_locked());
+    op_shardedwq.wake_pg_waiters(pg->info.pgid);
+  }
+  epoch_t last_pg_create_epoch;
+
+  void handle_pg_create(OpRequestRef op);
+
+  void split_pgs(
+    PG *parent,
+    const set<spg_t> &childpgids, set<PGRef> *out_pgs,
+    OSDMapRef curmap,
+    OSDMapRef nextmap,
+    PG::RecoveryCtx *rctx);
+
+  // == monitor interaction ==
+  Mutex mon_report_lock;
+  utime_t last_mon_report;
+  utime_t last_pg_stats_sent;
+
+  /* if our monitor dies, we want to notice it and reconnect.
+   * So we keep track of when it last acked our stat updates;
+   * if too much time passes (and we have kept sending updates)
+   * we call it dead and reconnect elsewhere.
+   */
+  utime_t last_pg_stats_ack;
+  float stats_ack_timeout;
+  set<uint64_t> outstanding_pg_stats; // tids of stat updates that haven't been acked yet
+
+  // -- boot --
+  void start_boot();
+  void _got_mon_epochs(epoch_t oldest, epoch_t newest);
+  void _preboot(epoch_t oldest, epoch_t newest);
+  void _send_boot();
+  void _collect_metadata(map<string,string> *pmeta);
+
+  void start_waiting_for_healthy();
+  bool _is_healthy();
+
+  void send_full_update();
+  
+  friend struct C_OSD_GetVersion;
+
+  // -- alive --
+  epoch_t up_thru_wanted;
+
+  void queue_want_up_thru(epoch_t want);
+  void send_alive();
+
+  // -- full map requests --
+  epoch_t requested_full_first, requested_full_last;
+
+  void request_full_map(epoch_t first, epoch_t last);
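+  // re-ask for the full-map range we were waiting on, e.g. after a mon
+  // session reset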
+  void rerequest_full_maps() {
+    epoch_t first = requested_full_first;
+    epoch_t last = requested_full_last;
+    requested_full_first = 0;
+    requested_full_last = 0;
+    request_full_map(first, last);
+  }
+  void got_full_map(epoch_t e);
+
+  // -- failures --
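+  // failure_queue: peers we intend to report as failed (keyed by osd id,
+  // value is when the failure was first noticed); failure_pending: reports
+  // already sent to the mon but not yet reflected in a map, kept so they
+  // can be re-queued or cancelled via send_still_alive()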
+  map<int,utime_t> failure_queue;
+  map<int,pair<utime_t,entity_inst_t> > failure_pending;
+
+  void requeue_failures();
+  void send_failures();
+  void send_still_alive(epoch_t epoch, const entity_inst_t &i);
+
+  // -- pg stats --
+  Mutex pg_stat_queue_lock;
+  Cond pg_stat_queue_cond;
+  xlist<PG*> pg_stat_queue;
+  bool osd_stat_updated;
+  uint64_t pg_stat_tid, pg_stat_tid_flushed;
+
+  void send_pg_stats(const utime_t &now);
+  void handle_pg_stats_ack(class MPGStatsAck *ack);
+  void flush_pg_stats();
+
+  ceph::coarse_mono_clock::time_point last_sent_beacon;
+  Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
+  epoch_t min_last_epoch_clean = 0;
+  // which pgs were scanned for min_lec
+  std::vector<pg_t> min_last_epoch_clean_pgs;
+  void send_beacon(const ceph::coarse_mono_clock::time_point& now);
+
+  void pg_stat_queue_enqueue(PG *pg) {
+    pg_stat_queue_lock.Lock();
+    if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
+      pg->get("pg_stat_queue");
+      pg_stat_queue.push_back(&pg->stat_queue_item);
+    }
+    osd_stat_updated = true;
+    pg_stat_queue_lock.Unlock();
+  }
+  void pg_stat_queue_dequeue(PG *pg) {
+    pg_stat_queue_lock.Lock();
+    if (pg->stat_queue_item.remove_myself())
+      pg->put("pg_stat_queue");
+    pg_stat_queue_lock.Unlock();
+  }
+  void clear_pg_stat_queue() {
+    pg_stat_queue_lock.Lock();
+    while (!pg_stat_queue.empty()) {
+      PG *pg = pg_stat_queue.front();
+      pg_stat_queue.pop_front();
+      pg->put("pg_stat_queue");
+    }
+    pg_stat_queue_lock.Unlock();
+  }
+  void clear_outstanding_pg_stats() {
+    Mutex::Locker l(pg_stat_queue_lock);
+    outstanding_pg_stats.clear();
+  }
+
+  ceph_tid_t get_tid() {
+    return service.get_tid();
+  }
+
+  // -- generic pg peering --
+  PG::RecoveryCtx create_context();
+  void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+                        ThreadPool::TPHandle *handle = NULL);
+  void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
+                                    ThreadPool::TPHandle *handle = NULL);
+  void do_notifies(map<int,
+                      vector<pair<pg_notify_t, PastIntervals> > >&
+                      notify_list,
+                  OSDMapRef map);
+  void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
+                 OSDMapRef map);
+  void do_infos(map<int,
+                   vector<pair<pg_notify_t, PastIntervals> > >& info_map,
+               OSDMapRef map);
+
+  bool require_mon_peer(const Message *m);
+  bool require_mon_or_mgr_peer(const Message *m);
+  bool require_osd_peer(const Message *m);
+  /**
+   * Verifies that we were alive in the given epoch, and that we
+   * still are.
+   */
+  bool require_self_aliveness(const Message *m, epoch_t alive_since);
+  /**
+   * Verifies that the OSD who sent the given op has the same
+   * address as in the given map.
+   * @pre op was sent by an OSD using the cluster messenger
+   */
+  bool require_same_peer_instance(const Message *m, OSDMapRef& map,
+                                 bool is_fast_dispatch);
+
+  bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
+                                bool is_fast_dispatch);
+
+  void handle_pg_query(OpRequestRef op);
+  void handle_pg_notify(OpRequestRef op);
+  void handle_pg_log(OpRequestRef op);
+  void handle_pg_info(OpRequestRef op);
+  void handle_pg_trim(OpRequestRef op);
+
+  void handle_pg_backfill_reserve(OpRequestRef op);
+  void handle_pg_recovery_reserve(OpRequestRef op);
+
+  void handle_force_recovery(Message *m);
+
+  void handle_pg_remove(OpRequestRef op);
+  void _remove_pg(PG *pg);
+
+  // -- commands --
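+  // mon/admin commands are queued here and executed one at a time, under
+  // osd_lock, by a CommandWQ worker (see _process() below)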
+  struct Command {
+    vector<string> cmd;
+    ceph_tid_t tid;
+    bufferlist indata;
+    ConnectionRef con;
+
+    Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
+      : cmd(c), tid(t), indata(bl), con(co) {}
+  };
+  list<Command*> command_queue;
+  struct CommandWQ : public ThreadPool::WorkQueue<Command> {
+    OSD *osd;
+    CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
+      : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
+
+    bool _empty() override {
+      return osd->command_queue.empty();
+    }
+    bool _enqueue(Command *c) override {
+      osd->command_queue.push_back(c);
+      return true;
+    }
+    void _dequeue(Command *pg) override {
+      ceph_abort();
+    }
+    Command *_dequeue() override {
+      if (osd->command_queue.empty())
+       return NULL;
+      Command *c = osd->command_queue.front();
+      osd->command_queue.pop_front();
+      return c;
+    }
+    void _process(Command *c, ThreadPool::TPHandle &) override {
+      osd->osd_lock.Lock();
+      if (osd->is_stopping()) {
+       osd->osd_lock.Unlock();
+       delete c;
+       return;
+      }
+      osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
+      osd->osd_lock.Unlock();
+      delete c;
+    }
+    void _clear() override {
+      while (!osd->command_queue.empty()) {
+       Command *c = osd->command_queue.front();
+       osd->command_queue.pop_front();
+       delete c;
+      }
+    }
+  } command_wq;
+
+  void handle_command(class MMonCommand *m);
+  void handle_command(class MCommand *m);
+  void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
+
+  // -- pg recovery --
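+  // run one round of recovery for the pg, limited to the pushes it reserved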
+  void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
+                  ThreadPool::TPHandle &handle);
+
+
+  // -- scrubbing --
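+  // scrub scheduling: sched_scrub() starts pending scrubs only when the
+  // random backoff, load, and time-of-day checks below all permit it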
+  void sched_scrub();
+  bool scrub_random_backoff();
+  bool scrub_load_below_threshold();
+  bool scrub_time_permit(utime_t now);
+
+  // -- removing --
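+  // background deletion of a PG's on-disk data after the pg has been
+  // removed from this OSD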
+  struct RemoveWQ :
+    public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
+    CephContext* cct;
+    ObjectStore *&store;
+    list<pair<PGRef, DeletingStateRef> > remove_queue;
+    RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
+            ThreadPool *tp)
+      : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
+       "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
+
+    bool _empty() override {
+      return remove_queue.empty();
+    }
+    void _enqueue(pair<PGRef, DeletingStateRef> item) override {
+      remove_queue.push_back(item);
+    }
+    void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
+      remove_queue.push_front(item);
+    }
+    bool _dequeue(pair<PGRef, DeletingStateRef> item) {
+      ceph_abort();
+    }
+    pair<PGRef, DeletingStateRef> _dequeue() override {
+      assert(!remove_queue.empty());
+      pair<PGRef, DeletingStateRef> item = remove_queue.front();
+      remove_queue.pop_front();
+      return item;
+    }
+    void _process(pair<PGRef, DeletingStateRef>,
+                 ThreadPool::TPHandle &) override;
+    void _clear() override {
+      remove_queue.clear();
+    }
+  } remove_wq;
+
+private:
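+  // fast dispatch: the message types listed in ms_can_fast_dispatch() are
+  // handled directly on the messenger thread, without taking osd_lock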
+  bool ms_can_fast_dispatch_any() const override { return true; }
+  bool ms_can_fast_dispatch(const Message *m) const override {
+    switch (m->get_type()) {
+    case CEPH_MSG_OSD_OP:
+    case CEPH_MSG_OSD_BACKOFF:
+    case MSG_OSD_SUBOP:
+    case MSG_OSD_REPOP:
+    case MSG_OSD_SUBOPREPLY:
+    case MSG_OSD_REPOPREPLY:
+    case MSG_OSD_PG_PUSH:
+    case MSG_OSD_PG_PULL:
+    case MSG_OSD_PG_PUSH_REPLY:
+    case MSG_OSD_PG_SCAN:
+    case MSG_OSD_PG_BACKFILL:
+    case MSG_OSD_PG_BACKFILL_REMOVE:
+    case MSG_OSD_EC_WRITE:
+    case MSG_OSD_EC_WRITE_REPLY:
+    case MSG_OSD_EC_READ:
+    case MSG_OSD_EC_READ_REPLY:
+    case MSG_OSD_SCRUB_RESERVE:
+    case MSG_OSD_REP_SCRUB:
+    case MSG_OSD_REP_SCRUBMAP:
+    case MSG_OSD_PG_UPDATE_LOG_MISSING:
+    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+    case MSG_OSD_PG_RECOVERY_DELETE:
+    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+      return true;
+    default:
+      return false;
+    }
+  }
+  void ms_fast_dispatch(Message *m) override;
+  void ms_fast_preprocess(Message *m) override;
+  bool ms_dispatch(Message *m) override;
+  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
+  bool ms_verify_authorizer(Connection *con, int peer_type,
+                           int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
+                           bool& isvalid, CryptoKey& session_key) override;
+  void ms_handle_connect(Connection *con) override;
+  void ms_handle_fast_connect(Connection *con) override;
+  void ms_handle_fast_accept(Connection *con) override;
+  bool ms_handle_reset(Connection *con) override;
+  void ms_handle_remote_reset(Connection *con) override {}
+  bool ms_handle_refused(Connection *con) override;
+
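+  // map the osd_op_queue config option to an op queue implementation;
+  // "debug_random" picks one at random (intended for testing only)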
+  io_queue get_io_queue() const {
+    if (cct->_conf->osd_op_queue == "debug_random") {
+      static io_queue index_lookup[] = { io_queue::prioritized,
+                                        io_queue::weightedpriority,
+                                        io_queue::mclock_opclass,
+                                        io_queue::mclock_client };
+      srand(time(NULL));
+      unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
+      return index_lookup[which];
+    } else if (cct->_conf->osd_op_queue == "prioritized") {
+      return io_queue::prioritized;
+    } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
+      return io_queue::mclock_opclass;
+    } else if (cct->_conf->osd_op_queue == "mclock_client") {
+      return io_queue::mclock_client;
+    } else {
+      // default / catch-all is 'wpq'
+      return io_queue::weightedpriority;
+    }
+  }
+
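+  // ops at or above the returned priority are queued strictly (ahead of
+  // cost-based scheduling); controlled by osd_op_queue_cut_off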
+  unsigned int get_io_prio_cut() const {
+    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+      srand(time(NULL));
+      return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+    } else if (cct->_conf->osd_op_queue_cut_off == "high") {
+      return CEPH_MSG_PRIO_HIGH;
+    } else {
+      // default / catch-all is 'low'
+      return CEPH_MSG_PRIO_LOW;
+    }
+  }
+
+ public:
+  /* internal and external can point to the same messenger; they will still
+   * be cleaned up properly. */
+  OSD(CephContext *cct_,
+      ObjectStore *store_,
+      int id,
+      Messenger *internal,
+      Messenger *external,
+      Messenger *hb_front_client,
+      Messenger *hb_back_client,
+      Messenger *hb_front_server,
+      Messenger *hb_back_server,
+      Messenger *osdc_messenger,
+      MonClient *mc, const std::string &dev, const std::string &jdev);
+  ~OSD() override;
+
+  // static bits
+  static int mkfs(CephContext *cct, ObjectStore *store,
+                 const string& dev,
+                 uuid_d fsid, int whoami);
+  /* remove any non-user xattrs from an xattr map */
+  void filter_xattrs(map<string, bufferptr>& attrs) {
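+    // user xattrs are stored with a leading '_'; drop anything else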
+    for (map<string, bufferptr>::iterator iter = attrs.begin();
+        iter != attrs.end();
+        ) {
+      if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
+       attrs.erase(iter++);
+      else ++iter;
+    }
+  }
+
+private:
+  int mon_cmd_maybe_osd_create(string &cmd);
+  int update_crush_device_class();
+  int update_crush_location();
+
+  static int write_meta(CephContext *cct,
+                       ObjectStore *store,
+                       uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
+
+  void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
+  void handle_scrub(struct MOSDScrub *m);
+  void handle_osd_ping(class MOSDPing *m);
+
+  int init_op_flags(OpRequestRef& op);
+
+  int get_num_op_shards();
+  int get_num_op_threads();
+
+  float get_osd_recovery_sleep();
+
+public:
+  static int peek_meta(ObjectStore *store, string& magic,
+                      uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);
+  
+
+  // startup/shutdown
+  int pre_init();
+  int init();
+  void final_init();
+
+  int enable_disable_fuse(bool stop);
+
+  void suicide(int exitcode);
+  int shutdown();
+
+  void handle_signal(int signum);
+
+  /// check if we can throw out op from a disconnected client
+  static bool op_is_discardable(const MOSDOp *m);
+
+public:
+  OSDService service;
+  friend class OSDService;
+};
+
+
+std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+
+
+// compatibility of the executable
+extern const CompatSet::Feature ceph_osd_feature_compat[];
+extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
+extern const CompatSet::Feature ceph_osd_feature_incompat[];
+
+#endif // CEPH_OSD_H