X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_lc.cc;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_lc.cc;h=cdcfaffff77cc7345352f5ff09f807e3f0a2755a;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_lc.cc b/src/ceph/src/rgw/rgw_lc.cc new file mode 100644 index 0000000..cdcfaff --- /dev/null +++ b/src/ceph/src/rgw/rgw_lc.cc @@ -0,0 +1,764 @@ +#include +#include +#include + +#include +#include + +#include "common/Formatter.h" +#include +#include "auth/Crypto.h" +#include "cls/rgw/cls_rgw_client.h" +#include "cls/lock/cls_lock_client.h" +#include "rgw_common.h" +#include "rgw_bucket.h" +#include "rgw_lc.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +const char* LC_STATUS[] = { + "UNINITIAL", + "PROCESSING", + "FAILED", + "COMPLETE" +}; + +using namespace std; +using namespace librados; + +bool LCRule::valid() +{ + if (id.length() > MAX_ID_LEN) { + return false; + } + else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration) { + return false; + } + else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) { + return false; + } + return true; +} + +void RGWLifecycleConfiguration::add_rule(LCRule *rule) +{ + string id; + rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups + rule_map.insert(pair(id, *rule)); +} + +bool RGWLifecycleConfiguration::_add_rule(LCRule *rule) +{ + lc_op op; + if (rule->get_status().compare("Enabled") == 0) { + op.status = true; + } + if (rule->get_expiration().has_days()) { + op.expiration = rule->get_expiration().get_days(); + } + if (rule->get_expiration().has_date()) { + op.expiration_date = ceph::from_iso_8601(rule->get_expiration().get_date()); + } + if (rule->get_noncur_expiration().has_days()) { + op.noncur_expiration = rule->get_noncur_expiration().get_days(); + } + if (rule->get_mp_expiration().has_days()) { + op.mp_expiration = rule->get_mp_expiration().get_days(); + } + op.dm_expiration = rule->get_dm_expiration(); + + std::string prefix; + if (rule->get_filter().has_prefix()){ + prefix = rule->get_filter().get_prefix(); + } else { + prefix = rule->get_prefix(); + } + auto ret = prefix_map.emplace(std::move(prefix), std::move(op)); + return ret.second; +} + +int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule) +{ + if (!rule->valid()) { + return -EINVAL; + } + string id; + rule->get_id(id); + if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same + return -EINVAL; + } + rule_map.insert(pair(id, *rule)); + + if (!_add_rule(rule)) { + return -ERR_INVALID_REQUEST; + } + return 0; +} + +bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) { + if ((first.expiration > 0 || first.expiration_date != boost::none) && + (second.expiration > 0 || second.expiration_date != boost::none)) { + return true; + } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) { + return true; + } else if (first.mp_expiration > 0 && second.mp_expiration > 0) { + return true; + } else { + return false; + } +} + +//Rules are conflicted: if one rule's prefix starts with other rule's prefix, and these two rules +//define same action. +bool RGWLifecycleConfiguration::valid() +{ + if (prefix_map.size() < 2) { + return true; + } + auto cur_iter = prefix_map.begin(); + while (cur_iter != prefix_map.end()) { + auto next_iter = cur_iter; + ++next_iter; + while (next_iter != prefix_map.end()) { + string c_pre = cur_iter->first; + string n_pre = next_iter->first; + if (n_pre.compare(0, c_pre.length(), c_pre) == 0) { + if (has_same_action(cur_iter->second, next_iter->second)) { + return false; + } else { + ++next_iter; + } + } else { + break; + } + } + ++cur_iter; + } + return true; +} + +void *RGWLC::LCWorker::entry() { + do { + utime_t start = ceph_clock_now(); + if (should_work(start)) { + dout(5) << "life cycle: start" << dendl; + int r = lc->process(); + if (r < 0) { + dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl; + } + dout(5) << "life cycle: stop" << dendl; + } + if (lc->going_down()) + break; + + utime_t end = ceph_clock_now(); + int secs = schedule_next_start_time(start, end); + utime_t next; + next.set_from_double(end + secs); + + dout(5) << "schedule life cycle next start time: " << rgw_to_asctime(next) <going_down()); + + return NULL; +} + +void RGWLC::initialize(CephContext *_cct, RGWRados *_store) { + cct = _cct; + store = _store; + max_objs = cct->_conf->rgw_lc_max_objs; + if (max_objs > HASH_PRIME) + max_objs = HASH_PRIME; + + obj_names = new string[max_objs]; + + for (int i = 0; i < max_objs; i++) { + obj_names[i] = lc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", i); + obj_names[i].append(buf); + } + +#define COOKIE_LEN 16 + char cookie_buf[COOKIE_LEN + 1]; + gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1); + cookie = cookie_buf; +} + +void RGWLC::finalize() +{ + delete[] obj_names; +} + +bool RGWLC::if_already_run_today(time_t& start_date) +{ + struct tm bdt; + time_t begin_of_day; + utime_t now = ceph_clock_now(); + localtime_r(&start_date, &bdt); + + if (cct->_conf->rgw_lc_debug_interval > 0) { + if (now - start_date < cct->_conf->rgw_lc_debug_interval) + return true; + else + return false; + } + + bdt.tm_hour = 0; + bdt.tm_min = 0; + bdt.tm_sec = 0; + begin_of_day = mktime(&bdt); + if (now - begin_of_day < 24*60*60) + return true; + else + return false; +} + +int RGWLC::bucket_lc_prepare(int index) +{ + map entries; + + string marker; + +#define MAX_LC_LIST_ENTRIES 100 + do { + int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries); + if (ret < 0) + return ret; + map::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + pair entry(iter->first, lc_uninitial); + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl; + break; + } + marker = iter->first; + } + } while (!entries.empty()); + + return 0; +} + +bool RGWLC::obj_has_expired(double timediff, int days) +{ + double cmp; + if (cct->_conf->rgw_lc_debug_interval <= 0) { + /* Normal case, run properly */ + cmp = days*24*60*60; + } else { + /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */ + cmp = days*cct->_conf->rgw_lc_debug_interval; + } + + return (timediff >= cmp); +} + +int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed) +{ + if (remove_indeed) { + return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key); + } else { + obj_key.instance.clear(); + RGWObjectCtx rctx(store); + rgw_obj obj(bucket_info.bucket, obj_key); + return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status()); + } +} + +int RGWLC::handle_multipart_expiration(RGWRados::Bucket *target, const map& prefix_map) +{ + MultipartMetaFilter mp_filter; + vector objs; + RGWMPObj mp_obj; + bool is_truncated; + int ret; + RGWBucketInfo& bucket_info = target->get_bucket_info(); + RGWRados::Bucket::List list_op(target); + list_op.params.list_versions = false; + list_op.params.ns = RGW_OBJ_NS_MULTIPART; + list_op.params.filter = &mp_filter; + for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { + if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) { + continue; + } + list_op.params.prefix = prefix_iter->first; + do { + objs.clear(); + list_op.params.marker = list_op.get_next_marker(); + ret = list_op.list_objects(1000, &objs, NULL, &is_truncated); + if (ret < 0) { + if (ret == (-ENOENT)) + return 0; + ldout(cct, 0) << "ERROR: store->list_objects():" <meta.mtime), prefix_iter->second.mp_expiration)) { + rgw_obj_key key(obj_iter->key); + if (!mp_obj.from_meta(key.name)) { + continue; + } + RGWObjectCtx rctx(store); + ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj); + if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) { + ldout(cct, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret < bucket_attrs; + string next_marker, no_ns, list_versions; + bool is_truncated; + vector objs; + RGWObjectCtx obj_ctx(store); + vector result; + boost::split(result, shard_id, boost::is_any_of(":")); + string bucket_tenant = result[0]; + string bucket_name = result[1]; + string bucket_id = result[2]; + int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs); + if (ret < 0) { + ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <::iterator aiter = bucket_attrs.find(RGW_ATTR_LC); + if (aiter == bucket_attrs.end()) + return 0; + + bufferlist::iterator iter(&aiter->second); + try { + config.decode(iter); + } catch (const buffer::error& e) { + ldout(cct, 0) << __func__ << "decode life cycle config failed" << dendl; + return -1; + } + + map& prefix_map = config.get_prefix_map(); + list_op.params.list_versions = bucket_info.versioned(); + if (!bucket_info.versioned()) { + for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { + if (!prefix_iter->second.status || + (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) { + continue; + } + if (prefix_iter->second.expiration_date != boost::none && + ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) { + continue; + } + list_op.params.prefix = prefix_iter->first; + do { + objs.clear(); + list_op.params.marker = list_op.get_next_marker(); + ret = list_op.list_objects(1000, &objs, NULL, &is_truncated); + + if (ret < 0) { + if (ret == (-ENOENT)) + return 0; + ldout(cct, 0) << "ERROR: store->list_objects():" <key); + + if (!key.ns.empty()) { + continue; + } + if (prefix_iter->second.expiration_date != boost::none) { + //we have checked it before + is_expired = true; + } else { + is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.expiration); + } + if (is_expired) { + RGWObjectCtx rctx(store); + rgw_obj obj(bucket_info.bucket, key); + RGWObjState *state; + int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false); + if (ret < 0) { + return ret; + } + if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible + continue; + ret = remove_expired_obj(bucket_info, obj_iter->key, true); + if (ret < 0) { + ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl; + } else { + ldout(cct, 10) << "DELETED:" << bucket_name << ":" << key << dendl; + } + } + } + } while (is_truncated); + } + } else { + //bucket versioning is enabled or suspended + rgw_obj_key pre_marker; + for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { + if (!prefix_iter->second.status || (prefix_iter->second.expiration <= 0 + && prefix_iter->second.expiration_date == boost::none + && prefix_iter->second.noncur_expiration <= 0 && !prefix_iter->second.dm_expiration)) { + continue; + } + if (prefix_iter != prefix_map.begin() && + (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) { + list_op.next_marker = pre_marker; + } else { + pre_marker = list_op.get_next_marker(); + } + list_op.params.prefix = prefix_iter->first; + rgw_bucket_dir_entry pre_obj; + do { + if (!objs.empty()) { + pre_obj = objs.back(); + } + objs.clear(); + list_op.params.marker = list_op.get_next_marker(); + ret = list_op.list_objects(1000, &objs, NULL, &is_truncated); + + if (ret < 0) { + if (ret == (-ENOENT)) + return 0; + ldout(cct, 0) << "ERROR: store->list_objects():" <is_current()) { + if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none + && !prefix_iter->second.dm_expiration) { + continue; + } + if (obj_iter->is_delete_marker()) { + if ((obj_iter + 1)==objs.end()) { + if (is_truncated) { + //deal with it in next round because we can't judge whether this marker is the only version + list_op.next_marker = obj_iter->key; + break; + } + } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing. + continue; + } + skip_expiration = prefix_iter->second.dm_expiration; + remove_indeed = true; //we should remove the delete marker if it's the only version + } else { + remove_indeed = false; + } + mtime = obj_iter->meta.mtime; + expiration = prefix_iter->second.expiration; + if (!skip_expiration && expiration <= 0 && prefix_iter->second.expiration_date == boost::none) { + continue; + } else if (!skip_expiration) { + if (expiration > 0) { + is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration); + } else { + is_expired = now >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date); + } + } + } else { + if (prefix_iter->second.noncur_expiration <=0) { + continue; + } + remove_indeed = true; + mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime; + expiration = prefix_iter->second.noncur_expiration; + is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration); + } + if (skip_expiration || is_expired) { + if (obj_iter->is_visible()) { + RGWObjectCtx rctx(store); + rgw_obj obj(bucket_info.bucket, obj_iter->key); + RGWObjState *state; + int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false); + if (ret < 0) { + return ret; + } + if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible + continue; + } + ret = remove_expired_obj(bucket_info, obj_iter->key, remove_indeed); + if (ret < 0) { + ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl; + } else { + ldout(cct, 10) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl; + } + } + } + } while (is_truncated); + } + } + + ret = handle_multipart_expiration(&target, prefix_map); + + return ret; +} + +int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair& entry, int& result) +{ + utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0); + + rados::cls::lock::Lock l(lc_index_lock_name); + l.set_cookie(cookie); + l.set_duration(lock_duration); + + do { + int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]); + if (ret == -EBUSY) { /* already locked by another lc processor */ + dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl; + sleep(5); + continue; + } + if (ret < 0) + return 0; + dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names[index] << dendl; + if (result == -ENOENT) { + ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl; + } + goto clean; + } else if (result < 0) { + entry.second = lc_failed; + } else { + entry.second = lc_complete; + } + + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl; + } +clean: + l.unlock(&store->lc_pool_ctx, obj_names[index]); + dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names[index] << dendl; + return 0; + } while (true); +} + +int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map *progress_map) +{ + int index = 0; + progress_map->clear(); + for(; index entries; + int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries); + if (ret < 0) { + if (ret == -ENOENT) { + dout(10) << __func__ << " ignoring unfound lc object=" + << obj_names[index] << dendl; + continue; + } else { + return ret; + } + } + map::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + progress_map->insert(*iter); + } + } + return 0; +} + +int RGWLC::process() +{ + int max_secs = cct->_conf->rgw_lc_lock_max_time; + + unsigned start; + int ret = get_random_bytes((char *)&start, sizeof(start)); + if (ret < 0) + return ret; + + for (int i = 0; i < max_objs; i++) { + int index = (i + start) % max_objs; + ret = process(index, max_secs); + if (ret < 0) + return ret; + } + + return 0; +} + +int RGWLC::process(int index, int max_lock_secs) +{ + rados::cls::lock::Lock l(lc_index_lock_name); + do { + utime_t now = ceph_clock_now(); + pair entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS + if (max_lock_secs <= 0) + return -EAGAIN; + + utime_t time(max_lock_secs, 0); + l.set_duration(time); + + int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]); + if (ret == -EBUSY) { /* already locked by another lc processor */ + dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl; + sleep(5); + continue; + } + if (ret < 0) + return 0; + + string marker; + cls_rgw_lc_obj_head head; + ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl; + goto exit; + } + + if(!if_already_run_today(head.start_date)) { + head.start_date = now; + head.marker.clear(); + ret = bucket_lc_prepare(index); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl; + goto exit; + } + } + + ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl; + goto exit; + } + + if (entry.first.empty()) + goto exit; + + entry.second = lc_processing; + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl; + goto exit; + } + + head.marker = entry.first; + ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl; + goto exit; + } + l.unlock(&store->lc_pool_ctx, obj_names[index]); + ret = bucket_lc_process(entry.first); + bucket_lc_post(index, max_lock_secs, entry, ret); + }while(1); + +exit: + l.unlock(&store->lc_pool_ctx, obj_names[index]); + return 0; +} + +void RGWLC::start_processor() +{ + worker = new LCWorker(cct, this); + worker->create("lifecycle_thr"); +} + +void RGWLC::stop_processor() +{ + down_flag = true; + if (worker) { + worker->stop(); + worker->join(); + } + delete worker; + worker = NULL; +} + +void RGWLC::LCWorker::stop() +{ + Mutex::Locker l(lock); + cond.Signal(); +} + +bool RGWLC::going_down() +{ + return down_flag; +} + +bool RGWLC::LCWorker::should_work(utime_t& now) +{ + int start_hour; + int start_minute; + int end_hour; + int end_minute; + string worktime = cct->_conf->rgw_lifecycle_work_time; + sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute); + struct tm bdt; + time_t tt = now.sec(); + localtime_r(&tt, &bdt); + + if (cct->_conf->rgw_lc_debug_interval > 0) { + /* We're debugging, so say we can run */ + return true; + } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) && + (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) { + return true; + } else { + return false; + } + +} + +int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now) +{ + if (cct->_conf->rgw_lc_debug_interval > 0) { + int secs = start + cct->_conf->rgw_lc_debug_interval - now; + if (secs < 0) + secs = 0; + return (secs); + } + + int start_hour; + int start_minute; + int end_hour; + int end_minute; + string worktime = cct->_conf->rgw_lifecycle_work_time; + sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute); + struct tm bdt; + time_t tt = now.sec(); + time_t nt; + localtime_r(&tt, &bdt); + bdt.tm_hour = start_hour; + bdt.tm_min = start_minute; + bdt.tm_sec = 0; + nt = mktime(&bdt); + + return (nt+24*60*60 - tt); +} +