1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
11 #include "auth/Crypto.h"
13 #include "common/armor.h"
14 #include "common/ceph_json.h"
15 #include "common/config.h"
16 #include "common/ceph_argparse.h"
17 #include "common/Formatter.h"
18 #include "common/errno.h"
20 #include "global/global_init.h"
22 #include "include/utime.h"
23 #include "include/str_list.h"
26 #include "rgw_bucket.h"
27 #include "rgw_rados.h"
29 #include "rgw_acl_s3.h"
31 #include "rgw_formats.h"
32 #include "rgw_usage.h"
33 #include "rgw_replica_log.h"
34 #include "rgw_object_expirer_core.h"
36 #include "cls/lock/cls_lock_client.h"
// Logging configuration for this translation unit: the default context is
// the global Ceph context and the subsystem is rgw.
// NOTE(review): presumably consumed by the dout()/ldout() macros - confirm.
38 #define dout_context g_ceph_context
39 #define dout_subsys ceph_subsys_rgw
// Name given to the rados::cls::lock::Lock that process_single_shard()
// takes on each shard object.
41 static string objexp_lock_name = "gc_process";
// Loads the bucket instance record for a bucket into bucket_info.
//
// tenant_name  tenant carried by the hint; it does not take part in the
//              lookup below (see the XXX note in the body).
// bucket_name  bucket name, first half of the instance id.
// bucket_id    bucket id, second half of the instance id.
// bucket_info  output, filled by store->get_bucket_instance_info().
//
// The instance id is bucket_name and bucket_id joined with a colon.
// Returns an int status.
// NOTE(review): presumably the value returned is the result of
// get_bucket_instance_info() - confirm.
43 int RGWObjectExpirer::init_bucket_info(const string& tenant_name,
44 const string& bucket_name,
45 const string& bucket_id,
46 RGWBucketInfo& bucket_info)
48 RGWObjectCtx obj_ctx(store);
51 * XXX Here's where it gets tricky. We went to all the trouble of
52 * punching the tenant through the objexp_hint_entry, but now we
53 * find that our instances do not actually have tenants. They are
54 * unique thanks to IDs. So the tenant string is not needed...
56 const string bucket_instance_id = bucket_name + ":" + bucket_id;
57 int ret = store->get_bucket_instance_info(obj_ctx, bucket_instance_id,
58 bucket_info, NULL, NULL);
// Deletes the single object that an expiration hint points at.
//
// hint  parsed removal hint; supplies tenant, bucket name, bucket id,
//       object key and expiration time.
//
// Steps:
//   1. Resolve the bucket through init_bucket_info().
//   2. Build the object key, substituting the instance "null" when the hint
//      carries an empty instance.
//   3. Mark the object atomic in a fresh RGWObjectCtx and call
//      store->delete_obj() with the bucket versioning status and the hint
//      expiration time.
//
// Returns -ERR_PRECONDITION_FAILED on the path that logs that the bucket
// cannot be found; garbage_chunk() treats that value as a stale hint.
// NOTE(review): presumably other failures return the negative ret from
// init_bucket_info() or delete_obj() - confirm.
62 int RGWObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
64 RGWBucketInfo bucket_info;
66 int ret = init_bucket_info(hint.tenant, hint.bucket_name,
67 hint.bucket_id, bucket_info);
69 ldout(store->ctx(), 15) << "NOTICE: cannot find bucket = " \
70 << hint.bucket_name << ". The object must be already removed" << dendl;
71 return -ERR_PRECONDITION_FAILED;
// The literal below starts with a space so the bucket name and the text
// that follows it do not run together in the log line.
73 ldout(store->ctx(), 1) << "ERROR: could not init bucket = " \
74 << hint.bucket_name << " due to ret = " << ret << dendl;
78 RGWObjectCtx rctx(store);
// An empty instance in the hint is replaced by the literal instance "null"
// before the object is addressed.
80 rgw_obj_key key = hint.obj_key;
81 if (key.instance.empty()) {
82 key.instance = "null";
85 rgw_obj obj(bucket_info.bucket, key);
86 store->set_atomic(&rctx, obj);
87 ret = store->delete_obj(rctx, bucket_info, obj,
88 bucket_info.versioning_status(), 0, hint.exp_time);
// Processes one batch of removal hints.
//
// For every entry the hint is decoded with store->objexp_hint_parse() and
// the referenced object is removed through garbage_single_object().
// Failures are reported through the log only; the function returns void.
//
// entries    batch of time-index entries to walk (input).
// need_trim  output flag handed back to process_single_shard().
//            NOTE(review): presumably tells the caller whether trim_chunk()
//            should run for this batch - confirm where it is assigned.
93 void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries, /* in */
94 bool& need_trim) /* out */
98 for (list<cls_timeindex_entry>::iterator iter = entries.begin();
99 iter != entries.end();
102 objexp_hint_entry hint;
103 ldout(store->ctx(), 15) << "got removal hint for: " << iter->key_ts.sec() \
104 << " - " << iter->key_ext << dendl;
// Decode the raw time-index entry into a hint structure.
106 int ret = store->objexp_hint_parse(*iter, hint);
108 ldout(store->ctx(), 1) << "cannot parse removal hint for " << hint.obj_key << dendl;
112 /* PRECOND_FAILED simply means that our hint is not valid.
113 * We can silently ignore that and move forward. */
114 ret = garbage_single_object(hint);
// A stale hint is logged at level 15, any other negative result at level 1.
115 if (ret == -ERR_PRECONDITION_FAILED) {
116 ldout(store->ctx(), 15) << "not actual hint for object: " << hint.obj_key << dendl;
117 } else if (ret < 0) {
118 ldout(store->ctx(), 1) << "cannot remove expired object: " << hint.obj_key << dendl;
// Trims removal hints from one shard of the hint index.
//
// The from and to timestamps are converted to real_time and passed,
// together with the two markers, to store->objexp_hint_trim().
//
// shard        shard object to trim.
// from_marker  marker passed as the start of the range.
// to_marker    marker passed as the end of the range.
//
// Returns nothing; the error path writes the ret code to the log at level 0.
127 void RGWObjectExpirer::trim_chunk(const string& shard,
130 const string& from_marker,
131 const string& to_marker)
133 ldout(store->ctx(), 20) << "trying to trim removal hints to=" << to
134 << ", to_marker=" << to_marker << dendl;
136 real_time rt_from = from.to_real_time();
137 real_time rt_to = to.to_real_time();
139 int ret = store->objexp_hint_trim(shard, rt_from, rt_to,
140 from_marker, to_marker);
142 ldout(store->ctx(), 0) << "ERROR during trim: " << ret << dendl;
// Handles the removal hints stored in a single shard.
//
// Sequence:
//   - take an exclusive cls lock named objexp_lock_name on the shard object
//     in store->objexp_pool_ctx, with the lock duration set to
//     rgw_objexp_gc_interval seconds;
//   - list hints between last_run and round_start, at most
//     rgw_objexp_chunk_size entries per call;
//   - pass each batch to garbage_chunk() and trim it with trim_chunk();
//   - release the lock.
//
// shard        name of the shard object.
// last_run     lower time bound for the listing.
// round_start  upper time bound for the listing.
//
// Returns bool; inspect_all_shards() tests the result with a negation.
// NOTE(review): presumably false means the shard was not fully processed
// (lock busy or time budget exhausted) - confirm.
148 bool RGWObjectExpirer::process_single_shard(const string& shard,
149 const utime_t& last_run,
150 const utime_t& round_start)
154 bool truncated = false;
157 CephContext *cct = store->ctx();
158 int num_entries = cct->_conf->rgw_objexp_chunk_size;
160 int max_secs = cct->_conf->rgw_objexp_gc_interval;
// NOTE(review): end and the later now look like a time budget for the
// shard - confirm how they are compared.
161 utime_t end = ceph_clock_now();
164 rados::cls::lock::Lock l(objexp_lock_name);
// The lock duration equals the configured gc interval.
166 utime_t time(max_secs, 0);
167 l.set_duration(time);
169 int ret = l.lock_exclusive(&store->objexp_pool_ctx, shard);
// NOTE(review): only -EBUSY is tested here; confirm that other negative
// results of lock_exclusive() are handled. This message also uses dout()
// while the rest of the function logs through ldout(cct, ...).
170 if (ret == -EBUSY) { /* already locked by another processor */
171 dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
176 real_time rt_last = last_run.to_real_time();
177 real_time rt_start = round_start.to_real_time();
// Fetch the next batch; out_marker and truncated are filled by the store.
179 list<cls_timeindex_entry> entries;
180 ret = store->objexp_hint_list(shard, rt_last, rt_start,
181 num_entries, marker, entries,
182 &out_marker, &truncated);
184 ldout(cct, 10) << "cannot get removal hints from shard: " << shard
190 garbage_chunk(entries, need_trim);
193 trim_chunk(shard, last_run, round_start, marker, out_marker);
196 utime_t now = ceph_clock_now();
205 l.unlock(&store->objexp_pool_ctx, shard);
// Runs process_single_shard() over every hint shard.
//
// The shard count comes from rgw_objexp_hints_num_shards; the name of shard
// i is obtained with store->objexp_get_shard(i, shard).
//
// last_run     time bound forwarded to process_single_shard().
// round_start  time bound forwarded to process_single_shard().
209 /* Returns true if all shards have been processed successfully. */
210 bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
211 const utime_t& round_start)
213 CephContext * const cct = store->ctx();
214 int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
215 bool all_done = true;
217 for (int i = 0; i < num_shards; i++) {
219 store->objexp_get_shard(i, shard);
221 ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;
// NOTE(review): presumably a false result clears all_done - confirm.
223 if (! process_single_shard(shard, last_run, round_start)) {
// Shutdown query used by OEWorker::entry(): the worker loop keeps running
// only while this returns false.
231 bool RGWObjectExpirer::going_down()
// Allocates the OEWorker for this expirer, stores it in worker and starts
// it with create() under the thread name rgw_obj_expirer.
// NOTE(review): worker is allocated with a plain new; presumably
// stop_processor() releases it - confirm.
236 void RGWObjectExpirer::start_processor()
238 worker = new OEWorker(store->ctx(), this);
239 worker->create("rgw_obj_expirer");
// NOTE(review): presumably the counterpart of start_processor() that stops
// the worker thread and releases the worker object - confirm.
242 void RGWObjectExpirer::stop_processor()
// Thread body of the expiration worker.
//
// Each round records its start time, runs oe->inspect_all_shards(last_run,
// start) and then waits on cond for up to the remaining interval seconds,
// where the interval is rgw_objexp_gc_interval. Rounds repeat until
// oe->going_down() reports true.
253 void *RGWObjectExpirer::OEWorker::entry() {
256 utime_t start = ceph_clock_now();
257 ldout(cct, 2) << "object expiration: start" << dendl;
258 if (oe->inspect_all_shards(last_run, start)) {
259 /* All shards have been processed properly. Next time we can start
260 * from this moment. */
263 ldout(cct, 2) << "object expiration: stop" << dendl;
266 if (oe->going_down())
269 utime_t end = ceph_clock_now();
271 int secs = cct->_conf->rgw_objexp_gc_interval;
// NOTE(review): presumably end holds the elapsed time of the round by this
// point - confirm. The continue jumps to the going_down() test of the
// enclosing do-while, so the wait below is skipped.
273 if (secs <= end.sec())
274 continue; // next round
279 cond.WaitInterval(lock, utime_t(secs, 0));
281 } while (!oe->going_down());
// Holds lock for the scope of the call through Mutex::Locker.
// NOTE(review): presumably it then signals cond so that a worker blocked
// in entry() wakes up - confirm.
286 void RGWObjectExpirer::OEWorker::stop()
288 Mutex::Locker l(lock);