#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_module.h"

#include "cls/lock/cls_lock_client.h"

#include "auth/Crypto.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
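// A sketch of the per-source-zone sync state kept in the log pool (object
// names are derived from the prefixes above; not an exhaustive list):
//
//   datalog.sync-status.<zone>             overall rgw_data_sync_info
//   datalog.sync-status.shard.<zone>.<n>   per-shard rgw_data_sync_marker
//   data.full-sync.index.<zone>.<n>        omap index of buckets to full-sync
//   bucket.sync-status.*                   per-bucket-shard sync status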
class RGWSyncDebugLogger {
  CephContext *cct;
  string prefix;

  bool ended;

public:
  RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
                     const string& sync_type, const string& sync_stage,
                     const string& resource, bool log_start = true) {
    init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
  }
  RGWSyncDebugLogger() : cct(NULL), ended(false) {}
  ~RGWSyncDebugLogger();

  void init(CephContext *_cct, const string& source_zone,
            const string& sync_type, const string& sync_stage,
            const string& resource, bool log_start = true);
  void log(const string& state);
  void finish(int status);
};

void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
                              const string& sync_type, const string& sync_section,
                              const string& resource, bool log_start)
{
  cct = _cct;
  ended = false;
  string zone_str = source_zone.substr(0, 8);
  prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;

  if (log_start) {
    log("start");
  }
}

RGWSyncDebugLogger::~RGWSyncDebugLogger()
{
  if (!ended) {
    log("finish");
  }
}

void RGWSyncDebugLogger::log(const string& state)
{
  ldout(cct, 5) << prefix << ":" << state << dendl;
}

void RGWSyncDebugLogger::finish(int status)
{
  ended = true;
  ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
}
class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
public:
  RGWDataSyncDebugLogger() {}
  RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
                         const string& resource, bool log_start = true) {
    init(sync_env, sync_section, resource, log_start);
  }
  void init(RGWDataSyncEnv *sync_env, const string& sync_section,
            const string& resource, bool log_start = true) {
    RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
  }
};
void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}
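// RGWShardCollectCR runs a bounded number of child coroutines at a time;
// the subclass below fans out one RGWSimpleRadosReadCR per datalog shard
// to load all shard markers, keeping at most MAX_CONCURRENT_SHARDS reads
// in flight.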
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};
  map<uint32_t, rgw_data_sync_marker>& markers;

 public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store,
               rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}
class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
                          rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}
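// The remote datalog is read through the source zone's admin REST API.
// Each coroutine below issues its request asynchronously via the shared
// RGWHTTPManager: aio_read() sends the request and io_block() suspends
// the coroutine until the HTTP completion wakes the stack back up.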
class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sync_env->cct),
                                                      sync_env(_sync_env),
                                                      http_op(NULL),
                                                      shard_id(_shard_id),
                                                      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};
struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};
class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  string *pmarker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
                              int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct),
                                                      sync_env(_sync_env),
                                                      http_op(NULL),
                                                      shard_id(_shard_id),
                                                      pmarker(_pmarker),
                                                      entries(_entries),
                                                      truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "marker", pmarker->c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(&response);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        entries->swap(response.entries);
        *pmarker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};
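// Fan-out collectors: both classes below reuse the RGWShardCollectCR
// pattern, capping concurrent shard requests at READ_DATALOG_MAX_CONCURRENT
// instead of hitting the source zone with one request per shard at once.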
class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
                                                 sync_env(_sync_env), num_shards(_num_shards),
                                                 datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}
class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    http_op->set_user_info((void *)stack);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};
class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
                                                 sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
                                                 result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}
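// Initializing sync status is a small state machine: take a cooperative
// lock on the status object, write the initial rgw_data_sync_info, re-take
// the lock (the write recreates the object), snapshot the current position
// of every remote datalog shard, persist one marker object per shard, flip
// the state to StateBuildingFullSyncMaps and drop the lock.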
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;
public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->get_zone_params().log_pool),
      num_shards(num_shards), status(status) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};
int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  if (store->is_meta_master()) {
    return 0;
  }

  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
{
  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}
int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}
int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  uint64_t instance_id;
  get_random_bytes((char *)&instance_id, sizeof(instance_id));
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
  http_manager.stop();
  return ret;
}
static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);

  return string(buf);
}
struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};
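// Builds the full-sync index: lists every bucket instance on the source
// zone through /admin/metadata/bucket.instance, expands each one into its
// bucket index shards, and appends the resulting keys into the sharded
// omap objects that full sync will later walk.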
class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string> result;
  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
                                                 store(sync_env->store), sync_status(_sync_status),
                                                 req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      yield {
        string entrypoint = string("/admin/metadata/bucket.instance");
        /* FIXME: need a better scaling solution here, requires streaming output */
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                      entrypoint, NULL, &result));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
        return set_cr_error(retcode);
      }
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->get_zone_params().log_pool,
                                                  oid_prefix);
      yield; // yield so OmapAppendCRs can start
      for (iter = result.begin(); iter != result.end(); ++iter) {
        ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;

        key = *iter;

        yield {
          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                          { NULL, NULL } };

          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
        }

        num_shards = meta_info.data.get_bucket_info().num_shards;
        if (num_shards > 0) {
          for (i = 0; i < num_shards; i++) {
            char buf[16];
            snprintf(buf, sizeof(buf), ":%d", i);
            s = key + buf;
            yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
          }
        } else {
          yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
        }
      }
      if (!entries_index->finish()) {
        failed = true;
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                                rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker), true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};
#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
                                                 sync_env(_sync_env),
                                                 marker_oid(_marker_oid),
                                                 sync_marker(_marker) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;

    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * create index from key -> marker, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }
};
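// A note on the marker-track machinery just defined: entries complete out
// of order, so the stored marker may only advance past an entry once every
// earlier entry has finished. The window (1 here) bounds how many
// completions are batched before the updated high marker is persisted via
// store_marker().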
// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  RGWDataSyncDebugLogger logger;
  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
    logger.init(sync_env, "Bucket", bs.get_key());
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo) : RGWCoroutine(_sync_env->cct),
                                                 sync_env(_sync_env),
                                                 raw_key(_raw_key), entry_marker(_entry_marker),
                                                 sync_status(0),
                                                 marker_tracker(_marker_tracker),
                                                 error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          call(new RGWRunBucketSyncCoroutine(sync_env, bs));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
            "for missing bucket " << raw_key << dendl;
        sync_status = 0;
      }

      if (sync_status < 0) {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                        -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
        }
        if (error_repo && !error_repo->append(raw_key)) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
             << error_repo->get_obj() << " retcode=" << retcode << dendl;
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};
#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10

enum RemoteDatalogStatus {
  RemoteNotTrimmed = 0,
  RemoteTrimmed = 1,
  RemoteMightTrimmed = 2
};
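// RemoteMightTrimmed marks the case where our local marker is empty; if the
// remote shard then also returns no entries, the remote log was likely
// trimmed (RemoteTrimmed) and the shard is treated as caught up, falling
// through to the polling interval instead of re-reading immediately.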
class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  map<string, bufferlist> entries;
  map<string, bufferlist>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  RGWDataChangesLogInfo shard_info;
  string datalog_marker;

  RemoteDatalogStatus remote_trimmed;
  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;

  string error_oid;
  RGWOmapAppend *error_repo;
  map<string, bufferlist> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
  uint32_t retry_backoff_secs;

  RGWDataSyncDebugLogger logger;
public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
                                                 sync_env(_sync_env),
                                                 pool(_pool),
                                                 shard_id(_shard_id),
                                                 sync_marker(_marker),
                                                 marker_tracker(NULL), truncated(false), remote_trimmed(RemoteNotTrimmed), inc_lock("RGWDataSyncShardCR::inc_lock"),
                                                 total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
                                                 lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
                                                 retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";

    logger.init(sync_env, "DataShard", status_oid);
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }
  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }
  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    RGWRados *store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }
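  // Full sync drains the omap index built by RGWListBucketIndexesCR: read a
  // batch of bucket-shard keys, spawn one RGWDataSyncSingleEntryCR per key,
  // and advance the persisted marker through the marker tracker. Once the
  // index is exhausted, the shard state flips to IncrementalSync.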
  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      logger.log("full sync");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      total_entries = sync_marker.pos;
      do {
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
          total_entries++;
          if (!marker_tracker->start(iter->first, total_entries, real_time())) {
            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
            if (retcode < 0) {
              lease_cr->go_down();
              drain_all();
              return set_cr_error(retcode);
            }
          }
          sync_marker.marker = iter->first;
        }
      } while ((int)entries.size() == max_entries);

      lease_cr->go_down();
      drain_all();

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
        lease_cr->go_down();
        return set_cr_error(retcode);
      }
    }
    return 0;
  }
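  // Incremental sync tails the remote datalog shard: process out-of-band
  // change notifications, retry bucket shards recorded in the error repo
  // (backing off exponentially between empty passes), then page through new
  // log entries and spawn a single-entry sync per unique bucket shard,
  // throttled by spawn_window.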
  int incremental_sync() {
    reenter(&incremental_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      set_status("lease acquired");
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      logger.log("inc sync");
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      do {
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
          }
        }

        if (error_retry_time <= ceph::real_clock::now()) {
          /* process bucket shards that previously failed */
          yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                               error_marker, &error_entries,
                                               max_error_entries));
          ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
          iter = error_entries.begin();
          for (; iter != error_entries.end(); ++iter) {
            ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
            spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
            error_marker = iter->first;
          }
          if ((int)error_entries.size() != max_error_entries) {
            if (error_marker.empty() && error_entries.empty()) {
              /* the retry repo is empty, we back off a bit before calling it again */
              retry_backoff_secs *= 2;
              if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
                retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
              }
            } else {
              retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
            }
            error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
            error_marker.clear();
          }
        }

        yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }
        datalog_marker = shard_info.marker;
        remote_trimmed = RemoteNotTrimmed;
#define INCREMENTAL_MAX_ENTRIES 100
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker > sync_marker.marker) {
          spawned_keys.clear();
          if (sync_marker.marker.empty())
            remote_trimmed = RemoteMightTrimmed; // remote data log shard might be trimmed
          yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
          if (retcode < 0) {
            ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
            stop_spawned_services();
            drain_all();
            return set_cr_error(retcode);
          }
          if ((remote_trimmed == RemoteMightTrimmed) && sync_marker.marker.empty() && log_entries.empty())
            remote_trimmed = RemoteTrimmed;
          else
            remote_trimmed = RemoteNotTrimmed;
          for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
            ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
            if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
              ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
              marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
              continue;
            }
            if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
              ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
            } else {
              /*
               * don't spawn the same key more than once. We can do that as long as we don't yield
               */
              if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
                spawned_keys.insert(log_iter->entry.key);
                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
                if (retcode < 0) {
                  stop_spawned_services();
                  drain_all();
                  return set_cr_error(retcode);
                }
              }
            }
          }
          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
                /* we have reported this error */
              }
              /* not waiting for child here */
            }
          }
        }
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) {
#define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
      } while (true);
    }
    return 0;
  }

  void stop_spawned_services() {
    lease_cr->go_down();
    if (error_repo) {
      error_repo->finish();
      error_repo->put();
      error_repo = NULL;
    }
  }
};
class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

public:
  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_sync_env->cct, false),
                                                 sync_env(_sync_env),
                                                 pool(_pool),
                                                 shard_id(_shard_id),
                                                 sync_marker(_marker) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                          rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                          &sync_marker);
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(cr_lock());

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }

    cr->append_modified_shards(keys);
  }
};
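// RGWBackoffControlCR re-allocates and re-runs the wrapped coroutine with
// exponential backoff whenever it fails; backoff_ptr() hands the child a
// flag it can set to reset that backoff once it makes progress again.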
class RGWDataSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  rgw_data_sync_status sync_status;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  Mutex shard_crs_lock;
  map<int, RGWDataSyncShardControlCR *> shard_crs;

  bool *reset_backoff;

  RGWDataSyncDebugLogger logger;

  RGWDataSyncModule *data_sync_module{nullptr};
public:
  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
                                                 sync_env(_sync_env),
                                                 num_shards(_num_shards),
                                                 marker_tracker(NULL),
                                                 shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
                                                 reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
  }

  ~RGWDataSyncCR() override {
    for (auto iter : shard_crs) {
      iter.second->put();
    }
  }

  int operate() override {
    reenter(this) {

      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));

      data_sync_module = sync_env->sync_module->get_data_handler();

      if (retcode < 0 && retcode != -ENOENT) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
        sync_status.sync_info.num_shards = num_shards;
        uint64_t instance_id;
        get_random_bytes((char *)&instance_id, sizeof(instance_id));
        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      data_sync_module->init(sync_env, sync_status.sync_info.instance_id);

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        /* call sync module init here */
        yield call(data_sync_module->init_sync(sync_env));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        /* state: building full sync maps */
        ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
        yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
                                                                          iter->first, iter->second);
            cr->get();
            shard_crs_lock.Lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.Unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
                                                         rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                                                         sync_status.sync_info);
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex::Locker l(shard_crs_lock);
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(keys);
    iter->second->wakeup();
  }
};
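// Sync modules plug into data sync at the object level: the default module
// below simply mirrors objects (fetch on write, remove on delete), while
// alternative modules can handle the same events differently, e.g. by
// forwarding them to an external store.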
class RGWDefaultDataSyncModule : public RGWDataSyncModule {
public:
  RGWDefaultDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
  RGWDefaultDataSyncModule data_handler;
public:
  RGWDefaultSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
};

int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
                                 key, versioned_epoch,
                                 true, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            NULL, NULL, false, &mtime, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}
class RGWDataSyncControlCR : public RGWBackoffControlCR
{
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  static constexpr bool exit_on_error = false; // retry on all errors
public:
  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
                                                 sync_env(_sync_env), num_shards(_num_shards) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex& m = cr_lock();

    m.Lock();
    RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
    if (!cr) {
      m.Unlock();
      return;
    }

    cr->get();
    m.Unlock();

    if (cr) {
      cr->wakeup(shard_id, keys);
    }

    cr->put();
  }
};
void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
  RWLock::RLocker rl(lock);
  if (!data_sync_cr) {
    return;
  }
  data_sync_cr->wakeup(shard_id, keys);
}

int RGWRemoteDataLog::run_sync(int num_shards)
{
  lock.get_write();
  data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
  data_sync_cr->get(); // run() will drop a ref, so take another
  lock.unlock();

  int r = run(data_sync_cr);

  lock.get_write();
  data_sync_cr->put();
  data_sync_cr = NULL;
  lock.unlock();

  if (r < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
    return r;
  }
  return 0;
}
int RGWDataSyncStatusManager::init()
{
  auto zone_def_iter = store->zone_by_id.find(source_zone);
  if (zone_def_iter == store->zone_by_id.end()) {
    ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
    return -EIO;
  }

  auto& zone_def = zone_def_iter->second;

  if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
    return -ENOTSUP;
  }

  RGWZoneParams& zone_params = store->get_zone_params();

  sync_module = store->get_sync_module();

  conn = store->get_zone_conn_by_id(source_zone);
  if (!conn) {
    ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
    return -EINVAL;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  int r = source_log.init(source_zone, conn, error_logger, sync_module);
  if (r < 0) {
    lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
    finalize();
    return r;
  }

  rgw_datalog_info datalog_info;
  r = source_log.read_log_info(&datalog_info);
  if (r < 0) {
    ldout(store->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
    finalize();
    return r;
  }

  num_shards = datalog_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
  }

  return 0;
}
void RGWDataSyncStatusManager::finalize()
{
  delete error_logger;
  error_logger = nullptr;
}

string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
{
  char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());

  return string(buf);
}

string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);

  return string(buf);
}
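// From here down the same pattern repeats one level lower: per-bucket-shard
// sync keeps its own status object (state + full-sync marker + incremental
// marker, stored as xattrs), initialized from the remote bucket index log
// position and driven by the bucket index log instead of the datalog.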
int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
                             const rgw_bucket& bucket, int shard_id,
                             RGWSyncErrorLogger *_error_logger,
                             RGWSyncModuleInstanceRef& _sync_module)
{
  conn = _conn;
  source_zone = _source_zone;
  bs.bucket = bucket;
  bs.shard_id = shard_id;

  sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);

  return 0;
}

struct bucket_index_marker_info {
  string bucket_ver;
  string master_ver;
  string max_marker;
  bool syncstopped{false};

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
    JSONDecoder::decode_json("master_ver", master_ver, obj);
    JSONDecoder::decode_json("max_marker", max_marker, obj);
    JSONDecoder::decode_json("syncstopped", syncstopped, obj);
  }
};
class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const string instance_key;

  bucket_index_marker_info *info;

public:
  RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
                                    const rgw_bucket_shard& bs,
                                    bucket_index_marker_info *_info)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      instance_key(bs.get_key()), info(_info) {}

  int operate() override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
                                        { "bucket-instance", instance_key.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";
        call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_bucket_shard bs;
  const string sync_status_oid;

  rgw_bucket_shard_sync_info& status;

  bucket_index_marker_info info;
public:
  RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                        const rgw_bucket_shard& bs,
                                        rgw_bucket_shard_sync_info& _status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      status(_status)
  {}

  int operate() override {
    reenter(this) {
      /* fetch current position in logs */
      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
      if (retcode < 0 && retcode != -ENOENT) {
        ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
        return set_cr_error(retcode);
      }
      yield {
        auto store = sync_env->store;
        rgw_raw_obj obj(store->get_zone_params().log_pool, sync_status_oid);

        if (info.syncstopped) {
          call(new RGWRadosRemoveCR(store, obj));
        } else {
          status.state = rgw_bucket_shard_sync_info::StateFullSync;
          status.inc_marker.position = info.max_marker;
          map<string, bufferlist> attrs;
          status.encode_all_attrs(attrs);
          call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
        }
      }
      return set_cr_done();
    }
    return 0;
  }
};

RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
{
  return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
}
template <class T>
static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
{
  map<string, bufferlist>::iterator iter = attrs.find(attr_name);
  if (iter == attrs.end()) {
    *val = T();
    return;
  }

  bufferlist::iterator biter = iter->second.begin();
  try {
    ::decode(*val, biter);
  } catch (buffer::error& err) {
    ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
  }
}

void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
{
  decode_attr(cct, attrs, "state", &state);
  decode_attr(cct, attrs, "full_marker", &full_marker);
  decode_attr(cct, attrs, "inc_marker", &inc_marker);
}

void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
{
  encode_state_attr(attrs);
  full_marker.encode_attr(attrs);
  inc_marker.encode_attr(attrs);
}

void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
{
  ::encode(state, attrs["state"]);
}

void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  ::encode(*this, attrs["full_marker"]);
}

void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  ::encode(*this, attrs["inc_marker"]);
}
class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  string oid;
  rgw_bucket_shard_sync_info *status;

  map<string, bufferlist> attrs;
public:
  RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                   const rgw_bucket_shard& bs,
                                   rgw_bucket_shard_sync_info *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      status(_status) {}
  int operate() override;
};

int RGWReadBucketSyncStatusCoroutine::operate()
{
  reenter(this) {
    yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
                                             rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
                                             &attrs));
    if (retcode == -ENOENT) {
      *status = rgw_bucket_shard_sync_info();
      return set_cr_done();
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    status->decode_from_attrs(sync_env->cct, attrs);
    return set_cr_done();
  }
  return 0;
}

RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
  return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
}
RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
    delete iter->second;
  }
  delete error_logger;
}

void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("ID", id, obj);
  JSONDecoder::decode_json("DisplayName", display_name, obj);
}
struct bucket_list_entry {
  bool delete_marker;
  rgw_obj_key key;
  bool is_latest;
  real_time mtime;
  string etag;
  uint64_t size;
  string storage_class;
  rgw_bucket_entry_owner owner;
  uint64_t versioned_epoch;
  string rgw_tag;

  bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
    JSONDecoder::decode_json("Key", key.name, obj);
    JSONDecoder::decode_json("VersionId", key.instance, obj);
    JSONDecoder::decode_json("IsLatest", is_latest, obj);
    string mtime_str;
    JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);

    struct tm t;
    uint32_t nsec;
    if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
      ceph_timespec ts;
      ts.tv_sec = (uint64_t)internal_timegm(&t);
      ts.tv_nsec = nsec;
      mtime = real_clock::from_ceph_timespec(ts);
    }
    JSONDecoder::decode_json("ETag", etag, obj);
    JSONDecoder::decode_json("Size", size, obj);
    JSONDecoder::decode_json("StorageClass", storage_class, obj);
    JSONDecoder::decode_json("Owner", owner, obj);
    JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
    JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
  }
};

struct bucket_list_result {
  string name;
  string prefix;
  string key_marker;
  string version_id_marker;
  int max_keys;
  bool is_truncated;
  list<bucket_list_entry> entries;

  bucket_list_result() : max_keys(0), is_truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("Name", name, obj);
    JSONDecoder::decode_json("Prefix", prefix, obj);
    JSONDecoder::decode_json("KeyMarker", key_marker, obj);
    JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
    JSONDecoder::decode_json("MaxKeys", max_keys, obj);
    JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
    JSONDecoder::decode_json("Entries", entries, obj);
  }
};
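// Listing a remote bucket shard goes through the regular list-versions
// endpoint, extended with rgwx- parameters so the source zone returns the
// raw per-version metadata (mtime, versioned epoch, tag) that sync needs.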
class RGWListBucketShardCR: public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const rgw_bucket_shard& bs;
  const string instance_key;
  rgw_obj_key marker_position;

  bucket_list_result *result;

public:
  RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                       rgw_obj_key& _marker_position, bucket_list_result *_result)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      instance_key(bs.get_key()), marker_position(_marker_position),
      result(_result) {}

  int operate() override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
                                        { "versions" , NULL },
                                        { "format" , "json" },
                                        { "objs-container" , "true" },
                                        { "key-marker" , marker_position.name.c_str() },
                                        { "version-id-marker" , marker_position.instance.c_str() },
                                        { NULL, NULL } };
        // don't include tenant in the url, it's already part of instance_key
        string p = string("/") + bs.bucket.name;
        call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
2043 class RGWListBucketIndexLogCR: public RGWCoroutine {
2044 RGWDataSyncEnv *sync_env;
2045 const string instance_key;
2048 list<rgw_bi_log_entry> *result;
2051 RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2052 string& _marker, list<rgw_bi_log_entry> *_result)
2053 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2054 instance_key(bs.get_key()), marker(_marker), result(_result) {}
2056 int operate() override {
2059 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2060 { "format" , "json" },
2061 { "marker" , marker.c_str() },
2062 { "type", "bucket-index" },
2065 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2068 return set_cr_error(retcode);
2070 return set_cr_done();
2076 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
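/*
 * Marker update window: instead of persisting the sync position after every
 * synced entry, RGWSyncShardMarkerTrack batches completions and only stores
 * an updated marker once the oldest in-flight entry finishes and roughly
 * BUCKET_SYNC_UPDATE_MARKER_WINDOW entries have completed since the last
 * flush. A rough sketch of the calling pattern (marker values hypothetical):
 *
 *   marker_tracker.start("0005", pos, time);   // entry becomes in-flight
 *   marker_tracker.start("0006", pos, time);
 *   // finishing "0006" first cannot advance the marker; "0005" is pending
 *   yield call(marker_tracker.finish("0006"));
 *   yield call(marker_tracker.finish("0005")); // high marker -> "0006";
 *                                              // store_marker() fires per window
 */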
2078 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2079 RGWDataSyncEnv *sync_env;
2082 rgw_bucket_shard_full_sync_marker sync_marker;
2085 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2086 const string& _marker_oid,
2087 const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2088 sync_env(_sync_env),
2089 marker_oid(_marker_oid),
2090 sync_marker(_marker) {}
2092 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2093 sync_marker.position = new_marker;
2094 sync_marker.count = index_pos;
2096 map<string, bufferlist> attrs;
2097 sync_marker.encode_attr(attrs);
2099 RGWRados *store = sync_env->store;
2101 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2102 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2103 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2108 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2109 RGWDataSyncEnv *sync_env;
2112 rgw_bucket_shard_inc_sync_marker sync_marker;
2114 map<rgw_obj_key, string> key_to_marker;
2115 map<string, rgw_obj_key> marker_to_key;
2117 void handle_finish(const string& marker) override {
2118 map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
2119 if (iter == marker_to_key.end()) {
2122 key_to_marker.erase(iter->second);
2123 reset_need_retry(iter->second);
2124 marker_to_key.erase(iter);
2128 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2129 const string& _marker_oid,
2130 const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2131 sync_env(_sync_env),
2132 marker_oid(_marker_oid),
2133 sync_marker(_marker) {}
2135 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2136 sync_marker.position = new_marker;
2138 map<string, bufferlist> attrs;
2139 sync_marker.encode_attr(attrs);
2141 RGWRados *store = sync_env->store;
2143 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2144 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
2146 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
/*
 * create an index from key -> <op, marker>, and from marker -> key
 * this is useful so that we can ensure that we only have one
 * entry for any key that is used. This is needed when doing
 * incremental sync of data, and we don't want to run multiple
 * concurrent sync operations for the same bucket shard.
 * Also, make sure that we don't run concurrent operations on the same key
 * with different ops.
 */
2159 bool index_key_to_marker(const rgw_obj_key& key, const string& marker) {
2160 if (key_to_marker.find(key) != key_to_marker.end()) {
2161 set_need_retry(key);
2164 key_to_marker[key] = marker;
2165 marker_to_key[marker] = key;
2169 bool can_do_op(const rgw_obj_key& key) {
2170 return (key_to_marker.find(key) == key_to_marker.end());
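/*
 * Usage note: callers poll can_do_op(key) before spawning a sync coroutine
 * for that key (waiting on a child otherwise), then register the entry via
 * index_key_to_marker(); handle_finish() above tears the mapping down once
 * the marker completes. See RGWBucketShardIncrementalSyncCR::operate()
 * below for the actual loop.
 */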
2174 template <class T, class K>
2175 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2176 RGWDataSyncEnv *sync_env;
2178 RGWBucketInfo *bucket_info;
2179 const rgw_bucket_shard& bs;
2183 uint64_t versioned_epoch;
2184 rgw_bucket_entry_owner owner;
2185 real_time timestamp;
2187 RGWPendingState op_state;
2190 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2194 stringstream error_ss;
2196 RGWDataSyncDebugLogger logger;
2198 bool error_injection;
2200 RGWDataSyncModule *data_sync_module;
2202 rgw_zone_set zones_trace;
2205 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2206 RGWBucketInfo *_bucket_info,
2207 const rgw_bucket_shard& bs,
2208 const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
2209 real_time& _timestamp,
2210 const rgw_bucket_entry_owner& _owner,
2211 RGWModifyOp _op, RGWPendingState _op_state,
2212 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
2213 sync_env(_sync_env),
2214 bucket_info(_bucket_info), bs(bs),
2215 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2217 timestamp(_timestamp), op(_op),
2218 op_state(_op_state),
2219 entry_marker(_entry_marker),
2220 marker_tracker(_marker_tracker),
2223 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
2224 set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
2225 ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
2228 logger.init(sync_env, "Object", ss.str());
2230 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2232 data_sync_module = sync_env->sync_module->get_data_handler();
2234 zones_trace = _zones_trace;
2235 zones_trace.insert(sync_env->store->get_zone().id);
2238 int operate() override {
2240 /* skip entries that are not complete */
2241 if (op_state != CLS_RGW_STATE_COMPLETE) {
2246 marker_tracker->reset_need_retry(key);
2247 if (key.name.empty()) {
2248 /* shouldn't happen */
2249 set_status("skipping empty entry");
2250 ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
2253 if (error_injection &&
2254 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
2255 ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
2257 } else if (op == CLS_RGW_OP_ADD ||
2258 op == CLS_RGW_OP_LINK_OLH) {
2259 if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
2260 set_status("skipping entry");
2261 ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
2265 set_status("syncing obj");
2266 ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2267 logger.log("fetch");
2268 call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
2269 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2270 set_status("removing obj");
2271 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2274 logger.log("remove");
2275 call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch, &zones_trace));
2276 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
2277 logger.log("creating delete marker");
2278 set_status("creating delete marker");
2279 ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2280 call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch, &zones_trace));
2283 } while (marker_tracker->need_retry(key));
2289 ss << "done, retcode=" << retcode;
2291 logger.log(ss.str());
2294 if (retcode < 0 && retcode != -ENOENT) {
2295 set_status() << "failed to sync obj; retcode=" << retcode;
2296 ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
2297 << bucket_shard_str{bs} << "/" << key.name << dendl;
2298 error_ss << bucket_shard_str{bs} << "/" << key.name;
2299 sync_status = retcode;
2301 if (!error_ss.str().empty()) {
2302 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object"));
2305 if (sync_status == 0) {
2307 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2308 yield call(marker_tracker->finish(entry_marker));
2309 sync_status = retcode;
2311 if (sync_status < 0) {
2312 return set_cr_error(sync_status);
2314 return set_cr_done();
2320 #define BUCKET_SYNC_SPAWN_WINDOW 20
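/*
 * Spawn window: the full and incremental sync coroutines below fan out one
 * RGWBucketSyncSingleEntryCR per entry, but cap the in-flight children at
 * BUCKET_SYNC_SPAWN_WINDOW; past the cap they wait_for_child() and collect()
 * results before spawning more, bounding the concurrent per-object
 * operations for a single bucket shard.
 */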
2322 class RGWBucketShardFullSyncCR : public RGWCoroutine {
2323 RGWDataSyncEnv *sync_env;
2324 const rgw_bucket_shard& bs;
2325 RGWBucketInfo *bucket_info;
2326 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2327 bucket_list_result list_result;
2328 list<bucket_list_entry>::iterator entries_iter;
2329 rgw_bucket_shard_full_sync_marker& full_marker;
2330 RGWBucketFullSyncShardMarkerTrack marker_tracker;
2331 rgw_obj_key list_marker;
2332 bucket_list_entry *entry{nullptr};
2333 RGWModifyOp op{CLS_RGW_OP_ADD};
2335 int total_entries{0};
2339 const string& status_oid;
2341 RGWDataSyncDebugLogger logger;
2342 rgw_zone_set zones_trace;
2344 RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2345 RGWBucketInfo *_bucket_info,
2346 const std::string& status_oid,
2347 RGWContinuousLeaseCR *lease_cr,
2348 rgw_bucket_shard_full_sync_marker& _full_marker)
2349 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2350 bucket_info(_bucket_info), lease_cr(lease_cr), full_marker(_full_marker),
2351 marker_tracker(sync_env, status_oid, full_marker),
2352 status_oid(status_oid) {
2353 logger.init(sync_env, "BucketFull", bs.get_key());
2354 zones_trace.insert(sync_env->source_zone);
2357 int operate() override;
2360 int RGWBucketShardFullSyncCR::operate()
2364 list_marker = full_marker.position;
2366 total_entries = full_marker.count;
2368 if (!lease_cr->is_locked()) {
2370 return set_cr_error(-ECANCELED);
2372 set_status("listing remote bucket");
2373 ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
2374 yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2376 if (retcode < 0 && retcode != -ENOENT) {
2377 set_status("failed bucket listing, going down");
2379 return set_cr_error(retcode);
2381 entries_iter = list_result.entries.begin();
2382 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2383 if (!lease_cr->is_locked()) {
2385 return set_cr_error(-ECANCELED);
2387 ldout(sync_env->cct, 20) << "[full sync] syncing object: "
2388 << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
2389 entry = &(*entries_iter);
2391 list_marker = entries_iter->key;
2392 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
2393 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
2395 op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
2396 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2397 yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2398 false, /* versioned, only matters for object removal */
2399 entry->versioned_epoch, entry->mtime,
2400 entry->owner, op, CLS_RGW_STATE_COMPLETE,
2401 entry->key, &marker_tracker, zones_trace),
2404 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2405 yield wait_for_child();
2408 again = collect(&ret, nullptr);
2410 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2412 /* we have reported this error */
2417 } while (list_result.is_truncated && sync_status == 0);
2418 set_status("done iterating over all objects");
2419 /* wait for all operations to complete */
2420 while (num_spawned()) {
2421 yield wait_for_child();
2424 again = collect(&ret, nullptr);
2426 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2428 /* we have reported this error */
2432 if (!lease_cr->is_locked()) {
2433 return set_cr_error(-ECANCELED);
2435 /* update sync state to incremental */
2436 if (sync_status == 0) {
2438 rgw_bucket_shard_sync_info sync_status;
2439 sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2440 map<string, bufferlist> attrs;
2441 sync_status.encode_state_attr(attrs);
2442 RGWRados *store = sync_env->store;
2443 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2444 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2450 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
2451 ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
2452 << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
2453 return set_cr_error(retcode);
2455 if (sync_status < 0) {
2456 return set_cr_error(sync_status);
2458 return set_cr_done();
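/*
 * Incremental sync replays the bucket index log (bilog). operate() below
 * first walks each listed batch to build squash_map, which keeps only the
 * newest (timestamp, op) pair per (object, instance); any older entry for
 * the same object is later skipped as "squashed", so only an object's final
 * state within the batch gets applied.
 */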
2463 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2464 RGWDataSyncEnv *sync_env;
2465 const rgw_bucket_shard& bs;
2466 RGWBucketInfo *bucket_info;
2467 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2468 list<rgw_bi_log_entry> list_result;
2469 list<rgw_bi_log_entry>::iterator entries_iter;
2470 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
2471 rgw_bucket_shard_inc_sync_marker& inc_marker;
2473 rgw_bi_log_entry *entry{nullptr};
2474 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2475 bool updated_status{false};
2476 const string& status_oid;
2477 const string& zone_id;
2478 ceph::real_time sync_modify_time;
2482 RGWDataSyncDebugLogger logger;
2485 bool syncstopped{false};
2488 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2489 const rgw_bucket_shard& bs,
2490 RGWBucketInfo *_bucket_info,
2491 const std::string& status_oid,
2492 RGWContinuousLeaseCR *lease_cr,
2493 rgw_bucket_shard_inc_sync_marker& _inc_marker)
2494 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2495 bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker),
marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid), zone_id(_sync_env->store->get_zone().id) {
2497 set_description() << "bucket shard incremental sync bucket="
2498 << bucket_shard_str{bs};
2500 logger.init(sync_env, "BucketInc", bs.get_key());
2503 int operate() override;
2506 int RGWBucketShardIncrementalSyncCR::operate()
2511 if (!lease_cr->is_locked()) {
2513 return set_cr_error(-ECANCELED);
ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync; position=" << inc_marker.position << dendl;
2516 set_status() << "listing bilog; position=" << inc_marker.position;
2517 yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position,
if (retcode < 0 && retcode != -ENOENT) {
2522 /* wait for all operations to complete */
2523 return set_cr_error(retcode);
2525 /* no need to retry */
2530 for (auto& e : list_result) {
2531 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP && (sync_modify_time < e.timestamp)) {
2532 ldout(sync_env->cct, 20) << " syncstop on " << e.timestamp << dendl;
2533 sync_modify_time = e.timestamp;
2537 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC && (sync_modify_time < e.timestamp)) {
2538 ldout(sync_env->cct, 20) << " resync on " << e.timestamp << dendl;
2539 sync_modify_time = e.timestamp;
2540 syncstopped = false;
2543 if (e.state != CLS_RGW_STATE_COMPLETE) {
2546 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
2549 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
2550 if (squash_entry.first <= e.timestamp) {
2551 squash_entry = make_pair<>(e.timestamp, e.op);
2555 entries_iter = list_result.begin();
2556 for (; entries_iter != list_result.end(); ++entries_iter) {
2557 if (!lease_cr->is_locked()) {
2559 return set_cr_error(-ECANCELED);
2561 entry = &(*entries_iter);
2563 ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2567 cur_id = entry->id.substr(p + 1);
2570 inc_marker.position = cur_id;
2572 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
2574 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2578 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
2579 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
2580 ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
2581 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2585 ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
2587 if (!key.ns.empty()) {
2588 set_status() << "skipping entry in namespace: " << entry->object;
2589 ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
2590 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2594 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
2595 if (entry->op == CLS_RGW_OP_CANCEL) {
2596 set_status() << "canceled operation, skipping";
2597 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2598 << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
2599 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2602 if (entry->state != CLS_RGW_STATE_COMPLETE) {
2603 set_status() << "non-complete operation, skipping";
2604 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2605 << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
2606 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2609 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
2610 set_status() << "redundant operation, skipping";
2611 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
<< bucket_shard_str{bs} << "/" << key << ": redundant operation" << dendl;
2613 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2616 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
2617 set_status() << "squashed operation, skipping";
2618 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2619 << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
2620 /* not updating high marker though */
2623 ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
2624 << bucket_shard_str{bs} << "/" << key << dendl;
2625 updated_status = false;
2626 while (!marker_tracker.can_do_op(key)) {
2627 if (!updated_status) {
2628 set_status() << "can't do op, conflicting inflight operation";
2629 updated_status = true;
2631 ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
2632 yield wait_for_child();
2635 again = collect(&ret, nullptr);
2637 ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
2639 /* we have reported this error */
2643 if (!marker_tracker.index_key_to_marker(key, cur_id)) {
2644 set_status() << "can't do op, sync already in progress for object";
2645 ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
2646 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2650 set_status() << "start object sync";
2651 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
2652 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
2654 uint64_t versioned_epoch = 0;
2655 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
2656 if (entry->ver.pool < 0) {
2657 versioned_epoch = entry->ver.epoch;
2659 ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
2660 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
2661 spawn(new SyncCR(sync_env, bucket_info, bs, key,
2662 entry->is_versioned(), versioned_epoch,
2663 entry->timestamp, owner, entry->op, entry->state,
2664 cur_id, &marker_tracker, entry->zones_trace),
2668 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2669 set_status() << "num_spawned() > spawn_window";
2670 yield wait_for_child();
2673 again = collect(&ret, nullptr);
2675 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2677 /* we have reported this error */
2679 /* not waiting for child here */
2683 } while (!list_result.empty() && sync_status == 0);
2689 const string& oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
2690 RGWRados *store = sync_env->store;
2691 call(new RGWRadosRemoveCR(store, rgw_raw_obj{store->get_zone_params().log_pool, oid}));
2694 return set_cr_done();
2697 while (num_spawned()) {
2698 yield wait_for_child();
2701 again = collect(&ret, nullptr);
2703 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2705 /* we have reported this error */
2707 /* not waiting for child here */
2711 yield call(marker_tracker.flush());
2713 ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
2714 return set_cr_error(retcode);
2716 if (sync_status < 0) {
ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2720 /* wait for all operations to complete */
2723 if (sync_status < 0) {
2724 return set_cr_error(sync_status);
2727 return set_cr_done();
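/*
 * RGWRunBucketSyncCoroutine drives one bucket shard through the sync state
 * machine while holding a continuous lease on the shard's status object:
 *
 *   StateInit -> StateFullSync -> StateIncrementalSync
 *
 * Full sync lists the remote bucket and copies every object; on success the
 * persisted state flips to incremental, and later runs go straight to bilog
 * replay.
 */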
2732 int RGWRunBucketSyncCoroutine::operate()
2736 set_status("acquiring sync lock");
2737 auto store = sync_env->store;
2738 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
2739 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2741 cct->_conf->rgw_sync_lease_period,
2743 lease_stack.reset(spawn(lease_cr.get(), false));
2745 while (!lease_cr->is_locked()) {
2746 if (lease_cr->is_done()) {
2747 ldout(cct, 5) << "lease cr failed, done early" << dendl;
2748 set_status("lease lock failed, early abort");
2749 return set_cr_error(lease_cr->get_ret_status());
2755 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
2756 if (retcode < 0 && retcode != -ENOENT) {
2757 ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
2758 << bucket_shard_str{bs} << dendl;
2759 lease_cr->go_down();
2761 return set_cr_error(retcode);
2764 ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
2765 << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
2767 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2768 if (retcode == -ENOENT) {
2769 /* bucket instance info has not been synced in yet, fetch it now */
2771 ldout(sync_env->cct, 10) << "no local info for bucket "
2772 << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
2773 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
2775 meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
2777 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
2778 string() /* no marker */,
2779 MDLOG_STATUS_COMPLETE,
2780 NULL /* no marker tracker */));
2783 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
2784 lease_cr->go_down();
2786 return set_cr_error(retcode);
2789 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2792 ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
2793 lease_cr->go_down();
2795 return set_cr_error(retcode);
2798 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
2799 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
2801 ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
2802 << " failed, retcode=" << retcode << dendl;
2803 lease_cr->go_down();
2805 return set_cr_error(retcode);
2809 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
2810 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
2811 status_oid, lease_cr.get(),
2812 sync_status.full_marker));
2814 ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
2815 << " failed, retcode=" << retcode << dendl;
2816 lease_cr->go_down();
2818 return set_cr_error(retcode);
2820 sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2823 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
2824 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
2825 status_oid, lease_cr.get(),
2826 sync_status.inc_marker));
2828 ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
2829 << " failed, retcode=" << retcode << dendl;
2830 lease_cr->go_down();
2832 return set_cr_error(retcode);
2836 lease_cr->go_down();
2838 return set_cr_done();
2844 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
2846 return new RGWRunBucketSyncCoroutine(&sync_env, bs);
2849 int RGWBucketSyncStatusManager::init()
2851 conn = store->get_zone_conn_by_id(source_zone);
2853 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
2857 int ret = http_manager.set_threaded();
2859 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
2864 const string key = bucket.get_key();
2866 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
2869 string path = string("/admin/metadata/bucket.instance");
2871 bucket_instance_meta_info result;
2872 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
2874 ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
2878 RGWBucketInfo& bi = result.data.get_bucket_info();
2879 num_shards = bi.num_shards;
2881 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
2883 sync_module.reset(new RGWDefaultSyncModuleInstance());
2885 int effective_num_shards = (num_shards ? num_shards : 1);
2887 auto async_rados = store->get_async_rados();
2889 for (int i = 0; i < effective_num_shards; i++) {
2890 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
2891 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
2893 ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
2902 int RGWBucketSyncStatusManager::init_sync_status()
2904 list<RGWCoroutinesStack *> stacks;
2906 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2907 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2908 RGWRemoteBucketLog *l = iter->second;
2909 stack->call(l->init_sync_status_cr());
2911 stacks.push_back(stack);
2914 return cr_mgr.run(stacks);
2917 int RGWBucketSyncStatusManager::read_sync_status()
2919 list<RGWCoroutinesStack *> stacks;
2921 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2922 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2923 RGWRemoteBucketLog *l = iter->second;
2924 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
2926 stacks.push_back(stack);
2929 int ret = cr_mgr.run(stacks);
2931 ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
2932 << bucket_str{bucket} << dendl;
2939 int RGWBucketSyncStatusManager::run()
2941 list<RGWCoroutinesStack *> stacks;
2943 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2944 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2945 RGWRemoteBucketLog *l = iter->second;
2946 stack->call(l->run_sync_cr());
2948 stacks.push_back(stack);
2951 int ret = cr_mgr.run(stacks);
ldout(store->ctx(), 0) << "ERROR: failed to run sync for "
    << bucket_str{bucket} << dendl;
2961 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
2962 const rgw_bucket_shard& bs)
2964 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
2968 // TODO: move into rgw_data_sync_trim.cc
#undef dout_prefix
#define dout_prefix (*_dout << "data trim: ")
/// return the marker up to which it is safe to trim
2975 const std::string& get_stable_marker(const rgw_data_sync_marker& m)
2977 return m.state == m.FullSync ? m.next_step_marker : m.marker;
2980 /// comparison operator for take_min_markers()
2981 bool operator<(const rgw_data_sync_marker& lhs,
2982 const rgw_data_sync_marker& rhs)
2984 // sort by stable marker
2985 return get_stable_marker(lhs) < get_stable_marker(rhs);
2988 /// populate the container starting with 'dest' with the minimum stable marker
2989 /// of each shard for all of the peers in [first, last)
2990 template <typename IterIn, typename IterOut>
2991 void take_min_markers(IterIn first, IterIn last, IterOut dest)
2993 if (first == last) {
2996 // initialize markers with the first peer's
2998 for (auto &shard : first->sync_markers) {
2999 *m = std::move(shard.second);
3002 // for remaining peers, replace with smaller markers
3003 for (auto p = first + 1; p != last; ++p) {
3005 for (auto &shard : p->sync_markers) {
3006 if (shard.second < *m) {
3007 *m = std::move(shard.second);
3014 } // anonymous namespace
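/*
 * Trim rationale: a datalog entry may only be discarded once every peer zone
 * has consumed it, so the safe trim position per shard is the minimum stable
 * marker across peers. E.g. (hypothetical markers): if peer A's shard 0 is
 * at "1_00000010" and peer B's shard 0 is still at "1_00000007", shard 0 can
 * only be trimmed up to "1_00000007".
 */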
3016 class DataLogTrimCR : public RGWCoroutine {
3018 RGWHTTPManager *http;
3019 const int num_shards;
const std::string& zone_id; ///< my zone id
std::vector<rgw_data_sync_status> peer_status; ///< sync status for each peer
std::vector<rgw_data_sync_marker> min_shard_markers; ///< min marker per shard
std::vector<std::string>& last_trim; ///< last trimmed marker per shard
3027 DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
3028 int num_shards, std::vector<std::string>& last_trim)
3029 : RGWCoroutine(store->ctx()), store(store), http(http),
3030 num_shards(num_shards),
3031 zone_id(store->get_zone().id),
3032 peer_status(store->zone_conn_map.size()),
3033 min_shard_markers(num_shards),
3034 last_trim(last_trim)
3037 int operate() override;
3040 int DataLogTrimCR::operate()
3043 ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
3044 set_status("fetching sync status");
3046 // query data sync status from each sync peer
3047 rgw_http_param_pair params[] = {
3049 { "status", nullptr },
3050 { "source-zone", zone_id.c_str() },
3051 { nullptr, nullptr }
3054 auto p = peer_status.begin();
3055 for (auto& c : store->zone_conn_map) {
3056 ldout(cct, 20) << "query sync status from " << c.first << dendl;
3057 using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
3058 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
3064 // must get a successful reply from all peers to consider trimming
3066 while (ret == 0 && num_spawned() > 0) {
3067 yield wait_for_child();
3073 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3074 return set_cr_error(ret);
3077 ldout(cct, 10) << "trimming log shards" << dendl;
3078 set_status("trimming log shards");
3080 // determine the minimum marker for each shard
3081 take_min_markers(peer_status.begin(), peer_status.end(),
3082 min_shard_markers.begin());
3084 for (int i = 0; i < num_shards; i++) {
3085 const auto& m = min_shard_markers[i];
3086 auto& stable = get_stable_marker(m);
3087 if (stable <= last_trim[i]) {
3090 ldout(cct, 10) << "trimming log shard " << i
3091 << " at marker=" << stable
3092 << " last_trim=" << last_trim[i] << dendl;
3093 using TrimCR = RGWSyncLogTrimCR;
3094 spawn(new TrimCR(store, store->data_log->get_oid(i),
3095 stable, &last_trim[i]),
3099 return set_cr_done();
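/*
 * DataLogTrimPollCR wakes every 'interval' and races other gateways for a
 * lock on the first data log shard's object; the winner runs DataLogTrimCR.
 * The lock is deliberately not released (see the comment in operate() below),
 * so the same gateway tends to keep the trim duty until the lock times out.
 */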
3104 class DataLogTrimPollCR : public RGWCoroutine {
3106 RGWHTTPManager *http;
3107 const int num_shards;
const utime_t interval; ///< polling interval
const std::string lock_oid; ///< use first data log shard for lock
const std::string lock_cookie;
std::vector<std::string> last_trim; ///< last trimmed marker per shard
3114 DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3115 int num_shards, utime_t interval)
3116 : RGWCoroutine(store->ctx()), store(store), http(http),
3117 num_shards(num_shards), interval(interval),
3118 lock_oid(store->data_log->get_oid(0)),
3119 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3120 last_trim(num_shards)
3123 int operate() override;
3126 int DataLogTrimPollCR::operate()
3130 set_status("sleeping");
3133 // request a 'data_trim' lock that covers the entire wait interval to
3134 // prevent other gateways from attempting to trim for the duration
3135 set_status("acquiring trim lock");
3136 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
3137 rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
3138 "data_trim", lock_cookie,
3141 // if the lock is already held, go back to sleep and try again later
3142 ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3143 << interval.sec() << "s" << dendl;
3147 set_status("trimming");
3148 yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3150 // note that the lock is not released. this is intentional, as it avoids
3151 // duplicating this work in other gateways
3157 RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3158 RGWHTTPManager *http,
3159 int num_shards, utime_t interval)
3161 return new DataLogTrimPollCR(store, http, num_shards, interval);