Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_http_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5
6 #include <boost/utility/string_ref.hpp>
7
8 #include <curl/curl.h>
9 #include <curl/easy.h>
10 #include <curl/multi.h>
11
12 #include "rgw_common.h"
13 #include "rgw_http_client.h"
14 #include "rgw_http_errors.h"
15 #include "common/RefCountedObj.h"
16
17 #include "rgw_coroutine.h"
18
19 #include <atomic>
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
23
24 struct rgw_http_req_data : public RefCountedObject {
25   CURL *easy_handle;
26   curl_slist *h;
27   uint64_t id;
28   int ret;
29   std::atomic<bool> done = { false };
30   RGWHTTPClient *client;
31   void *user_info;
32   bool registered;
33   RGWHTTPManager *mgr;
34   char error_buf[CURL_ERROR_SIZE];
35
36   Mutex lock;
37   Cond cond;
38
39   rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
40                         client(nullptr), user_info(nullptr), registered(false),
41                         mgr(NULL), lock("rgw_http_req_data::lock") {
42     memset(error_buf, 0, sizeof(error_buf));
43   }
44
45   int wait() {
46     Mutex::Locker l(lock);
47     cond.Wait(lock);
48     return ret;
49   }
50
51
52   void finish(int r) {
53     Mutex::Locker l(lock);
54     ret = r;
55     if (easy_handle)
56       curl_easy_cleanup(easy_handle);
57
58     if (h)
59       curl_slist_free_all(h);
60
61     easy_handle = NULL;
62     h = NULL;
63     done = true;
64     cond.Signal();
65   }
66
67   bool is_done() {
68     return done;
69   }
70
71   int get_retcode() {
72     Mutex::Locker l(lock);
73     return ret;
74   }
75
76   RGWHTTPManager *get_manager() {
77     Mutex::Locker l(lock);
78     return mgr;
79   }
80 };
81
82 /*
83  * the simple set of callbacks will be called on RGWHTTPClient::process()
84  */
85 /* Static methods - callbacks for libcurl. */
86 size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
87                                                  const size_t size,
88                                                  const size_t nmemb,
89                                                  void * const _info)
90 {
91   RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
92   const size_t len = size * nmemb;
93   int ret = client->receive_header(ptr, size * nmemb);
94   if (ret < 0) {
95     dout(0) << "WARNING: client->receive_header() returned ret="
96             << ret << dendl;
97   }
98
99   return len;
100 }
101
102 size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
103                                                const size_t size,
104                                                const size_t nmemb,
105                                                void * const _info)
106 {
107   RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
108   const size_t len = size * nmemb;
109   int ret = client->receive_data(ptr, size * nmemb);
110   if (ret < 0) {
111     dout(0) << "WARNING: client->receive_data() returned ret="
112             << ret << dendl;
113   }
114
115   return len;
116 }
117
118 size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
119                                             const size_t size,
120                                             const size_t nmemb,
121                                             void * const _info)
122 {
123   RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
124   int ret = client->send_data(ptr, size * nmemb);
125   if (ret < 0) {
126     dout(0) << "WARNING: client->send_data() returned ret="
127             << ret << dendl;
128   }
129
130   return ret;
131 }
132
133 /*
134  * the following set of callbacks will be called either on RGWHTTPManager::process(),
135  * or via the RGWHTTPManager async processing.
136  */
137 size_t RGWHTTPClient::receive_http_header(void * const ptr,
138                                           const size_t size,
139                                           const size_t nmemb,
140                                           void * const _info)
141 {
142   rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
143   size_t len = size * nmemb;
144
145   Mutex::Locker l(req_data->lock);
146   
147   if (!req_data->registered) {
148     return len;
149   }
150
151   int ret = req_data->client->receive_header(ptr, size * nmemb);
152   if (ret < 0) {
153     dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
154   }
155
156   return len;
157 }
158
159 size_t RGWHTTPClient::receive_http_data(void * const ptr,
160                                         const size_t size,
161                                         const size_t nmemb,
162                                         void * const _info)
163 {
164   rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
165   size_t len = size * nmemb;
166
167   Mutex::Locker l(req_data->lock);
168   
169   if (!req_data->registered) {
170     return len;
171   }
172   
173   int ret = req_data->client->receive_data(ptr, size * nmemb);
174   if (ret < 0) {
175     dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
176   }
177
178   return len;
179 }
180
181 size_t RGWHTTPClient::send_http_data(void * const ptr,
182                                      const size_t size,
183                                      const size_t nmemb,
184                                      void * const _info)
185 {
186   rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
187
188   Mutex::Locker l(req_data->lock);
189   
190   if (!req_data->registered) {
191     return 0;
192   }
193
194   int ret = req_data->client->send_data(ptr, size * nmemb);
195   if (ret < 0) {
196     dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
197   }
198
199   return ret;
200 }
201
202 static curl_slist *headers_to_slist(param_vec_t& headers)
203 {
204   curl_slist *h = NULL;
205
206   param_vec_t::iterator iter;
207   for (iter = headers.begin(); iter != headers.end(); ++iter) {
208     pair<string, string>& p = *iter;
209     string val = p.first;
210
211     if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
212       val = val.substr(5);
213     }
214
215     /* we need to convert all underscores into dashes as some web servers forbid them
216      * in the http header field names
217      */
218     for (size_t i = 0; i < val.size(); i++) {
219       if (val[i] == '_') {
220         val[i] = '-';
221       }
222     }
223
224     val.append(": ");
225     val.append(p.second);
226     h = curl_slist_append(h, val.c_str());
227   }
228
229   return h;
230 }
231
232 static bool is_upload_request(const char *method)
233 {
234   if (method == nullptr) {
235     return false;
236   }
237   return strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0;
238 }
239
240 /*
241  * process a single simple one off request, not going through RGWHTTPManager. Not using
242  * req_data.
243  */
244 int RGWHTTPClient::process(const char *method, const char *url)
245 {
246   int ret = 0;
247   CURL *curl_handle;
248
249   char error_buf[CURL_ERROR_SIZE];
250
251   last_method = (method ? method : "");
252   last_url = (url ? url : "");
253
254   curl_handle = curl_easy_init();
255
256   dout(20) << "sending request to " << url << dendl;
257
258   curl_slist *h = headers_to_slist(headers);
259
260   curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method);
261   curl_easy_setopt(curl_handle, CURLOPT_URL, url);
262   curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
263   curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
264   curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
265   curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
266   curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
267   curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
268   curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
269   if (h) {
270     curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
271   }
272   curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
273   curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
274   if (is_upload_request(method)) {
275     curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
276   }
277   if (has_send_len) {
278     curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len); 
279   }
280   if (!verify_ssl) {
281     curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
282     curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
283     dout(20) << "ssl verification is set to off" << dendl;
284   }
285
286   CURLcode status = curl_easy_perform(curl_handle);
287   if (status) {
288     dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
289     ret = -EINVAL;
290   }
291   curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
292   curl_easy_cleanup(curl_handle);
293   curl_slist_free_all(h);
294
295   return ret;
296 }
297
298 string RGWHTTPClient::to_str()
299 {
300   string method_str = (last_method.empty() ? "<no-method>" : last_method);
301   string url_str = (last_url.empty() ? "<no-url>" : last_url);
302   return method_str + " " + url_str;
303 }
304
305 int RGWHTTPClient::get_req_retcode()
306 {
307   if (!req_data) {
308     return -EINVAL;
309   }
310
311   return req_data->get_retcode();
312 }
313
314 /*
315  * init request, will be used later with RGWHTTPManager
316  */
317 int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
318 {
319   assert(!req_data);
320   _req_data->get();
321   req_data = _req_data;
322
323   CURL *easy_handle;
324
325   easy_handle = curl_easy_init();
326
327   req_data->easy_handle = easy_handle;
328
329   dout(20) << "sending request to " << url << dendl;
330
331   curl_slist *h = headers_to_slist(headers);
332
333   req_data->h = h;
334
335   last_method = (method ? method : "");
336   last_url = (url ? url : "");
337
338   curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
339   curl_easy_setopt(easy_handle, CURLOPT_URL, url);
340   curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
341   curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
342   curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
343   curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
344   curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
345   curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
346   curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
347   if (h) {
348     curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
349   }
350   curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
351   curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
352   if (send_data_hint || is_upload_request(method)) {
353     curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
354   }
355   if (has_send_len) {
356     curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len); 
357   }
358   if (!verify_ssl) {
359     curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
360     curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
361     dout(20) << "ssl verification is set to off" << dendl;
362   }
363   curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
364
365   return 0;
366 }
367
368 /*
369  * wait for async request to complete
370  */
371 int RGWHTTPClient::wait()
372 {
373   if (!req_data->is_done()) {
374     return req_data->wait();
375   }
376
377   return req_data->ret;
378 }
379
380 RGWHTTPClient::~RGWHTTPClient()
381 {
382   if (req_data) {
383     RGWHTTPManager *http_manager = req_data->get_manager();
384     if (http_manager) {
385       http_manager->remove_request(this);
386     }
387
388     req_data->put();
389   }
390 }
391
392
393 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
394 {
395   const boost::string_ref header_line(static_cast<const char * const>(ptr), len);
396
397   /* We're tokening the line that way due to backward compatibility. */
398   const size_t sep_loc = header_line.find_first_of(" \t:");
399
400   if (boost::string_ref::npos == sep_loc) {
401     /* Wrongly formatted header? Just skip it. */
402     return 0;
403   }
404
405   header_name_t name(header_line.substr(0, sep_loc));
406   if (0 == relevant_headers.count(name)) {
407     /* Not interested in this particular header. */
408     return 0;
409   }
410
411   const auto value_part = header_line.substr(sep_loc + 1);
412
413   /* Skip spaces and tabs after the separator. */
414   const size_t val_loc_s = value_part.find_first_not_of(' ');
415   const size_t val_loc_e = value_part.find_first_of("\r\n");
416
417   if (boost::string_ref::npos == val_loc_s ||
418       boost::string_ref::npos == val_loc_e) {
419     /* Empty value case. */
420     found_headers.emplace(name, header_value_t());
421   } else {
422     found_headers.emplace(name, header_value_t(
423         value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
424   }
425
426   return 0;
427 }
428
429 int RGWHTTPTransceiver::send_data(void* ptr, size_t len)
430 {
431   int length_to_copy = 0;
432   if (post_data_index < post_data.length()) {
433     length_to_copy = min(post_data.length() - post_data_index, len);
434     memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
435     post_data_index += length_to_copy;
436   }
437   return length_to_copy;
438 }
439
440
441 static int clear_signal(int fd)
442 {
443   // since we're in non-blocking mode, we can try to read a lot more than
444   // one signal from signal_thread() to avoid later wakeups. non-blocking reads
445   // are also required to support the curl_multi_wait bug workaround
446   std::array<char, 256> buf;
447   int ret = ::read(fd, (void *)buf.data(), buf.size());
448   if (ret < 0) {
449     ret = -errno;
450     return ret == -EAGAIN ? 0 : ret; // clear EAGAIN
451   }
452   return 0;
453 }
454
455 #if HAVE_CURL_MULTI_WAIT
456
457 static std::once_flag detect_flag;
458 static bool curl_multi_wait_bug_present = false;
459
460 static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
461                                       int write_fd, int read_fd)
462 {
463   int ret = 0;
464
465   // write to write_fd so that read_fd becomes readable
466   uint32_t buf = 0;
467   ret = ::write(write_fd, &buf, sizeof(buf));
468   if (ret < 0) {
469     ret = -errno;
470     ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
471     return ret;
472   }
473
474   // pass read_fd in extra_fds for curl_multi_wait()
475   int num_fds;
476   struct curl_waitfd wait_fd;
477
478   wait_fd.fd = read_fd;
479   wait_fd.events = CURL_WAIT_POLLIN;
480   wait_fd.revents = 0;
481
482   ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
483   if (ret != CURLM_OK) {
484     ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
485     return -EIO;
486   }
487
488   // curl_multi_wait should flag revents when extra_fd is readable. if it
489   // doesn't, the bug is present and we can't rely on revents
490   if (wait_fd.revents == 0) {
491     curl_multi_wait_bug_present = true;
492     ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
493         "bug in curl_multi_wait(). enabling a workaround that may degrade "
494         "performance slightly." << dendl;
495   }
496
497   return clear_signal(read_fd);
498 }
499
500 static bool is_signaled(const curl_waitfd& wait_fd)
501 {
502   if (wait_fd.fd < 0) {
503     // no fd to signal
504     return false;
505   }
506
507   if (curl_multi_wait_bug_present) {
508     // we can't rely on revents, so we always return true if a wait_fd is given.
509     // this means we'll be trying a non-blocking read on this fd every time that
510     // curl_multi_wait() wakes up
511     return true;
512   }
513
514   return wait_fd.revents > 0;
515 }
516
517 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
518 {
519   int num_fds;
520   struct curl_waitfd wait_fd;
521
522   wait_fd.fd = signal_fd;
523   wait_fd.events = CURL_WAIT_POLLIN;
524   wait_fd.revents = 0;
525
526   int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
527   if (ret) {
528     ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
529     return -EIO;
530   }
531
532   if (is_signaled(wait_fd)) {
533     ret = clear_signal(signal_fd);
534     if (ret < 0) {
535       ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
536       return ret;
537     }
538   }
539   return 0;
540 }
541
542 #else
543
544 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
545 {
546   fd_set fdread;
547   fd_set fdwrite;
548   fd_set fdexcep;
549   int maxfd = -1;
550  
551   FD_ZERO(&fdread);
552   FD_ZERO(&fdwrite);
553   FD_ZERO(&fdexcep);
554
555   /* get file descriptors from the transfers */ 
556   int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
557   if (ret) {
558     ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
559     return -EIO;
560   }
561
562   if (signal_fd > 0) {
563     FD_SET(signal_fd, &fdread);
564     if (signal_fd >= maxfd) {
565       maxfd = signal_fd + 1;
566     }
567   }
568
569   /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
570   uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
571 #define RGW_CURL_TIMEOUT 1000
572   if (!to)
573     to = RGW_CURL_TIMEOUT;
574   struct timeval timeout;
575   timeout.tv_sec = to / 1000;
576   timeout.tv_usec = to % 1000;
577
578   ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
579   if (ret < 0) {
580     ret = -errno;
581     ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
582     return ret;
583   }
584
585   if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
586     ret = clear_signal(signal_fd);
587     if (ret < 0) {
588       ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
589       return ret;
590     }
591   }
592
593   return 0;
594 }
595
596 #endif
597
598 void *RGWHTTPManager::ReqsThread::entry()
599 {
600   manager->reqs_thread_entry();
601   return NULL;
602 }
603
604 /*
605  * RGWHTTPManager has two modes of operation: threaded and non-threaded.
606  */
607 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
608                                                     completion_mgr(_cm), is_threaded(false),
609                                                     reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
610                                                     reqs_thread(NULL)
611 {
612   multi_handle = (void *)curl_multi_init();
613   thread_pipe[0] = -1;
614   thread_pipe[1] = -1;
615 }
616
617 RGWHTTPManager::~RGWHTTPManager() {
618   stop();
619   if (multi_handle)
620     curl_multi_cleanup((CURLM *)multi_handle);
621 }
622
623 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
624 {
625   RWLock::WLocker rl(reqs_lock);
626   req_data->id = num_reqs;
627   req_data->registered = true;
628   reqs[num_reqs] = req_data;
629   num_reqs++;
630   ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
631 }
632
633 void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
634 {
635   RWLock::WLocker rl(reqs_lock);
636   req_data->get();
637   req_data->registered = false;
638   unregistered_reqs.push_back(req_data);
639   ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
640 }
641
642 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
643 {
644   RWLock::WLocker rl(reqs_lock);
645   _complete_request(req_data);
646 }
647
648 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
649 {
650   map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
651   if (iter != reqs.end()) {
652     reqs.erase(iter);
653   }
654   {
655     Mutex::Locker l(req_data->lock);
656     req_data->mgr = nullptr;
657   }
658   if (completion_mgr) {
659     completion_mgr->complete(NULL, req_data->user_info);
660   }
661
662   req_data->put();
663 }
664
665 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret)
666 {
667   req_data->finish(ret);
668   complete_request(req_data);
669 }
670
671 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
672 {
673   req_data->finish(ret);
674   _complete_request(req_data);
675 }
676
677 /*
678  * hook request to the curl multi handle
679  */
680 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
681 {
682   ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
683   CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
684   if (mstatus) {
685     dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
686     return -EIO;
687   }
688   return 0;
689 }
690
691 /*
692  * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
693  * there will be no more processing on this request
694  */
695 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
696 {
697   if (req_data->easy_handle) {
698     curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
699   }
700   if (!req_data->is_done()) {
701     _finish_request(req_data, -ECANCELED);
702   }
703 }
704
705 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
706 {
707   RWLock::WLocker wl(reqs_lock);
708   _unlink_request(req_data);
709 }
710
711 void RGWHTTPManager::manage_pending_requests()
712 {
713   reqs_lock.get_read();
714   if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
715     reqs_lock.unlock();
716     return;
717   }
718   reqs_lock.unlock();
719
720   RWLock::WLocker wl(reqs_lock);
721
722   if (!unregistered_reqs.empty()) {
723     for (auto& r : unregistered_reqs) {
724       _unlink_request(r);
725       r->put();
726     }
727
728     unregistered_reqs.clear();
729   }
730
731   map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
732
733   list<std::pair<rgw_http_req_data *, int> > remove_reqs;
734
735   for (; iter != reqs.end(); ++iter) {
736     rgw_http_req_data *req_data = iter->second;
737     int r = link_request(req_data);
738     if (r < 0) {
739       ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
740       remove_reqs.push_back(std::make_pair(iter->second, r));
741     } else {
742       max_threaded_req = iter->first + 1;
743     }
744   }
745
746   for (auto piter : remove_reqs) {
747     rgw_http_req_data *req_data = piter.first;
748     int r = piter.second;
749
750     _finish_request(req_data, r);
751   }
752 }
753
754 int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
755 {
756   rgw_http_req_data *req_data = new rgw_http_req_data;
757
758   int ret = client->init_request(method, url, req_data, send_data_hint);
759   if (ret < 0) {
760     req_data->put();
761     req_data = NULL;
762     return ret;
763   }
764
765   req_data->mgr = this;
766   req_data->client = client;
767   req_data->user_info = client->get_user_info();
768
769   register_request(req_data);
770
771   if (!is_threaded) {
772     ret = link_request(req_data);
773     if (ret < 0) {
774       req_data->put();
775       req_data = NULL;
776     }
777     return ret;
778   }
779   ret = signal_thread();
780   if (ret < 0) {
781     finish_request(req_data, ret);
782   }
783
784   return ret;
785 }
786
787 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
788 {
789   rgw_http_req_data *req_data = client->get_req_data();
790
791   if (!is_threaded) {
792     unlink_request(req_data);
793     return 0;
794   }
795   unregister_request(req_data);
796   int ret = signal_thread();
797   if (ret < 0) {
798     return ret;
799   }
800
801   return 0;
802 }
803
804 /*
805  * the synchronous, non-threaded request processing method.
806  */
807 int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
808 {
809   assert(!is_threaded);
810
811   int still_running;
812   int mstatus;
813
814   do {
815     if (wait_for_data) {
816       int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
817       if (ret < 0) {
818         return ret;
819       }
820     }
821
822     mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
823     switch (mstatus) {
824       case CURLM_OK:
825       case CURLM_CALL_MULTI_PERFORM:
826         break;
827       default:
828         dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
829         return -EINVAL;
830     }
831     int msgs_left;
832     CURLMsg *msg;
833     while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
834       if (msg->msg == CURLMSG_DONE) {
835         CURL *e = msg->easy_handle;
836         rgw_http_req_data *req_data;
837         curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
838
839         long http_status;
840         curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
841
842         int status = rgw_http_error_to_errno(http_status);
843         int result = msg->data.result;
844         finish_request(req_data, status);
845         switch (result) {
846           case CURLE_OK:
847             break;
848           default:
849             dout(20) << "ERROR: msg->data.result=" << result << dendl;
850             return -EIO;
851         }
852       }
853     }
854   } while (mstatus == CURLM_CALL_MULTI_PERFORM);
855
856   *done = (still_running == 0);
857
858   return 0;
859 }
860
861 /*
862  * the synchronous, non-threaded request processing completion method.
863  */
864 int RGWHTTPManager::complete_requests()
865 {
866   bool done = false;
867   int ret;
868   do {
869     ret = process_requests(true, &done);
870   } while (!done && !ret);
871
872   return ret;
873 }
874
875 int RGWHTTPManager::set_threaded()
876 {
877   int r = pipe(thread_pipe);
878   if (r < 0) {
879     r = -errno;
880     ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl;
881     return r;
882   }
883
884   // enable non-blocking reads
885   r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK);
886   if (r < 0) {
887     r = -errno;
888     ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl;
889     TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
890     TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
891     return r;
892   }
893
894 #ifdef HAVE_CURL_MULTI_WAIT
895   // on first initialization, use this pipe to detect whether we're using a
896   // buggy version of libcurl
897   std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
898                  static_cast<CURLM*>(multi_handle),
899                  thread_pipe[1], thread_pipe[0]);
900 #endif
901
902   is_threaded = true;
903   reqs_thread = new ReqsThread(this);
904   reqs_thread->create("http_manager");
905   return 0;
906 }
907
908 void RGWHTTPManager::stop()
909 {
910   if (is_stopped) {
911     return;
912   }
913
914   is_stopped = true;
915
916   if (is_threaded) {
917     going_down = true;
918     signal_thread();
919     reqs_thread->join();
920     delete reqs_thread;
921     TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
922     TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
923   }
924 }
925
926 int RGWHTTPManager::signal_thread()
927 {
928   uint32_t buf = 0;
929   int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
930   if (ret < 0) {
931     ret = -errno;
932     ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
933     return ret;
934   }
935   return 0;
936 }
937
938 void *RGWHTTPManager::reqs_thread_entry()
939 {
940   int still_running;
941   int mstatus;
942
943   ldout(cct, 20) << __func__ << ": start" << dendl;
944
945   while (!going_down) {
946     int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
947     if (ret < 0) {
948       dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
949       return NULL;
950     }
951
952     manage_pending_requests();
953
954     mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
955     switch (mstatus) {
956       case CURLM_OK:
957       case CURLM_CALL_MULTI_PERFORM:
958         break;
959       default:
960         dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
961         break;
962     }
963     int msgs_left;
964     CURLMsg *msg;
965     while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
966       if (msg->msg == CURLMSG_DONE) {
967         int result = msg->data.result;
968         CURL *e = msg->easy_handle;
969         rgw_http_req_data *req_data;
970         curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
971         curl_multi_remove_handle((CURLM *)multi_handle, e);
972
973         long http_status;
974         curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
975
976         int status = rgw_http_error_to_errno(http_status);
977         if (result != CURLE_OK && http_status == 0) {
978           status = -EAGAIN;
979         }
980         int id = req_data->id;
981         finish_request(req_data, status);
982         switch (result) {
983           case CURLE_OK:
984             break;
985           default:
986             dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
987             break;
988         }
989       }
990     }
991   }
992
993
994   RWLock::WLocker rl(reqs_lock);
995   for (auto r : unregistered_reqs) {
996     _finish_request(r, -ECANCELED);
997   }
998
999   unregistered_reqs.clear();
1000
1001   auto all_reqs = std::move(reqs);
1002   for (auto iter : all_reqs) {
1003     _finish_request(iter.second, -ECANCELED);
1004   }
1005
1006   reqs.clear();
1007
1008   if (completion_mgr) {
1009     completion_mgr->go_down();
1010   }
1011   
1012   return 0;
1013 }
1014
1015