Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_multi.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <string.h>
5
6 #include <iostream>
7 #include <map>
8
9 #include "include/types.h"
10
11 #include "rgw_xml.h"
12 #include "rgw_multi.h"
13 #include "rgw_op.h"
14
15 #define dout_subsys ceph_subsys_rgw
16
17 using namespace std;
18
19
20 bool RGWMultiPart::xml_end(const char *el)
21 {
22   RGWMultiPartNumber *num_obj = static_cast<RGWMultiPartNumber *>(find_first("PartNumber"));
23   RGWMultiETag *etag_obj = static_cast<RGWMultiETag *>(find_first("ETag"));
24
25   if (!num_obj || !etag_obj)
26     return false;
27
28   string s = num_obj->get_data();
29   if (s.empty())
30     return false;
31
32   num = atoi(s.c_str());
33
34   s = etag_obj->get_data();
35   etag = s;
36
37   return true;
38 }
39
40 bool RGWMultiCompleteUpload::xml_end(const char *el) {
41   XMLObjIter iter = find("Part");
42   RGWMultiPart *part = static_cast<RGWMultiPart *>(iter.get_next());
43   while (part) {
44     int num = part->get_num();
45     string etag = part->get_etag();
46     parts[num] = etag;
47     part = static_cast<RGWMultiPart *>(iter.get_next());
48   }
49   return true;
50 }
51
52
53 XMLObj *RGWMultiXMLParser::alloc_obj(const char *el) {
54   XMLObj *obj = NULL;
55   if (strcmp(el, "CompleteMultipartUpload") == 0 ||
56       strcmp(el, "MultipartUpload") == 0) {
57     obj = new RGWMultiCompleteUpload();
58   } else if (strcmp(el, "Part") == 0) {
59     obj = new RGWMultiPart();
60   } else if (strcmp(el, "PartNumber") == 0) {
61     obj = new RGWMultiPartNumber();
62   } else if (strcmp(el, "ETag") == 0) {
63     obj = new RGWMultiETag();
64   }
65
66   return obj;
67 }
68
69 bool is_v2_upload_id(const string& upload_id)
70 {
71   const char *uid = upload_id.c_str();
72
73   return (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX, sizeof(MULTIPART_UPLOAD_ID_PREFIX) - 1) == 0) ||
74          (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX_LEGACY, sizeof(MULTIPART_UPLOAD_ID_PREFIX_LEGACY) - 1) == 0);
75 }
76
77 int list_multipart_parts(RGWRados *store, RGWBucketInfo& bucket_info, CephContext *cct,
78                                 const string& upload_id,
79                                 string& meta_oid, int num_parts,
80                                 int marker, map<uint32_t, RGWUploadPartInfo>& parts,
81                                 int *next_marker, bool *truncated,
82                                 bool assume_unsorted)
83 {
84   map<string, bufferlist> parts_map;
85   map<string, bufferlist>::iterator iter;
86   bufferlist header;
87
88   rgw_obj obj;
89   obj.init_ns(bucket_info.bucket, meta_oid, RGW_OBJ_NS_MULTIPART);
90   obj.set_in_extra_data(true);
91
92   rgw_raw_obj raw_obj;
93   store->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
94
95   bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted;
96
97   int ret;
98
99   parts.clear();
100
101   if (sorted_omap) {
102     string p;
103     p = "part.";
104     char buf[32];
105
106     snprintf(buf, sizeof(buf), "%08d", marker);
107     p.append(buf);
108
109     ret = store->omap_get_vals(raw_obj, header, p, num_parts + 1, parts_map);
110   } else {
111     ret = store->omap_get_all(raw_obj, header, parts_map);
112   }
113   if (ret < 0)
114     return ret;
115
116   int i;
117   int last_num = 0;
118
119   uint32_t expected_next = marker + 1;
120
121   for (i = 0, iter = parts_map.begin(); (i < num_parts || !sorted_omap) && iter != parts_map.end(); ++iter, ++i) {
122     bufferlist& bl = iter->second;
123     bufferlist::iterator bli = bl.begin();
124     RGWUploadPartInfo info;
125     try {
126       ::decode(info, bli);
127     } catch (buffer::error& err) {
128       ldout(cct, 0) << "ERROR: could not part info, caught buffer::error" << dendl;
129       return -EIO;
130     }
131     if (sorted_omap) {
132       if (info.num != expected_next) {
133         /* ouch, we expected a specific part num here, but we got a different one. Either
134          * a part is missing, or it could be a case of mixed rgw versions working on the same
135          * upload, where one gateway doesn't support correctly sorted omap keys for multipart
136          * upload just assume data is unsorted.
137          */
138         return list_multipart_parts(store, bucket_info, cct, upload_id, meta_oid, num_parts, marker, parts, next_marker, truncated, true);
139       }
140       expected_next++;
141     }
142     if (sorted_omap ||
143       (int)info.num > marker) {
144       parts[info.num] = info;
145       last_num = info.num;
146     }
147   }
148
149   if (sorted_omap) {
150     if (truncated)
151       *truncated = (iter != parts_map.end());
152   } else {
153     /* rebuild a map with only num_parts entries */
154
155     map<uint32_t, RGWUploadPartInfo> new_parts;
156     map<uint32_t, RGWUploadPartInfo>::iterator piter;
157
158     for (i = 0, piter = parts.begin(); i < num_parts && piter != parts.end(); ++i, ++piter) {
159       new_parts[piter->first] = piter->second;
160       last_num = piter->first;
161     }
162
163     if (truncated)
164       *truncated = (piter != parts.end());
165
166     parts.swap(new_parts);
167   }
168
169   if (next_marker) {
170     *next_marker = last_num;
171   }
172
173   return 0;
174 }
175
176 int list_multipart_parts(RGWRados *store, struct req_state *s,
177                                 const string& upload_id,
178                                 string& meta_oid, int num_parts,
179                                 int marker, map<uint32_t, RGWUploadPartInfo>& parts,
180                                 int *next_marker, bool *truncated,
181                                 bool assume_unsorted)
182 {
183   return list_multipart_parts(store, s->bucket_info, s->cct, upload_id, meta_oid, num_parts, marker, parts, next_marker, truncated, assume_unsorted);
184 }
185
186 int abort_multipart_upload(RGWRados *store, CephContext *cct, RGWObjectCtx *obj_ctx, RGWBucketInfo& bucket_info, RGWMPObj& mp_obj)
187 {
188   rgw_obj meta_obj;
189   meta_obj.init_ns(bucket_info.bucket, mp_obj.get_meta(), RGW_OBJ_NS_MULTIPART);
190   meta_obj.set_in_extra_data(true);
191   meta_obj.index_hash_source = mp_obj.get_key();
192   cls_rgw_obj_chain chain;
193   list<rgw_obj_index_key> remove_objs;
194   map<uint32_t, RGWUploadPartInfo> obj_parts;
195   bool truncated;
196   int marker = 0;
197   int ret;
198
199   do {
200     ret = list_multipart_parts(store, bucket_info, cct, mp_obj.get_upload_id(), mp_obj.get_meta(), 1000,
201       marker, obj_parts, &marker, &truncated);
202     if (ret < 0)
203       return ret;
204     for (auto obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
205       RGWUploadPartInfo& obj_part = obj_iter->second;
206       rgw_obj obj;
207       if (obj_part.manifest.empty()) {
208         string oid = mp_obj.get_part(obj_iter->second.num);
209         obj.init_ns(bucket_info.bucket, oid, RGW_OBJ_NS_MULTIPART);
210         obj.index_hash_source = mp_obj.get_key();
211         ret = store->delete_obj(*obj_ctx, bucket_info, obj, 0);
212         if (ret < 0 && ret != -ENOENT)
213           return ret;
214       } else {
215         store->update_gc_chain(meta_obj, obj_part.manifest, &chain);
216         RGWObjManifest::obj_iterator oiter = obj_part.manifest.obj_begin();
217         if (oiter != obj_part.manifest.obj_end()) {
218           rgw_obj head;
219           rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store);
220           rgw_raw_obj_to_obj(bucket_info.bucket, raw_head, &head);
221
222           rgw_obj_index_key key;
223           head.key.get_index_key(&key);
224           remove_objs.push_back(key);
225         }
226       }
227     }
228   } while (truncated);
229   /* use upload id as tag */
230   ret = store->send_chain_to_gc(chain, mp_obj.get_upload_id() , false);  // do it async
231   if (ret < 0) {
232     ldout(cct, 5) << "gc->send_chain() returned " << ret << dendl;
233     return ret;
234   }
235   RGWRados::Object del_target(store, bucket_info, *obj_ctx, meta_obj);
236   RGWRados::Object::Delete del_op(&del_target);
237
238   del_op.params.bucket_owner = bucket_info.owner;
239   del_op.params.versioning_status = 0;
240   if (!remove_objs.empty()) {
241     del_op.params.remove_objs = &remove_objs;
242   }
243
244   // and also remove the metadata obj
245   ret = del_op.delete_obj();
246   return ret == -ENOENT?-ERR_NO_SUCH_UPLOAD:ret;
247 }
248
249 int list_bucket_multiparts(RGWRados *store, RGWBucketInfo& bucket_info,
250                                 string& prefix, string& marker, string& delim,
251                                 int& max_uploads, vector<rgw_bucket_dir_entry> *objs,
252                                 map<string, bool> *common_prefixes, bool *is_truncated)
253 {
254   RGWRados::Bucket target(store, bucket_info);
255   RGWRados::Bucket::List list_op(&target);
256   MultipartMetaFilter mp_filter;
257
258   list_op.params.prefix = prefix;
259   list_op.params.delim = delim;
260   list_op.params.marker = marker;
261   list_op.params.ns = RGW_OBJ_NS_MULTIPART;
262   list_op.params.filter = &mp_filter;
263
264   return(list_op.list_objects(max_uploads, objs, common_prefixes,is_truncated));
265 }
266
267 int abort_bucket_multiparts(RGWRados *store, CephContext *cct, RGWBucketInfo& bucket_info,
268                                 string& prefix, string& delim)
269 {
270   int ret, max = 1000, num_deleted = 0;
271   vector<rgw_bucket_dir_entry> objs;
272   RGWObjectCtx obj_ctx(store);
273   string marker;
274   map<string, bool> common_prefixes;
275   bool is_truncated;
276
277   do {
278     ret = list_bucket_multiparts(store, bucket_info, prefix, marker, delim,
279                                 max, &objs, &common_prefixes, &is_truncated);
280     if (ret < 0) {
281       return ret;
282     }
283     if (!objs.empty()) {
284       RGWMultipartUploadEntry entry;
285       for (const auto& obj : objs) {
286         rgw_obj_key key(obj.key);
287         if (!entry.mp.from_meta(key.name))
288           continue;
289         entry.obj = obj;
290         ret = abort_multipart_upload(store, cct, &obj_ctx, bucket_info, entry.mp);
291         if (ret < 0) {
292           return ret;
293         }
294         num_deleted++;
295       }
296       if (num_deleted) {
297         ldout(store->ctx(),0) << "WARNING : aborted " << num_deleted << " incomplete multipart uploads" << dendl;
298       }
299     }
300   } while (is_truncated);
301
302   return ret;
303 }