1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #ifndef CEPH_FILEJOURNAL_H
17 #define CEPH_FILEJOURNAL_H
23 #include "common/Cond.h"
24 #include "common/Mutex.h"
25 #include "common/Thread.h"
26 #include "common/Throttle.h"
27 #include "JournalThrottle.h"
28 #include "common/zipkin_trace.h"
34 // re-include our assert to clobber the system one; fix dout:
35 #include "include/assert.h"
38 * Implements journaling on top of block device or file.
40 * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
44 public md_config_obs_t {
46 /// Protected by finisher_lock
// Pairs a journal sequence number with the Context to call once that entry
// is durable; 'start' (elided here) records submission time, presumably for
// latency accounting — TODO confirm against elided members.
47 struct completion_item {
51 TrackedOpRef tracked_op;
52 completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref)
53 : seq(o), finish(c), start(s), tracked_op(opref) {}
// Default ctor leaves tracked_op default-constructed (empty ref).
54 completion_item() : seq(0), finish(0), start(0) {}
// A queued journal write: sequence number, payload buffers, and the
// original (pre-encoding) length for accounting.
60 TrackedOpRef tracked_op;
62 write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) :
63 seq(s), orig_len(ol), tracked_op(opref) {
// claim() transfers b's buffers into bl rather than copying; the
// caller's list is left empty afterward.
64 bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
66 write_item() : seq(0), orig_len(0) {}
// journaled_seq: presumably the highest seq known to be durable in the
// journal — TODO confirm against the .cc.
71 uint64_t journaled_seq;
// When true, completion delivery is deferred (completions are "plugged")
// until explicitly released — NOTE(review): exact release point not visible
// in this excerpt.
72 bool plug_journal_completions;
// Queue of pending writes consumed by the write thread.
76 list<write_item> writeq;
// Accessors for writeq; definitions live in the .cc.
78 write_item &peek_write();
80 void batch_pop_write(list<write_item> &items);
81 void batch_unpop_write(list<write_item> &items);
// Completion queue and its lock. All accessors below take completions_lock
// via an RAII Mutex::Locker for the duration of the operation.
83 Mutex completions_lock;
84 list<completion_item> completions;
85 bool completions_empty() {
86 Mutex::Locker l(completions_lock);
87 return completions.empty();
// Moves the entire completion list into 'items' in O(1) via swap.
// NOTE(review): swap exchanges contents, so 'items' should be empty on
// entry or its previous contents end up queued — confirm callers.
89 void batch_pop_completions(list<completion_item> &items) {
90 Mutex::Locker l(completions_lock);
91 completions.swap(items);
// Returns previously popped items to the FRONT of the queue, preserving
// their original order (splice is O(1) and does not copy).
93 void batch_unpop_completions(list<completion_item> &items) {
94 Mutex::Locker l(completions_lock);
95 completions.splice(completions.begin(), items);
// Returns a copy of the oldest pending completion; asserts non-empty.
97 completion_item completion_peek_front() {
98 Mutex::Locker l(completions_lock);
99 assert(!completions.empty());
100 return completions.front();
// Drops the oldest pending completion; asserts non-empty.
102 void completion_pop_front() {
103 Mutex::Locker l(completions_lock);
104 assert(!completions.empty());
105 completions.pop_front();
// Encode the transactions in tls into a single journal entry buffer (*tbl).
108 int prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) override;
// Queue an already-prepared entry for writing; orig_len is the
// pre-encoding length used for accounting.
110 void submit_entry(uint64_t seq, bufferlist& bl, uint32_t orig_len,
112 TrackedOpRef osd_op = TrackedOpRef()) override;
113 /// End protected by finisher_lock
// On-disk journal superblock. Versioned; see decode() for the legacy
// (pre v0.40) layout handling.
121 // NOTE: remove kludgey weirdness in read_header() next time a flag is added.
128 int64_t max_size; // max size of journal ring buffer
129 int64_t start; // offset of first entry
130 uint64_t committed_up_to; // committed up to
135 * entry at header.start has sequence >= start_seq
137 * Generally, the entry at header.start will have sequence
138 * start_seq if it exists. The only exception is immediately
139 * after journal creation since the first sequence number is
142 * If the first read on open fails, we can assume corruption
143 * if start_seq > committed_up_to because the entry would have
144 * a sequence >= start_seq and therefore > committed_up_to.
149 flags(0), block_size(0), alignment(0), max_size(0), start(0),
150 committed_up_to(0), start_seq(0) {}
// Folds the 128-bit fsid down to 64 bits by reinterpreting its first
// 8 bytes. NOTE(review): this is type punning via pointer cast — relies
// on fsid.bytes() being suitably aligned; a memcpy would be the
// aliasing-safe form, but the on-disk/compat behavior must not change.
156 uint64_t get_fsid64() const {
157 return *(uint64_t*)fsid.bytes();
// Serialize the header (fields funneled through intermediate list 'em';
// surrounding framing elided in this excerpt).
160 void encode(bufferlist& bl) const {
167 ::encode(block_size, em);
168 ::encode(alignment, em);
169 ::encode(max_size, em);
171 ::encode(committed_up_to, em);
172 ::encode(start_seq, em);
// Deserialize; v < 2 takes the legacy path below.
176 void decode(bufferlist::iterator& bl) {
179 if (v < 2) { // normally 0, but conceivably 1
180 // decode old header_t struct (pre v0.40).
181 bl.advance(4); // skip __u32 flags (it was unused by any old code)
// Old headers stored a 64-bit fsid; replicate it into both halves of
// the 128-bit uuid so comparisons behave consistently.
185 *(uint64_t*)&fsid.bytes()[0] = tfsid;
186 *(uint64_t*)&fsid.bytes()[8] = tfsid;
187 ::decode(block_size, bl);
188 ::decode(alignment, bl);
189 ::decode(max_size, bl);
// Current-format path: fields decoded from the embedded list 'em'.
197 bufferlist::iterator t = em.begin();
200 ::decode(block_size, t);
201 ::decode(alignment, t);
202 ::decode(max_size, t);
206 ::decode(committed_up_to, t);
211 ::decode(start_seq, t);
// Per-entry on-disk header. Packed + 4-byte aligned so the on-disk layout
// is fixed and independent of compiler padding.
217 struct entry_header_t {
218 uint64_t seq; // fs op seq #
219 uint32_t crc32c; // payload only. not header, pre_pad, post_pad, or footer.
221 uint32_t pre_pad, post_pad;
// Magic derived from (fsid, seq, len): lets check_magic() detect entries
// from a different journal instance or with a mismatched seq/length.
225 static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
226 return (fsid ^ seq ^ len);
// Validates both magics: magic1 binds the entry to its own byte offset
// (catches stale data read at the wrong position), magic2 binds it to
// this journal's fsid and the entry's seq/len.
228 bool check_magic(off64_t pos, uint64_t fsid) {
230 magic1 == (uint64_t)pos &&
231 magic2 == (fsid ^ seq ^ len);
233 } __attribute__((__packed__, aligned(4)));
// NOTE(review): reads journalq with no lock taken here — presumably the
// caller holds the appropriate lock (see the class's lock-ordering note);
// confirm against callers in the .cc.
235 bool journalq_empty() { return journalq.empty(); }
// I/O mode flags: O_DIRECT, libaio, and forced-aio selection.
243 bool directio, aio, force_aio;
// Set when the superblock must be rewritten before the next entry.
244 bool must_write_header;
245 off64_t write_pos; // byte where the next entry to be written will go
247 bool discard; //for block journal whether support discard
250 /// state associated with an in-flight aio request
251 /// Protected by aio_lock
257 uint64_t off, len; ///< these are for debug only
258 uint64_t seq; ///< seq number to complete on aio completion, if non-zero
// Takes ownership context of b's buffers for the lifetime of the aio
// (full ctor body elided in this excerpt).
260 aio_info(bufferlist& b, uint64_t o, uint64_t s)
261 : iov(NULL), done(false), off(o), len(b.length()), seq(s) {
// Aio bookkeeping: condvar for writers waiting on completion, the libaio
// context, the queue of in-flight requests, and in-flight/queued counters.
270 Cond write_finish_cond;
271 io_context_t aio_ctx;
272 list<aio_info> aio_queue;
273 int aio_num, aio_bytes;
274 uint64_t aio_write_queue_ops;
275 uint64_t aio_write_queue_bytes;
276 /// End protected by aio_lock
// Commit-tracking state.
279 uint64_t last_committed_seq;
280 uint64_t journaled_since_start;
283 * full states cycle at the beginning of each commit epoch, when commit_start()
285 * FULL - we just filled up during this epoch.
286 * WAIT - we filled up last epoch; now we have to wait until everything during
287 * that epoch commits to the fs before we can start writing over it.
288 * NOTFULL - all good, journal away.
299 deque<pair<uint64_t, off64_t> > journalq; // track seq offsets, so we can trim later.
300 uint64_t writing_seq;
// md_config_obs_t hooks: re-read throttle tunables whenever any tracked
// config key changes.
304 int set_throttle_params();
305 const char** get_tracked_conf_keys() const override;
306 void handle_conf_change(
307 const struct md_config_t *conf,
308 const std::set <std::string> &changed) override {
// Scan the NULL-terminated tracked-key list; if any changed key matches,
// refresh the throttle parameters (loop tail elided in this excerpt).
309 for (const char **i = get_tracked_conf_keys();
312 if (changed.count(string(*i))) {
313 set_throttle_params();
// Called when a write (ops/bytes) finishes, to release throttle budget —
// presumably; confirm against the .cc.
319 void complete_write(uint64_t ops, uint64_t bytes);
320 JournalThrottle throttle;
// Low-level open/close helpers for file- and block-device-backed journals.
329 int _open(bool wr, bool create=false);
330 int _open_block_device();
331 void _close(int fd) const;
332 int _open_file(int64_t oldsize, blksize_t blksize, bool create);
333 int _dump(ostream& out, bool simple);
// Superblock (header_t) I/O and formatting.
334 void print_header(const header_t &hdr) const;
335 int read_header(header_t *hdr) const;
336 bufferptr prepare_header();
// Entry point for the Writer thread (see class Writer below).
339 void write_thread_entry();
// Deliver queued completions with seq <= the given seq.
341 void queue_completions_thru(uint64_t seq);
// Check whether writing 'size' bytes at 'pos' would overrun uncommitted data.
343 int check_for_full(uint64_t seq, off64_t pos, off64_t size);
// Batch several queued write_items into one contiguous journal write;
// returns the encoded buffer in bl and accumulates the original op/byte
// counts for throttle accounting. (Parameter name fixed from the typo
// "orig_bytee" to match the sibling declaration; names in a pure
// declaration are documentation only, so this is ABI-safe.)
344 int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes);
// Encode a single write_item at queue_pos, advancing queue_pos and
// accumulating the original op/byte counts.
345 int prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos,
346 uint64_t& orig_ops, uint64_t& orig_bytes);
// Synchronous write path.
347 void do_write(bufferlist& bl);
// Entry point for the WriteFinisher thread; reaps aio completions.
349 void write_finish_thread_entry();
350 void check_aio_completion();
// Asynchronous (libaio) write path.
351 void do_aio_write(bufferlist& bl);
352 int write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq);
// Verify bl meets the device alignment requirements before a direct write.
355 void check_align(off64_t pos, bufferlist& bl);
356 int write_bl(off64_t& pos, bufferlist& bl);
358 /// read len from journal starting at in_pos and wrapping up to len
360 off64_t in_pos, ///< [in] start position
361 int64_t len, ///< [in] length to read
362 bufferlist* bl, ///< [out] result
363 off64_t *out_pos ///< [out] next position to read, will be wrapped
// Issue a discard (TRIM) for the byte range [offset, end) on block journals.
366 void do_discard(int64_t offset, int64_t end);
// Thin Thread wrappers: each trampolines into the owning FileJournal's
// corresponding *_thread_entry() method.
368 class Writer : public Thread {
369 FileJournal *journal;
371 explicit Writer(FileJournal *fj) : journal(fj) {}
372 void *entry() override {
373 journal->write_thread_entry();
378 class WriteFinisher : public Thread {
379 FileJournal *journal;
381 explicit WriteFinisher(FileJournal *fj) : journal(fj) {}
382 void *entry() override {
383 journal->write_finish_thread_entry();
386 } write_finish_thread;
// First usable byte of the ring buffer: the on-disk header rounded up to
// the journal's block size, so entries never share a block with the header.
388 off64_t get_top() const {
389 return ROUND_UP_TO(sizeof(header), block_size);
// Zipkin tracing endpoint for this journal instance.
392 ZTracer::Endpoint trace_endpoint;
// Construct an (unopened) journal on file/device path f.
// dio: use O_DIRECT; ai: use libaio; faio: force aio even when normally
// disallowed. Registers itself as a config observer (see add_observer
// below); the destructor unregisters.
395 FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, Cond *sync_cond,
396 const char *f, bool dio=false, bool ai=true, bool faio=false) :
397 Journal(cct, fsid, fin, sync_cond),
398 finisher_lock("FileJournal::finisher_lock", false, true, false, cct),
400 plug_journal_completions(false),
401 writeq_lock("FileJournal::writeq_lock", false, true, false, cct),
403 "FileJournal::completions_lock", false, true, false, cct),
406 max_size(0), block_size(0),
407 directio(dio), aio(ai), force_aio(faio),
408 must_write_header(false),
409 write_pos(0), read_pos(0),
412 aio_lock("FileJournal::aio_lock"),
414 aio_num(0), aio_bytes(0),
415 aio_write_queue_ops(0),
416 aio_write_queue_bytes(0),
418 last_committed_seq(0),
419 journaled_since_start(0),
420 full_state(FULL_NOTFULL),
423 throttle(cct->_conf->filestore_caller_concurrency),
424 write_lock("FileJournal::write_lock", false, true, false, cct),
428 write_finish_thread(this),
429 trace_endpoint("0.0.0.0", 0, "FileJournal") {
// aio requires O_DIRECT; warn and (presumably, in elided code) fall back
// to the synchronous path rather than failing.
431 if (aio && !directio) {
432 lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl;
437 lderr(cct) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl;
442 cct->_conf->add_observer(this);
// Unregister from config notifications; must mirror add_observer above.
444 ~FileJournal() override {
447 cct->_conf->remove_observer(this);
// Journal interface implementation (lifecycle, dump, flush).
450 int check() override;
451 int create() override;
452 int open(uint64_t fs_op_seq) override;
453 void close() override;
// Read just the fsid from the on-disk header without a full open.
454 int peek_fsid(uuid_d& fsid);
456 int dump(ostream& out) override;
457 int simple_dump(ostream& out);
458 int _fdump(Formatter &f, bool simple);
460 void flush() override;
462 void reserve_throttle_and_backoff(uint64_t count) override;
// Writeable once replay is done: read_pos is reset to 0 at that point —
// presumably by make_writeable(); confirm in the .cc.
464 bool is_writeable() override {
465 return read_pos == 0;
467 int make_writeable() override;
// commit cycle: called at commit start / after commit reaches seq.
470 void commit_start(uint64_t seq) override;
471 void committed_thru(uint64_t seq) override;
// Ask the backend to commit early when the journal is (near) full and we
// are not already shutting down the write path.
472 bool should_commit_now() override {
473 return full_state != FULL_NOTFULL && !write_stop;
476 void write_header_sync();
// If set, submitters block when the journal is full instead of erroring.
478 void set_wait_on_full(bool b) { wait_on_full = b; }
480 off64_t get_journal_size_estimate();
484 /// Result code for read_entry
485 enum read_entry_result {
494 * Reads next entry starting at pos. If the entry appears
495 * clean, *bl will contain the payload, *seq will contain
496 * the sequence number, and *out_pos will reflect the next
497 * read position. If the entry is invalid *ss will contain
498 * debug text, while *seq, *out_pos, and *bl will be unchanged.
500 * If the entry suggests a corrupt log, *ss will contain debug
501 * text, *out_pos will contain the next index to check. If
502 * we find an entry in this way that returns SUCCESS, the journal
503 * is most likely corrupt.
505 read_entry_result do_read_entry(
506 off64_t pos, ///< [in] position to read
507 off64_t *next_pos, ///< [out] next position to read
508 bufferlist* bl, ///< [out] payload for successful read
509 uint64_t *seq, ///< [out] seq of successful read
510 ostream *ss, ///< [out] error output
511 entry_header_t *h = 0 ///< [out] header
512 ) const; ///< @return result code
// Convenience overload: forwards with a seq lower bound of 0.
522 uint64_t &last_seq) override {
523 return read_entry(bl, last_seq, 0);
// Test-only hooks: deliberately corrupt an entry's payload / footer magic /
// header magic so corruption-detection paths can be exercised.
534 void corrupt_payload(
537 void corrupt_footer_magic(
540 void corrupt_header_magic(
// Generates the free encode()/decode() functions for header_t so it can be
// used with Ceph's bufferlist encoding framework.
545 WRITE_CLASS_ENCODER(FileJournal::header_t)