1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "msg/Message.h"
16 #include "DispatchQueue.h"
17 #include "Messenger.h"
18 #include "common/ceph_context.h"
20 #define dout_subsys ceph_subsys_ms
21 #include "common/debug.h"
29 #define dout_prefix *_dout << "-- " << msgr->get_myaddr() << " "
// Age, relative to `now`, of the entry at the front of marrival.
//
// Acquires `lock` through a Mutex::Locker and returns
// `now - marrival.begin()->first`.
//
// @param now  reference time the front entry's key is subtracted from
// @return     the difference, converted to double
//
// NOTE(review): presumably marrival is ordered by arrival time, which would
// make begin() the oldest queued message -- confirm against the header.
// NOTE(review): begin() may only be dereferenced on a non-empty container;
// confirm the empty case is handled before this return is reached.
31 double DispatchQueue::get_max_age(utime_t now) const {
32 Mutex::Locker l(lock);
36 return (now - marrival.begin()->first);
// Bookkeeping performed immediately before a message is handed to a
// dispatcher: logs the message and takes over its dispatch-throttle size.
//
// @param m  message about to be dispatched
// @return   uint64_t that callers in this file pass on to post_dispatch()
//           NOTE(review): presumably the saved msize -- confirm.
39 uint64_t DispatchQueue::pre_dispatch(Message *m)
// Debug-level-1 trace: source instance, sequence number, the three section
// lengths (payload+middle+data), the three footer CRCs, the message pointer
// and its connection.
41 ldout(cct,1) << "<== " << m->get_source_inst()
42 << " " << m->get_seq()
44 << " ==== " << m->get_payload().length()
45 << "+" << m->get_middle().length()
46 << "+" << m->get_data().length()
47 << " (" << m->get_footer().front_crc << " "
48 << m->get_footer().middle_crc
49 << " " << m->get_footer().data_crc << ")"
50 << " " << m << " con " << m->get_connection()
// Read the throttle size first, then zero it on the message, so the value
// now lives only in the local msize.
52 uint64_t msize = m->get_dispatch_throttle_size();
53 m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
// Bookkeeping performed after a dispatcher has been called for `m`.
//
// Passes `msize` to dispatch_throttle_release() and then logs completion at
// debug level 20.
//
// @param m      the dispatched message; only its pointer value is logged here
// @param msize  throttle size to release (callers in this file pass the value
//               obtained from pre_dispatch())
57 void DispatchQueue::post_dispatch(Message *m, uint64_t msize)
59 dispatch_throttle_release(msize);
60 ldout(cct,20) << "done calling dispatch on " << m << dendl;
// Reports whether `m` is eligible for fast dispatch.
//
// Pure forwarder: the decision is delegated to msgr->ms_can_fast_dispatch().
//
// @param m  message to test
// @return   whatever the messenger answers for this message
63 bool DispatchQueue::can_fast_dispatch(const Message *m) const
65 return msgr->ms_can_fast_dispatch(m);
// Delivers `m` through the messenger's fast-dispatch path on the calling
// thread, without going through mqueue.
//
// The call to msgr->ms_fast_dispatch() is bracketed by pre_dispatch() and
// post_dispatch(); the size returned by the former is handed to the latter.
//
// @param m  message to deliver
68 void DispatchQueue::fast_dispatch(Message *m)
70 uint64_t msize = pre_dispatch(m);
71 msgr->ms_fast_dispatch(m);
72 post_dispatch(m, msize);
// Gives the messenger a chance to preprocess `m`.
//
// Pure forwarder to msgr->ms_fast_preprocess(); no queue state is touched
// in this function.
//
// @param m  message to preprocess
75 void DispatchQueue::fast_preprocess(Message *m)
77 msgr->ms_fast_preprocess(m);
// Places `m` on mqueue for later delivery.
//
// @param m         message to queue (wrapped in a QueueItem)
// @param priority  queueing priority; also selects which mqueue call is used
// @param id        class id passed through to mqueue
//                  (discard_queue() later removes entries by this id)
80 void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
// All queue manipulation below happens with `lock` taken.
83 Mutex::Locker l(lock);
84 ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
// Priorities at or above CEPH_MSG_PRIO_LOW use enqueue_strict(), which takes
// no cost argument.
86 if (priority >= CEPH_MSG_PRIO_LOW) {
87 mqueue.enqueue_strict(
88 id, priority, QueueItem(m));
// Remaining priorities: the same arguments plus the message's cost.
// NOTE(review): presumably the argument list of a non-strict mqueue.enqueue()
// call in the else branch -- confirm.
91 id, priority, m->get_cost(), QueueItem(m));
// Queues a locally addressed message for the local-delivery thread.
//
// @param m         message to deliver; its receive stamp is set to the
//                  current clock value before it is queued
// @param priority  stored alongside the message; run_local_delivery() uses
//                  it if the message has to be enqueued
96 void DispatchQueue::local_delivery(Message *m, int priority)
98 m->set_recv_stamp(ceph_clock_now());
99 Mutex::Locker l(local_delivery_lock);
// Signal only on the empty -> non-empty transition. The signal is issued
// while local_delivery_lock is taken and before the push_back below.
100 if (local_messages.empty())
101 local_delivery_cond.Signal();
102 local_messages.push_back(make_pair(m, priority));
// Body of the local-delivery worker: takes (message, priority) pairs queued
// by local_delivery() and either fast-dispatches or enqueues them.
//
// local_delivery_lock guards local_messages and stop_local_delivery. It is
// released around the actual delivery and re-acquired afterwards.
// NOTE(review): the Lock() after delivery suggests the steps below repeat in
// a loop until stop_local_delivery is observed -- confirm.
106 void DispatchQueue::run_local_delivery()
108 local_delivery_lock.Lock();
// Stop flag is checked with the lock held (shutdown() sets it under the same
// lock and signals local_delivery_cond).
110 if (stop_local_delivery)
// Nothing queued: block on the condition variable, passing the lock to Wait().
112 if (local_messages.empty()) {
113 local_delivery_cond.Wait(local_delivery_lock);
// Detach the front entry while still holding the lock, then drop the lock
// before delivering.
116 pair<Message *, int> mp = local_messages.front();
117 local_messages.pop_front();
118 local_delivery_lock.Unlock();
119 Message *m = mp.first;
120 int priority = mp.second;
// Messages the messenger accepts for fast dispatch take that path.
// NOTE(review): presumably via fast_dispatch(m) -- confirm.
122 if (can_fast_dispatch(m)) {
// Otherwise the message is enqueued under class id 0.
125 enqueue(m, priority, 0);
// Re-acquire the lock before touching the shared state again.
127 local_delivery_lock.Lock();
// Final release on the way out of the function.
129 local_delivery_lock.Unlock();
// Returns `msize` to dispatch_throttler.
//
// Logs the amount together with the throttler's current/max values at debug
// level 10 (read before the put), then calls dispatch_throttler.put(msize).
//
// @param msize  amount to hand back to the throttler
// NOTE(review): confirm whether a zero msize is filtered out before the put.
132 void DispatchQueue::dispatch_throttle_release(uint64_t msize)
135 ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler "
136 << dispatch_throttler.get_current() << "/"
137 << dispatch_throttler.get_max() << dendl;
138 dispatch_throttler.put(msize);
143 * This function delivers incoming messages to the Messenger.
144 * Connections with messages are kept in queues; when beginning a message
145 * delivery the highest-priority queue is selected, the connection from the
146 * front of the queue is removed, and its message read. If the connection
147 * has remaining messages at that priority level, it is put back at the
148 * end of the queue. If the queue is empty, it's removed.
149 * The message is then delivered and the process starts again.
// Dispatch-thread body: drains mqueue and delivers each QueueItem to the
// Messenger.
//
// A QueueItem is either a "code" (a connection event) or a Message.
// NOTE(review): confirm where `lock` is held and released around the
// delivery calls, and what the outer loop's exit condition is.
151 void DispatchQueue::entry()
155 while (!mqueue.empty()) {
156 QueueItem qitem = mqueue.dequeue();
// Only real messages have an arrival record to remove.
157 if (!qitem.is_code())
158 remove_arrival(qitem.get_message());
161 if (qitem.is_code()) {
// Optional fault injection for connection events: with probability
// ms_inject_delay_probability (sampled at 1/10000 granularity via rand()),
// a delay of ms_inject_internal_delays seconds is set up and logged.
162 if (cct->_conf->ms_inject_internal_delays &&
163 cct->_conf->ms_inject_delay_probability &&
164 (rand() % 10000)/10000.0 < cct->_conf->ms_inject_delay_probability) {
166 t.set_from_double(cct->_conf->ms_inject_internal_delays);
167 ldout(cct, 1) << "DispatchQueue::entry inject delay of " << t
// Route the event code to the matching Messenger handler, passing the
// connection carried by the item.
171 switch (qitem.get_code()) {
172 case D_BAD_REMOTE_RESET:
173 msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
176 msgr->ms_deliver_handle_connect(qitem.get_connection());
179 msgr->ms_deliver_handle_accept(qitem.get_connection());
182 msgr->ms_deliver_handle_reset(qitem.get_connection());
185 msgr->ms_deliver_handle_refused(qitem.get_connection());
// Message path.
191 Message *m = qitem.get_message();
// When stopping, the message is logged and not delivered.
// NOTE(review): confirm the message reference is released on this path.
193 ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
// Normal delivery, bracketed by the same pre/post bookkeeping that
// fast_dispatch() uses.
196 uint64_t msize = pre_dispatch(m);
197 msgr->ms_deliver_dispatch(m);
198 post_dispatch(m, msize);
207 // wait for something to be put on queue
// Removes every queued item belonging to class `id` from mqueue and gives
// each removed message's throttle size back to the dispatch throttler.
//
// @param id  class id whose queued items are removed
213 void DispatchQueue::discard_queue(uint64_t id) {
214 Mutex::Locker l(lock);
// remove_by_class() collects the removed items in `removed` so they can be
// processed one by one below.
215 list<QueueItem> removed;
216 mqueue.remove_by_class(id, &removed);
217 for (list<QueueItem>::iterator i = removed.begin();
220 assert(!(i->is_code())); // We don't discard id 0, ever!
221 Message *m = i->get_message();
// The message was never dispatched, so the size still stored on it is
// released here.
// NOTE(review): confirm the message's arrival record and reference are also
// dropped in this loop.
223 dispatch_throttle_release(m->get_dispatch_throttle_size());
// Starts the two worker threads owned by the queue.
//
// Asserts that dispatch_thread has not been started yet, then creates
// dispatch_thread with the name "ms_dispatch" and local_delivery_thread with
// the name "ms_local", in that order.
228 void DispatchQueue::start()
231 assert(!dispatch_thread.is_started());
232 dispatch_thread.create("ms_dispatch");
233 local_delivery_thread.create("ms_local");
// Joins both worker threads: local_delivery_thread first, then
// dispatch_thread.
// NOTE(review): presumably expected to be called after shutdown() has asked
// both threads to stop -- confirm with callers.
236 void DispatchQueue::wait()
238 local_delivery_thread.join();
239 dispatch_thread.join();
// Drops every message still waiting in local_messages.
//
// Each pending message pointer is logged at debug level 20, then the list is
// cleared.
// NOTE(review): no acquisition of local_delivery_lock is visible here;
// confirm callers hold it or that the local-delivery thread has stopped.
// NOTE(review): confirm each message's reference is released before clear().
242 void DispatchQueue::discard_local()
244 for (list<pair<Message *, int> >::iterator p = local_messages.begin();
245 p != local_messages.end();
247 ldout(cct,20) << __func__ << " " << p->first << dendl;
250 local_messages.clear();
253 void DispatchQueue::shutdown()
255 // stop my local delivery thread
256 local_delivery_lock.Lock();
257 stop_local_delivery = true;
258 local_delivery_cond.Signal();
259 local_delivery_lock.Unlock();
261 // stop my dispatch thread