Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / kstore / KStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef CEPH_OSD_KSTORE_H
16 #define CEPH_OSD_KSTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include "include/assert.h"
27 #include "include/unordered_map.h"
28 #include "include/memory.h"
29 #include "common/Finisher.h"
30 #include "common/RWLock.h"
31 #include "common/WorkQueue.h"
32 #include "os/ObjectStore.h"
33 #include "common/perf_counters.h"
34 #include "os/fs/FS.h"
35 #include "kv/KeyValueDB.h"
36
37 #include "kstore_types.h"
38
39 #include "boost/intrusive/list.hpp"
40
41 enum {  
42   l_kstore_first = 832430,
43   l_kstore_state_prepare_lat,
44   l_kstore_state_kv_queued_lat,
45   l_kstore_state_kv_done_lat,
46   l_kstore_state_finishing_lat,
47   l_kstore_state_done_lat,
48   l_kstore_last
49 };
50
51 class KStore : public ObjectStore {
52   // -----------------------------------------------------
53   // types
54 public:
55
56   class TransContext;
57
58   /// an in-memory object
59   struct Onode {
60     CephContext* cct;
61     std::atomic_int nref;  ///< reference count
62
63     ghobject_t oid;
64     string key;     ///< key under PREFIX_OBJ where we are stored
65     boost::intrusive::list_member_hook<> lru_item;
66
67     kstore_onode_t onode;  ///< metadata stored as value in kv store
68     bool dirty;     // ???
69     bool exists;
70
71     std::mutex flush_lock;  ///< protect flush_txns
72     std::condition_variable flush_cond;   ///< wait here for unapplied txns
73     set<TransContext*> flush_txns;   ///< committing txns
74
75     uint64_t tail_offset;
76     bufferlist tail_bl;
77
78     map<uint64_t,bufferlist> pending_stripes;  ///< unwritten stripes
79
80     Onode(CephContext* cct, const ghobject_t& o, const string& k)
81       : cct(cct),
82         nref(0),
83         oid(o),
84         key(k),
85         dirty(false),
86         exists(false),
87         tail_offset(0) {
88     }
89
90     void flush();
91     void get() {
92       ++nref;
93     }
94     void put() {
95       if (--nref == 0)
96         delete this;
97     }
98
99     void clear_tail() {
100       tail_offset = 0;
101       tail_bl.clear();
102     }
103     void clear_pending_stripes() {
104       pending_stripes.clear();
105     }
106   };
107   typedef boost::intrusive_ptr<Onode> OnodeRef;
108
109   struct OnodeHashLRU {
110     CephContext* cct;
111     typedef boost::intrusive::list<
112       Onode,
113       boost::intrusive::member_hook<
114         Onode,
115         boost::intrusive::list_member_hook<>,
116         &Onode::lru_item> > lru_list_t;
117
118     std::mutex lock;
119     ceph::unordered_map<ghobject_t,OnodeRef> onode_map;  ///< forward lookups
120     lru_list_t lru;                                      ///< lru
121
122     OnodeHashLRU(CephContext* cct) : cct(cct) {}
123
124     void add(const ghobject_t& oid, OnodeRef o);
125     void _touch(OnodeRef o);
126     OnodeRef lookup(const ghobject_t& o);
127     void rename(const ghobject_t& old_oid, const ghobject_t& new_oid);
128     void clear();
129     bool get_next(const ghobject_t& after, pair<ghobject_t,OnodeRef> *next);
130     int trim(int max=-1);
131   };
132
133   struct Collection : public CollectionImpl {
134     KStore *store;
135     coll_t cid;
136     kstore_cnode_t cnode;
137     RWLock lock;
138
139     // cache onodes on a per-collection basis to avoid lock
140     // contention.
141     OnodeHashLRU onode_map;
142
143     OnodeRef get_onode(const ghobject_t& oid, bool create);
144
145     const coll_t &get_cid() override {
146       return cid;
147     }
148
149     bool contains(const ghobject_t& oid) {
150       if (cid.is_meta())
151         return oid.hobj.pool == -1;
152       spg_t spgid;
153       if (cid.is_pg(&spgid))
154         return
155           spgid.pgid.contains(cnode.bits, oid) &&
156           oid.shard_id == spgid.shard;
157       return false;
158     }
159
160     Collection(KStore *ns, coll_t c);
161   };
162   typedef boost::intrusive_ptr<Collection> CollectionRef;
163
164   class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
165     CollectionRef c;
166     OnodeRef o;
167     KeyValueDB::Iterator it;
168     string head, tail;
169   public:
170     OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
171     int seek_to_first() override;
172     int upper_bound(const string &after) override;
173     int lower_bound(const string &to) override;
174     bool valid() override;
175     int next(bool validate=true) override;
176     string key() override;
177     bufferlist value() override;
178     int status() override {
179       return 0;
180     }
181   };
182
183   class OpSequencer;
184   typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
185
186   struct TransContext {
187     typedef enum {
188       STATE_PREPARE,
189       STATE_AIO_WAIT,
190       STATE_IO_DONE,
191       STATE_KV_QUEUED,
192       STATE_KV_COMMITTING,
193       STATE_KV_DONE,
194       STATE_FINISHING,
195       STATE_DONE,
196     } state_t;
197
198     state_t state;
199
200     const char *get_state_name() {
201       switch (state) {
202       case STATE_PREPARE: return "prepare";
203       case STATE_AIO_WAIT: return "aio_wait";
204       case STATE_IO_DONE: return "io_done";
205       case STATE_KV_QUEUED: return "kv_queued";
206       case STATE_KV_COMMITTING: return "kv_committing";
207       case STATE_KV_DONE: return "kv_done";
208       case STATE_FINISHING: return "finishing";
209       case STATE_DONE: return "done";
210       }
211       return "???";
212     }
213
214     void log_state_latency(PerfCounters *logger, int state) {
215         utime_t lat, now = ceph_clock_now();
216         lat = now - start;
217         logger->tinc(state, lat);
218         start = now;
219     }
220
221     OpSequencerRef osr;
222     boost::intrusive::list_member_hook<> sequencer_item;
223
224     uint64_t ops, bytes;
225
226     set<OnodeRef> onodes;     ///< these onodes need to be updated/written
227     KeyValueDB::Transaction t; ///< then we will commit this
228     Context *oncommit;         ///< signal on commit
229     Context *onreadable;         ///< signal on readable
230     Context *onreadable_sync;         ///< signal on readable
231     list<Context*> oncommits;  ///< more commit completions
232     list<CollectionRef> removed_collections; ///< colls we removed
233
234     CollectionRef first_collection;  ///< first referenced collection
235     utime_t start;
236     explicit TransContext(OpSequencer *o)
237       : state(STATE_PREPARE),
238         osr(o),
239         ops(0),
240         bytes(0),
241         oncommit(NULL),
242         onreadable(NULL),
243         onreadable_sync(NULL),
244         start(ceph_clock_now()){
245       //cout << "txc new " << this << std::endl;
246     }
247     ~TransContext() {
248       //cout << "txc del " << this << std::endl;
249     }
250
251     void write_onode(OnodeRef &o) {
252       onodes.insert(o);
253     }
254   };
255
256   class OpSequencer : public Sequencer_impl {
257   public:
258     std::mutex qlock;
259     std::condition_variable qcond;
260     typedef boost::intrusive::list<
261       TransContext,
262       boost::intrusive::member_hook<
263         TransContext,
264         boost::intrusive::list_member_hook<>,
265         &TransContext::sequencer_item> > q_list_t;
266     q_list_t q;  ///< transactions
267
268     Sequencer *parent;
269
270     OpSequencer(CephContext* cct)
271         //set the qlock to PTHREAD_MUTEX_RECURSIVE mode
272       : Sequencer_impl(cct),
273         parent(NULL) {
274     }
275     ~OpSequencer() override {
276       assert(q.empty());
277     }
278
279     void queue_new(TransContext *txc) {
280       std::lock_guard<std::mutex> l(qlock);
281       q.push_back(*txc);
282     }
283
284     void flush() override {
285       std::unique_lock<std::mutex> l(qlock);
286       while (!q.empty())
287         qcond.wait(l);
288     }
289
290     bool flush_commit(Context *c) override {
291       std::lock_guard<std::mutex> l(qlock);
292       if (q.empty()) {
293         return true;
294       }
295       TransContext *txc = &q.back();
296       if (txc->state >= TransContext::STATE_KV_DONE) {
297         return true;
298       }
299       assert(txc->state < TransContext::STATE_KV_DONE);
300       txc->oncommits.push_back(c);
301       return false;
302     }
303   };
304
305   struct KVSyncThread : public Thread {
306     KStore *store;
307     explicit KVSyncThread(KStore *s) : store(s) {}
308     void *entry() override {
309       store->_kv_sync_thread();
310       return NULL;
311     }
312   };
313
314   // --------------------------------------------------------
315   // members
316 private:
317   KeyValueDB *db;
318   uuid_d fsid;
319   int path_fd;  ///< open handle to $path
320   int fsid_fd;  ///< open handle (locked) to $path/fsid
321   bool mounted;
322
323   RWLock coll_lock;    ///< rwlock to protect coll_map
324   ceph::unordered_map<coll_t, CollectionRef> coll_map;
325
326   std::mutex nid_lock;
327   uint64_t nid_last;
328   uint64_t nid_max;
329
330   Throttle throttle_ops, throttle_bytes;          ///< submit to commit
331
332   Finisher finisher;
333
334   KVSyncThread kv_sync_thread;
335   std::mutex kv_lock;
336   std::condition_variable kv_cond, kv_sync_cond;
337   bool kv_stop;
338   deque<TransContext*> kv_queue, kv_committing;
339
340   //Logger *logger;
341   PerfCounters *logger;
342   std::mutex reap_lock;
343   list<CollectionRef> removed_collections;
344
345
346   // --------------------------------------------------------
347   // private methods
348
349   void _init_logger();
350   void _shutdown_logger();
351
352   int _open_path();
353   void _close_path();
354   int _open_fsid(bool create);
355   int _lock_fsid();
356   int _read_fsid(uuid_d *f);
357   int _write_fsid();
358   void _close_fsid();
359   int _open_db(bool create);
360   void _close_db();
361   int _open_collections(int *errors=0);
362   void _close_collections();
363
364   int _open_super_meta();
365
366   CollectionRef _get_collection(coll_t cid);
367   void _queue_reap_collection(CollectionRef& c);
368   void _reap_collections();
369
370   void _assign_nid(TransContext *txc, OnodeRef o);
371
372   void _dump_onode(OnodeRef o);
373
374   TransContext *_txc_create(OpSequencer *osr);
375   void _txc_release(TransContext *txc, uint64_t offset, uint64_t length);
376   void _txc_add_transaction(TransContext *txc, Transaction *t);
377   void _txc_finalize(OpSequencer *osr, TransContext *txc);
378   void _txc_state_proc(TransContext *txc);
379   void _txc_finish_kv(TransContext *txc);
380   void _txc_finish(TransContext *txc);
381
382   void _osr_reap_done(OpSequencer *osr);
383
384   void _kv_sync_thread();
385   void _kv_stop() {
386     {
387       std::lock_guard<std::mutex> l(kv_lock);
388       kv_stop = true;
389       kv_cond.notify_all();
390     }
391     kv_sync_thread.join();
392     kv_stop = false;
393   }
394
395   void _do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl);
396   void _do_write_stripe(TransContext *txc, OnodeRef o,
397                         uint64_t offset, bufferlist& bl);
398   void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset);
399
400   int _collection_list(
401     Collection *c, const ghobject_t& start, const ghobject_t& end,
402     int max, vector<ghobject_t> *ls, ghobject_t *next);
403
404 public:
405   KStore(CephContext *cct, const string& path);
406   ~KStore() override;
407
408   string get_type() override {
409     return "kstore";
410   }
411
412   bool needs_journal() override { return false; };
413   bool wants_journal() override { return false; };
414   bool allows_journal() override { return false; };
415
416   static int get_block_device_fsid(const string& path, uuid_d *fsid);
417
418   bool test_mount_in_use() override;
419
420   int mount() override;
421   int umount() override;
422   void _sync();
423
424   int fsck(bool deep) override;
425
426
427   int validate_hobject_key(const hobject_t &obj) const override {
428     return 0;
429   }
430   unsigned get_max_attr_name_length() override {
431     return 256;  // arbitrary; there is no real limit internally
432   }
433
434   int mkfs() override;
435   int mkjournal() override {
436     return 0;
437   }
438   void dump_perf_counters(Formatter *f) override {
439     f->open_object_section("perf_counters");
440     logger->dump_formatted(f, false);
441     f->close_section();
442   }
443
444   int statfs(struct store_statfs_t *buf) override;
445
446   using ObjectStore::exists;
447   bool exists(const coll_t& cid, const ghobject_t& oid) override;
448   using ObjectStore::stat;
449   int stat(
450     const coll_t& cid,
451     const ghobject_t& oid,
452     struct stat *st,
453     bool allow_eio = false) override; // struct stat?
454   int set_collection_opts(
455     const coll_t& cid,
456     const pool_opts_t& opts) override;
457   using ObjectStore::read;
458   int read(
459     const coll_t& cid,
460     const ghobject_t& oid,
461     uint64_t offset,
462     size_t len,
463     bufferlist& bl,
464     uint32_t op_flags = 0) override;
465   int _do_read(
466     OnodeRef o,
467     uint64_t offset,
468     size_t len,
469     bufferlist& bl,
470     uint32_t op_flags = 0);
471
472   using ObjectStore::fiemap;
473   int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl) override;
474   int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
475   using ObjectStore::getattr;
476   int getattr(const coll_t& cid, const ghobject_t& oid, const char *name, bufferptr& value) override;
477   using ObjectStore::getattrs;
478   int getattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset) override;
479
480   int list_collections(vector<coll_t>& ls) override;
481   bool collection_exists(const coll_t& c) override;
482   int collection_empty(const coll_t& c, bool *empty) override;
483   int collection_bits(const coll_t& c) override;
484   int collection_list(
485     const coll_t& cid, const ghobject_t& start, const ghobject_t& end,
486     int max,
487     vector<ghobject_t> *ls, ghobject_t *next) override;
488   int collection_list(
489     CollectionHandle &c, const ghobject_t& start, const ghobject_t& end,
490     int max,
491     vector<ghobject_t> *ls, ghobject_t *next) override;
492
493   using ObjectStore::omap_get;
494   int omap_get(
495     const coll_t& cid,                ///< [in] Collection containing oid
496     const ghobject_t &oid,   ///< [in] Object containing omap
497     bufferlist *header,      ///< [out] omap header
498     map<string, bufferlist> *out /// < [out] Key to value map
499     ) override;
500
501   using ObjectStore::omap_get_header;
502   /// Get omap header
503   int omap_get_header(
504     const coll_t& cid,                ///< [in] Collection containing oid
505     const ghobject_t &oid,   ///< [in] Object containing omap
506     bufferlist *header,      ///< [out] omap header
507     bool allow_eio = false ///< [in] don't assert on eio
508     ) override;
509
510   using ObjectStore::omap_get_keys;
511   /// Get keys defined on oid
512   int omap_get_keys(
513     const coll_t& cid,              ///< [in] Collection containing oid
514     const ghobject_t &oid, ///< [in] Object containing omap
515     set<string> *keys      ///< [out] Keys defined on oid
516     ) override;
517
518   using ObjectStore::omap_get_values;
519   /// Get key values
520   int omap_get_values(
521     const coll_t& cid,                    ///< [in] Collection containing oid
522     const ghobject_t &oid,       ///< [in] Object containing omap
523     const set<string> &keys,     ///< [in] Keys to get
524     map<string, bufferlist> *out ///< [out] Returned keys and values
525     ) override;
526
527   using ObjectStore::omap_check_keys;
528   /// Filters keys into out which are defined on oid
529   int omap_check_keys(
530     const coll_t& cid,                ///< [in] Collection containing oid
531     const ghobject_t &oid,   ///< [in] Object containing omap
532     const set<string> &keys, ///< [in] Keys to check
533     set<string> *out         ///< [out] Subset of keys defined on oid
534     ) override;
535
536   using ObjectStore::get_omap_iterator;
537   ObjectMap::ObjectMapIterator get_omap_iterator(
538     const coll_t& cid,              ///< [in] collection
539     const ghobject_t &oid  ///< [in] object
540     ) override;
541
542   void set_fsid(uuid_d u) override {
543     fsid = u;
544   }
545   uuid_d get_fsid() override {
546     return fsid;
547   }
548
549   uint64_t estimate_objects_overhead(uint64_t num_objects) override {
550     return num_objects * 300; //assuming per-object overhead is 300 bytes
551   }
552
553   objectstore_perf_stat_t get_cur_stats() override {
554     return objectstore_perf_stat_t();
555   }
556   const PerfCounters* get_perf_counters() const override {
557     return logger;
558   }
559
560
561   int queue_transactions(
562     Sequencer *osr,
563     vector<Transaction>& tls,
564     TrackedOpRef op = TrackedOpRef(),
565     ThreadPool::TPHandle *handle = NULL) override;
566
567   void compact () override {
568     assert(db);
569     db->compact();
570   }
571   
572 private:
573   // --------------------------------------------------------
574   // write ops
575
576   int _do_transaction(Transaction *t,
577                       TransContext *txc,
578                       ThreadPool::TPHandle *handle);
579
580   int _write(TransContext *txc,
581              CollectionRef& c,
582              OnodeRef& o,
583              uint64_t offset, size_t len,
584              bufferlist& bl,
585              uint32_t fadvise_flags);
586   int _do_write(TransContext *txc,
587                 OnodeRef o,
588                 uint64_t offset, uint64_t length,
589                 bufferlist& bl,
590                 uint32_t fadvise_flags);
591   int _touch(TransContext *txc,
592              CollectionRef& c,
593              OnodeRef& o);
594   int _zero(TransContext *txc,
595             CollectionRef& c,
596             OnodeRef& o,
597             uint64_t offset, size_t len);
598   int _do_truncate(TransContext *txc,
599                    OnodeRef o,
600                    uint64_t offset);
601   int _truncate(TransContext *txc,
602                 CollectionRef& c,
603                 OnodeRef& o,
604                 uint64_t offset);
605   int _remove(TransContext *txc,
606               CollectionRef& c,
607               OnodeRef& o);
608   int _do_remove(TransContext *txc,
609                  OnodeRef o);
610   int _setattr(TransContext *txc,
611                CollectionRef& c,
612                OnodeRef& o,
613                const string& name,
614                bufferptr& val);
615   int _setattrs(TransContext *txc,
616                 CollectionRef& c,
617                 OnodeRef& o,
618                 const map<string,bufferptr>& aset);
619   int _rmattr(TransContext *txc,
620               CollectionRef& c,
621               OnodeRef& o,
622               const string& name);
623   int _rmattrs(TransContext *txc,
624                CollectionRef& c,
625                OnodeRef& o);
626   void _do_omap_clear(TransContext *txc, uint64_t id);
627   int _omap_clear(TransContext *txc,
628                   CollectionRef& c,
629                   OnodeRef& o);
630   int _omap_setkeys(TransContext *txc,
631                     CollectionRef& c,
632                     OnodeRef& o,
633                     bufferlist& bl);
634   int _omap_setheader(TransContext *txc,
635                       CollectionRef& c,
636                       OnodeRef& o,
637                       bufferlist& header);
638   int _omap_rmkeys(TransContext *txc,
639                    CollectionRef& c,
640                    OnodeRef& o,
641                    bufferlist& bl);
642   int _omap_rmkey_range(TransContext *txc,
643                         CollectionRef& c,
644                         OnodeRef& o,
645                         const string& first, const string& last);
646   int _setallochint(TransContext *txc,
647                     CollectionRef& c,
648                     OnodeRef& o,
649                     uint64_t expected_object_size,
650                     uint64_t expected_write_size,
651                     uint32_t flags);
652   int _clone(TransContext *txc,
653              CollectionRef& c,
654              OnodeRef& oldo,
655              OnodeRef& newo);
656   int _clone_range(TransContext *txc,
657                    CollectionRef& c,
658                    OnodeRef& oldo,
659                    OnodeRef& newo,
660                    uint64_t srcoff, uint64_t length, uint64_t dstoff);
661   int _rename(TransContext *txc,
662               CollectionRef& c,
663               OnodeRef& oldo,
664               OnodeRef& newo,
665               const ghobject_t& new_oid);
666   int _create_collection(TransContext *txc, coll_t cid, unsigned bits,
667                          CollectionRef *c);
668   int _remove_collection(TransContext *txc, coll_t cid, CollectionRef *c);
669   int _split_collection(TransContext *txc,
670                         CollectionRef& c,
671                         CollectionRef& d,
672                         unsigned bits, int rem);
673
674 };
675
676 inline ostream& operator<<(ostream& out, const KStore::OpSequencer& s) {
677   return out << *s.parent;
678 }
679
680 static inline void intrusive_ptr_add_ref(KStore::Onode *o) {
681   o->get();
682 }
683 static inline void intrusive_ptr_release(KStore::Onode *o) {
684   o->put();
685 }
686
687 static inline void intrusive_ptr_add_ref(KStore::OpSequencer *o) {
688   o->get();
689 }
690 static inline void intrusive_ptr_release(KStore::OpSequencer *o) {
691   o->put();
692 }
693
694 #endif