These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / net / ceph / messenger.c
index 967080a..63ae5dd 100644 (file)
@@ -6,6 +6,7 @@
 #include <linux/inet.h>
 #include <linux/kthread.h>
 #include <linux/net.h>
+#include <linux/nsproxy.h>
 #include <linux/slab.h>
 #include <linux/socket.h>
 #include <linux/string.h>
@@ -162,6 +163,7 @@ static struct kmem_cache    *ceph_msg_data_cache;
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
+static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
 
 #ifdef CONFIG_LOCKDEP
 static struct lock_class_key socket_class;
@@ -175,7 +177,7 @@ static struct lock_class_key socket_class;
 
 static void queue_con(struct ceph_connection *con);
 static void cancel_con(struct ceph_connection *con);
-static void con_work(struct work_struct *);
+static void ceph_con_workfn(struct work_struct *);
 static void con_fault(struct ceph_connection *con);
 
 /*
@@ -275,23 +277,22 @@ static void _ceph_msgr_exit(void)
                ceph_msgr_wq = NULL;
        }
 
-       ceph_msgr_slab_exit();
-
        BUG_ON(zero_page == NULL);
-       kunmap(zero_page);
        page_cache_release(zero_page);
        zero_page = NULL;
+
+       ceph_msgr_slab_exit();
 }
 
 int ceph_msgr_init(void)
 {
+       if (ceph_msgr_slab_init())
+               return -ENOMEM;
+
        BUG_ON(zero_page != NULL);
        zero_page = ZERO_PAGE(0);
        page_cache_get(zero_page);
 
-       if (ceph_msgr_slab_init())
-               return -ENOMEM;
-
        /*
         * The number of active work items is limited by the number of
         * connections, so leave @max_active at default.
@@ -480,8 +481,8 @@ static int ceph_tcp_connect(struct ceph_connection *con)
        int ret;
 
        BUG_ON(con->sock);
-       ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
-                              IPPROTO_TCP, &sock);
+       ret = sock_create_kern(read_pnet(&con->msgr->net), paddr->ss_family,
+                              SOCK_STREAM, IPPROTO_TCP, &sock);
        if (ret)
                return ret;
        sock->sk->sk_allocation = GFP_NOFS;
@@ -508,7 +509,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
                return ret;
        }
 
-       if (con->msgr->tcp_nodelay) {
+       if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) {
                int optval = 1;
 
                ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
@@ -636,9 +637,6 @@ static int con_close_socket(struct ceph_connection *con)
 static void ceph_msg_remove(struct ceph_msg *msg)
 {
        list_del_init(&msg->list_head);
-       BUG_ON(msg->con == NULL);
-       msg->con->ops->put(msg->con);
-       msg->con = NULL;
 
        ceph_msg_put(msg);
 }
@@ -661,20 +659,21 @@ static void reset_connection(struct ceph_connection *con)
 
        if (con->in_msg) {
                BUG_ON(con->in_msg->con != con);
-               con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
-               con->ops->put(con);
        }
 
        con->connect_seq = 0;
        con->out_seq = 0;
        if (con->out_msg) {
+               BUG_ON(con->out_msg->con != con);
                ceph_msg_put(con->out_msg);
                con->out_msg = NULL;
        }
        con->in_seq = 0;
        con->in_seq_acked = 0;
+
+       con->out_skip = 0;
 }
 
 /*
@@ -749,7 +748,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
        mutex_init(&con->mutex);
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
-       INIT_DELAYED_WORK(&con->work, con_work);
+       INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
 
        con->state = CON_STATE_CLOSED;
 }
@@ -774,6 +773,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
 
 static void con_out_kvec_reset(struct ceph_connection *con)
 {
+       BUG_ON(con->out_skip);
+
        con->out_kvec_left = 0;
        con->out_kvec_bytes = 0;
        con->out_kvec_cur = &con->out_kvec[0];
@@ -782,9 +783,9 @@ static void con_out_kvec_reset(struct ceph_connection *con)
 static void con_out_kvec_add(struct ceph_connection *con,
                                size_t size, void *data)
 {
-       int index;
+       int index = con->out_kvec_left;
 
-       index = con->out_kvec_left;
+       BUG_ON(con->out_skip);
        BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
 
        con->out_kvec[index].iov_len = size;
@@ -793,6 +794,27 @@ static void con_out_kvec_add(struct ceph_connection *con,
        con->out_kvec_bytes += size;
 }
 
+/*
+ * Chop off the last pending kvec.  Return that kvec's iov_len, i.e. how
+ * many bytes would have been written if the kvec hadn't been nuked, or
+ * 0 (leaving the kvec state untouched) when out_kvec_bytes is 0.
+ */
+static int con_out_kvec_skip(struct ceph_connection *con)
+{
+       int off = con->out_kvec_cur - con->out_kvec; /* index of current kvec */
+       int skip = 0;
+
+       if (con->out_kvec_bytes > 0) {
+               skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
+               BUG_ON(con->out_kvec_bytes < skip);
+               BUG_ON(!con->out_kvec_left);
+               con->out_kvec_bytes -= skip; /* no longer queued as kvec data */
+               con->out_kvec_left--;
+       }
+
+       return skip;
+}
+
 #ifdef CONFIG_BLOCK
 
 /*
@@ -1178,6 +1200,13 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
        return new_piece;
 }
 
+static size_t sizeof_footer(struct ceph_connection *con)
+{      /* Size of the message footer variant selected by the peer's features. */
+       return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
+           sizeof(struct ceph_msg_footer) : /* peer has MSG_AUTH */
+           sizeof(struct ceph_msg_footer_old); /* peer lacks MSG_AUTH */
+}
+
 static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
 {
        BUG_ON(!msg);
@@ -1200,11 +1229,10 @@ static void prepare_write_message_footer(struct ceph_connection *con)
        m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
 
        dout("prepare_write_message_footer %p\n", con);
-       con->out_kvec_is_msg = true;
        con->out_kvec[v].iov_base = &m->footer;
        if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
                if (con->ops->sign_message)
-                       con->ops->sign_message(con, m);
+                       con->ops->sign_message(m);
                else
                        m->footer.sig = 0;
                con->out_kvec[v].iov_len = sizeof(m->footer);
@@ -1228,7 +1256,6 @@ static void prepare_write_message(struct ceph_connection *con)
        u32 crc;
 
        con_out_kvec_reset(con);
-       con->out_kvec_is_msg = true;
        con->out_msg_done = false;
 
        /* Sneak an ack in there first?  If we can get it into the same
@@ -1268,18 +1295,19 @@ static void prepare_write_message(struct ceph_connection *con)
 
        /* tag + hdr + front + middle */
        con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
-       con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
+       con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
        con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 
        if (m->middle)
                con_out_kvec_add(con, m->middle->vec.iov_len,
                        m->middle->vec.iov_base);
 
-       /* fill in crc (except data pages), footer */
+       /* fill in hdr crc and finalize hdr */
        crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
        con->out_msg->hdr.crc = cpu_to_le32(crc);
-       con->out_msg->footer.flags = 0;
+       memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
 
+       /* fill in front and middle crc, footer */
        crc = crc32c(0, m->front.iov_base, m->front.iov_len);
        con->out_msg->footer.front_crc = cpu_to_le32(crc);
        if (m->middle) {
@@ -1291,6 +1319,7 @@ static void prepare_write_message(struct ceph_connection *con)
        dout("%s front_crc %u middle_crc %u\n", __func__,
             le32_to_cpu(con->out_msg->footer.front_crc),
             le32_to_cpu(con->out_msg->footer.middle_crc));
+       con->out_msg->footer.flags = 0;
 
        /* is there a data payload? */
        con->out_msg->footer.data_crc = 0;
@@ -1351,7 +1380,16 @@ static void prepare_write_keepalive(struct ceph_connection *con)
 {
        dout("prepare_write_keepalive %p\n", con);
        con_out_kvec_reset(con);
-       con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
+       if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
+               struct timespec now = CURRENT_TIME;
+
+               con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
+               ceph_encode_timespec(&con->out_temp_keepalive2, &now);
+               con_out_kvec_add(con, sizeof(con->out_temp_keepalive2),
+                                &con->out_temp_keepalive2);
+       } else {
+               con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
+       }
        con_flag_set(con, CON_FLAG_WRITE_PENDING);
 }
 
@@ -1422,7 +1460,8 @@ static int prepare_write_connect(struct ceph_connection *con)
        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
             con->connect_seq, global_seq, proto);
 
-       con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
+       con->out_connect.features =
+           cpu_to_le64(from_msgr(con->msgr)->supported_features);
        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1485,7 +1524,6 @@ static int write_partial_kvec(struct ceph_connection *con)
                }
        }
        con->out_kvec_left = 0;
-       con->out_kvec_is_msg = false;
        ret = 1;
 out:
        dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
@@ -1517,7 +1555,7 @@ static int write_partial_message_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->out_msg;
        struct ceph_msg_data_cursor *cursor = &msg->cursor;
-       bool do_datacrc = !con->msgr->nocrc;
+       bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
        u32 crc;
 
        dout("%s %p msg %p\n", __func__, con, msg);
@@ -1542,10 +1580,10 @@ static int write_partial_message_data(struct ceph_connection *con)
                bool need_crc;
                int ret;
 
-               page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
-                                                       &last_piece);
+               page = ceph_msg_data_next(cursor, &page_offset, &length,
+                                         &last_piece);
                ret = ceph_tcp_sendpage(con->sock, page, page_offset,
-                                     length, last_piece);
+                                       length, !last_piece);
                if (ret <= 0) {
                        if (do_datacrc)
                                msg->footer.data_crc = cpu_to_le32(crc);
@@ -1554,7 +1592,7 @@ static int write_partial_message_data(struct ceph_connection *con)
                }
                if (do_datacrc && cursor->need_crc)
                        crc = ceph_crc32c_page(crc, page, page_offset, length);
-               need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
+               need_crc = ceph_msg_data_advance(cursor, (size_t)ret);
        }
 
        dout("%s %p msg %p done\n", __func__, con, msg);
@@ -1577,6 +1615,7 @@ static int write_partial_skip(struct ceph_connection *con)
 {
        int ret;
 
+       dout("%s %p %d left\n", __func__, con, con->out_skip);
        while (con->out_skip > 0) {
                size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
 
@@ -1625,6 +1664,12 @@ static void prepare_read_tag(struct ceph_connection *con)
        con->in_tag = CEPH_MSGR_TAG_READY;
 }
 
+static void prepare_read_keepalive_ack(struct ceph_connection *con)
+{      /* Log the event and reset in_base_pos to 0 for the keepalive ack. */
+       dout("prepare_read_keepalive_ack %p\n", con);
+       con->in_base_pos = 0;
+}
+
 /*
  * Prepare to read a message.
  */
@@ -1732,17 +1777,17 @@ static int verify_hello(struct ceph_connection *con)
 
 static bool addr_is_blank(struct sockaddr_storage *ss)
 {
+       struct in_addr *addr = &((struct sockaddr_in *)ss)->sin_addr; /* AF_INET view */
+       struct in6_addr *addr6 = &((struct sockaddr_in6 *)ss)->sin6_addr; /* AF_INET6 view */
+
        switch (ss->ss_family) {
        case AF_INET:
-               return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
+               return addr->s_addr == htonl(INADDR_ANY);
        case AF_INET6:
-               return
-                    ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
-                    ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
-                    ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
-                    ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
+               return ipv6_addr_any(addr6);
+       default:
+               return true; /* any other family is reported as blank */
        }
-       return false;
 }
 
 static int addr_port(struct sockaddr_storage *ss)
@@ -1989,8 +2034,8 @@ static int process_banner(struct ceph_connection *con)
 
 static int process_connect(struct ceph_connection *con)
 {
-       u64 sup_feat = con->msgr->supported_features;
-       u64 req_feat = con->msgr->required_features;
+       u64 sup_feat = from_msgr(con->msgr)->supported_features;
+       u64 req_feat = from_msgr(con->msgr)->required_features;
        u64 server_feat = ceph_sanitize_features(
                                le64_to_cpu(con->in_reply.features));
        int ret;
@@ -2216,7 +2261,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->in_msg;
        struct ceph_msg_data_cursor *cursor = &msg->cursor;
-       const bool do_datacrc = !con->msgr->nocrc;
+       bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
        struct page *page;
        size_t page_offset;
        size_t length;
@@ -2230,8 +2275,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
        if (do_datacrc)
                crc = con->in_data_crc;
        while (cursor->resid) {
-               page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
-                                                       NULL);
+               page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
                ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
                if (ret <= 0) {
                        if (do_datacrc)
@@ -2242,7 +2286,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
 
                if (do_datacrc)
                        crc = ceph_crc32c_page(crc, page, page_offset, ret);
-               (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
+               (void) ceph_msg_data_advance(cursor, (size_t)ret);
        }
        if (do_datacrc)
                con->in_data_crc = crc;
@@ -2262,7 +2306,7 @@ static int read_partial_message(struct ceph_connection *con)
        int end;
        int ret;
        unsigned int front_len, middle_len, data_len;
-       bool do_datacrc = !con->msgr->nocrc;
+       bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
        bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
        u64 seq;
        u32 crc;
@@ -2301,9 +2345,9 @@ static int read_partial_message(struct ceph_connection *con)
                        ceph_pr_addr(&con->peer_addr.in_addr),
                        seq, con->in_seq + 1);
                con->in_base_pos = -front_len - middle_len - data_len -
-                       sizeof(m->footer);
+                       sizeof_footer(con);
                con->in_tag = CEPH_MSGR_TAG_READY;
-               return 0;
+               return 1;
        } else if ((s64)seq - (s64)con->in_seq > 1) {
                pr_err("read_partial_message bad seq %lld expected %lld\n",
                       seq, con->in_seq + 1);
@@ -2322,21 +2366,14 @@ static int read_partial_message(struct ceph_connection *con)
                        return ret;
 
                BUG_ON(!con->in_msg ^ skip);
-               if (con->in_msg && data_len > con->in_msg->data_length) {
-                       pr_warn("%s skipping long message (%u > %zd)\n",
-                               __func__, data_len, con->in_msg->data_length);
-                       ceph_msg_put(con->in_msg);
-                       con->in_msg = NULL;
-                       skip = 1;
-               }
                if (skip) {
                        /* skip this message */
                        dout("alloc_msg said skip message\n");
                        con->in_base_pos = -front_len - middle_len - data_len -
-                               sizeof(m->footer);
+                               sizeof_footer(con);
                        con->in_tag = CEPH_MSGR_TAG_READY;
                        con->in_seq++;
-                       return 0;
+                       return 1;
                }
 
                BUG_ON(!con->in_msg);
@@ -2414,7 +2451,7 @@ static int read_partial_message(struct ceph_connection *con)
        }
 
        if (need_sign && con->ops->check_message_signature &&
-           con->ops->check_message_signature(con, m)) {
+           con->ops->check_message_signature(m)) {
                pr_err("read_partial_message %p signature check failed\n", m);
                return -EBADMSG;
        }
@@ -2429,13 +2466,10 @@ static int read_partial_message(struct ceph_connection *con)
  */
 static void process_message(struct ceph_connection *con)
 {
-       struct ceph_msg *msg;
+       struct ceph_msg *msg = con->in_msg;
 
        BUG_ON(con->in_msg->con != con);
-       con->in_msg->con = NULL;
-       msg = con->in_msg;
        con->in_msg = NULL;
-       con->ops->put(con);
 
        /* if first message, set peer_name */
        if (con->peer_name.type == 0)
@@ -2457,6 +2491,17 @@ static void process_message(struct ceph_connection *con)
        mutex_lock(&con->mutex);
 }
 
+static int read_keepalive_ack(struct ceph_connection *con)
+{      /* Read a ceph_timespec into con->last_keepalive_ack; 1 when done. */
+       struct ceph_timespec ceph_ts;
+       size_t size = sizeof(ceph_ts);
+       int ret = read_partial(con, size, size, &ceph_ts);
+       if (ret <= 0)
+               return ret; /* pass read_partial()'s non-positive result up */
+       ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts);
+       prepare_read_tag(con); /* go back to reading a tag */
+       return 1;
+}
 
 /*
  * Write something to the socket.  Called in a worker thread when the
@@ -2493,13 +2538,13 @@ more:
 
 more_kvec:
        /* kvec data queued? */
-       if (con->out_skip) {
-               ret = write_partial_skip(con);
+       if (con->out_kvec_left) {
+               ret = write_partial_kvec(con);
                if (ret <= 0)
                        goto out;
        }
-       if (con->out_kvec_left) {
-               ret = write_partial_kvec(con);
+       if (con->out_skip) {
+               ret = write_partial_skip(con);
                if (ret <= 0)
                        goto out;
        }
@@ -2526,6 +2571,10 @@ more_kvec:
 
 do_next:
        if (con->state == CON_STATE_OPEN) {
+               if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
+                       prepare_write_keepalive(con);
+                       goto more;
+               }
                /* is anything else pending? */
                if (!list_empty(&con->out_queue)) {
                        prepare_write_message(con);
@@ -2535,10 +2584,6 @@ do_next:
                        prepare_write_ack(con);
                        goto more;
                }
-               if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
-                       prepare_write_keepalive(con);
-                       goto more;
-               }
        }
 
        /* Nothing to do! */
@@ -2641,6 +2686,9 @@ more:
                case CEPH_MSGR_TAG_ACK:
                        prepare_read_ack(con);
                        break;
+               case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+                       prepare_read_keepalive_ack(con);
+                       break;
                case CEPH_MSGR_TAG_CLOSE:
                        con_close_socket(con);
                        con->state = CON_STATE_CLOSED;
@@ -2654,7 +2702,7 @@ more:
                if (ret <= 0) {
                        switch (ret) {
                        case -EBADMSG:
-                               con->error_msg = "bad crc";
+                               con->error_msg = "bad crc/signature";
                                /* fall through */
                        case -EBADE:
                                ret = -EIO;
@@ -2684,6 +2732,12 @@ more:
                process_ack(con);
                goto more;
        }
+       if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
+               ret = read_keepalive_ack(con);
+               if (ret <= 0)
+                       goto out;
+               goto more;
+       }
 
 out:
        dout("try_read done on %p ret %d\n", con, ret);
@@ -2799,7 +2853,7 @@ static void con_fault_finish(struct ceph_connection *con)
 /*
  * Do some work on a connection.  Drop a connection ref when we're done.
  */
-static void con_work(struct work_struct *work)
+static void ceph_con_workfn(struct work_struct *work)
 {
        struct ceph_connection *con = container_of(work, struct ceph_connection,
                                                   work.work);
@@ -2889,10 +2943,8 @@ static void con_fault(struct ceph_connection *con)
 
        if (con->in_msg) {
                BUG_ON(con->in_msg->con != con);
-               con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
-               con->ops->put(con);
        }
 
        /* Requeue anything that hasn't been acked */
@@ -2923,15 +2975,8 @@ static void con_fault(struct ceph_connection *con)
  * initialize a new messenger instance
  */
 void ceph_messenger_init(struct ceph_messenger *msgr,
-                       struct ceph_entity_addr *myaddr,
-                       u64 supported_features,
-                       u64 required_features,
-                       bool nocrc,
-                       bool tcp_nodelay)
+                        struct ceph_entity_addr *myaddr)
 {
-       msgr->supported_features = supported_features;
-       msgr->required_features = required_features;
-
        spin_lock_init(&msgr->global_seq_lock);
 
        if (myaddr)
@@ -2941,15 +2986,29 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
        msgr->inst.addr.type = 0;
        get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
        encode_my_addr(msgr);
-       msgr->nocrc = nocrc;
-       msgr->tcp_nodelay = tcp_nodelay;
 
        atomic_set(&msgr->stopping, 0);
+       write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
 
        dout("%s %p\n", __func__, msgr);
 }
 EXPORT_SYMBOL(ceph_messenger_init);
 
+void ceph_messenger_fini(struct ceph_messenger *msgr)
+{      /* Tear down a messenger: release the net reference held in msgr->net. */
+       put_net(read_pnet(&msgr->net));
+}
+EXPORT_SYMBOL(ceph_messenger_fini);
+
+static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
+{      /* Point msg->con at con (may be NULL), via the ops->put/->get hooks. */
+       if (msg->con)
+               msg->con->ops->put(msg->con); /* release the previous con */
+
+       msg->con = con ? con->ops->get(con) : NULL;
+       BUG_ON(msg->con != con); /* ->get() must hand back con itself */
+}
+
 static void clear_standby(struct ceph_connection *con)
 {
        /* come back from STANDBY? */
@@ -2981,9 +3040,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
                return;
        }
 
-       BUG_ON(msg->con != NULL);
-       msg->con = con->ops->get(con);
-       BUG_ON(msg->con == NULL);
+       msg_con_set(msg, con);
 
        BUG_ON(!list_empty(&msg->list_head));
        list_add_tail(&msg->list_head, &con->out_queue);
@@ -3011,31 +3068,45 @@ void ceph_msg_revoke(struct ceph_msg *msg)
 {
        struct ceph_connection *con = msg->con;
 
-       if (!con)
+       if (!con) {
+               dout("%s msg %p null con\n", __func__, msg);
                return;         /* Message not in our possession */
+       }
 
        mutex_lock(&con->mutex);
        if (!list_empty(&msg->list_head)) {
                dout("%s %p msg %p - was on queue\n", __func__, con, msg);
                list_del_init(&msg->list_head);
-               BUG_ON(msg->con == NULL);
-               msg->con->ops->put(msg->con);
-               msg->con = NULL;
                msg->hdr.seq = 0;
 
                ceph_msg_put(msg);
        }
        if (con->out_msg == msg) {
-               dout("%s %p msg %p - was sending\n", __func__, con, msg);
-               con->out_msg = NULL;
-               if (con->out_kvec_is_msg) {
-                       con->out_skip = con->out_kvec_bytes;
-                       con->out_kvec_is_msg = false;
+               BUG_ON(con->out_skip);
+               /* footer */
+               if (con->out_msg_done) {
+                       con->out_skip += con_out_kvec_skip(con);
+               } else {
+                       BUG_ON(!msg->data_length);
+                       if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
+                               con->out_skip += sizeof(msg->footer);
+                       else
+                               con->out_skip += sizeof(msg->old_footer);
                }
+               /* data, middle, front */
+               if (msg->data_length)
+                       con->out_skip += msg->cursor.total_resid;
+               if (msg->middle)
+                       con->out_skip += con_out_kvec_skip(con);
+               con->out_skip += con_out_kvec_skip(con);
+
+               dout("%s %p msg %p - was sending, will write %d skip %d\n",
+                    __func__, con, msg, con->out_kvec_bytes, con->out_skip);
                msg->hdr.seq = 0;
-
+               con->out_msg = NULL;
                ceph_msg_put(msg);
        }
+
        mutex_unlock(&con->mutex);
 }
 
@@ -3044,16 +3115,13 @@ void ceph_msg_revoke(struct ceph_msg *msg)
  */
 void ceph_msg_revoke_incoming(struct ceph_msg *msg)
 {
-       struct ceph_connection *con;
+       struct ceph_connection *con = msg->con;
 
-       BUG_ON(msg == NULL);
-       if (!msg->con) {
+       if (!con) {
                dout("%s msg %p null con\n", __func__, msg);
-
                return;         /* Message not in our possession */
        }
 
-       con = msg->con;
        mutex_lock(&con->mutex);
        if (con->in_msg == msg) {
                unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
@@ -3094,6 +3162,20 @@ void ceph_con_keepalive(struct ceph_connection *con)
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
 
+bool ceph_con_keepalive_expired(struct ceph_connection *con,
+                              unsigned long interval)
+{      /* True once now >= last_keepalive_ack + interval. */
+       if (interval > 0 &&
+           (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+               struct timespec now = CURRENT_TIME;
+               struct timespec ts;
+               jiffies_to_timespec(interval, &ts); /* interval is in jiffies */
+               ts = timespec_add(con->last_keepalive_ack, ts); /* deadline */
+               return timespec_compare(&now, &ts) >= 0;
+       }
+       return false; /* interval is 0 or peer lacks KEEPALIVE2 */
+}
+
 static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
 {
        struct ceph_msg_data *data;
@@ -3285,9 +3367,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
        }
        if (msg) {
                BUG_ON(*skip);
+               msg_con_set(msg, con);
                con->in_msg = msg;
-               con->in_msg->con = con->ops->get(con);
-               BUG_ON(con->in_msg->con == NULL);
        } else {
                /*
                 * Null message pointer means either we should skip
@@ -3334,6 +3415,8 @@ static void ceph_msg_release(struct kref *kref)
        dout("%s %p\n", __func__, m);
        WARN_ON(!list_empty(&m->list_head));
 
+       msg_con_set(m, NULL);
+
        /* drop middle, data, if any */
        if (m->middle) {
                ceph_buffer_put(m->middle);