initial code repo
[stor4nfv.git] / src / ceph / src / mon / MonClient.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_MONCLIENT_H
16 #define CEPH_MONCLIENT_H
17
18 #include <memory>
19
20 #include "msg/Messenger.h"
21
22 #include "MonMap.h"
23
24 #include "common/Timer.h"
25 #include "common/Finisher.h"
26 #include "common/config.h"
27
28
29 class MMonMap;
30 class MMonGetVersionReply;
31 struct MMonSubscribeAck;
32 class MMonCommandAck;
33 struct MAuthReply;
34 class MAuthRotating;
35 class LogClient;
36 struct AuthAuthorizer;
37 class AuthMethodList;
38 class AuthClientHandler;
39 class KeyRing;
40 class RotatingKeyRing;
41
42 struct MonClientPinger : public Dispatcher {
43
44   Mutex lock;
45   Cond ping_recvd_cond;
46   string *result;
47   bool done;
48
49   MonClientPinger(CephContext *cct_, string *res_) :
50     Dispatcher(cct_),
51     lock("MonClientPinger::lock"),
52     result(res_),
53     done(false)
54   { }
55
56   int wait_for_reply(double timeout = 0.0) {
57     utime_t until = ceph_clock_now();
58     until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout);
59     done = false;
60
61     int ret = 0;
62     while (!done) {
63       ret = ping_recvd_cond.WaitUntil(lock, until);
64       if (ret == ETIMEDOUT)
65         break;
66     }
67     return ret;
68   }
69
70   bool ms_dispatch(Message *m) override {
71     Mutex::Locker l(lock);
72     if (m->get_type() != CEPH_MSG_PING)
73       return false;
74
75     bufferlist &payload = m->get_payload();
76     if (result && payload.length() > 0) {
77       bufferlist::iterator p = payload.begin();
78       ::decode(*result, p);
79     }
80     done = true;
81     ping_recvd_cond.SignalAll();
82     m->put();
83     return true;
84   }
85   bool ms_handle_reset(Connection *con) override {
86     Mutex::Locker l(lock);
87     done = true;
88     ping_recvd_cond.SignalAll();
89     return true;
90   }
91   void ms_handle_remote_reset(Connection *con) override {}
92   bool ms_handle_refused(Connection *con) override {
93     return false;
94   }
95 };
96
97 class MonConnection {
98 public:
99   MonConnection(CephContext *cct,
100                 ConnectionRef conn,
101                 uint64_t global_id);
102   ~MonConnection();
103   MonConnection(MonConnection&& rhs) = default;
104   MonConnection& operator=(MonConnection&&) = default;
105   MonConnection(const MonConnection& rhs) = delete;
106   MonConnection& operator=(const MonConnection&) = delete;
107   int handle_auth(MAuthReply *m,
108                   const EntityName& entity_name,
109                   uint32_t want_keys,
110                   RotatingKeyRing* keyring);
111   int authenticate(MAuthReply *m);
112   void start(epoch_t epoch,
113              const EntityName& entity_name,
114              const AuthMethodList& auth_supported);
115   bool have_session() const;
116   uint64_t get_global_id() const {
117     return global_id;
118   }
119   ConnectionRef get_con() {
120     return con;
121   }
122   std::unique_ptr<AuthClientHandler>& get_auth() {
123     return auth;
124   }
125
126 private:
127   int _negotiate(MAuthReply *m,
128                  const EntityName& entity_name,
129                  uint32_t want_keys,
130                  RotatingKeyRing* keyring);
131
132 private:
133   CephContext *cct;
134   enum class State {
135     NONE,
136     NEGOTIATING,
137     AUTHENTICATING,
138     HAVE_SESSION,
139   };
140   State state = State::NONE;
141   ConnectionRef con;
142
143   std::unique_ptr<AuthClientHandler> auth;
144   uint64_t global_id;
145 };
146
147 class MonClient : public Dispatcher {
148 public:
149   MonMap monmap;
150 private:
151   Messenger *messenger;
152
153   std::unique_ptr<MonConnection> active_con;
154   std::map<entity_addr_t, MonConnection> pending_cons;
155
156   EntityName entity_name;
157
158   entity_addr_t my_addr;
159
160   mutable Mutex monc_lock;
161   SafeTimer timer;
162   Finisher finisher;
163
164   bool initialized;
165   bool no_keyring_disabled_cephx;
166
167   LogClient *log_client;
168   bool more_log_pending;
169
170   void send_log(bool flush = false);
171
172   std::unique_ptr<AuthMethodList> auth_supported;
173
174   bool ms_dispatch(Message *m) override;
175   bool ms_handle_reset(Connection *con) override;
176   void ms_handle_remote_reset(Connection *con) override {}
177   bool ms_handle_refused(Connection *con) override { return false; }
178
179   void handle_monmap(MMonMap *m);
180
181   void handle_auth(MAuthReply *m);
182
183   // monitor session
184   void tick();
185   void schedule_tick();
186
187   // monclient
188   bool want_monmap;
189   Cond map_cond;
190   bool passthrough_monmap = false;
191
192   // authenticate
193   std::unique_ptr<AuthClientHandler> auth;
194   uint32_t want_keys = 0;
195   uint64_t global_id = 0;
196   Cond auth_cond;
197   int authenticate_err = 0;
198   bool authenticated = false;
199
200   list<Message*> waiting_for_session;
201   utime_t last_rotating_renew_sent;
202   std::unique_ptr<Context> session_established_context;
203   bool had_a_connection;
204   double reopen_interval_multiplier;
205
206   bool _opened() const;
207   bool _hunting() const;
208   void _start_hunting();
209   void _finish_hunting();
210   void _finish_auth(int auth_err);
211   void _reopen_session(int rank = -1);
212   MonConnection& _add_conn(unsigned rank, uint64_t global_id);
213   void _un_backoff();
214   void _add_conns(uint64_t global_id);
215   void _send_mon_message(Message *m);
216
217 public:
218   void set_entity_name(EntityName name) { entity_name = name; }
219
220   int _check_auth_tickets();
221   int _check_auth_rotating();
222   int wait_auth_rotating(double timeout);
223
224   int authenticate(double timeout=0.0);
225   bool is_authenticated() const {return authenticated;}
226
227   /**
228    * Try to flush as many log messages as we can in a single
229    * message.  Use this before shutting down to transmit your
230    * last message.
231    */
232   void flush_log();
233
234   // mon subscriptions
235 private:
236   map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
237   map<string,ceph_mon_subscribe_item> sub_new;  // unsent new subs
238   utime_t sub_renew_sent, sub_renew_after;
239
240   void _renew_subs();
241   void handle_subscribe_ack(MMonSubscribeAck* m);
242
243   bool _sub_want(const string &what, version_t start, unsigned flags) {
244     auto sub = sub_new.find(what);
245     if (sub != sub_new.end() &&
246         sub->second.start == start &&
247         sub->second.flags == flags) {
248       return false;
249     } else {
250       sub = sub_sent.find(what);
251       if (sub != sub_sent.end() &&
252           sub->second.start == start &&
253           sub->second.flags == flags)
254         return false;
255     }
256
257     sub_new[what].start = start;
258     sub_new[what].flags = flags;
259     return true;
260   }
261   void _sub_got(const string &what, version_t got) {
262     if (sub_new.count(what)) {
263       if (sub_new[what].start <= got) {
264         if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
265           sub_new.erase(what);
266         else
267           sub_new[what].start = got + 1;
268       }
269     } else if (sub_sent.count(what)) {
270       if (sub_sent[what].start <= got) {
271         if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
272           sub_sent.erase(what);
273         else
274           sub_sent[what].start = got + 1;
275       }
276     }
277   }
278   void _sub_unwant(const string &what) {
279     sub_sent.erase(what);
280     sub_new.erase(what);
281   }
282
283 public:
284   void renew_subs() {
285     Mutex::Locker l(monc_lock);
286     _renew_subs();
287   }
288   bool sub_want(string what, version_t start, unsigned flags) {
289     Mutex::Locker l(monc_lock);
290     return _sub_want(what, start, flags);
291   }
292   void sub_got(string what, version_t have) {
293     Mutex::Locker l(monc_lock);
294     _sub_got(what, have);
295   }
296   void sub_unwant(string what) {
297     Mutex::Locker l(monc_lock);
298     _sub_unwant(what);
299   }
300   /**
301    * Increase the requested subscription start point. If you do increase
302    * the value, apply the passed-in flags as well; otherwise do nothing.
303    */
304   bool sub_want_increment(string what, version_t start, unsigned flags) {
305     Mutex::Locker l(monc_lock);
306     map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
307     if (i != sub_new.end()) {
308       if (i->second.start >= start)
309         return false;
310       i->second.start = start;
311       i->second.flags = flags;
312       return true;
313     }
314
315     i = sub_sent.find(what);
316     if (i == sub_sent.end() || i->second.start < start) {
317       ceph_mon_subscribe_item& item = sub_new[what];
318       item.start = start;
319       item.flags = flags;
320       return true;
321     }
322     return false;
323   }
324   
325   std::unique_ptr<KeyRing> keyring;
326   std::unique_ptr<RotatingKeyRing> rotating_secrets;
327
328  public:
329   explicit MonClient(CephContext *cct_);
330   MonClient(const MonClient &) = delete;
331   MonClient& operator=(const MonClient &) = delete;
332   ~MonClient() override;
333
334   int init();
335   void shutdown();
336
337   void set_log_client(LogClient *clog) {
338     log_client = clog;
339   }
340
341   int build_initial_monmap();
342   int get_monmap();
343   int get_monmap_privately();
344   /**
345    * If you want to see MonMap messages, set this and
346    * the MonClient will tell the Messenger it hasn't
347    * dealt with it.
348    * Note that if you do this, *you* are of course responsible for
349    * putting the message reference!
350    */
351   void set_passthrough_monmap() {
352     Mutex::Locker l(monc_lock);
353     passthrough_monmap = true;
354   }
355   void unset_passthrough_monmap() {
356     Mutex::Locker l(monc_lock);
357     passthrough_monmap = false;
358   }
359   /**
360    * Ping monitor with ID @p mon_id and record the resulting
361    * reply in @p result_reply.
362    *
363    * @param[in]  mon_id Target monitor's ID
364    * @param[out] result_reply reply from mon.ID, if param != NULL
365    * @returns    0 in case of success; < 0 in case of error,
366    *             -ETIMEDOUT if monitor didn't reply before timeout
367    *             expired (default: conf->client_mount_timeout).
368    */
369   int ping_monitor(const string &mon_id, string *result_reply);
370
371   void send_mon_message(Message *m) {
372     Mutex::Locker l(monc_lock);
373     _send_mon_message(m);
374   }
375   /**
376    * If you specify a callback, you should not call
377    * reopen_session() again until it has been triggered. The MonClient
378    * will behave, but the first callback could be triggered after
379    * the session has been killed and the MonClient has started trying
380    * to reconnect to another monitor.
381    */
382   void reopen_session(Context *cb=NULL) {
383     Mutex::Locker l(monc_lock);
384     if (cb) {
385       session_established_context.reset(cb);
386     }
387     _reopen_session();
388   }
389
390   entity_addr_t get_my_addr() const {
391     return my_addr;
392   }
393
394   const uuid_d& get_fsid() const {
395     return monmap.fsid;
396   }
397
398   entity_addr_t get_mon_addr(unsigned i) const {
399     Mutex::Locker l(monc_lock);
400     if (i < monmap.size())
401       return monmap.get_addr(i);
402     return entity_addr_t();
403   }
404   entity_inst_t get_mon_inst(unsigned i) const {
405     Mutex::Locker l(monc_lock);
406     if (i < monmap.size())
407       return monmap.get_inst(i);
408     return entity_inst_t();
409   }
410   int get_num_mon() const {
411     Mutex::Locker l(monc_lock);
412     return monmap.size();
413   }
414
415   uint64_t get_global_id() const {
416     Mutex::Locker l(monc_lock);
417     return global_id;
418   }
419
420   void set_messenger(Messenger *m) { messenger = m; }
421   entity_addr_t get_myaddr() const { return messenger->get_myaddr(); }
422   AuthAuthorizer* build_authorizer(int service_id) const;
423
424   void set_want_keys(uint32_t want) {
425     want_keys = want;
426   }
427
428   // admin commands
429 private:
430   uint64_t last_mon_command_tid;
431   struct MonCommand {
432     string target_name;
433     int target_rank;
434     uint64_t tid;
435     vector<string> cmd;
436     bufferlist inbl;
437     bufferlist *poutbl;
438     string *prs;
439     int *prval;
440     Context *onfinish, *ontimeout;
441
442     explicit MonCommand(uint64_t t)
443       : target_rank(-1),
444         tid(t),
445         poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
446     {}
447   };
448   map<uint64_t,MonCommand*> mon_commands;
449
450   void _send_command(MonCommand *r);
451   void _resend_mon_commands();
452   int _cancel_mon_command(uint64_t tid);
453   void _finish_command(MonCommand *r, int ret, string rs);
454   void _finish_auth();
455   void handle_mon_command_ack(MMonCommandAck *ack);
456
457 public:
458   void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
459                         bufferlist *outbl, string *outs,
460                         Context *onfinish);
461   void start_mon_command(int mon_rank,
462                         const vector<string>& cmd, const bufferlist& inbl,
463                         bufferlist *outbl, string *outs,
464                         Context *onfinish);
465   void start_mon_command(const string &mon_name,  ///< mon name, with mon. prefix
466                         const vector<string>& cmd, const bufferlist& inbl,
467                         bufferlist *outbl, string *outs,
468                         Context *onfinish);
469
470   // version requests
471 public:
472   /**
473    * get latest known version(s) of cluster map
474    *
475    * @param map string name of map (e.g., 'osdmap')
476    * @param newest pointer where newest map version will be stored
477    * @param oldest pointer where oldest map version will be stored
478    * @param onfinish context that will be triggered on completion
479    * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
480    */
481   void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish);
482
483   /**
484    * Run a callback within our lock, with a reference
485    * to the MonMap
486    */
487   template<typename Callback, typename...Args>
488   auto with_monmap(Callback&& cb, Args&&...args) const ->
489     decltype(cb(monmap, std::forward<Args>(args)...)) {
490     Mutex::Locker l(monc_lock);
491     return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
492   }
493
494 private:
495   struct version_req_d {
496     Context *context;
497     version_t *newest, *oldest;
498     version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
499   };
500
501   map<ceph_tid_t, version_req_d*> version_requests;
502   ceph_tid_t version_req_id;
503   void handle_get_version_reply(MMonGetVersionReply* m);
504
505
506 };
507
508 #endif