Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_rest_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_common.h"
5 #include "rgw_rest_client.h"
6 #include "rgw_auth_s3.h"
7 #include "rgw_http_errors.h"
8 #include "rgw_rados.h"
9
10 #include "common/ceph_crypto_cms.h"
11 #include "common/armor.h"
12 #include "common/strtol.h"
13 #include "include/str_list.h"
14 #include "rgw_crypt_sanitize.h"
15
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_rgw
18
19 int RGWRESTSimpleRequest::get_status()
20 {
21   int retcode = get_req_retcode();
22   if (retcode < 0) {
23     return retcode;
24   }
25   return status;
26 }
27
28 int RGWRESTSimpleRequest::handle_header(const string& name, const string& val) 
29 {
30   if (name == "CONTENT_LENGTH") {
31     string err;
32     long len = strict_strtol(val.c_str(), 10, &err);
33     if (!err.empty()) {
34       ldout(cct, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
35       return -EINVAL;
36     }
37
38     max_response = len;
39   }
40
41   return 0;
42 }
43
44 int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
45 {
46   char line[len + 1];
47
48   char *s = (char *)ptr, *end = (char *)ptr + len;
49   char *p = line;
50   ldout(cct, 10) << "receive_http_header" << dendl;
51
52   while (s != end) {
53     if (*s == '\r') {
54       s++;
55       continue;
56     }
57     if (*s == '\n') {
58       *p = '\0';
59       ldout(cct, 10) << "received header:" << line << dendl;
60       // TODO: fill whatever data required here
61       char *l = line;
62       char *tok = strsep(&l, " \t:");
63       if (tok && l) {
64         while (*l == ' ')
65           l++;
66  
67         if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) {
68           http_status = atoi(l);
69           if (http_status == 100) /* 100-continue response */
70             continue;
71           status = rgw_http_error_to_errno(http_status);
72         } else {
73           /* convert header field name to upper case  */
74           char *src = tok;
75           char buf[len + 1];
76           size_t i;
77           for (i = 0; i < len && *src; ++i, ++src) {
78             switch (*src) {
79               case '-':
80                 buf[i] = '_';
81                 break;
82               default:
83                 buf[i] = toupper(*src);
84             }
85           }
86           buf[i] = '\0';
87           out_headers[buf] = l;
88           int r = handle_header(buf, l);
89           if (r < 0)
90             return r;
91         }
92       }
93     }
94     if (s != end)
95       *p++ = *s++;
96   }
97   return 0;
98 }
99
100 static void get_new_date_str(string& date_str)
101 {
102   date_str = rgw_to_asctime(ceph_clock_now());
103 }
104
105 int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const char *resource)
106 {
107   string new_url = url;
108   string new_resource = resource;
109
110   if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
111     new_url = new_url.substr(0, new_url.size() - 1);
112   } else if (resource[0] != '/') {
113     new_resource = "/";
114     new_resource.append(resource);
115   }
116   new_url.append(new_resource);
117
118   string date_str;
119   get_new_date_str(date_str);
120   headers.push_back(pair<string, string>("HTTP_DATE", date_str));
121
122   string canonical_header;
123   map<string, string> meta_map;
124   map<string, string> sub_resources;
125   rgw_create_s3_canonical_header(method, NULL, NULL, date_str.c_str(),
126                             meta_map, new_url.c_str(), sub_resources,
127                             canonical_header);
128
129   string digest;
130   try {
131     digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
132   } catch (int ret) {
133     return ret;
134   }
135
136   string auth_hdr = "AWS " + key.id + ":" + digest;
137
138   ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
139
140   headers.push_back(pair<string, string>("AUTHORIZATION", auth_hdr));
141   int r = process(method, new_url.c_str());
142   if (r < 0)
143     return r;
144
145   return status;
146 }
147
148 int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
149 {
150   if (!send_iter)
151     return 0;
152
153   if (len > send_iter->get_remaining())
154     len = send_iter->get_remaining();
155
156   send_iter->copy(len, (char *)ptr);
157
158   return len;
159 }
160
161 int RGWRESTSimpleRequest::receive_data(void *ptr, size_t len)
162 {
163   size_t cp_len, left_len;
164
165   left_len = max_response > response.length() ? (max_response - response.length()) : 0;
166   if (left_len == 0)
167     return 0; /* don't read extra data */
168
169   cp_len = (len > left_len) ? left_len : len;
170   bufferptr p((char *)ptr, cp_len);
171
172   response.append(p);
173
174   return 0;
175
176 }
177
178 void RGWRESTSimpleRequest::append_param(string& dest, const string& name, const string& val)
179 {
180   if (dest.empty()) {
181     dest.append("?");
182   } else {
183     dest.append("&");
184   }
185   string url_name;
186   url_encode(name, url_name);
187   dest.append(url_name);
188
189   if (!val.empty()) {
190     string url_val;
191     url_encode(val, url_val);
192     dest.append("=");
193     dest.append(url_val);
194   }
195 }
196
197 void RGWRESTSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
198 {
199   map<string, string>::iterator miter;
200   for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
201     append_param(dest, miter->first, miter->second);
202   }
203   param_vec_t::iterator iter;
204   for (iter = params.begin(); iter != params.end(); ++iter) {
205     append_param(dest, iter->first, iter->second);
206   }
207 }
208
209 int RGWRESTSimpleRequest::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
210 {
211   /* don't sign if no key is provided */
212   if (key.key.empty()) {
213     return 0;
214   }
215
216   if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
217     for (const auto& i: env.get_map()) {
218       ldout(cct, 20) << "> " << i.first << " -> " << rgw::crypt_sanitize::x_meta_map{i.first, i.second} << dendl;
219     }
220   }
221
222   string canonical_header;
223   if (!rgw_create_s3_canonical_header(info, NULL, canonical_header, false)) {
224     ldout(cct, 0) << "failed to create canonical s3 header" << dendl;
225     return -EINVAL;
226   }
227
228   ldout(cct, 10) << "generated canonical header: " << canonical_header << dendl;
229
230   string digest;
231   try {
232     digest = rgw::auth::s3::get_v2_signature(cct, key.key, canonical_header);
233   } catch (int ret) {
234     return ret;
235   }
236
237   string auth_hdr = "AWS " + key.id + ":" + digest;
238   ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
239   
240   env.set("AUTHORIZATION", auth_hdr);
241
242   return 0;
243 }
244
245 int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
246 {
247
248   string date_str;
249   get_new_date_str(date_str);
250
251   RGWEnv new_env;
252   req_info new_info(cct, &new_env);
253   new_info.rebuild_from(info);
254
255   new_env.set("HTTP_DATE", date_str.c_str());
256
257   int ret = sign_request(key, new_env, new_info);
258   if (ret < 0) {
259     ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
260     return ret;
261   }
262
263   for (const auto& kv: new_env.get_map()) {
264     headers.emplace_back(kv);
265   }
266
267   map<string, string>& meta_map = new_info.x_meta_map;
268   for (const auto& kv: meta_map) {
269     headers.emplace_back(kv);
270   }
271
272   string params_str;
273   get_params_str(info.args.get_params(), params_str);
274
275   string new_url = url;
276   string& resource = new_info.request_uri;
277   string new_resource = resource;
278   if (new_url[new_url.size() - 1] == '/' && resource[0] == '/') {
279     new_url = new_url.substr(0, new_url.size() - 1);
280   } else if (resource[0] != '/') {
281     new_resource = "/";
282     new_resource.append(resource);
283   }
284   new_url.append(new_resource + params_str);
285
286   bufferlist::iterator bliter;
287
288   if (inbl) {
289     bliter = inbl->begin();
290     send_iter = &bliter;
291
292     set_send_length(inbl->length());
293   }
294
295   int r = process(new_info.method, new_url.c_str());
296   if (r < 0){
297     if (r == -EINVAL){
298       // curl_easy has errored, generally means the service is not available
299       r = -ERR_SERVICE_UNAVAILABLE;
300     }
301     return r;
302   }
303
304   response.append((char)0); /* NULL terminate response */
305
306   if (outbl) {
307     outbl->claim(response);
308   }
309
310   return status;
311 }
312
313 class RGWRESTStreamOutCB : public RGWGetDataCB {
314   RGWRESTStreamWriteRequest *req;
315 public:
316   explicit RGWRESTStreamOutCB(RGWRESTStreamWriteRequest *_req) : req(_req) {}
317   int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
318 };
319
320 int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
321 {
322   dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
323   if (!bl_ofs && bl_len == bl.length()) {
324     return req->add_output_data(bl);
325   }
326
327   bufferptr bp(bl.c_str() + bl_ofs, bl_len);
328   bufferlist new_bl;
329   new_bl.push_back(bp);
330
331   return req->add_output_data(new_bl);
332 }
333
334 RGWRESTStreamWriteRequest::~RGWRESTStreamWriteRequest()
335 {
336   delete cb;
337 }
338
339 int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl)
340 {
341   lock.Lock();
342   if (status < 0) {
343     int ret = status;
344     lock.Unlock();
345     return ret;
346   }
347   pending_send.push_back(bl);
348   lock.Unlock();
349
350   bool done;
351   return http_manager.process_requests(false, &done);
352 }
353
354 static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
355 {
356   string& s = grants_by_type[perm];
357
358   if (!s.empty())
359     s.append(", ");
360
361   string id_type_str;
362   ACLGranteeType& type = grant.get_type();
363   switch (type.get_type()) {
364     case ACL_TYPE_GROUP:
365       id_type_str = "uri";
366       break;
367     case ACL_TYPE_EMAIL_USER:
368       id_type_str = "emailAddress";
369       break;
370     default:
371       id_type_str = "id";
372   }
373   rgw_user id;
374   grant.get_id(id);
375   s.append(id_type_str + "=\"" + id.to_str() + "\"");
376 }
377
378 struct grant_type_to_header {
379   int type;
380   const char *header;
381 };
382
383 struct grant_type_to_header grants_headers_def[] = {
384   { RGW_PERM_FULL_CONTROL, "x-amz-grant-full-control"},
385   { RGW_PERM_READ,         "x-amz-grant-read"},
386   { RGW_PERM_WRITE,        "x-amz-grant-write"},
387   { RGW_PERM_READ_ACP,     "x-amz-grant-read-acp"},
388   { RGW_PERM_WRITE_ACP,    "x-amz-grant-write-acp"},
389   { 0, NULL}
390 };
391
392 static bool grants_by_type_check_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant, int check_perm)
393 {
394   if ((perm & check_perm) == check_perm) {
395     grants_by_type_add_one_grant(grants_by_type, check_perm, grant);
396     return true;
397   }
398   return false;
399 }
400
401 static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
402 {
403   struct grant_type_to_header *t;
404
405   for (t = grants_headers_def; t->header; t++) {
406     if (grants_by_type_check_perm(grants_by_type, perm, grant, t->type))
407       return;
408   }
409 }
410
411 static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string, string>& meta_map)
412 {
413   struct grant_type_to_header *t;
414
415   for (t = grants_headers_def; t->header; t++) {
416     map<int, string>::iterator iter = grants.find(t->type);
417     if (iter != grants.end()) {
418       env.set(t->header,iter->second);
419       meta_map[t->header] = iter->second;
420     }
421   }
422 }
423
424 int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs)
425 {
426   string resource = obj.bucket.name + "/" + obj.get_oid();
427   string new_url = url;
428   if (new_url[new_url.size() - 1] != '/')
429     new_url.append("/");
430
431   string date_str;
432   get_new_date_str(date_str);
433
434   RGWEnv new_env;
435   req_info new_info(cct, &new_env);
436   
437   string params_str;
438   map<string, string>& args = new_info.args.get_params();
439   get_params_str(args, params_str);
440
441   new_url.append(resource + params_str);
442
443   new_env.set("HTTP_DATE", date_str.c_str());
444
445   new_info.method = "PUT";
446
447   new_info.script_uri = "/";
448   new_info.script_uri.append(resource);
449   new_info.request_uri = new_info.script_uri;
450
451   /* merge send headers */
452   for (auto& attr: attrs) {
453     bufferlist& bl = attr.second;
454     const string& name = attr.first;
455     string val = bl.c_str();
456     if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
457       string header_name = RGW_AMZ_META_PREFIX;
458       header_name.append(name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1));
459       new_env.set(header_name, val);
460       new_info.x_meta_map[header_name] = val;
461     }
462   }
463   RGWAccessControlPolicy policy;
464   int ret = rgw_policy_from_attrset(cct, attrs, &policy);
465   if (ret < 0) {
466     ldout(cct, 0) << "ERROR: couldn't get policy ret=" << ret << dendl;
467     return ret;
468   }
469
470   /* update acl headers */
471   RGWAccessControlList& acl = policy.get_acl();
472   multimap<string, ACLGrant>& grant_map = acl.get_grant_map();
473   multimap<string, ACLGrant>::iterator giter;
474   map<int, string> grants_by_type;
475   for (giter = grant_map.begin(); giter != grant_map.end(); ++giter) {
476     ACLGrant& grant = giter->second;
477     ACLPermission& perm = grant.get_permission();
478     grants_by_type_add_perm(grants_by_type, perm.get_permissions(), grant);
479   }
480   add_grants_headers(grants_by_type, new_env, new_info.x_meta_map);
481   ret = sign_request(key, new_env, new_info);
482   if (ret < 0) {
483     ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
484     return ret;
485   }
486
487   for (const auto& kv: new_env.get_map()) {
488     headers.emplace_back(kv);
489   }
490
491   cb = new RGWRESTStreamOutCB(this);
492
493   set_send_length(obj_size);
494
495   int r = http_manager.add_request(this, new_info.method, new_url.c_str());
496   if (r < 0)
497     return r;
498
499   return 0;
500 }
501
502 int RGWRESTStreamWriteRequest::send_data(void *ptr, size_t len)
503 {
504   uint64_t sent = 0;
505
506   dout(20) << "RGWRESTStreamWriteRequest::send_data()" << dendl;
507   lock.Lock();
508   if (pending_send.empty() || status < 0) {
509     lock.Unlock();
510     return status;
511   }
512
513   list<bufferlist>::iterator iter = pending_send.begin();
514   while (iter != pending_send.end() && len > 0) {
515     bufferlist& bl = *iter;
516     
517     list<bufferlist>::iterator next_iter = iter;
518     ++next_iter;
519     lock.Unlock();
520
521     uint64_t send_len = min(len, (size_t)bl.length());
522
523     memcpy(ptr, bl.c_str(), send_len);
524
525     ptr = (char *)ptr + send_len;
526     len -= send_len;
527     sent += send_len;
528
529     lock.Lock();
530
531     bufferlist new_bl;
532     if (bl.length() > send_len) {
533       bufferptr bp(bl.c_str() + send_len, bl.length() - send_len);
534       new_bl.append(bp);
535     }
536     pending_send.pop_front(); /* need to do this after we copy data from bl */
537     if (new_bl.length()) {
538       pending_send.push_front(new_bl);
539     }
540     iter = next_iter;
541   }
542   lock.Unlock();
543
544   return sent;
545 }
546
547
548 void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str)
549 {
550   map<string, string>::iterator iter = out_headers.find(header_name);
551   if (iter != out_headers.end()) {
552     str = iter->second;
553   } else {
554     str.clear();
555   }
556 }
557
558 static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *rt)
559 {
560   string err;
561   vector<string> vec;
562
563   get_str_vec(s, ".", vec);
564
565   if (vec.empty()) {
566     return -EINVAL;
567   }
568
569   long secs = strict_strtol(vec[0].c_str(), 10, &err);
570   long nsecs = 0;
571   if (!err.empty()) {
572     ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
573     return -EINVAL;
574   }
575
576   if (vec.size() > 1) {
577     nsecs = strict_strtol(vec[1].c_str(), 10, &err);
578     if (!err.empty()) {
579       ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
580       return -EINVAL;
581     }
582   }
583
584   *rt = utime_t(secs, nsecs).to_real_time();
585
586   return 0;
587 }
588
589 int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime)
590 {
591   int ret = http_manager.complete_requests();
592   if (ret < 0)
593     return ret;
594
595   set_str_from_headers(out_headers, "ETAG", etag);
596
597   if (mtime) {
598     string mtime_str;
599     set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
600
601     ret = parse_rgwx_mtime(cct, mtime_str, mtime);
602     if (ret < 0) {
603       return ret;
604     }
605   }
606   return status;
607 }
608
609 int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
610 {
611   string urlsafe_bucket, urlsafe_object;
612   url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
613   url_encode(obj.key.name, urlsafe_object);
614   string resource = urlsafe_bucket + "/" + urlsafe_object;
615
616   return send_request(&key, extra_headers, resource, nullptr, mgr);
617 }
618
619 int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
620                                          bufferlist *send_data, RGWHTTPManager *mgr)
621 {
622   string new_url = url;
623   if (new_url[new_url.size() - 1] != '/')
624     new_url.append("/");
625
626   string date_str;
627   get_new_date_str(date_str);
628
629   RGWEnv new_env;
630   req_info new_info(cct, &new_env);
631   
632   string params_str;
633   map<string, string>& args = new_info.args.get_params();
634   get_params_str(args, params_str);
635
636   /* merge params with extra args so that we can sign correctly */
637   for (param_vec_t::iterator iter = params.begin(); iter != params.end(); ++iter) {
638     new_info.args.append(iter->first, iter->second);
639   }
640
641   string new_resource;
642   if (resource[0] == '/') {
643     new_resource = resource.substr(1);
644   } else {
645     new_resource = resource;
646   }
647
648   new_url.append(new_resource + params_str);
649
650   new_env.set("HTTP_DATE", date_str.c_str());
651
652   for (map<string, string>::iterator iter = extra_headers.begin();
653        iter != extra_headers.end(); ++iter) {
654     new_env.set(iter->first.c_str(), iter->second.c_str());
655   }
656
657   new_info.method = method;
658
659   new_info.script_uri = "/";
660   new_info.script_uri.append(new_resource);
661   new_info.request_uri = new_info.script_uri;
662
663   new_info.init_meta_info(NULL);
664
665   if (key) {
666     int ret = sign_request(*key, new_env, new_info);
667     if (ret < 0) {
668       ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
669       return ret;
670     }
671   }
672
673   for (const auto& kv: new_env.get_map()) {
674     headers.emplace_back(kv);
675   }
676
677   bool send_data_hint = false;
678   if (send_data) {
679     outbl.claim(*send_data);
680     send_data_hint = true;
681   }
682
683   RGWHTTPManager *pmanager = &http_manager;
684   if (mgr) {
685     pmanager = mgr;
686   }
687
688   int r = pmanager->add_request(this, new_info.method, new_url.c_str(), send_data_hint);
689   if (r < 0)
690     return r;
691
692   if (!mgr) {
693     r = pmanager->complete_requests();
694     if (r < 0)
695       return r;
696   }
697
698   return 0;
699 }
700
701 int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs)
702 {
703   set_str_from_headers(out_headers, "ETAG", etag);
704   if (status >= 0) {
705     if (mtime) {
706       string mtime_str;
707       set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
708       if (!mtime_str.empty()) {
709         int ret = parse_rgwx_mtime(cct, mtime_str, mtime);
710         if (ret < 0) {
711           return ret;
712         }
713       } else {
714         *mtime = real_time();
715       }
716     }
717     if (psize) {
718       string size_str;
719       set_str_from_headers(out_headers, "RGWX_OBJECT_SIZE", size_str);
720       string err;
721       *psize = strict_strtoll(size_str.c_str(), 10, &err);
722       if (!err.empty()) {
723         ldout(cct, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
724         return -EIO;
725       }
726     }
727   }
728
729   map<string, string>::iterator iter;
730   for (iter = out_headers.begin(); iter != out_headers.end(); ++iter) {
731     const string& attr_name = iter->first;
732     if (attr_name.compare(0, sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1, RGW_HTTP_RGWX_ATTR_PREFIX) == 0) {
733       string name = attr_name.substr(sizeof(RGW_HTTP_RGWX_ATTR_PREFIX) - 1);
734       const char *src = name.c_str();
735       char buf[name.size() + 1];
736       char *dest = buf;
737       for (; *src; ++src, ++dest) {
738         switch(*src) {
739           case '_':
740             *dest = '-';
741             break;
742           default:
743             *dest = tolower(*src);
744         }
745       }
746       *dest = '\0';
747       attrs[buf] = iter->second;
748     }
749   }
750   return status;
751 }
752
753 int RGWRESTStreamRWRequest::handle_header(const string& name, const string& val)
754 {
755   if (name == "RGWX_EMBEDDED_METADATA_LEN") {
756     string err;
757     long len = strict_strtol(val.c_str(), 10, &err);
758     if (!err.empty()) {
759       ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
760       return -EINVAL;
761     }
762
763     cb->set_extra_data_len(len);
764   }
765   return 0;
766 }
767
768 int RGWRESTStreamRWRequest::receive_data(void *ptr, size_t len)
769 {
770   bufferptr bp((const char *)ptr, len);
771   bufferlist bl;
772   bl.append(bp);
773   int ret = cb->handle_data(bl, ofs, len);
774   if (ret < 0)
775     return ret;
776   ofs += len;
777   return len;
778 }
779
780 int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len)
781 {
782   if (outbl.length() == 0) {
783     return 0;
784   }
785
786   uint64_t send_size = min(len, (size_t)(outbl.length() - write_ofs));
787   if (send_size > 0) {
788     memcpy(ptr, outbl.c_str() + write_ofs, send_size);
789     write_ofs += send_size;
790   }
791   return send_size;
792 }
793
794 class StreamIntoBufferlist : public RGWGetDataCB {
795   bufferlist& bl;
796 public:
797   StreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {}
798   int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override {
799     bl.claim_append(inbl);
800     return bl_len;
801   }
802 };
803