Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_gc.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_gc.h"
5 #include "include/rados/librados.hpp"
6 #include "cls/rgw/cls_rgw_client.h"
7 #include "cls/refcount/cls_refcount_client.h"
8 #include "cls/lock/cls_lock_client.h"
9 #include "auth/Crypto.h"
10
11 #include <list>
12
13 #define dout_context g_ceph_context
14 #define dout_subsys ceph_subsys_rgw
15
16 using namespace std;
17 using namespace librados;
18
19 static string gc_oid_prefix = "gc";
20 static string gc_index_lock_name = "gc_process";
21
22
23 void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
24   cct = _cct;
25   store = _store;
26
27   max_objs = min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max());
28
29   obj_names = new string[max_objs];
30
31   for (int i = 0; i < max_objs; i++) {
32     obj_names[i] = gc_oid_prefix;
33     char buf[32];
34     snprintf(buf, 32, ".%d", i);
35     obj_names[i].append(buf);
36   }
37 }
38
39 void RGWGC::finalize()
40 {
41   delete[] obj_names;
42 }
43
44 int RGWGC::tag_index(const string& tag)
45 {
46   return rgw_shards_hash(tag, max_objs);
47 }
48
49 void RGWGC::add_chain(ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag)
50 {
51   cls_rgw_gc_obj_info info;
52   info.chain = chain;
53   info.tag = tag;
54
55   cls_rgw_gc_set_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
56 }
57
58 int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync)
59 {
60   ObjectWriteOperation op;
61   add_chain(op, chain, tag);
62
63   int i = tag_index(tag);
64
65   if (sync)
66     return store->gc_operate(obj_names[i], &op);
67
68   return store->gc_aio_operate(obj_names[i], &op);
69 }
70
71 int RGWGC::defer_chain(const string& tag, bool sync)
72 {
73   ObjectWriteOperation op;
74   cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
75
76   int i = tag_index(tag);
77
78   if (sync)
79     return store->gc_operate(obj_names[i], &op);
80
81   return store->gc_aio_operate(obj_names[i], &op);
82 }
83
84 int RGWGC::remove(int index, const std::list<string>& tags)
85 {
86   ObjectWriteOperation op;
87   cls_rgw_gc_remove(op, tags);
88   return store->gc_operate(obj_names[index], &op);
89 }
90
91 int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated)
92 {
93   result.clear();
94   string next_marker;
95
96   for (; *index < max_objs && result.size() < max; (*index)++, marker.clear()) {
97     std::list<cls_rgw_gc_obj_info> entries;
98     int ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
99     if (ret == -ENOENT)
100       continue;
101     if (ret < 0)
102       return ret;
103
104     std::list<cls_rgw_gc_obj_info>::iterator iter;
105     for (iter = entries.begin(); iter != entries.end(); ++iter) {
106       result.push_back(*iter);
107     }
108
109     marker = next_marker;
110
111     if (*index == max_objs - 1) {
112       /* we cut short here, truncated will hold the correct value */
113       return 0;
114     }
115
116     if (result.size() == max) {
117       /* close approximation, it might be that the next of the objects don't hold
118        * anything, in this case truncated should have been false, but we can find
119        * that out on the next iteration
120        */
121       *truncated = true;
122       return 0;
123     }
124
125   }
126   *truncated = false;
127
128   return 0;
129 }
130
131 int RGWGC::process(int index, int max_secs)
132 {
133   rados::cls::lock::Lock l(gc_index_lock_name);
134   utime_t end = ceph_clock_now();
135   std::list<string> remove_tags;
136
137   /* max_secs should be greater than zero. We don't want a zero max_secs
138    * to be translated as no timeout, since we'd then need to break the
139    * lock and that would require a manual intervention. In this case
140    * we can just wait it out. */
141   if (max_secs <= 0)
142     return -EAGAIN;
143
144   end += max_secs;
145   utime_t time(max_secs, 0);
146   l.set_duration(time);
147
148   int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]);
149   if (ret == -EBUSY) { /* already locked by another gc processor */
150     dout(10) << "RGWGC::process() failed to acquire lock on " << obj_names[index] << dendl;
151     return 0;
152   }
153   if (ret < 0)
154     return ret;
155
156   string marker;
157   string next_marker;
158   bool truncated;
159   IoCtx *ctx = new IoCtx;
160   do {
161     int max = 100;
162     std::list<cls_rgw_gc_obj_info> entries;
163     ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, true, entries, &truncated, next_marker);
164     if (ret == -ENOENT) {
165       ret = 0;
166       goto done;
167     }
168     if (ret < 0)
169       goto done;
170
171     string last_pool;
172     std::list<cls_rgw_gc_obj_info>::iterator iter;
173     for (iter = entries.begin(); iter != entries.end(); ++iter) {
174       bool remove_tag;
175       cls_rgw_gc_obj_info& info = *iter;
176       std::list<cls_rgw_obj>::iterator liter;
177       cls_rgw_obj_chain& chain = info.chain;
178
179       utime_t now = ceph_clock_now();
180       if (now >= end)
181         goto done;
182
183       remove_tag = true;
184       for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
185         cls_rgw_obj& obj = *liter;
186
187         if (obj.pool != last_pool) {
188           delete ctx;
189           ctx = new IoCtx;
190           ret = rgw_init_ioctx(store->get_rados_handle(), obj.pool, *ctx);
191           if (ret < 0) {
192             dout(0) << "ERROR: failed to create ioctx pool=" << obj.pool << dendl;
193             continue;
194           }
195           last_pool = obj.pool;
196         }
197
198         ctx->locator_set_key(obj.loc);
199
200         const string& oid = obj.key.name; /* just stored raw oid there */
201
202         dout(5) << "gc::process: removing " << obj.pool << ":" << obj.key.name << dendl;
203         ObjectWriteOperation op;
204         cls_refcount_put(op, info.tag, true);
205         ret = ctx->operate(oid, &op);
206         if (ret == -ENOENT)
207           ret = 0;
208         if (ret < 0) {
209           remove_tag = false;
210           dout(0) << "failed to remove " << obj.pool << ":" << oid << "@" << obj.loc << dendl;
211         }
212
213         if (going_down()) // leave early, even if tag isn't removed, it's ok
214           goto done;
215       }
216       if (remove_tag) {
217         remove_tags.push_back(info.tag);
218 #define MAX_REMOVE_CHUNK 16
219         if (remove_tags.size() > MAX_REMOVE_CHUNK) {
220           RGWGC::remove(index, remove_tags);
221           remove_tags.clear();
222         }
223       }
224     }
225     if (!remove_tags.empty()) {
226       RGWGC::remove(index, remove_tags);
227       remove_tags.clear();
228     }
229   } while (truncated);
230
231 done:
232   if (!remove_tags.empty())
233     RGWGC::remove(index, remove_tags);
234   l.unlock(&store->gc_pool_ctx, obj_names[index]);
235   delete ctx;
236   return 0;
237 }
238
239 int RGWGC::process()
240 {
241   int max_secs = cct->_conf->rgw_gc_processor_max_time;
242
243   unsigned start;
244   int ret = get_random_bytes((char *)&start, sizeof(start));
245   if (ret < 0)
246     return ret;
247
248   for (int i = 0; i < max_objs; i++) {
249     int index = (i + start) % max_objs;
250     ret = process(index, max_secs);
251     if (ret < 0)
252       return ret;
253   }
254
255   return 0;
256 }
257
258 bool RGWGC::going_down()
259 {
260   return down_flag;
261 }
262
263 void RGWGC::start_processor()
264 {
265   worker = new GCWorker(cct, this);
266   worker->create("rgw_gc");
267 }
268
269 void RGWGC::stop_processor()
270 {
271   down_flag = true;
272   if (worker) {
273     worker->stop();
274     worker->join();
275   }
276   delete worker;
277   worker = NULL;
278 }
279
280 void *RGWGC::GCWorker::entry() {
281   do {
282     utime_t start = ceph_clock_now();
283     dout(2) << "garbage collection: start" << dendl;
284     int r = gc->process();
285     if (r < 0) {
286       dout(0) << "ERROR: garbage collection process() returned error r=" << r << dendl;
287     }
288     dout(2) << "garbage collection: stop" << dendl;
289
290     if (gc->going_down())
291       break;
292
293     utime_t end = ceph_clock_now();
294     end -= start;
295     int secs = cct->_conf->rgw_gc_processor_period;
296
297     if (secs <= end.sec())
298       continue; // next round
299
300     secs -= end.sec();
301
302     lock.Lock();
303     cond.WaitInterval(lock, utime_t(secs, 0));
304     lock.Unlock();
305   } while (!gc->going_down());
306
307   return NULL;
308 }
309
310 void RGWGC::GCWorker::stop()
311 {
312   Mutex::Locker l(lock);
313   cond.Signal();
314 }
315