X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcommon%2FPrioritizedQueue.h;fp=src%2Fceph%2Fsrc%2Fcommon%2FPrioritizedQueue.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=816d80ac1aea406b3c9769eca5060ffc71fca28c;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/common/PrioritizedQueue.h b/src/ceph/src/common/PrioritizedQueue.h deleted file mode 100644 index 816d80a..0000000 --- a/src/ceph/src/common/PrioritizedQueue.h +++ /dev/null @@ -1,344 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef PRIORITY_QUEUE_H -#define PRIORITY_QUEUE_H - -#include "common/Formatter.h" -#include "common/OpQueue.h" - -/** - * Manages queue for normal and strict priority items - * - * On dequeue, the queue will select the lowest priority queue - * such that the q has bucket > cost of front queue item. - * - * If there is no such queue, we choose the next queue item for - * the highest priority queue. - * - * Before returning a dequeued item, we place into each bucket - * cost * (priority/total_priority) tokens. - * - * enqueue_strict and enqueue_strict_front queue items into queues - * which are serviced in strict priority order before items queued - * with enqueue and enqueue_front - * - * Within a priority class, we schedule round robin based on the class - * of type K used to enqueue items. e.g. you could use entity_inst_t - * to provide fairness for different clients. - */ -template -class PrioritizedQueue : public OpQueue { - int64_t total_priority; - int64_t max_tokens_per_subqueue; - int64_t min_cost; - - typedef std::list > ListPairs; - - struct SubQueue { - private: - typedef std::map Classes; - Classes q; - unsigned tokens, max_tokens; - int64_t size; - typename Classes::iterator cur; - public: - SubQueue(const SubQueue &other) - : q(other.q), - tokens(other.tokens), - max_tokens(other.max_tokens), - size(other.size), - cur(q.begin()) {} - SubQueue() - : tokens(0), - max_tokens(0), - size(0), cur(q.begin()) {} - void set_max_tokens(unsigned mt) { - max_tokens = mt; - } - unsigned get_max_tokens() const { - return max_tokens; - } - unsigned num_tokens() const { - return tokens; - } - void put_tokens(unsigned t) { - tokens += t; - if (tokens > max_tokens) { - tokens = max_tokens; - } - } - void take_tokens(unsigned t) { - if (tokens > t) { - tokens -= t; - } else { - tokens = 0; - } - } - void enqueue(K cl, unsigned cost, T item) { - q[cl].push_back(std::make_pair(cost, item)); - if (cur == q.end()) - cur = q.begin(); - size++; - } - void enqueue_front(K cl, unsigned cost, T item) { - q[cl].push_front(std::make_pair(cost, item)); - if (cur == q.end()) - cur = q.begin(); - size++; - } - std::pair front() const { - assert(!(q.empty())); - assert(cur != q.end()); - return cur->second.front(); - } - void pop_front() { - assert(!(q.empty())); - assert(cur != q.end()); - cur->second.pop_front(); - if (cur->second.empty()) { - q.erase(cur++); - } else { - ++cur; - } - if (cur == q.end()) { - cur = q.begin(); - } - size--; - } - unsigned length() const { - assert(size >= 0); - return (unsigned)size; - } - bool empty() const { - return q.empty(); - } - void remove_by_class(K k, std::list *out) { - typename Classes::iterator i = q.find(k); - if (i == q.end()) { - return; - } - size -= i->second.size(); - if (i == cur) { - ++cur; - } - if (out) { - for (typename ListPairs::reverse_iterator j = - i->second.rbegin(); - j != i->second.rend(); - ++j) { - out->push_front(j->second); - } - } - q.erase(i); - if (cur == q.end()) { - cur = q.begin(); - } - } - - void dump(ceph::Formatter *f) const { - f->dump_int("tokens", tokens); - f->dump_int("max_tokens", max_tokens); - f->dump_int("size", size); - f->dump_int("num_keys", q.size()); - if (!empty()) { - f->dump_int("first_item_cost", front().first); - } - } - }; - - typedef std::map SubQueues; - SubQueues high_queue; - SubQueues queue; - - SubQueue *create_queue(unsigned priority) { - typename SubQueues::iterator p = queue.find(priority); - if (p != queue.end()) { - return &p->second; - } - total_priority += priority; - SubQueue *sq = &queue[priority]; - sq->set_max_tokens(max_tokens_per_subqueue); - return sq; - } - - void remove_queue(unsigned priority) { - assert(queue.count(priority)); - queue.erase(priority); - total_priority -= priority; - assert(total_priority >= 0); - } - - void distribute_tokens(unsigned cost) { - if (total_priority == 0) { - return; - } - for (typename SubQueues::iterator i = queue.begin(); - i != queue.end(); - ++i) { - i->second.put_tokens(((i->first * cost) / total_priority) + 1); - } - } - -public: - PrioritizedQueue(unsigned max_per, unsigned min_c) - : total_priority(0), - max_tokens_per_subqueue(max_per), - min_cost(min_c) - {} - - unsigned length() const final { - unsigned total = 0; - for (typename SubQueues::const_iterator i = queue.begin(); - i != queue.end(); - ++i) { - assert(i->second.length()); - total += i->second.length(); - } - for (typename SubQueues::const_iterator i = high_queue.begin(); - i != high_queue.end(); - ++i) { - assert(i->second.length()); - total += i->second.length(); - } - return total; - } - - void remove_by_class(K k, std::list *out = 0) final { - for (typename SubQueues::iterator i = queue.begin(); - i != queue.end(); - ) { - i->second.remove_by_class(k, out); - if (i->second.empty()) { - unsigned priority = i->first; - ++i; - remove_queue(priority); - } else { - ++i; - } - } - for (typename SubQueues::iterator i = high_queue.begin(); - i != high_queue.end(); - ) { - i->second.remove_by_class(k, out); - if (i->second.empty()) { - high_queue.erase(i++); - } else { - ++i; - } - } - } - - void enqueue_strict(K cl, unsigned priority, T item) final { - high_queue[priority].enqueue(cl, 0, item); - } - - void enqueue_strict_front(K cl, unsigned priority, T item) final { - high_queue[priority].enqueue_front(cl, 0, item); - } - - void enqueue(K cl, unsigned priority, unsigned cost, T item) final { - if (cost < min_cost) - cost = min_cost; - if (cost > max_tokens_per_subqueue) - cost = max_tokens_per_subqueue; - create_queue(priority)->enqueue(cl, cost, item); - } - - void enqueue_front(K cl, unsigned priority, unsigned cost, T item) final { - if (cost < min_cost) - cost = min_cost; - if (cost > max_tokens_per_subqueue) - cost = max_tokens_per_subqueue; - create_queue(priority)->enqueue_front(cl, cost, item); - } - - bool empty() const final { - assert(total_priority >= 0); - assert((total_priority == 0) || !(queue.empty())); - return queue.empty() && high_queue.empty(); - } - - T dequeue() final { - assert(!empty()); - - if (!(high_queue.empty())) { - T ret = high_queue.rbegin()->second.front().second; - high_queue.rbegin()->second.pop_front(); - if (high_queue.rbegin()->second.empty()) { - high_queue.erase(high_queue.rbegin()->first); - } - return ret; - } - - // if there are multiple buckets/subqueues with sufficient tokens, - // we behave like a strict priority queue among all subqueues that - // are eligible to run. - for (typename SubQueues::iterator i = queue.begin(); - i != queue.end(); - ++i) { - assert(!(i->second.empty())); - if (i->second.front().first < i->second.num_tokens()) { - T ret = i->second.front().second; - unsigned cost = i->second.front().first; - i->second.take_tokens(cost); - i->second.pop_front(); - if (i->second.empty()) { - remove_queue(i->first); - } - distribute_tokens(cost); - return ret; - } - } - - // if no subqueues have sufficient tokens, we behave like a strict - // priority queue. - T ret = queue.rbegin()->second.front().second; - unsigned cost = queue.rbegin()->second.front().first; - queue.rbegin()->second.pop_front(); - if (queue.rbegin()->second.empty()) { - remove_queue(queue.rbegin()->first); - } - distribute_tokens(cost); - return ret; - } - - void dump(ceph::Formatter *f) const final { - f->dump_int("total_priority", total_priority); - f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue); - f->dump_int("min_cost", min_cost); - f->open_array_section("high_queues"); - for (typename SubQueues::const_iterator p = high_queue.begin(); - p != high_queue.end(); - ++p) { - f->open_object_section("subqueue"); - f->dump_int("priority", p->first); - p->second.dump(f); - f->close_section(); - } - f->close_section(); - f->open_array_section("queues"); - for (typename SubQueues::const_iterator p = queue.begin(); - p != queue.end(); - ++p) { - f->open_object_section("subqueue"); - f->dump_int("priority", p->first); - p->second.dump(f); - f->close_section(); - } - f->close_section(); - } -}; - -#endif