// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef ASYNC_RESERVER_H #define ASYNC_RESERVER_H #include "common/Finisher.h" #include "common/Formatter.h" #define rdout(x) lgeneric_subdout(cct,reserver,x) /** * Manages a configurable number of asyncronous reservations. * * Memory usage is linear with the number of items queued and * linear with respect to the total number of priorities used * over all time. */ template class AsyncReserver { CephContext *cct; Finisher *f; unsigned max_allowed; unsigned min_priority; Mutex lock; struct Reservation { T item; unsigned prio = 0; Context *grant = 0; Context *preempt = 0; Reservation() {} Reservation(T i, unsigned pr, Context *g, Context *p = 0) : item(i), prio(pr), grant(g), preempt(p) {} void dump(Formatter *f) const { f->dump_stream("item") << item; f->dump_unsigned("prio", prio); f->dump_bool("can_preempt", !!preempt); } friend ostream& operator<<(ostream& out, const Reservation& r) { return out << r.item << "(prio " << r.prio << " grant " << r.grant << " preempt " << r.preempt << ")"; } }; map> queues; map::iterator>> queue_pointers; map in_progress; set> preempt_by_prio; ///< in_progress that can be preempted void preempt_one() { assert(!preempt_by_prio.empty()); auto q = in_progress.find(preempt_by_prio.begin()->second); assert(q != in_progress.end()); Reservation victim = q->second; rdout(10) << __func__ << " preempt " << victim << dendl; f->queue(victim.preempt); victim.preempt = nullptr; in_progress.erase(q); preempt_by_prio.erase(preempt_by_prio.begin()); } void do_queues() { rdout(20) << __func__ << ":\n"; JSONFormatter jf(true); jf.open_object_section("queue"); _dump(&jf); jf.close_section(); jf.flush(*_dout); *_dout << dendl; // in case min_priority was adjusted up or max_allowed was adjusted down while (!preempt_by_prio.empty() && (in_progress.size() > max_allowed || preempt_by_prio.begin()->first < min_priority)) { preempt_one(); } while (!queues.empty()) { // choose highest priority queue auto it = queues.end(); --it; assert(!it->second.empty()); if (it->first < min_priority) { break; } if (in_progress.size() >= max_allowed && !preempt_by_prio.empty() && it->first > preempt_by_prio.begin()->first) { preempt_one(); } if (in_progress.size() >= max_allowed) { break; // no room } // grant Reservation p = it->second.front(); rdout(10) << __func__ << " grant " << p << dendl; queue_pointers.erase(p.item); it->second.pop_front(); if (it->second.empty()) { queues.erase(it); } f->queue(p.grant); p.grant = nullptr; in_progress[p.item] = p; if (p.preempt) { preempt_by_prio.insert(make_pair(p.prio, p.item)); } } } public: AsyncReserver( CephContext *cct, Finisher *f, unsigned max_allowed, unsigned min_priority = 0) : cct(cct), f(f), max_allowed(max_allowed), min_priority(min_priority), lock("AsyncReserver::lock") {} void set_max(unsigned max) { Mutex::Locker l(lock); max_allowed = max; do_queues(); } void set_min_priority(unsigned min) { Mutex::Locker l(lock); min_priority = min; do_queues(); } void dump(Formatter *f) { Mutex::Locker l(lock); _dump(f); } void _dump(Formatter *f) { f->dump_unsigned("max_allowed", max_allowed); f->dump_unsigned("min_priority", min_priority); f->open_array_section("queues"); for (auto& p : queues) { f->open_object_section("queue"); f->dump_unsigned("priority", p.first); f->open_array_section("items"); for (auto& q : p.second) { f->dump_object("item", q); } f->close_section(); f->close_section(); } f->close_section(); f->open_array_section("in_progress"); for (auto& p : in_progress) { f->dump_object("item", p.second); } f->close_section(); } /** * Requests a reservation * * Note, on_reserved may be called following cancel_reservation. Thus, * the callback must be safe in that case. Callback will be called * with no locks held. cancel_reservation must be called to release the * reservation slot. */ void request_reservation( T item, ///< [in] reservation key Context *on_reserved, ///< [in] callback to be called on reservation unsigned prio, ///< [in] priority Context *on_preempt = 0 ///< [in] callback to be called if we are preempted (optional) ) { Mutex::Locker l(lock); Reservation r(item, prio, on_reserved, on_preempt); rdout(10) << __func__ << " queue " << r << dendl; assert(!queue_pointers.count(item) && !in_progress.count(item)); queues[prio].push_back(r); queue_pointers.insert(make_pair(item, make_pair(prio,--(queues[prio]).end()))); do_queues(); } /** * Cancels reservation * * Frees the reservation under key for use. * Note, after cancel_reservation, the reservation_callback may or * may not still be called. */ void cancel_reservation( T item ///< [in] key for reservation to cancel ) { Mutex::Locker l(lock); auto i = queue_pointers.find(item); if (i != queue_pointers.end()) { unsigned prio = i->second.first; const Reservation& r = *i->second.second; rdout(10) << __func__ << " cancel " << r << " (was queued)" << dendl; delete r.grant; delete r.preempt; queues[prio].erase(i->second.second); if (queues[prio].empty()) { queues.erase(prio); } queue_pointers.erase(i); } else { auto p = in_progress.find(item); if (p != in_progress.end()) { rdout(10) << __func__ << " cancel " << p->second << " (was in progress)" << dendl; if (p->second.preempt) { preempt_by_prio.erase(make_pair(p->second.prio, p->second.item)); delete p->second.preempt; } in_progress.erase(p); } else { rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl; } } do_queues(); } /** * Has reservations * * Return true if there are reservations in progress */ bool has_reservation() { Mutex::Locker l(lock); return !in_progress.empty(); } static const unsigned MAX_PRIORITY = (unsigned)-1; }; #undef rdout #endif