Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / shared_cache.hpp
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_SHAREDCACHE_H
16 #define CEPH_SHAREDCACHE_H
17
18 #include <map>
19 #include <list>
20 #include "common/Mutex.h"
21 #include "common/Cond.h"
22 #include "include/unordered_map.h"
23
24 // re-include our assert to clobber the system one; fix dout:
25 #include "include/assert.h"
26
27 template <class K, class V, class C = std::less<K>, class H = std::hash<K> >
28 class SharedLRU {
29   CephContext *cct;
30   typedef ceph::shared_ptr<V> VPtr;
31   typedef ceph::weak_ptr<V> WeakVPtr;
32   Mutex lock;
33   size_t max_size;
34   Cond cond;
35   unsigned size;
36 public:
37   int waiting;
38 private:
39   ceph::unordered_map<K, typename list<pair<K, VPtr> >::iterator, H> contents;
40   list<pair<K, VPtr> > lru;
41
42   map<K, pair<WeakVPtr, V*>, C> weak_refs;
43
44   void trim_cache(list<VPtr> *to_release) {
45     while (size > max_size) {
46       to_release->push_back(lru.back().second);
47       lru_remove(lru.back().first);
48     }
49   }
50
51   void lru_remove(const K& key) {
52     typename ceph::unordered_map<K, typename list<pair<K, VPtr> >::iterator, H>::iterator i = 
53       contents.find(key);
54     if (i == contents.end())
55       return;
56     lru.erase(i->second);
57     --size;
58     contents.erase(i);
59   }
60
61   void lru_add(const K& key, const VPtr& val, list<VPtr> *to_release) {
62     typename ceph::unordered_map<K, typename list<pair<K, VPtr> >::iterator, H>::iterator i =
63       contents.find(key);
64     if (i != contents.end()) {
65       lru.splice(lru.begin(), lru, i->second);
66     } else {
67       ++size;
68       lru.push_front(make_pair(key, val));
69       contents[key] = lru.begin();
70       trim_cache(to_release);
71     }
72   }
73
74   void remove(const K& key, V *valptr) {
75     Mutex::Locker l(lock);
76     typename map<K, pair<WeakVPtr, V*>, C>::iterator i = weak_refs.find(key);
77     if (i != weak_refs.end() && i->second.second == valptr) {
78       weak_refs.erase(i);
79     }
80     cond.Signal();
81   }
82
83   class Cleanup {
84   public:
85     SharedLRU<K, V, C> *cache;
86     K key;
87     Cleanup(SharedLRU<K, V, C> *cache, K key) : cache(cache), key(key) {}
88     void operator()(V *ptr) {
89       cache->remove(key, ptr);
90       delete ptr;
91     }
92   };
93
94 public:
95   SharedLRU(CephContext *cct = NULL, size_t max_size = 20)
96     : cct(cct), lock("SharedLRU::lock"), max_size(max_size), 
97       size(0), waiting(0) {
98     contents.rehash(max_size); 
99   }
100   
101   ~SharedLRU() {
102     contents.clear();
103     lru.clear();
104     if (!weak_refs.empty()) {
105       lderr(cct) << "leaked refs:\n";
106       dump_weak_refs(*_dout);
107       *_dout << dendl;
108       assert(weak_refs.empty());
109     }
110   }
111
112   /// adjust container comparator (for purposes of get_next sort order)
113   void reset_comparator(C comp) {
114     // get_next uses weak_refs; that's the only container we need to
115     // reorder.
116     map<K, pair<WeakVPtr, V*>, C> temp;
117
118     Mutex::Locker l(lock);
119     temp.swap(weak_refs);
120
121     // reconstruct with new comparator
122     weak_refs = map<K, pair<WeakVPtr, V*>, C>(comp);
123     weak_refs.insert(temp.begin(), temp.end());
124   }
125
126   C get_comparator() {
127     return weak_refs.key_comp();
128   }
129
130   void set_cct(CephContext *c) {
131     cct = c;
132   }
133
134   void dump_weak_refs() {
135     lderr(cct) << "leaked refs:\n";
136     dump_weak_refs(*_dout);
137     *_dout << dendl;
138   }
139
140   void dump_weak_refs(ostream& out) {
141     for (typename map<K, pair<WeakVPtr, V*>, C>::iterator p = weak_refs.begin();
142          p != weak_refs.end();
143          ++p) {
144       out << __func__ << " " << this << " weak_refs: "
145           << p->first << " = " << p->second.second
146           << " with " << p->second.first.use_count() << " refs"
147           << std::endl;
148     }
149   }
150
151   //clear all strong reference from the lru.
152   void clear() {
153     while (true) {
154       VPtr val; // release any ref we have after we drop the lock
155       Mutex::Locker l(lock);
156       if (size == 0)
157         break;
158
159       val = lru.back().second;
160       lru_remove(lru.back().first);
161     }
162   }
163
164   void clear(const K& key) {
165     VPtr val; // release any ref we have after we drop the lock
166     {
167       Mutex::Locker l(lock);
168       typename map<K, pair<WeakVPtr, V*>, C>::iterator i = weak_refs.find(key);
169       if (i != weak_refs.end()) {
170         val = i->second.first.lock();
171       }
172       lru_remove(key);
173     }
174   }
175
176   void purge(const K &key) {
177     VPtr val; // release any ref we have after we drop the lock
178     {
179       Mutex::Locker l(lock);
180       typename map<K, pair<WeakVPtr, V*>, C>::iterator i = weak_refs.find(key);
181       if (i != weak_refs.end()) {
182         val = i->second.first.lock();
183         weak_refs.erase(i);
184       }
185       lru_remove(key);
186     }
187   }
188
189   void set_size(size_t new_size) {
190     list<VPtr> to_release;
191     {
192       Mutex::Locker l(lock);
193       max_size = new_size;
194       trim_cache(&to_release);
195     }
196   }
197
198   // Returns K key s.t. key <= k for all currently cached k,v
199   K cached_key_lower_bound() {
200     Mutex::Locker l(lock);
201     return weak_refs.begin()->first;
202   }
203
204   VPtr lower_bound(const K& key) {
205     VPtr val;
206     list<VPtr> to_release;
207     {
208       Mutex::Locker l(lock);
209       ++waiting;
210       bool retry = false;
211       do {
212         retry = false;
213         if (weak_refs.empty())
214           break;
215         typename map<K, pair<WeakVPtr, V*>, C>::iterator i =
216           weak_refs.lower_bound(key);
217         if (i == weak_refs.end())
218           --i;
219         val = i->second.first.lock();
220         if (val) {
221           lru_add(i->first, val, &to_release);
222         } else {
223           retry = true;
224         }
225         if (retry)
226           cond.Wait(lock);
227       } while (retry);
228       --waiting;
229     }
230     return val;
231   }
232   bool get_next(const K &key, pair<K, VPtr> *next) {
233     pair<K, VPtr> r;
234     {
235       Mutex::Locker l(lock);
236       VPtr next_val;
237       typename map<K, pair<WeakVPtr, V*>, C>::iterator i = weak_refs.upper_bound(key);
238
239       while (i != weak_refs.end() &&
240              !(next_val = i->second.first.lock()))
241         ++i;
242
243       if (i == weak_refs.end())
244         return false;
245
246       if (next)
247         r = make_pair(i->first, next_val);
248     }
249     if (next)
250       *next = r;
251     return true;
252   }
253   bool get_next(const K &key, pair<K, V> *next) {
254     pair<K, VPtr> r;
255     bool found = get_next(key, &r);
256     if (!found || !next)
257       return found;
258     next->first = r.first;
259     assert(r.second);
260     next->second = *(r.second);
261     return found;
262   }
263
264   VPtr lookup(const K& key) {
265     VPtr val;
266     list<VPtr> to_release;
267     {
268       Mutex::Locker l(lock);
269       ++waiting;
270       bool retry = false;
271       do {
272         retry = false;
273         typename map<K, pair<WeakVPtr, V*>, C>::iterator i = weak_refs.find(key);
274         if (i != weak_refs.end()) {
275           val = i->second.first.lock();
276           if (val) {
277             lru_add(key, val, &to_release);
278           } else {
279             retry = true;
280           }
281         }
282         if (retry)
283           cond.Wait(lock);
284       } while (retry);
285       --waiting;
286     }
287     return val;
288   }
289   VPtr lookup_or_create(const K &key) {
290     VPtr val;
291     list<VPtr> to_release;
292     {
293       Mutex::Locker l(lock);
294       bool retry = false;
295       do {
296         retry = false;
297         typename map<K, pair<WeakVPtr, V*>, C>::iterator i = weak_refs.find(key);
298         if (i != weak_refs.end()) {
299           val = i->second.first.lock();
300           if (val) {
301             lru_add(key, val, &to_release);
302             return val;
303           } else {
304             retry = true;
305           }
306         }
307         if (retry)
308           cond.Wait(lock);
309       } while (retry);
310
311       V *new_value = new V();
312       VPtr new_val(new_value, Cleanup(this, key));
313       weak_refs.insert(make_pair(key, make_pair(new_val, new_value)));
314       lru_add(key, new_val, &to_release);
315       return new_val;
316     }
317   }
318
319   /**
320    * empty()
321    *
322    * Returns true iff there are no live references left to anything that has been
323    * in the cache.
324    */
325   bool empty() {
326     Mutex::Locker l(lock);
327     return weak_refs.empty();
328   }
329
330   /***
331    * Inserts a key if not present, or bumps it to the front of the LRU if
332    * it is, and then gives you a reference to the value. If the key already
333    * existed, you are responsible for deleting the new value you tried to
334    * insert.
335    *
336    * @param key The key to insert
337    * @param value The value that goes with the key
338    * @param existed Set to true if the value was already in the
339    * map, false otherwise
340    * @return A reference to the map's value for the given key
341    */
342   VPtr add(const K& key, V *value, bool *existed = NULL) {
343     VPtr val;
344     list<VPtr> to_release;
345     {
346       Mutex::Locker l(lock);
347       typename map<K, pair<WeakVPtr, V*>, C>::iterator actual =
348         weak_refs.lower_bound(key);
349       if (actual != weak_refs.end() && actual->first == key) {
350         if (existed) 
351           *existed = true;
352
353         return actual->second.first.lock();
354       }
355
356       if (existed)      
357         *existed = false;
358
359       val = VPtr(value, Cleanup(this, key));
360       weak_refs.insert(actual, make_pair(key, make_pair(val, value)));
361       lru_add(key, val, &to_release);
362     }
363     return val;
364   }
365
366   friend class SharedLRUTest;
367 };
368
369 #endif