1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "common/ceph_json.h"
15 #include "common/strtol.h"
18 #include "rgw_rest_s3.h"
19 #include "rgw_rest_log.h"
20 #include "rgw_client_io.h"
22 #include "rgw_data_sync.h"
23 #include "rgw_common.h"
24 #include "common/errno.h"
25 #include "include/assert.h"
// Logging context macro, set to the global g_ceph_context.
// NOTE(review): presumably consumed by the bare dout() calls in this file —
// confirm against the debug macro definitions.
27 #define dout_context g_ceph_context
// Default value of max_entries in the MDLog and DATALog list operations, which
// also clamp a caller-supplied "max-entries" to it; the BILog list operation
// assigns it to max_entries as well.
28 #define LOG_CLASS_LIST_MAX_ENTRIES (1000)
// Logging subsystem macro, set to ceph_subsys_rgw.
29 #define dout_subsys ceph_subsys_rgw
// Parse the date string `in` with utime_t::parse_date() and store the result
// in `out` as a real_time. A parse failure is logged at debug level 5.
// Callers in this file treat a negative return value as failure.
// NOTE(review): the declarations of epoch/nsec and all return statements are
// not visible in this view — confirm the exact return codes and whether an
// empty `in` is handled before parsing.
31 static int parse_date_str(string& in, real_time& out) {
36 if (utime_t::parse_date(in, &epoch, &nsec) < 0) {
37 dout(5) << "Error parsing date " << in << dendl;
// Combine the parsed epoch/nsec pair into the real_time output value.
41 out = utime_t(epoch, nsec).to_real_time();
// List entries of one metadata-log shard.
// Request args read: "period", "id" (shard), "max-entries", "start-time",
// "end-time", "marker". The listing result is written to entries, last_marker
// and truncated; the status of list_entries() is stored in http_ret.
// NOTE(review): several lines (error returns, some declarations and the
// conditions guarding the period fallback) are not visible in this view.
45 void RGWOp_MDLog_List::execute() {
46 string period = s->info.args.get("period");
47 string shard = s->info.args.get("id");
48 string max_entries_str = s->info.args.get("max-entries");
49 string st = s->info.args.get("start-time"),
50 et = s->info.args.get("end-time"),
51 marker = s->info.args.get("marker"),
// max_entries defaults to the class-wide limit unless the caller overrides it.
56 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
// NOTE(review): the result is cast to unsigned, so a negative id accepted by
// strict_strtol would wrap to a large value — confirm it is rejected somewhere.
58 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
60 dout(5) << "Error parsing shard_id " << shard << dendl;
// Both time bounds go through parse_date_str(); a negative result is an error.
65 if (parse_date_str(st, ut_st) < 0) {
70 if (parse_date_str(et, ut_et) < 0) {
75 if (!max_entries_str.empty()) {
76 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
78 dout(5) << "Error parsing max-entries " << max_entries_str << dendl;
// Never return more than LOG_CLASS_LIST_MAX_ENTRIES per request.
82 if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
83 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
// Period fallback: use the store's current period id.
// NOTE(review): the guarding conditions are not visible here; the sibling
// MDLog operations guard the same statements with period.empty().
88 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
89 period = store->get_current_period_id();
91 ldout(s->cct, 5) << "Missing period id" << dendl;
97 RGWMetadataLog meta_log{s->cct, store, period};
// Listing is a three-step protocol on the log object: init -> list -> complete,
// all sharing the same `handle`.
99 meta_log.init_list_entries(shard_id, ut_st, ut_et, marker, &handle);
101 http_ret = meta_log.list_entries(handle, max_entries, entries,
102 &last_marker, &truncated);
104 meta_log.complete_list_entries(handle);
// Emit the metadata-log listing: a "log_entries" object holding "marker",
// "truncated" and an "entries" array with one dumped entry per cls_log_entry.
// NOTE(review): the lines between set_req_state_err() and the first formatter
// call are not visible here — presumably header output and an early return on
// error; confirm.
107 void RGWOp_MDLog_List::send_response() {
108 set_req_state_err(s, http_ret);
115 s->formatter->open_object_section("log_entries");
116 s->formatter->dump_string("marker", last_marker);
117 s->formatter->dump_bool("truncated", truncated);
119 s->formatter->open_array_section("entries");
120 for (list<cls_log_entry>::iterator iter = entries.begin();
121 iter != entries.end(); ++iter) {
122 cls_log_entry& entry = *iter;
// Entry formatting is delegated to the metadata manager.
123 store->meta_mgr->dump_log_entry(entry, s->formatter);
// First close matches the "entries" array, second the "log_entries" object.
126 s->formatter->close_section();
128 s->formatter->close_section();
// Collect general metadata-log info: the configured shard count
// (rgw_md_log_max_shards) and the oldest log period as reported by the
// metadata manager. http_ret is taken from that period lookup's error code.
132 void RGWOp_MDLog_Info::execute() {
133 num_objects = s->cct->_conf->rgw_md_log_max_shards;
134 period = store->meta_mgr->read_oldest_log_period();
135 http_ret = period.get_error();
// Emit an "mdlog" object with "num_objects", "period" (the period id) and
// "realm_epoch" (the epoch of the period read in execute()).
// NOTE(review): header output and any error short-circuit are not visible in
// this view, nor is any condition that may guard the period fields.
138 void RGWOp_MDLog_Info::send_response() {
139 set_req_state_err(s, http_ret);
143 s->formatter->open_object_section("mdlog");
144 s->formatter->dump_unsigned("num_objects", num_objects);
146 s->formatter->dump_string("period", period.get_period().get_id());
147 s->formatter->dump_unsigned("realm_epoch", period.get_epoch());
149 s->formatter->close_section();
// Fetch info for a single metadata-log shard.
// Request args read: "period" and "id" (shard). The result of
// RGWMetadataLog::get_info() is stored in `info`, its status in http_ret.
// NOTE(review): the error-return statements are not visible in this view.
153 void RGWOp_MDLog_ShardInfo::execute() {
154 string period = s->info.args.get("period");
155 string shard = s->info.args.get("id");
// NOTE(review): unsigned cast — a negative id accepted by strict_strtol would
// wrap; confirm it is rejected.
158 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
160 dout(5) << "Error parsing shard_id " << shard << dendl;
// No period supplied: fall back to the store's current period id.
165 if (period.empty()) {
166 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
167 period = store->get_current_period_id();
// Still empty after the fallback: nothing to query.
169 if (period.empty()) {
170 ldout(s->cct, 5) << "Missing period id" << dendl;
175 RGWMetadataLog meta_log{s->cct, store, period};
177 http_ret = meta_log.get_info(shard_id, &info);
// Emit the shard info gathered by execute() as JSON under the key "info".
// NOTE(review): header output / flush calls are not visible in this view.
180 void RGWOp_MDLog_ShardInfo::send_response() {
181 set_req_state_err(s, http_ret);
185 encode_json("info", info, s->formatter);
// Trim entries from one metadata-log shard.
// Request args read: "start-time", "end-time", "start-marker", "end-marker",
// "period", "id" (shard). The status of RGWMetadataLog::trim() is stored in
// http_ret.
// NOTE(review): error-return statements and some declarations are not visible
// in this view.
189 void RGWOp_MDLog_Delete::execute() {
190 string st = s->info.args.get("start-time"),
191 et = s->info.args.get("end-time"),
192 start_marker = s->info.args.get("start-marker"),
193 end_marker = s->info.args.get("end-marker"),
194 period = s->info.args.get("period"),
195 shard = s->info.args.get("id"),
203 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
205 dout(5) << "Error parsing shard_id " << shard << dendl;
// The trim range must have an upper bound: an end time or an end marker.
// NOTE(review): the body of this check is not visible; presumably the request
// is rejected — confirm.
209 if (et.empty() && end_marker.empty()) { /* bounding end */
214 if (parse_date_str(st, ut_st) < 0) {
219 if (parse_date_str(et, ut_et) < 0) {
// No period supplied: fall back to the store's current period id.
224 if (period.empty()) {
225 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
226 period = store->get_current_period_id();
228 if (period.empty()) {
229 ldout(s->cct, 5) << "Missing period id" << dendl;
234 RGWMetadataLog meta_log{s->cct, store, period};
236 http_ret = meta_log.trim(shard_id, ut_st, ut_et, start_marker, end_marker);
// Take an exclusive lock on one metadata-log shard for a limited duration.
// Request args read: "period", "id" (shard), "length" (duration), "locker-id",
// "zone-id". The status of lock_exclusive() is stored in http_ret, with -EBUSY
// translated to -ERR_LOCKED.
// NOTE(review): error-return statements, some declarations and the tail of the
// parameter check are not visible in this view.
239 void RGWOp_MDLog_Lock::execute() {
240 string period, shard_id_str, duration_str, locker_id, zone_id;
245 period = s->info.args.get("period");
246 shard_id_str = s->info.args.get("id");
247 duration_str = s->info.args.get("length");
248 locker_id = s->info.args.get("locker-id");
249 zone_id = s->info.args.get("zone-id");
// No period supplied: fall back to the store's current period id.
251 if (period.empty()) {
252 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
253 period = store->get_current_period_id();
// Mandatory-parameter check; the remaining operands of this condition are not
// visible here.
256 if (period.empty() ||
257 shard_id_str.empty() ||
258 (duration_str.empty()) ||
261 dout(5) << "Error invalid parameter list" << dendl;
267 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
269 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
274 RGWMetadataLog meta_log{s->cct, store, period};
// NOTE(review): the parsed value is cast to unsigned before the `<= 0` test.
// If `dur` is declared unsigned (declaration not visible here) the test only
// catches 0, and a negative "length" would wrap to a large duration — confirm.
276 dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
277 if (!err.empty() || dur <= 0) {
278 dout(5) << "invalid length param " << duration_str << dendl;
282 http_ret = meta_log.lock_exclusive(shard_id, make_timespan(dur), zone_id,
// Report an already-held lock with the RGW-specific "locked" error code.
284 if (http_ret == -EBUSY)
285 http_ret = -ERR_LOCKED;
// Release a lock on one metadata-log shard.
// Request args read: "period", "id" (shard), "locker-id", "zone-id". The status
// of RGWMetadataLog::unlock() is stored in http_ret.
// NOTE(review): error-return statements and the tail of the parameter check
// are not visible in this view.
288 void RGWOp_MDLog_Unlock::execute() {
289 string period, shard_id_str, locker_id, zone_id;
294 period = s->info.args.get("period");
295 shard_id_str = s->info.args.get("id");
296 locker_id = s->info.args.get("locker-id");
297 zone_id = s->info.args.get("zone-id");
// No period supplied: fall back to the store's current period id.
299 if (period.empty()) {
300 ldout(s->cct, 5) << "Missing period id trying to use current" << dendl;
301 period = store->get_current_period_id();
// Mandatory-parameter check; the remaining operands are not visible here.
304 if (period.empty() ||
305 shard_id_str.empty() ||
308 dout(5) << "Error invalid parameter list" << dendl;
314 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
316 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
321 RGWMetadataLog meta_log{s->cct, store, period};
322 http_ret = meta_log.unlock(shard_id, zone_id, locker_id);
// Handle a metadata-log change notification: read the request body, parse it
// as JSON into a set of shard ids, and wake the metadata sync for those shards.
// NOTE(review): declarations of data/len/p, the error-return paths and any
// release of `data` are not visible in this view.
325 void RGWOp_MDLog_Notify::execute() {
// Upper bound (128 KiB) passed to rgw_rest_read_all_input(). Being a macro, the
// name is not scoped to this function even though it is defined inside it.
328 #define LARGE_ENOUGH_BUF (128 * 1024)
329 int r = rgw_rest_read_all_input(s, &data, &len, LARGE_ENOUGH_BUF);
// The raw request body is logged only at debug level 20.
335 ldout(s->cct, 20) << __func__ << "(): read data: " << string(data, len) << dendl;
338 r = p.parse(data, len);
341 ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
346 set<int> updated_shards;
// decode_json_obj() reports malformed content by throwing JSONDecoder::err.
348 decode_json_obj(updated_shards, &p);
349 } catch (JSONDecoder::err& err) {
350 ldout(s->cct, 0) << "ERROR: failed to decode JSON" << dendl;
// Skip the per-shard logging loop entirely unless level-20 rgw logging is
// being gathered.
355 if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
356 for (set<int>::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
357 ldout(s->cct, 20) << __func__ << "(): updated shard=" << *iter << dendl;
361 store->wakeup_meta_sync_shards(updated_shards);
// List bucket-index log entries for a bucket (or bucket instance) and stream
// them to the client in batches.
// Request args read: "tenant", "bucket", "marker", "max-entries",
// "bucket-instance". At least one of "bucket" / "bucket-instance" is required.
// NOTE(review): error-return statements, the declarations of shard_id, count,
// truncated and err, the `do {` opener and the calls that start/end the
// response are not visible in this view.
366 void RGWOp_BILog_List::execute() {
367 string tenant_name = s->info.args.get("tenant"),
368 bucket_name = s->info.args.get("bucket"),
369 marker = s->info.args.get("marker"),
370 max_entries_str = s->info.args.get("max-entries"),
371 bucket_instance = s->info.args.get("bucket-instance");
372 RGWBucketInfo bucket_info;
373 unsigned max_entries;
375 RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
377 if (bucket_name.empty() && bucket_instance.empty()) {
378 dout(5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
// Parses bucket_instance in place (it is both input and output) and extracts
// the shard id from it.
384 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
// Prefer lookup by instance id; otherwise resolve by tenant + bucket name.
389 if (!bucket_instance.empty()) {
390 http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
392 dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
395 } else { /* !bucket_name.empty() */
396 http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
398 dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
// NOTE(review): the condition guarding the fallback assignment below is not
// visible. Unlike the MDLog/DATALog list operations, no clamp of max_entries to
// LOG_CLASS_LIST_MAX_ENTRIES is visible here — confirm whether that is intended.
407 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
409 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
// Batch loop: each iteration asks for at most the remaining budget
// (max_entries - count), sends the batch right away, and repeats while the
// store reports more data and the budget is not exhausted.
413 list<rgw_bi_log_entry> entries;
414 int ret = store->list_bi_log_entries(bucket_info, shard_id,
415 marker, max_entries - count,
416 entries, &truncated);
418 dout(5) << "ERROR: list_bi_log_entries()" << dendl;
422 count += entries.size();
424 send_response(entries, marker);
425 } while (truncated && count < max_entries);
// Start the bucket-index log listing response: record the request status and
// open the "entries" array that the batched overload fills.
// NOTE(review): the lines between these calls (header output, any guard
// against sending twice, error short-circuit) are not visible in this view.
430 void RGWOp_BILog_List::send_response() {
434 set_req_state_err(s, http_ret);
443 s->formatter->open_array_section("entries");
// Emit one batch of bucket-index log entries, each encoded as JSON under the
// key "entry".
// NOTE(review): `marker` is taken by non-const reference, but the statements
// that would update it (and any flush) are not visible in this view — confirm.
446 void RGWOp_BILog_List::send_response(list<rgw_bi_log_entry>& entries, string& marker)
448 for (list<rgw_bi_log_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
449 rgw_bi_log_entry& entry = *iter;
450 encode_json("entry", entry, s->formatter);
// Finish the bucket-index log listing by closing the currently open formatter
// section (the "entries" array opened by send_response()).
// NOTE(review): any trailing flush is not visible in this view.
457 void RGWOp_BILog_List::send_response_end() {
458 s->formatter->close_section();
// Gather bucket-index log info (bucket_ver, master_ver, max_marker,
// syncstopped) for a bucket or bucket instance via get_bucket_stats().
// Request args read: "tenant", "bucket", "bucket-instance". At least one of
// "bucket" / "bucket-instance" is required.
// NOTE(review): error-return statements and the declaration of shard_id are
// not visible in this view.
462 void RGWOp_BILog_Info::execute() {
463 string tenant_name = s->info.args.get("tenant"),
464 bucket_name = s->info.args.get("bucket"),
465 bucket_instance = s->info.args.get("bucket-instance");
466 RGWBucketInfo bucket_info;
468 RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
470 if (bucket_name.empty() && bucket_instance.empty()) {
471 dout(5) << "ERROR: neither bucket nor bucket instance specified" << dendl;
// Parses bucket_instance in place and extracts the shard id from it.
477 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
// Prefer lookup by instance id; otherwise resolve by tenant + bucket name.
482 if (!bucket_instance.empty()) {
483 http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
485 dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
488 } else { /* !bucket_name.empty() */
489 http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
491 dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
// `stats` is required by the call but not used in the visible part of this
// function.
495 map<RGWObjCategory, RGWStorageStats> stats;
496 int ret = store->get_bucket_stats(bucket_info, shard_id, &bucket_ver, &master_ver, stats, &max_marker, &syncstopped);
// -ENOENT is tolerated; any other negative result takes the error branch
// (whose body is not visible here).
497 if (ret < 0 && ret != -ENOENT) {
// Emit an "info" object with the fields collected by execute():
// "bucket_ver", "master_ver", "max_marker" and "syncstopped".
// NOTE(review): header output and any error short-circuit are not visible in
// this view.
503 void RGWOp_BILog_Info::send_response() {
504 set_req_state_err(s, http_ret);
511 s->formatter->open_object_section("info");
512 encode_json("bucket_ver", bucket_ver, s->formatter);
513 encode_json("master_ver", master_ver, s->formatter);
514 encode_json("max_marker", max_marker, s->formatter);
515 encode_json("syncstopped", syncstopped, s->formatter);
516 s->formatter->close_section();
// Trim bucket-index log entries between start_marker and end_marker.
// Request args read: "tenant", "bucket", "start-marker", "end-marker",
// "bucket-instance". One of "bucket" / "bucket-instance" plus "end-marker" are
// mandatory. The status of trim_bi_log_entries() is stored in http_ret.
// NOTE(review): error-return statements and the declaration of shard_id are
// not visible in this view.
521 void RGWOp_BILog_Delete::execute() {
522 string tenant_name = s->info.args.get("tenant"),
523 bucket_name = s->info.args.get("bucket"),
524 start_marker = s->info.args.get("start-marker"),
525 end_marker = s->info.args.get("end-marker"),
526 bucket_instance = s->info.args.get("bucket-instance");
528 RGWBucketInfo bucket_info;
530 RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
533 if ((bucket_name.empty() && bucket_instance.empty()) ||
534 end_marker.empty()) {
535 dout(5) << "ERROR: one of bucket and bucket instance, and also end-marker is mandatory" << dendl;
// Parses bucket_instance in place and extracts the shard id from it.
541 http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
// Prefer lookup by instance id; otherwise resolve by tenant + bucket name.
546 if (!bucket_instance.empty()) {
547 http_ret = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, NULL, NULL);
549 dout(5) << "could not get bucket instance info for bucket instance id=" << bucket_instance << dendl;
552 } else { /* !bucket_name.empty() */
553 http_ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL, NULL);
555 dout(5) << "could not get bucket info for bucket=" << bucket_name << dendl;
559 http_ret = store->trim_bi_log_entries(bucket_info, shard_id, start_marker, end_marker);
561 dout(5) << "ERROR: trim_bi_log_entries() " << dendl;
// List entries of one data-log shard.
// Request args read: "id" (shard), "start-time", "end-time", "max-entries",
// "marker" and the boolean "extra-info" (default false). The listing result is
// written to entries, last_marker and truncated; the status of list_entries()
// is stored in http_ret.
// NOTE(review): error-return statements and some declarations are not visible
// in this view.
566 void RGWOp_DATALog_List::execute() {
567 string shard = s->info.args.get("id");
569 string st = s->info.args.get("start-time"),
570 et = s->info.args.get("end-time"),
571 max_entries_str = s->info.args.get("max-entries"),
572 marker = s->info.args.get("marker"),
// max_entries defaults to the class-wide limit unless the caller overrides it.
576 unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
578 s->info.args.get_bool("extra-info", &extra_info, false);
580 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
582 dout(5) << "Error parsing shard_id " << shard << dendl;
587 if (parse_date_str(st, ut_st) < 0) {
592 if (parse_date_str(et, ut_et) < 0) {
597 if (!max_entries_str.empty()) {
598 max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
600 dout(5) << "Error parsing max-entries " << max_entries_str << dendl;
// Never return more than LOG_CLASS_LIST_MAX_ENTRIES per request.
604 if (max_entries > LOG_CLASS_LIST_MAX_ENTRIES) {
605 max_entries = LOG_CLASS_LIST_MAX_ENTRIES;
609 // Note that last_marker is updated to be the marker of the last
611 http_ret = store->data_log->list_entries(shard_id, ut_st, ut_et,
612 max_entries, entries, marker,
613 &last_marker, &truncated);
// Emit the data-log listing: a "log_entries" object holding "marker",
// "truncated" and an "entries" array.
// NOTE(review): header output and any error short-circuit are not visible in
// this view.
616 void RGWOp_DATALog_List::send_response() {
617 set_req_state_err(s, http_ret);
624 s->formatter->open_object_section("log_entries");
625 s->formatter->dump_string("marker", last_marker);
626 s->formatter->dump_bool("truncated", truncated);
628 s->formatter->open_array_section("entries");
629 for (list<rgw_data_change_log_entry>::iterator iter = entries.begin();
630 iter != entries.end(); ++iter) {
631 rgw_data_change_log_entry& entry = *iter;
// Two alternative encodings of an element: only the inner `entry.entry`, or
// the whole log entry. NOTE(review): the selecting condition is not visible
// here — presumably the "extra-info" flag read in execute(); confirm.
633 encode_json("entry", entry.entry, s->formatter);
635 encode_json("entry", entry, s->formatter);
// First close matches the "entries" array, second the "log_entries" object.
639 s->formatter->close_section();
641 s->formatter->close_section();
// Report the configured number of data-log shards (rgw_data_log_num_shards).
// NOTE(review): http_ret is not assigned in the visible part of this function.
646 void RGWOp_DATALog_Info::execute() {
647 num_objects = s->cct->_conf->rgw_data_log_num_shards;
// Emit the shard count as {"num_objects": <n>}; note the enclosing object
// section is itself named "num_objects".
// NOTE(review): header output and any error short-circuit are not visible in
// this view.
651 void RGWOp_DATALog_Info::send_response() {
652 set_req_state_err(s, http_ret);
656 s->formatter->open_object_section("num_objects");
657 s->formatter->dump_unsigned("num_objects", num_objects);
658 s->formatter->close_section();
// Fetch info for a single data-log shard identified by request arg "id".
// The result of data_log->get_info() is stored in `info`, its status in
// http_ret.
// NOTE(review): the error-return statement after a failed parse is not visible
// in this view.
662 void RGWOp_DATALog_ShardInfo::execute() {
663 string shard = s->info.args.get("id");
// NOTE(review): unsigned cast — a negative id accepted by strict_strtol would
// wrap; confirm it is rejected.
666 unsigned shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
668 dout(5) << "Error parsing shard_id " << shard << dendl;
673 http_ret = store->data_log->get_info(shard_id, &info);
// Emit the shard info gathered by execute() as JSON under the key "info".
// NOTE(review): header output / flush calls are not visible in this view.
676 void RGWOp_DATALog_ShardInfo::send_response() {
677 set_req_state_err(s, http_ret);
681 encode_json("info", info, s->formatter);
// Take an exclusive lock on one data-log shard for a limited duration.
// Request args read: "id" (shard), "length" (duration), "locker-id",
// "zone-id". The status of lock_exclusive() is stored in http_ret, with -EBUSY
// translated to -ERR_LOCKED.
// NOTE(review): error-return statements, some declarations and the tail of the
// parameter check are not visible in this view.
685 void RGWOp_DATALog_Lock::execute() {
686 string shard_id_str, duration_str, locker_id, zone_id;
691 shard_id_str = s->info.args.get("id");
692 duration_str = s->info.args.get("length");
693 locker_id = s->info.args.get("locker-id");
694 zone_id = s->info.args.get("zone-id");
// Mandatory-parameter check; the remaining operands are not visible here.
696 if (shard_id_str.empty() ||
697 (duration_str.empty()) ||
700 dout(5) << "Error invalid parameter list" << dendl;
706 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
708 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
// NOTE(review): the parsed value is cast to unsigned before the `<= 0` test.
// If `dur` is declared unsigned (declaration not visible here) the test only
// catches 0, and a negative "length" would wrap to a large duration — confirm.
714 dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
715 if (!err.empty() || dur <= 0) {
716 dout(5) << "invalid length param " << duration_str << dendl;
720 http_ret = store->data_log->lock_exclusive(shard_id, make_timespan(dur), zone_id, locker_id);
// Report an already-held lock with the RGW-specific "locked" error code.
721 if (http_ret == -EBUSY)
722 http_ret = -ERR_LOCKED;
// Release a lock on one data-log shard.
// Request args read: "id" (shard), "locker-id", "zone-id". The status of
// data_log->unlock() is stored in http_ret.
// NOTE(review): error-return statements and the tail of the parameter check
// are not visible in this view.
725 void RGWOp_DATALog_Unlock::execute() {
726 string shard_id_str, locker_id, zone_id;
731 shard_id_str = s->info.args.get("id");
732 locker_id = s->info.args.get("locker-id");
733 zone_id = s->info.args.get("zone-id");
// Mandatory-parameter check; the remaining operands are not visible here.
735 if (shard_id_str.empty() ||
738 dout(5) << "Error invalid parameter list" << dendl;
744 shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
746 dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
751 http_ret = store->data_log->unlock(shard_id, zone_id, locker_id);
// Handle a data-log change notification from "source-zone": read the request
// body, parse it as JSON into a map of shard id -> set of modified keys, and
// wake the data sync for those shards.
// NOTE(review): declarations of data/len/p, the error-return paths and any
// release of `data` are not visible in this view.
754 void RGWOp_DATALog_Notify::execute() {
755 string source_zone = s->info.args.get("source-zone");
// Upper bound (128 KiB) passed to rgw_rest_read_all_input(). Being a macro, the
// name is not scoped to this function even though it is defined inside it.
758 #define LARGE_ENOUGH_BUF (128 * 1024)
759 int r = rgw_rest_read_all_input(s, &data, &len, LARGE_ENOUGH_BUF);
// The raw request body is logged only at debug level 20.
765 ldout(s->cct, 20) << __func__ << "(): read data: " << string(data, len) << dendl;
768 r = p.parse(data, len);
771 ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
776 map<int, set<string> > updated_shards;
// decode_json_obj() reports malformed content by throwing JSONDecoder::err.
778 decode_json_obj(updated_shards, &p);
779 } catch (JSONDecoder::err& err) {
780 ldout(s->cct, 0) << "ERROR: failed to decode JSON" << dendl;
// Skip the nested logging loops entirely unless level-20 rgw logging is being
// gathered.
785 if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
786 for (map<int, set<string> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
787 ldout(s->cct, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
788 set<string>& keys = iter->second;
789 for (set<string>::iterator kiter = keys.begin(); kiter != keys.end(); ++kiter) {
790 ldout(s->cct, 20) << __func__ << "(): modified key=" << *kiter << dendl;
795 store->wakeup_data_sync_shards(source_zone, updated_shards);
// Trim entries from one data-log shard.
// Request args read: "start-time", "end-time", "start-marker", "end-marker",
// "id" (shard). The status of data_log->trim_entries() is stored in http_ret.
// NOTE(review): error-return statements and some declarations are not visible
// in this view.
800 void RGWOp_DATALog_Delete::execute() {
801 string st = s->info.args.get("start-time"),
802 et = s->info.args.get("end-time"),
803 start_marker = s->info.args.get("start-marker"),
804 end_marker = s->info.args.get("end-marker"),
805 shard = s->info.args.get("id"),
813 shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err);
815 dout(5) << "Error parsing shard_id " << shard << dendl;
// The trim range must have an upper bound: an end time or an end marker.
// NOTE(review): the body of this check is not visible; presumably the request
// is rejected — confirm.
819 if (et.empty() && end_marker.empty()) { /* bounding end */
824 if (parse_date_str(st, ut_st) < 0) {
829 if (parse_date_str(et, ut_et) < 0) {
834 http_ret = store->data_log->trim_entries(shard_id, ut_st, ut_et, start_marker, end_marker);
837 // not in header to avoid pulling in rgw_sync.h
// REST operation that reports the metadata sync status.
// Requires the "mdlog" capability with read permission; the operation name is
// "get_metadata_log_status". execute() and send_response() are defined
// out of line.
838 class RGWOp_MDLog_Status : public RGWRESTOp {
// Sync status filled by execute() and encoded by send_response().
839 rgw_meta_sync_status status;
841 int check_caps(RGWUserCaps& caps) override {
842 return caps.check_cap("mdlog", RGW_CAP_READ);
// Permission check is delegated to check_caps() on the requesting user's caps.
844 int verify_permission() override {
845 return check_caps(s->user->caps);
847 void execute() override;
848 void send_response() override;
849 const string name() override { return "get_metadata_log_status"; }
// Read the metadata sync status from the store's meta sync manager into
// `status`; the result of read_sync_status() is stored in http_ret.
// NOTE(review): the error code set (and the return) when no sync manager
// exists is not visible in this view.
852 void RGWOp_MDLog_Status::execute()
854 auto sync = store->get_meta_sync_manager();
855 if (sync == nullptr) {
856 ldout(s->cct, 1) << "no sync manager" << dendl;
860 http_ret = sync->read_sync_status(&status);
// Emit the metadata sync status as JSON under the key "status".
// NOTE(review): header output, any error short-circuit and flush calls are not
// visible in this view.
863 void RGWOp_MDLog_Status::send_response()
865 set_req_state_err(s, http_ret);
870 encode_json("status", status, s->formatter);
875 // not in header to avoid pulling in rgw_data_sync.h
// REST operation that reports the data sync status for a source zone.
// Requires the "datalog" capability with read permission; the operation name
// is "get_data_changes_log_status". execute() and send_response() are defined
// out of line.
876 class RGWOp_DATALog_Status : public RGWRESTOp {
// Sync status filled by execute() and encoded by send_response().
877 rgw_data_sync_status status;
879 int check_caps(RGWUserCaps& caps) override {
880 return caps.check_cap("datalog", RGW_CAP_READ);
// Permission check is delegated to check_caps() on the requesting user's caps.
882 int verify_permission() override {
883 return check_caps(s->user->caps);
885 void execute() override ;
886 void send_response() override;
887 const string name() override { return "get_data_changes_log_status"; }
// Read the data sync status for the zone named by request arg "source-zone"
// into `status`; the result of read_sync_status() is stored in http_ret.
// NOTE(review): the error code set (and the return) when no sync manager
// exists for that zone is not visible in this view.
890 void RGWOp_DATALog_Status::execute()
892 const auto source_zone = s->info.args.get("source-zone");
893 auto sync = store->get_data_sync_manager(source_zone);
894 if (sync == nullptr) {
895 ldout(s->cct, 1) << "no sync manager for source-zone " << source_zone << dendl;
899 http_ret = sync->read_sync_status(&status);
// Emit the data sync status as JSON under the key "status".
// NOTE(review): header output, any error short-circuit and flush calls are not
// visible in this view.
902 void RGWOp_DATALog_Status::send_response()
904 set_req_state_err(s, http_ret);
909 encode_json("status", status, s->formatter);
// Select the GET operation from the "type" request arg and the presence of
// the "id", "info" and "status" args:
//   type=metadata:     id+info -> MDLog_ShardInfo, id -> MDLog_List,
//                      status -> MDLog_Status, otherwise MDLog_Info
//   type=bucket-index: info -> BILog_Info, otherwise BILog_List
//   type=data:         id+info -> DATALog_ShardInfo, id -> DATALog_List,
//                      status -> DATALog_Status, otherwise DATALog_Info
// The operation object is allocated with `new` and returned as a raw pointer.
// NOTE(review): handling of a missing or unknown "type" is not visible in this
// view, and ownership of the returned pointer is not shown — presumably the
// caller releases it; confirm.
915 RGWOp *RGWHandler_Log::op_get() {
917 string type = s->info.args.get("type", &exists);
923 if (type.compare("metadata") == 0) {
924 if (s->info.args.exists("id")) {
925 if (s->info.args.exists("info")) {
926 return new RGWOp_MDLog_ShardInfo;
928 return new RGWOp_MDLog_List;
930 } else if (s->info.args.exists("status")) {
931 return new RGWOp_MDLog_Status;
933 return new RGWOp_MDLog_Info;
935 } else if (type.compare("bucket-index") == 0) {
936 if (s->info.args.exists("info")) {
937 return new RGWOp_BILog_Info;
939 return new RGWOp_BILog_List;
941 } else if (type.compare("data") == 0) {
942 if (s->info.args.exists("id")) {
943 if (s->info.args.exists("info")) {
944 return new RGWOp_DATALog_ShardInfo;
946 return new RGWOp_DATALog_List;
948 } else if (s->info.args.exists("status")) {
949 return new RGWOp_DATALog_Status;
951 return new RGWOp_DATALog_Info;
// Select the DELETE operation from the "type" request arg:
// "metadata" -> MDLog_Delete, "bucket-index" -> BILog_Delete,
// "data" -> DATALog_Delete.
// NOTE(review): handling of a missing or unknown "type" is not visible in this
// view.
957 RGWOp *RGWHandler_Log::op_delete() {
959 string type = s->info.args.get("type", &exists);
965 if (type.compare("metadata") == 0)
966 return new RGWOp_MDLog_Delete;
967 else if (type.compare("bucket-index") == 0)
968 return new RGWOp_BILog_Delete;
969 else if (type.compare("data") == 0)
970 return new RGWOp_DATALog_Delete;
// Select the POST operation from the "type" request arg and the presence of
// the "lock", "unlock" or "notify" arg (checked in that order):
//   type=metadata: MDLog_Lock / MDLog_Unlock / MDLog_Notify
//   type=data:     DATALog_Lock / DATALog_Unlock / DATALog_Notify
// No "bucket-index" branch appears in the visible part of this function.
// NOTE(review): handling of a missing or unknown "type" and the end of the
// function are not visible in this view.
974 RGWOp *RGWHandler_Log::op_post() {
976 string type = s->info.args.get("type", &exists);
982 if (type.compare("metadata") == 0) {
983 if (s->info.args.exists("lock"))
984 return new RGWOp_MDLog_Lock;
985 else if (s->info.args.exists("unlock"))
986 return new RGWOp_MDLog_Unlock;
987 else if (s->info.args.exists("notify"))
988 return new RGWOp_MDLog_Notify;
989 } else if (type.compare("data") == 0) {
990 if (s->info.args.exists("lock"))
991 return new RGWOp_DATALog_Lock;
992 else if (s->info.args.exists("unlock"))
993 return new RGWOp_DATALog_Unlock;
994 else if (s->info.args.exists("notify"))
995 return new RGWOp_DATALog_Notify;