1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_MONCLIENT_H
16 #define CEPH_MONCLIENT_H
20 #include "msg/Messenger.h"
24 #include "common/Timer.h"
25 #include "common/Finisher.h"
26 #include "common/config.h"
30 class MMonGetVersionReply;
31 struct MMonSubscribeAck;
36 struct AuthAuthorizer;
38 class AuthClientHandler;
40 class RotatingKeyRing;
42 struct MonClientPinger : public Dispatcher {
49 MonClientPinger(CephContext *cct_, string *res_) :
51 lock("MonClientPinger::lock"),
56 int wait_for_reply(double timeout = 0.0) {
57 utime_t until = ceph_clock_now();
58 until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout);
63 ret = ping_recvd_cond.WaitUntil(lock, until);
70 bool ms_dispatch(Message *m) override {
71 Mutex::Locker l(lock);
72 if (m->get_type() != CEPH_MSG_PING)
75 bufferlist &payload = m->get_payload();
76 if (result && payload.length() > 0) {
77 bufferlist::iterator p = payload.begin();
81 ping_recvd_cond.SignalAll();
85 bool ms_handle_reset(Connection *con) override {
86 Mutex::Locker l(lock);
88 ping_recvd_cond.SignalAll();
91 void ms_handle_remote_reset(Connection *con) override {}
92 bool ms_handle_refused(Connection *con) override {
99 MonConnection(CephContext *cct,
103 MonConnection(MonConnection&& rhs) = default;
104 MonConnection& operator=(MonConnection&&) = default;
105 MonConnection(const MonConnection& rhs) = delete;
106 MonConnection& operator=(const MonConnection&) = delete;
107 int handle_auth(MAuthReply *m,
108 const EntityName& entity_name,
110 RotatingKeyRing* keyring);
111 int authenticate(MAuthReply *m);
112 void start(epoch_t epoch,
113 const EntityName& entity_name,
114 const AuthMethodList& auth_supported);
115 bool have_session() const;
116 uint64_t get_global_id() const {
119 ConnectionRef get_con() {
122 std::unique_ptr<AuthClientHandler>& get_auth() {
127 int _negotiate(MAuthReply *m,
128 const EntityName& entity_name,
130 RotatingKeyRing* keyring);
140 State state = State::NONE;
143 std::unique_ptr<AuthClientHandler> auth;
147 class MonClient : public Dispatcher {
151 Messenger *messenger;
153 std::unique_ptr<MonConnection> active_con;
154 std::map<entity_addr_t, MonConnection> pending_cons;
156 EntityName entity_name;
158 entity_addr_t my_addr;
160 mutable Mutex monc_lock;
165 bool no_keyring_disabled_cephx;
167 LogClient *log_client;
168 bool more_log_pending;
170 void send_log(bool flush = false);
172 std::unique_ptr<AuthMethodList> auth_supported;
174 bool ms_dispatch(Message *m) override;
175 bool ms_handle_reset(Connection *con) override;
176 void ms_handle_remote_reset(Connection *con) override {}
177 bool ms_handle_refused(Connection *con) override { return false; }
179 void handle_monmap(MMonMap *m);
181 void handle_auth(MAuthReply *m);
185 void schedule_tick();
190 bool passthrough_monmap = false;
193 std::unique_ptr<AuthClientHandler> auth;
194 uint32_t want_keys = 0;
195 uint64_t global_id = 0;
197 int authenticate_err = 0;
198 bool authenticated = false;
200 list<Message*> waiting_for_session;
201 utime_t last_rotating_renew_sent;
202 std::unique_ptr<Context> session_established_context;
203 bool had_a_connection;
204 double reopen_interval_multiplier;
206 bool _opened() const;
207 bool _hunting() const;
208 void _start_hunting();
209 void _finish_hunting();
210 void _finish_auth(int auth_err);
211 void _reopen_session(int rank = -1);
212 MonConnection& _add_conn(unsigned rank, uint64_t global_id);
214 void _add_conns(uint64_t global_id);
215 void _send_mon_message(Message *m);
218 void set_entity_name(EntityName name) { entity_name = name; }
220 int _check_auth_tickets();
221 int _check_auth_rotating();
222 int wait_auth_rotating(double timeout);
224 int authenticate(double timeout=0.0);
225 bool is_authenticated() const {return authenticated;}
228 * Try to flush as many log messages as we can in a single
229 * message. Use this before shutting down to transmit your
236 map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
237 map<string,ceph_mon_subscribe_item> sub_new; // unsent new subs
238 utime_t sub_renew_sent, sub_renew_after;
241 void handle_subscribe_ack(MMonSubscribeAck* m);
243 bool _sub_want(const string &what, version_t start, unsigned flags) {
244 auto sub = sub_new.find(what);
245 if (sub != sub_new.end() &&
246 sub->second.start == start &&
247 sub->second.flags == flags) {
250 sub = sub_sent.find(what);
251 if (sub != sub_sent.end() &&
252 sub->second.start == start &&
253 sub->second.flags == flags)
257 sub_new[what].start = start;
258 sub_new[what].flags = flags;
261 void _sub_got(const string &what, version_t got) {
262 if (sub_new.count(what)) {
263 if (sub_new[what].start <= got) {
264 if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
267 sub_new[what].start = got + 1;
269 } else if (sub_sent.count(what)) {
270 if (sub_sent[what].start <= got) {
271 if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
272 sub_sent.erase(what);
274 sub_sent[what].start = got + 1;
278 void _sub_unwant(const string &what) {
279 sub_sent.erase(what);
285 Mutex::Locker l(monc_lock);
288 bool sub_want(string what, version_t start, unsigned flags) {
289 Mutex::Locker l(monc_lock);
290 return _sub_want(what, start, flags);
292 void sub_got(string what, version_t have) {
293 Mutex::Locker l(monc_lock);
294 _sub_got(what, have);
296 void sub_unwant(string what) {
297 Mutex::Locker l(monc_lock);
301 * Increase the requested subscription start point. If you do increase
302 * the value, apply the passed-in flags as well; otherwise do nothing.
304 bool sub_want_increment(string what, version_t start, unsigned flags) {
305 Mutex::Locker l(monc_lock);
306 map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
307 if (i != sub_new.end()) {
308 if (i->second.start >= start)
310 i->second.start = start;
311 i->second.flags = flags;
315 i = sub_sent.find(what);
316 if (i == sub_sent.end() || i->second.start < start) {
317 ceph_mon_subscribe_item& item = sub_new[what];
325 std::unique_ptr<KeyRing> keyring;
326 std::unique_ptr<RotatingKeyRing> rotating_secrets;
329 explicit MonClient(CephContext *cct_);
330 MonClient(const MonClient &) = delete;
331 MonClient& operator=(const MonClient &) = delete;
332 ~MonClient() override;
337 void set_log_client(LogClient *clog) {
341 int build_initial_monmap();
343 int get_monmap_privately();
345 * If you want to see MonMap messages, set this and
346 * the MonClient will tell the Messenger it hasn't
348 * Note that if you do this, *you* are of course responsible for
349 * putting the message reference!
351 void set_passthrough_monmap() {
352 Mutex::Locker l(monc_lock);
353 passthrough_monmap = true;
355 void unset_passthrough_monmap() {
356 Mutex::Locker l(monc_lock);
357 passthrough_monmap = false;
360 * Ping monitor with ID @p mon_id and record the resulting
361 * reply in @p result_reply.
363 * @param[in] mon_id Target monitor's ID
364 * @param[out] result_reply reply from mon.ID, if param != NULL
365 * @returns 0 in case of success; < 0 in case of error,
366 * -ETIMEDOUT if monitor didn't reply before timeout
367 * expired (default: conf->client_mount_timeout).
369 int ping_monitor(const string &mon_id, string *result_reply);
371 void send_mon_message(Message *m) {
372 Mutex::Locker l(monc_lock);
373 _send_mon_message(m);
376 * If you specify a callback, you should not call
377 * reopen_session() again until it has been triggered. The MonClient
378 * will behave, but the first callback could be triggered after
379 * the session has been killed and the MonClient has started trying
380 * to reconnect to another monitor.
382 void reopen_session(Context *cb=NULL) {
383 Mutex::Locker l(monc_lock);
385 session_established_context.reset(cb);
390 entity_addr_t get_my_addr() const {
394 const uuid_d& get_fsid() const {
398 entity_addr_t get_mon_addr(unsigned i) const {
399 Mutex::Locker l(monc_lock);
400 if (i < monmap.size())
401 return monmap.get_addr(i);
402 return entity_addr_t();
404 entity_inst_t get_mon_inst(unsigned i) const {
405 Mutex::Locker l(monc_lock);
406 if (i < monmap.size())
407 return monmap.get_inst(i);
408 return entity_inst_t();
410 int get_num_mon() const {
411 Mutex::Locker l(monc_lock);
412 return monmap.size();
415 uint64_t get_global_id() const {
416 Mutex::Locker l(monc_lock);
420 void set_messenger(Messenger *m) { messenger = m; }
421 entity_addr_t get_myaddr() const { return messenger->get_myaddr(); }
422 AuthAuthorizer* build_authorizer(int service_id) const;
424 void set_want_keys(uint32_t want) {
430 uint64_t last_mon_command_tid;
440 Context *onfinish, *ontimeout;
442 explicit MonCommand(uint64_t t)
445 poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
448 map<uint64_t,MonCommand*> mon_commands;
450 void _send_command(MonCommand *r);
451 void _resend_mon_commands();
452 int _cancel_mon_command(uint64_t tid);
453 void _finish_command(MonCommand *r, int ret, string rs);
455 void handle_mon_command_ack(MMonCommandAck *ack);
458 void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
459 bufferlist *outbl, string *outs,
461 void start_mon_command(int mon_rank,
462 const vector<string>& cmd, const bufferlist& inbl,
463 bufferlist *outbl, string *outs,
465 void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix
466 const vector<string>& cmd, const bufferlist& inbl,
467 bufferlist *outbl, string *outs,
473 * get latest known version(s) of cluster map
475 * @param map string name of map (e.g., 'osdmap')
476 * @param newest pointer where newest map version will be stored
477 * @param oldest pointer where oldest map version will be stored
478 * @param onfinish context that will be triggered on completion
479 * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
481 void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish);
484 * Run a callback within our lock, with a reference
487 template<typename Callback, typename...Args>
488 auto with_monmap(Callback&& cb, Args&&...args) const ->
489 decltype(cb(monmap, std::forward<Args>(args)...)) {
490 Mutex::Locker l(monc_lock);
491 return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
495 struct version_req_d {
497 version_t *newest, *oldest;
498 version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
501 map<ceph_tid_t, version_req_d*> version_requests;
502 ceph_tid_t version_req_id;
503 void handle_get_version_reply(MMonGetVersionReply* m);