Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_rest_log.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation. See file COPYING.
12  *
13  */
14 #include "common/ceph_json.h"
15 #include "common/strtol.h"
16 #include "rgw_rest.h"
17 #include "rgw_op.h"
18 #include "rgw_rest_s3.h"
19 #include "rgw_rest_log.h"
20 #include "rgw_client_io.h"
21 #include "rgw_sync.h"
22 #include "rgw_data_sync.h"
23 #include "rgw_common.h"
24 #include "common/errno.h"
25 #include "include/assert.h"
26
27 #define dout_context g_ceph_context
28 #define LOG_CLASS_LIST_MAX_ENTRIES (1000)
29 #define dout_subsys ceph_subsys_rgw
30
31 static int parse_date_str(string& in, real_time& out) {
32   uint64_t epoch = 0;
33   uint64_t nsec = 0;
34
35   if (!in.empty()) {
36     if (utime_t::parse_date(in, &epoch, &nsec) < 0) {
37       dout(5) << "Error parsing date " << in << dendl;
38       return -EINVAL;
39     }
40   }
41   out = utime_t(epoch, nsec).to_real_time();
42   return 0;
43 }
44
45 void RGWOp_MDLog_List::execute() {
46   string   period = s->info.args.get("period");
47   string   shard = s->info.args.get("id");
48   string   max_entries_str = s->info.args.get("max-entries");
49   string   st = s->info.args.get("start-time"),
50            et = s->info.args.get("end-time"),
51            marker = s->info.args.get("marker"),
52            err;
53   real_time  ut_st, 
54              ut_et;
55   void    *handle;
56   unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
57
58   shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
59   if (!err.empty()) {
60     dout(5) << "Error parsing shard_id " << shard << dendl;
61     http_ret = -EINVAL;
62     return;
63   }
64
65   if (parse_date_str(st, ut_st) < 0) {
66     http_ret = -EINVAL;
67     return;
68   }
69
70   if (parse_date_str(et, ut_et) < 0) {
71     http_ret = -EINVAL;
72     return;
73   }
74
75   if (!max_entries_str.empty()) {
76     max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
77     if (!err.empty()) {
78       dout(5) << "Error parsing max-entries " << max_entries_str << dendl;
79       http_ret = -EINVAL;
80       return;
81     }
82     if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
83       max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
84     }
85   } 
86
87   if (period.empty()) {
88     ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
89     period = store->get_current_period_id();
90     if (period.empty()) {
91       ldout(s->cct, 5) << "Missing period id" << dendl;
92       http_ret = -EINVAL;
93       return;
94     }
95   }
96
97   RGWMetadataLog meta_log{s->cct, store, period};
98
99   meta_log.init_list_entries(shard_id, ut_st, ut_et, marker, &handle);
100
101   http_ret = meta_log.list_entries(handle, max_entries, entries,
102                                    &last_marker, &truncated);
103
104   meta_log.complete_list_entries(handle);
105 }
106
107 void RGWOp_MDLog_List::send_response() {
108   set_req_state_err(s, http_ret);
109   dump_errno(s);
110   end_header(s);
111
112   if (http_ret < 0)
113     return;
114
115   s->formatter->open_object_section("log_entries");
116   s->formatter->dump_string("marker", last_marker);
117   s->formatter->dump_bool("truncated", truncated);
118   {
119     s->formatter->open_array_section("entries");
120     for (list<cls_log_entry>::iterator iter = entries.begin();
121          iter != entries.end(); ++iter) {
122       cls_log_entry& entry = *iter;
123       store->meta_mgr->dump_log_entry(entry, s->formatter);
124       flusher.flush();
125     }
126     s->formatter->close_section();
127   }
128   s->formatter->close_section();
129   flusher.flush();
130 }
131
132 void RGWOp_MDLog_Info::execute() {
133   num_objects = s->cct->_conf->rgw_md_log_max_shards;
134   period = store->meta_mgr->read_oldest_log_period();
135   http_ret = period.get_error();
136 }
137
138 void RGWOp_MDLog_Info::send_response() {
139   set_req_state_err(s, http_ret);
140   dump_errno(s);
141   end_header(s);
142
143   s->formatter->open_object_section("mdlog");
144   s->formatter->dump_unsigned("num_objects", num_objects);
145   if (period) {
146     s->formatter->dump_string("period", period.get_period().get_id());
147     s->formatter->dump_unsigned("realm_epoch", period.get_epoch());
148   }
149   s->formatter->close_section();
150   flusher.flush();
151 }
152
153 void RGWOp_MDLog_ShardInfo::execute() {
154   string period = s->info.args.get("period");
155   string shard = s->info.args.get("id");
156   string err;
157
158   unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
159   if (!err.empty()) {
160     dout(5) << "Error parsing shard_id " << shard << dendl;
161     http_ret = -EINVAL;
162     return;
163   }
164
165   if (period.empty()) {
166     ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
167     period = store->get_current_period_id();
168
169     if (period.empty()) {
170       ldout(s->cct, 5) << "Missing period id" << dendl;
171       http_ret = -EINVAL;
172       return;
173     }
174   }
175   RGWMetadataLog meta_log{s->cct, store, period};
176
177   http_ret = meta_log.get_info(shard_id, &info);
178 }
179
180 void RGWOp_MDLog_ShardInfo::send_response() {
181   set_req_state_err(s, http_ret);
182   dump_errno(s);
183   end_header(s);
184
185   encode_json("info", info, s->formatter);
186   flusher.flush();
187 }
188
189 void RGWOp_MDLog_Delete::execute() {
190   string   st = s->info.args.get("start-time"),
191            et = s->info.args.get("end-time"),
192            start_marker = s->info.args.get("start-marker"),
193            end_marker = s->info.args.get("end-marker"),
194            period = s->info.args.get("period"),
195            shard = s->info.args.get("id"),
196            err;
197   real_time  ut_st, 
198              ut_et;
199   unsigned shard_id;
200
201   http_ret = 0;
202
203   shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
204   if (!err.empty()) {
205     dout(5) << "Error parsing shard_id " << shard << dendl;
206     http_ret = -EINVAL;
207     return;
208   }
209   if (et.empty() && end_marker.empty()) { /* bounding end */
210     http_ret = -EINVAL;
211     return;
212   }
213
214   if (parse_date_str(st, ut_st) < 0) {
215     http_ret = -EINVAL;
216     return;
217   }
218
219   if (parse_date_str(et, ut_et) < 0) {
220     http_ret = -EINVAL;
221     return;
222   }
223
224   if (period.empty()) {
225     ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
226     period = store->get_current_period_id();
227
228     if (period.empty()) {
229       ldout(s->cct, 5) << "Missing period id" << dendl;
230       http_ret = -EINVAL;
231       return;
232     }
233   }
234   RGWMetadataLog meta_log{s->cct, store, period};
235
236   http_ret = meta_log.trim(shard_id, ut_st, ut_et, start_marker, end_marker);
237 }
238
239 void RGWOp_MDLog_Lock::execute() {
240   string period, shard_id_str, duration_str, locker_id, zone_id;
241   unsigned shard_id;
242
243   http_ret = 0;
244
245   period       = s->info.args.get("period");
246   shard_id_str = s->info.args.get("id");
247   duration_str = s->info.args.get("length");
248   locker_id    = s->info.args.get("locker-id");
249   zone_id      = s->info.args.get("zone-id");
250
251   if (period.empty()) {
252     ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
253     period = store->get_current_period_id();
254   }
255
256   if (period.empty() ||
257       shard_id_str.empty() ||
258       (duration_str.empty()) ||
259       locker_id.empty() ||
260       zone_id.empty()) {
261     dout(5) << "Error invalid parameter list" << dendl;
262     http_ret = -EINVAL;
263     return;
264   }
265
266   string err;
267   shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
268   if (!err.empty()) {
269     dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
270     http_ret = -EINVAL;
271     return;
272   }
273
274   RGWMetadataLog meta_log{s->cct, store, period};
275   unsigned dur;
276   dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
277   if (!err.empty() || dur <= 0) {
278     dout(5) << "invalid length param " << duration_str << dendl;
279     http_ret = -EINVAL;
280     return;
281   }
282   http_ret = meta_log.lock_exclusive(shard_id, make_timespan(dur), zone_id,
283                                      locker_id);
284   if (http_ret == -EBUSY)
285     http_ret = -ERR_LOCKED;
286 }
287
288 void RGWOp_MDLog_Unlock::execute() {
289   string period, shard_id_str, locker_id, zone_id;
290   unsigned shard_id;
291
292   http_ret = 0;
293
294   period       = s->info.args.get("period");
295   shard_id_str = s->info.args.get("id");
296   locker_id    = s->info.args.get("locker-id");
297   zone_id      = s->info.args.get("zone-id");
298
299   if (period.empty()) {
300     ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
301     period = store->get_current_period_id();
302   }
303
304   if (period.empty() ||
305       shard_id_str.empty() ||
306       locker_id.empty() ||
307       zone_id.empty()) {
308     dout(5) << "Error invalid parameter list" << dendl;
309     http_ret = -EINVAL;
310     return;
311   }
312
313   string err;
314   shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
315   if (!err.empty()) {
316     dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
317     http_ret = -EINVAL;
318     return;
319   }
320
321   RGWMetadataLog meta_log{s->cct, store, period};
322   http_ret = meta_log.unlock(shard_id, zone_id, locker_id);
323 }
324
325 void RGWOp_MDLog_Notify::execute() {
326   char *data;
327   int len = 0;
328 #define LARGE_ENOUGH_BUF (128 * 1024)
329   int r = rgw_rest_read_all_input(s, &data, &len, LARGE_ENOUGH_BUF);
330   if (r < 0) {
331     http_ret = r;
332     return;
333   }
334
335   ldout(s->cct, 20) << __func__ << "(): read data: " << string(data, len) << dendl;
336
337   JSONParser p;
338   r = p.parse(data, len);
339   free(data);
340   if (r < 0) {
341     ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
342     http_ret = r;
343     return;
344   }
345
346   set<int> updated_shards;
347   try {
348     decode_json_obj(updated_shards, &p);
349   } catch (JSONDecoder::err& err) {
350     ldout(s->cct, 0) << "ERROR: failed to decode JSON" << dendl;
351     http_ret = -EINVAL;
352     return;
353   }
354
355   if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
356     for (set<int>::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
357       ldout(s->cct, 20) << __func__ << "(): updated shard=" << *iter << dendl;
358     }
359   }
360
361   store->wakeup_meta_sync_shards(updated_shards);
362
363   http_ret = 0;
364 }
365
366 void RGWOp_BILog_List::execute() {
367   string tenant_name = s->info.args.get("tenant"),
368          bucket_name = s->info.args.get("bucket"),
369          marker = s->info.args.get("marker"),
370          max_entries_str = s->info.args.get("max-entries"),
371          bucket_instance = s->info.args.get("bucket-instance");
372   RGWBucketInfo bucket_info;
373   unsigned max_entries;
374
375   RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
376
377   if (bucket_name.empty() && bucket_instance.empty()) {
378     dout(5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
379     http_ret = -EINVAL;
380     return;
381   }
382
383   int shard_id;
384   http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
385   if (http_ret < 0) {
386     return;
387   }
388
389   if (!bucket_instance.empty()) {
390     http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
391     if (http_ret < 0) {
392       dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
393       return;
394     }
395   } else { /* !bucket_name.empty() */
396     http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
397     if (http_ret < 0) {
398       dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
399       return;
400     }
401   }
402
403   bool truncated;
404   unsigned count = 0;
405   string err;
406
407   max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
408   if (!err.empty())
409     max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
410
411   send_response();
412   do {
413     list<rgw_bi_log_entry> entries;
414     int ret = store->list_bi_log_entries(bucket_info, shard_id,
415                                           marker, max_entries - count, 
416                                           entries, &truncated);
417     if (ret < 0) {
418       dout(5) << "ERROR: list_bi_log_entries()" << dendl;
419       return;
420     }
421
422     count += entries.size();
423
424     send_response(entries, marker);
425   } while (truncated && count < max_entries);
426
427   send_response_end();
428 }
429
430 void RGWOp_BILog_List::send_response() {
431   if (sent_header)
432     return;
433
434   set_req_state_err(s, http_ret);
435   dump_errno(s);
436   end_header(s);
437
438   sent_header = true;
439
440   if (http_ret < 0)
441     return;
442
443   s->formatter->open_array_section("entries");
444 }
445
446 void RGWOp_BILog_List::send_response(list<rgw_bi_log_entry>& entries, string& marker)
447 {
448   for (list<rgw_bi_log_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
449     rgw_bi_log_entry& entry = *iter;
450     encode_json("entry", entry, s->formatter);
451
452     marker = entry.id;
453     flusher.flush();
454   }
455 }
456
457 void RGWOp_BILog_List::send_response_end() {
458   s->formatter->close_section();
459   flusher.flush();
460 }
461       
462 void RGWOp_BILog_Info::execute() {
463   string tenant_name = s->info.args.get("tenant"),
464          bucket_name = s->info.args.get("bucket"),
465          bucket_instance = s->info.args.get("bucket-instance");
466   RGWBucketInfo bucket_info;
467
468   RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
469
470   if (bucket_name.empty() && bucket_instance.empty()) {
471     dout(5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
472     http_ret = -EINVAL;
473     return;
474   }
475
476   int shard_id;
477   http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
478   if (http_ret < 0) {
479     return;
480   }
481
482   if (!bucket_instance.empty()) {
483     http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
484     if (http_ret < 0) {
485       dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
486       return;
487     }
488   } else { /* !bucket_name.empty() */
489     http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
490     if (http_ret < 0) {
491       dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
492       return;
493     }
494   }
495   map<RGWObjCategory, RGWStorageStats> stats;
496   int ret =  store->get_bucket_stats(bucket_info, shard_id, &bucket_ver, &master_ver, stats, &max_marker, &syncstopped);
497   if (ret < 0 && ret != -ENOENT) {
498     http_ret = ret;
499     return;
500   }
501 }
502
503 void RGWOp_BILog_Info::send_response() {
504   set_req_state_err(s, http_ret);
505   dump_errno(s);
506   end_header(s);
507
508   if (http_ret < 0)
509     return;
510
511   s->formatter->open_object_section("info");
512   encode_json("bucket_ver", bucket_ver, s->formatter);
513   encode_json("master_ver", master_ver, s->formatter);
514   encode_json("max_marker", max_marker, s->formatter);
515   encode_json("syncstopped", syncstopped, s->formatter);
516   s->formatter->close_section();
517
518   flusher.flush();
519 }
520
521 void RGWOp_BILog_Delete::execute() {
522   string tenant_name = s->info.args.get("tenant"),
523          bucket_name = s->info.args.get("bucket"),
524          start_marker = s->info.args.get("start-marker"),
525          end_marker = s->info.args.get("end-marker"),
526          bucket_instance = s->info.args.get("bucket-instance");
527
528   RGWBucketInfo bucket_info;
529
530   RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
531
532   http_ret = 0;
533   if ((bucket_name.empty() && bucket_instance.empty()) ||
534       end_marker.empty()) {
535     dout(5) << "ERROR: one of bucket and bucket instance, and also end-marker is mandatory" << dendl;
536     http_ret = -EINVAL;
537     return;
538   }
539
540   int shard_id;
541   http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
542   if (http_ret < 0) {
543     return;
544   }
545
546   if (!bucket_instance.empty()) {
547     http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
548     if (http_ret < 0) {
549       dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
550       return;
551     }
552   } else { /* !bucket_name.empty() */
553     http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
554     if (http_ret < 0) {
555       dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
556       return;
557     }
558   }
559   http_ret = store->trim_bi_log_entries(bucket_info, shard_id, start_marker, end_marker);
560   if (http_ret < 0) {
561     dout(5) << "ERROR: trim_bi_log_entries() " << dendl;
562   }
563   return;
564 }
565
566 void RGWOp_DATALog_List::execute() {
567   string   shard = s->info.args.get("id");
568
569   string   st = s->info.args.get("start-time"),
570            et = s->info.args.get("end-time"),
571            max_entries_str = s->info.args.get("max-entries"),
572            marker = s->info.args.get("marker"),
573            err;
574   real_time  ut_st, 
575              ut_et;
576   unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
577
578   s->info.args.get_bool("extra-info", &extra_info, false);
579
580   shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
581   if (!err.empty()) {
582     dout(5) << "Error parsing shard_id " << shard << dendl;
583     http_ret = -EINVAL;
584     return;
585   }
586
587   if (parse_date_str(st, ut_st) < 0) {
588     http_ret = -EINVAL;
589     return;
590   }
591
592   if (parse_date_str(et, ut_et) < 0) {
593     http_ret = -EINVAL;
594     return;
595   }
596
597   if (!max_entries_str.empty()) {
598     max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
599     if (!err.empty()) {
600       dout(5) << "Error parsing max-entries " << max_entries_str << dendl;
601       http_ret = -EINVAL;
602       return;
603     }
604     if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
605       max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
606     }
607   }
608
609   // Note that last_marker is updated to be the marker of the last
610   // entry listed
611   http_ret = store->data_log->list_entries(shard_id, ut_st, ut_et,
612                                            max_entries, entries, marker,
613                                            &last_marker, &truncated);
614 }
615
616 void RGWOp_DATALog_List::send_response() {
617   set_req_state_err(s, http_ret);
618   dump_errno(s);
619   end_header(s);
620
621   if (http_ret < 0)
622     return;
623
624   s->formatter->open_object_section("log_entries");
625   s->formatter->dump_string("marker", last_marker);
626   s->formatter->dump_bool("truncated", truncated);
627   {
628     s->formatter->open_array_section("entries");
629     for (list<rgw_data_change_log_entry>::iterator iter = entries.begin();
630          iter != entries.end(); ++iter) {
631       rgw_data_change_log_entry& entry = *iter;
632       if (!extra_info) {
633         encode_json("entry", entry.entry, s->formatter);
634       } else {
635         encode_json("entry", entry, s->formatter);
636       }
637       flusher.flush();
638     }
639     s->formatter->close_section();
640   }
641   s->formatter->close_section();
642   flusher.flush();
643 }
644
645
646 void RGWOp_DATALog_Info::execute() {
647   num_objects = s->cct->_conf->rgw_data_log_num_shards;
648   http_ret = 0;
649 }
650
651 void RGWOp_DATALog_Info::send_response() {
652   set_req_state_err(s, http_ret);
653   dump_errno(s);
654   end_header(s);
655
656   s->formatter->open_object_section("num_objects");
657   s->formatter->dump_unsigned("num_objects", num_objects);
658   s->formatter->close_section();
659   flusher.flush();
660 }
661
662 void RGWOp_DATALog_ShardInfo::execute() {
663   string shard = s->info.args.get("id");
664   string err;
665
666   unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
667   if (!err.empty()) {
668     dout(5) << "Error parsing shard_id " << shard << dendl;
669     http_ret = -EINVAL;
670     return;
671   }
672
673   http_ret = store->data_log->get_info(shard_id, &info);
674 }
675
676 void RGWOp_DATALog_ShardInfo::send_response() {
677   set_req_state_err(s, http_ret);
678   dump_errno(s);
679   end_header(s);
680
681   encode_json("info", info, s->formatter);
682   flusher.flush();
683 }
684
685 void RGWOp_DATALog_Lock::execute() {
686   string shard_id_str, duration_str, locker_id, zone_id;
687   unsigned shard_id;
688
689   http_ret = 0;
690
691   shard_id_str = s->info.args.get("id");
692   duration_str = s->info.args.get("length");
693   locker_id    = s->info.args.get("locker-id");
694   zone_id      = s->info.args.get("zone-id");
695
696   if (shard_id_str.empty() ||
697       (duration_str.empty()) ||
698       locker_id.empty() ||
699       zone_id.empty()) {
700     dout(5) << "Error invalid parameter list" << dendl;
701     http_ret = -EINVAL;
702     return;
703   }
704
705   string err;
706   shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
707   if (!err.empty()) {
708     dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
709     http_ret = -EINVAL;
710     return;
711   }
712
713   unsigned dur;
714   dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
715   if (!err.empty() || dur <= 0) {
716     dout(5) << "invalid length param " << duration_str << dendl;
717     http_ret = -EINVAL;
718     return;
719   }
720   http_ret = store->data_log->lock_exclusive(shard_id, make_timespan(dur), zone_id, locker_id);
721   if (http_ret == -EBUSY)
722     http_ret = -ERR_LOCKED;
723 }
724
725 void RGWOp_DATALog_Unlock::execute() {
726   string shard_id_str, locker_id, zone_id;
727   unsigned shard_id;
728
729   http_ret = 0;
730
731   shard_id_str = s->info.args.get("id");
732   locker_id    = s->info.args.get("locker-id");
733   zone_id      = s->info.args.get("zone-id");
734
735   if (shard_id_str.empty() ||
736       locker_id.empty() ||
737       zone_id.empty()) {
738     dout(5) << "Error invalid parameter list" << dendl;
739     http_ret = -EINVAL;
740     return;
741   }
742
743   string err;
744   shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
745   if (!err.empty()) {
746     dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
747     http_ret = -EINVAL;
748     return;
749   }
750
751   http_ret = store->data_log->unlock(shard_id, zone_id, locker_id);
752 }
753
754 void RGWOp_DATALog_Notify::execute() {
755   string  source_zone = s->info.args.get("source-zone");
756   char *data;
757   int len = 0;
758 #define LARGE_ENOUGH_BUF (128 * 1024)
759   int r = rgw_rest_read_all_input(s, &data, &len, LARGE_ENOUGH_BUF);
760   if (r < 0) {
761     http_ret = r;
762     return;
763   }
764
765   ldout(s->cct, 20) << __func__ << "(): read data: " << string(data, len) << dendl;
766
767   JSONParser p;
768   r = p.parse(data, len);
769   free(data);
770   if (r < 0) {
771     ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
772     http_ret = r;
773     return;
774   }
775
776   map<int, set<string> > updated_shards;
777   try {
778     decode_json_obj(updated_shards, &p);
779   } catch (JSONDecoder::err& err) {
780     ldout(s->cct, 0) << "ERROR: failed to decode JSON" << dendl;
781     http_ret = -EINVAL;
782     return;
783   }
784
785   if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
786     for (map<int, set<string> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
787       ldout(s->cct, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
788       set<string>& keys = iter->second;
789       for (set<string>::iterator kiter = keys.begin(); kiter != keys.end(); ++kiter) {
790       ldout(s->cct, 20) << __func__ << "(): modified key=" << *kiter << dendl;
791       }
792     }
793   }
794
795   store->wakeup_data_sync_shards(source_zone, updated_shards);
796
797   http_ret = 0;
798 }
799
800 void RGWOp_DATALog_Delete::execute() {
801   string   st = s->info.args.get("start-time"),
802            et = s->info.args.get("end-time"),
803            start_marker = s->info.args.get("start-marker"),
804            end_marker = s->info.args.get("end-marker"),
805            shard = s->info.args.get("id"),
806            err;
807   real_time  ut_st, 
808              ut_et;
809   unsigned shard_id;
810
811   http_ret = 0;
812
813   shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
814   if (!err.empty()) {
815     dout(5) << "Error parsing shard_id " << shard << dendl;
816     http_ret = -EINVAL;
817     return;
818   }
819   if (et.empty() && end_marker.empty()) { /* bounding end */
820     http_ret = -EINVAL;
821     return;
822   }
823
824   if (parse_date_str(st, ut_st) < 0) {
825     http_ret = -EINVAL;
826     return;
827   }
828
829   if (parse_date_str(et, ut_et) < 0) {
830     http_ret = -EINVAL;
831     return;
832   }
833
834   http_ret = store->data_log->trim_entries(shard_id, ut_st, ut_et, start_marker, end_marker);
835 }
836
837 // not in header to avoid pulling in rgw_sync.h
838 class RGWOp_MDLog_Status : public RGWRESTOp {
839   rgw_meta_sync_status status;
840 public:
841   int check_caps(RGWUserCaps& caps) override {
842     return caps.check_cap("mdlog", RGW_CAP_READ);
843   }
844   int verify_permission() override {
845     return check_caps(s->user->caps);
846   }
847   void execute() override;
848   void send_response() override;
849   const string name() override { return "get_metadata_log_status"; }
850 };
851
852 void RGWOp_MDLog_Status::execute()
853 {
854   auto sync = store->get_meta_sync_manager();
855   if (sync == nullptr) {
856     ldout(s->cct, 1) << "no sync manager" << dendl;
857     http_ret = -ENOENT;
858     return;
859   }
860   http_ret = sync->read_sync_status(&status);
861 }
862
863 void RGWOp_MDLog_Status::send_response()
864 {
865   set_req_state_err(s, http_ret);
866   dump_errno(s);
867   end_header(s);
868
869   if (http_ret >= 0) {
870     encode_json("status", status, s->formatter);
871   }
872   flusher.flush();
873 }
874
875 // not in header to avoid pulling in rgw_data_sync.h
876 class RGWOp_DATALog_Status : public RGWRESTOp {
877   rgw_data_sync_status status;
878 public:
879   int check_caps(RGWUserCaps& caps) override {
880     return caps.check_cap("datalog", RGW_CAP_READ);
881   }
882   int verify_permission() override {
883     return check_caps(s->user->caps);
884   }
885   void execute() override ;
886   void send_response() override;
887   const string name() override { return "get_data_changes_log_status"; }
888 };
889
890 void RGWOp_DATALog_Status::execute()
891 {
892   const auto source_zone = s->info.args.get("source-zone");
893   auto sync = store->get_data_sync_manager(source_zone);
894   if (sync == nullptr) {
895     ldout(s->cct, 1) << "no sync manager for source-zone " << source_zone << dendl;
896     http_ret = -ENOENT;
897     return;
898   }
899   http_ret = sync->read_sync_status(&status);
900 }
901
902 void RGWOp_DATALog_Status::send_response()
903 {
904   set_req_state_err(s, http_ret);
905   dump_errno(s);
906   end_header(s);
907
908   if (http_ret >= 0) {
909     encode_json("status", status, s->formatter);
910   }
911   flusher.flush();
912 }
913
914
915 RGWOp *RGWHandler_Log::op_get() {
916   bool exists;
917   string type = s->info.args.get("type", &exists);
918
919   if (!exists) {
920     return NULL;
921   }
922
923   if (type.compare("metadata") == 0) {
924     if (s->info.args.exists("id")) {
925       if (s->info.args.exists("info")) {
926         return new RGWOp_MDLog_ShardInfo;
927       } else {
928         return new RGWOp_MDLog_List;
929       }
930     } else if (s->info.args.exists("status")) {
931       return new RGWOp_MDLog_Status;
932     } else {
933       return new RGWOp_MDLog_Info;
934     }
935   } else if (type.compare("bucket-index") == 0) {
936     if (s->info.args.exists("info")) {
937       return new RGWOp_BILog_Info;
938     } else {
939       return new RGWOp_BILog_List;
940     }
941   } else if (type.compare("data") == 0) {
942     if (s->info.args.exists("id")) {
943       if (s->info.args.exists("info")) {
944         return new RGWOp_DATALog_ShardInfo;
945       } else {
946         return new RGWOp_DATALog_List;
947       }
948     } else if (s->info.args.exists("status")) {
949       return new RGWOp_DATALog_Status;
950     } else {
951       return new RGWOp_DATALog_Info;
952     }
953   }
954   return NULL;
955 }
956
957 RGWOp *RGWHandler_Log::op_delete() {
958   bool exists;
959   string type = s->info.args.get("type", &exists);
960
961   if (!exists) {
962     return NULL;
963   }
964
965   if (type.compare("metadata") == 0)
966     return new RGWOp_MDLog_Delete;
967   else if (type.compare("bucket-index") == 0) 
968     return new RGWOp_BILog_Delete;
969   else if (type.compare("data") == 0)
970     return new RGWOp_DATALog_Delete;
971   return NULL;
972 }
973
974 RGWOp *RGWHandler_Log::op_post() {
975   bool exists;
976   string type = s->info.args.get("type", &exists);
977
978   if (!exists) {
979     return NULL;
980   }
981
982   if (type.compare("metadata") == 0) {
983     if (s->info.args.exists("lock"))
984       return new RGWOp_MDLog_Lock;
985     else if (s->info.args.exists("unlock"))
986       return new RGWOp_MDLog_Unlock;
987     else if (s->info.args.exists("notify"))
988       return new RGWOp_MDLog_Notify;        
989   } else if (type.compare("data") == 0) {
990     if (s->info.args.exists("lock"))
991       return new RGWOp_DATALog_Lock;
992     else if (s->info.args.exists("unlock"))
993       return new RGWOp_DATALog_Unlock;
994     else if (s->info.args.exists("notify"))
995       return new RGWOp_DATALog_Notify;      
996   }
997   return NULL;
998 }
999