+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include <errno.h>
-#include <iostream>
-#include <fstream>
-
-
-#include "SimpleMessenger.h"
-
-#include "common/config.h"
-#include "common/Timer.h"
-#include "common/errno.h"
-#include "common/valgrind.h"
-#include "auth/Crypto.h"
-#include "include/Spinlock.h"
-
-#define dout_subsys ceph_subsys_ms
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, this)
-/// Emit the per-messenger log prefix: "-- ", the messenger's own address, " ".
-static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
-  std::ostream& out = *_dout;
-  out << "-- " << msgr->get_myaddr();
-  return out << " ";
-}
-
-
-/*******************
- * SimpleMessenger
- */
-
-/**
- * Construct a SimpleMessenger.
- *
- * The nonce is forwarded to the base class and to the Accepter and is
- * also stored locally. The loopback PipeConnection is allocated in the
- * initializer list and set up via init_local_connection() before the
- * constructor returns.
- */
-SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
-                                 string mname, uint64_t _nonce)
-  : SimplePolicyMessenger(cct, name,mname, _nonce),
-    accepter(this, _nonce),
-    dispatch_queue(cct, this, mname),
-    reaper_thread(this),
-    nonce(_nonce),
-    lock("SimpleMessenger::lock"),
-    need_addr(true),
-    did_bind(false),
-    global_seq(0),
-    cluster_protocol(0),
-    reaper_started(false),
-    reaper_stop(false),
-    timeout(0),
-    local_connection(new PipeConnection(cct, this))
-{
-  // declare accesses to 'timeout' a known-benign race for race detectors
-  ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout),
-                             "SimpleMessenger read timeout");
-
-  ceph_spin_init(&global_seq_lock);
-
-  init_local_connection();
-}
-
-/**
- * Tear down the SimpleMessenger.
- *
- * Nothing is stopped here; the destructor only checks that the
- * accepter, the registered pipes and the reaper thread are already
- * gone, then releases the global_seq spinlock.
- */
-SimpleMessenger::~SimpleMessenger()
-{
-  // either we never bound, or the Accepter has been shut down
-  assert(!did_bind);
-  // no Pipes may still be registered
-  assert(rank_pipe.empty());
-  // the reaper thread must have been stopped
-  assert(!reaper_started);
-
-  ceph_spin_destroy(&global_seq_lock);
-}
-
-/**
- * Start the dispatch queue and, if we bound to an address, the accepter.
- */
-void SimpleMessenger::ready()
-{
-  ldout(cct,10) << "ready " << get_myaddr() << dendl;
-  dispatch_queue.start();
-
-  // did_bind is examined (and the accepter started) under the messenger lock
-  Mutex::Locker guard(lock);
-  if (did_bind) {
-    accepter.start();
-  }
-}
-
-
-/**
- * Mark every connection down and flag the messenger as stopped,
- * signalling stop_cond for anyone blocked on it.
- *
- * @return always 0
- */
-int SimpleMessenger::shutdown()
-{
-  ldout(cct,10) << "shutdown " << get_myaddr() << dendl;
-  mark_down_all();
-
-  // break ref cycles on the loopback connection
-  local_connection->set_priv(NULL);
-
-  {
-    Mutex::Locker guard(lock);
-    stop_cond.Signal();
-    stopped = true;
-  }
-
-  return 0;
-}
-
-/**
- * Send a message to a destination given by name + address.
- *
- * @param m    message to send; released with put() if the destination
- *             address is empty
- * @param dest target entity instance
- * @return 0 once the message was handed to submit_message(),
- *         -EINVAL if dest.addr is the default (empty) address
- */
-int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest)
-{
-  // set envelope
-  m->get_header().src = get_myname();
-  m->set_cct(cct);
-
-  if (!m->get_priority())
-    m->set_priority(get_default_send_priority());
-
-  ldout(cct,1) <<"--> " << dest.name << " "
-               << dest.addr << " -- " << *m
-               << " -- ?+" << m->get_data().length()
-               << " " << m
-               << dendl;
-
-  const bool empty_dest = (dest.addr == entity_addr_t());
-  if (empty_dest) {
-    ldout(cct,0) << "send_message message " << *m
-                 << " with empty dest " << dest.addr << dendl;
-    m->put();
-    return -EINVAL;
-  }
-
-  // pipe lookup and submission both happen under the messenger lock,
-  // which is why already_locked is passed as true
-  Mutex::Locker guard(lock);
-  Pipe *existing = _lookup_pipe(dest.addr);
-  submit_message(m, (existing ? existing->connection_state.get() : NULL),
-                 dest.addr, dest.name.type(), true);
-  return 0;
-}
-
-/**
- * Send a message over an existing Connection.
- *
- * The messenger lock is not taken here; submit_message() is told so via
- * already_locked == false.
- *
- * @return always 0
- */
-int SimpleMessenger::_send_message(Message *m, Connection *con)
-{
-  // set envelope
-  m->get_header().src = get_myname();
-
-  if (!m->get_priority())
-    m->set_priority(get_default_send_priority());
-
-  ldout(cct,1) << "--> " << con->get_peer_addr()
-               << " -- " << *m
-               << " -- ?+" << m->get_data().length()
-               << " " << m << " con " << con
-               << dendl;
-
-  PipeConnection *pipe_con = static_cast<PipeConnection*>(con);
-  submit_message(m, pipe_con,
-                 con->get_peer_addr(), con->get_peer_type(), false);
-  return 0;
-}
-
-/**
- * Fill in our own IP from the passed addr, but only while
- * my_inst.addr still has a blank IP. The port already stored in
- * my_inst.addr is preserved. Does nothing if the IP is already set.
- */
-void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr)
-{
-  if (!my_inst.addr.is_blank_ip())
-    return;
-
-  // copying addr.u replaces the port too, so save and restore ours
-  const int saved_port = my_inst.addr.get_port();
-  my_inst.addr.u = addr.u;
-  my_inst.addr.set_port(saved_port);
-  init_local_connection();
-}
-
-/**
- * Adopt addr as our own address, stamped with this messenger's nonce,
- * and refresh the loopback connection accordingly.
- */
-void SimpleMessenger::set_addr(const entity_addr_t &addr)
-{
-  entity_addr_t stamped(addr);
-  stamped.set_nonce(nonce);
-  set_myaddr(stamped);
-  init_local_connection();
-}
-
-/**
- * Pick the protocol version to use with a peer.
- *
- * @param peer_type entity type of the peer
- * @param connect   true when we are the connecting side
- * @return cluster_protocol when the peer has our own entity type;
- *         otherwise the OSD/MDS/MON client protocol selected by the
- *         peer's type (connecting) or by our own type (accepting);
- *         0 if that type is none of OSD, MDS or MON
- */
-int SimpleMessenger::get_proto_version(int peer_type, bool connect)
-{
-  const int my_type = my_inst.name.type();
-
-  // internal: same entity type on both ends
-  if (peer_type == my_type)
-    return cluster_protocol;
-
-  // public: the relevant type is the peer's when connecting, ours otherwise
-  const int service_type = connect ? peer_type : my_type;
-  switch (service_type) {
-  case CEPH_ENTITY_TYPE_OSD:
-    return CEPH_OSDC_PROTOCOL;
-  case CEPH_ENTITY_TYPE_MDS:
-    return CEPH_MDSC_PROTOCOL;
-  case CEPH_ENTITY_TYPE_MON:
-    return CEPH_MONC_PROTOCOL;
-  default:
-    return 0;
-  }
-}
-
-
-
-
-
-
-
-/********************************************
- * SimpleMessenger
- */
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, this)
-
-/**
- * Reaper loop: run reaper(), then sleep on reaper_cond, until
- * reaper_stop is set. The messenger lock is held for the whole loop
- * except where reaper() or the condition wait release it.
- */
-void SimpleMessenger::reaper_entry()
-{
-  ldout(cct,10) << "reaper_entry start" << dendl;
-  {
-    Mutex::Locker guard(lock);
-    for (;;) {
-      if (reaper_stop)
-        break;
-      reaper();  // may drop and retake the lock
-      if (reaper_stop)
-        break;
-      reaper_cond.Wait(lock);
-    }
-  }
-  ldout(cct,10) << "reaper_entry done" << dendl;
-}
-
-/*
- * Drain pipe_reap_queue: detach, join, close and release each queued Pipe.
- *
- * note: the caller must hold lock; it is released and re-acquired
- * around each Pipe's join().
- */
-void SimpleMessenger::reaper()
-{
-  ldout(cct,10) << "reaper" << dendl;
-  assert(lock.is_locked());
-
-  while (!pipe_reap_queue.empty()) {
-    Pipe *victim = pipe_reap_queue.front();
-    pipe_reap_queue.pop_front();
-    ldout(cct,10) << "reaper reaping pipe " << victim << " "
-                  << victim->get_peer_addr() << dendl;
-
-    victim->pipe_lock.Lock();
-    victim->discard_out_queue();
-    if (victim->connection_state) {
-      // mark_down, mark_down_all, or fault() should have done this,
-      // or accept() may have switched the Connection to a different
-      // Pipe... but make sure!
-      bool cleared = victim->connection_state->clear_pipe(victim);
-      assert(!cleared);
-    }
-    victim->pipe_lock.Unlock();
-
-    victim->unregister_pipe();
-    assert(pipes.count(victim));
-    pipes.erase(victim);
-
-    // drop the msgr lock while joining the thread; the delay thread
-    // could be trying to fast dispatch, which would keep it from
-    // joining without blocking and so deadlock.
-    lock.Unlock();
-    victim->join();
-    lock.Lock();
-
-    if (victim->sd >= 0) {
-      ::close(victim->sd);
-    }
-    ldout(cct,10) << "reaper reaped pipe " << victim << " "
-                  << victim->get_peer_addr() << dendl;
-    victim->put();
-    ldout(cct,10) << "reaper deleted pipe " << victim << dendl;
-  }
-
-  ldout(cct,10) << "reaper done" << dendl;
-}
-
-/**
- * Queue a Pipe for the reaper and wake the reaper thread.
- */
-void SimpleMessenger::queue_reap(Pipe *pipe)
-{
-  ldout(cct,10) << "queue_reap " << pipe << dendl;
-
-  Mutex::Locker guard(lock);
-  pipe_reap_queue.push_back(pipe);
-  reaper_cond.Signal();
-}
-
-/**
- * Report whether the Pipe behind a Connection is currently connected.
- *
- * @return false if con is NULL or has no Pipe; otherwise the Pipe's
- *         is_connected() result
- */
-bool SimpleMessenger::is_connected(Connection *con)
-{
-  if (!con)
-    return false;
-
-  PipeConnection *pipe_con = static_cast<PipeConnection*>(con);
-  Pipe *p = static_cast<Pipe *>(pipe_con->get_pipe());
-  if (!p)
-    return false;
-
-  assert(p->msgr == this);
-  const bool connected = p->is_connected();
-  p->put();  // drop the reference obtained from get_pipe()
-  return connected;
-}
-
-/**
- * Bind the accepter to bind_addr.
- *
- * @return -1 if the messenger was already started; otherwise the
- *         result of accepter.bind() (did_bind is set when it is >= 0)
- */
-int SimpleMessenger::bind(const entity_addr_t &bind_addr)
-{
-  {
-    Mutex::Locker guard(lock);
-    if (started) {
-      ldout(cct,10) << "rank.bind already started" << dendl;
-      return -1;
-    }
-    ldout(cct,10) << "rank.bind " << bind_addr << dendl;
-  }
-
-  // bind to a socket; no ports are excluded on the initial bind
-  set<int> avoid_ports;
-  const int rc = accepter.bind(bind_addr, avoid_ports);
-  if (rc >= 0) {
-    did_bind = true;
-  }
-  return rc;
-}
-
-/**
- * Stop the accepter, mark all connections down, and bind again while
- * avoiding the given ports. Requires a previous successful bind().
- *
- * @return the result of accepter.rebind()
- */
-int SimpleMessenger::rebind(const set<int>& avoid_ports)
-{
-  ldout(cct,1) << "rebind avoid " << avoid_ports << dendl;
-  assert(did_bind);
-
-  accepter.stop();
-  mark_down_all();
-
-  const int rc = accepter.rebind(avoid_ports);
-  return rc;
-}
-
-
-/**
- * Record bind_addr as our address when ms_bind_before_connect is enabled.
- *
- * @return 0 if the option is off, if we already bound (bind_addr must
- *         then equal our address), or after setting the address;
- *         -1 if the messenger was already started
- */
-int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
-{
-  int rc = 0;
-  if (cct->_conf->ms_bind_before_connect) {
-    Mutex::Locker guard(lock);
-    if (did_bind) {
-      assert(my_inst.addr == bind_addr);
-    } else if (started) {
-      ldout(cct,10) << "rank.bind already started" << dendl;
-      rc = -1;
-    } else {
-      ldout(cct,10) << "rank.bind " << bind_addr << dendl;
-      set_myaddr(bind_addr);
-    }
-  }
-  return rc;
-}
-
-
-/**
- * Mark the messenger started and launch the reaper thread.
- *
- * If no bind happened, our address gets the messenger nonce and the
- * loopback connection is refreshed.
- *
- * @return always 0
- */
-int SimpleMessenger::start()
-{
-  {
-    Mutex::Locker guard(lock);
-    ldout(cct,1) << "messenger.start" << dendl;
-
-    // register at least one entity, first!
-    assert(my_inst.name.type() >= 0);
-
-    assert(!started);
-    started = true;
-    stopped = false;
-
-    if (!did_bind) {
-      my_inst.addr.nonce = nonce;
-      init_local_connection();
-    }
-  }
-
-  // the reaper thread is created after the lock has been released
-  reaper_started = true;
-  reaper_thread.create("ms_reaper");
-  return 0;
-}
-
-/**
- * Create a Pipe in STATE_ACCEPTING for an accepted socket, start its
- * reader, and track it in both pipes and accepting_pipes.
- *
- * @param sd socket descriptor stored in the new Pipe
- * @return the new Pipe
- */
-Pipe *SimpleMessenger::add_accept_pipe(int sd)
-{
-  Mutex::Locker guard(lock);
-
-  Pipe *accepted = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
-  accepted->sd = sd;
-  {
-    Mutex::Locker pipe_guard(accepted->pipe_lock);
-    accepted->start_reader();
-  }
-
-  pipes.insert(accepted);
-  accepting_pipes.insert(accepted);
-  return accepted;
-}
-
-/* connect_rank
- * Create a Pipe in STATE_CONNECTING towards addr, start its writer,
- * optionally queue a first message on it, and register it.
- *
- * NOTE: assumes messenger.lock held.
- */
-Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
-                                    int type,
-                                    PipeConnection *con,
-                                    Message *first)
-{
-  assert(lock.is_locked());
-  assert(addr != my_inst.addr);
-
-  ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
-
-  // create pipe
-  Pipe *fresh = new Pipe(this, Pipe::STATE_CONNECTING, con);
-  {
-    // peer details, policy, writer start and the first message are all
-    // set up under the new pipe's own lock
-    Mutex::Locker pipe_guard(fresh->pipe_lock);
-    fresh->set_peer_type(type);
-    fresh->set_peer_addr(addr);
-    fresh->policy = get_policy(type);
-    fresh->start_writer();
-    if (first) {
-      fresh->_send(first);
-    }
-  }
-
-  fresh->register_pipe();
-  pipes.insert(fresh);
-
-  return fresh;
-}
-
-
-
-
-
-
-/**
- * Obtain an authorizer for peer_type by forwarding to
- * ms_deliver_get_authorizer().
- */
-AuthAuthorizer *SimpleMessenger::get_authorizer(int peer_type, bool force_new)
-{
-  AuthAuthorizer *authorizer = ms_deliver_get_authorizer(peer_type, force_new);
-  return authorizer;
-}
-
-/**
- * Verify a peer's authorizer by forwarding every argument unchanged to
- * ms_deliver_verify_authorizer().
- *
- * @return whatever ms_deliver_verify_authorizer() returns
- */
-bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type,
-                                        int protocol, bufferlist& authorizer,
-                                        bufferlist& authorizer_reply,
-                                        bool& isvalid,CryptoKey& session_key)
-{
-  const bool handled = ms_deliver_verify_authorizer(con, peer_type, protocol,
-                                                    authorizer, authorizer_reply,
-                                                    isvalid,session_key);
-  return handled;
-}
-
-/**
- * Return a Connection to dest.
- *
- * Our own address yields the loopback connection. Otherwise an existing
- * Pipe for dest.addr is looked up, or a new one is created, and its
- * connection_state is returned; the lookup is retried for as long as
- * the chosen Pipe has no connection_state.
- */
-ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
-{
-  Mutex::Locker msgr_guard(lock);
-
-  // local
-  if (my_inst.addr == dest.addr)
-    return local_connection;
-
-  // remote
-  for (;;) {
-    Pipe *pipe = _lookup_pipe(dest.addr);
-    if (!pipe) {
-      pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
-      ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
-    } else {
-      ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
-    }
-
-    Mutex::Locker pipe_guard(pipe->pipe_lock);
-    if (pipe->connection_state)
-      return pipe->connection_state;
-    // we failed too quickly!  retry.  FIXME.
-  }
-}
-
-/**
- * Return a reference to the messenger's loopback connection.
- */
-ConnectionRef SimpleMessenger::get_loopback_connection()
-{
-  ConnectionRef loopback = local_connection;
-  return loopback;
-}
-
-/**
- * Route a message: onto the Connection's current Pipe, into the local
- * dispatch queue, or onto a newly created Pipe.
- *
- * @param m              message to deliver; on every drop path below
- *                       it is released with m->put()
- * @param con            existing connection to use, or NULL
- * @param dest_addr      destination address
- * @param dest_type      destination entity type (selects the Policy)
- * @param already_locked true if the caller holds the messenger lock;
- *                       when false and a new Pipe is needed, the lock
- *                       is taken and the call is repeated
- */
-void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
-                                     const entity_addr_t& dest_addr, int dest_type,
-                                     bool already_locked)
-{
-  m->trace.event("simple submitting message");
-
-  // optional debugging aid: hexdump payload and data of outgoing messages
-  if (cct->_conf->ms_dump_on_send) {
-    m->encode(-1, true);
-    ldout(cct, 0) << "submit_message " << *m << "\n";
-    m->get_payload().hexdump(*_dout);
-    if (m->get_data().length() > 0) {
-      *_dout << " data:\n";
-      m->get_data().hexdump(*_dout);
-    }
-    *_dout << dendl;
-    m->clear_payload();
-  }
-
-  // existing connection?
-  if (con) {
-    Pipe *pipe = NULL;
-    bool ok = con->try_get_pipe(&pipe);
-    if (!ok) {
-      ldout(cct,0) << "submit_message " << *m << " remote, " << dest_addr
-                   << ", failed lossy con, dropping message " << m << dendl;
-      m->put();
-      return;
-    }
-    while (pipe && ok) {
-      // we loop in case of a racing reconnect, either from us or them
-      pipe->pipe_lock.Lock(); // can't use a Locker because of the Pipe ref
-      if (pipe->state != Pipe::STATE_CLOSED) {
-        ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
-        pipe->_send(m);
-        pipe->pipe_lock.Unlock();
-        pipe->put();
-        return;
-      }
-      // this pipe is closed: ask the connection which pipe it points at now
-      Pipe *current_pipe = NULL;
-      ok = con->try_get_pipe(&current_pipe);
-      pipe->pipe_lock.Unlock();
-      if (current_pipe == pipe) {
-        ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr
-                      << ", had pipe " << pipe << ", but it closed." << dendl;
-        // both lookups handed us a reference to the same pipe; drop both
-        pipe->put();
-        current_pipe->put();
-        m->put();
-        return;
-      } else {
-        pipe->put();
-        pipe = current_pipe;
-      }
-    }
-  }
-
-  // local?
-  if (my_inst.addr == dest_addr) {
-    // local
-    ldout(cct,20) << "submit_message " << *m << " local" << dendl;
-    m->set_connection(local_connection.get());
-    dispatch_queue.local_delivery(m, m->get_priority());
-    return;
-  }
-
-  // remote, no existing pipe.
-  const Policy& policy = get_policy(dest_type);
-  if (policy.server) {
-    ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type "
-                  << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
-    m->put();
-  } else {
-    ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
-    if (!already_locked) {
-      /** We couldn't handle the Message without reference to global data, so
-       *  grab the lock and do it again. If we got here, we know it's a non-lossy
-       *  Connection, so we can use our existing pointer without doing another lookup. */
-      Mutex::Locker l(lock);
-      submit_message(m, con, dest_addr, dest_type, true);
-    } else {
-      connect_rank(dest_addr, dest_type, con, m);
-    }
-  }
-}
-
-/**
- * Send a keepalive on the Pipe behind a Connection.
- *
- * @return 0 if the connection has a Pipe and the keepalive was queued
- *         on it, -EPIPE if there is no Pipe
- */
-int SimpleMessenger::send_keepalive(Connection *con)
-{
-  PipeConnection *pipe_con = static_cast<PipeConnection*>(con);
-  Pipe *pipe = static_cast<Pipe *>(pipe_con->get_pipe());
-  if (!pipe) {
-    ldout(cct,0) << "send_keepalive con " << con << ", no pipe." << dendl;
-    return -EPIPE;
-  }
-
-  ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl;
-  assert(pipe->msgr == this);
-  {
-    Mutex::Locker pipe_guard(pipe->pipe_lock);
-    pipe->_send_keepalive();
-  }
-  pipe->put();  // drop the reference obtained from get_pipe()
-  return 0;
-}
-
-
-
-/**
- * Block until the messenger has been stopped, then tear it down.
- *
- * Returns immediately if the messenger was never started. Otherwise it
- * waits for 'stopped' to become true, then in order: stops the accepter
- * (if bound), shuts down and drains the dispatch queue, stops and joins
- * the reaper thread, and finally closes and reaps every remaining Pipe.
- */
-void SimpleMessenger::wait()
-{
-  lock.Lock();
-  if (!started) {
-    lock.Unlock();
-    return;
-  }
-  // Re-check the predicate after every wakeup: a condition wait may
-  // return spuriously, and teardown must not begin before 'stopped'
-  // has actually been set.
-  while (!stopped)
-    stop_cond.Wait(lock);
-
-  lock.Unlock();
-
-  // done!  clean up.
-  if (did_bind) {
-    ldout(cct,20) << "wait: stopping accepter thread" << dendl;
-    accepter.stop();
-    did_bind = false;
-    ldout(cct,20) << "wait: stopped accepter thread" << dendl;
-  }
-
-  dispatch_queue.shutdown();
-  if (dispatch_queue.is_started()) {
-    ldout(cct,10) << "wait: waiting for dispatch queue" << dendl;
-    dispatch_queue.wait();
-    dispatch_queue.discard_local();
-    ldout(cct,10) << "wait: dispatch queue is stopped" << dendl;
-  }
-
-  if (reaper_started) {
-    ldout(cct,20) << "wait: stopping reaper thread" << dendl;
-    lock.Lock();
-    reaper_cond.Signal();
-    reaper_stop = true;
-    lock.Unlock();
-    reaper_thread.join();
-    reaper_started = false;
-    ldout(cct,20) << "wait: stopped reaper thread" << dendl;
-  }
-
-  // close+reap all pipes
-  lock.Lock();
-  {
-    ldout(cct,10) << "wait: closing pipes" << dendl;
-
-    // NOTE(review): this loop only terminates if unregister_pipe()
-    // removes the pipe from rank_pipe -- confirm against Pipe.
-    while (!rank_pipe.empty()) {
-      Pipe *p = rank_pipe.begin()->second;
-      p->unregister_pipe();
-      p->pipe_lock.Lock();
-      p->stop_and_wait();
-      // don't generate an event here; we're shutting down anyway.
-      PipeConnectionRef con = p->connection_state;
-      if (con)
-        con->clear_pipe(p);
-      p->pipe_lock.Unlock();
-    }
-
-    // the reaper thread is gone by now, so reap from this thread
-    reaper();
-    ldout(cct,10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
-    while (!pipes.empty()) {
-      reaper_cond.Wait(lock);
-      reaper();
-    }
-  }
-  lock.Unlock();
-
-  ldout(cct,10) << "wait: done." << dendl;
-  ldout(cct,1) << "shutdown complete." << dendl;
-  started = false;
-}
-
-
-/**
- * Stop every accepting Pipe and every registered Pipe.
- *
- * For each Pipe whose Connection was still attached to it
- * (clear_pipe() returned true) a reset event is queued on the
- * dispatch queue.
- */
-void SimpleMessenger::mark_down_all()
-{
-  ldout(cct,1) << "mark_down_all" << dendl;
-  lock.Lock();
-
-  // pipes still in the accepting state
-  set<Pipe*>::iterator acc = accepting_pipes.begin();
-  while (acc != accepting_pipes.end()) {
-    Pipe *p = *acc;
-    ++acc;
-    ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl;
-    p->pipe_lock.Lock();
-    p->stop();
-    PipeConnectionRef con = p->connection_state;
-    if (con && con->clear_pipe(p)) {
-      dispatch_queue.queue_reset(con.get());
-    }
-    p->pipe_lock.Unlock();
-  }
-  accepting_pipes.clear();
-
-  // registered pipes: keep taking the first entry until none are left
-  while (!rank_pipe.empty()) {
-    ceph::unordered_map<entity_addr_t,Pipe*>::iterator first = rank_pipe.begin();
-    Pipe *p = first->second;
-    ldout(cct,5) << "mark_down_all " << first->first << " " << p << dendl;
-    rank_pipe.erase(first);
-    p->unregister_pipe();
-    p->pipe_lock.Lock();
-    p->stop();
-    PipeConnectionRef con = p->connection_state;
-    if (con && con->clear_pipe(p)) {
-      dispatch_queue.queue_reset(con.get());
-    }
-    p->pipe_lock.Unlock();
-  }
-
-  lock.Unlock();
-}
-
-/**
- * Mark down the Pipe registered for addr, if there is one.
- *
- * A reset event is queued for the Pipe's Connection when clear_pipe()
- * reports that the Connection was still attached to this Pipe.
- */
-void SimpleMessenger::mark_down(const entity_addr_t& addr)
-{
-  Mutex::Locker guard(lock);
-
-  Pipe *p = _lookup_pipe(addr);
-  if (!p) {
-    ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl;
-    return;
-  }
-
-  ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl;
-  p->unregister_pipe();
-  p->pipe_lock.Lock();
-  p->stop();
-  if (p->connection_state) {
-    // generate a reset event for the caller in this case, even
-    // though they asked for it, since this is the addr-based (and
-    // not Connection* based) interface
-    PipeConnectionRef con = p->connection_state;
-    if (con && con->clear_pipe(p)) {
-      dispatch_queue.queue_reset(con.get());
-    }
-  }
-  p->pipe_lock.Unlock();
-}
-
-/**
- * Mark down the Pipe behind a Connection. A NULL con is ignored.
- *
- * Unlike the address-based overload, no reset event is queued.
- */
-void SimpleMessenger::mark_down(Connection *con)
-{
-  if (con == NULL)
-    return;
-
-  Mutex::Locker guard(lock);
-
-  PipeConnection *pipe_con = static_cast<PipeConnection*>(con);
-  Pipe *p = static_cast<Pipe *>(pipe_con->get_pipe());
-  if (!p) {
-    ldout(cct,1) << "mark_down " << con << " -- pipe dne" << dendl;
-    return;
-  }
-
-  ldout(cct,1) << "mark_down " << con << " -- " << p << dendl;
-  assert(p->msgr == this);
-  p->unregister_pipe();
-  p->pipe_lock.Lock();
-  p->stop();
-  if (p->connection_state) {
-    // do not generate a reset event for the caller in this case,
-    // since they asked for it.
-    p->connection_state->clear_pipe(p);
-  }
-  p->pipe_lock.Unlock();
-  p->put();  // drop the reference obtained from get_pipe()
-}
-
-/**
- * Switch the Pipe behind a Connection to a lossy policy.
- * Only logs if the Connection has no Pipe.
- */
-void SimpleMessenger::mark_disposable(Connection *con)
-{
-  Mutex::Locker guard(lock);
-
-  PipeConnection *pipe_con = static_cast<PipeConnection*>(con);
-  Pipe *p = static_cast<Pipe *>(pipe_con->get_pipe());
-  if (!p) {
-    ldout(cct,1) << "mark_disposable " << con << " -- pipe dne" << dendl;
-    return;
-  }
-
-  ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl;
-  assert(p->msgr == this);
-  {
-    Mutex::Locker pipe_guard(p->pipe_lock);
-    p->policy.lossy = true;
-  }
-  p->put();  // drop the reference obtained from get_pipe()
-}
-
-/**
- * Adopt the address a peer reports for us, once.
- *
- * Only the IP part comes from peer_addr_for_me; our current port and
- * nonce are kept. Later calls are no-ops once need_addr is false.
- */
-void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
-{
-  // be careful here: multiple threads may block here, and readers of
-  // my_inst.addr do NOT hold any lock.
-
-  // need_addr only ever goes from true -> false, and does so under the
-  // mutex.  if it is already false there is no need to take the mutex
-  // at all.
-  if (!need_addr)
-    return;
-
-  Mutex::Locker guard(lock);
-  if (!need_addr)
-    return;  // another thread set the address while we waited for the lock
-
-  entity_addr_t learned = peer_addr_for_me;
-  learned.set_port(my_inst.addr.get_port());
-  learned.set_nonce(my_inst.addr.get_nonce());
-  ANNOTATE_BENIGN_RACE_SIZED(&my_inst.addr, sizeof(my_inst.addr),
-                             "SimpleMessenger learned addr");
-  my_inst.addr = learned;
-  ldout(cct,1) << "learned my addr " << my_inst.addr << dendl;
-  need_addr = false;
-  init_local_connection();
-}
-
-/**
- * Bring the loopback connection in line with our current identity:
- * peer address and type are copied from my_inst, all features are
- * enabled, and the fast-connect handlers are notified.
- */
-void SimpleMessenger::init_local_connection()
-{
-  // the loopback peer is ourselves
-  local_connection->peer_addr = my_inst.addr;
-  local_connection->peer_type = my_inst.name.type();
-
-  local_connection->set_features(CEPH_FEATURES_ALL);
-
-  ms_deliver_handle_fast_connect(local_connection.get());
-}