// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "common/perf_counters.h"
#include "common/dout.h"
#include "include/Context.h"
#include "msg/Messenger.h"
#include "osdc/Journaler.h"
#include "common/errno.h"
#include "include/assert.h"
#include "common/Finisher.h"

#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
#define dout_prefix *_dout << objecter->messenger->get_myname() \
  << ".journaler." << name << (readonly ? "(ro) ":"(rw) ")

using std::chrono::seconds;


class Journaler::C_DelayFlush : public Context {
  Journaler *journaler;
public:
  C_DelayFlush(Journaler *j) : journaler(j) {}
  void finish(int r) override {
    journaler->_do_delayed_flush();
  }
};

void Journaler::set_readonly()
{
  lock_guard l(lock);

  ldout(cct, 1) << "set_readonly" << dendl;
  readonly = true;
}

void Journaler::set_writeable()
{
  lock_guard l(lock);

  ldout(cct, 1) << "set_writeable" << dendl;
  readonly = false;
}

void Journaler::create(file_layout_t *l, stream_format_t const sf)
{
  lock_guard lk(lock);

  assert(!readonly);
  state = STATE_ACTIVE;

  stream_format = sf;
  journal_stream.set_format(sf);
  _set_layout(l);

  prezeroing_pos = prezero_pos = write_pos = flush_pos =
    safe_pos = read_pos = requested_pos = received_pos =
    expire_pos = trimming_pos = trimmed_pos =
    next_safe_pos = layout.get_period();

  ldout(cct, 1) << "created blank journal at inode 0x" << std::hex << ino
                << std::dec << ", format=" << stream_format << dendl;
}

void Journaler::set_layout(file_layout_t const *l)
{
  lock_guard lk(lock);
  _set_layout(l);
}

void Journaler::_set_layout(file_layout_t const *l)
{
  layout = *l;

  assert(layout.pool_id == pg_pool);
  last_written.layout = layout;
  last_committed.layout = layout;

  // prefetch intelligently.
  // (watch out, this is big if you use big objects or weird striping)
  uint64_t periods = cct->_conf->journaler_prefetch_periods;
  if (periods < 2)
    periods = 2;  // we need at least 2 periods to make progress.
  fetch_len = layout.get_period() * periods;
}


/***************** HEADER *******************/

ostream& operator<<(ostream &out, const Journaler::Header &h)
{
  return out << "loghead(trim " << h.trimmed_pos
             << ", expire " << h.expire_pos
             << ", write " << h.write_pos
             << ", stream_format " << (int)(h.stream_format)
             << ")";
}

class Journaler::C_ReadHead : public Context {
  Journaler *ls;
public:
  bufferlist bl;
  explicit C_ReadHead(Journaler *l) : ls(l) {}
  void finish(int r) override {
    ls->_finish_read_head(r, bl);
  }
};

class Journaler::C_RereadHead : public Context {
  Journaler *ls;
  Context *onfinish;
public:
  bufferlist bl;
  C_RereadHead(Journaler *l, Context *onfinish_) : ls(l),
                                                   onfinish(onfinish_) {}
  void finish(int r) override {
    ls->_finish_reread_head(r, bl, onfinish);
  }
};

class Journaler::C_ProbeEnd : public Context {
  Journaler *ls;
public:
  uint64_t end;
  explicit C_ProbeEnd(Journaler *l) : ls(l), end(-1) {}
  void finish(int r) override {
    ls->_finish_probe_end(r, end);
  }
};

class Journaler::C_ReProbe : public Context {
  Journaler *ls;
  C_OnFinisher *onfinish;
public:
  uint64_t end;
  C_ReProbe(Journaler *l, C_OnFinisher *onfinish_) :
    ls(l), onfinish(onfinish_), end(0) {}
  void finish(int r) override {
    ls->_finish_reprobe(r, end, onfinish);
  }
};

void Journaler::recover(Context *onread)
{
  lock_guard l(lock);
  if (stopping) {
    onread->complete(-EAGAIN);
    return;
  }

  ldout(cct, 1) << "recover start" << dendl;
  assert(state != STATE_ACTIVE);
  assert(readonly);

  if (onread)
    waitfor_recover.push_back(wrap_finisher(onread));

  if (state != STATE_UNDEF) {
    ldout(cct, 1) << "recover - already recovering" << dendl;
    return;
  }

  ldout(cct, 1) << "read_head" << dendl;
  state = STATE_READHEAD;
  C_ReadHead *fin = new C_ReadHead(this);
  _read_head(fin, &fin->bl);
}

void Journaler::_read_head(Context *on_finish, bufferlist *bl)
{
  // lock is locked
  assert(state == STATE_READHEAD || state == STATE_REREADHEAD);

  object_t oid = file_object_t(ino, 0);
  object_locator_t oloc(pg_pool);
  objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, wrap_finisher(on_finish));
}

void Journaler::reread_head(Context *onfinish)
{
  lock_guard l(lock);
  _reread_head(wrap_finisher(onfinish));
}

/**
 * Re-read the head from disk, and set the write_pos, expire_pos, trimmed_pos
 * from the on-disk header. This switches the state to STATE_REREADHEAD for
 * the duration, and you shouldn't start a re-read while other operations are
 * in-flight, nor start other operations while a re-read is in progress.
 * Also, don't call this until the Journaler has finished its recovery and has
 * gone STATE_ACTIVE!
 */
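// A minimal usage sketch for the re-read path described above (illustration
// only, not part of the Journaler API). It assumes the caller has already
// driven recover() to completion and that a synchronous waiter such as
// Ceph's C_SaferCond is acceptable; the #if 0 guard keeps it out of the build.
#if 0
static int example_reread_head(Journaler *journaler)
{
  // Only legal once the Journaler is STATE_ACTIVE, and only while no other
  // journal I/O is in flight, per the constraints spelled out above.
  C_SaferCond cond;
  journaler->reread_head_and_probe(&cond);
  return cond.wait();  // 0 on success, negative errno on failure
}
#endif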
void Journaler::_reread_head(Context *onfinish)
{
  ldout(cct, 10) << "reread_head" << dendl;
  assert(state == STATE_ACTIVE);

  state = STATE_REREADHEAD;
  C_RereadHead *fin = new C_RereadHead(this, onfinish);
  _read_head(fin, &fin->bl);
}

void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
{
  lock_guard l(lock);

  //read on-disk header into
  assert(bl.length() || r < 0 );

  // unpack header
  if (r == 0) {
    Header h;
    bufferlist::iterator p = bl.begin();
    try {
      ::decode(h, p);
    } catch (const buffer::error &e) {
      finish->complete(-EINVAL);
      return;
    }
    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos
      = next_safe_pos = h.write_pos;
    expire_pos = h.expire_pos;
    trimmed_pos = trimming_pos = h.trimmed_pos;
    init_headers(h);
    state = STATE_ACTIVE;
  }

  finish->complete(r);
}

void Journaler::_finish_read_head(int r, bufferlist& bl)
{
  lock_guard l(lock);

  assert(state == STATE_READHEAD);

  if (r != 0) {
    ldout(cct, 0) << "error getting journal off disk" << dendl;
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, r);
    return;
  }

  if (bl.length() == 0) {
    ldout(cct, 1) << "_finish_read_head r=" << r
                  << " read 0 bytes, assuming empty log" << dendl;
    state = STATE_ACTIVE;
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, 0);
    return;
  }

  // unpack header
  bool corrupt = false;
  Header h;
  bufferlist::iterator p = bl.begin();
  try {
    ::decode(h, p);

    if (h.magic != magic) {
      ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
                    << magic << "'" << dendl;
      corrupt = true;
    } else if (h.write_pos < h.expire_pos || h.expire_pos < h.trimmed_pos) {
      ldout(cct, 0) << "Corrupt header (bad offsets): " << h << dendl;
      corrupt = true;
    }
  } catch (const buffer::error &e) {
    corrupt = true;
  }

  if (corrupt) {
    list<Context*> ls;
    ls.swap(waitfor_recover);
    finish_contexts(cct, ls, -EINVAL);
    return;
  }

  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos =
    next_safe_pos = h.write_pos;
  read_pos = requested_pos = received_pos =
    expire_pos = h.expire_pos;
  trimmed_pos = trimming_pos = h.trimmed_pos;

  init_headers(h);
  _set_layout(&h.layout);
  stream_format = h.stream_format;
  journal_stream.set_format(h.stream_format);

  ldout(cct, 1) << "_finish_read_head " << h
                << ". probing for end of log (from " << write_pos << ")..."
                << dendl;
  C_ProbeEnd *fin = new C_ProbeEnd(this);
  state = STATE_PROBING;
  _probe(fin, &fin->end);
}

void Journaler::_probe(Context *finish, uint64_t *end)
{
  // lock is locked
  ldout(cct, 1) << "probing for end of the log" << dendl;
  assert(state == STATE_PROBING || state == STATE_REPROBING);
  // probe the log
  filer.probe(ino, &layout, CEPH_NOSNAP,
              write_pos, end, true, 0, wrap_finisher(finish));
}

void Journaler::_reprobe(C_OnFinisher *finish)
{
  ldout(cct, 10) << "reprobe" << dendl;
  assert(state == STATE_ACTIVE);

  state = STATE_REPROBING;
  C_ReProbe *fin = new C_ReProbe(this, finish);
  _probe(fin, &fin->end);
}

void Journaler::_finish_reprobe(int r, uint64_t new_end,
                                C_OnFinisher *onfinish)
{
  lock_guard l(lock);

  assert(new_end >= write_pos || r < 0);
  ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
                << " (header had " << write_pos << ")." << dendl;
  prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos =
    next_safe_pos = new_end;
  state = STATE_ACTIVE;
  onfinish->complete(r);
}

void Journaler::_finish_probe_end(int r, uint64_t end)
{
  lock_guard l(lock);

  assert(state == STATE_PROBING);
  if (r < 0) { // error in probing
    goto out;
  }
recovered." << dendl; ceph_abort(); // hrm. } else { assert(end >= write_pos); ldout(cct, 1) << "_finish_probe_end write_pos = " << end << " (header had " << write_pos << "). recovered." << dendl; } state = STATE_ACTIVE; prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = end; out: // done. list ls; ls.swap(waitfor_recover); finish_contexts(cct, ls, r); } class Journaler::C_RereadHeadProbe : public Context { Journaler *ls; C_OnFinisher *final_finish; public: C_RereadHeadProbe(Journaler *l, C_OnFinisher *finish) : ls(l), final_finish(finish) {} void finish(int r) override { ls->_finish_reread_head_and_probe(r, final_finish); } }; void Journaler::reread_head_and_probe(Context *onfinish) { lock_guard l(lock); assert(state == STATE_ACTIVE); _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish))); } void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish) { // Expect to be called back from finish_reread_head, which already takes lock // lock is locked assert(!r); //if we get an error, we're boned _reprobe(onfinish); } // WRITING class Journaler::C_WriteHead : public Context { public: Journaler *ls; Header h; C_OnFinisher *oncommit; C_WriteHead(Journaler *l, Header& h_, C_OnFinisher *c) : ls(l), h(h_), oncommit(c) {} void finish(int r) override { ls->_finish_write_head(r, h, oncommit); } }; void Journaler::write_head(Context *oncommit) { lock_guard l(lock); _write_head(oncommit); } void Journaler::_write_head(Context *oncommit) { assert(!readonly); assert(state == STATE_ACTIVE); last_written.trimmed_pos = trimmed_pos; last_written.expire_pos = expire_pos; last_written.unused_field = expire_pos; last_written.write_pos = safe_pos; last_written.stream_format = stream_format; ldout(cct, 10) << "write_head " << last_written << dendl; // Avoid persisting bad pointers in case of bugs assert(last_written.write_pos >= last_written.expire_pos); assert(last_written.expire_pos >= last_written.trimmed_pos); last_wrote_head = ceph::real_clock::now(); bufferlist bl; ::encode(last_written, bl); SnapContext snapc; object_t oid = file_object_t(ino, 0); object_locator_t oloc(pg_pool); objecter->write_full(oid, oloc, snapc, bl, ceph::real_clock::now(), 0, wrap_finisher(new C_WriteHead( this, last_written, wrap_finisher(oncommit))), 0, 0, write_iohint); } void Journaler::_finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit) { lock_guard l(lock); if (r < 0) { lderr(cct) << "_finish_write_head got " << cpp_strerror(r) << dendl; handle_write_error(r); return; } assert(!readonly); ldout(cct, 10) << "_finish_write_head " << wrote << dendl; last_committed = wrote; if (oncommit) { oncommit->complete(r); } _trim(); // trim? } /***************** WRITING *******************/ class Journaler::C_Flush : public Context { Journaler *ls; uint64_t start; ceph::real_time stamp; public: C_Flush(Journaler *l, int64_t s, ceph::real_time st) : ls(l), start(s), stamp(st) {} void finish(int r) override { ls->_finish_flush(r, start, stamp); } }; void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) { lock_guard l(lock); assert(!readonly); if (r < 0) { lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl; handle_write_error(r); return; } assert(start < flush_pos); // calc latency? 
  if (logger) {
    ceph::timespan lat = ceph::real_clock::now() - stamp;
    logger->tinc(logger_key_lat, lat);
  }

  // adjust safe_pos
  auto it = pending_safe.find(start);
  assert(it != pending_safe.end());
  pending_safe.erase(it);
  if (pending_safe.empty())
    safe_pos = next_safe_pos;
  else
    safe_pos = pending_safe.begin()->second;

  ldout(cct, 10) << "_finish_flush safe from " << start
                 << ", pending_safe " << pending_safe
                 << ", (prezeroing/prezero)/write/flush/safe positions now "
                 << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
                 << write_pos << "/" << flush_pos << "/" << safe_pos
                 << dendl;

  // kick waiters <= safe_pos
  if (!waitfor_safe.empty()) {
    list<Context*> ls;
    while (!waitfor_safe.empty()) {
      auto it = waitfor_safe.begin();
      if (it->first > safe_pos)
        break;
      ls.splice(ls.end(), it->second);
      waitfor_safe.erase(it);
    }
    finish_contexts(cct, ls);
  }
}


uint64_t Journaler::append_entry(bufferlist& bl)
{
  unique_lock l(lock);

  assert(!readonly);
  uint32_t s = bl.length();

  // append
  size_t delta = bl.length() + journal_stream.get_envelope_size();
  // write_buf space is nearly full
  if (!write_buf_throttle.get_or_fail(delta)) {
    l.unlock();
    ldout(cct, 10) << "write_buf_throttle wait, delta " << delta << dendl;
    write_buf_throttle.get(delta);
    l.lock();
  }
  ldout(cct, 20) << "write_buf_throttle get, delta " << delta << dendl;
  size_t wrote = journal_stream.write(bl, &write_buf, write_pos);
  ldout(cct, 10) << "append_entry len " << s << " to " << write_pos << "~"
                 << wrote << dendl;
  write_pos += wrote;

  // flush previous object?
  uint64_t su = get_layout_period();
  assert(su > 0);
  uint64_t write_off = write_pos % su;
  uint64_t write_obj = write_pos / su;
  uint64_t flush_obj = flush_pos / su;
  if (write_obj != flush_obj) {
    ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
                   << write_obj << " flo " << flush_obj << ")" << dendl;
    _do_flush(write_buf.length() - write_off);

    // if _do_flush() skips flushing some data, it does do a best effort to
    // update next_safe_pos.
    if (write_buf.length() > 0 &&
        write_buf.length() <= wrote) { // the unflushed data are within this entry
      // set next_safe_pos to end of previous entry
      next_safe_pos = write_pos - wrote;
    }
  }

  return write_pos;
}

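// A minimal sketch of the write path above (illustration only, not part of
// the Journaler API): buffer an entry with append_entry(), then make it
// durable with flush(). It assumes the entry is already serialized into a
// bufferlist and that a synchronous waiter such as Ceph's C_SaferCond is
// acceptable; the #if 0 guard keeps it out of the build.
#if 0
static int example_append_and_flush(Journaler *journaler, bufferlist entry_bl)
{
  // append_entry() only buffers the entry (throttled by write_buf_throttle)
  // and returns the new write_pos; nothing has reached the OSDs yet.
  uint64_t end = journaler->append_entry(entry_bl);

  // flush() submits the buffered bytes and completes the callback once
  // everything up to 'end' is safe on disk.
  C_SaferCond safe;
  journaler->flush(&safe);
  return safe.wait();  // 0 on success, negative errno on failure
}
#endif
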
void Journaler::_do_flush(unsigned amount)
{
  if (write_pos == flush_pos)
    return;
  assert(write_pos > flush_pos);
  assert(!readonly);

  // flush
  uint64_t len = write_pos - flush_pos;
  assert(len == write_buf.length());
  if (amount && amount < len)
    len = amount;

  // zero at least two full periods ahead.  this ensures
  // that the next object will not exist.
  uint64_t period = get_layout_period();
  if (flush_pos + len + 2*period > prezero_pos) {
    _issue_prezero();

    int64_t newlen = prezero_pos - flush_pos - period;
    if (newlen <= 0) {
      ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
                     << " already too close to prezero_pos " << prezero_pos
                     << ", zeroing first" << dendl;
      waiting_for_zero = true;
      return;
    }
    if (static_cast<uint64_t>(newlen) < len) {
      ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
                     << " but hit prezero_pos " << prezero_pos
                     << ", will do " << flush_pos << "~" << newlen << dendl;
      len = newlen;
    } else {
      waiting_for_zero = false;
    }
  } else {
    waiting_for_zero = false;
  }
  ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;

  // submit write for anything pending

  // flush _start_ pos to _finish_flush
  ceph::real_time now = ceph::real_clock::now();
  SnapContext snapc;

  Context *onsafe = new C_Flush(this, flush_pos, now);  // on COMMIT
  pending_safe[flush_pos] = next_safe_pos;

  bufferlist write_bl;

  // adjust pointers
  if (len == write_buf.length()) {
    write_bl.swap(write_buf);
    next_safe_pos = write_pos;
  } else {
    write_buf.splice(0, len, &write_bl);
    // Keys of waitfor_safe map are journal entry boundaries.
    // Try finding a journal entry that we are actually flushing
    // and set next_safe_pos to end of it. This is best effort.
    // The one we found may not be the latest flushing entry.
    auto p = waitfor_safe.lower_bound(flush_pos + len);
    if (p != waitfor_safe.end()) {
      if (p->first > flush_pos + len && p != waitfor_safe.begin())
        --p;
      if (p->first <= flush_pos + len && p->first > next_safe_pos)
        next_safe_pos = p->first;
    }
  }

  filer.write(ino, &layout, snapc,
              flush_pos, len, write_bl, ceph::real_clock::now(),
              0,
              wrap_finisher(onsafe), write_iohint);

  flush_pos += len;
  assert(write_buf.length() == write_pos - flush_pos);
  write_buf_throttle.put(len);
  ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;

  ldout(cct, 10)
    << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at "
    << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos
    << "/" << flush_pos << "/" << safe_pos << dendl;

  _issue_prezero();
}


void Journaler::wait_for_flush(Context *onsafe)
{
  lock_guard l(lock);
  if (stopping) {
    onsafe->complete(-EAGAIN);
    return;
  }
  _wait_for_flush(onsafe);
}

void Journaler::_wait_for_flush(Context *onsafe)
{
  assert(!readonly);

  // all flushed and safe?
  if (write_pos == safe_pos) {
    assert(write_buf.length() == 0);
    ldout(cct, 10)
      << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe "
         "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
      << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
    if (onsafe) {
      finisher->queue(onsafe, 0);
    }
    return;
  }

  // queue waiter
  if (onsafe) {
    waitfor_safe[write_pos].push_back(wrap_finisher(onsafe));
  }
}

void Journaler::flush(Context *onsafe)
{
  lock_guard l(lock);
  _flush(wrap_finisher(onsafe));
}

void Journaler::_flush(C_OnFinisher *onsafe)
{
  assert(!readonly);

  if (write_pos == flush_pos) {
    assert(write_buf.length() == 0);
    ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/"
                      "flush/safe pointers at " << "(" << prezeroing_pos
                   << "/" << prezero_pos << ")/" << write_pos << "/"
                   << flush_pos << "/" << safe_pos << dendl;
    if (onsafe) {
      onsafe->complete(0);
    }
  } else {
    _do_flush();
    _wait_for_flush(onsafe);
  }

  // write head?
  if (_write_head_needed()) {
    _write_head();
  }
}

bool Journaler::_write_head_needed()
{
  return last_wrote_head + seconds(cct->_conf->journaler_write_head_interval)
    < ceph::real_clock::now();
}


/*************** prezeroing ******************/

struct C_Journaler_Prezero : public Context {
  Journaler *journaler;
  uint64_t from, len;
  C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l)
    : journaler(j), from(f), len(l) {}
  void finish(int r) override {
    journaler->_finish_prezero(r, from, len);
  }
};

void Journaler::_issue_prezero()
{
  assert(prezeroing_pos >= flush_pos);

  // we need to zero at least two periods, minimum, to ensure that we
  // have a full empty object/period in front of us.
  uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods);

  /*
   * issue zero requests based on write_pos, even though the invariant
   * is that we zero ahead of flush_pos.
   */
  uint64_t period = get_layout_period();
  uint64_t to = write_pos + period * num_periods + period - 1;
  to -= to % period;

  if (prezeroing_pos >= to) {
    ldout(cct, 20) << "_issue_prezero target " << to << " <= prezeroing_pos "
                   << prezeroing_pos << dendl;
    return;
  }

  while (prezeroing_pos < to) {
    uint64_t len;
    if (prezeroing_pos % period == 0) {
      len = period;
      ldout(cct, 10) << "_issue_prezero removing " << prezeroing_pos << "~"
                     << period << " (full period)" << dendl;
    } else {
      len = period - (prezeroing_pos % period);
      ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~"
                     << len << " (partial period)" << dendl;
    }
    SnapContext snapc;
    Context *c = wrap_finisher(new C_Journaler_Prezero(this, prezeroing_pos,
                                                       len));
    filer.zero(ino, &layout, snapc, prezeroing_pos, len,
               ceph::real_clock::now(), 0, c);
    prezeroing_pos += len;
  }
}

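// Worked example of the prezero target computation above, using hypothetical
// numbers: with a 4 MiB layout period, write_pos = 10 MiB and
// journaler_prezero_periods = 2,
//
//   to  = 10 MiB + 2*4 MiB + 4 MiB - 1 = 23068671
//   to -= to % 4 MiB                   -> to = 20 MiB
//
// so zeroing is issued out to offset 20 MiB, leaving two whole zeroed
// periods beyond the one that contains write_pos.
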
// Lock cycle because we get called out of objecter callback (holding
// objecter read lock), but there are also cases where we take the journaler
// lock before calling into objecter to do I/O.
void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
{
  lock_guard l(lock);

  ldout(cct, 10) << "_prezeroed to " << start << "~" << len
                 << ", prezeroing/prezero was " << prezeroing_pos << "/"
                 << prezero_pos << ", pending " << pending_zero
                 << dendl;
  if (r < 0 && r != -ENOENT) {
    lderr(cct) << "_prezeroed got " << cpp_strerror(r) << dendl;
    handle_write_error(r);
    return;
  }

  assert(r == 0 || r == -ENOENT);

  if (start == prezero_pos) {
    prezero_pos += len;
    while (!pending_zero.empty() &&
           pending_zero.begin().get_start() == prezero_pos) {
      interval_set<uint64_t>::iterator b(pending_zero.begin());
      prezero_pos += b.get_len();
      pending_zero.erase(b);
    }

    if (waiting_for_zero) {
      _do_flush();
    }
  } else {
    pending_zero.insert(start, len);
  }
  ldout(cct, 10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos
                 << "/" << prezero_pos
                 << ", pending " << pending_zero
                 << dendl;
}


/***************** READING *******************/

class Journaler::C_Read : public Context {
  Journaler *ls;
  uint64_t offset;
  uint64_t length;
public:
  bufferlist bl;
  C_Read(Journaler *j, uint64_t o, uint64_t l)
    : ls(j), offset(o), length(l) {}
  void finish(int r) override {
    ls->_finish_read(r, offset, length, bl);
  }
};

class Journaler::C_RetryRead : public Context {
  Journaler *ls;
public:
  explicit C_RetryRead(Journaler *l) : ls(l) {}
  void finish(int r) override {
    // Should only be called from waitfor_safe, i.e. already inside lock
    // (ls->lock is locked)
    ls->_prefetch();
  }
};

void Journaler::_finish_read(int r, uint64_t offset, uint64_t length,
                             bufferlist& bl)
{
  lock_guard l(lock);

  if (r < 0) {
    ldout(cct, 0) << "_finish_read got error " << r << dendl;
    error = r;
  } else {
    ldout(cct, 10) << "_finish_read got " << offset << "~" << bl.length()
                   << dendl;
    if (bl.length() < length) {
      ldout(cct, 0) << "_finish_read got less than expected (" << length
                    << ")" << dendl;
      error = -EINVAL;
    }
  }

  if (error) {
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(error);
    }
    return;
  }

  prefetch_buf[offset].swap(bl);

  try {
    _assimilate_prefetch();
  } catch (const buffer::error &err) {
    lderr(cct) << "_decode error from assimilate_prefetch" << dendl;
    error = -EINVAL;
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(error);
    }
    return;
  }

  _prefetch();
}

void Journaler::_assimilate_prefetch()
{
  bool was_readable = readable;

  bool got_any = false;
  while (!prefetch_buf.empty()) {
    map<uint64_t,bufferlist>::iterator p = prefetch_buf.begin();
    if (p->first != received_pos) {
      uint64_t gap = p->first - received_pos;
      ldout(cct, 10) << "_assimilate_prefetch gap of " << gap
                     << " from received_pos " << received_pos
                     << " to first prefetched buffer " << p->first << dendl;
      break;
    }

    ldout(cct, 10) << "_assimilate_prefetch " << p->first << "~"
                   << p->second.length() << dendl;
    received_pos += p->second.length();
    read_buf.claim_append(p->second);
    assert(received_pos <= requested_pos);
    prefetch_buf.erase(p);
    got_any = true;
  }

  if (got_any) {
    ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~"
                   << read_buf.length() << ", read pointers " << read_pos
                   << "/" << received_pos << "/" << requested_pos
                   << dendl;

    // Update readability (this will also hit any decode errors resulting
    // from bad data)
    readable = _is_readable();
  }

  if ((got_any && !was_readable && readable) || read_pos == write_pos) {
    // readable!
    ldout(cct, 10) << "_finish_read now readable (or at journal end) readable="
                   << readable << " read_pos=" << read_pos << " write_pos="
                   << write_pos << dendl;
    if (on_readable) {
      C_OnFinisher *f = on_readable;
      on_readable = 0;
      f->complete(0);
    }
  }
}

void Journaler::_issue_read(uint64_t len)
{
  // stuck at safe_pos?  (this is needed if we are reading the tail of
  // a journal we are also writing to)
  assert(requested_pos <= safe_pos);
  if (requested_pos == safe_pos) {
    ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
                   << ", waiting" << dendl;
    assert(write_pos > requested_pos);
    if (pending_safe.empty()) {
      _flush(NULL);
    }

    // Make sure keys of waitfor_safe map are journal entry boundaries.
    // The key we used here is either next_safe_pos or old value of
    // next_safe_pos. next_safe_pos is always set to journal entry
    // boundary.
    auto p = pending_safe.rbegin();
    if (p != pending_safe.rend())
      waitfor_safe[p->second].push_back(new C_RetryRead(this));
    else
      waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
    return;
  }

  // don't read too much
  if (requested_pos + len > safe_pos) {
    len = safe_pos - requested_pos;
    ldout(cct, 10) << "_issue_read reading only up to safe_pos " << safe_pos
                   << dendl;
  }

  // go.
  ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len
                 << ", read pointers " << read_pos << "/" << received_pos
                 << "/" << (requested_pos+len) << dendl;

  // step by period (object).  _don't_ do a single big filer.read()
  // here because it will wait for all object reads to complete before
  // giving us back any data.  this way we can process whatever bits
  // come in that are contiguous.
  uint64_t period = get_layout_period();
  while (len > 0) {
    uint64_t e = requested_pos + period;
    e -= e % period;
    uint64_t l = e - requested_pos;
    if (l > len)
      l = len;
    C_Read *c = new C_Read(this, requested_pos, l);
    filer.read(ino, &layout, CEPH_NOSNAP, requested_pos, l, &c->bl, 0,
               wrap_finisher(c), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
    requested_pos += l;
    len -= l;
  }
}

void Journaler::_prefetch()
{
  ldout(cct, 10) << "_prefetch" << dendl;
  // prefetch
  uint64_t pf;
  if (temp_fetch_len) {
    ldout(cct, 10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
    pf = temp_fetch_len;
    temp_fetch_len = 0;
  } else {
    pf = fetch_len;
  }

  uint64_t raw_target = read_pos + pf;

  // read full log segments, so increase if necessary
  uint64_t period = get_layout_period();
  uint64_t remainder = raw_target % period;
  uint64_t adjustment = remainder ? period - remainder : 0;
  uint64_t target = raw_target + adjustment;

  // don't read past the log tail
  if (target > write_pos)
    target = write_pos;

  if (requested_pos < target) {
    uint64_t len = target - requested_pos;
    ldout(cct, 10) << "_prefetch " << pf << " requested_pos " << requested_pos
                   << " < target " << target << " (" << raw_target
                   << "), prefetching " << len << dendl;

    if (pending_safe.empty() && write_pos > safe_pos) {
      // If we are reading and writing the journal, then we may need
      // to issue a flush if one isn't already in progress.
      // Avoid doing a flush every time so that if we do write/read/write/read
      // we don't end up flushing after every write.
      ldout(cct, 10) << "_prefetch: requested_pos=" << requested_pos
                     << ", read_pos=" << read_pos
                     << ", write_pos=" << write_pos
                     << ", safe_pos=" << safe_pos << dendl;
      _do_flush();
    }

    _issue_read(len);
  }
}


/*
 * _is_readable() - return true if next entry is ready.
 */
bool Journaler::_is_readable()
{
  // anything to read?
  if (read_pos == write_pos)
    return false;

  // Check if the retrieve bytestream has enough for an entry
  uint64_t need;
  if (journal_stream.readable(read_buf, &need)) {
    return true;
  }

  ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
                  << ", but need " << need << " for next entry; fetch_len is "
                  << fetch_len << dendl;

  // partial fragment at the end?
  if (received_pos == write_pos) {
    ldout(cct, 10) << "is_readable() detected partial entry at tail, "
                      "adjusting write_pos to " << read_pos << dendl;

    // adjust write_pos
    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos =
      next_safe_pos = read_pos;
    assert(write_buf.length() == 0);
    assert(waitfor_safe.empty());

    // reset read state
    requested_pos = received_pos = read_pos;
    read_buf.clear();

    // FIXME: truncate on disk?

    return false;
  }

  if (need > fetch_len) {
    temp_fetch_len = need;
    ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
                   << dendl;
  }

  ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
  return false;
}

/*
 * is_readable() - kickstart prefetch, too
 */
bool Journaler::is_readable()
{
  lock_guard l(lock);

  if (error != 0) {
    return false;
  }

  bool r = readable;
  _prefetch();
  return r;
}

class Journaler::C_EraseFinish : public Context {
  Journaler *journaler;
  C_OnFinisher *completion;
public:
  C_EraseFinish(Journaler *j, C_OnFinisher *c) : journaler(j), completion(c) {}
  void finish(int r) override {
    journaler->_finish_erase(r, completion);
  }
};

/**
 * Entirely erase the journal, including header.  For use when you
 * have already made a copy of the journal somewhere else.
 */
void Journaler::erase(Context *completion)
{
  lock_guard l(lock);

  // Async delete the journal data
  uint64_t first = trimmed_pos / get_layout_period();
  uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
  filer.purge_range(ino, &layout, SnapContext(), first, num,
                    ceph::real_clock::now(), 0,
                    wrap_finisher(new C_EraseFinish(
                                    this, wrap_finisher(completion))));

  // We will not start the operation to delete the header until
  // _finish_erase has seen the data deletion succeed: otherwise if
  // there was an error deleting data we might prematurely delete the
  // header thereby lose our reference to the data.
}

void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
{
  lock_guard l(lock);

  if (data_result == 0) {
    // Async delete the journal header
    filer.purge_range(ino, &layout, SnapContext(), 0, 1,
                      ceph::real_clock::now(),
                      0, wrap_finisher(completion));
  } else {
    lderr(cct) << "Failed to delete journal " << ino << " data: "
               << cpp_strerror(data_result) << dendl;
    completion->complete(data_result);
  }
}

/* try_read_entry(bl)
 *  read entry into bl if it's ready.
 *  otherwise, do nothing.
 */
bool Journaler::try_read_entry(bufferlist& bl)
{
  lock_guard l(lock);

  if (!readable) {
    ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable"
                   << dendl;
    return false;
  }

  uint64_t start_ptr;
  size_t consumed;
  try {
    consumed = journal_stream.read(read_buf, &bl, &start_ptr);
    if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
      assert(start_ptr == read_pos);
    }
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
    error = -EINVAL;
    return false;
  }

  ldout(cct, 10) << "try_read_entry at " << read_pos << " read "
                 << read_pos << "~" << consumed << " (have "
                 << read_buf.length() << ")" << dendl;

  read_pos += consumed;
  try {
    // We were readable, we might not be any more
    readable = _is_readable();
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
    error = -EINVAL;
    return false;
  }

  // prefetch?
  _prefetch();
  return true;
}

void Journaler::wait_for_readable(Context *onreadable)
{
  lock_guard l(lock);
  if (stopping) {
    finisher->queue(onreadable, -EAGAIN);
    return;
  }

  assert(on_readable == 0);
  if (!readable) {
    ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable "
                   << onreadable << dendl;
    on_readable = wrap_finisher(onreadable);
  } else {
    // race with OSD reply
    finisher->queue(onreadable, 0);
  }
}

bool Journaler::have_waiter() const
{
  return on_readable != nullptr;
}


/***************** TRIMMING *******************/

class Journaler::C_Trim : public Context {
  Journaler *ls;
  uint64_t to;
public:
  C_Trim(Journaler *l, int64_t t) : ls(l), to(t) {}
  void finish(int r) override {
    ls->_finish_trim(r, to);
  }
};

void Journaler::trim()
{
  lock_guard l(lock);
  _trim();
}

void Journaler::_trim()
{
  assert(!readonly);
  uint64_t period = get_layout_period();
  uint64_t trim_to = last_committed.expire_pos;
  trim_to -= trim_to % period;
  ldout(cct, 10) << "trim last_commited head was " << last_committed
                 << ", can trim to " << trim_to << dendl;
  if (trim_to == 0 || trim_to == trimming_pos) {
    ldout(cct, 10) << "trim already trimmed/trimming to "
                   << trimmed_pos << "/" << trimming_pos << dendl;
    return;
  }

  if (trimming_pos > trimmed_pos) {
    ldout(cct, 10) << "trim already trimming atm, try again later.  "
                      "trimmed/trimming is " << trimmed_pos << "/"
                   << trimming_pos << dendl;
    return;
  }
" "trimmed/trimming is " << trimmed_pos << "/" << trimming_pos << dendl; return; } // trim assert(trim_to <= write_pos); assert(trim_to <= expire_pos); assert(trim_to > trimming_pos); ldout(cct, 10) << "trim trimming to " << trim_to << ", trimmed/trimming/expire are " << trimmed_pos << "/" << trimming_pos << "/" << expire_pos << dendl; // delete range of objects uint64_t first = trimming_pos / period; uint64_t num = (trim_to - trimming_pos) / period; SnapContext snapc; filer.purge_range(ino, &layout, snapc, first, num, ceph::real_clock::now(), 0, wrap_finisher(new C_Trim(this, trim_to))); trimming_pos = trim_to; } void Journaler::_finish_trim(int r, uint64_t to) { lock_guard l(lock); assert(!readonly); ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos << ", trimmed/trimming/expire now " << to << "/" << trimming_pos << "/" << expire_pos << dendl; if (r < 0 && r != -ENOENT) { lderr(cct) << "_finish_trim got " << cpp_strerror(r) << dendl; handle_write_error(r); return; } assert(r >= 0 || r == -ENOENT); assert(to <= trimming_pos); assert(to > trimmed_pos); trimmed_pos = to; } void Journaler::handle_write_error(int r) { // lock is locked lderr(cct) << "handle_write_error " << cpp_strerror(r) << dendl; if (on_write_error) { on_write_error->complete(r); on_write_error = NULL; called_write_error = true; } else if (called_write_error) { /* We don't call error handler more than once, subsequent errors * are dropped -- this is okay as long as the error handler does * something dramatic like respawn */ lderr(cct) << __func__ << ": multiple write errors, handler already called" << dendl; } else { assert(0 == "unhandled write error"); } } /** * Test whether the 'read_buf' byte stream has enough data to read * an entry * * sets 'next_envelope_size' to the number of bytes needed to advance (enough * to get the next header if header was unavailable, or enough to get the whole * next entry if the header was available but the body wasn't). */ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const { assert(need != NULL); uint32_t entry_size = 0; uint64_t entry_sentinel = 0; bufferlist::iterator p = read_buf.begin(); // Do we have enough data to decode an entry prefix? if (format >= JOURNAL_FORMAT_RESILIENT) { *need = sizeof(entry_size) + sizeof(entry_sentinel); } else { *need = sizeof(entry_size); } if (read_buf.length() >= *need) { if (format >= JOURNAL_FORMAT_RESILIENT) { ::decode(entry_sentinel, p); if (entry_sentinel != sentinel) { throw buffer::malformed_input("Invalid sentinel"); } } ::decode(entry_size, p); } else { return false; } // Do we have enough data to decode an entry prefix, payload and suffix? if (format >= JOURNAL_FORMAT_RESILIENT) { *need = JOURNAL_ENVELOPE_RESILIENT + entry_size; } else { *need = JOURNAL_ENVELOPE_LEGACY + entry_size; } if (read_buf.length() >= *need) { return true; // No more bytes needed } return false; } /** * Consume one entry from a journal byte stream 'from', splicing a * serialized LogEvent blob into 'entry'. * * 'entry' must be non null and point to an empty bufferlist. * * 'from' must contain sufficient valid data (i.e. readable is true). * * 'start_ptr' will be set to the entry's start pointer, if the collection * format provides it. It may not be null. * * @returns The number of bytes consumed from the `from` byte stream. Note * that this is not equal to the length of `entry`, which contains * the inner serialized LogEvent and not the envelope. 
size_t JournalStream::read(bufferlist &from, bufferlist *entry,
                           uint64_t *start_ptr)
{
  assert(start_ptr != NULL);
  assert(entry != NULL);
  assert(entry->length() == 0);

  uint32_t entry_size = 0;

  // Consume envelope prefix: entry_size and entry_sentinel
  bufferlist::iterator from_ptr = from.begin();
  if (format >= JOURNAL_FORMAT_RESILIENT) {
    uint64_t entry_sentinel = 0;
    ::decode(entry_sentinel, from_ptr);
    // Assertion instead of clean check because the precondition of this
    // fn is that readable() already passed
    assert(entry_sentinel == sentinel);
  }
  ::decode(entry_size, from_ptr);

  // Read out the payload
  from_ptr.copy(entry_size, *entry);

  // Consume the envelope suffix (start_ptr)
  if (format >= JOURNAL_FORMAT_RESILIENT) {
    ::decode(*start_ptr, from_ptr);
  } else {
    *start_ptr = 0;
  }

  // Trim the input buffer to discard the bytes we have consumed
  from.splice(0, from_ptr.get_off());

  return from_ptr.get_off();
}

/**
 * Append one entry
 */
size_t JournalStream::write(bufferlist &entry, bufferlist *to,
                            uint64_t const &start_ptr)
{
  assert(to != NULL);

  uint32_t const entry_size = entry.length();
  if (format >= JOURNAL_FORMAT_RESILIENT) {
    ::encode(sentinel, *to);
  }
  ::encode(entry_size, *to);
  to->claim_append(entry);
  if (format >= JOURNAL_FORMAT_RESILIENT) {
    ::encode(start_ptr, *to);
  }

  if (format >= JOURNAL_FORMAT_RESILIENT) {
    return JOURNAL_ENVELOPE_RESILIENT + entry_size;
  } else {
    return JOURNAL_ENVELOPE_LEGACY + entry_size;
  }
}

/**
 * set write error callback
 *
 * Set a callback/context to trigger if we get a write error from
 * the objecter.  This may be from an explicit request (e.g., flush)
 * or something async the journaler did on its own (e.g., journal
 * header update).
 *
 * It is only used once; if the caller continues to use the
 * Journaler and wants to hear about errors, it needs to reset the
 * error_handler.
 *
 * @param c callback/context to trigger on error
 */
void Journaler::set_write_error_handler(Context *c) {
  lock_guard l(lock);
  assert(!on_write_error);
  on_write_error = wrap_finisher(c);
  called_write_error = false;
}


/**
 * Wrap a context in a C_OnFinisher, if it is non-NULL
 *
 * Utility function to avoid lots of error-prone and verbose
 * NULL checking on contexts passed in.
 */
C_OnFinisher *Journaler::wrap_finisher(Context *c)
{
  if (c != NULL) {
    return new C_OnFinisher(c, finisher);
  } else {
    return NULL;
  }
}

void Journaler::shutdown()
{
  lock_guard l(lock);

  ldout(cct, 1) << __func__ << dendl;

  readable = false;
  stopping = true;

  // Kick out anyone reading from journal
  error = -EAGAIN;
  if (on_readable) {
    C_OnFinisher *f = on_readable;
    on_readable = 0;
    f->complete(-EAGAIN);
  }

  finish_contexts(cct, waitfor_recover, -ESHUTDOWN);

  std::map<uint64_t, std::list<Context*> >::iterator i;
  for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {
    finish_contexts(cct, i->second, -EAGAIN);
  }
  waitfor_safe.clear();
}