Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_data_sync.h
1 #ifndef CEPH_RGW_DATA_SYNC_H
2 #define CEPH_RGW_DATA_SYNC_H
3
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_bucket.h"
7
8 #include "rgw_sync_module.h"
9
10 #include "common/RWLock.h"
11 #include "common/ceph_json.h"
12
13
14 struct rgw_datalog_info {
15   uint32_t num_shards;
16
17   rgw_datalog_info() : num_shards(0) {}
18
19   void decode_json(JSONObj *obj);
20 };
21
22 struct rgw_data_sync_info {
23   enum SyncState {
24     StateInit = 0,
25     StateBuildingFullSyncMaps = 1,
26     StateSync = 2,
27   };
28
29   uint16_t state;
30   uint32_t num_shards;
31
32   uint64_t instance_id{0};
33
34   void encode(bufferlist& bl) const {
35     ENCODE_START(2, 1, bl);
36     ::encode(state, bl);
37     ::encode(num_shards, bl);
38     ::encode(instance_id, bl);
39     ENCODE_FINISH(bl);
40   }
41
42   void decode(bufferlist::iterator& bl) {
43      DECODE_START(2, bl);
44      ::decode(state, bl);
45      ::decode(num_shards, bl);
46      if (struct_v >= 2) {
47        ::decode(instance_id, bl);
48      }
49      DECODE_FINISH(bl);
50   }
51
52   void dump(Formatter *f) const {
53     string s;
54     switch ((SyncState)state) {
55       case StateInit:
56         s = "init";
57         break;
58       case StateBuildingFullSyncMaps:
59         s = "building-full-sync-maps";
60         break;
61       case StateSync:
62         s = "sync";
63         break;
64       default:
65         s = "unknown";
66         break;
67     }
68     encode_json("status", s, f);
69     encode_json("num_shards", num_shards, f);
70     encode_json("instance_id", instance_id, f);
71   }
72   void decode_json(JSONObj *obj) {
73     std::string s;
74     JSONDecoder::decode_json("status", s, obj);
75     if (s == "building-full-sync-maps") {
76       state = StateBuildingFullSyncMaps;
77     } else if (s == "sync") {
78       state = StateSync;
79     } else {
80       state = StateInit;
81     }
82     JSONDecoder::decode_json("num_shards", num_shards, obj);
83     JSONDecoder::decode_json("instance_id", num_shards, obj);
84   }
85
86   rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
87 };
88 WRITE_CLASS_ENCODER(rgw_data_sync_info)
89
90 struct rgw_data_sync_marker {
91   enum SyncState {
92     FullSync = 0,
93     IncrementalSync = 1,
94   };
95   uint16_t state;
96   string marker;
97   string next_step_marker;
98   uint64_t total_entries;
99   uint64_t pos;
100   real_time timestamp;
101
102   rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
103
104   void encode(bufferlist& bl) const {
105     ENCODE_START(1, 1, bl);
106     ::encode(state, bl);
107     ::encode(marker, bl);
108     ::encode(next_step_marker, bl);
109     ::encode(total_entries, bl);
110     ::encode(pos, bl);
111     ::encode(timestamp, bl);
112     ENCODE_FINISH(bl);
113   }
114
115   void decode(bufferlist::iterator& bl) {
116      DECODE_START(1, bl);
117     ::decode(state, bl);
118     ::decode(marker, bl);
119     ::decode(next_step_marker, bl);
120     ::decode(total_entries, bl);
121     ::decode(pos, bl);
122     ::decode(timestamp, bl);
123      DECODE_FINISH(bl);
124   }
125
126   void dump(Formatter *f) const {
127     encode_json("state", (int)state, f);
128     encode_json("marker", marker, f);
129     encode_json("next_step_marker", next_step_marker, f);
130     encode_json("total_entries", total_entries, f);
131     encode_json("pos", pos, f);
132     encode_json("timestamp", utime_t(timestamp), f);
133   }
134   void decode_json(JSONObj *obj) {
135     int s;
136     JSONDecoder::decode_json("state", s, obj);
137     state = s;
138     JSONDecoder::decode_json("marker", marker, obj);
139     JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
140     JSONDecoder::decode_json("total_entries", total_entries, obj);
141     JSONDecoder::decode_json("pos", pos, obj);
142     utime_t t;
143     JSONDecoder::decode_json("timestamp", t, obj);
144     timestamp = t.to_real_time();
145   }
146 };
147 WRITE_CLASS_ENCODER(rgw_data_sync_marker)
148
149 struct rgw_data_sync_status {
150   rgw_data_sync_info sync_info;
151   map<uint32_t, rgw_data_sync_marker> sync_markers;
152
153   rgw_data_sync_status() {}
154
155   void encode(bufferlist& bl) const {
156     ENCODE_START(1, 1, bl);
157     ::encode(sync_info, bl);
158     /* sync markers are encoded separately */
159     ENCODE_FINISH(bl);
160   }
161
162   void decode(bufferlist::iterator& bl) {
163      DECODE_START(1, bl);
164     ::decode(sync_info, bl);
165     /* sync markers are decoded separately */
166      DECODE_FINISH(bl);
167   }
168
169   void dump(Formatter *f) const {
170     encode_json("info", sync_info, f);
171     encode_json("markers", sync_markers, f);
172   }
173   void decode_json(JSONObj *obj) {
174     JSONDecoder::decode_json("info", sync_info, obj);
175     JSONDecoder::decode_json("markers", sync_markers, obj);
176   }
177 };
178 WRITE_CLASS_ENCODER(rgw_data_sync_status)
179
180 struct rgw_datalog_entry {
181   string key;
182   ceph::real_time timestamp;
183
184   void decode_json(JSONObj *obj);
185 };
186
187 struct rgw_datalog_shard_data {
188   string marker;
189   bool truncated;
190   vector<rgw_datalog_entry> entries;
191
192   void decode_json(JSONObj *obj);
193 };
194
195 class RGWAsyncRadosProcessor;
196 class RGWDataSyncControlCR;
197
198 struct rgw_bucket_entry_owner {
199   string id;
200   string display_name;
201
202   rgw_bucket_entry_owner() {}
203   rgw_bucket_entry_owner(const string& _id, const string& _display_name) : id(_id), display_name(_display_name) {}
204
205   void decode_json(JSONObj *obj);
206 };
207
208 class RGWSyncErrorLogger;
209
210 struct RGWDataSyncEnv {
211   CephContext *cct;
212   RGWRados *store;
213   RGWRESTConn *conn;
214   RGWAsyncRadosProcessor *async_rados;
215   RGWHTTPManager *http_manager;
216   RGWSyncErrorLogger *error_logger;
217   string source_zone;
218   RGWSyncModuleInstanceRef sync_module;
219
220   RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
221
222   void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
223             RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
224             RGWSyncErrorLogger *_error_logger, const string& _source_zone,
225             RGWSyncModuleInstanceRef& _sync_module) {
226     cct = _cct;
227     store = _store;
228     conn = _conn;
229     async_rados = _async_rados;
230     http_manager = _http_manager;
231     error_logger = _error_logger;
232     source_zone = _source_zone;
233     sync_module = _sync_module;
234   }
235
236   string shard_obj_name(int shard_id);
237   string status_oid();
238 };
239
240 class RGWRemoteDataLog : public RGWCoroutinesManager {
241   RGWRados *store;
242   RGWAsyncRadosProcessor *async_rados;
243   RGWHTTPManager http_manager;
244
245   RGWDataSyncEnv sync_env;
246
247   RWLock lock;
248   RGWDataSyncControlCR *data_sync_cr;
249
250   bool initialized;
251
252 public:
253   RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
254     : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
255       store(_store), async_rados(async_rados),
256       http_manager(store->ctx(), completion_mgr),
257       lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
258       initialized(false) {}
259   int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& module);
260   void finish();
261
262   int read_log_info(rgw_datalog_info *log_info);
263   int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
264   int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
265   int read_sync_status(rgw_data_sync_status *sync_status);
266   int init_sync_status(int num_shards);
267   int run_sync(int num_shards);
268
269   void wakeup(int shard_id, set<string>& keys);
270 };
271
272 class RGWDataSyncStatusManager {
273   RGWRados *store;
274   rgw_rados_ref ref;
275
276   string source_zone;
277   RGWRESTConn *conn;
278   RGWSyncErrorLogger *error_logger;
279   RGWSyncModuleInstanceRef sync_module;
280
281   RGWRemoteDataLog source_log;
282
283   string source_status_oid;
284   string source_shard_status_oid_prefix;
285
286   map<int, rgw_raw_obj> shard_objs;
287
288   int num_shards;
289
290 public:
291   RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
292                            const string& _source_zone)
293     : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
294       sync_module(nullptr),
295       source_log(store, async_rados), num_shards(0) {}
296   ~RGWDataSyncStatusManager() {
297     finalize();
298   }
299   int init();
300   void finalize();
301
302   static string shard_obj_name(const string& source_zone, int shard_id);
303   static string sync_status_oid(const string& source_zone);
304
305   int read_sync_status(rgw_data_sync_status *sync_status) {
306     return source_log.read_sync_status(sync_status);
307   }
308   int init_sync_status() { return source_log.init_sync_status(num_shards); }
309
310   int read_log_info(rgw_datalog_info *log_info) {
311     return source_log.read_log_info(log_info);
312   }
313   int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
314     return source_log.read_source_log_shards_info(shards_info);
315   }
316   int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
317     return source_log.read_source_log_shards_next(shard_markers, result);
318   }
319
320   int run() { return source_log.run_sync(num_shards); }
321
322   void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
323   void stop() {
324     source_log.finish();
325   }
326 };
327
328 class RGWBucketSyncStatusManager;
329 class RGWBucketSyncCR;
330
331 struct rgw_bucket_shard_full_sync_marker {
332   rgw_obj_key position;
333   uint64_t count;
334
335   rgw_bucket_shard_full_sync_marker() : count(0) {}
336
337   void encode_attr(map<string, bufferlist>& attrs);
338
339   void encode(bufferlist& bl) const {
340     ENCODE_START(1, 1, bl);
341     ::encode(position, bl);
342     ::encode(count, bl);
343     ENCODE_FINISH(bl);
344   }
345
346   void decode(bufferlist::iterator& bl) {
347      DECODE_START(1, bl);
348     ::decode(position, bl);
349     ::decode(count, bl);
350      DECODE_FINISH(bl);
351   }
352
353   void dump(Formatter *f) const {
354     encode_json("position", position, f);
355     encode_json("count", count, f);
356   }
357 };
358 WRITE_CLASS_ENCODER(rgw_bucket_shard_full_sync_marker)
359
360 struct rgw_bucket_shard_inc_sync_marker {
361   string position;
362
363   rgw_bucket_shard_inc_sync_marker() {}
364
365   void encode_attr(map<string, bufferlist>& attrs);
366
367   void encode(bufferlist& bl) const {
368     ENCODE_START(1, 1, bl);
369     ::encode(position, bl);
370     ENCODE_FINISH(bl);
371   }
372
373   void decode(bufferlist::iterator& bl) {
374      DECODE_START(1, bl);
375     ::decode(position, bl);
376      DECODE_FINISH(bl);
377   }
378
379   void dump(Formatter *f) const {
380     encode_json("position", position, f);
381   }
382
383   bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
384     return (position < m.position);
385   }
386 };
387 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
388
389 struct rgw_bucket_shard_sync_info {
390   enum SyncState {
391     StateInit = 0,
392     StateFullSync = 1,
393     StateIncrementalSync = 2,
394   };
395
396   uint16_t state;
397   rgw_bucket_shard_full_sync_marker full_marker;
398   rgw_bucket_shard_inc_sync_marker inc_marker;
399
400   void decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs);
401   void encode_all_attrs(map<string, bufferlist>& attrs);
402   void encode_state_attr(map<string, bufferlist>& attrs);
403
404   void encode(bufferlist& bl) const {
405     ENCODE_START(1, 1, bl);
406     ::encode(state, bl);
407     ::encode(full_marker, bl);
408     ::encode(inc_marker, bl);
409     ENCODE_FINISH(bl);
410   }
411
412   void decode(bufferlist::iterator& bl) {
413      DECODE_START(1, bl);
414      ::decode(state, bl);
415      ::decode(full_marker, bl);
416      ::decode(inc_marker, bl);
417      DECODE_FINISH(bl);
418   }
419
420   void dump(Formatter *f) const {
421     string s;
422     switch ((SyncState)state) {
423       case StateInit:
424         s = "init";
425         break;
426       case StateFullSync:
427         s = "full-sync";
428         break;
429       case StateIncrementalSync:
430         s = "incremental-sync";
431         break;
432       default:
433         s = "unknown";
434         break;
435     }
436     encode_json("status", s, f);
437     encode_json("full_marker", full_marker, f);
438     encode_json("inc_marker", inc_marker, f);
439   }
440
441   rgw_bucket_shard_sync_info() : state((int)StateInit) {}
442
443 };
444 WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
445
446
447 class RGWRemoteBucketLog : public RGWCoroutinesManager {
448   RGWRados *store;
449   RGWRESTConn *conn{nullptr};
450   string source_zone;
451   rgw_bucket_shard bs;
452
453   RGWBucketSyncStatusManager *status_manager;
454   RGWAsyncRadosProcessor *async_rados;
455   RGWHTTPManager *http_manager;
456
457   RGWDataSyncEnv sync_env;
458   rgw_bucket_shard_sync_info init_status;
459
460   RGWBucketSyncCR *sync_cr{nullptr};
461
462 public:
463   RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm,
464                      RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
465                                        status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager) {}
466
467   int init(const string& _source_zone, RGWRESTConn *_conn,
468            const rgw_bucket& bucket, int shard_id,
469            RGWSyncErrorLogger *_error_logger,
470            RGWSyncModuleInstanceRef& _sync_module);
471   void finish();
472
473   RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status);
474   RGWCoroutine *init_sync_status_cr();
475   RGWCoroutine *run_sync_cr();
476
477   void wakeup();
478 };
479
480 class RGWBucketSyncStatusManager {
481   RGWRados *store;
482
483   RGWCoroutinesManager cr_mgr;
484
485   RGWHTTPManager http_manager;
486
487   string source_zone;
488   RGWRESTConn *conn;
489   RGWSyncErrorLogger *error_logger;
490   RGWSyncModuleInstanceRef sync_module;
491
492   rgw_bucket bucket;
493
494   map<int, RGWRemoteBucketLog *> source_logs;
495
496   string source_status_oid;
497   string source_shard_status_oid_prefix;
498
499   map<int, rgw_bucket_shard_sync_info> sync_status;
500   rgw_raw_obj status_obj;
501
502   int num_shards;
503
504 public:
505   RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone,
506                              const rgw_bucket& bucket) : store(_store),
507                                                                                      cr_mgr(_store->ctx(), _store->get_cr_registry()),
508                                                                                      http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
509                                                                                      source_zone(_source_zone),
510                                                                                      conn(NULL), error_logger(NULL),
511                                                                                      bucket(bucket),
512                                                                                      num_shards(0) {}
513   ~RGWBucketSyncStatusManager();
514
515   int init();
516
517   map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
518   int init_sync_status();
519
520   static string status_oid(const string& source_zone, const rgw_bucket_shard& bs);
521
522   int read_sync_status();
523   int run();
524 };
525
526 class RGWDefaultSyncModule : public RGWSyncModule {
527 public:
528   RGWDefaultSyncModule() {}
529   bool supports_data_export() override { return true; }
530   int create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) override;
531 };
532
533 // DataLogTrimCR factory function
534 extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
535                                              RGWHTTPManager *http,
536                                              int num_shards, utime_t interval);
537
538 #endif