These changes are the raw update to linux-4.4.6-rt14 kernel sources.

diff --git a/kernel/net/sunrpc/xprtrdma/verbs.c b/kernel/net/sunrpc/xprtrdma/verbs.c
index 4870d27..eadd165 100644
--- a/kernel/net/sunrpc/xprtrdma/verbs.c
+++ b/kernel/net/sunrpc/xprtrdma/verbs.c
@@ -52,6 +52,7 @@
 #include <linux/prefetch.h>
 #include <linux/sunrpc/addr.h>
 #include <asm/bitops.h>
+#include <linux/module.h> /* try_module_get()/module_put() */
 
 #include "xprt_rdma.h"
 
  * internal functions
  */
 
-/*
- * handle replies in tasklet context, using a single, global list
- * rdma tasklet function -- just turn around and call the func
- * for all replies on the list
- */
-
-static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
-static LIST_HEAD(rpcrdma_tasklets_g);
+static struct workqueue_struct *rpcrdma_receive_wq;
 
-static void
-rpcrdma_run_tasklet(unsigned long data)
+int
+rpcrdma_alloc_wq(void)
 {
-       struct rpcrdma_rep *rep;
-       void (*func)(struct rpcrdma_rep *);
-       unsigned long flags;
+       struct workqueue_struct *recv_wq;
 
-       data = data;
-       spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-       while (!list_empty(&rpcrdma_tasklets_g)) {
-               rep = list_entry(rpcrdma_tasklets_g.next,
-                                struct rpcrdma_rep, rr_list);
-               list_del(&rep->rr_list);
-               func = rep->rr_func;
-               rep->rr_func = NULL;
-               spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
-
-               if (func)
-                       func(rep);
-               else
-                       rpcrdma_recv_buffer_put(rep);
-
-               spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-       }
-       spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
-}
+       recv_wq = alloc_workqueue("xprtrdma_receive",
+                                 WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+                                 0);
+       if (!recv_wq)
+               return -ENOMEM;
 
-static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
-
-static const char * const async_event[] = {
-       "CQ error",
-       "QP fatal error",
-       "QP request error",
-       "QP access error",
-       "communication established",
-       "send queue drained",
-       "path migration successful",
-       "path mig error",
-       "device fatal error",
-       "port active",
-       "port error",
-       "LID change",
-       "P_key change",
-       "SM change",
-       "SRQ error",
-       "SRQ limit reached",
-       "last WQE reached",
-       "client reregister",
-       "GID change",
-};
-
-#define ASYNC_MSG(status)                                      \
-       ((status) < ARRAY_SIZE(async_event) ?                   \
-               async_event[(status)] : "unknown async error")
+       rpcrdma_receive_wq = recv_wq;
+       return 0;
+}
 
-static void
-rpcrdma_schedule_tasklet(struct list_head *sched_list)
+void
+rpcrdma_destroy_wq(void)
 {
-       unsigned long flags;
+       struct workqueue_struct *wq;
 
-       spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-       list_splice_tail(sched_list, &rpcrdma_tasklets_g);
-       spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
-       tasklet_schedule(&rpcrdma_tasklet_g);
+       if (rpcrdma_receive_wq) {
+               wq = rpcrdma_receive_wq;
+               rpcrdma_receive_wq = NULL;
+               destroy_workqueue(wq);
+       }
 }
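
The hunk above replaces the old global reply tasklet with a dedicated
workqueue. WQ_MEM_RECLAIM gives the queue a rescuer thread so reply
processing can make forward progress under memory pressure (the
transport sits in the NFS writeback path), WQ_UNBOUND lets work items
run on any CPU, and WQ_HIGHPRI serves them from the high-priority
worker pool. Below is a minimal standalone sketch of the same
create/queue/destroy lifecycle; every demo_* identifier is hypothetical
and only illustrates the pattern, not xprtrdma's API.

/* Hypothetical standalone module showing the workqueue lifecycle. */
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/workqueue.h>

static struct workqueue_struct *demo_wq;

struct demo_item {
	struct work_struct work;
	int payload;
};

static void demo_work_fn(struct work_struct *work)
{
	struct demo_item *item = container_of(work, struct demo_item, work);

	pr_info("demo: processed payload %d\n", item->payload);
	kfree(item);
}

static int __init demo_init(void)
{
	struct demo_item *item;

	demo_wq = alloc_workqueue("demo_wq",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI, 0);
	if (!demo_wq)
		return -ENOMEM;

	item = kzalloc(sizeof(*item), GFP_KERNEL);
	if (!item) {
		destroy_workqueue(demo_wq);
		return -ENOMEM;
	}
	item->payload = 42;
	INIT_WORK(&item->work, demo_work_fn);
	queue_work(demo_wq, &item->work);	/* runs demo_work_fn() later */
	return 0;
}

static void __exit demo_exit(void)
{
	destroy_workqueue(demo_wq);	/* drains any pending work first */
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");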
 
 static void
@@ -148,7 +103,7 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
        struct rpcrdma_ep *ep = context;
 
        pr_err("RPC:       %s: %s on device %s ep %p\n",
-              __func__, ASYNC_MSG(event->event),
+              __func__, ib_event_msg(event->event),
                event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
@@ -163,7 +118,7 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
        struct rpcrdma_ep *ep = context;
 
        pr_err("RPC:       %s: %s on device %s ep %p\n",
-              __func__, ASYNC_MSG(event->event),
+              __func__, ib_event_msg(event->event),
                event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
@@ -172,35 +127,6 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
        }
 }
 
-static const char * const wc_status[] = {
-       "success",
-       "local length error",
-       "local QP operation error",
-       "local EE context operation error",
-       "local protection error",
-       "WR flushed",
-       "memory management operation error",
-       "bad response error",
-       "local access error",
-       "remote invalid request error",
-       "remote access error",
-       "remote operation error",
-       "transport retry counter exceeded",
-       "RNR retry counter exceeded",
-       "local RDD violation error",
-       "remove invalid RD request",
-       "operation aborted",
-       "invalid EE context number",
-       "invalid EE context state",
-       "fatal error",
-       "response timeout error",
-       "general error",
-};
-
-#define COMPLETION_MSG(status)                                 \
-       ((status) < ARRAY_SIZE(wc_status) ?                     \
-               wc_status[(status)] : "unexpected completion error")
-
 static void
 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 {
@@ -209,7 +135,7 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
                if (wc->status != IB_WC_SUCCESS &&
                    wc->status != IB_WC_WR_FLUSH_ERR)
                        pr_err("RPC:       %s: SEND: %s\n",
-                              __func__, COMPLETION_MSG(wc->status));
+                              __func__, ib_wc_status_msg(wc->status));
        } else {
                struct rpcrdma_mw *r;
 
@@ -218,63 +144,54 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
        }
 }
 
-static int
-rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The common case is a single send completion is waiting. By
+ * passing two WC entries to ib_poll_cq, a return code of 1
+ * means there is exactly one WC waiting and no more. We don't
+ * have to invoke ib_poll_cq again to know that the CQ has been
+ * properly drained.
+ */
+static void
+rpcrdma_sendcq_poll(struct ib_cq *cq)
 {
-       struct ib_wc *wcs;
-       int budget, count, rc;
+       struct ib_wc *pos, wcs[2];
+       int count, rc;
 
-       budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
        do {
-               wcs = ep->rep_send_wcs;
+               pos = wcs;
 
-               rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
-               if (rc <= 0)
-                       return rc;
+               rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+               if (rc < 0)
+                       break;
 
                count = rc;
                while (count-- > 0)
-                       rpcrdma_sendcq_process_wc(wcs++);
-       } while (rc == RPCRDMA_POLLSIZE && --budget);
-       return 0;
+                       rpcrdma_sendcq_process_wc(pos++);
+       } while (rc == ARRAY_SIZE(wcs));
+       return;
 }
 
-/*
- * Handle send, fast_reg_mr, and local_inv completions.
- *
- * Send events are typically suppressed and thus do not result
- * in an upcall. Occasionally one is signaled, however. This
- * prevents the provider's completion queue from wrapping and
- * losing a completion.
+/* Handle provider send completion upcalls.
  */
 static void
 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
 {
-       struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-       int rc;
-
-       rc = rpcrdma_sendcq_poll(cq, ep);
-       if (rc) {
-               dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
+       do {
+               rpcrdma_sendcq_poll(cq);
+       } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+                                 IB_CQ_REPORT_MISSED_EVENTS) > 0);
+}
 
-       rc = ib_req_notify_cq(cq,
-                       IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
-       if (rc == 0)
-               return;
-       if (rc < 0) {
-               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
+static void
+rpcrdma_receive_worker(struct work_struct *work)
+{
+       struct rpcrdma_rep *rep =
+                       container_of(work, struct rpcrdma_rep, rr_work);
 
-       rpcrdma_sendcq_poll(cq, ep);
+       rpcrdma_reply_handler(rep);
 }
 
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
 {
        struct rpcrdma_rep *rep =
                        (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -291,126 +208,70 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
                __func__, rep, wc->byte_len);
 
        rep->rr_len = wc->byte_len;
-       ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
+       ib_dma_sync_single_for_cpu(rep->rr_device,
                                   rdmab_addr(rep->rr_rdmabuf),
                                   rep->rr_len, DMA_FROM_DEVICE);
        prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 
 out_schedule:
-       list_add_tail(&rep->rr_list, sched_list);
+       queue_work(rpcrdma_receive_wq, &rep->rr_work);
        return;
+
 out_fail:
        if (wc->status != IB_WC_WR_FLUSH_ERR)
                pr_err("RPC:       %s: rep %p: %s\n",
-                      __func__, rep, COMPLETION_MSG(wc->status));
-       rep->rr_len = ~0U;
+                      __func__, rep, ib_wc_status_msg(wc->status));
+       rep->rr_len = RPCRDMA_BAD_LEN;
        goto out_schedule;
 }
 
-static int
-rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The wc array is on stack: automatic memory is always CPU-local.
+ *
+ * struct ib_wc is 64 bytes, making the poll array potentially
+ * large. But this is at the bottom of the call chain. Further
+ * substantial work is done in another thread.
+ */
+static void
+rpcrdma_recvcq_poll(struct ib_cq *cq)
 {
-       struct list_head sched_list;
-       struct ib_wc *wcs;
-       int budget, count, rc;
+       struct ib_wc *pos, wcs[4];
+       int count, rc;
 
-       INIT_LIST_HEAD(&sched_list);
-       budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
        do {
-               wcs = ep->rep_recv_wcs;
+               pos = wcs;
 
-               rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
-               if (rc <= 0)
-                       goto out_schedule;
+               rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+               if (rc < 0)
+                       break;
 
                count = rc;
                while (count-- > 0)
-                       rpcrdma_recvcq_process_wc(wcs++, &sched_list);
-       } while (rc == RPCRDMA_POLLSIZE && --budget);
-       rc = 0;
-
-out_schedule:
-       rpcrdma_schedule_tasklet(&sched_list);
-       return rc;
+                       rpcrdma_recvcq_process_wc(pos++);
+       } while (rc == ARRAY_SIZE(wcs));
 }
 
-/*
- * Handle receive completions.
- *
- * It is reentrant but processes single events in order to maintain
- * ordering of receives to keep server credits.
- *
- * It is the responsibility of the scheduled tasklet to return
- * recv buffers to the pool. NOTE: this affects synchronization of
- * connection shutdown. That is, the structures required for
- * the completion of the reply handler must remain intact until
- * all memory has been reclaimed.
+/* Handle provider receive completion upcalls.
  */
 static void
 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
 {
-       struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-       int rc;
-
-       rc = rpcrdma_recvcq_poll(cq, ep);
-       if (rc) {
-               dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
-
-       rc = ib_req_notify_cq(cq,
-                       IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
-       if (rc == 0)
-               return;
-       if (rc < 0) {
-               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
-
-       rpcrdma_recvcq_poll(cq, ep);
+       do {
+               rpcrdma_recvcq_poll(cq);
+       } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+                                 IB_CQ_REPORT_MISSED_EVENTS) > 0);
 }
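
The send and receive upcalls above now poll until a partial batch comes
back, then re-arm the CQ. A positive return from ib_req_notify_cq()
with IB_CQ_REPORT_MISSED_EVENTS set means completions arrived between
the final poll and the re-arm, so the loop polls again rather than
waiting for an interrupt that may never be raised. A minimal sketch of
that drain-and-rearm pattern follows; demo_process_wc() and
demo_drain_cq() are hypothetical names, assuming kernel RDMA-consumer
context.

/* Hypothetical sketch of the poll-until-short-batch, then re-arm loop. */
#include <linux/kernel.h>
#include <rdma/ib_verbs.h>

static void demo_process_wc(struct ib_wc *wc)
{
	/* per-completion handling goes here */
}

static void demo_drain_cq(struct ib_cq *cq)
{
	struct ib_wc wcs[4];
	int i, rc;

	do {
		/* Poll in small batches; a short batch means the CQ was
		 * emptied by this pass.
		 */
		do {
			rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), wcs);
			if (rc < 0)
				return;
			for (i = 0; i < rc; i++)
				demo_process_wc(&wcs[i]);
		} while (rc == ARRAY_SIZE(wcs));

		/* Re-arm. A positive return with IB_CQ_REPORT_MISSED_EVENTS
		 * means more completions slipped in after the last poll.
		 */
	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
}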
 
 static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
        struct ib_wc wc;
-       LIST_HEAD(sched_list);
 
        while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-               rpcrdma_recvcq_process_wc(&wc, &sched_list);
-       if (!list_empty(&sched_list))
-               rpcrdma_schedule_tasklet(&sched_list);
+               rpcrdma_recvcq_process_wc(&wc);
        while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
                rpcrdma_sendcq_process_wc(&wc);
 }
 
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-static const char * const conn[] = {
-       "address resolved",
-       "address error",
-       "route resolved",
-       "route error",
-       "connect request",
-       "connect response",
-       "connect error",
-       "unreachable",
-       "rejected",
-       "established",
-       "disconnected",
-       "device removal",
-       "multicast join",
-       "multicast error",
-       "address change",
-       "timewait exit",
-};
-
-#define CONNECTION_MSG(status)                                         \
-       ((status) < ARRAY_SIZE(conn) ?                                  \
-               conn[(status)] : "unrecognized connection error")
-#endif
-
 static int
 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 {
@@ -476,7 +337,7 @@ connected:
        default:
                dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
                        __func__, sap, rpc_get_port(sap), ep,
-                       CONNECTION_MSG(event->event));
+                       rdma_event_msg(event->event));
                break;
        }
 
@@ -487,7 +348,7 @@ connected:
 
                pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
                        sap, rpc_get_port(sap),
-                       ia->ri_id->device->name,
+                       ia->ri_device->name,
                        ia->ri_ops->ro_displayname,
                        xprt->rx_buf.rb_max_requests,
                        ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
@@ -500,6 +361,14 @@ connected:
        return 0;
 }
 
+static void rpcrdma_destroy_id(struct rdma_cm_id *id)
+{
+       if (id) {
+               module_put(id->device->owner);
+               rdma_destroy_id(id);
+       }
+}
+
 static struct rdma_cm_id *
 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
                        struct rpcrdma_ia *ia, struct sockaddr *addr)
@@ -509,7 +378,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 
        init_completion(&ia->ri_done);
 
-       id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
+       id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
+                           IB_QPT_RC);
        if (IS_ERR(id)) {
                rc = PTR_ERR(id);
                dprintk("RPC:       %s: rdma_create_id() failed %i\n",
@@ -526,6 +396,17 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
        }
        wait_for_completion_interruptible_timeout(&ia->ri_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
+
+       /* FIXME:
+        * Until xprtrdma supports DEVICE_REMOVAL, the provider must
+        * be pinned while there are active NFS/RDMA mounts to prevent
+        * hangs and crashes at umount time.
+        */
+       if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
+               dprintk("RPC:       %s: Failed to get device module\n",
+                       __func__);
+               ia->ri_async_rc = -ENODEV;
+       }
        rc = ia->ri_async_rc;
        if (rc)
                goto out;
@@ -535,16 +416,17 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
        if (rc) {
                dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
                        __func__, rc);
-               goto out;
+               goto put;
        }
        wait_for_completion_interruptible_timeout(&ia->ri_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
        rc = ia->ri_async_rc;
        if (rc)
-               goto out;
+               goto put;
 
        return id;
-
+put:
+       module_put(id->device->owner);
 out:
        rdma_destroy_id(id);
        return ERR_PTR(rc);
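
The FIXME above explains the new module pinning: until xprtrdma can
cope with DEVICE_REMOVAL events, unloading the provider driver under an
active NFS/RDMA mount could hang or crash at umount. The
try_module_get() taken after address resolution is paired with the
module_put() in rpcrdma_destroy_id() and in the error paths here. A
small sketch of that pairing, assuming kernel context; the demo_
helpers are hypothetical.

/* Hypothetical sketch: pin the provider module for an object's lifetime. */
#include <linux/module.h>
#include <rdma/rdma_cm.h>

static int demo_pin_provider(struct rdma_cm_id *id)
{
	/* Fails if the provider module is already being unloaded. */
	if (!try_module_get(id->device->owner))
		return -ENODEV;
	return 0;
}

static void demo_unpin_provider(struct rdma_cm_id *id)
{
	/* Every successful pin must be matched by exactly one put. */
	module_put(id->device->owner);
}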
@@ -579,17 +461,20 @@ rpcrdma_clean_cq(struct ib_cq *cq)
 int
 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 {
-       int rc, mem_priv;
        struct rpcrdma_ia *ia = &xprt->rx_ia;
        struct ib_device_attr *devattr = &ia->ri_devattr;
+       int rc;
+
+       ia->ri_dma_mr = NULL;
 
        ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
        if (IS_ERR(ia->ri_id)) {
                rc = PTR_ERR(ia->ri_id);
                goto out1;
        }
+       ia->ri_device = ia->ri_id->device;
 
-       ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
+       ia->ri_pd = ib_alloc_pd(ia->ri_device);
        if (IS_ERR(ia->ri_pd)) {
                rc = PTR_ERR(ia->ri_pd);
                dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
@@ -597,69 +482,39 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
                goto out2;
        }
 
-       rc = ib_query_device(ia->ri_id->device, devattr);
+       rc = ib_query_device(ia->ri_device, devattr);
        if (rc) {
                dprintk("RPC:       %s: ib_query_device failed %d\n",
                        __func__, rc);
                goto out3;
        }
 
-       if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
-               ia->ri_have_dma_lkey = 1;
-               ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
-       }
-
        if (memreg == RPCRDMA_FRMR) {
-               /* Requires both frmr reg and local dma lkey */
-               if (((devattr->device_cap_flags &
-                    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
-                   (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
-                     (devattr->max_fast_reg_page_list_len == 0)) {
+               if (!(devattr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) ||
+                   (devattr->max_fast_reg_page_list_len == 0)) {
                        dprintk("RPC:       %s: FRMR registration "
                                "not supported by HCA\n", __func__);
                        memreg = RPCRDMA_MTHCAFMR;
                }
        }
        if (memreg == RPCRDMA_MTHCAFMR) {
-               if (!ia->ri_id->device->alloc_fmr) {
+               if (!ia->ri_device->alloc_fmr) {
                        dprintk("RPC:       %s: MTHCAFMR registration "
                                "not supported by HCA\n", __func__);
-                       memreg = RPCRDMA_ALLPHYSICAL;
+                       rc = -EINVAL;
+                       goto out3;
                }
        }
 
-       /*
-        * Optionally obtain an underlying physical identity mapping in
-        * order to do a memory window-based bind. This base registration
-        * is protected from remote access - that is enabled only by binding
-        * for the specific bytes targeted during each RPC operation, and
-        * revoked after the corresponding completion similar to a storage
-        * adapter.
-        */
        switch (memreg) {
        case RPCRDMA_FRMR:
                ia->ri_ops = &rpcrdma_frwr_memreg_ops;
                break;
        case RPCRDMA_ALLPHYSICAL:
                ia->ri_ops = &rpcrdma_physical_memreg_ops;
-               mem_priv = IB_ACCESS_LOCAL_WRITE |
-                               IB_ACCESS_REMOTE_WRITE |
-                               IB_ACCESS_REMOTE_READ;
-               goto register_setup;
+               break;
        case RPCRDMA_MTHCAFMR:
                ia->ri_ops = &rpcrdma_fmr_memreg_ops;
-               if (ia->ri_have_dma_lkey)
-                       break;
-               mem_priv = IB_ACCESS_LOCAL_WRITE;
-       register_setup:
-               ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
-               if (IS_ERR(ia->ri_bind_mem)) {
-                       printk(KERN_ALERT "%s: ib_get_dma_mr for "
-                               "phys register failed with %lX\n",
-                               __func__, PTR_ERR(ia->ri_bind_mem));
-                       rc = -ENOMEM;
-                       goto out3;
-               }
                break;
        default:
                printk(KERN_ERR "RPC: Unsupported memory "
@@ -670,9 +525,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
        dprintk("RPC:       %s: memory registration strategy is '%s'\n",
                __func__, ia->ri_ops->ro_displayname);
 
-       /* Else will do memory reg/dereg for each chunk */
-       ia->ri_memreg_strategy = memreg;
-
        rwlock_init(&ia->ri_qplock);
        return 0;
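
Memory registration setup is much simpler here: the DMA MR and local
DMA lkey handling move out of rpcrdma_ia_open(), FRWR only requires the
MEM_MGT_EXTENSIONS capability plus a non-zero fast-reg page list limit,
and a device without FMR support now fails the open with -EINVAL
instead of silently falling back to ALLPHYSICAL. A sketch of that
capability-driven selection, assuming kernel context; the demo_ enum
and helper are hypothetical.

/* Hypothetical sketch of choosing a registration mode from device caps. */
#include <rdma/ib_verbs.h>

enum demo_memreg { DEMO_FRWR, DEMO_FMR, DEMO_UNSUPPORTED };

static enum demo_memreg
demo_pick_memreg(const struct ib_device_attr *attr,
		 const struct ib_device *device)
{
	/* Fast registration needs MEM_MGT_EXTENSIONS and a usable
	 * fast-reg page list limit.
	 */
	if ((attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) &&
	    attr->max_fast_reg_page_list_len > 0)
		return DEMO_FRWR;

	/* Otherwise fall back to FMRs, if the provider implements them. */
	if (device->alloc_fmr)
		return DEMO_FMR;

	return DEMO_UNSUPPORTED;
}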
 
@@ -680,7 +532,7 @@ out3:
        ib_dealloc_pd(ia->ri_pd);
        ia->ri_pd = NULL;
 out2:
-       rdma_destroy_id(ia->ri_id);
+       rpcrdma_destroy_id(ia->ri_id);
        ia->ri_id = NULL;
 out1:
        return rc;
@@ -694,25 +546,17 @@ out1:
 void
 rpcrdma_ia_close(struct rpcrdma_ia *ia)
 {
-       int rc;
-
        dprintk("RPC:       %s: entering\n", __func__);
-       if (ia->ri_bind_mem != NULL) {
-               rc = ib_dereg_mr(ia->ri_bind_mem);
-               dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
-                       __func__, rc);
-       }
        if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
                if (ia->ri_id->qp)
                        rdma_destroy_qp(ia->ri_id);
-               rdma_destroy_id(ia->ri_id);
+               rpcrdma_destroy_id(ia->ri_id);
                ia->ri_id = NULL;
        }
-       if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
-               rc = ib_dealloc_pd(ia->ri_pd);
-               dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
-                       __func__, rc);
-       }
+
+       /* If the pd is still busy, xprtrdma missed freeing a resource */
+       if (ia->ri_pd && !IS_ERR(ia->ri_pd))
+               ib_dealloc_pd(ia->ri_pd);
 }
 
 /*
@@ -724,35 +568,44 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 {
        struct ib_device_attr *devattr = &ia->ri_devattr;
        struct ib_cq *sendcq, *recvcq;
+       struct ib_cq_init_attr cq_attr = {};
+       unsigned int max_qp_wr;
        int rc, err;
 
+       if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
+               dprintk("RPC:       %s: insufficient sge's available\n",
+                       __func__);
+               return -ENOMEM;
+       }
+
+       if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
+               dprintk("RPC:       %s: insufficient wqe's available\n",
+                       __func__);
+               return -ENOMEM;
+       }
+       max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
+
        /* check provider's send/recv wr limits */
-       if (cdata->max_requests > devattr->max_qp_wr)
-               cdata->max_requests = devattr->max_qp_wr;
+       if (cdata->max_requests > max_qp_wr)
+               cdata->max_requests = max_qp_wr;
 
        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
        ep->rep_attr.qp_context = ep;
        ep->rep_attr.srq = NULL;
        ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+       ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
        rc = ia->ri_ops->ro_open(ia, ep, cdata);
        if (rc)
                return rc;
        ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
-       ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
+       ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
+       ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
        ep->rep_attr.cap.max_recv_sge = 1;
        ep->rep_attr.cap.max_inline_data = 0;
        ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
        ep->rep_attr.qp_type = IB_QPT_RC;
        ep->rep_attr.port_num = ~0;
 
-       if (cdata->padding) {
-               ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
-                                                     GFP_KERNEL);
-               if (IS_ERR(ep->rep_padbuf))
-                       return PTR_ERR(ep->rep_padbuf);
-       } else
-               ep->rep_padbuf = NULL;
-
        dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
                "iovs: send %d recv %d\n",
                __func__,
@@ -771,9 +624,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        init_waitqueue_head(&ep->rep_connect_wait);
        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
-       sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
-                                 rpcrdma_cq_async_error_upcall, ep,
-                                 ep->rep_attr.cap.max_send_wr + 1, 0);
+       cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
+       sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
+                             rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
        if (IS_ERR(sendcq)) {
                rc = PTR_ERR(sendcq);
                dprintk("RPC:       %s: failed to create send CQ: %i\n",
@@ -788,9 +641,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                goto out2;
        }
 
-       recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
-                                 rpcrdma_cq_async_error_upcall, ep,
-                                 ep->rep_attr.cap.max_recv_wr + 1, 0);
+       cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
+       recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
+                             rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
        if (IS_ERR(recvcq)) {
                rc = PTR_ERR(recvcq);
                dprintk("RPC:       %s: failed to create recv CQ: %i\n",
@@ -835,7 +688,8 @@ out2:
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
                        __func__, err);
 out1:
-       rpcrdma_free_regbuf(ia, ep->rep_padbuf);
+       if (ia->ri_dma_mr)
+               ib_dereg_mr(ia->ri_dma_mr);
        return rc;
 }
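
Both completion queues are now sized through a struct ib_cq_init_attr,
matching the ib_create_cq() signature in this kernel range (an
attribute struct instead of a raw cqe count and comp_vector), and the
CQ context is NULL because the rewritten upcalls no longer need the
rpcrdma_ep. A sketch of creating and arming a CQ with that calling
convention; the demo_ names are hypothetical.

/* Hypothetical sketch of CQ creation with ib_cq_init_attr. */
#include <linux/err.h>
#include <rdma/ib_verbs.h>

static void demo_comp_handler(struct ib_cq *cq, void *cq_context)
{
	/* drain the CQ here (see the polling sketch earlier) */
}

static void demo_event_handler(struct ib_event *event, void *context)
{
	pr_err("demo: async CQ event: %s\n", ib_event_msg(event->event));
}

static struct ib_cq *demo_create_cq(struct ib_device *device, int depth)
{
	struct ib_cq_init_attr cq_attr = {};
	struct ib_cq *cq;

	cq_attr.cqe = depth + 1;	/* room for one extra completion */
	cq = ib_create_cq(device, demo_comp_handler, demo_event_handler,
			  NULL, &cq_attr);
	if (IS_ERR(cq))
		return cq;

	/* arm the CQ so the first completion raises an upcall */
	if (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP)) {
		ib_destroy_cq(cq);
		return ERR_PTR(-EIO);
	}
	return cq;
}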
 
@@ -856,25 +710,32 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 
        cancel_delayed_work_sync(&ep->rep_connect_worker);
 
-       if (ia->ri_id->qp) {
+       if (ia->ri_id->qp)
                rpcrdma_ep_disconnect(ep, ia);
+
+       rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+       rpcrdma_clean_cq(ep->rep_attr.send_cq);
+
+       if (ia->ri_id->qp) {
                rdma_destroy_qp(ia->ri_id);
                ia->ri_id->qp = NULL;
        }
 
-       rpcrdma_free_regbuf(ia, ep->rep_padbuf);
-
-       rpcrdma_clean_cq(ep->rep_attr.recv_cq);
        rc = ib_destroy_cq(ep->rep_attr.recv_cq);
        if (rc)
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
                        __func__, rc);
 
-       rpcrdma_clean_cq(ep->rep_attr.send_cq);
        rc = ib_destroy_cq(ep->rep_attr.send_cq);
        if (rc)
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
                        __func__, rc);
+
+       if (ia->ri_dma_mr) {
+               rc = ib_dereg_mr(ia->ri_dma_mr);
+               dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
+                       __func__, rc);
+       }
 }
 
 /*
@@ -896,8 +757,6 @@ retry:
                rpcrdma_flush_cqs(ep);
 
                xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
-               ia->ri_ops->ro_reset(xprt);
-
                id = rpcrdma_create_id(xprt, ia,
                                (struct sockaddr *)&xprt->rx_data.addr);
                if (IS_ERR(id)) {
@@ -911,10 +770,10 @@ retry:
                 * More stuff I haven't thought of!
                 * Rrrgh!
                 */
-               if (ia->ri_id->device != id->device) {
+               if (ia->ri_device != id->device) {
                        printk("RPC:       %s: can't reconnect on "
                                "different device!\n", __func__);
-                       rdma_destroy_id(id);
+                       rpcrdma_destroy_id(id);
                        rc = -ENETUNREACH;
                        goto out;
                }
@@ -923,7 +782,7 @@ retry:
                if (rc) {
                        dprintk("RPC:       %s: rdma_create_qp failed %i\n",
                                __func__, rc);
-                       rdma_destroy_id(id);
+                       rpcrdma_destroy_id(id);
                        rc = -ENETUNREACH;
                        goto out;
                }
@@ -934,7 +793,7 @@ retry:
                write_unlock(&ia->ri_qplock);
 
                rdma_destroy_qp(old);
-               rdma_destroy_id(old);
+               rpcrdma_destroy_id(old);
        } else {
                dprintk("RPC:       %s: connecting...\n", __func__);
                rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
@@ -983,7 +842,21 @@ retry:
                }
                rc = ep->rep_connected;
        } else {
+               struct rpcrdma_xprt *r_xprt;
+               unsigned int extras;
+
                dprintk("RPC:       %s: connected\n", __func__);
+
+               r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+               extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+
+               if (extras) {
+                       rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+                       if (rc)
+                               pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
+                                       __func__, rc);
+                       rc = 0;
+               }
        }
 
 out:
@@ -1020,20 +893,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        }
 }
 
-static struct rpcrdma_req *
+struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
+       struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpcrdma_req *req;
 
        req = kzalloc(sizeof(*req), GFP_KERNEL);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
 
+       INIT_LIST_HEAD(&req->rl_free);
+       spin_lock(&buffer->rb_reqslock);
+       list_add(&req->rl_all, &buffer->rb_allreqs);
+       spin_unlock(&buffer->rb_reqslock);
        req->rl_buffer = &r_xprt->rx_buf;
        return req;
 }
 
-static struct rpcrdma_rep *
+struct rpcrdma_rep *
 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -1053,7 +931,9 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
                goto out_free;
        }
 
-       rep->rr_buffer = &r_xprt->rx_buf;
+       rep->rr_device = ia->ri_device;
+       rep->rr_rxprt = r_xprt;
+       INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
        return rep;
 
 out_free:
@@ -1067,44 +947,21 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-       struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-       char *p;
-       size_t len;
        int i, rc;
 
-       buf->rb_max_requests = cdata->max_requests;
+       buf->rb_max_requests = r_xprt->rx_data.max_requests;
+       buf->rb_bc_srv_max_requests = 0;
        spin_lock_init(&buf->rb_lock);
 
-       /* Need to allocate:
-        *   1.  arrays for send and recv pointers
-        *   2.  arrays of struct rpcrdma_req to fill in pointers
-        *   3.  array of struct rpcrdma_rep for replies
-        * Send/recv buffers in req/rep need to be registered
-        */
-       len = buf->rb_max_requests *
-               (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
-
-       p = kzalloc(len, GFP_KERNEL);
-       if (p == NULL) {
-               dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
-                       __func__, len);
-               rc = -ENOMEM;
-               goto out;
-       }
-       buf->rb_pool = p;       /* for freeing it later */
-
-       buf->rb_send_bufs = (struct rpcrdma_req **) p;
-       p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
-       buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
-       p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
-
        rc = ia->ri_ops->ro_init(r_xprt);
        if (rc)
                goto out;
 
+       INIT_LIST_HEAD(&buf->rb_send_bufs);
+       INIT_LIST_HEAD(&buf->rb_allreqs);
+       spin_lock_init(&buf->rb_reqslock);
        for (i = 0; i < buf->rb_max_requests; i++) {
                struct rpcrdma_req *req;
-               struct rpcrdma_rep *rep;
 
                req = rpcrdma_create_req(r_xprt);
                if (IS_ERR(req)) {
@@ -1113,7 +970,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
                        rc = PTR_ERR(req);
                        goto out;
                }
-               buf->rb_send_bufs[i] = req;
+               req->rl_backchannel = false;
+               list_add(&req->rl_free, &buf->rb_send_bufs);
+       }
+
+       INIT_LIST_HEAD(&buf->rb_recv_bufs);
+       for (i = 0; i < buf->rb_max_requests + 2; i++) {
+               struct rpcrdma_rep *rep;
 
                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
@@ -1122,7 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
                        rc = PTR_ERR(rep);
                        goto out;
                }
-               buf->rb_recv_bufs[i] = rep;
+               list_add(&rep->rr_list, &buf->rb_recv_bufs);
        }
 
        return 0;
@@ -1131,22 +994,38 @@ out:
        return rc;
 }
 
+static struct rpcrdma_req *
+rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
+{
+       struct rpcrdma_req *req;
+
+       req = list_first_entry(&buf->rb_send_bufs,
+                              struct rpcrdma_req, rl_free);
+       list_del(&req->rl_free);
+       return req;
+}
+
+static struct rpcrdma_rep *
+rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
+{
+       struct rpcrdma_rep *rep;
+
+       rep = list_first_entry(&buf->rb_recv_bufs,
+                              struct rpcrdma_rep, rr_list);
+       list_del(&rep->rr_list);
+       return rep;
+}
+
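
The request and reply pools are no longer fixed arrays indexed by
rb_send_index/rb_recv_index; they are plain list_heads guarded by
rb_lock, and the *_locked helpers above assume the caller holds that
lock and has already checked that the list is non-empty. The irqsave
variants disappear from the get/put paths as well, presumably because
reply processing now runs in process context (the workqueue) rather
than in a tasklet. A generic sketch of the same take/return pattern;
the demo_ types are hypothetical.

/* Hypothetical sketch of a spinlock-guarded free-list object pool. */
#include <linux/list.h>
#include <linux/spinlock.h>

struct demo_obj {
	struct list_head free;	/* linkage while the object is idle */
	/* ... payload ... */
};

struct demo_pool {
	spinlock_t lock;
	struct list_head free_objs;
};

/* Take one object, or NULL if the pool is empty. */
static struct demo_obj *demo_pool_get(struct demo_pool *pool)
{
	struct demo_obj *obj = NULL;

	spin_lock(&pool->lock);
	if (!list_empty(&pool->free_objs)) {
		obj = list_first_entry(&pool->free_objs,
				       struct demo_obj, free);
		list_del(&obj->free);
	}
	spin_unlock(&pool->lock);
	return obj;
}

/* Return an object to the pool. */
static void demo_pool_put(struct demo_pool *pool, struct demo_obj *obj)
{
	spin_lock(&pool->lock);
	list_add_tail(&obj->free, &pool->free_objs);
	spin_unlock(&pool->lock);
}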
 static void
 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
 {
-       if (!rep)
-               return;
-
        rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
        kfree(rep);
 }
 
-static void
+void
 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 {
-       if (!req)
-               return;
-
        rpcrdma_free_regbuf(ia, req->rl_sendbuf);
        rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
        kfree(req);
@@ -1156,220 +1035,88 @@ void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
-       int i;
-
-       /* clean up in reverse order from create
-        *   1.  recv mr memory (mr free, then kfree)
-        *   2.  send mr memory (mr free, then kfree)
-        *   3.  MWs
-        */
-       dprintk("RPC:       %s: entering\n", __func__);
-
-       for (i = 0; i < buf->rb_max_requests; i++) {
-               if (buf->rb_recv_bufs)
-                       rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
-               if (buf->rb_send_bufs)
-                       rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
-       }
-
-       ia->ri_ops->ro_destroy(buf);
 
-       kfree(buf->rb_pool);
-}
+       while (!list_empty(&buf->rb_recv_bufs)) {
+               struct rpcrdma_rep *rep;
 
-/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
- * some req segments uninitialized.
- */
-static void
-rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
-{
-       if (*mw) {
-               list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
-               *mw = NULL;
+               rep = rpcrdma_buffer_get_rep_locked(buf);
+               rpcrdma_destroy_rep(ia, rep);
        }
-}
 
-/* Cycle mw's back in reverse order, and "spin" them.
- * This delays and scrambles reuse as much as possible.
- */
-static void
-rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
-       struct rpcrdma_mr_seg *seg = req->rl_segments;
-       struct rpcrdma_mr_seg *seg1 = seg;
-       int i;
+       spin_lock(&buf->rb_reqslock);
+       while (!list_empty(&buf->rb_allreqs)) {
+               struct rpcrdma_req *req;
 
-       for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
-               rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
-       rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
-}
+               req = list_first_entry(&buf->rb_allreqs,
+                                      struct rpcrdma_req, rl_all);
+               list_del(&req->rl_all);
 
-static void
-rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
-       buf->rb_send_bufs[--buf->rb_send_index] = req;
-       req->rl_niovs = 0;
-       if (req->rl_reply) {
-               buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
-               req->rl_reply->rr_func = NULL;
-               req->rl_reply = NULL;
+               spin_unlock(&buf->rb_reqslock);
+               rpcrdma_destroy_req(ia, req);
+               spin_lock(&buf->rb_reqslock);
        }
-}
-
-/* rpcrdma_unmap_one() was already done during deregistration.
- * Redo only the ib_post_send().
- */
-static void
-rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
-{
-       struct rpcrdma_xprt *r_xprt =
-                               container_of(ia, struct rpcrdma_xprt, rx_ia);
-       struct ib_send_wr invalidate_wr, *bad_wr;
-       int rc;
-
-       dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
-
-       /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
-       r->r.frmr.fr_state = FRMR_IS_INVALID;
-
-       memset(&invalidate_wr, 0, sizeof(invalidate_wr));
-       invalidate_wr.wr_id = (unsigned long)(void *)r;
-       invalidate_wr.opcode = IB_WR_LOCAL_INV;
-       invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
-       DECR_CQCOUNT(&r_xprt->rx_ep);
-
-       dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
-               __func__, r, r->r.frmr.fr_mr->rkey);
+       spin_unlock(&buf->rb_reqslock);
 
-       read_lock(&ia->ri_qplock);
-       rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
-       read_unlock(&ia->ri_qplock);
-       if (rc) {
-               /* Force rpcrdma_buffer_get() to retry */
-               r->r.frmr.fr_state = FRMR_IS_STALE;
-               dprintk("RPC:       %s: ib_post_send failed, %i\n",
-                       __func__, rc);
-       }
+       ia->ri_ops->ro_destroy(buf);
 }
 
-static void
-rpcrdma_retry_flushed_linv(struct list_head *stale,
-                          struct rpcrdma_buffer *buf)
+struct rpcrdma_mw *
+rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
 {
-       struct rpcrdma_ia *ia = rdmab_to_ia(buf);
-       struct list_head *pos;
-       struct rpcrdma_mw *r;
-       unsigned long flags;
-
-       list_for_each(pos, stale) {
-               r = list_entry(pos, struct rpcrdma_mw, mw_list);
-               rpcrdma_retry_local_inv(r, ia);
-       }
-
-       spin_lock_irqsave(&buf->rb_lock, flags);
-       list_splice_tail(stale, &buf->rb_mws);
-       spin_unlock_irqrestore(&buf->rb_lock, flags);
-}
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_mw *mw = NULL;
 
-static struct rpcrdma_req *
-rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
-                        struct list_head *stale)
-{
-       struct rpcrdma_mw *r;
-       int i;
-
-       i = RPCRDMA_MAX_SEGS - 1;
-       while (!list_empty(&buf->rb_mws)) {
-               r = list_entry(buf->rb_mws.next,
-                              struct rpcrdma_mw, mw_list);
-               list_del(&r->mw_list);
-               if (r->r.frmr.fr_state == FRMR_IS_STALE) {
-                       list_add(&r->mw_list, stale);
-                       continue;
-               }
-               req->rl_segments[i].rl_mw = r;
-               if (unlikely(i-- == 0))
-                       return req;     /* Success */
+       spin_lock(&buf->rb_mwlock);
+       if (!list_empty(&buf->rb_mws)) {
+               mw = list_first_entry(&buf->rb_mws,
+                                     struct rpcrdma_mw, mw_list);
+               list_del_init(&mw->mw_list);
        }
+       spin_unlock(&buf->rb_mwlock);
 
-       /* Not enough entries on rb_mws for this req */
-       rpcrdma_buffer_put_sendbuf(req, buf);
-       rpcrdma_buffer_put_mrs(req, buf);
-       return NULL;
+       if (!mw)
+               pr_err("RPC:       %s: no MWs available\n", __func__);
+       return mw;
 }
 
-static struct rpcrdma_req *
-rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+void
+rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
 {
-       struct rpcrdma_mw *r;
-       int i;
-
-       i = RPCRDMA_MAX_SEGS - 1;
-       while (!list_empty(&buf->rb_mws)) {
-               r = list_entry(buf->rb_mws.next,
-                              struct rpcrdma_mw, mw_list);
-               list_del(&r->mw_list);
-               req->rl_segments[i].rl_mw = r;
-               if (unlikely(i-- == 0))
-                       return req;     /* Success */
-       }
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 
-       /* Not enough entries on rb_mws for this req */
-       rpcrdma_buffer_put_sendbuf(req, buf);
-       rpcrdma_buffer_put_mrs(req, buf);
-       return NULL;
+       spin_lock(&buf->rb_mwlock);
+       list_add_tail(&mw->mw_list, &buf->rb_mws);
+       spin_unlock(&buf->rb_mwlock);
 }
 
 /*
  * Get a set of request/reply buffers.
  *
- * Reply buffer (if needed) is attached to send buffer upon return.
- * Rule:
- *    rb_send_index and rb_recv_index MUST always be pointing to the
- *    *next* available buffer (non-NULL). They are incremented after
- *    removing buffers, and decremented *before* returning them.
+ * Reply buffer (if available) is attached to send buffer upon return.
  */
 struct rpcrdma_req *
 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
 {
-       struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
-       struct list_head stale;
        struct rpcrdma_req *req;
-       unsigned long flags;
-
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-       if (buffers->rb_send_index == buffers->rb_max_requests) {
-               spin_unlock_irqrestore(&buffers->rb_lock, flags);
-               dprintk("RPC:       %s: out of request buffers\n", __func__);
-               return ((struct rpcrdma_req *)NULL);
-       }
 
-       req = buffers->rb_send_bufs[buffers->rb_send_index];
-       if (buffers->rb_send_index < buffers->rb_recv_index) {
-               dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
-                       __func__,
-                       buffers->rb_recv_index - buffers->rb_send_index);
-               req->rl_reply = NULL;
-       } else {
-               req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
-               buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
-       }
-       buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+       spin_lock(&buffers->rb_lock);
+       if (list_empty(&buffers->rb_send_bufs))
+               goto out_reqbuf;
+       req = rpcrdma_buffer_get_req_locked(buffers);
+       if (list_empty(&buffers->rb_recv_bufs))
+               goto out_repbuf;
+       req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+       spin_unlock(&buffers->rb_lock);
+       return req;
 
-       INIT_LIST_HEAD(&stale);
-       switch (ia->ri_memreg_strategy) {
-       case RPCRDMA_FRMR:
-               req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
-               break;
-       case RPCRDMA_MTHCAFMR:
-               req = rpcrdma_buffer_get_fmrs(req, buffers);
-               break;
-       default:
-               break;
-       }
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
-       if (!list_empty(&stale))
-               rpcrdma_retry_flushed_linv(&stale, buffers);
+out_reqbuf:
+       spin_unlock(&buffers->rb_lock);
+       pr_warn("RPC:       %s: out of request buffers\n", __func__);
+       return NULL;
+out_repbuf:
+       spin_unlock(&buffers->rb_lock);
+       pr_warn("RPC:       %s: out of reply buffers\n", __func__);
+       req->rl_reply = NULL;
        return req;
 }
 
@@ -1381,39 +1128,31 @@ void
 rpcrdma_buffer_put(struct rpcrdma_req *req)
 {
        struct rpcrdma_buffer *buffers = req->rl_buffer;
-       struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
-       unsigned long flags;
+       struct rpcrdma_rep *rep = req->rl_reply;
 
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-       rpcrdma_buffer_put_sendbuf(req, buffers);
-       switch (ia->ri_memreg_strategy) {
-       case RPCRDMA_FRMR:
-       case RPCRDMA_MTHCAFMR:
-               rpcrdma_buffer_put_mrs(req, buffers);
-               break;
-       default:
-               break;
-       }
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       req->rl_niovs = 0;
+       req->rl_reply = NULL;
+
+       spin_lock(&buffers->rb_lock);
+       list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
+       if (rep)
+               list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+       spin_unlock(&buffers->rb_lock);
 }
 
 /*
  * Recover reply buffers from pool.
- * This happens when recovering from error conditions.
- * Post-increment counter/array index.
+ * This happens when recovering from disconnect.
  */
 void
 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
 {
        struct rpcrdma_buffer *buffers = req->rl_buffer;
-       unsigned long flags;
 
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-       if (buffers->rb_recv_index < buffers->rb_max_requests) {
-               req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
-               buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
-       }
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       spin_lock(&buffers->rb_lock);
+       if (!list_empty(&buffers->rb_recv_bufs))
+               req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+       spin_unlock(&buffers->rb_lock);
 }
 
 /*
@@ -1423,13 +1162,11 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
 void
 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
 {
-       struct rpcrdma_buffer *buffers = rep->rr_buffer;
-       unsigned long flags;
+       struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
 
-       rep->rr_func = NULL;
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-       buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       spin_lock(&buffers->rb_lock);
+       list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+       spin_unlock(&buffers->rb_lock);
 }
 
 /*
@@ -1444,75 +1181,6 @@ rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
                (unsigned long long)seg->mr_dma, seg->mr_dmalen);
 }
 
-static int
-rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
-                               struct ib_mr **mrp, struct ib_sge *iov)
-{
-       struct ib_phys_buf ipb;
-       struct ib_mr *mr;
-       int rc;
-
-       /*
-        * All memory passed here was kmalloc'ed, therefore phys-contiguous.
-        */
-       iov->addr = ib_dma_map_single(ia->ri_id->device,
-                       va, len, DMA_BIDIRECTIONAL);
-       if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
-               return -ENOMEM;
-
-       iov->length = len;
-
-       if (ia->ri_have_dma_lkey) {
-               *mrp = NULL;
-               iov->lkey = ia->ri_dma_lkey;
-               return 0;
-       } else if (ia->ri_bind_mem != NULL) {
-               *mrp = NULL;
-               iov->lkey = ia->ri_bind_mem->lkey;
-               return 0;
-       }
-
-       ipb.addr = iov->addr;
-       ipb.size = iov->length;
-       mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
-                       IB_ACCESS_LOCAL_WRITE, &iov->addr);
-
-       dprintk("RPC:       %s: phys convert: 0x%llx "
-                       "registered 0x%llx length %d\n",
-                       __func__, (unsigned long long)ipb.addr,
-                       (unsigned long long)iov->addr, len);
-
-       if (IS_ERR(mr)) {
-               *mrp = NULL;
-               rc = PTR_ERR(mr);
-               dprintk("RPC:       %s: failed with %i\n", __func__, rc);
-       } else {
-               *mrp = mr;
-               iov->lkey = mr->lkey;
-               rc = 0;
-       }
-
-       return rc;
-}
-
-static int
-rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
-                               struct ib_mr *mr, struct ib_sge *iov)
-{
-       int rc;
-
-       ib_dma_unmap_single(ia->ri_id->device,
-                       iov->addr, iov->length, DMA_BIDIRECTIONAL);
-
-       if (NULL == mr)
-               return 0;
-
-       rc = ib_dereg_mr(mr);
-       if (rc)
-               dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
-       return rc;
-}
-
 /**
  * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
  * @ia: controlling rpcrdma_ia
@@ -1532,26 +1200,29 @@ struct rpcrdma_regbuf *
 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
 {
        struct rpcrdma_regbuf *rb;
-       int rc;
+       struct ib_sge *iov;
 
-       rc = -ENOMEM;
        rb = kmalloc(sizeof(*rb) + size, flags);
        if (rb == NULL)
                goto out;
 
-       rb->rg_size = size;
-       rb->rg_owner = NULL;
-       rc = rpcrdma_register_internal(ia, rb->rg_base, size,
-                                      &rb->rg_mr, &rb->rg_iov);
-       if (rc)
+       iov = &rb->rg_iov;
+       iov->addr = ib_dma_map_single(ia->ri_device,
+                                     (void *)rb->rg_base, size,
+                                     DMA_BIDIRECTIONAL);
+       if (ib_dma_mapping_error(ia->ri_device, iov->addr))
                goto out_free;
 
+       iov->length = size;
+       iov->lkey = ia->ri_pd->local_dma_lkey;
+       rb->rg_size = size;
+       rb->rg_owner = NULL;
        return rb;
 
 out_free:
        kfree(rb);
 out:
-       return ERR_PTR(rc);
+       return ERR_PTR(-ENOMEM);
 }
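
rpcrdma_alloc_regbuf() no longer registers each buffer through
ib_get_dma_mr() or ib_reg_phys_mr(); kmalloc'ed memory is simply
DMA-mapped and described with the protection domain's pre-existing
local_dma_lkey. A sketch of that map/describe/unmap sequence, assuming
kernel context; the demo_ helpers are hypothetical and trim error
handling to the essentials.

/* Hypothetical sketch: describe a kmalloc'ed buffer with the PD's lkey. */
#include <linux/dma-mapping.h>
#include <linux/slab.h>
#include <rdma/ib_verbs.h>

static void *demo_map_buf(struct ib_device *device, struct ib_pd *pd,
			  size_t size, struct ib_sge *sge)
{
	void *buf = kmalloc(size, GFP_KERNEL);

	if (!buf)
		return NULL;

	sge->addr = ib_dma_map_single(device, buf, size, DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(device, sge->addr)) {
		kfree(buf);
		return NULL;
	}
	sge->length = size;
	sge->lkey = pd->local_dma_lkey;	/* no per-buffer MR needed */
	return buf;
}

static void demo_unmap_buf(struct ib_device *device, void *buf,
			   struct ib_sge *sge)
{
	ib_dma_unmap_single(device, sge->addr, sge->length,
			    DMA_BIDIRECTIONAL);
	kfree(buf);
}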
 
 /**
@@ -1562,10 +1233,15 @@ out:
 void
 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
 {
-       if (rb) {
-               rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
-               kfree(rb);
-       }
+       struct ib_sge *iov;
+
+       if (!rb)
+               return;
+
+       iov = &rb->rg_iov;
+       ib_dma_unmap_single(ia->ri_device,
+                           iov->addr, iov->length, DMA_BIDIRECTIONAL);
+       kfree(rb);
 }
 
 /*
@@ -1578,9 +1254,11 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
                struct rpcrdma_ep *ep,
                struct rpcrdma_req *req)
 {
+       struct ib_device *device = ia->ri_device;
        struct ib_send_wr send_wr, *send_wr_fail;
        struct rpcrdma_rep *rep = req->rl_reply;
-       int rc;
+       struct ib_sge *iov = req->rl_send_iov;
+       int i, rc;
 
        if (rep) {
                rc = rpcrdma_ep_post_recv(ia, ep, rep);
@@ -1591,19 +1269,15 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 
        send_wr.next = NULL;
        send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
-       send_wr.sg_list = req->rl_send_iov;
+       send_wr.sg_list = iov;
        send_wr.num_sge = req->rl_niovs;
        send_wr.opcode = IB_WR_SEND;
-       if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
-               ib_dma_sync_single_for_device(ia->ri_id->device,
-                       req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
-                       DMA_TO_DEVICE);
-       ib_dma_sync_single_for_device(ia->ri_id->device,
-               req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
-               DMA_TO_DEVICE);
-       ib_dma_sync_single_for_device(ia->ri_id->device,
-               req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
-               DMA_TO_DEVICE);
+
+       for (i = 0; i < send_wr.num_sge; i++)
+               ib_dma_sync_single_for_device(device, iov[i].addr,
+                                             iov[i].length, DMA_TO_DEVICE);
+       dprintk("RPC:       %s: posting %d s/g entries\n",
+               __func__, send_wr.num_sge);
 
        if (DECR_CQCOUNT(ep) > 0)
                send_wr.send_flags = 0;
@@ -1636,7 +1310,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
        recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
        recv_wr.num_sge = 1;
 
-       ib_dma_sync_single_for_cpu(ia->ri_id->device,
+       ib_dma_sync_single_for_cpu(ia->ri_device,
                                   rdmab_addr(rep->rr_rdmabuf),
                                   rdmab_length(rep->rr_rdmabuf),
                                   DMA_BIDIRECTIONAL);
@@ -1649,6 +1323,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
        return rc;
 }
 
+/**
+ * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
+ * @r_xprt: transport associated with these backchannel resources
+ * @count: minimum number of incoming requests expected
+ *
+ * Returns zero if all requested buffers were posted, or a negative errno.
+ */
+int
+rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+{
+       struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+       struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+       struct rpcrdma_rep *rep;
+       unsigned long flags;
+       int rc;
+
+       while (count--) {
+               spin_lock_irqsave(&buffers->rb_lock, flags);
+               if (list_empty(&buffers->rb_recv_bufs))
+                       goto out_reqbuf;
+               rep = rpcrdma_buffer_get_rep_locked(buffers);
+               spin_unlock_irqrestore(&buffers->rb_lock, flags);
+
+               rc = rpcrdma_ep_post_recv(ia, ep, rep);
+               if (rc)
+                       goto out_rc;
+       }
+
+       return 0;
+
+out_reqbuf:
+       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       pr_warn("%s: no extra receive buffers\n", __func__);
+       return -ENOMEM;
+
+out_rc:
+       rpcrdma_recv_buffer_put(rep);
+       return rc;
+}
+
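
rpcrdma_ep_post_extra_recv() pre-posts additional receive buffers right
after connect so that server-initiated backchannel calls have somewhere
to land; each iteration takes a rep from the pool and hands its
registered buffer to the HCA with a one-SGE receive work request. A
sketch of posting such a WR, assuming a connected RC QP and an
already-mapped buffer; demo_post_one_recv() is a hypothetical name.

/* Hypothetical sketch of posting a single one-SGE receive WR. */
#include <rdma/ib_verbs.h>

static int demo_post_one_recv(struct ib_qp *qp, u64 ctx, struct ib_sge *sge)
{
	struct ib_recv_wr wr, *bad_wr;

	wr.next = NULL;
	wr.wr_id = ctx;		/* comes back in ib_wc.wr_id on completion */
	wr.sg_list = sge;
	wr.num_sge = 1;

	/* On failure, bad_wr points at the WR that could not be posted. */
	return ib_post_recv(qp, &wr, &bad_wr);
}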
 /* How many chunk list items fit within our inline buffers?
  */
 unsigned int