remove ceph code
[stor4nfv.git] / src / ceph / src / mon / Paxos.cc
diff --git a/src/ceph/src/mon/Paxos.cc b/src/ceph/src/mon/Paxos.cc
deleted file mode 100644 (file)
index e924387..0000000
+++ /dev/null
@@ -1,1590 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#include <sstream>
-#include "Paxos.h"
-#include "Monitor.h"
-#include "messages/MMonPaxos.h"
-
-#include "mon/mon_types.h"
-#include "common/config.h"
-#include "include/assert.h"
-#include "include/stringify.h"
-#include "common/Timer.h"
-#include "messages/PaxosServiceMessage.h"
-
-#define dout_subsys ceph_subsys_paxos
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, mon, mon->name, mon->rank, paxos_name, state, first_committed, last_committed)
-static ostream& _prefix(std::ostream *_dout, Monitor *mon, const string& name,
-                       int rank, const string& paxos_name, int state,
-                       version_t first_committed, version_t last_committed)
-{
-  return *_dout << "mon." << name << "@" << rank
-               << "(" << mon->get_state_name() << ")"
-               << ".paxos(" << paxos_name << " " << Paxos::get_statename(state)
-               << " c " << first_committed << ".." << last_committed
-               << ") ";
-}
-
-class Paxos::C_Trimmed : public Context {
-  Paxos *paxos;
-public:
-  explicit C_Trimmed(Paxos *p) : paxos(p) { }
-  void finish(int r) override {
-    paxos->trimming = false;
-  }
-};
-
-MonitorDBStore *Paxos::get_store()
-{
-  return mon->store;
-}
-
-void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx,
-                                         version_t first, version_t last)
-{
-  dout(10) << __func__ << " first " << first << " last " << last << dendl;
-  for (version_t v = first; v <= last; ++v) {
-    dout(30) << __func__ << " apply version " << v << dendl;
-    bufferlist bl;
-    int err = get_store()->get(get_name(), v, bl);
-    assert(err == 0);
-    assert(bl.length());
-    decode_append_transaction(tx, bl);
-  }
-  dout(15) << __func__ << " total versions " << (last-first) << dendl;
-}
-
-void Paxos::init()
-{
-  // load paxos variables from stable storage
-  last_pn = get_store()->get(get_name(), "last_pn");
-  accepted_pn = get_store()->get(get_name(), "accepted_pn");
-  last_committed = get_store()->get(get_name(), "last_committed");
-  first_committed = get_store()->get(get_name(), "first_committed");
-
-  dout(10) << __func__ << " last_pn: " << last_pn << " accepted_pn: "
-          << accepted_pn << " last_committed: " << last_committed
-          << " first_committed: " << first_committed << dendl;
-
-  dout(10) << "init" << dendl;
-  assert(is_consistent());
-}
-
-void Paxos::init_logger()
-{
-  PerfCountersBuilder pcb(g_ceph_context, "paxos", l_paxos_first, l_paxos_last);
-
-  // Because monitors are so few in number, the resource cost of capturing
-  // almost all their perf counters at USEFUL is trivial.
-  pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
-
-  pcb.add_u64_counter(l_paxos_start_leader, "start_leader", "Starts in leader role");
-  pcb.add_u64_counter(l_paxos_start_peon, "start_peon", "Starts in peon role");
-  pcb.add_u64_counter(l_paxos_restart, "restart", "Restarts");
-  pcb.add_u64_counter(l_paxos_refresh, "refresh", "Refreshes");
-  pcb.add_time_avg(l_paxos_refresh_latency, "refresh_latency", "Refresh latency");
-  pcb.add_u64_counter(l_paxos_begin, "begin", "Started and handled begins");
-  pcb.add_u64_avg(l_paxos_begin_keys, "begin_keys", "Keys in transaction on begin");
-  pcb.add_u64_avg(l_paxos_begin_bytes, "begin_bytes", "Data in transaction on begin");
-  pcb.add_time_avg(l_paxos_begin_latency, "begin_latency", "Latency of begin operation");
-  pcb.add_u64_counter(l_paxos_commit, "commit",
-      "Commits", "cmt");
-  pcb.add_u64_avg(l_paxos_commit_keys, "commit_keys", "Keys in transaction on commit");
-  pcb.add_u64_avg(l_paxos_commit_bytes, "commit_bytes", "Data in transaction on commit");
-  pcb.add_time_avg(l_paxos_commit_latency, "commit_latency",
-      "Commit latency", "clat");
-  pcb.add_u64_counter(l_paxos_collect, "collect", "Peon collects");
-  pcb.add_u64_avg(l_paxos_collect_keys, "collect_keys", "Keys in transaction on peon collect");
-  pcb.add_u64_avg(l_paxos_collect_bytes, "collect_bytes", "Data in transaction on peon collect");
-  pcb.add_time_avg(l_paxos_collect_latency, "collect_latency", "Peon collect latency");
-  pcb.add_u64_counter(l_paxos_collect_uncommitted, "collect_uncommitted", "Uncommitted values in started and handled collects");
-  pcb.add_u64_counter(l_paxos_collect_timeout, "collect_timeout", "Collect timeouts");
-  pcb.add_u64_counter(l_paxos_accept_timeout, "accept_timeout", "Accept timeouts");
-  pcb.add_u64_counter(l_paxos_lease_ack_timeout, "lease_ack_timeout", "Lease acknowledgement timeouts");
-  pcb.add_u64_counter(l_paxos_lease_timeout, "lease_timeout", "Lease timeouts");
-  pcb.add_u64_counter(l_paxos_store_state, "store_state", "Store a shared state on disk");
-  pcb.add_u64_avg(l_paxos_store_state_keys, "store_state_keys", "Keys in transaction in stored state");
-  pcb.add_u64_avg(l_paxos_store_state_bytes, "store_state_bytes", "Data in transaction in stored state");
-  pcb.add_time_avg(l_paxos_store_state_latency, "store_state_latency", "Storing state latency");
-  pcb.add_u64_counter(l_paxos_share_state, "share_state", "Sharings of state");
-  pcb.add_u64_avg(l_paxos_share_state_keys, "share_state_keys", "Keys in shared state");
-  pcb.add_u64_avg(l_paxos_share_state_bytes, "share_state_bytes", "Data in shared state");
-  pcb.add_u64_counter(l_paxos_new_pn, "new_pn", "New proposal number queries");
-  pcb.add_time_avg(l_paxos_new_pn_latency, "new_pn_latency", "New proposal number getting latency");
-  logger = pcb.create_perf_counters();
-  g_ceph_context->get_perfcounters_collection()->add(logger);
-}
-
-void Paxos::dump_info(Formatter *f)
-{
-  f->open_object_section("paxos");
-  f->dump_unsigned("first_committed", first_committed);
-  f->dump_unsigned("last_committed", last_committed);
-  f->dump_unsigned("last_pn", last_pn);
-  f->dump_unsigned("accepted_pn", accepted_pn);
-  f->close_section();
-}
-
-// ---------------------------------
-
-// PHASE 1
-
-// leader
-void Paxos::collect(version_t oldpn)
-{
-  // we're recoverying, it seems!
-  state = STATE_RECOVERING;
-  assert(mon->is_leader());
-
-  // reset the number of lasts received
-  uncommitted_v = 0;
-  uncommitted_pn = 0;
-  uncommitted_value.clear();
-  peer_first_committed.clear();
-  peer_last_committed.clear();
-
-  // look for uncommitted value
-  if (get_store()->exists(get_name(), last_committed+1)) {
-    version_t v = get_store()->get(get_name(), "pending_v");
-    version_t pn = get_store()->get(get_name(), "pending_pn");
-    if (v && pn && v == last_committed + 1) {
-      uncommitted_pn = pn;
-    } else {
-      dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
-              << " and crossing our fingers" << dendl;
-      uncommitted_pn = accepted_pn;
-    }
-    uncommitted_v = last_committed+1;
-
-    get_store()->get(get_name(), last_committed+1, uncommitted_value);
-    assert(uncommitted_value.length());
-    dout(10) << "learned uncommitted " << (last_committed+1)
-            << " pn " << uncommitted_pn
-            << " (" << uncommitted_value.length() << " bytes) from myself" 
-            << dendl;
-
-    logger->inc(l_paxos_collect_uncommitted);
-  }
-
-  // pick new pn
-  accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
-  accepted_pn_from = last_committed;
-  num_last = 1;
-  dout(10) << "collect with pn " << accepted_pn << dendl;
-
-  // send collect
-  for (set<int>::const_iterator p = mon->get_quorum().begin();
-       p != mon->get_quorum().end();
-       ++p) {
-    if (*p == mon->rank) continue;
-    
-    MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
-                                      ceph_clock_now());
-    collect->last_committed = last_committed;
-    collect->first_committed = first_committed;
-    collect->pn = accepted_pn;
-    mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
-  }
-
-  // set timeout event
-  collect_timeout_event = mon->timer.add_event_after(
-    g_conf->mon_accept_timeout_factor *
-    g_conf->mon_lease,
-    new C_MonContext(mon, [this](int r) {
-       if (r == -ECANCELED)
-         return;
-       collect_timeout();
-    }));
-}
-
-
-// peon
-void Paxos::handle_collect(MonOpRequestRef op)
-{
-  
-  op->mark_paxos_event("handle_collect");
-
-  MMonPaxos *collect = static_cast<MMonPaxos*>(op->get_req());
-  dout(10) << "handle_collect " << *collect << dendl;
-
-  assert(mon->is_peon()); // mon epoch filter should catch strays
-
-  // we're recoverying, it seems!
-  state = STATE_RECOVERING;
-
-  //update the peon recovery timeout 
-  reset_lease_timeout();
-
-  if (collect->first_committed > last_committed+1) {
-    dout(2) << __func__
-            << " leader's lowest version is too high for our last committed"
-            << " (theirs: " << collect->first_committed
-            << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
-    op->mark_paxos_event("need to bootstrap");
-    mon->bootstrap();
-    return;
-  }
-
-  // reply
-  MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST,
-                                 ceph_clock_now());
-  last->last_committed = last_committed;
-  last->first_committed = first_committed;
-  
-  version_t previous_pn = accepted_pn;
-
-  // can we accept this pn?
-  if (collect->pn > accepted_pn) {
-    // ok, accept it
-    accepted_pn = collect->pn;
-    accepted_pn_from = collect->pn_from;
-    dout(10) << "accepting pn " << accepted_pn << " from " 
-            << accepted_pn_from << dendl;
-  
-    auto t(std::make_shared<MonitorDBStore::Transaction>());
-    t->put(get_name(), "accepted_pn", accepted_pn);
-
-    dout(30) << __func__ << " transaction dump:\n";
-    JSONFormatter f(true);
-    t->dump(&f);
-    f.flush(*_dout);
-    *_dout << dendl;
-
-    logger->inc(l_paxos_collect);
-    logger->inc(l_paxos_collect_keys, t->get_keys());
-    logger->inc(l_paxos_collect_bytes, t->get_bytes());
-    utime_t start = ceph_clock_now();
-
-    get_store()->apply_transaction(t);
-
-    utime_t end = ceph_clock_now();
-    logger->tinc(l_paxos_collect_latency, end - start);
-  } else {
-    // don't accept!
-    dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
-            << ", we already accepted " << accepted_pn
-            << " from " << accepted_pn_from << dendl;
-  }
-  last->pn = accepted_pn;
-  last->pn_from = accepted_pn_from;
-
-  // share whatever committed values we have
-  if (collect->last_committed < last_committed)
-    share_state(last, collect->first_committed, collect->last_committed);
-
-  // do we have an accepted but uncommitted value?
-  //  (it'll be at last_committed+1)
-  bufferlist bl;
-  if (collect->last_committed <= last_committed &&
-      get_store()->exists(get_name(), last_committed+1)) {
-    get_store()->get(get_name(), last_committed+1, bl);
-    assert(bl.length() > 0);
-    dout(10) << " sharing our accepted but uncommitted value for " 
-            << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
-    last->values[last_committed+1] = bl;
-
-    version_t v = get_store()->get(get_name(), "pending_v");
-    version_t pn = get_store()->get(get_name(), "pending_pn");
-    if (v && pn && v == last_committed + 1) {
-      last->uncommitted_pn = pn;
-    } else {
-      // previously we didn't record which pn a value was accepted
-      // under!  use the pn value we just had...  :(
-      dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
-              << " and crossing our fingers" << dendl;
-      last->uncommitted_pn = previous_pn;
-    }
-
-    logger->inc(l_paxos_collect_uncommitted);
-  }
-
-  // send reply
-  collect->get_connection()->send_message(last);
-}
-
-/**
- * @note This is Okay. We share our versions between peer_last_committed and
- *      our last_committed (inclusive), and add their bufferlists to the
- *      message. It will be the peer's job to apply them to its store, as
- *      these bufferlists will contain raw transactions.
- *      This function is called by both the Peon and the Leader. The Peon will
- *      share the state with the Leader during handle_collect(), sharing any
- *      values the leader may be missing (i.e., the leader's last_committed is
- *      lower than the peon's last_committed). The Leader will share the state
- *      with the Peon during handle_last(), if the peon's last_committed is
- *      lower than the leader's last_committed.
- */
-void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
-                       version_t peer_last_committed)
-{
-  assert(peer_last_committed < last_committed);
-
-  dout(10) << "share_state peer has fc " << peer_first_committed 
-          << " lc " << peer_last_committed << dendl;
-  version_t v = peer_last_committed + 1;
-
-  // include incrementals
-  uint64_t bytes = 0;
-  for ( ; v <= last_committed; v++) {
-    if (get_store()->exists(get_name(), v)) {
-      get_store()->get(get_name(), v, m->values[v]);
-      assert(m->values[v].length());
-      dout(10) << " sharing " << v << " ("
-              << m->values[v].length() << " bytes)" << dendl;
-      bytes += m->values[v].length() + 16;  // paxos_ + 10 digits = 16
-    }
-  }
-  logger->inc(l_paxos_share_state);
-  logger->inc(l_paxos_share_state_keys, m->values.size());
-  logger->inc(l_paxos_share_state_bytes, bytes);
-
-  m->last_committed = last_committed;
-}
-
-/**
- * Store on disk a state that was shared with us
- *
- * Basically, we received a set of version. Or just one. It doesn't matter.
- * What matters is that we have to stash it in the store. So, we will simply
- * write every single bufferlist into their own versions on our side (i.e.,
- * onto paxos-related keys), and then we will decode those same bufferlists
- * we just wrote and apply the transactions they hold. We will also update
- * our first and last committed values to point to the new values, if need
- * be. All all this is done tightly wrapped in a transaction to ensure we
- * enjoy the atomicity guarantees given by our awesome k/v store.
- */
-bool Paxos::store_state(MMonPaxos *m)
-{
-  auto t(std::make_shared<MonitorDBStore::Transaction>());
-  map<version_t,bufferlist>::iterator start = m->values.begin();
-  bool changed = false;
-
-  // build map of values to store
-  // we want to write the range [last_committed, m->last_committed] only.
-  if (start != m->values.end() &&
-      start->first > last_committed + 1) {
-    // ignore everything if values start in the future.
-    dout(10) << "store_state ignoring all values, they start at " << start->first
-            << " > last_committed+1" << dendl;
-    return false;
-  }
-
-  // push forward the start position on the message's values iterator, up until
-  // we run out of positions or we find a position matching 'last_committed'.
-  while (start != m->values.end() && start->first <= last_committed) {
-    ++start;
-  }
-
-  // make sure we get the right interval of values to apply by pushing forward
-  // the 'end' iterator until it matches the message's 'last_committed'.
-  map<version_t,bufferlist>::iterator end = start;
-  while (end != m->values.end() && end->first <= m->last_committed) {
-    last_committed = end->first;
-    ++end;
-  }
-
-  if (start == end) {
-    dout(10) << "store_state nothing to commit" << dendl;
-  } else {
-    dout(10) << "store_state [" << start->first << ".." 
-            << last_committed << "]" << dendl;
-    t->put(get_name(), "last_committed", last_committed);
-
-    // we should apply the state here -- decode every single bufferlist in the
-    // map and append the transactions to 't'.
-    map<version_t,bufferlist>::iterator it;
-    for (it = start; it != end; ++it) {
-      // write the bufferlist as the version's value
-      t->put(get_name(), it->first, it->second);
-      // decode the bufferlist and append it to the transaction we will shortly
-      // apply.
-      decode_append_transaction(t, it->second);
-    }
-
-    // discard obsolete uncommitted value?
-    if (uncommitted_v && uncommitted_v <= last_committed) {
-      dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v
-              << " pn " << uncommitted_pn << dendl;
-      uncommitted_v = 0;
-      uncommitted_pn = 0;
-      uncommitted_value.clear();
-    }
-  }
-  if (!t->empty()) {
-    dout(30) << __func__ << " transaction dump:\n";
-    JSONFormatter f(true);
-    t->dump(&f);
-    f.flush(*_dout);
-    *_dout << dendl;
-
-    logger->inc(l_paxos_store_state);
-    logger->inc(l_paxos_store_state_bytes, t->get_bytes());
-    logger->inc(l_paxos_store_state_keys, t->get_keys());
-    utime_t start = ceph_clock_now();
-
-    get_store()->apply_transaction(t);
-
-    utime_t end = ceph_clock_now();
-    logger->tinc(l_paxos_store_state_latency, end - start);
-
-    // refresh first_committed; this txn may have trimmed.
-    first_committed = get_store()->get(get_name(), "first_committed");
-
-    _sanity_check_store();
-    changed = true;
-  }
-
-  return changed;
-}
-
-void Paxos::_sanity_check_store()
-{
-  version_t lc = get_store()->get(get_name(), "last_committed");
-  assert(lc == last_committed);
-}
-
-
-// leader
-void Paxos::handle_last(MonOpRequestRef op)
-{
-  op->mark_paxos_event("handle_last");
-  MMonPaxos *last = static_cast<MMonPaxos*>(op->get_req());
-  bool need_refresh = false;
-  int from = last->get_source().num();
-
-  dout(10) << "handle_last " << *last << dendl;
-
-  if (!mon->is_leader()) {
-    dout(10) << "not leader, dropping" << dendl;
-    return;
-  }
-
-  // note peer's first_ and last_committed, in case we learn a new
-  // commit and need to push it to them.
-  peer_first_committed[from] = last->first_committed;
-  peer_last_committed[from] = last->last_committed;
-
-  if (last->first_committed > last_committed + 1) {
-    dout(5) << __func__
-            << " mon." << from
-           << " lowest version is too high for our last committed"
-            << " (theirs: " << last->first_committed
-            << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
-    op->mark_paxos_event("need to bootstrap");
-    mon->bootstrap();
-    return;
-  }
-
-  assert(g_conf->paxos_kill_at != 1);
-
-  // store any committed values if any are specified in the message
-  need_refresh = store_state(last);
-
-  assert(g_conf->paxos_kill_at != 2);
-
-  // is everyone contiguous and up to date?
-  for (map<int,version_t>::iterator p = peer_last_committed.begin();
-       p != peer_last_committed.end();
-       ++p) {
-    if (p->second + 1 < first_committed && first_committed > 1) {
-      dout(5) << __func__
-             << " peon " << p->first
-             << " last_committed (" << p->second
-             << ") is too low for our first_committed (" << first_committed
-             << ") -- bootstrap!" << dendl;
-      op->mark_paxos_event("need to bootstrap");
-      mon->bootstrap();
-      return;
-    }
-    if (p->second < last_committed) {
-      // share committed values
-      dout(10) << " sending commit to mon." << p->first << dendl;
-      MMonPaxos *commit = new MMonPaxos(mon->get_epoch(),
-                                       MMonPaxos::OP_COMMIT,
-                                       ceph_clock_now());
-      share_state(commit, peer_first_committed[p->first], p->second);
-      mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
-    }
-  }
-
-  // do they accept your pn?
-  if (last->pn > accepted_pn) {
-    // no, try again.
-    dout(10) << " they had a higher pn than us, picking a new one." << dendl;
-
-    // cancel timeout event
-    mon->timer.cancel_event(collect_timeout_event);
-    collect_timeout_event = 0;
-
-    collect(last->pn);
-  } else if (last->pn == accepted_pn) {
-    // yes, they accepted our pn.  great.
-    num_last++;
-    dout(10) << " they accepted our pn, we now have " 
-            << num_last << " peons" << dendl;
-
-    // did this person send back an accepted but uncommitted value?
-    if (last->uncommitted_pn) {
-      if (last->uncommitted_pn >= uncommitted_pn &&
-         last->last_committed >= last_committed &&
-         last->last_committed + 1 >= uncommitted_v) {
-       uncommitted_v = last->last_committed+1;
-       uncommitted_pn = last->uncommitted_pn;
-       uncommitted_value = last->values[uncommitted_v];
-       dout(10) << "we learned an uncommitted value for " << uncommitted_v
-                << " pn " << uncommitted_pn
-                << " " << uncommitted_value.length() << " bytes"
-                << dendl;
-      } else {
-       dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
-                << " pn " << last->uncommitted_pn
-                << " " << last->values[last->last_committed+1].length() << " bytes"
-                << dendl;
-      }
-    }
-    
-    // is that everyone?
-    if (num_last == mon->get_quorum().size()) {
-      // cancel timeout event
-      mon->timer.cancel_event(collect_timeout_event);
-      collect_timeout_event = 0;
-      peer_first_committed.clear();
-      peer_last_committed.clear();
-
-      // almost...
-
-      // did we learn an old value?
-      if (uncommitted_v == last_committed+1 &&
-         uncommitted_value.length()) {
-       dout(10) << "that's everyone.  begin on old learned value" << dendl;
-       state = STATE_UPDATING_PREVIOUS;
-       begin(uncommitted_value);
-      } else {
-       // active!
-       dout(10) << "that's everyone.  active!" << dendl;
-       extend_lease();
-
-       need_refresh = false;
-       if (do_refresh()) {
-         finish_round();
-       }
-      }
-    }
-  } else {
-    // no, this is an old message, discard
-    dout(10) << "old pn, ignoring" << dendl;
-  }
-
-  if (need_refresh)
-    (void)do_refresh();
-}
-
-void Paxos::collect_timeout()
-{
-  dout(1) << "collect timeout, calling fresh election" << dendl;
-  collect_timeout_event = 0;
-  logger->inc(l_paxos_collect_timeout);
-  assert(mon->is_leader());
-  mon->bootstrap();
-}
-
-
-// leader
-void Paxos::begin(bufferlist& v)
-{
-  dout(10) << "begin for " << last_committed+1 << " " 
-          << v.length() << " bytes"
-          << dendl;
-
-  assert(mon->is_leader());
-  assert(is_updating() || is_updating_previous());
-
-  // we must already have a majority for this to work.
-  assert(mon->get_quorum().size() == 1 ||
-        num_last > (unsigned)mon->monmap->size()/2);
-  
-  // and no value, yet.
-  assert(new_value.length() == 0);
-
-  // accept it ourselves
-  accepted.clear();
-  accepted.insert(mon->rank);
-  new_value = v;
-
-  if (last_committed == 0) {
-    auto t(std::make_shared<MonitorDBStore::Transaction>());
-    // initial base case; set first_committed too
-    t->put(get_name(), "first_committed", 1);
-    decode_append_transaction(t, new_value);
-
-    bufferlist tx_bl;
-    t->encode(tx_bl);
-
-    new_value = tx_bl;
-  }
-
-  // store the proposed value in the store. IF it is accepted, we will then
-  // have to decode it into a transaction and apply it.
-  auto t(std::make_shared<MonitorDBStore::Transaction>());
-  t->put(get_name(), last_committed+1, new_value);
-
-  // note which pn this pending value is for.
-  t->put(get_name(), "pending_v", last_committed + 1);
-  t->put(get_name(), "pending_pn", accepted_pn);
-
-  dout(30) << __func__ << " transaction dump:\n";
-  JSONFormatter f(true);
-  t->dump(&f);
-  f.flush(*_dout);
-  auto debug_tx(std::make_shared<MonitorDBStore::Transaction>());
-  bufferlist::iterator new_value_it = new_value.begin();
-  debug_tx->decode(new_value_it);
-  debug_tx->dump(&f);
-  *_dout << "\nbl dump:\n";
-  f.flush(*_dout);
-  *_dout << dendl;
-
-  logger->inc(l_paxos_begin);
-  logger->inc(l_paxos_begin_keys, t->get_keys());
-  logger->inc(l_paxos_begin_bytes, t->get_bytes());
-  utime_t start = ceph_clock_now();
-
-  get_store()->apply_transaction(t);
-
-  utime_t end = ceph_clock_now();
-  logger->tinc(l_paxos_begin_latency, end - start);
-
-  assert(g_conf->paxos_kill_at != 3);
-
-  if (mon->get_quorum().size() == 1) {
-    // we're alone, take it easy
-    commit_start();
-    return;
-  }
-
-  // ask others to accept it too!
-  for (set<int>::const_iterator p = mon->get_quorum().begin();
-       p != mon->get_quorum().end();
-       ++p) {
-    if (*p == mon->rank) continue;
-    
-    dout(10) << " sending begin to mon." << *p << dendl;
-    MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
-                                    ceph_clock_now());
-    begin->values[last_committed+1] = new_value;
-    begin->last_committed = last_committed;
-    begin->pn = accepted_pn;
-    
-    mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
-  }
-
-  // set timeout event
-  accept_timeout_event = mon->timer.add_event_after(
-    g_conf->mon_accept_timeout_factor * g_conf->mon_lease,
-    new C_MonContext(mon, [this](int r) {
-       if (r == -ECANCELED)
-         return;
-       accept_timeout();
-      }));
-}
-
-// peon
-void Paxos::handle_begin(MonOpRequestRef op)
-{
-  op->mark_paxos_event("handle_begin");
-  MMonPaxos *begin = static_cast<MMonPaxos*>(op->get_req());
-  dout(10) << "handle_begin " << *begin << dendl;
-
-  // can we accept this?
-  if (begin->pn < accepted_pn) {
-    dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
-    op->mark_paxos_event("have higher pn, ignore");
-    return;
-  }
-  assert(begin->pn == accepted_pn);
-  assert(begin->last_committed == last_committed);
-  
-  assert(g_conf->paxos_kill_at != 4);
-
-  logger->inc(l_paxos_begin);
-
-  // set state.
-  state = STATE_UPDATING;
-  lease_expire = utime_t();  // cancel lease
-
-  // yes.
-  version_t v = last_committed+1;
-  dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
-  // store the accepted value onto our store. We will have to decode it and
-  // apply its transaction once we receive permission to commit.
-  auto t(std::make_shared<MonitorDBStore::Transaction>());
-  t->put(get_name(), v, begin->values[v]);
-
-  // note which pn this pending value is for.
-  t->put(get_name(), "pending_v", v);
-  t->put(get_name(), "pending_pn", accepted_pn);
-
-  dout(30) << __func__ << " transaction dump:\n";
-  JSONFormatter f(true);
-  t->dump(&f);
-  f.flush(*_dout);
-  *_dout << dendl;
-
-  logger->inc(l_paxos_begin_bytes, t->get_bytes());
-  utime_t start = ceph_clock_now();
-
-  get_store()->apply_transaction(t);
-
-  utime_t end = ceph_clock_now();
-  logger->tinc(l_paxos_begin_latency, end - start);
-
-  assert(g_conf->paxos_kill_at != 5);
-
-  // reply
-  MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
-                                   ceph_clock_now());
-  accept->pn = accepted_pn;
-  accept->last_committed = last_committed;
-  begin->get_connection()->send_message(accept);
-}
-
-// leader
-void Paxos::handle_accept(MonOpRequestRef op)
-{
-  op->mark_paxos_event("handle_accept");
-  MMonPaxos *accept = static_cast<MMonPaxos*>(op->get_req());
-  dout(10) << "handle_accept " << *accept << dendl;
-  int from = accept->get_source().num();
-
-  if (accept->pn != accepted_pn) {
-    // we accepted a higher pn, from some other leader
-    dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
-    op->mark_paxos_event("have higher pn, ignore");
-    return;
-  }
-  if (last_committed > 0 &&
-      accept->last_committed < last_committed-1) {
-    dout(10) << " this is from an old round, ignoring" << dendl;
-    op->mark_paxos_event("old round, ignore");
-    return;
-  }
-  assert(accept->last_committed == last_committed ||   // not committed
-        accept->last_committed == last_committed-1);  // committed
-
-  assert(is_updating() || is_updating_previous());
-  assert(accepted.count(from) == 0);
-  accepted.insert(from);
-  dout(10) << " now " << accepted << " have accepted" << dendl;
-
-  assert(g_conf->paxos_kill_at != 6);
-
-  // only commit (and expose committed state) when we get *all* quorum
-  // members to accept.  otherwise, they may still be sharing the now
-  // stale state.
-  // FIXME: we can improve this with an additional lease revocation message
-  // that doesn't block for the persist.
-  if (accepted == mon->get_quorum()) {
-    // yay, commit!
-    dout(10) << " got majority, committing, done with update" << dendl;
-    op->mark_paxos_event("commit_start");
-    commit_start();
-  }
-}
-
-void Paxos::accept_timeout()
-{
-  dout(1) << "accept timeout, calling fresh election" << dendl;
-  accept_timeout_event = 0;
-  assert(mon->is_leader());
-  assert(is_updating() || is_updating_previous() || is_writing() ||
-        is_writing_previous());
-  logger->inc(l_paxos_accept_timeout);
-  mon->bootstrap();
-}
-
-struct C_Committed : public Context {
-  Paxos *paxos;
-  explicit C_Committed(Paxos *p) : paxos(p) {}
-  void finish(int r) override {
-    assert(r >= 0);
-    Mutex::Locker l(paxos->mon->lock);
-    if (paxos->is_shutdown()) {
-      paxos->abort_commit();
-      return;
-    }
-    paxos->commit_finish();
-  }
-};
-
-void Paxos::abort_commit()
-{
-  assert(commits_started > 0);
-  --commits_started;
-  if (commits_started == 0)
-    shutdown_cond.Signal();
-}
-
-void Paxos::commit_start()
-{
-  dout(10) << __func__ << " " << (last_committed+1) << dendl;
-
-  assert(g_conf->paxos_kill_at != 7);
-
-  auto t(std::make_shared<MonitorDBStore::Transaction>());
-
-  // commit locally
-  t->put(get_name(), "last_committed", last_committed + 1);
-
-  // decode the value and apply its transaction to the store.
-  // this value can now be read from last_committed.
-  decode_append_transaction(t, new_value);
-
-  dout(30) << __func__ << " transaction dump:\n";
-  JSONFormatter f(true);
-  t->dump(&f);
-  f.flush(*_dout);
-  *_dout << dendl;
-
-  logger->inc(l_paxos_commit);
-  logger->inc(l_paxos_commit_keys, t->get_keys());
-  logger->inc(l_paxos_commit_bytes, t->get_bytes());
-  commit_start_stamp = ceph_clock_now();
-
-  get_store()->queue_transaction(t, new C_Committed(this));
-
-  if (is_updating_previous())
-    state = STATE_WRITING_PREVIOUS;
-  else if (is_updating())
-    state = STATE_WRITING;
-  else
-    ceph_abort();
-  ++commits_started;
-
-  if (mon->get_quorum().size() > 1) {
-    // cancel timeout event
-    mon->timer.cancel_event(accept_timeout_event);
-    accept_timeout_event = 0;
-  }
-}
-
-void Paxos::commit_finish()
-{
-  dout(20) << __func__ << " " << (last_committed+1) << dendl;
-  utime_t end = ceph_clock_now();
-  logger->tinc(l_paxos_commit_latency, end - commit_start_stamp);
-
-  assert(g_conf->paxos_kill_at != 8);
-
-  // cancel lease - it was for the old value.
-  //  (this would only happen if message layer lost the 'begin', but
-  //   leader still got a majority and committed with out us.)
-  lease_expire = utime_t();  // cancel lease
-
-  last_committed++;
-  last_commit_time = ceph_clock_now();
-
-  // refresh first_committed; this txn may have trimmed.
-  first_committed = get_store()->get(get_name(), "first_committed");
-
-  _sanity_check_store();
-
-  // tell everyone
-  for (set<int>::const_iterator p = mon->get_quorum().begin();
-       p != mon->get_quorum().end();
-       ++p) {
-    if (*p == mon->rank) continue;
-
-    dout(10) << " sending commit to mon." << *p << dendl;
-    MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
-                                     ceph_clock_now());
-    commit->values[last_committed] = new_value;
-    commit->pn = accepted_pn;
-    commit->last_committed = last_committed;
-
-    mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
-  }
-
-  assert(g_conf->paxos_kill_at != 9);
-
-  // get ready for a new round.
-  new_value.clear();
-
-  // WRITING -> REFRESH
-  // among other things, this lets do_refresh() -> mon->bootstrap() know
-  // it doesn't need to flush the store queue
-  assert(is_writing() || is_writing_previous());
-  state = STATE_REFRESH;
-  assert(commits_started > 0);
-  --commits_started;
-
-  if (do_refresh()) {
-    commit_proposal();
-    if (mon->get_quorum().size() > 1) {
-      extend_lease();
-    }
-
-    finish_contexts(g_ceph_context, waiting_for_commit);
-
-    assert(g_conf->paxos_kill_at != 10);
-
-    finish_round();
-  }
-}
-
-
-void Paxos::handle_commit(MonOpRequestRef op)
-{
-  op->mark_paxos_event("handle_commit");
-  MMonPaxos *commit = static_cast<MMonPaxos*>(op->get_req());
-  dout(10) << "handle_commit on " << commit->last_committed << dendl;
-
-  logger->inc(l_paxos_commit);
-
-  if (!mon->is_peon()) {
-    dout(10) << "not a peon, dropping" << dendl;
-    ceph_abort();
-    return;
-  }
-
-  op->mark_paxos_event("store_state");
-  store_state(commit);
-
-  if (do_refresh()) {
-    finish_contexts(g_ceph_context, waiting_for_commit);
-  }
-}
-
-void Paxos::extend_lease()
-{
-  assert(mon->is_leader());
-  //assert(is_active());
-
-  lease_expire = ceph_clock_now();
-  lease_expire += g_conf->mon_lease;
-  acked_lease.clear();
-  acked_lease.insert(mon->rank);
-
-  dout(7) << "extend_lease now+" << g_conf->mon_lease 
-         << " (" << lease_expire << ")" << dendl;
-
-  // bcast
-  for (set<int>::const_iterator p = mon->get_quorum().begin();
-      p != mon->get_quorum().end(); ++p) {
-
-    if (*p == mon->rank) continue;
-    MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE,
-                                    ceph_clock_now());
-    lease->last_committed = last_committed;
-    lease->lease_timestamp = lease_expire;
-    lease->first_committed = first_committed;
-    mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
-  }
-
-  // set timeout event.
-  //  if old timeout is still in place, leave it.
-  if (!lease_ack_timeout_event) {
-    lease_ack_timeout_event = mon->timer.add_event_after(
-      g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
-      new C_MonContext(mon, [this](int r) {
-         if (r == -ECANCELED)
-           return;
-         lease_ack_timeout();
-       }));
-  }
-
-  // set renew event
-  utime_t at = lease_expire;
-  at -= g_conf->mon_lease;
-  at += g_conf->mon_lease_renew_interval_factor * g_conf->mon_lease;
-  lease_renew_event = mon->timer.add_event_at(
-    at, new C_MonContext(mon, [this](int r) {
-       if (r == -ECANCELED)
-         return;
-       lease_renew_timeout();
-    }));
-}
-
-void Paxos::warn_on_future_time(utime_t t, entity_name_t from)
-{
-  utime_t now = ceph_clock_now();
-  if (t > now) {
-    utime_t diff = t - now;
-    if (diff > g_conf->mon_clock_drift_allowed) {
-      utime_t warn_diff = now - last_clock_drift_warn;
-      if (warn_diff >
-         pow(g_conf->mon_clock_drift_warn_backoff, clock_drift_warned)) {
-       mon->clog->warn() << "message from " << from << " was stamped " << diff
-                        << "s in the future, clocks not synchronized";
-       last_clock_drift_warn = ceph_clock_now();
-       ++clock_drift_warned;
-      }
-    }
-  }
-
-}
-
-bool Paxos::do_refresh()
-{
-  bool need_bootstrap = false;
-
-  utime_t start = ceph_clock_now();
-
-  // make sure we have the latest state loaded up
-  mon->refresh_from_paxos(&need_bootstrap);
-
-  utime_t end = ceph_clock_now();
-  logger->inc(l_paxos_refresh);
-  logger->tinc(l_paxos_refresh_latency, end - start);
-
-  if (need_bootstrap) {
-    dout(10) << " doing requested bootstrap" << dendl;
-    mon->bootstrap();
-    return false;
-  }
-
-  return true;
-}
-
-void Paxos::commit_proposal()
-{
-  dout(10) << __func__ << dendl;
-  assert(mon->is_leader());
-  assert(is_refresh());
-
-  finish_contexts(g_ceph_context, committing_finishers);
-}
-
-void Paxos::finish_round()
-{
-  dout(10) << __func__ << dendl;
-  assert(mon->is_leader());
-
-  // ok, now go active!
-  state = STATE_ACTIVE;
-
-  dout(20) << __func__ << " waiting_for_acting" << dendl;
-  finish_contexts(g_ceph_context, waiting_for_active);
-  dout(20) << __func__ << " waiting_for_readable" << dendl;
-  finish_contexts(g_ceph_context, waiting_for_readable);
-  dout(20) << __func__ << " waiting_for_writeable" << dendl;
-  finish_contexts(g_ceph_context, waiting_for_writeable);
-  
-  dout(10) << __func__ << " done w/ waiters, state " << get_statename(state) << dendl;
-
-  if (should_trim()) {
-    trim();
-  }
-
-  if (is_active() && pending_proposal) {
-    propose_pending();
-  }
-}
-
-
-// peon
-void Paxos::handle_lease(MonOpRequestRef op)
-{
-  op->mark_paxos_event("handle_lease");
-  MMonPaxos *lease = static_cast<MMonPaxos*>(op->get_req());
-  // sanity
-  if (!mon->is_peon() ||
-      last_committed != lease->last_committed) {
-    dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
-            << " or the last_committed doesn't match, dropping" << dendl;
-    op->mark_paxos_event("invalid lease, ignore");
-    return;
-  }
-
-  warn_on_future_time(lease->sent_timestamp, lease->get_source());
-
-  // extend lease
-  if (lease_expire < lease->lease_timestamp) {
-    lease_expire = lease->lease_timestamp;
-
-    utime_t now = ceph_clock_now();
-    if (lease_expire < now) {
-      utime_t diff = now - lease_expire;
-      derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl;
-    }
-  }
-
-  state = STATE_ACTIVE;
-
-  dout(10) << "handle_lease on " << lease->last_committed
-          << " now " << lease_expire << dendl;
-
-  // ack
-  MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK,
-                                ceph_clock_now());
-  ack->last_committed = last_committed;
-  ack->first_committed = first_committed;
-  ack->lease_timestamp = ceph_clock_now();
-  ::encode(mon->session_map.feature_map, ack->feature_map);
-  lease->get_connection()->send_message(ack);
-
-  // (re)set timeout event.
-  reset_lease_timeout();
-
-  // kick waiters
-  finish_contexts(g_ceph_context, waiting_for_active);
-  if (is_readable())
-    finish_contexts(g_ceph_context, waiting_for_readable);
-}
-
-void Paxos::handle_lease_ack(MonOpRequestRef op)
-{
-  op->mark_paxos_event("handle_lease_ack");
-  MMonPaxos *ack = static_cast<MMonPaxos*>(op->get_req());
-  int from = ack->get_source().num();
-
-  if (!lease_ack_timeout_event) {
-    dout(10) << "handle_lease_ack from " << ack->get_source()
-            << " -- stray (probably since revoked)" << dendl;
-
-  } else if (acked_lease.count(from) == 0) {
-    acked_lease.insert(from);
-    if (ack->feature_map.length()) {
-      auto p = ack->feature_map.begin();
-      FeatureMap& t = mon->quorum_feature_map[from];
-      ::decode(t, p);
-    }
-    if (acked_lease == mon->get_quorum()) {
-      // yay!
-      dout(10) << "handle_lease_ack from " << ack->get_source()
-              << " -- got everyone" << dendl;
-      mon->timer.cancel_event(lease_ack_timeout_event);
-      lease_ack_timeout_event = 0;
-
-
-    } else {
-      dout(10) << "handle_lease_ack from " << ack->get_source()
-              << " -- still need "
-              << mon->get_quorum().size() - acked_lease.size()
-              << " more" << dendl;
-    }
-  } else {
-    dout(10) << "handle_lease_ack from " << ack->get_source()
-            << " dup (lagging!), ignoring" << dendl;
-  }
-
-  warn_on_future_time(ack->sent_timestamp, ack->get_source());
-}
-
-void Paxos::lease_ack_timeout()
-{
-  dout(1) << "lease_ack_timeout -- calling new election" << dendl;
-  assert(mon->is_leader());
-  assert(is_active());
-  logger->inc(l_paxos_lease_ack_timeout);
-  lease_ack_timeout_event = 0;
-  mon->bootstrap();
-}
-
-void Paxos::reset_lease_timeout()
-{
-  dout(20) << "reset_lease_timeout - setting timeout event" << dendl;
-  if (lease_timeout_event)
-    mon->timer.cancel_event(lease_timeout_event);
-  lease_timeout_event = mon->timer.add_event_after(
-    g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
-    new C_MonContext(mon, [this](int r) {
-       if (r == -ECANCELED)
-         return;
-       lease_timeout();
-      }));
-}
-
-void Paxos::lease_timeout()
-{
-  dout(1) << "lease_timeout -- calling new election" << dendl;
-  assert(mon->is_peon());
-  logger->inc(l_paxos_lease_timeout);
-  lease_timeout_event = 0;
-  mon->bootstrap();
-}
-
-void Paxos::lease_renew_timeout()
-{
-  lease_renew_event = 0;
-  extend_lease();
-}
-
-
-/*
- * trim old states
- */
-void Paxos::trim()
-{
-  assert(should_trim());
-  version_t end = MIN(get_version() - g_conf->paxos_min,
-                     get_first_committed() + g_conf->paxos_trim_max);
-
-  if (first_committed >= end)
-    return;
-
-  dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
-
-  MonitorDBStore::TransactionRef t = get_pending_transaction();
-
-  for (version_t v = first_committed; v < end; ++v) {
-    dout(10) << "trim " << v << dendl;
-    t->erase(get_name(), v);
-  }
-  t->put(get_name(), "first_committed", end);
-  if (g_conf->mon_compact_on_trim) {
-    dout(10) << " compacting trimmed range" << dendl;
-    t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
-  }
-
-  trimming = true;
-  queue_pending_finisher(new C_Trimmed(this));
-}
-
-/*
- * return a globally unique, monotonically increasing proposal number
- */
-version_t Paxos::get_new_proposal_number(version_t gt)
-{
-  if (last_pn < gt) 
-    last_pn = gt;
-  
-  // update. make it unique among all monitors.
-  last_pn /= 100;
-  last_pn++;
-  last_pn *= 100;
-  last_pn += (version_t)mon->rank;
-
-  // write
-  auto t(std::make_shared<MonitorDBStore::Transaction>());
-  t->put(get_name(), "last_pn", last_pn);
-
-  dout(30) << __func__ << " transaction dump:\n";
-  JSONFormatter f(true);
-  t->dump(&f);
-  f.flush(*_dout);
-  *_dout << dendl;
-
-  logger->inc(l_paxos_new_pn);
-  utime_t start = ceph_clock_now();
-
-  get_store()->apply_transaction(t);
-
-  utime_t end = ceph_clock_now();
-  logger->tinc(l_paxos_new_pn_latency, end - start);
-
-  dout(10) << "get_new_proposal_number = " << last_pn << dendl;
-  return last_pn;
-}
-
-
-void Paxos::cancel_events()
-{
-  if (collect_timeout_event) {
-    mon->timer.cancel_event(collect_timeout_event);
-    collect_timeout_event = 0;
-  }
-  if (accept_timeout_event) {
-    mon->timer.cancel_event(accept_timeout_event);
-    accept_timeout_event = 0;
-  }
-  if (lease_renew_event) {
-    mon->timer.cancel_event(lease_renew_event);
-    lease_renew_event = 0;
-  }
-  if (lease_ack_timeout_event) {
-    mon->timer.cancel_event(lease_ack_timeout_event);
-    lease_ack_timeout_event = 0;
-  }  
-  if (lease_timeout_event) {
-    mon->timer.cancel_event(lease_timeout_event);
-    lease_timeout_event = 0;
-  }
-}
-
-void Paxos::shutdown()
-{
-  dout(10) << __func__ << " cancel all contexts" << dendl;
-
-  state = STATE_SHUTDOWN;
-
-  // discard pending transaction
-  pending_proposal.reset();
-
-  // Let store finish commits in progress
-  // XXX: I assume I can't use finish_contexts() because the store
-  // is going to trigger
-  while(commits_started > 0)
-    shutdown_cond.Wait(mon->lock);
-
-  finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
-  finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED);
-  finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
-  finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
-  finish_contexts(g_ceph_context, pending_finishers, -ECANCELED);
-  finish_contexts(g_ceph_context, committing_finishers, -ECANCELED);
-  if (logger)
-    g_ceph_context->get_perfcounters_collection()->remove(logger);
-  delete logger;
-}
-
-void Paxos::leader_init()
-{
-  cancel_events();
-  new_value.clear();
-
-  // discard pending transaction
-  pending_proposal.reset();
-
-  finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
-  finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
-
-  logger->inc(l_paxos_start_leader);
-
-  if (mon->get_quorum().size() == 1) {
-    state = STATE_ACTIVE;
-    return;
-  }
-
-  state = STATE_RECOVERING;
-  lease_expire = utime_t();
-  dout(10) << "leader_init -- starting paxos recovery" << dendl;
-  collect(0);
-}
-
-void Paxos::peon_init()
-{
-  cancel_events();
-  new_value.clear();
-
-  state = STATE_RECOVERING;
-  lease_expire = utime_t();
-  dout(10) << "peon_init -- i am a peon" << dendl;
-
-  // start a timer, in case the leader never manages to issue a lease
-  reset_lease_timeout();
-
-  // discard pending transaction
-  pending_proposal.reset();
-
-  // no chance to write now!
-  finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
-  finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
-  finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
-  finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
-
-  logger->inc(l_paxos_start_peon);
-}
-
-void Paxos::restart()
-{
-  dout(10) << "restart -- canceling timeouts" << dendl;
-  cancel_events();
-  new_value.clear();
-
-  if (is_writing() || is_writing_previous()) {
-    dout(10) << __func__ << " flushing" << dendl;
-    mon->lock.Unlock();
-    mon->store->flush();
-    mon->lock.Lock();
-    dout(10) << __func__ << " flushed" << dendl;
-  }
-  state = STATE_RECOVERING;
-
-  // discard pending transaction
-  pending_proposal.reset();
-
-  finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
-  finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
-  finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
-  finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
-
-  logger->inc(l_paxos_restart);
-}
-
-
-void Paxos::dispatch(MonOpRequestRef op)
-{
-  assert(op->is_type_paxos());
-  op->mark_paxos_event("dispatch");
-  PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
-  // election in progress?
-  if (!mon->is_leader() && !mon->is_peon()) {
-    dout(5) << "election in progress, dropping " << *m << dendl;
-    return;    
-  }
-
-  // check sanity
-  assert(mon->is_leader() || 
-        (mon->is_peon() && m->get_source().num() == mon->get_leader()));
-  
-  switch (m->get_type()) {
-
-  case MSG_MON_PAXOS:
-    {
-      MMonPaxos *pm = reinterpret_cast<MMonPaxos*>(m);
-
-      // NOTE: these ops are defined in messages/MMonPaxos.h
-      switch (pm->op) {
-       // learner
-      case MMonPaxos::OP_COLLECT:
-       handle_collect(op);
-       break;
-      case MMonPaxos::OP_LAST:
-       handle_last(op);
-       break;
-      case MMonPaxos::OP_BEGIN:
-       handle_begin(op);
-       break;
-      case MMonPaxos::OP_ACCEPT:
-       handle_accept(op);
-       break;          
-      case MMonPaxos::OP_COMMIT:
-       handle_commit(op);
-       break;
-      case MMonPaxos::OP_LEASE:
-       handle_lease(op);
-       break;
-      case MMonPaxos::OP_LEASE_ACK:
-       handle_lease_ack(op);
-       break;
-      default:
-       ceph_abort();
-      }
-    }
-    break;
-    
-  default:
-    ceph_abort();
-  }
-}
-
-
-// -----------------
-// service interface
-
-// -- READ --
-
-bool Paxos::is_readable(version_t v)
-{
-  bool ret;
-  if (v > last_committed)
-    ret = false;
-  else
-    ret =
-      (mon->is_peon() || mon->is_leader()) &&
-      (is_active() || is_updating() || is_writing()) &&
-      last_committed > 0 && is_lease_valid(); // must have a value alone, or have lease
-  dout(5) << __func__ << " = " << (int)ret
-         << " - now=" << ceph_clock_now()
-         << " lease_expire=" << lease_expire
-         << " has v" << v << " lc " << last_committed
-         << dendl;
-  return ret;
-}
-
-bool Paxos::read(version_t v, bufferlist &bl)
-{
-  if (!get_store()->get(get_name(), v, bl))
-    return false;
-  return true;
-}
-
-version_t Paxos::read_current(bufferlist &bl)
-{
-  if (read(last_committed, bl))
-    return last_committed;
-  return 0;
-}
-
-
-bool Paxos::is_lease_valid()
-{
-  return ((mon->get_quorum().size() == 1)
-      || (ceph_clock_now() < lease_expire));
-}
-
-// -- WRITE --
-
-bool Paxos::is_writeable()
-{
-  return
-    mon->is_leader() &&
-    is_active() &&
-    is_lease_valid();
-}
-
-void Paxos::propose_pending()
-{
-  assert(is_active());
-  assert(pending_proposal);
-
-  cancel_events();
-
-  bufferlist bl;
-  pending_proposal->encode(bl);
-
-  dout(10) << __func__ << " " << (last_committed + 1)
-          << " " << bl.length() << " bytes" << dendl;
-  dout(30) << __func__ << " transaction dump:\n";
-  JSONFormatter f(true);
-  pending_proposal->dump(&f);
-  f.flush(*_dout);
-  *_dout << dendl;
-
-  pending_proposal.reset();
-
-  committing_finishers.swap(pending_finishers);
-  state = STATE_UPDATING;
-  begin(bl);
-}
-
-void Paxos::queue_pending_finisher(Context *onfinished)
-{
-  dout(5) << __func__ << " " << onfinished << dendl;
-  assert(onfinished);
-  pending_finishers.push_back(onfinished);
-}
-
-MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
-{
-  assert(mon->is_leader());
-  if (!pending_proposal) {
-    pending_proposal.reset(new MonitorDBStore::Transaction);
-    assert(pending_finishers.empty());
-  }
-  return pending_proposal;
-}
-
-bool Paxos::trigger_propose()
-{
-  if (plugged) {
-    dout(10) << __func__ << " plugged, not proposing now" << dendl;
-    return false;
-  } else if (is_active()) {
-    dout(10) << __func__ << " active, proposing now" << dendl;
-    propose_pending();
-    return true;
-  } else {
-    dout(10) << __func__ << " not active, will propose later" << dendl;
-    return false;
-  }
-}
-
-bool Paxos::is_consistent()
-{
-  return (first_committed <= last_committed);
-}
-