Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_reshard.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_rados.h"
5 #include "rgw_bucket.h"
6 #include "rgw_reshard.h"
7 #include "cls/rgw/cls_rgw_client.h"
8 #include "cls/lock/cls_lock_client.h"
9 #include "common/errno.h"
10 #include "common/ceph_json.h"
11
12 #include "common/dout.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rgw
16
17 const string reshard_oid_prefix = "reshard.";
18 const string reshard_lock_name = "reshard_process";
19 const string bucket_instance_lock_name = "bucket_instance_lock";
20
21 using namespace std;
22
23 #define RESHARD_SHARD_WINDOW 64
24 #define RESHARD_MAX_AIO 128
25
26 class BucketReshardShard {
27   RGWRados *store;
28   const RGWBucketInfo& bucket_info;
29   int num_shard;
30   RGWRados::BucketShard bs;
31   vector<rgw_cls_bi_entry> entries;
32   map<uint8_t, rgw_bucket_category_stats> stats;
33   deque<librados::AioCompletion *>& aio_completions;
34
35   int wait_next_completion() {
36     librados::AioCompletion *c = aio_completions.front();
37     aio_completions.pop_front();
38
39     c->wait_for_safe();
40
41     int ret = c->get_return_value();
42     c->release();
43
44     if (ret < 0) {
45       derr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << dendl;
46       return ret;
47     }
48
49     return 0;
50   }
51
52   int get_completion(librados::AioCompletion **c) {
53     if (aio_completions.size() >= RESHARD_MAX_AIO) {
54       int ret = wait_next_completion();
55       if (ret < 0) {
56         return ret;
57       }
58     }
59
60     *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
61     aio_completions.push_back(*c);
62
63     return 0;
64   }
65
66 public:
67   BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
68                      int _num_shard,
69                      deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
70                                                                        aio_completions(_completions) {
71     num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
72     bs.init(bucket_info.bucket, num_shard);
73   }
74
75   int get_num_shard() {
76     return num_shard;
77   }
78
79   int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
80                 const rgw_bucket_category_stats& entry_stats) {
81     entries.push_back(entry);
82     if (account) {
83       rgw_bucket_category_stats& target = stats[category];
84       target.num_entries += entry_stats.num_entries;
85       target.total_size += entry_stats.total_size;
86       target.total_size_rounded += entry_stats.total_size_rounded;
87     }
88     if (entries.size() >= RESHARD_SHARD_WINDOW) {
89       int ret = flush();
90       if (ret < 0) {
91         return ret;
92       }
93     }
94     return 0;
95   }
96   int flush() {
97     if (entries.size() == 0) {
98       return 0;
99     }
100
101     librados::ObjectWriteOperation op;
102     for (auto& entry : entries) {
103       store->bi_put(op, bs, entry);
104     }
105     cls_rgw_bucket_update_stats(op, false, stats);
106
107     librados::AioCompletion *c;
108     int ret = get_completion(&c);
109     if (ret < 0) {
110       return ret;
111     }
112     ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
113     if (ret < 0) {
114       derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
115       return ret;
116     }
117     entries.clear();
118     stats.clear();
119     return 0;
120   }
121
122   int wait_all_aio() {
123     int ret = 0;
124     while (!aio_completions.empty()) {
125       int r = wait_next_completion();
126       if (r < 0) {
127         ret = r;
128       }
129     }
130     return ret;
131   }
132 };
133
134 class BucketReshardManager {
135   RGWRados *store;
136   const RGWBucketInfo& target_bucket_info;
137   deque<librados::AioCompletion *> completions;
138   int num_target_shards;
139   vector<BucketReshardShard *> target_shards;
140
141 public:
142   BucketReshardManager(RGWRados *_store, const RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
143                                                                                                        num_target_shards(_num_target_shards) {
144     target_shards.resize(num_target_shards);
145     for (int i = 0; i < num_target_shards; ++i) {
146       target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
147     }
148   }
149
150   ~BucketReshardManager() {
151     for (auto& shard : target_shards) {
152       int ret = shard->wait_all_aio();
153       if (ret < 0) {
154         ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
155       }
156     }
157   }
158
159   int add_entry(int shard_index,
160                 rgw_cls_bi_entry& entry, bool account, uint8_t category,
161                 const rgw_bucket_category_stats& entry_stats) {
162     int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
163     if (ret < 0) {
164       derr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << dendl;
165       return ret;
166     }
167     return 0;
168   }
169
170   int finish() {
171     int ret = 0;
172     for (auto& shard : target_shards) {
173       int r = shard->flush();
174       if (r < 0) {
175         derr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
176         ret = r;
177       }
178     }
179     for (auto& shard : target_shards) {
180       int r = shard->wait_all_aio();
181       if (r < 0) {
182         derr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << dendl;
183         ret = r;
184       }
185       delete shard;
186     }
187     target_shards.clear();
188     return ret;
189   }
190 };
191
192 RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
193                                                      store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
194                                                      reshard_lock(reshard_lock_name) {
195   const rgw_bucket& b = bucket_info.bucket;
196   reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
197
198   utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0);
199 #define COOKIE_LEN 16
200   char cookie_buf[COOKIE_LEN + 1];
201   gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
202   cookie_buf[COOKIE_LEN] = '\0';
203
204   reshard_lock.set_cookie(cookie_buf);
205   reshard_lock.set_duration(lock_duration);
206 }
207
208 int RGWBucketReshard::lock_bucket()
209 {
210   int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
211   if (ret < 0) {
212     ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl;
213     return ret;
214   }
215   return 0;
216 }
217
218 void RGWBucketReshard::unlock_bucket()
219 {
220   int ret = reshard_lock.unlock(&store->reshard_pool_ctx, reshard_oid);
221   if (ret < 0) {
222     ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
223   }
224 }
225
226 int RGWBucketReshard::set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status)
227 {
228   if (new_instance_id.empty()) {
229     ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
230     return -EINVAL;
231   }
232
233   cls_rgw_bucket_instance_entry instance_entry;
234   instance_entry.set_status(new_instance_id, num_shards, status);
235
236   int ret = store->bucket_set_reshard(bucket_info, instance_entry);
237   if (ret < 0) {
238     ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
239                   << cpp_strerror(-ret) << dendl;
240     return ret;
241   }
242   return 0;
243 }
244
245 int RGWBucketReshard::clear_resharding()
246 {
247   cls_rgw_bucket_instance_entry instance_entry;
248
249   int ret = store->bucket_set_reshard(bucket_info, instance_entry);
250   if (ret < 0) {
251     ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
252                   << cpp_strerror(-ret) << dendl;
253     return ret;
254   }
255   return 0;
256 }
257
258 static int create_new_bucket_instance(RGWRados *store,
259                                       int new_num_shards,
260                                       const RGWBucketInfo& bucket_info,
261                                       map<string, bufferlist>& attrs,
262                                       RGWBucketInfo& new_bucket_info)
263 {
264   new_bucket_info = bucket_info;
265
266   store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
267   new_bucket_info.bucket.oid.clear();
268
269   new_bucket_info.num_shards = new_num_shards;
270   new_bucket_info.objv_tracker.clear();
271
272   new_bucket_info.new_bucket_instance_id.clear();
273   new_bucket_info.reshard_status = 0;
274
275   int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
276   if (ret < 0) {
277     cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
278     return -ret;
279   }
280
281   ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
282   if (ret < 0) {
283     cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
284     return -ret;
285   }
286
287   return 0;
288 }
289
290 int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
291                                                  RGWBucketInfo& new_bucket_info)
292 {
293   return ::create_new_bucket_instance(store, new_num_shards, bucket_info, bucket_attrs, new_bucket_info);
294 }
295
296 class BucketInfoReshardUpdate
297 {
298   RGWRados *store;
299   RGWBucketInfo bucket_info;
300   std::map<string, bufferlist> bucket_attrs;
301
302   bool in_progress{false};
303
304   int set_status(cls_rgw_reshard_status s) {
305     bucket_info.reshard_status = s;
306     int ret = store->put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs);
307     if (ret < 0) {
308       ldout(store->ctx(), 0) << "ERROR: failed to write bucket info, ret=" << ret << dendl;
309       return ret;
310     }
311     return 0;
312   }
313
314 public:
315   BucketInfoReshardUpdate(RGWRados *_store, RGWBucketInfo& _bucket_info,
316                           map<string, bufferlist>& _bucket_attrs, const string& new_bucket_id) : store(_store), 
317                                                                                                  bucket_info(_bucket_info),
318                                                                                                  bucket_attrs(_bucket_attrs) {
319     bucket_info.new_bucket_instance_id = new_bucket_id;
320   }
321   ~BucketInfoReshardUpdate() {
322     if (in_progress) {
323       bucket_info.new_bucket_instance_id.clear();
324       set_status(CLS_RGW_RESHARD_NONE);
325     }
326   }
327
328   int start() {
329     int ret = set_status(CLS_RGW_RESHARD_IN_PROGRESS);
330     if (ret < 0) {
331       return ret;
332     }
333     in_progress = true;
334     return 0;
335   }
336
337   int complete() {
338     int ret = set_status(CLS_RGW_RESHARD_DONE);
339     if (ret < 0) {
340       return ret;
341     }
342     in_progress = false;
343     return 0;
344   }
345 };
346
347 int RGWBucketReshard::do_reshard(
348                    int num_shards,
349                    const RGWBucketInfo& new_bucket_info,
350                    int max_entries,
351                    bool verbose,
352                    ostream *out,
353                    Formatter *formatter)
354 {
355   rgw_bucket& bucket = bucket_info.bucket;
356
357   int ret = 0;
358
359   if (out) {
360     (*out) << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
361     (*out) << "***         these will need to be removed manually             ***" << std::endl;
362     (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
363     (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
364     (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
365     (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
366   }
367
368   /* update bucket info  -- in progress*/
369   list<rgw_cls_bi_entry> entries;
370
371   if (max_entries < 0) {
372     ldout(store->ctx(), 0) << __func__ << ": can't reshard, negative max_entries" << dendl;
373     return -EINVAL;
374   }
375
376   BucketInfoReshardUpdate bucket_info_updater(store, bucket_info, bucket_attrs, new_bucket_info.bucket.bucket_id);
377
378   ret = bucket_info_updater.start();
379   if (ret < 0) {
380     ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
381     return ret;
382   }
383
384   int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
385
386   BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
387
388   verbose = verbose && (formatter != nullptr);
389
390   if (verbose) {
391     formatter->open_array_section("entries");
392   }
393
394   uint64_t total_entries = 0;
395
396   if (!verbose) {
397     cout << "total entries:";
398   }
399
400   int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
401   string marker;
402   for (int i = 0; i < num_source_shards; ++i) {
403     bool is_truncated = true;
404     marker.clear();
405     while (is_truncated) {
406       entries.clear();
407       ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
408       if (ret < 0 && ret != -ENOENT) {
409         derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
410         return -ret;
411       }
412
413       list<rgw_cls_bi_entry>::iterator iter;
414       for (iter = entries.begin(); iter != entries.end(); ++iter) {
415         rgw_cls_bi_entry& entry = *iter;
416         if (verbose) {
417           formatter->open_object_section("entry");
418
419           encode_json("shard_id", i, formatter);
420           encode_json("num_entry", total_entries, formatter);
421           encode_json("entry", entry, formatter);
422         }
423         total_entries++;
424
425         marker = entry.idx;
426
427         int target_shard_id;
428         cls_rgw_obj_key cls_key;
429         uint8_t category;
430         rgw_bucket_category_stats stats;
431         bool account = entry.get_info(&cls_key, &category, &stats);
432         rgw_obj_key key(cls_key);
433         rgw_obj obj(new_bucket_info.bucket, key);
434         int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
435         if (ret < 0) {
436           lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
437           return ret;
438         }
439
440         int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
441
442         ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
443         if (ret < 0) {
444           return ret;
445         }
446         if (verbose) {
447           formatter->close_section();
448           if (out) {
449             formatter->flush(*out);
450             formatter->flush(*out);
451           }
452         } else if (out && !(total_entries % 1000)) {
453           (*out) << " " << total_entries;
454         }
455       }
456     }
457   }
458   if (verbose) {
459     formatter->close_section();
460     if (out) {
461       formatter->flush(*out);
462     }
463   } else if (out) {
464     (*out) << " " << total_entries << std::endl;
465   }
466
467   ret = target_shards_mgr.finish();
468   if (ret < 0) {
469     lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
470     return EIO;
471   }
472
473   RGWBucketAdminOpState bucket_op;
474
475   bucket_op.set_bucket_name(new_bucket_info.bucket.name);
476   bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id);
477   bucket_op.set_user_id(new_bucket_info.owner);
478   string err;
479   int r = RGWBucketAdminOp::link(store, bucket_op, &err);
480   if (r < 0) {
481     lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << err << "; " << cpp_strerror(-r) << ")" << dendl;
482     return -r;
483   }
484
485   ret = bucket_info_updater.complete();
486   if (ret < 0) {
487     ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
488     /* don't error out, reshard process succeeded */
489   }
490   return 0;
491 }
492
493 int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
494 {
495   librados::IoCtx index_ctx;
496   map<int, string> bucket_objs;
497
498   int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs);
499   if (r < 0) {
500     return r;
501   }
502
503   for (auto i : bucket_objs) {
504     cls_rgw_bucket_instance_entry entry;
505
506     int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry);
507     if (ret < 0 && ret != -ENOENT) {
508       lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl;
509       return ret;
510     }
511
512     status->push_back(entry);
513   }
514
515   return 0;
516 }
517
518 int RGWBucketReshard::execute(int num_shards, int max_op_entries,
519                               bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log)
520
521 {
522   int ret = lock_bucket();
523   if (ret < 0) {
524     return ret;
525   }
526
527   RGWBucketInfo new_bucket_info;
528   ret = create_new_bucket_instance(num_shards, new_bucket_info);
529   if (ret < 0) {
530     unlock_bucket();
531     return ret;
532   }
533
534   if (reshard_log) {
535     ret = reshard_log->update(bucket_info, new_bucket_info);
536     if (ret < 0) {
537       unlock_bucket();
538       return ret;
539     }
540   }
541
542   ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
543   if (ret < 0) {
544     unlock_bucket();
545     return ret;
546   }
547
548   ret = do_reshard(num_shards,
549                    new_bucket_info,
550                    max_op_entries,
551                    verbose, out, formatter);
552
553   if (ret < 0) {
554     unlock_bucket();
555     return ret;
556   }
557
558   ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_DONE);
559   if (ret < 0) {
560     unlock_bucket();
561     return ret;
562   }
563
564   unlock_bucket();
565
566   return 0;
567 }
568
569
570 RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
571                        Formatter *_formatter) : store(_store), instance_lock(bucket_instance_lock_name),
572                                                 verbose(_verbose), out(_out), formatter(_formatter)
573 {
574   num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
575 }
576
577 string RGWReshard::get_logshard_key(const string& tenant, const string& bucket_name)
578 {
579   return tenant + ":" + bucket_name;
580 }
581
582 #define MAX_RESHARD_LOGSHARDS_PRIME 7877
583
584 void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
585 {
586   string key = get_logshard_key(tenant, bucket_name);
587
588   uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
589   uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
590   sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
591   int logshard = sid % num_logshards;
592
593   get_logshard_oid(logshard, oid);
594 }
595
596 int RGWReshard::add(cls_rgw_reshard_entry& entry)
597 {
598   if (!store->can_reshard()) {
599     ldout(store->ctx(), 20) << __func__ << " Resharding is disabled"  << dendl;
600     return 0;
601   }
602
603   string logshard_oid;
604
605   get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
606
607   librados::ObjectWriteOperation op;
608   cls_rgw_reshard_add(op, entry);
609
610   int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
611   if (ret < 0) {
612     lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
613     return ret;
614   }
615   return 0;
616 }
617
618 int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
619 {
620   cls_rgw_reshard_entry entry;
621   entry.bucket_name = bucket_info.bucket.name;
622   entry.bucket_id = bucket_info.bucket.bucket_id;
623
624   int ret = get(entry);
625   if (ret < 0) {
626     return ret;
627   }
628
629   entry.new_instance_id = new_bucket_info.bucket.name + ":"  + new_bucket_info.bucket.bucket_id;
630
631   ret = add(entry);
632   if (ret < 0) {
633     ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
634       cpp_strerror(-ret) << dendl;
635   }
636
637   return ret;
638 }
639
640
641 int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
642 {
643   string logshard_oid;
644
645   get_logshard_oid(logshard_num, &logshard_oid);
646
647   int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
648
649   if (ret < 0) {
650     if (ret == -ENOENT) {
651       *is_truncated = false;
652       ret = 0;
653     }
654     lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
655     if (ret == -EACCES) {
656       lderr(store->ctx()) << "access denied to pool " << store->get_zone_params().reshard_pool
657                           << ". Fix the pool access permissions of your client" << dendl;
658     }
659   }
660
661   return ret;
662 }
663
664 int RGWReshard::get(cls_rgw_reshard_entry& entry)
665 {
666   string logshard_oid;
667
668   get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
669
670   int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry);
671   if (ret < 0) {
672     lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
673     return ret;
674   }
675
676   return 0;
677 }
678
679 int RGWReshard::remove(cls_rgw_reshard_entry& entry)
680 {
681   string logshard_oid;
682
683   get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
684
685   librados::ObjectWriteOperation op;
686   cls_rgw_reshard_remove(op, entry);
687
688   int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
689   if (ret < 0) {
690     lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
691     return ret;
692   }
693
694   return ret;
695 }
696
697 int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
698 {
699   int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
700   if (ret < 0) {
701     lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
702     return ret;
703   }
704
705   return 0;
706 }
707
708 const int num_retries = 10;
709 const int default_reshard_sleep_duration = 5;
710
711 int RGWReshardWait::do_wait()
712 {
713   Mutex::Locker l(lock);
714
715   cond.WaitInterval(lock, utime_t(default_reshard_sleep_duration, 0));
716
717   if (going_down) {
718     return -ECANCELED;
719   }
720
721   return 0;
722 }
723
724 int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
725 {
726   int ret = 0;
727   cls_rgw_bucket_instance_entry entry;
728
729   for (int i=0; i < num_retries;i++) {
730     ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry);
731     if (ret < 0) {
732       ldout(store->ctx(), 0) << __func__ << " ERROR: failed to get bucket resharding :"  <<
733         cpp_strerror(-ret)<< dendl;
734       return ret;
735     }
736     if (!entry.resharding_in_progress()) {
737       *new_bucket_id = entry.new_bucket_instance_id;
738       return 0;
739     }
740     ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl;
741     /* needed to unlock as clear resharding uses the same lock */
742
743     if (i == num_retries - 1) {
744       break;
745     }
746
747     ret = do_wait();
748     if (ret < 0) {
749       ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
750       return ret;
751     }
752   }
753   ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
754   return -ERR_BUSY_RESHARDING;
755 }
756
757 int RGWReshard::process_single_logshard(int logshard_num)
758 {
759   string marker;
760   bool truncated = true;
761
762   CephContext *cct = store->ctx();
763   int max_entries = 1000;
764   int max_secs = 60;
765
766   rados::cls::lock::Lock l(reshard_lock_name);
767
768   utime_t time(max_secs, 0);
769   l.set_duration(time);
770
771   char cookie_buf[COOKIE_LEN + 1];
772   gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
773   cookie_buf[COOKIE_LEN] = '\0';
774
775   l.set_cookie(cookie_buf);
776
777   string logshard_oid;
778   get_logshard_oid(logshard_num, &logshard_oid);
779
780   int ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
781   if (ret == -EBUSY) { /* already locked by another processor */
782     ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
783     return ret;
784   }
785
786   utime_t lock_start_time = ceph_clock_now();
787
788   do {
789     std::list<cls_rgw_reshard_entry> entries;
790     ret = list(logshard_num, marker, max_entries, entries, &truncated);
791     if (ret < 0) {
792       ldout(cct, 10) << "cannot list all reshards in logshard oid=" << logshard_oid << dendl;
793       continue;
794     }
795
796     for(auto& entry: entries) {
797       if(entry.new_instance_id.empty()) {
798
799         ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name  << dendl;
800
801         RGWObjectCtx obj_ctx(store);
802         rgw_bucket bucket;
803         RGWBucketInfo bucket_info;
804         map<string, bufferlist> attrs;
805
806         ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
807                                    &attrs);
808         if (ret < 0) {
809           ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
810           return -ret;
811         }
812
813         RGWBucketReshard br(store, bucket_info, attrs);
814
815         Formatter* formatter = new JSONFormatter(false);
816         auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
817         ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this);
818         if (ret < 0) {
819           ldout (store->ctx(), 0) <<  __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
820             cpp_strerror(-ret)<< dendl;
821           return ret;
822         }
823
824         ldout (store->ctx(), 20) <<  " removing entry" << entry.bucket_name<< dendl;
825
826         ret = remove(entry);
827         if (ret < 0) {
828           ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
829                        << cpp_strerror(-ret) << dendl;
830           return ret;
831         }
832       }
833       utime_t now = ceph_clock_now();
834
835       if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */
836         l.set_renew(true);
837         ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
838         if (ret == -EBUSY) { /* already locked by another processor */
839           ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
840           return ret;
841         }
842         lock_start_time = now;
843       }
844       entry.get_key(&marker);
845     }
846   } while (truncated);
847
848   l.unlock(&store->reshard_pool_ctx, logshard_oid);
849   return 0;
850 }
851
852
853 void  RGWReshard::get_logshard_oid(int shard_num, string *logshard)
854 {
855   char buf[32];
856   snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
857
858   string objname(reshard_oid_prefix);
859   *logshard =  objname + buf;
860 }
861
862 int RGWReshard::process_all_logshards()
863 {
864   if (!store->can_reshard()) {
865     ldout(store->ctx(), 20) << __func__ << " Resharding is disabled"  << dendl;
866     return 0;
867   }
868   int ret = 0;
869
870   for (int i = 0; i < num_logshards; i++) {
871     string logshard;
872     get_logshard_oid(i, &logshard);
873
874     ldout(store->ctx(), 20) << "proceeding logshard = " << logshard << dendl;
875
876     ret = process_single_logshard(i);
877     if (ret <0) {
878       return ret;
879     }
880   }
881
882   return 0;
883 }
884
885 bool RGWReshard::going_down()
886 {
887   return down_flag;
888 }
889
890 void RGWReshard::start_processor()
891 {
892   worker = new ReshardWorker(store->ctx(), this);
893   worker->create("rgw_reshard");
894 }
895
896 void RGWReshard::stop_processor()
897 {
898   down_flag = true;
899   if (worker) {
900     worker->stop();
901     worker->join();
902   }
903   delete worker;
904   worker = nullptr;
905 }
906
907 void *RGWReshard::ReshardWorker::entry() {
908   utime_t last_run;
909   do {
910     utime_t start = ceph_clock_now();
911     if (reshard->process_all_logshards()) {
912       /* All shards have been processed properly. Next time we can start
913        * from this moment. */
914       last_run = start;
915     }
916
917     if (reshard->going_down())
918       break;
919
920     utime_t end = ceph_clock_now();
921     end -= start;
922     int secs = cct->_conf->rgw_reshard_thread_interval;
923
924     if (secs <= end.sec())
925       continue; // next round
926
927     secs -= end.sec();
928
929     lock.Lock();
930     cond.WaitInterval(lock, utime_t(secs, 0));
931     lock.Unlock();
932   } while (!reshard->going_down());
933
934   return NULL;
935 }
936
937 void RGWReshard::ReshardWorker::stop()
938 {
939   Mutex::Locker l(lock);
940   cond.Signal();
941 }