Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / PrioritizedQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef PRIORITY_QUEUE_H
16 #define PRIORITY_QUEUE_H
17
18 #include "common/Formatter.h"
19 #include "common/OpQueue.h"
20
21 /**
22  * Manages queue for normal and strict priority items
23  *
24  * On dequeue, the queue will select the lowest priority queue
25  * such that the q has bucket > cost of front queue item.
26  *
27  * If there is no such queue, we choose the next queue item for
28  * the highest priority queue.
29  *
30  * Before returning a dequeued item, we place into each bucket
31  * cost * (priority/total_priority) tokens.
32  *
33  * enqueue_strict and enqueue_strict_front queue items into queues
34  * which are serviced in strict priority order before items queued
35  * with enqueue and enqueue_front
36  *
37  * Within a priority class, we schedule round robin based on the class
38  * of type K used to enqueue items.  e.g. you could use entity_inst_t
39  * to provide fairness for different clients.
40  */
41 template <typename T, typename K>
42 class PrioritizedQueue : public OpQueue <T, K> {
43   int64_t total_priority;
44   int64_t max_tokens_per_subqueue;
45   int64_t min_cost;
46
47   typedef std::list<std::pair<unsigned, T> > ListPairs;
48
49   struct SubQueue {
50   private:
51     typedef std::map<K, ListPairs> Classes;
52     Classes q;
53     unsigned tokens, max_tokens;
54     int64_t size;
55     typename Classes::iterator cur;
56   public:
57     SubQueue(const SubQueue &other)
58       : q(other.q),
59         tokens(other.tokens),
60         max_tokens(other.max_tokens),
61         size(other.size),
62         cur(q.begin()) {}
63     SubQueue()
64       : tokens(0),
65         max_tokens(0),
66         size(0), cur(q.begin()) {}
67     void set_max_tokens(unsigned mt) {
68       max_tokens = mt;
69     }
70     unsigned get_max_tokens() const {
71       return max_tokens;
72     }
73     unsigned num_tokens() const {
74       return tokens;
75     }
76     void put_tokens(unsigned t) {
77       tokens += t;
78       if (tokens > max_tokens) {
79         tokens = max_tokens;
80       }
81     }
82     void take_tokens(unsigned t) {
83       if (tokens > t) {
84         tokens -= t;
85       } else {
86         tokens = 0;
87       }
88     }
89     void enqueue(K cl, unsigned cost, T item) {
90       q[cl].push_back(std::make_pair(cost, item));
91       if (cur == q.end())
92         cur = q.begin();
93       size++;
94     }
95     void enqueue_front(K cl, unsigned cost, T item) {
96       q[cl].push_front(std::make_pair(cost, item));
97       if (cur == q.end())
98         cur = q.begin();
99       size++;
100     }
101     std::pair<unsigned, T> front() const {
102       assert(!(q.empty()));
103       assert(cur != q.end());
104       return cur->second.front();
105     }
106     void pop_front() {
107       assert(!(q.empty()));
108       assert(cur != q.end());
109       cur->second.pop_front();
110       if (cur->second.empty()) {
111         q.erase(cur++);
112       } else {
113         ++cur;
114       }
115       if (cur == q.end()) {
116         cur = q.begin();
117       }
118       size--;
119     }
120     unsigned length() const {
121       assert(size >= 0);
122       return (unsigned)size;
123     }
124     bool empty() const {
125       return q.empty();
126     }
127     void remove_by_class(K k, std::list<T> *out) {
128       typename Classes::iterator i = q.find(k);
129       if (i == q.end()) {
130         return;
131       }
132       size -= i->second.size();
133       if (i == cur) {
134         ++cur;
135       }
136       if (out) {
137         for (typename ListPairs::reverse_iterator j =
138                i->second.rbegin();
139              j != i->second.rend();
140              ++j) {
141           out->push_front(j->second);
142         }
143       }
144       q.erase(i);
145       if (cur == q.end()) {
146         cur = q.begin();
147       }
148     }
149
150     void dump(ceph::Formatter *f) const {
151       f->dump_int("tokens", tokens);
152       f->dump_int("max_tokens", max_tokens);
153       f->dump_int("size", size);
154       f->dump_int("num_keys", q.size());
155       if (!empty()) {
156         f->dump_int("first_item_cost", front().first);
157       }
158     }
159   };
160
161   typedef std::map<unsigned, SubQueue> SubQueues;
162   SubQueues high_queue;
163   SubQueues queue;
164
165   SubQueue *create_queue(unsigned priority) {
166     typename SubQueues::iterator p = queue.find(priority);
167     if (p != queue.end()) {
168       return &p->second;
169     }
170     total_priority += priority;
171     SubQueue *sq = &queue[priority];
172     sq->set_max_tokens(max_tokens_per_subqueue);
173     return sq;
174   }
175
176   void remove_queue(unsigned priority) {
177     assert(queue.count(priority));
178     queue.erase(priority);
179     total_priority -= priority;
180     assert(total_priority >= 0);
181   }
182
183   void distribute_tokens(unsigned cost) {
184     if (total_priority == 0) {
185       return;
186     }
187     for (typename SubQueues::iterator i = queue.begin();
188          i != queue.end();
189          ++i) {
190       i->second.put_tokens(((i->first * cost) / total_priority) + 1);
191     }
192   }
193
194 public:
195   PrioritizedQueue(unsigned max_per, unsigned min_c)
196     : total_priority(0),
197       max_tokens_per_subqueue(max_per),
198       min_cost(min_c)
199   {}
200
201   unsigned length() const final {
202     unsigned total = 0;
203     for (typename SubQueues::const_iterator i = queue.begin();
204          i != queue.end();
205          ++i) {
206       assert(i->second.length());
207       total += i->second.length();
208     }
209     for (typename SubQueues::const_iterator i = high_queue.begin();
210          i != high_queue.end();
211          ++i) {
212       assert(i->second.length());
213       total += i->second.length();
214     }
215     return total;
216   }
217
218   void remove_by_class(K k, std::list<T> *out = 0) final {
219     for (typename SubQueues::iterator i = queue.begin();
220          i != queue.end();
221          ) {
222       i->second.remove_by_class(k, out);
223       if (i->second.empty()) {
224         unsigned priority = i->first;
225         ++i;
226         remove_queue(priority);
227       } else {
228         ++i;
229       }
230     }
231     for (typename SubQueues::iterator i = high_queue.begin();
232          i != high_queue.end();
233          ) {
234       i->second.remove_by_class(k, out);
235       if (i->second.empty()) {
236         high_queue.erase(i++);
237       } else {
238         ++i;
239       }
240     }
241   }
242
243   void enqueue_strict(K cl, unsigned priority, T item) final {
244     high_queue[priority].enqueue(cl, 0, item);
245   }
246
247   void enqueue_strict_front(K cl, unsigned priority, T item) final {
248     high_queue[priority].enqueue_front(cl, 0, item);
249   }
250
251   void enqueue(K cl, unsigned priority, unsigned cost, T item) final {
252     if (cost < min_cost)
253       cost = min_cost;
254     if (cost > max_tokens_per_subqueue)
255       cost = max_tokens_per_subqueue;
256     create_queue(priority)->enqueue(cl, cost, item);
257   }
258
259   void enqueue_front(K cl, unsigned priority, unsigned cost, T item) final {
260     if (cost < min_cost)
261       cost = min_cost;
262     if (cost > max_tokens_per_subqueue)
263       cost = max_tokens_per_subqueue;
264     create_queue(priority)->enqueue_front(cl, cost, item);
265   }
266
267   bool empty() const final {
268     assert(total_priority >= 0);
269     assert((total_priority == 0) || !(queue.empty()));
270     return queue.empty() && high_queue.empty();
271   }
272
273   T dequeue() final {
274     assert(!empty());
275
276     if (!(high_queue.empty())) {
277       T ret = high_queue.rbegin()->second.front().second;
278       high_queue.rbegin()->second.pop_front();
279       if (high_queue.rbegin()->second.empty()) {
280         high_queue.erase(high_queue.rbegin()->first);
281       }
282       return ret;
283     }
284
285     // if there are multiple buckets/subqueues with sufficient tokens,
286     // we behave like a strict priority queue among all subqueues that
287     // are eligible to run.
288     for (typename SubQueues::iterator i = queue.begin();
289          i != queue.end();
290          ++i) {
291       assert(!(i->second.empty()));
292       if (i->second.front().first < i->second.num_tokens()) {
293         T ret = i->second.front().second;
294         unsigned cost = i->second.front().first;
295         i->second.take_tokens(cost);
296         i->second.pop_front();
297         if (i->second.empty()) {
298           remove_queue(i->first);
299         }
300         distribute_tokens(cost);
301         return ret;
302       }
303     }
304
305     // if no subqueues have sufficient tokens, we behave like a strict
306     // priority queue.
307     T ret = queue.rbegin()->second.front().second;
308     unsigned cost = queue.rbegin()->second.front().first;
309     queue.rbegin()->second.pop_front();
310     if (queue.rbegin()->second.empty()) {
311       remove_queue(queue.rbegin()->first);
312     }
313     distribute_tokens(cost);
314     return ret;
315   }
316
317   void dump(ceph::Formatter *f) const final {
318     f->dump_int("total_priority", total_priority);
319     f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue);
320     f->dump_int("min_cost", min_cost);
321     f->open_array_section("high_queues");
322     for (typename SubQueues::const_iterator p = high_queue.begin();
323          p != high_queue.end();
324          ++p) {
325       f->open_object_section("subqueue");
326       f->dump_int("priority", p->first);
327       p->second.dump(f);
328       f->close_section();
329     }
330     f->close_section();
331     f->open_array_section("queues");
332     for (typename SubQueues::const_iterator p = queue.begin();
333          p != queue.end();
334          ++p) {
335       f->open_object_section("subqueue");
336       f->dump_int("priority", p->first);
337       p->second.dump(f);
338       f->close_section();
339     }
340     f->close_section();
341   }
342 };
343
344 #endif