diff --git a/src/ceph/src/msg/async/AsyncConnection.cc b/src/ceph/src/msg/async/AsyncConnection.cc
new file mode 100644
index 0000000..e7895dd
--- /dev/null
+++ b/src/ceph/src/msg/async/AsyncConnection.cc
@@ -0,0 +1,2546 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <unistd.h>
+
+#include "include/Context.h"
+#include "common/errno.h"
+#include "AsyncMessenger.h"
+#include "AsyncConnection.h"
+
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "common/EventTrace.h"
+
+// Constant to limit starting sequence number to 2^31.  Nothing special about it, just a big number.  PLR
+#define SEQ_MASK  0x7fffffff
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix _conn_prefix(_dout)
+ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
+  return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
+                << " :" << port
+                << " s=" << get_state_name(state)
+                << " pgs=" << peer_global_seq
+                << " cs=" << connect_seq
+                << " l=" << policy.lossy
+                << ").";
+}
+
+// Notes:
+// 1. Don't dispatch any event when closed! It may keep the AsyncConnection
+//    alive even though the AsyncMessenger is already dead.
+
+const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
+const int ASYNC_COALESCE_THRESHOLD = 256;
+
+class C_time_wakeup : public EventCallback {
+  AsyncConnectionRef conn;
+
+ public:
+  explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
+  void do_request(int fd_or_id) override {
+    conn->wakeup_from(fd_or_id);
+  }
+};
+
+class C_handle_read : public EventCallback {
+  AsyncConnectionRef conn;
+
+ public:
+  explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
+  void do_request(int fd_or_id) override {
+    conn->process();
+  }
+};
+
+class C_handle_write : public EventCallback {
+  AsyncConnectionRef conn;
+
+ public:
+  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
+  void do_request(int fd) override {
+    conn->handle_write();
+  }
+};
+
+class C_clean_handler : public EventCallback {
+  AsyncConnectionRef conn;
+ public:
+  explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
+  void do_request(int id) override {
+    conn->cleanup();
+    delete this;
+  }
+};
+
+class C_tick_wakeup : public EventCallback {
+  AsyncConnectionRef conn;
+
+ public:
+  explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
+  void do_request(int fd_or_id) override {
+    conn->tick(fd_or_id);
+  }
+};
+
+static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
+{
+  // create a buffer to read into that matches the data alignment
+  unsigned left = len;
+  if (off & ~CEPH_PAGE_MASK) {
+    // head
+    unsigned head = 0;
+    head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
+    data.push_back(buffer::create(head));
+    left -= head;
+  }
+  unsigned middle = left & CEPH_PAGE_MASK;
+  if (middle > 0) {
+    data.push_back(buffer::create_page_aligned(middle));
+    left -= middle;
+  }
+  if (left) {
+    data.push_back(buffer::create(left));
+  }
+}
+
+AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
+                                 Worker *w)
+  : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
+    logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
+    state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
+    dispatch_queue(q), can_write(WriteStatus::NOWRITE),
+    keepalive(false), recv_buf(NULL),
+    recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
+    recv_start(0), recv_end(0),
+    last_active(ceph::coarse_mono_clock::now()),
+    inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
+    got_bad_auth(false), authorizer(NULL), replacing(false),
+    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
+    worker(w), center(&w->center)
+{
+  read_handler = new C_handle_read(this);
+  write_handler = new C_handle_write(this);
+  wakeup_handler = new C_time_wakeup(this);
+  tick_handler = new C_tick_wakeup(this);
+  memset(msgvec, 0, sizeof(msgvec));
+  // recv_buf is double recv_max_prefetch; see "read_until"
+  recv_buf = new char[2*recv_max_prefetch];
+  state_buffer = new char[4096];
+  logger->inc(l_msgr_created_connections);
+}
+
+AsyncConnection::~AsyncConnection()
+{
+  assert(out_q.empty());
+  assert(sent.empty());
+  delete authorizer;
+  if (recv_buf)
+    delete[] recv_buf;
+  if (state_buffer)
+    delete[] state_buffer;
+  assert(!delay_state);
+}
+
+void AsyncConnection::maybe_start_delay_thread()
+{
+  if (!delay_state) {
+    auto pos = async_msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(peer_type));
+    if (pos != string::npos) {
+      ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl;
+      delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue, conn_id);
+    }
+  }
+}
+
+/* return -1 if `fd` hits an error or is closed by the peer (the caller should
+ * then close it); return 0 on EAGAIN or EINTR */
+ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
+{
+  ssize_t nread;
+ again:
+  nread = cs.read(buf, len);
+  if (nread < 0) {
+    if (nread == -EAGAIN) {
+      nread = 0;
+    } else if (nread == -EINTR) {
+      goto again;
+    } else {
+      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
+                                << " : " << cpp_strerror(nread) << dendl;
+      return -1;
+    }
+  } else if (nread == 0) {
+    ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
+                              << cs.fd() << dendl;
+    return -1;
+  }
+  return nread;
+}
+
+// return the remaining queued bytes (may be larger than the length just
+// submitted), or < 0 on error
+ssize_t AsyncConnection::_try_send(bool more)
+{
+  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
+    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
+      cs.shutdown();
+    }
+  }
+
+  assert(center->in_thread());
+  ssize_t r = cs.send(outcoming_bl, more);
+  if (r < 0) {
+    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
+                             << " remaining bytes " << outcoming_bl.length() << dendl;
+
+  if (!open_write && is_queued()) {
+    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
+    open_write = true;
+  }
+
+  if (open_write && !is_queued()) {
+    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
+    open_write = false;
+    if (state_after_send != STATE_NONE)
+      center->dispatch_event_external(read_handler);
+  }
+
+  return outcoming_bl.length();
+}
+
+// Because this function will be called multiple times to populate
+// the needed buffer, the passed-in bufferptr must stay the same.
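+// (state_offset records how much of the requested length has already been
+// consumed, so on a positive return the caller must retry later with the
+// same destination buffer until 0 is returned.)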
+// Normally, only "read_message" will pass existing bufferptr in +// +// And it will uses readahead method to reduce small read overhead, +// "recv_buf" is used to store read buffer +// +// return the remaining bytes, 0 means this buffer is finished +// else return < 0 means error +ssize_t AsyncConnection::read_until(unsigned len, char *p) +{ + ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is " + << state_offset << dendl; + + if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) { + if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { + ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl; + cs.shutdown(); + } + } + + ssize_t r = 0; + uint64_t left = len - state_offset; + if (recv_end > recv_start) { + uint64_t to_read = MIN(recv_end - recv_start, left); + memcpy(p, recv_buf+recv_start, to_read); + recv_start += to_read; + left -= to_read; + ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer " + << " left is " << left << " buffer still has " + << recv_end - recv_start << dendl; + if (left == 0) { + return 0; + } + state_offset += to_read; + } + + recv_end = recv_start = 0; + /* nothing left in the prefetch buffer */ + if (len > recv_max_prefetch) { + /* this was a large read, we don't prefetch for these */ + do { + r = read_bulk(p+state_offset, left); + ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl; + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl; + return -1; + } else if (r == static_cast(left)) { + state_offset = 0; + return 0; + } + state_offset += r; + left -= r; + } while (r > 0); + } else { + do { + r = read_bulk(recv_buf+recv_end, recv_max_prefetch); + ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end + << " left is " << left << " got " << r << dendl; + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl; + return -1; + } + recv_end += r; + if (r >= static_cast(left)) { + recv_start = len - state_offset; + memcpy(p+state_offset, recv_buf, recv_start); + state_offset = 0; + return 0; + } + left -= r; + } while (r > 0); + memcpy(p+state_offset, recv_buf, recv_end-recv_start); + state_offset += (recv_end - recv_start); + recv_end = recv_start = 0; + } + ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining " + << len - state_offset << " bytes" << dendl; + return len - state_offset; +} + +void AsyncConnection::inject_delay() { + if (async_msgr->cct->_conf->ms_inject_internal_delays) { + ldout(async_msgr->cct, 10) << __func__ << " sleep for " << + async_msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } +} + +void AsyncConnection::process() +{ + ssize_t r = 0; + int prev_state = state; +#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) + utime_t ltt_recv_stamp = ceph_clock_now(); +#endif + bool need_dispatch_writer = false; + std::lock_guard l(lock); + last_active = ceph::coarse_mono_clock::now(); + auto recv_start_time = ceph::mono_clock::now(); + do { + ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl; + prev_state = state; + switch (state) { + case STATE_OPEN: + { + char tag = -1; + r = read_until(sizeof(tag), &tag); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read tag failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + if (tag 
== CEPH_MSGR_TAG_KEEPALIVE) { + ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl; + set_last_keepalive(ceph_clock_now()); + } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) { + state = STATE_OPEN_KEEPALIVE2; + } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + state = STATE_OPEN_KEEPALIVE2_ACK; + } else if (tag == CEPH_MSGR_TAG_ACK) { + state = STATE_OPEN_TAG_ACK; + } else if (tag == CEPH_MSGR_TAG_MSG) { + state = STATE_OPEN_MESSAGE_HEADER; + } else if (tag == CEPH_MSGR_TAG_CLOSE) { + state = STATE_OPEN_TAG_CLOSE; + } else { + ldout(async_msgr->cct, 0) << __func__ << " bad tag " << (int)tag << dendl; + goto fail; + } + + break; + } + + case STATE_OPEN_KEEPALIVE2: + { + ceph_timespec *t; + r = read_until(sizeof(*t), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read keeplive timespec failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; + t = (ceph_timespec*)state_buffer; + utime_t kp_t = utime_t(*t); + write_lock.lock(); + _append_keepalive_or_ack(true, &kp_t); + write_lock.unlock(); + ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl; + set_last_keepalive(ceph_clock_now()); + need_dispatch_writer = true; + state = STATE_OPEN; + break; + } + + case STATE_OPEN_KEEPALIVE2_ACK: + { + ceph_timespec *t; + r = read_until(sizeof(*t), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read keeplive timespec failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + t = (ceph_timespec*)state_buffer; + set_last_keepalive_ack(utime_t(*t)); + ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl; + state = STATE_OPEN; + break; + } + + case STATE_OPEN_TAG_ACK: + { + ceph_le64 *seq; + r = read_until(sizeof(*seq), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + seq = (ceph_le64*)state_buffer; + ldout(async_msgr->cct, 20) << __func__ << " got ACK" << dendl; + handle_ack(*seq); + state = STATE_OPEN; + break; + } + + case STATE_OPEN_MESSAGE_HEADER: + { +#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) + ltt_recv_stamp = ceph_clock_now(); +#endif + recv_stamp = ceph_clock_now(); + ldout(async_msgr->cct, 20) << __func__ << " begin MSG" << dendl; + ceph_msg_header header; + ceph_msg_header_old oldheader; + __u32 header_crc = 0; + unsigned len; + if (has_feature(CEPH_FEATURE_NOSRCADDR)) + len = sizeof(header); + else + len = sizeof(oldheader); + + r = read_until(len, state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read message header failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + ldout(async_msgr->cct, 20) << __func__ << " got MSG header" << dendl; + + if (has_feature(CEPH_FEATURE_NOSRCADDR)) { + header = *((ceph_msg_header*)state_buffer); + if (msgr->crcflags & MSG_CRC_HEADER) + header_crc = ceph_crc32c(0, (unsigned char *)&header, + sizeof(header) - sizeof(header.crc)); + } else { + oldheader = *((ceph_msg_header_old*)state_buffer); + // this is fugly + memcpy(&header, &oldheader, sizeof(header)); + header.src = oldheader.src.name; + header.reserved = oldheader.reserved; + if (msgr->crcflags & MSG_CRC_HEADER) { + header.crc = oldheader.crc; + header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); + } + } + + ldout(async_msgr->cct, 20) << __func__ << " got envelope type=" << header.type + 
<< " src " << entity_name_t(header.src) + << " front=" << header.front_len + << " data=" << header.data_len + << " off " << header.data_off << dendl; + + // verify header crc + if (msgr->crcflags & MSG_CRC_HEADER && header_crc != header.crc) { + ldout(async_msgr->cct,0) << __func__ << " got bad header crc " + << header_crc << " != " << header.crc << dendl; + goto fail; + } + + // Reset state + data_buf.clear(); + front.clear(); + middle.clear(); + data.clear(); + current_header = header; + state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE; + break; + } + + case STATE_OPEN_MESSAGE_THROTTLE_MESSAGE: + { + if (policy.throttler_messages) { + ldout(async_msgr->cct, 10) << __func__ << " wants " << 1 << " message from policy throttler " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << dendl; + if (!policy.throttler_messages->get_or_fail()) { + ldout(async_msgr->cct, 10) << __func__ << " wants 1 message from policy throttle " + << policy.throttler_messages->get_current() << "/" + << policy.throttler_messages->get_max() << " failed, just wait." << dendl; + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (register_time_events.empty()) + register_time_events.insert(center->create_time_event(1000, wakeup_handler)); + break; + } + } + + state = STATE_OPEN_MESSAGE_THROTTLE_BYTES; + break; + } + + case STATE_OPEN_MESSAGE_THROTTLE_BYTES: + { + cur_msg_size = current_header.front_len + current_header.middle_len + current_header.data_len; + if (cur_msg_size) { + if (policy.throttler_bytes) { + ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << dendl; + if (!policy.throttler_bytes->get_or_fail(cur_msg_size)) { + ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler " + << policy.throttler_bytes->get_current() << "/" + << policy.throttler_bytes->get_max() << " failed, just wait." << dendl; + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (register_time_events.empty()) + register_time_events.insert(center->create_time_event(1000, wakeup_handler)); + break; + } + } + } + + state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE; + break; + } + + case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE: + { + if (cur_msg_size) { + if (!dispatch_queue->dispatch_throttler.get_or_fail(cur_msg_size)) { + ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from dispatch throttle " + << dispatch_queue->dispatch_throttler.get_current() << "/" + << dispatch_queue->dispatch_throttler.get_max() << " failed, just wait." << dendl; + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. 
+          if (register_time_events.empty())
+            register_time_events.insert(center->create_time_event(1000, wakeup_handler));
+          break;
+        }
+      }
+
+      throttle_stamp = ceph_clock_now();
+      state = STATE_OPEN_MESSAGE_READ_FRONT;
+      break;
+    }
+
+    case STATE_OPEN_MESSAGE_READ_FRONT:
+      {
+        // read front
+        unsigned front_len = current_header.front_len;
+        if (front_len) {
+          if (!front.length())
+            front.push_back(buffer::create(front_len));
+
+          r = read_until(front_len, front.c_str());
+          if (r < 0) {
+            ldout(async_msgr->cct, 1) << __func__ << " read message front failed" << dendl;
+            goto fail;
+          } else if (r > 0) {
+            break;
+          }
+
+          ldout(async_msgr->cct, 20) << __func__ << " got front " << front.length() << dendl;
+        }
+        state = STATE_OPEN_MESSAGE_READ_MIDDLE;
+      }
+
+    case STATE_OPEN_MESSAGE_READ_MIDDLE:
+      {
+        // read middle
+        unsigned middle_len = current_header.middle_len;
+        if (middle_len) {
+          if (!middle.length())
+            middle.push_back(buffer::create(middle_len));
+
+          r = read_until(middle_len, middle.c_str());
+          if (r < 0) {
+            ldout(async_msgr->cct, 1) << __func__ << " read message middle failed" << dendl;
+            goto fail;
+          } else if (r > 0) {
+            break;
+          }
+          ldout(async_msgr->cct, 20) << __func__ << " got middle " << middle.length() << dendl;
+        }
+
+        state = STATE_OPEN_MESSAGE_READ_DATA_PREPARE;
+      }
+
+    case STATE_OPEN_MESSAGE_READ_DATA_PREPARE:
+      {
+        // read data
+        unsigned data_len = le32_to_cpu(current_header.data_len);
+        unsigned data_off = le32_to_cpu(current_header.data_off);
+        if (data_len) {
+          // get a buffer
+          map<ceph_tid_t, pair<bufferlist, int> >::iterator p = rx_buffers.find(current_header.tid);
+          if (p != rx_buffers.end()) {
+            ldout(async_msgr->cct,10) << __func__ << " selecting rx buffer v " << p->second.second
+                                      << " at offset " << data_off
+                                      << " len " << p->second.first.length() << dendl;
+            data_buf = p->second.first;
+            // make sure it's big enough
+            if (data_buf.length() < data_len)
+              data_buf.push_back(buffer::create(data_len - data_buf.length()));
+            data_blp = data_buf.begin();
+          } else {
+            ldout(async_msgr->cct,20) << __func__ << " allocating new rx buffer at offset " << data_off << dendl;
+            alloc_aligned_buffer(data_buf, data_len, data_off);
+            data_blp = data_buf.begin();
+          }
+        }
+
+        msg_left = data_len;
+        state = STATE_OPEN_MESSAGE_READ_DATA;
+      }
+
+    case STATE_OPEN_MESSAGE_READ_DATA:
+      {
+        while (msg_left > 0) {
+          bufferptr bp = data_blp.get_current_ptr();
+          unsigned read = MIN(bp.length(), msg_left);
+          r = read_until(read, bp.c_str());
+          if (r < 0) {
+            ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl;
+            goto fail;
+          } else if (r > 0) {
+            break;
+          }
+
+          data_blp.advance(read);
+          data.append(bp, 0, read);
+          msg_left -= read;
+        }
+
+        if (msg_left > 0)
+          break;
+
+        state = STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH;
+      }
+
+    case STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH:
+      {
+        ceph_msg_footer footer;
+        ceph_msg_footer_old old_footer;
+        unsigned len;
+        // footer
+        if (has_feature(CEPH_FEATURE_MSG_AUTH))
+          len = sizeof(footer);
+        else
+          len = sizeof(old_footer);
+
+        r = read_until(len, state_buffer);
+        if (r < 0) {
+          ldout(async_msgr->cct, 1) << __func__ << " read footer data error " << dendl;
+          goto fail;
+        } else if (r > 0) {
+          break;
+        }
+
+        if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
+          footer = *((ceph_msg_footer*)state_buffer);
+        } else {
+          old_footer = *((ceph_msg_footer_old*)state_buffer);
+          footer.front_crc = old_footer.front_crc;
+          footer.middle_crc = old_footer.middle_crc;
+          footer.data_crc = old_footer.data_crc;
+          footer.sig = 0;
+          footer.flags = old_footer.flags;
+        }
+        int aborted =
(footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; + ldout(async_msgr->cct, 10) << __func__ << " aborted = " << aborted << dendl; + if (aborted) { + ldout(async_msgr->cct, 0) << __func__ << " got " << front.length() << " + " << middle.length() << " + " << data.length() + << " byte message.. ABORTED" << dendl; + goto fail; + } + + ldout(async_msgr->cct, 20) << __func__ << " got " << front.length() << " + " << middle.length() + << " + " << data.length() << " byte message" << dendl; + Message *message = decode_message(async_msgr->cct, async_msgr->crcflags, current_header, footer, + front, middle, data, this); + if (!message) { + ldout(async_msgr->cct, 1) << __func__ << " decode message failed " << dendl; + goto fail; + } + + // + // Check the signature if one should be present. A zero return indicates success. PLR + // + + if (session_security.get() == NULL) { + ldout(async_msgr->cct, 10) << __func__ << " no session security set" << dendl; + } else { + if (session_security->check_message_signature(message)) { + ldout(async_msgr->cct, 0) << __func__ << " Signature check failed" << dendl; + message->put(); + goto fail; + } + } + message->set_byte_throttler(policy.throttler_bytes); + message->set_message_throttler(policy.throttler_messages); + + // store reservation size in message, so we don't get confused + // by messages entering the dispatch queue through other paths. + message->set_dispatch_throttle_size(cur_msg_size); + + message->set_recv_stamp(recv_stamp); + message->set_throttle_stamp(throttle_stamp); + message->set_recv_complete_stamp(ceph_clock_now()); + + // check received seq#. if it is old, drop the message. + // note that incoming messages may skip ahead. this is convenient for the client + // side queueing because messages can't be renumbered, but the (kernel) client will + // occasionally pull a message out of the sent queue to send elsewhere. in that case + // it doesn't matter if we "got" it or not. + uint64_t cur_seq = in_seq; + if (message->get_seq() <= cur_seq) { + ldout(async_msgr->cct,0) << __func__ << " got old message " + << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message + << ", discarding" << dendl; + message->put(); + if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message) + assert(0 == "old msgs despite reconnect_seq feature"); + break; + } + if (message->get_seq() > cur_seq + 1) { + ldout(async_msgr->cct, 0) << __func__ << " missed message? skipped from seq " + << cur_seq << " to " << message->get_seq() << dendl; + if (async_msgr->cct->_conf->ms_die_on_skipped_message) + assert(0 == "skipped incoming seq"); + } + + message->set_connection(this); + +#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) + if (message->get_type() == CEPH_MSG_OSD_OP || message->get_type() == CEPH_MSG_OSD_OPREPLY) { + utime_t ltt_processed_stamp = ceph_clock_now(); + double usecs_elapsed = (ltt_processed_stamp.to_nsec()-ltt_recv_stamp.to_nsec())/1000; + ostringstream buf; + if (message->get_type() == CEPH_MSG_OSD_OP) + OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP", false); + else + OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY", false); + } +#endif + + // note last received message. 
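+      // in_seq is the sequence number that will be acked back to the peer;
+      // acks are batched via ack_left and written out by the write handler.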
+      in_seq = message->get_seq();
+      ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq "
+                                << message->get_seq() << " " << message
+                                << " " << *message << dendl;
+
+      if (!policy.lossy) {
+        ack_left++;
+        need_dispatch_writer = true;
+      }
+      state = STATE_OPEN;
+
+      logger->inc(l_msgr_recv_messages);
+      logger->inc(l_msgr_recv_bytes, cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+
+      async_msgr->ms_fast_preprocess(message);
+      auto fast_dispatch_time = ceph::mono_clock::now();
+      logger->tinc(l_msgr_running_recv_time, fast_dispatch_time - recv_start_time);
+      if (delay_state) {
+        utime_t release = message->get_recv_stamp();
+        double delay_period = 0;
+        if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+          delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+          release += delay_period;
+          ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on "
+                                    << message << " " << *message << dendl;
+        }
+        delay_state->queue(delay_period, release, message);
+      } else if (async_msgr->ms_can_fast_dispatch(message)) {
+        lock.unlock();
+        dispatch_queue->fast_dispatch(message);
+        recv_start_time = ceph::mono_clock::now();
+        logger->tinc(l_msgr_running_fast_dispatch_time,
+                     recv_start_time - fast_dispatch_time);
+        lock.lock();
+      } else {
+        dispatch_queue->enqueue(message, message->get_priority(), conn_id);
+      }
+
+      break;
+    }
+
+    case STATE_OPEN_TAG_CLOSE:
+      {
+        ldout(async_msgr->cct, 20) << __func__ << " got CLOSE" << dendl;
+        _stop();
+        return;
+      }
+
+    case STATE_STANDBY:
+      {
+        ldout(async_msgr->cct, 20) << __func__ << " enter STANDBY" << dendl;
+        break;
+      }
+
+    case STATE_NONE:
+      {
+        ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
+        break;
+      }
+
+    case STATE_CLOSED:
+      {
+        ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
+        break;
+      }
+
+    case STATE_WAIT:
+      {
+        ldout(async_msgr->cct, 1) << __func__ << " enter wait state, failing" << dendl;
+        goto fail;
+      }
+
+    default:
+      {
+        if (_process_connection() < 0)
+          goto fail;
+        break;
+      }
+    }
+  } while (prev_state != state);
+
+  if (need_dispatch_writer && is_connected())
+    center->dispatch_event_external(write_handler);
+
+  logger->tinc(l_msgr_running_recv_time, ceph::mono_clock::now() - recv_start_time);
+  return;
+
+ fail:
+  fault();
+}
+
+ssize_t AsyncConnection::_process_connection()
+{
+  ssize_t r = 0;
+
+  switch(state) {
+    case STATE_WAIT_SEND:
+      {
+        std::lock_guard<std::mutex> l(write_lock);
+        if (!outcoming_bl.length()) {
+          assert(state_after_send);
+          state = state_after_send;
+          state_after_send = STATE_NONE;
+        }
+        break;
+      }
+
+    case STATE_CONNECTING:
+      {
+        assert(!policy.server);
+
+        // reset connect state variables
+        got_bad_auth = false;
+        delete authorizer;
+        authorizer = NULL;
+        authorizer_buf.clear();
+        memset(&connect_msg, 0, sizeof(connect_msg));
+        memset(&connect_reply, 0, sizeof(connect_reply));
+
+        global_seq = async_msgr->get_global_seq();
+        // close old socket. this is safe because we stopped the reader thread above.
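+        // (delete any events registered on the old fd before closing the
+        //  socket, so the event center never fires on a dead fd)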
+ if (cs) { + center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); + cs.close(); + } + + SocketOptions opts; + opts.priority = async_msgr->get_socket_priority(); + opts.connect_bind_addr = msgr->get_myaddr(); + r = worker->connect(get_peer_addr(), opts, &cs); + if (r < 0) + goto fail; + + center->create_file_event(cs.fd(), EVENT_READABLE, read_handler); + state = STATE_CONNECTING_RE; + break; + } + + case STATE_CONNECTING_RE: + { + r = cs.is_connected(); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl; + if (r == -ECONNREFUSED) { + ldout(async_msgr->cct, 2) << __func__ << " connection refused!" << dendl; + dispatch_queue->queue_refused(this); + } + goto fail; + } else if (r == 0) { + ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl; + if (async_msgr->get_stack()->nonblock_connect_need_writable_event()) + center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler); + break; + } + + center->delete_file_event(cs.fd(), EVENT_WRITABLE); + ldout(async_msgr->cct, 10) << __func__ << " connect successfully, ready to send banner" << dendl; + + bufferlist bl; + bl.append(CEPH_BANNER, strlen(CEPH_BANNER)); + r = try_send(bl); + if (r == 0) { + state = STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY; + ldout(async_msgr->cct, 10) << __func__ << " connect write banner done: " + << get_peer_addr() << dendl; + } else if (r > 0) { + state = STATE_WAIT_SEND; + state_after_send = STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY; + ldout(async_msgr->cct, 10) << __func__ << " connect wait for write banner: " + << get_peer_addr() << dendl; + } else { + goto fail; + } + + break; + } + + case STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY: + { + entity_addr_t paddr, peer_addr_for_me; + bufferlist myaddrbl; + unsigned banner_len = strlen(CEPH_BANNER); + unsigned need_len = banner_len + sizeof(ceph_entity_addr)*2; + r = read_until(need_len, state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read banner and identify addresses failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + if (memcmp(state_buffer, CEPH_BANNER, banner_len)) { + ldout(async_msgr->cct, 0) << __func__ << " connect protocol error (bad banner) on peer " + << get_peer_addr() << dendl; + goto fail; + } + + bufferlist bl; + bl.append(state_buffer+banner_len, sizeof(ceph_entity_addr)*2); + bufferlist::iterator p = bl.begin(); + try { + ::decode(paddr, p); + ::decode(peer_addr_for_me, p); + } catch (const buffer::error& e) { + lderr(async_msgr->cct) << __func__ << " decode peer addr failed " << dendl; + goto fail; + } + ldout(async_msgr->cct, 20) << __func__ << " connect read peer addr " + << paddr << " on socket " << cs.fd() << dendl; + if (peer_addr != paddr) { + if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() && + peer_addr.get_nonce() == paddr.get_nonce()) { + ldout(async_msgr->cct, 0) << __func__ << " connect claims to be " << paddr + << " not " << peer_addr + << " - presumably this is the same node!" 
<< dendl; + } else { + ldout(async_msgr->cct, 10) << __func__ << " connect claims to be " + << paddr << " not " << peer_addr << dendl; + goto fail; + } + } + + ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl; + lock.unlock(); + async_msgr->learned_addr(peer_addr_for_me); + if (async_msgr->cct->_conf->ms_inject_internal_delays) { + if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { + ldout(msgr->cct, 10) << __func__ << " sleep for " + << async_msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + } + + lock.lock(); + if (state != STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY) { + ldout(async_msgr->cct, 1) << __func__ << " state changed while learned_addr, mark_down or " + << " replacing must be happened just now" << dendl; + return 0; + } + + ::encode(async_msgr->get_myaddr(), myaddrbl, 0); // legacy + r = try_send(myaddrbl); + if (r == 0) { + state = STATE_CONNECTING_SEND_CONNECT_MSG; + ldout(async_msgr->cct, 10) << __func__ << " connect sent my addr " + << async_msgr->get_myaddr() << dendl; + } else if (r > 0) { + state = STATE_WAIT_SEND; + state_after_send = STATE_CONNECTING_SEND_CONNECT_MSG; + ldout(async_msgr->cct, 10) << __func__ << " connect send my addr done: " + << async_msgr->get_myaddr() << dendl; + } else { + ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, " + << cpp_strerror(r) << dendl; + goto fail; + } + + break; + } + + case STATE_CONNECTING_SEND_CONNECT_MSG: + { + if (!got_bad_auth) { + delete authorizer; + authorizer = async_msgr->get_authorizer(peer_type, false); + } + bufferlist bl; + + connect_msg.features = policy.features_supported; + connect_msg.host_type = async_msgr->get_myinst().name.type(); + connect_msg.global_seq = global_seq; + connect_msg.connect_seq = connect_seq; + connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true); + connect_msg.authorizer_protocol = authorizer ? authorizer->protocol : 0; + connect_msg.authorizer_len = authorizer ? authorizer->bl.length() : 0; + if (authorizer) + ldout(async_msgr->cct, 10) << __func__ << " connect_msg.authorizer_len=" + << connect_msg.authorizer_len << " protocol=" + << connect_msg.authorizer_protocol << dendl; + connect_msg.flags = 0; + if (policy.lossy) + connect_msg.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides! 
+ bl.append((char*)&connect_msg, sizeof(connect_msg)); + if (authorizer) { + bl.append(authorizer->bl.c_str(), authorizer->bl.length()); + } + ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq=" + << connect_seq << " proto=" << connect_msg.protocol_version << dendl; + + r = try_send(bl); + if (r == 0) { + state = STATE_CONNECTING_WAIT_CONNECT_REPLY; + ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl; + } else if (r > 0) { + state = STATE_WAIT_SEND; + state_after_send = STATE_CONNECTING_WAIT_CONNECT_REPLY; + ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl; + } else { + ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply " + << cpp_strerror(r) << dendl; + goto fail; + } + + break; + } + + case STATE_CONNECTING_WAIT_CONNECT_REPLY: + { + r = read_until(sizeof(connect_reply), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read connect reply failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + connect_reply = *((ceph_msg_connect_reply*)state_buffer); + + ldout(async_msgr->cct, 20) << __func__ << " connect got reply tag " << (int)connect_reply.tag + << " connect_seq " << connect_reply.connect_seq << " global_seq " + << connect_reply.global_seq << " proto " << connect_reply.protocol_version + << " flags " << (int)connect_reply.flags << " features " + << connect_reply.features << dendl; + state = STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH; + + break; + } + + case STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH: + { + bufferlist authorizer_reply; + if (connect_reply.authorizer_len) { + ldout(async_msgr->cct, 10) << __func__ << " reply.authorizer_len=" << connect_reply.authorizer_len << dendl; + assert(connect_reply.authorizer_len < 4096); + r = read_until(connect_reply.authorizer_len, state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read connect reply authorizer failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + authorizer_reply.append(state_buffer, connect_reply.authorizer_len); + bufferlist::iterator iter = authorizer_reply.begin(); + if (authorizer && !authorizer->verify_reply(iter)) { + ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl; + goto fail; + } + } + r = handle_connect_reply(connect_msg, connect_reply); + if (r < 0) + goto fail; + + // state must be changed! 
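+      // handle_connect_reply() moves to another state on every successful
+      // path (retry, wait, ack-seq exchange, or ready), so remaining in this
+      // state would be a logic error.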
+ assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH); + break; + } + + case STATE_CONNECTING_WAIT_ACK_SEQ: + { + uint64_t newly_acked_seq = 0; + + r = read_until(sizeof(newly_acked_seq), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read connect ack seq failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + newly_acked_seq = *((uint64_t*)state_buffer); + ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq + << " vs out_seq " << out_seq << dendl; + discard_requeued_up_to(newly_acked_seq); + //while (newly_acked_seq > out_seq.read()) { + // Message *m = _get_next_outgoing(NULL); + // assert(m); + // ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq() + // << " " << *m << dendl; + // assert(m->get_seq() <= newly_acked_seq); + // m->put(); + // out_seq.inc(); + //} + + bufferlist bl; + uint64_t s = in_seq; + bl.append((char*)&s, sizeof(s)); + r = try_send(bl); + if (r == 0) { + state = STATE_CONNECTING_READY; + ldout(async_msgr->cct, 10) << __func__ << " send in_seq done " << dendl; + } else if (r > 0) { + state_after_send = STATE_CONNECTING_READY; + state = STATE_WAIT_SEND; + ldout(async_msgr->cct, 10) << __func__ << " continue send in_seq " << dendl; + } else { + goto fail; + } + break; + } + + case STATE_CONNECTING_READY: + { + // hooray! + peer_global_seq = connect_reply.global_seq; + policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY; + state = STATE_OPEN; + once_ready = true; + connect_seq += 1; + assert(connect_seq == connect_reply.connect_seq); + backoff = utime_t(); + set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features); + ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq + << ", lossy = " << policy.lossy << ", features " + << get_features() << dendl; + + // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the + // connection. PLR + if (authorizer != NULL) { + session_security.reset( + get_auth_session_handler(async_msgr->cct, + authorizer->protocol, + authorizer->session_key, + get_features())); + } else { + // We have no authorizer, so we shouldn't be applying security to messages in this AsyncConnection. 
PLR + session_security.reset(); + } + + if (delay_state) + assert(delay_state->ready()); + dispatch_queue->queue_connect(this); + async_msgr->ms_deliver_handle_fast_connect(this); + + // make sure no pending tick timer + if (last_tick_id) + center->delete_time_event(last_tick_id); + last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); + + // message may in queue between last _try_send and connection ready + // write event may already notify and we need to force scheduler again + write_lock.lock(); + can_write = WriteStatus::CANWRITE; + if (is_queued()) + center->dispatch_event_external(write_handler); + write_lock.unlock(); + maybe_start_delay_thread(); + break; + } + + case STATE_ACCEPTING: + { + bufferlist bl; + center->create_file_event(cs.fd(), EVENT_READABLE, read_handler); + + bl.append(CEPH_BANNER, strlen(CEPH_BANNER)); + + ::encode(async_msgr->get_myaddr(), bl, 0); // legacy + port = async_msgr->get_myaddr().get_port(); + ::encode(socket_addr, bl, 0); // legacy + ldout(async_msgr->cct, 1) << __func__ << " sd=" << cs.fd() << " " << socket_addr << dendl; + + r = try_send(bl); + if (r == 0) { + state = STATE_ACCEPTING_WAIT_BANNER_ADDR; + ldout(async_msgr->cct, 10) << __func__ << " write banner and addr done: " + << get_peer_addr() << dendl; + } else if (r > 0) { + state = STATE_WAIT_SEND; + state_after_send = STATE_ACCEPTING_WAIT_BANNER_ADDR; + ldout(async_msgr->cct, 10) << __func__ << " wait for write banner and addr: " + << get_peer_addr() << dendl; + } else { + goto fail; + } + + break; + } + case STATE_ACCEPTING_WAIT_BANNER_ADDR: + { + bufferlist addr_bl; + entity_addr_t peer_addr; + + r = read_until(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read peer banner and addr failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) { + ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer + << "' (should be '" << CEPH_BANNER << "')" << dendl; + goto fail; + } + + addr_bl.append(state_buffer+strlen(CEPH_BANNER), sizeof(ceph_entity_addr)); + { + bufferlist::iterator ti = addr_bl.begin(); + ::decode(peer_addr, ti); + } + + ldout(async_msgr->cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl; + if (peer_addr.is_blank_ip()) { + // peer apparently doesn't know what ip they have; figure it out for them. 
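+        // (keep the port and nonce the peer claimed, but substitute the
+        //  address we actually accepted the socket from)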
+ int port = peer_addr.get_port(); + peer_addr.u = socket_addr.u; + peer_addr.set_port(port); + ldout(async_msgr->cct, 0) << __func__ << " accept peer addr is really " << peer_addr + << " (socket is " << socket_addr << ")" << dendl; + } + set_peer_addr(peer_addr); // so that connection_state gets set up + state = STATE_ACCEPTING_WAIT_CONNECT_MSG; + break; + } + + case STATE_ACCEPTING_WAIT_CONNECT_MSG: + { + r = read_until(sizeof(connect_msg), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read connect msg failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + + connect_msg = *((ceph_msg_connect*)state_buffer); + state = STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH; + break; + } + + case STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH: + { + bufferlist authorizer_reply; + + if (connect_msg.authorizer_len) { + if (!authorizer_buf.length()) + authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len)); + + r = read_until(connect_msg.authorizer_len, authorizer_buf.c_str()); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read connect authorizer failed" << dendl; + goto fail; + } else if (r > 0) { + break; + } + } + + ldout(async_msgr->cct, 20) << __func__ << " accept got peer connect_seq " + << connect_msg.connect_seq << " global_seq " + << connect_msg.global_seq << dendl; + set_peer_type(connect_msg.host_type); + policy = async_msgr->get_policy(connect_msg.host_type); + ldout(async_msgr->cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type + << ", policy.lossy=" << policy.lossy << " policy.server=" + << policy.server << " policy.standby=" << policy.standby + << " policy.resetcheck=" << policy.resetcheck << dendl; + + r = handle_connect_msg(connect_msg, authorizer_buf, authorizer_reply); + if (r < 0) + goto fail; + + // state is changed by "handle_connect_msg" + assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH); + break; + } + + case STATE_ACCEPTING_WAIT_SEQ: + { + uint64_t newly_acked_seq; + r = read_until(sizeof(newly_acked_seq), state_buffer); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl; + goto fail_registered; + } else if (r > 0) { + break; + } + + newly_acked_seq = *((uint64_t*)state_buffer); + ldout(async_msgr->cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq << dendl; + discard_requeued_up_to(newly_acked_seq); + state = STATE_ACCEPTING_READY; + break; + } + + case STATE_ACCEPTING_READY: + { + ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl; + state = STATE_OPEN; + memset(&connect_msg, 0, sizeof(connect_msg)); + + if (delay_state) + assert(delay_state->ready()); + // make sure no pending tick timer + if (last_tick_id) + center->delete_time_event(last_tick_id); + last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); + + write_lock.lock(); + can_write = WriteStatus::CANWRITE; + if (is_queued()) + center->dispatch_event_external(write_handler); + write_lock.unlock(); + maybe_start_delay_thread(); + break; + } + + default: + { + lderr(async_msgr->cct) << __func__ << " bad state: " << state << dendl; + ceph_abort(); + } + } + + return 0; + +fail_registered: + ldout(async_msgr->cct, 10) << "accept fault after register" << dendl; + inject_delay(); + +fail: + return -1; +} + +int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &reply) +{ + uint64_t feat_missing; + if (reply.tag == CEPH_MSGR_TAG_FEATURES) { + ldout(async_msgr->cct, 0) << __func__ << " connect protocol feature 
mismatch, my " + << std::hex << connect.features << " < peer " + << reply.features << " missing " + << (reply.features & ~policy.features_supported) + << std::dec << dendl; + goto fail; + } + + if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) { + ldout(async_msgr->cct, 0) << __func__ << " connect protocol version mismatch, my " + << connect.protocol_version << " != " << reply.protocol_version + << dendl; + goto fail; + } + + if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) { + ldout(async_msgr->cct,0) << __func__ << " connect got BADAUTHORIZER" << dendl; + if (got_bad_auth) + goto fail; + got_bad_auth = true; + delete authorizer; + authorizer = async_msgr->get_authorizer(peer_type, true); // try harder + state = STATE_CONNECTING_SEND_CONNECT_MSG; + } + if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) { + ldout(async_msgr->cct, 0) << __func__ << " connect got RESETSESSION" << dendl; + was_session_reset(); + // see was_session_reset + outcoming_bl.clear(); + state = STATE_CONNECTING_SEND_CONNECT_MSG; + } + if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { + global_seq = async_msgr->get_global_seq(reply.global_seq); + ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_GLOBAL " + << reply.global_seq << " chose new " + << global_seq << dendl; + state = STATE_CONNECTING_SEND_CONNECT_MSG; + } + if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { + assert(reply.connect_seq > connect_seq); + ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_SESSION " + << connect_seq << " -> " + << reply.connect_seq << dendl; + connect_seq = reply.connect_seq; + state = STATE_CONNECTING_SEND_CONNECT_MSG; + } + if (reply.tag == CEPH_MSGR_TAG_WAIT) { + ldout(async_msgr->cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl; + state = STATE_WAIT; + } + + feat_missing = policy.features_required & ~(uint64_t)connect_reply.features; + if (feat_missing) { + ldout(async_msgr->cct, 1) << __func__ << " missing required features " << std::hex + << feat_missing << std::dec << dendl; + goto fail; + } + + if (reply.tag == CEPH_MSGR_TAG_SEQ) { + ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; + state = STATE_CONNECTING_WAIT_ACK_SEQ; + } + if (reply.tag == CEPH_MSGR_TAG_READY) { + ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl; + state = STATE_CONNECTING_READY; + } + + return 0; + + fail: + return -1; +} + +ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl, + bufferlist &authorizer_reply) +{ + ssize_t r = 0; + ceph_msg_connect_reply reply; + bufferlist reply_bl; + + memset(&reply, 0, sizeof(reply)); + reply.protocol_version = async_msgr->get_proto_version(peer_type, false); + + // mismatch? + ldout(async_msgr->cct, 10) << __func__ << " accept my proto " << reply.protocol_version + << ", their proto " << connect.protocol_version << dendl; + if (connect.protocol_version != reply.protocol_version) { + return _reply_accept(CEPH_MSGR_TAG_BADPROTOVER, connect, reply, authorizer_reply); + } + // require signatures for cephx? 
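+  // cephx_require_signatures (or the cluster-/service-specific variants)
+  // forces CEPH_FEATURE_MSG_AUTH into policy.features_required, so the
+  // feature check below rejects peers that cannot sign messages.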
+ if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) { + if (peer_type == CEPH_ENTITY_TYPE_OSD || + peer_type == CEPH_ENTITY_TYPE_MDS) { + if (async_msgr->cct->_conf->cephx_require_signatures || + async_msgr->cct->_conf->cephx_cluster_require_signatures) { + ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for cluster" << dendl; + policy.features_required |= CEPH_FEATURE_MSG_AUTH; + } + } else { + if (async_msgr->cct->_conf->cephx_require_signatures || + async_msgr->cct->_conf->cephx_service_require_signatures) { + ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for service" << dendl; + policy.features_required |= CEPH_FEATURE_MSG_AUTH; + } + } + } + uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features; + if (feat_missing) { + ldout(async_msgr->cct, 1) << __func__ << " peer missing required features " + << std::hex << feat_missing << std::dec << dendl; + return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply); + } + + lock.unlock(); + + bool authorizer_valid; + if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl, + authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) { + lock.lock(); + ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl; + session_security.reset(); + return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply); + } + + // We've verified the authorizer for this AsyncConnection, so set up the session security structure. PLR + ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl; + + // existing? + AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr); + + inject_delay(); + + lock.lock(); + if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { + ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl; + assert(state == STATE_CLOSED); + goto fail; + } + + if (existing == this) + existing = NULL; + if (existing) { + // There is no possible that existing connection will acquire this + // connection's lock + existing->lock.lock(); // skip lockdep check (we are locking a second AsyncConnection here) + + if (existing->state == STATE_CLOSED) { + ldout(async_msgr->cct, 1) << __func__ << " existing already closed." << dendl; + existing->lock.unlock(); + existing = NULL; + goto open; + } + + if (existing->replacing) { + ldout(async_msgr->cct, 1) << __func__ << " existing racing replace happened while replacing." + << " existing_state=" << get_state_name(existing->state) << dendl; + reply.global_seq = existing->peer_global_seq; + r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); + existing->lock.unlock(); + if (r < 0) + goto fail; + return 0; + } + + if (connect.global_seq < existing->peer_global_seq) { + ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing + << ".gseq " << existing->peer_global_seq << " > " + << connect.global_seq << ", RETRY_GLOBAL" << dendl; + reply.global_seq = existing->peer_global_seq; // so we can send it below.. 
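+      // the peer's global_seq is stale; RETRY_GLOBAL makes it reconnect
+      // with a global_seq at least as large as existing->peer_global_seq.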
+ existing->lock.unlock(); + return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); + } else { + ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing + << ".gseq " << existing->peer_global_seq + << " <= " << connect.global_seq << ", looks ok" << dendl; + } + + if (existing->policy.lossy) { + ldout(async_msgr->cct, 0) << __func__ << " accept replacing existing (lossy) channel (new one lossy=" + << policy.lossy << ")" << dendl; + existing->was_session_reset(); + goto replace; + } + + ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq + << " vs existing csq=" << existing->connect_seq << " existing_state=" + << get_state_name(existing->state) << dendl; + + if (connect.connect_seq == 0 && existing->connect_seq > 0) { + ldout(async_msgr->cct,0) << __func__ << " accept peer reset, then tried to connect to us, replacing" << dendl; + // this is a hard reset from peer + is_reset_from_peer = true; + if (policy.resetcheck) + existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s + goto replace; + } + + if (connect.connect_seq < existing->connect_seq) { + // old attempt, or we sent READY but they didn't get it. + ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing << ".cseq " + << existing->connect_seq << " > " << connect.connect_seq + << ", RETRY_SESSION" << dendl; + reply.connect_seq = existing->connect_seq + 1; + existing->lock.unlock(); + return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); + } + + if (connect.connect_seq == existing->connect_seq) { + // if the existing connection successfully opened, and/or + // subsequently went to standby, then the peer should bump + // their connect_seq and retry: this is not a connection race + // we need to resolve here. + if (existing->state == STATE_OPEN || + existing->state == STATE_STANDBY) { + ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing + << ".cseq " << existing->connect_seq << " == " + << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl; + reply.connect_seq = existing->connect_seq + 1; + existing->lock.unlock(); + return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); + } + + // connection race? 
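+      // both ends initiated a connection to each other at the same time;
+      // break the tie by address order: the incoming attempt wins if the
+      // peer's address is lower (or our policy is purely server), otherwise
+      // the peer is told to WAIT for our outgoing attempt.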
+ if (peer_addr < async_msgr->get_myaddr() || existing->policy.server) { + // incoming wins + ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing + << ".cseq " << existing->connect_seq << " == " << connect.connect_seq + << ", or we are server, replacing my attempt" << dendl; + goto replace; + } else { + // our existing outgoing wins + ldout(async_msgr->cct,10) << __func__ << " accept connection race, existing " + << existing << ".cseq " << existing->connect_seq + << " == " << connect.connect_seq << ", sending WAIT" << dendl; + assert(peer_addr > async_msgr->get_myaddr()); + existing->lock.unlock(); + return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply); + } + } + + assert(connect.connect_seq > existing->connect_seq); + assert(connect.global_seq >= existing->peer_global_seq); + if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other + existing->connect_seq == 0) { + ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " + << connect.connect_seq << ", " << existing << ".cseq = " + << existing->connect_seq << "), sending RESETSESSION" << dendl; + existing->lock.unlock(); + return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); + } + + // reconnect + ldout(async_msgr->cct, 10) << __func__ << " accept peer sent cseq " << connect.connect_seq + << " > " << existing->connect_seq << dendl; + goto replace; + } // existing + else if (!replacing && connect.connect_seq > 0) { + // we reset, and they are opening a new session + ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " + << connect.connect_seq << "), sending RESETSESSION" << dendl; + return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); + } else { + // new session + ldout(async_msgr->cct, 10) << __func__ << " accept new session" << dendl; + existing = NULL; + goto open; + } + ceph_abort(); + + replace: + ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl; + + inject_delay(); + if (existing->policy.lossy) { + // disconnect from the Connection + ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl; + existing->_stop(); + existing->dispatch_queue->queue_reset(existing.get()); + } else { + assert(can_write == WriteStatus::NOWRITE); + existing->write_lock.lock(); + + // reset the in_seq if this is a hard reset from peer, + // otherwise we respect our original connection's value + if (is_reset_from_peer) { + existing->is_reset_from_peer = true; + } + + center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); + + if (existing->delay_state) { + existing->delay_state->flush(); + assert(!delay_state); + } + existing->reset_recv_state(); + + auto temp_cs = std::move(cs); + EventCenter *new_center = center; + Worker *new_worker = worker; + // avoid _stop shutdown replacing socket + // queue a reset on the new connection, which we're dumping for the old + _stop(); + + dispatch_queue->queue_reset(this); + ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl; + existing->can_write = WriteStatus::REPLACING; + existing->replacing = true; + existing->state_offset = 0; + // avoid previous thread modify event + existing->state = STATE_NONE; + // Discard existing prefetch buffer in `recv_buf` + existing->recv_start = existing->recv_end = 0; + // there shouldn't exist any buffer + assert(recv_start == recv_end); + + auto deactivate_existing = 
std::bind( + [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable { + // we need to delete time event in original thread + { + std::lock_guard l(existing->lock); + existing->write_lock.lock(); + existing->requeue_sent(); + existing->outcoming_bl.clear(); + existing->open_write = false; + existing->write_lock.unlock(); + if (existing->state == STATE_NONE) { + existing->shutdown_socket(); + existing->cs = std::move(cs); + existing->worker->references--; + new_worker->references++; + existing->logger = new_worker->get_perf_counter(); + existing->worker = new_worker; + existing->center = new_center; + if (existing->delay_state) + existing->delay_state->set_center(new_center); + } else if (existing->state == STATE_CLOSED) { + auto back_to_close = std::bind( + [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs)); + new_center->submit_to( + new_center->get_id(), std::move(back_to_close), true); + return ; + } else { + ceph_abort(); + } + } + + // Before changing existing->center, it may already exists some events in existing->center's queue. + // Then if we mark down `existing`, it will execute in another thread and clean up connection. + // Previous event will result in segment fault + auto transfer_existing = [existing, connect, reply, authorizer_reply]() mutable { + std::lock_guard l(existing->lock); + if (existing->state == STATE_CLOSED) + return ; + assert(existing->state == STATE_NONE); + + existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; + existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler); + reply.global_seq = existing->peer_global_seq; + if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { + // handle error + existing->fault(); + } + }; + if (existing->center->in_thread()) + transfer_existing(); + else + existing->center->submit_to( + existing->center->get_id(), std::move(transfer_existing), true); + }, std::move(temp_cs)); + + existing->center->submit_to( + existing->center->get_id(), std::move(deactivate_existing), true); + existing->write_lock.unlock(); + existing->lock.unlock(); + return 0; + } + existing->lock.unlock(); + + open: + connect_seq = connect.connect_seq + 1; + peer_global_seq = connect.global_seq; + ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = " + << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl; + + int next_state; + + // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence + if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) { + reply.tag = CEPH_MSGR_TAG_SEQ; + next_state = STATE_ACCEPTING_WAIT_SEQ; + } else { + reply.tag = CEPH_MSGR_TAG_READY; + next_state = STATE_ACCEPTING_READY; + discard_requeued_up_to(0); + is_reset_from_peer = false; + in_seq = 0; + } + + // send READY reply + reply.features = policy.features_supported; + reply.global_seq = async_msgr->get_global_seq(); + reply.connect_seq = connect_seq; + reply.flags = 0; + reply.authorizer_len = authorizer_reply.length(); + if (policy.lossy) + reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY; + + set_features((uint64_t)reply.features & (uint64_t)connect.features); + ldout(async_msgr->cct, 10) << __func__ << " accept features " << get_features() << dendl; + + session_security.reset( + get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol, + session_key, get_features())); + + reply_bl.append((char*)&reply, sizeof(reply)); + + if 
+ if (reply.authorizer_len)
+ reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
+
+ if (reply.tag == CEPH_MSGR_TAG_SEQ) {
+ uint64_t s = in_seq;
+ reply_bl.append((char*)&s, sizeof(s));
+ }
+
+ lock.unlock();
+ // Because "replacing" prevents other connections from preempting this addr,
+ // it's safe not to hold the Connection's lock here
+ r = async_msgr->accept_conn(this);
+
+ inject_delay();
+
+ lock.lock();
+ replacing = false;
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
+ << ", just fail the later one (this)" << dendl;
+ goto fail_registered;
+ }
+ if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
+ ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl;
+ assert(state == STATE_CLOSED);
+ goto fail_registered;
+ }
+
+ r = try_send(reply_bl);
+ if (r < 0)
+ goto fail_registered;
+
+ // notify
+ dispatch_queue->queue_accept(this);
+ async_msgr->ms_deliver_handle_fast_accept(this);
+ once_ready = true;
+
+ if (r == 0) {
+ state = next_state;
+ ldout(async_msgr->cct, 2) << __func__ << " accept write reply msg done" << dendl;
+ } else {
+ state = STATE_WAIT_SEND;
+ state_after_send = next_state;
+ }
+
+ return 0;
+
+ fail_registered:
+ ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl;
+ inject_delay();
+
+ fail:
+ ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl;
+ return -1;
+}
+
+void AsyncConnection::_connect()
+{
+ ldout(async_msgr->cct, 10) << __func__ << " csq=" << connect_seq << dendl;
+
+ state = STATE_CONNECTING;
+ // reschedule the connection on the event center to avoid a lock dependency;
+ // this may be called by an external thread (send_message)
+ center->dispatch_event_external(read_handler);
+}
+
+void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
+{
+ ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl;
+ assert(socket.fd() >= 0);
+
+ std::lock_guard<std::mutex> l(lock);
+ cs = std::move(socket);
+ socket_addr = addr;
+ state = STATE_ACCEPTING;
+ // reschedule the connection on the event center to avoid a lock dependency
+ center->dispatch_event_external(read_handler);
+}
+
+int AsyncConnection::send_message(Message *m)
+{
+ FUNCTRACE();
+ lgeneric_subdout(async_msgr->cct, ms,
+ 1) << "-- " << async_msgr->get_myaddr() << " --> "
+ << get_peer_addr() << " -- "
+ << *m << " -- " << m << " con "
+ << m->get_connection().get()
+ << dendl;
+
+ // optimistically assume it's ok to encode (it may actually be broken by now)
+ if (!m->get_priority())
+ m->set_priority(async_msgr->get_default_send_priority());
+
+ m->get_header().src = async_msgr->get_myname();
+ m->set_connection(this);
+
+ if (m->get_type() == CEPH_MSG_OSD_OP)
+ OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
+ else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
+ OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
+
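+ // Loopback messages (peer address equals our own) never touch the socket:
+ // they skip encoding, sequence numbers and throttling, and are handed
+ // straight to the local DispatchQueue in the branch below.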
+ if (async_msgr->get_myaddr() == get_peer_addr()) { // loopback connection
+ ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
+ std::lock_guard<std::mutex> l(write_lock);
+ if (can_write != WriteStatus::CLOSED) {
+ dispatch_queue->local_delivery(m, m->get_priority());
+ } else {
+ ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
+ << " Drop message " << m << dendl;
+ m->put();
+ }
+ return 0;
+ }
+
+ last_active = ceph::coarse_mono_clock::now();
+ // don't count local messages here; they are too lightweight and would
+ // skew the statistics
+ logger->inc(l_msgr_send_messages);
+
+ bufferlist bl;
+ uint64_t f = get_features();
+
+ // TODO: not all message types support re-encoding (e.g. MOSDMap), so only
+ // prepare the message here if it is fast-dispatch capable
+ bool can_fast_prepare = async_msgr->ms_can_fast_dispatch(m);
+ if (can_fast_prepare)
+ prepare_send_message(f, m, bl);
+
+ std::lock_guard<std::mutex> l(write_lock);
+ // a "features" change alters the payload encoding
+ if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) {
+ // ensure the correctness of the message encoding
+ bl.clear();
+ m->get_payload().clear();
+ ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous "
+ << f << " != " << get_features() << dendl;
+ }
+ if (can_write == WriteStatus::CLOSED) {
+ ldout(async_msgr->cct, 10) << __func__ << " connection closed."
+ << " Drop message " << m << dendl;
+ m->put();
+ } else {
+ m->trace.event("async enqueueing message");
+ out_q[m->get_priority()].emplace_back(std::move(bl), m);
+ ldout(async_msgr->cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl;
+ if (can_write != WriteStatus::REPLACING)
+ center->dispatch_event_external(write_handler);
+ }
+ return 0;
+}
+
+void AsyncConnection::requeue_sent()
+{
+ if (sent.empty())
+ return;
+
+ list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+ while (!sent.empty()) {
+ Message* m = sent.back();
+ sent.pop_back();
+ ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
+ << " (" << m->get_seq() << ")" << dendl;
+ rq.push_front(make_pair(bufferlist(), m));
+ out_seq--;
+ }
+}
+
+void AsyncConnection::discard_requeued_up_to(uint64_t seq)
+{
+ ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl;
+ std::lock_guard<std::mutex> l(write_lock);
+ if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
+ return;
+ list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+ while (!rq.empty()) {
+ pair<bufferlist, Message*> p = rq.front();
+ if (p.second->get_seq() == 0 || p.second->get_seq() > seq)
+ break;
+ ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq " << p.second->get_seq()
+ << " <= " << seq << ", discarding" << dendl;
+ p.second->put();
+ rq.pop_front();
+ out_seq++;
+ }
+ if (rq.empty())
+ out_q.erase(CEPH_MSG_PRIO_HIGHEST);
+}
+
+/*
+ * Tears down the AsyncConnection's message queues and drops the message
+ * references they hold. Must hold write_lock prior to calling.
+ */
+void AsyncConnection::discard_out_queue()
+{
+ ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
+
+ for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
+ ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
+ (*p)->put();
+ }
+ sent.clear();
+ for (map<int, list<pair<bufferlist, Message*> > >::iterator p = out_q.begin(); p != out_q.end(); ++p)
+ for (list<pair<bufferlist, Message*> >::iterator r = p->second.begin(); r != p->second.end(); ++r) {
+ ldout(async_msgr->cct, 20) << __func__ << " discard " << r->second << dendl;
+ r->second->put();
+ }
+ out_q.clear();
+}
+
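+ // The queue manipulated above is keyed by priority, and requeued messages
+ // ride at CEPH_MSG_PRIO_HIGHEST so they drain before any new traffic. Its
+ // shape, as implied by the iterator types above:
+ //
+ //   map<int, list<pair<bufferlist, Message*> > > out_q;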
+int AsyncConnection::randomize_out_seq()
+{
+ if (get_features() & CEPH_FEATURE_MSG_AUTH) {
+ // Set out_seq to a random value, so the CRC won't be predictable.
+ // Don't bother checking seq_error here; we check it at the call site. PLR
+ uint64_t rand_seq;
+ int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq));
+ rand_seq &= SEQ_MASK;
+ lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
+ out_seq = rand_seq;
+ return seq_error;
+ } else {
+ // previously, seq #'s always started at 0.
+ out_seq = 0;
+ return 0;
+ }
+}
+
+void AsyncConnection::fault()
+{
+ if (state == STATE_CLOSED || state == STATE_NONE) {
+ ldout(async_msgr->cct, 10) << __func__ << " connection is already closed" << dendl;
+ return;
+ }
+
+ if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) {
+ ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl;
+ _stop();
+ dispatch_queue->queue_reset(this);
+ return;
+ }
+
+ write_lock.lock();
+ can_write = WriteStatus::NOWRITE;
+ shutdown_socket();
+ open_write = false;
+
+ // queue delayed items immediately
+ if (delay_state)
+ delay_state->flush();
+ // requeue sent items
+ requeue_sent();
+ recv_start = recv_end = 0;
+ state_offset = 0;
+ replacing = false;
+ is_reset_from_peer = false;
+ outcoming_bl.clear();
+ if (!once_ready && !is_queued() &&
+ state >= STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
+ ldout(async_msgr->cct, 10) << __func__ << " with nothing to send and in the half-open"
+ << " accept state, just closing" << dendl;
+ write_lock.unlock();
+ _stop();
+ dispatch_queue->queue_reset(this);
+ return;
+ }
+ reset_recv_state();
+ if (policy.standby && !is_queued() && state != STATE_WAIT) {
+ ldout(async_msgr->cct, 10) << __func__ << " with nothing to send, going to standby" << dendl;
+ state = STATE_STANDBY;
+ write_lock.unlock();
+ return;
+ }
+
+ write_lock.unlock();
+ if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY) &&
+ state != STATE_WAIT) { // STATE_WAIT comes from STATE_CONNECTING_*
+ // policy may be empty while we are still in an accept state
+ if (policy.server) {
+ ldout(async_msgr->cct, 0) << __func__ << " server, going to standby" << dendl;
+ state = STATE_STANDBY;
+ } else {
+ ldout(async_msgr->cct, 0) << __func__ << " initiating reconnect" << dendl;
+ connect_seq++;
+ state = STATE_CONNECTING;
+ }
+ backoff = utime_t();
+ center->dispatch_event_external(read_handler);
+ } else {
+ if (state == STATE_WAIT) {
+ backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
+ } else if (backoff == utime_t()) {
+ backoff.set_from_double(async_msgr->cct->_conf->ms_initial_backoff);
+ } else {
+ backoff += backoff;
+ if (backoff > async_msgr->cct->_conf->ms_max_backoff)
+ backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
+ }
+
+ state = STATE_CONNECTING;
+ ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
+ // wake up again after the backoff expires
+ register_time_events.insert(center->create_time_event(
+ backoff.to_nsec()/1000, wakeup_handler));
+ }
+}
+
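+ // The reconnect backoff in fault() doubles on every failed attempt
+ // (backoff += backoff) and is clamped at ms_max_backoff. Assuming the
+ // usual defaults of ms_initial_backoff = 0.2s and ms_max_backoff = 15s
+ // (configuration defaults, not guarantees), the schedule is roughly:
+ //
+ //   delay(n) = min(0.2 * 2^(n-1), 15) seconds for attempt n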
+void AsyncConnection::was_session_reset()
+{
+ ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
+ std::lock_guard<std::mutex> l(write_lock);
+ if (delay_state)
+ delay_state->discard();
+ dispatch_queue->discard_queue(conn_id);
+ discard_out_queue();
+ // note: we ought to clear outcoming_bl here, but was_session_reset may be
+ // called from another thread, so the caller must clear it itself!
+ // outcoming_bl.clear();
+
+ dispatch_queue->queue_remote_reset(this);
+
+ if (randomize_out_seq()) {
+ ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
+ }
+
+ in_seq = 0;
+ connect_seq = 0;
+ // it's safe to set this to 0 directly; it is protected by both locks
+ ack_left = 0;
+ once_ready = false;
+ can_write = WriteStatus::NOWRITE;
+}
+
+void AsyncConnection::_stop()
+{
+ if (state == STATE_CLOSED)
+ return;
+
+ if (delay_state)
+ delay_state->flush();
+
+ ldout(async_msgr->cct, 2) << __func__ << dendl;
+ std::lock_guard<std::mutex> l(write_lock);
+
+ reset_recv_state();
+ dispatch_queue->discard_queue(conn_id);
+ discard_out_queue();
+ async_msgr->unregister_conn(this);
+ worker->release_worker();
+
+ state = STATE_CLOSED;
+ open_write = false;
+ can_write = WriteStatus::CLOSED;
+ state_offset = 0;
+ // make sure events already queued will still be processed
+ center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
+}
+
+void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
+{
+ ldout(async_msgr->cct, 20) << __func__ << " m " << *m << dendl;
+
+ // associate the message with this Connection (for the benefit of encode_payload)
+ if (m->empty_payload())
+ ldout(async_msgr->cct, 20) << __func__ << " encoding features "
+ << features << " " << m << " " << *m << dendl;
+ else
+ ldout(async_msgr->cct, 20) << __func__ << " half-reencoding features "
+ << features << " " << m << " " << *m << dendl;
+
+ // encode and copy out of *m
+ m->encode(features, msgr->crcflags);
+
+ bl.append(m->get_payload());
+ bl.append(m->get_middle());
+ bl.append(m->get_data());
+}
+
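+ // write_message() below assembles one on-wire frame. Roughly (the exact
+ // header and footer structs depend on the negotiated features; see the
+ // NOSRCADDR and MSG_AUTH branches inside):
+ //
+ //   [CEPH_MSGR_TAG_MSG][ceph_msg_header][front][middle][data][ceph_msg_footer]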
+ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
+{
+ FUNCTRACE();
+ assert(center->in_thread());
+ m->set_seq(++out_seq);
+
+ if (msgr->crcflags & MSG_CRC_HEADER)
+ m->calc_header_crc();
+
+ ceph_msg_header& header = m->get_header();
+ ceph_msg_footer& footer = m->get_footer();
+
+ // TODO: make sign_message reentrant?
+ // Now that we have all the crcs calculated, handle the digital signature
+ // for the message, if the AsyncConnection has session security set up.
+ // Some session security options do not actually calculate and check the
+ // signature, but they should still handle the calls to sign_message and
+ // check_signature. PLR
+ if (session_security.get() == NULL) {
+ ldout(async_msgr->cct, 20) << __func__ << " no session security" << dendl;
+ } else {
+ if (session_security->sign_message(m)) {
+ ldout(async_msgr->cct, 20) << __func__ << " failed to sign m="
+ << m << ": sig = " << footer.sig << dendl;
+ } else {
+ ldout(async_msgr->cct, 20) << __func__ << " signed m=" << m
+ << ": sig = " << footer.sig << dendl;
+ }
+ }
+
+ unsigned original_bl_len = outcoming_bl.length();
+
+ outcoming_bl.append(CEPH_MSGR_TAG_MSG);
+
+ if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
+ outcoming_bl.append((char*)&header, sizeof(header));
+ } else {
+ ceph_msg_header_old oldheader;
+ memcpy(&oldheader, &header, sizeof(header));
+ oldheader.src.name = header.src;
+ oldheader.src.addr = get_peer_addr();
+ oldheader.orig_src = oldheader.src;
+ oldheader.reserved = header.reserved;
+ oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
+ sizeof(oldheader) - sizeof(oldheader.crc));
+ outcoming_bl.append((char*)&oldheader, sizeof(oldheader));
+ }
+
+ ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type
+ << " src " << entity_name_t(header.src)
+ << " front=" << header.front_len
+ << " data=" << header.data_len
+ << " off " << header.data_off << dendl;
+
+ if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
+ for (const auto &pb : bl.buffers()) {
+ outcoming_bl.append((char*)pb.c_str(), pb.length());
+ }
+ } else {
+ outcoming_bl.claim_append(bl);
+ }
+
+ // send the footer; if the receiver doesn't support signatures, use the old footer format
+ ceph_msg_footer_old old_footer;
+ if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
+ outcoming_bl.append((char*)&footer, sizeof(footer));
+ } else {
+ if (msgr->crcflags & MSG_CRC_HEADER) {
+ old_footer.front_crc = footer.front_crc;
+ old_footer.middle_crc = footer.middle_crc;
+ } else {
+ old_footer.front_crc = old_footer.middle_crc = 0;
+ }
+ old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
+ old_footer.flags = footer.flags;
+ outcoming_bl.append((char*)&old_footer, sizeof(old_footer));
+ }
+
+ m->trace.event("async writing message");
+ ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
+ << " " << m << dendl;
+ ssize_t total_send_size = outcoming_bl.length();
+ ssize_t rc = _try_send(more);
+ if (rc < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
+ << cpp_strerror(rc) << dendl;
+ } else if (rc == 0) {
+ logger->inc(l_msgr_send_bytes, total_send_size - original_bl_len);
+ ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
+ } else {
+ logger->inc(l_msgr_send_bytes, total_send_size - outcoming_bl.length());
+ ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continually." << dendl;
+ }
+ if (m->get_type() == CEPH_MSG_OSD_OP)
+ OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
+ else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
+ OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
+ m->put();
+
+ return rc;
+}
+
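+ // Note on the coalescing branch in write_message(): payloads at or below
+ // ASYNC_COALESCE_THRESHOLD that are split across several buffer fragments
+ // are flattened with copying appends, trading one small copy for fewer
+ // iovec entries at send time; anything larger is claimed zero-copy via
+ // claim_append().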
+void AsyncConnection::reset_recv_state()
+{
+ // clean up internal state variables
+ if (state >= STATE_CONNECTING_SEND_CONNECT_MSG &&
+ state <= STATE_CONNECTING_READY) {
+ delete authorizer;
+ authorizer = NULL;
+ got_bad_auth = false;
+ }
+
+ if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE &&
+ state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
+ && policy.throttler_messages) {
+ ldout(async_msgr->cct, 10) << __func__ << " releasing " << 1
+ << " message to policy throttler "
+ << policy.throttler_messages->get_current() << "/"
+ << policy.throttler_messages->get_max() << dendl;
+ policy.throttler_messages->put();
+ }
+ if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES &&
+ state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
+ if (policy.throttler_bytes) {
+ ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
+ << " bytes to policy throttler "
+ << policy.throttler_bytes->get_current() << "/"
+ << policy.throttler_bytes->get_max() << dendl;
+ policy.throttler_bytes->put(cur_msg_size);
+ }
+ }
+ if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE &&
+ state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
+ ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
+ << " bytes to dispatch_queue throttler "
+ << dispatch_queue->dispatch_throttler.get_current() << "/"
+ << dispatch_queue->dispatch_throttler.get_max() << dendl;
+ dispatch_queue->dispatch_throttle_release(cur_msg_size);
+ }
+}
+
+void AsyncConnection::handle_ack(uint64_t seq)
+{
+ ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
+ // trim the sent list
+ std::lock_guard<std::mutex> l(write_lock);
+ while (!sent.empty() && sent.front()->get_seq() <= seq) {
+ Message* m = sent.front();
+ sent.pop_front();
+ ldout(async_msgr->cct, 10) << __func__ << " got ack seq "
+ << seq << " >= " << m->get_seq() << " on "
+ << m << " " << *m << dendl;
+ m->put();
+ }
+}
+
+void AsyncConnection::DelayedDelivery::do_request(int id)
+{
+ Message *m = nullptr;
+ {
+ std::lock_guard<std::mutex> l(delay_lock);
+ register_time_events.erase(id);
+ if (stop_dispatch)
+ return;
+ if (delay_queue.empty())
+ return;
+ utime_t release = delay_queue.front().first;
+ m = delay_queue.front().second;
+ string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type;
+ utime_t now = ceph_clock_now();
+ if ((release > now &&
+ (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
+ utime_t t = release - now;
+ t.sleep();
+ }
+ delay_queue.pop_front();
+ }
+ if (msgr->ms_can_fast_dispatch(m)) {
+ dispatch_queue->fast_dispatch(m);
+ } else {
+ dispatch_queue->enqueue(m, m->get_priority(), conn_id);
+ }
+}
+
+void AsyncConnection::DelayedDelivery::flush() {
+ stop_dispatch = true;
+ center->submit_to(
+ center->get_id(), [this] () mutable {
+ std::lock_guard<std::mutex> l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ if (msgr->ms_can_fast_dispatch(m)) {
+ dispatch_queue->fast_dispatch(m);
+ } else {
+ dispatch_queue->enqueue(m, m->get_priority(), conn_id);
+ }
+ delay_queue.pop_front();
+ }
+ for (auto i : register_time_events)
+ center->delete_time_event(i);
+ register_time_events.clear();
+ stop_dispatch = false;
+ }, true);
+}
+
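+ // Keepalive exchange, as assembled by _append_keepalive_or_ack() below:
+ // with CEPH_FEATURE_MSGR_KEEPALIVE2 the probe carries a timestamp which
+ // the peer echoes back, giving a round-trip liveness signal:
+ //
+ //   sender:   [CEPH_MSGR_TAG_KEEPALIVE2][ceph_timespec now]
+ //   receiver: [CEPH_MSGR_TAG_KEEPALIVE2_ACK][echoed ceph_timespec]
+ //
+ // Peers without KEEPALIVE2 fall back to the bare [CEPH_MSGR_TAG_KEEPALIVE].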
+void AsyncConnection::send_keepalive()
+{
+ ldout(async_msgr->cct, 10) << __func__ << dendl;
+ std::lock_guard<std::mutex> l(write_lock);
+ if (can_write != WriteStatus::CLOSED) {
+ keepalive = true;
+ center->dispatch_event_external(write_handler);
+ }
+}
+
+void AsyncConnection::mark_down()
+{
+ ldout(async_msgr->cct, 1) << __func__ << dendl;
+ std::lock_guard<std::mutex> l(lock);
+ _stop();
+}
+
+void AsyncConnection::_append_keepalive_or_ack(bool ack, utime_t *tp)
+{
+ ldout(async_msgr->cct, 10) << __func__ << dendl;
+ if (ack) {
+ assert(tp);
+ struct ceph_timespec ts;
+ tp->encode_timeval(&ts);
+ outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
+ outcoming_bl.append((char*)&ts, sizeof(ts));
+ } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+ struct ceph_timespec ts;
+ utime_t t = ceph_clock_now();
+ t.encode_timeval(&ts);
+ outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
+ outcoming_bl.append((char*)&ts, sizeof(ts));
+ } else {
+ outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
+ }
+}
+
+void AsyncConnection::handle_write()
+{
+ ldout(async_msgr->cct, 10) << __func__ << dendl;
+ ssize_t r = 0;
+
+ write_lock.lock();
+ if (can_write == WriteStatus::CANWRITE) {
+ if (keepalive) {
+ _append_keepalive_or_ack();
+ keepalive = false;
+ }
+
+ auto start = ceph::mono_clock::now();
+ bool more;
+ do {
+ bufferlist data;
+ Message *m = _get_next_outgoing(&data);
+ if (!m)
+ break;
+
+ if (!policy.lossy) {
+ // put on the sent list
+ sent.push_back(m);
+ m->get();
+ }
+ more = _has_next_outgoing();
+ write_lock.unlock();
+
+ // send_message() or requeue may not have encoded the message yet
+ if (!data.length())
+ prepare_send_message(get_features(), m, data);
+
+ r = write_message(m, data, more);
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
+ goto fail;
+ }
+ write_lock.lock();
+ if (r > 0)
+ break;
+ } while (can_write == WriteStatus::CANWRITE);
+ write_lock.unlock();
+
+ uint64_t left = ack_left;
+ if (left) {
+ ceph_le64 s;
+ s = in_seq;
+ outcoming_bl.append(CEPH_MSGR_TAG_ACK);
+ outcoming_bl.append((char*)&s, sizeof(s));
+ ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
+ ack_left -= left;
+ left = ack_left;
+ r = _try_send(left);
+ } else if (is_queued()) {
+ r = _try_send();
+ }
+
+ logger->tinc(l_msgr_running_send_time, ceph::mono_clock::now() - start);
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
+ goto fail;
+ }
+ } else {
+ write_lock.unlock();
+ lock.lock();
+ write_lock.lock();
+ if (state == STATE_STANDBY && !policy.server && is_queued()) {
+ ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
+ _connect();
+ } else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
+ r = _try_send();
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
+ write_lock.unlock();
+ fault();
+ lock.unlock();
+ return;
+ }
+ }
+ write_lock.unlock();
+ lock.unlock();
+ }
+
+ return;
+
+ fail:
+ lock.lock();
+ fault();
+ lock.unlock();
+}
+
+void AsyncConnection::wakeup_from(uint64_t id)
+{
+ lock.lock();
+ register_time_events.erase(id);
+ lock.unlock();
+ process();
+}
+
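+ // tick() below enforces the idle timeout: inactive_timeout_us is derived
+ // from the ms_tcp_read_timeout option (in seconds), and a connection idle
+ // for longer marks itself faulted. A sketch of tuning it in ceph.conf,
+ // assuming the shipped default of 900 seconds:
+ //
+ //   [global]
+ //   ms tcp read timeout = 900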
+void AsyncConnection::tick(uint64_t id)
+{
+ auto now = ceph::coarse_mono_clock::now();
+ ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
+ << " last_active=" << last_active << dendl;
+ std::lock_guard<std::mutex> l(lock);
+ last_tick_id = 0;
+ auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
+ if (inactive_timeout_us < (uint64_t)idle_period) {
+ ldout(async_msgr->cct, 1) << __func__ << " idle (" << idle_period << ") for more than "
+ << inactive_timeout_us
+ << " us, marking self as faulted" << dendl;
+ fault();
+ } else if (is_connected()) {
+ last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+ }
+}