X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmon%2FPaxosService.cc;fp=src%2Fceph%2Fsrc%2Fmon%2FPaxosService.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=de732c322301956af95f9e7276495fb842f2467b;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/mon/PaxosService.cc b/src/ceph/src/mon/PaxosService.cc deleted file mode 100644 index de732c3..0000000 --- a/src/ceph/src/mon/PaxosService.cc +++ /dev/null @@ -1,442 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "PaxosService.h" -#include "common/Clock.h" -#include "common/config.h" -#include "include/stringify.h" -#include "include/assert.h" -#include "mon/MonOpRequest.h" - -#define dout_subsys ceph_subsys_paxos -#undef dout_prefix -#define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed()) -static ostream& _prefix(std::ostream *_dout, Monitor *mon, Paxos *paxos, string service_name, - version_t fc, version_t lc) { - return *_dout << "mon." << mon->name << "@" << mon->rank - << "(" << mon->get_state_name() - << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") "; -} - -bool PaxosService::dispatch(MonOpRequestRef op) -{ - assert(op->is_type_service() || op->is_type_command()); - PaxosServiceMessage *m = static_cast(op->get_req()); - op->mark_event("psvc:dispatch"); - - dout(10) << __func__ << " " << m << " " << *m - << " from " << m->get_orig_source_inst() - << " con " << m->get_connection() << dendl; - - if (mon->is_shutdown()) { - return true; - } - - // make sure this message isn't forwarded from a previous election epoch - if (m->rx_election_epoch && - m->rx_election_epoch < mon->get_epoch()) { - dout(10) << " discarding forwarded message from previous election epoch " - << m->rx_election_epoch << " < " << mon->get_epoch() << dendl; - return true; - } - - // make sure the client is still connected. note that a proxied - // connection will be disconnected with a null message; don't drop - // those. also ignore loopback (e.g., log) messages. - if (m->get_connection() && - !m->get_connection()->is_connected() && - m->get_connection() != mon->con_self && - m->get_connection()->get_messenger() != NULL) { - dout(10) << " discarding message from disconnected client " - << m->get_source_inst() << " " << *m << dendl; - return true; - } - - // make sure our map is readable and up to date - if (!is_readable(m->version)) { - dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl; - wait_for_readable(op, new C_RetryMessage(this, op), m->version); - return true; - } - - // preprocess - if (preprocess_query(op)) - return true; // easy! - - // leader? - if (!mon->is_leader()) { - mon->forward_request_leader(op); - return true; - } - - // writeable? - if (!is_writeable()) { - dout(10) << " waiting for paxos -> writeable" << dendl; - wait_for_writeable(op, new C_RetryMessage(this, op)); - return true; - } - - // update - if (!prepare_update(op)) { - // no changes made. - return true; - } - - if (need_immediate_propose) { - dout(10) << __func__ << " forced immediate propose" << dendl; - need_immediate_propose = false; - propose_pending(); - return true; - } - - double delay = 0.0; - if (!should_propose(delay)) { - dout(10) << " not proposing" << dendl; - return true; - } - - if (delay == 0.0) { - propose_pending(); - return true; - } - - // delay a bit - if (!proposal_timer) { - /** - * Callback class used to propose the pending value once the proposal_timer - * fires up. - */ - auto do_propose = new C_MonContext(mon, [this](int r) { - proposal_timer = 0; - if (r >= 0) { - propose_pending(); - } else if (r == -ECANCELED || r == -EAGAIN) { - return; - } else { - assert(0 == "bad return value for proposal_timer"); - } - }); - dout(10) << " setting proposal_timer " << do_propose - << " with delay of " << delay << dendl; - proposal_timer = mon->timer.add_event_after(delay, do_propose); - } else { - dout(10) << " proposal_timer already set" << dendl; - } - return true; -} - -void PaxosService::refresh(bool *need_bootstrap) -{ - // update cached versions - cached_first_committed = mon->store->get(get_service_name(), first_committed_name); - cached_last_committed = mon->store->get(get_service_name(), last_committed_name); - - version_t new_format = get_value("format_version"); - if (new_format != format_version) { - dout(1) << __func__ << " upgraded, format " << format_version << " -> " << new_format << dendl; - on_upgrade(); - } - format_version = new_format; - - dout(10) << __func__ << dendl; - - update_from_paxos(need_bootstrap); -} - -void PaxosService::post_refresh() -{ - dout(10) << __func__ << dendl; - - post_paxos_update(); - - if (mon->is_peon() && !waiting_for_finished_proposal.empty()) { - finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); - } -} - -bool PaxosService::should_propose(double& delay) -{ - // simple default policy: quick startup, then some damping. - if (get_last_committed() <= 1) { - delay = 0.0; - } else { - utime_t now = ceph_clock_now(); - if ((now - paxos->last_commit_time) > g_conf->paxos_propose_interval) - delay = (double)g_conf->paxos_min_wait; - else - delay = (double)(g_conf->paxos_propose_interval + paxos->last_commit_time - - now); - } - return true; -} - - -void PaxosService::propose_pending() -{ - dout(10) << __func__ << dendl; - assert(have_pending); - assert(!proposing); - assert(mon->is_leader()); - assert(is_active()); - - if (proposal_timer) { - dout(10) << " canceling proposal_timer " << proposal_timer << dendl; - mon->timer.cancel_event(proposal_timer); - proposal_timer = NULL; - } - - /** - * @note What we contribute to the pending Paxos transaction is - * obtained by calling a function that must be implemented by - * the class implementing us. I.e., the function - * encode_pending will be the one responsible to encode - * whatever is pending on the implementation class into a - * bufferlist, so we can then propose that as a value through - * Paxos. - */ - MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); - - if (should_stash_full()) - encode_full(t); - - encode_pending(t); - have_pending = false; - - if (format_version > 0) { - t->put(get_service_name(), "format_version", format_version); - } - - // apply to paxos - proposing = true; - /** - * Callback class used to mark us as active once a proposal finishes going - * through Paxos. - * - * We should wake people up *only* *after* we inform the service we - * just went active. And we should wake people up only once we finish - * going active. This is why we first go active, avoiding to wake up the - * wrong people at the wrong time, such as waking up a C_RetryMessage - * before waking up a C_Active, thus ending up without a pending value. - */ - class C_Committed : public Context { - PaxosService *ps; - public: - explicit C_Committed(PaxosService *p) : ps(p) { } - void finish(int r) override { - ps->proposing = false; - if (r >= 0) - ps->_active(); - else if (r == -ECANCELED || r == -EAGAIN) - return; - else - assert(0 == "bad return value for C_Committed"); - } - }; - paxos->queue_pending_finisher(new C_Committed(this)); - paxos->trigger_propose(); -} - -bool PaxosService::should_stash_full() -{ - version_t latest_full = get_version_latest_full(); - /* @note The first member of the condition is moot and it is here just for - * clarity's sake. The second member would end up returing true - * nonetheless because, in that event, - * latest_full == get_trim_to() == 0. - */ - return (!latest_full || - (latest_full <= get_trim_to()) || - (get_last_committed() - latest_full > (version_t)g_conf->paxos_stash_full_interval)); -} - -void PaxosService::restart() -{ - dout(10) << __func__ << dendl; - if (proposal_timer) { - dout(10) << " canceling proposal_timer " << proposal_timer << dendl; - mon->timer.cancel_event(proposal_timer); - proposal_timer = 0; - } - - finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); - - if (have_pending) { - discard_pending(); - have_pending = false; - } - proposing = false; - - on_restart(); -} - -void PaxosService::election_finished() -{ - dout(10) << __func__ << dendl; - - finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); - - // make sure we update our state - _active(); -} - -void PaxosService::_active() -{ - if (is_proposing()) { - dout(10) << __func__ << " - proposing" << dendl; - return; - } - if (!is_active()) { - dout(10) << __func__ << " - not active" << dendl; - /** - * Callback used to make sure we call the PaxosService::_active function - * whenever a condition is fulfilled. - * - * This is used in multiple situations, from waiting for the Paxos to commit - * our proposed value, to waiting for the Paxos to become active once an - * election is finished. - */ - class C_Active : public Context { - PaxosService *svc; - public: - explicit C_Active(PaxosService *s) : svc(s) {} - void finish(int r) override { - if (r >= 0) - svc->_active(); - } - }; - wait_for_active_ctx(new C_Active(this)); - return; - } - dout(10) << __func__ << dendl; - - // create pending state? - if (mon->is_leader()) { - dout(7) << __func__ << " creating new pending" << dendl; - if (!have_pending) { - create_pending(); - have_pending = true; - } - - if (get_last_committed() == 0) { - // create initial state - create_initial(); - propose_pending(); - return; - } - } else { - if (!mon->is_leader()) { - dout(7) << __func__ << " we are not the leader, hence we propose nothing!" << dendl; - } - } - - // wake up anyone who came in while we were proposing. note that - // anyone waiting for the previous proposal to commit is no longer - // on this list; it is on Paxos's. - finish_contexts(g_ceph_context, waiting_for_finished_proposal, 0); - - if (mon->is_leader()) - upgrade_format(); - - // NOTE: it's possible that this will get called twice if we commit - // an old paxos value. Implementations should be mindful of that. - on_active(); -} - - -void PaxosService::shutdown() -{ - cancel_events(); - - if (proposal_timer) { - dout(10) << " canceling proposal_timer " << proposal_timer << dendl; - mon->timer.cancel_event(proposal_timer); - proposal_timer = 0; - } - - finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); - - on_shutdown(); -} - -void PaxosService::maybe_trim() -{ - if (!is_writeable()) - return; - - version_t trim_to = get_trim_to(); - if (trim_to < get_first_committed()) - return; - - version_t to_remove = trim_to - get_first_committed(); - if (g_conf->paxos_service_trim_min > 0 && - to_remove < (version_t)g_conf->paxos_service_trim_min) { - dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove - << " < paxos_service_trim_min " << g_conf->paxos_service_trim_min << dendl; - return; - } - - if (g_conf->paxos_service_trim_max > 0 && - to_remove > (version_t)g_conf->paxos_service_trim_max) { - dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove - << " > paxos_service_trim_max, limiting to " << g_conf->paxos_service_trim_max - << dendl; - trim_to = get_first_committed() + g_conf->paxos_service_trim_max; - to_remove = g_conf->paxos_service_trim_max; - } - - dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl; - MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); - trim(t, get_first_committed(), trim_to); - put_first_committed(t, trim_to); - - // let the service add any extra stuff - encode_trim_extra(t, trim_to); - - paxos->trigger_propose(); -} - -void PaxosService::trim(MonitorDBStore::TransactionRef t, - version_t from, version_t to) -{ - dout(10) << __func__ << " from " << from << " to " << to << dendl; - assert(from != to); - - for (version_t v = from; v < to; ++v) { - dout(20) << __func__ << " " << v << dendl; - t->erase(get_service_name(), v); - - string full_key = mon->store->combine_strings("full", v); - if (mon->store->exists(get_service_name(), full_key)) { - dout(20) << __func__ << " " << full_key << dendl; - t->erase(get_service_name(), full_key); - } - } - if (g_conf->mon_compact_on_trim) { - dout(20) << " compacting prefix " << get_service_name() << dendl; - t->compact_range(get_service_name(), stringify(from - 1), stringify(to)); - t->compact_range(get_service_name(), - mon->store->combine_strings(full_prefix_name, from - 1), - mon->store->combine_strings(full_prefix_name, to)); - } -} - -void PaxosService::load_health() -{ - bufferlist bl; - mon->store->get("health", service_name, bl); - if (bl.length()) { - auto p = bl.begin(); - ::decode(health_checks, p); - } -}