Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / FileStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15
16 #ifndef CEPH_FILESTORE_H
17 #define CEPH_FILESTORE_H
18
19 #include "include/types.h"
20
21 #include <map>
22 #include <deque>
23 #include <atomic>
24 #include <fstream>
25
26 using namespace std;
27
28 #include <boost/scoped_ptr.hpp>
29
30 #include "include/unordered_map.h"
31
32 #include "include/assert.h"
33
34 #include "os/ObjectStore.h"
35 #include "JournalingObjectStore.h"
36
37 #include "common/Timer.h"
38 #include "common/WorkQueue.h"
39 #include "common/perf_counters.h"
40 #include "common/zipkin_trace.h"
41
42 #include "common/Mutex.h"
43 #include "HashIndex.h"
44 #include "IndexManager.h"
45 #include "os/ObjectMap.h"
46 #include "SequencerPosition.h"
47 #include "FDCache.h"
48 #include "WBThrottle.h"
49
50 #include "include/uuid.h"
51
52
53 // from include/linux/falloc.h:
54 #ifndef FALLOC_FL_PUNCH_HOLE
55 # define FALLOC_FL_PUNCH_HOLE 0x2
56 #endif
57
58 #if defined(__linux__)
59 # ifndef BTRFS_SUPER_MAGIC
60 #define BTRFS_SUPER_MAGIC 0x9123683EL
61 # endif
62 # ifndef XFS_SUPER_MAGIC
63 #define XFS_SUPER_MAGIC 0x58465342L
64 # endif
65 # ifndef ZFS_SUPER_MAGIC
66 #define ZFS_SUPER_MAGIC 0x2fc12fc1L
67 # endif
68 #endif
69
70
71 class FileStoreBackend;
72
73 #define CEPH_FS_FEATURE_INCOMPAT_SHARDS CompatSet::Feature(1, "sharded objects")
74
75 enum {
76   l_filestore_first = 84000,
77   l_filestore_journal_queue_ops,
78   l_filestore_journal_queue_bytes,
79   l_filestore_journal_ops,
80   l_filestore_journal_bytes,
81   l_filestore_journal_latency,
82   l_filestore_journal_wr,
83   l_filestore_journal_wr_bytes,
84   l_filestore_journal_full,
85   l_filestore_committing,
86   l_filestore_commitcycle,
87   l_filestore_commitcycle_interval,
88   l_filestore_commitcycle_latency,
89   l_filestore_op_queue_max_ops,
90   l_filestore_op_queue_ops,
91   l_filestore_ops,
92   l_filestore_op_queue_max_bytes,
93   l_filestore_op_queue_bytes,
94   l_filestore_bytes,
95   l_filestore_apply_latency,
96   l_filestore_queue_transaction_latency_avg,
97   l_filestore_sync_pause_max_lat,
98   l_filestore_last,
99 };
100
101 class FSSuperblock {
102 public:
103   CompatSet compat_features;
104   string omap_backend;
105
106   FSSuperblock() { }
107
108   void encode(bufferlist &bl) const;
109   void decode(bufferlist::iterator &bl);
110   void dump(Formatter *f) const;
111   static void generate_test_instances(list<FSSuperblock*>& o);
112 };
113 WRITE_CLASS_ENCODER(FSSuperblock)
114
115 inline ostream& operator<<(ostream& out, const FSSuperblock& sb)
116 {
117   return out << "sb(" << sb.compat_features << "): "
118              << sb.omap_backend;
119 }
120
121 class FileStore : public JournalingObjectStore,
122                   public md_config_obs_t
123 {
124   static const uint32_t target_version = 4;
125 public:
126   uint32_t get_target_version() {
127     return target_version;
128   }
129
130   static int get_block_device_fsid(CephContext* cct, const string& path,
131                                    uuid_d *fsid);
132   struct FSPerfTracker {
133     PerfCounters::avg_tracker<uint64_t> os_commit_latency;
134     PerfCounters::avg_tracker<uint64_t> os_apply_latency;
135
136     objectstore_perf_stat_t get_cur_stats() const {
137       objectstore_perf_stat_t ret;
138       ret.os_commit_latency = os_commit_latency.current_avg();
139       ret.os_apply_latency = os_apply_latency.current_avg();
140       return ret;
141     }
142
143     void update_from_perfcounters(PerfCounters &logger);
144   } perf_tracker;
145   objectstore_perf_stat_t get_cur_stats() override {
146     perf_tracker.update_from_perfcounters(*logger);
147     return perf_tracker.get_cur_stats();
148   }
149   const PerfCounters* get_perf_counters() const override {
150     return logger;
151   }
152
153 private:
154   string internal_name;         ///< internal name, used to name the perfcounter instance
155   string basedir, journalpath;
156   osflagbits_t generic_flags;
157   std::string current_fn;
158   std::string current_op_seq_fn;
159   std::string omap_dir;
160   uuid_d fsid;
161
162   size_t blk_size;            ///< fs block size
163
164   int fsid_fd, op_fd, basedir_fd, current_fd;
165
166   FileStoreBackend *backend;
167
168   void create_backend(long f_type);
169
170   deque<uint64_t> snaps;
171
172   // Indexed Collections
173   IndexManager index_manager;
174   int get_index(const coll_t& c, Index *index);
175   int init_index(const coll_t& c);
176
177   bool _need_temp_object_collection(const coll_t& cid, const ghobject_t& oid) {
178     // - normal temp case: cid is pg, object is temp (pool < -1)
179     // - hammer temp case: cid is pg (or already temp), object pool is -1
180     return cid.is_pg() && oid.hobj.pool <= -1;
181   }
182   void init_temp_collections();
183
184   // ObjectMap
185   boost::scoped_ptr<ObjectMap> object_map;
186
187   // helper fns
188   int get_cdir(const coll_t& cid, char *s, int len);
189
190   /// read a uuid from fd
191   int read_fsid(int fd, uuid_d *uuid);
192
193   /// lock fsid_fd
194   int lock_fsid();
195
196   // sync thread
197   Mutex lock;
198   bool force_sync;
199   Cond sync_cond;
200
201   Mutex sync_entry_timeo_lock;
202   SafeTimer timer;
203
204   list<Context*> sync_waiters;
205   bool stop;
206   void sync_entry();
207   struct SyncThread : public Thread {
208     FileStore *fs;
209     explicit SyncThread(FileStore *f) : fs(f) {}
210     void *entry() override {
211       fs->sync_entry();
212       return 0;
213     }
214   } sync_thread;
215
216   // -- op workqueue --
217   struct Op {
218     utime_t start;
219     uint64_t op;
220     vector<Transaction> tls;
221     Context *onreadable, *onreadable_sync;
222     uint64_t ops, bytes;
223     TrackedOpRef osd_op;
224     ZTracer::Trace trace;
225   };
226   class OpSequencer : public Sequencer_impl {
227     Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
228     list<Op*> q;
229     list<uint64_t> jq;
230     list<pair<uint64_t, Context*> > flush_commit_waiters;
231     Cond cond;
232   public:
233     Sequencer *parent;
234     Mutex apply_lock;  // for apply mutual exclusion
235     int id;
236
237     /// get_max_uncompleted
238     bool _get_max_uncompleted(
239       uint64_t *seq ///< [out] max uncompleted seq
240       ) {
241       assert(qlock.is_locked());
242       assert(seq);
243       *seq = 0;
244       if (q.empty() && jq.empty())
245         return true;
246
247       if (!q.empty())
248         *seq = q.back()->op;
249       if (!jq.empty() && jq.back() > *seq)
250         *seq = jq.back();
251
252       return false;
253     } /// @returns true if both queues are empty
254
255     /// get_min_uncompleted
256     bool _get_min_uncompleted(
257       uint64_t *seq ///< [out] min uncompleted seq
258       ) {
259       assert(qlock.is_locked());
260       assert(seq);
261       *seq = 0;
262       if (q.empty() && jq.empty())
263         return true;
264
265       if (!q.empty())
266         *seq = q.front()->op;
267       if (!jq.empty() && jq.front() < *seq)
268         *seq = jq.front();
269
270       return false;
271     } /// @returns true if both queues are empty
272
273     void _wake_flush_waiters(list<Context*> *to_queue) {
274       uint64_t seq;
275       if (_get_min_uncompleted(&seq))
276         seq = -1;
277
278       for (list<pair<uint64_t, Context*> >::iterator i =
279              flush_commit_waiters.begin();
280            i != flush_commit_waiters.end() && i->first < seq;
281            flush_commit_waiters.erase(i++)) {
282         to_queue->push_back(i->second);
283       }
284     }
285
286     void queue_journal(uint64_t s) {
287       Mutex::Locker l(qlock);
288       jq.push_back(s);
289     }
290     void dequeue_journal(list<Context*> *to_queue) {
291       Mutex::Locker l(qlock);
292       jq.pop_front();
293       cond.Signal();
294       _wake_flush_waiters(to_queue);
295     }
296     void queue(Op *o) {
297       Mutex::Locker l(qlock);
298       q.push_back(o);
299       o->trace.keyval("queue depth", q.size());
300     }
301     Op *peek_queue() {
302       Mutex::Locker l(qlock);
303       assert(apply_lock.is_locked());
304       return q.front();
305     }
306
307     Op *dequeue(list<Context*> *to_queue) {
308       assert(to_queue);
309       assert(apply_lock.is_locked());
310       Mutex::Locker l(qlock);
311       Op *o = q.front();
312       q.pop_front();
313       cond.Signal();
314
315       _wake_flush_waiters(to_queue);
316       return o;
317     }
318
319     void flush() override {
320       Mutex::Locker l(qlock);
321
322       while (cct->_conf->filestore_blackhole)
323         cond.Wait(qlock);  // wait forever
324
325
326       // get max for journal _or_ op queues
327       uint64_t seq = 0;
328       if (!q.empty())
329         seq = q.back()->op;
330       if (!jq.empty() && jq.back() > seq)
331         seq = jq.back();
332
333       if (seq) {
334         // everything prior to our watermark to drain through either/both queues
335         while ((!q.empty() && q.front()->op <= seq) ||
336                (!jq.empty() && jq.front() <= seq))
337           cond.Wait(qlock);
338       }
339     }
340     bool flush_commit(Context *c) override {
341       Mutex::Locker l(qlock);
342       uint64_t seq = 0;
343       if (_get_max_uncompleted(&seq)) {
344         return true;
345       } else {
346         flush_commit_waiters.push_back(make_pair(seq, c));
347         return false;
348       }
349     }
350
351     OpSequencer(CephContext* cct, int i)
352       : Sequencer_impl(cct),
353         qlock("FileStore::OpSequencer::qlock", false, false),
354         parent(0),
355         apply_lock("FileStore::OpSequencer::apply_lock", false, false),
356         id(i) {}
357     ~OpSequencer() override {
358       assert(q.empty());
359     }
360
361     const string& get_name() const {
362       return parent->get_name();
363     }
364   };
365
366   friend ostream& operator<<(ostream& out, const OpSequencer& s);
367
368   FDCache fdcache;
369   WBThrottle wbthrottle;
370
371   std::atomic<int64_t> next_osr_id = { 0 };
372   bool m_disable_wbthrottle;
373   deque<OpSequencer*> op_queue;
374   BackoffThrottle throttle_ops, throttle_bytes;
375   const int m_ondisk_finisher_num;
376   const int m_apply_finisher_num;
377   vector<Finisher*> ondisk_finishers;
378   vector<Finisher*> apply_finishers;
379
380   ThreadPool op_tp;
381   struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
382     FileStore *store;
383     OpWQ(FileStore *fs, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
384       : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", timeout, suicide_timeout, tp), store(fs) {}
385
386     bool _enqueue(OpSequencer *osr) override {
387       store->op_queue.push_back(osr);
388       return true;
389     }
390     void _dequeue(OpSequencer *o) override {
391       ceph_abort();
392     }
393     bool _empty() override {
394       return store->op_queue.empty();
395     }
396     OpSequencer *_dequeue() override {
397       if (store->op_queue.empty())
398         return NULL;
399       OpSequencer *osr = store->op_queue.front();
400       store->op_queue.pop_front();
401       return osr;
402     }
403     void _process(OpSequencer *osr, ThreadPool::TPHandle &handle) override {
404       store->_do_op(osr, handle);
405     }
406     void _process_finish(OpSequencer *osr) override {
407       store->_finish_op(osr);
408     }
409     void _clear() override {
410       assert(store->op_queue.empty());
411     }
412   } op_wq;
413
414   void _do_op(OpSequencer *o, ThreadPool::TPHandle &handle);
415   void _finish_op(OpSequencer *o);
416   Op *build_op(vector<Transaction>& tls,
417                Context *onreadable, Context *onreadable_sync,
418                TrackedOpRef osd_op);
419   void queue_op(OpSequencer *osr, Op *o);
420   void op_queue_reserve_throttle(Op *o);
421   void op_queue_release_throttle(Op *o);
422   void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
423   friend struct C_JournaledAhead;
424
425   void new_journal();
426
427   PerfCounters *logger;
428
429   ZTracer::Endpoint trace_endpoint;
430
431 public:
432   int lfn_find(const ghobject_t& oid, const Index& index,
433                                   IndexedPath *path = NULL);
434   int lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length);
435   int lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf);
436   int lfn_open(
437     const coll_t& cid,
438     const ghobject_t& oid,
439     bool create,
440     FDRef *outfd,
441     Index *index = 0);
442
443   void lfn_close(FDRef fd);
444   int lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid) ;
445   int lfn_unlink(const coll_t& cid, const ghobject_t& o, const SequencerPosition &spos,
446                  bool force_clear_omap=false);
447
448 public:
449   FileStore(CephContext* cct, const std::string &base, const std::string &jdev,
450             osflagbits_t flags = 0,
451     const char *internal_name = "filestore", bool update_to=false);
452   ~FileStore() override;
453
454   string get_type() override {
455     return "filestore";
456   }
457
458   int _detect_fs();
459   int _sanity_check_fs();
460
461   bool test_mount_in_use() override;
462   int read_op_seq(uint64_t *seq);
463   int write_op_seq(int, uint64_t seq);
464   int mount() override;
465   int umount() override;
466
467   int validate_hobject_key(const hobject_t &obj) const override;
468
469   unsigned get_max_attr_name_length() override {
470     // xattr limit is 128; leave room for our prefixes (user.ceph._),
471     // some margin, and cap at 100
472     return 100;
473   }
474   int mkfs() override;
475   int mkjournal() override;
476   bool wants_journal() override {
477     return true;
478   }
479   bool allows_journal() override {
480     return true;
481   }
482   bool needs_journal() override {
483     return false;
484   }
485
486   bool is_rotational() override;
487   bool is_journal_rotational() override;
488
489   void dump_perf_counters(Formatter *f) override {
490     f->open_object_section("perf_counters");
491     logger->dump_formatted(f, false);
492     f->close_section();
493   }
494
495   int write_version_stamp();
496   int version_stamp_is_valid(uint32_t *version);
497   int update_version_stamp();
498   int upgrade() override;
499
500   bool can_sort_nibblewise() override {
501     return true;    // i support legacy sort order
502   }
503
504   void collect_metadata(map<string,string> *pm) override;
505
506   int statfs(struct store_statfs_t *buf) override;
507
508   int _do_transactions(
509     vector<Transaction> &tls, uint64_t op_seq,
510     ThreadPool::TPHandle *handle);
511   int do_transactions(vector<Transaction> &tls, uint64_t op_seq) override {
512     return _do_transactions(tls, op_seq, 0);
513   }
514   void _do_transaction(
515     Transaction& t, uint64_t op_seq, int trans_num,
516     ThreadPool::TPHandle *handle);
517
518   int queue_transactions(Sequencer *osr, vector<Transaction>& tls,
519                          TrackedOpRef op = TrackedOpRef(),
520                          ThreadPool::TPHandle *handle = NULL) override;
521
522   /**
523    * set replay guard xattr on given file
524    *
525    * This will ensure that we will not replay this (or any previous) operation
526    * against this particular inode/object.
527    *
528    * @param fd open file descriptor for the file/object
529    * @param spos sequencer position of the last operation we should not replay
530    */
531   void _set_replay_guard(int fd,
532                          const SequencerPosition& spos,
533                          const ghobject_t *oid=0,
534                          bool in_progress=false);
535   void _set_replay_guard(const coll_t& cid,
536                          const SequencerPosition& spos,
537                          bool in_progress);
538   void _set_global_replay_guard(const coll_t& cid,
539                                 const SequencerPosition &spos);
540
541   /// close a replay guard opened with in_progress=true
542   void _close_replay_guard(int fd, const SequencerPosition& spos,
543                            const ghobject_t *oid=0);
544   void _close_replay_guard(const coll_t& cid, const SequencerPosition& spos);
545
546   /**
547    * check replay guard xattr on given file
548    *
549    * Check the current position against any marker on the file that
550    * indicates which operations have already been applied.  If the
551    * current or a newer operation has been marked as applied, we
552    * should not replay the current operation again.
553    *
554    * If we are not replaying the journal, we already return true.  It
555    * is only on replay that we might return false, indicated that the
556    * operation should not be performed (again).
557    *
558    * @param fd open fd on the file/object in question
559    * @param spos sequencerposition for an operation we could apply/replay
560    * @return 1 if we can apply (maybe replay) this operation, -1 if spos has already been applied, 0 if it was in progress
561    */
562   int _check_replay_guard(int fd, const SequencerPosition& spos);
563   int _check_replay_guard(const coll_t& cid, const SequencerPosition& spos);
564   int _check_replay_guard(const coll_t& cid, const ghobject_t &oid, const SequencerPosition& pos);
565   int _check_global_replay_guard(const coll_t& cid, const SequencerPosition& spos);
566
567   // ------------------
568   // objects
569   int pick_object_revision_lt(ghobject_t& oid) {
570     return 0;
571   }
572   using ObjectStore::exists;
573   bool exists(const coll_t& cid, const ghobject_t& oid) override;
574   using ObjectStore::stat;
575   int stat(
576     const coll_t& cid,
577     const ghobject_t& oid,
578     struct stat *st,
579     bool allow_eio = false) override;
580   using ObjectStore::set_collection_opts;
581   int set_collection_opts(
582     const coll_t& cid,
583     const pool_opts_t& opts) override;
584   using ObjectStore::read;
585   int read(
586     const coll_t& cid,
587     const ghobject_t& oid,
588     uint64_t offset,
589     size_t len,
590     bufferlist& bl,
591     uint32_t op_flags = 0) override;
592   int _do_fiemap(int fd, uint64_t offset, size_t len,
593                  map<uint64_t, uint64_t> *m);
594   int _do_seek_hole_data(int fd, uint64_t offset, size_t len,
595                          map<uint64_t, uint64_t> *m);
596   using ObjectStore::fiemap;
597   int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, bufferlist& bl) override;
598   int fiemap(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
599
600   int _touch(const coll_t& cid, const ghobject_t& oid);
601   int _write(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len,
602               const bufferlist& bl, uint32_t fadvise_flags = 0);
603   int _zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len);
604   int _truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size);
605   int _clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
606              const SequencerPosition& spos);
607   int _clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
608                    uint64_t srcoff, uint64_t len, uint64_t dstoff,
609                    const SequencerPosition& spos);
610   int _do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff);
611   int _do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff);
612   int _do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc=false);
613   int _remove(const coll_t& cid, const ghobject_t& oid, const SequencerPosition &spos);
614
615   int _fgetattr(int fd, const char *name, bufferptr& bp);
616   int _fgetattrs(int fd, map<string,bufferptr>& aset);
617   int _fsetattrs(int fd, map<string, bufferptr> &aset);
618
619   void _start_sync();
620
621   void do_force_sync();
622   void start_sync(Context *onsafe);
623   void sync();
624   void _flush_op_queue();
625   void flush();
626   void sync_and_flush();
627
628   int flush_journal() override;
629   int dump_journal(ostream& out) override;
630
631   void set_fsid(uuid_d u) override {
632     fsid = u;
633   }
634   uuid_d get_fsid() override { return fsid; }
635   
636   uint64_t estimate_objects_overhead(uint64_t num_objects) override;
637
638   // DEBUG read error injection, an object is removed from both on delete()
639   Mutex read_error_lock;
640   set<ghobject_t> data_error_set; // read() will return -EIO
641   set<ghobject_t> mdata_error_set; // getattr(),stat() will return -EIO
642   void inject_data_error(const ghobject_t &oid) override;
643   void inject_mdata_error(const ghobject_t &oid) override;
644
645   void compact() override {
646     assert(object_map);
647     object_map->compact();
648   }
649
650   void debug_obj_on_delete(const ghobject_t &oid);
651   bool debug_data_eio(const ghobject_t &oid);
652   bool debug_mdata_eio(const ghobject_t &oid);
653
654   int snapshot(const string& name) override;
655
656   // attrs
657   using ObjectStore::getattr;
658   using ObjectStore::getattrs;
659   int getattr(const coll_t& cid, const ghobject_t& oid, const char *name, bufferptr &bp) override;
660   int getattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset) override;
661
662   int _setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
663                 const SequencerPosition &spos);
664   int _rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
665               const SequencerPosition &spos);
666   int _rmattrs(const coll_t& cid, const ghobject_t& oid,
667                const SequencerPosition &spos);
668
669   int _collection_remove_recursive(const coll_t &cid,
670                                    const SequencerPosition &spos);
671
672   int _collection_set_bits(const coll_t& cid, int bits);
673
674   // collections
675   using ObjectStore::collection_list;
676   int collection_bits(const coll_t& c) override;
677   int collection_list(const coll_t& c,
678                       const ghobject_t& start, const ghobject_t& end, int max,
679                       vector<ghobject_t> *ls, ghobject_t *next) override;
680   int list_collections(vector<coll_t>& ls) override;
681   int list_collections(vector<coll_t>& ls, bool include_temp);
682   int collection_stat(const coll_t& c, struct stat *st);
683   bool collection_exists(const coll_t& c) override;
684   int collection_empty(const coll_t& c, bool *empty) override;
685
686   // omap (see ObjectStore.h for documentation)
687   using ObjectStore::omap_get;
688   int omap_get(const coll_t& c, const ghobject_t &oid, bufferlist *header,
689                map<string, bufferlist> *out) override;
690   using ObjectStore::omap_get_header;
691   int omap_get_header(
692     const coll_t& c,
693     const ghobject_t &oid,
694     bufferlist *out,
695     bool allow_eio = false) override;
696   using ObjectStore::omap_get_keys;
697   int omap_get_keys(const coll_t& c, const ghobject_t &oid, set<string> *keys) override;
698   using ObjectStore::omap_get_values;
699   int omap_get_values(const coll_t& c, const ghobject_t &oid, const set<string> &keys,
700                       map<string, bufferlist> *out) override;
701   using ObjectStore::omap_check_keys;
702   int omap_check_keys(const coll_t& c, const ghobject_t &oid, const set<string> &keys,
703                       set<string> *out) override;
704   using ObjectStore::get_omap_iterator;
705   ObjectMap::ObjectMapIterator get_omap_iterator(const coll_t& c, const ghobject_t &oid) override;
706
707   int _create_collection(const coll_t& c, int bits,
708                          const SequencerPosition &spos);
709   int _destroy_collection(const coll_t& c);
710   /**
711    * Give an expected number of objects hint to the collection.
712    *
713    * @param c                 - collection id.
714    * @param pg_num            - pg number of the pool this collection belongs to
715    * @param expected_num_objs - expected number of objects in this collection
716    * @param spos              - sequence position
717    *
718    * @return 0 on success, an error code otherwise
719    */
720   int _collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
721       uint64_t expected_num_objs,
722       const SequencerPosition &spos);
723   int _collection_add(const coll_t& c, const coll_t& ocid, const ghobject_t& oid,
724                       const SequencerPosition& spos);
725   int _collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
726                               coll_t c, const ghobject_t& o,
727                               const SequencerPosition& spos,
728                               bool ignore_enoent = false);
729
730   int _set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
731                       uint64_t expected_object_size,
732                       uint64_t expected_write_size);
733
734   void dump_start(const std::string& file);
735   void dump_stop();
736   void dump_transactions(vector<Transaction>& ls, uint64_t seq, OpSequencer *osr);
737
738   virtual int apply_layout_settings(const coll_t &cid);
739
740 private:
741   void _inject_failure();
742
743   // omap
744   int _omap_clear(const coll_t& cid, const ghobject_t &oid,
745                   const SequencerPosition &spos);
746   int _omap_setkeys(const coll_t& cid, const ghobject_t &oid,
747                     const map<string, bufferlist> &aset,
748                     const SequencerPosition &spos);
749   int _omap_rmkeys(const coll_t& cid, const ghobject_t &oid, const set<string> &keys,
750                    const SequencerPosition &spos);
751   int _omap_rmkeyrange(const coll_t& cid, const ghobject_t &oid,
752                        const string& first, const string& last,
753                        const SequencerPosition &spos);
754   int _omap_setheader(const coll_t& cid, const ghobject_t &oid, const bufferlist &bl,
755                       const SequencerPosition &spos);
756   int _split_collection(const coll_t& cid, uint32_t bits, uint32_t rem, coll_t dest,
757                         const SequencerPosition &spos);
758   int _split_collection_create(const coll_t& cid, uint32_t bits, uint32_t rem,
759                                coll_t dest,
760                                const SequencerPosition &spos);
761
762   const char** get_tracked_conf_keys() const override;
763   void handle_conf_change(const struct md_config_t *conf,
764                           const std::set <std::string> &changed) override;
765   int set_throttle_params();
766   float m_filestore_commit_timeout;
767   bool m_filestore_journal_parallel;
768   bool m_filestore_journal_trailing;
769   bool m_filestore_journal_writeahead;
770   int m_filestore_fiemap_threshold;
771   double m_filestore_max_sync_interval;
772   double m_filestore_min_sync_interval;
773   bool m_filestore_fail_eio;
774   bool m_filestore_fadvise;
775   int do_update;
776   bool m_journal_dio, m_journal_aio, m_journal_force_aio;
777   std::string m_osd_rollback_to_cluster_snap;
778   bool m_osd_use_stale_snap;
779   bool m_filestore_do_dump;
780   std::ofstream m_filestore_dump;
781   JSONFormatter m_filestore_dump_fmt;
782   std::atomic<int64_t> m_filestore_kill_at = { 0 };
783   bool m_filestore_sloppy_crc;
784   int m_filestore_sloppy_crc_block_size;
785   uint64_t m_filestore_max_alloc_hint_size;
786   long m_fs_type;
787
788   //Determined xattr handling based on fs type
789   void set_xattr_limits_via_conf();
790   uint32_t m_filestore_max_inline_xattr_size;
791   uint32_t m_filestore_max_inline_xattrs;
792   uint32_t m_filestore_max_xattr_value_size;
793
794   FSSuperblock superblock;
795
796   /**
797    * write_superblock()
798    *
799    * Write superblock to persisent storage
800    *
801    * return value: 0 on success, otherwise negative errno
802    */
803   int write_superblock();
804
805   /**
806    * read_superblock()
807    *
808    * Fill in FileStore::superblock by reading persistent storage
809    *
810    * return value: 0 on success, otherwise negative errno
811    */
812   int read_superblock();
813
814   friend class FileStoreBackend;
815   friend class TestFileStore;
816 };
817
818 ostream& operator<<(ostream& out, const FileStore::OpSequencer& s);
819
820 struct fiemap;
821
822 class FileStoreBackend {
823 private:
824   FileStore *filestore;
825 protected:
826   int get_basedir_fd() {
827     return filestore->basedir_fd;
828   }
829   int get_current_fd() {
830     return filestore->current_fd;
831   }
832   int get_op_fd() {
833     return filestore->op_fd;
834   }
835   size_t get_blksize() {
836     return filestore->blk_size;
837   }
838   const string& get_basedir_path() {
839     return filestore->basedir;
840   }
841   const string& get_journal_path() {
842     return filestore->journalpath;
843   }
844   const string& get_current_path() {
845     return filestore->current_fn;
846   }
847   int _copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff) {
848     if (has_fiemap() || has_seek_data_hole()) {
849       return filestore->_do_sparse_copy_range(from, to, srcoff, len, dstoff);
850     } else {
851       return filestore->_do_copy_range(from, to, srcoff, len, dstoff);
852     }
853   }
854   int get_crc_block_size() {
855     return filestore->m_filestore_sloppy_crc_block_size;
856   }
857
858 public:
859   explicit FileStoreBackend(FileStore *fs) : filestore(fs) {}
860   virtual ~FileStoreBackend() {}
861
862   CephContext* cct() const {
863     return filestore->cct;
864   }
865
866   static FileStoreBackend *create(long f_type, FileStore *fs);
867
868   virtual const char *get_name() = 0;
869   virtual int detect_features() = 0;
870   virtual int create_current() = 0;
871   virtual bool can_checkpoint() = 0;
872   virtual int list_checkpoints(list<string>& ls) = 0;
873   virtual int create_checkpoint(const string& name, uint64_t *cid) = 0;
874   virtual int sync_checkpoint(uint64_t id) = 0;
875   virtual int rollback_to(const string& name) = 0;
876   virtual int destroy_checkpoint(const string& name) = 0;
877   virtual int syncfs() = 0;
878   virtual bool has_fiemap() = 0;
879   virtual bool has_seek_data_hole() = 0;
880   virtual bool is_rotational() = 0;
881   virtual bool is_journal_rotational() = 0;
882   virtual int do_fiemap(int fd, off_t start, size_t len, struct fiemap **pfiemap) = 0;
883   virtual int clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff) = 0;
884   virtual int set_alloc_hint(int fd, uint64_t hint) = 0;
885   virtual bool has_splice() const = 0;
886
887   // hooks for (sloppy) crc tracking
888   virtual int _crc_update_write(int fd, loff_t off, size_t len, const bufferlist& bl) = 0;
889   virtual int _crc_update_truncate(int fd, loff_t off) = 0;
890   virtual int _crc_update_zero(int fd, loff_t off, size_t len) = 0;
891   virtual int _crc_update_clone_range(int srcfd, int destfd,
892                                       loff_t srcoff, size_t len, loff_t dstoff) = 0;
893   virtual int _crc_verify_read(int fd, loff_t off, size_t len, const bufferlist& bl,
894                                ostream *out) = 0;
895 };
896
897 #endif