Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_cache.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_RGWCACHE_H
5 #define CEPH_RGWCACHE_H
6
7 #include "rgw_rados.h"
8 #include <string>
9 #include <map>
10 #include "include/types.h"
11 #include "include/utime.h"
12 #include "include/assert.h"
13 #include "common/RWLock.h"
14
15 enum {
16   UPDATE_OBJ,
17   REMOVE_OBJ,
18 };
19
20 #define CACHE_FLAG_DATA           0x01
21 #define CACHE_FLAG_XATTRS         0x02
22 #define CACHE_FLAG_META           0x04
23 #define CACHE_FLAG_MODIFY_XATTRS  0x08
24 #define CACHE_FLAG_OBJV           0x10
25
26 #define mydout(v) lsubdout(T::cct, rgw, v)
27
28 struct ObjectMetaInfo {
29   uint64_t size;
30   real_time mtime;
31
32   ObjectMetaInfo() : size(0) {}
33
34   void encode(bufferlist& bl) const {
35     ENCODE_START(2, 2, bl);
36     ::encode(size, bl);
37     ::encode(mtime, bl);
38     ENCODE_FINISH(bl);
39   }
40   void decode(bufferlist::iterator& bl) {
41     DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
42     ::decode(size, bl);
43     ::decode(mtime, bl);
44     DECODE_FINISH(bl);
45   }
46   void dump(Formatter *f) const;
47   static void generate_test_instances(list<ObjectMetaInfo*>& o);
48 };
49 WRITE_CLASS_ENCODER(ObjectMetaInfo)
50
51 struct ObjectCacheInfo {
52   int status;
53   uint32_t flags;
54   uint64_t epoch;
55   bufferlist data;
56   map<string, bufferlist> xattrs;
57   map<string, bufferlist> rm_xattrs;
58   ObjectMetaInfo meta;
59   obj_version version;
60
61   ObjectCacheInfo() : status(0), flags(0), epoch(0), version() {}
62
63   void encode(bufferlist& bl) const {
64     ENCODE_START(5, 3, bl);
65     ::encode(status, bl);
66     ::encode(flags, bl);
67     ::encode(data, bl);
68     ::encode(xattrs, bl);
69     ::encode(meta, bl);
70     ::encode(rm_xattrs, bl);
71     ::encode(epoch, bl);
72     ::encode(version, bl);
73     ENCODE_FINISH(bl);
74   }
75   void decode(bufferlist::iterator& bl) {
76     DECODE_START_LEGACY_COMPAT_LEN(5, 3, 3, bl);
77     ::decode(status, bl);
78     ::decode(flags, bl);
79     ::decode(data, bl);
80     ::decode(xattrs, bl);
81     ::decode(meta, bl);
82     if (struct_v >= 2)
83       ::decode(rm_xattrs, bl);
84     if (struct_v >= 4)
85       ::decode(epoch, bl);
86     if (struct_v >= 5)
87       ::decode(version, bl);
88     DECODE_FINISH(bl);
89   }
90   void dump(Formatter *f) const;
91   static void generate_test_instances(list<ObjectCacheInfo*>& o);
92 };
93 WRITE_CLASS_ENCODER(ObjectCacheInfo)
94
95 struct RGWCacheNotifyInfo {
96   uint32_t op;
97   rgw_raw_obj obj;
98   ObjectCacheInfo obj_info;
99   off_t ofs;
100   string ns;
101
102   RGWCacheNotifyInfo() : op(0), ofs(0) {}
103
104   void encode(bufferlist& obl) const {
105     ENCODE_START(2, 2, obl);
106     ::encode(op, obl);
107     ::encode(obj, obl);
108     ::encode(obj_info, obl);
109     ::encode(ofs, obl);
110     ::encode(ns, obl);
111     ENCODE_FINISH(obl);
112   }
113   void decode(bufferlist::iterator& ibl) {
114     DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, ibl);
115     ::decode(op, ibl);
116     ::decode(obj, ibl);
117     ::decode(obj_info, ibl);
118     ::decode(ofs, ibl);
119     ::decode(ns, ibl);
120     DECODE_FINISH(ibl);
121   }
122   void dump(Formatter *f) const;
123   static void generate_test_instances(list<RGWCacheNotifyInfo*>& o);
124 };
125 WRITE_CLASS_ENCODER(RGWCacheNotifyInfo)
126
127 struct ObjectCacheEntry {
128   ObjectCacheInfo info;
129   std::list<string>::iterator lru_iter;
130   uint64_t lru_promotion_ts;
131   uint64_t gen;
132   std::list<pair<RGWChainedCache *, string> > chained_entries;
133
134   ObjectCacheEntry() : lru_promotion_ts(0), gen(0) {}
135 };
136
137 class ObjectCache {
138   std::map<string, ObjectCacheEntry> cache_map;
139   std::list<string> lru;
140   unsigned long lru_size;
141   unsigned long lru_counter;
142   unsigned long lru_window;
143   RWLock lock;
144   CephContext *cct;
145
146   list<RGWChainedCache *> chained_cache;
147
148   bool enabled;
149
150   void touch_lru(string& name, ObjectCacheEntry& entry, std::list<string>::iterator& lru_iter);
151   void remove_lru(string& name, std::list<string>::iterator& lru_iter);
152
153   void do_invalidate_all();
154 public:
155   ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL), enabled(false) { }
156   int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask, rgw_cache_entry_info *cache_info);
157   void put(std::string& name, ObjectCacheInfo& bl, rgw_cache_entry_info *cache_info);
158   void remove(std::string& name);
159   void set_ctx(CephContext *_cct) {
160     cct = _cct;
161     lru_window = cct->_conf->rgw_cache_lru_size / 2;
162   }
163   bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry);
164
165   void set_enabled(bool status);
166
167   void chain_cache(RGWChainedCache *cache);
168   void invalidate_all();
169 };
170
171 template <class T>
172 class RGWCache  : public T
173 {
174   ObjectCache cache;
175
176   int list_objects_raw_init(rgw_pool& pool, RGWAccessHandle *handle) {
177     return T::list_objects_raw_init(pool, handle);
178   }
179   int list_objects_raw_next(rgw_bucket_dir_entry& obj, RGWAccessHandle *handle) {
180     return T::list_objects_raw_next(obj, handle);
181   }
182
183   string normal_name(rgw_pool& pool, const std::string& oid) {
184     std::string buf;
185     buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
186     buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid);
187     return buf;
188   }
189
190   void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);
191   string normal_name(rgw_raw_obj& obj) {
192     return normal_name(obj.pool, obj.oid);
193   }
194
195   int init_rados() override {
196     int ret;
197     cache.set_ctx(T::cct);
198     ret = T::init_rados();
199     if (ret < 0)
200       return ret;
201
202     return 0;
203   }
204
205   bool need_watch_notify() override {
206     return true;
207   }
208
209   int distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op);
210   int watch_cb(uint64_t notify_id,
211                uint64_t cookie,
212                uint64_t notifier_id,
213                bufferlist& bl) override;
214
215   void set_cache_enabled(bool state) override {
216     cache.set_enabled(state);
217   }
218 public:
219   RGWCache() {}
220
221   void register_chained_cache(RGWChainedCache *cc) override {
222     cache.chain_cache(cc);
223   }
224
225   int system_obj_set_attrs(void *ctx, rgw_raw_obj& obj, 
226                 map<string, bufferlist>& attrs,
227                 map<string, bufferlist>* rmattrs,
228                 RGWObjVersionTracker *objv_tracker);
229   int put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
230               map<std::string, bufferlist>& attrs, int flags,
231               bufferlist& data,
232               RGWObjVersionTracker *objv_tracker,
233               real_time set_mtime) override;
234   int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive,
235                           RGWObjVersionTracker *objv_tracker = nullptr) override;
236
237   int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
238                      RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
239                      bufferlist& bl, off_t ofs, off_t end,
240                      map<string, bufferlist> *attrs,
241                      rgw_cache_entry_info *cache_info) override;
242
243   int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, map<string, bufferlist> *attrs,
244                    bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) override;
245
246   int delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) override;
247
248   bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry) override {
249     return cache.chain_cache_entry(cache_info_entries, chained_entry);
250   }
251 };
252
253 template <class T>
254 void RGWCache<T>::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
255 {
256   if (src_obj.size()) {
257     dst_pool = src_pool;
258     dst_obj = src_obj;
259   } else {
260     dst_pool = T::get_zone_params().domain_root;
261     dst_obj = src_pool.name;
262   }
263 }
264
265 template <class T>
266 int RGWCache<T>::delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
267 {
268   rgw_pool pool;
269   string oid;
270   normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
271
272   string name = normal_name(obj);
273   cache.remove(name);
274
275   ObjectCacheInfo info;
276   distribute_cache(name, obj, info, REMOVE_OBJ);
277
278   return T::delete_system_obj(obj, objv_tracker);
279 }
280
281 template <class T>
282 int RGWCache<T>::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
283                      RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
284                      bufferlist& obl, off_t ofs, off_t end,
285                      map<string, bufferlist> *attrs,
286                      rgw_cache_entry_info *cache_info)
287 {
288   rgw_pool pool;
289   string oid;
290   normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
291   if (ofs != 0)
292     return T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
293
294   string name = normal_name(obj.pool, oid);
295
296   ObjectCacheInfo info;
297
298   uint32_t flags = CACHE_FLAG_DATA;
299   if (objv_tracker)
300     flags |= CACHE_FLAG_OBJV;
301   if (attrs)
302     flags |= CACHE_FLAG_XATTRS;
303   
304   if (cache.get(name, info, flags, cache_info) == 0) {
305     if (info.status < 0)
306       return info.status;
307
308     bufferlist& bl = info.data;
309
310     bufferlist::iterator i = bl.begin();
311
312     obl.clear();
313
314     i.copy_all(obl);
315     if (objv_tracker)
316       objv_tracker->read_version = info.version;
317     if (attrs)
318       *attrs = info.xattrs;
319     return bl.length();
320   }
321   int r = T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
322   if (r < 0) {
323     if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
324       info.status = r;
325       cache.put(name, info, cache_info);
326     }
327     return r;
328   }
329
330   if (obl.length() == end + 1) {
331     /* in this case, most likely object contains more data, we can't cache it */
332     return r;
333   }
334
335   bufferptr p(r);
336   bufferlist& bl = info.data;
337   bl.clear();
338   bufferlist::iterator o = obl.begin();
339   o.copy_all(bl);
340   info.status = 0;
341   info.flags = flags;
342   if (objv_tracker) {
343     info.version = objv_tracker->read_version;
344   }
345   if (attrs) {
346     info.xattrs = *attrs;
347   }
348   cache.put(name, info, cache_info);
349   return r;
350 }
351
352 template <class T>
353 int RGWCache<T>::system_obj_set_attrs(void *ctx, rgw_raw_obj& obj, 
354                            map<string, bufferlist>& attrs,
355                            map<string, bufferlist>* rmattrs,
356                            RGWObjVersionTracker *objv_tracker) 
357 {
358   rgw_pool pool;
359   string oid;
360   normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
361   ObjectCacheInfo info;
362   info.xattrs = attrs;
363   if (rmattrs)
364     info.rm_xattrs = *rmattrs;
365   info.status = 0;
366   info.flags = CACHE_FLAG_MODIFY_XATTRS;
367   if (objv_tracker) {
368     info.version = objv_tracker->write_version;
369     info.flags |= CACHE_FLAG_OBJV;
370   }
371   int ret = T::system_obj_set_attrs(ctx, obj, attrs, rmattrs, objv_tracker);
372   string name = normal_name(pool, oid);
373   if (ret >= 0) {
374     cache.put(name, info, NULL);
375     int r = distribute_cache(name, obj, info, UPDATE_OBJ);
376     if (r < 0)
377       mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
378   } else {
379     cache.remove(name);
380   }
381
382   return ret;
383 }
384
385 template <class T>
386 int RGWCache<T>::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
387               map<std::string, bufferlist>& attrs, int flags,
388               bufferlist& data,
389               RGWObjVersionTracker *objv_tracker,
390               real_time set_mtime)
391 {
392   rgw_pool pool;
393   string oid;
394   normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
395   ObjectCacheInfo info;
396   info.xattrs = attrs;
397   info.status = 0;
398   info.data = data;
399   info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META;
400   if (objv_tracker) {
401     info.version = objv_tracker->write_version;
402     info.flags |= CACHE_FLAG_OBJV;
403   }
404   ceph::real_time result_mtime;
405   int ret = T::put_system_obj_impl(obj, size, &result_mtime, attrs, flags, data,
406                                    objv_tracker, set_mtime);
407   if (mtime) {
408     *mtime = result_mtime;
409   }
410   info.meta.mtime = result_mtime;
411   info.meta.size = size;
412   string name = normal_name(pool, oid);
413   if (ret >= 0) {
414     cache.put(name, info, NULL);
415     int r = distribute_cache(name, obj, info, UPDATE_OBJ);
416     if (r < 0)
417       mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
418   } else {
419     cache.remove(name);
420   }
421
422   return ret;
423 }
424
425 template <class T>
426 int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& data, off_t ofs, bool exclusive,
427                                      RGWObjVersionTracker *objv_tracker)
428 {
429   rgw_pool pool;
430   string oid;
431   normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
432   ObjectCacheInfo info;
433   bool cacheable = false;
434   if ((ofs == 0) || (ofs == -1)) {
435     cacheable = true;
436     info.data = data;
437     info.meta.size = data.length();
438     info.status = 0;
439     info.flags = CACHE_FLAG_DATA;
440   }
441   if (objv_tracker) {
442     info.version = objv_tracker->write_version;
443     info.flags |= CACHE_FLAG_OBJV;
444   }
445   int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker);
446   if (cacheable) {
447     string name = normal_name(pool, oid);
448     if (ret >= 0) {
449       cache.put(name, info, NULL);
450       int r = distribute_cache(name, obj, info, UPDATE_OBJ);
451       if (r < 0)
452         mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
453     } else {
454       cache.remove(name);
455     }
456   }
457
458   return ret;
459 }
460
461 template <class T>
462 int RGWCache<T>::raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime,
463                           uint64_t *pepoch, map<string, bufferlist> *attrs,
464                           bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker)
465 {
466   rgw_pool pool;
467   string oid;
468   normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
469
470   string name = normal_name(pool, oid);
471
472   uint64_t size;
473   real_time mtime;
474   uint64_t epoch;
475
476   ObjectCacheInfo info;
477   uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
478   if (objv_tracker)
479     flags |= CACHE_FLAG_OBJV;
480   int r = cache.get(name, info, flags, NULL);
481   if (r == 0) {
482     if (info.status < 0)
483       return info.status;
484
485     size = info.meta.size;
486     mtime = info.meta.mtime;
487     epoch = info.epoch;
488     if (objv_tracker)
489       objv_tracker->read_version = info.version;
490     goto done;
491   }
492   r = T::raw_obj_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker);
493   if (r < 0) {
494     if (r == -ENOENT) {
495       info.status = r;
496       cache.put(name, info, NULL);
497     }
498     return r;
499   }
500   info.status = 0;
501   info.epoch = epoch;
502   info.meta.mtime = mtime;
503   info.meta.size = size;
504   info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
505   if (objv_tracker) {
506     info.flags |= CACHE_FLAG_OBJV;
507     info.version = objv_tracker->read_version;
508   }
509   cache.put(name, info, NULL);
510 done:
511   if (psize)
512     *psize = size;
513   if (pmtime)
514     *pmtime = mtime;
515   if (pepoch)
516     *pepoch = epoch;
517   if (attrs)
518     *attrs = info.xattrs;
519   return 0;
520 }
521
522 template <class T>
523 int RGWCache<T>::distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op)
524 {
525   RGWCacheNotifyInfo info;
526
527   info.op = op;
528
529   info.obj_info = obj_info;
530   info.obj = obj;
531   bufferlist bl;
532   ::encode(info, bl);
533   return T::distribute(normal_name, bl);
534 }
535
536 template <class T>
537 int RGWCache<T>::watch_cb(uint64_t notify_id,
538                           uint64_t cookie,
539                           uint64_t notifier_id,
540                           bufferlist& bl)
541 {
542   RGWCacheNotifyInfo info;
543
544   try {
545     bufferlist::iterator iter = bl.begin();
546     ::decode(info, iter);
547   } catch (buffer::end_of_buffer& err) {
548     mydout(0) << "ERROR: got bad notification" << dendl;
549     return -EIO;
550   } catch (buffer::error& err) {
551     mydout(0) << "ERROR: buffer::error" << dendl;
552     return -EIO;
553   }
554
555   rgw_pool pool;
556   string oid;
557   normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
558   string name = normal_name(pool, oid);
559   
560   switch (info.op) {
561   case UPDATE_OBJ:
562     cache.put(name, info.obj_info, NULL);
563     break;
564   case REMOVE_OBJ:
565     cache.remove(name);
566     break;
567   default:
568     mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl;
569     return -EINVAL;
570   }
571
572   return 0;
573 }
574
575 #endif