// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include #include "include/scope_guard.h" #include "messages/MMonGetMap.h" #include "messages/MMonGetVersion.h" #include "messages/MMonGetVersionReply.h" #include "messages/MMonMap.h" #include "messages/MAuth.h" #include "messages/MLogAck.h" #include "messages/MAuthReply.h" #include "messages/MMonCommand.h" #include "messages/MMonCommandAck.h" #include "messages/MPing.h" #include "messages/MMonSubscribe.h" #include "messages/MMonSubscribeAck.h" #include "common/errno.h" #include "common/LogClient.h" #include "MonClient.h" #include "MonMap.h" #include "auth/Auth.h" #include "auth/KeyRing.h" #include "auth/AuthClientHandler.h" #include "auth/AuthMethodList.h" #include "auth/RotatingKeyRing.h" #define dout_subsys ceph_subsys_monc #undef dout_prefix #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": " MonClient::MonClient(CephContext *cct_) : Dispatcher(cct_), messenger(NULL), monc_lock("MonClient::monc_lock"), timer(cct_, monc_lock), finisher(cct_), initialized(false), no_keyring_disabled_cephx(false), log_client(NULL), more_log_pending(false), want_monmap(true), had_a_connection(false), reopen_interval_multiplier( cct_->_conf->get_val("mon_client_hunt_interval_min_multiple")), last_mon_command_tid(0), version_req_id(0) { } MonClient::~MonClient() { } int MonClient::build_initial_monmap() { ldout(cct, 10) << __func__ << dendl; return monmap.build_initial(cct, cerr); } int MonClient::get_monmap() { ldout(cct, 10) << __func__ << dendl; Mutex::Locker l(monc_lock); _sub_want("monmap", 0, 0); if (!_opened()) _reopen_session(); while (want_monmap) map_cond.Wait(monc_lock); ldout(cct, 10) << __func__ << " done" << dendl; return 0; } int MonClient::get_monmap_privately() { ldout(cct, 10) << __func__ << dendl; Mutex::Locker l(monc_lock); bool temp_msgr = false; Messenger* smessenger = NULL; if (!messenger) { messenger = smessenger = Messenger::create_client_messenger(cct, "temp_mon_client"); if (NULL == messenger) { return -1; } messenger->add_dispatcher_head(this); smessenger->start(); temp_msgr = true; } int attempt = 10; ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl; std::random_device rd; std::mt19937 rng(rd()); assert(monmap.size() > 0); std::uniform_int_distribution ranks(0, monmap.size() - 1); while (monmap.fsid.is_zero()) { auto rank = ranks(rng); auto& pending_con = _add_conn(rank, 0); auto con = pending_con.get_con(); ldout(cct, 10) << "querying mon." << monmap.get_name(rank) << " " << con->get_peer_addr() << dendl; con->send_message(new MMonGetMap); if (--attempt == 0) break; utime_t interval; interval.set_from_double(cct->_conf->mon_client_hunt_interval); map_cond.WaitInterval(monc_lock, interval); if (monmap.fsid.is_zero() && con) { con->mark_down(); // nope, clean that connection up } } if (temp_msgr) { pending_cons.clear(); monc_lock.Unlock(); messenger->shutdown(); if (smessenger) smessenger->wait(); delete messenger; messenger = 0; monc_lock.Lock(); } pending_cons.clear(); if (!monmap.fsid.is_zero()) return 0; return -1; } /** * Ping the monitor with id @p mon_id and set the resulting reply in * the provided @p result_reply, if this last parameter is not NULL. * * So that we don't rely on the MonClient's default messenger, set up * during connect(), we create our own messenger to comunicate with the * specified monitor. This is advantageous in the following ways: * * - Isolate the ping procedure from the rest of the MonClient's operations, * allowing us to not acquire or manage the big monc_lock, thus not * having to block waiting for some other operation to finish before we * can proceed. * * for instance, we can ping mon.FOO even if we are currently hunting * or blocked waiting for auth to complete with mon.BAR. * * - Ping a monitor prior to establishing a connection (using connect()) * and properly establish the MonClient's messenger. This frees us * from dealing with the complex foo that happens in connect(). * * We also don't rely on MonClient as a dispatcher for this messenger, * unlike what happens with the MonClient's default messenger. This allows * us to sandbox the whole ping, having it much as a separate entity in * the MonClient class, considerably simplifying the handling and dispatching * of messages without needing to consider monc_lock. * * Current drawback is that we will establish a messenger for each ping * we want to issue, instead of keeping a single messenger instance that * would be used for all pings. */ int MonClient::ping_monitor(const string &mon_id, string *result_reply) { ldout(cct, 10) << __func__ << dendl; string new_mon_id; if (monmap.contains("noname-"+mon_id)) { new_mon_id = "noname-"+mon_id; } else { new_mon_id = mon_id; } if (new_mon_id.empty()) { ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl; return -EINVAL; } else if (!monmap.contains(new_mon_id)) { ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'" << dendl; return -ENOENT; } MonClientPinger *pinger = new MonClientPinger(cct, result_reply); Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client"); smsgr->add_dispatcher_head(pinger); smsgr->start(); ConnectionRef con = smsgr->get_connection(monmap.get_inst(new_mon_id)); ldout(cct, 10) << __func__ << " ping mon." << new_mon_id << " " << con->get_peer_addr() << dendl; con->send_message(new MPing); pinger->lock.Lock(); int ret = pinger->wait_for_reply(cct->_conf->client_mount_timeout); if (ret == 0) { ldout(cct,10) << __func__ << " got ping reply" << dendl; } else { ret = -ret; } pinger->lock.Unlock(); con->mark_down(); smsgr->shutdown(); smsgr->wait(); delete smsgr; delete pinger; return ret; } bool MonClient::ms_dispatch(Message *m) { if (my_addr == entity_addr_t()) my_addr = messenger->get_myaddr(); // we only care about these message types switch (m->get_type()) { case CEPH_MSG_MON_MAP: case CEPH_MSG_AUTH_REPLY: case CEPH_MSG_MON_SUBSCRIBE_ACK: case CEPH_MSG_MON_GET_VERSION_REPLY: case MSG_MON_COMMAND_ACK: case MSG_LOGACK: break; default: return false; } Mutex::Locker lock(monc_lock); if (_hunting()) { auto pending_con = pending_cons.find(m->get_source_addr()); if (pending_con == pending_cons.end() || pending_con->second.get_con() != m->get_connection()) { // ignore any messages outside hunting sessions ldout(cct, 10) << "discarding stray monitor message " << *m << dendl; m->put(); return true; } } else if (!active_con || active_con->get_con() != m->get_connection()) { // ignore any messages outside our session(s) ldout(cct, 10) << "discarding stray monitor message " << *m << dendl; m->put(); return true; } switch (m->get_type()) { case CEPH_MSG_MON_MAP: handle_monmap(static_cast(m)); if (passthrough_monmap) { return false; } else { m->put(); } break; case CEPH_MSG_AUTH_REPLY: handle_auth(static_cast(m)); break; case CEPH_MSG_MON_SUBSCRIBE_ACK: handle_subscribe_ack(static_cast(m)); break; case CEPH_MSG_MON_GET_VERSION_REPLY: handle_get_version_reply(static_cast(m)); break; case MSG_MON_COMMAND_ACK: handle_mon_command_ack(static_cast(m)); break; case MSG_LOGACK: if (log_client) { log_client->handle_log_ack(static_cast(m)); m->put(); if (more_log_pending) { send_log(); } } else { m->put(); } break; } return true; } void MonClient::send_log(bool flush) { if (log_client) { Message *lm = log_client->get_mon_log_message(flush); if (lm) _send_mon_message(lm); more_log_pending = log_client->are_pending(); } } void MonClient::flush_log() { Mutex::Locker l(monc_lock); send_log(); } /* Unlike all the other message-handling functions, we don't put away a reference * because we want to support MMonMap passthrough to other Dispatchers. */ void MonClient::handle_monmap(MMonMap *m) { ldout(cct, 10) << __func__ << " " << *m << dendl; auto peer = m->get_source_addr(); string cur_mon = monmap.get_name(peer); bufferlist::iterator p = m->monmapbl.begin(); ::decode(monmap, p); ldout(cct, 10) << " got monmap " << monmap.epoch << ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon) << dendl; ldout(cct, 10) << "dump:\n"; monmap.print(*_dout); *_dout << dendl; _sub_got("monmap", monmap.get_epoch()); if (!monmap.get_addr_name(peer, cur_mon)) { ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl; // can't find the mon we were talking to (above) _reopen_session(); } map_cond.Signal(); want_monmap = false; } // ---------------------- int MonClient::init() { ldout(cct, 10) << __func__ << dendl; messenger->add_dispatcher_head(this); entity_name = cct->_conf->name; Mutex::Locker l(monc_lock); string method; if (!cct->_conf->auth_supported.empty()) method = cct->_conf->auth_supported; else if (entity_name.get_type() == CEPH_ENTITY_TYPE_OSD || entity_name.get_type() == CEPH_ENTITY_TYPE_MDS || entity_name.get_type() == CEPH_ENTITY_TYPE_MON) method = cct->_conf->auth_cluster_required; else method = cct->_conf->auth_client_required; auth_supported.reset(new AuthMethodList(cct, method)); ldout(cct, 10) << "auth_supported " << auth_supported->get_supported_set() << " method " << method << dendl; int r = 0; keyring.reset(new KeyRing); // initializing keyring anyway if (auth_supported->is_supported_auth(CEPH_AUTH_CEPHX)) { r = keyring->from_ceph_context(cct); if (r == -ENOENT) { auth_supported->remove_supported_auth(CEPH_AUTH_CEPHX); if (!auth_supported->get_supported_set().empty()) { r = 0; no_keyring_disabled_cephx = true; } else { lderr(cct) << "ERROR: missing keyring, cannot use cephx for authentication" << dendl; } } } if (r < 0) { return r; } rotating_secrets.reset( new RotatingKeyRing(cct, cct->get_module_type(), keyring.get())); initialized = true; timer.init(); finisher.start(); schedule_tick(); return 0; } void MonClient::shutdown() { ldout(cct, 10) << __func__ << dendl; monc_lock.Lock(); while (!version_requests.empty()) { version_requests.begin()->second->context->complete(-ECANCELED); ldout(cct, 20) << __func__ << " canceling and discarding version request " << version_requests.begin()->second << dendl; delete version_requests.begin()->second; version_requests.erase(version_requests.begin()); } while (!mon_commands.empty()) { auto tid = mon_commands.begin()->first; _cancel_mon_command(tid); } while (!waiting_for_session.empty()) { ldout(cct, 20) << __func__ << " discarding pending message " << *waiting_for_session.front() << dendl; waiting_for_session.front()->put(); waiting_for_session.pop_front(); } active_con.reset(); pending_cons.clear(); auth.reset(); monc_lock.Unlock(); if (initialized) { finisher.wait_for_empty(); finisher.stop(); } monc_lock.Lock(); timer.shutdown(); monc_lock.Unlock(); } int MonClient::authenticate(double timeout) { Mutex::Locker lock(monc_lock); if (active_con) { ldout(cct, 5) << "already authenticated" << dendl; return 0; } _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0); if (!_opened()) _reopen_session(); utime_t until = ceph_clock_now(); until += timeout; if (timeout > 0.0) ldout(cct, 10) << "authenticate will time out at " << until << dendl; while (!active_con && !authenticate_err) { if (timeout > 0.0) { int r = auth_cond.WaitUntil(monc_lock, until); if (r == ETIMEDOUT) { ldout(cct, 0) << "authenticate timed out after " << timeout << dendl; authenticate_err = -r; } } else { auth_cond.Wait(monc_lock); } } if (active_con) { ldout(cct, 5) << __func__ << " success, global_id " << active_con->get_global_id() << dendl; // active_con should not have been set if there was an error assert(authenticate_err == 0); authenticated = true; } if (authenticate_err < 0 && no_keyring_disabled_cephx) { lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl; } return authenticate_err; } void MonClient::handle_auth(MAuthReply *m) { assert(monc_lock.is_locked()); if (!_hunting()) { std::swap(active_con->get_auth(), auth); int ret = active_con->authenticate(m); m->put(); std::swap(auth, active_con->get_auth()); if (global_id != active_con->get_global_id()) { lderr(cct) << __func__ << " peer assigned me a different global_id: " << active_con->get_global_id() << dendl; } if (ret != -EAGAIN) { _finish_auth(ret); } return; } // hunting auto found = pending_cons.find(m->get_source_addr()); assert(found != pending_cons.end()); int auth_err = found->second.handle_auth(m, entity_name, want_keys, rotating_secrets.get()); m->put(); if (auth_err == -EAGAIN) { return; } if (auth_err) { pending_cons.erase(found); if (!pending_cons.empty()) { // keep trying with pending connections return; } // the last try just failed, give up. } else { auto& mc = found->second; assert(mc.have_session()); active_con.reset(new MonConnection(std::move(mc))); pending_cons.clear(); } _finish_hunting(); if (!auth_err) { last_rotating_renew_sent = utime_t(); while (!waiting_for_session.empty()) { _send_mon_message(waiting_for_session.front()); waiting_for_session.pop_front(); } _resend_mon_commands(); send_log(true); if (active_con) { std::swap(auth, active_con->get_auth()); global_id = active_con->get_global_id(); } } _finish_auth(auth_err); if (!auth_err) { Context *cb = nullptr; if (session_established_context) { cb = session_established_context.release(); } if (cb) { monc_lock.Unlock(); cb->complete(0); monc_lock.Lock(); } } } void MonClient::_finish_auth(int auth_err) { authenticate_err = auth_err; // _resend_mon_commands() could _reopen_session() if the connected mon is not // the one the MonCommand is targeting. if (!auth_err && active_con) { assert(auth); _check_auth_tickets(); } auth_cond.SignalAll(); } // --------- void MonClient::_send_mon_message(Message *m) { assert(monc_lock.is_locked()); if (active_con) { auto cur_con = active_con->get_con(); ldout(cct, 10) << "_send_mon_message to mon." << monmap.get_name(cur_con->get_peer_addr()) << " at " << cur_con->get_peer_addr() << dendl; cur_con->send_message(m); } else { waiting_for_session.push_back(m); } } void MonClient::_reopen_session(int rank) { assert(monc_lock.is_locked()); ldout(cct, 10) << __func__ << " rank " << rank << dendl; active_con.reset(); pending_cons.clear(); _start_hunting(); if (rank >= 0) { _add_conn(rank, global_id); } else { _add_conns(global_id); } // throw out old queued messages while (!waiting_for_session.empty()) { waiting_for_session.front()->put(); waiting_for_session.pop_front(); } // throw out version check requests while (!version_requests.empty()) { finisher.queue(version_requests.begin()->second->context, -EAGAIN); delete version_requests.begin()->second; version_requests.erase(version_requests.begin()); } for (auto& c : pending_cons) { c.second.start(monmap.get_epoch(), entity_name, *auth_supported); } for (map::iterator p = sub_sent.begin(); p != sub_sent.end(); ++p) { if (sub_new.count(p->first) == 0) sub_new[p->first] = p->second; } if (!sub_new.empty()) _renew_subs(); } MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id) { auto peer = monmap.get_addr(rank); auto conn = messenger->get_connection(monmap.get_inst(rank)); MonConnection mc(cct, conn, global_id); auto inserted = pending_cons.insert(make_pair(peer, move(mc))); ldout(cct, 10) << "picked mon." << monmap.get_name(rank) << " con " << conn << " addr " << conn->get_peer_addr() << dendl; return inserted.first->second; } void MonClient::_add_conns(uint64_t global_id) { uint16_t min_priority = std::numeric_limits::max(); for (const auto& m : monmap.mon_info) { if (m.second.priority < min_priority) { min_priority = m.second.priority; } } vector ranks; for (const auto& m : monmap.mon_info) { if (m.second.priority == min_priority) { ranks.push_back(monmap.get_rank(m.first)); } } std::random_device rd; std::mt19937 rng(rd()); std::shuffle(ranks.begin(), ranks.end(), rng); unsigned n = cct->_conf->mon_client_hunt_parallel; if (n == 0 || n > ranks.size()) { n = ranks.size(); } for (unsigned i = 0; i < n; i++) { _add_conn(ranks[i], global_id); } } bool MonClient::ms_handle_reset(Connection *con) { Mutex::Locker lock(monc_lock); if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) return false; if (_hunting()) { if (pending_cons.count(con->get_peer_addr())) { ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addr() << dendl; } else { ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl; } return true; } else { if (active_con && con == active_con->get_con()) { ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addr() << dendl; _reopen_session(); return false; } else { ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl; return true; } } } bool MonClient::_opened() const { assert(monc_lock.is_locked()); return active_con || _hunting(); } bool MonClient::_hunting() const { return !pending_cons.empty(); } void MonClient::_start_hunting() { assert(!_hunting()); // adjust timeouts if necessary if (!had_a_connection) return; reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff; if (reopen_interval_multiplier > cct->_conf->mon_client_hunt_interval_max_multiple) { reopen_interval_multiplier = cct->_conf->mon_client_hunt_interval_max_multiple; } } void MonClient::_finish_hunting() { assert(monc_lock.is_locked()); // the pending conns have been cleaned. assert(!_hunting()); if (active_con) { auto con = active_con->get_con(); ldout(cct, 1) << "found mon." << monmap.get_name(con->get_peer_addr()) << dendl; } else { ldout(cct, 1) << "no mon sessions established" << dendl; } had_a_connection = true; _un_backoff(); } void MonClient::tick() { ldout(cct, 10) << __func__ << dendl; auto reschedule_tick = make_scope_guard([this] { schedule_tick(); }); _check_auth_tickets(); if (_hunting()) { ldout(cct, 1) << "continuing hunt" << dendl; return _reopen_session(); } else if (active_con) { // just renew as needed utime_t now = ceph_clock_now(); auto cur_con = active_con->get_con(); if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) { ldout(cct, 10) << "renew subs? (now: " << now << "; renew after: " << sub_renew_after << ") -- " << (now > sub_renew_after ? "yes" : "no") << dendl; if (now > sub_renew_after) _renew_subs(); } cur_con->send_keepalive(); if (cct->_conf->mon_client_ping_timeout > 0 && cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { utime_t lk = cur_con->get_last_keepalive_ack(); utime_t interval = now - lk; if (interval > cct->_conf->mon_client_ping_timeout) { ldout(cct, 1) << "no keepalive since " << lk << " (" << interval << " seconds), reconnecting" << dendl; return _reopen_session(); } send_log(); } _un_backoff(); } } void MonClient::_un_backoff() { // un-backoff our reconnect interval reopen_interval_multiplier = std::max( cct->_conf->get_val("mon_client_hunt_interval_min_multiple"), reopen_interval_multiplier / cct->_conf->get_val("mon_client_hunt_interval_backoff")); ldout(cct, 20) << __func__ << " reopen_interval_multipler now " << reopen_interval_multiplier << dendl; } void MonClient::schedule_tick() { struct C_Tick : public Context { MonClient *monc; explicit C_Tick(MonClient *m) : monc(m) {} void finish(int r) override { monc->tick(); } }; if (_hunting()) { timer.add_event_after(cct->_conf->mon_client_hunt_interval * reopen_interval_multiplier, new C_Tick(this)); } else timer.add_event_after(cct->_conf->mon_client_ping_interval, new C_Tick(this)); } // --------- void MonClient::_renew_subs() { assert(monc_lock.is_locked()); if (sub_new.empty()) { ldout(cct, 10) << __func__ << " - empty" << dendl; return; } ldout(cct, 10) << __func__ << dendl; if (!_opened()) _reopen_session(); else { if (sub_renew_sent == utime_t()) sub_renew_sent = ceph_clock_now(); MMonSubscribe *m = new MMonSubscribe; m->what = sub_new; _send_mon_message(m); // update sub_sent with sub_new sub_new.insert(sub_sent.begin(), sub_sent.end()); std::swap(sub_new, sub_sent); sub_new.clear(); } } void MonClient::handle_subscribe_ack(MMonSubscribeAck *m) { if (sub_renew_sent != utime_t()) { // NOTE: this is only needed for legacy (infernalis or older) // mons; see tick(). sub_renew_after = sub_renew_sent; sub_renew_after += m->interval / 2.0; ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl; sub_renew_sent = utime_t(); } else { ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << ", ignoring" << dendl; } m->put(); } int MonClient::_check_auth_tickets() { assert(monc_lock.is_locked()); if (active_con && auth) { if (auth->need_tickets()) { ldout(cct, 10) << __func__ << " getting new tickets!" << dendl; MAuth *m = new MAuth; m->protocol = auth->get_protocol(); auth->prepare_build_request(); auth->build_request(m->auth_payload); _send_mon_message(m); } _check_auth_rotating(); } return 0; } int MonClient::_check_auth_rotating() { assert(monc_lock.is_locked()); if (!rotating_secrets || !auth_principal_needs_rotating_keys(entity_name)) { ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl; return 0; } if (!active_con || !auth) { ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl; return 0; } utime_t now = ceph_clock_now(); utime_t cutoff = now; cutoff -= MIN(30.0, cct->_conf->auth_service_ticket_ttl / 4.0); utime_t issued_at_lower_bound = now; issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl; if (!rotating_secrets->need_new_secrets(cutoff)) { ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl; rotating_secrets->dump_rotating(); return 0; } ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl; if (!rotating_secrets->need_new_secrets() && rotating_secrets->need_new_secrets(issued_at_lower_bound)) { // the key has expired before it has been issued? lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early" << " (before " << issued_at_lower_bound << ")" << dendl; } if ((now > last_rotating_renew_sent) && double(now - last_rotating_renew_sent) < 1) { ldout(cct, 10) << __func__ << " called too often (last: " << last_rotating_renew_sent << "), skipping refresh" << dendl; return 0; } MAuth *m = new MAuth; m->protocol = auth->get_protocol(); if (auth->build_rotating_request(m->auth_payload)) { last_rotating_renew_sent = now; _send_mon_message(m); } else { m->put(); } return 0; } int MonClient::wait_auth_rotating(double timeout) { Mutex::Locker l(monc_lock); utime_t now = ceph_clock_now(); utime_t until = now; until += timeout; // Must be initialized assert(auth != nullptr); if (auth->get_protocol() == CEPH_AUTH_NONE) return 0; if (!rotating_secrets) return 0; while (auth_principal_needs_rotating_keys(entity_name) && rotating_secrets->need_new_secrets(now)) { if (now >= until) { ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl; return -ETIMEDOUT; } ldout(cct, 10) << __func__ << " waiting (until " << until << ")" << dendl; auth_cond.WaitUntil(monc_lock, until); now = ceph_clock_now(); } ldout(cct, 10) << __func__ << " done" << dendl; return 0; } // --------- void MonClient::_send_command(MonCommand *r) { entity_addr_t peer; if (active_con) { peer = active_con->get_con()->get_peer_addr(); } if (r->target_rank >= 0 && r->target_rank != monmap.get_rank(peer)) { ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << " wants rank " << r->target_rank << ", reopening session" << dendl; if (r->target_rank >= (int)monmap.size()) { ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl; _finish_command(r, -ENOENT, "mon rank dne"); return; } _reopen_session(r->target_rank); return; } if (r->target_name.length() && r->target_name != monmap.get_name(peer)) { ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << " wants mon " << r->target_name << ", reopening session" << dendl; if (!monmap.contains(r->target_name)) { ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl; _finish_command(r, -ENOENT, "mon dne"); return; } _reopen_session(monmap.get_rank(r->target_name)); return; } ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl; MMonCommand *m = new MMonCommand(monmap.fsid); m->set_tid(r->tid); m->cmd = r->cmd; m->set_data(r->inbl); _send_mon_message(m); return; } void MonClient::_resend_mon_commands() { // resend any requests for (map::iterator p = mon_commands.begin(); p != mon_commands.end(); ++p) { _send_command(p->second); } } void MonClient::handle_mon_command_ack(MMonCommandAck *ack) { MonCommand *r = NULL; uint64_t tid = ack->get_tid(); if (tid == 0 && !mon_commands.empty()) { r = mon_commands.begin()->second; ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl; } else { map::iterator p = mon_commands.find(tid); if (p == mon_commands.end()) { ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl; ack->put(); return; } r = p->second; } ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl; if (r->poutbl) r->poutbl->claim(ack->get_data()); _finish_command(r, ack->r, ack->rs); ack->put(); } int MonClient::_cancel_mon_command(uint64_t tid) { assert(monc_lock.is_locked()); map::iterator it = mon_commands.find(tid); if (it == mon_commands.end()) { ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; return -ENOENT; } ldout(cct, 10) << __func__ << " tid " << tid << dendl; MonCommand *cmd = it->second; _finish_command(cmd, -ETIMEDOUT, ""); return 0; } void MonClient::_finish_command(MonCommand *r, int ret, string rs) { ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl; if (r->prval) *(r->prval) = ret; if (r->prs) *(r->prs) = rs; if (r->onfinish) finisher.queue(r->onfinish, ret); mon_commands.erase(r->tid); delete r; } void MonClient::start_mon_command(const vector& cmd, const bufferlist& inbl, bufferlist *outbl, string *outs, Context *onfinish) { Mutex::Locker l(monc_lock); MonCommand *r = new MonCommand(++last_mon_command_tid); r->cmd = cmd; r->inbl = inbl; r->poutbl = outbl; r->prs = outs; r->onfinish = onfinish; if (cct->_conf->rados_mon_op_timeout > 0) { class C_CancelMonCommand : public Context { uint64_t tid; MonClient *monc; public: C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {} void finish(int r) override { monc->_cancel_mon_command(tid); } }; r->ontimeout = new C_CancelMonCommand(r->tid, this); timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout); } mon_commands[r->tid] = r; _send_command(r); } void MonClient::start_mon_command(const string &mon_name, const vector& cmd, const bufferlist& inbl, bufferlist *outbl, string *outs, Context *onfinish) { Mutex::Locker l(monc_lock); MonCommand *r = new MonCommand(++last_mon_command_tid); r->target_name = mon_name; r->cmd = cmd; r->inbl = inbl; r->poutbl = outbl; r->prs = outs; r->onfinish = onfinish; mon_commands[r->tid] = r; _send_command(r); } void MonClient::start_mon_command(int rank, const vector& cmd, const bufferlist& inbl, bufferlist *outbl, string *outs, Context *onfinish) { Mutex::Locker l(monc_lock); MonCommand *r = new MonCommand(++last_mon_command_tid); r->target_rank = rank; r->cmd = cmd; r->inbl = inbl; r->poutbl = outbl; r->prs = outs; r->onfinish = onfinish; mon_commands[r->tid] = r; _send_command(r); } // --------- void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish) { version_req_d *req = new version_req_d(onfinish, newest, oldest); ldout(cct, 10) << "get_version " << map << " req " << req << dendl; Mutex::Locker l(monc_lock); MMonGetVersion *m = new MMonGetVersion(); m->what = map; m->handle = ++version_req_id; version_requests[m->handle] = req; _send_mon_message(m); } void MonClient::handle_get_version_reply(MMonGetVersionReply* m) { assert(monc_lock.is_locked()); map::iterator iter = version_requests.find(m->handle); if (iter == version_requests.end()) { ldout(cct, 0) << __func__ << " version request with handle " << m->handle << " not found" << dendl; } else { version_req_d *req = iter->second; ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl; version_requests.erase(iter); if (req->newest) *req->newest = m->version; if (req->oldest) *req->oldest = m->oldest_version; finisher.queue(req->context, 0); delete req; } m->put(); } AuthAuthorizer* MonClient::build_authorizer(int service_id) const { Mutex::Locker l(monc_lock); if (auth) { return auth->build_authorizer(service_id); } else { ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id) << ", but no auth is available now" << dendl; return nullptr; } } #define dout_subsys ceph_subsys_monc #undef dout_prefix #define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ") MonConnection::MonConnection(CephContext *cct, ConnectionRef con, uint64_t global_id) : cct(cct), con(con), global_id(global_id) {} MonConnection::~MonConnection() { if (con) { con->mark_down(); con.reset(); } } bool MonConnection::have_session() const { return state == State::HAVE_SESSION; } void MonConnection::start(epoch_t epoch, const EntityName& entity_name, const AuthMethodList& auth_supported) { // restart authentication handshake state = State::NEGOTIATING; // send an initial keepalive to ensure our timestamp is valid by the // time we are in an OPENED state (by sequencing this before // authentication). con->send_keepalive(); auto m = new MAuth; m->protocol = 0; m->monmap_epoch = epoch; __u8 struct_v = 1; ::encode(struct_v, m->auth_payload); ::encode(auth_supported.get_supported_set(), m->auth_payload); ::encode(entity_name, m->auth_payload); ::encode(global_id, m->auth_payload); con->send_message(m); } int MonConnection::handle_auth(MAuthReply* m, const EntityName& entity_name, uint32_t want_keys, RotatingKeyRing* keyring) { if (state == State::NEGOTIATING) { int r = _negotiate(m, entity_name, want_keys, keyring); if (r) { return r; } state = State::AUTHENTICATING; } int r = authenticate(m); if (!r) { state = State::HAVE_SESSION; } return r; } int MonConnection::_negotiate(MAuthReply *m, const EntityName& entity_name, uint32_t want_keys, RotatingKeyRing* keyring) { if (auth && (int)m->protocol == auth->get_protocol()) { // good, negotiation completed auth->reset(); return 0; } auth.reset(get_auth_client_handler(cct, m->protocol, keyring)); if (!auth) { ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl; if (m->result == -ENOTSUP) { ldout(cct, 10) << "none of our auth protocols are supported by the server" << dendl; } return m->result; } // do not request MGR key unless the mon has the SERVER_KRAKEN // feature. otherwise it will give us an auth error. note that // we have to use the FEATUREMASK because pre-jewel the kraken // feature bit was used for something else. if ((want_keys & CEPH_ENTITY_TYPE_MGR) && !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) { ldout(cct, 1) << __func__ << " not requesting MGR keys from pre-kraken monitor" << dendl; want_keys &= ~CEPH_ENTITY_TYPE_MGR; } auth->set_want_keys(want_keys); auth->init(entity_name); auth->set_global_id(global_id); return 0; } int MonConnection::authenticate(MAuthReply *m) { assert(auth); if (!m->global_id) { ldout(cct, 1) << "peer sent an invalid global_id" << dendl; } if (m->global_id != global_id) { // it's a new session auth->reset(); global_id = m->global_id; auth->set_global_id(global_id); ldout(cct, 10) << "my global_id is " << m->global_id << dendl; } auto p = m->result_bl.begin(); int ret = auth->handle_response(m->result, p); if (ret == -EAGAIN) { auto ma = new MAuth; ma->protocol = auth->get_protocol(); auth->prepare_build_request(); auth->build_request(ma->auth_payload); con->send_message(ma); } return ret; }