// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <sys/uio.h>
#include <limits.h>
#include <poll.h>

#include "msg/Message.h"
#include "Pipe.h"
#include "SimpleMessenger.h"

#include "common/debug.h"
#include "common/errno.h"
#include "common/valgrind.h"

// Below included to get encode_encrypt(); That probably should be in Crypto.h, instead
#include "auth/Crypto.h"
#include "auth/cephx/CephxProtocol.h"
#include "auth/AuthSessionHandler.h"

#include "include/sock_compat.h"

// Constant to limit starting sequence number to 2^31.  Nothing special about it, just a big number.  PLR
#define SEQ_MASK  0x7fffffff

#define dout_subsys ceph_subsys_ms

#undef dout_prefix
#define dout_prefix *_dout << *this

ostream& Pipe::_pipe_prefix(std::ostream &out) const {
  return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
	     << " sd=" << sd
	     << " :" << port
	     << " s=" << state
	     << " pgs=" << peer_global_seq
	     << " cs=" << connect_seq
	     << " l=" << policy.lossy
	     << " c=" << connection_state
	     << ").";
}

ostream& operator<<(ostream &out, const Pipe &pipe) {
  return pipe._pipe_prefix(out);
}

/**
 * The DelayedDelivery is for injecting delays into Message delivery off
 * the socket. It is only enabled if delays are requested, and if they
 * are then it pulls Messages off the DelayQueue and puts them into the
 * in_q (SimpleMessenger::dispatch_queue).
 * Please note that this probably has issues with Pipe shutdown and
 * replacement semantics. I've tried, but no guarantees.
 */
class Pipe::DelayedDelivery: public Thread {
  Pipe *pipe;
  std::deque< pair<utime_t,Message*> > delay_queue;
  Mutex delay_lock;
  Cond delay_cond;
  int flush_count;
  bool active_flush;
  bool stop_delayed_delivery;
  bool delay_dispatching; // we are in fast dispatch now
  bool stop_fast_dispatching_flag; // we need to stop fast dispatching

public:
  explicit DelayedDelivery(Pipe *p)
    : pipe(p),
      delay_lock("Pipe::DelayedDelivery::delay_lock"),
      flush_count(0),
      active_flush(false),
      stop_delayed_delivery(false),
      delay_dispatching(false),
      stop_fast_dispatching_flag(false) { }
  ~DelayedDelivery() override {
    discard();
  }
  void *entry() override;
  void queue(utime_t release, Message *m) {
    Mutex::Locker l(delay_lock);
    delay_queue.push_back(make_pair(release, m));
    delay_cond.Signal();
  }
  void discard();
  void flush();
  bool is_flushing() {
    Mutex::Locker l(delay_lock);
    return flush_count > 0 || active_flush;
  }
  void wait_for_flush() {
    Mutex::Locker l(delay_lock);
    while (flush_count > 0 || active_flush)
      delay_cond.Wait(delay_lock);
  }
  void stop() {
    delay_lock.Lock();
    stop_delayed_delivery = true;
    delay_cond.Signal();
    delay_lock.Unlock();
  }
  void steal_for_pipe(Pipe *new_owner) {
    Mutex::Locker l(delay_lock);
    pipe = new_owner;
  }
  /**
   * We need to stop fast dispatching before we need to stop putting
   * normal messages into the DispatchQueue.
   */
  void stop_fast_dispatching();
};

/**************************************
 * Pipe
 */

Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
  : RefCountedObject(r->cct),
    reader_thread(this),
    writer_thread(this),
    delay_thread(NULL),
    msgr(r),
    conn_id(r->dispatch_queue.get_id()),
    recv_ofs(0),
    recv_len(0),
    sd(-1), port(0),
    peer_type(-1),
    pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
    state(st),
    connection_state(NULL),
    reader_running(false), reader_needs_join(false),
    reader_dispatching(false), notify_on_dispatch_done(false),
    writer_running(false),
    in_q(&(r->dispatch_queue)),
    send_keepalive(false),
    send_keepalive_ack(false),
    connect_seq(0), peer_global_seq(0),
    out_seq(0), in_seq(0), in_seq_acked(0) {
  ANNOTATE_BENIGN_RACE_SIZED(&sd, sizeof(sd), "Pipe socket");
  ANNOTATE_BENIGN_RACE_SIZED(&state, sizeof(state), "Pipe state");
  ANNOTATE_BENIGN_RACE_SIZED(&recv_len, sizeof(recv_len), "Pipe recv_len");
  ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs, sizeof(recv_ofs), "Pipe recv_ofs");
  if (con) {
    connection_state = con;
    connection_state->reset_pipe(this);
  } else {
    connection_state = new PipeConnection(msgr->cct, msgr);
    connection_state->pipe = get();
  }

  if (randomize_out_seq()) {
    lsubdout(msgr->cct,ms,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
  }

  msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
  if (msgr->timeout == 0)
    msgr->timeout = -1;

  recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size;
  recv_buf = new char[recv_max_prefetch];
}

Pipe::~Pipe()
{
  assert(out_q.empty());
  assert(sent.empty());
  delete delay_thread;
  delete[] recv_buf;
}

void Pipe::handle_ack(uint64_t seq)
{
  lsubdout(msgr->cct, ms, 15) << "reader got ack seq " << seq << dendl;
  // trim sent list
  while (!sent.empty() &&
	 sent.front()->get_seq() <= seq) {
    Message *m = sent.front();
    sent.pop_front();
    lsubdout(msgr->cct, ms, 10) << "reader got ack seq "
				<< seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
    m->put();
  }
}

void Pipe::start_reader()
{
  assert(pipe_lock.is_locked());
  assert(!reader_running);
  if (reader_needs_join) {
    reader_thread.join();
    reader_needs_join = false;
  }
  reader_running = true;
  reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
}

void Pipe::maybe_start_delay_thread()
{
  if (!delay_thread) {
    auto pos = msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state->peer_type));
    if (pos != string::npos) {
      lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
      delay_thread = new DelayedDelivery(this);
      delay_thread->create("ms_pipe_delay");
    }
  }
}

void Pipe::start_writer()
{
  assert(pipe_lock.is_locked());
  assert(!writer_running);
  writer_running = true;
  writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
}

void Pipe::join_reader()
{
  if (!reader_running)
    return;
  cond.Signal();
  pipe_lock.Unlock();
  reader_thread.join();
  pipe_lock.Lock();
  reader_needs_join = false;
}

void Pipe::DelayedDelivery::discard()
{
  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::discard" << dendl;
  Mutex::Locker l(delay_lock);
  while (!delay_queue.empty()) {
    Message *m = delay_queue.front().second;
    pipe->in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
    m->put();
    delay_queue.pop_front();
  }
}

void Pipe::DelayedDelivery::flush()
{
  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::flush" << dendl;
  Mutex::Locker l(delay_lock);
  flush_count = delay_queue.size();
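  // wake the delivery thread; while flush_count > 0 it delivers the queued
  // messages immediately instead of sleeping until their release times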
delay_cond.Signal(); } void *Pipe::DelayedDelivery::entry() { Mutex::Locker locker(delay_lock); lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry start" << dendl; while (!stop_delayed_delivery) { if (delay_queue.empty()) { lgeneric_subdout(pipe->msgr->cct, ms, 30) << *pipe << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl; delay_cond.Wait(delay_lock); continue; } utime_t release = delay_queue.front().first; Message *m = delay_queue.front().second; string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type; if (!flush_count && (release > ceph_clock_now() && (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) { lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl; delay_cond.WaitUntil(delay_lock, release); continue; } lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl; delay_queue.pop_front(); if (flush_count > 0) { --flush_count; active_flush = true; } if (pipe->in_q->can_fast_dispatch(m)) { if (!stop_fast_dispatching_flag) { delay_dispatching = true; delay_lock.Unlock(); pipe->in_q->fast_dispatch(m); delay_lock.Lock(); delay_dispatching = false; if (stop_fast_dispatching_flag) { // we need to let the stopping thread proceed delay_cond.Signal(); delay_lock.Unlock(); delay_lock.Lock(); } } } else { pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); } active_flush = false; } lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry stop" << dendl; return NULL; } void Pipe::DelayedDelivery::stop_fast_dispatching() { Mutex::Locker l(delay_lock); stop_fast_dispatching_flag = true; while (delay_dispatching) delay_cond.Wait(delay_lock); } int Pipe::accept() { ldout(msgr->cct,10) << "accept" << dendl; assert(pipe_lock.is_locked()); assert(state == STATE_ACCEPTING); pipe_lock.Unlock(); // vars bufferlist addrs; entity_addr_t socket_addr; socklen_t len; int r; char banner[strlen(CEPH_BANNER)+1]; bufferlist addrbl; ceph_msg_connect connect; ceph_msg_connect_reply reply; Pipe *existing = 0; bufferptr bp; bufferlist authorizer, authorizer_reply; bool authorizer_valid; uint64_t feat_missing; bool replaced = false; // this variable denotes if the connection attempt from peer is a hard // reset or not, it is true if there is an existing connection and the // connection sequence from peer is equal to zero bool is_reset_from_peer = false; CryptoKey session_key; int removed; // single-use down below // this should roughly mirror pseudocode at // http://ceph.com/wiki/Messaging_protocol int reply_tag = 0; uint64_t existing_seq = -1; // used for reading in the remote acked seq on connect uint64_t newly_acked_seq = 0; recv_reset(); set_socket_options(); // announce myself. 
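  // accept handshake: write our banner and the encoded addresses (our own addr
  // plus the peer's address as seen on this socket), then read the peer's banner
  // and claimed addr before entering the connect-negotiation loop below.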
r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER)); if (r < 0) { ldout(msgr->cct,10) << "accept couldn't write banner" << dendl; goto fail_unlocked; } // and my addr ::encode(msgr->my_inst.addr, addrs, 0); // legacy port = msgr->my_inst.addr.get_port(); // and peer's socket addr (they might not know their ip) sockaddr_storage ss; len = sizeof(ss); r = ::getpeername(sd, (sockaddr*)&ss, &len); if (r < 0) { ldout(msgr->cct,0) << "accept failed to getpeername " << cpp_strerror(errno) << dendl; goto fail_unlocked; } socket_addr.set_sockaddr((sockaddr*)&ss); ::encode(socket_addr, addrs, 0); // legacy r = tcp_write(addrs.c_str(), addrs.length()); if (r < 0) { ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl; goto fail_unlocked; } ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl; // identify peer if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) { ldout(msgr->cct,10) << "accept couldn't read banner" << dendl; goto fail_unlocked; } if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { banner[strlen(CEPH_BANNER)] = 0; ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl; goto fail_unlocked; } { bufferptr tp(sizeof(ceph_entity_addr)); addrbl.push_back(std::move(tp)); } if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) { ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl; goto fail_unlocked; } { bufferlist::iterator ti = addrbl.begin(); ::decode(peer_addr, ti); } ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl; if (peer_addr.is_blank_ip()) { // peer apparently doesn't know what ip they have; figure it out for them. int port = peer_addr.get_port(); peer_addr.u = socket_addr.u; peer_addr.set_port(port); ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr << " (socket is " << socket_addr << ")" << dendl; } set_peer_addr(peer_addr); // so that connection_state gets set up while (1) { if (tcp_read((char*)&connect, sizeof(connect)) < 0) { ldout(msgr->cct,10) << "accept couldn't read connect" << dendl; goto fail_unlocked; } authorizer.clear(); if (connect.authorizer_len) { bp = buffer::create(connect.authorizer_len); if (tcp_read(bp.c_str(), connect.authorizer_len) < 0) { ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl; goto fail_unlocked; } authorizer.push_back(std::move(bp)); authorizer_reply.clear(); } ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq << " global_seq " << connect.global_seq << dendl; msgr->lock.Lock(); // FIXME pipe_lock.Lock(); if (msgr->dispatch_queue.stop) goto shutting_down; if (state != STATE_ACCEPTING) { goto shutting_down; } // note peer's type, flags set_peer_type(connect.host_type); policy = msgr->get_policy(connect.host_type); ldout(msgr->cct,10) << "accept of host_type " << connect.host_type << ", policy.lossy=" << policy.lossy << " policy.server=" << policy.server << " policy.standby=" << policy.standby << " policy.resetcheck=" << policy.resetcheck << dendl; memset(&reply, 0, sizeof(reply)); reply.protocol_version = msgr->get_proto_version(peer_type, false); msgr->lock.Unlock(); // mismatch? ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version << ", their proto " << connect.protocol_version << dendl; if (connect.protocol_version != reply.protocol_version) { reply.tag = CEPH_MSGR_TAG_BADPROTOVER; goto reply; } // require signatures for cephx? 
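    // if cephx is in use and the relevant *_require_signatures option is set,
    // insist that the peer supports CEPH_FEATURE_MSG_AUTH; cluster daemons
    // (OSD/MDS) and service/client connections are controlled by separate options.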
if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) { if (peer_type == CEPH_ENTITY_TYPE_OSD || peer_type == CEPH_ENTITY_TYPE_MDS) { if (msgr->cct->_conf->cephx_require_signatures || msgr->cct->_conf->cephx_cluster_require_signatures) { ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl; policy.features_required |= CEPH_FEATURE_MSG_AUTH; } } else { if (msgr->cct->_conf->cephx_require_signatures || msgr->cct->_conf->cephx_service_require_signatures) { ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl; policy.features_required |= CEPH_FEATURE_MSG_AUTH; } } } feat_missing = policy.features_required & ~(uint64_t)connect.features; if (feat_missing) { ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl; reply.tag = CEPH_MSGR_TAG_FEATURES; goto reply; } // Check the authorizer. If not good, bail out. pipe_lock.Unlock(); if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer, authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) { ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl; pipe_lock.Lock(); if (state != STATE_ACCEPTING) goto shutting_down_msgr_unlocked; reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER; session_security.reset(); goto reply; } // We've verified the authorizer for this pipe, so set up the session security structure. PLR ldout(msgr->cct,10) << "accept: setting up session_security." << dendl; retry_existing_lookup: msgr->lock.Lock(); pipe_lock.Lock(); if (msgr->dispatch_queue.stop) goto shutting_down; if (state != STATE_ACCEPTING) goto shutting_down; // existing? existing = msgr->_lookup_pipe(peer_addr); if (existing) { existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here) if (existing->reader_dispatching) { /** we need to wait, or we can deadlock if downstream * fast_dispatchers are (naughtily!) waiting on resources * held by somebody trying to make use of the SimpleMessenger lock. * So drop locks, wait, and retry. It just looks like a slow network * to everybody else. * * We take a ref to existing here since it might get reaped before we * wake up (see bug #15870). We can be confident that it lived until * locked it since we held the msgr lock from _lookup_pipe through to * locking existing->lock and checking reader_dispatching. */ existing->get(); pipe_lock.Unlock(); msgr->lock.Unlock(); existing->notify_on_dispatch_done = true; while (existing->reader_dispatching) existing->cond.Wait(existing->pipe_lock); existing->pipe_lock.Unlock(); existing->put(); existing = nullptr; goto retry_existing_lookup; } if (connect.global_seq < existing->peer_global_seq) { ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL; reply.global_seq = existing->peer_global_seq; // so we can send it below.. 
existing->pipe_lock.Unlock(); msgr->lock.Unlock(); goto reply; } else { ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq << " <= " << connect.global_seq << ", looks ok" << dendl; } if (existing->policy.lossy) { ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy=" << policy.lossy << ")" << dendl; existing->was_session_reset(); goto replace; } ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq << " vs existing " << existing->connect_seq << " state " << existing->get_state_name() << dendl; if (connect.connect_seq == 0 && existing->connect_seq > 0) { ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl; // this is a hard reset from peer is_reset_from_peer = true; if (policy.resetcheck) existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s goto replace; } if (connect.connect_seq < existing->connect_seq) { // old attempt, or we sent READY but they didn't get it. ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; goto retry_session; } if (connect.connect_seq == existing->connect_seq) { // if the existing connection successfully opened, and/or // subsequently went to standby, then the peer should bump // their connect_seq and retry: this is not a connection race // we need to resolve here. if (existing->state == STATE_OPEN || existing->state == STATE_STANDBY) { ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl; goto retry_session; } // connection race? if (peer_addr < msgr->my_inst.addr || existing->policy.server) { // incoming wins ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl; if (!(existing->state == STATE_CONNECTING || existing->state == STATE_WAIT)) lderr(msgr->cct) << "accept race bad state, would replace, existing=" << existing->get_state_name() << " " << existing << ".cseq=" << existing->connect_seq << " == " << connect.connect_seq << dendl; assert(existing->state == STATE_CONNECTING || existing->state == STATE_WAIT); goto replace; } else { // our existing outgoing wins ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", sending WAIT" << dendl; assert(peer_addr > msgr->my_inst.addr); if (!(existing->state == STATE_CONNECTING)) lderr(msgr->cct) << "accept race bad state, would send wait, existing=" << existing->get_state_name() << " " << existing << ".cseq=" << existing->connect_seq << " == " << connect.connect_seq << dendl; assert(existing->state == STATE_CONNECTING); // make sure our outgoing connection will follow through existing->_send_keepalive(); reply.tag = CEPH_MSGR_TAG_WAIT; existing->pipe_lock.Unlock(); msgr->lock.Unlock(); goto reply; } } assert(connect.connect_seq > existing->connect_seq); assert(connect.global_seq >= existing->peer_global_seq); if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other existing->connect_seq == 0) { ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; reply.tag 
= CEPH_MSGR_TAG_RESETSESSION; msgr->lock.Unlock(); existing->pipe_lock.Unlock(); goto reply; } // reconnect ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq << " > " << existing->connect_seq << dendl; goto replace; } // existing else if (connect.connect_seq > 0) { // we reset, and they are opening a new session ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; msgr->lock.Unlock(); reply.tag = CEPH_MSGR_TAG_RESETSESSION; goto reply; } else { // new session ldout(msgr->cct,10) << "accept new session" << dendl; existing = NULL; goto open; } ceph_abort(); retry_session: assert(existing->pipe_lock.is_locked()); assert(pipe_lock.is_locked()); reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; reply.connect_seq = existing->connect_seq + 1; existing->pipe_lock.Unlock(); msgr->lock.Unlock(); goto reply; reply: assert(pipe_lock.is_locked()); reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; reply.authorizer_len = authorizer_reply.length(); pipe_lock.Unlock(); r = tcp_write((char*)&reply, sizeof(reply)); if (r < 0) goto fail_unlocked; if (reply.authorizer_len) { r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); if (r < 0) goto fail_unlocked; } } replace: assert(existing->pipe_lock.is_locked()); assert(pipe_lock.is_locked()); // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) { reply_tag = CEPH_MSGR_TAG_SEQ; existing_seq = existing->in_seq; } ldout(msgr->cct,10) << "accept replacing " << existing << dendl; existing->stop(); existing->unregister_pipe(); replaced = true; if (existing->policy.lossy) { // disconnect from the Connection assert(existing->connection_state); if (existing->connection_state->clear_pipe(existing)) msgr->dispatch_queue.queue_reset(existing->connection_state.get()); } else { // queue a reset on the new connection, which we're dumping for the old msgr->dispatch_queue.queue_reset(connection_state.get()); // drop my Connection, and take a ref to the existing one. do not // clear existing->connection_state, since read_message and // write_message both dereference it without pipe_lock. connection_state = existing->connection_state; // make existing Connection reference us connection_state->reset_pipe(this); if (existing->delay_thread) { existing->delay_thread->steal_for_pipe(this); delay_thread = existing->delay_thread; existing->delay_thread = NULL; delay_thread->flush(); } // steal incoming queue uint64_t replaced_conn_id = conn_id; conn_id = existing->conn_id; existing->conn_id = replaced_conn_id; // reset the in_seq if this is a hard reset from peer, // otherwise we respect our original connection's value in_seq = is_reset_from_peer ? 
0 : existing->in_seq; in_seq_acked = in_seq; // steal outgoing queue and out_seq existing->requeue_sent(); out_seq = existing->out_seq; ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl; for (map >::iterator p = existing->out_q.begin(); p != existing->out_q.end(); ++p) out_q[p->first].splice(out_q[p->first].begin(), p->second); } existing->stop_and_wait(); existing->pipe_lock.Unlock(); open: // open assert(pipe_lock.is_locked()); connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; assert(state == STATE_ACCEPTING); state = STATE_OPEN; ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; // send READY reply reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY); reply.features = policy.features_supported; reply.global_seq = msgr->get_global_seq(); reply.connect_seq = connect_seq; reply.flags = 0; reply.authorizer_len = authorizer_reply.length(); if (policy.lossy) reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY; connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features); ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl; session_security.reset( get_auth_session_handler(msgr->cct, connect.authorizer_protocol, session_key, connection_state->get_features())); // notify msgr->dispatch_queue.queue_accept(connection_state.get()); msgr->ms_deliver_handle_fast_accept(connection_state.get()); // ok! if (msgr->dispatch_queue.stop) goto shutting_down; removed = msgr->accepting_pipes.erase(this); assert(removed == 1); register_pipe(); msgr->lock.Unlock(); pipe_lock.Unlock(); r = tcp_write((char*)&reply, sizeof(reply)); if (r < 0) { goto fail_registered; } if (reply.authorizer_len) { r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); if (r < 0) { goto fail_registered; } } if (reply_tag == CEPH_MSGR_TAG_SEQ) { if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) { ldout(msgr->cct,2) << "accept write error on in_seq" << dendl; goto fail_registered; } if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) { ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl; goto fail_registered; } } pipe_lock.Lock(); discard_requeued_up_to(newly_acked_seq); if (state != STATE_CLOSED) { ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl; start_writer(); } ldout(msgr->cct,20) << "accept done" << dendl; maybe_start_delay_thread(); return 0; // success. fail_registered: ldout(msgr->cct, 10) << "accept fault after register" << dendl; if (msgr->cct->_conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; utime_t t; t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); t.sleep(); } fail_unlocked: pipe_lock.Lock(); if (state != STATE_CLOSED) { bool queued = is_queued(); ldout(msgr->cct, 10) << " queued = " << (int)queued << dendl; if (queued) { state = policy.server ? 
STATE_STANDBY : STATE_CONNECTING; } else if (replaced) { state = STATE_STANDBY; } else { state = STATE_CLOSED; state_closed = true; } fault(); if (queued || replaced) start_writer(); } return -1; shutting_down: msgr->lock.Unlock(); shutting_down_msgr_unlocked: assert(pipe_lock.is_locked()); if (msgr->cct->_conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; utime_t t; t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); t.sleep(); } state = STATE_CLOSED; state_closed = true; fault(); return -1; } void Pipe::set_socket_options() { // disable Nagle algorithm? if (msgr->cct->_conf->ms_tcp_nodelay) { int flag = 1; int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); if (r < 0) { r = -errno; ldout(msgr->cct,0) << "couldn't set TCP_NODELAY: " << cpp_strerror(r) << dendl; } } if (msgr->cct->_conf->ms_tcp_rcvbuf) { int size = msgr->cct->_conf->ms_tcp_rcvbuf; int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size)); if (r < 0) { r = -errno; ldout(msgr->cct,0) << "couldn't set SO_RCVBUF to " << size << ": " << cpp_strerror(r) << dendl; } } // block ESIGPIPE #ifdef CEPH_USE_SO_NOSIGPIPE int val = 1; int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); if (r) { r = -errno; ldout(msgr->cct,0) << "couldn't set SO_NOSIGPIPE: " << cpp_strerror(r) << dendl; } #endif #ifdef SO_PRIORITY int prio = msgr->get_socket_priority(); if (prio >= 0) { int r = -1; #ifdef IPTOS_CLASS_CS6 int iptos = IPTOS_CLASS_CS6; int addr_family = 0; if (!peer_addr.is_blank_ip()) { addr_family = peer_addr.get_family(); } else { addr_family = msgr->get_myaddr().get_family(); } switch (addr_family) { case AF_INET: r = ::setsockopt(sd, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos)); break; case AF_INET6: r = ::setsockopt(sd, IPPROTO_IPV6, IPV6_TCLASS, &iptos, sizeof(iptos)); break; default: lderr(msgr->cct) << "couldn't set ToS of unknown family (" << addr_family << ")" << " to " << iptos << dendl; return; } if (r < 0) { r = -errno; ldout(msgr->cct,0) << "couldn't set TOS to " << iptos << ": " << cpp_strerror(r) << dendl; } #endif // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0. // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT // We need to call setsockopt(SO_PRIORITY) after it. r = ::setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio)); if (r < 0) { r = -errno; ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio << ": " << cpp_strerror(r) << dendl; } } #endif } int Pipe::connect() { bool got_bad_auth = false; ldout(msgr->cct,10) << "connect " << connect_seq << dendl; assert(pipe_lock.is_locked()); __u32 cseq = connect_seq; __u32 gseq = msgr->get_global_seq(); // stop reader thread join_reader(); pipe_lock.Unlock(); char tag = -1; int rc = -1; struct msghdr msg; struct iovec msgvec[2]; int msglen; char banner[strlen(CEPH_BANNER) + 1]; // extra byte makes coverity happy entity_addr_t paddr; entity_addr_t peer_addr_for_me, socket_addr; AuthAuthorizer *authorizer = NULL; bufferlist addrbl, myaddrbl; const md_config_t *conf = msgr->cct->_conf; // close old socket. this is safe because we stopped the reader thread above. if (sd >= 0) ::close(sd); // create socket? 
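  // open a fresh TCP socket toward peer_addr; optionally bind to our own
  // address first (ms_bind_before_connect) before calling ::connect().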
sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0); if (sd < 0) { rc = -errno; lderr(msgr->cct) << "connect couldn't create socket " << cpp_strerror(rc) << dendl; goto fail; } recv_reset(); set_socket_options(); { entity_addr_t addr2bind = msgr->get_myaddr(); if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) { addr2bind.set_port(0); int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len()); if (r < 0) { ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl; goto fail; } } } // connect! ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl; rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len()); if (rc < 0) { int stored_errno = errno; ldout(msgr->cct,2) << "connect error " << peer_addr << ", " << cpp_strerror(stored_errno) << dendl; if (stored_errno == ECONNREFUSED) { ldout(msgr->cct, 2) << "connection refused!" << dendl; msgr->dispatch_queue.queue_refused(connection_state.get()); } goto fail; } // verify banner // FIXME: this should be non-blocking, or in some other way verify the banner as we get it. rc = tcp_read((char*)&banner, strlen(CEPH_BANNER)); if (rc < 0) { ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl; goto fail; } if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl; goto fail; } memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = banner; msgvec[0].iov_len = strlen(CEPH_BANNER); msg.msg_iov = msgvec; msg.msg_iovlen = 1; msglen = msgvec[0].iov_len; rc = do_sendmsg(&msg, msglen); if (rc < 0) { ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl; goto fail; } // identify peer { #if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__) bufferptr p(sizeof(ceph_entity_addr) * 2); #else int wirelen = sizeof(__u32) * 2 + sizeof(ceph_sockaddr_storage); bufferptr p(wirelen * 2); #endif addrbl.push_back(std::move(p)); } rc = tcp_read(addrbl.c_str(), addrbl.length()); if (rc < 0) { ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc) << dendl; goto fail; } try { bufferlist::iterator p = addrbl.begin(); ::decode(paddr, p); ::decode(peer_addr_for_me, p); } catch (buffer::error& e) { ldout(msgr->cct,2) << "connect couldn't decode peer addrs: " << e.what() << dendl; goto fail; } port = peer_addr_for_me.get_port(); ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl; if (peer_addr != paddr) { if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() && peer_addr.get_nonce() == paddr.get_nonce()) { ldout(msgr->cct,0) << "connect claims to be " << paddr << " not " << peer_addr << " - presumably this is the same node!" 
<< dendl; } else { ldout(msgr->cct,10) << "connect claims to be " << paddr << " not " << peer_addr << dendl; goto fail; } } ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl; msgr->learned_addr(peer_addr_for_me); ::encode(msgr->my_inst.addr, myaddrbl, 0); // legacy memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = myaddrbl.c_str(); msgvec[0].iov_len = myaddrbl.length(); msg.msg_iov = msgvec; msg.msg_iovlen = 1; msglen = msgvec[0].iov_len; rc = do_sendmsg(&msg, msglen); if (rc < 0) { ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl; goto fail; } ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl; while (1) { delete authorizer; authorizer = msgr->get_authorizer(peer_type, false); bufferlist authorizer_reply; ceph_msg_connect connect; connect.features = policy.features_supported; connect.host_type = msgr->get_myinst().name.type(); connect.global_seq = gseq; connect.connect_seq = cseq; connect.protocol_version = msgr->get_proto_version(peer_type, true); connect.authorizer_protocol = authorizer ? authorizer->protocol : 0; connect.authorizer_len = authorizer ? authorizer->bl.length() : 0; if (authorizer) ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len << " protocol=" << connect.authorizer_protocol << dendl; connect.flags = 0; if (policy.lossy) connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides! memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = (char*)&connect; msgvec[0].iov_len = sizeof(connect); msg.msg_iov = msgvec; msg.msg_iovlen = 1; msglen = msgvec[0].iov_len; if (authorizer) { msgvec[1].iov_base = authorizer->bl.c_str(); msgvec[1].iov_len = authorizer->bl.length(); msg.msg_iovlen++; msglen += msgvec[1].iov_len; } ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq << " proto=" << connect.protocol_version << dendl; rc = do_sendmsg(&msg, msglen); if (rc < 0) { ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl; goto fail; } ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl; ceph_msg_connect_reply reply; rc = tcp_read((char*)&reply, sizeof(reply)); if (rc < 0) { ldout(msgr->cct,2) << "connect read reply " << cpp_strerror(rc) << dendl; goto fail; } ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag << " connect_seq " << reply.connect_seq << " global_seq " << reply.global_seq << " proto " << reply.protocol_version << " flags " << (int)reply.flags << " features " << reply.features << dendl; authorizer_reply.clear(); if (reply.authorizer_len) { ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl; bufferptr bp = buffer::create(reply.authorizer_len); rc = tcp_read(bp.c_str(), reply.authorizer_len); if (rc < 0) { ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc) << dendl; goto fail; } authorizer_reply.push_back(bp); } if (authorizer) { bufferlist::iterator iter = authorizer_reply.begin(); if (!authorizer->verify_reply(iter)) { ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl; goto fail; } } if (conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; utime_t t; t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); t.sleep(); } pipe_lock.Lock(); if (state != STATE_CONNECTING) { ldout(msgr->cct,0) << "connect got RESETSESSION but no longer connecting" << dendl; goto 
stop_locked; } if (reply.tag == CEPH_MSGR_TAG_FEATURES) { ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex << connect.features << " < peer " << reply.features << " missing " << (reply.features & ~policy.features_supported) << std::dec << dendl; goto fail_locked; } if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) { ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version << " != " << reply.protocol_version << dendl; goto fail_locked; } if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) { ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl; if (got_bad_auth) goto stop_locked; got_bad_auth = true; pipe_lock.Unlock(); delete authorizer; authorizer = msgr->get_authorizer(peer_type, true); // try harder continue; } if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) { ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl; was_session_reset(); cseq = 0; pipe_lock.Unlock(); continue; } if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { gseq = msgr->get_global_seq(reply.global_seq); ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq << " chose new " << gseq << dendl; pipe_lock.Unlock(); continue; } if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { assert(reply.connect_seq > connect_seq); ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq << " -> " << reply.connect_seq << dendl; cseq = connect_seq = reply.connect_seq; pipe_lock.Unlock(); continue; } if (reply.tag == CEPH_MSGR_TAG_WAIT) { ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl; state = STATE_WAIT; goto stop_locked; } if (reply.tag == CEPH_MSGR_TAG_READY || reply.tag == CEPH_MSGR_TAG_SEQ) { uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features; if (feat_missing) { ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl; goto fail_locked; } if (reply.tag == CEPH_MSGR_TAG_SEQ) { ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; uint64_t newly_acked_seq = 0; rc = tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)); if (rc < 0) { ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc) << dendl; goto fail_locked; } ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq << " vs out_seq " << out_seq << dendl; while (newly_acked_seq > out_seq) { Message *m = _get_next_outgoing(); assert(m); ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq() << " " << *m << dendl; assert(m->get_seq() <= newly_acked_seq); m->put(); ++out_seq; } if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) { ldout(msgr->cct,2) << "connect write error on in_seq" << dendl; goto fail_locked; } } // hooray! peer_global_seq = reply.global_seq; policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY; state = STATE_OPEN; connect_seq = cseq + 1; assert(connect_seq == reply.connect_seq); backoff = utime_t(); connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features); ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy << ", features " << connection_state->get_features() << dendl; // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the // connection. PLR if (authorizer != NULL) { session_security.reset( get_auth_session_handler(msgr->cct, authorizer->protocol, authorizer->session_key, connection_state->get_features())); } else { // We have no authorizer, so we shouldn't be applying security to messages in this pipe. 
PLR session_security.reset(); } msgr->dispatch_queue.queue_connect(connection_state.get()); msgr->ms_deliver_handle_fast_connect(connection_state.get()); if (!reader_running) { ldout(msgr->cct,20) << "connect starting reader" << dendl; start_reader(); } maybe_start_delay_thread(); delete authorizer; return 0; } // protocol error ldout(msgr->cct,0) << "connect got bad tag " << (int)tag << dendl; goto fail_locked; } fail: if (conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; utime_t t; t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); t.sleep(); } pipe_lock.Lock(); fail_locked: if (state == STATE_CONNECTING) fault(); else ldout(msgr->cct,3) << "connect fault, but state = " << get_state_name() << " != connecting, stopping" << dendl; stop_locked: delete authorizer; return rc; } void Pipe::register_pipe() { ldout(msgr->cct,10) << "register_pipe" << dendl; assert(msgr->lock.is_locked()); Pipe *existing = msgr->_lookup_pipe(peer_addr); assert(existing == NULL); msgr->rank_pipe[peer_addr] = this; } void Pipe::unregister_pipe() { assert(msgr->lock.is_locked()); ceph::unordered_map::iterator p = msgr->rank_pipe.find(peer_addr); if (p != msgr->rank_pipe.end() && p->second == this) { ldout(msgr->cct,10) << "unregister_pipe" << dendl; msgr->rank_pipe.erase(p); } else { ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl; msgr->accepting_pipes.erase(this); // somewhat overkill, but safe. } } void Pipe::join() { ldout(msgr->cct, 20) << "join" << dendl; if (writer_thread.is_started()) writer_thread.join(); if (reader_thread.is_started()) reader_thread.join(); if (delay_thread) { ldout(msgr->cct, 20) << "joining delay_thread" << dendl; delay_thread->stop(); delay_thread->join(); } } void Pipe::requeue_sent() { if (sent.empty()) return; list& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; while (!sent.empty()) { Message *m = sent.back(); sent.pop_back(); ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq << " (" << m->get_seq() << ")" << dendl; rq.push_front(m); out_seq--; } } void Pipe::discard_requeued_up_to(uint64_t seq) { ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl; if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) return; list& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; while (!rq.empty()) { Message *m = rq.front(); if (m->get_seq() == 0 || m->get_seq() > seq) break; ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq << " <= " << seq << ", discarding" << dendl; m->put(); rq.pop_front(); out_seq++; } if (rq.empty()) out_q.erase(CEPH_MSG_PRIO_HIGHEST); } /* * Tears down the Pipe's message queues, and removes them from the DispatchQueue * Must hold pipe_lock prior to calling. 
*/ void Pipe::discard_out_queue() { ldout(msgr->cct,10) << "discard_queue" << dendl; for (list::iterator p = sent.begin(); p != sent.end(); ++p) { ldout(msgr->cct,20) << " discard " << *p << dendl; (*p)->put(); } sent.clear(); for (map >::iterator p = out_q.begin(); p != out_q.end(); ++p) for (list::iterator r = p->second.begin(); r != p->second.end(); ++r) { ldout(msgr->cct,20) << " discard " << *r << dendl; (*r)->put(); } out_q.clear(); } void Pipe::fault(bool onread) { const md_config_t *conf = msgr->cct->_conf; assert(pipe_lock.is_locked()); cond.Signal(); if (onread && state == STATE_CONNECTING) { ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl; return; } ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl; if (state == STATE_CLOSED || state == STATE_CLOSING) { ldout(msgr->cct,10) << "fault already closed|closing" << dendl; if (connection_state->clear_pipe(this)) msgr->dispatch_queue.queue_reset(connection_state.get()); return; } shutdown_socket(); // lossy channel? if (policy.lossy && state != STATE_CONNECTING) { ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl; // disconnect from Connection, and mark it failed. future messages // will be dropped. assert(connection_state); stop(); bool cleared = connection_state->clear_pipe(this); // crib locks, blech. note that Pipe is now STATE_CLOSED and the // rank_pipe entry is ignored by others. pipe_lock.Unlock(); if (conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; utime_t t; t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); t.sleep(); } msgr->lock.Lock(); pipe_lock.Lock(); unregister_pipe(); msgr->lock.Unlock(); if (delay_thread) delay_thread->discard(); in_q->discard_queue(conn_id); discard_out_queue(); if (cleared) msgr->dispatch_queue.queue_reset(connection_state.get()); return; } // queue delayed items immediately if (delay_thread) delay_thread->flush(); // requeue sent items requeue_sent(); if (policy.standby && !is_queued()) { ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl; state = STATE_STANDBY; return; } if (state != STATE_CONNECTING) { if (policy.server) { ldout(msgr->cct,0) << "fault, server, going to standby" << dendl; state = STATE_STANDBY; } else { ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl; connect_seq++; state = STATE_CONNECTING; } backoff = utime_t(); } else if (backoff == utime_t()) { ldout(msgr->cct,0) << "fault" << dendl; backoff.set_from_double(conf->ms_initial_backoff); } else { ldout(msgr->cct,10) << "fault waiting " << backoff << dendl; cond.WaitInterval(pipe_lock, backoff); backoff += backoff; if (backoff > conf->ms_max_backoff) backoff.set_from_double(conf->ms_max_backoff); ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl; } } int Pipe::randomize_out_seq() { if (connection_state->get_features() & CEPH_FEATURE_MSG_AUTH) { // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error // here. We'll check it on the call. PLR int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq)); out_seq &= SEQ_MASK; lsubdout(msgr->cct, ms, 10) << "randomize_out_seq " << out_seq << dendl; return seq_error; } else { // previously, seq #'s always started at 0. 
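    // without CEPH_FEATURE_MSG_AUTH there is no message signing, so a
    // predictable starting sequence number is acceptable here.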
out_seq = 0; return 0; } } void Pipe::was_session_reset() { assert(pipe_lock.is_locked()); ldout(msgr->cct,10) << "was_session_reset" << dendl; in_q->discard_queue(conn_id); if (delay_thread) delay_thread->discard(); discard_out_queue(); msgr->dispatch_queue.queue_remote_reset(connection_state.get()); if (randomize_out_seq()) { lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; } in_seq = 0; connect_seq = 0; } void Pipe::stop() { ldout(msgr->cct,10) << "stop" << dendl; assert(pipe_lock.is_locked()); state = STATE_CLOSED; state_closed = true; cond.Signal(); shutdown_socket(); } void Pipe::stop_and_wait() { assert(pipe_lock.is_locked_by_me()); if (state != STATE_CLOSED) stop(); if (msgr->cct->_conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << __func__ << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; utime_t t; t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays); t.sleep(); } if (delay_thread) { pipe_lock.Unlock(); delay_thread->stop_fast_dispatching(); pipe_lock.Lock(); } while (reader_running && reader_dispatching) cond.Wait(pipe_lock); } /* read msgs from socket. * also, server. */ void Pipe::reader() { pipe_lock.Lock(); if (state == STATE_ACCEPTING) { accept(); assert(pipe_lock.is_locked()); } // loop. while (state != STATE_CLOSED && state != STATE_CONNECTING) { assert(pipe_lock.is_locked()); // sleep if (re)connecting if (state == STATE_STANDBY) { ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl; cond.Wait(pipe_lock); continue; } // get a reference to the AuthSessionHandler while we have the pipe_lock ceph::shared_ptr auth_handler = session_security; pipe_lock.Unlock(); char tag = -1; ldout(msgr->cct,20) << "reader reading tag..." << dendl; if (tcp_read((char*)&tag, 1) < 0) { pipe_lock.Lock(); ldout(msgr->cct,2) << "reader couldn't read tag, " << cpp_strerror(errno) << dendl; fault(true); continue; } if (tag == CEPH_MSGR_TAG_KEEPALIVE) { ldout(msgr->cct,2) << "reader got KEEPALIVE" << dendl; pipe_lock.Lock(); connection_state->set_last_keepalive(ceph_clock_now()); continue; } if (tag == CEPH_MSGR_TAG_KEEPALIVE2) { ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl; ceph_timespec t; int rc = tcp_read((char*)&t, sizeof(t)); pipe_lock.Lock(); if (rc < 0) { ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl; fault(true); } else { send_keepalive_ack = true; keepalive_ack_stamp = utime_t(t); ldout(msgr->cct,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp << dendl; connection_state->set_last_keepalive(ceph_clock_now()); cond.Signal(); } continue; } if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { ldout(msgr->cct,2) << "reader got KEEPALIVE_ACK" << dendl; struct ceph_timespec t; int rc = tcp_read((char*)&t, sizeof(t)); pipe_lock.Lock(); if (rc < 0) { ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl; fault(true); } else { connection_state->set_last_keepalive_ack(utime_t(t)); } continue; } // open ... 
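    // tags valid on an open connection: ACK trims the sent list, MSG reads a
    // full message off the wire, CLOSE tears the pipe down.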
if (tag == CEPH_MSGR_TAG_ACK) { ldout(msgr->cct,20) << "reader got ACK" << dendl; ceph_le64 seq; int rc = tcp_read((char*)&seq, sizeof(seq)); pipe_lock.Lock(); if (rc < 0) { ldout(msgr->cct,2) << "reader couldn't read ack seq, " << cpp_strerror(errno) << dendl; fault(true); } else if (state != STATE_CLOSED) { handle_ack(seq); } continue; } else if (tag == CEPH_MSGR_TAG_MSG) { ldout(msgr->cct,20) << "reader got MSG" << dendl; Message *m = 0; int r = read_message(&m, auth_handler.get()); pipe_lock.Lock(); if (!m) { if (r < 0) fault(true); continue; } m->trace.event("pipe read message"); if (state == STATE_CLOSED || state == STATE_CONNECTING) { in_q->dispatch_throttle_release(m->get_dispatch_throttle_size()); m->put(); continue; } // check received seq#. if it is old, drop the message. // note that incoming messages may skip ahead. this is convenient for the client // side queueing because messages can't be renumbered, but the (kernel) client will // occasionally pull a message out of the sent queue to send elsewhere. in that case // it doesn't matter if we "got" it or not. if (m->get_seq() <= in_seq) { ldout(msgr->cct,0) << "reader got old message " << m->get_seq() << " <= " << in_seq << " " << m << " " << *m << ", discarding" << dendl; in_q->dispatch_throttle_release(m->get_dispatch_throttle_size()); m->put(); if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) && msgr->cct->_conf->ms_die_on_old_message) assert(0 == "old msgs despite reconnect_seq feature"); continue; } if (m->get_seq() > in_seq + 1) { ldout(msgr->cct,0) << "reader missed message? skipped from seq " << in_seq << " to " << m->get_seq() << dendl; if (msgr->cct->_conf->ms_die_on_skipped_message) assert(0 == "skipped incoming seq"); } m->set_connection(connection_state.get()); // note last received message. in_seq = m->get_seq(); cond.Signal(); // wake up writer, to ack this ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; in_q->fast_preprocess(m); if (delay_thread) { utime_t release; if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { release = m->get_recv_stamp(); release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; } delay_thread->queue(release, m); } else { if (in_q->can_fast_dispatch(m)) { reader_dispatching = true; pipe_lock.Unlock(); in_q->fast_dispatch(m); pipe_lock.Lock(); reader_dispatching = false; if (state == STATE_CLOSED || notify_on_dispatch_done) { // there might be somebody waiting notify_on_dispatch_done = false; cond.Signal(); } } else { in_q->enqueue(m, m->get_priority(), conn_id); } } } else if (tag == CEPH_MSGR_TAG_CLOSE) { ldout(msgr->cct,20) << "reader got CLOSE" << dendl; pipe_lock.Lock(); if (state == STATE_CLOSING) { state = STATE_CLOSED; state_closed = true; } else { state = STATE_CLOSING; } cond.Signal(); break; } else { ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl; pipe_lock.Lock(); fault(true); } } // reap? reader_running = false; reader_needs_join = true; unlock_maybe_reap(); ldout(msgr->cct,10) << "reader done" << dendl; } /* write msgs to socket. * also, client. */ void Pipe::writer() { pipe_lock.Lock(); while (state != STATE_CLOSED) {// && state != STATE_WAIT) { ldout(msgr->cct,10) << "writer: state = " << get_state_name() << " policy.server=" << policy.server << dendl; // standby? 
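    // a standby pipe on the connecting (non-server) side starts reconnecting
    // as soon as there is something queued to send.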
if (is_queued() && state == STATE_STANDBY && !policy.server) state = STATE_CONNECTING; // connect? if (state == STATE_CONNECTING) { assert(!policy.server); connect(); continue; } if (state == STATE_CLOSING) { // write close tag ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl; char tag = CEPH_MSGR_TAG_CLOSE; state = STATE_CLOSED; state_closed = true; pipe_lock.Unlock(); if (sd >= 0) { // we can ignore return value, actually; we don't care if this succeeds. int r = ::write(sd, &tag, 1); (void)r; } pipe_lock.Lock(); continue; } if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY && (is_queued() || in_seq > in_seq_acked)) { // keepalive? if (send_keepalive) { int rc; if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { pipe_lock.Unlock(); rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2, ceph_clock_now()); } else { pipe_lock.Unlock(); rc = write_keepalive(); } pipe_lock.Lock(); if (rc < 0) { ldout(msgr->cct,2) << "writer couldn't write keepalive[2], " << cpp_strerror(errno) << dendl; fault(); continue; } send_keepalive = false; } if (send_keepalive_ack) { utime_t t = keepalive_ack_stamp; pipe_lock.Unlock(); int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t); pipe_lock.Lock(); if (rc < 0) { ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno) << dendl; fault(); continue; } send_keepalive_ack = false; } // send ack? if (in_seq > in_seq_acked) { uint64_t send_seq = in_seq; pipe_lock.Unlock(); int rc = write_ack(send_seq); pipe_lock.Lock(); if (rc < 0) { ldout(msgr->cct,2) << "writer couldn't write ack, " << cpp_strerror(errno) << dendl; fault(); continue; } in_seq_acked = send_seq; } // grab outgoing message Message *m = _get_next_outgoing(); if (m) { m->set_seq(++out_seq); if (!policy.lossy) { // put on sent list sent.push_back(m); m->get(); } // associate message with Connection (for benefit of encode_payload) m->set_connection(connection_state.get()); uint64_t features = connection_state->get_features(); if (m->empty_payload()) ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features << " " << m << " " << *m << dendl; else ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features << " " << m << " " << *m << dendl; // encode and copy out of *m m->encode(features, msgr->crcflags); // prepare everything const ceph_msg_header& header = m->get_header(); const ceph_msg_footer& footer = m->get_footer(); // Now that we have all the crcs calculated, handle the // digital signature for the message, if the pipe has session // security set up. Some session security options do not // actually calculate and check the signature, but they should // handle the calls to sign_message and check_signature. 
PLR if (session_security.get() == NULL) { ldout(msgr->cct, 20) << "writer no session security" << dendl; } else { if (session_security->sign_message(m)) { ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq << "): sig = " << footer.sig << dendl; } else { ldout(msgr->cct, 20) << "writer signed seq # " << header.seq << "): sig = " << footer.sig << dendl; } } bufferlist blist = m->get_payload(); blist.append(m->get_middle()); blist.append(m->get_data()); pipe_lock.Unlock(); m->trace.event("pipe writing message"); ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl; int rc = write_message(header, footer, blist); pipe_lock.Lock(); if (rc < 0) { ldout(msgr->cct,1) << "writer error sending " << m << ", " << cpp_strerror(errno) << dendl; fault(); } m->put(); } continue; } // wait ldout(msgr->cct,20) << "writer sleeping" << dendl; cond.Wait(pipe_lock); } ldout(msgr->cct,20) << "writer finishing" << dendl; // reap? writer_running = false; unlock_maybe_reap(); ldout(msgr->cct,10) << "writer done" << dendl; } void Pipe::unlock_maybe_reap() { if (!reader_running && !writer_running) { shutdown_socket(); pipe_lock.Unlock(); if (delay_thread && delay_thread->is_flushing()) { delay_thread->wait_for_flush(); } msgr->queue_reap(this); } else { pipe_lock.Unlock(); } } static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) { // create a buffer to read into that matches the data alignment unsigned left = len; if (off & ~CEPH_PAGE_MASK) { // head unsigned head = 0; head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left); data.push_back(buffer::create(head)); left -= head; } unsigned middle = left & CEPH_PAGE_MASK; if (middle > 0) { data.push_back(buffer::create_page_aligned(middle)); left -= middle; } if (left) { data.push_back(buffer::create(left)); } } int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler) { int ret = -1; // envelope //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl; ceph_msg_header header; ceph_msg_footer footer; __u32 header_crc = 0; if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { if (tcp_read((char*)&header, sizeof(header)) < 0) return -1; if (msgr->crcflags & MSG_CRC_HEADER) { header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); } } else { ceph_msg_header_old oldheader; if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0) return -1; // this is fugly memcpy(&header, &oldheader, sizeof(header)); header.src = oldheader.src.name; header.reserved = oldheader.reserved; if (msgr->crcflags & MSG_CRC_HEADER) { header.crc = oldheader.crc; header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); } } ldout(msgr->cct,20) << "reader got envelope type=" << header.type << " src " << entity_name_t(header.src) << " front=" << header.front_len << " data=" << header.data_len << " off " << header.data_off << dendl; // verify header crc if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) { ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; return -1; } bufferlist front, middle, data; int front_len, middle_len; unsigned data_len, data_off; int aborted; Message *message; utime_t recv_stamp = ceph_clock_now(); if (policy.throttler_messages) { ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler " << policy.throttler_messages->get_current() << "/" << policy.throttler_messages->get_max() << dendl; 
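    // this may block until the policy throttler has capacity; the reservation
    // is held for the lifetime of the message.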
int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
{
  int ret = -1;
  // envelope
  //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl;

  ceph_msg_header header;
  ceph_msg_footer footer;
  __u32 header_crc = 0;

  if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
    if (tcp_read((char*)&header, sizeof(header)) < 0)
      return -1;
    if (msgr->crcflags & MSG_CRC_HEADER) {
      header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
    }
  } else {
    ceph_msg_header_old oldheader;
    if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0)
      return -1;
    // this is fugly
    memcpy(&header, &oldheader, sizeof(header));
    header.src = oldheader.src.name;
    header.reserved = oldheader.reserved;
    if (msgr->crcflags & MSG_CRC_HEADER) {
      header.crc = oldheader.crc;
      header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
    }
  }

  ldout(msgr->cct,20) << "reader got envelope type=" << header.type
                      << " src " << entity_name_t(header.src)
                      << " front=" << header.front_len
                      << " data=" << header.data_len
                      << " off " << header.data_off
                      << dendl;

  // verify header crc
  if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) {
    ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
    return -1;
  }

  bufferlist front, middle, data;
  int front_len, middle_len;
  unsigned data_len, data_off;
  int aborted;
  Message *message;
  utime_t recv_stamp = ceph_clock_now();

  if (policy.throttler_messages) {
    ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
                        << policy.throttler_messages->get_current() << "/"
                        << policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->get();
  }

  uint64_t message_size = header.front_len + header.middle_len + header.data_len;
  if (message_size) {
    if (policy.throttler_bytes) {
      ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
                          << policy.throttler_bytes->get_current() << "/"
                          << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->get(message_size);
    }

    // throttle total bytes waiting for dispatch.  do this _after_ the
    // policy throttle, as this one does not deadlock (unless dispatch
    // blocks indefinitely, which it shouldn't).  in contrast, the
    // policy throttle carries for the lifetime of the message.
    ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
                        << in_q->dispatch_throttler.get_current() << "/"
                        << in_q->dispatch_throttler.get_max() << dendl;
    in_q->dispatch_throttler.get(message_size);
  }

  utime_t throttle_stamp = ceph_clock_now();

  // read front
  front_len = header.front_len;
  if (front_len) {
    bufferptr bp = buffer::create(front_len);
    if (tcp_read(bp.c_str(), front_len) < 0)
      goto out_dethrottle;
    front.push_back(std::move(bp));
    ldout(msgr->cct,20) << "reader got front " << front.length() << dendl;
  }

  // read middle
  middle_len = header.middle_len;
  if (middle_len) {
    bufferptr bp = buffer::create(middle_len);
    if (tcp_read(bp.c_str(), middle_len) < 0)
      goto out_dethrottle;
    middle.push_back(std::move(bp));
    ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl;
  }

  // read data
  data_len = le32_to_cpu(header.data_len);
  data_off = le32_to_cpu(header.data_off);
  if (data_len) {
    unsigned offset = 0;
    unsigned left = data_len;
    bufferlist newbuf, rxbuf;
    bufferlist::iterator blp;
    int rxbuf_version = 0;

    while (left > 0) {
      // wait for data
      if (tcp_read_wait() < 0)
        goto out_dethrottle;

      // get a buffer
      connection_state->lock.Lock();
      map<ceph_tid_t,pair<bufferlist,int> >::iterator p =
        connection_state->rx_buffers.find(header.tid);
      if (p != connection_state->rx_buffers.end()) {
        if (rxbuf.length() == 0 || p->second.second != rxbuf_version) {
          ldout(msgr->cct,10) << "reader selecting rx buffer v " << p->second.second
                              << " at offset " << offset
                              << " len " << p->second.first.length() << dendl;
          rxbuf = p->second.first;
          rxbuf_version = p->second.second;
          // make sure it's big enough
          if (rxbuf.length() < data_len)
            rxbuf.push_back(buffer::create(data_len - rxbuf.length()));
          blp = p->second.first.begin();
          blp.advance(offset);
        }
      } else {
        if (!newbuf.length()) {
          ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl;
          alloc_aligned_buffer(newbuf, data_len, data_off);
          blp = newbuf.begin();
          blp.advance(offset);
        }
      }
      bufferptr bp = blp.get_current_ptr();
      int read = MIN(bp.length(), left);
      ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str()
                          << " len " << bp.length() << dendl;
      ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
      ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
      connection_state->lock.Unlock();
      if (got < 0)
        goto out_dethrottle;
      if (got > 0) {
        blp.advance(got);
        data.append(bp, 0, got);
        offset += got;
        left -= got;
      }
      // else we got a signal or something; just loop.
    }
  }

  // footer
  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    if (tcp_read((char*)&footer, sizeof(footer)) < 0)
      goto out_dethrottle;
  } else {
    ceph_msg_footer_old old_footer;
    if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
      goto out_dethrottle;
    footer.front_crc = old_footer.front_crc;
    footer.middle_crc = old_footer.middle_crc;
    footer.data_crc = old_footer.data_crc;
    footer.sig = 0;
    footer.flags = old_footer.flags;
  }

  aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
  ldout(msgr->cct,10) << "aborted = " << aborted << dendl;
  if (aborted) {
    ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length()
                       << " + " << data.length() << " byte message.. ABORTED" << dendl;
    ret = 0;
    goto out_dethrottle;
  }

  ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length()
                      << " + " << data.length() << " byte message" << dendl;
  message = decode_message(msgr->cct, msgr->crcflags, header, footer, front, middle, data,
                           connection_state.get());
  if (!message) {
    ret = -EINVAL;
    goto out_dethrottle;
  }

  //
  //  Check the signature if one should be present.  A zero return indicates success. PLR
  //

  if (auth_handler == NULL) {
    ldout(msgr->cct, 10) << "No session security set" << dendl;
  } else {
    if (auth_handler->check_message_signature(message)) {
      ldout(msgr->cct, 0) << "Signature check failed" << dendl;
      message->put();
      ret = -EINVAL;
      goto out_dethrottle;
    }
  }

  message->set_byte_throttler(policy.throttler_bytes);
  message->set_message_throttler(policy.throttler_messages);

  // store reservation size in message, so we don't get confused
  // by messages entering the dispatch queue through other paths.
  message->set_dispatch_throttle_size(message_size);

  message->set_recv_stamp(recv_stamp);
  message->set_throttle_stamp(throttle_stamp);
  message->set_recv_complete_stamp(ceph_clock_now());

  *pm = message;
  return 0;

 out_dethrottle:
  // release bytes reserved from the throttlers on failure
  if (policy.throttler_messages) {
    ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
                        << policy.throttler_messages->get_current() << "/"
                        << policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->put();
  }
  if (message_size) {
    if (policy.throttler_bytes) {
      ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
                          << policy.throttler_bytes->get_current() << "/"
                          << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->put(message_size);
    }

    in_q->dispatch_throttle_release(message_size);
  }
  return ret;
}
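/*
 * Write out a whole msghdr with ::sendmsg(), retrying on short writes
 * by trimming already-sent bytes off the front of the iovec.  Returns
 * 0 on success, a negative errno on a send error, or -EINTR if the
 * pipe was closed underneath us.
 */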
int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more)
{
  MSGR_SIGPIPE_STOPPER;
  while (len > 0) {
    int r;
    r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
    if (r == 0)
      ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(r) << dendl;
      return r;
    }
    if (state == STATE_CLOSED) {
      ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
      return -EINTR; // close enough
    }

    len -= r;
    if (len == 0)
      break;

    // hrmph.  trim r bytes off the front of our message.
    ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
    while (r > 0) {
      if (msg->msg_iov[0].iov_len <= (size_t)r) {
        // lose this whole item
        //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
        r -= msg->msg_iov[0].iov_len;
        msg->msg_iov++;
        msg->msg_iovlen--;
      } else {
        // partial!
        //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
        msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r;
        msg->msg_iov[0].iov_len -= r;
        break;
      }
    }
  }
  return 0;
}
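/*
 * The small control frames below are each a one-byte tag followed by a
 * fixed-size payload: write_ack sends TAG_ACK plus the acked sequence
 * number as a little-endian 64-bit value, write_keepalive sends just
 * TAG_KEEPALIVE, and write_keepalive2 sends the given tag plus a
 * ceph_timespec timestamp.  Each returns 0 on success and -1 on a send
 * error.
 */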
int Pipe::write_ack(uint64_t seq)
{
  ldout(msgr->cct,10) << "write_ack " << seq << dendl;

  char c = CEPH_MSGR_TAG_ACK;
  ceph_le64 s;
  s = seq;

  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  struct iovec msgvec[2];
  msgvec[0].iov_base = &c;
  msgvec[0].iov_len = 1;
  msgvec[1].iov_base = &s;
  msgvec[1].iov_len = sizeof(s);
  msg.msg_iov = msgvec;
  msg.msg_iovlen = 2;

  if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0)
    return -1;
  return 0;
}

int Pipe::write_keepalive()
{
  ldout(msgr->cct,10) << "write_keepalive" << dendl;

  char c = CEPH_MSGR_TAG_KEEPALIVE;

  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  struct iovec msgvec[2];
  msgvec[0].iov_base = &c;
  msgvec[0].iov_len = 1;
  msg.msg_iov = msgvec;
  msg.msg_iovlen = 1;

  if (do_sendmsg(&msg, 1) < 0)
    return -1;
  return 0;
}

int Pipe::write_keepalive2(char tag, const utime_t& t)
{
  ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;

  struct ceph_timespec ts;
  t.encode_timeval(&ts);

  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  struct iovec msgvec[2];
  msgvec[0].iov_base = &tag;
  msgvec[0].iov_len = 1;
  msgvec[1].iov_base = &ts;
  msgvec[1].iov_len = sizeof(ts);
  msg.msg_iov = msgvec;
  msg.msg_iovlen = 2;

  if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
    return -1;
  return 0;
}

int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer,
                        bufferlist& blist)
{
  int ret;

  // set up msghdr and iovecs
  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  msg.msg_iov = msgvec;
  int msglen = 0;

  // send tag
  char tag = CEPH_MSGR_TAG_MSG;
  msgvec[msg.msg_iovlen].iov_base = &tag;
  msgvec[msg.msg_iovlen].iov_len = 1;
  msglen++;
  msg.msg_iovlen++;

  // send envelope
  ceph_msg_header_old oldheader;
  if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
    msgvec[msg.msg_iovlen].iov_base = (char*)&header;
    msgvec[msg.msg_iovlen].iov_len = sizeof(header);
    msglen += sizeof(header);
    msg.msg_iovlen++;
  } else {
    memcpy(&oldheader, &header, sizeof(header));
    oldheader.src.name = header.src;
    oldheader.src.addr = connection_state->get_peer_addr();
    oldheader.orig_src = oldheader.src;
    oldheader.reserved = header.reserved;
    if (msgr->crcflags & MSG_CRC_HEADER) {
      oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
                                  sizeof(oldheader) - sizeof(oldheader.crc));
    } else {
      oldheader.crc = 0;
    }
    msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
    msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
    msglen += sizeof(oldheader);
    msg.msg_iovlen++;
  }

  // payload (front+data)
  list<bufferptr>::const_iterator pb = blist.buffers().begin();
  unsigned b_off = 0;   // carry-over buffer offset, if any
  unsigned bl_pos = 0;  // blist pos
  unsigned left = blist.length();

  while (left > 0) {
    unsigned donow = MIN(left, pb->length()-b_off);
    if (donow == 0) {
      ldout(msgr->cct,0) << "donow = " << donow << " left " << left
                         << " pb->length " << pb->length()
                         << " b_off " << b_off << dendl;
    }
    assert(donow > 0);
    ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off
                        << " leftinchunk " << left
                        << " buffer len " << pb->length()
                        << " writing " << donow
                        << dendl;

    if (msg.msg_iovlen >= SM_IOV_MAX-2) {
      if (do_sendmsg(&msg, msglen, true))
        goto fail;

      // and restart the iov
      msg.msg_iov = msgvec;
      msg.msg_iovlen = 0;
      msglen = 0;
    }

    msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
    msgvec[msg.msg_iovlen].iov_len = donow;
    msglen += donow;
    msg.msg_iovlen++;

    assert(left >= donow);
    left -= donow;
    b_off += donow;
    bl_pos += donow;
    if (left == 0)
      break;
    while (b_off == pb->length()) {
      ++pb;
      b_off = 0;
    }
  }
  assert(left == 0);

  // send footer; if receiver doesn't support signatures, use the old footer format
  ceph_msg_footer_old old_footer;
  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
    msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
    msglen += sizeof(footer);
    msg.msg_iovlen++;
  } else {
    if (msgr->crcflags & MSG_CRC_HEADER) {
      old_footer.front_crc = footer.front_crc;
      old_footer.middle_crc = footer.middle_crc;
    } else {
      old_footer.front_crc = old_footer.middle_crc = 0;
    }
    old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
    old_footer.flags = footer.flags;
    msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
    msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
    msglen += sizeof(old_footer);
    msg.msg_iovlen++;
  }

  // send
  if (do_sendmsg(&msg, msglen))
    goto fail;

  ret = 0;

 out:
  return ret;

 fail:
  ret = -1;
  goto out;
}
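/*
 * Blocking read of exactly len bytes: wait for readability with
 * tcp_read_wait() (bounded by the messenger timeout), then pull bytes
 * through the buffered, non-blocking path until the request is
 * satisfied.  Returns 0 on success, -1 on error or EOF, and -EINVAL if
 * the socket is gone.  May deliberately shut the socket down when
 * ms_inject_socket_failures is enabled.
 */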
int Pipe::tcp_read(char *buf, unsigned len)
{
  if (sd < 0)
    return -EINVAL;

  while (len > 0) {

    if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
      if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
        ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
        ::shutdown(sd, SHUT_RDWR);
      }
    }

    if (tcp_read_wait() < 0)
      return -1;

    ssize_t got = tcp_read_nonblocking(buf, len);

    if (got < 0)
      return -1;

    len -= got;
    buf += got;
    //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
  }
  return 0;
}

int Pipe::tcp_read_wait()
{
  if (sd < 0)
    return -EINVAL;
  struct pollfd pfd;
  short evmask;
  pfd.fd = sd;
  pfd.events = POLLIN;
#if defined(__linux__)
  pfd.events |= POLLRDHUP;
#endif

  if (has_pending_data())
    return 0;

  int r = poll(&pfd, 1, msgr->timeout);
  if (r < 0)
    return -errno;
  if (r == 0)
    return -EAGAIN;

  evmask = POLLERR | POLLHUP | POLLNVAL;
#if defined(__linux__)
  evmask |= POLLRDHUP;
#endif
  if (pfd.revents & evmask)
    return -1;

  if (!(pfd.revents & POLLIN))
    return -1;

  return 0;
}

ssize_t Pipe::do_recv(char *buf, size_t len, int flags)
{
 again:
  ssize_t got = ::recv( sd, buf, len, flags );
  if (got < 0) {
    if (errno == EINTR) {
      goto again;
    }
    ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
                         << got << " " << cpp_strerror(errno) << dendl;
    return -1;
  }
  if (got == 0) {
    return -1;
  }
  return got;
}

ssize_t Pipe::buffered_recv(char *buf, size_t len, int flags)
{
  size_t left = len;
  ssize_t total_recv = 0;
  if (recv_len > recv_ofs) {
    int to_read = MIN(recv_len - recv_ofs, left);
    memcpy(buf, &recv_buf[recv_ofs], to_read);
    recv_ofs += to_read;
    left -= to_read;
    if (left == 0) {
      return to_read;
    }
    buf += to_read;
    total_recv += to_read;
  }

  /* nothing left in the prefetch buffer */

  if (left > recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    ssize_t ret = do_recv(buf, left, flags );
    if (ret < 0) {
      if (total_recv > 0)
        return total_recv;
      return ret;
    }
    total_recv += ret;
    return total_recv;
  }

  ssize_t got = do_recv(recv_buf, recv_max_prefetch, flags);
  if (got < 0) {
    if (total_recv > 0)
      return total_recv;
    return got;
  }

  recv_len = (size_t)got;
  got = MIN(left, (size_t)got);
  memcpy(buf, recv_buf, got);
  recv_ofs = got;
  total_recv += got;
  return total_recv;
}

ssize_t Pipe::tcp_read_nonblocking(char *buf, unsigned len)
{
  ssize_t got = buffered_recv(buf, len, MSG_DONTWAIT );
  if (got < 0) {
    ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
                         << got << " " << cpp_strerror(errno) << dendl;
    return -1;
  }
  if (got == 0) {
    /* poll() said there was data, but we didn't read any - peer
     * sent a FIN.  Maybe POLLRDHUP signals this, but this is
     * standard socket behavior as documented by Stevens.
     */
    return -1;
  }
  return got;
}
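/*
 * Blocking write of exactly len bytes: poll for writability (with no
 * timeout), then loop on ::send() until everything has been pushed out.
 * Returns 0 on success and a negative value on error.
 */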
int Pipe::tcp_write(const char *buf, unsigned len)
{
  if (sd < 0)
    return -1;
  struct pollfd pfd;
  pfd.fd = sd;
  pfd.events = POLLOUT | POLLHUP | POLLNVAL | POLLERR;
#if defined(__linux__)
  pfd.events |= POLLRDHUP;
#endif

  if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
    if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
      ::shutdown(sd, SHUT_RDWR);
    }
  }

  if (poll(&pfd, 1, -1) < 0)
    return -1;

  if (!(pfd.revents & POLLOUT))
    return -1;

  //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
  assert(len > 0);
  while (len > 0) {
    MSGR_SIGPIPE_STOPPER;
    int did = ::send( sd, buf, len, MSG_NOSIGNAL );
    if (did < 0) {
      //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
      //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
      return did;
    }
    len -= did;
    buf += did;
    //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
  }
  return 0;
}