X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_data_sync.cc;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_data_sync.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=daaffb7cde9eb7b2c10b9eeb9490c7e9c581b588;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_data_sync.cc b/src/ceph/src/rgw/rgw_data_sync.cc deleted file mode 100644 index daaffb7..0000000 --- a/src/ceph/src/rgw/rgw_data_sync.cc +++ /dev/null @@ -1,3162 +0,0 @@ -#include - -#include "common/ceph_json.h" -#include "common/RWLock.h" -#include "common/RefCountedObj.h" -#include "common/WorkQueue.h" -#include "common/Throttle.h" -#include "common/errno.h" - -#include "rgw_common.h" -#include "rgw_rados.h" -#include "rgw_sync.h" -#include "rgw_data_sync.h" -#include "rgw_rest_conn.h" -#include "rgw_cr_rados.h" -#include "rgw_cr_rest.h" -#include "rgw_http_client.h" -#include "rgw_bucket.h" -#include "rgw_metadata.h" -#include "rgw_sync_module.h" - -#include "cls/lock/cls_lock_client.h" - -#include "auth/Crypto.h" - -#include - -#define dout_subsys ceph_subsys_rgw - -#undef dout_prefix -#define dout_prefix (*_dout << "data sync: ") - -static string datalog_sync_status_oid_prefix = "datalog.sync-status"; -static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard"; -static string datalog_sync_full_sync_index_prefix = "data.full-sync.index"; -static string bucket_status_oid_prefix = "bucket.sync-status"; - -class RGWSyncDebugLogger { - CephContext *cct; - string prefix; - - bool ended; - -public: - RGWSyncDebugLogger(CephContext *_cct, const string& source_zone, - const string& sync_type, const string& sync_stage, - const string& resource, bool log_start = true) { - init(_cct, source_zone, sync_type, sync_stage, resource, log_start); - } - RGWSyncDebugLogger() : cct(NULL), ended(false) {} - ~RGWSyncDebugLogger(); - - void init(CephContext *_cct, const string& source_zone, - const string& sync_type, const string& sync_stage, - const string& resource, bool log_start = true); - void log(const string& state); - void finish(int status); -}; - -void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone, - const string& sync_type, const string& sync_section, - const string& resource, bool log_start) -{ - cct = _cct; - ended = false; - string zone_str = source_zone.substr(0, 8); - prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource; - if (log_start) { - log("start"); - } -} - -RGWSyncDebugLogger::~RGWSyncDebugLogger() -{ - if (!ended) { - log("finish"); - } -} - -void RGWSyncDebugLogger::log(const string& state) -{ - ldout(cct, 5) << prefix << ":" << state << dendl; -} - -void RGWSyncDebugLogger::finish(int status) -{ - ended = true; - ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl; -} - -class RGWDataSyncDebugLogger : public RGWSyncDebugLogger { -public: - RGWDataSyncDebugLogger() {} - RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section, - const string& resource, bool log_start = true) { - init(sync_env, sync_section, resource, log_start); - } - void init(RGWDataSyncEnv *sync_env, const string& sync_section, - const string& resource, bool log_start = true) { - RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start); - } - -}; - -void rgw_datalog_info::decode_json(JSONObj *obj) { - JSONDecoder::decode_json("num_objects", num_shards, obj); -} - -void rgw_datalog_entry::decode_json(JSONObj *obj) { - JSONDecoder::decode_json("key", key, obj); - utime_t ut; - JSONDecoder::decode_json("timestamp", ut, obj); - timestamp = ut.to_real_time(); -} - -void rgw_datalog_shard_data::decode_json(JSONObj *obj) { - JSONDecoder::decode_json("marker", marker, obj); - JSONDecoder::decode_json("truncated", truncated, obj); - JSONDecoder::decode_json("entries", entries, obj); -}; - -class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR { - static constexpr int MAX_CONCURRENT_SHARDS = 16; - - RGWDataSyncEnv *env; - const int num_shards; - int shard_id{0};; - - map& markers; - - public: - RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards, - map& markers) - : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), - env(env), num_shards(num_shards), markers(markers) - {} - bool spawn_next() override; -}; - -bool RGWReadDataSyncStatusMarkersCR::spawn_next() -{ - if (shard_id >= num_shards) { - return false; - } - using CR = RGWSimpleRadosReadCR; - spawn(new CR(env->async_rados, env->store, - rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)), - &markers[shard_id]), - false); - shard_id++; - return true; -} - -class RGWReadDataSyncStatusCoroutine : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - rgw_data_sync_status *sync_status; - -public: - RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, - rgw_data_sync_status *_status) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status) - {} - int operate() override; -}; - -int RGWReadDataSyncStatusCoroutine::operate() -{ - reenter(this) { - // read sync info - using ReadInfoCR = RGWSimpleRadosReadCR; - yield { - bool empty_on_enoent = false; // fail on ENOENT - call(new ReadInfoCR(sync_env->async_rados, sync_env->store, - rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)), - &sync_status->sync_info, empty_on_enoent)); - } - if (retcode < 0) { - ldout(sync_env->cct, 4) << "failed to read sync status info with " - << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - // read shard markers - using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR; - yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards, - sync_status->sync_markers)); - if (retcode < 0) { - ldout(sync_env->cct, 4) << "failed to read sync status markers with " - << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - return set_cr_done(); - } - return 0; -} - -class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - - RGWRESTReadResource *http_op; - - int shard_id; - RGWDataChangesLogInfo *shard_info; - -public: - RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env, - int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - http_op(NULL), - shard_id(_shard_id), - shard_info(_shard_info) { - } - - ~RGWReadRemoteDataLogShardInfoCR() override { - if (http_op) { - http_op->put(); - } - } - - int operate() override { - reenter(this) { - yield { - char buf[16]; - snprintf(buf, sizeof(buf), "%d", shard_id); - rgw_http_param_pair pairs[] = { { "type" , "data" }, - { "id", buf }, - { "info" , NULL }, - { NULL, NULL } }; - - string p = "/admin/log/"; - - http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager); - - http_op->set_user_info((void *)stack); - - int ret = http_op->aio_read(); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl; - log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl; - return set_cr_error(ret); - } - - return io_block(0); - } - yield { - int ret = http_op->wait(shard_info); - if (ret < 0) { - return set_cr_error(ret); - } - return set_cr_done(); - } - } - return 0; - } -}; - -struct read_remote_data_log_response { - string marker; - bool truncated; - list entries; - - read_remote_data_log_response() : truncated(false) {} - - void decode_json(JSONObj *obj) { - JSONDecoder::decode_json("marker", marker, obj); - JSONDecoder::decode_json("truncated", truncated, obj); - JSONDecoder::decode_json("entries", entries, obj); - }; -}; - -class RGWReadRemoteDataLogShardCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - - RGWRESTReadResource *http_op; - - int shard_id; - string *pmarker; - list *entries; - bool *truncated; - - read_remote_data_log_response response; - -public: - RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, - int _shard_id, string *_pmarker, list *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - http_op(NULL), - shard_id(_shard_id), - pmarker(_pmarker), - entries(_entries), - truncated(_truncated) { - } - ~RGWReadRemoteDataLogShardCR() override { - if (http_op) { - http_op->put(); - } - } - - int operate() override { - reenter(this) { - yield { - char buf[16]; - snprintf(buf, sizeof(buf), "%d", shard_id); - rgw_http_param_pair pairs[] = { { "type" , "data" }, - { "id", buf }, - { "marker", pmarker->c_str() }, - { "extra-info", "true" }, - { NULL, NULL } }; - - string p = "/admin/log/"; - - http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager); - - http_op->set_user_info((void *)stack); - - int ret = http_op->aio_read(); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl; - log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl; - return set_cr_error(ret); - } - - return io_block(0); - } - yield { - int ret = http_op->wait(&response); - if (ret < 0) { - return set_cr_error(ret); - } - entries->clear(); - entries->swap(response.entries); - *pmarker = response.marker; - *truncated = response.truncated; - return set_cr_done(); - } - } - return 0; - } -}; - -class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR { - RGWDataSyncEnv *sync_env; - - int num_shards; - map *datalog_info; - - int shard_id; -#define READ_DATALOG_MAX_CONCURRENT 10 - -public: - RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env, - int _num_shards, - map *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT), - sync_env(_sync_env), num_shards(_num_shards), - datalog_info(_datalog_info), shard_id(0) {} - bool spawn_next() override; -}; - -bool RGWReadRemoteDataLogInfoCR::spawn_next() { - if (shard_id >= num_shards) { - return false; - } - spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false); - shard_id++; - return true; -} - -class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine { - RGWDataSyncEnv *sync_env; - RGWRESTReadResource *http_op; - - int shard_id; - string marker; - uint32_t max_entries; - rgw_datalog_shard_data *result; - -public: - RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id, - const string& _marker, uint32_t _max_entries, - rgw_datalog_shard_data *_result) - : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL), - shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {} - - int send_request() override { - RGWRESTConn *conn = sync_env->conn; - RGWRados *store = sync_env->store; - - char buf[32]; - snprintf(buf, sizeof(buf), "%d", shard_id); - - char max_entries_buf[32]; - snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries); - - const char *marker_key = (marker.empty() ? "" : "marker"); - - rgw_http_param_pair pairs[] = { { "type", "data" }, - { "id", buf }, - { "max-entries", max_entries_buf }, - { marker_key, marker.c_str() }, - { NULL, NULL } }; - - string p = "/admin/log/"; - - http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager); - http_op->set_user_info((void *)stack); - - int ret = http_op->aio_read(); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl; - log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl; - http_op->put(); - return ret; - } - - return 0; - } - - int request_complete() override { - int ret = http_op->wait(result); - http_op->put(); - if (ret < 0 && ret != -ENOENT) { - ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl; - return ret; - } - return 0; - } -}; - -class RGWListRemoteDataLogCR : public RGWShardCollectCR { - RGWDataSyncEnv *sync_env; - - map shards; - int max_entries_per_shard; - map *result; - - map::iterator iter; -#define READ_DATALOG_MAX_CONCURRENT 10 - -public: - RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env, - map& _shards, - int _max_entries_per_shard, - map *_result) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT), - sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard), - result(_result) { - shards.swap(_shards); - iter = shards.begin(); - } - bool spawn_next() override; -}; - -bool RGWListRemoteDataLogCR::spawn_next() { - if (iter == shards.end()) { - return false; - } - - spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false); - ++iter; - return true; -} - -class RGWInitDataSyncStatusCoroutine : public RGWCoroutine { - static constexpr uint32_t lock_duration = 30; - RGWDataSyncEnv *sync_env; - RGWRados *store; - const rgw_pool& pool; - const uint32_t num_shards; - - string sync_status_oid; - - string lock_name; - string cookie; - rgw_data_sync_status *status; - map shards_info; -public: - RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards, - uint64_t instance_id, - rgw_data_sync_status *status) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store), - pool(store->get_zone_params().log_pool), - num_shards(num_shards), status(status) { - lock_name = "sync_lock"; - - status->sync_info.instance_id = instance_id; - -#define COOKIE_LEN 16 - char buf[COOKIE_LEN + 1]; - - gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); - cookie = buf; - - sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone); - } - - int operate() override { - int ret; - reenter(this) { - using LockCR = RGWSimpleRadosLockCR; - yield call(new LockCR(sync_env->async_rados, store, - rgw_raw_obj{pool, sync_status_oid}, - lock_name, cookie, lock_duration)); - if (retcode < 0) { - ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl; - return set_cr_error(retcode); - } - using WriteInfoCR = RGWSimpleRadosWriteCR; - yield call(new WriteInfoCR(sync_env->async_rados, store, - rgw_raw_obj{pool, sync_status_oid}, - status->sync_info)); - if (retcode < 0) { - ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl; - return set_cr_error(retcode); - } - - /* take lock again, we just recreated the object */ - yield call(new LockCR(sync_env->async_rados, store, - rgw_raw_obj{pool, sync_status_oid}, - lock_name, cookie, lock_duration)); - if (retcode < 0) { - ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl; - return set_cr_error(retcode); - } - - /* fetch current position in logs */ - yield { - RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone); - if (!conn) { - ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl; - return set_cr_error(-EIO); - } - for (uint32_t i = 0; i < num_shards; i++) { - spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true); - } - } - while (collect(&ret, NULL)) { - if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl; - return set_state(RGWCoroutine_Error); - } - yield; - } - yield { - for (uint32_t i = 0; i < num_shards; i++) { - RGWDataChangesLogInfo& info = shards_info[i]; - auto& marker = status->sync_markers[i]; - marker.next_step_marker = info.marker; - marker.timestamp = info.last_update; - const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i); - using WriteMarkerCR = RGWSimpleRadosWriteCR; - spawn(new WriteMarkerCR(sync_env->async_rados, store, - rgw_raw_obj{pool, oid}, marker), true); - } - } - while (collect(&ret, NULL)) { - if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl; - return set_state(RGWCoroutine_Error); - } - yield; - } - - status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps; - yield call(new WriteInfoCR(sync_env->async_rados, store, - rgw_raw_obj{pool, sync_status_oid}, - status->sync_info)); - if (retcode < 0) { - ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl; - return set_cr_error(retcode); - } - yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, - rgw_raw_obj{pool, sync_status_oid}, - lock_name, cookie)); - return set_cr_done(); - } - return 0; - } -}; - -int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info) -{ - rgw_http_param_pair pairs[] = { { "type", "data" }, - { NULL, NULL } }; - - int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl; - return ret; - } - - ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl; - - return 0; -} - -int RGWRemoteDataLog::read_source_log_shards_info(map *shards_info) -{ - rgw_datalog_info log_info; - int ret = read_log_info(&log_info); - if (ret < 0) { - return ret; - } - - return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info)); -} - -int RGWRemoteDataLog::read_source_log_shards_next(map shard_markers, map *result) -{ - if (store->is_meta_master()) { - return 0; - } - - return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result)); -} - -int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module) -{ - sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module); - - if (initialized) { - return 0; - } - - int ret = http_manager.set_threaded(); - if (ret < 0) { - ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl; - return ret; - } - - initialized = true; - - return 0; -} - -void RGWRemoteDataLog::finish() -{ - stop(); -} - -int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status) -{ - // cannot run concurrently with run_sync(), so run in a separate manager - RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry()); - RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr()); - int ret = http_manager.set_threaded(); - if (ret < 0) { - ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl; - return ret; - } - RGWDataSyncEnv sync_env_local = sync_env; - sync_env_local.http_manager = &http_manager; - ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status)); - http_manager.stop(); - return ret; -} - -int RGWRemoteDataLog::init_sync_status(int num_shards) -{ - rgw_data_sync_status sync_status; - RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry()); - RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr()); - int ret = http_manager.set_threaded(); - if (ret < 0) { - ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl; - return ret; - } - RGWDataSyncEnv sync_env_local = sync_env; - sync_env_local.http_manager = &http_manager; - uint64_t instance_id; - get_random_bytes((char *)&instance_id, sizeof(instance_id)); - ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status)); - http_manager.stop(); - return ret; -} - -static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id) -{ - char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16]; - snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id); - return string(buf); -} - -struct bucket_instance_meta_info { - string key; - obj_version ver; - utime_t mtime; - RGWBucketInstanceMetadataObject data; - - bucket_instance_meta_info() {} - - void decode_json(JSONObj *obj) { - JSONDecoder::decode_json("key", key, obj); - JSONDecoder::decode_json("ver", ver, obj); - JSONDecoder::decode_json("mtime", mtime, obj); - JSONDecoder::decode_json("data", data, obj); - } -}; - -class RGWListBucketIndexesCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - - RGWRados *store; - - rgw_data_sync_status *sync_status; - int num_shards; - - int req_ret; - int ret; - - list result; - list::iterator iter; - - RGWShardedOmapCRManager *entries_index; - - string oid_prefix; - - string path; - bucket_instance_meta_info meta_info; - string key; - string s; - int i; - - bool failed; - -public: - RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env, - rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - store(sync_env->store), sync_status(_sync_status), - req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) { - oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone; - path = "/admin/metadata/bucket.instance"; - num_shards = sync_status->sync_info.num_shards; - } - ~RGWListBucketIndexesCR() override { - delete entries_index; - } - - int operate() override { - reenter(this) { - yield { - string entrypoint = string("/admin/metadata/bucket.instance"); - /* FIXME: need a better scaling solution here, requires streaming output */ - call(new RGWReadRESTResourceCR >(store->ctx(), sync_env->conn, sync_env->http_manager, - entrypoint, NULL, &result)); - } - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl; - return set_cr_error(retcode); - } - entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards, - store->get_zone_params().log_pool, - oid_prefix); - yield; // yield so OmapAppendCRs can start - for (iter = result.begin(); iter != result.end(); ++iter) { - ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl; - - key = *iter; - - yield { - rgw_http_param_pair pairs[] = { { "key", key.c_str() }, - { NULL, NULL } }; - - call(new RGWReadRESTResourceCR(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info)); - } - - num_shards = meta_info.data.get_bucket_info().num_shards; - if (num_shards > 0) { - for (i = 0; i < num_shards; i++) { - char buf[16]; - snprintf(buf, sizeof(buf), ":%d", i); - s = key + buf; - yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i)); - } - } else { - yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1)); - } - } - yield { - if (!entries_index->finish()) { - failed = true; - } - } - if (!failed) { - for (map::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) { - int shard_id = (int)iter->first; - rgw_data_sync_marker& marker = iter->second; - marker.total_entries = entries_index->get_total_entries(shard_id); - spawn(new RGWSimpleRadosWriteCR(sync_env->async_rados, store, - rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)), - marker), true); - } - } else { - yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "", - EIO, string("failed to build bucket instances map"))); - } - while (collect(&ret, NULL)) { - if (ret < 0) { - yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "", - -ret, string("failed to store sync status: ") + cpp_strerror(-ret))); - req_ret = ret; - } - yield; - } - drain_all(); - if (req_ret < 0) { - yield return set_cr_error(req_ret); - } - yield return set_cr_done(); - } - return 0; - } -}; - -#define DATA_SYNC_UPDATE_MARKER_WINDOW 1 - -class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { - RGWDataSyncEnv *sync_env; - - string marker_oid; - rgw_data_sync_marker sync_marker; - - map key_to_marker; - map marker_to_key; - - void handle_finish(const string& marker) override { - map::iterator iter = marker_to_key.find(marker); - if (iter == marker_to_key.end()) { - return; - } - key_to_marker.erase(iter->second); - reset_need_retry(iter->second); - marker_to_key.erase(iter); - } - -public: - RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env, - const string& _marker_oid, - const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW), - sync_env(_sync_env), - marker_oid(_marker_oid), - sync_marker(_marker) {} - - RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override { - sync_marker.marker = new_marker; - sync_marker.pos = index_pos; - - ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; - RGWRados *store = sync_env->store; - - return new RGWSimpleRadosWriteCR(sync_env->async_rados, store, - rgw_raw_obj(store->get_zone_params().log_pool, marker_oid), - sync_marker); - } - - /* - * create index from key -> marker, and from marker -> key - * this is useful so that we can insure that we only have one - * entry for any key that is used. This is needed when doing - * incremenatl sync of data, and we don't want to run multiple - * concurrent sync operations for the same bucket shard - */ - bool index_key_to_marker(const string& key, const string& marker) { - if (key_to_marker.find(key) != key_to_marker.end()) { - set_need_retry(key); - return false; - } - key_to_marker[key] = marker; - marker_to_key[marker] = key; - return true; - } -}; - -// ostream wrappers to print buckets without copying strings -struct bucket_str { - const rgw_bucket& b; - bucket_str(const rgw_bucket& b) : b(b) {} -}; -std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) { - auto& b = rhs.b; - if (!b.tenant.empty()) { - out << b.tenant << '/'; - } - out << b.name; - if (!b.bucket_id.empty()) { - out << ':' << b.bucket_id; - } - return out; -} - -struct bucket_shard_str { - const rgw_bucket_shard& bs; - bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {} -}; -std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) { - auto& bs = rhs.bs; - out << bucket_str{bs.bucket}; - if (bs.shard_id >= 0) { - out << ':' << bs.shard_id; - } - return out; -} - -class RGWRunBucketSyncCoroutine : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - rgw_bucket_shard bs; - RGWBucketInfo bucket_info; - rgw_bucket_shard_sync_info sync_status; - RGWMetaSyncEnv meta_sync_env; - - RGWDataSyncDebugLogger logger; - const std::string status_oid; - - boost::intrusive_ptr lease_cr; - boost::intrusive_ptr lease_stack; - -public: - RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), - status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) { - logger.init(sync_env, "Bucket", bs.get_key()); - } - ~RGWRunBucketSyncCoroutine() override { - if (lease_cr) { - lease_cr->abort(); - } - } - - int operate() override; -}; - -class RGWDataSyncSingleEntryCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - - string raw_key; - string entry_marker; - - rgw_bucket_shard bs; - - int sync_status; - - bufferlist md_bl; - - RGWDataSyncShardMarkerTrack *marker_tracker; - - boost::intrusive_ptr error_repo; - bool remove_from_repo; - - set keys; - -public: - RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env, - const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker, - RGWOmapAppend *_error_repo, bool _remove_from_repo) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - raw_key(_raw_key), entry_marker(_entry_marker), - sync_status(0), - marker_tracker(_marker_tracker), - error_repo(_error_repo), remove_from_repo(_remove_from_repo) { - set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker; - } - - int operate() override { - reenter(this) { - do { - yield { - int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key, - &bs.bucket, &bs.shard_id); - if (ret < 0) { - return set_cr_error(-EIO); - } - if (marker_tracker) { - marker_tracker->reset_need_retry(raw_key); - } - call(new RGWRunBucketSyncCoroutine(sync_env, bs)); - } - } while (marker_tracker && marker_tracker->need_retry(raw_key)); - - sync_status = retcode; - - if (sync_status == -ENOENT) { - // this was added when 'tenant/' was added to datalog entries, because - // preexisting tenant buckets could never sync and would stay in the - // error_repo forever - ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry " - "for missing bucket " << raw_key << dendl; - sync_status = 0; - } - - if (sync_status < 0) { - yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key, - -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status))); - if (retcode < 0) { - ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl; - } - if (error_repo && !error_repo->append(raw_key)) { - ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl; - } - } else if (error_repo && remove_from_repo) { - keys = {raw_key}; - yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys)); - if (retcode < 0) { - ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo (" - << error_repo->get_obj() << " retcode=" << retcode << dendl; - } - } - /* FIXME: what do do in case of error */ - if (marker_tracker && !entry_marker.empty()) { - /* update marker */ - yield call(marker_tracker->finish(entry_marker)); - } - if (sync_status == 0) { - sync_status = retcode; - } - if (sync_status < 0) { - return set_cr_error(sync_status); - } - return set_cr_done(); - } - return 0; - } -}; - -#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20 -#define DATA_SYNC_MAX_ERR_ENTRIES 10 - -enum RemoteDatalogStatus { - RemoteNotTrimmed = 0, - RemoteTrimmed = 1, - RemoteMightTrimmed = 2 -}; - -class RGWDataSyncShardCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - - rgw_pool pool; - - uint32_t shard_id; - rgw_data_sync_marker sync_marker; - - map entries; - map::iterator iter; - - string oid; - - RGWDataSyncShardMarkerTrack *marker_tracker; - - list log_entries; - list::iterator log_iter; - bool truncated; - - RGWDataChangesLogInfo shard_info; - string datalog_marker; - - RemoteDatalogStatus remote_trimmed; - Mutex inc_lock; - Cond inc_cond; - - boost::asio::coroutine incremental_cr; - boost::asio::coroutine full_cr; - - - set modified_shards; - set current_modified; - - set::iterator modified_iter; - - int total_entries; - - int spawn_window; - - bool *reset_backoff; - - set spawned_keys; - - boost::intrusive_ptr lease_cr; - boost::intrusive_ptr lease_stack; - string status_oid; - - - string error_oid; - RGWOmapAppend *error_repo; - map error_entries; - string error_marker; - int max_error_entries; - - ceph::real_time error_retry_time; - -#define RETRY_BACKOFF_SECS_MIN 60 -#define RETRY_BACKOFF_SECS_DEFAULT 60 -#define RETRY_BACKOFF_SECS_MAX 600 - uint32_t retry_backoff_secs; - - RGWDataSyncDebugLogger logger; -public: - RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env, - rgw_pool& _pool, - uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - pool(_pool), - shard_id(_shard_id), - sync_marker(_marker), - marker_tracker(NULL), truncated(false), remote_trimmed(RemoteNotTrimmed), inc_lock("RGWDataSyncShardCR::inc_lock"), - total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL), - lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES), - retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) { - set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id; - status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id); - error_oid = status_oid + ".retry"; - - logger.init(sync_env, "DataShard", status_oid); - } - - ~RGWDataSyncShardCR() override { - delete marker_tracker; - if (lease_cr) { - lease_cr->abort(); - } - if (error_repo) { - error_repo->put(); - } - } - - void append_modified_shards(set& keys) { - Mutex::Locker l(inc_lock); - modified_shards.insert(keys.begin(), keys.end()); - } - - void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) { - delete marker_tracker; - marker_tracker = mt; - } - - int operate() override { - int r; - while (true) { - switch (sync_marker.state) { - case rgw_data_sync_marker::FullSync: - r = full_sync(); - if (r < 0) { - ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl; - return set_cr_error(r); - } - return 0; - case rgw_data_sync_marker::IncrementalSync: - r = incremental_sync(); - if (r < 0) { - ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl; - return set_cr_error(r); - } - return 0; - default: - return set_cr_error(-EIO); - } - } - return 0; - } - - void init_lease_cr() { - set_status("acquiring sync lock"); - uint32_t lock_duration = cct->_conf->rgw_sync_lease_period; - string lock_name = "sync_lock"; - if (lease_cr) { - lease_cr->abort(); - } - RGWRados *store = sync_env->store; - lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, - rgw_raw_obj(store->get_zone_params().log_pool, status_oid), - lock_name, lock_duration, this)); - lease_stack.reset(spawn(lease_cr.get(), false)); - } - - int full_sync() { -#define OMAP_GET_MAX_ENTRIES 100 - int max_entries = OMAP_GET_MAX_ENTRIES; - reenter(&full_cr) { - yield init_lease_cr(); - while (!lease_cr->is_locked()) { - if (lease_cr->is_done()) { - ldout(cct, 5) << "lease cr failed, done early " << dendl; - set_status("lease lock failed, early abort"); - return set_cr_error(lease_cr->get_ret_status()); - } - set_sleeping(true); - yield; - } - logger.log("full sync"); - oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id); - set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker)); - total_entries = sync_marker.pos; - do { - yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries)); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl; - lease_cr->go_down(); - drain_all(); - return set_cr_error(retcode); - } - iter = entries.begin(); - for (; iter != entries.end(); ++iter) { - ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl; - total_entries++; - if (!marker_tracker->start(iter->first, total_entries, real_time())) { - ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl; - } else { - // fetch remote and write locally - yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false); - if (retcode < 0) { - lease_cr->go_down(); - drain_all(); - return set_cr_error(retcode); - } - } - sync_marker.marker = iter->first; - } - } while ((int)entries.size() == max_entries); - - lease_cr->go_down(); - drain_all(); - - yield { - /* update marker to reflect we're done with full sync */ - sync_marker.state = rgw_data_sync_marker::IncrementalSync; - sync_marker.marker = sync_marker.next_step_marker; - sync_marker.next_step_marker.clear(); - RGWRados *store = sync_env->store; - call(new RGWSimpleRadosWriteCR(sync_env->async_rados, store, - rgw_raw_obj(store->get_zone_params().log_pool, status_oid), - sync_marker)); - } - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl; - lease_cr->go_down(); - return set_cr_error(retcode); - } - } - return 0; - } - - int incremental_sync() { - reenter(&incremental_cr) { - yield init_lease_cr(); - while (!lease_cr->is_locked()) { - if (lease_cr->is_done()) { - ldout(cct, 5) << "lease cr failed, done early " << dendl; - set_status("lease lock failed, early abort"); - return set_cr_error(lease_cr->get_ret_status()); - } - set_sleeping(true); - yield; - } - set_status("lease acquired"); - error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store, - rgw_raw_obj(pool, error_oid), - 1 /* no buffer */); - error_repo->get(); - spawn(error_repo, false); - logger.log("inc sync"); - set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker)); - do { - current_modified.clear(); - inc_lock.Lock(); - current_modified.swap(modified_shards); - inc_lock.Unlock(); - - /* process out of band updates */ - for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) { - yield { - ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl; - spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false); - } - } - - /* process bucket shards that previously failed */ - yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid), - error_marker, &error_entries, - max_error_entries)); - ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl; - iter = error_entries.begin(); - for (; iter != error_entries.end(); ++iter) { - ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl; - spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false); - error_marker = iter->first; - } - if ((int)error_entries.size() != max_error_entries) { - if (error_marker.empty() && error_entries.empty()) { - /* the retry repo is empty, we back off a bit before calling it again */ - retry_backoff_secs *= 2; - if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) { - retry_backoff_secs = RETRY_BACKOFF_SECS_MAX; - } - } else { - retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT; - } - error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs); - error_marker.clear(); - } - - - yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info)); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl; - stop_spawned_services(); - drain_all(); - return set_cr_error(retcode); - } - datalog_marker = shard_info.marker; - remote_trimmed = RemoteNotTrimmed; -#define INCREMENTAL_MAX_ENTRIES 100 - ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; - if (datalog_marker > sync_marker.marker) { - spawned_keys.clear(); - if (sync_marker.marker.empty()) - remote_trimmed = RemoteMightTrimmed; //remote data log shard might be trimmed; - yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated)); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl; - stop_spawned_services(); - drain_all(); - return set_cr_error(retcode); - } - if ((remote_trimmed == RemoteMightTrimmed) && sync_marker.marker.empty() && log_entries.empty()) - remote_trimmed = RemoteTrimmed; - else - remote_trimmed = RemoteNotTrimmed; - for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) { - ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl; - if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) { - ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl; - marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp); - continue; - } - if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { - ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl; - } else { - /* - * don't spawn the same key more than once. We can do that as long as we don't yield - */ - if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) { - spawned_keys.insert(log_iter->entry.key); - spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false); - if (retcode < 0) { - stop_spawned_services(); - drain_all(); - return set_cr_error(retcode); - } - } - } - } - while ((int)num_spawned() > spawn_window) { - set_status() << "num_spawned() > spawn_window"; - yield wait_for_child(); - int ret; - while (collect(&ret, lease_stack.get())) { - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; - /* we have reported this error */ - } - /* not waiting for child here */ - } - } - } - ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; - if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) { -#define INCREMENTAL_INTERVAL 20 - yield wait(utime_t(INCREMENTAL_INTERVAL, 0)); - } - } while (true); - } - return 0; - } - void stop_spawned_services() { - lease_cr->go_down(); - if (error_repo) { - error_repo->finish(); - error_repo->put(); - error_repo = NULL; - } - } -}; - -class RGWDataSyncShardControlCR : public RGWBackoffControlCR { - RGWDataSyncEnv *sync_env; - - rgw_pool pool; - - uint32_t shard_id; - rgw_data_sync_marker sync_marker; - -public: - RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool, - uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_sync_env->cct, false), - sync_env(_sync_env), - pool(_pool), - shard_id(_shard_id), - sync_marker(_marker) { - } - - RGWCoroutine *alloc_cr() override { - return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr()); - } - - RGWCoroutine *alloc_finisher_cr() override { - RGWRados *store = sync_env->store; - return new RGWSimpleRadosReadCR(sync_env->async_rados, store, - rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)), - &sync_marker); - } - - void append_modified_shards(set& keys) { - Mutex::Locker l(cr_lock()); - - RGWDataSyncShardCR *cr = static_cast(get_cr()); - if (!cr) { - return; - } - - cr->append_modified_shards(keys); - } -}; - -class RGWDataSyncCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - uint32_t num_shards; - - rgw_data_sync_status sync_status; - - RGWDataSyncShardMarkerTrack *marker_tracker; - - Mutex shard_crs_lock; - map shard_crs; - - bool *reset_backoff; - - RGWDataSyncDebugLogger logger; - - RGWDataSyncModule *data_sync_module{nullptr}; -public: - RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - num_shards(_num_shards), - marker_tracker(NULL), - shard_crs_lock("RGWDataSyncCR::shard_crs_lock"), - reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") { - - } - - ~RGWDataSyncCR() override { - for (auto iter : shard_crs) { - iter.second->put(); - } - } - - int operate() override { - reenter(this) { - - /* read sync status */ - yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status)); - - data_sync_module = sync_env->sync_module->get_data_handler(); - - if (retcode < 0 && retcode != -ENOENT) { - ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl; - return set_cr_error(retcode); - } - - /* state: init status */ - if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) { - ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl; - sync_status.sync_info.num_shards = num_shards; - uint64_t instance_id; - get_random_bytes((char *)&instance_id, sizeof(instance_id)); - yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status)); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl; - return set_cr_error(retcode); - } - // sets state = StateBuildingFullSyncMaps - - *reset_backoff = true; - } - - data_sync_module->init(sync_env, sync_status.sync_info.instance_id); - - if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) { - /* call sync module init here */ - yield call(data_sync_module->init_sync(sync_env)); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl; - return set_cr_error(retcode); - } - /* state: building full sync maps */ - ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl; - yield call(new RGWListBucketIndexesCR(sync_env, &sync_status)); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl; - return set_cr_error(retcode); - } - sync_status.sync_info.state = rgw_data_sync_info::StateSync; - - /* update new state */ - yield call(set_sync_info_cr()); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl; - return set_cr_error(retcode); - } - - *reset_backoff = true; - } - - yield { - if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) { - for (map::iterator iter = sync_status.sync_markers.begin(); - iter != sync_status.sync_markers.end(); ++iter) { - RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool, - iter->first, iter->second); - cr->get(); - shard_crs_lock.Lock(); - shard_crs[iter->first] = cr; - shard_crs_lock.Unlock(); - spawn(cr, true); - } - } - } - - return set_cr_done(); - } - return 0; - } - - RGWCoroutine *set_sync_info_cr() { - RGWRados *store = sync_env->store; - return new RGWSimpleRadosWriteCR(sync_env->async_rados, store, - rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)), - sync_status.sync_info); - } - - void wakeup(int shard_id, set& keys) { - Mutex::Locker l(shard_crs_lock); - map::iterator iter = shard_crs.find(shard_id); - if (iter == shard_crs.end()) { - return; - } - iter->second->append_modified_shards(keys); - iter->second->wakeup(); - } -}; - -class RGWDefaultDataSyncModule : public RGWDataSyncModule { -public: - RGWDefaultDataSyncModule() {} - - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, - rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; -}; - -class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance { - RGWDefaultDataSyncModule data_handler; -public: - RGWDefaultSyncModuleInstance() {} - RGWDataSyncModule *get_data_handler() override { - return &data_handler; - } -}; - -int RGWDefaultSyncModule::create_instance(CephContext *cct, map& config, RGWSyncModuleInstanceRef *instance) -{ - instance->reset(new RGWDefaultSyncModuleInstance()); - return 0; -} - -RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) -{ - return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info, - key, versioned_epoch, - true, zones_trace); -} - -RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, - real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) -{ - return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, - bucket_info, key, versioned, versioned_epoch, - NULL, NULL, false, &mtime, zones_trace); -} - -RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, - rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) -{ - return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, - bucket_info, key, versioned, versioned_epoch, - &owner.id, &owner.display_name, true, &mtime, zones_trace); -} - -class RGWDataSyncControlCR : public RGWBackoffControlCR -{ - RGWDataSyncEnv *sync_env; - uint32_t num_shards; - - static constexpr bool exit_on_error = false; // retry on all errors -public: - RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error), - sync_env(_sync_env), num_shards(_num_shards) { - } - - RGWCoroutine *alloc_cr() override { - return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr()); - } - - void wakeup(int shard_id, set& keys) { - Mutex& m = cr_lock(); - - m.Lock(); - RGWDataSyncCR *cr = static_cast(get_cr()); - if (!cr) { - m.Unlock(); - return; - } - - cr->get(); - m.Unlock(); - - if (cr) { - cr->wakeup(shard_id, keys); - } - - cr->put(); - } -}; - -void RGWRemoteDataLog::wakeup(int shard_id, set& keys) { - RWLock::RLocker rl(lock); - if (!data_sync_cr) { - return; - } - data_sync_cr->wakeup(shard_id, keys); -} - -int RGWRemoteDataLog::run_sync(int num_shards) -{ - lock.get_write(); - data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards); - data_sync_cr->get(); // run() will drop a ref, so take another - lock.unlock(); - - int r = run(data_sync_cr); - - lock.get_write(); - data_sync_cr->put(); - data_sync_cr = NULL; - lock.unlock(); - - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl; - return r; - } - return 0; -} - -int RGWDataSyncStatusManager::init() -{ - auto zone_def_iter = store->zone_by_id.find(source_zone); - if (zone_def_iter == store->zone_by_id.end()) { - ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl; - return -EIO; - } - - auto& zone_def = zone_def_iter->second; - - if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) { - return -ENOTSUP; - } - - RGWZoneParams& zone_params = store->get_zone_params(); - - sync_module = store->get_sync_module(); - - conn = store->get_zone_conn_by_id(source_zone); - if (!conn) { - ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl; - return -EINVAL; - } - - error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS); - - int r = source_log.init(source_zone, conn, error_logger, sync_module); - if (r < 0) { - lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl; - finalize(); - return r; - } - - rgw_datalog_info datalog_info; - r = source_log.read_log_info(&datalog_info); - if (r < 0) { - ldout(store->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r << dendl; - finalize(); - return r; - } - - num_shards = datalog_info.num_shards; - - for (int i = 0; i < num_shards; i++) { - shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i)); - } - - return 0; -} - -void RGWDataSyncStatusManager::finalize() -{ - delete error_logger; - error_logger = nullptr; -} - -string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone) -{ - char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16]; - snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str()); - - return string(buf); -} - -string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id) -{ - char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16]; - snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id); - - return string(buf); -} - -int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, - const rgw_bucket& bucket, int shard_id, - RGWSyncErrorLogger *_error_logger, - RGWSyncModuleInstanceRef& _sync_module) -{ - conn = _conn; - source_zone = _source_zone; - bs.bucket = bucket; - bs.shard_id = shard_id; - - sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module); - - return 0; -} - -struct bucket_index_marker_info { - string bucket_ver; - string master_ver; - string max_marker; - bool syncstopped{false}; - - void decode_json(JSONObj *obj) { - JSONDecoder::decode_json("bucket_ver", bucket_ver, obj); - JSONDecoder::decode_json("master_ver", master_ver, obj); - JSONDecoder::decode_json("max_marker", max_marker, obj); - JSONDecoder::decode_json("syncstopped", syncstopped, obj); - } -}; - -class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - const string instance_key; - - bucket_index_marker_info *info; - -public: - RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env, - const rgw_bucket_shard& bs, - bucket_index_marker_info *_info) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - instance_key(bs.get_key()), info(_info) {} - - int operate() override { - reenter(this) { - yield { - rgw_http_param_pair pairs[] = { { "type" , "bucket-index" }, - { "bucket-instance", instance_key.c_str() }, - { "info" , NULL }, - { NULL, NULL } }; - - string p = "/admin/log/"; - call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info)); - } - if (retcode < 0) { - return set_cr_error(retcode); - } - return set_cr_done(); - } - return 0; - } -}; - -class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - - rgw_bucket_shard bs; - const string sync_status_oid; - - rgw_bucket_shard_sync_info& status; - - bucket_index_marker_info info; -public: - RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, - const rgw_bucket_shard& bs, - rgw_bucket_shard_sync_info& _status) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), - sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)), - status(_status) - {} - - int operate() override { - reenter(this) { - /* fetch current position in logs */ - yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info)); - if (retcode < 0 && retcode != -ENOENT) { - ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl; - return set_cr_error(retcode); - } - yield { - auto store = sync_env->store; - rgw_raw_obj obj(store->get_zone_params().log_pool, sync_status_oid); - - if (info.syncstopped) { - call(new RGWRadosRemoveCR(store, obj)); - } else { - status.state = rgw_bucket_shard_sync_info::StateFullSync; - status.inc_marker.position = info.max_marker; - map attrs; - status.encode_all_attrs(attrs); - call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs)); - } - } - return set_cr_done(); - } - return 0; - } -}; - -RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr() -{ - return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status); -} - -template -static void decode_attr(CephContext *cct, map& attrs, const string& attr_name, T *val) -{ - map::iterator iter = attrs.find(attr_name); - if (iter == attrs.end()) { - *val = T(); - return; - } - - bufferlist::iterator biter = iter->second.begin(); - try { - ::decode(*val, biter); - } catch (buffer::error& err) { - ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl; - } -} - -void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map& attrs) -{ - decode_attr(cct, attrs, "state", &state); - decode_attr(cct, attrs, "full_marker", &full_marker); - decode_attr(cct, attrs, "inc_marker", &inc_marker); -} - -void rgw_bucket_shard_sync_info::encode_all_attrs(map& attrs) -{ - encode_state_attr(attrs); - full_marker.encode_attr(attrs); - inc_marker.encode_attr(attrs); -} - -void rgw_bucket_shard_sync_info::encode_state_attr(map& attrs) -{ - ::encode(state, attrs["state"]); -} - -void rgw_bucket_shard_full_sync_marker::encode_attr(map& attrs) -{ - ::encode(*this, attrs["full_marker"]); -} - -void rgw_bucket_shard_inc_sync_marker::encode_attr(map& attrs) -{ - ::encode(*this, attrs["inc_marker"]); -} - -class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - string oid; - rgw_bucket_shard_sync_info *status; - - map attrs; -public: - RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, - const rgw_bucket_shard& bs, - rgw_bucket_shard_sync_info *_status) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)), - status(_status) {} - int operate() override; -}; - -int RGWReadBucketSyncStatusCoroutine::operate() -{ - reenter(this) { - yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store, - rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid), - &attrs)); - if (retcode == -ENOENT) { - *status = rgw_bucket_shard_sync_info(); - return set_cr_done(); - } - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl; - return set_cr_error(retcode); - } - status->decode_from_attrs(sync_env->cct, attrs); - return set_cr_done(); - } - return 0; -} -RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status) -{ - return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status); -} - -RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() { - for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { - delete iter->second; - } - delete error_logger; -} - - -void rgw_bucket_entry_owner::decode_json(JSONObj *obj) -{ - JSONDecoder::decode_json("ID", id, obj); - JSONDecoder::decode_json("DisplayName", display_name, obj); -} - -struct bucket_list_entry { - bool delete_marker; - rgw_obj_key key; - bool is_latest; - real_time mtime; - string etag; - uint64_t size; - string storage_class; - rgw_bucket_entry_owner owner; - uint64_t versioned_epoch; - string rgw_tag; - - bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {} - - void decode_json(JSONObj *obj) { - JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj); - JSONDecoder::decode_json("Key", key.name, obj); - JSONDecoder::decode_json("VersionId", key.instance, obj); - JSONDecoder::decode_json("IsLatest", is_latest, obj); - string mtime_str; - JSONDecoder::decode_json("RgwxMtime", mtime_str, obj); - - struct tm t; - uint32_t nsec; - if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) { - ceph_timespec ts; - ts.tv_sec = (uint64_t)internal_timegm(&t); - ts.tv_nsec = nsec; - mtime = real_clock::from_ceph_timespec(ts); - } - JSONDecoder::decode_json("ETag", etag, obj); - JSONDecoder::decode_json("Size", size, obj); - JSONDecoder::decode_json("StorageClass", storage_class, obj); - JSONDecoder::decode_json("Owner", owner, obj); - JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj); - JSONDecoder::decode_json("RgwxTag", rgw_tag, obj); - } -}; - -struct bucket_list_result { - string name; - string prefix; - string key_marker; - string version_id_marker; - int max_keys; - bool is_truncated; - list entries; - - bucket_list_result() : max_keys(0), is_truncated(false) {} - - void decode_json(JSONObj *obj) { - JSONDecoder::decode_json("Name", name, obj); - JSONDecoder::decode_json("Prefix", prefix, obj); - JSONDecoder::decode_json("KeyMarker", key_marker, obj); - JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj); - JSONDecoder::decode_json("MaxKeys", max_keys, obj); - JSONDecoder::decode_json("IsTruncated", is_truncated, obj); - JSONDecoder::decode_json("Entries", entries, obj); - } -}; - -class RGWListBucketShardCR: public RGWCoroutine { - RGWDataSyncEnv *sync_env; - const rgw_bucket_shard& bs; - const string instance_key; - rgw_obj_key marker_position; - - bucket_list_result *result; - -public: - RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, - rgw_obj_key& _marker_position, bucket_list_result *_result) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), - instance_key(bs.get_key()), marker_position(_marker_position), - result(_result) {} - - int operate() override { - reenter(this) { - yield { - rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() }, - { "versions" , NULL }, - { "format" , "json" }, - { "objs-container" , "true" }, - { "key-marker" , marker_position.name.c_str() }, - { "version-id-marker" , marker_position.instance.c_str() }, - { NULL, NULL } }; - // don't include tenant in the url, it's already part of instance_key - string p = string("/") + bs.bucket.name; - call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result)); - } - if (retcode < 0) { - return set_cr_error(retcode); - } - return set_cr_done(); - } - return 0; - } -}; - -class RGWListBucketIndexLogCR: public RGWCoroutine { - RGWDataSyncEnv *sync_env; - const string instance_key; - string marker; - - list *result; - -public: - RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, - string& _marker, list *_result) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - instance_key(bs.get_key()), marker(_marker), result(_result) {} - - int operate() override { - reenter(this) { - yield { - rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() }, - { "format" , "json" }, - { "marker" , marker.c_str() }, - { "type", "bucket-index" }, - { NULL, NULL } }; - - call(new RGWReadRESTResourceCR >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result)); - } - if (retcode < 0) { - return set_cr_error(retcode); - } - return set_cr_done(); - } - return 0; - } -}; - -#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10 - -class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { - RGWDataSyncEnv *sync_env; - - string marker_oid; - rgw_bucket_shard_full_sync_marker sync_marker; - -public: - RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env, - const string& _marker_oid, - const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW), - sync_env(_sync_env), - marker_oid(_marker_oid), - sync_marker(_marker) {} - - RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override { - sync_marker.position = new_marker; - sync_marker.count = index_pos; - - map attrs; - sync_marker.encode_attr(attrs); - - RGWRados *store = sync_env->store; - - ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; - return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, - rgw_raw_obj(store->get_zone_params().log_pool, marker_oid), - attrs); - } -}; - -class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { - RGWDataSyncEnv *sync_env; - - string marker_oid; - rgw_bucket_shard_inc_sync_marker sync_marker; - - map key_to_marker; - map marker_to_key; - - void handle_finish(const string& marker) override { - map::iterator iter = marker_to_key.find(marker); - if (iter == marker_to_key.end()) { - return; - } - key_to_marker.erase(iter->second); - reset_need_retry(iter->second); - marker_to_key.erase(iter); - } - -public: - RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env, - const string& _marker_oid, - const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW), - sync_env(_sync_env), - marker_oid(_marker_oid), - sync_marker(_marker) {} - - RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override { - sync_marker.position = new_marker; - - map attrs; - sync_marker.encode_attr(attrs); - - RGWRados *store = sync_env->store; - - ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; - return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, - store, - rgw_raw_obj(store->get_zone_params().log_pool, marker_oid), - attrs); - } - - /* - * create index from key -> , and from marker -> key - * this is useful so that we can insure that we only have one - * entry for any key that is used. This is needed when doing - * incremenatl sync of data, and we don't want to run multiple - * concurrent sync operations for the same bucket shard - * Also, we should make sure that we don't run concurrent operations on the same key with - * different ops. - */ - bool index_key_to_marker(const rgw_obj_key& key, const string& marker) { - if (key_to_marker.find(key) != key_to_marker.end()) { - set_need_retry(key); - return false; - } - key_to_marker[key] = marker; - marker_to_key[marker] = key; - return true; - } - - bool can_do_op(const rgw_obj_key& key) { - return (key_to_marker.find(key) == key_to_marker.end()); - } -}; - -template -class RGWBucketSyncSingleEntryCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - - RGWBucketInfo *bucket_info; - const rgw_bucket_shard& bs; - - rgw_obj_key key; - bool versioned; - uint64_t versioned_epoch; - rgw_bucket_entry_owner owner; - real_time timestamp; - RGWModifyOp op; - RGWPendingState op_state; - - T entry_marker; - RGWSyncShardMarkerTrack *marker_tracker; - - int sync_status; - - stringstream error_ss; - - RGWDataSyncDebugLogger logger; - - bool error_injection; - - RGWDataSyncModule *data_sync_module; - - rgw_zone_set zones_trace; - -public: - RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo *_bucket_info, - const rgw_bucket_shard& bs, - const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch, - real_time& _timestamp, - const rgw_bucket_entry_owner& _owner, - RGWModifyOp _op, RGWPendingState _op_state, - const T& _entry_marker, RGWSyncShardMarkerTrack *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - bucket_info(_bucket_info), bs(bs), - key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch), - owner(_owner), - timestamp(_timestamp), op(_op), - op_state(_op_state), - entry_marker(_entry_marker), - marker_tracker(_marker_tracker), - sync_status(0){ - stringstream ss; - ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]"; - set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state; - ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl; - set_status("init"); - - logger.init(sync_env, "Object", ss.str()); - - error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0); - - data_sync_module = sync_env->sync_module->get_data_handler(); - - zones_trace = _zones_trace; - zones_trace.insert(sync_env->store->get_zone().id); - } - - int operate() override { - reenter(this) { - /* skip entries that are not complete */ - if (op_state != CLS_RGW_STATE_COMPLETE) { - goto done; - } - do { - yield { - marker_tracker->reset_need_retry(key); - if (key.name.empty()) { - /* shouldn't happen */ - set_status("skipping empty entry"); - ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl; - goto done; - } - if (error_injection && - rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) { - ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl; - retcode = -EIO; - } else if (op == CLS_RGW_OP_ADD || - op == CLS_RGW_OP_LINK_OLH) { - if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") { - set_status("skipping entry"); - ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl; - goto done; - - } - set_status("syncing obj"); - ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl; - logger.log("fetch"); - call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace)); - } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) { - set_status("removing obj"); - if (op == CLS_RGW_OP_UNLINK_INSTANCE) { - versioned = true; - } - logger.log("remove"); - call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch, &zones_trace)); - } else if (op == CLS_RGW_OP_LINK_OLH_DM) { - logger.log("creating delete marker"); - set_status("creating delete marker"); - ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl; - call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch, &zones_trace)); - } - } - } while (marker_tracker->need_retry(key)); - { - stringstream ss; - if (retcode >= 0) { - ss << "done"; - } else { - ss << "done, retcode=" << retcode; - } - logger.log(ss.str()); - } - - if (retcode < 0 && retcode != -ENOENT) { - set_status() << "failed to sync obj; retcode=" << retcode; - ldout(sync_env->cct, 0) << "ERROR: failed to sync object: " - << bucket_shard_str{bs} << "/" << key.name << dendl; - error_ss << bucket_shard_str{bs} << "/" << key.name; - sync_status = retcode; - } - if (!error_ss.str().empty()) { - yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object")); - } -done: - if (sync_status == 0) { - /* update marker */ - set_status() << "calling marker_tracker->finish(" << entry_marker << ")"; - yield call(marker_tracker->finish(entry_marker)); - sync_status = retcode; - } - if (sync_status < 0) { - return set_cr_error(sync_status); - } - return set_cr_done(); - } - return 0; - } -}; - -#define BUCKET_SYNC_SPAWN_WINDOW 20 - -class RGWBucketShardFullSyncCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - const rgw_bucket_shard& bs; - RGWBucketInfo *bucket_info; - boost::intrusive_ptr lease_cr; - bucket_list_result list_result; - list::iterator entries_iter; - rgw_bucket_shard_full_sync_marker& full_marker; - RGWBucketFullSyncShardMarkerTrack marker_tracker; - rgw_obj_key list_marker; - bucket_list_entry *entry{nullptr}; - RGWModifyOp op{CLS_RGW_OP_ADD}; - - int total_entries{0}; - - int sync_status{0}; - - const string& status_oid; - - RGWDataSyncDebugLogger logger; - rgw_zone_set zones_trace; -public: - RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, - RGWBucketInfo *_bucket_info, - const std::string& status_oid, - RGWContinuousLeaseCR *lease_cr, - rgw_bucket_shard_full_sync_marker& _full_marker) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), - bucket_info(_bucket_info), lease_cr(lease_cr), full_marker(_full_marker), - marker_tracker(sync_env, status_oid, full_marker), - status_oid(status_oid) { - logger.init(sync_env, "BucketFull", bs.get_key()); - zones_trace.insert(sync_env->source_zone); - } - - int operate() override; -}; - -int RGWBucketShardFullSyncCR::operate() -{ - int ret; - reenter(this) { - list_marker = full_marker.position; - - total_entries = full_marker.count; - do { - if (!lease_cr->is_locked()) { - drain_all(); - return set_cr_error(-ECANCELED); - } - set_status("listing remote bucket"); - ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl; - yield call(new RGWListBucketShardCR(sync_env, bs, list_marker, - &list_result)); - if (retcode < 0 && retcode != -ENOENT) { - set_status("failed bucket listing, going down"); - drain_all(); - return set_cr_error(retcode); - } - entries_iter = list_result.entries.begin(); - for (; entries_iter != list_result.entries.end(); ++entries_iter) { - if (!lease_cr->is_locked()) { - drain_all(); - return set_cr_error(-ECANCELED); - } - ldout(sync_env->cct, 20) << "[full sync] syncing object: " - << bucket_shard_str{bs} << "/" << entries_iter->key << dendl; - entry = &(*entries_iter); - total_entries++; - list_marker = entries_iter->key; - if (!marker_tracker.start(entry->key, total_entries, real_time())) { - ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl; - } else { - op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH); - using SyncCR = RGWBucketSyncSingleEntryCR; - yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key, - false, /* versioned, only matters for object removal */ - entry->versioned_epoch, entry->mtime, - entry->owner, op, CLS_RGW_STATE_COMPLETE, - entry->key, &marker_tracker, zones_trace), - false); - } - while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; - sync_status = ret; - /* we have reported this error */ - } - } - } - } - } while (list_result.is_truncated && sync_status == 0); - set_status("done iterating over all objects"); - /* wait for all operations to complete */ - while (num_spawned()) { - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; - sync_status = ret; - /* we have reported this error */ - } - } - } - if (!lease_cr->is_locked()) { - return set_cr_error(-ECANCELED); - } - /* update sync state to incremental */ - if (sync_status == 0) { - yield { - rgw_bucket_shard_sync_info sync_status; - sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; - map attrs; - sync_status.encode_state_attr(attrs); - RGWRados *store = sync_env->store; - call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, - rgw_raw_obj(store->get_zone_params().log_pool, status_oid), - attrs)); - } - } else { - ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl; - } - if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */ - ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket " - << bucket_shard_str{bs} << " retcode=" << retcode << dendl; - return set_cr_error(retcode); - } - if (sync_status < 0) { - return set_cr_error(sync_status); - } - return set_cr_done(); - } - return 0; -} - -class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { - RGWDataSyncEnv *sync_env; - const rgw_bucket_shard& bs; - RGWBucketInfo *bucket_info; - boost::intrusive_ptr lease_cr; - list list_result; - list::iterator entries_iter; - map, pair > squash_map; - rgw_bucket_shard_inc_sync_marker& inc_marker; - rgw_obj_key key; - rgw_bi_log_entry *entry{nullptr}; - RGWBucketIncSyncShardMarkerTrack marker_tracker; - bool updated_status{false}; - const string& status_oid; - const string& zone_id; - ceph::real_time sync_modify_time; - - string cur_id; - - RGWDataSyncDebugLogger logger; - - int sync_status{0}; - bool syncstopped{false}; - -public: - RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env, - const rgw_bucket_shard& bs, - RGWBucketInfo *_bucket_info, - const std::string& status_oid, - RGWContinuousLeaseCR *lease_cr, - rgw_bucket_shard_inc_sync_marker& _inc_marker) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), - bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker), - marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid) , zone_id(_sync_env->store->get_zone().id){ - set_description() << "bucket shard incremental sync bucket=" - << bucket_shard_str{bs}; - set_status("init"); - logger.init(sync_env, "BucketInc", bs.get_key()); - } - - int operate() override; -}; - -int RGWBucketShardIncrementalSyncCR::operate() -{ - int ret; - reenter(this) { - do { - if (!lease_cr->is_locked()) { - drain_all(); - return set_cr_error(-ECANCELED); - } - ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << inc_marker.position << dendl; - set_status() << "listing bilog; position=" << inc_marker.position; - yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position, - &list_result)); - if (retcode < 0 && retcode != -ENOENT ) { - drain_all(); - if (!syncstopped) { - /* wait for all operations to complete */ - return set_cr_error(retcode); - } else { - /* no need to retry */ - break; - } - } - squash_map.clear(); - for (auto& e : list_result) { - if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP && (sync_modify_time < e.timestamp)) { - ldout(sync_env->cct, 20) << " syncstop on " << e.timestamp << dendl; - sync_modify_time = e.timestamp; - syncstopped = true; - continue; - } - if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC && (sync_modify_time < e.timestamp)) { - ldout(sync_env->cct, 20) << " resync on " << e.timestamp << dendl; - sync_modify_time = e.timestamp; - syncstopped = false; - continue; - } - if (e.state != CLS_RGW_STATE_COMPLETE) { - continue; - } - if (e.zones_trace.find(zone_id) != e.zones_trace.end()) { - continue; - } - auto& squash_entry = squash_map[make_pair(e.object, e.instance)]; - if (squash_entry.first <= e.timestamp) { - squash_entry = make_pair<>(e.timestamp, e.op); - } - } - - entries_iter = list_result.begin(); - for (; entries_iter != list_result.end(); ++entries_iter) { - if (!lease_cr->is_locked()) { - drain_all(); - return set_cr_error(-ECANCELED); - } - entry = &(*entries_iter); - { - ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */ - if (p < 0) { - cur_id = entry->id; - } else { - cur_id = entry->id.substr(p + 1); - } - } - inc_marker.position = cur_id; - - if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) { - ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << " , skipping entry" << dendl; - marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp); - continue; - } - - if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) { - set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry"; - ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl; - marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp); - continue; - } - - ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl; - - if (!key.ns.empty()) { - set_status() << "skipping entry in namespace: " << entry->object; - ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl; - marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp); - continue; - } - - set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op; - if (entry->op == CLS_RGW_OP_CANCEL) { - set_status() << "canceled operation, skipping"; - ldout(sync_env->cct, 20) << "[inc sync] skipping object: " - << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl; - marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp); - continue; - } - if (entry->state != CLS_RGW_STATE_COMPLETE) { - set_status() << "non-complete operation, skipping"; - ldout(sync_env->cct, 20) << "[inc sync] skipping object: " - << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl; - marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp); - continue; - } - if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) { - set_status() << "redundant operation, skipping"; - ldout(sync_env->cct, 20) << "[inc sync] skipping object: " - <timestamp); - continue; - } - if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) { - set_status() << "squashed operation, skipping"; - ldout(sync_env->cct, 20) << "[inc sync] skipping object: " - << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl; - /* not updating high marker though */ - continue; - } - ldout(sync_env->cct, 20) << "[inc sync] syncing object: " - << bucket_shard_str{bs} << "/" << key << dendl; - updated_status = false; - while (!marker_tracker.can_do_op(key)) { - if (!updated_status) { - set_status() << "can't do op, conflicting inflight operation"; - updated_status = true; - } - ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl; - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl; - sync_status = ret; - /* we have reported this error */ - } - } - } - if (!marker_tracker.index_key_to_marker(key, cur_id)) { - set_status() << "can't do op, sync already in progress for object"; - ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl; - marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp); - continue; - } - // yield { - set_status() << "start object sync"; - if (!marker_tracker.start(cur_id, 0, entry->timestamp)) { - ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl; - } else { - uint64_t versioned_epoch = 0; - rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name); - if (entry->ver.pool < 0) { - versioned_epoch = entry->ver.epoch; - } - ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl; - using SyncCR = RGWBucketSyncSingleEntryCR; - spawn(new SyncCR(sync_env, bucket_info, bs, key, - entry->is_versioned(), versioned_epoch, - entry->timestamp, owner, entry->op, entry->state, - cur_id, &marker_tracker, entry->zones_trace), - false); - } - // } - while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { - set_status() << "num_spawned() > spawn_window"; - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; - sync_status = ret; - /* we have reported this error */ - } - /* not waiting for child here */ - } - } - } - } while (!list_result.empty() && sync_status == 0); - - if (syncstopped) { - drain_all(); - - yield { - const string& oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs); - RGWRados *store = sync_env->store; - call(new RGWRadosRemoveCR(store, rgw_raw_obj{store->get_zone_params().log_pool, oid})); - } - lease_cr->abort(); - return set_cr_done(); - } - - while (num_spawned()) { - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; - sync_status = ret; - /* we have reported this error */ - } - /* not waiting for child here */ - } - } - - yield call(marker_tracker.flush()); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl; - return set_cr_error(retcode); - } - if (sync_status < 0) { - ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl; - } - - /* wait for all operations to complete */ - drain_all(); - - if (sync_status < 0) { - return set_cr_error(sync_status); - } - - return set_cr_done(); - } - return 0; -} - -int RGWRunBucketSyncCoroutine::operate() -{ - reenter(this) { - yield { - set_status("acquiring sync lock"); - auto store = sync_env->store; - lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, - rgw_raw_obj(store->get_zone_params().log_pool, status_oid), - "sync_lock", - cct->_conf->rgw_sync_lease_period, - this)); - lease_stack.reset(spawn(lease_cr.get(), false)); - } - while (!lease_cr->is_locked()) { - if (lease_cr->is_done()) { - ldout(cct, 5) << "lease cr failed, done early" << dendl; - set_status("lease lock failed, early abort"); - return set_cr_error(lease_cr->get_ret_status()); - } - set_sleeping(true); - yield; - } - - yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status)); - if (retcode < 0 && retcode != -ENOENT) { - ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket=" - << bucket_shard_str{bs} << dendl; - lease_cr->go_down(); - drain_all(); - return set_cr_error(retcode); - } - - ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket " - << bucket_shard_str{bs} << ": " << sync_status.state << dendl; - - yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info)); - if (retcode == -ENOENT) { - /* bucket instance info has not been synced in yet, fetch it now */ - yield { - ldout(sync_env->cct, 10) << "no local info for bucket " - << bucket_str{bs.bucket} << ": fetching metadata" << dendl; - string raw_key = string("bucket.instance:") + bs.bucket.get_key(); - - meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger); - - call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key, - string() /* no marker */, - MDLOG_STATUS_COMPLETE, - NULL /* no marker tracker */)); - } - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl; - lease_cr->go_down(); - drain_all(); - return set_cr_error(retcode); - } - - yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info)); - } - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl; - lease_cr->go_down(); - drain_all(); - return set_cr_error(retcode); - } - - if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) { - yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status)); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs} - << " failed, retcode=" << retcode << dendl; - lease_cr->go_down(); - drain_all(); - return set_cr_error(retcode); - } - } - - if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { - yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info, - status_oid, lease_cr.get(), - sync_status.full_marker)); - if (retcode < 0) { - ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs} - << " failed, retcode=" << retcode << dendl; - lease_cr->go_down(); - drain_all(); - return set_cr_error(retcode); - } - sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; - } - - if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) { - yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info, - status_oid, lease_cr.get(), - sync_status.inc_marker)); - if (retcode < 0) { - ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs} - << " failed, retcode=" << retcode << dendl; - lease_cr->go_down(); - drain_all(); - return set_cr_error(retcode); - } - } - - lease_cr->go_down(); - drain_all(); - return set_cr_done(); - } - - return 0; -} - -RGWCoroutine *RGWRemoteBucketLog::run_sync_cr() -{ - return new RGWRunBucketSyncCoroutine(&sync_env, bs); -} - -int RGWBucketSyncStatusManager::init() -{ - conn = store->get_zone_conn_by_id(source_zone); - if (!conn) { - ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl; - return -EINVAL; - } - - int ret = http_manager.set_threaded(); - if (ret < 0) { - ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl; - return ret; - } - - - const string key = bucket.get_key(); - - rgw_http_param_pair pairs[] = { { "key", key.c_str() }, - { NULL, NULL } }; - - string path = string("/admin/metadata/bucket.instance"); - - bucket_instance_meta_info result; - ret = cr_mgr.run(new RGWReadRESTResourceCR(store->ctx(), conn, &http_manager, path, pairs, &result)); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl; - return ret; - } - - RGWBucketInfo& bi = result.data.get_bucket_info(); - num_shards = bi.num_shards; - - error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS); - - sync_module.reset(new RGWDefaultSyncModuleInstance()); - - int effective_num_shards = (num_shards ? num_shards : 1); - - auto async_rados = store->get_async_rados(); - - for (int i = 0; i < effective_num_shards; i++) { - RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager); - ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl; - return ret; - } - source_logs[i] = l; - } - - return 0; -} - -int RGWBucketSyncStatusManager::init_sync_status() -{ - list stacks; - - for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { - RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); - RGWRemoteBucketLog *l = iter->second; - stack->call(l->init_sync_status_cr()); - - stacks.push_back(stack); - } - - return cr_mgr.run(stacks); -} - -int RGWBucketSyncStatusManager::read_sync_status() -{ - list stacks; - - for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { - RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); - RGWRemoteBucketLog *l = iter->second; - stack->call(l->read_sync_status_cr(&sync_status[iter->first])); - - stacks.push_back(stack); - } - - int ret = cr_mgr.run(stacks); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " - << bucket_str{bucket} << dendl; - return ret; - } - - return 0; -} - -int RGWBucketSyncStatusManager::run() -{ - list stacks; - - for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { - RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); - RGWRemoteBucketLog *l = iter->second; - stack->call(l->run_sync_cr()); - - stacks.push_back(stack); - } - - int ret = cr_mgr.run(stacks); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " - << bucket_str{bucket} << dendl; - return ret; - } - - return 0; -} - -string RGWBucketSyncStatusManager::status_oid(const string& source_zone, - const rgw_bucket_shard& bs) -{ - return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key(); -} - - -// TODO: move into rgw_data_sync_trim.cc -#undef dout_prefix -#define dout_prefix (*_dout << "data trim: ") - -namespace { - -/// return the marker that it's safe to trim up to -const std::string& get_stable_marker(const rgw_data_sync_marker& m) -{ - return m.state == m.FullSync ? m.next_step_marker : m.marker; -} - -/// comparison operator for take_min_markers() -bool operator<(const rgw_data_sync_marker& lhs, - const rgw_data_sync_marker& rhs) -{ - // sort by stable marker - return get_stable_marker(lhs) < get_stable_marker(rhs); -} - -/// populate the container starting with 'dest' with the minimum stable marker -/// of each shard for all of the peers in [first, last) -template -void take_min_markers(IterIn first, IterIn last, IterOut dest) -{ - if (first == last) { - return; - } - // initialize markers with the first peer's - auto m = dest; - for (auto &shard : first->sync_markers) { - *m = std::move(shard.second); - ++m; - } - // for remaining peers, replace with smaller markers - for (auto p = first + 1; p != last; ++p) { - m = dest; - for (auto &shard : p->sync_markers) { - if (shard.second < *m) { - *m = std::move(shard.second); - } - ++m; - } - } -} - -} // anonymous namespace - -class DataLogTrimCR : public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http; - const int num_shards; - const std::string& zone_id; //< my zone id - std::vector peer_status; //< sync status for each peer - std::vector min_shard_markers; //< min marker per shard - std::vector& last_trim; //< last trimmed marker per shard - int ret{0}; - - public: - DataLogTrimCR(RGWRados *store, RGWHTTPManager *http, - int num_shards, std::vector& last_trim) - : RGWCoroutine(store->ctx()), store(store), http(http), - num_shards(num_shards), - zone_id(store->get_zone().id), - peer_status(store->zone_conn_map.size()), - min_shard_markers(num_shards), - last_trim(last_trim) - {} - - int operate() override; -}; - -int DataLogTrimCR::operate() -{ - reenter(this) { - ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl; - set_status("fetching sync status"); - yield { - // query data sync status from each sync peer - rgw_http_param_pair params[] = { - { "type", "data" }, - { "status", nullptr }, - { "source-zone", zone_id.c_str() }, - { nullptr, nullptr } - }; - - auto p = peer_status.begin(); - for (auto& c : store->zone_conn_map) { - ldout(cct, 20) << "query sync status from " << c.first << dendl; - using StatusCR = RGWReadRESTResourceCR; - spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p), - false); - ++p; - } - } - - // must get a successful reply from all peers to consider trimming - ret = 0; - while (ret == 0 && num_spawned() > 0) { - yield wait_for_child(); - collect_next(&ret); - } - drain_all(); - - if (ret < 0) { - ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl; - return set_cr_error(ret); - } - - ldout(cct, 10) << "trimming log shards" << dendl; - set_status("trimming log shards"); - yield { - // determine the minimum marker for each shard - take_min_markers(peer_status.begin(), peer_status.end(), - min_shard_markers.begin()); - - for (int i = 0; i < num_shards; i++) { - const auto& m = min_shard_markers[i]; - auto& stable = get_stable_marker(m); - if (stable <= last_trim[i]) { - continue; - } - ldout(cct, 10) << "trimming log shard " << i - << " at marker=" << stable - << " last_trim=" << last_trim[i] << dendl; - using TrimCR = RGWSyncLogTrimCR; - spawn(new TrimCR(store, store->data_log->get_oid(i), - stable, &last_trim[i]), - true); - } - } - return set_cr_done(); - } - return 0; -} - -class DataLogTrimPollCR : public RGWCoroutine { - RGWRados *store; - RGWHTTPManager *http; - const int num_shards; - const utime_t interval; //< polling interval - const std::string lock_oid; //< use first data log shard for lock - const std::string lock_cookie; - std::vector last_trim; //< last trimmed marker per shard - - public: - DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http, - int num_shards, utime_t interval) - : RGWCoroutine(store->ctx()), store(store), http(http), - num_shards(num_shards), interval(interval), - lock_oid(store->data_log->get_oid(0)), - lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)), - last_trim(num_shards) - {} - - int operate() override; -}; - -int DataLogTrimPollCR::operate() -{ - reenter(this) { - for (;;) { - set_status("sleeping"); - wait(interval); - - // request a 'data_trim' lock that covers the entire wait interval to - // prevent other gateways from attempting to trim for the duration - set_status("acquiring trim lock"); - yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store, - rgw_raw_obj(store->get_zone_params().log_pool, lock_oid), - "data_trim", lock_cookie, - interval.sec())); - if (retcode < 0) { - // if the lock is already held, go back to sleep and try again later - ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in " - << interval.sec() << "s" << dendl; - continue; - } - - set_status("trimming"); - yield call(new DataLogTrimCR(store, http, num_shards, last_trim)); - - // note that the lock is not released. this is intentional, as it avoids - // duplicating this work in other gateways - } - } - return 0; -} - -RGWCoroutine* create_data_log_trim_cr(RGWRados *store, - RGWHTTPManager *http, - int num_shards, utime_t interval) -{ - return new DataLogTrimPollCR(store, http, num_shards, interval); -}