xprtrdma: Chunk list encoders no longer share one rl_segments array
authorChuck Lever <chuck.lever@oracle.com>
Wed, 29 Jun 2016 17:54:25 +0000 (13:54 -0400)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Mon, 11 Jul 2016 19:50:43 +0000 (15:50 -0400)
Currently, all three chunk list encoders each use a portion of the
one rl_segments array in rpcrdma_req. This is because the MWs for
each chunk list were preserved in rl_segments so that ro_unmap could
find and invalidate them after the RPC was complete.

However, now that MWs are placed on a per-req linked list as they
are registered, there is no longer any information in rpcrdma_mr_seg
that is shared between ro_map and ro_unmap_{sync,safe}, and thus
nothing in rl_segments needs to be preserved after
rpcrdma_marshal_req is complete.

Thus the rl_segments array can be used now just for the needs of
each rpcrdma_convert_iovs call. Once each chunk list is encoded, the
next chunk list encoder is free to re-use all of rl_segments.

This means all three chunk lists in one RPC request can now each
encode a full size data payload with no increase in the size of
rl_segments.

This is a key requirement for Kerberos support, since both the Call
and Reply for a single RPC transaction are conveyed via Long
messages (RDMA Read/Write). Both can be large.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 6d34c1f..f60d229 100644 (file)
@@ -196,8 +196,7 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
  * MR when they can.
  */
 static int
-rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
-                    int n, int nsegs)
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
 {
        size_t page_offset;
        u32 remaining;
@@ -206,7 +205,7 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
        base = vec->iov_base;
        page_offset = offset_in_page(base);
        remaining = vec->iov_len;
-       while (remaining && n < nsegs) {
+       while (remaining && n < RPCRDMA_MAX_SEGS) {
                seg[n].mr_page = NULL;
                seg[n].mr_offset = base;
                seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
@@ -230,23 +229,23 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
 
 static int
 rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
-       enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
+       enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
 {
-       int len, n = 0, p;
-       int page_base;
+       int len, n, p, page_base;
        struct page **ppages;
 
+       n = 0;
        if (pos == 0) {
-               n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
-               if (n == nsegs)
-                       return -EIO;
+               n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
+               if (n == RPCRDMA_MAX_SEGS)
+                       goto out_overflow;
        }
 
        len = xdrbuf->page_len;
        ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
        page_base = xdrbuf->page_base & ~PAGE_MASK;
        p = 0;
-       while (len && n < nsegs) {
+       while (len && n < RPCRDMA_MAX_SEGS) {
                if (!ppages[p]) {
                        /* alloc the pagelist for receiving buffer */
                        ppages[p] = alloc_page(GFP_ATOMIC);
@@ -257,7 +256,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
                seg[n].mr_offset = (void *)(unsigned long) page_base;
                seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
                if (seg[n].mr_len > PAGE_SIZE)
-                       return -EIO;
+                       goto out_overflow;
                len -= seg[n].mr_len;
                ++n;
                ++p;
@@ -265,8 +264,8 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
        }
 
        /* Message overflows the seg array */
-       if (len && n == nsegs)
-               return -EIO;
+       if (len && n == RPCRDMA_MAX_SEGS)
+               goto out_overflow;
 
        /* When encoding the read list, the tail is always sent inline */
        if (type == rpcrdma_readch)
@@ -277,12 +276,16 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
                 * xdr pad bytes, saving the server an RDMA operation. */
                if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
                        return n;
-               n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
-               if (n == nsegs)
-                       return -EIO;
+               n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
+               if (n == RPCRDMA_MAX_SEGS)
+                       goto out_overflow;
        }
 
        return n;
+
+out_overflow:
+       pr_err("rpcrdma: segment array overflow\n");
+       return -EIO;
 }
 
 static inline __be32 *
@@ -310,7 +313,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
                         struct rpcrdma_req *req, struct rpc_rqst *rqst,
                         __be32 *iptr, enum rpcrdma_chunktype rtype)
 {
-       struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+       struct rpcrdma_mr_seg *seg;
        struct rpcrdma_mw *mw;
        unsigned int pos;
        int n, nsegs;
@@ -323,8 +326,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
        pos = rqst->rq_snd_buf.head[0].iov_len;
        if (rtype == rpcrdma_areadch)
                pos = 0;
-       nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
-                                    RPCRDMA_MAX_SEGS - req->rl_nchunks);
+       seg = req->rl_segments;
+       nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
        if (nsegs < 0)
                return ERR_PTR(nsegs);
 
@@ -349,11 +352,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
                        mw->mw_handle, n < nsegs ? "more" : "last");
 
                r_xprt->rx_stats.read_chunk_count++;
-               req->rl_nchunks++;
                seg += n;
                nsegs -= n;
        } while (nsegs);
-       req->rl_nextseg = seg;
 
        /* Finish Read list */
        *iptr++ = xdr_zero;     /* Next item not present */
@@ -377,7 +378,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                          struct rpc_rqst *rqst, __be32 *iptr,
                          enum rpcrdma_chunktype wtype)
 {
-       struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+       struct rpcrdma_mr_seg *seg;
        struct rpcrdma_mw *mw;
        int n, nsegs, nchunks;
        __be32 *segcount;
@@ -387,10 +388,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                return iptr;
        }
 
+       seg = req->rl_segments;
        nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
                                     rqst->rq_rcv_buf.head[0].iov_len,
-                                    wtype, seg,
-                                    RPCRDMA_MAX_SEGS - req->rl_nchunks);
+                                    wtype, seg);
        if (nsegs < 0)
                return ERR_PTR(nsegs);
 
@@ -414,12 +415,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 
                r_xprt->rx_stats.write_chunk_count++;
                r_xprt->rx_stats.total_rdma_request += seg->mr_len;
-               req->rl_nchunks++;
                nchunks++;
                seg   += n;
                nsegs -= n;
        } while (nsegs);
-       req->rl_nextseg = seg;
 
        /* Update count of segments in this Write chunk */
        *segcount = cpu_to_be32(nchunks);
@@ -446,7 +445,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
                           struct rpcrdma_req *req, struct rpc_rqst *rqst,
                           __be32 *iptr, enum rpcrdma_chunktype wtype)
 {
-       struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+       struct rpcrdma_mr_seg *seg;
        struct rpcrdma_mw *mw;
        int n, nsegs, nchunks;
        __be32 *segcount;
@@ -456,8 +455,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
                return iptr;
        }
 
-       nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
-                                    RPCRDMA_MAX_SEGS - req->rl_nchunks);
+       seg = req->rl_segments;
+       nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
        if (nsegs < 0)
                return ERR_PTR(nsegs);
 
@@ -481,12 +480,10 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
 
                r_xprt->rx_stats.reply_chunk_count++;
                r_xprt->rx_stats.total_rdma_request += seg->mr_len;
-               req->rl_nchunks++;
                nchunks++;
                seg   += n;
                nsegs -= n;
        } while (nsegs);
-       req->rl_nextseg = seg;
 
        /* Update count of segments in the Reply chunk */
        *segcount = cpu_to_be32(nchunks);
@@ -656,8 +653,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         * send a Call message with a Position Zero Read chunk and a
         * regular Read chunk at the same time.
         */
-       req->rl_nchunks = 0;
-       req->rl_nextseg = req->rl_segments;
        iptr = headerp->rm_body.rm_chunks;
        iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
        if (IS_ERR(iptr))
index f5d0511..670fad5 100644 (file)
@@ -171,23 +171,14 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
  *   o recv buffer (posted to provider)
  *   o ib_sge (also donated to provider)
  *   o status of reply (length, success or not)
- *   o bookkeeping state to get run by tasklet (list, etc)
+ *   o bookkeeping state to get run by reply handler (list, etc)
  *
- * These are allocated during initialization, per-transport instance;
- * however, the tasklet execution list itself is global, as it should
- * always be pretty short.
+ * These are allocated during initialization, per-transport instance.
  *
  * N of these are associated with a transport instance, and stored in
  * struct rpcrdma_buffer. N is the max number of outstanding requests.
  */
 
-#define RPCRDMA_MAX_DATA_SEGS  ((1 * 1024 * 1024) / PAGE_SIZE)
-
-/* data segments + head/tail for Call + head/tail for Reply */
-#define RPCRDMA_MAX_SEGS       (RPCRDMA_MAX_DATA_SEGS + 4)
-
-struct rpcrdma_buffer;
-
 struct rpcrdma_rep {
        struct ib_cqe           rr_cqe;
        unsigned int            rr_len;
@@ -267,13 +258,18 @@ struct rpcrdma_mw {
  * of iovs for send operations. The reason is that the iovs passed to
  * ib_post_{send,recv} must not be modified until the work request
  * completes.
- *
- * NOTES:
- *   o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we
- *     marshal. The number needed varies depending on the iov lists that
- *     are passed to us and the memory registration mode we are in.
  */
 
+/* Maximum number of page-sized "segments" per chunk list to be
+ * registered or invalidated. Must handle a Reply chunk:
+ */
+enum {
+       RPCRDMA_MAX_IOV_SEGS    = 3,
+       RPCRDMA_MAX_DATA_SEGS   = ((1 * 1024 * 1024) / PAGE_SIZE) + 1,
+       RPCRDMA_MAX_SEGS        = RPCRDMA_MAX_DATA_SEGS +
+                                 RPCRDMA_MAX_IOV_SEGS,
+};
+
 struct rpcrdma_mr_seg {                /* chunk descriptors */
        u32             mr_len;         /* length of chunk or segment */
        struct page     *mr_page;       /* owning page, if any */
@@ -282,10 +278,10 @@ struct rpcrdma_mr_seg {           /* chunk descriptors */
 
 #define RPCRDMA_MAX_IOVS       (2)
 
+struct rpcrdma_buffer;
 struct rpcrdma_req {
        struct list_head        rl_free;
        unsigned int            rl_niovs;
-       unsigned int            rl_nchunks;
        unsigned int            rl_connect_cookie;
        struct rpc_task         *rl_task;
        struct rpcrdma_buffer   *rl_buffer;
@@ -293,13 +289,13 @@ struct rpcrdma_req {
        struct ib_sge           rl_send_iov[RPCRDMA_MAX_IOVS];
        struct rpcrdma_regbuf   *rl_rdmabuf;
        struct rpcrdma_regbuf   *rl_sendbuf;
-       struct list_head        rl_registered;  /* registered segments */
-       struct rpcrdma_mr_seg   rl_segments[RPCRDMA_MAX_SEGS];
-       struct rpcrdma_mr_seg   *rl_nextseg;
 
        struct ib_cqe           rl_cqe;
        struct list_head        rl_all;
        bool                    rl_backchannel;
+
+       struct list_head        rl_registered;  /* registered segments */
+       struct rpcrdma_mr_seg   rl_segments[RPCRDMA_MAX_SEGS];
 };
 
 static inline struct rpcrdma_req *