initial code repo
[stor4nfv.git] / src / ceph / src / os / bluestore / BlueStore.h
diff --git a/src/ceph/src/os/bluestore/BlueStore.h b/src/ceph/src/os/bluestore/BlueStore.h
new file mode 100644 (file)
index 0000000..57a8688
--- /dev/null
@@ -0,0 +1,2723 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_OSD_BLUESTORE_H
+#define CEPH_OSD_BLUESTORE_H
+
+#include "acconfig.h"
+
+#include <unistd.h>
+
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/unordered_set.hpp>
+#include <boost/intrusive/set.hpp>
+#include <boost/functional/hash.hpp>
+#include <boost/dynamic_bitset.hpp>
+
+#include "include/assert.h"
+#include "include/unordered_map.h"
+#include "include/memory.h"
+#include "include/mempool.h"
+#include "common/Finisher.h"
+#include "common/perf_counters.h"
+#include "compressor/Compressor.h"
+#include "os/ObjectStore.h"
+
+#include "bluestore_types.h"
+#include "BlockDevice.h"
+#include "common/EventTrace.h"
+
+class Allocator;
+class FreelistManager;
+class BlueFS;
+
+//#define DEBUG_CACHE
+//#define DEBUG_DEFERRED
+
+
+
+// constants for Buffer::optimize()
+#define MAX_BUFFER_SLOP_RATIO_DEN  8  // so actually 1/N
+
+
+enum {
+  l_bluestore_first = 732430,
+  l_bluestore_kv_flush_lat,
+  l_bluestore_kv_commit_lat,
+  l_bluestore_kv_lat,
+  l_bluestore_state_prepare_lat,
+  l_bluestore_state_aio_wait_lat,
+  l_bluestore_state_io_done_lat,
+  l_bluestore_state_kv_queued_lat,
+  l_bluestore_state_kv_committing_lat,
+  l_bluestore_state_kv_done_lat,
+  l_bluestore_state_deferred_queued_lat,
+  l_bluestore_state_deferred_aio_wait_lat,
+  l_bluestore_state_deferred_cleanup_lat,
+  l_bluestore_state_finishing_lat,
+  l_bluestore_state_done_lat,
+  l_bluestore_throttle_lat,
+  l_bluestore_submit_lat,
+  l_bluestore_commit_lat,
+  l_bluestore_read_lat,
+  l_bluestore_read_onode_meta_lat,
+  l_bluestore_read_wait_aio_lat,
+  l_bluestore_compress_lat,
+  l_bluestore_decompress_lat,
+  l_bluestore_csum_lat,
+  l_bluestore_compress_success_count,
+  l_bluestore_compress_rejected_count,
+  l_bluestore_write_pad_bytes,
+  l_bluestore_deferred_write_ops,
+  l_bluestore_deferred_write_bytes,
+  l_bluestore_write_penalty_read_ops,
+  l_bluestore_allocated,
+  l_bluestore_stored,
+  l_bluestore_compressed,
+  l_bluestore_compressed_allocated,
+  l_bluestore_compressed_original,
+  l_bluestore_onodes,
+  l_bluestore_onode_hits,
+  l_bluestore_onode_misses,
+  l_bluestore_onode_shard_hits,
+  l_bluestore_onode_shard_misses,
+  l_bluestore_extents,
+  l_bluestore_blobs,
+  l_bluestore_buffers,
+  l_bluestore_buffer_bytes,
+  l_bluestore_buffer_hit_bytes,
+  l_bluestore_buffer_miss_bytes,
+  l_bluestore_write_big,
+  l_bluestore_write_big_bytes,
+  l_bluestore_write_big_blobs,
+  l_bluestore_write_small,
+  l_bluestore_write_small_bytes,
+  l_bluestore_write_small_unused,
+  l_bluestore_write_small_deferred,
+  l_bluestore_write_small_pre_read,
+  l_bluestore_write_small_new,
+  l_bluestore_txc,
+  l_bluestore_onode_reshard,
+  l_bluestore_blob_split,
+  l_bluestore_extent_compress,
+  l_bluestore_gc_merged,
+  l_bluestore_last
+};
+
+class BlueStore : public ObjectStore,
+                 public md_config_obs_t {
+  // -----------------------------------------------------
+  // types
+public:
+  // config observer
+  const char** get_tracked_conf_keys() const override;
+  void handle_conf_change(const struct md_config_t *conf,
+                                  const std::set<std::string> &changed) override;
+
+  void _set_csum();
+  void _set_compression();
+  void _set_throttle_params();
+  int _set_cache_sizes();
+
+  class TransContext;
+
+  typedef map<uint64_t, bufferlist> ready_regions_t;
+
+  struct BufferSpace;
+  struct Collection;
+  typedef boost::intrusive_ptr<Collection> CollectionRef;
+
+  struct AioContext {
+    virtual void aio_finish(BlueStore *store) = 0;
+    virtual ~AioContext() {}
+  };
+
+  /// cached buffer
+  struct Buffer {
+    MEMPOOL_CLASS_HELPERS();
+
+    enum {
+      STATE_EMPTY,     ///< empty buffer -- used for cache history
+      STATE_CLEAN,     ///< clean data that is up to date
+      STATE_WRITING,   ///< data that is being written (io not yet complete)
+    };
+    static const char *get_state_name(int s) {
+      switch (s) {
+      case STATE_EMPTY: return "empty";
+      case STATE_CLEAN: return "clean";
+      case STATE_WRITING: return "writing";
+      default: return "???";
+      }
+    }
+    enum {
+      FLAG_NOCACHE = 1,  ///< trim when done WRITING (do not become CLEAN)
+      // NOTE: fix operator<< when you define a second flag
+    };
+    static const char *get_flag_name(int s) {
+      switch (s) {
+      case FLAG_NOCACHE: return "nocache";
+      default: return "???";
+      }
+    }
+
+    BufferSpace *space;
+    uint16_t state;             ///< STATE_*
+    uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
+    uint32_t flags;             ///< FLAG_*
+    uint64_t seq;
+    uint32_t offset, length;
+    bufferlist data;
+
+    boost::intrusive::list_member_hook<> lru_item;
+    boost::intrusive::list_member_hook<> state_item;
+
+    Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
+          unsigned f = 0)
+      : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
+    Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
+          unsigned f = 0)
+      : space(space), state(s), flags(f), seq(q), offset(o),
+       length(b.length()), data(b) {}
+
+    bool is_empty() const {
+      return state == STATE_EMPTY;
+    }
+    bool is_clean() const {
+      return state == STATE_CLEAN;
+    }
+    bool is_writing() const {
+      return state == STATE_WRITING;
+    }
+
+    uint32_t end() const {
+      return offset + length;
+    }
+
+    void truncate(uint32_t newlen) {
+      assert(newlen < length);
+      if (data.length()) {
+       bufferlist t;
+       t.substr_of(data, 0, newlen);
+       data.claim(t);
+      }
+      length = newlen;
+    }
+    void maybe_rebuild() {
+      if (data.length() &&
+         (data.get_num_buffers() > 1 ||
+          data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
+       data.rebuild();
+      }
+    }
+
+    void dump(Formatter *f) const {
+      f->dump_string("state", get_state_name(state));
+      f->dump_unsigned("seq", seq);
+      f->dump_unsigned("offset", offset);
+      f->dump_unsigned("length", length);
+      f->dump_unsigned("data_length", data.length());
+    }
+  };
+
+  struct Cache;
+
+  /// map logical extent range (object) onto buffers
+  struct BufferSpace {
+    typedef boost::intrusive::list<
+      Buffer,
+      boost::intrusive::member_hook<
+        Buffer,
+       boost::intrusive::list_member_hook<>,
+       &Buffer::state_item> > state_list_t;
+
+    mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
+      buffer_map;
+
+    // we use a bare intrusive list here instead of std::map because
+    // it uses less memory and we expect this to be very small (very
+    // few IOs in flight to the same Blob at the same time).
+    state_list_t writing;   ///< writing buffers, sorted by seq, ascending
+
+    ~BufferSpace() {
+      assert(buffer_map.empty());
+      assert(writing.empty());
+    }
+
+    void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
+      cache->_audit("_add_buffer start");
+      buffer_map[b->offset].reset(b);
+      if (b->is_writing()) {
+       b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
+        if (writing.empty() || writing.rbegin()->seq <= b->seq) {
+          writing.push_back(*b);
+        } else {
+          auto it = writing.begin();
+          while (it->seq < b->seq) {
+            ++it;
+          }
+
+          assert(it->seq >= b->seq);
+          // note that this will insert b before it
+          // hence the order is maintained
+          writing.insert(it, *b);
+        }
+      } else {
+       b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
+       cache->_add_buffer(b, level, near);
+      }
+      cache->_audit("_add_buffer end");
+    }
+    void _rm_buffer(Cache* cache, Buffer *b) {
+      _rm_buffer(cache, buffer_map.find(b->offset));
+    }
+    void _rm_buffer(Cache* cache,
+                   map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
+      assert(p != buffer_map.end());
+      cache->_audit("_rm_buffer start");
+      if (p->second->is_writing()) {
+        writing.erase(writing.iterator_to(*p->second));
+      } else {
+       cache->_rm_buffer(p->second.get());
+      }
+      buffer_map.erase(p);
+      cache->_audit("_rm_buffer end");
+    }
+
+    map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
+      uint32_t offset) {
+      auto i = buffer_map.lower_bound(offset);
+      if (i != buffer_map.begin()) {
+       --i;
+       if (i->first + i->second->length <= offset)
+         ++i;
+      }
+      return i;
+    }
+
+    // must be called under protection of the Cache lock
+    void _clear(Cache* cache);
+
+    // return value is the highest cache_private of a trimmed buffer, or 0.
+    int discard(Cache* cache, uint32_t offset, uint32_t length) {
+      std::lock_guard<std::recursive_mutex> l(cache->lock);
+      return _discard(cache, offset, length);
+    }
+    int _discard(Cache* cache, uint32_t offset, uint32_t length);
+
+    void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
+              unsigned flags) {
+      std::lock_guard<std::recursive_mutex> l(cache->lock);
+      Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
+                            flags);
+      b->cache_private = _discard(cache, offset, bl.length());
+      _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
+    }
+    void finish_write(Cache* cache, uint64_t seq);
+    void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
+      std::lock_guard<std::recursive_mutex> l(cache->lock);
+      Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
+      b->cache_private = _discard(cache, offset, bl.length());
+      _add_buffer(cache, b, 1, nullptr);
+    }
+
+    void read(Cache* cache, uint32_t offset, uint32_t length,
+             BlueStore::ready_regions_t& res,
+             interval_set<uint32_t>& res_intervals);
+
+    void truncate(Cache* cache, uint32_t offset) {
+      discard(cache, offset, (uint32_t)-1 - offset);
+    }
+
+    void split(Cache* cache, size_t pos, BufferSpace &r);
+
+    void dump(Cache* cache, Formatter *f) const {
+      std::lock_guard<std::recursive_mutex> l(cache->lock);
+      f->open_array_section("buffers");
+      for (auto& i : buffer_map) {
+       f->open_object_section("buffer");
+       assert(i.first == i.second->offset);
+       i.second->dump(f);
+       f->close_section();
+      }
+      f->close_section();
+    }
+  };
+
+  struct SharedBlobSet;
+
+  /// in-memory shared blob state (incl cached buffers)
+  struct SharedBlob {
+    MEMPOOL_CLASS_HELPERS();
+
+    std::atomic_int nref = {0}; ///< reference count
+    bool loaded = false;
+
+    CollectionRef coll;
+    union {
+      uint64_t sbid_unloaded;              ///< sbid if persistent isn't loaded
+      bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
+    };
+    BufferSpace bc;             ///< buffer cache
+
+    SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
+      if (get_cache()) {
+       get_cache()->add_blob();
+      }
+    }
+    SharedBlob(uint64_t i, Collection *_coll);
+    ~SharedBlob();
+
+    uint64_t get_sbid() const {
+      return loaded ? persistent->sbid : sbid_unloaded;
+    }
+
+    friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
+    friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
+
+    friend ostream& operator<<(ostream& out, const SharedBlob& sb);
+
+    void get() {
+      ++nref;
+    }
+    void put();
+
+    /// get logical references
+    void get_ref(uint64_t offset, uint32_t length);
+
+    /// put logical references, and get back any released extents
+    void put_ref(uint64_t offset, uint32_t length,
+                PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
+
+    friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
+      return l.get_sbid() == r.get_sbid();
+    }
+    inline Cache* get_cache() {
+      return coll ? coll->cache : nullptr;
+    }
+    inline SharedBlobSet* get_parent() {
+      return coll ? &(coll->shared_blob_set) : nullptr;
+    }
+    inline bool is_loaded() const {
+      return loaded;
+    }
+
+  };
+  typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
+
+  /// a lookup table of SharedBlobs
+  struct SharedBlobSet {
+    std::mutex lock;   ///< protect lookup, insertion, removal
+
+    // we use a bare pointer because we don't want to affect the ref
+    // count
+    mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
+
+    SharedBlobRef lookup(uint64_t sbid) {
+      std::lock_guard<std::mutex> l(lock);
+      auto p = sb_map.find(sbid);
+      if (p == sb_map.end()) {
+        return nullptr;
+      }
+      return p->second;
+    }
+
+    void add(Collection* coll, SharedBlob *sb) {
+      std::lock_guard<std::mutex> l(lock);
+      sb_map[sb->get_sbid()] = sb;
+      sb->coll = coll;
+    }
+
+    bool try_remove(SharedBlob *sb) {
+      std::lock_guard<std::mutex> l(lock);
+      if (sb->nref == 0) {
+       assert(sb->get_parent() == this);
+       sb_map.erase(sb->get_sbid());
+       return true;
+      }
+      return false;
+    }
+
+    void remove(SharedBlob *sb) {
+      std::lock_guard<std::mutex> l(lock);
+      assert(sb->get_parent() == this);
+      sb_map.erase(sb->get_sbid());
+    }
+
+    bool empty() {
+      std::lock_guard<std::mutex> l(lock);
+      return sb_map.empty();
+    }
+
+    void dump(CephContext *cct, int lvl);
+  };
+
+//#define CACHE_BLOB_BL  // not sure if this is a win yet or not... :/
+
+  /// in-memory blob metadata and associated cached buffers (if any)
+  struct Blob {
+    MEMPOOL_CLASS_HELPERS();
+
+    std::atomic_int nref = {0};     ///< reference count
+    int16_t id = -1;                ///< id, for spanning blobs only, >= 0
+    int16_t last_encoded_id = -1;   ///< (ephemeral) used during encoding only
+    SharedBlobRef shared_blob;      ///< shared blob state (if any)
+
+  private:
+    mutable bluestore_blob_t blob;  ///< decoded blob metadata
+#ifdef CACHE_BLOB_BL
+    mutable bufferlist blob_bl;     ///< cached encoded blob, blob is dirty if empty
+#endif
+    /// refs from this shard.  ephemeral if id<0, persisted if spanning.
+    bluestore_blob_use_tracker_t used_in_blob;
+
+  public:
+
+    friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
+    friend void intrusive_ptr_release(Blob *b) { b->put(); }
+
+    friend ostream& operator<<(ostream& out, const Blob &b);
+
+    const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
+      return used_in_blob;
+    }
+    bool is_referenced() const {
+      return used_in_blob.is_not_empty();
+    }
+    uint32_t get_referenced_bytes() const {
+      return used_in_blob.get_referenced_bytes();
+    }
+
+    bool is_spanning() const {
+      return id >= 0;
+    }
+
+    bool can_split() const {
+      std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
+      // splitting a BufferSpace writing list is too hard; don't try.
+      return shared_blob->bc.writing.empty() &&
+             used_in_blob.can_split() &&
+             get_blob().can_split();
+    }
+
+    bool can_split_at(uint32_t blob_offset) const {
+      return used_in_blob.can_split_at(blob_offset) &&
+             get_blob().can_split_at(blob_offset);
+    }
+
+    bool can_reuse_blob(uint32_t min_alloc_size,
+                       uint32_t target_blob_size,
+                       uint32_t b_offset,
+                       uint32_t *length0);
+
+    void dup(Blob& o) {
+      o.shared_blob = shared_blob;
+      o.blob = blob;
+#ifdef CACHE_BLOB_BL
+      o.blob_bl = blob_bl;
+#endif
+    }
+
+    inline const bluestore_blob_t& get_blob() const {
+      return blob;
+    }
+    inline bluestore_blob_t& dirty_blob() {
+#ifdef CACHE_BLOB_BL
+      blob_bl.clear();
+#endif
+      return blob;
+    }
+
+    /// discard buffers for unallocated regions
+    void discard_unallocated(Collection *coll);
+
+    /// get logical references
+    void get_ref(Collection *coll, uint32_t offset, uint32_t length);
+    /// put logical references, and get back any released extents
+    bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
+                PExtentVector *r);
+
+    /// split the blob
+    void split(Collection *coll, uint32_t blob_offset, Blob *o);
+
+    void get() {
+      ++nref;
+    }
+    void put() {
+      if (--nref == 0)
+       delete this;
+    }
+
+
+#ifdef CACHE_BLOB_BL
+    void _encode() const {
+      if (blob_bl.length() == 0 ) {
+       ::encode(blob, blob_bl);
+      } else {
+       assert(blob_bl.length());
+      }
+    }
+    void bound_encode(
+      size_t& p,
+      bool include_ref_map) const {
+      _encode();
+      p += blob_bl.length();
+      if (include_ref_map) {
+       used_in_blob.bound_encode(p);
+      }
+    }
+    void encode(
+      bufferlist::contiguous_appender& p,
+      bool include_ref_map) const {
+      _encode();
+      p.append(blob_bl);
+      if (include_ref_map) {
+       used_in_blob.encode(p);
+      }
+    }
+    void decode(
+      Collection */*coll*/,
+      bufferptr::iterator& p,
+      bool include_ref_map) {
+      const char *start = p.get_pos();
+      denc(blob, p);
+      const char *end = p.get_pos();
+      blob_bl.clear();
+      blob_bl.append(start, end - start);
+      if (include_ref_map) {
+       used_in_blob.decode(p);
+      }
+    }
+#else
+    void bound_encode(
+      size_t& p,
+      uint64_t struct_v,
+      uint64_t sbid,
+      bool include_ref_map) const {
+      denc(blob, p, struct_v);
+      if (blob.is_shared()) {
+        denc(sbid, p);
+      }
+      if (include_ref_map) {
+       used_in_blob.bound_encode(p);
+      }
+    }
+    void encode(
+      bufferlist::contiguous_appender& p,
+      uint64_t struct_v,
+      uint64_t sbid,
+      bool include_ref_map) const {
+      denc(blob, p, struct_v);
+      if (blob.is_shared()) {
+        denc(sbid, p);
+      }
+      if (include_ref_map) {
+       used_in_blob.encode(p);
+      }
+    }
+    void decode(
+      Collection *coll,
+      bufferptr::iterator& p,
+      uint64_t struct_v,
+      uint64_t* sbid,
+      bool include_ref_map);
+#endif
+  };
+  typedef boost::intrusive_ptr<Blob> BlobRef;
+  typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
+
+  /// a logical extent, pointing to (some portion of) a blob
+  typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
+  struct Extent : public ExtentBase {
+    MEMPOOL_CLASS_HELPERS();
+
+    uint32_t logical_offset = 0;      ///< logical offset
+    uint32_t blob_offset = 0;         ///< blob offset
+    uint32_t length = 0;              ///< length
+    BlobRef  blob;                    ///< the blob with our data
+
+    /// ctor for lookup only
+    explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
+    /// ctor for delayed initialization (see decode_some())
+    explicit Extent() : ExtentBase() {
+    }
+    /// ctor for general usage
+    Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
+      : ExtentBase(),
+        logical_offset(lo), blob_offset(o), length(l) {
+      assign_blob(b);
+    }
+    ~Extent() {
+      if (blob) {
+       blob->shared_blob->get_cache()->rm_extent();
+      }
+    }
+
+    void assign_blob(const BlobRef& b) {
+      assert(!blob);
+      blob = b;
+      blob->shared_blob->get_cache()->add_extent();
+    }
+
+    // comparators for intrusive_set
+    friend bool operator<(const Extent &a, const Extent &b) {
+      return a.logical_offset < b.logical_offset;
+    }
+    friend bool operator>(const Extent &a, const Extent &b) {
+      return a.logical_offset > b.logical_offset;
+    }
+    friend bool operator==(const Extent &a, const Extent &b) {
+      return a.logical_offset == b.logical_offset;
+    }
+
+    uint32_t blob_start() const {
+      return logical_offset - blob_offset;
+    }
+
+    uint32_t blob_end() const {
+      return blob_start() + blob->get_blob().get_logical_length();
+    }
+
+    uint32_t logical_end() const {
+      return logical_offset + length;
+    }
+
+    // return true if any piece of the blob is out of
+    // the given range [o, o + l].
+    bool blob_escapes_range(uint32_t o, uint32_t l) const {
+      return blob_start() < o || blob_end() > o + l;
+    }
+  };
+  typedef boost::intrusive::set<Extent> extent_map_t;
+
+
+  friend ostream& operator<<(ostream& out, const Extent& e);
+
+  struct OldExtent {
+    boost::intrusive::list_member_hook<> old_extent_item;
+    Extent e;
+    PExtentVector r;
+    bool blob_empty; // flag to track the last removed extent that makes blob
+                     // empty - required to update compression stat properly
+    OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
+      : e(lo, o, l, b), blob_empty(false) {
+    }
+    static OldExtent* create(CollectionRef c,
+                             uint32_t lo,
+                            uint32_t o,
+                            uint32_t l,
+                            BlobRef& b);
+  };
+  typedef boost::intrusive::list<
+      OldExtent,
+      boost::intrusive::member_hook<
+        OldExtent,
+    boost::intrusive::list_member_hook<>,
+    &OldExtent::old_extent_item> > old_extent_map_t;
+
+  struct Onode;
+
+  /// a sharded extent map, mapping offsets to lextents to blobs
+  struct ExtentMap {
+    Onode *onode;
+    extent_map_t extent_map;        ///< map of Extents to Blobs
+    blob_map_t spanning_blob_map;   ///< blobs that span shards
+
+    struct Shard {
+      bluestore_onode_t::shard_info *shard_info = nullptr;
+      unsigned extents = 0;  ///< count extents in this shard
+      bool loaded = false;   ///< true if shard is loaded
+      bool dirty = false;    ///< true if shard is dirty and needs reencoding
+    };
+    mempool::bluestore_cache_other::vector<Shard> shards;    ///< shards
+
+    bufferlist inline_bl;    ///< cached encoded map, if unsharded; empty=>dirty
+
+    uint32_t needs_reshard_begin = 0;
+    uint32_t needs_reshard_end = 0;
+
+    bool needs_reshard() const {
+      return needs_reshard_end > needs_reshard_begin;
+    }
+    void clear_needs_reshard() {
+      needs_reshard_begin = needs_reshard_end = 0;
+    }
+    void request_reshard(uint32_t begin, uint32_t end) {
+      if (begin < needs_reshard_begin) {
+       needs_reshard_begin = begin;
+      }
+      if (end > needs_reshard_end) {
+       needs_reshard_end = end;
+      }
+    }
+
+    struct DeleteDisposer {
+      void operator()(Extent *e) { delete e; }
+    };
+
+    ExtentMap(Onode *o);
+    ~ExtentMap() {
+      extent_map.clear_and_dispose(DeleteDisposer());
+    }
+
+    void clear() {
+      extent_map.clear_and_dispose(DeleteDisposer());
+      shards.clear();
+      inline_bl.clear();
+      clear_needs_reshard();
+    }
+
+    bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
+                    unsigned *pn);
+    unsigned decode_some(bufferlist& bl);
+
+    void bound_encode_spanning_blobs(size_t& p);
+    void encode_spanning_blobs(bufferlist::contiguous_appender& p);
+    void decode_spanning_blobs(bufferptr::iterator& p);
+
+    BlobRef get_spanning_blob(int id) {
+      auto p = spanning_blob_map.find(id);
+      assert(p != spanning_blob_map.end());
+      return p->second;
+    }
+
+    void update(KeyValueDB::Transaction t, bool force);
+    decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
+    void reshard(
+      KeyValueDB *db,
+      KeyValueDB::Transaction t);
+
+    /// initialize Shards from the onode
+    void init_shards(bool loaded, bool dirty);
+
+    /// return index of shard containing offset
+    /// or -1 if not found
+    int seek_shard(uint32_t offset) {
+      size_t end = shards.size();
+      size_t mid, left = 0;
+      size_t right = end; // one passed the right end
+
+      while (left < right) {
+        mid = left + (right - left) / 2;
+        if (offset >= shards[mid].shard_info->offset) {
+          size_t next = mid + 1;
+          if (next >= end || offset < shards[next].shard_info->offset)
+            return mid;
+          //continue to search forwards
+          left = next;
+        } else {
+          //continue to search backwards
+          right = mid;
+        }
+      }
+
+      return -1; // not found
+    }
+
+    /// check if a range spans a shard
+    bool spans_shard(uint32_t offset, uint32_t length) {
+      if (shards.empty()) {
+       return false;
+      }
+      int s = seek_shard(offset);
+      assert(s >= 0);
+      if (s == (int)shards.size() - 1) {
+       return false; // last shard
+      }
+      if (offset + length <= shards[s+1].shard_info->offset) {
+       return false;
+      }
+      return true;
+    }
+
+    /// ensure that a range of the map is loaded
+    void fault_range(KeyValueDB *db,
+                    uint32_t offset, uint32_t length);
+
+    /// ensure a range of the map is marked dirty
+    void dirty_range(uint32_t offset, uint32_t length);
+
+    /// for seek_lextent test
+    extent_map_t::iterator find(uint64_t offset);
+
+    /// seek to the first lextent including or after offset
+    extent_map_t::iterator seek_lextent(uint64_t offset);
+    extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
+
+    /// add a new Extent
+    void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
+      extent_map.insert(*new Extent(lo, o, l, b));
+    }
+
+    /// remove (and delete) an Extent
+    void rm(extent_map_t::iterator p) {
+      extent_map.erase_and_dispose(p, DeleteDisposer());
+    }
+
+    bool has_any_lextents(uint64_t offset, uint64_t length);
+
+    /// consolidate adjacent lextents in extent_map
+    int compress_extent_map(uint64_t offset, uint64_t length);
+
+    /// punch a logical hole.  add lextents to deref to target list.
+    void punch_hole(CollectionRef &c,
+                   uint64_t offset, uint64_t length,
+                   old_extent_map_t *old_extents);
+
+    /// put new lextent into lextent_map overwriting existing ones if
+    /// any and update references accordingly
+    Extent *set_lextent(CollectionRef &c,
+                       uint64_t logical_offset,
+                       uint64_t offset, uint64_t length,
+                        BlobRef b,
+                       old_extent_map_t *old_extents);
+
+    /// split a blob (and referring extents)
+    BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
+  };
+
+  /// Compressed Blob Garbage collector
+  /*
+  The primary idea of the collector is to estimate a difference between
+  allocation units(AU) currently present for compressed blobs and new AUs
+  required to store that data uncompressed. 
+  Estimation is performed for protrusive extents within a logical range
+  determined by a concatenation of old_extents collection and specific(current)
+  write request.
+  The root cause for old_extents use is the need to handle blob ref counts
+  properly. Old extents still hold blob refs and hence we need to traverse
+  the collection to determine if blob to be released.
+  Protrusive extents are extents that fit into the blob set in action
+  (ones that are below the logical range from above) but not removed totally
+  due to the current write. 
+  E.g. for
+  extent1 <loffs = 100, boffs = 100, len  = 100> -> 
+    blob1<compressed, len_on_disk=4096, logical_len=8192>
+  extent2 <loffs = 200, boffs = 200, len  = 100> ->
+    blob2<raw, len_on_disk=4096, llen=4096>
+  extent3 <loffs = 300, boffs = 300, len  = 100> ->
+    blob1<compressed, len_on_disk=4096, llen=8192>
+  extent4 <loffs = 4096, boffs = 0, len  = 100>  ->
+    blob3<raw, len_on_disk=4096, llen=4096>
+  write(300~100)
+  protrusive extents are within the following ranges <0~300, 400~8192-400>
+  In this case existing AUs that might be removed due to GC (i.e. blob1) 
+  use 2x4K bytes.
+  And new AUs expected after GC = 0 since extent1 to be merged into blob2.
+  Hence we should do a collect.
+  */
+  class GarbageCollector
+  {
+  public:
+    /// return amount of allocation units that might be saved due to GC
+    int64_t estimate(
+      uint64_t offset,
+      uint64_t length,
+      const ExtentMap& extent_map,
+      const old_extent_map_t& old_extents,
+      uint64_t min_alloc_size);
+
+    /// return a collection of extents to perform GC on
+    const vector<AllocExtent>& get_extents_to_collect() const {
+      return extents_to_collect;
+    }
+    GarbageCollector(CephContext* _cct) : cct(_cct) {}
+
+  private:
+    struct BlobInfo {
+      uint64_t referenced_bytes = 0;    ///< amount of bytes referenced in blob
+      int64_t expected_allocations = 0; ///< new alloc units required 
+                                        ///< in case of gc fulfilled
+      bool collect_candidate = false;   ///< indicate if blob has any extents 
+                                        ///< eligible for GC.
+      extent_map_t::const_iterator first_lextent; ///< points to the first 
+                                                  ///< lextent referring to 
+                                                  ///< the blob if any.
+                                                  ///< collect_candidate flag 
+                                                  ///< determines the validity
+      extent_map_t::const_iterator last_lextent;  ///< points to the last 
+                                                  ///< lextent referring to 
+                                                  ///< the blob if any.
+
+      BlobInfo(uint64_t ref_bytes) :
+        referenced_bytes(ref_bytes) {
+      }
+    };
+    CephContext* cct;
+    map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
+                                         ///< copies that are affected by the
+                                         ///< specific write
+
+    vector<AllocExtent> extents_to_collect; ///< protrusive extents that should
+                                            ///< be collected if GC takes place
+
+    boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
+                                                ///<  unit when traversing 
+                                                ///< protrusive extents. 
+                                                ///< Other extents mapped to
+                                                ///< this AU to be ignored 
+                                                ///< (except the case where
+                                                ///< uncompressed extent follows
+                                                ///< compressed one - see below).
+    BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
+                                           ///< caused expected_allocations
+                                          ///< counter increment at this blob.
+                                           ///< if uncompressed extent follows 
+                                           ///< a decrement for the 
+                                          ///< expected_allocations counter 
+                                           ///< is needed
+    int64_t expected_allocations = 0;      ///< new alloc units required in case
+                                           ///< of gc fulfilled
+    int64_t expected_for_release = 0;      ///< alloc units currently used by
+                                           ///< compressed blobs that might
+                                           ///< gone after GC
+    uint64_t gc_start_offset;              ///starting offset for GC
+    uint64_t gc_end_offset;                ///ending offset for GC
+
+  protected:
+    void process_protrusive_extents(const BlueStore::ExtentMap& extent_map, 
+                                   uint64_t start_offset,
+                                   uint64_t end_offset,
+                                   uint64_t start_touch_offset,
+                                   uint64_t end_touch_offset,
+                                   uint64_t min_alloc_size);
+  };
+
+  struct OnodeSpace;
+
+  /// an in-memory object
+  struct Onode {
+    MEMPOOL_CLASS_HELPERS();
+
+    std::atomic_int nref;  ///< reference count
+    Collection *c;
+
+    ghobject_t oid;
+
+    /// key under PREFIX_OBJ where we are stored
+    mempool::bluestore_cache_other::string key;
+
+    boost::intrusive::list_member_hook<> lru_item;
+
+    bluestore_onode_t onode;  ///< metadata stored as value in kv store
+    bool exists;              ///< true if object logically exists
+
+    ExtentMap extent_map;
+
+    // track txc's that have not been committed to kv store (and whose
+    // effects cannot be read via the kvdb read methods)
+    std::atomic<int> flushing_count = {0};
+    std::mutex flush_lock;  ///< protect flush_txns
+    std::condition_variable flush_cond;   ///< wait here for uncommitted txns
+
+    Onode(Collection *c, const ghobject_t& o,
+         const mempool::bluestore_cache_other::string& k)
+      : nref(0),
+       c(c),
+       oid(o),
+       key(k),
+       exists(false),
+       extent_map(this) {
+    }
+
+    void flush();
+    void get() {
+      ++nref;
+    }
+    void put() {
+      if (--nref == 0)
+       delete this;
+    }
+  };
+  typedef boost::intrusive_ptr<Onode> OnodeRef;
+
+
+  /// a cache (shard) of onodes and buffers
+  struct Cache {
+    CephContext* cct;
+    PerfCounters *logger;
+    std::recursive_mutex lock;          ///< protect lru and other structures
+
+    std::atomic<uint64_t> num_extents = {0};
+    std::atomic<uint64_t> num_blobs = {0};
+
+    static Cache *create(CephContext* cct, string type, PerfCounters *logger);
+
+    Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
+    virtual ~Cache() {}
+
+    virtual void _add_onode(OnodeRef& o, int level) = 0;
+    virtual void _rm_onode(OnodeRef& o) = 0;
+    virtual void _touch_onode(OnodeRef& o) = 0;
+
+    virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0;
+    virtual void _rm_buffer(Buffer *b) = 0;
+    virtual void _move_buffer(Cache *src, Buffer *b) = 0;
+    virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0;
+    virtual void _touch_buffer(Buffer *b) = 0;
+
+    virtual uint64_t _get_num_onodes() = 0;
+    virtual uint64_t _get_buffer_bytes() = 0;
+
+    void add_extent() {
+      ++num_extents;
+    }
+    void rm_extent() {
+      --num_extents;
+    }
+
+    void add_blob() {
+      ++num_blobs;
+    }
+    void rm_blob() {
+      --num_blobs;
+    }
+
+    void trim(uint64_t target_bytes,
+             float target_meta_ratio,
+             float target_data_ratio,
+             float bytes_per_onode);
+
+    void trim_all();
+
+    virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0;
+
+    virtual void add_stats(uint64_t *onodes, uint64_t *extents,
+                          uint64_t *blobs,
+                          uint64_t *buffers,
+                          uint64_t *bytes) = 0;
+
+    bool empty() {
+      std::lock_guard<std::recursive_mutex> l(lock);
+      return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
+    }
+
+#ifdef DEBUG_CACHE
+    virtual void _audit(const char *s) = 0;
+#else
+    void _audit(const char *s) { /* no-op */ }
+#endif
+  };
+
+  /// simple LRU cache for onodes and buffers
+  struct LRUCache : public Cache {
+  private:
+    typedef boost::intrusive::list<
+      Onode,
+      boost::intrusive::member_hook<
+        Onode,
+       boost::intrusive::list_member_hook<>,
+       &Onode::lru_item> > onode_lru_list_t;
+    typedef boost::intrusive::list<
+      Buffer,
+      boost::intrusive::member_hook<
+       Buffer,
+       boost::intrusive::list_member_hook<>,
+       &Buffer::lru_item> > buffer_lru_list_t;
+
+    onode_lru_list_t onode_lru;
+
+    buffer_lru_list_t buffer_lru;
+    uint64_t buffer_size = 0;
+
+  public:
+    LRUCache(CephContext* cct) : Cache(cct) {}
+    uint64_t _get_num_onodes() override {
+      return onode_lru.size();
+    }
+    void _add_onode(OnodeRef& o, int level) override {
+      if (level > 0)
+       onode_lru.push_front(*o);
+      else
+       onode_lru.push_back(*o);
+    }
+    void _rm_onode(OnodeRef& o) override {
+      auto q = onode_lru.iterator_to(*o);
+      onode_lru.erase(q);
+    }
+    void _touch_onode(OnodeRef& o) override;
+
+    uint64_t _get_buffer_bytes() override {
+      return buffer_size;
+    }
+    void _add_buffer(Buffer *b, int level, Buffer *near) override {
+      if (near) {
+       auto q = buffer_lru.iterator_to(*near);
+       buffer_lru.insert(q, *b);
+      } else if (level > 0) {
+       buffer_lru.push_front(*b);
+      } else {
+       buffer_lru.push_back(*b);
+      }
+      buffer_size += b->length;
+    }
+    void _rm_buffer(Buffer *b) override {
+      assert(buffer_size >= b->length);
+      buffer_size -= b->length;
+      auto q = buffer_lru.iterator_to(*b);
+      buffer_lru.erase(q);
+    }
+    void _move_buffer(Cache *src, Buffer *b) override {
+      src->_rm_buffer(b);
+      _add_buffer(b, 0, nullptr);
+    }
+    void _adjust_buffer_size(Buffer *b, int64_t delta) override {
+      assert((int64_t)buffer_size + delta >= 0);
+      buffer_size += delta;
+    }
+    void _touch_buffer(Buffer *b) override {
+      auto p = buffer_lru.iterator_to(*b);
+      buffer_lru.erase(p);
+      buffer_lru.push_front(*b);
+      _audit("_touch_buffer end");
+    }
+
+    void _trim(uint64_t onode_max, uint64_t buffer_max) override;
+
+    void add_stats(uint64_t *onodes, uint64_t *extents,
+                  uint64_t *blobs,
+                  uint64_t *buffers,
+                  uint64_t *bytes) override {
+      std::lock_guard<std::recursive_mutex> l(lock);
+      *onodes += onode_lru.size();
+      *extents += num_extents;
+      *blobs += num_blobs;
+      *buffers += buffer_lru.size();
+      *bytes += buffer_size;
+    }
+
+#ifdef DEBUG_CACHE
+    void _audit(const char *s) override;
+#endif
+  };
+
+  // 2Q cache for buffers, LRU for onodes
+  struct TwoQCache : public Cache {
+  private:
+    // stick with LRU for onodes for now (fixme?)
+    typedef boost::intrusive::list<
+      Onode,
+      boost::intrusive::member_hook<
+        Onode,
+       boost::intrusive::list_member_hook<>,
+       &Onode::lru_item> > onode_lru_list_t;
+    typedef boost::intrusive::list<
+      Buffer,
+      boost::intrusive::member_hook<
+       Buffer,
+       boost::intrusive::list_member_hook<>,
+       &Buffer::lru_item> > buffer_list_t;
+
+    onode_lru_list_t onode_lru;
+
+    buffer_list_t buffer_hot;      ///< "Am" hot buffers
+    buffer_list_t buffer_warm_in;  ///< "A1in" newly warm buffers
+    buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted
+    uint64_t buffer_bytes = 0;     ///< bytes
+
+    enum {
+      BUFFER_NEW = 0,
+      BUFFER_WARM_IN,   ///< in buffer_warm_in
+      BUFFER_WARM_OUT,  ///< in buffer_warm_out
+      BUFFER_HOT,       ///< in buffer_hot
+      BUFFER_TYPE_MAX
+    };
+
+    uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type
+
+  public:
+    TwoQCache(CephContext* cct) : Cache(cct) {}
+    uint64_t _get_num_onodes() override {
+      return onode_lru.size();
+    }
+    void _add_onode(OnodeRef& o, int level) override {
+      if (level > 0)
+       onode_lru.push_front(*o);
+      else
+       onode_lru.push_back(*o);
+    }
+    void _rm_onode(OnodeRef& o) override {
+      auto q = onode_lru.iterator_to(*o);
+      onode_lru.erase(q);
+    }
+    void _touch_onode(OnodeRef& o) override;
+
+    uint64_t _get_buffer_bytes() override {
+      return buffer_bytes;
+    }
+    void _add_buffer(Buffer *b, int level, Buffer *near) override;
+    void _rm_buffer(Buffer *b) override;
+    void _move_buffer(Cache *src, Buffer *b) override;
+    void _adjust_buffer_size(Buffer *b, int64_t delta) override;
+    void _touch_buffer(Buffer *b) override {
+      switch (b->cache_private) {
+      case BUFFER_WARM_IN:
+       // do nothing (somewhat counter-intuitively!)
+       break;
+      case BUFFER_WARM_OUT:
+       // move from warm_out to hot LRU
+       assert(0 == "this happens via discard hint");
+       break;
+      case BUFFER_HOT:
+       // move to front of hot LRU
+       buffer_hot.erase(buffer_hot.iterator_to(*b));
+       buffer_hot.push_front(*b);
+       break;
+      }
+      _audit("_touch_buffer end");
+    }
+
+    void _trim(uint64_t onode_max, uint64_t buffer_max) override;
+
+    void add_stats(uint64_t *onodes, uint64_t *extents,
+                  uint64_t *blobs,
+                  uint64_t *buffers,
+                  uint64_t *bytes) override {
+      std::lock_guard<std::recursive_mutex> l(lock);
+      *onodes += onode_lru.size();
+      *extents += num_extents;
+      *blobs += num_blobs;
+      *buffers += buffer_hot.size() + buffer_warm_in.size();
+      *bytes += buffer_bytes;
+    }
+
+#ifdef DEBUG_CACHE
+    void _audit(const char *s) override;
+#endif
+  };
+
+  struct OnodeSpace {
+  private:
+    Cache *cache;
+
+    /// forward lookups
+    mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
+
+    friend class Collection; // for split_cache()
+
+  public:
+    OnodeSpace(Cache *c) : cache(c) {}
+    ~OnodeSpace() {
+      clear();
+    }
+
+    OnodeRef add(const ghobject_t& oid, OnodeRef o);
+    OnodeRef lookup(const ghobject_t& o);
+    void remove(const ghobject_t& oid) {
+      onode_map.erase(oid);
+    }
+    void rename(OnodeRef& o, const ghobject_t& old_oid,
+               const ghobject_t& new_oid,
+               const mempool::bluestore_cache_other::string& new_okey);
+    void clear();
+    bool empty();
+
+    void dump(CephContext *cct, int lvl);
+
+    /// return true if f true for any item
+    bool map_any(std::function<bool(OnodeRef)> f);
+  };
+
+  struct Collection : public CollectionImpl {
+    BlueStore *store;
+    Cache *cache;       ///< our cache shard
+    coll_t cid;
+    bluestore_cnode_t cnode;
+    RWLock lock;
+
+    bool exists;
+
+    SharedBlobSet shared_blob_set;      ///< open SharedBlobs
+
+    // cache onodes on a per-collection basis to avoid lock
+    // contention.
+    OnodeSpace onode_map;
+
+    //pool options
+    pool_opts_t pool_opts;
+
+    OnodeRef get_onode(const ghobject_t& oid, bool create);
+
+    // the terminology is confusing here, sorry!
+    //
+    //  blob_t     shared_blob_t
+    //  !shared    unused                -> open
+    //  shared     !loaded               -> open + shared
+    //  shared     loaded                -> open + shared + loaded
+    //
+    // i.e.,
+    //  open = SharedBlob is instantiated
+    //  shared = blob_t shared flag is set; SharedBlob is hashed.
+    //  loaded = SharedBlob::shared_blob_t is loaded from kv store
+    void open_shared_blob(uint64_t sbid, BlobRef b);
+    void load_shared_blob(SharedBlobRef sb);
+    void make_blob_shared(uint64_t sbid, BlobRef b);
+    uint64_t make_blob_unshared(SharedBlob *sb);
+
+    BlobRef new_blob() {
+      BlobRef b = new Blob();
+      b->shared_blob = new SharedBlob(this);
+      return b;
+    }
+
+    const coll_t &get_cid() override {
+      return cid;
+    }
+
+    bool contains(const ghobject_t& oid) {
+      if (cid.is_meta())
+       return oid.hobj.pool == -1;
+      spg_t spgid;
+      if (cid.is_pg(&spgid))
+       return
+         spgid.pgid.contains(cnode.bits, oid) &&
+         oid.shard_id == spgid.shard;
+      return false;
+    }
+
+    void split_cache(Collection *dest);
+
+    Collection(BlueStore *ns, Cache *ca, coll_t c);
+  };
+
+  class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
+    CollectionRef c;
+    OnodeRef o;
+    KeyValueDB::Iterator it;
+    string head, tail;
+  public:
+    OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
+    int seek_to_first() override;
+    int upper_bound(const string &after) override;
+    int lower_bound(const string &to) override;
+    bool valid() override;
+    int next(bool validate=true) override;
+    string key() override;
+    bufferlist value() override;
+    int status() override {
+      return 0;
+    }
+  };
+
+  class OpSequencer;
+  typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
+
+  struct volatile_statfs{
+    enum {
+      STATFS_ALLOCATED = 0,
+      STATFS_STORED,
+      STATFS_COMPRESSED_ORIGINAL,
+      STATFS_COMPRESSED,
+      STATFS_COMPRESSED_ALLOCATED,
+      STATFS_LAST
+    };
+    int64_t values[STATFS_LAST];
+    volatile_statfs() {
+      memset(this, 0, sizeof(volatile_statfs));
+    }
+    void reset() {
+      *this = volatile_statfs();
+    }
+    volatile_statfs& operator+=(const volatile_statfs& other) {
+      for (size_t i = 0; i < STATFS_LAST; ++i) {
+       values[i] += other.values[i];
+      }
+      return *this;
+    }
+    int64_t& allocated() {
+      return values[STATFS_ALLOCATED];
+    }
+    int64_t& stored() {
+      return values[STATFS_STORED];
+    }
+    int64_t& compressed_original() {
+      return values[STATFS_COMPRESSED_ORIGINAL];
+    }
+    int64_t& compressed() {
+      return values[STATFS_COMPRESSED];
+    }
+    int64_t& compressed_allocated() {
+      return values[STATFS_COMPRESSED_ALLOCATED];
+    }
+    bool is_empty() {
+      return values[STATFS_ALLOCATED] == 0 &&
+       values[STATFS_STORED] == 0 &&
+       values[STATFS_COMPRESSED] == 0 &&
+       values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
+       values[STATFS_COMPRESSED_ALLOCATED] == 0;
+    }
+    void decode(bufferlist::iterator& it) {
+      for (size_t i = 0; i < STATFS_LAST; i++) {
+       ::decode(values[i], it);
+      }
+    }
+
+    void encode(bufferlist& bl) {
+      for (size_t i = 0; i < STATFS_LAST; i++) {
+       ::encode(values[i], bl);
+      }
+    }
+  };
+
+  struct TransContext : public AioContext {
+    MEMPOOL_CLASS_HELPERS();
+
+    typedef enum {
+      STATE_PREPARE,
+      STATE_AIO_WAIT,
+      STATE_IO_DONE,
+      STATE_KV_QUEUED,     // queued for kv_sync_thread submission
+      STATE_KV_SUBMITTED,  // submitted to kv; not yet synced
+      STATE_KV_DONE,
+      STATE_DEFERRED_QUEUED,    // in deferred_queue (pending or running)
+      STATE_DEFERRED_CLEANUP,   // remove deferred kv record
+      STATE_DEFERRED_DONE,
+      STATE_FINISHING,
+      STATE_DONE,
+    } state_t;
+
+    state_t state = STATE_PREPARE;
+
+    const char *get_state_name() {
+      switch (state) {
+      case STATE_PREPARE: return "prepare";
+      case STATE_AIO_WAIT: return "aio_wait";
+      case STATE_IO_DONE: return "io_done";
+      case STATE_KV_QUEUED: return "kv_queued";
+      case STATE_KV_SUBMITTED: return "kv_submitted";
+      case STATE_KV_DONE: return "kv_done";
+      case STATE_DEFERRED_QUEUED: return "deferred_queued";
+      case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
+      case STATE_DEFERRED_DONE: return "deferred_done";
+      case STATE_FINISHING: return "finishing";
+      case STATE_DONE: return "done";
+      }
+      return "???";
+    }
+
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+    const char *get_state_latency_name(int state) {
+      switch (state) {
+      case l_bluestore_state_prepare_lat: return "prepare";
+      case l_bluestore_state_aio_wait_lat: return "aio_wait";
+      case l_bluestore_state_io_done_lat: return "io_done";
+      case l_bluestore_state_kv_queued_lat: return "kv_queued";
+      case l_bluestore_state_kv_committing_lat: return "kv_committing";
+      case l_bluestore_state_kv_done_lat: return "kv_done";
+      case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
+      case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
+      case l_bluestore_state_finishing_lat: return "finishing";
+      case l_bluestore_state_done_lat: return "done";
+      }
+      return "???";
+    }
+#endif
+
+    void log_state_latency(PerfCounters *logger, int state) {
+      utime_t lat, now = ceph_clock_now();
+      lat = now - last_stamp;
+      logger->tinc(state, lat);
+#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+      if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
+        double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
+        OID_ELAPSED("", usecs, get_state_latency_name(state));
+      }
+#endif
+      last_stamp = now;
+    }
+
+    OpSequencerRef osr;
+    boost::intrusive::list_member_hook<> sequencer_item;
+
+    uint64_t bytes = 0, cost = 0;
+
+    set<OnodeRef> onodes;     ///< these need to be updated/written
+    set<OnodeRef> modified_objects;  ///< objects we modified (and need a ref)
+    set<SharedBlobRef> shared_blobs;  ///< these need to be updated/written
+    set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
+
+    KeyValueDB::Transaction t; ///< then we will commit this
+    Context *oncommit = nullptr;         ///< signal on commit
+    Context *onreadable = nullptr;       ///< signal on readable
+    Context *onreadable_sync = nullptr;  ///< signal on readable
+    list<Context*> oncommits;  ///< more commit completions
+    list<CollectionRef> removed_collections; ///< colls we removed
+
+    boost::intrusive::list_member_hook<> deferred_queue_item;
+    bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
+
+    interval_set<uint64_t> allocated, released;
+    volatile_statfs statfs_delta;
+
+    IOContext ioc;
+    bool had_ios = false;  ///< true if we submitted IOs before our kv txn
+
+    uint64_t seq = 0;
+    utime_t start;
+    utime_t last_stamp;
+
+    uint64_t last_nid = 0;     ///< if non-zero, highest new nid we allocated
+    uint64_t last_blobid = 0;  ///< if non-zero, highest new blobid we allocated
+
+    explicit TransContext(CephContext* cct, OpSequencer *o)
+      : osr(o),
+       ioc(cct, this),
+       start(ceph_clock_now()) {
+      last_stamp = start;
+    }
+    ~TransContext() {
+      delete deferred_txn;
+    }
+
+    void write_onode(OnodeRef &o) {
+      onodes.insert(o);
+    }
+    void write_shared_blob(SharedBlobRef &sb) {
+      shared_blobs.insert(sb);
+    }
+    void unshare_blob(SharedBlob *sb) {
+      shared_blobs.erase(sb);
+    }
+
+    /// note we logically modified object (when onode itself is unmodified)
+    void note_modified_object(OnodeRef &o) {
+      // onode itself isn't written, though
+      modified_objects.insert(o);
+    }
+    void removed(OnodeRef& o) {
+      onodes.erase(o);
+      modified_objects.erase(o);
+    }
+
+    void aio_finish(BlueStore *store) override {
+      store->txc_aio_finish(this);
+    }
+  };
+
+  typedef boost::intrusive::list<
+    TransContext,
+    boost::intrusive::member_hook<
+      TransContext,
+      boost::intrusive::list_member_hook<>,
+      &TransContext::deferred_queue_item> > deferred_queue_t;
+
+  struct DeferredBatch : public AioContext {
+    OpSequencer *osr;
+    struct deferred_io {
+      bufferlist bl;    ///< data
+      uint64_t seq;     ///< deferred transaction seq
+    };
+    map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
+    deferred_queue_t txcs;           ///< txcs in this batch
+    IOContext ioc;                   ///< our aios
+    /// bytes of pending io for each deferred seq (may be 0)
+    map<uint64_t,int> seq_bytes;
+
+    void _discard(CephContext *cct, uint64_t offset, uint64_t length);
+    void _audit(CephContext *cct);
+
+    DeferredBatch(CephContext *cct, OpSequencer *osr)
+      : osr(osr), ioc(cct, this) {}
+
+    /// prepare a write
+    void prepare_write(CephContext *cct,
+                      uint64_t seq, uint64_t offset, uint64_t length,
+                      bufferlist::const_iterator& p);
+
+    void aio_finish(BlueStore *store) override {
+      store->_deferred_aio_finish(osr);
+    }
+  };
+
+  class OpSequencer : public Sequencer_impl {
+  public:
+    std::mutex qlock;
+    std::condition_variable qcond;
+    typedef boost::intrusive::list<
+      TransContext,
+      boost::intrusive::member_hook<
+        TransContext,
+       boost::intrusive::list_member_hook<>,
+       &TransContext::sequencer_item> > q_list_t;
+    q_list_t q;  ///< transactions
+
+    boost::intrusive::list_member_hook<> deferred_osr_queue_item;
+
+    DeferredBatch *deferred_running = nullptr;
+    DeferredBatch *deferred_pending = nullptr;
+
+    Sequencer *parent;
+    BlueStore *store;
+
+    uint64_t last_seq = 0;
+
+    std::atomic_int txc_with_unstable_io = {0};  ///< num txcs with unstable io
+
+    std::atomic_int kv_committing_serially = {0};
+
+    std::atomic_int kv_submitted_waiters = {0};
+
+    std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
+    std::atomic_bool zombie = {false};    ///< owning Sequencer has gone away
+
+    OpSequencer(CephContext* cct, BlueStore *store)
+      : Sequencer_impl(cct),
+       parent(NULL), store(store) {
+      store->register_osr(this);
+    }
+    ~OpSequencer() override {
+      assert(q.empty());
+      _unregister();
+    }
+
+    void discard() override {
+      // Note that we may have txc's in flight when the parent Sequencer
+      // goes away.  Reflect this with zombie==registered==true and let
+      // _osr_drain_all clean up later.
+      assert(!zombie);
+      zombie = true;
+      parent = nullptr;
+      bool empty;
+      {
+       std::lock_guard<std::mutex> l(qlock);
+       empty = q.empty();
+      }
+      if (empty) {
+       _unregister();
+      }
+    }
+
+    void _unregister() {
+      if (registered) {
+       store->unregister_osr(this);
+       registered = false;
+      }
+    }
+
+    void queue_new(TransContext *txc) {
+      std::lock_guard<std::mutex> l(qlock);
+      txc->seq = ++last_seq;
+      q.push_back(*txc);
+    }
+
+    void drain() {
+      std::unique_lock<std::mutex> l(qlock);
+      while (!q.empty())
+       qcond.wait(l);
+    }
+
+    void drain_preceding(TransContext *txc) {
+      std::unique_lock<std::mutex> l(qlock);
+      while (!q.empty() && &q.front() != txc)
+       qcond.wait(l);
+    }
+
+    bool _is_all_kv_submitted() {
+      // caller must hold qlock
+      if (q.empty()) {
+       return true;
+      }
+      TransContext *txc = &q.back();
+      if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
+       return true;
+      }
+      return false;
+    }
+
+    void flush() override {
+      std::unique_lock<std::mutex> l(qlock);
+      while (true) {
+       // set flag before the check because the condition
+       // may become true outside qlock, and we need to make
+       // sure those threads see waiters and signal qcond.
+       ++kv_submitted_waiters;
+       if (_is_all_kv_submitted()) {
+         return;
+       }
+       qcond.wait(l);
+       --kv_submitted_waiters;
+      }
+    }
+
+    bool flush_commit(Context *c) override {
+      std::lock_guard<std::mutex> l(qlock);
+      if (q.empty()) {
+       return true;
+      }
+      TransContext *txc = &q.back();
+      if (txc->state >= TransContext::STATE_KV_DONE) {
+       return true;
+      }
+      txc->oncommits.push_back(c);
+      return false;
+    }
+  };
+
+  typedef boost::intrusive::list<
+    OpSequencer,
+    boost::intrusive::member_hook<
+      OpSequencer,
+      boost::intrusive::list_member_hook<>,
+      &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
+
+  struct KVSyncThread : public Thread {
+    BlueStore *store;
+    explicit KVSyncThread(BlueStore *s) : store(s) {}
+    void *entry() override {
+      store->_kv_sync_thread();
+      return NULL;
+    }
+  };
+  struct KVFinalizeThread : public Thread {
+    BlueStore *store;
+    explicit KVFinalizeThread(BlueStore *s) : store(s) {}
+    void *entry() {
+      store->_kv_finalize_thread();
+      return NULL;
+    }
+  };
+
+  struct DBHistogram {
+    struct value_dist {
+      uint64_t count;
+      uint32_t max_len;
+    };
+
+    struct key_dist {
+      uint64_t count;
+      uint32_t max_len;
+      map<int, struct value_dist> val_map; ///< slab id to count, max length of value and key
+    };
+
+    map<string, map<int, struct key_dist> > key_hist;
+    map<int, uint64_t> value_hist;
+    int get_key_slab(size_t sz);
+    string get_key_slab_to_range(int slab);
+    int get_value_slab(size_t sz);
+    string get_value_slab_to_range(int slab);
+    void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
+                         const string &prefix, size_t key_size, size_t value_size);
+    void dump(Formatter *f);
+  };
+
+  // --------------------------------------------------------
+  // members
+private:
+  BlueFS *bluefs = nullptr;
+  unsigned bluefs_shared_bdev = 0;  ///< which bluefs bdev we are sharing
+  bool bluefs_single_shared_device = true;
+  utime_t bluefs_last_balance;
+
+  KeyValueDB *db = nullptr;
+  BlockDevice *bdev = nullptr;
+  std::string freelist_type;
+  FreelistManager *fm = nullptr;
+  Allocator *alloc = nullptr;
+  uuid_d fsid;
+  int path_fd = -1;  ///< open handle to $path
+  int fsid_fd = -1;  ///< open handle (locked) to $path/fsid
+  bool mounted = false;
+
+  RWLock coll_lock = {"BlueStore::coll_lock"};  ///< rwlock to protect coll_map
+  mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
+
+  vector<Cache*> cache_shards;
+
+  std::mutex osr_lock;              ///< protect osd_set
+  std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
+
+  std::atomic<uint64_t> nid_last = {0};
+  std::atomic<uint64_t> nid_max = {0};
+  std::atomic<uint64_t> blobid_last = {0};
+  std::atomic<uint64_t> blobid_max = {0};
+
+  Throttle throttle_bytes;          ///< submit to commit
+  Throttle throttle_deferred_bytes;  ///< submit to deferred complete
+
+  interval_set<uint64_t> bluefs_extents;  ///< block extents owned by bluefs
+  interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
+
+  std::mutex deferred_lock;
+  std::atomic<uint64_t> deferred_seq = {0};
+  deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
+  int deferred_queue_size = 0;         ///< num txc's queued across all osrs
+  atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
+  Finisher deferred_finisher;
+
+  int m_finisher_num = 1;
+  vector<Finisher*> finishers;
+
+  KVSyncThread kv_sync_thread;
+  std::mutex kv_lock;
+  std::condition_variable kv_cond;
+  bool _kv_only = false;
+  bool kv_sync_started = false;
+  bool kv_stop = false;
+  bool kv_finalize_started = false;
+  bool kv_finalize_stop = false;
+  deque<TransContext*> kv_queue;             ///< ready, already submitted
+  deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
+  deque<TransContext*> kv_committing;        ///< currently syncing
+  deque<DeferredBatch*> deferred_done_queue;   ///< deferred ios done
+  deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
+
+  KVFinalizeThread kv_finalize_thread;
+  std::mutex kv_finalize_lock;
+  std::condition_variable kv_finalize_cond;
+  deque<TransContext*> kv_committing_to_finalize;   ///< pending finalization
+  deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
+
+  PerfCounters *logger = nullptr;
+
+  std::mutex reap_lock;
+  list<CollectionRef> removed_collections;
+
+  RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
+  set<ghobject_t> debug_data_error_objects;
+  set<ghobject_t> debug_mdata_error_objects;
+
+  std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
+
+  uint64_t block_size = 0;     ///< block size of block device (power of 2)
+  uint64_t block_mask = 0;     ///< mask to get just the block offset
+  size_t block_size_order = 0; ///< bits to shift to get block size
+
+  uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
+  ///< bits for min_alloc_size
+  uint8_t min_alloc_size_order = 0;
+  static_assert(std::numeric_limits<uint8_t>::max() >
+               std::numeric_limits<decltype(min_alloc_size)>::digits,
+               "not enough bits for min_alloc_size");
+
+  ///< maximum allocation unit (power of 2)
+  std::atomic<uint64_t> max_alloc_size = {0};
+
+  ///< number threshold for forced deferred writes
+  std::atomic<int> deferred_batch_ops = {0};
+
+  ///< size threshold for forced deferred writes
+  std::atomic<uint64_t> prefer_deferred_size = {0};
+
+  ///< approx cost per io, in bytes
+  std::atomic<uint64_t> throttle_cost_per_io = {0};
+
+  std::atomic<Compressor::CompressionMode> comp_mode =
+    {Compressor::COMP_NONE}; ///< compression mode
+  CompressorRef compressor;
+  std::atomic<uint64_t> comp_min_blob_size = {0};
+  std::atomic<uint64_t> comp_max_blob_size = {0};
+
+  std::atomic<uint64_t> max_blob_size = {0};  ///< maximum blob size
+
+  uint64_t kv_ios = 0;
+  uint64_t kv_throttle_costs = 0;
+
+  // cache trim control
+  uint64_t cache_size = 0;      ///< total cache size
+  float cache_meta_ratio = 0;   ///< cache ratio dedicated to metadata
+  float cache_kv_ratio = 0;     ///< cache ratio dedicated to kv (e.g., rocksdb)
+  float cache_data_ratio = 0;   ///< cache ratio dedicated to object data
+
+  std::mutex vstatfs_lock;
+  volatile_statfs vstatfs;
+
+  struct MempoolThread : public Thread {
+    BlueStore *store;
+    Cond cond;
+    Mutex lock;
+    bool stop = false;
+  public:
+    explicit MempoolThread(BlueStore *s)
+      : store(s),
+       lock("BlueStore::MempoolThread::lock") {}
+    void *entry() override;
+    void init() {
+      assert(stop == false);
+      create("bstore_mempool");
+    }
+    void shutdown() {
+      lock.Lock();
+      stop = true;
+      cond.Signal();
+      lock.Unlock();
+      join();
+    }
+  } mempool_thread;
+
+  // --------------------------------------------------------
+  // private methods
+
+  void _init_logger();
+  void _shutdown_logger();
+  int _reload_logger();
+
+  int _open_path();
+  void _close_path();
+  int _open_fsid(bool create);
+  int _lock_fsid();
+  int _read_fsid(uuid_d *f);
+  int _write_fsid();
+  void _close_fsid();
+  void _set_alloc_sizes();
+  void _set_blob_size();
+
+  int _open_bdev(bool create);
+  void _close_bdev();
+  int _open_db(bool create);
+  void _close_db();
+  int _open_fm(bool create);
+  void _close_fm();
+  int _open_alloc();
+  void _close_alloc();
+  int _open_collections(int *errors=0);
+  void _close_collections();
+
+  int _setup_block_symlink_or_file(string name, string path, uint64_t size,
+                                  bool create);
+
+public:
+  static int _write_bdev_label(CephContext* cct,
+                              string path, bluestore_bdev_label_t label);
+  static int _read_bdev_label(CephContext* cct, string path,
+                             bluestore_bdev_label_t *label);
+private:
+  int _check_or_set_bdev_label(string path, uint64_t size, string desc,
+                              bool create);
+
+  int _open_super_meta();
+
+  void _open_statfs();
+
+  int _reconcile_bluefs_freespace();
+  int _balance_bluefs_freespace(PExtentVector *extents);
+  void _commit_bluefs_freespace(const PExtentVector& extents);
+
+  CollectionRef _get_collection(const coll_t& cid);
+  void _queue_reap_collection(CollectionRef& c);
+  void _reap_collections();
+  void _update_cache_logger();
+
+  void _assign_nid(TransContext *txc, OnodeRef o);
+  uint64_t _assign_blobid(TransContext *txc);
+
+  void _dump_onode(OnodeRef o, int log_level=30);
+  void _dump_extent_map(ExtentMap& em, int log_level=30);
+  void _dump_transaction(Transaction *t, int log_level = 30);
+
+  TransContext *_txc_create(OpSequencer *osr);
+  void _txc_update_store_statfs(TransContext *txc);
+  void _txc_add_transaction(TransContext *txc, Transaction *t);
+  void _txc_calc_cost(TransContext *txc);
+  void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
+  void _txc_state_proc(TransContext *txc);
+  void _txc_aio_submit(TransContext *txc);
+public:
+  void txc_aio_finish(void *p) {
+    _txc_state_proc(static_cast<TransContext*>(p));
+  }
+private:
+  void _txc_finish_io(TransContext *txc);
+  void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
+  void _txc_applied_kv(TransContext *txc);
+  void _txc_committed_kv(TransContext *txc);
+  void _txc_finish(TransContext *txc);
+  void _txc_release_alloc(TransContext *txc);
+
+  void _osr_drain_preceding(TransContext *txc);
+  void _osr_drain_all();
+  void _osr_unregister_all();
+
+  void _kv_start();
+  void _kv_stop();
+  void _kv_sync_thread();
+  void _kv_finalize_thread();
+
+  bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
+  void _deferred_queue(TransContext *txc);
+public:
+  void deferred_try_submit();
+private:
+  void _deferred_submit_unlock(OpSequencer *osr);
+  void _deferred_aio_finish(OpSequencer *osr);
+  int _deferred_replay();
+
+public:
+  using mempool_dynamic_bitset =
+    boost::dynamic_bitset<uint64_t,
+                         mempool::bluestore_fsck::pool_allocator<uint64_t>>;
+
+private:
+  int _fsck_check_extents(
+    const ghobject_t& oid,
+    const PExtentVector& extents,
+    bool compressed,
+    mempool_dynamic_bitset &used_blocks,
+    store_statfs_t& expected_statfs);
+
+  void _buffer_cache_write(
+    TransContext *txc,
+    BlobRef b,
+    uint64_t offset,
+    bufferlist& bl,
+    unsigned flags) {
+    b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
+                            flags);
+    txc->shared_blobs_written.insert(b->shared_blob);
+  }
+
+  int _collection_list(
+    Collection *c, const ghobject_t& start, const ghobject_t& end,
+    int max, vector<ghobject_t> *ls, ghobject_t *next);
+
+  template <typename T, typename F>
+  T select_option(const std::string& opt_name, T val1, F f) {
+    //NB: opt_name reserved for future use
+    boost::optional<T> val2 = f();
+    if (val2) {
+      return *val2;
+    }
+    return val1;
+  }
+
+  void _apply_padding(uint64_t head_pad,
+                     uint64_t tail_pad,
+                     bufferlist& padded);
+
+  // -- ondisk version ---
+public:
+  const int32_t latest_ondisk_format = 2;        ///< our version
+  const int32_t min_readable_ondisk_format = 1;  ///< what we can read
+  const int32_t min_compat_ondisk_format = 2;    ///< who can read us
+
+private:
+  int32_t ondisk_format = 0;  ///< value detected on mount
+
+  int _upgrade_super();  ///< upgrade (called during open_super)
+  void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
+
+  // --- public interface ---
+public:
+  BlueStore(CephContext *cct, const string& path);
+  BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for UT only
+  ~BlueStore() override;
+
+  string get_type() override {
+    return "bluestore";
+  }
+
+  bool needs_journal() override { return false; };
+  bool wants_journal() override { return false; };
+  bool allows_journal() override { return false; };
+
+  bool is_rotational() override;
+  bool is_journal_rotational() override;
+
+  string get_default_device_class() override {
+    string device_class;
+    map<string, string> metadata;
+    collect_metadata(&metadata);
+    auto it = metadata.find("bluestore_bdev_type");
+    if (it != metadata.end()) {
+      device_class = it->second;
+    }
+    return device_class;
+  }
+
+  static int get_block_device_fsid(CephContext* cct, const string& path,
+                                  uuid_d *fsid);
+
+  bool test_mount_in_use() override;
+
+private:
+  int _mount(bool kv_only);
+public:
+  int mount() override {
+    return _mount(false);
+  }
+  int umount() override;
+
+  int start_kv_only(KeyValueDB **pdb) {
+    int r = _mount(true);
+    if (r < 0)
+      return r;
+    *pdb = db;
+    return 0;
+  }
+
+  int write_meta(const std::string& key, const std::string& value) override;
+  int read_meta(const std::string& key, std::string *value) override;
+
+
+  int fsck(bool deep) override {
+    return _fsck(deep, false);
+  }
+  int repair(bool deep) override {
+    return _fsck(deep, true);
+  }
+  int _fsck(bool deep, bool repair);
+
+  void set_cache_shards(unsigned num) override;
+
+  int validate_hobject_key(const hobject_t &obj) const override {
+    return 0;
+  }
+  unsigned get_max_attr_name_length() override {
+    return 256;  // arbitrary; there is no real limit internally
+  }
+
+  int mkfs() override;
+  int mkjournal() override {
+    return 0;
+  }
+
+  void get_db_statistics(Formatter *f) override;
+  void generate_db_histogram(Formatter *f) override;
+  void _flush_cache();
+  void flush_cache() override;
+  void dump_perf_counters(Formatter *f) override {
+    f->open_object_section("perf_counters");
+    logger->dump_formatted(f, false);
+    f->close_section();
+  }
+
+  void register_osr(OpSequencer *osr) {
+    std::lock_guard<std::mutex> l(osr_lock);
+    osr_set.insert(osr);
+  }
+  void unregister_osr(OpSequencer *osr) {
+    std::lock_guard<std::mutex> l(osr_lock);
+    osr_set.erase(osr);
+  }
+
+public:
+  int statfs(struct store_statfs_t *buf) override;
+
+  void collect_metadata(map<string,string> *pm) override;
+
+  bool exists(const coll_t& cid, const ghobject_t& oid) override;
+  bool exists(CollectionHandle &c, const ghobject_t& oid) override;
+  int set_collection_opts(
+    const coll_t& cid,
+    const pool_opts_t& opts) override;
+  int stat(
+    const coll_t& cid,
+    const ghobject_t& oid,
+    struct stat *st,
+    bool allow_eio = false) override;
+  int stat(
+    CollectionHandle &c,
+    const ghobject_t& oid,
+    struct stat *st,
+    bool allow_eio = false) override;
+  int read(
+    const coll_t& cid,
+    const ghobject_t& oid,
+    uint64_t offset,
+    size_t len,
+    bufferlist& bl,
+    uint32_t op_flags = 0) override;
+  int read(
+    CollectionHandle &c,
+    const ghobject_t& oid,
+    uint64_t offset,
+    size_t len,
+    bufferlist& bl,
+    uint32_t op_flags = 0) override;
+  int _do_read(
+    Collection *c,
+    OnodeRef o,
+    uint64_t offset,
+    size_t len,
+    bufferlist& bl,
+    uint32_t op_flags = 0);
+
+private:
+  int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
+            uint64_t offset, size_t len, interval_set<uint64_t>& destset);
+public:
+  int fiemap(const coll_t& cid, const ghobject_t& oid,
+            uint64_t offset, size_t len, bufferlist& bl) override;
+  int fiemap(CollectionHandle &c, const ghobject_t& oid,
+            uint64_t offset, size_t len, bufferlist& bl) override;
+  int fiemap(const coll_t& cid, const ghobject_t& oid,
+            uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
+  int fiemap(CollectionHandle &c, const ghobject_t& oid,
+            uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
+
+
+  int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
+             bufferptr& value) override;
+  int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
+             bufferptr& value) override;
+
+  int getattrs(const coll_t& cid, const ghobject_t& oid,
+              map<string,bufferptr>& aset) override;
+  int getattrs(CollectionHandle &c, const ghobject_t& oid,
+              map<string,bufferptr>& aset) override;
+
+  int list_collections(vector<coll_t>& ls) override;
+
+  CollectionHandle open_collection(const coll_t &c) override;
+
+  bool collection_exists(const coll_t& c) override;
+  int collection_empty(const coll_t& c, bool *empty) override;
+  int collection_bits(const coll_t& c) override;
+
+  int collection_list(const coll_t& cid,
+                     const ghobject_t& start,
+                     const ghobject_t& end,
+                     int max,
+                     vector<ghobject_t> *ls, ghobject_t *next) override;
+  int collection_list(CollectionHandle &c,
+                     const ghobject_t& start,
+                     const ghobject_t& end,
+                     int max,
+                     vector<ghobject_t> *ls, ghobject_t *next) override;
+
+  int omap_get(
+    const coll_t& cid,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    bufferlist *header,      ///< [out] omap header
+    map<string, bufferlist> *out /// < [out] Key to value map
+    ) override;
+  int omap_get(
+    CollectionHandle &c,     ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    bufferlist *header,      ///< [out] omap header
+    map<string, bufferlist> *out /// < [out] Key to value map
+    ) override;
+
+  /// Get omap header
+  int omap_get_header(
+    const coll_t& cid,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    bufferlist *header,      ///< [out] omap header
+    bool allow_eio = false ///< [in] don't assert on eio
+    ) override;
+  int omap_get_header(
+    CollectionHandle &c,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    bufferlist *header,      ///< [out] omap header
+    bool allow_eio = false ///< [in] don't assert on eio
+    ) override;
+
+  /// Get keys defined on oid
+  int omap_get_keys(
+    const coll_t& cid,              ///< [in] Collection containing oid
+    const ghobject_t &oid, ///< [in] Object containing omap
+    set<string> *keys      ///< [out] Keys defined on oid
+    ) override;
+  int omap_get_keys(
+    CollectionHandle &c,              ///< [in] Collection containing oid
+    const ghobject_t &oid, ///< [in] Object containing omap
+    set<string> *keys      ///< [out] Keys defined on oid
+    ) override;
+
+  /// Get key values
+  int omap_get_values(
+    const coll_t& cid,                    ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    const set<string> &keys,     ///< [in] Keys to get
+    map<string, bufferlist> *out ///< [out] Returned keys and values
+    ) override;
+  int omap_get_values(
+    CollectionHandle &c,         ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    const set<string> &keys,     ///< [in] Keys to get
+    map<string, bufferlist> *out ///< [out] Returned keys and values
+    ) override;
+
+  /// Filters keys into out which are defined on oid
+  int omap_check_keys(
+    const coll_t& cid,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    const set<string> &keys, ///< [in] Keys to check
+    set<string> *out         ///< [out] Subset of keys defined on oid
+    ) override;
+  int omap_check_keys(
+    CollectionHandle &c,                ///< [in] Collection containing oid
+    const ghobject_t &oid,   ///< [in] Object containing omap
+    const set<string> &keys, ///< [in] Keys to check
+    set<string> *out         ///< [out] Subset of keys defined on oid
+    ) override;
+
+  ObjectMap::ObjectMapIterator get_omap_iterator(
+    const coll_t& cid,              ///< [in] collection
+    const ghobject_t &oid  ///< [in] object
+    ) override;
+  ObjectMap::ObjectMapIterator get_omap_iterator(
+    CollectionHandle &c,   ///< [in] collection
+    const ghobject_t &oid  ///< [in] object
+    ) override;
+
+  void set_fsid(uuid_d u) override {
+    fsid = u;
+  }
+  uuid_d get_fsid() override {
+    return fsid;
+  }
+
+  uint64_t estimate_objects_overhead(uint64_t num_objects) override {
+    return num_objects * 300; //assuming per-object overhead is 300 bytes
+  }
+
+  struct BSPerfTracker {
+    PerfCounters::avg_tracker<uint64_t> os_commit_latency;
+    PerfCounters::avg_tracker<uint64_t> os_apply_latency;
+
+    objectstore_perf_stat_t get_cur_stats() const {
+      objectstore_perf_stat_t ret;
+      ret.os_commit_latency = os_commit_latency.current_avg();
+      ret.os_apply_latency = os_apply_latency.current_avg();
+      return ret;
+    }
+
+    void update_from_perfcounters(PerfCounters &logger);
+  } perf_tracker;
+
+  objectstore_perf_stat_t get_cur_stats() override {
+    perf_tracker.update_from_perfcounters(*logger);
+    return perf_tracker.get_cur_stats();
+  }
+  const PerfCounters* get_perf_counters() const override {
+    return logger;
+  }
+
+  int queue_transactions(
+    Sequencer *osr,
+    vector<Transaction>& tls,
+    TrackedOpRef op = TrackedOpRef(),
+    ThreadPool::TPHandle *handle = NULL) override;
+
+  // error injection
+  void inject_data_error(const ghobject_t& o) override {
+    RWLock::WLocker l(debug_read_error_lock);
+    debug_data_error_objects.insert(o);
+  }
+  void inject_mdata_error(const ghobject_t& o) override {
+    RWLock::WLocker l(debug_read_error_lock);
+    debug_mdata_error_objects.insert(o);
+  }
+  void compact() override {
+    assert(db);
+    db->compact();
+  }
+  
+private:
+  bool _debug_data_eio(const ghobject_t& o) {
+    if (!cct->_conf->bluestore_debug_inject_read_err) {
+      return false;
+    }
+    RWLock::RLocker l(debug_read_error_lock);
+    return debug_data_error_objects.count(o);
+  }
+  bool _debug_mdata_eio(const ghobject_t& o) {
+    if (!cct->_conf->bluestore_debug_inject_read_err) {
+      return false;
+    }
+    RWLock::RLocker l(debug_read_error_lock);
+    return debug_mdata_error_objects.count(o);
+  }
+  void _debug_obj_on_delete(const ghobject_t& o) {
+    if (cct->_conf->bluestore_debug_inject_read_err) {
+      RWLock::WLocker l(debug_read_error_lock);
+      debug_data_error_objects.erase(o);
+      debug_mdata_error_objects.erase(o);
+    }
+  }
+
+private:
+
+  // --------------------------------------------------------
+  // read processing internal methods
+  int _verify_csum(
+    OnodeRef& o,
+    const bluestore_blob_t* blob,
+    uint64_t blob_xoffset,
+    const bufferlist& bl,
+    uint64_t logical_offset) const;
+  int _decompress(bufferlist& source, bufferlist* result);
+
+
+  // --------------------------------------------------------
+  // write ops
+
+  struct WriteContext {
+    bool buffered = false;          ///< buffered write
+    bool compress = false;          ///< compressed write
+    uint64_t target_blob_size = 0;  ///< target (max) blob size
+    unsigned csum_order = 0;        ///< target checksum chunk order
+
+    old_extent_map_t old_extents;   ///< must deref these blobs
+
+    struct write_item {
+      uint64_t logical_offset;      ///< write logical offset
+      BlobRef b;
+      uint64_t blob_length;
+      uint64_t b_off;
+      bufferlist bl;
+      uint64_t b_off0; ///< original offset in a blob prior to padding
+      uint64_t length0; ///< original data length prior to padding
+
+      bool mark_unused;
+      bool new_blob; ///< whether new blob was created
+
+      bool compressed = false;
+      bufferlist compressed_bl;
+      size_t compressed_len = 0;
+
+      write_item(
+       uint64_t logical_offs,
+        BlobRef b,
+        uint64_t blob_len,
+        uint64_t o,
+        bufferlist& bl,
+        uint64_t o0,
+        uint64_t l0,
+        bool _mark_unused,
+       bool _new_blob)
+       :
+         logical_offset(logical_offs),
+         b(b),
+         blob_length(blob_len),
+         b_off(o),
+         bl(bl),
+         b_off0(o0),
+         length0(l0),
+         mark_unused(_mark_unused),
+        new_blob(_new_blob) {}
+    };
+    vector<write_item> writes;                 ///< blobs we're writing
+
+    /// partial clone of the context
+    void fork(const WriteContext& other) {
+      buffered = other.buffered;
+      compress = other.compress;
+      target_blob_size = other.target_blob_size;
+      csum_order = other.csum_order;
+    }
+    void write(
+      uint64_t loffs,
+      BlobRef b,
+      uint64_t blob_len,
+      uint64_t o,
+      bufferlist& bl,
+      uint64_t o0,
+      uint64_t len0,
+      bool _mark_unused,
+      bool _new_blob) {
+      writes.emplace_back(loffs,
+                          b,
+                          blob_len,
+                          o,
+                          bl,
+                          o0,
+                          len0,
+                          _mark_unused,
+                          _new_blob);
+    }
+    /// Checks for writes to the same pextent within a blob
+    bool has_conflict(
+      BlobRef b,
+      uint64_t loffs,
+      uint64_t loffs_end,
+      uint64_t min_alloc_size);
+  };
+
+  void _do_write_small(
+    TransContext *txc,
+    CollectionRef &c,
+    OnodeRef o,
+    uint64_t offset, uint64_t length,
+    bufferlist::iterator& blp,
+    WriteContext *wctx);
+  void _do_write_big(
+    TransContext *txc,
+    CollectionRef &c,
+    OnodeRef o,
+    uint64_t offset, uint64_t length,
+    bufferlist::iterator& blp,
+    WriteContext *wctx);
+  int _do_alloc_write(
+    TransContext *txc,
+    CollectionRef c,
+    OnodeRef o,
+    WriteContext *wctx);
+  void _wctx_finish(
+    TransContext *txc,
+    CollectionRef& c,
+    OnodeRef o,
+    WriteContext *wctx,
+    set<SharedBlob*> *maybe_unshared_blobs=0);
+
+  int _do_transaction(Transaction *t,
+                     TransContext *txc,
+                     ThreadPool::TPHandle *handle);
+
+  int _write(TransContext *txc,
+            CollectionRef& c,
+            OnodeRef& o,
+            uint64_t offset, size_t len,
+            bufferlist& bl,
+            uint32_t fadvise_flags);
+  void _pad_zeros(bufferlist *bl, uint64_t *offset,
+                 uint64_t chunk_size);
+
+  void _choose_write_options(CollectionRef& c,
+                             OnodeRef o,
+                             uint32_t fadvise_flags,
+                             WriteContext *wctx);
+
+  int _do_gc(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef o,
+             const GarbageCollector& gc,
+             const WriteContext& wctx,
+             uint64_t *dirty_start,
+             uint64_t *dirty_end);
+
+  int _do_write(TransContext *txc,
+               CollectionRef &c,
+               OnodeRef o,
+               uint64_t offset, uint64_t length,
+               bufferlist& bl,
+               uint32_t fadvise_flags);
+  void _do_write_data(TransContext *txc,
+                      CollectionRef& c,
+                      OnodeRef o,
+                      uint64_t offset,
+                      uint64_t length,
+                      bufferlist& bl,
+                      WriteContext *wctx);
+
+  int _touch(TransContext *txc,
+            CollectionRef& c,
+            OnodeRef& o);
+  int _do_zero(TransContext *txc,
+              CollectionRef& c,
+              OnodeRef& o,
+              uint64_t offset, size_t len);
+  int _zero(TransContext *txc,
+           CollectionRef& c,
+           OnodeRef& o,
+           uint64_t offset, size_t len);
+  void _do_truncate(TransContext *txc,
+                  CollectionRef& c,
+                  OnodeRef o,
+                  uint64_t offset,
+                  set<SharedBlob*> *maybe_unshared_blobs=0);
+  int _truncate(TransContext *txc,
+               CollectionRef& c,
+               OnodeRef& o,
+               uint64_t offset);
+  int _remove(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef& o);
+  int _do_remove(TransContext *txc,
+                CollectionRef& c,
+                OnodeRef o);
+  int _setattr(TransContext *txc,
+              CollectionRef& c,
+              OnodeRef& o,
+              const string& name,
+              bufferptr& val);
+  int _setattrs(TransContext *txc,
+               CollectionRef& c,
+               OnodeRef& o,
+               const map<string,bufferptr>& aset);
+  int _rmattr(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef& o,
+             const string& name);
+  int _rmattrs(TransContext *txc,
+              CollectionRef& c,
+              OnodeRef& o);
+  void _do_omap_clear(TransContext *txc, uint64_t id);
+  int _omap_clear(TransContext *txc,
+                 CollectionRef& c,
+                 OnodeRef& o);
+  int _omap_setkeys(TransContext *txc,
+                   CollectionRef& c,
+                   OnodeRef& o,
+                   bufferlist& bl);
+  int _omap_setheader(TransContext *txc,
+                     CollectionRef& c,
+                     OnodeRef& o,
+                     bufferlist& header);
+  int _omap_rmkeys(TransContext *txc,
+                  CollectionRef& c,
+                  OnodeRef& o,
+                  bufferlist& bl);
+  int _omap_rmkey_range(TransContext *txc,
+                       CollectionRef& c,
+                       OnodeRef& o,
+                       const string& first, const string& last);
+  int _set_alloc_hint(
+    TransContext *txc,
+    CollectionRef& c,
+    OnodeRef& o,
+    uint64_t expected_object_size,
+    uint64_t expected_write_size,
+    uint32_t flags);
+  int _do_clone_range(TransContext *txc,
+                     CollectionRef& c,
+                     OnodeRef& oldo,
+                     OnodeRef& newo,
+                     uint64_t srcoff, uint64_t length, uint64_t dstoff);
+  int _clone(TransContext *txc,
+            CollectionRef& c,
+            OnodeRef& oldo,
+            OnodeRef& newo);
+  int _clone_range(TransContext *txc,
+                  CollectionRef& c,
+                  OnodeRef& oldo,
+                  OnodeRef& newo,
+                  uint64_t srcoff, uint64_t length, uint64_t dstoff);
+  int _rename(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef& oldo,
+             OnodeRef& newo,
+             const ghobject_t& new_oid);
+  int _create_collection(TransContext *txc, const coll_t &cid,
+                        unsigned bits, CollectionRef *c);
+  int _remove_collection(TransContext *txc, const coll_t &cid,
+                         CollectionRef *c);
+  int _split_collection(TransContext *txc,
+                       CollectionRef& c,
+                       CollectionRef& d,
+                       unsigned bits, int rem);
+};
+
+inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
+  return out << *s.parent;
+}
+
+static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
+  o->get();
+}
+static inline void intrusive_ptr_release(BlueStore::Onode *o) {
+  o->put();
+}
+
+static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
+  o->get();
+}
+static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
+  o->put();
+}
+
+#endif