5 #include <boost/algorithm/string/split.hpp>
6 #include <boost/algorithm/string.hpp>
8 #include "common/Formatter.h"
9 #include <common/errno.h>
10 #include "auth/Crypto.h"
11 #include "cls/rgw/cls_rgw_client.h"
12 #include "cls/lock/cls_lock_client.h"
13 #include "rgw_common.h"
14 #include "rgw_bucket.h"
17 #define dout_context g_ceph_context
18 #define dout_subsys ceph_subsys_rgw
20 const char* LC_STATUS[] = {
28 using namespace librados;
// Fragment of a rule-validation routine. The enclosing signature and the
// statements inside each branch are not visible in this view; presumably each
// branch marks the rule invalid -- TODO confirm.
// Branch 1: the rule id is longer than MAX_ID_LEN.
32 if (id.length() > MAX_ID_LEN) {
// Branch 2: none of the four expiration settings (current, noncurrent,
// multipart, delete-marker) is set on the rule.
35 else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration) {
// Branch 3: at least one of the three expiration objects fails its own valid().
38 else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
// Stores a copy of *rule in rule_map, keyed by the id read via rule->get_id().
// NOTE(review): unlike check_and_add_rule(), no duplicate-id lookup is visible
// here, and the result of get_id()/insert() is not inspected on these lines.
44 void RGWLifecycleConfiguration::add_rule(LCRule *rule)
47 rule->get_id(id); // note that this will return false for groups, but that's ok, we won't search groups
48 rule_map.insert(pair<string, LCRule>(id, *rule));
// Translates one LCRule into an lc_op and files it in prefix_map under the
// rule's prefix. Returns bool; the return statement is not visible here
// (presumably it reports whether the emplace inserted a new key -- TODO confirm).
51 bool RGWLifecycleConfiguration::_add_rule(LCRule *rule)
// Only the literal status string "Enabled" takes this branch.
54 if (rule->get_status().compare("Enabled") == 0) {
// Current-version expiration: copied as a day count and/or an ISO-8601 date.
57 if (rule->get_expiration().has_days()) {
58 op.expiration = rule->get_expiration().get_days();
60 if (rule->get_expiration().has_date()) {
61 op.expiration_date = ceph::from_iso_8601(rule->get_expiration().get_date());
// Noncurrent-version expiration, in days.
63 if (rule->get_noncur_expiration().has_days()) {
64 op.noncur_expiration = rule->get_noncur_expiration().get_days();
// Multipart-upload expiration, in days.
66 if (rule->get_mp_expiration().has_days()) {
67 op.mp_expiration = rule->get_mp_expiration().get_days();
// Delete-marker expiration flag is copied unconditionally.
69 op.dm_expiration = rule->get_dm_expiration();
// Prefix source: the filter's prefix when the filter carries one; the
// assignment from rule->get_prefix() is presumably the else branch -- TODO confirm.
72 if (rule->get_filter().has_prefix()){
73 prefix = rule->get_filter().get_prefix();
75 prefix = rule->get_prefix();
// prefix and op are moved into the map; neither is usable after this line.
77 auto ret = prefix_map.emplace(std::move(prefix), std::move(op));
// Adds *rule to rule_map after looking the id up for a duplicate, then feeds
// the rule to _add_rule(). Returns -ERR_INVALID_REQUEST when _add_rule() fails;
// the value returned for a duplicate id is not visible in this view.
81 int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
88 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
91 rule_map.insert(pair<string, LCRule>(id, *rule));
// NOTE(review): the rule is already in rule_map at this point; if _add_rule()
// fails, no removal from rule_map is visible on these lines -- confirm.
93 if (!_add_rule(rule)) {
94 return -ERR_INVALID_REQUEST;
// Tests whether two lc_ops define the same kind of action. Three cases are
// checked in order; the branch bodies are not visible (presumably each returns
// true and the fall-through returns false -- TODO confirm).
99 bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
// Case 1: both ops define a current-version expiration (days > 0 or a date).
100 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
101 (second.expiration > 0 || second.expiration_date != boost::none)) {
// Case 2: both ops define a noncurrent-version expiration.
103 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
// Case 3: both ops define a multipart-upload expiration.
105 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
112 //Rules conflict if one rule's prefix starts with the other rule's prefix and both rules
113 //define the same action.
114 bool RGWLifecycleConfiguration::valid()
// With fewer than two prefixes there is no pair to compare.
116 if (prefix_map.size() < 2) {
// Pairwise scan: for each entry, compare against the entries reached through
// next_iter. next_iter starts as a copy of cur_iter; the statement that
// advances it before the inner loop is not visible -- TODO confirm.
119 auto cur_iter = prefix_map.begin();
120 while (cur_iter != prefix_map.end()) {
121 auto next_iter = cur_iter;
123 while (next_iter != prefix_map.end()) {
// NOTE(review): both keys are copied into new strings on every inner iteration.
124 string c_pre = cur_iter->first;
125 string n_pre = next_iter->first;
// True when n_pre begins with c_pre.
126 if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
127 if (has_same_action(cur_iter->second, next_iter->second)) {
// Worker entry point: loops until lc->going_down() is true. Each pass runs
// lc->process() if should_work() allows it, then waits for the number of
// seconds computed by schedule_next_start_time().
141 void *RGWLC::LCWorker::entry() {
143 utime_t start = ceph_clock_now();
144 if (should_work(start)) {
145 dout(5) << "life cycle: start" << dendl;
146 int r = lc->process();
// A failing process() is only logged; the loop carries on.
148 dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
150 dout(5) << "life cycle: stop" << dendl;
// Shutdown check before sleeping (the branch body is not visible here).
152 if (lc->going_down())
155 utime_t end = ceph_clock_now();
156 int secs = schedule_next_start_time(start, end);
// 'next' is used for the log line below; the wait itself uses 'secs'.
158 next.set_from_double(end + secs);
160 dout(5) << "schedule life cycle next start time: " << rgw_to_asctime(next) <<dendl;
// Waits on cond for up to 'secs' seconds.
163 cond.WaitInterval(lock, utime_t(secs, 0));
165 } while (!lc->going_down());
// Sets up the lifecycle shard object names and a random lock cookie.
// _cct / _store: context and store handles; their assignment to members is not
// visible in this view.
170 void RGWLC::initialize(CephContext *_cct, RGWRados *_store) {
// Shard count comes from rgw_lc_max_objs, capped at HASH_PRIME.
173 max_objs = cct->_conf->rgw_lc_max_objs;
174 if (max_objs > HASH_PRIME)
175 max_objs = HASH_PRIME;
// NOTE(review): raw new[]; the matching delete[] is not visible in this view
// (presumably in finalize() -- confirm).
177 obj_names = new string[max_objs];
// Each shard object is named lc_oid_prefix followed by ".<index>".
179 for (int i = 0; i < max_objs; i++) {
180 obj_names[i] = lc_oid_prefix;
// buf's declaration is not visible; the limit 32 presumably matches its size.
182 snprintf(buf, 32, ".%d", i);
183 obj_names[i].append(buf);
// COOKIE_LEN random alphanumeric characters are generated; the buffer has one
// extra byte beyond the length passed in.
186 #define COOKIE_LEN 16
187 char cookie_buf[COOKIE_LEN + 1];
188 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
// Body not visible in this view.
// NOTE(review): initialize() allocates obj_names with new[]; confirm it is
// released here.
192 void RGWLC::finalize()
// Decides whether a run recorded at start_date counts as already done.
// The statements inside each branch are not visible in this view.
197 bool RGWLC::if_already_run_today(time_t& start_date)
201 utime_t now = ceph_clock_now();
// Break start_date down into local time.
202 localtime_r(&start_date, &bdt);
// Debug mode: compare elapsed time with rgw_lc_debug_interval instead of a day.
204 if (cct->_conf->rgw_lc_debug_interval > 0) {
205 if (now - start_date < cct->_conf->rgw_lc_debug_interval)
// Normal mode: begin_of_day is rebuilt from bdt; the lines that adjust bdt
// before mktime() are not visible (presumably they zero the time of day --
// TODO confirm). Less than 24h since begin_of_day takes the branch.
214 begin_of_day = mktime(&bdt);
215 if (now - begin_of_day < 24*60*60)
// Resets the status of every entry in shard object obj_names[index] to
// lc_uninitial, paging through the entries MAX_LC_LIST_ENTRIES at a time.
221 int RGWLC::bucket_lc_prepare(int index)
223 map<string, int > entries;
227 #define MAX_LC_LIST_ENTRIES 100
// Fetch the next page of entries, starting from 'marker'.
229 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
232 map<string, int>::iterator iter;
233 for (iter = entries.begin(); iter != entries.end(); ++iter) {
// Keep the key, overwrite the status.
234 pair<string, int > entry(iter->first, lc_uninitial);
235 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
237 dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl;
// Advance the marker to the key just handled.
240 marker = iter->first;
// Stops once a listing comes back empty.
242 } while (!entries.empty());
// Returns true when timediff is at least the threshold derived from 'days'.
// timediff: callers in this file pass (now - object mtime).
// days: expiration period from the rule.
247 bool RGWLC::obj_has_expired(double timediff, int days)
// The normal-mode computation of cmp is not visible in this view (presumably
// 'days' converted to seconds -- TODO confirm).
250 if (cct->_conf->rgw_lc_debug_interval <= 0) {
251 /* Normal case, run properly */
254 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
255 cmp = days*cct->_conf->rgw_lc_debug_interval;
258 return (timediff >= cmp);
// Removes one expired object. Two paths are visible; the condition choosing
// between them is not (presumably remove_indeed -- TODO confirm):
//  - rgw_remove_object() on the key as given;
//  - store->delete_obj() on the key with its instance cleared, passing the
//    bucket's versioning status.
// obj_key is taken by value, so clearing its instance does not affect the caller.
261 int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed)
264 return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
266 obj_key.instance.clear();
267 RGWObjectCtx rctx(store);
268 rgw_obj obj(bucket_info.bucket, obj_key);
269 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
// Aborts multipart uploads that have outlived their rule's mp_expiration.
// target: bucket to scan; prefix_map: prefix -> lc_op rules for that bucket.
273 int RGWLC::handle_multipart_expiration(RGWRados::Bucket *target, const map<string, lc_op>& prefix_map)
275 MultipartMetaFilter mp_filter;
276 vector<rgw_bucket_dir_entry> objs;
280 RGWBucketInfo& bucket_info = target->get_bucket_info();
// The listing is limited to the multipart namespace, filtered by mp_filter,
// and does not list versions.
281 RGWRados::Bucket::List list_op(target);
282 list_op.params.list_versions = false;
283 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
284 list_op.params.filter = &mp_filter;
285 for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
// Rules that are disabled or have no multipart expiration take this branch
// (body not visible; presumably skipped).
286 if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
289 list_op.params.prefix = prefix_iter->first;
// Page through the listing, up to 1000 entries per call.
292 list_op.params.marker = list_op.get_next_marker();
293 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
295 if (ret == (-ENOENT))
297 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
301 utime_t now = ceph_clock_now();
302 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
// Age is measured from the entry's mtime.
303 if (obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.mp_expiration)) {
304 rgw_obj_key key(obj_iter->key);
// Entries whose name mp_obj.from_meta() rejects take this branch.
305 if (!mp_obj.from_meta(key.name)) {
308 RGWObjectCtx rctx(store);
309 ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
// -ERR_NO_SUCH_UPLOAD is not treated as an error here.
310 if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
311 ldout(cct, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret <<dendl;
316 } while(is_truncated);
// Applies the lifecycle configuration of one bucket.
// shard_id is split on ':' into tenant, bucket name and bucket id.
// Visible steps: load bucket info and attrs, compare the bucket id, decode the
// RGW_ATTR_LC attribute into 'config', walk config's prefix map removing
// expired objects (separate paths for unversioned and versioned buckets), then
// call handle_multipart_expiration(). Many branch bodies are not visible here.
321 int RGWLC::bucket_lc_process(string& shard_id)
323 RGWLifecycleConfiguration config(cct);
324 RGWBucketInfo bucket_info;
325 map<string, bufferlist> bucket_attrs;
326 string next_marker, no_ns, list_versions;
328 vector<rgw_bucket_dir_entry> objs;
329 RGWObjectCtx obj_ctx(store);
330 vector<std::string> result;
331 boost::split(result, shard_id, boost::is_any_of(":"));
// NOTE(review): result[0..2] are read right after the split with no size
// check; a shard_id with fewer than two ':' separators would index past the
// end of the vector.
332 string bucket_tenant = result[0];
333 string bucket_name = result[1];
334 string bucket_id = result[2];
335 int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);
337 ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <<dendl;
// Compares the id stored with the bucket against the id from shard_id; the log
// line below is for the mismatch case.
341 ret = bucket_info.bucket.bucket_id.compare(bucket_id) ;
343 ldout(cct, 0) << "LC:old bucket id find, should be delete" << bucket_name <<dendl;
347 RGWRados::Bucket target(store, bucket_info);
348 RGWRados::Bucket::List list_op(&target);
// Look up the lifecycle attribute; the branch for a bucket without one is not
// visible here.
350 map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
351 if (aiter == bucket_attrs.end())
354 bufferlist::iterator iter(&aiter->second);
357 } catch (const buffer::error& e) {
358 ldout(cct, 0) << __func__ << "decode life cycle config failed" << dendl;
362 map<string, lc_op>& prefix_map = config.get_prefix_map();
363 list_op.params.list_versions = bucket_info.versioned();
// ---- Unversioned bucket: only current-object expiration applies. ----
364 if (!bucket_info.versioned()) {
365 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
// Disabled rules, or rules with neither a day count nor a date, take this
// branch (body not visible; presumably skipped).
366 if (!prefix_iter->second.status ||
367 (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) {
// A date-based rule whose date is still in the future takes this branch.
370 if (prefix_iter->second.expiration_date != boost::none &&
371 ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) {
374 list_op.params.prefix = prefix_iter->first;
// Page through the listing, up to 1000 entries per call.
377 list_op.params.marker = list_op.get_next_marker();
378 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
381 if (ret == (-ENOENT))
383 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
387 utime_t now = ceph_clock_now();
389 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
390 rgw_obj_key key(obj_iter->key);
// Entries carrying a namespace take this branch (body not visible).
392 if (!key.ns.empty()) {
395 if (prefix_iter->second.expiration_date != boost::none) {
396 //we have checked it before
// Day-based rule: age is measured from the entry's mtime.
399 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.expiration);
402 RGWObjectCtx rctx(store);
403 rgw_obj obj(bucket_info.bucket, key);
// NOTE(review): this 'int ret' redeclares the name already declared at function
// scope, so failures recorded here do not reach the outer 'ret'.
405 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
409 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
411 ret = remove_expired_obj(bucket_info, obj_iter->key, true);
413 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
415 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << key << dendl;
419 } while (is_truncated);
// ---- Versioned (or suspended) bucket. ----
422 //bucket versioning is enabled or suspended
423 rgw_obj_key pre_marker;
424 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
// Disabled rules, or rules with no current, noncurrent or delete-marker
// expiration, take this branch (body not visible; presumably skipped).
425 if (!prefix_iter->second.status || (prefix_iter->second.expiration <= 0
426 && prefix_iter->second.expiration_date == boost::none
427 && prefix_iter->second.noncur_expiration <= 0 && !prefix_iter->second.dm_expiration)) {
// When this prefix starts with the previous map entry's prefix, the list
// marker is set back to pre_marker. The pre_marker update below is presumably
// the else branch -- TODO confirm.
430 if (prefix_iter != prefix_map.begin() &&
431 (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
432 list_op.next_marker = pre_marker;
434 pre_marker = list_op.get_next_marker();
436 list_op.params.prefix = prefix_iter->first;
// pre_obj keeps the last entry of the previous page so the first entry of the
// next page can still look at its predecessor (see the noncurrent mtime lookup
// below). The guarding condition is not visible here.
437 rgw_bucket_dir_entry pre_obj;
440 pre_obj = objs.back();
443 list_op.params.marker = list_op.get_next_marker();
444 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
447 if (ret == (-ENOENT))
449 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
453 utime_t now = ceph_clock_now();
454 ceph::real_time mtime;
455 bool remove_indeed = true;
457 bool skip_expiration;
459 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
460 skip_expiration = false;
// -- Current version --
462 if (obj_iter->is_current()) {
463 if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none
464 && !prefix_iter->second.dm_expiration) {
467 if (obj_iter->is_delete_marker()) {
// A delete marker that is the last entry of this page cannot be judged yet.
468 if ((obj_iter + 1)==objs.end()) {
470 //deal with it in next round because we can't judge whether this marker is the only version
471 list_op.next_marker = obj_iter->key;
474 } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
// A lone delete marker: the rule's dm_expiration flag decides whether the age
// check is bypassed.
477 skip_expiration = prefix_iter->second.dm_expiration;
478 remove_indeed = true; //we should remove the delete marker if it's the only version
480 remove_indeed = false;
482 mtime = obj_iter->meta.mtime;
483 expiration = prefix_iter->second.expiration;
484 if (!skip_expiration && expiration <= 0 && prefix_iter->second.expiration_date == boost::none) {
486 } else if (!skip_expiration) {
// Day-based rules compare the age; date-based rules compare 'now' with the date.
487 if (expiration > 0) {
488 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
490 is_expired = now >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date);
// -- Noncurrent version --
494 if (prefix_iter->second.noncur_expiration <=0) {
497 remove_indeed = true;
// The age is measured from the mtime of the preceding listing entry (pre_obj
// when this is the first entry of the page), not from this entry's own mtime.
498 mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime;
499 expiration = prefix_iter->second.noncur_expiration;
500 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
502 if (skip_expiration || is_expired) {
503 if (obj_iter->is_visible()) {
504 RGWObjectCtx rctx(store);
505 rgw_obj obj(bucket_info.bucket, obj_iter->key);
// NOTE(review): as in the unversioned path, this 'int ret' redeclares the
// function-scope name.
507 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
511 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
514 ret = remove_expired_obj(bucket_info, obj_iter->key, remove_indeed);
516 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
518 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl;
522 } while (is_truncated);
// Finally, expire stale multipart uploads using the same prefix map.
526 ret = handle_multipart_expiration(&target, prefix_map);
// Records the outcome of processing one bucket in shard object
// obj_names[index], under an exclusive lock.
// entry: (bucket key, status) pair to update; result: return code of the
// processing step.
// NOTE(review): max_lock_sec is not used on the visible lines; the lock
// duration comes from rgw_lc_lock_max_time.
531 int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
533 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
535 rados::cls::lock::Lock l(lc_index_lock_name);
536 l.set_cookie(cookie);
537 l.set_duration(lock_duration);
540 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
// NOTE(review): the object name is appended right after "try again" with no
// separator in this log line.
541 if (ret == -EBUSY) { /* already locked by another lc processor */
542 dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
548 dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names[index] << dendl;
// -ENOENT: the entry is removed from the shard object.
549 if (result == -ENOENT) {
550 ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry);
552 dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl;
// Any other negative result marks the entry lc_failed; the lc_complete
// assignment is presumably the else branch -- TODO confirm.
555 } else if (result < 0) {
556 entry.second = lc_failed;
558 entry.second = lc_complete;
561 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
563 dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl;
566 l.unlock(&store->lc_pool_ctx, obj_names[index]);
567 dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names[index] << dendl;
// Collects lifecycle entries from the shard objects into *progress_map, which
// is cleared first.
// marker / max_entries are passed unchanged to each shard listing.
// NOTE(review): because max_entries is applied per shard on the visible lines,
// the combined map may hold more than max_entries items unless a statement
// not visible here bounds it -- confirm.
572 int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
575 progress_map->clear();
// index's initial value is not visible in this view.
576 for(; index <max_objs; index++) {
577 map<string, int > entries;
578 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
// A missing shard object is logged at level 10 rather than treated as fatal
// (the rest of the branch is not visible).
580 if (ret == -ENOENT) {
581 dout(10) << __func__ << " ignoring unfound lc object="
582 << obj_names[index] << dendl;
588 map<string, int>::iterator iter;
589 for (iter = entries.begin(); iter != entries.end(); ++iter) {
590 progress_map->insert(*iter);
// Fragment of a routine that visits every shard once (signature not visible in
// this view; LCWorker::entry() calls lc->process() with no arguments).
// The lock duration passed down comes from rgw_lc_lock_max_time.
598 int max_secs = cct->_conf->rgw_lc_lock_max_time;
// 'start' is filled with random bytes so the scan begins at a random shard.
// Its declaration is not visible here.
601 int ret = get_random_bytes((char *)&start, sizeof(start));
// Visit all max_objs shards, wrapping around from the random starting point.
605 for (int i = 0; i < max_objs; i++) {
606 int index = (i + start) % max_objs;
607 ret = process(index, max_secs);
// Processes shard object obj_names[index]: takes an exclusive lock, picks the
// next bucket entry after head.marker, marks it lc_processing, advances the
// head marker, releases the lock, then runs bucket_lc_process() and
// bucket_lc_post() for that entry.
// max_lock_secs: lock duration in seconds; the handling of values <= 0 is not
// visible in this view.
615 int RGWLC::process(int index, int max_lock_secs)
617 rados::cls::lock::Lock l(lc_index_lock_name);
619 utime_t now = ceph_clock_now();
620 pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
621 if (max_lock_secs <= 0)
624 utime_t time(max_lock_secs, 0);
625 l.set_duration(time);
627 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
628 if (ret == -EBUSY) { /* already locked by another lc processor */
629 dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
637 cls_rgw_lc_obj_head head;
638 ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);
// NOTE(review): the object name and ret are streamed back to back with no
// separator in this and the "failed to update lc object" log line.
640 dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl;
// First pass of a new cycle: stamp the start date and reset entry statuses via
// bucket_lc_prepare().
644 if(!if_already_run_today(head.start_date)) {
645 head.start_date = now;
647 ret = bucket_lc_prepare(index);
649 dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl;
// Fetch the entry that follows head.marker.
654 ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);
656 dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl;
// An empty key takes this branch (body not visible; presumably no entries
// remain -- TODO confirm).
660 if (entry.first.empty())
663 entry.second = lc_processing;
664 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
666 dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl;
// Persist the new marker so the next call picks the following entry.
670 head.marker = entry.first;
671 ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head);
673 dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
// The shard lock is released before the bucket itself is processed.
676 l.unlock(&store->lc_pool_ctx, obj_names[index]);
677 ret = bucket_lc_process(entry.first);
678 bucket_lc_post(index, max_lock_secs, entry, ret);
// Second unlock; presumably on the exit path for the branches above -- TODO confirm.
682 l.unlock(&store->lc_pool_ctx, obj_names[index]);
// Creates the LCWorker and starts it under the thread name "lifecycle_thr".
// NOTE(review): worker is allocated with raw new; the matching delete is not
// visible in this view (presumably in stop_processor() -- confirm).
686 void RGWLC::start_processor()
688 worker = new LCWorker(cct, this);
689 worker->create("lifecycle_thr");
// Body not visible in this view.
// NOTE(review): start_processor() allocates 'worker' with new and starts it;
// confirm the worker is stopped and released here.
692 void RGWLC::stop_processor()
// Takes the worker's lock for the scope of the call; the statements executed
// under the lock are not visible in this view (presumably they signal 'cond',
// which entry() waits on -- TODO confirm).
703 void RGWLC::LCWorker::stop()
705 Mutex::Locker l(lock);
// Shutdown predicate; LCWorker::entry() uses it as its loop exit condition.
// Body not visible in this view.
709 bool RGWLC::going_down()
// Decides whether 'now' falls inside the configured lifecycle work window.
// The window is read from rgw_lifecycle_work_time in "H:M-H:M" form.
714 bool RGWLC::LCWorker::should_work(utime_t& now)
720 string worktime = cct->_conf->rgw_lifecycle_work_time;
// NOTE(review): sscanf's return value is not checked, so a malformed setting
// leaves some of the four fields unparsed -- confirm how they are initialized.
721 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
723 time_t tt = now.sec();
724 localtime_r(&tt, &bdt);
726 if (cct->_conf->rgw_lc_debug_interval > 0) {
727 /* We're debugging, so say we can run */
// Local minutes-of-day must lie within [start, end], both ends inclusive.
// NOTE(review): a window whose start is later than its end (crossing midnight)
// can never satisfy both comparisons.
729 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
730 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
// Returns how many seconds the worker should wait before its next pass.
// start: when the current pass began; now: the current time.
738 int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
// Debug mode: the next pass is due rgw_lc_debug_interval seconds after 'start'.
// Handling of a non-positive result is not visible in this view.
740 if (cct->_conf->rgw_lc_debug_interval > 0) {
741 int secs = start + cct->_conf->rgw_lc_debug_interval - now;
751 string worktime = cct->_conf->rgw_lifecycle_work_time;
// NOTE(review): sscanf's return value is not checked here either.
752 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
754 time_t tt = now.sec();
// Break 'now' down into local time, then overwrite hour and minute with the
// window start. The computation of 'nt' is not visible (presumably mktime of
// the adjusted bdt -- TODO confirm).
756 localtime_r(&tt, &bdt);
757 bdt.tm_hour = start_hour;
758 bdt.tm_min = start_minute;
// One day after nt, expressed relative to 'now'.
762 return (nt+24*60*60 - tt);