1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
6 #include <boost/utility/string_ref.hpp>
10 #include <curl/multi.h>
12 #include "rgw_common.h"
13 #include "rgw_http_client.h"
14 #include "rgw_http_errors.h"
15 #include "common/RefCountedObj.h"
17 #include "rgw_coroutine.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
// Per-request state object, derived from RefCountedObject. Elsewhere in this
// file it is handed to libcurl as callback user data (CURLOPT_WRITEDATA,
// CURLOPT_READDATA, CURLOPT_PRIVATE) and tracked by RGWHTTPManager.
// NOTE(review): several member declarations and method signatures of this
// struct are not visible in this view.
24 struct rgw_http_req_data : public RefCountedObject {
// completion flag; starts out false
29 std::atomic<bool> done = { false };
// client that the manager-mode libcurl callbacks forward to
30 RGWHTTPClient *client;
// handed to libcurl as CURLOPT_ERRORBUFFER in RGWHTTPClient::init_request()
34 char error_buf[CURL_ERROR_SIZE];
// start with no curl handle, no header list, id -1, ret 0, no client/manager,
// and not registered
39 rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
40 client(nullptr), user_info(nullptr), registered(false),
41 mgr(NULL), lock("rgw_http_req_data::lock") {
// zero the error buffer
42 memset(error_buf, 0, sizeof(error_buf));
// NOTE(review): the methods enclosing the following lock acquisitions and
// libcurl cleanup calls are not visible in this view
46 Mutex::Locker l(lock);
53 Mutex::Locker l(lock);
56 curl_easy_cleanup(easy_handle);
59 curl_slist_free_all(h);
72 Mutex::Locker l(lock);
// takes `lock` before reading; presumably returns mgr (return statement not
// visible here)
76 RGWHTTPManager *get_manager() {
77 Mutex::Locker l(lock);
83 * the simple set of callbacks will be called on RGWHTTPClient::process()
85 /* Static methods - callbacks for libcurl. */
// libcurl header callback for the one-off path: process() installs it as
// CURLOPT_HEADERFUNCTION with `this` as the user pointer.
// Casts _info back to the client and forwards size * nmemb bytes to
// receive_header(); a warning is logged with the returned value (the
// condition guarding the log and the return statements are not visible here).
86 size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
91 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
92 const size_t len = size * nmemb;
93 int ret = client->receive_header(ptr, size * nmemb);
95 dout(0) << "WARNING: client->receive_header() returned ret="
// libcurl body callback for the one-off path: process() installs it as
// CURLOPT_WRITEFUNCTION with `this` as the user pointer.
// Casts _info back to the client and forwards size * nmemb bytes to
// receive_data(); a warning is logged with the returned value (the guarding
// condition and the return statements are not visible here).
102 size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
107 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
108 const size_t len = size * nmemb;
109 int ret = client->receive_data(ptr, size * nmemb);
111 dout(0) << "WARNING: client->receive_data() returned ret="
// libcurl read (upload) callback for the one-off path: process() installs it
// as CURLOPT_READFUNCTION with `this` as the user pointer.
// Casts _info back to the client and asks send_data() to fill up to
// size * nmemb bytes at ptr; a warning is logged with the returned value (the
// guarding condition and the return statements are not visible here).
118 size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
123 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
124 int ret = client->send_data(ptr, size * nmemb);
126 dout(0) << "WARNING: client->send_data() returned ret="
134 * the following set of callbacks will be called either on RGWHTTPManager::process(),
135 * or via the RGWHTTPManager async processing.
// libcurl header callback for manager-driven requests: init_request()
// installs it as CURLOPT_HEADERFUNCTION with the rgw_http_req_data as the
// user pointer.
// Holds req_data->lock while checking `registered` and while calling the
// client's receive_header() with size * nmemb bytes.
137 size_t RGWHTTPClient::receive_http_header(void * const ptr,
142 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
143 size_t len = size * nmemb;
145 Mutex::Locker l(req_data->lock);
// request no longer registered with a manager (branch body not visible here)
147 if (!req_data->registered) {
151 int ret = req_data->client->receive_header(ptr, size * nmemb);
153 dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
// libcurl body callback for manager-driven requests: init_request() installs
// it as CURLOPT_WRITEFUNCTION with the rgw_http_req_data as the user pointer.
// Holds req_data->lock while checking `registered` and while calling the
// client's receive_data() with size * nmemb bytes.
159 size_t RGWHTTPClient::receive_http_data(void * const ptr,
164 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
165 size_t len = size * nmemb;
167 Mutex::Locker l(req_data->lock);
// request no longer registered with a manager (branch body not visible here)
169 if (!req_data->registered) {
173 int ret = req_data->client->receive_data(ptr, size * nmemb);
175 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
// libcurl read (upload) callback for manager-driven requests: init_request()
// installs it as CURLOPT_READFUNCTION with the rgw_http_req_data as the user
// pointer.
// Holds req_data->lock while checking `registered` and while calling the
// client's send_data() to fill up to size * nmemb bytes at ptr.
181 size_t RGWHTTPClient::send_http_data(void * const ptr,
186 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
188 Mutex::Locker l(req_data->lock);
// request no longer registered with a manager (branch body not visible here)
190 if (!req_data->registered) {
194 int ret = req_data->client->send_data(ptr, size * nmemb);
// NOTE(review): the message below names receive_data() although the call
// above is send_data() -- looks like a copy/paste slip in the log text
196 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
// Builds a libcurl string list from the given (name, value) header pairs.
// For every pair a string is started from the name, the value is appended,
// and the result is added with curl_slist_append(). Callers in this file
// release the list with curl_slist_free_all().
// NOTE(review): the handling of the "HTTP_" prefix, the underscore-to-dash
// loop body, any separator inserted between name and value, and the return
// statement are not visible in this view.
202 static curl_slist *headers_to_slist(param_vec_t& headers)
204 curl_slist *h = NULL;
206 param_vec_t::iterator iter;
207 for (iter = headers.begin(); iter != headers.end(); ++iter) {
208 pair<string, string>& p = *iter;
// work on a copy of the header name so the caller's vector is left untouched
209 string val = p.first;
211 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
215 /* we need to convert all underscores into dashes as some web servers forbid them
216 * in the http header field names
218 for (size_t i = 0; i < val.size(); i++) {
225 val.append(p.second);
226 h = curl_slist_append(h, val.c_str());
// Tells whether an HTTP method carries a request body to upload: the visible
// return is true exactly for "POST" and "PUT" (case-sensitive strcmp).
// A null method is checked first; the value returned in that branch is not
// visible in this view.
232 static bool is_upload_request(const char *method)
234 if (method == nullptr) {
237 return strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0;
241 * process a single simple one off request, not going through RGWHTTPManager. Not using
// Performs a single request with its own easy handle and
// curl_easy_perform(), using the simple_* callbacks with `this` as their
// user pointer.
// @param method  HTTP method, passed to libcurl as CURLOPT_CUSTOMREQUEST
// @param url     target URL
// NOTE(review): the return statements and some guarding conditions of this
// function are not visible in this view.
244 int RGWHTTPClient::process(const char *method, const char *url)
// local error buffer, handed to libcurl as CURLOPT_ERRORBUFFER below
249 char error_buf[CURL_ERROR_SIZE];
// remember method/url for to_str(); null pointers become empty strings
251 last_method = (method ? method : "");
252 last_url = (url ? url : "");
254 curl_handle = curl_easy_init();
// NOTE(review): url is streamed here without the null guard applied above
256 dout(20) << "sending request to " << url << dendl;
// header list is freed with curl_slist_free_all() at the end of this function
258 curl_slist *h = headers_to_slist(headers);
260 curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method);
261 curl_easy_setopt(curl_handle, CURLOPT_URL, url);
262 curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
263 curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
264 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
265 curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
266 curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
267 curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
268 curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
270 curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
272 curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
273 curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
// POST/PUT are sent as uploads
274 if (is_upload_request(method)) {
275 curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
// NOTE(review): send_len is passed cast to void *; confirm against the
// argument type libcurl documents for CURLOPT_INFILESIZE
278 curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len);
// peer and host verification are switched off (the enclosing condition is
// not visible here)
281 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
282 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
283 dout(20) << "ssl verification is set to off" << dendl;
// run the transfer on the calling thread
286 CURLcode status = curl_easy_perform(curl_handle);
288 dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
// record the HTTP response code, then release the handle and header list
291 curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
292 curl_easy_cleanup(curl_handle);
293 curl_slist_free_all(h);
// Returns "<method> <url>" for the most recent request recorded in
// last_method / last_url; an empty method or url is replaced by the
// placeholder "<no-method>" or "<no-url>".
298 string RGWHTTPClient::to_str()
300 string method_str = (last_method.empty() ? "<no-method>" : last_method);
301 string url_str = (last_url.empty() ? "<no-url>" : last_url);
302 return method_str + " " + url_str;
// Returns the return code stored on this client's req_data.
// NOTE(review): lines between the signature and the return are not visible
// in this view (possibly a guard for a missing req_data -- confirm).
305 int RGWHTTPClient::get_req_retcode()
311 return req_data->get_retcode();
315 * init request, will be used later with RGWHTTPManager
// Prepares an easy handle for a request that will later be driven by
// RGWHTTPManager; nothing is transferred here.
// @param method          HTTP method, passed as CURLOPT_CUSTOMREQUEST
// @param url             target URL
// @param _req_data       per-request state; stored on the client and used as
//                        the user pointer for all callbacks
// @param send_data_hint  forces upload mode even for non POST/PUT methods
// NOTE(review): the return statements and some guarding conditions of this
// function are not visible in this view.
317 int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
321 req_data = _req_data;
325 easy_handle = curl_easy_init();
// the handle is recorded on req_data
327 req_data->easy_handle = easy_handle;
// NOTE(review): url is streamed here without the null guard applied below
329 dout(20) << "sending request to " << url << dendl;
331 curl_slist *h = headers_to_slist(headers);
// remember method/url for to_str(); null pointers become empty strings
335 last_method = (method ? method : "");
336 last_url = (url ? url : "");
338 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
339 curl_easy_setopt(easy_handle, CURLOPT_URL, url);
340 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
341 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
342 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
343 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
344 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
345 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
// errors are written into the buffer owned by req_data, not a stack buffer
346 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
348 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
350 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
351 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
// upload mode: explicit hint from the caller, or a POST/PUT method
352 if (send_data_hint || is_upload_request(method)) {
353 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
// NOTE(review): send_len is passed cast to void *; confirm against the
// argument type libcurl documents for CURLOPT_INFILESIZE
356 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
// peer and host verification are switched off (the enclosing condition is
// not visible here)
359 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
360 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
361 dout(20) << "ssl verification is set to off" << dendl;
// lets the manager map a finished easy handle back to its req_data via
// curl_easy_getinfo(CURLINFO_PRIVATE)
363 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
369 * wait for async request to complete
// Waits for the asynchronous request attached to this client.
// If the request is not done yet, returns the result of req_data->wait();
// otherwise returns the result code already stored in req_data->ret.
371 int RGWHTTPClient::wait()
373 if (!req_data->is_done()) {
374 return req_data->wait();
377 return req_data->ret;
// Destructor: looks up the manager recorded on req_data and asks it to
// remove this client's request.
// NOTE(review): any guards around these calls (missing req_data, no manager)
// and any further cleanup are not visible in this view.
380 RGWHTTPClient::~RGWHTTPClient()
383 RGWHTTPManager *http_manager = req_data->get_manager();
385 http_manager->remove_request(this);
// Parses one raw response header line and, when its name is listed in
// relevant_headers, records the name with its value in found_headers.
// @param ptr  start of the header line (not NUL-terminated; len is used)
// @param len  number of bytes in the line
// NOTE(review): the return statements are not visible in this view.
393 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
395 const boost::string_ref header_line(static_cast<const char * const>(ptr), len);
397 /* We tokenize the line this way for backward compatibility. */
398 const size_t sep_loc = header_line.find_first_of(" \t:");
400 if (boost::string_ref::npos == sep_loc) {
401 /* Wrongly formatted header? Just skip it. */
405 header_name_t name(header_line.substr(0, sep_loc));
406 if (0 == relevant_headers.count(name)) {
407 /* Not interested in this particular header. */
411 const auto value_part = header_line.substr(sep_loc + 1);
413 /* Skip spaces after the separator (only ' ' is skipped below, not tabs); the value ends at the first CR or LF. */
414 const size_t val_loc_s = value_part.find_first_not_of(' ');
415 const size_t val_loc_e = value_part.find_first_of("\r\n");
417 if (boost::string_ref::npos == val_loc_s ||
418 boost::string_ref::npos == val_loc_e) {
419 /* Empty value case. */
420 found_headers.emplace(name, header_value_t());
422 found_headers.emplace(name, header_value_t(
423 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
// Supplies the next piece of post_data to the caller's buffer.
// Copies at most len bytes starting at post_data_index into ptr, advances
// post_data_index by the amount copied, and returns that amount (0 once all
// of post_data has been handed out).
// NOTE(review): the count is held in an int while it is computed from
// size_t-style lengths -- confirm that payloads cannot exceed INT_MAX.
429 int RGWHTTPTransceiver::send_data(void* ptr, size_t len)
431 int length_to_copy = 0;
432 if (post_data_index < post_data.length()) {
433 length_to_copy = min(post_data.length() - post_data_index, len);
434 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
435 post_data_index += length_to_copy;
437 return length_to_copy;
// Drains pending wakeup bytes from fd with a single read of up to 256 bytes.
// A result of -EAGAIN is reported as 0.
// NOTE(review): the error check that converts a failed read() into a negative
// errno value, and the return on the success path, are not visible here.
441 static int clear_signal(int fd)
443 // since we're in non-blocking mode, we can try to read a lot more than
444 // one signal from signal_thread() to avoid later wakeups. non-blocking reads
445 // are also required to support the curl_multi_wait bug workaround
446 std::array<char, 256> buf;
447 int ret = ::read(fd, (void *)buf.data(), buf.size());
450 return ret == -EAGAIN ? 0 : ret; // clear EAGAIN
// State for the curl_multi_wait() variant of do_curl_wait().
// NOTE(review): this guard uses `#if` while set_threaded() tests the same
// macro with `#ifdef`; the two differ when the macro is defined as 0 --
// confirm how the build system defines HAVE_CURL_MULTI_WAIT.
455 #if HAVE_CURL_MULTI_WAIT
// used with std::call_once in set_threaded() so detection runs only once
457 static std::once_flag detect_flag;
// set by detect_curl_multi_wait_bug(); read by is_signaled()
458 static bool curl_multi_wait_bug_present = false;
// Probes whether curl_multi_wait() reports readiness of an extra fd.
// Writes to write_fd so that read_fd becomes readable, calls
// curl_multi_wait() with read_fd as the only extra fd and a timeout of 0, and
// sets curl_multi_wait_bug_present when revents comes back as 0. Finally
// drains read_fd via clear_signal() and returns its result.
// @param cct       context used for logging
// @param handle    curl multi handle to wait on
// @param write_fd  write end of the signalling pipe
// @param read_fd   read end of the signalling pipe
// NOTE(review): local declarations and the early-return paths after the
// error logs are not visible in this view.
460 static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
461 int write_fd, int read_fd)
465 // write to write_fd so that read_fd becomes readable
467 ret = ::write(write_fd, &buf, sizeof(buf));
470 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
474 // pass read_fd in extra_fds for curl_multi_wait()
476 struct curl_waitfd wait_fd;
478 wait_fd.fd = read_fd;
479 wait_fd.events = CURL_WAIT_POLLIN;
482 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
483 if (ret != CURLM_OK) {
484 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
488 // curl_multi_wait should flag revents when extra_fd is readable. if it
489 // doesn't, the bug is present and we can't rely on revents
490 if (wait_fd.revents == 0) {
491 curl_multi_wait_bug_present = true;
492 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
493 "bug in curl_multi_wait(). enabling a workaround that may degrade "
494 "performance slightly." << dendl;
497 return clear_signal(read_fd);
// Decides whether the signal fd should be drained after curl_multi_wait().
// Without the libcurl bug the answer comes from wait_fd.revents; with the
// bug present revents is not trusted (see the comment below).
// NOTE(review): the values returned from the first two branches are not
// visible in this view.
500 static bool is_signaled(const curl_waitfd& wait_fd)
// negative fd: no signal fd was supplied
502 if (wait_fd.fd < 0) {
507 if (curl_multi_wait_bug_present) {
508 // we can't rely on revents, so we always return true if a wait_fd is given.
509 // this means we'll be trying a non-blocking read on this fd every time that
510 // curl_multi_wait() wakes up
514 return wait_fd.revents > 0;
// curl_multi_wait() based wait: blocks on the multi handle plus signal_fd for
// up to rgw_curl_wait_timeout_ms, then drains signal_fd when is_signaled()
// says so.
// @param cct        context supplying configuration and logging
// @param handle     curl multi handle
// @param signal_fd  read end of the wakeup pipe (process_requests() passes -1)
// NOTE(review): the error checks around the two log statements and the
// return statements are not visible in this view.
517 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
520 struct curl_waitfd wait_fd;
522 wait_fd.fd = signal_fd;
523 wait_fd.events = CURL_WAIT_POLLIN;
526 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
528 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
532 if (is_signaled(wait_fd)) {
533 ret = clear_signal(signal_fd);
535 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
// select() based wait, the alternative definition of do_curl_wait(): collects
// the fd sets from the multi handle, adds signal_fd to the read set, waits
// with a bounded timeout, and drains signal_fd if it became readable.
// @param cct        context supplying configuration and logging
// @param handle     curl multi handle
// @param signal_fd  read end of the wakeup pipe
// NOTE(review): fd_set declarations, several guarding conditions and the
// return statements are not visible in this view.
544 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
555 /* get file descriptors from the transfers */
556 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
558 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
563 FD_SET(signal_fd, &fdread);
564 if (signal_fd >= maxfd) {
565 maxfd = signal_fd + 1;
569 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
570 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
571 #define RGW_CURL_TIMEOUT 1000
// fallback timeout in ms (the condition selecting it is not visible here)
573 to = RGW_CURL_TIMEOUT;
574 struct timeval timeout;
575 timeout.tv_sec = to / 1000;
// NOTE(review): `to` is in milliseconds but tv_usec is in microseconds, so
// the remainder probably needs to be multiplied by 1000 -- confirm
576 timeout.tv_usec = to % 1000;
578 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
581 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
585 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
586 ret = clear_signal(signal_fd);
588 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
// Thread entry point: runs the owning manager's request loop.
// NOTE(review): the return statement is not visible in this view.
598 void *RGWHTTPManager::ReqsThread::entry()
600 manager->reqs_thread_entry();
605 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
// Constructs a manager in non-threaded mode (is_threaded == false) with no
// requests, and creates the curl multi handle.
// @param _cct  context used for configuration and logging
// @param _cm   completion manager notified when requests complete (checked
//              for null before use elsewhere in this file)
// NOTE(review): the tail of the initializer list is not visible in this view.
607 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
608 completion_mgr(_cm), is_threaded(false),
609 reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
612 multi_handle = (void *)curl_multi_init();
// Destructor: releases the curl multi handle.
// NOTE(review): statements before the cleanup call are not visible in this
// view.
617 RGWHTTPManager::~RGWHTTPManager() {
620 curl_multi_cleanup((CURLM *)multi_handle);
// Adds req_data to the manager's request table under the reqs_lock write
// lock: assigns it the current num_reqs value as its id, marks it registered
// and stores it in `reqs` under that id.
// NOTE(review): the statement that advances num_reqs is not visible here.
623 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
625 RWLock::WLocker rl(reqs_lock);
626 req_data->id = num_reqs;
627 req_data->registered = true;
628 reqs[num_reqs] = req_data;
630 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
// Marks req_data as no longer registered and queues it on unregistered_reqs,
// under the reqs_lock write lock. The queue is drained later by
// manage_pending_requests() / reqs_thread_entry().
// NOTE(review): one statement between taking the lock and clearing the flag
// is not visible in this view.
633 void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
635 RWLock::WLocker rl(reqs_lock);
637 req_data->registered = false;
638 unregistered_reqs.push_back(req_data);
639 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
// Locking wrapper: takes the reqs_lock write lock and delegates to
// _complete_request().
642 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
644 RWLock::WLocker rl(reqs_lock);
645 _complete_request(req_data);
// Completes a request; complete_request() calls this while holding the
// reqs_lock write lock. Looks the request up in `reqs` by id, detaches it
// from this manager (mgr = nullptr, under req_data->lock) and notifies the
// completion manager, if any, with the request's user_info.
// NOTE(review): what is done with a found table entry, and any reference
// release, are not visible in this view.
648 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
650 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
651 if (iter != reqs.end()) {
655 Mutex::Locker l(req_data->lock);
656 req_data->mgr = nullptr;
658 if (completion_mgr) {
659 completion_mgr->complete(NULL, req_data->user_info);
// Records the final result on the request via finish(ret), then completes it
// through complete_request(), which takes reqs_lock itself.
665 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret)
667 req_data->finish(ret);
668 complete_request(req_data);
// Same as finish_request() but completes through _complete_request(), which
// does not take reqs_lock; the callers visible in this file hold the write
// lock when calling this.
671 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
673 req_data->finish(ret);
674 _complete_request(req_data);
678 * hook request to the curl multi handle
// Attaches the request's easy handle to the manager's curl multi handle.
// NOTE(review): the status check around the error log and the return
// statements are not visible in this view.
680 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
682 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
683 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
685 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
692 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
693 * there will be no more processing on this request
// Detaches the request's easy handle (if it has one) from the multi handle
// and, when the request has not finished yet, finishes it with -ECANCELED.
// Uses _finish_request(), which does not take reqs_lock; unlink_request()
// calls this with the write lock held.
695 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
697 if (req_data->easy_handle) {
698 curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
700 if (!req_data->is_done()) {
701 _finish_request(req_data, -ECANCELED);
// Locking wrapper: takes the reqs_lock write lock and delegates to
// _unlink_request().
705 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
707 RWLock::WLocker wl(reqs_lock);
708 _unlink_request(req_data);
// Called from the request thread loop to apply queued changes to the multi
// handle: processes entries of unregistered_reqs, links every request whose
// id is at or beyond max_threaded_req, and finishes those that failed to
// link with the error returned by link_request().
// NOTE(review): the release of the read lock, the early-out body, the
// per-entry handling of unregistered requests and the failure check after
// link_request() are not visible in this view.
711 void RGWHTTPManager::manage_pending_requests()
// first check under the read lock whether there is anything to do
713 reqs_lock.get_read();
714 if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
// the remaining work modifies shared state, so it runs under the write lock
720 RWLock::WLocker wl(reqs_lock);
722 if (!unregistered_reqs.empty()) {
723 for (auto& r : unregistered_reqs) {
728 unregistered_reqs.clear();
// start at the first request that has not been linked yet
731 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
// failed requests are collected here and finished after the loop
733 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
735 for (; iter != reqs.end(); ++iter) {
736 rgw_http_req_data *req_data = iter->second;
737 int r = link_request(req_data);
739 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
740 remove_reqs.push_back(std::make_pair(iter->second, r));
742 max_threaded_req = iter->first + 1;
746 for (auto piter : remove_reqs) {
747 rgw_http_req_data *req_data = piter.first;
748 int r = piter.second;
// lock-free variant: the write lock is already held
750 _finish_request(req_data, r);
// Creates the per-request state for `client`, initialises its easy handle,
// registers the request with this manager, and then either links it directly
// or wakes the request thread.
// @param client          client issuing the request
// @param method          HTTP method
// @param url             target URL
// @param send_data_hint  forwarded to RGWHTTPClient::init_request()
// NOTE(review): the error checks, the choice between link_request() and
// signal_thread() (presumably on is_threaded -- confirm), the cleanup of
// req_data on failure and the return statements are not visible in this view.
754 int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
756 rgw_http_req_data *req_data = new rgw_http_req_data;
758 int ret = client->init_request(method, url, req_data, send_data_hint);
// tie the request state to this manager and to its client
765 req_data->mgr = this;
766 req_data->client = client;
767 req_data->user_info = client->get_user_info();
769 register_request(req_data);
772 ret = link_request(req_data);
779 ret = signal_thread();
781 finish_request(req_data, ret);
// Removes the request attached to `client` from this manager, either by
// unlinking it directly or by unregistering it and waking the request thread.
// NOTE(review): the condition choosing between the two paths (presumably
// is_threaded -- confirm), the handling of the signal_thread() result and the
// return statements are not visible in this view.
787 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
789 rgw_http_req_data *req_data = client->get_req_data();
792 unlink_request(req_data);
795 unregister_request(req_data);
796 int ret = signal_thread();
805 * the synchronous, non-threaded request processing method.
// Synchronous processing step for non-threaded mode (asserted below): waits
// on the multi handle, runs curl_multi_perform(), and finishes every transfer
// that libcurl reports as done with the errno mapped from its HTTP status.
// @param wait_for_data  NOTE(review): its use is not visible in this view
// @param done           set to true when no transfers are still running
// NOTE(review): local declarations, the `do` opening, the switch on mstatus
// and the return statements are not visible in this view.
807 int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
809 assert(!is_threaded);
// no signal fd in non-threaded mode, hence -1
816 int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
822 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
825 case CURLM_CALL_MULTI_PERFORM:
828 dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
// drain the completion messages of finished transfers
833 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
834 if (msg->msg == CURLMSG_DONE) {
835 CURL *e = msg->easy_handle;
836 rgw_http_req_data *req_data;
// recover the request state stored with CURLOPT_PRIVATE in init_request()
837 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
840 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
842 int status = rgw_http_error_to_errno(http_status);
843 int result = msg->data.result;
844 finish_request(req_data, status);
849 dout(20) << "ERROR: msg->data.result=" << result << dendl;
854 } while (mstatus == CURLM_CALL_MULTI_PERFORM);
856 *done = (still_running == 0);
862 * the synchronous, non-threaded request processing completion method.
// Repeatedly calls process_requests(true, &done) until it reports done or
// returns a non-zero value.
// NOTE(review): local declarations, the `do` opening and the return statement
// are not visible in this view.
864 int RGWHTTPManager::complete_requests()
869 ret = process_requests(true, &done);
870 } while (!done && !ret);
// Switches the manager to threaded mode: creates the wakeup pipe, makes its
// read end non-blocking, runs the one-time curl_multi_wait() bug detection
// (when compiled in), and starts the "http_manager" request thread.
// NOTE(review): the error checks around the log statements, the update of
// is_threaded and the return statements are not visible in this view.
875 int RGWHTTPManager::set_threaded()
877 int r = pipe(thread_pipe);
880 ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl;
884 // enable non-blocking reads
// NOTE(review): F_SETFL is given only O_NONBLOCK, without first reading the
// existing flags via F_GETFL -- confirm that is intended
885 r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK);
888 ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl;
// both pipe ends are closed on this failure path
889 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
890 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
894 #ifdef HAVE_CURL_MULTI_WAIT
895 // on first initialization, use this pipe to detect whether we're using a
896 // buggy version of libcurl
897 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
898 static_cast<CURLM*>(multi_handle),
899 thread_pipe[1], thread_pipe[0]);
903 reqs_thread = new ReqsThread(this);
904 reqs_thread->create("http_manager");
// Stops the manager; the visible part closes both ends of the wakeup pipe.
// NOTE(review): the preceding statements (presumably flagging shutdown and
// joining the request thread -- confirm) are not visible in this view.
908 void RGWHTTPManager::stop()
921 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
922 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
// Wakes the request thread by writing to the write end of the wakeup pipe;
// the thread waits on the read end in do_curl_wait().
// NOTE(review): the declaration of buf, the error check around the log
// statement and the return statements are not visible in this view.
926 int RGWHTTPManager::signal_thread()
929 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
932 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
// Main loop of the request thread. Until going_down is set it waits on the
// multi handle and wakeup pipe, applies pending register/unregister changes,
// drives curl_multi_perform(), and finishes every transfer libcurl reports
// as done. On exit it cancels everything still queued or registered with
// -ECANCELED and tells the completion manager, if any, to go down.
// NOTE(review): local declarations, the switch on mstatus, the body of the
// `result != CURLE_OK` branch and the return statement are not visible in
// this view.
938 void *RGWHTTPManager::reqs_thread_entry()
943 ldout(cct, 20) << __func__ << ": start" << dendl;
945 while (!going_down) {
946 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
948 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
// pick up requests added or removed since the last iteration
952 manage_pending_requests();
954 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
957 case CURLM_CALL_MULTI_PERFORM:
960 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
// drain the completion messages of finished transfers
965 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
966 if (msg->msg == CURLMSG_DONE) {
967 int result = msg->data.result;
968 CURL *e = msg->easy_handle;
969 rgw_http_req_data *req_data;
// recover the request state stored with CURLOPT_PRIVATE in init_request()
970 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
971 curl_multi_remove_handle((CURLM *)multi_handle, e);
974 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
976 int status = rgw_http_error_to_errno(http_status);
// transfer failed without any HTTP status being received
977 if (result != CURLE_OK && http_status == 0) {
// id is copied before finish_request(); presumably req_data may no longer be
// safe to touch afterwards -- confirm
980 int id = req_data->id;
981 finish_request(req_data, status);
986 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
// shutting down: cancel whatever is still queued or registered
994 RWLock::WLocker rl(reqs_lock);
995 for (auto r : unregistered_reqs) {
996 _finish_request(r, -ECANCELED);
999 unregistered_reqs.clear();
// iterate over a moved-out copy of the table rather than `reqs` itself
1001 auto all_reqs = std::move(reqs);
1002 for (auto iter : all_reqs) {
1003 _finish_request(iter.second, -ECANCELED);
1008 if (completion_mgr) {
1009 completion_mgr->go_down();