Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / DispatchQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_DISPATCHQUEUE_H
16 #define CEPH_DISPATCHQUEUE_H
17
18 #include <atomic>
19 #include <map>
20 #include <boost/intrusive_ptr.hpp>
21 #include "include/assert.h"
22 #include "include/xlist.h"
23 #include "common/Mutex.h"
24 #include "common/Cond.h"
25 #include "common/Thread.h"
26 #include "common/PrioritizedQueue.h"
27
28 class CephContext;
29 class Messenger;
30 class Message;
31 struct Connection;
32
33 /**
34  * The DispatchQueue contains all the connections which have Messages
35  * they want to be dispatched, carefully organized by Message priority
36  * and permitted to deliver in a round-robin fashion.
37  * See Messenger::dispatch_entry for details.
38  */
39 class DispatchQueue {
40   class QueueItem {
41     int type;
42     ConnectionRef con;
43     MessageRef m;
44   public:
45     explicit QueueItem(Message *m) : type(-1), con(0), m(m) {}
46     QueueItem(int type, Connection *con) : type(type), con(con), m(0) {}
47     bool is_code() const {
48       return type != -1;
49     }
50     int get_code () const {
51       assert(is_code());
52       return type;
53     }
54     Message *get_message() {
55       assert(!is_code());
56       return m.get();
57     }
58     Connection *get_connection() {
59       assert(is_code());
60       return con.get();
61     }
62   };
63     
64   CephContext *cct;
65   Messenger *msgr;
66   mutable Mutex lock;
67   Cond cond;
68
69   PrioritizedQueue<QueueItem, uint64_t> mqueue;
70
71   set<pair<double, Message*> > marrival;
72   map<Message *, set<pair<double, Message*> >::iterator> marrival_map;
73   void add_arrival(Message *m) {
74     marrival_map.insert(
75       make_pair(
76         m,
77         marrival.insert(make_pair(m->get_recv_stamp(), m)).first
78         )
79       );
80   }
81   void remove_arrival(Message *m) {
82     map<Message *, set<pair<double, Message*> >::iterator>::iterator i =
83       marrival_map.find(m);
84     assert(i != marrival_map.end());
85     marrival.erase(i->second);
86     marrival_map.erase(i);
87   }
88
89   std::atomic<uint64_t> next_id;
90     
91   enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES };
92
93   /**
94    * The DispatchThread runs dispatch_entry to empty out the dispatch_queue.
95    */
96   class DispatchThread : public Thread {
97     DispatchQueue *dq;
98   public:
99     explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
100     void *entry() override {
101       dq->entry();
102       return 0;
103     }
104   } dispatch_thread;
105
106   Mutex local_delivery_lock;
107   Cond local_delivery_cond;
108   bool stop_local_delivery;
109   list<pair<Message *, int> > local_messages;
110   class LocalDeliveryThread : public Thread {
111     DispatchQueue *dq;
112   public:
113     explicit LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {}
114     void *entry() override {
115       dq->run_local_delivery();
116       return 0;
117     }
118   } local_delivery_thread;
119
120   uint64_t pre_dispatch(Message *m);
121   void post_dispatch(Message *m, uint64_t msize);
122
123  public:
124
125   /// Throttle preventing us from building up a big backlog waiting for dispatch
126   Throttle dispatch_throttler;
127
128   bool stop;
129   void local_delivery(Message *m, int priority);
130   void run_local_delivery();
131
132   double get_max_age(utime_t now) const;
133
134   int get_queue_len() const {
135     Mutex::Locker l(lock);
136     return mqueue.length();
137   }
138
139   /**
140    * Release memory accounting back to the dispatch throttler.
141    *
142    * @param msize The amount of memory to release.
143    */
144   void dispatch_throttle_release(uint64_t msize);
145
146   void queue_connect(Connection *con) {
147     Mutex::Locker l(lock);
148     if (stop)
149       return;
150     mqueue.enqueue_strict(
151       0,
152       CEPH_MSG_PRIO_HIGHEST,
153       QueueItem(D_CONNECT, con));
154     cond.Signal();
155   }
156   void queue_accept(Connection *con) {
157     Mutex::Locker l(lock);
158     if (stop)
159       return;
160     mqueue.enqueue_strict(
161       0,
162       CEPH_MSG_PRIO_HIGHEST,
163       QueueItem(D_ACCEPT, con));
164     cond.Signal();
165   }
166   void queue_remote_reset(Connection *con) {
167     Mutex::Locker l(lock);
168     if (stop)
169       return;
170     mqueue.enqueue_strict(
171       0,
172       CEPH_MSG_PRIO_HIGHEST,
173       QueueItem(D_BAD_REMOTE_RESET, con));
174     cond.Signal();
175   }
176   void queue_reset(Connection *con) {
177     Mutex::Locker l(lock);
178     if (stop)
179       return;
180     mqueue.enqueue_strict(
181       0,
182       CEPH_MSG_PRIO_HIGHEST,
183       QueueItem(D_BAD_RESET, con));
184     cond.Signal();
185   }
186   void queue_refused(Connection *con) {
187     Mutex::Locker l(lock);
188     if (stop)
189       return;
190     mqueue.enqueue_strict(
191       0,
192       CEPH_MSG_PRIO_HIGHEST,
193       QueueItem(D_CONN_REFUSED, con));
194     cond.Signal();
195   }
196
197   bool can_fast_dispatch(const Message *m) const;
198   void fast_dispatch(Message *m);
199   void fast_preprocess(Message *m);
200   void enqueue(Message *m, int priority, uint64_t id);
201   void discard_queue(uint64_t id);
202   void discard_local();
203   uint64_t get_id() {
204     return next_id++;
205   }
206   void start();
207   void entry();
208   void wait();
209   void shutdown();
210   bool is_started() const {return dispatch_thread.is_started();}
211
212   DispatchQueue(CephContext *cct, Messenger *msgr, string &name)
213     : cct(cct), msgr(msgr),
214       lock("Messenger::DispatchQueue::lock" + name),
215       mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
216              cct->_conf->ms_pq_min_cost),
217       next_id(1),
218       dispatch_thread(this),
219       local_delivery_lock("Messenger::DispatchQueue::local_delivery_lock" + name),
220       stop_local_delivery(false),
221       local_delivery_thread(this),
222       dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name,
223                          cct->_conf->ms_dispatch_throttle_bytes),
224       stop(false)
225     {}
226   ~DispatchQueue() {
227     assert(mqueue.empty());
228     assert(marrival.empty());
229     assert(local_messages.empty());
230   }
231 };
232
233 #endif