X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcommon%2FmClockPriorityQueue.h;fp=src%2Fceph%2Fsrc%2Fcommon%2FmClockPriorityQueue.h;h=7f7b7c35b5c8c1fdebdfd3417ad21a71b0c331b0;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/common/mClockPriorityQueue.h b/src/ceph/src/common/mClockPriorityQueue.h new file mode 100644 index 0000000..7f7b7c3 --- /dev/null +++ b/src/ceph/src/common/mClockPriorityQueue.h @@ -0,0 +1,361 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + + +#include +#include +#include +#include + +#include "common/Formatter.h" +#include "common/OpQueue.h" + +#include "dmclock/src/dmclock_server.h" + +// the following is done to unclobber _ASSERT_H so it returns to the +// way ceph likes it +#include "include/assert.h" + + +namespace ceph { + + namespace dmc = crimson::dmclock; + + template + class mClockQueue : public OpQueue { + + using priority_t = unsigned; + using cost_t = unsigned; + + typedef std::list > ListPairs; + + static unsigned filter_list_pairs(ListPairs *l, + std::function f, + std::list* out = nullptr) { + unsigned ret = 0; + for (typename ListPairs::iterator i = l->end(); + i != l->begin(); + /* no inc */ + ) { + auto next = i; + --next; + if (f(next->second)) { + ++ret; + if (out) out->push_back(next->second); + l->erase(next); + } else { + i = next; + } + } + return ret; + } + + struct SubQueue { + private: + typedef std::map Classes; + // client-class to ordered queue + Classes q; + + unsigned tokens, max_tokens; + int64_t size; + + typename Classes::iterator cur; + + public: + + SubQueue(const SubQueue &other) + : q(other.q), + tokens(other.tokens), + max_tokens(other.max_tokens), + size(other.size), + cur(q.begin()) {} + + SubQueue() + : tokens(0), + max_tokens(0), + size(0), cur(q.begin()) {} + + void set_max_tokens(unsigned mt) { + max_tokens = mt; + } + + unsigned get_max_tokens() const { + return max_tokens; + } + + unsigned num_tokens() const { + return tokens; + } + + void put_tokens(unsigned t) { + tokens += t; + if (tokens > max_tokens) { + tokens = max_tokens; + } + } + + void take_tokens(unsigned t) { + if (tokens > t) { + tokens -= t; + } else { + tokens = 0; + } + } + + void enqueue(K cl, cost_t cost, T item) { + q[cl].push_back(std::make_pair(cost, item)); + if (cur == q.end()) + cur = q.begin(); + size++; + } + + void enqueue_front(K cl, cost_t cost, T item) { + q[cl].push_front(std::make_pair(cost, item)); + if (cur == q.end()) + cur = q.begin(); + size++; + } + + std::pair front() const { + assert(!(q.empty())); + assert(cur != q.end()); + return cur->second.front(); + } + + void pop_front() { + assert(!(q.empty())); + assert(cur != q.end()); + cur->second.pop_front(); + if (cur->second.empty()) { + auto i = cur; + ++cur; + q.erase(i); + } else { + ++cur; + } + if (cur == q.end()) { + cur = q.begin(); + } + size--; + } + + unsigned length() const { + assert(size >= 0); + return (unsigned)size; + } + + bool empty() const { + return q.empty(); + } + + void remove_by_filter(std::function f) { + for (typename Classes::iterator i = q.begin(); + i != q.end(); + /* no-inc */) { + size -= filter_list_pairs(&(i->second), f); + if (i->second.empty()) { + if (cur == i) { + ++cur; + } + i = q.erase(i); + } else { + ++i; + } + } + if (cur == q.end()) cur = q.begin(); + } + + void remove_by_class(K k, std::list *out) { + typename Classes::iterator i = q.find(k); + if (i == q.end()) { + return; + } + size -= i->second.size(); + if (i == cur) { + ++cur; + } + if (out) { + for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) { + out->push_front(j->second); + } + } + q.erase(i); + if (cur == q.end()) cur = q.begin(); + } + + void dump(ceph::Formatter *f) const { + f->dump_int("size", size); + f->dump_int("num_keys", q.size()); + } + }; + + using SubQueues = std::map; + + SubQueues high_queue; + + dmc::PullPriorityQueue queue; + + // when enqueue_front is called, rather than try to re-calc tags + // to put in mClock priority queue, we'll just keep a separate + // list from which we dequeue items first, and only when it's + // empty do we use queue. + std::list> queue_front; + + public: + + mClockQueue( + const typename dmc::PullPriorityQueue::ClientInfoFunc& info_func) : + queue(info_func, true) + { + // empty + } + + unsigned length() const override final { + unsigned total = 0; + total += queue_front.size(); + total += queue.request_count(); + for (auto i = high_queue.cbegin(); i != high_queue.cend(); ++i) { + assert(i->second.length()); + total += i->second.length(); + } + return total; + } + + // be sure to do things in reverse priority order and push_front + // to the list so items end up on list in front-to-back priority + // order + void remove_by_filter(std::function filter_accum) { + queue.remove_by_req_filter(filter_accum, true); + + for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) { + if (filter_accum(i->second)) { + i = decltype(i){ queue_front.erase(std::next(i).base()) }; + } else { + ++i; + } + } + + for (typename SubQueues::iterator i = high_queue.begin(); + i != high_queue.end(); + /* no-inc */ ) { + i->second.remove_by_filter(filter_accum); + if (i->second.empty()) { + i = high_queue.erase(i); + } else { + ++i; + } + } + } + + void remove_by_class(K k, std::list *out = nullptr) override final { + if (out) { + queue.remove_by_client(k, + true, + [&out] (const T& t) { out->push_front(t); }); + } else { + queue.remove_by_client(k, true); + } + + for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) { + if (k == i->first) { + if (nullptr != out) out->push_front(i->second); + i = decltype(i){ queue_front.erase(std::next(i).base()) }; + } else { + ++i; + } + } + + for (auto i = high_queue.begin(); i != high_queue.end(); /* no-inc */) { + i->second.remove_by_class(k, out); + if (i->second.empty()) { + i = high_queue.erase(i); + } else { + ++i; + } + } + } + + void enqueue_strict(K cl, unsigned priority, T item) override final { + high_queue[priority].enqueue(cl, 0, item); + } + + void enqueue_strict_front(K cl, unsigned priority, T item) override final { + high_queue[priority].enqueue_front(cl, 0, item); + } + + void enqueue(K cl, unsigned priority, unsigned cost, T item) override final { + // priority is ignored + queue.add_request(std::move(item), cl, cost); + } + + void enqueue_front(K cl, + unsigned priority, + unsigned cost, + T item) override final { + queue_front.emplace_front(std::pair(cl, item)); + } + + bool empty() const override final { + return queue.empty() && high_queue.empty() && queue_front.empty(); + } + + T dequeue() override final { + assert(!empty()); + + if (!(high_queue.empty())) { + T ret = high_queue.rbegin()->second.front().second; + high_queue.rbegin()->second.pop_front(); + if (high_queue.rbegin()->second.empty()) { + high_queue.erase(high_queue.rbegin()->first); + } + return ret; + } + + if (!queue_front.empty()) { + T ret = queue_front.front().second; + queue_front.pop_front(); + return ret; + } + + auto pr = queue.pull_request(); + assert(pr.is_retn()); + auto& retn = pr.get_retn(); + return *(retn.request); + } + + void dump(ceph::Formatter *f) const override final { + f->open_array_section("high_queues"); + for (typename SubQueues::const_iterator p = high_queue.begin(); + p != high_queue.end(); + ++p) { + f->open_object_section("subqueue"); + f->dump_int("priority", p->first); + p->second.dump(f); + f->close_section(); + } + f->close_section(); + + f->open_object_section("queue_front"); + f->dump_int("size", queue_front.size()); + f->close_section(); + + f->open_object_section("queue"); + f->dump_int("size", queue.request_count()); + f->close_section(); + } // dump + }; + +} // namespace ceph