Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_quota.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013 Inktank, Inc
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15
16 #include "include/utime.h"
17 #include "common/lru_map.h"
18 #include "common/RefCountedObj.h"
19 #include "common/Thread.h"
20 #include "common/Mutex.h"
21 #include "common/RWLock.h"
22
23 #include "rgw_common.h"
24 #include "rgw_rados.h"
25 #include "rgw_quota.h"
26 #include "rgw_bucket.h"
27 #include "rgw_user.h"
28
29 #include <atomic>
30
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_rgw
33
34
35 struct RGWQuotaCacheStats {
36   RGWStorageStats stats;
37   utime_t expiration;
38   utime_t async_refresh_time;
39 };
40
41 template<class T>
42 class RGWQuotaCache {
43 protected:
44   RGWRados *store;
45   lru_map<T, RGWQuotaCacheStats> stats_map;
46   RefCountedWaitObject *async_refcount;
47
48   class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
49     int objs_delta;
50     uint64_t added_bytes;
51     uint64_t removed_bytes;
52   public:
53     StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
54     bool update(RGWQuotaCacheStats *entry) override {
55       if (entry->async_refresh_time.sec() == 0)
56         return false;
57
58       entry->async_refresh_time = utime_t(0, 0);
59
60       return true;
61     }
62   };
63
64   virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0;
65
66   virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
67
68   virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
69   virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
70
71   virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
72 public:
73   RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
74     async_refcount = new RefCountedWaitObject;
75   }
76   virtual ~RGWQuotaCache() {
77     async_refcount->put_wait(); /* wait for all pending async requests to complete */
78   }
79
80   int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
81   void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
82
83   virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);
84
85   void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
86   int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
87   void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
88   void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);
89
90   class AsyncRefreshHandler {
91   protected:
92     RGWRados *store;
93     RGWQuotaCache<T> *cache;
94   public:
95     AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
96     virtual ~AsyncRefreshHandler() {}
97
98     virtual int init_fetch() = 0;
99     virtual void drop_reference() = 0;
100   };
101
102   virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
103 };
104
105 template<class T>
106 bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
107 {
108   if (quota.max_size >= 0) {
109     if (quota.max_size_soft_threshold < 0) {
110       quota.max_size_soft_threshold = quota.max_size * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
111     }
112
113     if (cached_stats.size_rounded  >= (uint64_t)quota.max_size_soft_threshold) {
114       ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): "
115         << cached_stats.size_rounded << " >= " << quota.max_size_soft_threshold << dendl;
116       return false;
117     }
118   }
119
120   if (quota.max_objects >= 0) {
121     if (quota.max_objs_soft_threshold < 0) {
122       quota.max_objs_soft_threshold = quota.max_objects * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
123     }
124
125     if (cached_stats.num_objects >= (uint64_t)quota.max_objs_soft_threshold) {
126       ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): "
127         << cached_stats.num_objects << " >= " << quota.max_objs_soft_threshold << dendl;
128       return false;
129     }
130   }
131
132   return true;
133 }
134
135 template<class T>
136 int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
137 {
138   /* protect against multiple updates */
139   StatsAsyncTestSet test_update;
140   if (!map_find_and_update(user, bucket, &test_update)) {
141     /* most likely we just raced with another update */
142     return 0;
143   }
144
145   async_refcount->get();
146
147
148   AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);
149
150   int ret = handler->init_fetch();
151   if (ret < 0) {
152     async_refcount->put();
153     handler->drop_reference();
154     return ret;
155   }
156
157   return 0;
158 }
159
160 template<class T>
161 void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
162 {
163   ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
164
165   async_refcount->put();
166 }
167
168 template<class T>
169 void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
170 {
171   ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
172
173   RGWQuotaCacheStats qs;
174
175   map_find(user, bucket, qs);
176
177   set_stats(user, bucket, qs, stats);
178
179   async_refcount->put();
180 }
181
182 template<class T>
183 void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
184 {
185   qs.stats = stats;
186   qs.expiration = ceph_clock_now();
187   qs.async_refresh_time = qs.expiration;
188   qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
189   qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
190
191   map_add(user, bucket, qs);
192 }
193
194 template<class T>
195 int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
196   RGWQuotaCacheStats qs;
197   utime_t now = ceph_clock_now();
198   if (map_find(user, bucket, qs)) {
199     if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
200       int r = async_refresh(user, bucket, qs);
201       if (r < 0) {
202         ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;
203
204         /* continue processing, might be a transient error, async refresh is just optimization */
205       }
206     }
207
208     if (can_use_cached_stats(quota, qs.stats) && qs.expiration >
209         ceph_clock_now()) {
210       stats = qs.stats;
211       return 0;
212     }
213   }
214
215   int ret = fetch_stats_from_storage(user, bucket, stats);
216   if (ret < 0 && ret != -ENOENT)
217     return ret;
218
219   set_stats(user, bucket, qs, stats);
220
221   return 0;
222 }
223
224
225 template<class T>
226 class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
227   const int objs_delta;
228   const uint64_t added_bytes;
229   const uint64_t removed_bytes;
230 public:
231   RGWQuotaStatsUpdate(const int objs_delta,
232                       const uint64_t added_bytes,
233                       const uint64_t removed_bytes)
234     : objs_delta(objs_delta),
235       added_bytes(added_bytes),
236       removed_bytes(removed_bytes) {
237   }
238
239   bool update(RGWQuotaCacheStats * const entry) override {
240     const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
241     const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);
242
243     if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) {
244       entry->stats.size += added_bytes - removed_bytes;
245     } else {
246       entry->stats.size = 0;
247     }
248
249     if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) {
250       entry->stats.size_rounded += rounded_added - rounded_removed;
251     } else {
252       entry->stats.size_rounded = 0;
253     }
254
255     if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) {
256       entry->stats.num_objects += objs_delta;
257     } else {
258       entry->stats.num_objects = 0;
259     }
260
261     return true;
262   }
263 };
264
265
266 template<class T>
267 void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta,
268                                  uint64_t added_bytes, uint64_t removed_bytes)
269 {
270   RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
271   map_find_and_update(user, bucket, &update);
272
273   data_modified(user, bucket);
274 }
275
276 class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
277                                   public RGWGetBucketStats_CB {
278   rgw_user user;
279 public:
280   BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
281                             const rgw_user& _user, const rgw_bucket& _bucket) :
282                                       RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
283                                       RGWGetBucketStats_CB(_bucket), user(_user) {}
284
285   void drop_reference() override { put(); }
286   void handle_response(int r) override;
287   int init_fetch() override;
288 };
289
290 int BucketAsyncRefreshHandler::init_fetch()
291 {
292   RGWBucketInfo bucket_info;
293
294   RGWObjectCtx obj_ctx(store);
295
296   int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
297   if (r < 0) {
298     ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
299     return r;
300   }
301
302   ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
303
304   r = store->get_bucket_stats_async(bucket_info, RGW_NO_SHARD, this);
305   if (r < 0) {
306     ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl;
307
308     /* get_bucket_stats_async() dropped our reference already */
309     return r;
310   }
311
312   return 0;
313 }
314
315 void BucketAsyncRefreshHandler::handle_response(const int r)
316 {
317   if (r < 0) {
318     ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
319     cache->async_refresh_fail(user, bucket);
320     return;
321   }
322
323   RGWStorageStats bs;
324
325   for (const auto& pair : *stats) {
326     const RGWStorageStats& s = pair.second;
327
328     bs.size += s.size;
329     bs.size_rounded += s.size_rounded;
330     bs.num_objects += s.num_objects;
331   }
332
333   cache->async_refresh_response(user, bucket, bs);
334 }
335
336 class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
337 protected:
338   bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
339     return stats_map.find(bucket, qs);
340   }
341
342   bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
343     return stats_map.find_and_update(bucket, NULL, ctx);
344   }
345
346   void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
347     stats_map.add(bucket, qs);
348   }
349
350   int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
351
352 public:
353   explicit RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
354   }
355
356   AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
357     return new BucketAsyncRefreshHandler(store, this, user, bucket);
358   }
359 };
360
361 int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
362 {
363   RGWBucketInfo bucket_info;
364
365   RGWObjectCtx obj_ctx(store);
366
367   int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
368   if (r < 0) {
369     ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
370     return r;
371   }
372
373   string bucket_ver;
374   string master_ver;
375
376   map<RGWObjCategory, RGWStorageStats> bucket_stats;
377   r = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver,
378                                   &master_ver, bucket_stats, nullptr);
379   if (r < 0) {
380     ldout(store->ctx(), 0) << "could not get bucket stats for bucket="
381                            << bucket.name << dendl;
382     return r;
383   }
384
385   stats = RGWStorageStats();
386
387   for (const auto& pair : bucket_stats) {
388     const RGWStorageStats& s = pair.second;
389
390     stats.size += s.size;
391     stats.size_rounded += s.size_rounded;
392     stats.num_objects += s.num_objects;
393   }
394
395   return 0;
396 }
397
398 class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
399                                 public RGWGetUserStats_CB {
400   rgw_bucket bucket;
401 public:
402   UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_user> *_cache,
403                           const rgw_user& _user, const rgw_bucket& _bucket) :
404                           RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
405                           RGWGetUserStats_CB(_user),
406                           bucket(_bucket) {}
407
408   void drop_reference() override { put(); }
409   int init_fetch() override;
410   void handle_response(int r) override;
411 };
412
413 int UserAsyncRefreshHandler::init_fetch()
414 {
415   ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl;
416   int r = store->get_user_stats_async(user, this);
417   if (r < 0) {
418     ldout(store->ctx(), 0) << "could not get bucket info for user=" << user << dendl;
419
420     /* get_bucket_stats_async() dropped our reference already */
421     return r;
422   }
423
424   return 0;
425 }
426
427 void UserAsyncRefreshHandler::handle_response(int r)
428 {
429   if (r < 0) {
430     ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
431     cache->async_refresh_fail(user, bucket);
432     return;
433   }
434
435   cache->async_refresh_response(user, bucket, stats);
436 }
437
438 class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
439   std::atomic<bool> down_flag = { false };
440   RWLock rwlock;
441   map<rgw_bucket, rgw_user> modified_buckets;
442
443   /* thread, sync recent modified buckets info */
444   class BucketsSyncThread : public Thread {
445     CephContext *cct;
446     RGWUserStatsCache *stats;
447
448     Mutex lock;
449     Cond cond;
450   public:
451
452     BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}
453
454     void *entry() override {
455       ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
456       do {
457         map<rgw_bucket, rgw_user> buckets;
458
459         stats->swap_modified_buckets(buckets);
460
461         for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
462           rgw_bucket bucket = iter->first;
463           rgw_user& user = iter->second;
464           ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
465           int r = stats->sync_bucket(user, bucket);
466           if (r < 0) {
467             ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
468           }
469         }
470
471         if (stats->going_down())
472           break;
473
474         lock.Lock();
475         cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
476         lock.Unlock();
477       } while (!stats->going_down());
478       ldout(cct, 20) << "BucketsSyncThread: done" << dendl;
479
480       return NULL;
481     }
482
483     void stop() {
484       Mutex::Locker l(lock);
485       cond.Signal();
486     }
487   };
488
489   /*
490    * thread, full sync all users stats periodically
491    *
492    * only sync non idle users or ones that never got synced before, this is needed so that
493    * users that didn't have quota turned on before (or existed before the user objclass
494    * tracked stats) need to get their backend stats up to date.
495    */
496   class UserSyncThread : public Thread {
497     CephContext *cct;
498     RGWUserStatsCache *stats;
499
500     Mutex lock;
501     Cond cond;
502   public:
503
504     UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}
505
506     void *entry() override {
507       ldout(cct, 20) << "UserSyncThread: start" << dendl;
508       do {
509         int ret = stats->sync_all_users();
510         if (ret < 0) {
511           ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
512         }
513
514         if (stats->going_down())
515           break;
516
517         lock.Lock();
518         cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
519         lock.Unlock();
520       } while (!stats->going_down());
521       ldout(cct, 20) << "UserSyncThread: done" << dendl;
522
523       return NULL;
524     }
525
526     void stop() {
527       Mutex::Locker l(lock);
528       cond.Signal();
529     }
530   };
531
532   BucketsSyncThread *buckets_sync_thread;
533   UserSyncThread *user_sync_thread;
534 protected:
535   bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
536     return stats_map.find(user, qs);
537   }
538
539   bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
540     return stats_map.find_and_update(user, NULL, ctx);
541   }
542
543   void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
544     stats_map.add(user, qs);
545   }
546
547   int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
548   int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket);
549   int sync_user(const rgw_user& user);
550   int sync_all_users();
551
552   void data_modified(const rgw_user& user, rgw_bucket& bucket) override;
553
554   void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
555     rwlock.get_write();
556     modified_buckets.swap(out);
557     rwlock.unlock();
558   }
559
560   template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
561   void stop_thread(T **pthr) {
562     T *thread = *pthr;
563     if (!thread)
564       return;
565
566     thread->stop();
567     thread->join();
568     delete thread;
569     *pthr = NULL;
570   }
571
572 public:
573   RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
574                                         rwlock("RGWUserStatsCache::rwlock") {
575     if (quota_threads) {
576       buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
577       buckets_sync_thread->create("rgw_buck_st_syn");
578       user_sync_thread = new UserSyncThread(store->ctx(), this);
579       user_sync_thread->create("rgw_user_st_syn");
580     } else {
581       buckets_sync_thread = NULL;
582       user_sync_thread = NULL;
583     }
584   }
585   ~RGWUserStatsCache() override {
586     stop();
587   }
588
589   AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
590     return new UserAsyncRefreshHandler(store, this, user, bucket);
591   }
592
593   bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats) override {
594     /* in the user case, the cached stats may contain a better estimation of the totals, as
595      * the backend is only periodically getting updated.
596      */
597     return true;
598   }
599
600   bool going_down() {
601     return down_flag;
602   }
603
604   void stop() {
605     down_flag = true;
606     rwlock.get_write();
607     stop_thread(&buckets_sync_thread);
608     rwlock.unlock();
609     stop_thread(&user_sync_thread);
610   }
611 };
612
613 int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
614 {
615   int r = store->get_user_stats(user, stats);
616   if (r < 0) {
617     ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;
618     return r;
619   }
620
621   return 0;
622 }
623
624 int RGWUserStatsCache::sync_bucket(const rgw_user& user, rgw_bucket& bucket)
625 {
626   RGWBucketInfo bucket_info;
627
628   RGWObjectCtx obj_ctx(store);
629
630   int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
631   if (r < 0) {
632     ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
633     return r;
634   }
635
636   r = rgw_bucket_sync_user_stats(store, user, bucket_info);
637   if (r < 0) {
638     ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl;
639     return r;
640   }
641
642   return 0;
643 }
644
645 int RGWUserStatsCache::sync_user(const rgw_user& user)
646 {
647   cls_user_header header;
648   string user_str = user.to_str();
649   int ret = store->cls_user_get_header(user_str, &header);
650   if (ret < 0) {
651     ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
652     return ret;
653   }
654
655   if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
656       header.last_stats_update < header.last_stats_sync) {
657     ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
658     return 0;
659   }
660
661   real_time when_need_full_sync = header.last_stats_sync;
662   when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);
663   
664   // check if enough time passed since last full sync
665   /* FIXME: missing check? */
666
667   ret = rgw_user_sync_all_stats(store, user);
668   if (ret < 0) {
669     ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
670     return ret;
671   }
672
673   return 0;
674 }
675
676 int RGWUserStatsCache::sync_all_users()
677 {
678   string key = "user";
679   void *handle;
680
681   int ret = store->meta_mgr->list_keys_init(key, &handle);
682   if (ret < 0) {
683     ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl;
684     return ret;
685   }
686
687   bool truncated;
688   int max = 1000;
689
690   do {
691     list<string> keys;
692     ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
693     if (ret < 0) {
694       ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
695       goto done;
696     }
697     for (list<string>::iterator iter = keys.begin();
698          iter != keys.end() && !going_down(); 
699          ++iter) {
700       rgw_user user(*iter);
701       ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl;
702       int ret = sync_user(user);
703       if (ret < 0) {
704         ldout(store->ctx(), 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;
705
706         /* continuing to next user */
707         continue;
708       }
709     }
710   } while (truncated);
711
712   ret = 0;
713 done:
714   store->meta_mgr->list_keys_complete(handle);
715   return ret;
716 }
717
718 void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket)
719 {
720   /* racy, but it's ok */
721   rwlock.get_read();
722   bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
723   rwlock.unlock();
724
725   if (need_update) {
726     rwlock.get_write();
727     modified_buckets[bucket] = user;
728     rwlock.unlock();
729   }
730 }
731
732
733 class RGWQuotaInfoApplier {
734   /* NOTE: no non-static field allowed as instances are supposed to live in
735    * the static memory only. */
736 protected:
737   RGWQuotaInfoApplier() = default;
738
739 public:
740   virtual ~RGWQuotaInfoApplier() {}
741
742   virtual bool is_size_exceeded(const char * const entity,
743                                 const RGWQuotaInfo& qinfo,
744                                 const RGWStorageStats& stats,
745                                 const uint64_t size) const = 0;
746
747   virtual bool is_num_objs_exceeded(const char * const entity,
748                                     const RGWQuotaInfo& qinfo,
749                                     const RGWStorageStats& stats,
750                                     const uint64_t num_objs) const = 0;
751
752   static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
753 };
754
755 class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
756 public:
757   bool is_size_exceeded(const char * const entity,
758                                 const RGWQuotaInfo& qinfo,
759                                 const RGWStorageStats& stats,
760                                 const uint64_t size) const override;
761
762   bool is_num_objs_exceeded(const char * const entity,
763                                     const RGWQuotaInfo& qinfo,
764                                     const RGWStorageStats& stats,
765                                     const uint64_t num_objs) const override;
766 };
767
768 class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
769 public:
770   bool is_size_exceeded(const char * const entity,
771                                 const RGWQuotaInfo& qinfo,
772                                 const RGWStorageStats& stats,
773                                 const uint64_t size) const override;
774
775   bool is_num_objs_exceeded(const char * const entity,
776                                     const RGWQuotaInfo& qinfo,
777                                     const RGWStorageStats& stats,
778                                     const uint64_t num_objs) const override;
779 };
780
781
782 bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity,
783                                               const RGWQuotaInfo& qinfo,
784                                               const RGWStorageStats& stats,
785                                               const uint64_t size) const
786 {
787   if (qinfo.max_size < 0) {
788     /* The limit is not enabled. */
789     return false;
790   }
791
792   const uint64_t cur_size = stats.size_rounded;
793   const uint64_t new_size = rgw_rounded_objsize(size);
794
795   if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) {
796     dout(10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
797              << " size=" << new_size << " "
798              << entity << "_quota.max_size=" << qinfo.max_size << dendl;
799     return true;
800   }
801
802   return false;
803 }
804
805 bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity,
806                                                   const RGWQuotaInfo& qinfo,
807                                                   const RGWStorageStats& stats,
808                                                   const uint64_t num_objs) const
809 {
810   if (qinfo.max_objects < 0) {
811     /* The limit is not enabled. */
812     return false;
813   }
814
815   if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
816     dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
817              << " " << entity << "_quota.max_objects=" << qinfo.max_objects
818              << dendl;
819     return true;
820   }
821
822   return false;
823 }
824
825 bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity,
826                                               const RGWQuotaInfo& qinfo,
827                                               const RGWStorageStats& stats,
828                                               const uint64_t size) const
829 {
830   if (qinfo.max_size < 0) {
831     /* The limit is not enabled. */
832     return false;
833   }
834
835   const uint64_t cur_size = stats.size;
836
837   if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) {
838     dout(10) << "quota exceeded: stats.size=" << stats.size
839              << " size=" << size << " "
840              << entity << "_quota.max_size=" << qinfo.max_size << dendl;
841     return true;
842   }
843
844   return false;
845 }
846
847 bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity,
848                                                   const RGWQuotaInfo& qinfo,
849                                                   const RGWStorageStats& stats,
850                                                   const uint64_t num_objs) const
851 {
852   if (qinfo.max_objects < 0) {
853     /* The limit is not enabled. */
854     return false;
855   }
856
857   if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
858     dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
859              << " " << entity << "_quota.max_objects=" << qinfo.max_objects
860              << dendl;
861     return true;
862   }
863
864   return false;
865 }
866
867 const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
868   const RGWQuotaInfo& qinfo)
869 {
870   static RGWQuotaInfoDefApplier default_qapplier;
871   static RGWQuotaInfoRawApplier raw_qapplier;
872
873   if (qinfo.check_on_raw) {
874     return raw_qapplier;
875   } else {
876     return default_qapplier;
877   }
878 }
879
880
881 class RGWQuotaHandlerImpl : public RGWQuotaHandler {
882   RGWRados *store;
883   RGWBucketStatsCache bucket_stats_cache;
884   RGWUserStatsCache user_stats_cache;
885
886   int check_quota(const char * const entity,
887                   const RGWQuotaInfo& quota,
888                   const RGWStorageStats& stats,
889                   const uint64_t num_objs,
890                   const uint64_t size) {
891     if (!quota.enabled) {
892       return 0;
893     }
894
895     const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);
896
897     ldout(store->ctx(), 20) << entity
898                             << " quota: max_objects=" << quota.max_objects
899                             << " max_size=" << quota.max_size << dendl;
900
901
902     if (quota_applier.is_num_objs_exceeded(entity, quota, stats, num_objs)) {
903       return -ERR_QUOTA_EXCEEDED;
904     }
905
906     if (quota_applier.is_size_exceeded(entity, quota, stats, size)) {
907       return -ERR_QUOTA_EXCEEDED;
908     }
909
910     ldout(store->ctx(), 20) << entity << " quota OK:"
911                             << " stats.num_objects=" << stats.num_objects
912                             << " stats.size=" << stats.size << dendl;
913     return 0;
914   }
915 public:
916   RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store),
917                                     bucket_stats_cache(_store),
918                                     user_stats_cache(_store, quota_threads) {}
919
920   int check_quota(const rgw_user& user,
921                           rgw_bucket& bucket,
922                           RGWQuotaInfo& user_quota,
923                           RGWQuotaInfo& bucket_quota,
924                           uint64_t num_objs,
925                           uint64_t size) override {
926
927     if (!bucket_quota.enabled && !user_quota.enabled) {
928       return 0;
929     }
930
931     /*
932      * we need to fetch bucket stats if the user quota is enabled, because
933      * the whole system relies on us periodically updating the user's bucket
934      * stats in the user's header, this happens in get_stats() if we actually
935      * fetch that info and not rely on cached data
936      */
937
938     if (bucket_quota.enabled) {
939       RGWStorageStats bucket_stats;
940       int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
941                                            bucket_quota);
942       if (ret < 0) {
943         return ret;
944       }
945       ret = check_quota("bucket", bucket_quota, bucket_stats, num_objs, size);
946       if (ret < 0) {
947         return ret;
948       }
949     }
950
951     if (user_quota.enabled) {
952       RGWStorageStats user_stats;
953       int ret = user_stats_cache.get_stats(user, bucket, user_stats, user_quota);
954       if (ret < 0) {
955         return ret;
956       }
957       ret = check_quota("user", user_quota, user_stats, num_objs, size);
958       if (ret < 0) {
959         return ret;
960       }
961     }
962     return 0;
963   }
964
965   void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
966     bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
967     user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
968   }
969
970   int check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards,
971                           const rgw_user& user, const rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
972                           uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards)
973   {
974     RGWStorageStats bucket_stats;
975     int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
976                                            bucket_quota);
977     if (ret < 0) {
978       return ret;
979     }
980
981     if (bucket_stats.num_objects  + num_objs > num_shards * max_objs_per_shard) {
982       ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << bucket_stats.num_objects
983              << " shard max_objects=" <<  max_objs_per_shard * num_shards << dendl;
984       need_resharding = true;
985       if (suggested_num_shards) {
986         *suggested_num_shards = (bucket_stats.num_objects  + num_objs) * 2 / max_objs_per_shard;
987       }
988     } else {
989       need_resharding = false;
990     }
991
992     return 0;
993   }
994
995 };
996
997
998 RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads)
999 {
1000   return new RGWQuotaHandlerImpl(store, quota_threads);
1001 }
1002
1003 void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
1004 {
1005   delete handler;
1006 }
1007
1008