X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmon%2FPaxos.cc;fp=src%2Fceph%2Fsrc%2Fmon%2FPaxos.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=e92438769f088a910be5e7c6097e8a19e31caad3;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/mon/Paxos.cc b/src/ceph/src/mon/Paxos.cc deleted file mode 100644 index e924387..0000000 --- a/src/ceph/src/mon/Paxos.cc +++ /dev/null @@ -1,1590 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include -#include "Paxos.h" -#include "Monitor.h" -#include "messages/MMonPaxos.h" - -#include "mon/mon_types.h" -#include "common/config.h" -#include "include/assert.h" -#include "include/stringify.h" -#include "common/Timer.h" -#include "messages/PaxosServiceMessage.h" - -#define dout_subsys ceph_subsys_paxos -#undef dout_prefix -#define dout_prefix _prefix(_dout, mon, mon->name, mon->rank, paxos_name, state, first_committed, last_committed) -static ostream& _prefix(std::ostream *_dout, Monitor *mon, const string& name, - int rank, const string& paxos_name, int state, - version_t first_committed, version_t last_committed) -{ - return *_dout << "mon." << name << "@" << rank - << "(" << mon->get_state_name() << ")" - << ".paxos(" << paxos_name << " " << Paxos::get_statename(state) - << " c " << first_committed << ".." << last_committed - << ") "; -} - -class Paxos::C_Trimmed : public Context { - Paxos *paxos; -public: - explicit C_Trimmed(Paxos *p) : paxos(p) { } - void finish(int r) override { - paxos->trimming = false; - } -}; - -MonitorDBStore *Paxos::get_store() -{ - return mon->store; -} - -void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx, - version_t first, version_t last) -{ - dout(10) << __func__ << " first " << first << " last " << last << dendl; - for (version_t v = first; v <= last; ++v) { - dout(30) << __func__ << " apply version " << v << dendl; - bufferlist bl; - int err = get_store()->get(get_name(), v, bl); - assert(err == 0); - assert(bl.length()); - decode_append_transaction(tx, bl); - } - dout(15) << __func__ << " total versions " << (last-first) << dendl; -} - -void Paxos::init() -{ - // load paxos variables from stable storage - last_pn = get_store()->get(get_name(), "last_pn"); - accepted_pn = get_store()->get(get_name(), "accepted_pn"); - last_committed = get_store()->get(get_name(), "last_committed"); - first_committed = get_store()->get(get_name(), "first_committed"); - - dout(10) << __func__ << " last_pn: " << last_pn << " accepted_pn: " - << accepted_pn << " last_committed: " << last_committed - << " first_committed: " << first_committed << dendl; - - dout(10) << "init" << dendl; - assert(is_consistent()); -} - -void Paxos::init_logger() -{ - PerfCountersBuilder pcb(g_ceph_context, "paxos", l_paxos_first, l_paxos_last); - - // Because monitors are so few in number, the resource cost of capturing - // almost all their perf counters at USEFUL is trivial. - pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL); - - pcb.add_u64_counter(l_paxos_start_leader, "start_leader", "Starts in leader role"); - pcb.add_u64_counter(l_paxos_start_peon, "start_peon", "Starts in peon role"); - pcb.add_u64_counter(l_paxos_restart, "restart", "Restarts"); - pcb.add_u64_counter(l_paxos_refresh, "refresh", "Refreshes"); - pcb.add_time_avg(l_paxos_refresh_latency, "refresh_latency", "Refresh latency"); - pcb.add_u64_counter(l_paxos_begin, "begin", "Started and handled begins"); - pcb.add_u64_avg(l_paxos_begin_keys, "begin_keys", "Keys in transaction on begin"); - pcb.add_u64_avg(l_paxos_begin_bytes, "begin_bytes", "Data in transaction on begin"); - pcb.add_time_avg(l_paxos_begin_latency, "begin_latency", "Latency of begin operation"); - pcb.add_u64_counter(l_paxos_commit, "commit", - "Commits", "cmt"); - pcb.add_u64_avg(l_paxos_commit_keys, "commit_keys", "Keys in transaction on commit"); - pcb.add_u64_avg(l_paxos_commit_bytes, "commit_bytes", "Data in transaction on commit"); - pcb.add_time_avg(l_paxos_commit_latency, "commit_latency", - "Commit latency", "clat"); - pcb.add_u64_counter(l_paxos_collect, "collect", "Peon collects"); - pcb.add_u64_avg(l_paxos_collect_keys, "collect_keys", "Keys in transaction on peon collect"); - pcb.add_u64_avg(l_paxos_collect_bytes, "collect_bytes", "Data in transaction on peon collect"); - pcb.add_time_avg(l_paxos_collect_latency, "collect_latency", "Peon collect latency"); - pcb.add_u64_counter(l_paxos_collect_uncommitted, "collect_uncommitted", "Uncommitted values in started and handled collects"); - pcb.add_u64_counter(l_paxos_collect_timeout, "collect_timeout", "Collect timeouts"); - pcb.add_u64_counter(l_paxos_accept_timeout, "accept_timeout", "Accept timeouts"); - pcb.add_u64_counter(l_paxos_lease_ack_timeout, "lease_ack_timeout", "Lease acknowledgement timeouts"); - pcb.add_u64_counter(l_paxos_lease_timeout, "lease_timeout", "Lease timeouts"); - pcb.add_u64_counter(l_paxos_store_state, "store_state", "Store a shared state on disk"); - pcb.add_u64_avg(l_paxos_store_state_keys, "store_state_keys", "Keys in transaction in stored state"); - pcb.add_u64_avg(l_paxos_store_state_bytes, "store_state_bytes", "Data in transaction in stored state"); - pcb.add_time_avg(l_paxos_store_state_latency, "store_state_latency", "Storing state latency"); - pcb.add_u64_counter(l_paxos_share_state, "share_state", "Sharings of state"); - pcb.add_u64_avg(l_paxos_share_state_keys, "share_state_keys", "Keys in shared state"); - pcb.add_u64_avg(l_paxos_share_state_bytes, "share_state_bytes", "Data in shared state"); - pcb.add_u64_counter(l_paxos_new_pn, "new_pn", "New proposal number queries"); - pcb.add_time_avg(l_paxos_new_pn_latency, "new_pn_latency", "New proposal number getting latency"); - logger = pcb.create_perf_counters(); - g_ceph_context->get_perfcounters_collection()->add(logger); -} - -void Paxos::dump_info(Formatter *f) -{ - f->open_object_section("paxos"); - f->dump_unsigned("first_committed", first_committed); - f->dump_unsigned("last_committed", last_committed); - f->dump_unsigned("last_pn", last_pn); - f->dump_unsigned("accepted_pn", accepted_pn); - f->close_section(); -} - -// --------------------------------- - -// PHASE 1 - -// leader -void Paxos::collect(version_t oldpn) -{ - // we're recoverying, it seems! - state = STATE_RECOVERING; - assert(mon->is_leader()); - - // reset the number of lasts received - uncommitted_v = 0; - uncommitted_pn = 0; - uncommitted_value.clear(); - peer_first_committed.clear(); - peer_last_committed.clear(); - - // look for uncommitted value - if (get_store()->exists(get_name(), last_committed+1)) { - version_t v = get_store()->get(get_name(), "pending_v"); - version_t pn = get_store()->get(get_name(), "pending_pn"); - if (v && pn && v == last_committed + 1) { - uncommitted_pn = pn; - } else { - dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn - << " and crossing our fingers" << dendl; - uncommitted_pn = accepted_pn; - } - uncommitted_v = last_committed+1; - - get_store()->get(get_name(), last_committed+1, uncommitted_value); - assert(uncommitted_value.length()); - dout(10) << "learned uncommitted " << (last_committed+1) - << " pn " << uncommitted_pn - << " (" << uncommitted_value.length() << " bytes) from myself" - << dendl; - - logger->inc(l_paxos_collect_uncommitted); - } - - // pick new pn - accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn)); - accepted_pn_from = last_committed; - num_last = 1; - dout(10) << "collect with pn " << accepted_pn << dendl; - - // send collect - for (set::const_iterator p = mon->get_quorum().begin(); - p != mon->get_quorum().end(); - ++p) { - if (*p == mon->rank) continue; - - MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, - ceph_clock_now()); - collect->last_committed = last_committed; - collect->first_committed = first_committed; - collect->pn = accepted_pn; - mon->messenger->send_message(collect, mon->monmap->get_inst(*p)); - } - - // set timeout event - collect_timeout_event = mon->timer.add_event_after( - g_conf->mon_accept_timeout_factor * - g_conf->mon_lease, - new C_MonContext(mon, [this](int r) { - if (r == -ECANCELED) - return; - collect_timeout(); - })); -} - - -// peon -void Paxos::handle_collect(MonOpRequestRef op) -{ - - op->mark_paxos_event("handle_collect"); - - MMonPaxos *collect = static_cast(op->get_req()); - dout(10) << "handle_collect " << *collect << dendl; - - assert(mon->is_peon()); // mon epoch filter should catch strays - - // we're recoverying, it seems! - state = STATE_RECOVERING; - - //update the peon recovery timeout - reset_lease_timeout(); - - if (collect->first_committed > last_committed+1) { - dout(2) << __func__ - << " leader's lowest version is too high for our last committed" - << " (theirs: " << collect->first_committed - << "; ours: " << last_committed << ") -- bootstrap!" << dendl; - op->mark_paxos_event("need to bootstrap"); - mon->bootstrap(); - return; - } - - // reply - MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, - ceph_clock_now()); - last->last_committed = last_committed; - last->first_committed = first_committed; - - version_t previous_pn = accepted_pn; - - // can we accept this pn? - if (collect->pn > accepted_pn) { - // ok, accept it - accepted_pn = collect->pn; - accepted_pn_from = collect->pn_from; - dout(10) << "accepting pn " << accepted_pn << " from " - << accepted_pn_from << dendl; - - auto t(std::make_shared()); - t->put(get_name(), "accepted_pn", accepted_pn); - - dout(30) << __func__ << " transaction dump:\n"; - JSONFormatter f(true); - t->dump(&f); - f.flush(*_dout); - *_dout << dendl; - - logger->inc(l_paxos_collect); - logger->inc(l_paxos_collect_keys, t->get_keys()); - logger->inc(l_paxos_collect_bytes, t->get_bytes()); - utime_t start = ceph_clock_now(); - - get_store()->apply_transaction(t); - - utime_t end = ceph_clock_now(); - logger->tinc(l_paxos_collect_latency, end - start); - } else { - // don't accept! - dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from - << ", we already accepted " << accepted_pn - << " from " << accepted_pn_from << dendl; - } - last->pn = accepted_pn; - last->pn_from = accepted_pn_from; - - // share whatever committed values we have - if (collect->last_committed < last_committed) - share_state(last, collect->first_committed, collect->last_committed); - - // do we have an accepted but uncommitted value? - // (it'll be at last_committed+1) - bufferlist bl; - if (collect->last_committed <= last_committed && - get_store()->exists(get_name(), last_committed+1)) { - get_store()->get(get_name(), last_committed+1, bl); - assert(bl.length() > 0); - dout(10) << " sharing our accepted but uncommitted value for " - << last_committed+1 << " (" << bl.length() << " bytes)" << dendl; - last->values[last_committed+1] = bl; - - version_t v = get_store()->get(get_name(), "pending_v"); - version_t pn = get_store()->get(get_name(), "pending_pn"); - if (v && pn && v == last_committed + 1) { - last->uncommitted_pn = pn; - } else { - // previously we didn't record which pn a value was accepted - // under! use the pn value we just had... :( - dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn - << " and crossing our fingers" << dendl; - last->uncommitted_pn = previous_pn; - } - - logger->inc(l_paxos_collect_uncommitted); - } - - // send reply - collect->get_connection()->send_message(last); -} - -/** - * @note This is Okay. We share our versions between peer_last_committed and - * our last_committed (inclusive), and add their bufferlists to the - * message. It will be the peer's job to apply them to its store, as - * these bufferlists will contain raw transactions. - * This function is called by both the Peon and the Leader. The Peon will - * share the state with the Leader during handle_collect(), sharing any - * values the leader may be missing (i.e., the leader's last_committed is - * lower than the peon's last_committed). The Leader will share the state - * with the Peon during handle_last(), if the peon's last_committed is - * lower than the leader's last_committed. - */ -void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed, - version_t peer_last_committed) -{ - assert(peer_last_committed < last_committed); - - dout(10) << "share_state peer has fc " << peer_first_committed - << " lc " << peer_last_committed << dendl; - version_t v = peer_last_committed + 1; - - // include incrementals - uint64_t bytes = 0; - for ( ; v <= last_committed; v++) { - if (get_store()->exists(get_name(), v)) { - get_store()->get(get_name(), v, m->values[v]); - assert(m->values[v].length()); - dout(10) << " sharing " << v << " (" - << m->values[v].length() << " bytes)" << dendl; - bytes += m->values[v].length() + 16; // paxos_ + 10 digits = 16 - } - } - logger->inc(l_paxos_share_state); - logger->inc(l_paxos_share_state_keys, m->values.size()); - logger->inc(l_paxos_share_state_bytes, bytes); - - m->last_committed = last_committed; -} - -/** - * Store on disk a state that was shared with us - * - * Basically, we received a set of version. Or just one. It doesn't matter. - * What matters is that we have to stash it in the store. So, we will simply - * write every single bufferlist into their own versions on our side (i.e., - * onto paxos-related keys), and then we will decode those same bufferlists - * we just wrote and apply the transactions they hold. We will also update - * our first and last committed values to point to the new values, if need - * be. All all this is done tightly wrapped in a transaction to ensure we - * enjoy the atomicity guarantees given by our awesome k/v store. - */ -bool Paxos::store_state(MMonPaxos *m) -{ - auto t(std::make_shared()); - map::iterator start = m->values.begin(); - bool changed = false; - - // build map of values to store - // we want to write the range [last_committed, m->last_committed] only. - if (start != m->values.end() && - start->first > last_committed + 1) { - // ignore everything if values start in the future. - dout(10) << "store_state ignoring all values, they start at " << start->first - << " > last_committed+1" << dendl; - return false; - } - - // push forward the start position on the message's values iterator, up until - // we run out of positions or we find a position matching 'last_committed'. - while (start != m->values.end() && start->first <= last_committed) { - ++start; - } - - // make sure we get the right interval of values to apply by pushing forward - // the 'end' iterator until it matches the message's 'last_committed'. - map::iterator end = start; - while (end != m->values.end() && end->first <= m->last_committed) { - last_committed = end->first; - ++end; - } - - if (start == end) { - dout(10) << "store_state nothing to commit" << dendl; - } else { - dout(10) << "store_state [" << start->first << ".." - << last_committed << "]" << dendl; - t->put(get_name(), "last_committed", last_committed); - - // we should apply the state here -- decode every single bufferlist in the - // map and append the transactions to 't'. - map::iterator it; - for (it = start; it != end; ++it) { - // write the bufferlist as the version's value - t->put(get_name(), it->first, it->second); - // decode the bufferlist and append it to the transaction we will shortly - // apply. - decode_append_transaction(t, it->second); - } - - // discard obsolete uncommitted value? - if (uncommitted_v && uncommitted_v <= last_committed) { - dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v - << " pn " << uncommitted_pn << dendl; - uncommitted_v = 0; - uncommitted_pn = 0; - uncommitted_value.clear(); - } - } - if (!t->empty()) { - dout(30) << __func__ << " transaction dump:\n"; - JSONFormatter f(true); - t->dump(&f); - f.flush(*_dout); - *_dout << dendl; - - logger->inc(l_paxos_store_state); - logger->inc(l_paxos_store_state_bytes, t->get_bytes()); - logger->inc(l_paxos_store_state_keys, t->get_keys()); - utime_t start = ceph_clock_now(); - - get_store()->apply_transaction(t); - - utime_t end = ceph_clock_now(); - logger->tinc(l_paxos_store_state_latency, end - start); - - // refresh first_committed; this txn may have trimmed. - first_committed = get_store()->get(get_name(), "first_committed"); - - _sanity_check_store(); - changed = true; - } - - return changed; -} - -void Paxos::_sanity_check_store() -{ - version_t lc = get_store()->get(get_name(), "last_committed"); - assert(lc == last_committed); -} - - -// leader -void Paxos::handle_last(MonOpRequestRef op) -{ - op->mark_paxos_event("handle_last"); - MMonPaxos *last = static_cast(op->get_req()); - bool need_refresh = false; - int from = last->get_source().num(); - - dout(10) << "handle_last " << *last << dendl; - - if (!mon->is_leader()) { - dout(10) << "not leader, dropping" << dendl; - return; - } - - // note peer's first_ and last_committed, in case we learn a new - // commit and need to push it to them. - peer_first_committed[from] = last->first_committed; - peer_last_committed[from] = last->last_committed; - - if (last->first_committed > last_committed + 1) { - dout(5) << __func__ - << " mon." << from - << " lowest version is too high for our last committed" - << " (theirs: " << last->first_committed - << "; ours: " << last_committed << ") -- bootstrap!" << dendl; - op->mark_paxos_event("need to bootstrap"); - mon->bootstrap(); - return; - } - - assert(g_conf->paxos_kill_at != 1); - - // store any committed values if any are specified in the message - need_refresh = store_state(last); - - assert(g_conf->paxos_kill_at != 2); - - // is everyone contiguous and up to date? - for (map::iterator p = peer_last_committed.begin(); - p != peer_last_committed.end(); - ++p) { - if (p->second + 1 < first_committed && first_committed > 1) { - dout(5) << __func__ - << " peon " << p->first - << " last_committed (" << p->second - << ") is too low for our first_committed (" << first_committed - << ") -- bootstrap!" << dendl; - op->mark_paxos_event("need to bootstrap"); - mon->bootstrap(); - return; - } - if (p->second < last_committed) { - // share committed values - dout(10) << " sending commit to mon." << p->first << dendl; - MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), - MMonPaxos::OP_COMMIT, - ceph_clock_now()); - share_state(commit, peer_first_committed[p->first], p->second); - mon->messenger->send_message(commit, mon->monmap->get_inst(p->first)); - } - } - - // do they accept your pn? - if (last->pn > accepted_pn) { - // no, try again. - dout(10) << " they had a higher pn than us, picking a new one." << dendl; - - // cancel timeout event - mon->timer.cancel_event(collect_timeout_event); - collect_timeout_event = 0; - - collect(last->pn); - } else if (last->pn == accepted_pn) { - // yes, they accepted our pn. great. - num_last++; - dout(10) << " they accepted our pn, we now have " - << num_last << " peons" << dendl; - - // did this person send back an accepted but uncommitted value? - if (last->uncommitted_pn) { - if (last->uncommitted_pn >= uncommitted_pn && - last->last_committed >= last_committed && - last->last_committed + 1 >= uncommitted_v) { - uncommitted_v = last->last_committed+1; - uncommitted_pn = last->uncommitted_pn; - uncommitted_value = last->values[uncommitted_v]; - dout(10) << "we learned an uncommitted value for " << uncommitted_v - << " pn " << uncommitted_pn - << " " << uncommitted_value.length() << " bytes" - << dendl; - } else { - dout(10) << "ignoring uncommitted value for " << (last->last_committed+1) - << " pn " << last->uncommitted_pn - << " " << last->values[last->last_committed+1].length() << " bytes" - << dendl; - } - } - - // is that everyone? - if (num_last == mon->get_quorum().size()) { - // cancel timeout event - mon->timer.cancel_event(collect_timeout_event); - collect_timeout_event = 0; - peer_first_committed.clear(); - peer_last_committed.clear(); - - // almost... - - // did we learn an old value? - if (uncommitted_v == last_committed+1 && - uncommitted_value.length()) { - dout(10) << "that's everyone. begin on old learned value" << dendl; - state = STATE_UPDATING_PREVIOUS; - begin(uncommitted_value); - } else { - // active! - dout(10) << "that's everyone. active!" << dendl; - extend_lease(); - - need_refresh = false; - if (do_refresh()) { - finish_round(); - } - } - } - } else { - // no, this is an old message, discard - dout(10) << "old pn, ignoring" << dendl; - } - - if (need_refresh) - (void)do_refresh(); -} - -void Paxos::collect_timeout() -{ - dout(1) << "collect timeout, calling fresh election" << dendl; - collect_timeout_event = 0; - logger->inc(l_paxos_collect_timeout); - assert(mon->is_leader()); - mon->bootstrap(); -} - - -// leader -void Paxos::begin(bufferlist& v) -{ - dout(10) << "begin for " << last_committed+1 << " " - << v.length() << " bytes" - << dendl; - - assert(mon->is_leader()); - assert(is_updating() || is_updating_previous()); - - // we must already have a majority for this to work. - assert(mon->get_quorum().size() == 1 || - num_last > (unsigned)mon->monmap->size()/2); - - // and no value, yet. - assert(new_value.length() == 0); - - // accept it ourselves - accepted.clear(); - accepted.insert(mon->rank); - new_value = v; - - if (last_committed == 0) { - auto t(std::make_shared()); - // initial base case; set first_committed too - t->put(get_name(), "first_committed", 1); - decode_append_transaction(t, new_value); - - bufferlist tx_bl; - t->encode(tx_bl); - - new_value = tx_bl; - } - - // store the proposed value in the store. IF it is accepted, we will then - // have to decode it into a transaction and apply it. - auto t(std::make_shared()); - t->put(get_name(), last_committed+1, new_value); - - // note which pn this pending value is for. - t->put(get_name(), "pending_v", last_committed + 1); - t->put(get_name(), "pending_pn", accepted_pn); - - dout(30) << __func__ << " transaction dump:\n"; - JSONFormatter f(true); - t->dump(&f); - f.flush(*_dout); - auto debug_tx(std::make_shared()); - bufferlist::iterator new_value_it = new_value.begin(); - debug_tx->decode(new_value_it); - debug_tx->dump(&f); - *_dout << "\nbl dump:\n"; - f.flush(*_dout); - *_dout << dendl; - - logger->inc(l_paxos_begin); - logger->inc(l_paxos_begin_keys, t->get_keys()); - logger->inc(l_paxos_begin_bytes, t->get_bytes()); - utime_t start = ceph_clock_now(); - - get_store()->apply_transaction(t); - - utime_t end = ceph_clock_now(); - logger->tinc(l_paxos_begin_latency, end - start); - - assert(g_conf->paxos_kill_at != 3); - - if (mon->get_quorum().size() == 1) { - // we're alone, take it easy - commit_start(); - return; - } - - // ask others to accept it too! - for (set::const_iterator p = mon->get_quorum().begin(); - p != mon->get_quorum().end(); - ++p) { - if (*p == mon->rank) continue; - - dout(10) << " sending begin to mon." << *p << dendl; - MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, - ceph_clock_now()); - begin->values[last_committed+1] = new_value; - begin->last_committed = last_committed; - begin->pn = accepted_pn; - - mon->messenger->send_message(begin, mon->monmap->get_inst(*p)); - } - - // set timeout event - accept_timeout_event = mon->timer.add_event_after( - g_conf->mon_accept_timeout_factor * g_conf->mon_lease, - new C_MonContext(mon, [this](int r) { - if (r == -ECANCELED) - return; - accept_timeout(); - })); -} - -// peon -void Paxos::handle_begin(MonOpRequestRef op) -{ - op->mark_paxos_event("handle_begin"); - MMonPaxos *begin = static_cast(op->get_req()); - dout(10) << "handle_begin " << *begin << dendl; - - // can we accept this? - if (begin->pn < accepted_pn) { - dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl; - op->mark_paxos_event("have higher pn, ignore"); - return; - } - assert(begin->pn == accepted_pn); - assert(begin->last_committed == last_committed); - - assert(g_conf->paxos_kill_at != 4); - - logger->inc(l_paxos_begin); - - // set state. - state = STATE_UPDATING; - lease_expire = utime_t(); // cancel lease - - // yes. - version_t v = last_committed+1; - dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl; - // store the accepted value onto our store. We will have to decode it and - // apply its transaction once we receive permission to commit. - auto t(std::make_shared()); - t->put(get_name(), v, begin->values[v]); - - // note which pn this pending value is for. - t->put(get_name(), "pending_v", v); - t->put(get_name(), "pending_pn", accepted_pn); - - dout(30) << __func__ << " transaction dump:\n"; - JSONFormatter f(true); - t->dump(&f); - f.flush(*_dout); - *_dout << dendl; - - logger->inc(l_paxos_begin_bytes, t->get_bytes()); - utime_t start = ceph_clock_now(); - - get_store()->apply_transaction(t); - - utime_t end = ceph_clock_now(); - logger->tinc(l_paxos_begin_latency, end - start); - - assert(g_conf->paxos_kill_at != 5); - - // reply - MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, - ceph_clock_now()); - accept->pn = accepted_pn; - accept->last_committed = last_committed; - begin->get_connection()->send_message(accept); -} - -// leader -void Paxos::handle_accept(MonOpRequestRef op) -{ - op->mark_paxos_event("handle_accept"); - MMonPaxos *accept = static_cast(op->get_req()); - dout(10) << "handle_accept " << *accept << dendl; - int from = accept->get_source().num(); - - if (accept->pn != accepted_pn) { - // we accepted a higher pn, from some other leader - dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl; - op->mark_paxos_event("have higher pn, ignore"); - return; - } - if (last_committed > 0 && - accept->last_committed < last_committed-1) { - dout(10) << " this is from an old round, ignoring" << dendl; - op->mark_paxos_event("old round, ignore"); - return; - } - assert(accept->last_committed == last_committed || // not committed - accept->last_committed == last_committed-1); // committed - - assert(is_updating() || is_updating_previous()); - assert(accepted.count(from) == 0); - accepted.insert(from); - dout(10) << " now " << accepted << " have accepted" << dendl; - - assert(g_conf->paxos_kill_at != 6); - - // only commit (and expose committed state) when we get *all* quorum - // members to accept. otherwise, they may still be sharing the now - // stale state. - // FIXME: we can improve this with an additional lease revocation message - // that doesn't block for the persist. - if (accepted == mon->get_quorum()) { - // yay, commit! - dout(10) << " got majority, committing, done with update" << dendl; - op->mark_paxos_event("commit_start"); - commit_start(); - } -} - -void Paxos::accept_timeout() -{ - dout(1) << "accept timeout, calling fresh election" << dendl; - accept_timeout_event = 0; - assert(mon->is_leader()); - assert(is_updating() || is_updating_previous() || is_writing() || - is_writing_previous()); - logger->inc(l_paxos_accept_timeout); - mon->bootstrap(); -} - -struct C_Committed : public Context { - Paxos *paxos; - explicit C_Committed(Paxos *p) : paxos(p) {} - void finish(int r) override { - assert(r >= 0); - Mutex::Locker l(paxos->mon->lock); - if (paxos->is_shutdown()) { - paxos->abort_commit(); - return; - } - paxos->commit_finish(); - } -}; - -void Paxos::abort_commit() -{ - assert(commits_started > 0); - --commits_started; - if (commits_started == 0) - shutdown_cond.Signal(); -} - -void Paxos::commit_start() -{ - dout(10) << __func__ << " " << (last_committed+1) << dendl; - - assert(g_conf->paxos_kill_at != 7); - - auto t(std::make_shared()); - - // commit locally - t->put(get_name(), "last_committed", last_committed + 1); - - // decode the value and apply its transaction to the store. - // this value can now be read from last_committed. - decode_append_transaction(t, new_value); - - dout(30) << __func__ << " transaction dump:\n"; - JSONFormatter f(true); - t->dump(&f); - f.flush(*_dout); - *_dout << dendl; - - logger->inc(l_paxos_commit); - logger->inc(l_paxos_commit_keys, t->get_keys()); - logger->inc(l_paxos_commit_bytes, t->get_bytes()); - commit_start_stamp = ceph_clock_now(); - - get_store()->queue_transaction(t, new C_Committed(this)); - - if (is_updating_previous()) - state = STATE_WRITING_PREVIOUS; - else if (is_updating()) - state = STATE_WRITING; - else - ceph_abort(); - ++commits_started; - - if (mon->get_quorum().size() > 1) { - // cancel timeout event - mon->timer.cancel_event(accept_timeout_event); - accept_timeout_event = 0; - } -} - -void Paxos::commit_finish() -{ - dout(20) << __func__ << " " << (last_committed+1) << dendl; - utime_t end = ceph_clock_now(); - logger->tinc(l_paxos_commit_latency, end - commit_start_stamp); - - assert(g_conf->paxos_kill_at != 8); - - // cancel lease - it was for the old value. - // (this would only happen if message layer lost the 'begin', but - // leader still got a majority and committed with out us.) - lease_expire = utime_t(); // cancel lease - - last_committed++; - last_commit_time = ceph_clock_now(); - - // refresh first_committed; this txn may have trimmed. - first_committed = get_store()->get(get_name(), "first_committed"); - - _sanity_check_store(); - - // tell everyone - for (set::const_iterator p = mon->get_quorum().begin(); - p != mon->get_quorum().end(); - ++p) { - if (*p == mon->rank) continue; - - dout(10) << " sending commit to mon." << *p << dendl; - MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, - ceph_clock_now()); - commit->values[last_committed] = new_value; - commit->pn = accepted_pn; - commit->last_committed = last_committed; - - mon->messenger->send_message(commit, mon->monmap->get_inst(*p)); - } - - assert(g_conf->paxos_kill_at != 9); - - // get ready for a new round. - new_value.clear(); - - // WRITING -> REFRESH - // among other things, this lets do_refresh() -> mon->bootstrap() know - // it doesn't need to flush the store queue - assert(is_writing() || is_writing_previous()); - state = STATE_REFRESH; - assert(commits_started > 0); - --commits_started; - - if (do_refresh()) { - commit_proposal(); - if (mon->get_quorum().size() > 1) { - extend_lease(); - } - - finish_contexts(g_ceph_context, waiting_for_commit); - - assert(g_conf->paxos_kill_at != 10); - - finish_round(); - } -} - - -void Paxos::handle_commit(MonOpRequestRef op) -{ - op->mark_paxos_event("handle_commit"); - MMonPaxos *commit = static_cast(op->get_req()); - dout(10) << "handle_commit on " << commit->last_committed << dendl; - - logger->inc(l_paxos_commit); - - if (!mon->is_peon()) { - dout(10) << "not a peon, dropping" << dendl; - ceph_abort(); - return; - } - - op->mark_paxos_event("store_state"); - store_state(commit); - - if (do_refresh()) { - finish_contexts(g_ceph_context, waiting_for_commit); - } -} - -void Paxos::extend_lease() -{ - assert(mon->is_leader()); - //assert(is_active()); - - lease_expire = ceph_clock_now(); - lease_expire += g_conf->mon_lease; - acked_lease.clear(); - acked_lease.insert(mon->rank); - - dout(7) << "extend_lease now+" << g_conf->mon_lease - << " (" << lease_expire << ")" << dendl; - - // bcast - for (set::const_iterator p = mon->get_quorum().begin(); - p != mon->get_quorum().end(); ++p) { - - if (*p == mon->rank) continue; - MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, - ceph_clock_now()); - lease->last_committed = last_committed; - lease->lease_timestamp = lease_expire; - lease->first_committed = first_committed; - mon->messenger->send_message(lease, mon->monmap->get_inst(*p)); - } - - // set timeout event. - // if old timeout is still in place, leave it. - if (!lease_ack_timeout_event) { - lease_ack_timeout_event = mon->timer.add_event_after( - g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease, - new C_MonContext(mon, [this](int r) { - if (r == -ECANCELED) - return; - lease_ack_timeout(); - })); - } - - // set renew event - utime_t at = lease_expire; - at -= g_conf->mon_lease; - at += g_conf->mon_lease_renew_interval_factor * g_conf->mon_lease; - lease_renew_event = mon->timer.add_event_at( - at, new C_MonContext(mon, [this](int r) { - if (r == -ECANCELED) - return; - lease_renew_timeout(); - })); -} - -void Paxos::warn_on_future_time(utime_t t, entity_name_t from) -{ - utime_t now = ceph_clock_now(); - if (t > now) { - utime_t diff = t - now; - if (diff > g_conf->mon_clock_drift_allowed) { - utime_t warn_diff = now - last_clock_drift_warn; - if (warn_diff > - pow(g_conf->mon_clock_drift_warn_backoff, clock_drift_warned)) { - mon->clog->warn() << "message from " << from << " was stamped " << diff - << "s in the future, clocks not synchronized"; - last_clock_drift_warn = ceph_clock_now(); - ++clock_drift_warned; - } - } - } - -} - -bool Paxos::do_refresh() -{ - bool need_bootstrap = false; - - utime_t start = ceph_clock_now(); - - // make sure we have the latest state loaded up - mon->refresh_from_paxos(&need_bootstrap); - - utime_t end = ceph_clock_now(); - logger->inc(l_paxos_refresh); - logger->tinc(l_paxos_refresh_latency, end - start); - - if (need_bootstrap) { - dout(10) << " doing requested bootstrap" << dendl; - mon->bootstrap(); - return false; - } - - return true; -} - -void Paxos::commit_proposal() -{ - dout(10) << __func__ << dendl; - assert(mon->is_leader()); - assert(is_refresh()); - - finish_contexts(g_ceph_context, committing_finishers); -} - -void Paxos::finish_round() -{ - dout(10) << __func__ << dendl; - assert(mon->is_leader()); - - // ok, now go active! - state = STATE_ACTIVE; - - dout(20) << __func__ << " waiting_for_acting" << dendl; - finish_contexts(g_ceph_context, waiting_for_active); - dout(20) << __func__ << " waiting_for_readable" << dendl; - finish_contexts(g_ceph_context, waiting_for_readable); - dout(20) << __func__ << " waiting_for_writeable" << dendl; - finish_contexts(g_ceph_context, waiting_for_writeable); - - dout(10) << __func__ << " done w/ waiters, state " << get_statename(state) << dendl; - - if (should_trim()) { - trim(); - } - - if (is_active() && pending_proposal) { - propose_pending(); - } -} - - -// peon -void Paxos::handle_lease(MonOpRequestRef op) -{ - op->mark_paxos_event("handle_lease"); - MMonPaxos *lease = static_cast(op->get_req()); - // sanity - if (!mon->is_peon() || - last_committed != lease->last_committed) { - dout(10) << "handle_lease i'm not a peon, or they're not the leader," - << " or the last_committed doesn't match, dropping" << dendl; - op->mark_paxos_event("invalid lease, ignore"); - return; - } - - warn_on_future_time(lease->sent_timestamp, lease->get_source()); - - // extend lease - if (lease_expire < lease->lease_timestamp) { - lease_expire = lease->lease_timestamp; - - utime_t now = ceph_clock_now(); - if (lease_expire < now) { - utime_t diff = now - lease_expire; - derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl; - } - } - - state = STATE_ACTIVE; - - dout(10) << "handle_lease on " << lease->last_committed - << " now " << lease_expire << dendl; - - // ack - MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK, - ceph_clock_now()); - ack->last_committed = last_committed; - ack->first_committed = first_committed; - ack->lease_timestamp = ceph_clock_now(); - ::encode(mon->session_map.feature_map, ack->feature_map); - lease->get_connection()->send_message(ack); - - // (re)set timeout event. - reset_lease_timeout(); - - // kick waiters - finish_contexts(g_ceph_context, waiting_for_active); - if (is_readable()) - finish_contexts(g_ceph_context, waiting_for_readable); -} - -void Paxos::handle_lease_ack(MonOpRequestRef op) -{ - op->mark_paxos_event("handle_lease_ack"); - MMonPaxos *ack = static_cast(op->get_req()); - int from = ack->get_source().num(); - - if (!lease_ack_timeout_event) { - dout(10) << "handle_lease_ack from " << ack->get_source() - << " -- stray (probably since revoked)" << dendl; - - } else if (acked_lease.count(from) == 0) { - acked_lease.insert(from); - if (ack->feature_map.length()) { - auto p = ack->feature_map.begin(); - FeatureMap& t = mon->quorum_feature_map[from]; - ::decode(t, p); - } - if (acked_lease == mon->get_quorum()) { - // yay! - dout(10) << "handle_lease_ack from " << ack->get_source() - << " -- got everyone" << dendl; - mon->timer.cancel_event(lease_ack_timeout_event); - lease_ack_timeout_event = 0; - - - } else { - dout(10) << "handle_lease_ack from " << ack->get_source() - << " -- still need " - << mon->get_quorum().size() - acked_lease.size() - << " more" << dendl; - } - } else { - dout(10) << "handle_lease_ack from " << ack->get_source() - << " dup (lagging!), ignoring" << dendl; - } - - warn_on_future_time(ack->sent_timestamp, ack->get_source()); -} - -void Paxos::lease_ack_timeout() -{ - dout(1) << "lease_ack_timeout -- calling new election" << dendl; - assert(mon->is_leader()); - assert(is_active()); - logger->inc(l_paxos_lease_ack_timeout); - lease_ack_timeout_event = 0; - mon->bootstrap(); -} - -void Paxos::reset_lease_timeout() -{ - dout(20) << "reset_lease_timeout - setting timeout event" << dendl; - if (lease_timeout_event) - mon->timer.cancel_event(lease_timeout_event); - lease_timeout_event = mon->timer.add_event_after( - g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease, - new C_MonContext(mon, [this](int r) { - if (r == -ECANCELED) - return; - lease_timeout(); - })); -} - -void Paxos::lease_timeout() -{ - dout(1) << "lease_timeout -- calling new election" << dendl; - assert(mon->is_peon()); - logger->inc(l_paxos_lease_timeout); - lease_timeout_event = 0; - mon->bootstrap(); -} - -void Paxos::lease_renew_timeout() -{ - lease_renew_event = 0; - extend_lease(); -} - - -/* - * trim old states - */ -void Paxos::trim() -{ - assert(should_trim()); - version_t end = MIN(get_version() - g_conf->paxos_min, - get_first_committed() + g_conf->paxos_trim_max); - - if (first_committed >= end) - return; - - dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl; - - MonitorDBStore::TransactionRef t = get_pending_transaction(); - - for (version_t v = first_committed; v < end; ++v) { - dout(10) << "trim " << v << dendl; - t->erase(get_name(), v); - } - t->put(get_name(), "first_committed", end); - if (g_conf->mon_compact_on_trim) { - dout(10) << " compacting trimmed range" << dendl; - t->compact_range(get_name(), stringify(first_committed - 1), stringify(end)); - } - - trimming = true; - queue_pending_finisher(new C_Trimmed(this)); -} - -/* - * return a globally unique, monotonically increasing proposal number - */ -version_t Paxos::get_new_proposal_number(version_t gt) -{ - if (last_pn < gt) - last_pn = gt; - - // update. make it unique among all monitors. - last_pn /= 100; - last_pn++; - last_pn *= 100; - last_pn += (version_t)mon->rank; - - // write - auto t(std::make_shared()); - t->put(get_name(), "last_pn", last_pn); - - dout(30) << __func__ << " transaction dump:\n"; - JSONFormatter f(true); - t->dump(&f); - f.flush(*_dout); - *_dout << dendl; - - logger->inc(l_paxos_new_pn); - utime_t start = ceph_clock_now(); - - get_store()->apply_transaction(t); - - utime_t end = ceph_clock_now(); - logger->tinc(l_paxos_new_pn_latency, end - start); - - dout(10) << "get_new_proposal_number = " << last_pn << dendl; - return last_pn; -} - - -void Paxos::cancel_events() -{ - if (collect_timeout_event) { - mon->timer.cancel_event(collect_timeout_event); - collect_timeout_event = 0; - } - if (accept_timeout_event) { - mon->timer.cancel_event(accept_timeout_event); - accept_timeout_event = 0; - } - if (lease_renew_event) { - mon->timer.cancel_event(lease_renew_event); - lease_renew_event = 0; - } - if (lease_ack_timeout_event) { - mon->timer.cancel_event(lease_ack_timeout_event); - lease_ack_timeout_event = 0; - } - if (lease_timeout_event) { - mon->timer.cancel_event(lease_timeout_event); - lease_timeout_event = 0; - } -} - -void Paxos::shutdown() -{ - dout(10) << __func__ << " cancel all contexts" << dendl; - - state = STATE_SHUTDOWN; - - // discard pending transaction - pending_proposal.reset(); - - // Let store finish commits in progress - // XXX: I assume I can't use finish_contexts() because the store - // is going to trigger - while(commits_started > 0) - shutdown_cond.Wait(mon->lock); - - finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED); - finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED); - finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED); - finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED); - finish_contexts(g_ceph_context, pending_finishers, -ECANCELED); - finish_contexts(g_ceph_context, committing_finishers, -ECANCELED); - if (logger) - g_ceph_context->get_perfcounters_collection()->remove(logger); - delete logger; -} - -void Paxos::leader_init() -{ - cancel_events(); - new_value.clear(); - - // discard pending transaction - pending_proposal.reset(); - - finish_contexts(g_ceph_context, pending_finishers, -EAGAIN); - finish_contexts(g_ceph_context, committing_finishers, -EAGAIN); - - logger->inc(l_paxos_start_leader); - - if (mon->get_quorum().size() == 1) { - state = STATE_ACTIVE; - return; - } - - state = STATE_RECOVERING; - lease_expire = utime_t(); - dout(10) << "leader_init -- starting paxos recovery" << dendl; - collect(0); -} - -void Paxos::peon_init() -{ - cancel_events(); - new_value.clear(); - - state = STATE_RECOVERING; - lease_expire = utime_t(); - dout(10) << "peon_init -- i am a peon" << dendl; - - // start a timer, in case the leader never manages to issue a lease - reset_lease_timeout(); - - // discard pending transaction - pending_proposal.reset(); - - // no chance to write now! - finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN); - finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN); - finish_contexts(g_ceph_context, pending_finishers, -EAGAIN); - finish_contexts(g_ceph_context, committing_finishers, -EAGAIN); - - logger->inc(l_paxos_start_peon); -} - -void Paxos::restart() -{ - dout(10) << "restart -- canceling timeouts" << dendl; - cancel_events(); - new_value.clear(); - - if (is_writing() || is_writing_previous()) { - dout(10) << __func__ << " flushing" << dendl; - mon->lock.Unlock(); - mon->store->flush(); - mon->lock.Lock(); - dout(10) << __func__ << " flushed" << dendl; - } - state = STATE_RECOVERING; - - // discard pending transaction - pending_proposal.reset(); - - finish_contexts(g_ceph_context, committing_finishers, -EAGAIN); - finish_contexts(g_ceph_context, pending_finishers, -EAGAIN); - finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN); - finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN); - - logger->inc(l_paxos_restart); -} - - -void Paxos::dispatch(MonOpRequestRef op) -{ - assert(op->is_type_paxos()); - op->mark_paxos_event("dispatch"); - PaxosServiceMessage *m = static_cast(op->get_req()); - // election in progress? - if (!mon->is_leader() && !mon->is_peon()) { - dout(5) << "election in progress, dropping " << *m << dendl; - return; - } - - // check sanity - assert(mon->is_leader() || - (mon->is_peon() && m->get_source().num() == mon->get_leader())); - - switch (m->get_type()) { - - case MSG_MON_PAXOS: - { - MMonPaxos *pm = reinterpret_cast(m); - - // NOTE: these ops are defined in messages/MMonPaxos.h - switch (pm->op) { - // learner - case MMonPaxos::OP_COLLECT: - handle_collect(op); - break; - case MMonPaxos::OP_LAST: - handle_last(op); - break; - case MMonPaxos::OP_BEGIN: - handle_begin(op); - break; - case MMonPaxos::OP_ACCEPT: - handle_accept(op); - break; - case MMonPaxos::OP_COMMIT: - handle_commit(op); - break; - case MMonPaxos::OP_LEASE: - handle_lease(op); - break; - case MMonPaxos::OP_LEASE_ACK: - handle_lease_ack(op); - break; - default: - ceph_abort(); - } - } - break; - - default: - ceph_abort(); - } -} - - -// ----------------- -// service interface - -// -- READ -- - -bool Paxos::is_readable(version_t v) -{ - bool ret; - if (v > last_committed) - ret = false; - else - ret = - (mon->is_peon() || mon->is_leader()) && - (is_active() || is_updating() || is_writing()) && - last_committed > 0 && is_lease_valid(); // must have a value alone, or have lease - dout(5) << __func__ << " = " << (int)ret - << " - now=" << ceph_clock_now() - << " lease_expire=" << lease_expire - << " has v" << v << " lc " << last_committed - << dendl; - return ret; -} - -bool Paxos::read(version_t v, bufferlist &bl) -{ - if (!get_store()->get(get_name(), v, bl)) - return false; - return true; -} - -version_t Paxos::read_current(bufferlist &bl) -{ - if (read(last_committed, bl)) - return last_committed; - return 0; -} - - -bool Paxos::is_lease_valid() -{ - return ((mon->get_quorum().size() == 1) - || (ceph_clock_now() < lease_expire)); -} - -// -- WRITE -- - -bool Paxos::is_writeable() -{ - return - mon->is_leader() && - is_active() && - is_lease_valid(); -} - -void Paxos::propose_pending() -{ - assert(is_active()); - assert(pending_proposal); - - cancel_events(); - - bufferlist bl; - pending_proposal->encode(bl); - - dout(10) << __func__ << " " << (last_committed + 1) - << " " << bl.length() << " bytes" << dendl; - dout(30) << __func__ << " transaction dump:\n"; - JSONFormatter f(true); - pending_proposal->dump(&f); - f.flush(*_dout); - *_dout << dendl; - - pending_proposal.reset(); - - committing_finishers.swap(pending_finishers); - state = STATE_UPDATING; - begin(bl); -} - -void Paxos::queue_pending_finisher(Context *onfinished) -{ - dout(5) << __func__ << " " << onfinished << dendl; - assert(onfinished); - pending_finishers.push_back(onfinished); -} - -MonitorDBStore::TransactionRef Paxos::get_pending_transaction() -{ - assert(mon->is_leader()); - if (!pending_proposal) { - pending_proposal.reset(new MonitorDBStore::Transaction); - assert(pending_finishers.empty()); - } - return pending_proposal; -} - -bool Paxos::trigger_propose() -{ - if (plugged) { - dout(10) << __func__ << " plugged, not proposing now" << dendl; - return false; - } else if (is_active()) { - dout(10) << __func__ << " active, proposing now" << dendl; - propose_pending(); - return true; - } else { - dout(10) << __func__ << " not active, will propose later" << dendl; - return false; - } -} - -bool Paxos::is_consistent() -{ - return (first_committed <= last_committed); -} -