X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_quota.cc;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_quota.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=ce3d1265facd45b8ab2e210511553486e3a7feca;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_quota.cc b/src/ceph/src/rgw/rgw_quota.cc deleted file mode 100644 index ce3d126..0000000 --- a/src/ceph/src/rgw/rgw_quota.cc +++ /dev/null @@ -1,1008 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2013 Inktank, Inc - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#include "include/utime.h" -#include "common/lru_map.h" -#include "common/RefCountedObj.h" -#include "common/Thread.h" -#include "common/Mutex.h" -#include "common/RWLock.h" - -#include "rgw_common.h" -#include "rgw_rados.h" -#include "rgw_quota.h" -#include "rgw_bucket.h" -#include "rgw_user.h" - -#include - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rgw - - -struct RGWQuotaCacheStats { - RGWStorageStats stats; - utime_t expiration; - utime_t async_refresh_time; -}; - -template -class RGWQuotaCache { -protected: - RGWRados *store; - lru_map stats_map; - RefCountedWaitObject *async_refcount; - - class StatsAsyncTestSet : public lru_map::UpdateContext { - int objs_delta; - uint64_t added_bytes; - uint64_t removed_bytes; - public: - StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {} - bool update(RGWQuotaCacheStats *entry) override { - if (entry->async_refresh_time.sec() == 0) - return false; - - entry->async_refresh_time = utime_t(0, 0); - - return true; - } - }; - - virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) = 0; - - virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; - - virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map::UpdateContext *ctx) = 0; - virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; - - virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {} -public: - RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) { - async_refcount = new RefCountedWaitObject; - } - virtual ~RGWQuotaCache() { - async_refcount->put_wait(); /* wait for all pending async requests to complete */ - } - - int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota); - void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes); - - virtual bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats); - - void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats); - int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs); - void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats); - void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket); - - class AsyncRefreshHandler { - protected: - RGWRados *store; - RGWQuotaCache *cache; - public: - AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache *_cache) : store(_store), cache(_cache) {} - virtual ~AsyncRefreshHandler() {} - - virtual int init_fetch() = 0; - virtual void drop_reference() = 0; - }; - - virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0; -}; - -template -bool RGWQuotaCache::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats) -{ - if (quota.max_size >= 0) { - if (quota.max_size_soft_threshold < 0) { - quota.max_size_soft_threshold = quota.max_size * store->ctx()->_conf->rgw_bucket_quota_soft_threshold; - } - - if (cached_stats.size_rounded >= (uint64_t)quota.max_size_soft_threshold) { - ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (size): " - << cached_stats.size_rounded << " >= " << quota.max_size_soft_threshold << dendl; - return false; - } - } - - if (quota.max_objects >= 0) { - if (quota.max_objs_soft_threshold < 0) { - quota.max_objs_soft_threshold = quota.max_objects * store->ctx()->_conf->rgw_bucket_quota_soft_threshold; - } - - if (cached_stats.num_objects >= (uint64_t)quota.max_objs_soft_threshold) { - ldout(store->ctx(), 20) << "quota: can't use cached stats, exceeded soft threshold (num objs): " - << cached_stats.num_objects << " >= " << quota.max_objs_soft_threshold << dendl; - return false; - } - } - - return true; -} - -template -int RGWQuotaCache::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) -{ - /* protect against multiple updates */ - StatsAsyncTestSet test_update; - if (!map_find_and_update(user, bucket, &test_update)) { - /* most likely we just raced with another update */ - return 0; - } - - async_refcount->get(); - - - AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket); - - int ret = handler->init_fetch(); - if (ret < 0) { - async_refcount->put(); - handler->drop_reference(); - return ret; - } - - return 0; -} - -template -void RGWQuotaCache::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket) -{ - ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; - - async_refcount->put(); -} - -template -void RGWQuotaCache::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats) -{ - ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; - - RGWQuotaCacheStats qs; - - map_find(user, bucket, qs); - - set_stats(user, bucket, qs, stats); - - async_refcount->put(); -} - -template -void RGWQuotaCache::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats) -{ - qs.stats = stats; - qs.expiration = ceph_clock_now(); - qs.async_refresh_time = qs.expiration; - qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl; - qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2; - - map_add(user, bucket, qs); -} - -template -int RGWQuotaCache::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) { - RGWQuotaCacheStats qs; - utime_t now = ceph_clock_now(); - if (map_find(user, bucket, qs)) { - if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) { - int r = async_refresh(user, bucket, qs); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl; - - /* continue processing, might be a transient error, async refresh is just optimization */ - } - } - - if (can_use_cached_stats(quota, qs.stats) && qs.expiration > - ceph_clock_now()) { - stats = qs.stats; - return 0; - } - } - - int ret = fetch_stats_from_storage(user, bucket, stats); - if (ret < 0 && ret != -ENOENT) - return ret; - - set_stats(user, bucket, qs, stats); - - return 0; -} - - -template -class RGWQuotaStatsUpdate : public lru_map::UpdateContext { - const int objs_delta; - const uint64_t added_bytes; - const uint64_t removed_bytes; -public: - RGWQuotaStatsUpdate(const int objs_delta, - const uint64_t added_bytes, - const uint64_t removed_bytes) - : objs_delta(objs_delta), - added_bytes(added_bytes), - removed_bytes(removed_bytes) { - } - - bool update(RGWQuotaCacheStats * const entry) override { - const uint64_t rounded_added = rgw_rounded_objsize(added_bytes); - const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes); - - if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) { - entry->stats.size += added_bytes - removed_bytes; - } else { - entry->stats.size = 0; - } - - if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) { - entry->stats.size_rounded += rounded_added - rounded_removed; - } else { - entry->stats.size_rounded = 0; - } - - if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) { - entry->stats.num_objects += objs_delta; - } else { - entry->stats.num_objects = 0; - } - - return true; - } -}; - - -template -void RGWQuotaCache::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, - uint64_t added_bytes, uint64_t removed_bytes) -{ - RGWQuotaStatsUpdate update(objs_delta, added_bytes, removed_bytes); - map_find_and_update(user, bucket, &update); - - data_modified(user, bucket); -} - -class BucketAsyncRefreshHandler : public RGWQuotaCache::AsyncRefreshHandler, - public RGWGetBucketStats_CB { - rgw_user user; -public: - BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache *_cache, - const rgw_user& _user, const rgw_bucket& _bucket) : - RGWQuotaCache::AsyncRefreshHandler(_store, _cache), - RGWGetBucketStats_CB(_bucket), user(_user) {} - - void drop_reference() override { put(); } - void handle_response(int r) override; - int init_fetch() override; -}; - -int BucketAsyncRefreshHandler::init_fetch() -{ - RGWBucketInfo bucket_info; - - RGWObjectCtx obj_ctx(store); - - int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL); - if (r < 0) { - ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; - return r; - } - - ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl; - - r = store->get_bucket_stats_async(bucket_info, RGW_NO_SHARD, this); - if (r < 0) { - ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl; - - /* get_bucket_stats_async() dropped our reference already */ - return r; - } - - return 0; -} - -void BucketAsyncRefreshHandler::handle_response(const int r) -{ - if (r < 0) { - ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; - cache->async_refresh_fail(user, bucket); - return; - } - - RGWStorageStats bs; - - for (const auto& pair : *stats) { - const RGWStorageStats& s = pair.second; - - bs.size += s.size; - bs.size_rounded += s.size_rounded; - bs.num_objects += s.num_objects; - } - - cache->async_refresh_response(user, bucket, bs); -} - -class RGWBucketStatsCache : public RGWQuotaCache { -protected: - bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { - return stats_map.find(bucket, qs); - } - - bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map::UpdateContext *ctx) override { - return stats_map.find_and_update(bucket, NULL, ctx); - } - - void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { - stats_map.add(bucket, qs); - } - - int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override; - -public: - explicit RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) { - } - - AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override { - return new BucketAsyncRefreshHandler(store, this, user, bucket); - } -}; - -int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) -{ - RGWBucketInfo bucket_info; - - RGWObjectCtx obj_ctx(store); - - int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL); - if (r < 0) { - ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; - return r; - } - - string bucket_ver; - string master_ver; - - map bucket_stats; - r = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, - &master_ver, bucket_stats, nullptr); - if (r < 0) { - ldout(store->ctx(), 0) << "could not get bucket stats for bucket=" - << bucket.name << dendl; - return r; - } - - stats = RGWStorageStats(); - - for (const auto& pair : bucket_stats) { - const RGWStorageStats& s = pair.second; - - stats.size += s.size; - stats.size_rounded += s.size_rounded; - stats.num_objects += s.num_objects; - } - - return 0; -} - -class UserAsyncRefreshHandler : public RGWQuotaCache::AsyncRefreshHandler, - public RGWGetUserStats_CB { - rgw_bucket bucket; -public: - UserAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache *_cache, - const rgw_user& _user, const rgw_bucket& _bucket) : - RGWQuotaCache::AsyncRefreshHandler(_store, _cache), - RGWGetUserStats_CB(_user), - bucket(_bucket) {} - - void drop_reference() override { put(); } - int init_fetch() override; - void handle_response(int r) override; -}; - -int UserAsyncRefreshHandler::init_fetch() -{ - ldout(store->ctx(), 20) << "initiating async quota refresh for user=" << user << dendl; - int r = store->get_user_stats_async(user, this); - if (r < 0) { - ldout(store->ctx(), 0) << "could not get bucket info for user=" << user << dendl; - - /* get_bucket_stats_async() dropped our reference already */ - return r; - } - - return 0; -} - -void UserAsyncRefreshHandler::handle_response(int r) -{ - if (r < 0) { - ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; - cache->async_refresh_fail(user, bucket); - return; - } - - cache->async_refresh_response(user, bucket, stats); -} - -class RGWUserStatsCache : public RGWQuotaCache { - std::atomic down_flag = { false }; - RWLock rwlock; - map modified_buckets; - - /* thread, sync recent modified buckets info */ - class BucketsSyncThread : public Thread { - CephContext *cct; - RGWUserStatsCache *stats; - - Mutex lock; - Cond cond; - public: - - BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {} - - void *entry() override { - ldout(cct, 20) << "BucketsSyncThread: start" << dendl; - do { - map buckets; - - stats->swap_modified_buckets(buckets); - - for (map::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) { - rgw_bucket bucket = iter->first; - rgw_user& user = iter->second; - ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl; - int r = stats->sync_bucket(user, bucket); - if (r < 0) { - ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl; - } - } - - if (stats->going_down()) - break; - - lock.Lock(); - cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0)); - lock.Unlock(); - } while (!stats->going_down()); - ldout(cct, 20) << "BucketsSyncThread: done" << dendl; - - return NULL; - } - - void stop() { - Mutex::Locker l(lock); - cond.Signal(); - } - }; - - /* - * thread, full sync all users stats periodically - * - * only sync non idle users or ones that never got synced before, this is needed so that - * users that didn't have quota turned on before (or existed before the user objclass - * tracked stats) need to get their backend stats up to date. - */ - class UserSyncThread : public Thread { - CephContext *cct; - RGWUserStatsCache *stats; - - Mutex lock; - Cond cond; - public: - - UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {} - - void *entry() override { - ldout(cct, 20) << "UserSyncThread: start" << dendl; - do { - int ret = stats->sync_all_users(); - if (ret < 0) { - ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl; - } - - if (stats->going_down()) - break; - - lock.Lock(); - cond.WaitInterval(lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0)); - lock.Unlock(); - } while (!stats->going_down()); - ldout(cct, 20) << "UserSyncThread: done" << dendl; - - return NULL; - } - - void stop() { - Mutex::Locker l(lock); - cond.Signal(); - } - }; - - BucketsSyncThread *buckets_sync_thread; - UserSyncThread *user_sync_thread; -protected: - bool map_find(const rgw_user& user,const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { - return stats_map.find(user, qs); - } - - bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map::UpdateContext *ctx) override { - return stats_map.find_and_update(user, NULL, ctx); - } - - void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override { - stats_map.add(user, qs); - } - - int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) override; - int sync_bucket(const rgw_user& rgw_user, rgw_bucket& bucket); - int sync_user(const rgw_user& user); - int sync_all_users(); - - void data_modified(const rgw_user& user, rgw_bucket& bucket) override; - - void swap_modified_buckets(map& out) { - rwlock.get_write(); - modified_buckets.swap(out); - rwlock.unlock(); - } - - template /* easier doing it as a template, Thread doesn't have ->stop() */ - void stop_thread(T **pthr) { - T *thread = *pthr; - if (!thread) - return; - - thread->stop(); - thread->join(); - delete thread; - *pthr = NULL; - } - -public: - RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size), - rwlock("RGWUserStatsCache::rwlock") { - if (quota_threads) { - buckets_sync_thread = new BucketsSyncThread(store->ctx(), this); - buckets_sync_thread->create("rgw_buck_st_syn"); - user_sync_thread = new UserSyncThread(store->ctx(), this); - user_sync_thread->create("rgw_user_st_syn"); - } else { - buckets_sync_thread = NULL; - user_sync_thread = NULL; - } - } - ~RGWUserStatsCache() override { - stop(); - } - - AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override { - return new UserAsyncRefreshHandler(store, this, user, bucket); - } - - bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats) override { - /* in the user case, the cached stats may contain a better estimation of the totals, as - * the backend is only periodically getting updated. - */ - return true; - } - - bool going_down() { - return down_flag; - } - - void stop() { - down_flag = true; - rwlock.get_write(); - stop_thread(&buckets_sync_thread); - rwlock.unlock(); - stop_thread(&user_sync_thread); - } -}; - -int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats) -{ - int r = store->get_user_stats(user, stats); - if (r < 0) { - ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl; - return r; - } - - return 0; -} - -int RGWUserStatsCache::sync_bucket(const rgw_user& user, rgw_bucket& bucket) -{ - RGWBucketInfo bucket_info; - - RGWObjectCtx obj_ctx(store); - - int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL); - if (r < 0) { - ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl; - return r; - } - - r = rgw_bucket_sync_user_stats(store, user, bucket_info); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl; - return r; - } - - return 0; -} - -int RGWUserStatsCache::sync_user(const rgw_user& user) -{ - cls_user_header header; - string user_str = user.to_str(); - int ret = store->cls_user_get_header(user_str, &header); - if (ret < 0) { - ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl; - return ret; - } - - if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users && - header.last_stats_update < header.last_stats_sync) { - ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl; - return 0; - } - - real_time when_need_full_sync = header.last_stats_sync; - when_need_full_sync += make_timespan(store->ctx()->_conf->rgw_user_quota_sync_wait_time); - - // check if enough time passed since last full sync - /* FIXME: missing check? */ - - ret = rgw_user_sync_all_stats(store, user); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl; - return ret; - } - - return 0; -} - -int RGWUserStatsCache::sync_all_users() -{ - string key = "user"; - void *handle; - - int ret = store->meta_mgr->list_keys_init(key, &handle); - if (ret < 0) { - ldout(store->ctx(), 10) << "ERROR: can't get key: ret=" << ret << dendl; - return ret; - } - - bool truncated; - int max = 1000; - - do { - list keys; - ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl; - goto done; - } - for (list::iterator iter = keys.begin(); - iter != keys.end() && !going_down(); - ++iter) { - rgw_user user(*iter); - ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl; - int ret = sync_user(user); - if (ret < 0) { - ldout(store->ctx(), 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl; - - /* continuing to next user */ - continue; - } - } - } while (truncated); - - ret = 0; -done: - store->meta_mgr->list_keys_complete(handle); - return ret; -} - -void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket) -{ - /* racy, but it's ok */ - rwlock.get_read(); - bool need_update = modified_buckets.find(bucket) == modified_buckets.end(); - rwlock.unlock(); - - if (need_update) { - rwlock.get_write(); - modified_buckets[bucket] = user; - rwlock.unlock(); - } -} - - -class RGWQuotaInfoApplier { - /* NOTE: no non-static field allowed as instances are supposed to live in - * the static memory only. */ -protected: - RGWQuotaInfoApplier() = default; - -public: - virtual ~RGWQuotaInfoApplier() {} - - virtual bool is_size_exceeded(const char * const entity, - const RGWQuotaInfo& qinfo, - const RGWStorageStats& stats, - const uint64_t size) const = 0; - - virtual bool is_num_objs_exceeded(const char * const entity, - const RGWQuotaInfo& qinfo, - const RGWStorageStats& stats, - const uint64_t num_objs) const = 0; - - static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo); -}; - -class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier { -public: - bool is_size_exceeded(const char * const entity, - const RGWQuotaInfo& qinfo, - const RGWStorageStats& stats, - const uint64_t size) const override; - - bool is_num_objs_exceeded(const char * const entity, - const RGWQuotaInfo& qinfo, - const RGWStorageStats& stats, - const uint64_t num_objs) const override; -}; - -class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier { -public: - bool is_size_exceeded(const char * const entity, - const RGWQuotaInfo& qinfo, - const RGWStorageStats& stats, - const uint64_t size) const override; - - bool is_num_objs_exceeded(const char * const entity, - const RGWQuotaInfo& qinfo, - const RGWStorageStats& stats, - const uint64_t num_objs) const override; -}; - - -bool RGWQuotaInfoDefApplier::is_size_exceeded(const char * const entity, - const RGWQuotaInfo& qinfo, - const RGWStorageStats& stats, - const uint64_t size) const -{ - if (qinfo.max_size < 0) { - /* The limit is not enabled. */ - return false; - } - - const uint64_t cur_size = stats.size_rounded; - const uint64_t new_size = rgw_rounded_objsize(size); - - if (cur_size + new_size > static_cast(qinfo.max_size)) { - dout(10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded - << " size=" << new_size << " " - << entity << "_quota.max_size=" << qinfo.max_size << dendl; - return true; - } - - return false; -} - -bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const char * const entity, - const RGWQuotaInfo& qinfo, - const RGWStorageStats& stats, - const uint64_t num_objs) const -{ - if (qinfo.max_objects < 0) { - /* The limit is not enabled. */ - return false; - } - - if (stats.num_objects + num_objs > static_cast(qinfo.max_objects)) { - dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects - << " " << entity << "_quota.max_objects=" << qinfo.max_objects - << dendl; - return true; - } - - return false; -} - -bool RGWQuotaInfoRawApplier::is_size_exceeded(const char * const entity, - const RGWQuotaInfo& qinfo, - const RGWStorageStats& stats, - const uint64_t size) const -{ - if (qinfo.max_size < 0) { - /* The limit is not enabled. */ - return false; - } - - const uint64_t cur_size = stats.size; - - if (cur_size + size > static_cast(qinfo.max_size)) { - dout(10) << "quota exceeded: stats.size=" << stats.size - << " size=" << size << " " - << entity << "_quota.max_size=" << qinfo.max_size << dendl; - return true; - } - - return false; -} - -bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const char * const entity, - const RGWQuotaInfo& qinfo, - const RGWStorageStats& stats, - const uint64_t num_objs) const -{ - if (qinfo.max_objects < 0) { - /* The limit is not enabled. */ - return false; - } - - if (stats.num_objects + num_objs > static_cast(qinfo.max_objects)) { - dout(10) << "quota exceeded: stats.num_objects=" << stats.num_objects - << " " << entity << "_quota.max_objects=" << qinfo.max_objects - << dendl; - return true; - } - - return false; -} - -const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance( - const RGWQuotaInfo& qinfo) -{ - static RGWQuotaInfoDefApplier default_qapplier; - static RGWQuotaInfoRawApplier raw_qapplier; - - if (qinfo.check_on_raw) { - return raw_qapplier; - } else { - return default_qapplier; - } -} - - -class RGWQuotaHandlerImpl : public RGWQuotaHandler { - RGWRados *store; - RGWBucketStatsCache bucket_stats_cache; - RGWUserStatsCache user_stats_cache; - - int check_quota(const char * const entity, - const RGWQuotaInfo& quota, - const RGWStorageStats& stats, - const uint64_t num_objs, - const uint64_t size) { - if (!quota.enabled) { - return 0; - } - - const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota); - - ldout(store->ctx(), 20) << entity - << " quota: max_objects=" << quota.max_objects - << " max_size=" << quota.max_size << dendl; - - - if (quota_applier.is_num_objs_exceeded(entity, quota, stats, num_objs)) { - return -ERR_QUOTA_EXCEEDED; - } - - if (quota_applier.is_size_exceeded(entity, quota, stats, size)) { - return -ERR_QUOTA_EXCEEDED; - } - - ldout(store->ctx(), 20) << entity << " quota OK:" - << " stats.num_objects=" << stats.num_objects - << " stats.size=" << stats.size << dendl; - return 0; - } -public: - RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store), - bucket_stats_cache(_store), - user_stats_cache(_store, quota_threads) {} - - int check_quota(const rgw_user& user, - rgw_bucket& bucket, - RGWQuotaInfo& user_quota, - RGWQuotaInfo& bucket_quota, - uint64_t num_objs, - uint64_t size) override { - - if (!bucket_quota.enabled && !user_quota.enabled) { - return 0; - } - - /* - * we need to fetch bucket stats if the user quota is enabled, because - * the whole system relies on us periodically updating the user's bucket - * stats in the user's header, this happens in get_stats() if we actually - * fetch that info and not rely on cached data - */ - - if (bucket_quota.enabled) { - RGWStorageStats bucket_stats; - int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats, - bucket_quota); - if (ret < 0) { - return ret; - } - ret = check_quota("bucket", bucket_quota, bucket_stats, num_objs, size); - if (ret < 0) { - return ret; - } - } - - if (user_quota.enabled) { - RGWStorageStats user_stats; - int ret = user_stats_cache.get_stats(user, bucket, user_stats, user_quota); - if (ret < 0) { - return ret; - } - ret = check_quota("user", user_quota, user_stats, num_objs, size); - if (ret < 0) { - return ret; - } - } - return 0; - } - - void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override { - bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes); - user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes); - } - - int check_bucket_shards(uint64_t max_objs_per_shard, uint64_t num_shards, - const rgw_user& user, const rgw_bucket& bucket, RGWQuotaInfo& bucket_quota, - uint64_t num_objs, bool& need_resharding, uint32_t *suggested_num_shards) - { - RGWStorageStats bucket_stats; - int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats, - bucket_quota); - if (ret < 0) { - return ret; - } - - if (bucket_stats.num_objects + num_objs > num_shards * max_objs_per_shard) { - ldout(store->ctx(), 0) << __func__ << ": resharding needed: stats.num_objects=" << bucket_stats.num_objects - << " shard max_objects=" << max_objs_per_shard * num_shards << dendl; - need_resharding = true; - if (suggested_num_shards) { - *suggested_num_shards = (bucket_stats.num_objects + num_objs) * 2 / max_objs_per_shard; - } - } else { - need_resharding = false; - } - - return 0; - } - -}; - - -RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads) -{ - return new RGWQuotaHandlerImpl(store, quota_threads); -} - -void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler) -{ - delete handler; -} - -