Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / FileJournal.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15
16 #ifndef CEPH_FILEJOURNAL_H
17 #define CEPH_FILEJOURNAL_H
18
19 #include <deque>
20 using std::deque;
21
22 #include "Journal.h"
23 #include "common/Cond.h"
24 #include "common/Mutex.h"
25 #include "common/Thread.h"
26 #include "common/Throttle.h"
27 #include "JournalThrottle.h"
28 #include "common/zipkin_trace.h"
29
30 #ifdef HAVE_LIBAIO
31 # include <libaio.h>
32 #endif
33
34 // re-include our assert to clobber the system one; fix dout:
35 #include "include/assert.h"
36
37 /**
38  * Implements journaling on top of block device or file.
39  *
40  * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
41  */
42 class FileJournal :
43   public Journal,
44   public md_config_obs_t {
45 public:
46   /// Protected by finisher_lock
47   struct completion_item {
48     uint64_t seq;
49     Context *finish;
50     utime_t start;
51     TrackedOpRef tracked_op;
52     completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref)
53       : seq(o), finish(c), start(s), tracked_op(opref) {}
54     completion_item() : seq(0), finish(0), start(0) {}
55   };
56   struct write_item {
57     uint64_t seq;
58     bufferlist bl;
59     uint32_t orig_len;
60     TrackedOpRef tracked_op;
61     ZTracer::Trace trace;
62     write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) :
63       seq(s), orig_len(ol), tracked_op(opref) {
64       bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
65     }
66     write_item() : seq(0), orig_len(0) {}
67   };
68
69   Mutex finisher_lock;
70   Cond finisher_cond;
71   uint64_t journaled_seq;
72   bool plug_journal_completions;
73
74   Mutex writeq_lock;
75   Cond writeq_cond;
76   list<write_item> writeq;
77   bool writeq_empty();
78   write_item &peek_write();
79   void pop_write();
80   void batch_pop_write(list<write_item> &items);
81   void batch_unpop_write(list<write_item> &items);
82
83   Mutex completions_lock;
84   list<completion_item> completions;
85   bool completions_empty() {
86     Mutex::Locker l(completions_lock);
87     return completions.empty();
88   }
89   void batch_pop_completions(list<completion_item> &items) {
90     Mutex::Locker l(completions_lock);
91     completions.swap(items);
92   }
93   void batch_unpop_completions(list<completion_item> &items) {
94     Mutex::Locker l(completions_lock);
95     completions.splice(completions.begin(), items);
96   }
97   completion_item completion_peek_front() {
98     Mutex::Locker l(completions_lock);
99     assert(!completions.empty());
100     return completions.front();
101   }
102   void completion_pop_front() {
103     Mutex::Locker l(completions_lock);
104     assert(!completions.empty());
105     completions.pop_front();
106   }
107
108   int prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) override;
109
110   void submit_entry(uint64_t seq, bufferlist& bl, uint32_t orig_len,
111                     Context *oncommit,
112                     TrackedOpRef osd_op = TrackedOpRef()) override;
113   /// End protected by finisher_lock
114
115   /*
116    * journal header
117    */
118   struct header_t {
119     enum {
120       FLAG_CRC = (1<<0),
121       // NOTE: remove kludgey weirdness in read_header() next time a flag is added.
122     };
123
124     uint64_t flags;
125     uuid_d fsid;
126     __u32 block_size;
127     __u32 alignment;
128     int64_t max_size;   // max size of journal ring buffer
129     int64_t start;      // offset of first entry
130     uint64_t committed_up_to; // committed up to
131
132     /**
133      * start_seq
134      *
135      * entry at header.start has sequence >= start_seq
136      *
137      * Generally, the entry at header.start will have sequence
138      * start_seq if it exists.  The only exception is immediately
139      * after journal creation since the first sequence number is
140      * not known.
141      *
142      * If the first read on open fails, we can assume corruption
143      * if start_seq > committed_up_to because the entry would have
144      * a sequence >= start_seq and therefore > committed_up_to.
145      */
146     uint64_t start_seq;
147
148     header_t() :
149       flags(0), block_size(0), alignment(0), max_size(0), start(0),
150       committed_up_to(0), start_seq(0) {}
151
152     void clear() {
153       start = block_size;
154     }
155
156     uint64_t get_fsid64() const {
157       return *(uint64_t*)fsid.bytes();
158     }
159
160     void encode(bufferlist& bl) const {
161       __u32 v = 4;
162       ::encode(v, bl);
163       bufferlist em;
164       {
165         ::encode(flags, em);
166         ::encode(fsid, em);
167         ::encode(block_size, em);
168         ::encode(alignment, em);
169         ::encode(max_size, em);
170         ::encode(start, em);
171         ::encode(committed_up_to, em);
172         ::encode(start_seq, em);
173       }
174       ::encode(em, bl);
175     }
176     void decode(bufferlist::iterator& bl) {
177       __u32 v;
178       ::decode(v, bl);
179       if (v < 2) {  // normally 0, but concievably 1
180         // decode old header_t struct (pre v0.40).
181         bl.advance(4); // skip __u32 flags (it was unused by any old code)
182         flags = 0;
183         uint64_t tfsid;
184         ::decode(tfsid, bl);
185         *(uint64_t*)&fsid.bytes()[0] = tfsid;
186         *(uint64_t*)&fsid.bytes()[8] = tfsid;
187         ::decode(block_size, bl);
188         ::decode(alignment, bl);
189         ::decode(max_size, bl);
190         ::decode(start, bl);
191         committed_up_to = 0;
192         start_seq = 0;
193         return;
194       }
195       bufferlist em;
196       ::decode(em, bl);
197       bufferlist::iterator t = em.begin();
198       ::decode(flags, t);
199       ::decode(fsid, t);
200       ::decode(block_size, t);
201       ::decode(alignment, t);
202       ::decode(max_size, t);
203       ::decode(start, t);
204
205       if (v > 2)
206         ::decode(committed_up_to, t);
207       else
208         committed_up_to = 0;
209
210       if (v > 3)
211         ::decode(start_seq, t);
212       else
213         start_seq = 0;
214     }
215   } header;
216
217   struct entry_header_t {
218     uint64_t seq;     // fs op seq #
219     uint32_t crc32c;  // payload only.  not header, pre_pad, post_pad, or footer.
220     uint32_t len;
221     uint32_t pre_pad, post_pad;
222     uint64_t magic1;
223     uint64_t magic2;
224
225     static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
226       return (fsid ^ seq ^ len);
227     }
228     bool check_magic(off64_t pos, uint64_t fsid) {
229       return
230     magic1 == (uint64_t)pos &&
231     magic2 == (fsid ^ seq ^ len);
232     }
233   } __attribute__((__packed__, aligned(4)));
234
235   bool journalq_empty() { return journalq.empty(); }
236
237 private:
238   string fn;
239
240   char *zero_buf;
241   off64_t max_size;
242   size_t block_size;
243   bool directio, aio, force_aio;
244   bool must_write_header;
245   off64_t write_pos;      // byte where the next entry to be written will go
246   off64_t read_pos;       //
247   bool discard;   //for block journal whether support discard
248
249 #ifdef HAVE_LIBAIO
250   /// state associated with an in-flight aio request
251   /// Protected by aio_lock
252   struct aio_info {
253     struct iocb iocb;
254     bufferlist bl;
255     struct iovec *iov;
256     bool done;
257     uint64_t off, len;    ///< these are for debug only
258     uint64_t seq;         ///< seq number to complete on aio completion, if non-zero
259
260     aio_info(bufferlist& b, uint64_t o, uint64_t s)
261       : iov(NULL), done(false), off(o), len(b.length()), seq(s) {
262       bl.claim(b);
263     }
264     ~aio_info() {
265       delete[] iov;
266     }
267   };
268   Mutex aio_lock;
269   Cond aio_cond;
270   Cond write_finish_cond;
271   io_context_t aio_ctx;
272   list<aio_info> aio_queue;
273   int aio_num, aio_bytes;
274   uint64_t aio_write_queue_ops;
275   uint64_t aio_write_queue_bytes;
276   /// End protected by aio_lock
277 #endif
278
279   uint64_t last_committed_seq;
280   uint64_t journaled_since_start;
281
282   /*
283    * full states cycle at the beginnging of each commit epoch, when commit_start()
284    * is called.
285    *   FULL - we just filled up during this epoch.
286    *   WAIT - we filled up last epoch; now we have to wait until everything during
287    *          that epoch commits to the fs before we can start writing over it.
288    *   NOTFULL - all good, journal away.
289    */
290   enum {
291     FULL_NOTFULL = 0,
292     FULL_FULL = 1,
293     FULL_WAIT = 2,
294   } full_state;
295
296   int fd;
297
298   // in journal
299   deque<pair<uint64_t, off64_t> > journalq;  // track seq offsets, so we can trim later.
300   uint64_t writing_seq;
301
302
303   // throttle
304   int set_throttle_params();
305   const char** get_tracked_conf_keys() const override;
306   void handle_conf_change(
307     const struct md_config_t *conf,
308     const std::set <std::string> &changed) override {
309     for (const char **i = get_tracked_conf_keys();
310          *i;
311          ++i) {
312       if (changed.count(string(*i))) {
313         set_throttle_params();
314         return;
315       }
316     }
317   }
318
319   void complete_write(uint64_t ops, uint64_t bytes);
320   JournalThrottle throttle;
321
322   // write thread
323   Mutex write_lock;
324   bool write_stop;
325   bool aio_stop;
326
327   Cond commit_cond;
328
329   int _open(bool wr, bool create=false);
330   int _open_block_device();
331   void _close(int fd) const;
332   int _open_file(int64_t oldsize, blksize_t blksize, bool create);
333   int _dump(ostream& out, bool simple);
334   void print_header(const header_t &hdr) const;
335   int read_header(header_t *hdr) const;
336   bufferptr prepare_header();
337   void start_writer();
338   void stop_writer();
339   void write_thread_entry();
340
341   void queue_completions_thru(uint64_t seq);
342
343   int check_for_full(uint64_t seq, off64_t pos, off64_t size);
344   int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee);
345   int prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos,
346     uint64_t& orig_ops, uint64_t& orig_bytes);
347   void do_write(bufferlist& bl);
348
349   void write_finish_thread_entry();
350   void check_aio_completion();
351   void do_aio_write(bufferlist& bl);
352   int write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq);
353
354
355   void check_align(off64_t pos, bufferlist& bl);
356   int write_bl(off64_t& pos, bufferlist& bl);
357
358   /// read len from journal starting at in_pos and wrapping up to len
359   void wrap_read_bl(
360     off64_t in_pos,   ///< [in] start position
361     int64_t len,      ///< [in] length to read
362     bufferlist* bl,   ///< [out] result
363     off64_t *out_pos  ///< [out] next position to read, will be wrapped
364     ) const;
365
366   void do_discard(int64_t offset, int64_t end);
367
368   class Writer : public Thread {
369     FileJournal *journal;
370   public:
371     explicit Writer(FileJournal *fj) : journal(fj) {}
372     void *entry() override {
373       journal->write_thread_entry();
374       return 0;
375     }
376   } write_thread;
377
378   class WriteFinisher : public Thread {
379     FileJournal *journal;
380   public:
381     explicit WriteFinisher(FileJournal *fj) : journal(fj) {}
382     void *entry() override {
383       journal->write_finish_thread_entry();
384       return 0;
385     }
386   } write_finish_thread;
387
388   off64_t get_top() const {
389     return ROUND_UP_TO(sizeof(header), block_size);
390   }
391
392   ZTracer::Endpoint trace_endpoint;
393
394  public:
395   FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, Cond *sync_cond,
396               const char *f, bool dio=false, bool ai=true, bool faio=false) :
397     Journal(cct, fsid, fin, sync_cond),
398     finisher_lock("FileJournal::finisher_lock", false, true, false, cct),
399     journaled_seq(0),
400     plug_journal_completions(false),
401     writeq_lock("FileJournal::writeq_lock", false, true, false, cct),
402     completions_lock(
403       "FileJournal::completions_lock", false, true, false, cct),
404     fn(f),
405     zero_buf(NULL),
406     max_size(0), block_size(0),
407     directio(dio), aio(ai), force_aio(faio),
408     must_write_header(false),
409     write_pos(0), read_pos(0),
410     discard(false),
411 #ifdef HAVE_LIBAIO
412     aio_lock("FileJournal::aio_lock"),
413     aio_ctx(0),
414     aio_num(0), aio_bytes(0),
415     aio_write_queue_ops(0),
416     aio_write_queue_bytes(0),
417 #endif
418     last_committed_seq(0),
419     journaled_since_start(0),
420     full_state(FULL_NOTFULL),
421     fd(-1),
422     writing_seq(0),
423     throttle(cct->_conf->filestore_caller_concurrency),
424     write_lock("FileJournal::write_lock", false, true, false, cct),
425     write_stop(true),
426     aio_stop(true),
427     write_thread(this),
428     write_finish_thread(this),
429     trace_endpoint("0.0.0.0", 0, "FileJournal") {
430
431       if (aio && !directio) {
432         lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl;
433         aio = false;
434       }
435 #ifndef HAVE_LIBAIO
436       if (aio) {
437         lderr(cct) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl;
438         aio = false;
439       }
440 #endif
441
442       cct->_conf->add_observer(this);
443   }
444   ~FileJournal() override {
445     assert(fd == -1);
446     delete[] zero_buf;
447     cct->_conf->remove_observer(this);
448   }
449
450   int check() override;
451   int create() override;
452   int open(uint64_t fs_op_seq) override;
453   void close() override;
454   int peek_fsid(uuid_d& fsid);
455
456   int dump(ostream& out) override;
457   int simple_dump(ostream& out);
458   int _fdump(Formatter &f, bool simple);
459
460   void flush() override;
461
462   void reserve_throttle_and_backoff(uint64_t count) override;
463
464   bool is_writeable() override {
465     return read_pos == 0;
466   }
467   int make_writeable() override;
468
469   // writes
470   void commit_start(uint64_t seq) override;
471   void committed_thru(uint64_t seq) override;
472   bool should_commit_now() override {
473     return full_state != FULL_NOTFULL && !write_stop;
474   }
475
476   void write_header_sync();
477
478   void set_wait_on_full(bool b) { wait_on_full = b; }
479
480   off64_t get_journal_size_estimate();
481
482   // reads
483
484   /// Result code for read_entry
485   enum read_entry_result {
486     SUCCESS,
487     FAILURE,
488     MAYBE_CORRUPT
489   };
490
491   /**
492    * read_entry
493    *
494    * Reads next entry starting at pos.  If the entry appears
495    * clean, *bl will contain the payload, *seq will contain
496    * the sequence number, and *out_pos will reflect the next
497    * read position.  If the entry is invalid *ss will contain
498    * debug text, while *seq, *out_pos, and *bl will be unchanged.
499    *
500    * If the entry suggests a corrupt log, *ss will contain debug
501    * text, *out_pos will contain the next index to check.  If
502    * we find an entry in this way that returns SUCCESS, the journal
503    * is most likely corrupt.
504    */
505   read_entry_result do_read_entry(
506     off64_t pos,          ///< [in] position to read
507     off64_t *next_pos,    ///< [out] next position to read
508     bufferlist* bl,       ///< [out] payload for successful read
509     uint64_t *seq,        ///< [out] seq of successful read
510     ostream *ss,          ///< [out] error output
511     entry_header_t *h = 0 ///< [out] header
512     ) const; ///< @return result code
513
514   bool read_entry(
515     bufferlist &bl,
516     uint64_t &last_seq,
517     bool *corrupt
518     );
519
520   bool read_entry(
521     bufferlist &bl,
522     uint64_t &last_seq) override {
523     return read_entry(bl, last_seq, 0);
524   }
525
526   // Debug/Testing
527   void get_header(
528     uint64_t wanted_seq,
529     off64_t *_pos,
530     entry_header_t *h);
531   void corrupt(
532     int wfd,
533     off64_t corrupt_at);
534   void corrupt_payload(
535     int wfd,
536     uint64_t seq);
537   void corrupt_footer_magic(
538     int wfd,
539     uint64_t seq);
540   void corrupt_header_magic(
541     int wfd,
542     uint64_t seq);
543 };
544
545 WRITE_CLASS_ENCODER(FileJournal::header_t)
546
547 #endif