Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_sync.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <boost/optional.hpp>
5
6 #include "common/ceph_json.h"
7 #include "common/RWLock.h"
8 #include "common/RefCountedObj.h"
9 #include "common/WorkQueue.h"
10 #include "common/Throttle.h"
11 #include "common/admin_socket.h"
12 #include "common/errno.h"
13
14 #include "rgw_common.h"
15 #include "rgw_rados.h"
16 #include "rgw_sync.h"
17 #include "rgw_metadata.h"
18 #include "rgw_rest_conn.h"
19 #include "rgw_tools.h"
20 #include "rgw_cr_rados.h"
21 #include "rgw_cr_rest.h"
22 #include "rgw_http_client.h"
23
24 #include "cls/lock/cls_lock_client.h"
25
26 #include <boost/asio/yield.hpp>
27
28 #define dout_subsys ceph_subsys_rgw
29
30 #undef dout_prefix
31 #define dout_prefix (*_dout << "meta sync: ")
32
33 static string mdlog_sync_status_oid = "mdlog.sync-status";
34 static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
35 static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
36
37 RGWSyncErrorLogger::RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
38   for (int i = 0; i < num_shards; i++) {
39     oids.push_back(get_shard_oid(oid_prefix, i));
40   }
41 }
42 string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
43   char buf[oid_prefix.size() + 16];
44   snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
45   return string(buf);
46 }
47
48 RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
49   cls_log_entry entry;
50
51   rgw_sync_error_info info(source_zone, error_code, message);
52   bufferlist bl;
53   ::encode(info, bl);
54   store->time_log_prepare_entry(entry, real_clock::now(), section, name, bl);
55
56   uint32_t shard_id = ++counter % num_shards;
57
58
59   return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
60 }
61
62 void RGWSyncBackoff::update_wait_time()
63 {
64   if (cur_wait == 0) {
65     cur_wait = 1;
66   } else {
67     cur_wait = (cur_wait << 1);
68   }
69   if (cur_wait >= max_secs) {
70     cur_wait = max_secs;
71   }
72 }
73
74 void RGWSyncBackoff::backoff_sleep()
75 {
76   update_wait_time();
77   sleep(cur_wait);
78 }
79
80 void RGWSyncBackoff::backoff(RGWCoroutine *op)
81 {
82   update_wait_time();
83   op->wait(utime_t(cur_wait, 0));
84 }
85
86 int RGWBackoffControlCR::operate() {
87   reenter(this) {
88     // retry the operation until it succeeds
89     while (true) {
90       yield {
91         Mutex::Locker l(lock);
92         cr = alloc_cr();
93         cr->get();
94         call(cr);
95       }
96       {
97         Mutex::Locker l(lock);
98         cr->put();
99         cr = NULL;
100       }
101       if (retcode >= 0) {
102         break;
103       }
104       if (retcode != -EBUSY && retcode != -EAGAIN) {
105         ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
106         if (exit_on_error) {
107           return set_cr_error(retcode);
108         }
109       }
110       if (reset_backoff) {
111         backoff.reset();
112       }
113       yield backoff.backoff(this);
114     }
115
116     // run an optional finisher
117     yield call(alloc_finisher_cr());
118     if (retcode < 0) {
119       ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
120       return set_cr_error(retcode);
121     }
122     return set_cr_done();
123   }
124   return 0;
125 }
126
127 void rgw_mdlog_info::decode_json(JSONObj *obj) {
128   JSONDecoder::decode_json("num_objects", num_shards, obj);
129   JSONDecoder::decode_json("period", period, obj);
130   JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
131 }
132
133 void rgw_mdlog_entry::decode_json(JSONObj *obj) {
134   JSONDecoder::decode_json("id", id, obj);
135   JSONDecoder::decode_json("section", section, obj);
136   JSONDecoder::decode_json("name", name, obj);
137   utime_t ut;
138   JSONDecoder::decode_json("timestamp", ut, obj);
139   timestamp = ut.to_real_time();
140   JSONDecoder::decode_json("data", log_data, obj);
141 }
142
143 void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
144   JSONDecoder::decode_json("marker", marker, obj);
145   JSONDecoder::decode_json("truncated", truncated, obj);
146   JSONDecoder::decode_json("entries", entries, obj);
147 };
148
149 int RGWShardCollectCR::operate() {
150   reenter(this) {
151     while (spawn_next()) {
152       current_running++;
153
154       while (current_running >= max_concurrent) {
155         int child_ret;
156         yield wait_for_child();
157         if (collect_next(&child_ret)) {
158           current_running--;
159           if (child_ret < 0 && child_ret != -ENOENT) {
160             ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
161             status = child_ret;
162           }
163         }
164       }
165     }
166     while (current_running > 0) {
167       int child_ret;
168       yield wait_for_child();
169       if (collect_next(&child_ret)) {
170         current_running--;
171         if (child_ret < 0 && child_ret != -ENOENT) {
172           ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
173           status = child_ret;
174         }
175       }
176     }
177     if (status < 0) {
178       return set_cr_error(status);
179     }
180     return set_cr_done();
181   }
182   return 0;
183 }
184
185 class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
186   RGWMetaSyncEnv *sync_env;
187
188   const std::string& period;
189   int num_shards;
190   map<int, RGWMetadataLogInfo> *mdlog_info;
191
192   int shard_id;
193 #define READ_MDLOG_MAX_CONCURRENT 10
194
195 public:
196   RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
197                      const std::string& period, int _num_shards,
198                      map<int, RGWMetadataLogInfo> *_mdlog_info) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
199                                                                  sync_env(_sync_env),
200                                                                  period(period), num_shards(_num_shards),
201                                                                  mdlog_info(_mdlog_info), shard_id(0) {}
202   bool spawn_next() override;
203 };
204
205 class RGWListRemoteMDLogCR : public RGWShardCollectCR {
206   RGWMetaSyncEnv *sync_env;
207
208   const std::string& period;
209   map<int, string> shards;
210   int max_entries_per_shard;
211   map<int, rgw_mdlog_shard_data> *result;
212
213   map<int, string>::iterator iter;
214 #define READ_MDLOG_MAX_CONCURRENT 10
215
216 public:
217   RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
218                      const std::string& period, map<int, string>& _shards,
219                      int _max_entries_per_shard,
220                      map<int, rgw_mdlog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
221                                                                  sync_env(_sync_env), period(period),
222                                                                  max_entries_per_shard(_max_entries_per_shard),
223                                                                  result(_result) {
224     shards.swap(_shards);
225     iter = shards.begin();
226   }
227   bool spawn_next() override;
228 };
229
230 RGWRemoteMetaLog::~RGWRemoteMetaLog()
231 {
232   delete error_logger;
233 }
234
235 int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
236 {
237   rgw_http_param_pair pairs[] = { { "type", "metadata" },
238                                   { NULL, NULL } };
239
240   int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
241   if (ret < 0) {
242     ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog info" << dendl;
243     return ret;
244   }
245
246   ldout(store->ctx(), 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;
247
248   return 0;
249 }
250
251 int RGWRemoteMetaLog::read_master_log_shards_info(const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
252 {
253   if (store->is_meta_master()) {
254     return 0;
255   }
256
257   rgw_mdlog_info log_info;
258   int ret = read_log_info(&log_info);
259   if (ret < 0) {
260     return ret;
261   }
262
263   return run(new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
264 }
265
266 int RGWRemoteMetaLog::read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
267 {
268   if (store->is_meta_master()) {
269     return 0;
270   }
271
272   return run(new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
273 }
274
275 int RGWRemoteMetaLog::init()
276 {
277   conn = store->rest_master_conn;
278
279   int ret = http_manager.set_threaded();
280   if (ret < 0) {
281     ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
282     return ret;
283   }
284
285   error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
286
287   init_sync_env(&sync_env);
288
289   return 0;
290 }
291
292 void RGWRemoteMetaLog::finish()
293 {
294   going_down = true;
295   stop();
296 }
297
298 #define CLONE_MAX_ENTRIES 100
299
300 int RGWMetaSyncStatusManager::init()
301 {
302   if (store->is_meta_master()) {
303     return 0;
304   }
305
306   if (!store->rest_master_conn) {
307     lderr(store->ctx()) << "no REST connection to master zone" << dendl;
308     return -EIO;
309   }
310
311   int r = rgw_init_ioctx(store->get_rados_handle(), store->get_zone_params().log_pool, ioctx, true);
312   if (r < 0) {
313     lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->get_zone_params().log_pool << " ret=" << r << dendl;
314     return r;
315   }
316
317   r = master_log.init();
318   if (r < 0) {
319     lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
320     return r;
321   }
322
323   RGWMetaSyncEnv& sync_env = master_log.get_sync_env();
324
325   rgw_meta_sync_status sync_status;
326   r = read_sync_status(&sync_status);
327   if (r < 0 && r != -ENOENT) {
328     lderr(store->ctx()) << "ERROR: failed to read sync status, r=" << r << dendl;
329     return r;
330   }
331
332   int num_shards = sync_status.sync_info.num_shards;
333
334   for (int i = 0; i < num_shards; i++) {
335     shard_objs[i] = rgw_raw_obj(store->get_zone_params().log_pool, sync_env.shard_obj_name(i));
336   }
337
338   RWLock::WLocker wl(ts_to_shard_lock);
339   for (int i = 0; i < num_shards; i++) {
340     clone_markers.push_back(string());
341     utime_shard ut;
342     ut.shard_id = i;
343     ts_to_shard[ut] = i;
344   }
345
346   return 0;
347 }
348
349 void RGWMetaSyncEnv::init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
350                           RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
351                           RGWSyncErrorLogger *_error_logger) {
352   cct = _cct;
353   store = _store;
354   conn = _conn;
355   async_rados = _async_rados;
356   http_manager = _http_manager;
357   error_logger = _error_logger;
358 }
359
360 string RGWMetaSyncEnv::status_oid()
361 {
362   return mdlog_sync_status_oid;
363 }
364
365 string RGWMetaSyncEnv::shard_obj_name(int shard_id)
366 {
367   char buf[mdlog_sync_status_shard_prefix.size() + 16];
368   snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);
369
370   return string(buf);
371 }
372
373 class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
374   RGWRados *store;
375   RGWMetadataLog *mdlog;
376   int shard_id;
377   string *marker;
378   int max_entries;
379   list<cls_log_entry> *entries;
380   bool *truncated;
381
382 protected:
383   int _send_request() override {
384     real_time from_time;
385     real_time end_time;
386
387     void *handle;
388
389     mdlog->init_list_entries(shard_id, from_time, end_time, *marker, &handle);
390
391     int ret = mdlog->list_entries(handle, max_entries, *entries, marker, truncated);
392
393     mdlog->complete_list_entries(handle);
394
395     return ret;
396   }
397 public:
398   RGWAsyncReadMDLogEntries(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
399                            RGWMetadataLog* mdlog, int _shard_id,
400                            string* _marker, int _max_entries,
401                            list<cls_log_entry> *_entries, bool *_truncated)
402     : RGWAsyncRadosRequest(caller, cn), store(_store), mdlog(mdlog),
403       shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
404       entries(_entries), truncated(_truncated) {}
405 };
406
407 class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
408   RGWMetaSyncEnv *sync_env;
409   RGWMetadataLog *const mdlog;
410   int shard_id;
411   string marker;
412   string *pmarker;
413   int max_entries;
414   list<cls_log_entry> *entries;
415   bool *truncated;
416
417   RGWAsyncReadMDLogEntries *req{nullptr};
418
419 public:
420   RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
421                         int _shard_id, string*_marker, int _max_entries,
422                         list<cls_log_entry> *_entries, bool *_truncated)
423     : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
424       shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
425       entries(_entries), truncated(_truncated) {}
426
427   ~RGWReadMDLogEntriesCR() override {
428     if (req) {
429       req->finish();
430     }
431   }
432
433   int send_request() override {
434     marker = *pmarker;
435     req = new RGWAsyncReadMDLogEntries(this, stack->create_completion_notifier(),
436                                        sync_env->store, mdlog, shard_id, &marker,
437                                        max_entries, entries, truncated);
438     sync_env->async_rados->queue(req);
439     return 0;
440   }
441
442   int request_complete() override {
443     int ret = req->get_ret_status();
444     if (ret >= 0 && !entries->empty()) {
445      *pmarker = marker;
446     }
447     return req->get_ret_status();
448   }
449 };
450
451
452 class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
453   RGWMetaSyncEnv *env;
454   RGWRESTReadResource *http_op;
455
456   const std::string& period;
457   int shard_id;
458   RGWMetadataLogInfo *shard_info;
459
460 public:
461   RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
462                                 int _shard_id, RGWMetadataLogInfo *_shard_info)
463     : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
464       period(period), shard_id(_shard_id), shard_info(_shard_info) {}
465
466   int operate() override {
467     auto store = env->store;
468     RGWRESTConn *conn = store->rest_master_conn;
469     reenter(this) {
470       yield {
471         char buf[16];
472         snprintf(buf, sizeof(buf), "%d", shard_id);
473         rgw_http_param_pair pairs[] = { { "type" , "metadata" },
474                                         { "id", buf },
475                                         { "period", period.c_str() },
476                                         { "info" , NULL },
477                                         { NULL, NULL } };
478
479         string p = "/admin/log/";
480
481         http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
482                                           env->http_manager);
483
484         http_op->set_user_info((void *)stack);
485
486         int ret = http_op->aio_read();
487         if (ret < 0) {
488           ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
489           log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
490           http_op->put();
491           return set_cr_error(ret);
492         }
493
494         return io_block(0);
495       }
496       yield {
497         int ret = http_op->wait(shard_info);
498         http_op->put();
499         if (ret < 0) {
500           return set_cr_error(ret);
501         }
502         return set_cr_done();
503       }
504     }
505     return 0;
506   }
507 };
508
509 class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
510   RGWMetaSyncEnv *sync_env;
511   RGWRESTReadResource *http_op;
512
513   const std::string& period;
514   int shard_id;
515   string marker;
516   uint32_t max_entries;
517   rgw_mdlog_shard_data *result;
518
519 public:
520   RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
521                             int _shard_id, const string& _marker, uint32_t _max_entries,
522                             rgw_mdlog_shard_data *_result)
523     : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
524       period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
525
526   int send_request() override {
527     RGWRESTConn *conn = sync_env->conn;
528     RGWRados *store = sync_env->store;
529
530     char buf[32];
531     snprintf(buf, sizeof(buf), "%d", shard_id);
532
533     char max_entries_buf[32];
534     snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
535
536     const char *marker_key = (marker.empty() ? "" : "marker");
537
538     rgw_http_param_pair pairs[] = { { "type", "metadata" },
539       { "id", buf },
540       { "period", period.c_str() },
541       { "max-entries", max_entries_buf },
542       { marker_key, marker.c_str() },
543       { NULL, NULL } };
544
545     string p = "/admin/log/";
546
547     http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
548     http_op->set_user_info((void *)stack);
549
550     int ret = http_op->aio_read();
551     if (ret < 0) {
552       ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
553       log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
554       http_op->put();
555       return ret;
556     }
557
558     return 0;
559   }
560
561   int request_complete() override {
562     int ret = http_op->wait(result);
563     http_op->put();
564     if (ret < 0 && ret != -ENOENT) {
565       ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
566       return ret;
567     }
568     return 0;
569   }
570 };
571
572 bool RGWReadRemoteMDLogInfoCR::spawn_next() {
573   if (shard_id >= num_shards) {
574     return false;
575   }
576   spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
577   shard_id++;
578   return true;
579 }
580
581 bool RGWListRemoteMDLogCR::spawn_next() {
582   if (iter == shards.end()) {
583     return false;
584   }
585
586   spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
587   ++iter;
588   return true;
589 }
590
591 class RGWInitSyncStatusCoroutine : public RGWCoroutine {
592   RGWMetaSyncEnv *sync_env;
593
594   rgw_meta_sync_info status;
595   vector<RGWMetadataLogInfo> shards_info;
596   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
597   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
598 public:
599   RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
600                              const rgw_meta_sync_info &status)
601     : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
602       status(status), shards_info(status.num_shards),
603       lease_cr(nullptr), lease_stack(nullptr) {}
604
605   ~RGWInitSyncStatusCoroutine() override {
606     if (lease_cr) {
607       lease_cr->abort();
608     }
609   }
610
611   int operate() override {
612     int ret;
613     reenter(this) {
614       yield {
615         set_status("acquiring sync lock");
616         uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
617         string lock_name = "sync_lock";
618         RGWRados *store = sync_env->store;
619         lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
620                                                 rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
621                                                 lock_name, lock_duration, this));
622         lease_stack.reset(spawn(lease_cr.get(), false));
623       }
624       while (!lease_cr->is_locked()) {
625         if (lease_cr->is_done()) {
626           ldout(cct, 5) << "lease cr failed, done early " << dendl;
627           set_status("lease lock failed, early abort");
628           return set_cr_error(lease_cr->get_ret_status());
629         }
630         set_sleeping(true);
631         yield;
632       }
633       yield {
634         set_status("writing sync status");
635         RGWRados *store = sync_env->store;
636         call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store,
637                                                            rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
638                                                            status));
639       }
640
641       if (retcode < 0) {
642         set_status("failed to write sync status");
643         ldout(cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
644         yield lease_cr->go_down();
645         return set_cr_error(retcode);
646       }
647       /* fetch current position in logs */
648       set_status("fetching remote log position");
649       yield {
650         for (int i = 0; i < (int)status.num_shards; i++) {
651           spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
652                                                   &shards_info[i]), false);
653         }
654       }
655
656       drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
657
658       yield {
659         set_status("updating sync status");
660         for (int i = 0; i < (int)status.num_shards; i++) {
661           rgw_meta_sync_marker marker;
662           RGWMetadataLogInfo& info = shards_info[i];
663           marker.next_step_marker = info.marker;
664           marker.timestamp = info.last_update;
665           RGWRados *store = sync_env->store;
666           spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
667                                                                 store,
668                                                                 rgw_raw_obj(store->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
669                                                                 marker), true);
670         }
671       }
672       yield {
673         set_status("changing sync state: build full sync maps");
674         status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
675         RGWRados *store = sync_env->store;
676         call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store,
677                                                            rgw_raw_obj(store->get_zone_params().log_pool, sync_env->status_oid()),
678                                                            status));
679       }
680       set_status("drop lock lease");
681       yield lease_cr->go_down();
682       while (collect(&ret, NULL)) {
683         if (ret < 0) {
684           return set_cr_error(ret);
685         }
686         yield;
687       }
688       drain_all();
689       return set_cr_done();
690     }
691     return 0;
692   }
693 };
694
695 class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
696   static constexpr int MAX_CONCURRENT_SHARDS = 16;
697
698   RGWMetaSyncEnv *env;
699   const int num_shards;
700   int shard_id{0};
701   map<uint32_t, rgw_meta_sync_marker>& markers;
702
703  public:
704   RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
705                              map<uint32_t, rgw_meta_sync_marker>& markers)
706     : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
707       env(env), num_shards(num_shards), markers(markers)
708   {}
709   bool spawn_next() override;
710 };
711
712 bool RGWReadSyncStatusMarkersCR::spawn_next()
713 {
714   if (shard_id >= num_shards) {
715     return false;
716   }
717   using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
718   rgw_raw_obj obj{env->store->get_zone_params().log_pool,
719                   env->shard_obj_name(shard_id)};
720   spawn(new CR(env->async_rados, env->store, obj, &markers[shard_id]), false);
721   shard_id++;
722   return true;
723 }
724
725 class RGWReadSyncStatusCoroutine : public RGWCoroutine {
726   RGWMetaSyncEnv *sync_env;
727   rgw_meta_sync_status *sync_status;
728
729 public:
730   RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
731                              rgw_meta_sync_status *_status)
732     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
733   {}
734   int operate() override;
735 };
736
737 int RGWReadSyncStatusCoroutine::operate()
738 {
739   reenter(this) {
740     // read sync info
741     using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
742     yield {
743       bool empty_on_enoent = false; // fail on ENOENT
744       rgw_raw_obj obj{sync_env->store->get_zone_params().log_pool,
745                       sync_env->status_oid()};
746       call(new ReadInfoCR(sync_env->async_rados, sync_env->store, obj,
747                           &sync_status->sync_info, empty_on_enoent));
748     }
749     if (retcode < 0) {
750       ldout(sync_env->cct, 4) << "failed to read sync status info with "
751           << cpp_strerror(retcode) << dendl;
752       return set_cr_error(retcode);
753     }
754     // read shard markers
755     using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
756     yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
757                                  sync_status->sync_markers));
758     if (retcode < 0) {
759       ldout(sync_env->cct, 4) << "failed to read sync status markers with "
760           << cpp_strerror(retcode) << dendl;
761       return set_cr_error(retcode);
762     }
763     return set_cr_done();
764   }
765   return 0;
766 }
767
768 class RGWFetchAllMetaCR : public RGWCoroutine {
769   RGWMetaSyncEnv *sync_env;
770
771   int num_shards;
772
773
774   int ret_status;
775
776   list<string> sections;
777   list<string>::iterator sections_iter;
778
779   struct meta_list_result {
780     list<string> keys;
781     string marker;
782     uint64_t count{0};
783     bool truncated{false};
784
785     void decode_json(JSONObj *obj) {
786       JSONDecoder::decode_json("keys", keys, obj);
787       JSONDecoder::decode_json("marker", marker, obj);
788       JSONDecoder::decode_json("count", count, obj);
789       JSONDecoder::decode_json("truncated", truncated, obj);
790     }
791   } result;
792   list<string>::iterator iter;
793
794   std::unique_ptr<RGWShardedOmapCRManager> entries_index;
795
796   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
797   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
798   bool lost_lock;
799   bool failed;
800
801   string marker;
802
803   map<uint32_t, rgw_meta_sync_marker>& markers;
804
805 public:
806   RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
807                     map<uint32_t, rgw_meta_sync_marker>& _markers) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
808                                                       num_shards(_num_shards),
809                                                       ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
810                                                       lost_lock(false), failed(false), markers(_markers) {
811   }
812
813   ~RGWFetchAllMetaCR() override {
814   }
815
816   void append_section_from_set(set<string>& all_sections, const string& name) {
817     set<string>::iterator iter = all_sections.find(name);
818     if (iter != all_sections.end()) {
819       sections.emplace_back(std::move(*iter));
820       all_sections.erase(iter);
821     }
822   }
823   /*
824    * meta sync should go in the following order: user, bucket.instance, bucket
825    * then whatever other sections exist (if any)
826    */
827   void rearrange_sections() {
828     set<string> all_sections;
829     std::move(sections.begin(), sections.end(),
830               std::inserter(all_sections, all_sections.end()));
831     sections.clear();
832
833     append_section_from_set(all_sections, "user");
834     append_section_from_set(all_sections, "bucket.instance");
835     append_section_from_set(all_sections, "bucket");
836
837     std::move(all_sections.begin(), all_sections.end(),
838               std::back_inserter(sections));
839   }
840
841   int operate() override {
842     RGWRESTConn *conn = sync_env->conn;
843
844     reenter(this) {
845       yield {
846         set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
847         uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
848         string lock_name = "sync_lock";
849         lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
850                                                 sync_env->store,
851                                                 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, sync_env->status_oid()),
852                                                 lock_name, lock_duration, this));
853         lease_stack.reset(spawn(lease_cr.get(), false));
854       }
855       while (!lease_cr->is_locked()) {
856         if (lease_cr->is_done()) {
857           ldout(cct, 5) << "lease cr failed, done early " << dendl;
858           set_status("failed acquiring lock");
859           return set_cr_error(lease_cr->get_ret_status());
860         }
861         set_sleeping(true);
862         yield;
863       }
864       entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
865                                                       sync_env->store->get_zone_params().log_pool,
866                                                       mdlog_sync_full_sync_index_prefix));
867       yield {
868         call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
869                                        "/admin/metadata", NULL, &sections));
870       }
871       if (get_ret_status() < 0) {
872         ldout(cct, 0) << "ERROR: failed to fetch metadata sections" << dendl;
873         yield entries_index->finish();
874         yield lease_cr->go_down();
875         drain_all();
876         return set_cr_error(get_ret_status());
877       }
878       rearrange_sections();
879       sections_iter = sections.begin();
880       for (; sections_iter != sections.end(); ++sections_iter) {
881         do {
882           yield {
883 #define META_FULL_SYNC_CHUNK_SIZE "1000"
884             string entrypoint = string("/admin/metadata/") + *sections_iter;
885             rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
886               { "marker", result.marker.c_str() },
887               { NULL, NULL } };
888             result.keys.clear();
889             call(new RGWReadRESTResourceCR<meta_list_result >(cct, conn, sync_env->http_manager,
890                                                               entrypoint, pairs, &result));
891           }
892           if (get_ret_status() < 0) {
893             ldout(cct, 0) << "ERROR: failed to fetch metadata section: " << *sections_iter << dendl;
894             yield entries_index->finish();
895             yield lease_cr->go_down();
896             drain_all();
897             return set_cr_error(get_ret_status());
898           }
899           iter = result.keys.begin();
900           for (; iter != result.keys.end(); ++iter) {
901             if (!lease_cr->is_locked()) {
902               lost_lock = true;
903               break;
904             }
905             yield; // allow entries_index consumer to make progress
906
907             ldout(cct, 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
908             string s = *sections_iter + ":" + *iter;
909             int shard_id;
910             RGWRados *store = sync_env->store;
911             int ret = store->meta_mgr->get_log_shard_id(*sections_iter, *iter, &shard_id);
912             if (ret < 0) {
913               ldout(cct, 0) << "ERROR: could not determine shard id for " << *sections_iter << ":" << *iter << dendl;
914               ret_status = ret;
915               break;
916             }
917             if (!entries_index->append(s, shard_id)) {
918               break;
919             }
920           }
921         } while (result.truncated);
922       }
923       yield {
924         if (!entries_index->finish()) {
925           failed = true;
926         }
927       }
928       if (!failed) {
929         for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
930           int shard_id = (int)iter->first;
931           rgw_meta_sync_marker& marker = iter->second;
932           marker.total_entries = entries_index->get_total_entries(shard_id);
933           spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, sync_env->store,
934                                                                 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
935                                                                 marker), true);
936         }
937       }
938
939       drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */
940
941       yield lease_cr->go_down();
942
943       int ret;
944       while (collect(&ret, NULL)) {
945         if (ret < 0) {
946           return set_cr_error(ret);
947         }
948         yield;
949       }
950       drain_all();
951       if (failed) {
952         yield return set_cr_error(-EIO);
953       }
954       if (lost_lock) {
955         yield return set_cr_error(-EBUSY);
956       }
957
958       if (ret_status < 0) {
959         yield return set_cr_error(ret_status);
960       }
961
962       yield return set_cr_done();
963     }
964     return 0;
965   }
966 };
967
968 static string full_sync_index_shard_oid(int shard_id)
969 {
970   char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
971   snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
972   return string(buf);
973 }
974
975 class RGWReadRemoteMetadataCR : public RGWCoroutine {
976   RGWMetaSyncEnv *sync_env;
977
978   RGWRESTReadResource *http_op;
979
980   string section;
981   string key;
982
983   bufferlist *pbl;
984
985 public:
986   RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
987                                                       const string& _section, const string& _key, bufferlist *_pbl) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
988                                                       http_op(NULL),
989                                                       section(_section),
990                                                       key(_key),
991                                                       pbl(_pbl) {
992   }
993
994   int operate() override {
995     RGWRESTConn *conn = sync_env->conn;
996     reenter(this) {
997       yield {
998         rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
999                                         { NULL, NULL } };
1000
1001         string p = string("/admin/metadata/") + section + "/" + key;
1002
1003         http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
1004
1005         http_op->set_user_info((void *)stack);
1006
1007         int ret = http_op->aio_read();
1008         if (ret < 0) {
1009           ldout(sync_env->cct, 0) << "ERROR: failed to fetch mdlog data" << dendl;
1010           log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
1011           http_op->put();
1012           return set_cr_error(ret);
1013         }
1014
1015         return io_block(0);
1016       }
1017       yield {
1018         int ret = http_op->wait_bl(pbl);
1019         http_op->put();
1020         if (ret < 0) {
1021           return set_cr_error(ret);
1022         }
1023         return set_cr_done();
1024       }
1025     }
1026     return 0;
1027   }
1028 };
1029
1030 class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
1031   RGWRados *store;
1032   string raw_key;
1033   bufferlist bl;
1034 protected:
1035   int _send_request() override {
1036     int ret = store->meta_mgr->put(raw_key, bl, RGWMetadataHandler::APPLY_ALWAYS);
1037     if (ret < 0) {
1038       ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
1039       return ret;
1040     }
1041     return 0;
1042   }
1043 public:
1044   RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
1045                        const string& _raw_key,
1046                        bufferlist& _bl) : RGWAsyncRadosRequest(caller, cn), store(_store),
1047                                           raw_key(_raw_key), bl(_bl) {}
1048 };
1049
1050
1051 class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
1052   RGWMetaSyncEnv *sync_env;
1053   string raw_key;
1054   bufferlist bl;
1055
1056   RGWAsyncMetaStoreEntry *req;
1057
1058 public:
1059   RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
1060                        const string& _raw_key,
1061                        bufferlist& _bl) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
1062                                           raw_key(_raw_key), bl(_bl), req(NULL) {
1063   }
1064
1065   ~RGWMetaStoreEntryCR() override {
1066     if (req) {
1067       req->finish();
1068     }
1069   }
1070
1071   int send_request() override {
1072     req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
1073                                    sync_env->store, raw_key, bl);
1074     sync_env->async_rados->queue(req);
1075     return 0;
1076   }
1077
1078   int request_complete() override {
1079     return req->get_ret_status();
1080   }
1081 };
1082
1083 class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
1084   RGWRados *store;
1085   string raw_key;
1086 protected:
1087   int _send_request() override {
1088     int ret = store->meta_mgr->remove(raw_key);
1089     if (ret < 0) {
1090       ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
1091       return ret;
1092     }
1093     return 0;
1094   }
1095 public:
1096   RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
1097                        const string& _raw_key) : RGWAsyncRadosRequest(caller, cn), store(_store),
1098                                           raw_key(_raw_key) {}
1099 };
1100
1101
1102 class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
1103   RGWMetaSyncEnv *sync_env;
1104   string raw_key;
1105
1106   RGWAsyncMetaRemoveEntry *req;
1107
1108 public:
1109   RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
1110                        const string& _raw_key) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
1111                                           raw_key(_raw_key), req(NULL) {
1112   }
1113
1114   ~RGWMetaRemoveEntryCR() override {
1115     if (req) {
1116       req->finish();
1117     }
1118   }
1119
1120   int send_request() override {
1121     req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
1122                                    sync_env->store, raw_key);
1123     sync_env->async_rados->queue(req);
1124     return 0;
1125   }
1126
1127   int request_complete() override {
1128     int r = req->get_ret_status();
1129     if (r == -ENOENT) {
1130       r = 0;
1131     }
1132     return r;
1133   }
1134 };
1135
1136 #define META_SYNC_UPDATE_MARKER_WINDOW 10
1137
1138 class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
1139   RGWMetaSyncEnv *sync_env;
1140
1141   string marker_oid;
1142   rgw_meta_sync_marker sync_marker;
1143
1144
1145 public:
1146   RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
1147                          const string& _marker_oid,
1148                          const rgw_meta_sync_marker& _marker) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
1149                                                                 sync_env(_sync_env),
1150                                                                 marker_oid(_marker_oid),
1151                                                                 sync_marker(_marker) {}
1152
1153   RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
1154     sync_marker.marker = new_marker;
1155     if (index_pos > 0) {
1156       sync_marker.pos = index_pos;
1157     }
1158
1159     if (!real_clock::is_zero(timestamp)) {
1160       sync_marker.timestamp = timestamp;
1161     }
1162
1163     ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
1164     RGWRados *store = sync_env->store;
1165     return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
1166                                                            store,
1167                                                            rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
1168                                                            sync_marker);
1169   }
1170 };
1171
1172 int RGWMetaSyncSingleEntryCR::operate() {
1173   reenter(this) {
1174 #define NUM_TRANSIENT_ERROR_RETRIES 10
1175
1176     if (error_injection &&
1177         rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
1178       ldout(sync_env->cct, 0) << __FILE__ << ":" << __LINE__ << ": injecting meta sync error on key=" << raw_key << dendl;
1179       return set_cr_error(-EIO);
1180     }
1181
1182     if (op_status != MDLOG_STATUS_COMPLETE) {
1183       ldout(sync_env->cct, 20) << "skipping pending operation" << dendl;
1184       yield call(marker_tracker->finish(entry_marker));
1185       if (retcode < 0) {
1186         return set_cr_error(retcode);
1187       }
1188       return set_cr_done();
1189     }
1190     for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
1191       yield {
1192         pos = raw_key.find(':');
1193         section = raw_key.substr(0, pos);
1194         key = raw_key.substr(pos + 1);
1195         ldout(sync_env->cct, 20) << "fetching remote metadata: " << section << ":" << key << (tries == 0 ? "" : " (retry)") << dendl;
1196         call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl));
1197       }
1198
1199       sync_status = retcode;
1200
1201       if (sync_status == -ENOENT) {
1202         /* FIXME: do we need to remove the entry from the local zone? */
1203         break;
1204       }
1205
1206       if ((sync_status == -EAGAIN || sync_status == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
1207         ldout(sync_env->cct, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
1208         continue;
1209       }
1210
1211       if (sync_status < 0) {
1212         ldout(sync_env->cct, 10) << *this << ": failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << dendl;
1213         log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
1214         yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), section, key, -sync_status,
1215                                                         string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
1216         return set_cr_error(sync_status);
1217       }
1218
1219       break;
1220     }
1221
1222     retcode = 0;
1223     for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
1224       if (sync_status != -ENOENT) {
1225           yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
1226       } else {
1227           yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
1228       }
1229       if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
1230         ldout(sync_env->cct, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
1231         continue;
1232       }
1233       break;
1234     }
1235
1236     sync_status = retcode;
1237
1238     if (sync_status == 0 && marker_tracker) {
1239       /* update marker */
1240       yield call(marker_tracker->finish(entry_marker));
1241       sync_status = retcode;
1242     }
1243     if (sync_status < 0) {
1244       return set_cr_error(sync_status);
1245     }
1246     return set_cr_done();
1247   }
1248   return 0;
1249 }
1250
1251 class RGWCloneMetaLogCoroutine : public RGWCoroutine {
1252   RGWMetaSyncEnv *sync_env;
1253   RGWMetadataLog *mdlog;
1254
1255   const std::string& period;
1256   int shard_id;
1257   string marker;
1258   bool truncated = false;
1259   string *new_marker;
1260
1261   int max_entries = CLONE_MAX_ENTRIES;
1262
1263   RGWRESTReadResource *http_op = nullptr;
1264   boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;
1265
1266   RGWMetadataLogInfo shard_info;
1267   rgw_mdlog_shard_data data;
1268
1269 public:
1270   RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
1271                            const std::string& period, int _id,
1272                            const string& _marker, string *_new_marker)
1273     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
1274       period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
1275     if (new_marker) {
1276       *new_marker = marker;
1277     }
1278   }
1279   ~RGWCloneMetaLogCoroutine() override {
1280     if (http_op) {
1281       http_op->put();
1282     }
1283     if (completion) {
1284       completion->cancel();
1285     }
1286   }
1287
1288   int operate() override;
1289
1290   int state_init();
1291   int state_read_shard_status();
1292   int state_read_shard_status_complete();
1293   int state_send_rest_request();
1294   int state_receive_rest_response();
1295   int state_store_mdlog_entries();
1296   int state_store_mdlog_entries_complete();
1297 };
1298
1299 class RGWMetaSyncShardCR : public RGWCoroutine {
1300   RGWMetaSyncEnv *sync_env;
1301
1302   const rgw_pool& pool;
1303   const std::string& period; //< currently syncing period id
1304   const epoch_t realm_epoch; //< realm_epoch of period
1305   RGWMetadataLog* mdlog; //< log of syncing period
1306   uint32_t shard_id;
1307   rgw_meta_sync_marker& sync_marker;
1308   boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
1309   string marker;
1310   string max_marker;
1311   const std::string& period_marker; //< max marker stored in next period
1312
1313   map<string, bufferlist> entries;
1314   map<string, bufferlist>::iterator iter;
1315
1316   string oid;
1317
1318   RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;
1319
1320   list<cls_log_entry> log_entries;
1321   list<cls_log_entry>::iterator log_iter;
1322   bool truncated = false;
1323
1324   string mdlog_marker;
1325   string raw_key;
1326   rgw_mdlog_entry mdlog_entry;
1327
1328   Mutex inc_lock;
1329   Cond inc_cond;
1330
1331   boost::asio::coroutine incremental_cr;
1332   boost::asio::coroutine full_cr;
1333
1334   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
1335   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
1336
1337   bool lost_lock = false;
1338
1339   bool *reset_backoff;
1340
1341   // hold a reference to the cr stack while it's in the map
1342   using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1343   map<StackRef, string> stack_to_pos;
1344   map<string, string> pos_to_prev;
1345
1346   bool can_adjust_marker = false;
1347   bool done_with_period = false;
1348
1349   int total_entries = 0;
1350
1351 public:
1352   RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1353                      const std::string& period, epoch_t realm_epoch,
1354                      RGWMetadataLog* mdlog, uint32_t _shard_id,
1355                      rgw_meta_sync_marker& _marker,
1356                      const std::string& period_marker, bool *_reset_backoff)
1357     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
1358       period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1359       shard_id(_shard_id), sync_marker(_marker),
1360       period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"),
1361       reset_backoff(_reset_backoff) {
1362     *reset_backoff = false;
1363   }
1364
1365   ~RGWMetaSyncShardCR() override {
1366     delete marker_tracker;
1367     if (lease_cr) {
1368       lease_cr->abort();
1369     }
1370   }
1371
1372   void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
1373     delete marker_tracker;
1374     marker_tracker = mt;
1375   }
1376
1377   int operate() override {
1378     int r;
1379     while (true) {
1380       switch (sync_marker.state) {
1381       case rgw_meta_sync_marker::FullSync:
1382         r  = full_sync();
1383         if (r < 0) {
1384           ldout(sync_env->cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
1385           return set_cr_error(r);
1386         }
1387         return 0;
1388       case rgw_meta_sync_marker::IncrementalSync:
1389         r  = incremental_sync();
1390         if (r < 0) {
1391           ldout(sync_env->cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
1392           return set_cr_error(r);
1393         }
1394         return 0;
1395       }
1396     }
1397     /* unreachable */
1398     return 0;
1399   }
1400
1401   void collect_children()
1402   {
1403     int child_ret;
1404     RGWCoroutinesStack *child;
1405     while (collect_next(&child_ret, &child)) {
1406       auto iter = stack_to_pos.find(child);
1407       if (iter == stack_to_pos.end()) {
1408         /* some other stack that we don't care about */
1409         continue;
1410       }
1411
1412       string& pos = iter->second;
1413
1414       if (child_ret < 0) {
1415         ldout(sync_env->cct, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
1416       }
1417
1418       map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
1419       assert(prev_iter != pos_to_prev.end());
1420
1421       /*
1422        * we should get -EAGAIN for transient errors, for which we want to retry, so we don't
1423        * update the marker and abort. We'll get called again for these. Permanent errors will be
1424        * handled by marking the entry at the error log shard, so that we retry on it separately
1425        */
1426       if (child_ret == -EAGAIN) {
1427         can_adjust_marker = false;
1428       }
1429
1430       if (pos_to_prev.size() == 1) {
1431         if (can_adjust_marker) {
1432           sync_marker.marker = pos;
1433         }
1434         pos_to_prev.erase(prev_iter);
1435       } else {
1436         assert(pos_to_prev.size() > 1);
1437         pos_to_prev.erase(prev_iter);
1438         prev_iter = pos_to_prev.begin();
1439         if (can_adjust_marker) {
1440           sync_marker.marker = prev_iter->second;
1441         }
1442       }
1443
1444       ldout(sync_env->cct, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
1445       stack_to_pos.erase(iter);
1446     }
1447   }
1448
1449   int full_sync() {
1450 #define OMAP_GET_MAX_ENTRIES 100
1451     int max_entries = OMAP_GET_MAX_ENTRIES;
1452     reenter(&full_cr) {
1453       set_status("full_sync");
1454       oid = full_sync_index_shard_oid(shard_id);
1455       can_adjust_marker = true;
1456       /* grab lock */
1457       yield {
1458         uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1459         string lock_name = "sync_lock";
1460         RGWRados *store = sync_env->store;
1461         lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
1462                                                 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1463                                                 lock_name, lock_duration, this));
1464         lease_stack.reset(spawn(lease_cr.get(), false));
1465         lost_lock = false;
1466       }
1467       while (!lease_cr->is_locked()) {
1468         if (lease_cr->is_done()) {
1469           ldout(cct, 5) << "lease cr failed, done early " << dendl;
1470           drain_all();
1471           return lease_cr->get_ret_status();
1472         }
1473         set_sleeping(true);
1474         yield;
1475       }
1476
1477       /* lock succeeded, a retry now should avoid previous backoff status */
1478       *reset_backoff = true;
1479
1480       /* prepare marker tracker */
1481       set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1482                                                          sync_env->shard_obj_name(shard_id),
1483                                                          sync_marker));
1484
1485       marker = sync_marker.marker;
1486
1487       total_entries = sync_marker.pos;
1488
1489       /* sync! */
1490       do {
1491         if (!lease_cr->is_locked()) {
1492           lost_lock = true;
1493           break;
1494         }
1495         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
1496                                              marker, &entries, max_entries));
1497         if (retcode < 0) {
1498           ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
1499           yield lease_cr->go_down();
1500           drain_all();
1501           return retcode;
1502         }
1503         iter = entries.begin();
1504         for (; iter != entries.end(); ++iter) {
1505           ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
1506           total_entries++;
1507           if (!marker_tracker->start(iter->first, total_entries, real_time())) {
1508             ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
1509           } else {
1510             // fetch remote and write locally
1511             yield {
1512               RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, MDLOG_STATUS_COMPLETE, marker_tracker), false);
1513               // stack_to_pos holds a reference to the stack
1514               stack_to_pos[stack] = iter->first;
1515               pos_to_prev[iter->first] = marker;
1516             }
1517           }
1518           marker = iter->first;
1519         }
1520         collect_children();
1521       } while ((int)entries.size() == max_entries && can_adjust_marker);
1522
1523       while (num_spawned() > 1) {
1524         yield wait_for_child();
1525         collect_children();
1526       }
1527
1528       if (!lost_lock) {
1529         /* update marker to reflect we're done with full sync */
1530         if (can_adjust_marker) {
1531           // apply updates to a temporary marker, or operate() will send us
1532           // to incremental_sync() after we yield
1533           temp_marker = sync_marker;
1534           temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
1535           temp_marker->marker = std::move(temp_marker->next_step_marker);
1536           temp_marker->next_step_marker.clear();
1537           temp_marker->realm_epoch = realm_epoch;
1538           ldout(sync_env->cct, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;
1539
1540           using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
1541           yield call(new WriteMarkerCR(sync_env->async_rados, sync_env->store,
1542                                        rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1543                                        *temp_marker));
1544         }
1545
1546         if (retcode < 0) {
1547           ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
1548           yield lease_cr->go_down();
1549           drain_all();
1550           return retcode;
1551         }
1552       }
1553
1554       /* 
1555        * if we reached here, it means that lost_lock is true, otherwise the state
1556        * change in the previous block will prevent us from reaching here
1557        */
1558
1559       yield lease_cr->go_down();
1560
1561       lease_cr.reset();
1562
1563       drain_all();
1564
1565       if (!can_adjust_marker) {
1566         return -EAGAIN;
1567       }
1568
1569       if (lost_lock) {
1570         return -EBUSY;
1571       }
1572
1573       // apply the sync marker update
1574       assert(temp_marker);
1575       sync_marker = std::move(*temp_marker);
1576       temp_marker = boost::none;
1577       // must not yield after this point!
1578     }
1579     return 0;
1580   }
1581     
1582
1583   int incremental_sync() {
1584     reenter(&incremental_cr) {
1585       set_status("incremental_sync");
1586       can_adjust_marker = true;
1587       /* grab lock */
1588       if (!lease_cr) { /* could have had  a lease_cr lock from previous state */
1589         yield {
1590           uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1591           string lock_name = "sync_lock";
1592           RGWRados *store = sync_env->store;
1593           lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
1594                                                    rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1595                                                    lock_name, lock_duration, this));
1596           lease_stack.reset(spawn(lease_cr.get(), false));
1597           lost_lock = false;
1598         }
1599         while (!lease_cr->is_locked()) {
1600           if (lease_cr->is_done()) {
1601             ldout(cct, 5) << "lease cr failed, done early " << dendl;
1602             drain_all();
1603             return lease_cr->get_ret_status();
1604           }
1605           set_sleeping(true);
1606           yield;
1607         }
1608       }
1609       // if the period has advanced, we can't use the existing marker
1610       if (sync_marker.realm_epoch < realm_epoch) {
1611         ldout(sync_env->cct, 4) << "clearing marker=" << sync_marker.marker
1612             << " from old realm_epoch=" << sync_marker.realm_epoch
1613             << " (now " << realm_epoch << ')' << dendl;
1614         sync_marker.realm_epoch = realm_epoch;
1615         sync_marker.marker.clear();
1616       }
1617       mdlog_marker = sync_marker.marker;
1618       set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
1619                                                          sync_env->shard_obj_name(shard_id),
1620                                                          sync_marker));
1621
1622       /*
1623        * mdlog_marker: the remote sync marker positiion
1624        * sync_marker: the local sync marker position
1625        * max_marker: the max mdlog position that we fetched
1626        * marker: the current position we try to sync
1627        * period_marker: the last marker before the next period begins (optional)
1628        */
1629       marker = max_marker = sync_marker.marker;
1630       /* inc sync */
1631       do {
1632         if (!lease_cr->is_locked()) {
1633           lost_lock = true;
1634           break;
1635         }
1636 #define INCREMENTAL_MAX_ENTRIES 100
1637         ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1638         if (!period_marker.empty() && period_marker <= mdlog_marker) {
1639           ldout(cct, 10) << "mdlog_marker past period_marker=" << period_marker << dendl;
1640           done_with_period = true;
1641           break;
1642         }
1643         if (mdlog_marker <= max_marker) {
1644           /* we're at the tip, try to bring more entries */
1645           ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
1646           yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
1647                                                   period, shard_id,
1648                                                   mdlog_marker, &mdlog_marker));
1649         }
1650         if (retcode < 0) {
1651           ldout(sync_env->cct, 10) << *this << ": failed to fetch more log entries, retcode=" << retcode << dendl;
1652           yield lease_cr->go_down();
1653           drain_all();
1654           *reset_backoff = false; // back off and try again later
1655           return retcode;
1656         }
1657         *reset_backoff = true; /* if we got to this point, all systems function */
1658         ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
1659         if (mdlog_marker > max_marker) {
1660           marker = max_marker;
1661           yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
1662                                                &max_marker, INCREMENTAL_MAX_ENTRIES,
1663                                                &log_entries, &truncated));
1664           if (retcode < 0) {
1665             ldout(sync_env->cct, 10) << *this << ": failed to list mdlog entries, retcode=" << retcode << dendl;
1666             yield lease_cr->go_down();
1667             drain_all();
1668             *reset_backoff = false; // back off and try again later
1669             return retcode;
1670           }
1671           for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) {
1672             if (!period_marker.empty() && period_marker <= log_iter->id) {
1673               done_with_period = true;
1674               if (period_marker < log_iter->id) {
1675                 ldout(cct, 10) << "found key=" << log_iter->id
1676                     << " past period_marker=" << period_marker << dendl;
1677                 break;
1678               }
1679               ldout(cct, 10) << "found key at period_marker=" << period_marker << dendl;
1680               // sync this entry, then return control to RGWMetaSyncCR
1681             }
1682             if (!mdlog_entry.convert_from(*log_iter)) {
1683               ldout(sync_env->cct, 0) << __func__ << ":" << __LINE__ << ": ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry" << dendl;
1684               continue;
1685             }
1686             ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
1687             if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) {
1688               ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
1689             } else {
1690               raw_key = log_iter->section + ":" + log_iter->name;
1691               yield {
1692                 RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker), false);
1693                 assert(stack);
1694                 // stack_to_pos holds a reference to the stack
1695                 stack_to_pos[stack] = log_iter->id;
1696                 pos_to_prev[log_iter->id] = marker;
1697               }
1698             }
1699             marker = log_iter->id;
1700           }
1701         }
1702         collect_children();
1703         ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
1704         if (done_with_period) {
1705           // return control to RGWMetaSyncCR and advance to the next period
1706           ldout(sync_env->cct, 10) << *this << ": done with period" << dendl;
1707           break;
1708         }
1709         if (mdlog_marker == max_marker && can_adjust_marker) {
1710 #define INCREMENTAL_INTERVAL 20
1711           yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
1712         }
1713       } while (can_adjust_marker);
1714
1715       while (num_spawned() > 1) {
1716         yield wait_for_child();
1717         collect_children();
1718       }
1719
1720       yield lease_cr->go_down();
1721
1722       drain_all();
1723
1724       if (lost_lock) {
1725         return -EBUSY;
1726       }
1727
1728       if (!can_adjust_marker) {
1729         return -EAGAIN;
1730       }
1731
1732       return set_cr_done();
1733     }
1734     /* TODO */
1735     return 0;
1736   }
1737 };
1738
1739 class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
1740 {
1741   RGWMetaSyncEnv *sync_env;
1742
1743   const rgw_pool& pool;
1744   const std::string& period;
1745   epoch_t realm_epoch;
1746   RGWMetadataLog* mdlog;
1747   uint32_t shard_id;
1748   rgw_meta_sync_marker sync_marker;
1749   const std::string period_marker;
1750
1751   static constexpr bool exit_on_error = false; // retry on all errors
1752 public:
1753   RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
1754                             const std::string& period, epoch_t realm_epoch,
1755                             RGWMetadataLog* mdlog, uint32_t _shard_id,
1756                             const rgw_meta_sync_marker& _marker,
1757                             std::string&& period_marker)
1758     : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
1759       pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
1760       shard_id(_shard_id), sync_marker(_marker),
1761       period_marker(std::move(period_marker)) {}
1762
1763   RGWCoroutine *alloc_cr() override {
1764     return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
1765                                   shard_id, sync_marker, period_marker, backoff_ptr());
1766   }
1767
1768   RGWCoroutine *alloc_finisher_cr() override {
1769     RGWRados *store = sync_env->store;
1770     return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store,
1771                                                           rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
1772                                                           &sync_marker);
1773   }
1774 };
1775
1776 class RGWMetaSyncCR : public RGWCoroutine {
1777   RGWMetaSyncEnv *sync_env;
1778   const rgw_pool& pool;
1779   RGWPeriodHistory::Cursor cursor; //< sync position in period history
1780   RGWPeriodHistory::Cursor next; //< next period in history
1781   rgw_meta_sync_status sync_status;
1782
1783   std::mutex mutex; //< protect access to shard_crs
1784
1785   // TODO: it should be enough to hold a reference on the stack only, as calling
1786   // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has
1787   // already completed
1788   using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
1789   using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
1790   using RefPair = std::pair<ControlCRRef, StackRef>;
1791   map<int, RefPair> shard_crs;
1792   int ret{0};
1793
1794 public:
1795   RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, RGWPeriodHistory::Cursor cursor,
1796                 const rgw_meta_sync_status& _sync_status)
1797     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1798       pool(sync_env->store->get_zone_params().log_pool),
1799       cursor(cursor), sync_status(_sync_status) {}
1800
1801   int operate() override {
1802     reenter(this) {
1803       // loop through one period at a time
1804       for (;;) {
1805         if (cursor == sync_env->store->period_history->get_current()) {
1806           next = RGWPeriodHistory::Cursor{};
1807           if (cursor) {
1808             ldout(cct, 10) << "RGWMetaSyncCR on current period="
1809                 << cursor.get_period().get_id() << dendl;
1810           } else {
1811             ldout(cct, 10) << "RGWMetaSyncCR with no period" << dendl;
1812           }
1813         } else {
1814           next = cursor;
1815           next.next();
1816           ldout(cct, 10) << "RGWMetaSyncCR on period="
1817               << cursor.get_period().get_id() << ", next="
1818               << next.get_period().get_id() << dendl;
1819         }
1820
1821         yield {
1822           // get the mdlog for the current period (may be empty)
1823           auto& period_id = sync_status.sync_info.period;
1824           auto realm_epoch = sync_status.sync_info.realm_epoch;
1825           auto mdlog = sync_env->store->meta_mgr->get_log(period_id);
1826
1827           // prevent wakeup() from accessing shard_crs while we're spawning them
1828           std::lock_guard<std::mutex> lock(mutex);
1829
1830           // sync this period on each shard
1831           for (const auto& m : sync_status.sync_markers) {
1832             uint32_t shard_id = m.first;
1833             auto& marker = m.second;
1834
1835             std::string period_marker;
1836             if (next) {
1837               // read the maximum marker from the next period's sync status
1838               period_marker = next.get_period().get_sync_status()[shard_id];
1839               if (period_marker.empty()) {
1840                 // no metadata changes have occurred on this shard, skip it
1841                 ldout(cct, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
1842                     << " with empty period marker" << dendl;
1843                 continue;
1844               }
1845             }
1846
1847             using ShardCR = RGWMetaSyncShardControlCR;
1848             auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
1849                                   mdlog, shard_id, marker,
1850                                   std::move(period_marker));
1851             auto stack = spawn(cr, false);
1852             shard_crs[shard_id] = RefPair{cr, stack};
1853           }
1854         }
1855         // wait for each shard to complete
1856         while (ret == 0 && num_spawned() > 0) {
1857           yield wait_for_child();
1858           collect(&ret, nullptr);
1859         }
1860         drain_all();
1861         {
1862           // drop shard cr refs under lock
1863           std::lock_guard<std::mutex> lock(mutex);
1864           shard_crs.clear();
1865         }
1866         if (ret < 0) {
1867           return set_cr_error(ret);
1868         }
1869         // advance to the next period
1870         assert(next);
1871         cursor = next;
1872
1873         // write the updated sync info
1874         sync_status.sync_info.period = cursor.get_period().get_id();
1875         sync_status.sync_info.realm_epoch = cursor.get_epoch();
1876         yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
1877                                                                  sync_env->store,
1878                                                                  rgw_raw_obj(pool, sync_env->status_oid()),
1879                                                                  sync_status.sync_info));
1880       }
1881     }
1882     return 0;
1883   }
1884
1885   void wakeup(int shard_id) {
1886     std::lock_guard<std::mutex> lock(mutex);
1887     auto iter = shard_crs.find(shard_id);
1888     if (iter == shard_crs.end()) {
1889       return;
1890     }
1891     iter->second.first->wakeup();
1892   }
1893 };
1894
1895 void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) {
1896   env->cct = store->ctx();
1897   env->store = store;
1898   env->conn = conn;
1899   env->async_rados = async_rados;
1900   env->http_manager = &http_manager;
1901   env->error_logger = error_logger;
1902 }
1903
1904 int RGWRemoteMetaLog::read_sync_status(rgw_meta_sync_status *sync_status)
1905 {
1906   if (store->is_meta_master()) {
1907     return 0;
1908   }
1909   // cannot run concurrently with run_sync(), so run in a separate manager
1910   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
1911   RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
1912   int ret = http_manager.set_threaded();
1913   if (ret < 0) {
1914     ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
1915     return ret;
1916   }
1917   RGWMetaSyncEnv sync_env_local = sync_env;
1918   sync_env_local.http_manager = &http_manager;
1919   ret = crs.run(new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status));
1920   http_manager.stop();
1921   return ret;
1922 }
1923
1924 int RGWRemoteMetaLog::init_sync_status()
1925 {
1926   if (store->is_meta_master()) {
1927     return 0;
1928   }
1929
1930   rgw_mdlog_info mdlog_info;
1931   int r = read_log_info(&mdlog_info);
1932   if (r < 0) {
1933     lderr(store->ctx()) << "ERROR: fail to fetch master log info (r=" << r << ")" << dendl;
1934     return r;
1935   }
1936
1937   rgw_meta_sync_info sync_info;
1938   sync_info.num_shards = mdlog_info.num_shards;
1939   auto cursor = store->period_history->get_current();
1940   if (cursor) {
1941     sync_info.period = cursor.get_period().get_id();
1942     sync_info.realm_epoch = cursor.get_epoch();
1943   }
1944
1945   return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
1946 }
1947
1948 int RGWRemoteMetaLog::store_sync_info(const rgw_meta_sync_info& sync_info)
1949 {
1950   return run(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store,
1951                                                            rgw_raw_obj(store->get_zone_params().log_pool, sync_env.status_oid()),
1952                                                            sync_info));
1953 }
1954
1955 // return a cursor to the period at our sync position
1956 static RGWPeriodHistory::Cursor get_period_at(RGWRados* store,
1957                                               const rgw_meta_sync_info& info)
1958 {
1959   if (info.period.empty()) {
1960     // return an empty cursor with error=0
1961     return RGWPeriodHistory::Cursor{};
1962   }
1963
1964   // look for an existing period in our history
1965   auto cursor = store->period_history->lookup(info.realm_epoch);
1966   if (cursor) {
1967     // verify that the period ids match
1968     auto& existing = cursor.get_period().get_id();
1969     if (existing != info.period) {
1970       lderr(store->ctx()) << "ERROR: sync status period=" << info.period
1971           << " does not match period=" << existing
1972           << " in history at realm epoch=" << info.realm_epoch << dendl;
1973       return RGWPeriodHistory::Cursor{-EEXIST};
1974     }
1975     return cursor;
1976   }
1977
1978   // read the period from rados or pull it from the master
1979   RGWPeriod period;
1980   int r = store->period_puller->pull(info.period, period);
1981   if (r < 0) {
1982     lderr(store->ctx()) << "ERROR: failed to read period id "
1983         << info.period << ": " << cpp_strerror(r) << dendl;
1984     return RGWPeriodHistory::Cursor{r};
1985   }
1986   // attach the period to our history
1987   cursor = store->period_history->attach(std::move(period));
1988   if (!cursor) {
1989     r = cursor.get_error();
1990     lderr(store->ctx()) << "ERROR: failed to read period history back to "
1991         << info.period << ": " << cpp_strerror(r) << dendl;
1992   }
1993   return cursor;
1994 }
1995
1996 int RGWRemoteMetaLog::run_sync()
1997 {
1998   if (store->is_meta_master()) {
1999     return 0;
2000   }
2001
2002   int r = 0;
2003
2004   // get shard count and oldest log period from master
2005   rgw_mdlog_info mdlog_info;
2006   for (;;) {
2007     if (going_down) {
2008       ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
2009       return 0;
2010     }
2011     r = read_log_info(&mdlog_info);
2012     if (r == -EIO || r == -ENOENT) {
2013       // keep retrying if master isn't alive or hasn't initialized the log
2014       ldout(store->ctx(), 10) << __func__ << "(): waiting for master.." << dendl;
2015       backoff.backoff_sleep();
2016       continue;
2017     }
2018     backoff.reset();
2019     if (r < 0) {
2020       lderr(store->ctx()) << "ERROR: fail to fetch master log info (r=" << r << ")" << dendl;
2021       return r;
2022     }
2023     break;
2024   }
2025
2026   rgw_meta_sync_status sync_status;
2027   do {
2028     if (going_down) {
2029       ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
2030       return 0;
2031     }
2032     r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2033     if (r < 0 && r != -ENOENT) {
2034       ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
2035       return r;
2036     }
2037
2038     if (!mdlog_info.period.empty()) {
2039       // restart sync if the remote has a period, but:
2040       // a) our status does not, or
2041       // b) our sync period comes before the remote's oldest log period
2042       if (sync_status.sync_info.period.empty() ||
2043           sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) {
2044         sync_status.sync_info.state = rgw_meta_sync_info::StateInit;
2045         ldout(store->ctx(), 1) << "epoch=" << sync_status.sync_info.realm_epoch
2046            << " in sync status comes before remote's oldest mdlog epoch="
2047            << mdlog_info.realm_epoch << ", restarting sync" << dendl;
2048       }
2049     }
2050
2051     if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
2052       ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
2053       sync_status.sync_info.num_shards = mdlog_info.num_shards;
2054       auto cursor = store->period_history->get_current();
2055       if (cursor) {
2056         // run full sync, then start incremental from the current period/epoch
2057         sync_status.sync_info.period = cursor.get_period().get_id();
2058         sync_status.sync_info.realm_epoch = cursor.get_epoch();
2059       }
2060       r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
2061       if (r == -EBUSY) {
2062         backoff.backoff_sleep();
2063         continue;
2064       }
2065       backoff.reset();
2066       if (r < 0) {
2067         ldout(store->ctx(), 0) << "ERROR: failed to init sync status r=" << r << dendl;
2068         return r;
2069       }
2070     }
2071   } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
2072
2073   auto num_shards = sync_status.sync_info.num_shards;
2074   if (num_shards != mdlog_info.num_shards) {
2075     lderr(store->ctx()) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
2076     return -EINVAL;
2077   }
2078
2079   RGWPeriodHistory::Cursor cursor;
2080   do {
2081     r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
2082     if (r < 0 && r != -ENOENT) {
2083       ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
2084       return r;
2085     }
2086
2087     switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
2088       case rgw_meta_sync_info::StateBuildingFullSyncMaps:
2089         ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
2090         r = run(new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers));
2091         if (r == -EBUSY || r == -EAGAIN) {
2092           backoff.backoff_sleep();
2093           continue;
2094         }
2095         backoff.reset();
2096         if (r < 0) {
2097           ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
2098           return r;
2099         }
2100
2101         sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
2102         r = store_sync_info(sync_status.sync_info);
2103         if (r < 0) {
2104           ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl;
2105           return r;
2106         }
2107         /* fall through */
2108       case rgw_meta_sync_info::StateSync:
2109         ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl;
2110         // find our position in the period history (if any)
2111         cursor = get_period_at(store, sync_status.sync_info);
2112         r = cursor.get_error();
2113         if (r < 0) {
2114           return r;
2115         }
2116         meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status);
2117         r = run(meta_sync_cr);
2118         if (r < 0) {
2119           ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
2120           return r;
2121         }
2122         break;
2123       default:
2124         ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
2125         return -EIO;
2126     }
2127   } while (!going_down);
2128
2129   return 0;
2130 }
2131
2132 void RGWRemoteMetaLog::wakeup(int shard_id)
2133 {
2134   if (!meta_sync_cr) {
2135     return;
2136   }
2137   meta_sync_cr->wakeup(shard_id);
2138 }
2139
2140 int RGWCloneMetaLogCoroutine::operate()
2141 {
2142   reenter(this) {
2143     do {
2144       yield {
2145         ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
2146         return state_init();
2147       }
2148       yield {
2149         ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
2150         return state_read_shard_status();
2151       }
2152       yield {
2153         ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
2154         return state_read_shard_status_complete();
2155       }
2156       yield {
2157         ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
2158         return state_send_rest_request();
2159       }
2160       yield {
2161         ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
2162         return state_receive_rest_response();
2163       }
2164       yield {
2165         ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
2166         return state_store_mdlog_entries();
2167       }
2168     } while (truncated);
2169     yield {
2170       ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
2171       return state_store_mdlog_entries_complete();
2172     }
2173   }
2174
2175   return 0;
2176 }
2177
2178 int RGWCloneMetaLogCoroutine::state_init()
2179 {
2180   data = rgw_mdlog_shard_data();
2181
2182   return 0;
2183 }
2184
2185 int RGWCloneMetaLogCoroutine::state_read_shard_status()
2186 {
2187   const bool add_ref = false; // default constructs with refs=1
2188
2189   completion.reset(new RGWMetadataLogInfoCompletion(
2190     [this](int ret, const cls_log_header& header) {
2191       if (ret < 0) {
2192         ldout(cct, 1) << "ERROR: failed to read mdlog info with "
2193             << cpp_strerror(ret) << dendl;
2194       } else {
2195         shard_info.marker = header.max_marker;
2196         shard_info.last_update = header.max_time.to_real_time();
2197       }
2198       // wake up parent stack
2199       stack->get_completion_mgr()->complete(nullptr, stack);
2200     }), add_ref);
2201
2202   int ret = mdlog->get_info_async(shard_id, completion.get());
2203   if (ret < 0) {
2204     ldout(cct, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
2205     return set_cr_error(ret);
2206   }
2207
2208   return io_block(0);
2209 }
2210
2211 int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
2212 {
2213   completion.reset();
2214
2215   ldout(cct, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
2216
2217   marker = shard_info.marker;
2218
2219   return 0;
2220 }
2221
2222 int RGWCloneMetaLogCoroutine::state_send_rest_request()
2223 {
2224   RGWRESTConn *conn = sync_env->conn;
2225
2226   char buf[32];
2227   snprintf(buf, sizeof(buf), "%d", shard_id);
2228
2229   char max_entries_buf[32];
2230   snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
2231
2232   const char *marker_key = (marker.empty() ? "" : "marker");
2233
2234   rgw_http_param_pair pairs[] = { { "type", "metadata" },
2235                                   { "id", buf },
2236                                   { "period", period.c_str() },
2237                                   { "max-entries", max_entries_buf },
2238                                   { marker_key, marker.c_str() },
2239                                   { NULL, NULL } };
2240
2241   http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);
2242
2243   http_op->set_user_info((void *)stack);
2244
2245   int ret = http_op->aio_read();
2246   if (ret < 0) {
2247     ldout(cct, 0) << "ERROR: failed to fetch mdlog data" << dendl;
2248     log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
2249     http_op->put();
2250     http_op = NULL;
2251     return ret;
2252   }
2253
2254   return io_block(0);
2255 }
2256
2257 int RGWCloneMetaLogCoroutine::state_receive_rest_response()
2258 {
2259   int ret = http_op->wait(&data);
2260   if (ret < 0) {
2261     error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
2262     ldout(cct, 5) << "failed to wait for op, ret=" << ret << dendl;
2263     http_op->put();
2264     http_op = NULL;
2265     return set_cr_error(ret);
2266   }
2267   http_op->put();
2268   http_op = NULL;
2269
2270   ldout(cct, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
2271
2272   truncated = ((int)data.entries.size() == max_entries);
2273
2274   if (data.entries.empty()) {
2275     if (new_marker) {
2276       *new_marker = marker;
2277     }
2278     return set_cr_done();
2279   }
2280
2281   if (new_marker) {
2282     *new_marker = data.entries.back().id;
2283   }
2284
2285   return 0;
2286 }
2287
2288
2289 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
2290 {
2291   list<cls_log_entry> dest_entries;
2292
2293   vector<rgw_mdlog_entry>::iterator iter;
2294   for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
2295     rgw_mdlog_entry& entry = *iter;
2296     ldout(cct, 20) << "entry: name=" << entry.name << dendl;
2297
2298     cls_log_entry dest_entry;
2299     dest_entry.id = entry.id;
2300     dest_entry.section = entry.section;
2301     dest_entry.name = entry.name;
2302     dest_entry.timestamp = utime_t(entry.timestamp);
2303   
2304     ::encode(entry.log_data, dest_entry.data);
2305
2306     dest_entries.push_back(dest_entry);
2307
2308     marker = entry.id;
2309   }
2310
2311   RGWAioCompletionNotifier *cn = stack->create_completion_notifier();
2312
2313   int ret = mdlog->store_entries_in_shard(dest_entries, shard_id, cn->completion());
2314   if (ret < 0) {
2315     cn->put();
2316     ldout(cct, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
2317     return set_cr_error(ret);
2318   }
2319   return io_block(0);
2320 }
2321
2322 int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
2323 {
2324   return set_cr_done();
2325 }
2326
2327
2328 // TODO: move into rgw_sync_trim.cc
2329 #undef dout_prefix
2330 #define dout_prefix (*_dout << "meta trim: ")
2331
2332 /// purge all log shards for the given mdlog
2333 class PurgeLogShardsCR : public RGWShardCollectCR {
2334   RGWRados *const store;
2335   const RGWMetadataLog* mdlog;
2336   const int num_shards;
2337   rgw_raw_obj obj;
2338   int i{0};
2339
2340   static constexpr int max_concurrent = 16;
2341
2342  public:
2343   PurgeLogShardsCR(RGWRados *store, const RGWMetadataLog* mdlog,
2344                    const rgw_pool& pool, int num_shards)
2345     : RGWShardCollectCR(store->ctx(), max_concurrent),
2346       store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
2347   {}
2348
2349   bool spawn_next() override {
2350     if (i == num_shards) {
2351       return false;
2352     }
2353     mdlog->get_shard_oid(i++, obj.oid);
2354     spawn(new RGWRadosRemoveCR(store, obj), false);
2355     return true;
2356   }
2357 };
2358
2359 using Cursor = RGWPeriodHistory::Cursor;
2360
2361 /// purge mdlogs from the oldest up to (but not including) the given realm_epoch
2362 class PurgePeriodLogsCR : public RGWCoroutine {
2363   RGWRados *const store;
2364   RGWMetadataManager *const metadata;
2365   RGWObjVersionTracker objv;
2366   Cursor cursor;
2367   epoch_t realm_epoch;
2368   epoch_t *last_trim_epoch; //< update last trim on success
2369
2370  public:
2371   PurgePeriodLogsCR(RGWRados *store, epoch_t realm_epoch, epoch_t *last_trim)
2372     : RGWCoroutine(store->ctx()), store(store), metadata(store->meta_mgr),
2373       realm_epoch(realm_epoch), last_trim_epoch(last_trim)
2374   {}
2375
2376   int operate();
2377 };
2378
2379 int PurgePeriodLogsCR::operate()
2380 {
2381   reenter(this) {
2382     // read our current oldest log period
2383     yield call(metadata->read_oldest_log_period_cr(&cursor, &objv));
2384     if (retcode < 0) {
2385       return set_cr_error(retcode);
2386     }
2387     assert(cursor);
2388     ldout(cct, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
2389         << " period=" << cursor.get_period().get_id() << dendl;
2390
2391     // trim -up to- the given realm_epoch
2392     while (cursor.get_epoch() < realm_epoch) {
2393       ldout(cct, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
2394           << " period=" << cursor.get_period().get_id() << dendl;
2395       yield {
2396         const auto mdlog = metadata->get_log(cursor.get_period().get_id());
2397         const auto& pool = store->get_zone_params().log_pool;
2398         auto num_shards = cct->_conf->rgw_md_log_max_shards;
2399         call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
2400       }
2401       if (retcode < 0) {
2402         ldout(cct, 1) << "failed to remove log shards: "
2403             << cpp_strerror(retcode) << dendl;
2404         return set_cr_error(retcode);
2405       }
2406       ldout(cct, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
2407           << " period=" << cursor.get_period().get_id() << dendl;
2408
2409       // update our mdlog history
2410       yield call(metadata->trim_log_period_cr(cursor, &objv));
2411       if (retcode == -ENOENT) {
2412         // must have raced to update mdlog history. return success and allow the
2413         // winner to continue purging
2414         ldout(cct, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
2415             << " period=" << cursor.get_period().get_id() << dendl;
2416         return set_cr_done();
2417       } else if (retcode < 0) {
2418         ldout(cct, 1) << "failed to remove log shards for realm_epoch="
2419             << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
2420             << " with: " << cpp_strerror(retcode) << dendl;
2421         return set_cr_error(retcode);
2422       }
2423
2424       if (*last_trim_epoch < cursor.get_epoch()) {
2425         *last_trim_epoch = cursor.get_epoch();
2426       }
2427
2428       assert(cursor.has_next()); // get_current() should always come after
2429       cursor.next();
2430     }
2431     return set_cr_done();
2432   }
2433   return 0;
2434 }
2435
2436 namespace {
2437
2438 using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;
2439
2440 /// construct a RGWRESTConn for each zone in the realm
2441 template <typename Zonegroups>
2442 connection_map make_peer_connections(RGWRados *store,
2443                                      const Zonegroups& zonegroups)
2444 {
2445   connection_map connections;
2446   for (auto& g : zonegroups) {
2447     for (auto& z : g.second.zones) {
2448       std::unique_ptr<RGWRESTConn> conn{
2449         new RGWRESTConn(store->ctx(), store, z.first, z.second.endpoints)};
2450       connections.emplace(z.first, std::move(conn));
2451     }
2452   }
2453   return connections;
2454 }
2455
2456 /// return the marker that it's safe to trim up to
2457 const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
2458 {
2459   return m.state == m.FullSync ? m.next_step_marker : m.marker;
2460 }
2461
2462 /// comparison operator for take_min_status()
2463 bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
2464 {
2465   // sort by stable marker
2466   return get_stable_marker(lhs) < get_stable_marker(rhs);
2467 }
2468
2469 /// populate the status with the minimum stable marker of each shard for any
2470 /// peer whose realm_epoch matches the minimum realm_epoch in the input
2471 template <typename Iter>
2472 int take_min_status(CephContext *cct, Iter first, Iter last,
2473                     rgw_meta_sync_status *status)
2474 {
2475   if (first == last) {
2476     return -EINVAL;
2477   }
2478   const size_t num_shards = cct->_conf->rgw_md_log_max_shards;
2479
2480   status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
2481   for (auto p = first; p != last; ++p) {
2482     // validate peer's shard count
2483     if (p->sync_markers.size() != num_shards) {
2484       ldout(cct, 1) << "take_min_status got peer status with "
2485           << p->sync_markers.size() << " shards, expected "
2486           << num_shards << dendl;
2487       return -EINVAL;
2488     }
2489     if (p->sync_info.realm_epoch < status->sync_info.realm_epoch) {
2490       // earlier epoch, take its entire status
2491       *status = std::move(*p);
2492     } else if (p->sync_info.realm_epoch == status->sync_info.realm_epoch) {
2493       // same epoch, take any earlier markers
2494       auto m = status->sync_markers.begin();
2495       for (auto& shard : p->sync_markers) {
2496         if (shard.second < m->second) {
2497           m->second = std::move(shard.second);
2498         }
2499         ++m;
2500       }
2501     }
2502   }
2503   return 0;
2504 }
2505
2506 struct TrimEnv {
2507   RGWRados *const store;
2508   RGWHTTPManager *const http;
2509   int num_shards;
2510   const std::string& zone;
2511   Cursor current; //< cursor to current period
2512   epoch_t last_trim_epoch{0}; //< epoch of last mdlog that was purged
2513
2514   TrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
2515     : store(store), http(http), num_shards(num_shards),
2516       zone(store->get_zone_params().get_id()),
2517       current(store->period_history->get_current())
2518   {}
2519 };
2520
2521 struct MasterTrimEnv : public TrimEnv {
2522   connection_map connections; //< peer connections
2523   std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
2524   /// last trim marker for each shard, only applies to current period's mdlog
2525   std::vector<std::string> last_trim_markers;
2526
2527   MasterTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
2528     : TrimEnv(store, http, num_shards),
2529       last_trim_markers(num_shards)
2530   {
2531     auto& period = current.get_period();
2532     connections = make_peer_connections(store, period.get_map().zonegroups);
2533     connections.erase(zone);
2534     peer_status.resize(connections.size());
2535   }
2536 };
2537
2538 struct PeerTrimEnv : public TrimEnv {
2539   /// last trim timestamp for each shard, only applies to current period's mdlog
2540   std::vector<ceph::real_time> last_trim_timestamps;
2541
2542   PeerTrimEnv(RGWRados *store, RGWHTTPManager *http, int num_shards)
2543     : TrimEnv(store, http, num_shards),
2544       last_trim_timestamps(num_shards)
2545   {}
2546
2547   void set_num_shards(int num_shards) {
2548     this->num_shards = num_shards;
2549     last_trim_timestamps.resize(num_shards);
2550   }
2551 };
2552
2553 } // anonymous namespace
2554
2555
2556 /// spawn a trim cr for each shard that needs it, while limiting the number
2557 /// of concurrent shards
2558 class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
2559  private:
2560   static constexpr int MAX_CONCURRENT_SHARDS = 16;
2561
2562   MasterTrimEnv& env;
2563   RGWMetadataLog *mdlog;
2564   int shard_id{0};
2565   std::string oid;
2566   const rgw_meta_sync_status& sync_status;
2567
2568  public:
2569   MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
2570                                const rgw_meta_sync_status& sync_status)
2571     : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2572       env(env), mdlog(mdlog), sync_status(sync_status)
2573   {}
2574
2575   bool spawn_next() override;
2576 };
2577
2578 bool MetaMasterTrimShardCollectCR::spawn_next()
2579 {
2580   while (shard_id < env.num_shards) {
2581     auto m = sync_status.sync_markers.find(shard_id);
2582     if (m == sync_status.sync_markers.end()) {
2583       shard_id++;
2584       continue;
2585     }
2586     auto& stable = get_stable_marker(m->second);
2587     auto& last_trim = env.last_trim_markers[shard_id];
2588
2589     if (stable <= last_trim) {
2590       // already trimmed
2591       ldout(cct, 20) << "skipping log shard " << shard_id
2592           << " at marker=" << stable
2593           << " last_trim=" << last_trim
2594           << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
2595       shard_id++;
2596       continue;
2597     }
2598
2599     mdlog->get_shard_oid(shard_id, oid);
2600
2601     ldout(cct, 10) << "trimming log shard " << shard_id
2602         << " at marker=" << stable
2603         << " last_trim=" << last_trim
2604         << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
2605     spawn(new RGWSyncLogTrimCR(env.store, oid, stable, &last_trim), false);
2606     shard_id++;
2607     return true;
2608   }
2609   return false;
2610 }
2611
2612 /// spawn rest requests to read each peer's sync status
2613 class MetaMasterStatusCollectCR : public RGWShardCollectCR {
2614   static constexpr int MAX_CONCURRENT_SHARDS = 16;
2615
2616   MasterTrimEnv& env;
2617   connection_map::iterator c;
2618   std::vector<rgw_meta_sync_status>::iterator s;
2619  public:
2620   MetaMasterStatusCollectCR(MasterTrimEnv& env)
2621     : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2622       env(env), c(env.connections.begin()), s(env.peer_status.begin())
2623   {}
2624
2625   bool spawn_next() override {
2626     if (c == env.connections.end()) {
2627       return false;
2628     }
2629     static rgw_http_param_pair params[] = {
2630       { "type", "metadata" },
2631       { "status", nullptr },
2632       { nullptr, nullptr }
2633     };
2634
2635     ldout(cct, 20) << "query sync status from " << c->first << dendl;
2636     auto conn = c->second.get();
2637     using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
2638     spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
2639           false);
2640     ++c;
2641     ++s;
2642     return true;
2643   }
2644 };
2645
2646 class MetaMasterTrimCR : public RGWCoroutine {
2647   MasterTrimEnv& env;
2648   rgw_meta_sync_status min_status; //< minimum sync status of all peers
2649   int ret{0};
2650
2651  public:
2652   MetaMasterTrimCR(MasterTrimEnv& env)
2653     : RGWCoroutine(env.store->ctx()), env(env)
2654   {}
2655
2656   int operate();
2657 };
2658
2659 int MetaMasterTrimCR::operate()
2660 {
2661   reenter(this) {
2662     // TODO: detect this and fail before we spawn the trim thread?
2663     if (env.connections.empty()) {
2664       ldout(cct, 4) << "no peers, exiting" << dendl;
2665       return set_cr_done();
2666     }
2667
2668     ldout(cct, 10) << "fetching sync status for zone " << env.zone << dendl;
2669     // query mdlog sync status from peers
2670     yield call(new MetaMasterStatusCollectCR(env));
2671
2672     // must get a successful reply from all peers to consider trimming
2673     if (ret < 0) {
2674       ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
2675       return set_cr_error(ret);
2676     }
2677
2678     // determine the minimum epoch and markers
2679     ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
2680                           env.peer_status.end(), &min_status);
2681     if (ret < 0) {
2682       ldout(cct, 4) << "failed to calculate min sync status from peers" << dendl;
2683       return set_cr_error(ret);
2684     }
2685     yield {
2686       auto store = env.store;
2687       auto epoch = min_status.sync_info.realm_epoch;
2688       ldout(cct, 4) << "realm epoch min=" << epoch
2689           << " current=" << env.current.get_epoch()<< dendl;
2690       if (epoch > env.last_trim_epoch + 1) {
2691         // delete any prior mdlog periods
2692         spawn(new PurgePeriodLogsCR(store, epoch, &env.last_trim_epoch), true);
2693       } else {
2694         ldout(cct, 10) << "mdlogs already purged up to realm_epoch "
2695             << env.last_trim_epoch << dendl;
2696       }
2697
2698       // if realm_epoch == current, trim mdlog based on markers
2699       if (epoch == env.current.get_epoch()) {
2700         auto mdlog = store->meta_mgr->get_log(env.current.get_period().get_id());
2701         spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
2702       }
2703     }
2704     // ignore any errors during purge/trim because we want to hold the lock open
2705     return set_cr_done();
2706   }
2707   return 0;
2708 }
2709
2710
2711 /// read the first entry of the master's mdlog shard and trim to that position
2712 class MetaPeerTrimShardCR : public RGWCoroutine {
2713   RGWMetaSyncEnv& env;
2714   RGWMetadataLog *mdlog;
2715   const std::string& period_id;
2716   const int shard_id;
2717   RGWMetadataLogInfo info;
2718   ceph::real_time stable; //< safe timestamp to trim, according to master
2719   ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
2720   rgw_mdlog_shard_data result; //< result from master's mdlog listing
2721
2722  public:
2723   MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
2724                       const std::string& period_id, int shard_id,
2725                       ceph::real_time *last_trim)
2726     : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
2727       period_id(period_id), shard_id(shard_id), last_trim(last_trim)
2728   {}
2729
2730   int operate() override;
2731 };
2732
2733 int MetaPeerTrimShardCR::operate()
2734 {
2735   reenter(this) {
2736     // query master's first mdlog entry for this shard
2737     yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
2738                                              "", 1, &result));
2739     if (retcode < 0) {
2740       ldout(cct, 5) << "failed to read first entry from master's mdlog shard "
2741           << shard_id << " for period " << period_id
2742           << ": " << cpp_strerror(retcode) << dendl;
2743       return set_cr_error(retcode);
2744     }
2745     if (result.entries.empty()) {
2746       // if there are no mdlog entries, we don't have a timestamp to compare. we
2747       // can't just trim everything, because there could be racing updates since
2748       // this empty reply. query the mdlog shard info to read its max timestamp,
2749       // then retry the listing to make sure it's still empty before trimming to
2750       // that
2751       ldout(cct, 10) << "empty master mdlog shard " << shard_id
2752           << ", reading last timestamp from shard info" << dendl;
2753       // read the mdlog shard info for the last timestamp
2754       using ShardInfoCR = RGWReadRemoteMDLogShardInfoCR;
2755       yield call(new ShardInfoCR(&env, period_id, shard_id, &info));
2756       if (retcode < 0) {
2757         ldout(cct, 5) << "failed to read info from master's mdlog shard "
2758             << shard_id << " for period " << period_id
2759             << ": " << cpp_strerror(retcode) << dendl;
2760         return set_cr_error(retcode);
2761       }
2762       if (ceph::real_clock::is_zero(info.last_update)) {
2763         return set_cr_done(); // nothing to trim
2764       }
2765       ldout(cct, 10) << "got mdlog shard info with last update="
2766           << info.last_update << dendl;
2767       // re-read the master's first mdlog entry to make sure it hasn't changed
2768       yield call(new RGWListRemoteMDLogShardCR(&env, period_id, shard_id,
2769                                                "", 1, &result));
2770       if (retcode < 0) {
2771         ldout(cct, 5) << "failed to read first entry from master's mdlog shard "
2772             << shard_id << " for period " << period_id
2773             << ": " << cpp_strerror(retcode) << dendl;
2774         return set_cr_error(retcode);
2775       }
2776       // if the mdlog is still empty, trim to max marker
2777       if (result.entries.empty()) {
2778         stable = info.last_update;
2779       } else {
2780         stable = result.entries.front().timestamp;
2781
2782         // can only trim -up to- master's first timestamp, so subtract a second.
2783         // (this is why we use timestamps instead of markers for the peers)
2784         stable -= std::chrono::seconds(1);
2785       }
2786     } else {
2787       stable = result.entries.front().timestamp;
2788       stable -= std::chrono::seconds(1);
2789     }
2790
2791     if (stable <= *last_trim) {
2792       ldout(cct, 10) << "skipping log shard " << shard_id
2793           << " at timestamp=" << stable
2794           << " last_trim=" << *last_trim << dendl;
2795       return set_cr_done();
2796     }
2797
2798     ldout(cct, 10) << "trimming log shard " << shard_id
2799         << " at timestamp=" << stable
2800         << " last_trim=" << *last_trim << dendl;
2801     yield {
2802       std::string oid;
2803       mdlog->get_shard_oid(shard_id, oid);
2804       call(new RGWRadosTimelogTrimCR(env.store, oid, real_time{}, stable, "", ""));
2805     }
2806     if (retcode < 0 && retcode != -ENODATA) {
2807       ldout(cct, 1) << "failed to trim mdlog shard " << shard_id
2808           << ": " << cpp_strerror(retcode) << dendl;
2809       return set_cr_error(retcode);
2810     }
2811     *last_trim = stable;
2812     return set_cr_done();
2813   }
2814   return 0;
2815 }
2816
2817 class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
2818   static constexpr int MAX_CONCURRENT_SHARDS = 16;
2819
2820   PeerTrimEnv& env;
2821   RGWMetadataLog *mdlog;
2822   const std::string& period_id;
2823   RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
2824   int shard_id{0};
2825
2826  public:
2827   MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
2828     : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
2829       env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
2830   {
2831     meta_env.init(cct, env.store, env.store->rest_master_conn,
2832                   env.store->get_async_rados(), env.http, nullptr);
2833   }
2834
2835   bool spawn_next() override;
2836 };
2837
2838 bool MetaPeerTrimShardCollectCR::spawn_next()
2839 {
2840   if (shard_id >= env.num_shards) {
2841     return false;
2842   }
2843   auto& last_trim = env.last_trim_timestamps[shard_id];
2844   spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
2845         false);
2846   shard_id++;
2847   return true;
2848 }
2849
2850 class MetaPeerTrimCR : public RGWCoroutine {
2851   PeerTrimEnv& env;
2852   rgw_mdlog_info mdlog_info; //< master's mdlog info
2853
2854  public:
2855   MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}
2856
2857   int operate();
2858 };
2859
2860 int MetaPeerTrimCR::operate()
2861 {
2862   reenter(this) {
2863     ldout(cct, 10) << "fetching master mdlog info" << dendl;
2864     yield {
2865       // query mdlog_info from master for oldest_log_period
2866       rgw_http_param_pair params[] = {
2867         { "type", "metadata" },
2868         { nullptr, nullptr }
2869       };
2870
2871       using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
2872       call(new LogInfoCR(cct, env.store->rest_master_conn, env.http,
2873                          "/admin/log/", params, &mdlog_info));
2874     }
2875     if (retcode < 0) {
2876       ldout(cct, 4) << "failed to read mdlog info from master" << dendl;
2877       return set_cr_error(retcode);
2878     }
2879     // use master's shard count instead
2880     env.set_num_shards(mdlog_info.num_shards);
2881
2882     if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
2883       // delete any prior mdlog periods
2884       yield call(new PurgePeriodLogsCR(env.store, mdlog_info.realm_epoch,
2885                                        &env.last_trim_epoch));
2886     } else {
2887       ldout(cct, 10) << "mdlogs already purged through realm_epoch "
2888           << env.last_trim_epoch << dendl;
2889     }
2890
2891     // if realm_epoch == current, trim mdlog based on master's markers
2892     if (mdlog_info.realm_epoch == env.current.get_epoch()) {
2893       yield {
2894         auto meta_mgr = env.store->meta_mgr;
2895         auto mdlog = meta_mgr->get_log(env.current.get_period().get_id());
2896         call(new MetaPeerTrimShardCollectCR(env, mdlog));
2897         // ignore any errors during purge/trim because we want to hold the lock open
2898       }
2899     }
2900     return set_cr_done();
2901   }
2902   return 0;
2903 }
2904
2905 class MetaTrimPollCR : public RGWCoroutine {
2906   RGWRados *const store;
2907   const utime_t interval; //< polling interval
2908   const rgw_raw_obj obj;
2909   const std::string name{"meta_trim"}; //< lock name
2910   const std::string cookie;
2911
2912  protected:
2913   /// allocate the coroutine to run within the lease
2914   virtual RGWCoroutine* alloc_cr() = 0;
2915
2916  public:
2917   MetaTrimPollCR(RGWRados *store, utime_t interval)
2918     : RGWCoroutine(store->ctx()), store(store), interval(interval),
2919       obj(store->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
2920       cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
2921   {}
2922
2923   int operate();
2924 };
2925
2926 int MetaTrimPollCR::operate()
2927 {
2928   reenter(this) {
2929     for (;;) {
2930       set_status("sleeping");
2931       wait(interval);
2932
2933       // prevent others from trimming for our entire wait interval
2934       set_status("acquiring trim lock");
2935       yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
2936                                           obj, name, cookie, interval.sec()));
2937       if (retcode < 0) {
2938         ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
2939         continue;
2940       }
2941
2942       set_status("trimming");
2943       yield call(alloc_cr());
2944
2945       if (retcode < 0) {
2946         // on errors, unlock so other gateways can try
2947         set_status("unlocking");
2948         yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
2949                                               obj, name, cookie));
2950       }
2951     }
2952   }
2953   return 0;
2954 }
2955
2956 class MetaMasterTrimPollCR : public MetaTrimPollCR  {
2957   MasterTrimEnv env; //< trim state to share between calls
2958   RGWCoroutine* alloc_cr() override {
2959     return new MetaMasterTrimCR(env);
2960   }
2961  public:
2962   MetaMasterTrimPollCR(RGWRados *store, RGWHTTPManager *http,
2963                        int num_shards, utime_t interval)
2964     : MetaTrimPollCR(store, interval),
2965       env(store, http, num_shards)
2966   {}
2967 };
2968
2969 class MetaPeerTrimPollCR : public MetaTrimPollCR {
2970   PeerTrimEnv env; //< trim state to share between calls
2971   RGWCoroutine* alloc_cr() override {
2972     return new MetaPeerTrimCR(env);
2973   }
2974  public:
2975   MetaPeerTrimPollCR(RGWRados *store, RGWHTTPManager *http,
2976                      int num_shards, utime_t interval)
2977     : MetaTrimPollCR(store, interval),
2978       env(store, http, num_shards)
2979   {}
2980 };
2981
2982 RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http,
2983                                       int num_shards, utime_t interval)
2984 {
2985   if (store->is_meta_master()) {
2986     return new MetaMasterTrimPollCR(store, http, num_shards, interval);
2987   }
2988   return new MetaPeerTrimPollCR(store, http, num_shards, interval);
2989 }
2990
2991
2992 struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
2993   MetaMasterAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards)
2994     : MasterTrimEnv(store, http, num_shards),
2995       MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
2996   {}
2997 };
2998
2999 struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
3000   MetaPeerAdminTrimCR(RGWRados *store, RGWHTTPManager *http, int num_shards)
3001     : PeerTrimEnv(store, http, num_shards),
3002       MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
3003   {}
3004 };
3005
3006 RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store,
3007                                             RGWHTTPManager *http,
3008                                             int num_shards)
3009 {
3010   if (store->is_meta_master()) {
3011     return new MetaMasterAdminTrimCR(store, http, num_shards);
3012   }
3013   return new MetaPeerAdminTrimCR(store, http, num_shards);
3014 }