1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <sys/types.h>
25 #include "common/ceph_context.h"
26 #include "common/config.h"
27 #include "common/common_init.h"
28 #include "common/errno.h"
29 #include "include/buffer.h"
30 #include "include/stringify.h"
31 #include "include/util.h"
33 #include "messages/MLog.h"
34 #include "msg/Messenger.h"
36 // needed for static_cast
37 #include "messages/PaxosServiceMessage.h"
38 #include "messages/MPoolOpReply.h"
39 #include "messages/MStatfsReply.h"
40 #include "messages/MGetPoolStatsReply.h"
41 #include "messages/MOSDOpReply.h"
42 #include "messages/MOSDMap.h"
43 #include "messages/MCommandReply.h"
45 #include "AioCompletionImpl.h"
46 #include "IoCtxImpl.h"
47 #include "PoolAsyncCompletionImpl.h"
48 #include "RadosClient.h"
50 #include "include/assert.h"
51 #include "common/EventTrace.h"
53 #define dout_subsys ceph_subsys_rados
55 #define dout_prefix *_dout << "librados: "
57 bool librados::RadosClient::ms_get_authorizer(int dest_type,
58 AuthAuthorizer **authorizer,
60 //ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl;
61 /* monitor authorization is being handled on different layer */
62 if (dest_type == CEPH_ENTITY_TYPE_MON)
64 *authorizer = monclient.build_authorizer(dest_type);
65 return *authorizer != NULL;
68 librados::RadosClient::RadosClient(CephContext *cct_)
69 : Dispatcher(cct_->get()),
70 cct_deleter{cct_, [](CephContext *p) {p->put();}},
74 mgrclient(cct_, nullptr),
78 lock("librados::RadosClient::lock"),
81 log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL),
82 finisher(cct, "radosclient", "fn-radosclient")
86 int64_t librados::RadosClient::lookup_pool(const char *name)
88 int r = wait_for_osdmap();
93 int64_t ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
96 // Make sure we have the latest map
97 int r = wait_for_latest_osdmap();
100 ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
107 bool librados::RadosClient::pool_requires_alignment(int64_t pool_id)
110 int r = pool_requires_alignment2(pool_id, &requires);
112 // Cast answer to false, this is a little bit problematic
113 // since we really don't know the answer yet, say.
120 // a safer version of pool_requires_alignment
121 int librados::RadosClient::pool_requires_alignment2(int64_t pool_id,
127 int r = wait_for_osdmap();
132 return objecter->with_osdmap([requires, pool_id](const OSDMap& o) {
133 if (!o.have_pg_pool(pool_id)) {
136 *requires = o.get_pg_pool(pool_id)->requires_aligned_append();
141 uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id)
144 int r = pool_required_alignment2(pool_id, &alignment);
152 // a safer version of pool_required_alignment
153 int librados::RadosClient::pool_required_alignment2(int64_t pool_id,
159 int r = wait_for_osdmap();
164 return objecter->with_osdmap([alignment, pool_id](const OSDMap &o) {
165 if (!o.have_pg_pool(pool_id)) {
168 *alignment = o.get_pg_pool(pool_id)->required_alignment();
173 int librados::RadosClient::pool_get_auid(uint64_t pool_id,
174 unsigned long long *auid)
176 int r = wait_for_osdmap();
179 objecter->with_osdmap([&](const OSDMap& o) {
180 const pg_pool_t *pg = o.get_pg_pool(pool_id);
191 int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s)
193 int r = wait_for_osdmap();
196 objecter->with_osdmap([&](const OSDMap& o) {
197 if (!o.have_pg_pool(pool_id)) {
201 *s = o.get_pool_name(pool_id);
207 int librados::RadosClient::get_fsid(std::string *s)
211 Mutex::Locker l(lock);
213 oss << monclient.get_fsid();
218 int librados::RadosClient::ping_monitor(const string mon_id, string *result)
221 /* If we haven't yet connected, we have no way of telling whether we
222 * already built monc's initial monmap. IF we are in CONNECTED state,
223 * then it is safe to assume that we went through connect(), which does
226 if (state != CONNECTED) {
227 ldout(cct, 10) << __func__ << " build monmap" << dendl;
228 err = monclient.build_initial_monmap();
234 err = monclient.ping_monitor(mon_id, result);
238 int librados::RadosClient::connect()
240 common_init_finish(cct);
244 // already connected?
245 if (state == CONNECTING)
247 if (state == CONNECTED)
252 err = monclient.build_initial_monmap();
257 messenger = Messenger::create_client_messenger(cct, "radosclient");
261 // require OSDREPLYMUX feature. this means we will fail to talk to
262 // old servers. this is necessary because otherwise we won't know
263 // how to decompose the reply data into its constituent pieces.
264 messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
266 ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl;
268 ldout(cct, 1) << "starting objecter" << dendl;
270 objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
272 cct->_conf->rados_mon_op_timeout,
273 cct->_conf->rados_osd_op_timeout);
276 objecter->set_balanced_budget();
278 monclient.set_messenger(messenger);
279 mgrclient.set_messenger(messenger);
282 messenger->add_dispatcher_head(&mgrclient);
283 messenger->add_dispatcher_tail(objecter);
284 messenger->add_dispatcher_tail(this);
288 ldout(cct, 1) << "setting wanted keys" << dendl;
289 monclient.set_want_keys(
290 CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
291 ldout(cct, 1) << "calling monclient init" << dendl;
292 err = monclient.init();
294 ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
299 err = monclient.authenticate(conf->client_mount_timeout);
301 ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
305 messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
307 // MgrClient needs this (it doesn't have MonClient reference itself)
308 monclient.sub_want("mgrmap", 0, 0);
309 monclient.renew_subs();
311 if (service_daemon) {
312 ldout(cct, 10) << __func__ << " registering as " << service_name << "."
313 << daemon_name << dendl;
314 mgrclient.service_daemon_register(service_name, daemon_name,
319 objecter->set_client_incarnation(0);
328 instance_id = monclient.get_global_id();
332 ldout(cct, 1) << "init done" << dendl;
337 state = DISCONNECTED;
352 void librados::RadosClient::shutdown()
355 if (state == DISCONNECTED) {
360 bool need_objecter = false;
361 if (objecter && objecter->initialized) {
362 need_objecter = true;
365 if (state == CONNECTED) {
367 // make sure watch callbacks are flushed
370 finisher.wait_for_empty();
373 state = DISCONNECTED;
375 timer.shutdown(); // will drop+retake lock
378 objecter->shutdown();
380 mgrclient.shutdown();
382 monclient.shutdown();
384 messenger->shutdown();
387 ldout(cct, 1) << "shutdown" << dendl;
390 int librados::RadosClient::watch_flush()
392 ldout(cct, 10) << __func__ << " enter" << dendl;
393 Mutex mylock("RadosClient::watch_flush::mylock");
396 objecter->linger_callback_flush(new C_SafeCond(&mylock, &cond, &done));
403 ldout(cct, 10) << __func__ << " exit" << dendl;
407 struct C_aio_watch_flush_Complete : public Context {
408 librados::RadosClient *client;
409 librados::AioCompletionImpl *c;
411 C_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
412 : client(_client), c(_c) {
416 void finish(int r) override {
422 if (c->callback_complete ||
424 client->finisher.queue(new librados::C_AioComplete(c));
430 int librados::RadosClient::async_watch_flush(AioCompletionImpl *c)
432 ldout(cct, 10) << __func__ << " enter" << dendl;
433 Context *oncomplete = new C_aio_watch_flush_Complete(this, c);
434 objecter->linger_callback_flush(oncomplete);
435 ldout(cct, 10) << __func__ << " exit" << dendl;
439 uint64_t librados::RadosClient::get_instance_id()
444 librados::RadosClient::~RadosClient()
453 int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
455 int64_t poolid = lookup_pool(name);
460 *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
464 int librados::RadosClient::create_ioctx(int64_t pool_id, IoCtxImpl **io)
466 *io = new librados::IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP);
470 bool librados::RadosClient::ms_dispatch(Message *m)
474 Mutex::Locker l(lock);
475 if (state == DISCONNECTED) {
476 ldout(cct, 10) << "disconnected, discarding " << *m << dendl;
485 void librados::RadosClient::ms_handle_connect(Connection *con)
489 bool librados::RadosClient::ms_handle_reset(Connection *con)
494 void librados::RadosClient::ms_handle_remote_reset(Connection *con)
498 bool librados::RadosClient::ms_handle_refused(Connection *con)
503 bool librados::RadosClient::_dispatch(Message *m)
505 assert(lock.is_locked());
506 switch (m->get_type()) {
508 case CEPH_MSG_OSD_MAP:
513 case CEPH_MSG_MDS_MAP:
518 handle_log(static_cast<MLog *>(m));
529 int librados::RadosClient::wait_for_osdmap()
531 assert(!lock.is_locked_by_me());
533 if (state != CONNECTED) {
537 bool need_map = false;
538 objecter->with_osdmap([&](const OSDMap& o) {
539 if (o.get_epoch() == 0) {
545 Mutex::Locker l(lock);
548 if (cct->_conf->rados_mon_op_timeout > 0)
549 timeout.set_from_double(cct->_conf->rados_mon_op_timeout);
551 if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
552 ldout(cct, 10) << __func__ << " waiting" << dendl;
553 utime_t start = ceph_clock_now();
554 while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
555 if (timeout.is_zero()) {
558 cond.WaitInterval(lock, timeout);
559 utime_t elapsed = ceph_clock_now() - start;
560 if (elapsed > timeout) {
561 lderr(cct) << "timed out waiting for first osdmap from monitors"
567 ldout(cct, 10) << __func__ << " done waiting" << dendl;
576 int librados::RadosClient::wait_for_latest_osdmap()
578 Mutex mylock("RadosClient::wait_for_latest_osdmap");
582 objecter->wait_for_latest_osdmap(new C_SafeCond(&mylock, &cond, &done));
592 int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
594 int r = wait_for_osdmap();
598 objecter->with_osdmap([&](const OSDMap& o) {
599 for (auto p : o.get_pools())
600 v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
605 int librados::RadosClient::get_pool_stats(std::list<string>& pools,
606 map<string,::pool_stat_t>& result)
608 Mutex mylock("RadosClient::get_pool_stats::mylock");
613 objecter->get_pool_stats(pools, &result, new C_SafeCond(&mylock, &cond, &done,
624 bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
625 const std::string& pool)
628 objecter->with_osdmap([&](const OSDMap& osdmap) {
629 int64_t poolid = osdmap.lookup_pg_pool_name(pool);
631 ret = osdmap.get_pg_pool(poolid)->is_unmanaged_snaps_mode();
636 int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
638 Mutex mylock ("RadosClient::get_fs_stats::mylock");
644 objecter->get_fs_stats(stats, boost::optional<int64_t> (),
645 new C_SafeCond(&mylock, &cond, &done, &ret));
649 while (!done) cond.Wait(mylock);
655 void librados::RadosClient::get() {
656 Mutex::Locker l(lock);
661 bool librados::RadosClient::put() {
662 Mutex::Locker l(lock);
665 return (refcnt == 0);
668 int librados::RadosClient::pool_create(string& name, unsigned long long auid,
671 int r = wait_for_osdmap();
676 Mutex mylock ("RadosClient::pool_create::mylock");
680 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
681 reply = objecter->create_pool(name, onfinish, auid, crush_rule);
694 int librados::RadosClient::pool_create_async(string& name, PoolAsyncCompletionImpl *c,
695 unsigned long long auid,
698 int r = wait_for_osdmap();
702 Context *onfinish = new C_PoolAsync_Safe(c);
703 r = objecter->create_pool(name, onfinish, auid, crush_rule);
710 int librados::RadosClient::pool_get_base_tier(int64_t pool_id, int64_t* base_tier)
712 int r = wait_for_osdmap();
717 objecter->with_osdmap([&](const OSDMap& o) {
718 const pg_pool_t* pool = o.get_pg_pool(pool_id);
720 if (pool->tier_of < 0) {
721 *base_tier = pool_id;
723 *base_tier = pool->tier_of;
733 int librados::RadosClient::pool_delete(const char *name)
735 int r = wait_for_osdmap();
740 Mutex mylock("RadosClient::pool_delete::mylock");
744 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &ret);
745 ret = objecter->delete_pool(name, onfinish);
758 int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c)
760 int r = wait_for_osdmap();
764 Context *onfinish = new C_PoolAsync_Safe(c);
765 r = objecter->delete_pool(name, onfinish);
772 void librados::RadosClient::blacklist_self(bool set) {
773 Mutex::Locker l(lock);
774 objecter->blacklist_self(set);
777 int librados::RadosClient::blacklist_add(const string& client_address,
778 uint32_t expire_seconds)
781 if (!addr.parse(client_address.c_str(), 0)) {
782 lderr(cct) << "unable to parse address " << client_address << dendl;
786 std::stringstream cmd;
788 << "\"prefix\": \"osd blacklist\", "
789 << "\"blacklistop\": \"add\", "
790 << "\"addr\": \"" << client_address << "\"";
791 if (expire_seconds != 0) {
792 cmd << ", \"expire\": " << expire_seconds << ".0";
796 std::vector<std::string> cmds;
797 cmds.push_back(cmd.str());
799 int r = mon_command(cmds, inbl, NULL, NULL);
804 // ensure we have the latest osd map epoch before proceeding
805 r = wait_for_latest_osdmap();
809 int librados::RadosClient::mon_command(const vector<string>& cmd,
810 const bufferlist &inbl,
811 bufferlist *outbl, string *outs)
814 mon_command_async(cmd, inbl, outbl, outs, &ctx);
818 void librados::RadosClient::mon_command_async(const vector<string>& cmd,
819 const bufferlist &inbl,
820 bufferlist *outbl, string *outs,
824 monclient.start_mon_command(cmd, inbl, outbl, outs, on_finish);
828 int librados::RadosClient::mgr_command(const vector<string>& cmd,
829 const bufferlist &inbl,
830 bufferlist *outbl, string *outs)
832 Mutex::Locker l(lock);
835 int r = mgrclient.start_command(cmd, inbl, outbl, outs, &cond);
847 int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
848 const bufferlist &inbl,
849 bufferlist *outbl, string *outs)
851 Mutex mylock("RadosClient::mon_command::mylock");
856 monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
857 new C_SafeCond(&mylock, &cond, &done, &rval));
866 int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
867 const bufferlist &inbl,
868 bufferlist *outbl, string *outs)
870 Mutex mylock("RadosClient::mon_command::mylock");
875 monclient.start_mon_command(name, cmd, inbl, outbl, outs,
876 new C_SafeCond(&mylock, &cond, &done, &rval));
885 int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
886 const bufferlist& inbl,
887 bufferlist *poutbl, string *prs)
889 Mutex mylock("RadosClient::osd_command::mylock");
899 // XXX do anything with tid?
900 objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
901 new C_SafeCond(&mylock, &cond, &done, &ret));
910 int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
911 const bufferlist& inbl,
912 bufferlist *poutbl, string *prs)
914 Mutex mylock("RadosClient::pg_command::mylock");
920 objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
921 new C_SafeCond(&mylock, &cond, &done, &ret));
930 int librados::RadosClient::monitor_log(const string& level,
931 rados_log_callback_t cb,
932 rados_log_callback2_t cb2,
935 Mutex::Locker l(lock);
937 if (state != CONNECTED) {
941 if (cb == NULL && cb2 == NULL) {
943 ldout(cct, 10) << __func__ << " removing cb " << (void*)log_cb
944 << " " << (void*)log_cb2 << dendl;
945 monclient.sub_unwant(log_watch);
954 if (level == "debug") {
955 watch_level = "log-debug";
956 } else if (level == "info") {
957 watch_level = "log-info";
958 } else if (level == "warn" || level == "warning") {
959 watch_level = "log-warn";
960 } else if (level == "err" || level == "error") {
961 watch_level = "log-error";
962 } else if (level == "sec") {
963 watch_level = "log-sec";
965 ldout(cct, 10) << __func__ << " invalid level " << level << dendl;
969 if (log_cb || log_cb2)
970 monclient.sub_unwant(log_watch);
973 ldout(cct, 10) << __func__ << " add cb " << (void*)cb << " " << (void*)cb2
974 << " level " << level << dendl;
975 monclient.sub_want(watch_level, 0, 0);
977 monclient.renew_subs();
981 log_watch = watch_level;
985 void librados::RadosClient::handle_log(MLog *m)
987 assert(lock.is_locked());
988 ldout(cct, 10) << __func__ << " version " << m->version << dendl;
990 if (log_last_version < m->version) {
991 log_last_version = m->version;
993 if (log_cb || log_cb2) {
994 for (std::deque<LogEntry>::iterator it = m->entries.begin(); it != m->entries.end(); ++it) {
997 ss << e.stamp << " " << e.name << " " << e.prio << " " << e.msg;
998 string line = ss.str();
999 string who = stringify(e.who);
1000 string name = stringify(e.name);
1001 string level = stringify(e.prio);
1002 struct timespec stamp;
1003 e.stamp.to_timespec(&stamp);
1005 ldout(cct, 20) << __func__ << " delivering " << ss.str() << dendl;
1007 log_cb(log_cb_arg, line.c_str(), who.c_str(),
1008 stamp.tv_sec, stamp.tv_nsec,
1009 e.seq, level.c_str(), e.msg.c_str());
1011 log_cb2(log_cb_arg, line.c_str(),
1013 who.c_str(), name.c_str(),
1014 stamp.tv_sec, stamp.tv_nsec,
1015 e.seq, level.c_str(), e.msg.c_str());
1019 monclient.sub_got(log_watch, log_last_version);
1025 int librados::RadosClient::service_daemon_register(
1026 const std::string& service, ///< service name (e.g., 'rgw')
1027 const std::string& name, ///< daemon name (e.g., 'gwfoo')
1028 const std::map<std::string,std::string>& metadata)
1030 if (service_daemon) {
1033 if (service == "osd" ||
1035 service == "client" ||
1038 // normal ceph entity types are not allowed!
1041 if (service.empty() || name.empty()) {
1045 collect_sys_info(&daemon_metadata, cct);
1047 ldout(cct,10) << __func__ << " " << service << "." << name << dendl;
1048 service_daemon = true;
1049 service_name = service;
1051 daemon_metadata.insert(metadata.begin(), metadata.end());
1053 if (state == DISCONNECTED) {
1056 if (state == CONNECTING) {
1059 mgrclient.service_daemon_register(service_name, daemon_name,
1064 int librados::RadosClient::service_daemon_update_status(
1065 const std::map<std::string,std::string>& status)
1067 if (state != CONNECTED) {
1070 return mgrclient.service_daemon_update_status(status);
1073 mon_feature_t librados::RadosClient::get_required_monitor_features() const
1075 return monclient.monmap.get_required_features();