Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_data_sync.cc
1 #include <boost/utility/string_ref.hpp>
2
3 #include "common/ceph_json.h"
4 #include "common/RWLock.h"
5 #include "common/RefCountedObj.h"
6 #include "common/WorkQueue.h"
7 #include "common/Throttle.h"
8 #include "common/errno.h"
9
10 #include "rgw_common.h"
11 #include "rgw_rados.h"
12 #include "rgw_sync.h"
13 #include "rgw_data_sync.h"
14 #include "rgw_rest_conn.h"
15 #include "rgw_cr_rados.h"
16 #include "rgw_cr_rest.h"
17 #include "rgw_http_client.h"
18 #include "rgw_bucket.h"
19 #include "rgw_metadata.h"
20 #include "rgw_sync_module.h"
21
22 #include "cls/lock/cls_lock_client.h"
23
24 #include "auth/Crypto.h"
25
26 #include <boost/asio/yield.hpp>
27
28 #define dout_subsys ceph_subsys_rgw
29
30 #undef dout_prefix
31 #define dout_prefix (*_dout << "data sync: ")
32
33 static string datalog_sync_status_oid_prefix = "datalog.sync-status";
34 static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
35 static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
36 static string bucket_status_oid_prefix = "bucket.sync-status";
37
38 class RGWSyncDebugLogger {
39   CephContext *cct;
40   string prefix;
41
42   bool ended;
43
44 public:
45   RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
46                      const string& sync_type, const string& sync_stage,
47                      const string& resource, bool log_start = true) {
48     init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
49   }
50   RGWSyncDebugLogger() : cct(NULL), ended(false) {}
51   ~RGWSyncDebugLogger();
52
53   void init(CephContext *_cct, const string& source_zone,
54             const string& sync_type, const string& sync_stage,
55             const string& resource, bool log_start = true);
56   void log(const string& state);
57   void finish(int status);
58 };
59
60 void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
61                               const string& sync_type, const string& sync_section,
62                               const string& resource, bool log_start)
63 {
64   cct = _cct;
65   ended = false;
66   string zone_str = source_zone.substr(0, 8);
67   prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
68   if (log_start) {
69     log("start");
70   }
71 }
72
73 RGWSyncDebugLogger::~RGWSyncDebugLogger()
74 {
75   if (!ended) {
76     log("finish");
77   }
78 }
79
80 void RGWSyncDebugLogger::log(const string& state)
81 {
82   ldout(cct, 5) << prefix << ":" << state << dendl;
83 }
84
85 void RGWSyncDebugLogger::finish(int status)
86 {
87   ended = true;
88   ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
89 }
90
91 class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
92 public:
93   RGWDataSyncDebugLogger() {}
94   RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
95                          const string& resource, bool log_start = true) {
96     init(sync_env, sync_section, resource, log_start);
97   }
98   void init(RGWDataSyncEnv *sync_env, const string& sync_section,
99             const string& resource, bool log_start = true) {
100     RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
101   }
102
103 };
104
105 void rgw_datalog_info::decode_json(JSONObj *obj) {
106   JSONDecoder::decode_json("num_objects", num_shards, obj);
107 }
108
109 void rgw_datalog_entry::decode_json(JSONObj *obj) {
110   JSONDecoder::decode_json("key", key, obj);
111   utime_t ut;
112   JSONDecoder::decode_json("timestamp", ut, obj);
113   timestamp = ut.to_real_time();
114 }
115
116 void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
117   JSONDecoder::decode_json("marker", marker, obj);
118   JSONDecoder::decode_json("truncated", truncated, obj);
119   JSONDecoder::decode_json("entries", entries, obj);
120 };
121
122 class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
123   static constexpr int MAX_CONCURRENT_SHARDS = 16;
124
125   RGWDataSyncEnv *env;
126   const int num_shards;
127   int shard_id{0};;
128
129   map<uint32_t, rgw_data_sync_marker>& markers;
130
131  public:
132   RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
133                                  map<uint32_t, rgw_data_sync_marker>& markers)
134     : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
135       env(env), num_shards(num_shards), markers(markers)
136   {}
137   bool spawn_next() override;
138 };
139
140 bool RGWReadDataSyncStatusMarkersCR::spawn_next()
141 {
142   if (shard_id >= num_shards) {
143     return false;
144   }
145   using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
146   spawn(new CR(env->async_rados, env->store,
147                rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
148                &markers[shard_id]),
149         false);
150   shard_id++;
151   return true;
152 }
153
154 class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
155   RGWDataSyncEnv *sync_env;
156   rgw_data_sync_status *sync_status;
157
158 public:
159   RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
160                                  rgw_data_sync_status *_status)
161     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
162   {}
163   int operate() override;
164 };
165
166 int RGWReadDataSyncStatusCoroutine::operate()
167 {
168   reenter(this) {
169     // read sync info
170     using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
171     yield {
172       bool empty_on_enoent = false; // fail on ENOENT
173       call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
174                           rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
175                           &sync_status->sync_info, empty_on_enoent));
176     }
177     if (retcode < 0) {
178       ldout(sync_env->cct, 4) << "failed to read sync status info with "
179           << cpp_strerror(retcode) << dendl;
180       return set_cr_error(retcode);
181     }
182     // read shard markers
183     using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
184     yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
185                                  sync_status->sync_markers));
186     if (retcode < 0) {
187       ldout(sync_env->cct, 4) << "failed to read sync status markers with "
188           << cpp_strerror(retcode) << dendl;
189       return set_cr_error(retcode);
190     }
191     return set_cr_done();
192   }
193   return 0;
194 }
195
196 class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
197   RGWDataSyncEnv *sync_env;
198
199   RGWRESTReadResource *http_op;
200
201   int shard_id;
202   RGWDataChangesLogInfo *shard_info;
203
204 public:
205   RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
206                                                       int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sync_env->cct),
207                                                       sync_env(_sync_env),
208                                                       http_op(NULL),
209                                                       shard_id(_shard_id),
210                                                       shard_info(_shard_info) {
211   }
212
213   ~RGWReadRemoteDataLogShardInfoCR() override {
214     if (http_op) {
215       http_op->put();
216     }
217   }
218
219   int operate() override {
220     reenter(this) {
221       yield {
222         char buf[16];
223         snprintf(buf, sizeof(buf), "%d", shard_id);
224         rgw_http_param_pair pairs[] = { { "type" , "data" },
225                                         { "id", buf },
226                                         { "info" , NULL },
227                                         { NULL, NULL } };
228
229         string p = "/admin/log/";
230
231         http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
232
233         http_op->set_user_info((void *)stack);
234
235         int ret = http_op->aio_read();
236         if (ret < 0) {
237           ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
238           log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
239           return set_cr_error(ret);
240         }
241
242         return io_block(0);
243       }
244       yield {
245         int ret = http_op->wait(shard_info);
246         if (ret < 0) {
247           return set_cr_error(ret);
248         }
249         return set_cr_done();
250       }
251     }
252     return 0;
253   }
254 };
255
256 struct read_remote_data_log_response {
257   string marker;
258   bool truncated;
259   list<rgw_data_change_log_entry> entries;
260
261   read_remote_data_log_response() : truncated(false) {}
262
263   void decode_json(JSONObj *obj) {
264     JSONDecoder::decode_json("marker", marker, obj);
265     JSONDecoder::decode_json("truncated", truncated, obj);
266     JSONDecoder::decode_json("entries", entries, obj);
267   };
268 };
269
270 class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
271   RGWDataSyncEnv *sync_env;
272
273   RGWRESTReadResource *http_op;
274
275   int shard_id;
276   string *pmarker;
277   list<rgw_data_change_log_entry> *entries;
278   bool *truncated;
279
280   read_remote_data_log_response response;
281
282 public:
283   RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
284                               int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct),
285                                                       sync_env(_sync_env),
286                                                       http_op(NULL),
287                                                       shard_id(_shard_id),
288                                                       pmarker(_pmarker),
289                                                       entries(_entries),
290                                                       truncated(_truncated) {
291   }
292   ~RGWReadRemoteDataLogShardCR() override {
293     if (http_op) {
294       http_op->put();
295     }
296   }
297
298   int operate() override {
299     reenter(this) {
300       yield {
301         char buf[16];
302         snprintf(buf, sizeof(buf), "%d", shard_id);
303         rgw_http_param_pair pairs[] = { { "type" , "data" },
304                                         { "id", buf },
305                                         { "marker", pmarker->c_str() },
306                                         { "extra-info", "true" },
307                                         { NULL, NULL } };
308
309         string p = "/admin/log/";
310
311         http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
312
313         http_op->set_user_info((void *)stack);
314
315         int ret = http_op->aio_read();
316         if (ret < 0) {
317           ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
318           log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
319           return set_cr_error(ret);
320         }
321
322         return io_block(0);
323       }
324       yield {
325         int ret = http_op->wait(&response);
326         if (ret < 0) {
327           return set_cr_error(ret);
328         }
329         entries->clear();
330         entries->swap(response.entries);
331         *pmarker = response.marker;
332         *truncated = response.truncated;
333         return set_cr_done();
334       }
335     }
336     return 0;
337   }
338 };
339
340 class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
341   RGWDataSyncEnv *sync_env;
342
343   int num_shards;
344   map<int, RGWDataChangesLogInfo> *datalog_info;
345
346   int shard_id;
347 #define READ_DATALOG_MAX_CONCURRENT 10
348
349 public:
350   RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
351                      int _num_shards,
352                      map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
353                                                                  sync_env(_sync_env), num_shards(_num_shards),
354                                                                  datalog_info(_datalog_info), shard_id(0) {}
355   bool spawn_next() override;
356 };
357
358 bool RGWReadRemoteDataLogInfoCR::spawn_next() {
359   if (shard_id >= num_shards) {
360     return false;
361   }
362   spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
363   shard_id++;
364   return true;
365 }
366
367 class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
368   RGWDataSyncEnv *sync_env;
369   RGWRESTReadResource *http_op;
370
371   int shard_id;
372   string marker;
373   uint32_t max_entries;
374   rgw_datalog_shard_data *result;
375
376 public:
377   RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
378                               const string& _marker, uint32_t _max_entries,
379                               rgw_datalog_shard_data *_result)
380     : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
381       shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
382
383   int send_request() override {
384     RGWRESTConn *conn = sync_env->conn;
385     RGWRados *store = sync_env->store;
386
387     char buf[32];
388     snprintf(buf, sizeof(buf), "%d", shard_id);
389
390     char max_entries_buf[32];
391     snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
392
393     const char *marker_key = (marker.empty() ? "" : "marker");
394
395     rgw_http_param_pair pairs[] = { { "type", "data" },
396       { "id", buf },
397       { "max-entries", max_entries_buf },
398       { marker_key, marker.c_str() },
399       { NULL, NULL } };
400
401     string p = "/admin/log/";
402
403     http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
404     http_op->set_user_info((void *)stack);
405
406     int ret = http_op->aio_read();
407     if (ret < 0) {
408       ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
409       log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
410       http_op->put();
411       return ret;
412     }
413
414     return 0;
415   }
416
417   int request_complete() override {
418     int ret = http_op->wait(result);
419     http_op->put();
420     if (ret < 0 && ret != -ENOENT) {
421       ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
422       return ret;
423     }
424     return 0;
425   }
426 };
427
428 class RGWListRemoteDataLogCR : public RGWShardCollectCR {
429   RGWDataSyncEnv *sync_env;
430
431   map<int, string> shards;
432   int max_entries_per_shard;
433   map<int, rgw_datalog_shard_data> *result;
434
435   map<int, string>::iterator iter;
436 #define READ_DATALOG_MAX_CONCURRENT 10
437
438 public:
439   RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
440                      map<int, string>& _shards,
441                      int _max_entries_per_shard,
442                      map<int, rgw_datalog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
443                                                                  sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
444                                                                  result(_result) {
445     shards.swap(_shards);
446     iter = shards.begin();
447   }
448   bool spawn_next() override;
449 };
450
451 bool RGWListRemoteDataLogCR::spawn_next() {
452   if (iter == shards.end()) {
453     return false;
454   }
455
456   spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
457   ++iter;
458   return true;
459 }
460
461 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
462   static constexpr uint32_t lock_duration = 30;
463   RGWDataSyncEnv *sync_env;
464   RGWRados *store;
465   const rgw_pool& pool;
466   const uint32_t num_shards;
467
468   string sync_status_oid;
469
470   string lock_name;
471   string cookie;
472   rgw_data_sync_status *status;
473   map<int, RGWDataChangesLogInfo> shards_info;
474 public:
475   RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
476                                  uint64_t instance_id,
477                                  rgw_data_sync_status *status)
478     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
479       pool(store->get_zone_params().log_pool),
480       num_shards(num_shards), status(status) {
481     lock_name = "sync_lock";
482
483     status->sync_info.instance_id = instance_id;
484
485 #define COOKIE_LEN 16
486     char buf[COOKIE_LEN + 1];
487
488     gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
489     cookie = buf;
490
491     sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
492   }
493
494   int operate() override {
495     int ret;
496     reenter(this) {
497       using LockCR = RGWSimpleRadosLockCR;
498       yield call(new LockCR(sync_env->async_rados, store,
499                             rgw_raw_obj{pool, sync_status_oid},
500                             lock_name, cookie, lock_duration));
501       if (retcode < 0) {
502         ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
503         return set_cr_error(retcode);
504       }
505       using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
506       yield call(new WriteInfoCR(sync_env->async_rados, store,
507                                  rgw_raw_obj{pool, sync_status_oid},
508                                  status->sync_info));
509       if (retcode < 0) {
510         ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
511         return set_cr_error(retcode);
512       }
513
514       /* take lock again, we just recreated the object */
515       yield call(new LockCR(sync_env->async_rados, store,
516                             rgw_raw_obj{pool, sync_status_oid},
517                             lock_name, cookie, lock_duration));
518       if (retcode < 0) {
519         ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
520         return set_cr_error(retcode);
521       }
522
523       /* fetch current position in logs */
524       yield {
525         RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
526         if (!conn) {
527           ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
528           return set_cr_error(-EIO);
529         }
530         for (uint32_t i = 0; i < num_shards; i++) {
531           spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
532         }
533       }
534       while (collect(&ret, NULL)) {
535         if (ret < 0) {
536           ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
537           return set_state(RGWCoroutine_Error);
538         }
539         yield;
540       }
541       yield {
542         for (uint32_t i = 0; i < num_shards; i++) {
543           RGWDataChangesLogInfo& info = shards_info[i];
544           auto& marker = status->sync_markers[i];
545           marker.next_step_marker = info.marker;
546           marker.timestamp = info.last_update;
547           const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
548           using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
549           spawn(new WriteMarkerCR(sync_env->async_rados, store,
550                                   rgw_raw_obj{pool, oid}, marker), true);
551         }
552       }
553       while (collect(&ret, NULL)) {
554         if (ret < 0) {
555           ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
556           return set_state(RGWCoroutine_Error);
557         }
558         yield;
559       }
560
561       status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
562       yield call(new WriteInfoCR(sync_env->async_rados, store,
563                                  rgw_raw_obj{pool, sync_status_oid},
564                                  status->sync_info));
565       if (retcode < 0) {
566         ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
567         return set_cr_error(retcode);
568       }
569       yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
570                                             rgw_raw_obj{pool, sync_status_oid},
571                                             lock_name, cookie));
572       return set_cr_done();
573     }
574     return 0;
575   }
576 };
577
578 int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
579 {
580   rgw_http_param_pair pairs[] = { { "type", "data" },
581                                   { NULL, NULL } };
582
583   int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
584   if (ret < 0) {
585     ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
586     return ret;
587   }
588
589   ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
590
591   return 0;
592 }
593
594 int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
595 {
596   rgw_datalog_info log_info;
597   int ret = read_log_info(&log_info);
598   if (ret < 0) {
599     return ret;
600   }
601
602   return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
603 }
604
605 int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
606 {
607   if (store->is_meta_master()) {
608     return 0;
609   }
610
611   return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
612 }
613
614 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
615 {
616   sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);
617
618   if (initialized) {
619     return 0;
620   }
621
622   int ret = http_manager.set_threaded();
623   if (ret < 0) {
624     ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
625     return ret;
626   }
627
628   initialized = true;
629
630   return 0;
631 }
632
633 void RGWRemoteDataLog::finish()
634 {
635   stop();
636 }
637
638 int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
639 {
640   // cannot run concurrently with run_sync(), so run in a separate manager
641   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
642   RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
643   int ret = http_manager.set_threaded();
644   if (ret < 0) {
645     ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
646     return ret;
647   }
648   RGWDataSyncEnv sync_env_local = sync_env;
649   sync_env_local.http_manager = &http_manager;
650   ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
651   http_manager.stop();
652   return ret;
653 }
654
655 int RGWRemoteDataLog::init_sync_status(int num_shards)
656 {
657   rgw_data_sync_status sync_status;
658   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
659   RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
660   int ret = http_manager.set_threaded();
661   if (ret < 0) {
662     ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
663     return ret;
664   }
665   RGWDataSyncEnv sync_env_local = sync_env;
666   sync_env_local.http_manager = &http_manager;
667   uint64_t instance_id;
668   get_random_bytes((char *)&instance_id, sizeof(instance_id));
669   ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
670   http_manager.stop();
671   return ret;
672 }
673
674 static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
675 {
676   char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
677   snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
678   return string(buf);
679 }
680
681 struct bucket_instance_meta_info {
682   string key;
683   obj_version ver;
684   utime_t mtime;
685   RGWBucketInstanceMetadataObject data;
686
687   bucket_instance_meta_info() {}
688
689   void decode_json(JSONObj *obj) {
690     JSONDecoder::decode_json("key", key, obj);
691     JSONDecoder::decode_json("ver", ver, obj);
692     JSONDecoder::decode_json("mtime", mtime, obj);
693     JSONDecoder::decode_json("data", data, obj);
694   }
695 };
696
697 class RGWListBucketIndexesCR : public RGWCoroutine {
698   RGWDataSyncEnv *sync_env;
699
700   RGWRados *store;
701
702   rgw_data_sync_status *sync_status;
703   int num_shards;
704
705   int req_ret;
706   int ret;
707
708   list<string> result;
709   list<string>::iterator iter;
710
711   RGWShardedOmapCRManager *entries_index;
712
713   string oid_prefix;
714
715   string path;
716   bucket_instance_meta_info meta_info;
717   string key;
718   string s;
719   int i;
720
721   bool failed;
722
723 public:
724   RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
725                          rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
726                                                       store(sync_env->store), sync_status(_sync_status),
727                                                       req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
728     oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone; 
729     path = "/admin/metadata/bucket.instance";
730     num_shards = sync_status->sync_info.num_shards;
731   }
732   ~RGWListBucketIndexesCR() override {
733     delete entries_index;
734   }
735
736   int operate() override {
737     reenter(this) {
738       yield {
739         string entrypoint = string("/admin/metadata/bucket.instance");
740         /* FIXME: need a better scaling solution here, requires streaming output */
741         call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
742                                                       entrypoint, NULL, &result));
743       }
744       if (retcode < 0) {
745         ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
746         return set_cr_error(retcode);
747       }
748       entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
749                                                   store->get_zone_params().log_pool,
750                                                   oid_prefix);
751       yield; // yield so OmapAppendCRs can start
752       for (iter = result.begin(); iter != result.end(); ++iter) {
753         ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
754
755         key = *iter;
756
757         yield {
758           rgw_http_param_pair pairs[] = { { "key", key.c_str() },
759                                           { NULL, NULL } };
760
761           call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
762         }
763
764         num_shards = meta_info.data.get_bucket_info().num_shards;
765         if (num_shards > 0) {
766           for (i = 0; i < num_shards; i++) {
767             char buf[16];
768             snprintf(buf, sizeof(buf), ":%d", i);
769             s = key + buf;
770             yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
771           }
772         } else {
773           yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
774         }
775       }
776       yield {
777         if (!entries_index->finish()) {
778           failed = true;
779         }
780       }
781       if (!failed) {
782         for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
783           int shard_id = (int)iter->first;
784           rgw_data_sync_marker& marker = iter->second;
785           marker.total_entries = entries_index->get_total_entries(shard_id);
786           spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
787                                                                 rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
788                                                                 marker), true);
789         }
790       } else {
791           yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
792                                                           EIO, string("failed to build bucket instances map")));
793       }
794       while (collect(&ret, NULL)) {
795         if (ret < 0) {
796           yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
797                                                           -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
798           req_ret = ret;
799         }
800         yield;
801       }
802       drain_all();
803       if (req_ret < 0) {
804         yield return set_cr_error(req_ret);
805       }
806       yield return set_cr_done();
807     }
808     return 0;
809   }
810 };
811
812 #define DATA_SYNC_UPDATE_MARKER_WINDOW 1
813
814 class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
815   RGWDataSyncEnv *sync_env;
816
817   string marker_oid;
818   rgw_data_sync_marker sync_marker;
819
820   map<string, string> key_to_marker;
821   map<string, string> marker_to_key;
822
823   void handle_finish(const string& marker) override {
824     map<string, string>::iterator iter = marker_to_key.find(marker);
825     if (iter == marker_to_key.end()) {
826       return;
827     }
828     key_to_marker.erase(iter->second);
829     reset_need_retry(iter->second);
830     marker_to_key.erase(iter);
831   }
832
833 public:
834   RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
835                          const string& _marker_oid,
836                          const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
837                                                                 sync_env(_sync_env),
838                                                                 marker_oid(_marker_oid),
839                                                                 sync_marker(_marker) {}
840
841   RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
842     sync_marker.marker = new_marker;
843     sync_marker.pos = index_pos;
844
845     ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
846     RGWRados *store = sync_env->store;
847
848     return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
849                                                            rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
850                                                            sync_marker);
851   }
852
853   /*
854    * create index from key -> marker, and from marker -> key
855    * this is useful so that we can insure that we only have one
856    * entry for any key that is used. This is needed when doing
857    * incremenatl sync of data, and we don't want to run multiple
858    * concurrent sync operations for the same bucket shard 
859    */
860   bool index_key_to_marker(const string& key, const string& marker) {
861     if (key_to_marker.find(key) != key_to_marker.end()) {
862       set_need_retry(key);
863       return false;
864     }
865     key_to_marker[key] = marker;
866     marker_to_key[marker] = key;
867     return true;
868   }
869 };
870
871 // ostream wrappers to print buckets without copying strings
872 struct bucket_str {
873   const rgw_bucket& b;
874   bucket_str(const rgw_bucket& b) : b(b) {}
875 };
876 std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
877   auto& b = rhs.b;
878   if (!b.tenant.empty()) {
879     out << b.tenant << '/';
880   }
881   out << b.name;
882   if (!b.bucket_id.empty()) {
883     out << ':' << b.bucket_id;
884   }
885   return out;
886 }
887
888 struct bucket_shard_str {
889   const rgw_bucket_shard& bs;
890   bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
891 };
892 std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
893   auto& bs = rhs.bs;
894   out << bucket_str{bs.bucket};
895   if (bs.shard_id >= 0) {
896     out << ':' << bs.shard_id;
897   }
898   return out;
899 }
900
901 class RGWRunBucketSyncCoroutine : public RGWCoroutine {
902   RGWDataSyncEnv *sync_env;
903   rgw_bucket_shard bs;
904   RGWBucketInfo bucket_info;
905   rgw_bucket_shard_sync_info sync_status;
906   RGWMetaSyncEnv meta_sync_env;
907
908   RGWDataSyncDebugLogger logger;
909   const std::string status_oid;
910
911   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
912   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
913
914 public:
915   RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
916     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
917       status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
918     logger.init(sync_env, "Bucket", bs.get_key());
919   }
920   ~RGWRunBucketSyncCoroutine() override {
921     if (lease_cr) {
922       lease_cr->abort();
923     }
924   }
925
926   int operate() override;
927 };
928
929 class RGWDataSyncSingleEntryCR : public RGWCoroutine {
930   RGWDataSyncEnv *sync_env;
931
932   string raw_key;
933   string entry_marker;
934
935   rgw_bucket_shard bs;
936
937   int sync_status;
938
939   bufferlist md_bl;
940
941   RGWDataSyncShardMarkerTrack *marker_tracker;
942
943   boost::intrusive_ptr<RGWOmapAppend> error_repo;
944   bool remove_from_repo;
945
946   set<string> keys;
947
948 public:
949   RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
950                            const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
951                            RGWOmapAppend *_error_repo, bool _remove_from_repo) : RGWCoroutine(_sync_env->cct),
952                                                       sync_env(_sync_env),
953                                                       raw_key(_raw_key), entry_marker(_entry_marker),
954                                                       sync_status(0),
955                                                       marker_tracker(_marker_tracker),
956                                                       error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
957     set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
958   }
959
960   int operate() override {
961     reenter(this) {
962       do {
963         yield {
964           int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
965                                                 &bs.bucket, &bs.shard_id);
966           if (ret < 0) {
967             return set_cr_error(-EIO);
968           }
969           if (marker_tracker) {
970             marker_tracker->reset_need_retry(raw_key);
971           }
972           call(new RGWRunBucketSyncCoroutine(sync_env, bs));
973         }
974       } while (marker_tracker && marker_tracker->need_retry(raw_key));
975
976       sync_status = retcode;
977
978       if (sync_status == -ENOENT) {
979         // this was added when 'tenant/' was added to datalog entries, because
980         // preexisting tenant buckets could never sync and would stay in the
981         // error_repo forever
982         ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
983             "for missing bucket " << raw_key << dendl;
984         sync_status = 0;
985       }
986
987       if (sync_status < 0) {
988         yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
989                                                         -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
990         if (retcode < 0) {
991           ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
992         }
993         if (error_repo && !error_repo->append(raw_key)) {
994           ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
995         }
996       } else if (error_repo && remove_from_repo) {
997         keys = {raw_key};
998         yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
999         if (retcode < 0) {
1000           ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
1001              << error_repo->get_obj() << " retcode=" << retcode << dendl;
1002         }
1003       }
1004       /* FIXME: what do do in case of error */
1005       if (marker_tracker && !entry_marker.empty()) {
1006         /* update marker */
1007         yield call(marker_tracker->finish(entry_marker));
1008       }
1009       if (sync_status == 0) {
1010         sync_status = retcode;
1011       }
1012       if (sync_status < 0) {
1013         return set_cr_error(sync_status);
1014       }
1015       return set_cr_done();
1016     }
1017     return 0;
1018   }
1019 };
1020
1021 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
1022 #define DATA_SYNC_MAX_ERR_ENTRIES 10
1023
1024 enum RemoteDatalogStatus {
1025   RemoteNotTrimmed = 0,
1026   RemoteTrimmed = 1,
1027   RemoteMightTrimmed = 2
1028 };
1029
1030 class RGWDataSyncShardCR : public RGWCoroutine {
1031   RGWDataSyncEnv *sync_env;
1032
1033   rgw_pool pool;
1034
1035   uint32_t shard_id;
1036   rgw_data_sync_marker sync_marker;
1037
1038   map<string, bufferlist> entries;
1039   map<string, bufferlist>::iterator iter;
1040
1041   string oid;
1042
1043   RGWDataSyncShardMarkerTrack *marker_tracker;
1044
1045   list<rgw_data_change_log_entry> log_entries;
1046   list<rgw_data_change_log_entry>::iterator log_iter;
1047   bool truncated;
1048
1049   RGWDataChangesLogInfo shard_info;
1050   string datalog_marker;
1051
1052   RemoteDatalogStatus remote_trimmed;
1053   Mutex inc_lock;
1054   Cond inc_cond;
1055
1056   boost::asio::coroutine incremental_cr;
1057   boost::asio::coroutine full_cr;
1058
1059
1060   set<string> modified_shards;
1061   set<string> current_modified;
1062
1063   set<string>::iterator modified_iter;
1064
1065   int total_entries;
1066
1067   int spawn_window;
1068
1069   bool *reset_backoff;
1070
1071   set<string> spawned_keys;
1072
1073   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
1074   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
1075   string status_oid;
1076
1077
1078   string error_oid;
1079   RGWOmapAppend *error_repo;
1080   map<string, bufferlist> error_entries;
1081   string error_marker;
1082   int max_error_entries;
1083
1084   ceph::real_time error_retry_time;
1085
1086 #define RETRY_BACKOFF_SECS_MIN 60
1087 #define RETRY_BACKOFF_SECS_DEFAULT 60
1088 #define RETRY_BACKOFF_SECS_MAX 600
1089   uint32_t retry_backoff_secs;
1090
1091   RGWDataSyncDebugLogger logger;
1092 public:
1093   RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
1094                      rgw_pool& _pool,
1095                      uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
1096                                                       sync_env(_sync_env),
1097                                                       pool(_pool),
1098                                                       shard_id(_shard_id),
1099                                                       sync_marker(_marker),
1100                                                       marker_tracker(NULL), truncated(false), remote_trimmed(RemoteNotTrimmed), inc_lock("RGWDataSyncShardCR::inc_lock"),
1101                                                       total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
1102                                                       lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
1103                                                       retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
1104     set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
1105     status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
1106     error_oid = status_oid + ".retry";
1107
1108     logger.init(sync_env, "DataShard", status_oid);
1109   }
1110
1111   ~RGWDataSyncShardCR() override {
1112     delete marker_tracker;
1113     if (lease_cr) {
1114       lease_cr->abort();
1115     }
1116     if (error_repo) {
1117       error_repo->put();
1118     }
1119   }
1120
1121   void append_modified_shards(set<string>& keys) {
1122     Mutex::Locker l(inc_lock);
1123     modified_shards.insert(keys.begin(), keys.end());
1124   }
1125
1126   void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
1127     delete marker_tracker;
1128     marker_tracker = mt;
1129   }
1130
1131   int operate() override {
1132     int r;
1133     while (true) {
1134       switch (sync_marker.state) {
1135       case rgw_data_sync_marker::FullSync:
1136         r = full_sync();
1137         if (r < 0) {
1138           ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
1139           return set_cr_error(r);
1140         }
1141         return 0;
1142       case rgw_data_sync_marker::IncrementalSync:
1143         r  = incremental_sync();
1144         if (r < 0) {
1145           ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
1146           return set_cr_error(r);
1147         }
1148         return 0;
1149       default:
1150         return set_cr_error(-EIO);
1151       }
1152     }
1153     return 0;
1154   }
1155
1156   void init_lease_cr() {
1157     set_status("acquiring sync lock");
1158     uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1159     string lock_name = "sync_lock";
1160     if (lease_cr) {
1161       lease_cr->abort();
1162     }
1163     RGWRados *store = sync_env->store;
1164     lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
1165                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
1166                                             lock_name, lock_duration, this));
1167     lease_stack.reset(spawn(lease_cr.get(), false));
1168   }
1169
1170   int full_sync() {
1171 #define OMAP_GET_MAX_ENTRIES 100
1172     int max_entries = OMAP_GET_MAX_ENTRIES;
1173     reenter(&full_cr) {
1174       yield init_lease_cr();
1175       while (!lease_cr->is_locked()) {
1176         if (lease_cr->is_done()) {
1177           ldout(cct, 5) << "lease cr failed, done early " << dendl;
1178           set_status("lease lock failed, early abort");
1179           return set_cr_error(lease_cr->get_ret_status());
1180         }
1181         set_sleeping(true);
1182         yield;
1183       }
1184       logger.log("full sync");
1185       oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
1186       set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
1187       total_entries = sync_marker.pos;
1188       do {
1189         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
1190         if (retcode < 0) {
1191           ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
1192           lease_cr->go_down();
1193           drain_all();
1194           return set_cr_error(retcode);
1195         }
1196         iter = entries.begin();
1197         for (; iter != entries.end(); ++iter) {
1198           ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
1199           total_entries++;
1200           if (!marker_tracker->start(iter->first, total_entries, real_time())) {
1201             ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
1202           } else {
1203             // fetch remote and write locally
1204             yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
1205             if (retcode < 0) {
1206               lease_cr->go_down();
1207               drain_all();
1208               return set_cr_error(retcode);
1209             }
1210           }
1211           sync_marker.marker = iter->first;
1212         }
1213       } while ((int)entries.size() == max_entries);
1214
1215       lease_cr->go_down();
1216       drain_all();
1217
1218       yield {
1219         /* update marker to reflect we're done with full sync */
1220         sync_marker.state = rgw_data_sync_marker::IncrementalSync;
1221         sync_marker.marker = sync_marker.next_step_marker;
1222         sync_marker.next_step_marker.clear();
1223         RGWRados *store = sync_env->store;
1224         call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
1225                                                              rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
1226                                                              sync_marker));
1227       }
1228       if (retcode < 0) {
1229         ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
1230         lease_cr->go_down();
1231         return set_cr_error(retcode);
1232       }
1233     }
1234     return 0;
1235   }
1236
1237   int incremental_sync() {
1238     reenter(&incremental_cr) {
1239       yield init_lease_cr();
1240       while (!lease_cr->is_locked()) {
1241         if (lease_cr->is_done()) {
1242           ldout(cct, 5) << "lease cr failed, done early " << dendl;
1243           set_status("lease lock failed, early abort");
1244           return set_cr_error(lease_cr->get_ret_status());
1245         }
1246         set_sleeping(true);
1247         yield;
1248       }
1249       set_status("lease acquired");
1250       error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
1251                                      rgw_raw_obj(pool, error_oid),
1252                                      1 /* no buffer */);
1253       error_repo->get();
1254       spawn(error_repo, false);
1255       logger.log("inc sync");
1256       set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
1257       do {
1258         current_modified.clear();
1259         inc_lock.Lock();
1260         current_modified.swap(modified_shards);
1261         inc_lock.Unlock();
1262
1263         /* process out of band updates */
1264         for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
1265           yield {
1266             ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
1267             spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
1268           }
1269         }
1270
1271         /* process bucket shards that previously failed */
1272         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
1273                                              error_marker, &error_entries,
1274                                              max_error_entries));
1275         ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
1276         iter = error_entries.begin();
1277         for (; iter != error_entries.end(); ++iter) {
1278           ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
1279           spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
1280           error_marker = iter->first;
1281         }
1282         if ((int)error_entries.size() != max_error_entries) {
1283           if (error_marker.empty() && error_entries.empty()) {
1284             /* the retry repo is empty, we back off a bit before calling it again */
1285             retry_backoff_secs *= 2;
1286             if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
1287               retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
1288             }
1289           } else {
1290             retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
1291           }
1292           error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
1293           error_marker.clear();
1294         }
1295
1296
1297         yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
1298         if (retcode < 0) {
1299           ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
1300           stop_spawned_services();
1301           drain_all();
1302           return set_cr_error(retcode);
1303         }
1304         datalog_marker = shard_info.marker;
1305         remote_trimmed = RemoteNotTrimmed;
1306 #define INCREMENTAL_MAX_ENTRIES 100
1307         ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
1308         if (datalog_marker > sync_marker.marker) {
1309           spawned_keys.clear();
1310           if (sync_marker.marker.empty())
1311             remote_trimmed = RemoteMightTrimmed; //remote data log shard might be trimmed;
1312           yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
1313           if (retcode < 0) {
1314             ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
1315             stop_spawned_services();
1316             drain_all();
1317             return set_cr_error(retcode);
1318           }
1319           if ((remote_trimmed == RemoteMightTrimmed) && sync_marker.marker.empty() && log_entries.empty())
1320             remote_trimmed = RemoteTrimmed;
1321           else
1322             remote_trimmed = RemoteNotTrimmed;
1323           for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
1324             ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
1325             if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
1326               ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
1327               marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
1328               continue;
1329             }
1330             if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
1331               ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
1332             } else {
1333               /*
1334                * don't spawn the same key more than once. We can do that as long as we don't yield
1335                */
1336               if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
1337                 spawned_keys.insert(log_iter->entry.key);
1338                 spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
1339                 if (retcode < 0) {
1340                   stop_spawned_services();
1341                   drain_all();
1342                   return set_cr_error(retcode);
1343                 }
1344               }
1345             }
1346           }
1347           while ((int)num_spawned() > spawn_window) {
1348             set_status() << "num_spawned() > spawn_window";
1349             yield wait_for_child();
1350             int ret;
1351             while (collect(&ret, lease_stack.get())) {
1352               if (ret < 0) {
1353                 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
1354                 /* we have reported this error */
1355               }
1356               /* not waiting for child here */
1357             }
1358           }
1359         }
1360         ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
1361         if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) {
1362 #define INCREMENTAL_INTERVAL 20
1363           yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
1364         }
1365       } while (true);
1366     }
1367     return 0;
1368   }
1369   void stop_spawned_services() {
1370     lease_cr->go_down();
1371     if (error_repo) {
1372       error_repo->finish();
1373       error_repo->put();
1374       error_repo = NULL;
1375     }
1376   }
1377 };
1378
1379 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
1380   RGWDataSyncEnv *sync_env;
1381
1382   rgw_pool pool;
1383
1384   uint32_t shard_id;
1385   rgw_data_sync_marker sync_marker;
1386
1387 public:
1388   RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
1389                      uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_sync_env->cct, false),
1390                                                       sync_env(_sync_env),
1391                                                       pool(_pool),
1392                                                       shard_id(_shard_id),
1393                                                       sync_marker(_marker) {
1394   }
1395
1396   RGWCoroutine *alloc_cr() override {
1397     return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
1398   }
1399
1400   RGWCoroutine *alloc_finisher_cr() override {
1401     RGWRados *store = sync_env->store;
1402     return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
1403                                                           rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
1404                                                           &sync_marker);
1405   }
1406
1407   void append_modified_shards(set<string>& keys) {
1408     Mutex::Locker l(cr_lock());
1409
1410     RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
1411     if (!cr) {
1412       return;
1413     }
1414
1415     cr->append_modified_shards(keys);
1416   }
1417 };
1418
1419 class RGWDataSyncCR : public RGWCoroutine {
1420   RGWDataSyncEnv *sync_env;
1421   uint32_t num_shards;
1422
1423   rgw_data_sync_status sync_status;
1424
1425   RGWDataSyncShardMarkerTrack *marker_tracker;
1426
1427   Mutex shard_crs_lock;
1428   map<int, RGWDataSyncShardControlCR *> shard_crs;
1429
1430   bool *reset_backoff;
1431
1432   RGWDataSyncDebugLogger logger;
1433
1434   RGWDataSyncModule *data_sync_module{nullptr};
1435 public:
1436   RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
1437                                                       sync_env(_sync_env),
1438                                                       num_shards(_num_shards),
1439                                                       marker_tracker(NULL),
1440                                                       shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
1441                                                       reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
1442
1443   }
1444
1445   ~RGWDataSyncCR() override {
1446     for (auto iter : shard_crs) {
1447       iter.second->put();
1448     }
1449   }
1450
1451   int operate() override {
1452     reenter(this) {
1453
1454       /* read sync status */
1455       yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));
1456
1457       data_sync_module = sync_env->sync_module->get_data_handler();
1458
1459       if (retcode < 0 && retcode != -ENOENT) {
1460         ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
1461         return set_cr_error(retcode);
1462       }
1463
1464       /* state: init status */
1465       if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
1466         ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
1467         sync_status.sync_info.num_shards = num_shards;
1468         uint64_t instance_id;
1469         get_random_bytes((char *)&instance_id, sizeof(instance_id));
1470         yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
1471         if (retcode < 0) {
1472           ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
1473           return set_cr_error(retcode);
1474         }
1475         // sets state = StateBuildingFullSyncMaps
1476
1477         *reset_backoff = true;
1478       }
1479
1480       data_sync_module->init(sync_env, sync_status.sync_info.instance_id);
1481
1482       if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
1483         /* call sync module init here */
1484         yield call(data_sync_module->init_sync(sync_env));
1485         if (retcode < 0) {
1486           ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
1487           return set_cr_error(retcode);
1488         }
1489         /* state: building full sync maps */
1490         ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
1491         yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
1492         if (retcode < 0) {
1493           ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
1494           return set_cr_error(retcode);
1495         }
1496         sync_status.sync_info.state = rgw_data_sync_info::StateSync;
1497
1498         /* update new state */
1499         yield call(set_sync_info_cr());
1500         if (retcode < 0) {
1501           ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
1502           return set_cr_error(retcode);
1503         }
1504
1505         *reset_backoff = true;
1506       }
1507
1508       yield {
1509         if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
1510           for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
1511                iter != sync_status.sync_markers.end(); ++iter) {
1512             RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
1513                                                                           iter->first, iter->second);
1514             cr->get();
1515             shard_crs_lock.Lock();
1516             shard_crs[iter->first] = cr;
1517             shard_crs_lock.Unlock();
1518             spawn(cr, true);
1519           }
1520         }
1521       }
1522
1523       return set_cr_done();
1524     }
1525     return 0;
1526   }
1527
1528   RGWCoroutine *set_sync_info_cr() {
1529     RGWRados *store = sync_env->store;
1530     return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
1531                                                          rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
1532                                                          sync_status.sync_info);
1533   }
1534
1535   void wakeup(int shard_id, set<string>& keys) {
1536     Mutex::Locker l(shard_crs_lock);
1537     map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
1538     if (iter == shard_crs.end()) {
1539       return;
1540     }
1541     iter->second->append_modified_shards(keys);
1542     iter->second->wakeup();
1543   }
1544 };
1545
1546 class RGWDefaultDataSyncModule : public RGWDataSyncModule {
1547 public:
1548   RGWDefaultDataSyncModule() {}
1549
1550   RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1551   RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1552   RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1553                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1554 };
1555
1556 class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
1557   RGWDefaultDataSyncModule data_handler;
1558 public:
1559   RGWDefaultSyncModuleInstance() {}
1560   RGWDataSyncModule *get_data_handler() override {
1561     return &data_handler;
1562   }
1563 };
1564
1565 int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
1566 {
1567   instance->reset(new RGWDefaultSyncModuleInstance());
1568   return 0;
1569 }
1570
1571 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1572 {
1573   return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
1574                                  key, versioned_epoch,
1575                                  true, zones_trace);
1576 }
1577
1578 RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
1579                                                       real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1580 {
1581   return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1582                             bucket_info, key, versioned, versioned_epoch,
1583                             NULL, NULL, false, &mtime, zones_trace);
1584 }
1585
1586 RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1587                                                              rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1588 {
1589   return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1590                             bucket_info, key, versioned, versioned_epoch,
1591                             &owner.id, &owner.display_name, true, &mtime, zones_trace);
1592 }
1593
1594 class RGWDataSyncControlCR : public RGWBackoffControlCR
1595 {
1596   RGWDataSyncEnv *sync_env;
1597   uint32_t num_shards;
1598
1599   static constexpr bool exit_on_error = false; // retry on all errors
1600 public:
1601   RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
1602                                                       sync_env(_sync_env), num_shards(_num_shards) {
1603   }
1604
1605   RGWCoroutine *alloc_cr() override {
1606     return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
1607   }
1608
1609   void wakeup(int shard_id, set<string>& keys) {
1610     Mutex& m = cr_lock();
1611
1612     m.Lock();
1613     RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
1614     if (!cr) {
1615       m.Unlock();
1616       return;
1617     }
1618
1619     cr->get();
1620     m.Unlock();
1621
1622     if (cr) {
1623       cr->wakeup(shard_id, keys);
1624     }
1625
1626     cr->put();
1627   }
1628 };
1629
1630 void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
1631   RWLock::RLocker rl(lock);
1632   if (!data_sync_cr) {
1633     return;
1634   }
1635   data_sync_cr->wakeup(shard_id, keys);
1636 }
1637
1638 int RGWRemoteDataLog::run_sync(int num_shards)
1639 {
1640   lock.get_write();
1641   data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
1642   data_sync_cr->get(); // run() will drop a ref, so take another
1643   lock.unlock();
1644
1645   int r = run(data_sync_cr);
1646
1647   lock.get_write();
1648   data_sync_cr->put();
1649   data_sync_cr = NULL;
1650   lock.unlock();
1651
1652   if (r < 0) {
1653     ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
1654     return r;
1655   }
1656   return 0;
1657 }
1658
1659 int RGWDataSyncStatusManager::init()
1660 {
1661   auto zone_def_iter = store->zone_by_id.find(source_zone);
1662   if (zone_def_iter == store->zone_by_id.end()) {
1663     ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
1664     return -EIO;
1665   }
1666
1667   auto& zone_def = zone_def_iter->second;
1668
1669   if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
1670     return -ENOTSUP;
1671   }
1672
1673   RGWZoneParams& zone_params = store->get_zone_params();
1674
1675   sync_module = store->get_sync_module();
1676
1677   conn = store->get_zone_conn_by_id(source_zone);
1678   if (!conn) {
1679     ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
1680     return -EINVAL;
1681   }
1682
1683   error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
1684
1685   int r = source_log.init(source_zone, conn, error_logger, sync_module);
1686   if (r < 0) {
1687     lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
1688     finalize();
1689     return r;
1690   }
1691
1692   rgw_datalog_info datalog_info;
1693   r = source_log.read_log_info(&datalog_info);
1694   if (r < 0) {
1695     ldout(store->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
1696     finalize();
1697     return r;
1698   }
1699
1700   num_shards = datalog_info.num_shards;
1701
1702   for (int i = 0; i < num_shards; i++) {
1703     shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
1704   }
1705
1706   return 0;
1707 }
1708
1709 void RGWDataSyncStatusManager::finalize()
1710 {
1711   delete error_logger;
1712   error_logger = nullptr;
1713 }
1714
1715 string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
1716 {
1717   char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
1718   snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());
1719
1720   return string(buf);
1721 }
1722
1723 string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
1724 {
1725   char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
1726   snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);
1727
1728   return string(buf);
1729 }
1730
1731 int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
1732                              const rgw_bucket& bucket, int shard_id,
1733                              RGWSyncErrorLogger *_error_logger,
1734                              RGWSyncModuleInstanceRef& _sync_module)
1735 {
1736   conn = _conn;
1737   source_zone = _source_zone;
1738   bs.bucket = bucket;
1739   bs.shard_id = shard_id;
1740
1741   sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);
1742
1743   return 0;
1744 }
1745
1746 struct bucket_index_marker_info {
1747   string bucket_ver;
1748   string master_ver;
1749   string max_marker;
1750   bool syncstopped{false};
1751
1752   void decode_json(JSONObj *obj) {
1753     JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
1754     JSONDecoder::decode_json("master_ver", master_ver, obj);
1755     JSONDecoder::decode_json("max_marker", max_marker, obj);
1756     JSONDecoder::decode_json("syncstopped", syncstopped, obj);
1757   }
1758 };
1759
1760 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
1761   RGWDataSyncEnv *sync_env;
1762   const string instance_key;
1763
1764   bucket_index_marker_info *info;
1765
1766 public:
1767   RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
1768                                   const rgw_bucket_shard& bs,
1769                                   bucket_index_marker_info *_info)
1770     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1771       instance_key(bs.get_key()), info(_info) {}
1772
1773   int operate() override {
1774     reenter(this) {
1775       yield {
1776         rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
1777                                         { "bucket-instance", instance_key.c_str() },
1778                                         { "info" , NULL },
1779                                         { NULL, NULL } };
1780
1781         string p = "/admin/log/";
1782         call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
1783       }
1784       if (retcode < 0) {
1785         return set_cr_error(retcode);
1786       }
1787       return set_cr_done();
1788     }
1789     return 0;
1790   }
1791 };
1792
1793 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
1794   RGWDataSyncEnv *sync_env;
1795
1796   rgw_bucket_shard bs;
1797   const string sync_status_oid;
1798
1799   rgw_bucket_shard_sync_info& status;
1800
1801   bucket_index_marker_info info;
1802 public:
1803   RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1804                                         const rgw_bucket_shard& bs,
1805                                         rgw_bucket_shard_sync_info& _status)
1806     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
1807       sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1808       status(_status)
1809   {}
1810
1811   int operate() override {
1812     reenter(this) {
1813       /* fetch current position in logs */
1814       yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
1815       if (retcode < 0 && retcode != -ENOENT) {
1816         ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
1817         return set_cr_error(retcode);
1818       }
1819       yield {
1820         auto store = sync_env->store;
1821         rgw_raw_obj obj(store->get_zone_params().log_pool, sync_status_oid);
1822
1823         if (info.syncstopped) {
1824           call(new RGWRadosRemoveCR(store, obj));
1825         } else {
1826           status.state = rgw_bucket_shard_sync_info::StateFullSync;
1827           status.inc_marker.position = info.max_marker;
1828           map<string, bufferlist> attrs;
1829           status.encode_all_attrs(attrs);
1830           call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
1831         }
1832       }
1833       return set_cr_done();
1834     }
1835     return 0;
1836   }
1837 };
1838
1839 RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
1840 {
1841   return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
1842 }
1843
1844 template <class T>
1845 static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
1846 {
1847   map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1848   if (iter == attrs.end()) {
1849     *val = T();
1850     return;
1851   }
1852
1853   bufferlist::iterator biter = iter->second.begin();
1854   try {
1855     ::decode(*val, biter);
1856   } catch (buffer::error& err) {
1857     ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
1858   }
1859 }
1860
1861 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
1862 {
1863   decode_attr(cct, attrs, "state", &state);
1864   decode_attr(cct, attrs, "full_marker", &full_marker);
1865   decode_attr(cct, attrs, "inc_marker", &inc_marker);
1866 }
1867
1868 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
1869 {
1870   encode_state_attr(attrs);
1871   full_marker.encode_attr(attrs);
1872   inc_marker.encode_attr(attrs);
1873 }
1874
1875 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
1876 {
1877   ::encode(state, attrs["state"]);
1878 }
1879
1880 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1881 {
1882   ::encode(*this, attrs["full_marker"]);
1883 }
1884
1885 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1886 {
1887   ::encode(*this, attrs["inc_marker"]);
1888 }
1889
1890 class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
1891   RGWDataSyncEnv *sync_env;
1892   string oid;
1893   rgw_bucket_shard_sync_info *status;
1894
1895   map<string, bufferlist> attrs;
1896 public:
1897   RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1898                                    const rgw_bucket_shard& bs,
1899                                    rgw_bucket_shard_sync_info *_status)
1900     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1901       oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1902       status(_status) {}
1903   int operate() override;
1904 };
1905
1906 int RGWReadBucketSyncStatusCoroutine::operate()
1907 {
1908   reenter(this) {
1909     yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
1910                                                    rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
1911                                                    &attrs));
1912     if (retcode == -ENOENT) {
1913       *status = rgw_bucket_shard_sync_info();
1914       return set_cr_done();
1915     }
1916     if (retcode < 0) {
1917       ldout(sync_env->cct, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
1918       return set_cr_error(retcode);
1919     }
1920     status->decode_from_attrs(sync_env->cct, attrs);
1921     return set_cr_done();
1922   }
1923   return 0;
1924 }
1925 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
1926 {
1927   return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
1928 }
1929
1930 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
1931   for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
1932     delete iter->second;
1933   }
1934   delete error_logger;
1935 }
1936
1937
1938 void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
1939 {
1940   JSONDecoder::decode_json("ID", id, obj);
1941   JSONDecoder::decode_json("DisplayName", display_name, obj);
1942 }
1943
1944 struct bucket_list_entry {
1945   bool delete_marker;
1946   rgw_obj_key key;
1947   bool is_latest;
1948   real_time mtime;
1949   string etag;
1950   uint64_t size;
1951   string storage_class;
1952   rgw_bucket_entry_owner owner;
1953   uint64_t versioned_epoch;
1954   string rgw_tag;
1955
1956   bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
1957
1958   void decode_json(JSONObj *obj) {
1959     JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
1960     JSONDecoder::decode_json("Key", key.name, obj);
1961     JSONDecoder::decode_json("VersionId", key.instance, obj);
1962     JSONDecoder::decode_json("IsLatest", is_latest, obj);
1963     string mtime_str;
1964     JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
1965
1966     struct tm t;
1967     uint32_t nsec;
1968     if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
1969       ceph_timespec ts;
1970       ts.tv_sec = (uint64_t)internal_timegm(&t);
1971       ts.tv_nsec = nsec;
1972       mtime = real_clock::from_ceph_timespec(ts);
1973     }
1974     JSONDecoder::decode_json("ETag", etag, obj);
1975     JSONDecoder::decode_json("Size", size, obj);
1976     JSONDecoder::decode_json("StorageClass", storage_class, obj);
1977     JSONDecoder::decode_json("Owner", owner, obj);
1978     JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
1979     JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
1980   }
1981 };
1982
1983 struct bucket_list_result {
1984   string name;
1985   string prefix;
1986   string key_marker;
1987   string version_id_marker;
1988   int max_keys;
1989   bool is_truncated;
1990   list<bucket_list_entry> entries;
1991
1992   bucket_list_result() : max_keys(0), is_truncated(false) {}
1993
1994   void decode_json(JSONObj *obj) {
1995     JSONDecoder::decode_json("Name", name, obj);
1996     JSONDecoder::decode_json("Prefix", prefix, obj);
1997     JSONDecoder::decode_json("KeyMarker", key_marker, obj);
1998     JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
1999     JSONDecoder::decode_json("MaxKeys", max_keys, obj);
2000     JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
2001     JSONDecoder::decode_json("Entries", entries, obj);
2002   }
2003 };
2004
2005 class RGWListBucketShardCR: public RGWCoroutine {
2006   RGWDataSyncEnv *sync_env;
2007   const rgw_bucket_shard& bs;
2008   const string instance_key;
2009   rgw_obj_key marker_position;
2010
2011   bucket_list_result *result;
2012
2013 public:
2014   RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2015                        rgw_obj_key& _marker_position, bucket_list_result *_result)
2016     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2017       instance_key(bs.get_key()), marker_position(_marker_position),
2018       result(_result) {}
2019
2020   int operate() override {
2021     reenter(this) {
2022       yield {
2023         rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
2024                                         { "versions" , NULL },
2025                                         { "format" , "json" },
2026                                         { "objs-container" , "true" },
2027                                         { "key-marker" , marker_position.name.c_str() },
2028                                         { "version-id-marker" , marker_position.instance.c_str() },
2029                                         { NULL, NULL } };
2030         // don't include tenant in the url, it's already part of instance_key
2031         string p = string("/") + bs.bucket.name;
2032         call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
2033       }
2034       if (retcode < 0) {
2035         return set_cr_error(retcode);
2036       }
2037       return set_cr_done();
2038     }
2039     return 0;
2040   }
2041 };
2042
2043 class RGWListBucketIndexLogCR: public RGWCoroutine {
2044   RGWDataSyncEnv *sync_env;
2045   const string instance_key;
2046   string marker;
2047
2048   list<rgw_bi_log_entry> *result;
2049
2050 public:
2051   RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2052                           string& _marker, list<rgw_bi_log_entry> *_result)
2053     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2054       instance_key(bs.get_key()), marker(_marker), result(_result) {}
2055
2056   int operate() override {
2057     reenter(this) {
2058       yield {
2059         rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2060                                         { "format" , "json" },
2061                                         { "marker" , marker.c_str() },
2062                                         { "type", "bucket-index" },
2063                                         { NULL, NULL } };
2064
2065         call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2066       }
2067       if (retcode < 0) {
2068         return set_cr_error(retcode);
2069       }
2070       return set_cr_done();
2071     }
2072     return 0;
2073   }
2074 };
2075
2076 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
2077
2078 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2079   RGWDataSyncEnv *sync_env;
2080
2081   string marker_oid;
2082   rgw_bucket_shard_full_sync_marker sync_marker;
2083
2084 public:
2085   RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2086                          const string& _marker_oid,
2087                          const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2088                                                                 sync_env(_sync_env),
2089                                                                 marker_oid(_marker_oid),
2090                                                                 sync_marker(_marker) {}
2091
2092   RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2093     sync_marker.position = new_marker;
2094     sync_marker.count = index_pos;
2095
2096     map<string, bufferlist> attrs;
2097     sync_marker.encode_attr(attrs);
2098
2099     RGWRados *store = sync_env->store;
2100
2101     ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2102     return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2103                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2104                                           attrs);
2105   }
2106 };
2107
2108 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2109   RGWDataSyncEnv *sync_env;
2110
2111   string marker_oid;
2112   rgw_bucket_shard_inc_sync_marker sync_marker;
2113
2114   map<rgw_obj_key, string> key_to_marker;
2115   map<string, rgw_obj_key> marker_to_key;
2116
2117   void handle_finish(const string& marker) override {
2118     map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
2119     if (iter == marker_to_key.end()) {
2120       return;
2121     }
2122     key_to_marker.erase(iter->second);
2123     reset_need_retry(iter->second);
2124     marker_to_key.erase(iter);
2125   }
2126
2127 public:
2128   RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2129                          const string& _marker_oid,
2130                          const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2131                                                                 sync_env(_sync_env),
2132                                                                 marker_oid(_marker_oid),
2133                                                                 sync_marker(_marker) {}
2134
2135   RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2136     sync_marker.position = new_marker;
2137
2138     map<string, bufferlist> attrs;
2139     sync_marker.encode_attr(attrs);
2140
2141     RGWRados *store = sync_env->store;
2142
2143     ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2144     return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
2145                                           store,
2146                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2147                                           attrs);
2148   }
2149
2150   /*
2151    * create index from key -> <op, marker>, and from marker -> key
2152    * this is useful so that we can insure that we only have one
2153    * entry for any key that is used. This is needed when doing
2154    * incremenatl sync of data, and we don't want to run multiple
2155    * concurrent sync operations for the same bucket shard 
2156    * Also, we should make sure that we don't run concurrent operations on the same key with
2157    * different ops.
2158    */
2159   bool index_key_to_marker(const rgw_obj_key& key, const string& marker) {
2160     if (key_to_marker.find(key) != key_to_marker.end()) {
2161       set_need_retry(key);
2162       return false;
2163     }
2164     key_to_marker[key] = marker;
2165     marker_to_key[marker] = key;
2166     return true;
2167   }
2168
2169   bool can_do_op(const rgw_obj_key& key) {
2170     return (key_to_marker.find(key) == key_to_marker.end());
2171   }
2172 };
2173
2174 template <class T, class K>
2175 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2176   RGWDataSyncEnv *sync_env;
2177
2178   RGWBucketInfo *bucket_info;
2179   const rgw_bucket_shard& bs;
2180
2181   rgw_obj_key key;
2182   bool versioned;
2183   uint64_t versioned_epoch;
2184   rgw_bucket_entry_owner owner;
2185   real_time timestamp;
2186   RGWModifyOp op;
2187   RGWPendingState op_state;
2188
2189   T entry_marker;
2190   RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2191
2192   int sync_status;
2193
2194   stringstream error_ss;
2195
2196   RGWDataSyncDebugLogger logger;
2197
2198   bool error_injection;
2199
2200   RGWDataSyncModule *data_sync_module;
2201   
2202   rgw_zone_set zones_trace;
2203
2204 public:
2205   RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2206                              RGWBucketInfo *_bucket_info,
2207                              const rgw_bucket_shard& bs,
2208                              const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
2209                              real_time& _timestamp,
2210                              const rgw_bucket_entry_owner& _owner,
2211                              RGWModifyOp _op, RGWPendingState _op_state,
2212                              const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
2213                                                       sync_env(_sync_env),
2214                                                       bucket_info(_bucket_info), bs(bs),
2215                                                       key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2216                                                       owner(_owner),
2217                                                       timestamp(_timestamp), op(_op),
2218                                                       op_state(_op_state),
2219                                                       entry_marker(_entry_marker),
2220                                                       marker_tracker(_marker_tracker),
2221                                                       sync_status(0){
2222     stringstream ss;
2223     ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
2224     set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
2225     ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
2226     set_status("init");
2227
2228     logger.init(sync_env, "Object", ss.str());
2229
2230     error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2231
2232     data_sync_module = sync_env->sync_module->get_data_handler();
2233     
2234     zones_trace = _zones_trace;
2235     zones_trace.insert(sync_env->store->get_zone().id);
2236   }
2237
2238   int operate() override {
2239     reenter(this) {
2240       /* skip entries that are not complete */
2241       if (op_state != CLS_RGW_STATE_COMPLETE) {
2242         goto done;
2243       }
2244       do {
2245         yield {
2246           marker_tracker->reset_need_retry(key);
2247           if (key.name.empty()) {
2248             /* shouldn't happen */
2249             set_status("skipping empty entry");
2250             ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
2251             goto done;
2252           }
2253           if (error_injection &&
2254               rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
2255             ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
2256             retcode = -EIO;
2257           } else if (op == CLS_RGW_OP_ADD ||
2258                      op == CLS_RGW_OP_LINK_OLH) {
2259             if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
2260               set_status("skipping entry");
2261               ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
2262               goto done;
2263
2264             }
2265             set_status("syncing obj");
2266             ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2267             logger.log("fetch");
2268             call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
2269           } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2270             set_status("removing obj");
2271             if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2272               versioned = true;
2273             }
2274             logger.log("remove");
2275             call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch, &zones_trace));
2276           } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
2277             logger.log("creating delete marker");
2278             set_status("creating delete marker");
2279             ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2280             call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch, &zones_trace));
2281           }
2282         }
2283       } while (marker_tracker->need_retry(key));
2284       {
2285         stringstream ss;
2286         if (retcode >= 0) {
2287           ss << "done";
2288         } else {
2289           ss << "done, retcode=" << retcode;
2290         }
2291         logger.log(ss.str());
2292       }
2293
2294       if (retcode < 0 && retcode != -ENOENT) {
2295         set_status() << "failed to sync obj; retcode=" << retcode;
2296         ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
2297             << bucket_shard_str{bs} << "/" << key.name << dendl;
2298         error_ss << bucket_shard_str{bs} << "/" << key.name;
2299         sync_status = retcode;
2300       }
2301       if (!error_ss.str().empty()) {
2302         yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object"));
2303       }
2304 done:
2305       if (sync_status == 0) {
2306         /* update marker */
2307         set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2308         yield call(marker_tracker->finish(entry_marker));
2309         sync_status = retcode;
2310       }
2311       if (sync_status < 0) {
2312         return set_cr_error(sync_status);
2313       }
2314       return set_cr_done();
2315     }
2316     return 0;
2317   }
2318 };
2319
2320 #define BUCKET_SYNC_SPAWN_WINDOW 20
2321
2322 class RGWBucketShardFullSyncCR : public RGWCoroutine {
2323   RGWDataSyncEnv *sync_env;
2324   const rgw_bucket_shard& bs;
2325   RGWBucketInfo *bucket_info;
2326   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2327   bucket_list_result list_result;
2328   list<bucket_list_entry>::iterator entries_iter;
2329   rgw_bucket_shard_full_sync_marker& full_marker;
2330   RGWBucketFullSyncShardMarkerTrack marker_tracker;
2331   rgw_obj_key list_marker;
2332   bucket_list_entry *entry{nullptr};
2333   RGWModifyOp op{CLS_RGW_OP_ADD};
2334
2335   int total_entries{0};
2336
2337   int sync_status{0};
2338
2339   const string& status_oid;
2340
2341   RGWDataSyncDebugLogger logger;
2342   rgw_zone_set zones_trace;
2343 public:
2344   RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2345                            RGWBucketInfo *_bucket_info,
2346                            const std::string& status_oid,
2347                            RGWContinuousLeaseCR *lease_cr,
2348                            rgw_bucket_shard_full_sync_marker& _full_marker)
2349     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2350       bucket_info(_bucket_info), lease_cr(lease_cr), full_marker(_full_marker),
2351       marker_tracker(sync_env, status_oid, full_marker),
2352       status_oid(status_oid) {
2353     logger.init(sync_env, "BucketFull", bs.get_key());
2354     zones_trace.insert(sync_env->source_zone);
2355   }
2356
2357   int operate() override;
2358 };
2359
2360 int RGWBucketShardFullSyncCR::operate()
2361 {
2362   int ret;
2363   reenter(this) {
2364     list_marker = full_marker.position;
2365
2366     total_entries = full_marker.count;
2367     do {
2368       if (!lease_cr->is_locked()) {
2369         drain_all();
2370         return set_cr_error(-ECANCELED);
2371       }
2372       set_status("listing remote bucket");
2373       ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
2374       yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2375                                           &list_result));
2376       if (retcode < 0 && retcode != -ENOENT) {
2377         set_status("failed bucket listing, going down");
2378         drain_all();
2379         return set_cr_error(retcode);
2380       }
2381       entries_iter = list_result.entries.begin();
2382       for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2383         if (!lease_cr->is_locked()) {
2384           drain_all();
2385           return set_cr_error(-ECANCELED);
2386         }
2387         ldout(sync_env->cct, 20) << "[full sync] syncing object: "
2388             << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
2389         entry = &(*entries_iter);
2390         total_entries++;
2391         list_marker = entries_iter->key;
2392         if (!marker_tracker.start(entry->key, total_entries, real_time())) {
2393           ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
2394         } else {
2395           op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
2396           using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2397           yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2398                                  false, /* versioned, only matters for object removal */
2399                                  entry->versioned_epoch, entry->mtime,
2400                                  entry->owner, op, CLS_RGW_STATE_COMPLETE,
2401                                  entry->key, &marker_tracker, zones_trace),
2402                       false);
2403         }
2404         while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2405           yield wait_for_child();
2406           bool again = true;
2407           while (again) {
2408             again = collect(&ret, nullptr);
2409             if (ret < 0) {
2410               ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2411               sync_status = ret;
2412               /* we have reported this error */
2413             }
2414           }
2415         }
2416       }
2417     } while (list_result.is_truncated && sync_status == 0);
2418     set_status("done iterating over all objects");
2419     /* wait for all operations to complete */
2420     while (num_spawned()) {
2421       yield wait_for_child();
2422       bool again = true;
2423       while (again) {
2424         again = collect(&ret, nullptr);
2425         if (ret < 0) {
2426           ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2427           sync_status = ret;
2428           /* we have reported this error */
2429         }
2430       }
2431     }
2432     if (!lease_cr->is_locked()) {
2433       return set_cr_error(-ECANCELED);
2434     }
2435     /* update sync state to incremental */
2436     if (sync_status == 0) {
2437       yield {
2438         rgw_bucket_shard_sync_info sync_status;
2439         sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2440         map<string, bufferlist> attrs;
2441         sync_status.encode_state_attr(attrs);
2442         RGWRados *store = sync_env->store;
2443         call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2444                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2445                                             attrs));
2446       }
2447     } else {
2448       ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
2449     }
2450     if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
2451       ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
2452           << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
2453       return set_cr_error(retcode);
2454     }
2455     if (sync_status < 0) {
2456       return set_cr_error(sync_status);
2457     }
2458     return set_cr_done();
2459   }
2460   return 0;
2461 }
2462
2463 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2464   RGWDataSyncEnv *sync_env;
2465   const rgw_bucket_shard& bs;
2466   RGWBucketInfo *bucket_info;
2467   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2468   list<rgw_bi_log_entry> list_result;
2469   list<rgw_bi_log_entry>::iterator entries_iter;
2470   map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
2471   rgw_bucket_shard_inc_sync_marker& inc_marker;
2472   rgw_obj_key key;
2473   rgw_bi_log_entry *entry{nullptr};
2474   RGWBucketIncSyncShardMarkerTrack marker_tracker;
2475   bool updated_status{false};
2476   const string& status_oid;
2477   const string& zone_id;
2478   ceph::real_time sync_modify_time;
2479
2480   string cur_id;
2481
2482   RGWDataSyncDebugLogger logger;
2483
2484   int sync_status{0};
2485   bool syncstopped{false};
2486
2487 public:
2488   RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2489                                   const rgw_bucket_shard& bs,
2490                                   RGWBucketInfo *_bucket_info,
2491                                   const std::string& status_oid,
2492                                   RGWContinuousLeaseCR *lease_cr,
2493                                   rgw_bucket_shard_inc_sync_marker& _inc_marker)
2494     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2495       bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker),
2496       marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid) , zone_id(_sync_env->store->get_zone().id){
2497     set_description() << "bucket shard incremental sync bucket="
2498         << bucket_shard_str{bs};
2499     set_status("init");
2500     logger.init(sync_env, "BucketInc", bs.get_key());
2501   }
2502
2503   int operate() override;
2504 };
2505
2506 int RGWBucketShardIncrementalSyncCR::operate()
2507 {
2508   int ret;
2509   reenter(this) {
2510     do {
2511       if (!lease_cr->is_locked()) {
2512         drain_all();
2513         return set_cr_error(-ECANCELED);
2514       }
2515       ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << inc_marker.position << dendl;
2516       set_status() << "listing bilog; position=" << inc_marker.position;
2517       yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position,
2518                                              &list_result));
2519       if (retcode < 0 && retcode != -ENOENT ) {
2520         drain_all();
2521         if (!syncstopped) {
2522           /* wait for all operations to complete */
2523           return set_cr_error(retcode);
2524         } else {
2525           /* no need to retry */
2526           break;        
2527         }
2528       }
2529       squash_map.clear();
2530       for (auto& e : list_result) {
2531         if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP && (sync_modify_time < e.timestamp)) {
2532           ldout(sync_env->cct, 20) << " syncstop on " << e.timestamp << dendl;
2533           sync_modify_time = e.timestamp;
2534           syncstopped = true;
2535           continue;
2536         }
2537         if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC && (sync_modify_time < e.timestamp)) {
2538           ldout(sync_env->cct, 20) << " resync on " << e.timestamp << dendl;
2539           sync_modify_time = e.timestamp;
2540           syncstopped = false;
2541           continue;
2542         }
2543         if (e.state != CLS_RGW_STATE_COMPLETE) {
2544           continue;
2545         }
2546         if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
2547           continue;
2548         }
2549         auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
2550         if (squash_entry.first <= e.timestamp) {
2551           squash_entry = make_pair<>(e.timestamp, e.op);
2552         }
2553       }
2554
2555       entries_iter = list_result.begin();
2556       for (; entries_iter != list_result.end(); ++entries_iter) {
2557         if (!lease_cr->is_locked()) {
2558           drain_all();
2559           return set_cr_error(-ECANCELED);
2560         }
2561         entry = &(*entries_iter);
2562         {
2563           ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2564           if (p < 0) {
2565             cur_id = entry->id;
2566           } else {
2567             cur_id = entry->id.substr(p + 1);
2568           }
2569         }
2570         inc_marker.position = cur_id;
2571
2572         if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
2573           ldout(sync_env->cct, 20) << "detected syncstop or resync  on " << entries_iter->timestamp << " , skipping entry" << dendl;
2574           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2575           continue;
2576         }
2577
2578         if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
2579           set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
2580           ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
2581           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2582           continue;
2583         }
2584
2585         ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
2586
2587         if (!key.ns.empty()) {
2588           set_status() << "skipping entry in namespace: " << entry->object;
2589           ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
2590           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2591           continue;
2592         }
2593
2594         set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
2595         if (entry->op == CLS_RGW_OP_CANCEL) {
2596           set_status() << "canceled operation, skipping";
2597           ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2598               << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
2599           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2600           continue;
2601         }
2602         if (entry->state != CLS_RGW_STATE_COMPLETE) {
2603           set_status() << "non-complete operation, skipping";
2604           ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2605               << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
2606           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2607           continue;
2608         }
2609         if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
2610           set_status() << "redundant operation, skipping";
2611           ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2612               <<bucket_shard_str{bs} <<"/"<<key<<": redundant operation" << dendl;
2613           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2614           continue;
2615         }
2616         if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
2617           set_status() << "squashed operation, skipping";
2618           ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2619               << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
2620           /* not updating high marker though */
2621           continue;
2622         }
2623         ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
2624             << bucket_shard_str{bs} << "/" << key << dendl;
2625         updated_status = false;
2626         while (!marker_tracker.can_do_op(key)) {
2627           if (!updated_status) {
2628             set_status() << "can't do op, conflicting inflight operation";
2629             updated_status = true;
2630           }
2631           ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
2632           yield wait_for_child();
2633           bool again = true;
2634           while (again) {
2635             again = collect(&ret, nullptr);
2636             if (ret < 0) {
2637               ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
2638               sync_status = ret;
2639               /* we have reported this error */
2640             }
2641           }
2642         }
2643         if (!marker_tracker.index_key_to_marker(key, cur_id)) {
2644           set_status() << "can't do op, sync already in progress for object";
2645           ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
2646           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2647           continue;
2648         }
2649         // yield {
2650           set_status() << "start object sync";
2651           if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
2652             ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
2653           } else {
2654             uint64_t versioned_epoch = 0;
2655             rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
2656             if (entry->ver.pool < 0) {
2657               versioned_epoch = entry->ver.epoch;
2658             }
2659             ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
2660             using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
2661             spawn(new SyncCR(sync_env, bucket_info, bs, key,
2662                              entry->is_versioned(), versioned_epoch,
2663                              entry->timestamp, owner, entry->op, entry->state,
2664                              cur_id, &marker_tracker, entry->zones_trace),
2665                   false);
2666           }
2667         // }
2668         while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2669           set_status() << "num_spawned() > spawn_window";
2670           yield wait_for_child();
2671           bool again = true;
2672           while (again) {
2673             again = collect(&ret, nullptr);
2674             if (ret < 0) {
2675               ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2676               sync_status = ret;
2677               /* we have reported this error */
2678             }
2679             /* not waiting for child here */
2680           }
2681         }
2682       }
2683     } while (!list_result.empty() && sync_status == 0);
2684
2685     if (syncstopped) {
2686       drain_all();
2687
2688       yield {
2689         const string& oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
2690         RGWRados *store = sync_env->store;
2691         call(new RGWRadosRemoveCR(store, rgw_raw_obj{store->get_zone_params().log_pool, oid}));
2692       }
2693       lease_cr->abort();
2694       return set_cr_done();
2695     }
2696
2697     while (num_spawned()) {
2698       yield wait_for_child();
2699       bool again = true;
2700       while (again) {
2701         again = collect(&ret, nullptr);
2702         if (ret < 0) {
2703           ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2704           sync_status = ret;
2705           /* we have reported this error */
2706         }
2707         /* not waiting for child here */
2708       }
2709     }
2710
2711     yield call(marker_tracker.flush());
2712     if (retcode < 0) {
2713       ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
2714       return set_cr_error(retcode);
2715     }
2716     if (sync_status < 0) {
2717       ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status<< ")" << dendl;
2718     }
2719
2720     /* wait for all operations to complete */
2721     drain_all();
2722
2723     if (sync_status < 0) {
2724       return set_cr_error(sync_status);
2725     }
2726
2727     return set_cr_done();
2728   }
2729   return 0;
2730 }
2731
2732 int RGWRunBucketSyncCoroutine::operate()
2733 {
2734   reenter(this) {
2735     yield {
2736       set_status("acquiring sync lock");
2737       auto store = sync_env->store;
2738       lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
2739                                               rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2740                                               "sync_lock",
2741                                               cct->_conf->rgw_sync_lease_period,
2742                                               this));
2743       lease_stack.reset(spawn(lease_cr.get(), false));
2744     }
2745     while (!lease_cr->is_locked()) {
2746       if (lease_cr->is_done()) {
2747         ldout(cct, 5) << "lease cr failed, done early" << dendl;
2748         set_status("lease lock failed, early abort");
2749         return set_cr_error(lease_cr->get_ret_status());
2750       }
2751       set_sleeping(true);
2752       yield;
2753     }
2754
2755     yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
2756     if (retcode < 0 && retcode != -ENOENT) {
2757       ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
2758           << bucket_shard_str{bs} << dendl;
2759       lease_cr->go_down();
2760       drain_all();
2761       return set_cr_error(retcode);
2762     }
2763
2764     ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
2765         << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
2766
2767     yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2768     if (retcode == -ENOENT) {
2769       /* bucket instance info has not been synced in yet, fetch it now */
2770       yield {
2771         ldout(sync_env->cct, 10) << "no local info for bucket "
2772             << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
2773         string raw_key = string("bucket.instance:") + bs.bucket.get_key();
2774
2775         meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
2776
2777         call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
2778                                           string() /* no marker */,
2779                                           MDLOG_STATUS_COMPLETE,
2780                                           NULL /* no marker tracker */));
2781       }
2782       if (retcode < 0) {
2783         ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
2784         lease_cr->go_down();
2785         drain_all();
2786         return set_cr_error(retcode);
2787       }
2788
2789       yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2790     }
2791     if (retcode < 0) {
2792       ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
2793       lease_cr->go_down();
2794       drain_all();
2795       return set_cr_error(retcode);
2796     }
2797
2798     if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
2799       yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
2800       if (retcode < 0) {
2801         ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
2802             << " failed, retcode=" << retcode << dendl;
2803         lease_cr->go_down();
2804         drain_all();
2805         return set_cr_error(retcode);
2806       }
2807     }
2808
2809     if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
2810       yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
2811                                               status_oid, lease_cr.get(),
2812                                               sync_status.full_marker));
2813       if (retcode < 0) {
2814         ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
2815             << " failed, retcode=" << retcode << dendl;
2816         lease_cr->go_down();
2817         drain_all();
2818         return set_cr_error(retcode);
2819       }
2820       sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2821     }
2822
2823     if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
2824       yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
2825                                                      status_oid, lease_cr.get(),
2826                                                      sync_status.inc_marker));
2827       if (retcode < 0) {
2828         ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
2829             << " failed, retcode=" << retcode << dendl;
2830         lease_cr->go_down();
2831         drain_all();
2832         return set_cr_error(retcode);
2833       }
2834     }
2835
2836     lease_cr->go_down();
2837     drain_all();
2838     return set_cr_done();
2839   }
2840
2841   return 0;
2842 }
2843
2844 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
2845 {
2846   return new RGWRunBucketSyncCoroutine(&sync_env, bs);
2847 }
2848
2849 int RGWBucketSyncStatusManager::init()
2850 {
2851   conn = store->get_zone_conn_by_id(source_zone);
2852   if (!conn) {
2853     ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
2854     return -EINVAL;
2855   }
2856
2857   int ret = http_manager.set_threaded();
2858   if (ret < 0) {
2859     ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
2860     return ret;
2861   }
2862
2863
2864   const string key = bucket.get_key();
2865
2866   rgw_http_param_pair pairs[] = { { "key", key.c_str() },
2867                                   { NULL, NULL } };
2868
2869   string path = string("/admin/metadata/bucket.instance");
2870
2871   bucket_instance_meta_info result;
2872   ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
2873   if (ret < 0) {
2874     ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
2875     return ret;
2876   }
2877
2878   RGWBucketInfo& bi = result.data.get_bucket_info();
2879   num_shards = bi.num_shards;
2880
2881   error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
2882
2883   sync_module.reset(new RGWDefaultSyncModuleInstance());
2884
2885   int effective_num_shards = (num_shards ? num_shards : 1);
2886
2887   auto async_rados = store->get_async_rados();
2888
2889   for (int i = 0; i < effective_num_shards; i++) {
2890     RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
2891     ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
2892     if (ret < 0) {
2893       ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
2894       return ret;
2895     }
2896     source_logs[i] = l;
2897   }
2898
2899   return 0;
2900 }
2901
2902 int RGWBucketSyncStatusManager::init_sync_status()
2903 {
2904   list<RGWCoroutinesStack *> stacks;
2905
2906   for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2907     RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2908     RGWRemoteBucketLog *l = iter->second;
2909     stack->call(l->init_sync_status_cr());
2910
2911     stacks.push_back(stack);
2912   }
2913
2914   return cr_mgr.run(stacks);
2915 }
2916
2917 int RGWBucketSyncStatusManager::read_sync_status()
2918 {
2919   list<RGWCoroutinesStack *> stacks;
2920
2921   for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2922     RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2923     RGWRemoteBucketLog *l = iter->second;
2924     stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
2925
2926     stacks.push_back(stack);
2927   }
2928
2929   int ret = cr_mgr.run(stacks);
2930   if (ret < 0) {
2931     ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
2932         << bucket_str{bucket} << dendl;
2933     return ret;
2934   }
2935
2936   return 0;
2937 }
2938
2939 int RGWBucketSyncStatusManager::run()
2940 {
2941   list<RGWCoroutinesStack *> stacks;
2942
2943   for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2944     RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2945     RGWRemoteBucketLog *l = iter->second;
2946     stack->call(l->run_sync_cr());
2947
2948     stacks.push_back(stack);
2949   }
2950
2951   int ret = cr_mgr.run(stacks);
2952   if (ret < 0) {
2953     ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
2954         << bucket_str{bucket} << dendl;
2955     return ret;
2956   }
2957
2958   return 0;
2959 }
2960
2961 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
2962                                               const rgw_bucket_shard& bs)
2963 {
2964   return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
2965 }
2966
2967
2968 // TODO: move into rgw_data_sync_trim.cc
2969 #undef dout_prefix
2970 #define dout_prefix (*_dout << "data trim: ")
2971
2972 namespace {
2973
2974 /// return the marker that it's safe to trim up to
2975 const std::string& get_stable_marker(const rgw_data_sync_marker& m)
2976 {
2977   return m.state == m.FullSync ? m.next_step_marker : m.marker;
2978 }
2979
2980 /// comparison operator for take_min_markers()
2981 bool operator<(const rgw_data_sync_marker& lhs,
2982                const rgw_data_sync_marker& rhs)
2983 {
2984   // sort by stable marker
2985   return get_stable_marker(lhs) < get_stable_marker(rhs);
2986 }
2987
2988 /// populate the container starting with 'dest' with the minimum stable marker
2989 /// of each shard for all of the peers in [first, last)
2990 template <typename IterIn, typename IterOut>
2991 void take_min_markers(IterIn first, IterIn last, IterOut dest)
2992 {
2993   if (first == last) {
2994     return;
2995   }
2996   // initialize markers with the first peer's
2997   auto m = dest;
2998   for (auto &shard : first->sync_markers) {
2999     *m = std::move(shard.second);
3000     ++m;
3001   }
3002   // for remaining peers, replace with smaller markers
3003   for (auto p = first + 1; p != last; ++p) {
3004     m = dest;
3005     for (auto &shard : p->sync_markers) {
3006       if (shard.second < *m) {
3007         *m = std::move(shard.second);
3008       }
3009       ++m;
3010     }
3011   }
3012 }
3013
3014 } // anonymous namespace
3015
3016 class DataLogTrimCR : public RGWCoroutine {
3017   RGWRados *store;
3018   RGWHTTPManager *http;
3019   const int num_shards;
3020   const std::string& zone_id; //< my zone id
3021   std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
3022   std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
3023   std::vector<std::string>& last_trim; //< last trimmed marker per shard
3024   int ret{0};
3025
3026  public:
3027   DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
3028                    int num_shards, std::vector<std::string>& last_trim)
3029     : RGWCoroutine(store->ctx()), store(store), http(http),
3030       num_shards(num_shards),
3031       zone_id(store->get_zone().id),
3032       peer_status(store->zone_conn_map.size()),
3033       min_shard_markers(num_shards),
3034       last_trim(last_trim)
3035   {}
3036
3037   int operate() override;
3038 };
3039
3040 int DataLogTrimCR::operate()
3041 {
3042   reenter(this) {
3043     ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
3044     set_status("fetching sync status");
3045     yield {
3046       // query data sync status from each sync peer
3047       rgw_http_param_pair params[] = {
3048         { "type", "data" },
3049         { "status", nullptr },
3050         { "source-zone", zone_id.c_str() },
3051         { nullptr, nullptr }
3052       };
3053
3054       auto p = peer_status.begin();
3055       for (auto& c : store->zone_conn_map) {
3056         ldout(cct, 20) << "query sync status from " << c.first << dendl;
3057         using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
3058         spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
3059               false);
3060         ++p;
3061       }
3062     }
3063
3064     // must get a successful reply from all peers to consider trimming
3065     ret = 0;
3066     while (ret == 0 && num_spawned() > 0) {
3067       yield wait_for_child();
3068       collect_next(&ret);
3069     }
3070     drain_all();
3071
3072     if (ret < 0) {
3073       ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3074       return set_cr_error(ret);
3075     }
3076
3077     ldout(cct, 10) << "trimming log shards" << dendl;
3078     set_status("trimming log shards");
3079     yield {
3080       // determine the minimum marker for each shard
3081       take_min_markers(peer_status.begin(), peer_status.end(),
3082                        min_shard_markers.begin());
3083
3084       for (int i = 0; i < num_shards; i++) {
3085         const auto& m = min_shard_markers[i];
3086         auto& stable = get_stable_marker(m);
3087         if (stable <= last_trim[i]) {
3088           continue;
3089         }
3090         ldout(cct, 10) << "trimming log shard " << i
3091             << " at marker=" << stable
3092             << " last_trim=" << last_trim[i] << dendl;
3093         using TrimCR = RGWSyncLogTrimCR;
3094         spawn(new TrimCR(store, store->data_log->get_oid(i),
3095                          stable, &last_trim[i]),
3096               true);
3097       }
3098     }
3099     return set_cr_done();
3100   }
3101   return 0;
3102 }
3103
3104 class DataLogTrimPollCR : public RGWCoroutine {
3105   RGWRados *store;
3106   RGWHTTPManager *http;
3107   const int num_shards;
3108   const utime_t interval; //< polling interval
3109   const std::string lock_oid; //< use first data log shard for lock
3110   const std::string lock_cookie;
3111   std::vector<std::string> last_trim; //< last trimmed marker per shard
3112
3113  public:
3114   DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3115                     int num_shards, utime_t interval)
3116     : RGWCoroutine(store->ctx()), store(store), http(http),
3117       num_shards(num_shards), interval(interval),
3118       lock_oid(store->data_log->get_oid(0)),
3119       lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3120       last_trim(num_shards)
3121   {}
3122
3123   int operate() override;
3124 };
3125
3126 int DataLogTrimPollCR::operate()
3127 {
3128   reenter(this) {
3129     for (;;) {
3130       set_status("sleeping");
3131       wait(interval);
3132
3133       // request a 'data_trim' lock that covers the entire wait interval to
3134       // prevent other gateways from attempting to trim for the duration
3135       set_status("acquiring trim lock");
3136       yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
3137                                           rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
3138                                           "data_trim", lock_cookie,
3139                                           interval.sec()));
3140       if (retcode < 0) {
3141         // if the lock is already held, go back to sleep and try again later
3142         ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3143             << interval.sec() << "s" << dendl;
3144         continue;
3145       }
3146
3147       set_status("trimming");
3148       yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3149
3150       // note that the lock is not released. this is intentional, as it avoids
3151       // duplicating this work in other gateways
3152     }
3153   }
3154   return 0;
3155 }
3156
3157 RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3158                                       RGWHTTPManager *http,
3159                                       int num_shards, utime_t interval)
3160 {
3161   return new DataLogTrimPollCR(store, http, num_shards, interval);
3162 }