These changes are the raw update of the kvmfornfv.git kernel sources to linux-4.4.6-rt14.

diff --git a/kernel/net/rds/ib_cm.c b/kernel/net/rds/ib_cm.c
index 8a09ee7..da5a7fb 100644
--- a/kernel/net/rds/ib_cm.c
+++ b/kernel/net/rds/ib_cm.c
 #include "rds.h"
 #include "ib.h"
 
-static char *rds_ib_event_type_strings[] = {
-#define RDS_IB_EVENT_STRING(foo) \
-               [IB_EVENT_##foo] = __stringify(IB_EVENT_##foo)
-       RDS_IB_EVENT_STRING(CQ_ERR),
-       RDS_IB_EVENT_STRING(QP_FATAL),
-       RDS_IB_EVENT_STRING(QP_REQ_ERR),
-       RDS_IB_EVENT_STRING(QP_ACCESS_ERR),
-       RDS_IB_EVENT_STRING(COMM_EST),
-       RDS_IB_EVENT_STRING(SQ_DRAINED),
-       RDS_IB_EVENT_STRING(PATH_MIG),
-       RDS_IB_EVENT_STRING(PATH_MIG_ERR),
-       RDS_IB_EVENT_STRING(DEVICE_FATAL),
-       RDS_IB_EVENT_STRING(PORT_ACTIVE),
-       RDS_IB_EVENT_STRING(PORT_ERR),
-       RDS_IB_EVENT_STRING(LID_CHANGE),
-       RDS_IB_EVENT_STRING(PKEY_CHANGE),
-       RDS_IB_EVENT_STRING(SM_CHANGE),
-       RDS_IB_EVENT_STRING(SRQ_ERR),
-       RDS_IB_EVENT_STRING(SRQ_LIMIT_REACHED),
-       RDS_IB_EVENT_STRING(QP_LAST_WQE_REACHED),
-       RDS_IB_EVENT_STRING(CLIENT_REREGISTER),
-#undef RDS_IB_EVENT_STRING
-};
-
-static char *rds_ib_event_str(enum ib_event_type type)
-{
-       return rds_str_array(rds_ib_event_type_strings,
-                            ARRAY_SIZE(rds_ib_event_type_strings), type);
-};
-
 /*
  * Set the selected protocol version
  */
@@ -165,7 +135,7 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
        rds_ib_recv_init_ring(ic);
        /* Post receive buffers - as a side effect, this will update
         * the posted credit count. */
-       rds_ib_recv_refill(conn, 1);
+       rds_ib_recv_refill(conn, 1, GFP_KERNEL);
 
        /* Tune RNR behavior */
        rds_ib_tune_rnr(ic, &qp_attr);
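
The refill helper now takes an explicit gfp_t because the same function is
reachable from both sleepable and atomic context. A minimal sketch of the two
call shapes; the GFP_NOWAIT caller below is an assumption for illustration,
not part of this hunk:

	/* connect path: process context, allocation may sleep */
	rds_ib_recv_refill(conn, 1, GFP_KERNEL);

	/* softirq/tasklet path: must not sleep; a failed atomic refill
	 * can be retried from a later completion */
	rds_ib_recv_refill(conn, 0, GFP_NOWAIT);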
@@ -243,7 +213,97 @@ static void rds_ib_cm_fill_conn_param(struct rds_connection *conn,
 static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
 {
        rdsdebug("event %u (%s) data %p\n",
-                event->event, rds_ib_event_str(event->event), data);
+                event->event, ib_event_msg(event->event), data);
+}
+
+/* Plucking the oldest entry from the ring can be done concurrently with
+ * the thread refilling the ring.  Each ring operation is protected by
+ * spinlocks and the transient state of refilling doesn't change the
+ * recording of which entry is oldest.
+ *
+ * This relies on IB only calling one cq comp_handler for each cq so that
+ * there will only be one caller of rds_recv_incoming() per RDS connection.
+ */
+static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
+{
+       struct rds_connection *conn = context;
+       struct rds_ib_connection *ic = conn->c_transport_data;
+
+       rdsdebug("conn %p cq %p\n", conn, cq);
+
+       rds_ib_stats_inc(s_ib_evt_handler_call);
+
+       tasklet_schedule(&ic->i_recv_tasklet);
+}
+
+static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
+                   struct ib_wc *wcs,
+                   struct rds_ib_ack_state *ack_state)
+{
+       int nr;
+       int i;
+       struct ib_wc *wc;
+
+       while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) {
+               for (i = 0; i < nr; i++) {
+                       wc = wcs + i;
+                       rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+                                (unsigned long long)wc->wr_id, wc->status,
+                                wc->byte_len, be32_to_cpu(wc->ex.imm_data));
+
+                       if (wc->wr_id & RDS_IB_SEND_OP)
+                               rds_ib_send_cqe_handler(ic, wc);
+                       else
+                               rds_ib_recv_cqe_handler(ic, wc, ack_state);
+               }
+       }
+}
+
+static void rds_ib_tasklet_fn_send(unsigned long data)
+{
+       struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+       struct rds_connection *conn = ic->conn;
+       struct rds_ib_ack_state state;
+
+       rds_ib_stats_inc(s_ib_tasklet_call);
+
+       memset(&state, 0, sizeof(state));
+       poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+       ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
+       poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+
+       if (rds_conn_up(conn) &&
+           (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+           test_bit(0, &conn->c_map_queued)))
+               rds_send_xmit(ic->conn);
+}
+
+static void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+       struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+       struct rds_connection *conn = ic->conn;
+       struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
+       struct rds_ib_ack_state state;
+
+       if (!rds_ibdev)
+               rds_conn_drop(conn);
+
+       rds_ib_stats_inc(s_ib_tasklet_call);
+
+       memset(&state, 0, sizeof(state));
+       poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+       ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
+       poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+
+       if (state.ack_next_valid)
+               rds_ib_set_ack(ic, state.ack_next, state.ack_required);
+       if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
+               rds_send_drop_acked(conn, state.ack_recv, NULL);
+               ic->i_ack_recv = state.ack_recv;
+       }
+
+       if (rds_conn_up(conn))
+               rds_ib_attempt_ack(ic);
 }
 
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
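
The unified poll loop above tells send completions from receive completions by
a flag bit carried in the 64-bit wr_id cookie that the HCA echoes back in each
struct ib_wc. A hedged sketch of that convention, assuming RDS_IB_SEND_OP
(defined in ib.h, outside this diff) is a single reserved high bit and "index"
is the ring slot of the posted work request:

	/* illustrative only: tag a send WR so that poll_cq() routes its
	 * completion to rds_ib_send_cqe_handler() */
	send_wr->wr_id = index | RDS_IB_SEND_OP;

	/* receive WRs leave the bit clear and fall through to
	 * rds_ib_recv_cqe_handler() */
	recv_wr->wr_id = index;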
@@ -252,7 +312,7 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
        struct rds_ib_connection *ic = conn->c_transport_data;
 
        rdsdebug("conn %p ic %p event %u (%s)\n", conn, ic, event->event,
-                rds_ib_event_str(event->event));
+                ib_event_msg(event->event));
 
        switch (event->event) {
        case IB_EVENT_COMM_EST:
@@ -261,13 +321,25 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
        default:
                rdsdebug("Fatal QP Event %u (%s) "
                        "- connection %pI4->%pI4, reconnecting\n",
-                       event->event, rds_ib_event_str(event->event),
+                       event->event, ib_event_msg(event->event),
                        &conn->c_laddr, &conn->c_faddr);
                rds_conn_drop(conn);
                break;
        }
 }
 
+static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
+{
+       struct rds_connection *conn = context;
+       struct rds_ib_connection *ic = conn->c_transport_data;
+
+       rdsdebug("conn %p cq %p\n", conn, cq);
+
+       rds_ib_stats_inc(s_ib_evt_handler_call);
+
+       tasklet_schedule(&ic->i_send_tasklet);
+}
+
 /*
  * This needs to be very careful to not leave IS_ERR pointers around for
  * cleanup to trip over.
@@ -277,6 +349,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct ib_device *dev = ic->i_cm_id->device;
        struct ib_qp_init_attr attr;
+       struct ib_cq_init_attr cq_attr = {};
        struct rds_ib_device *rds_ibdev;
        int ret;
 
@@ -298,11 +371,12 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 
        /* Protection domain and memory range */
        ic->i_pd = rds_ibdev->pd;
-       ic->i_mr = rds_ibdev->mr;
 
-       ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
+       cq_attr.cqe = ic->i_send_ring.w_nr + 1;
+
+       ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
                                     rds_ib_cq_event_handler, conn,
-                                    ic->i_send_ring.w_nr + 1, 0);
+                                    &cq_attr);
        if (IS_ERR(ic->i_send_cq)) {
                ret = PTR_ERR(ic->i_send_cq);
                ic->i_send_cq = NULL;
@@ -310,9 +384,10 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
                goto out;
        }
 
-       ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
+       cq_attr.cqe = ic->i_recv_ring.w_nr;
+       ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
                                     rds_ib_cq_event_handler, conn,
-                                    ic->i_recv_ring.w_nr, 0);
+                                    &cq_attr);
        if (IS_ERR(ic->i_recv_cq)) {
                ret = PTR_ERR(ic->i_recv_cq);
                ic->i_recv_cq = NULL;
@@ -402,7 +477,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 
        rds_ib_recv_init_ack(ic);
 
-       rdsdebug("conn %p pd %p mr %p cq %p %p\n", conn, ic->i_pd, ic->i_mr,
+       rdsdebug("conn %p pd %p cq %p %p\n", conn, ic->i_pd,
                 ic->i_send_cq, ic->i_recv_cq);
 
 out:
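
For context, the ib_create_cq() calls above use the newer verbs convention in
which CQ sizing travels in struct ib_cq_init_attr instead of bare integer
arguments. A minimal sketch of that calling shape; the handler names and CQE
count are illustrative, and the error handling mirrors the "don't leave IS_ERR
pointers around" rule noted earlier in this file:

	struct ib_cq_init_attr cq_attr = {
		.cqe = 128,		/* completion queue depth */
		.comp_vector = 0,	/* completion interrupt vector */
	};
	struct ib_cq *cq;

	cq = ib_create_cq(dev, my_comp_handler, my_event_handler, my_ctx,
			  &cq_attr);
	if (IS_ERR(cq)) {
		ret = PTR_ERR(cq);
		cq = NULL;	/* keep cleanup from tripping over IS_ERR */
	}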
@@ -475,8 +550,9 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
                 (unsigned long long)be64_to_cpu(lguid),
                 (unsigned long long)be64_to_cpu(fguid));
 
-       conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_ib_transport,
-                              GFP_KERNEL);
+       /* RDS/IB is not currently netns aware, thus init_net */
+       conn = rds_conn_create(&init_net, dp->dp_daddr, dp->dp_saddr,
+                              &rds_ib_transport, GFP_KERNEL);
        if (IS_ERR(conn)) {
                rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn));
                conn = NULL;
@@ -592,7 +668,7 @@ int rds_ib_conn_connect(struct rds_connection *conn)
 
        /* XXX I wonder what effect the port space has */
        /* delegate cm event handler to rdma_transport */
-       ic->i_cm_id = rdma_create_id(rds_rdma_cm_event_handler, conn,
+       ic->i_cm_id = rdma_create_id(&init_net, rds_rdma_cm_event_handler, conn,
                                     RDMA_PS_TCP, IB_QPT_RC);
        if (IS_ERR(ic->i_cm_id)) {
                ret = PTR_ERR(ic->i_cm_id);
@@ -664,8 +740,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                wait_event(rds_ib_ring_empty_wait,
                           rds_ib_ring_empty(&ic->i_recv_ring) &&
                           (atomic_read(&ic->i_signaled_sends) == 0));
+               tasklet_kill(&ic->i_send_tasklet);
                tasklet_kill(&ic->i_recv_tasklet);
 
+               /* first destroy the ib state that generates callbacks */
+               if (ic->i_cm_id->qp)
+                       rdma_destroy_qp(ic->i_cm_id);
+               if (ic->i_send_cq)
+                       ib_destroy_cq(ic->i_send_cq);
+               if (ic->i_recv_cq)
+                       ib_destroy_cq(ic->i_recv_cq);
+
+               /* then free the resources that ib callbacks use */
                if (ic->i_send_hdrs)
                        ib_dma_free_coherent(dev,
                                           ic->i_send_ring.w_nr *
@@ -689,12 +775,6 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                if (ic->i_recvs)
                        rds_ib_recv_clear_ring(ic);
 
-               if (ic->i_cm_id->qp)
-                       rdma_destroy_qp(ic->i_cm_id);
-               if (ic->i_send_cq)
-                       ib_destroy_cq(ic->i_send_cq);
-               if (ic->i_recv_cq)
-                       ib_destroy_cq(ic->i_recv_cq);
                rdma_destroy_id(ic->i_cm_id);
 
                /*
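
The reordering above follows the usual teardown rule when completions are
asynchronous: destroy the objects that can still fire callbacks (the QP, then
the CQs) before freeing the DMA-coherent memory those callbacks dereference.
A condensed sketch of the hazard the old order permitted (illustrative, not
code from this file):

	/* old order: the recv CQ is still live while its ring memory is
	 * freed, so a late completion can touch freed memory */
	ib_dma_free_coherent(dev, size, ic->i_recv_hdrs, ic->i_recv_hdrs_dma);
	ib_destroy_cq(ic->i_recv_cq);	/* too late */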
@@ -705,7 +785,6 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 
                ic->i_cm_id = NULL;
                ic->i_pd = NULL;
-               ic->i_mr = NULL;
                ic->i_send_cq = NULL;
                ic->i_recv_cq = NULL;
                ic->i_send_hdrs = NULL;
@@ -768,8 +847,10 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
        }
 
        INIT_LIST_HEAD(&ic->ib_node);
-       tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
-                    (unsigned long) ic);
+       tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
+                    (unsigned long)ic);
+       tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
+                    (unsigned long)ic);
        mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
        spin_lock_init(&ic->i_ack_lock);