+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 Red Hat
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_OSD_BLUESTORE_H
-#define CEPH_OSD_BLUESTORE_H
-
-#include "acconfig.h"
-
-#include <unistd.h>
-
-#include <atomic>
-#include <mutex>
-#include <condition_variable>
-
-#include <boost/intrusive/list.hpp>
-#include <boost/intrusive/unordered_set.hpp>
-#include <boost/intrusive/set.hpp>
-#include <boost/functional/hash.hpp>
-#include <boost/dynamic_bitset.hpp>
-
-#include "include/assert.h"
-#include "include/unordered_map.h"
-#include "include/memory.h"
-#include "include/mempool.h"
-#include "common/Finisher.h"
-#include "common/perf_counters.h"
-#include "compressor/Compressor.h"
-#include "os/ObjectStore.h"
-
-#include "bluestore_types.h"
-#include "BlockDevice.h"
-#include "common/EventTrace.h"
-
-class Allocator;
-class FreelistManager;
-class BlueFS;
-
-//#define DEBUG_CACHE
-//#define DEBUG_DEFERRED
-
-
-
-// constants for Buffer::maybe_rebuild()
-#define MAX_BUFFER_SLOP_RATIO_DEN 8 // denominator N: rebuild when wasted
-                                    // space exceeds 1/N of the buffer length
-
-
-enum {
- l_bluestore_first = 732430,
- l_bluestore_kv_flush_lat,
- l_bluestore_kv_commit_lat,
- l_bluestore_kv_lat,
- l_bluestore_state_prepare_lat,
- l_bluestore_state_aio_wait_lat,
- l_bluestore_state_io_done_lat,
- l_bluestore_state_kv_queued_lat,
- l_bluestore_state_kv_committing_lat,
- l_bluestore_state_kv_done_lat,
- l_bluestore_state_deferred_queued_lat,
- l_bluestore_state_deferred_aio_wait_lat,
- l_bluestore_state_deferred_cleanup_lat,
- l_bluestore_state_finishing_lat,
- l_bluestore_state_done_lat,
- l_bluestore_throttle_lat,
- l_bluestore_submit_lat,
- l_bluestore_commit_lat,
- l_bluestore_read_lat,
- l_bluestore_read_onode_meta_lat,
- l_bluestore_read_wait_aio_lat,
- l_bluestore_compress_lat,
- l_bluestore_decompress_lat,
- l_bluestore_csum_lat,
- l_bluestore_compress_success_count,
- l_bluestore_compress_rejected_count,
- l_bluestore_write_pad_bytes,
- l_bluestore_deferred_write_ops,
- l_bluestore_deferred_write_bytes,
- l_bluestore_write_penalty_read_ops,
- l_bluestore_allocated,
- l_bluestore_stored,
- l_bluestore_compressed,
- l_bluestore_compressed_allocated,
- l_bluestore_compressed_original,
- l_bluestore_onodes,
- l_bluestore_onode_hits,
- l_bluestore_onode_misses,
- l_bluestore_onode_shard_hits,
- l_bluestore_onode_shard_misses,
- l_bluestore_extents,
- l_bluestore_blobs,
- l_bluestore_buffers,
- l_bluestore_buffer_bytes,
- l_bluestore_buffer_hit_bytes,
- l_bluestore_buffer_miss_bytes,
- l_bluestore_write_big,
- l_bluestore_write_big_bytes,
- l_bluestore_write_big_blobs,
- l_bluestore_write_small,
- l_bluestore_write_small_bytes,
- l_bluestore_write_small_unused,
- l_bluestore_write_small_deferred,
- l_bluestore_write_small_pre_read,
- l_bluestore_write_small_new,
- l_bluestore_txc,
- l_bluestore_onode_reshard,
- l_bluestore_blob_split,
- l_bluestore_extent_compress,
- l_bluestore_gc_merged,
- l_bluestore_last
-};
-
-class BlueStore : public ObjectStore,
- public md_config_obs_t {
- // -----------------------------------------------------
- // types
-public:
- // config observer
- const char** get_tracked_conf_keys() const override;
- void handle_conf_change(const struct md_config_t *conf,
- const std::set<std::string> &changed) override;
-
- void _set_csum();
- void _set_compression();
- void _set_throttle_params();
- int _set_cache_sizes();
-
- class TransContext;
-
- typedef map<uint64_t, bufferlist> ready_regions_t;
-
- struct BufferSpace;
- struct Collection;
- typedef boost::intrusive_ptr<Collection> CollectionRef;
-
- struct AioContext {
- virtual void aio_finish(BlueStore *store) = 0;
- virtual ~AioContext() {}
- };
-
- /// cached buffer
- struct Buffer {
- MEMPOOL_CLASS_HELPERS();
-
- enum {
- STATE_EMPTY, ///< empty buffer -- used for cache history
- STATE_CLEAN, ///< clean data that is up to date
- STATE_WRITING, ///< data that is being written (io not yet complete)
- };
- static const char *get_state_name(int s) {
- switch (s) {
- case STATE_EMPTY: return "empty";
- case STATE_CLEAN: return "clean";
- case STATE_WRITING: return "writing";
- default: return "???";
- }
- }
- enum {
- FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
- // NOTE: fix operator<< when you define a second flag
- };
- static const char *get_flag_name(int s) {
- switch (s) {
- case FLAG_NOCACHE: return "nocache";
- default: return "???";
- }
- }
-
- BufferSpace *space;
- uint16_t state; ///< STATE_*
- uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
- uint32_t flags; ///< FLAG_*
- uint64_t seq;
- uint32_t offset, length;
- bufferlist data;
-
- boost::intrusive::list_member_hook<> lru_item;
- boost::intrusive::list_member_hook<> state_item;
-
- Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
- unsigned f = 0)
- : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
- Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
- unsigned f = 0)
- : space(space), state(s), flags(f), seq(q), offset(o),
- length(b.length()), data(b) {}
-
- bool is_empty() const {
- return state == STATE_EMPTY;
- }
- bool is_clean() const {
- return state == STATE_CLEAN;
- }
- bool is_writing() const {
- return state == STATE_WRITING;
- }
-
- uint32_t end() const {
- return offset + length;
- }
-
- void truncate(uint32_t newlen) {
- assert(newlen < length);
- if (data.length()) {
- bufferlist t;
- t.substr_of(data, 0, newlen);
- data.claim(t);
- }
- length = newlen;
- }
- void maybe_rebuild() {
- if (data.length() &&
- (data.get_num_buffers() > 1 ||
- data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
- data.rebuild();
- }
- }
-
- void dump(Formatter *f) const {
- f->dump_string("state", get_state_name(state));
- f->dump_unsigned("seq", seq);
- f->dump_unsigned("offset", offset);
- f->dump_unsigned("length", length);
- f->dump_unsigned("data_length", data.length());
- }
- };
-
- struct Cache;
-
- /// map logical extent range (object) onto buffers
- struct BufferSpace {
- typedef boost::intrusive::list<
- Buffer,
- boost::intrusive::member_hook<
- Buffer,
- boost::intrusive::list_member_hook<>,
- &Buffer::state_item> > state_list_t;
-
- mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
- buffer_map;
-
- // we use a bare intrusive list here instead of std::map because
- // it uses less memory and we expect this to be very small (very
- // few IOs in flight to the same Blob at the same time).
- state_list_t writing; ///< writing buffers, sorted by seq, ascending
-
- ~BufferSpace() {
- assert(buffer_map.empty());
- assert(writing.empty());
- }
-
- void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
- cache->_audit("_add_buffer start");
- buffer_map[b->offset].reset(b);
- if (b->is_writing()) {
- b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
- if (writing.empty() || writing.rbegin()->seq <= b->seq) {
- writing.push_back(*b);
- } else {
- auto it = writing.begin();
- while (it->seq < b->seq) {
- ++it;
- }
-
- assert(it->seq >= b->seq);
- // note that this will insert b before it
- // hence the order is maintained
- writing.insert(it, *b);
- }
- } else {
- b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
- cache->_add_buffer(b, level, near);
- }
- cache->_audit("_add_buffer end");
- }
- void _rm_buffer(Cache* cache, Buffer *b) {
- _rm_buffer(cache, buffer_map.find(b->offset));
- }
- void _rm_buffer(Cache* cache,
- map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
- assert(p != buffer_map.end());
- cache->_audit("_rm_buffer start");
- if (p->second->is_writing()) {
- writing.erase(writing.iterator_to(*p->second));
- } else {
- cache->_rm_buffer(p->second.get());
- }
- buffer_map.erase(p);
- cache->_audit("_rm_buffer end");
- }
-
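-    /// return the buffer overlapping offset, if any; otherwise the first
-    /// buffer starting at or after offset (buffer_map.end() if none)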
- map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
- uint32_t offset) {
- auto i = buffer_map.lower_bound(offset);
- if (i != buffer_map.begin()) {
- --i;
- if (i->first + i->second->length <= offset)
- ++i;
- }
- return i;
- }
-
- // must be called under protection of the Cache lock
- void _clear(Cache* cache);
-
- // return value is the highest cache_private of a trimmed buffer, or 0.
- int discard(Cache* cache, uint32_t offset, uint32_t length) {
- std::lock_guard<std::recursive_mutex> l(cache->lock);
- return _discard(cache, offset, length);
- }
- int _discard(Cache* cache, uint32_t offset, uint32_t length);
-
- void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
- unsigned flags) {
- std::lock_guard<std::recursive_mutex> l(cache->lock);
- Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
- flags);
- b->cache_private = _discard(cache, offset, bl.length());
- _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
- }
- void finish_write(Cache* cache, uint64_t seq);
- void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
- std::lock_guard<std::recursive_mutex> l(cache->lock);
- Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
- b->cache_private = _discard(cache, offset, bl.length());
- _add_buffer(cache, b, 1, nullptr);
- }
-
- void read(Cache* cache, uint32_t offset, uint32_t length,
- BlueStore::ready_regions_t& res,
- interval_set<uint32_t>& res_intervals);
-
- void truncate(Cache* cache, uint32_t offset) {
- discard(cache, offset, (uint32_t)-1 - offset);
- }
-
- void split(Cache* cache, size_t pos, BufferSpace &r);
-
- void dump(Cache* cache, Formatter *f) const {
- std::lock_guard<std::recursive_mutex> l(cache->lock);
- f->open_array_section("buffers");
- for (auto& i : buffer_map) {
- f->open_object_section("buffer");
- assert(i.first == i.second->offset);
- i.second->dump(f);
- f->close_section();
- }
- f->close_section();
- }
- };
-
- struct SharedBlobSet;
-
- /// in-memory shared blob state (incl cached buffers)
- struct SharedBlob {
- MEMPOOL_CLASS_HELPERS();
-
- std::atomic_int nref = {0}; ///< reference count
- bool loaded = false;
-
- CollectionRef coll;
- union {
- uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
- bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
- };
- BufferSpace bc; ///< buffer cache
-
- SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
- if (get_cache()) {
- get_cache()->add_blob();
- }
- }
- SharedBlob(uint64_t i, Collection *_coll);
- ~SharedBlob();
-
- uint64_t get_sbid() const {
- return loaded ? persistent->sbid : sbid_unloaded;
- }
-
- friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
- friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
-
- friend ostream& operator<<(ostream& out, const SharedBlob& sb);
-
- void get() {
- ++nref;
- }
- void put();
-
- /// get logical references
- void get_ref(uint64_t offset, uint32_t length);
-
- /// put logical references, and get back any released extents
- void put_ref(uint64_t offset, uint32_t length,
- PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
-
- friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
- return l.get_sbid() == r.get_sbid();
- }
- inline Cache* get_cache() {
- return coll ? coll->cache : nullptr;
- }
- inline SharedBlobSet* get_parent() {
- return coll ? &(coll->shared_blob_set) : nullptr;
- }
- inline bool is_loaded() const {
- return loaded;
- }
-
- };
- typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
-
- /// a lookup table of SharedBlobs
- struct SharedBlobSet {
- std::mutex lock; ///< protect lookup, insertion, removal
-
- // we use a bare pointer because we don't want to affect the ref
- // count
- mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
-
- SharedBlobRef lookup(uint64_t sbid) {
- std::lock_guard<std::mutex> l(lock);
- auto p = sb_map.find(sbid);
- if (p == sb_map.end()) {
- return nullptr;
- }
- return p->second;
- }
-
- void add(Collection* coll, SharedBlob *sb) {
- std::lock_guard<std::mutex> l(lock);
- sb_map[sb->get_sbid()] = sb;
- sb->coll = coll;
- }
-
- bool try_remove(SharedBlob *sb) {
- std::lock_guard<std::mutex> l(lock);
- if (sb->nref == 0) {
- assert(sb->get_parent() == this);
- sb_map.erase(sb->get_sbid());
- return true;
- }
- return false;
- }
-
- void remove(SharedBlob *sb) {
- std::lock_guard<std::mutex> l(lock);
- assert(sb->get_parent() == this);
- sb_map.erase(sb->get_sbid());
- }
-
- bool empty() {
- std::lock_guard<std::mutex> l(lock);
- return sb_map.empty();
- }
-
- void dump(CephContext *cct, int lvl);
- };
-
-//#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
-
- /// in-memory blob metadata and associated cached buffers (if any)
- struct Blob {
- MEMPOOL_CLASS_HELPERS();
-
- std::atomic_int nref = {0}; ///< reference count
- int16_t id = -1; ///< id, for spanning blobs only, >= 0
- int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
- SharedBlobRef shared_blob; ///< shared blob state (if any)
-
- private:
- mutable bluestore_blob_t blob; ///< decoded blob metadata
-#ifdef CACHE_BLOB_BL
- mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
-#endif
- /// refs from this shard. ephemeral if id<0, persisted if spanning.
- bluestore_blob_use_tracker_t used_in_blob;
-
- public:
-
- friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
- friend void intrusive_ptr_release(Blob *b) { b->put(); }
-
- friend ostream& operator<<(ostream& out, const Blob &b);
-
- const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
- return used_in_blob;
- }
- bool is_referenced() const {
- return used_in_blob.is_not_empty();
- }
- uint32_t get_referenced_bytes() const {
- return used_in_blob.get_referenced_bytes();
- }
-
- bool is_spanning() const {
- return id >= 0;
- }
-
- bool can_split() const {
- std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
- // splitting a BufferSpace writing list is too hard; don't try.
- return shared_blob->bc.writing.empty() &&
- used_in_blob.can_split() &&
- get_blob().can_split();
- }
-
- bool can_split_at(uint32_t blob_offset) const {
- return used_in_blob.can_split_at(blob_offset) &&
- get_blob().can_split_at(blob_offset);
- }
-
- bool can_reuse_blob(uint32_t min_alloc_size,
- uint32_t target_blob_size,
- uint32_t b_offset,
- uint32_t *length0);
-
- void dup(Blob& o) {
- o.shared_blob = shared_blob;
- o.blob = blob;
-#ifdef CACHE_BLOB_BL
- o.blob_bl = blob_bl;
-#endif
- }
-
- inline const bluestore_blob_t& get_blob() const {
- return blob;
- }
- inline bluestore_blob_t& dirty_blob() {
-#ifdef CACHE_BLOB_BL
- blob_bl.clear();
-#endif
- return blob;
- }
-
- /// discard buffers for unallocated regions
- void discard_unallocated(Collection *coll);
-
- /// get logical references
- void get_ref(Collection *coll, uint32_t offset, uint32_t length);
- /// put logical references, and get back any released extents
- bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
- PExtentVector *r);
-
- /// split the blob
- void split(Collection *coll, uint32_t blob_offset, Blob *o);
-
- void get() {
- ++nref;
- }
- void put() {
- if (--nref == 0)
- delete this;
- }
-
-
-#ifdef CACHE_BLOB_BL
- void _encode() const {
- if (blob_bl.length() == 0 ) {
- ::encode(blob, blob_bl);
- } else {
- assert(blob_bl.length());
- }
- }
- void bound_encode(
- size_t& p,
- bool include_ref_map) const {
- _encode();
- p += blob_bl.length();
- if (include_ref_map) {
- used_in_blob.bound_encode(p);
- }
- }
- void encode(
- bufferlist::contiguous_appender& p,
- bool include_ref_map) const {
- _encode();
- p.append(blob_bl);
- if (include_ref_map) {
- used_in_blob.encode(p);
- }
- }
- void decode(
- Collection */*coll*/,
- bufferptr::iterator& p,
- bool include_ref_map) {
- const char *start = p.get_pos();
- denc(blob, p);
- const char *end = p.get_pos();
- blob_bl.clear();
- blob_bl.append(start, end - start);
- if (include_ref_map) {
- used_in_blob.decode(p);
- }
- }
-#else
- void bound_encode(
- size_t& p,
- uint64_t struct_v,
- uint64_t sbid,
- bool include_ref_map) const {
- denc(blob, p, struct_v);
- if (blob.is_shared()) {
- denc(sbid, p);
- }
- if (include_ref_map) {
- used_in_blob.bound_encode(p);
- }
- }
- void encode(
- bufferlist::contiguous_appender& p,
- uint64_t struct_v,
- uint64_t sbid,
- bool include_ref_map) const {
- denc(blob, p, struct_v);
- if (blob.is_shared()) {
- denc(sbid, p);
- }
- if (include_ref_map) {
- used_in_blob.encode(p);
- }
- }
- void decode(
- Collection *coll,
- bufferptr::iterator& p,
- uint64_t struct_v,
- uint64_t* sbid,
- bool include_ref_map);
-#endif
- };
- typedef boost::intrusive_ptr<Blob> BlobRef;
- typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
-
- /// a logical extent, pointing to (some portion of) a blob
-  // alias to avoid build warnings
-  typedef boost::intrusive::set_base_hook<
-    boost::intrusive::optimize_size<true> > ExtentBase;
- struct Extent : public ExtentBase {
- MEMPOOL_CLASS_HELPERS();
-
- uint32_t logical_offset = 0; ///< logical offset
- uint32_t blob_offset = 0; ///< blob offset
- uint32_t length = 0; ///< length
- BlobRef blob; ///< the blob with our data
-
- /// ctor for lookup only
- explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
- /// ctor for delayed initialization (see decode_some())
- explicit Extent() : ExtentBase() {
- }
- /// ctor for general usage
- Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
- : ExtentBase(),
- logical_offset(lo), blob_offset(o), length(l) {
- assign_blob(b);
- }
- ~Extent() {
- if (blob) {
- blob->shared_blob->get_cache()->rm_extent();
- }
- }
-
- void assign_blob(const BlobRef& b) {
- assert(!blob);
- blob = b;
- blob->shared_blob->get_cache()->add_extent();
- }
-
- // comparators for intrusive_set
- friend bool operator<(const Extent &a, const Extent &b) {
- return a.logical_offset < b.logical_offset;
- }
- friend bool operator>(const Extent &a, const Extent &b) {
- return a.logical_offset > b.logical_offset;
- }
- friend bool operator==(const Extent &a, const Extent &b) {
- return a.logical_offset == b.logical_offset;
- }
-
- uint32_t blob_start() const {
- return logical_offset - blob_offset;
- }
-
- uint32_t blob_end() const {
- return blob_start() + blob->get_blob().get_logical_length();
- }
-
- uint32_t logical_end() const {
- return logical_offset + length;
- }
-
-    // return true if any piece of the blob is outside
-    // the given range [o, o + l)
- bool blob_escapes_range(uint32_t o, uint32_t l) const {
- return blob_start() < o || blob_end() > o + l;
- }
- };
- typedef boost::intrusive::set<Extent> extent_map_t;
-
-
- friend ostream& operator<<(ostream& out, const Extent& e);
-
- struct OldExtent {
- boost::intrusive::list_member_hook<> old_extent_item;
- Extent e;
- PExtentVector r;
-    bool blob_empty; // true if this is the last removed extent, leaving the
-                     // blob empty; required to update compression stats
- OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
- : e(lo, o, l, b), blob_empty(false) {
- }
- static OldExtent* create(CollectionRef c,
- uint32_t lo,
- uint32_t o,
- uint32_t l,
- BlobRef& b);
- };
- typedef boost::intrusive::list<
- OldExtent,
- boost::intrusive::member_hook<
- OldExtent,
- boost::intrusive::list_member_hook<>,
- &OldExtent::old_extent_item> > old_extent_map_t;
-
- struct Onode;
-
- /// a sharded extent map, mapping offsets to lextents to blobs
- struct ExtentMap {
- Onode *onode;
- extent_map_t extent_map; ///< map of Extents to Blobs
- blob_map_t spanning_blob_map; ///< blobs that span shards
-
- struct Shard {
- bluestore_onode_t::shard_info *shard_info = nullptr;
- unsigned extents = 0; ///< count extents in this shard
- bool loaded = false; ///< true if shard is loaded
- bool dirty = false; ///< true if shard is dirty and needs reencoding
- };
- mempool::bluestore_cache_other::vector<Shard> shards; ///< shards
-
- bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
-
- uint32_t needs_reshard_begin = 0;
- uint32_t needs_reshard_end = 0;
-
- bool needs_reshard() const {
- return needs_reshard_end > needs_reshard_begin;
- }
- void clear_needs_reshard() {
- needs_reshard_begin = needs_reshard_end = 0;
- }
- void request_reshard(uint32_t begin, uint32_t end) {
- if (begin < needs_reshard_begin) {
- needs_reshard_begin = begin;
- }
- if (end > needs_reshard_end) {
- needs_reshard_end = end;
- }
- }
-
- struct DeleteDisposer {
- void operator()(Extent *e) { delete e; }
- };
-
- ExtentMap(Onode *o);
- ~ExtentMap() {
- extent_map.clear_and_dispose(DeleteDisposer());
- }
-
- void clear() {
- extent_map.clear_and_dispose(DeleteDisposer());
- shards.clear();
- inline_bl.clear();
- clear_needs_reshard();
- }
-
- bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
- unsigned *pn);
- unsigned decode_some(bufferlist& bl);
-
- void bound_encode_spanning_blobs(size_t& p);
- void encode_spanning_blobs(bufferlist::contiguous_appender& p);
- void decode_spanning_blobs(bufferptr::iterator& p);
-
- BlobRef get_spanning_blob(int id) {
- auto p = spanning_blob_map.find(id);
- assert(p != spanning_blob_map.end());
- return p->second;
- }
-
- void update(KeyValueDB::Transaction t, bool force);
- decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
- void reshard(
- KeyValueDB *db,
- KeyValueDB::Transaction t);
-
- /// initialize Shards from the onode
- void init_shards(bool loaded, bool dirty);
-
- /// return index of shard containing offset
- /// or -1 if not found
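-    /// (e.g. with shards at offsets {0, 4096, 8192}, seek_shard(5000)
-    /// returns 1 and seek_shard(8192) returns 2)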
- int seek_shard(uint32_t offset) {
- size_t end = shards.size();
- size_t mid, left = 0;
-      size_t right = end; // one past the right end
-
- while (left < right) {
- mid = left + (right - left) / 2;
- if (offset >= shards[mid].shard_info->offset) {
- size_t next = mid + 1;
- if (next >= end || offset < shards[next].shard_info->offset)
- return mid;
- //continue to search forwards
- left = next;
- } else {
- //continue to search backwards
- right = mid;
- }
- }
-
- return -1; // not found
- }
-
- /// check if a range spans a shard
- bool spans_shard(uint32_t offset, uint32_t length) {
- if (shards.empty()) {
- return false;
- }
- int s = seek_shard(offset);
- assert(s >= 0);
- if (s == (int)shards.size() - 1) {
- return false; // last shard
- }
- if (offset + length <= shards[s+1].shard_info->offset) {
- return false;
- }
- return true;
- }
-
- /// ensure that a range of the map is loaded
- void fault_range(KeyValueDB *db,
- uint32_t offset, uint32_t length);
-
- /// ensure a range of the map is marked dirty
- void dirty_range(uint32_t offset, uint32_t length);
-
- /// for seek_lextent test
- extent_map_t::iterator find(uint64_t offset);
-
- /// seek to the first lextent including or after offset
- extent_map_t::iterator seek_lextent(uint64_t offset);
- extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
-
- /// add a new Extent
- void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
- extent_map.insert(*new Extent(lo, o, l, b));
- }
-
- /// remove (and delete) an Extent
- void rm(extent_map_t::iterator p) {
- extent_map.erase_and_dispose(p, DeleteDisposer());
- }
-
- bool has_any_lextents(uint64_t offset, uint64_t length);
-
- /// consolidate adjacent lextents in extent_map
- int compress_extent_map(uint64_t offset, uint64_t length);
-
- /// punch a logical hole. add lextents to deref to target list.
- void punch_hole(CollectionRef &c,
- uint64_t offset, uint64_t length,
- old_extent_map_t *old_extents);
-
- /// put new lextent into lextent_map overwriting existing ones if
- /// any and update references accordingly
- Extent *set_lextent(CollectionRef &c,
- uint64_t logical_offset,
- uint64_t offset, uint64_t length,
- BlobRef b,
- old_extent_map_t *old_extents);
-
- /// split a blob (and referring extents)
- BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
- };
-
- /// Compressed Blob Garbage collector
-  /*
-  The primary idea of the collector is to estimate the difference between
-  the allocation units (AUs) currently occupied by compressed blobs and the
-  new AUs required to store the same data uncompressed.
-  The estimate covers the protrusive extents within a logical range
-  determined by the concatenation of the old_extents collection and the
-  specific (current) write request.
-  old_extents are involved because blob ref counts must be handled
-  properly: old extents still hold blob refs, so we need to traverse the
-  collection to determine whether a blob is to be released.
-  Protrusive extents are extents that fall into the blob set in action
-  (i.e. within the logical range above) but are not removed entirely by
-  the current write.
-  E.g. for
-  extent1 <loffs = 100, boffs = 100, len = 100> ->
-    blob1<compressed, len_on_disk=4096, logical_len=8192>
-  extent2 <loffs = 200, boffs = 200, len = 100> ->
-    blob2<raw, len_on_disk=4096, llen=4096>
-  extent3 <loffs = 300, boffs = 300, len = 100> ->
-    blob1<compressed, len_on_disk=4096, llen=8192>
-  extent4 <loffs = 4096, boffs = 0, len = 100> ->
-    blob3<raw, len_on_disk=4096, llen=4096>
-  write(300~100)
-  the protrusive extents fall within the ranges <0~300, 400~8192-400>.
-  In this case the existing AUs that might be released by GC (i.e. blob1's)
-  occupy 2x4K bytes, while the number of new AUs expected after GC is 0,
-  since extent1 is to be merged into blob2.
-  Hence we should collect.
-  (An illustrative usage sketch follows the class below.)
-  */
- class GarbageCollector
- {
- public:
-    /// return the number of allocation units that might be saved by GC
- int64_t estimate(
- uint64_t offset,
- uint64_t length,
- const ExtentMap& extent_map,
- const old_extent_map_t& old_extents,
- uint64_t min_alloc_size);
-
- /// return a collection of extents to perform GC on
- const vector<AllocExtent>& get_extents_to_collect() const {
- return extents_to_collect;
- }
- GarbageCollector(CephContext* _cct) : cct(_cct) {}
-
- private:
- struct BlobInfo {
-      uint64_t referenced_bytes = 0; ///< number of bytes referenced in blob
-      int64_t expected_allocations = 0; ///< new alloc units required
-                                        ///< if GC is performed
-      bool collect_candidate = false; ///< true if blob has any extents
-                                      ///< eligible for GC
-      extent_map_t::const_iterator first_lextent; ///< points to the first
-                                                  ///< lextent referring to
-                                                  ///< the blob, if any; valid
-                                                  ///< only when the
-                                                  ///< collect_candidate flag
-                                                  ///< is set
-      extent_map_t::const_iterator last_lextent;  ///< points to the last
-                                                  ///< lextent referring to
-                                                  ///< the blob, if any
-
- BlobInfo(uint64_t ref_bytes) :
- referenced_bytes(ref_bytes) {
- }
- };
- CephContext* cct;
- map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
- ///< copies that are affected by the
- ///< specific write
-
- vector<AllocExtent> extents_to_collect; ///< protrusive extents that should
- ///< be collected if GC takes place
-
-    boost::optional<uint64_t > used_alloc_unit; ///< last allocation unit
-                                                ///< processed while traversing
-                                                ///< protrusive extents.
-                                                ///< Other extents mapped to
-                                                ///< this AU are ignored
-                                                ///< (except when an
-                                                ///< uncompressed extent
-                                                ///< follows a compressed one -
-                                                ///< see below).
-    BlobInfo* blob_info_counted = nullptr; ///< set if the previous allocation
-                                           ///< unit incremented the
-                                           ///< expected_allocations counter
-                                           ///< for this blob; if an
-                                           ///< uncompressed extent follows,
-                                           ///< the expected_allocations
-                                           ///< counter needs a decrement
-    int64_t expected_allocations = 0; ///< new alloc units required
-                                      ///< if GC is performed
-    int64_t expected_for_release = 0; ///< alloc units currently used by
-                                      ///< compressed blobs that might be
-                                      ///< released after GC
-    uint64_t gc_start_offset; ///< starting offset for GC
-    uint64_t gc_end_offset; ///< ending offset for GC
-
- protected:
- void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
- uint64_t start_offset,
- uint64_t end_offset,
- uint64_t start_touch_offset,
- uint64_t end_touch_offset,
- uint64_t min_alloc_size);
- };
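-
-  /*
-  Illustrative use only (a sketch, not the actual call site; the real
-  caller lives in BlueStore.cc and may differ in detail):
-
-    GarbageCollector gc(cct);
-    int64_t saved = gc.estimate(write_offset, write_length,
-                                o->extent_map, old_extents,
-                                min_alloc_size);
-    if (saved > 0) {
-      for (auto& e : gc.get_extents_to_collect()) {
-        // re-read [e.offset, e.offset + e.length) and rewrite the data
-        // uncompressed, so the partially referenced compressed AUs can
-        // be released
-      }
-    }
-  */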
-
- struct OnodeSpace;
-
- /// an in-memory object
- struct Onode {
- MEMPOOL_CLASS_HELPERS();
-
- std::atomic_int nref; ///< reference count
- Collection *c;
-
- ghobject_t oid;
-
- /// key under PREFIX_OBJ where we are stored
- mempool::bluestore_cache_other::string key;
-
- boost::intrusive::list_member_hook<> lru_item;
-
- bluestore_onode_t onode; ///< metadata stored as value in kv store
- bool exists; ///< true if object logically exists
-
- ExtentMap extent_map;
-
- // track txc's that have not been committed to kv store (and whose
- // effects cannot be read via the kvdb read methods)
- std::atomic<int> flushing_count = {0};
- std::mutex flush_lock; ///< protect flush_txns
- std::condition_variable flush_cond; ///< wait here for uncommitted txns
-
- Onode(Collection *c, const ghobject_t& o,
- const mempool::bluestore_cache_other::string& k)
- : nref(0),
- c(c),
- oid(o),
- key(k),
- exists(false),
- extent_map(this) {
- }
-
- void flush();
- void get() {
- ++nref;
- }
- void put() {
- if (--nref == 0)
- delete this;
- }
- };
- typedef boost::intrusive_ptr<Onode> OnodeRef;
-
-
- /// a cache (shard) of onodes and buffers
- struct Cache {
- CephContext* cct;
- PerfCounters *logger;
- std::recursive_mutex lock; ///< protect lru and other structures
-
- std::atomic<uint64_t> num_extents = {0};
- std::atomic<uint64_t> num_blobs = {0};
-
- static Cache *create(CephContext* cct, string type, PerfCounters *logger);
-
- Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
- virtual ~Cache() {}
-
- virtual void _add_onode(OnodeRef& o, int level) = 0;
- virtual void _rm_onode(OnodeRef& o) = 0;
- virtual void _touch_onode(OnodeRef& o) = 0;
-
- virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0;
- virtual void _rm_buffer(Buffer *b) = 0;
- virtual void _move_buffer(Cache *src, Buffer *b) = 0;
- virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0;
- virtual void _touch_buffer(Buffer *b) = 0;
-
- virtual uint64_t _get_num_onodes() = 0;
- virtual uint64_t _get_buffer_bytes() = 0;
-
- void add_extent() {
- ++num_extents;
- }
- void rm_extent() {
- --num_extents;
- }
-
- void add_blob() {
- ++num_blobs;
- }
- void rm_blob() {
- --num_blobs;
- }
-
- void trim(uint64_t target_bytes,
- float target_meta_ratio,
- float target_data_ratio,
- float bytes_per_onode);
-
- void trim_all();
-
- virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0;
-
- virtual void add_stats(uint64_t *onodes, uint64_t *extents,
- uint64_t *blobs,
- uint64_t *buffers,
- uint64_t *bytes) = 0;
-
- bool empty() {
- std::lock_guard<std::recursive_mutex> l(lock);
- return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
- }
-
-#ifdef DEBUG_CACHE
- virtual void _audit(const char *s) = 0;
-#else
- void _audit(const char *s) { /* no-op */ }
-#endif
- };
-
- /// simple LRU cache for onodes and buffers
- struct LRUCache : public Cache {
- private:
- typedef boost::intrusive::list<
- Onode,
- boost::intrusive::member_hook<
- Onode,
- boost::intrusive::list_member_hook<>,
- &Onode::lru_item> > onode_lru_list_t;
- typedef boost::intrusive::list<
- Buffer,
- boost::intrusive::member_hook<
- Buffer,
- boost::intrusive::list_member_hook<>,
- &Buffer::lru_item> > buffer_lru_list_t;
-
- onode_lru_list_t onode_lru;
-
- buffer_lru_list_t buffer_lru;
- uint64_t buffer_size = 0;
-
- public:
- LRUCache(CephContext* cct) : Cache(cct) {}
- uint64_t _get_num_onodes() override {
- return onode_lru.size();
- }
- void _add_onode(OnodeRef& o, int level) override {
- if (level > 0)
- onode_lru.push_front(*o);
- else
- onode_lru.push_back(*o);
- }
- void _rm_onode(OnodeRef& o) override {
- auto q = onode_lru.iterator_to(*o);
- onode_lru.erase(q);
- }
- void _touch_onode(OnodeRef& o) override;
-
- uint64_t _get_buffer_bytes() override {
- return buffer_size;
- }
- void _add_buffer(Buffer *b, int level, Buffer *near) override {
- if (near) {
- auto q = buffer_lru.iterator_to(*near);
- buffer_lru.insert(q, *b);
- } else if (level > 0) {
- buffer_lru.push_front(*b);
- } else {
- buffer_lru.push_back(*b);
- }
- buffer_size += b->length;
- }
- void _rm_buffer(Buffer *b) override {
- assert(buffer_size >= b->length);
- buffer_size -= b->length;
- auto q = buffer_lru.iterator_to(*b);
- buffer_lru.erase(q);
- }
- void _move_buffer(Cache *src, Buffer *b) override {
- src->_rm_buffer(b);
- _add_buffer(b, 0, nullptr);
- }
- void _adjust_buffer_size(Buffer *b, int64_t delta) override {
- assert((int64_t)buffer_size + delta >= 0);
- buffer_size += delta;
- }
- void _touch_buffer(Buffer *b) override {
- auto p = buffer_lru.iterator_to(*b);
- buffer_lru.erase(p);
- buffer_lru.push_front(*b);
- _audit("_touch_buffer end");
- }
-
- void _trim(uint64_t onode_max, uint64_t buffer_max) override;
-
- void add_stats(uint64_t *onodes, uint64_t *extents,
- uint64_t *blobs,
- uint64_t *buffers,
- uint64_t *bytes) override {
- std::lock_guard<std::recursive_mutex> l(lock);
- *onodes += onode_lru.size();
- *extents += num_extents;
- *blobs += num_blobs;
- *buffers += buffer_lru.size();
- *bytes += buffer_size;
- }
-
-#ifdef DEBUG_CACHE
- void _audit(const char *s) override;
-#endif
- };
-
- // 2Q cache for buffers, LRU for onodes
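-  //
-  // Roughly, per the 2Q scheme: new buffers enter the "warm in" (A1in)
-  // list; when they age out of it their data is dropped and the empty
-  // header is kept on "warm out" (A1out); a later write over a warm-out
-  // range (conveyed via the discard hint, see BufferSpace::write)
-  // promotes the replacement buffer to the hot (Am) list.  See the
-  // _add_buffer and _trim implementations for the exact behavior.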
- struct TwoQCache : public Cache {
- private:
- // stick with LRU for onodes for now (fixme?)
- typedef boost::intrusive::list<
- Onode,
- boost::intrusive::member_hook<
- Onode,
- boost::intrusive::list_member_hook<>,
- &Onode::lru_item> > onode_lru_list_t;
- typedef boost::intrusive::list<
- Buffer,
- boost::intrusive::member_hook<
- Buffer,
- boost::intrusive::list_member_hook<>,
- &Buffer::lru_item> > buffer_list_t;
-
- onode_lru_list_t onode_lru;
-
- buffer_list_t buffer_hot; ///< "Am" hot buffers
- buffer_list_t buffer_warm_in; ///< "A1in" newly warm buffers
- buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted
- uint64_t buffer_bytes = 0; ///< bytes
-
- enum {
- BUFFER_NEW = 0,
- BUFFER_WARM_IN, ///< in buffer_warm_in
- BUFFER_WARM_OUT, ///< in buffer_warm_out
- BUFFER_HOT, ///< in buffer_hot
- BUFFER_TYPE_MAX
- };
-
- uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type
-
- public:
- TwoQCache(CephContext* cct) : Cache(cct) {}
- uint64_t _get_num_onodes() override {
- return onode_lru.size();
- }
- void _add_onode(OnodeRef& o, int level) override {
- if (level > 0)
- onode_lru.push_front(*o);
- else
- onode_lru.push_back(*o);
- }
- void _rm_onode(OnodeRef& o) override {
- auto q = onode_lru.iterator_to(*o);
- onode_lru.erase(q);
- }
- void _touch_onode(OnodeRef& o) override;
-
- uint64_t _get_buffer_bytes() override {
- return buffer_bytes;
- }
- void _add_buffer(Buffer *b, int level, Buffer *near) override;
- void _rm_buffer(Buffer *b) override;
- void _move_buffer(Cache *src, Buffer *b) override;
- void _adjust_buffer_size(Buffer *b, int64_t delta) override;
- void _touch_buffer(Buffer *b) override {
- switch (b->cache_private) {
- case BUFFER_WARM_IN:
- // do nothing (somewhat counter-intuitively!)
- break;
- case BUFFER_WARM_OUT:
- // move from warm_out to hot LRU
- assert(0 == "this happens via discard hint");
- break;
- case BUFFER_HOT:
- // move to front of hot LRU
- buffer_hot.erase(buffer_hot.iterator_to(*b));
- buffer_hot.push_front(*b);
- break;
- }
- _audit("_touch_buffer end");
- }
-
- void _trim(uint64_t onode_max, uint64_t buffer_max) override;
-
- void add_stats(uint64_t *onodes, uint64_t *extents,
- uint64_t *blobs,
- uint64_t *buffers,
- uint64_t *bytes) override {
- std::lock_guard<std::recursive_mutex> l(lock);
- *onodes += onode_lru.size();
- *extents += num_extents;
- *blobs += num_blobs;
- *buffers += buffer_hot.size() + buffer_warm_in.size();
- *bytes += buffer_bytes;
- }
-
-#ifdef DEBUG_CACHE
- void _audit(const char *s) override;
-#endif
- };
-
- struct OnodeSpace {
- private:
- Cache *cache;
-
- /// forward lookups
- mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
-
- friend class Collection; // for split_cache()
-
- public:
- OnodeSpace(Cache *c) : cache(c) {}
- ~OnodeSpace() {
- clear();
- }
-
- OnodeRef add(const ghobject_t& oid, OnodeRef o);
- OnodeRef lookup(const ghobject_t& o);
- void remove(const ghobject_t& oid) {
- onode_map.erase(oid);
- }
- void rename(OnodeRef& o, const ghobject_t& old_oid,
- const ghobject_t& new_oid,
- const mempool::bluestore_cache_other::string& new_okey);
- void clear();
- bool empty();
-
- void dump(CephContext *cct, int lvl);
-
-    /// return true if f returns true for any item
- bool map_any(std::function<bool(OnodeRef)> f);
- };
-
- struct Collection : public CollectionImpl {
- BlueStore *store;
- Cache *cache; ///< our cache shard
- coll_t cid;
- bluestore_cnode_t cnode;
- RWLock lock;
-
- bool exists;
-
- SharedBlobSet shared_blob_set; ///< open SharedBlobs
-
- // cache onodes on a per-collection basis to avoid lock
- // contention.
- OnodeSpace onode_map;
-
- //pool options
- pool_opts_t pool_opts;
-
- OnodeRef get_onode(const ghobject_t& oid, bool create);
-
- // the terminology is confusing here, sorry!
- //
- // blob_t shared_blob_t
- // !shared unused -> open
- // shared !loaded -> open + shared
- // shared loaded -> open + shared + loaded
- //
- // i.e.,
- // open = SharedBlob is instantiated
- // shared = blob_t shared flag is set; SharedBlob is hashed.
- // loaded = SharedBlob::shared_blob_t is loaded from kv store
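-    //
-    // Roughly, in terms of the methods below: open_shared_blob()
-    // instantiates the in-memory SharedBlob, make_blob_shared() sets the
-    // shared flag and hashes the SharedBlob, and load_shared_blob()
-    // reads the persistent shared_blob_t from the kv store.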
- void open_shared_blob(uint64_t sbid, BlobRef b);
- void load_shared_blob(SharedBlobRef sb);
- void make_blob_shared(uint64_t sbid, BlobRef b);
- uint64_t make_blob_unshared(SharedBlob *sb);
-
- BlobRef new_blob() {
- BlobRef b = new Blob();
- b->shared_blob = new SharedBlob(this);
- return b;
- }
-
- const coll_t &get_cid() override {
- return cid;
- }
-
- bool contains(const ghobject_t& oid) {
- if (cid.is_meta())
- return oid.hobj.pool == -1;
- spg_t spgid;
- if (cid.is_pg(&spgid))
- return
- spgid.pgid.contains(cnode.bits, oid) &&
- oid.shard_id == spgid.shard;
- return false;
- }
-
- void split_cache(Collection *dest);
-
- Collection(BlueStore *ns, Cache *ca, coll_t c);
- };
-
- class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
- CollectionRef c;
- OnodeRef o;
- KeyValueDB::Iterator it;
- string head, tail;
- public:
- OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
- int seek_to_first() override;
- int upper_bound(const string &after) override;
- int lower_bound(const string &to) override;
- bool valid() override;
- int next(bool validate=true) override;
- string key() override;
- bufferlist value() override;
- int status() override {
- return 0;
- }
- };
-
- class OpSequencer;
- typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
-
- struct volatile_statfs{
- enum {
- STATFS_ALLOCATED = 0,
- STATFS_STORED,
- STATFS_COMPRESSED_ORIGINAL,
- STATFS_COMPRESSED,
- STATFS_COMPRESSED_ALLOCATED,
- STATFS_LAST
- };
- int64_t values[STATFS_LAST];
- volatile_statfs() {
- memset(this, 0, sizeof(volatile_statfs));
- }
- void reset() {
- *this = volatile_statfs();
- }
- volatile_statfs& operator+=(const volatile_statfs& other) {
- for (size_t i = 0; i < STATFS_LAST; ++i) {
- values[i] += other.values[i];
- }
- return *this;
- }
- int64_t& allocated() {
- return values[STATFS_ALLOCATED];
- }
- int64_t& stored() {
- return values[STATFS_STORED];
- }
- int64_t& compressed_original() {
- return values[STATFS_COMPRESSED_ORIGINAL];
- }
- int64_t& compressed() {
- return values[STATFS_COMPRESSED];
- }
- int64_t& compressed_allocated() {
- return values[STATFS_COMPRESSED_ALLOCATED];
- }
- bool is_empty() {
- return values[STATFS_ALLOCATED] == 0 &&
- values[STATFS_STORED] == 0 &&
- values[STATFS_COMPRESSED] == 0 &&
- values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
- values[STATFS_COMPRESSED_ALLOCATED] == 0;
- }
- void decode(bufferlist::iterator& it) {
- for (size_t i = 0; i < STATFS_LAST; i++) {
- ::decode(values[i], it);
- }
- }
-
- void encode(bufferlist& bl) {
- for (size_t i = 0; i < STATFS_LAST; i++) {
- ::encode(values[i], bl);
- }
- }
- };
-
- struct TransContext : public AioContext {
- MEMPOOL_CLASS_HELPERS();
-
- typedef enum {
- STATE_PREPARE,
- STATE_AIO_WAIT,
- STATE_IO_DONE,
- STATE_KV_QUEUED, // queued for kv_sync_thread submission
- STATE_KV_SUBMITTED, // submitted to kv; not yet synced
- STATE_KV_DONE,
- STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
- STATE_DEFERRED_CLEANUP, // remove deferred kv record
- STATE_DEFERRED_DONE,
- STATE_FINISHING,
- STATE_DONE,
- } state_t;
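-
-    // The usual successful sequence (summarizing the states above) is:
-    //   prepare -> aio_wait -> io_done -> kv_queued -> kv_submitted ->
-    //   kv_done -> finishing -> done,
-    // with deferred writes passing through deferred_queued ->
-    // deferred_cleanup -> deferred_done after kv_done.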
-
- state_t state = STATE_PREPARE;
-
- const char *get_state_name() {
- switch (state) {
- case STATE_PREPARE: return "prepare";
- case STATE_AIO_WAIT: return "aio_wait";
- case STATE_IO_DONE: return "io_done";
- case STATE_KV_QUEUED: return "kv_queued";
- case STATE_KV_SUBMITTED: return "kv_submitted";
- case STATE_KV_DONE: return "kv_done";
- case STATE_DEFERRED_QUEUED: return "deferred_queued";
- case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
- case STATE_DEFERRED_DONE: return "deferred_done";
- case STATE_FINISHING: return "finishing";
- case STATE_DONE: return "done";
- }
- return "???";
- }
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
- const char *get_state_latency_name(int state) {
- switch (state) {
- case l_bluestore_state_prepare_lat: return "prepare";
- case l_bluestore_state_aio_wait_lat: return "aio_wait";
- case l_bluestore_state_io_done_lat: return "io_done";
- case l_bluestore_state_kv_queued_lat: return "kv_queued";
- case l_bluestore_state_kv_committing_lat: return "kv_committing";
- case l_bluestore_state_kv_done_lat: return "kv_done";
-      case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
-      case l_bluestore_state_deferred_aio_wait_lat: return "deferred_aio_wait";
- case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
- case l_bluestore_state_finishing_lat: return "finishing";
- case l_bluestore_state_done_lat: return "done";
- }
- return "???";
- }
-#endif
-
- void log_state_latency(PerfCounters *logger, int state) {
- utime_t lat, now = ceph_clock_now();
- lat = now - last_stamp;
- logger->tinc(state, lat);
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
- if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
- double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
- OID_ELAPSED("", usecs, get_state_latency_name(state));
- }
-#endif
- last_stamp = now;
- }
-
- OpSequencerRef osr;
- boost::intrusive::list_member_hook<> sequencer_item;
-
- uint64_t bytes = 0, cost = 0;
-
- set<OnodeRef> onodes; ///< these need to be updated/written
- set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
- set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
- set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
-
- KeyValueDB::Transaction t; ///< then we will commit this
- Context *oncommit = nullptr; ///< signal on commit
- Context *onreadable = nullptr; ///< signal on readable
- Context *onreadable_sync = nullptr; ///< signal on readable
- list<Context*> oncommits; ///< more commit completions
- list<CollectionRef> removed_collections; ///< colls we removed
-
- boost::intrusive::list_member_hook<> deferred_queue_item;
- bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
-
- interval_set<uint64_t> allocated, released;
- volatile_statfs statfs_delta;
-
- IOContext ioc;
- bool had_ios = false; ///< true if we submitted IOs before our kv txn
-
- uint64_t seq = 0;
- utime_t start;
- utime_t last_stamp;
-
- uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
- uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
-
- explicit TransContext(CephContext* cct, OpSequencer *o)
- : osr(o),
- ioc(cct, this),
- start(ceph_clock_now()) {
- last_stamp = start;
- }
- ~TransContext() {
- delete deferred_txn;
- }
-
- void write_onode(OnodeRef &o) {
- onodes.insert(o);
- }
- void write_shared_blob(SharedBlobRef &sb) {
- shared_blobs.insert(sb);
- }
- void unshare_blob(SharedBlob *sb) {
- shared_blobs.erase(sb);
- }
-
-    /// note that we logically modified the object (the onode itself is
-    /// unmodified)
- void note_modified_object(OnodeRef &o) {
- // onode itself isn't written, though
- modified_objects.insert(o);
- }
- void removed(OnodeRef& o) {
- onodes.erase(o);
- modified_objects.erase(o);
- }
-
- void aio_finish(BlueStore *store) override {
- store->txc_aio_finish(this);
- }
- };
-
- typedef boost::intrusive::list<
- TransContext,
- boost::intrusive::member_hook<
- TransContext,
- boost::intrusive::list_member_hook<>,
- &TransContext::deferred_queue_item> > deferred_queue_t;
-
- struct DeferredBatch : public AioContext {
- OpSequencer *osr;
- struct deferred_io {
- bufferlist bl; ///< data
- uint64_t seq; ///< deferred transaction seq
- };
- map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
- deferred_queue_t txcs; ///< txcs in this batch
- IOContext ioc; ///< our aios
- /// bytes of pending io for each deferred seq (may be 0)
- map<uint64_t,int> seq_bytes;
-
- void _discard(CephContext *cct, uint64_t offset, uint64_t length);
- void _audit(CephContext *cct);
-
- DeferredBatch(CephContext *cct, OpSequencer *osr)
- : osr(osr), ioc(cct, this) {}
-
- /// prepare a write
- void prepare_write(CephContext *cct,
- uint64_t seq, uint64_t offset, uint64_t length,
- bufferlist::const_iterator& p);
-
- void aio_finish(BlueStore *store) override {
- store->_deferred_aio_finish(osr);
- }
- };
-
- class OpSequencer : public Sequencer_impl {
- public:
- std::mutex qlock;
- std::condition_variable qcond;
- typedef boost::intrusive::list<
- TransContext,
- boost::intrusive::member_hook<
- TransContext,
- boost::intrusive::list_member_hook<>,
- &TransContext::sequencer_item> > q_list_t;
- q_list_t q; ///< transactions
-
- boost::intrusive::list_member_hook<> deferred_osr_queue_item;
-
- DeferredBatch *deferred_running = nullptr;
- DeferredBatch *deferred_pending = nullptr;
-
- Sequencer *parent;
- BlueStore *store;
-
- uint64_t last_seq = 0;
-
- std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
-
- std::atomic_int kv_committing_serially = {0};
-
- std::atomic_int kv_submitted_waiters = {0};
-
- std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
- std::atomic_bool zombie = {false}; ///< owning Sequencer has gone away
-
- OpSequencer(CephContext* cct, BlueStore *store)
- : Sequencer_impl(cct),
- parent(NULL), store(store) {
- store->register_osr(this);
- }
- ~OpSequencer() override {
- assert(q.empty());
- _unregister();
- }
-
- void discard() override {
- // Note that we may have txc's in flight when the parent Sequencer
- // goes away. Reflect this with zombie==registered==true and let
- // _osr_drain_all clean up later.
- assert(!zombie);
- zombie = true;
- parent = nullptr;
- bool empty;
- {
- std::lock_guard<std::mutex> l(qlock);
- empty = q.empty();
- }
- if (empty) {
- _unregister();
- }
- }
-
- void _unregister() {
- if (registered) {
- store->unregister_osr(this);
- registered = false;
- }
- }
-
- void queue_new(TransContext *txc) {
- std::lock_guard<std::mutex> l(qlock);
- txc->seq = ++last_seq;
- q.push_back(*txc);
- }
-
- void drain() {
- std::unique_lock<std::mutex> l(qlock);
- while (!q.empty())
- qcond.wait(l);
- }
-
- void drain_preceding(TransContext *txc) {
- std::unique_lock<std::mutex> l(qlock);
- while (!q.empty() && &q.front() != txc)
- qcond.wait(l);
- }
-
- bool _is_all_kv_submitted() {
- // caller must hold qlock
- if (q.empty()) {
- return true;
- }
- TransContext *txc = &q.back();
- if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
- return true;
- }
- return false;
- }
-
-    void flush() override {
-      std::unique_lock<std::mutex> l(qlock);
-      while (true) {
-        // set flag before the check because the condition
-        // may become true outside qlock, and we need to make
-        // sure those threads see waiters and signal qcond.
-        ++kv_submitted_waiters;
-        if (_is_all_kv_submitted()) {
-          --kv_submitted_waiters;
-          return;
-        }
-        qcond.wait(l);
-        --kv_submitted_waiters;
-      }
-    }
-
- bool flush_commit(Context *c) override {
- std::lock_guard<std::mutex> l(qlock);
- if (q.empty()) {
- return true;
- }
- TransContext *txc = &q.back();
- if (txc->state >= TransContext::STATE_KV_DONE) {
- return true;
- }
- txc->oncommits.push_back(c);
- return false;
- }
- };
-
- typedef boost::intrusive::list<
- OpSequencer,
- boost::intrusive::member_hook<
- OpSequencer,
- boost::intrusive::list_member_hook<>,
- &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
-
- struct KVSyncThread : public Thread {
- BlueStore *store;
- explicit KVSyncThread(BlueStore *s) : store(s) {}
- void *entry() override {
- store->_kv_sync_thread();
- return NULL;
- }
- };
- struct KVFinalizeThread : public Thread {
- BlueStore *store;
- explicit KVFinalizeThread(BlueStore *s) : store(s) {}
-    void *entry() override {
- store->_kv_finalize_thread();
- return NULL;
- }
- };
-
- struct DBHistogram {
- struct value_dist {
- uint64_t count;
- uint32_t max_len;
- };
-
- struct key_dist {
- uint64_t count;
- uint32_t max_len;
-      map<int, struct value_dist> val_map; ///< value slab id -> count and max value length
- };
-
- map<string, map<int, struct key_dist> > key_hist;
- map<int, uint64_t> value_hist;
- int get_key_slab(size_t sz);
- string get_key_slab_to_range(int slab);
- int get_value_slab(size_t sz);
- string get_value_slab_to_range(int slab);
- void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
- const string &prefix, size_t key_size, size_t value_size);
- void dump(Formatter *f);
- };
-
- // --------------------------------------------------------
- // members
-private:
- BlueFS *bluefs = nullptr;
- unsigned bluefs_shared_bdev = 0; ///< which bluefs bdev we are sharing
- bool bluefs_single_shared_device = true;
- utime_t bluefs_last_balance;
-
- KeyValueDB *db = nullptr;
- BlockDevice *bdev = nullptr;
- std::string freelist_type;
- FreelistManager *fm = nullptr;
- Allocator *alloc = nullptr;
- uuid_d fsid;
- int path_fd = -1; ///< open handle to $path
- int fsid_fd = -1; ///< open handle (locked) to $path/fsid
- bool mounted = false;
-
- RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
- mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
-
- vector<Cache*> cache_shards;
-
-  std::mutex osr_lock; ///< protect osr_set
- std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
-
- std::atomic<uint64_t> nid_last = {0};
- std::atomic<uint64_t> nid_max = {0};
- std::atomic<uint64_t> blobid_last = {0};
- std::atomic<uint64_t> blobid_max = {0};
-
- Throttle throttle_bytes; ///< submit to commit
- Throttle throttle_deferred_bytes; ///< submit to deferred complete
-
- interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
- interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
-
- std::mutex deferred_lock;
- std::atomic<uint64_t> deferred_seq = {0};
- deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
- int deferred_queue_size = 0; ///< num txc's queued across all osrs
-  std::atomic<int> deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
- Finisher deferred_finisher;
-
- int m_finisher_num = 1;
- vector<Finisher*> finishers;
-
- KVSyncThread kv_sync_thread;
- std::mutex kv_lock;
- std::condition_variable kv_cond;
- bool _kv_only = false;
- bool kv_sync_started = false;
- bool kv_stop = false;
- bool kv_finalize_started = false;
- bool kv_finalize_stop = false;
- deque<TransContext*> kv_queue; ///< ready, already submitted
-  deque<TransContext*> kv_queue_unsubmitted; ///< ready, to be submitted by kv thread
- deque<TransContext*> kv_committing; ///< currently syncing
- deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
- deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
-
- KVFinalizeThread kv_finalize_thread;
- std::mutex kv_finalize_lock;
- std::condition_variable kv_finalize_cond;
- deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
- deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
-
- PerfCounters *logger = nullptr;
-
- std::mutex reap_lock;
- list<CollectionRef> removed_collections;
-
- RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
- set<ghobject_t> debug_data_error_objects;
- set<ghobject_t> debug_mdata_error_objects;
-
- std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
-
- uint64_t block_size = 0; ///< block size of block device (power of 2)
- uint64_t block_mask = 0; ///< mask to get just the block offset
- size_t block_size_order = 0; ///< bits to shift to get block size
-
-  uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
-  uint8_t min_alloc_size_order = 0; ///< bits for min_alloc_size
- static_assert(std::numeric_limits<uint8_t>::max() >
- std::numeric_limits<decltype(min_alloc_size)>::digits,
- "not enough bits for min_alloc_size");
-
- ///< maximum allocation unit (power of 2)
- std::atomic<uint64_t> max_alloc_size = {0};
-
-  ///< ops-count threshold for forced deferred writes
-  std::atomic<int> deferred_batch_ops = {0};
-
- ///< size threshold for forced deferred writes
- std::atomic<uint64_t> prefer_deferred_size = {0};
-
- ///< approx cost per io, in bytes
- std::atomic<uint64_t> throttle_cost_per_io = {0};
-
- std::atomic<Compressor::CompressionMode> comp_mode =
- {Compressor::COMP_NONE}; ///< compression mode
- CompressorRef compressor;
- std::atomic<uint64_t> comp_min_blob_size = {0};
- std::atomic<uint64_t> comp_max_blob_size = {0};
-
- std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
-
- uint64_t kv_ios = 0;
- uint64_t kv_throttle_costs = 0;
-
- // cache trim control
- uint64_t cache_size = 0; ///< total cache size
- float cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
- float cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
- float cache_data_ratio = 0; ///< cache ratio dedicated to object data
-
- std::mutex vstatfs_lock;
- volatile_statfs vstatfs;
-
- struct MempoolThread : public Thread {
- BlueStore *store;
- Cond cond;
- Mutex lock;
- bool stop = false;
- public:
- explicit MempoolThread(BlueStore *s)
- : store(s),
- lock("BlueStore::MempoolThread::lock") {}
- void *entry() override;
- void init() {
- assert(stop == false);
- create("bstore_mempool");
- }
- void shutdown() {
- lock.Lock();
- stop = true;
- cond.Signal();
- lock.Unlock();
- join();
- }
- } mempool_thread;
-
- // --------------------------------------------------------
- // private methods
-
- void _init_logger();
- void _shutdown_logger();
- int _reload_logger();
-
- int _open_path();
- void _close_path();
- int _open_fsid(bool create);
- int _lock_fsid();
- int _read_fsid(uuid_d *f);
- int _write_fsid();
- void _close_fsid();
- void _set_alloc_sizes();
- void _set_blob_size();
-
- int _open_bdev(bool create);
- void _close_bdev();
- int _open_db(bool create);
- void _close_db();
- int _open_fm(bool create);
- void _close_fm();
- int _open_alloc();
- void _close_alloc();
- int _open_collections(int *errors=0);
- void _close_collections();
-
- int _setup_block_symlink_or_file(string name, string path, uint64_t size,
- bool create);
-
-public:
- static int _write_bdev_label(CephContext* cct,
- string path, bluestore_bdev_label_t label);
- static int _read_bdev_label(CephContext* cct, string path,
- bluestore_bdev_label_t *label);
-private:
- int _check_or_set_bdev_label(string path, uint64_t size, string desc,
- bool create);
-
- int _open_super_meta();
-
- void _open_statfs();
-
- int _reconcile_bluefs_freespace();
- int _balance_bluefs_freespace(PExtentVector *extents);
- void _commit_bluefs_freespace(const PExtentVector& extents);
-
- CollectionRef _get_collection(const coll_t& cid);
- void _queue_reap_collection(CollectionRef& c);
- void _reap_collections();
- void _update_cache_logger();
-
- void _assign_nid(TransContext *txc, OnodeRef o);
- uint64_t _assign_blobid(TransContext *txc);
-
- void _dump_onode(OnodeRef o, int log_level=30);
- void _dump_extent_map(ExtentMap& em, int log_level=30);
- void _dump_transaction(Transaction *t, int log_level = 30);
-
- TransContext *_txc_create(OpSequencer *osr);
- void _txc_update_store_statfs(TransContext *txc);
- void _txc_add_transaction(TransContext *txc, Transaction *t);
- void _txc_calc_cost(TransContext *txc);
- void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
- void _txc_state_proc(TransContext *txc);
- void _txc_aio_submit(TransContext *txc);
-public:
- void txc_aio_finish(void *p) {
- _txc_state_proc(static_cast<TransContext*>(p));
- }
-private:
- void _txc_finish_io(TransContext *txc);
- void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
- void _txc_applied_kv(TransContext *txc);
- void _txc_committed_kv(TransContext *txc);
- void _txc_finish(TransContext *txc);
- void _txc_release_alloc(TransContext *txc);
-
- void _osr_drain_preceding(TransContext *txc);
- void _osr_drain_all();
- void _osr_unregister_all();
-
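-  // kv threads: _kv_sync_thread batches queued transactions and commits
-  // them to the KeyValueDB; _kv_finalize_thread then runs per-txc
-  // completion work off the commit path.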
- void _kv_start();
- void _kv_stop();
- void _kv_sync_thread();
- void _kv_finalize_thread();
-
- bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
- void _deferred_queue(TransContext *txc);
-public:
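-  /// submit any deferred-write batches that are ready to hit the device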
- void deferred_try_submit();
-private:
- void _deferred_submit_unlock(OpSequencer *osr);
- void _deferred_aio_finish(OpSequencer *osr);
- int _deferred_replay();
-
-public:
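-  /// bitset allocated from the bluestore_fsck mempool; fsck tracks one
-  /// bit per block to catch leaked and doubly-allocated extents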
- using mempool_dynamic_bitset =
- boost::dynamic_bitset<uint64_t,
- mempool::bluestore_fsck::pool_allocator<uint64_t>>;
-
-private:
- int _fsck_check_extents(
- const ghobject_t& oid,
- const PExtentVector& extents,
- bool compressed,
- mempool_dynamic_bitset &used_blocks,
- store_statfs_t& expected_statfs);
-
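-  /// stage the new data into the blob's buffer cache and remember the
-  /// SharedBlob so the txc can finish those buffers when it commits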
- void _buffer_cache_write(
- TransContext *txc,
- BlobRef b,
- uint64_t offset,
- bufferlist& bl,
- unsigned flags) {
- b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
- flags);
- txc->shared_blobs_written.insert(b->shared_blob);
- }
-
- int _collection_list(
- Collection *c, const ghobject_t& start, const ghobject_t& end,
- int max, vector<ghobject_t> *ls, ghobject_t *next);
-
- template <typename T, typename F>
- T select_option(const std::string& opt_name, T val1, F f) {
-    // NB: opt_name is reserved for future use
- boost::optional<T> val2 = f();
- if (val2) {
- return *val2;
- }
- return val1;
- }
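-  // Illustrative use: a caller passes the global default as val1 and a
-  // functor that looks for a per-pool override, e.g.
-  //
-  //   wctx->target_blob_size = select_option(
-  //     "compression_max_blob_size", comp_max_blob_size.load(),
-  //     [&]() -> boost::optional<uint64_t> {
-  //       int val;
-  //       if (c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val))
-  //         return boost::optional<uint64_t>((uint64_t)val);
-  //       return boost::optional<uint64_t>();
-  //     });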
-
- void _apply_padding(uint64_t head_pad,
- uint64_t tail_pad,
- bufferlist& padded);
-
-  // --- ondisk version ---
-public:
- const int32_t latest_ondisk_format = 2; ///< our version
- const int32_t min_readable_ondisk_format = 1; ///< what we can read
- const int32_t min_compat_ondisk_format = 2; ///< who can read us
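-
-  // Mount-time compatibility rule: this code can open a store whose
-  // recorded ondisk_format lies in
-  // [min_readable_ondisk_format, latest_ondisk_format] (older formats are
-  // migrated by _upgrade_super() below), and stores we write record
-  // min_compat_ondisk_format so that older code refuses to open them.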
-
-private:
- int32_t ondisk_format = 0; ///< value detected on mount
-
- int _upgrade_super(); ///< upgrade (called during open_super)
- void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
-
- // --- public interface ---
-public:
- BlueStore(CephContext *cct, const string& path);
-  BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // ctor for unit tests only
- ~BlueStore() override;
-
- string get_type() override {
- return "bluestore";
- }
-
-  bool needs_journal() override { return false; }
-  bool wants_journal() override { return false; }
-  bool allows_journal() override { return false; }
-
- bool is_rotational() override;
- bool is_journal_rotational() override;
-
- string get_default_device_class() override {
- string device_class;
- map<string, string> metadata;
- collect_metadata(&metadata);
- auto it = metadata.find("bluestore_bdev_type");
- if (it != metadata.end()) {
- device_class = it->second;
- }
- return device_class;
- }
-
- static int get_block_device_fsid(CephContext* cct, const string& path,
- uuid_d *fsid);
-
- bool test_mount_in_use() override;
-
-private:
- int _mount(bool kv_only);
-public:
- int mount() override {
- return _mount(false);
- }
- int umount() override;
-
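-  /// Mount just far enough to expose the KeyValueDB (collections are not
-  /// loaded); intended for tools that need raw access to the db.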
- int start_kv_only(KeyValueDB **pdb) {
- int r = _mount(true);
- if (r < 0)
- return r;
- *pdb = db;
- return 0;
- }
-
- int write_meta(const std::string& key, const std::string& value) override;
- int read_meta(const std::string& key, std::string *value) override;
-
-
- int fsck(bool deep) override {
- return _fsck(deep, false);
- }
- int repair(bool deep) override {
- return _fsck(deep, true);
- }
- int _fsck(bool deep, bool repair);
-
- void set_cache_shards(unsigned num) override;
-
- int validate_hobject_key(const hobject_t &obj) const override {
- return 0;
- }
- unsigned get_max_attr_name_length() override {
- return 256; // arbitrary; there is no real limit internally
- }
-
- int mkfs() override;
- int mkjournal() override {
- return 0;
- }
-
- void get_db_statistics(Formatter *f) override;
- void generate_db_histogram(Formatter *f) override;
- void _flush_cache();
- void flush_cache() override;
- void dump_perf_counters(Formatter *f) override {
- f->open_object_section("perf_counters");
- logger->dump_formatted(f, false);
- f->close_section();
- }
-
- void register_osr(OpSequencer *osr) {
- std::lock_guard<std::mutex> l(osr_lock);
- osr_set.insert(osr);
- }
- void unregister_osr(OpSequencer *osr) {
- std::lock_guard<std::mutex> l(osr_lock);
- osr_set.erase(osr);
- }
-
-public:
- int statfs(struct store_statfs_t *buf) override;
-
- void collect_metadata(map<string,string> *pm) override;
-
- bool exists(const coll_t& cid, const ghobject_t& oid) override;
- bool exists(CollectionHandle &c, const ghobject_t& oid) override;
- int set_collection_opts(
- const coll_t& cid,
- const pool_opts_t& opts) override;
- int stat(
- const coll_t& cid,
- const ghobject_t& oid,
- struct stat *st,
- bool allow_eio = false) override;
- int stat(
- CollectionHandle &c,
- const ghobject_t& oid,
- struct stat *st,
- bool allow_eio = false) override;
- int read(
- const coll_t& cid,
- const ghobject_t& oid,
- uint64_t offset,
- size_t len,
- bufferlist& bl,
- uint32_t op_flags = 0) override;
- int read(
- CollectionHandle &c,
- const ghobject_t& oid,
- uint64_t offset,
- size_t len,
- bufferlist& bl,
- uint32_t op_flags = 0) override;
- int _do_read(
- Collection *c,
- OnodeRef o,
- uint64_t offset,
- size_t len,
- bufferlist& bl,
- uint32_t op_flags = 0);
-
-private:
- int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
- uint64_t offset, size_t len, interval_set<uint64_t>& destset);
-public:
- int fiemap(const coll_t& cid, const ghobject_t& oid,
- uint64_t offset, size_t len, bufferlist& bl) override;
- int fiemap(CollectionHandle &c, const ghobject_t& oid,
- uint64_t offset, size_t len, bufferlist& bl) override;
- int fiemap(const coll_t& cid, const ghobject_t& oid,
- uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
- int fiemap(CollectionHandle &c, const ghobject_t& oid,
- uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
-
-
- int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
- bufferptr& value) override;
- int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
- bufferptr& value) override;
-
- int getattrs(const coll_t& cid, const ghobject_t& oid,
- map<string,bufferptr>& aset) override;
- int getattrs(CollectionHandle &c, const ghobject_t& oid,
- map<string,bufferptr>& aset) override;
-
- int list_collections(vector<coll_t>& ls) override;
-
- CollectionHandle open_collection(const coll_t &c) override;
-
- bool collection_exists(const coll_t& c) override;
- int collection_empty(const coll_t& c, bool *empty) override;
- int collection_bits(const coll_t& c) override;
-
- int collection_list(const coll_t& cid,
- const ghobject_t& start,
- const ghobject_t& end,
- int max,
- vector<ghobject_t> *ls, ghobject_t *next) override;
- int collection_list(CollectionHandle &c,
- const ghobject_t& start,
- const ghobject_t& end,
- int max,
- vector<ghobject_t> *ls, ghobject_t *next) override;
-
- int omap_get(
- const coll_t& cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- bufferlist *header, ///< [out] omap header
-    map<string, bufferlist> *out ///< [out] Key to value map
- ) override;
- int omap_get(
- CollectionHandle &c, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- bufferlist *header, ///< [out] omap header
-    map<string, bufferlist> *out ///< [out] Key to value map
- ) override;
-
- /// Get omap header
- int omap_get_header(
- const coll_t& cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- bufferlist *header, ///< [out] omap header
- bool allow_eio = false ///< [in] don't assert on eio
- ) override;
- int omap_get_header(
- CollectionHandle &c, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- bufferlist *header, ///< [out] omap header
- bool allow_eio = false ///< [in] don't assert on eio
- ) override;
-
- /// Get keys defined on oid
- int omap_get_keys(
- const coll_t& cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- set<string> *keys ///< [out] Keys defined on oid
- ) override;
- int omap_get_keys(
- CollectionHandle &c, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- set<string> *keys ///< [out] Keys defined on oid
- ) override;
-
- /// Get key values
- int omap_get_values(
- const coll_t& cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- const set<string> &keys, ///< [in] Keys to get
- map<string, bufferlist> *out ///< [out] Returned keys and values
- ) override;
- int omap_get_values(
- CollectionHandle &c, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- const set<string> &keys, ///< [in] Keys to get
- map<string, bufferlist> *out ///< [out] Returned keys and values
- ) override;
-
- /// Filters keys into out which are defined on oid
- int omap_check_keys(
- const coll_t& cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- const set<string> &keys, ///< [in] Keys to check
- set<string> *out ///< [out] Subset of keys defined on oid
- ) override;
- int omap_check_keys(
- CollectionHandle &c, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- const set<string> &keys, ///< [in] Keys to check
- set<string> *out ///< [out] Subset of keys defined on oid
- ) override;
-
- ObjectMap::ObjectMapIterator get_omap_iterator(
- const coll_t& cid, ///< [in] collection
- const ghobject_t &oid ///< [in] object
- ) override;
- ObjectMap::ObjectMapIterator get_omap_iterator(
- CollectionHandle &c, ///< [in] collection
- const ghobject_t &oid ///< [in] object
- ) override;
-
- void set_fsid(uuid_d u) override {
- fsid = u;
- }
- uuid_d get_fsid() override {
- return fsid;
- }
-
- uint64_t estimate_objects_overhead(uint64_t num_objects) override {
-    return num_objects * 300; // assume ~300 bytes of overhead per object
- }
-
- struct BSPerfTracker {
- PerfCounters::avg_tracker<uint64_t> os_commit_latency;
- PerfCounters::avg_tracker<uint64_t> os_apply_latency;
-
- objectstore_perf_stat_t get_cur_stats() const {
- objectstore_perf_stat_t ret;
- ret.os_commit_latency = os_commit_latency.current_avg();
- ret.os_apply_latency = os_apply_latency.current_avg();
- return ret;
- }
-
- void update_from_perfcounters(PerfCounters &logger);
- } perf_tracker;
-
- objectstore_perf_stat_t get_cur_stats() override {
- perf_tracker.update_from_perfcounters(*logger);
- return perf_tracker.get_cur_stats();
- }
- const PerfCounters* get_perf_counters() const override {
- return logger;
- }
-
- int queue_transactions(
- Sequencer *osr,
- vector<Transaction>& tls,
- TrackedOpRef op = TrackedOpRef(),
- ThreadPool::TPHandle *handle = NULL) override;
-
- // error injection
- void inject_data_error(const ghobject_t& o) override {
- RWLock::WLocker l(debug_read_error_lock);
- debug_data_error_objects.insert(o);
- }
- void inject_mdata_error(const ghobject_t& o) override {
- RWLock::WLocker l(debug_read_error_lock);
- debug_mdata_error_objects.insert(o);
- }
- void compact() override {
- assert(db);
- db->compact();
- }
-
-private:
- bool _debug_data_eio(const ghobject_t& o) {
- if (!cct->_conf->bluestore_debug_inject_read_err) {
- return false;
- }
- RWLock::RLocker l(debug_read_error_lock);
- return debug_data_error_objects.count(o);
- }
- bool _debug_mdata_eio(const ghobject_t& o) {
- if (!cct->_conf->bluestore_debug_inject_read_err) {
- return false;
- }
- RWLock::RLocker l(debug_read_error_lock);
- return debug_mdata_error_objects.count(o);
- }
- void _debug_obj_on_delete(const ghobject_t& o) {
- if (cct->_conf->bluestore_debug_inject_read_err) {
- RWLock::WLocker l(debug_read_error_lock);
- debug_data_error_objects.erase(o);
- debug_mdata_error_objects.erase(o);
- }
- }
-
-private:
-
- // --------------------------------------------------------
- // read processing internal methods
- int _verify_csum(
- OnodeRef& o,
- const bluestore_blob_t* blob,
- uint64_t blob_xoffset,
- const bufferlist& bl,
- uint64_t logical_offset) const;
- int _decompress(bufferlist& source, bufferlist* result);
-
-
- // --------------------------------------------------------
- // write ops
-
- struct WriteContext {
- bool buffered = false; ///< buffered write
- bool compress = false; ///< compressed write
- uint64_t target_blob_size = 0; ///< target (max) blob size
- unsigned csum_order = 0; ///< target checksum chunk order
-
- old_extent_map_t old_extents; ///< must deref these blobs
-
- struct write_item {
- uint64_t logical_offset; ///< write logical offset
- BlobRef b;
- uint64_t blob_length;
- uint64_t b_off;
- bufferlist bl;
- uint64_t b_off0; ///< original offset in a blob prior to padding
- uint64_t length0; ///< original data length prior to padding
-
- bool mark_unused;
- bool new_blob; ///< whether new blob was created
-
- bool compressed = false;
- bufferlist compressed_bl;
- size_t compressed_len = 0;
-
- write_item(
- uint64_t logical_offs,
- BlobRef b,
- uint64_t blob_len,
- uint64_t o,
- bufferlist& bl,
- uint64_t o0,
- uint64_t l0,
- bool _mark_unused,
- bool _new_blob)
- :
- logical_offset(logical_offs),
- b(b),
- blob_length(blob_len),
- b_off(o),
- bl(bl),
- b_off0(o0),
- length0(l0),
- mark_unused(_mark_unused),
- new_blob(_new_blob) {}
- };
- vector<write_item> writes; ///< blobs we're writing
-
-    /// partial clone of the context: copy only the write-policy knobs
-    /// (buffered, compress, target_blob_size, csum_order); old_extents
-    /// and the pending writes are deliberately not carried over
- void fork(const WriteContext& other) {
- buffered = other.buffered;
- compress = other.compress;
- target_blob_size = other.target_blob_size;
- csum_order = other.csum_order;
- }
- void write(
- uint64_t loffs,
- BlobRef b,
- uint64_t blob_len,
- uint64_t o,
- bufferlist& bl,
- uint64_t o0,
- uint64_t len0,
- bool _mark_unused,
- bool _new_blob) {
- writes.emplace_back(loffs,
- b,
- blob_len,
- o,
- bl,
- o0,
- len0,
- _mark_unused,
- _new_blob);
- }
- /// Checks for writes to the same pextent within a blob
- bool has_conflict(
- BlobRef b,
- uint64_t loffs,
- uint64_t loffs_end,
- uint64_t min_alloc_size);
- };
-
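-  // write-path split: _do_write_small covers spans below min_alloc_size
-  // (head/tail partial blocks, possibly deferred read-modify-write);
-  // _do_write_big covers min_alloc_size-aligned spans written as whole
-  // blobs.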
- void _do_write_small(
- TransContext *txc,
- CollectionRef &c,
- OnodeRef o,
- uint64_t offset, uint64_t length,
- bufferlist::iterator& blp,
- WriteContext *wctx);
- void _do_write_big(
- TransContext *txc,
- CollectionRef &c,
- OnodeRef o,
- uint64_t offset, uint64_t length,
- bufferlist::iterator& blp,
- WriteContext *wctx);
- int _do_alloc_write(
- TransContext *txc,
- CollectionRef c,
- OnodeRef o,
- WriteContext *wctx);
- void _wctx_finish(
- TransContext *txc,
- CollectionRef& c,
- OnodeRef o,
- WriteContext *wctx,
- set<SharedBlob*> *maybe_unshared_blobs=0);
-
- int _do_transaction(Transaction *t,
- TransContext *txc,
- ThreadPool::TPHandle *handle);
-
- int _write(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- uint64_t offset, size_t len,
- bufferlist& bl,
- uint32_t fadvise_flags);
- void _pad_zeros(bufferlist *bl, uint64_t *offset,
- uint64_t chunk_size);
-
- void _choose_write_options(CollectionRef& c,
- OnodeRef o,
- uint32_t fadvise_flags,
- WriteContext *wctx);
-
- int _do_gc(TransContext *txc,
- CollectionRef& c,
- OnodeRef o,
- const GarbageCollector& gc,
- const WriteContext& wctx,
- uint64_t *dirty_start,
- uint64_t *dirty_end);
-
- int _do_write(TransContext *txc,
- CollectionRef &c,
- OnodeRef o,
- uint64_t offset, uint64_t length,
- bufferlist& bl,
- uint32_t fadvise_flags);
- void _do_write_data(TransContext *txc,
- CollectionRef& c,
- OnodeRef o,
- uint64_t offset,
- uint64_t length,
- bufferlist& bl,
- WriteContext *wctx);
-
- int _touch(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o);
- int _do_zero(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- uint64_t offset, size_t len);
- int _zero(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- uint64_t offset, size_t len);
- void _do_truncate(TransContext *txc,
- CollectionRef& c,
- OnodeRef o,
- uint64_t offset,
- set<SharedBlob*> *maybe_unshared_blobs=0);
- int _truncate(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- uint64_t offset);
- int _remove(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o);
- int _do_remove(TransContext *txc,
- CollectionRef& c,
- OnodeRef o);
- int _setattr(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- const string& name,
- bufferptr& val);
- int _setattrs(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- const map<string,bufferptr>& aset);
- int _rmattr(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- const string& name);
- int _rmattrs(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o);
- void _do_omap_clear(TransContext *txc, uint64_t id);
- int _omap_clear(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o);
- int _omap_setkeys(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- bufferlist& bl);
- int _omap_setheader(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- bufferlist& header);
- int _omap_rmkeys(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- bufferlist& bl);
- int _omap_rmkey_range(TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- const string& first, const string& last);
- int _set_alloc_hint(
- TransContext *txc,
- CollectionRef& c,
- OnodeRef& o,
- uint64_t expected_object_size,
- uint64_t expected_write_size,
- uint32_t flags);
- int _do_clone_range(TransContext *txc,
- CollectionRef& c,
- OnodeRef& oldo,
- OnodeRef& newo,
- uint64_t srcoff, uint64_t length, uint64_t dstoff);
- int _clone(TransContext *txc,
- CollectionRef& c,
- OnodeRef& oldo,
- OnodeRef& newo);
- int _clone_range(TransContext *txc,
- CollectionRef& c,
- OnodeRef& oldo,
- OnodeRef& newo,
- uint64_t srcoff, uint64_t length, uint64_t dstoff);
- int _rename(TransContext *txc,
- CollectionRef& c,
- OnodeRef& oldo,
- OnodeRef& newo,
- const ghobject_t& new_oid);
- int _create_collection(TransContext *txc, const coll_t &cid,
- unsigned bits, CollectionRef *c);
- int _remove_collection(TransContext *txc, const coll_t &cid,
- CollectionRef *c);
- int _split_collection(TransContext *txc,
- CollectionRef& c,
- CollectionRef& d,
- unsigned bits, int rem);
-};
-
-inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
- return out << *s.parent;
-}
-
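-// ref-count adapters so boost::intrusive_ptr<> can manage Onode and
-// OpSequencer lifetimes via their get()/put() methods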
-static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
- o->get();
-}
-static inline void intrusive_ptr_release(BlueStore::Onode *o) {
- o->put();
-}
-
-static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
- o->get();
-}
-static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
- o->put();
-}
-
-#endif