Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / Paxos.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include <sstream>
16 #include "Paxos.h"
17 #include "Monitor.h"
18 #include "messages/MMonPaxos.h"
19
20 #include "mon/mon_types.h"
21 #include "common/config.h"
22 #include "include/assert.h"
23 #include "include/stringify.h"
24 #include "common/Timer.h"
25 #include "messages/PaxosServiceMessage.h"
26
27 #define dout_subsys ceph_subsys_paxos
28 #undef dout_prefix
29 #define dout_prefix _prefix(_dout, mon, mon->name, mon->rank, paxos_name, state, first_committed, last_committed)
30 static ostream& _prefix(std::ostream *_dout, Monitor *mon, const string& name,
31                         int rank, const string& paxos_name, int state,
32                         version_t first_committed, version_t last_committed)
33 {
34   return *_dout << "mon." << name << "@" << rank
35                 << "(" << mon->get_state_name() << ")"
36                 << ".paxos(" << paxos_name << " " << Paxos::get_statename(state)
37                 << " c " << first_committed << ".." << last_committed
38                 << ") ";
39 }
40
41 class Paxos::C_Trimmed : public Context {
42   Paxos *paxos;
43 public:
44   explicit C_Trimmed(Paxos *p) : paxos(p) { }
45   void finish(int r) override {
46     paxos->trimming = false;
47   }
48 };
49
50 MonitorDBStore *Paxos::get_store()
51 {
52   return mon->store;
53 }
54
55 void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx,
56                                           version_t first, version_t last)
57 {
58   dout(10) << __func__ << " first " << first << " last " << last << dendl;
59   for (version_t v = first; v <= last; ++v) {
60     dout(30) << __func__ << " apply version " << v << dendl;
61     bufferlist bl;
62     int err = get_store()->get(get_name(), v, bl);
63     assert(err == 0);
64     assert(bl.length());
65     decode_append_transaction(tx, bl);
66   }
67   dout(15) << __func__ << " total versions " << (last-first) << dendl;
68 }
69
70 void Paxos::init()
71 {
72   // load paxos variables from stable storage
73   last_pn = get_store()->get(get_name(), "last_pn");
74   accepted_pn = get_store()->get(get_name(), "accepted_pn");
75   last_committed = get_store()->get(get_name(), "last_committed");
76   first_committed = get_store()->get(get_name(), "first_committed");
77
78   dout(10) << __func__ << " last_pn: " << last_pn << " accepted_pn: "
79            << accepted_pn << " last_committed: " << last_committed
80            << " first_committed: " << first_committed << dendl;
81
82   dout(10) << "init" << dendl;
83   assert(is_consistent());
84 }
85
86 void Paxos::init_logger()
87 {
88   PerfCountersBuilder pcb(g_ceph_context, "paxos", l_paxos_first, l_paxos_last);
89
90   // Because monitors are so few in number, the resource cost of capturing
91   // almost all their perf counters at USEFUL is trivial.
92   pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
93
94   pcb.add_u64_counter(l_paxos_start_leader, "start_leader", "Starts in leader role");
95   pcb.add_u64_counter(l_paxos_start_peon, "start_peon", "Starts in peon role");
96   pcb.add_u64_counter(l_paxos_restart, "restart", "Restarts");
97   pcb.add_u64_counter(l_paxos_refresh, "refresh", "Refreshes");
98   pcb.add_time_avg(l_paxos_refresh_latency, "refresh_latency", "Refresh latency");
99   pcb.add_u64_counter(l_paxos_begin, "begin", "Started and handled begins");
100   pcb.add_u64_avg(l_paxos_begin_keys, "begin_keys", "Keys in transaction on begin");
101   pcb.add_u64_avg(l_paxos_begin_bytes, "begin_bytes", "Data in transaction on begin");
102   pcb.add_time_avg(l_paxos_begin_latency, "begin_latency", "Latency of begin operation");
103   pcb.add_u64_counter(l_paxos_commit, "commit",
104       "Commits", "cmt");
105   pcb.add_u64_avg(l_paxos_commit_keys, "commit_keys", "Keys in transaction on commit");
106   pcb.add_u64_avg(l_paxos_commit_bytes, "commit_bytes", "Data in transaction on commit");
107   pcb.add_time_avg(l_paxos_commit_latency, "commit_latency",
108       "Commit latency", "clat");
109   pcb.add_u64_counter(l_paxos_collect, "collect", "Peon collects");
110   pcb.add_u64_avg(l_paxos_collect_keys, "collect_keys", "Keys in transaction on peon collect");
111   pcb.add_u64_avg(l_paxos_collect_bytes, "collect_bytes", "Data in transaction on peon collect");
112   pcb.add_time_avg(l_paxos_collect_latency, "collect_latency", "Peon collect latency");
113   pcb.add_u64_counter(l_paxos_collect_uncommitted, "collect_uncommitted", "Uncommitted values in started and handled collects");
114   pcb.add_u64_counter(l_paxos_collect_timeout, "collect_timeout", "Collect timeouts");
115   pcb.add_u64_counter(l_paxos_accept_timeout, "accept_timeout", "Accept timeouts");
116   pcb.add_u64_counter(l_paxos_lease_ack_timeout, "lease_ack_timeout", "Lease acknowledgement timeouts");
117   pcb.add_u64_counter(l_paxos_lease_timeout, "lease_timeout", "Lease timeouts");
118   pcb.add_u64_counter(l_paxos_store_state, "store_state", "Store a shared state on disk");
119   pcb.add_u64_avg(l_paxos_store_state_keys, "store_state_keys", "Keys in transaction in stored state");
120   pcb.add_u64_avg(l_paxos_store_state_bytes, "store_state_bytes", "Data in transaction in stored state");
121   pcb.add_time_avg(l_paxos_store_state_latency, "store_state_latency", "Storing state latency");
122   pcb.add_u64_counter(l_paxos_share_state, "share_state", "Sharings of state");
123   pcb.add_u64_avg(l_paxos_share_state_keys, "share_state_keys", "Keys in shared state");
124   pcb.add_u64_avg(l_paxos_share_state_bytes, "share_state_bytes", "Data in shared state");
125   pcb.add_u64_counter(l_paxos_new_pn, "new_pn", "New proposal number queries");
126   pcb.add_time_avg(l_paxos_new_pn_latency, "new_pn_latency", "New proposal number getting latency");
127   logger = pcb.create_perf_counters();
128   g_ceph_context->get_perfcounters_collection()->add(logger);
129 }
130
131 void Paxos::dump_info(Formatter *f)
132 {
133   f->open_object_section("paxos");
134   f->dump_unsigned("first_committed", first_committed);
135   f->dump_unsigned("last_committed", last_committed);
136   f->dump_unsigned("last_pn", last_pn);
137   f->dump_unsigned("accepted_pn", accepted_pn);
138   f->close_section();
139 }
140
141 // ---------------------------------
142
143 // PHASE 1
144
145 // leader
146 void Paxos::collect(version_t oldpn)
147 {
148   // we're recoverying, it seems!
149   state = STATE_RECOVERING;
150   assert(mon->is_leader());
151
152   // reset the number of lasts received
153   uncommitted_v = 0;
154   uncommitted_pn = 0;
155   uncommitted_value.clear();
156   peer_first_committed.clear();
157   peer_last_committed.clear();
158
159   // look for uncommitted value
160   if (get_store()->exists(get_name(), last_committed+1)) {
161     version_t v = get_store()->get(get_name(), "pending_v");
162     version_t pn = get_store()->get(get_name(), "pending_pn");
163     if (v && pn && v == last_committed + 1) {
164       uncommitted_pn = pn;
165     } else {
166       dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
167                << " and crossing our fingers" << dendl;
168       uncommitted_pn = accepted_pn;
169     }
170     uncommitted_v = last_committed+1;
171
172     get_store()->get(get_name(), last_committed+1, uncommitted_value);
173     assert(uncommitted_value.length());
174     dout(10) << "learned uncommitted " << (last_committed+1)
175              << " pn " << uncommitted_pn
176              << " (" << uncommitted_value.length() << " bytes) from myself" 
177              << dendl;
178
179     logger->inc(l_paxos_collect_uncommitted);
180   }
181
182   // pick new pn
183   accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
184   accepted_pn_from = last_committed;
185   num_last = 1;
186   dout(10) << "collect with pn " << accepted_pn << dendl;
187
188   // send collect
189   for (set<int>::const_iterator p = mon->get_quorum().begin();
190        p != mon->get_quorum().end();
191        ++p) {
192     if (*p == mon->rank) continue;
193     
194     MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
195                                        ceph_clock_now());
196     collect->last_committed = last_committed;
197     collect->first_committed = first_committed;
198     collect->pn = accepted_pn;
199     mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
200   }
201
202   // set timeout event
203   collect_timeout_event = mon->timer.add_event_after(
204     g_conf->mon_accept_timeout_factor *
205     g_conf->mon_lease,
206     new C_MonContext(mon, [this](int r) {
207         if (r == -ECANCELED)
208           return;
209         collect_timeout();
210     }));
211 }
212
213
214 // peon
215 void Paxos::handle_collect(MonOpRequestRef op)
216 {
217   
218   op->mark_paxos_event("handle_collect");
219
220   MMonPaxos *collect = static_cast<MMonPaxos*>(op->get_req());
221   dout(10) << "handle_collect " << *collect << dendl;
222
223   assert(mon->is_peon()); // mon epoch filter should catch strays
224
225   // we're recoverying, it seems!
226   state = STATE_RECOVERING;
227
228   //update the peon recovery timeout 
229   reset_lease_timeout();
230
231   if (collect->first_committed > last_committed+1) {
232     dout(2) << __func__
233             << " leader's lowest version is too high for our last committed"
234             << " (theirs: " << collect->first_committed
235             << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
236     op->mark_paxos_event("need to bootstrap");
237     mon->bootstrap();
238     return;
239   }
240
241   // reply
242   MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST,
243                                   ceph_clock_now());
244   last->last_committed = last_committed;
245   last->first_committed = first_committed;
246   
247   version_t previous_pn = accepted_pn;
248
249   // can we accept this pn?
250   if (collect->pn > accepted_pn) {
251     // ok, accept it
252     accepted_pn = collect->pn;
253     accepted_pn_from = collect->pn_from;
254     dout(10) << "accepting pn " << accepted_pn << " from " 
255              << accepted_pn_from << dendl;
256   
257     auto t(std::make_shared<MonitorDBStore::Transaction>());
258     t->put(get_name(), "accepted_pn", accepted_pn);
259
260     dout(30) << __func__ << " transaction dump:\n";
261     JSONFormatter f(true);
262     t->dump(&f);
263     f.flush(*_dout);
264     *_dout << dendl;
265
266     logger->inc(l_paxos_collect);
267     logger->inc(l_paxos_collect_keys, t->get_keys());
268     logger->inc(l_paxos_collect_bytes, t->get_bytes());
269     utime_t start = ceph_clock_now();
270
271     get_store()->apply_transaction(t);
272
273     utime_t end = ceph_clock_now();
274     logger->tinc(l_paxos_collect_latency, end - start);
275   } else {
276     // don't accept!
277     dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
278              << ", we already accepted " << accepted_pn
279              << " from " << accepted_pn_from << dendl;
280   }
281   last->pn = accepted_pn;
282   last->pn_from = accepted_pn_from;
283
284   // share whatever committed values we have
285   if (collect->last_committed < last_committed)
286     share_state(last, collect->first_committed, collect->last_committed);
287
288   // do we have an accepted but uncommitted value?
289   //  (it'll be at last_committed+1)
290   bufferlist bl;
291   if (collect->last_committed <= last_committed &&
292       get_store()->exists(get_name(), last_committed+1)) {
293     get_store()->get(get_name(), last_committed+1, bl);
294     assert(bl.length() > 0);
295     dout(10) << " sharing our accepted but uncommitted value for " 
296              << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
297     last->values[last_committed+1] = bl;
298
299     version_t v = get_store()->get(get_name(), "pending_v");
300     version_t pn = get_store()->get(get_name(), "pending_pn");
301     if (v && pn && v == last_committed + 1) {
302       last->uncommitted_pn = pn;
303     } else {
304       // previously we didn't record which pn a value was accepted
305       // under!  use the pn value we just had...  :(
306       dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
307                << " and crossing our fingers" << dendl;
308       last->uncommitted_pn = previous_pn;
309     }
310
311     logger->inc(l_paxos_collect_uncommitted);
312   }
313
314   // send reply
315   collect->get_connection()->send_message(last);
316 }
317
318 /**
319  * @note This is Okay. We share our versions between peer_last_committed and
320  *       our last_committed (inclusive), and add their bufferlists to the
321  *       message. It will be the peer's job to apply them to its store, as
322  *       these bufferlists will contain raw transactions.
323  *       This function is called by both the Peon and the Leader. The Peon will
324  *       share the state with the Leader during handle_collect(), sharing any
325  *       values the leader may be missing (i.e., the leader's last_committed is
326  *       lower than the peon's last_committed). The Leader will share the state
327  *       with the Peon during handle_last(), if the peon's last_committed is
328  *       lower than the leader's last_committed.
329  */
330 void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
331                         version_t peer_last_committed)
332 {
333   assert(peer_last_committed < last_committed);
334
335   dout(10) << "share_state peer has fc " << peer_first_committed 
336            << " lc " << peer_last_committed << dendl;
337   version_t v = peer_last_committed + 1;
338
339   // include incrementals
340   uint64_t bytes = 0;
341   for ( ; v <= last_committed; v++) {
342     if (get_store()->exists(get_name(), v)) {
343       get_store()->get(get_name(), v, m->values[v]);
344       assert(m->values[v].length());
345       dout(10) << " sharing " << v << " ("
346                << m->values[v].length() << " bytes)" << dendl;
347       bytes += m->values[v].length() + 16;  // paxos_ + 10 digits = 16
348     }
349   }
350   logger->inc(l_paxos_share_state);
351   logger->inc(l_paxos_share_state_keys, m->values.size());
352   logger->inc(l_paxos_share_state_bytes, bytes);
353
354   m->last_committed = last_committed;
355 }
356
357 /**
358  * Store on disk a state that was shared with us
359  *
360  * Basically, we received a set of version. Or just one. It doesn't matter.
361  * What matters is that we have to stash it in the store. So, we will simply
362  * write every single bufferlist into their own versions on our side (i.e.,
363  * onto paxos-related keys), and then we will decode those same bufferlists
364  * we just wrote and apply the transactions they hold. We will also update
365  * our first and last committed values to point to the new values, if need
366  * be. All all this is done tightly wrapped in a transaction to ensure we
367  * enjoy the atomicity guarantees given by our awesome k/v store.
368  */
369 bool Paxos::store_state(MMonPaxos *m)
370 {
371   auto t(std::make_shared<MonitorDBStore::Transaction>());
372   map<version_t,bufferlist>::iterator start = m->values.begin();
373   bool changed = false;
374
375   // build map of values to store
376   // we want to write the range [last_committed, m->last_committed] only.
377   if (start != m->values.end() &&
378       start->first > last_committed + 1) {
379     // ignore everything if values start in the future.
380     dout(10) << "store_state ignoring all values, they start at " << start->first
381              << " > last_committed+1" << dendl;
382     return false;
383   }
384
385   // push forward the start position on the message's values iterator, up until
386   // we run out of positions or we find a position matching 'last_committed'.
387   while (start != m->values.end() && start->first <= last_committed) {
388     ++start;
389   }
390
391   // make sure we get the right interval of values to apply by pushing forward
392   // the 'end' iterator until it matches the message's 'last_committed'.
393   map<version_t,bufferlist>::iterator end = start;
394   while (end != m->values.end() && end->first <= m->last_committed) {
395     last_committed = end->first;
396     ++end;
397   }
398
399   if (start == end) {
400     dout(10) << "store_state nothing to commit" << dendl;
401   } else {
402     dout(10) << "store_state [" << start->first << ".." 
403              << last_committed << "]" << dendl;
404     t->put(get_name(), "last_committed", last_committed);
405
406     // we should apply the state here -- decode every single bufferlist in the
407     // map and append the transactions to 't'.
408     map<version_t,bufferlist>::iterator it;
409     for (it = start; it != end; ++it) {
410       // write the bufferlist as the version's value
411       t->put(get_name(), it->first, it->second);
412       // decode the bufferlist and append it to the transaction we will shortly
413       // apply.
414       decode_append_transaction(t, it->second);
415     }
416
417     // discard obsolete uncommitted value?
418     if (uncommitted_v && uncommitted_v <= last_committed) {
419       dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v
420                << " pn " << uncommitted_pn << dendl;
421       uncommitted_v = 0;
422       uncommitted_pn = 0;
423       uncommitted_value.clear();
424     }
425   }
426   if (!t->empty()) {
427     dout(30) << __func__ << " transaction dump:\n";
428     JSONFormatter f(true);
429     t->dump(&f);
430     f.flush(*_dout);
431     *_dout << dendl;
432
433     logger->inc(l_paxos_store_state);
434     logger->inc(l_paxos_store_state_bytes, t->get_bytes());
435     logger->inc(l_paxos_store_state_keys, t->get_keys());
436     utime_t start = ceph_clock_now();
437
438     get_store()->apply_transaction(t);
439
440     utime_t end = ceph_clock_now();
441     logger->tinc(l_paxos_store_state_latency, end - start);
442
443     // refresh first_committed; this txn may have trimmed.
444     first_committed = get_store()->get(get_name(), "first_committed");
445
446     _sanity_check_store();
447     changed = true;
448   }
449
450   return changed;
451 }
452
453 void Paxos::_sanity_check_store()
454 {
455   version_t lc = get_store()->get(get_name(), "last_committed");
456   assert(lc == last_committed);
457 }
458
459
460 // leader
461 void Paxos::handle_last(MonOpRequestRef op)
462 {
463   op->mark_paxos_event("handle_last");
464   MMonPaxos *last = static_cast<MMonPaxos*>(op->get_req());
465   bool need_refresh = false;
466   int from = last->get_source().num();
467
468   dout(10) << "handle_last " << *last << dendl;
469
470   if (!mon->is_leader()) {
471     dout(10) << "not leader, dropping" << dendl;
472     return;
473   }
474
475   // note peer's first_ and last_committed, in case we learn a new
476   // commit and need to push it to them.
477   peer_first_committed[from] = last->first_committed;
478   peer_last_committed[from] = last->last_committed;
479
480   if (last->first_committed > last_committed + 1) {
481     dout(5) << __func__
482             << " mon." << from
483             << " lowest version is too high for our last committed"
484             << " (theirs: " << last->first_committed
485             << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
486     op->mark_paxos_event("need to bootstrap");
487     mon->bootstrap();
488     return;
489   }
490
491   assert(g_conf->paxos_kill_at != 1);
492
493   // store any committed values if any are specified in the message
494   need_refresh = store_state(last);
495
496   assert(g_conf->paxos_kill_at != 2);
497
498   // is everyone contiguous and up to date?
499   for (map<int,version_t>::iterator p = peer_last_committed.begin();
500        p != peer_last_committed.end();
501        ++p) {
502     if (p->second + 1 < first_committed && first_committed > 1) {
503       dout(5) << __func__
504               << " peon " << p->first
505               << " last_committed (" << p->second
506               << ") is too low for our first_committed (" << first_committed
507               << ") -- bootstrap!" << dendl;
508       op->mark_paxos_event("need to bootstrap");
509       mon->bootstrap();
510       return;
511     }
512     if (p->second < last_committed) {
513       // share committed values
514       dout(10) << " sending commit to mon." << p->first << dendl;
515       MMonPaxos *commit = new MMonPaxos(mon->get_epoch(),
516                                         MMonPaxos::OP_COMMIT,
517                                         ceph_clock_now());
518       share_state(commit, peer_first_committed[p->first], p->second);
519       mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
520     }
521   }
522
523   // do they accept your pn?
524   if (last->pn > accepted_pn) {
525     // no, try again.
526     dout(10) << " they had a higher pn than us, picking a new one." << dendl;
527
528     // cancel timeout event
529     mon->timer.cancel_event(collect_timeout_event);
530     collect_timeout_event = 0;
531
532     collect(last->pn);
533   } else if (last->pn == accepted_pn) {
534     // yes, they accepted our pn.  great.
535     num_last++;
536     dout(10) << " they accepted our pn, we now have " 
537              << num_last << " peons" << dendl;
538
539     // did this person send back an accepted but uncommitted value?
540     if (last->uncommitted_pn) {
541       if (last->uncommitted_pn >= uncommitted_pn &&
542           last->last_committed >= last_committed &&
543           last->last_committed + 1 >= uncommitted_v) {
544         uncommitted_v = last->last_committed+1;
545         uncommitted_pn = last->uncommitted_pn;
546         uncommitted_value = last->values[uncommitted_v];
547         dout(10) << "we learned an uncommitted value for " << uncommitted_v
548                  << " pn " << uncommitted_pn
549                  << " " << uncommitted_value.length() << " bytes"
550                  << dendl;
551       } else {
552         dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
553                  << " pn " << last->uncommitted_pn
554                  << " " << last->values[last->last_committed+1].length() << " bytes"
555                  << dendl;
556       }
557     }
558     
559     // is that everyone?
560     if (num_last == mon->get_quorum().size()) {
561       // cancel timeout event
562       mon->timer.cancel_event(collect_timeout_event);
563       collect_timeout_event = 0;
564       peer_first_committed.clear();
565       peer_last_committed.clear();
566
567       // almost...
568
569       // did we learn an old value?
570       if (uncommitted_v == last_committed+1 &&
571           uncommitted_value.length()) {
572         dout(10) << "that's everyone.  begin on old learned value" << dendl;
573         state = STATE_UPDATING_PREVIOUS;
574         begin(uncommitted_value);
575       } else {
576         // active!
577         dout(10) << "that's everyone.  active!" << dendl;
578         extend_lease();
579
580         need_refresh = false;
581         if (do_refresh()) {
582           finish_round();
583         }
584       }
585     }
586   } else {
587     // no, this is an old message, discard
588     dout(10) << "old pn, ignoring" << dendl;
589   }
590
591   if (need_refresh)
592     (void)do_refresh();
593 }
594
595 void Paxos::collect_timeout()
596 {
597   dout(1) << "collect timeout, calling fresh election" << dendl;
598   collect_timeout_event = 0;
599   logger->inc(l_paxos_collect_timeout);
600   assert(mon->is_leader());
601   mon->bootstrap();
602 }
603
604
605 // leader
606 void Paxos::begin(bufferlist& v)
607 {
608   dout(10) << "begin for " << last_committed+1 << " " 
609            << v.length() << " bytes"
610            << dendl;
611
612   assert(mon->is_leader());
613   assert(is_updating() || is_updating_previous());
614
615   // we must already have a majority for this to work.
616   assert(mon->get_quorum().size() == 1 ||
617          num_last > (unsigned)mon->monmap->size()/2);
618   
619   // and no value, yet.
620   assert(new_value.length() == 0);
621
622   // accept it ourselves
623   accepted.clear();
624   accepted.insert(mon->rank);
625   new_value = v;
626
627   if (last_committed == 0) {
628     auto t(std::make_shared<MonitorDBStore::Transaction>());
629     // initial base case; set first_committed too
630     t->put(get_name(), "first_committed", 1);
631     decode_append_transaction(t, new_value);
632
633     bufferlist tx_bl;
634     t->encode(tx_bl);
635
636     new_value = tx_bl;
637   }
638
639   // store the proposed value in the store. IF it is accepted, we will then
640   // have to decode it into a transaction and apply it.
641   auto t(std::make_shared<MonitorDBStore::Transaction>());
642   t->put(get_name(), last_committed+1, new_value);
643
644   // note which pn this pending value is for.
645   t->put(get_name(), "pending_v", last_committed + 1);
646   t->put(get_name(), "pending_pn", accepted_pn);
647
648   dout(30) << __func__ << " transaction dump:\n";
649   JSONFormatter f(true);
650   t->dump(&f);
651   f.flush(*_dout);
652   auto debug_tx(std::make_shared<MonitorDBStore::Transaction>());
653   bufferlist::iterator new_value_it = new_value.begin();
654   debug_tx->decode(new_value_it);
655   debug_tx->dump(&f);
656   *_dout << "\nbl dump:\n";
657   f.flush(*_dout);
658   *_dout << dendl;
659
660   logger->inc(l_paxos_begin);
661   logger->inc(l_paxos_begin_keys, t->get_keys());
662   logger->inc(l_paxos_begin_bytes, t->get_bytes());
663   utime_t start = ceph_clock_now();
664
665   get_store()->apply_transaction(t);
666
667   utime_t end = ceph_clock_now();
668   logger->tinc(l_paxos_begin_latency, end - start);
669
670   assert(g_conf->paxos_kill_at != 3);
671
672   if (mon->get_quorum().size() == 1) {
673     // we're alone, take it easy
674     commit_start();
675     return;
676   }
677
678   // ask others to accept it too!
679   for (set<int>::const_iterator p = mon->get_quorum().begin();
680        p != mon->get_quorum().end();
681        ++p) {
682     if (*p == mon->rank) continue;
683     
684     dout(10) << " sending begin to mon." << *p << dendl;
685     MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
686                                      ceph_clock_now());
687     begin->values[last_committed+1] = new_value;
688     begin->last_committed = last_committed;
689     begin->pn = accepted_pn;
690     
691     mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
692   }
693
694   // set timeout event
695   accept_timeout_event = mon->timer.add_event_after(
696     g_conf->mon_accept_timeout_factor * g_conf->mon_lease,
697     new C_MonContext(mon, [this](int r) {
698         if (r == -ECANCELED)
699           return;
700         accept_timeout();
701       }));
702 }
703
704 // peon
705 void Paxos::handle_begin(MonOpRequestRef op)
706 {
707   op->mark_paxos_event("handle_begin");
708   MMonPaxos *begin = static_cast<MMonPaxos*>(op->get_req());
709   dout(10) << "handle_begin " << *begin << dendl;
710
711   // can we accept this?
712   if (begin->pn < accepted_pn) {
713     dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
714     op->mark_paxos_event("have higher pn, ignore");
715     return;
716   }
717   assert(begin->pn == accepted_pn);
718   assert(begin->last_committed == last_committed);
719   
720   assert(g_conf->paxos_kill_at != 4);
721
722   logger->inc(l_paxos_begin);
723
724   // set state.
725   state = STATE_UPDATING;
726   lease_expire = utime_t();  // cancel lease
727
728   // yes.
729   version_t v = last_committed+1;
730   dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
731   // store the accepted value onto our store. We will have to decode it and
732   // apply its transaction once we receive permission to commit.
733   auto t(std::make_shared<MonitorDBStore::Transaction>());
734   t->put(get_name(), v, begin->values[v]);
735
736   // note which pn this pending value is for.
737   t->put(get_name(), "pending_v", v);
738   t->put(get_name(), "pending_pn", accepted_pn);
739
740   dout(30) << __func__ << " transaction dump:\n";
741   JSONFormatter f(true);
742   t->dump(&f);
743   f.flush(*_dout);
744   *_dout << dendl;
745
746   logger->inc(l_paxos_begin_bytes, t->get_bytes());
747   utime_t start = ceph_clock_now();
748
749   get_store()->apply_transaction(t);
750
751   utime_t end = ceph_clock_now();
752   logger->tinc(l_paxos_begin_latency, end - start);
753
754   assert(g_conf->paxos_kill_at != 5);
755
756   // reply
757   MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
758                                     ceph_clock_now());
759   accept->pn = accepted_pn;
760   accept->last_committed = last_committed;
761   begin->get_connection()->send_message(accept);
762 }
763
764 // leader
765 void Paxos::handle_accept(MonOpRequestRef op)
766 {
767   op->mark_paxos_event("handle_accept");
768   MMonPaxos *accept = static_cast<MMonPaxos*>(op->get_req());
769   dout(10) << "handle_accept " << *accept << dendl;
770   int from = accept->get_source().num();
771
772   if (accept->pn != accepted_pn) {
773     // we accepted a higher pn, from some other leader
774     dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
775     op->mark_paxos_event("have higher pn, ignore");
776     return;
777   }
778   if (last_committed > 0 &&
779       accept->last_committed < last_committed-1) {
780     dout(10) << " this is from an old round, ignoring" << dendl;
781     op->mark_paxos_event("old round, ignore");
782     return;
783   }
784   assert(accept->last_committed == last_committed ||   // not committed
785          accept->last_committed == last_committed-1);  // committed
786
787   assert(is_updating() || is_updating_previous());
788   assert(accepted.count(from) == 0);
789   accepted.insert(from);
790   dout(10) << " now " << accepted << " have accepted" << dendl;
791
792   assert(g_conf->paxos_kill_at != 6);
793
794   // only commit (and expose committed state) when we get *all* quorum
795   // members to accept.  otherwise, they may still be sharing the now
796   // stale state.
797   // FIXME: we can improve this with an additional lease revocation message
798   // that doesn't block for the persist.
799   if (accepted == mon->get_quorum()) {
800     // yay, commit!
801     dout(10) << " got majority, committing, done with update" << dendl;
802     op->mark_paxos_event("commit_start");
803     commit_start();
804   }
805 }
806
807 void Paxos::accept_timeout()
808 {
809   dout(1) << "accept timeout, calling fresh election" << dendl;
810   accept_timeout_event = 0;
811   assert(mon->is_leader());
812   assert(is_updating() || is_updating_previous() || is_writing() ||
813          is_writing_previous());
814   logger->inc(l_paxos_accept_timeout);
815   mon->bootstrap();
816 }
817
818 struct C_Committed : public Context {
819   Paxos *paxos;
820   explicit C_Committed(Paxos *p) : paxos(p) {}
821   void finish(int r) override {
822     assert(r >= 0);
823     Mutex::Locker l(paxos->mon->lock);
824     if (paxos->is_shutdown()) {
825       paxos->abort_commit();
826       return;
827     }
828     paxos->commit_finish();
829   }
830 };
831
832 void Paxos::abort_commit()
833 {
834   assert(commits_started > 0);
835   --commits_started;
836   if (commits_started == 0)
837     shutdown_cond.Signal();
838 }
839
840 void Paxos::commit_start()
841 {
842   dout(10) << __func__ << " " << (last_committed+1) << dendl;
843
844   assert(g_conf->paxos_kill_at != 7);
845
846   auto t(std::make_shared<MonitorDBStore::Transaction>());
847
848   // commit locally
849   t->put(get_name(), "last_committed", last_committed + 1);
850
851   // decode the value and apply its transaction to the store.
852   // this value can now be read from last_committed.
853   decode_append_transaction(t, new_value);
854
855   dout(30) << __func__ << " transaction dump:\n";
856   JSONFormatter f(true);
857   t->dump(&f);
858   f.flush(*_dout);
859   *_dout << dendl;
860
861   logger->inc(l_paxos_commit);
862   logger->inc(l_paxos_commit_keys, t->get_keys());
863   logger->inc(l_paxos_commit_bytes, t->get_bytes());
864   commit_start_stamp = ceph_clock_now();
865
866   get_store()->queue_transaction(t, new C_Committed(this));
867
868   if (is_updating_previous())
869     state = STATE_WRITING_PREVIOUS;
870   else if (is_updating())
871     state = STATE_WRITING;
872   else
873     ceph_abort();
874   ++commits_started;
875
876   if (mon->get_quorum().size() > 1) {
877     // cancel timeout event
878     mon->timer.cancel_event(accept_timeout_event);
879     accept_timeout_event = 0;
880   }
881 }
882
883 void Paxos::commit_finish()
884 {
885   dout(20) << __func__ << " " << (last_committed+1) << dendl;
886   utime_t end = ceph_clock_now();
887   logger->tinc(l_paxos_commit_latency, end - commit_start_stamp);
888
889   assert(g_conf->paxos_kill_at != 8);
890
891   // cancel lease - it was for the old value.
892   //  (this would only happen if message layer lost the 'begin', but
893   //   leader still got a majority and committed with out us.)
894   lease_expire = utime_t();  // cancel lease
895
896   last_committed++;
897   last_commit_time = ceph_clock_now();
898
899   // refresh first_committed; this txn may have trimmed.
900   first_committed = get_store()->get(get_name(), "first_committed");
901
902   _sanity_check_store();
903
904   // tell everyone
905   for (set<int>::const_iterator p = mon->get_quorum().begin();
906        p != mon->get_quorum().end();
907        ++p) {
908     if (*p == mon->rank) continue;
909
910     dout(10) << " sending commit to mon." << *p << dendl;
911     MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
912                                       ceph_clock_now());
913     commit->values[last_committed] = new_value;
914     commit->pn = accepted_pn;
915     commit->last_committed = last_committed;
916
917     mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
918   }
919
920   assert(g_conf->paxos_kill_at != 9);
921
922   // get ready for a new round.
923   new_value.clear();
924
925   // WRITING -> REFRESH
926   // among other things, this lets do_refresh() -> mon->bootstrap() know
927   // it doesn't need to flush the store queue
928   assert(is_writing() || is_writing_previous());
929   state = STATE_REFRESH;
930   assert(commits_started > 0);
931   --commits_started;
932
933   if (do_refresh()) {
934     commit_proposal();
935     if (mon->get_quorum().size() > 1) {
936       extend_lease();
937     }
938
939     finish_contexts(g_ceph_context, waiting_for_commit);
940
941     assert(g_conf->paxos_kill_at != 10);
942
943     finish_round();
944   }
945 }
946
947
948 void Paxos::handle_commit(MonOpRequestRef op)
949 {
950   op->mark_paxos_event("handle_commit");
951   MMonPaxos *commit = static_cast<MMonPaxos*>(op->get_req());
952   dout(10) << "handle_commit on " << commit->last_committed << dendl;
953
954   logger->inc(l_paxos_commit);
955
956   if (!mon->is_peon()) {
957     dout(10) << "not a peon, dropping" << dendl;
958     ceph_abort();
959     return;
960   }
961
962   op->mark_paxos_event("store_state");
963   store_state(commit);
964
965   if (do_refresh()) {
966     finish_contexts(g_ceph_context, waiting_for_commit);
967   }
968 }
969
970 void Paxos::extend_lease()
971 {
972   assert(mon->is_leader());
973   //assert(is_active());
974
975   lease_expire = ceph_clock_now();
976   lease_expire += g_conf->mon_lease;
977   acked_lease.clear();
978   acked_lease.insert(mon->rank);
979
980   dout(7) << "extend_lease now+" << g_conf->mon_lease 
981           << " (" << lease_expire << ")" << dendl;
982
983   // bcast
984   for (set<int>::const_iterator p = mon->get_quorum().begin();
985       p != mon->get_quorum().end(); ++p) {
986
987     if (*p == mon->rank) continue;
988     MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE,
989                                      ceph_clock_now());
990     lease->last_committed = last_committed;
991     lease->lease_timestamp = lease_expire;
992     lease->first_committed = first_committed;
993     mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
994   }
995
996   // set timeout event.
997   //  if old timeout is still in place, leave it.
998   if (!lease_ack_timeout_event) {
999     lease_ack_timeout_event = mon->timer.add_event_after(
1000       g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
1001       new C_MonContext(mon, [this](int r) {
1002           if (r == -ECANCELED)
1003             return;
1004           lease_ack_timeout();
1005         }));
1006   }
1007
1008   // set renew event
1009   utime_t at = lease_expire;
1010   at -= g_conf->mon_lease;
1011   at += g_conf->mon_lease_renew_interval_factor * g_conf->mon_lease;
1012   lease_renew_event = mon->timer.add_event_at(
1013     at, new C_MonContext(mon, [this](int r) {
1014         if (r == -ECANCELED)
1015           return;
1016         lease_renew_timeout();
1017     }));
1018 }
1019
1020 void Paxos::warn_on_future_time(utime_t t, entity_name_t from)
1021 {
1022   utime_t now = ceph_clock_now();
1023   if (t > now) {
1024     utime_t diff = t - now;
1025     if (diff > g_conf->mon_clock_drift_allowed) {
1026       utime_t warn_diff = now - last_clock_drift_warn;
1027       if (warn_diff >
1028           pow(g_conf->mon_clock_drift_warn_backoff, clock_drift_warned)) {
1029         mon->clog->warn() << "message from " << from << " was stamped " << diff
1030                          << "s in the future, clocks not synchronized";
1031         last_clock_drift_warn = ceph_clock_now();
1032         ++clock_drift_warned;
1033       }
1034     }
1035   }
1036
1037 }
1038
1039 bool Paxos::do_refresh()
1040 {
1041   bool need_bootstrap = false;
1042
1043   utime_t start = ceph_clock_now();
1044
1045   // make sure we have the latest state loaded up
1046   mon->refresh_from_paxos(&need_bootstrap);
1047
1048   utime_t end = ceph_clock_now();
1049   logger->inc(l_paxos_refresh);
1050   logger->tinc(l_paxos_refresh_latency, end - start);
1051
1052   if (need_bootstrap) {
1053     dout(10) << " doing requested bootstrap" << dendl;
1054     mon->bootstrap();
1055     return false;
1056   }
1057
1058   return true;
1059 }
1060
1061 void Paxos::commit_proposal()
1062 {
1063   dout(10) << __func__ << dendl;
1064   assert(mon->is_leader());
1065   assert(is_refresh());
1066
1067   finish_contexts(g_ceph_context, committing_finishers);
1068 }
1069
1070 void Paxos::finish_round()
1071 {
1072   dout(10) << __func__ << dendl;
1073   assert(mon->is_leader());
1074
1075   // ok, now go active!
1076   state = STATE_ACTIVE;
1077
1078   dout(20) << __func__ << " waiting_for_acting" << dendl;
1079   finish_contexts(g_ceph_context, waiting_for_active);
1080   dout(20) << __func__ << " waiting_for_readable" << dendl;
1081   finish_contexts(g_ceph_context, waiting_for_readable);
1082   dout(20) << __func__ << " waiting_for_writeable" << dendl;
1083   finish_contexts(g_ceph_context, waiting_for_writeable);
1084   
1085   dout(10) << __func__ << " done w/ waiters, state " << get_statename(state) << dendl;
1086
1087   if (should_trim()) {
1088     trim();
1089   }
1090
1091   if (is_active() && pending_proposal) {
1092     propose_pending();
1093   }
1094 }
1095
1096
1097 // peon
1098 void Paxos::handle_lease(MonOpRequestRef op)
1099 {
1100   op->mark_paxos_event("handle_lease");
1101   MMonPaxos *lease = static_cast<MMonPaxos*>(op->get_req());
1102   // sanity
1103   if (!mon->is_peon() ||
1104       last_committed != lease->last_committed) {
1105     dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
1106              << " or the last_committed doesn't match, dropping" << dendl;
1107     op->mark_paxos_event("invalid lease, ignore");
1108     return;
1109   }
1110
1111   warn_on_future_time(lease->sent_timestamp, lease->get_source());
1112
1113   // extend lease
1114   if (lease_expire < lease->lease_timestamp) {
1115     lease_expire = lease->lease_timestamp;
1116
1117     utime_t now = ceph_clock_now();
1118     if (lease_expire < now) {
1119       utime_t diff = now - lease_expire;
1120       derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl;
1121     }
1122   }
1123
1124   state = STATE_ACTIVE;
1125
1126   dout(10) << "handle_lease on " << lease->last_committed
1127            << " now " << lease_expire << dendl;
1128
1129   // ack
1130   MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK,
1131                                  ceph_clock_now());
1132   ack->last_committed = last_committed;
1133   ack->first_committed = first_committed;
1134   ack->lease_timestamp = ceph_clock_now();
1135   ::encode(mon->session_map.feature_map, ack->feature_map);
1136   lease->get_connection()->send_message(ack);
1137
1138   // (re)set timeout event.
1139   reset_lease_timeout();
1140
1141   // kick waiters
1142   finish_contexts(g_ceph_context, waiting_for_active);
1143   if (is_readable())
1144     finish_contexts(g_ceph_context, waiting_for_readable);
1145 }
1146
1147 void Paxos::handle_lease_ack(MonOpRequestRef op)
1148 {
1149   op->mark_paxos_event("handle_lease_ack");
1150   MMonPaxos *ack = static_cast<MMonPaxos*>(op->get_req());
1151   int from = ack->get_source().num();
1152
1153   if (!lease_ack_timeout_event) {
1154     dout(10) << "handle_lease_ack from " << ack->get_source()
1155              << " -- stray (probably since revoked)" << dendl;
1156
1157   } else if (acked_lease.count(from) == 0) {
1158     acked_lease.insert(from);
1159     if (ack->feature_map.length()) {
1160       auto p = ack->feature_map.begin();
1161       FeatureMap& t = mon->quorum_feature_map[from];
1162       ::decode(t, p);
1163     }
1164     if (acked_lease == mon->get_quorum()) {
1165       // yay!
1166       dout(10) << "handle_lease_ack from " << ack->get_source()
1167                << " -- got everyone" << dendl;
1168       mon->timer.cancel_event(lease_ack_timeout_event);
1169       lease_ack_timeout_event = 0;
1170
1171
1172     } else {
1173       dout(10) << "handle_lease_ack from " << ack->get_source()
1174                << " -- still need "
1175                << mon->get_quorum().size() - acked_lease.size()
1176                << " more" << dendl;
1177     }
1178   } else {
1179     dout(10) << "handle_lease_ack from " << ack->get_source()
1180              << " dup (lagging!), ignoring" << dendl;
1181   }
1182
1183   warn_on_future_time(ack->sent_timestamp, ack->get_source());
1184 }
1185
1186 void Paxos::lease_ack_timeout()
1187 {
1188   dout(1) << "lease_ack_timeout -- calling new election" << dendl;
1189   assert(mon->is_leader());
1190   assert(is_active());
1191   logger->inc(l_paxos_lease_ack_timeout);
1192   lease_ack_timeout_event = 0;
1193   mon->bootstrap();
1194 }
1195
1196 void Paxos::reset_lease_timeout()
1197 {
1198   dout(20) << "reset_lease_timeout - setting timeout event" << dendl;
1199   if (lease_timeout_event)
1200     mon->timer.cancel_event(lease_timeout_event);
1201   lease_timeout_event = mon->timer.add_event_after(
1202     g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
1203     new C_MonContext(mon, [this](int r) {
1204         if (r == -ECANCELED)
1205           return;
1206         lease_timeout();
1207       }));
1208 }
1209
1210 void Paxos::lease_timeout()
1211 {
1212   dout(1) << "lease_timeout -- calling new election" << dendl;
1213   assert(mon->is_peon());
1214   logger->inc(l_paxos_lease_timeout);
1215   lease_timeout_event = 0;
1216   mon->bootstrap();
1217 }
1218
1219 void Paxos::lease_renew_timeout()
1220 {
1221   lease_renew_event = 0;
1222   extend_lease();
1223 }
1224
1225
1226 /*
1227  * trim old states
1228  */
1229 void Paxos::trim()
1230 {
1231   assert(should_trim());
1232   version_t end = MIN(get_version() - g_conf->paxos_min,
1233                       get_first_committed() + g_conf->paxos_trim_max);
1234
1235   if (first_committed >= end)
1236     return;
1237
1238   dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
1239
1240   MonitorDBStore::TransactionRef t = get_pending_transaction();
1241
1242   for (version_t v = first_committed; v < end; ++v) {
1243     dout(10) << "trim " << v << dendl;
1244     t->erase(get_name(), v);
1245   }
1246   t->put(get_name(), "first_committed", end);
1247   if (g_conf->mon_compact_on_trim) {
1248     dout(10) << " compacting trimmed range" << dendl;
1249     t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
1250   }
1251
1252   trimming = true;
1253   queue_pending_finisher(new C_Trimmed(this));
1254 }
1255
1256 /*
1257  * return a globally unique, monotonically increasing proposal number
1258  */
1259 version_t Paxos::get_new_proposal_number(version_t gt)
1260 {
1261   if (last_pn < gt) 
1262     last_pn = gt;
1263   
1264   // update. make it unique among all monitors.
1265   last_pn /= 100;
1266   last_pn++;
1267   last_pn *= 100;
1268   last_pn += (version_t)mon->rank;
1269
1270   // write
1271   auto t(std::make_shared<MonitorDBStore::Transaction>());
1272   t->put(get_name(), "last_pn", last_pn);
1273
1274   dout(30) << __func__ << " transaction dump:\n";
1275   JSONFormatter f(true);
1276   t->dump(&f);
1277   f.flush(*_dout);
1278   *_dout << dendl;
1279
1280   logger->inc(l_paxos_new_pn);
1281   utime_t start = ceph_clock_now();
1282
1283   get_store()->apply_transaction(t);
1284
1285   utime_t end = ceph_clock_now();
1286   logger->tinc(l_paxos_new_pn_latency, end - start);
1287
1288   dout(10) << "get_new_proposal_number = " << last_pn << dendl;
1289   return last_pn;
1290 }
1291
1292
1293 void Paxos::cancel_events()
1294 {
1295   if (collect_timeout_event) {
1296     mon->timer.cancel_event(collect_timeout_event);
1297     collect_timeout_event = 0;
1298   }
1299   if (accept_timeout_event) {
1300     mon->timer.cancel_event(accept_timeout_event);
1301     accept_timeout_event = 0;
1302   }
1303   if (lease_renew_event) {
1304     mon->timer.cancel_event(lease_renew_event);
1305     lease_renew_event = 0;
1306   }
1307   if (lease_ack_timeout_event) {
1308     mon->timer.cancel_event(lease_ack_timeout_event);
1309     lease_ack_timeout_event = 0;
1310   }  
1311   if (lease_timeout_event) {
1312     mon->timer.cancel_event(lease_timeout_event);
1313     lease_timeout_event = 0;
1314   }
1315 }
1316
1317 void Paxos::shutdown()
1318 {
1319   dout(10) << __func__ << " cancel all contexts" << dendl;
1320
1321   state = STATE_SHUTDOWN;
1322
1323   // discard pending transaction
1324   pending_proposal.reset();
1325
1326   // Let store finish commits in progress
1327   // XXX: I assume I can't use finish_contexts() because the store
1328   // is going to trigger
1329   while(commits_started > 0)
1330     shutdown_cond.Wait(mon->lock);
1331
1332   finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
1333   finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED);
1334   finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
1335   finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
1336   finish_contexts(g_ceph_context, pending_finishers, -ECANCELED);
1337   finish_contexts(g_ceph_context, committing_finishers, -ECANCELED);
1338   if (logger)
1339     g_ceph_context->get_perfcounters_collection()->remove(logger);
1340   delete logger;
1341 }
1342
1343 void Paxos::leader_init()
1344 {
1345   cancel_events();
1346   new_value.clear();
1347
1348   // discard pending transaction
1349   pending_proposal.reset();
1350
1351   finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1352   finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1353
1354   logger->inc(l_paxos_start_leader);
1355
1356   if (mon->get_quorum().size() == 1) {
1357     state = STATE_ACTIVE;
1358     return;
1359   }
1360
1361   state = STATE_RECOVERING;
1362   lease_expire = utime_t();
1363   dout(10) << "leader_init -- starting paxos recovery" << dendl;
1364   collect(0);
1365 }
1366
1367 void Paxos::peon_init()
1368 {
1369   cancel_events();
1370   new_value.clear();
1371
1372   state = STATE_RECOVERING;
1373   lease_expire = utime_t();
1374   dout(10) << "peon_init -- i am a peon" << dendl;
1375
1376   // start a timer, in case the leader never manages to issue a lease
1377   reset_lease_timeout();
1378
1379   // discard pending transaction
1380   pending_proposal.reset();
1381
1382   // no chance to write now!
1383   finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
1384   finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
1385   finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1386   finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1387
1388   logger->inc(l_paxos_start_peon);
1389 }
1390
1391 void Paxos::restart()
1392 {
1393   dout(10) << "restart -- canceling timeouts" << dendl;
1394   cancel_events();
1395   new_value.clear();
1396
1397   if (is_writing() || is_writing_previous()) {
1398     dout(10) << __func__ << " flushing" << dendl;
1399     mon->lock.Unlock();
1400     mon->store->flush();
1401     mon->lock.Lock();
1402     dout(10) << __func__ << " flushed" << dendl;
1403   }
1404   state = STATE_RECOVERING;
1405
1406   // discard pending transaction
1407   pending_proposal.reset();
1408
1409   finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1410   finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1411   finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
1412   finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
1413
1414   logger->inc(l_paxos_restart);
1415 }
1416
1417
1418 void Paxos::dispatch(MonOpRequestRef op)
1419 {
1420   assert(op->is_type_paxos());
1421   op->mark_paxos_event("dispatch");
1422   PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
1423   // election in progress?
1424   if (!mon->is_leader() && !mon->is_peon()) {
1425     dout(5) << "election in progress, dropping " << *m << dendl;
1426     return;    
1427   }
1428
1429   // check sanity
1430   assert(mon->is_leader() || 
1431          (mon->is_peon() && m->get_source().num() == mon->get_leader()));
1432   
1433   switch (m->get_type()) {
1434
1435   case MSG_MON_PAXOS:
1436     {
1437       MMonPaxos *pm = reinterpret_cast<MMonPaxos*>(m);
1438
1439       // NOTE: these ops are defined in messages/MMonPaxos.h
1440       switch (pm->op) {
1441         // learner
1442       case MMonPaxos::OP_COLLECT:
1443         handle_collect(op);
1444         break;
1445       case MMonPaxos::OP_LAST:
1446         handle_last(op);
1447         break;
1448       case MMonPaxos::OP_BEGIN:
1449         handle_begin(op);
1450         break;
1451       case MMonPaxos::OP_ACCEPT:
1452         handle_accept(op);
1453         break;          
1454       case MMonPaxos::OP_COMMIT:
1455         handle_commit(op);
1456         break;
1457       case MMonPaxos::OP_LEASE:
1458         handle_lease(op);
1459         break;
1460       case MMonPaxos::OP_LEASE_ACK:
1461         handle_lease_ack(op);
1462         break;
1463       default:
1464         ceph_abort();
1465       }
1466     }
1467     break;
1468     
1469   default:
1470     ceph_abort();
1471   }
1472 }
1473
1474
1475 // -----------------
1476 // service interface
1477
1478 // -- READ --
1479
1480 bool Paxos::is_readable(version_t v)
1481 {
1482   bool ret;
1483   if (v > last_committed)
1484     ret = false;
1485   else
1486     ret =
1487       (mon->is_peon() || mon->is_leader()) &&
1488       (is_active() || is_updating() || is_writing()) &&
1489       last_committed > 0 && is_lease_valid(); // must have a value alone, or have lease
1490   dout(5) << __func__ << " = " << (int)ret
1491           << " - now=" << ceph_clock_now()
1492           << " lease_expire=" << lease_expire
1493           << " has v" << v << " lc " << last_committed
1494           << dendl;
1495   return ret;
1496 }
1497
1498 bool Paxos::read(version_t v, bufferlist &bl)
1499 {
1500   if (!get_store()->get(get_name(), v, bl))
1501     return false;
1502   return true;
1503 }
1504
1505 version_t Paxos::read_current(bufferlist &bl)
1506 {
1507   if (read(last_committed, bl))
1508     return last_committed;
1509   return 0;
1510 }
1511
1512
1513 bool Paxos::is_lease_valid()
1514 {
1515   return ((mon->get_quorum().size() == 1)
1516       || (ceph_clock_now() < lease_expire));
1517 }
1518
1519 // -- WRITE --
1520
1521 bool Paxos::is_writeable()
1522 {
1523   return
1524     mon->is_leader() &&
1525     is_active() &&
1526     is_lease_valid();
1527 }
1528
1529 void Paxos::propose_pending()
1530 {
1531   assert(is_active());
1532   assert(pending_proposal);
1533
1534   cancel_events();
1535
1536   bufferlist bl;
1537   pending_proposal->encode(bl);
1538
1539   dout(10) << __func__ << " " << (last_committed + 1)
1540            << " " << bl.length() << " bytes" << dendl;
1541   dout(30) << __func__ << " transaction dump:\n";
1542   JSONFormatter f(true);
1543   pending_proposal->dump(&f);
1544   f.flush(*_dout);
1545   *_dout << dendl;
1546
1547   pending_proposal.reset();
1548
1549   committing_finishers.swap(pending_finishers);
1550   state = STATE_UPDATING;
1551   begin(bl);
1552 }
1553
1554 void Paxos::queue_pending_finisher(Context *onfinished)
1555 {
1556   dout(5) << __func__ << " " << onfinished << dendl;
1557   assert(onfinished);
1558   pending_finishers.push_back(onfinished);
1559 }
1560
1561 MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
1562 {
1563   assert(mon->is_leader());
1564   if (!pending_proposal) {
1565     pending_proposal.reset(new MonitorDBStore::Transaction);
1566     assert(pending_finishers.empty());
1567   }
1568   return pending_proposal;
1569 }
1570
1571 bool Paxos::trigger_propose()
1572 {
1573   if (plugged) {
1574     dout(10) << __func__ << " plugged, not proposing now" << dendl;
1575     return false;
1576   } else if (is_active()) {
1577     dout(10) << __func__ << " active, proposing now" << dendl;
1578     propose_pending();
1579     return true;
1580   } else {
1581     dout(10) << __func__ << " not active, will propose later" << dendl;
1582     return false;
1583   }
1584 }
1585
1586 bool Paxos::is_consistent()
1587 {
1588   return (first_committed <= last_committed);
1589 }
1590