X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmon%2FMonClient.h;fp=src%2Fceph%2Fsrc%2Fmon%2FMonClient.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=0b24e01e2bace5fe884b8265804d6a64d1986a47;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/mon/MonClient.h b/src/ceph/src/mon/MonClient.h deleted file mode 100644 index 0b24e01..0000000 --- a/src/ceph/src/mon/MonClient.h +++ /dev/null @@ -1,508 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_MONCLIENT_H -#define CEPH_MONCLIENT_H - -#include - -#include "msg/Messenger.h" - -#include "MonMap.h" - -#include "common/Timer.h" -#include "common/Finisher.h" -#include "common/config.h" - - -class MMonMap; -class MMonGetVersionReply; -struct MMonSubscribeAck; -class MMonCommandAck; -struct MAuthReply; -class MAuthRotating; -class LogClient; -struct AuthAuthorizer; -class AuthMethodList; -class AuthClientHandler; -class KeyRing; -class RotatingKeyRing; - -struct MonClientPinger : public Dispatcher { - - Mutex lock; - Cond ping_recvd_cond; - string *result; - bool done; - - MonClientPinger(CephContext *cct_, string *res_) : - Dispatcher(cct_), - lock("MonClientPinger::lock"), - result(res_), - done(false) - { } - - int wait_for_reply(double timeout = 0.0) { - utime_t until = ceph_clock_now(); - until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout); - done = false; - - int ret = 0; - while (!done) { - ret = ping_recvd_cond.WaitUntil(lock, until); - if (ret == ETIMEDOUT) - break; - } - return ret; - } - - bool ms_dispatch(Message *m) override { - Mutex::Locker l(lock); - if (m->get_type() != CEPH_MSG_PING) - return false; - - bufferlist &payload = m->get_payload(); - if (result && payload.length() > 0) { - bufferlist::iterator p = payload.begin(); - ::decode(*result, p); - } - done = true; - ping_recvd_cond.SignalAll(); - m->put(); - return true; - } - bool ms_handle_reset(Connection *con) override { - Mutex::Locker l(lock); - done = true; - ping_recvd_cond.SignalAll(); - return true; - } - void ms_handle_remote_reset(Connection *con) override {} - bool ms_handle_refused(Connection *con) override { - return false; - } -}; - -class MonConnection { -public: - MonConnection(CephContext *cct, - ConnectionRef conn, - uint64_t global_id); - ~MonConnection(); - MonConnection(MonConnection&& rhs) = default; - MonConnection& operator=(MonConnection&&) = default; - MonConnection(const MonConnection& rhs) = delete; - MonConnection& operator=(const MonConnection&) = delete; - int handle_auth(MAuthReply *m, - const EntityName& entity_name, - uint32_t want_keys, - RotatingKeyRing* keyring); - int authenticate(MAuthReply *m); - void start(epoch_t epoch, - const EntityName& entity_name, - const AuthMethodList& auth_supported); - bool have_session() const; - uint64_t get_global_id() const { - return global_id; - } - ConnectionRef get_con() { - return con; - } - std::unique_ptr& get_auth() { - return auth; - } - -private: - int _negotiate(MAuthReply *m, - const EntityName& entity_name, - uint32_t want_keys, - RotatingKeyRing* keyring); - -private: - CephContext *cct; - enum class State { - NONE, - NEGOTIATING, - AUTHENTICATING, - HAVE_SESSION, - }; - State state = State::NONE; - ConnectionRef con; - - std::unique_ptr auth; - uint64_t global_id; -}; - -class MonClient : public Dispatcher { -public: - MonMap monmap; -private: - Messenger *messenger; - - std::unique_ptr active_con; - std::map pending_cons; - - EntityName entity_name; - - entity_addr_t my_addr; - - mutable Mutex monc_lock; - SafeTimer timer; - Finisher finisher; - - bool initialized; - bool no_keyring_disabled_cephx; - - LogClient *log_client; - bool more_log_pending; - - void send_log(bool flush = false); - - std::unique_ptr auth_supported; - - bool ms_dispatch(Message *m) override; - bool ms_handle_reset(Connection *con) override; - void ms_handle_remote_reset(Connection *con) override {} - bool ms_handle_refused(Connection *con) override { return false; } - - void handle_monmap(MMonMap *m); - - void handle_auth(MAuthReply *m); - - // monitor session - void tick(); - void schedule_tick(); - - // monclient - bool want_monmap; - Cond map_cond; - bool passthrough_monmap = false; - - // authenticate - std::unique_ptr auth; - uint32_t want_keys = 0; - uint64_t global_id = 0; - Cond auth_cond; - int authenticate_err = 0; - bool authenticated = false; - - list waiting_for_session; - utime_t last_rotating_renew_sent; - std::unique_ptr session_established_context; - bool had_a_connection; - double reopen_interval_multiplier; - - bool _opened() const; - bool _hunting() const; - void _start_hunting(); - void _finish_hunting(); - void _finish_auth(int auth_err); - void _reopen_session(int rank = -1); - MonConnection& _add_conn(unsigned rank, uint64_t global_id); - void _un_backoff(); - void _add_conns(uint64_t global_id); - void _send_mon_message(Message *m); - -public: - void set_entity_name(EntityName name) { entity_name = name; } - - int _check_auth_tickets(); - int _check_auth_rotating(); - int wait_auth_rotating(double timeout); - - int authenticate(double timeout=0.0); - bool is_authenticated() const {return authenticated;} - - /** - * Try to flush as many log messages as we can in a single - * message. Use this before shutting down to transmit your - * last message. - */ - void flush_log(); - - // mon subscriptions -private: - map sub_sent; // my subs, and current versions - map sub_new; // unsent new subs - utime_t sub_renew_sent, sub_renew_after; - - void _renew_subs(); - void handle_subscribe_ack(MMonSubscribeAck* m); - - bool _sub_want(const string &what, version_t start, unsigned flags) { - auto sub = sub_new.find(what); - if (sub != sub_new.end() && - sub->second.start == start && - sub->second.flags == flags) { - return false; - } else { - sub = sub_sent.find(what); - if (sub != sub_sent.end() && - sub->second.start == start && - sub->second.flags == flags) - return false; - } - - sub_new[what].start = start; - sub_new[what].flags = flags; - return true; - } - void _sub_got(const string &what, version_t got) { - if (sub_new.count(what)) { - if (sub_new[what].start <= got) { - if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME) - sub_new.erase(what); - else - sub_new[what].start = got + 1; - } - } else if (sub_sent.count(what)) { - if (sub_sent[what].start <= got) { - if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME) - sub_sent.erase(what); - else - sub_sent[what].start = got + 1; - } - } - } - void _sub_unwant(const string &what) { - sub_sent.erase(what); - sub_new.erase(what); - } - -public: - void renew_subs() { - Mutex::Locker l(monc_lock); - _renew_subs(); - } - bool sub_want(string what, version_t start, unsigned flags) { - Mutex::Locker l(monc_lock); - return _sub_want(what, start, flags); - } - void sub_got(string what, version_t have) { - Mutex::Locker l(monc_lock); - _sub_got(what, have); - } - void sub_unwant(string what) { - Mutex::Locker l(monc_lock); - _sub_unwant(what); - } - /** - * Increase the requested subscription start point. If you do increase - * the value, apply the passed-in flags as well; otherwise do nothing. - */ - bool sub_want_increment(string what, version_t start, unsigned flags) { - Mutex::Locker l(monc_lock); - map::iterator i = sub_new.find(what); - if (i != sub_new.end()) { - if (i->second.start >= start) - return false; - i->second.start = start; - i->second.flags = flags; - return true; - } - - i = sub_sent.find(what); - if (i == sub_sent.end() || i->second.start < start) { - ceph_mon_subscribe_item& item = sub_new[what]; - item.start = start; - item.flags = flags; - return true; - } - return false; - } - - std::unique_ptr keyring; - std::unique_ptr rotating_secrets; - - public: - explicit MonClient(CephContext *cct_); - MonClient(const MonClient &) = delete; - MonClient& operator=(const MonClient &) = delete; - ~MonClient() override; - - int init(); - void shutdown(); - - void set_log_client(LogClient *clog) { - log_client = clog; - } - - int build_initial_monmap(); - int get_monmap(); - int get_monmap_privately(); - /** - * If you want to see MonMap messages, set this and - * the MonClient will tell the Messenger it hasn't - * dealt with it. - * Note that if you do this, *you* are of course responsible for - * putting the message reference! - */ - void set_passthrough_monmap() { - Mutex::Locker l(monc_lock); - passthrough_monmap = true; - } - void unset_passthrough_monmap() { - Mutex::Locker l(monc_lock); - passthrough_monmap = false; - } - /** - * Ping monitor with ID @p mon_id and record the resulting - * reply in @p result_reply. - * - * @param[in] mon_id Target monitor's ID - * @param[out] result_reply reply from mon.ID, if param != NULL - * @returns 0 in case of success; < 0 in case of error, - * -ETIMEDOUT if monitor didn't reply before timeout - * expired (default: conf->client_mount_timeout). - */ - int ping_monitor(const string &mon_id, string *result_reply); - - void send_mon_message(Message *m) { - Mutex::Locker l(monc_lock); - _send_mon_message(m); - } - /** - * If you specify a callback, you should not call - * reopen_session() again until it has been triggered. The MonClient - * will behave, but the first callback could be triggered after - * the session has been killed and the MonClient has started trying - * to reconnect to another monitor. - */ - void reopen_session(Context *cb=NULL) { - Mutex::Locker l(monc_lock); - if (cb) { - session_established_context.reset(cb); - } - _reopen_session(); - } - - entity_addr_t get_my_addr() const { - return my_addr; - } - - const uuid_d& get_fsid() const { - return monmap.fsid; - } - - entity_addr_t get_mon_addr(unsigned i) const { - Mutex::Locker l(monc_lock); - if (i < monmap.size()) - return monmap.get_addr(i); - return entity_addr_t(); - } - entity_inst_t get_mon_inst(unsigned i) const { - Mutex::Locker l(monc_lock); - if (i < monmap.size()) - return monmap.get_inst(i); - return entity_inst_t(); - } - int get_num_mon() const { - Mutex::Locker l(monc_lock); - return monmap.size(); - } - - uint64_t get_global_id() const { - Mutex::Locker l(monc_lock); - return global_id; - } - - void set_messenger(Messenger *m) { messenger = m; } - entity_addr_t get_myaddr() const { return messenger->get_myaddr(); } - AuthAuthorizer* build_authorizer(int service_id) const; - - void set_want_keys(uint32_t want) { - want_keys = want; - } - - // admin commands -private: - uint64_t last_mon_command_tid; - struct MonCommand { - string target_name; - int target_rank; - uint64_t tid; - vector cmd; - bufferlist inbl; - bufferlist *poutbl; - string *prs; - int *prval; - Context *onfinish, *ontimeout; - - explicit MonCommand(uint64_t t) - : target_rank(-1), - tid(t), - poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL) - {} - }; - map mon_commands; - - void _send_command(MonCommand *r); - void _resend_mon_commands(); - int _cancel_mon_command(uint64_t tid); - void _finish_command(MonCommand *r, int ret, string rs); - void _finish_auth(); - void handle_mon_command_ack(MMonCommandAck *ack); - -public: - void start_mon_command(const vector& cmd, const bufferlist& inbl, - bufferlist *outbl, string *outs, - Context *onfinish); - void start_mon_command(int mon_rank, - const vector& cmd, const bufferlist& inbl, - bufferlist *outbl, string *outs, - Context *onfinish); - void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix - const vector& cmd, const bufferlist& inbl, - bufferlist *outbl, string *outs, - Context *onfinish); - - // version requests -public: - /** - * get latest known version(s) of cluster map - * - * @param map string name of map (e.g., 'osdmap') - * @param newest pointer where newest map version will be stored - * @param oldest pointer where oldest map version will be stored - * @param onfinish context that will be triggered on completion - * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request - */ - void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish); - - /** - * Run a callback within our lock, with a reference - * to the MonMap - */ - template - auto with_monmap(Callback&& cb, Args&&...args) const -> - decltype(cb(monmap, std::forward(args)...)) { - Mutex::Locker l(monc_lock); - return std::forward(cb)(monmap, std::forward(args)...); - } - -private: - struct version_req_d { - Context *context; - version_t *newest, *oldest; - version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {} - }; - - map version_requests; - ceph_tid_t version_req_id; - void handle_get_version_reply(MMonGetVersionReply* m); - - -}; - -#endif