Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_orphan.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <string>
5
6 using namespace std;
7
8 #include "common/config.h"
9 #include "common/Formatter.h"
10 #include "common/errno.h"
11
12 #include "rgw_rados.h"
13 #include "rgw_orphan.h"
14
15 #define dout_subsys ceph_subsys_rgw
16
17 #define DEFAULT_NUM_SHARDS 64
18
19 static string obj_fingerprint(const string& oid, const char *force_ns = NULL)
20 {
21   ssize_t pos = oid.find('_');
22   if (pos < 0) {
23     cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl;
24   }
25
26   string obj_marker = oid.substr(0, pos);
27
28   rgw_obj_key key;
29
30   rgw_obj_key::parse_raw_oid(oid.substr(pos + 1), &key);
31
32   if (key.ns.empty()) {
33     return oid;
34   }
35
36   string s = oid;
37
38   if (force_ns) {
39     rgw_bucket b;
40     rgw_obj new_obj(b, key);
41     s = obj_marker + "_" + new_obj.get_oid();
42   }
43
44   /* cut out suffix */
45   size_t i = s.size() - 1;
46   for (; i >= s.size() - 10; --i) {
47     char c = s[i];
48     if (!isdigit(c) && c != '.' && c != '_') {
49       break;
50     }
51   }
52
53   return s.substr(0, i + 1);
54 }
55
56 int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState & state)
57 {
58   set<string> keys;
59   map<string, bufferlist> vals;
60   keys.insert(job_name);
61   int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals);
62   if (r < 0) {
63     return r;
64   }
65
66   map<string, bufferlist>::iterator iter = vals.find(job_name);
67   if (iter == vals.end()) {
68     return -ENOENT;
69   }
70
71   try {
72     bufferlist& bl = iter->second;
73     ::decode(state, bl);
74   } catch (buffer::error& err) {
75     lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
76     return -EIO;
77   }
78
79   return 0;
80 }
81
82 int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state)
83 {
84   map<string, bufferlist> vals;
85   bufferlist bl;
86   ::encode(state, bl);
87   vals[job_name] = bl;
88   int r = ioctx.omap_set(oid, vals);
89   if (r < 0) {
90     return r;
91   }
92
93   return 0;
94 }
95
96 int RGWOrphanStore::remove_job(const string& job_name)
97 {
98   set<string> keys;
99   keys.insert(job_name);
100
101   int r = ioctx.omap_rm_keys(oid, keys);
102   if (r < 0) {
103     return r;
104   }
105
106   return 0;
107 }
108
109 int RGWOrphanStore::list_jobs(map <string,RGWOrphanSearchState>& job_list)
110 {
111   map <string,bufferlist> vals;
112   int MAX_READ=1024;
113   string marker="";
114   int r = 0;
115
116   // loop through all the omap vals from index object, storing them to job_list,
117   // read in batches of 1024, we update the marker every iteration and exit the
118   // loop when we find that total size read out is less than batch size
119   do {
120     r = ioctx.omap_get_vals(oid, marker, MAX_READ, &vals);
121     if (r < 0) {
122       return r;
123     }
124     r = vals.size();
125
126     for (const auto &it : vals) {
127       marker=it.first;
128       RGWOrphanSearchState state;
129       try {
130         bufferlist bl = it.second;
131         ::decode(state, bl);
132       } catch (buffer::error& err) {
133         lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
134         return -EIO;
135       }
136       job_list[it.first] = state;
137     }
138   } while (r == MAX_READ);
139
140   return 0;
141 }
142
143 int RGWOrphanStore::init()
144 {
145   rgw_pool& log_pool = store->get_zone_params().log_pool;
146   int r = rgw_init_ioctx(store->get_rados_handle(), log_pool, ioctx);
147   if (r < 0) {
148     cerr << "ERROR: failed to open log pool (" << log_pool << " ret=" << r << std::endl;
149     return r;
150   }
151
152   return 0;
153 }
154
155 int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries)
156 {
157   librados::ObjectWriteOperation op;
158   op.omap_set(entries);
159   cout << "storing " << entries.size() << " entries at " << oid << std::endl;
160   ldout(store->ctx(), 20) << "storing " << entries.size() << " entries at " << oid << ": " << dendl;
161   for (map<string, bufferlist>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) {
162     ldout(store->ctx(), 20) << " > " << iter->first << dendl;
163   }
164   int ret = ioctx.operate(oid, &op);
165   if (ret < 0) {
166     lderr(store->ctx()) << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << dendl;
167   }
168   
169   return 0;
170 }
171
172 int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated)
173 {
174 #define MAX_OMAP_GET 100
175   int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries);
176   if (ret < 0 && ret != -ENOENT) {
177     cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << cpp_strerror(-ret) << std::endl;
178   }
179
180   *truncated = (entries->size() == MAX_OMAP_GET);
181
182   return 0;
183 }
184
185 int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info) {
186   int r = orphan_store.init();
187   if (r < 0) {
188     return r;
189   }
190
191   RGWOrphanSearchState state;
192   r = orphan_store.read_job(job_name, state);
193   if (r < 0 && r != -ENOENT) {
194     lderr(store->ctx()) << "ERROR: failed to read state ret=" << r << dendl;
195     return r;
196   }
197
198   if (r == 0) {
199     search_info = state.info;
200     search_stage = state.stage;
201   } else if (info) { /* r == -ENOENT, initiate a new job if info was provided */ 
202     search_info = *info;
203     search_info.job_name = job_name;
204     search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
205     search_info.start_time = ceph_clock_now();
206     search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT);
207
208     r = save_state();
209     if (r < 0) {
210       lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl;
211       return r;
212     }
213   } else {
214       lderr(store->ctx()) << "ERROR: job not found" << dendl;
215       return r;
216   }
217
218   index_objs_prefix = RGW_ORPHAN_INDEX_PREFIX + string(".");
219   index_objs_prefix += job_name;
220
221   for (int i = 0; i < search_info.num_shards; i++) {
222     char buf[128];
223
224     snprintf(buf, sizeof(buf), "%s.rados.%d", index_objs_prefix.c_str(), i);
225     all_objs_index[i] = buf;
226
227     snprintf(buf, sizeof(buf), "%s.buckets.%d", index_objs_prefix.c_str(), i);
228     buckets_instance_index[i] = buf;
229
230     snprintf(buf, sizeof(buf), "%s.linked.%d", index_objs_prefix.c_str(), i);
231     linked_objs_index[i] = buf;
232   }
233   return 0;
234 }
235
236 int RGWOrphanSearch::log_oids(map<int, string>& log_shards, map<int, list<string> >& oids)
237 {
238   map<int, list<string> >::iterator miter = oids.begin();
239
240   list<log_iter_info> liters; /* a list of iterator pairs for begin and end */
241
242   for (; miter != oids.end(); ++miter) {
243     log_iter_info info;
244     info.oid = log_shards[miter->first];
245     info.cur = miter->second.begin();
246     info.end = miter->second.end();
247     liters.push_back(info);
248   }
249
250   list<log_iter_info>::iterator list_iter;
251   while (!liters.empty()) {
252      list_iter = liters.begin();
253
254      while (list_iter != liters.end()) {
255        log_iter_info& cur_info = *list_iter;
256
257        list<string>::iterator& cur = cur_info.cur;
258        list<string>::iterator& end = cur_info.end;
259
260        map<string, bufferlist> entries;
261 #define MAX_OMAP_SET_ENTRIES 100
262        for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) {
263          ldout(store->ctx(), 20) << "adding obj: " << *cur << dendl;
264          entries[*cur] = bufferlist();
265        }
266
267        int ret = orphan_store.store_entries(cur_info.oid, entries);
268        if (ret < 0) {
269          return ret;
270        }
271        list<log_iter_info>::iterator tmp = list_iter;
272        ++list_iter;
273        if (cur == end) {
274          liters.erase(tmp);
275        }
276      }
277   }
278   return 0;
279 }
280
281 int RGWOrphanSearch::build_all_oids_index()
282 {
283   librados::IoCtx ioctx;
284
285   int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, ioctx);
286   if (ret < 0) {
287     lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
288     return ret;
289   }
290
291   ioctx.set_namespace(librados::all_nspaces);
292   librados::NObjectIterator i = ioctx.nobjects_begin();
293   librados::NObjectIterator i_end = ioctx.nobjects_end();
294
295   map<int, list<string> > oids;
296
297   int count = 0;
298   uint64_t total = 0;
299
300   cout << "logging all objects in the pool" << std::endl;
301
302   for (; i != i_end; ++i) {
303     string nspace = i->get_nspace();
304     string oid = i->get_oid();
305     string locator = i->get_locator();
306
307     ssize_t pos = oid.find('_');
308     if (pos < 0) {
309       cout << "unidentified oid: " << oid << ", skipping" << std::endl;
310       /* what is this object, oids should be in the format of <bucket marker>_<obj>,
311        * skip this entry
312        */
313       continue;
314     }
315     string stripped_oid = oid.substr(pos + 1);
316     rgw_obj_key key;
317     if (!rgw_obj_key::parse_raw_oid(stripped_oid, &key)) {
318       cout << "cannot parse oid: " << oid << ", skipping" << std::endl;
319       continue;
320     }
321
322     if (key.ns.empty()) {
323       /* skipping head objects, we don't want to remove these as they are mutable and
324        * cleaning them up is racy (can race with object removal and a later recreation)
325        */
326       cout << "skipping head object: oid=" << oid << std::endl;
327       continue;
328     }
329
330     string oid_fp = obj_fingerprint(oid);
331
332     ldout(store->ctx(), 20) << "oid_fp=" << oid_fp << dendl;
333
334     int shard = orphan_shard(oid_fp);
335     oids[shard].push_back(oid);
336
337 #define COUNT_BEFORE_FLUSH 1000
338     ++total;
339     if (++count >= COUNT_BEFORE_FLUSH) {
340       ldout(store->ctx(), 1) << "iterated through " << total << " objects" << dendl;
341       ret = log_oids(all_objs_index, oids);
342       if (ret < 0) {
343         cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
344         return ret;
345       }
346       count = 0;
347       oids.clear();
348     }
349   }
350   ret = log_oids(all_objs_index, oids);
351   if (ret < 0) {
352     cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
353     return ret;
354   }
355   
356   return 0;
357 }
358
359 int RGWOrphanSearch::build_buckets_instance_index()
360 {
361   void *handle;
362   int max = 1000;
363   string section = "bucket.instance";
364   int ret = store->meta_mgr->list_keys_init(section, &handle);
365   if (ret < 0) {
366     lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
367     return -ret;
368   }
369
370   map<int, list<string> > instances;
371
372   bool truncated;
373
374   RGWObjectCtx obj_ctx(store);
375
376   int count = 0;
377   uint64_t total = 0;
378
379   do {
380     list<string> keys;
381     ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
382     if (ret < 0) {
383       lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
384       return -ret;
385     }
386
387     for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
388       ++total;
389       ldout(store->ctx(), 10) << "bucket_instance=" << *iter << " total=" << total << dendl;
390       int shard = orphan_shard(*iter);
391       instances[shard].push_back(*iter);
392
393       if (++count >= COUNT_BEFORE_FLUSH) {
394         ret = log_oids(buckets_instance_index, instances);
395         if (ret < 0) {
396           lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
397           return ret;
398         }
399         count = 0;
400         instances.clear();
401       }
402     }
403
404   } while (truncated);
405
406   ret = log_oids(buckets_instance_index, instances);
407   if (ret < 0) {
408     lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
409     return ret;
410   }
411   store->meta_mgr->list_keys_complete(handle);
412
413   return 0;
414 }
415
416 int RGWOrphanSearch::handle_stat_result(map<int, list<string> >& oids, RGWRados::Object::Stat::Result& result)
417 {
418   set<string> obj_oids;
419   rgw_bucket& bucket = result.obj.bucket;
420   if (!result.has_manifest) { /* a very very old object, or part of a multipart upload during upload */
421     const string loc = bucket.bucket_id + "_" + result.obj.get_oid();
422     obj_oids.insert(obj_fingerprint(loc));
423
424     /*
425      * multipart parts don't have manifest on them, it's in the meta object. Instead of reading the
426      * meta object, just add a "shadow" object to the mix
427      */
428     obj_oids.insert(obj_fingerprint(loc, "shadow"));
429   } else {
430     RGWObjManifest& manifest = result.manifest;
431
432     RGWObjManifest::obj_iterator miter;
433     for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
434       const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store);
435       string s = loc.oid;
436       obj_oids.insert(obj_fingerprint(s));
437     }
438   }
439
440   for (set<string>::iterator iter = obj_oids.begin(); iter != obj_oids.end(); ++iter) {
441     ldout(store->ctx(), 20) << __func__ << ": oid for obj=" << result.obj << ": " << *iter << dendl;
442
443     int shard = orphan_shard(*iter);
444     oids[shard].push_back(*iter);
445   }
446
447   return 0;
448 }
449
450 int RGWOrphanSearch::pop_and_handle_stat_op(map<int, list<string> >& oids, std::deque<RGWRados::Object::Stat>& ops)
451 {
452   RGWRados::Object::Stat& front_op = ops.front();
453
454   int ret = front_op.wait();
455   if (ret < 0) {
456     if (ret != -ENOENT) {
457       lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
458     }
459     goto done;
460   }
461   ret = handle_stat_result(oids, front_op.result);
462   if (ret < 0) {
463     lderr(store->ctx()) << "ERROR: handle_stat_response() returned error: " << cpp_strerror(-ret) << dendl;
464   }
465 done:
466   ops.pop_front();
467   return ret;
468 }
469
470 int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids)
471 {
472   ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl;
473   RGWBucketInfo bucket_info;
474   RGWObjectCtx obj_ctx(store);
475   int ret = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
476   if (ret < 0) {
477     if (ret == -ENOENT) {
478       /* probably raced with bucket removal */
479       return 0;
480     }
481     lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
482     return ret;
483   }
484
485   RGWRados::Bucket target(store, bucket_info);
486   RGWRados::Bucket::List list_op(&target);
487
488   string marker;
489   list_op.params.marker = rgw_obj_key(marker);
490   list_op.params.list_versions = true;
491   list_op.params.enforce_ns = false;
492
493   bool truncated;
494
495   deque<RGWRados::Object::Stat> stat_ops;
496
497   int count = 0;
498
499   do {
500     vector<rgw_bucket_dir_entry> result;
501
502 #define MAX_LIST_OBJS_ENTRIES 100
503     ret = list_op.list_objects(MAX_LIST_OBJS_ENTRIES, &result, NULL, &truncated);
504     if (ret < 0) {
505       cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl;
506       return -ret;
507     }
508
509     for (vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); iter != result.end(); ++iter) {
510       rgw_bucket_dir_entry& entry = *iter;
511       if (entry.key.instance.empty()) {
512         ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
513       } else {
514         ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl;
515       }
516
517       ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" << entry.key.name << " entry.key.instance=" << entry.key.instance << dendl;
518       rgw_obj obj(bucket_info.bucket, entry.key);
519
520       RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
521
522       stat_ops.push_back(RGWRados::Object::Stat(&op_target));
523       RGWRados::Object::Stat& op = stat_ops.back();
524
525
526       ret = op.stat_async();
527       if (ret < 0) {
528         lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
529         return ret;
530       }
531       if (stat_ops.size() >= max_concurrent_ios) {
532         ret = pop_and_handle_stat_op(oids, stat_ops);
533         if (ret < 0) {
534           if (ret != -ENOENT) {
535             lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
536           }
537         }
538       }
539       if (++count >= COUNT_BEFORE_FLUSH) {
540         ret = log_oids(linked_objs_index, oids);
541         if (ret < 0) {
542           cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
543           return ret;
544         }
545         count = 0;
546         oids.clear();
547       }
548     }
549   } while (truncated);
550
551   while (!stat_ops.empty()) {
552     ret = pop_and_handle_stat_op(oids, stat_ops);
553     if (ret < 0) {
554       if (ret != -ENOENT) {
555         lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
556       }
557     }
558   }
559
560   return 0;
561 }
562
563 int RGWOrphanSearch::build_linked_oids_index()
564 {
565   map<int, list<string> > oids;
566   map<int, string>::iterator iter = buckets_instance_index.find(search_stage.shard);
567   for (; iter != buckets_instance_index.end(); ++iter) {
568     ldout(store->ctx(), 0) << "building linked oids index: " << iter->first << "/" << buckets_instance_index.size() << dendl;
569     bool truncated;
570
571     string oid = iter->second;
572
573     do {
574       map<string, bufferlist> entries;
575       int ret = orphan_store.read_entries(oid, search_stage.marker, &entries, &truncated);
576       if (ret == -ENOENT) {
577         truncated = false;
578         ret = 0;
579       }
580
581       if (ret < 0) {
582         lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl;
583         return ret;
584       }
585
586       if (entries.empty()) {
587         break;
588       }
589
590       for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
591         ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl;
592         ret = build_linked_oids_for_bucket(eiter->first, oids);
593         if (ret < 0) {
594           lderr(store->ctx()) << __func__ << ": ERROR: build_linked_oids_for_bucket() indexed entry=" << eiter->first
595                               << " returned ret=" << ret << dendl;
596           return ret;
597         }
598       }
599
600       search_stage.shard = iter->first;
601       search_stage.marker = entries.rbegin()->first; /* last entry */
602     } while (truncated);
603
604     search_stage.marker.clear();
605   }
606
607   int ret = log_oids(linked_objs_index, oids);
608   if (ret < 0) {
609     cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
610     return ret;
611   }
612
613   ret = save_state();
614   if (ret < 0) {
615     cerr << __func__ << ": ERROR: failed to write state ret=" << ret << std::endl;
616     return ret;
617   }
618
619   return 0;
620 }
621
622 class OMAPReader {
623   librados::IoCtx ioctx;
624   string oid;
625
626   map<string, bufferlist> entries;
627   map<string, bufferlist>::iterator iter;
628   string marker;
629   bool truncated;
630
631 public:
632   OMAPReader(librados::IoCtx& _ioctx, const string& _oid) : ioctx(_ioctx), oid(_oid), truncated(true) {
633     iter = entries.end();
634   }
635
636   int get_next(string *key, bufferlist *pbl, bool *done);
637 };
638
639 int OMAPReader::get_next(string *key, bufferlist *pbl, bool *done)
640 {
641   if (iter != entries.end()) {
642     *key = iter->first;
643     if (pbl) {
644       *pbl = iter->second;
645     }
646     ++iter;
647     *done = false;
648     marker = *key;
649     return 0;
650   }
651
652   if (!truncated) {
653     *done = true;
654     return 0;
655   }
656
657 #define MAX_OMAP_GET_ENTRIES 100
658   int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET_ENTRIES, &entries);
659   if (ret < 0) {
660     if (ret == -ENOENT) {
661       *done = true;
662       return 0;
663     }
664     return ret;
665   }
666
667   truncated = (entries.size() == MAX_OMAP_GET_ENTRIES);
668   iter = entries.begin();
669   return get_next(key, pbl, done);
670 }
671
672 int RGWOrphanSearch::compare_oid_indexes()
673 {
674   assert(linked_objs_index.size() == all_objs_index.size());
675
676   librados::IoCtx& ioctx = orphan_store.get_ioctx();
677
678   librados::IoCtx data_ioctx;
679
680   int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, data_ioctx);
681   if (ret < 0) {
682     lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
683     return ret;
684   }
685
686   uint64_t time_threshold = search_info.start_time.sec() - stale_secs;
687
688   map<int, string>::iterator liter = linked_objs_index.begin();
689   map<int, string>::iterator aiter = all_objs_index.begin();
690
691   for (; liter != linked_objs_index.end(); ++liter, ++aiter) {
692     OMAPReader linked_entries(ioctx, liter->second);
693     OMAPReader all_entries(ioctx, aiter->second);
694
695     bool done;
696
697     string cur_linked;
698     bool linked_done = false;
699
700
701     do {
702       string key;
703       int r = all_entries.get_next(&key, NULL, &done);
704       if (r < 0) {
705         return r;
706       }
707       if (done) {
708         break;
709       }
710
711       string key_fp = obj_fingerprint(key);
712
713       while (cur_linked < key_fp && !linked_done) {
714         r = linked_entries.get_next(&cur_linked, NULL, &linked_done);
715         if (r < 0) {
716           return r;
717         }
718       }
719
720       if (cur_linked == key_fp) {
721         ldout(store->ctx(), 20) << "linked: " << key << dendl;
722         continue;
723       }
724
725       time_t mtime;
726       r = data_ioctx.stat(key, NULL, &mtime);
727       if (r < 0) {
728         if (r != -ENOENT) {
729           lderr(store->ctx()) << "ERROR: ioctx.stat(" << key << ") returned ret=" << r << dendl;
730         }
731         continue;
732       }
733       if (stale_secs && (uint64_t)mtime >= time_threshold) {
734         ldout(store->ctx(), 20) << "skipping: " << key << " (mtime=" << mtime << " threshold=" << time_threshold << ")" << dendl;
735         continue;
736       }
737       ldout(store->ctx(), 20) << "leaked: " << key << dendl;
738       cout << "leaked: " << key << std::endl;
739     } while (!done);
740   }
741
742   return 0;
743 }
744
745 int RGWOrphanSearch::run()
746 {
747   int r;
748
749   switch (search_stage.stage) {
750     
751     case ORPHAN_SEARCH_STAGE_INIT:
752       ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl;
753       search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL);
754       r = save_state();
755       if (r < 0) {
756         lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
757         return r;
758       }
759       // fall through
760     case ORPHAN_SEARCH_STAGE_LSPOOL:
761       ldout(store->ctx(), 0) << __func__ << "(): building index of all objects in pool" << dendl;
762       r = build_all_oids_index();
763       if (r < 0) {
764         lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
765         return r;
766       }
767
768       search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS);
769       r = save_state();
770       if (r < 0) {
771         lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
772         return r;
773       }
774       // fall through
775
776     case ORPHAN_SEARCH_STAGE_LSBUCKETS:
777       ldout(store->ctx(), 0) << __func__ << "(): building index of all bucket indexes" << dendl;
778       r = build_buckets_instance_index();
779       if (r < 0) {
780         lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
781         return r;
782       }
783
784       search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI);
785       r = save_state();
786       if (r < 0) {
787         lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
788         return r;
789       }
790       // fall through
791
792
793     case ORPHAN_SEARCH_STAGE_ITERATE_BI:
794       ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl;
795       r = build_linked_oids_index();
796       if (r < 0) {
797         lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
798         return r;
799       }
800
801       search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE);
802       r = save_state();
803       if (r < 0) {
804         lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
805         return r;
806       }
807       // fall through
808
809     case ORPHAN_SEARCH_STAGE_COMPARE:
810       r = compare_oid_indexes();
811       if (r < 0) {
812         lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
813         return r;
814       }
815
816       break;
817
818     default:
819       ceph_abort();
820   };
821
822   return 0;
823 }
824
825
826 int RGWOrphanSearch::remove_index(map<int, string>& index)
827 {
828   librados::IoCtx& ioctx = orphan_store.get_ioctx();
829
830   for (map<int, string>::iterator iter = index.begin(); iter != index.end(); ++iter) {
831     int r = ioctx.remove(iter->second);
832     if (r < 0) {
833       if (r != -ENOENT) {
834         ldout(store->ctx(), 0) << "ERROR: couldn't remove " << iter->second << ": ret=" << r << dendl;
835       }
836     }
837   }
838   return 0;
839 }
840
841 int RGWOrphanSearch::finish()
842 {
843   int r = remove_index(all_objs_index);
844   if (r < 0) {
845     ldout(store->ctx(), 0) << "ERROR: remove_index(" << all_objs_index << ") returned ret=" << r << dendl;
846   }
847   r = remove_index(buckets_instance_index);
848   if (r < 0) {
849     ldout(store->ctx(), 0) << "ERROR: remove_index(" << buckets_instance_index << ") returned ret=" << r << dendl;
850   }
851   r = remove_index(linked_objs_index);
852   if (r < 0) {
853     ldout(store->ctx(), 0) << "ERROR: remove_index(" << linked_objs_index << ") returned ret=" << r << dendl;
854   }
855
856   r = orphan_store.remove_job(search_info.job_name);
857   if (r < 0) {
858     ldout(store->ctx(), 0) << "ERROR: could not remove job name (" << search_info.job_name << ") ret=" << r << dendl;
859   }
860
861   return r;
862 }