Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librados / RadosClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18
19 #include <iostream>
20 #include <string>
21 #include <sstream>
22 #include <pthread.h>
23 #include <errno.h>
24
25 #include "common/ceph_context.h"
26 #include "common/config.h"
27 #include "common/common_init.h"
28 #include "common/errno.h"
29 #include "include/buffer.h"
30 #include "include/stringify.h"
31 #include "include/util.h"
32
33 #include "messages/MLog.h"
34 #include "msg/Messenger.h"
35
36 // needed for static_cast
37 #include "messages/PaxosServiceMessage.h"
38 #include "messages/MPoolOpReply.h"
39 #include "messages/MStatfsReply.h"
40 #include "messages/MGetPoolStatsReply.h"
41 #include "messages/MOSDOpReply.h"
42 #include "messages/MOSDMap.h"
43 #include "messages/MCommandReply.h"
44
45 #include "AioCompletionImpl.h"
46 #include "IoCtxImpl.h"
47 #include "PoolAsyncCompletionImpl.h"
48 #include "RadosClient.h"
49
50 #include "include/assert.h"
51 #include "common/EventTrace.h"
52
53 #define dout_subsys ceph_subsys_rados
54 #undef dout_prefix
55 #define dout_prefix *_dout << "librados: "
56
57 bool librados::RadosClient::ms_get_authorizer(int dest_type,
58                                               AuthAuthorizer **authorizer,
59                                               bool force_new) {
60   //ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl;
61   /* monitor authorization is being handled on different layer */
62   if (dest_type == CEPH_ENTITY_TYPE_MON)
63     return true;
64   *authorizer = monclient.build_authorizer(dest_type);
65   return *authorizer != NULL;
66 }
67
68 librados::RadosClient::RadosClient(CephContext *cct_)
69   : Dispatcher(cct_->get()),
70     cct_deleter{cct_, [](CephContext *p) {p->put();}},
71     conf(cct_->_conf),
72     state(DISCONNECTED),
73     monclient(cct_),
74     mgrclient(cct_, nullptr),
75     messenger(NULL),
76     instance_id(0),
77     objecter(NULL),
78     lock("librados::RadosClient::lock"),
79     timer(cct, lock),
80     refcnt(1),
81     log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL),
82     finisher(cct, "radosclient", "fn-radosclient")
83 {
84 }
85
86 int64_t librados::RadosClient::lookup_pool(const char *name)
87 {
88   int r = wait_for_osdmap();
89   if (r < 0) {
90     return r;
91   }
92
93   int64_t ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
94                                  name);
95   if (-ENOENT == ret) {
96     // Make sure we have the latest map
97     int r = wait_for_latest_osdmap();
98     if (r < 0)
99       return r;
100     ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
101                                  name);
102   }
103
104   return ret;
105 }
106
107 bool librados::RadosClient::pool_requires_alignment(int64_t pool_id)
108 {
109   bool requires;
110   int r = pool_requires_alignment2(pool_id, &requires);
111   if (r < 0) {
112     // Cast answer to false, this is a little bit problematic
113     // since we really don't know the answer yet, say.
114     return false;
115   }
116
117   return requires;
118 }
119
120 // a safer version of pool_requires_alignment
121 int librados::RadosClient::pool_requires_alignment2(int64_t pool_id,
122                                                     bool *requires)
123 {
124   if (!requires)
125     return -EINVAL;
126
127   int r = wait_for_osdmap();
128   if (r < 0) {
129     return r;
130   }
131
132   return objecter->with_osdmap([requires, pool_id](const OSDMap& o) {
133       if (!o.have_pg_pool(pool_id)) {
134         return -ENOENT;
135       }
136       *requires = o.get_pg_pool(pool_id)->requires_aligned_append();
137       return 0;
138     });
139 }
140
141 uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id)
142 {
143   uint64_t alignment;
144   int r = pool_required_alignment2(pool_id, &alignment);
145   if (r < 0) {
146     return 0;
147   }
148
149   return alignment;
150 }
151
152 // a safer version of pool_required_alignment
153 int librados::RadosClient::pool_required_alignment2(int64_t pool_id,
154                                                     uint64_t *alignment)
155 {
156   if (!alignment)
157     return -EINVAL;
158
159   int r = wait_for_osdmap();
160   if (r < 0) {
161     return r;
162   }
163
164   return objecter->with_osdmap([alignment, pool_id](const OSDMap &o) {
165       if (!o.have_pg_pool(pool_id)) {
166         return -ENOENT;
167       }
168       *alignment = o.get_pg_pool(pool_id)->required_alignment();
169       return 0;
170     });
171 }
172
173 int librados::RadosClient::pool_get_auid(uint64_t pool_id,
174                                          unsigned long long *auid)
175 {
176   int r = wait_for_osdmap();
177   if (r < 0)
178     return r;
179   objecter->with_osdmap([&](const OSDMap& o) {
180       const pg_pool_t *pg = o.get_pg_pool(pool_id);
181       if (!pg) {
182         r = -ENOENT;
183       } else {
184         r = 0;
185         *auid = pg->auid;
186       }
187     });
188   return r;
189 }
190
191 int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s)
192 {
193   int r = wait_for_osdmap();
194   if (r < 0)
195     return r;
196   objecter->with_osdmap([&](const OSDMap& o) {
197       if (!o.have_pg_pool(pool_id)) {
198         r = -ENOENT;
199       } else {
200         r = 0;
201         *s = o.get_pool_name(pool_id);
202       }
203     });
204   return r;
205 }
206
207 int librados::RadosClient::get_fsid(std::string *s)
208 {
209   if (!s)
210     return -EINVAL;
211   Mutex::Locker l(lock);
212   ostringstream oss;
213   oss << monclient.get_fsid();
214   *s = oss.str();
215   return 0;
216 }
217
218 int librados::RadosClient::ping_monitor(const string mon_id, string *result)
219 {
220   int err = 0;
221   /* If we haven't yet connected, we have no way of telling whether we
222    * already built monc's initial monmap.  IF we are in CONNECTED state,
223    * then it is safe to assume that we went through connect(), which does
224    * build a monmap.
225    */
226   if (state != CONNECTED) {
227     ldout(cct, 10) << __func__ << " build monmap" << dendl;
228     err = monclient.build_initial_monmap();
229   }
230   if (err < 0) {
231     return err;
232   }
233
234   err = monclient.ping_monitor(mon_id, result);
235   return err;
236 }
237
238 int librados::RadosClient::connect()
239 {
240   common_init_finish(cct);
241
242   int err;
243
244   // already connected?
245   if (state == CONNECTING)
246     return -EINPROGRESS;
247   if (state == CONNECTED)
248     return -EISCONN;
249   state = CONNECTING;
250
251   // get monmap
252   err = monclient.build_initial_monmap();
253   if (err < 0)
254     goto out;
255
256   err = -ENOMEM;
257   messenger = Messenger::create_client_messenger(cct, "radosclient");
258   if (!messenger)
259     goto out;
260
261   // require OSDREPLYMUX feature.  this means we will fail to talk to
262   // old servers.  this is necessary because otherwise we won't know
263   // how to decompose the reply data into its constituent pieces.
264   messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
265
266   ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl;
267
268   ldout(cct, 1) << "starting objecter" << dendl;
269
270   objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
271                           &finisher,
272                           cct->_conf->rados_mon_op_timeout,
273                           cct->_conf->rados_osd_op_timeout);
274   if (!objecter)
275     goto out;
276   objecter->set_balanced_budget();
277
278   monclient.set_messenger(messenger);
279   mgrclient.set_messenger(messenger);
280
281   objecter->init();
282   messenger->add_dispatcher_head(&mgrclient);
283   messenger->add_dispatcher_tail(objecter);
284   messenger->add_dispatcher_tail(this);
285
286   messenger->start();
287
288   ldout(cct, 1) << "setting wanted keys" << dendl;
289   monclient.set_want_keys(
290       CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
291   ldout(cct, 1) << "calling monclient init" << dendl;
292   err = monclient.init();
293   if (err) {
294     ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
295     shutdown();
296     goto out;
297   }
298
299   err = monclient.authenticate(conf->client_mount_timeout);
300   if (err) {
301     ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
302     shutdown();
303     goto out;
304   }
305   messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
306
307   // MgrClient needs this (it doesn't have MonClient reference itself)
308   monclient.sub_want("mgrmap", 0, 0);
309   monclient.renew_subs();
310
311   if (service_daemon) {
312     ldout(cct, 10) << __func__ << " registering as " << service_name << "."
313                    << daemon_name << dendl;
314     mgrclient.service_daemon_register(service_name, daemon_name,
315                                       daemon_metadata);
316   }
317   mgrclient.init();
318
319   objecter->set_client_incarnation(0);
320   objecter->start();
321   lock.Lock();
322
323   timer.init();
324
325   finisher.start();
326
327   state = CONNECTED;
328   instance_id = monclient.get_global_id();
329
330   lock.Unlock();
331
332   ldout(cct, 1) << "init done" << dendl;
333   err = 0;
334
335  out:
336   if (err) {
337     state = DISCONNECTED;
338
339     if (objecter) {
340       delete objecter;
341       objecter = NULL;
342     }
343     if (messenger) {
344       delete messenger;
345       messenger = NULL;
346     }
347   }
348
349   return err;
350 }
351
352 void librados::RadosClient::shutdown()
353 {
354   lock.Lock();
355   if (state == DISCONNECTED) {
356     lock.Unlock();
357     return;
358   }
359
360   bool need_objecter = false;
361   if (objecter && objecter->initialized) {
362     need_objecter = true;
363   }
364
365   if (state == CONNECTED) {
366     if (need_objecter) {
367       // make sure watch callbacks are flushed
368       watch_flush();
369     }
370     finisher.wait_for_empty();
371     finisher.stop();
372   }
373   state = DISCONNECTED;
374   instance_id = 0;
375   timer.shutdown();   // will drop+retake lock
376   lock.Unlock();
377   if (need_objecter) {
378     objecter->shutdown();
379   }
380   mgrclient.shutdown();
381
382   monclient.shutdown();
383   if (messenger) {
384     messenger->shutdown();
385     messenger->wait();
386   }
387   ldout(cct, 1) << "shutdown" << dendl;
388 }
389
390 int librados::RadosClient::watch_flush()
391 {
392   ldout(cct, 10) << __func__ << " enter" << dendl;
393   Mutex mylock("RadosClient::watch_flush::mylock");
394   Cond cond;
395   bool done;
396   objecter->linger_callback_flush(new C_SafeCond(&mylock, &cond, &done));
397
398   mylock.Lock();
399   while (!done)
400     cond.Wait(mylock);
401   mylock.Unlock();
402
403   ldout(cct, 10) << __func__ << " exit" << dendl;
404   return 0;
405 }
406
407 struct C_aio_watch_flush_Complete : public Context {
408   librados::RadosClient *client;
409   librados::AioCompletionImpl *c;
410
411   C_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
412     : client(_client), c(_c) {
413     c->get();
414   }
415
416   void finish(int r) override {
417     c->lock.Lock();
418     c->rval = r;
419     c->complete = true;
420     c->cond.Signal();
421
422     if (c->callback_complete ||
423         c->callback_safe) {
424       client->finisher.queue(new librados::C_AioComplete(c));
425     }
426     c->put_unlock();
427   }
428 };
429
430 int librados::RadosClient::async_watch_flush(AioCompletionImpl *c)
431 {
432   ldout(cct, 10) << __func__ << " enter" << dendl;
433   Context *oncomplete = new C_aio_watch_flush_Complete(this, c);
434   objecter->linger_callback_flush(oncomplete);
435   ldout(cct, 10) << __func__ << " exit" << dendl;
436   return 0;
437 }
438
439 uint64_t librados::RadosClient::get_instance_id()
440 {
441   return instance_id;
442 }
443
444 librados::RadosClient::~RadosClient()
445 {
446   if (messenger)
447     delete messenger;
448   if (objecter)
449     delete objecter;
450   cct = NULL;
451 }
452
453 int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
454 {
455   int64_t poolid = lookup_pool(name);
456   if (poolid < 0) {
457     return (int)poolid;
458   }
459
460   *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
461   return 0;
462 }
463
464 int librados::RadosClient::create_ioctx(int64_t pool_id, IoCtxImpl **io)
465 {
466   *io = new librados::IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP);
467   return 0;
468 }
469
470 bool librados::RadosClient::ms_dispatch(Message *m)
471 {
472   bool ret;
473
474   Mutex::Locker l(lock);
475   if (state == DISCONNECTED) {
476     ldout(cct, 10) << "disconnected, discarding " << *m << dendl;
477     m->put();
478     ret = true;
479   } else {
480     ret = _dispatch(m);
481   }
482   return ret;
483 }
484
485 void librados::RadosClient::ms_handle_connect(Connection *con)
486 {
487 }
488
489 bool librados::RadosClient::ms_handle_reset(Connection *con)
490 {
491   return false;
492 }
493
494 void librados::RadosClient::ms_handle_remote_reset(Connection *con)
495 {
496 }
497
498 bool librados::RadosClient::ms_handle_refused(Connection *con)
499 {
500   return false;
501 }
502
503 bool librados::RadosClient::_dispatch(Message *m)
504 {
505   assert(lock.is_locked());
506   switch (m->get_type()) {
507   // OSD
508   case CEPH_MSG_OSD_MAP:
509     cond.Signal();
510     m->put();
511     break;
512
513   case CEPH_MSG_MDS_MAP:
514     m->put();
515     break;
516
517   case MSG_LOG:
518     handle_log(static_cast<MLog *>(m));
519     break;
520
521   default:
522     return false;
523   }
524
525   return true;
526 }
527
528
529 int librados::RadosClient::wait_for_osdmap()
530 {
531   assert(!lock.is_locked_by_me());
532
533   if (state != CONNECTED) {
534     return -ENOTCONN;
535   }
536
537   bool need_map = false;
538   objecter->with_osdmap([&](const OSDMap& o) {
539       if (o.get_epoch() == 0) {
540         need_map = true;
541       }
542     });
543
544   if (need_map) {
545     Mutex::Locker l(lock);
546
547     utime_t timeout;
548     if (cct->_conf->rados_mon_op_timeout > 0)
549       timeout.set_from_double(cct->_conf->rados_mon_op_timeout);
550
551     if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
552       ldout(cct, 10) << __func__ << " waiting" << dendl;
553       utime_t start = ceph_clock_now();
554       while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
555         if (timeout.is_zero()) {
556           cond.Wait(lock);
557         } else {
558           cond.WaitInterval(lock, timeout);
559           utime_t elapsed = ceph_clock_now() - start;
560           if (elapsed > timeout) {
561             lderr(cct) << "timed out waiting for first osdmap from monitors"
562                        << dendl;
563             return -ETIMEDOUT;
564           }
565         }
566       }
567       ldout(cct, 10) << __func__ << " done waiting" << dendl;
568     }
569     return 0;
570   } else {
571     return 0;
572   }
573 }
574
575
576 int librados::RadosClient::wait_for_latest_osdmap()
577 {
578   Mutex mylock("RadosClient::wait_for_latest_osdmap");
579   Cond cond;
580   bool done;
581
582   objecter->wait_for_latest_osdmap(new C_SafeCond(&mylock, &cond, &done));
583
584   mylock.Lock();
585   while (!done)
586     cond.Wait(mylock);
587   mylock.Unlock();
588
589   return 0;
590 }
591
592 int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
593 {
594   int r = wait_for_osdmap();
595   if (r < 0)
596     return r;
597
598   objecter->with_osdmap([&](const OSDMap& o) {
599       for (auto p : o.get_pools())
600         v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
601     });
602   return 0;
603 }
604
605 int librados::RadosClient::get_pool_stats(std::list<string>& pools,
606                                           map<string,::pool_stat_t>& result)
607 {
608   Mutex mylock("RadosClient::get_pool_stats::mylock");
609   Cond cond;
610   bool done;
611   int ret = 0;
612
613   objecter->get_pool_stats(pools, &result, new C_SafeCond(&mylock, &cond, &done,
614                                                           &ret));
615
616   mylock.Lock();
617   while (!done)
618     cond.Wait(mylock);
619   mylock.Unlock();
620
621   return ret;
622 }
623
624 bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
625   const std::string& pool)
626 {
627   bool ret = false;
628   objecter->with_osdmap([&](const OSDMap& osdmap) {
629       int64_t poolid = osdmap.lookup_pg_pool_name(pool);
630       if (poolid >= 0)
631         ret = osdmap.get_pg_pool(poolid)->is_unmanaged_snaps_mode();
632     });
633   return ret;
634 }
635
636 int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
637 {
638   Mutex mylock ("RadosClient::get_fs_stats::mylock");
639   Cond cond;
640   bool done;
641   int ret = 0;
642
643   lock.Lock();
644   objecter->get_fs_stats(stats, boost::optional<int64_t> (),
645                          new C_SafeCond(&mylock, &cond, &done, &ret));
646   lock.Unlock();
647
648   mylock.Lock();
649   while (!done) cond.Wait(mylock);
650   mylock.Unlock();
651
652   return ret;
653 }
654
655 void librados::RadosClient::get() {
656   Mutex::Locker l(lock);
657   assert(refcnt > 0);
658   refcnt++;
659 }
660
661 bool librados::RadosClient::put() {
662   Mutex::Locker l(lock);
663   assert(refcnt > 0);
664   refcnt--;
665   return (refcnt == 0);
666 }
667  
668 int librados::RadosClient::pool_create(string& name, unsigned long long auid,
669                                        int16_t crush_rule)
670 {
671   int r = wait_for_osdmap();
672   if (r < 0) {
673     return r;
674   }
675
676   Mutex mylock ("RadosClient::pool_create::mylock");
677   int reply;
678   Cond cond;
679   bool done;
680   Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
681   reply = objecter->create_pool(name, onfinish, auid, crush_rule);
682
683   if (reply < 0) {
684     delete onfinish;
685   } else {
686     mylock.Lock();
687     while(!done)
688       cond.Wait(mylock);
689     mylock.Unlock();
690   }
691   return reply;
692 }
693
694 int librados::RadosClient::pool_create_async(string& name, PoolAsyncCompletionImpl *c,
695                                              unsigned long long auid,
696                                              int16_t crush_rule)
697 {
698   int r = wait_for_osdmap();
699   if (r < 0)
700     return r;
701
702   Context *onfinish = new C_PoolAsync_Safe(c);
703   r = objecter->create_pool(name, onfinish, auid, crush_rule);
704   if (r < 0) {
705     delete onfinish;
706   }
707   return r;
708 }
709
710 int librados::RadosClient::pool_get_base_tier(int64_t pool_id, int64_t* base_tier)
711 {
712   int r = wait_for_osdmap();
713   if (r < 0) {
714     return r;
715   }
716
717   objecter->with_osdmap([&](const OSDMap& o) {
718       const pg_pool_t* pool = o.get_pg_pool(pool_id);
719       if (pool) {
720         if (pool->tier_of < 0) {
721           *base_tier = pool_id;
722         } else {
723           *base_tier = pool->tier_of;
724         }
725         r = 0;
726       } else {
727         r = -ENOENT;
728       }
729     });
730   return r;
731 }
732
733 int librados::RadosClient::pool_delete(const char *name)
734 {
735   int r = wait_for_osdmap();
736   if (r < 0) {
737     return r;
738   }
739
740   Mutex mylock("RadosClient::pool_delete::mylock");
741   Cond cond;
742   bool done;
743   int ret;
744   Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &ret);
745   ret = objecter->delete_pool(name, onfinish);
746
747   if (ret < 0) {
748     delete onfinish;
749   } else {
750     mylock.Lock();
751     while (!done)
752       cond.Wait(mylock);
753     mylock.Unlock();
754   }
755   return ret;
756 }
757
758 int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c)
759 {
760   int r = wait_for_osdmap();
761   if (r < 0)
762     return r;
763
764   Context *onfinish = new C_PoolAsync_Safe(c);
765   r = objecter->delete_pool(name, onfinish);
766   if (r < 0) {
767     delete onfinish;
768   }
769   return r;
770 }
771
772 void librados::RadosClient::blacklist_self(bool set) {
773   Mutex::Locker l(lock);
774   objecter->blacklist_self(set);
775 }
776
777 int librados::RadosClient::blacklist_add(const string& client_address,
778                                          uint32_t expire_seconds)
779 {
780   entity_addr_t addr;
781   if (!addr.parse(client_address.c_str(), 0)) {
782     lderr(cct) << "unable to parse address " << client_address << dendl;
783     return -EINVAL;
784   }
785
786   std::stringstream cmd;
787   cmd << "{"
788       << "\"prefix\": \"osd blacklist\", "
789       << "\"blacklistop\": \"add\", "
790       << "\"addr\": \"" << client_address << "\"";
791   if (expire_seconds != 0) {
792     cmd << ", \"expire\": " << expire_seconds << ".0";
793   }
794   cmd << "}";
795
796   std::vector<std::string> cmds;
797   cmds.push_back(cmd.str());
798   bufferlist inbl;
799   int r = mon_command(cmds, inbl, NULL, NULL);
800   if (r < 0) {
801     return r;
802   }
803
804   // ensure we have the latest osd map epoch before proceeding
805   r = wait_for_latest_osdmap();
806   return r;
807 }
808
809 int librados::RadosClient::mon_command(const vector<string>& cmd,
810                                        const bufferlist &inbl,
811                                        bufferlist *outbl, string *outs)
812 {
813   C_SaferCond ctx;
814   mon_command_async(cmd, inbl, outbl, outs, &ctx);
815   return ctx.wait();
816 }
817
818 void librados::RadosClient::mon_command_async(const vector<string>& cmd,
819                                               const bufferlist &inbl,
820                                               bufferlist *outbl, string *outs,
821                                               Context *on_finish)
822 {
823   lock.Lock();
824   monclient.start_mon_command(cmd, inbl, outbl, outs, on_finish);
825   lock.Unlock();
826 }
827
828 int librados::RadosClient::mgr_command(const vector<string>& cmd,
829                                        const bufferlist &inbl,
830                                        bufferlist *outbl, string *outs)
831 {
832   Mutex::Locker l(lock);
833
834   C_SaferCond cond;
835   int r = mgrclient.start_command(cmd, inbl, outbl, outs, &cond);
836   if (r < 0)
837     return r;
838
839   lock.Unlock();
840   r = cond.wait();
841   lock.Lock();
842
843   return r;
844 }
845
846
847 int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
848                                        const bufferlist &inbl,
849                                        bufferlist *outbl, string *outs)
850 {
851   Mutex mylock("RadosClient::mon_command::mylock");
852   Cond cond;
853   bool done;
854   int rval;
855   lock.Lock();
856   monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
857                                new C_SafeCond(&mylock, &cond, &done, &rval));
858   lock.Unlock();
859   mylock.Lock();
860   while (!done)
861     cond.Wait(mylock);
862   mylock.Unlock();
863   return rval;
864 }
865
866 int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
867                                        const bufferlist &inbl,
868                                        bufferlist *outbl, string *outs)
869 {
870   Mutex mylock("RadosClient::mon_command::mylock");
871   Cond cond;
872   bool done;
873   int rval;
874   lock.Lock();
875   monclient.start_mon_command(name, cmd, inbl, outbl, outs,
876                                new C_SafeCond(&mylock, &cond, &done, &rval));
877   lock.Unlock();
878   mylock.Lock();
879   while (!done)
880     cond.Wait(mylock);
881   mylock.Unlock();
882   return rval;
883 }
884
885 int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
886                                        const bufferlist& inbl,
887                                        bufferlist *poutbl, string *prs)
888 {
889   Mutex mylock("RadosClient::osd_command::mylock");
890   Cond cond;
891   bool done;
892   int ret;
893   ceph_tid_t tid;
894
895   if (osd < 0)
896     return -EINVAL;
897
898   lock.Lock();
899   // XXX do anything with tid?
900   objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
901                         new C_SafeCond(&mylock, &cond, &done, &ret));
902   lock.Unlock();
903   mylock.Lock();
904   while (!done)
905     cond.Wait(mylock);
906   mylock.Unlock();
907   return ret;
908 }
909
910 int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
911                                       const bufferlist& inbl,
912                                       bufferlist *poutbl, string *prs)
913 {
914   Mutex mylock("RadosClient::pg_command::mylock");
915   Cond cond;
916   bool done;
917   int ret;
918   ceph_tid_t tid;
919   lock.Lock();
920   objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
921                        new C_SafeCond(&mylock, &cond, &done, &ret));
922   lock.Unlock();
923   mylock.Lock();
924   while (!done)
925     cond.Wait(mylock);
926   mylock.Unlock();
927   return ret;
928 }
929
930 int librados::RadosClient::monitor_log(const string& level,
931                                        rados_log_callback_t cb,
932                                        rados_log_callback2_t cb2,
933                                        void *arg)
934 {
935   Mutex::Locker l(lock);
936
937   if (state != CONNECTED) {
938     return -ENOTCONN;
939   }
940
941   if (cb == NULL && cb2 == NULL) {
942     // stop watch
943     ldout(cct, 10) << __func__ << " removing cb " << (void*)log_cb
944                    << " " << (void*)log_cb2 << dendl;
945     monclient.sub_unwant(log_watch);
946     log_watch.clear();
947     log_cb = NULL;
948     log_cb2 = NULL;
949     log_cb_arg = NULL;
950     return 0;
951   }
952
953   string watch_level;
954   if (level == "debug") {
955     watch_level = "log-debug";
956   } else if (level == "info") {
957     watch_level = "log-info";
958   } else if (level == "warn" || level == "warning") {
959     watch_level = "log-warn";
960   } else if (level == "err" || level == "error") {
961     watch_level = "log-error";
962   } else if (level == "sec") {
963     watch_level = "log-sec";
964   } else {
965     ldout(cct, 10) << __func__ << " invalid level " << level << dendl;
966     return -EINVAL;
967   }
968
969   if (log_cb || log_cb2)
970     monclient.sub_unwant(log_watch);
971
972   // (re)start watch
973   ldout(cct, 10) << __func__ << " add cb " << (void*)cb << " " << (void*)cb2
974                  << " level " << level << dendl;
975   monclient.sub_want(watch_level, 0, 0);
976
977   monclient.renew_subs();
978   log_cb = cb;
979   log_cb2 = cb2;
980   log_cb_arg = arg;
981   log_watch = watch_level;
982   return 0;
983 }
984
985 void librados::RadosClient::handle_log(MLog *m)
986 {
987   assert(lock.is_locked());
988   ldout(cct, 10) << __func__ << " version " << m->version << dendl;
989
990   if (log_last_version < m->version) {
991     log_last_version = m->version;
992
993     if (log_cb || log_cb2) {
994       for (std::deque<LogEntry>::iterator it = m->entries.begin(); it != m->entries.end(); ++it) {
995         LogEntry e = *it;
996         ostringstream ss;
997         ss << e.stamp << " " << e.name << " " << e.prio << " " << e.msg;
998         string line = ss.str();
999         string who = stringify(e.who);
1000         string name = stringify(e.name);
1001         string level = stringify(e.prio);
1002         struct timespec stamp;
1003         e.stamp.to_timespec(&stamp);
1004
1005         ldout(cct, 20) << __func__ << " delivering " << ss.str() << dendl;
1006         if (log_cb)
1007           log_cb(log_cb_arg, line.c_str(), who.c_str(),
1008                  stamp.tv_sec, stamp.tv_nsec,
1009                  e.seq, level.c_str(), e.msg.c_str());
1010         if (log_cb2)
1011           log_cb2(log_cb_arg, line.c_str(),
1012                   e.channel.c_str(),
1013                   who.c_str(), name.c_str(),
1014                   stamp.tv_sec, stamp.tv_nsec,
1015                   e.seq, level.c_str(), e.msg.c_str());
1016       }
1017     }
1018
1019     monclient.sub_got(log_watch, log_last_version);
1020   }
1021
1022   m->put();
1023 }
1024
1025 int librados::RadosClient::service_daemon_register(
1026   const std::string& service,  ///< service name (e.g., 'rgw')
1027   const std::string& name,     ///< daemon name (e.g., 'gwfoo')
1028   const std::map<std::string,std::string>& metadata)
1029 {
1030   if (service_daemon) {
1031     return -EEXIST;
1032   }
1033   if (service == "osd" ||
1034       service == "mds" ||
1035       service == "client" ||
1036       service == "mon" ||
1037       service == "mgr") {
1038     // normal ceph entity types are not allowed!
1039     return -EINVAL;
1040   }
1041   if (service.empty() || name.empty()) {
1042     return -EINVAL;
1043   }
1044
1045   collect_sys_info(&daemon_metadata, cct);
1046
1047   ldout(cct,10) << __func__ << " " << service << "." << name << dendl;
1048   service_daemon = true;
1049   service_name = service;
1050   daemon_name = name;
1051   daemon_metadata.insert(metadata.begin(), metadata.end());
1052
1053   if (state == DISCONNECTED) {
1054     return 0;
1055   }
1056   if (state == CONNECTING) {
1057     return -EBUSY;
1058   }
1059   mgrclient.service_daemon_register(service_name, daemon_name,
1060                                     daemon_metadata);
1061   return 0;
1062 }
1063
1064 int librados::RadosClient::service_daemon_update_status(
1065   const std::map<std::string,std::string>& status)
1066 {
1067   if (state != CONNECTED) {
1068     return -ENOTCONN;
1069   }
1070   return mgrclient.service_daemon_update_status(status);
1071 }
1072
1073 mon_feature_t librados::RadosClient::get_required_monitor_features() const
1074 {
1075   return monclient.monmap.get_required_features();
1076 }