Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / PaxosService.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "PaxosService.h"
16 #include "common/Clock.h"
17 #include "common/config.h"
18 #include "include/stringify.h"
19 #include "include/assert.h"
20 #include "mon/MonOpRequest.h"
21
22 #define dout_subsys ceph_subsys_paxos
23 #undef dout_prefix
24 #define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed())
25 static ostream& _prefix(std::ostream *_dout, Monitor *mon, Paxos *paxos, string service_name,
26                         version_t fc, version_t lc) {
27   return *_dout << "mon." << mon->name << "@" << mon->rank
28                 << "(" << mon->get_state_name()
29                 << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") ";
30 }
31
32 bool PaxosService::dispatch(MonOpRequestRef op)
33 {
34   assert(op->is_type_service() || op->is_type_command());
35   PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
36   op->mark_event("psvc:dispatch");
37
38   dout(10) << __func__ << " " << m << " " << *m
39            << " from " << m->get_orig_source_inst()
40            << " con " << m->get_connection() << dendl;
41
42   if (mon->is_shutdown()) {
43     return true;
44   }
45
46   // make sure this message isn't forwarded from a previous election epoch
47   if (m->rx_election_epoch &&
48       m->rx_election_epoch < mon->get_epoch()) {
49     dout(10) << " discarding forwarded message from previous election epoch "
50              << m->rx_election_epoch << " < " << mon->get_epoch() << dendl;
51     return true;
52   }
53
54   // make sure the client is still connected.  note that a proxied
55   // connection will be disconnected with a null message; don't drop
56   // those.  also ignore loopback (e.g., log) messages.
57   if (m->get_connection() &&
58       !m->get_connection()->is_connected() &&
59       m->get_connection() != mon->con_self &&
60       m->get_connection()->get_messenger() != NULL) {
61     dout(10) << " discarding message from disconnected client "
62              << m->get_source_inst() << " " << *m << dendl;
63     return true;
64   }
65
66   // make sure our map is readable and up to date
67   if (!is_readable(m->version)) {
68     dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl;
69     wait_for_readable(op, new C_RetryMessage(this, op), m->version);
70     return true;
71   }
72
73   // preprocess
74   if (preprocess_query(op)) 
75     return true;  // easy!
76
77   // leader?
78   if (!mon->is_leader()) {
79     mon->forward_request_leader(op);
80     return true;
81   }
82   
83   // writeable?
84   if (!is_writeable()) {
85     dout(10) << " waiting for paxos -> writeable" << dendl;
86     wait_for_writeable(op, new C_RetryMessage(this, op));
87     return true;
88   }
89
90   // update
91   if (!prepare_update(op)) {
92     // no changes made.
93     return true;
94   }
95
96   if (need_immediate_propose) {
97     dout(10) << __func__ << " forced immediate propose" << dendl;
98     need_immediate_propose = false;
99     propose_pending();
100     return true;
101   }
102
103   double delay = 0.0;
104   if (!should_propose(delay)) {
105     dout(10) << " not proposing" << dendl;
106     return true;
107   }
108
109   if (delay == 0.0) {
110     propose_pending();
111     return true;
112   }
113
114   // delay a bit
115   if (!proposal_timer) {
116     /**
117        * Callback class used to propose the pending value once the proposal_timer
118        * fires up.
119        */
120     auto do_propose = new C_MonContext(mon, [this](int r) {
121         proposal_timer = 0;
122         if (r >= 0) {
123           propose_pending();
124         } else if (r == -ECANCELED || r == -EAGAIN) {
125           return;
126         } else {
127           assert(0 == "bad return value for proposal_timer");
128         }
129     });
130     dout(10) << " setting proposal_timer " << do_propose
131              << " with delay of " << delay << dendl;
132     proposal_timer = mon->timer.add_event_after(delay, do_propose);
133   } else {
134     dout(10) << " proposal_timer already set" << dendl;
135   }
136   return true;
137 }
138
139 void PaxosService::refresh(bool *need_bootstrap)
140 {
141   // update cached versions
142   cached_first_committed = mon->store->get(get_service_name(), first_committed_name);
143   cached_last_committed = mon->store->get(get_service_name(), last_committed_name);
144
145   version_t new_format = get_value("format_version");
146   if (new_format != format_version) {
147     dout(1) << __func__ << " upgraded, format " << format_version << " -> " << new_format << dendl;
148     on_upgrade();
149   }
150   format_version = new_format;
151
152   dout(10) << __func__ << dendl;
153
154   update_from_paxos(need_bootstrap);
155 }
156
157 void PaxosService::post_refresh()
158 {
159   dout(10) << __func__ << dendl;
160
161   post_paxos_update();
162
163   if (mon->is_peon() && !waiting_for_finished_proposal.empty()) {
164     finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
165   }
166 }
167
168 bool PaxosService::should_propose(double& delay)
169 {
170   // simple default policy: quick startup, then some damping.
171   if (get_last_committed() <= 1) {
172     delay = 0.0;
173   } else {
174     utime_t now = ceph_clock_now();
175     if ((now - paxos->last_commit_time) > g_conf->paxos_propose_interval)
176       delay = (double)g_conf->paxos_min_wait;
177     else
178       delay = (double)(g_conf->paxos_propose_interval + paxos->last_commit_time
179                        - now);
180   }
181   return true;
182 }
183
184
185 void PaxosService::propose_pending()
186 {
187   dout(10) << __func__ << dendl;
188   assert(have_pending);
189   assert(!proposing);
190   assert(mon->is_leader());
191   assert(is_active());
192
193   if (proposal_timer) {
194     dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
195     mon->timer.cancel_event(proposal_timer);
196     proposal_timer = NULL;
197   }
198
199   /**
200    * @note What we contribute to the pending Paxos transaction is
201    *       obtained by calling a function that must be implemented by
202    *       the class implementing us.  I.e., the function
203    *       encode_pending will be the one responsible to encode
204    *       whatever is pending on the implementation class into a
205    *       bufferlist, so we can then propose that as a value through
206    *       Paxos.
207    */
208   MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
209
210   if (should_stash_full())
211     encode_full(t);
212
213   encode_pending(t);
214   have_pending = false;
215
216   if (format_version > 0) {
217     t->put(get_service_name(), "format_version", format_version);
218   }
219
220   // apply to paxos
221   proposing = true;
222   /**
223    * Callback class used to mark us as active once a proposal finishes going
224    * through Paxos.
225    *
226    * We should wake people up *only* *after* we inform the service we
227    * just went active. And we should wake people up only once we finish
228    * going active. This is why we first go active, avoiding to wake up the
229    * wrong people at the wrong time, such as waking up a C_RetryMessage
230    * before waking up a C_Active, thus ending up without a pending value.
231    */
232   class C_Committed : public Context {
233     PaxosService *ps;
234   public:
235     explicit C_Committed(PaxosService *p) : ps(p) { }
236     void finish(int r) override {
237       ps->proposing = false;
238       if (r >= 0)
239         ps->_active();
240       else if (r == -ECANCELED || r == -EAGAIN)
241         return;
242       else
243         assert(0 == "bad return value for C_Committed");
244     }
245   };
246   paxos->queue_pending_finisher(new C_Committed(this));
247   paxos->trigger_propose();
248 }
249
250 bool PaxosService::should_stash_full()
251 {
252   version_t latest_full = get_version_latest_full();
253   /* @note The first member of the condition is moot and it is here just for
254    *       clarity's sake. The second member would end up returing true
255    *       nonetheless because, in that event,
256    *          latest_full == get_trim_to() == 0.
257    */
258   return (!latest_full ||
259           (latest_full <= get_trim_to()) ||
260           (get_last_committed() - latest_full > (version_t)g_conf->paxos_stash_full_interval));
261 }
262
263 void PaxosService::restart()
264 {
265   dout(10) << __func__ << dendl;
266   if (proposal_timer) {
267     dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
268     mon->timer.cancel_event(proposal_timer);
269     proposal_timer = 0;
270   }
271
272   finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
273
274   if (have_pending) {
275     discard_pending();
276     have_pending = false;
277   }
278   proposing = false;
279
280   on_restart();
281 }
282
283 void PaxosService::election_finished()
284 {
285   dout(10) << __func__ << dendl;
286
287   finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
288
289   // make sure we update our state
290   _active();
291 }
292
293 void PaxosService::_active()
294 {
295   if (is_proposing()) {
296     dout(10) << __func__ << " - proposing" << dendl;
297     return;
298   }
299   if (!is_active()) {
300     dout(10) << __func__ << " - not active" << dendl;
301     /**
302      * Callback used to make sure we call the PaxosService::_active function
303      * whenever a condition is fulfilled.
304      *
305      * This is used in multiple situations, from waiting for the Paxos to commit
306      * our proposed value, to waiting for the Paxos to become active once an
307      * election is finished.
308      */
309     class C_Active : public Context {
310       PaxosService *svc;
311     public:
312       explicit C_Active(PaxosService *s) : svc(s) {}
313       void finish(int r) override {
314         if (r >= 0)
315           svc->_active();
316       }
317     };
318     wait_for_active_ctx(new C_Active(this));
319     return;
320   }
321   dout(10) << __func__ << dendl;
322
323   // create pending state?
324   if (mon->is_leader()) {
325     dout(7) << __func__ << " creating new pending" << dendl;
326     if (!have_pending) {
327       create_pending();
328       have_pending = true;
329     }
330
331     if (get_last_committed() == 0) {
332       // create initial state
333       create_initial();
334       propose_pending();
335       return;
336     }
337   } else {
338     if (!mon->is_leader()) {
339       dout(7) << __func__ << " we are not the leader, hence we propose nothing!" << dendl;
340     }
341   }
342
343   // wake up anyone who came in while we were proposing.  note that
344   // anyone waiting for the previous proposal to commit is no longer
345   // on this list; it is on Paxos's.
346   finish_contexts(g_ceph_context, waiting_for_finished_proposal, 0);
347
348   if (mon->is_leader())
349     upgrade_format();
350
351   // NOTE: it's possible that this will get called twice if we commit
352   // an old paxos value.  Implementations should be mindful of that.
353   on_active();
354 }
355
356
357 void PaxosService::shutdown()
358 {
359   cancel_events();
360
361   if (proposal_timer) {
362     dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
363     mon->timer.cancel_event(proposal_timer);
364     proposal_timer = 0;
365   }
366
367   finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
368
369   on_shutdown();
370 }
371
372 void PaxosService::maybe_trim()
373 {
374   if (!is_writeable())
375     return;
376
377   version_t trim_to = get_trim_to();
378   if (trim_to < get_first_committed())
379     return;
380
381   version_t to_remove = trim_to - get_first_committed();
382   if (g_conf->paxos_service_trim_min > 0 &&
383       to_remove < (version_t)g_conf->paxos_service_trim_min) {
384     dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove
385              << " < paxos_service_trim_min " << g_conf->paxos_service_trim_min << dendl;
386     return;
387   }
388
389   if (g_conf->paxos_service_trim_max > 0 &&
390       to_remove > (version_t)g_conf->paxos_service_trim_max) {
391     dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove
392              << " > paxos_service_trim_max, limiting to " << g_conf->paxos_service_trim_max
393              << dendl;
394     trim_to = get_first_committed() + g_conf->paxos_service_trim_max;
395     to_remove = g_conf->paxos_service_trim_max;
396   }
397
398   dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl;
399   MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
400   trim(t, get_first_committed(), trim_to);
401   put_first_committed(t, trim_to);
402
403   // let the service add any extra stuff
404   encode_trim_extra(t, trim_to);
405
406   paxos->trigger_propose();
407 }
408
409 void PaxosService::trim(MonitorDBStore::TransactionRef t,
410                         version_t from, version_t to)
411 {
412   dout(10) << __func__ << " from " << from << " to " << to << dendl;
413   assert(from != to);
414
415   for (version_t v = from; v < to; ++v) {
416     dout(20) << __func__ << " " << v << dendl;
417     t->erase(get_service_name(), v);
418
419     string full_key = mon->store->combine_strings("full", v);
420     if (mon->store->exists(get_service_name(), full_key)) {
421       dout(20) << __func__ << " " << full_key << dendl;
422       t->erase(get_service_name(), full_key);
423     }
424   }
425   if (g_conf->mon_compact_on_trim) {
426     dout(20) << " compacting prefix " << get_service_name() << dendl;
427     t->compact_range(get_service_name(), stringify(from - 1), stringify(to));
428     t->compact_range(get_service_name(),
429                      mon->store->combine_strings(full_prefix_name, from - 1),
430                      mon->store->combine_strings(full_prefix_name, to));
431   }
432 }
433
434 void PaxosService::load_health()
435 {
436   bufferlist bl;
437   mon->store->get("health", service_name, bl);
438   if (bl.length()) {
439     auto p = bl.begin();
440     ::decode(health_checks, p);
441   }
442 }