--- /dev/null
+#ifndef QUEUE_RING_H
+#define QUEUE_RING_H
+
+#include "common/Mutex.h"
+#include "common/Cond.h"
+
+#include <list>
+#include <atomic>
+#include <vector>
+
+template <class T>
+class QueueRing {
+ struct QueueBucket {
+ Mutex lock;
+ Cond cond;
+ typename std::list<T> entries;
+
+ QueueBucket() : lock("QueueRing::QueueBucket::lock") {}
+ QueueBucket(const QueueBucket& rhs) : lock("QueueRing::QueueBucket::lock") {
+ entries = rhs.entries;
+ }
+
+ void enqueue(const T& entry) {
+ lock.Lock();
+ if (entries.empty()) {
+ cond.Signal();
+ }
+ entries.push_back(entry);
+ lock.Unlock();
+ }
+
+ void dequeue(T *entry) {
+ lock.Lock();
+ if (entries.empty()) {
+ cond.Wait(lock);
+ };
+ assert(!entries.empty());
+ *entry = entries.front();
+ entries.pop_front();
+ lock.Unlock();
+ };
+ };
+
+ std::vector<QueueBucket> buckets;
+ int num_buckets;
+
+ std::atomic<int64_t> cur_read_bucket = { 0 };
+ std::atomic<int64_t> cur_write_bucket = { 0 };
+
+public:
+ QueueRing(int n) : buckets(n), num_buckets(n) {
+ }
+
+ void enqueue(const T& entry) {
+ buckets[++cur_write_bucket % num_buckets].enqueue(entry);
+ };
+
+ void dequeue(T *entry) {
+ buckets[++cur_read_bucket % num_buckets].dequeue(entry);
+ }
+};
+
+#endif