Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / MonClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include <random>
16
17 #include "include/scope_guard.h"
18
19 #include "messages/MMonGetMap.h"
20 #include "messages/MMonGetVersion.h"
21 #include "messages/MMonGetVersionReply.h"
22 #include "messages/MMonMap.h"
23 #include "messages/MAuth.h"
24 #include "messages/MLogAck.h"
25 #include "messages/MAuthReply.h"
26 #include "messages/MMonCommand.h"
27 #include "messages/MMonCommandAck.h"
28 #include "messages/MPing.h"
29
30 #include "messages/MMonSubscribe.h"
31 #include "messages/MMonSubscribeAck.h"
32 #include "common/errno.h"
33 #include "common/LogClient.h"
34
35 #include "MonClient.h"
36 #include "MonMap.h"
37
38 #include "auth/Auth.h"
39 #include "auth/KeyRing.h"
40 #include "auth/AuthClientHandler.h"
41 #include "auth/AuthMethodList.h"
42 #include "auth/RotatingKeyRing.h"
43
44 #define dout_subsys ceph_subsys_monc
45 #undef dout_prefix
46 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
47
48 MonClient::MonClient(CephContext *cct_) :
49   Dispatcher(cct_),
50   messenger(NULL),
51   monc_lock("MonClient::monc_lock"),
52   timer(cct_, monc_lock),
53   finisher(cct_),
54   initialized(false),
55   no_keyring_disabled_cephx(false),
56   log_client(NULL),
57   more_log_pending(false),
58   want_monmap(true),
59   had_a_connection(false),
60   reopen_interval_multiplier(
61     cct_->_conf->get_val<double>("mon_client_hunt_interval_min_multiple")),
62   last_mon_command_tid(0),
63   version_req_id(0)
64 {
65 }
66
67 MonClient::~MonClient()
68 {
69 }
70
71 int MonClient::build_initial_monmap()
72 {
73   ldout(cct, 10) << __func__ << dendl;
74   return monmap.build_initial(cct, cerr);
75 }
76
77 int MonClient::get_monmap()
78 {
79   ldout(cct, 10) << __func__ << dendl;
80   Mutex::Locker l(monc_lock);
81   
82   _sub_want("monmap", 0, 0);
83   if (!_opened())
84     _reopen_session();
85
86   while (want_monmap)
87     map_cond.Wait(monc_lock);
88
89   ldout(cct, 10) << __func__ << " done" << dendl;
90   return 0;
91 }
92
93 int MonClient::get_monmap_privately()
94 {
95   ldout(cct, 10) << __func__ << dendl;
96   Mutex::Locker l(monc_lock);
97
98   bool temp_msgr = false;
99   Messenger* smessenger = NULL;
100   if (!messenger) {
101     messenger = smessenger = Messenger::create_client_messenger(cct, "temp_mon_client");
102     if (NULL == messenger) {
103         return -1;
104     }
105     messenger->add_dispatcher_head(this);
106     smessenger->start();
107     temp_msgr = true;
108   }
109
110   int attempt = 10;
111
112   ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl;
113
114   std::random_device rd;
115   std::mt19937 rng(rd());
116   assert(monmap.size() > 0);
117   std::uniform_int_distribution<unsigned> ranks(0, monmap.size() - 1);
118   while (monmap.fsid.is_zero()) {
119     auto rank = ranks(rng);
120     auto& pending_con = _add_conn(rank, 0);
121     auto con = pending_con.get_con();
122     ldout(cct, 10) << "querying mon." << monmap.get_name(rank) << " "
123                    << con->get_peer_addr() << dendl;
124     con->send_message(new MMonGetMap);
125
126     if (--attempt == 0)
127       break;
128
129     utime_t interval;
130     interval.set_from_double(cct->_conf->mon_client_hunt_interval);
131     map_cond.WaitInterval(monc_lock, interval);
132
133     if (monmap.fsid.is_zero() && con) {
134       con->mark_down();  // nope, clean that connection up
135     }
136   }
137
138   if (temp_msgr) {
139     pending_cons.clear();
140     monc_lock.Unlock();
141     messenger->shutdown();
142     if (smessenger)
143       smessenger->wait();
144     delete messenger;
145     messenger = 0;
146     monc_lock.Lock();
147   }
148
149   pending_cons.clear();
150
151   if (!monmap.fsid.is_zero())
152     return 0;
153   return -1;
154 }
155
156
157 /**
158  * Ping the monitor with id @p mon_id and set the resulting reply in
159  * the provided @p result_reply, if this last parameter is not NULL.
160  *
161  * So that we don't rely on the MonClient's default messenger, set up
162  * during connect(), we create our own messenger to comunicate with the
163  * specified monitor.  This is advantageous in the following ways:
164  *
165  * - Isolate the ping procedure from the rest of the MonClient's operations,
166  *   allowing us to not acquire or manage the big monc_lock, thus not
167  *   having to block waiting for some other operation to finish before we
168  *   can proceed.
169  *   * for instance, we can ping mon.FOO even if we are currently hunting
170  *     or blocked waiting for auth to complete with mon.BAR.
171  *
172  * - Ping a monitor prior to establishing a connection (using connect())
173  *   and properly establish the MonClient's messenger.  This frees us
174  *   from dealing with the complex foo that happens in connect().
175  *
176  * We also don't rely on MonClient as a dispatcher for this messenger,
177  * unlike what happens with the MonClient's default messenger.  This allows
178  * us to sandbox the whole ping, having it much as a separate entity in
179  * the MonClient class, considerably simplifying the handling and dispatching
180  * of messages without needing to consider monc_lock.
181  *
182  * Current drawback is that we will establish a messenger for each ping
183  * we want to issue, instead of keeping a single messenger instance that
184  * would be used for all pings.
185  */
186 int MonClient::ping_monitor(const string &mon_id, string *result_reply)
187 {
188   ldout(cct, 10) << __func__ << dendl;
189
190   string new_mon_id;
191   if (monmap.contains("noname-"+mon_id)) {
192     new_mon_id = "noname-"+mon_id;
193   } else {
194     new_mon_id = mon_id;
195   }
196
197   if (new_mon_id.empty()) {
198     ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
199     return -EINVAL;
200   } else if (!monmap.contains(new_mon_id)) {
201     ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
202                    << dendl;
203     return -ENOENT;
204   }
205
206   MonClientPinger *pinger = new MonClientPinger(cct, result_reply);
207
208   Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
209   smsgr->add_dispatcher_head(pinger);
210   smsgr->start();
211
212   ConnectionRef con = smsgr->get_connection(monmap.get_inst(new_mon_id));
213   ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
214                  << " " << con->get_peer_addr() << dendl;
215   con->send_message(new MPing);
216
217   pinger->lock.Lock();
218   int ret = pinger->wait_for_reply(cct->_conf->client_mount_timeout);
219   if (ret == 0) {
220     ldout(cct,10) << __func__ << " got ping reply" << dendl;
221   } else {
222     ret = -ret;
223   }
224   pinger->lock.Unlock();
225
226   con->mark_down();
227   smsgr->shutdown();
228   smsgr->wait();
229   delete smsgr;
230   delete pinger;
231   return ret;
232 }
233
234 bool MonClient::ms_dispatch(Message *m)
235 {
236   if (my_addr == entity_addr_t())
237     my_addr = messenger->get_myaddr();
238
239   // we only care about these message types
240   switch (m->get_type()) {
241   case CEPH_MSG_MON_MAP:
242   case CEPH_MSG_AUTH_REPLY:
243   case CEPH_MSG_MON_SUBSCRIBE_ACK:
244   case CEPH_MSG_MON_GET_VERSION_REPLY:
245   case MSG_MON_COMMAND_ACK:
246   case MSG_LOGACK:
247     break;
248   default:
249     return false;
250   }
251
252   Mutex::Locker lock(monc_lock);
253
254   if (_hunting()) {
255     auto pending_con = pending_cons.find(m->get_source_addr());
256     if (pending_con == pending_cons.end() ||
257         pending_con->second.get_con() != m->get_connection()) {
258       // ignore any messages outside hunting sessions
259       ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
260       m->put();
261       return true;
262     }
263   } else if (!active_con || active_con->get_con() != m->get_connection()) {
264     // ignore any messages outside our session(s)
265     ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
266     m->put();
267     return true;
268   }
269
270   switch (m->get_type()) {
271   case CEPH_MSG_MON_MAP:
272     handle_monmap(static_cast<MMonMap*>(m));
273     if (passthrough_monmap) {
274       return false;
275     } else {
276       m->put();
277     }
278     break;
279   case CEPH_MSG_AUTH_REPLY:
280     handle_auth(static_cast<MAuthReply*>(m));
281     break;
282   case CEPH_MSG_MON_SUBSCRIBE_ACK:
283     handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
284     break;
285   case CEPH_MSG_MON_GET_VERSION_REPLY:
286     handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
287     break;
288   case MSG_MON_COMMAND_ACK:
289     handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
290     break;
291   case MSG_LOGACK:
292     if (log_client) {
293       log_client->handle_log_ack(static_cast<MLogAck*>(m));
294       m->put();
295       if (more_log_pending) {
296         send_log();
297       }
298     } else {
299       m->put();
300     }
301     break;
302   }
303   return true;
304 }
305
306 void MonClient::send_log(bool flush)
307 {
308   if (log_client) {
309     Message *lm = log_client->get_mon_log_message(flush);
310     if (lm)
311       _send_mon_message(lm);
312     more_log_pending = log_client->are_pending();
313   }
314 }
315
316 void MonClient::flush_log()
317 {
318   Mutex::Locker l(monc_lock);
319   send_log();
320 }
321
322 /* Unlike all the other message-handling functions, we don't put away a reference
323 * because we want to support MMonMap passthrough to other Dispatchers. */
324 void MonClient::handle_monmap(MMonMap *m)
325 {
326   ldout(cct, 10) << __func__ << " " << *m << dendl;
327   auto peer = m->get_source_addr();
328   string cur_mon = monmap.get_name(peer);
329
330   bufferlist::iterator p = m->monmapbl.begin();
331   ::decode(monmap, p);
332
333   ldout(cct, 10) << " got monmap " << monmap.epoch
334                  << ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon)
335                  << dendl;
336   ldout(cct, 10) << "dump:\n";
337   monmap.print(*_dout);
338   *_dout << dendl;
339
340   _sub_got("monmap", monmap.get_epoch());
341
342   if (!monmap.get_addr_name(peer, cur_mon)) {
343     ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl;
344     // can't find the mon we were talking to (above)
345     _reopen_session();
346   }
347
348   map_cond.Signal();
349   want_monmap = false;
350 }
351
352 // ----------------------
353
354 int MonClient::init()
355 {
356   ldout(cct, 10) << __func__ << dendl;
357
358   messenger->add_dispatcher_head(this);
359
360   entity_name = cct->_conf->name;
361
362   Mutex::Locker l(monc_lock);
363
364   string method;
365   if (!cct->_conf->auth_supported.empty())
366     method = cct->_conf->auth_supported;
367   else if (entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
368            entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
369            entity_name.get_type() == CEPH_ENTITY_TYPE_MON)
370     method = cct->_conf->auth_cluster_required;
371   else
372     method = cct->_conf->auth_client_required;
373   auth_supported.reset(new AuthMethodList(cct, method));
374   ldout(cct, 10) << "auth_supported " << auth_supported->get_supported_set() << " method " << method << dendl;
375
376   int r = 0;
377   keyring.reset(new KeyRing); // initializing keyring anyway
378
379   if (auth_supported->is_supported_auth(CEPH_AUTH_CEPHX)) {
380     r = keyring->from_ceph_context(cct);
381     if (r == -ENOENT) {
382       auth_supported->remove_supported_auth(CEPH_AUTH_CEPHX);
383       if (!auth_supported->get_supported_set().empty()) {
384         r = 0;
385         no_keyring_disabled_cephx = true;
386       } else {
387         lderr(cct) << "ERROR: missing keyring, cannot use cephx for authentication" << dendl;
388       }
389     }
390   }
391
392   if (r < 0) {
393     return r;
394   }
395
396   rotating_secrets.reset(
397     new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
398
399   initialized = true;
400
401   timer.init();
402   finisher.start();
403   schedule_tick();
404
405   return 0;
406 }
407
408 void MonClient::shutdown()
409 {
410   ldout(cct, 10) << __func__ << dendl;
411   monc_lock.Lock();
412   while (!version_requests.empty()) {
413     version_requests.begin()->second->context->complete(-ECANCELED);
414     ldout(cct, 20) << __func__ << " canceling and discarding version request "
415                    << version_requests.begin()->second << dendl;
416     delete version_requests.begin()->second;
417     version_requests.erase(version_requests.begin());
418   }
419   while (!mon_commands.empty()) {
420     auto tid = mon_commands.begin()->first;
421     _cancel_mon_command(tid);
422   }
423   while (!waiting_for_session.empty()) {
424     ldout(cct, 20) << __func__ << " discarding pending message " << *waiting_for_session.front() << dendl;
425     waiting_for_session.front()->put();
426     waiting_for_session.pop_front();
427   }
428
429   active_con.reset();
430   pending_cons.clear();
431   auth.reset();
432
433   monc_lock.Unlock();
434
435   if (initialized) {
436     finisher.wait_for_empty();
437     finisher.stop();
438   }
439   monc_lock.Lock();
440   timer.shutdown();
441
442   monc_lock.Unlock();
443 }
444
445 int MonClient::authenticate(double timeout)
446 {
447   Mutex::Locker lock(monc_lock);
448
449   if (active_con) {
450     ldout(cct, 5) << "already authenticated" << dendl;
451     return 0;
452   }
453
454   _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
455   if (!_opened())
456     _reopen_session();
457
458   utime_t until = ceph_clock_now();
459   until += timeout;
460   if (timeout > 0.0)
461     ldout(cct, 10) << "authenticate will time out at " << until << dendl;
462   while (!active_con && !authenticate_err) {
463     if (timeout > 0.0) {
464       int r = auth_cond.WaitUntil(monc_lock, until);
465       if (r == ETIMEDOUT) {
466         ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
467         authenticate_err = -r;
468       }
469     } else {
470       auth_cond.Wait(monc_lock);
471     }
472   }
473
474   if (active_con) {
475     ldout(cct, 5) << __func__ << " success, global_id "
476                   << active_con->get_global_id() << dendl;
477     // active_con should not have been set if there was an error
478     assert(authenticate_err == 0);
479     authenticated = true;
480   }
481
482   if (authenticate_err < 0 && no_keyring_disabled_cephx) {
483     lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
484   }
485
486   return authenticate_err;
487 }
488
489 void MonClient::handle_auth(MAuthReply *m)
490 {
491   assert(monc_lock.is_locked());
492   if (!_hunting()) {
493     std::swap(active_con->get_auth(), auth);
494     int ret = active_con->authenticate(m);
495     m->put();
496     std::swap(auth, active_con->get_auth());
497     if (global_id != active_con->get_global_id()) {
498       lderr(cct) << __func__ << " peer assigned me a different global_id: "
499                  << active_con->get_global_id() << dendl;
500     }
501     if (ret != -EAGAIN) {
502       _finish_auth(ret);
503     }
504     return;
505   }
506
507   // hunting
508   auto found = pending_cons.find(m->get_source_addr());
509   assert(found != pending_cons.end());
510   int auth_err = found->second.handle_auth(m, entity_name, want_keys,
511                                            rotating_secrets.get());
512   m->put();
513   if (auth_err == -EAGAIN) {
514     return;
515   }
516   if (auth_err) {
517     pending_cons.erase(found);
518     if (!pending_cons.empty()) {
519       // keep trying with pending connections
520       return;
521     }
522     // the last try just failed, give up.
523   } else {
524     auto& mc = found->second;
525     assert(mc.have_session());
526     active_con.reset(new MonConnection(std::move(mc)));
527     pending_cons.clear();
528   }
529
530   _finish_hunting();
531
532   if (!auth_err) {
533     last_rotating_renew_sent = utime_t();
534     while (!waiting_for_session.empty()) {
535       _send_mon_message(waiting_for_session.front());
536       waiting_for_session.pop_front();
537     }
538     _resend_mon_commands();
539     send_log(true);
540     if (active_con) {
541       std::swap(auth, active_con->get_auth());
542       global_id = active_con->get_global_id();
543     }
544   }
545   _finish_auth(auth_err);
546   if (!auth_err) {
547     Context *cb = nullptr;
548     if (session_established_context) {
549       cb = session_established_context.release();
550     }
551     if (cb) {
552       monc_lock.Unlock();
553       cb->complete(0);
554       monc_lock.Lock();
555     }
556   }
557 }
558
559 void MonClient::_finish_auth(int auth_err)
560 {
561   authenticate_err = auth_err;
562   // _resend_mon_commands() could _reopen_session() if the connected mon is not
563   // the one the MonCommand is targeting.
564   if (!auth_err && active_con) {
565     assert(auth);
566     _check_auth_tickets();
567   }
568   auth_cond.SignalAll();
569 }
570
571 // ---------
572
573 void MonClient::_send_mon_message(Message *m)
574 {
575   assert(monc_lock.is_locked());
576   if (active_con) {
577     auto cur_con = active_con->get_con();
578     ldout(cct, 10) << "_send_mon_message to mon."
579                    << monmap.get_name(cur_con->get_peer_addr())
580                    << " at " << cur_con->get_peer_addr() << dendl;
581     cur_con->send_message(m);
582   } else {
583     waiting_for_session.push_back(m);
584   }
585 }
586
587 void MonClient::_reopen_session(int rank)
588 {
589   assert(monc_lock.is_locked());
590   ldout(cct, 10) << __func__ << " rank " << rank << dendl;
591
592   active_con.reset();
593   pending_cons.clear();
594
595   _start_hunting();
596
597   if (rank >= 0) {
598     _add_conn(rank, global_id);
599   } else {
600     _add_conns(global_id);
601   }
602
603   // throw out old queued messages
604   while (!waiting_for_session.empty()) {
605     waiting_for_session.front()->put();
606     waiting_for_session.pop_front();
607   }
608
609   // throw out version check requests
610   while (!version_requests.empty()) {
611     finisher.queue(version_requests.begin()->second->context, -EAGAIN);
612     delete version_requests.begin()->second;
613     version_requests.erase(version_requests.begin());
614   }
615
616   for (auto& c : pending_cons) {
617     c.second.start(monmap.get_epoch(), entity_name, *auth_supported);
618   }
619
620   for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin();
621        p != sub_sent.end();
622        ++p) {
623     if (sub_new.count(p->first) == 0)
624       sub_new[p->first] = p->second;
625   }
626   if (!sub_new.empty())
627     _renew_subs();
628 }
629
630 MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id)
631 {
632   auto peer = monmap.get_addr(rank);
633   auto conn = messenger->get_connection(monmap.get_inst(rank));
634   MonConnection mc(cct, conn, global_id);
635   auto inserted = pending_cons.insert(make_pair(peer, move(mc)));
636   ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
637                  << " con " << conn
638                  << " addr " << conn->get_peer_addr()
639                  << dendl;
640   return inserted.first->second;
641 }
642
643 void MonClient::_add_conns(uint64_t global_id)
644 {
645   uint16_t min_priority = std::numeric_limits<uint16_t>::max();
646   for (const auto& m : monmap.mon_info) {
647     if (m.second.priority < min_priority) {
648       min_priority = m.second.priority;
649     }
650   }
651   vector<unsigned> ranks;
652   for (const auto& m : monmap.mon_info) {
653     if (m.second.priority == min_priority) {
654       ranks.push_back(monmap.get_rank(m.first));
655     }
656   }
657   std::random_device rd;
658   std::mt19937 rng(rd());
659   std::shuffle(ranks.begin(), ranks.end(), rng);
660   unsigned n = cct->_conf->mon_client_hunt_parallel;
661   if (n == 0 || n > ranks.size()) {
662     n = ranks.size();
663   }
664   for (unsigned i = 0; i < n; i++) {
665     _add_conn(ranks[i], global_id);
666   }
667 }
668
669 bool MonClient::ms_handle_reset(Connection *con)
670 {
671   Mutex::Locker lock(monc_lock);
672
673   if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
674     return false;
675
676   if (_hunting()) {
677     if (pending_cons.count(con->get_peer_addr())) {
678       ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addr() << dendl;
679     } else {
680       ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl;
681     }
682     return true;
683   } else {
684     if (active_con && con == active_con->get_con()) {
685       ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addr() << dendl;
686       _reopen_session();
687       return false;
688     } else {
689       ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl;
690       return true;
691     }
692   }
693 }
694
695 bool MonClient::_opened() const
696 {
697   assert(monc_lock.is_locked());
698   return active_con || _hunting();
699 }
700
701 bool MonClient::_hunting() const
702 {
703   return !pending_cons.empty();
704 }
705
706 void MonClient::_start_hunting()
707 {
708   assert(!_hunting());
709   // adjust timeouts if necessary
710   if (!had_a_connection)
711     return;
712   reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
713   if (reopen_interval_multiplier >
714       cct->_conf->mon_client_hunt_interval_max_multiple) {
715     reopen_interval_multiplier =
716       cct->_conf->mon_client_hunt_interval_max_multiple;
717   }
718 }
719
720 void MonClient::_finish_hunting()
721 {
722   assert(monc_lock.is_locked());
723   // the pending conns have been cleaned.
724   assert(!_hunting());
725   if (active_con) {
726     auto con = active_con->get_con();
727     ldout(cct, 1) << "found mon."
728                   << monmap.get_name(con->get_peer_addr())
729                   << dendl;
730   } else {
731     ldout(cct, 1) << "no mon sessions established" << dendl;
732   }
733
734   had_a_connection = true;
735   _un_backoff();
736 }
737
738 void MonClient::tick()
739 {
740   ldout(cct, 10) << __func__ << dendl;
741
742   auto reschedule_tick = make_scope_guard([this] {
743       schedule_tick();
744     });
745
746   _check_auth_tickets();
747   
748   if (_hunting()) {
749     ldout(cct, 1) << "continuing hunt" << dendl;
750     return _reopen_session();
751   } else if (active_con) {
752     // just renew as needed
753     utime_t now = ceph_clock_now();
754     auto cur_con = active_con->get_con();
755     if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
756       ldout(cct, 10) << "renew subs? (now: " << now
757                      << "; renew after: " << sub_renew_after << ") -- "
758                      << (now > sub_renew_after ? "yes" : "no")
759                      << dendl;
760       if (now > sub_renew_after)
761         _renew_subs();
762     }
763
764     cur_con->send_keepalive();
765
766     if (cct->_conf->mon_client_ping_timeout > 0 &&
767         cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
768       utime_t lk = cur_con->get_last_keepalive_ack();
769       utime_t interval = now - lk;
770       if (interval > cct->_conf->mon_client_ping_timeout) {
771         ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
772                       << " seconds), reconnecting" << dendl;
773         return _reopen_session();
774       }
775       send_log();
776     }
777
778     _un_backoff();
779   }
780 }
781
782 void MonClient::_un_backoff()
783 {
784   // un-backoff our reconnect interval
785   reopen_interval_multiplier = std::max(
786     cct->_conf->get_val<double>("mon_client_hunt_interval_min_multiple"),
787     reopen_interval_multiplier /
788     cct->_conf->get_val<double>("mon_client_hunt_interval_backoff"));
789   ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
790                  << reopen_interval_multiplier << dendl;
791 }
792
793 void MonClient::schedule_tick()
794 {
795   struct C_Tick : public Context {
796     MonClient *monc;
797     explicit C_Tick(MonClient *m) : monc(m) {}
798     void finish(int r) override {
799       monc->tick();
800     }
801   };
802
803   if (_hunting()) {
804     timer.add_event_after(cct->_conf->mon_client_hunt_interval
805                           * reopen_interval_multiplier,
806                           new C_Tick(this));
807   } else
808     timer.add_event_after(cct->_conf->mon_client_ping_interval, new C_Tick(this));
809 }
810
811 // ---------
812
813 void MonClient::_renew_subs()
814 {
815   assert(monc_lock.is_locked());
816   if (sub_new.empty()) {
817     ldout(cct, 10) << __func__ << " - empty" << dendl;
818     return;
819   }
820
821   ldout(cct, 10) << __func__ << dendl;
822   if (!_opened())
823     _reopen_session();
824   else {
825     if (sub_renew_sent == utime_t())
826       sub_renew_sent = ceph_clock_now();
827
828     MMonSubscribe *m = new MMonSubscribe;
829     m->what = sub_new;
830     _send_mon_message(m);
831
832     // update sub_sent with sub_new
833     sub_new.insert(sub_sent.begin(), sub_sent.end());
834     std::swap(sub_new, sub_sent);
835     sub_new.clear();
836   }
837 }
838
839 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
840 {
841   if (sub_renew_sent != utime_t()) {
842     // NOTE: this is only needed for legacy (infernalis or older)
843     // mons; see tick().
844     sub_renew_after = sub_renew_sent;
845     sub_renew_after += m->interval / 2.0;
846     ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl;
847     sub_renew_sent = utime_t();
848   } else {
849     ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << ", ignoring" << dendl;
850   }
851
852   m->put();
853 }
854
855 int MonClient::_check_auth_tickets()
856 {
857   assert(monc_lock.is_locked());
858   if (active_con && auth) {
859     if (auth->need_tickets()) {
860       ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
861       MAuth *m = new MAuth;
862       m->protocol = auth->get_protocol();
863       auth->prepare_build_request();
864       auth->build_request(m->auth_payload);
865       _send_mon_message(m);
866     }
867
868     _check_auth_rotating();
869   }
870   return 0;
871 }
872
873 int MonClient::_check_auth_rotating()
874 {
875   assert(monc_lock.is_locked());
876   if (!rotating_secrets ||
877       !auth_principal_needs_rotating_keys(entity_name)) {
878     ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
879     return 0;
880   }
881
882   if (!active_con || !auth) {
883     ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
884     return 0;
885   }
886
887   utime_t now = ceph_clock_now();
888   utime_t cutoff = now;
889   cutoff -= MIN(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
890   utime_t issued_at_lower_bound = now;
891   issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
892   if (!rotating_secrets->need_new_secrets(cutoff)) {
893     ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
894     rotating_secrets->dump_rotating();
895     return 0;
896   }
897
898   ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
899   if (!rotating_secrets->need_new_secrets() &&
900       rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
901     // the key has expired before it has been issued?
902     lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
903                << " (before " << issued_at_lower_bound << ")" << dendl;
904   }
905   if ((now > last_rotating_renew_sent) &&
906       double(now - last_rotating_renew_sent) < 1) {
907     ldout(cct, 10) << __func__ << " called too often (last: "
908                    << last_rotating_renew_sent << "), skipping refresh" << dendl;
909     return 0;
910   }
911   MAuth *m = new MAuth;
912   m->protocol = auth->get_protocol();
913   if (auth->build_rotating_request(m->auth_payload)) {
914     last_rotating_renew_sent = now;
915     _send_mon_message(m);
916   } else {
917     m->put();
918   }
919   return 0;
920 }
921
922 int MonClient::wait_auth_rotating(double timeout)
923 {
924   Mutex::Locker l(monc_lock);
925   utime_t now = ceph_clock_now();
926   utime_t until = now;
927   until += timeout;
928
929   // Must be initialized
930   assert(auth != nullptr);
931
932   if (auth->get_protocol() == CEPH_AUTH_NONE)
933     return 0;
934   
935   if (!rotating_secrets)
936     return 0;
937
938   while (auth_principal_needs_rotating_keys(entity_name) &&
939          rotating_secrets->need_new_secrets(now)) {
940     if (now >= until) {
941       ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
942       return -ETIMEDOUT;
943     }
944     ldout(cct, 10) << __func__ << " waiting (until " << until << ")" << dendl;
945     auth_cond.WaitUntil(monc_lock, until);
946     now = ceph_clock_now();
947   }
948   ldout(cct, 10) << __func__ << " done" << dendl;
949   return 0;
950 }
951
952 // ---------
953
954 void MonClient::_send_command(MonCommand *r)
955 {
956   entity_addr_t peer;
957   if (active_con) {
958     peer = active_con->get_con()->get_peer_addr();
959   }
960
961   if (r->target_rank >= 0 &&
962       r->target_rank != monmap.get_rank(peer)) {
963     ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
964                    << " wants rank " << r->target_rank
965                    << ", reopening session"
966                    << dendl;
967     if (r->target_rank >= (int)monmap.size()) {
968       ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl;
969       _finish_command(r, -ENOENT, "mon rank dne");
970       return;
971     }
972     _reopen_session(r->target_rank);
973     return;
974   }
975
976   if (r->target_name.length() &&
977       r->target_name != monmap.get_name(peer)) {
978     ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
979                    << " wants mon " << r->target_name
980                    << ", reopening session"
981                    << dendl;
982     if (!monmap.contains(r->target_name)) {
983       ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl;
984       _finish_command(r, -ENOENT, "mon dne");
985       return;
986     }
987     _reopen_session(monmap.get_rank(r->target_name));
988     return;
989   }
990
991   ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
992   MMonCommand *m = new MMonCommand(monmap.fsid);
993   m->set_tid(r->tid);
994   m->cmd = r->cmd;
995   m->set_data(r->inbl);
996   _send_mon_message(m);
997   return;
998 }
999
1000 void MonClient::_resend_mon_commands()
1001 {
1002   // resend any requests
1003   for (map<uint64_t,MonCommand*>::iterator p = mon_commands.begin();
1004        p != mon_commands.end();
1005        ++p) {
1006     _send_command(p->second);
1007   }
1008 }
1009
1010 void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1011 {
1012   MonCommand *r = NULL;
1013   uint64_t tid = ack->get_tid();
1014
1015   if (tid == 0 && !mon_commands.empty()) {
1016     r = mon_commands.begin()->second;
1017     ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1018   } else {
1019     map<uint64_t,MonCommand*>::iterator p = mon_commands.find(tid);
1020     if (p == mon_commands.end()) {
1021       ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1022       ack->put();
1023       return;
1024     }
1025     r = p->second;
1026   }
1027
1028   ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1029   if (r->poutbl)
1030     r->poutbl->claim(ack->get_data());
1031   _finish_command(r, ack->r, ack->rs);
1032   ack->put();
1033 }
1034
1035 int MonClient::_cancel_mon_command(uint64_t tid)
1036 {
1037   assert(monc_lock.is_locked());
1038
1039   map<ceph_tid_t, MonCommand*>::iterator it = mon_commands.find(tid);
1040   if (it == mon_commands.end()) {
1041     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1042     return -ENOENT;
1043   }
1044
1045   ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1046
1047   MonCommand *cmd = it->second;
1048   _finish_command(cmd, -ETIMEDOUT, "");
1049   return 0;
1050 }
1051
1052 void MonClient::_finish_command(MonCommand *r, int ret, string rs)
1053 {
1054   ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
1055   if (r->prval)
1056     *(r->prval) = ret;
1057   if (r->prs)
1058     *(r->prs) = rs;
1059   if (r->onfinish)
1060     finisher.queue(r->onfinish, ret);
1061   mon_commands.erase(r->tid);
1062   delete r;
1063 }
1064
1065 void MonClient::start_mon_command(const vector<string>& cmd,
1066                                  const bufferlist& inbl,
1067                                  bufferlist *outbl, string *outs,
1068                                  Context *onfinish)
1069 {
1070   Mutex::Locker l(monc_lock);
1071   MonCommand *r = new MonCommand(++last_mon_command_tid);
1072   r->cmd = cmd;
1073   r->inbl = inbl;
1074   r->poutbl = outbl;
1075   r->prs = outs;
1076   r->onfinish = onfinish;
1077   if (cct->_conf->rados_mon_op_timeout > 0) {
1078     class C_CancelMonCommand : public Context
1079     {
1080       uint64_t tid;
1081       MonClient *monc;
1082       public:
1083       C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
1084       void finish(int r) override {
1085         monc->_cancel_mon_command(tid);
1086       }
1087     };
1088     r->ontimeout = new C_CancelMonCommand(r->tid, this);
1089     timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
1090   }
1091   mon_commands[r->tid] = r;
1092   _send_command(r);
1093 }
1094
1095 void MonClient::start_mon_command(const string &mon_name,
1096                                  const vector<string>& cmd,
1097                                  const bufferlist& inbl,
1098                                  bufferlist *outbl, string *outs,
1099                                  Context *onfinish)
1100 {
1101   Mutex::Locker l(monc_lock);
1102   MonCommand *r = new MonCommand(++last_mon_command_tid);
1103   r->target_name = mon_name;
1104   r->cmd = cmd;
1105   r->inbl = inbl;
1106   r->poutbl = outbl;
1107   r->prs = outs;
1108   r->onfinish = onfinish;
1109   mon_commands[r->tid] = r;
1110   _send_command(r);
1111 }
1112
1113 void MonClient::start_mon_command(int rank,
1114                                  const vector<string>& cmd,
1115                                  const bufferlist& inbl,
1116                                  bufferlist *outbl, string *outs,
1117                                  Context *onfinish)
1118 {
1119   Mutex::Locker l(monc_lock);
1120   MonCommand *r = new MonCommand(++last_mon_command_tid);
1121   r->target_rank = rank;
1122   r->cmd = cmd;
1123   r->inbl = inbl;
1124   r->poutbl = outbl;
1125   r->prs = outs;
1126   r->onfinish = onfinish;
1127   mon_commands[r->tid] = r;
1128   _send_command(r);
1129 }
1130
1131 // ---------
1132
1133 void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
1134 {
1135   version_req_d *req = new version_req_d(onfinish, newest, oldest);
1136   ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
1137   Mutex::Locker l(monc_lock);
1138   MMonGetVersion *m = new MMonGetVersion();
1139   m->what = map;
1140   m->handle = ++version_req_id;
1141   version_requests[m->handle] = req;
1142   _send_mon_message(m);
1143 }
1144
1145 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1146 {
1147   assert(monc_lock.is_locked());
1148   map<ceph_tid_t, version_req_d*>::iterator iter = version_requests.find(m->handle);
1149   if (iter == version_requests.end()) {
1150     ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1151                   << " not found" << dendl;
1152   } else {
1153     version_req_d *req = iter->second;
1154     ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
1155     version_requests.erase(iter);
1156     if (req->newest)
1157       *req->newest = m->version;
1158     if (req->oldest)
1159       *req->oldest = m->oldest_version;
1160     finisher.queue(req->context, 0);
1161     delete req;
1162   }
1163   m->put();
1164 }
1165
1166 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1167   Mutex::Locker l(monc_lock);
1168   if (auth) {
1169     return auth->build_authorizer(service_id);
1170   } else {
1171     ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1172                   << ", but no auth is available now" << dendl;
1173     return nullptr;
1174   }
1175 }
1176
1177 #define dout_subsys ceph_subsys_monc
1178 #undef dout_prefix
1179 #define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1180
1181 MonConnection::MonConnection(CephContext *cct, ConnectionRef con, uint64_t global_id)
1182   : cct(cct), con(con), global_id(global_id)
1183 {}
1184
1185 MonConnection::~MonConnection()
1186 {
1187   if (con) {
1188     con->mark_down();
1189     con.reset();
1190   }
1191 }
1192
1193 bool MonConnection::have_session() const
1194 {
1195   return state == State::HAVE_SESSION;
1196 }
1197
1198 void MonConnection::start(epoch_t epoch,
1199                          const EntityName& entity_name,
1200                          const AuthMethodList& auth_supported)
1201 {
1202   // restart authentication handshake
1203   state = State::NEGOTIATING;
1204
1205   // send an initial keepalive to ensure our timestamp is valid by the
1206   // time we are in an OPENED state (by sequencing this before
1207   // authentication).
1208   con->send_keepalive();
1209
1210   auto m = new MAuth;
1211   m->protocol = 0;
1212   m->monmap_epoch = epoch;
1213   __u8 struct_v = 1;
1214   ::encode(struct_v, m->auth_payload);
1215   ::encode(auth_supported.get_supported_set(), m->auth_payload);
1216   ::encode(entity_name, m->auth_payload);
1217   ::encode(global_id, m->auth_payload);
1218   con->send_message(m);
1219 }
1220
1221 int MonConnection::handle_auth(MAuthReply* m,
1222                                const EntityName& entity_name,
1223                                uint32_t want_keys,
1224                                RotatingKeyRing* keyring)
1225 {
1226   if (state == State::NEGOTIATING) {
1227     int r = _negotiate(m, entity_name, want_keys, keyring);
1228     if (r) {
1229       return r;
1230     }
1231     state = State::AUTHENTICATING;
1232   }
1233   int r = authenticate(m);
1234   if (!r) {
1235     state = State::HAVE_SESSION;
1236   }
1237   return r;
1238 }
1239
1240 int MonConnection::_negotiate(MAuthReply *m,
1241                               const EntityName& entity_name,
1242                               uint32_t want_keys,
1243                               RotatingKeyRing* keyring)
1244 {
1245   if (auth && (int)m->protocol == auth->get_protocol()) {
1246     // good, negotiation completed
1247     auth->reset();
1248     return 0;
1249   }
1250
1251   auth.reset(get_auth_client_handler(cct, m->protocol, keyring));
1252   if (!auth) {
1253     ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl;
1254     if (m->result == -ENOTSUP) {
1255       ldout(cct, 10) << "none of our auth protocols are supported by the server"
1256                      << dendl;
1257     }
1258     return m->result;
1259   }
1260
1261   // do not request MGR key unless the mon has the SERVER_KRAKEN
1262   // feature.  otherwise it will give us an auth error.  note that
1263   // we have to use the FEATUREMASK because pre-jewel the kraken
1264   // feature bit was used for something else.
1265   if ((want_keys & CEPH_ENTITY_TYPE_MGR) &&
1266       !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
1267     ldout(cct, 1) << __func__
1268                   << " not requesting MGR keys from pre-kraken monitor"
1269                   << dendl;
1270     want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1271   }
1272   auth->set_want_keys(want_keys);
1273   auth->init(entity_name);
1274   auth->set_global_id(global_id);
1275   return 0;
1276 }
1277
1278 int MonConnection::authenticate(MAuthReply *m)
1279 {
1280   assert(auth);
1281   if (!m->global_id) {
1282     ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1283   }
1284   if (m->global_id != global_id) {
1285     // it's a new session
1286     auth->reset();
1287     global_id = m->global_id;
1288     auth->set_global_id(global_id);
1289     ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1290   }
1291   auto p = m->result_bl.begin();
1292   int ret = auth->handle_response(m->result, p);
1293   if (ret == -EAGAIN) {
1294     auto ma = new MAuth;
1295     ma->protocol = auth->get_protocol();
1296     auth->prepare_build_request();
1297     auth->build_request(ma->auth_payload);
1298     con->send_message(ma);
1299   }
1300   return ret;
1301 }