/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

/*
 * internal functions
 */

/*
 * handle replies in tasklet context, using a single, global list
 * rdma tasklet function -- just turn around and call the func
 * for all replies on the list
 */

static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
static LIST_HEAD(rpcrdma_tasklets_g);

static void
rpcrdma_run_tasklet(unsigned long data)
{
        struct rpcrdma_rep *rep;
        void (*func)(struct rpcrdma_rep *);
        unsigned long flags;

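        /* The tasklet argument is unused; the self-assignment below
         * only quiets a compiler warning about the unused parameter.
         */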
        data = data;
        spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
        while (!list_empty(&rpcrdma_tasklets_g)) {
                rep = list_entry(rpcrdma_tasklets_g.next,
                                 struct rpcrdma_rep, rr_list);
                list_del(&rep->rr_list);
                func = rep->rr_func;
                rep->rr_func = NULL;
                spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);

                if (func)
                        func(rep);
                else
                        rpcrdma_recv_buffer_put(rep);

                spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
        }
        spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
}

static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);

static const char * const async_event[] = {
        "CQ error",
        "QP fatal error",
        "QP request error",
        "QP access error",
        "communication established",
        "send queue drained",
        "path migration successful",
        "path mig error",
        "device fatal error",
        "port active",
        "port error",
        "LID change",
        "P_key change",
        "SM change",
        "SRQ error",
        "SRQ limit reached",
        "last WQE reached",
        "client reregister",
        "GID change",
};

#define ASYNC_MSG(status)                                       \
        ((status) < ARRAY_SIZE(async_event) ?                   \
                async_event[(status)] : "unknown async error")

static void
rpcrdma_schedule_tasklet(struct list_head *sched_list)
{
        unsigned long flags;

        spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
        list_splice_tail(sched_list, &rpcrdma_tasklets_g);
        spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
        tasklet_schedule(&rpcrdma_tasklet_g);
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
        struct rpcrdma_ep *ep = context;

        pr_err("RPC:       %s: %s on device %s ep %p\n",
               __func__, ASYNC_MSG(event->event),
                event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
                rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
        }
}

static void
rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
{
        struct rpcrdma_ep *ep = context;

        pr_err("RPC:       %s: %s on device %s ep %p\n",
               __func__, ASYNC_MSG(event->event),
                event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
                rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
        }
}

static const char * const wc_status[] = {
        "success",
        "local length error",
        "local QP operation error",
        "local EE context operation error",
        "local protection error",
        "WR flushed",
        "memory management operation error",
        "bad response error",
        "local access error",
        "remote invalid request error",
        "remote access error",
        "remote operation error",
        "transport retry counter exceeded",
        "RNR retry counter exceeded",
        "local RDD violation error",
        "remote invalid RD request",
        "operation aborted",
        "invalid EE context number",
        "invalid EE context state",
        "fatal error",
        "response timeout error",
        "general error",
};

#define COMPLETION_MSG(status)                                  \
        ((status) < ARRAY_SIZE(wc_status) ?                     \
                wc_status[(status)] : "unexpected completion error")

static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
{
        /* WARNING: Only wr_id and status are reliable at this point */
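        /* Unsignaled SENDs are posted with a sentinel wr_id. Any other
         * wr_id is a pointer to an rpcrdma_mw whose send-completion
         * handler must be invoked.
         */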
        if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
                if (wc->status != IB_WC_SUCCESS &&
                    wc->status != IB_WC_WR_FLUSH_ERR)
                        pr_err("RPC:       %s: SEND: %s\n",
                               __func__, COMPLETION_MSG(wc->status));
        } else {
                struct rpcrdma_mw *r;

                r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
                r->mw_sendcompletion(wc);
        }
}

static int
rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
        struct ib_wc *wcs;
        int budget, count, rc;

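        /* Poll the send CQ in RPCRDMA_POLLSIZE chunks until it is drained
         * or RPCRDMA_WC_BUDGET completions have been processed.
         */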
        budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
        do {
                wcs = ep->rep_send_wcs;

                rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
                if (rc <= 0)
                        return rc;

                count = rc;
                while (count-- > 0)
                        rpcrdma_sendcq_process_wc(wcs++);
        } while (rc == RPCRDMA_POLLSIZE && --budget);
        return 0;
}

/*
 * Handle send, fast_reg_mr, and local_inv completions.
 *
 * Send events are typically suppressed and thus do not result
 * in an upcall. Occasionally one is signaled, however. This
 * prevents the provider's completion queue from wrapping and
 * losing a completion.
 */
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
{
        struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
        int rc;

        rc = rpcrdma_sendcq_poll(cq, ep);
        if (rc) {
                dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
                        __func__, rc);
                return;
        }

        rc = ib_req_notify_cq(cq,
                        IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
        if (rc == 0)
                return;
        if (rc < 0) {
                dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                return;
        }

        rpcrdma_sendcq_poll(cq, ep);
}

static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
{
        struct rpcrdma_rep *rep =
                        (struct rpcrdma_rep *)(unsigned long)wc->wr_id;

        /* WARNING: Only wr_id and status are reliable at this point */
        if (wc->status != IB_WC_SUCCESS)
                goto out_fail;

        /* status == SUCCESS means all fields in wc are trustworthy */
        if (wc->opcode != IB_WC_RECV)
                return;

        dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
                __func__, rep, wc->byte_len);

        rep->rr_len = wc->byte_len;
        ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
                                   rdmab_addr(rep->rr_rdmabuf),
                                   rep->rr_len, DMA_FROM_DEVICE);
        prefetch(rdmab_to_msg(rep->rr_rdmabuf));

out_schedule:
        list_add_tail(&rep->rr_list, sched_list);
        return;
out_fail:
        if (wc->status != IB_WC_WR_FLUSH_ERR)
                pr_err("RPC:       %s: rep %p: %s\n",
                       __func__, rep, COMPLETION_MSG(wc->status));
        rep->rr_len = ~0U;
        goto out_schedule;
}

static int
rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
{
        struct list_head sched_list;
        struct ib_wc *wcs;
        int budget, count, rc;

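        /* Completed receives are gathered on a local list and handed to
         * the reply tasklet in a single batch.
         */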
        INIT_LIST_HEAD(&sched_list);
        budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
        do {
                wcs = ep->rep_recv_wcs;

                rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
                if (rc <= 0)
                        goto out_schedule;

                count = rc;
                while (count-- > 0)
                        rpcrdma_recvcq_process_wc(wcs++, &sched_list);
        } while (rc == RPCRDMA_POLLSIZE && --budget);
        rc = 0;

out_schedule:
        rpcrdma_schedule_tasklet(&sched_list);
        return rc;
}

/*
 * Handle receive completions.
 *
 * It is reentrant but processes single events in order to maintain
 * ordering of receives to keep server credits.
 *
 * It is the responsibility of the scheduled tasklet to return
 * recv buffers to the pool. NOTE: this affects synchronization of
 * connection shutdown. That is, the structures required for
 * the completion of the reply handler must remain intact until
 * all memory has been reclaimed.
 */
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
        struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
        int rc;

        rc = rpcrdma_recvcq_poll(cq, ep);
        if (rc) {
                dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
                        __func__, rc);
                return;
        }

        rc = ib_req_notify_cq(cq,
                        IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
        if (rc == 0)
                return;
        if (rc < 0) {
                dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                return;
        }

        rpcrdma_recvcq_poll(cq, ep);
}

static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
        struct ib_wc wc;
        LIST_HEAD(sched_list);

        while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
                rpcrdma_recvcq_process_wc(&wc, &sched_list);
        if (!list_empty(&sched_list))
                rpcrdma_schedule_tasklet(&sched_list);
        while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
                rpcrdma_sendcq_process_wc(&wc);
}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static const char * const conn[] = {
        "address resolved",
        "address error",
        "route resolved",
        "route error",
        "connect request",
        "connect response",
        "connect error",
        "unreachable",
        "rejected",
        "established",
        "disconnected",
        "device removal",
        "multicast join",
        "multicast error",
        "address change",
        "timewait exit",
};

#define CONNECTION_MSG(status)                                          \
        ((status) < ARRAY_SIZE(conn) ?                                  \
                conn[(status)] : "unrecognized connection error")
#endif

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
        struct rpcrdma_xprt *xprt = id->context;
        struct rpcrdma_ia *ia = &xprt->rx_ia;
        struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
        struct ib_qp_attr *attr = &ia->ri_qp_attr;
        struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
        int connstate = 0;

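        /* Map the CM event to a connection state: 1 when established,
         * a negative errno on failure; address and route events leave
         * connstate at zero.
         */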
        switch (event->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
                ia->ri_async_rc = 0;
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ADDR_ERROR:
                ia->ri_async_rc = -EHOSTUNREACH;
                dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
                        __func__, ep);
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ROUTE_ERROR:
                ia->ri_async_rc = -ENETUNREACH;
                dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
                        __func__, ep);
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ESTABLISHED:
                connstate = 1;
                ib_query_qp(ia->ri_id->qp, attr,
                            IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
                            iattr);
                dprintk("RPC:       %s: %d responder resources"
                        " (%d initiator)\n",
                        __func__, attr->max_dest_rd_atomic,
                        attr->max_rd_atomic);
                goto connected;
        case RDMA_CM_EVENT_CONNECT_ERROR:
                connstate = -ENOTCONN;
                goto connected;
        case RDMA_CM_EVENT_UNREACHABLE:
                connstate = -ENETDOWN;
                goto connected;
        case RDMA_CM_EVENT_REJECTED:
                connstate = -ECONNREFUSED;
                goto connected;
        case RDMA_CM_EVENT_DISCONNECTED:
                connstate = -ECONNABORTED;
                goto connected;
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
                connstate = -ENODEV;
connected:
                dprintk("RPC:       %s: %sconnected\n",
                                        __func__, connstate > 0 ? "" : "dis");
                ep->rep_connected = connstate;
                rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
                /*FALLTHROUGH*/
        default:
                dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
                        __func__, sap, rpc_get_port(sap), ep,
                        CONNECTION_MSG(event->event));
                break;
        }

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        if (connstate == 1) {
                int ird = attr->max_dest_rd_atomic;
                int tird = ep->rep_remote_cma.responder_resources;

                pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
                        sap, rpc_get_port(sap),
                        ia->ri_id->device->name,
                        ia->ri_ops->ro_displayname,
                        xprt->rx_buf.rb_max_requests,
                        ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
        } else if (connstate < 0) {
                pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
                        sap, rpc_get_port(sap), connstate);
        }
#endif

        return 0;
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
                        struct rpcrdma_ia *ia, struct sockaddr *addr)
{
        struct rdma_cm_id *id;
        int rc;

        init_completion(&ia->ri_done);

        id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
        if (IS_ERR(id)) {
                rc = PTR_ERR(id);
                dprintk("RPC:       %s: rdma_create_id() failed %i\n",
                        __func__, rc);
                return id;
        }

        ia->ri_async_rc = -ETIMEDOUT;
        rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
        if (rc) {
                dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
                        __func__, rc);
                goto out;
        }
        wait_for_completion_interruptible_timeout(&ia->ri_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
        rc = ia->ri_async_rc;
        if (rc)
                goto out;

        ia->ri_async_rc = -ETIMEDOUT;
        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
        if (rc) {
                dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
                        __func__, rc);
                goto out;
        }
        wait_for_completion_interruptible_timeout(&ia->ri_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
        rc = ia->ri_async_rc;
        if (rc)
                goto out;

        return id;

out:
        rdma_destroy_id(id);
        return ERR_PTR(rc);
}

/*
 * Drain any cq, prior to teardown.
 */
static void
rpcrdma_clean_cq(struct ib_cq *cq)
{
        struct ib_wc wc;
        int count = 0;

        while (1 == ib_poll_cq(cq, 1, &wc))
                ++count;

        if (count)
                dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
                        __func__, count, wc.opcode);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
        int rc, mem_priv;
        struct rpcrdma_ia *ia = &xprt->rx_ia;
        struct ib_device_attr *devattr = &ia->ri_devattr;

        ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
        if (IS_ERR(ia->ri_id)) {
                rc = PTR_ERR(ia->ri_id);
                goto out1;
        }

        ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
        if (IS_ERR(ia->ri_pd)) {
                rc = PTR_ERR(ia->ri_pd);
                dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
                        __func__, rc);
                goto out2;
        }

        rc = ib_query_device(ia->ri_id->device, devattr);
        if (rc) {
                dprintk("RPC:       %s: ib_query_device failed %d\n",
                        __func__, rc);
                goto out3;
        }

        if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
                ia->ri_have_dma_lkey = 1;
                ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
        }

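        /* Fall back from FRMR to MTHCAFMR, and from MTHCAFMR to
         * ALLPHYSICAL registration, when the device lacks the required
         * capabilities.
         */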
        if (memreg == RPCRDMA_FRMR) {
                /* Requires both frmr reg and local dma lkey */
                if (((devattr->device_cap_flags &
                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
                    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
                      (devattr->max_fast_reg_page_list_len == 0)) {
                        dprintk("RPC:       %s: FRMR registration "
                                "not supported by HCA\n", __func__);
                        memreg = RPCRDMA_MTHCAFMR;
                }
        }
        if (memreg == RPCRDMA_MTHCAFMR) {
                if (!ia->ri_id->device->alloc_fmr) {
                        dprintk("RPC:       %s: MTHCAFMR registration "
                                "not supported by HCA\n", __func__);
                        memreg = RPCRDMA_ALLPHYSICAL;
                }
        }

        /*
         * Optionally obtain an underlying physical identity mapping in
         * order to do a memory window-based bind. This base registration
         * is protected from remote access - that is enabled only by binding
         * for the specific bytes targeted during each RPC operation, and
         * revoked after the corresponding completion similar to a storage
         * adapter.
         */
        switch (memreg) {
        case RPCRDMA_FRMR:
                ia->ri_ops = &rpcrdma_frwr_memreg_ops;
                break;
        case RPCRDMA_ALLPHYSICAL:
                ia->ri_ops = &rpcrdma_physical_memreg_ops;
                mem_priv = IB_ACCESS_LOCAL_WRITE |
                                IB_ACCESS_REMOTE_WRITE |
                                IB_ACCESS_REMOTE_READ;
                goto register_setup;
        case RPCRDMA_MTHCAFMR:
                ia->ri_ops = &rpcrdma_fmr_memreg_ops;
                if (ia->ri_have_dma_lkey)
                        break;
                mem_priv = IB_ACCESS_LOCAL_WRITE;
        register_setup:
                ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
                if (IS_ERR(ia->ri_bind_mem)) {
                        printk(KERN_ALERT "%s: ib_get_dma_mr for "
                                "phys register failed with %lX\n",
                                __func__, PTR_ERR(ia->ri_bind_mem));
                        rc = -ENOMEM;
                        goto out3;
                }
                break;
        default:
                printk(KERN_ERR "RPC: Unsupported memory "
                                "registration mode: %d\n", memreg);
                rc = -ENOMEM;
                goto out3;
        }
        dprintk("RPC:       %s: memory registration strategy is '%s'\n",
                __func__, ia->ri_ops->ro_displayname);

        /* Else will do memory reg/dereg for each chunk */
        ia->ri_memreg_strategy = memreg;

        rwlock_init(&ia->ri_qplock);
        return 0;

out3:
        ib_dealloc_pd(ia->ri_pd);
        ia->ri_pd = NULL;
out2:
        rdma_destroy_id(ia->ri_id);
        ia->ri_id = NULL;
out1:
        return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
        int rc;

        dprintk("RPC:       %s: entering\n", __func__);
        if (ia->ri_bind_mem != NULL) {
                rc = ib_dereg_mr(ia->ri_bind_mem);
                dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
                        __func__, rc);
        }
        if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
                if (ia->ri_id->qp)
                        rdma_destroy_qp(ia->ri_id);
                rdma_destroy_id(ia->ri_id);
                ia->ri_id = NULL;
        }
        if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
                rc = ib_dealloc_pd(ia->ri_pd);
                dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
                        __func__, rc);
        }
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                                struct rpcrdma_create_data_internal *cdata)
{
        struct ib_device_attr *devattr = &ia->ri_devattr;
        struct ib_cq *sendcq, *recvcq;
        int rc, err;

        /* check provider's send/recv wr limits */
        if (cdata->max_requests > devattr->max_qp_wr)
                cdata->max_requests = devattr->max_qp_wr;

        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
        ep->rep_attr.qp_context = ep;
        ep->rep_attr.srq = NULL;
        ep->rep_attr.cap.max_send_wr = cdata->max_requests;
        rc = ia->ri_ops->ro_open(ia, ep, cdata);
        if (rc)
                return rc;
        ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
        ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
        ep->rep_attr.cap.max_recv_sge = 1;
        ep->rep_attr.cap.max_inline_data = 0;
        ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
        ep->rep_attr.qp_type = IB_QPT_RC;
        ep->rep_attr.port_num = ~0;

        if (cdata->padding) {
                ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
                                                      GFP_KERNEL);
                if (IS_ERR(ep->rep_padbuf))
                        return PTR_ERR(ep->rep_padbuf);
        } else
                ep->rep_padbuf = NULL;

        dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
                "iovs: send %d recv %d\n",
                __func__,
                ep->rep_attr.cap.max_send_wr,
                ep->rep_attr.cap.max_recv_wr,
                ep->rep_attr.cap.max_send_sge,
                ep->rep_attr.cap.max_recv_sge);

        /* set trigger for requesting send completion */
        ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
        if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
                ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
        else if (ep->rep_cqinit <= 2)
                ep->rep_cqinit = 0;
        INIT_CQCOUNT(ep);
        init_waitqueue_head(&ep->rep_connect_wait);
        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

        sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
                                  rpcrdma_cq_async_error_upcall, ep,
                                  ep->rep_attr.cap.max_send_wr + 1, 0);
        if (IS_ERR(sendcq)) {
                rc = PTR_ERR(sendcq);
                dprintk("RPC:       %s: failed to create send CQ: %i\n",
                        __func__, rc);
                goto out1;
        }

        rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
        if (rc) {
                dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                goto out2;
        }

        recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
                                  rpcrdma_cq_async_error_upcall, ep,
                                  ep->rep_attr.cap.max_recv_wr + 1, 0);
        if (IS_ERR(recvcq)) {
                rc = PTR_ERR(recvcq);
                dprintk("RPC:       %s: failed to create recv CQ: %i\n",
                        __func__, rc);
                goto out2;
        }

        rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
        if (rc) {
                dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                ib_destroy_cq(recvcq);
                goto out2;
        }

        ep->rep_attr.send_cq = sendcq;
        ep->rep_attr.recv_cq = recvcq;

        /* Initialize cma parameters */

        /* RPC/RDMA does not use private data */
        ep->rep_remote_cma.private_data = NULL;
        ep->rep_remote_cma.private_data_len = 0;

        /* Client offers RDMA Read but does not initiate */
        ep->rep_remote_cma.initiator_depth = 0;
        if (devattr->max_qp_rd_atom > 32)       /* arbitrary but <= 255 */
                ep->rep_remote_cma.responder_resources = 32;
        else
                ep->rep_remote_cma.responder_resources =
                                                devattr->max_qp_rd_atom;

        ep->rep_remote_cma.retry_count = 7;
        ep->rep_remote_cma.flow_control = 0;
        ep->rep_remote_cma.rnr_retry_count = 0;

        return 0;

out2:
        err = ib_destroy_cq(sendcq);
        if (err)
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
                        __func__, err);
out1:
        rpcrdma_free_regbuf(ia, ep->rep_padbuf);
        return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        int rc;

        dprintk("RPC:       %s: entering, connected is %d\n",
                __func__, ep->rep_connected);

        cancel_delayed_work_sync(&ep->rep_connect_worker);

        if (ia->ri_id->qp) {
                rpcrdma_ep_disconnect(ep, ia);
                rdma_destroy_qp(ia->ri_id);
                ia->ri_id->qp = NULL;
        }

        rpcrdma_free_regbuf(ia, ep->rep_padbuf);

        rpcrdma_clean_cq(ep->rep_attr.recv_cq);
        rc = ib_destroy_cq(ep->rep_attr.recv_cq);
        if (rc)
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
                        __func__, rc);

        rpcrdma_clean_cq(ep->rep_attr.send_cq);
        rc = ib_destroy_cq(ep->rep_attr.send_cq);
        if (rc)
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
                        __func__, rc);
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        struct rdma_cm_id *id, *old;
        int rc = 0;
        int retry_count = 0;

        if (ep->rep_connected != 0) {
                struct rpcrdma_xprt *xprt;
retry:
                dprintk("RPC:       %s: reconnecting...\n", __func__);

                rpcrdma_ep_disconnect(ep, ia);
                rpcrdma_flush_cqs(ep);

                xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
                ia->ri_ops->ro_reset(xprt);

                id = rpcrdma_create_id(xprt, ia,
                                (struct sockaddr *)&xprt->rx_data.addr);
                if (IS_ERR(id)) {
                        rc = -EHOSTUNREACH;
                        goto out;
                }
                /* TEMP TEMP TEMP - fail if new device:
                 * Deregister/remarshal *all* requests!
                 * Close and recreate adapter, pd, etc!
                 * Re-determine all attributes still sane!
                 * More stuff I haven't thought of!
                 * Rrrgh!
                 */
                if (ia->ri_id->device != id->device) {
                        printk("RPC:       %s: can't reconnect on "
                                "different device!\n", __func__);
                        rdma_destroy_id(id);
                        rc = -ENETUNREACH;
                        goto out;
                }
                /* END TEMP */
                rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
                if (rc) {
                        dprintk("RPC:       %s: rdma_create_qp failed %i\n",
                                __func__, rc);
                        rdma_destroy_id(id);
                        rc = -ENETUNREACH;
                        goto out;
                }

                write_lock(&ia->ri_qplock);
                old = ia->ri_id;
                ia->ri_id = id;
                write_unlock(&ia->ri_qplock);

                rdma_destroy_qp(old);
                rdma_destroy_id(old);
        } else {
                dprintk("RPC:       %s: connecting...\n", __func__);
                rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
                if (rc) {
                        dprintk("RPC:       %s: rdma_create_qp failed %i\n",
                                __func__, rc);
                        /* do not update ep->rep_connected */
                        return -ENETUNREACH;
                }
        }

        ep->rep_connected = 0;

        rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
        if (rc) {
                dprintk("RPC:       %s: rdma_connect() failed with %i\n",
                                __func__, rc);
                goto out;
        }

        wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

        /*
         * Check state. A non-peer reject indicates no listener
         * (ECONNREFUSED), which may be a transient state. All
         * others indicate a transport condition which has already
         * undergone a best-effort.
         */
        if (ep->rep_connected == -ECONNREFUSED &&
            ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
                dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
                goto retry;
        }
        if (ep->rep_connected <= 0) {
                /* Sometimes, the only way to reliably connect to remote
                 * CMs is to use same nonzero values for ORD and IRD. */
                if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
                    (ep->rep_remote_cma.responder_resources == 0 ||
                     ep->rep_remote_cma.initiator_depth !=
                                ep->rep_remote_cma.responder_resources)) {
                        if (ep->rep_remote_cma.responder_resources == 0)
                                ep->rep_remote_cma.responder_resources = 1;
                        ep->rep_remote_cma.initiator_depth =
                                ep->rep_remote_cma.responder_resources;
                        goto retry;
                }
                rc = ep->rep_connected;
        } else {
                dprintk("RPC:       %s: connected\n", __func__);
        }

out:
        if (rc)
                ep->rep_connected = rc;
        return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        int rc;

        rpcrdma_flush_cqs(ep);
        rc = rdma_disconnect(ia->ri_id);
        if (!rc) {
                /* returns without wait if not connected */
                wait_event_interruptible(ep->rep_connect_wait,
                                                        ep->rep_connected != 1);
                dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
                        (ep->rep_connected == 1) ? "still " : "dis");
        } else {
                dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
                ep->rep_connected = rc;
        }
}

static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_req *req;

        req = kzalloc(sizeof(*req), GFP_KERNEL);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);

        req->rl_buffer = &r_xprt->rx_buf;
        return req;
}

static struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_rep *rep;
        int rc;

        rc = -ENOMEM;
        rep = kzalloc(sizeof(*rep), GFP_KERNEL);
        if (rep == NULL)
                goto out;

        rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
                                               GFP_KERNEL);
        if (IS_ERR(rep->rr_rdmabuf)) {
                rc = PTR_ERR(rep->rr_rdmabuf);
                goto out_free;
        }

        rep->rr_buffer = &r_xprt->rx_buf;
        return rep;

out_free:
        kfree(rep);
out:
        return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        char *p;
        size_t len;
        int i, rc;

        buf->rb_max_requests = cdata->max_requests;
        spin_lock_init(&buf->rb_lock);

        /* Need to allocate:
         *   1.  arrays for send and recv pointers
         *   2.  arrays of struct rpcrdma_req to fill in pointers
         *   3.  array of struct rpcrdma_rep for replies
         * Send/recv buffers in req/rep need to be registered
         */
        len = buf->rb_max_requests *
                (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));

        p = kzalloc(len, GFP_KERNEL);
        if (p == NULL) {
                dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
                        __func__, len);
                rc = -ENOMEM;
                goto out;
        }
        buf->rb_pool = p;       /* for freeing it later */

        buf->rb_send_bufs = (struct rpcrdma_req **) p;
        p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
        buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
        p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];

        rc = ia->ri_ops->ro_init(r_xprt);
        if (rc)
                goto out;

        for (i = 0; i < buf->rb_max_requests; i++) {
                struct rpcrdma_req *req;
                struct rpcrdma_rep *rep;

                req = rpcrdma_create_req(r_xprt);
                if (IS_ERR(req)) {
                        dprintk("RPC:       %s: request buffer %d alloc"
                                " failed\n", __func__, i);
                        rc = PTR_ERR(req);
                        goto out;
                }
                buf->rb_send_bufs[i] = req;

                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
                        dprintk("RPC:       %s: reply buffer %d alloc failed\n",
                                __func__, i);
                        rc = PTR_ERR(rep);
                        goto out;
                }
                buf->rb_recv_bufs[i] = rep;
        }

        return 0;
out:
        rpcrdma_buffer_destroy(buf);
        return rc;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
        if (!rep)
                return;

        rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
        kfree(rep);
}

static void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
        if (!req)
                return;

        rpcrdma_free_regbuf(ia, req->rl_sendbuf);
        rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
        kfree(req);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
        int i;

        /* clean up in reverse order from create
         *   1.  recv mr memory (mr free, then kfree)
         *   2.  send mr memory (mr free, then kfree)
         *   3.  MWs
         */
        dprintk("RPC:       %s: entering\n", __func__);

        for (i = 0; i < buf->rb_max_requests; i++) {
                if (buf->rb_recv_bufs)
                        rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
                if (buf->rb_send_bufs)
                        rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
        }

        ia->ri_ops->ro_destroy(buf);

        kfree(buf->rb_pool);
}

/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
 * some req segments uninitialized.
 */
static void
rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
{
        if (*mw) {
                list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
                *mw = NULL;
        }
}

/* Cycle mw's back in reverse order, and "spin" them.
 * This delays and scrambles reuse as much as possible.
 */
static void
rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
        struct rpcrdma_mr_seg *seg = req->rl_segments;
        struct rpcrdma_mr_seg *seg1 = seg;
        int i;

        for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
                rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
        rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
}

static void
rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
        buf->rb_send_bufs[--buf->rb_send_index] = req;
        req->rl_niovs = 0;
        if (req->rl_reply) {
                buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
                req->rl_reply->rr_func = NULL;
                req->rl_reply = NULL;
        }
}

/* rpcrdma_unmap_one() was already done during deregistration.
 * Redo only the ib_post_send().
 */
static void
rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
{
        struct rpcrdma_xprt *r_xprt =
                                container_of(ia, struct rpcrdma_xprt, rx_ia);
        struct ib_send_wr invalidate_wr, *bad_wr;
        int rc;

        dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);

        /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
        r->r.frmr.fr_state = FRMR_IS_INVALID;

        memset(&invalidate_wr, 0, sizeof(invalidate_wr));
        invalidate_wr.wr_id = (unsigned long)(void *)r;
        invalidate_wr.opcode = IB_WR_LOCAL_INV;
        invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
        DECR_CQCOUNT(&r_xprt->rx_ep);

        dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
                __func__, r, r->r.frmr.fr_mr->rkey);

        read_lock(&ia->ri_qplock);
        rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
        read_unlock(&ia->ri_qplock);
        if (rc) {
                /* Force rpcrdma_buffer_get() to retry */
                r->r.frmr.fr_state = FRMR_IS_STALE;
                dprintk("RPC:       %s: ib_post_send failed, %i\n",
                        __func__, rc);
        }
}

static void
rpcrdma_retry_flushed_linv(struct list_head *stale,
                           struct rpcrdma_buffer *buf)
{
        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
        struct list_head *pos;
        struct rpcrdma_mw *r;
        unsigned long flags;

        list_for_each(pos, stale) {
                r = list_entry(pos, struct rpcrdma_mw, mw_list);
                rpcrdma_retry_local_inv(r, ia);
        }

        spin_lock_irqsave(&buf->rb_lock, flags);
        list_splice_tail(stale, &buf->rb_mws);
        spin_unlock_irqrestore(&buf->rb_lock, flags);
}

static struct rpcrdma_req *
rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
                         struct list_head *stale)
{
        struct rpcrdma_mw *r;
        int i;

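        /* FRMRs flushed by a prior connection loss are diverted to the
         * caller's stale list; they are repaired with LOCAL_INV before
         * being reused.
         */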
        i = RPCRDMA_MAX_SEGS - 1;
        while (!list_empty(&buf->rb_mws)) {
                r = list_entry(buf->rb_mws.next,
                               struct rpcrdma_mw, mw_list);
                list_del(&r->mw_list);
                if (r->r.frmr.fr_state == FRMR_IS_STALE) {
                        list_add(&r->mw_list, stale);
                        continue;
                }
                req->rl_segments[i].rl_mw = r;
                if (unlikely(i-- == 0))
                        return req;     /* Success */
        }

        /* Not enough entries on rb_mws for this req */
        rpcrdma_buffer_put_sendbuf(req, buf);
        rpcrdma_buffer_put_mrs(req, buf);
        return NULL;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
{
        struct rpcrdma_mw *r;
        int i;

        i = RPCRDMA_MAX_SEGS - 1;
        while (!list_empty(&buf->rb_mws)) {
                r = list_entry(buf->rb_mws.next,
                               struct rpcrdma_mw, mw_list);
                list_del(&r->mw_list);
                req->rl_segments[i].rl_mw = r;
                if (unlikely(i-- == 0))
                        return req;     /* Success */
        }

        /* Not enough entries on rb_mws for this req */
        rpcrdma_buffer_put_sendbuf(req, buf);
        rpcrdma_buffer_put_mrs(req, buf);
        return NULL;
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if needed) is attached to send buffer upon return.
 * Rule:
 *    rb_send_index and rb_recv_index MUST always be pointing to the
 *    *next* available buffer (non-NULL). They are incremented after
 *    removing buffers, and decremented *before* returning them.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
        struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
        struct list_head stale;
        struct rpcrdma_req *req;
        unsigned long flags;

        spin_lock_irqsave(&buffers->rb_lock, flags);
        if (buffers->rb_send_index == buffers->rb_max_requests) {
                spin_unlock_irqrestore(&buffers->rb_lock, flags);
                dprintk("RPC:       %s: out of request buffers\n", __func__);
                return ((struct rpcrdma_req *)NULL);
        }

        req = buffers->rb_send_bufs[buffers->rb_send_index];
        if (buffers->rb_send_index < buffers->rb_recv_index) {
                dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
                        __func__,
                        buffers->rb_recv_index - buffers->rb_send_index);
                req->rl_reply = NULL;
        } else {
                req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
                buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
        }
        buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;

        INIT_LIST_HEAD(&stale);
        switch (ia->ri_memreg_strategy) {
        case RPCRDMA_FRMR:
                req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
                break;
        case RPCRDMA_MTHCAFMR:
                req = rpcrdma_buffer_get_fmrs(req, buffers);
                break;
        default:
                break;
        }
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
        if (!list_empty(&stale))
                rpcrdma_retry_flushed_linv(&stale, buffers);
        return req;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
        struct rpcrdma_buffer *buffers = req->rl_buffer;
        struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
        unsigned long flags;

        spin_lock_irqsave(&buffers->rb_lock, flags);
        rpcrdma_buffer_put_sendbuf(req, buffers);
        switch (ia->ri_memreg_strategy) {
        case RPCRDMA_FRMR:
        case RPCRDMA_MTHCAFMR:
                rpcrdma_buffer_put_mrs(req, buffers);
                break;
        default:
                break;
        }
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from error conditions.
 * Post-increment counter/array index.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
        struct rpcrdma_buffer *buffers = req->rl_buffer;
        unsigned long flags;

        spin_lock_irqsave(&buffers->rb_lock, flags);
        if (buffers->rb_recv_index < buffers->rb_max_requests) {
                req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
                buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
        }
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
        struct rpcrdma_buffer *buffers = rep->rr_buffer;
        unsigned long flags;

        rep->rr_func = NULL;
        spin_lock_irqsave(&buffers->rb_lock, flags);
        buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

void
rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
{
        dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
                seg->mr_offset,
                (unsigned long long)seg->mr_dma, seg->mr_dmalen);
}

static int
rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
                                struct ib_mr **mrp, struct ib_sge *iov)
{
        struct ib_phys_buf ipb;
        struct ib_mr *mr;
        int rc;

        /*
         * All memory passed here was kmalloc'ed, therefore phys-contiguous.
         */
        iov->addr = ib_dma_map_single(ia->ri_id->device,
                        va, len, DMA_BIDIRECTIONAL);
        if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
                return -ENOMEM;

        iov->length = len;

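        /* Prefer the device's global DMA lkey, then the pre-registered
         * all-physical MR; register this buffer by itself only as a
         * last resort.
         */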
        if (ia->ri_have_dma_lkey) {
                *mrp = NULL;
                iov->lkey = ia->ri_dma_lkey;
                return 0;
        } else if (ia->ri_bind_mem != NULL) {
                *mrp = NULL;
                iov->lkey = ia->ri_bind_mem->lkey;
                return 0;
        }

        ipb.addr = iov->addr;
        ipb.size = iov->length;
        mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
                        IB_ACCESS_LOCAL_WRITE, &iov->addr);

        dprintk("RPC:       %s: phys convert: 0x%llx "
                        "registered 0x%llx length %d\n",
                        __func__, (unsigned long long)ipb.addr,
                        (unsigned long long)iov->addr, len);

        if (IS_ERR(mr)) {
                *mrp = NULL;
                rc = PTR_ERR(mr);
                dprintk("RPC:       %s: failed with %i\n", __func__, rc);
        } else {
                *mrp = mr;
                iov->lkey = mr->lkey;
                rc = 0;
        }

        return rc;
}

static int
rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
                                struct ib_mr *mr, struct ib_sge *iov)
{
        int rc;

        ib_dma_unmap_single(ia->ri_id->device,
                        iov->addr, iov->length, DMA_BIDIRECTIONAL);

        if (NULL == mr)
                return 0;

        rc = ib_dereg_mr(mr);
        if (rc)
                dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
        return rc;
}

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
        struct rpcrdma_regbuf *rb;
        int rc;

        rc = -ENOMEM;
        rb = kmalloc(sizeof(*rb) + size, flags);
        if (rb == NULL)
                goto out;

        rb->rg_size = size;
        rb->rg_owner = NULL;
        rc = rpcrdma_register_internal(ia, rb->rg_base, size,
                                       &rb->rg_mr, &rb->rg_iov);
        if (rc)
                goto out_free;

        return rb;

out_free:
        kfree(rb);
out:
        return ERR_PTR(rc);
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
        if (rb) {
                rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
                kfree(rb);
        }
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
                struct rpcrdma_ep *ep,
                struct rpcrdma_req *req)
{
        struct ib_send_wr send_wr, *send_wr_fail;
        struct rpcrdma_rep *rep = req->rl_reply;
        int rc;

        if (rep) {
                rc = rpcrdma_ep_post_recv(ia, ep, rep);
                if (rc)
                        goto out;
                req->rl_reply = NULL;
        }

        send_wr.next = NULL;
        send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
        send_wr.sg_list = req->rl_send_iov;
        send_wr.num_sge = req->rl_niovs;
        send_wr.opcode = IB_WR_SEND;
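        /* Make each send SGE's contents visible to the device before
         * posting.
         */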
        if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
                ib_dma_sync_single_for_device(ia->ri_id->device,
                        req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
                        DMA_TO_DEVICE);
        ib_dma_sync_single_for_device(ia->ri_id->device,
                req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
                DMA_TO_DEVICE);
        ib_dma_sync_single_for_device(ia->ri_id->device,
                req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
                DMA_TO_DEVICE);

        if (DECR_CQCOUNT(ep) > 0)
                send_wr.send_flags = 0;
        else { /* Provider must take a send completion every now and then */
                INIT_CQCOUNT(ep);
                send_wr.send_flags = IB_SEND_SIGNALED;
        }

        rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
        if (rc)
                dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
                        rc);
out:
        return rc;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
                     struct rpcrdma_ep *ep,
                     struct rpcrdma_rep *rep)
{
        struct ib_recv_wr recv_wr, *recv_wr_fail;
        int rc;

        recv_wr.next = NULL;
        recv_wr.wr_id = (u64) (unsigned long) rep;
        recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
        recv_wr.num_sge = 1;

        ib_dma_sync_single_for_cpu(ia->ri_id->device,
                                   rdmab_addr(rep->rr_rdmabuf),
                                   rdmab_length(rep->rr_rdmabuf),
                                   DMA_BIDIRECTIONAL);

        rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

        if (rc)
                dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
                        rc);
        return rc;
}

/* How many chunk list items fit within our inline buffers?
 */
unsigned int
rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        int bytes, segments;

        bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
        bytes -= RPCRDMA_HDRLEN_MIN;
        if (bytes < sizeof(struct rpcrdma_segment) * 2) {
                pr_warn("RPC:       %s: inline threshold too small\n",
                        __func__);
                return 0;
        }

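        /* Round the segment count down to a power of two. */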
        segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
        dprintk("RPC:       %s: max chunk list size = %d segments\n",
                __func__, segments);
        return segments;
}