Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / DispatchQueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "msg/Message.h"
16 #include "DispatchQueue.h"
17 #include "Messenger.h"
18 #include "common/ceph_context.h"
19
20 #define dout_subsys ceph_subsys_ms
21 #include "common/debug.h"
22
23
24 /*******************
25  * DispatchQueue
26  */
27
28 #undef dout_prefix
29 #define dout_prefix *_dout << "-- " << msgr->get_myaddr() << " "
30
31 double DispatchQueue::get_max_age(utime_t now) const {
32   Mutex::Locker l(lock);
33   if (marrival.empty())
34     return 0;
35   else
36     return (now - marrival.begin()->first);
37 }
38
39 uint64_t DispatchQueue::pre_dispatch(Message *m)
40 {
41   ldout(cct,1) << "<== " << m->get_source_inst()
42                << " " << m->get_seq()
43                << " ==== " << *m
44                << " ==== " << m->get_payload().length()
45                << "+" << m->get_middle().length()
46                << "+" << m->get_data().length()
47                << " (" << m->get_footer().front_crc << " "
48                << m->get_footer().middle_crc
49                << " " << m->get_footer().data_crc << ")"
50                << " " << m << " con " << m->get_connection()
51                << dendl;
52   uint64_t msize = m->get_dispatch_throttle_size();
53   m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
54   return msize;
55 }
56
57 void DispatchQueue::post_dispatch(Message *m, uint64_t msize)
58 {
59   dispatch_throttle_release(msize);
60   ldout(cct,20) << "done calling dispatch on " << m << dendl;
61 }
62
63 bool DispatchQueue::can_fast_dispatch(const Message *m) const
64 {
65   return msgr->ms_can_fast_dispatch(m);
66 }
67
68 void DispatchQueue::fast_dispatch(Message *m)
69 {
70   uint64_t msize = pre_dispatch(m);
71   msgr->ms_fast_dispatch(m);
72   post_dispatch(m, msize);
73 }
74
75 void DispatchQueue::fast_preprocess(Message *m)
76 {
77   msgr->ms_fast_preprocess(m);
78 }
79
80 void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
81 {
82
83   Mutex::Locker l(lock);
84   ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
85   add_arrival(m);
86   if (priority >= CEPH_MSG_PRIO_LOW) {
87     mqueue.enqueue_strict(
88         id, priority, QueueItem(m));
89   } else {
90     mqueue.enqueue(
91         id, priority, m->get_cost(), QueueItem(m));
92   }
93   cond.Signal();
94 }
95
96 void DispatchQueue::local_delivery(Message *m, int priority)
97 {
98   m->set_recv_stamp(ceph_clock_now());
99   Mutex::Locker l(local_delivery_lock);
100   if (local_messages.empty())
101     local_delivery_cond.Signal();
102   local_messages.push_back(make_pair(m, priority));
103   return;
104 }
105
106 void DispatchQueue::run_local_delivery()
107 {
108   local_delivery_lock.Lock();
109   while (true) {
110     if (stop_local_delivery)
111       break;
112     if (local_messages.empty()) {
113       local_delivery_cond.Wait(local_delivery_lock);
114       continue;
115     }
116     pair<Message *, int> mp = local_messages.front();
117     local_messages.pop_front();
118     local_delivery_lock.Unlock();
119     Message *m = mp.first;
120     int priority = mp.second;
121     fast_preprocess(m);
122     if (can_fast_dispatch(m)) {
123       fast_dispatch(m);
124     } else {
125       enqueue(m, priority, 0);
126     }
127     local_delivery_lock.Lock();
128   }
129   local_delivery_lock.Unlock();
130 }
131
132 void DispatchQueue::dispatch_throttle_release(uint64_t msize)
133 {
134   if (msize) {
135     ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler "
136             << dispatch_throttler.get_current() << "/"
137             << dispatch_throttler.get_max() << dendl;
138     dispatch_throttler.put(msize);
139   }
140 }
141
142 /*
143  * This function delivers incoming messages to the Messenger.
144  * Connections with messages are kept in queues; when beginning a message
145  * delivery the highest-priority queue is selected, the connection from the
146  * front of the queue is removed, and its message read. If the connection
147  * has remaining messages at that priority level, it is re-placed on to the
148  * end of the queue. If the queue is empty; it's removed.
149  * The message is then delivered and the process starts again.
150  */
151 void DispatchQueue::entry()
152 {
153   lock.Lock();
154   while (true) {
155     while (!mqueue.empty()) {
156       QueueItem qitem = mqueue.dequeue();
157       if (!qitem.is_code())
158         remove_arrival(qitem.get_message());
159       lock.Unlock();
160
161       if (qitem.is_code()) {
162         if (cct->_conf->ms_inject_internal_delays &&
163             cct->_conf->ms_inject_delay_probability &&
164             (rand() % 10000)/10000.0 < cct->_conf->ms_inject_delay_probability) {
165           utime_t t;
166           t.set_from_double(cct->_conf->ms_inject_internal_delays);
167           ldout(cct, 1) << "DispatchQueue::entry  inject delay of " << t
168                         << dendl;
169           t.sleep();
170         }
171         switch (qitem.get_code()) {
172         case D_BAD_REMOTE_RESET:
173           msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
174           break;
175         case D_CONNECT:
176           msgr->ms_deliver_handle_connect(qitem.get_connection());
177           break;
178         case D_ACCEPT:
179           msgr->ms_deliver_handle_accept(qitem.get_connection());
180           break;
181         case D_BAD_RESET:
182           msgr->ms_deliver_handle_reset(qitem.get_connection());
183           break;
184         case D_CONN_REFUSED:
185           msgr->ms_deliver_handle_refused(qitem.get_connection());
186           break;
187         default:
188           ceph_abort();
189         }
190       } else {
191         Message *m = qitem.get_message();
192         if (stop) {
193           ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
194           m->put();
195         } else {
196           uint64_t msize = pre_dispatch(m);
197           msgr->ms_deliver_dispatch(m);
198           post_dispatch(m, msize);
199         }
200       }
201
202       lock.Lock();
203     }
204     if (stop)
205       break;
206
207     // wait for something to be put on queue
208     cond.Wait(lock);
209   }
210   lock.Unlock();
211 }
212
213 void DispatchQueue::discard_queue(uint64_t id) {
214   Mutex::Locker l(lock);
215   list<QueueItem> removed;
216   mqueue.remove_by_class(id, &removed);
217   for (list<QueueItem>::iterator i = removed.begin();
218        i != removed.end();
219        ++i) {
220     assert(!(i->is_code())); // We don't discard id 0, ever!
221     Message *m = i->get_message();
222     remove_arrival(m);
223     dispatch_throttle_release(m->get_dispatch_throttle_size());
224     m->put();
225   }
226 }
227
228 void DispatchQueue::start()
229 {
230   assert(!stop);
231   assert(!dispatch_thread.is_started());
232   dispatch_thread.create("ms_dispatch");
233   local_delivery_thread.create("ms_local");
234 }
235
236 void DispatchQueue::wait()
237 {
238   local_delivery_thread.join();
239   dispatch_thread.join();
240 }
241
242 void DispatchQueue::discard_local()
243 {
244   for (list<pair<Message *, int> >::iterator p = local_messages.begin();
245        p != local_messages.end();
246        ++p) {
247     ldout(cct,20) << __func__ << " " << p->first << dendl;
248     p->first->put();
249   }
250   local_messages.clear();
251 }
252
253 void DispatchQueue::shutdown()
254 {
255   // stop my local delivery thread
256   local_delivery_lock.Lock();
257   stop_local_delivery = true;
258   local_delivery_cond.Signal();
259   local_delivery_lock.Unlock();
260
261   // stop my dispatch thread
262   lock.Lock();
263   stop = true;
264   cond.Signal();
265   lock.Unlock();
266 }