remove ceph code
[stor4nfv.git] / src / ceph / src / rgw / rgw_sync.cc
diff --git a/src/ceph/src/rgw/rgw_sync.cc b/src/ceph/src/rgw/rgw_sync.cc
deleted file mode 100644 (file)
index d45d9a9..0000000
+++ /dev/null
@@ -1,3014 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include <boost/optional.hpp>
-
-#include "common/ceph_json.h"
-#include "common/RWLock.h"
-#include "common/RefCountedObj.h"
-#include "common/WorkQueue.h"
-#include "common/Throttle.h"
-#include "common/admin_socket.h"
-#include "common/errno.h"
-
-#include "rgw_common.h"
-#include "rgw_rados.h"
-#include "rgw_sync.h"
-#include "rgw_metadata.h"
-#include "rgw_rest_conn.h"
-#include "rgw_tools.h"
-#include "rgw_cr_rados.h"
-#include "rgw_cr_rest.h"
-#include "rgw_http_client.h"
-
-#include "cls/lock/cls_lock_client.h"
-
-#include <boost/asio/yield.hpp>
-
-#define dout_subsys ceph_subsys_rgw
-
-#undef dout_prefix
-#define dout_prefix (*_dout << "meta sync: ")
-
-static string mdlog_sync_status_oid = "mdlog.sync-status";
-static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
-static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
-
-RGWSyncErrorLogger::RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
-  for (int i = 0; i < num_shards; i++) {
-    oids.push_back(get_shard_oid(oid_prefix, i));
-  }
-}
-string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
-  char buf[oid_prefix.size() + 16];
-  snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
-  return string(buf);
-}
-
-RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
-  cls_log_entry entry;
-
-  rgw_sync_error_info info(source_zone, error_code, message);
-  bufferlist bl;
-  ::encode(info, bl);
-  store->time_log_prepare_entry(entry, real_clock::now(), section, name, bl);
-
-  uint32_t shard_id = ++counter % num_shards;
-
-
-  return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
-}
-
-void RGWSyncBackoff::update_wait_time()
-{
-  if (cur_wait == 0) {
-    cur_wait = 1;
-  } else {
-    cur_wait = (cur_wait << 1);
-  }
-  if (cur_wait >= max_secs) {
-    cur_wait = max_secs;
-  }
-}
-
-void RGWSyncBackoff::backoff_sleep()
-{
-  update_wait_time();
-  sleep(cur_wait);
-}
-
-void RGWSyncBackoff::backoff(RGWCoroutine *op)
-{
-  update_wait_time();
-  op->wait(utime_t(cur_wait, 0));
-}
-
-int RGWBackoffControlCR::operate() {
-  reenter(this) {
-    // retry the operation until it succeeds
-    while (true) {
-      yield {
-        Mutex::Locker l(lock);
-        cr = alloc_cr();
-        cr->get();
-        call(cr);
-      }
-      {
-        Mutex::Locker l(lock);
-        cr->put();
-        cr = NULL;
-      }
-      if (retcode >= 0) {
-        break;
-      }
-      if (retcode != -EBUSY && retcode != -EAGAIN) {
-        ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
-        if (exit_on_error) {
-          return set_cr_error(retcode);
-        }
-      }
-      if (reset_backoff) {
-        backoff.reset();
-      }
-      yield backoff.backoff(this);
-    }
-
-    // run an optional finisher
-    yield call(alloc_finisher_cr());
-    if (retcode < 0) {
-      ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
-      return set_cr_error(retcode);
-    }
-    return set_cr_done();
-  }
-  return 0;
-}
-
-void rgw_mdlog_info::decode_json(JSONObj *obj) {
-  JSONDecoder::decode_json("num_objects", num_shards, obj);
-  JSONDecoder::decode_json("period", period, obj);
-  JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
-}
-
-void rgw_mdlog_entry::decode_json(JSONObj *obj) {
-  JSONDecoder::decode_json("id", id, obj);
-  JSONDecoder::decode_json("section", section, obj);
-  JSONDecoder::decode_json("name", name, obj);
-  utime_t ut;
-  JSONDecoder::decode_json("timestamp", ut, obj);
-  timestamp = ut.to_real_time();
-  JSONDecoder::decode_json("data", log_data, obj);
-}
-
-void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
-  JSONDecoder::decode_json("marker", marker, obj);
-  JSONDecoder::decode_json("truncated", truncated, obj);
-  JSONDecoder::decode_json("entries", entries, obj);
-};
-
-int RGWShardCollectCR::operate() {
-  reenter(this) {
-    while (spawn_next()) {
-      current_running++;
-
-      while (current_running >= max_concurrent) {
-        int child_ret;
-        yield wait_for_child();
-        if (collect_next(&child_ret)) {
-          current_running--;
-          if (child_ret < 0 && child_ret != -ENOENT) {
-            ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
-            status = child_ret;
-          }
-        }
-      }
-    }
-    while (current_running > 0) {
-      int child_ret;
-      yield wait_for_child();
-      if (collect_next(&child_ret)) {
-        current_running--;
-        if (child_ret < 0 && child_ret != -ENOENT) {
-          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
-          status = child_ret;
-        }
-      }
-    }
-    if (status < 0) {
-      return set_cr_error(status);
-    }
-    return set_cr_done();
-  }
-  return 0;
-}
-
-class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
-  RGWMetaSyncEnv *sync_env;
-
-  const std::string& period;
-  int num_shards;
-  map<int, RGWMetadataLogInfo> *mdlog_info;
-
-  int shard_id;
-#define READ_MDLOG_MAX_CONCURRENT 10
-
-public:
-  RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
-                     const std::string& period, int _num_shards,
-                     map<int, RGWMetadataLogInfo> *_mdlog_info) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
-                                                                 sync_env(_sync_env),
-                                                                 period(period), num_shards(_num_shards),
-                                                                 mdlog_info(_mdlog_info), shard_id(0) {}
-  bool spawn_next() override;
-};
-
-class RGWListRemoteMDLogCR : public RGWShardCollectCR {
-  RGWMetaSyncEnv *sync_env;
-
-  const std::string& period;
-  map<int, string> shards;
-  int max_entries_per_shard;
-  map<int, rgw_mdlog_shard_data> *result;
-
-  map<int, string>::iterator iter;
-#define READ_MDLOG_MAX_CONCURRENT 10
-
-public:
-  RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
-                     const std::string& period, map<int, string>& _shards,
-                     int _max_entries_per_shard,
-                     map<int, rgw_mdlog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
-                                                                 sync_env(_sync_env), period(period),
-                                                                 max_entries_per_shard(_max_entries_per_shard),
-                                                                 result(_result) {
-    shards.swap(_shards);
-    iter = shards.begin();
-  }
-  bool spawn_next() override;
-};
-
-RGWRemoteMetaLog::~RGWRemoteMetaLog()
-{
-  delete error_logger;
-}
-
-int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
-{
-  rgw_http_param_pair pairs[] = { { "type", "metadata" },
-                                  { NULL, NULL } };
-
-  int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
-  if (ret < 0) {
-    ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog info" << dendl;
-    return ret;
-  }
-
-  ldout(store->ctx(), 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;
-
-  return 0;
-}
-
-int RGWRemoteMetaLog::read_master_log_shards_info(const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
-{
-  if (store->is_meta_master()) {
-    return 0;
-  }
-
-  rgw_mdlog_info log_info;
-  int ret = read_log_info(&log_info);
-  if (ret < 0) {
-    return ret;
-  }
-
-  return run(new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
-}
-
-int RGWRemoteMetaLog::read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
-{
-  if (store->is_meta_master()) {
-    return 0;
-  }
-
-  return run(new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
-}
-
-int RGWRemoteMetaLog::init()
-{
-  conn = store->rest_master_conn;
-
-  int ret = http_manager.set_threaded();
-  if (ret < 0) {
-    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
-    return ret;
-  }
-
-  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
-
-  init_sync_env(&sync_env);
-
-  return 0;
-}
-
-void RGWRemoteMetaLog::finish()
-{
-  going_down = true;
-  stop();
-}
-
-#define CLONE_MAX_ENTRIES 100
-
-int RGWMetaSyncStatusManager::init()
-{
-  if (store->is_meta_master()) {
-    return 0;
-  }
-
-  if (!store->rest_master_conn) {
-    lderr(store->ctx()) << "no REST connection to master zone" << dendl;
-    return -EIO;
-  }
-
-  int r = rgw_init_ioctx(store->get_rados_handle(), store->get_zone_params().log_pool, ioctx, true);
-  if (r < 0) {
-    lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->get_zone_params().log_pool << " ret=" << r << dendl;
-    return r;
-  }
-
-  r = master_log.init();
-  if (r < 0) {
-    lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
-    return r;
-  }
-
-  RGWMetaSyncEnv& sync_env = master_log.get_sync_env();
-
-  rgw_meta_sync_status sync_status;
-  r = read_sync_status(&sync_status);
-  if (r < 0 && r != -ENOENT) {
-    lderr(store->ctx()) << "ERROR: failed to read sync status, r=" << r << dendl;
-    return r;
-  }
-
-  int num_shards = sync_status.sync_info.num_shards;
-
-  for (int i = 0; i < num_shards; i++) {
-    shard_objs[i] = rgw_raw_obj(store->get_zone_params().log_pool, sync_env.shard_obj_name(i));
-  }
-
-  RWLock::WLocker wl(ts_to_shard_lock);
-  for (int i = 0; i < num_shards; i++) {
-    clone_markers.push_back(string());
-    utime_shard ut;
-    ut.shard_id = i;
-    ts_to_shard[ut] = i;
-  }
-
-  return 0;
-}
-
-void RGWMetaSyncEnv::init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
-                          RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
-                          RGWSyncErrorLogger *_error_logger) {
-  cct = _cct;
-  store = _store;
-  conn = _conn;
-  async_rados = _async_rados;
-  http_manager = _http_manager;
-  error_logger = _error_logger;
-}
-
-string RGWMetaSyncEnv::status_oid()
-{
-  return mdlog_sync_status_oid;
-}
-
-string RGWMetaSyncEnv::shard_obj_name(int shard_id)
-{
-  char buf[mdlog_sync_status_shard_prefix.size() + 16];
-  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);
-
-  return string(buf);
-}
-
-class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
-  RGWRados *store;
-  RGWMetadataLog *mdlog;
-  int shard_id;
-  string *marker;
-  int max_entries;
-  list<cls_log_entry> *entries;
-  bool *truncated;
-
-protected:
-  int _send_request() override {
-    real_time from_time;
-    real_time end_time;
-
-    void *handle;
-
-    mdlog->init_list_entries(shard_id, from_time, end_time, *marker, &handle);
-
-    int ret = mdlog->list_entries(handle, max_entries, *entries, marker, truncated);
-
-    mdlog->complete_list_entries(handle);
-
-    return ret;
-  }
-public:
-  RGWAsyncReadMDLogEntries(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
-                           RGWMetadataLog* mdlog, int _shard_id,
-                           string* _marker, int _max_entries,
-                           list<cls_log_entry> *_entries, bool *_truncated)
-    : RGWAsyncRadosRequest(caller, cn), store(_store), mdlog(mdlog),
-      shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
-      entries(_entries), truncated(_truncated) {}
-};
-
-class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
-  RGWMetaSyncEnv *sync_env;
-  RGWMetadataLog *const mdlog;
-  int shard_id;
-  string marker;
-  string *pmarker;
-  int max_entries;
-  list<cls_log_entry> *entries;
-  bool *truncated;
-
-  RGWAsyncReadMDLogEntries *req{nullptr};
-
-public:
-  RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
-                        int _shard_id, string*_marker, int _max_entries,
-                        list<cls_log_entry> *_entries, bool *_truncated)
-    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
-      shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
-      entries(_entries), truncated(_truncated) {}
-
-  ~RGWReadMDLogEntriesCR() override {
-    if (req) {
-      req->finish();
-    }
-  }
-
-  int send_request() override {
-    marker = *pmarker;
-    req = new RGWAsyncReadMDLogEntries(this, stack->create_completion_notifier(),
-                                       sync_env->store, mdlog, shard_id, &marker,
-                                       max_entries, entries, truncated);
-    sync_env->async_rados->queue(req);
-    return 0;
-  }
-
-  int request_complete() override {
-    int ret = req->get_ret_status();
-    if (ret >= 0 && !entries->empty()) {
-     *pmarker = marker;
-    }
-    return req->get_ret_status();
-  }
-};
-
-
-class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
-  RGWMetaSyncEnv *env;
-  RGWRESTReadResource *http_op;
-
-  const std::string& period;
-  int shard_id;
-  RGWMetadataLogInfo *shard_info;
-
-public:
-  RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
-                                int _shard_id, RGWMetadataLogInfo *_shard_info)
-    : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
-      period(period), shard_id(_shard_id), shard_info(_shard_info) {}
-
-  int operate() override {
-    auto store = env->store;
-    RGWRESTConn *conn = store->rest_master_conn;
-    reenter(this) {
-      yield {
-       char buf[16];
-       snprintf(buf, sizeof(buf), "%d", shard_id);
-        rgw_http_param_pair pairs[] = { { "type" , "metadata" },
-                                       { "id", buf },
-                                       { "period", period.c_str() },
-                                       { "info" , NULL },
-                                       { NULL, NULL } };
-
-        string p = "/admin/log/";
-
-        http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
-                                          env->http_manager);
-
-        http_op->set_user_info((void *)stack);
-
-        int ret = http_op->aio_read();
-        if (ret < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
-          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
-          http_op->put();
-          return set_cr_error(ret);
-        }
-
-        return io_block(0);
-      }
-      yield {
-        int ret = http_op->wait(shard_info);
-        http_op->put();
-        if (ret < 0) {
-          return set_cr_error(ret);
-        }
-        return set_cr_done();
-      }
-    }
-    return 0;
-  }
-};
-
-class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
-  RGWMetaSyncEnv *sync_env;
-  RGWRESTReadResource *http_op;
-
-  const std::string& period;
-  int shard_id;
-  string marker;
-  uint32_t max_entries;
-  rgw_mdlog_shard_data *result;
-
-public:
-  RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
-                            int _shard_id, const string& _marker, uint32_t _max_entries,
-                            rgw_mdlog_shard_data *_result)
-    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
-      period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
-
-  int send_request() override {
-    RGWRESTConn *conn = sync_env->conn;
-    RGWRados *store = sync_env->store;
-
-    char buf[32];
-    snprintf(buf, sizeof(buf), "%d", shard_id);
-
-    char max_entries_buf[32];
-    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
-
-    const char *marker_key = (marker.empty() ? "" : "marker");
-
-    rgw_http_param_pair pairs[] = { { "type", "metadata" },
-      { "id", buf },
-      { "period", period.c_str() },
-      { "max-entries", max_entries_buf },
-      { marker_key, marker.c_str() },
-      { NULL, NULL } };
-
-    string p = "/admin/log/";
-
-    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
-    http_op->set_user_info((void *)stack);
-
-    int ret = http_op->aio_read();
-    if (ret < 0) {
-      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
-      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
-      http_op->put();
-      return ret;
-    }
-
-    return 0;
-  }
-
-  int request_complete() override {
-    int ret = http_op->wait(result);
-    http_op->put();
-    if (ret < 0 && ret != -ENOENT) {
-      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
-      return ret;
-    }
-    return 0;
-  }
-};
-
-bool RGWReadRemoteMDLogInfoCR::spawn_next() {
-  if (shard_id >= num_shards) {
-    return false;
-  }
-  spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
-  shard_id++;
-  return true;
-}
-
-bool RGWListRemoteMDLogCR::spawn_next() {
-  if (iter == shards.end()) {
-    return false;
-  }
-
-  spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
-  ++iter;
-  return true;
-}
-
-class RGWInitSyncStatusCoroutine : public RGWCoroutine {
-  RGWMetaSyncEnv *sync_env;
-
-  rgw_meta_sync_info status;
-  vector<RGWMetadataLogInfo> shards_info;
-  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
-  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
-public:
-  RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
-                             const rgw_meta_sync_info &status)
-    : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
-      status(status), shards_info(status.num_shards),
-      lease_cr(nullptr), lease_stack(nullptr) {}
-
-  ~RGWInitSyncStatusCoroutine() override {
-    if (lease_cr) {
-      lease_cr->abort();
-    }
-  }
-
-  int operate() override {
-    int ret;
-    reenter(this) {
-      yield {
-        set_status("acquiring sync lock");
-       uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
-        string lock_name = "sync_lock";
-        RGWRados *store = sync_env->store;
-        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
-                                                rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
-                                                lock_name, lock_duration, this));
-        lease_stack.reset(spawn(lease_cr.get(), false));
-      }
-      while (!lease_cr->is_locked()) {
-        if (lease_cr->is_done()) {
-          ldout(cct, 5) << "lease cr failed, done early " << dendl;
-          set_status("lease lock failed, early abort");
-          return set_cr_error(lease_cr->get_ret_status());
-        }
-        set_sleeping(true);
-        yield;
-      }
-      yield {
-        set_status("writing sync status");
-        RGWRados *store = sync_env->store;
-        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store,
-                                                           rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
-                                                           status));
-      }
-
-      if (retcode < 0) {
-        set_status("failed to write sync status");
-        ldout(cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
-        yield lease_cr->go_down();
-        return set_cr_error(retcode);
-      }
-      /* fetch current position in logs */
-      set_status("fetching remote log position");
-      yield {
-        for (int i = 0; i < (int)status.num_shards; i++) {
-          spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
-                                                  &shards_info[i]), false);
-       }
-      }
-
-      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
-
-      yield {
-        set_status("updating sync status");
-        for (int i = 0; i < (int)status.num_shards; i++) {
-         rgw_meta_sync_marker marker;
-          RGWMetadataLogInfo& info = shards_info[i];
-         marker.next_step_marker = info.marker;
-         marker.timestamp = info.last_update;
-          RGWRados *store = sync_env->store;
-          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
-                                                                store,
-                                                                rgw_raw_obj(store->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
-                                                                marker), true);
-        }
-      }
-      yield {
-        set_status("changing sync state: build full sync maps");
-       status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
-        RGWRados *store = sync_env->store;
-        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store,
-                                                           rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
-                                                           status));
-      }
-      set_status("drop lock lease");
-      yield lease_cr->go_down();
-      while (collect(&ret, NULL)) {
-       if (ret < 0) {
-         return set_cr_error(ret);
-       }
-        yield;
-      }
-      drain_all();
-      return set_cr_done();
-    }
-    return 0;
-  }
-};
-
-class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
-  static constexpr int MAX_CONCURRENT_SHARDS = 16;
-
-  RGWMetaSyncEnv *env;
-  const int num_shards;
-  int shard_id{0};
-  map<uint32_t, rgw_meta_sync_marker>& markers;
-
- public:
-  RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
-                             map<uint32_t, rgw_meta_sync_marker>& markers)
-    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
-      env(env), num_shards(num_shards), markers(markers)
-  {}
-  bool spawn_next() override;
-};
-
-bool RGWReadSyncStatusMarkersCR::spawn_next()
-{
-  if (shard_id >= num_shards) {
-    return false;
-  }
-  using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
-  rgw_raw_obj obj{env->store->get_zone_params().log_pool,
-                  env->shard_obj_name(shard_id)};
-  spawn(new CR(env->async_rados, env->store, obj, &markers[shard_id]), false);
-  shard_id++;
-  return true;
-}
-
-class RGWReadSyncStatusCoroutine : public RGWCoroutine {
-  RGWMetaSyncEnv *sync_env;
-  rgw_meta_sync_status *sync_status;
-
-public:
-  RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
-                             rgw_meta_sync_status *_status)
-    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
-  {}
-  int operate() override;
-};
-
-int RGWReadSyncStatusCoroutine::operate()
-{
-  reenter(this) {
-    // read sync info
-    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
-    yield {
-      bool empty_on_enoent = false; // fail on ENOENT
-      rgw_raw_obj obj{sync_env->store->get_zone_params().log_pool,
-                      sync_env->status_oid()};
-      call(new ReadInfoCR(sync_env->async_rados, sync_env->store, obj,
-                          &sync_status->sync_info, empty_on_enoent));
-    }
-    if (retcode < 0) {
-      ldout(sync_env->cct, 4) << "failed to read sync status info with "
-          << cpp_strerror(retcode) << dendl;
-      return set_cr_error(retcode);
-    }
-    // read shard markers
-    using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
-    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
-                                 sync_status->sync_markers));
-    if (retcode < 0) {
-      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
-          << cpp_strerror(retcode) << dendl;
-      return set_cr_error(retcode);
-    }
-    return set_cr_done();
-  }
-  return 0;
-}
-
-class RGWFetchAllMetaCR : public RGWCoroutine {
-  RGWMetaSyncEnv *sync_env;
-
-  int num_shards;
-
-
-  int ret_status;
-
-  list<string> sections;
-  list<string>::iterator sections_iter;
-
-  struct meta_list_result {
-    list<string> keys;
-    string marker;
-    uint64_t count{0};
-    bool truncated{false};
-
-    void decode_json(JSONObj *obj) {
-      JSONDecoder::decode_json("keys", keys, obj);
-      JSONDecoder::decode_json("marker", marker, obj);
-      JSONDecoder::decode_json("count", count, obj);
-      JSONDecoder::decode_json("truncated", truncated, obj);
-    }
-  } result;
-  list<string>::iterator iter;
-
-  std::unique_ptr<RGWShardedOmapCRManager> entries_index;
-
-  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
-  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
-  bool lost_lock;
-  bool failed;
-
-  string marker;
-
-  map<uint32_t, rgw_meta_sync_marker>& markers;
-
-public:
-  RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
-                    map<uint32_t, rgw_meta_sync_marker>& _markers) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                                     num_shards(_num_shards),
-                                                     ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
-                                                      lost_lock(false), failed(false), markers(_markers) {
-  }
-
-  ~RGWFetchAllMetaCR() override {
-  }
-
-  void append_section_from_set(set<string>& all_sections, const string& name) {
-    set<string>::iterator iter = all_sections.find(name);
-    if (iter != all_sections.end()) {
-      sections.emplace_back(std::move(*iter));
-      all_sections.erase(iter);
-    }
-  }
-  /*
-   * meta sync should go in the following order: user, bucket.instance, bucket
-   * then whatever other sections exist (if any)
-   */
-  void rearrange_sections() {
-    set<string> all_sections;
-    std::move(sections.begin(), sections.end(),
-              std::inserter(all_sections, all_sections.end()));
-    sections.clear();
-
-    append_section_from_set(all_sections, "user");
-    append_section_from_set(all_sections, "bucket.instance");
-    append_section_from_set(all_sections, "bucket");
-
-    std::move(all_sections.begin(), all_sections.end(),
-              std::back_inserter(sections));
-  }
-
-  int operate() override {
-    RGWRESTConn *conn = sync_env->conn;
-
-    reenter(this) {
-      yield {
-        set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
-       uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
-        string lock_name = "sync_lock";
-        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
-                                                sync_env->store,
-                                                rgw_raw_obj(sync_env->store->get_zone_params().log_pool, sync_env->status_oid()),
-                                                lock_name, lock_duration, this));
-        lease_stack.reset(spawn(lease_cr.get(), false));
-      }
-      while (!lease_cr->is_locked()) {
-        if (lease_cr->is_done()) {
-          ldout(cct, 5) << "lease cr failed, done early " << dendl;
-          set_status("failed acquiring lock");
-          return set_cr_error(lease_cr->get_ret_status());
-        }
-        set_sleeping(true);
-        yield;
-      }
-      entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
-                                                      sync_env->store->get_zone_params().log_pool,
-                                                      mdlog_sync_full_sync_index_prefix));
-      yield {
-       call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
-                                      "/admin/metadata", NULL, &sections));
-      }
-      if (get_ret_status() < 0) {
-        ldout(cct, 0) << "ERROR: failed to fetch metadata sections" << dendl;
-        yield entries_index->finish();
-        yield lease_cr->go_down();
-        drain_all();
-       return set_cr_error(get_ret_status());
-      }
-      rearrange_sections();
-      sections_iter = sections.begin();
-      for (; sections_iter != sections.end(); ++sections_iter) {
-        do {
-          yield {
-#define META_FULL_SYNC_CHUNK_SIZE "1000"
-            string entrypoint = string("/admin/metadata/") + *sections_iter;
-            rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
-              { "marker", result.marker.c_str() },
-              { NULL, NULL } };
-            result.keys.clear();
-            call(new RGWReadRESTResourceCR<meta_list_result >(cct, conn, sync_env->http_manager,
-                                                              entrypoint, pairs, &result));
-          }
-          if (get_ret_status() < 0) {
-            ldout(cct, 0) << "ERROR: failed to fetch metadata section: " << *sections_iter << dendl;
-            yield entries_index->finish();
-            yield lease_cr->go_down();
-            drain_all();
-            return set_cr_error(get_ret_status());
-          }
-          iter = result.keys.begin();
-          for (; iter != result.keys.end(); ++iter) {
-            if (!lease_cr->is_locked()) {
-              lost_lock = true;
-              break;
-            }
-            yield; // allow entries_index consumer to make progress
-
-            ldout(cct, 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
-            string s = *sections_iter + ":" + *iter;
-            int shard_id;
-            RGWRados *store = sync_env->store;
-            int ret = store->meta_mgr->get_log_shard_id(*sections_iter, *iter, &shard_id);
-            if (ret < 0) {
-              ldout(cct, 0) << "ERROR: could not determine shard id for " << *sections_iter << ":" << *iter << dendl;
-              ret_status = ret;
-              break;
-            }
-            if (!entries_index->append(s, shard_id)) {
-              break;
-            }
-          }
-        } while (result.truncated);
-      }
-      yield {
-        if (!entries_index->finish()) {
-          failed = true;
-        }
-      }
-      if (!failed) {
-        for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
-          int shard_id = (int)iter->first;
-          rgw_meta_sync_marker& marker = iter->second;
-          marker.total_entries = entries_index->get_total_entries(shard_id);
-          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, sync_env->store,
-                                                                rgw_raw_obj(sync_env->store->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
-                                                                marker), true);
-        }
-      }
-
-      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
-
-      yield lease_cr->go_down();
-
-      int ret;
-      while (collect(&ret, NULL)) {
-       if (ret < 0) {
-         return set_cr_error(ret);
-       }
-        yield;
-      }
-      drain_all();
-      if (failed) {
-        yield return set_cr_error(-EIO);
-      }
-      if (lost_lock) {
-        yield return set_cr_error(-EBUSY);
-      }
-
-      if (ret_status < 0) {
-        yield return set_cr_error(ret_status);
-      }
-
-      yield return set_cr_done();
-    }
-    return 0;
-  }
-};
-
-static string full_sync_index_shard_oid(int shard_id)
-{
-  char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
-  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
-  return string(buf);
-}
-
-class RGWReadRemoteMetadataCR : public RGWCoroutine {
-  RGWMetaSyncEnv *sync_env;
-
-  RGWRESTReadResource *http_op;
-
-  string section;
-  string key;
-
-  bufferlist *pbl;
-
-public:
-  RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
-                                                      const string& _section, const string& _key, bufferlist *_pbl) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                                      http_op(NULL),
-                                                      section(_section),
-                                                      key(_key),
-                                                     pbl(_pbl) {
-  }
-
-  int operate() override {
-    RGWRESTConn *conn = sync_env->conn;
-    reenter(this) {
-      yield {
-        rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
-                                       { NULL, NULL } };
-
-        string p = string("/admin/metadata/") + section + "/" + key;
-
-        http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
-
-        http_op->set_user_info((void *)stack);
-
-        int ret = http_op->aio_read();
-        if (ret < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to fetch mdlog data" << dendl;
-          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
-          http_op->put();
-          return set_cr_error(ret);
-        }
-
-        return io_block(0);
-      }
-      yield {
-        int ret = http_op->wait_bl(pbl);
-        http_op->put();
-        if (ret < 0) {
-          return set_cr_error(ret);
-        }
-        return set_cr_done();
-      }
-    }
-    return 0;
-  }
-};
-
-class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
-  RGWRados *store;
-  string raw_key;
-  bufferlist bl;
-protected:
-  int _send_request() override {
-    int ret = store->meta_mgr->put(raw_key, bl, RGWMetadataHandler::APPLY_ALWAYS);
-    if (ret < 0) {
-      ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
-      return ret;
-    }
-    return 0;
-  }
-public:
-  RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
-                       const string& _raw_key,
-                       bufferlist& _bl) : RGWAsyncRadosRequest(caller, cn), store(_store),
-                                          raw_key(_raw_key), bl(_bl) {}
-};
-
-
-class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
-  RGWMetaSyncEnv *sync_env;
-  string raw_key;
-  bufferlist bl;
-
-  RGWAsyncMetaStoreEntry *req;
-
-public:
-  RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
-                       const string& _raw_key,
-                       bufferlist& _bl) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                          raw_key(_raw_key), bl(_bl), req(NULL) {
-  }
-
-  ~RGWMetaStoreEntryCR() override {
-    if (req) {
-      req->finish();
-    }
-  }
-
-  int send_request() override {
-    req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
-                                  sync_env->store, raw_key, bl);
-    sync_env->async_rados->queue(req);
-    return 0;
-  }
-
-  int request_complete() override {
-    return req->get_ret_status();
-  }
-};
-
-class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
-  RGWRados *store;
-  string raw_key;
-protected:
-  int _send_request() override {
-    int ret = store->meta_mgr->remove(raw_key);
-    if (ret < 0) {
-      ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
-      return ret;
-    }
-    return 0;
-  }
-public:
-  RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
-                       const string& _raw_key) : RGWAsyncRadosRequest(caller, cn), store(_store),
-                                          raw_key(_raw_key) {}
-};
-
-
-class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
-  RGWMetaSyncEnv *sync_env;
-  string raw_key;
-
-  RGWAsyncMetaRemoveEntry *req;
-
-public:
-  RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
-                       const string& _raw_key) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                          raw_key(_raw_key), req(NULL) {
-  }
-
-  ~RGWMetaRemoveEntryCR() override {
-    if (req) {
-      req->finish();
-    }
-  }
-
-  int send_request() override {
-    req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
-                                  sync_env->store, raw_key);
-    sync_env->async_rados->queue(req);
-    return 0;
-  }
-
-  int request_complete() override {
-    int r = req->get_ret_status();
-    if (r == -ENOENT) {
-      r = 0;
-    }
-    return r;
-  }
-};
-
-#define META_SYNC_UPDATE_MARKER_WINDOW 10
-
-class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
-  RGWMetaSyncEnv *sync_env;
-
-  string marker_oid;
-  rgw_meta_sync_marker sync_marker;
-
-
-public:
-  RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
-                         const string& _marker_oid,
-                         const rgw_meta_sync_marker& _marker) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
-                                                                sync_env(_sync_env),
-                                                                marker_oid(_marker_oid),
-                                                                sync_marker(_marker) {}
-
-  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
-    sync_marker.marker = new_marker;
-    if (index_pos > 0) {
-      sync_marker.pos = index_pos;
-    }
-
-    if (!real_clock::is_zero(timestamp)) {
-      sync_marker.timestamp = timestamp;
-    }
-
-    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
-    RGWRados *store = sync_env->store;
-    return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
-                                                           store,
-                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
-                                                           sync_marker);
-  }
-};
-
-int RGWMetaSyncSingleEntryCR::operate() {
-  reenter(this) {
-#define NUM_TRANSIENT_ERROR_RETRIES 10
-
-    if (error_injection &&
-        rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
-      ldout(sync_env->cct, 0) << __FILE__ << ":" << __LINE__ << ": injecting meta sync error on key=" << raw_key << dendl;
-      return set_cr_error(-EIO);
-    }
-
-    if (op_status != MDLOG_STATUS_COMPLETE) {
-      ldout(sync_env->cct, 20) << "skipping pending operation" << dendl;
-      yield call(marker_tracker->finish(entry_marker));
-      if (retcode < 0) {
-        return set_cr_error(retcode);
-      }
-      return set_cr_done();
-    }
-    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
-      yield {
-        pos = raw_key.find(':');
-        section = raw_key.substr(0, pos);
-        key = raw_key.substr(pos + 1);
-        ldout(sync_env->cct, 20) << "fetching remote metadata: " << section << ":" << key << (tries == 0 ? "" : " (retry)") << dendl;
-        call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl));
-      }
-
-      sync_status = retcode;
-
-      if (sync_status == -ENOENT) {
-        /* FIXME: do we need to remove the entry from the local zone? */
-        break;
-      }
-
-      if ((sync_status == -EAGAIN || sync_status == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
-        ldout(sync_env->cct, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
-        continue;
-      }
-
-      if (sync_status < 0) {
-        ldout(sync_env->cct, 10) << *this << ": failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << dendl;
-        log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
-        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), section, key, -sync_status,
-                                                        string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
-        return set_cr_error(sync_status);
-      }
-
-      break;
-    }
-
-    retcode = 0;
-    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
-      if (sync_status != -ENOENT) {
-          yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
-      } else {
-          yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
-      }
-      if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
-        ldout(sync_env->cct, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
-        continue;
-      }
-      break;
-    }
-
-    sync_status = retcode;
-
-    if (sync_status == 0 && marker_tracker) {
-      /* update marker */
-      yield call(marker_tracker->finish(entry_marker));
-      sync_status = retcode;
-    }
-    if (sync_status < 0) {
-      return set_cr_error(sync_status);
-    }
-    return set_cr_done();
-  }
-  return 0;
-}
-
-class RGWCloneMetaLogCoroutine : public RGWCoroutine {
-  RGWMetaSyncEnv *sync_env;
-  RGWMetadataLog *mdlog;
-
-  const std::string& period;
-  int shard_id;
-  string marker;
-  bool truncated = false;
-  string *new_marker;
-
-  int max_entries = CLONE_MAX_ENTRIES;
-
-  RGWRESTReadResource *http_op = nullptr;
-  boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;
-
-  RGWMetadataLogInfo shard_info;
-  rgw_mdlog_shard_data data;
-
-public:
-  RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
-                           const std::string& period, int _id,
-                           const string& _marker, string *_new_marker)
-    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
-      period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
-    if (new_marker) {
-      *new_marker = marker;
-    }
-  }
-  ~RGWCloneMetaLogCoroutine() override {
-    if (http_op) {
-      http_op->put();
-    }
-    if (completion) {
-      completion->cancel();
-    }
-  }
-
-  int operate() override;
-
-  int state_init();
-  int state_read_shard_status();
-  int state_read_shard_status_complete();
-  int state_send_rest_request();
-  int state_receive_rest_response();
-  int state_store_mdlog_entries();
-  int state_store_mdlog_entries_complete();
-};
-
-class RGWMetaSyncShardCR : public RGWCoroutine {
-  RGWMetaSyncEnv *sync_env;
-
-  const rgw_pool& pool;
-  const std::string& period; //< currently syncing period id
-  const epoch_t realm_epoch; //< realm_epoch of period
-  RGWMetadataLog* mdlog; //< log of syncing period
-  uint32_t shard_id;
-  rgw_meta_sync_marker& sync_marker;
-  boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
-  string marker;
-  string max_marker;
-  const std::string& period_marker; //< max marker stored in next period
-
-  map<string, bufferlist> entries;
-  map<string, bufferlist>::iterator iter;
-
-  string oid;
-
-  RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;
-
-  list<cls_log_entry> log_entries;
-  list<cls_log_entry>::iterator log_iter;
-  bool truncated = false;
-
-  string mdlog_marker;
-  string raw_key;
-  rgw_mdlog_entry mdlog_entry;
-
-  Mutex inc_lock;
-  Cond inc_cond;
-
-  boost::asio::coroutine incremental_cr;
-  boost::asio::coroutine full_cr;
-
-  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
-  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
-
-  bool lost_lock = false;
-
-  bool *reset_backoff;
-
-  // hold a reference to the cr stack while it's in the map
-  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
-  map<StackRef, string> stack_to_pos;
-  map<string, string> pos_to_prev;
-
-  bool can_adjust_marker = false;
-  bool done_with_period = false;
-
-  int total_entries = 0;
-
-public:
-  RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
-                     const std::string& period, epoch_t realm_epoch,
-                     RGWMetadataLog* mdlog, uint32_t _shard_id,
-                     rgw_meta_sync_marker& _marker,
-                     const std::string& period_marker, bool *_reset_backoff)
-    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
-      period(period), realm_epoch(realm_epoch), mdlog(mdlog),
-      shard_id(_shard_id), sync_marker(_marker),
-      period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"),
-      reset_backoff(_reset_backoff) {
-    *reset_backoff = false;
-  }
-
-  ~RGWMetaSyncShardCR() override {
-    delete marker_tracker;
-    if (lease_cr) {
-      lease_cr->abort();
-    }
-  }
-
-  void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
-    delete marker_tracker;
-    marker_tracker = mt;
-  }
-
-  int operate() override {
-    int r;
-    while (true) {
-      switch (sync_marker.state) {
-      case rgw_meta_sync_marker::FullSync:
-        r  = full_sync();
-        if (r < 0) {
-          ldout(sync_env->cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
-          return set_cr_error(r);
-        }
-        return 0;
-      case rgw_meta_sync_marker::IncrementalSync:
-        r  = incremental_sync();
-        if (r < 0) {
-          ldout(sync_env->cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
-          return set_cr_error(r);
-        }
-        return 0;
-      }
-    }
-    /* unreachable */
-    return 0;
-  }
-
-  void collect_children()
-  {
-    int child_ret;
-    RGWCoroutinesStack *child;
-    while (collect_next(&child_ret, &child)) {
-      auto iter = stack_to_pos.find(child);
-      if (iter == stack_to_pos.end()) {
-        /* some other stack that we don't care about */
-        continue;
-      }
-
-      string& pos = iter->second;
-
-      if (child_ret < 0) {
-        ldout(sync_env->cct, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
-      }
-
-      map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
-      assert(prev_iter != pos_to_prev.end());
-
-      /*
-       * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
-       * update the marker and abort. We'll get called again for these. Permanent errors will be
-       * handled by marking the entry at the error log shard, so that we retry on it separately
-       */
-      if (child_ret == -EAGAIN) {
-        can_adjust_marker = false;
-      }
-
-      if (pos_to_prev.size() == 1) {
-        if (can_adjust_marker) {
-          sync_marker.marker = pos;
-        }
-        pos_to_prev.erase(prev_iter);
-      } else {
-        assert(pos_to_prev.size() > 1);
-        pos_to_prev.erase(prev_iter);
-        prev_iter = pos_to_prev.begin();
-        if (can_adjust_marker) {
-          sync_marker.marker = prev_iter->second;
-        }
-      }
-
-      ldout(sync_env->cct, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
-      stack_to_pos.erase(iter);
-    }
-  }
-
-  int full_sync() {
-#define OMAP_GET_MAX_ENTRIES 100
-    int max_entries = OMAP_GET_MAX_ENTRIES;
-    reenter(&full_cr) {
-      set_status("full_sync");
-      oid = full_sync_index_shard_oid(shard_id);
-      can_adjust_marker = true;
-      /* grab lock */
-      yield {
-       uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
-        string lock_name = "sync_lock";
-        RGWRados *store = sync_env->store;
-        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
-                                                rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
-                                                lock_name, lock_duration, this));
-        lease_stack.reset(spawn(lease_cr.get(), false));
-        lost_lock = false;
-      }
-      while (!lease_cr->is_locked()) {
-        if (lease_cr->is_done()) {
-          ldout(cct, 5) << "lease cr failed, done early " << dendl;
-          drain_all();
-          return lease_cr->get_ret_status();
-        }
-        set_sleeping(true);
-        yield;
-      }
-
-      /* lock succeeded, a retry now should avoid previous backoff status */
-      *reset_backoff = true;
-
-      /* prepare marker tracker */
-      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
-                                                         sync_env->shard_obj_name(shard_id),
-                                                         sync_marker));
-
-      marker = sync_marker.marker;
-
-      total_entries = sync_marker.pos;
-
-      /* sync! */
-      do {
-        if (!lease_cr->is_locked()) {
-          lost_lock = true;
-          break;
-        }
-        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
-                                             marker, &entries, max_entries));
-        if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
-          yield lease_cr->go_down();
-          drain_all();
-          return retcode;
-        }
-        iter = entries.begin();
-        for (; iter != entries.end(); ++iter) {
-          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
-          total_entries++;
-          if (!marker_tracker->start(iter->first, total_entries, real_time())) {
-            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
-          } else {
-            // fetch remote and write locally
-            yield {
-              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, MDLOG_STATUS_COMPLETE, marker_tracker), false);
-              // stack_to_pos holds a reference to the stack
-              stack_to_pos[stack] = iter->first;
-              pos_to_prev[iter->first] = marker;
-            }
-          }
-          marker = iter->first;
-        }
-        collect_children();
-      } while ((int)entries.size() == max_entries && can_adjust_marker);
-
-      while (num_spawned() > 1) {
-        yield wait_for_child();
-        collect_children();
-      }
-
-      if (!lost_lock) {
-        /* update marker to reflect we're done with full sync */
-        if (can_adjust_marker) {
-          // apply updates to a temporary marker, or operate() will send us
-          // to incremental_sync() after we yield
-          temp_marker = sync_marker;
-         temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
-         temp_marker->marker = std::move(temp_marker->next_step_marker);
-         temp_marker->next_step_marker.clear();
-         temp_marker->realm_epoch = realm_epoch;
-         ldout(sync_env->cct, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;
-
-         using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
-         yield call(new WriteMarkerCR(sync_env->async_rados, sync_env->store,
-                                      rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
-                                      *temp_marker));
-        }
-
-        if (retcode < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
-          yield lease_cr->go_down();
-          drain_all();
-          return retcode;
-        }
-      }
-
-      /* 
-       * if we reached here, it means that lost_lock is true, otherwise the state
-       * change in the previous block will prevent us from reaching here
-       */
-
-      yield lease_cr->go_down();
-
-      lease_cr.reset();
-
-      drain_all();
-
-      if (!can_adjust_marker) {
-        return -EAGAIN;
-      }
-
-      if (lost_lock) {
-        return -EBUSY;
-      }
-
-      // apply the sync marker update
-      assert(temp_marker);
-      sync_marker = std::move(*temp_marker);
-      temp_marker = boost::none;
-      // must not yield after this point!
-    }
-    return 0;
-  }
-    
-
-  int incremental_sync() {
-    reenter(&incremental_cr) {
-      set_status("incremental_sync");
-      can_adjust_marker = true;
-      /* grab lock */
-      if (!lease_cr) { /* could have had  a lease_cr lock from previous state */
-        yield {
-          uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
-          string lock_name = "sync_lock";
-          RGWRados *store = sync_env->store;
-          lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
-                                                   rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
-                                                   lock_name, lock_duration, this));
-          lease_stack.reset(spawn(lease_cr.get(), false));
-          lost_lock = false;
-        }
-        while (!lease_cr->is_locked()) {
-          if (lease_cr->is_done()) {
-            ldout(cct, 5) << "lease cr failed, done early " << dendl;
-            drain_all();
-            return lease_cr->get_ret_status();
-          }
-          set_sleeping(true);
-          yield;
-        }
-      }
-      // if the period has advanced, we can't use the existing marker
-      if (sync_marker.realm_epoch < realm_epoch) {
-        ldout(sync_env->cct, 4) << "clearing marker=" << sync_marker.marker
-            << " from old realm_epoch=" << sync_marker.realm_epoch
-            << " (now " << realm_epoch << ')' << dendl;
-        sync_marker.realm_epoch = realm_epoch;
-        sync_marker.marker.clear();
-      }
-      mdlog_marker = sync_marker.marker;
-      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
-                                                         sync_env->shard_obj_name(shard_id),
-                                                         sync_marker));
-
-      /*
-       * mdlog_marker: the remote sync marker positiion
-       * sync_marker: the local sync marker position
-       * max_marker: the max mdlog position that we fetched
-       * marker: the current position we try to sync
-       * period_marker: the last marker before the next period begins (optional)
-       */
-      marker = max_marker = sync_marker.marker;
-      /* inc sync */
-      do {
-        if (!lease_cr->is_locked()) {
-          lost_lock = true;
-          break;
-        }
-#define INCREMENTAL_MAX_ENTRIES 100
-       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
-        if (!period_marker.empty() && period_marker <= mdlog_marker) {
-          ldout(cct, 10) << "mdlog_marker past period_marker=" << period_marker << dendl;
-          done_with_period = true;
-          break;
-        }
-       if (mdlog_marker <= max_marker) {
-         /* we're at the tip, try to bring more entries */
-          ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
-          yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
-                                                  period, shard_id,
-                                                  mdlog_marker, &mdlog_marker));
-       }
-        if (retcode < 0) {
-          ldout(sync_env->cct, 10) << *this << ": failed to fetch more log entries, retcode=" << retcode << dendl;
-          yield lease_cr->go_down();
-          drain_all();
-          *reset_backoff = false; // back off and try again later
-          return retcode;
-        }
-        *reset_backoff = true; /* if we got to this point, all systems function */
-       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
-       if (mdlog_marker > max_marker) {
-          marker = max_marker;
-          yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
-                                               &max_marker, INCREMENTAL_MAX_ENTRIES,
-                                               &log_entries, &truncated));
-          if (retcode < 0) {
-            ldout(sync_env->cct, 10) << *this << ": failed to list mdlog entries, retcode=" << retcode << dendl;
-            yield lease_cr->go_down();
-            drain_all();
-            *reset_backoff = false; // back off and try again later
-            return retcode;
-          }
-          for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
-            if (!period_marker.empty() && period_marker <= log_iter->id) {
-              done_with_period = true;
-              if (period_marker < log_iter->id) {
-                ldout(cct, 10) << "found key=" << log_iter->id
-                    << " past period_marker=" << period_marker << dendl;
-                break;
-              }
-              ldout(cct, 10) << "found key at period_marker=" << period_marker << dendl;
-              // sync this entry, then return control to RGWMetaSyncCR
-            }
-            if (!mdlog_entry.convert_from(*log_iter)) {
-              ldout(sync_env->cct, 0) << __func__ << ":" << __LINE__ << ": ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry" << dendl;
-              continue;
-            }
-            ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
-            if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
-              ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
-            } else {
-              raw_key = log_iter->section + ":" + log_iter->name;
-              yield {
-                RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker), false);
-                assert(stack);
-                // stack_to_pos holds a reference to the stack
-                stack_to_pos[stack] = log_iter->id;
-                pos_to_prev[log_iter->id] = marker;
-              }
-            }
-            marker = log_iter->id;
-          }
-        }
-        collect_children();
-       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
-        if (done_with_period) {
-          // return control to RGWMetaSyncCR and advance to the next period
-          ldout(sync_env->cct, 10) << *this << ": done with period" << dendl;
-          break;
-        }
-       if (mdlog_marker == max_marker && can_adjust_marker) {
-#define INCREMENTAL_INTERVAL 20
-         yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
-       }
-      } while (can_adjust_marker);
-
-      while (num_spawned() > 1) {
-        yield wait_for_child();
-        collect_children();
-      }
-
-      yield lease_cr->go_down();
-
-      drain_all();
-
-      if (lost_lock) {
-        return -EBUSY;
-      }
-
-      if (!can_adjust_marker) {
-        return -EAGAIN;
-      }
-
-      return set_cr_done();
-    }
-    /* TODO */
-    return 0;
-  }
-};
-
-class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
-{
-  RGWMetaSyncEnv *sync_env;
-
-  const rgw_pool& pool;
-  const std::string& period;
-  epoch_t realm_epoch;
-  RGWMetadataLog* mdlog;
-  uint32_t shard_id;
-  rgw_meta_sync_marker sync_marker;
-  const std::string period_marker;
-
-  static constexpr bool exit_on_error = false; // retry on all errors
-public:
-  RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
-                            const std::string& period, epoch_t realm_epoch,
-                            RGWMetadataLog* mdlog, uint32_t _shard_id,
-                            const rgw_meta_sync_marker& _marker,
-                            std::string&& period_marker)
-    : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
-      pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
-      shard_id(_shard_id), sync_marker(_marker),
-      period_marker(std::move(period_marker)) {}
-
-  RGWCoroutine *alloc_cr() override {
-    return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
-                                  shard_id, sync_marker, period_marker, backoff_ptr());
-  }
-
-  RGWCoroutine *alloc_finisher_cr() override {
-    RGWRados *store = sync_env->store;
-    return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store,
-                                                          rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
-                                                          &sync_marker);
-  }
-};
-
-class RGWMetaSyncCR : public RGWCoroutine {
-  RGWMetaSyncEnv *sync_env;
-  const rgw_pool& pool;
-  RGWPeriodHistory::Cursor cursor; //< sync position in period history
-  RGWPeriodHistory::Cursor next; //< next period in history
-  rgw_meta_sync_status sync_status;
-
-  std::mutex mutex; //< protect access to shard_crs
-
-  // TODO: it should be enough to hold a reference on the stack only, as calling
-  // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
-  // already completed
-  using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
-  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
-  using RefPair = std::pair<ControlCRRef, StackRef>;
-  map<int, RefPair> shard_crs;
-  int ret{0};
-
-public:
-  RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, RGWPeriodHistory::Cursor cursor,
-                const rgw_meta_sync_status& _sync_status)
-    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-      pool(sync_env->store->get_zone_params().log_pool),
-      cursor(cursor), sync_status(_sync_status) {}
-
-  int operate() override {
-    reenter(this) {
-      // loop through one period at a time
-      for (;;) {
-        if (cursor == sync_env->store->period_history->get_current()) {
-          next = RGWPeriodHistory::Cursor{};
-          if (cursor) {
-            ldout(cct, 10) << "RGWMetaSyncCR on current period="
-                << cursor.get_period().get_id() << dendl;
-          } else {
-            ldout(cct, 10) << "RGWMetaSyncCR with no period" << dendl;
-          }
-        } else {
-          next = cursor;
-          next.next();
-          ldout(cct, 10) << "RGWMetaSyncCR on period="
-              << cursor.get_period().get_id() << ", next="
-              << next.get_period().get_id() << dendl;
-        }
-
-        yield {
-          // get the mdlog for the current period (may be empty)
-          auto& period_id = sync_status.sync_info.period;
-          auto realm_epoch = sync_status.sync_info.realm_epoch;
-          auto mdlog = sync_env->store->meta_mgr->get_log(period_id);
-
-          // prevent wakeup() from accessing shard_crs while we're spawning them
-          std::lock_guard<std::mutex> lock(mutex);
-
-          // sync this period on each shard
-          for (const auto& m : sync_status.sync_markers) {
-            uint32_t shard_id = m.first;
-            auto& marker = m.second;
-
-            std::string period_marker;
-            if (next) {
-              // read the maximum marker from the next period's sync status
-              period_marker = next.get_period().get_sync_status()[shard_id];
-              if (period_marker.empty()) {
-                // no metadata changes have occurred on this shard, skip it
-                ldout(cct, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
-                    << " with empty period marker" << dendl;
-                continue;
-              }
-            }
-
-            using ShardCR = RGWMetaSyncShardControlCR;
-            auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
-                                  mdlog, shard_id, marker,
-                                  std::move(period_marker));
-            auto stack = spawn(cr, false);
-            shard_crs[shard_id] = RefPair{cr, stack};
-          }
-        }
-        // wait for each shard to complete
-        while (ret == 0 && num_spawned() > 0) {
-          yield wait_for_child();
-          collect(&ret, nullptr);
-        }
-        drain_all();
-        {
-          // drop shard cr refs under lock
-          std::lock_guard<std::mutex> lock(mutex);
-          shard_crs.clear();
-        }
-        if (ret < 0) {
-          return set_cr_error(ret);
-        }
-        // advance to the next period
-        assert(next);
-        cursor = next;
-
-        // write the updated sync info
-        sync_status.sync_info.period = cursor.get_period().get_id();
-        sync_status.sync_info.realm_epoch = cursor.get_epoch();
-        yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
-                                                                 sync_env->store,
-                                                                 rgw_raw_obj(pool, sync_env->status_oid()),
-                                                                 sync_status.sync_info));
-      }
-    }
-    return 0;
-  }
-
-  void wakeup(int shard_id) {
-    std::lock_guard<std::mutex> lock(mutex);
-    auto iter = shard_crs.find(shard_id);
-    if (iter == shard_crs.end()) {
-      return;
-    }
-    iter->second.first->wakeup();
-  }
-};
-
-void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
-  env->cct = store->ctx();
-  env->store = store;
-  env->conn = conn;
-  env->async_rados = async_rados;
-  env->http_manager = &http_manager;
-  env->error_logger = error_logger;
-}
-
-int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status *sync_status)
-{
-  if (store->is_meta_master()) {
-    return 0;
-  }
-  // cannot run concurrently with run_sync(), so run in a separate manager
-  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
-  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
-  int ret = http_manager.set_threaded();
-  if (ret < 0) {
-    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
-    return ret;
-  }
-  RGWMetaSyncEnv sync_env_local = sync_env;
-  sync_env_local.http_manager = &http_manager;
-  ret = crs.run(new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
-  http_manager.stop();
-  return ret;
-}
-
-int RGWRemoteMetaLog::init_sync_status()
-{
-  if (store->is_meta_master()) {
-    return 0;
-  }
-
-  rgw_mdlog_info mdlog_info;
-  int r = read_log_info(&mdlog_info);
-  if (r < 0) {
-    lderr(store->ctx()) << "ERROR: fail to fetch master log info (r=" << r << ")" << dendl;
-    return r;
-  }
-
-  rgw_meta_sync_info sync_info;
-  sync_info.num_shards = mdlog_info.num_shards;
-  auto cursor = store->period_history->get_current();
-  if (cursor) {
-    sync_info.period = cursor.get_period().get_id();
-    sync_info.realm_epoch = cursor.get_epoch();
-  }
-
-  return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
-}
-
-int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info& sync_info)
-{
-  return run(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store,
-                                                           rgw_raw_obj(store->get_zone_params().log_pool, sync_env.status_oid()),
-                                                           sync_info));
-}
-
-// return a cursor to the period at our sync position
-static RGWPeriodHistory::Cursor get_period_at(RGWRados* store,
-                                              const rgw_meta_sync_info& info)
-{
-  if (info.period.empty()) {
-    // return an empty cursor with error=0
-    return RGWPeriodHistory::Cursor{};
-  }
-
-  // look for an existing period in our history
-  auto cursor = store->period_history->lookup(info.realm_epoch);
-  if (cursor) {
-    // verify that the period ids match
-    auto& existing = cursor.get_period().get_id();
-    if (existing != info.period) {
-      lderr(store->ctx()) << "ERROR: sync status period=" << info.period
-          << " does not match period=" << existing
-          << " in history at realm epoch=" << info.realm_epoch << dendl;
-      return RGWPeriodHistory::Cursor{-EEXIST};
-    }
-    return cursor;
-  }
-
-  // read the period from rados or pull it from the master
-  RGWPeriod period;
-  int r = store->period_puller->pull(info.period, period);
-  if (r < 0) {
-    lderr(store->ctx()) << "ERROR: failed to read period id "
-        << info.period << ": " << cpp_strerror(r) << dendl;
-    return RGWPeriodHistory::Cursor{r};
-  }
-  // attach the period to our history
-  cursor = store->period_history->attach(std::move(period));
-  if (!cursor) {
-    r = cursor.get_error();
-    lderr(store->ctx()) << "ERROR: failed to read period history back to "
-        << info.period << ": " << cpp_strerror(r) << dendl;
-  }
-  return cursor;
-}
-
-int RGWRemoteMetaLog::run_sync()
-{
-  if (store->is_meta_master()) {
-    return 0;
-  }
-
-  int r = 0;
-
-  // get shard count and oldest log period from master
-  rgw_mdlog_info mdlog_info;
-  for (;;) {
-    if (going_down) {
-      ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
-      return 0;
-    }
-    r = read_log_info(&mdlog_info);
-    if (r == -EIO || r == -ENOENT) {
-      // keep retrying if master isn't alive or hasn't initialized the log
-      ldout(store->ctx(), 10) << __func__ << "(): waiting for master.." << dendl;
-      backoff.backoff_sleep();
-      continue;
-    }
-    backoff.reset();
-    if (r < 0) {
-      lderr(store->ctx()) << "ERROR: fail to fetch master log info (r=" << r << ")" << dendl;
-      return r;
-    }
-    break;
-  }
-
-  rgw_meta_sync_status sync_status;
-  do {
-    if (going_down) {
-      ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
-      return 0;
-    }
-    r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
-    if (r < 0 && r != -ENOENT) {
-      ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
-      return r;
-    }
-
-    if (!mdlog_info.period.empty()) {
-      // restart sync if the remote has a period, but:
-      // a) our status does not, or
-      // b) our sync period comes before the remote's oldest log period
-      if (sync_status.sync_info.period.empty() ||
-          sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
-        sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
-        ldout(store->ctx(), 1) << "epoch=" << sync_status.sync_info.realm_epoch
-           << " in sync status comes before remote's oldest mdlog epoch="
-           << mdlog_info.realm_epoch << ", restarting sync" << dendl;
-      }
-    }
-
-    if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
-      ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
-      sync_status.sync_info.num_shards = mdlog_info.num_shards;
-      auto cursor = store->period_history->get_current();
-      if (cursor) {
-        // run full sync, then start incremental from the current period/epoch
-        sync_status.sync_info.period = cursor.get_period().get_id();
-        sync_status.sync_info.realm_epoch = cursor.get_epoch();
-      }
-      r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
-      if (r == -EBUSY) {
-        backoff.backoff_sleep();
-        continue;
-      }
-      backoff.reset();
-      if (r < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to init sync status r=" << r << dendl;
-        return r;
-      }
-    }
-  } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
-
-  auto num_shards = sync_status.sync_info.num_shards;
-  if (num_shards != mdlog_info.num_shards) {
-    lderr(store->ctx()) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
-    return -EINVAL;
-  }
-
-  RGWPeriodHistory::Cursor cursor;
-  do {
-    r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
-    if (r < 0 && r != -ENOENT) {
-      ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
-      return r;
-    }
-
-    switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
-      case rgw_meta_sync_info::StateBuildingFullSyncMaps:
-        ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
-        r = run(new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers));
-        if (r == -EBUSY || r == -EAGAIN) {
-          backoff.backoff_sleep();
-          continue;
-        }
-        backoff.reset();
-        if (r < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
-          return r;
-        }
-
-        sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
-        r = store_sync_info(sync_status.sync_info);
-        if (r < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl;
-          return r;
-        }
-        /* fall through */
-      case rgw_meta_sync_info::StateSync:
-        ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl;
-        // find our position in the period history (if any)
-        cursor = get_period_at(store, sync_status.sync_info);
-        r = cursor.get_error();
-        if (r < 0) {
-          return r;
-        }
-        meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status);
-        r = run(meta_sync_cr);
-        if (r < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
-          return r;
-        }
-        break;
-      default:
-        ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
-        return -EIO;
-    }
-  } while (!going_down);
-
-  return 0;
-}
-
-void RGWRemoteMetaLog::wakeup(int shard_id)
-{
-  if (!meta_sync_cr) {
-    return;
-  }
-  meta_sync_cr->wakeup(shard_id);
-}
-
-int RGWCloneMetaLogCoroutine::operate()
-{
-  reenter(this) {
-    do {
-      yield {
-        ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
-        return state_init();
-      }
-      yield {
-        ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
-        return state_read_shard_status();
-      }
-      yield {
-        ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
-        return state_read_shard_status_complete();
-      }
-      yield {
-        ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
-        return state_send_rest_request();
-      }
-      yield {
-        ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
-        return state_receive_rest_response();
-      }
-      yield {
-        ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
-        return state_store_mdlog_entries();
-      }
-    } while (truncated);
-    yield {
-      ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
-      return state_store_mdlog_entries_complete();
-    }
-  }
-
-  return 0;
-}
-
-int RGWCloneMetaLogCoroutine::state_init()
-{
-  data = rgw_mdlog_shard_data();
-
-  return 0;
-}
-
-int RGWCloneMetaLogCoroutine::state_read_shard_status()
-{
-  const bool add_ref = false; // default constructs with refs=1
-
-  completion.reset(new RGWMetadataLogInfoCompletion(
-    [this](int ret, const cls_log_header& header) {
-      if (ret < 0) {
-        ldout(cct, 1) << "ERROR: failed to read mdlog info with "
-            << cpp_strerror(ret) << dendl;
-      } else {
-        shard_info.marker = header.max_marker;
-        shard_info.last_update = header.max_time.to_real_time();
-      }
-      // wake up parent stack
-      stack->get_completion_mgr()->complete(nullptr, stack);
-    }), add_ref);
-
-  int ret = mdlog->get_info_async(shard_id, completion.get());
-  if (ret < 0) {
-    ldout(cct, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
-    return set_cr_error(ret);
-  }
-
-  return io_block(0);
-}
-
-int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
-{
-  completion.reset();
-
-  ldout(cct, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
-
-  marker = shard_info.marker;
-
-  return 0;
-}
-
-int RGWCloneMetaLogCoroutine::state_send_rest_request()
-{
-  RGWRESTConn *conn = sync_env->conn;
-
-  char buf[32];
-  snprintf(buf, sizeof(buf), "%d", shard_id);
-
-  char max_entries_buf[32];
-  snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
-
-  const char *marker_key = (marker.empty() ? "" : "marker");
-
-  rgw_http_param_pair pairs[] = { { "type", "metadata" },
-                                  { "id", buf },
-                                  { "period", period.c_str() },
-                                  { "max-entries", max_entries_buf },
-                                  { marker_key, marker.c_str() },
-                                  { NULL, NULL } };
-
-  http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);
-
-  http_op->set_user_info((void *)stack);
-
-  int ret = http_op->aio_read();
-  if (ret < 0) {
-    ldout(cct, 0) << "ERROR: failed to fetch mdlog data" << dendl;
-    log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
-    http_op->put();
-    http_op = NULL;
-    return ret;
-  }
-
-  return io_block(0);
-}
-
-int RGWCloneMetaLogCoroutine::state_receive_rest_response()
-{
-  int ret = http_op->wait(&data);
-  if (ret < 0) {
-    error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
-    ldout(cct, 5) << "failed to wait for op, ret=" << ret << dendl;
-    http_op->put();
-    http_op = NULL;
-    return set_cr_error(ret);
-  }
-  http_op->put();
-  http_op = NULL;
-
-  ldout(cct, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
-
-  truncated = ((int)data.entries.size() == max_entries);
-
-  if (data.entries.empty()) {
-    if (new_marker) {
-      *new_marker = marker;
-    }
-    return set_cr_done();
-  }
-
-  if (new_marker) {
-    *new_marker = data.entries.back().id;
-  }
-
-  return 0;
-}
-
-
-int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
-{
-  list<cls_log_entry> dest_entries;
-
-  vector<rgw_mdlog_entry>::iterator iter;
-  for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
-    rgw_mdlog_entry& entry = *iter;
-    ldout(cct, 20) << "entry: name=" << entry.name << dendl;
-
-    cls_log_entry dest_entry;
-    dest_entry.id = entry.id;
-    dest_entry.section = entry.section;
-    dest_entry.name = entry.name;
-    dest_entry.timestamp = utime_t(entry.timestamp);
-  
-    ::encode(entry.log_data, dest_entry.data);
-
-    dest_entries.push_back(dest_entry);
-
-    marker = entry.id;
-  }
-
-  RGWAioCompletionNotifier *cn = stack->create_completion_notifier();
-
-  int ret = mdlog->store_entries_in_shard(dest_entries, shard_id, cn->completion());
-  if (ret < 0) {
-    cn->put();
-    ldout(cct, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
-    return set_cr_error(ret);
-  }
-  return io_block(0);
-}
-
-int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
-{
-  return set_cr_done();
-}
-
-
-// TODO: move into rgw_sync_trim.cc
-#undef dout_prefix
-#define dout_prefix (*_dout << "meta trim: ")
-
-/// purge all log shards for the given mdlog
-class PurgeLogShardsCR : public RGWShardCollectCR {
-  RGWRados *const store;
-  const RGWMetadataLog* mdlog;
-  const int num_shards;
-  rgw_raw_obj obj;
-  int i{0};
-
-  static constexpr int max_concurrent = 16;
-
- public:
-  PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog,
-                   const rgw_pool& pool, int num_shards)
-    : RGWShardCollectCR(store->ctx(), max_concurrent),
-      store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
-  {}
-
-  bool spawn_next() override {
-    if (i == num_shards) {
-      return false;
-    }
-    mdlog->get_shard_oid(i++, obj.oid);
-    spawn(new RGWRadosRemoveCR(store, obj), false);
-    return true;
-  }
-};
-
-using Cursor = RGWPeriodHistory::Cursor;
-
-/// purge mdlogs from the oldest up to (but not including) the given realm_epoch
-class PurgePeriodLogsCR : public RGWCoroutine {
-  RGWRados *const store;
-  RGWMetadataManager *const metadata;
-  RGWObjVersionTracker objv;
-  Cursor cursor;
-  epoch_t realm_epoch;
-  epoch_t *last_trim_epoch; //< update last trim on success
-
- public:
-  PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim)
-    : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr),
-      realm_epoch(realm_epoch), last_trim_epoch(last_trim)
-  {}
-
-  int operate();
-};
-
-int PurgePeriodLogsCR::operate()
-{
-  reenter(this) {
-    // read our current oldest log period
-    yield call(metadata->read_oldest_log_period_cr(&cursor, &objv));
-    if (retcode < 0) {
-      return set_cr_error(retcode);
-    }
-    assert(cursor);
-    ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
-        << " period=" << cursor.get_period().get_id() << dendl;
-
-    // trim -up to- the given realm_epoch
-    while (cursor.get_epoch() < realm_epoch) {
-      ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
-          << " period=" << cursor.get_period().get_id() << dendl;
-      yield {
-        const auto mdlog = metadata->get_log(cursor.get_period().get_id());
-        const auto& pool = store->get_zone_params().log_pool;
-        auto num_shards = cct->_conf->rgw_md_log_max_shards;
-        call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
-      }
-      if (retcode < 0) {
-        ldout(cct, 1) << "failed to remove log shards: "
-            << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-      ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
-          << " period=" << cursor.get_period().get_id() << dendl;
-
-      // update our mdlog history
-      yield call(metadata->trim_log_period_cr(cursor, &objv));
-      if (retcode == -ENOENT) {
-        // must have raced to update mdlog history. return success and allow the
-        // winner to continue purging
-        ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
-            << " period=" << cursor.get_period().get_id() << dendl;
-        return set_cr_done();
-      } else if (retcode < 0) {
-        ldout(cct, 1) << "failed to remove log shards for realm_epoch="
-            << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
-            << " with: " << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-
-      if (*last_trim_epoch < cursor.get_epoch()) {
-        *last_trim_epoch = cursor.get_epoch();
-      }
-
-      assert(cursor.has_next()); // get_current() should always come after
-      cursor.next();
-    }
-    return set_cr_done();
-  }
-  return 0;
-}
-
-namespace {
-
-using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
-
-/// construct a RGWRESTConn for each zone in the realm
-template <typename Zonegroups>
-connection_map make_peer_connections(RGWRados *store,
-                                     const Zonegroups& zonegroups)
-{
-  connection_map connections;
-  for (auto& g : zonegroups) {
-    for (auto& z : g.second.zones) {
-      std::unique_ptr<RGWRESTConn> conn{
-        new RGWRESTConn(store->ctx(), store, z.first, z.second.endpoints)};
-      connections.emplace(z.first, std::move(conn));
-    }
-  }
-  return connections;
-}
-
-/// return the marker that it's safe to trim up to
-const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
-{
-  return m.state == m.FullSync ? m.next_step_marker : m.marker;
-}
-
-/// comparison operator for take_min_status()
-bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
-{
-  // sort by stable marker
-  return get_stable_marker(lhs) < get_stable_marker(rhs);
-}
-
-/// populate the status with the minimum stable marker of each shard for any
-/// peer whose realm_epoch matches the minimum realm_epoch in the input
-template <typename Iter>
-int take_min_status(CephContext *cct, Iter first, Iter last,
-                    rgw_meta_sync_status *status)
-{
-  if (first == last) {
-    return -EINVAL;
-  }
-  const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
-
-  status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
-  for (auto p = first; p != last; ++p) {
-    // validate peer's shard count
-    if (p->sync_markers.size() != num_shards) {
-      ldout(cct, 1) << "take_min_status got peer status with "
-          << p->sync_markers.size() << " shards, expected "
-          << num_shards << dendl;
-      return -EINVAL;
-    }
-    if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
-      // earlier epoch, take its entire status
-      *status = std::move(*p);
-    } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
-      // same epoch, take any earlier markers
-      auto m = status->sync_markers.begin();
-      for (auto& shard : p->sync_markers) {
-        if (shard.second < m->second) {
-          m->second = std::move(shard.second);
-        }
-        ++m;
-      }
-    }
-  }
-  return 0;
-}
-
-struct TrimEnv {
-  RGWRados *const store;
-  RGWHTTPManager *const http;
-  int num_shards;
-  const std::string& zone;
-  Cursor current; //< cursor to current period
-  epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged
-
-  TrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
-    : store(store), http(http), num_shards(num_shards),
-      zone(store->get_zone_params().get_id()),
-      current(store->period_history->get_current())
-  {}
-};
-
-struct MasterTrimEnv : public TrimEnv {
-  connection_map connections; //< peer connections
-  std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
-  /// last trim marker for each shard, only applies to current period's mdlog
-  std::vector<std::string> last_trim_markers;
-
-  MasterTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
-    : TrimEnv(store, http, num_shards),
-      last_trim_markers(num_shards)
-  {
-    auto& period = current.get_period();
-    connections = make_peer_connections(store, period.get_map().zonegroups);
-    connections.erase(zone);
-    peer_status.resize(connections.size());
-  }
-};
-
-struct PeerTrimEnv : public TrimEnv {
-  /// last trim timestamp for each shard, only applies to current period's mdlog
-  std::vector<ceph::real_time> last_trim_timestamps;
-
-  PeerTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
-    : TrimEnv(store, http, num_shards),
-      last_trim_timestamps(num_shards)
-  {}
-
-  void set_num_shards(int num_shards) {
-    this->num_shards = num_shards;
-    last_trim_timestamps.resize(num_shards);
-  }
-};
-
-} // anonymous namespace
-
-
-/// spawn a trim cr for each shard that needs it, while limiting the number
-/// of concurrent shards
-class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
- private:
-  static constexpr int MAX_CONCURRENT_SHARDS = 16;
-
-  MasterTrimEnv& env;
-  RGWMetadataLog *mdlog;
-  int shard_id{0};
-  std::string oid;
-  const rgw_meta_sync_status& sync_status;
-
- public:
-  MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
-                               const rgw_meta_sync_status& sync_status)
-    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
-      env(env), mdlog(mdlog), sync_status(sync_status)
-  {}
-
-  bool spawn_next() override;
-};
-
-bool MetaMasterTrimShardCollectCR::spawn_next()
-{
-  while (shard_id < env.num_shards) {
-    auto m = sync_status.sync_markers.find(shard_id);
-    if (m == sync_status.sync_markers.end()) {
-      shard_id++;
-      continue;
-    }
-    auto& stable = get_stable_marker(m->second);
-    auto& last_trim = env.last_trim_markers[shard_id];
-
-    if (stable <= last_trim) {
-      // already trimmed
-      ldout(cct, 20) << "skipping log shard " << shard_id
-          << " at marker=" << stable
-          << " last_trim=" << last_trim
-          << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
-      shard_id++;
-      continue;
-    }
-
-    mdlog->get_shard_oid(shard_id, oid);
-
-    ldout(cct, 10) << "trimming log shard " << shard_id
-        << " at marker=" << stable
-        << " last_trim=" << last_trim
-        << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
-    spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
-    shard_id++;
-    return true;
-  }
-  return false;
-}
-
-/// spawn rest requests to read each peer's sync status
-class MetaMasterStatusCollectCR : public RGWShardCollectCR {
-  static constexpr int MAX_CONCURRENT_SHARDS = 16;
-
-  MasterTrimEnv& env;
-  connection_map::iterator c;
-  std::vector<rgw_meta_sync_status>::iterator s;
- public:
-  MetaMasterStatusCollectCR(MasterTrimEnv& env)
-    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
-      env(env), c(env.connections.begin()), s(env.peer_status.begin())
-  {}
-
-  bool spawn_next() override {
-    if (c == env.connections.end()) {
-      return false;
-    }
-    static rgw_http_param_pair params[] = {
-      { "type", "metadata" },
-      { "status", nullptr },
-      { nullptr, nullptr }
-    };
-
-    ldout(cct, 20) << "query sync status from " << c->first << dendl;
-    auto conn = c->second.get();
-    using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
-    spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
-          false);
-    ++c;
-    ++s;
-    return true;
-  }
-};
-
-class MetaMasterTrimCR : public RGWCoroutine {
-  MasterTrimEnv& env;
-  rgw_meta_sync_status min_status; //< minimum sync status of all peers
-  int ret{0};
-
- public:
-  MetaMasterTrimCR(MasterTrimEnv& env)
-    : RGWCoroutine(env.store->ctx()), env(env)
-  {}
-
-  int operate();
-};
-
-int MetaMasterTrimCR::operate()
-{
-  reenter(this) {
-    // TODO: detect this and fail before we spawn the trim thread?
-    if (env.connections.empty()) {
-      ldout(cct, 4) << "no peers, exiting" << dendl;
-      return set_cr_done();
-    }
-
-    ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
-    // query mdlog sync status from peers
-    yield call(new MetaMasterStatusCollectCR(env));
-
-    // must get a successful reply from all peers to consider trimming
-    if (ret < 0) {
-      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
-      return set_cr_error(ret);
-    }
-
-    // determine the minimum epoch and markers
-    ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
-                          env.peer_status.end(), &min_status);
-    if (ret < 0) {
-      ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl;
-      return set_cr_error(ret);
-    }
-    yield {
-      auto store = env.store;
-      auto epoch = min_status.sync_info.realm_epoch;
-      ldout(cct, 4) << "realm epoch min=" << epoch
-          << " current=" << env.current.get_epoch()<< dendl;
-      if (epoch > env.last_trim_epoch + 1) {
-        // delete any prior mdlog periods
-        spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true);
-      } else {
-        ldout(cct, 10) << "mdlogs already purged up to realm_epoch "
-            << env.last_trim_epoch << dendl;
-      }
-
-      // if realm_epoch == current, trim mdlog based on markers
-      if (epoch == env.current.get_epoch()) {
-        auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id());
-        spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
-      }
-    }
-    // ignore any errors during purge/trim because we want to hold the lock open
-    return set_cr_done();
-  }
-  return 0;
-}
-
-
-/// read the first entry of the master's mdlog shard and trim to that position
-class MetaPeerTrimShardCR : public RGWCoroutine {
-  RGWMetaSyncEnv& env;
-  RGWMetadataLog *mdlog;
-  const std::string& period_id;
-  const int shard_id;
-  RGWMetadataLogInfo info;
-  ceph::real_time stable; //< safe timestamp to trim, according to master
-  ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
-  rgw_mdlog_shard_data result; //< result from master's mdlog listing
-
- public:
-  MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
-                      const std::string& period_id, int shard_id,
-                      ceph::real_time *last_trim)
-    : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
-      period_id(period_id), shard_id(shard_id), last_trim(last_trim)
-  {}
-
-  int operate() override;
-};
-
-int MetaPeerTrimShardCR::operate()
-{
-  reenter(this) {
-    // query master's first mdlog entry for this shard
-    yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
-                                             "", 1, &result));
-    if (retcode < 0) {
-      ldout(cct, 5) << "failed to read first entry from master's mdlog shard "
-          << shard_id << " for period " << period_id
-          << ": " << cpp_strerror(retcode) << dendl;
-      return set_cr_error(retcode);
-    }
-    if (result.entries.empty()) {
-      // if there are no mdlog entries, we don't have a timestamp to compare. we
-      // can't just trim everything, because there could be racing updates since
-      // this empty reply. query the mdlog shard info to read its max timestamp,
-      // then retry the listing to make sure it's still empty before trimming to
-      // that
-      ldout(cct, 10) << "empty master mdlog shard " << shard_id
-          << ", reading last timestamp from shard info" << dendl;
-      // read the mdlog shard info for the last timestamp
-      using ShardInfoCR = RGWReadRemoteMDLogShardInfoCR;
-      yield call(new ShardInfoCR(&env, period_id, shard_id, &info));
-      if (retcode < 0) {
-        ldout(cct, 5) << "failed to read info from master's mdlog shard "
-            << shard_id << " for period " << period_id
-            << ": " << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-      if (ceph::real_clock::is_zero(info.last_update)) {
-        return set_cr_done(); // nothing to trim
-      }
-      ldout(cct, 10) << "got mdlog shard info with last update="
-          << info.last_update << dendl;
-      // re-read the master's first mdlog entry to make sure it hasn't changed
-      yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
-                                               "", 1, &result));
-      if (retcode < 0) {
-        ldout(cct, 5) << "failed to read first entry from master's mdlog shard "
-            << shard_id << " for period " << period_id
-            << ": " << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-      // if the mdlog is still empty, trim to max marker
-      if (result.entries.empty()) {
-        stable = info.last_update;
-      } else {
-        stable = result.entries.front().timestamp;
-
-        // can only trim -up to- master's first timestamp, so subtract a second.
-        // (this is why we use timestamps instead of markers for the peers)
-        stable -= std::chrono::seconds(1);
-      }
-    } else {
-      stable = result.entries.front().timestamp;
-      stable -= std::chrono::seconds(1);
-    }
-
-    if (stable <= *last_trim) {
-      ldout(cct, 10) << "skipping log shard " << shard_id
-          << " at timestamp=" << stable
-          << " last_trim=" << *last_trim << dendl;
-      return set_cr_done();
-    }
-
-    ldout(cct, 10) << "trimming log shard " << shard_id
-        << " at timestamp=" << stable
-        << " last_trim=" << *last_trim << dendl;
-    yield {
-      std::string oid;
-      mdlog->get_shard_oid(shard_id, oid);
-      call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
-    }
-    if (retcode < 0 && retcode != -ENODATA) {
-      ldout(cct, 1) << "failed to trim mdlog shard " << shard_id
-          << ": " << cpp_strerror(retcode) << dendl;
-      return set_cr_error(retcode);
-    }
-    *last_trim = stable;
-    return set_cr_done();
-  }
-  return 0;
-}
-
-class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
-  static constexpr int MAX_CONCURRENT_SHARDS = 16;
-
-  PeerTrimEnv& env;
-  RGWMetadataLog *mdlog;
-  const std::string& period_id;
-  RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
-  int shard_id{0};
-
- public:
-  MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
-    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
-      env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
-  {
-    meta_env.init(cct, env.store, env.store->rest_master_conn,
-                  env.store->get_async_rados(), env.http, nullptr);
-  }
-
-  bool spawn_next() override;
-};
-
-bool MetaPeerTrimShardCollectCR::spawn_next()
-{
-  if (shard_id >= env.num_shards) {
-    return false;
-  }
-  auto& last_trim = env.last_trim_timestamps[shard_id];
-  spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
-        false);
-  shard_id++;
-  return true;
-}
-
-class MetaPeerTrimCR : public RGWCoroutine {
-  PeerTrimEnv& env;
-  rgw_mdlog_info mdlog_info; //< master's mdlog info
-
- public:
-  MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}
-
-  int operate();
-};
-
-int MetaPeerTrimCR::operate()
-{
-  reenter(this) {
-    ldout(cct, 10) << "fetching master mdlog info" << dendl;
-    yield {
-      // query mdlog_info from master for oldest_log_period
-      rgw_http_param_pair params[] = {
-        { "type", "metadata" },
-        { nullptr, nullptr }
-      };
-
-      using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
-      call(new LogInfoCR(cct, env.store->rest_master_conn, env.http,
-                         "/admin/log/", params, &mdlog_info));
-    }
-    if (retcode < 0) {
-      ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
-      return set_cr_error(retcode);
-    }
-    // use master's shard count instead
-    env.set_num_shards(mdlog_info.num_shards);
-
-    if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
-      // delete any prior mdlog periods
-      yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
-                                       &env.last_trim_epoch));
-    } else {
-      ldout(cct, 10) << "mdlogs already purged through realm_epoch "
-          << env.last_trim_epoch << dendl;
-    }
-
-    // if realm_epoch == current, trim mdlog based on master's markers
-    if (mdlog_info.realm_epoch == env.current.get_epoch()) {
-      yield {
-        auto meta_mgr = env.store->meta_mgr;
-        auto mdlog = meta_mgr->get_log(env.current.get_period().get_id());
-        call(new MetaPeerTrimShardCollectCR(env, mdlog));
-        // ignore any errors during purge/trim because we want to hold the lock open
-      }
-    }
-    return set_cr_done();
-  }
-  return 0;
-}
-
-class MetaTrimPollCR : public RGWCoroutine {
-  RGWRados *const store;
-  const utime_t interval; //< polling interval
-  const rgw_raw_obj obj;
-  const std::string name{"meta_trim"}; //< lock name
-  const std::string cookie;
-
- protected:
-  /// allocate the coroutine to run within the lease
-  virtual RGWCoroutine* alloc_cr() = 0;
-
- public:
-  MetaTrimPollCR(RGWRados *store, utime_t interval)
-    : RGWCoroutine(store->ctx()), store(store), interval(interval),
-      obj(store->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
-      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
-  {}
-
-  int operate();
-};
-
-int MetaTrimPollCR::operate()
-{
-  reenter(this) {
-    for (;;) {
-      set_status("sleeping");
-      wait(interval);
-
-      // prevent others from trimming for our entire wait interval
-      set_status("acquiring trim lock");
-      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
-                                          obj, name, cookie, interval.sec()));
-      if (retcode < 0) {
-        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
-        continue;
-      }
-
-      set_status("trimming");
-      yield call(alloc_cr());
-
-      if (retcode < 0) {
-        // on errors, unlock so other gateways can try
-        set_status("unlocking");
-        yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
-                                              obj, name, cookie));
-      }
-    }
-  }
-  return 0;
-}
-
-class MetaMasterTrimPollCR : public MetaTrimPollCR  {
-  MasterTrimEnv env; //< trim state to share between calls
-  RGWCoroutine* alloc_cr() override {
-    return new MetaMasterTrimCR(env);
-  }
- public:
-  MetaMasterTrimPollCR(RGWRados *store, RGWHTTPManager *http,
-                       int num_shards, utime_t interval)
-    : MetaTrimPollCR(store, interval),
-      env(store, http, num_shards)
-  {}
-};
-
-class MetaPeerTrimPollCR : public MetaTrimPollCR {
-  PeerTrimEnv env; //< trim state to share between calls
-  RGWCoroutine* alloc_cr() override {
-    return new MetaPeerTrimCR(env);
-  }
- public:
-  MetaPeerTrimPollCR(RGWRados *store, RGWHTTPManager *http,
-                     int num_shards, utime_t interval)
-    : MetaTrimPollCR(store, interval),
-      env(store, http, num_shards)
-  {}
-};
-
-RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http,
-                                      int num_shards, utime_t interval)
-{
-  if (store->is_meta_master()) {
-    return new MetaMasterTrimPollCR(store, http, num_shards, interval);
-  }
-  return new MetaPeerTrimPollCR(store, http, num_shards, interval);
-}
-
-
-struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
-  MetaMasterAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards)
-    : MasterTrimEnv(store, http, num_shards),
-      MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
-  {}
-};
-
-struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
-  MetaPeerAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards)
-    : PeerTrimEnv(store, http, num_shards),
-      MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
-  {}
-};
-
-RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store,
-                                            RGWHTTPManager *http,
-                                            int num_shards)
-{
-  if (store->is_meta_master()) {
-    return new MetaMasterAdminTrimCR(store, http, num_shards);
-  }
-  return new MetaPeerAdminTrimCR(store, http, num_shards);
-}