Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / bluestore / BlueStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include <boost/intrusive/list.hpp>
27 #include <boost/intrusive/unordered_set.hpp>
28 #include <boost/intrusive/set.hpp>
29 #include <boost/functional/hash.hpp>
30 #include <boost/dynamic_bitset.hpp>
31
32 #include "include/assert.h"
33 #include "include/unordered_map.h"
34 #include "include/memory.h"
35 #include "include/mempool.h"
36 #include "common/Finisher.h"
37 #include "common/perf_counters.h"
38 #include "compressor/Compressor.h"
39 #include "os/ObjectStore.h"
40
41 #include "bluestore_types.h"
42 #include "BlockDevice.h"
43 #include "common/EventTrace.h"
44
45 class Allocator;
46 class FreelistManager;
47 class BlueFS;
48
49 //#define DEBUG_CACHE
50 //#define DEBUG_DEFERRED
51
52
53
54 // constants for Buffer::optimize()
55 #define MAX_BUFFER_SLOP_RATIO_DEN  8  // so actually 1/N
56
57
58 enum {
59   l_bluestore_first = 732430,
60   l_bluestore_kv_flush_lat,
61   l_bluestore_kv_commit_lat,
62   l_bluestore_kv_lat,
63   l_bluestore_state_prepare_lat,
64   l_bluestore_state_aio_wait_lat,
65   l_bluestore_state_io_done_lat,
66   l_bluestore_state_kv_queued_lat,
67   l_bluestore_state_kv_committing_lat,
68   l_bluestore_state_kv_done_lat,
69   l_bluestore_state_deferred_queued_lat,
70   l_bluestore_state_deferred_aio_wait_lat,
71   l_bluestore_state_deferred_cleanup_lat,
72   l_bluestore_state_finishing_lat,
73   l_bluestore_state_done_lat,
74   l_bluestore_throttle_lat,
75   l_bluestore_submit_lat,
76   l_bluestore_commit_lat,
77   l_bluestore_read_lat,
78   l_bluestore_read_onode_meta_lat,
79   l_bluestore_read_wait_aio_lat,
80   l_bluestore_compress_lat,
81   l_bluestore_decompress_lat,
82   l_bluestore_csum_lat,
83   l_bluestore_compress_success_count,
84   l_bluestore_compress_rejected_count,
85   l_bluestore_write_pad_bytes,
86   l_bluestore_deferred_write_ops,
87   l_bluestore_deferred_write_bytes,
88   l_bluestore_write_penalty_read_ops,
89   l_bluestore_allocated,
90   l_bluestore_stored,
91   l_bluestore_compressed,
92   l_bluestore_compressed_allocated,
93   l_bluestore_compressed_original,
94   l_bluestore_onodes,
95   l_bluestore_onode_hits,
96   l_bluestore_onode_misses,
97   l_bluestore_onode_shard_hits,
98   l_bluestore_onode_shard_misses,
99   l_bluestore_extents,
100   l_bluestore_blobs,
101   l_bluestore_buffers,
102   l_bluestore_buffer_bytes,
103   l_bluestore_buffer_hit_bytes,
104   l_bluestore_buffer_miss_bytes,
105   l_bluestore_write_big,
106   l_bluestore_write_big_bytes,
107   l_bluestore_write_big_blobs,
108   l_bluestore_write_small,
109   l_bluestore_write_small_bytes,
110   l_bluestore_write_small_unused,
111   l_bluestore_write_small_deferred,
112   l_bluestore_write_small_pre_read,
113   l_bluestore_write_small_new,
114   l_bluestore_txc,
115   l_bluestore_onode_reshard,
116   l_bluestore_blob_split,
117   l_bluestore_extent_compress,
118   l_bluestore_gc_merged,
119   l_bluestore_last
120 };
121
122 class BlueStore : public ObjectStore,
123                   public md_config_obs_t {
124   // -----------------------------------------------------
125   // types
126 public:
127   // config observer
128   const char** get_tracked_conf_keys() const override;
129   void handle_conf_change(const struct md_config_t *conf,
130                                   const std::set<std::string> &changed) override;
131
132   void _set_csum();
133   void _set_compression();
134   void _set_throttle_params();
135   int _set_cache_sizes();
136
137   class TransContext;
138
139   typedef map<uint64_t, bufferlist> ready_regions_t;
140
141   struct BufferSpace;
142   struct Collection;
143   typedef boost::intrusive_ptr<Collection> CollectionRef;
144
145   struct AioContext {
146     virtual void aio_finish(BlueStore *store) = 0;
147     virtual ~AioContext() {}
148   };
149
150   /// cached buffer
151   struct Buffer {
152     MEMPOOL_CLASS_HELPERS();
153
154     enum {
155       STATE_EMPTY,     ///< empty buffer -- used for cache history
156       STATE_CLEAN,     ///< clean data that is up to date
157       STATE_WRITING,   ///< data that is being written (io not yet complete)
158     };
159     static const char *get_state_name(int s) {
160       switch (s) {
161       case STATE_EMPTY: return "empty";
162       case STATE_CLEAN: return "clean";
163       case STATE_WRITING: return "writing";
164       default: return "???";
165       }
166     }
167     enum {
168       FLAG_NOCACHE = 1,  ///< trim when done WRITING (do not become CLEAN)
169       // NOTE: fix operator<< when you define a second flag
170     };
171     static const char *get_flag_name(int s) {
172       switch (s) {
173       case FLAG_NOCACHE: return "nocache";
174       default: return "???";
175       }
176     }
177
178     BufferSpace *space;
179     uint16_t state;             ///< STATE_*
180     uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
181     uint32_t flags;             ///< FLAG_*
182     uint64_t seq;
183     uint32_t offset, length;
184     bufferlist data;
185
186     boost::intrusive::list_member_hook<> lru_item;
187     boost::intrusive::list_member_hook<> state_item;
188
189     Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
190            unsigned f = 0)
191       : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
192     Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
193            unsigned f = 0)
194       : space(space), state(s), flags(f), seq(q), offset(o),
195         length(b.length()), data(b) {}
196
197     bool is_empty() const {
198       return state == STATE_EMPTY;
199     }
200     bool is_clean() const {
201       return state == STATE_CLEAN;
202     }
203     bool is_writing() const {
204       return state == STATE_WRITING;
205     }
206
207     uint32_t end() const {
208       return offset + length;
209     }
210
211     void truncate(uint32_t newlen) {
212       assert(newlen < length);
213       if (data.length()) {
214         bufferlist t;
215         t.substr_of(data, 0, newlen);
216         data.claim(t);
217       }
218       length = newlen;
219     }
220     void maybe_rebuild() {
221       if (data.length() &&
222           (data.get_num_buffers() > 1 ||
223            data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
224         data.rebuild();
225       }
226     }
227
228     void dump(Formatter *f) const {
229       f->dump_string("state", get_state_name(state));
230       f->dump_unsigned("seq", seq);
231       f->dump_unsigned("offset", offset);
232       f->dump_unsigned("length", length);
233       f->dump_unsigned("data_length", data.length());
234     }
235   };
236
237   struct Cache;
238
239   /// map logical extent range (object) onto buffers
240   struct BufferSpace {
241     typedef boost::intrusive::list<
242       Buffer,
243       boost::intrusive::member_hook<
244         Buffer,
245         boost::intrusive::list_member_hook<>,
246         &Buffer::state_item> > state_list_t;
247
248     mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
249       buffer_map;
250
251     // we use a bare intrusive list here instead of std::map because
252     // it uses less memory and we expect this to be very small (very
253     // few IOs in flight to the same Blob at the same time).
254     state_list_t writing;   ///< writing buffers, sorted by seq, ascending
255
256     ~BufferSpace() {
257       assert(buffer_map.empty());
258       assert(writing.empty());
259     }
260
261     void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
262       cache->_audit("_add_buffer start");
263       buffer_map[b->offset].reset(b);
264       if (b->is_writing()) {
265         b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
266         if (writing.empty() || writing.rbegin()->seq <= b->seq) {
267           writing.push_back(*b);
268         } else {
269           auto it = writing.begin();
270           while (it->seq < b->seq) {
271             ++it;
272           }
273
274           assert(it->seq >= b->seq);
275           // note that this will insert b before it
276           // hence the order is maintained
277           writing.insert(it, *b);
278         }
279       } else {
280         b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
281         cache->_add_buffer(b, level, near);
282       }
283       cache->_audit("_add_buffer end");
284     }
285     void _rm_buffer(Cache* cache, Buffer *b) {
286       _rm_buffer(cache, buffer_map.find(b->offset));
287     }
288     void _rm_buffer(Cache* cache,
289                     map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
290       assert(p != buffer_map.end());
291       cache->_audit("_rm_buffer start");
292       if (p->second->is_writing()) {
293         writing.erase(writing.iterator_to(*p->second));
294       } else {
295         cache->_rm_buffer(p->second.get());
296       }
297       buffer_map.erase(p);
298       cache->_audit("_rm_buffer end");
299     }
300
301     map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
302       uint32_t offset) {
303       auto i = buffer_map.lower_bound(offset);
304       if (i != buffer_map.begin()) {
305         --i;
306         if (i->first + i->second->length <= offset)
307           ++i;
308       }
309       return i;
310     }
311
312     // must be called under protection of the Cache lock
313     void _clear(Cache* cache);
314
315     // return value is the highest cache_private of a trimmed buffer, or 0.
316     int discard(Cache* cache, uint32_t offset, uint32_t length) {
317       std::lock_guard<std::recursive_mutex> l(cache->lock);
318       return _discard(cache, offset, length);
319     }
320     int _discard(Cache* cache, uint32_t offset, uint32_t length);
321
322     void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
323                unsigned flags) {
324       std::lock_guard<std::recursive_mutex> l(cache->lock);
325       Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
326                              flags);
327       b->cache_private = _discard(cache, offset, bl.length());
328       _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
329     }
330     void finish_write(Cache* cache, uint64_t seq);
331     void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
332       std::lock_guard<std::recursive_mutex> l(cache->lock);
333       Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
334       b->cache_private = _discard(cache, offset, bl.length());
335       _add_buffer(cache, b, 1, nullptr);
336     }
337
338     void read(Cache* cache, uint32_t offset, uint32_t length,
339               BlueStore::ready_regions_t& res,
340               interval_set<uint32_t>& res_intervals);
341
342     void truncate(Cache* cache, uint32_t offset) {
343       discard(cache, offset, (uint32_t)-1 - offset);
344     }
345
346     void split(Cache* cache, size_t pos, BufferSpace &r);
347
348     void dump(Cache* cache, Formatter *f) const {
349       std::lock_guard<std::recursive_mutex> l(cache->lock);
350       f->open_array_section("buffers");
351       for (auto& i : buffer_map) {
352         f->open_object_section("buffer");
353         assert(i.first == i.second->offset);
354         i.second->dump(f);
355         f->close_section();
356       }
357       f->close_section();
358     }
359   };
360
361   struct SharedBlobSet;
362
363   /// in-memory shared blob state (incl cached buffers)
364   struct SharedBlob {
365     MEMPOOL_CLASS_HELPERS();
366
367     std::atomic_int nref = {0}; ///< reference count
368     bool loaded = false;
369
370     CollectionRef coll;
371     union {
372       uint64_t sbid_unloaded;              ///< sbid if persistent isn't loaded
373       bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
374     };
375     BufferSpace bc;             ///< buffer cache
376
377     SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
378       if (get_cache()) {
379         get_cache()->add_blob();
380       }
381     }
382     SharedBlob(uint64_t i, Collection *_coll);
383     ~SharedBlob();
384
385     uint64_t get_sbid() const {
386       return loaded ? persistent->sbid : sbid_unloaded;
387     }
388
389     friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
390     friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
391
392     friend ostream& operator<<(ostream& out, const SharedBlob& sb);
393
394     void get() {
395       ++nref;
396     }
397     void put();
398
399     /// get logical references
400     void get_ref(uint64_t offset, uint32_t length);
401
402     /// put logical references, and get back any released extents
403     void put_ref(uint64_t offset, uint32_t length,
404                  PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
405
406     friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
407       return l.get_sbid() == r.get_sbid();
408     }
409     inline Cache* get_cache() {
410       return coll ? coll->cache : nullptr;
411     }
412     inline SharedBlobSet* get_parent() {
413       return coll ? &(coll->shared_blob_set) : nullptr;
414     }
415     inline bool is_loaded() const {
416       return loaded;
417     }
418
419   };
420   typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
421
422   /// a lookup table of SharedBlobs
423   struct SharedBlobSet {
424     std::mutex lock;   ///< protect lookup, insertion, removal
425
426     // we use a bare pointer because we don't want to affect the ref
427     // count
428     mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
429
430     SharedBlobRef lookup(uint64_t sbid) {
431       std::lock_guard<std::mutex> l(lock);
432       auto p = sb_map.find(sbid);
433       if (p == sb_map.end()) {
434         return nullptr;
435       }
436       return p->second;
437     }
438
439     void add(Collection* coll, SharedBlob *sb) {
440       std::lock_guard<std::mutex> l(lock);
441       sb_map[sb->get_sbid()] = sb;
442       sb->coll = coll;
443     }
444
445     bool try_remove(SharedBlob *sb) {
446       std::lock_guard<std::mutex> l(lock);
447       if (sb->nref == 0) {
448         assert(sb->get_parent() == this);
449         sb_map.erase(sb->get_sbid());
450         return true;
451       }
452       return false;
453     }
454
455     void remove(SharedBlob *sb) {
456       std::lock_guard<std::mutex> l(lock);
457       assert(sb->get_parent() == this);
458       sb_map.erase(sb->get_sbid());
459     }
460
461     bool empty() {
462       std::lock_guard<std::mutex> l(lock);
463       return sb_map.empty();
464     }
465
466     void dump(CephContext *cct, int lvl);
467   };
468
469 //#define CACHE_BLOB_BL  // not sure if this is a win yet or not... :/
470
471   /// in-memory blob metadata and associated cached buffers (if any)
472   struct Blob {
473     MEMPOOL_CLASS_HELPERS();
474
475     std::atomic_int nref = {0};     ///< reference count
476     int16_t id = -1;                ///< id, for spanning blobs only, >= 0
477     int16_t last_encoded_id = -1;   ///< (ephemeral) used during encoding only
478     SharedBlobRef shared_blob;      ///< shared blob state (if any)
479
480   private:
481     mutable bluestore_blob_t blob;  ///< decoded blob metadata
482 #ifdef CACHE_BLOB_BL
483     mutable bufferlist blob_bl;     ///< cached encoded blob, blob is dirty if empty
484 #endif
485     /// refs from this shard.  ephemeral if id<0, persisted if spanning.
486     bluestore_blob_use_tracker_t used_in_blob;
487
488   public:
489
490     friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
491     friend void intrusive_ptr_release(Blob *b) { b->put(); }
492
493     friend ostream& operator<<(ostream& out, const Blob &b);
494
495     const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
496       return used_in_blob;
497     }
498     bool is_referenced() const {
499       return used_in_blob.is_not_empty();
500     }
501     uint32_t get_referenced_bytes() const {
502       return used_in_blob.get_referenced_bytes();
503     }
504
505     bool is_spanning() const {
506       return id >= 0;
507     }
508
509     bool can_split() const {
510       std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
511       // splitting a BufferSpace writing list is too hard; don't try.
512       return shared_blob->bc.writing.empty() &&
513              used_in_blob.can_split() &&
514              get_blob().can_split();
515     }
516
517     bool can_split_at(uint32_t blob_offset) const {
518       return used_in_blob.can_split_at(blob_offset) &&
519              get_blob().can_split_at(blob_offset);
520     }
521
522     bool can_reuse_blob(uint32_t min_alloc_size,
523                         uint32_t target_blob_size,
524                         uint32_t b_offset,
525                         uint32_t *length0);
526
527     void dup(Blob& o) {
528       o.shared_blob = shared_blob;
529       o.blob = blob;
530 #ifdef CACHE_BLOB_BL
531       o.blob_bl = blob_bl;
532 #endif
533     }
534
535     inline const bluestore_blob_t& get_blob() const {
536       return blob;
537     }
538     inline bluestore_blob_t& dirty_blob() {
539 #ifdef CACHE_BLOB_BL
540       blob_bl.clear();
541 #endif
542       return blob;
543     }
544
545     /// discard buffers for unallocated regions
546     void discard_unallocated(Collection *coll);
547
548     /// get logical references
549     void get_ref(Collection *coll, uint32_t offset, uint32_t length);
550     /// put logical references, and get back any released extents
551     bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
552                  PExtentVector *r);
553
554     /// split the blob
555     void split(Collection *coll, uint32_t blob_offset, Blob *o);
556
557     void get() {
558       ++nref;
559     }
560     void put() {
561       if (--nref == 0)
562         delete this;
563     }
564
565
566 #ifdef CACHE_BLOB_BL
567     void _encode() const {
568       if (blob_bl.length() == 0 ) {
569         ::encode(blob, blob_bl);
570       } else {
571         assert(blob_bl.length());
572       }
573     }
574     void bound_encode(
575       size_t& p,
576       bool include_ref_map) const {
577       _encode();
578       p += blob_bl.length();
579       if (include_ref_map) {
580         used_in_blob.bound_encode(p);
581       }
582     }
583     void encode(
584       bufferlist::contiguous_appender& p,
585       bool include_ref_map) const {
586       _encode();
587       p.append(blob_bl);
588       if (include_ref_map) {
589         used_in_blob.encode(p);
590       }
591     }
592     void decode(
593       Collection */*coll*/,
594       bufferptr::iterator& p,
595       bool include_ref_map) {
596       const char *start = p.get_pos();
597       denc(blob, p);
598       const char *end = p.get_pos();
599       blob_bl.clear();
600       blob_bl.append(start, end - start);
601       if (include_ref_map) {
602         used_in_blob.decode(p);
603       }
604     }
605 #else
606     void bound_encode(
607       size_t& p,
608       uint64_t struct_v,
609       uint64_t sbid,
610       bool include_ref_map) const {
611       denc(blob, p, struct_v);
612       if (blob.is_shared()) {
613         denc(sbid, p);
614       }
615       if (include_ref_map) {
616         used_in_blob.bound_encode(p);
617       }
618     }
619     void encode(
620       bufferlist::contiguous_appender& p,
621       uint64_t struct_v,
622       uint64_t sbid,
623       bool include_ref_map) const {
624       denc(blob, p, struct_v);
625       if (blob.is_shared()) {
626         denc(sbid, p);
627       }
628       if (include_ref_map) {
629         used_in_blob.encode(p);
630       }
631     }
632     void decode(
633       Collection *coll,
634       bufferptr::iterator& p,
635       uint64_t struct_v,
636       uint64_t* sbid,
637       bool include_ref_map);
638 #endif
639   };
640   typedef boost::intrusive_ptr<Blob> BlobRef;
641   typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
642
643   /// a logical extent, pointing to (some portion of) a blob
644   typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
645   struct Extent : public ExtentBase {
646     MEMPOOL_CLASS_HELPERS();
647
648     uint32_t logical_offset = 0;      ///< logical offset
649     uint32_t blob_offset = 0;         ///< blob offset
650     uint32_t length = 0;              ///< length
651     BlobRef  blob;                    ///< the blob with our data
652
653     /// ctor for lookup only
654     explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
655     /// ctor for delayed initialization (see decode_some())
656     explicit Extent() : ExtentBase() {
657     }
658     /// ctor for general usage
659     Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
660       : ExtentBase(),
661         logical_offset(lo), blob_offset(o), length(l) {
662       assign_blob(b);
663     }
664     ~Extent() {
665       if (blob) {
666         blob->shared_blob->get_cache()->rm_extent();
667       }
668     }
669
670     void assign_blob(const BlobRef& b) {
671       assert(!blob);
672       blob = b;
673       blob->shared_blob->get_cache()->add_extent();
674     }
675
676     // comparators for intrusive_set
677     friend bool operator<(const Extent &a, const Extent &b) {
678       return a.logical_offset < b.logical_offset;
679     }
680     friend bool operator>(const Extent &a, const Extent &b) {
681       return a.logical_offset > b.logical_offset;
682     }
683     friend bool operator==(const Extent &a, const Extent &b) {
684       return a.logical_offset == b.logical_offset;
685     }
686
687     uint32_t blob_start() const {
688       return logical_offset - blob_offset;
689     }
690
691     uint32_t blob_end() const {
692       return blob_start() + blob->get_blob().get_logical_length();
693     }
694
695     uint32_t logical_end() const {
696       return logical_offset + length;
697     }
698
699     // return true if any piece of the blob is out of
700     // the given range [o, o + l].
701     bool blob_escapes_range(uint32_t o, uint32_t l) const {
702       return blob_start() < o || blob_end() > o + l;
703     }
704   };
705   typedef boost::intrusive::set<Extent> extent_map_t;
706
707
708   friend ostream& operator<<(ostream& out, const Extent& e);
709
710   struct OldExtent {
711     boost::intrusive::list_member_hook<> old_extent_item;
712     Extent e;
713     PExtentVector r;
714     bool blob_empty; // flag to track the last removed extent that makes blob
715                      // empty - required to update compression stat properly
716     OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
717       : e(lo, o, l, b), blob_empty(false) {
718     }
719     static OldExtent* create(CollectionRef c,
720                              uint32_t lo,
721                              uint32_t o,
722                              uint32_t l,
723                              BlobRef& b);
724   };
725   typedef boost::intrusive::list<
726       OldExtent,
727       boost::intrusive::member_hook<
728         OldExtent,
729     boost::intrusive::list_member_hook<>,
730     &OldExtent::old_extent_item> > old_extent_map_t;
731
732   struct Onode;
733
734   /// a sharded extent map, mapping offsets to lextents to blobs
735   struct ExtentMap {
736     Onode *onode;
737     extent_map_t extent_map;        ///< map of Extents to Blobs
738     blob_map_t spanning_blob_map;   ///< blobs that span shards
739
740     struct Shard {
741       bluestore_onode_t::shard_info *shard_info = nullptr;
742       unsigned extents = 0;  ///< count extents in this shard
743       bool loaded = false;   ///< true if shard is loaded
744       bool dirty = false;    ///< true if shard is dirty and needs reencoding
745     };
746     mempool::bluestore_cache_other::vector<Shard> shards;    ///< shards
747
748     bufferlist inline_bl;    ///< cached encoded map, if unsharded; empty=>dirty
749
750     uint32_t needs_reshard_begin = 0;
751     uint32_t needs_reshard_end = 0;
752
753     bool needs_reshard() const {
754       return needs_reshard_end > needs_reshard_begin;
755     }
756     void clear_needs_reshard() {
757       needs_reshard_begin = needs_reshard_end = 0;
758     }
759     void request_reshard(uint32_t begin, uint32_t end) {
760       if (begin < needs_reshard_begin) {
761         needs_reshard_begin = begin;
762       }
763       if (end > needs_reshard_end) {
764         needs_reshard_end = end;
765       }
766     }
767
768     struct DeleteDisposer {
769       void operator()(Extent *e) { delete e; }
770     };
771
772     ExtentMap(Onode *o);
773     ~ExtentMap() {
774       extent_map.clear_and_dispose(DeleteDisposer());
775     }
776
777     void clear() {
778       extent_map.clear_and_dispose(DeleteDisposer());
779       shards.clear();
780       inline_bl.clear();
781       clear_needs_reshard();
782     }
783
784     bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
785                      unsigned *pn);
786     unsigned decode_some(bufferlist& bl);
787
788     void bound_encode_spanning_blobs(size_t& p);
789     void encode_spanning_blobs(bufferlist::contiguous_appender& p);
790     void decode_spanning_blobs(bufferptr::iterator& p);
791
792     BlobRef get_spanning_blob(int id) {
793       auto p = spanning_blob_map.find(id);
794       assert(p != spanning_blob_map.end());
795       return p->second;
796     }
797
798     void update(KeyValueDB::Transaction t, bool force);
799     decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
800     void reshard(
801       KeyValueDB *db,
802       KeyValueDB::Transaction t);
803
804     /// initialize Shards from the onode
805     void init_shards(bool loaded, bool dirty);
806
807     /// return index of shard containing offset
808     /// or -1 if not found
809     int seek_shard(uint32_t offset) {
810       size_t end = shards.size();
811       size_t mid, left = 0;
812       size_t right = end; // one passed the right end
813
814       while (left < right) {
815         mid = left + (right - left) / 2;
816         if (offset >= shards[mid].shard_info->offset) {
817           size_t next = mid + 1;
818           if (next >= end || offset < shards[next].shard_info->offset)
819             return mid;
820           //continue to search forwards
821           left = next;
822         } else {
823           //continue to search backwards
824           right = mid;
825         }
826       }
827
828       return -1; // not found
829     }
830
831     /// check if a range spans a shard
832     bool spans_shard(uint32_t offset, uint32_t length) {
833       if (shards.empty()) {
834         return false;
835       }
836       int s = seek_shard(offset);
837       assert(s >= 0);
838       if (s == (int)shards.size() - 1) {
839         return false; // last shard
840       }
841       if (offset + length <= shards[s+1].shard_info->offset) {
842         return false;
843       }
844       return true;
845     }
846
847     /// ensure that a range of the map is loaded
848     void fault_range(KeyValueDB *db,
849                      uint32_t offset, uint32_t length);
850
851     /// ensure a range of the map is marked dirty
852     void dirty_range(uint32_t offset, uint32_t length);
853
854     /// for seek_lextent test
855     extent_map_t::iterator find(uint64_t offset);
856
857     /// seek to the first lextent including or after offset
858     extent_map_t::iterator seek_lextent(uint64_t offset);
859     extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
860
861     /// add a new Extent
862     void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
863       extent_map.insert(*new Extent(lo, o, l, b));
864     }
865
866     /// remove (and delete) an Extent
867     void rm(extent_map_t::iterator p) {
868       extent_map.erase_and_dispose(p, DeleteDisposer());
869     }
870
871     bool has_any_lextents(uint64_t offset, uint64_t length);
872
873     /// consolidate adjacent lextents in extent_map
874     int compress_extent_map(uint64_t offset, uint64_t length);
875
876     /// punch a logical hole.  add lextents to deref to target list.
877     void punch_hole(CollectionRef &c,
878                     uint64_t offset, uint64_t length,
879                     old_extent_map_t *old_extents);
880
881     /// put new lextent into lextent_map overwriting existing ones if
882     /// any and update references accordingly
883     Extent *set_lextent(CollectionRef &c,
884                         uint64_t logical_offset,
885                         uint64_t offset, uint64_t length,
886                         BlobRef b,
887                         old_extent_map_t *old_extents);
888
889     /// split a blob (and referring extents)
890     BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
891   };
892
893   /// Compressed Blob Garbage collector
894   /*
895   The primary idea of the collector is to estimate a difference between
896   allocation units(AU) currently present for compressed blobs and new AUs
897   required to store that data uncompressed. 
898   Estimation is performed for protrusive extents within a logical range
899   determined by a concatenation of old_extents collection and specific(current)
900   write request.
901   The root cause for old_extents use is the need to handle blob ref counts
902   properly. Old extents still hold blob refs and hence we need to traverse
903   the collection to determine if blob to be released.
904   Protrusive extents are extents that fit into the blob set in action
905   (ones that are below the logical range from above) but not removed totally
906   due to the current write. 
907   E.g. for
908   extent1 <loffs = 100, boffs = 100, len  = 100> -> 
909     blob1<compressed, len_on_disk=4096, logical_len=8192>
910   extent2 <loffs = 200, boffs = 200, len  = 100> ->
911     blob2<raw, len_on_disk=4096, llen=4096>
912   extent3 <loffs = 300, boffs = 300, len  = 100> ->
913     blob1<compressed, len_on_disk=4096, llen=8192>
914   extent4 <loffs = 4096, boffs = 0, len  = 100>  ->
915     blob3<raw, len_on_disk=4096, llen=4096>
916   write(300~100)
917   protrusive extents are within the following ranges <0~300, 400~8192-400>
918   In this case existing AUs that might be removed due to GC (i.e. blob1) 
919   use 2x4K bytes.
920   And new AUs expected after GC = 0 since extent1 to be merged into blob2.
921   Hence we should do a collect.
922   */
923   class GarbageCollector
924   {
925   public:
926     /// return amount of allocation units that might be saved due to GC
927     int64_t estimate(
928       uint64_t offset,
929       uint64_t length,
930       const ExtentMap& extent_map,
931       const old_extent_map_t& old_extents,
932       uint64_t min_alloc_size);
933
934     /// return a collection of extents to perform GC on
935     const vector<AllocExtent>& get_extents_to_collect() const {
936       return extents_to_collect;
937     }
938     GarbageCollector(CephContext* _cct) : cct(_cct) {}
939
940   private:
941     struct BlobInfo {
942       uint64_t referenced_bytes = 0;    ///< amount of bytes referenced in blob
943       int64_t expected_allocations = 0; ///< new alloc units required 
944                                         ///< in case of gc fulfilled
945       bool collect_candidate = false;   ///< indicate if blob has any extents 
946                                         ///< eligible for GC.
947       extent_map_t::const_iterator first_lextent; ///< points to the first 
948                                                   ///< lextent referring to 
949                                                   ///< the blob if any.
950                                                   ///< collect_candidate flag 
951                                                   ///< determines the validity
952       extent_map_t::const_iterator last_lextent;  ///< points to the last 
953                                                   ///< lextent referring to 
954                                                   ///< the blob if any.
955
956       BlobInfo(uint64_t ref_bytes) :
957         referenced_bytes(ref_bytes) {
958       }
959     };
960     CephContext* cct;
961     map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
962                                          ///< copies that are affected by the
963                                          ///< specific write
964
965     vector<AllocExtent> extents_to_collect; ///< protrusive extents that should
966                                             ///< be collected if GC takes place
967
968     boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
969                                                 ///<  unit when traversing 
970                                                 ///< protrusive extents. 
971                                                 ///< Other extents mapped to
972                                                 ///< this AU to be ignored 
973                                                 ///< (except the case where
974                                                 ///< uncompressed extent follows
975                                                 ///< compressed one - see below).
976     BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
977                                            ///< caused expected_allocations
978                                            ///< counter increment at this blob.
979                                            ///< if uncompressed extent follows 
980                                            ///< a decrement for the 
981                                            ///< expected_allocations counter 
982                                            ///< is needed
983     int64_t expected_allocations = 0;      ///< new alloc units required in case
984                                            ///< of gc fulfilled
985     int64_t expected_for_release = 0;      ///< alloc units currently used by
986                                            ///< compressed blobs that might
987                                            ///< gone after GC
988     uint64_t gc_start_offset;              ///starting offset for GC
989     uint64_t gc_end_offset;                ///ending offset for GC
990
991   protected:
992     void process_protrusive_extents(const BlueStore::ExtentMap& extent_map, 
993                                     uint64_t start_offset,
994                                     uint64_t end_offset,
995                                     uint64_t start_touch_offset,
996                                     uint64_t end_touch_offset,
997                                     uint64_t min_alloc_size);
998   };
999
1000   struct OnodeSpace;
1001
1002   /// an in-memory object
1003   struct Onode {
1004     MEMPOOL_CLASS_HELPERS();
1005
1006     std::atomic_int nref;  ///< reference count
1007     Collection *c;
1008
1009     ghobject_t oid;
1010
1011     /// key under PREFIX_OBJ where we are stored
1012     mempool::bluestore_cache_other::string key;
1013
1014     boost::intrusive::list_member_hook<> lru_item;
1015
1016     bluestore_onode_t onode;  ///< metadata stored as value in kv store
1017     bool exists;              ///< true if object logically exists
1018
1019     ExtentMap extent_map;
1020
1021     // track txc's that have not been committed to kv store (and whose
1022     // effects cannot be read via the kvdb read methods)
1023     std::atomic<int> flushing_count = {0};
1024     std::mutex flush_lock;  ///< protect flush_txns
1025     std::condition_variable flush_cond;   ///< wait here for uncommitted txns
1026
1027     Onode(Collection *c, const ghobject_t& o,
1028           const mempool::bluestore_cache_other::string& k)
1029       : nref(0),
1030         c(c),
1031         oid(o),
1032         key(k),
1033         exists(false),
1034         extent_map(this) {
1035     }
1036
1037     void flush();
1038     void get() {
1039       ++nref;
1040     }
1041     void put() {
1042       if (--nref == 0)
1043         delete this;
1044     }
1045   };
1046   typedef boost::intrusive_ptr<Onode> OnodeRef;
1047
1048
1049   /// a cache (shard) of onodes and buffers
1050   struct Cache {
1051     CephContext* cct;
1052     PerfCounters *logger;
1053     std::recursive_mutex lock;          ///< protect lru and other structures
1054
1055     std::atomic<uint64_t> num_extents = {0};
1056     std::atomic<uint64_t> num_blobs = {0};
1057
1058     static Cache *create(CephContext* cct, string type, PerfCounters *logger);
1059
1060     Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
1061     virtual ~Cache() {}
1062
1063     virtual void _add_onode(OnodeRef& o, int level) = 0;
1064     virtual void _rm_onode(OnodeRef& o) = 0;
1065     virtual void _touch_onode(OnodeRef& o) = 0;
1066
1067     virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0;
1068     virtual void _rm_buffer(Buffer *b) = 0;
1069     virtual void _move_buffer(Cache *src, Buffer *b) = 0;
1070     virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0;
1071     virtual void _touch_buffer(Buffer *b) = 0;
1072
1073     virtual uint64_t _get_num_onodes() = 0;
1074     virtual uint64_t _get_buffer_bytes() = 0;
1075
1076     void add_extent() {
1077       ++num_extents;
1078     }
1079     void rm_extent() {
1080       --num_extents;
1081     }
1082
1083     void add_blob() {
1084       ++num_blobs;
1085     }
1086     void rm_blob() {
1087       --num_blobs;
1088     }
1089
1090     void trim(uint64_t target_bytes,
1091               float target_meta_ratio,
1092               float target_data_ratio,
1093               float bytes_per_onode);
1094
1095     void trim_all();
1096
1097     virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0;
1098
1099     virtual void add_stats(uint64_t *onodes, uint64_t *extents,
1100                            uint64_t *blobs,
1101                            uint64_t *buffers,
1102                            uint64_t *bytes) = 0;
1103
1104     bool empty() {
1105       std::lock_guard<std::recursive_mutex> l(lock);
1106       return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
1107     }
1108
1109 #ifdef DEBUG_CACHE
1110     virtual void _audit(const char *s) = 0;
1111 #else
1112     void _audit(const char *s) { /* no-op */ }
1113 #endif
1114   };
1115
1116   /// simple LRU cache for onodes and buffers
1117   struct LRUCache : public Cache {
1118   private:
1119     typedef boost::intrusive::list<
1120       Onode,
1121       boost::intrusive::member_hook<
1122         Onode,
1123         boost::intrusive::list_member_hook<>,
1124         &Onode::lru_item> > onode_lru_list_t;
1125     typedef boost::intrusive::list<
1126       Buffer,
1127       boost::intrusive::member_hook<
1128         Buffer,
1129         boost::intrusive::list_member_hook<>,
1130         &Buffer::lru_item> > buffer_lru_list_t;
1131
1132     onode_lru_list_t onode_lru;
1133
1134     buffer_lru_list_t buffer_lru;
1135     uint64_t buffer_size = 0;
1136
1137   public:
1138     LRUCache(CephContext* cct) : Cache(cct) {}
1139     uint64_t _get_num_onodes() override {
1140       return onode_lru.size();
1141     }
1142     void _add_onode(OnodeRef& o, int level) override {
1143       if (level > 0)
1144         onode_lru.push_front(*o);
1145       else
1146         onode_lru.push_back(*o);
1147     }
1148     void _rm_onode(OnodeRef& o) override {
1149       auto q = onode_lru.iterator_to(*o);
1150       onode_lru.erase(q);
1151     }
1152     void _touch_onode(OnodeRef& o) override;
1153
1154     uint64_t _get_buffer_bytes() override {
1155       return buffer_size;
1156     }
1157     void _add_buffer(Buffer *b, int level, Buffer *near) override {
1158       if (near) {
1159         auto q = buffer_lru.iterator_to(*near);
1160         buffer_lru.insert(q, *b);
1161       } else if (level > 0) {
1162         buffer_lru.push_front(*b);
1163       } else {
1164         buffer_lru.push_back(*b);
1165       }
1166       buffer_size += b->length;
1167     }
1168     void _rm_buffer(Buffer *b) override {
1169       assert(buffer_size >= b->length);
1170       buffer_size -= b->length;
1171       auto q = buffer_lru.iterator_to(*b);
1172       buffer_lru.erase(q);
1173     }
1174     void _move_buffer(Cache *src, Buffer *b) override {
1175       src->_rm_buffer(b);
1176       _add_buffer(b, 0, nullptr);
1177     }
1178     void _adjust_buffer_size(Buffer *b, int64_t delta) override {
1179       assert((int64_t)buffer_size + delta >= 0);
1180       buffer_size += delta;
1181     }
1182     void _touch_buffer(Buffer *b) override {
1183       auto p = buffer_lru.iterator_to(*b);
1184       buffer_lru.erase(p);
1185       buffer_lru.push_front(*b);
1186       _audit("_touch_buffer end");
1187     }
1188
1189     void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1190
1191     void add_stats(uint64_t *onodes, uint64_t *extents,
1192                    uint64_t *blobs,
1193                    uint64_t *buffers,
1194                    uint64_t *bytes) override {
1195       std::lock_guard<std::recursive_mutex> l(lock);
1196       *onodes += onode_lru.size();
1197       *extents += num_extents;
1198       *blobs += num_blobs;
1199       *buffers += buffer_lru.size();
1200       *bytes += buffer_size;
1201     }
1202
1203 #ifdef DEBUG_CACHE
1204     void _audit(const char *s) override;
1205 #endif
1206   };
1207
1208   // 2Q cache for buffers, LRU for onodes
1209   struct TwoQCache : public Cache {
1210   private:
1211     // stick with LRU for onodes for now (fixme?)
1212     typedef boost::intrusive::list<
1213       Onode,
1214       boost::intrusive::member_hook<
1215         Onode,
1216         boost::intrusive::list_member_hook<>,
1217         &Onode::lru_item> > onode_lru_list_t;
1218     typedef boost::intrusive::list<
1219       Buffer,
1220       boost::intrusive::member_hook<
1221         Buffer,
1222         boost::intrusive::list_member_hook<>,
1223         &Buffer::lru_item> > buffer_list_t;
1224
1225     onode_lru_list_t onode_lru;
1226
1227     buffer_list_t buffer_hot;      ///< "Am" hot buffers
1228     buffer_list_t buffer_warm_in;  ///< "A1in" newly warm buffers
1229     buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted
1230     uint64_t buffer_bytes = 0;     ///< bytes
1231
1232     enum {
1233       BUFFER_NEW = 0,
1234       BUFFER_WARM_IN,   ///< in buffer_warm_in
1235       BUFFER_WARM_OUT,  ///< in buffer_warm_out
1236       BUFFER_HOT,       ///< in buffer_hot
1237       BUFFER_TYPE_MAX
1238     };
1239
1240     uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type
1241
1242   public:
1243     TwoQCache(CephContext* cct) : Cache(cct) {}
1244     uint64_t _get_num_onodes() override {
1245       return onode_lru.size();
1246     }
1247     void _add_onode(OnodeRef& o, int level) override {
1248       if (level > 0)
1249         onode_lru.push_front(*o);
1250       else
1251         onode_lru.push_back(*o);
1252     }
1253     void _rm_onode(OnodeRef& o) override {
1254       auto q = onode_lru.iterator_to(*o);
1255       onode_lru.erase(q);
1256     }
1257     void _touch_onode(OnodeRef& o) override;
1258
1259     uint64_t _get_buffer_bytes() override {
1260       return buffer_bytes;
1261     }
1262     void _add_buffer(Buffer *b, int level, Buffer *near) override;
1263     void _rm_buffer(Buffer *b) override;
1264     void _move_buffer(Cache *src, Buffer *b) override;
1265     void _adjust_buffer_size(Buffer *b, int64_t delta) override;
1266     void _touch_buffer(Buffer *b) override {
1267       switch (b->cache_private) {
1268       case BUFFER_WARM_IN:
1269         // do nothing (somewhat counter-intuitively!)
1270         break;
1271       case BUFFER_WARM_OUT:
1272         // move from warm_out to hot LRU
1273         assert(0 == "this happens via discard hint");
1274         break;
1275       case BUFFER_HOT:
1276         // move to front of hot LRU
1277         buffer_hot.erase(buffer_hot.iterator_to(*b));
1278         buffer_hot.push_front(*b);
1279         break;
1280       }
1281       _audit("_touch_buffer end");
1282     }
1283
1284     void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1285
1286     void add_stats(uint64_t *onodes, uint64_t *extents,
1287                    uint64_t *blobs,
1288                    uint64_t *buffers,
1289                    uint64_t *bytes) override {
1290       std::lock_guard<std::recursive_mutex> l(lock);
1291       *onodes += onode_lru.size();
1292       *extents += num_extents;
1293       *blobs += num_blobs;
1294       *buffers += buffer_hot.size() + buffer_warm_in.size();
1295       *bytes += buffer_bytes;
1296     }
1297
1298 #ifdef DEBUG_CACHE
1299     void _audit(const char *s) override;
1300 #endif
1301   };
1302
1303   struct OnodeSpace {
1304   private:
1305     Cache *cache;
1306
1307     /// forward lookups
1308     mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1309
1310     friend class Collection; // for split_cache()
1311
1312   public:
1313     OnodeSpace(Cache *c) : cache(c) {}
1314     ~OnodeSpace() {
1315       clear();
1316     }
1317
1318     OnodeRef add(const ghobject_t& oid, OnodeRef o);
1319     OnodeRef lookup(const ghobject_t& o);
1320     void remove(const ghobject_t& oid) {
1321       onode_map.erase(oid);
1322     }
1323     void rename(OnodeRef& o, const ghobject_t& old_oid,
1324                 const ghobject_t& new_oid,
1325                 const mempool::bluestore_cache_other::string& new_okey);
1326     void clear();
1327     bool empty();
1328
1329     void dump(CephContext *cct, int lvl);
1330
1331     /// return true if f true for any item
1332     bool map_any(std::function<bool(OnodeRef)> f);
1333   };
1334
1335   struct Collection : public CollectionImpl {
1336     BlueStore *store;
1337     Cache *cache;       ///< our cache shard
1338     coll_t cid;
1339     bluestore_cnode_t cnode;
1340     RWLock lock;
1341
1342     bool exists;
1343
1344     SharedBlobSet shared_blob_set;      ///< open SharedBlobs
1345
1346     // cache onodes on a per-collection basis to avoid lock
1347     // contention.
1348     OnodeSpace onode_map;
1349
1350     //pool options
1351     pool_opts_t pool_opts;
1352
1353     OnodeRef get_onode(const ghobject_t& oid, bool create);
1354
1355     // the terminology is confusing here, sorry!
1356     //
1357     //  blob_t     shared_blob_t
1358     //  !shared    unused                -> open
1359     //  shared     !loaded               -> open + shared
1360     //  shared     loaded                -> open + shared + loaded
1361     //
1362     // i.e.,
1363     //  open = SharedBlob is instantiated
1364     //  shared = blob_t shared flag is set; SharedBlob is hashed.
1365     //  loaded = SharedBlob::shared_blob_t is loaded from kv store
1366     void open_shared_blob(uint64_t sbid, BlobRef b);
1367     void load_shared_blob(SharedBlobRef sb);
1368     void make_blob_shared(uint64_t sbid, BlobRef b);
1369     uint64_t make_blob_unshared(SharedBlob *sb);
1370
1371     BlobRef new_blob() {
1372       BlobRef b = new Blob();
1373       b->shared_blob = new SharedBlob(this);
1374       return b;
1375     }
1376
1377     const coll_t &get_cid() override {
1378       return cid;
1379     }
1380
1381     bool contains(const ghobject_t& oid) {
1382       if (cid.is_meta())
1383         return oid.hobj.pool == -1;
1384       spg_t spgid;
1385       if (cid.is_pg(&spgid))
1386         return
1387           spgid.pgid.contains(cnode.bits, oid) &&
1388           oid.shard_id == spgid.shard;
1389       return false;
1390     }
1391
1392     void split_cache(Collection *dest);
1393
1394     Collection(BlueStore *ns, Cache *ca, coll_t c);
1395   };
1396
1397   class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1398     CollectionRef c;
1399     OnodeRef o;
1400     KeyValueDB::Iterator it;
1401     string head, tail;
1402   public:
1403     OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1404     int seek_to_first() override;
1405     int upper_bound(const string &after) override;
1406     int lower_bound(const string &to) override;
1407     bool valid() override;
1408     int next(bool validate=true) override;
1409     string key() override;
1410     bufferlist value() override;
1411     int status() override {
1412       return 0;
1413     }
1414   };
1415
1416   class OpSequencer;
1417   typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
1418
1419   struct volatile_statfs{
1420     enum {
1421       STATFS_ALLOCATED = 0,
1422       STATFS_STORED,
1423       STATFS_COMPRESSED_ORIGINAL,
1424       STATFS_COMPRESSED,
1425       STATFS_COMPRESSED_ALLOCATED,
1426       STATFS_LAST
1427     };
1428     int64_t values[STATFS_LAST];
1429     volatile_statfs() {
1430       memset(this, 0, sizeof(volatile_statfs));
1431     }
1432     void reset() {
1433       *this = volatile_statfs();
1434     }
1435     volatile_statfs& operator+=(const volatile_statfs& other) {
1436       for (size_t i = 0; i < STATFS_LAST; ++i) {
1437         values[i] += other.values[i];
1438       }
1439       return *this;
1440     }
1441     int64_t& allocated() {
1442       return values[STATFS_ALLOCATED];
1443     }
1444     int64_t& stored() {
1445       return values[STATFS_STORED];
1446     }
1447     int64_t& compressed_original() {
1448       return values[STATFS_COMPRESSED_ORIGINAL];
1449     }
1450     int64_t& compressed() {
1451       return values[STATFS_COMPRESSED];
1452     }
1453     int64_t& compressed_allocated() {
1454       return values[STATFS_COMPRESSED_ALLOCATED];
1455     }
1456     bool is_empty() {
1457       return values[STATFS_ALLOCATED] == 0 &&
1458         values[STATFS_STORED] == 0 &&
1459         values[STATFS_COMPRESSED] == 0 &&
1460         values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1461         values[STATFS_COMPRESSED_ALLOCATED] == 0;
1462     }
1463     void decode(bufferlist::iterator& it) {
1464       for (size_t i = 0; i < STATFS_LAST; i++) {
1465         ::decode(values[i], it);
1466       }
1467     }
1468
1469     void encode(bufferlist& bl) {
1470       for (size_t i = 0; i < STATFS_LAST; i++) {
1471         ::encode(values[i], bl);
1472       }
1473     }
1474   };
1475
1476   struct TransContext : public AioContext {
1477     MEMPOOL_CLASS_HELPERS();
1478
1479     typedef enum {
1480       STATE_PREPARE,
1481       STATE_AIO_WAIT,
1482       STATE_IO_DONE,
1483       STATE_KV_QUEUED,     // queued for kv_sync_thread submission
1484       STATE_KV_SUBMITTED,  // submitted to kv; not yet synced
1485       STATE_KV_DONE,
1486       STATE_DEFERRED_QUEUED,    // in deferred_queue (pending or running)
1487       STATE_DEFERRED_CLEANUP,   // remove deferred kv record
1488       STATE_DEFERRED_DONE,
1489       STATE_FINISHING,
1490       STATE_DONE,
1491     } state_t;
1492
1493     state_t state = STATE_PREPARE;
1494
1495     const char *get_state_name() {
1496       switch (state) {
1497       case STATE_PREPARE: return "prepare";
1498       case STATE_AIO_WAIT: return "aio_wait";
1499       case STATE_IO_DONE: return "io_done";
1500       case STATE_KV_QUEUED: return "kv_queued";
1501       case STATE_KV_SUBMITTED: return "kv_submitted";
1502       case STATE_KV_DONE: return "kv_done";
1503       case STATE_DEFERRED_QUEUED: return "deferred_queued";
1504       case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1505       case STATE_DEFERRED_DONE: return "deferred_done";
1506       case STATE_FINISHING: return "finishing";
1507       case STATE_DONE: return "done";
1508       }
1509       return "???";
1510     }
1511
1512 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1513     const char *get_state_latency_name(int state) {
1514       switch (state) {
1515       case l_bluestore_state_prepare_lat: return "prepare";
1516       case l_bluestore_state_aio_wait_lat: return "aio_wait";
1517       case l_bluestore_state_io_done_lat: return "io_done";
1518       case l_bluestore_state_kv_queued_lat: return "kv_queued";
1519       case l_bluestore_state_kv_committing_lat: return "kv_committing";
1520       case l_bluestore_state_kv_done_lat: return "kv_done";
1521       case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1522       case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1523       case l_bluestore_state_finishing_lat: return "finishing";
1524       case l_bluestore_state_done_lat: return "done";
1525       }
1526       return "???";
1527     }
1528 #endif
1529
1530     void log_state_latency(PerfCounters *logger, int state) {
1531       utime_t lat, now = ceph_clock_now();
1532       lat = now - last_stamp;
1533       logger->tinc(state, lat);
1534 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1535       if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
1536         double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
1537         OID_ELAPSED("", usecs, get_state_latency_name(state));
1538       }
1539 #endif
1540       last_stamp = now;
1541     }
1542
1543     OpSequencerRef osr;
1544     boost::intrusive::list_member_hook<> sequencer_item;
1545
1546     uint64_t bytes = 0, cost = 0;
1547
1548     set<OnodeRef> onodes;     ///< these need to be updated/written
1549     set<OnodeRef> modified_objects;  ///< objects we modified (and need a ref)
1550     set<SharedBlobRef> shared_blobs;  ///< these need to be updated/written
1551     set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1552
1553     KeyValueDB::Transaction t; ///< then we will commit this
1554     Context *oncommit = nullptr;         ///< signal on commit
1555     Context *onreadable = nullptr;       ///< signal on readable
1556     Context *onreadable_sync = nullptr;  ///< signal on readable
1557     list<Context*> oncommits;  ///< more commit completions
1558     list<CollectionRef> removed_collections; ///< colls we removed
1559
1560     boost::intrusive::list_member_hook<> deferred_queue_item;
1561     bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1562
1563     interval_set<uint64_t> allocated, released;
1564     volatile_statfs statfs_delta;
1565
1566     IOContext ioc;
1567     bool had_ios = false;  ///< true if we submitted IOs before our kv txn
1568
1569     uint64_t seq = 0;
1570     utime_t start;
1571     utime_t last_stamp;
1572
1573     uint64_t last_nid = 0;     ///< if non-zero, highest new nid we allocated
1574     uint64_t last_blobid = 0;  ///< if non-zero, highest new blobid we allocated
1575
1576     explicit TransContext(CephContext* cct, OpSequencer *o)
1577       : osr(o),
1578         ioc(cct, this),
1579         start(ceph_clock_now()) {
1580       last_stamp = start;
1581     }
1582     ~TransContext() {
1583       delete deferred_txn;
1584     }
1585
1586     void write_onode(OnodeRef &o) {
1587       onodes.insert(o);
1588     }
1589     void write_shared_blob(SharedBlobRef &sb) {
1590       shared_blobs.insert(sb);
1591     }
1592     void unshare_blob(SharedBlob *sb) {
1593       shared_blobs.erase(sb);
1594     }
1595
1596     /// note we logically modified object (when onode itself is unmodified)
1597     void note_modified_object(OnodeRef &o) {
1598       // onode itself isn't written, though
1599       modified_objects.insert(o);
1600     }
1601     void removed(OnodeRef& o) {
1602       onodes.erase(o);
1603       modified_objects.erase(o);
1604     }
1605
1606     void aio_finish(BlueStore *store) override {
1607       store->txc_aio_finish(this);
1608     }
1609   };
1610
1611   typedef boost::intrusive::list<
1612     TransContext,
1613     boost::intrusive::member_hook<
1614       TransContext,
1615       boost::intrusive::list_member_hook<>,
1616       &TransContext::deferred_queue_item> > deferred_queue_t;
1617
1618   struct DeferredBatch : public AioContext {
1619     OpSequencer *osr;
1620     struct deferred_io {
1621       bufferlist bl;    ///< data
1622       uint64_t seq;     ///< deferred transaction seq
1623     };
1624     map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1625     deferred_queue_t txcs;           ///< txcs in this batch
1626     IOContext ioc;                   ///< our aios
1627     /// bytes of pending io for each deferred seq (may be 0)
1628     map<uint64_t,int> seq_bytes;
1629
1630     void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1631     void _audit(CephContext *cct);
1632
1633     DeferredBatch(CephContext *cct, OpSequencer *osr)
1634       : osr(osr), ioc(cct, this) {}
1635
1636     /// prepare a write
1637     void prepare_write(CephContext *cct,
1638                        uint64_t seq, uint64_t offset, uint64_t length,
1639                        bufferlist::const_iterator& p);
1640
1641     void aio_finish(BlueStore *store) override {
1642       store->_deferred_aio_finish(osr);
1643     }
1644   };
1645
1646   class OpSequencer : public Sequencer_impl {
1647   public:
1648     std::mutex qlock;
1649     std::condition_variable qcond;
1650     typedef boost::intrusive::list<
1651       TransContext,
1652       boost::intrusive::member_hook<
1653         TransContext,
1654         boost::intrusive::list_member_hook<>,
1655         &TransContext::sequencer_item> > q_list_t;
1656     q_list_t q;  ///< transactions
1657
1658     boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1659
1660     DeferredBatch *deferred_running = nullptr;
1661     DeferredBatch *deferred_pending = nullptr;
1662
1663     Sequencer *parent;
1664     BlueStore *store;
1665
1666     uint64_t last_seq = 0;
1667
1668     std::atomic_int txc_with_unstable_io = {0};  ///< num txcs with unstable io
1669
1670     std::atomic_int kv_committing_serially = {0};
1671
1672     std::atomic_int kv_submitted_waiters = {0};
1673
1674     std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
1675     std::atomic_bool zombie = {false};    ///< owning Sequencer has gone away
1676
1677     OpSequencer(CephContext* cct, BlueStore *store)
1678       : Sequencer_impl(cct),
1679         parent(NULL), store(store) {
1680       store->register_osr(this);
1681     }
1682     ~OpSequencer() override {
1683       assert(q.empty());
1684       _unregister();
1685     }
1686
1687     void discard() override {
1688       // Note that we may have txc's in flight when the parent Sequencer
1689       // goes away.  Reflect this with zombie==registered==true and let
1690       // _osr_drain_all clean up later.
1691       assert(!zombie);
1692       zombie = true;
1693       parent = nullptr;
1694       bool empty;
1695       {
1696         std::lock_guard<std::mutex> l(qlock);
1697         empty = q.empty();
1698       }
1699       if (empty) {
1700         _unregister();
1701       }
1702     }
1703
1704     void _unregister() {
1705       if (registered) {
1706         store->unregister_osr(this);
1707         registered = false;
1708       }
1709     }
1710
1711     void queue_new(TransContext *txc) {
1712       std::lock_guard<std::mutex> l(qlock);
1713       txc->seq = ++last_seq;
1714       q.push_back(*txc);
1715     }
1716
1717     void drain() {
1718       std::unique_lock<std::mutex> l(qlock);
1719       while (!q.empty())
1720         qcond.wait(l);
1721     }
1722
1723     void drain_preceding(TransContext *txc) {
1724       std::unique_lock<std::mutex> l(qlock);
1725       while (!q.empty() && &q.front() != txc)
1726         qcond.wait(l);
1727     }
1728
1729     bool _is_all_kv_submitted() {
1730       // caller must hold qlock
1731       if (q.empty()) {
1732         return true;
1733       }
1734       TransContext *txc = &q.back();
1735       if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1736         return true;
1737       }
1738       return false;
1739     }
1740
1741     void flush() override {
1742       std::unique_lock<std::mutex> l(qlock);
1743       while (true) {
1744         // set flag before the check because the condition
1745         // may become true outside qlock, and we need to make
1746         // sure those threads see waiters and signal qcond.
1747         ++kv_submitted_waiters;
1748         if (_is_all_kv_submitted()) {
1749           return;
1750         }
1751         qcond.wait(l);
1752         --kv_submitted_waiters;
1753       }
1754     }
1755
1756     bool flush_commit(Context *c) override {
1757       std::lock_guard<std::mutex> l(qlock);
1758       if (q.empty()) {
1759         return true;
1760       }
1761       TransContext *txc = &q.back();
1762       if (txc->state >= TransContext::STATE_KV_DONE) {
1763         return true;
1764       }
1765       txc->oncommits.push_back(c);
1766       return false;
1767     }
1768   };
1769
1770   typedef boost::intrusive::list<
1771     OpSequencer,
1772     boost::intrusive::member_hook<
1773       OpSequencer,
1774       boost::intrusive::list_member_hook<>,
1775       &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1776
1777   struct KVSyncThread : public Thread {
1778     BlueStore *store;
1779     explicit KVSyncThread(BlueStore *s) : store(s) {}
1780     void *entry() override {
1781       store->_kv_sync_thread();
1782       return NULL;
1783     }
1784   };
1785   struct KVFinalizeThread : public Thread {
1786     BlueStore *store;
1787     explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1788     void *entry() {
1789       store->_kv_finalize_thread();
1790       return NULL;
1791     }
1792   };
1793
1794   struct DBHistogram {
1795     struct value_dist {
1796       uint64_t count;
1797       uint32_t max_len;
1798     };
1799
1800     struct key_dist {
1801       uint64_t count;
1802       uint32_t max_len;
1803       map<int, struct value_dist> val_map; ///< slab id to count, max length of value and key
1804     };
1805
1806     map<string, map<int, struct key_dist> > key_hist;
1807     map<int, uint64_t> value_hist;
1808     int get_key_slab(size_t sz);
1809     string get_key_slab_to_range(int slab);
1810     int get_value_slab(size_t sz);
1811     string get_value_slab_to_range(int slab);
1812     void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1813                           const string &prefix, size_t key_size, size_t value_size);
1814     void dump(Formatter *f);
1815   };
1816
1817   // --------------------------------------------------------
1818   // members
1819 private:
1820   BlueFS *bluefs = nullptr;
1821   unsigned bluefs_shared_bdev = 0;  ///< which bluefs bdev we are sharing
1822   bool bluefs_single_shared_device = true;
1823   utime_t bluefs_last_balance;
1824
1825   KeyValueDB *db = nullptr;
1826   BlockDevice *bdev = nullptr;
1827   std::string freelist_type;
1828   FreelistManager *fm = nullptr;
1829   Allocator *alloc = nullptr;
1830   uuid_d fsid;
1831   int path_fd = -1;  ///< open handle to $path
1832   int fsid_fd = -1;  ///< open handle (locked) to $path/fsid
1833   bool mounted = false;
1834
1835   RWLock coll_lock = {"BlueStore::coll_lock"};  ///< rwlock to protect coll_map
1836   mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1837
1838   vector<Cache*> cache_shards;
1839
1840   std::mutex osr_lock;              ///< protect osd_set
1841   std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
1842
1843   std::atomic<uint64_t> nid_last = {0};
1844   std::atomic<uint64_t> nid_max = {0};
1845   std::atomic<uint64_t> blobid_last = {0};
1846   std::atomic<uint64_t> blobid_max = {0};
1847
1848   Throttle throttle_bytes;          ///< submit to commit
1849   Throttle throttle_deferred_bytes;  ///< submit to deferred complete
1850
1851   interval_set<uint64_t> bluefs_extents;  ///< block extents owned by bluefs
1852   interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1853
1854   std::mutex deferred_lock;
1855   std::atomic<uint64_t> deferred_seq = {0};
1856   deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1857   int deferred_queue_size = 0;         ///< num txc's queued across all osrs
1858   atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1859   Finisher deferred_finisher;
1860
1861   int m_finisher_num = 1;
1862   vector<Finisher*> finishers;
1863
1864   KVSyncThread kv_sync_thread;
1865   std::mutex kv_lock;
1866   std::condition_variable kv_cond;
1867   bool _kv_only = false;
1868   bool kv_sync_started = false;
1869   bool kv_stop = false;
1870   bool kv_finalize_started = false;
1871   bool kv_finalize_stop = false;
1872   deque<TransContext*> kv_queue;             ///< ready, already submitted
1873   deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1874   deque<TransContext*> kv_committing;        ///< currently syncing
1875   deque<DeferredBatch*> deferred_done_queue;   ///< deferred ios done
1876   deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
1877
1878   KVFinalizeThread kv_finalize_thread;
1879   std::mutex kv_finalize_lock;
1880   std::condition_variable kv_finalize_cond;
1881   deque<TransContext*> kv_committing_to_finalize;   ///< pending finalization
1882   deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
1883
1884   PerfCounters *logger = nullptr;
1885
1886   std::mutex reap_lock;
1887   list<CollectionRef> removed_collections;
1888
1889   RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
1890   set<ghobject_t> debug_data_error_objects;
1891   set<ghobject_t> debug_mdata_error_objects;
1892
1893   std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
1894
1895   uint64_t block_size = 0;     ///< block size of block device (power of 2)
1896   uint64_t block_mask = 0;     ///< mask to get just the block offset
1897   size_t block_size_order = 0; ///< bits to shift to get block size
1898
1899   uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
1900   ///< bits for min_alloc_size
1901   uint8_t min_alloc_size_order = 0;
1902   static_assert(std::numeric_limits<uint8_t>::max() >
1903                 std::numeric_limits<decltype(min_alloc_size)>::digits,
1904                 "not enough bits for min_alloc_size");
1905
1906   ///< maximum allocation unit (power of 2)
1907   std::atomic<uint64_t> max_alloc_size = {0};
1908
1909   ///< number threshold for forced deferred writes
1910   std::atomic<int> deferred_batch_ops = {0};
1911
1912   ///< size threshold for forced deferred writes
1913   std::atomic<uint64_t> prefer_deferred_size = {0};
1914
1915   ///< approx cost per io, in bytes
1916   std::atomic<uint64_t> throttle_cost_per_io = {0};
1917
1918   std::atomic<Compressor::CompressionMode> comp_mode =
1919     {Compressor::COMP_NONE}; ///< compression mode
1920   CompressorRef compressor;
1921   std::atomic<uint64_t> comp_min_blob_size = {0};
1922   std::atomic<uint64_t> comp_max_blob_size = {0};
1923
1924   std::atomic<uint64_t> max_blob_size = {0};  ///< maximum blob size
1925
1926   uint64_t kv_ios = 0;
1927   uint64_t kv_throttle_costs = 0;
1928
1929   // cache trim control
1930   uint64_t cache_size = 0;      ///< total cache size
1931   float cache_meta_ratio = 0;   ///< cache ratio dedicated to metadata
1932   float cache_kv_ratio = 0;     ///< cache ratio dedicated to kv (e.g., rocksdb)
1933   float cache_data_ratio = 0;   ///< cache ratio dedicated to object data
1934
1935   std::mutex vstatfs_lock;
1936   volatile_statfs vstatfs;
1937
1938   struct MempoolThread : public Thread {
1939     BlueStore *store;
1940     Cond cond;
1941     Mutex lock;
1942     bool stop = false;
1943   public:
1944     explicit MempoolThread(BlueStore *s)
1945       : store(s),
1946         lock("BlueStore::MempoolThread::lock") {}
1947     void *entry() override;
1948     void init() {
1949       assert(stop == false);
1950       create("bstore_mempool");
1951     }
1952     void shutdown() {
1953       lock.Lock();
1954       stop = true;
1955       cond.Signal();
1956       lock.Unlock();
1957       join();
1958     }
1959   } mempool_thread;
1960
1961   // --------------------------------------------------------
1962   // private methods
1963
1964   void _init_logger();
1965   void _shutdown_logger();
1966   int _reload_logger();
1967
1968   int _open_path();
1969   void _close_path();
1970   int _open_fsid(bool create);
1971   int _lock_fsid();
1972   int _read_fsid(uuid_d *f);
1973   int _write_fsid();
1974   void _close_fsid();
1975   void _set_alloc_sizes();
1976   void _set_blob_size();
1977
1978   int _open_bdev(bool create);
1979   void _close_bdev();
1980   int _open_db(bool create);
1981   void _close_db();
1982   int _open_fm(bool create);
1983   void _close_fm();
1984   int _open_alloc();
1985   void _close_alloc();
1986   int _open_collections(int *errors=0);
1987   void _close_collections();
1988
1989   int _setup_block_symlink_or_file(string name, string path, uint64_t size,
1990                                    bool create);
1991
1992 public:
1993   static int _write_bdev_label(CephContext* cct,
1994                                string path, bluestore_bdev_label_t label);
1995   static int _read_bdev_label(CephContext* cct, string path,
1996                               bluestore_bdev_label_t *label);
1997 private:
1998   int _check_or_set_bdev_label(string path, uint64_t size, string desc,
1999                                bool create);
2000
2001   int _open_super_meta();
2002
2003   void _open_statfs();
2004
2005   int _reconcile_bluefs_freespace();
2006   int _balance_bluefs_freespace(PExtentVector *extents);
2007   void _commit_bluefs_freespace(const PExtentVector& extents);
2008
2009   CollectionRef _get_collection(const coll_t& cid);
2010   void _queue_reap_collection(CollectionRef& c);
2011   void _reap_collections();
2012   void _update_cache_logger();
2013
2014   void _assign_nid(TransContext *txc, OnodeRef o);
2015   uint64_t _assign_blobid(TransContext *txc);
2016
2017   void _dump_onode(OnodeRef o, int log_level=30);
2018   void _dump_extent_map(ExtentMap& em, int log_level=30);
2019   void _dump_transaction(Transaction *t, int log_level = 30);
2020
2021   TransContext *_txc_create(OpSequencer *osr);
2022   void _txc_update_store_statfs(TransContext *txc);
2023   void _txc_add_transaction(TransContext *txc, Transaction *t);
2024   void _txc_calc_cost(TransContext *txc);
2025   void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2026   void _txc_state_proc(TransContext *txc);
2027   void _txc_aio_submit(TransContext *txc);
2028 public:
2029   void txc_aio_finish(void *p) {
2030     _txc_state_proc(static_cast<TransContext*>(p));
2031   }
2032 private:
2033   void _txc_finish_io(TransContext *txc);
2034   void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2035   void _txc_applied_kv(TransContext *txc);
2036   void _txc_committed_kv(TransContext *txc);
2037   void _txc_finish(TransContext *txc);
2038   void _txc_release_alloc(TransContext *txc);
2039
2040   void _osr_drain_preceding(TransContext *txc);
2041   void _osr_drain_all();
2042   void _osr_unregister_all();
2043
2044   void _kv_start();
2045   void _kv_stop();
2046   void _kv_sync_thread();
2047   void _kv_finalize_thread();
2048
2049   bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
2050   void _deferred_queue(TransContext *txc);
2051 public:
2052   void deferred_try_submit();
2053 private:
2054   void _deferred_submit_unlock(OpSequencer *osr);
2055   void _deferred_aio_finish(OpSequencer *osr);
2056   int _deferred_replay();
2057
2058 public:
2059   using mempool_dynamic_bitset =
2060     boost::dynamic_bitset<uint64_t,
2061                           mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2062
2063 private:
2064   int _fsck_check_extents(
2065     const ghobject_t& oid,
2066     const PExtentVector& extents,
2067     bool compressed,
2068     mempool_dynamic_bitset &used_blocks,
2069     store_statfs_t& expected_statfs);
2070
2071   void _buffer_cache_write(
2072     TransContext *txc,
2073     BlobRef b,
2074     uint64_t offset,
2075     bufferlist& bl,
2076     unsigned flags) {
2077     b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2078                              flags);
2079     txc->shared_blobs_written.insert(b->shared_blob);
2080   }
2081
2082   int _collection_list(
2083     Collection *c, const ghobject_t& start, const ghobject_t& end,
2084     int max, vector<ghobject_t> *ls, ghobject_t *next);
2085
2086   template <typename T, typename F>
2087   T select_option(const std::string& opt_name, T val1, F f) {
2088     //NB: opt_name reserved for future use
2089     boost::optional<T> val2 = f();
2090     if (val2) {
2091       return *val2;
2092     }
2093     return val1;
2094   }
2095
2096   void _apply_padding(uint64_t head_pad,
2097                       uint64_t tail_pad,
2098                       bufferlist& padded);
2099
2100   // -- ondisk version ---
2101 public:
2102   const int32_t latest_ondisk_format = 2;        ///< our version
2103   const int32_t min_readable_ondisk_format = 1;  ///< what we can read
2104   const int32_t min_compat_ondisk_format = 2;    ///< who can read us
2105
2106 private:
2107   int32_t ondisk_format = 0;  ///< value detected on mount
2108
2109   int _upgrade_super();  ///< upgrade (called during open_super)
2110   void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2111
2112   // --- public interface ---
2113 public:
2114   BlueStore(CephContext *cct, const string& path);
2115   BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for UT only
2116   ~BlueStore() override;
2117
2118   string get_type() override {
2119     return "bluestore";
2120   }
2121
2122   bool needs_journal() override { return false; };
2123   bool wants_journal() override { return false; };
2124   bool allows_journal() override { return false; };
2125
2126   bool is_rotational() override;
2127   bool is_journal_rotational() override;
2128
2129   string get_default_device_class() override {
2130     string device_class;
2131     map<string, string> metadata;
2132     collect_metadata(&metadata);
2133     auto it = metadata.find("bluestore_bdev_type");
2134     if (it != metadata.end()) {
2135       device_class = it->second;
2136     }
2137     return device_class;
2138   }
2139
2140   static int get_block_device_fsid(CephContext* cct, const string& path,
2141                                    uuid_d *fsid);
2142
2143   bool test_mount_in_use() override;
2144
2145 private:
2146   int _mount(bool kv_only);
2147 public:
2148   int mount() override {
2149     return _mount(false);
2150   }
2151   int umount() override;
2152
2153   int start_kv_only(KeyValueDB **pdb) {
2154     int r = _mount(true);
2155     if (r < 0)
2156       return r;
2157     *pdb = db;
2158     return 0;
2159   }
2160
2161   int write_meta(const std::string& key, const std::string& value) override;
2162   int read_meta(const std::string& key, std::string *value) override;
2163
2164
2165   int fsck(bool deep) override {
2166     return _fsck(deep, false);
2167   }
2168   int repair(bool deep) override {
2169     return _fsck(deep, true);
2170   }
2171   int _fsck(bool deep, bool repair);
2172
2173   void set_cache_shards(unsigned num) override;
2174
2175   int validate_hobject_key(const hobject_t &obj) const override {
2176     return 0;
2177   }
2178   unsigned get_max_attr_name_length() override {
2179     return 256;  // arbitrary; there is no real limit internally
2180   }
2181
2182   int mkfs() override;
2183   int mkjournal() override {
2184     return 0;
2185   }
2186
2187   void get_db_statistics(Formatter *f) override;
2188   void generate_db_histogram(Formatter *f) override;
2189   void _flush_cache();
2190   void flush_cache() override;
2191   void dump_perf_counters(Formatter *f) override {
2192     f->open_object_section("perf_counters");
2193     logger->dump_formatted(f, false);
2194     f->close_section();
2195   }
2196
2197   void register_osr(OpSequencer *osr) {
2198     std::lock_guard<std::mutex> l(osr_lock);
2199     osr_set.insert(osr);
2200   }
2201   void unregister_osr(OpSequencer *osr) {
2202     std::lock_guard<std::mutex> l(osr_lock);
2203     osr_set.erase(osr);
2204   }
2205
2206 public:
2207   int statfs(struct store_statfs_t *buf) override;
2208
2209   void collect_metadata(map<string,string> *pm) override;
2210
2211   bool exists(const coll_t& cid, const ghobject_t& oid) override;
2212   bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2213   int set_collection_opts(
2214     const coll_t& cid,
2215     const pool_opts_t& opts) override;
2216   int stat(
2217     const coll_t& cid,
2218     const ghobject_t& oid,
2219     struct stat *st,
2220     bool allow_eio = false) override;
2221   int stat(
2222     CollectionHandle &c,
2223     const ghobject_t& oid,
2224     struct stat *st,
2225     bool allow_eio = false) override;
2226   int read(
2227     const coll_t& cid,
2228     const ghobject_t& oid,
2229     uint64_t offset,
2230     size_t len,
2231     bufferlist& bl,
2232     uint32_t op_flags = 0) override;
2233   int read(
2234     CollectionHandle &c,
2235     const ghobject_t& oid,
2236     uint64_t offset,
2237     size_t len,
2238     bufferlist& bl,
2239     uint32_t op_flags = 0) override;
2240   int _do_read(
2241     Collection *c,
2242     OnodeRef o,
2243     uint64_t offset,
2244     size_t len,
2245     bufferlist& bl,
2246     uint32_t op_flags = 0);
2247
2248 private:
2249   int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2250              uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2251 public:
2252   int fiemap(const coll_t& cid, const ghobject_t& oid,
2253              uint64_t offset, size_t len, bufferlist& bl) override;
2254   int fiemap(CollectionHandle &c, const ghobject_t& oid,
2255              uint64_t offset, size_t len, bufferlist& bl) override;
2256   int fiemap(const coll_t& cid, const ghobject_t& oid,
2257              uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2258   int fiemap(CollectionHandle &c, const ghobject_t& oid,
2259              uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2260
2261
2262   int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
2263               bufferptr& value) override;
2264   int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2265               bufferptr& value) override;
2266
2267   int getattrs(const coll_t& cid, const ghobject_t& oid,
2268                map<string,bufferptr>& aset) override;
2269   int getattrs(CollectionHandle &c, const ghobject_t& oid,
2270                map<string,bufferptr>& aset) override;
2271
2272   int list_collections(vector<coll_t>& ls) override;
2273
2274   CollectionHandle open_collection(const coll_t &c) override;
2275
2276   bool collection_exists(const coll_t& c) override;
2277   int collection_empty(const coll_t& c, bool *empty) override;
2278   int collection_bits(const coll_t& c) override;
2279
2280   int collection_list(const coll_t& cid,
2281                       const ghobject_t& start,
2282                       const ghobject_t& end,
2283                       int max,
2284                       vector<ghobject_t> *ls, ghobject_t *next) override;
2285   int collection_list(CollectionHandle &c,
2286                       const ghobject_t& start,
2287                       const ghobject_t& end,
2288                       int max,
2289                       vector<ghobject_t> *ls, ghobject_t *next) override;
2290
2291   int omap_get(
2292     const coll_t& cid,                ///< [in] Collection containing oid
2293     const ghobject_t &oid,   ///< [in] Object containing omap
2294     bufferlist *header,      ///< [out] omap header
2295     map<string, bufferlist> *out /// < [out] Key to value map
2296     ) override;
2297   int omap_get(
2298     CollectionHandle &c,     ///< [in] Collection containing oid
2299     const ghobject_t &oid,   ///< [in] Object containing omap
2300     bufferlist *header,      ///< [out] omap header
2301     map<string, bufferlist> *out /// < [out] Key to value map
2302     ) override;
2303
2304   /// Get omap header
2305   int omap_get_header(
2306     const coll_t& cid,                ///< [in] Collection containing oid
2307     const ghobject_t &oid,   ///< [in] Object containing omap
2308     bufferlist *header,      ///< [out] omap header
2309     bool allow_eio = false ///< [in] don't assert on eio
2310     ) override;
2311   int omap_get_header(
2312     CollectionHandle &c,                ///< [in] Collection containing oid
2313     const ghobject_t &oid,   ///< [in] Object containing omap
2314     bufferlist *header,      ///< [out] omap header
2315     bool allow_eio = false ///< [in] don't assert on eio
2316     ) override;
2317
2318   /// Get keys defined on oid
2319   int omap_get_keys(
2320     const coll_t& cid,              ///< [in] Collection containing oid
2321     const ghobject_t &oid, ///< [in] Object containing omap
2322     set<string> *keys      ///< [out] Keys defined on oid
2323     ) override;
2324   int omap_get_keys(
2325     CollectionHandle &c,              ///< [in] Collection containing oid
2326     const ghobject_t &oid, ///< [in] Object containing omap
2327     set<string> *keys      ///< [out] Keys defined on oid
2328     ) override;
2329
2330   /// Get key values
2331   int omap_get_values(
2332     const coll_t& cid,                    ///< [in] Collection containing oid
2333     const ghobject_t &oid,       ///< [in] Object containing omap
2334     const set<string> &keys,     ///< [in] Keys to get
2335     map<string, bufferlist> *out ///< [out] Returned keys and values
2336     ) override;
2337   int omap_get_values(
2338     CollectionHandle &c,         ///< [in] Collection containing oid
2339     const ghobject_t &oid,       ///< [in] Object containing omap
2340     const set<string> &keys,     ///< [in] Keys to get
2341     map<string, bufferlist> *out ///< [out] Returned keys and values
2342     ) override;
2343
2344   /// Filters keys into out which are defined on oid
2345   int omap_check_keys(
2346     const coll_t& cid,                ///< [in] Collection containing oid
2347     const ghobject_t &oid,   ///< [in] Object containing omap
2348     const set<string> &keys, ///< [in] Keys to check
2349     set<string> *out         ///< [out] Subset of keys defined on oid
2350     ) override;
2351   int omap_check_keys(
2352     CollectionHandle &c,                ///< [in] Collection containing oid
2353     const ghobject_t &oid,   ///< [in] Object containing omap
2354     const set<string> &keys, ///< [in] Keys to check
2355     set<string> *out         ///< [out] Subset of keys defined on oid
2356     ) override;
2357
2358   ObjectMap::ObjectMapIterator get_omap_iterator(
2359     const coll_t& cid,              ///< [in] collection
2360     const ghobject_t &oid  ///< [in] object
2361     ) override;
2362   ObjectMap::ObjectMapIterator get_omap_iterator(
2363     CollectionHandle &c,   ///< [in] collection
2364     const ghobject_t &oid  ///< [in] object
2365     ) override;
2366
2367   void set_fsid(uuid_d u) override {
2368     fsid = u;
2369   }
2370   uuid_d get_fsid() override {
2371     return fsid;
2372   }
2373
2374   uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2375     return num_objects * 300; //assuming per-object overhead is 300 bytes
2376   }
2377
2378   struct BSPerfTracker {
2379     PerfCounters::avg_tracker<uint64_t> os_commit_latency;
2380     PerfCounters::avg_tracker<uint64_t> os_apply_latency;
2381
2382     objectstore_perf_stat_t get_cur_stats() const {
2383       objectstore_perf_stat_t ret;
2384       ret.os_commit_latency = os_commit_latency.current_avg();
2385       ret.os_apply_latency = os_apply_latency.current_avg();
2386       return ret;
2387     }
2388
2389     void update_from_perfcounters(PerfCounters &logger);
2390   } perf_tracker;
2391
2392   objectstore_perf_stat_t get_cur_stats() override {
2393     perf_tracker.update_from_perfcounters(*logger);
2394     return perf_tracker.get_cur_stats();
2395   }
2396   const PerfCounters* get_perf_counters() const override {
2397     return logger;
2398   }
2399
2400   int queue_transactions(
2401     Sequencer *osr,
2402     vector<Transaction>& tls,
2403     TrackedOpRef op = TrackedOpRef(),
2404     ThreadPool::TPHandle *handle = NULL) override;
2405
2406   // error injection
2407   void inject_data_error(const ghobject_t& o) override {
2408     RWLock::WLocker l(debug_read_error_lock);
2409     debug_data_error_objects.insert(o);
2410   }
2411   void inject_mdata_error(const ghobject_t& o) override {
2412     RWLock::WLocker l(debug_read_error_lock);
2413     debug_mdata_error_objects.insert(o);
2414   }
2415   void compact() override {
2416     assert(db);
2417     db->compact();
2418   }
2419   
2420 private:
2421   bool _debug_data_eio(const ghobject_t& o) {
2422     if (!cct->_conf->bluestore_debug_inject_read_err) {
2423       return false;
2424     }
2425     RWLock::RLocker l(debug_read_error_lock);
2426     return debug_data_error_objects.count(o);
2427   }
2428   bool _debug_mdata_eio(const ghobject_t& o) {
2429     if (!cct->_conf->bluestore_debug_inject_read_err) {
2430       return false;
2431     }
2432     RWLock::RLocker l(debug_read_error_lock);
2433     return debug_mdata_error_objects.count(o);
2434   }
2435   void _debug_obj_on_delete(const ghobject_t& o) {
2436     if (cct->_conf->bluestore_debug_inject_read_err) {
2437       RWLock::WLocker l(debug_read_error_lock);
2438       debug_data_error_objects.erase(o);
2439       debug_mdata_error_objects.erase(o);
2440     }
2441   }
2442
2443 private:
2444
2445   // --------------------------------------------------------
2446   // read processing internal methods
2447   int _verify_csum(
2448     OnodeRef& o,
2449     const bluestore_blob_t* blob,
2450     uint64_t blob_xoffset,
2451     const bufferlist& bl,
2452     uint64_t logical_offset) const;
2453   int _decompress(bufferlist& source, bufferlist* result);
2454
2455
2456   // --------------------------------------------------------
2457   // write ops
2458
2459   struct WriteContext {
2460     bool buffered = false;          ///< buffered write
2461     bool compress = false;          ///< compressed write
2462     uint64_t target_blob_size = 0;  ///< target (max) blob size
2463     unsigned csum_order = 0;        ///< target checksum chunk order
2464
2465     old_extent_map_t old_extents;   ///< must deref these blobs
2466
2467     struct write_item {
2468       uint64_t logical_offset;      ///< write logical offset
2469       BlobRef b;
2470       uint64_t blob_length;
2471       uint64_t b_off;
2472       bufferlist bl;
2473       uint64_t b_off0; ///< original offset in a blob prior to padding
2474       uint64_t length0; ///< original data length prior to padding
2475
2476       bool mark_unused;
2477       bool new_blob; ///< whether new blob was created
2478
2479       bool compressed = false;
2480       bufferlist compressed_bl;
2481       size_t compressed_len = 0;
2482
2483       write_item(
2484         uint64_t logical_offs,
2485         BlobRef b,
2486         uint64_t blob_len,
2487         uint64_t o,
2488         bufferlist& bl,
2489         uint64_t o0,
2490         uint64_t l0,
2491         bool _mark_unused,
2492         bool _new_blob)
2493        :
2494          logical_offset(logical_offs),
2495          b(b),
2496          blob_length(blob_len),
2497          b_off(o),
2498          bl(bl),
2499          b_off0(o0),
2500          length0(l0),
2501          mark_unused(_mark_unused),
2502          new_blob(_new_blob) {}
2503     };
2504     vector<write_item> writes;                 ///< blobs we're writing
2505
2506     /// partial clone of the context
2507     void fork(const WriteContext& other) {
2508       buffered = other.buffered;
2509       compress = other.compress;
2510       target_blob_size = other.target_blob_size;
2511       csum_order = other.csum_order;
2512     }
2513     void write(
2514       uint64_t loffs,
2515       BlobRef b,
2516       uint64_t blob_len,
2517       uint64_t o,
2518       bufferlist& bl,
2519       uint64_t o0,
2520       uint64_t len0,
2521       bool _mark_unused,
2522       bool _new_blob) {
2523       writes.emplace_back(loffs,
2524                           b,
2525                           blob_len,
2526                           o,
2527                           bl,
2528                           o0,
2529                           len0,
2530                           _mark_unused,
2531                           _new_blob);
2532     }
2533     /// Checks for writes to the same pextent within a blob
2534     bool has_conflict(
2535       BlobRef b,
2536       uint64_t loffs,
2537       uint64_t loffs_end,
2538       uint64_t min_alloc_size);
2539   };
2540
2541   void _do_write_small(
2542     TransContext *txc,
2543     CollectionRef &c,
2544     OnodeRef o,
2545     uint64_t offset, uint64_t length,
2546     bufferlist::iterator& blp,
2547     WriteContext *wctx);
2548   void _do_write_big(
2549     TransContext *txc,
2550     CollectionRef &c,
2551     OnodeRef o,
2552     uint64_t offset, uint64_t length,
2553     bufferlist::iterator& blp,
2554     WriteContext *wctx);
2555   int _do_alloc_write(
2556     TransContext *txc,
2557     CollectionRef c,
2558     OnodeRef o,
2559     WriteContext *wctx);
2560   void _wctx_finish(
2561     TransContext *txc,
2562     CollectionRef& c,
2563     OnodeRef o,
2564     WriteContext *wctx,
2565     set<SharedBlob*> *maybe_unshared_blobs=0);
2566
2567   int _do_transaction(Transaction *t,
2568                       TransContext *txc,
2569                       ThreadPool::TPHandle *handle);
2570
2571   int _write(TransContext *txc,
2572              CollectionRef& c,
2573              OnodeRef& o,
2574              uint64_t offset, size_t len,
2575              bufferlist& bl,
2576              uint32_t fadvise_flags);
2577   void _pad_zeros(bufferlist *bl, uint64_t *offset,
2578                   uint64_t chunk_size);
2579
2580   void _choose_write_options(CollectionRef& c,
2581                              OnodeRef o,
2582                              uint32_t fadvise_flags,
2583                              WriteContext *wctx);
2584
2585   int _do_gc(TransContext *txc,
2586              CollectionRef& c,
2587              OnodeRef o,
2588              const GarbageCollector& gc,
2589              const WriteContext& wctx,
2590              uint64_t *dirty_start,
2591              uint64_t *dirty_end);
2592
2593   int _do_write(TransContext *txc,
2594                 CollectionRef &c,
2595                 OnodeRef o,
2596                 uint64_t offset, uint64_t length,
2597                 bufferlist& bl,
2598                 uint32_t fadvise_flags);
2599   void _do_write_data(TransContext *txc,
2600                       CollectionRef& c,
2601                       OnodeRef o,
2602                       uint64_t offset,
2603                       uint64_t length,
2604                       bufferlist& bl,
2605                       WriteContext *wctx);
2606
2607   int _touch(TransContext *txc,
2608              CollectionRef& c,
2609              OnodeRef& o);
2610   int _do_zero(TransContext *txc,
2611                CollectionRef& c,
2612                OnodeRef& o,
2613                uint64_t offset, size_t len);
2614   int _zero(TransContext *txc,
2615             CollectionRef& c,
2616             OnodeRef& o,
2617             uint64_t offset, size_t len);
2618   void _do_truncate(TransContext *txc,
2619                    CollectionRef& c,
2620                    OnodeRef o,
2621                    uint64_t offset,
2622                    set<SharedBlob*> *maybe_unshared_blobs=0);
2623   int _truncate(TransContext *txc,
2624                 CollectionRef& c,
2625                 OnodeRef& o,
2626                 uint64_t offset);
2627   int _remove(TransContext *txc,
2628               CollectionRef& c,
2629               OnodeRef& o);
2630   int _do_remove(TransContext *txc,
2631                  CollectionRef& c,
2632                  OnodeRef o);
2633   int _setattr(TransContext *txc,
2634                CollectionRef& c,
2635                OnodeRef& o,
2636                const string& name,
2637                bufferptr& val);
2638   int _setattrs(TransContext *txc,
2639                 CollectionRef& c,
2640                 OnodeRef& o,
2641                 const map<string,bufferptr>& aset);
2642   int _rmattr(TransContext *txc,
2643               CollectionRef& c,
2644               OnodeRef& o,
2645               const string& name);
2646   int _rmattrs(TransContext *txc,
2647                CollectionRef& c,
2648                OnodeRef& o);
2649   void _do_omap_clear(TransContext *txc, uint64_t id);
2650   int _omap_clear(TransContext *txc,
2651                   CollectionRef& c,
2652                   OnodeRef& o);
2653   int _omap_setkeys(TransContext *txc,
2654                     CollectionRef& c,
2655                     OnodeRef& o,
2656                     bufferlist& bl);
2657   int _omap_setheader(TransContext *txc,
2658                       CollectionRef& c,
2659                       OnodeRef& o,
2660                       bufferlist& header);
2661   int _omap_rmkeys(TransContext *txc,
2662                    CollectionRef& c,
2663                    OnodeRef& o,
2664                    bufferlist& bl);
2665   int _omap_rmkey_range(TransContext *txc,
2666                         CollectionRef& c,
2667                         OnodeRef& o,
2668                         const string& first, const string& last);
2669   int _set_alloc_hint(
2670     TransContext *txc,
2671     CollectionRef& c,
2672     OnodeRef& o,
2673     uint64_t expected_object_size,
2674     uint64_t expected_write_size,
2675     uint32_t flags);
2676   int _do_clone_range(TransContext *txc,
2677                       CollectionRef& c,
2678                       OnodeRef& oldo,
2679                       OnodeRef& newo,
2680                       uint64_t srcoff, uint64_t length, uint64_t dstoff);
2681   int _clone(TransContext *txc,
2682              CollectionRef& c,
2683              OnodeRef& oldo,
2684              OnodeRef& newo);
2685   int _clone_range(TransContext *txc,
2686                    CollectionRef& c,
2687                    OnodeRef& oldo,
2688                    OnodeRef& newo,
2689                    uint64_t srcoff, uint64_t length, uint64_t dstoff);
2690   int _rename(TransContext *txc,
2691               CollectionRef& c,
2692               OnodeRef& oldo,
2693               OnodeRef& newo,
2694               const ghobject_t& new_oid);
2695   int _create_collection(TransContext *txc, const coll_t &cid,
2696                          unsigned bits, CollectionRef *c);
2697   int _remove_collection(TransContext *txc, const coll_t &cid,
2698                          CollectionRef *c);
2699   int _split_collection(TransContext *txc,
2700                         CollectionRef& c,
2701                         CollectionRef& d,
2702                         unsigned bits, int rem);
2703 };
2704
2705 inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
2706   return out << *s.parent;
2707 }
2708
2709 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
2710   o->get();
2711 }
2712 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
2713   o->put();
2714 }
2715
2716 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
2717   o->get();
2718 }
2719 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
2720   o->put();
2721 }
2722
2723 #endif