// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef WP_QUEUE_H #define WP_QUEUE_H #include "OpQueue.h" #include #include #include namespace bi = boost::intrusive; template class MapKey { public: bool operator()(const S i, const T &k) const { return i < k.key; } bool operator()(const T &k, const S i) const { return k.key < i; } }; template class DelItem { public: void operator()(T* delete_this) { delete delete_this; } }; template class WeightedPriorityQueue : public OpQueue { private: class ListPair : public bi::list_base_hook<> { public: unsigned cost; T item; ListPair(unsigned c, T& i) : cost(c), item(i) {} }; class Klass : public bi::set_base_hook<> { typedef bi::list ListPairs; typedef typename ListPairs::iterator Lit; public: K key; // klass ListPairs lp; Klass(K& k) : key(k) {} friend bool operator< (const Klass &a, const Klass &b) { return a.key < b.key; } friend bool operator> (const Klass &a, const Klass &b) { return a.key > b.key; } friend bool operator== (const Klass &a, const Klass &b) { return a.key == b.key; } void insert(unsigned cost, T& item, bool front) { if (front) { lp.push_front(*new ListPair(cost, item)); } else { lp.push_back(*new ListPair(cost, item)); } } //Get the cost of the next item to dequeue unsigned get_cost() const { assert(!empty()); return lp.begin()->cost; } T pop() { assert(!lp.empty()); T ret = lp.begin()->item; lp.erase_and_dispose(lp.begin(), DelItem()); return ret; } bool empty() const { return lp.empty(); } unsigned get_size() const { return lp.size(); } unsigned filter_class(std::list* out) { unsigned count = 0; for (Lit i = --lp.end();; --i) { if (out) { out->push_front(i->item); } i = lp.erase_and_dispose(i, DelItem()); ++count; if (i == lp.begin()) { break; } } return count; } }; class SubQueue : public bi::set_base_hook<> { typedef bi::rbtree Klasses; typedef typename Klasses::iterator Kit; void check_end() { if (next == klasses.end()) { next = klasses.begin(); } } public: unsigned key; // priority Klasses klasses; Kit next; SubQueue(unsigned& p) : key(p), next(klasses.begin()) {} friend bool operator< (const SubQueue &a, const SubQueue &b) { return a.key < b.key; } friend bool operator> (const SubQueue &a, const SubQueue &b) { return a.key > b.key; } friend bool operator== (const SubQueue &a, const SubQueue &b) { return a.key == b.key; } bool empty() const { return klasses.empty(); } void insert(K cl, unsigned cost, T& item, bool front = false) { typename Klasses::insert_commit_data insert_data; std::pair ret = klasses.insert_unique_check(cl, MapKey(), insert_data); if (ret.second) { ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data); check_end(); } ret.first->insert(cost, item, front); } unsigned get_cost() const { assert(!empty()); return next->get_cost(); } T pop() { T ret = next->pop(); if (next->empty()) { next = klasses.erase_and_dispose(next, DelItem()); } else { ++next; } check_end(); return ret; } unsigned filter_class(K& cl, std::list* out) { unsigned count = 0; Kit i = klasses.find(cl, MapKey()); if (i != klasses.end()) { count = i->filter_class(out); Kit tmp = klasses.erase_and_dispose(i, DelItem()); if (next == i) { next = tmp; } check_end(); } return count; } void dump(ceph::Formatter *f) const { f->dump_int("num_keys", next->get_size()); if (!empty()) { f->dump_int("first_item_cost", next->get_cost()); } } }; class Queue { typedef bi::rbtree SubQueues; typedef typename SubQueues::iterator Sit; SubQueues queues; unsigned total_prio; unsigned max_cost; public: unsigned size; Queue() : total_prio(0), max_cost(0), size(0) {} bool empty() const { return !size; } void insert(unsigned p, K cl, unsigned cost, T& item, bool front = false) { typename SubQueues::insert_commit_data insert_data; std::pair ret = queues.insert_unique_check(p, MapKey(), insert_data); if (ret.second) { ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data); total_prio += p; } ret.first->insert(cl, cost, item, front); if (cost > max_cost) { max_cost = cost; } ++size; } T pop(bool strict = false) { --size; Sit i = --queues.end(); if (strict) { T ret = i->pop(); if (i->empty()) { queues.erase_and_dispose(i, DelItem()); } return ret; } if (queues.size() > 1) { while (true) { // Pick a new priority out of the total priority. unsigned prio = rand() % total_prio + 1; unsigned tp = total_prio - i->key; // Find the priority coresponding to the picked number. // Subtract high priorities to low priorities until the picked number // is more than the total and try to dequeue that priority. // Reverse the direction from previous implementation because there is a higher // chance of dequeuing a high priority op so spend less time spinning. while (prio <= tp) { --i; tp -= i->key; } // Flip a coin to see if this priority gets to run based on cost. // The next op's cost is multiplied by .9 and subtracted from the // max cost seen. Ops with lower costs will have a larger value // and allow them to be selected easier than ops with high costs. if (max_cost == 0 || rand() % max_cost <= (max_cost - ((i->get_cost() * 9) / 10))) { break; } i = --queues.end(); } } T ret = i->pop(); if (i->empty()) { total_prio -= i->key; queues.erase_and_dispose(i, DelItem()); } return ret; } void filter_class(K& cl, std::list* out) { for (Sit i = queues.begin(); i != queues.end();) { size -= i->filter_class(cl, out); if (i->empty()) { total_prio -= i->key; i = queues.erase_and_dispose(i, DelItem()); } else { ++i; } } } void dump(ceph::Formatter *f) const { for (typename SubQueues::const_iterator i = queues.begin(); i != queues.end(); ++i) { f->dump_int("total_priority", total_prio); f->dump_int("max_cost", max_cost); f->open_object_section("subqueue"); f->dump_int("priority", i->key); i->dump(f); f->close_section(); } } }; Queue strict; Queue normal; public: WeightedPriorityQueue(unsigned max_per, unsigned min_c) : strict(), normal() { std::srand(time(0)); } unsigned length() const final { return strict.size + normal.size; } void remove_by_class(K cl, std::list* removed = 0) final { strict.filter_class(cl, removed); normal.filter_class(cl, removed); } bool empty() const final { return !(strict.size + normal.size); } void enqueue_strict(K cl, unsigned p, T item) final { strict.insert(p, cl, 0, item); } void enqueue_strict_front(K cl, unsigned p, T item) final { strict.insert(p, cl, 0, item, true); } void enqueue(K cl, unsigned p, unsigned cost, T item) final { normal.insert(p, cl, cost, item); } void enqueue_front(K cl, unsigned p, unsigned cost, T item) final { normal.insert(p, cl, cost, item, true); } T dequeue() override { assert(strict.size + normal.size > 0); if (!strict.empty()) { return strict.pop(true); } return normal.pop(); } void dump(ceph::Formatter *f) const override { f->open_array_section("high_queues"); strict.dump(f); f->close_section(); f->open_array_section("queues"); normal.dump(f); f->close_section(); } }; #endif