1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation. See file COPYING.
12  *
13  */
15 #ifndef CEPH_OSD_KSTORE_H
16 #define CEPH_OSD_KSTORE_H
24 #include <condition_variable>
26 #include "include/assert.h"
27 #include "include/unordered_map.h"
28 #include "include/memory.h"
29 #include "common/Finisher.h"
30 #include "common/RWLock.h"
31 #include "common/WorkQueue.h"
32 #include "os/ObjectStore.h"
33 #include "common/perf_counters.h"
35 #include "kv/KeyValueDB.h"
37 #include "kstore_types.h"
39 #include "boost/intrusive/list.hpp"
42 l_kstore_first = 832430,
43 l_kstore_state_prepare_lat,
44 l_kstore_state_kv_queued_lat,
45 l_kstore_state_kv_done_lat,
46 l_kstore_state_finishing_lat,
47 l_kstore_state_done_lat,
51 class KStore : public ObjectStore {
52 // -----------------------------------------------------
58 /// an in-memory object
61 std::atomic_int nref; ///< reference count
64 string key; ///< key under PREFIX_OBJ where we are stored
65 boost::intrusive::list_member_hook<> lru_item;
67 kstore_onode_t onode; ///< metadata stored as value in kv store
71 std::mutex flush_lock; ///< protect flush_txns
72 std::condition_variable flush_cond; ///< wait here for unapplied txns
73 set<TransContext*> flush_txns; ///< committing txns
78 map<uint64_t,bufferlist> pending_stripes; ///< unwritten stripes
80 Onode(CephContext* cct, const ghobject_t& o, const string& k)
103 void clear_pending_stripes() {
104 pending_stripes.clear();
107 typedef boost::intrusive_ptr<Onode> OnodeRef;
109 struct OnodeHashLRU {
111 typedef boost::intrusive::list<
113 boost::intrusive::member_hook<
115 boost::intrusive::list_member_hook<>,
116 &Onode::lru_item> > lru_list_t;
119 ceph::unordered_map<ghobject_t,OnodeRef> onode_map; ///< forward lookups
120 lru_list_t lru; ///< lru
122     OnodeHashLRU(CephContext* cct) : cct(cct) {}  ///< stores the (non-owning) Ceph context pointer; the map and lru start empty
124 void add(const ghobject_t& oid, OnodeRef o);
125 void _touch(OnodeRef o);
126 OnodeRef lookup(const ghobject_t& o);
127 void rename(const ghobject_t& old_oid, const ghobject_t& new_oid);
129 bool get_next(const ghobject_t& after, pair<ghobject_t,OnodeRef> *next);
130 int trim(int max=-1);
133 struct Collection : public CollectionImpl {
136 kstore_cnode_t cnode;
139 // cache onodes on a per-collection basis to avoid lock
141 OnodeHashLRU onode_map;
143 OnodeRef get_onode(const ghobject_t& oid, bool create);
145 const coll_t &get_cid() override {
149 bool contains(const ghobject_t& oid) {
151 return oid.hobj.pool == -1;
153 if (cid.is_pg(&spgid))
155 spgid.pgid.contains(cnode.bits, oid) &&
156 oid.shard_id == spgid.shard;
160 Collection(KStore *ns, coll_t c);
162 typedef boost::intrusive_ptr<Collection> CollectionRef;
164 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
167 KeyValueDB::Iterator it;
170 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
171 int seek_to_first() override;
172 int upper_bound(const string &after) override;
173 int lower_bound(const string &to) override;
174 bool valid() override;
175 int next(bool validate=true) override;
176 string key() override;
177 bufferlist value() override;
178 int status() override {
184 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
186 struct TransContext {
200 const char *get_state_name() {
202 case STATE_PREPARE: return "prepare";
203 case STATE_AIO_WAIT: return "aio_wait";
204 case STATE_IO_DONE: return "io_done";
205 case STATE_KV_QUEUED: return "kv_queued";
206 case STATE_KV_COMMITTING: return "kv_committing";
207 case STATE_KV_DONE: return "kv_done";
208 case STATE_FINISHING: return "finishing";
209 case STATE_DONE: return "done";
214 void log_state_latency(PerfCounters *logger, int state) {
215 utime_t lat, now = ceph_clock_now();
217 logger->tinc(state, lat);
222 boost::intrusive::list_member_hook<> sequencer_item;
226 set<OnodeRef> onodes; ///< these onodes need to be updated/written
227 KeyValueDB::Transaction t; ///< then we will commit this
228 Context *oncommit; ///< signal on commit
229 Context *onreadable; ///< signal on readable
230 Context *onreadable_sync; ///< signal on readable
231 list<Context*> oncommits; ///< more commit completions
232 list<CollectionRef> removed_collections; ///< colls we removed
234 CollectionRef first_collection; ///< first referenced collection
236 explicit TransContext(OpSequencer *o)
237 : state(STATE_PREPARE),
243 onreadable_sync(NULL),
244 start(ceph_clock_now()){
245 //cout << "txc new " << this << std::endl;
248 //cout << "txc del " << this << std::endl;
251 void write_onode(OnodeRef &o) {
256 class OpSequencer : public Sequencer_impl {
259 std::condition_variable qcond;
260 typedef boost::intrusive::list<
262 boost::intrusive::member_hook<
264 boost::intrusive::list_member_hook<>,
265 &TransContext::sequencer_item> > q_list_t;
266 q_list_t q; ///< transactions
270 OpSequencer(CephContext* cct)
271 //set the qlock to PTHREAD_MUTEX_RECURSIVE mode
272 : Sequencer_impl(cct),
275 ~OpSequencer() override {
279 void queue_new(TransContext *txc) {
280 std::lock_guard<std::mutex> l(qlock);
284 void flush() override {
285 std::unique_lock<std::mutex> l(qlock);
290 bool flush_commit(Context *c) override {
291 std::lock_guard<std::mutex> l(qlock);
295 TransContext *txc = &q.back();
296 if (txc->state >= TransContext::STATE_KV_DONE) {
299 assert(txc->state < TransContext::STATE_KV_DONE);
300 txc->oncommits.push_back(c);
305 struct KVSyncThread : public Thread {
307     explicit KVSyncThread(KStore *s) : store(s) {}  ///< non-owning back-pointer; entry() runs store->_kv_sync_thread()
308 void *entry() override {
309 store->_kv_sync_thread();
314 // --------------------------------------------------------
319 int path_fd; ///< open handle to $path
320 int fsid_fd; ///< open handle (locked) to $path/fsid
323 RWLock coll_lock; ///< rwlock to protect coll_map
324 ceph::unordered_map<coll_t, CollectionRef> coll_map;
330 Throttle throttle_ops, throttle_bytes; ///< submit to commit
334 KVSyncThread kv_sync_thread;
336 std::condition_variable kv_cond, kv_sync_cond;
338 deque<TransContext*> kv_queue, kv_committing;
341 PerfCounters *logger;
342 std::mutex reap_lock;
343 list<CollectionRef> removed_collections;
346 // --------------------------------------------------------
350 void _shutdown_logger();
354 int _open_fsid(bool create);
356 int _read_fsid(uuid_d *f);
359 int _open_db(bool create);
361 int _open_collections(int *errors=0);
362 void _close_collections();
364 int _open_super_meta();
366 CollectionRef _get_collection(coll_t cid);
367 void _queue_reap_collection(CollectionRef& c);
368 void _reap_collections();
370 void _assign_nid(TransContext *txc, OnodeRef o);
372 void _dump_onode(OnodeRef o);
374 TransContext *_txc_create(OpSequencer *osr);
375 void _txc_release(TransContext *txc, uint64_t offset, uint64_t length);
376 void _txc_add_transaction(TransContext *txc, Transaction *t);
377 void _txc_finalize(OpSequencer *osr, TransContext *txc);
378 void _txc_state_proc(TransContext *txc);
379 void _txc_finish_kv(TransContext *txc);
380 void _txc_finish(TransContext *txc);
382 void _osr_reap_done(OpSequencer *osr);
384 void _kv_sync_thread();
387 std::lock_guard<std::mutex> l(kv_lock);
389 kv_cond.notify_all();
391 kv_sync_thread.join();
395 void _do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl);
396 void _do_write_stripe(TransContext *txc, OnodeRef o,
397 uint64_t offset, bufferlist& bl);
398 void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset);
400 int _collection_list(
401 Collection *c, const ghobject_t& start, const ghobject_t& end,
402 int max, vector<ghobject_t> *ls, ghobject_t *next);
405 KStore(CephContext *cct, const string& path);
408 string get_type() override {
412   bool needs_journal() override { return false; };  ///< KStore never requires a separate journal device
413   bool wants_journal() override { return false; };  ///< nor does it benefit from one if present
414   bool allows_journal() override { return false; };  ///< journals are not supported at all by this backend
416 static int get_block_device_fsid(const string& path, uuid_d *fsid);
418 bool test_mount_in_use() override;
420 int mount() override;
421 int umount() override;
424 int fsck(bool deep) override;
427 int validate_hobject_key(const hobject_t &obj) const override {
430 unsigned get_max_attr_name_length() override {
431 return 256; // arbitrary; there is no real limit internally
435 int mkjournal() override {
438 void dump_perf_counters(Formatter *f) override {
439 f->open_object_section("perf_counters");
440 logger->dump_formatted(f, false);
444 int statfs(struct store_statfs_t *buf) override;
446 using ObjectStore::exists;
447 bool exists(const coll_t& cid, const ghobject_t& oid) override;
448 using ObjectStore::stat;
451 const ghobject_t& oid,
453 bool allow_eio = false) override; // struct stat?
454 int set_collection_opts(
456 const pool_opts_t& opts) override;
457 using ObjectStore::read;
460 const ghobject_t& oid,
464 uint32_t op_flags = 0) override;
470 uint32_t op_flags = 0);
472 using ObjectStore::fiemap;
473 int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl) override;
474 int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
475 using ObjectStore::getattr;
476 int getattr(const coll_t& cid, const ghobject_t& oid, const char *name, bufferptr& value) override;
477 using ObjectStore::getattrs;
478 int getattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset) override;
480 int list_collections(vector<coll_t>& ls) override;
481 bool collection_exists(const coll_t& c) override;
482 int collection_empty(const coll_t& c, bool *empty) override;
483 int collection_bits(const coll_t& c) override;
485 const coll_t& cid, const ghobject_t& start, const ghobject_t& end,
487 vector<ghobject_t> *ls, ghobject_t *next) override;
489 CollectionHandle &c, const ghobject_t& start, const ghobject_t& end,
491 vector<ghobject_t> *ls, ghobject_t *next) override;
493 using ObjectStore::omap_get;
495 const coll_t& cid, ///< [in] Collection containing oid
496 const ghobject_t &oid, ///< [in] Object containing omap
497 bufferlist *header, ///< [out] omap header
498 map<string, bufferlist> *out /// < [out] Key to value map
501 using ObjectStore::omap_get_header;
504 const coll_t& cid, ///< [in] Collection containing oid
505 const ghobject_t &oid, ///< [in] Object containing omap
506 bufferlist *header, ///< [out] omap header
507 bool allow_eio = false ///< [in] don't assert on eio
510 using ObjectStore::omap_get_keys;
511 /// Get keys defined on oid
513 const coll_t& cid, ///< [in] Collection containing oid
514 const ghobject_t &oid, ///< [in] Object containing omap
515 set<string> *keys ///< [out] Keys defined on oid
518 using ObjectStore::omap_get_values;
521 const coll_t& cid, ///< [in] Collection containing oid
522 const ghobject_t &oid, ///< [in] Object containing omap
523 const set<string> &keys, ///< [in] Keys to get
524 map<string, bufferlist> *out ///< [out] Returned keys and values
527 using ObjectStore::omap_check_keys;
528 /// Filters keys into out which are defined on oid
530 const coll_t& cid, ///< [in] Collection containing oid
531 const ghobject_t &oid, ///< [in] Object containing omap
532 const set<string> &keys, ///< [in] Keys to check
533 set<string> *out ///< [out] Subset of keys defined on oid
536 using ObjectStore::get_omap_iterator;
537 ObjectMap::ObjectMapIterator get_omap_iterator(
538 const coll_t& cid, ///< [in] collection
539 const ghobject_t &oid ///< [in] object
542 void set_fsid(uuid_d u) override {
545 uuid_d get_fsid() override {
549 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
550 return num_objects * 300; //assuming per-object overhead is 300 bytes
553 objectstore_perf_stat_t get_cur_stats() override {
554 return objectstore_perf_stat_t();
556 const PerfCounters* get_perf_counters() const override {
561 int queue_transactions(
563 vector<Transaction>& tls,
564 TrackedOpRef op = TrackedOpRef(),
565 ThreadPool::TPHandle *handle = NULL) override;
567 void compact () override {
573 // --------------------------------------------------------
576 int _do_transaction(Transaction *t,
578 ThreadPool::TPHandle *handle);
580 int _write(TransContext *txc,
583 uint64_t offset, size_t len,
585 uint32_t fadvise_flags);
586 int _do_write(TransContext *txc,
588 uint64_t offset, uint64_t length,
590 uint32_t fadvise_flags);
591 int _touch(TransContext *txc,
594 int _zero(TransContext *txc,
597 uint64_t offset, size_t len);
598 int _do_truncate(TransContext *txc,
601 int _truncate(TransContext *txc,
605 int _remove(TransContext *txc,
608 int _do_remove(TransContext *txc,
610 int _setattr(TransContext *txc,
615 int _setattrs(TransContext *txc,
618 const map<string,bufferptr>& aset);
619 int _rmattr(TransContext *txc,
623 int _rmattrs(TransContext *txc,
626 void _do_omap_clear(TransContext *txc, uint64_t id);
627 int _omap_clear(TransContext *txc,
630 int _omap_setkeys(TransContext *txc,
634 int _omap_setheader(TransContext *txc,
638 int _omap_rmkeys(TransContext *txc,
642 int _omap_rmkey_range(TransContext *txc,
645 const string& first, const string& last);
646 int _setallochint(TransContext *txc,
649 uint64_t expected_object_size,
650 uint64_t expected_write_size,
652 int _clone(TransContext *txc,
656 int _clone_range(TransContext *txc,
660 uint64_t srcoff, uint64_t length, uint64_t dstoff);
661 int _rename(TransContext *txc,
665 const ghobject_t& new_oid);
666 int _create_collection(TransContext *txc, coll_t cid, unsigned bits,
668 int _remove_collection(TransContext *txc, coll_t cid, CollectionRef *c);
669 int _split_collection(TransContext *txc,
672 unsigned bits, int rem);
676 inline ostream& operator<<(ostream& out, const KStore::OpSequencer& s) {
677 return out << *s.parent;
680 static inline void intrusive_ptr_add_ref(KStore::Onode *o) {
683 static inline void intrusive_ptr_release(KStore::Onode *o) {
687 static inline void intrusive_ptr_add_ref(KStore::OpSequencer *o) {
690 static inline void intrusive_ptr_release(KStore::OpSequencer *o) {