X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fos%2Fkstore%2FKStore.h;fp=src%2Fceph%2Fsrc%2Fos%2Fkstore%2FKStore.h;h=487d984703978557ad606b995a92a952ec689f59;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git

diff --git a/src/ceph/src/os/kstore/KStore.h b/src/ceph/src/os/kstore/KStore.h
new file mode 100644
index 0000000..487d984
--- /dev/null
+++ b/src/ceph/src/os/kstore/KStore.h
@@ -0,0 +1,694 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_OSD_KSTORE_H
+#define CEPH_OSD_KSTORE_H
+
+#include "acconfig.h"
+
+#include <unistd.h>
+
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+
+#include "include/assert.h"
+#include "include/unordered_map.h"
+#include "include/memory.h"
+#include "common/Finisher.h"
+#include "common/RWLock.h"
+#include "common/WorkQueue.h"
+#include "os/ObjectStore.h"
+#include "common/perf_counters.h"
+#include "os/fs/FS.h"
+#include "kv/KeyValueDB.h"
+
+#include "kstore_types.h"
+
+#include "boost/intrusive/list.hpp"
+
+enum {
+  l_kstore_first = 832430,
+  l_kstore_state_prepare_lat,
+  l_kstore_state_kv_queued_lat,
+  l_kstore_state_kv_done_lat,
+  l_kstore_state_finishing_lat,
+  l_kstore_state_done_lat,
+  l_kstore_last
+};
+
+class KStore : public ObjectStore {
+  // -----------------------------------------------------
+  // types
+public:
+
+  class TransContext;
+
+  /// an in-memory object
+  struct Onode {
+    CephContext* cct;
+    std::atomic_int nref;  ///< reference count
+
+    ghobject_t oid;
+    string key;     ///< key under PREFIX_OBJ where we are stored
+    boost::intrusive::list_member_hook<> lru_item;
+
+    kstore_onode_t onode;  ///< metadata stored as value in kv store
+    bool dirty;     // ???
+    bool exists;
+
+    std::mutex flush_lock;               ///< protect flush_txns
+    std::condition_variable flush_cond;  ///< wait here for unapplied txns
+    set<TransContext*> flush_txns;       ///< committing txns
+
+    uint64_t tail_offset;
+    bufferlist tail_bl;
+
+    map<uint64_t,bufferlist> pending_stripes;  ///< unwritten stripes
+
+    Onode(CephContext* cct, const ghobject_t& o, const string& k)
+      : cct(cct),
+        nref(0),
+        oid(o),
+        key(k),
+        dirty(false),
+        exists(false),
+        tail_offset(0) {
+    }
+
+    void flush();
+    void get() {
+      ++nref;
+    }
+    void put() {
+      if (--nref == 0)
+        delete this;
+    }
+
+    void clear_tail() {
+      tail_offset = 0;
+      tail_bl.clear();
+    }
+    void clear_pending_stripes() {
+      pending_stripes.clear();
+    }
+  };
+  typedef boost::intrusive_ptr<Onode> OnodeRef;
+
+  struct OnodeHashLRU {
+    CephContext* cct;
+    typedef boost::intrusive::list<
+      Onode,
+      boost::intrusive::member_hook<
+        Onode,
+        boost::intrusive::list_member_hook<>,
+        &Onode::lru_item> > lru_list_t;
+
+    std::mutex lock;
+    ceph::unordered_map<ghobject_t,OnodeRef> onode_map;  ///< forward lookups
+    lru_list_t lru;                                      ///< lru
+
+    OnodeHashLRU(CephContext* cct) : cct(cct) {}
+
+    void add(const ghobject_t& oid, OnodeRef o);
+    void _touch(OnodeRef o);
+    OnodeRef lookup(const ghobject_t& o);
+    void rename(const ghobject_t& old_oid, const ghobject_t& new_oid);
+    void clear();
+    bool get_next(const ghobject_t& after, pair<ghobject_t,OnodeRef> *next);
+    int trim(int max=-1);
+  };
+
+  struct Collection : public CollectionImpl {
+    KStore *store;
+    coll_t cid;
+    kstore_cnode_t cnode;
+    RWLock lock;
+
+    // cache onodes on a per-collection basis to avoid lock
+    // contention.
+    OnodeHashLRU onode_map;
+
+    OnodeRef get_onode(const ghobject_t& oid, bool create);
+
+    const coll_t &get_cid() override {
+      return cid;
+    }
+
+    bool contains(const ghobject_t& oid) {
+      if (cid.is_meta())
+        return oid.hobj.pool == -1;
+      spg_t spgid;
+      if (cid.is_pg(&spgid))
+        return
+          spgid.pgid.contains(cnode.bits, oid) &&
+          oid.shard_id == spgid.shard;
+      return false;
+    }
+
+    Collection(KStore *ns, coll_t c);
+  };
+  typedef boost::intrusive_ptr<Collection> CollectionRef;
+
+  class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
+    CollectionRef c;
+    OnodeRef o;
+    KeyValueDB::Iterator it;
+    string head, tail;
+  public:
+    OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
+    int seek_to_first() override;
+    int upper_bound(const string &after) override;
+    int lower_bound(const string &to) override;
+    bool valid() override;
+    int next(bool validate=true) override;
+    string key() override;
+    bufferlist value() override;
+    int status() override {
+      return 0;
+    }
+  };
+
+  class OpSequencer;
+  typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
+
+  struct TransContext {
+    typedef enum {
+      STATE_PREPARE,
+      STATE_AIO_WAIT,
+      STATE_IO_DONE,
+      STATE_KV_QUEUED,
+      STATE_KV_COMMITTING,
+      STATE_KV_DONE,
+      STATE_FINISHING,
+      STATE_DONE,
+    } state_t;
+
+    state_t state;
+
+    const char *get_state_name() {
+      switch (state) {
+      case STATE_PREPARE: return "prepare";
+      case STATE_AIO_WAIT: return "aio_wait";
+      case STATE_IO_DONE: return "io_done";
+      case STATE_KV_QUEUED: return "kv_queued";
+      case STATE_KV_COMMITTING: return "kv_committing";
+      case STATE_KV_DONE: return "kv_done";
+      case STATE_FINISHING: return "finishing";
+      case STATE_DONE: return "done";
+      }
+      return "???";
+    }
+
+    void log_state_latency(PerfCounters *logger, int state) {
+      utime_t lat, now = ceph_clock_now();
+      lat = now - start;
+      logger->tinc(state, lat);
+      start = now;
+    }
+
+    OpSequencerRef osr;
+    boost::intrusive::list_member_hook<> sequencer_item;
+
+    uint64_t ops, bytes;
+
+    set<OnodeRef> onodes;     ///< these onodes need to be updated/written
+    KeyValueDB::Transaction t;  ///< then we will commit this
+    Context *oncommit;          ///< signal on commit
+    Context *onreadable;        ///< signal on readable
+    Context *onreadable_sync;   ///< signal on readable
+    list<Context*> oncommits;   ///< more commit completions
+    list<CollectionRef> removed_collections;  ///< colls we removed
+
+    CollectionRef first_collection;  ///< first referenced collection
+    utime_t start;
+    explicit TransContext(OpSequencer *o)
+      : state(STATE_PREPARE),
+        osr(o),
+        ops(0),
+        bytes(0),
+        oncommit(NULL),
+        onreadable(NULL),
+        onreadable_sync(NULL),
+        start(ceph_clock_now()){
+      //cout << "txc new " << this << std::endl;
+    }
+    ~TransContext() {
+      //cout << "txc del " << this << std::endl;
+    }
+
+    void write_onode(OnodeRef &o) {
+      onodes.insert(o);
+    }
+  };
+
+  class OpSequencer : public Sequencer_impl {
+  public:
+    std::mutex qlock;
+    std::condition_variable qcond;
+    typedef boost::intrusive::list<
+      TransContext,
+      boost::intrusive::member_hook<
+        TransContext,
+        boost::intrusive::list_member_hook<>,
+        &TransContext::sequencer_item> > q_list_t;
+    q_list_t q;  ///< transactions
+
+    Sequencer *parent;
+
+    OpSequencer(CephContext* cct)
+        //set the qlock to PTHREAD_MUTEX_RECURSIVE mode
+      : Sequencer_impl(cct),
+        parent(NULL) {
+    }
+    ~OpSequencer() override {
+      assert(q.empty());
+    }
+
+    void queue_new(TransContext *txc) {
+      std::lock_guard<std::mutex> l(qlock);
+      q.push_back(*txc);
+    }
+
+    void flush() override {
+      std::unique_lock<std::mutex> l(qlock);
+      while (!q.empty())
+        qcond.wait(l);
+    }
+
+    bool flush_commit(Context *c) override {
+      std::lock_guard<std::mutex> l(qlock);
+      if (q.empty()) {
+        return true;
+      }
+      TransContext *txc = &q.back();
+      if (txc->state >= TransContext::STATE_KV_DONE) {
+        return true;
+      }
+      assert(txc->state < TransContext::STATE_KV_DONE);
+      txc->oncommits.push_back(c);
+      return false;
+    }
+  };
+
+  struct KVSyncThread : public Thread {
+    KStore *store;
+    explicit KVSyncThread(KStore *s) : store(s) {}
+    void *entry() override {
+      store->_kv_sync_thread();
+      return NULL;
+    }
+  };
+
+  // --------------------------------------------------------
+  // members
+private:
+  KeyValueDB *db;
+  uuid_d fsid;
+  int path_fd;  ///< open handle to $path
+  int fsid_fd;  ///< open handle (locked) to $path/fsid
+  bool mounted;
+
+  RWLock coll_lock;  ///< rwlock to protect coll_map
+  ceph::unordered_map<coll_t, CollectionRef> coll_map;
+
+  std::mutex nid_lock;
+  uint64_t nid_last;
+  uint64_t nid_max;
+
+  Throttle throttle_ops, throttle_bytes;  ///< submit to commit
+
+  Finisher finisher;
+
+  KVSyncThread kv_sync_thread;
+  std::mutex kv_lock;
+  std::condition_variable kv_cond, kv_sync_cond;
+  bool kv_stop;
+  deque<TransContext*> kv_queue, kv_committing;
+
+  //Logger *logger;
+  PerfCounters *logger;
+  std::mutex reap_lock;
+  list<CollectionRef> removed_collections;
+
+
+  // --------------------------------------------------------
+  // private methods
+
+  void _init_logger();
+  void _shutdown_logger();
+
+  int _open_path();
+  void _close_path();
+  int _open_fsid(bool create);
+  int _lock_fsid();
+  int _read_fsid(uuid_d *f);
+  int _write_fsid();
+  void _close_fsid();
+  int _open_db(bool create);
+  void _close_db();
+  int _open_collections(int *errors=0);
+  void _close_collections();
+
+  int _open_super_meta();
+
+  CollectionRef _get_collection(coll_t cid);
+  void _queue_reap_collection(CollectionRef& c);
+  void _reap_collections();
+
+  void _assign_nid(TransContext *txc, OnodeRef o);
+
+  void _dump_onode(OnodeRef o);
+
+  TransContext *_txc_create(OpSequencer *osr);
+  void _txc_release(TransContext *txc, uint64_t offset, uint64_t length);
+  void _txc_add_transaction(TransContext *txc, Transaction *t);
+  void _txc_finalize(OpSequencer *osr, TransContext *txc);
+  void _txc_state_proc(TransContext *txc);
+  void _txc_finish_kv(TransContext *txc);
+  void _txc_finish(TransContext *txc);
+
+  void _osr_reap_done(OpSequencer *osr);
+
+  void _kv_sync_thread();
+  void _kv_stop() {
+    {
+      std::lock_guard<std::mutex> l(kv_lock);
+      kv_stop = true;
+      kv_cond.notify_all();
+    }
+    kv_sync_thread.join();
+    kv_stop = false;
+  }
+
+  void _do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl);
+  void _do_write_stripe(TransContext *txc, OnodeRef o,
+                        uint64_t offset, bufferlist& bl);
+  void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset);
+
+  int _collection_list(
+    Collection *c, const ghobject_t& start, const ghobject_t& end,
+    int max, vector<ghobject_t> *ls, ghobject_t *next);
+
+public:
+  KStore(CephContext *cct, const string& path);
+  ~KStore() override;
+
+  string get_type() override {
+    return "kstore";
+  }
+
+  bool needs_journal() override { return false; };
+  bool wants_journal() override { return false; };
+  bool allows_journal() override { return false; };
+
+  static int get_block_device_fsid(const string& path, uuid_d *fsid);
+
+  bool test_mount_in_use() override;
+
+  int mount() override;
+  int umount() override;
+  void _sync();
+
+  int fsck(bool deep) override;
+
+
+  int validate_hobject_key(const hobject_t &obj) const override {
+    return 0;
+  }
+  unsigned get_max_attr_name_length() override {
+    return 256;  // arbitrary; there is no real limit internally
+  }
+
+  int mkfs() override;
+  int mkjournal() override {
+    return 0;
+  }
+  void dump_perf_counters(Formatter *f) override {
+    f->open_object_section("perf_counters");
+    logger->dump_formatted(f, false);
+    f->close_section();
+  }
+
+  int statfs(struct store_statfs_t *buf) override;
+
+  using ObjectStore::exists;
+  bool exists(const coll_t& cid, const ghobject_t& oid) override;
+  using ObjectStore::stat;
+  int stat(
+    const coll_t& cid,
+    const ghobject_t& oid,
+    struct stat *st,
+    bool allow_eio = false) override; // struct stat?
+  int set_collection_opts(
+    const coll_t& cid,
+    const pool_opts_t& opts) override;
+  using ObjectStore::read;
+  int read(
+    const coll_t& cid,
+    const ghobject_t& oid,
+    uint64_t offset,
+    size_t len,
+    bufferlist& bl,
+    uint32_t op_flags = 0) override;
+  int _do_read(
+    OnodeRef o,
+    uint64_t offset,
+    size_t len,
+    bufferlist& bl,
+    uint32_t op_flags = 0);
+
+  using ObjectStore::fiemap;
+  int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl) override;
+  int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
+  using ObjectStore::getattr;
+  int getattr(const coll_t& cid, const ghobject_t& oid, const char *name, bufferptr& value) override;
+  using ObjectStore::getattrs;
+  int getattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset) override;
+
+  int list_collections(vector<coll_t>& ls) override;
+  bool collection_exists(const coll_t& c) override;
+  int collection_empty(const coll_t& c, bool *empty) override;
+  int collection_bits(const coll_t& c) override;
+  int collection_list(
+    const coll_t& cid, const ghobject_t& start, const ghobject_t& end,
+    int max,
+    vector<ghobject_t> *ls, ghobject_t *next) override;
+  int collection_list(
+    CollectionHandle &c, const ghobject_t& start, const ghobject_t& end,
+    int max,
+    vector<ghobject_t> *ls, ghobject_t *next) override;
+
+  using ObjectStore::omap_get;
+  int omap_get(
+    const coll_t& cid,            ///< [in] Collection containing oid
+    const ghobject_t &oid,        ///< [in] Object containing omap
+    bufferlist *header,           ///< [out] omap header
+    map<string, bufferlist> *out  ///< [out] Key to value map
+    ) override;
+
+  using ObjectStore::omap_get_header;
+  /// Get omap header
+  int omap_get_header(
+    const coll_t& cid,            ///< [in] Collection containing oid
+    const ghobject_t &oid,        ///< [in] Object containing omap
+    bufferlist *header,           ///< [out] omap header
+    bool allow_eio = false        ///< [in] don't assert on eio
+    ) override;
+
+  using ObjectStore::omap_get_keys;
+  /// Get keys defined on oid
+  int omap_get_keys(
+    const coll_t& cid,            ///< [in] Collection containing oid
+    const ghobject_t &oid,        ///< [in] Object containing omap
+    set<string> *keys             ///< [out] Keys defined on oid
+    ) override;
+
+  using ObjectStore::omap_get_values;
+  /// Get key values
+  int omap_get_values(
+    const coll_t& cid,            ///< [in] Collection containing oid
+    const ghobject_t &oid,        ///< [in] Object containing omap
+    const set<string> &keys,      ///< [in] Keys to get
+    map<string, bufferlist> *out  ///< [out] Returned keys and values
+    ) override;
+
+  using ObjectStore::omap_check_keys;
+  /// Filters keys into out which are defined on oid
+  int omap_check_keys(
+    const coll_t& cid,            ///< [in] Collection containing oid
+    const ghobject_t &oid,        ///< [in] Object containing omap
+    const set<string> &keys,      ///< [in] Keys to check
+    set<string> *out              ///< [out] Subset of keys defined on oid
+    ) override;
+
+  using ObjectStore::get_omap_iterator;
+  ObjectMap::ObjectMapIterator get_omap_iterator(
+    const coll_t& cid,            ///< [in] collection
+    const ghobject_t &oid         ///< [in] object
+    ) override;
+
+  void set_fsid(uuid_d u) override {
+    fsid = u;
+  }
+  uuid_d get_fsid() override {
+    return fsid;
+  }
+
+  uint64_t estimate_objects_overhead(uint64_t num_objects) override {
+    return num_objects * 300; //assuming per-object overhead is 300 bytes
+  }
+
+  objectstore_perf_stat_t get_cur_stats() override {
+    return objectstore_perf_stat_t();
+  }
+  const PerfCounters* get_perf_counters() const override {
+    return logger;
+  }
+
+
+  int queue_transactions(
+    Sequencer *osr,
+    vector<Transaction>& tls,
+    TrackedOpRef op = TrackedOpRef(),
+    ThreadPool::TPHandle *handle = NULL) override;
+
+  void compact () override {
+    assert(db);
+    db->compact();
+  }
+
+private:
+  // --------------------------------------------------------
+  // write ops
+
+  int _do_transaction(Transaction *t,
+                      TransContext *txc,
+                      ThreadPool::TPHandle *handle);
+
+  int _write(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef& o,
+             uint64_t offset, size_t len,
+             bufferlist& bl,
+             uint32_t fadvise_flags);
+  int _do_write(TransContext *txc,
+                OnodeRef o,
+                uint64_t offset, uint64_t length,
+                bufferlist& bl,
+                uint32_t fadvise_flags);
+  int _touch(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef& o);
+  int _zero(TransContext *txc,
+            CollectionRef& c,
+            OnodeRef& o,
+            uint64_t offset, size_t len);
+  int _do_truncate(TransContext *txc,
+                   OnodeRef o,
+                   uint64_t offset);
+  int _truncate(TransContext *txc,
+                CollectionRef& c,
+                OnodeRef& o,
+                uint64_t offset);
+  int _remove(TransContext *txc,
+              CollectionRef& c,
+              OnodeRef& o);
+  int _do_remove(TransContext *txc,
+                 OnodeRef o);
+  int _setattr(TransContext *txc,
+               CollectionRef& c,
+               OnodeRef& o,
+               const string& name,
+               bufferptr& val);
+  int _setattrs(TransContext *txc,
+                CollectionRef& c,
+                OnodeRef& o,
+                const map<string,bufferptr>& aset);
+  int _rmattr(TransContext *txc,
+              CollectionRef& c,
+              OnodeRef& o,
+              const string& name);
+  int _rmattrs(TransContext *txc,
+               CollectionRef& c,
+               OnodeRef& o);
+  void _do_omap_clear(TransContext *txc, uint64_t id);
+  int _omap_clear(TransContext *txc,
+                  CollectionRef& c,
+                  OnodeRef& o);
+  int _omap_setkeys(TransContext *txc,
+                    CollectionRef& c,
+                    OnodeRef& o,
+                    bufferlist& bl);
+  int _omap_setheader(TransContext *txc,
+                      CollectionRef& c,
+                      OnodeRef& o,
+                      bufferlist& header);
+  int _omap_rmkeys(TransContext *txc,
+                   CollectionRef& c,
+                   OnodeRef& o,
+                   bufferlist& bl);
+  int _omap_rmkey_range(TransContext *txc,
+                        CollectionRef& c,
+                        OnodeRef& o,
+                        const string& first, const string& last);
+  int _setallochint(TransContext *txc,
+                    CollectionRef& c,
+                    OnodeRef& o,
+                    uint64_t expected_object_size,
+                    uint64_t expected_write_size,
+                    uint32_t flags);
+  int _clone(TransContext *txc,
+             CollectionRef& c,
+             OnodeRef& oldo,
+             OnodeRef& newo);
+  int _clone_range(TransContext *txc,
+                   CollectionRef& c,
+                   OnodeRef& oldo,
+                   OnodeRef& newo,
+                   uint64_t srcoff, uint64_t length, uint64_t dstoff);
+  int _rename(TransContext *txc,
+              CollectionRef& c,
+              OnodeRef& oldo,
+              OnodeRef& newo,
+              const ghobject_t& new_oid);
+  int _create_collection(TransContext *txc, coll_t cid, unsigned bits,
+                         CollectionRef *c);
+  int _remove_collection(TransContext *txc, coll_t cid, CollectionRef *c);
+  int _split_collection(TransContext *txc,
+                        CollectionRef& c,
+                        CollectionRef& d,
+                        unsigned bits, int rem);
+
+};
+
+inline ostream& operator<<(ostream& out, const KStore::OpSequencer& s) {
+  return out << *s.parent;
+}
+
+static inline void intrusive_ptr_add_ref(KStore::Onode *o) {
+  o->get();
+}
+static inline void intrusive_ptr_release(KStore::Onode *o) {
+  o->put();
+}
+
+static inline void intrusive_ptr_add_ref(KStore::OpSequencer *o) {
+  o->get();
+}
+static inline void intrusive_ptr_release(KStore::OpSequencer *o) {
+  o->put();
+}
+
+#endif