Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_lc.cc
1 #include <string.h>
2 #include <iostream>
3 #include <map>
4
5 #include <boost/algorithm/string/split.hpp>
6 #include <boost/algorithm/string.hpp>
7
8 #include "common/Formatter.h"
9 #include <common/errno.h>
10 #include "auth/Crypto.h"
11 #include "cls/rgw/cls_rgw_client.h"
12 #include "cls/lock/cls_lock_client.h"
13 #include "rgw_common.h"
14 #include "rgw_bucket.h"
15 #include "rgw_lc.h"
16
17 #define dout_context g_ceph_context
18 #define dout_subsys ceph_subsys_rgw
19
20 const char* LC_STATUS[] = {
21       "UNINITIAL",
22       "PROCESSING",
23       "FAILED",
24       "COMPLETE"
25 };
26
27 using namespace std;
28 using namespace librados;
29
30 bool LCRule::valid()
31 {
32   if (id.length() > MAX_ID_LEN) {
33     return false;
34   }
35   else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration) {
36     return false;
37   }
38   else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
39     return false;
40   }
41   return true;
42 }
43
44 void RGWLifecycleConfiguration::add_rule(LCRule *rule)
45 {
46   string id;
47   rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups
48   rule_map.insert(pair<string, LCRule>(id, *rule));
49 }
50
51 bool RGWLifecycleConfiguration::_add_rule(LCRule *rule)
52 {
53   lc_op op;
54   if (rule->get_status().compare("Enabled") == 0) {
55     op.status = true;
56   }
57   if (rule->get_expiration().has_days()) {
58     op.expiration = rule->get_expiration().get_days();
59   }
60   if (rule->get_expiration().has_date()) {
61     op.expiration_date = ceph::from_iso_8601(rule->get_expiration().get_date());
62   }
63   if (rule->get_noncur_expiration().has_days()) {
64     op.noncur_expiration = rule->get_noncur_expiration().get_days();
65   }
66   if (rule->get_mp_expiration().has_days()) {
67     op.mp_expiration = rule->get_mp_expiration().get_days();
68   }
69   op.dm_expiration = rule->get_dm_expiration();
70
71   std::string prefix;
72   if (rule->get_filter().has_prefix()){
73     prefix = rule->get_filter().get_prefix();
74   } else {
75     prefix = rule->get_prefix();
76   }
77   auto ret = prefix_map.emplace(std::move(prefix), std::move(op));
78   return ret.second;
79 }
80
81 int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
82 {
83   if (!rule->valid()) {
84     return -EINVAL;
85   }
86   string id;
87   rule->get_id(id);
88   if (rule_map.find(id) != rule_map.end()) {  //id shouldn't be the same 
89     return -EINVAL;
90   }
91   rule_map.insert(pair<string, LCRule>(id, *rule));
92
93   if (!_add_rule(rule)) {
94     return -ERR_INVALID_REQUEST;
95   }
96   return 0;
97 }
98
99 bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
100   if ((first.expiration > 0 || first.expiration_date != boost::none) && 
101     (second.expiration > 0 || second.expiration_date != boost::none)) {
102     return true;
103   } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
104     return true;
105   } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
106     return true;
107   } else {
108     return false;
109   }
110 }
111
112 //Rules are conflicted: if one rule's prefix starts with other rule's prefix, and these two rules
113 //define same action. 
114 bool RGWLifecycleConfiguration::valid() 
115 {
116   if (prefix_map.size() < 2) {
117     return true;
118   }
119   auto cur_iter = prefix_map.begin();
120   while (cur_iter != prefix_map.end()) {
121     auto next_iter = cur_iter;
122     ++next_iter;
123     while (next_iter != prefix_map.end()) {
124       string c_pre = cur_iter->first;
125       string n_pre = next_iter->first;
126       if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
127         if (has_same_action(cur_iter->second, next_iter->second)) {
128           return false;
129         } else {
130           ++next_iter;
131         }
132       } else {
133         break;
134       }
135     }
136     ++cur_iter;
137   }
138   return true;
139 }
140
141 void *RGWLC::LCWorker::entry() {
142   do {
143     utime_t start = ceph_clock_now();
144     if (should_work(start)) {
145       dout(5) << "life cycle: start" << dendl;
146       int r = lc->process();
147       if (r < 0) {
148         dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
149       }
150       dout(5) << "life cycle: stop" << dendl;
151     }
152     if (lc->going_down())
153       break;
154
155     utime_t end = ceph_clock_now();
156     int secs = schedule_next_start_time(start, end);
157     utime_t next;
158     next.set_from_double(end + secs);
159
160     dout(5) << "schedule life cycle next start time: " << rgw_to_asctime(next) <<dendl;
161
162     lock.Lock();
163     cond.WaitInterval(lock, utime_t(secs, 0));
164     lock.Unlock();
165   } while (!lc->going_down());
166
167   return NULL;
168 }
169
170 void RGWLC::initialize(CephContext *_cct, RGWRados *_store) {
171   cct = _cct;
172   store = _store;
173   max_objs = cct->_conf->rgw_lc_max_objs;
174   if (max_objs > HASH_PRIME)
175     max_objs = HASH_PRIME;
176
177   obj_names = new string[max_objs];
178
179   for (int i = 0; i < max_objs; i++) {
180     obj_names[i] = lc_oid_prefix;
181     char buf[32];
182     snprintf(buf, 32, ".%d", i);
183     obj_names[i].append(buf);
184   }
185
186 #define COOKIE_LEN 16
187   char cookie_buf[COOKIE_LEN + 1];
188   gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
189   cookie = cookie_buf;
190 }
191
192 void RGWLC::finalize()
193 {
194   delete[] obj_names;
195 }
196
197 bool RGWLC::if_already_run_today(time_t& start_date)
198 {
199   struct tm bdt;
200   time_t begin_of_day;
201   utime_t now = ceph_clock_now();
202   localtime_r(&start_date, &bdt);
203
204   if (cct->_conf->rgw_lc_debug_interval > 0) {
205     if (now - start_date < cct->_conf->rgw_lc_debug_interval)
206       return true;
207     else
208       return false;
209   }
210
211   bdt.tm_hour = 0;
212   bdt.tm_min = 0;
213   bdt.tm_sec = 0;
214   begin_of_day = mktime(&bdt);
215   if (now - begin_of_day < 24*60*60)
216     return true;
217   else
218     return false;
219 }
220
221 int RGWLC::bucket_lc_prepare(int index)
222 {
223   map<string, int > entries;
224
225   string marker;
226
227 #define MAX_LC_LIST_ENTRIES 100
228   do {
229     int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
230     if (ret < 0)
231       return ret;
232     map<string, int>::iterator iter;
233     for (iter = entries.begin(); iter != entries.end(); ++iter) {
234       pair<string, int > entry(iter->first, lc_uninitial);
235       ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index],  entry);
236       if (ret < 0) {
237         dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl;
238         break;
239       }
240       marker = iter->first;
241     }
242   } while (!entries.empty());
243
244   return 0;
245 }
246
247 bool RGWLC::obj_has_expired(double timediff, int days)
248 {
249   double cmp;
250   if (cct->_conf->rgw_lc_debug_interval <= 0) {
251     /* Normal case, run properly */
252     cmp = days*24*60*60;
253   } else {
254     /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
255     cmp = days*cct->_conf->rgw_lc_debug_interval;
256   }
257
258   return (timediff >= cmp);
259 }
260
261 int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed)
262 {
263   if (remove_indeed) {
264     return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
265   } else {
266     obj_key.instance.clear();
267     RGWObjectCtx rctx(store);
268     rgw_obj obj(bucket_info.bucket, obj_key);
269     return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
270   }
271 }
272
273 int RGWLC::handle_multipart_expiration(RGWRados::Bucket *target, const map<string, lc_op>& prefix_map)
274 {
275   MultipartMetaFilter mp_filter;
276   vector<rgw_bucket_dir_entry> objs;
277   RGWMPObj mp_obj;
278   bool is_truncated;
279   int ret;
280   RGWBucketInfo& bucket_info = target->get_bucket_info();
281   RGWRados::Bucket::List list_op(target);
282   list_op.params.list_versions = false;
283   list_op.params.ns = RGW_OBJ_NS_MULTIPART;
284   list_op.params.filter = &mp_filter;
285   for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
286     if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
287       continue;
288     }
289     list_op.params.prefix = prefix_iter->first;
290     do {
291       objs.clear();
292       list_op.params.marker = list_op.get_next_marker();
293       ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
294       if (ret < 0) {
295           if (ret == (-ENOENT))
296             return 0;
297           ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
298           return ret;
299       }
300
301       utime_t now = ceph_clock_now();
302       for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
303         if (obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.mp_expiration)) {
304           rgw_obj_key key(obj_iter->key);
305           if (!mp_obj.from_meta(key.name)) {
306             continue;
307           }
308           RGWObjectCtx rctx(store);
309           ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
310           if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
311             ldout(cct, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret <<dendl;
312             return ret;
313           }
314         }
315       }
316     } while(is_truncated);
317   }
318   return 0;
319 }
320
321 int RGWLC::bucket_lc_process(string& shard_id)
322 {
323   RGWLifecycleConfiguration  config(cct);
324   RGWBucketInfo bucket_info;
325   map<string, bufferlist> bucket_attrs;
326   string next_marker, no_ns, list_versions;
327   bool is_truncated;
328   vector<rgw_bucket_dir_entry> objs;
329   RGWObjectCtx obj_ctx(store);
330   vector<std::string> result;
331   boost::split(result, shard_id, boost::is_any_of(":"));
332   string bucket_tenant = result[0];
333   string bucket_name = result[1];
334   string bucket_id = result[2];
335   int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);
336   if (ret < 0) {
337     ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <<dendl;
338     return ret;
339   }
340
341   ret = bucket_info.bucket.bucket_id.compare(bucket_id) ;
342   if (ret !=0) {
343     ldout(cct, 0) << "LC:old bucket id find, should be delete" << bucket_name <<dendl;
344     return -ENOENT;
345   }
346
347   RGWRados::Bucket target(store, bucket_info);
348   RGWRados::Bucket::List list_op(&target);
349
350   map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
351   if (aiter == bucket_attrs.end())
352     return 0;
353
354   bufferlist::iterator iter(&aiter->second);
355   try {
356       config.decode(iter);
357     } catch (const buffer::error& e) {
358       ldout(cct, 0) << __func__ <<  "decode life cycle config failed" << dendl;
359       return -1;
360     }
361
362   map<string, lc_op>& prefix_map = config.get_prefix_map();
363   list_op.params.list_versions = bucket_info.versioned();
364   if (!bucket_info.versioned()) {
365     for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
366       if (!prefix_iter->second.status || 
367         (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) {
368         continue;
369       }
370       if (prefix_iter->second.expiration_date != boost::none && 
371         ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) {
372         continue;
373       }
374       list_op.params.prefix = prefix_iter->first;
375       do {
376         objs.clear();
377         list_op.params.marker = list_op.get_next_marker();
378         ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
379
380         if (ret < 0) {
381           if (ret == (-ENOENT))
382             return 0;
383           ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
384           return ret;
385         }
386         
387         utime_t now = ceph_clock_now();
388         bool is_expired;
389         for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
390           rgw_obj_key key(obj_iter->key);
391
392           if (!key.ns.empty()) {
393             continue;
394           }
395           if (prefix_iter->second.expiration_date != boost::none) {
396             //we have checked it before
397             is_expired = true;
398           } else {
399             is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.expiration);
400           }
401           if (is_expired) {
402             RGWObjectCtx rctx(store);
403             rgw_obj obj(bucket_info.bucket, key);
404             RGWObjState *state;
405             int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
406             if (ret < 0) {
407               return ret;
408             }
409             if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
410               continue;
411             ret = remove_expired_obj(bucket_info, obj_iter->key, true);
412             if (ret < 0) {
413               ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
414             } else {
415               ldout(cct, 10) << "DELETED:" << bucket_name << ":" << key << dendl;
416             }
417           }
418         }
419       } while (is_truncated);
420     }
421   } else {
422   //bucket versioning is enabled or suspended
423     rgw_obj_key pre_marker;
424     for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
425       if (!prefix_iter->second.status || (prefix_iter->second.expiration <= 0 
426         && prefix_iter->second.expiration_date == boost::none
427         && prefix_iter->second.noncur_expiration <= 0 && !prefix_iter->second.dm_expiration)) {
428         continue;
429       }
430       if (prefix_iter != prefix_map.begin() && 
431         (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
432         list_op.next_marker = pre_marker;
433       } else {
434         pre_marker = list_op.get_next_marker();
435       }
436       list_op.params.prefix = prefix_iter->first;
437       rgw_bucket_dir_entry pre_obj;
438       do {
439         if (!objs.empty()) {
440           pre_obj = objs.back();
441         }
442         objs.clear();
443         list_op.params.marker = list_op.get_next_marker();
444         ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
445
446         if (ret < 0) {
447           if (ret == (-ENOENT))
448             return 0;
449           ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
450           return ret;
451         }
452
453         utime_t now = ceph_clock_now();
454         ceph::real_time mtime;
455         bool remove_indeed = true;
456         int expiration;
457         bool skip_expiration;
458         bool is_expired;
459         for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
460           skip_expiration = false;
461           is_expired = false;
462           if (obj_iter->is_current()) {
463             if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none
464               && !prefix_iter->second.dm_expiration) {
465               continue;
466             }
467             if (obj_iter->is_delete_marker()) {
468               if ((obj_iter + 1)==objs.end()) {
469                 if (is_truncated) {
470                   //deal with it in next round because we can't judge whether this marker is the only version
471                   list_op.next_marker = obj_iter->key;
472                   break;
473                 }
474               } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) {   //*obj_iter is delete marker and isn't the only version, do nothing.
475                 continue;
476               }
477               skip_expiration = prefix_iter->second.dm_expiration;
478               remove_indeed = true;   //we should remove the delete marker if it's the only version
479             } else {
480               remove_indeed = false;
481             }
482             mtime = obj_iter->meta.mtime;
483             expiration = prefix_iter->second.expiration;
484             if (!skip_expiration && expiration <= 0 && prefix_iter->second.expiration_date == boost::none) {
485               continue;
486             } else if (!skip_expiration) {
487               if (expiration > 0) {
488                 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
489               } else {
490                 is_expired = now >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date);
491               }
492             }
493           } else {
494             if (prefix_iter->second.noncur_expiration <=0) {
495               continue;
496             }
497             remove_indeed = true;
498             mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime;
499             expiration = prefix_iter->second.noncur_expiration;
500             is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
501           }
502           if (skip_expiration || is_expired) {
503             if (obj_iter->is_visible()) {
504               RGWObjectCtx rctx(store);
505               rgw_obj obj(bucket_info.bucket, obj_iter->key);
506               RGWObjState *state;
507               int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
508               if (ret < 0) {
509                 return ret;
510               }
511               if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
512                 continue;
513             }
514             ret = remove_expired_obj(bucket_info, obj_iter->key, remove_indeed);
515             if (ret < 0) {
516               ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
517             } else {
518               ldout(cct, 10) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl;
519             }
520           }
521         }
522       } while (is_truncated);
523     }
524   }
525
526   ret = handle_multipart_expiration(&target, prefix_map);
527
528   return ret;
529 }
530
531 int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
532 {
533   utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
534
535   rados::cls::lock::Lock l(lc_index_lock_name);
536   l.set_cookie(cookie);
537   l.set_duration(lock_duration);
538
539   do {
540     int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
541     if (ret == -EBUSY) { /* already locked by another lc processor */
542       dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
543       sleep(5);
544       continue;
545     }
546     if (ret < 0)
547       return 0;
548     dout(20) << "RGWLC::bucket_lc_post()  get lock" << obj_names[index] << dendl;
549     if (result ==  -ENOENT) {
550       ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index],  entry);
551       if (ret < 0) {
552         dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl;
553       }
554       goto clean;
555     } else if (result < 0) {
556       entry.second = lc_failed;
557     } else {
558       entry.second = lc_complete;
559     }
560
561     ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index],  entry);
562     if (ret < 0) {
563       dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl;
564     }
565 clean:
566     l.unlock(&store->lc_pool_ctx, obj_names[index]);
567     dout(20) << "RGWLC::bucket_lc_post()  unlock" << obj_names[index] << dendl;
568     return 0;
569   } while (true);
570 }
571
572 int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
573 {
574   int index = 0;
575   progress_map->clear();
576   for(; index <max_objs; index++) {
577     map<string, int > entries;
578     int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
579     if (ret < 0) {
580       if (ret == -ENOENT) {
581         dout(10) << __func__ << " ignoring unfound lc object="
582                              << obj_names[index] << dendl;
583         continue;
584       } else {
585         return ret;
586       }
587     }
588     map<string, int>::iterator iter;
589     for (iter = entries.begin(); iter != entries.end(); ++iter) {
590       progress_map->insert(*iter);
591     }
592   }
593   return 0;
594 }
595
596 int RGWLC::process()
597 {
598   int max_secs = cct->_conf->rgw_lc_lock_max_time;
599
600   unsigned start;
601   int ret = get_random_bytes((char *)&start, sizeof(start));
602   if (ret < 0)
603     return ret;
604
605   for (int i = 0; i < max_objs; i++) {
606     int index = (i + start) % max_objs;
607     ret = process(index, max_secs);
608     if (ret < 0)
609       return ret;
610   }
611
612   return 0;
613 }
614
615 int RGWLC::process(int index, int max_lock_secs)
616 {
617   rados::cls::lock::Lock l(lc_index_lock_name);
618   do {
619     utime_t now = ceph_clock_now();
620     pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
621     if (max_lock_secs <= 0)
622       return -EAGAIN;
623
624     utime_t time(max_lock_secs, 0);
625     l.set_duration(time);
626
627     int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
628     if (ret == -EBUSY) { /* already locked by another lc processor */
629       dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
630       sleep(5);
631       continue;
632     }
633     if (ret < 0)
634       return 0;
635
636     string marker;
637     cls_rgw_lc_obj_head head;
638     ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);
639     if (ret < 0) {
640       dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl;
641       goto exit;
642     }
643
644     if(!if_already_run_today(head.start_date)) {
645       head.start_date = now;
646       head.marker.clear();
647       ret = bucket_lc_prepare(index);
648       if (ret < 0) {
649       dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl;
650       goto exit;
651       }
652     }
653
654     ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);
655     if (ret < 0) {
656       dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl;
657       goto exit;
658     }
659
660     if (entry.first.empty())
661       goto exit;
662
663     entry.second = lc_processing;
664     ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index],  entry);
665     if (ret < 0) {
666       dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl;
667       goto exit;
668     }
669
670     head.marker = entry.first;
671     ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index],  head);
672     if (ret < 0) {
673       dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
674       goto exit;
675     }
676     l.unlock(&store->lc_pool_ctx, obj_names[index]);
677     ret = bucket_lc_process(entry.first);
678     bucket_lc_post(index, max_lock_secs, entry, ret);
679   }while(1);
680
681 exit:
682     l.unlock(&store->lc_pool_ctx, obj_names[index]);
683     return 0;
684 }
685
686 void RGWLC::start_processor()
687 {
688   worker = new LCWorker(cct, this);
689   worker->create("lifecycle_thr");
690 }
691
692 void RGWLC::stop_processor()
693 {
694   down_flag = true;
695   if (worker) {
696     worker->stop();
697     worker->join();
698   }
699   delete worker;
700   worker = NULL;
701 }
702
703 void RGWLC::LCWorker::stop()
704 {
705   Mutex::Locker l(lock);
706   cond.Signal();
707 }
708
709 bool RGWLC::going_down()
710 {
711   return down_flag;
712 }
713
714 bool RGWLC::LCWorker::should_work(utime_t& now)
715 {
716   int start_hour;
717   int start_minute;
718   int end_hour;
719   int end_minute;
720   string worktime = cct->_conf->rgw_lifecycle_work_time;
721   sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
722   struct tm bdt;
723   time_t tt = now.sec();
724   localtime_r(&tt, &bdt);
725
726   if (cct->_conf->rgw_lc_debug_interval > 0) {
727           /* We're debugging, so say we can run */
728           return true;
729   } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
730                      (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
731           return true;
732   } else {
733           return false;
734   }
735
736 }
737
738 int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
739 {
740   if (cct->_conf->rgw_lc_debug_interval > 0) {
741         int secs = start + cct->_conf->rgw_lc_debug_interval - now;
742         if (secs < 0)
743           secs = 0;
744         return (secs);
745   }
746
747   int start_hour;
748   int start_minute;
749   int end_hour;
750   int end_minute;
751   string worktime = cct->_conf->rgw_lifecycle_work_time;
752   sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
753   struct tm bdt;
754   time_t tt = now.sec();
755   time_t nt;
756   localtime_r(&tt, &bdt);
757   bdt.tm_hour = start_hour;
758   bdt.tm_min = start_minute;
759   bdt.tm_sec = 0;
760   nt = mktime(&bdt);
761
762   return (nt+24*60*60 - tt);
763 }
764