+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "msg/Message.h"
-#include "DispatchQueue.h"
-#include "Messenger.h"
-#include "common/ceph_context.h"
-
-#define dout_subsys ceph_subsys_ms
-#include "common/debug.h"
-
-
-/*******************
- * DispatchQueue
- */
-
-#undef dout_prefix
-#define dout_prefix *_dout << "-- " << msgr->get_myaddr() << " "
-
-double DispatchQueue::get_max_age(utime_t now) const {
- Mutex::Locker l(lock);
- if (marrival.empty())
- return 0;
- else
- return (now - marrival.begin()->first);
-}
-
-uint64_t DispatchQueue::pre_dispatch(Message *m)
-{
- ldout(cct,1) << "<== " << m->get_source_inst()
- << " " << m->get_seq()
- << " ==== " << *m
- << " ==== " << m->get_payload().length()
- << "+" << m->get_middle().length()
- << "+" << m->get_data().length()
- << " (" << m->get_footer().front_crc << " "
- << m->get_footer().middle_crc
- << " " << m->get_footer().data_crc << ")"
- << " " << m << " con " << m->get_connection()
- << dendl;
- uint64_t msize = m->get_dispatch_throttle_size();
- m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
- return msize;
-}
-
-void DispatchQueue::post_dispatch(Message *m, uint64_t msize)
-{
- dispatch_throttle_release(msize);
- ldout(cct,20) << "done calling dispatch on " << m << dendl;
-}
-
-bool DispatchQueue::can_fast_dispatch(const Message *m) const
-{
- return msgr->ms_can_fast_dispatch(m);
-}
-
-void DispatchQueue::fast_dispatch(Message *m)
-{
- uint64_t msize = pre_dispatch(m);
- msgr->ms_fast_dispatch(m);
- post_dispatch(m, msize);
-}
-
-void DispatchQueue::fast_preprocess(Message *m)
-{
- msgr->ms_fast_preprocess(m);
-}
-
-void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
-{
-
- Mutex::Locker l(lock);
- ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
- add_arrival(m);
- if (priority >= CEPH_MSG_PRIO_LOW) {
- mqueue.enqueue_strict(
- id, priority, QueueItem(m));
- } else {
- mqueue.enqueue(
- id, priority, m->get_cost(), QueueItem(m));
- }
- cond.Signal();
-}
-
-void DispatchQueue::local_delivery(Message *m, int priority)
-{
- m->set_recv_stamp(ceph_clock_now());
- Mutex::Locker l(local_delivery_lock);
- if (local_messages.empty())
- local_delivery_cond.Signal();
- local_messages.push_back(make_pair(m, priority));
- return;
-}
-
-void DispatchQueue::run_local_delivery()
-{
- local_delivery_lock.Lock();
- while (true) {
- if (stop_local_delivery)
- break;
- if (local_messages.empty()) {
- local_delivery_cond.Wait(local_delivery_lock);
- continue;
- }
- pair<Message *, int> mp = local_messages.front();
- local_messages.pop_front();
- local_delivery_lock.Unlock();
- Message *m = mp.first;
- int priority = mp.second;
- fast_preprocess(m);
- if (can_fast_dispatch(m)) {
- fast_dispatch(m);
- } else {
- enqueue(m, priority, 0);
- }
- local_delivery_lock.Lock();
- }
- local_delivery_lock.Unlock();
-}
-
-void DispatchQueue::dispatch_throttle_release(uint64_t msize)
-{
- if (msize) {
- ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler "
- << dispatch_throttler.get_current() << "/"
- << dispatch_throttler.get_max() << dendl;
- dispatch_throttler.put(msize);
- }
-}
-
-/*
- * This function delivers incoming messages to the Messenger.
- * Connections with messages are kept in queues; when beginning a message
- * delivery the highest-priority queue is selected, the connection from the
- * front of the queue is removed, and its message read. If the connection
- * has remaining messages at that priority level, it is re-placed on to the
- * end of the queue. If the queue is empty; it's removed.
- * The message is then delivered and the process starts again.
- */
-void DispatchQueue::entry()
-{
- lock.Lock();
- while (true) {
- while (!mqueue.empty()) {
- QueueItem qitem = mqueue.dequeue();
- if (!qitem.is_code())
- remove_arrival(qitem.get_message());
- lock.Unlock();
-
- if (qitem.is_code()) {
- if (cct->_conf->ms_inject_internal_delays &&
- cct->_conf->ms_inject_delay_probability &&
- (rand() % 10000)/10000.0 < cct->_conf->ms_inject_delay_probability) {
- utime_t t;
- t.set_from_double(cct->_conf->ms_inject_internal_delays);
- ldout(cct, 1) << "DispatchQueue::entry inject delay of " << t
- << dendl;
- t.sleep();
- }
- switch (qitem.get_code()) {
- case D_BAD_REMOTE_RESET:
- msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
- break;
- case D_CONNECT:
- msgr->ms_deliver_handle_connect(qitem.get_connection());
- break;
- case D_ACCEPT:
- msgr->ms_deliver_handle_accept(qitem.get_connection());
- break;
- case D_BAD_RESET:
- msgr->ms_deliver_handle_reset(qitem.get_connection());
- break;
- case D_CONN_REFUSED:
- msgr->ms_deliver_handle_refused(qitem.get_connection());
- break;
- default:
- ceph_abort();
- }
- } else {
- Message *m = qitem.get_message();
- if (stop) {
- ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
- m->put();
- } else {
- uint64_t msize = pre_dispatch(m);
- msgr->ms_deliver_dispatch(m);
- post_dispatch(m, msize);
- }
- }
-
- lock.Lock();
- }
- if (stop)
- break;
-
- // wait for something to be put on queue
- cond.Wait(lock);
- }
- lock.Unlock();
-}
-
-void DispatchQueue::discard_queue(uint64_t id) {
- Mutex::Locker l(lock);
- list<QueueItem> removed;
- mqueue.remove_by_class(id, &removed);
- for (list<QueueItem>::iterator i = removed.begin();
- i != removed.end();
- ++i) {
- assert(!(i->is_code())); // We don't discard id 0, ever!
- Message *m = i->get_message();
- remove_arrival(m);
- dispatch_throttle_release(m->get_dispatch_throttle_size());
- m->put();
- }
-}
-
-void DispatchQueue::start()
-{
- assert(!stop);
- assert(!dispatch_thread.is_started());
- dispatch_thread.create("ms_dispatch");
- local_delivery_thread.create("ms_local");
-}
-
-void DispatchQueue::wait()
-{
- local_delivery_thread.join();
- dispatch_thread.join();
-}
-
-void DispatchQueue::discard_local()
-{
- for (list<pair<Message *, int> >::iterator p = local_messages.begin();
- p != local_messages.end();
- ++p) {
- ldout(cct,20) << __func__ << " " << p->first << dendl;
- p->first->put();
- }
- local_messages.clear();
-}
-
-void DispatchQueue::shutdown()
-{
- // stop my local delivery thread
- local_delivery_lock.Lock();
- stop_local_delivery = true;
- local_delivery_cond.Signal();
- local_delivery_lock.Unlock();
-
- // stop my dispatch thread
- lock.Lock();
- stop = true;
- cond.Signal();
- lock.Unlock();
-}