Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_op.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <system_error>
7 #include <unistd.h>
8
9 #include <sstream>
10
11 #include <boost/algorithm/string/predicate.hpp>
12 #include <boost/bind.hpp>
13 #include <boost/optional.hpp>
14 #include <boost/utility/in_place_factory.hpp>
15 #include <boost/utility/string_view.hpp>
16
17 #include "common/Clock.h"
18 #include "common/armor.h"
19 #include "common/backport14.h"
20 #include "common/errno.h"
21 #include "common/mime.h"
22 #include "common/utf8.h"
23 #include "common/ceph_json.h"
24
25 #include "rgw_rados.h"
26 #include "rgw_op.h"
27 #include "rgw_rest.h"
28 #include "rgw_acl.h"
29 #include "rgw_acl_s3.h"
30 #include "rgw_acl_swift.h"
31 #include "rgw_user.h"
32 #include "rgw_bucket.h"
33 #include "rgw_log.h"
34 #include "rgw_multi.h"
35 #include "rgw_multi_del.h"
36 #include "rgw_cors.h"
37 #include "rgw_cors_s3.h"
38 #include "rgw_rest_conn.h"
39 #include "rgw_rest_s3.h"
40 #include "rgw_tar.h"
41 #include "rgw_client_io.h"
42 #include "rgw_compression.h"
43 #include "rgw_role.h"
44 #include "rgw_tag_s3.h"
45 #include "cls/lock/cls_lock_client.h"
46 #include "cls/rgw/cls_rgw_client.h"
47
48
49 #include "include/assert.h"
50
51 #include "compressor/Compressor.h"
52
53 #include "rgw_acl_swift.h"
54
55 #define dout_context g_ceph_context
56 #define dout_subsys ceph_subsys_rgw
57
58 using namespace std;
59 using namespace librados;
60 using ceph::crypto::MD5;
61 using boost::optional;
62 using boost::none;
63
64 using rgw::IAM::ARN;
65 using rgw::IAM::Effect;
66 using rgw::IAM::Policy;
67
68 using rgw::IAM::Policy;
69
70 static string mp_ns = RGW_OBJ_NS_MULTIPART;
71 static string shadow_ns = RGW_OBJ_NS_SHADOW;
72
73 static void forward_req_info(CephContext *cct, req_info& info, const std::string& bucket_name);
74 static int forward_request_to_master(struct req_state *s, obj_version *objv, RGWRados *store,
75                                      bufferlist& in_data, JSONParser *jp, req_info *forward_info = nullptr);
76
77 static MultipartMetaFilter mp_filter;
78
79 static int parse_range(const char *range, off_t& ofs, off_t& end, bool *partial_content)
80 {
81   int r = -ERANGE;
82   string s(range);
83   string ofs_str;
84   string end_str;
85
86   *partial_content = false;
87
88   size_t pos = s.find("bytes=");
89   if (pos == string::npos) {
90     pos = 0;
91     while (isspace(s[pos]))
92       pos++;
93     int end = pos;
94     while (isalpha(s[end]))
95       end++;
96     if (strncasecmp(s.c_str(), "bytes", end - pos) != 0)
97       return 0;
98     while (isspace(s[end]))
99       end++;
100     if (s[end] != '=')
101       return 0;
102     s = s.substr(end + 1);
103   } else {
104     s = s.substr(pos + 6); /* size of("bytes=")  */
105   }
106   pos = s.find('-');
107   if (pos == string::npos)
108     goto done;
109
110   *partial_content = true;
111
112   ofs_str = s.substr(0, pos);
113   end_str = s.substr(pos + 1);
114   if (end_str.length()) {
115     end = atoll(end_str.c_str());
116     if (end < 0)
117       goto done;
118   }
119
120   if (ofs_str.length()) {
121     ofs = atoll(ofs_str.c_str());
122   } else { // RFC2616 suffix-byte-range-spec
123     ofs = -end;
124     end = -1;
125   }
126
127   if (end >= 0 && end < ofs)
128     goto done;
129
130   r = 0;
131 done:
132   return r;
133 }
134
135 static int decode_policy(CephContext *cct,
136                          bufferlist& bl,
137                          RGWAccessControlPolicy *policy)
138 {
139   bufferlist::iterator iter = bl.begin();
140   try {
141     policy->decode(iter);
142   } catch (buffer::error& err) {
143     ldout(cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
144     return -EIO;
145   }
146   if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
147     RGWAccessControlPolicy_S3 *s3policy = static_cast<RGWAccessControlPolicy_S3 *>(policy);
148     ldout(cct, 15) << __func__ << " Read AccessControlPolicy";
149     s3policy->to_xml(*_dout);
150     *_dout << dendl;
151   }
152   return 0;
153 }
154
155
156 static int get_user_policy_from_attr(CephContext * const cct,
157                                      RGWRados * const store,
158                                      map<string, bufferlist>& attrs,
159                                      RGWAccessControlPolicy& policy    /* out */)
160 {
161   auto aiter = attrs.find(RGW_ATTR_ACL);
162   if (aiter != attrs.end()) {
163     int ret = decode_policy(cct, aiter->second, &policy);
164     if (ret < 0) {
165       return ret;
166     }
167   } else {
168     return -ENOENT;
169   }
170
171   return 0;
172 }
173
174 static int get_bucket_instance_policy_from_attr(CephContext *cct,
175                                                 RGWRados *store,
176                                                 RGWBucketInfo& bucket_info,
177                                                 map<string, bufferlist>& bucket_attrs,
178                                                 RGWAccessControlPolicy *policy,
179                                                 rgw_raw_obj& obj)
180 {
181   map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_ACL);
182
183   if (aiter != bucket_attrs.end()) {
184     int ret = decode_policy(cct, aiter->second, policy);
185     if (ret < 0)
186       return ret;
187   } else {
188     ldout(cct, 0) << "WARNING: couldn't find acl header for bucket, generating default" << dendl;
189     RGWUserInfo uinfo;
190     /* object exists, but policy is broken */
191     int r = rgw_get_user_info_by_uid(store, bucket_info.owner, uinfo);
192     if (r < 0)
193       return r;
194
195     policy->create_default(bucket_info.owner, uinfo.display_name);
196   }
197   return 0;
198 }
199
200 static int get_obj_policy_from_attr(CephContext *cct,
201                                     RGWRados *store,
202                                     RGWObjectCtx& obj_ctx,
203                                     RGWBucketInfo& bucket_info,
204                                     map<string, bufferlist>& bucket_attrs,
205                                     RGWAccessControlPolicy *policy,
206                                     rgw_obj& obj)
207 {
208   bufferlist bl;
209   int ret = 0;
210
211   RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
212   RGWRados::Object::Read rop(&op_target);
213
214   ret = rop.get_attr(RGW_ATTR_ACL, bl);
215   if (ret >= 0) {
216     ret = decode_policy(cct, bl, policy);
217     if (ret < 0)
218       return ret;
219   } else if (ret == -ENODATA) {
220     /* object exists, but policy is broken */
221     ldout(cct, 0) << "WARNING: couldn't find acl header for object, generating default" << dendl;
222     RGWUserInfo uinfo;
223     ret = rgw_get_user_info_by_uid(store, bucket_info.owner, uinfo);
224     if (ret < 0)
225       return ret;
226
227     policy->create_default(bucket_info.owner, uinfo.display_name);
228   }
229   return ret;
230 }
231
232
233 /**
234  * Get the AccessControlPolicy for an object off of disk.
235  * policy: must point to a valid RGWACL, and will be filled upon return.
236  * bucket: name of the bucket containing the object.
237  * object: name of the object to get the ACL for.
238  * Returns: 0 on success, -ERR# otherwise.
239  */
240 static int get_bucket_policy_from_attr(CephContext *cct,
241                                        RGWRados *store,
242                                        RGWBucketInfo& bucket_info,
243                                        map<string, bufferlist>& bucket_attrs,
244                                        RGWAccessControlPolicy *policy)
245 {
246   rgw_raw_obj instance_obj;
247   store->get_bucket_instance_obj(bucket_info.bucket, instance_obj);
248   return get_bucket_instance_policy_from_attr(cct, store, bucket_info, bucket_attrs,
249                                               policy, instance_obj);
250 }
251
252 static optional<Policy> get_iam_policy_from_attr(CephContext* cct,
253                                                  RGWRados* store,
254                                                  map<string, bufferlist>& attrs,
255                                                  const string& tenant) {
256   auto i = attrs.find(RGW_ATTR_IAM_POLICY);
257   if (i != attrs.end()) {
258     return Policy(cct, tenant, i->second);
259   } else {
260     return none;
261   }
262 }
263
264 static int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map<string, bufferlist>& attrs)
265 {
266   RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
267   RGWRados::Object::Read read_op(&op_target);
268
269   read_op.params.attrs = &attrs;
270
271   return read_op.prepare();
272 }
273
274 static int modify_obj_attr(RGWRados *store, struct req_state *s, rgw_obj& obj, const char* attr_name, bufferlist& attr_val)
275 {
276   map<string, bufferlist> attrs;
277   RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
278   RGWRados::Object::Read read_op(&op_target);
279
280   read_op.params.attrs = &attrs;
281   
282   int r = read_op.prepare();
283   if (r < 0) {
284     return r;
285   }
286   store->set_atomic(s->obj_ctx, read_op.state.obj);
287   attrs[attr_name] = attr_val;
288   return store->set_attrs(s->obj_ctx, s->bucket_info, read_op.state.obj, attrs, NULL);
289 }
290
291 static int get_system_obj_attrs(RGWRados *store, struct req_state *s, rgw_raw_obj& obj, map<string, bufferlist>& attrs,
292                          uint64_t *obj_size, RGWObjVersionTracker *objv_tracker)
293 {
294   RGWRados::SystemObject src(store, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
295   RGWRados::SystemObject::Read rop(&src);
296
297   rop.stat_params.attrs = &attrs;
298   rop.stat_params.obj_size = obj_size;
299
300   int ret = rop.stat(objv_tracker);
301   return ret;
302 }
303
304 static int read_bucket_policy(RGWRados *store,
305                               struct req_state *s,
306                               RGWBucketInfo& bucket_info,
307                               map<string, bufferlist>& bucket_attrs,
308                               RGWAccessControlPolicy *policy,
309                               rgw_bucket& bucket)
310 {
311   if (!s->system_request && bucket_info.flags & BUCKET_SUSPENDED) {
312     ldout(s->cct, 0) << "NOTICE: bucket " << bucket_info.bucket.name << " is suspended" << dendl;
313     return -ERR_USER_SUSPENDED;
314   }
315
316   if (bucket.name.empty()) {
317     return 0;
318   }
319
320   int ret = get_bucket_policy_from_attr(s->cct, store, bucket_info, bucket_attrs, policy);
321   if (ret == -ENOENT) {
322       ret = -ERR_NO_SUCH_BUCKET;
323   }
324
325   return ret;
326 }
327
328 static int read_obj_policy(RGWRados *store,
329                            struct req_state *s,
330                            RGWBucketInfo& bucket_info,
331                            map<string, bufferlist>& bucket_attrs,
332                            RGWAccessControlPolicy* acl,
333                            optional<Policy>& policy,
334                            rgw_bucket& bucket,
335                            rgw_obj_key& object)
336 {
337   string upload_id;
338   upload_id = s->info.args.get("uploadId");
339   rgw_obj obj;
340
341   if (!s->system_request && bucket_info.flags & BUCKET_SUSPENDED) {
342     ldout(s->cct, 0) << "NOTICE: bucket " << bucket_info.bucket.name << " is suspended" << dendl;
343     return -ERR_USER_SUSPENDED;
344   }
345
346   if (!upload_id.empty()) {
347     /* multipart upload */
348     RGWMPObj mp(object.name, upload_id);
349     string oid = mp.get_meta();
350     obj.init_ns(bucket, oid, mp_ns);
351     obj.set_in_extra_data(true);
352   } else {
353     obj = rgw_obj(bucket, object);
354   }
355   policy = get_iam_policy_from_attr(s->cct, store, bucket_attrs, bucket.tenant);
356
357   RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
358   int ret = get_obj_policy_from_attr(s->cct, store, *obj_ctx,
359                                      bucket_info, bucket_attrs, acl, obj);
360   if (ret == -ENOENT) {
361     /* object does not exist checking the bucket's ACL to make sure
362        that we send a proper error code */
363     RGWAccessControlPolicy bucket_policy(s->cct);
364     ret = get_bucket_policy_from_attr(s->cct, store, bucket_info, bucket_attrs, &bucket_policy);
365     if (ret < 0) {
366       return ret;
367     }
368
369     const rgw_user& bucket_owner = bucket_policy.get_owner().get_id();
370     if (bucket_owner.compare(s->user->user_id) != 0 &&
371         ! s->auth.identity->is_admin_of(bucket_owner) &&
372         ! bucket_policy.verify_permission(*s->auth.identity, s->perm_mask,
373                                           RGW_PERM_READ)) {
374       ret = -EACCES;
375     } else {
376       ret = -ENOENT;
377     }
378   }
379
380   return ret;
381 }
382
383 /**
384  * Get the AccessControlPolicy for an user, bucket or object off of disk.
385  * s: The req_state to draw information from.
386  * only_bucket: If true, reads the user and bucket ACLs rather than the object ACL.
387  * Returns: 0 on success, -ERR# otherwise.
388  */
389 int rgw_build_bucket_policies(RGWRados* store, struct req_state* s)
390 {
391   int ret = 0;
392   rgw_obj_key obj;
393   RGWUserInfo bucket_owner_info;
394   RGWObjectCtx obj_ctx(store);
395
396   string bi = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance");
397   if (!bi.empty()) {
398     ret = rgw_bucket_parse_bucket_instance(bi, &s->bucket_instance_id, &s->bucket_instance_shard_id);
399     if (ret < 0) {
400       return ret;
401     }
402   }
403
404   if(s->dialect.compare("s3") == 0) {
405     s->bucket_acl = ceph::make_unique<RGWAccessControlPolicy_S3>(s->cct);
406   } else if(s->dialect.compare("swift")  == 0) {
407     /* We aren't allocating the account policy for those operations using
408      * the Swift's infrastructure that don't really need req_state::user.
409      * Typical example here is the implementation of /info. */
410     if (!s->user->user_id.empty()) {
411       s->user_acl = ceph::make_unique<RGWAccessControlPolicy_SWIFTAcct>(s->cct);
412     }
413     s->bucket_acl = ceph::make_unique<RGWAccessControlPolicy_SWIFT>(s->cct);
414   } else {
415     s->bucket_acl = ceph::make_unique<RGWAccessControlPolicy>(s->cct);
416   }
417
418   /* check if copy source is within the current domain */
419   if (!s->src_bucket_name.empty()) {
420     RGWBucketInfo source_info;
421
422     if (s->bucket_instance_id.empty()) {
423       ret = store->get_bucket_info(obj_ctx, s->src_tenant_name, s->src_bucket_name, source_info, NULL);
424     } else {
425       ret = store->get_bucket_instance_info(obj_ctx, s->bucket_instance_id, source_info, NULL, NULL);
426     }
427     if (ret == 0) {
428       string& zonegroup = source_info.zonegroup;
429       s->local_source = store->get_zonegroup().equals(zonegroup);
430     }
431   }
432
433   struct {
434     rgw_user uid;
435     std::string display_name;
436   } acct_acl_user = {
437     s->user->user_id,
438     s->user->display_name,
439   };
440
441   if (!s->bucket_name.empty()) {
442     s->bucket_exists = true;
443     if (s->bucket_instance_id.empty()) {
444       ret = store->get_bucket_info(obj_ctx, s->bucket_tenant, s->bucket_name, s->bucket_info, NULL, &s->bucket_attrs);
445     } else {
446       ret = store->get_bucket_instance_info(obj_ctx, s->bucket_instance_id, s->bucket_info, NULL, &s->bucket_attrs);
447     }
448     if (ret < 0) {
449       if (ret != -ENOENT) {
450         string bucket_log;
451         rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name, bucket_log);
452         ldout(s->cct, 0) << "NOTICE: couldn't get bucket from bucket_name (name=" << bucket_log << ")" << dendl;
453         return ret;
454       }
455       s->bucket_exists = false;
456     }
457     s->bucket = s->bucket_info.bucket;
458
459     if (s->bucket_exists) {
460       ret = read_bucket_policy(store, s, s->bucket_info, s->bucket_attrs,
461                                s->bucket_acl.get(), s->bucket);
462       acct_acl_user = {
463         s->bucket_info.owner,
464         s->bucket_acl->get_owner().get_display_name(),
465       };
466     } else {
467       s->bucket_acl->create_default(s->user->user_id, s->user->display_name);
468       ret = -ERR_NO_SUCH_BUCKET;
469     }
470
471     s->bucket_owner = s->bucket_acl->get_owner();
472
473     RGWZoneGroup zonegroup;
474     int r = store->get_zonegroup(s->bucket_info.zonegroup, zonegroup);
475     if (!r) {
476       if (!zonegroup.endpoints.empty()) {
477         s->zonegroup_endpoint = zonegroup.endpoints.front();
478       } else {
479         // use zonegroup's master zone endpoints
480         auto z = zonegroup.zones.find(zonegroup.master_zone);
481         if (z != zonegroup.zones.end() && !z->second.endpoints.empty()) {
482           s->zonegroup_endpoint = z->second.endpoints.front();
483         }
484       }
485       s->zonegroup_name = zonegroup.get_name();
486     }
487     if (r < 0 && ret == 0) {
488       ret = r;
489     }
490
491     if (s->bucket_exists && !store->get_zonegroup().equals(s->bucket_info.zonegroup)) {
492       ldout(s->cct, 0) << "NOTICE: request for data in a different zonegroup (" << s->bucket_info.zonegroup << " != " << store->get_zonegroup().get_id() << ")" << dendl;
493       /* we now need to make sure that the operation actually requires copy source, that is
494        * it's a copy operation
495        */
496       if (store->get_zonegroup().is_master_zonegroup() && s->system_request) {
497         /*If this is the master, don't redirect*/
498       } else if (s->op_type == RGW_OP_GET_BUCKET_LOCATION ) {
499         /* If op is get bucket location, don't redirect */
500       } else if (!s->local_source ||
501           (s->op != OP_PUT && s->op != OP_COPY) ||
502           s->object.empty()) {
503         return -ERR_PERMANENT_REDIRECT;
504       }
505     }
506   }
507
508   /* handle user ACL only for those APIs which support it */
509   if (s->user_acl) {
510     map<string, bufferlist> uattrs;
511
512     ret = rgw_get_user_attrs_by_uid(store, acct_acl_user.uid, uattrs);
513     if (!ret) {
514       ret = get_user_policy_from_attr(s->cct, store, uattrs, *s->user_acl);
515     }
516     if (-ENOENT == ret) {
517       /* In already existing clusters users won't have ACL. In such case
518        * assuming that only account owner has the rights seems to be
519        * reasonable. That allows to have only one verification logic.
520        * NOTE: there is small compatibility kludge for global, empty tenant:
521        *  1. if we try to reach an existing bucket, its owner is considered
522        *     as account owner.
523        *  2. otherwise account owner is identity stored in s->user->user_id.  */
524       s->user_acl->create_default(acct_acl_user.uid,
525                                   acct_acl_user.display_name);
526       ret = 0;
527     } else {
528       ldout(s->cct, 0) << "NOTICE: couldn't get user attrs for handling ACL (user_id="
529                        << s->user->user_id
530                        << ", ret="
531                        << ret
532                        << ")" << dendl;
533       return ret;
534     }
535   }
536
537   try {
538     s->iam_policy = get_iam_policy_from_attr(s->cct, store, s->bucket_attrs,
539                                              s->bucket_tenant);
540   } catch (const std::exception& e) {
541     // Really this is a can't happen condition. We parse the policy
542     // when it's given to us, so perhaps we should abort or otherwise
543     // raise bloody murder.
544     lderr(s->cct) << "Error reading IAM Policy: " << e.what() << dendl;
545     ret = -EACCES;
546   }
547
548   return ret;
549 }
550
551 /**
552  * Get the AccessControlPolicy for a bucket or object off of disk.
553  * s: The req_state to draw information from.
554  * only_bucket: If true, reads the bucket ACL rather than the object ACL.
555  * Returns: 0 on success, -ERR# otherwise.
556  */
557 int rgw_build_object_policies(RGWRados *store, struct req_state *s,
558                               bool prefetch_data)
559 {
560   int ret = 0;
561
562   if (!s->object.empty()) {
563     if (!s->bucket_exists) {
564       return -ERR_NO_SUCH_BUCKET;
565     }
566     s->object_acl = ceph::make_unique<RGWAccessControlPolicy>(s->cct);
567
568     rgw_obj obj(s->bucket, s->object);
569       
570     store->set_atomic(s->obj_ctx, obj);
571     if (prefetch_data) {
572       store->set_prefetch_data(s->obj_ctx, obj);
573     }
574     ret = read_obj_policy(store, s, s->bucket_info, s->bucket_attrs,
575                           s->object_acl.get(), s->iam_policy, s->bucket,
576                           s->object);
577   }
578
579   return ret;
580 }
581
582 rgw::IAM::Environment rgw_build_iam_environment(RGWRados* store,
583                                                 struct req_state* s)
584 {
585   rgw::IAM::Environment e;
586   const auto& m = s->info.env->get_map();
587   auto t = ceph::real_clock::now();
588   e.emplace(std::piecewise_construct,
589             std::forward_as_tuple("aws:CurrentTime"),
590             std::forward_as_tuple(std::to_string(
591                                     ceph::real_clock::to_time_t(t))));
592   e.emplace(std::piecewise_construct,
593             std::forward_as_tuple("aws:EpochTime"),
594             std::forward_as_tuple(ceph::to_iso_8601(t)));
595   // TODO: This is fine for now, but once we have STS we'll need to
596   // look and see. Also this won't work with the IdentityApplier
597   // model, since we need to know the actual credential.
598   e.emplace(std::piecewise_construct,
599             std::forward_as_tuple("aws:PrincipalType"),
600             std::forward_as_tuple("User"));
601
602   auto i = m.find("HTTP_REFERER");
603   if (i != m.end()) {
604     e.emplace(std::piecewise_construct,
605               std::forward_as_tuple("aws:Referer"),
606               std::forward_as_tuple(i->second));
607   }
608
609   // These seem to be the semantics, judging from rest_rgw_s3.cc
610   i = m.find("SERVER_PORT_SECURE");
611   if (i != m.end()) {
612     e.emplace(std::piecewise_construct,
613               std::forward_as_tuple("aws:SecureTransport"),
614               std::forward_as_tuple("true"));
615   }
616
617   i = m.find("HTTP_HOST");
618   if (i != m.end()) {
619     e.emplace(std::piecewise_construct,
620               std::forward_as_tuple("aws:SourceIp"),
621               std::forward_as_tuple(i->second));
622   }
623
624   i = m.find("HTTP_USER_AGENT"); {
625   if (i != m.end())
626     e.emplace(std::piecewise_construct,
627               std::forward_as_tuple("aws:UserAgent"),
628               std::forward_as_tuple(i->second));
629   }
630
631   if (s->user) {
632     // What to do about aws::userid? One can have multiple access
633     // keys so that isn't really suitable. Do we have a durable
634     // identifier that can persist through name changes?
635     e.emplace(std::piecewise_construct,
636               std::forward_as_tuple("aws:username"),
637               std::forward_as_tuple(s->user->user_id.id));
638   }
639   return e;
640 }
641
642 void rgw_bucket_object_pre_exec(struct req_state *s)
643 {
644   if (s->expect_cont)
645     dump_continue(s);
646
647   dump_bucket_from_state(s);
648 }
649
650 int RGWGetObj::verify_permission()
651 {
652   obj = rgw_obj(s->bucket, s->object);
653   store->set_atomic(s->obj_ctx, obj);
654   if (get_data) {
655     store->set_prefetch_data(s->obj_ctx, obj);
656   }
657
658   if (torrent.get_flag()) {
659     if (obj.key.instance.empty()) {
660       action = rgw::IAM::s3GetObjectTorrent;
661     } else {
662       action = rgw::IAM::s3GetObjectVersionTorrent;
663     }
664   } else {
665     if (obj.key.instance.empty()) {
666       action = rgw::IAM::s3GetObject;
667     } else {
668       action = rgw::IAM::s3GetObjectVersion;
669     }
670   }
671
672   if (!verify_object_permission(s, action)) {
673     return -EACCES;
674   }
675
676   return 0;
677 }
678
679
680 int RGWOp::verify_op_mask()
681 {
682   uint32_t required_mask = op_mask();
683
684   ldout(s->cct, 20) << "required_mask= " << required_mask
685                     << " user.op_mask=" << s->user->op_mask << dendl;
686
687   if ((s->user->op_mask & required_mask) != required_mask) {
688     return -EPERM;
689   }
690
691   if (!s->system_request && (required_mask & RGW_OP_TYPE_MODIFY) && !store->zone_is_writeable()) {
692     ldout(s->cct, 5) << "NOTICE: modify request to a read-only zone by a non-system user, permission denied"  << dendl;
693     return -EPERM;
694   }
695
696   return 0;
697 }
698
699 int RGWGetObjTags::verify_permission()
700 {
701   if (!verify_object_permission(s,
702                                 s->object.instance.empty() ?
703                                 rgw::IAM::s3GetObjectTagging:
704                                 rgw::IAM::s3GetObjectVersionTagging))
705     return -EACCES;
706
707   return 0;
708 }
709
710 void RGWGetObjTags::pre_exec()
711 {
712   rgw_bucket_object_pre_exec(s);
713 }
714
715 void RGWGetObjTags::execute()
716 {
717   rgw_obj obj;
718   map<string,bufferlist> attrs;
719
720   obj = rgw_obj(s->bucket, s->object);
721
722   store->set_atomic(s->obj_ctx, obj);
723
724   op_ret = get_obj_attrs(store, s, obj, attrs);
725   if (op_ret < 0) {
726     ldout(s->cct, 0) << "ERROR: failed to get obj attrs, obj=" << obj
727                      << " ret=" << op_ret << dendl;
728     return;
729   }
730
731   auto tags = attrs.find(RGW_ATTR_TAGS);
732   if(tags != attrs.end()){
733     has_tags = true;
734     tags_bl.append(tags->second);
735   }
736   send_response_data(tags_bl);
737 }
738
739 int RGWPutObjTags::verify_permission()
740 {
741   if (!verify_object_permission(s,
742                                 s->object.instance.empty() ?
743                                 rgw::IAM::s3PutObjectTagging:
744                                 rgw::IAM::s3PutObjectVersionTagging))
745     return -EACCES;
746   return 0;
747 }
748
749 void RGWPutObjTags::execute()
750 {
751   op_ret = get_params();
752   if (op_ret < 0)
753     return;
754
755   if (s->object.empty()){
756     op_ret= -EINVAL; // we only support tagging on existing objects
757     return;
758   }
759
760   rgw_obj obj;
761   obj = rgw_obj(s->bucket, s->object);
762   store->set_atomic(s->obj_ctx, obj);
763   op_ret = modify_obj_attr(store, s, obj, RGW_ATTR_TAGS, tags_bl);
764   if (op_ret == -ECANCELED){
765     op_ret = -ERR_TAG_CONFLICT;
766   }
767 }
768
769 void RGWDeleteObjTags::pre_exec()
770 {
771   rgw_bucket_object_pre_exec(s);
772 }
773
774
775 int RGWDeleteObjTags::verify_permission()
776 {
777   if (!s->object.empty()) {
778     if (!verify_object_permission(s,
779                                   s->object.instance.empty() ?
780                                   rgw::IAM::s3DeleteObjectTagging:
781                                   rgw::IAM::s3DeleteObjectVersionTagging))
782       return -EACCES;
783   }
784   return 0;
785 }
786
787 void RGWDeleteObjTags::execute()
788 {
789   if (s->object.empty())
790     return;
791
792   rgw_obj obj;
793   obj = rgw_obj(s->bucket, s->object);
794   store->set_atomic(s->obj_ctx, obj);
795   map <string, bufferlist> attrs;
796   map <string, bufferlist> rmattr;
797   bufferlist bl;
798   rmattr[RGW_ATTR_TAGS] = bl;
799   op_ret = store->set_attrs(s->obj_ctx, s->bucket_info, obj, attrs, &rmattr);
800 }
801
802 int RGWOp::do_aws4_auth_completion()
803 {
804   ldout(s->cct, 5) << "NOTICE: call to do_aws4_auth_completion"  << dendl;
805   if (s->auth.completer) {
806     if (!s->auth.completer->complete()) {
807       return -ERR_AMZ_CONTENT_SHA256_MISMATCH;
808     } else {
809       dout(10) << "v4 auth ok -- do_aws4_auth_completion" << dendl;
810     }
811
812     /* TODO(rzarzynski): yes, we're really called twice on PUTs. Only first
813      * call passes, so we disable second one. This is old behaviour, sorry!
814      * Plan for tomorrow: seek and destroy. */
815     s->auth.completer = nullptr;
816   }
817
818   return 0;
819 }
820
821 int RGWOp::init_quota()
822 {
823   /* no quota enforcement for system requests */
824   if (s->system_request)
825     return 0;
826
827   /* init quota related stuff */
828   if (!(s->user->op_mask & RGW_OP_TYPE_MODIFY)) {
829     return 0;
830   }
831
832   /* only interested in object related ops */
833   if (s->object.empty()) {
834     return 0;
835   }
836
837   RGWUserInfo owner_info;
838   RGWUserInfo *uinfo;
839
840   if (s->user->user_id == s->bucket_owner.get_id()) {
841     uinfo = s->user;
842   } else {
843     int r = rgw_get_user_info_by_uid(store, s->bucket_info.owner, owner_info);
844     if (r < 0)
845       return r;
846     uinfo = &owner_info;
847   }
848
849   if (s->bucket_info.quota.enabled) {
850     bucket_quota = s->bucket_info.quota;
851   } else if (uinfo->bucket_quota.enabled) {
852     bucket_quota = uinfo->bucket_quota;
853   } else {
854     bucket_quota = store->get_bucket_quota();
855   }
856
857   if (uinfo->user_quota.enabled) {
858     user_quota = uinfo->user_quota;
859   } else {
860     user_quota = store->get_user_quota();
861   }
862
863   return 0;
864 }
865
866 static bool validate_cors_rule_method(RGWCORSRule *rule, const char *req_meth) {
867   uint8_t flags = 0;
868
869   if (!req_meth) {
870     dout(5) << "req_meth is null" << dendl;
871     return false;
872   }
873
874   if (strcmp(req_meth, "GET") == 0) flags = RGW_CORS_GET;
875   else if (strcmp(req_meth, "POST") == 0) flags = RGW_CORS_POST;
876   else if (strcmp(req_meth, "PUT") == 0) flags = RGW_CORS_PUT;
877   else if (strcmp(req_meth, "DELETE") == 0) flags = RGW_CORS_DELETE;
878   else if (strcmp(req_meth, "HEAD") == 0) flags = RGW_CORS_HEAD;
879
880   if ((rule->get_allowed_methods() & flags) == flags) {
881     dout(10) << "Method " << req_meth << " is supported" << dendl;
882   } else {
883     dout(5) << "Method " << req_meth << " is not supported" << dendl;
884     return false;
885   }
886
887   return true;
888 }
889
890 int RGWOp::read_bucket_cors()
891 {
892   bufferlist bl;
893
894   map<string, bufferlist>::iterator aiter = s->bucket_attrs.find(RGW_ATTR_CORS);
895   if (aiter == s->bucket_attrs.end()) {
896     ldout(s->cct, 20) << "no CORS configuration attr found" << dendl;
897     cors_exist = false;
898     return 0; /* no CORS configuration found */
899   }
900
901   cors_exist = true;
902
903   bl = aiter->second;
904
905   bufferlist::iterator iter = bl.begin();
906   try {
907     bucket_cors.decode(iter);
908   } catch (buffer::error& err) {
909     ldout(s->cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
910     return -EIO;
911   }
912   if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
913     RGWCORSConfiguration_S3 *s3cors = static_cast<RGWCORSConfiguration_S3 *>(&bucket_cors);
914     ldout(s->cct, 15) << "Read RGWCORSConfiguration";
915     s3cors->to_xml(*_dout);
916     *_dout << dendl;
917   }
918   return 0;
919 }
920
921 /** CORS 6.2.6.
922  * If any of the header field-names is not a ASCII case-insensitive match for
923  * any of the values in list of headers do not set any additional headers and
924  * terminate this set of steps.
925  * */
926 static void get_cors_response_headers(RGWCORSRule *rule, const char *req_hdrs, string& hdrs, string& exp_hdrs, unsigned *max_age) {
927   if (req_hdrs) {
928     list<string> hl;
929     get_str_list(req_hdrs, hl);
930     for(list<string>::iterator it = hl.begin(); it != hl.end(); ++it) {
931       if (!rule->is_header_allowed((*it).c_str(), (*it).length())) {
932         dout(5) << "Header " << (*it) << " is not registered in this rule" << dendl;
933       } else {
934         if (hdrs.length() > 0) hdrs.append(",");
935         hdrs.append((*it));
936       }
937     }
938   }
939   rule->format_exp_headers(exp_hdrs);
940   *max_age = rule->get_max_age();
941 }
942
943 /**
944  * Generate the CORS header response
945  *
946  * This is described in the CORS standard, section 6.2.
947  */
948 bool RGWOp::generate_cors_headers(string& origin, string& method, string& headers, string& exp_headers, unsigned *max_age)
949 {
950   /* CORS 6.2.1. */
951   const char *orig = s->info.env->get("HTTP_ORIGIN");
952   if (!orig) {
953     return false;
954   }
955
956   /* Custom: */
957   origin = orig;
958   op_ret = read_bucket_cors();
959   if (op_ret < 0) {
960     return false;
961   }
962
963   if (!cors_exist) {
964     dout(2) << "No CORS configuration set yet for this bucket" << dendl;
965     return false;
966   }
967
968   /* CORS 6.2.2. */
969   RGWCORSRule *rule = bucket_cors.host_name_rule(orig);
970   if (!rule)
971     return false;
972
973   /*
974    * Set the Allowed-Origin header to a asterisk if this is allowed in the rule
975    * and no Authorization was send by the client
976    *
977    * The origin parameter specifies a URI that may access the resource.  The browser must enforce this.
978    * For requests without credentials, the server may specify "*" as a wildcard,
979    * thereby allowing any origin to access the resource.
980    */
981   const char *authorization = s->info.env->get("HTTP_AUTHORIZATION");
982   if (!authorization && rule->has_wildcard_origin())
983     origin = "*";
984
985   /* CORS 6.2.3. */
986   const char *req_meth = s->info.env->get("HTTP_ACCESS_CONTROL_REQUEST_METHOD");
987   if (!req_meth) {
988     req_meth = s->info.method;
989   }
990
991   if (req_meth) {
992     method = req_meth;
993     /* CORS 6.2.5. */
994     if (!validate_cors_rule_method(rule, req_meth)) {
995      return false;
996     }
997   }
998
999   /* CORS 6.2.4. */
1000   const char *req_hdrs = s->info.env->get("HTTP_ACCESS_CONTROL_REQUEST_HEADERS");
1001
1002   /* CORS 6.2.6. */
1003   get_cors_response_headers(rule, req_hdrs, headers, exp_headers, max_age);
1004
1005   return true;
1006 }
1007
1008 int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
1009                                        const rgw_bucket_dir_entry& ent,
1010                                        RGWAccessControlPolicy * const bucket_acl,
1011                                        const optional<Policy>& bucket_policy,
1012                                        const off_t start_ofs,
1013                                        const off_t end_ofs)
1014 {
1015   ldout(s->cct, 20) << "user manifest obj=" << ent.key.name << "[" << ent.key.instance << "]" << dendl;
1016   RGWGetObj_CB cb(this);
1017   RGWGetDataCB* filter = &cb;
1018   boost::optional<RGWGetObj_Decompress> decompress;
1019
1020   int64_t cur_ofs = start_ofs;
1021   int64_t cur_end = end_ofs;
1022
1023   rgw_obj part(bucket, ent.key);
1024
1025   map<string, bufferlist> attrs;
1026
1027   uint64_t obj_size;
1028   RGWObjectCtx obj_ctx(store);
1029   RGWAccessControlPolicy obj_policy(s->cct);
1030
1031   ldout(s->cct, 20) << "reading obj=" << part << " ofs=" << cur_ofs << " end=" << cur_end << dendl;
1032
1033   obj_ctx.obj.set_atomic(part);
1034   store->set_prefetch_data(&obj_ctx, part);
1035
1036   RGWRados::Object op_target(store, s->bucket_info, obj_ctx, part);
1037   RGWRados::Object::Read read_op(&op_target);
1038
1039   read_op.conds.if_match = ent.meta.etag.c_str();
1040   read_op.params.attrs = &attrs;
1041   read_op.params.obj_size = &obj_size;
1042
1043   op_ret = read_op.prepare();
1044   if (op_ret < 0)
1045     return op_ret;
1046   op_ret = read_op.range_to_ofs(ent.meta.accounted_size, cur_ofs, cur_end);
1047   if (op_ret < 0)
1048     return op_ret;
1049   bool need_decompress;
1050   op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
1051   if (op_ret < 0) {
1052           lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
1053       return -EIO;
1054   }
1055
1056   if (need_decompress)
1057   {
1058     if (cs_info.orig_size != ent.meta.accounted_size) {
1059       // hmm.. something wrong, object not as expected, abort!
1060       ldout(s->cct, 0) << "ERROR: expected cs_info.orig_size=" << cs_info.orig_size <<
1061           ", actual read size=" << ent.meta.size << dendl;
1062       return -EIO;
1063     }
1064     decompress.emplace(s->cct, &cs_info, partial_content, filter);
1065     filter = &*decompress;
1066   }
1067   else
1068   {
1069     if (obj_size != ent.meta.size) {
1070       // hmm.. something wrong, object not as expected, abort!
1071       ldout(s->cct, 0) << "ERROR: expected obj_size=" << obj_size << ", actual read size=" << ent.meta.size << dendl;
1072       return -EIO;
1073           }
1074   }
1075
1076   op_ret = rgw_policy_from_attrset(s->cct, attrs, &obj_policy);
1077   if (op_ret < 0)
1078     return op_ret;
1079
1080   /* We can use global user_acl because LOs cannot have segments
1081    * stored inside different accounts. */
1082   if (s->system_request) {
1083     ldout(s->cct, 2) << "overriding permissions due to system operation" << dendl;
1084   } else if (s->auth.identity->is_admin_of(s->user->user_id)) {
1085     ldout(s->cct, 2) << "overriding permissions due to admin operation" << dendl;
1086   } else if (!verify_object_permission(s, part, s->user_acl.get(), bucket_acl,
1087                                        &obj_policy, bucket_policy, action)) {
1088     return -EPERM;
1089   }
1090
1091   if (ent.meta.size == 0) {
1092     return 0;
1093   }
1094
1095   perfcounter->inc(l_rgw_get_b, cur_end - cur_ofs);
1096   filter->fixup_range(cur_ofs, cur_end);
1097   op_ret = read_op.iterate(cur_ofs, cur_end, filter);
1098   if (op_ret >= 0)
1099           op_ret = filter->flush();
1100   return op_ret;
1101 }
1102
1103 static int iterate_user_manifest_parts(CephContext * const cct,
1104                                        RGWRados * const store,
1105                                        const off_t ofs,
1106                                        const off_t end,
1107                                        RGWBucketInfo *pbucket_info,
1108                                        const string& obj_prefix,
1109                                        RGWAccessControlPolicy * const bucket_acl,
1110                                        const optional<Policy>& bucket_policy,
1111                                        uint64_t * const ptotal_len,
1112                                        uint64_t * const pobj_size,
1113                                        string * const pobj_sum,
1114                                        int (*cb)(rgw_bucket& bucket,
1115                                                  const rgw_bucket_dir_entry& ent,
1116                                                  RGWAccessControlPolicy * const bucket_acl,
1117                                                  const optional<Policy>& bucket_policy,
1118                                                  off_t start_ofs,
1119                                                  off_t end_ofs,
1120                                                  void *param),
1121                                        void * const cb_param)
1122 {
1123   rgw_bucket& bucket = pbucket_info->bucket;
1124   uint64_t obj_ofs = 0, len_count = 0;
1125   bool found_start = false, found_end = false, handled_end = false;
1126   string delim;
1127   bool is_truncated;
1128   vector<rgw_bucket_dir_entry> objs;
1129
1130   utime_t start_time = ceph_clock_now();
1131
1132   RGWRados::Bucket target(store, *pbucket_info);
1133   RGWRados::Bucket::List list_op(&target);
1134
1135   list_op.params.prefix = obj_prefix;
1136   list_op.params.delim = delim;
1137
1138   MD5 etag_sum;
1139   do {
1140 #define MAX_LIST_OBJS 100
1141     int r = list_op.list_objects(MAX_LIST_OBJS, &objs, NULL, &is_truncated);
1142     if (r < 0) {
1143       return r;
1144     }
1145
1146     for (rgw_bucket_dir_entry& ent : objs) {
1147       const uint64_t cur_total_len = obj_ofs;
1148       const uint64_t obj_size = ent.meta.accounted_size;
1149       uint64_t start_ofs = 0, end_ofs = obj_size;
1150
1151       if ((ptotal_len || cb) && !found_start && cur_total_len + obj_size > (uint64_t)ofs) {
1152         start_ofs = ofs - obj_ofs;
1153         found_start = true;
1154       }
1155
1156       obj_ofs += obj_size;
1157       if (pobj_sum) {
1158         etag_sum.Update((const byte *)ent.meta.etag.c_str(),
1159                         ent.meta.etag.length());
1160       }
1161
1162       if ((ptotal_len || cb) && !found_end && obj_ofs > (uint64_t)end) {
1163         end_ofs = end - cur_total_len + 1;
1164         found_end = true;
1165       }
1166
1167       perfcounter->tinc(l_rgw_get_lat,
1168                         (ceph_clock_now() - start_time));
1169
1170       if (found_start && !handled_end) {
1171         len_count += end_ofs - start_ofs;
1172
1173         if (cb) {
1174           r = cb(bucket, ent, bucket_acl, bucket_policy, start_ofs, end_ofs, cb_param);
1175           if (r < 0) {
1176             return r;
1177           }
1178         }
1179       }
1180
1181       handled_end = found_end;
1182       start_time = ceph_clock_now();
1183     }
1184   } while (is_truncated);
1185
1186   if (ptotal_len) {
1187     *ptotal_len = len_count;
1188   }
1189   if (pobj_size) {
1190     *pobj_size = obj_ofs;
1191   }
1192   if (pobj_sum) {
1193     complete_etag(etag_sum, pobj_sum);
1194   }
1195
1196   return 0;
1197 }
1198
1199 struct rgw_slo_part {
1200   RGWAccessControlPolicy *bucket_acl = nullptr;
1201   Policy* bucket_policy = nullptr;
1202   rgw_bucket bucket;
1203   string obj_name;
1204   uint64_t size = 0;
1205   string etag;
1206 };
1207
1208 static int iterate_slo_parts(CephContext *cct,
1209                              RGWRados *store,
1210                              off_t ofs,
1211                              off_t end,
1212                              map<uint64_t, rgw_slo_part>& slo_parts,
1213                              int (*cb)(rgw_bucket& bucket,
1214                                        const rgw_bucket_dir_entry& ent,
1215                                        RGWAccessControlPolicy *bucket_acl,
1216                                        const optional<Policy>& bucket_policy,
1217                                        off_t start_ofs,
1218                                        off_t end_ofs,
1219                                        void *param),
1220                              void *cb_param)
1221 {
1222   bool found_start = false, found_end = false;
1223
1224   if (slo_parts.empty()) {
1225     return 0;
1226   }
1227
1228   utime_t start_time = ceph_clock_now();
1229
1230   map<uint64_t, rgw_slo_part>::iterator iter = slo_parts.upper_bound(ofs);
1231   if (iter != slo_parts.begin()) {
1232     --iter;
1233   }
1234
1235   uint64_t obj_ofs = iter->first;
1236
1237   for (; iter != slo_parts.end() && !found_end; ++iter) {
1238     rgw_slo_part& part = iter->second;
1239     rgw_bucket_dir_entry ent;
1240
1241     ent.key.name = part.obj_name;
1242     ent.meta.accounted_size = ent.meta.size = part.size;
1243     ent.meta.etag = part.etag;
1244
1245     uint64_t cur_total_len = obj_ofs;
1246     uint64_t start_ofs = 0, end_ofs = ent.meta.size;
1247
1248     if (!found_start && cur_total_len + ent.meta.size > (uint64_t)ofs) {
1249       start_ofs = ofs - obj_ofs;
1250       found_start = true;
1251     }
1252
1253     obj_ofs += ent.meta.size;
1254
1255     if (!found_end && obj_ofs > (uint64_t)end) {
1256       end_ofs = end - cur_total_len + 1;
1257       found_end = true;
1258     }
1259
1260     perfcounter->tinc(l_rgw_get_lat,
1261                       (ceph_clock_now() - start_time));
1262
1263     if (found_start) {
1264       if (cb) {
1265         // SLO is a Swift thing, and Swift has no knowledge of S3 Policies.
1266         int r = cb(part.bucket, ent, part.bucket_acl,
1267                    (part.bucket_policy ?
1268                     optional<Policy>(*part.bucket_policy) : none),
1269                    start_ofs, end_ofs, cb_param);
1270         if (r < 0)
1271           return r;
1272       }
1273     }
1274
1275     start_time = ceph_clock_now();
1276   }
1277
1278   return 0;
1279 }
1280
1281 static int get_obj_user_manifest_iterate_cb(rgw_bucket& bucket,
1282                                             const rgw_bucket_dir_entry& ent,
1283                                             RGWAccessControlPolicy * const bucket_acl,
1284                                             const optional<Policy>& bucket_policy,
1285                                             const off_t start_ofs,
1286                                             const off_t end_ofs,
1287                                             void * const param)
1288 {
1289   RGWGetObj *op = static_cast<RGWGetObj *>(param);
1290   return op->read_user_manifest_part(bucket, ent, bucket_acl, bucket_policy, start_ofs, end_ofs);
1291 }
1292
1293 int RGWGetObj::handle_user_manifest(const char *prefix)
1294 {
1295   const boost::string_view prefix_view(prefix);
1296   ldout(s->cct, 2) << "RGWGetObj::handle_user_manifest() prefix="
1297                    << prefix_view << dendl;
1298
1299   const size_t pos = prefix_view.find('/');
1300   if (pos == string::npos) {
1301     return -EINVAL;
1302   }
1303
1304   const std::string bucket_name = url_decode(prefix_view.substr(0, pos));
1305   const std::string obj_prefix = url_decode(prefix_view.substr(pos + 1));
1306
1307   rgw_bucket bucket;
1308
1309   RGWAccessControlPolicy _bucket_acl(s->cct);
1310   RGWAccessControlPolicy *bucket_acl;
1311   optional<Policy> _bucket_policy;
1312   optional<Policy>* bucket_policy;
1313   RGWBucketInfo bucket_info;
1314   RGWBucketInfo *pbucket_info;
1315
1316   if (bucket_name.compare(s->bucket.name) != 0) {
1317     map<string, bufferlist> bucket_attrs;
1318     RGWObjectCtx obj_ctx(store);
1319     int r = store->get_bucket_info(obj_ctx, s->user->user_id.tenant,
1320                                   bucket_name, bucket_info, NULL,
1321                                   &bucket_attrs);
1322     if (r < 0) {
1323       ldout(s->cct, 0) << "could not get bucket info for bucket="
1324                        << bucket_name << dendl;
1325       return r;
1326     }
1327     bucket = bucket_info.bucket;
1328     pbucket_info = &bucket_info;
1329     bucket_acl = &_bucket_acl;
1330     r = read_bucket_policy(store, s, bucket_info, bucket_attrs, bucket_acl, bucket);
1331     if (r < 0) {
1332       ldout(s->cct, 0) << "failed to read bucket policy" << dendl;
1333       return r;
1334     }
1335     _bucket_policy = get_iam_policy_from_attr(s->cct, store, bucket_attrs,
1336                                               bucket_info.bucket.tenant);
1337     bucket_policy = &_bucket_policy;
1338   } else {
1339     bucket = s->bucket;
1340     pbucket_info = &s->bucket_info;
1341     bucket_acl = s->bucket_acl.get();
1342     bucket_policy = &s->iam_policy;
1343   }
1344
1345   /* dry run to find out:
1346    * - total length (of the parts we are going to send to client),
1347    * - overall DLO's content size,
1348    * - md5 sum of overall DLO's content (for etag of Swift API). */
1349   int r = iterate_user_manifest_parts(s->cct, store, ofs, end,
1350         pbucket_info, obj_prefix, bucket_acl, *bucket_policy,
1351         nullptr, &s->obj_size, &lo_etag,
1352         nullptr /* cb */, nullptr /* cb arg */);
1353   if (r < 0) {
1354     return r;
1355   }
1356
1357   r = RGWRados::Object::Read::range_to_ofs(s->obj_size, ofs, end);
1358   if (r < 0) {
1359     return r;
1360   }
1361
1362   r = iterate_user_manifest_parts(s->cct, store, ofs, end,
1363         pbucket_info, obj_prefix, bucket_acl, *bucket_policy,
1364         &total_len, nullptr, nullptr,
1365         nullptr, nullptr);
1366   if (r < 0) {
1367     return r;
1368   }
1369
1370   if (!get_data) {
1371     bufferlist bl;
1372     send_response_data(bl, 0, 0);
1373     return 0;
1374   }
1375
1376   r = iterate_user_manifest_parts(s->cct, store, ofs, end,
1377         pbucket_info, obj_prefix, bucket_acl, *bucket_policy,
1378         nullptr, nullptr, nullptr,
1379         get_obj_user_manifest_iterate_cb, (void *)this);
1380   if (r < 0) {
1381     return r;
1382   }
1383
1384   if (!total_len) {
1385     bufferlist bl;
1386     send_response_data(bl, 0, 0);
1387   }
1388
1389   return 0;
1390 }
1391
1392 int RGWGetObj::handle_slo_manifest(bufferlist& bl)
1393 {
1394   RGWSLOInfo slo_info;
1395   bufferlist::iterator bliter = bl.begin();
1396   try {
1397     ::decode(slo_info, bliter);
1398   } catch (buffer::error& err) {
1399     ldout(s->cct, 0) << "ERROR: failed to decode slo manifest" << dendl;
1400     return -EIO;
1401   }
1402   ldout(s->cct, 2) << "RGWGetObj::handle_slo_manifest()" << dendl;
1403
1404   vector<RGWAccessControlPolicy> allocated_acls;
1405   map<string, pair<RGWAccessControlPolicy *, optional<Policy>>> policies;
1406   map<string, rgw_bucket> buckets;
1407
1408   map<uint64_t, rgw_slo_part> slo_parts;
1409
1410   MD5 etag_sum;
1411   total_len = 0;
1412
1413   for (const auto& entry : slo_info.entries) {
1414     const string& path = entry.path;
1415
1416     /* If the path starts with slashes, strip them all. */
1417     const size_t pos_init = path.find_first_not_of('/');
1418     /* According to the documentation of std::string::find following check
1419      * is not necessary as we should get the std::string::npos propagation
1420      * here. This might be true with the accuracy to implementation's bugs.
1421      * See following question on SO:
1422      * http://stackoverflow.com/questions/1011790/why-does-stdstring-findtext-stdstringnpos-not-return-npos
1423      */
1424     if (pos_init == string::npos) {
1425       return -EINVAL;
1426     }
1427
1428     const size_t pos_sep = path.find('/', pos_init);
1429     if (pos_sep == string::npos) {
1430       return -EINVAL;
1431     }
1432
1433     string bucket_name = path.substr(pos_init, pos_sep - pos_init);
1434     string obj_name = path.substr(pos_sep + 1);
1435
1436     rgw_bucket bucket;
1437     RGWAccessControlPolicy *bucket_acl;
1438     Policy* bucket_policy;
1439
1440     if (bucket_name.compare(s->bucket.name) != 0) {
1441       const auto& piter = policies.find(bucket_name);
1442       if (piter != policies.end()) {
1443         bucket_acl = piter->second.first;
1444         bucket_policy = piter->second.second.get_ptr();
1445         bucket = buckets[bucket_name];
1446       } else {
1447         allocated_acls.push_back(RGWAccessControlPolicy(s->cct));
1448         RGWAccessControlPolicy& _bucket_acl = allocated_acls.back();
1449
1450         RGWBucketInfo bucket_info;
1451         map<string, bufferlist> bucket_attrs;
1452         RGWObjectCtx obj_ctx(store);
1453         int r = store->get_bucket_info(obj_ctx, s->user->user_id.tenant,
1454                                        bucket_name, bucket_info, nullptr,
1455                                        &bucket_attrs);
1456         if (r < 0) {
1457           ldout(s->cct, 0) << "could not get bucket info for bucket="
1458                            << bucket_name << dendl;
1459           return r;
1460         }
1461         bucket = bucket_info.bucket;
1462         bucket_acl = &_bucket_acl;
1463         r = read_bucket_policy(store, s, bucket_info, bucket_attrs, bucket_acl,
1464                                bucket);
1465         if (r < 0) {
1466           ldout(s->cct, 0) << "failed to read bucket ACL for bucket "
1467                            << bucket << dendl;
1468           return r;
1469         }
1470         auto _bucket_policy = get_iam_policy_from_attr(
1471           s->cct, store, bucket_attrs, bucket_info.bucket.tenant);
1472         bucket_policy = _bucket_policy.get_ptr();
1473         buckets[bucket_name] = bucket;
1474         policies[bucket_name] = make_pair(bucket_acl, _bucket_policy);
1475       }
1476     } else {
1477       bucket = s->bucket;
1478       bucket_acl = s->bucket_acl.get();
1479       bucket_policy = s->iam_policy.get_ptr();
1480     }
1481
1482     rgw_slo_part part;
1483     part.bucket_acl = bucket_acl;
1484     part.bucket_policy = bucket_policy;
1485     part.bucket = bucket;
1486     part.obj_name = obj_name;
1487     part.size = entry.size_bytes;
1488     part.etag = entry.etag;
1489     ldout(s->cct, 20) << "slo_part: ofs=" << ofs
1490                       << " bucket=" << part.bucket
1491                       << " obj=" << part.obj_name
1492                       << " size=" << part.size
1493                       << " etag=" << part.etag
1494                       << dendl;
1495
1496     etag_sum.Update((const byte *)entry.etag.c_str(),
1497                     entry.etag.length());
1498
1499     slo_parts[total_len] = part;
1500     total_len += part.size;
1501   }
1502
1503   complete_etag(etag_sum, &lo_etag);
1504
1505   s->obj_size = slo_info.total_size;
1506   ldout(s->cct, 20) << "s->obj_size=" << s->obj_size << dendl;
1507
1508   int r = RGWRados::Object::Read::range_to_ofs(total_len, ofs, end);
1509   if (r < 0) {
1510     return r;
1511   }
1512
1513   total_len = end - ofs + 1;
1514
1515   r = iterate_slo_parts(s->cct, store, ofs, end, slo_parts,
1516         get_obj_user_manifest_iterate_cb, (void *)this);
1517   if (r < 0) {
1518     return r;
1519   }
1520
1521   return 0;
1522 }
1523
1524 int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
1525 {
1526   /* garbage collection related handling */
1527   utime_t start_time = ceph_clock_now();
1528   if (start_time > gc_invalidate_time) {
1529     int r = store->defer_gc(s->obj_ctx, s->bucket_info, obj);
1530     if (r < 0) {
1531       dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
1532     }
1533     gc_invalidate_time = start_time;
1534     gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
1535   }
1536   return send_response_data(bl, bl_ofs, bl_len);
1537 }
1538
1539 bool RGWGetObj::prefetch_data()
1540 {
1541   /* HEAD request, stop prefetch*/
1542   if (!get_data) {
1543     return false;
1544   }
1545
1546   bool prefetch_first_chunk = true;
1547   range_str = s->info.env->get("HTTP_RANGE");
1548
1549   if(range_str) {
1550     int r = parse_range(range_str, ofs, end, &partial_content);
1551     /* error on parsing the range, stop prefetch and will fail in execte() */
1552     if (r < 0) {
1553       range_parsed = false;
1554       return false;
1555     } else {
1556       range_parsed = true;
1557     }
1558     /* range get goes to shadown objects, stop prefetch */
1559     if (ofs >= s->cct->_conf->rgw_max_chunk_size) {
1560       prefetch_first_chunk = false;
1561     }
1562   }
1563
1564   return get_data && prefetch_first_chunk;
1565 }
1566 void RGWGetObj::pre_exec()
1567 {
1568   rgw_bucket_object_pre_exec(s);
1569 }
1570
1571 static bool object_is_expired(map<string, bufferlist>& attrs) {
1572   map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_DELETE_AT);
1573   if (iter != attrs.end()) {
1574     utime_t delete_at;
1575     try {
1576       ::decode(delete_at, iter->second);
1577     } catch (buffer::error& err) {
1578       dout(0) << "ERROR: " << __func__ << ": failed to decode " RGW_ATTR_DELETE_AT " attr" << dendl;
1579       return false;
1580     }
1581
1582     if (delete_at <= ceph_clock_now() && !delete_at.is_zero()) {
1583       return true;
1584     }
1585   }
1586
1587   return false;
1588 }
1589
1590 void RGWGetObj::execute()
1591 {
1592   utime_t start_time = s->time;
1593   bufferlist bl;
1594   gc_invalidate_time = ceph_clock_now();
1595   gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
1596
1597   bool need_decompress;
1598   int64_t ofs_x, end_x;
1599
1600   RGWGetObj_CB cb(this);
1601   RGWGetDataCB* filter = (RGWGetDataCB*)&cb;
1602   boost::optional<RGWGetObj_Decompress> decompress;
1603   std::unique_ptr<RGWGetDataCB> decrypt;
1604   map<string, bufferlist>::iterator attr_iter;
1605
1606   perfcounter->inc(l_rgw_get);
1607
1608   RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
1609   RGWRados::Object::Read read_op(&op_target);
1610
1611   op_ret = get_params();
1612   if (op_ret < 0)
1613     goto done_err;
1614
1615   op_ret = init_common();
1616   if (op_ret < 0)
1617     goto done_err;
1618
1619   read_op.conds.mod_ptr = mod_ptr;
1620   read_op.conds.unmod_ptr = unmod_ptr;
1621   read_op.conds.high_precision_time = s->system_request; /* system request need to use high precision time */
1622   read_op.conds.mod_zone_id = mod_zone_id;
1623   read_op.conds.mod_pg_ver = mod_pg_ver;
1624   read_op.conds.if_match = if_match;
1625   read_op.conds.if_nomatch = if_nomatch;
1626   read_op.params.attrs = &attrs;
1627   read_op.params.lastmod = &lastmod;
1628   read_op.params.obj_size = &s->obj_size;
1629
1630   op_ret = read_op.prepare();
1631   if (op_ret < 0)
1632     goto done_err;
1633   version_id = read_op.state.obj.key.instance;
1634
1635   /* STAT ops don't need data, and do no i/o */
1636   if (get_type() == RGW_OP_STAT_OBJ) {
1637     return;
1638   }
1639
1640   /* start gettorrent */
1641   if (torrent.get_flag())
1642   {
1643     attr_iter = attrs.find(RGW_ATTR_CRYPT_MODE);
1644     if (attr_iter != attrs.end() && attr_iter->second.to_str() == "SSE-C-AES256") {
1645       op_ret = -ERR_INVALID_REQUEST;
1646       goto done_err;
1647     }
1648     torrent.init(s, store);
1649     op_ret = torrent.get_torrent_file(read_op, total_len, bl, obj);
1650     if (op_ret < 0)
1651     {
1652       ldout(s->cct, 0) << "ERROR: failed to get_torrent_file ret= " << op_ret
1653                        << dendl;
1654       goto done_err;
1655     }
1656     op_ret = send_response_data(bl, 0, total_len);
1657     if (op_ret < 0)
1658     {
1659       ldout(s->cct, 0) << "ERROR: failed to send_response_data ret= " << op_ret 
1660                        << dendl;
1661       goto done_err;
1662     }
1663     return;
1664   }
1665   /* end gettorrent */
1666
1667   op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
1668   if (op_ret < 0) {
1669     lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
1670     goto done_err;
1671   }
1672   if (need_decompress) {
1673       s->obj_size = cs_info.orig_size;
1674       decompress.emplace(s->cct, &cs_info, partial_content, filter);
1675       filter = &*decompress;
1676   }
1677
1678   attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST);
1679   if (attr_iter != attrs.end() && !skip_manifest) {
1680     op_ret = handle_user_manifest(attr_iter->second.c_str());
1681     if (op_ret < 0) {
1682       ldout(s->cct, 0) << "ERROR: failed to handle user manifest ret="
1683                        << op_ret << dendl;
1684       goto done_err;
1685     }
1686     return;
1687   }
1688
1689   attr_iter = attrs.find(RGW_ATTR_SLO_MANIFEST);
1690   if (attr_iter != attrs.end() && !skip_manifest) {
1691     is_slo = true;
1692     op_ret = handle_slo_manifest(attr_iter->second);
1693     if (op_ret < 0) {
1694       ldout(s->cct, 0) << "ERROR: failed to handle slo manifest ret=" << op_ret
1695                        << dendl;
1696       goto done_err;
1697     }
1698     return;
1699   }
1700
1701   // for range requests with obj size 0
1702   if (range_str && !(s->obj_size)) {
1703     total_len = 0;
1704     op_ret = -ERANGE;
1705     goto done_err;
1706   }
1707
1708   op_ret = read_op.range_to_ofs(s->obj_size, ofs, end);
1709   if (op_ret < 0)
1710     goto done_err;
1711   total_len = (ofs <= end ? end + 1 - ofs : 0);
1712
1713   /* Check whether the object has expired. Swift API documentation
1714    * stands that we should return 404 Not Found in such case. */
1715   if (need_object_expiration() && object_is_expired(attrs)) {
1716     op_ret = -ENOENT;
1717     goto done_err;
1718   }
1719
1720   start = ofs;
1721
1722   /* STAT ops don't need data, and do no i/o */
1723   if (get_type() == RGW_OP_STAT_OBJ) {
1724     return;
1725   }
1726
1727   attr_iter = attrs.find(RGW_ATTR_MANIFEST);
1728   op_ret = this->get_decrypt_filter(&decrypt, filter,
1729                                     attr_iter != attrs.end() ? &(attr_iter->second) : nullptr);
1730   if (decrypt != nullptr) {
1731     filter = decrypt.get();
1732   }
1733   if (op_ret < 0) {
1734     goto done_err;
1735   }
1736
1737   if (!get_data || ofs > end) {
1738     send_response_data(bl, 0, 0);
1739     return;
1740   }
1741
1742   perfcounter->inc(l_rgw_get_b, end - ofs);
1743
1744   ofs_x = ofs;
1745   end_x = end;
1746   filter->fixup_range(ofs_x, end_x);
1747   op_ret = read_op.iterate(ofs_x, end_x, filter);
1748
1749   if (op_ret >= 0)
1750     op_ret = filter->flush();
1751
1752   perfcounter->tinc(l_rgw_get_lat,
1753                     (ceph_clock_now() - start_time));
1754   if (op_ret < 0) {
1755     goto done_err;
1756   }
1757
1758   op_ret = send_response_data(bl, 0, 0);
1759   if (op_ret < 0) {
1760     goto done_err;
1761   }
1762   return;
1763
1764 done_err:
1765   send_response_data_error();
1766 }
1767
1768 int RGWGetObj::init_common()
1769 {
1770   if (range_str) {
1771     /* range parsed error when prefetch*/
1772     if (!range_parsed) {
1773       int r = parse_range(range_str, ofs, end, &partial_content);
1774       if (r < 0)
1775         return r;
1776     }
1777   }
1778   if (if_mod) {
1779     if (parse_time(if_mod, &mod_time) < 0)
1780       return -EINVAL;
1781     mod_ptr = &mod_time;
1782   }
1783
1784   if (if_unmod) {
1785     if (parse_time(if_unmod, &unmod_time) < 0)
1786       return -EINVAL;
1787     unmod_ptr = &unmod_time;
1788   }
1789
1790   return 0;
1791 }
1792
1793 int RGWListBuckets::verify_permission()
1794 {
1795   if (!verify_user_permission(s, RGW_PERM_READ)) {
1796     return -EACCES;
1797   }
1798
1799   return 0;
1800 }
1801
1802 int RGWGetUsage::verify_permission()
1803 {
1804   if (s->auth.identity->is_anonymous()) {
1805     return -EACCES;
1806   }
1807
1808   return 0;
1809 }
1810
1811 void RGWListBuckets::execute()
1812 {
1813   bool done;
1814   bool started = false;
1815   uint64_t total_count = 0;
1816
1817   const uint64_t max_buckets = s->cct->_conf->rgw_list_buckets_max_chunk;
1818
1819   op_ret = get_params();
1820   if (op_ret < 0) {
1821     goto send_end;
1822   }
1823
1824   if (supports_account_metadata()) {
1825     op_ret = rgw_get_user_attrs_by_uid(store, s->user->user_id, attrs);
1826     if (op_ret < 0) {
1827       goto send_end;
1828     }
1829   }
1830
1831   is_truncated = false;
1832   do {
1833     RGWUserBuckets buckets;
1834     uint64_t read_count;
1835     if (limit >= 0) {
1836       read_count = min(limit - total_count, (uint64_t)max_buckets);
1837     } else {
1838       read_count = max_buckets;
1839     }
1840
1841     op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets,
1842                                    marker, end_marker, read_count,
1843                                    should_get_stats(), &is_truncated,
1844                                    get_default_max());
1845     if (op_ret < 0) {
1846       /* hmm.. something wrong here.. the user was authenticated, so it
1847          should exist */
1848       ldout(s->cct, 10) << "WARNING: failed on rgw_get_user_buckets uid="
1849                         << s->user->user_id << dendl;
1850       break;
1851     }
1852
1853     /* We need to have stats for all our policies - even if a given policy
1854      * isn't actually used in a given account. In such situation its usage
1855      * stats would be simply full of zeros. */
1856     for (const auto& policy : store->get_zonegroup().placement_targets) {
1857       policies_stats.emplace(policy.second.name,
1858                              decltype(policies_stats)::mapped_type());
1859     }
1860
1861     std::map<std::string, RGWBucketEnt>& m = buckets.get_buckets();
1862     for (const auto& kv : m) {
1863       const auto& bucket = kv.second;
1864
1865       global_stats.bytes_used += bucket.size;
1866       global_stats.bytes_used_rounded += bucket.size_rounded;
1867       global_stats.objects_count += bucket.count;
1868
1869       /* operator[] still can create a new entry for storage policy seen
1870        * for first time. */
1871       auto& policy_stats = policies_stats[bucket.placement_rule];
1872       policy_stats.bytes_used += bucket.size;
1873       policy_stats.bytes_used_rounded += bucket.size_rounded;
1874       policy_stats.buckets_count++;
1875       policy_stats.objects_count += bucket.count;
1876     }
1877     global_stats.buckets_count += m.size();
1878     total_count += m.size();
1879
1880     done = (m.size() < read_count || (limit >= 0 && total_count >= (uint64_t)limit));
1881
1882     if (!started) {
1883       send_response_begin(buckets.count() > 0);
1884       started = true;
1885     }
1886
1887     if (!m.empty()) {
1888       map<string, RGWBucketEnt>::reverse_iterator riter = m.rbegin();
1889       marker = riter->first;
1890
1891       handle_listing_chunk(std::move(buckets));
1892     }
1893   } while (is_truncated && !done);
1894
1895 send_end:
1896   if (!started) {
1897     send_response_begin(false);
1898   }
1899   send_response_end();
1900 }
1901
1902 void RGWGetUsage::execute()
1903 {
1904   uint64_t start_epoch = 0;
1905   uint64_t end_epoch = (uint64_t)-1;
1906   op_ret = get_params();
1907   if (op_ret < 0)
1908     return;
1909     
1910   if (!start_date.empty()) {
1911     op_ret = utime_t::parse_date(start_date, &start_epoch, NULL);
1912     if (op_ret < 0) {
1913       ldout(store->ctx(), 0) << "ERROR: failed to parse start date" << dendl;
1914       return;
1915     }
1916   }
1917     
1918   if (!end_date.empty()) {
1919     op_ret = utime_t::parse_date(end_date, &end_epoch, NULL);
1920     if (op_ret < 0) {
1921       ldout(store->ctx(), 0) << "ERROR: failed to parse end date" << dendl;
1922       return;
1923     }
1924   }
1925      
1926   uint32_t max_entries = 1000;
1927
1928   bool is_truncated = true;
1929
1930   RGWUsageIter usage_iter;
1931   
1932   while (is_truncated) {
1933     op_ret = store->read_usage(s->user->user_id, start_epoch, end_epoch, max_entries,
1934                                 &is_truncated, usage_iter, usage);
1935
1936     if (op_ret == -ENOENT) {
1937       op_ret = 0;
1938       is_truncated = false;
1939     }
1940
1941     if (op_ret < 0) {
1942       return;
1943     }    
1944   }
1945
1946   op_ret = rgw_user_sync_all_stats(store, s->user->user_id);
1947   if (op_ret < 0) {
1948     ldout(store->ctx(), 0) << "ERROR: failed to sync user stats: " << dendl;
1949     return;
1950   }
1951
1952   op_ret = rgw_user_get_all_buckets_stats(store, s->user->user_id, buckets_usage);
1953   if (op_ret < 0) {
1954     cerr << "ERROR: failed to sync user stats: " << std::endl;
1955     return ;
1956   }
1957
1958   string user_str = s->user->user_id.to_str();
1959   op_ret = store->cls_user_get_header(user_str, &header);
1960   if (op_ret < 0) {
1961     ldout(store->ctx(), 0) << "ERROR: can't read user header: "  << dendl;
1962     return;
1963   }
1964   
1965   return;
1966 }
1967
1968 int RGWStatAccount::verify_permission()
1969 {
1970   if (!verify_user_permission(s, RGW_PERM_READ)) {
1971     return -EACCES;
1972   }
1973
1974   return 0;
1975 }
1976
1977 void RGWStatAccount::execute()
1978 {
1979   string marker;
1980   bool is_truncated = false;
1981   uint64_t max_buckets = s->cct->_conf->rgw_list_buckets_max_chunk;
1982
1983   do {
1984     RGWUserBuckets buckets;
1985
1986     op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets, marker,
1987                                    string(), max_buckets, true, &is_truncated);
1988     if (op_ret < 0) {
1989       /* hmm.. something wrong here.. the user was authenticated, so it
1990          should exist */
1991       ldout(s->cct, 10) << "WARNING: failed on rgw_get_user_buckets uid="
1992                         << s->user->user_id << dendl;
1993       break;
1994     } else {
1995       /* We need to have stats for all our policies - even if a given policy
1996        * isn't actually used in a given account. In such situation its usage
1997        * stats would be simply full of zeros. */
1998       for (const auto& policy : store->get_zonegroup().placement_targets) {
1999         policies_stats.emplace(policy.second.name,
2000                                decltype(policies_stats)::mapped_type());
2001       }
2002
2003       std::map<std::string, RGWBucketEnt>& m = buckets.get_buckets();
2004       for (const auto& kv : m) {
2005         const auto& bucket = kv.second;
2006
2007         global_stats.bytes_used += bucket.size;
2008         global_stats.bytes_used_rounded += bucket.size_rounded;
2009         global_stats.objects_count += bucket.count;
2010
2011         /* operator[] still can create a new entry for storage policy seen
2012          * for first time. */
2013         auto& policy_stats = policies_stats[bucket.placement_rule];
2014         policy_stats.bytes_used += bucket.size;
2015         policy_stats.bytes_used_rounded += bucket.size_rounded;
2016         policy_stats.buckets_count++;
2017         policy_stats.objects_count += bucket.count;
2018       }
2019       global_stats.buckets_count += m.size();
2020
2021     }
2022   } while (is_truncated);
2023 }
2024
2025 int RGWGetBucketVersioning::verify_permission()
2026 {
2027   if (s->iam_policy) {
2028     if (s->iam_policy->eval(s->env, *s->auth.identity,
2029                             rgw::IAM::s3GetBucketVersioning,
2030                             ARN(s->bucket)) == Effect::Allow) {
2031       return 0;
2032     }
2033   } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
2034     return 0;
2035   }
2036   return -EACCES;
2037 }
2038
2039 void RGWGetBucketVersioning::pre_exec()
2040 {
2041   rgw_bucket_object_pre_exec(s);
2042 }
2043
2044 void RGWGetBucketVersioning::execute()
2045 {
2046   versioned = s->bucket_info.versioned();
2047   versioning_enabled = s->bucket_info.versioning_enabled();
2048 }
2049
2050 int RGWSetBucketVersioning::verify_permission()
2051 {
2052   if (s->iam_policy) {
2053     if (s->iam_policy->eval(s->env, *s->auth.identity,
2054                             rgw::IAM::s3PutBucketVersioning,
2055                             ARN(s->bucket)) == Effect::Allow) {
2056       return 0;
2057     }
2058   } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
2059     return 0;
2060   }
2061   return -EACCES;
2062 }
2063
2064 void RGWSetBucketVersioning::pre_exec()
2065 {
2066   rgw_bucket_object_pre_exec(s);
2067 }
2068
2069 void RGWSetBucketVersioning::execute()
2070 {
2071   op_ret = get_params();
2072   if (op_ret < 0)
2073     return;
2074
2075   if (!store->is_meta_master()) {
2076     op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
2077     if (op_ret < 0) {
2078       ldout(s->cct, 20) << __func__ << "forward_request_to_master returned ret=" << op_ret << dendl;
2079       return;
2080     }
2081   }
2082
2083   if (enable_versioning) {
2084     s->bucket_info.flags |= BUCKET_VERSIONED;
2085     s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
2086   } else {
2087     s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
2088   }
2089
2090   op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
2091                                           &s->bucket_attrs);
2092   if (op_ret < 0) {
2093     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
2094                      << " returned err=" << op_ret << dendl;
2095     return;
2096   }
2097 }
2098
2099 int RGWGetBucketWebsite::verify_permission()
2100 {
2101   if (s->iam_policy) {
2102     if (s->iam_policy->eval(s->env, *s->auth.identity,
2103                            rgw::IAM::s3GetBucketWebsite,
2104                            ARN(s->bucket)) == Effect::Allow) {
2105       return 0;
2106     }
2107   } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
2108     return 0;
2109   }
2110
2111   return -EACCES;
2112 }
2113
2114 void RGWGetBucketWebsite::pre_exec()
2115 {
2116   rgw_bucket_object_pre_exec(s);
2117 }
2118
2119 void RGWGetBucketWebsite::execute()
2120 {
2121   if (!s->bucket_info.has_website) {
2122     op_ret = -ENOENT;
2123   }
2124 }
2125
2126 int RGWSetBucketWebsite::verify_permission()
2127 {
2128   if (s->iam_policy) {
2129     if (s->iam_policy->eval(s->env, *s->auth.identity,
2130                            rgw::IAM::s3PutBucketWebsite,
2131                            ARN(s->bucket)) == Effect::Allow) {
2132       return 0;
2133     }
2134   } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
2135     return 0;
2136   }
2137
2138   return -EACCES;
2139 }
2140
2141 void RGWSetBucketWebsite::pre_exec()
2142 {
2143   rgw_bucket_object_pre_exec(s);
2144 }
2145
2146 void RGWSetBucketWebsite::execute()
2147 {
2148   op_ret = get_params();
2149
2150   if (op_ret < 0)
2151     return;
2152
2153   if (!store->is_meta_master()) {
2154     op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
2155     if (op_ret < 0) {
2156       ldout(s->cct, 20) << __func__ << " forward_request_to_master returned ret=" << op_ret << dendl;
2157       return;
2158     }
2159   }
2160
2161   s->bucket_info.has_website = true;
2162   s->bucket_info.website_conf = website_conf;
2163
2164   op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
2165   if (op_ret < 0) {
2166     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
2167     return;
2168   }
2169 }
2170
2171 int RGWDeleteBucketWebsite::verify_permission()
2172 {
2173   if (s->user->user_id.compare(s->bucket_owner.get_id()) != 0)
2174     return -EACCES;
2175
2176   return 0;
2177 }
2178
2179 void RGWDeleteBucketWebsite::pre_exec()
2180 {
2181   rgw_bucket_object_pre_exec(s);
2182 }
2183
2184 void RGWDeleteBucketWebsite::execute()
2185 {
2186   s->bucket_info.has_website = false;
2187   s->bucket_info.website_conf = RGWBucketWebsiteConf();
2188
2189   op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
2190   if (op_ret < 0) {
2191     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
2192     return;
2193   }
2194 }
2195
2196 int RGWStatBucket::verify_permission()
2197 {
2198   // This (a HEAD request on a bucket) is governed by the s3:ListBucket permission.
2199   if (!verify_bucket_permission(s, rgw::IAM::s3ListBucket)) {
2200     return -EACCES;
2201   }
2202
2203   return 0;
2204 }
2205
2206 void RGWStatBucket::pre_exec()
2207 {
2208   rgw_bucket_object_pre_exec(s);
2209 }
2210
2211 void RGWStatBucket::execute()
2212 {
2213   if (!s->bucket_exists) {
2214     op_ret = -ERR_NO_SUCH_BUCKET;
2215     return;
2216   }
2217
2218   RGWUserBuckets buckets;
2219   bucket.bucket = s->bucket;
2220   buckets.add(bucket);
2221   map<string, RGWBucketEnt>& m = buckets.get_buckets();
2222   op_ret = store->update_containers_stats(m);
2223   if (! op_ret)
2224     op_ret = -EEXIST;
2225   if (op_ret > 0) {
2226     op_ret = 0;
2227     map<string, RGWBucketEnt>::iterator iter = m.find(bucket.bucket.name);
2228     if (iter != m.end()) {
2229       bucket = iter->second;
2230     } else {
2231       op_ret = -EINVAL;
2232     }
2233   }
2234 }
2235
2236 int RGWListBucket::verify_permission()
2237 {
2238   op_ret = get_params();
2239   if (op_ret < 0) {
2240     return op_ret;
2241   }
2242
2243   if (!verify_bucket_permission(s,
2244                                 list_versions ?
2245                                 rgw::IAM::s3ListBucketVersions :
2246                                 rgw::IAM::s3ListBucket)) {
2247     return -EACCES;
2248   }
2249
2250   return 0;
2251 }
2252
2253 int RGWListBucket::parse_max_keys()
2254 {
2255   if (!max_keys.empty()) {
2256     char *endptr;
2257     max = strtol(max_keys.c_str(), &endptr, 10);
2258     if (endptr) {
2259       while (*endptr && isspace(*endptr)) // ignore white space
2260         endptr++;
2261       if (*endptr) {
2262         return -EINVAL;
2263       }
2264     }
2265   } else {
2266     max = default_max;
2267   }
2268
2269   return 0;
2270 }
2271
2272 void RGWListBucket::pre_exec()
2273 {
2274   rgw_bucket_object_pre_exec(s);
2275 }
2276
2277 void RGWListBucket::execute()
2278 {
2279   if (!s->bucket_exists) {
2280     op_ret = -ERR_NO_SUCH_BUCKET;
2281     return;
2282   }
2283
2284   if (need_container_stats()) {
2285     map<string, RGWBucketEnt> m;
2286     m[s->bucket.name] = RGWBucketEnt();
2287     m.begin()->second.bucket = s->bucket;
2288     op_ret = store->update_containers_stats(m);
2289     if (op_ret > 0) {
2290       bucket = m.begin()->second;
2291     }
2292   }
2293
2294   RGWRados::Bucket target(store, s->bucket_info);
2295   if (shard_id >= 0) {
2296     target.set_shard_id(shard_id);
2297   }
2298   RGWRados::Bucket::List list_op(&target);
2299
2300   list_op.params.prefix = prefix;
2301   list_op.params.delim = delimiter;
2302   list_op.params.marker = marker;
2303   list_op.params.end_marker = end_marker;
2304   list_op.params.list_versions = list_versions;
2305
2306   op_ret = list_op.list_objects(max, &objs, &common_prefixes, &is_truncated);
2307   if (op_ret >= 0) {
2308     next_marker = list_op.get_next_marker();
2309   }
2310 }
2311
2312 int RGWGetBucketLogging::verify_permission()
2313 {
2314   if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
2315     return -EACCES;
2316   }
2317
2318   return 0;
2319 }
2320
2321 int RGWGetBucketLocation::verify_permission()
2322 {
2323   if (s->iam_policy) {
2324     if (s->iam_policy->eval(s->env, *s->auth.identity,
2325                             rgw::IAM::s3GetBucketLocation,
2326                             ARN(s->bucket)) == Effect::Allow) {
2327       return 0;
2328     }
2329   } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
2330     return 0;
2331   }
2332   return -EACCES;
2333 }
2334
2335 int RGWCreateBucket::verify_permission()
2336 {
2337   /* This check is mostly needed for S3 that doesn't support account ACL.
2338    * Swift doesn't allow to delegate any permission to an anonymous user,
2339    * so it will become an early exit in such case. */
2340   if (s->auth.identity->is_anonymous()) {
2341     return -EACCES;
2342   }
2343
2344   if (!verify_user_permission(s, RGW_PERM_WRITE)) {
2345     return -EACCES;
2346   }
2347
2348   if (s->user->user_id.tenant != s->bucket_tenant) {
2349     ldout(s->cct, 10) << "user cannot create a bucket in a different tenant"
2350                       << " (user_id.tenant=" << s->user->user_id.tenant
2351                       << " requested=" << s->bucket_tenant << ")"
2352                       << dendl;
2353     return -EACCES;
2354   }
2355   if (s->user->max_buckets < 0) {
2356     return -EPERM;
2357   }
2358
2359   if (s->user->max_buckets) {
2360     RGWUserBuckets buckets;
2361     string marker;
2362     bool is_truncated = false;
2363     op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets,
2364                                    marker, string(), s->user->max_buckets,
2365                                    false, &is_truncated);
2366     if (op_ret < 0) {
2367       return op_ret;
2368     }
2369
2370     if ((int)buckets.count() >= s->user->max_buckets) {
2371       return -ERR_TOO_MANY_BUCKETS;
2372     }
2373   }
2374
2375   return 0;
2376 }
2377
2378 static int forward_request_to_master(struct req_state *s, obj_version *objv,
2379                                     RGWRados *store, bufferlist& in_data,
2380                                     JSONParser *jp, req_info *forward_info)
2381 {
2382   if (!store->rest_master_conn) {
2383     ldout(s->cct, 0) << "rest connection is invalid" << dendl;
2384     return -EINVAL;
2385   }
2386   ldout(s->cct, 0) << "sending request to master zonegroup" << dendl;
2387   bufferlist response;
2388   string uid_str = s->user->user_id.to_str();
2389 #define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
2390   int ret = store->rest_master_conn->forward(uid_str, (forward_info ? *forward_info : s->info),
2391                                              objv, MAX_REST_RESPONSE, &in_data, &response);
2392   if (ret < 0)
2393     return ret;
2394
2395   ldout(s->cct, 20) << "response: " << response.c_str() << dendl;
2396   if (jp && !jp->parse(response.c_str(), response.length())) {
2397     ldout(s->cct, 0) << "failed parsing response from master zonegroup" << dendl;
2398     return -EINVAL;
2399   }
2400
2401   return 0;
2402 }
2403
2404 void RGWCreateBucket::pre_exec()
2405 {
2406   rgw_bucket_object_pre_exec(s);
2407 }
2408
2409 static void prepare_add_del_attrs(const map<string, bufferlist>& orig_attrs,
2410                                   map<string, bufferlist>& out_attrs,
2411                                   map<string, bufferlist>& out_rmattrs)
2412 {
2413   for (const auto& kv : orig_attrs) {
2414     const string& name = kv.first;
2415
2416     /* Check if the attr is user-defined metadata item. */
2417     if (name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1,
2418                      RGW_ATTR_META_PREFIX) == 0) {
2419       /* For the objects all existing meta attrs have to be removed. */
2420       out_rmattrs[name] = kv.second;
2421     } else if (out_attrs.find(name) == std::end(out_attrs)) {
2422       out_attrs[name] = kv.second;
2423     }
2424   }
2425 }
2426
2427 /* Fuse resource metadata basing on original attributes in @orig_attrs, set
2428  * of _custom_ attribute names to remove in @rmattr_names and attributes in
2429  * @out_attrs. Place results in @out_attrs.
2430  *
2431  * NOTE: it's supposed that all special attrs already present in @out_attrs
2432  * will be preserved without any change. Special attributes are those which
2433  * names start with RGW_ATTR_META_PREFIX. They're complement to custom ones
2434  * used for X-Account-Meta-*, X-Container-Meta-*, X-Amz-Meta and so on.  */
2435 static void prepare_add_del_attrs(const map<string, bufferlist>& orig_attrs,
2436                                   const set<string>& rmattr_names,
2437                                   map<string, bufferlist>& out_attrs)
2438 {
2439   for (const auto& kv : orig_attrs) {
2440     const string& name = kv.first;
2441
2442     /* Check if the attr is user-defined metadata item. */
2443     if (name.compare(0, strlen(RGW_ATTR_META_PREFIX),
2444                      RGW_ATTR_META_PREFIX) == 0) {
2445       /* For the buckets all existing meta attrs are preserved,
2446          except those that are listed in rmattr_names. */
2447       if (rmattr_names.find(name) != std::end(rmattr_names)) {
2448         const auto aiter = out_attrs.find(name);
2449
2450         if (aiter != std::end(out_attrs)) {
2451           out_attrs.erase(aiter);
2452         }
2453       } else {
2454         /* emplace() won't alter the map if the key is already present.
2455          * This behaviour is fully intensional here. */
2456         out_attrs.emplace(kv);
2457       }
2458     } else if (out_attrs.find(name) == std::end(out_attrs)) {
2459       out_attrs[name] = kv.second;
2460     }
2461   }
2462 }
2463
2464
2465 static void populate_with_generic_attrs(const req_state * const s,
2466                                         map<string, bufferlist>& out_attrs)
2467 {
2468   for (const auto& kv : s->generic_attrs) {
2469     bufferlist& attrbl = out_attrs[kv.first];
2470     const string& val = kv.second;
2471     attrbl.clear();
2472     attrbl.append(val.c_str(), val.size() + 1);
2473   }
2474 }
2475
2476
2477 static int filter_out_quota_info(std::map<std::string, bufferlist>& add_attrs,
2478                                  const std::set<std::string>& rmattr_names,
2479                                  RGWQuotaInfo& quota,
2480                                  bool * quota_extracted = nullptr)
2481 {
2482   bool extracted = false;
2483
2484   /* Put new limit on max objects. */
2485   auto iter = add_attrs.find(RGW_ATTR_QUOTA_NOBJS);
2486   std::string err;
2487   if (std::end(add_attrs) != iter) {
2488     quota.max_objects =
2489       static_cast<int64_t>(strict_strtoll(iter->second.c_str(), 10, &err));
2490     if (!err.empty()) {
2491       return -EINVAL;
2492     }
2493     add_attrs.erase(iter);
2494     extracted = true;
2495   }
2496
2497   /* Put new limit on bucket (container) size. */
2498   iter = add_attrs.find(RGW_ATTR_QUOTA_MSIZE);
2499   if (iter != add_attrs.end()) {
2500     quota.max_size =
2501       static_cast<int64_t>(strict_strtoll(iter->second.c_str(), 10, &err));
2502     if (!err.empty()) {
2503       return -EINVAL;
2504     }
2505     add_attrs.erase(iter);
2506     extracted = true;
2507   }
2508
2509   for (const auto& name : rmattr_names) {
2510     /* Remove limit on max objects. */
2511     if (name.compare(RGW_ATTR_QUOTA_NOBJS) == 0) {
2512       quota.max_objects = -1;
2513       extracted = true;
2514     }
2515
2516     /* Remove limit on max bucket size. */
2517     if (name.compare(RGW_ATTR_QUOTA_MSIZE) == 0) {
2518       quota.max_size = -1;
2519       extracted = true;
2520     }
2521   }
2522
2523   /* Swift requries checking on raw usage instead of the 4 KiB rounded one. */
2524   quota.check_on_raw = true;
2525   quota.enabled = quota.max_size > 0 || quota.max_objects > 0;
2526
2527   if (quota_extracted) {
2528     *quota_extracted = extracted;
2529   }
2530
2531   return 0;
2532 }
2533
2534
2535 static void filter_out_website(std::map<std::string, ceph::bufferlist>& add_attrs,
2536                                const std::set<std::string>& rmattr_names,
2537                                RGWBucketWebsiteConf& ws_conf)
2538 {
2539   std::string lstval;
2540
2541   /* Let's define a mapping between each custom attribute and the memory where
2542    * attribute's value should be stored. The memory location is expressed by
2543    * a non-const reference. */
2544   const auto mapping  = {
2545     std::make_pair(RGW_ATTR_WEB_INDEX,     std::ref(ws_conf.index_doc_suffix)),
2546     std::make_pair(RGW_ATTR_WEB_ERROR,     std::ref(ws_conf.error_doc)),
2547     std::make_pair(RGW_ATTR_WEB_LISTINGS,  std::ref(lstval)),
2548     std::make_pair(RGW_ATTR_WEB_LIST_CSS,  std::ref(ws_conf.listing_css_doc)),
2549     std::make_pair(RGW_ATTR_SUBDIR_MARKER, std::ref(ws_conf.subdir_marker))
2550   };
2551
2552   for (const auto& kv : mapping) {
2553     const char * const key = kv.first;
2554     auto& target = kv.second;
2555
2556     auto iter = add_attrs.find(key);
2557
2558     if (std::end(add_attrs) != iter) {
2559       /* The "target" is a reference to ws_conf. */
2560       target = iter->second.c_str();
2561       add_attrs.erase(iter);
2562     }
2563
2564     if (rmattr_names.count(key)) {
2565       target = std::string();
2566     }
2567   }
2568
2569   if (! lstval.empty()) {
2570     ws_conf.listing_enabled = boost::algorithm::iequals(lstval, "true");
2571   }
2572 }
2573
2574
2575 void RGWCreateBucket::execute()
2576 {
2577   RGWAccessControlPolicy old_policy(s->cct);
2578   buffer::list aclbl;
2579   buffer::list corsbl;
2580   bool existed;
2581   string bucket_name;
2582   rgw_make_bucket_entry_name(s->bucket_tenant, s->bucket_name, bucket_name);
2583   rgw_raw_obj obj(store->get_zone_params().domain_root, bucket_name);
2584   obj_version objv, *pobjv = NULL;
2585
2586   op_ret = get_params();
2587   if (op_ret < 0)
2588     return;
2589
2590   if (!location_constraint.empty() &&
2591       !store->has_zonegroup_api(location_constraint)) {
2592       ldout(s->cct, 0) << "location constraint (" << location_constraint << ")"
2593                        << " can't be found." << dendl;
2594       op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
2595       s->err.message = "The specified location-constraint is not valid";
2596       return;
2597   }
2598
2599   if (!store->get_zonegroup().is_master_zonegroup() && !location_constraint.empty() &&
2600       store->get_zonegroup().api_name != location_constraint) {
2601     ldout(s->cct, 0) << "location constraint (" << location_constraint << ")"
2602                      << " doesn't match zonegroup" << " (" << store->get_zonegroup().api_name << ")"
2603                      << dendl;
2604     op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
2605     s->err.message = "The specified location-constraint is not valid";
2606     return;
2607   }
2608
2609   const auto& zonegroup = store->get_zonegroup();
2610   if (!placement_rule.empty() &&
2611       !zonegroup.placement_targets.count(placement_rule)) {
2612     ldout(s->cct, 0) << "placement target (" << placement_rule << ")"
2613                      << " doesn't exist in the placement targets of zonegroup"
2614                      << " (" << store->get_zonegroup().api_name << ")" << dendl;
2615     op_ret = -ERR_INVALID_LOCATION_CONSTRAINT;
2616     s->err.message = "The specified placement target does not exist";
2617     return;
2618   }
2619
2620   /* we need to make sure we read bucket info, it's not read before for this
2621    * specific request */
2622   RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
2623   op_ret = store->get_bucket_info(obj_ctx, s->bucket_tenant, s->bucket_name,
2624                                   s->bucket_info, NULL, &s->bucket_attrs);
2625   if (op_ret < 0 && op_ret != -ENOENT)
2626     return;
2627   s->bucket_exists = (op_ret != -ENOENT);
2628
2629   s->bucket_owner.set_id(s->user->user_id);
2630   s->bucket_owner.set_name(s->user->display_name);
2631   if (s->bucket_exists) {
2632     int r = get_bucket_policy_from_attr(s->cct, store, s->bucket_info,
2633                                         s->bucket_attrs, &old_policy);
2634     if (r >= 0)  {
2635       if (old_policy.get_owner().get_id().compare(s->user->user_id) != 0) {
2636         op_ret = -EEXIST;
2637         return;
2638       }
2639     }
2640   }
2641
2642   RGWBucketInfo master_info;
2643   rgw_bucket *pmaster_bucket;
2644   uint32_t *pmaster_num_shards;
2645   real_time creation_time;
2646
2647   if (!store->is_meta_master()) {
2648     JSONParser jp;
2649     op_ret = forward_request_to_master(s, NULL, store, in_data, &jp);
2650     if (op_ret < 0) {
2651       return;
2652     }
2653
2654     JSONDecoder::decode_json("entry_point_object_ver", ep_objv, &jp);
2655     JSONDecoder::decode_json("object_ver", objv, &jp);
2656     JSONDecoder::decode_json("bucket_info", master_info, &jp);
2657     ldout(s->cct, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver=" << objv.ver << dendl;
2658     ldout(s->cct, 20) << "got creation time: << " << master_info.creation_time << dendl;
2659     pmaster_bucket= &master_info.bucket;
2660     creation_time = master_info.creation_time;
2661     pmaster_num_shards = &master_info.num_shards;
2662     pobjv = &objv;
2663   } else {
2664     pmaster_bucket = NULL;
2665     pmaster_num_shards = NULL;
2666   }
2667
2668   string zonegroup_id;
2669
2670   if (s->system_request) {
2671     zonegroup_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "zonegroup");
2672     if (zonegroup_id.empty()) {
2673       zonegroup_id = store->get_zonegroup().get_id();
2674     }
2675   } else {
2676     zonegroup_id = store->get_zonegroup().get_id();
2677   }
2678
2679   if (s->bucket_exists) {
2680     string selected_placement_rule;
2681     rgw_bucket bucket;
2682     bucket.tenant = s->bucket_tenant;
2683     bucket.name = s->bucket_name;
2684     op_ret = store->select_bucket_placement(*(s->user), zonegroup_id,
2685                                             placement_rule,
2686                                             &selected_placement_rule, nullptr);
2687     if (selected_placement_rule != s->bucket_info.placement_rule) {
2688       op_ret = -EEXIST;
2689       return;
2690     }
2691   }
2692
2693   /* Encode special metadata first as we're using std::map::emplace under
2694    * the hood. This method will add the new items only if the map doesn't
2695    * contain such keys yet. */
2696   policy.encode(aclbl);
2697   emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
2698
2699   if (has_cors) {
2700     cors_config.encode(corsbl);
2701     emplace_attr(RGW_ATTR_CORS, std::move(corsbl));
2702   }
2703
2704   RGWQuotaInfo quota_info;
2705   const RGWQuotaInfo * pquota_info = nullptr;
2706   if (need_metadata_upload()) {
2707     /* It's supposed that following functions WILL NOT change any special
2708      * attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
2709     op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
2710     if (op_ret < 0) {
2711       return;
2712     }
2713     prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
2714     populate_with_generic_attrs(s, attrs);
2715
2716     op_ret = filter_out_quota_info(attrs, rmattr_names, quota_info);
2717     if (op_ret < 0) {
2718       return;
2719     } else {
2720       pquota_info = &quota_info;
2721     }
2722
2723     /* Web site of Swift API. */
2724     filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
2725     s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
2726   }
2727
2728   s->bucket.tenant = s->bucket_tenant; /* ignored if bucket exists */
2729   s->bucket.name = s->bucket_name;
2730
2731   /* Handle updates of the metadata for Swift's object versioning. */
2732   if (swift_ver_location) {
2733     s->bucket_info.swift_ver_location = *swift_ver_location;
2734     s->bucket_info.swift_versioning = (! swift_ver_location->empty());
2735   }
2736
2737   op_ret = store->create_bucket(*(s->user), s->bucket, zonegroup_id,
2738                                 placement_rule, s->bucket_info.swift_ver_location,
2739                                 pquota_info, attrs,
2740                                 info, pobjv, &ep_objv, creation_time,
2741                                 pmaster_bucket, pmaster_num_shards, true);
2742   /* continue if EEXIST and create_bucket will fail below.  this way we can
2743    * recover from a partial create by retrying it. */
2744   ldout(s->cct, 20) << "rgw_create_bucket returned ret=" << op_ret << " bucket=" << s->bucket << dendl;
2745
2746   if (op_ret && op_ret != -EEXIST)
2747     return;
2748
2749   existed = (op_ret == -EEXIST);
2750
2751   if (existed) {
2752     /* bucket already existed, might have raced with another bucket creation, or
2753      * might be partial bucket creation that never completed. Read existing bucket
2754      * info, verify that the reported bucket owner is the current user.
2755      * If all is ok then update the user's list of buckets.
2756      * Otherwise inform client about a name conflict.
2757      */
2758     if (info.owner.compare(s->user->user_id) != 0) {
2759       op_ret = -EEXIST;
2760       return;
2761     }
2762     s->bucket = info.bucket;
2763   }
2764
2765   op_ret = rgw_link_bucket(store, s->user->user_id, s->bucket,
2766                            info.creation_time, false);
2767   if (op_ret && !existed && op_ret != -EEXIST) {
2768     /* if it exists (or previously existed), don't remove it! */
2769     op_ret = rgw_unlink_bucket(store, s->user->user_id, s->bucket.tenant,
2770                                s->bucket.name);
2771     if (op_ret < 0) {
2772       ldout(s->cct, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret
2773                        << dendl;
2774     }
2775   } else if (op_ret == -EEXIST || (op_ret == 0 && existed)) {
2776     op_ret = -ERR_BUCKET_EXISTS;
2777   }
2778
2779   if (need_metadata_upload() && existed) {
2780     /* OK, it looks we lost race with another request. As it's required to
2781      * handle metadata fusion and upload, the whole operation becomes very
2782      * similar in nature to PutMetadataBucket. However, as the attrs may
2783      * changed in the meantime, we have to refresh. */
2784     short tries = 0;
2785     do {
2786       RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
2787       RGWBucketInfo binfo;
2788       map<string, bufferlist> battrs;
2789
2790       op_ret = store->get_bucket_info(obj_ctx, s->bucket_tenant, s->bucket_name,
2791                                       binfo, nullptr, &battrs);
2792       if (op_ret < 0) {
2793         return;
2794       } else if (binfo.owner.compare(s->user->user_id) != 0) {
2795         /* New bucket doesn't belong to the account we're operating on. */
2796         op_ret = -EEXIST;
2797         return;
2798       } else {
2799         s->bucket_info = binfo;
2800         s->bucket_attrs = battrs;
2801       }
2802
2803       attrs.clear();
2804
2805       op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
2806       if (op_ret < 0) {
2807         return;
2808       }
2809       prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
2810       populate_with_generic_attrs(s, attrs);
2811       op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
2812       if (op_ret < 0) {
2813         return;
2814       }
2815
2816       /* Handle updates of the metadata for Swift's object versioning. */
2817       if (swift_ver_location) {
2818         s->bucket_info.swift_ver_location = *swift_ver_location;
2819         s->bucket_info.swift_versioning = (! swift_ver_location->empty());
2820       }
2821
2822       /* Web site of Swift API. */
2823       filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
2824       s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
2825
2826       /* This will also set the quota on the bucket. */
2827       op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
2828                                     &s->bucket_info.objv_tracker);
2829     } while (op_ret == -ECANCELED && tries++ < 20);
2830
2831     /* Restore the proper return code. */
2832     if (op_ret >= 0) {
2833       op_ret = -ERR_BUCKET_EXISTS;
2834     }
2835   }
2836 }
2837
2838 int RGWDeleteBucket::verify_permission()
2839 {
2840   if (!verify_bucket_permission(s, rgw::IAM::s3DeleteBucket)) {
2841     return -EACCES;
2842   }
2843
2844   return 0;
2845 }
2846
2847 void RGWDeleteBucket::pre_exec()
2848 {
2849   rgw_bucket_object_pre_exec(s);
2850 }
2851
2852 void RGWDeleteBucket::execute()
2853 {
2854   op_ret = -EINVAL;
2855
2856   if (s->bucket_name.empty())
2857     return;
2858
2859   if (!s->bucket_exists) {
2860     ldout(s->cct, 0) << "ERROR: bucket " << s->bucket_name << " not found" << dendl;
2861     op_ret = -ERR_NO_SUCH_BUCKET;
2862     return;
2863   }
2864   RGWObjVersionTracker ot;
2865   ot.read_version = s->bucket_info.ep_objv;
2866
2867   if (s->system_request) {
2868     string tag = s->info.args.get(RGW_SYS_PARAM_PREFIX "tag");
2869     string ver_str = s->info.args.get(RGW_SYS_PARAM_PREFIX "ver");
2870     if (!tag.empty()) {
2871       ot.read_version.tag = tag;
2872       uint64_t ver;
2873       string err;
2874       ver = strict_strtol(ver_str.c_str(), 10, &err);
2875       if (!err.empty()) {
2876         ldout(s->cct, 0) << "failed to parse ver param" << dendl;
2877         op_ret = -EINVAL;
2878         return;
2879       }
2880       ot.read_version.ver = ver;
2881     }
2882   }
2883
2884   op_ret = rgw_bucket_sync_user_stats(store, s->user->user_id, s->bucket_info);
2885   if ( op_ret < 0) {
2886      ldout(s->cct, 1) << "WARNING: failed to sync user stats before bucket delete: op_ret= " << op_ret << dendl;
2887   }
2888   
2889   op_ret = store->check_bucket_empty(s->bucket_info);
2890   if (op_ret < 0) {
2891     return;
2892   }
2893
2894   if (!store->is_meta_master()) {
2895     bufferlist in_data;
2896     op_ret = forward_request_to_master(s, &ot.read_version, store, in_data,
2897                                        NULL);
2898     if (op_ret < 0) {
2899       if (op_ret == -ENOENT) {
2900         /* adjust error, we want to return with NoSuchBucket and not
2901          * NoSuchKey */
2902         op_ret = -ERR_NO_SUCH_BUCKET;
2903       }
2904       return;
2905     }
2906   }
2907
2908   string prefix, delimiter;
2909
2910   if (s->prot_flags & RGW_REST_SWIFT) {
2911     string path_args;
2912     path_args = s->info.args.get("path");
2913     if (!path_args.empty()) {
2914       if (!delimiter.empty() || !prefix.empty()) {
2915         op_ret = -EINVAL;
2916         return;
2917       }
2918       prefix = path_args;
2919       delimiter="/";
2920     }
2921   }
2922
2923   op_ret = abort_bucket_multiparts(store, s->cct, s->bucket_info, prefix, delimiter);
2924
2925   if (op_ret < 0) {
2926     return;
2927   }
2928
2929   op_ret = store->delete_bucket(s->bucket_info, ot, false);
2930
2931   if (op_ret == -ECANCELED) {
2932     // lost a race, either with mdlog sync or another delete bucket operation.
2933     // in either case, we've already called rgw_unlink_bucket()
2934     op_ret = 0;
2935     return;
2936   }
2937
2938   if (op_ret == 0) {
2939     op_ret = rgw_unlink_bucket(store, s->user->user_id, s->bucket.tenant,
2940                                s->bucket.name, false);
2941     if (op_ret < 0) {
2942       ldout(s->cct, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret
2943                        << dendl;
2944     }
2945   }
2946
2947   if (op_ret < 0) {
2948     return;
2949   }
2950
2951
2952 }
2953
2954 int RGWPutObj::verify_permission()
2955 {
2956   if (copy_source) {
2957
2958     RGWAccessControlPolicy cs_acl(s->cct);
2959     optional<Policy> policy;
2960     map<string, bufferlist> cs_attrs;
2961     rgw_bucket cs_bucket(copy_source_bucket_info.bucket);
2962     rgw_obj_key cs_object(copy_source_object_name, copy_source_version_id);
2963
2964     rgw_obj obj(cs_bucket, cs_object);
2965     store->set_atomic(s->obj_ctx, obj);
2966     store->set_prefetch_data(s->obj_ctx, obj);
2967
2968     /* check source object permissions */
2969     if (read_obj_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_acl, policy,
2970                         cs_bucket, cs_object) < 0) {
2971       return -EACCES;
2972     }
2973
2974     /* admin request overrides permission checks */
2975     if (! s->auth.identity->is_admin_of(cs_acl.get_owner().get_id())) {
2976       if (policy) {
2977         auto e = policy->eval(s->env, *s->auth.identity,
2978                               cs_object.instance.empty() ?
2979                               rgw::IAM::s3GetObject :
2980                               rgw::IAM::s3GetObjectVersion,
2981                               rgw::IAM::ARN(obj));
2982         if (e == Effect::Deny) {
2983           return -EACCES; 
2984         } else if (e == Effect::Pass &&
2985                    !cs_acl.verify_permission(*s->auth.identity, s->perm_mask,
2986                                                 RGW_PERM_READ)) {
2987           return -EACCES;
2988         }
2989       } else if (!cs_acl.verify_permission(*s->auth.identity, s->perm_mask,
2990                                            RGW_PERM_READ)) {
2991         return -EACCES;
2992       }
2993     }
2994   }
2995
2996   if (s->iam_policy) {
2997     auto e = s->iam_policy->eval(s->env, *s->auth.identity,
2998                                  rgw::IAM::s3PutObject,
2999                                  rgw_obj(s->bucket, s->object));
3000     if (e == Effect::Allow) {
3001       return 0;
3002     } else if (e == Effect::Deny) {
3003       return -EACCES;
3004     }
3005   }
3006
3007   if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
3008     return -EACCES;
3009   }
3010
3011   return 0;
3012 }
3013
3014 void RGWPutObjProcessor_Multipart::get_mp(RGWMPObj** _mp){
3015   *_mp = &mp;
3016 }
3017
3018 int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, string *oid_rand)
3019 {
3020   string oid = obj_str;
3021   upload_id = s->info.args.get("uploadId");
3022   if (!oid_rand) {
3023     mp.init(oid, upload_id);
3024   } else {
3025     mp.init(oid, upload_id, *oid_rand);
3026   }
3027
3028   part_num = s->info.args.get("partNumber");
3029   if (part_num.empty()) {
3030     ldout(s->cct, 10) << "part number is empty" << dendl;
3031     return -EINVAL;
3032   }
3033
3034   string err;
3035   uint64_t num = (uint64_t)strict_strtol(part_num.c_str(), 10, &err);
3036
3037   if (!err.empty()) {
3038     ldout(s->cct, 10) << "bad part number: " << part_num << ": " << err << dendl;
3039     return -EINVAL;
3040   }
3041
3042   string upload_prefix = oid + ".";
3043
3044   if (!oid_rand) {
3045     upload_prefix.append(upload_id);
3046   } else {
3047     upload_prefix.append(*oid_rand);
3048   }
3049
3050   rgw_obj target_obj;
3051   target_obj.init(bucket, oid);
3052
3053   manifest.set_prefix(upload_prefix);
3054
3055   manifest.set_multipart_part_rule(store->ctx()->_conf->rgw_obj_stripe_size, num);
3056
3057   int r = manifest_gen.create_begin(store->ctx(), &manifest, s->bucket_info.placement_rule, bucket, target_obj);
3058   if (r < 0) {
3059     return r;
3060   }
3061
3062   cur_obj = manifest_gen.get_cur_obj(store);
3063   rgw_raw_obj_to_obj(bucket, cur_obj, &head_obj);
3064   head_obj.index_hash_source = obj_str;
3065
3066   r = prepare_init(store, NULL);
3067   if (r < 0) {
3068     return r;
3069   }
3070
3071   return 0;
3072 }
3073
3074 int RGWPutObjProcessor_Multipart::do_complete(size_t accounted_size,
3075                                               const string& etag,
3076                                               real_time *mtime, real_time set_mtime,
3077                                               map<string, bufferlist>& attrs,
3078                                               real_time delete_at,
3079                                               const char *if_match,
3080                                               const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace)
3081 {
3082   complete_writing_data();
3083
3084   RGWRados::Object op_target(store, s->bucket_info, obj_ctx, head_obj);
3085   op_target.set_versioning_disabled(true);
3086   RGWRados::Object::Write head_obj_op(&op_target);
3087
3088   head_obj_op.meta.set_mtime = set_mtime;
3089   head_obj_op.meta.mtime = mtime;
3090   head_obj_op.meta.owner = s->owner.get_id();
3091   head_obj_op.meta.delete_at = delete_at;
3092   head_obj_op.meta.zones_trace = zones_trace;
3093   head_obj_op.meta.modify_tail = true;
3094
3095   int r = head_obj_op.write_meta(obj_len, accounted_size, attrs);
3096   if (r < 0)
3097     return r;
3098
3099   bufferlist bl;
3100   RGWUploadPartInfo info;
3101   string p = "part.";
3102   bool sorted_omap = is_v2_upload_id(upload_id);
3103
3104   if (sorted_omap) {
3105     string err;
3106     int part_num_int = strict_strtol(part_num.c_str(), 10, &err);
3107     if (!err.empty()) {
3108       dout(10) << "bad part number specified: " << part_num << dendl;
3109       return -EINVAL;
3110     }
3111     char buf[32];
3112     snprintf(buf, sizeof(buf), "%08d", part_num_int);
3113     p.append(buf);
3114   } else {
3115     p.append(part_num);
3116   }
3117   info.num = atoi(part_num.c_str());
3118   info.etag = etag;
3119   info.size = obj_len;
3120   info.accounted_size = accounted_size;
3121   info.modified = real_clock::now();
3122   info.manifest = manifest;
3123
3124   bool compressed;
3125   r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
3126   if (r < 0) {
3127     dout(1) << "cannot get compression info" << dendl;
3128     return r;
3129   }
3130
3131   ::encode(info, bl);
3132
3133   string multipart_meta_obj = mp.get_meta();
3134
3135   rgw_obj meta_obj;
3136   meta_obj.init_ns(bucket, multipart_meta_obj, mp_ns);
3137   meta_obj.set_in_extra_data(true);
3138
3139   rgw_raw_obj raw_meta_obj;
3140
3141   store->obj_to_raw(s->bucket_info.placement_rule, meta_obj, &raw_meta_obj);
3142
3143   r = store->omap_set(raw_meta_obj, p, bl);
3144
3145   return r;
3146 }
3147
3148 RGWPutObjProcessor *RGWPutObj::select_processor(RGWObjectCtx& obj_ctx, bool *is_multipart)
3149 {
3150   RGWPutObjProcessor *processor;
3151
3152   bool multipart = s->info.args.exists("uploadId");
3153
3154   uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
3155
3156   if (!multipart) {
3157     processor = new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
3158     (static_cast<RGWPutObjProcessor_Atomic *>(processor))->set_olh_epoch(olh_epoch);
3159     (static_cast<RGWPutObjProcessor_Atomic *>(processor))->set_version_id(version_id);
3160   } else {
3161     processor = new RGWPutObjProcessor_Multipart(obj_ctx, s->bucket_info, part_size, s);
3162   }
3163
3164   if (is_multipart) {
3165     *is_multipart = multipart;
3166   }
3167
3168   return processor;
3169 }
3170
3171 void RGWPutObj::dispose_processor(RGWPutObjDataProcessor *processor)
3172 {
3173   delete processor;
3174 }
3175
3176 void RGWPutObj::pre_exec()
3177 {
3178   rgw_bucket_object_pre_exec(s);
3179 }
3180
3181 class RGWPutObj_CB : public RGWGetDataCB
3182 {
3183   RGWPutObj *op;
3184 public:
3185   RGWPutObj_CB(RGWPutObj *_op) : op(_op) {}
3186   ~RGWPutObj_CB() override {}
3187
3188   int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
3189     return op->get_data_cb(bl, bl_ofs, bl_len);
3190   }
3191 };
3192
3193 int RGWPutObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
3194 {
3195   bufferlist bl_tmp;
3196   bl.copy(bl_ofs, bl_len, bl_tmp);
3197
3198   bl_aux.append(bl_tmp);
3199
3200   return bl_len;
3201 }
3202
3203 int RGWPutObj::get_data(const off_t fst, const off_t lst, bufferlist& bl)
3204 {
3205   RGWPutObj_CB cb(this);
3206   RGWGetDataCB* filter = &cb;
3207   boost::optional<RGWGetObj_Decompress> decompress;
3208   std::unique_ptr<RGWGetDataCB> decrypt;
3209   RGWCompressionInfo cs_info;
3210   map<string, bufferlist> attrs;
3211   map<string, bufferlist>::iterator attr_iter;
3212   int ret = 0;
3213
3214   uint64_t obj_size;
3215   int64_t new_ofs, new_end;
3216
3217   new_ofs = fst;
3218   new_end = lst;
3219
3220   rgw_obj_key obj_key(copy_source_object_name, copy_source_version_id);
3221   rgw_obj obj(copy_source_bucket_info.bucket, obj_key);
3222
3223   RGWRados::Object op_target(store, copy_source_bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
3224   RGWRados::Object::Read read_op(&op_target);
3225   read_op.params.obj_size = &obj_size;
3226   read_op.params.attrs = &attrs;
3227
3228   ret = read_op.prepare();
3229   if (ret < 0)
3230     return ret;
3231
3232   bool need_decompress;
3233   op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
3234   if (op_ret < 0) {
3235           lderr(s->cct) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
3236       return -EIO;
3237   }
3238
3239   bool partial_content = true;
3240   if (need_decompress)
3241   {
3242     obj_size = cs_info.orig_size;
3243     decompress.emplace(s->cct, &cs_info, partial_content, filter);
3244     filter = &*decompress;
3245   }
3246
3247   attr_iter = attrs.find(RGW_ATTR_MANIFEST);
3248   op_ret = this->get_decrypt_filter(&decrypt,
3249                                     filter,
3250                                     attrs,
3251                                     attr_iter != attrs.end() ? &(attr_iter->second) : nullptr);
3252   if (decrypt != nullptr) {
3253     filter = decrypt.get();
3254   }
3255   if (op_ret < 0) {
3256     return ret;
3257   }
3258
3259   ret = read_op.range_to_ofs(obj_size, new_ofs, new_end);
3260   if (ret < 0)
3261     return ret;
3262
3263   filter->fixup_range(new_ofs, new_end);
3264   ret = read_op.iterate(new_ofs, new_end, filter);
3265
3266   if (ret >= 0)
3267     ret = filter->flush();
3268
3269   bl.claim_append(bl_aux);
3270
3271   return ret;
3272 }
3273
3274 // special handling for compression type = "random" with multipart uploads
3275 static CompressorRef get_compressor_plugin(const req_state *s,
3276                                            const std::string& compression_type)
3277 {
3278   if (compression_type != "random") {
3279     return Compressor::create(s->cct, compression_type);
3280   }
3281
3282   bool is_multipart{false};
3283   const auto& upload_id = s->info.args.get("uploadId", &is_multipart);
3284
3285   if (!is_multipart) {
3286     return Compressor::create(s->cct, compression_type);
3287   }
3288
3289   // use a hash of the multipart upload id so all parts use the same plugin
3290   const auto alg = std::hash<std::string>{}(upload_id) % Compressor::COMP_ALG_LAST;
3291   if (alg == Compressor::COMP_ALG_NONE) {
3292     return nullptr;
3293   }
3294   return Compressor::create(s->cct, alg);
3295 }
3296
3297 void RGWPutObj::execute()
3298 {
3299   RGWPutObjProcessor *processor = NULL;
3300   RGWPutObjDataProcessor *filter = nullptr;
3301   std::unique_ptr<RGWPutObjDataProcessor> encrypt;
3302   char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1];
3303   char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
3304   char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
3305   unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
3306   MD5 hash;
3307   bufferlist bl, aclbl, bs;
3308   int len;
3309   map<string, string>::iterator iter;
3310   bool multipart;
3311   
3312   off_t fst;
3313   off_t lst;
3314   const auto& compression_type = store->get_zone_params().get_compression_type(
3315       s->bucket_info.placement_rule);
3316   CompressorRef plugin;
3317   boost::optional<RGWPutObj_Compress> compressor;
3318
3319   bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL);
3320   perfcounter->inc(l_rgw_put);
3321   op_ret = -EINVAL;
3322   if (s->object.empty()) {
3323     goto done;
3324   }
3325
3326   if (!s->bucket_exists) {
3327     op_ret = -ERR_NO_SUCH_BUCKET;
3328     return;
3329   }
3330
3331   op_ret = get_params();
3332   if (op_ret < 0) {
3333     ldout(s->cct, 20) << "get_params() returned ret=" << op_ret << dendl;
3334     goto done;
3335   }
3336
3337   op_ret = get_system_versioning_params(s, &olh_epoch, &version_id);
3338   if (op_ret < 0) {
3339     ldout(s->cct, 20) << "get_system_versioning_params() returned ret="
3340                       << op_ret << dendl;
3341     goto done;
3342   }
3343
3344   if (supplied_md5_b64) {
3345     need_calc_md5 = true;
3346
3347     ldout(s->cct, 15) << "supplied_md5_b64=" << supplied_md5_b64 << dendl;
3348     op_ret = ceph_unarmor(supplied_md5_bin, &supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1],
3349                        supplied_md5_b64, supplied_md5_b64 + strlen(supplied_md5_b64));
3350     ldout(s->cct, 15) << "ceph_armor ret=" << op_ret << dendl;
3351     if (op_ret != CEPH_CRYPTO_MD5_DIGESTSIZE) {
3352       op_ret = -ERR_INVALID_DIGEST;
3353       goto done;
3354     }
3355
3356     buf_to_hex((const unsigned char *)supplied_md5_bin, CEPH_CRYPTO_MD5_DIGESTSIZE, supplied_md5);
3357     ldout(s->cct, 15) << "supplied_md5=" << supplied_md5 << dendl;
3358   }
3359
3360   if (!chunked_upload) { /* with chunked upload we don't know how big is the upload.
3361                             we also check sizes at the end anyway */
3362     op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
3363                                 user_quota, bucket_quota, s->content_length);
3364     if (op_ret < 0) {
3365       ldout(s->cct, 20) << "check_quota() returned ret=" << op_ret << dendl;
3366       goto done;
3367     }
3368     op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
3369     if (op_ret < 0) {
3370       ldout(s->cct, 20) << "check_bucket_shards() returned ret=" << op_ret << dendl;
3371       goto done;
3372     }
3373   }
3374
3375   if (supplied_etag) {
3376     strncpy(supplied_md5, supplied_etag, sizeof(supplied_md5) - 1);
3377     supplied_md5[sizeof(supplied_md5) - 1] = '\0';
3378   }
3379
3380   processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx), &multipart);
3381
3382   // no filters by default
3383   filter = processor;
3384
3385   /* Handle object versioning of Swift API. */
3386   if (! multipart) {
3387     rgw_obj obj(s->bucket, s->object);
3388     op_ret = store->swift_versioning_copy(*static_cast<RGWObjectCtx *>(s->obj_ctx),
3389                                           s->bucket_owner.get_id(),
3390                                           s->bucket_info,
3391                                           obj);
3392     if (op_ret < 0) {
3393       goto done;
3394     }
3395   }
3396
3397   op_ret = processor->prepare(store, NULL);
3398   if (op_ret < 0) {
3399     ldout(s->cct, 20) << "processor->prepare() returned ret=" << op_ret
3400                       << dendl;
3401     goto done;
3402   }
3403
3404   fst = copy_source_range_fst;
3405   lst = copy_source_range_lst;
3406
3407   op_ret = get_encrypt_filter(&encrypt, filter);
3408   if (op_ret < 0) {
3409     goto done;
3410   }
3411   if (encrypt != nullptr) {
3412     filter = encrypt.get();
3413   } else {
3414     //no encryption, we can try compression
3415     if (compression_type != "none") {
3416       plugin = get_compressor_plugin(s, compression_type);
3417       if (!plugin) {
3418         ldout(s->cct, 1) << "Cannot load plugin for compression type "
3419             << compression_type << dendl;
3420       } else {
3421         compressor.emplace(s->cct, plugin, filter);
3422         filter = &*compressor;
3423       }
3424     }
3425   }
3426
3427   do {
3428     bufferlist data;
3429     if (fst > lst)
3430       break;
3431     if (!copy_source) {
3432       len = get_data(data);
3433     } else {
3434       uint64_t cur_lst = min(fst + s->cct->_conf->rgw_max_chunk_size - 1, lst);
3435       op_ret = get_data(fst, cur_lst, data);
3436       if (op_ret < 0)
3437         goto done;
3438       len = data.length();
3439       s->content_length += len;
3440       fst += len;
3441     }
3442     if (len < 0) {
3443       op_ret = len;
3444       goto done;
3445     }
3446
3447     if (need_calc_md5) {
3448       hash.Update((const byte *)data.c_str(), data.length());
3449     }
3450
3451     /* update torrrent */
3452     torrent.update(data);
3453
3454     /* do we need this operation to be synchronous? if we're dealing with an object with immutable
3455      * head, e.g., multipart object we need to make sure we're the first one writing to this object
3456      */
3457     bool need_to_wait = (ofs == 0) && multipart;
3458
3459     bufferlist orig_data;
3460
3461     if (need_to_wait) {
3462       orig_data = data;
3463     }
3464
3465     op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
3466     if (op_ret < 0) {
3467       if (!need_to_wait || op_ret != -EEXIST) {
3468         ldout(s->cct, 20) << "processor->thottle_data() returned ret="
3469                           << op_ret << dendl;
3470         goto done;
3471       }
3472       /* need_to_wait == true and op_ret == -EEXIST */
3473       ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
3474
3475       /* restore original data */
3476       data.swap(orig_data);
3477
3478       /* restart processing with different oid suffix */
3479
3480       dispose_processor(processor);
3481       processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx), &multipart);
3482       filter = processor;
3483
3484       string oid_rand;
3485       char buf[33];
3486       gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
3487       oid_rand.append(buf);
3488
3489       op_ret = processor->prepare(store, &oid_rand);
3490       if (op_ret < 0) {
3491         ldout(s->cct, 0) << "ERROR: processor->prepare() returned "
3492                          << op_ret << dendl;
3493         goto done;
3494       }
3495
3496       op_ret = get_encrypt_filter(&encrypt, filter);
3497       if (op_ret < 0) {
3498         goto done;
3499       }
3500       if (encrypt != nullptr) {
3501         filter = encrypt.get();
3502       } else {
3503         if (compressor) {
3504           compressor.emplace(s->cct, plugin, filter);
3505           filter = &*compressor;
3506         }
3507       }
3508       op_ret = put_data_and_throttle(filter, data, ofs, false);
3509       if (op_ret < 0) {
3510         goto done;
3511       }
3512     }
3513
3514     ofs += len;
3515   } while (len > 0);
3516
3517   {
3518     bufferlist flush;
3519     op_ret = put_data_and_throttle(filter, flush, ofs, false);
3520     if (op_ret < 0) {
3521       goto done;
3522     }
3523   }
3524
3525   if (!chunked_upload && ofs != s->content_length) {
3526     op_ret = -ERR_REQUEST_TIMEOUT;
3527     goto done;
3528   }
3529   s->obj_size = ofs;
3530
3531   perfcounter->inc(l_rgw_put_b, s->obj_size);
3532
3533   op_ret = do_aws4_auth_completion();
3534   if (op_ret < 0) {
3535     goto done;
3536   }
3537
3538   op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
3539                               user_quota, bucket_quota, s->obj_size);
3540   if (op_ret < 0) {
3541     ldout(s->cct, 20) << "second check_quota() returned op_ret=" << op_ret << dendl;
3542     goto done;
3543   }
3544
3545   op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
3546   if (op_ret < 0) {
3547     ldout(s->cct, 20) << "check_bucket_shards() returned ret=" << op_ret << dendl;
3548     goto done;
3549   }
3550
3551   hash.Final(m);
3552
3553   if (compressor && compressor->is_compressed()) {
3554     bufferlist tmp;
3555     RGWCompressionInfo cs_info;
3556     cs_info.compression_type = plugin->get_type_name();
3557     cs_info.orig_size = s->obj_size;
3558     cs_info.blocks = move(compressor->get_compression_blocks());
3559     ::encode(cs_info, tmp);
3560     attrs[RGW_ATTR_COMPRESSION] = tmp;
3561     ldout(s->cct, 20) << "storing " << RGW_ATTR_COMPRESSION
3562         << " with type=" << cs_info.compression_type
3563         << ", orig_size=" << cs_info.orig_size
3564         << ", blocks=" << cs_info.blocks.size() << dendl;
3565   }
3566
3567   buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
3568
3569   etag = calc_md5;
3570
3571   if (supplied_md5_b64 && strcmp(calc_md5, supplied_md5)) {
3572     op_ret = -ERR_BAD_DIGEST;
3573     goto done;
3574   }
3575
3576   policy.encode(aclbl);
3577   emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
3578
3579   if (dlo_manifest) {
3580     op_ret = encode_dlo_manifest_attr(dlo_manifest, attrs);
3581     if (op_ret < 0) {
3582       ldout(s->cct, 0) << "bad user manifest: " << dlo_manifest << dendl;
3583       goto done;
3584     }
3585     complete_etag(hash, &etag);
3586     ldout(s->cct, 10) << __func__ << ": calculated md5 for user manifest: " << etag << dendl;
3587   }
3588
3589   if (slo_info) {
3590     bufferlist manifest_bl;
3591     ::encode(*slo_info, manifest_bl);
3592     emplace_attr(RGW_ATTR_SLO_MANIFEST, std::move(manifest_bl));
3593
3594     hash.Update((byte *)slo_info->raw_data, slo_info->raw_data_len);
3595     complete_etag(hash, &etag);
3596     ldout(s->cct, 10) << __func__ << ": calculated md5 for user manifest: " << etag << dendl;
3597   }
3598
3599   if (supplied_etag && etag.compare(supplied_etag) != 0) {
3600     op_ret = -ERR_UNPROCESSABLE_ENTITY;
3601     goto done;
3602   }
3603   bl.append(etag.c_str(), etag.size() + 1);
3604   emplace_attr(RGW_ATTR_ETAG, std::move(bl));
3605
3606   populate_with_generic_attrs(s, attrs);
3607   op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
3608   if (op_ret < 0) {
3609     goto done;
3610   }
3611   encode_delete_at_attr(delete_at, attrs);
3612   encode_obj_tags_attr(obj_tags.get(), attrs);
3613
3614   /* Add a custom metadata to expose the information whether an object
3615    * is an SLO or not. Appending the attribute must be performed AFTER
3616    * processing any input from user in order to prohibit overwriting. */
3617   if (slo_info) {
3618     bufferlist slo_userindicator_bl;
3619     slo_userindicator_bl.append("True", 4);
3620     emplace_attr(RGW_ATTR_SLO_UINDICATOR, std::move(slo_userindicator_bl));
3621   }
3622
3623   op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs,
3624                                (delete_at ? *delete_at : real_time()), if_match, if_nomatch,
3625                                (user_data.empty() ? nullptr : &user_data));
3626
3627   /* produce torrent */
3628   if (s->cct->_conf->rgw_torrent_flag && (ofs == torrent.get_data_len()))
3629   {
3630     torrent.init(s, store);
3631     torrent.set_create_date(mtime);
3632     op_ret =  torrent.complete();
3633     if (0 != op_ret)
3634     {
3635       ldout(s->cct, 0) << "ERROR: torrent.handle_data() returned " << op_ret << dendl;
3636       goto done;
3637     }
3638   }
3639
3640 done:
3641   dispose_processor(processor);
3642   perfcounter->tinc(l_rgw_put_lat,
3643                     (ceph_clock_now() - s->time));
3644 }
3645
3646 int RGWPostObj::verify_permission()
3647 {
3648   return 0;
3649 }
3650 /*
3651 RGWPutObjProcessor *RGWPostObj::select_processor(RGWObjectCtx& obj_ctx)
3652 {
3653   RGWPutObjProcessor *processor;
3654
3655   uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
3656
3657   processor = new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
3658
3659   return processor;
3660 }
3661
3662 void RGWPostObj::dispose_processor(RGWPutObjDataProcessor *processor)
3663 {
3664   delete processor;
3665 }
3666 */
3667 void RGWPostObj::pre_exec()
3668 {
3669   rgw_bucket_object_pre_exec(s);
3670 }
3671
3672 void RGWPostObj::execute()
3673 {
3674   RGWPutObjDataProcessor *filter = nullptr;
3675   boost::optional<RGWPutObj_Compress> compressor;
3676   CompressorRef plugin;
3677   char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
3678
3679   /* Read in the data from the POST form. */
3680   op_ret = get_params();
3681   if (op_ret < 0) {
3682     return;
3683   }
3684
3685   op_ret = verify_params();
3686   if (op_ret < 0) {
3687     return;
3688   }
3689
3690   if (s->iam_policy) {
3691     auto e = s->iam_policy->eval(s->env, *s->auth.identity,
3692                                  rgw::IAM::s3PutObject,
3693                                  rgw_obj(s->bucket, s->object));
3694     if (e == Effect::Deny) {
3695       op_ret = -EACCES;
3696       return;
3697     } else if (e == Effect::Pass && !verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
3698       op_ret = -EACCES;
3699       return;
3700     }
3701   } else if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
3702     op_ret = -EACCES;
3703     return;
3704   }
3705
3706   /* Start iteration over data fields. It's necessary as Swift's FormPost
3707    * is capable to handle multiple files in single form. */
3708   do {
3709     std::unique_ptr<RGWPutObjDataProcessor> encrypt;
3710     char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
3711     unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
3712     MD5 hash;
3713     ceph::buffer::list bl, aclbl;
3714     int len = 0;
3715
3716     op_ret = store->check_quota(s->bucket_owner.get_id(),
3717                                 s->bucket,
3718                                 user_quota,
3719                                 bucket_quota,
3720                                 s->content_length);
3721     if (op_ret < 0) {
3722       return;
3723     }
3724
3725     op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
3726     if (op_ret < 0) {
3727       return;
3728     }
3729
3730     if (supplied_md5_b64) {
3731       char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1];
3732       ldout(s->cct, 15) << "supplied_md5_b64=" << supplied_md5_b64 << dendl;
3733       op_ret = ceph_unarmor(supplied_md5_bin, &supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1],
3734                             supplied_md5_b64, supplied_md5_b64 + strlen(supplied_md5_b64));
3735       ldout(s->cct, 15) << "ceph_armor ret=" << op_ret << dendl;
3736       if (op_ret != CEPH_CRYPTO_MD5_DIGESTSIZE) {
3737         op_ret = -ERR_INVALID_DIGEST;
3738         return;
3739       }
3740
3741       buf_to_hex((const unsigned char *)supplied_md5_bin, CEPH_CRYPTO_MD5_DIGESTSIZE, supplied_md5);
3742       ldout(s->cct, 15) << "supplied_md5=" << supplied_md5 << dendl;
3743     }
3744
3745     RGWPutObjProcessor_Atomic processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
3746                                         s->bucket_info,
3747                                         s->bucket,
3748                                         get_current_filename(),
3749                                         /* part size */
3750                                         s->cct->_conf->rgw_obj_stripe_size,
3751                                         s->req_id,
3752                                         s->bucket_info.versioning_enabled());
3753     /* No filters by default. */
3754     filter = &processor;
3755
3756     op_ret = processor.prepare(store, nullptr);
3757     if (op_ret < 0) {
3758       return;
3759     }
3760
3761     op_ret = get_encrypt_filter(&encrypt, filter);
3762     if (op_ret < 0) {
3763       return;
3764     }
3765     if (encrypt != nullptr) {
3766       filter = encrypt.get();
3767     } else {
3768       const auto& compression_type = store->get_zone_params().get_compression_type(
3769           s->bucket_info.placement_rule);
3770       if (compression_type != "none") {
3771         plugin = Compressor::create(s->cct, compression_type);
3772         if (!plugin) {
3773           ldout(s->cct, 1) << "Cannot load plugin for compression type "
3774                            << compression_type << dendl;
3775         } else {
3776           compressor.emplace(s->cct, plugin, filter);
3777           filter = &*compressor;
3778         }
3779       }
3780     }
3781
3782     bool again;
3783     do {
3784       ceph::bufferlist data;
3785       len = get_data(data, again);
3786
3787       if (len < 0) {
3788         op_ret = len;
3789         return;
3790       }
3791
3792       if (!len) {
3793         break;
3794       }
3795
3796       hash.Update((const byte *)data.c_str(), data.length());
3797       op_ret = put_data_and_throttle(filter, data, ofs, false);
3798
3799       ofs += len;
3800
3801       if (ofs > max_len) {
3802         op_ret = -ERR_TOO_LARGE;
3803         return;
3804       }
3805     } while (again);
3806
3807     {
3808       bufferlist flush;
3809       op_ret = put_data_and_throttle(filter, flush, ofs, false);
3810     }
3811
3812     if (len < min_len) {
3813       op_ret = -ERR_TOO_SMALL;
3814       return;
3815     }
3816
3817     s->obj_size = ofs;
3818
3819     if (supplied_md5_b64 && strcmp(calc_md5, supplied_md5)) {
3820       op_ret = -ERR_BAD_DIGEST;
3821       return;
3822     }
3823
3824     op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
3825                                 user_quota, bucket_quota, s->obj_size);
3826     if (op_ret < 0) {
3827       return;
3828     }
3829
3830     op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
3831     if (op_ret < 0) {
3832       return;
3833     }
3834
3835     hash.Final(m);
3836     buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
3837
3838     etag = calc_md5;
3839     bl.append(etag.c_str(), etag.size() + 1);
3840     emplace_attr(RGW_ATTR_ETAG, std::move(bl));
3841
3842     policy.encode(aclbl);
3843     emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
3844
3845     const std::string content_type = get_current_content_type();
3846     if (! content_type.empty()) {
3847       ceph::bufferlist ct_bl;
3848       ct_bl.append(content_type.c_str(), content_type.size() + 1);
3849       emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
3850     }
3851
3852     if (compressor && compressor->is_compressed()) {
3853       ceph::bufferlist tmp;
3854       RGWCompressionInfo cs_info;
3855       cs_info.compression_type = plugin->get_type_name();
3856       cs_info.orig_size = s->obj_size;
3857       cs_info.blocks = move(compressor->get_compression_blocks());
3858       ::encode(cs_info, tmp);
3859       emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
3860     }
3861
3862     op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(),
3863                                 attrs, (delete_at ? *delete_at : real_time()));
3864   } while (is_next_file_to_upload());
3865 }
3866
3867
3868 void RGWPutMetadataAccount::filter_out_temp_url(map<string, bufferlist>& add_attrs,
3869                                                 const set<string>& rmattr_names,
3870                                                 map<int, string>& temp_url_keys)
3871 {
3872   map<string, bufferlist>::iterator iter;
3873
3874   iter = add_attrs.find(RGW_ATTR_TEMPURL_KEY1);
3875   if (iter != add_attrs.end()) {
3876     temp_url_keys[0] = iter->second.c_str();
3877     add_attrs.erase(iter);
3878   }
3879
3880   iter = add_attrs.find(RGW_ATTR_TEMPURL_KEY2);
3881   if (iter != add_attrs.end()) {
3882     temp_url_keys[1] = iter->second.c_str();
3883     add_attrs.erase(iter);
3884   }
3885
3886   for (const string& name : rmattr_names) {
3887     if (name.compare(RGW_ATTR_TEMPURL_KEY1) == 0) {
3888       temp_url_keys[0] = string();
3889     }
3890     if (name.compare(RGW_ATTR_TEMPURL_KEY2) == 0) {
3891       temp_url_keys[1] = string();
3892     }
3893   }
3894 }
3895
3896 int RGWPutMetadataAccount::init_processing()
3897 {
3898   /* First, go to the base class. At the time of writing the method was
3899    * responsible only for initializing the quota. This isn't necessary
3900    * here as we are touching metadata only. I'm putting this call only
3901    * for the future. */
3902   op_ret = RGWOp::init_processing();
3903   if (op_ret < 0) {
3904     return op_ret;
3905   }
3906
3907   op_ret = get_params();
3908   if (op_ret < 0) {
3909     return op_ret;
3910   }
3911
3912   op_ret = rgw_get_user_attrs_by_uid(store, s->user->user_id, orig_attrs,
3913                                      &acct_op_tracker);
3914   if (op_ret < 0) {
3915     return op_ret;
3916   }
3917
3918   if (has_policy) {
3919     bufferlist acl_bl;
3920     policy.encode(acl_bl);
3921     attrs.emplace(RGW_ATTR_ACL, std::move(acl_bl));
3922   }
3923
3924   op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
3925   if (op_ret < 0) {
3926     return op_ret;
3927   }
3928   prepare_add_del_attrs(orig_attrs, rmattr_names, attrs);
3929   populate_with_generic_attrs(s, attrs);
3930
3931   /* Try extract the TempURL-related stuff now to allow verify_permission
3932    * evaluate whether we need FULL_CONTROL or not. */
3933   filter_out_temp_url(attrs, rmattr_names, temp_url_keys);
3934
3935   /* The same with quota except a client needs to be reseller admin. */
3936   op_ret = filter_out_quota_info(attrs, rmattr_names, new_quota,
3937                                  &new_quota_extracted);
3938   if (op_ret < 0) {
3939     return op_ret;
3940   }
3941
3942   return 0;
3943 }
3944
3945 int RGWPutMetadataAccount::verify_permission()
3946 {
3947   if (s->auth.identity->is_anonymous()) {
3948     return -EACCES;
3949   }
3950
3951   if (!verify_user_permission(s, RGW_PERM_WRITE)) {
3952     return -EACCES;
3953   }
3954
3955   /* Altering TempURL keys requires FULL_CONTROL. */
3956   if (!temp_url_keys.empty() && s->perm_mask != RGW_PERM_FULL_CONTROL) {
3957     return -EPERM;
3958   }
3959
3960   /* We are failing this intensionally to allow system user/reseller admin
3961    * override in rgw_process.cc. This is the way to specify a given RGWOp
3962    * expect extra privileges.  */
3963   if (new_quota_extracted) {
3964     return -EACCES;
3965   }
3966
3967   return 0;
3968 }
3969
3970 void RGWPutMetadataAccount::execute()
3971 {
3972   /* Params have been extracted earlier. See init_processing(). */
3973   RGWUserInfo new_uinfo;
3974   op_ret = rgw_get_user_info_by_uid(store, s->user->user_id, new_uinfo,
3975                                     &acct_op_tracker);
3976   if (op_ret < 0) {
3977     return;
3978   }
3979
3980   /* Handle the TempURL-related stuff. */
3981   if (!temp_url_keys.empty()) {
3982     for (auto& pair : temp_url_keys) {
3983       new_uinfo.temp_url_keys[pair.first] = std::move(pair.second);
3984     }
3985   }
3986
3987   /* Handle the quota extracted at the verify_permission step. */
3988   if (new_quota_extracted) {
3989     new_uinfo.user_quota = std::move(new_quota);
3990   }
3991
3992   /* We are passing here the current (old) user info to allow the function
3993    * optimize-out some operations. */
3994   op_ret = rgw_store_user_info(store, new_uinfo, s->user,
3995                                &acct_op_tracker, real_time(), false, &attrs);
3996 }
3997
3998 int RGWPutMetadataBucket::verify_permission()
3999 {
4000   if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
4001     return -EACCES;
4002   }
4003
4004   return 0;
4005 }
4006
4007 void RGWPutMetadataBucket::pre_exec()
4008 {
4009   rgw_bucket_object_pre_exec(s);
4010 }
4011
4012 void RGWPutMetadataBucket::execute()
4013 {
4014   op_ret = get_params();
4015   if (op_ret < 0) {
4016     return;
4017   }
4018
4019   op_ret = rgw_get_request_metadata(s->cct, s->info, attrs, false);
4020   if (op_ret < 0) {
4021     return;
4022   }
4023
4024   if (!placement_rule.empty() &&
4025       placement_rule != s->bucket_info.placement_rule) {
4026     op_ret = -EEXIST;
4027     return;
4028   }
4029
4030   /* Encode special metadata first as we're using std::map::emplace under
4031    * the hood. This method will add the new items only if the map doesn't
4032    * contain such keys yet. */
4033   if (has_policy) {
4034     if (s->dialect.compare("swift") == 0) {
4035         auto old_policy = \
4036           static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl.get());
4037         auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
4038         new_policy->filter_merge(policy_rw_mask, old_policy);
4039         policy = *new_policy;
4040     }
4041     buffer::list bl;
4042     policy.encode(bl);
4043     emplace_attr(RGW_ATTR_ACL, std::move(bl));
4044   }
4045
4046   if (has_cors) {
4047     buffer::list bl;
4048     cors_config.encode(bl);
4049     emplace_attr(RGW_ATTR_CORS, std::move(bl));
4050   }
4051
4052   /* It's supposed that following functions WILL NOT change any special
4053    * attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
4054   prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
4055   populate_with_generic_attrs(s, attrs);
4056
4057   /* According  to the Swift's behaviour and its container_quota WSGI middleware
4058    * implementation: anyone with write permissions is able to set the bucket
4059    * quota. This stays in contrast to account quotas that can be set only by
4060    * clients holding reseller admin privileges. */
4061   op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
4062   if (op_ret < 0) {
4063     return;
4064   }
4065
4066   if (swift_ver_location) {
4067     s->bucket_info.swift_ver_location = *swift_ver_location;
4068     s->bucket_info.swift_versioning = (! swift_ver_location->empty());
4069   }
4070
4071   /* Web site of Swift API. */
4072   filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
4073   s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
4074
4075   /* Setting attributes also stores the provided bucket info. Due to this
4076    * fact, the new quota settings can be serialized with the same call. */
4077   op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
4078                                 &s->bucket_info.objv_tracker);
4079 }
4080
4081 int RGWPutMetadataObject::verify_permission()
4082 {
4083   // This looks to be something specific to Swift. We could add
4084   // operations like swift:PutMetadataObject to the Policy Engine.
4085   if (!verify_object_permission_no_policy(s, RGW_PERM_WRITE)) {
4086     return -EACCES;
4087   }
4088
4089   return 0;
4090 }
4091
4092 void RGWPutMetadataObject::pre_exec()
4093 {
4094   rgw_bucket_object_pre_exec(s);
4095 }
4096
4097 void RGWPutMetadataObject::execute()
4098 {
4099   rgw_obj obj(s->bucket, s->object);
4100   map<string, bufferlist> attrs, orig_attrs, rmattrs;
4101
4102   store->set_atomic(s->obj_ctx, obj);
4103
4104   op_ret = get_params();
4105   if (op_ret < 0) {
4106     return;
4107   }
4108
4109   op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
4110   if (op_ret < 0) {
4111     return;
4112   }
4113
4114   /* check if obj exists, read orig attrs */
4115   op_ret = get_obj_attrs(store, s, obj, orig_attrs);
4116   if (op_ret < 0) {
4117     return;
4118   }
4119
4120   /* Check whether the object has expired. Swift API documentation
4121    * stands that we should return 404 Not Found in such case. */
4122   if (need_object_expiration() && object_is_expired(orig_attrs)) {
4123     op_ret = -ENOENT;
4124     return;
4125   }
4126
4127   /* Filter currently existing attributes. */
4128   prepare_add_del_attrs(orig_attrs, attrs, rmattrs);
4129   populate_with_generic_attrs(s, attrs);
4130   encode_delete_at_attr(delete_at, attrs);
4131
4132   if (dlo_manifest) {
4133     op_ret = encode_dlo_manifest_attr(dlo_manifest, attrs);
4134     if (op_ret < 0) {
4135       ldout(s->cct, 0) << "bad user manifest: " << dlo_manifest << dendl;
4136       return;
4137     }
4138   }
4139
4140   op_ret = store->set_attrs(s->obj_ctx, s->bucket_info, obj, attrs, &rmattrs);
4141 }
4142
4143 int RGWDeleteObj::handle_slo_manifest(bufferlist& bl)
4144 {
4145   RGWSLOInfo slo_info;
4146   bufferlist::iterator bliter = bl.begin();
4147   try {
4148     ::decode(slo_info, bliter);
4149   } catch (buffer::error& err) {
4150     ldout(s->cct, 0) << "ERROR: failed to decode slo manifest" << dendl;
4151     return -EIO;
4152   }
4153
4154   try {
4155     deleter = std::unique_ptr<RGWBulkDelete::Deleter>(\
4156           new RGWBulkDelete::Deleter(store, s));
4157   } catch (std::bad_alloc) {
4158     return -ENOMEM;
4159   }
4160
4161   list<RGWBulkDelete::acct_path_t> items;
4162   for (const auto& iter : slo_info.entries) {
4163     const string& path_str = iter.path;
4164
4165     const size_t sep_pos = path_str.find('/', 1 /* skip first slash */);
4166     if (boost::string_view::npos == sep_pos) {
4167       return -EINVAL;
4168     }
4169
4170     RGWBulkDelete::acct_path_t path;
4171
4172     path.bucket_name = url_decode(path_str.substr(1, sep_pos - 1));
4173     path.obj_key = url_decode(path_str.substr(sep_pos + 1));
4174
4175     items.push_back(path);
4176   }
4177
4178   /* Request removal of the manifest object itself. */
4179   RGWBulkDelete::acct_path_t path;
4180   path.bucket_name = s->bucket_name;
4181   path.obj_key = s->object;
4182   items.push_back(path);
4183
4184   int ret = deleter->delete_chunk(items);
4185   if (ret < 0) {
4186     return ret;
4187   }
4188
4189   return 0;
4190 }
4191
4192 int RGWDeleteObj::verify_permission()
4193 {
4194   if (s->iam_policy) {
4195     auto r = s->iam_policy->eval(s->env, *s->auth.identity,
4196                                  s->object.instance.empty() ?
4197                                  rgw::IAM::s3DeleteObject :
4198                                  rgw::IAM::s3DeleteObjectVersion,
4199                                  ARN(s->bucket, s->object.name));
4200     if (r == Effect::Allow)
4201       return true;
4202     else if (r == Effect::Deny)
4203       return false;
4204   }
4205
4206   if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
4207     return -EACCES;
4208   }
4209
4210   return 0;
4211 }
4212
4213 void RGWDeleteObj::pre_exec()
4214 {
4215   rgw_bucket_object_pre_exec(s);
4216 }
4217
4218 void RGWDeleteObj::execute()
4219 {
4220   if (!s->bucket_exists) {
4221     op_ret = -ERR_NO_SUCH_BUCKET;
4222     return;
4223   }
4224
4225   op_ret = get_params();
4226   if (op_ret < 0) {
4227     return;
4228   }
4229
4230   rgw_obj obj(s->bucket, s->object);
4231   map<string, bufferlist> attrs;
4232
4233
4234   if (!s->object.empty()) {
4235     if (need_object_expiration() || multipart_delete) {
4236       /* check if obj exists, read orig attrs */
4237       op_ret = get_obj_attrs(store, s, obj, attrs);
4238       if (op_ret < 0) {
4239         return;
4240       }
4241     }
4242
4243     if (multipart_delete) {
4244       const auto slo_attr = attrs.find(RGW_ATTR_SLO_MANIFEST);
4245
4246       if (slo_attr != attrs.end()) {
4247         op_ret = handle_slo_manifest(slo_attr->second);
4248         if (op_ret < 0) {
4249           ldout(s->cct, 0) << "ERROR: failed to handle slo manifest ret=" << op_ret << dendl;
4250         }
4251       } else {
4252         op_ret = -ERR_NOT_SLO_MANIFEST;
4253       }
4254
4255       return;
4256     }
4257
4258     RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
4259     obj_ctx->obj.set_atomic(obj);
4260
4261     bool ver_restored = false;
4262     op_ret = store->swift_versioning_restore(*obj_ctx, s->bucket_owner.get_id(),
4263                                              s->bucket_info, obj, ver_restored);
4264     if (op_ret < 0) {
4265       return;
4266     }
4267
4268     if (!ver_restored) {
4269       /* Swift's versioning mechanism hasn't found any previous version of
4270        * the object that could be restored. This means we should proceed
4271        * with the regular delete path. */
4272       RGWRados::Object del_target(store, s->bucket_info, *obj_ctx, obj);
4273       RGWRados::Object::Delete del_op(&del_target);
4274
4275       op_ret = get_system_versioning_params(s, &del_op.params.olh_epoch,
4276                                             &del_op.params.marker_version_id);
4277       if (op_ret < 0) {
4278         return;
4279       }
4280
4281       del_op.params.bucket_owner = s->bucket_owner.get_id();
4282       del_op.params.versioning_status = s->bucket_info.versioning_status();
4283       del_op.params.obj_owner = s->owner;
4284       del_op.params.unmod_since = unmod_since;
4285       del_op.params.high_precision_time = s->system_request; /* system request uses high precision time */
4286
4287       op_ret = del_op.delete_obj();
4288       if (op_ret >= 0) {
4289         delete_marker = del_op.result.delete_marker;
4290         version_id = del_op.result.version_id;
4291       }
4292
4293       /* Check whether the object has expired. Swift API documentation
4294        * stands that we should return 404 Not Found in such case. */
4295       if (need_object_expiration() && object_is_expired(attrs)) {
4296         op_ret = -ENOENT;
4297         return;
4298       }
4299     }
4300
4301     if (op_ret == -ERR_PRECONDITION_FAILED && no_precondition_error) {
4302       op_ret = 0;
4303     }
4304   } else {
4305     op_ret = -EINVAL;
4306   }
4307 }
4308
4309
4310 bool RGWCopyObj::parse_copy_location(const string& url_src, string& bucket_name, rgw_obj_key& key)
4311 {
4312   string name_str;
4313   string params_str;
4314
4315   size_t pos = url_src.find('?');
4316   if (pos == string::npos) {
4317     name_str = url_src;
4318   } else {
4319     name_str = url_src.substr(0, pos);
4320     params_str = url_src.substr(pos + 1);
4321   }
4322
4323   std::string dec_src = url_decode(name_str);
4324   const char *src = dec_src.c_str();
4325
4326   if (*src == '/') ++src;
4327
4328   string str(src);
4329
4330   pos = str.find('/');
4331   if (pos ==string::npos)
4332     return false;
4333
4334   bucket_name = str.substr(0, pos);
4335   key.name = str.substr(pos + 1);
4336
4337   if (key.name.empty()) {
4338     return false;
4339   }
4340
4341   if (!params_str.empty()) {
4342     RGWHTTPArgs args;
4343     args.set(params_str);
4344     args.parse();
4345
4346     key.instance = args.get("versionId", NULL);
4347   }
4348
4349   return true;
4350 }
4351
4352 int RGWCopyObj::verify_permission()
4353 {
4354   RGWAccessControlPolicy src_acl(s->cct);
4355   optional<Policy> src_policy;
4356   op_ret = get_params();
4357   if (op_ret < 0)
4358     return op_ret;
4359
4360   op_ret = get_system_versioning_params(s, &olh_epoch, &version_id);
4361   if (op_ret < 0) {
4362     return op_ret;
4363   }
4364   map<string, bufferlist> src_attrs;
4365
4366   RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
4367
4368   if (s->bucket_instance_id.empty()) {
4369     op_ret = store->get_bucket_info(obj_ctx, src_tenant_name, src_bucket_name, src_bucket_info, NULL, &src_attrs);
4370   } else {
4371     /* will only happen in intra region sync where the source and dest bucket is the same */
4372     op_ret = store->get_bucket_instance_info(obj_ctx, s->bucket_instance_id, src_bucket_info, NULL, &src_attrs);
4373   }
4374   if (op_ret < 0) {
4375     if (op_ret == -ENOENT) {
4376       op_ret = -ERR_NO_SUCH_BUCKET;
4377     }
4378     return op_ret;
4379   }
4380
4381   src_bucket = src_bucket_info.bucket;
4382
4383   /* get buckets info (source and dest) */
4384   if (s->local_source &&  source_zone.empty()) {
4385     rgw_obj src_obj(src_bucket, src_object);
4386     store->set_atomic(s->obj_ctx, src_obj);
4387     store->set_prefetch_data(s->obj_ctx, src_obj);
4388
4389     /* check source object permissions */
4390     op_ret = read_obj_policy(store, s, src_bucket_info, src_attrs, &src_acl,
4391                              src_policy, src_bucket, src_object);
4392     if (op_ret < 0) {
4393       return op_ret;
4394     }
4395
4396     /* admin request overrides permission checks */
4397     if (!s->auth.identity->is_admin_of(src_acl.get_owner().get_id())) {
4398       if (src_policy) {
4399         auto e = src_policy->eval(s->env, *s->auth.identity,
4400                                   src_object.instance.empty() ?
4401                                   rgw::IAM::s3GetObject :
4402                                   rgw::IAM::s3GetObjectVersion,
4403                                   ARN(src_obj));
4404         if (e == Effect::Deny) {
4405           return -EACCES;
4406         } else if (e == Effect::Pass &&
4407                    !src_acl.verify_permission(*s->auth.identity, s->perm_mask,
4408                                               RGW_PERM_READ)) { 
4409           return -EACCES;
4410         }
4411       } else if (!src_acl.verify_permission(*s->auth.identity,
4412                                                s->perm_mask,
4413                                             RGW_PERM_READ)) {
4414         return -EACCES;
4415       }
4416     }
4417   }
4418
4419   RGWAccessControlPolicy dest_bucket_policy(s->cct);
4420   map<string, bufferlist> dest_attrs;
4421
4422   if (src_bucket_name.compare(dest_bucket_name) == 0) { /* will only happen if s->local_source
4423                                                            or intra region sync */
4424     dest_bucket_info = src_bucket_info;
4425     dest_attrs = src_attrs;
4426   } else {
4427     op_ret = store->get_bucket_info(obj_ctx, dest_tenant_name, dest_bucket_name,
4428                                     dest_bucket_info, nullptr, &dest_attrs);
4429     if (op_ret < 0) {
4430       if (op_ret == -ENOENT) {
4431         op_ret = -ERR_NO_SUCH_BUCKET;
4432       }
4433       return op_ret;
4434     }
4435   }
4436
4437   dest_bucket = dest_bucket_info.bucket;
4438
4439   rgw_obj dest_obj(dest_bucket, dest_object);
4440   store->set_atomic(s->obj_ctx, dest_obj);
4441
4442   /* check dest bucket permissions */
4443   op_ret = read_bucket_policy(store, s, dest_bucket_info, dest_attrs,
4444                               &dest_bucket_policy, dest_bucket);
4445   if (op_ret < 0) {
4446     return op_ret;
4447   }
4448
4449   /* admin request overrides permission checks */
4450   if (! s->auth.identity->is_admin_of(dest_policy.get_owner().get_id()) &&
4451       ! dest_bucket_policy.verify_permission(*s->auth.identity, s->perm_mask,
4452                                              RGW_PERM_WRITE)) {
4453     return -EACCES;
4454   }
4455
4456   op_ret = init_dest_policy();
4457   if (op_ret < 0) {
4458     return op_ret;
4459   }
4460
4461   return 0;
4462 }
4463
4464
4465 int RGWCopyObj::init_common()
4466 {
4467   if (if_mod) {
4468     if (parse_time(if_mod, &mod_time) < 0) {
4469       op_ret = -EINVAL;
4470       return op_ret;
4471     }
4472     mod_ptr = &mod_time;
4473   }
4474
4475   if (if_unmod) {
4476     if (parse_time(if_unmod, &unmod_time) < 0) {
4477       op_ret = -EINVAL;
4478       return op_ret;
4479     }
4480     unmod_ptr = &unmod_time;
4481   }
4482
4483   bufferlist aclbl;
4484   dest_policy.encode(aclbl);
4485   emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
4486
4487   op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
4488   if (op_ret < 0) {
4489     return op_ret;
4490   }
4491   populate_with_generic_attrs(s, attrs);
4492
4493   return 0;
4494 }
4495
4496 static void copy_obj_progress_cb(off_t ofs, void *param)
4497 {
4498   RGWCopyObj *op = static_cast<RGWCopyObj *>(param);
4499   op->progress_cb(ofs);
4500 }
4501
4502 void RGWCopyObj::progress_cb(off_t ofs)
4503 {
4504   if (!s->cct->_conf->rgw_copy_obj_progress)
4505     return;
4506
4507   if (ofs - last_ofs < s->cct->_conf->rgw_copy_obj_progress_every_bytes)
4508     return;
4509
4510   send_partial_response(ofs);
4511
4512   last_ofs = ofs;
4513 }
4514
4515 void RGWCopyObj::pre_exec()
4516 {
4517   rgw_bucket_object_pre_exec(s);
4518 }
4519
4520 void RGWCopyObj::execute()
4521 {
4522   if (init_common() < 0)
4523     return;
4524
4525   rgw_obj src_obj(src_bucket, src_object);
4526   rgw_obj dst_obj(dest_bucket, dest_object);
4527
4528   RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
4529   obj_ctx.obj.set_atomic(src_obj);
4530   obj_ctx.obj.set_atomic(dst_obj);
4531
4532   encode_delete_at_attr(delete_at, attrs);
4533
4534   bool high_precision_time = (s->system_request);
4535
4536   /* Handle object versioning of Swift API. In case of copying to remote this
4537    * should fail gently (op_ret == 0) as the dst_obj will not exist here. */
4538   op_ret = store->swift_versioning_copy(obj_ctx,
4539                                         dest_bucket_info.owner,
4540                                         dest_bucket_info,
4541                                         dst_obj);
4542   if (op_ret < 0) {
4543     return;
4544   }
4545
4546   op_ret = store->copy_obj(obj_ctx,
4547                            s->user->user_id,
4548                            client_id,
4549                            op_id,
4550                            &s->info,
4551                            source_zone,
4552                            dst_obj,
4553                            src_obj,
4554                            dest_bucket_info,
4555                            src_bucket_info,
4556                            &src_mtime,
4557                            &mtime,
4558                            mod_ptr,
4559                            unmod_ptr,
4560                            high_precision_time,
4561                            if_match,
4562                            if_nomatch,
4563                            attrs_mod,
4564                            copy_if_newer,
4565                            attrs, RGW_OBJ_CATEGORY_MAIN,
4566                            olh_epoch,
4567                            (delete_at ? *delete_at : real_time()),
4568                            (version_id.empty() ? NULL : &version_id),
4569                            &s->req_id, /* use req_id as tag */
4570                            &etag,
4571                            copy_obj_progress_cb, (void *)this
4572     );
4573 }
4574
4575 int RGWGetACLs::verify_permission()
4576 {
4577   bool perm;
4578   if (!s->object.empty()) {
4579     perm = verify_object_permission(s,
4580                                     s->object.instance.empty() ?
4581                                     rgw::IAM::s3GetObjectAcl :
4582                                     rgw::IAM::s3GetObjectVersionAcl);
4583   } else {
4584     perm = verify_bucket_permission(s, rgw::IAM::s3GetBucketAcl);
4585   }
4586   if (!perm)
4587     return -EACCES;
4588
4589   return 0;
4590 }
4591
4592 void RGWGetACLs::pre_exec()
4593 {
4594   rgw_bucket_object_pre_exec(s);
4595 }
4596
4597 void RGWGetACLs::execute()
4598 {
4599   stringstream ss;
4600   RGWAccessControlPolicy* const acl = \
4601     (!s->object.empty() ? s->object_acl.get() : s->bucket_acl.get());
4602   RGWAccessControlPolicy_S3* const s3policy = \
4603     static_cast<RGWAccessControlPolicy_S3*>(acl);
4604   s3policy->to_xml(ss);
4605   acls = ss.str();
4606 }
4607
4608
4609
4610 int RGWPutACLs::verify_permission()
4611 {
4612   bool perm;
4613   if (!s->object.empty()) {
4614     perm = verify_object_permission(s,
4615                                     s->object.instance.empty() ?
4616                                     rgw::IAM::s3PutObjectAcl :
4617                                     rgw::IAM::s3PutObjectVersionAcl);
4618   } else {
4619     perm = verify_bucket_permission(s, rgw::IAM::s3PutBucketAcl);
4620   }
4621   if (!perm)
4622     return -EACCES;
4623
4624   return 0;
4625 }
4626
4627 int RGWGetLC::verify_permission()
4628 {
4629   bool perm;
4630   perm = verify_bucket_permission(s, rgw::IAM::s3GetLifecycleConfiguration);
4631   if (!perm)
4632     return -EACCES;
4633
4634   return 0;
4635 }
4636
4637 int RGWPutLC::verify_permission()
4638 {
4639   bool perm;
4640   perm = verify_bucket_permission(s, rgw::IAM::s3PutLifecycleConfiguration);
4641   if (!perm)
4642     return -EACCES;
4643
4644   return 0;
4645 }
4646
4647 int RGWDeleteLC::verify_permission()
4648 {
4649   bool perm;
4650   perm = verify_bucket_permission(s, rgw::IAM::s3PutLifecycleConfiguration);
4651   if (!perm)
4652     return -EACCES;
4653
4654   return 0;
4655 }
4656
4657 void RGWPutACLs::pre_exec()
4658 {
4659   rgw_bucket_object_pre_exec(s);
4660 }
4661
4662 void RGWGetLC::pre_exec()
4663 {
4664   rgw_bucket_object_pre_exec(s);
4665 }
4666
4667 void RGWPutLC::pre_exec()
4668 {
4669   rgw_bucket_object_pre_exec(s);
4670 }
4671
4672 void RGWDeleteLC::pre_exec()
4673 {
4674   rgw_bucket_object_pre_exec(s);
4675 }
4676
4677 void RGWPutACLs::execute()
4678 {
4679   bufferlist bl;
4680
4681   RGWAccessControlPolicy_S3 *policy = NULL;
4682   RGWACLXMLParser_S3 parser(s->cct);
4683   RGWAccessControlPolicy_S3 new_policy(s->cct);
4684   stringstream ss;
4685   char *new_data = NULL;
4686   rgw_obj obj;
4687
4688   op_ret = 0; /* XXX redundant? */
4689
4690   if (!parser.init()) {
4691     op_ret = -EINVAL;
4692     return;
4693   }
4694
4695
4696   RGWAccessControlPolicy* const existing_policy = \
4697     (s->object.empty() ? s->bucket_acl.get() : s->object_acl.get());
4698
4699   owner = existing_policy->get_owner();
4700
4701   op_ret = get_params();
4702   if (op_ret < 0) {
4703     if (op_ret == -ERANGE) {
4704       ldout(s->cct, 4) << "The size of request xml data is larger than the max limitation, data size = "
4705                        << s->length << dendl;
4706       op_ret = -ERR_MALFORMED_XML;
4707       s->err.message = "The XML you provided was larger than the maximum " +
4708                        std::to_string(s->cct->_conf->rgw_max_put_param_size) +
4709                        " bytes allowed.";
4710     }
4711     return;
4712   }
4713
4714   ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? data : "") << dendl;
4715
4716   if (!s->canned_acl.empty() && len) {
4717     op_ret = -EINVAL;
4718     return;
4719   }
4720
4721   if (!s->canned_acl.empty() || s->has_acl_header) {
4722     op_ret = get_policy_from_state(store, s, ss);
4723     if (op_ret < 0)
4724       return;
4725
4726     new_data = strdup(ss.str().c_str());
4727     free(data);
4728     data = new_data;
4729     len = ss.str().size();
4730   }
4731
4732   if (!parser.parse(data, len, 1)) {
4733     op_ret = -EINVAL;
4734     return;
4735   }
4736   policy = static_cast<RGWAccessControlPolicy_S3 *>(parser.find_first("AccessControlPolicy"));
4737   if (!policy) {
4738     op_ret = -EINVAL;
4739     return;
4740   }
4741
4742   const RGWAccessControlList& req_acl = policy->get_acl();
4743   const multimap<string, ACLGrant>& req_grant_map = req_acl.get_grant_map();
4744 #define ACL_GRANTS_MAX_NUM      100
4745   int max_num = s->cct->_conf->rgw_acl_grants_max_num;
4746   if (max_num < 0) {
4747     max_num = ACL_GRANTS_MAX_NUM;
4748   }
4749
4750   int grants_num = req_grant_map.size();
4751   if (grants_num > max_num) {
4752     ldout(s->cct, 4) << "An acl can have up to "
4753                      << max_num
4754                      << " grants, request acl grants num: "
4755                      << grants_num << dendl;
4756     op_ret = -ERR_MALFORMED_ACL_ERROR;
4757     s->err.message = "The request is rejected, because the acl grants number you requested is larger than the maximum "
4758                      + std::to_string(max_num)
4759                      + " grants allowed in an acl.";
4760     return;
4761   }
4762
4763   // forward bucket acl requests to meta master zone
4764   if (s->object.empty() && !store->is_meta_master()) {
4765     bufferlist in_data;
4766     // include acl data unless it was generated from a canned_acl
4767     if (s->canned_acl.empty()) {
4768       in_data.append(data, len);
4769     }
4770     op_ret = forward_request_to_master(s, NULL, store, in_data, NULL);
4771     if (op_ret < 0) {
4772       ldout(s->cct, 20) << __func__ << "forward_request_to_master returned ret=" << op_ret << dendl;
4773       return;
4774     }
4775   }
4776
4777   if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
4778     ldout(s->cct, 15) << "Old AccessControlPolicy";
4779     policy->to_xml(*_dout);
4780     *_dout << dendl;
4781   }
4782
4783   op_ret = policy->rebuild(store, &owner, new_policy);
4784   if (op_ret < 0)
4785     return;
4786
4787   if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
4788     ldout(s->cct, 15) << "New AccessControlPolicy:";
4789     new_policy.to_xml(*_dout);
4790     *_dout << dendl;
4791   }
4792
4793   new_policy.encode(bl);
4794   map<string, bufferlist> attrs;
4795
4796   if (!s->object.empty()) {
4797     obj = rgw_obj(s->bucket, s->object);
4798     store->set_atomic(s->obj_ctx, obj);
4799     //if instance is empty, we should modify the latest object
4800     op_ret = modify_obj_attr(store, s, obj, RGW_ATTR_ACL, bl);
4801   } else {
4802     attrs = s->bucket_attrs;
4803     attrs[RGW_ATTR_ACL] = bl;
4804     op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
4805   }
4806   if (op_ret == -ECANCELED) {
4807     op_ret = 0; /* lost a race, but it's ok because acls are immutable */
4808   }
4809 }
4810
4811 static void get_lc_oid(struct req_state *s, string& oid)
4812 {
4813   string shard_id = s->bucket.name + ':' +s->bucket.bucket_id;
4814   int max_objs = (s->cct->_conf->rgw_lc_max_objs > HASH_PRIME)?HASH_PRIME:s->cct->_conf->rgw_lc_max_objs;
4815   int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs;
4816   oid = lc_oid_prefix;
4817   char buf[32];
4818   snprintf(buf, 32, ".%d", index);
4819   oid.append(buf);
4820   return;
4821 }
4822
4823 void RGWPutLC::execute()
4824 {
4825   bufferlist bl;
4826   
4827   RGWLifecycleConfiguration_S3 *config = NULL;
4828   RGWLCXMLParser_S3 parser(s->cct);
4829   RGWLifecycleConfiguration_S3 new_config(s->cct);
4830
4831   if (!parser.init()) {
4832     op_ret = -EINVAL;
4833     return;
4834   }
4835
4836   op_ret = get_params();
4837   if (op_ret < 0)
4838     return;
4839
4840   ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? data : "") << dendl;
4841
4842   if (!parser.parse(data, len, 1)) {
4843     op_ret = -ERR_MALFORMED_XML;
4844     return;
4845   }
4846   config = static_cast<RGWLifecycleConfiguration_S3 *>(parser.find_first("LifecycleConfiguration"));
4847   if (!config) {
4848     op_ret = -ERR_MALFORMED_XML;
4849     return;
4850   }
4851
4852   if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
4853     ldout(s->cct, 15) << "Old LifecycleConfiguration:";
4854     config->to_xml(*_dout);
4855     *_dout << dendl;
4856   }
4857
4858   op_ret = config->rebuild(store, new_config);
4859   if (op_ret < 0)
4860     return;
4861
4862   if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
4863     ldout(s->cct, 15) << "New LifecycleConfiguration:";
4864     new_config.to_xml(*_dout);
4865     *_dout << dendl;
4866   }
4867   
4868   new_config.encode(bl);
4869   map<string, bufferlist> attrs;
4870   attrs = s->bucket_attrs;
4871   attrs[RGW_ATTR_LC] = bl;
4872   op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
4873   if (op_ret < 0)
4874     return;
4875   string shard_id = s->bucket.tenant + ':' + s->bucket.name + ':' + s->bucket.bucket_id;  
4876   string oid; 
4877   get_lc_oid(s, oid);
4878   pair<string, int> entry(shard_id, lc_uninitial);
4879   int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time;
4880   rados::cls::lock::Lock l(lc_index_lock_name); 
4881   utime_t time(max_lock_secs, 0);
4882   l.set_duration(time);
4883   l.set_cookie(cookie);
4884   librados::IoCtx *ctx = store->get_lc_pool_ctx();
4885   do {
4886     op_ret = l.lock_exclusive(ctx, oid);
4887     if (op_ret == -EBUSY) {
4888       dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid << dendl;
4889       sleep(5);
4890       continue;
4891     }
4892     if (op_ret < 0) {
4893       dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid << op_ret << dendl;
4894       break;
4895     }
4896     op_ret = cls_rgw_lc_set_entry(*ctx, oid, entry);
4897     if (op_ret < 0) {
4898       dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid << op_ret << dendl;     
4899     }
4900     break;
4901   }while(1);
4902   l.unlock(ctx, oid);
4903   return;
4904 }
4905
4906 void RGWDeleteLC::execute()
4907 {
4908   bufferlist bl;
4909   map<string, bufferlist> orig_attrs, attrs;
4910   map<string, bufferlist>::iterator iter;
4911   rgw_raw_obj obj;
4912   store->get_bucket_instance_obj(s->bucket, obj);
4913   store->set_prefetch_data(s->obj_ctx, obj);
4914   op_ret = get_system_obj_attrs(store, s, obj, orig_attrs, NULL, &s->bucket_info.objv_tracker);
4915   if (op_ret < 0)
4916     return;
4917     
4918   for (iter = orig_attrs.begin(); iter != orig_attrs.end(); ++iter) {
4919       const string& name = iter->first;
4920       dout(10) << "DeleteLC : attr: " << name << dendl;
4921       if (name.compare(0, (sizeof(RGW_ATTR_LC) - 1), RGW_ATTR_LC) != 0) {
4922         if (attrs.find(name) == attrs.end()) {
4923         attrs[name] = iter->second;
4924         }
4925       }
4926     }
4927   op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
4928   string shard_id = s->bucket.name + ':' +s->bucket.bucket_id;
4929   pair<string, int> entry(shard_id, lc_uninitial);
4930   string oid; 
4931   get_lc_oid(s, oid);
4932   int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time;
4933   librados::IoCtx *ctx = store->get_lc_pool_ctx();
4934   rados::cls::lock::Lock l(lc_index_lock_name);
4935   utime_t time(max_lock_secs, 0);
4936   l.set_duration(time);
4937   do {
4938     op_ret = l.lock_exclusive(ctx, oid);
4939     if (op_ret == -EBUSY) {
4940       dout(0) << "RGWLC::RGWDeleteLC() failed to acquire lock on, sleep 5, try again" << oid << dendl;
4941       sleep(5);
4942       continue;
4943     }
4944     if (op_ret < 0) {
4945       dout(0) << "RGWLC::RGWDeleteLC() failed to acquire lock " << oid << op_ret << dendl;
4946       break;
4947     }
4948     op_ret = cls_rgw_lc_rm_entry(*ctx, oid, entry);
4949     if (op_ret < 0) {
4950       dout(0) << "RGWLC::RGWDeleteLC() failed to set entry " << oid << op_ret << dendl;
4951     }
4952     break;
4953   }while(1);
4954   l.unlock(ctx, oid);
4955   return;
4956 }
4957
4958 int RGWGetCORS::verify_permission()
4959 {
4960   if (s->iam_policy) {
4961     if (s->iam_policy->eval(s->env, *s->auth.identity,
4962                             rgw::IAM::s3PutBucketCORS,
4963                             ARN(s->bucket)) == Effect::Allow) {
4964       return 0;
4965     }
4966   } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
4967     return 0;
4968   }
4969   return -EACCES;
4970 }
4971
4972 void RGWGetCORS::execute()
4973 {
4974   op_ret = read_bucket_cors();
4975   if (op_ret < 0)
4976     return ;
4977
4978   if (!cors_exist) {
4979     dout(2) << "No CORS configuration set yet for this bucket" << dendl;
4980     op_ret = -ENOENT;
4981     return;
4982   }
4983 }
4984
4985 int RGWPutCORS::verify_permission()
4986 {
4987   if (s->iam_policy) {
4988     if (s->iam_policy->eval(s->env, *s->auth.identity,
4989                             rgw::IAM::s3PutBucketCORS,
4990                             ARN(s->bucket)) == Effect::Allow) {
4991       return 0;
4992     }
4993   } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
4994     return 0;
4995   }
4996   return -EACCES;
4997 }
4998
4999 void RGWPutCORS::execute()
5000 {
5001   rgw_raw_obj obj;
5002
5003   op_ret = get_params();
5004   if (op_ret < 0)
5005     return;
5006
5007   if (!store->is_meta_master()) {
5008     op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
5009     if (op_ret < 0) {
5010       ldout(s->cct, 20) << __func__ << "forward_request_to_master returned ret=" << op_ret << dendl;
5011       return;
5012     }
5013   }
5014
5015   map<string, bufferlist> attrs = s->bucket_attrs;
5016   attrs[RGW_ATTR_CORS] = cors_bl;
5017   op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
5018 }
5019
5020 int RGWDeleteCORS::verify_permission()
5021 {
5022   if (false == s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
5023     return -EACCES;
5024   }
5025
5026   return 0;
5027 }
5028
5029 void RGWDeleteCORS::execute()
5030 {
5031   op_ret = read_bucket_cors();
5032   if (op_ret < 0)
5033     return;
5034
5035   bufferlist bl;
5036   rgw_raw_obj obj;
5037   if (!cors_exist) {
5038     dout(2) << "No CORS configuration set yet for this bucket" << dendl;
5039     op_ret = -ENOENT;
5040     return;
5041   }
5042   store->get_bucket_instance_obj(s->bucket, obj);
5043   store->set_prefetch_data(s->obj_ctx, obj);
5044   map<string, bufferlist> orig_attrs, attrs, rmattrs;
5045   map<string, bufferlist>::iterator iter;
5046
5047   op_ret = get_system_obj_attrs(store, s, obj, orig_attrs, NULL, &s->bucket_info.objv_tracker);
5048   if (op_ret < 0)
5049     return;
5050
5051   /* only remove meta attrs */
5052   for (iter = orig_attrs.begin(); iter != orig_attrs.end(); ++iter) {
5053     const string& name = iter->first;
5054     dout(10) << "DeleteCORS : attr: " << name << dendl;
5055     if (name.compare(0, (sizeof(RGW_ATTR_CORS) - 1), RGW_ATTR_CORS) == 0) {
5056       rmattrs[name] = iter->second;
5057     } else if (attrs.find(name) == attrs.end()) {
5058       attrs[name] = iter->second;
5059     }
5060   }
5061   op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker);
5062 }
5063
5064 void RGWOptionsCORS::get_response_params(string& hdrs, string& exp_hdrs, unsigned *max_age) {
5065   get_cors_response_headers(rule, req_hdrs, hdrs, exp_hdrs, max_age);
5066 }
5067
5068 int RGWOptionsCORS::validate_cors_request(RGWCORSConfiguration *cc) {
5069   rule = cc->host_name_rule(origin);
5070   if (!rule) {
5071     dout(10) << "There is no cors rule present for " << origin << dendl;
5072     return -ENOENT;
5073   }
5074
5075   if (!validate_cors_rule_method(rule, req_meth)) {
5076     return -ENOENT;
5077   }
5078   return 0;
5079 }
5080
5081 void RGWOptionsCORS::execute()
5082 {
5083   op_ret = read_bucket_cors();
5084   if (op_ret < 0)
5085     return;
5086
5087   origin = s->info.env->get("HTTP_ORIGIN");
5088   if (!origin) {
5089     dout(0) <<
5090     "Preflight request without mandatory Origin header"
5091     << dendl;
5092     op_ret = -EINVAL;
5093     return;
5094   }
5095   req_meth = s->info.env->get("HTTP_ACCESS_CONTROL_REQUEST_METHOD");
5096   if (!req_meth) {
5097     dout(0) <<
5098     "Preflight request without mandatory Access-control-request-method header"
5099     << dendl;
5100     op_ret = -EINVAL;
5101     return;
5102   }
5103   if (!cors_exist) {
5104     dout(2) << "No CORS configuration set yet for this bucket" << dendl;
5105     op_ret = -ENOENT;
5106     return;
5107   }
5108   req_hdrs = s->info.env->get("HTTP_ACCESS_CONTROL_REQUEST_HEADERS");
5109   op_ret = validate_cors_request(&bucket_cors);
5110   if (!rule) {
5111     origin = req_meth = NULL;
5112     return;
5113   }
5114   return;
5115 }
5116
5117 int RGWGetRequestPayment::verify_permission()
5118 {
5119   if (s->iam_policy &&
5120       s->iam_policy->eval(s->env, *s->auth.identity,
5121                           rgw::IAM::s3GetBucketRequestPayment,
5122                           ARN(s->bucket)) != Effect::Allow) {
5123       return -EACCES;
5124   }
5125   return 0;
5126 }
5127
5128 void RGWGetRequestPayment::pre_exec()
5129 {
5130   rgw_bucket_object_pre_exec(s);
5131 }
5132
5133 void RGWGetRequestPayment::execute()
5134 {
5135   requester_pays = s->bucket_info.requester_pays;
5136 }
5137
5138 int RGWSetRequestPayment::verify_permission()
5139 {
5140   if (s->iam_policy) {
5141     if (s->iam_policy->eval(s->env, *s->auth.identity,
5142                             rgw::IAM::s3PutBucketRequestPayment,
5143                             ARN(s->bucket)) == Effect::Allow) {
5144       return 0;
5145     }
5146   } else if (s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
5147     return 0;
5148   }
5149   return -EACCES;
5150 }
5151
5152 void RGWSetRequestPayment::pre_exec()
5153 {
5154   rgw_bucket_object_pre_exec(s);
5155 }
5156
5157 void RGWSetRequestPayment::execute()
5158 {
5159   op_ret = get_params();
5160
5161   if (op_ret < 0)
5162     return;
5163
5164   s->bucket_info.requester_pays = requester_pays;
5165   op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
5166                                            &s->bucket_attrs);
5167   if (op_ret < 0) {
5168     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
5169                      << " returned err=" << op_ret << dendl;
5170     return;
5171   }
5172 }
5173
5174 int RGWInitMultipart::verify_permission()
5175 {
5176   if (s->iam_policy) {
5177     auto e = s->iam_policy->eval(s->env, *s->auth.identity,
5178                                  rgw::IAM::s3PutObject,
5179                                  rgw_obj(s->bucket, s->object));
5180     if (e == Effect::Allow) {
5181       return 0;
5182     } else if (e == Effect::Deny) {
5183       return -EACCES;
5184     }
5185   }
5186
5187   if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
5188     return -EACCES;
5189   }
5190
5191   return 0;
5192 }
5193
5194 void RGWInitMultipart::pre_exec()
5195 {
5196   rgw_bucket_object_pre_exec(s);
5197 }
5198
5199 void RGWInitMultipart::execute()
5200 {
5201   bufferlist aclbl;
5202   map<string, bufferlist> attrs;
5203   rgw_obj obj;
5204
5205   if (get_params() < 0)
5206     return;
5207
5208   if (s->object.empty())
5209     return;
5210
5211   policy.encode(aclbl);
5212   attrs[RGW_ATTR_ACL] = aclbl;
5213
5214   populate_with_generic_attrs(s, attrs);
5215
5216   /* select encryption mode */
5217   op_ret = prepare_encryption(attrs);
5218   if (op_ret != 0)
5219     return;
5220
5221   op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
5222   if (op_ret < 0) {
5223     return;
5224   }
5225
5226   do {
5227     char buf[33];
5228     gen_rand_alphanumeric(s->cct, buf, sizeof(buf) - 1);
5229     upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */
5230     upload_id.append(buf);
5231
5232     string tmp_obj_name;
5233     RGWMPObj mp(s->object.name, upload_id);
5234     tmp_obj_name = mp.get_meta();
5235
5236     obj.init_ns(s->bucket, tmp_obj_name, mp_ns);
5237     // the meta object will be indexed with 0 size, we c
5238     obj.set_in_extra_data(true);
5239     obj.index_hash_source = s->object.name;
5240
5241     RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
5242     op_target.set_versioning_disabled(true); /* no versioning for multipart meta */
5243
5244     RGWRados::Object::Write obj_op(&op_target);
5245
5246     obj_op.meta.owner = s->owner.get_id();
5247     obj_op.meta.category = RGW_OBJ_CATEGORY_MULTIMETA;
5248     obj_op.meta.flags = PUT_OBJ_CREATE_EXCL;
5249
5250     op_ret = obj_op.write_meta(0, 0, attrs);
5251   } while (op_ret == -EEXIST);
5252 }
5253
5254 static int get_multipart_info(RGWRados *store, struct req_state *s,
5255                               string& meta_oid,
5256                               RGWAccessControlPolicy *policy,
5257                               map<string, bufferlist>& attrs)
5258 {
5259   map<string, bufferlist>::iterator iter;
5260   bufferlist header;
5261
5262   rgw_obj obj;
5263   obj.init_ns(s->bucket, meta_oid, mp_ns);
5264   obj.set_in_extra_data(true);
5265
5266   int op_ret = get_obj_attrs(store, s, obj, attrs);
5267   if (op_ret < 0) {
5268     if (op_ret == -ENOENT) {
5269       return -ERR_NO_SUCH_UPLOAD;
5270     }
5271     return op_ret;
5272   }
5273
5274   if (policy) {
5275     for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
5276       string name = iter->first;
5277       if (name.compare(RGW_ATTR_ACL) == 0) {
5278         bufferlist& bl = iter->second;
5279         bufferlist::iterator bli = bl.begin();
5280         try {
5281           ::decode(*policy, bli);
5282         } catch (buffer::error& err) {
5283           ldout(s->cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
5284           return -EIO;
5285         }
5286         break;
5287       }
5288     }
5289   }
5290
5291   return 0;
5292 }
5293
5294 int RGWCompleteMultipart::verify_permission()
5295 {
5296   if (s->iam_policy) {
5297     auto e = s->iam_policy->eval(s->env, *s->auth.identity,
5298                                  rgw::IAM::s3PutObject,
5299                                  rgw_obj(s->bucket, s->object));
5300     if (e == Effect::Allow) {
5301       return 0;
5302     } else if (e == Effect::Deny) {
5303       return -EACCES;
5304     }
5305   }
5306
5307   if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
5308     return -EACCES;
5309   }
5310
5311   return 0;
5312 }
5313
5314 void RGWCompleteMultipart::pre_exec()
5315 {
5316   rgw_bucket_object_pre_exec(s);
5317 }
5318
5319 void RGWCompleteMultipart::execute()
5320 {
5321   RGWMultiCompleteUpload *parts;
5322   map<int, string>::iterator iter;
5323   RGWMultiXMLParser parser;
5324   string meta_oid;
5325   map<uint32_t, RGWUploadPartInfo> obj_parts;
5326   map<uint32_t, RGWUploadPartInfo>::iterator obj_iter;
5327   map<string, bufferlist> attrs;
5328   off_t ofs = 0;
5329   MD5 hash;
5330   char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
5331   char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
5332   bufferlist etag_bl;
5333   rgw_obj meta_obj;
5334   rgw_obj target_obj;
5335   RGWMPObj mp;
5336   RGWObjManifest manifest;
5337   uint64_t olh_epoch = 0;
5338   string version_id;
5339
5340   op_ret = get_params();
5341   if (op_ret < 0)
5342     return;
5343   op_ret = get_system_versioning_params(s, &olh_epoch, &version_id);
5344   if (op_ret < 0) {
5345     return;
5346   }
5347
5348   if (!data || !len) {
5349     op_ret = -ERR_MALFORMED_XML;
5350     return;
5351   }
5352
5353   if (!parser.init()) {
5354     op_ret = -EIO;
5355     return;
5356   }
5357
5358   if (!parser.parse(data, len, 1)) {
5359     op_ret = -ERR_MALFORMED_XML;
5360     return;
5361   }
5362
5363   parts = static_cast<RGWMultiCompleteUpload *>(parser.find_first("CompleteMultipartUpload"));
5364   if (!parts || parts->parts.empty()) {
5365     op_ret = -ERR_MALFORMED_XML;
5366     return;
5367   }
5368
5369   if ((int)parts->parts.size() >
5370       s->cct->_conf->rgw_multipart_part_upload_limit) {
5371     op_ret = -ERANGE;
5372     return;
5373   }
5374
5375   mp.init(s->object.name, upload_id);
5376   meta_oid = mp.get_meta();
5377
5378   int total_parts = 0;
5379   int handled_parts = 0;
5380   int max_parts = 1000;
5381   int marker = 0;
5382   bool truncated;
5383   RGWCompressionInfo cs_info;
5384   bool compressed = false;
5385   uint64_t accounted_size = 0;
5386
5387   uint64_t min_part_size = s->cct->_conf->rgw_multipart_min_part_size;
5388
5389   list<rgw_obj_index_key> remove_objs; /* objects to be removed from index listing */
5390
5391   bool versioned_object = s->bucket_info.versioning_enabled();
5392
5393   iter = parts->parts.begin();
5394
5395   meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
5396   meta_obj.set_in_extra_data(true);
5397   meta_obj.index_hash_source = s->object.name;
5398
5399   /*take a cls lock on meta_obj to prevent racing completions (or retries)
5400     from deleting the parts*/
5401   rgw_pool meta_pool;
5402   rgw_raw_obj raw_obj;
5403   int max_lock_secs_mp =
5404     s->cct->_conf->get_val<int64_t>("rgw_mp_lock_max_time");
5405   utime_t dur(max_lock_secs_mp, 0);
5406
5407   store->obj_to_raw((s->bucket_info).placement_rule, meta_obj, &raw_obj);
5408   store->get_obj_data_pool((s->bucket_info).placement_rule,
5409                            meta_obj,&meta_pool);
5410   store->open_pool_ctx(meta_pool, serializer.ioctx);
5411
5412   op_ret = serializer.try_lock(raw_obj.oid, dur);
5413   if (op_ret < 0) {
5414     dout(0) << "RGWCompleteMultipart::execute() failed to acquire lock " << dendl;
5415     op_ret = -ERR_INTERNAL_ERROR;
5416     s->err.message = "This multipart completion is already in progress";
5417     return;
5418   }
5419
5420   op_ret = get_obj_attrs(store, s, meta_obj, attrs);
5421
5422   if (op_ret < 0) {
5423     ldout(s->cct, 0) << "ERROR: failed to get obj attrs, obj=" << meta_obj
5424                      << " ret=" << op_ret << dendl;
5425     return;
5426   }
5427
5428   do {
5429     op_ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts,
5430                                   marker, obj_parts, &marker, &truncated);
5431     if (op_ret == -ENOENT) {
5432       op_ret = -ERR_NO_SUCH_UPLOAD;
5433     }
5434     if (op_ret < 0)
5435       return;
5436
5437     total_parts += obj_parts.size();
5438     if (!truncated && total_parts != (int)parts->parts.size()) {
5439       ldout(s->cct, 0) << "NOTICE: total parts mismatch: have: " << total_parts
5440                        << " expected: " << parts->parts.size() << dendl;
5441       op_ret = -ERR_INVALID_PART;
5442       return;
5443     }
5444
5445     for (obj_iter = obj_parts.begin(); iter != parts->parts.end() && obj_iter != obj_parts.end(); ++iter, ++obj_iter, ++handled_parts) {
5446       uint64_t part_size = obj_iter->second.accounted_size;
5447       if (handled_parts < (int)parts->parts.size() - 1 &&
5448           part_size < min_part_size) {
5449         op_ret = -ERR_TOO_SMALL;
5450         return;
5451       }
5452
5453       char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
5454       if (iter->first != (int)obj_iter->first) {
5455         ldout(s->cct, 0) << "NOTICE: parts num mismatch: next requested: "
5456                          << iter->first << " next uploaded: "
5457                          << obj_iter->first << dendl;
5458         op_ret = -ERR_INVALID_PART;
5459         return;
5460       }
5461       string part_etag = rgw_string_unquote(iter->second);
5462       if (part_etag.compare(obj_iter->second.etag) != 0) {
5463         ldout(s->cct, 0) << "NOTICE: etag mismatch: part: " << iter->first
5464                          << " etag: " << iter->second << dendl;
5465         op_ret = -ERR_INVALID_PART;
5466         return;
5467       }
5468
5469       hex_to_buf(obj_iter->second.etag.c_str(), petag,
5470                 CEPH_CRYPTO_MD5_DIGESTSIZE);
5471       hash.Update((const byte *)petag, sizeof(petag));
5472
5473       RGWUploadPartInfo& obj_part = obj_iter->second;
5474
5475       /* update manifest for part */
5476       string oid = mp.get_part(obj_iter->second.num);
5477       rgw_obj src_obj;
5478       src_obj.init_ns(s->bucket, oid, mp_ns);
5479
5480       if (obj_part.manifest.empty()) {
5481         ldout(s->cct, 0) << "ERROR: empty manifest for object part: obj="
5482                          << src_obj << dendl;
5483         op_ret = -ERR_INVALID_PART;
5484         return;
5485       } else {
5486         manifest.append(obj_part.manifest, store);
5487       }
5488
5489       if (obj_part.cs_info.compression_type != "none") {
5490         if (compressed && cs_info.compression_type != obj_part.cs_info.compression_type) {
5491           ldout(s->cct, 0) << "ERROR: compression type was changed during multipart upload ("
5492                            << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl;
5493           op_ret = -ERR_INVALID_PART;
5494           return;
5495         }
5496         int64_t new_ofs; // offset in compression data for new part
5497         if (cs_info.blocks.size() > 0)
5498           new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
5499         else
5500           new_ofs = 0;
5501         for (const auto& block : obj_part.cs_info.blocks) {
5502           compression_block cb;
5503           cb.old_ofs = block.old_ofs + cs_info.orig_size;
5504           cb.new_ofs = new_ofs;
5505           cb.len = block.len;
5506           cs_info.blocks.push_back(cb);
5507           new_ofs = cb.new_ofs + cb.len;
5508         } 
5509         if (!compressed)
5510           cs_info.compression_type = obj_part.cs_info.compression_type;
5511         cs_info.orig_size += obj_part.cs_info.orig_size;
5512         compressed = true;
5513       }
5514
5515       rgw_obj_index_key remove_key;
5516       src_obj.key.get_index_key(&remove_key);
5517
5518       remove_objs.push_back(remove_key);
5519
5520       ofs += obj_part.size;
5521       accounted_size += obj_part.accounted_size;
5522     }
5523   } while (truncated);
5524   hash.Final((byte *)final_etag);
5525
5526   buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
5527   snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],  sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
5528            "-%lld", (long long)parts->parts.size());
5529   etag = final_etag_str;
5530   ldout(s->cct, 10) << "calculated etag: " << final_etag_str << dendl;
5531
5532   etag_bl.append(final_etag_str, strlen(final_etag_str) + 1);
5533
5534   attrs[RGW_ATTR_ETAG] = etag_bl;
5535
5536   if (compressed) {
5537     // write compression attribute to full object
5538     bufferlist tmp;
5539     ::encode(cs_info, tmp);
5540     attrs[RGW_ATTR_COMPRESSION] = tmp;
5541   }
5542
5543   target_obj.init(s->bucket, s->object.name);
5544   if (versioned_object) {
5545     store->gen_rand_obj_instance_name(&target_obj);
5546   }
5547
5548   RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
5549
5550   obj_ctx.obj.set_atomic(target_obj);
5551
5552   RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), target_obj);
5553   RGWRados::Object::Write obj_op(&op_target);
5554
5555   obj_op.meta.manifest = &manifest;
5556   obj_op.meta.remove_objs = &remove_objs;
5557
5558   obj_op.meta.ptag = &s->req_id; /* use req_id as operation tag */
5559   obj_op.meta.owner = s->owner.get_id();
5560   obj_op.meta.flags = PUT_OBJ_CREATE;
5561   obj_op.meta.modify_tail = true;
5562   obj_op.meta.completeMultipart = true;
5563   op_ret = obj_op.write_meta(ofs, accounted_size, attrs);
5564   if (op_ret < 0)
5565     return;
5566
5567   // remove the upload obj
5568   int r = store->delete_obj(*static_cast<RGWObjectCtx *>(s->obj_ctx),
5569                             s->bucket_info, meta_obj, 0);
5570   if (r >= 0)  {
5571     /* serializer's exclusive lock is released */
5572     serializer.clear_locked();
5573   } else {
5574       ldout(store->ctx(), 0) << "WARNING: failed to remove object "
5575                              << meta_obj << dendl;
5576   }
5577 }
5578
5579 int RGWCompleteMultipart::MPSerializer::try_lock(
5580   const std::string& _oid,
5581   utime_t dur)
5582 {
5583   oid = _oid;
5584   op.assert_exists();
5585   lock.set_duration(dur);
5586   lock.lock_exclusive(&op);
5587   int ret = ioctx.operate(oid, &op);
5588   if (! ret) {
5589     locked = true;
5590   }
5591   return ret;
5592 }
5593
5594 void RGWCompleteMultipart::complete()
5595 {
5596   /* release exclusive lock iff not already */
5597   if (unlikely(serializer.locked)) {
5598     int r = serializer.unlock();
5599     if (r < 0) {
5600       ldout(store->ctx(), 0) << "WARNING: failed to unlock "
5601                              << serializer.oid << dendl;
5602     }
5603   }
5604   send_response();
5605 }
5606
5607 int RGWAbortMultipart::verify_permission()
5608 {
5609   if (s->iam_policy) {
5610     auto e = s->iam_policy->eval(s->env, *s->auth.identity,
5611                                  rgw::IAM::s3AbortMultipartUpload,
5612                                  rgw_obj(s->bucket, s->object));
5613     if (e == Effect::Allow) {
5614       return 0;
5615     } else if (e == Effect::Deny) {
5616       return -EACCES;
5617     }
5618   }
5619
5620   if (!verify_bucket_permission_no_policy(s, RGW_PERM_WRITE)) {
5621     return -EACCES;
5622   }
5623
5624   return 0;
5625 }
5626
5627 void RGWAbortMultipart::pre_exec()
5628 {
5629   rgw_bucket_object_pre_exec(s);
5630 }
5631
5632 void RGWAbortMultipart::execute()
5633 {
5634   op_ret = -EINVAL;
5635   string upload_id;
5636   string meta_oid;
5637   upload_id = s->info.args.get("uploadId");
5638   map<string, bufferlist> attrs;
5639   rgw_obj meta_obj;
5640   RGWMPObj mp;
5641
5642   if (upload_id.empty() || s->object.empty())
5643     return;
5644
5645   mp.init(s->object.name, upload_id);
5646   meta_oid = mp.get_meta();
5647
5648   op_ret = get_multipart_info(store, s, meta_oid, NULL, attrs);
5649   if (op_ret < 0)
5650     return;
5651
5652   RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
5653   op_ret = abort_multipart_upload(store, s->cct, obj_ctx, s->bucket_info, mp);
5654 }
5655
5656 int RGWListMultipart::verify_permission()
5657 {
5658   if (!verify_object_permission(s, rgw::IAM::s3ListMultipartUploadParts))
5659     return -EACCES;
5660
5661   return 0;
5662 }
5663
5664 void RGWListMultipart::pre_exec()
5665 {
5666   rgw_bucket_object_pre_exec(s);
5667 }
5668
5669 void RGWListMultipart::execute()
5670 {
5671   map<string, bufferlist> xattrs;
5672   string meta_oid;
5673   RGWMPObj mp;
5674
5675   op_ret = get_params();
5676   if (op_ret < 0)
5677     return;
5678
5679   mp.init(s->object.name, upload_id);
5680   meta_oid = mp.get_meta();
5681
5682   op_ret = get_multipart_info(store, s, meta_oid, &policy, xattrs);
5683   if (op_ret < 0)
5684     return;
5685
5686   op_ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts,
5687                                 marker, parts, NULL, &truncated);
5688 }
5689
5690 int RGWListBucketMultiparts::verify_permission()
5691 {
5692   if (!verify_bucket_permission(s,
5693                                 rgw::IAM::s3ListBucketMultiPartUploads))
5694     return -EACCES;
5695
5696   return 0;
5697 }
5698
5699 void RGWListBucketMultiparts::pre_exec()
5700 {
5701   rgw_bucket_object_pre_exec(s);
5702 }
5703
5704 void RGWListBucketMultiparts::execute()
5705 {
5706   vector<rgw_bucket_dir_entry> objs;
5707   string marker_meta;
5708
5709   op_ret = get_params();
5710   if (op_ret < 0)
5711     return;
5712
5713   if (s->prot_flags & RGW_REST_SWIFT) {
5714     string path_args;
5715     path_args = s->info.args.get("path");
5716     if (!path_args.empty()) {
5717       if (!delimiter.empty() || !prefix.empty()) {
5718         op_ret = -EINVAL;
5719         return;
5720       }
5721       prefix = path_args;
5722       delimiter="/";
5723     }
5724   }
5725   marker_meta = marker.get_meta();
5726
5727   op_ret = list_bucket_multiparts(store, s->bucket_info, prefix, marker_meta, delimiter,
5728                                   max_uploads, &objs, &common_prefixes, &is_truncated);
5729   if (op_ret < 0) {
5730     return;
5731   }
5732
5733   if (!objs.empty()) {
5734     vector<rgw_bucket_dir_entry>::iterator iter;
5735     RGWMultipartUploadEntry entry;
5736     for (iter = objs.begin(); iter != objs.end(); ++iter) {
5737       rgw_obj_key key(iter->key);
5738       if (!entry.mp.from_meta(key.name))
5739         continue;
5740       entry.obj = *iter;
5741       uploads.push_back(entry);
5742     }
5743     next_marker = entry;
5744   }
5745 }
5746
5747 void RGWGetHealthCheck::execute()
5748 {
5749   if (!g_conf->rgw_healthcheck_disabling_path.empty() &&
5750       (::access(g_conf->rgw_healthcheck_disabling_path.c_str(), F_OK) == 0)) {
5751     /* Disabling path specified & existent in the filesystem. */
5752     op_ret = -ERR_SERVICE_UNAVAILABLE; /* 503 */
5753   } else {
5754     op_ret = 0; /* 200 OK */
5755   }
5756 }
5757
5758 int RGWDeleteMultiObj::verify_permission()
5759 {
5760   acl_allowed = verify_bucket_permission_no_policy(s, RGW_PERM_WRITE);
5761   if (!acl_allowed && !s->iam_policy)
5762     return -EACCES;
5763
5764   return 0;
5765 }
5766
5767 void RGWDeleteMultiObj::pre_exec()
5768 {
5769   rgw_bucket_object_pre_exec(s);
5770 }
5771
5772 void RGWDeleteMultiObj::execute()
5773 {
5774   RGWMultiDelDelete *multi_delete;
5775   vector<rgw_obj_key>::iterator iter;
5776   RGWMultiDelXMLParser parser;
5777   int num_processed = 0;
5778   RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
5779
5780   op_ret = get_params();
5781   if (op_ret < 0) {
5782     goto error;
5783   }
5784
5785   if (!data) {
5786     op_ret = -EINVAL;
5787     goto error;
5788   }
5789
5790   if (!parser.init()) {
5791     op_ret = -EINVAL;
5792     goto error;
5793   }
5794
5795   if (!parser.parse(data, len, 1)) {
5796     op_ret = -EINVAL;
5797     goto error;
5798   }
5799
5800   multi_delete = static_cast<RGWMultiDelDelete *>(parser.find_first("Delete"));
5801   if (!multi_delete) {
5802     op_ret = -EINVAL;
5803     goto error;
5804   }
5805
5806   if (multi_delete->is_quiet())
5807     quiet = true;
5808
5809   begin_response();
5810   if (multi_delete->objects.empty()) {
5811     goto done;
5812   }
5813
5814   for (iter = multi_delete->objects.begin();
5815         iter != multi_delete->objects.end() && num_processed < max_to_delete;
5816         ++iter, num_processed++) {
5817     rgw_obj obj(bucket, *iter);
5818     if (s->iam_policy) {
5819       auto e = s->iam_policy->eval(s->env,
5820                                    *s->auth.identity,
5821                                    iter->instance.empty() ?
5822                                    rgw::IAM::s3DeleteObject :
5823                                    rgw::IAM::s3DeleteObjectVersion,
5824                                    obj);
5825       if ((e == Effect::Deny) ||
5826           (e == Effect::Pass && !acl_allowed)) {
5827         send_partial_response(*iter, false, "", -EACCES);
5828         continue;
5829       }
5830     }
5831
5832     obj_ctx->obj.set_atomic(obj);
5833
5834     RGWRados::Object del_target(store, s->bucket_info, *obj_ctx, obj);
5835     RGWRados::Object::Delete del_op(&del_target);
5836
5837     del_op.params.bucket_owner = s->bucket_owner.get_id();
5838     del_op.params.versioning_status = s->bucket_info.versioning_status();
5839     del_op.params.obj_owner = s->owner;
5840
5841     op_ret = del_op.delete_obj();
5842     if (op_ret == -ENOENT) {
5843       op_ret = 0;
5844     }
5845
5846     send_partial_response(*iter, del_op.result.delete_marker,
5847                           del_op.result.version_id, op_ret);
5848   }
5849
5850   /*  set the return code to zero, errors at this point will be
5851   dumped to the response */
5852   op_ret = 0;
5853
5854 done:
5855   // will likely segfault if begin_response() has not been called
5856   end_response();
5857   free(data);
5858   return;
5859
5860 error:
5861   send_status();
5862   free(data);
5863   return;
5864
5865 }
5866
5867 bool RGWBulkDelete::Deleter::verify_permission(RGWBucketInfo& binfo,
5868                                                map<string, bufferlist>& battrs,
5869                                                ACLOwner& bucket_owner /* out */)
5870 {
5871   RGWAccessControlPolicy bacl(store->ctx());
5872   int ret = read_bucket_policy(store, s, binfo, battrs, &bacl, binfo.bucket);
5873   if (ret < 0) {
5874     return false;
5875   }
5876
5877   auto policy = get_iam_policy_from_attr(s->cct, store, battrs, binfo.bucket.tenant);
5878
5879   bucket_owner = bacl.get_owner();
5880
5881   /* We can use global user_acl because each BulkDelete request is allowed
5882    * to work on entities from a single account only. */
5883   return verify_bucket_permission(s, binfo.bucket, s->user_acl.get(),
5884                                   &bacl, policy, rgw::IAM::s3DeleteBucket);
5885 }
5886
5887 bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
5888 {
5889   auto& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
5890
5891   RGWBucketInfo binfo;
5892   map<string, bufferlist> battrs;
5893   ACLOwner bowner;
5894
5895   int ret = store->get_bucket_info(obj_ctx, s->user->user_id.tenant,
5896                                    path.bucket_name, binfo, nullptr,
5897                                    &battrs);
5898   if (ret < 0) {
5899     goto binfo_fail;
5900   }
5901
5902   if (!verify_permission(binfo, battrs, bowner)) {
5903     ret = -EACCES;
5904     goto auth_fail;
5905   }
5906
5907   if (!path.obj_key.empty()) {
5908     rgw_obj obj(binfo.bucket, path.obj_key);
5909     obj_ctx.obj.set_atomic(obj);
5910
5911     RGWRados::Object del_target(store, binfo, obj_ctx, obj);
5912     RGWRados::Object::Delete del_op(&del_target);
5913
5914     del_op.params.bucket_owner = binfo.owner;
5915     del_op.params.versioning_status = binfo.versioning_status();
5916     del_op.params.obj_owner = bowner;
5917
5918     ret = del_op.delete_obj();
5919     if (ret < 0) {
5920       goto delop_fail;
5921     }
5922   } else {
5923     RGWObjVersionTracker ot;
5924     ot.read_version = binfo.ep_objv;
5925
5926     ret = store->delete_bucket(binfo, ot);
5927     if (0 == ret) {
5928       ret = rgw_unlink_bucket(store, binfo.owner, binfo.bucket.tenant,
5929                               binfo.bucket.name, false);
5930       if (ret < 0) {
5931         ldout(s->cct, 0) << "WARNING: failed to unlink bucket: ret=" << ret
5932                          << dendl;
5933       }
5934     }
5935     if (ret < 0) {
5936       goto delop_fail;
5937     }
5938
5939     if (!store->is_meta_master()) {
5940       bufferlist in_data;
5941       ret = forward_request_to_master(s, &ot.read_version, store, in_data,
5942                                       nullptr);
5943       if (ret < 0) {
5944         if (ret == -ENOENT) {
5945           /* adjust error, we want to return with NoSuchBucket and not
5946            * NoSuchKey */
5947           ret = -ERR_NO_SUCH_BUCKET;
5948         }
5949         goto delop_fail;
5950       }
5951     }
5952   }
5953
5954   num_deleted++;
5955   return true;
5956
5957
5958 binfo_fail:
5959     if (-ENOENT == ret) {
5960       ldout(store->ctx(), 20) << "cannot find bucket = " << path.bucket_name << dendl;
5961       num_unfound++;
5962     } else {
5963       ldout(store->ctx(), 20) << "cannot get bucket info, ret = " << ret
5964                               << dendl;
5965
5966       fail_desc_t failed_item = {
5967         .err  = ret,
5968         .path = path
5969       };
5970       failures.push_back(failed_item);
5971     }
5972     return false;
5973
5974 auth_fail:
5975     ldout(store->ctx(), 20) << "wrong auth for " << path << dendl;
5976     {
5977       fail_desc_t failed_item = {
5978         .err  = ret,
5979         .path = path
5980       };
5981       failures.push_back(failed_item);
5982     }
5983     return false;
5984
5985 delop_fail:
5986     if (-ENOENT == ret) {
5987       ldout(store->ctx(), 20) << "cannot find entry " << path << dendl;
5988       num_unfound++;
5989     } else {
5990       fail_desc_t failed_item = {
5991         .err  = ret,
5992         .path = path
5993       };
5994       failures.push_back(failed_item);
5995     }
5996     return false;
5997 }
5998
5999 bool RGWBulkDelete::Deleter::delete_chunk(const std::list<acct_path_t>& paths)
6000 {
6001   ldout(store->ctx(), 20) << "in delete_chunk" << dendl;
6002   for (auto path : paths) {
6003     ldout(store->ctx(), 20) << "bulk deleting path: " << path << dendl;
6004     delete_single(path);
6005   }
6006
6007   return true;
6008 }
6009
6010 int RGWBulkDelete::verify_permission()
6011 {
6012   return 0;
6013 }
6014
6015 void RGWBulkDelete::pre_exec()
6016 {
6017   rgw_bucket_object_pre_exec(s);
6018 }
6019
6020 void RGWBulkDelete::execute()
6021 {
6022   deleter = std::unique_ptr<Deleter>(new Deleter(store, s));
6023
6024   bool is_truncated = false;
6025   do {
6026     list<RGWBulkDelete::acct_path_t> items;
6027
6028     int ret = get_data(items, &is_truncated);
6029     if (ret < 0) {
6030       return;
6031     }
6032
6033     ret = deleter->delete_chunk(items);
6034   } while (!op_ret && is_truncated);
6035
6036   return;
6037 }
6038
6039
6040 constexpr std::array<int, 2> RGWBulkUploadOp::terminal_errors;
6041
6042 int RGWBulkUploadOp::verify_permission()
6043 {
6044   if (s->auth.identity->is_anonymous()) {
6045     return -EACCES;
6046   }
6047
6048   if (! verify_user_permission(s, RGW_PERM_WRITE)) {
6049     return -EACCES;
6050   }
6051
6052   if (s->user->user_id.tenant != s->bucket_tenant) {
6053     ldout(s->cct, 10) << "user cannot create a bucket in a different tenant"
6054                       << " (user_id.tenant=" << s->user->user_id.tenant
6055                       << " requested=" << s->bucket_tenant << ")"
6056                       << dendl;
6057     return -EACCES;
6058   }
6059
6060   if (s->user->max_buckets < 0) {
6061     return -EPERM;
6062   }
6063
6064   return 0;
6065 }
6066
6067 void RGWBulkUploadOp::pre_exec()
6068 {
6069   rgw_bucket_object_pre_exec(s);
6070 }
6071
6072 boost::optional<std::pair<std::string, rgw_obj_key>>
6073 RGWBulkUploadOp::parse_path(const boost::string_ref& path)
6074 {
6075   /* We need to skip all slashes at the beginning in order to preserve
6076    * compliance with Swift. */
6077   const size_t start_pos = path.find_first_not_of('/');
6078
6079   if (boost::string_ref::npos != start_pos) {
6080     /* Seperator is the first slash after the leading ones. */
6081     const size_t sep_pos = path.substr(start_pos).find('/');
6082
6083     if (boost::string_ref::npos != sep_pos) {
6084       const auto bucket_name = path.substr(start_pos, sep_pos - start_pos);
6085       const auto obj_name = path.substr(sep_pos + 1);
6086
6087       return std::make_pair(bucket_name.to_string(),
6088                             rgw_obj_key(obj_name.to_string()));
6089     } else {
6090       /* It's guaranteed here that bucket name is at least one character
6091        * long and is different than slash. */
6092       return std::make_pair(path.substr(start_pos).to_string(),
6093                             rgw_obj_key());
6094     }
6095   }
6096
6097   return none;
6098 }
6099
6100 std::pair<std::string, std::string>
6101 RGWBulkUploadOp::handle_upload_path(struct req_state *s)
6102 {
6103   std::string bucket_path, file_prefix;
6104   if (! s->init_state.url_bucket.empty()) {
6105     file_prefix = bucket_path = s->init_state.url_bucket + "/";
6106     if (! s->object.empty()) {
6107       std::string& object_name = s->object.name;
6108
6109       /* As rgw_obj_key::empty() already verified emptiness of s->object.name,
6110        * we can safely examine its last element. */
6111       if (object_name.back() == '/') {
6112         file_prefix.append(object_name);
6113       } else {
6114         file_prefix.append(object_name).append("/");
6115       }
6116     }
6117   }
6118   return std::make_pair(bucket_path, file_prefix);
6119 }
6120
6121 int RGWBulkUploadOp::handle_dir_verify_permission()
6122 {
6123   if (s->user->max_buckets > 0) {
6124     RGWUserBuckets buckets;
6125     std::string marker;
6126     bool is_truncated = false;
6127     op_ret = rgw_read_user_buckets(store, s->user->user_id, buckets,
6128                                    marker, std::string(), s->user->max_buckets,
6129                                    false, &is_truncated);
6130     if (op_ret < 0) {
6131       return op_ret;
6132     }
6133
6134     if (buckets.count() >= static_cast<size_t>(s->user->max_buckets)) {
6135       return -ERR_TOO_MANY_BUCKETS;
6136     }
6137   }
6138
6139   return 0;
6140 }
6141
6142 static void forward_req_info(CephContext *cct, req_info& info, const std::string& bucket_name)
6143 {
6144   /* the request of container or object level will contain bucket name.
6145    * only at account level need to append the bucket name */
6146   if (info.script_uri.find(bucket_name) != std::string::npos) {
6147     return;
6148   }
6149
6150   ldout(cct, 20) << "append the bucket: "<< bucket_name << " to req_info" << dendl;
6151   info.script_uri.append("/").append(bucket_name);
6152   info.request_uri_aws4 = info.request_uri = info.script_uri;
6153   info.effective_uri = "/" + bucket_name;
6154 }
6155
6156 int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
6157 {
6158   ldout(s->cct, 20) << "bulk upload: got directory=" << path << dendl;
6159
6160   op_ret = handle_dir_verify_permission();
6161   if (op_ret < 0) {
6162     return op_ret;
6163   }
6164
6165   std::string bucket_name;
6166   rgw_obj_key object_junk;
6167   std::tie(bucket_name, object_junk) =  *parse_path(path);
6168
6169   rgw_raw_obj obj(store->get_zone_params().domain_root,
6170                   rgw_make_bucket_entry_name(s->bucket_tenant, bucket_name));
6171
6172   /* we need to make sure we read bucket info, it's not read before for this
6173    * specific request */
6174   RGWBucketInfo binfo;
6175   std::map<std::string, ceph::bufferlist> battrs;
6176   op_ret = store->get_bucket_info(*dir_ctx, s->bucket_tenant, bucket_name,
6177                                   binfo, NULL, &battrs);
6178   if (op_ret < 0 && op_ret != -ENOENT) {
6179     return op_ret;
6180   }
6181   const bool bucket_exists = (op_ret != -ENOENT);
6182
6183   if (bucket_exists) {
6184     RGWAccessControlPolicy old_policy(s->cct);
6185     int r = get_bucket_policy_from_attr(s->cct, store, binfo,
6186                                         battrs, &old_policy);
6187     if (r >= 0)  {
6188       if (old_policy.get_owner().get_id().compare(s->user->user_id) != 0) {
6189         op_ret = -EEXIST;
6190         return op_ret;
6191       }
6192     }
6193   }
6194
6195   RGWBucketInfo master_info;
6196   rgw_bucket *pmaster_bucket = nullptr;
6197   uint32_t *pmaster_num_shards = nullptr;
6198   real_time creation_time;
6199   obj_version objv, ep_objv, *pobjv = nullptr;
6200
6201   if (! store->is_meta_master()) {
6202     JSONParser jp;
6203     ceph::bufferlist in_data;
6204     req_info info = s->info;
6205     forward_req_info(s->cct, info, bucket_name);
6206     op_ret = forward_request_to_master(s, nullptr, store, in_data, &jp, &info);
6207     if (op_ret < 0) {
6208       return op_ret;
6209     }
6210
6211     JSONDecoder::decode_json("entry_point_object_ver", ep_objv, &jp);
6212     JSONDecoder::decode_json("object_ver", objv, &jp);
6213     JSONDecoder::decode_json("bucket_info", master_info, &jp);
6214
6215     ldout(s->cct, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver="
6216                       << objv.ver << dendl;
6217     ldout(s->cct, 20) << "got creation_time="<< master_info.creation_time
6218                       << dendl;
6219
6220     pmaster_bucket= &master_info.bucket;
6221     creation_time = master_info.creation_time;
6222     pmaster_num_shards = &master_info.num_shards;
6223     pobjv = &objv;
6224   } else {
6225     pmaster_bucket = nullptr;
6226     pmaster_num_shards = nullptr;
6227   }
6228
6229
6230   std::string placement_rule;
6231   if (bucket_exists) {
6232     std::string selected_placement_rule;
6233     rgw_bucket bucket;
6234     bucket.tenant = s->bucket_tenant;
6235     bucket.name = s->bucket_name;
6236     op_ret = store->select_bucket_placement(*(s->user),
6237                                             store->get_zonegroup().get_id(),
6238                                             placement_rule,
6239                                             &selected_placement_rule,
6240                                             nullptr);
6241     if (selected_placement_rule != binfo.placement_rule) {
6242       op_ret = -EEXIST;
6243       ldout(s->cct, 20) << "bulk upload: non-coherent placement rule" << dendl;
6244       return op_ret;
6245     }
6246   }
6247
6248   /* Create metadata: ACLs. */
6249   std::map<std::string, ceph::bufferlist> attrs;
6250   RGWAccessControlPolicy policy;
6251   policy.create_default(s->user->user_id, s->user->display_name);
6252   ceph::bufferlist aclbl;
6253   policy.encode(aclbl);
6254   attrs.emplace(RGW_ATTR_ACL, std::move(aclbl));
6255
6256   RGWQuotaInfo quota_info;
6257   const RGWQuotaInfo * pquota_info = nullptr;
6258
6259   rgw_bucket bucket;
6260   bucket.tenant = s->bucket_tenant; /* ignored if bucket exists */
6261   bucket.name = bucket_name;
6262
6263
6264   RGWBucketInfo out_info;
6265   op_ret = store->create_bucket(*(s->user),
6266                                 bucket,
6267                                 store->get_zonegroup().get_id(),
6268                                 placement_rule, binfo.swift_ver_location,
6269                                 pquota_info, attrs,
6270                                 out_info, pobjv, &ep_objv, creation_time,
6271                                 pmaster_bucket, pmaster_num_shards, true);
6272   /* continue if EEXIST and create_bucket will fail below.  this way we can
6273    * recover from a partial create by retrying it. */
6274   ldout(s->cct, 20) << "rgw_create_bucket returned ret=" << op_ret
6275                     << ", bucket=" << bucket << dendl;
6276
6277   if (op_ret && op_ret != -EEXIST) {
6278     return op_ret;
6279   }
6280
6281   const bool existed = (op_ret == -EEXIST);
6282   if (existed) {
6283     /* bucket already existed, might have raced with another bucket creation, or
6284      * might be partial bucket creation that never completed. Read existing bucket
6285      * info, verify that the reported bucket owner is the current user.
6286      * If all is ok then update the user's list of buckets.
6287      * Otherwise inform client about a name conflict.
6288      */
6289     if (out_info.owner.compare(s->user->user_id) != 0) {
6290       op_ret = -EEXIST;
6291       ldout(s->cct, 20) << "bulk upload: conflicting bucket name" << dendl;
6292       return op_ret;
6293     }
6294     bucket = out_info.bucket;
6295   }
6296
6297   op_ret = rgw_link_bucket(store, s->user->user_id, bucket,
6298                            out_info.creation_time, false);
6299   if (op_ret && !existed && op_ret != -EEXIST) {
6300     /* if it exists (or previously existed), don't remove it! */
6301     op_ret = rgw_unlink_bucket(store, s->user->user_id,
6302                                bucket.tenant, bucket.name);
6303     if (op_ret < 0) {
6304       ldout(s->cct, 0) << "bulk upload: WARNING: failed to unlink bucket: ret="
6305                        << op_ret << dendl;
6306     }
6307   } else if (op_ret == -EEXIST || (op_ret == 0 && existed)) {
6308     ldout(s->cct, 20) << "bulk upload: containers already exists"
6309                       << dendl;
6310     op_ret = -ERR_BUCKET_EXISTS;
6311   }
6312
6313   return op_ret;
6314 }
6315
6316
6317 bool RGWBulkUploadOp::handle_file_verify_permission(RGWBucketInfo& binfo,
6318                                                     const rgw_obj& obj,
6319                                                     std::map<std::string, ceph::bufferlist>& battrs,
6320                                                     ACLOwner& bucket_owner /* out */)
6321 {
6322   RGWAccessControlPolicy bacl(store->ctx());
6323   op_ret = read_bucket_policy(store, s, binfo, battrs, &bacl, binfo.bucket);
6324   if (op_ret < 0) {
6325     ldout(s->cct, 20) << "bulk upload: cannot read_policy() for bucket"
6326                       << dendl;
6327     return false;
6328   }
6329
6330   auto policy = get_iam_policy_from_attr(s->cct, store, battrs, binfo.bucket.tenant);
6331
6332   bucket_owner = bacl.get_owner();
6333   if (policy) {
6334     auto e = policy->eval(s->env, *s->auth.identity,
6335                           rgw::IAM::s3PutObject, obj);
6336     if (e == Effect::Allow) {
6337       return true;
6338     } else if (e == Effect::Deny) {
6339       return false;
6340     }
6341   }
6342     
6343   return verify_bucket_permission_no_policy(s, s->user_acl.get(),
6344                                             &bacl, RGW_PERM_WRITE);
6345 }
6346
6347 int RGWBulkUploadOp::handle_file(const boost::string_ref path,
6348                                  const size_t size,
6349                                  AlignedStreamGetter& body)
6350 {
6351
6352   ldout(s->cct, 20) << "bulk upload: got file=" << path << ", size=" << size
6353                     << dendl;
6354
6355   RGWPutObjDataProcessor *filter = nullptr;
6356   boost::optional<RGWPutObj_Compress> compressor;
6357
6358   if (size > static_cast<const size_t>(s->cct->_conf->rgw_max_put_size)) {
6359     op_ret = -ERR_TOO_LARGE;
6360     return op_ret;
6361   }
6362
6363   std::string bucket_name;
6364   rgw_obj_key object;
6365   std::tie(bucket_name, object) = *parse_path(path);
6366
6367   auto& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
6368   RGWBucketInfo binfo;
6369   std::map<std::string, ceph::bufferlist> battrs;
6370   ACLOwner bowner;
6371   op_ret = store->get_bucket_info(obj_ctx, s->user->user_id.tenant,
6372                                   bucket_name, binfo, nullptr, &battrs);
6373   if (op_ret == -ENOENT) {
6374     ldout(s->cct, 20) << "bulk upload: non existent directory=" << bucket_name
6375                       << dendl;
6376   } else if (op_ret < 0) {
6377     return op_ret;
6378   }
6379
6380   if (! handle_file_verify_permission(binfo,
6381                                       rgw_obj(binfo.bucket, object),
6382                                       battrs, bowner)) {
6383     ldout(s->cct, 20) << "bulk upload: object creation unauthorized" << dendl;
6384     op_ret = -EACCES;
6385     return op_ret;
6386   }
6387
6388   op_ret = store->check_quota(bowner.get_id(), binfo.bucket,
6389                               user_quota, bucket_quota, size);
6390   if (op_ret < 0) {
6391     return op_ret;
6392   }
6393
6394   op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
6395   if (op_ret < 0) {
6396     return op_ret;
6397   }
6398
6399   RGWPutObjProcessor_Atomic processor(obj_ctx,
6400                                       binfo,
6401                                       binfo.bucket,
6402                                       object.name,
6403                                       /* part size */
6404                                       s->cct->_conf->rgw_obj_stripe_size,
6405                                       s->req_id,
6406                                       binfo.versioning_enabled());
6407
6408   /* No filters by default. */
6409   filter = &processor;
6410
6411   op_ret = processor.prepare(store, nullptr);
6412   if (op_ret < 0) {
6413     ldout(s->cct, 20) << "bulk upload: cannot prepare processor due to ret="
6414                       << op_ret << dendl;
6415     return op_ret;
6416   }
6417
6418   const auto& compression_type = store->get_zone_params().get_compression_type(
6419       binfo.placement_rule);
6420   CompressorRef plugin;
6421   if (compression_type != "none") {
6422     plugin = Compressor::create(s->cct, compression_type);
6423     if (! plugin) {
6424       ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
6425                        << compression_type << dendl;
6426     } else {
6427       compressor.emplace(s->cct, plugin, filter);
6428       filter = &*compressor;
6429     }
6430   }
6431
6432   /* Upload file content. */
6433   ssize_t len = 0;
6434   size_t ofs = 0;
6435   MD5 hash;
6436   do {
6437     ceph::bufferlist data;
6438     len = body.get_at_most(s->cct->_conf->rgw_max_chunk_size, data);
6439
6440     ldout(s->cct, 20) << "bulk upload: body=" << data.c_str() << dendl;
6441     if (len < 0) {
6442       op_ret = len;
6443       return op_ret;
6444     } else if (len > 0) {
6445       hash.Update((const byte *)data.c_str(), data.length());
6446       op_ret = put_data_and_throttle(filter, data, ofs, false);
6447       if (op_ret < 0) {
6448         ldout(s->cct, 20) << "processor->thottle_data() returned ret="
6449                           << op_ret << dendl;
6450         return op_ret;
6451       }
6452
6453       ofs += len;
6454     }
6455
6456   } while (len > 0);
6457
6458   if (ofs != size) {
6459     ldout(s->cct, 10) << "bulk upload: real file size different from declared"
6460                       << dendl;
6461     op_ret = -EINVAL;
6462   }
6463
6464   op_ret = store->check_quota(bowner.get_id(), binfo.bucket,
6465                               user_quota, bucket_quota, size);
6466   if (op_ret < 0) {
6467     ldout(s->cct, 20) << "bulk upload: quota exceeded for path=" << path
6468                       << dendl;
6469     return op_ret;
6470   }
6471
6472   op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
6473   if (op_ret < 0) {
6474     return op_ret;
6475   }
6476
6477   char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
6478   unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
6479   hash.Final(m);
6480   buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
6481
6482   /* Create metadata: ETAG. */
6483   std::map<std::string, ceph::bufferlist> attrs;
6484   std::string etag = calc_md5;
6485   ceph::bufferlist etag_bl;
6486   etag_bl.append(etag.c_str(), etag.size() + 1);
6487   attrs.emplace(RGW_ATTR_ETAG, std::move(etag_bl));
6488
6489   /* Create metadata: ACLs. */
6490   RGWAccessControlPolicy policy;
6491   policy.create_default(s->user->user_id, s->user->display_name);
6492   ceph::bufferlist aclbl;
6493   policy.encode(aclbl);
6494   attrs.emplace(RGW_ATTR_ACL, std::move(aclbl));
6495
6496   /* Create metadata: compression info. */
6497   if (compressor && compressor->is_compressed()) {
6498     ceph::bufferlist tmp;
6499     RGWCompressionInfo cs_info;
6500     cs_info.compression_type = plugin->get_type_name();
6501     cs_info.orig_size = s->obj_size;
6502     cs_info.blocks = std::move(compressor->get_compression_blocks());
6503     ::encode(cs_info, tmp);
6504     attrs.emplace(RGW_ATTR_COMPRESSION, std::move(tmp));
6505   }
6506
6507   /* Complete the transaction. */
6508   op_ret = processor.complete(size, etag, nullptr, ceph::real_time(), attrs,
6509                               ceph::real_time() /* delete_at */);
6510   if (op_ret < 0) {
6511     ldout(s->cct, 20) << "bulk upload: processor::complete returned op_ret="
6512                       << op_ret << dendl;
6513   }
6514
6515   return op_ret;
6516 }
6517
6518 void RGWBulkUploadOp::execute()
6519 {
6520   ceph::bufferlist buffer(64 * 1024);
6521
6522   ldout(s->cct, 20) << "bulk upload: start" << dendl;
6523
6524   /* Create an instance of stream-abstracting class. Having this indirection
6525    * allows for easy introduction of decompressors like gzip and bzip2. */
6526   auto stream = create_stream();
6527   if (! stream) {
6528     return;
6529   }
6530
6531   /* Handling the $UPLOAD_PATH accordingly to the Swift's Bulk middleware. See: 
6532    * https://github.com/openstack/swift/blob/2.13.0/swift/common/middleware/bulk.py#L31-L41 */
6533   std::string bucket_path, file_prefix;
6534   std::tie(bucket_path, file_prefix) = handle_upload_path(s);
6535
6536   auto status = rgw::tar::StatusIndicator::create();
6537   do {
6538     op_ret = stream->get_exactly(rgw::tar::BLOCK_SIZE, buffer);
6539     if (op_ret < 0) {
6540       ldout(s->cct, 2) << "bulk upload: cannot read header" << dendl;
6541       return;
6542     }
6543
6544     /* We need to re-interpret the buffer as a TAR block. Exactly two blocks
6545      * must be tracked to detect out end-of-archive. It occurs when both of
6546      * them are empty (zeroed). Tracing this particular inter-block dependency
6547      * is responsibility of the rgw::tar::StatusIndicator class. */
6548     boost::optional<rgw::tar::HeaderView> header;
6549     std::tie(status, header) = rgw::tar::interpret_block(status, buffer);
6550
6551     if (! status.empty() && header) {
6552       /* This specific block isn't empty (entirely zeroed), so we can parse
6553        * it as a TAR header and dispatch. At the moment we do support only
6554        * regular files and directories. Everything else (symlinks, devices)
6555        * will be ignored but won't cease the whole upload. */
6556       switch (header->get_filetype()) {
6557         case rgw::tar::FileType::NORMAL_FILE: {
6558           ldout(s->cct, 2) << "bulk upload: handling regular file" << dendl;
6559
6560           boost::string_ref filename = bucket_path.empty() ? header->get_filename() : \
6561                             file_prefix + header->get_filename().to_string();
6562           auto body = AlignedStreamGetter(0, header->get_filesize(),
6563                                           rgw::tar::BLOCK_SIZE, *stream);
6564           op_ret = handle_file(filename,
6565                                header->get_filesize(),
6566                                body);
6567           if (! op_ret) {
6568             /* Only regular files counts. */
6569             num_created++;
6570           } else {
6571             failures.emplace_back(op_ret, filename.to_string());
6572           }
6573           break;
6574         }
6575         case rgw::tar::FileType::DIRECTORY: {
6576           ldout(s->cct, 2) << "bulk upload: handling regular directory" << dendl;
6577
6578           boost::string_ref dirname = bucket_path.empty() ? header->get_filename() : bucket_path;
6579           op_ret = handle_dir(dirname);
6580           if (op_ret < 0 && op_ret != -ERR_BUCKET_EXISTS) {
6581             failures.emplace_back(op_ret, dirname.to_string());
6582           }
6583           break;
6584         }
6585         default: {
6586           /* Not recognized. Skip. */
6587           op_ret = 0;
6588           break;
6589         }
6590       }
6591
6592       /* In case of any problems with sub-request authorization Swift simply
6593        * terminates whole upload immediately. */
6594       if (boost::algorithm::contains(std::initializer_list<int>{ op_ret },
6595                                      terminal_errors)) {
6596         ldout(s->cct, 2) << "bulk upload: terminating due to ret=" << op_ret
6597                          << dendl;
6598         break;
6599       }
6600     } else {
6601       ldout(s->cct, 2) << "bulk upload: an empty block" << dendl;
6602       op_ret = 0;
6603     }
6604
6605     buffer.clear();
6606   } while (! status.eof());
6607
6608   return;
6609 }
6610
6611 RGWBulkUploadOp::AlignedStreamGetter::~AlignedStreamGetter()
6612 {
6613   const size_t aligned_legnth = length + (-length % alignment);
6614   ceph::bufferlist junk;
6615
6616   DecoratedStreamGetter::get_exactly(aligned_legnth - position, junk);
6617 }
6618
6619 ssize_t RGWBulkUploadOp::AlignedStreamGetter::get_at_most(const size_t want,
6620                                                           ceph::bufferlist& dst)
6621 {
6622   const size_t max_to_read = std::min(want, length - position);
6623   const auto len = DecoratedStreamGetter::get_at_most(max_to_read, dst);
6624   if (len > 0) {
6625     position += len;
6626   }
6627   return len;
6628 }
6629
6630 ssize_t RGWBulkUploadOp::AlignedStreamGetter::get_exactly(const size_t want,
6631                                                           ceph::bufferlist& dst)
6632 {
6633   const auto len = DecoratedStreamGetter::get_exactly(want, dst);
6634   if (len > 0) {
6635     position += len;
6636   }
6637   return len;
6638 }
6639
6640 int RGWSetAttrs::verify_permission()
6641 {
6642   // This looks to be part of the RGW-NFS machinery and has no S3 or
6643   // Swift equivalent.
6644   bool perm;
6645   if (!s->object.empty()) {
6646     perm = verify_object_permission_no_policy(s, RGW_PERM_WRITE);
6647   } else {
6648     perm = verify_bucket_permission_no_policy(s, RGW_PERM_WRITE);
6649   }
6650   if (!perm)
6651     return -EACCES;
6652
6653   return 0;
6654 }
6655
6656 void RGWSetAttrs::pre_exec()
6657 {
6658   rgw_bucket_object_pre_exec(s);
6659 }
6660
6661 void RGWSetAttrs::execute()
6662 {
6663   op_ret = get_params();
6664   if (op_ret < 0)
6665     return;
6666
6667   rgw_obj obj(s->bucket, s->object);
6668
6669   if (!s->object.empty()) {
6670     store->set_atomic(s->obj_ctx, obj);
6671     op_ret = store->set_attrs(s->obj_ctx, s->bucket_info, obj, attrs, nullptr);
6672   } else {
6673     for (auto& iter : attrs) {
6674       s->bucket_attrs[iter.first] = std::move(iter.second);
6675     }
6676     op_ret = rgw_bucket_set_attrs(store, s->bucket_info, s->bucket_attrs,
6677                                   &s->bucket_info.objv_tracker);
6678   }
6679 }
6680
6681 void RGWGetObjLayout::pre_exec()
6682 {
6683   rgw_bucket_object_pre_exec(s);
6684 }
6685
6686 void RGWGetObjLayout::execute()
6687 {
6688   rgw_obj obj(s->bucket, s->object);
6689   RGWRados::Object target(store,
6690                           s->bucket_info,
6691                           *static_cast<RGWObjectCtx *>(s->obj_ctx),
6692                           rgw_obj(s->bucket, s->object));
6693   RGWRados::Object::Read stat_op(&target);
6694
6695   op_ret = stat_op.prepare();
6696   if (op_ret < 0) {
6697     return;
6698   }
6699
6700   head_obj = stat_op.state.head_obj;
6701
6702   op_ret = target.get_manifest(&manifest);
6703 }
6704
6705
6706 int RGWConfigBucketMetaSearch::verify_permission()
6707 {
6708   if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
6709     return -EACCES;
6710   }
6711
6712   return 0;
6713 }
6714
6715 void RGWConfigBucketMetaSearch::pre_exec()
6716 {
6717   rgw_bucket_object_pre_exec(s);
6718 }
6719
6720 void RGWConfigBucketMetaSearch::execute()
6721 {
6722   op_ret = get_params();
6723   if (op_ret < 0) {
6724     ldout(s->cct, 20) << "NOTICE: get_params() returned ret=" << op_ret << dendl;
6725     return;
6726   }
6727
6728   s->bucket_info.mdsearch_config = mdsearch_config;
6729
6730   op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
6731   if (op_ret < 0) {
6732     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
6733     return;
6734   }
6735 }
6736
6737 int RGWGetBucketMetaSearch::verify_permission()
6738 {
6739   if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
6740     return -EACCES;
6741   }
6742
6743   return 0;
6744 }
6745
6746 void RGWGetBucketMetaSearch::pre_exec()
6747 {
6748   rgw_bucket_object_pre_exec(s);
6749 }
6750
6751 int RGWDelBucketMetaSearch::verify_permission()
6752 {
6753   if (!s->auth.identity->is_owner_of(s->bucket_owner.get_id())) {
6754     return -EACCES;
6755   }
6756
6757   return 0;
6758 }
6759
6760 void RGWDelBucketMetaSearch::pre_exec()
6761 {
6762   rgw_bucket_object_pre_exec(s);
6763 }
6764
6765 void RGWDelBucketMetaSearch::execute()
6766 {
6767   s->bucket_info.mdsearch_config.clear();
6768
6769   op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
6770   if (op_ret < 0) {
6771     ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
6772     return;
6773   }
6774 }
6775
6776
6777 RGWHandler::~RGWHandler()
6778 {
6779 }
6780
6781 int RGWHandler::init(RGWRados *_store,
6782                      struct req_state *_s,
6783                      rgw::io::BasicClient *cio)
6784 {
6785   store = _store;
6786   s = _s;
6787
6788   return 0;
6789 }
6790
6791 int RGWHandler::do_init_permissions()
6792 {
6793   int ret = rgw_build_bucket_policies(store, s);
6794   s->env = rgw_build_iam_environment(store, s);
6795
6796   if (ret < 0) {
6797     ldout(s->cct, 10) << "read_permissions on " << s->bucket << " ret=" << ret << dendl;
6798     if (ret == -ENODATA)
6799       ret = -EACCES;
6800   }
6801
6802   return ret;
6803 }
6804
6805 int RGWHandler::do_read_permissions(RGWOp *op, bool only_bucket)
6806 {
6807   if (only_bucket) {
6808     /* already read bucket info */
6809     return 0;
6810   }
6811   int ret = rgw_build_object_policies(store, s, op->prefetch_data());
6812
6813   if (ret < 0) {
6814     ldout(s->cct, 10) << "read_permissions on " << s->bucket << ":"
6815                       << s->object << " only_bucket=" << only_bucket
6816                       << " ret=" << ret << dendl;
6817     if (ret == -ENODATA)
6818       ret = -EACCES;
6819   }
6820
6821   return ret;
6822 }
6823
6824 int RGWOp::error_handler(int err_no, string *error_content) {
6825   return dialect_handler->error_handler(err_no, error_content);
6826 }
6827
6828 int RGWHandler::error_handler(int err_no, string *error_content) {
6829   // This is the do-nothing error handler
6830   return err_no;
6831 }
6832
6833
6834 void RGWPutBucketPolicy::send_response()
6835 {
6836   if (op_ret) {
6837     set_req_state_err(s, op_ret);
6838   }
6839   dump_errno(s);
6840   end_header(s);
6841 }
6842
6843 int RGWPutBucketPolicy::verify_permission()
6844 {
6845   if (!verify_bucket_permission(s, rgw::IAM::s3PutBucketPolicy)) {
6846     return -EACCES;
6847   }
6848
6849   return 0;
6850 }
6851
6852 int RGWPutBucketPolicy::get_params()
6853 {
6854   const auto max_size = s->cct->_conf->rgw_max_put_param_size;
6855   // At some point when I have more time I want to make a version of
6856   // rgw_rest_read_all_input that doesn't use malloc.
6857   op_ret = rgw_rest_read_all_input(s, &data, &len, max_size, false);
6858   // And throws exceptions.
6859   return op_ret;
6860 }
6861
6862 void RGWPutBucketPolicy::execute()
6863 {
6864   op_ret = get_params();
6865   if (op_ret < 0) {
6866     return;
6867   }
6868
6869   bufferlist in_data = bufferlist::static_from_mem(data, len);
6870
6871   if (!store->is_meta_master()) {
6872     op_ret = forward_request_to_master(s, NULL, store, in_data, nullptr);
6873     if (op_ret < 0) {
6874       ldout(s->cct, 20) << "forward_request_to_master returned ret=" << op_ret << dendl;
6875       return;
6876     }
6877   }
6878
6879   try {
6880     Policy p(s->cct, s->bucket_tenant, in_data);
6881     auto attrs = s->bucket_attrs;
6882     attrs[RGW_ATTR_IAM_POLICY].clear();
6883     attrs[RGW_ATTR_IAM_POLICY].append(p.text);
6884     op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
6885                                   &s->bucket_info.objv_tracker);
6886     if (op_ret == -ECANCELED) {
6887       op_ret = 0; /* lost a race, but it's ok because policies are immutable */
6888     }
6889   } catch (rgw::IAM::PolicyParseException& e) {
6890     ldout(s->cct, 20) << "failed to parse policy: " << e.what() << dendl;
6891     op_ret = -EINVAL;
6892   }
6893 }
6894
6895 void RGWGetBucketPolicy::send_response()
6896 {
6897   if (op_ret) {
6898     set_req_state_err(s, op_ret);
6899   }
6900   dump_errno(s);
6901   end_header(s, this, "application/json");
6902   dump_body(s, policy);
6903 }
6904
6905 int RGWGetBucketPolicy::verify_permission()
6906 {
6907   if (!verify_bucket_permission(s, rgw::IAM::s3GetBucketPolicy)) {
6908     return -EACCES;
6909   }
6910
6911   return 0;
6912 }
6913
6914 void RGWGetBucketPolicy::execute()
6915 {
6916   auto attrs = s->bucket_attrs;
6917   map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_IAM_POLICY);
6918   if (aiter == attrs.end()) {
6919     ldout(s->cct, 0) << __func__ << " can't find bucket IAM POLICY attr" 
6920                      << " bucket_name = " << s->bucket_name << dendl;
6921     op_ret = -ERR_NO_SUCH_BUCKET_POLICY;
6922     s->err.message = "The bucket policy does not exist";
6923     return;
6924   } else {
6925     policy = attrs[RGW_ATTR_IAM_POLICY];
6926
6927     if (policy.length() == 0) {
6928       ldout(s->cct, 10) << "The bucket policy does not exist, bucket: " << s->bucket_name << dendl;
6929       op_ret = -ERR_NO_SUCH_BUCKET_POLICY;
6930       s->err.message = "The bucket policy does not exist";
6931       return;
6932     }
6933   } 
6934 }
6935
6936 void RGWDeleteBucketPolicy::send_response()
6937 {
6938   if (op_ret) {
6939     set_req_state_err(s, op_ret);
6940   }
6941   dump_errno(s);
6942   end_header(s);
6943 }
6944
6945 int RGWDeleteBucketPolicy::verify_permission()
6946 {
6947   if (!verify_bucket_permission(s, rgw::IAM::s3DeleteBucketPolicy)) {
6948     return -EACCES;
6949   }
6950
6951   return 0;
6952 }
6953
6954 void RGWDeleteBucketPolicy::execute()
6955 {
6956   auto attrs = s->bucket_attrs;
6957   attrs.erase(RGW_ATTR_IAM_POLICY);
6958   op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
6959                                 &s->bucket_info.objv_tracker);
6960   if (op_ret == -ECANCELED) {
6961     op_ret = 0; /* lost a race, but it's ok because policies are immutable */
6962   }
6963 }