X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fosdc%2FJournaler.cc;fp=src%2Fceph%2Fsrc%2Fosdc%2FJournaler.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=fe9a6e9cdaf2b043942406e543ea9716fb3cf7fb;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/osdc/Journaler.cc b/src/ceph/src/osdc/Journaler.cc deleted file mode 100644 index fe9a6e9..0000000 --- a/src/ceph/src/osdc/Journaler.cc +++ /dev/null @@ -1,1542 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "common/perf_counters.h" -#include "common/dout.h" -#include "include/Context.h" -#include "msg/Messenger.h" -#include "osdc/Journaler.h" -#include "common/errno.h" -#include "include/assert.h" -#include "common/Finisher.h" - -#define dout_subsys ceph_subsys_journaler -#undef dout_prefix -#define dout_prefix *_dout << objecter->messenger->get_myname() \ - << ".journaler." << name << (readonly ? "(ro) ":"(rw) ") - -using std::chrono::seconds; - - -class Journaler::C_DelayFlush : public Context { - Journaler *journaler; - public: - C_DelayFlush(Journaler *j) : journaler(j) {} - void finish(int r) override { - journaler->_do_delayed_flush(); - } -}; - -void Journaler::set_readonly() -{ - lock_guard l(lock); - - ldout(cct, 1) << "set_readonly" << dendl; - readonly = true; -} - -void Journaler::set_writeable() -{ - lock_guard l(lock); - - ldout(cct, 1) << "set_writeable" << dendl; - readonly = false; -} - -void Journaler::create(file_layout_t *l, stream_format_t const sf) -{ - lock_guard lk(lock); - - assert(!readonly); - state = STATE_ACTIVE; - - stream_format = sf; - journal_stream.set_format(sf); - _set_layout(l); - - prezeroing_pos = prezero_pos = write_pos = flush_pos = - safe_pos = read_pos = requested_pos = received_pos = - expire_pos = trimming_pos = trimmed_pos = - next_safe_pos = layout.get_period(); - - ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino - << std::dec << ", format=" << stream_format << dendl; -} - -void Journaler::set_layout(file_layout_t const *l) -{ - lock_guard lk(lock); - _set_layout(l); -} - -void Journaler::_set_layout(file_layout_t const *l) -{ - layout = *l; - - assert(layout.pool_id == pg_pool); - last_written.layout = layout; - last_committed.layout = layout; - - // prefetch intelligently. - // (watch out, this is big if you use big objects or weird striping) - uint64_t periods = cct->_conf->journaler_prefetch_periods; - if (periods < 2) - periods = 2; // we need at least 2 periods to make progress. - fetch_len = layout.get_period() * periods; -} - - -/***************** HEADER *******************/ - -ostream& operator<<(ostream &out, const Journaler::Header &h) -{ - return out << "loghead(trim " << h.trimmed_pos - << ", expire " << h.expire_pos - << ", write " << h.write_pos - << ", stream_format " << (int)(h.stream_format) - << ")"; -} - -class Journaler::C_ReadHead : public Context { - Journaler *ls; -public: - bufferlist bl; - explicit C_ReadHead(Journaler *l) : ls(l) {} - void finish(int r) override { - ls->_finish_read_head(r, bl); - } -}; - -class Journaler::C_RereadHead : public Context { - Journaler *ls; - Context *onfinish; -public: - bufferlist bl; - C_RereadHead(Journaler *l, Context *onfinish_) : ls (l), - onfinish(onfinish_) {} - void finish(int r) override { - ls->_finish_reread_head(r, bl, onfinish); - } -}; - -class Journaler::C_ProbeEnd : public Context { - Journaler *ls; -public: - uint64_t end; - explicit C_ProbeEnd(Journaler *l) : ls(l), end(-1) {} - void finish(int r) override { - ls->_finish_probe_end(r, end); - } -}; - -class Journaler::C_ReProbe : public Context { - Journaler *ls; - C_OnFinisher *onfinish; -public: - uint64_t end; - C_ReProbe(Journaler *l, C_OnFinisher *onfinish_) : - ls(l), onfinish(onfinish_), end(0) {} - void finish(int r) override { - ls->_finish_reprobe(r, end, onfinish); - } -}; - -void Journaler::recover(Context *onread) -{ - lock_guard l(lock); - if (stopping) { - onread->complete(-EAGAIN); - return; - } - - ldout(cct, 1) << "recover start" << dendl; - assert(state != STATE_ACTIVE); - assert(readonly); - - if (onread) - waitfor_recover.push_back(wrap_finisher(onread)); - - if (state != STATE_UNDEF) { - ldout(cct, 1) << "recover - already recovering" << dendl; - return; - } - - ldout(cct, 1) << "read_head" << dendl; - state = STATE_READHEAD; - C_ReadHead *fin = new C_ReadHead(this); - _read_head(fin, &fin->bl); -} - -void Journaler::_read_head(Context *on_finish, bufferlist *bl) -{ - // lock is locked - assert(state == STATE_READHEAD || state == STATE_REREADHEAD); - - object_t oid = file_object_t(ino, 0); - object_locator_t oloc(pg_pool); - objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, wrap_finisher(on_finish)); -} - -void Journaler::reread_head(Context *onfinish) -{ - lock_guard l(lock); - _reread_head(wrap_finisher(onfinish)); -} - -/** - * Re-read the head from disk, and set the write_pos, expire_pos, trimmed_pos - * from the on-disk header. This switches the state to STATE_REREADHEAD for - * the duration, and you shouldn't start a re-read while other operations are - * in-flight, nor start other operations while a re-read is in progress. - * Also, don't call this until the Journaler has finished its recovery and has - * gone STATE_ACTIVE! - */ -void Journaler::_reread_head(Context *onfinish) -{ - ldout(cct, 10) << "reread_head" << dendl; - assert(state == STATE_ACTIVE); - - state = STATE_REREADHEAD; - C_RereadHead *fin = new C_RereadHead(this, onfinish); - _read_head(fin, &fin->bl); -} - -void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) -{ - lock_guard l(lock); - - //read on-disk header into - assert(bl.length() || r < 0 ); - - // unpack header - if (r == 0) { - Header h; - bufferlist::iterator p = bl.begin(); - try { - ::decode(h, p); - } catch (const buffer::error &e) { - finish->complete(-EINVAL); - return; - } - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos - = h.write_pos; - expire_pos = h.expire_pos; - trimmed_pos = trimming_pos = h.trimmed_pos; - init_headers(h); - state = STATE_ACTIVE; - } - - finish->complete(r); -} - -void Journaler::_finish_read_head(int r, bufferlist& bl) -{ - lock_guard l(lock); - - assert(state == STATE_READHEAD); - - if (r!=0) { - ldout(cct, 0) << "error getting journal off disk" << dendl; - list ls; - ls.swap(waitfor_recover); - finish_contexts(cct, ls, r); - return; - } - - if (bl.length() == 0) { - ldout(cct, 1) << "_finish_read_head r=" << r - << " read 0 bytes, assuming empty log" << dendl; - state = STATE_ACTIVE; - list ls; - ls.swap(waitfor_recover); - finish_contexts(cct, ls, 0); - return; - } - - // unpack header - bool corrupt = false; - Header h; - bufferlist::iterator p = bl.begin(); - try { - ::decode(h, p); - - if (h.magic != magic) { - ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '" - << magic << "'" << dendl; - corrupt = true; - } else if (h.write_pos < h.expire_pos || h.expire_pos < h.trimmed_pos) { - ldout(cct, 0) << "Corrupt header (bad offsets): " << h << dendl; - corrupt = true; - } - } catch (const buffer::error &e) { - corrupt = true; - } - - if (corrupt) { - list ls; - ls.swap(waitfor_recover); - finish_contexts(cct, ls, -EINVAL); - return; - } - - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos - = h.write_pos; - read_pos = requested_pos = received_pos = expire_pos = h.expire_pos; - trimmed_pos = trimming_pos = h.trimmed_pos; - - init_headers(h); - _set_layout(&h.layout); - stream_format = h.stream_format; - journal_stream.set_format(h.stream_format); - - ldout(cct, 1) << "_finish_read_head " << h - << ". probing for end of log (from " << write_pos << ")..." - << dendl; - C_ProbeEnd *fin = new C_ProbeEnd(this); - state = STATE_PROBING; - _probe(fin, &fin->end); -} - -void Journaler::_probe(Context *finish, uint64_t *end) -{ - // lock is locked - ldout(cct, 1) << "probing for end of the log" << dendl; - assert(state == STATE_PROBING || state == STATE_REPROBING); - // probe the log - filer.probe(ino, &layout, CEPH_NOSNAP, - write_pos, end, true, 0, wrap_finisher(finish)); -} - -void Journaler::_reprobe(C_OnFinisher *finish) -{ - ldout(cct, 10) << "reprobe" << dendl; - assert(state == STATE_ACTIVE); - - state = STATE_REPROBING; - C_ReProbe *fin = new C_ReProbe(this, finish); - _probe(fin, &fin->end); -} - - -void Journaler::_finish_reprobe(int r, uint64_t new_end, - C_OnFinisher *onfinish) -{ - lock_guard l(lock); - - assert(new_end >= write_pos || r < 0); - ldout(cct, 1) << "_finish_reprobe new_end = " << new_end - << " (header had " << write_pos << ")." - << dendl; - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = new_end; - state = STATE_ACTIVE; - onfinish->complete(r); -} - -void Journaler::_finish_probe_end(int r, uint64_t end) -{ - lock_guard l(lock); - - assert(state == STATE_PROBING); - if (r < 0) { // error in probing - goto out; - } - if (((int64_t)end) == -1) { - end = write_pos; - ldout(cct, 1) << "_finish_probe_end write_pos = " << end << " (header had " - << write_pos << "). log was empty. recovered." << dendl; - ceph_abort(); // hrm. - } else { - assert(end >= write_pos); - ldout(cct, 1) << "_finish_probe_end write_pos = " << end - << " (header had " << write_pos << "). recovered." - << dendl; - } - - state = STATE_ACTIVE; - - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end; - -out: - // done. - list ls; - ls.swap(waitfor_recover); - finish_contexts(cct, ls, r); -} - -class Journaler::C_RereadHeadProbe : public Context -{ - Journaler *ls; - C_OnFinisher *final_finish; -public: - C_RereadHeadProbe(Journaler *l, C_OnFinisher *finish) : - ls(l), final_finish(finish) {} - void finish(int r) override { - ls->_finish_reread_head_and_probe(r, final_finish); - } -}; - -void Journaler::reread_head_and_probe(Context *onfinish) -{ - lock_guard l(lock); - - assert(state == STATE_ACTIVE); - _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish))); -} - -void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish) -{ - // Expect to be called back from finish_reread_head, which already takes lock - // lock is locked - - assert(!r); //if we get an error, we're boned - _reprobe(onfinish); -} - - -// WRITING - -class Journaler::C_WriteHead : public Context { -public: - Journaler *ls; - Header h; - C_OnFinisher *oncommit; - C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_), - oncommit(c) {} - void finish(int r) override { - ls->_finish_write_head(r, h, oncommit); - } -}; - -void Journaler::write_head(Context *oncommit) -{ - lock_guard l(lock); - _write_head(oncommit); -} - - -void Journaler::_write_head(Context *oncommit) -{ - assert(!readonly); - assert(state == STATE_ACTIVE); - last_written.trimmed_pos = trimmed_pos; - last_written.expire_pos = expire_pos; - last_written.unused_field = expire_pos; - last_written.write_pos = safe_pos; - last_written.stream_format = stream_format; - ldout(cct, 10) << "write_head " << last_written << dendl; - - // Avoid persisting bad pointers in case of bugs - assert(last_written.write_pos >= last_written.expire_pos); - assert(last_written.expire_pos >= last_written.trimmed_pos); - - last_wrote_head = ceph::real_clock::now(); - - bufferlist bl; - ::encode(last_written, bl); - SnapContext snapc; - - object_t oid = file_object_t(ino, 0); - object_locator_t oloc(pg_pool); - objecter->write_full(oid, oloc, snapc, bl, ceph::real_clock::now(), 0, - wrap_finisher(new C_WriteHead( - this, last_written, - wrap_finisher(oncommit))), - 0, 0, write_iohint); -} - -void Journaler::_finish_write_head(int r, Header &wrote, - C_OnFinisher *oncommit) -{ - lock_guard l(lock); - - if (r < 0) { - lderr(cct) << "_finish_write_head got " << cpp_strerror(r) << dendl; - handle_write_error(r); - return; - } - assert(!readonly); - ldout(cct, 10) << "_finish_write_head " << wrote << dendl; - last_committed = wrote; - if (oncommit) { - oncommit->complete(r); - } - - _trim(); // trim? -} - - -/***************** WRITING *******************/ - -class Journaler::C_Flush : public Context { - Journaler *ls; - uint64_t start; - ceph::real_time stamp; -public: - C_Flush(Journaler *l, int64_t s, ceph::real_time st) - : ls(l), start(s), stamp(st) {} - void finish(int r) override { - ls->_finish_flush(r, start, stamp); - } -}; - -void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) -{ - lock_guard l(lock); - assert(!readonly); - - if (r < 0) { - lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl; - handle_write_error(r); - return; - } - - assert(start < flush_pos); - - // calc latency? - if (logger) { - ceph::timespan lat = ceph::real_clock::now() - stamp; - logger->tinc(logger_key_lat, lat); - } - - // adjust safe_pos - auto it = pending_safe.find(start); - assert(it != pending_safe.end()); - pending_safe.erase(it); - if (pending_safe.empty()) - safe_pos = next_safe_pos; - else - safe_pos = pending_safe.begin()->second; - - ldout(cct, 10) << "_finish_flush safe from " << start - << ", pending_safe " << pending_safe - << ", (prezeroing/prezero)/write/flush/safe positions now " - << "(" << prezeroing_pos << "/" << prezero_pos << ")/" - << write_pos << "/" << flush_pos << "/" << safe_pos - << dendl; - - // kick waiters <= safe_pos - if (!waitfor_safe.empty()) { - list ls; - while (!waitfor_safe.empty()) { - auto it = waitfor_safe.begin(); - if (it->first > safe_pos) - break; - ls.splice(ls.end(), it->second); - waitfor_safe.erase(it); - } - finish_contexts(cct, ls); - } -} - - - -uint64_t Journaler::append_entry(bufferlist& bl) -{ - unique_lock l(lock); - - assert(!readonly); - uint32_t s = bl.length(); - - // append - size_t delta = bl.length() + journal_stream.get_envelope_size(); - // write_buf space is nearly full - if (!write_buf_throttle.get_or_fail(delta)) { - l.unlock(); - ldout(cct, 10) << "write_buf_throttle wait, delta " << delta << dendl; - write_buf_throttle.get(delta); - l.lock(); - } - ldout(cct, 20) << "write_buf_throttle get, delta " << delta << dendl; - size_t wrote = journal_stream.write(bl, &write_buf, write_pos); - ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~" - << wrote << dendl; - write_pos += wrote; - - // flush previous object? - uint64_t su = get_layout_period(); - assert(su > 0); - uint64_t write_off = write_pos % su; - uint64_t write_obj = write_pos / su; - uint64_t flush_obj = flush_pos / su; - if (write_obj != flush_obj) { - ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro " - << write_obj << " flo " << flush_obj << ")" << dendl; - _do_flush(write_buf.length() - write_off); - - // if _do_flush() skips flushing some data, it does do a best effort to - // update next_safe_pos. - if (write_buf.length() > 0 && - write_buf.length() <= wrote) { // the unflushed data are within this entry - // set next_safe_pos to end of previous entry - next_safe_pos = write_pos - wrote; - } - } - - return write_pos; -} - - -void Journaler::_do_flush(unsigned amount) -{ - if (write_pos == flush_pos) - return; - assert(write_pos > flush_pos); - assert(!readonly); - - // flush - uint64_t len = write_pos - flush_pos; - assert(len == write_buf.length()); - if (amount && amount < len) - len = amount; - - // zero at least two full periods ahead. this ensures - // that the next object will not exist. - uint64_t period = get_layout_period(); - if (flush_pos + len + 2*period > prezero_pos) { - _issue_prezero(); - - int64_t newlen = prezero_pos - flush_pos - period; - if (newlen <= 0) { - ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len - << " already too close to prezero_pos " << prezero_pos - << ", zeroing first" << dendl; - waiting_for_zero = true; - return; - } - if (static_cast(newlen) < len) { - ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len - << " but hit prezero_pos " << prezero_pos - << ", will do " << flush_pos << "~" << newlen << dendl; - len = newlen; - } else { - waiting_for_zero = false; - } - } else { - waiting_for_zero = false; - } - ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl; - - // submit write for anything pending - // flush _start_ pos to _finish_flush - ceph::real_time now = ceph::real_clock::now(); - SnapContext snapc; - - Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT - pending_safe[flush_pos] = next_safe_pos; - - bufferlist write_bl; - - // adjust pointers - if (len == write_buf.length()) { - write_bl.swap(write_buf); - next_safe_pos = write_pos; - } else { - write_buf.splice(0, len, &write_bl); - // Keys of waitfor_safe map are journal entry boundaries. - // Try finding a journal entry that we are actually flushing - // and set next_safe_pos to end of it. This is best effort. - // The one we found may not be the lastest flushing entry. - auto p = waitfor_safe.lower_bound(flush_pos + len); - if (p != waitfor_safe.end()) { - if (p->first > flush_pos + len && p != waitfor_safe.begin()) - --p; - if (p->first <= flush_pos + len && p->first > next_safe_pos) - next_safe_pos = p->first; - } - } - - filer.write(ino, &layout, snapc, - flush_pos, len, write_bl, ceph::real_clock::now(), - 0, - wrap_finisher(onsafe), write_iohint); - - flush_pos += len; - assert(write_buf.length() == write_pos - flush_pos); - write_buf_throttle.put(len); - ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl; - - ldout(cct, 10) - << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at " - << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos - << "/" << flush_pos << "/" << safe_pos << dendl; - - _issue_prezero(); -} - - -void Journaler::wait_for_flush(Context *onsafe) -{ - lock_guard l(lock); - if (stopping) { - onsafe->complete(-EAGAIN); - return; - } - _wait_for_flush(onsafe); -} - -void Journaler::_wait_for_flush(Context *onsafe) -{ - assert(!readonly); - - // all flushed and safe? - if (write_pos == safe_pos) { - assert(write_buf.length() == 0); - ldout(cct, 10) - << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe " - "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/" - << write_pos << "/" << flush_pos << "/" << safe_pos << dendl; - if (onsafe) { - finisher->queue(onsafe, 0); - } - return; - } - - // queue waiter - if (onsafe) { - waitfor_safe[write_pos].push_back(wrap_finisher(onsafe)); - } -} - -void Journaler::flush(Context *onsafe) -{ - lock_guard l(lock); - _flush(wrap_finisher(onsafe)); -} - -void Journaler::_flush(C_OnFinisher *onsafe) -{ - assert(!readonly); - - if (write_pos == flush_pos) { - assert(write_buf.length() == 0); - ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/" - "flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos - << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos - << dendl; - if (onsafe) { - onsafe->complete(0); - } - } else { - _do_flush(); - _wait_for_flush(onsafe); - } - - // write head? - if (_write_head_needed()) { - _write_head(); - } -} - -bool Journaler::_write_head_needed() -{ - return last_wrote_head + seconds(cct->_conf->journaler_write_head_interval) - < ceph::real_clock::now(); -} - - -/*************** prezeroing ******************/ - -struct C_Journaler_Prezero : public Context { - Journaler *journaler; - uint64_t from, len; - C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l) - : journaler(j), from(f), len(l) {} - void finish(int r) override { - journaler->_finish_prezero(r, from, len); - } -}; - -void Journaler::_issue_prezero() -{ - assert(prezeroing_pos >= flush_pos); - - // we need to zero at least two periods, minimum, to ensure that we - // have a full empty object/period in front of us. - uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods); - - /* - * issue zero requests based on write_pos, even though the invariant - * is that we zero ahead of flush_pos. - */ - uint64_t period = get_layout_period(); - uint64_t to = write_pos + period * num_periods + period - 1; - to -= to % period; - - if (prezeroing_pos >= to) { - ldout(cct, 20) << "_issue_prezero target " << to << " <= prezeroing_pos " - << prezeroing_pos << dendl; - return; - } - - while (prezeroing_pos < to) { - uint64_t len; - if (prezeroing_pos % period == 0) { - len = period; - ldout(cct, 10) << "_issue_prezero removing " << prezeroing_pos << "~" - << period << " (full period)" << dendl; - } else { - len = period - (prezeroing_pos % period); - ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" - << len << " (partial period)" << dendl; - } - SnapContext snapc; - Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos, - len)); - filer.zero(ino, &layout, snapc, prezeroing_pos, len, - ceph::real_clock::now(), 0, c); - prezeroing_pos += len; - } -} - -// Lock cycle because we get called out of objecter callback (holding -// objecter read lock), but there are also cases where we take the journaler -// lock before calling into objecter to do I/O. -void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) -{ - lock_guard l(lock); - - ldout(cct, 10) << "_prezeroed to " << start << "~" << len - << ", prezeroing/prezero was " << prezeroing_pos << "/" - << prezero_pos << ", pending " << pending_zero - << dendl; - if (r < 0 && r != -ENOENT) { - lderr(cct) << "_prezeroed got " << cpp_strerror(r) << dendl; - handle_write_error(r); - return; - } - - assert(r == 0 || r == -ENOENT); - - if (start == prezero_pos) { - prezero_pos += len; - while (!pending_zero.empty() && - pending_zero.begin().get_start() == prezero_pos) { - interval_set::iterator b(pending_zero.begin()); - prezero_pos += b.get_len(); - pending_zero.erase(b); - } - - if (waiting_for_zero) { - _do_flush(); - } - } else { - pending_zero.insert(start, len); - } - ldout(cct, 10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos - << "/" << prezero_pos - << ", pending " << pending_zero - << dendl; -} - - - -/***************** READING *******************/ - - -class Journaler::C_Read : public Context { - Journaler *ls; - uint64_t offset; - uint64_t length; -public: - bufferlist bl; - C_Read(Journaler *j, uint64_t o, uint64_t l) : ls(j), offset(o), length(l) {} - void finish(int r) override { - ls->_finish_read(r, offset, length, bl); - } -}; - -class Journaler::C_RetryRead : public Context { - Journaler *ls; -public: - explicit C_RetryRead(Journaler *l) : ls(l) {} - - void finish(int r) override { - // Should only be called from waitfor_safe i.e. already inside lock - // (ls->lock is locked - ls->_prefetch(); - } -}; - -void Journaler::_finish_read(int r, uint64_t offset, uint64_t length, - bufferlist& bl) -{ - lock_guard l(lock); - - if (r < 0) { - ldout(cct, 0) << "_finish_read got error " << r << dendl; - error = r; - } else { - ldout(cct, 10) << "_finish_read got " << offset << "~" << bl.length() - << dendl; - if (bl.length() < length) { - ldout(cct, 0) << "_finish_read got less than expected (" << length << ")" - << dendl; - error = -EINVAL; - } - } - - if (error) { - if (on_readable) { - C_OnFinisher *f = on_readable; - on_readable = 0; - f->complete(error); - } - return; - } - - prefetch_buf[offset].swap(bl); - - try { - _assimilate_prefetch(); - } catch (const buffer::error &err) { - lderr(cct) << "_decode error from assimilate_prefetch" << dendl; - error = -EINVAL; - if (on_readable) { - C_OnFinisher *f = on_readable; - on_readable = 0; - f->complete(error); - } - return; - } - _prefetch(); -} - -void Journaler::_assimilate_prefetch() -{ - bool was_readable = readable; - - bool got_any = false; - while (!prefetch_buf.empty()) { - map::iterator p = prefetch_buf.begin(); - if (p->first != received_pos) { - uint64_t gap = p->first - received_pos; - ldout(cct, 10) << "_assimilate_prefetch gap of " << gap - << " from received_pos " << received_pos - << " to first prefetched buffer " << p->first << dendl; - break; - } - - ldout(cct, 10) << "_assimilate_prefetch " << p->first << "~" - << p->second.length() << dendl; - received_pos += p->second.length(); - read_buf.claim_append(p->second); - assert(received_pos <= requested_pos); - prefetch_buf.erase(p); - got_any = true; - } - - if (got_any) { - ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~" - << read_buf.length() << ", read pointers " << read_pos - << "/" << received_pos << "/" << requested_pos - << dendl; - - // Update readability (this will also hit any decode errors resulting - // from bad data) - readable = _is_readable(); - } - - if ((got_any && !was_readable && readable) || read_pos == write_pos) { - // readable! - ldout(cct, 10) << "_finish_read now readable (or at journal end) readable=" - << readable << " read_pos=" << read_pos << " write_pos=" - << write_pos << dendl; - if (on_readable) { - C_OnFinisher *f = on_readable; - on_readable = 0; - f->complete(0); - } - } -} - -void Journaler::_issue_read(uint64_t len) -{ - // stuck at safe_pos? (this is needed if we are reading the tail of - // a journal we are also writing to) - assert(requested_pos <= safe_pos); - if (requested_pos == safe_pos) { - ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos - << ", waiting" << dendl; - assert(write_pos > requested_pos); - if (pending_safe.empty()) { - _flush(NULL); - } - - // Make sure keys of waitfor_safe map are journal entry boundaries. - // The key we used here is either next_safe_pos or old value of - // next_safe_pos. next_safe_pos is always set to journal entry - // boundary. - auto p = pending_safe.rbegin(); - if (p != pending_safe.rend()) - waitfor_safe[p->second].push_back(new C_RetryRead(this)); - else - waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this)); - return; - } - - // don't read too much - if (requested_pos + len > safe_pos) { - len = safe_pos - requested_pos; - ldout(cct, 10) << "_issue_read reading only up to safe_pos " << safe_pos - << dendl; - } - - // go. - ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len - << ", read pointers " << read_pos << "/" << received_pos - << "/" << (requested_pos+len) << dendl; - - // step by period (object). _don't_ do a single big filer.read() - // here because it will wait for all object reads to complete before - // giving us back any data. this way we can process whatever bits - // come in that are contiguous. - uint64_t period = get_layout_period(); - while (len > 0) { - uint64_t e = requested_pos + period; - e -= e % period; - uint64_t l = e - requested_pos; - if (l > len) - l = len; - C_Read *c = new C_Read(this, requested_pos, l); - filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0, - wrap_finisher(c), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); - requested_pos += l; - len -= l; - } -} - -void Journaler::_prefetch() -{ - ldout(cct, 10) << "_prefetch" << dendl; - // prefetch - uint64_t pf; - if (temp_fetch_len) { - ldout(cct, 10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl; - pf = temp_fetch_len; - temp_fetch_len = 0; - } else { - pf = fetch_len; - } - - uint64_t raw_target = read_pos + pf; - - // read full log segments, so increase if necessary - uint64_t period = get_layout_period(); - uint64_t remainder = raw_target % period; - uint64_t adjustment = remainder ? period - remainder : 0; - uint64_t target = raw_target + adjustment; - - // don't read past the log tail - if (target > write_pos) - target = write_pos; - - if (requested_pos < target) { - uint64_t len = target - requested_pos; - ldout(cct, 10) << "_prefetch " << pf << " requested_pos " << requested_pos - << " < target " << target << " (" << raw_target - << "), prefetching " << len << dendl; - - if (pending_safe.empty() && write_pos > safe_pos) { - // If we are reading and writing the journal, then we may need - // to issue a flush if one isn't already in progress. - // Avoid doing a flush every time so that if we do write/read/write/read - // we don't end up flushing after every write. - ldout(cct, 10) << "_prefetch: requested_pos=" << requested_pos - << ", read_pos=" << read_pos - << ", write_pos=" << write_pos - << ", safe_pos=" << safe_pos << dendl; - _do_flush(); - } - - _issue_read(len); - } -} - - -/* - * _is_readable() - return true if next entry is ready. - */ -bool Journaler::_is_readable() -{ - // anything to read? - if (read_pos == write_pos) - return false; - - // Check if the retrieve bytestream has enough for an entry - uint64_t need; - if (journal_stream.readable(read_buf, &need)) { - return true; - } - - ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length() - << ", but need " << need << " for next entry; fetch_len is " - << fetch_len << dendl; - - // partial fragment at the end? - if (received_pos == write_pos) { - ldout(cct, 10) << "is_readable() detected partial entry at tail, " - "adjusting write_pos to " << read_pos << dendl; - - // adjust write_pos - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos; - assert(write_buf.length() == 0); - assert(waitfor_safe.empty()); - - // reset read state - requested_pos = received_pos = read_pos; - read_buf.clear(); - - // FIXME: truncate on disk? - - return false; - } - - if (need > fetch_len) { - temp_fetch_len = need; - ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len - << dendl; - } - - ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl; - return false; -} - -/* - * is_readable() - kickstart prefetch, too - */ -bool Journaler::is_readable() -{ - lock_guard l(lock); - - if (error != 0) { - return false; - } - - bool r = readable; - _prefetch(); - return r; -} - -class Journaler::C_EraseFinish : public Context { - Journaler *journaler; - C_OnFinisher *completion; - public: - C_EraseFinish(Journaler *j, C_OnFinisher *c) : journaler(j), completion(c) {} - void finish(int r) override { - journaler->_finish_erase(r, completion); - } -}; - -/** - * Entirely erase the journal, including header. For use when you - * have already made a copy of the journal somewhere else. - */ -void Journaler::erase(Context *completion) -{ - lock_guard l(lock); - - // Async delete the journal data - uint64_t first = trimmed_pos / get_layout_period(); - uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2; - filer.purge_range(ino, &layout, SnapContext(), first, num, - ceph::real_clock::now(), 0, - wrap_finisher(new C_EraseFinish( - this, wrap_finisher(completion)))); - - // We will not start the operation to delete the header until - // _finish_erase has seen the data deletion succeed: otherwise if - // there was an error deleting data we might prematurely delete the - // header thereby lose our reference to the data. -} - -void Journaler::_finish_erase(int data_result, C_OnFinisher *completion) -{ - lock_guard l(lock); - - if (data_result == 0) { - // Async delete the journal header - filer.purge_range(ino, &layout, SnapContext(), 0, 1, - ceph::real_clock::now(), - 0, wrap_finisher(completion)); - } else { - lderr(cct) << "Failed to delete journal " << ino << " data: " - << cpp_strerror(data_result) << dendl; - completion->complete(data_result); - } -} - -/* try_read_entry(bl) - * read entry into bl if it's ready. - * otherwise, do nothing. - */ -bool Journaler::try_read_entry(bufferlist& bl) -{ - lock_guard l(lock); - - if (!readable) { - ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable" - << dendl; - return false; - } - - uint64_t start_ptr; - size_t consumed; - try { - consumed = journal_stream.read(read_buf, &bl, &start_ptr); - if (stream_format >= JOURNAL_FORMAT_RESILIENT) { - assert(start_ptr == read_pos); - } - } catch (const buffer::error &e) { - lderr(cct) << __func__ << ": decode error from journal_stream" << dendl; - error = -EINVAL; - return false; - } - - ldout(cct, 10) << "try_read_entry at " << read_pos << " read " - << read_pos << "~" << consumed << " (have " - << read_buf.length() << ")" << dendl; - - read_pos += consumed; - try { - // We were readable, we might not be any more - readable = _is_readable(); - } catch (const buffer::error &e) { - lderr(cct) << __func__ << ": decode error from _is_readable" << dendl; - error = -EINVAL; - return false; - } - - // prefetch? - _prefetch(); - return true; -} - -void Journaler::wait_for_readable(Context *onreadable) -{ - lock_guard l(lock); - if (stopping) { - finisher->queue(onreadable, -EAGAIN); - return; - } - - assert(on_readable == 0); - if (!readable) { - ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " - << onreadable << dendl; - on_readable = wrap_finisher(onreadable); - } else { - // race with OSD reply - finisher->queue(onreadable, 0); - } -} - -bool Journaler::have_waiter() const -{ - return on_readable != nullptr; -} - - - - -/***************** TRIMMING *******************/ - - -class Journaler::C_Trim : public Context { - Journaler *ls; - uint64_t to; -public: - C_Trim(Journaler *l, int64_t t) : ls(l), to(t) {} - void finish(int r) override { - ls->_finish_trim(r, to); - } -}; - -void Journaler::trim() -{ - lock_guard l(lock); - _trim(); -} - -void Journaler::_trim() -{ - assert(!readonly); - uint64_t period = get_layout_period(); - uint64_t trim_to = last_committed.expire_pos; - trim_to -= trim_to % period; - ldout(cct, 10) << "trim last_commited head was " << last_committed - << ", can trim to " << trim_to - << dendl; - if (trim_to == 0 || trim_to == trimming_pos) { - ldout(cct, 10) << "trim already trimmed/trimming to " - << trimmed_pos << "/" << trimming_pos << dendl; - return; - } - - if (trimming_pos > trimmed_pos) { - ldout(cct, 10) << "trim already trimming atm, try again later. " - "trimmed/trimming is " << trimmed_pos << "/" << trimming_pos << dendl; - return; - } - - // trim - assert(trim_to <= write_pos); - assert(trim_to <= expire_pos); - assert(trim_to > trimming_pos); - ldout(cct, 10) << "trim trimming to " << trim_to - << ", trimmed/trimming/expire are " - << trimmed_pos << "/" << trimming_pos << "/" << expire_pos - << dendl; - - // delete range of objects - uint64_t first = trimming_pos / period; - uint64_t num = (trim_to - trimming_pos) / period; - SnapContext snapc; - filer.purge_range(ino, &layout, snapc, first, num, - ceph::real_clock::now(), 0, - wrap_finisher(new C_Trim(this, trim_to))); - trimming_pos = trim_to; -} - -void Journaler::_finish_trim(int r, uint64_t to) -{ - lock_guard l(lock); - - assert(!readonly); - ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos - << ", trimmed/trimming/expire now " - << to << "/" << trimming_pos << "/" << expire_pos - << dendl; - if (r < 0 && r != -ENOENT) { - lderr(cct) << "_finish_trim got " << cpp_strerror(r) << dendl; - handle_write_error(r); - return; - } - - assert(r >= 0 || r == -ENOENT); - - assert(to <= trimming_pos); - assert(to > trimmed_pos); - trimmed_pos = to; -} - -void Journaler::handle_write_error(int r) -{ - // lock is locked - - lderr(cct) << "handle_write_error " << cpp_strerror(r) << dendl; - if (on_write_error) { - on_write_error->complete(r); - on_write_error = NULL; - called_write_error = true; - } else if (called_write_error) { - /* We don't call error handler more than once, subsequent errors - * are dropped -- this is okay as long as the error handler does - * something dramatic like respawn */ - lderr(cct) << __func__ << ": multiple write errors, handler already called" - << dendl; - } else { - assert(0 == "unhandled write error"); - } -} - - -/** - * Test whether the 'read_buf' byte stream has enough data to read - * an entry - * - * sets 'next_envelope_size' to the number of bytes needed to advance (enough - * to get the next header if header was unavailable, or enough to get the whole - * next entry if the header was available but the body wasn't). - */ -bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const -{ - assert(need != NULL); - - uint32_t entry_size = 0; - uint64_t entry_sentinel = 0; - bufferlist::iterator p = read_buf.begin(); - - // Do we have enough data to decode an entry prefix? - if (format >= JOURNAL_FORMAT_RESILIENT) { - *need = sizeof(entry_size) + sizeof(entry_sentinel); - } else { - *need = sizeof(entry_size); - } - if (read_buf.length() >= *need) { - if (format >= JOURNAL_FORMAT_RESILIENT) { - ::decode(entry_sentinel, p); - if (entry_sentinel != sentinel) { - throw buffer::malformed_input("Invalid sentinel"); - } - } - - ::decode(entry_size, p); - } else { - return false; - } - - // Do we have enough data to decode an entry prefix, payload and suffix? - if (format >= JOURNAL_FORMAT_RESILIENT) { - *need = JOURNAL_ENVELOPE_RESILIENT + entry_size; - } else { - *need = JOURNAL_ENVELOPE_LEGACY + entry_size; - } - if (read_buf.length() >= *need) { - return true; // No more bytes needed - } - - return false; -} - - -/** - * Consume one entry from a journal byte stream 'from', splicing a - * serialized LogEvent blob into 'entry'. - * - * 'entry' must be non null and point to an empty bufferlist. - * - * 'from' must contain sufficient valid data (i.e. readable is true). - * - * 'start_ptr' will be set to the entry's start pointer, if the collection - * format provides it. It may not be null. - * - * @returns The number of bytes consumed from the `from` byte stream. Note - * that this is not equal to the length of `entry`, which contains - * the inner serialized LogEvent and not the envelope. - */ -size_t JournalStream::read(bufferlist &from, bufferlist *entry, - uint64_t *start_ptr) -{ - assert(start_ptr != NULL); - assert(entry != NULL); - assert(entry->length() == 0); - - uint32_t entry_size = 0; - - // Consume envelope prefix: entry_size and entry_sentinel - bufferlist::iterator from_ptr = from.begin(); - if (format >= JOURNAL_FORMAT_RESILIENT) { - uint64_t entry_sentinel = 0; - ::decode(entry_sentinel, from_ptr); - // Assertion instead of clean check because of precondition of this - // fn is that readable() already passed - assert(entry_sentinel == sentinel); - } - ::decode(entry_size, from_ptr); - - // Read out the payload - from_ptr.copy(entry_size, *entry); - - // Consume the envelope suffix (start_ptr) - if (format >= JOURNAL_FORMAT_RESILIENT) { - ::decode(*start_ptr, from_ptr); - } else { - *start_ptr = 0; - } - - // Trim the input buffer to discard the bytes we have consumed - from.splice(0, from_ptr.get_off()); - - return from_ptr.get_off(); -} - - -/** - * Append one entry - */ -size_t JournalStream::write(bufferlist &entry, bufferlist *to, - uint64_t const &start_ptr) -{ - assert(to != NULL); - - uint32_t const entry_size = entry.length(); - if (format >= JOURNAL_FORMAT_RESILIENT) { - ::encode(sentinel, *to); - } - ::encode(entry_size, *to); - to->claim_append(entry); - if (format >= JOURNAL_FORMAT_RESILIENT) { - ::encode(start_ptr, *to); - } - - if (format >= JOURNAL_FORMAT_RESILIENT) { - return JOURNAL_ENVELOPE_RESILIENT + entry_size; - } else { - return JOURNAL_ENVELOPE_LEGACY + entry_size; - } -} - -/** - * set write error callback - * - * Set a callback/context to trigger if we get a write error from - * the objecter. This may be from an explicit request (e.g., flush) - * or something async the journaler did on its own (e.g., journal - * header update). - * - * It is only used once; if the caller continues to use the - * Journaler and wants to hear about errors, it needs to reset the - * error_handler. - * - * @param c callback/context to trigger on error - */ -void Journaler::set_write_error_handler(Context *c) { - lock_guard l(lock); - assert(!on_write_error); - on_write_error = wrap_finisher(c); - called_write_error = false; -} - - -/** - * Wrap a context in a C_OnFinisher, if it is non-NULL - * - * Utility function to avoid lots of error-prone and verbose - * NULL checking on contexts passed in. - */ -C_OnFinisher *Journaler::wrap_finisher(Context *c) -{ - if (c != NULL) { - return new C_OnFinisher(c, finisher); - } else { - return NULL; - } -} - -void Journaler::shutdown() -{ - lock_guard l(lock); - - ldout(cct, 1) << __func__ << dendl; - - readable = false; - stopping = true; - - // Kick out anyone reading from journal - error = -EAGAIN; - if (on_readable) { - C_OnFinisher *f = on_readable; - on_readable = 0; - f->complete(-EAGAIN); - } - - finish_contexts(cct, waitfor_recover, -ESHUTDOWN); - - std::map >::iterator i; - for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) { - finish_contexts(cct, i->second, -EAGAIN); - } - waitfor_safe.clear(); -} -