Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / WeightedPriorityQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef WP_QUEUE_H
16 #define WP_QUEUE_H
17
18 #include "OpQueue.h"
19
20 #include <boost/intrusive/list.hpp>
21 #include <boost/intrusive/rbtree.hpp>
22 #include <boost/intrusive/avl_set.hpp>
23
24 namespace bi = boost::intrusive;
25
26 template <typename T, typename S>
27 class MapKey
28 {
29   public:
30   bool operator()(const S i, const T &k) const
31   {
32     return i < k.key;
33   }
34   bool operator()(const T &k, const S i) const
35   {
36     return k.key < i;
37   }
38 };
39
40 template <typename T>
41 class DelItem
42 {
43   public:
44   void operator()(T* delete_this)
45     { delete delete_this; }
46 };
47
48 template <typename T, typename K>
49 class WeightedPriorityQueue :  public OpQueue <T, K>
50 {
51   private:
52     class ListPair : public bi::list_base_hook<>
53     {
54       public:
55         unsigned cost;
56         T item;
57         ListPair(unsigned c, T& i) :
58           cost(c),
59           item(i)
60           {}
61     };
62     class Klass : public bi::set_base_hook<>
63     {
64       typedef bi::list<ListPair> ListPairs;
65       typedef typename ListPairs::iterator Lit;
66       public:
67         K key;          // klass
68         ListPairs lp;
69         Klass(K& k) :
70           key(k)
71           {}
72       friend bool operator< (const Klass &a, const Klass &b)
73         { return a.key < b.key; }
74       friend bool operator> (const Klass &a, const Klass &b)
75         { return a.key > b.key; }
76       friend bool operator== (const Klass &a, const Klass &b)
77         { return a.key == b.key; }
78       void insert(unsigned cost, T& item, bool front) {
79         if (front) {
80           lp.push_front(*new ListPair(cost, item));
81         } else {
82           lp.push_back(*new ListPair(cost, item));
83         }
84       }
85       //Get the cost of the next item to dequeue
86       unsigned get_cost() const {
87         assert(!empty());
88         return lp.begin()->cost;
89       }
90       T pop() {
91         assert(!lp.empty());
92         T ret = lp.begin()->item;
93         lp.erase_and_dispose(lp.begin(), DelItem<ListPair>());
94         return ret;
95       }
96       bool empty() const {
97         return lp.empty();
98       }
99       unsigned get_size() const {
100         return lp.size();
101       }
102       unsigned filter_class(std::list<T>* out) {
103         unsigned count = 0;
104         for (Lit i = --lp.end();; --i) {
105           if (out) {
106             out->push_front(i->item);
107           }
108           i = lp.erase_and_dispose(i, DelItem<ListPair>());
109           ++count;
110           if (i == lp.begin()) {
111             break;
112           }
113         }
114         return count;
115       }
116     };
117     class SubQueue : public bi::set_base_hook<>
118     {
119       typedef bi::rbtree<Klass> Klasses;
120       typedef typename Klasses::iterator Kit;
121       void check_end() {
122         if (next == klasses.end()) {
123           next = klasses.begin();
124         }
125       }
126       public:
127         unsigned key;   // priority
128         Klasses klasses;
129         Kit next;
130         SubQueue(unsigned& p) :
131           key(p),
132           next(klasses.begin())
133           {}
134       friend bool operator< (const SubQueue &a, const SubQueue &b)
135         { return a.key < b.key; }
136       friend bool operator> (const SubQueue &a, const SubQueue &b)
137         { return a.key > b.key; }
138       friend bool operator== (const SubQueue &a, const SubQueue &b)
139         { return a.key == b.key; }
140       bool empty() const {
141         return klasses.empty();
142       }
143       void insert(K cl, unsigned cost, T& item, bool front = false) {
144         typename Klasses::insert_commit_data insert_data;
145         std::pair<Kit, bool> ret =
146           klasses.insert_unique_check(cl, MapKey<Klass, K>(), insert_data);
147         if (ret.second) {
148           ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data);
149           check_end();
150         }
151         ret.first->insert(cost, item, front);
152       }
153       unsigned get_cost() const {
154         assert(!empty());
155         return next->get_cost();
156       }
157       T pop() {
158         T ret = next->pop();
159         if (next->empty()) {
160           next = klasses.erase_and_dispose(next, DelItem<Klass>());
161         } else {
162           ++next;
163         }
164         check_end();
165         return ret;
166       }
167       unsigned filter_class(K& cl, std::list<T>* out) {
168         unsigned count = 0;
169         Kit i = klasses.find(cl, MapKey<Klass, K>());
170         if (i != klasses.end()) {
171           count = i->filter_class(out);
172           Kit tmp = klasses.erase_and_dispose(i, DelItem<Klass>());
173           if (next == i) {
174             next = tmp;
175           }
176           check_end();
177         }
178         return count;
179       }
180       void dump(ceph::Formatter *f) const {
181         f->dump_int("num_keys", next->get_size());
182         if (!empty()) {
183           f->dump_int("first_item_cost", next->get_cost());
184         }
185       }
186     };
187     class Queue {
188       typedef bi::rbtree<SubQueue> SubQueues;
189       typedef typename SubQueues::iterator Sit;
190       SubQueues queues;
191       unsigned total_prio;
192       unsigned max_cost;
193       public:
194         unsigned size;
195         Queue() :
196           total_prio(0),
197           max_cost(0),
198           size(0)
199           {}
200         bool empty() const {
201           return !size;
202         }
203         void insert(unsigned p, K cl, unsigned cost, T& item, bool front = false) {
204           typename SubQueues::insert_commit_data insert_data;
205           std::pair<typename SubQueues::iterator, bool> ret =
206             queues.insert_unique_check(p, MapKey<SubQueue, unsigned>(), insert_data);
207           if (ret.second) {
208             ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data);
209             total_prio += p;
210           }
211           ret.first->insert(cl, cost, item, front);
212           if (cost > max_cost) {
213             max_cost = cost;
214           }
215           ++size;
216         }
217         T pop(bool strict = false) {
218           --size;
219           Sit i = --queues.end();
220           if (strict) {
221             T ret = i->pop();
222             if (i->empty()) {
223               queues.erase_and_dispose(i, DelItem<SubQueue>());
224             }
225             return ret;
226           }
227           if (queues.size() > 1) {
228             while (true) {
229               // Pick a new priority out of the total priority.
230               unsigned prio = rand() % total_prio + 1;
231               unsigned tp = total_prio - i->key;
232               // Find the priority coresponding to the picked number.
233               // Subtract high priorities to low priorities until the picked number
234               // is more than the total and try to dequeue that priority.
235               // Reverse the direction from previous implementation because there is a higher
236               // chance of dequeuing a high priority op so spend less time spinning.
237               while (prio <= tp) {
238                 --i;
239                 tp -= i->key;
240               }
241               // Flip a coin to see if this priority gets to run based on cost.
242               // The next op's cost is multiplied by .9 and subtracted from the
243               // max cost seen. Ops with lower costs will have a larger value
244               // and allow them to be selected easier than ops with high costs.
245               if (max_cost == 0 || rand() % max_cost <=
246                   (max_cost - ((i->get_cost() * 9) / 10))) {
247                 break;
248               }
249               i = --queues.end();
250             }
251           }
252           T ret = i->pop();
253           if (i->empty()) {
254             total_prio -= i->key;
255             queues.erase_and_dispose(i, DelItem<SubQueue>());
256           }
257           return ret;
258         }
259         void filter_class(K& cl, std::list<T>* out) {
260           for (Sit i = queues.begin(); i != queues.end();) {
261             size -= i->filter_class(cl, out);
262             if (i->empty()) {
263               total_prio -= i->key;
264               i = queues.erase_and_dispose(i, DelItem<SubQueue>());
265             } else {
266               ++i;
267             }
268           }
269         }
270         void dump(ceph::Formatter *f) const {
271           for (typename SubQueues::const_iterator i = queues.begin();
272                 i != queues.end(); ++i) {
273             f->dump_int("total_priority", total_prio);
274             f->dump_int("max_cost", max_cost);
275             f->open_object_section("subqueue");
276             f->dump_int("priority", i->key);
277             i->dump(f);
278             f->close_section();
279           }
280         }
281     };
282
283     Queue strict;
284     Queue normal;
285   public:
286     WeightedPriorityQueue(unsigned max_per, unsigned min_c) :
287       strict(),
288       normal()
289       {
290         std::srand(time(0));
291       }
292     unsigned length() const final {
293       return strict.size + normal.size;
294     }
295     void remove_by_class(K cl, std::list<T>* removed = 0) final {
296       strict.filter_class(cl, removed);
297       normal.filter_class(cl, removed);
298     }
299     bool empty() const final {
300       return !(strict.size + normal.size);
301     }
302     void enqueue_strict(K cl, unsigned p, T item) final {
303       strict.insert(p, cl, 0, item);
304     }
305     void enqueue_strict_front(K cl, unsigned p, T item) final {
306       strict.insert(p, cl, 0, item, true);
307     }
308     void enqueue(K cl, unsigned p, unsigned cost, T item) final {
309       normal.insert(p, cl, cost, item);
310     }
311     void enqueue_front(K cl, unsigned p, unsigned cost, T item) final {
312       normal.insert(p, cl, cost, item, true);
313     }
314     T dequeue() override {
315       assert(strict.size + normal.size > 0);
316       if (!strict.empty()) {
317         return strict.pop(true);
318       }
319       return normal.pop();
320     }
321     void dump(ceph::Formatter *f) const override {
322       f->open_array_section("high_queues");
323       strict.dump(f);
324       f->close_section();
325       f->open_array_section("queues");
326       normal.dump(f);
327       f->close_section();
328     }
329 };
330
331 #endif