X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fos%2Ffilestore%2FFileJournal.cc;fp=src%2Fceph%2Fsrc%2Fos%2Ffilestore%2FFileJournal.cc;h=8dd75435dc6755f6b5506241dc4433cf254c2274;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/os/filestore/FileJournal.cc b/src/ceph/src/os/filestore/FileJournal.cc new file mode 100644 index 0000000..8dd7543 --- /dev/null +++ b/src/ceph/src/os/filestore/FileJournal.cc @@ -0,0 +1,2198 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include "acconfig.h" + +#include "common/debug.h" +#include "common/errno.h" +#include "common/safe_io.h" +#include "FileJournal.h" +#include "include/color.h" +#include "common/perf_counters.h" +#include "FileStore.h" + +#include "include/compat.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/blkdev.h" +#if defined(__linux__) +#include "common/linux_version.h" +#endif + +#if defined(__FreeBSD__) +#define O_DSYNC O_SYNC +#endif + +#define dout_context cct +#define dout_subsys ceph_subsys_journal +#undef dout_prefix +#define dout_prefix *_dout << "journal " + +const static int64_t ONE_MEG(1 << 20); +const static int CEPH_DIRECTIO_ALIGNMENT(4096); + + +int FileJournal::_open(bool forwrite, bool create) +{ + int flags, ret; + + if (forwrite) { + flags = O_RDWR; + if (directio) + flags |= O_DIRECT | O_DSYNC; + } else { + flags = O_RDONLY; + } + if (create) + flags |= O_CREAT; + + if (fd >= 0) { + if (TEMP_FAILURE_RETRY(::close(fd))) { + int err = errno; + derr << "FileJournal::_open: error closing old fd: " + << cpp_strerror(err) << dendl; + } + } + fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644)); + if (fd < 0) { + int err = errno; + dout(2) << "FileJournal::_open unable to open journal " + << fn << ": " << cpp_strerror(err) << dendl; + return -err; + } + + struct stat st; + ret = ::fstat(fd, &st); + if (ret) { + ret = errno; + derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl; + ret = -ret; + goto out_fd; + } + + if (S_ISBLK(st.st_mode)) { + ret = _open_block_device(); + } else if (S_ISREG(st.st_mode)) { + if (aio && !force_aio) { + derr << "FileJournal::_open: disabling aio for non-block journal. Use " + << "journal_force_aio to force use of aio anyway" << dendl; + aio = false; + } + ret = _open_file(st.st_size, st.st_blksize, create); + } else { + derr << "FileJournal::_open: wrong journal file type: " << st.st_mode + << dendl; + ret = -EINVAL; + } + + if (ret) + goto out_fd; + +#ifdef HAVE_LIBAIO + if (aio) { + aio_ctx = 0; + ret = io_setup(128, &aio_ctx); + if (ret < 0) { + switch (ret) { + // Contrary to naive expectations -EAGIAN means ... + case -EAGAIN: + derr << "FileJournal::_open: user's limit of aio events exceeded. " + << "Try increasing /proc/sys/fs/aio-max-nr" << dendl; + break; + default: + derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(-ret) << dendl; + break; + } + goto out_fd; + } + } +#endif + + /* We really want max_size to be a multiple of block_size. */ + max_size -= max_size % block_size; + + dout(1) << "_open " << fn << " fd " << fd + << ": " << max_size + << " bytes, block size " << block_size + << " bytes, directio = " << directio + << ", aio = " << aio + << dendl; + return 0; + + out_fd: + VOID_TEMP_FAILURE_RETRY(::close(fd)); + fd = -1; + return ret; +} + +int FileJournal::_open_block_device() +{ + int64_t bdev_sz = 0; + int ret = get_block_device_size(fd, &bdev_sz); + if (ret) { + dout(0) << __func__ << ": failed to read block device size." << dendl; + return -EIO; + } + + /* Check for bdev_sz too small */ + if (bdev_sz < ONE_MEG) { + dout(0) << __func__ << ": your block device must be at least " + << ONE_MEG << " bytes to be used for a Ceph journal." << dendl; + return -EINVAL; + } + + dout(10) << __func__ << ": ignoring osd journal size. " + << "We'll use the entire block device (size: " << bdev_sz << ")" + << dendl; + max_size = bdev_sz; + + block_size = cct->_conf->journal_block_size; + + if (cct->_conf->journal_discard) { + discard = block_device_support_discard(fn.c_str()); + dout(10) << fn << " support discard: " << (int)discard << dendl; + } + + return 0; +} + +int FileJournal::_open_file(int64_t oldsize, blksize_t blksize, + bool create) +{ + int ret; + int64_t conf_journal_sz(cct->_conf->osd_journal_size); + conf_journal_sz <<= 20; + + if ((cct->_conf->osd_journal_size == 0) && (oldsize < ONE_MEG)) { + derr << "I'm sorry, I don't know how large of a journal to create." + << "Please specify a block device to use as the journal OR " + << "set osd_journal_size in your ceph.conf" << dendl; + return -EINVAL; + } + + if (create && (oldsize < conf_journal_sz)) { + uint64_t newsize(conf_journal_sz); + dout(10) << __func__ << " _open extending to " << newsize << " bytes" << dendl; + ret = ::ftruncate(fd, newsize); + if (ret < 0) { + int err = errno; + derr << "FileJournal::_open_file : unable to extend journal to " + << newsize << " bytes: " << cpp_strerror(err) << dendl; + return -err; + } +#ifdef HAVE_POSIX_FALLOCATE + ret = ::posix_fallocate(fd, 0, newsize); + if (ret) { + derr << "FileJournal::_open_file : unable to preallocation journal to " + << newsize << " bytes: " << cpp_strerror(ret) << dendl; + return -ret; + } + max_size = newsize; +#elif defined(__APPLE__) + fstore_t store; + store.fst_flags = F_ALLOCATECONTIG; + store.fst_posmode = F_PEOFPOSMODE; + store.fst_offset = 0; + store.fst_length = newsize; + + ret = ::fcntl(fd, F_PREALLOCATE, &store); + if (ret == -1) { + ret = -errno; + derr << "FileJournal::_open_file : unable to preallocation journal to " + << newsize << " bytes: " << cpp_strerror(ret) << dendl; + return ret; + } + max_size = newsize; +#else +# error "Journal pre-allocation not supported on platform." +#endif + } + else { + max_size = oldsize; + } + block_size = cct->_conf->journal_block_size; + + if (create && cct->_conf->journal_zero_on_create) { + derr << "FileJournal::_open_file : zeroing journal" << dendl; + uint64_t write_size = 1 << 20; + char *buf; + ret = ::posix_memalign((void **)&buf, block_size, write_size); + if (ret != 0) { + return -ret; + } + memset(static_cast(buf), 0, write_size); + uint64_t i = 0; + for (; (i + write_size) <= (uint64_t)max_size; i += write_size) { + ret = ::pwrite(fd, static_cast(buf), write_size, i); + if (ret < 0) { + free(buf); + return -errno; + } + } + if (i < (uint64_t)max_size) { + ret = ::pwrite(fd, static_cast(buf), max_size - i, i); + if (ret < 0) { + free(buf); + return -errno; + } + } + free(buf); + } + + + dout(10) << "_open journal is not a block device, NOT checking disk " + << "write cache on '" << fn << "'" << dendl; + + return 0; +} + +// This can not be used on an active journal +int FileJournal::check() +{ + int ret; + + assert(fd == -1); + ret = _open(false, false); + if (ret) + return ret; + + ret = read_header(&header); + if (ret < 0) + goto done; + + if (header.fsid != fsid) { + derr << "check: ondisk fsid " << header.fsid << " doesn't match expected " << fsid + << ", invalid (someone else's?) journal" << dendl; + ret = -EINVAL; + goto done; + } + + dout(1) << "check: header looks ok" << dendl; + ret = 0; + + done: + close(); + return ret; +} + + +int FileJournal::create() +{ + void *buf = 0; + int64_t needed_space; + int ret; + buffer::ptr bp; + dout(2) << "create " << fn << " fsid " << fsid << dendl; + + ret = _open(true, true); + if (ret) + goto done; + + // write empty header + header = header_t(); + header.flags = header_t::FLAG_CRC; // enable crcs on any new journal. + header.fsid = fsid; + header.max_size = max_size; + header.block_size = block_size; + if (cct->_conf->journal_block_align || directio) + header.alignment = block_size; + else + header.alignment = 16; // at least stay word aligned on 64bit machines... + + header.start = get_top(); + header.start_seq = 0; + + print_header(header); + + // static zeroed buffer for alignment padding + delete [] zero_buf; + zero_buf = new char[header.alignment]; + memset(zero_buf, 0, header.alignment); + + bp = prepare_header(); + if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) { + ret = -errno; + derr << "FileJournal::create : create write header error " + << cpp_strerror(ret) << dendl; + goto close_fd; + } + + // zero first little bit, too. + ret = posix_memalign(&buf, block_size, block_size); + if (ret) { + ret = -ret; + derr << "FileJournal::create: failed to allocate " << block_size + << " bytes of memory: " << cpp_strerror(ret) << dendl; + goto close_fd; + } + memset(buf, 0, block_size); + if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) { + ret = -errno; + derr << "FileJournal::create: error zeroing first " << block_size + << " bytes " << cpp_strerror(ret) << dendl; + goto free_buf; + } + + needed_space = ((int64_t)cct->_conf->osd_max_write_size) << 20; + needed_space += (2 * sizeof(entry_header_t)) + get_top(); + if (header.max_size - header.start < needed_space) { + derr << "FileJournal::create: OSD journal is not large enough to hold " + << "osd_max_write_size bytes!" << dendl; + ret = -ENOSPC; + goto free_buf; + } + + dout(2) << "create done" << dendl; + ret = 0; + +free_buf: + free(buf); + buf = 0; +close_fd: + if (TEMP_FAILURE_RETRY(::close(fd)) < 0) { + ret = -errno; + derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret) + << dendl; + } +done: + fd = -1; + return ret; +} + +// This can not be used on an active journal +int FileJournal::peek_fsid(uuid_d& fsid) +{ + assert(fd == -1); + int r = _open(false, false); + if (r) + return r; + r = read_header(&header); + if (r < 0) + goto out; + fsid = header.fsid; +out: + close(); + return r; +} + +int FileJournal::open(uint64_t fs_op_seq) +{ + dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl; + + uint64_t next_seq = fs_op_seq + 1; + uint64_t seq = -1; + + int err = _open(false); + if (err) + return err; + + // assume writeable, unless... + read_pos = 0; + write_pos = get_top(); + + // read header? + err = read_header(&header); + if (err < 0) + goto out; + + // static zeroed buffer for alignment padding + delete [] zero_buf; + zero_buf = new char[header.alignment]; + memset(zero_buf, 0, header.alignment); + + dout(10) << "open header.fsid = " << header.fsid + //<< " vs expected fsid = " << fsid + << dendl; + if (header.fsid != fsid) { + derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid + << ", invalid (someone else's?) journal" << dendl; + err = -EINVAL; + goto out; + } + if (header.max_size > max_size) { + dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl; + err = -EINVAL; + goto out; + } + if (header.block_size != block_size) { + dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl; + err = -EINVAL; + goto out; + } + if (header.max_size % header.block_size) { + dout(2) << "open journal max size " << header.max_size + << " not a multiple of block size " << header.block_size << dendl; + err = -EINVAL; + goto out; + } + if (header.alignment != block_size && directio) { + dout(0) << "open journal alignment " << header.alignment << " does not match block size " + << block_size << " (required for direct_io journal mode)" << dendl; + err = -EINVAL; + goto out; + } + if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) { + dout(0) << "open journal alignment " << header.alignment + << " is not multiple of minimum directio alignment " + << CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)" + << dendl; + err = -EINVAL; + goto out; + } + + // looks like a valid header. + write_pos = 0; // not writeable yet + + journaled_seq = header.committed_up_to; + + // find next entry + read_pos = header.start; + seq = header.start_seq; + + while (1) { + bufferlist bl; + off64_t old_pos = read_pos; + if (!read_entry(bl, seq)) { + dout(10) << "open reached end of journal." << dendl; + break; + } + if (seq > next_seq) { + dout(10) << "open entry " << seq << " len " << bl.length() << " > next_seq " << next_seq + << ", ignoring journal contents" + << dendl; + read_pos = -1; + last_committed_seq = 0; + return 0; + } + if (seq == next_seq) { + dout(10) << "open reached seq " << seq << dendl; + read_pos = old_pos; + break; + } + seq++; // next event should follow. + } + + return 0; +out: + close(); + return err; +} + +void FileJournal::_close(int fd) const +{ + VOID_TEMP_FAILURE_RETRY(::close(fd)); +} + +void FileJournal::close() +{ + dout(1) << "close " << fn << dendl; + + // stop writer thread + stop_writer(); + + // close + assert(writeq_empty()); + assert(!must_write_header); + assert(fd >= 0); + _close(fd); + fd = -1; +} + + +int FileJournal::dump(ostream& out) +{ + return _dump(out, false); +} + +int FileJournal::simple_dump(ostream& out) +{ + return _dump(out, true); +} + +int FileJournal::_dump(ostream& out, bool simple) +{ + JSONFormatter f(true); + int ret = _fdump(f, simple); + f.flush(out); + return ret; +} + +int FileJournal::_fdump(Formatter &f, bool simple) +{ + dout(10) << "_fdump" << dendl; + + assert(fd == -1); + int err = _open(false, false); + if (err) + return err; + + err = read_header(&header); + if (err < 0) { + close(); + return err; + } + + off64_t next_pos = header.start; + + f.open_object_section("journal"); + + f.open_object_section("header"); + f.dump_unsigned("flags", header.flags); + ostringstream os; + os << header.fsid; + f.dump_string("fsid", os.str()); + f.dump_unsigned("block_size", header.block_size); + f.dump_unsigned("alignment", header.alignment); + f.dump_int("max_size", header.max_size); + f.dump_int("start", header.start); + f.dump_unsigned("committed_up_to", header.committed_up_to); + f.dump_unsigned("start_seq", header.start_seq); + f.close_section(); + + f.open_array_section("entries"); + uint64_t seq = header.start_seq; + while (1) { + bufferlist bl; + off64_t pos = next_pos; + + if (!pos) { + dout(2) << "_dump -- not readable" << dendl; + err = -EINVAL; + break; + } + stringstream ss; + read_entry_result result = do_read_entry( + pos, + &next_pos, + &bl, + &seq, + &ss); + if (result != SUCCESS) { + if (seq < header.committed_up_to) { + dout(2) << "Unable to read past sequence " << seq + << " but header indicates the journal has committed up through " + << header.committed_up_to << ", journal is corrupt" << dendl; + err = -EINVAL; + } + dout(25) << ss.str() << dendl; + dout(25) << "No further valid entries found, journal is most likely valid" + << dendl; + break; + } + + f.open_object_section("entry"); + f.dump_unsigned("offset", pos); + f.dump_unsigned("seq", seq); + if (simple) { + f.dump_unsigned("bl.length", bl.length()); + } else { + f.open_array_section("transactions"); + bufferlist::iterator p = bl.begin(); + int trans_num = 0; + while (!p.end()) { + ObjectStore::Transaction t(p); + f.open_object_section("transaction"); + f.dump_unsigned("trans_num", trans_num); + t.dump(&f); + f.close_section(); + trans_num++; + } + f.close_section(); + } + f.close_section(); + } + + f.close_section(); + f.close_section(); + dout(10) << "dump finish" << dendl; + + close(); + return err; +} + + +void FileJournal::start_writer() +{ + write_stop = false; + aio_stop = false; + write_thread.create("journal_write"); +#ifdef HAVE_LIBAIO + if (aio) + write_finish_thread.create("journal_wrt_fin"); +#endif +} + +void FileJournal::stop_writer() +{ + // Do nothing if writer already stopped or never started + if (!write_stop) + { + { + Mutex::Locker l(write_lock); + Mutex::Locker p(writeq_lock); + write_stop = true; + writeq_cond.Signal(); + // Doesn't hurt to signal commit_cond in case thread is waiting there + // and caller didn't use committed_thru() first. + commit_cond.Signal(); + } + write_thread.join(); + + // write journal header now so that we have less to replay on remount + write_header_sync(); + } + +#ifdef HAVE_LIBAIO + // stop aio completeion thread *after* writer thread has stopped + // and has submitted all of its io + if (aio && !aio_stop) { + aio_lock.Lock(); + aio_stop = true; + aio_cond.Signal(); + write_finish_cond.Signal(); + aio_lock.Unlock(); + write_finish_thread.join(); + } +#endif +} + + + +void FileJournal::print_header(const header_t &header) const +{ + dout(10) << "header: block_size " << header.block_size + << " alignment " << header.alignment + << " max_size " << header.max_size + << dendl; + dout(10) << "header: start " << header.start << dendl; + dout(10) << " write_pos " << write_pos << dendl; +} + +int FileJournal::read_header(header_t *hdr) const +{ + dout(10) << "read_header" << dendl; + bufferlist bl; + + buffer::ptr bp = buffer::create_page_aligned(block_size); + char* bpdata = bp.c_str(); + int r = ::pread(fd, bpdata, bp.length(), 0); + + if (r < 0) { + int err = errno; + dout(0) << "read_header got " << cpp_strerror(err) << dendl; + return -err; + } + + // don't use bp.zero() here, because it also invalidates + // crc cache (which is not yet populated anyway) + if (bp.length() != (size_t)r) { + // r will be always less or equal than bp.length + bpdata += r; + memset(bpdata, 0, bp.length() - r); + } + + bl.push_back(std::move(bp)); + + try { + bufferlist::iterator p = bl.begin(); + ::decode(*hdr, p); + } + catch (buffer::error& e) { + derr << "read_header error decoding journal header" << dendl; + return -EINVAL; + } + + + /* + * Unfortunately we weren't initializing the flags field for new + * journals! Aie. This is safe(ish) now that we have only one + * flag. Probably around when we add the next flag we need to + * remove this or else this (eventually old) code will clobber newer + * code's flags. + */ + if (hdr->flags > 3) { + derr << "read_header appears to have gibberish flags; assuming 0" << dendl; + hdr->flags = 0; + } + + print_header(*hdr); + + return 0; +} + +bufferptr FileJournal::prepare_header() +{ + bufferlist bl; + { + Mutex::Locker l(finisher_lock); + header.committed_up_to = journaled_seq; + } + ::encode(header, bl); + bufferptr bp = buffer::create_page_aligned(get_top()); + // don't use bp.zero() here, because it also invalidates + // crc cache (which is not yet populated anyway) + char* data = bp.c_str(); + memcpy(data, bl.c_str(), bl.length()); + data += bl.length(); + memset(data, 0, bp.length()-bl.length()); + return bp; +} + +void FileJournal::write_header_sync() +{ + Mutex::Locker locker(write_lock); + must_write_header = true; + bufferlist bl; + do_write(bl); + dout(20) << __func__ << " finish" << dendl; +} + +int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size) +{ + // already full? + if (full_state != FULL_NOTFULL) + return -ENOSPC; + + // take 1 byte off so that we only get pos == header.start on EMPTY, never on FULL. + off64_t room; + if (pos >= header.start) + room = (header.max_size - pos) + (header.start - get_top()) - 1; + else + room = header.start - pos - 1; + dout(10) << "room " << room << " max_size " << max_size << " pos " << pos << " header.start " << header.start + << " top " << get_top() << dendl; + + if (do_sync_cond) { + if (room >= (header.max_size >> 1) && + room - size < (header.max_size >> 1)) { + dout(10) << " passing half full mark, triggering commit" << dendl; + do_sync_cond->SloppySignal(); // initiate a real commit so we can trim + } + } + + if (room >= size) { + dout(10) << "check_for_full at " << pos << " : " << size << " < " << room << dendl; + if (pos + size > header.max_size) + must_write_header = true; + return 0; + } + + // full + dout(1) << "check_for_full at " << pos << " : JOURNAL FULL " + << pos << " >= " << room + << " (max_size " << header.max_size << " start " << header.start << ")" + << dendl; + + off64_t max = header.max_size - get_top(); + if (size > max) + dout(0) << "JOURNAL TOO SMALL: continuing, but slow: item " << size << " > journal " << max << " (usable)" << dendl; + + return -ENOSPC; +} + +int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes) +{ + // gather queued writes + off64_t queue_pos = write_pos; + + int eleft = cct->_conf->journal_max_write_entries; + unsigned bmax = cct->_conf->journal_max_write_bytes; + + if (full_state != FULL_NOTFULL) + return -ENOSPC; + + while (!writeq_empty()) { + list items; + batch_pop_write(items); + list::iterator it = items.begin(); + while (it != items.end()) { + uint64_t bytes = it->bl.length(); + int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes); + if (r == 0) { // prepare ok, delete it + items.erase(it++); +#ifdef HAVE_LIBAIO + { + Mutex::Locker locker(aio_lock); + assert(aio_write_queue_ops > 0); + aio_write_queue_ops--; + assert(aio_write_queue_bytes >= bytes); + aio_write_queue_bytes -= bytes; + } +#else + (void)bytes; +#endif + } + if (r == -ENOSPC) { + // the journal maybe full, insert the left item to writeq + batch_unpop_write(items); + if (orig_ops) + goto out; // commit what we have + + if (logger) + logger->inc(l_filestore_journal_full); + + if (wait_on_full) { + dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl; + } else { + dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl; + + // throw out what we have so far + full_state = FULL_FULL; + while (!writeq_empty()) { + complete_write(1, peek_write().orig_len); + pop_write(); + } + print_header(header); + } + + return -ENOSPC; // hrm, full on first op + } + if (eleft) { + if (--eleft == 0) { + dout(20) << "prepare_multi_write hit max events per write " + << cct->_conf->journal_max_write_entries << dendl; + batch_unpop_write(items); + goto out; + } + } + if (bmax) { + if (bl.length() >= bmax) { + dout(20) << "prepare_multi_write hit max write size " + << cct->_conf->journal_max_write_bytes << dendl; + batch_unpop_write(items); + goto out; + } + } + } + } + +out: + dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl; + assert((write_pos + bl.length() == queue_pos) || + (write_pos + bl.length() - header.max_size + get_top() == queue_pos)); + return 0; +} + +/* +void FileJournal::queue_write_fin(uint64_t seq, Context *fin) +{ + writing_seq.push_back(seq); + if (!waiting_for_notfull.empty()) { + // make sure previously unjournaled stuff waiting for UNFULL triggers + // _before_ newly journaled stuff does + dout(10) << "queue_write_fin will defer seq " << seq << " callback " << fin + << " until after UNFULL" << dendl; + C_Gather *g = new C_Gather(writeq.front().fin); + writing_fin.push_back(g->new_sub()); + waiting_for_notfull.push_back(g->new_sub()); + } else { + writing_fin.push_back(writeq.front().fin); + dout(20) << "queue_write_fin seq " << seq << " callback " << fin << dendl; + } +} +*/ + +void FileJournal::queue_completions_thru(uint64_t seq) +{ + assert(finisher_lock.is_locked()); + utime_t now = ceph_clock_now(); + list items; + batch_pop_completions(items); + list::iterator it = items.begin(); + while (it != items.end()) { + completion_item& next = *it; + if (next.seq > seq) + break; + utime_t lat = now; + lat -= next.start; + dout(10) << "queue_completions_thru seq " << seq + << " queueing seq " << next.seq + << " " << next.finish + << " lat " << lat << dendl; + if (logger) { + logger->tinc(l_filestore_journal_latency, lat); + } + if (next.finish) + finisher->queue(next.finish); + if (next.tracked_op) { + next.tracked_op->mark_event("journaled_completion_queued"); + next.tracked_op->journal_trace.event("queued completion"); + next.tracked_op->journal_trace.keyval("completed through", seq); + } + items.erase(it++); + } + batch_unpop_completions(items); + finisher_cond.Signal(); +} + + +int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes) +{ + uint64_t seq = next_write.seq; + bufferlist &ebl = next_write.bl; + off64_t size = ebl.length(); + + int r = check_for_full(seq, queue_pos, size); + if (r < 0) + return r; // ENOSPC or EAGAIN + + uint32_t orig_len = next_write.orig_len; + orig_bytes += orig_len; + orig_ops++; + + // add to write buffer + dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq + << " len " << orig_len << " -> " << size << dendl; + + unsigned seq_offset = offsetof(entry_header_t, seq); + unsigned magic1_offset = offsetof(entry_header_t, magic1); + unsigned magic2_offset = offsetof(entry_header_t, magic2); + + bufferptr headerptr = ebl.buffers().front(); + uint64_t _seq = seq; + uint64_t _queue_pos = queue_pos; + uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64()); + headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq); + headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); + headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2); + + bufferptr footerptr = ebl.buffers().back(); + unsigned post_offset = footerptr.length() - sizeof(entry_header_t); + footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq); + footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos); + footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2); + + bl.claim_append(ebl); + if (next_write.tracked_op) { + next_write.tracked_op->mark_event("write_thread_in_journal_buffer"); + next_write.tracked_op->journal_trace.event("prepare_single_write"); + } + + journalq.push_back(pair(seq, queue_pos)); + writing_seq = seq; + + queue_pos += size; + if (queue_pos >= header.max_size) + queue_pos = queue_pos + get_top() - header.max_size; + + return 0; +} + +void FileJournal::check_align(off64_t pos, bufferlist& bl) +{ + // make sure list segments are page aligned + if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) { + assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); + assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0); + assert(0 == "bl was not aligned"); + } +} + +int FileJournal::write_bl(off64_t& pos, bufferlist& bl) +{ + int ret; + + off64_t spos = ::lseek64(fd, pos, SEEK_SET); + if (spos < 0) { + ret = -errno; + derr << "FileJournal::write_bl : lseek64 failed " << cpp_strerror(ret) << dendl; + return ret; + } + ret = bl.write_fd(fd); + if (ret) { + derr << "FileJournal::write_bl : write_fd failed: " << cpp_strerror(ret) << dendl; + return ret; + } + pos += bl.length(); + if (pos == header.max_size) + pos = get_top(); + return 0; +} + +void FileJournal::do_write(bufferlist& bl) +{ + // nothing to do? + if (bl.length() == 0 && !must_write_header) + return; + + buffer::ptr hbp; + if (cct->_conf->journal_write_header_frequency && + (((++journaled_since_start) % + cct->_conf->journal_write_header_frequency) == 0)) { + must_write_header = true; + } + + if (must_write_header) { + must_write_header = false; + hbp = prepare_header(); + } + + dout(15) << "do_write writing " << write_pos << "~" << bl.length() + << (hbp.length() ? " + header":"") + << dendl; + + utime_t from = ceph_clock_now(); + + // entry + off64_t pos = write_pos; + + // Adjust write_pos + write_pos += bl.length(); + if (write_pos >= header.max_size) + write_pos = write_pos - header.max_size + get_top(); + + write_lock.Unlock(); + + // split? + off64_t split = 0; + if (pos + bl.length() > header.max_size) { + bufferlist first, second; + split = header.max_size - pos; + first.substr_of(bl, 0, split); + second.substr_of(bl, split, bl.length() - split); + assert(first.length() + second.length() == bl.length()); + dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length() + << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl; + + //Save pos to write first piece second + off64_t first_pos = pos; + off64_t orig_pos; + pos = get_top(); + // header too? + if (hbp.length()) { + // be sneaky: include the header in the second fragment + second.push_front(hbp); + pos = 0; // we included the header + } + // Write the second portion first possible with the header, so + // do_read_entry() won't even get a valid entry_header_t if there + // is a crash between the two writes. + orig_pos = pos; + if (write_bl(pos, second)) { + derr << "FileJournal::do_write: write_bl(pos=" << orig_pos + << ") failed" << dendl; + check_align(pos, second); + ceph_abort(); + } + orig_pos = first_pos; + if (write_bl(first_pos, first)) { + derr << "FileJournal::do_write: write_bl(pos=" << orig_pos + << ") failed" << dendl; + check_align(first_pos, first); + ceph_abort(); + } + assert(first_pos == get_top()); + } else { + // header too? + if (hbp.length()) { + if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) { + int err = errno; + derr << "FileJournal::do_write: pwrite(fd=" << fd + << ", hbp.length=" << hbp.length() << ") failed :" + << cpp_strerror(err) << dendl; + ceph_abort(); + } + } + + if (write_bl(pos, bl)) { + derr << "FileJournal::do_write: write_bl(pos=" << pos + << ") failed" << dendl; + check_align(pos, bl); + ceph_abort(); + } + } + + if (!directio) { + dout(20) << "do_write fsync" << dendl; + + /* + * We'd really love to have a fsync_range or fdatasync_range and do a: + * + * if (split) { + * ::fsync_range(fd, header.max_size - split, split)l + * ::fsync_range(fd, get_top(), bl.length() - split); + * else + * ::fsync_range(fd, write_pos, bl.length()) + * + * NetBSD and AIX apparently have it, and adding it to Linux wouldn't be + * too hard given all the underlying infrastructure already exist. + * + * NOTE: using sync_file_range here would not be safe as it does not + * flush disk caches or commits any sort of metadata. + */ + int ret = 0; +#if defined(DARWIN) || defined(__FreeBSD__) + ret = ::fsync(fd); +#else + ret = ::fdatasync(fd); +#endif + if (ret < 0) { + derr << __func__ << " fsync/fdatasync failed: " << cpp_strerror(errno) << dendl; + ceph_abort(); + } +#ifdef HAVE_POSIX_FADVISE + if (cct->_conf->filestore_fadvise) + posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED); +#endif + } + + utime_t lat = ceph_clock_now() - from; + dout(20) << "do_write latency " << lat << dendl; + + write_lock.Lock(); + + assert(write_pos == pos); + assert(write_pos % header.alignment == 0); + + { + Mutex::Locker locker(finisher_lock); + journaled_seq = writing_seq; + + // kick finisher? + // only if we haven't filled up recently! + if (full_state != FULL_NOTFULL) { + dout(10) << "do_write NOT queueing finisher seq " << journaled_seq + << ", full_commit_seq|full_restart_seq" << dendl; + } else { + if (plug_journal_completions) { + dout(20) << "do_write NOT queueing finishers through seq " << journaled_seq + << " due to completion plug" << dendl; + } else { + dout(20) << "do_write queueing finishers through seq " << journaled_seq << dendl; + queue_completions_thru(journaled_seq); + } + } + } +} + +void FileJournal::flush() +{ + dout(10) << "waiting for completions to empty" << dendl; + { + Mutex::Locker l(finisher_lock); + while (!completions_empty()) + finisher_cond.Wait(finisher_lock); + } + dout(10) << "flush waiting for finisher" << dendl; + finisher->wait_for_empty(); + dout(10) << "flush done" << dendl; +} + + +void FileJournal::write_thread_entry() +{ + dout(10) << "write_thread_entry start" << dendl; + while (1) { + { + Mutex::Locker locker(writeq_lock); + if (writeq.empty() && !must_write_header) { + if (write_stop) + break; + dout(20) << "write_thread_entry going to sleep" << dendl; + writeq_cond.Wait(writeq_lock); + dout(20) << "write_thread_entry woke up" << dendl; + continue; + } + } + +#ifdef HAVE_LIBAIO + if (aio) { + Mutex::Locker locker(aio_lock); + // should we back off to limit aios in flight? try to do this + // adaptively so that we submit larger aios once we have lots of + // them in flight. + // + // NOTE: our condition here is based on aio_num (protected by + // aio_lock) and throttle_bytes (part of the write queue). when + // we sleep, we *only* wait for aio_num to change, and do not + // wake when more data is queued. this is not strictly correct, + // but should be fine given that we will have plenty of aios in + // flight if we hit this limit to ensure we keep the device + // saturated. + while (aio_num > 0) { + int exp = MIN(aio_num * 2, 24); + long unsigned min_new = 1ull << exp; + uint64_t cur = aio_write_queue_bytes; + dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes + << " ... exp " << exp << " min_new " << min_new + << " ... pending " << cur << dendl; + if (cur >= min_new) + break; + dout(20) << "write_thread_entry deferring until more aios complete: " + << aio_num << " aios with " << aio_bytes << " bytes needs " << min_new + << " bytes to start a new aio (currently " << cur << " pending)" << dendl; + aio_cond.Wait(aio_lock); + dout(20) << "write_thread_entry woke up" << dendl; + } + } +#endif + + Mutex::Locker locker(write_lock); + uint64_t orig_ops = 0; + uint64_t orig_bytes = 0; + + bufferlist bl; + int r = prepare_multi_write(bl, orig_ops, orig_bytes); + // Don't care about journal full if stoppping, so drop queue and + // possibly let header get written and loop above to notice stop + if (r == -ENOSPC) { + if (write_stop) { + dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl; + while (!writeq_empty()) { + complete_write(1, peek_write().orig_len); + pop_write(); + } + print_header(header); + r = 0; + } else { + dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl; + commit_cond.Wait(write_lock); + dout(20) << "write_thread_entry woke up" << dendl; + continue; + } + } + assert(r == 0); + + if (logger) { + logger->inc(l_filestore_journal_wr); + logger->inc(l_filestore_journal_wr_bytes, bl.length()); + } + +#ifdef HAVE_LIBAIO + if (aio) + do_aio_write(bl); + else + do_write(bl); +#else + do_write(bl); +#endif + complete_write(orig_ops, orig_bytes); + } + + dout(10) << "write_thread_entry finish" << dendl; +} + +#ifdef HAVE_LIBAIO +void FileJournal::do_aio_write(bufferlist& bl) +{ + + if (cct->_conf->journal_write_header_frequency && + (((++journaled_since_start) % + cct->_conf->journal_write_header_frequency) == 0)) { + must_write_header = true; + } + + // nothing to do? + if (bl.length() == 0 && !must_write_header) + return; + + buffer::ptr hbp; + if (must_write_header) { + must_write_header = false; + hbp = prepare_header(); + } + + // entry + off64_t pos = write_pos; + + dout(15) << "do_aio_write writing " << pos << "~" << bl.length() + << (hbp.length() ? " + header":"") + << dendl; + + // split? + off64_t split = 0; + if (pos + bl.length() > header.max_size) { + bufferlist first, second; + split = header.max_size - pos; + first.substr_of(bl, 0, split); + second.substr_of(bl, split, bl.length() - split); + assert(first.length() + second.length() == bl.length()); + dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl; + + if (write_aio_bl(pos, first, 0)) { + derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos + << ") failed" << dendl; + ceph_abort(); + } + assert(pos == header.max_size); + if (hbp.length()) { + // be sneaky: include the header in the second fragment + second.push_front(hbp); + pos = 0; // we included the header + } else + pos = get_top(); // no header, start after that + if (write_aio_bl(pos, second, writing_seq)) { + derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos + << ") failed" << dendl; + ceph_abort(); + } + } else { + // header too? + if (hbp.length()) { + bufferlist hbl; + hbl.push_back(hbp); + loff_t pos = 0; + if (write_aio_bl(pos, hbl, 0)) { + derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl; + ceph_abort(); + } + } + + if (write_aio_bl(pos, bl, writing_seq)) { + derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos + << ") failed" << dendl; + ceph_abort(); + } + } + + write_pos = pos; + if (write_pos == header.max_size) + write_pos = get_top(); + assert(write_pos % header.alignment == 0); +} + +/** + * write a buffer using aio + * + * @param seq seq to trigger when this aio completes. if 0, do not update any state + * on completion. + */ +int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq) +{ + dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl; + + while (bl.length() > 0) { + int max = MIN(bl.get_num_buffers(), IOV_MAX-1); + iovec *iov = new iovec[max]; + int n = 0; + unsigned len = 0; + for (std::list::const_iterator p = bl.buffers().begin(); + n < max; + ++p, ++n) { + assert(p != bl.buffers().end()); + iov[n].iov_base = (void *)p->c_str(); + iov[n].iov_len = p->length(); + len += p->length(); + } + + bufferlist tbl; + bl.splice(0, len, &tbl); // move bytes from bl -> tbl + + // lock only aio_queue, current aio, aio_num, aio_bytes, which may be + // modified in check_aio_completion + aio_lock.Lock(); + aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq)); + aio_info& aio = aio_queue.back(); + aio.iov = iov; + + io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos); + + dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len + << " in " << n << dendl; + + aio_num++; + aio_bytes += aio.len; + + // need to save current aio len to update write_pos later because current + // aio could be ereased from aio_queue once it is done + uint64_t cur_len = aio.len; + // unlock aio_lock because following io_submit might take time to return + aio_lock.Unlock(); + + iocb *piocb = &aio.iocb; + + // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds + int attempts = 16; + int delay = 125; + do { + int r = io_submit(aio_ctx, 1, &piocb); + dout(20) << "write_aio_bl io_submit return value: " << r << dendl; + if (r < 0) { + derr << "io_submit to " << aio.off << "~" << cur_len + << " got " << cpp_strerror(r) << dendl; + if (r == -EAGAIN && attempts-- > 0) { + usleep(delay); + delay *= 2; + continue; + } + check_align(pos, tbl); + assert(0 == "io_submit got unexpected error"); + } else { + break; + } + } while (true); + pos += cur_len; + } + aio_lock.Lock(); + write_finish_cond.Signal(); + aio_lock.Unlock(); + return 0; +} +#endif + +void FileJournal::write_finish_thread_entry() +{ +#ifdef HAVE_LIBAIO + dout(10) << "write_finish_thread_entry enter" << dendl; + while (true) { + { + Mutex::Locker locker(aio_lock); + if (aio_queue.empty()) { + if (aio_stop) + break; + dout(20) << "write_finish_thread_entry sleeping" << dendl; + write_finish_cond.Wait(aio_lock); + continue; + } + } + + dout(20) << "write_finish_thread_entry waiting for aio(s)" << dendl; + io_event event[16]; + int r = io_getevents(aio_ctx, 1, 16, event, NULL); + if (r < 0) { + if (r == -EINTR) { + dout(0) << "io_getevents got " << cpp_strerror(r) << dendl; + continue; + } + derr << "io_getevents got " << cpp_strerror(r) << dendl; + assert(0 == "got unexpected error from io_getevents"); + } + + { + Mutex::Locker locker(aio_lock); + for (int i=0; ilen) { + derr << "aio to " << ai->off << "~" << ai->len + << " returned: " << (int)event[i].res << dendl; + assert(0 == "unexpected aio error"); + } + dout(10) << "write_finish_thread_entry aio " << ai->off + << "~" << ai->len << " done" << dendl; + ai->done = true; + } + check_aio_completion(); + } + } + dout(10) << "write_finish_thread_entry exit" << dendl; +#endif +} + +#ifdef HAVE_LIBAIO +/** + * check aio_wait for completed aio, and update state appropriately. + */ +void FileJournal::check_aio_completion() +{ + assert(aio_lock.is_locked()); + dout(20) << "check_aio_completion" << dendl; + + bool completed_something = false, signal = false; + uint64_t new_journaled_seq = 0; + + list::iterator p = aio_queue.begin(); + while (p != aio_queue.end() && p->done) { + dout(20) << "check_aio_completion completed seq " << p->seq << " " + << p->off << "~" << p->len << dendl; + if (p->seq) { + new_journaled_seq = p->seq; + completed_something = true; + } + aio_num--; + aio_bytes -= p->len; + aio_queue.erase(p++); + signal = true; + } + + if (completed_something) { + // kick finisher? + // only if we haven't filled up recently! + Mutex::Locker locker(finisher_lock); + journaled_seq = new_journaled_seq; + if (full_state != FULL_NOTFULL) { + dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq + << ", full_commit_seq|full_restart_seq" << dendl; + } else { + if (plug_journal_completions) { + dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq + << " due to completion plug" << dendl; + } else { + dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl; + queue_completions_thru(journaled_seq); + } + } + } + if (signal) { + // maybe write queue was waiting for aio count to drop? + aio_cond.Signal(); + } +} +#endif + +int FileJournal::prepare_entry(vector& tls, bufferlist* tbl) { + dout(10) << "prepare_entry " << tls << dendl; + int data_len = cct->_conf->journal_align_min_size - 1; + int data_align = -1; // -1 indicates that we don't care about the alignment + bufferlist bl; + for (vector::iterator p = tls.begin(); + p != tls.end(); ++p) { + if ((int)(*p).get_data_length() > data_len) { + data_len = (*p).get_data_length(); + data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK; + } + ::encode(*p, bl); + } + if (tbl->length()) { + bl.claim_append(*tbl); + } + // add it this entry + entry_header_t h; + unsigned head_size = sizeof(entry_header_t); + off64_t base_size = 2*head_size + bl.length(); + memset(&h, 0, sizeof(h)); + if (data_align >= 0) + h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK; + off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment); + unsigned post_pad = size - base_size - h.pre_pad; + h.len = bl.length(); + h.post_pad = post_pad; + h.crc32c = bl.crc32c(0); + dout(10) << " len " << bl.length() << " -> " << size + << " (head " << head_size << " pre_pad " << h.pre_pad + << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")" + << " (bl alignment " << data_align << ")" + << dendl; + bufferlist ebl; + // header + ebl.append((const char*)&h, sizeof(h)); + if (h.pre_pad) { + ebl.push_back(buffer::create_static(h.pre_pad, zero_buf)); + } + // payload + ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy + if (h.post_pad) { + ebl.push_back(buffer::create_static(h.post_pad, zero_buf)); + } + // footer + ebl.append((const char*)&h, sizeof(h)); + if (directio) + ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT); + tbl->claim(ebl); + return h.len; +} + +void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len, + Context *oncommit, TrackedOpRef osd_op) +{ + // dump on queue + dout(5) << "submit_entry seq " << seq + << " len " << e.length() + << " (" << oncommit << ")" << dendl; + assert(e.length() > 0); + assert(e.length() < header.max_size); + + if (osd_op) + osd_op->mark_event("commit_queued_for_journal_write"); + if (logger) { + logger->inc(l_filestore_journal_queue_bytes, orig_len); + logger->inc(l_filestore_journal_queue_ops, 1); + } + + throttle.register_throttle_seq(seq, e.length()); + if (logger) { + logger->inc(l_filestore_journal_ops, 1); + logger->inc(l_filestore_journal_bytes, e.length()); + } + + if (osd_op) { + osd_op->mark_event("commit_queued_for_journal_write"); + if (osd_op->store_trace) { + osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace); + osd_op->journal_trace.event("submit_entry"); + osd_op->journal_trace.keyval("seq", seq); + } + } + { + Mutex::Locker l1(writeq_lock); +#ifdef HAVE_LIBAIO + Mutex::Locker l2(aio_lock); +#endif + Mutex::Locker l3(completions_lock); + +#ifdef HAVE_LIBAIO + aio_write_queue_ops++; + aio_write_queue_bytes += e.length(); + aio_cond.Signal(); +#endif + + completions.push_back( + completion_item( + seq, oncommit, ceph_clock_now(), osd_op)); + if (writeq.empty()) + writeq_cond.Signal(); + writeq.push_back(write_item(seq, e, orig_len, osd_op)); + if (osd_op) + osd_op->journal_trace.keyval("queue depth", writeq.size()); + } +} + +bool FileJournal::writeq_empty() +{ + Mutex::Locker locker(writeq_lock); + return writeq.empty(); +} + +FileJournal::write_item &FileJournal::peek_write() +{ + assert(write_lock.is_locked()); + Mutex::Locker locker(writeq_lock); + return writeq.front(); +} + +void FileJournal::pop_write() +{ + assert(write_lock.is_locked()); + Mutex::Locker locker(writeq_lock); + if (logger) { + logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len); + logger->dec(l_filestore_journal_queue_ops, 1); + } + writeq.pop_front(); +} + +void FileJournal::batch_pop_write(list &items) +{ + assert(write_lock.is_locked()); + { + Mutex::Locker locker(writeq_lock); + writeq.swap(items); + } + for (auto &&i : items) { + if (logger) { + logger->dec(l_filestore_journal_queue_bytes, i.orig_len); + logger->dec(l_filestore_journal_queue_ops, 1); + } + } +} + +void FileJournal::batch_unpop_write(list &items) +{ + assert(write_lock.is_locked()); + for (auto &&i : items) { + if (logger) { + logger->inc(l_filestore_journal_queue_bytes, i.orig_len); + logger->inc(l_filestore_journal_queue_ops, 1); + } + } + Mutex::Locker locker(writeq_lock); + writeq.splice(writeq.begin(), items); +} + +void FileJournal::commit_start(uint64_t seq) +{ + dout(10) << "commit_start" << dendl; + + // was full? + switch (full_state) { + case FULL_NOTFULL: + break; // all good + + case FULL_FULL: + if (seq >= journaled_seq) { + dout(1) << " FULL_FULL -> FULL_WAIT. commit_start on seq " + << seq << " > journaled_seq " << journaled_seq + << ", moving to FULL_WAIT." + << dendl; + full_state = FULL_WAIT; + } else { + dout(1) << "FULL_FULL commit_start on seq " + << seq << " < journaled_seq " << journaled_seq + << ", remaining in FULL_FULL" + << dendl; + } + break; + + case FULL_WAIT: + dout(1) << " FULL_WAIT -> FULL_NOTFULL. journal now active, setting completion plug." << dendl; + full_state = FULL_NOTFULL; + plug_journal_completions = true; + break; + } +} + +/* + *send discard command to joural block deivce + */ +void FileJournal::do_discard(int64_t offset, int64_t end) +{ + dout(10) << __func__ << "trim(" << offset << ", " << end << dendl; + + offset = ROUND_UP_TO(offset, block_size); + if (offset >= end) + return; + end = ROUND_UP_TO(end - block_size, block_size); + assert(end >= offset); + if (offset < end) + if (block_device_discard(fd, offset, end - offset) < 0) + dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl; +} + +void FileJournal::committed_thru(uint64_t seq) +{ + Mutex::Locker locker(write_lock); + + auto released = throttle.flush(seq); + if (logger) { + logger->dec(l_filestore_journal_ops, released.first); + logger->dec(l_filestore_journal_bytes, released.second); + } + + if (seq < last_committed_seq) { + dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl; + assert(seq >= last_committed_seq); + return; + } + if (seq == last_committed_seq) { + dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl; + return; + } + + dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl; + last_committed_seq = seq; + + // completions! + { + Mutex::Locker locker(finisher_lock); + queue_completions_thru(seq); + if (plug_journal_completions && seq >= header.start_seq) { + dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl; + plug_journal_completions = false; + queue_completions_thru(journaled_seq); + } + } + + // adjust start pointer + while (!journalq.empty() && journalq.front().first <= seq) { + journalq.pop_front(); + } + + int64_t old_start = header.start; + if (!journalq.empty()) { + header.start = journalq.front().second; + header.start_seq = journalq.front().first; + } else { + header.start = write_pos; + header.start_seq = seq + 1; + } + + if (discard) { + dout(10) << __func__ << " will trim (" << old_start << ", " << header.start << ")" << dendl; + if (old_start < header.start) + do_discard(old_start, header.start - 1); + else { + do_discard(old_start, header.max_size - 1); + do_discard(get_top(), header.start - 1); + } + } + + must_write_header = true; + print_header(header); + + // committed but unjournaled items + while (!writeq_empty() && peek_write().seq <= seq) { + dout(15) << " dropping committed but unwritten seq " << peek_write().seq + << " len " << peek_write().bl.length() + << dendl; + complete_write(1, peek_write().orig_len); + pop_write(); + } + + commit_cond.Signal(); + + dout(10) << "committed_thru done" << dendl; +} + + +void FileJournal::complete_write(uint64_t ops, uint64_t bytes) +{ + dout(5) << __func__ << " finished " << ops << " ops and " + << bytes << " bytes" << dendl; +} + +int FileJournal::make_writeable() +{ + dout(10) << __func__ << dendl; + int r = set_throttle_params(); + if (r < 0) + return r; + + r = _open(true); + if (r < 0) + return r; + + if (read_pos > 0) + write_pos = read_pos; + else + write_pos = get_top(); + read_pos = 0; + + must_write_header = true; + + start_writer(); + return 0; +} + +int FileJournal::set_throttle_params() +{ + stringstream ss; + bool valid = throttle.set_params( + cct->_conf->journal_throttle_low_threshhold, + cct->_conf->journal_throttle_high_threshhold, + cct->_conf->filestore_expected_throughput_bytes, + cct->_conf->journal_throttle_high_multiple, + cct->_conf->journal_throttle_max_multiple, + header.max_size - get_top(), + &ss); + + if (!valid) { + derr << "tried to set invalid params: " + << ss.str() + << dendl; + } + return valid ? 0 : -EINVAL; +} + +const char** FileJournal::get_tracked_conf_keys() const +{ + static const char *KEYS[] = { + "journal_throttle_low_threshhold", + "journal_throttle_high_threshhold", + "journal_throttle_high_multiple", + "journal_throttle_max_multiple", + "filestore_expected_throughput_bytes", + NULL}; + return KEYS; +} + +void FileJournal::wrap_read_bl( + off64_t pos, + int64_t olen, + bufferlist* bl, + off64_t *out_pos + ) const +{ + while (olen > 0) { + while (pos >= header.max_size) + pos = pos + get_top() - header.max_size; + + int64_t len; + if (pos + olen > header.max_size) + len = header.max_size - pos; // partial + else + len = olen; // rest + + int64_t actual = ::lseek64(fd, pos, SEEK_SET); + assert(actual == pos); + + bufferptr bp = buffer::create(len); + int r = safe_read_exact(fd, bp.c_str(), len); + if (r) { + derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned " + << r << dendl; + ceph_abort(); + } + bl->push_back(std::move(bp)); + pos += len; + olen -= len; + } + if (pos >= header.max_size) + pos = pos + get_top() - header.max_size; + if (out_pos) + *out_pos = pos; +} + +bool FileJournal::read_entry( + bufferlist &bl, + uint64_t &next_seq, + bool *corrupt) +{ + if (corrupt) + *corrupt = false; + uint64_t seq = next_seq; + + if (!read_pos) { + dout(2) << "read_entry -- not readable" << dendl; + return false; + } + + off64_t pos = read_pos; + off64_t next_pos = pos; + stringstream ss; + read_entry_result result = do_read_entry( + pos, + &next_pos, + &bl, + &seq, + &ss); + if (result == SUCCESS) { + journalq.push_back( pair(seq, pos)); + uint64_t amount_to_take = + next_pos > pos ? + next_pos - pos : + (header.max_size - pos) + (next_pos - get_top()); + throttle.take(amount_to_take); + throttle.register_throttle_seq(next_seq, amount_to_take); + if (logger) { + logger->inc(l_filestore_journal_ops, 1); + logger->inc(l_filestore_journal_bytes, amount_to_take); + } + if (next_seq > seq) { + return false; + } else { + read_pos = next_pos; + next_seq = seq; + if (seq > journaled_seq) + journaled_seq = seq; + return true; + } + } else { + derr << "do_read_entry(" << pos << "): " << ss.str() << dendl; + } + + if (seq && seq < header.committed_up_to) { + derr << "Unable to read past sequence " << seq + << " but header indicates the journal has committed up through " + << header.committed_up_to << ", journal is corrupt" << dendl; + if (cct->_conf->journal_ignore_corruption) { + if (corrupt) + *corrupt = true; + return false; + } else { + ceph_abort(); + } + } + + dout(2) << "No further valid entries found, journal is most likely valid" + << dendl; + return false; +} + +FileJournal::read_entry_result FileJournal::do_read_entry( + off64_t init_pos, + off64_t *next_pos, + bufferlist *bl, + uint64_t *seq, + ostream *ss, + entry_header_t *_h) const +{ + off64_t cur_pos = init_pos; + bufferlist _bl; + if (!bl) + bl = &_bl; + + // header + entry_header_t *h; + bufferlist hbl; + off64_t _next_pos; + wrap_read_bl(cur_pos, sizeof(*h), &hbl, &_next_pos); + h = reinterpret_cast(hbl.c_str()); + + if (!h->check_magic(cur_pos, header.get_fsid64())) { + dout(25) << "read_entry " << init_pos + << " : bad header magic, end of journal" << dendl; + if (ss) + *ss << "bad header magic"; + if (next_pos) + *next_pos = init_pos + (4<<10); // check 4k ahead + return MAYBE_CORRUPT; + } + cur_pos = _next_pos; + + // pad + body + pad + if (h->pre_pad) + cur_pos += h->pre_pad; + + bl->clear(); + wrap_read_bl(cur_pos, h->len, bl, &cur_pos); + + if (h->post_pad) + cur_pos += h->post_pad; + + // footer + entry_header_t *f; + bufferlist fbl; + wrap_read_bl(cur_pos, sizeof(*f), &fbl, &cur_pos); + f = reinterpret_cast(fbl.c_str()); + if (memcmp(f, h, sizeof(*f))) { + if (ss) + *ss << "bad footer magic, partial entry"; + if (next_pos) + *next_pos = cur_pos; + return MAYBE_CORRUPT; + } + + if ((header.flags & header_t::FLAG_CRC) || // if explicitly enabled (new journal) + h->crc32c != 0) { // newer entry in old journal + uint32_t actual_crc = bl->crc32c(0); + if (actual_crc != h->crc32c) { + if (ss) + *ss << "header crc (" << h->crc32c + << ") doesn't match body crc (" << actual_crc << ")"; + if (next_pos) + *next_pos = cur_pos; + return MAYBE_CORRUPT; + } + } + + // yay! + dout(2) << "read_entry " << init_pos << " : seq " << h->seq + << " " << h->len << " bytes" + << dendl; + + // ok! + if (seq) + *seq = h->seq; + + + if (next_pos) + *next_pos = cur_pos; + + if (_h) + *_h = *h; + + assert(cur_pos % header.alignment == 0); + return SUCCESS; +} + +void FileJournal::reserve_throttle_and_backoff(uint64_t count) +{ + throttle.get(count); +} + +void FileJournal::get_header( + uint64_t wanted_seq, + off64_t *_pos, + entry_header_t *h) +{ + off64_t pos = header.start; + off64_t next_pos = pos; + bufferlist bl; + uint64_t seq = 0; + dout(2) << __func__ << dendl; + while (1) { + bl.clear(); + pos = next_pos; + read_entry_result result = do_read_entry( + pos, + &next_pos, + &bl, + &seq, + 0, + h); + if (result == FAILURE || result == MAYBE_CORRUPT) + ceph_abort(); + if (seq == wanted_seq) { + if (_pos) + *_pos = pos; + return; + } + } + ceph_abort(); // not reachable +} + +void FileJournal::corrupt( + int wfd, + off64_t corrupt_at) +{ + dout(2) << __func__ << dendl; + if (corrupt_at >= header.max_size) + corrupt_at = corrupt_at + get_top() - header.max_size; + + int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET); + assert(actual == corrupt_at); + + char buf[10]; + int r = safe_read_exact(fd, buf, 1); + assert(r == 0); + + actual = ::lseek64(wfd, corrupt_at, SEEK_SET); + assert(actual == corrupt_at); + + buf[0]++; + r = safe_write(wfd, buf, 1); + assert(r == 0); +} + +void FileJournal::corrupt_payload( + int wfd, + uint64_t seq) +{ + dout(2) << __func__ << dendl; + off64_t pos = 0; + entry_header_t h; + get_header(seq, &pos, &h); + off64_t corrupt_at = + pos + sizeof(entry_header_t) + h.pre_pad; + corrupt(wfd, corrupt_at); +} + + +void FileJournal::corrupt_footer_magic( + int wfd, + uint64_t seq) +{ + dout(2) << __func__ << dendl; + off64_t pos = 0; + entry_header_t h; + get_header(seq, &pos, &h); + off64_t corrupt_at = + pos + sizeof(entry_header_t) + h.pre_pad + + h.len + h.post_pad + + (reinterpret_cast(&h.magic2) - reinterpret_cast(&h)); + corrupt(wfd, corrupt_at); +} + + +void FileJournal::corrupt_header_magic( + int wfd, + uint64_t seq) +{ + dout(2) << __func__ << dendl; + off64_t pos = 0; + entry_header_t h; + get_header(seq, &pos, &h); + off64_t corrupt_at = + pos + + (reinterpret_cast(&h.magic2) - reinterpret_cast(&h)); + corrupt(wfd, corrupt_at); +} + +off64_t FileJournal::get_journal_size_estimate() +{ + off64_t size, start = header.start; + if (write_pos < start) { + size = (max_size - start) + write_pos; + } else { + size = write_pos - start; + } + dout(20) << __func__ << " journal size=" << size << dendl; + return size; +}