2 #include "rgw_coroutine.h"
3 #include "rgw_cr_rados.h"
5 #include "cls/lock/cls_lock_client.h"
7 #include <boost/asio/yield.hpp>
9 #define dout_context g_ceph_context
10 #define dout_subsys ceph_subsys_rgw
// Enqueue hook of the work queue: appends req to the back of the processor's
// m_req_queue and logs the request pointer at debug level 20.
// processor->is_going_down() is tested first; the body of that branch (and the
// value returned) are on lines not shown in this excerpt.
12 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
13 if (processor->is_going_down()) {
17 processor->m_req_queue.push_back(req);
18 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
// Reports whether the processor's pending-request queue (m_req_queue) is empty.
23 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
24 return processor->m_req_queue.empty();
// Dequeue hook of the work queue: takes the request at the front of
// m_req_queue, removes it from the queue and logs its pointer at level 20.
// An empty-queue test runs first; what is returned in that case is on a line
// not shown in this excerpt.
27 RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
28 if (processor->m_req_queue.empty())
30 RGWAsyncRadosRequest *req = processor->m_req_queue.front();
31 processor->m_req_queue.pop_front();
32 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
// Processes one dequeued request: hands it to processor->handle_request() and
// then returns one unit to the processor's req_throttle.
// The thread-pool handle argument is not used in the visible lines.
37 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) {
38 processor->handle_request(req);
39 processor->req_throttle.put(1);
// Debug dump of the pending queue at level 20: logs "RGWWQ: empty" when
// m_req_queue is empty, otherwise a "RGWWQ:" header followed by one "req:"
// line per queued request pointer.
42 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
// Tests whether level-20 rgw logging would be gathered at all; the body of
// this branch is not shown in this excerpt.
43 if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
46 deque<RGWAsyncRadosRequest *>::iterator iter;
47 if (processor->m_req_queue.empty()) {
48 dout(20) << "RGWWQ: empty" << dendl;
51 dout(20) << "RGWWQ:" << dendl;
52 for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
53 dout(20) << "req: " << hex << *iter << dec << dendl;
// Constructs the processor around _store:
//  - m_tp is built with the names "RGWAsyncRadosProcessor::m_tp" / "rados_async"
//    and num_threads,
//  - req_throttle is named "rgw_async_rados_ops" and sized num_threads * 2,
//  - req_wq is bound to this processor and &m_tp, using the
//    rgw_op_thread_timeout and rgw_op_thread_suicide_timeout config values.
// NOTE(review): the m_tp and req_throttle initialisers read the `store` member
// rather than `_store`, so they rely on `store` being declared before them in
// the class -- confirm against the header.
57 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
58 : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
59 req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
60 req_wq(this, g_conf->rgw_op_thread_timeout,
61 g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
64 void RGWAsyncRadosProcessor::start() {
// Stops the processor. The visible part walks every request still sitting in
// m_req_queue; the per-request action and the remaining shutdown steps are on
// lines not shown in this excerpt.
68 void RGWAsyncRadosProcessor::stop() {
72 for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
77 void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest *req) {
82 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
// Performs the read by forwarding the stored parameters (object context, read
// state, version tracker, object, output buffer, offset range and attribute
// map pointer) to store->get_system_obj(); the last argument is passed as NULL.
// Returns whatever get_system_obj() returns.
87 int RGWAsyncGetSystemObj::_send_request()
89 return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, pattrs, NULL);
// Captures the parameters of a system-object read for later use by
// _send_request(). caller and cn are passed to the RGWAsyncRadosRequest base;
// pattrs starts out as NULL. The initialisers that follow pattrs are not shown
// in this excerpt.
92 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
93 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
94 bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(caller, cn), store(_store), obj_ctx(_obj_ctx),
95 objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl), pattrs(NULL),
// Starts the attribute read: allocates an RGWAsyncGetSystemObj bound to this
// coroutine and a fresh completion notifier (the version-tracker argument is
// NULL), tells it to read attributes into pattrs, and queues it on async_rados.
// The remaining constructor arguments and the return value are on lines not
// shown in this excerpt.
100 int RGWSimpleRadosReadAttrsCR::send_request()
102 req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
103 store, &obj_ctx, NULL,
107 req->set_read_attrs(pattrs);
109 async_rados->queue(req);
// Returns the status recorded on the async request (req->get_ret_status()).
113 int RGWSimpleRadosReadAttrsCR::request_complete()
115 return req->get_ret_status();
// Writes bl to obj through store->put_system_obj_data(), passing NULL as the
// first argument and -1 as the fourth, together with the exclusive flag and
// the version tracker. Returns that call's result unchanged.
118 int RGWAsyncPutSystemObj::_send_request()
120 return store->put_system_obj_data(NULL, obj, bl, -1, exclusive, objv_tracker);
// Captures the parameters of a system-object write: the store, optional
// version tracker, target object, exclusive flag and the data buffer.
// caller and cn are forwarded to the RGWAsyncRadosRequest base.
123 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
124 RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
125 bool _exclusive, bufferlist& _bl)
126 : RGWAsyncRadosRequest(caller, cn), store(_store), objv_tracker(_objv_tracker),
127 obj(_obj), exclusive(_exclusive), bl(_bl)
// Sets the attributes in *attrs on obj via store->system_obj_set_attrs(),
// passing NULL for the first and fourth arguments and the stored version
// tracker last. Returns that call's result unchanged.
131 int RGWAsyncPutSystemObjAttrs::_send_request()
133 return store->system_obj_set_attrs(NULL, obj, *attrs, NULL, objv_tracker);
// Captures the parameters of an attribute write (store, version tracker and
// target object are visible here); caller and cn go to the
// RGWAsyncRadosRequest base. The remaining initialisers are not shown in this
// excerpt.
136 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
137 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
138 map<string, bufferlist> *_attrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
139 objv_tracker(_objv_tracker), obj(_obj),
// Sets up an omap appender for _obj: it is a consumer coroutine of strings
// (RGWConsumerCR<string>) that starts with going_down false, no pending
// entries and a total of zero, and remembers _window_size as window_size.
145 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj,
146 uint64_t _window_size)
147 : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
148 store(_store), obj(_obj), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
// Takes an exclusive cls lock on obj: resolves obj to a raw object reference,
// builds a lock named lock_name with a duration of duration_secs seconds and
// the stored cookie, and returns the result of lock_exclusive() on the
// reference's ioctx/oid.
// A failure of get_raw_obj_ref() is logged with lderr; the guarding condition
// and the value returned on that path are not shown in this excerpt.
152 int RGWAsyncLockSystemObj::_send_request()
155 int r = store->get_raw_obj_ref(obj, &ref);
157 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
161 rados::cls::lock::Lock l(lock_name);
162 utime_t duration(duration_secs, 0);
163 l.set_duration(duration);
164 l.set_cookie(cookie);
167 return l.lock_exclusive(&ref.ioctx, ref.oid);
// Captures the parameters of a lock request; caller and cn go to the
// RGWAsyncRadosRequest base and _duration_secs is stored as duration_secs.
// The initialisers between store and duration_secs are not shown in this
// excerpt.
170 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
171 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
172 const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
176 duration_secs(_duration_secs)
// Releases the cls lock on obj: resolves obj to a raw object reference, builds
// a lock handle named lock_name carrying the stored cookie, and returns the
// result of unlock() on the reference's ioctx/oid.
// A failure of get_raw_obj_ref() is logged with lderr; the guarding condition
// and the value returned on that path are not shown in this excerpt.
180 int RGWAsyncUnlockSystemObj::_send_request()
183 int r = store->get_raw_obj_ref(obj, &ref);
185 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
189 rados::cls::lock::Lock l(lock_name);
191 l.set_cookie(cookie);
193 return l.unlock(&ref.ioctx, ref.oid);
// Captures the parameters of an unlock request: the store, lock name and
// cookie are visible here; caller and cn go to the RGWAsyncRadosRequest base.
// One initialiser line between store and lock_name is not shown.
196 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
197 RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
198 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(caller, cn), store(_store),
200 lock_name(_name), cookie(_cookie)
// Builds the coroutine and its human-readable description, which starts with
// "set omap keys dest=<obj> keys=[" and is then extended by the loop over
// entries below.
204 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados *_store,
205 const rgw_raw_obj& _obj,
206 map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
211 stringstream& s = set_description();
// Fix: the previous form streamed s.str() back into s and closed the bracket
// before any key was written, duplicating the prefix inside the description.
// NOTE(review): the loop body and the closing "]" are on lines not shown in
// this excerpt -- confirm the bracket is still closed after the loop.
212 s << "set omap keys dest=" << obj << " keys=[";
213 for (auto i = entries.begin(); i != entries.end(); ++i) {
// Every entry except the first takes this branch (body not shown).
214 if (i != entries.begin()) {
// Submits the omap write asynchronously: resolves obj to a raw object
// reference, builds a write operation that sets `entries` in the object's
// omap, creates a completion notifier and returns the result of aio_operate().
// A failure of get_raw_obj_ref() is logged with lderr; the guarding condition
// and the value returned on that path are not shown in this excerpt.
222 int RGWRadosSetOmapKeysCR::send_request()
224 int r = store->get_raw_obj_ref(obj, &ref);
226 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
230 set_status() << "sending request";
232 librados::ObjectWriteOperation op;
233 op.omap_set(entries);
235 cn = stack->create_completion_notifier();
236 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
// Reads the return value of the finished aio completion and records it in the
// coroutine status text. The return statement is not shown in this excerpt.
239 int RGWRadosSetOmapKeysCR::request_complete()
241 int r = cn->completion()->get_return_value();
243 set_status() << "request complete; ret=" << r;
// Builds the coroutine that reads omap entries of _obj starting at _marker.
// entries/max_entries are stored and rval starts at 0; the other initialisers
// are on lines not shown in this excerpt.
248 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
249 const rgw_raw_obj& _obj,
250 const string& _marker,
251 map<string, bufferlist> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
254 entries(_entries), max_entries(_max_entries), rval(0),
// Fix: the description previously read "set omap keys", copied from the
// set-keys coroutine; this coroutine performs a get.
257 set_description() << "get omap keys dest=" << obj << " marker=" << marker;
// Submits the omap read asynchronously: resolves obj to a raw object
// reference, builds a read operation that fetches up to max_entries omap
// values after `marker` into `entries` (per-op result in rval), creates a
// completion notifier and returns the result of aio_operate().
// A failure of get_raw_obj_ref() is logged with lderr; the guarding condition
// and the value returned on that path are not shown in this excerpt.
260 int RGWRadosGetOmapKeysCR::send_request() {
261 int r = store->get_raw_obj_ref(obj, &ref);
263 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
267 set_status() << "send request";
269 librados::ObjectReadOperation op;
270 op.omap_get_vals2(marker, max_entries, entries, nullptr, &rval);
272 cn = stack->create_completion_notifier();
273 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
// Builds the coroutine that removes a set of omap keys from _obj and sets its
// description to "remove omap keys dest=<obj> keys=<keys>". The member
// initialisers after the base-class one are not shown in this excerpt.
276 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(RGWRados *_store,
277 const rgw_raw_obj& _obj,
278 const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
283 set_description() << "remove omap keys dest=" << obj << " keys=" << keys;
// Submits the omap key removal asynchronously: resolves obj to a raw object
// reference, builds a write operation that removes `keys` from the object's
// omap, creates a completion notifier and returns the result of aio_operate().
// A failure of get_raw_obj_ref() is logged with lderr; the guarding condition
// and the value returned on that path are not shown in this excerpt.
286 int RGWRadosRemoveOmapKeysCR::send_request() {
287 int r = store->get_raw_obj_ref(obj, &ref);
289 lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
293 set_status() << "send request";
295 librados::ObjectWriteOperation op;
296 op.omap_rm_keys(keys);
298 cn = stack->create_completion_notifier();
299 return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
// Reads the return value of the finished aio completion and records it in the
// coroutine status text. The return statement is not shown in this excerpt.
302 int RGWRadosRemoveOmapKeysCR::request_complete()
304 int r = cn->completion()->get_return_value();
306 set_status() << "request complete; ret=" << r;
// Builds the coroutine that removes obj; stores the store pointer and the
// object and sets the description to "remove dest=<obj>".
311 RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj)
312 : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj)
314 set_description() << "remove dest=" << obj;
// Submits the removal asynchronously: opens an ioctx on the object's pool via
// the store's rados handle, sets the locator key from obj.loc, creates a
// completion notifier and returns the result of aio_operate() on obj.oid.
// A failure to open the pool is logged with lderr; the guarding condition and
// the value returned on that path are not shown in this excerpt.
317 int RGWRadosRemoveCR::send_request()
319 auto rados = store->get_rados_handle();
320 int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
322 lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
325 ioctx.locator_set_key(obj.loc);
327 set_status() << "send request";
// The call that populates `op` is on a line not shown in this excerpt.
329 librados::ObjectWriteOperation op;
332 cn = stack->create_completion_notifier();
333 return ioctx.aio_operate(obj.oid, cn->completion(), &op);
// Reads the return value of the finished aio completion and records it in the
// coroutine status text. The return statement is not shown in this excerpt.
336 int RGWRadosRemoveCR::request_complete()
338 int r = cn->completion()->get_return_value();
340 set_status() << "request complete; ret=" << r;
// Builds the coroutine that takes a lock on _obj and sets its description to
// "rados lock dest=<obj> lock=<name> cookie=<cookie> duration=<duration>".
// Only the async_rados and lock_name initialisers are visible; the others are
// on lines not shown in this excerpt.
345 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
346 const rgw_raw_obj& _obj,
347 const string& _lock_name,
348 const string& _cookie,
349 uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
350 async_rados(_async_rados),
352 lock_name(_lock_name),
358 set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
361 void RGWSimpleRadosLockCR::request_cleanup()
// Starts the lock attempt: allocates an RGWAsyncLockSystemObj bound to this
// coroutine and a fresh completion notifier (version tracker NULL), carrying
// the object, lock name, cookie and duration, and queues it on async_rados.
// The return statement is not shown in this excerpt.
369 int RGWSimpleRadosLockCR::send_request()
371 set_status() << "sending request";
372 req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
373 store, NULL, obj, lock_name, cookie, duration);
374 async_rados->queue(req);
// Records the async request's status in the coroutine status text and returns
// that same status.
378 int RGWSimpleRadosLockCR::request_complete()
380 set_status() << "request complete; ret=" << req->get_ret_status();
381 return req->get_ret_status();
// Builds the coroutine that releases a lock on _obj and sets its description
// to "rados unlock dest=<obj> lock=<name> cookie=<cookie>".
// Only the async_rados and lock_name initialisers are visible; the others are
// on lines not shown in this excerpt.
384 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
385 const rgw_raw_obj& _obj,
386 const string& _lock_name,
387 const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
388 async_rados(_async_rados),
390 lock_name(_lock_name),
395 set_description() << "rados unlock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie;
398 void RGWSimpleRadosUnlockCR::request_cleanup()
// Starts the unlock: allocates an RGWAsyncUnlockSystemObj bound to this
// coroutine and a fresh completion notifier (version tracker NULL), carrying
// the object, lock name and cookie, and queues it on async_rados.
// The return statement is not shown in this excerpt.
406 int RGWSimpleRadosUnlockCR::send_request()
408 set_status() << "sending request";
410 req = new RGWAsyncUnlockSystemObj(this, stack->create_completion_notifier(),
411 store, NULL, obj, lock_name, cookie);
412 async_rados->queue(req);
// Records the async request's status in the coroutine status text and returns
// that same status.
416 int RGWSimpleRadosUnlockCR::request_complete()
418 set_status() << "request complete; ret=" << req->get_ret_status();
419 return req->get_ret_status();
// Coroutine body of the omap appender. In the visible lines it waits for
// product, consumes strings into `entries` (each mapped to an empty
// bufferlist) and, once the window is full or the appender is going down,
// writes the batch with RGWRadosSetOmapKeysCR. A negative status after the
// flush is logged and ends the coroutine in the error state; otherwise it
// finishes in the done state. Loop headers and several statements are on
// lines not shown in this excerpt.
422 int RGWOmapAppend::operate() {
// Nothing left to consume and shutdown requested (branch body not shown).
425 if (!has_product() && going_down) {
426 set_status() << "going down";
429 set_status() << "waiting for product";
430 yield wait_for_product();
433 while (consume(&entry)) {
434 set_status() << "adding entry: " << entry;
435 entries[entry] = bufferlist();
// Window reached while consuming (branch body not shown).
436 if (entries.size() >= window_size) {
440 if (entries.size() >= window_size || going_down) {
441 set_status() << "flushing to omap";
442 call(new RGWRadosSetOmapKeysCR(store, obj, entries));
446 if (get_ret_status() < 0) {
447 ldout(cct, 0) << "ERROR: failed to store entries in omap" << dendl;
448 return set_state(RGWCoroutine_Error);
451 /* done with coroutine */
452 return set_state(RGWCoroutine_Done);
// Hands the buffered pending_entries to receive() and resets the pending
// counter to zero.
457 void RGWOmapAppend::flush_pending() {
458 receive(pending_entries);
459 num_pending_entries = 0;
// Buffers s in pending_entries and bumps num_pending_entries; the counter is
// then compared against window_size (cast to int). What happens when the
// window is reached, any earlier checks, and the return value are on lines not
// shown in this excerpt.
462 bool RGWOmapAppend::append(const string& s) {
467 pending_entries.push_back(s);
468 if (++num_pending_entries >= (int)window_size) {
474 bool RGWOmapAppend::finish() {
// Loads the bucket instance info for `bucket` into *bucket_info using a local
// object context; the two trailing optional outputs are passed as NULL.
// A failure is logged at level 0; the guarding condition, the rest of the log
// message and the return statements are not shown in this excerpt.
481 int RGWAsyncGetBucketInstanceInfo::_send_request()
483 RGWObjectCtx obj_ctx(store);
484 int r = store->get_bucket_instance_info(obj_ctx, bucket, *bucket_info, NULL, NULL);
486 ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for "
// Fetches an object from a remote source into the local store through
// store->fetch_remote_obj(), using a local object context. Several arguments
// of the call, the condition guarding the final log line and the return
// statement are on lines not shown in this excerpt.
494 int RGWAsyncFetchRemoteObj::_send_request()
496 RGWObjectCtx obj_ctx(store);
// client_id = "<zone id>.<instance id>"; op_id is derived from a new request id.
500 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
501 string client_id = store->zone_id() + buf;
502 string op_id = store->unique_id(store->get_new_req_id());
503 map<string, bufferlist> attrs;
505 rgw_obj src_obj(bucket_info.bucket, key);
// The destination object starts out as a copy of the source object.
507 rgw_obj dest_obj(src_obj);
// bucket_info is passed for both the destination and the source bucket.
509 int r = store->fetch_remote_obj(obj_ctx,
513 false, /* don't record op state in ops log */
518 bucket_info, /* dest */
519 bucket_info, /* source */
520 NULL, /* real_time* src_mtime, */
521 NULL, /* real_time* mtime, */
522 NULL, /* const real_time* mod_ptr, */
523 NULL, /* const real_time* unmod_ptr, */
524 false, /* high precision time */
525 NULL, /* const char *if_match, */
526 NULL, /* const char *if_nomatch, */
527 RGWRados::ATTRSMOD_NONE,
530 RGW_OBJ_CATEGORY_MAIN,
532 real_time(), /* delete_at */
533 &key.instance, /* string *version_id, */
534 NULL, /* string *ptag, */
535 NULL, /* string *petag, */
536 NULL, /* void (*progress_cb)(off_t, void *), */
537 NULL, /* void *progress_data*); */
541 ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
// Stats an object on a remote source through store->stat_remote_obj(), using
// a local object context; the remote mtime and size are written through
// pmtime and psize. Several arguments of the call, the condition guarding the
// final log line and the return statement are on lines not shown in this
// excerpt.
546 int RGWAsyncStatRemoteObj::_send_request()
548 RGWObjectCtx obj_ctx(store);
// client_id = "<zone id>.<instance id>"; op_id is derived from a new request id.
552 snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
553 string client_id = store->zone_id() + buf;
554 string op_id = store->unique_id(store->get_new_req_id());
556 rgw_obj src_obj(bucket_info.bucket, key);
558 rgw_obj dest_obj(src_obj);
560 int r = store->stat_remote_obj(obj_ctx,
563 nullptr, /* req_info */
566 bucket_info, /* source */
567 pmtime, /* real_time* src_mtime, */
568 psize, /* uint64_t * */
569 nullptr, /* const real_time* mod_ptr, */
570 nullptr, /* const real_time* unmod_ptr, */
571 true, /* high precision time */
572 nullptr, /* const char *if_match, */
573 nullptr, /* const char *if_nomatch, */
576 nullptr, /* string *ptag, */
577 nullptr); /* string *petag, */
// Fix: the message previously named fetch_remote_obj(), copied from the fetch
// request; the call made here is stat_remote_obj().
580 ldout(store->ctx(), 0) << "store->stat_remote_obj() returned r=" << r << dendl;
// Deletes the object (bucket_info.bucket, key) from the local store. Visible
// steps: mark the object atomic in a local context, load its state, optionally
// skip the delete when the object is newer than `timestamp`, decode the
// object's ACL policy if the attribute is present, fill in the delete
// parameters and call delete_obj(). Several conditions and all return
// statements are on lines not shown in this excerpt.
586 int RGWAsyncRemoveObj::_send_request()
588 RGWObjectCtx obj_ctx(store);
590 rgw_obj obj(bucket_info.bucket, key);
592 ldout(store->ctx(), 0) << __func__ << "(): deleting obj=" << obj << dendl;
594 obj_ctx.obj.set_atomic(obj);
598 int ret = store->get_obj_state(&obj_ctx, bucket_info, obj, &state);
600 ldout(store->ctx(), 20) << __func__ << "(): get_obj_state() obj=" << obj << " returned ret=" << ret << dendl;
604 /* has there been any racing object write? */
605 if (del_if_older && (state->mtime > timestamp)) {
606 ldout(store->ctx(), 20) << __func__ << "(): skipping object removal obj=" << obj << " (obj mtime=" << state->mtime << ", request timestamp=" << timestamp << ")" << dendl;
610 RGWAccessControlPolicy policy;
// Decode the stored ACL, if any; a decode failure is logged at level 0.
613 map<string, bufferlist>::iterator iter = state->attrset.find(RGW_ATTR_ACL);
614 if (iter != state->attrset.end()) {
615 bufferlist::iterator bliter = iter->second.begin();
617 policy.decode(bliter);
618 } catch (buffer::error& err) {
619 ldout(store->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
624 RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
625 RGWRados::Object::Delete del_op(&del_target);
627 del_op.params.bucket_owner = bucket_info.owner;
// obj_owner starts from the decoded policy; its id and display name are then
// set from owner / owner_display_name further down.
628 del_op.params.obj_owner = policy.get_owner();
// The conditions guarding the next two assignments are not shown.
630 del_op.params.unmod_since = timestamp;
633 del_op.params.versioning_status = BUCKET_VERSIONED;
635 del_op.params.olh_epoch = versioned_epoch;
636 del_op.params.marker_version_id = marker_version_id;
637 del_op.params.obj_owner.set_id(owner);
638 del_op.params.obj_owner.set_name(owner_display_name);
639 del_op.params.mtime = timestamp;
640 del_op.params.high_precision_time = true;
641 del_op.params.zones_trace = zones_trace;
643 ret = del_op.delete_obj();
645 ldout(store->ctx(), 20) << __func__ << "(): delete_obj() obj=" << obj << " returned ret=" << ret << dendl;
// Coroutine body of the continuous lease. While going_down is false it calls
// RGWSimpleRadosLockCR with `interval` as the lock duration and then waits
// interval / 2 seconds before the next round. A failed lock attempt is logged
// at level 20 and ends the coroutine in the error state with retcode. After
// the loop the lock is released with RGWSimpleRadosUnlockCR and the coroutine
// finishes in the done state. The conditions guarding the early exit and the
// failure path are on lines not shown in this excerpt.
650 int RGWContinuousLeaseCR::operate()
653 caller->set_sleeping(false);
654 return set_cr_done();
657 while (!going_down) {
658 yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
660 caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
663 ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
664 return set_state(RGWCoroutine_Error, retcode);
667 yield wait(utime_t(interval / 2, 0));
669 set_locked(false); /* moot at this point anyway */
670 yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
671 return set_state(RGWCoroutine_Done);
// Builds the coroutine that adds one entry to the time log object _oid: sets
// the description from the oid and the entry's id/section/name and stores the
// entry in `entries`. The member initialisers after the base-class one are
// not shown in this excerpt.
676 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
677 const cls_log_entry& entry) : RGWSimpleCoroutine(_store->ctx()),
681 stringstream& s = set_description();
// Fix: a separating space was missing before "entry=", which glued the oid and
// the entry together in the description text.
682 s << "timelog add entry oid=" << oid << " entry={id=" << entry.id << ", section=" << entry.section << ", name=" << entry.name << "}";
683 entries.push_back(entry);
// Submits the time log add: creates a completion notifier and returns the
// result of store->time_log_add() for oid/entries, with `true` as the last
// argument.
686 int RGWRadosTimelogAddCR::send_request()
688 set_status() << "sending request";
690 cn = stack->create_completion_notifier();
691 return store->time_log_add(oid, entries, cn->completion(), true);
// Reads the return value of the finished aio completion and records it in the
// coroutine status text. The return statement is not shown in this excerpt.
694 int RGWRadosTimelogAddCR::request_complete()
696 int r = cn->completion()->get_return_value();
698 set_status() << "request complete; ret=" << r;
// Builds the coroutine that trims the time log object `oid`: stores the time
// range and marker range, and sets a description listing all of them.
703 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados *store,
704 const std::string& oid,
705 const real_time& start_time,
706 const real_time& end_time,
707 const std::string& from_marker,
708 const std::string& to_marker)
709 : RGWSimpleCoroutine(store->ctx()), store(store), oid(oid),
710 start_time(start_time), end_time(end_time),
711 from_marker(from_marker), to_marker(to_marker)
713 set_description() << "timelog trim oid=" << oid
714 << " start_time=" << start_time << " end_time=" << end_time
715 << " from_marker=" << from_marker << " to_marker=" << to_marker;
// Submits the trim: creates a completion notifier and returns the result of
// store->time_log_trim() for the stored time and marker ranges.
718 int RGWRadosTimelogTrimCR::send_request()
720 set_status() << "sending request";
722 cn = stack->create_completion_notifier();
723 return store->time_log_trim(oid, start_time, end_time, from_marker,
724 to_marker, cn->completion());
// Reads the return value of the finished aio completion and records it in the
// coroutine status text. The return statement is not shown in this excerpt.
727 int RGWRadosTimelogTrimCR::request_complete()
729 int r = cn->completion()->get_return_value();
731 set_status() << "request complete; ret=" << r;
// Specialises RGWRadosTimelogTrimCR for marker-only trimming: the base is
// built with default-constructed start/end times and an empty from_marker, so
// only to_marker is supplied. Also stores the context and the caller's
// last_trim_marker pointer.
737 RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
738 const std::string& to_marker,
739 std::string *last_trim_marker)
740 : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
741 std::string{}, to_marker),
742 cct(store->ctx()), last_trim_marker(last_trim_marker)
// Completes the trim using the base-class result. Negative results other than
// -ENODATA take a separate branch whose body is not shown in this excerpt.
// Otherwise *last_trim_marker is advanced to to_marker when it currently
// compares less than to_marker.
// NOTE(review): last_trim_marker is dereferenced without a visible null
// check -- confirm callers always pass a valid pointer.
746 int RGWSyncLogTrimCR::request_complete()
748 int r = RGWRadosTimelogTrimCR::request_complete();
749 if (r < 0 && r != -ENODATA) {
752 if (*last_trim_marker < to_marker) {
753 *last_trim_marker = to_marker;
// Stats the object: maps obj to its raw object using the bucket's placement
// rule, then returns the result of store->raw_obj_stat(), which is given
// psize, pmtime, pepoch and the version tracker (two outputs are passed as
// nullptr).
759 int RGWAsyncStatObj::_send_request()
762 store->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
763 return store->raw_obj_stat(raw_obj, psize, pmtime, pepoch,
764 nullptr, nullptr, objv_tracker);
// Builds the coroutine that stats obj in the given bucket; stores the store,
// the async processor, the bucket info, the object, the three output pointers
// (psize, pmtime, pepoch) and the version tracker.
767 RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
768 const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize,
769 real_time* pmtime, uint64_t *pepoch,
770 RGWObjVersionTracker *objv_tracker)
771 : RGWSimpleCoroutine(store->ctx()), store(store), async_rados(async_rados),
772 bucket_info(_bucket_info), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
773 objv_tracker(objv_tracker)
777 void RGWStatObjCR::request_cleanup()
// Starts the stat: allocates an RGWAsyncStatObj bound to this coroutine and a
// fresh completion notifier, carrying the stored parameters, and queues it on
// async_rados. The return statement is not shown in this excerpt.
785 int RGWStatObjCR::send_request()
787 req = new RGWAsyncStatObj(this, stack->create_completion_notifier(),
788 store, bucket_info, obj, psize, pmtime, pepoch, objv_tracker);
789 async_rados->queue(req);
// Returns the status recorded on the async request (req->get_ret_status()).
793 int RGWStatObjCR::request_complete()
795 return req->get_ret_status();