These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] kernel/net/rds/send.c
diff --git a/kernel/net/rds/send.c b/kernel/net/rds/send.c
index e9430f5..c9cdb35 100644
--- a/kernel/net/rds/send.c
+++ b/kernel/net/rds/send.c
@@ -38,6 +38,7 @@
 #include <linux/list.h>
 #include <linux/ratelimit.h>
 #include <linux/export.h>
+#include <linux/sizes.h>
 
 #include "rds.h"
 
@@ -51,7 +52,7 @@
  * it to 0 will restore the old behavior (where we looped until we had
  * drained the queue).
  */
-static int send_batch_count = 64;
+static int send_batch_count = SZ_1K;
 module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
@@ -223,7 +224,7 @@ restart:
                         * through a lot of messages, lets back off and see
                         * if anyone else jumps in
                         */
-                       if (batch_count >= 1024)
+                       if (batch_count >= send_batch_count)
                                goto over_batch;
 
                        spin_lock_irqsave(&conn->c_lock, flags);
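
These two hunks work together: the back-off threshold hard-coded as 1024 in the
send loop becomes the send_batch_count tunable, and its default is raised from 64
to SZ_1K (1024, hence the new <linux/sizes.h> include). As a 0444 module parameter
it is read-only at runtime but can be set when the rds module is loaded. A heavily
condensed sketch of the loop being limited, with everything outside the quoted
hunks assumed rather than copied:

	restart:
		batch_count = 0;

		while (/* messages remain on conn->c_send_queue */) {
			if (++batch_count >= send_batch_count)
				goto over_batch;	/* yield; let other senders jump in */

			/* map the next message and hand it to the transport */
		}

	over_batch:
		/* release the send path, then re-check for work queued meanwhile */
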
@@ -282,26 +283,34 @@ restart:
                /* The transport either sends the whole rdma or none of it */
                if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
                        rm->m_final_op = &rm->rdma;
+                       /* The transport owns the mapped memory for now.
+                        * You can't unmap it while it's on the send queue
+                        */
+                       set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                        ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
-                       if (ret)
+                       if (ret) {
+                               clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
+                               wake_up_interruptible(&rm->m_flush_wait);
                                break;
+                       }
                        conn->c_xmit_rdma_sent = 1;
 
-                       /* The transport owns the mapped memory for now.
-                        * You can't unmap it while it's on the send queue */
-                       set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                }
 
                if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
                        rm->m_final_op = &rm->atomic;
+                       /* The transport owns the mapped memory for now.
+                        * You can't unmap it while it's on the send queue
+                        */
+                       set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                        ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
-                       if (ret)
+                       if (ret) {
+                               clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
+                               wake_up_interruptible(&rm->m_flush_wait);
                                break;
+                       }
                        conn->c_xmit_atomic_sent = 1;
 
-                       /* The transport owns the mapped memory for now.
-                        * You can't unmap it while it's on the send queue */
-                       set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                }
 
                /*
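
Both op types now follow the same ordering: RDS_MSG_MAPPED is set before the
transport is asked to transmit, and is cleared again, waking anyone blocked in
rds_message_wait() on m_flush_wait, if that call fails. Previously the bit was
only set after a successful transmit. A condensed sketch of the shared pattern,
where xmit_op() is a stand-in for conn->c_trans->xmit_rdma() or xmit_atomic():

	/* The transport owns the mapped memory from here on; the message
	 * must not be unmapped while it sits on the send queue.
	 */
	set_bit(RDS_MSG_MAPPED, &rm->m_flags);

	ret = xmit_op(conn, rm);
	if (ret) {
		/* the transmit never happened: hand ownership back and wake
		 * waiters so the message can be flushed and freed
		 */
		clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
		wake_up_interruptible(&rm->m_flush_wait);
		break;
	}
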
@@ -411,15 +420,19 @@ over_batch:
         */
        if (ret == 0) {
                smp_mb();
-               if (!list_empty(&conn->c_send_queue) &&
+               if ((test_bit(0, &conn->c_map_queued) ||
+                    !list_empty(&conn->c_send_queue)) &&
                    send_gen == conn->c_send_gen) {
                        rds_stats_inc(s_send_lock_queue_raced);
-                       goto restart;
+                       if (batch_count < send_batch_count)
+                               goto restart;
+                       queue_delayed_work(rds_wq, &conn->c_send_w, 1);
                }
        }
 out:
        return ret;
 }
+EXPORT_SYMBOL_GPL(rds_send_xmit);
 
 static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
 {
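
After the send path is released, the re-check now also looks at c_map_queued, and
it no longer restarts unconditionally: the loop is only re-entered while the batch
budget lasts, otherwise the remaining work is pushed onto rds_wq so a single caller
cannot spin in rds_send_xmit() indefinitely. The same lines with the intent spelled
out in comments:

	if (ret == 0) {
		smp_mb();	/* order the re-check against concurrent queueing */
		if ((test_bit(0, &conn->c_map_queued) ||
		     !list_empty(&conn->c_send_queue)) &&
		    send_gen == conn->c_send_gen) {
			rds_stats_inc(s_send_lock_queue_raced);
			if (batch_count < send_batch_count)
				goto restart;	/* budget left: drain inline */
			/* budget spent: let the send worker finish the job */
			queue_delayed_work(rds_wq, &conn->c_send_w, 1);
		}
	}

The EXPORT_SYMBOL_GPL() makes rds_send_xmit() available to other modules, such as
the separately built transports.
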
@@ -769,8 +782,22 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
        while (!list_empty(&list)) {
                rm = list_entry(list.next, struct rds_message, m_sock_item);
                list_del_init(&rm->m_sock_item);
-
                rds_message_wait(rm);
+
+               /* just in case the code above skipped this message
+                * because RDS_MSG_ON_CONN wasn't set, run it again here
+                * taking m_rs_lock is the only thing that keeps us
+                * from racing with ack processing.
+                */
+               spin_lock_irqsave(&rm->m_rs_lock, flags);
+
+               spin_lock(&rs->rs_lock);
+               __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
+               spin_unlock(&rs->rs_lock);
+
+               rm->m_rs = NULL;
+               spin_unlock_irqrestore(&rm->m_rs_lock, flags);
+
                rds_message_put(rm);
        }
 }
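
The extra pass in rds_send_drop_to() completes messages that the loop above it may
have skipped because RDS_MSG_ON_CONN was not set. The locking mirrors the rest of
the file: m_rs_lock (IRQs disabled) keeps ACK processing from completing the same
message concurrently, and rs_lock is nested inside it around __rds_send_complete().
In condensed form:

	spin_lock_irqsave(&rm->m_rs_lock, flags);	/* exclude ACK processing */

	spin_lock(&rs->rs_lock);			/* nested: socket send state */
	__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);	/* report the cancellation */
	spin_unlock(&rs->rs_lock);

	rm->m_rs = NULL;				/* detach the message from the socket */
	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
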
@@ -986,11 +1013,18 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
                release_sock(sk);
        }
 
-       /* racing with another thread binding seems ok here */
+       lock_sock(sk);
        if (daddr == 0 || rs->rs_bound_addr == 0) {
+               release_sock(sk);
                ret = -ENOTCONN; /* XXX not a great errno */
                goto out;
        }
+       release_sock(sk);
+
+       if (payload_len > rds_sk_sndbuf(rs)) {
+               ret = -EMSGSIZE;
+               goto out;
+       }
 
        /* size of rm including all sgs */
        ret = rds_rm_size(msg, payload_len);
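
Two checks move to the front of rds_sendmsg(): the bound-address test is now done
under lock_sock() so it cannot race with a concurrent bind, and an oversized
payload is rejected with -EMSGSIZE before any message is allocated (the old check
sat inside the queueing loop and is removed further down). Condensed:

	lock_sock(sk);
	if (daddr == 0 || rs->rs_bound_addr == 0) {
		release_sock(sk);
		ret = -ENOTCONN;	/* no destination or socket not bound */
		goto out;
	}
	release_sock(sk);

	/* a message larger than the socket send buffer can never be queued,
	 * so fail fast instead of discovering it inside the queueing loop
	 */
	if (payload_len > rds_sk_sndbuf(rs)) {
		ret = -EMSGSIZE;
		goto out;
	}
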
@@ -1023,7 +1057,8 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
        if (rs->rs_conn && rs->rs_conn->c_faddr == daddr)
                conn = rs->rs_conn;
        else {
-               conn = rds_conn_create_outgoing(rs->rs_bound_addr, daddr,
+               conn = rds_conn_create_outgoing(sock_net(sock->sk),
+                                               rs->rs_bound_addr, daddr,
                                        rs->rs_transport,
                                        sock->sk->sk_allocation);
                if (IS_ERR(conn)) {
@@ -1063,11 +1098,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
        while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
                                  dport, &queued)) {
                rds_stats_inc(s_send_queue_full);
-               /* XXX make sure this is reasonable */
-               if (payload_len > rds_sk_sndbuf(rs)) {
-                       ret = -EMSGSIZE;
-                       goto out;
-               }
+
                if (nonblock) {
                        ret = -EAGAIN;
                        goto out;
@@ -1095,8 +1126,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
         */
        rds_stats_inc(s_send_queued);
 
-       if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-               rds_send_xmit(conn);
+       ret = rds_send_xmit(conn);
+       if (ret == -ENOMEM || ret == -EAGAIN)
+               queue_delayed_work(rds_wq, &conn->c_send_w, 1);
 
        rds_message_put(rm);
        return payload_len;
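
rds_sendmsg() now calls rds_send_xmit() unconditionally instead of gating it on
RDS_LL_SEND_FULL, and it reacts to the return value: -ENOMEM or -EAGAIN is treated
as "try again later", so the send worker is scheduled to retry shortly. A
caller-side sketch of that pattern; the helper name is made up for illustration:

	static void rds_kick_send(struct rds_connection *conn)
	{
		int ret = rds_send_xmit(conn);

		/* transient failure: let rds_wq retry shortly instead of
		 * dropping the queued work on the floor
		 */
		if (ret == -ENOMEM || ret == -EAGAIN)
			queue_delayed_work(rds_wq, &conn->c_send_w, 1);
	}
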
@@ -1152,8 +1184,8 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
        rds_stats_inc(s_send_queued);
        rds_stats_inc(s_send_pong);
 
-       if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-               queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+       /* schedule the send work on rds_wq */
+       queue_delayed_work(rds_wq, &conn->c_send_w, 1);
 
        rds_message_put(rm);
        return 0;