// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank, Inc
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
#include "include/utime.h"
#include "common/lru_map.h"
#include "common/RefCountedObj.h"
#include "common/Thread.h"
#include "common/Mutex.h"
#include "common/RWLock.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_quota.h"
#include "rgw_bucket.h"

#include <atomic>
/* Context and subsystem picked up by the dout() logging calls in this file. */
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
35 struct RGWQuotaCacheStats {
36 RGWStorageStats stats;
38 utime_t async_refresh_time;
45 lru_map<T, RGWQuotaCacheStats> stats_map;
46 RefCountedWaitObject *async_refcount;
48 class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
51 uint64_t removed_bytes;
53 StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
54 bool update(RGWQuotaCacheStats *entry) override {
55 if (entry->async_refresh_time.sec() == 0)
58 entry->async_refresh_time = utime_t(0, 0);
64 virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0;
66 virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
68 virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
69 virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
71 virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
73 RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
74 async_refcount = new RefCountedWaitObject;
76 virtual ~RGWQuotaCache() {
77 async_refcount->put_wait(); /* wait for all pending async requests to complete */
80 int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
81 void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
83 virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);
85 void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
86 int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
87 void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
88 void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);
90 class AsyncRefreshHandler {
93 RGWQuotaCache<T> *cache;
95 AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache) : store(_store), cache(_cache) {}
96 virtual ~AsyncRefreshHandler() {}
98 virtual int init_fetch() = 0;
99 virtual void drop_reference() = 0;
102 virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
106 bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
108 if (quota.max_size >= 0) {
109 if (quota.max_size_soft_threshold < 0) {
110 quota.max_size_soft_threshold = quota.max_size * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
113 if (cached_stats.size_rounded >= (uint64_t)quota.max_size_soft_threshold) {
114 ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): "
115 << cached_stats.size_rounded << " >= " << quota.max_size_soft_threshold << dendl;
120 if (quota.max_objects >= 0) {
121 if (quota.max_objs_soft_threshold < 0) {
122 quota.max_objs_soft_threshold = quota.max_objects * store->ctx()->_conf->rgw_bucket_quota_soft_threshold;
125 if (cached_stats.num_objects >= (uint64_t)quota.max_objs_soft_threshold) {
126 ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): "
127 << cached_stats.num_objects << " >= " << quota.max_objs_soft_threshold << dendl;
136 int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
138 /* protect against multiple updates */
139 StatsAsyncTestSet test_update;
140 if (!map_find_and_update(user, bucket, &test_update)) {
141 /* most likely we just raced with another update */
145 async_refcount->get();
148 AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);
150 int ret = handler->init_fetch();
152 async_refcount->put();
153 handler->drop_reference();
161 void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
163 ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
165 async_refcount->put();
169 void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
171 ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
173 RGWQuotaCacheStats qs;
175 map_find(user, bucket, qs);
177 set_stats(user, bucket, qs, stats);
179 async_refcount->put();
183 void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
186 qs.expiration = ceph_clock_now();
187 qs.async_refresh_time = qs.expiration;
188 qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
189 qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
191 map_add(user, bucket, qs);
195 int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
196 RGWQuotaCacheStats qs;
197 utime_t now = ceph_clock_now();
198 if (map_find(user, bucket, qs)) {
199 if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
200 int r = async_refresh(user, bucket, qs);
202 ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;
204 /* continue processing, might be a transient error, async refresh is just optimization */
208 if (can_use_cached_stats(quota, qs.stats) && qs.expiration >
215 int ret = fetch_stats_from_storage(user, bucket, stats);
216 if (ret < 0 && ret != -ENOENT)
219 set_stats(user, bucket, qs, stats);
226 class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
227 const int objs_delta;
228 const uint64_t added_bytes;
229 const uint64_t removed_bytes;
231 RGWQuotaStatsUpdate(const int objs_delta,
232 const uint64_t added_bytes,
233 const uint64_t removed_bytes)
234 : objs_delta(objs_delta),
235 added_bytes(added_bytes),
236 removed_bytes(removed_bytes) {
239 bool update(RGWQuotaCacheStats * const entry) override {
240 const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
241 const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);
243 if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) {
244 entry->stats.size += added_bytes - removed_bytes;
246 entry->stats.size = 0;
249 if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) {
250 entry->stats.size_rounded += rounded_added - rounded_removed;
252 entry->stats.size_rounded = 0;
255 if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) {
256 entry->stats.num_objects += objs_delta;
258 entry->stats.num_objects = 0;
267 void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta,
268 uint64_t added_bytes, uint64_t removed_bytes)
270 RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
271 map_find_and_update(user, bucket, &update);
273 data_modified(user, bucket);
276 class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
277 public RGWGetBucketStats_CB {
280 BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
281 const rgw_user& _user, const rgw_bucket& _bucket) :
282 RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache),
283 RGWGetBucketStats_CB(_bucket), user(_user) {}
285 void drop_reference() override { put(); }
286 void handle_response(int r) override;
287 int init_fetch() override;
290 int BucketAsyncRefreshHandler::init_fetch()
292 RGWBucketInfo bucket_info;
294 RGWObjectCtx obj_ctx(store);
296 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
298 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
302 ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
304 r = store->get_bucket_stats_async(bucket_info, RGW_NO_SHARD, this);
306 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl;
308 /* get_bucket_stats_async() dropped our reference already */
315 void BucketAsyncRefreshHandler::handle_response(const int r)
318 ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
319 cache->async_refresh_fail(user, bucket);
325 for (const auto& pair : *stats) {
326 const RGWStorageStats& s = pair.second;
329 bs.size_rounded += s.size_rounded;
330 bs.num_objects += s.num_objects;
333 cache->async_refresh_response(user, bucket, bs);
336 class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
338 bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
339 return stats_map.find(bucket, qs);
342 bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
343 return stats_map.find_and_update(bucket, NULL, ctx);
346 void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
347 stats_map.add(bucket, qs);
350 int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
353 explicit RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache<rgw_bucket>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
356 AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
357 return new BucketAsyncRefreshHandler(store, this, user, bucket);
361 int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
363 RGWBucketInfo bucket_info;
365 RGWObjectCtx obj_ctx(store);
367 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
369 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
376 map<RGWObjCategory, RGWStorageStats> bucket_stats;
377 r = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver,
378 &master_ver, bucket_stats, nullptr);
380 ldout(store->ctx(), 0) << "could not get bucket stats for bucket="
381 << bucket.name << dendl;
385 stats = RGWStorageStats();
387 for (const auto& pair : bucket_stats) {
388 const RGWStorageStats& s = pair.second;
390 stats.size += s.size;
391 stats.size_rounded += s.size_rounded;
392 stats.num_objects += s.num_objects;
398 class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
399 public RGWGetUserStats_CB {
402 UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_user> *_cache,
403 const rgw_user& _user, const rgw_bucket& _bucket) :
404 RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_store, _cache),
405 RGWGetUserStats_CB(_user),
408 void drop_reference() override { put(); }
409 int init_fetch() override;
410 void handle_response(int r) override;
413 int UserAsyncRefreshHandler::init_fetch()
415 ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl;
416 int r = store->get_user_stats_async(user, this);
418 ldout(store->ctx(), 0) << "could not get bucket info for user=" << user << dendl;
420 /* get_bucket_stats_async() dropped our reference already */
427 void UserAsyncRefreshHandler::handle_response(int r)
430 ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
431 cache->async_refresh_fail(user, bucket);
435 cache->async_refresh_response(user, bucket, stats);
438 class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
439 std::atomic<bool> down_flag = { false };
441 map<rgw_bucket, rgw_user> modified_buckets;
443 /* thread, sync recent modified buckets info */
444 class BucketsSyncThread : public Thread {
446 RGWUserStatsCache *stats;
452 BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}
454 void *entry() override {
455 ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
457 map<rgw_bucket, rgw_user> buckets;
459 stats->swap_modified_buckets(buckets);
461 for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
462 rgw_bucket bucket = iter->first;
463 rgw_user& user = iter->second;
464 ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
465 int r = stats->sync_bucket(user, bucket);
467 ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
471 if (stats->going_down())
475 cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
477 } while (!stats->going_down());
478 ldout(cct, 20) << "BucketsSyncThread: done" << dendl;
484 Mutex::Locker l(lock);
490 * thread, full sync all users stats periodically
492 * only sync non idle users or ones that never got synced before, this is needed so that
493 * users that didn't have quota turned on before (or existed before the user objclass
494 * tracked stats) need to get their backend stats up to date.
496 class UserSyncThread : public Thread {
498 RGWUserStatsCache *stats;
504 UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}
506 void *entry() override {
507 ldout(cct, 20) << "UserSyncThread: start" << dendl;
509 int ret = stats->sync_all_users();
511 ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
514 if (stats->going_down())
518 cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
520 } while (!stats->going_down());
521 ldout(cct, 20) << "UserSyncThread: done" << dendl;
527 Mutex::Locker l(lock);
532 BucketsSyncThread *buckets_sync_thread;
533 UserSyncThread *user_sync_thread;
535 bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
536 return stats_map.find(user, qs);
539 bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
540 return stats_map.find_and_update(user, NULL, ctx);
543 void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
544 stats_map.add(user, qs);
547 int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override;
548 int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket);
549 int sync_user(const rgw_user& user);
550 int sync_all_users();
552 void data_modified(const rgw_user& user, rgw_bucket& bucket) override;
554 void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
556 modified_buckets.swap(out);
560 template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
561 void stop_thread(T **pthr) {
573 RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache<rgw_user>(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
574 rwlock("RGWUserStatsCache::rwlock") {
576 buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
577 buckets_sync_thread->create("rgw_buck_st_syn");
578 user_sync_thread = new UserSyncThread(store->ctx(), this);
579 user_sync_thread->create("rgw_user_st_syn");
581 buckets_sync_thread = NULL;
582 user_sync_thread = NULL;
585 ~RGWUserStatsCache() override {
589 AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
590 return new UserAsyncRefreshHandler(store, this, user, bucket);
593 bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats) override {
594 /* in the user case, the cached stats may contain a better estimation of the totals, as
595 * the backend is only periodically getting updated.
607 stop_thread(&buckets_sync_thread);
609 stop_thread(&user_sync_thread);
613 int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
615 int r = store->get_user_stats(user, stats);
617 ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;
624 int RGWUserStatsCache::sync_bucket(const rgw_user& user, rgw_bucket& bucket)
626 RGWBucketInfo bucket_info;
628 RGWObjectCtx obj_ctx(store);
630 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
632 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
636 r = rgw_bucket_sync_user_stats(store, user, bucket_info);
638 ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl;
645 int RGWUserStatsCache::sync_user(const rgw_user& user)
647 cls_user_header header;
648 string user_str = user.to_str();
649 int ret = store->cls_user_get_header(user_str, &header);
651 ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
655 if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
656 header.last_stats_update < header.last_stats_sync) {
657 ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
661 real_time when_need_full_sync = header.last_stats_sync;
662 when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time);
664 // check if enough time passed since last full sync
665 /* FIXME: missing check? */
667 ret = rgw_user_sync_all_stats(store, user);
669 ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
676 int RGWUserStatsCache::sync_all_users()
681 int ret = store->meta_mgr->list_keys_init(key, &handle);
683 ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl;
692 ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
694 ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
697 for (list<string>::iterator iter = keys.begin();
698 iter != keys.end() && !going_down();
700 rgw_user user(*iter);
701 ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl;
702 int ret = sync_user(user);
704 ldout(store->ctx(), 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;
706 /* continuing to next user */
714 store->meta_mgr->list_keys_complete(handle);
718 void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket)
720 /* racy, but it's ok */
722 bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
727 modified_buckets[bucket] = user;
733 class RGWQuotaInfoApplier {
734 /* NOTE: no non-static field allowed as instances are supposed to live in
735 * the static memory only. */
737 RGWQuotaInfoApplier() = default;
740 virtual ~RGWQuotaInfoApplier() {}
742 virtual bool is_size_exceeded(const char * const entity,
743 const RGWQuotaInfo& qinfo,
744 const RGWStorageStats& stats,
745 const uint64_t size) const = 0;
747 virtual bool is_num_objs_exceeded(const char * const entity,
748 const RGWQuotaInfo& qinfo,
749 const RGWStorageStats& stats,
750 const uint64_t num_objs) const = 0;
752 static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
755 class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
757 bool is_size_exceeded(const char * const entity,
758 const RGWQuotaInfo& qinfo,
759 const RGWStorageStats& stats,
760 const uint64_t size) const override;
762 bool is_num_objs_exceeded(const char * const entity,
763 const RGWQuotaInfo& qinfo,
764 const RGWStorageStats& stats,
765 const uint64_t num_objs) const override;
768 class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
770 bool is_size_exceeded(const char * const entity,
771 const RGWQuotaInfo& qinfo,
772 const RGWStorageStats& stats,
773 const uint64_t size) const override;
775 bool is_num_objs_exceeded(const char * const entity,
776 const RGWQuotaInfo& qinfo,
777 const RGWStorageStats& stats,
778 const uint64_t num_objs) const override;
782 bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity,
783 const RGWQuotaInfo& qinfo,
784 const RGWStorageStats& stats,
785 const uint64_t size) const
787 if (qinfo.max_size < 0) {
788 /* The limit is not enabled. */
792 const uint64_t cur_size = stats.size_rounded;
793 const uint64_t new_size = rgw_rounded_objsize(size);
795 if (cur_size + new_size > static_cast<uint64_t>(qinfo.max_size)) {
796 dout(10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
797 << " size=" << new_size << " "
798 << entity << "_quota.max_size=" << qinfo.max_size << dendl;
805 bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity,
806 const RGWQuotaInfo& qinfo,
807 const RGWStorageStats& stats,
808 const uint64_t num_objs) const
810 if (qinfo.max_objects < 0) {
811 /* The limit is not enabled. */
815 if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
816 dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
817 << " " << entity << "_quota.max_objects=" << qinfo.max_objects
825 bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity,
826 const RGWQuotaInfo& qinfo,
827 const RGWStorageStats& stats,
828 const uint64_t size) const
830 if (qinfo.max_size < 0) {
831 /* The limit is not enabled. */
835 const uint64_t cur_size = stats.size;
837 if (cur_size + size > static_cast<uint64_t>(qinfo.max_size)) {
838 dout(10) << "quota exceeded: stats.size=" << stats.size
839 << " size=" << size << " "
840 << entity << "_quota.max_size=" << qinfo.max_size << dendl;
847 bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity,
848 const RGWQuotaInfo& qinfo,
849 const RGWStorageStats& stats,
850 const uint64_t num_objs) const
852 if (qinfo.max_objects < 0) {
853 /* The limit is not enabled. */
857 if (stats.num_objects + num_objs > static_cast<uint64_t>(qinfo.max_objects)) {
858 dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects
859 << " " << entity << "_quota.max_objects=" << qinfo.max_objects
867 const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
868 const RGWQuotaInfo& qinfo)
870 static RGWQuotaInfoDefApplier default_qapplier;
871 static RGWQuotaInfoRawApplier raw_qapplier;
873 if (qinfo.check_on_raw) {
876 return default_qapplier;
881 class RGWQuotaHandlerImpl : public RGWQuotaHandler {
883 RGWBucketStatsCache bucket_stats_cache;
884 RGWUserStatsCache user_stats_cache;
886 int check_quota(const char * const entity,
887 const RGWQuotaInfo& quota,
888 const RGWStorageStats& stats,
889 const uint64_t num_objs,
890 const uint64_t size) {
891 if (!quota.enabled) {
895 const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);
897 ldout(store->ctx(), 20) << entity
898 << " quota: max_objects=" << quota.max_objects
899 << " max_size=" << quota.max_size << dendl;
902 if (quota_applier.is_num_objs_exceeded(entity, quota, stats, num_objs)) {
903 return -ERR_QUOTA_EXCEEDED;
906 if (quota_applier.is_size_exceeded(entity, quota, stats, size)) {
907 return -ERR_QUOTA_EXCEEDED;
910 ldout(store->ctx(), 20) << entity << " quota OK:"
911 << " stats.num_objects=" << stats.num_objects
912 << " stats.size=" << stats.size << dendl;
916 RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store),
917 bucket_stats_cache(_store),
918 user_stats_cache(_store, quota_threads) {}
920 int check_quota(const rgw_user& user,
922 RGWQuotaInfo& user_quota,
923 RGWQuotaInfo& bucket_quota,
925 uint64_t size) override {
927 if (!bucket_quota.enabled && !user_quota.enabled) {
932 * we need to fetch bucket stats if the user quota is enabled, because
933 * the whole system relies on us periodically updating the user's bucket
934 * stats in the user's header, this happens in get_stats() if we actually
935 * fetch that info and not rely on cached data
938 if (bucket_quota.enabled) {
939 RGWStorageStats bucket_stats;
940 int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
945 ret = check_quota("bucket", bucket_quota, bucket_stats, num_objs, size);
951 if (user_quota.enabled) {
952 RGWStorageStats user_stats;
953 int ret = user_stats_cache.get_stats(user, bucket, user_stats, user_quota);
957 ret = check_quota("user", user_quota, user_stats, num_objs, size);
965 void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
966 bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
967 user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
970 int check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards,
971 const rgw_user& user, const rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
972 uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards)
974 RGWStorageStats bucket_stats;
975 int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats,
981 if (bucket_stats.num_objects + num_objs > num_shards * max_objs_per_shard) {
982 ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << bucket_stats.num_objects
983 << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
984 need_resharding = true;
985 if (suggested_num_shards) {
986 *suggested_num_shards = (bucket_stats.num_objects + num_objs) * 2 / max_objs_per_shard;
989 need_resharding = false;
998 RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads)
1000 return new RGWQuotaHandlerImpl(store, quota_threads);
1003 void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)