These changes are the raw update to linux-4.4.6-rt14 kernel sources.
[kvmfornfv.git] kernel/net/sunrpc/xprtrdma/rpc_rdma.c
index 2c53ea9..c10d969 100644
@@ -71,6 +71,67 @@ static const char transfertypes[][12] = {
 };
 #endif
 
+/* The client can send a request inline as long as the RPCRDMA header
+ * plus the RPC call fit under the transport's inline limit. If the
+ * combined call message size exceeds that limit, the client must use
+ * the read chunk list for this operation.
+ */
+static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
+{
+       unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len;
+
+       return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+}
+
+/* The client can't know how large the actual reply will be. Thus it
+ * plans for the largest possible reply for that particular ULP
+ * operation. If the maximum combined reply message size exceeds that
+ * limit, the client must provide a write list or a reply chunk for
+ * this request.
+ */
+static bool rpcrdma_results_inline(struct rpc_rqst *rqst)
+{
+       unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen;
+
+       return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+}
+
+static int
+rpcrdma_tail_pullup(struct xdr_buf *buf)
+{
+       size_t tlen = buf->tail[0].iov_len;
+       size_t skip = tlen & 3;
+
+       /* Do not include the tail if it is only an XDR pad */
+       if (tlen < 4)
+               return 0;
+
+       /* xdr_write_pages() adds a pad at the beginning of the tail
+        * if the content in "buf->pages" is unaligned. Force the
+        * tail's actual content to land at the next XDR position
+        * after the head instead.
+        */
+       if (skip) {
+               unsigned char *src, *dst;
+               unsigned int count;
+
+               src = buf->tail[0].iov_base;
+               dst = buf->head[0].iov_base;
+               dst += buf->head[0].iov_len;
+
+               src += skip;
+               tlen -= skip;
+
+               dprintk("RPC:       %s: skip=%zu, memmove(%p, %p, %zu)\n",
+                       __func__, skip, dst, src, tlen);
+
+               for (count = tlen; count; count--)
+                       *dst++ = *src++;
+       }
+
+       return tlen;
+}
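
The pad handling in rpcrdma_tail_pullup() is easiest to follow with concrete numbers. Below is a minimal user-space sketch, for illustration only and not part of the patch, assuming a hypothetical request with 4097 bytes of page data and 8 bytes of real tail data: xdr_write_pages() will have placed a 3-byte XDR pad at the front of the tail, "tlen & 3" recovers exactly that pad, and only the real bytes are copied so they sit directly after the head.

#include <stdio.h>

int main(void)
{
	/* Hypothetical xdr_buf: 4097 bytes of page data forces a
	 * 3-byte XDR pad at the start of the tail; 8 real tail
	 * bytes follow the pad.
	 */
	size_t page_len = 4097;
	size_t xdr_pad = (4 - page_len % 4) % 4;	/* 3 */
	size_t tlen = xdr_pad + 8;			/* 11 */
	size_t skip = tlen & 3;				/* == xdr_pad */

	if (tlen < 4) {
		printf("tail is only an XDR pad; omit it entirely\n");
		return 0;
	}

	printf("tail len %zu, pad skipped %zu, bytes pulled up %zu\n",
	       tlen, skip, tlen - skip);
	/* rpcrdma_tail_pullup() byte-copies those (tlen - skip) real
	 * bytes to land immediately after the head, so the read chunk
	 * can omit the pad and the tail still goes inline.
	 */
	return 0;
}
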
+
 /*
  * Chunk assembly from upper layer xdr_buf.
  *
@@ -122,6 +183,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
        if (len && n == nsegs)
                return -EIO;
 
+       /* When encoding the read list, the tail is always sent inline */
+       if (type == rpcrdma_readch)
+               return n;
+
        if (xdrbuf->tail[0].iov_len) {
                /* the rpcrdma protocol allows us to omit any trailing
                 * xdr pad bytes, saving the server an RDMA operation. */
@@ -284,9 +349,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
        return (unsigned char *)iptr - (unsigned char *)headerp;
 
 out:
-       if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
-               return n;
-
        for (pos = 0; nchunks--;)
                pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
                                                      &req->rl_segments[pos]);
@@ -300,8 +362,7 @@ out:
  * pre-registered memory buffer for this request. For small amounts
  * of data, this is efficient. The cutoff value is tunable.
  */
-static int
-rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
+static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
 {
        int i, npages, curlen;
        int copy_len;
@@ -313,16 +374,9 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
        destp = rqst->rq_svec[0].iov_base;
        curlen = rqst->rq_svec[0].iov_len;
        destp += curlen;
-       /*
-        * Do optional padding where it makes sense. Alignment of write
-        * payload can help the server, if our setting is accurate.
-        */
-       pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
-       if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
-               pad = 0;        /* don't pad this request */
 
-       dprintk("RPC:       %s: pad %d destp 0x%p len %d hdrlen %d\n",
-               __func__, pad, destp, rqst->rq_slen, curlen);
+       dprintk("RPC:       %s: destp 0x%p len %d hdrlen %d\n",
+               __func__, destp, rqst->rq_slen, curlen);
 
        copy_len = rqst->rq_snd_buf.page_len;
 
@@ -358,7 +412,6 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
                page_base = 0;
        }
        /* header now contains entire send message */
-       return pad;
 }
 
 /*
@@ -383,11 +436,16 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        char *base;
-       size_t rpclen, padlen;
+       size_t rpclen;
        ssize_t hdrlen;
        enum rpcrdma_chunktype rtype, wtype;
        struct rpcrdma_msg *headerp;
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+       if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
+               return rpcrdma_bc_marshal_reply(rqst);
+#endif
+
        /*
         * rpclen gets amount of data in first buffer, which is the
         * pre-registered buffer.
@@ -405,28 +463,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        /*
         * Chunks needed for results?
         *
+        * o Read ops return data as write chunk(s), header as inline.
         * o If the expected result is under the inline threshold, all ops
-        *   return as inline (but see later).
+        *   return as inline.
         * o Large non-read ops return as a single reply chunk.
-        * o Large read ops return data as write chunk(s), header as inline.
-        *
-        * Note: the NFS code sending down multiple result segments implies
-        * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
-        */
-
-       /*
-        * This code can handle read chunks, write chunks OR reply
-        * chunks -- only one type. If the request is too big to fit
-        * inline, then we will choose read chunks. If the request is
-        * a READ, then use write chunks to separate the file data
-        * into pages; otherwise use reply chunks.
         */
-       if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
-               wtype = rpcrdma_noch;
-       else if (rqst->rq_rcv_buf.page_len == 0)
-               wtype = rpcrdma_replych;
-       else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+       if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
                wtype = rpcrdma_writech;
+       else if (rpcrdma_results_inline(rqst))
+               wtype = rpcrdma_noch;
        else
                wtype = rpcrdma_replych;
 
@@ -435,21 +480,25 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         *
         * o If the total request is under the inline threshold, all ops
         *   are sent as inline.
-        * o Large non-write ops are sent with the entire message as a
-        *   single read chunk (protocol 0-position special case).
         * o Large write ops transmit data as read chunk(s), header as
         *   inline.
+        * o Large non-write ops are sent with the entire message as a
+        *   single read chunk (protocol 0-position special case).
         *
-        * Note: the NFS code sending down multiple argument segments
-        * implies the op is a write.
-        * TBD check NFSv4 setacl
+        * This assumes that the upper layer does not present a request
+        * that both has a data payload, and whose non-data arguments
+        * by themselves are larger than the inline threshold.
         */
-       if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+       if (rpcrdma_args_inline(rqst)) {
                rtype = rpcrdma_noch;
-       else if (rqst->rq_snd_buf.page_len == 0)
-               rtype = rpcrdma_areadch;
-       else
+       } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
                rtype = rpcrdma_readch;
+       } else {
+               r_xprt->rx_stats.nomsg_call_count++;
+               headerp->rm_type = htonl(RDMA_NOMSG);
+               rtype = rpcrdma_areadch;
+               rpclen = 0;
+       }
 
        /* The following simplification is not true forever */
        if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
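
Taken together, the two hunks above reduce chunk-type selection to a pair of short decisions. The sketch below restates that logic in one place for readability; it is an illustration only, the helper name rpcrdma_choose_chunk_types() is made up for this note, and it deliberately omits the side effects the real code performs in the RDMA_NOMSG branch (bumping nomsg_call_count, setting rm_type, zeroing rpclen).

/* Illustration only -- condensed restatement of the decisions made
 * inside rpcrdma_marshal_req() above.
 */
static void rpcrdma_choose_chunk_types(struct rpc_rqst *rqst,
				       enum rpcrdma_chunktype *rtype,
				       enum rpcrdma_chunktype *wtype)
{
	/* Results: data-bearing replies (NFS READ and similar) use a
	 * write list; small replies come back inline; everything else
	 * gets a reply chunk.
	 */
	if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
		*wtype = rpcrdma_writech;
	else if (rpcrdma_results_inline(rqst))
		*wtype = rpcrdma_noch;
	else
		*wtype = rpcrdma_replych;

	/* Arguments: small calls go inline; data-bearing calls (NFS
	 * WRITE and similar) move the payload via a read list; large
	 * non-write calls send the entire message as a position-zero
	 * read chunk (RDMA_NOMSG), so no inline RPC body is sent.
	 */
	if (rpcrdma_args_inline(rqst))
		*rtype = rpcrdma_noch;
	else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE)
		*rtype = rpcrdma_readch;
	else
		*rtype = rpcrdma_areadch;
}
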
@@ -461,7 +510,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        }
 
        hdrlen = RPCRDMA_HDRLEN_MIN;
-       padlen = 0;
 
        /*
         * Pull up any extra send data into the preregistered buffer.
@@ -470,45 +518,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         */
        if (rtype == rpcrdma_noch) {
 
-               padlen = rpcrdma_inline_pullup(rqst,
-                                               RPCRDMA_INLINE_PAD_VALUE(rqst));
-
-               if (padlen) {
-                       headerp->rm_type = rdma_msgp;
-                       headerp->rm_body.rm_padded.rm_align =
-                               cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
-                       headerp->rm_body.rm_padded.rm_thresh =
-                               cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
-                       headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
-                       headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
-                       headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
-                       hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
-                       if (wtype != rpcrdma_noch) {
-                               dprintk("RPC:       %s: invalid chunk list\n",
-                                       __func__);
-                               return -EIO;
-                       }
-               } else {
-                       headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
-                       headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
-                       headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
-                       /* new length after pullup */
-                       rpclen = rqst->rq_svec[0].iov_len;
-                       /*
-                        * Currently we try to not actually use read inline.
-                        * Reply chunks have the desirable property that
-                        * they land, packed, directly in the target buffers
-                        * without headers, so they require no fixup. The
-                        * additional RDMA Write op sends the same amount
-                        * of data, streams on-the-wire and adds no overhead
-                        * on receive. Therefore, we request a reply chunk
-                        * for non-writes wherever feasible and efficient.
-                        */
-                       if (wtype == rpcrdma_noch)
-                               wtype = rpcrdma_replych;
-               }
-       }
+               rpcrdma_inline_pullup(rqst);
 
+               headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
+               headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
+               headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
+               /* new length after pullup */
+               rpclen = rqst->rq_svec[0].iov_len;
+       } else if (rtype == rpcrdma_readch)
+               rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
        if (rtype != rpcrdma_noch) {
                hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
                                               headerp, rtype);
@@ -521,9 +539,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        if (hdrlen < 0)
                return hdrlen;
 
-       dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
+       dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd"
                " headerp 0x%p base 0x%p lkey 0x%x\n",
-               __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
+               __func__, transfertypes[wtype], hdrlen, rpclen,
                headerp, base, rdmab_lkey(req->rl_rdmabuf));
 
        /*
@@ -537,26 +555,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        req->rl_send_iov[0].length = hdrlen;
        req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
 
+       req->rl_niovs = 1;
+       if (rtype == rpcrdma_areadch)
+               return 0;
+
        req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
        req->rl_send_iov[1].length = rpclen;
        req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
 
        req->rl_niovs = 2;
-
-       if (padlen) {
-               struct rpcrdma_ep *ep = &r_xprt->rx_ep;
-
-               req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
-               req->rl_send_iov[2].length = padlen;
-               req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);
-
-               req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
-               req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
-               req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);
-
-               req->rl_niovs = 4;
-       }
-
        return 0;
 }
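
With the padded rdma_msgp format gone, the send path now sees at most two SGEs. The comment below is a quick reference summarizing the assignments above, for illustration only:

/* Illustration only -- send iovec layout after this patch:
 *
 *   rdma_msg call (rtype is noch or readch):
 *     rl_send_iov[0]: RPC/RDMA header in rl_rdmabuf, hdrlen bytes
 *     rl_send_iov[1]: inline RPC message in rl_sendbuf, rpclen bytes
 *     rl_niovs = 2
 *
 *   RDMA_NOMSG call (rtype is areadch):
 *     rl_send_iov[0] only; the server fetches the whole RPC call via
 *     the position-zero read chunk, so rl_niovs = 1 and rpclen == 0.
 *
 *   The third and fourth iovecs used by the old rdma_msgp padded
 *   format are no longer built.
 */
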
 
@@ -709,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
        spin_unlock_bh(&xprt->transport_lock);
 }
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+/* By convention, backchannel calls arrive via rdma_msg type
+ * messages, and never populate the chunk lists. This makes
+ * the RPC/RDMA header small and fixed in size, so it is
+ * straightforward to check the RPC header's direction field.
+ */
+static bool
+rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+{
+       __be32 *p = (__be32 *)headerp;
+
+       if (headerp->rm_type != rdma_msg)
+               return false;
+       if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+               return false;
+       if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+               return false;
+       if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+               return false;
+
+       /* sanity */
+       if (p[7] != headerp->rm_xid)
+               return false;
+       /* call direction */
+       if (p[8] != cpu_to_be32(RPC_CALL))
+               return false;
+
+       return true;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
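
The p[7]/p[8] "sanity" checks above index past the fixed-size header. For reference, here is a sketch of the 32-bit word layout they assume; it is an illustration only, and the enum and its names are invented for this note rather than kernel symbols.

/* Illustration only -- XDR word offsets in a chunk-less rdma_msg,
 * as assumed by rpcrdma_is_bcall() above.
 */
enum {
	BC_HDR_XID	= 0,	/* rm_xid (mirrors the RPC XID)     */
	BC_HDR_VERS	= 1,	/* rm_vers, must be rpcrdma_version */
	BC_HDR_CREDIT	= 2,	/* rm_credit                        */
	BC_HDR_TYPE	= 3,	/* rm_type, must be rdma_msg        */
	BC_READ_LIST	= 4,	/* xdr_zero: empty read list        */
	BC_WRITE_LIST	= 5,	/* xdr_zero: empty write list       */
	BC_REPLY_CHUNK	= 6,	/* xdr_zero: no reply chunk         */
	/* RPCRDMA_HDRLEN_MIN ends here; the inline RPC call follows  */
	BC_RPC_XID	= 7,	/* must equal rm_xid                */
	BC_RPC_DIR	= 8,	/* must be RPC_CALL                 */
};
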
+
 /*
  * This function is called when an async event is posted to
  * the connection which changes the connection state. All it
@@ -721,8 +759,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
        schedule_delayed_work(&ep->rep_connect_worker, 0);
 }
 
-/*
- * Called as a tasklet to do req/reply match and complete a request
+/* Process received RPC/RDMA messages.
+ *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
  */
@@ -732,60 +770,39 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
        struct rpcrdma_msg *headerp;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
-       struct rpc_xprt *xprt = rep->rr_xprt;
-       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+       struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        __be32 *iptr;
        int rdmalen, status;
        unsigned long cwnd;
        u32 credits;
 
-       /* Check status. If bad, signal disconnect and return rep to pool */
-       if (rep->rr_len == ~0U) {
-               rpcrdma_recv_buffer_put(rep);
-               if (r_xprt->rx_ep.rep_connected == 1) {
-                       r_xprt->rx_ep.rep_connected = -EIO;
-                       rpcrdma_conn_func(&r_xprt->rx_ep);
-               }
-               return;
-       }
-       if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
-               dprintk("RPC:       %s: short/invalid reply\n", __func__);
-               goto repost;
-       }
+       dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
+
+       if (rep->rr_len == RPCRDMA_BAD_LEN)
+               goto out_badstatus;
+       if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+               goto out_shortreply;
+
        headerp = rdmab_to_msg(rep->rr_rdmabuf);
-       if (headerp->rm_vers != rpcrdma_version) {
-               dprintk("RPC:       %s: invalid version %d\n",
-                       __func__, be32_to_cpu(headerp->rm_vers));
-               goto repost;
-       }
+       if (headerp->rm_vers != rpcrdma_version)
+               goto out_badversion;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+       if (rpcrdma_is_bcall(headerp))
+               goto out_bcall;
+#endif
 
-       /* Get XID and try for a match. */
-       spin_lock(&xprt->transport_lock);
+       /* Match incoming rpcrdma_rep to an rpcrdma_req to
+        * get context for handling any incoming chunks.
+        */
+       spin_lock_bh(&xprt->transport_lock);
        rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-       if (rqst == NULL) {
-               spin_unlock(&xprt->transport_lock);
-               dprintk("RPC:       %s: reply 0x%p failed "
-                       "to match any request xid 0x%08x len %d\n",
-                       __func__, rep, be32_to_cpu(headerp->rm_xid),
-                       rep->rr_len);
-repost:
-               r_xprt->rx_stats.bad_reply_count++;
-               rep->rr_func = rpcrdma_reply_handler;
-               if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
-                       rpcrdma_recv_buffer_put(rep);
-
-               return;
-       }
+       if (!rqst)
+               goto out_nomatch;
 
-       /* get request object */
        req = rpcr_to_rdmar(rqst);
-       if (req->rl_reply) {
-               spin_unlock(&xprt->transport_lock);
-               dprintk("RPC:       %s: duplicate reply 0x%p to RPC "
-                       "request 0x%p: xid 0x%08x\n", __func__, rep, req,
-                       be32_to_cpu(headerp->rm_xid));
-               goto repost;
-       }
+       if (req->rl_reply)
+               goto out_duplicate;
 
        dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
                "                   RPC request 0x%p xid 0x%08x\n",
@@ -882,8 +899,50 @@ badheader:
        if (xprt->cwnd > cwnd)
                xprt_release_rqst_cong(rqst->rq_task);
 
+       xprt_complete_rqst(rqst->rq_task, status);
+       spin_unlock_bh(&xprt->transport_lock);
        dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
                        __func__, xprt, rqst, status);
-       xprt_complete_rqst(rqst->rq_task, status);
-       spin_unlock(&xprt->transport_lock);
+       return;
+
+out_badstatus:
+       rpcrdma_recv_buffer_put(rep);
+       if (r_xprt->rx_ep.rep_connected == 1) {
+               r_xprt->rx_ep.rep_connected = -EIO;
+               rpcrdma_conn_func(&r_xprt->rx_ep);
+       }
+       return;
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+out_bcall:
+       rpcrdma_bc_receive_call(r_xprt, rep);
+       return;
+#endif
+
+out_shortreply:
+       dprintk("RPC:       %s: short/invalid reply\n", __func__);
+       goto repost;
+
+out_badversion:
+       dprintk("RPC:       %s: invalid version %d\n",
+               __func__, be32_to_cpu(headerp->rm_vers));
+       goto repost;
+
+out_nomatch:
+       spin_unlock_bh(&xprt->transport_lock);
+       dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
+               __func__, be32_to_cpu(headerp->rm_xid),
+               rep->rr_len);
+       goto repost;
+
+out_duplicate:
+       spin_unlock_bh(&xprt->transport_lock);
+       dprintk("RPC:       %s: "
+               "duplicate reply %p to RPC request %p: xid 0x%08x\n",
+               __func__, rep, req, be32_to_cpu(headerp->rm_xid));
+
+repost:
+       r_xprt->rx_stats.bad_reply_count++;
+       if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+               rpcrdma_recv_buffer_put(rep);
 }
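
The reply-handler rework above replaces inline error branches with goto labels, which is hard to see across the interleaved added and removed lines. The comment below maps the new exit paths, for illustration only:

/* Illustration only -- exit paths of the reworked
 * rpcrdma_reply_handler():
 *
 *   out_badstatus   flushed receive (RPCRDMA_BAD_LEN): return the rep
 *                   buffer and force a transport disconnect
 *   out_bcall       backchannel call: hand the rep to
 *                   rpcrdma_bc_receive_call() and return
 *   out_shortreply  fewer than RPCRDMA_HDRLEN_MIN bytes      -> repost
 *   out_badversion  rm_vers is not rpcrdma_version           -> repost
 *   out_nomatch     no rpc_rqst with a matching XID          -> repost
 *   out_duplicate   the matched request already has a reply  -> repost
 *
 *   repost: count a bad reply and post the rep buffer back to the
 *   receive queue, or release it if the post fails.
 */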