These changes are the raw update to qemu-2.6.
[kvmfornfv.git] / qemu / block / throttle-groups.c
1 /*
2  * QEMU block throttling group infrastructure
3  *
4  * Copyright (C) Nodalink, EURL. 2014
5  * Copyright (C) Igalia, S.L. 2015
6  *
7  * Authors:
8  *   BenoĆ®t Canet <benoit.canet@nodalink.com>
9  *   Alberto Garcia <berto@igalia.com>
10  *
11  * This program is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU General Public License as
13  * published by the Free Software Foundation; either version 2 or
14  * (at your option) version 3 of the License.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with this program; if not, see <http://www.gnu.org/licenses/>.
23  */
24
25 #include "qemu/osdep.h"
26 #include "block/throttle-groups.h"
27 #include "qemu/queue.h"
28 #include "qemu/thread.h"
29 #include "sysemu/qtest.h"
30
31 /* The ThrottleGroup structure (with its ThrottleState) is shared
32  * among different BlockDriverState and it's independent from
33  * AioContext, so in order to use it from different threads it needs
34  * its own locking.
35  *
36  * This locking is however handled internally in this file, so it's
37  * transparent to outside users.
38  *
39  * The whole ThrottleGroup structure is private and invisible to
40  * outside users, that only use it through its ThrottleState.
41  *
42  * In addition to the ThrottleGroup structure, BlockDriverState has
43  * fields that need to be accessed by other members of the group and
44  * therefore also need to be protected by this lock. Once a BDS is
45  * registered in a group those fields can be accessed by other threads
46  * any time.
47  *
48  * Again, all this is handled internally and is mostly transparent to
49  * the outside. The 'throttle_timers' field however has an additional
50  * constraint because it may be temporarily invalid (see for example
51  * bdrv_set_aio_context()). Therefore in this file a thread will
52  * access some other BDS's timers only after verifying that that BDS
53  * has throttled requests in the queue.
54  */
55 typedef struct ThrottleGroup {
56     char *name; /* This is constant during the lifetime of the group */
57
58     QemuMutex lock; /* This lock protects the following four fields */
59     ThrottleState ts;
60     QLIST_HEAD(, BlockDriverState) head;
61     BlockDriverState *tokens[2];
62     bool any_timer_armed[2];
63
64     /* These two are protected by the global throttle_groups_lock */
65     unsigned refcount;
66     QTAILQ_ENTRY(ThrottleGroup) list;
67 } ThrottleGroup;
68
69 static QemuMutex throttle_groups_lock;
70 static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
71     QTAILQ_HEAD_INITIALIZER(throttle_groups);
72
73 /* Increments the reference count of a ThrottleGroup given its name.
74  *
75  * If no ThrottleGroup is found with the given name a new one is
76  * created.
77  *
78  * @name: the name of the ThrottleGroup
79  * @ret:  the ThrottleState member of the ThrottleGroup
80  */
81 ThrottleState *throttle_group_incref(const char *name)
82 {
83     ThrottleGroup *tg = NULL;
84     ThrottleGroup *iter;
85
86     qemu_mutex_lock(&throttle_groups_lock);
87
88     /* Look for an existing group with that name */
89     QTAILQ_FOREACH(iter, &throttle_groups, list) {
90         if (!strcmp(name, iter->name)) {
91             tg = iter;
92             break;
93         }
94     }
95
96     /* Create a new one if not found */
97     if (!tg) {
98         tg = g_new0(ThrottleGroup, 1);
99         tg->name = g_strdup(name);
100         qemu_mutex_init(&tg->lock);
101         throttle_init(&tg->ts);
102         QLIST_INIT(&tg->head);
103
104         QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
105     }
106
107     tg->refcount++;
108
109     qemu_mutex_unlock(&throttle_groups_lock);
110
111     return &tg->ts;
112 }
113
114 /* Decrease the reference count of a ThrottleGroup.
115  *
116  * When the reference count reaches zero the ThrottleGroup is
117  * destroyed.
118  *
119  * @ts:  The ThrottleGroup to unref, given by its ThrottleState member
120  */
121 void throttle_group_unref(ThrottleState *ts)
122 {
123     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
124
125     qemu_mutex_lock(&throttle_groups_lock);
126     if (--tg->refcount == 0) {
127         QTAILQ_REMOVE(&throttle_groups, tg, list);
128         qemu_mutex_destroy(&tg->lock);
129         g_free(tg->name);
130         g_free(tg);
131     }
132     qemu_mutex_unlock(&throttle_groups_lock);
133 }
134
135 /* Get the name from a BlockDriverState's ThrottleGroup. The name (and
136  * the pointer) is guaranteed to remain constant during the lifetime
137  * of the group.
138  *
139  * @bs:   a BlockDriverState that is member of a throttling group
140  * @ret:  the name of the group.
141  */
142 const char *throttle_group_get_name(BlockDriverState *bs)
143 {
144     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
145     return tg->name;
146 }
147
148 /* Return the next BlockDriverState in the round-robin sequence,
149  * simulating a circular list.
150  *
151  * This assumes that tg->lock is held.
152  *
153  * @bs:  the current BlockDriverState
154  * @ret: the next BlockDriverState in the sequence
155  */
156 static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
157 {
158     ThrottleState *ts = bs->throttle_state;
159     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
160     BlockDriverState *next = QLIST_NEXT(bs, round_robin);
161
162     if (!next) {
163         return QLIST_FIRST(&tg->head);
164     }
165
166     return next;
167 }
168
169 /* Return the next BlockDriverState in the round-robin sequence with
170  * pending I/O requests.
171  *
172  * This assumes that tg->lock is held.
173  *
174  * @bs:        the current BlockDriverState
175  * @is_write:  the type of operation (read/write)
176  * @ret:       the next BlockDriverState with pending requests, or bs
177  *             if there is none.
178  */
179 static BlockDriverState *next_throttle_token(BlockDriverState *bs,
180                                              bool is_write)
181 {
182     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
183     BlockDriverState *token, *start;
184
185     start = token = tg->tokens[is_write];
186
187     /* get next bs round in round robin style */
188     token = throttle_group_next_bs(token);
189     while (token != start && !token->pending_reqs[is_write]) {
190         token = throttle_group_next_bs(token);
191     }
192
193     /* If no IO are queued for scheduling on the next round robin token
194      * then decide the token is the current bs because chances are
195      * the current bs get the current request queued.
196      */
197     if (token == start && !token->pending_reqs[is_write]) {
198         token = bs;
199     }
200
201     return token;
202 }
203
204 /* Check if the next I/O request for a BlockDriverState needs to be
205  * throttled or not. If there's no timer set in this group, set one
206  * and update the token accordingly.
207  *
208  * This assumes that tg->lock is held.
209  *
210  * @bs:         the current BlockDriverState
211  * @is_write:   the type of operation (read/write)
212  * @ret:        whether the I/O request needs to be throttled or not
213  */
214 static bool throttle_group_schedule_timer(BlockDriverState *bs,
215                                           bool is_write)
216 {
217     ThrottleState *ts = bs->throttle_state;
218     ThrottleTimers *tt = &bs->throttle_timers;
219     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
220     bool must_wait;
221
222     /* Check if any of the timers in this group is already armed */
223     if (tg->any_timer_armed[is_write]) {
224         return true;
225     }
226
227     must_wait = throttle_schedule_timer(ts, tt, is_write);
228
229     /* If a timer just got armed, set bs as the current token */
230     if (must_wait) {
231         tg->tokens[is_write] = bs;
232         tg->any_timer_armed[is_write] = true;
233     }
234
235     return must_wait;
236 }
237
238 /* Look for the next pending I/O request and schedule it.
239  *
240  * This assumes that tg->lock is held.
241  *
242  * @bs:        the current BlockDriverState
243  * @is_write:  the type of operation (read/write)
244  */
245 static void schedule_next_request(BlockDriverState *bs, bool is_write)
246 {
247     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
248     bool must_wait;
249     BlockDriverState *token;
250
251     /* Check if there's any pending request to schedule next */
252     token = next_throttle_token(bs, is_write);
253     if (!token->pending_reqs[is_write]) {
254         return;
255     }
256
257     /* Set a timer for the request if it needs to be throttled */
258     must_wait = throttle_group_schedule_timer(token, is_write);
259
260     /* If it doesn't have to wait, queue it for immediate execution */
261     if (!must_wait) {
262         /* Give preference to requests from the current bs */
263         if (qemu_in_coroutine() &&
264             qemu_co_queue_next(&bs->throttled_reqs[is_write])) {
265             token = bs;
266         } else {
267             ThrottleTimers *tt = &token->throttle_timers;
268             int64_t now = qemu_clock_get_ns(tt->clock_type);
269             timer_mod(tt->timers[is_write], now + 1);
270             tg->any_timer_armed[is_write] = true;
271         }
272         tg->tokens[is_write] = token;
273     }
274 }
275
276 /* Check if an I/O request needs to be throttled, wait and set a timer
277  * if necessary, and schedule the next request using a round robin
278  * algorithm.
279  *
280  * @bs:        the current BlockDriverState
281  * @bytes:     the number of bytes for this I/O
282  * @is_write:  the type of operation (read/write)
283  */
284 void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs,
285                                                         unsigned int bytes,
286                                                         bool is_write)
287 {
288     bool must_wait;
289     BlockDriverState *token;
290
291     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
292     qemu_mutex_lock(&tg->lock);
293
294     /* First we check if this I/O has to be throttled. */
295     token = next_throttle_token(bs, is_write);
296     must_wait = throttle_group_schedule_timer(token, is_write);
297
298     /* Wait if there's a timer set or queued requests of this type */
299     if (must_wait || bs->pending_reqs[is_write]) {
300         bs->pending_reqs[is_write]++;
301         qemu_mutex_unlock(&tg->lock);
302         qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
303         qemu_mutex_lock(&tg->lock);
304         bs->pending_reqs[is_write]--;
305     }
306
307     /* The I/O will be executed, so do the accounting */
308     throttle_account(bs->throttle_state, is_write, bytes);
309
310     /* Schedule the next request */
311     schedule_next_request(bs, is_write);
312
313     qemu_mutex_unlock(&tg->lock);
314 }
315
316 /* Update the throttle configuration for a particular group. Similar
317  * to throttle_config(), but guarantees atomicity within the
318  * throttling group.
319  *
320  * @bs:  a BlockDriverState that is member of the group
321  * @cfg: the configuration to set
322  */
323 void throttle_group_config(BlockDriverState *bs, ThrottleConfig *cfg)
324 {
325     ThrottleTimers *tt = &bs->throttle_timers;
326     ThrottleState *ts = bs->throttle_state;
327     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
328     qemu_mutex_lock(&tg->lock);
329     /* throttle_config() cancels the timers */
330     if (timer_pending(tt->timers[0])) {
331         tg->any_timer_armed[0] = false;
332     }
333     if (timer_pending(tt->timers[1])) {
334         tg->any_timer_armed[1] = false;
335     }
336     throttle_config(ts, tt, cfg);
337     qemu_mutex_unlock(&tg->lock);
338 }
339
340 /* Get the throttle configuration from a particular group. Similar to
341  * throttle_get_config(), but guarantees atomicity within the
342  * throttling group.
343  *
344  * @bs:  a BlockDriverState that is member of the group
345  * @cfg: the configuration will be written here
346  */
347 void throttle_group_get_config(BlockDriverState *bs, ThrottleConfig *cfg)
348 {
349     ThrottleState *ts = bs->throttle_state;
350     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
351     qemu_mutex_lock(&tg->lock);
352     throttle_get_config(ts, cfg);
353     qemu_mutex_unlock(&tg->lock);
354 }
355
356 /* ThrottleTimers callback. This wakes up a request that was waiting
357  * because it had been throttled.
358  *
359  * @bs:        the BlockDriverState whose request had been throttled
360  * @is_write:  the type of operation (read/write)
361  */
362 static void timer_cb(BlockDriverState *bs, bool is_write)
363 {
364     ThrottleState *ts = bs->throttle_state;
365     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
366     bool empty_queue;
367
368     /* The timer has just been fired, so we can update the flag */
369     qemu_mutex_lock(&tg->lock);
370     tg->any_timer_armed[is_write] = false;
371     qemu_mutex_unlock(&tg->lock);
372
373     /* Run the request that was waiting for this timer */
374     empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]);
375
376     /* If the request queue was empty then we have to take care of
377      * scheduling the next one */
378     if (empty_queue) {
379         qemu_mutex_lock(&tg->lock);
380         schedule_next_request(bs, is_write);
381         qemu_mutex_unlock(&tg->lock);
382     }
383 }
384
385 static void read_timer_cb(void *opaque)
386 {
387     timer_cb(opaque, false);
388 }
389
390 static void write_timer_cb(void *opaque)
391 {
392     timer_cb(opaque, true);
393 }
394
395 /* Register a BlockDriverState in the throttling group, also
396  * initializing its timers and updating its throttle_state pointer to
397  * point to it. If a throttling group with that name does not exist
398  * yet, it will be created.
399  *
400  * @bs:        the BlockDriverState to insert
401  * @groupname: the name of the group
402  */
403 void throttle_group_register_bs(BlockDriverState *bs, const char *groupname)
404 {
405     int i;
406     ThrottleState *ts = throttle_group_incref(groupname);
407     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
408     int clock_type = QEMU_CLOCK_REALTIME;
409
410     if (qtest_enabled()) {
411         /* For testing block IO throttling only */
412         clock_type = QEMU_CLOCK_VIRTUAL;
413     }
414
415     bs->throttle_state = ts;
416
417     qemu_mutex_lock(&tg->lock);
418     /* If the ThrottleGroup is new set this BlockDriverState as the token */
419     for (i = 0; i < 2; i++) {
420         if (!tg->tokens[i]) {
421             tg->tokens[i] = bs;
422         }
423     }
424
425     QLIST_INSERT_HEAD(&tg->head, bs, round_robin);
426
427     throttle_timers_init(&bs->throttle_timers,
428                          bdrv_get_aio_context(bs),
429                          clock_type,
430                          read_timer_cb,
431                          write_timer_cb,
432                          bs);
433
434     qemu_mutex_unlock(&tg->lock);
435 }
436
437 /* Unregister a BlockDriverState from its group, removing it from the
438  * list, destroying the timers and setting the throttle_state pointer
439  * to NULL.
440  *
441  * The BlockDriverState must not have pending throttled requests, so
442  * the caller has to drain them first.
443  *
444  * The group will be destroyed if it's empty after this operation.
445  *
446  * @bs: the BlockDriverState to remove
447  */
448 void throttle_group_unregister_bs(BlockDriverState *bs)
449 {
450     ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
451     int i;
452
453     assert(bs->pending_reqs[0] == 0 && bs->pending_reqs[1] == 0);
454     assert(qemu_co_queue_empty(&bs->throttled_reqs[0]));
455     assert(qemu_co_queue_empty(&bs->throttled_reqs[1]));
456
457     qemu_mutex_lock(&tg->lock);
458     for (i = 0; i < 2; i++) {
459         if (tg->tokens[i] == bs) {
460             BlockDriverState *token = throttle_group_next_bs(bs);
461             /* Take care of the case where this is the last bs in the group */
462             if (token == bs) {
463                 token = NULL;
464             }
465             tg->tokens[i] = token;
466         }
467     }
468
469     /* remove the current bs from the list */
470     QLIST_REMOVE(bs, round_robin);
471     throttle_timers_destroy(&bs->throttle_timers);
472     qemu_mutex_unlock(&tg->lock);
473
474     throttle_group_unref(&tg->ts);
475     bs->throttle_state = NULL;
476 }
477
478 static void throttle_groups_init(void)
479 {
480     qemu_mutex_init(&throttle_groups_lock);
481 }
482
483 block_init(throttle_groups_init);