Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / AsyncReserver.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef ASYNC_RESERVER_H
16 #define ASYNC_RESERVER_H
17
18 #include "common/Finisher.h"
19 #include "common/Formatter.h"
20
21 #define rdout(x) lgeneric_subdout(cct,reserver,x)
22
23 /**
24  * Manages a configurable number of asyncronous reservations.
25  *
26  * Memory usage is linear with the number of items queued and
27  * linear with respect to the total number of priorities used
28  * over all time.
29  */
30 template <typename T>
31 class AsyncReserver {
32   CephContext *cct;
33   Finisher *f;
34   unsigned max_allowed;
35   unsigned min_priority;
36   Mutex lock;
37
38   struct Reservation {
39     T item;
40     unsigned prio = 0;
41     Context *grant = 0;
42     Context *preempt = 0;
43     Reservation() {}
44     Reservation(T i, unsigned pr, Context *g, Context *p = 0)
45       : item(i), prio(pr), grant(g), preempt(p) {}
46     void dump(Formatter *f) const {
47       f->dump_stream("item") << item;
48       f->dump_unsigned("prio", prio);
49       f->dump_bool("can_preempt", !!preempt);
50     }
51     friend ostream& operator<<(ostream& out, const Reservation& r) {
52       return out << r.item << "(prio " << r.prio << " grant " << r.grant
53                  << " preempt " << r.preempt << ")";
54     }
55   };
56
57   map<unsigned, list<Reservation>> queues;
58   map<T, pair<unsigned, typename list<Reservation>::iterator>> queue_pointers;
59   map<T,Reservation> in_progress;
60   set<pair<unsigned,T>> preempt_by_prio;  ///< in_progress that can be preempted
61
62   void preempt_one() {
63     assert(!preempt_by_prio.empty());
64     auto q = in_progress.find(preempt_by_prio.begin()->second);
65     assert(q != in_progress.end());
66     Reservation victim = q->second;
67     rdout(10) << __func__ << " preempt " << victim << dendl;
68     f->queue(victim.preempt);
69     victim.preempt = nullptr;
70     in_progress.erase(q);
71     preempt_by_prio.erase(preempt_by_prio.begin());
72   }
73
74   void do_queues() {
75     rdout(20) << __func__ << ":\n";
76     JSONFormatter jf(true);
77     jf.open_object_section("queue");
78     _dump(&jf);
79     jf.close_section();
80     jf.flush(*_dout);
81     *_dout << dendl;
82
83     // in case min_priority was adjusted up or max_allowed was adjusted down
84     while (!preempt_by_prio.empty() &&
85            (in_progress.size() > max_allowed ||
86             preempt_by_prio.begin()->first < min_priority)) {
87       preempt_one();
88     }
89
90     while (!queues.empty()) {
91       // choose highest priority queue
92       auto it = queues.end();
93       --it;
94       assert(!it->second.empty());
95       if (it->first < min_priority) {
96         break;
97       }
98       if (in_progress.size() >= max_allowed &&
99           !preempt_by_prio.empty() &&
100           it->first > preempt_by_prio.begin()->first) {
101         preempt_one();
102       }
103       if (in_progress.size() >= max_allowed) {
104         break; // no room
105       }
106       // grant
107       Reservation p = it->second.front();
108       rdout(10) << __func__ << " grant " << p << dendl;
109       queue_pointers.erase(p.item);
110       it->second.pop_front();
111       if (it->second.empty()) {
112         queues.erase(it);
113       }
114       f->queue(p.grant);
115       p.grant = nullptr;
116       in_progress[p.item] = p;
117       if (p.preempt) {
118         preempt_by_prio.insert(make_pair(p.prio, p.item));
119       }
120     }
121   }
122 public:
123   AsyncReserver(
124     CephContext *cct,
125     Finisher *f,
126     unsigned max_allowed,
127     unsigned min_priority = 0)
128     : cct(cct),
129       f(f),
130       max_allowed(max_allowed),
131       min_priority(min_priority),
132       lock("AsyncReserver::lock") {}
133
134   void set_max(unsigned max) {
135     Mutex::Locker l(lock);
136     max_allowed = max;
137     do_queues();
138   }
139
140   void set_min_priority(unsigned min) {
141     Mutex::Locker l(lock);
142     min_priority = min;
143     do_queues();
144   }
145
146   void dump(Formatter *f) {
147     Mutex::Locker l(lock);
148     _dump(f);
149   }
150   void _dump(Formatter *f) {
151     f->dump_unsigned("max_allowed", max_allowed);
152     f->dump_unsigned("min_priority", min_priority);
153     f->open_array_section("queues");
154     for (auto& p : queues) {
155       f->open_object_section("queue");
156       f->dump_unsigned("priority", p.first);
157       f->open_array_section("items");
158       for (auto& q : p.second) {
159         f->dump_object("item", q);
160       }
161       f->close_section();
162       f->close_section();
163     }
164     f->close_section();
165     f->open_array_section("in_progress");
166     for (auto& p : in_progress) {
167       f->dump_object("item", p.second);
168     }
169     f->close_section();
170   }
171
172   /**
173    * Requests a reservation
174    *
175    * Note, on_reserved may be called following cancel_reservation.  Thus,
176    * the callback must be safe in that case.  Callback will be called
177    * with no locks held.  cancel_reservation must be called to release the
178    * reservation slot.
179    */
180   void request_reservation(
181     T item,                   ///< [in] reservation key
182     Context *on_reserved,     ///< [in] callback to be called on reservation
183     unsigned prio,            ///< [in] priority
184     Context *on_preempt = 0   ///< [in] callback to be called if we are preempted (optional)
185     ) {
186     Mutex::Locker l(lock);
187     Reservation r(item, prio, on_reserved, on_preempt);
188     rdout(10) << __func__ << " queue " << r << dendl;
189     assert(!queue_pointers.count(item) &&
190            !in_progress.count(item));
191     queues[prio].push_back(r);
192     queue_pointers.insert(make_pair(item,
193                                     make_pair(prio,--(queues[prio]).end())));
194     do_queues();
195   }
196
197   /**
198    * Cancels reservation
199    *
200    * Frees the reservation under key for use.
201    * Note, after cancel_reservation, the reservation_callback may or
202    * may not still be called. 
203    */
204   void cancel_reservation(
205     T item                   ///< [in] key for reservation to cancel
206     ) {
207     Mutex::Locker l(lock);
208     auto i = queue_pointers.find(item);
209     if (i != queue_pointers.end()) {
210       unsigned prio = i->second.first;
211       const Reservation& r = *i->second.second;
212       rdout(10) << __func__ << " cancel " << r << " (was queued)" << dendl;
213       delete r.grant;
214       delete r.preempt;
215       queues[prio].erase(i->second.second);
216       if (queues[prio].empty()) {
217         queues.erase(prio);
218       }
219       queue_pointers.erase(i);
220     } else {
221       auto p = in_progress.find(item);
222       if (p != in_progress.end()) {
223         rdout(10) << __func__ << " cancel " << p->second
224                   << " (was in progress)" << dendl;
225         if (p->second.preempt) {
226           preempt_by_prio.erase(make_pair(p->second.prio, p->second.item));
227           delete p->second.preempt;
228         }
229         in_progress.erase(p);
230       } else {
231         rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl;
232       }
233     }
234     do_queues();
235   }
236
237   /**
238    * Has reservations
239    *
240    * Return true if there are reservations in progress
241    */
242   bool has_reservation() {
243     Mutex::Locker l(lock);
244     return !in_progress.empty();
245   }
246   static const unsigned MAX_PRIORITY = (unsigned)-1;
247 };
248
249 #undef rdout
250 #endif