initial code repo
[stor4nfv.git] / src / ceph / src / common / WeightedPriorityQueue.h
diff --git a/src/ceph/src/common/WeightedPriorityQueue.h b/src/ceph/src/common/WeightedPriorityQueue.h
new file mode 100644 (file)
index 0000000..64ac120
--- /dev/null
@@ -0,0 +1,331 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef WP_QUEUE_H
+#define WP_QUEUE_H
+
+#include "OpQueue.h"
+
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/rbtree.hpp>
+#include <boost/intrusive/avl_set.hpp>
+
+namespace bi = boost::intrusive;
+
+template <typename T, typename S>
+class MapKey
+{
+  public:
+  bool operator()(const S i, const T &k) const
+  {
+    return i < k.key;
+  }
+  bool operator()(const T &k, const S i) const
+  {
+    return k.key < i;
+  }
+};
+
+template <typename T>
+class DelItem
+{
+  public:
+  void operator()(T* delete_this)
+    { delete delete_this; }
+};
+
+template <typename T, typename K>
+class WeightedPriorityQueue :  public OpQueue <T, K>
+{
+  private:
+    class ListPair : public bi::list_base_hook<>
+    {
+      public:
+        unsigned cost;
+        T item;
+        ListPair(unsigned c, T& i) :
+          cost(c),
+          item(i)
+          {}
+    };
+    class Klass : public bi::set_base_hook<>
+    {
+      typedef bi::list<ListPair> ListPairs;
+      typedef typename ListPairs::iterator Lit;
+      public:
+        K key;         // klass
+        ListPairs lp;
+        Klass(K& k) :
+          key(k)
+          {}
+      friend bool operator< (const Klass &a, const Klass &b)
+        { return a.key < b.key; }
+      friend bool operator> (const Klass &a, const Klass &b)
+        { return a.key > b.key; }
+      friend bool operator== (const Klass &a, const Klass &b)
+        { return a.key == b.key; }
+      void insert(unsigned cost, T& item, bool front) {
+        if (front) {
+          lp.push_front(*new ListPair(cost, item));
+        } else {
+          lp.push_back(*new ListPair(cost, item));
+        }
+      }
+      //Get the cost of the next item to dequeue
+      unsigned get_cost() const {
+        assert(!empty());
+        return lp.begin()->cost;
+      }
+      T pop() {
+       assert(!lp.empty());
+       T ret = lp.begin()->item;
+        lp.erase_and_dispose(lp.begin(), DelItem<ListPair>());
+        return ret;
+      }
+      bool empty() const {
+        return lp.empty();
+      }
+      unsigned get_size() const {
+       return lp.size();
+      }
+      unsigned filter_class(std::list<T>* out) {
+        unsigned count = 0;
+        for (Lit i = --lp.end();; --i) {
+          if (out) {
+            out->push_front(i->item);
+          }
+          i = lp.erase_and_dispose(i, DelItem<ListPair>());
+          ++count;
+          if (i == lp.begin()) {
+            break;
+          }
+        }
+        return count;
+      }
+    };
+    class SubQueue : public bi::set_base_hook<>
+    {
+      typedef bi::rbtree<Klass> Klasses;
+      typedef typename Klasses::iterator Kit;
+      void check_end() {
+        if (next == klasses.end()) {
+          next = klasses.begin();
+        }
+      }
+      public:
+       unsigned key;   // priority
+        Klasses klasses;
+       Kit next;
+       SubQueue(unsigned& p) :
+         key(p),
+         next(klasses.begin())
+         {}
+      friend bool operator< (const SubQueue &a, const SubQueue &b)
+        { return a.key < b.key; }
+      friend bool operator> (const SubQueue &a, const SubQueue &b)
+        { return a.key > b.key; }
+      friend bool operator== (const SubQueue &a, const SubQueue &b)
+        { return a.key == b.key; }
+      bool empty() const {
+        return klasses.empty();
+      }
+      void insert(K cl, unsigned cost, T& item, bool front = false) {
+        typename Klasses::insert_commit_data insert_data;
+       std::pair<Kit, bool> ret =
+          klasses.insert_unique_check(cl, MapKey<Klass, K>(), insert_data);
+       if (ret.second) {
+         ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data);
+          check_end();
+       }
+       ret.first->insert(cost, item, front);
+      }
+      unsigned get_cost() const {
+        assert(!empty());
+        return next->get_cost();
+      }
+      T pop() {
+        T ret = next->pop();
+        if (next->empty()) {
+          next = klasses.erase_and_dispose(next, DelItem<Klass>());
+        } else {
+         ++next;
+       }
+        check_end();
+       return ret;
+      }
+      unsigned filter_class(K& cl, std::list<T>* out) {
+       unsigned count = 0;
+        Kit i = klasses.find(cl, MapKey<Klass, K>());
+        if (i != klasses.end()) {
+          count = i->filter_class(out);
+         Kit tmp = klasses.erase_and_dispose(i, DelItem<Klass>());
+         if (next == i) {
+            next = tmp;
+          }
+          check_end();
+        }
+       return count;
+      }
+      void dump(ceph::Formatter *f) const {
+        f->dump_int("num_keys", next->get_size());
+        if (!empty()) {
+          f->dump_int("first_item_cost", next->get_cost());
+        }
+      }
+    };
+    class Queue {
+      typedef bi::rbtree<SubQueue> SubQueues;
+      typedef typename SubQueues::iterator Sit;
+      SubQueues queues;
+      unsigned total_prio;
+      unsigned max_cost;
+      public:
+       unsigned size;
+       Queue() :
+         total_prio(0),
+         max_cost(0),
+         size(0)
+         {}
+       bool empty() const {
+         return !size;
+       }
+       void insert(unsigned p, K cl, unsigned cost, T& item, bool front = false) {
+         typename SubQueues::insert_commit_data insert_data;
+         std::pair<typename SubQueues::iterator, bool> ret =
+           queues.insert_unique_check(p, MapKey<SubQueue, unsigned>(), insert_data);
+         if (ret.second) {
+           ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data);
+           total_prio += p;
+         }
+         ret.first->insert(cl, cost, item, front);
+         if (cost > max_cost) {
+           max_cost = cost;
+         }
+         ++size;
+       }
+       T pop(bool strict = false) {
+         --size;
+         Sit i = --queues.end();
+         if (strict) {
+           T ret = i->pop();
+           if (i->empty()) {
+             queues.erase_and_dispose(i, DelItem<SubQueue>());
+           }
+           return ret;
+         }
+         if (queues.size() > 1) {
+           while (true) {
+             // Pick a new priority out of the total priority.
+             unsigned prio = rand() % total_prio + 1;
+             unsigned tp = total_prio - i->key;
+             // Find the priority coresponding to the picked number.
+             // Subtract high priorities to low priorities until the picked number
+             // is more than the total and try to dequeue that priority.
+             // Reverse the direction from previous implementation because there is a higher
+             // chance of dequeuing a high priority op so spend less time spinning.
+             while (prio <= tp) {
+               --i;
+               tp -= i->key;
+             }
+             // Flip a coin to see if this priority gets to run based on cost.
+             // The next op's cost is multiplied by .9 and subtracted from the
+             // max cost seen. Ops with lower costs will have a larger value
+             // and allow them to be selected easier than ops with high costs.
+             if (max_cost == 0 || rand() % max_cost <=
+                 (max_cost - ((i->get_cost() * 9) / 10))) {
+               break;
+             }
+             i = --queues.end();
+           }
+         }
+         T ret = i->pop();
+         if (i->empty()) {
+           total_prio -= i->key;
+           queues.erase_and_dispose(i, DelItem<SubQueue>());
+         }
+         return ret;
+       }
+       void filter_class(K& cl, std::list<T>* out) {
+         for (Sit i = queues.begin(); i != queues.end();) {
+           size -= i->filter_class(cl, out);
+           if (i->empty()) {
+             total_prio -= i->key;
+             i = queues.erase_and_dispose(i, DelItem<SubQueue>());
+           } else {
+             ++i;
+           }
+         }
+       }
+       void dump(ceph::Formatter *f) const {
+         for (typename SubQueues::const_iterator i = queues.begin();
+               i != queues.end(); ++i) {
+           f->dump_int("total_priority", total_prio);
+           f->dump_int("max_cost", max_cost);
+           f->open_object_section("subqueue");
+           f->dump_int("priority", i->key);
+           i->dump(f);
+           f->close_section();
+         }
+       }
+    };
+
+    Queue strict;
+    Queue normal;
+  public:
+    WeightedPriorityQueue(unsigned max_per, unsigned min_c) :
+      strict(),
+      normal()
+      {
+       std::srand(time(0));
+      }
+    unsigned length() const final {
+      return strict.size + normal.size;
+    }
+    void remove_by_class(K cl, std::list<T>* removed = 0) final {
+      strict.filter_class(cl, removed);
+      normal.filter_class(cl, removed);
+    }
+    bool empty() const final {
+      return !(strict.size + normal.size);
+    }
+    void enqueue_strict(K cl, unsigned p, T item) final {
+      strict.insert(p, cl, 0, item);
+    }
+    void enqueue_strict_front(K cl, unsigned p, T item) final {
+      strict.insert(p, cl, 0, item, true);
+    }
+    void enqueue(K cl, unsigned p, unsigned cost, T item) final {
+      normal.insert(p, cl, cost, item);
+    }
+    void enqueue_front(K cl, unsigned p, unsigned cost, T item) final {
+      normal.insert(p, cl, cost, item, true);
+    }
+    T dequeue() override {
+      assert(strict.size + normal.size > 0);
+      if (!strict.empty()) {
+       return strict.pop(true);
+      }
+      return normal.pop();
+    }
+    void dump(ceph::Formatter *f) const override {
+      f->open_array_section("high_queues");
+      strict.dump(f);
+      f->close_section();
+      f->open_array_section("queues");
+      normal.dump(f);
+      f->close_section();
+    }
+};
+
+#endif