// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2014 Red Hat * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef CEPH_OSD_KSTORE_H #define CEPH_OSD_KSTORE_H #include "acconfig.h" #include #include #include #include #include "include/assert.h" #include "include/unordered_map.h" #include "include/memory.h" #include "common/Finisher.h" #include "common/RWLock.h" #include "common/WorkQueue.h" #include "os/ObjectStore.h" #include "common/perf_counters.h" #include "os/fs/FS.h" #include "kv/KeyValueDB.h" #include "kstore_types.h" #include "boost/intrusive/list.hpp" enum { l_kstore_first = 832430, l_kstore_state_prepare_lat, l_kstore_state_kv_queued_lat, l_kstore_state_kv_done_lat, l_kstore_state_finishing_lat, l_kstore_state_done_lat, l_kstore_last }; class KStore : public ObjectStore { // ----------------------------------------------------- // types public: class TransContext; /// an in-memory object struct Onode { CephContext* cct; std::atomic_int nref; ///< reference count ghobject_t oid; string key; ///< key under PREFIX_OBJ where we are stored boost::intrusive::list_member_hook<> lru_item; kstore_onode_t onode; ///< metadata stored as value in kv store bool dirty; // ??? bool exists; std::mutex flush_lock; ///< protect flush_txns std::condition_variable flush_cond; ///< wait here for unapplied txns set flush_txns; ///< committing txns uint64_t tail_offset; bufferlist tail_bl; map pending_stripes; ///< unwritten stripes Onode(CephContext* cct, const ghobject_t& o, const string& k) : cct(cct), nref(0), oid(o), key(k), dirty(false), exists(false), tail_offset(0) { } void flush(); void get() { ++nref; } void put() { if (--nref == 0) delete this; } void clear_tail() { tail_offset = 0; tail_bl.clear(); } void clear_pending_stripes() { pending_stripes.clear(); } }; typedef boost::intrusive_ptr OnodeRef; struct OnodeHashLRU { CephContext* cct; typedef boost::intrusive::list< Onode, boost::intrusive::member_hook< Onode, boost::intrusive::list_member_hook<>, &Onode::lru_item> > lru_list_t; std::mutex lock; ceph::unordered_map onode_map; ///< forward lookups lru_list_t lru; ///< lru OnodeHashLRU(CephContext* cct) : cct(cct) {} void add(const ghobject_t& oid, OnodeRef o); void _touch(OnodeRef o); OnodeRef lookup(const ghobject_t& o); void rename(const ghobject_t& old_oid, const ghobject_t& new_oid); void clear(); bool get_next(const ghobject_t& after, pair *next); int trim(int max=-1); }; struct Collection : public CollectionImpl { KStore *store; coll_t cid; kstore_cnode_t cnode; RWLock lock; // cache onodes on a per-collection basis to avoid lock // contention. OnodeHashLRU onode_map; OnodeRef get_onode(const ghobject_t& oid, bool create); const coll_t &get_cid() override { return cid; } bool contains(const ghobject_t& oid) { if (cid.is_meta()) return oid.hobj.pool == -1; spg_t spgid; if (cid.is_pg(&spgid)) return spgid.pgid.contains(cnode.bits, oid) && oid.shard_id == spgid.shard; return false; } Collection(KStore *ns, coll_t c); }; typedef boost::intrusive_ptr CollectionRef; class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl { CollectionRef c; OnodeRef o; KeyValueDB::Iterator it; string head, tail; public: OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it); int seek_to_first() override; int upper_bound(const string &after) override; int lower_bound(const string &to) override; bool valid() override; int next(bool validate=true) override; string key() override; bufferlist value() override; int status() override { return 0; } }; class OpSequencer; typedef boost::intrusive_ptr OpSequencerRef; struct TransContext { typedef enum { STATE_PREPARE, STATE_AIO_WAIT, STATE_IO_DONE, STATE_KV_QUEUED, STATE_KV_COMMITTING, STATE_KV_DONE, STATE_FINISHING, STATE_DONE, } state_t; state_t state; const char *get_state_name() { switch (state) { case STATE_PREPARE: return "prepare"; case STATE_AIO_WAIT: return "aio_wait"; case STATE_IO_DONE: return "io_done"; case STATE_KV_QUEUED: return "kv_queued"; case STATE_KV_COMMITTING: return "kv_committing"; case STATE_KV_DONE: return "kv_done"; case STATE_FINISHING: return "finishing"; case STATE_DONE: return "done"; } return "???"; } void log_state_latency(PerfCounters *logger, int state) { utime_t lat, now = ceph_clock_now(); lat = now - start; logger->tinc(state, lat); start = now; } OpSequencerRef osr; boost::intrusive::list_member_hook<> sequencer_item; uint64_t ops, bytes; set onodes; ///< these onodes need to be updated/written KeyValueDB::Transaction t; ///< then we will commit this Context *oncommit; ///< signal on commit Context *onreadable; ///< signal on readable Context *onreadable_sync; ///< signal on readable list oncommits; ///< more commit completions list removed_collections; ///< colls we removed CollectionRef first_collection; ///< first referenced collection utime_t start; explicit TransContext(OpSequencer *o) : state(STATE_PREPARE), osr(o), ops(0), bytes(0), oncommit(NULL), onreadable(NULL), onreadable_sync(NULL), start(ceph_clock_now()){ //cout << "txc new " << this << std::endl; } ~TransContext() { //cout << "txc del " << this << std::endl; } void write_onode(OnodeRef &o) { onodes.insert(o); } }; class OpSequencer : public Sequencer_impl { public: std::mutex qlock; std::condition_variable qcond; typedef boost::intrusive::list< TransContext, boost::intrusive::member_hook< TransContext, boost::intrusive::list_member_hook<>, &TransContext::sequencer_item> > q_list_t; q_list_t q; ///< transactions Sequencer *parent; OpSequencer(CephContext* cct) //set the qlock to PTHREAD_MUTEX_RECURSIVE mode : Sequencer_impl(cct), parent(NULL) { } ~OpSequencer() override { assert(q.empty()); } void queue_new(TransContext *txc) { std::lock_guard l(qlock); q.push_back(*txc); } void flush() override { std::unique_lock l(qlock); while (!q.empty()) qcond.wait(l); } bool flush_commit(Context *c) override { std::lock_guard l(qlock); if (q.empty()) { return true; } TransContext *txc = &q.back(); if (txc->state >= TransContext::STATE_KV_DONE) { return true; } assert(txc->state < TransContext::STATE_KV_DONE); txc->oncommits.push_back(c); return false; } }; struct KVSyncThread : public Thread { KStore *store; explicit KVSyncThread(KStore *s) : store(s) {} void *entry() override { store->_kv_sync_thread(); return NULL; } }; // -------------------------------------------------------- // members private: KeyValueDB *db; uuid_d fsid; int path_fd; ///< open handle to $path int fsid_fd; ///< open handle (locked) to $path/fsid bool mounted; RWLock coll_lock; ///< rwlock to protect coll_map ceph::unordered_map coll_map; std::mutex nid_lock; uint64_t nid_last; uint64_t nid_max; Throttle throttle_ops, throttle_bytes; ///< submit to commit Finisher finisher; KVSyncThread kv_sync_thread; std::mutex kv_lock; std::condition_variable kv_cond, kv_sync_cond; bool kv_stop; deque kv_queue, kv_committing; //Logger *logger; PerfCounters *logger; std::mutex reap_lock; list removed_collections; // -------------------------------------------------------- // private methods void _init_logger(); void _shutdown_logger(); int _open_path(); void _close_path(); int _open_fsid(bool create); int _lock_fsid(); int _read_fsid(uuid_d *f); int _write_fsid(); void _close_fsid(); int _open_db(bool create); void _close_db(); int _open_collections(int *errors=0); void _close_collections(); int _open_super_meta(); CollectionRef _get_collection(coll_t cid); void _queue_reap_collection(CollectionRef& c); void _reap_collections(); void _assign_nid(TransContext *txc, OnodeRef o); void _dump_onode(OnodeRef o); TransContext *_txc_create(OpSequencer *osr); void _txc_release(TransContext *txc, uint64_t offset, uint64_t length); void _txc_add_transaction(TransContext *txc, Transaction *t); void _txc_finalize(OpSequencer *osr, TransContext *txc); void _txc_state_proc(TransContext *txc); void _txc_finish_kv(TransContext *txc); void _txc_finish(TransContext *txc); void _osr_reap_done(OpSequencer *osr); void _kv_sync_thread(); void _kv_stop() { { std::lock_guard l(kv_lock); kv_stop = true; kv_cond.notify_all(); } kv_sync_thread.join(); kv_stop = false; } void _do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl); void _do_write_stripe(TransContext *txc, OnodeRef o, uint64_t offset, bufferlist& bl); void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset); int _collection_list( Collection *c, const ghobject_t& start, const ghobject_t& end, int max, vector *ls, ghobject_t *next); public: KStore(CephContext *cct, const string& path); ~KStore() override; string get_type() override { return "kstore"; } bool needs_journal() override { return false; }; bool wants_journal() override { return false; }; bool allows_journal() override { return false; }; static int get_block_device_fsid(const string& path, uuid_d *fsid); bool test_mount_in_use() override; int mount() override; int umount() override; void _sync(); int fsck(bool deep) override; int validate_hobject_key(const hobject_t &obj) const override { return 0; } unsigned get_max_attr_name_length() override { return 256; // arbitrary; there is no real limit internally } int mkfs() override; int mkjournal() override { return 0; } void dump_perf_counters(Formatter *f) override { f->open_object_section("perf_counters"); logger->dump_formatted(f, false); f->close_section(); } int statfs(struct store_statfs_t *buf) override; using ObjectStore::exists; bool exists(const coll_t& cid, const ghobject_t& oid) override; using ObjectStore::stat; int stat( const coll_t& cid, const ghobject_t& oid, struct stat *st, bool allow_eio = false) override; // struct stat? int set_collection_opts( const coll_t& cid, const pool_opts_t& opts) override; using ObjectStore::read; int read( const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl, uint32_t op_flags = 0) override; int _do_read( OnodeRef o, uint64_t offset, size_t len, bufferlist& bl, uint32_t op_flags = 0); using ObjectStore::fiemap; int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl) override; int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, map& destmap) override; using ObjectStore::getattr; int getattr(const coll_t& cid, const ghobject_t& oid, const char *name, bufferptr& value) override; using ObjectStore::getattrs; int getattrs(const coll_t& cid, const ghobject_t& oid, map& aset) override; int list_collections(vector& ls) override; bool collection_exists(const coll_t& c) override; int collection_empty(const coll_t& c, bool *empty) override; int collection_bits(const coll_t& c) override; int collection_list( const coll_t& cid, const ghobject_t& start, const ghobject_t& end, int max, vector *ls, ghobject_t *next) override; int collection_list( CollectionHandle &c, const ghobject_t& start, const ghobject_t& end, int max, vector *ls, ghobject_t *next) override; using ObjectStore::omap_get; int omap_get( const coll_t& cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object containing omap bufferlist *header, ///< [out] omap header map *out /// < [out] Key to value map ) override; using ObjectStore::omap_get_header; /// Get omap header int omap_get_header( const coll_t& cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object containing omap bufferlist *header, ///< [out] omap header bool allow_eio = false ///< [in] don't assert on eio ) override; using ObjectStore::omap_get_keys; /// Get keys defined on oid int omap_get_keys( const coll_t& cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object containing omap set *keys ///< [out] Keys defined on oid ) override; using ObjectStore::omap_get_values; /// Get key values int omap_get_values( const coll_t& cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object containing omap const set &keys, ///< [in] Keys to get map *out ///< [out] Returned keys and values ) override; using ObjectStore::omap_check_keys; /// Filters keys into out which are defined on oid int omap_check_keys( const coll_t& cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object containing omap const set &keys, ///< [in] Keys to check set *out ///< [out] Subset of keys defined on oid ) override; using ObjectStore::get_omap_iterator; ObjectMap::ObjectMapIterator get_omap_iterator( const coll_t& cid, ///< [in] collection const ghobject_t &oid ///< [in] object ) override; void set_fsid(uuid_d u) override { fsid = u; } uuid_d get_fsid() override { return fsid; } uint64_t estimate_objects_overhead(uint64_t num_objects) override { return num_objects * 300; //assuming per-object overhead is 300 bytes } objectstore_perf_stat_t get_cur_stats() override { return objectstore_perf_stat_t(); } const PerfCounters* get_perf_counters() const override { return logger; } int queue_transactions( Sequencer *osr, vector& tls, TrackedOpRef op = TrackedOpRef(), ThreadPool::TPHandle *handle = NULL) override; void compact () override { assert(db); db->compact(); } private: // -------------------------------------------------------- // write ops int _do_transaction(Transaction *t, TransContext *txc, ThreadPool::TPHandle *handle); int _write(TransContext *txc, CollectionRef& c, OnodeRef& o, uint64_t offset, size_t len, bufferlist& bl, uint32_t fadvise_flags); int _do_write(TransContext *txc, OnodeRef o, uint64_t offset, uint64_t length, bufferlist& bl, uint32_t fadvise_flags); int _touch(TransContext *txc, CollectionRef& c, OnodeRef& o); int _zero(TransContext *txc, CollectionRef& c, OnodeRef& o, uint64_t offset, size_t len); int _do_truncate(TransContext *txc, OnodeRef o, uint64_t offset); int _truncate(TransContext *txc, CollectionRef& c, OnodeRef& o, uint64_t offset); int _remove(TransContext *txc, CollectionRef& c, OnodeRef& o); int _do_remove(TransContext *txc, OnodeRef o); int _setattr(TransContext *txc, CollectionRef& c, OnodeRef& o, const string& name, bufferptr& val); int _setattrs(TransContext *txc, CollectionRef& c, OnodeRef& o, const map& aset); int _rmattr(TransContext *txc, CollectionRef& c, OnodeRef& o, const string& name); int _rmattrs(TransContext *txc, CollectionRef& c, OnodeRef& o); void _do_omap_clear(TransContext *txc, uint64_t id); int _omap_clear(TransContext *txc, CollectionRef& c, OnodeRef& o); int _omap_setkeys(TransContext *txc, CollectionRef& c, OnodeRef& o, bufferlist& bl); int _omap_setheader(TransContext *txc, CollectionRef& c, OnodeRef& o, bufferlist& header); int _omap_rmkeys(TransContext *txc, CollectionRef& c, OnodeRef& o, bufferlist& bl); int _omap_rmkey_range(TransContext *txc, CollectionRef& c, OnodeRef& o, const string& first, const string& last); int _setallochint(TransContext *txc, CollectionRef& c, OnodeRef& o, uint64_t expected_object_size, uint64_t expected_write_size, uint32_t flags); int _clone(TransContext *txc, CollectionRef& c, OnodeRef& oldo, OnodeRef& newo); int _clone_range(TransContext *txc, CollectionRef& c, OnodeRef& oldo, OnodeRef& newo, uint64_t srcoff, uint64_t length, uint64_t dstoff); int _rename(TransContext *txc, CollectionRef& c, OnodeRef& oldo, OnodeRef& newo, const ghobject_t& new_oid); int _create_collection(TransContext *txc, coll_t cid, unsigned bits, CollectionRef *c); int _remove_collection(TransContext *txc, coll_t cid, CollectionRef *c); int _split_collection(TransContext *txc, CollectionRef& c, CollectionRef& d, unsigned bits, int rem); }; inline ostream& operator<<(ostream& out, const KStore::OpSequencer& s) { return out << *s.parent; } static inline void intrusive_ptr_add_ref(KStore::Onode *o) { o->get(); } static inline void intrusive_ptr_release(KStore::Onode *o) { o->put(); } static inline void intrusive_ptr_add_ref(KStore::OpSequencer *o) { o->get(); } static inline void intrusive_ptr_release(KStore::OpSequencer *o) { o->put(); } #endif