X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_cache.h;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_cache.h;h=6bc9ef14903268f213440599af8d596aea5bd9e1;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_cache.h b/src/ceph/src/rgw/rgw_cache.h new file mode 100644 index 0000000..6bc9ef1 --- /dev/null +++ b/src/ceph/src/rgw/rgw_cache.h @@ -0,0 +1,575 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RGWCACHE_H +#define CEPH_RGWCACHE_H + +#include "rgw_rados.h" +#include +#include +#include "include/types.h" +#include "include/utime.h" +#include "include/assert.h" +#include "common/RWLock.h" + +enum { + UPDATE_OBJ, + REMOVE_OBJ, +}; + +#define CACHE_FLAG_DATA 0x01 +#define CACHE_FLAG_XATTRS 0x02 +#define CACHE_FLAG_META 0x04 +#define CACHE_FLAG_MODIFY_XATTRS 0x08 +#define CACHE_FLAG_OBJV 0x10 + +#define mydout(v) lsubdout(T::cct, rgw, v) + +struct ObjectMetaInfo { + uint64_t size; + real_time mtime; + + ObjectMetaInfo() : size(0) {} + + void encode(bufferlist& bl) const { + ENCODE_START(2, 2, bl); + ::encode(size, bl); + ::encode(mtime, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl); + ::decode(size, bl); + ::decode(mtime, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; + static void generate_test_instances(list& o); +}; +WRITE_CLASS_ENCODER(ObjectMetaInfo) + +struct ObjectCacheInfo { + int status; + uint32_t flags; + uint64_t epoch; + bufferlist data; + map xattrs; + map rm_xattrs; + ObjectMetaInfo meta; + obj_version version; + + ObjectCacheInfo() : status(0), flags(0), epoch(0), version() {} + + void encode(bufferlist& bl) const { + ENCODE_START(5, 3, bl); + ::encode(status, bl); + ::encode(flags, bl); + ::encode(data, bl); + ::encode(xattrs, bl); + ::encode(meta, bl); + ::encode(rm_xattrs, bl); + ::encode(epoch, bl); + ::encode(version, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START_LEGACY_COMPAT_LEN(5, 3, 3, bl); + ::decode(status, bl); + ::decode(flags, bl); + ::decode(data, bl); + ::decode(xattrs, bl); + ::decode(meta, bl); + if (struct_v >= 2) + ::decode(rm_xattrs, bl); + if (struct_v >= 4) + ::decode(epoch, bl); + if (struct_v >= 5) + ::decode(version, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; + static void generate_test_instances(list& o); +}; +WRITE_CLASS_ENCODER(ObjectCacheInfo) + +struct RGWCacheNotifyInfo { + uint32_t op; + rgw_raw_obj obj; + ObjectCacheInfo obj_info; + off_t ofs; + string ns; + + RGWCacheNotifyInfo() : op(0), ofs(0) {} + + void encode(bufferlist& obl) const { + ENCODE_START(2, 2, obl); + ::encode(op, obl); + ::encode(obj, obl); + ::encode(obj_info, obl); + ::encode(ofs, obl); + ::encode(ns, obl); + ENCODE_FINISH(obl); + } + void decode(bufferlist::iterator& ibl) { + DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, ibl); + ::decode(op, ibl); + ::decode(obj, ibl); + ::decode(obj_info, ibl); + ::decode(ofs, ibl); + ::decode(ns, ibl); + DECODE_FINISH(ibl); + } + void dump(Formatter *f) const; + static void generate_test_instances(list& o); +}; +WRITE_CLASS_ENCODER(RGWCacheNotifyInfo) + +struct ObjectCacheEntry { + ObjectCacheInfo info; + std::list::iterator lru_iter; + uint64_t lru_promotion_ts; + uint64_t gen; + std::list > chained_entries; + + ObjectCacheEntry() : lru_promotion_ts(0), gen(0) {} +}; + +class ObjectCache { + std::map cache_map; + std::list lru; + unsigned long lru_size; + unsigned long lru_counter; + unsigned long lru_window; + RWLock lock; + CephContext *cct; + + list chained_cache; + + bool enabled; + + void touch_lru(string& name, ObjectCacheEntry& entry, std::list::iterator& lru_iter); + void remove_lru(string& name, std::list::iterator& lru_iter); + + void do_invalidate_all(); +public: + ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL), enabled(false) { } + int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask, rgw_cache_entry_info *cache_info); + void put(std::string& name, ObjectCacheInfo& bl, rgw_cache_entry_info *cache_info); + void remove(std::string& name); + void set_ctx(CephContext *_cct) { + cct = _cct; + lru_window = cct->_conf->rgw_cache_lru_size / 2; + } + bool chain_cache_entry(list& cache_info_entries, RGWChainedCache::Entry *chained_entry); + + void set_enabled(bool status); + + void chain_cache(RGWChainedCache *cache); + void invalidate_all(); +}; + +template +class RGWCache : public T +{ + ObjectCache cache; + + int list_objects_raw_init(rgw_pool& pool, RGWAccessHandle *handle) { + return T::list_objects_raw_init(pool, handle); + } + int list_objects_raw_next(rgw_bucket_dir_entry& obj, RGWAccessHandle *handle) { + return T::list_objects_raw_next(obj, handle); + } + + string normal_name(rgw_pool& pool, const std::string& oid) { + std::string buf; + buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2); + buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid); + return buf; + } + + void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj); + string normal_name(rgw_raw_obj& obj) { + return normal_name(obj.pool, obj.oid); + } + + int init_rados() override { + int ret; + cache.set_ctx(T::cct); + ret = T::init_rados(); + if (ret < 0) + return ret; + + return 0; + } + + bool need_watch_notify() override { + return true; + } + + int distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op); + int watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) override; + + void set_cache_enabled(bool state) override { + cache.set_enabled(state); + } +public: + RGWCache() {} + + void register_chained_cache(RGWChainedCache *cc) override { + cache.chain_cache(cc); + } + + int system_obj_set_attrs(void *ctx, rgw_raw_obj& obj, + map& attrs, + map* rmattrs, + RGWObjVersionTracker *objv_tracker); + int put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime, + map& attrs, int flags, + bufferlist& data, + RGWObjVersionTracker *objv_tracker, + real_time set_mtime) override; + int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive, + RGWObjVersionTracker *objv_tracker = nullptr) override; + + int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state, + RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj, + bufferlist& bl, off_t ofs, off_t end, + map *attrs, + rgw_cache_entry_info *cache_info) override; + + int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, map *attrs, + bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) override; + + int delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) override; + + bool chain_cache_entry(list& cache_info_entries, RGWChainedCache::Entry *chained_entry) override { + return cache.chain_cache_entry(cache_info_entries, chained_entry); + } +}; + +template +void RGWCache::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj) +{ + if (src_obj.size()) { + dst_pool = src_pool; + dst_obj = src_obj; + } else { + dst_pool = T::get_zone_params().domain_root; + dst_obj = src_pool.name; + } +} + +template +int RGWCache::delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + + string name = normal_name(obj); + cache.remove(name); + + ObjectCacheInfo info; + distribute_cache(name, obj, info, REMOVE_OBJ); + + return T::delete_system_obj(obj, objv_tracker); +} + +template +int RGWCache::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state, + RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj, + bufferlist& obl, off_t ofs, off_t end, + map *attrs, + rgw_cache_entry_info *cache_info) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + if (ofs != 0) + return T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info); + + string name = normal_name(obj.pool, oid); + + ObjectCacheInfo info; + + uint32_t flags = CACHE_FLAG_DATA; + if (objv_tracker) + flags |= CACHE_FLAG_OBJV; + if (attrs) + flags |= CACHE_FLAG_XATTRS; + + if (cache.get(name, info, flags, cache_info) == 0) { + if (info.status < 0) + return info.status; + + bufferlist& bl = info.data; + + bufferlist::iterator i = bl.begin(); + + obl.clear(); + + i.copy_all(obl); + if (objv_tracker) + objv_tracker->read_version = info.version; + if (attrs) + *attrs = info.xattrs; + return bl.length(); + } + int r = T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info); + if (r < 0) { + if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors + info.status = r; + cache.put(name, info, cache_info); + } + return r; + } + + if (obl.length() == end + 1) { + /* in this case, most likely object contains more data, we can't cache it */ + return r; + } + + bufferptr p(r); + bufferlist& bl = info.data; + bl.clear(); + bufferlist::iterator o = obl.begin(); + o.copy_all(bl); + info.status = 0; + info.flags = flags; + if (objv_tracker) { + info.version = objv_tracker->read_version; + } + if (attrs) { + info.xattrs = *attrs; + } + cache.put(name, info, cache_info); + return r; +} + +template +int RGWCache::system_obj_set_attrs(void *ctx, rgw_raw_obj& obj, + map& attrs, + map* rmattrs, + RGWObjVersionTracker *objv_tracker) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + ObjectCacheInfo info; + info.xattrs = attrs; + if (rmattrs) + info.rm_xattrs = *rmattrs; + info.status = 0; + info.flags = CACHE_FLAG_MODIFY_XATTRS; + if (objv_tracker) { + info.version = objv_tracker->write_version; + info.flags |= CACHE_FLAG_OBJV; + } + int ret = T::system_obj_set_attrs(ctx, obj, attrs, rmattrs, objv_tracker); + string name = normal_name(pool, oid); + if (ret >= 0) { + cache.put(name, info, NULL); + int r = distribute_cache(name, obj, info, UPDATE_OBJ); + if (r < 0) + mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; + } else { + cache.remove(name); + } + + return ret; +} + +template +int RGWCache::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime, + map& attrs, int flags, + bufferlist& data, + RGWObjVersionTracker *objv_tracker, + real_time set_mtime) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + ObjectCacheInfo info; + info.xattrs = attrs; + info.status = 0; + info.data = data; + info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META; + if (objv_tracker) { + info.version = objv_tracker->write_version; + info.flags |= CACHE_FLAG_OBJV; + } + ceph::real_time result_mtime; + int ret = T::put_system_obj_impl(obj, size, &result_mtime, attrs, flags, data, + objv_tracker, set_mtime); + if (mtime) { + *mtime = result_mtime; + } + info.meta.mtime = result_mtime; + info.meta.size = size; + string name = normal_name(pool, oid); + if (ret >= 0) { + cache.put(name, info, NULL); + int r = distribute_cache(name, obj, info, UPDATE_OBJ); + if (r < 0) + mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; + } else { + cache.remove(name); + } + + return ret; +} + +template +int RGWCache::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& data, off_t ofs, bool exclusive, + RGWObjVersionTracker *objv_tracker) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + ObjectCacheInfo info; + bool cacheable = false; + if ((ofs == 0) || (ofs == -1)) { + cacheable = true; + info.data = data; + info.meta.size = data.length(); + info.status = 0; + info.flags = CACHE_FLAG_DATA; + } + if (objv_tracker) { + info.version = objv_tracker->write_version; + info.flags |= CACHE_FLAG_OBJV; + } + int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker); + if (cacheable) { + string name = normal_name(pool, oid); + if (ret >= 0) { + cache.put(name, info, NULL); + int r = distribute_cache(name, obj, info, UPDATE_OBJ); + if (r < 0) + mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; + } else { + cache.remove(name); + } + } + + return ret; +} + +template +int RGWCache::raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, + uint64_t *pepoch, map *attrs, + bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + + string name = normal_name(pool, oid); + + uint64_t size; + real_time mtime; + uint64_t epoch; + + ObjectCacheInfo info; + uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS; + if (objv_tracker) + flags |= CACHE_FLAG_OBJV; + int r = cache.get(name, info, flags, NULL); + if (r == 0) { + if (info.status < 0) + return info.status; + + size = info.meta.size; + mtime = info.meta.mtime; + epoch = info.epoch; + if (objv_tracker) + objv_tracker->read_version = info.version; + goto done; + } + r = T::raw_obj_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker); + if (r < 0) { + if (r == -ENOENT) { + info.status = r; + cache.put(name, info, NULL); + } + return r; + } + info.status = 0; + info.epoch = epoch; + info.meta.mtime = mtime; + info.meta.size = size; + info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS; + if (objv_tracker) { + info.flags |= CACHE_FLAG_OBJV; + info.version = objv_tracker->read_version; + } + cache.put(name, info, NULL); +done: + if (psize) + *psize = size; + if (pmtime) + *pmtime = mtime; + if (pepoch) + *pepoch = epoch; + if (attrs) + *attrs = info.xattrs; + return 0; +} + +template +int RGWCache::distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op) +{ + RGWCacheNotifyInfo info; + + info.op = op; + + info.obj_info = obj_info; + info.obj = obj; + bufferlist bl; + ::encode(info, bl); + return T::distribute(normal_name, bl); +} + +template +int RGWCache::watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) +{ + RGWCacheNotifyInfo info; + + try { + bufferlist::iterator iter = bl.begin(); + ::decode(info, iter); + } catch (buffer::end_of_buffer& err) { + mydout(0) << "ERROR: got bad notification" << dendl; + return -EIO; + } catch (buffer::error& err) { + mydout(0) << "ERROR: buffer::error" << dendl; + return -EIO; + } + + rgw_pool pool; + string oid; + normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid); + string name = normal_name(pool, oid); + + switch (info.op) { + case UPDATE_OBJ: + cache.put(name, info.obj_info, NULL); + break; + case REMOVE_OBJ: + cache.remove(name); + break; + default: + mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl; + return -EINVAL; + } + + return 0; +} + +#endif