Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / QueueRing.h
1 #ifndef QUEUE_RING_H
2 #define QUEUE_RING_H
3
4 #include "common/Mutex.h"
5 #include "common/Cond.h"
6
7 #include <list>
8 #include <atomic>
9 #include <vector>
10
11 template <class T>
12 class QueueRing {
13   struct QueueBucket {
14     Mutex lock;
15     Cond cond;
16     typename std::list<T> entries;
17
18     QueueBucket() : lock("QueueRing::QueueBucket::lock") {}
19     QueueBucket(const QueueBucket& rhs) : lock("QueueRing::QueueBucket::lock") {
20       entries = rhs.entries;
21     }
22
23     void enqueue(const T& entry) {
24       lock.Lock();
25       if (entries.empty()) {
26         cond.Signal();
27       }
28       entries.push_back(entry);
29       lock.Unlock();
30     }
31
32     void dequeue(T *entry) {
33       lock.Lock();
34       if (entries.empty()) {
35         cond.Wait(lock);
36       };
37       assert(!entries.empty());
38       *entry = entries.front();
39       entries.pop_front();
40       lock.Unlock();
41     };
42   };
43
44   std::vector<QueueBucket> buckets;
45   int num_buckets;
46
47   std::atomic<int64_t> cur_read_bucket = { 0 };
48   std::atomic<int64_t> cur_write_bucket = { 0 };
49
50 public:
51   QueueRing(int n) : buckets(n), num_buckets(n) {
52   }
53
54   void enqueue(const T& entry) {
55     buckets[++cur_write_bucket % num_buckets].enqueue(entry);
56   };
57
58   void dequeue(T *entry) {
59     buckets[++cur_read_bucket % num_buckets].dequeue(entry);
60   }
61 };
62
63 #endif