-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_MONCLIENT_H
-#define CEPH_MONCLIENT_H
-
-#include <memory>
-
-#include "msg/Messenger.h"
-
-#include "MonMap.h"
-
-#include "common/Timer.h"
-#include "common/Finisher.h"
-#include "common/config.h"
-
-
-class MMonMap;
-class MMonGetVersionReply;
-struct MMonSubscribeAck;
-class MMonCommandAck;
-struct MAuthReply;
-class MAuthRotating;
-class LogClient;
-struct AuthAuthorizer;
-class AuthMethodList;
-class AuthClientHandler;
-class KeyRing;
-class RotatingKeyRing;
-
-struct MonClientPinger : public Dispatcher {
-
- Mutex lock;
- Cond ping_recvd_cond;
- string *result;
- bool done;
-
- MonClientPinger(CephContext *cct_, string *res_) :
- Dispatcher(cct_),
- lock("MonClientPinger::lock"),
- result(res_),
- done(false)
- { }
-
- int wait_for_reply(double timeout = 0.0) {
- utime_t until = ceph_clock_now();
- until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout);
- done = false;
-
- int ret = 0;
- while (!done) {
- ret = ping_recvd_cond.WaitUntil(lock, until);
- if (ret == ETIMEDOUT)
- break;
- }
- return ret;
- }
-
- bool ms_dispatch(Message *m) override {
- Mutex::Locker l(lock);
- if (m->get_type() != CEPH_MSG_PING)
- return false;
-
- bufferlist &payload = m->get_payload();
- if (result && payload.length() > 0) {
- bufferlist::iterator p = payload.begin();
- ::decode(*result, p);
- }
- done = true;
- ping_recvd_cond.SignalAll();
- m->put();
- return true;
- }
- bool ms_handle_reset(Connection *con) override {
- Mutex::Locker l(lock);
- done = true;
- ping_recvd_cond.SignalAll();
- return true;
- }
- void ms_handle_remote_reset(Connection *con) override {}
- bool ms_handle_refused(Connection *con) override {
- return false;
- }
-};
-
-class MonConnection {
-public:
- MonConnection(CephContext *cct,
- ConnectionRef conn,
- uint64_t global_id);
- ~MonConnection();
- MonConnection(MonConnection&& rhs) = default;
- MonConnection& operator=(MonConnection&&) = default;
- MonConnection(const MonConnection& rhs) = delete;
- MonConnection& operator=(const MonConnection&) = delete;
- int handle_auth(MAuthReply *m,
- const EntityName& entity_name,
- uint32_t want_keys,
- RotatingKeyRing* keyring);
- int authenticate(MAuthReply *m);
- void start(epoch_t epoch,
- const EntityName& entity_name,
- const AuthMethodList& auth_supported);
- bool have_session() const;
- uint64_t get_global_id() const {
- return global_id;
- }
- ConnectionRef get_con() {
- return con;
- }
- std::unique_ptr<AuthClientHandler>& get_auth() {
- return auth;
- }
-
-private:
- int _negotiate(MAuthReply *m,
- const EntityName& entity_name,
- uint32_t want_keys,
- RotatingKeyRing* keyring);
-
-private:
- CephContext *cct;
- enum class State {
- NONE,
- NEGOTIATING,
- AUTHENTICATING,
- HAVE_SESSION,
- };
- State state = State::NONE;
- ConnectionRef con;
-
- std::unique_ptr<AuthClientHandler> auth;
- uint64_t global_id;
-};
-
-class MonClient : public Dispatcher {
-public:
- MonMap monmap;
-private:
- Messenger *messenger;
-
- std::unique_ptr<MonConnection> active_con;
- std::map<entity_addr_t, MonConnection> pending_cons;
-
- EntityName entity_name;
-
- entity_addr_t my_addr;
-
- mutable Mutex monc_lock;
- SafeTimer timer;
- Finisher finisher;
-
- bool initialized;
- bool no_keyring_disabled_cephx;
-
- LogClient *log_client;
- bool more_log_pending;
-
- void send_log(bool flush = false);
-
- std::unique_ptr<AuthMethodList> auth_supported;
-
- bool ms_dispatch(Message *m) override;
- bool ms_handle_reset(Connection *con) override;
- void ms_handle_remote_reset(Connection *con) override {}
- bool ms_handle_refused(Connection *con) override { return false; }
-
- void handle_monmap(MMonMap *m);
-
- void handle_auth(MAuthReply *m);
-
- // monitor session
- void tick();
- void schedule_tick();
-
- // monclient
- bool want_monmap;
- Cond map_cond;
- bool passthrough_monmap = false;
-
- // authenticate
- std::unique_ptr<AuthClientHandler> auth;
- uint32_t want_keys = 0;
- uint64_t global_id = 0;
- Cond auth_cond;
- int authenticate_err = 0;
- bool authenticated = false;
-
- list<Message*> waiting_for_session;
- utime_t last_rotating_renew_sent;
- std::unique_ptr<Context> session_established_context;
- bool had_a_connection;
- double reopen_interval_multiplier;
-
- bool _opened() const;
- bool _hunting() const;
- void _start_hunting();
- void _finish_hunting();
- void _finish_auth(int auth_err);
- void _reopen_session(int rank = -1);
- MonConnection& _add_conn(unsigned rank, uint64_t global_id);
- void _un_backoff();
- void _add_conns(uint64_t global_id);
- void _send_mon_message(Message *m);
-
-public:
- void set_entity_name(EntityName name) { entity_name = name; }
-
- int _check_auth_tickets();
- int _check_auth_rotating();
- int wait_auth_rotating(double timeout);
-
- int authenticate(double timeout=0.0);
- bool is_authenticated() const {return authenticated;}
-
- /**
- * Try to flush as many log messages as we can in a single
- * message. Use this before shutting down to transmit your
- * last message.
- */
- void flush_log();
-
- // mon subscriptions
-private:
- map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
- map<string,ceph_mon_subscribe_item> sub_new; // unsent new subs
- utime_t sub_renew_sent, sub_renew_after;
-
- void _renew_subs();
- void handle_subscribe_ack(MMonSubscribeAck* m);
-
- bool _sub_want(const string &what, version_t start, unsigned flags) {
- auto sub = sub_new.find(what);
- if (sub != sub_new.end() &&
- sub->second.start == start &&
- sub->second.flags == flags) {
- return false;
- } else {
- sub = sub_sent.find(what);
- if (sub != sub_sent.end() &&
- sub->second.start == start &&
- sub->second.flags == flags)
- return false;
- }
-
- sub_new[what].start = start;
- sub_new[what].flags = flags;
- return true;
- }
- void _sub_got(const string &what, version_t got) {
- if (sub_new.count(what)) {
- if (sub_new[what].start <= got) {
- if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
- sub_new.erase(what);
- else
- sub_new[what].start = got + 1;
- }
- } else if (sub_sent.count(what)) {
- if (sub_sent[what].start <= got) {
- if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
- sub_sent.erase(what);
- else
- sub_sent[what].start = got + 1;
- }
- }
- }
- void _sub_unwant(const string &what) {
- sub_sent.erase(what);
- sub_new.erase(what);
- }
-
-public:
- void renew_subs() {
- Mutex::Locker l(monc_lock);
- _renew_subs();
- }
- bool sub_want(string what, version_t start, unsigned flags) {
- Mutex::Locker l(monc_lock);
- return _sub_want(what, start, flags);
- }
- void sub_got(string what, version_t have) {
- Mutex::Locker l(monc_lock);
- _sub_got(what, have);
- }
- void sub_unwant(string what) {
- Mutex::Locker l(monc_lock);
- _sub_unwant(what);
- }
- /**
- * Increase the requested subscription start point. If you do increase
- * the value, apply the passed-in flags as well; otherwise do nothing.
- */
- bool sub_want_increment(string what, version_t start, unsigned flags) {
- Mutex::Locker l(monc_lock);
- map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
- if (i != sub_new.end()) {
- if (i->second.start >= start)
- return false;
- i->second.start = start;
- i->second.flags = flags;
- return true;
- }
-
- i = sub_sent.find(what);
- if (i == sub_sent.end() || i->second.start < start) {
- ceph_mon_subscribe_item& item = sub_new[what];
- item.start = start;
- item.flags = flags;
- return true;
- }
- return false;
- }
-
- std::unique_ptr<KeyRing> keyring;
- std::unique_ptr<RotatingKeyRing> rotating_secrets;
-
- public:
- explicit MonClient(CephContext *cct_);
- MonClient(const MonClient &) = delete;
- MonClient& operator=(const MonClient &) = delete;
- ~MonClient() override;
-
- int init();
- void shutdown();
-
- void set_log_client(LogClient *clog) {
- log_client = clog;
- }
-
- int build_initial_monmap();
- int get_monmap();
- int get_monmap_privately();
- /**
- * If you want to see MonMap messages, set this and
- * the MonClient will tell the Messenger it hasn't
- * dealt with it.
- * Note that if you do this, *you* are of course responsible for
- * putting the message reference!
- */
- void set_passthrough_monmap() {
- Mutex::Locker l(monc_lock);
- passthrough_monmap = true;
- }
- void unset_passthrough_monmap() {
- Mutex::Locker l(monc_lock);
- passthrough_monmap = false;
- }
- /**
- * Ping monitor with ID @p mon_id and record the resulting
- * reply in @p result_reply.
- *
- * @param[in] mon_id Target monitor's ID
- * @param[out] result_reply reply from mon.ID, if param != NULL
- * @returns 0 in case of success; < 0 in case of error,
- * -ETIMEDOUT if monitor didn't reply before timeout
- * expired (default: conf->client_mount_timeout).
- */
- int ping_monitor(const string &mon_id, string *result_reply);
-
- void send_mon_message(Message *m) {
- Mutex::Locker l(monc_lock);
- _send_mon_message(m);
- }
- /**
- * If you specify a callback, you should not call
- * reopen_session() again until it has been triggered. The MonClient
- * will behave, but the first callback could be triggered after
- * the session has been killed and the MonClient has started trying
- * to reconnect to another monitor.
- */
- void reopen_session(Context *cb=NULL) {
- Mutex::Locker l(monc_lock);
- if (cb) {
- session_established_context.reset(cb);
- }
- _reopen_session();
- }
-
- entity_addr_t get_my_addr() const {
- return my_addr;
- }
-
- const uuid_d& get_fsid() const {
- return monmap.fsid;
- }
-
- entity_addr_t get_mon_addr(unsigned i) const {
- Mutex::Locker l(monc_lock);
- if (i < monmap.size())
- return monmap.get_addr(i);
- return entity_addr_t();
- }
- entity_inst_t get_mon_inst(unsigned i) const {
- Mutex::Locker l(monc_lock);
- if (i < monmap.size())
- return monmap.get_inst(i);
- return entity_inst_t();
- }
- int get_num_mon() const {
- Mutex::Locker l(monc_lock);
- return monmap.size();
- }
-
- uint64_t get_global_id() const {
- Mutex::Locker l(monc_lock);
- return global_id;
- }
-
- void set_messenger(Messenger *m) { messenger = m; }
- entity_addr_t get_myaddr() const { return messenger->get_myaddr(); }
- AuthAuthorizer* build_authorizer(int service_id) const;
-
- void set_want_keys(uint32_t want) {
- want_keys = want;
- }
-
- // admin commands
-private:
- uint64_t last_mon_command_tid;
- struct MonCommand {
- string target_name;
- int target_rank;
- uint64_t tid;
- vector<string> cmd;
- bufferlist inbl;
- bufferlist *poutbl;
- string *prs;
- int *prval;
- Context *onfinish, *ontimeout;
-
- explicit MonCommand(uint64_t t)
- : target_rank(-1),
- tid(t),
- poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
- {}
- };
- map<uint64_t,MonCommand*> mon_commands;
-
- void _send_command(MonCommand *r);
- void _resend_mon_commands();
- int _cancel_mon_command(uint64_t tid);
- void _finish_command(MonCommand *r, int ret, string rs);
- void _finish_auth();
- void handle_mon_command_ack(MMonCommandAck *ack);
-
-public:
- void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
- bufferlist *outbl, string *outs,
- Context *onfinish);
- void start_mon_command(int mon_rank,
- const vector<string>& cmd, const bufferlist& inbl,
- bufferlist *outbl, string *outs,
- Context *onfinish);
- void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix
- const vector<string>& cmd, const bufferlist& inbl,
- bufferlist *outbl, string *outs,
- Context *onfinish);
-
- // version requests
-public:
- /**
- * get latest known version(s) of cluster map
- *
- * @param map string name of map (e.g., 'osdmap')
- * @param newest pointer where newest map version will be stored
- * @param oldest pointer where oldest map version will be stored
- * @param onfinish context that will be triggered on completion
- * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
- */
- void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish);
-
- /**
- * Run a callback within our lock, with a reference
- * to the MonMap
- */
- template<typename Callback, typename...Args>
- auto with_monmap(Callback&& cb, Args&&...args) const ->
- decltype(cb(monmap, std::forward<Args>(args)...)) {
- Mutex::Locker l(monc_lock);
- return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
- }
-
-private:
- struct version_req_d {
- Context *context;
- version_t *newest, *oldest;
- version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
- };
-
- map<ceph_tid_t, version_req_d*> version_requests;
- ceph_tid_t version_req_id;
- void handle_get_version_reply(MMonGetVersionReply* m);
-
-
-};
-
-#endif