X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2FDispatchQueue.h;fp=src%2Fceph%2Fsrc%2Fmsg%2FDispatchQueue.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=55698819361da7489f5f1b1a2a5abe8a6b551a1c;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/DispatchQueue.h b/src/ceph/src/msg/DispatchQueue.h deleted file mode 100644 index 5569881..0000000 --- a/src/ceph/src/msg/DispatchQueue.h +++ /dev/null @@ -1,233 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_DISPATCHQUEUE_H -#define CEPH_DISPATCHQUEUE_H - -#include -#include -#include -#include "include/assert.h" -#include "include/xlist.h" -#include "common/Mutex.h" -#include "common/Cond.h" -#include "common/Thread.h" -#include "common/PrioritizedQueue.h" - -class CephContext; -class Messenger; -class Message; -struct Connection; - -/** - * The DispatchQueue contains all the connections which have Messages - * they want to be dispatched, carefully organized by Message priority - * and permitted to deliver in a round-robin fashion. - * See Messenger::dispatch_entry for details. - */ -class DispatchQueue { - class QueueItem { - int type; - ConnectionRef con; - MessageRef m; - public: - explicit QueueItem(Message *m) : type(-1), con(0), m(m) {} - QueueItem(int type, Connection *con) : type(type), con(con), m(0) {} - bool is_code() const { - return type != -1; - } - int get_code () const { - assert(is_code()); - return type; - } - Message *get_message() { - assert(!is_code()); - return m.get(); - } - Connection *get_connection() { - assert(is_code()); - return con.get(); - } - }; - - CephContext *cct; - Messenger *msgr; - mutable Mutex lock; - Cond cond; - - PrioritizedQueue mqueue; - - set > marrival; - map >::iterator> marrival_map; - void add_arrival(Message *m) { - marrival_map.insert( - make_pair( - m, - marrival.insert(make_pair(m->get_recv_stamp(), m)).first - ) - ); - } - void remove_arrival(Message *m) { - map >::iterator>::iterator i = - marrival_map.find(m); - assert(i != marrival_map.end()); - marrival.erase(i->second); - marrival_map.erase(i); - } - - std::atomic next_id; - - enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES }; - - /** - * The DispatchThread runs dispatch_entry to empty out the dispatch_queue. - */ - class DispatchThread : public Thread { - DispatchQueue *dq; - public: - explicit DispatchThread(DispatchQueue *dq) : dq(dq) {} - void *entry() override { - dq->entry(); - return 0; - } - } dispatch_thread; - - Mutex local_delivery_lock; - Cond local_delivery_cond; - bool stop_local_delivery; - list > local_messages; - class LocalDeliveryThread : public Thread { - DispatchQueue *dq; - public: - explicit LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {} - void *entry() override { - dq->run_local_delivery(); - return 0; - } - } local_delivery_thread; - - uint64_t pre_dispatch(Message *m); - void post_dispatch(Message *m, uint64_t msize); - - public: - - /// Throttle preventing us from building up a big backlog waiting for dispatch - Throttle dispatch_throttler; - - bool stop; - void local_delivery(Message *m, int priority); - void run_local_delivery(); - - double get_max_age(utime_t now) const; - - int get_queue_len() const { - Mutex::Locker l(lock); - return mqueue.length(); - } - - /** - * Release memory accounting back to the dispatch throttler. - * - * @param msize The amount of memory to release. - */ - void dispatch_throttle_release(uint64_t msize); - - void queue_connect(Connection *con) { - Mutex::Locker l(lock); - if (stop) - return; - mqueue.enqueue_strict( - 0, - CEPH_MSG_PRIO_HIGHEST, - QueueItem(D_CONNECT, con)); - cond.Signal(); - } - void queue_accept(Connection *con) { - Mutex::Locker l(lock); - if (stop) - return; - mqueue.enqueue_strict( - 0, - CEPH_MSG_PRIO_HIGHEST, - QueueItem(D_ACCEPT, con)); - cond.Signal(); - } - void queue_remote_reset(Connection *con) { - Mutex::Locker l(lock); - if (stop) - return; - mqueue.enqueue_strict( - 0, - CEPH_MSG_PRIO_HIGHEST, - QueueItem(D_BAD_REMOTE_RESET, con)); - cond.Signal(); - } - void queue_reset(Connection *con) { - Mutex::Locker l(lock); - if (stop) - return; - mqueue.enqueue_strict( - 0, - CEPH_MSG_PRIO_HIGHEST, - QueueItem(D_BAD_RESET, con)); - cond.Signal(); - } - void queue_refused(Connection *con) { - Mutex::Locker l(lock); - if (stop) - return; - mqueue.enqueue_strict( - 0, - CEPH_MSG_PRIO_HIGHEST, - QueueItem(D_CONN_REFUSED, con)); - cond.Signal(); - } - - bool can_fast_dispatch(const Message *m) const; - void fast_dispatch(Message *m); - void fast_preprocess(Message *m); - void enqueue(Message *m, int priority, uint64_t id); - void discard_queue(uint64_t id); - void discard_local(); - uint64_t get_id() { - return next_id++; - } - void start(); - void entry(); - void wait(); - void shutdown(); - bool is_started() const {return dispatch_thread.is_started();} - - DispatchQueue(CephContext *cct, Messenger *msgr, string &name) - : cct(cct), msgr(msgr), - lock("Messenger::DispatchQueue::lock" + name), - mqueue(cct->_conf->ms_pq_max_tokens_per_priority, - cct->_conf->ms_pq_min_cost), - next_id(1), - dispatch_thread(this), - local_delivery_lock("Messenger::DispatchQueue::local_delivery_lock" + name), - stop_local_delivery(false), - local_delivery_thread(this), - dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name, - cct->_conf->ms_dispatch_throttle_bytes), - stop(false) - {} - ~DispatchQueue() { - assert(mqueue.empty()); - assert(marrival.empty()); - assert(local_messages.empty()); - } -}; - -#endif