diff --git a/src/ceph/src/os/bluestore/BlueStore.h b/src/ceph/src/os/bluestore/BlueStore.h
new file mode 100644
index 0000000..57a8688
--- /dev/null
+++ b/src/ceph/src/os/bluestore/BlueStore.h
@@ -0,0 +1,2723 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_OSD_BLUESTORE_H
+#define CEPH_OSD_BLUESTORE_H
+
+#include "acconfig.h"
+
+#include <unistd.h>
+
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/unordered_set.hpp>
+#include <boost/intrusive/set.hpp>
+#include <boost/functional/hash.hpp>
+#include <boost/dynamic_bitset.hpp>
+
+#include "include/assert.h"
+#include "include/unordered_map.h"
+#include "include/memory.h"
+#include "include/mempool.h"
+#include "common/Finisher.h"
+#include "common/perf_counters.h"
+#include "compressor/Compressor.h"
+#include "os/ObjectStore.h"
+
+#include "bluestore_types.h"
+#include "BlockDevice.h"
+#include "common/EventTrace.h"
+
+class Allocator;
+class FreelistManager;
+class BlueFS;
+
+//#define DEBUG_CACHE
+//#define DEBUG_DEFERRED
+
+
+
+// constants for Buffer::optimize()
+#define MAX_BUFFER_SLOP_RATIO_DEN 8  // so actually 1/N
+
+
+enum {
+  l_bluestore_first = 732430,
+  l_bluestore_kv_flush_lat,
+  l_bluestore_kv_commit_lat,
+  l_bluestore_kv_lat,
+  l_bluestore_state_prepare_lat,
+  l_bluestore_state_aio_wait_lat,
+  l_bluestore_state_io_done_lat,
+  l_bluestore_state_kv_queued_lat,
+  l_bluestore_state_kv_committing_lat,
+  l_bluestore_state_kv_done_lat,
+  l_bluestore_state_deferred_queued_lat,
+  l_bluestore_state_deferred_aio_wait_lat,
+  l_bluestore_state_deferred_cleanup_lat,
+  l_bluestore_state_finishing_lat,
+  l_bluestore_state_done_lat,
+  l_bluestore_throttle_lat,
+  l_bluestore_submit_lat,
+  l_bluestore_commit_lat,
+  l_bluestore_read_lat,
+  l_bluestore_read_onode_meta_lat,
+  l_bluestore_read_wait_aio_lat,
+  l_bluestore_compress_lat,
+  l_bluestore_decompress_lat,
+  l_bluestore_csum_lat,
+  l_bluestore_compress_success_count,
+  l_bluestore_compress_rejected_count,
+  l_bluestore_write_pad_bytes,
+  l_bluestore_deferred_write_ops,
+  l_bluestore_deferred_write_bytes,
+  l_bluestore_write_penalty_read_ops,
+  l_bluestore_allocated,
+  l_bluestore_stored,
+  l_bluestore_compressed,
+  l_bluestore_compressed_allocated,
+  l_bluestore_compressed_original,
+  l_bluestore_onodes,
+  l_bluestore_onode_hits,
+  l_bluestore_onode_misses,
+  l_bluestore_onode_shard_hits,
+  l_bluestore_onode_shard_misses,
+  l_bluestore_extents,
+  l_bluestore_blobs,
+  l_bluestore_buffers,
+  l_bluestore_buffer_bytes,
+  l_bluestore_buffer_hit_bytes,
+  l_bluestore_buffer_miss_bytes,
+  l_bluestore_write_big,
+  l_bluestore_write_big_bytes,
+  l_bluestore_write_big_blobs,
+  l_bluestore_write_small,
+  l_bluestore_write_small_bytes,
+  l_bluestore_write_small_unused,
+  l_bluestore_write_small_deferred,
+  l_bluestore_write_small_pre_read,
+  l_bluestore_write_small_new,
+  l_bluestore_txc,
+  l_bluestore_onode_reshard,
+  l_bluestore_blob_split,
+  l_bluestore_extent_compress,
l_bluestore_gc_merged, + l_bluestore_last +}; + +class BlueStore : public ObjectStore, + public md_config_obs_t { + // ----------------------------------------------------- + // types +public: + // config observer + const char** get_tracked_conf_keys() const override; + void handle_conf_change(const struct md_config_t *conf, + const std::set &changed) override; + + void _set_csum(); + void _set_compression(); + void _set_throttle_params(); + int _set_cache_sizes(); + + class TransContext; + + typedef map ready_regions_t; + + struct BufferSpace; + struct Collection; + typedef boost::intrusive_ptr CollectionRef; + + struct AioContext { + virtual void aio_finish(BlueStore *store) = 0; + virtual ~AioContext() {} + }; + + /// cached buffer + struct Buffer { + MEMPOOL_CLASS_HELPERS(); + + enum { + STATE_EMPTY, ///< empty buffer -- used for cache history + STATE_CLEAN, ///< clean data that is up to date + STATE_WRITING, ///< data that is being written (io not yet complete) + }; + static const char *get_state_name(int s) { + switch (s) { + case STATE_EMPTY: return "empty"; + case STATE_CLEAN: return "clean"; + case STATE_WRITING: return "writing"; + default: return "???"; + } + } + enum { + FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN) + // NOTE: fix operator<< when you define a second flag + }; + static const char *get_flag_name(int s) { + switch (s) { + case FLAG_NOCACHE: return "nocache"; + default: return "???"; + } + } + + BufferSpace *space; + uint16_t state; ///< STATE_* + uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl + uint32_t flags; ///< FLAG_* + uint64_t seq; + uint32_t offset, length; + bufferlist data; + + boost::intrusive::list_member_hook<> lru_item; + boost::intrusive::list_member_hook<> state_item; + + Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l, + unsigned f = 0) + : space(space), state(s), flags(f), seq(q), offset(o), length(l) {} + Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b, + unsigned f = 0) + : space(space), state(s), flags(f), seq(q), offset(o), + length(b.length()), data(b) {} + + bool is_empty() const { + return state == STATE_EMPTY; + } + bool is_clean() const { + return state == STATE_CLEAN; + } + bool is_writing() const { + return state == STATE_WRITING; + } + + uint32_t end() const { + return offset + length; + } + + void truncate(uint32_t newlen) { + assert(newlen < length); + if (data.length()) { + bufferlist t; + t.substr_of(data, 0, newlen); + data.claim(t); + } + length = newlen; + } + void maybe_rebuild() { + if (data.length() && + (data.get_num_buffers() > 1 || + data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) { + data.rebuild(); + } + } + + void dump(Formatter *f) const { + f->dump_string("state", get_state_name(state)); + f->dump_unsigned("seq", seq); + f->dump_unsigned("offset", offset); + f->dump_unsigned("length", length); + f->dump_unsigned("data_length", data.length()); + } + }; + + struct Cache; + + /// map logical extent range (object) onto buffers + struct BufferSpace { + typedef boost::intrusive::list< + Buffer, + boost::intrusive::member_hook< + Buffer, + boost::intrusive::list_member_hook<>, + &Buffer::state_item> > state_list_t; + + mempool::bluestore_cache_other::map> + buffer_map; + + // we use a bare intrusive list here instead of std::map because + // it uses less memory and we expect this to be very small (very + // few IOs in flight to the same Blob at the same time). 
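// The member-hook pattern described in the comment above can be shown with a
// small standalone sketch. Node and node_list_t below are hypothetical types
// used only for illustration, not BlueStore code: the hook is embedded in the
// element itself, so keeping buffers ordered by seq costs no allocations
// beyond the Buffer objects already being created.

#include <boost/intrusive/list.hpp>
#include <cassert>

struct Node {
  int seq;
  boost::intrusive::list_member_hook<> item;   // hook lives inside the element
  explicit Node(int s) : seq(s) {}
};

typedef boost::intrusive::list<
  Node,
  boost::intrusive::member_hook<
    Node, boost::intrusive::list_member_hook<>, &Node::item> > node_list_t;

int main() {
  Node a(1), b(2);        // elements own their storage
  node_list_t l;          // the list only links the embedded hooks
  l.push_back(a);
  l.push_back(b);         // insertion order matches seq order, like `writing`
  assert(l.front().seq == 1);
  l.clear();              // unlink before the Nodes go out of scope
  return 0;
}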
+ state_list_t writing; ///< writing buffers, sorted by seq, ascending + + ~BufferSpace() { + assert(buffer_map.empty()); + assert(writing.empty()); + } + + void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) { + cache->_audit("_add_buffer start"); + buffer_map[b->offset].reset(b); + if (b->is_writing()) { + b->data.reassign_to_mempool(mempool::mempool_bluestore_writing); + if (writing.empty() || writing.rbegin()->seq <= b->seq) { + writing.push_back(*b); + } else { + auto it = writing.begin(); + while (it->seq < b->seq) { + ++it; + } + + assert(it->seq >= b->seq); + // note that this will insert b before it + // hence the order is maintained + writing.insert(it, *b); + } + } else { + b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data); + cache->_add_buffer(b, level, near); + } + cache->_audit("_add_buffer end"); + } + void _rm_buffer(Cache* cache, Buffer *b) { + _rm_buffer(cache, buffer_map.find(b->offset)); + } + void _rm_buffer(Cache* cache, + map>::iterator p) { + assert(p != buffer_map.end()); + cache->_audit("_rm_buffer start"); + if (p->second->is_writing()) { + writing.erase(writing.iterator_to(*p->second)); + } else { + cache->_rm_buffer(p->second.get()); + } + buffer_map.erase(p); + cache->_audit("_rm_buffer end"); + } + + map>::iterator _data_lower_bound( + uint32_t offset) { + auto i = buffer_map.lower_bound(offset); + if (i != buffer_map.begin()) { + --i; + if (i->first + i->second->length <= offset) + ++i; + } + return i; + } + + // must be called under protection of the Cache lock + void _clear(Cache* cache); + + // return value is the highest cache_private of a trimmed buffer, or 0. + int discard(Cache* cache, uint32_t offset, uint32_t length) { + std::lock_guard l(cache->lock); + return _discard(cache, offset, length); + } + int _discard(Cache* cache, uint32_t offset, uint32_t length); + + void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl, + unsigned flags) { + std::lock_guard l(cache->lock); + Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl, + flags); + b->cache_private = _discard(cache, offset, bl.length()); + _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 
0 : 1, nullptr); + } + void finish_write(Cache* cache, uint64_t seq); + void did_read(Cache* cache, uint32_t offset, bufferlist& bl) { + std::lock_guard l(cache->lock); + Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl); + b->cache_private = _discard(cache, offset, bl.length()); + _add_buffer(cache, b, 1, nullptr); + } + + void read(Cache* cache, uint32_t offset, uint32_t length, + BlueStore::ready_regions_t& res, + interval_set& res_intervals); + + void truncate(Cache* cache, uint32_t offset) { + discard(cache, offset, (uint32_t)-1 - offset); + } + + void split(Cache* cache, size_t pos, BufferSpace &r); + + void dump(Cache* cache, Formatter *f) const { + std::lock_guard l(cache->lock); + f->open_array_section("buffers"); + for (auto& i : buffer_map) { + f->open_object_section("buffer"); + assert(i.first == i.second->offset); + i.second->dump(f); + f->close_section(); + } + f->close_section(); + } + }; + + struct SharedBlobSet; + + /// in-memory shared blob state (incl cached buffers) + struct SharedBlob { + MEMPOOL_CLASS_HELPERS(); + + std::atomic_int nref = {0}; ///< reference count + bool loaded = false; + + CollectionRef coll; + union { + uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded + bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any + }; + BufferSpace bc; ///< buffer cache + + SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) { + if (get_cache()) { + get_cache()->add_blob(); + } + } + SharedBlob(uint64_t i, Collection *_coll); + ~SharedBlob(); + + uint64_t get_sbid() const { + return loaded ? persistent->sbid : sbid_unloaded; + } + + friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); } + friend void intrusive_ptr_release(SharedBlob *b) { b->put(); } + + friend ostream& operator<<(ostream& out, const SharedBlob& sb); + + void get() { + ++nref; + } + void put(); + + /// get logical references + void get_ref(uint64_t offset, uint32_t length); + + /// put logical references, and get back any released extents + void put_ref(uint64_t offset, uint32_t length, + PExtentVector *r, set *maybe_unshared_blobs); + + friend bool operator==(const SharedBlob &l, const SharedBlob &r) { + return l.get_sbid() == r.get_sbid(); + } + inline Cache* get_cache() { + return coll ? coll->cache : nullptr; + } + inline SharedBlobSet* get_parent() { + return coll ? 
&(coll->shared_blob_set) : nullptr; + } + inline bool is_loaded() const { + return loaded; + } + + }; + typedef boost::intrusive_ptr SharedBlobRef; + + /// a lookup table of SharedBlobs + struct SharedBlobSet { + std::mutex lock; ///< protect lookup, insertion, removal + + // we use a bare pointer because we don't want to affect the ref + // count + mempool::bluestore_cache_other::unordered_map sb_map; + + SharedBlobRef lookup(uint64_t sbid) { + std::lock_guard l(lock); + auto p = sb_map.find(sbid); + if (p == sb_map.end()) { + return nullptr; + } + return p->second; + } + + void add(Collection* coll, SharedBlob *sb) { + std::lock_guard l(lock); + sb_map[sb->get_sbid()] = sb; + sb->coll = coll; + } + + bool try_remove(SharedBlob *sb) { + std::lock_guard l(lock); + if (sb->nref == 0) { + assert(sb->get_parent() == this); + sb_map.erase(sb->get_sbid()); + return true; + } + return false; + } + + void remove(SharedBlob *sb) { + std::lock_guard l(lock); + assert(sb->get_parent() == this); + sb_map.erase(sb->get_sbid()); + } + + bool empty() { + std::lock_guard l(lock); + return sb_map.empty(); + } + + void dump(CephContext *cct, int lvl); + }; + +//#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/ + + /// in-memory blob metadata and associated cached buffers (if any) + struct Blob { + MEMPOOL_CLASS_HELPERS(); + + std::atomic_int nref = {0}; ///< reference count + int16_t id = -1; ///< id, for spanning blobs only, >= 0 + int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only + SharedBlobRef shared_blob; ///< shared blob state (if any) + + private: + mutable bluestore_blob_t blob; ///< decoded blob metadata +#ifdef CACHE_BLOB_BL + mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty +#endif + /// refs from this shard. ephemeral if id<0, persisted if spanning. + bluestore_blob_use_tracker_t used_in_blob; + + public: + + friend void intrusive_ptr_add_ref(Blob *b) { b->get(); } + friend void intrusive_ptr_release(Blob *b) { b->put(); } + + friend ostream& operator<<(ostream& out, const Blob &b); + + const bluestore_blob_use_tracker_t& get_blob_use_tracker() const { + return used_in_blob; + } + bool is_referenced() const { + return used_in_blob.is_not_empty(); + } + uint32_t get_referenced_bytes() const { + return used_in_blob.get_referenced_bytes(); + } + + bool is_spanning() const { + return id >= 0; + } + + bool can_split() const { + std::lock_guard l(shared_blob->get_cache()->lock); + // splitting a BufferSpace writing list is too hard; don't try. 
+ return shared_blob->bc.writing.empty() && + used_in_blob.can_split() && + get_blob().can_split(); + } + + bool can_split_at(uint32_t blob_offset) const { + return used_in_blob.can_split_at(blob_offset) && + get_blob().can_split_at(blob_offset); + } + + bool can_reuse_blob(uint32_t min_alloc_size, + uint32_t target_blob_size, + uint32_t b_offset, + uint32_t *length0); + + void dup(Blob& o) { + o.shared_blob = shared_blob; + o.blob = blob; +#ifdef CACHE_BLOB_BL + o.blob_bl = blob_bl; +#endif + } + + inline const bluestore_blob_t& get_blob() const { + return blob; + } + inline bluestore_blob_t& dirty_blob() { +#ifdef CACHE_BLOB_BL + blob_bl.clear(); +#endif + return blob; + } + + /// discard buffers for unallocated regions + void discard_unallocated(Collection *coll); + + /// get logical references + void get_ref(Collection *coll, uint32_t offset, uint32_t length); + /// put logical references, and get back any released extents + bool put_ref(Collection *coll, uint32_t offset, uint32_t length, + PExtentVector *r); + + /// split the blob + void split(Collection *coll, uint32_t blob_offset, Blob *o); + + void get() { + ++nref; + } + void put() { + if (--nref == 0) + delete this; + } + + +#ifdef CACHE_BLOB_BL + void _encode() const { + if (blob_bl.length() == 0 ) { + ::encode(blob, blob_bl); + } else { + assert(blob_bl.length()); + } + } + void bound_encode( + size_t& p, + bool include_ref_map) const { + _encode(); + p += blob_bl.length(); + if (include_ref_map) { + used_in_blob.bound_encode(p); + } + } + void encode( + bufferlist::contiguous_appender& p, + bool include_ref_map) const { + _encode(); + p.append(blob_bl); + if (include_ref_map) { + used_in_blob.encode(p); + } + } + void decode( + Collection */*coll*/, + bufferptr::iterator& p, + bool include_ref_map) { + const char *start = p.get_pos(); + denc(blob, p); + const char *end = p.get_pos(); + blob_bl.clear(); + blob_bl.append(start, end - start); + if (include_ref_map) { + used_in_blob.decode(p); + } + } +#else + void bound_encode( + size_t& p, + uint64_t struct_v, + uint64_t sbid, + bool include_ref_map) const { + denc(blob, p, struct_v); + if (blob.is_shared()) { + denc(sbid, p); + } + if (include_ref_map) { + used_in_blob.bound_encode(p); + } + } + void encode( + bufferlist::contiguous_appender& p, + uint64_t struct_v, + uint64_t sbid, + bool include_ref_map) const { + denc(blob, p, struct_v); + if (blob.is_shared()) { + denc(sbid, p); + } + if (include_ref_map) { + used_in_blob.encode(p); + } + } + void decode( + Collection *coll, + bufferptr::iterator& p, + uint64_t struct_v, + uint64_t* sbid, + bool include_ref_map); +#endif + }; + typedef boost::intrusive_ptr BlobRef; + typedef mempool::bluestore_cache_other::map blob_map_t; + + /// a logical extent, pointing to (some portion of) a blob + typedef boost::intrusive::set_base_hook > ExtentBase; //making an alias to avoid build warnings + struct Extent : public ExtentBase { + MEMPOOL_CLASS_HELPERS(); + + uint32_t logical_offset = 0; ///< logical offset + uint32_t blob_offset = 0; ///< blob offset + uint32_t length = 0; ///< length + BlobRef blob; ///< the blob with our data + + /// ctor for lookup only + explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { } + /// ctor for delayed initialization (see decode_some()) + explicit Extent() : ExtentBase() { + } + /// ctor for general usage + Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) + : ExtentBase(), + logical_offset(lo), blob_offset(o), length(l) { + assign_blob(b); + } + ~Extent() { + if (blob) { + 
blob->shared_blob->get_cache()->rm_extent(); + } + } + + void assign_blob(const BlobRef& b) { + assert(!blob); + blob = b; + blob->shared_blob->get_cache()->add_extent(); + } + + // comparators for intrusive_set + friend bool operator<(const Extent &a, const Extent &b) { + return a.logical_offset < b.logical_offset; + } + friend bool operator>(const Extent &a, const Extent &b) { + return a.logical_offset > b.logical_offset; + } + friend bool operator==(const Extent &a, const Extent &b) { + return a.logical_offset == b.logical_offset; + } + + uint32_t blob_start() const { + return logical_offset - blob_offset; + } + + uint32_t blob_end() const { + return blob_start() + blob->get_blob().get_logical_length(); + } + + uint32_t logical_end() const { + return logical_offset + length; + } + + // return true if any piece of the blob is out of + // the given range [o, o + l]. + bool blob_escapes_range(uint32_t o, uint32_t l) const { + return blob_start() < o || blob_end() > o + l; + } + }; + typedef boost::intrusive::set extent_map_t; + + + friend ostream& operator<<(ostream& out, const Extent& e); + + struct OldExtent { + boost::intrusive::list_member_hook<> old_extent_item; + Extent e; + PExtentVector r; + bool blob_empty; // flag to track the last removed extent that makes blob + // empty - required to update compression stat properly + OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) + : e(lo, o, l, b), blob_empty(false) { + } + static OldExtent* create(CollectionRef c, + uint32_t lo, + uint32_t o, + uint32_t l, + BlobRef& b); + }; + typedef boost::intrusive::list< + OldExtent, + boost::intrusive::member_hook< + OldExtent, + boost::intrusive::list_member_hook<>, + &OldExtent::old_extent_item> > old_extent_map_t; + + struct Onode; + + /// a sharded extent map, mapping offsets to lextents to blobs + struct ExtentMap { + Onode *onode; + extent_map_t extent_map; ///< map of Extents to Blobs + blob_map_t spanning_blob_map; ///< blobs that span shards + + struct Shard { + bluestore_onode_t::shard_info *shard_info = nullptr; + unsigned extents = 0; ///< count extents in this shard + bool loaded = false; ///< true if shard is loaded + bool dirty = false; ///< true if shard is dirty and needs reencoding + }; + mempool::bluestore_cache_other::vector shards; ///< shards + + bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty + + uint32_t needs_reshard_begin = 0; + uint32_t needs_reshard_end = 0; + + bool needs_reshard() const { + return needs_reshard_end > needs_reshard_begin; + } + void clear_needs_reshard() { + needs_reshard_begin = needs_reshard_end = 0; + } + void request_reshard(uint32_t begin, uint32_t end) { + if (begin < needs_reshard_begin) { + needs_reshard_begin = begin; + } + if (end > needs_reshard_end) { + needs_reshard_end = end; + } + } + + struct DeleteDisposer { + void operator()(Extent *e) { delete e; } + }; + + ExtentMap(Onode *o); + ~ExtentMap() { + extent_map.clear_and_dispose(DeleteDisposer()); + } + + void clear() { + extent_map.clear_and_dispose(DeleteDisposer()); + shards.clear(); + inline_bl.clear(); + clear_needs_reshard(); + } + + bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl, + unsigned *pn); + unsigned decode_some(bufferlist& bl); + + void bound_encode_spanning_blobs(size_t& p); + void encode_spanning_blobs(bufferlist::contiguous_appender& p); + void decode_spanning_blobs(bufferptr::iterator& p); + + BlobRef get_spanning_blob(int id) { + auto p = spanning_blob_map.find(id); + assert(p != spanning_blob_map.end()); + return 
p->second; + } + + void update(KeyValueDB::Transaction t, bool force); + decltype(BlueStore::Blob::id) allocate_spanning_blob_id(); + void reshard( + KeyValueDB *db, + KeyValueDB::Transaction t); + + /// initialize Shards from the onode + void init_shards(bool loaded, bool dirty); + + /// return index of shard containing offset + /// or -1 if not found + int seek_shard(uint32_t offset) { + size_t end = shards.size(); + size_t mid, left = 0; + size_t right = end; // one passed the right end + + while (left < right) { + mid = left + (right - left) / 2; + if (offset >= shards[mid].shard_info->offset) { + size_t next = mid + 1; + if (next >= end || offset < shards[next].shard_info->offset) + return mid; + //continue to search forwards + left = next; + } else { + //continue to search backwards + right = mid; + } + } + + return -1; // not found + } + + /// check if a range spans a shard + bool spans_shard(uint32_t offset, uint32_t length) { + if (shards.empty()) { + return false; + } + int s = seek_shard(offset); + assert(s >= 0); + if (s == (int)shards.size() - 1) { + return false; // last shard + } + if (offset + length <= shards[s+1].shard_info->offset) { + return false; + } + return true; + } + + /// ensure that a range of the map is loaded + void fault_range(KeyValueDB *db, + uint32_t offset, uint32_t length); + + /// ensure a range of the map is marked dirty + void dirty_range(uint32_t offset, uint32_t length); + + /// for seek_lextent test + extent_map_t::iterator find(uint64_t offset); + + /// seek to the first lextent including or after offset + extent_map_t::iterator seek_lextent(uint64_t offset); + extent_map_t::const_iterator seek_lextent(uint64_t offset) const; + + /// add a new Extent + void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) { + extent_map.insert(*new Extent(lo, o, l, b)); + } + + /// remove (and delete) an Extent + void rm(extent_map_t::iterator p) { + extent_map.erase_and_dispose(p, DeleteDisposer()); + } + + bool has_any_lextents(uint64_t offset, uint64_t length); + + /// consolidate adjacent lextents in extent_map + int compress_extent_map(uint64_t offset, uint64_t length); + + /// punch a logical hole. add lextents to deref to target list. + void punch_hole(CollectionRef &c, + uint64_t offset, uint64_t length, + old_extent_map_t *old_extents); + + /// put new lextent into lextent_map overwriting existing ones if + /// any and update references accordingly + Extent *set_lextent(CollectionRef &c, + uint64_t logical_offset, + uint64_t offset, uint64_t length, + BlobRef b, + old_extent_map_t *old_extents); + + /// split a blob (and referring extents) + BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos); + }; + + /// Compressed Blob Garbage collector + /* + The primary idea of the collector is to estimate a difference between + allocation units(AU) currently present for compressed blobs and new AUs + required to store that data uncompressed. + Estimation is performed for protrusive extents within a logical range + determined by a concatenation of old_extents collection and specific(current) + write request. + The root cause for old_extents use is the need to handle blob ref counts + properly. Old extents still hold blob refs and hence we need to traverse + the collection to determine if blob to be released. + Protrusive extents are extents that fit into the blob set in action + (ones that are below the logical range from above) but not removed totally + due to the current write. + E.g. 
for + extent1 -> + blob1 + extent2 -> + blob2 + extent3 -> + blob1 + extent4 -> + blob3 + write(300~100) + protrusive extents are within the following ranges <0~300, 400~8192-400> + In this case existing AUs that might be removed due to GC (i.e. blob1) + use 2x4K bytes. + And new AUs expected after GC = 0 since extent1 to be merged into blob2. + Hence we should do a collect. + */ + class GarbageCollector + { + public: + /// return amount of allocation units that might be saved due to GC + int64_t estimate( + uint64_t offset, + uint64_t length, + const ExtentMap& extent_map, + const old_extent_map_t& old_extents, + uint64_t min_alloc_size); + + /// return a collection of extents to perform GC on + const vector& get_extents_to_collect() const { + return extents_to_collect; + } + GarbageCollector(CephContext* _cct) : cct(_cct) {} + + private: + struct BlobInfo { + uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob + int64_t expected_allocations = 0; ///< new alloc units required + ///< in case of gc fulfilled + bool collect_candidate = false; ///< indicate if blob has any extents + ///< eligible for GC. + extent_map_t::const_iterator first_lextent; ///< points to the first + ///< lextent referring to + ///< the blob if any. + ///< collect_candidate flag + ///< determines the validity + extent_map_t::const_iterator last_lextent; ///< points to the last + ///< lextent referring to + ///< the blob if any. + + BlobInfo(uint64_t ref_bytes) : + referenced_bytes(ref_bytes) { + } + }; + CephContext* cct; + map affected_blobs; ///< compressed blobs and their ref_map + ///< copies that are affected by the + ///< specific write + + vector extents_to_collect; ///< protrusive extents that should + ///< be collected if GC takes place + + boost::optional used_alloc_unit; ///< last processed allocation + ///< unit when traversing + ///< protrusive extents. + ///< Other extents mapped to + ///< this AU to be ignored + ///< (except the case where + ///< uncompressed extent follows + ///< compressed one - see below). + BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit + ///< caused expected_allocations + ///< counter increment at this blob. 
+ ///< if uncompressed extent follows + ///< a decrement for the + ///< expected_allocations counter + ///< is needed + int64_t expected_allocations = 0; ///< new alloc units required in case + ///< of gc fulfilled + int64_t expected_for_release = 0; ///< alloc units currently used by + ///< compressed blobs that might + ///< gone after GC + uint64_t gc_start_offset; ///starting offset for GC + uint64_t gc_end_offset; ///ending offset for GC + + protected: + void process_protrusive_extents(const BlueStore::ExtentMap& extent_map, + uint64_t start_offset, + uint64_t end_offset, + uint64_t start_touch_offset, + uint64_t end_touch_offset, + uint64_t min_alloc_size); + }; + + struct OnodeSpace; + + /// an in-memory object + struct Onode { + MEMPOOL_CLASS_HELPERS(); + + std::atomic_int nref; ///< reference count + Collection *c; + + ghobject_t oid; + + /// key under PREFIX_OBJ where we are stored + mempool::bluestore_cache_other::string key; + + boost::intrusive::list_member_hook<> lru_item; + + bluestore_onode_t onode; ///< metadata stored as value in kv store + bool exists; ///< true if object logically exists + + ExtentMap extent_map; + + // track txc's that have not been committed to kv store (and whose + // effects cannot be read via the kvdb read methods) + std::atomic flushing_count = {0}; + std::mutex flush_lock; ///< protect flush_txns + std::condition_variable flush_cond; ///< wait here for uncommitted txns + + Onode(Collection *c, const ghobject_t& o, + const mempool::bluestore_cache_other::string& k) + : nref(0), + c(c), + oid(o), + key(k), + exists(false), + extent_map(this) { + } + + void flush(); + void get() { + ++nref; + } + void put() { + if (--nref == 0) + delete this; + } + }; + typedef boost::intrusive_ptr OnodeRef; + + + /// a cache (shard) of onodes and buffers + struct Cache { + CephContext* cct; + PerfCounters *logger; + std::recursive_mutex lock; ///< protect lru and other structures + + std::atomic num_extents = {0}; + std::atomic num_blobs = {0}; + + static Cache *create(CephContext* cct, string type, PerfCounters *logger); + + Cache(CephContext* cct) : cct(cct), logger(nullptr) {} + virtual ~Cache() {} + + virtual void _add_onode(OnodeRef& o, int level) = 0; + virtual void _rm_onode(OnodeRef& o) = 0; + virtual void _touch_onode(OnodeRef& o) = 0; + + virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0; + virtual void _rm_buffer(Buffer *b) = 0; + virtual void _move_buffer(Cache *src, Buffer *b) = 0; + virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0; + virtual void _touch_buffer(Buffer *b) = 0; + + virtual uint64_t _get_num_onodes() = 0; + virtual uint64_t _get_buffer_bytes() = 0; + + void add_extent() { + ++num_extents; + } + void rm_extent() { + --num_extents; + } + + void add_blob() { + ++num_blobs; + } + void rm_blob() { + --num_blobs; + } + + void trim(uint64_t target_bytes, + float target_meta_ratio, + float target_data_ratio, + float bytes_per_onode); + + void trim_all(); + + virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0; + + virtual void add_stats(uint64_t *onodes, uint64_t *extents, + uint64_t *blobs, + uint64_t *buffers, + uint64_t *bytes) = 0; + + bool empty() { + std::lock_guard l(lock); + return _get_num_onodes() == 0 && _get_buffer_bytes() == 0; + } + +#ifdef DEBUG_CACHE + virtual void _audit(const char *s) = 0; +#else + void _audit(const char *s) { /* no-op */ } +#endif + }; + + /// simple LRU cache for onodes and buffers + struct LRUCache : public Cache { + private: + typedef boost::intrusive::list< + Onode, 
+ boost::intrusive::member_hook< + Onode, + boost::intrusive::list_member_hook<>, + &Onode::lru_item> > onode_lru_list_t; + typedef boost::intrusive::list< + Buffer, + boost::intrusive::member_hook< + Buffer, + boost::intrusive::list_member_hook<>, + &Buffer::lru_item> > buffer_lru_list_t; + + onode_lru_list_t onode_lru; + + buffer_lru_list_t buffer_lru; + uint64_t buffer_size = 0; + + public: + LRUCache(CephContext* cct) : Cache(cct) {} + uint64_t _get_num_onodes() override { + return onode_lru.size(); + } + void _add_onode(OnodeRef& o, int level) override { + if (level > 0) + onode_lru.push_front(*o); + else + onode_lru.push_back(*o); + } + void _rm_onode(OnodeRef& o) override { + auto q = onode_lru.iterator_to(*o); + onode_lru.erase(q); + } + void _touch_onode(OnodeRef& o) override; + + uint64_t _get_buffer_bytes() override { + return buffer_size; + } + void _add_buffer(Buffer *b, int level, Buffer *near) override { + if (near) { + auto q = buffer_lru.iterator_to(*near); + buffer_lru.insert(q, *b); + } else if (level > 0) { + buffer_lru.push_front(*b); + } else { + buffer_lru.push_back(*b); + } + buffer_size += b->length; + } + void _rm_buffer(Buffer *b) override { + assert(buffer_size >= b->length); + buffer_size -= b->length; + auto q = buffer_lru.iterator_to(*b); + buffer_lru.erase(q); + } + void _move_buffer(Cache *src, Buffer *b) override { + src->_rm_buffer(b); + _add_buffer(b, 0, nullptr); + } + void _adjust_buffer_size(Buffer *b, int64_t delta) override { + assert((int64_t)buffer_size + delta >= 0); + buffer_size += delta; + } + void _touch_buffer(Buffer *b) override { + auto p = buffer_lru.iterator_to(*b); + buffer_lru.erase(p); + buffer_lru.push_front(*b); + _audit("_touch_buffer end"); + } + + void _trim(uint64_t onode_max, uint64_t buffer_max) override; + + void add_stats(uint64_t *onodes, uint64_t *extents, + uint64_t *blobs, + uint64_t *buffers, + uint64_t *bytes) override { + std::lock_guard l(lock); + *onodes += onode_lru.size(); + *extents += num_extents; + *blobs += num_blobs; + *buffers += buffer_lru.size(); + *bytes += buffer_size; + } + +#ifdef DEBUG_CACHE + void _audit(const char *s) override; +#endif + }; + + // 2Q cache for buffers, LRU for onodes + struct TwoQCache : public Cache { + private: + // stick with LRU for onodes for now (fixme?) 
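// The 2Q policy used for the buffer lists below can be sketched independently
// of the Cache plumbing. TwoQSketch is a simplified, hypothetical illustration
// (string keys, no byte accounting, and aging from A1in to the A1out ghost
// list is left to a trim step that is omitted); it is not the BlueStore
// implementation.

#include <list>
#include <string>
#include <unordered_map>

struct TwoQSketch {
  std::list<std::string> hot, warm_in, warm_out;   // Am, A1in, A1out (ghosts)
  enum Where { NONE, HOT, WARM_IN, WARM_OUT };
  std::unordered_map<std::string, Where> where;

  void access(const std::string& k) {
    Where w = where.count(k) ? where[k] : NONE;
    switch (w) {
    case HOT:                 // hot hit: move to the front of Am
      hot.remove(k);          // O(n) here; the real cache uses iterator_to()
      hot.push_front(k);
      break;
    case WARM_OUT:            // seen again after eviction: promote to hot
      warm_out.remove(k);
      hot.push_front(k);
      where[k] = HOT;
      break;
    case WARM_IN:             // recently admitted: deliberately a no-op
      break;
    default:                  // first touch: admit to A1in only
      warm_in.push_back(k);
      where[k] = WARM_IN;
    }
  }
};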
+ typedef boost::intrusive::list< + Onode, + boost::intrusive::member_hook< + Onode, + boost::intrusive::list_member_hook<>, + &Onode::lru_item> > onode_lru_list_t; + typedef boost::intrusive::list< + Buffer, + boost::intrusive::member_hook< + Buffer, + boost::intrusive::list_member_hook<>, + &Buffer::lru_item> > buffer_list_t; + + onode_lru_list_t onode_lru; + + buffer_list_t buffer_hot; ///< "Am" hot buffers + buffer_list_t buffer_warm_in; ///< "A1in" newly warm buffers + buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted + uint64_t buffer_bytes = 0; ///< bytes + + enum { + BUFFER_NEW = 0, + BUFFER_WARM_IN, ///< in buffer_warm_in + BUFFER_WARM_OUT, ///< in buffer_warm_out + BUFFER_HOT, ///< in buffer_hot + BUFFER_TYPE_MAX + }; + + uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type + + public: + TwoQCache(CephContext* cct) : Cache(cct) {} + uint64_t _get_num_onodes() override { + return onode_lru.size(); + } + void _add_onode(OnodeRef& o, int level) override { + if (level > 0) + onode_lru.push_front(*o); + else + onode_lru.push_back(*o); + } + void _rm_onode(OnodeRef& o) override { + auto q = onode_lru.iterator_to(*o); + onode_lru.erase(q); + } + void _touch_onode(OnodeRef& o) override; + + uint64_t _get_buffer_bytes() override { + return buffer_bytes; + } + void _add_buffer(Buffer *b, int level, Buffer *near) override; + void _rm_buffer(Buffer *b) override; + void _move_buffer(Cache *src, Buffer *b) override; + void _adjust_buffer_size(Buffer *b, int64_t delta) override; + void _touch_buffer(Buffer *b) override { + switch (b->cache_private) { + case BUFFER_WARM_IN: + // do nothing (somewhat counter-intuitively!) + break; + case BUFFER_WARM_OUT: + // move from warm_out to hot LRU + assert(0 == "this happens via discard hint"); + break; + case BUFFER_HOT: + // move to front of hot LRU + buffer_hot.erase(buffer_hot.iterator_to(*b)); + buffer_hot.push_front(*b); + break; + } + _audit("_touch_buffer end"); + } + + void _trim(uint64_t onode_max, uint64_t buffer_max) override; + + void add_stats(uint64_t *onodes, uint64_t *extents, + uint64_t *blobs, + uint64_t *buffers, + uint64_t *bytes) override { + std::lock_guard l(lock); + *onodes += onode_lru.size(); + *extents += num_extents; + *blobs += num_blobs; + *buffers += buffer_hot.size() + buffer_warm_in.size(); + *bytes += buffer_bytes; + } + +#ifdef DEBUG_CACHE + void _audit(const char *s) override; +#endif + }; + + struct OnodeSpace { + private: + Cache *cache; + + /// forward lookups + mempool::bluestore_cache_other::unordered_map onode_map; + + friend class Collection; // for split_cache() + + public: + OnodeSpace(Cache *c) : cache(c) {} + ~OnodeSpace() { + clear(); + } + + OnodeRef add(const ghobject_t& oid, OnodeRef o); + OnodeRef lookup(const ghobject_t& o); + void remove(const ghobject_t& oid) { + onode_map.erase(oid); + } + void rename(OnodeRef& o, const ghobject_t& old_oid, + const ghobject_t& new_oid, + const mempool::bluestore_cache_other::string& new_okey); + void clear(); + bool empty(); + + void dump(CephContext *cct, int lvl); + + /// return true if f true for any item + bool map_any(std::function f); + }; + + struct Collection : public CollectionImpl { + BlueStore *store; + Cache *cache; ///< our cache shard + coll_t cid; + bluestore_cnode_t cnode; + RWLock lock; + + bool exists; + + SharedBlobSet shared_blob_set; ///< open SharedBlobs + + // cache onodes on a per-collection basis to avoid lock + // contention. 
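// The lock-contention point above is a generic sharding idea: instead of one
// big map behind one mutex, keep several independently locked shards and hash
// the key to pick one. ShardedMap below is a hypothetical sketch of that idea,
// not BlueStore code; BlueStore's actual split is one OnodeSpace per
// Collection plus the store-wide vector of Cache shards.

#include <array>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>

template <typename V, std::size_t N = 16>
struct ShardedMap {
  struct Shard {
    std::mutex lock;
    std::unordered_map<std::string, V> map;
  };
  std::array<Shard, N> shards;

  Shard& shard_for(const std::string& key) {
    return shards[std::hash<std::string>{}(key) % N];
  }
  void put(const std::string& key, V v) {
    Shard& s = shard_for(key);
    std::lock_guard<std::mutex> l(s.lock);   // only this shard is locked
    s.map[key] = std::move(v);
  }
  bool get(const std::string& key, V* out) {
    Shard& s = shard_for(key);
    std::lock_guard<std::mutex> l(s.lock);
    auto it = s.map.find(key);
    if (it == s.map.end())
      return false;
    *out = it->second;
    return true;
  }
};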
+ OnodeSpace onode_map; + + //pool options + pool_opts_t pool_opts; + + OnodeRef get_onode(const ghobject_t& oid, bool create); + + // the terminology is confusing here, sorry! + // + // blob_t shared_blob_t + // !shared unused -> open + // shared !loaded -> open + shared + // shared loaded -> open + shared + loaded + // + // i.e., + // open = SharedBlob is instantiated + // shared = blob_t shared flag is set; SharedBlob is hashed. + // loaded = SharedBlob::shared_blob_t is loaded from kv store + void open_shared_blob(uint64_t sbid, BlobRef b); + void load_shared_blob(SharedBlobRef sb); + void make_blob_shared(uint64_t sbid, BlobRef b); + uint64_t make_blob_unshared(SharedBlob *sb); + + BlobRef new_blob() { + BlobRef b = new Blob(); + b->shared_blob = new SharedBlob(this); + return b; + } + + const coll_t &get_cid() override { + return cid; + } + + bool contains(const ghobject_t& oid) { + if (cid.is_meta()) + return oid.hobj.pool == -1; + spg_t spgid; + if (cid.is_pg(&spgid)) + return + spgid.pgid.contains(cnode.bits, oid) && + oid.shard_id == spgid.shard; + return false; + } + + void split_cache(Collection *dest); + + Collection(BlueStore *ns, Cache *ca, coll_t c); + }; + + class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl { + CollectionRef c; + OnodeRef o; + KeyValueDB::Iterator it; + string head, tail; + public: + OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it); + int seek_to_first() override; + int upper_bound(const string &after) override; + int lower_bound(const string &to) override; + bool valid() override; + int next(bool validate=true) override; + string key() override; + bufferlist value() override; + int status() override { + return 0; + } + }; + + class OpSequencer; + typedef boost::intrusive_ptr OpSequencerRef; + + struct volatile_statfs{ + enum { + STATFS_ALLOCATED = 0, + STATFS_STORED, + STATFS_COMPRESSED_ORIGINAL, + STATFS_COMPRESSED, + STATFS_COMPRESSED_ALLOCATED, + STATFS_LAST + }; + int64_t values[STATFS_LAST]; + volatile_statfs() { + memset(this, 0, sizeof(volatile_statfs)); + } + void reset() { + *this = volatile_statfs(); + } + volatile_statfs& operator+=(const volatile_statfs& other) { + for (size_t i = 0; i < STATFS_LAST; ++i) { + values[i] += other.values[i]; + } + return *this; + } + int64_t& allocated() { + return values[STATFS_ALLOCATED]; + } + int64_t& stored() { + return values[STATFS_STORED]; + } + int64_t& compressed_original() { + return values[STATFS_COMPRESSED_ORIGINAL]; + } + int64_t& compressed() { + return values[STATFS_COMPRESSED]; + } + int64_t& compressed_allocated() { + return values[STATFS_COMPRESSED_ALLOCATED]; + } + bool is_empty() { + return values[STATFS_ALLOCATED] == 0 && + values[STATFS_STORED] == 0 && + values[STATFS_COMPRESSED] == 0 && + values[STATFS_COMPRESSED_ORIGINAL] == 0 && + values[STATFS_COMPRESSED_ALLOCATED] == 0; + } + void decode(bufferlist::iterator& it) { + for (size_t i = 0; i < STATFS_LAST; i++) { + ::decode(values[i], it); + } + } + + void encode(bufferlist& bl) { + for (size_t i = 0; i < STATFS_LAST; i++) { + ::encode(values[i], bl); + } + } + }; + + struct TransContext : public AioContext { + MEMPOOL_CLASS_HELPERS(); + + typedef enum { + STATE_PREPARE, + STATE_AIO_WAIT, + STATE_IO_DONE, + STATE_KV_QUEUED, // queued for kv_sync_thread submission + STATE_KV_SUBMITTED, // submitted to kv; not yet synced + STATE_KV_DONE, + STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running) + STATE_DEFERRED_CLEANUP, // remove deferred kv record + STATE_DEFERRED_DONE, + STATE_FINISHING, + 
STATE_DONE, + } state_t; + + state_t state = STATE_PREPARE; + + const char *get_state_name() { + switch (state) { + case STATE_PREPARE: return "prepare"; + case STATE_AIO_WAIT: return "aio_wait"; + case STATE_IO_DONE: return "io_done"; + case STATE_KV_QUEUED: return "kv_queued"; + case STATE_KV_SUBMITTED: return "kv_submitted"; + case STATE_KV_DONE: return "kv_done"; + case STATE_DEFERRED_QUEUED: return "deferred_queued"; + case STATE_DEFERRED_CLEANUP: return "deferred_cleanup"; + case STATE_DEFERRED_DONE: return "deferred_done"; + case STATE_FINISHING: return "finishing"; + case STATE_DONE: return "done"; + } + return "???"; + } + +#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) + const char *get_state_latency_name(int state) { + switch (state) { + case l_bluestore_state_prepare_lat: return "prepare"; + case l_bluestore_state_aio_wait_lat: return "aio_wait"; + case l_bluestore_state_io_done_lat: return "io_done"; + case l_bluestore_state_kv_queued_lat: return "kv_queued"; + case l_bluestore_state_kv_committing_lat: return "kv_committing"; + case l_bluestore_state_kv_done_lat: return "kv_done"; + case l_bluestore_state_deferred_queued_lat: return "deferred_queued"; + case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup"; + case l_bluestore_state_finishing_lat: return "finishing"; + case l_bluestore_state_done_lat: return "done"; + } + return "???"; + } +#endif + + void log_state_latency(PerfCounters *logger, int state) { + utime_t lat, now = ceph_clock_now(); + lat = now - last_stamp; + logger->tinc(state, lat); +#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) + if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) { + double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000; + OID_ELAPSED("", usecs, get_state_latency_name(state)); + } +#endif + last_stamp = now; + } + + OpSequencerRef osr; + boost::intrusive::list_member_hook<> sequencer_item; + + uint64_t bytes = 0, cost = 0; + + set onodes; ///< these need to be updated/written + set modified_objects; ///< objects we modified (and need a ref) + set shared_blobs; ///< these need to be updated/written + set shared_blobs_written; ///< update these on io completion + + KeyValueDB::Transaction t; ///< then we will commit this + Context *oncommit = nullptr; ///< signal on commit + Context *onreadable = nullptr; ///< signal on readable + Context *onreadable_sync = nullptr; ///< signal on readable + list oncommits; ///< more commit completions + list removed_collections; ///< colls we removed + + boost::intrusive::list_member_hook<> deferred_queue_item; + bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any + + interval_set allocated, released; + volatile_statfs statfs_delta; + + IOContext ioc; + bool had_ios = false; ///< true if we submitted IOs before our kv txn + + uint64_t seq = 0; + utime_t start; + utime_t last_stamp; + + uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated + uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated + + explicit TransContext(CephContext* cct, OpSequencer *o) + : osr(o), + ioc(cct, this), + start(ceph_clock_now()) { + last_stamp = start; + } + ~TransContext() { + delete deferred_txn; + } + + void write_onode(OnodeRef &o) { + onodes.insert(o); + } + void write_shared_blob(SharedBlobRef &sb) { + shared_blobs.insert(sb); + } + void unshare_blob(SharedBlob *sb) { + shared_blobs.erase(sb); + } + + /// note we logically modified object (when onode itself is unmodified) + void note_modified_object(OnodeRef 
&o) { + // onode itself isn't written, though + modified_objects.insert(o); + } + void removed(OnodeRef& o) { + onodes.erase(o); + modified_objects.erase(o); + } + + void aio_finish(BlueStore *store) override { + store->txc_aio_finish(this); + } + }; + + typedef boost::intrusive::list< + TransContext, + boost::intrusive::member_hook< + TransContext, + boost::intrusive::list_member_hook<>, + &TransContext::deferred_queue_item> > deferred_queue_t; + + struct DeferredBatch : public AioContext { + OpSequencer *osr; + struct deferred_io { + bufferlist bl; ///< data + uint64_t seq; ///< deferred transaction seq + }; + map iomap; ///< map of ios in this batch + deferred_queue_t txcs; ///< txcs in this batch + IOContext ioc; ///< our aios + /// bytes of pending io for each deferred seq (may be 0) + map seq_bytes; + + void _discard(CephContext *cct, uint64_t offset, uint64_t length); + void _audit(CephContext *cct); + + DeferredBatch(CephContext *cct, OpSequencer *osr) + : osr(osr), ioc(cct, this) {} + + /// prepare a write + void prepare_write(CephContext *cct, + uint64_t seq, uint64_t offset, uint64_t length, + bufferlist::const_iterator& p); + + void aio_finish(BlueStore *store) override { + store->_deferred_aio_finish(osr); + } + }; + + class OpSequencer : public Sequencer_impl { + public: + std::mutex qlock; + std::condition_variable qcond; + typedef boost::intrusive::list< + TransContext, + boost::intrusive::member_hook< + TransContext, + boost::intrusive::list_member_hook<>, + &TransContext::sequencer_item> > q_list_t; + q_list_t q; ///< transactions + + boost::intrusive::list_member_hook<> deferred_osr_queue_item; + + DeferredBatch *deferred_running = nullptr; + DeferredBatch *deferred_pending = nullptr; + + Sequencer *parent; + BlueStore *store; + + uint64_t last_seq = 0; + + std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io + + std::atomic_int kv_committing_serially = {0}; + + std::atomic_int kv_submitted_waiters = {0}; + + std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set + std::atomic_bool zombie = {false}; ///< owning Sequencer has gone away + + OpSequencer(CephContext* cct, BlueStore *store) + : Sequencer_impl(cct), + parent(NULL), store(store) { + store->register_osr(this); + } + ~OpSequencer() override { + assert(q.empty()); + _unregister(); + } + + void discard() override { + // Note that we may have txc's in flight when the parent Sequencer + // goes away. Reflect this with zombie==registered==true and let + // _osr_drain_all clean up later. 
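// The queue/drain handshake used by OpSequencer can be reduced to a small
// sketch. SimpleSequencer is hypothetical, with a sequence number standing in
// for a TransContext: producers enqueue under the lock, the completion path
// dequeues and notifies, and drain() blocks until everything queued so far has
// retired; the drain()/flush() methods just below follow the same
// condition-variable pattern.

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>

struct SimpleSequencer {
  std::mutex qlock;
  std::condition_variable qcond;
  std::deque<std::uint64_t> q;      // stands in for the intrusive txc list
  std::uint64_t last_seq = 0;

  std::uint64_t queue_new() {
    std::lock_guard<std::mutex> l(qlock);
    q.push_back(++last_seq);
    return last_seq;
  }
  void retire_front() {             // called when the oldest item completes
    std::lock_guard<std::mutex> l(qlock);
    q.pop_front();
    qcond.notify_all();
  }
  void drain() {                    // wait for all queued work to finish
    std::unique_lock<std::mutex> l(qlock);
    qcond.wait(l, [this] { return q.empty(); });
  }
};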
+ assert(!zombie); + zombie = true; + parent = nullptr; + bool empty; + { + std::lock_guard l(qlock); + empty = q.empty(); + } + if (empty) { + _unregister(); + } + } + + void _unregister() { + if (registered) { + store->unregister_osr(this); + registered = false; + } + } + + void queue_new(TransContext *txc) { + std::lock_guard l(qlock); + txc->seq = ++last_seq; + q.push_back(*txc); + } + + void drain() { + std::unique_lock l(qlock); + while (!q.empty()) + qcond.wait(l); + } + + void drain_preceding(TransContext *txc) { + std::unique_lock l(qlock); + while (!q.empty() && &q.front() != txc) + qcond.wait(l); + } + + bool _is_all_kv_submitted() { + // caller must hold qlock + if (q.empty()) { + return true; + } + TransContext *txc = &q.back(); + if (txc->state >= TransContext::STATE_KV_SUBMITTED) { + return true; + } + return false; + } + + void flush() override { + std::unique_lock l(qlock); + while (true) { + // set flag before the check because the condition + // may become true outside qlock, and we need to make + // sure those threads see waiters and signal qcond. + ++kv_submitted_waiters; + if (_is_all_kv_submitted()) { + return; + } + qcond.wait(l); + --kv_submitted_waiters; + } + } + + bool flush_commit(Context *c) override { + std::lock_guard l(qlock); + if (q.empty()) { + return true; + } + TransContext *txc = &q.back(); + if (txc->state >= TransContext::STATE_KV_DONE) { + return true; + } + txc->oncommits.push_back(c); + return false; + } + }; + + typedef boost::intrusive::list< + OpSequencer, + boost::intrusive::member_hook< + OpSequencer, + boost::intrusive::list_member_hook<>, + &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t; + + struct KVSyncThread : public Thread { + BlueStore *store; + explicit KVSyncThread(BlueStore *s) : store(s) {} + void *entry() override { + store->_kv_sync_thread(); + return NULL; + } + }; + struct KVFinalizeThread : public Thread { + BlueStore *store; + explicit KVFinalizeThread(BlueStore *s) : store(s) {} + void *entry() { + store->_kv_finalize_thread(); + return NULL; + } + }; + + struct DBHistogram { + struct value_dist { + uint64_t count; + uint32_t max_len; + }; + + struct key_dist { + uint64_t count; + uint32_t max_len; + map val_map; ///< slab id to count, max length of value and key + }; + + map > key_hist; + map value_hist; + int get_key_slab(size_t sz); + string get_key_slab_to_range(int slab); + int get_value_slab(size_t sz); + string get_value_slab_to_range(int slab); + void update_hist_entry(map > &key_hist, + const string &prefix, size_t key_size, size_t value_size); + void dump(Formatter *f); + }; + + // -------------------------------------------------------- + // members +private: + BlueFS *bluefs = nullptr; + unsigned bluefs_shared_bdev = 0; ///< which bluefs bdev we are sharing + bool bluefs_single_shared_device = true; + utime_t bluefs_last_balance; + + KeyValueDB *db = nullptr; + BlockDevice *bdev = nullptr; + std::string freelist_type; + FreelistManager *fm = nullptr; + Allocator *alloc = nullptr; + uuid_d fsid; + int path_fd = -1; ///< open handle to $path + int fsid_fd = -1; ///< open handle (locked) to $path/fsid + bool mounted = false; + + RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map + mempool::bluestore_cache_other::unordered_map coll_map; + + vector cache_shards; + + std::mutex osr_lock; ///< protect osd_set + std::set osr_set; ///< set of all OpSequencers + + std::atomic nid_last = {0}; + std::atomic nid_max = {0}; + std::atomic blobid_last = {0}; + std::atomic blobid_max = 
{0}; + + Throttle throttle_bytes; ///< submit to commit + Throttle throttle_deferred_bytes; ///< submit to deferred complete + + interval_set bluefs_extents; ///< block extents owned by bluefs + interval_set bluefs_extents_reclaiming; ///< currently reclaiming + + std::mutex deferred_lock; + std::atomic deferred_seq = {0}; + deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending + int deferred_queue_size = 0; ///< num txc's queued across all osrs + atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread + Finisher deferred_finisher; + + int m_finisher_num = 1; + vector finishers; + + KVSyncThread kv_sync_thread; + std::mutex kv_lock; + std::condition_variable kv_cond; + bool _kv_only = false; + bool kv_sync_started = false; + bool kv_stop = false; + bool kv_finalize_started = false; + bool kv_finalize_stop = false; + deque kv_queue; ///< ready, already submitted + deque kv_queue_unsubmitted; ///< ready, need submit by kv thread + deque kv_committing; ///< currently syncing + deque deferred_done_queue; ///< deferred ios done + deque deferred_stable_queue; ///< deferred ios done + stable + + KVFinalizeThread kv_finalize_thread; + std::mutex kv_finalize_lock; + std::condition_variable kv_finalize_cond; + deque kv_committing_to_finalize; ///< pending finalization + deque deferred_stable_to_finalize; ///< pending finalization + + PerfCounters *logger = nullptr; + + std::mutex reap_lock; + list removed_collections; + + RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"}; + set debug_data_error_objects; + set debug_mdata_error_objects; + + std::atomic csum_type = {Checksummer::CSUM_CRC32C}; + + uint64_t block_size = 0; ///< block size of block device (power of 2) + uint64_t block_mask = 0; ///< mask to get just the block offset + size_t block_size_order = 0; ///< bits to shift to get block size + + uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2) + ///< bits for min_alloc_size + uint8_t min_alloc_size_order = 0; + static_assert(std::numeric_limits::max() > + std::numeric_limits::digits, + "not enough bits for min_alloc_size"); + + ///< maximum allocation unit (power of 2) + std::atomic max_alloc_size = {0}; + + ///< number threshold for forced deferred writes + std::atomic deferred_batch_ops = {0}; + + ///< size threshold for forced deferred writes + std::atomic prefer_deferred_size = {0}; + + ///< approx cost per io, in bytes + std::atomic throttle_cost_per_io = {0}; + + std::atomic comp_mode = + {Compressor::COMP_NONE}; ///< compression mode + CompressorRef compressor; + std::atomic comp_min_blob_size = {0}; + std::atomic comp_max_blob_size = {0}; + + std::atomic max_blob_size = {0}; ///< maximum blob size + + uint64_t kv_ios = 0; + uint64_t kv_throttle_costs = 0; + + // cache trim control + uint64_t cache_size = 0; ///< total cache size + float cache_meta_ratio = 0; ///< cache ratio dedicated to metadata + float cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb) + float cache_data_ratio = 0; ///< cache ratio dedicated to object data + + std::mutex vstatfs_lock; + volatile_statfs vstatfs; + + struct MempoolThread : public Thread { + BlueStore *store; + Cond cond; + Mutex lock; + bool stop = false; + public: + explicit MempoolThread(BlueStore *s) + : store(s), + lock("BlueStore::MempoolThread::lock") {} + void *entry() override; + void init() { + assert(stop == false); + create("bstore_mempool"); + } + void shutdown() { + lock.Lock(); + stop = true; + cond.Signal(); + lock.Unlock(); + join(); + } + } 
mempool_thread; + + // -------------------------------------------------------- + // private methods + + void _init_logger(); + void _shutdown_logger(); + int _reload_logger(); + + int _open_path(); + void _close_path(); + int _open_fsid(bool create); + int _lock_fsid(); + int _read_fsid(uuid_d *f); + int _write_fsid(); + void _close_fsid(); + void _set_alloc_sizes(); + void _set_blob_size(); + + int _open_bdev(bool create); + void _close_bdev(); + int _open_db(bool create); + void _close_db(); + int _open_fm(bool create); + void _close_fm(); + int _open_alloc(); + void _close_alloc(); + int _open_collections(int *errors=0); + void _close_collections(); + + int _setup_block_symlink_or_file(string name, string path, uint64_t size, + bool create); + +public: + static int _write_bdev_label(CephContext* cct, + string path, bluestore_bdev_label_t label); + static int _read_bdev_label(CephContext* cct, string path, + bluestore_bdev_label_t *label); +private: + int _check_or_set_bdev_label(string path, uint64_t size, string desc, + bool create); + + int _open_super_meta(); + + void _open_statfs(); + + int _reconcile_bluefs_freespace(); + int _balance_bluefs_freespace(PExtentVector *extents); + void _commit_bluefs_freespace(const PExtentVector& extents); + + CollectionRef _get_collection(const coll_t& cid); + void _queue_reap_collection(CollectionRef& c); + void _reap_collections(); + void _update_cache_logger(); + + void _assign_nid(TransContext *txc, OnodeRef o); + uint64_t _assign_blobid(TransContext *txc); + + void _dump_onode(OnodeRef o, int log_level=30); + void _dump_extent_map(ExtentMap& em, int log_level=30); + void _dump_transaction(Transaction *t, int log_level = 30); + + TransContext *_txc_create(OpSequencer *osr); + void _txc_update_store_statfs(TransContext *txc); + void _txc_add_transaction(TransContext *txc, Transaction *t); + void _txc_calc_cost(TransContext *txc); + void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t); + void _txc_state_proc(TransContext *txc); + void _txc_aio_submit(TransContext *txc); +public: + void txc_aio_finish(void *p) { + _txc_state_proc(static_cast(p)); + } +private: + void _txc_finish_io(TransContext *txc); + void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t); + void _txc_applied_kv(TransContext *txc); + void _txc_committed_kv(TransContext *txc); + void _txc_finish(TransContext *txc); + void _txc_release_alloc(TransContext *txc); + + void _osr_drain_preceding(TransContext *txc); + void _osr_drain_all(); + void _osr_unregister_all(); + + void _kv_start(); + void _kv_stop(); + void _kv_sync_thread(); + void _kv_finalize_thread(); + + bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o); + void _deferred_queue(TransContext *txc); +public: + void deferred_try_submit(); +private: + void _deferred_submit_unlock(OpSequencer *osr); + void _deferred_aio_finish(OpSequencer *osr); + int _deferred_replay(); + +public: + using mempool_dynamic_bitset = + boost::dynamic_bitset>; + +private: + int _fsck_check_extents( + const ghobject_t& oid, + const PExtentVector& extents, + bool compressed, + mempool_dynamic_bitset &used_blocks, + store_statfs_t& expected_statfs); + + void _buffer_cache_write( + TransContext *txc, + BlobRef b, + uint64_t offset, + bufferlist& bl, + unsigned flags) { + b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl, + flags); + txc->shared_blobs_written.insert(b->shared_blob); + } + + int _collection_list( + Collection *c, const ghobject_t& start, const ghobject_t& end, + 
int max, vector *ls, ghobject_t *next); + + template + T select_option(const std::string& opt_name, T val1, F f) { + //NB: opt_name reserved for future use + boost::optional val2 = f(); + if (val2) { + return *val2; + } + return val1; + } + + void _apply_padding(uint64_t head_pad, + uint64_t tail_pad, + bufferlist& padded); + + // -- ondisk version --- +public: + const int32_t latest_ondisk_format = 2; ///< our version + const int32_t min_readable_ondisk_format = 1; ///< what we can read + const int32_t min_compat_ondisk_format = 2; ///< who can read us + +private: + int32_t ondisk_format = 0; ///< value detected on mount + + int _upgrade_super(); ///< upgrade (called during open_super) + void _prepare_ondisk_format_super(KeyValueDB::Transaction& t); + + // --- public interface --- +public: + BlueStore(CephContext *cct, const string& path); + BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for UT only + ~BlueStore() override; + + string get_type() override { + return "bluestore"; + } + + bool needs_journal() override { return false; }; + bool wants_journal() override { return false; }; + bool allows_journal() override { return false; }; + + bool is_rotational() override; + bool is_journal_rotational() override; + + string get_default_device_class() override { + string device_class; + map metadata; + collect_metadata(&metadata); + auto it = metadata.find("bluestore_bdev_type"); + if (it != metadata.end()) { + device_class = it->second; + } + return device_class; + } + + static int get_block_device_fsid(CephContext* cct, const string& path, + uuid_d *fsid); + + bool test_mount_in_use() override; + +private: + int _mount(bool kv_only); +public: + int mount() override { + return _mount(false); + } + int umount() override; + + int start_kv_only(KeyValueDB **pdb) { + int r = _mount(true); + if (r < 0) + return r; + *pdb = db; + return 0; + } + + int write_meta(const std::string& key, const std::string& value) override; + int read_meta(const std::string& key, std::string *value) override; + + + int fsck(bool deep) override { + return _fsck(deep, false); + } + int repair(bool deep) override { + return _fsck(deep, true); + } + int _fsck(bool deep, bool repair); + + void set_cache_shards(unsigned num) override; + + int validate_hobject_key(const hobject_t &obj) const override { + return 0; + } + unsigned get_max_attr_name_length() override { + return 256; // arbitrary; there is no real limit internally + } + + int mkfs() override; + int mkjournal() override { + return 0; + } + + void get_db_statistics(Formatter *f) override; + void generate_db_histogram(Formatter *f) override; + void _flush_cache(); + void flush_cache() override; + void dump_perf_counters(Formatter *f) override { + f->open_object_section("perf_counters"); + logger->dump_formatted(f, false); + f->close_section(); + } + + void register_osr(OpSequencer *osr) { + std::lock_guard l(osr_lock); + osr_set.insert(osr); + } + void unregister_osr(OpSequencer *osr) { + std::lock_guard l(osr_lock); + osr_set.erase(osr); + } + +public: + int statfs(struct store_statfs_t *buf) override; + + void collect_metadata(map *pm) override; + + bool exists(const coll_t& cid, const ghobject_t& oid) override; + bool exists(CollectionHandle &c, const ghobject_t& oid) override; + int set_collection_opts( + const coll_t& cid, + const pool_opts_t& opts) override; + int stat( + const coll_t& cid, + const ghobject_t& oid, + struct stat *st, + bool allow_eio = false) override; + int stat( + CollectionHandle &c, + const ghobject_t& 
+  int read(
+    const coll_t& cid,
+    const ghobject_t& oid,
+    uint64_t offset,
+    size_t len,
+    bufferlist& bl,
+    uint32_t op_flags = 0) override;
+  int read(
+    CollectionHandle &c,
+    const ghobject_t& oid,
+    uint64_t offset,
+    size_t len,
+    bufferlist& bl,
+    uint32_t op_flags = 0) override;
+  int _do_read(
+    Collection *c,
+    OnodeRef o,
+    uint64_t offset,
+    size_t len,
+    bufferlist& bl,
+    uint32_t op_flags = 0);
+
+private:
+  int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
+              uint64_t offset, size_t len, interval_set<uint64_t>& destset);
+public:
+  int fiemap(const coll_t& cid, const ghobject_t& oid,
+             uint64_t offset, size_t len, bufferlist& bl) override;
+  int fiemap(CollectionHandle &c, const ghobject_t& oid,
+             uint64_t offset, size_t len, bufferlist& bl) override;
+  int fiemap(const coll_t& cid, const ghobject_t& oid,
+             uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
+  int fiemap(CollectionHandle &c, const ghobject_t& oid,
+             uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
+
+  int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
+              bufferptr& value) override;
+  int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
+              bufferptr& value) override;
+
+  int getattrs(const coll_t& cid, const ghobject_t& oid,
+               map<string,bufferptr>& aset) override;
+  int getattrs(CollectionHandle &c, const ghobject_t& oid,
+               map<string,bufferptr>& aset) override;
+
+  int list_collections(vector<coll_t>& ls) override;
+
+  CollectionHandle open_collection(const coll_t &c) override;
+
+  bool collection_exists(const coll_t& c) override;
+  int collection_empty(const coll_t& c, bool *empty) override;
+  int collection_bits(const coll_t& c) override;
+
+  int collection_list(const coll_t& cid,
+                      const ghobject_t& start,
+                      const ghobject_t& end,
+                      int max,
+                      vector<ghobject_t> *ls, ghobject_t *next) override;
+  int collection_list(CollectionHandle &c,
+                      const ghobject_t& start,
+                      const ghobject_t& end,
+                      int max,
+                      vector<ghobject_t> *ls, ghobject_t *next) override;
+
+  int omap_get(
+    const coll_t& cid,           ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    bufferlist *header,          ///< [out] omap header
+    map<string, bufferlist> *out /// < [out] Key to value map
+    ) override;
+  int omap_get(
+    CollectionHandle &c,         ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    bufferlist *header,          ///< [out] omap header
+    map<string, bufferlist> *out /// < [out] Key to value map
+    ) override;
+
+  /// Get omap header
+  int omap_get_header(
+    const coll_t& cid,           ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    bufferlist *header,          ///< [out] omap header
+    bool allow_eio = false       ///< [in] don't assert on eio
+    ) override;
+  int omap_get_header(
+    CollectionHandle &c,         ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    bufferlist *header,          ///< [out] omap header
+    bool allow_eio = false       ///< [in] don't assert on eio
+    ) override;
+
+  /// Get keys defined on oid
+  int omap_get_keys(
+    const coll_t& cid,           ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    set<string> *keys            ///< [out] Keys defined on oid
+    ) override;
+  int omap_get_keys(
+    CollectionHandle &c,         ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    set<string> *keys            ///< [out] Keys defined on oid
+    ) override;
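+
+  // Illustrative sketch only; not part of the original header. Typical use of
+  // the omap accessors declared here; 'store', 'ch' and 'oid' are placeholders.
+  //
+  //   set<string> keys;
+  //   store.omap_get_keys(ch, oid, &keys);
+  //   map<string, bufferlist> values;
+  //   store.omap_get_values(ch, oid, keys, &values);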
+
+  /// Get key values
+  int omap_get_values(
+    const coll_t& cid,           ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    const set<string> &keys,     ///< [in] Keys to get
+    map<string, bufferlist> *out ///< [out] Returned keys and values
+    ) override;
+  int omap_get_values(
+    CollectionHandle &c,         ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    const set<string> &keys,     ///< [in] Keys to get
+    map<string, bufferlist> *out ///< [out] Returned keys and values
+    ) override;
+
+  /// Filters keys into out which are defined on oid
+  int omap_check_keys(
+    const coll_t& cid,           ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    const set<string> &keys,     ///< [in] Keys to check
+    set<string> *out             ///< [out] Subset of keys defined on oid
+    ) override;
+  int omap_check_keys(
+    CollectionHandle &c,         ///< [in] Collection containing oid
+    const ghobject_t &oid,       ///< [in] Object containing omap
+    const set<string> &keys,     ///< [in] Keys to check
+    set<string> *out             ///< [out] Subset of keys defined on oid
+    ) override;
+
+  ObjectMap::ObjectMapIterator get_omap_iterator(
+    const coll_t& cid,           ///< [in] collection
+    const ghobject_t &oid        ///< [in] object
+    ) override;
+  ObjectMap::ObjectMapIterator get_omap_iterator(
+    CollectionHandle &c,         ///< [in] collection
+    const ghobject_t &oid        ///< [in] object
+    ) override;
+
+  void set_fsid(uuid_d u) override {
+    fsid = u;
+  }
+  uuid_d get_fsid() override {
+    return fsid;
+  }
+
+  uint64_t estimate_objects_overhead(uint64_t num_objects) override {
+    return num_objects * 300; //assuming per-object overhead is 300 bytes
+  }
+
+  struct BSPerfTracker {
+    PerfCounters::avg_tracker<uint64_t> os_commit_latency;
+    PerfCounters::avg_tracker<uint64_t> os_apply_latency;
+
+    objectstore_perf_stat_t get_cur_stats() const {
+      objectstore_perf_stat_t ret;
+      ret.os_commit_latency = os_commit_latency.current_avg();
+      ret.os_apply_latency = os_apply_latency.current_avg();
+      return ret;
+    }
+
+    void update_from_perfcounters(PerfCounters &logger);
+  } perf_tracker;
+
+  objectstore_perf_stat_t get_cur_stats() override {
+    perf_tracker.update_from_perfcounters(*logger);
+    return perf_tracker.get_cur_stats();
+  }
+  const PerfCounters* get_perf_counters() const override {
+    return logger;
+  }
+
+  int queue_transactions(
+    Sequencer *osr,
+    vector<Transaction>& tls,
+    TrackedOpRef op = TrackedOpRef(),
+    ThreadPool::TPHandle *handle = NULL) override;
+
+  // error injection
+  void inject_data_error(const ghobject_t& o) override {
+    RWLock::WLocker l(debug_read_error_lock);
+    debug_data_error_objects.insert(o);
+  }
+  void inject_mdata_error(const ghobject_t& o) override {
+    RWLock::WLocker l(debug_read_error_lock);
+    debug_mdata_error_objects.insert(o);
+  }
+  void compact() override {
+    assert(db);
+    db->compact();
+  }
+
+private:
+  bool _debug_data_eio(const ghobject_t& o) {
+    if (!cct->_conf->bluestore_debug_inject_read_err) {
+      return false;
+    }
+    RWLock::RLocker l(debug_read_error_lock);
+    return debug_data_error_objects.count(o);
+  }
+  bool _debug_mdata_eio(const ghobject_t& o) {
+    if (!cct->_conf->bluestore_debug_inject_read_err) {
+      return false;
+    }
+    RWLock::RLocker l(debug_read_error_lock);
+    return debug_mdata_error_objects.count(o);
+  }
+  void _debug_obj_on_delete(const ghobject_t& o) {
+    if (cct->_conf->bluestore_debug_inject_read_err) {
+      RWLock::WLocker l(debug_read_error_lock);
+      debug_data_error_objects.erase(o);
+      debug_mdata_error_objects.erase(o);
+    }
+  }
+
+private:
+
+  // --------------------------------------------------------
+  // read processing internal methods
+  int _verify_csum(
+    OnodeRef& o,
+    const bluestore_blob_t* blob,
+    uint64_t blob_xoffset,
+    const bufferlist& bl,
+    uint64_t logical_offset) const;
+  int _decompress(bufferlist& source, bufferlist* result);
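+
+  // Illustrative sketch only; not part of the original header. The debug
+  // hooks inject_data_error()/inject_mdata_error() above only take effect
+  // when bluestore_debug_inject_read_err is enabled; a test might drive them
+  // roughly like this ('store', 'ch' and 'oid' are placeholders):
+  //
+  //   store.inject_data_error(oid);
+  //   bufferlist bl;
+  //   int r = store.read(ch, oid, 0, 4096, bl);  // expected to return -EIO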
+
+  // --------------------------------------------------------
+  // write ops
+
+  struct WriteContext {
+    bool buffered = false;          ///< buffered write
+    bool compress = false;          ///< compressed write
+    uint64_t target_blob_size = 0;  ///< target (max) blob size
+    unsigned csum_order = 0;        ///< target checksum chunk order
+
+    old_extent_map_t old_extents;   ///< must deref these blobs
+
+    struct write_item {
+      uint64_t logical_offset;      ///< write logical offset
+      BlobRef b;
+      uint64_t blob_length;
+      uint64_t b_off;
+      bufferlist bl;
+      uint64_t b_off0;              ///< original offset in a blob prior to padding
+      uint64_t length0;             ///< original data length prior to padding
+
+      bool mark_unused;
+      bool new_blob;                ///< whether new blob was created
+
+      bool compressed = false;
+      bufferlist compressed_bl;
+      size_t compressed_len = 0;
+
+      write_item(
+        uint64_t logical_offs,
+        BlobRef b,
+        uint64_t blob_len,
+        uint64_t o,
+        bufferlist& bl,
+        uint64_t o0,
+        uint64_t l0,
+        bool _mark_unused,
+        bool _new_blob)
+       :
+        logical_offset(logical_offs),
+        b(b),
+        blob_length(blob_len),
+        b_off(o),
+        bl(bl),
+        b_off0(o0),
+        length0(l0),
+        mark_unused(_mark_unused),
+        new_blob(_new_blob) {}
+    };
+    vector<write_item> writes;      ///< blobs we're writing
+
+    /// partial clone of the context
+    void fork(const WriteContext& other) {
+      buffered = other.buffered;
+      compress = other.compress;
+      target_blob_size = other.target_blob_size;
+      csum_order = other.csum_order;
+    }
+    void write(
+      uint64_t loffs,
+      BlobRef b,
+      uint64_t blob_len,
+      uint64_t o,
+      bufferlist& bl,
+      uint64_t o0,
+      uint64_t len0,
+      bool _mark_unused,
+      bool _new_blob) {
+      writes.emplace_back(loffs,
+                          b,
+                          blob_len,
+                          o,
+                          bl,
+                          o0,
+                          len0,
+                          _mark_unused,
+                          _new_blob);
+    }
+    /// Checks for writes to the same pextent within a blob
+    bool has_conflict(
+      BlobRef b,
+      uint64_t loffs,
+      uint64_t loffs_end,
+      uint64_t min_alloc_size);
+  };
+
+  void _do_write_small(
+    TransContext *txc,
+    CollectionRef &c,
+    OnodeRef o,
+    uint64_t offset, uint64_t length,
+    bufferlist::iterator& blp,
+    WriteContext *wctx);
+  void _do_write_big(
+    TransContext *txc,
+    CollectionRef &c,
+    OnodeRef o,
+    uint64_t offset, uint64_t length,
+    bufferlist::iterator& blp,
+    WriteContext *wctx);
+  int _do_alloc_write(
+    TransContext *txc,
+    CollectionRef c,
+    OnodeRef o,
+    WriteContext *wctx);
+  void _wctx_finish(
+    TransContext *txc,
+    CollectionRef& c,
+    OnodeRef o,
+    WriteContext *wctx,
+    set<SharedBlob*> *maybe_unshared_blobs=0);
+
+  int _do_transaction(Transaction *t,
+                      TransContext *txc,
+                      ThreadPool::TPHandle *handle);
+
+  int _write(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef& o,
+             uint64_t offset, size_t len,
+             bufferlist& bl,
+             uint32_t fadvise_flags);
+  void _pad_zeros(bufferlist *bl, uint64_t *offset,
+                  uint64_t chunk_size);
+
+  void _choose_write_options(CollectionRef& c,
+                             OnodeRef o,
+                             uint32_t fadvise_flags,
+                             WriteContext *wctx);
+
+  int _do_gc(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef o,
+             const GarbageCollector& gc,
+             const WriteContext& wctx,
+             uint64_t *dirty_start,
+             uint64_t *dirty_end);
+
+  int _do_write(TransContext *txc,
+                CollectionRef &c,
+                OnodeRef o,
+                uint64_t offset, uint64_t length,
+                bufferlist& bl,
+                uint32_t fadvise_flags);
+  void _do_write_data(TransContext *txc,
+                      CollectionRef& c,
+                      OnodeRef o,
+                      uint64_t offset,
+                      uint64_t length,
+                      bufferlist& bl,
+                      WriteContext *wctx);
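+
+  // Illustrative summary only; not part of the original header. A client
+  // write processed by _do_transaction() flows roughly as:
+  //
+  //   _write()
+  //     -> _do_write()           // picks options via _choose_write_options()
+  //       -> _do_write_data()    // splits the range into chunks
+  //         -> _do_write_small() / _do_write_big()  // populate a WriteContext
+  //       -> _do_alloc_write()   // allocate space, checksum, queue the io
+  //       -> _wctx_finish()      // release extents recorded in the WriteContext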
+
+  int _touch(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef& o);
+  int _do_zero(TransContext *txc,
+               CollectionRef& c,
+               OnodeRef& o,
+               uint64_t offset, size_t len);
+  int _zero(TransContext *txc,
+            CollectionRef& c,
+            OnodeRef& o,
+            uint64_t offset, size_t len);
+  void _do_truncate(TransContext *txc,
+                    CollectionRef& c,
+                    OnodeRef o,
+                    uint64_t offset,
+                    set<SharedBlob*> *maybe_unshared_blobs=0);
+  int _truncate(TransContext *txc,
+                CollectionRef& c,
+                OnodeRef& o,
+                uint64_t offset);
+  int _remove(TransContext *txc,
+              CollectionRef& c,
+              OnodeRef& o);
+  int _do_remove(TransContext *txc,
+                 CollectionRef& c,
+                 OnodeRef o);
+  int _setattr(TransContext *txc,
+               CollectionRef& c,
+               OnodeRef& o,
+               const string& name,
+               bufferptr& val);
+  int _setattrs(TransContext *txc,
+                CollectionRef& c,
+                OnodeRef& o,
+                const map<string,bufferptr>& aset);
+  int _rmattr(TransContext *txc,
+              CollectionRef& c,
+              OnodeRef& o,
+              const string& name);
+  int _rmattrs(TransContext *txc,
+               CollectionRef& c,
+               OnodeRef& o);
+  void _do_omap_clear(TransContext *txc, uint64_t id);
+  int _omap_clear(TransContext *txc,
+                  CollectionRef& c,
+                  OnodeRef& o);
+  int _omap_setkeys(TransContext *txc,
+                    CollectionRef& c,
+                    OnodeRef& o,
+                    bufferlist& bl);
+  int _omap_setheader(TransContext *txc,
+                      CollectionRef& c,
+                      OnodeRef& o,
+                      bufferlist& header);
+  int _omap_rmkeys(TransContext *txc,
+                   CollectionRef& c,
+                   OnodeRef& o,
+                   bufferlist& bl);
+  int _omap_rmkey_range(TransContext *txc,
+                        CollectionRef& c,
+                        OnodeRef& o,
+                        const string& first, const string& last);
+  int _set_alloc_hint(
+    TransContext *txc,
+    CollectionRef& c,
+    OnodeRef& o,
+    uint64_t expected_object_size,
+    uint64_t expected_write_size,
+    uint32_t flags);
+  int _do_clone_range(TransContext *txc,
+                      CollectionRef& c,
+                      OnodeRef& oldo,
+                      OnodeRef& newo,
+                      uint64_t srcoff, uint64_t length, uint64_t dstoff);
+  int _clone(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef& oldo,
+             OnodeRef& newo);
+  int _clone_range(TransContext *txc,
+                   CollectionRef& c,
+                   OnodeRef& oldo,
+                   OnodeRef& newo,
+                   uint64_t srcoff, uint64_t length, uint64_t dstoff);
+  int _rename(TransContext *txc,
+              CollectionRef& c,
+              OnodeRef& oldo,
+              OnodeRef& newo,
+              const ghobject_t& new_oid);
+  int _create_collection(TransContext *txc, const coll_t &cid,
+                         unsigned bits, CollectionRef *c);
+  int _remove_collection(TransContext *txc, const coll_t &cid,
+                         CollectionRef *c);
+  int _split_collection(TransContext *txc,
+                        CollectionRef& c,
+                        CollectionRef& d,
+                        unsigned bits, int rem);
+};
+
+inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
+  return out << *s.parent;
+}
+
+static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
+  o->get();
+}
+static inline void intrusive_ptr_release(BlueStore::Onode *o) {
+  o->put();
+}
+
+static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
+  o->get();
+}
+static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
+  o->put();
+}
+
+#endif
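+
+// Illustrative note only; not part of the original header. The
+// intrusive_ptr_add_ref()/intrusive_ptr_release() overloads above are what
+// let boost::intrusive_ptr manage Onode and OpSequencer lifetimes, e.g.:
+//
+//   boost::intrusive_ptr<BlueStore::Onode> ref(onode);  // calls onode->get()
+//   // use ref like a raw pointer; its destructor calls onode->put()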