1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "common/debug.h"
17 #include "common/errno.h"
18 #include "common/safe_io.h"
19 #include "FileJournal.h"
20 #include "include/color.h"
21 #include "common/perf_counters.h"
22 #include "FileStore.h"
24 #include "include/compat.h"
31 #include <sys/types.h>
33 #include <sys/mount.h>
35 #include "common/blkdev.h"
36 #if defined(__linux__)
37 #include "common/linux_version.h"
40 #if defined(__FreeBSD__)
41 #define O_DSYNC O_SYNC
44 #define dout_context cct
45 #define dout_subsys ceph_subsys_journal
47 #define dout_prefix *_dout << "journal "
// 1 MiB — used as the minimum sensible journal size below.
49 const static int64_t ONE_MEG(1 << 20);
// Minimum buffer/offset alignment required when the journal uses O_DIRECT I/O.
50 const static int CEPH_DIRECTIO_ALIGNMENT(4096);
// Open the journal at `fn` and set this->fd.
// forwrite: open for writing; create: permit creating a regular-file journal.
// Dispatches to _open_block_device() or _open_file() based on fstat(), and
// (when aio is enabled) sets up the libaio context. Returns 0 or -errno.
53 int FileJournal::_open(bool forwrite, bool create)
  // Direct + synchronous I/O: bypass the page cache and make each write durable.
60 flags |= O_DIRECT | O_DSYNC;
  // Close any previously-open fd before (re)opening.
68 if (TEMP_FAILURE_RETRY(::close(fd))) {
70 derr << "FileJournal::_open: error closing old fd: "
71 << cpp_strerror(err) << dendl;
74 fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644));
77 dout(2) << "FileJournal::_open unable to open journal "
78 << fn << ": " << cpp_strerror(err) << dendl;
  // Stat the fd to decide between block-device and regular-file setup.
83 ret = ::fstat(fd, &st);
86 derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl;
91 if (S_ISBLK(st.st_mode)) {
92 ret = _open_block_device();
93 } else if (S_ISREG(st.st_mode)) {
  // aio on a file-backed journal is discouraged; require journal_force_aio.
94 if (aio && !force_aio) {
95 derr << "FileJournal::_open: disabling aio for non-block journal. Use "
96 << "journal_force_aio to force use of aio anyway" << dendl;
99 ret = _open_file(st.st_size, st.st_blksize, create);
101 derr << "FileJournal::_open: wrong journal file type: " << st.st_mode
  // Reserve up to 128 in-flight aio events for this context.
112 ret = io_setup(128, &aio_ctx);
115 // Contrary to naive expectations -EAGAIN means ...
117 derr << "FileJournal::_open: user's limit of aio events exceeded. "
118 << "Try increasing /proc/sys/fs/aio-max-nr" << dendl;
121 derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl;
129 /* We really want max_size to be a multiple of block_size. */
130 max_size -= max_size % block_size;
132 dout(1) << "_open " << fn << " fd " << fd
134 << " bytes, block size " << block_size
135 << " bytes, directio = " << directio
  // Error path: close the fd we opened before returning failure.
141 VOID_TEMP_FAILURE_RETRY(::close(fd));
// Configure the journal for a raw block device: use the whole device
// (ignoring osd_journal_size), take block_size from the conf, and probe
// discard (TRIM) support when journal_discard is enabled.
146 int FileJournal::_open_block_device()
149 int ret = get_block_device_size(fd, &bdev_sz);
151 dout(0) << __func__ << ": failed to read block device size." << dendl;
155 /* Check for bdev_sz too small */
156 if (bdev_sz < ONE_MEG) {
157 dout(0) << __func__ << ": your block device must be at least "
158 << ONE_MEG << " bytes to be used for a Ceph journal." << dendl;
  // The configured osd journal size is irrelevant for a block device.
162 dout(10) << __func__ << ": ignoring osd journal size. "
163 << "We'll use the entire block device (size: " << bdev_sz << ")"
167 block_size = cct->_conf->journal_block_size;
169 if (cct->_conf->journal_discard) {
170 discard = block_device_support_discard(fn.c_str());
171 dout(10) << fn << " support discard: " << (int)discard << dendl;
// Configure the journal for a regular file.
// oldsize/blksize come from fstat(); create allows growing the file to the
// configured osd_journal_size (MiB, shifted to bytes below) and, optionally,
// zero-filling it. Preallocation is platform-specific (posix_fallocate or
// the Darwin F_PREALLOCATE fcntl). Returns 0 or a negative error.
177 int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
181 int64_t conf_journal_sz(cct->_conf->osd_journal_size);
  // osd_journal_size is expressed in MiB; convert to bytes.
182 conf_journal_sz <<= 20;
  // No configured size and an existing file that is too small: we cannot
  // pick a journal size on the user's behalf.
184 if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) {
185 derr << "I'm sorry, I don't know how large of a journal to create."
186 << "Please specify a block device to use as the journal OR "
187 << "set osd_journal_size in your ceph.conf" << dendl;
  // Grow the file up to the configured size when creating.
191 if (create && (oldsize < conf_journal_sz)) {
192 uint64_t newsize(conf_journal_sz);
193 dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl;
194 ret = ::ftruncate(fd, newsize);
197 derr << "FileJournal::_open_file : unable to extend journal to "
198 << newsize << " bytes: " << cpp_strerror(err) << dendl;
  // Preallocate backing blocks so later journal writes don't ENOSPC.
201 #ifdef HAVE_POSIX_FALLOCATE
202 ret = ::posix_fallocate(fd, 0, newsize);
204 derr << "FileJournal::_open_file : unable to preallocation journal to "
205 << newsize << " bytes: " << cpp_strerror(ret) << dendl;
209 #elif defined(__APPLE__)
  // Darwin: request a contiguous allocation past EOF via fcntl.
211 store.fst_flags = F_ALLOCATECONTIG;
212 store.fst_posmode = F_PEOFPOSMODE;
213 store.fst_offset = 0;
214 store.fst_length = newsize;
216 ret = ::fcntl(fd, F_PREALLOCATE, &store);
219 derr << "FileJournal::_open_file : unable to preallocation journal to "
220 << newsize << " bytes: " << cpp_strerror(ret) << dendl;
225 # error "Journal pre-allocation not supported on platform."
231 block_size = cct->_conf->journal_block_size;
  // Optionally zero-fill the whole journal in 1 MiB aligned chunks.
233 if (create && cct->_conf->journal_zero_on_create) {
234 derr << "FileJournal::_open_file : zeroing journal" << dendl;
235 uint64_t write_size = 1 << 20;
  // O_DIRECT requires an aligned buffer.
237 ret = ::posix_memalign((void **)&buf, block_size, write_size);
241 memset(static_cast<void*>(buf), 0, write_size);
243 for (; (i + write_size) <= (uint64_t)max_size; i += write_size) {
244 ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i);
  // Write the final partial chunk, if any.
250 if (i < (uint64_t)max_size) {
251 ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i);
257 dout(10) << "_open journal is not a block device, NOT checking disk "
262 << "write cache on '" << fn << "'" << dendl;
267 // This cannot be used on an active journal.
// Open read-only, read the on-disk header, and verify its fsid matches
// this OSD's fsid. Returns 0 when the header looks sane.
268 int FileJournal::check()
273 ret = _open(false, false);
277 ret = read_header(&header);
281 if (header.fsid != fsid) {
282 derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
283 << ", invalid (someone else's?) journal" << dendl;
288 dout(1) << "check: header looks ok" << dendl;
// Initialize a brand-new journal: build a fresh header_t, write it at
// offset 0, zero the first data block at get_top(), and sanity-check that
// the journal can hold at least one maximal write. Returns 0 or -errno.
297 int FileJournal::create()
300 int64_t needed_space;
303 dout(2) << "create " << fn << " fsid " << fsid << dendl;
305 ret = _open(true, true);
309 // write empty header
311 header.flags = header_t::FLAG_CRC; // enable crcs on any new journal.
313 header.max_size = max_size;
314 header.block_size = block_size;
  // directio demands block-aligned entries; otherwise word alignment suffices.
315 if (cct->_conf->journal_block_align || directio)
316 header.alignment = block_size;
318 header.alignment = 16; // at least stay word aligned on 64bit machines...
  // First entry starts just past the header region.
320 header.start = get_top();
321 header.start_seq = 0;
323 print_header(header);
325 // static zeroed buffer for alignment padding
327 zero_buf = new char[header.alignment];
328 memset(zero_buf, 0, header.alignment);
330 bp = prepare_header();
331 if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) {
333 derr << "FileJournal::create : create write header error "
334 << cpp_strerror(ret) << dendl;
338 // zero first little bit, too.
339 ret = posix_memalign(&buf, block_size, block_size);
342 derr << "FileJournal::create: failed to allocate " << block_size
343 << " bytes of memory: " << cpp_strerror(ret) << dendl;
346 memset(buf, 0, block_size);
347 if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) {
349 derr << "FileJournal::create: error zeroing first " << block_size
350 << " bytes " << cpp_strerror(ret) << dendl;
  // Require room for one osd_max_write_size entry plus its header/footer.
354 needed_space = ((int64_t)cct->_conf->osd_max_write_size) << 20;
355 needed_space += (2 * sizeof(entry_header_t)) + get_top();
356 if (header.max_size - header.start < needed_space) {
357 derr << "FileJournal::create: OSD journal is not large enough to hold "
358 << "osd_max_write_size bytes!" << dendl;
363 dout(2) << "create done" << dendl;
370 if (TEMP_FAILURE_RETRY(::close(fd)) < 0) {
372 derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret)
380 // This cannot be used on an active journal.
// Open read-only just long enough to read the header and report its fsid
// through the out-parameter.
381 int FileJournal::peek_fsid(uuid_d& fsid)
384 int r = _open(false, false);
387 r = read_header(&header);
// Open an existing journal for replay. Validates the on-disk header
// (fsid, sizes, block size, alignment) against our expectations, then scans
// entries starting at header.start looking for the entry following
// fs_op_seq (next_seq). Returns 0 on success, negative on a bad journal.
396 int FileJournal::open(uint64_t fs_op_seq)
398 dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl;
  // The first sequence number we expect to replay.
400 uint64_t next_seq = fs_op_seq + 1;
403 int err = _open(false);
407 // assume writeable, unless...
409 write_pos = get_top();
412 err = read_header(&header);
416 // static zeroed buffer for alignment padding
418 zero_buf = new char[header.alignment];
419 memset(zero_buf, 0, header.alignment);
421 dout(10) << "open header.fsid = " << header.fsid
422 //<< " vs expected fsid = " << fsid
  // --- header sanity checks; each mismatch rejects the journal ---
424 if (header.fsid != fsid) {
425 derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
426 << ", invalid (someone else's?) journal" << dendl;
430 if (header.max_size > max_size) {
431 dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
435 if (header.block_size != block_size) {
436 dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
440 if (header.max_size % header.block_size) {
441 dout(2) << "open journal max size " << header.max_size
442 << " not a multiple of block size " << header.block_size << dendl;
  // directio requires the entry alignment to be block-sized ...
446 if (header.alignment != block_size && directio) {
447 dout(0) << "open journal alignment " << header.alignment << " does not match block size "
448 << block_size << " (required for direct_io journal mode)" << dendl;
  // ... and a multiple of the O_DIRECT minimum alignment.
452 if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) {
453 dout(0) << "open journal alignment " << header.alignment
454 << " is not multiple of minimum directio alignment "
455 << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)"
461 // looks like a valid header.
462 write_pos = 0; // not writeable yet
464 journaled_seq = header.committed_up_to;
  // Begin scanning entries from the header's start position/sequence.
467 read_pos = header.start;
468 seq = header.start_seq;
472 off64_t old_pos = read_pos;
473 if (!read_entry(bl, seq)) {
474 dout(10) << "open reached end of journal." << dendl;
  // An entry beyond next_seq means this journal predates a newer fs state;
  // its contents are stale and ignored.
477 if (seq > next_seq) {
478 dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq
479 << ", ignoring journal contents"
482 last_committed_seq = 0;
485 if (seq == next_seq) {
486 dout(10) << "open reached seq " << seq << dendl;
490 seq++; // next event should follow.
// Close a journal fd, retrying on EINTR and discarding the result.
499 void FileJournal::_close(int fd) const
501 VOID_TEMP_FAILURE_RETRY(::close(fd));
// Shut the journal down: stop the writer thread and assert that nothing
// is left queued or pending a header write.
504 void FileJournal::close()
506 dout(1) << "close " << fn << dendl;
508 // stop writer thread
512 assert(writeq_empty());
513 assert(!must_write_header);
// Full dump of the journal contents (simple=false) to `out`.
520 int FileJournal::dump(ostream& out)
522 return _dump(out, false);
// Abbreviated dump of the journal contents (simple=true) to `out`.
525 int FileJournal::simple_dump(ostream& out)
527 return _dump(out, true);
// Render the journal as pretty-printed JSON via _fdump into `out`.
530 int FileJournal::_dump(ostream& out, bool simple)
532 JSONFormatter f(true);
533 int ret = _fdump(f, simple);
// Walk the journal and emit it through `f`: a "header" section with the
// on-disk header fields, then an "entries" array with one object per
// readable entry (offset, seq, length and — unless simple — the decoded
// transactions). Stops at the first unreadable entry; that is only an
// error if it lies before header.committed_up_to.
538 int FileJournal::_fdump(Formatter &f, bool simple)
540 dout(10) << "_fdump" << dendl;
543 int err = _open(false, false);
547 err = read_header(&header);
553 off64_t next_pos = header.start;
555 f.open_object_section("journal");
557 f.open_object_section("header");
558 f.dump_unsigned("flags", header.flags);
561 f.dump_string("fsid", os.str());
562 f.dump_unsigned("block_size", header.block_size);
563 f.dump_unsigned("alignment", header.alignment);
564 f.dump_int("max_size", header.max_size);
565 f.dump_int("start", header.start);
566 f.dump_unsigned("committed_up_to", header.committed_up_to);
567 f.dump_unsigned("start_seq", header.start_seq);
570 f.open_array_section("entries");
571 uint64_t seq = header.start_seq;
574 off64_t pos = next_pos;
577 dout(2) << "_dump -- not readable" << dendl;
582 read_entry_result result = do_read_entry(
588 if (result != SUCCESS) {
  // Failing before committed_up_to means committed data is unreadable.
589 if (seq < header.committed_up_to) {
590 dout(2) << "Unable to read past sequence " << seq
591 << " but header indicates the journal has committed up through "
592 << header.committed_up_to << ", journal is corrupt" << dendl;
595 dout(25) << ss.str() << dendl;
596 dout(25) << "No further valid entries found, journal is most likely valid"
601 f.open_object_section("entry");
602 f.dump_unsigned("offset", pos);
603 f.dump_unsigned("seq", seq);
605 f.dump_unsigned("bl.length", bl.length());
  // Non-simple mode: decode and dump each transaction in the entry.
607 f.open_array_section("transactions");
608 bufferlist::iterator p = bl.begin();
611 ObjectStore::Transaction t(p);
612 f.open_object_section("transaction");
613 f.dump_unsigned("trans_num", trans_num);
625 dout(10) << "dump finish" << dendl;
// Launch the journal writer thread and the aio-completion finisher thread.
632 void FileJournal::start_writer()
636 write_thread.create("journal_write");
639 write_finish_thread.create("journal_wrt_fin");
// Stop the writer (and, for aio, the completion) thread.
// Wakes both condition variables so the threads notice the stop flag,
// then joins them; finally persists the header to shrink replay on remount.
643 void FileJournal::stop_writer()
645 // Do nothing if writer already stopped or never started
  // Take both locks so the wakeups can't race with the writer's wait.
649 Mutex::Locker l(write_lock);
650 Mutex::Locker p(writeq_lock);
652 writeq_cond.Signal();
653 // Doesn't hurt to signal commit_cond in case thread is waiting there
654 // and caller didn't use committed_thru() first.
655 commit_cond.Signal();
659 // write journal header now so that we have less to replay on remount
664 // stop aio completion thread *after* writer thread has stopped
665 // and has submitted all of its io
666 if (aio && !aio_stop) {
670 write_finish_cond.Signal();
672 write_finish_thread.join();
// Debug-log the salient header fields plus the current write position.
679 void FileJournal::print_header(const header_t &header) const
681 dout(10) << "header: block_size " << header.block_size
682 << " alignment " << header.alignment
683 << " max_size " << header.max_size
685 dout(10) << "header: start " << header.start << dendl;
686 dout(10) << " write_pos " << write_pos << dendl;
// Read the first block_size bytes of the journal into a page-aligned
// buffer (page alignment required for O_DIRECT) and decode them into *hdr.
// A short read is tolerated: the unread tail is zero-filled before decode.
// Returns 0 or a negative error.
689 int FileJournal::read_header(header_t *hdr) const
691 dout(10) << "read_header" << dendl;
694 buffer::ptr bp = buffer::create_page_aligned(block_size);
695 char* bpdata = bp.c_str();
696 int r = ::pread(fd, bpdata, bp.length(), 0);
700 dout(0) << "read_header got " << cpp_strerror(err) << dendl;
704 // don't use bp.zero() here, because it also invalidates
705 // crc cache (which is not yet populated anyway)
706 if (bp.length() != (size_t)r) {
707 // r will always be less than or equal to bp.length
  // NOTE(review): intent is to zero the unread tail; confirm against the
  // full source that the destination here is bpdata + r, not bpdata.
709 memset(bpdata, 0, bp.length() - r);
712 bl.push_back(std::move(bp));
715 bufferlist::iterator p = bl.begin();
718 catch (buffer::error& e) {
719 derr << "read_header error decoding journal header" << dendl;
725 * Unfortunately we weren't initializing the flags field for new
726 * journals! Aie. This is safe(ish) now that we have only one
727 * flag. Probably around when we add the next flag we need to
728 * remove this or else this (eventually old) code will clobber newer
  // Legacy-journal workaround: impossible flag values are treated as 0.
731 if (hdr->flags > 3) {
732 derr << "read_header appears to have gibberish flags; assuming 0" << dendl;
// Snapshot the current header (with committed_up_to set to journaled_seq,
// under finisher_lock) into a page-aligned bufferptr of get_top() bytes,
// zero-padded past the encoded header, ready to pwrite at offset 0.
741 bufferptr FileJournal::prepare_header()
745 Mutex::Locker l(finisher_lock);
746 header.committed_up_to = journaled_seq;
748 ::encode(header, bl);
749 bufferptr bp = buffer::create_page_aligned(get_top());
750 // don't use bp.zero() here, because it also invalidates
751 // crc cache (which is not yet populated anyway)
752 char* data = bp.c_str();
753 memcpy(data, bl.c_str(), bl.length());
  // NOTE(review): this memset must target the bytes after the encoded
  // header (data advanced by bl.length()) — confirm against the full source.
755 memset(data, 0, bp.length()-bl.length());
// Force a header write: set must_write_header under write_lock so the
// writer path persists the header.
759 void FileJournal::write_header_sync()
761 Mutex::Locker locker(write_lock);
762 must_write_header = true;
765 dout(20) << __func__ << " finish" << dendl;
// Decide whether an entry of `size` bytes can be written at `pos` in the
// ring. Computes the free "room" between pos and header.start (wrapping
// through get_top()), kicks a commit when crossing the half-full mark,
// and flags a header write when the entry will wrap. Returns 0 when the
// write fits; otherwise a full/space error (per callers: ENOSPC/EAGAIN).
768 int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size)
771 if (full_state != FULL_NOTFULL)
774 // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL.
776 if (pos >= header.start)
777 room = (header.max_size - pos) + (header.start - get_top()) - 1;
779 room = header.start - pos - 1;
780 dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start
781 << " top " << get_top() << dendl;
  // Crossing the half-full threshold: nudge the syncer to commit and trim.
784 if (room >= (header.max_size >> 1) &&
785 room - size < (header.max_size >> 1)) {
786 dout(10) << " passing half full mark, triggering commit" << dendl;
787 do_sync_cond->SloppySignal(); // initiate a real commit so we can trim
792 dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl;
  // The write will wrap past the end; header.start must be re-persisted.
793 if (pos + size > header.max_size)
794 must_write_header = true;
799 dout(1) << "check_for_full at " << pos << " : JOURNAL FULL "
800 << pos << " >= " << room
801 << " (max_size " << header.max_size << " start " << header.start << ")"
  // Entry is larger than the whole usable journal — warn loudly.
804 off64_t max = header.max_size - get_top();
806 dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl;
// Drain the write queue into a single output bufferlist `bl`, batching
// entries until the journal is full, journal_max_write_entries is hit, or
// journal_max_write_bytes is reached. Accumulates original op/byte counts
// into orig_ops/orig_bytes. Returns 0 with a (possibly partial) batch, or
// -ENOSPC when full on the very first entry.
811 int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes)
813 // gather queued writes
814 off64_t queue_pos = write_pos;
816 int eleft = cct->_conf->journal_max_write_entries;
817 unsigned bmax = cct->_conf->journal_max_write_bytes;
819 if (full_state != FULL_NOTFULL)
822 while (!writeq_empty()) {
823 list<write_item> items;
824 batch_pop_write(items);
825 list<write_item>::iterator it = items.begin();
826 while (it != items.end()) {
827 uint64_t bytes = it->bl.length();
828 int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes);
829 if (r == 0) { // prepare ok, delete it
  // Keep the aio backlog accounting in sync with what we consumed.
833 Mutex::Locker locker(aio_lock);
834 assert(aio_write_queue_ops > 0);
835 aio_write_queue_ops--;
836 assert(aio_write_queue_bytes >= bytes);
837 aio_write_queue_bytes -= bytes;
844 // the journal may be full; push the remaining items back onto writeq
845 batch_unpop_write(items);
847 goto out; // commit what we have
850 logger->inc(l_filestore_journal_full);
  // Full on the first entry: either wait for a commit to free space,
  // or (writeahead mode) restart the journal and drop the queue.
853 dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
855 dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
857 // throw out what we have so far
858 full_state = FULL_FULL;
859 while (!writeq_empty()) {
860 complete_write(1, peek_write().orig_len);
863 print_header(header);
866 return -ENOSPC; // hrm, full on first op
870 dout(20) << "prepare_multi_write hit max events per write "
871 << cct->_conf->journal_max_write_entries << dendl;
872 batch_unpop_write(items);
877 if (bl.length() >= bmax) {
878 dout(20) << "prepare_multi_write hit max write size "
879 << cct->_conf->journal_max_write_bytes << dendl;
880 batch_unpop_write(items);
888 dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
  // queue_pos either advanced linearly or wrapped through get_top().
889 assert((write_pos + bl.length() == queue_pos) ||
890 (write_pos + bl.length() - header.max_size + get_top() == queue_pos));
// Record `seq` as being written and queue its completion callback.
// If entries are waiting for the journal to become un-full, gate this
// callback behind theirs with a C_Gather so completions fire in order.
895 void FileJournal::queue_write_fin(uint64_t seq, Context *fin)
897 writing_seq.push_back(seq);
898 if (!waiting_for_notfull.empty()) {
899 // make sure previously unjournaled stuff waiting for UNFULL triggers
900 // _before_ newly journaled stuff does
901 dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin
902 << " until after UNFULL" << dendl;
903 C_Gather *g = new C_Gather(writeq.front().fin);
904 writing_fin.push_back(g->new_sub());
905 waiting_for_notfull.push_back(g->new_sub());
907 writing_fin.push_back(writeq.front().fin);
908 dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl;
// Hand every queued completion with sequence <= `seq` to the finisher,
// recording journal latency and op-tracker events along the way.
// Caller must hold finisher_lock (asserted); items past `seq` are pushed
// back onto the completion queue.
913 void FileJournal::queue_completions_thru(uint64_t seq)
915 assert(finisher_lock.is_locked());
916 utime_t now = ceph_clock_now();
917 list<completion_item> items;
918 batch_pop_completions(items);
919 list<completion_item>::iterator it = items.begin();
920 while (it != items.end()) {
921 completion_item& next = *it;
926 dout(10) << "queue_completions_thru seq " << seq
927 << " queueing seq " << next.seq
928 << " " << next.finish
929 << " lat " << lat << dendl;
931 logger->tinc(l_filestore_journal_latency, lat);
934 finisher->queue(next.finish);
935 if (next.tracked_op) {
936 next.tracked_op->mark_event("journaled_completion_queued");
937 next.tracked_op->journal_trace.event("queued completion");
938 next.tracked_op->journal_trace.keyval("completed through", seq);
  // Return any not-yet-eligible completions to the queue.
942 batch_unpop_completions(items);
943 finisher_cond.Signal();
// Finalize one queued entry and append it to the batch `bl`.
// Checks for space at queue_pos, then patches the entry's pre- and
// post-header in place (seq, write offset as magic1, and a magic2 derived
// from seq/len/fsid) so replay can validate the entry, appends the entry,
// records (seq, offset) in journalq, and advances queue_pos with wraparound.
// Returns 0, or the error from check_for_full (ENOSPC/EAGAIN).
947 int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
949 uint64_t seq = next_write.seq;
950 bufferlist &ebl = next_write.bl;
951 off64_t size = ebl.length();
953 int r = check_for_full(seq, queue_pos, size);
955 return r; // ENOSPC or EAGAIN
957 uint32_t orig_len = next_write.orig_len;
958 orig_bytes += orig_len;
961 // add to write buffer
962 dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq
963 << " len " << orig_len << " -> " << size << dendl;
  // Byte offsets of the fields we patch inside entry_header_t.
965 unsigned seq_offset = offsetof(entry_header_t, seq);
966 unsigned magic1_offset = offsetof(entry_header_t, magic1);
967 unsigned magic2_offset = offsetof(entry_header_t, magic2);
  // Leading copy of the entry header lives in the first buffer segment.
969 bufferptr headerptr = ebl.buffers().front();
971 uint64_t _queue_pos = queue_pos;
972 uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64());
973 headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq);
974 headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
975 headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2);
  // Trailing copy sits at the end of the last buffer segment.
977 bufferptr footerptr = ebl.buffers().back();
978 unsigned post_offset = footerptr.length() - sizeof(entry_header_t);
979 footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq);
980 footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
981 footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2);
983 bl.claim_append(ebl);
984 if (next_write.tracked_op) {
985 next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
986 next_write.tracked_op->journal_trace.event("prepare_single_write");
989 journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
  // Wrap queue_pos back into [get_top(), max_size).
993 if (queue_pos >= header.max_size)
994 queue_pos = queue_pos + get_top() - header.max_size;
// In directio mode, abort if a buffer destined for pos is not aligned to
// the block size / O_DIRECT alignment (O_DIRECT would fail the write).
999 void FileJournal::check_align(off64_t pos, bufferlist& bl)
1001 // make sure list segments are page aligned
1002 if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) {
1003 assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
1004 assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
1005 assert(0 == "bl was not aligned");
// Synchronously write `bl` at `pos` (lseek64 + write_fd), advancing `pos`
// past the written bytes and wrapping it to get_top() at max_size.
// Returns 0 or a negative error.
1009 int FileJournal::write_bl(off64_t& pos, bufferlist& bl)
1013 off64_t spos = ::lseek64(fd, pos, SEEK_SET);
1016 derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl;
1019 ret = bl.write_fd(fd);
1021 derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl;
1025 if (pos == header.max_size)
// Synchronous (non-aio) write path: write the batched entries in `bl` at
// write_pos, splitting into two pieces when the batch wraps the ring, then
// fsync/fdatasync and queue completions. Periodically (and on wrap) also
// rewrites the journal header. write_lock is dropped around the actual I/O.
1030 void FileJournal::do_write(bufferlist& bl)
1033 if (bl.length() == 0 && !must_write_header)
  // Refresh the header every journal_write_header_frequency batches.
1037 if (cct->_conf->journal_write_header_frequency &&
1038 (((++journaled_since_start) %
1039 cct->_conf->journal_write_header_frequency) == 0)) {
1040 must_write_header = true;
1043 if (must_write_header) {
1044 must_write_header = false;
1045 hbp = prepare_header();
1048 dout(15) << "do_write writing " << write_pos << "~" << bl.length()
1049 << (hbp.length() ? " + header":"")
1052 utime_t from = ceph_clock_now();
1055 off64_t pos = write_pos;
  // Advance write_pos (with wraparound) before releasing the lock.
1058 write_pos += bl.length();
1059 if (write_pos >= header.max_size)
1060 write_pos = write_pos - header.max_size + get_top();
  // Drop write_lock for the duration of the blocking I/O.
1062 write_lock.Unlock();
  // Wrapping case: split the batch at the ring boundary.
1066 if (pos + bl.length() > header.max_size) {
1067 bufferlist first, second;
1068 split = header.max_size - pos;
1069 first.substr_of(bl, 0, split);
1070 second.substr_of(bl, split, bl.length() - split);
1071 assert(first.length() + second.length() == bl.length());
1072 dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length()
1073 << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl;
1075 //Save pos to write first piece second
1076 off64_t first_pos = pos;
1081 // be sneaky: include the header in the second fragment
1082 second.push_front(hbp);
1083 pos = 0; // we included the header
1085 // Write the second (wrapped) portion first, possibly with the header, so
1086 // do_read_entry() won't even get a valid entry_header_t if there
1087 // is a crash between the two writes.
1089 if (write_bl(pos, second)) {
1090 derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1091 << ") failed" << dendl;
1092 check_align(pos, second);
1095 orig_pos = first_pos;
1096 if (write_bl(first_pos, first)) {
1097 derr << "FileJournal::do_write: write_bl(pos=" << orig_pos
1098 << ") failed" << dendl;
1099 check_align(first_pos, first);
1102 assert(first_pos == get_top());
  // Non-wrapping case: write the header (if any) at offset 0, then the batch.
1106 if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) {
1108 derr << "FileJournal::do_write: pwrite(fd=" << fd
1109 << ", hbp.length=" << hbp.length() << ") failed :"
1110 << cpp_strerror(err) << dendl;
1115 if (write_bl(pos, bl)) {
1116 derr << "FileJournal::do_write: write_bl(pos=" << pos
1117 << ") failed" << dendl;
1118 check_align(pos, bl);
1124 dout(20) << "do_write fsync" << dendl;
1127 * We'd really love to have a fsync_range or fdatasync_range and do a:
1130 * ::fsync_range(fd, header.max_size - split, split)l
1131 * ::fsync_range(fd, get_top(), bl.length() - split);
1133 * ::fsync_range(fd, write_pos, bl.length())
1135 * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be
1136 * too hard given all the underlying infrastructure already exist.
1138 * NOTE: using sync_file_range here would not be safe as it does not
1139 * flush disk caches or commits any sort of metadata.
1142 #if defined(DARWIN) || defined(__FreeBSD__)
1145 ret = ::fdatasync(fd);
1148 derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl;
1151 #ifdef HAVE_POSIX_FADVISE
  // Journal data won't be re-read in normal operation; drop the page cache.
1152 if (cct->_conf->filestore_fadvise)
1153 posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
1157 utime_t lat = ceph_clock_now() - from;
1158 dout(20) << "do_write latency " << lat << dendl;
  // Reacquired write_lock: the write cursor must match where the I/O ended.
1162 assert(write_pos == pos);
1163 assert(write_pos % header.alignment == 0);
1166 Mutex::Locker locker(finisher_lock);
1167 journaled_seq = writing_seq;
1170 // only if we haven't filled up recently!
1171 if (full_state != FULL_NOTFULL) {
1172 dout(10) << "do_write NOT queueing finisher seq " << journaled_seq
1173 << ", full_commit_seq|full_restart_seq" << dendl;
1175 if (plug_journal_completions) {
1176 dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq
1177 << " due to completion plug" << dendl;
1179 dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl;
1180 queue_completions_thru(journaled_seq);
// Block until every queued journal completion has been handed to the
// finisher, then wait for the finisher itself to drain.
1186 void FileJournal::flush()
1188 dout(10) << "waiting for completions to empty" << dendl;
1190 Mutex::Locker l(finisher_lock);
1192 while (!completions_empty())
1192 finisher_cond.Wait(finisher_lock);
1194 dout(10) << "flush waiting for finisher" << dendl;
1195 finisher->wait_for_empty();
1196 dout(10) << "flush done" << dendl;
// Main loop of the journal writer thread: sleep until work (or a pending
// header write) arrives, throttle against in-flight aios, batch entries via
// prepare_multi_write, issue the batch, and account the completed write.
1200 void FileJournal::write_thread_entry()
1202 dout(10) << "write_thread_entry start" << dendl;
1205 Mutex::Locker locker(writeq_lock);
1206 if (writeq.empty() && !must_write_header) {
1209 dout(20) << "write_thread_entry going to sleep" << dendl;
1210 writeq_cond.Wait(writeq_lock);
1211 dout(20) << "write_thread_entry woke up" << dendl;
1218 Mutex::Locker locker(aio_lock);
1219 // should we back off to limit aios in flight? try to do this
1220 // adaptively so that we submit larger aios once we have lots of
1223 // NOTE: our condition here is based on aio_num (protected by
1224 // aio_lock) and throttle_bytes (part of the write queue). when
1225 // we sleep, we *only* wait for aio_num to change, and do not
1226 // wake when more data is queued. this is not strictly correct,
1227 // but should be fine given that we will have plenty of aios in
1228 // flight if we hit this limit to ensure we keep the device
1230 while (aio_num > 0) {
  // Require exponentially more pending bytes as more aios are in flight,
  // so new aios are submitted in larger batches.
1231 int exp = MIN(aio_num * 2, 24);
1232 long unsigned min_new = 1ull << exp;
1233 uint64_t cur = aio_write_queue_bytes;
1234 dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
1235 << " ... exp " << exp << " min_new " << min_new
1236 << " ... pending " << cur << dendl;
1239 dout(20) << "write_thread_entry deferring until more aios complete: "
1240 << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
1241 << " bytes to start a new aio (currently " << cur << " pending)" << dendl;
1242 aio_cond.Wait(aio_lock);
1243 dout(20) << "write_thread_entry woke up" << dendl;
1248 Mutex::Locker locker(write_lock);
1249 uint64_t orig_ops = 0;
1250 uint64_t orig_bytes = 0;
1253 int r = prepare_multi_write(bl, orig_ops, orig_bytes);
1254 // Don't care about journal full if stopping, so drop queue and
1255 // possibly let header get written and loop above to notice stop
1258 dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
1259 while (!writeq_empty()) {
1260 complete_write(1, peek_write().orig_len);
1263 print_header(header);
  // Journal full and not stopping: wait for a commit to free space.
1266 dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
1267 commit_cond.Wait(write_lock);
1268 dout(20) << "write_thread_entry woke up" << dendl;
1275 logger->inc(l_filestore_journal_wr);
1276 logger->inc(l_filestore_journal_wr_bytes, bl.length());
1287 complete_write(orig_ops, orig_bytes);
1290 dout(10) << "write_thread_entry finish" << dendl;
// Asynchronous (libaio) counterpart of do_write(): submit the batch at
// write_pos via write_aio_bl, splitting at the ring boundary when it wraps.
// The final submission carries writing_seq so completion can advance
// journaled_seq; auxiliary pieces pass seq=0 (no state update).
1294 void FileJournal::do_aio_write(bufferlist& bl)
  // Refresh the header every journal_write_header_frequency batches.
1297 if (cct->_conf->journal_write_header_frequency &&
1298 (((++journaled_since_start) %
1299 cct->_conf->journal_write_header_frequency) == 0)) {
1300 must_write_header = true;
1304 if (bl.length() == 0 && !must_write_header)
1308 if (must_write_header) {
1309 must_write_header = false;
1310 hbp = prepare_header();
1314 off64_t pos = write_pos;
1316 dout(15) << "do_aio_write writing " << pos << "~" << bl.length()
1317 << (hbp.length() ? " + header":"")
  // Wrapping case: split the batch at the ring boundary.
1322 if (pos + bl.length() > header.max_size) {
1323 bufferlist first, second;
1324 split = header.max_size - pos;
1325 first.substr_of(bl, 0, split);
1326 second.substr_of(bl, split, bl.length() - split);
1327 assert(first.length() + second.length() == bl.length());
1328 dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl;
  // seq=0: this piece alone must not advance completion state.
1330 if (write_aio_bl(pos, first, 0)) {
1331 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1332 << ") failed" << dendl;
1335 assert(pos == header.max_size);
1337 // be sneaky: include the header in the second fragment
1338 second.push_front(hbp);
1339 pos = 0; // we included the header
1341 pos = get_top(); // no header, start after that
1342 if (write_aio_bl(pos, second, writing_seq)) {
1343 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1344 << ") failed" << dendl;
  // Non-wrapping case: submit the header (if any), then the batch.
1353 if (write_aio_bl(pos, hbl, 0)) {
1354 derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl;
1359 if (write_aio_bl(pos, bl, writing_seq)) {
1360 derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
1361 << ") failed" << dendl;
1367 if (write_pos == header.max_size)
1368 write_pos = get_top();
1369 assert(write_pos % header.alignment == 0);
1373 * write a buffer using aio
1375 * @param seq seq to trigger when this aio completes. if 0, do not update any state
// Splits `bl` into iovec chunks of at most IOV_MAX-1 segments, records each
// as an aio_info on aio_queue, and submits it with io_submit (retrying
// -EAGAIN with backoff). Only the final chunk carries `seq`. Advances `pos`
// past the submitted bytes.
1378 int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
1380 dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
1382 while (bl.length() > 0) {
  // Cap the iovec count below the kernel's per-call limit.
1383 int max = MIN(bl.get_num_buffers(), IOV_MAX-1);
1384 iovec *iov = new iovec[max];
1387 for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
1390 assert(p != bl.buffers().end());
1391 iov[n].iov_base = (void *)p->c_str();
1392 iov[n].iov_len = p->length();
1397 bl.splice(0, len, &tbl); // move bytes from bl -> tbl
1399 // lock only aio_queue, current aio, aio_num, aio_bytes, which may be
1400 // modified in check_aio_completion
  // Intermediate chunks get seq=0 so only the last one advances state.
1402 aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
1403 aio_info& aio = aio_queue.back();
1406 io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
1408 dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
1409 << " in " << n << dendl;
1412 aio_bytes += aio.len;
1414 // need to save current aio len to update write_pos later because current
1415 // aio could be erased from aio_queue once it is done
1416 uint64_t cur_len = aio.len;
1417 // unlock aio_lock because following io_submit might take time to return
1420 iocb *piocb = &aio.iocb;
1422 // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
1426 int r = io_submit(aio_ctx, 1, &piocb);
1427 dout(20) << "write_aio_bl io_submit return value: " << r << dendl;
1429 derr << "io_submit to " << aio.off << "~" << cur_len
1430 << " got " << cpp_strerror(r) << dendl;
  // -EAGAIN: kernel aio queue full; back off and retry.
1431 if (r == -EAGAIN && attempts-- > 0) {
1436 check_align(pos, tbl);
1437 assert(0 == "io_submit got unexpected error");
  // Wake the completion thread to reap the new aio(s).
1445 write_finish_cond.Signal();
// Background reaper thread for libaio completions: sleeps while aio_queue
// is empty, otherwise blocks in io_getevents, validates each completed
// write's result, and calls check_aio_completion() to advance journal state.
1451 void FileJournal::write_finish_thread_entry()
1454 dout(10) << "write_finish_thread_entry enter" << dendl;
1457 Mutex::Locker locker(aio_lock);
1458 if (aio_queue.empty()) {
1461 dout(20) << "write_finish_thread_entry sleeping" << dendl;
// woken by write_aio_bl (and shutdown paths) via write_finish_cond
1462 write_finish_cond.Wait(aio_lock);
1467 dout(20) << "write_finish_thread_entry waiting for aio(s)" << dendl;
// wait for at least 1 completion; reap up to 16 events per call
1469 int r = io_getevents(aio_ctx, 1, 16, event, NULL);
1472 dout(0) << "io_getevents got " << cpp_strerror(r) << dendl;
1475 derr << "io_getevents got " << cpp_strerror(r) << dendl;
1476 assert(0 == "got unexpected error from io_getevents");
1480 Mutex::Locker locker(aio_lock);
1481 for (int i=0; i<r; i++) {
1482 aio_info *ai = (aio_info *)event[i].obj;
// a short or failed write (res != len) is fatal for journal integrity
1483 if (event[i].res != ai->len) {
1484 derr << "aio to " << ai->off << "~" << ai->len
1485 << " returned: " << (int)event[i].res << dendl;
1486 assert(0 == "unexpected aio error");
1488 dout(10) << "write_finish_thread_entry aio " << ai->off
1489 << "~" << ai->len << " done" << dendl;
1492 check_aio_completion();
1495 dout(10) << "write_finish_thread_entry exit" << dendl;
1501 * check aio_wait for completed aio, and update state appropriately.
// Pop completed aios off the head of aio_queue (completion must be
// consumed in submission order), record the highest completed seq, and
// queue completion finishers — unless the journal is in a FULL state or
// completions are plugged by commit_start.  Caller must hold aio_lock.
1503 void FileJournal::check_aio_completion()
1505 assert(aio_lock.is_locked());
1506 dout(20) << "check_aio_completion" << dendl;
1508 bool completed_something = false, signal = false;
1509 uint64_t new_journaled_seq = 0;
// only a prefix of done aios can be retired; stop at the first pending one
1511 list<aio_info>::iterator p = aio_queue.begin();
1512 while (p != aio_queue.end() && p->done) {
1513 dout(20) << "check_aio_completion completed seq " << p->seq << " "
1514 << p->off << "~" << p->len << dendl;
1516 new_journaled_seq = p->seq;
1517 completed_something = true;
1520 aio_bytes -= p->len;
1521 aio_queue.erase(p++);
1525 if (completed_something) {
1527 // only if we haven't filled up recently!
1528 Mutex::Locker locker(finisher_lock);
1529 journaled_seq = new_journaled_seq;
1530 if (full_state != FULL_NOTFULL) {
1531 dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
1532 << ", full_commit_seq|full_restart_seq" << dendl;
1534 if (plug_journal_completions) {
1535 dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq
1536 << " due to completion plug" << dendl;
1538 dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl;
1539 queue_completions_thru(journaled_seq);
1544 // maybe write queue was waiting for aio count to drop?
// Encode a set of transactions into one on-disk journal entry:
// [entry_header_t][pre_pad][payload][post_pad][entry_header_t].
// pre_pad keeps the largest transaction's data page-aligned on disk,
// post_pad rounds the total entry size up to header.alignment, and the
// whole buffer is rebuilt to CEPH_DIRECTIO_ALIGNMENT for O_DIRECT I/O.
1550 int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) {
1551 dout(10) << "prepare_entry " << tls << dendl;
// only payloads larger than journal_align_min_size drive the alignment choice
1552 int data_len = cct->_conf->journal_align_min_size - 1;
1553 int data_align = -1; // -1 indicates that we don't care about the alignment
1555 for (vector<ObjectStore::Transaction>::iterator p = tls.begin();
1556 p != tls.end(); ++p) {
1557 if ((int)(*p).get_data_length() > data_len) {
1558 data_len = (*p).get_data_length();
1559 data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK;
1563 if (tbl->length()) {
1564 bl.claim_append(*tbl);
1566 // add it this entry
1568 unsigned head_size = sizeof(entry_header_t);
1569 off64_t base_size = 2*head_size + bl.length();
1570 memset(&h, 0, sizeof(h));
1571 if (data_align >= 0)
1572 h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
1573 off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment);
1574 unsigned post_pad = size - base_size - h.pre_pad;
1575 h.len = bl.length();
1576 h.post_pad = post_pad;
// crc covers the payload only; headers are validated by magic/compare
1577 h.crc32c = bl.crc32c(0);
1578 dout(10) << " len " << bl.length() << " -> " << size
1579 << " (head " << head_size << " pre_pad " << h.pre_pad
1580 << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")"
1581 << " (bl alignment " << data_align << ")"
1585 ebl.append((const char*)&h, sizeof(h));
// pads reference a shared static zero buffer instead of allocating
1587 ebl.push_back(buffer::create_static(h.pre_pad, zero_buf));
1590 ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
1592 ebl.push_back(buffer::create_static(h.post_pad, zero_buf));
// identical header is written again as the entry footer
1595 ebl.append((const char*)&h, sizeof(h));
1597 ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT);
// Queue a fully prepared entry (from prepare_entry) for the write thread:
// registers the throttle for seq, bumps perf counters, records the
// completion to fire when the entry is durable, pushes onto writeq and
// wakes the writer.  'oncommit' is invoked via the finisher once journaled.
1602 void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
1603 Context *oncommit, TrackedOpRef osd_op)
1606 dout(5) << "submit_entry seq " << seq
1607 << " len " << e.length()
1608 << " (" << oncommit << ")" << dendl;
1609 assert(e.length() > 0);
// an entry larger than the journal itself could never be written
1610 assert(e.length() < header.max_size);
1613 osd_op->mark_event("commit_queued_for_journal_write");
1615 logger->inc(l_filestore_journal_queue_bytes, orig_len);
1616 logger->inc(l_filestore_journal_queue_ops, 1);
1619 throttle.register_throttle_seq(seq, e.length());
1621 logger->inc(l_filestore_journal_ops, 1);
1622 logger->inc(l_filestore_journal_bytes, e.length());
1626 osd_op->mark_event("commit_queued_for_journal_write");
1627 if (osd_op->store_trace) {
1628 osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace);
1629 osd_op->journal_trace.event("submit_entry");
1630 osd_op->journal_trace.keyval("seq", seq);
// take the three locks in a fixed order: writeq, aio, completions
1634 Mutex::Locker l1(writeq_lock);
1636 Mutex::Locker l2(aio_lock);
1638 Mutex::Locker l3(completions_lock);
1641 aio_write_queue_ops++;
1642 aio_write_queue_bytes += e.length();
1646 completions.push_back(
1648 seq, oncommit, ceph_clock_now(), osd_op));
// wake the write thread now that the item is visible on writeq
1650 writeq_cond.Signal();
1651 writeq.push_back(write_item(seq, e, orig_len, osd_op));
1653 osd_op->journal_trace.keyval("queue depth", writeq.size());
// Return whether the pending write queue is empty (takes writeq_lock).
1657 bool FileJournal::writeq_empty()
1659 Mutex::Locker locker(writeq_lock);
1660 return writeq.empty();
// Return a reference to the oldest queued write without removing it.
// Caller must hold write_lock so the front item cannot be popped under us.
1663 FileJournal::write_item &FileJournal::peek_write()
1665 assert(write_lock.is_locked());
1666 Mutex::Locker locker(writeq_lock);
1667 return writeq.front();
// Remove the front item from writeq and decrement the queue perf counters.
// Caller must hold write_lock.
1670 void FileJournal::pop_write()
1672 assert(write_lock.is_locked());
1673 Mutex::Locker locker(writeq_lock);
1675 logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len);
1676 logger->dec(l_filestore_journal_queue_ops, 1);
// Drain the entire writeq into 'items' in one operation and adjust the
// queue perf counters for every drained entry.  Caller must hold write_lock.
1681 void FileJournal::batch_pop_write(list<write_item> &items)
1683 assert(write_lock.is_locked());
1685 Mutex::Locker locker(writeq_lock);
1688 for (auto &&i : items) {
1690 logger->dec(l_filestore_journal_queue_bytes, i.orig_len);
1691 logger->dec(l_filestore_journal_queue_ops, 1);
// Inverse of batch_pop_write: put 'items' back at the FRONT of writeq
// (preserving order) and restore the queue perf counters.
// Caller must hold write_lock.
1698 void FileJournal::batch_unpop_write(list<write_item> &items)
1699 assert(write_lock.is_locked());
1700 for (auto &&i : items) {
1701 logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
1702 logger->inc(l_filestore_journal_queue_ops, 1);
1705 Mutex::Locker locker(writeq_lock);
1706 writeq.splice(writeq.begin(), items);
// Called when the backing filestore begins a commit of everything up to
// 'seq'.  Drives the journal-full state machine:
//   FULL_FULL -> FULL_WAIT once the commit covers all journaled entries,
//   FULL_WAIT -> FULL_NOTFULL, plugging completions until the journal
//   header is rewritten (plug is removed in committed_thru).
1709 void FileJournal::commit_start(uint64_t seq)
1711 dout(10) << "commit_start" << dendl;
1714 switch (full_state) {
1719 if (seq >= journaled_seq) {
1720 dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq "
1721 << seq << " > journaled_seq " << journaled_seq
1722 << ", moving to FULL_WAIT."
1724 full_state = FULL_WAIT;
1726 dout(1) << "FULL_FULL commit_start on seq "
1727 << seq << " < journaled_seq " << journaled_seq
1728 << ", remaining in FULL_FULL"
1734 dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl;
1735 full_state = FULL_NOTFULL;
// hold back completion callbacks until committed_thru unplugs them
1736 plug_journal_completions = true;
1742 *send discard command to journal block device
// Issue a block-device discard (TRIM) for the byte range [offset, end).
// offset is rounded up and end rounded down to block_size so only whole,
// fully-covered blocks are discarded; a failed BLKDISCARD is only logged.
1744 void FileJournal::do_discard(int64_t offset, int64_t end)
1746 dout(10) << __func__ << "trim(" << offset << ", " << end << dendl;
1748 offset = ROUND_UP_TO(offset, block_size);
1751 end = ROUND_UP_TO(end - block_size, block_size);
1752 assert(end >= offset);
1754 if (block_device_discard(fd, offset, end - offset) < 0)
1755 dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl;
// The filestore has durably committed everything up to 'seq': release
// throttle budget, flush any held completion plugs, advance the journal
// header start pointer past entries <= seq (discarding freed space), and
// drop committed-but-never-written items from writeq.
1758 void FileJournal::committed_thru(uint64_t seq)
1760 Mutex::Locker locker(write_lock);
// returns (ops, bytes) released by the throttle for seqs <= seq
1762 auto released = throttle.flush(seq);
1764 logger->dec(l_filestore_journal_ops, released.first);
1765 logger->dec(l_filestore_journal_bytes, released.second);
// commits must be monotonic; going backwards is a caller bug
1768 if (seq < last_committed_seq) {
1769 dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
1770 assert(seq >= last_committed_seq);
1773 if (seq == last_committed_seq) {
1774 dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
1778 dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
1779 last_committed_seq = seq;
1783 Mutex::Locker locker(finisher_lock);
1784 queue_completions_thru(seq);
// safe to unplug: header.start_seq now reflects the committed state
1785 if (plug_journal_completions && seq >= header.start_seq) {
1786 dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
1787 plug_journal_completions = false;
1788 queue_completions_thru(journaled_seq);
1792 // adjust start pointer
1793 while (!journalq.empty() && journalq.front().first <= seq) {
1794 journalq.pop_front();
1797 int64_t old_start = header.start;
1798 if (!journalq.empty()) {
1799 header.start = journalq.front().second;
1800 header.start_seq = journalq.front().first;
// journal fully consumed: restart right behind the write position
1802 header.start = write_pos;
1803 header.start_seq = seq + 1;
1807 dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl;
1808 if (old_start < header.start)
1809 do_discard(old_start, header.start - 1);
// start wrapped around: discard the two disjoint freed ranges
1811 do_discard(old_start, header.max_size - 1);
1812 do_discard(get_top(), header.start - 1);
1816 must_write_header = true;
1817 print_header(header);
1819 // committed but unjournaled items
1820 while (!writeq_empty() && peek_write().seq <= seq) {
1821 dout(15) << " dropping committed but unwritten seq " << peek_write().seq
1822 << " len " << peek_write().bl.length()
1824 complete_write(1, peek_write().orig_len);
// wake anyone blocked in commit_cond waiting for this commit point
1828 commit_cond.Signal();
1830 dout(10) << "committed_thru done" << dendl;
// Account for 'ops' journal writes totaling 'bytes' having finished.
1834 void FileJournal::complete_write(uint64_t ops, uint64_t bytes)
1836 dout(5) << __func__ << " finished " << ops << " ops and "
1837 << bytes << " bytes" << dendl;
// Transition the journal into writeable mode: (re)load throttle params,
// position write_pos (resume at read_pos when set, otherwise start at the
// top of the journal) and force the header to be rewritten.
1840 int FileJournal::make_writeable()
1842 dout(10) << __func__ << dendl;
1843 int r = set_throttle_params();
1852 write_pos = read_pos;
1854 write_pos = get_top();
1857 must_write_header = true;
// Load the journal throttle configuration from the current conf values,
// sized against the usable journal capacity (max_size - get_top()).
// Returns 0 on success, -EINVAL if the parameter combination is invalid.
1863 int FileJournal::set_throttle_params()
1866 bool valid = throttle.set_params(
1867 cct->_conf->journal_throttle_low_threshhold,
1868 cct->_conf->journal_throttle_high_threshhold,
1869 cct->_conf->filestore_expected_throughput_bytes,
1870 cct->_conf->journal_throttle_high_multiple,
1871 cct->_conf->journal_throttle_max_multiple,
1872 header.max_size - get_top(),
1876 derr << "tried to set invalid params: "
1880 return valid ? 0 : -EINVAL;
// Config-observer hook: the set of config keys whose runtime changes this
// journal wants to be notified about (they feed set_throttle_params).
1883 const char** FileJournal::get_tracked_conf_keys() const
1885 static const char *KEYS[] = {
1886 "journal_throttle_low_threshhold",
1887 "journal_throttle_high_threshhold",
1888 "journal_throttle_high_multiple",
1889 "journal_throttle_max_multiple",
1890 "filestore_expected_throughput_bytes",
// Read 'olen' bytes from the journal at 'pos' into *bl, wrapping around
// from the end of the journal (header.max_size) back to get_top().
// On return the out position points just past the bytes read (wrapped).
1895 void FileJournal::wrap_read_bl(
// normalize pos into the valid journal range before reading
1903 while (pos >= header.max_size)
1904 pos = pos + get_top() - header.max_size;
// clip to the journal end; the remainder is read after wrapping
1907 if (pos + olen > header.max_size)
1908 len = header.max_size - pos; // partial
1912 int64_t actual = ::lseek64(fd, pos, SEEK_SET);
1913 assert(actual == pos);
1915 bufferptr bp = buffer::create(len);
1916 int r = safe_read_exact(fd, bp.c_str(), len);
1918 derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned "
1922 bl->push_back(std::move(bp));
1926 if (pos >= header.max_size)
1927 pos = pos + get_top() - header.max_size;
// Read the next journal entry during replay.  On SUCCESS: records the
// (seq, pos) pair on journalq for later trimming, takes throttle budget
// for the on-disk footprint of the entry, and advances read_pos and
// journaled_seq.  On corruption, an entry older than header.committed_up_to
// is fatal unless journal_ignore_corruption is set.
1932 bool FileJournal::read_entry(
1939 uint64_t seq = next_seq;
1942 dout(2) << "read_entry -- not readable" << dendl;
1946 off64_t pos = read_pos;
1947 off64_t next_pos = pos;
1949 read_entry_result result = do_read_entry(
1955 if (result == SUCCESS) {
1956 journalq.push_back( pair<uint64_t,off64_t>(seq, pos));
// on-disk footprint; the wrapped case spans the end and the top
1957 uint64_t amount_to_take =
1960 (header.max_size - pos) + (next_pos - get_top());
1961 throttle.take(amount_to_take);
1962 throttle.register_throttle_seq(next_seq, amount_to_take);
1964 logger->inc(l_filestore_journal_ops, 1);
1965 logger->inc(l_filestore_journal_bytes, amount_to_take);
1967 if (next_seq > seq) {
1970 read_pos = next_pos;
1972 if (seq > journaled_seq)
1973 journaled_seq = seq;
1977 derr << "do_read_entry(" << pos << "): " << ss.str() << dendl;
// header said we committed past this seq, so the data must exist: corrupt
1980 if (seq && seq < header.committed_up_to) {
1981 derr << "Unable to read past sequence " << seq
1982 << " but header indicates the journal has committed up through "
1983 << header.committed_up_to << ", journal is corrupt" << dendl;
1984 if (cct->_conf->journal_ignore_corruption) {
1993 dout(2) << "No further valid entries found, journal is most likely valid"
// Parse one journal entry at init_pos: header, pre_pad, payload, post_pad,
// footer (a byte-identical copy of the header), then the payload crc when
// enabled.  Returns SUCCESS with *next_pos advanced past the entry, or
// MAYBE_CORRUPT with a diagnostic in *ss and a resume position in *next_pos.
1998 FileJournal::read_entry_result FileJournal::do_read_entry(
2004 entry_header_t *_h) const
2006 off64_t cur_pos = init_pos;
2015 wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos);
2016 h = reinterpret_cast<entry_header_t *>(hbl.c_str());
// magic mixes in position and fsid, so stale data from elsewhere fails here
2018 if (!h->check_magic(cur_pos, header.get_fsid64())) {
2019 dout(25) << "read_entry " << init_pos
2020 << " : bad header magic, end of journal" << dendl;
2022 *ss << "bad header magic";
2024 *next_pos = init_pos + (4<<10); // check 4k ahead
2025 return MAYBE_CORRUPT;
2027 cur_pos = _next_pos;
2031 cur_pos += h->pre_pad;
2034 wrap_read_bl(cur_pos, h->len, bl, &cur_pos);
2037 cur_pos += h->post_pad;
2042 wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos);
2043 f = reinterpret_cast<entry_header_t *>(fbl.c_str());
// footer must match the header exactly, else the entry was cut short
2044 if (memcmp(f, h, sizeof(*f))) {
2046 *ss << "bad footer magic, partial entry";
2048 *next_pos = cur_pos;
2049 return MAYBE_CORRUPT;
2052 if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal)
2053 h->crc32c != 0) { // newer entry in old journal
2054 uint32_t actual_crc = bl->crc32c(0);
2055 if (actual_crc != h->crc32c) {
2057 *ss << "header crc (" << h->crc32c
2058 << ") doesn't match body crc (" << actual_crc << ")";
2060 *next_pos = cur_pos;
2061 return MAYBE_CORRUPT;
2066 dout(2) << "read_entry " << init_pos << " : seq " << h->seq
2067 << " " << h->len << " bytes"
2076 *next_pos = cur_pos;
// entries are written aligned, so a valid parse must land aligned too
2081 assert(cur_pos % header.alignment == 0);
// Take 'count' bytes of journal throttle budget (may block/back off).
2085 void FileJournal::reserve_throttle_and_backoff(uint64_t count)
2087 throttle.get(count);
// Scan the journal from header.start looking for the entry with
// wanted_seq; used by the corruption-injection test helpers below to
// locate an entry's on-disk position and header.  Aborts if not found.
2090 void FileJournal::get_header(
2091 uint64_t wanted_seq,
2095 off64_t pos = header.start;
2096 off64_t next_pos = pos;
2099 dout(2) << __func__ << dendl;
2103 read_entry_result result = do_read_entry(
2110 if (result == FAILURE || result == MAYBE_CORRUPT)
2112 if (seq == wanted_seq) {
2118 ceph_abort(); // not reachable
// Test helper: deliberately corrupt one byte of the journal at corrupt_at
// (wrapped into the valid journal range) by reading it via fd and writing
// it back via wfd.  NOTE(review): the byte-mutation step is on a line
// elided from this view — confirm against the full source.
2121 void FileJournal::corrupt(
2125 dout(2) << __func__ << dendl;
2126 if (corrupt_at >= header.max_size)
2127 corrupt_at = corrupt_at + get_top() - header.max_size;
2129 int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET);
2130 assert(actual == corrupt_at);
2133 int r = safe_read_exact(fd, buf, 1);
2136 actual = ::lseek64(wfd, corrupt_at, SEEK_SET);
2137 assert(actual == corrupt_at);
2140 r = safe_write(wfd, buf, 1);
// Test helper: corrupt the first payload byte of the entry with sequence
// 'seq' (entry start + header + pre_pad).
2144 void FileJournal::corrupt_payload(
2148 dout(2) << __func__ << dendl;
2151 get_header(seq, &pos, &h);
2152 off64_t corrupt_at =
2153 pos + sizeof(entry_header_t) + h.pre_pad;
2154 corrupt(wfd, corrupt_at);
// Test helper: corrupt the magic2 field of entry 'seq's FOOTER header
// (located after header + pre_pad + payload + post_pad).
2158 void FileJournal::corrupt_footer_magic(
2162 dout(2) << __func__ << dendl;
2165 get_header(seq, &pos, &h);
2166 off64_t corrupt_at =
2167 pos + sizeof(entry_header_t) + h.pre_pad +
2168 h.len + h.post_pad +
// byte offset of magic2 within entry_header_t
2169 (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2170 corrupt(wfd, corrupt_at);
// Test helper: corrupt the magic2 field of entry 'seq's LEADING header.
2174 void FileJournal::corrupt_header_magic(
2178 dout(2) << __func__ << dendl;
2181 get_header(seq, &pos, &h);
2182 off64_t corrupt_at =
// byte offset of magic2 within entry_header_t
2184 (reinterpret_cast<char*>(&h.magic2) - reinterpret_cast<char*>(&h));
2185 corrupt(wfd, corrupt_at);
2188 off64_t FileJournal::get_journal_size_estimate()
2190 off64_t size, start = header.start;
2191 if (write_pos < start) {
2192 size = (max_size - start) + write_pos;
2194 size = write_pos - start;
2196 dout(20) << __func__ << " journal size=" << size << dendl;