1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/optional.hpp>
6 #include "common/ceph_json.h"
7 #include "common/RWLock.h"
8 #include "common/RefCountedObj.h"
9 #include "common/WorkQueue.h"
10 #include "common/Throttle.h"
11 #include "common/admin_socket.h"
12 #include "common/errno.h"
14 #include "rgw_common.h"
15 #include "rgw_rados.h"
17 #include "rgw_metadata.h"
18 #include "rgw_rest_conn.h"
19 #include "rgw_tools.h"
20 #include "rgw_cr_rados.h"
21 #include "rgw_cr_rest.h"
22 #include "rgw_http_client.h"
24 #include "cls/lock/cls_lock_client.h"
26 #include <boost/asio/yield.hpp>
28 #define dout_subsys ceph_subsys_rgw
31 #define dout_prefix (*_dout << "meta sync: ")
// RADOS object names/prefixes under which metadata-sync state is persisted:
// overall sync status, per-shard status objects, and the full-sync index.
33 static string mdlog_sync_status_oid = "mdlog.sync-status";
34 static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
35 static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
// Pre-compute one error-log object name per shard so log_error_cr() can pick
// an oid by shard index without re-formatting each time.
// NOTE(review): this numbered listing is incomplete -- original lines 40-41
// (presumably the closing braces) are missing here.
37 RGWSyncErrorLogger::RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
38 for (int i = 0; i < num_shards; i++) {
39 oids.push_back(get_shard_oid(oid_prefix, i));
// Format "<oid_prefix>.<shard_id>". The buffer is runtime-sized (a VLA --
// compiler extension); 16 extra bytes cover the '.' + decimal int + NUL.
// NOTE(review): the return statement (original lines 45-47) is missing from
// this listing.
42 string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
43 char buf[oid_prefix.size() + 16];
44 snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
// Build (but do not run) a coroutine that appends an error record for
// <section>/<name> to one of the per-shard error time-logs. Shards are
// rotated via ++counter so writes spread round-robin across the oids.
// NOTE(review): listing gaps (original lines 49-50, 52-53, 55, 57-58)
// presumably declare `entry`/`bl` and encode `info` into `bl` -- confirm
// against the full source.
48 RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
51 rgw_sync_error_info info(source_zone, error_code, message);
54 store->time_log_prepare_entry(entry, real_clock::now(), section, name, bl);
56 uint32_t shard_id = ++counter % num_shards;
59 return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
// Exponential backoff step: double the current wait, clamping at max_secs.
// NOTE(review): the clamp body and any initial-wait branch fall in lines
// missing from this listing.
62 void RGWSyncBackoff::update_wait_time()
67 cur_wait = (cur_wait << 1);
69 if (cur_wait >= max_secs) {
// Blocking backoff variant (body not visible in this listing).
74 void RGWSyncBackoff::backoff_sleep()
// Coroutine-friendly backoff variant: parks `op` for cur_wait seconds
// instead of sleeping the thread.
80 void RGWSyncBackoff::backoff(RGWCoroutine *op)
83 op->wait(utime_t(cur_wait, 0));
// Drive a wrapped coroutine with retry-on-transient-error semantics:
// -EBUSY/-EAGAIN trigger a backoff and retry; any other failure is fatal
// and propagated via set_cr_error(). After success an optional finisher
// coroutine (alloc_finisher_cr()) runs. The lock-guarded sections
// presumably publish the active CR pointer for external cancellation --
// TODO(review) confirm; several lines are missing from this listing.
86 int RGWBackoffControlCR::operate() {
88 // retry the operation until it succeeds
91 Mutex::Locker l(lock);
97 Mutex::Locker l(lock);
104 if (retcode != -EBUSY && retcode != -EAGAIN) {
105 ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
107 return set_cr_error(retcode);
113 yield backoff.backoff(this);
116 // run an optional finisher
117 yield call(alloc_finisher_cr());
119 ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
120 return set_cr_error(retcode);
122 return set_cr_done();
// Parse the remote mdlog info JSON. Note the remote key "num_objects" maps
// onto the local num_shards member.
127 void rgw_mdlog_info::decode_json(JSONObj *obj) {
128 JSONDecoder::decode_json("num_objects", num_shards, obj);
129 JSONDecoder::decode_json("period", period, obj);
130 JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
// Parse one mdlog entry. "timestamp" is decoded into a utime_t intermediate
// (`ut`, declared in a line missing from this listing) and then converted
// to real_time for the member.
133 void rgw_mdlog_entry::decode_json(JSONObj *obj) {
134 JSONDecoder::decode_json("id", id, obj);
135 JSONDecoder::decode_json("section", section, obj);
136 JSONDecoder::decode_json("name", name, obj);
138 JSONDecoder::decode_json("timestamp", ut, obj);
139 timestamp = ut.to_real_time();
140 JSONDecoder::decode_json("data", log_data, obj);
// Parse one shard's mdlog listing response: continuation marker, truncation
// flag, and the entry array.
143 void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
144 JSONDecoder::decode_json("marker", marker, obj);
145 JSONDecoder::decode_json("truncated", truncated, obj);
146 JSONDecoder::decode_json("entries", entries, obj);
// Generic fan-out driver: keep spawning children (spawn_next()) while
// holding at most max_concurrent in flight, reaping completions with
// collect_next(). -ENOENT from a child is tolerated (shard may not exist);
// other failures are logged at level 10. Once all children are spawned,
// drain the remainder. `status` presumably latches the first real error
// (the assignments fall in lines missing from this listing -- confirm).
149 int RGWShardCollectCR::operate() {
151 while (spawn_next()) {
154 while (current_running >= max_concurrent) {
156 yield wait_for_child();
157 if (collect_next(&child_ret)) {
159 if (child_ret < 0 && child_ret != -ENOENT) {
160 ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
166 while (current_running > 0) {
168 yield wait_for_child();
169 if (collect_next(&child_ret)) {
171 if (child_ret < 0 && child_ret != -ENOENT) {
172 ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
178 return set_cr_error(status);
180 return set_cr_done();
// Fan-out collector that reads RGWMetadataLogInfo for every shard of the
// remote mdlog for `period`, up to READ_MDLOG_MAX_CONCURRENT at a time.
// NOTE(review): `period` is a reference member bound to the constructor
// argument -- the caller's string must outlive this coroutine; passing a
// temporary would dangle. Confirm call sites.
185 class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
186 RGWMetaSyncEnv *sync_env;
188 const std::string& period;
190 map<int, RGWMetadataLogInfo> *mdlog_info;
193 #define READ_MDLOG_MAX_CONCURRENT 10
196 RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
197 const std::string& period, int _num_shards,
198 map<int, RGWMetadataLogInfo> *_mdlog_info) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
200 period(period), num_shards(_num_shards),
201 mdlog_info(_mdlog_info), shard_id(0) {}
202 bool spawn_next() override;
// Fan-out collector that lists entries from a set of remote mdlog shards,
// each starting at its own marker, capped at max_entries_per_shard.
// NOTE(review): the constructor swap()s the caller's `_shards` map, leaving
// it empty after construction -- intentional move-like transfer, but callers
// must not reuse the map. Also note READ_MDLOG_MAX_CONCURRENT is re-#defined
// here with the same value as above (benign identical redefinition).
205 class RGWListRemoteMDLogCR : public RGWShardCollectCR {
206 RGWMetaSyncEnv *sync_env;
208 const std::string& period;
209 map<int, string> shards;
210 int max_entries_per_shard;
211 map<int, rgw_mdlog_shard_data> *result;
213 map<int, string>::iterator iter;
214 #define READ_MDLOG_MAX_CONCURRENT 10
217 RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
218 const std::string& period, map<int, string>& _shards,
219 int _max_entries_per_shard,
220 map<int, rgw_mdlog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
221 sync_env(_sync_env), period(period),
222 max_entries_per_shard(_max_entries_per_shard),
224 shards.swap(_shards);
225 iter = shards.begin();
227 bool spawn_next() override;
// Destructor (body falls in lines missing from this listing).
230 RGWRemoteMetaLog::~RGWRemoteMetaLog()
// Fetch the remote master's mdlog info (shard count, current period,
// realm epoch) via a synchronous GET on /admin/log?type=metadata.
235 int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
237 rgw_http_param_pair pairs[] = { { "type", "metadata" },
240 int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
242 ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog info" << dendl;
246 ldout(store->ctx(), 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;
// Read per-shard info for the master's mdlog in `master_period`. No-op
// (early return, body in missing lines) when this zone IS the meta master.
251 int RGWRemoteMetaLog::read_master_log_shards_info(const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
253 if (store->is_meta_master()) {
257 rgw_mdlog_info log_info;
258 int ret = read_log_info(&log_info);
263 return run(new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
// List the next (at most 1) entry per shard starting from the given markers.
// `shard_markers` is taken by value: the listing CR swap()s it away, so the
// copy here is what gets consumed.
266 int RGWRemoteMetaLog::read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
268 if (store->is_meta_master()) {
272 return run(new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
// Wire up the remote-log machinery: REST connection to the master, threaded
// HTTP manager, sharded error logger, and the shared sync environment.
// NOTE(review): error_logger is allocated with new here; its release point
// is not visible in this listing -- confirm ownership in finish()/dtor.
275 int RGWRemoteMetaLog::init()
277 conn = store->rest_master_conn;
279 int ret = http_manager.set_threaded();
281 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
285 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
287 init_sync_env(&sync_env);
// Tear down the remote-log machinery (body not visible in this listing).
292 void RGWRemoteMetaLog::finish()
// Max mdlog entries fetched per clone request.
298 #define CLONE_MAX_ENTRIES 100
// Initialize the sync-status manager: bail out on the meta master (nothing
// to sync), require a REST connection to the master, open the log pool
// ioctx, init the remote log, then read current sync status (tolerating
// -ENOENT for first run) and pre-build per-shard status objects/markers.
300 int RGWMetaSyncStatusManager::init()
302 if (store->is_meta_master()) {
306 if (!store->rest_master_conn) {
307 lderr(store->ctx()) << "no REST connection to master zone" << dendl;
311 int r = rgw_init_ioctx(store->get_rados_handle(), store->get_zone_params().log_pool, ioctx, true);
313 lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->get_zone_params().log_pool << " ret=" << r << dendl;
317 r = master_log.init();
319 lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
323 RGWMetaSyncEnv& sync_env = master_log.get_sync_env();
325 rgw_meta_sync_status sync_status;
326 r = read_sync_status(&sync_status);
327 if (r < 0 && r != -ENOENT) {
328 lderr(store->ctx()) << "ERROR: failed to read sync status, r=" << r << dendl;
332 int num_shards = sync_status.sync_info.num_shards;
334 for (int i = 0; i < num_shards; i++) {
// clone_markers population is guarded by the ts_to_shard write lock.
335 shard_objs[i] = rgw_raw_obj(store->get_zone_params().log_pool, sync_env.shard_obj_name(i));
338 RWLock::WLocker wl(ts_to_shard_lock);
339 for (int i = 0; i < num_shards; i++) {
340 clone_markers.push_back(string());
// Populate the shared sync environment with the collaborating services.
// NOTE(review): assignments for cct/store/conn fall in lines missing from
// this listing.
349 void RGWMetaSyncEnv::init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
350 RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
351 RGWSyncErrorLogger *_error_logger) {
355 async_rados = _async_rados;
356 http_manager = _http_manager;
357 error_logger = _error_logger;
// Object name holding the overall metadata sync status.
360 string RGWMetaSyncEnv::status_oid()
362 return mdlog_sync_status_oid;
// Object name for one shard's sync status: "<prefix>.<shard_id>" (VLA buffer
// as in get_shard_oid; the return statement falls in missing lines).
365 string RGWMetaSyncEnv::shard_obj_name(int shard_id)
367 char buf[mdlog_sync_status_shard_prefix.size() + 16];
368 snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);
// Async (thread-pool) request that lists up to max_entries entries from one
// local mdlog shard, updating *marker/*truncated in place. Runs on the
// RGWAsyncRadosProcessor thread, not in coroutine context.
373 class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
375 RGWMetadataLog *mdlog;
379 list<cls_log_entry> *entries;
383 int _send_request() override {
// from_time/end_time appear to come from member state declared in lines
// missing from this listing -- confirm against the full source.
389 mdlog->init_list_entries(shard_id, from_time, end_time, *marker, &handle);
391 int ret = mdlog->list_entries(handle, max_entries, *entries, marker, truncated);
393 mdlog->complete_list_entries(handle);
398 RGWAsyncReadMDLogEntries(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
399 RGWMetadataLog* mdlog, int _shard_id,
400 string* _marker, int _max_entries,
401 list<cls_log_entry> *_entries, bool *_truncated)
402 : RGWAsyncRadosRequest(caller, cn), store(_store), mdlog(mdlog),
403 shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
404 entries(_entries), truncated(_truncated) {}
// Coroutine wrapper around RGWAsyncReadMDLogEntries. send_request() copies
// the caller marker into a local member (`marker`, declared in missing
// lines) so the async worker can mutate it; request_complete() presumably
// copies it back to pmarker on success -- confirm, the copy-back lines are
// missing from this listing.
407 class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
408 RGWMetaSyncEnv *sync_env;
409 RGWMetadataLog *const mdlog;
414 list<cls_log_entry> *entries;
417 RGWAsyncReadMDLogEntries *req{nullptr};
420 RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
421 int _shard_id, string*_marker, int _max_entries,
422 list<cls_log_entry> *_entries, bool *_truncated)
423 : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
424 shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
425 entries(_entries), truncated(_truncated) {}
427 ~RGWReadMDLogEntriesCR() override {
433 int send_request() override {
435 req = new RGWAsyncReadMDLogEntries(this, stack->create_completion_notifier(),
436 sync_env->store, mdlog, shard_id, &marker,
437 max_entries, entries, truncated);
438 sync_env->async_rados->queue(req);
442 int request_complete() override {
443 int ret = req->get_ret_status();
444 if (ret >= 0 && !entries->empty()) {
447 return req->get_ret_status();
// Coroutine that GETs /admin/log?type=metadata&id=<shard>&period=...&info
// from the master and parses the result into *shard_info. The http_op is
// fire-and-forget (aio_read) then awaited in a later resume.
// NOTE(review): `period` is a reference member -- caller's string must
// outlive the coroutine. http_op release/cleanup lines are missing from
// this listing; confirm it is put() on both success and error paths.
452 class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
454 RGWRESTReadResource *http_op;
456 const std::string& period;
458 RGWMetadataLogInfo *shard_info;
461 RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
462 int _shard_id, RGWMetadataLogInfo *_shard_info)
463 : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
464 period(period), shard_id(_shard_id), shard_info(_shard_info) {}
466 int operate() override {
467 auto store = env->store;
468 RGWRESTConn *conn = store->rest_master_conn;
472 snprintf(buf, sizeof(buf), "%d", shard_id);
473 rgw_http_param_pair pairs[] = { { "type" , "metadata" },
475 { "period", period.c_str() },
479 string p = "/admin/log/";
481 http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
// user_info carries the coroutine stack so the HTTP completion can wake it.
484 http_op->set_user_info((void *)stack);
486 int ret = http_op->aio_read();
488 ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
489 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
491 return set_cr_error(ret);
497 int ret = http_op->wait(shard_info);
500 return set_cr_error(ret);
502 return set_cr_done();
// Simple coroutine that lists one remote mdlog shard via
// /admin/log?type=metadata&id=...&period=...&max-entries=...[&marker=...].
// When `marker` is empty the pair key is set to "" -- presumably the REST
// param builder drops empty-keyed pairs, so no marker param is sent; confirm.
// -ENOENT from the remote is treated as non-fatal (shard may not exist yet).
509 class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
510 RGWMetaSyncEnv *sync_env;
511 RGWRESTReadResource *http_op;
513 const std::string& period;
516 uint32_t max_entries;
517 rgw_mdlog_shard_data *result;
520 RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
521 int _shard_id, const string& _marker, uint32_t _max_entries,
522 rgw_mdlog_shard_data *_result)
523 : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
524 period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
526 int send_request() override {
527 RGWRESTConn *conn = sync_env->conn;
528 RGWRados *store = sync_env->store;
531 snprintf(buf, sizeof(buf), "%d", shard_id);
533 char max_entries_buf[32];
534 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
536 const char *marker_key = (marker.empty() ? "" : "marker");
538 rgw_http_param_pair pairs[] = { { "type", "metadata" },
540 { "period", period.c_str() },
541 { "max-entries", max_entries_buf },
542 { marker_key, marker.c_str() },
545 string p = "/admin/log/";
547 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
548 http_op->set_user_info((void *)stack);
550 int ret = http_op->aio_read();
552 ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
553 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
561 int request_complete() override {
562 int ret = http_op->wait(result);
564 if (ret < 0 && ret != -ENOENT) {
565 ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
// Spawn one shard-info reader per call until all shards are covered.
// operator[] default-constructs the map slot the child writes into.
// (shard_id increment falls in lines missing from this listing.)
572 bool RGWReadRemoteMDLogInfoCR::spawn_next() {
573 if (shard_id >= num_shards) {
576 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
// Spawn one shard-listing child per map entry, keyed by shard id with that
// shard's saved marker. (iterator advance falls in missing lines.)
581 bool RGWListRemoteMDLogCR::spawn_next() {
582 if (iter == shards.end()) {
586 spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
// Bootstrap the metadata sync status objects: take the sync lease, write the
// global sync-info object, fetch each remote shard's current position, write
// per-shard markers seeded from those positions, flip the state to
// StateBuildingFullSyncMaps, then drop the lease and collect children.
// NOTE(review): this listing omits many interleaving lines (yields, braces,
// reenter scaffolding); comments below describe only what is visible.
591 class RGWInitSyncStatusCoroutine : public RGWCoroutine {
592 RGWMetaSyncEnv *sync_env;
594 rgw_meta_sync_info status;
595 vector<RGWMetadataLogInfo> shards_info;
596 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
597 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
599 RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
600 const rgw_meta_sync_info &status)
601 : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
602 status(status), shards_info(status.num_shards),
603 lease_cr(nullptr), lease_stack(nullptr) {}
605 ~RGWInitSyncStatusCoroutine() override {
611 int operate() override {
// Acquire a continuous lease on the status object so only one zone
// bootstraps at a time; abort if the lease CR dies before locking.
615 set_status("acquiring sync lock");
616 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
617 string lock_name = "sync_lock";
618 RGWRados *store = sync_env->store;
619 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
620 rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
621 lock_name, lock_duration, this));
622 lease_stack.reset(spawn(lease_cr.get(), false))
624 while (!lease_cr->is_locked()) {
625 if (lease_cr->is_done()) {
626 ldout(cct, 5) << "lease cr failed, done early " << dendl;
627 set_status("lease lock failed, early abort");
628 return set_cr_error(lease_cr->get_ret_status());
634 set_status("writing sync status");
635 RGWRados *store = sync_env->store;
636 call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store,
637 rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
642 set_status("failed to write sync status");
643 ldout(cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
644 yield lease_cr->go_down();
645 return set_cr_error(retcode);
647 /* fetch current position in logs */
648 set_status("fetching remote log position");
650 for (int i = 0; i < (int)status.num_shards; i++) {
651 spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
652 &shards_info[i]), false);
656 drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
// Seed each shard marker with the remote shard's marker/timestamp so
// incremental sync starts where the full-sync snapshot was taken.
659 set_status("updating sync status");
660 for (int i = 0; i < (int)status.num_shards; i++) {
661 rgw_meta_sync_marker marker;
662 RGWMetadataLogInfo& info = shards_info[i];
663 marker.next_step_marker = info.marker;
664 marker.timestamp = info.last_update;
665 RGWRados *store = sync_env->store;
666 spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
668 rgw_raw_obj(store->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
673 set_status("changing sync state: build full sync maps");
674 status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
675 RGWRados *store = sync_env->store;
676 call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store,
677 rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
680 set_status("drop lock lease");
681 yield lease_cr->go_down();
682 while (collect(&ret, NULL)) {
684 return set_cr_error(ret);
689 return set_cr_done();
// Fan-out reader for the per-shard sync marker objects, limited to
// MAX_CONCURRENT_SHARDS reads in flight at once.
695 class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
696 static constexpr int MAX_CONCURRENT_SHARDS = 16;
699 const int num_shards;
701 map<uint32_t, rgw_meta_sync_marker>& markers;
704 RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
705 map<uint32_t, rgw_meta_sync_marker>& markers)
706 : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
707 env(env), num_shards(num_shards), markers(markers)
709 bool spawn_next() override;
// Spawn one marker-read child per shard; markers[shard_id] default-inserts
// the slot the child fills. (shard_id increment falls in missing lines.)
712 bool RGWReadSyncStatusMarkersCR::spawn_next()
714 if (shard_id >= num_shards) {
717 using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
718 rgw_raw_obj obj{env->store->get_zone_params().log_pool,
719 env->shard_obj_name(shard_id)};
720 spawn(new CR(env->async_rados, env->store, obj, &markers[shard_id]), false);
// Top-level reader for the whole sync status: global info object plus all
// shard markers (see operate() below).
725 class RGWReadSyncStatusCoroutine : public RGWCoroutine {
726 RGWMetaSyncEnv *sync_env;
727 rgw_meta_sync_status *sync_status;
730 RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
731 rgw_meta_sync_status *_status)
732 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
734 int operate() override;
// Read sync_info (ENOENT is a hard error here -- empty_on_enoent=false),
// then fan out to read every shard marker. Errors are logged at level 4
// and propagated.
737 int RGWReadSyncStatusCoroutine::operate()
741 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
743 bool empty_on_enoent = false; // fail on ENOENT
744 rgw_raw_obj obj{sync_env->store->get_zone_params().log_pool,
745 sync_env->status_oid()};
746 call(new ReadInfoCR(sync_env->async_rados, sync_env->store, obj,
747 &sync_status->sync_info, empty_on_enoent));
750 ldout(sync_env->cct, 4) << "failed to read sync status info with "
751 << cpp_strerror(retcode) << dendl;
752 return set_cr_error(retcode);
754 // read shard markers
755 using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
756 yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
757 sync_status->sync_markers));
759 ldout(sync_env->cct, 4) << "failed to read sync status markers with "
760 << cpp_strerror(retcode) << dendl;
761 return set_cr_error(retcode);
763 return set_cr_done();
// Full-sync enumeration: under the sync lease, list every metadata section
// from the master (/admin/metadata), order them user -> bucket.instance ->
// bucket -> rest, then page through each section's keys and append
// "<section>:<key>" into the sharded full-sync omap index, finally updating
// each shard marker's total_entries.
// NOTE(review): many interleaving lines (yields, braces, error branches) are
// missing from this listing; comments describe only visible behavior.
768 class RGWFetchAllMetaCR : public RGWCoroutine {
769 RGWMetaSyncEnv *sync_env;
776 list<string> sections;
777 list<string>::iterator sections_iter;
779 struct meta_list_result {
783 bool truncated{false};
785 void decode_json(JSONObj *obj) {
786 JSONDecoder::decode_json("keys", keys, obj);
787 JSONDecoder::decode_json("marker", marker, obj);
788 JSONDecoder::decode_json("count", count, obj);
789 JSONDecoder::decode_json("truncated", truncated, obj);
792 list<string>::iterator iter;
794 std::unique_ptr<RGWShardedOmapCRManager> entries_index;
796 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
797 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
803 map<uint32_t, rgw_meta_sync_marker>& markers;
806 RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
807 map<uint32_t, rgw_meta_sync_marker>& _markers) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
808 num_shards(_num_shards),
809 ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
810 lost_lock(false), failed(false), markers(_markers) {
813 ~RGWFetchAllMetaCR() override {
// Move `name` from the set into the ordered sections list, if present.
816 void append_section_from_set(set<string>& all_sections, const string& name) {
817 set<string>::iterator iter = all_sections.find(name);
818 if (iter != all_sections.end()) {
819 sections.emplace_back(std::move(*iter));
820 all_sections.erase(iter);
824 * meta sync should go in the following order: user, bucket.instance, bucket
825 * then whatever other sections exist (if any)
827 void rearrange_sections() {
828 set<string> all_sections;
829 std::move(sections.begin(), sections.end(),
830 std::inserter(all_sections, all_sections.end()));
833 append_section_from_set(all_sections, "user");
834 append_section_from_set(all_sections, "bucket.instance");
835 append_section_from_set(all_sections, "bucket");
837 std::move(all_sections.begin(), all_sections.end(),
838 std::back_inserter(sections));
841 int operate() override {
842 RGWRESTConn *conn = sync_env->conn;
846 set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
847 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
848 string lock_name = "sync_lock";
849 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
851 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, sync_env->status_oid()),
852 lock_name, lock_duration, this));
853 lease_stack.reset(spawn(lease_cr.get(), false));
855 while (!lease_cr->is_locked()) {
856 if (lease_cr->is_done()) {
857 ldout(cct, 5) << "lease cr failed, done early " << dendl;
858 set_status("failed acquiring lock");
859 return set_cr_error(lease_cr->get_ret_status());
864 entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
865 sync_env->store->get_zone_params().log_pool,
866 mdlog_sync_full_sync_index_prefix));
868 call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
// NOTE(review): "§ions" below looks like HTML-entity corruption of
// "&sections" introduced by whatever produced this listing -- restore
// when reconciling against the canonical source.
869 "/admin/metadata", NULL, §ions));
871 if (get_ret_status() < 0) {
872 ldout(cct, 0) << "ERROR: failed to fetch metadata sections" << dendl;
873 yield entries_index->finish();
874 yield lease_cr->go_down();
876 return set_cr_error(get_ret_status());
878 rearrange_sections();
879 sections_iter = sections.begin();
880 for (; sections_iter != sections.end(); ++sections_iter) {
883 #define META_FULL_SYNC_CHUNK_SIZE "1000"
884 string entrypoint = string("/admin/metadata/") + *sections_iter;
885 rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
886 { "marker", result.marker.c_str() },
889 call(new RGWReadRESTResourceCR<meta_list_result >(cct, conn, sync_env->http_manager,
890 entrypoint, pairs, &result));
892 if (get_ret_status() < 0) {
893 ldout(cct, 0) << "ERROR: failed to fetch metadata section: " << *sections_iter << dendl;
894 yield entries_index->finish();
895 yield lease_cr->go_down();
897 return set_cr_error(get_ret_status());
899 iter = result.keys.begin();
900 for (; iter != result.keys.end(); ++iter) {
// If the lease was lost mid-enumeration, stop appending (lost_lock
// handling falls in missing lines).
901 if (!lease_cr->is_locked()) {
905 yield; // allow entries_index consumer to make progress
907 ldout(cct, 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
908 string s = *sections_iter + ":" + *iter;
910 RGWRados *store = sync_env->store;
911 int ret = store->meta_mgr->get_log_shard_id(*sections_iter, *iter, &shard_id);
913 ldout(cct, 0) << "ERROR: could not determine shard id for " << *sections_iter << ":" << *iter << dendl;
917 if (!entries_index->append(s, shard_id)) {
921 } while (result.truncated);
924 if (!entries_index->finish()) {
929 for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
930 int shard_id = (int)iter->first;
931 rgw_meta_sync_marker& marker = iter->second;
932 marker.total_entries = entries_index->get_total_entries(shard_id);
933 spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, sync_env->store,
934 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
939 drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
941 yield lease_cr->go_down();
944 while (collect(&ret, NULL)) {
946 return set_cr_error(ret);
952 yield return set_cr_error(-EIO);
955 yield return set_cr_error(-EBUSY);
958 if (ret_status < 0) {
959 yield return set_cr_error(ret_status);
962 yield return set_cr_done();
// Object name for one shard of the full-sync index: "<prefix>.<shard_id>"
// (VLA buffer; the return statement falls in missing lines).
968 static string full_sync_index_shard_oid(int shard_id)
970 char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
971 snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
// Fetch one metadata entry's raw bufferlist from the master via
// GET /admin/metadata/<section>/<key>?key=<key>. Async send then wait_bl
// in a later resume.
975 class RGWReadRemoteMetadataCR : public RGWCoroutine {
976 RGWMetaSyncEnv *sync_env;
978 RGWRESTReadResource *http_op;
986 RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
987 const string& _section, const string& _key, bufferlist *_pbl) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
994 int operate() override {
995 RGWRESTConn *conn = sync_env->conn;
998 rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
1001 string p = string("/admin/metadata/") + section + "/" + key;
1003 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
1005 http_op->set_user_info((void *)stack);
1007 int ret = http_op->aio_read();
1009 ldout(sync_env->cct, 0) << "ERROR: failed to fetch mdlog data" << dendl;
1010 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
1012 return set_cr_error(ret);
1018 int ret = http_op->wait_bl(pbl);
1021 return set_cr_error(ret);
1023 return set_cr_done();
// Async request applying a fetched metadata entry to the local store via
// the metadata manager (APPLY_ALWAYS: overwrite regardless of version).
1030 class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
1035 int _send_request() override {
1036 int ret = store->meta_mgr->put(raw_key, bl, RGWMetadataHandler::APPLY_ALWAYS);
1038 ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
1044 RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
1045 const string& _raw_key,
1046 bufferlist& _bl) : RGWAsyncRadosRequest(caller, cn), store(_store),
1047 raw_key(_raw_key), bl(_bl) {}
// Coroutine wrapper around RGWAsyncMetaStoreEntry: queue the async put,
// surface its status in request_complete().
1051 class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
1052 RGWMetaSyncEnv *sync_env;
1056 RGWAsyncMetaStoreEntry *req;
1059 RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
1060 const string& _raw_key,
1061 bufferlist& _bl) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
1062 raw_key(_raw_key), bl(_bl), req(NULL) {
1065 ~RGWMetaStoreEntryCR() override {
1071 int send_request() override {
1072 req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
1073 sync_env->store, raw_key, bl);
1074 sync_env->async_rados->queue(req);
1078 int request_complete() override {
1079 return req->get_ret_status();
// Async request removing a metadata entry from the local store (used when
// the entry no longer exists on the master).
1083 class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
1087 int _send_request() override {
1088 int ret = store->meta_mgr->remove(raw_key);
1090 ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
1096 RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
1097 const string& _raw_key) : RGWAsyncRadosRequest(caller, cn), store(_store),
1098 raw_key(_raw_key) {}
// Coroutine wrapper around RGWAsyncMetaRemoveEntry. request_complete()
// inspects the status -- presumably to map -ENOENT to success, but the
// mapping lines are missing from this listing; confirm.
1102 class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
1103 RGWMetaSyncEnv *sync_env;
1106 RGWAsyncMetaRemoveEntry *req;
1109 RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
1110 const string& _raw_key) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
1111 raw_key(_raw_key), req(NULL) {
1114 ~RGWMetaRemoveEntryCR() override {
1120 int send_request() override {
1121 req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
1122 sync_env->store, raw_key);
1123 sync_env->async_rados->queue(req);
1127 int request_complete() override {
1128 int r = req->get_ret_status();
// Batch window: flush the shard marker after this many completed entries.
1136 #define META_SYNC_UPDATE_MARKER_WINDOW 10
// Tracks per-shard sync progress; store_marker() persists the updated
// rgw_meta_sync_marker to the shard's status object. index_pos==0 and a
// zero timestamp mean "leave the existing value".
1138 class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
1139 RGWMetaSyncEnv *sync_env;
1142 rgw_meta_sync_marker sync_marker;
1146 RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
1147 const string& _marker_oid,
1148 const rgw_meta_sync_marker& _marker) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
1149 sync_env(_sync_env),
1150 marker_oid(_marker_oid),
1151 sync_marker(_marker) {}
1153 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
1154 sync_marker.marker = new_marker;
1155 if (index_pos > 0) {
1156 sync_marker.pos = index_pos;
1159 if (!real_clock::is_zero(timestamp)) {
1160 sync_marker.timestamp = timestamp;
1163 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
1164 RGWRados *store = sync_env->store;
1165 return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
1167 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
// Sync one metadata entry: fetch it from the master (retrying transient
// -EAGAIN/-ECANCELED up to NUM_TRANSIENT_ERROR_RETRIES times), then apply
// it locally -- put when it exists remotely, remove on remote -ENOENT --
// with the same retry policy, finally marking the entry complete in the
// marker tracker. Incomplete (non-COMPLETE) mdlog ops are skipped.
// Supports fault injection via rgw_sync_meta_inject_err_probability.
1172 int RGWMetaSyncSingleEntryCR::operate() {
1174 #define NUM_TRANSIENT_ERROR_RETRIES 10
1176 if (error_injection &&
1177 rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
1178 ldout(sync_env->cct, 0) << __FILE__ << ":" << __LINE__ << ": injecting meta sync error on key=" << raw_key << dendl;
1179 return set_cr_error(-EIO);
1182 if (op_status != MDLOG_STATUS_COMPLETE) {
1183 ldout(sync_env->cct, 20) << "skipping pending operation" << dendl;
1184 yield call(marker_tracker->finish(entry_marker));
1186 return set_cr_error(retcode);
1188 return set_cr_done();
1190 for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
// raw_key is "<section>:<key>"; split on the first ':'.
1192 pos = raw_key.find(':');
1193 section = raw_key.substr(0, pos);
1194 key = raw_key.substr(pos + 1);
1195 ldout(sync_env->cct, 20) << "fetching remote metadata: " << section << ":" << key << (tries == 0 ? "" : " (retry)") << dendl;
1196 call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl));
1199 sync_status = retcode;
1201 if (sync_status == -ENOENT) {
1202 /* FIXME: do we need to remove the entry from the local zone? */
1206 if ((sync_status == -EAGAIN || sync_status == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
1207 ldout(sync_env->cct, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
1211 if (sync_status < 0) {
1212 ldout(sync_env->cct, 10) << *this << ": failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << dendl;
1213 log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
// Record the failure in the sharded error log before giving up.
1214 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), section, key, -sync_status,
1215 string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
1216 return set_cr_error(sync_status);
1223 for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
1224 if (sync_status != -ENOENT) {
1225 yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
1227 yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
1229 if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
1230 ldout(sync_env->cct, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
1236 sync_status = retcode;
1238 if (sync_status == 0 && marker_tracker) {
1240 yield call(marker_tracker->finish(entry_marker));
1241 sync_status = retcode;
1243 if (sync_status < 0) {
1244 return set_cr_error(sync_status);
1246 return set_cr_done();
// State-machine coroutine that clones one remote mdlog shard into the local
// mdlog: read remote shard status, fetch up to CLONE_MAX_ENTRIES entries
// over REST, store them locally, and report the new marker via *new_marker.
// The per-state handlers are declared below and defined elsewhere.
1251 class RGWCloneMetaLogCoroutine : public RGWCoroutine {
1252 RGWMetaSyncEnv *sync_env;
1253 RGWMetadataLog *mdlog;
1255 const std::string& period;
1258 bool truncated = false;
1261 int max_entries = CLONE_MAX_ENTRIES;
1263 RGWRESTReadResource *http_op = nullptr;
1264 boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;
1266 RGWMetadataLogInfo shard_info;
1267 rgw_mdlog_shard_data data;
1270 RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
1271 const std::string& period, int _id,
1272 const string& _marker, string *_new_marker)
1273 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
1274 period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
1276 *new_marker = marker;
1279 ~RGWCloneMetaLogCoroutine() override {
// Cancel an in-flight shard-info completion so it doesn't fire after
// destruction (guard condition falls in missing lines).
1284 completion->cancel();
1288 int operate() override;
1291 int state_read_shard_status();
1292 int state_read_shard_status_complete();
1293 int state_send_rest_request();
1294 int state_receive_rest_response();
1295 int state_store_mdlog_entries();
1296 int state_store_mdlog_entries_complete();
// Coroutine that syncs a single metadata-log shard for one period.
// Runs either full_sync() (enumerate the full-sync omap index and fetch every
// key) or incremental_sync() (tail the mdlog), per sync_marker.state, while
// holding a continuous lease on the shard's status object.
// NOTE(review): this listing is line-sampled; gaps in the embedded numbering
// are elided source lines (e.g. the full_sync() signature near line 1449).
1299 class RGWMetaSyncShardCR : public RGWCoroutine {
1300 RGWMetaSyncEnv *sync_env;
1302 const rgw_pool& pool;
1303 const std::string& period; //< currently syncing period id
1304 const epoch_t realm_epoch; //< realm_epoch of period
1305 RGWMetadataLog* mdlog; //< log of syncing period
1307 rgw_meta_sync_marker& sync_marker;
1308 boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
1311 const std::string& period_marker; //< max marker stored in next period
1313 map<string, bufferlist> entries;
1314 map<string, bufferlist>::iterator iter;
// owned; deleted in the destructor and replaced via set_marker_tracker()
1318 RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;
1320 list<cls_log_entry> log_entries;
1321 list<cls_log_entry>::iterator log_iter;
1322 bool truncated = false;
1324 string mdlog_marker;
1326 rgw_mdlog_entry mdlog_entry;
// separate asio coroutine states so full and incremental sync can each be
// re-entered independently
1331 boost::asio::coroutine incremental_cr;
1332 boost::asio::coroutine full_cr;
1334 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
1335 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
1337 bool lost_lock = false;
1339 bool *reset_backoff;
1341 // hold a reference to the cr stack while it's in the map
1342 using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1343 map<StackRef, string> stack_to_pos;
1344 map<string, string> pos_to_prev;
1346 bool can_adjust_marker = false;
1347 bool done_with_period = false;
1349 int total_entries = 0;
1352 RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1353 const std::string& period, epoch_t realm_epoch,
1354 RGWMetadataLog* mdlog, uint32_t _shard_id,
1355 rgw_meta_sync_marker& _marker,
1356 const std::string& period_marker, bool *_reset_backoff)
1357 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
1358 period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1359 shard_id(_shard_id), sync_marker(_marker),
1360 period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"),
1361 reset_backoff(_reset_backoff) {
1362 *reset_backoff = false;
1365 ~RGWMetaSyncShardCR() override {
1366 delete marker_tracker;
// replace the marker tracker, freeing the previous one
1372 void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
1373 delete marker_tracker;
1374 marker_tracker = mt;
// dispatch to full or incremental sync based on the persisted shard state
1377 int operate() override {
1380 switch (sync_marker.state) {
1381 case rgw_meta_sync_marker::FullSync:
1384 ldout(sync_env->cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
1385 return set_cr_error(r);
1388 case rgw_meta_sync_marker::IncrementalSync:
1389 r = incremental_sync();
1391 ldout(sync_env->cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
1392 return set_cr_error(r);
// reap completed child entry-sync stacks and advance sync_marker.marker to
// the lowest position that is known to be fully synced
1401 void collect_children()
1404 RGWCoroutinesStack *child;
1405 while (collect_next(&child_ret, &child)) {
1406 auto iter = stack_to_pos.find(child);
1407 if (iter == stack_to_pos.end()) {
1408 /* some other stack that we don't care about */
1412 string& pos = iter->second;
1414 if (child_ret < 0) {
1415 ldout(sync_env->cct, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
1418 map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
1419 assert(prev_iter != pos_to_prev.end());
1422 * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
1423 * update the marker and abort. We'll get called again for these. Permanent errors will be
1424 * handled by marking the entry at the error log shard, so that we retry on it separately
1426 if (child_ret == -EAGAIN) {
1427 can_adjust_marker = false;
1430 if (pos_to_prev.size() == 1) {
1431 if (can_adjust_marker) {
1432 sync_marker.marker = pos;
1434 pos_to_prev.erase(prev_iter);
1436 assert(pos_to_prev.size() > 1);
1437 pos_to_prev.erase(prev_iter);
1438 prev_iter = pos_to_prev.begin();
1439 if (can_adjust_marker) {
1440 sync_marker.marker = prev_iter->second;
1444 ldout(sync_env->cct, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
1445 stack_to_pos.erase(iter);
// --- full_sync() body (signature elided in this listing) ---
1450 #define OMAP_GET_MAX_ENTRIES 100
1451 int max_entries = OMAP_GET_MAX_ENTRIES;
1453 set_status("full_sync");
1454 oid = full_sync_index_shard_oid(shard_id);
1455 can_adjust_marker = true;
// acquire a continuous lease on the shard status object before syncing
1458 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1459 string lock_name = "sync_lock";
1460 RGWRados *store = sync_env->store;
1461 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
1462 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1463 lock_name, lock_duration, this));
1464 lease_stack.reset(spawn(lease_cr.get(), false));
1467 while (!lease_cr->is_locked()) {
1468 if (lease_cr->is_done()) {
1469 ldout(cct, 5) << "lease cr failed, done early " << dendl;
1471 return lease_cr->get_ret_status();
1477 /* lock succeeded, a retry now should avoid previous backoff status */
1478 *reset_backoff = true;
1480 /* prepare marker tracker */
1481 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1482 sync_env->shard_obj_name(shard_id),
1485 marker = sync_marker.marker;
1487 total_entries = sync_marker.pos;
1491 if (!lease_cr->is_locked()) {
// list the next batch of full-sync index keys from the shard's omap
1495 yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
1496 marker, &entries, max_entries));
1498 ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
1499 yield lease_cr->go_down();
1503 iter = entries.begin();
1504 for (; iter != entries.end(); ++iter) {
1505 ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
1507 if (!marker_tracker->start(iter->first, total_entries, real_time())) {
1508 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
1510 // fetch remote and write locally
1512 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, MDLOG_STATUS_COMPLETE, marker_tracker), false);
1513 // stack_to_pos holds a reference to the stack
1514 stack_to_pos[stack] = iter->first;
1515 pos_to_prev[iter->first] = marker;
1518 marker = iter->first;
// keep listing while the batch was full and no child hit a transient error
1521 } while ((int)entries.size() == max_entries && can_adjust_marker);
// drain remaining children (the lease stack stays spawned, hence > 1)
1523 while (num_spawned() > 1) {
1524 yield wait_for_child();
1529 /* update marker to reflect we're done with full sync */
1530 if (can_adjust_marker) {
1531 // apply updates to a temporary marker, or operate() will send us
1532 // to incremental_sync() after we yield
1533 temp_marker = sync_marker;
1534 temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
1535 temp_marker->marker = std::move(temp_marker->next_step_marker);
1536 temp_marker->next_step_marker.clear();
1537 temp_marker->realm_epoch = realm_epoch;
1538 ldout(sync_env->cct, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;
1540 using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
1541 yield call(new WriteMarkerCR(sync_env->async_rados, sync_env->store,
1542 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1547 ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
1548 yield lease_cr->go_down();
1555 * if we reached here, it means that lost_lock is true, otherwise the state
1556 * change in the previous block will prevent us from reaching here
1559 yield lease_cr->go_down();
1565 if (!can_adjust_marker) {
1573 // apply the sync marker update
1574 assert(temp_marker);
1575 sync_marker = std::move(*temp_marker);
1576 temp_marker = boost::none;
1577 // must not yield after this point!
// tail the shard's mdlog, spawning one RGWMetaSyncSingleEntryCR per entry,
// until the period boundary (period_marker) is crossed or the lease is lost
1583 int incremental_sync() {
1584 reenter(&incremental_cr) {
1585 set_status("incremental_sync");
1586 can_adjust_marker = true;
1588 if (!lease_cr) { /* could have had a lease_cr lock from previous state */
1590 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1591 string lock_name = "sync_lock";
1592 RGWRados *store = sync_env->store;
1593 lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
1594 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1595 lock_name, lock_duration, this));
1596 lease_stack.reset(spawn(lease_cr.get(), false));
1599 while (!lease_cr->is_locked()) {
1600 if (lease_cr->is_done()) {
1601 ldout(cct, 5) << "lease cr failed, done early " << dendl;
1603 return lease_cr->get_ret_status();
1609 // if the period has advanced, we can't use the existing marker
1610 if (sync_marker.realm_epoch < realm_epoch) {
1611 ldout(sync_env->cct, 4) << "clearing marker=" << sync_marker.marker
1612 << " from old realm_epoch=" << sync_marker.realm_epoch
1613 << " (now " << realm_epoch << ')' << dendl;
1614 sync_marker.realm_epoch = realm_epoch;
1615 sync_marker.marker.clear();
1617 mdlog_marker = sync_marker.marker;
1618 set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1619 sync_env->shard_obj_name(shard_id),
1623 * mdlog_marker: the remote sync marker positiion
1624 * sync_marker: the local sync marker position
1625 * max_marker: the max mdlog position that we fetched
1626 * marker: the current position we try to sync
1627 * period_marker: the last marker before the next period begins (optional)
1629 marker = max_marker = sync_marker.marker;
1632 if (!lease_cr->is_locked()) {
1636 #define INCREMENTAL_MAX_ENTRIES 100
1637 ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1638 if (!period_marker.empty() && period_marker <= mdlog_marker) {
1639 ldout(cct, 10) << "mdlog_marker past period_marker=" << period_marker << dendl;
1640 done_with_period = true;
1643 if (mdlog_marker <= max_marker) {
1644 /* we're at the tip, try to bring more entries */
1645 ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
1646 yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
1648 mdlog_marker, &mdlog_marker));
1651 ldout(sync_env->cct, 10) << *this << ": failed to fetch more log entries, retcode=" << retcode << dendl;
1652 yield lease_cr->go_down();
1654 *reset_backoff = false; // back off and try again later
1657 *reset_backoff = true; /* if we got to this point, all systems function */
1658 ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
1659 if (mdlog_marker > max_marker) {
1660 marker = max_marker;
1661 yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
1662 &max_marker, INCREMENTAL_MAX_ENTRIES,
1663 &log_entries, &truncated));
1665 ldout(sync_env->cct, 10) << *this << ": failed to list mdlog entries, retcode=" << retcode << dendl;
1666 yield lease_cr->go_down();
1668 *reset_backoff = false; // back off and try again later
1671 for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
1672 if (!period_marker.empty() && period_marker <= log_iter->id) {
1673 done_with_period = true;
1674 if (period_marker < log_iter->id) {
1675 ldout(cct, 10) << "found key=" << log_iter->id
1676 << " past period_marker=" << period_marker << dendl;
1679 ldout(cct, 10) << "found key at period_marker=" << period_marker << dendl;
1680 // sync this entry, then return control to RGWMetaSyncCR
1682 if (!mdlog_entry.convert_from(*log_iter)) {
1683 ldout(sync_env->cct, 0) << __func__ << ":" << __LINE__ << ": ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry" << dendl;
1686 ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
1687 if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
1688 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
1690 raw_key = log_iter->section + ":" + log_iter->name;
1692 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker), false);
1694 // stack_to_pos holds a reference to the stack
1695 stack_to_pos[stack] = log_iter->id;
1696 pos_to_prev[log_iter->id] = marker;
1699 marker = log_iter->id;
1703 ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1704 if (done_with_period) {
1705 // return control to RGWMetaSyncCR and advance to the next period
1706 ldout(sync_env->cct, 10) << *this << ": done with period" << dendl;
// fully caught up: sleep before polling the mdlog again
1709 if (mdlog_marker == max_marker && can_adjust_marker) {
1710 #define INCREMENTAL_INTERVAL 20
1711 yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
1713 } while (can_adjust_marker);
1715 while (num_spawned() > 1) {
1716 yield wait_for_child();
1720 yield lease_cr->go_down();
1728 if (!can_adjust_marker) {
1732 return set_cr_done();
// Backoff wrapper around RGWMetaSyncShardCR: re-allocates and re-runs the
// shard sync coroutine with exponential backoff, retrying on every error.
// NOTE(review): listing is line-sampled; some member declarations are elided.
1739 class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
1741 RGWMetaSyncEnv *sync_env;
1743 const rgw_pool& pool;
1744 const std::string& period;
1745 epoch_t realm_epoch;
1746 RGWMetadataLog* mdlog;
// local copy of the marker; refreshed by alloc_finisher_cr() between retries
1748 rgw_meta_sync_marker sync_marker;
1749 const std::string period_marker;
1751 static constexpr bool exit_on_error = false; // retry on all errors
1753 RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1754 const std::string& period, epoch_t realm_epoch,
1755 RGWMetadataLog* mdlog, uint32_t _shard_id,
1756 const rgw_meta_sync_marker& _marker,
1757 std::string&& period_marker)
1758 : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
1759 pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1760 shard_id(_shard_id), sync_marker(_marker),
1761 period_marker(std::move(period_marker)) {}
// build a fresh shard-sync coroutine for each (re)try
1763 RGWCoroutine *alloc_cr() override {
1764 return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
1765 shard_id, sync_marker, period_marker, backoff_ptr());
// re-read the persisted shard marker before retrying
1768 RGWCoroutine *alloc_finisher_cr() override {
1769 RGWRados *store = sync_env->store;
1770 return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store,
1771 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
// Top-level metadata sync coroutine: walks the period history one period at a
// time, spawning one RGWMetaSyncShardControlCR per shard, then advances the
// persisted sync position to the next period when all shards finish.
// NOTE(review): listing is line-sampled; some lines are elided.
1776 class RGWMetaSyncCR : public RGWCoroutine {
1777 RGWMetaSyncEnv *sync_env;
1778 const rgw_pool& pool;
1779 RGWPeriodHistory::Cursor cursor; //< sync position in period history
1780 RGWPeriodHistory::Cursor next; //< next period in history
1781 rgw_meta_sync_status sync_status;
1783 std::mutex mutex; //< protect access to shard_crs
1785 // TODO: it should be enough to hold a reference on the stack only, as calling
1786 // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
1787 // already completed
1788 using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
1789 using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1790 using RefPair = std::pair<ControlCRRef, StackRef>;
1791 map<int, RefPair> shard_crs;
1795 RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, RGWPeriodHistory::Cursor cursor,
1796 const rgw_meta_sync_status& _sync_status)
1797 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1798 pool(sync_env->store->get_zone_params().log_pool),
1799 cursor(cursor), sync_status(_sync_status) {}
1801 int operate() override {
1803 // loop through one period at a time
1805 if (cursor == sync_env->store->period_history->get_current()) {
// on the current period there is no "next" to bound the shard markers
1806 next = RGWPeriodHistory::Cursor{};
1808 ldout(cct, 10) << "RGWMetaSyncCR on current period="
1809 << cursor.get_period().get_id() << dendl;
1811 ldout(cct, 10) << "RGWMetaSyncCR with no period" << dendl;
1816 ldout(cct, 10) << "RGWMetaSyncCR on period="
1817 << cursor.get_period().get_id() << ", next="
1818 << next.get_period().get_id() << dendl;
1822 // get the mdlog for the current period (may be empty)
1823 auto& period_id = sync_status.sync_info.period;
1824 auto realm_epoch = sync_status.sync_info.realm_epoch;
1825 auto mdlog = sync_env->store->meta_mgr->get_log(period_id);
1827 // prevent wakeup() from accessing shard_crs while we're spawning them
1828 std::lock_guard<std::mutex> lock(mutex);
1830 // sync this period on each shard
1831 for (const auto& m : sync_status.sync_markers) {
1832 uint32_t shard_id = m.first;
1833 auto& marker = m.second;
1835 std::string period_marker;
1837 // read the maximum marker from the next period's sync status
1838 period_marker = next.get_period().get_sync_status()[shard_id];
1839 if (period_marker.empty()) {
1840 // no metadata changes have occurred on this shard, skip it
1841 ldout(cct, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
1842 << " with empty period marker" << dendl;
1847 using ShardCR = RGWMetaSyncShardControlCR;
1848 auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
1849 mdlog, shard_id, marker,
1850 std::move(period_marker));
1851 auto stack = spawn(cr, false);
1852 shard_crs[shard_id] = RefPair{cr, stack};
1855 // wait for each shard to complete
1856 while (ret == 0 && num_spawned() > 0) {
1857 yield wait_for_child();
1858 collect(&ret, nullptr);
1862 // drop shard cr refs under lock
1863 std::lock_guard<std::mutex> lock(mutex);
1867 return set_cr_error(ret);
1869 // advance to the next period
1873 // write the updated sync info
1874 sync_status.sync_info.period = cursor.get_period().get_id();
1875 sync_status.sync_info.realm_epoch = cursor.get_epoch();
1876 yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
1878 rgw_raw_obj(pool, sync_env->status_oid()),
1879 sync_status.sync_info));
// wake the control cr for one shard (e.g. on remote log activity);
// safe to call concurrently thanks to the mutex around shard_crs
1885 void wakeup(int shard_id) {
1886 std::lock_guard<std::mutex> lock(mutex);
1887 auto iter = shard_crs.find(shard_id);
1888 if (iter == shard_crs.end()) {
1891 iter->second.first->wakeup();
// Populate a RGWMetaSyncEnv from this RGWRemoteMetaLog's members.
// NOTE(review): listing elides some assignments (lines 1897-1898).
1895 void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
1896 env->cct = store->ctx();
1899 env->async_rados = async_rados;
1900 env->http_manager = &http_manager;
1901 env->error_logger = error_logger;
// Read the metadata sync status from rados. No-ops early when this zone is
// the metadata master (elided branch body). Runs the read coroutine in a
// dedicated coroutine manager so it can't interfere with run_sync().
1904 int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status *sync_status)
1906 if (store->is_meta_master()) {
1909 // cannot run concurrently with run_sync(), so run in a separate manager
1910 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
1911 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
1912 int ret = http_manager.set_threaded();
1914 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
// use a local copy of the sync env wired to the temporary http manager
1917 RGWMetaSyncEnv sync_env_local = sync_env;
1918 sync_env_local.http_manager = &http_manager;
1919 ret = crs.run(new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
1920 http_manager.stop();
// Initialize the sync status object: fetch the master's mdlog info for the
// shard count, seed period/realm_epoch from the current period history
// cursor, and write it via RGWInitSyncStatusCoroutine.
1924 int RGWRemoteMetaLog::init_sync_status()
1926 if (store->is_meta_master()) {
1930 rgw_mdlog_info mdlog_info;
1931 int r = read_log_info(&mdlog_info);
1933 lderr(store->ctx()) << "ERROR: fail to fetch master log info (r=" << r << ")" << dendl;
1937 rgw_meta_sync_info sync_info;
1938 sync_info.num_shards = mdlog_info.num_shards;
1939 auto cursor = store->period_history->get_current();
1941 sync_info.period = cursor.get_period().get_id();
1942 sync_info.realm_epoch = cursor.get_epoch();
1945 return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
// Persist the given sync info to the status object in the log pool.
1948 int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info& sync_info)
1950 return run(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store,
1951 rgw_raw_obj(store->get_zone_params().log_pool, sync_env.status_oid()),
1955 // return a cursor to the period at our sync position
1956 static RGWPeriodHistory::Cursor get_period_at(RGWRados* store,
1957 const rgw_meta_sync_info& info)
1959 if (info.period.empty()) {
1960 // return an empty cursor with error=0
1961 return RGWPeriodHistory::Cursor{};
1964 // look for an existing period in our history
1965 auto cursor = store->period_history->lookup(info.realm_epoch);
1967 // verify that the period ids match
1968 auto& existing = cursor.get_period().get_id();
1969 if (existing != info.period) {
1970 lderr(store->ctx()) << "ERROR: sync status period=" << info.period
1971 << " does not match period=" << existing
1972 << " in history at realm epoch=" << info.realm_epoch << dendl;
1973 return RGWPeriodHistory::Cursor{-EEXIST};
1978 // read the period from rados or pull it from the master
1980 int r = store->period_puller->pull(info.period, period);
1982 lderr(store->ctx()) << "ERROR: failed to read period id "
1983 << info.period << ": " << cpp_strerror(r) << dendl;
1984 return RGWPeriodHistory::Cursor{r};
1986 // attach the period to our history
1987 cursor = store->period_history->attach(std::move(period));
// attach() can fail; propagate its error cursor to the caller
1989 r = cursor.get_error();
1990 lderr(store->ctx()) << "ERROR: failed to read period history back to "
1991 << info.period << ": " << cpp_strerror(r) << dendl;
// Main metadata sync loop. Fetches the master's mdlog info (retrying while
// the master is unreachable), initializes/repairs the local sync status, then
// drives the state machine: StateInit -> StateBuildingFullSyncMaps ->
// StateSync, looping until going_down is set.
// NOTE(review): listing is line-sampled; retry/backoff loop structure is
// partially elided.
1996 int RGWRemoteMetaLog::run_sync()
1998 if (store->is_meta_master()) {
2004 // get shard count and oldest log period from master
2005 rgw_mdlog_info mdlog_info;
2008 ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
2011 r = read_log_info(&mdlog_info);
2012 if (r == -EIO || r == -ENOENT) {
2013 // keep retrying if master isn't alive or hasn't initialized the log
2014 ldout(store->ctx(), 10) << __func__ << "(): waiting for master.." << dendl;
2015 backoff.backoff_sleep();
2020 lderr(store->ctx()) << "ERROR: fail to fetch master log info (r=" << r << ")" << dendl;
2026 rgw_meta_sync_status sync_status;
2029 ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
2032 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2033 if (r < 0 && r != -ENOENT) {
2034 ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
2038 if (!mdlog_info.period.empty()) {
2039 // restart sync if the remote has a period, but:
2040 // a) our status does not, or
2041 // b) our sync period comes before the remote's oldest log period
2042 if (sync_status.sync_info.period.empty() ||
2043 sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
2044 sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
2045 ldout(store->ctx(), 1) << "epoch=" << sync_status.sync_info.realm_epoch
2046 << " in sync status comes before remote's oldest mdlog epoch="
2047 << mdlog_info.realm_epoch << ", restarting sync" << dendl;
2051 if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
2052 ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
2053 sync_status.sync_info.num_shards = mdlog_info.num_shards;
2054 auto cursor = store->period_history->get_current();
2056 // run full sync, then start incremental from the current period/epoch
2057 sync_status.sync_info.period = cursor.get_period().get_id();
2058 sync_status.sync_info.realm_epoch = cursor.get_epoch();
2060 r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
2062 backoff.backoff_sleep();
2067 ldout(store->ctx(), 0) << "ERROR: failed to init sync status r=" << r << dendl;
2071 } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
2073 auto num_shards = sync_status.sync_info.num_shards;
2074 if (num_shards != mdlog_info.num_shards) {
2075 lderr(store->ctx()) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
2079 RGWPeriodHistory::Cursor cursor;
// re-read status each pass; state may have been advanced by a previous pass
2081 r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2082 if (r < 0 && r != -ENOENT) {
2083 ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
2087 switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
2088 case rgw_meta_sync_info::StateBuildingFullSyncMaps:
2089 ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
2090 r = run(new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers));
2091 if (r == -EBUSY || r == -EAGAIN) {
2092 backoff.backoff_sleep();
2097 ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
2101 sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
2102 r = store_sync_info(sync_status.sync_info);
2104 ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl;
2108 case rgw_meta_sync_info::StateSync:
2109 ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl;
2110 // find our position in the period history (if any)
2111 cursor = get_period_at(store, sync_status.sync_info);
2112 r = cursor.get_error();
2116 meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status);
2117 r = run(meta_sync_cr);
2119 ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
2124 ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
2127 } while (!going_down);
// Forward a per-shard wakeup to the running RGWMetaSyncCR, if any.
2132 void RGWRemoteMetaLog::wakeup(int shard_id)
2134 if (!meta_sync_cr) {
2137 meta_sync_cr->wakeup(shard_id);
// State machine driver: init -> read shard status -> REST fetch -> store
// entries, looping while the remote listing is truncated.
// NOTE(review): the reenter/yield scaffolding lines are elided in this listing.
2140 int RGWCloneMetaLogCoroutine::operate()
2145 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
2146 return state_init();
2149 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
2150 return state_read_shard_status();
2153 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
2154 return state_read_shard_status_complete();
2157 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
2158 return state_send_rest_request();
2161 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
2162 return state_receive_rest_response();
2165 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
2166 return state_store_mdlog_entries();
2168 } while (truncated);
2170 ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
2171 return state_store_mdlog_entries_complete();
// Reset the response buffer before each fetch iteration.
2178 int RGWCloneMetaLogCoroutine::state_init()
2180 data = rgw_mdlog_shard_data();
// Kick off an async read of the local mdlog shard header; the completion
// callback records marker/last_update into shard_info and wakes this stack.
2185 int RGWCloneMetaLogCoroutine::state_read_shard_status()
2187 const bool add_ref = false; // default constructs with refs=1
2189 completion.reset(new RGWMetadataLogInfoCompletion(
2190 [this](int ret, const cls_log_header& header) {
2192 ldout(cct, 1) << "ERROR: failed to read mdlog info with "
2193 << cpp_strerror(ret) << dendl;
2195 shard_info.marker = header.max_marker;
2196 shard_info.last_update = header.max_time.to_real_time();
2198 // wake up parent stack
2199 stack->get_completion_mgr()->complete(nullptr, stack);
2202 int ret = mdlog->get_info_async(shard_id, completion.get());
2204 ldout(cct, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
2205 return set_cr_error(ret);
// Async header read finished: log and remember the local shard's max marker.
2211 int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2215 ldout(cct, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
2217 marker = shard_info.marker;
// Issue an async GET /admin/log to the master for this shard/period,
// starting after `marker` (the "marker" param is omitted when marker is
// empty, via the empty-key trick below).
2222 int RGWCloneMetaLogCoroutine::state_send_rest_request()
2224 RGWRESTConn *conn = sync_env->conn;
2227 snprintf(buf, sizeof(buf), "%d", shard_id);
2229 char max_entries_buf[32];
2230 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
// an empty key makes the pair a no-op in the param list
2232 const char *marker_key = (marker.empty() ? "" : "marker");
2234 rgw_http_param_pair pairs[] = { { "type", "metadata" },
2236 { "period", period.c_str() },
2237 { "max-entries", max_entries_buf },
2238 { marker_key, marker.c_str() },
2241 http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);
2243 http_op->set_user_info((void *)stack);
2245 int ret = http_op->aio_read();
2247 ldout(cct, 0) << "ERROR: failed to fetch mdlog data" << dendl;
2248 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
// Wait for the REST response, decode it into `data`, and advance the output
// marker; done (not truncated) when the response is empty.
2257 int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2259 int ret = http_op->wait(&data);
2261 error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
2262 ldout(cct, 5) << "failed to wait for op, ret=" << ret << dendl;
2265 return set_cr_error(ret);
2270 ldout(cct, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
// a full batch implies there may be more entries to fetch
2272 truncated = ((int)data.entries.size() == max_entries);
2274 if (data.entries.empty()) {
2276 *new_marker = marker;
2278 return set_cr_done();
2282 *new_marker = data.entries.back().id;
// Convert the fetched remote entries to cls_log_entry records and write them
// asynchronously into the local mdlog shard.
2289 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2291 list<cls_log_entry> dest_entries;
2293 vector<rgw_mdlog_entry>::iterator iter;
2294 for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
2295 rgw_mdlog_entry& entry = *iter;
2296 ldout(cct, 20) << "entry: name=" << entry.name << dendl;
2298 cls_log_entry dest_entry;
2299 dest_entry.id = entry.id;
2300 dest_entry.section = entry.section;
2301 dest_entry.name = entry.name;
2302 dest_entry.timestamp = utime_t(entry.timestamp);
2304 ::encode(entry.log_data, dest_entry.data);
2306 dest_entries.push_back(dest_entry);
// completion notifier wakes this coroutine stack when the write finishes
2311 RGWAioCompletionNotifier *cn = stack->create_completion_notifier();
2313 int ret = mdlog->store_entries_in_shard(dest_entries, shard_id, cn->completion());
2316 ldout(cct, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
2317 return set_cr_error(ret);
// Final state: the store finished; mark the coroutine done.
2322 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2324 return set_cr_done();
2328 // TODO: move into rgw_sync_trim.cc
2330 #define dout_prefix (*_dout << "meta trim: ")
2332 /// purge all log shards for the given mdlog
// Spawns up to max_concurrent RGWRadosRemoveCR ops, one per shard object.
2333 class PurgeLogShardsCR : public RGWShardCollectCR {
2334 RGWRados *const store;
2335 const RGWMetadataLog* mdlog;
2336 const int num_shards;
2340 static constexpr int max_concurrent = 16;
2343 PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog,
2344 const rgw_pool& pool, int num_shards)
2345 : RGWShardCollectCR(store->ctx(), max_concurrent),
2346 store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
// RGWShardCollectCR hook: spawn the removal of the next shard object;
// return value signals whether more work remains (tail elided in listing)
2349 bool spawn_next() override {
2350 if (i == num_shards) {
// reuse `obj`, rewriting only its oid for each shard
2353 mdlog->get_shard_oid(i++, obj.oid);
2354 spawn(new RGWRadosRemoveCR(store, obj), false);
2359 using Cursor = RGWPeriodHistory::Cursor;
2361 /// purge mdlogs from the oldest up to (but not including) the given realm_epoch
2362 class PurgePeriodLogsCR : public RGWCoroutine {
2363 RGWRados *const store;
2364 RGWMetadataManager *const metadata;
2365 RGWObjVersionTracker objv;
2367 epoch_t realm_epoch;
2368 epoch_t *last_trim_epoch; //< update last trim on success
2371 PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim)
2372 : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr),
2373 realm_epoch(realm_epoch), last_trim_epoch(last_trim)
// Walk period history from the oldest retained log period, deleting each
// period's mdlog shard objects and trimming the period from the mdlog
// history, until reaching realm_epoch. Racing trimmers are tolerated
// (-ENOENT from trim_log_period_cr counts as success).
2379 int PurgePeriodLogsCR::operate()
2382 // read our current oldest log period
2383 yield call(metadata->read_oldest_log_period_cr(&cursor, &objv));
2385 return set_cr_error(retcode);
2388 ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
2389 << " period=" << cursor.get_period().get_id() << dendl;
2391 // trim -up to- the given realm_epoch
2392 while (cursor.get_epoch() < realm_epoch) {
2393 ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
2394 << " period=" << cursor.get_period().get_id() << dendl;
2396 const auto mdlog = metadata->get_log(cursor.get_period().get_id());
2397 const auto& pool = store->get_zone_params().log_pool;
2398 auto num_shards = cct->_conf->rgw_md_log_max_shards;
2399 call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
2402 ldout(cct, 1) << "failed to remove log shards: "
2403 << cpp_strerror(retcode) << dendl;
2404 return set_cr_error(retcode);
2406 ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
2407 << " period=" << cursor.get_period().get_id() << dendl;
2409 // update our mdlog history
2410 yield call(metadata->trim_log_period_cr(cursor, &objv));
2411 if (retcode == -ENOENT) {
2412 // must have raced to update mdlog history. return success and allow the
2413 // winner to continue purging
2414 ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
2415 << " period=" << cursor.get_period().get_id() << dendl;
2416 return set_cr_done();
2417 } else if (retcode < 0) {
2418 ldout(cct, 1) << "failed to remove log shards for realm_epoch="
2419 << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
2420 << " with: " << cpp_strerror(retcode) << dendl;
2421 return set_cr_error(retcode);
// report trim progress to the caller
2424 if (*last_trim_epoch < cursor.get_epoch()) {
2425 *last_trim_epoch = cursor.get_epoch();
2428 assert(cursor.has_next()); // get_current() should always come after
2431 return set_cr_done();
// map of zone id -> owned REST connection to that zone
2438 using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
2440 /// construct a RGWRESTConn for each zone in the realm
2441 template <typename Zonegroups>
2442 connection_map make_peer_connections(RGWRados *store,
2443 const Zonegroups& zonegroups)
2445 connection_map connections;
2446 for (auto& g : zonegroups) {
2447 for (auto& z : g.second.zones) {
// z.first is the zone id, z.second.endpoints its REST endpoints
2448 std::unique_ptr<RGWRESTConn> conn{
2449 new RGWRESTConn(store->ctx(), store, z.first, z.second.endpoints)};
2450 connections.emplace(z.first, std::move(conn));
2456 /// return the marker that it's safe to trim up to
2457 const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
2459 return m.state == m.FullSync ? m.next_step_marker : m.marker;
2462 /// comparison operator for take_min_status()
2463 bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
2465 // sort by stable marker
2466 return get_stable_marker(lhs) < get_stable_marker(rhs);
2469 /// populate the status with the minimum stable marker of each shard for any
2470 /// peer whose realm_epoch matches the minimum realm_epoch in the input
2471 template <typename Iter>
2472 int take_min_status(CephContext *cct, Iter first, Iter last,
2473 rgw_meta_sync_status *status)
2475 if (first == last) {
// every peer must report exactly one marker per mdlog shard
2478 const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
// start above any real epoch so the first peer always wins
2480 status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
2481 for (auto p = first; p != last; ++p) {
2482 // validate peer's shard count
2483 if (p->sync_markers.size() != num_shards) {
2484 ldout(cct, 1) << "take_min_status got peer status with "
2485 << p->sync_markers.size() << " shards, expected "
2486 << num_shards << dendl;
2489 if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
2490 // earlier epoch, take its entire status
// NOTE(review): this moves from *p — the input range is consumed and must
// not be reused by the caller afterwards
2491 *status = std::move(*p);
2492 } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
2493 // same epoch, take any earlier markers
2494 auto m = status->sync_markers.begin();
2495 for (auto& shard : p->sync_markers) {
// operator< above compares the stable (trimmable) markers
2496 if (shard.second < m->second) {
2497 m->second = std::move(shard.second);
// NOTE(review): iterator m must advance in lockstep with 'shard'; the
// advancing statement is not visible in this chunk — confirm upstream
// Shared state for the mdlog trim coroutines (base for master/peer variants).
2507 RGWRados *const store;
2508 RGWHTTPManager *const http;
// id of the local zone, used to exclude ourselves from peer queries
// NOTE(review): const-ref member bound in the ctor init list below — assumes
// get_zone_params().get_id() returns a reference to long-lived storage, not
// a temporary; confirm to rule out a dangling reference
2510 const std::string& zone;
2511 Cursor current; //< cursor to current period
2512 epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged
2514 TrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
2515 : store(store), http(http), num_shards(num_shards),
2516 zone(store->get_zone_params().get_id()),
2517 current(store->period_history->get_current())
// Trim state used when this gateway is the metadata master: it must collect
// sync status from every peer zone before trimming.
2521 struct MasterTrimEnv : public TrimEnv {
2522 connection_map connections; //< peer connections
2523 std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
2524 /// last trim marker for each shard, only applies to current period's mdlog
2525 std::vector<std::string> last_trim_markers;
2527 MasterTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
2528 : TrimEnv(store, http, num_shards),
2529 last_trim_markers(num_shards)
2531 auto& period = current.get_period();
// connect to every zone in the current period's map, except ourselves
2532 connections = make_peer_connections(store, period.get_map().zonegroups);
2533 connections.erase(zone);
2534 peer_status.resize(connections.size());
// Trim state used when this gateway is NOT the metadata master: trim decisions
// are driven by the master's mdlog, tracked here by timestamp per shard.
2538 struct PeerTrimEnv : public TrimEnv {
2539 /// last trim timestamp for each shard, only applies to current period's mdlog
2540 std::vector<ceph::real_time> last_trim_timestamps;
2542 PeerTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
2543 : TrimEnv(store, http, num_shards),
2544 last_trim_timestamps(num_shards)
// adopt the master's shard count (it may differ from our local config)
2547 void set_num_shards(int num_shards) {
2548 this->num_shards = num_shards;
2549 last_trim_timestamps.resize(num_shards);
2553 } // anonymous namespace
2556 /// spawn a trim cr for each shard that needs it, while limiting the number
2557 /// of concurrent shards
2558 class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
2560 static constexpr int MAX_CONCURRENT_SHARDS = 16;
2563 RGWMetadataLog *mdlog;
// minimum sync status across all peers; markers here bound what we may trim
2566 const rgw_meta_sync_status& sync_status;
2569 MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
2570 const rgw_meta_sync_status& sync_status)
2571 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2572 env(env), mdlog(mdlog), sync_status(sync_status)
2575 bool spawn_next() override;
// Find the next shard whose minimum stable marker has advanced past our last
// trim position and spawn a trim coroutine for it; skips shards with nothing
// new to trim.
2578 bool MetaMasterTrimShardCollectCR::spawn_next()
2580 while (shard_id < env.num_shards) {
2581 auto m = sync_status.sync_markers.find(shard_id);
2582 if (m == sync_status.sync_markers.end()) {
// stable = the safe-to-trim position reported for this shard
2586 auto& stable = get_stable_marker(m->second);
2587 auto& last_trim = env.last_trim_markers[shard_id];
// nothing has advanced since the last trim; skip this shard
2589 if (stable <= last_trim) {
2591 ldout(cct, 20) << "skipping log shard " << shard_id
2592 << " at marker=" << stable
2593 << " last_trim=" << last_trim
2594 << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
2599 mdlog->get_shard_oid(shard_id, oid);
2601 ldout(cct, 10) << "trimming log shard " << shard_id
2602 << " at marker=" << stable
2603 << " last_trim=" << last_trim
2604 << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
// last_trim is updated by the trim cr on success
2605 spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
2612 /// spawn rest requests to read each peer's sync status
2613 class MetaMasterStatusCollectCR : public RGWShardCollectCR {
2614 static constexpr int MAX_CONCURRENT_SHARDS = 16;
// iterate connections and peer_status in lockstep: one status slot per peer
2617 connection_map::iterator c;
2618 std::vector<rgw_meta_sync_status>::iterator s;
2620 MetaMasterStatusCollectCR(MasterTrimEnv& env)
2621 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2622 env(env), c(env.connections.begin()), s(env.peer_status.begin())
2625 bool spawn_next() override {
2626 if (c == env.connections.end()) {
// GET /admin/log?type=metadata&status — query the peer's metadata sync status
2629 static rgw_http_param_pair params[] = {
2630 { "type", "metadata" },
2631 { "status", nullptr },
2632 { nullptr, nullptr }
2635 ldout(cct, 20) << "query sync status from " << c->first << dendl;
2636 auto conn = c->second.get();
2637 using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
// decode the REST response directly into this peer's status slot
2638 spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
// Top-level trim coroutine for the metadata master; drives peer status
// collection, period purging and per-shard trimming (see operate() below).
2646 class MetaMasterTrimCR : public RGWCoroutine {
2648 rgw_meta_sync_status min_status; //< minimum sync status of all peers
2652 MetaMasterTrimCR(MasterTrimEnv& env)
2653 : RGWCoroutine(env.store->ctx()), env(env)
// Collect sync status from all peers, compute the minimum, purge old period
// logs, and trim the current period's shards up to the minimum stable markers.
2659 int MetaMasterTrimCR::operate()
2662 // TODO: detect this and fail before we spawn the trim thread?
2663 if (env.connections.empty()) {
2664 ldout(cct, 4) << "no peers, exiting" << dendl;
2665 return set_cr_done();
2668 ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
2669 // query mdlog sync status from peers
2670 yield call(new MetaMasterStatusCollectCR(env));
2672 // must get a successful reply from all peers to consider trimming
2674 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
2675 return set_cr_error(ret);
2678 // determine the minimum epoch and markers
2679 ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
2680 env.peer_status.end(), &min_status);
2682 ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl;
2683 return set_cr_error(ret);
2686 auto store = env.store;
2687 auto epoch = min_status.sync_info.realm_epoch;
2688 ldout(cct, 4) << "realm epoch min=" << epoch
2689 << " current=" << env.current.get_epoch()<< dendl;
// all peers are past last_trim_epoch+1, so older periods can be purged
2690 if (epoch > env.last_trim_epoch + 1) {
2691 // delete any prior mdlog periods
2692 spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true);
2694 ldout(cct, 10) << "mdlogs already purged up to realm_epoch "
2695 << env.last_trim_epoch << dendl;
2698 // if realm_epoch == current, trim mdlog based on markers
2699 if (epoch == env.current.get_epoch()) {
2700 auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id());
2701 spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
2704 // ignore any errors during purge/trim because we want to hold the lock open
2705 return set_cr_done();
2711 /// read the first entry of the master's mdlog shard and trim to that position
2712 class MetaPeerTrimShardCR : public RGWCoroutine {
2713 RGWMetaSyncEnv& env;
2714 RGWMetadataLog *mdlog;
2715 const std::string& period_id;
// shard info fetched from the master (used when the listing is empty)
2717 RGWMetadataLogInfo info;
2718 ceph::real_time stable; //< safe timestamp to trim, according to master
2719 ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
2720 rgw_mdlog_shard_data result; //< result from master's mdlog listing
2723 MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
2724 const std::string& period_id, int shard_id,
2725 ceph::real_time *last_trim)
2726 : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
2727 period_id(period_id), shard_id(shard_id), last_trim(last_trim)
2730 int operate() override;
// Determine the safe trim timestamp for one shard from the master's mdlog
// (first entry's timestamp minus one second, or the shard's last_update when
// the log is verifiably empty), then trim the local shard up to it.
2733 int MetaPeerTrimShardCR::operate()
2736 // query master's first mdlog entry for this shard
2737 yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
2740 ldout(cct, 5) << "failed to read first entry from master's mdlog shard "
2741 << shard_id << " for period " << period_id
2742 << ": " << cpp_strerror(retcode) << dendl;
2743 return set_cr_error(retcode);
2745 if (result.entries.empty()) {
2746 // if there are no mdlog entries, we don't have a timestamp to compare. we
2747 // can't just trim everything, because there could be racing updates since
2748 // this empty reply. query the mdlog shard info to read its max timestamp,
2749 // then retry the listing to make sure it's still empty before trimming to
2751 ldout(cct, 10) << "empty master mdlog shard " << shard_id
2752 << ", reading last timestamp from shard info" << dendl;
2753 // read the mdlog shard info for the last timestamp
2754 using ShardInfoCR = RGWReadRemoteMDLogShardInfoCR;
2755 yield call(new ShardInfoCR(&env, period_id, shard_id, &info));
2757 ldout(cct, 5) << "failed to read info from master's mdlog shard "
2758 << shard_id << " for period " << period_id
2759 << ": " << cpp_strerror(retcode) << dendl;
2760 return set_cr_error(retcode);
// zero last_update means the shard never had an entry
2762 if (ceph::real_clock::is_zero(info.last_update)) {
2763 return set_cr_done(); // nothing to trim
2765 ldout(cct, 10) << "got mdlog shard info with last update="
2766 << info.last_update << dendl;
2767 // re-read the master's first mdlog entry to make sure it hasn't changed
2768 yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
2771 ldout(cct, 5) << "failed to read first entry from master's mdlog shard "
2772 << shard_id << " for period " << period_id
2773 << ": " << cpp_strerror(retcode) << dendl;
2774 return set_cr_error(retcode);
2776 // if the mdlog is still empty, trim to max marker
2777 if (result.entries.empty()) {
2778 stable = info.last_update;
2780 stable = result.entries.front().timestamp;
2782 // can only trim -up to- master's first timestamp, so subtract a second.
2783 // (this is why we use timestamps instead of markers for the peers)
2784 stable -= std::chrono::seconds(1);
// non-empty listing: same rule, trim to just before the first entry
2787 stable = result.entries.front().timestamp;
2788 stable -= std::chrono::seconds(1);
2791 if (stable <= *last_trim) {
2792 ldout(cct, 10) << "skipping log shard " << shard_id
2793 << " at timestamp=" << stable
2794 << " last_trim=" << *last_trim << dendl;
2795 return set_cr_done();
2798 ldout(cct, 10) << "trimming log shard " << shard_id
2799 << " at timestamp=" << stable
2800 << " last_trim=" << *last_trim << dendl;
2803 mdlog->get_shard_oid(shard_id, oid);
// trim everything in [epoch, stable) on the local shard object
2804 call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
// -ENODATA just means the range was already empty; treat as success
2806 if (retcode < 0 && retcode != -ENODATA) {
2807 ldout(cct, 1) << "failed to trim mdlog shard " << shard_id
2808 << ": " << cpp_strerror(retcode) << dendl;
2809 return set_cr_error(retcode);
2811 *last_trim = stable;
2812 return set_cr_done();
// Spawn a MetaPeerTrimShardCR per shard, bounded by MAX_CONCURRENT_SHARDS.
2817 class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
2818 static constexpr int MAX_CONCURRENT_SHARDS = 16;
2821 RGWMetadataLog *mdlog;
2822 const std::string& period_id;
2823 RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
2827 MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
2828 : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2829 env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
// shard crs talk to the master via rest_master_conn; no error logger needed
2831 meta_env.init(cct, env.store, env.store->rest_master_conn,
2832 env.store->get_async_rados(), env.http, nullptr);
2835 bool spawn_next() override;
// Spawn the trim cr for the next shard; stop once all shards are dispatched.
2838 bool MetaPeerTrimShardCollectCR::spawn_next()
2840 if (shard_id >= env.num_shards) {
// each shard cr updates its own slot in last_trim_timestamps on success
2843 auto& last_trim = env.last_trim_timestamps[shard_id];
2844 spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
// Top-level trim coroutine for non-master zones; driven by the master's
// mdlog info (see operate() below).
2850 class MetaPeerTrimCR : public RGWCoroutine {
2852 rgw_mdlog_info mdlog_info; //< master's mdlog info
2855 MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}
// Fetch mdlog info from the master, purge local logs for periods older than
// the master's realm_epoch, then (if we're on the current period) trim each
// shard based on the master's per-shard timestamps.
2860 int MetaPeerTrimCR::operate()
2863 ldout(cct, 10) << "fetching master mdlog info" << dendl;
2865 // query mdlog_info from master for oldest_log_period
2866 rgw_http_param_pair params[] = {
2867 { "type", "metadata" },
2868 { nullptr, nullptr }
2871 using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
2872 call(new LogInfoCR(cct, env.store->rest_master_conn, env.http,
2873 "/admin/log/", params, &mdlog_info));
2876 ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
2877 return set_cr_error(retcode);
2879 // use master's shard count instead
2880 env.set_num_shards(mdlog_info.num_shards);
2882 if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
2883 // delete any prior mdlog periods
2884 yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
2885 &env.last_trim_epoch));
2887 ldout(cct, 10) << "mdlogs already purged through realm_epoch "
2888 << env.last_trim_epoch << dendl;
2891 // if realm_epoch == current, trim mdlog based on master's markers
2892 if (mdlog_info.realm_epoch == env.current.get_epoch()) {
2894 auto meta_mgr = env.store->meta_mgr;
2895 auto mdlog = meta_mgr->get_log(env.current.get_period().get_id());
2896 call(new MetaPeerTrimShardCollectCR(env, mdlog));
2897 // ignore any errors during purge/trim because we want to hold the lock open
2900 return set_cr_done();
// Periodic trim driver: sleeps for 'interval', takes a cluster-wide lock so
// only one gateway trims at a time, then runs the trim cr from alloc_cr().
2905 class MetaTrimPollCR : public RGWCoroutine {
2906 RGWRados *const store;
2907 const utime_t interval; //< polling interval
// lock object: the mdlog history object in the log pool
2908 const rgw_raw_obj obj;
2909 const std::string name{"meta_trim"}; //< lock name
// random cookie identifies this gateway's lock instance
2910 const std::string cookie;
2913 /// allocate the coroutine to run within the lease
2914 virtual RGWCoroutine* alloc_cr() = 0;
2917 MetaTrimPollCR(RGWRados *store, utime_t interval)
2918 : RGWCoroutine(store->ctx()), store(store), interval(interval),
2919 obj(store->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
2920 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
// Poll loop: sleep, acquire the trim lock for the whole interval, run the
// subclass trim cr, and unlock on error so other gateways can take over.
2926 int MetaTrimPollCR::operate()
2930 set_status("sleeping");
2933 // prevent others from trimming for our entire wait interval
2934 set_status("acquiring trim lock");
// lease duration == poll interval, so the lock expires by the next cycle
2935 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
2936 obj, name, cookie, interval.sec()));
2938 ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
2942 set_status("trimming");
2943 yield call(alloc_cr());
2946 // on errors, unlock so other gateways can try
2947 set_status("unlocking");
2948 yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
2949 obj, name, cookie));
// Poll-loop specialization for the metadata master zone.
2956 class MetaMasterTrimPollCR : public MetaTrimPollCR {
2957 MasterTrimEnv env; //< trim state to share between calls
2958 RGWCoroutine* alloc_cr() override {
2959 return new MetaMasterTrimCR(env);
2962 MetaMasterTrimPollCR(RGWRados *store, RGWHTTPManager *http,
2963 int num_shards, utime_t interval)
2964 : MetaTrimPollCR(store, interval),
2965 env(store, http, num_shards)
// Poll-loop specialization for non-master zones.
2969 class MetaPeerTrimPollCR : public MetaTrimPollCR {
2970 PeerTrimEnv env; //< trim state to share between calls
2971 RGWCoroutine* alloc_cr() override {
2972 return new MetaPeerTrimCR(env);
2975 MetaPeerTrimPollCR(RGWRados *store, RGWHTTPManager *http,
2976 int num_shards, utime_t interval)
2977 : MetaTrimPollCR(store, interval),
2978 env(store, http, num_shards)
// Factory: return the periodic mdlog trim coroutine appropriate for this
// zone's role (master vs peer).
2982 RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http,
2983 int num_shards, utime_t interval)
2985 if (store->is_meta_master()) {
2986 return new MetaMasterTrimPollCR(store, http, num_shards, interval);
2988 return new MetaPeerTrimPollCR(store, http, num_shards, interval);
// One-shot admin trim for the master zone: owns its env privately (base
// subobject) and runs MetaMasterTrimCR once, without the poll loop or lock.
2992 struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
2993 MetaMasterAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards)
2994 : MasterTrimEnv(store, http, num_shards),
2995 MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
// One-shot admin trim for non-master zones; same env-embedding pattern as
// MetaMasterAdminTrimCR above.
2999 struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
3000 MetaPeerAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards)
3001 : PeerTrimEnv(store, http, num_shards),
3002 MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
3006 RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store,
3007 RGWHTTPManager *http,
3010 if (store->is_meta_master()) {
3011 return new MetaMasterAdminTrimCR(store, http, num_shards);
3013 return new MetaPeerAdminTrimCR(store, http, num_shards);