Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_object_expirer_core.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5 #include <iostream>
6 #include <sstream>
7 #include <string>
8
9 using namespace std;
10
11 #include "auth/Crypto.h"
12
13 #include "common/armor.h"
14 #include "common/ceph_json.h"
15 #include "common/config.h"
16 #include "common/ceph_argparse.h"
17 #include "common/Formatter.h"
18 #include "common/errno.h"
19
20 #include "global/global_init.h"
21
22 #include "include/utime.h"
23 #include "include/str_list.h"
24
25 #include "rgw_user.h"
26 #include "rgw_bucket.h"
27 #include "rgw_rados.h"
28 #include "rgw_acl.h"
29 #include "rgw_acl_s3.h"
30 #include "rgw_log.h"
31 #include "rgw_formats.h"
32 #include "rgw_usage.h"
33 #include "rgw_replica_log.h"
34 #include "rgw_object_expirer_core.h"
35
36 #include "cls/lock/cls_lock_client.h"
37
38 #define dout_context g_ceph_context
39 #define dout_subsys ceph_subsys_rgw
40
41 static string objexp_lock_name = "gc_process";
42
43 int RGWObjectExpirer::init_bucket_info(const string& tenant_name,
44                                        const string& bucket_name,
45                                        const string& bucket_id,
46                                        RGWBucketInfo& bucket_info)
47 {
48   RGWObjectCtx obj_ctx(store);
49
50   /*
51    * XXX Here's where it gets tricky. We went to all the trouble of
52    * punching the tenant through the objexp_hint_entry, but now we
53    * find that our instances do not actually have tenants. They are
54    * unique thanks to IDs. So the tenant string is not needed...
55    */
56   const string bucket_instance_id = bucket_name + ":" + bucket_id;
57   int ret = store->get_bucket_instance_info(obj_ctx, bucket_instance_id,
58           bucket_info, NULL, NULL);
59   return ret;
60 }
61
62 int RGWObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
63 {
64   RGWBucketInfo bucket_info;
65
66   int ret = init_bucket_info(hint.tenant, hint.bucket_name,
67           hint.bucket_id, bucket_info);
68   if (-ENOENT == ret) {
69     ldout(store->ctx(), 15) << "NOTICE: cannot find bucket = " \
70         << hint.bucket_name << ". The object must be already removed" << dendl;
71     return -ERR_PRECONDITION_FAILED;
72   } else if (ret < 0) {
73     ldout(store->ctx(),  1) << "ERROR: could not init bucket = " \
74         << hint.bucket_name << "due to ret = " << ret << dendl;
75     return ret;
76   }
77
78   RGWObjectCtx rctx(store);
79
80   rgw_obj_key key = hint.obj_key;
81   if (key.instance.empty()) {
82     key.instance = "null";
83   }
84
85   rgw_obj obj(bucket_info.bucket, key);
86   store->set_atomic(&rctx, obj);
87   ret = store->delete_obj(rctx, bucket_info, obj,
88           bucket_info.versioning_status(), 0, hint.exp_time);
89
90   return ret;
91 }
92
93 void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries,      /* in  */
94                                   bool& need_trim)                         /* out */
95 {
96   need_trim = false;
97
98   for (list<cls_timeindex_entry>::iterator iter = entries.begin();
99        iter != entries.end();
100        ++iter)
101   {
102     objexp_hint_entry hint;
103     ldout(store->ctx(), 15) << "got removal hint for: " << iter->key_ts.sec() \
104         << " - " << iter->key_ext << dendl;
105
106     int ret = store->objexp_hint_parse(*iter, hint);
107     if (ret < 0) {
108       ldout(store->ctx(), 1) << "cannot parse removal hint for " << hint.obj_key << dendl;
109       continue;
110     }
111
112     /* PRECOND_FAILED simply means that our hint is not valid.
113      * We can silently ignore that and move forward. */
114     ret = garbage_single_object(hint);
115     if (ret == -ERR_PRECONDITION_FAILED) {
116       ldout(store->ctx(), 15) << "not actual hint for object: " << hint.obj_key << dendl;
117     } else if (ret < 0) {
118       ldout(store->ctx(), 1) << "cannot remove expired object: " << hint.obj_key << dendl;
119     }
120
121     need_trim = true;
122   }
123
124   return;
125 }
126
127 void RGWObjectExpirer::trim_chunk(const string& shard,
128                                   const utime_t& from,
129                                   const utime_t& to,
130                                   const string& from_marker,
131                                   const string& to_marker)
132 {
133   ldout(store->ctx(), 20) << "trying to trim removal hints to=" << to
134                           << ", to_marker=" << to_marker << dendl;
135
136   real_time rt_from = from.to_real_time();
137   real_time rt_to = to.to_real_time();
138
139   int ret = store->objexp_hint_trim(shard, rt_from, rt_to,
140                                     from_marker, to_marker);
141   if (ret < 0) {
142     ldout(store->ctx(), 0) << "ERROR during trim: " << ret << dendl;
143   }
144
145   return;
146 }
147
148 bool RGWObjectExpirer::process_single_shard(const string& shard,
149                                             const utime_t& last_run,
150                                             const utime_t& round_start)
151 {
152   string marker;
153   string out_marker;
154   bool truncated = false;
155   bool done = true;
156
157   CephContext *cct = store->ctx();
158   int num_entries = cct->_conf->rgw_objexp_chunk_size;
159
160   int max_secs = cct->_conf->rgw_objexp_gc_interval;
161   utime_t end = ceph_clock_now();
162   end += max_secs;
163
164   rados::cls::lock::Lock l(objexp_lock_name);
165
166   utime_t time(max_secs, 0);
167   l.set_duration(time);
168
169   int ret = l.lock_exclusive(&store->objexp_pool_ctx, shard);
170   if (ret == -EBUSY) { /* already locked by another processor */
171     dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
172     return false;
173   }
174
175   do {
176     real_time rt_last = last_run.to_real_time();
177     real_time rt_start = round_start.to_real_time();
178
179     list<cls_timeindex_entry> entries;
180     ret = store->objexp_hint_list(shard, rt_last, rt_start,
181                                   num_entries, marker, entries,
182                                   &out_marker, &truncated);
183     if (ret < 0) {
184       ldout(cct, 10) << "cannot get removal hints from shard: " << shard
185                      << dendl;
186       continue;
187     }
188
189     bool need_trim;
190     garbage_chunk(entries, need_trim);
191
192     if (need_trim) {
193       trim_chunk(shard, last_run, round_start, marker, out_marker);
194     }
195
196     utime_t now = ceph_clock_now();
197     if (now >= end) {
198       done = false;
199       break;
200     }
201
202     marker = out_marker;
203   } while (truncated);
204
205   l.unlock(&store->objexp_pool_ctx, shard);
206   return done;
207 }
208
209 /* Returns true if all shards have been processed successfully. */
210 bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
211                                           const utime_t& round_start)
212 {
213   CephContext * const cct = store->ctx();
214   int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
215   bool all_done = true;
216
217   for (int i = 0; i < num_shards; i++) {
218     string shard;
219     store->objexp_get_shard(i, shard);
220
221     ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;
222
223     if (! process_single_shard(shard, last_run, round_start)) {
224       all_done = false;
225     }
226   }
227
228   return all_done;
229 }
230
231 bool RGWObjectExpirer::going_down()
232 {
233   return down_flag;
234 }
235
236 void RGWObjectExpirer::start_processor()
237 {
238   worker = new OEWorker(store->ctx(), this);
239   worker->create("rgw_obj_expirer");
240 }
241
242 void RGWObjectExpirer::stop_processor()
243 {
244   down_flag = true;
245   if (worker) {
246     worker->stop();
247     worker->join();
248   }
249   delete worker;
250   worker = NULL;
251 }
252
253 void *RGWObjectExpirer::OEWorker::entry() {
254   utime_t last_run;
255   do {
256     utime_t start = ceph_clock_now();
257     ldout(cct, 2) << "object expiration: start" << dendl;
258     if (oe->inspect_all_shards(last_run, start)) {
259       /* All shards have been processed properly. Next time we can start
260        * from this moment. */
261       last_run = start;
262     }
263     ldout(cct, 2) << "object expiration: stop" << dendl;
264
265
266     if (oe->going_down())
267       break;
268
269     utime_t end = ceph_clock_now();
270     end -= start;
271     int secs = cct->_conf->rgw_objexp_gc_interval;
272
273     if (secs <= end.sec())
274       continue; // next round
275
276     secs -= end.sec();
277
278     lock.Lock();
279     cond.WaitInterval(lock, utime_t(secs, 0));
280     lock.Unlock();
281   } while (!oe->going_down());
282
283   return NULL;
284 }
285
286 void RGWObjectExpirer::OEWorker::stop()
287 {
288   Mutex::Locker l(lock);
289   cond.Signal();
290 }
291