X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fsimple%2FPipe.cc;fp=src%2Fceph%2Fsrc%2Fmsg%2Fsimple%2FPipe.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=848efd45c0090f2381a930d11a6aae1014ed6572;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/simple/Pipe.cc b/src/ceph/src/msg/simple/Pipe.cc deleted file mode 100644 index 848efd4..0000000 --- a/src/ceph/src/msg/simple/Pipe.cc +++ /dev/null @@ -1,2678 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "msg/Message.h" -#include "Pipe.h" -#include "SimpleMessenger.h" - -#include "common/debug.h" -#include "common/errno.h" -#include "common/valgrind.h" - -// Below included to get encode_encrypt(); That probably should be in Crypto.h, instead - -#include "auth/Crypto.h" -#include "auth/cephx/CephxProtocol.h" -#include "auth/AuthSessionHandler.h" - -#include "include/sock_compat.h" - -// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR -#define SEQ_MASK 0x7fffffff -#define dout_subsys ceph_subsys_ms - -#undef dout_prefix -#define dout_prefix *_dout << *this -ostream& Pipe::_pipe_prefix(std::ostream &out) const { - return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this - << " sd=" << sd << " :" << port - << " s=" << state - << " pgs=" << peer_global_seq - << " cs=" << connect_seq - << " l=" << policy.lossy - << " c=" << connection_state - << ")."; -} - -ostream& operator<<(ostream &out, const Pipe &pipe) { - return pipe._pipe_prefix(out); -} - -/** - * The DelayedDelivery is for injecting delays into Message delivery off - * the socket. It is only enabled if delays are requested, and if they - * are then it pulls Messages off the DelayQueue and puts them into the - * in_q (SimpleMessenger::dispatch_queue). - * Please note that this probably has issues with Pipe shutdown and - * replacement semantics. I've tried, but no guarantees. - */ -class Pipe::DelayedDelivery: public Thread { - Pipe *pipe; - std::deque< pair > delay_queue; - Mutex delay_lock; - Cond delay_cond; - int flush_count; - bool active_flush; - bool stop_delayed_delivery; - bool delay_dispatching; // we are in fast dispatch now - bool stop_fast_dispatching_flag; // we need to stop fast dispatching - -public: - explicit DelayedDelivery(Pipe *p) - : pipe(p), - delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0), - active_flush(false), - stop_delayed_delivery(false), - delay_dispatching(false), - stop_fast_dispatching_flag(false) { } - ~DelayedDelivery() override { - discard(); - } - void *entry() override; - void queue(utime_t release, Message *m) { - Mutex::Locker l(delay_lock); - delay_queue.push_back(make_pair(release, m)); - delay_cond.Signal(); - } - void discard(); - void flush(); - bool is_flushing() { - Mutex::Locker l(delay_lock); - return flush_count > 0 || active_flush; - } - void wait_for_flush() { - Mutex::Locker l(delay_lock); - while (flush_count > 0 || active_flush) - delay_cond.Wait(delay_lock); - } - void stop() { - delay_lock.Lock(); - stop_delayed_delivery = true; - delay_cond.Signal(); - delay_lock.Unlock(); - } - void steal_for_pipe(Pipe *new_owner) { - Mutex::Locker l(delay_lock); - pipe = new_owner; - } - /** - * We need to stop fast dispatching before we need to stop putting - * normal messages into the DispatchQueue. - */ - void stop_fast_dispatching(); -}; - -/************************************** - * Pipe - */ - -Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con) - : RefCountedObject(r->cct), - reader_thread(this), - writer_thread(this), - delay_thread(NULL), - msgr(r), - conn_id(r->dispatch_queue.get_id()), - recv_ofs(0), - recv_len(0), - sd(-1), port(0), - peer_type(-1), - pipe_lock("SimpleMessenger::Pipe::pipe_lock"), - state(st), - connection_state(NULL), - reader_running(false), reader_needs_join(false), - reader_dispatching(false), notify_on_dispatch_done(false), - writer_running(false), - in_q(&(r->dispatch_queue)), - send_keepalive(false), - send_keepalive_ack(false), - connect_seq(0), peer_global_seq(0), - out_seq(0), in_seq(0), in_seq_acked(0) { - ANNOTATE_BENIGN_RACE_SIZED(&sd, sizeof(sd), "Pipe socket"); - ANNOTATE_BENIGN_RACE_SIZED(&state, sizeof(state), "Pipe state"); - ANNOTATE_BENIGN_RACE_SIZED(&recv_len, sizeof(recv_len), "Pipe recv_len"); - ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs, sizeof(recv_ofs), "Pipe recv_ofs"); - if (con) { - connection_state = con; - connection_state->reset_pipe(this); - } else { - connection_state = new PipeConnection(msgr->cct, msgr); - connection_state->pipe = get(); - } - - if (randomize_out_seq()) { - lsubdout(msgr->cct,ms,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; - } - - - msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms - if (msgr->timeout == 0) - msgr->timeout = -1; - - recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size; - recv_buf = new char[recv_max_prefetch]; -} - -Pipe::~Pipe() -{ - assert(out_q.empty()); - assert(sent.empty()); - delete delay_thread; - delete[] recv_buf; -} - -void Pipe::handle_ack(uint64_t seq) -{ - lsubdout(msgr->cct, ms, 15) << "reader got ack seq " << seq << dendl; - // trim sent list - while (!sent.empty() && - sent.front()->get_seq() <= seq) { - Message *m = sent.front(); - sent.pop_front(); - lsubdout(msgr->cct, ms, 10) << "reader got ack seq " - << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl; - m->put(); - } -} - -void Pipe::start_reader() -{ - assert(pipe_lock.is_locked()); - assert(!reader_running); - if (reader_needs_join) { - reader_thread.join(); - reader_needs_join = false; - } - reader_running = true; - reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes); -} - -void Pipe::maybe_start_delay_thread() -{ - if (!delay_thread) { - auto pos = msgr->cct->_conf->get_val("ms_inject_delay_type").find(ceph_entity_type_name(connection_state->peer_type)); - if (pos != string::npos) { - lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl; - delay_thread = new DelayedDelivery(this); - delay_thread->create("ms_pipe_delay"); - } - } -} - -void Pipe::start_writer() -{ - assert(pipe_lock.is_locked()); - assert(!writer_running); - writer_running = true; - writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes); -} - -void Pipe::join_reader() -{ - if (!reader_running) - return; - cond.Signal(); - pipe_lock.Unlock(); - reader_thread.join(); - pipe_lock.Lock(); - reader_needs_join = false; -} - -void Pipe::DelayedDelivery::discard() -{ - lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::discard" << dendl; - Mutex::Locker l(delay_lock); - while (!delay_queue.empty()) { - Message *m = delay_queue.front().second; - pipe->in_q->dispatch_throttle_release(m->get_dispatch_throttle_size()); - m->put(); - delay_queue.pop_front(); - } -} - -void Pipe::DelayedDelivery::flush() -{ - lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::flush" << dendl; - Mutex::Locker l(delay_lock); - flush_count = delay_queue.size(); - delay_cond.Signal(); -} - -void *Pipe::DelayedDelivery::entry() -{ - Mutex::Locker locker(delay_lock); - lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry start" << dendl; - - while (!stop_delayed_delivery) { - if (delay_queue.empty()) { - lgeneric_subdout(pipe->msgr->cct, ms, 30) << *pipe << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl; - delay_cond.Wait(delay_lock); - continue; - } - utime_t release = delay_queue.front().first; - Message *m = delay_queue.front().second; - string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type; - if (!flush_count && - (release > ceph_clock_now() && - (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) { - lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl; - delay_cond.WaitUntil(delay_lock, release); - continue; - } - lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl; - delay_queue.pop_front(); - if (flush_count > 0) { - --flush_count; - active_flush = true; - } - if (pipe->in_q->can_fast_dispatch(m)) { - if (!stop_fast_dispatching_flag) { - delay_dispatching = true; - delay_lock.Unlock(); - pipe->in_q->fast_dispatch(m); - delay_lock.Lock(); - delay_dispatching = false; - if (stop_fast_dispatching_flag) { - // we need to let the stopping thread proceed - delay_cond.Signal(); - delay_lock.Unlock(); - delay_lock.Lock(); - } - } - } else { - pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); - } - active_flush = false; - } - lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry stop" << dendl; - return NULL; -} - -void Pipe::DelayedDelivery::stop_fast_dispatching() { - Mutex::Locker l(delay_lock); - stop_fast_dispatching_flag = true; - while (delay_dispatching) - delay_cond.Wait(delay_lock); -} - - -int Pipe::accept() -{ - ldout(msgr->cct,10) << "accept" << dendl; - assert(pipe_lock.is_locked()); - assert(state == STATE_ACCEPTING); - - pipe_lock.Unlock(); - - // vars - bufferlist addrs; - entity_addr_t socket_addr; - socklen_t len; - int r; - char banner[strlen(CEPH_BANNER)+1]; - bufferlist addrbl; - ceph_msg_connect connect; - ceph_msg_connect_reply reply; - Pipe *existing = 0; - bufferptr bp; - bufferlist authorizer, authorizer_reply; - bool authorizer_valid; - uint64_t feat_missing; - bool replaced = false; - // this variable denotes if the connection attempt from peer is a hard - // reset or not, it is true if there is an existing connection and the - // connection sequence from peer is equal to zero - bool is_reset_from_peer = false; - CryptoKey session_key; - int removed; // single-use down below - - // this should roughly mirror pseudocode at - // http://ceph.com/wiki/Messaging_protocol - int reply_tag = 0; - uint64_t existing_seq = -1; - - // used for reading in the remote acked seq on connect - uint64_t newly_acked_seq = 0; - - recv_reset(); - - set_socket_options(); - - // announce myself. - r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER)); - if (r < 0) { - ldout(msgr->cct,10) << "accept couldn't write banner" << dendl; - goto fail_unlocked; - } - - // and my addr - ::encode(msgr->my_inst.addr, addrs, 0); // legacy - - port = msgr->my_inst.addr.get_port(); - - // and peer's socket addr (they might not know their ip) - sockaddr_storage ss; - len = sizeof(ss); - r = ::getpeername(sd, (sockaddr*)&ss, &len); - if (r < 0) { - ldout(msgr->cct,0) << "accept failed to getpeername " << cpp_strerror(errno) << dendl; - goto fail_unlocked; - } - socket_addr.set_sockaddr((sockaddr*)&ss); - ::encode(socket_addr, addrs, 0); // legacy - - r = tcp_write(addrs.c_str(), addrs.length()); - if (r < 0) { - ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl; - goto fail_unlocked; - } - - ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl; - - // identify peer - if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) { - ldout(msgr->cct,10) << "accept couldn't read banner" << dendl; - goto fail_unlocked; - } - if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { - banner[strlen(CEPH_BANNER)] = 0; - ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl; - goto fail_unlocked; - } - { - bufferptr tp(sizeof(ceph_entity_addr)); - addrbl.push_back(std::move(tp)); - } - if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) { - ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl; - goto fail_unlocked; - } - { - bufferlist::iterator ti = addrbl.begin(); - ::decode(peer_addr, ti); - } - - ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl; - if (peer_addr.is_blank_ip()) { - // peer apparently doesn't know what ip they have; figure it out for them. - int port = peer_addr.get_port(); - peer_addr.u = socket_addr.u; - peer_addr.set_port(port); - ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr - << " (socket is " << socket_addr << ")" << dendl; - } - set_peer_addr(peer_addr); // so that connection_state gets set up - - while (1) { - if (tcp_read((char*)&connect, sizeof(connect)) < 0) { - ldout(msgr->cct,10) << "accept couldn't read connect" << dendl; - goto fail_unlocked; - } - - authorizer.clear(); - if (connect.authorizer_len) { - bp = buffer::create(connect.authorizer_len); - if (tcp_read(bp.c_str(), connect.authorizer_len) < 0) { - ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl; - goto fail_unlocked; - } - authorizer.push_back(std::move(bp)); - authorizer_reply.clear(); - } - - ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq - << " global_seq " << connect.global_seq - << dendl; - - msgr->lock.Lock(); // FIXME - pipe_lock.Lock(); - if (msgr->dispatch_queue.stop) - goto shutting_down; - if (state != STATE_ACCEPTING) { - goto shutting_down; - } - - // note peer's type, flags - set_peer_type(connect.host_type); - policy = msgr->get_policy(connect.host_type); - ldout(msgr->cct,10) << "accept of host_type " << connect.host_type - << ", policy.lossy=" << policy.lossy - << " policy.server=" << policy.server - << " policy.standby=" << policy.standby - << " policy.resetcheck=" << policy.resetcheck - << dendl; - - memset(&reply, 0, sizeof(reply)); - reply.protocol_version = msgr->get_proto_version(peer_type, false); - msgr->lock.Unlock(); - - // mismatch? - ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version - << ", their proto " << connect.protocol_version << dendl; - if (connect.protocol_version != reply.protocol_version) { - reply.tag = CEPH_MSGR_TAG_BADPROTOVER; - goto reply; - } - - // require signatures for cephx? - if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) { - if (peer_type == CEPH_ENTITY_TYPE_OSD || - peer_type == CEPH_ENTITY_TYPE_MDS) { - if (msgr->cct->_conf->cephx_require_signatures || - msgr->cct->_conf->cephx_cluster_require_signatures) { - ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl; - policy.features_required |= CEPH_FEATURE_MSG_AUTH; - } - } else { - if (msgr->cct->_conf->cephx_require_signatures || - msgr->cct->_conf->cephx_service_require_signatures) { - ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl; - policy.features_required |= CEPH_FEATURE_MSG_AUTH; - } - } - } - - feat_missing = policy.features_required & ~(uint64_t)connect.features; - if (feat_missing) { - ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl; - reply.tag = CEPH_MSGR_TAG_FEATURES; - goto reply; - } - - // Check the authorizer. If not good, bail out. - - pipe_lock.Unlock(); - - if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer, - authorizer_reply, authorizer_valid, session_key) || - !authorizer_valid) { - ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl; - pipe_lock.Lock(); - if (state != STATE_ACCEPTING) - goto shutting_down_msgr_unlocked; - reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER; - session_security.reset(); - goto reply; - } - - // We've verified the authorizer for this pipe, so set up the session security structure. PLR - - ldout(msgr->cct,10) << "accept: setting up session_security." << dendl; - - retry_existing_lookup: - msgr->lock.Lock(); - pipe_lock.Lock(); - if (msgr->dispatch_queue.stop) - goto shutting_down; - if (state != STATE_ACCEPTING) - goto shutting_down; - - // existing? - existing = msgr->_lookup_pipe(peer_addr); - if (existing) { - existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here) - if (existing->reader_dispatching) { - /** we need to wait, or we can deadlock if downstream - * fast_dispatchers are (naughtily!) waiting on resources - * held by somebody trying to make use of the SimpleMessenger lock. - * So drop locks, wait, and retry. It just looks like a slow network - * to everybody else. - * - * We take a ref to existing here since it might get reaped before we - * wake up (see bug #15870). We can be confident that it lived until - * locked it since we held the msgr lock from _lookup_pipe through to - * locking existing->lock and checking reader_dispatching. - */ - existing->get(); - pipe_lock.Unlock(); - msgr->lock.Unlock(); - existing->notify_on_dispatch_done = true; - while (existing->reader_dispatching) - existing->cond.Wait(existing->pipe_lock); - existing->pipe_lock.Unlock(); - existing->put(); - existing = nullptr; - goto retry_existing_lookup; - } - - if (connect.global_seq < existing->peer_global_seq) { - ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq - << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; - reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL; - reply.global_seq = existing->peer_global_seq; // so we can send it below.. - existing->pipe_lock.Unlock(); - msgr->lock.Unlock(); - goto reply; - } else { - ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq - << " <= " << connect.global_seq << ", looks ok" << dendl; - } - - if (existing->policy.lossy) { - ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy=" - << policy.lossy << ")" << dendl; - existing->was_session_reset(); - goto replace; - } - - ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq - << " vs existing " << existing->connect_seq - << " state " << existing->get_state_name() << dendl; - - if (connect.connect_seq == 0 && existing->connect_seq > 0) { - ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl; - // this is a hard reset from peer - is_reset_from_peer = true; - if (policy.resetcheck) - existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s - goto replace; - } - - if (connect.connect_seq < existing->connect_seq) { - // old attempt, or we sent READY but they didn't get it. - ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq - << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; - goto retry_session; - } - - if (connect.connect_seq == existing->connect_seq) { - // if the existing connection successfully opened, and/or - // subsequently went to standby, then the peer should bump - // their connect_seq and retry: this is not a connection race - // we need to resolve here. - if (existing->state == STATE_OPEN || - existing->state == STATE_STANDBY) { - ldout(msgr->cct,10) << "accept connection race, existing " << existing - << ".cseq " << existing->connect_seq - << " == " << connect.connect_seq - << ", OPEN|STANDBY, RETRY_SESSION" << dendl; - goto retry_session; - } - - // connection race? - if (peer_addr < msgr->my_inst.addr || - existing->policy.server) { - // incoming wins - ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq - << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl; - if (!(existing->state == STATE_CONNECTING || - existing->state == STATE_WAIT)) - lderr(msgr->cct) << "accept race bad state, would replace, existing=" - << existing->get_state_name() - << " " << existing << ".cseq=" << existing->connect_seq - << " == " << connect.connect_seq - << dendl; - assert(existing->state == STATE_CONNECTING || - existing->state == STATE_WAIT); - goto replace; - } else { - // our existing outgoing wins - ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq - << " == " << connect.connect_seq << ", sending WAIT" << dendl; - assert(peer_addr > msgr->my_inst.addr); - if (!(existing->state == STATE_CONNECTING)) - lderr(msgr->cct) << "accept race bad state, would send wait, existing=" - << existing->get_state_name() - << " " << existing << ".cseq=" << existing->connect_seq - << " == " << connect.connect_seq - << dendl; - assert(existing->state == STATE_CONNECTING); - // make sure our outgoing connection will follow through - existing->_send_keepalive(); - reply.tag = CEPH_MSGR_TAG_WAIT; - existing->pipe_lock.Unlock(); - msgr->lock.Unlock(); - goto reply; - } - } - - assert(connect.connect_seq > existing->connect_seq); - assert(connect.global_seq >= existing->peer_global_seq); - if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other - existing->connect_seq == 0) { - ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq - << ", " << existing << ".cseq = " << existing->connect_seq - << "), sending RESETSESSION" << dendl; - reply.tag = CEPH_MSGR_TAG_RESETSESSION; - msgr->lock.Unlock(); - existing->pipe_lock.Unlock(); - goto reply; - } - - // reconnect - ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq - << " > " << existing->connect_seq << dendl; - goto replace; - } // existing - else if (connect.connect_seq > 0) { - // we reset, and they are opening a new session - ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; - msgr->lock.Unlock(); - reply.tag = CEPH_MSGR_TAG_RESETSESSION; - goto reply; - } else { - // new session - ldout(msgr->cct,10) << "accept new session" << dendl; - existing = NULL; - goto open; - } - ceph_abort(); - - retry_session: - assert(existing->pipe_lock.is_locked()); - assert(pipe_lock.is_locked()); - reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; - reply.connect_seq = existing->connect_seq + 1; - existing->pipe_lock.Unlock(); - msgr->lock.Unlock(); - goto reply; - - reply: - assert(pipe_lock.is_locked()); - reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; - reply.authorizer_len = authorizer_reply.length(); - pipe_lock.Unlock(); - r = tcp_write((char*)&reply, sizeof(reply)); - if (r < 0) - goto fail_unlocked; - if (reply.authorizer_len) { - r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); - if (r < 0) - goto fail_unlocked; - } - } - - replace: - assert(existing->pipe_lock.is_locked()); - assert(pipe_lock.is_locked()); - // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence - if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) { - reply_tag = CEPH_MSGR_TAG_SEQ; - existing_seq = existing->in_seq; - } - ldout(msgr->cct,10) << "accept replacing " << existing << dendl; - existing->stop(); - existing->unregister_pipe(); - replaced = true; - - if (existing->policy.lossy) { - // disconnect from the Connection - assert(existing->connection_state); - if (existing->connection_state->clear_pipe(existing)) - msgr->dispatch_queue.queue_reset(existing->connection_state.get()); - } else { - // queue a reset on the new connection, which we're dumping for the old - msgr->dispatch_queue.queue_reset(connection_state.get()); - - // drop my Connection, and take a ref to the existing one. do not - // clear existing->connection_state, since read_message and - // write_message both dereference it without pipe_lock. - connection_state = existing->connection_state; - - // make existing Connection reference us - connection_state->reset_pipe(this); - - if (existing->delay_thread) { - existing->delay_thread->steal_for_pipe(this); - delay_thread = existing->delay_thread; - existing->delay_thread = NULL; - delay_thread->flush(); - } - - // steal incoming queue - uint64_t replaced_conn_id = conn_id; - conn_id = existing->conn_id; - existing->conn_id = replaced_conn_id; - - // reset the in_seq if this is a hard reset from peer, - // otherwise we respect our original connection's value - in_seq = is_reset_from_peer ? 0 : existing->in_seq; - in_seq_acked = in_seq; - - // steal outgoing queue and out_seq - existing->requeue_sent(); - out_seq = existing->out_seq; - ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl; - for (map >::iterator p = existing->out_q.begin(); - p != existing->out_q.end(); - ++p) - out_q[p->first].splice(out_q[p->first].begin(), p->second); - } - existing->stop_and_wait(); - existing->pipe_lock.Unlock(); - - open: - // open - assert(pipe_lock.is_locked()); - connect_seq = connect.connect_seq + 1; - peer_global_seq = connect.global_seq; - assert(state == STATE_ACCEPTING); - state = STATE_OPEN; - ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; - - // send READY reply - reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY); - reply.features = policy.features_supported; - reply.global_seq = msgr->get_global_seq(); - reply.connect_seq = connect_seq; - reply.flags = 0; - reply.authorizer_len = authorizer_reply.length(); - if (policy.lossy) - reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY; - - connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features); - ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl; - - session_security.reset( - get_auth_session_handler(msgr->cct, - connect.authorizer_protocol, - session_key, - connection_state->get_features())); - - // notify - msgr->dispatch_queue.queue_accept(connection_state.get()); - msgr->ms_deliver_handle_fast_accept(connection_state.get()); - - // ok! - if (msgr->dispatch_queue.stop) - goto shutting_down; - removed = msgr->accepting_pipes.erase(this); - assert(removed == 1); - register_pipe(); - msgr->lock.Unlock(); - pipe_lock.Unlock(); - - r = tcp_write((char*)&reply, sizeof(reply)); - if (r < 0) { - goto fail_registered; - } - - if (reply.authorizer_len) { - r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); - if (r < 0) { - goto fail_registered; - } - } - - if (reply_tag == CEPH_MSGR_TAG_SEQ) { - if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) { - ldout(msgr->cct,2) << "accept write error on in_seq" << dendl; - goto fail_registered; - } - if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) { - ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl; - goto fail_registered; - } - } - - pipe_lock.Lock(); - discard_requeued_up_to(newly_acked_seq); - if (state != STATE_CLOSED) { - ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl; - start_writer(); - } - ldout(msgr->cct,20) << "accept done" << dendl; - - maybe_start_delay_thread(); - - return 0; // success. - - fail_registered: - ldout(msgr->cct, 10) << "accept fault after register" << dendl; - - if (msgr->cct->_conf->ms_inject_internal_delays) { - ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); - } - - fail_unlocked: - pipe_lock.Lock(); - if (state != STATE_CLOSED) { - bool queued = is_queued(); - ldout(msgr->cct, 10) << " queued = " << (int)queued << dendl; - if (queued) { - state = policy.server ? STATE_STANDBY : STATE_CONNECTING; - } else if (replaced) { - state = STATE_STANDBY; - } else { - state = STATE_CLOSED; - state_closed = true; - } - fault(); - if (queued || replaced) - start_writer(); - } - return -1; - - shutting_down: - msgr->lock.Unlock(); - shutting_down_msgr_unlocked: - assert(pipe_lock.is_locked()); - - if (msgr->cct->_conf->ms_inject_internal_delays) { - ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); - } - - state = STATE_CLOSED; - state_closed = true; - fault(); - return -1; -} - -void Pipe::set_socket_options() -{ - // disable Nagle algorithm? - if (msgr->cct->_conf->ms_tcp_nodelay) { - int flag = 1; - int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); - if (r < 0) { - r = -errno; - ldout(msgr->cct,0) << "couldn't set TCP_NODELAY: " - << cpp_strerror(r) << dendl; - } - } - if (msgr->cct->_conf->ms_tcp_rcvbuf) { - int size = msgr->cct->_conf->ms_tcp_rcvbuf; - int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size)); - if (r < 0) { - r = -errno; - ldout(msgr->cct,0) << "couldn't set SO_RCVBUF to " << size - << ": " << cpp_strerror(r) << dendl; - } - } - - // block ESIGPIPE -#ifdef CEPH_USE_SO_NOSIGPIPE - int val = 1; - int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); - if (r) { - r = -errno; - ldout(msgr->cct,0) << "couldn't set SO_NOSIGPIPE: " - << cpp_strerror(r) << dendl; - } -#endif - -#ifdef SO_PRIORITY - int prio = msgr->get_socket_priority(); - if (prio >= 0) { - int r = -1; -#ifdef IPTOS_CLASS_CS6 - int iptos = IPTOS_CLASS_CS6; - int addr_family = 0; - if (!peer_addr.is_blank_ip()) { - addr_family = peer_addr.get_family(); - } else { - addr_family = msgr->get_myaddr().get_family(); - } - switch (addr_family) { - case AF_INET: - r = ::setsockopt(sd, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos)); - break; - case AF_INET6: - r = ::setsockopt(sd, IPPROTO_IPV6, IPV6_TCLASS, &iptos, sizeof(iptos)); - break; - default: - lderr(msgr->cct) << "couldn't set ToS of unknown family (" - << addr_family << ")" - << " to " << iptos << dendl; - return; - } - if (r < 0) { - r = -errno; - ldout(msgr->cct,0) << "couldn't set TOS to " << iptos - << ": " << cpp_strerror(r) << dendl; - } -#endif - // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0. - // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT - // We need to call setsockopt(SO_PRIORITY) after it. - r = ::setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio)); - if (r < 0) { - r = -errno; - ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio - << ": " << cpp_strerror(r) << dendl; - } - } -#endif -} - -int Pipe::connect() -{ - bool got_bad_auth = false; - - ldout(msgr->cct,10) << "connect " << connect_seq << dendl; - assert(pipe_lock.is_locked()); - - __u32 cseq = connect_seq; - __u32 gseq = msgr->get_global_seq(); - - // stop reader thread - join_reader(); - - pipe_lock.Unlock(); - - char tag = -1; - int rc = -1; - struct msghdr msg; - struct iovec msgvec[2]; - int msglen; - char banner[strlen(CEPH_BANNER) + 1]; // extra byte makes coverity happy - entity_addr_t paddr; - entity_addr_t peer_addr_for_me, socket_addr; - AuthAuthorizer *authorizer = NULL; - bufferlist addrbl, myaddrbl; - const md_config_t *conf = msgr->cct->_conf; - - // close old socket. this is safe because we stopped the reader thread above. - if (sd >= 0) - ::close(sd); - - // create socket? - sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0); - if (sd < 0) { - rc = -errno; - lderr(msgr->cct) << "connect couldn't create socket " << cpp_strerror(rc) << dendl; - goto fail; - } - - recv_reset(); - - set_socket_options(); - - { - entity_addr_t addr2bind = msgr->get_myaddr(); - if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) { - addr2bind.set_port(0); - int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len()); - if (r < 0) { - ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl; - goto fail; - } - } - } - - // connect! - ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl; - rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len()); - if (rc < 0) { - int stored_errno = errno; - ldout(msgr->cct,2) << "connect error " << peer_addr - << ", " << cpp_strerror(stored_errno) << dendl; - if (stored_errno == ECONNREFUSED) { - ldout(msgr->cct, 2) << "connection refused!" << dendl; - msgr->dispatch_queue.queue_refused(connection_state.get()); - } - goto fail; - } - - // verify banner - // FIXME: this should be non-blocking, or in some other way verify the banner as we get it. - rc = tcp_read((char*)&banner, strlen(CEPH_BANNER)); - if (rc < 0) { - ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl; - goto fail; - } - if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { - ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl; - goto fail; - } - - memset(&msg, 0, sizeof(msg)); - msgvec[0].iov_base = banner; - msgvec[0].iov_len = strlen(CEPH_BANNER); - msg.msg_iov = msgvec; - msg.msg_iovlen = 1; - msglen = msgvec[0].iov_len; - rc = do_sendmsg(&msg, msglen); - if (rc < 0) { - ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl; - goto fail; - } - - // identify peer - { -#if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__) - bufferptr p(sizeof(ceph_entity_addr) * 2); -#else - int wirelen = sizeof(__u32) * 2 + sizeof(ceph_sockaddr_storage); - bufferptr p(wirelen * 2); -#endif - addrbl.push_back(std::move(p)); - } - rc = tcp_read(addrbl.c_str(), addrbl.length()); - if (rc < 0) { - ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc) << dendl; - goto fail; - } - try { - bufferlist::iterator p = addrbl.begin(); - ::decode(paddr, p); - ::decode(peer_addr_for_me, p); - } - catch (buffer::error& e) { - ldout(msgr->cct,2) << "connect couldn't decode peer addrs: " << e.what() - << dendl; - goto fail; - } - port = peer_addr_for_me.get_port(); - - ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl; - if (peer_addr != paddr) { - if (paddr.is_blank_ip() && - peer_addr.get_port() == paddr.get_port() && - peer_addr.get_nonce() == paddr.get_nonce()) { - ldout(msgr->cct,0) << "connect claims to be " - << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl; - } else { - ldout(msgr->cct,10) << "connect claims to be " - << paddr << " not " << peer_addr << dendl; - goto fail; - } - } - - ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl; - - msgr->learned_addr(peer_addr_for_me); - - ::encode(msgr->my_inst.addr, myaddrbl, 0); // legacy - - memset(&msg, 0, sizeof(msg)); - msgvec[0].iov_base = myaddrbl.c_str(); - msgvec[0].iov_len = myaddrbl.length(); - msg.msg_iov = msgvec; - msg.msg_iovlen = 1; - msglen = msgvec[0].iov_len; - rc = do_sendmsg(&msg, msglen); - if (rc < 0) { - ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl; - goto fail; - } - ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl; - - - while (1) { - delete authorizer; - authorizer = msgr->get_authorizer(peer_type, false); - bufferlist authorizer_reply; - - ceph_msg_connect connect; - connect.features = policy.features_supported; - connect.host_type = msgr->get_myinst().name.type(); - connect.global_seq = gseq; - connect.connect_seq = cseq; - connect.protocol_version = msgr->get_proto_version(peer_type, true); - connect.authorizer_protocol = authorizer ? authorizer->protocol : 0; - connect.authorizer_len = authorizer ? authorizer->bl.length() : 0; - if (authorizer) - ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len - << " protocol=" << connect.authorizer_protocol << dendl; - connect.flags = 0; - if (policy.lossy) - connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides! - memset(&msg, 0, sizeof(msg)); - msgvec[0].iov_base = (char*)&connect; - msgvec[0].iov_len = sizeof(connect); - msg.msg_iov = msgvec; - msg.msg_iovlen = 1; - msglen = msgvec[0].iov_len; - if (authorizer) { - msgvec[1].iov_base = authorizer->bl.c_str(); - msgvec[1].iov_len = authorizer->bl.length(); - msg.msg_iovlen++; - msglen += msgvec[1].iov_len; - } - - ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq - << " proto=" << connect.protocol_version << dendl; - rc = do_sendmsg(&msg, msglen); - if (rc < 0) { - ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl; - goto fail; - } - - ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl; - ceph_msg_connect_reply reply; - rc = tcp_read((char*)&reply, sizeof(reply)); - if (rc < 0) { - ldout(msgr->cct,2) << "connect read reply " << cpp_strerror(rc) << dendl; - goto fail; - } - - ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag - << " connect_seq " << reply.connect_seq - << " global_seq " << reply.global_seq - << " proto " << reply.protocol_version - << " flags " << (int)reply.flags - << " features " << reply.features - << dendl; - - authorizer_reply.clear(); - - if (reply.authorizer_len) { - ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl; - bufferptr bp = buffer::create(reply.authorizer_len); - rc = tcp_read(bp.c_str(), reply.authorizer_len); - if (rc < 0) { - ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc) << dendl; - goto fail; - } - authorizer_reply.push_back(bp); - } - - if (authorizer) { - bufferlist::iterator iter = authorizer_reply.begin(); - if (!authorizer->verify_reply(iter)) { - ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl; - goto fail; - } - } - - if (conf->ms_inject_internal_delays) { - ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); - } - - pipe_lock.Lock(); - if (state != STATE_CONNECTING) { - ldout(msgr->cct,0) << "connect got RESETSESSION but no longer connecting" << dendl; - goto stop_locked; - } - - if (reply.tag == CEPH_MSGR_TAG_FEATURES) { - ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex - << connect.features << " < peer " << reply.features - << " missing " << (reply.features & ~policy.features_supported) - << std::dec << dendl; - goto fail_locked; - } - - if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) { - ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version - << " != " << reply.protocol_version << dendl; - goto fail_locked; - } - - if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) { - ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl; - if (got_bad_auth) - goto stop_locked; - got_bad_auth = true; - pipe_lock.Unlock(); - delete authorizer; - authorizer = msgr->get_authorizer(peer_type, true); // try harder - continue; - } - if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) { - ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl; - was_session_reset(); - cseq = 0; - pipe_lock.Unlock(); - continue; - } - if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { - gseq = msgr->get_global_seq(reply.global_seq); - ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq - << " chose new " << gseq << dendl; - pipe_lock.Unlock(); - continue; - } - if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { - assert(reply.connect_seq > connect_seq); - ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq - << " -> " << reply.connect_seq << dendl; - cseq = connect_seq = reply.connect_seq; - pipe_lock.Unlock(); - continue; - } - - if (reply.tag == CEPH_MSGR_TAG_WAIT) { - ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl; - state = STATE_WAIT; - goto stop_locked; - } - - if (reply.tag == CEPH_MSGR_TAG_READY || - reply.tag == CEPH_MSGR_TAG_SEQ) { - uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features; - if (feat_missing) { - ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl; - goto fail_locked; - } - - if (reply.tag == CEPH_MSGR_TAG_SEQ) { - ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; - uint64_t newly_acked_seq = 0; - rc = tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)); - if (rc < 0) { - ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc) << dendl; - goto fail_locked; - } - ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq - << " vs out_seq " << out_seq << dendl; - while (newly_acked_seq > out_seq) { - Message *m = _get_next_outgoing(); - assert(m); - ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq() - << " " << *m << dendl; - assert(m->get_seq() <= newly_acked_seq); - m->put(); - ++out_seq; - } - if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) { - ldout(msgr->cct,2) << "connect write error on in_seq" << dendl; - goto fail_locked; - } - } - - // hooray! - peer_global_seq = reply.global_seq; - policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY; - state = STATE_OPEN; - connect_seq = cseq + 1; - assert(connect_seq == reply.connect_seq); - backoff = utime_t(); - connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features); - ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy - << ", features " << connection_state->get_features() << dendl; - - - // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the - // connection. PLR - - if (authorizer != NULL) { - session_security.reset( - get_auth_session_handler(msgr->cct, - authorizer->protocol, - authorizer->session_key, - connection_state->get_features())); - } else { - // We have no authorizer, so we shouldn't be applying security to messages in this pipe. PLR - session_security.reset(); - } - - msgr->dispatch_queue.queue_connect(connection_state.get()); - msgr->ms_deliver_handle_fast_connect(connection_state.get()); - - if (!reader_running) { - ldout(msgr->cct,20) << "connect starting reader" << dendl; - start_reader(); - } - maybe_start_delay_thread(); - delete authorizer; - return 0; - } - - // protocol error - ldout(msgr->cct,0) << "connect got bad tag " << (int)tag << dendl; - goto fail_locked; - } - - fail: - if (conf->ms_inject_internal_delays) { - ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); - } - - pipe_lock.Lock(); - fail_locked: - if (state == STATE_CONNECTING) - fault(); - else - ldout(msgr->cct,3) << "connect fault, but state = " << get_state_name() - << " != connecting, stopping" << dendl; - - stop_locked: - delete authorizer; - return rc; -} - -void Pipe::register_pipe() -{ - ldout(msgr->cct,10) << "register_pipe" << dendl; - assert(msgr->lock.is_locked()); - Pipe *existing = msgr->_lookup_pipe(peer_addr); - assert(existing == NULL); - msgr->rank_pipe[peer_addr] = this; -} - -void Pipe::unregister_pipe() -{ - assert(msgr->lock.is_locked()); - ceph::unordered_map::iterator p = msgr->rank_pipe.find(peer_addr); - if (p != msgr->rank_pipe.end() && p->second == this) { - ldout(msgr->cct,10) << "unregister_pipe" << dendl; - msgr->rank_pipe.erase(p); - } else { - ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl; - msgr->accepting_pipes.erase(this); // somewhat overkill, but safe. - } -} - -void Pipe::join() -{ - ldout(msgr->cct, 20) << "join" << dendl; - if (writer_thread.is_started()) - writer_thread.join(); - if (reader_thread.is_started()) - reader_thread.join(); - if (delay_thread) { - ldout(msgr->cct, 20) << "joining delay_thread" << dendl; - delay_thread->stop(); - delay_thread->join(); - } -} - -void Pipe::requeue_sent() -{ - if (sent.empty()) - return; - - list& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; - while (!sent.empty()) { - Message *m = sent.back(); - sent.pop_back(); - ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq - << " (" << m->get_seq() << ")" << dendl; - rq.push_front(m); - out_seq--; - } -} - -void Pipe::discard_requeued_up_to(uint64_t seq) -{ - ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl; - if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) - return; - list& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; - while (!rq.empty()) { - Message *m = rq.front(); - if (m->get_seq() == 0 || m->get_seq() > seq) - break; - ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq - << " <= " << seq << ", discarding" << dendl; - m->put(); - rq.pop_front(); - out_seq++; - } - if (rq.empty()) - out_q.erase(CEPH_MSG_PRIO_HIGHEST); -} - -/* - * Tears down the Pipe's message queues, and removes them from the DispatchQueue - * Must hold pipe_lock prior to calling. - */ -void Pipe::discard_out_queue() -{ - ldout(msgr->cct,10) << "discard_queue" << dendl; - - for (list::iterator p = sent.begin(); p != sent.end(); ++p) { - ldout(msgr->cct,20) << " discard " << *p << dendl; - (*p)->put(); - } - sent.clear(); - for (map >::iterator p = out_q.begin(); p != out_q.end(); ++p) - for (list::iterator r = p->second.begin(); r != p->second.end(); ++r) { - ldout(msgr->cct,20) << " discard " << *r << dendl; - (*r)->put(); - } - out_q.clear(); -} - -void Pipe::fault(bool onread) -{ - const md_config_t *conf = msgr->cct->_conf; - assert(pipe_lock.is_locked()); - cond.Signal(); - - if (onread && state == STATE_CONNECTING) { - ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl; - return; - } - - ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl; - - if (state == STATE_CLOSED || - state == STATE_CLOSING) { - ldout(msgr->cct,10) << "fault already closed|closing" << dendl; - if (connection_state->clear_pipe(this)) - msgr->dispatch_queue.queue_reset(connection_state.get()); - return; - } - - shutdown_socket(); - - // lossy channel? - if (policy.lossy && state != STATE_CONNECTING) { - ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl; - - // disconnect from Connection, and mark it failed. future messages - // will be dropped. - assert(connection_state); - stop(); - bool cleared = connection_state->clear_pipe(this); - - // crib locks, blech. note that Pipe is now STATE_CLOSED and the - // rank_pipe entry is ignored by others. - pipe_lock.Unlock(); - - if (conf->ms_inject_internal_delays) { - ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); - } - - msgr->lock.Lock(); - pipe_lock.Lock(); - unregister_pipe(); - msgr->lock.Unlock(); - - if (delay_thread) - delay_thread->discard(); - in_q->discard_queue(conn_id); - discard_out_queue(); - if (cleared) - msgr->dispatch_queue.queue_reset(connection_state.get()); - return; - } - - // queue delayed items immediately - if (delay_thread) - delay_thread->flush(); - - // requeue sent items - requeue_sent(); - - if (policy.standby && !is_queued()) { - ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl; - state = STATE_STANDBY; - return; - } - - if (state != STATE_CONNECTING) { - if (policy.server) { - ldout(msgr->cct,0) << "fault, server, going to standby" << dendl; - state = STATE_STANDBY; - } else { - ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl; - connect_seq++; - state = STATE_CONNECTING; - } - backoff = utime_t(); - } else if (backoff == utime_t()) { - ldout(msgr->cct,0) << "fault" << dendl; - backoff.set_from_double(conf->ms_initial_backoff); - } else { - ldout(msgr->cct,10) << "fault waiting " << backoff << dendl; - cond.WaitInterval(pipe_lock, backoff); - backoff += backoff; - if (backoff > conf->ms_max_backoff) - backoff.set_from_double(conf->ms_max_backoff); - ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl; - } -} - -int Pipe::randomize_out_seq() -{ - if (connection_state->get_features() & CEPH_FEATURE_MSG_AUTH) { - // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error - // here. We'll check it on the call. PLR - int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq)); - out_seq &= SEQ_MASK; - lsubdout(msgr->cct, ms, 10) << "randomize_out_seq " << out_seq << dendl; - return seq_error; - } else { - // previously, seq #'s always started at 0. - out_seq = 0; - return 0; - } -} - -void Pipe::was_session_reset() -{ - assert(pipe_lock.is_locked()); - - ldout(msgr->cct,10) << "was_session_reset" << dendl; - in_q->discard_queue(conn_id); - if (delay_thread) - delay_thread->discard(); - discard_out_queue(); - - msgr->dispatch_queue.queue_remote_reset(connection_state.get()); - - if (randomize_out_seq()) { - lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; - } - - in_seq = 0; - connect_seq = 0; -} - -void Pipe::stop() -{ - ldout(msgr->cct,10) << "stop" << dendl; - assert(pipe_lock.is_locked()); - state = STATE_CLOSED; - state_closed = true; - cond.Signal(); - shutdown_socket(); -} - -void Pipe::stop_and_wait() -{ - assert(pipe_lock.is_locked_by_me()); - if (state != STATE_CLOSED) - stop(); - - if (msgr->cct->_conf->ms_inject_internal_delays) { - ldout(msgr->cct, 10) << __func__ << " sleep for " - << msgr->cct->_conf->ms_inject_internal_delays - << dendl; - utime_t t; - t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); - } - - if (delay_thread) { - pipe_lock.Unlock(); - delay_thread->stop_fast_dispatching(); - pipe_lock.Lock(); - } - while (reader_running && - reader_dispatching) - cond.Wait(pipe_lock); -} - -/* read msgs from socket. - * also, server. - */ -void Pipe::reader() -{ - pipe_lock.Lock(); - - if (state == STATE_ACCEPTING) { - accept(); - assert(pipe_lock.is_locked()); - } - - // loop. - while (state != STATE_CLOSED && - state != STATE_CONNECTING) { - assert(pipe_lock.is_locked()); - - // sleep if (re)connecting - if (state == STATE_STANDBY) { - ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl; - cond.Wait(pipe_lock); - continue; - } - - // get a reference to the AuthSessionHandler while we have the pipe_lock - ceph::shared_ptr auth_handler = session_security; - - pipe_lock.Unlock(); - - char tag = -1; - ldout(msgr->cct,20) << "reader reading tag..." << dendl; - if (tcp_read((char*)&tag, 1) < 0) { - pipe_lock.Lock(); - ldout(msgr->cct,2) << "reader couldn't read tag, " << cpp_strerror(errno) << dendl; - fault(true); - continue; - } - - if (tag == CEPH_MSGR_TAG_KEEPALIVE) { - ldout(msgr->cct,2) << "reader got KEEPALIVE" << dendl; - pipe_lock.Lock(); - connection_state->set_last_keepalive(ceph_clock_now()); - continue; - } - if (tag == CEPH_MSGR_TAG_KEEPALIVE2) { - ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl; - ceph_timespec t; - int rc = tcp_read((char*)&t, sizeof(t)); - pipe_lock.Lock(); - if (rc < 0) { - ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " - << cpp_strerror(errno) << dendl; - fault(true); - } else { - send_keepalive_ack = true; - keepalive_ack_stamp = utime_t(t); - ldout(msgr->cct,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp - << dendl; - connection_state->set_last_keepalive(ceph_clock_now()); - cond.Signal(); - } - continue; - } - if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { - ldout(msgr->cct,2) << "reader got KEEPALIVE_ACK" << dendl; - struct ceph_timespec t; - int rc = tcp_read((char*)&t, sizeof(t)); - pipe_lock.Lock(); - if (rc < 0) { - ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl; - fault(true); - } else { - connection_state->set_last_keepalive_ack(utime_t(t)); - } - continue; - } - - // open ... - if (tag == CEPH_MSGR_TAG_ACK) { - ldout(msgr->cct,20) << "reader got ACK" << dendl; - ceph_le64 seq; - int rc = tcp_read((char*)&seq, sizeof(seq)); - pipe_lock.Lock(); - if (rc < 0) { - ldout(msgr->cct,2) << "reader couldn't read ack seq, " << cpp_strerror(errno) << dendl; - fault(true); - } else if (state != STATE_CLOSED) { - handle_ack(seq); - } - continue; - } - - else if (tag == CEPH_MSGR_TAG_MSG) { - ldout(msgr->cct,20) << "reader got MSG" << dendl; - Message *m = 0; - int r = read_message(&m, auth_handler.get()); - - pipe_lock.Lock(); - - if (!m) { - if (r < 0) - fault(true); - continue; - } - - m->trace.event("pipe read message"); - - if (state == STATE_CLOSED || - state == STATE_CONNECTING) { - in_q->dispatch_throttle_release(m->get_dispatch_throttle_size()); - m->put(); - continue; - } - - // check received seq#. if it is old, drop the message. - // note that incoming messages may skip ahead. this is convenient for the client - // side queueing because messages can't be renumbered, but the (kernel) client will - // occasionally pull a message out of the sent queue to send elsewhere. in that case - // it doesn't matter if we "got" it or not. - if (m->get_seq() <= in_seq) { - ldout(msgr->cct,0) << "reader got old message " - << m->get_seq() << " <= " << in_seq << " " << m << " " << *m - << ", discarding" << dendl; - in_q->dispatch_throttle_release(m->get_dispatch_throttle_size()); - m->put(); - if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) && - msgr->cct->_conf->ms_die_on_old_message) - assert(0 == "old msgs despite reconnect_seq feature"); - continue; - } - if (m->get_seq() > in_seq + 1) { - ldout(msgr->cct,0) << "reader missed message? skipped from seq " - << in_seq << " to " << m->get_seq() << dendl; - if (msgr->cct->_conf->ms_die_on_skipped_message) - assert(0 == "skipped incoming seq"); - } - - m->set_connection(connection_state.get()); - - // note last received message. - in_seq = m->get_seq(); - - cond.Signal(); // wake up writer, to ack this - - ldout(msgr->cct,10) << "reader got message " - << m->get_seq() << " " << m << " " << *m - << dendl; - in_q->fast_preprocess(m); - - if (delay_thread) { - utime_t release; - if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { - release = m->get_recv_stamp(); - release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; - lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; - } - delay_thread->queue(release, m); - } else { - if (in_q->can_fast_dispatch(m)) { - reader_dispatching = true; - pipe_lock.Unlock(); - in_q->fast_dispatch(m); - pipe_lock.Lock(); - reader_dispatching = false; - if (state == STATE_CLOSED || - notify_on_dispatch_done) { // there might be somebody waiting - notify_on_dispatch_done = false; - cond.Signal(); - } - } else { - in_q->enqueue(m, m->get_priority(), conn_id); - } - } - } - - else if (tag == CEPH_MSGR_TAG_CLOSE) { - ldout(msgr->cct,20) << "reader got CLOSE" << dendl; - pipe_lock.Lock(); - if (state == STATE_CLOSING) { - state = STATE_CLOSED; - state_closed = true; - } else { - state = STATE_CLOSING; - } - cond.Signal(); - break; - } - else { - ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl; - pipe_lock.Lock(); - fault(true); - } - } - - - // reap? - reader_running = false; - reader_needs_join = true; - unlock_maybe_reap(); - ldout(msgr->cct,10) << "reader done" << dendl; -} - -/* write msgs to socket. - * also, client. - */ -void Pipe::writer() -{ - pipe_lock.Lock(); - while (state != STATE_CLOSED) {// && state != STATE_WAIT) { - ldout(msgr->cct,10) << "writer: state = " << get_state_name() - << " policy.server=" << policy.server << dendl; - - // standby? - if (is_queued() && state == STATE_STANDBY && !policy.server) - state = STATE_CONNECTING; - - // connect? - if (state == STATE_CONNECTING) { - assert(!policy.server); - connect(); - continue; - } - - if (state == STATE_CLOSING) { - // write close tag - ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl; - char tag = CEPH_MSGR_TAG_CLOSE; - state = STATE_CLOSED; - state_closed = true; - pipe_lock.Unlock(); - if (sd >= 0) { - // we can ignore return value, actually; we don't care if this succeeds. - int r = ::write(sd, &tag, 1); - (void)r; - } - pipe_lock.Lock(); - continue; - } - - if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY && - (is_queued() || in_seq > in_seq_acked)) { - - // keepalive? - if (send_keepalive) { - int rc; - if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { - pipe_lock.Unlock(); - rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2, - ceph_clock_now()); - } else { - pipe_lock.Unlock(); - rc = write_keepalive(); - } - pipe_lock.Lock(); - if (rc < 0) { - ldout(msgr->cct,2) << "writer couldn't write keepalive[2], " - << cpp_strerror(errno) << dendl; - fault(); - continue; - } - send_keepalive = false; - } - if (send_keepalive_ack) { - utime_t t = keepalive_ack_stamp; - pipe_lock.Unlock(); - int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t); - pipe_lock.Lock(); - if (rc < 0) { - ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno) << dendl; - fault(); - continue; - } - send_keepalive_ack = false; - } - - // send ack? - if (in_seq > in_seq_acked) { - uint64_t send_seq = in_seq; - pipe_lock.Unlock(); - int rc = write_ack(send_seq); - pipe_lock.Lock(); - if (rc < 0) { - ldout(msgr->cct,2) << "writer couldn't write ack, " << cpp_strerror(errno) << dendl; - fault(); - continue; - } - in_seq_acked = send_seq; - } - - // grab outgoing message - Message *m = _get_next_outgoing(); - if (m) { - m->set_seq(++out_seq); - if (!policy.lossy) { - // put on sent list - sent.push_back(m); - m->get(); - } - - // associate message with Connection (for benefit of encode_payload) - m->set_connection(connection_state.get()); - - uint64_t features = connection_state->get_features(); - - if (m->empty_payload()) - ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features - << " " << m << " " << *m << dendl; - else - ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features - << " " << m << " " << *m << dendl; - - // encode and copy out of *m - m->encode(features, msgr->crcflags); - - // prepare everything - const ceph_msg_header& header = m->get_header(); - const ceph_msg_footer& footer = m->get_footer(); - - // Now that we have all the crcs calculated, handle the - // digital signature for the message, if the pipe has session - // security set up. Some session security options do not - // actually calculate and check the signature, but they should - // handle the calls to sign_message and check_signature. PLR - if (session_security.get() == NULL) { - ldout(msgr->cct, 20) << "writer no session security" << dendl; - } else { - if (session_security->sign_message(m)) { - ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq - << "): sig = " << footer.sig << dendl; - } else { - ldout(msgr->cct, 20) << "writer signed seq # " << header.seq - << "): sig = " << footer.sig << dendl; - } - } - - bufferlist blist = m->get_payload(); - blist.append(m->get_middle()); - blist.append(m->get_data()); - - pipe_lock.Unlock(); - - m->trace.event("pipe writing message"); - - ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl; - int rc = write_message(header, footer, blist); - - pipe_lock.Lock(); - if (rc < 0) { - ldout(msgr->cct,1) << "writer error sending " << m << ", " - << cpp_strerror(errno) << dendl; - fault(); - } - m->put(); - } - continue; - } - - // wait - ldout(msgr->cct,20) << "writer sleeping" << dendl; - cond.Wait(pipe_lock); - } - - ldout(msgr->cct,20) << "writer finishing" << dendl; - - // reap? - writer_running = false; - unlock_maybe_reap(); - ldout(msgr->cct,10) << "writer done" << dendl; -} - -void Pipe::unlock_maybe_reap() -{ - if (!reader_running && !writer_running) { - shutdown_socket(); - pipe_lock.Unlock(); - if (delay_thread && delay_thread->is_flushing()) { - delay_thread->wait_for_flush(); - } - msgr->queue_reap(this); - } else { - pipe_lock.Unlock(); - } -} - -static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) -{ - // create a buffer to read into that matches the data alignment - unsigned left = len; - if (off & ~CEPH_PAGE_MASK) { - // head - unsigned head = 0; - head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left); - data.push_back(buffer::create(head)); - left -= head; - } - unsigned middle = left & CEPH_PAGE_MASK; - if (middle > 0) { - data.push_back(buffer::create_page_aligned(middle)); - left -= middle; - } - if (left) { - data.push_back(buffer::create(left)); - } -} - -int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler) -{ - int ret = -1; - // envelope - //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl; - - ceph_msg_header header; - ceph_msg_footer footer; - __u32 header_crc = 0; - - if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { - if (tcp_read((char*)&header, sizeof(header)) < 0) - return -1; - if (msgr->crcflags & MSG_CRC_HEADER) { - header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); - } - } else { - ceph_msg_header_old oldheader; - if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0) - return -1; - // this is fugly - memcpy(&header, &oldheader, sizeof(header)); - header.src = oldheader.src.name; - header.reserved = oldheader.reserved; - if (msgr->crcflags & MSG_CRC_HEADER) { - header.crc = oldheader.crc; - header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); - } - } - - ldout(msgr->cct,20) << "reader got envelope type=" << header.type - << " src " << entity_name_t(header.src) - << " front=" << header.front_len - << " data=" << header.data_len - << " off " << header.data_off - << dendl; - - // verify header crc - if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) { - ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; - return -1; - } - - bufferlist front, middle, data; - int front_len, middle_len; - unsigned data_len, data_off; - int aborted; - Message *message; - utime_t recv_stamp = ceph_clock_now(); - - if (policy.throttler_messages) { - ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler " - << policy.throttler_messages->get_current() << "/" - << policy.throttler_messages->get_max() << dendl; - policy.throttler_messages->get(); - } - - uint64_t message_size = header.front_len + header.middle_len + header.data_len; - if (message_size) { - if (policy.throttler_bytes) { - ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler " - << policy.throttler_bytes->get_current() << "/" - << policy.throttler_bytes->get_max() << dendl; - policy.throttler_bytes->get(message_size); - } - - // throttle total bytes waiting for dispatch. do this _after_ the - // policy throttle, as this one does not deadlock (unless dispatch - // blocks indefinitely, which it shouldn't). in contrast, the - // policy throttle carries for the lifetime of the message. - ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler " - << in_q->dispatch_throttler.get_current() << "/" - << in_q->dispatch_throttler.get_max() << dendl; - in_q->dispatch_throttler.get(message_size); - } - - utime_t throttle_stamp = ceph_clock_now(); - - // read front - front_len = header.front_len; - if (front_len) { - bufferptr bp = buffer::create(front_len); - if (tcp_read(bp.c_str(), front_len) < 0) - goto out_dethrottle; - front.push_back(std::move(bp)); - ldout(msgr->cct,20) << "reader got front " << front.length() << dendl; - } - - // read middle - middle_len = header.middle_len; - if (middle_len) { - bufferptr bp = buffer::create(middle_len); - if (tcp_read(bp.c_str(), middle_len) < 0) - goto out_dethrottle; - middle.push_back(std::move(bp)); - ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl; - } - - - // read data - data_len = le32_to_cpu(header.data_len); - data_off = le32_to_cpu(header.data_off); - if (data_len) { - unsigned offset = 0; - unsigned left = data_len; - - bufferlist newbuf, rxbuf; - bufferlist::iterator blp; - int rxbuf_version = 0; - - while (left > 0) { - // wait for data - if (tcp_read_wait() < 0) - goto out_dethrottle; - - // get a buffer - connection_state->lock.Lock(); - map >::iterator p = connection_state->rx_buffers.find(header.tid); - if (p != connection_state->rx_buffers.end()) { - if (rxbuf.length() == 0 || p->second.second != rxbuf_version) { - ldout(msgr->cct,10) << "reader seleting rx buffer v " << p->second.second - << " at offset " << offset - << " len " << p->second.first.length() << dendl; - rxbuf = p->second.first; - rxbuf_version = p->second.second; - // make sure it's big enough - if (rxbuf.length() < data_len) - rxbuf.push_back(buffer::create(data_len - rxbuf.length())); - blp = p->second.first.begin(); - blp.advance(offset); - } - } else { - if (!newbuf.length()) { - ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl; - alloc_aligned_buffer(newbuf, data_len, data_off); - blp = newbuf.begin(); - blp.advance(offset); - } - } - bufferptr bp = blp.get_current_ptr(); - int read = MIN(bp.length(), left); - ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl; - ssize_t got = tcp_read_nonblocking(bp.c_str(), read); - ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl; - connection_state->lock.Unlock(); - if (got < 0) - goto out_dethrottle; - if (got > 0) { - blp.advance(got); - data.append(bp, 0, got); - offset += got; - left -= got; - } // else we got a signal or something; just loop. - } - } - - // footer - if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) { - if (tcp_read((char*)&footer, sizeof(footer)) < 0) - goto out_dethrottle; - } else { - ceph_msg_footer_old old_footer; - if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0) - goto out_dethrottle; - footer.front_crc = old_footer.front_crc; - footer.middle_crc = old_footer.middle_crc; - footer.data_crc = old_footer.data_crc; - footer.sig = 0; - footer.flags = old_footer.flags; - } - - aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; - ldout(msgr->cct,10) << "aborted = " << aborted << dendl; - if (aborted) { - ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() - << " byte message.. ABORTED" << dendl; - ret = 0; - goto out_dethrottle; - } - - ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() - << " byte message" << dendl; - message = decode_message(msgr->cct, msgr->crcflags, header, footer, - front, middle, data, connection_state.get()); - if (!message) { - ret = -EINVAL; - goto out_dethrottle; - } - - // - // Check the signature if one should be present. A zero return indicates success. PLR - // - - if (auth_handler == NULL) { - ldout(msgr->cct, 10) << "No session security set" << dendl; - } else { - if (auth_handler->check_message_signature(message)) { - ldout(msgr->cct, 0) << "Signature check failed" << dendl; - message->put(); - ret = -EINVAL; - goto out_dethrottle; - } - } - - message->set_byte_throttler(policy.throttler_bytes); - message->set_message_throttler(policy.throttler_messages); - - // store reservation size in message, so we don't get confused - // by messages entering the dispatch queue through other paths. - message->set_dispatch_throttle_size(message_size); - - message->set_recv_stamp(recv_stamp); - message->set_throttle_stamp(throttle_stamp); - message->set_recv_complete_stamp(ceph_clock_now()); - - *pm = message; - return 0; - - out_dethrottle: - // release bytes reserved from the throttlers on failure - if (policy.throttler_messages) { - ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler " - << policy.throttler_messages->get_current() << "/" - << policy.throttler_messages->get_max() << dendl; - policy.throttler_messages->put(); - } - if (message_size) { - if (policy.throttler_bytes) { - ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler " - << policy.throttler_bytes->get_current() << "/" - << policy.throttler_bytes->get_max() << dendl; - policy.throttler_bytes->put(message_size); - } - - in_q->dispatch_throttle_release(message_size); - } - return ret; -} - -int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more) -{ - MSGR_SIGPIPE_STOPPER; - while (len > 0) { - int r; - r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); - if (r == 0) - ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl; - if (r < 0) { - r = -errno; - ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(r) << dendl; - return r; - } - if (state == STATE_CLOSED) { - ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl; - return -EINTR; // close enough - } - - len -= r; - if (len == 0) break; - - // hrmph. trim r bytes off the front of our message. - ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl; - while (r > 0) { - if (msg->msg_iov[0].iov_len <= (size_t)r) { - // lose this whole item - //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl; - r -= msg->msg_iov[0].iov_len; - msg->msg_iov++; - msg->msg_iovlen--; - } else { - // partial! - //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl; - msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r; - msg->msg_iov[0].iov_len -= r; - break; - } - } - } - return 0; -} - - -int Pipe::write_ack(uint64_t seq) -{ - ldout(msgr->cct,10) << "write_ack " << seq << dendl; - - char c = CEPH_MSGR_TAG_ACK; - ceph_le64 s; - s = seq; - - struct msghdr msg; - memset(&msg, 0, sizeof(msg)); - struct iovec msgvec[2]; - msgvec[0].iov_base = &c; - msgvec[0].iov_len = 1; - msgvec[1].iov_base = &s; - msgvec[1].iov_len = sizeof(s); - msg.msg_iov = msgvec; - msg.msg_iovlen = 2; - - if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0) - return -1; - return 0; -} - -int Pipe::write_keepalive() -{ - ldout(msgr->cct,10) << "write_keepalive" << dendl; - - char c = CEPH_MSGR_TAG_KEEPALIVE; - - struct msghdr msg; - memset(&msg, 0, sizeof(msg)); - struct iovec msgvec[2]; - msgvec[0].iov_base = &c; - msgvec[0].iov_len = 1; - msg.msg_iov = msgvec; - msg.msg_iovlen = 1; - - if (do_sendmsg(&msg, 1) < 0) - return -1; - return 0; -} - -int Pipe::write_keepalive2(char tag, const utime_t& t) -{ - ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl; - struct ceph_timespec ts; - t.encode_timeval(&ts); - struct msghdr msg; - memset(&msg, 0, sizeof(msg)); - struct iovec msgvec[2]; - msgvec[0].iov_base = &tag; - msgvec[0].iov_len = 1; - msgvec[1].iov_base = &ts; - msgvec[1].iov_len = sizeof(ts); - msg.msg_iov = msgvec; - msg.msg_iovlen = 2; - - if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0) - return -1; - return 0; -} - - -int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist) -{ - int ret; - - // set up msghdr and iovecs - struct msghdr msg; - memset(&msg, 0, sizeof(msg)); - msg.msg_iov = msgvec; - int msglen = 0; - - // send tag - char tag = CEPH_MSGR_TAG_MSG; - msgvec[msg.msg_iovlen].iov_base = &tag; - msgvec[msg.msg_iovlen].iov_len = 1; - msglen++; - msg.msg_iovlen++; - - // send envelope - ceph_msg_header_old oldheader; - if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { - msgvec[msg.msg_iovlen].iov_base = (char*)&header; - msgvec[msg.msg_iovlen].iov_len = sizeof(header); - msglen += sizeof(header); - msg.msg_iovlen++; - } else { - memcpy(&oldheader, &header, sizeof(header)); - oldheader.src.name = header.src; - oldheader.src.addr = connection_state->get_peer_addr(); - oldheader.orig_src = oldheader.src; - oldheader.reserved = header.reserved; - if (msgr->crcflags & MSG_CRC_HEADER) { - oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader, - sizeof(oldheader) - sizeof(oldheader.crc)); - } else { - oldheader.crc = 0; - } - msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader; - msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader); - msglen += sizeof(oldheader); - msg.msg_iovlen++; - } - - // payload (front+data) - list::const_iterator pb = blist.buffers().begin(); - unsigned b_off = 0; // carry-over buffer offset, if any - unsigned bl_pos = 0; // blist pos - unsigned left = blist.length(); - - while (left > 0) { - unsigned donow = MIN(left, pb->length()-b_off); - if (donow == 0) { - ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() - << " b_off " << b_off << dendl; - } - assert(donow > 0); - ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off - << " leftinchunk " << left - << " buffer len " << pb->length() - << " writing " << donow - << dendl; - - if (msg.msg_iovlen >= SM_IOV_MAX-2) { - if (do_sendmsg(&msg, msglen, true)) - goto fail; - - // and restart the iov - msg.msg_iov = msgvec; - msg.msg_iovlen = 0; - msglen = 0; - } - - msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off); - msgvec[msg.msg_iovlen].iov_len = donow; - msglen += donow; - msg.msg_iovlen++; - - assert(left >= donow); - left -= donow; - b_off += donow; - bl_pos += donow; - if (left == 0) - break; - while (b_off == pb->length()) { - ++pb; - b_off = 0; - } - } - assert(left == 0); - - // send footer; if receiver doesn't support signatures, use the old footer format - - ceph_msg_footer_old old_footer; - if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) { - msgvec[msg.msg_iovlen].iov_base = (void*)&footer; - msgvec[msg.msg_iovlen].iov_len = sizeof(footer); - msglen += sizeof(footer); - msg.msg_iovlen++; - } else { - if (msgr->crcflags & MSG_CRC_HEADER) { - old_footer.front_crc = footer.front_crc; - old_footer.middle_crc = footer.middle_crc; - } else { - old_footer.front_crc = old_footer.middle_crc = 0; - } - old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0; - old_footer.flags = footer.flags; - msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer; - msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer); - msglen += sizeof(old_footer); - msg.msg_iovlen++; - } - - // send - if (do_sendmsg(&msg, msglen)) - goto fail; - - ret = 0; - - out: - return ret; - - fail: - ret = -1; - goto out; -} - - -int Pipe::tcp_read(char *buf, unsigned len) -{ - if (sd < 0) - return -EINVAL; - - while (len > 0) { - - if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) { - ldout(msgr->cct, 0) << "injecting socket failure" << dendl; - ::shutdown(sd, SHUT_RDWR); - } - } - - if (tcp_read_wait() < 0) - return -1; - - ssize_t got = tcp_read_nonblocking(buf, len); - - if (got < 0) - return -1; - - len -= got; - buf += got; - //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl; - } - return 0; -} - -int Pipe::tcp_read_wait() -{ - if (sd < 0) - return -EINVAL; - struct pollfd pfd; - short evmask; - pfd.fd = sd; - pfd.events = POLLIN; -#if defined(__linux__) - pfd.events |= POLLRDHUP; -#endif - - if (has_pending_data()) - return 0; - - int r = poll(&pfd, 1, msgr->timeout); - if (r < 0) - return -errno; - if (r == 0) - return -EAGAIN; - - evmask = POLLERR | POLLHUP | POLLNVAL; -#if defined(__linux__) - evmask |= POLLRDHUP; -#endif - if (pfd.revents & evmask) - return -1; - - if (!(pfd.revents & POLLIN)) - return -1; - - return 0; -} - -ssize_t Pipe::do_recv(char *buf, size_t len, int flags) -{ -again: - ssize_t got = ::recv( sd, buf, len, flags ); - if (got < 0) { - if (errno == EINTR) { - goto again; - } - ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned " - << got << " " << cpp_strerror(errno) << dendl; - return -1; - } - if (got == 0) { - return -1; - } - return got; -} - -ssize_t Pipe::buffered_recv(char *buf, size_t len, int flags) -{ - size_t left = len; - ssize_t total_recv = 0; - if (recv_len > recv_ofs) { - int to_read = MIN(recv_len - recv_ofs, left); - memcpy(buf, &recv_buf[recv_ofs], to_read); - recv_ofs += to_read; - left -= to_read; - if (left == 0) { - return to_read; - } - buf += to_read; - total_recv += to_read; - } - - /* nothing left in the prefetch buffer */ - - if (left > recv_max_prefetch) { - /* this was a large read, we don't prefetch for these */ - ssize_t ret = do_recv(buf, left, flags ); - if (ret < 0) { - if (total_recv > 0) - return total_recv; - return ret; - } - total_recv += ret; - return total_recv; - } - - - ssize_t got = do_recv(recv_buf, recv_max_prefetch, flags); - if (got < 0) { - if (total_recv > 0) - return total_recv; - - return got; - } - - recv_len = (size_t)got; - got = MIN(left, (size_t)got); - memcpy(buf, recv_buf, got); - recv_ofs = got; - total_recv += got; - return total_recv; -} - -ssize_t Pipe::tcp_read_nonblocking(char *buf, unsigned len) -{ - ssize_t got = buffered_recv(buf, len, MSG_DONTWAIT ); - if (got < 0) { - ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned " - << got << " " << cpp_strerror(errno) << dendl; - return -1; - } - if (got == 0) { - /* poll() said there was data, but we didn't read any - peer - * sent a FIN. Maybe POLLRDHUP signals this, but this is - * standard socket behavior as documented by Stevens. - */ - return -1; - } - return got; -} - -int Pipe::tcp_write(const char *buf, unsigned len) -{ - if (sd < 0) - return -1; - struct pollfd pfd; - pfd.fd = sd; - pfd.events = POLLOUT | POLLHUP | POLLNVAL | POLLERR; -#if defined(__linux__) - pfd.events |= POLLRDHUP; -#endif - - if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) { - if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) { - ldout(msgr->cct, 0) << "injecting socket failure" << dendl; - ::shutdown(sd, SHUT_RDWR); - } - } - - if (poll(&pfd, 1, -1) < 0) - return -1; - - if (!(pfd.revents & POLLOUT)) - return -1; - - //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl; - assert(len > 0); - while (len > 0) { - MSGR_SIGPIPE_STOPPER; - int did = ::send( sd, buf, len, MSG_NOSIGNAL ); - if (did < 0) { - //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl; - //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl; - return did; - } - len -= did; - buf += did; - //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl; - } - return 0; -}