// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2020, Oracle and/or its affiliates.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following
 * disclaimer in the documentation and/or other materials provided
 * with the distribution.
 *
 * Neither the name of the Network Appliance, Inc. nor the names of
 * its contributors may be used to endorse or promote products
 * derived from this software without specific prior written
 * permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include <linux/highmem.h>

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
        unsigned int size;

        /* Fixed header fields and list discriminators */
        size = RPCRDMA_HDRLEN_MIN;

        /* Maximum Read list size */
        size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);

        /* Minimal Reply chunk size */
        size += sizeof(__be32);         /* segment count */
        size += rpcrdma_segment_maxsz * sizeof(__be32);
        size += sizeof(__be32);         /* list discriminator */

        return size;
}
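/* Worked example (illustrative only): assuming the usual definitions of
 * RPCRDMA_HDRLEN_MIN (28 bytes, seven XDR words), rpcrdma_readchunk_maxsz
 * (6 XDR words), and rpcrdma_segment_maxsz (4 XDR words), a device that
 * supports maxsegs = 8 gives
 *
 *      28 + 8 * 6 * 4 + (4 + 4 * 4 + 4) = 244 bytes
 *
 * of worst-case Call header, which rpcrdma_set_max_header_sizes() below
 * subtracts from the inline send threshold.
 */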

/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
        unsigned int size;

        /* Fixed header fields and list discriminators */
        size = RPCRDMA_HDRLEN_MIN;

        /* Maximum Write list size */
        size += sizeof(__be32);         /* segment count */
        size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
        size += sizeof(__be32);         /* list discriminator */

        return size;
}

/**
 * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
 * @ep: endpoint to initialize
 *
 * The max_inline fields contain the maximum size of an RPC message
 * so the marshaling code doesn't have to repeat this calculation
 * for every RPC.
 */
void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
{
        unsigned int maxsegs = ep->re_max_rdma_segs;

        ep->re_max_inline_send =
                ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
        ep->re_max_inline_recv =
                ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
}

/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * a Read chunk for this operation.
 *
 * A Read chunk is also required if sending the RPC call inline would
 * exceed this device's max_sge limit.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
                                struct rpc_rqst *rqst)
{
        struct xdr_buf *xdr = &rqst->rq_snd_buf;
        struct rpcrdma_ep *ep = r_xprt->rx_ep;
        unsigned int count, remaining, offset;

        if (xdr->len > ep->re_max_inline_send)
                return false;

        if (xdr->page_len) {
                remaining = xdr->page_len;
                offset = offset_in_page(xdr->page_base);
                count = RPCRDMA_MIN_SEND_SGES;
                while (remaining) {
                        remaining -= min_t(unsigned int,
                                           PAGE_SIZE - offset, remaining);
                        offset = 0;
                        if (++count > ep->re_attr.cap.max_send_sge)
                                return false;
                }
        }

        return true;
}
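/* Worked example (illustrative only): starting from RPCRDMA_MIN_SEND_SGES,
 * every page (or initial partial page) in the Call's page list consumes
 * one additional Send SGE. Three full pages of page data therefore add
 * three SGEs on top of the minimum; if the running count ever exceeds
 * the device's max_send_sge cap, rpcrdma_args_inline() reports false and
 * the payload is moved into a Read chunk instead.
 */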

/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
                                   struct rpc_rqst *rqst)
{
        return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
}

/* The client is required to provide a Reply chunk if the maximum
 * size of the non-payload part of the RPC Reply is larger than
 * the inline threshold.
 */
static bool
rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
                          const struct rpc_rqst *rqst)
{
        const struct xdr_buf *buf = &rqst->rq_rcv_buf;

        return (buf->head[0].iov_len + buf->tail[0].iov_len) <
                r_xprt->rx_ep->re_max_inline_recv;
}

/* ACL likes to be lazy in allocating pages. For TCP, these
 * pages can be allocated during receive processing. Not true
 * for RDMA, which must always provision receive buffers
 * up front.
 */
static noinline int
rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
{
        struct page **ppages;
        int len;

        len = buf->page_len;
        ppages = buf->pages + (buf->page_base >> PAGE_SHIFT);
        while (len > 0) {
                if (!*ppages)
                        *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
                if (!*ppages)
                        return -ENOBUFS;
                ppages++;
                len -= PAGE_SIZE;
        }

        return 0;
}

/* Convert @vec to a single SGL element.
 *
 * Returns pointer to next available SGE, and bumps the total number
 * of SGEs consumed.
 */
static struct rpcrdma_mr_seg *
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
                     unsigned int *n)
{
        seg->mr_page = virt_to_page(vec->iov_base);
        seg->mr_offset = offset_in_page(vec->iov_base);
        seg->mr_len = vec->iov_len;
        ++seg;
        ++(*n);
        return seg;
}

/* Convert @xdrbuf into SGEs no larger than a page each. As they
 * are registered, these SGEs are then coalesced into RDMA segments
 * when the selected memreg mode supports it.
 *
 * Returns positive number of SGEs consumed, or a negative errno.
 */

static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
                     unsigned int pos, enum rpcrdma_chunktype type,
                     struct rpcrdma_mr_seg *seg)
{
        unsigned long page_base;
        unsigned int len, n;
        struct page **ppages;

        n = 0;
        if (pos == 0)
                seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);

        len = xdrbuf->page_len;
        ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
        page_base = offset_in_page(xdrbuf->page_base);
        while (len) {
                seg->mr_page = *ppages;
                seg->mr_offset = page_base;
                seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
                len -= seg->mr_len;
                ++ppages;
                ++seg;
                ++n;
                page_base = 0;
        }

        if (type == rpcrdma_readch || type == rpcrdma_writech)
                goto out;

        if (xdrbuf->tail[0].iov_len)
                rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);

out:
        if (unlikely(n > RPCRDMA_MAX_SEGS))
                return -EIO;
        return n;
}
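/* Worked example (illustrative only): assuming 4 KB pages, a page list
 * that begins 1 KB into its first page and carries 8 KB of data becomes
 * three page segments of 3 KB, 4 KB, and 1 KB, plus one segment for the
 * head kvec when @pos is zero and, for Reply chunks, one for the tail
 * kvec. Each segment records the page, offset, and length that
 * frwr_map() later registers.
 */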

static int
encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
{
        __be32 *p;

        p = xdr_reserve_space(xdr, 4 * sizeof(*p));
        if (unlikely(!p))
                return -EMSGSIZE;

        xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
        return 0;
}

static int
encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
                    u32 position)
{
        __be32 *p;

        p = xdr_reserve_space(xdr, 6 * sizeof(*p));
        if (unlikely(!p))
                return -EMSGSIZE;

        *p++ = xdr_one;         /* Item present */
        xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
                                mr->mr_offset);
        return 0;
}

static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
                                                 struct rpcrdma_req *req,
                                                 struct rpcrdma_mr_seg *seg,
                                                 int nsegs, bool writing,
                                                 struct rpcrdma_mr **mr)
{
        *mr = rpcrdma_mr_pop(&req->rl_free_mrs);
        if (!*mr) {
                *mr = rpcrdma_mr_get(r_xprt);
                if (!*mr)
                        goto out_getmr_err;
                (*mr)->mr_req = req;
        }

        rpcrdma_mr_push(*mr, &req->rl_registered);
        return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);

out_getmr_err:
        trace_xprtrdma_nomrs_err(r_xprt, req);
        xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
        rpcrdma_mrs_refresh(r_xprt);
        return ERR_PTR(-EAGAIN);
}

/* Register and XDR encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single @pos value is currently supported.
 */
static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
                                    struct rpcrdma_req *req,
                                    struct rpc_rqst *rqst,
                                    enum rpcrdma_chunktype rtype)
{
        struct xdr_stream *xdr = &req->rl_stream;
        struct rpcrdma_mr_seg *seg;
        struct rpcrdma_mr *mr;
        unsigned int pos;
        int nsegs;

        if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
                goto done;

        pos = rqst->rq_snd_buf.head[0].iov_len;
        if (rtype == rpcrdma_areadch)
                pos = 0;
        seg = req->rl_segments;
        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
                                     rtype, seg);
        if (nsegs < 0)
                return nsegs;

        do {
                seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
                if (IS_ERR(seg))
                        return PTR_ERR(seg);

                if (encode_read_segment(xdr, mr, pos) < 0)
                        return -EMSGSIZE;

                trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
                r_xprt->rx_stats.read_chunk_count++;
                nsegs -= mr->mr_nents;
        } while (nsegs);

done:
        if (xdr_stream_encode_item_absent(xdr) < 0)
                return -EMSGSIZE;
        return 0;
}
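/* Wire-format example (illustrative only): a Read chunk covering an
 * argument that frwr_map() split across two MRs, with the payload
 * beginning at XDR position 144, is emitted as
 *
 *      1, P=144, H1, L1, O1    first read segment
 *      1, P=144, H2, L2, O2    second read segment, same position
 *      0                       end of Read list
 *
 * which matches the "1 - PHLOO - ... - 0" key above.
 */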

/* Register and XDR encode the Write list. Supports encoding a list
 * containing one array of plain segments that belong to a single
 * write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single Write chunk is currently supported.
 */
static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
                                     struct rpcrdma_req *req,
                                     struct rpc_rqst *rqst,
                                     enum rpcrdma_chunktype wtype)
{
        struct xdr_stream *xdr = &req->rl_stream;
        struct rpcrdma_ep *ep = r_xprt->rx_ep;
        struct rpcrdma_mr_seg *seg;
        struct rpcrdma_mr *mr;
        int nsegs, nchunks;
        __be32 *segcount;

        if (wtype != rpcrdma_writech)
                goto done;

        seg = req->rl_segments;
        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
                                     rqst->rq_rcv_buf.head[0].iov_len,
                                     wtype, seg);
        if (nsegs < 0)
                return nsegs;

        if (xdr_stream_encode_item_present(xdr) < 0)
                return -EMSGSIZE;
        segcount = xdr_reserve_space(xdr, sizeof(*segcount));
        if (unlikely(!segcount))
                return -EMSGSIZE;
        /* Actual value encoded below */

        nchunks = 0;
        do {
                seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
                if (IS_ERR(seg))
                        return PTR_ERR(seg);

                if (encode_rdma_segment(xdr, mr) < 0)
                        return -EMSGSIZE;

                trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
                r_xprt->rx_stats.write_chunk_count++;
                r_xprt->rx_stats.total_rdma_request += mr->mr_length;
                nchunks++;
                nsegs -= mr->mr_nents;
        } while (nsegs);

        if (xdr_pad_size(rqst->rq_rcv_buf.page_len)) {
                if (encode_rdma_segment(xdr, ep->re_write_pad_mr) < 0)
                        return -EMSGSIZE;

                trace_xprtrdma_chunk_wp(rqst->rq_task, ep->re_write_pad_mr,
                                        nsegs);
                r_xprt->rx_stats.write_chunk_count++;
                r_xprt->rx_stats.total_rdma_request += mr->mr_length;
                nchunks++;
                nsegs -= mr->mr_nents;
        }

        /* Update count of segments in this Write chunk */
        *segcount = cpu_to_be32(nchunks);

done:
        if (xdr_stream_encode_item_absent(xdr) < 0)
                return -EMSGSIZE;
        return 0;
}
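/* Encoding note (illustrative only): the segment count field is reserved
 * with xdr_reserve_space() before the registration loop runs, because
 * the number of segments is not known until frwr_map() has carved the
 * payload into MRs. After the loop (and any trailing pad segment)
 * completes, the saved @segcount pointer is backfilled, so a chunk that
 * needed three MRs plus a pad segment carries N = 4 on the wire.
 */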

/* Register and XDR encode the Reply chunk. Supports encoding an array
 * of plain segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 */
static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
                                      struct rpcrdma_req *req,
                                      struct rpc_rqst *rqst,
                                      enum rpcrdma_chunktype wtype)
{
        struct xdr_stream *xdr = &req->rl_stream;
        struct rpcrdma_mr_seg *seg;
        struct rpcrdma_mr *mr;
        int nsegs, nchunks;
        __be32 *segcount;

        if (wtype != rpcrdma_replych) {
                if (xdr_stream_encode_item_absent(xdr) < 0)
                        return -EMSGSIZE;
                return 0;
        }

        seg = req->rl_segments;
        nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
        if (nsegs < 0)
                return nsegs;

        if (xdr_stream_encode_item_present(xdr) < 0)
                return -EMSGSIZE;
        segcount = xdr_reserve_space(xdr, sizeof(*segcount));
        if (unlikely(!segcount))
                return -EMSGSIZE;
        /* Actual value encoded below */

        nchunks = 0;
        do {
                seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
                if (IS_ERR(seg))
                        return PTR_ERR(seg);

                if (encode_rdma_segment(xdr, mr) < 0)
                        return -EMSGSIZE;

                trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
                r_xprt->rx_stats.reply_chunk_count++;
                r_xprt->rx_stats.total_rdma_request += mr->mr_length;
                nchunks++;
                nsegs -= mr->mr_nents;
        } while (nsegs);

        /* Update count of segments in the Reply chunk */
        *segcount = cpu_to_be32(nchunks);

        return 0;
}

static void rpcrdma_sendctx_done(struct kref *kref)
{
        struct rpcrdma_req *req =
                container_of(kref, struct rpcrdma_req, rl_kref);
        struct rpcrdma_rep *rep = req->rl_reply;

        rpcrdma_complete_rqst(rep);
        rep->rr_rxprt->rx_stats.reply_waits_for_send++;
}

/**
 * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
 * @sc: sendctx containing SGEs to unmap
 *
 */
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
        struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
        struct ib_sge *sge;

        if (!sc->sc_unmap_count)
                return;

        /* The first two SGEs contain the transport header and
         * the inline buffer. These are always left mapped so
         * they can be cheaply re-used.
         */
        for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
             ++sge, --sc->sc_unmap_count)
                ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
                                  DMA_TO_DEVICE);

        kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
}

/* Prepare an SGE for the RPC-over-RDMA transport header.
 */
static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
                                    struct rpcrdma_req *req, u32 len)
{
        struct rpcrdma_sendctx *sc = req->rl_sendctx;
        struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
        struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];

        sge->addr = rdmab_addr(rb);
        sge->length = len;
        sge->lkey = rdmab_lkey(rb);

        ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
                                      DMA_TO_DEVICE);
}

/* The head iovec is straightforward, as it is usually already
 * DMA-mapped. Sync the content that has changed.
 */
static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
                                     struct rpcrdma_req *req, unsigned int len)
{
        struct rpcrdma_sendctx *sc = req->rl_sendctx;
        struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
        struct rpcrdma_regbuf *rb = req->rl_sendbuf;

        if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
                return false;

        sge->addr = rdmab_addr(rb);
        sge->length = len;
        sge->lkey = rdmab_lkey(rb);

        ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
                                      DMA_TO_DEVICE);
        return true;
}

/* If there is a page list present, DMA map and prepare an
 * SGE for each page to be sent.
 */
static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
                                     struct xdr_buf *xdr)
{
        struct rpcrdma_sendctx *sc = req->rl_sendctx;
        struct rpcrdma_regbuf *rb = req->rl_sendbuf;
        unsigned int page_base, len, remaining;
        struct page **ppages;
        struct ib_sge *sge;

        ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
        page_base = offset_in_page(xdr->page_base);
        remaining = xdr->page_len;
        while (remaining) {
                sge = &sc->sc_sges[req->rl_wr.num_sge++];
                len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
                sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
                                            page_base, len, DMA_TO_DEVICE);
                if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
                        goto out_mapping_err;

                sge->length = len;
                sge->lkey = rdmab_lkey(rb);

                sc->sc_unmap_count++;
                ppages++;
                remaining -= len;
                page_base = 0;
        }

        return true;

out_mapping_err:
        trace_xprtrdma_dma_maperr(sge->addr);
        return false;
}

/* The tail iovec may include an XDR pad for the page list,
 * as well as additional content, and may not reside in the
 * same page as the head iovec.
 */
static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
                                     struct xdr_buf *xdr,
                                     unsigned int page_base, unsigned int len)
{
        struct rpcrdma_sendctx *sc = req->rl_sendctx;
        struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
        struct rpcrdma_regbuf *rb = req->rl_sendbuf;
        struct page *page = virt_to_page(xdr->tail[0].iov_base);

        sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
                                    DMA_TO_DEVICE);
        if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
                goto out_mapping_err;

        sge->length = len;
        sge->lkey = rdmab_lkey(rb);
        ++sc->sc_unmap_count;
        return true;

out_mapping_err:
        trace_xprtrdma_dma_maperr(sge->addr);
        return false;
}

/* Copy the tail to the end of the head buffer.
 */
static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
                                    struct rpcrdma_req *req,
                                    struct xdr_buf *xdr)
{
        unsigned char *dst;

        dst = (unsigned char *)xdr->head[0].iov_base;
        dst += xdr->head[0].iov_len + xdr->page_len;
        memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
        r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
}

/* Copy pagelist content into the head buffer.
 */
static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
                                    struct rpcrdma_req *req,
                                    struct xdr_buf *xdr)
{
        unsigned int len, page_base, remaining;
        struct page **ppages;
        unsigned char *src, *dst;

        dst = (unsigned char *)xdr->head[0].iov_base;
        dst += xdr->head[0].iov_len;
        ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
        page_base = offset_in_page(xdr->page_base);
        remaining = xdr->page_len;
        while (remaining) {
                src = page_address(*ppages);
                src += page_base;
                len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
                memcpy(dst, src, len);
                r_xprt->rx_stats.pullup_copy_count += len;

                ppages++;
                dst += len;
                remaining -= len;
                page_base = 0;
        }
}

/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
 * When the head, pagelist, and tail are small, a pull-up copy
 * is considerably less costly than DMA mapping the components
 * of @xdr.
 *
 * Assumptions:
 *  - the caller has already verified that the total length
 *    of the RPC Call body will fit into @rl_sendbuf.
 */
static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
                                        struct rpcrdma_req *req,
                                        struct xdr_buf *xdr)
{
        if (unlikely(xdr->tail[0].iov_len))
                rpcrdma_pullup_tail_iov(r_xprt, req, xdr);

        if (unlikely(xdr->page_len))
                rpcrdma_pullup_pagelist(r_xprt, req, xdr);

        /* The whole RPC message resides in the head iovec now */
        return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
}

static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
                                        struct rpcrdma_req *req,
                                        struct xdr_buf *xdr)
{
        struct kvec *tail = &xdr->tail[0];

        if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
                return false;
        if (xdr->page_len)
                if (!rpcrdma_prepare_pagelist(req, xdr))
                        return false;
        if (tail->iov_len)
                if (!rpcrdma_prepare_tail_iov(req, xdr,
                                              offset_in_page(tail->iov_base),
                                              tail->iov_len))
                        return false;

        if (req->rl_sendctx->sc_unmap_count)
                kref_get(&req->rl_kref);
        return true;
}

static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
                                   struct rpcrdma_req *req,
                                   struct xdr_buf *xdr)
{
        if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
                return false;

        /* If there is a Read chunk, the page list is being handled
         * via explicit RDMA, and thus is skipped here.
         */

        /* Do not include the tail if it is only an XDR pad */
        if (xdr->tail[0].iov_len > 3) {
                unsigned int page_base, len;

                /* If the content in the page list is an odd length,
                 * xdr_write_pages() adds a pad at the beginning of
                 * the tail iovec. Force the tail's non-pad content to
                 * land at the next XDR position in the Send message.
                 */
                page_base = offset_in_page(xdr->tail[0].iov_base);
                len = xdr->tail[0].iov_len;
                page_base += len & 3;
                len -= len & 3;
                if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
                        return false;
                kref_get(&req->rl_kref);
        }

        return true;
}
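/* Worked example (illustrative only): if the page list carries 5 bytes
 * of payload, xdr_write_pages() places 3 pad bytes at the front of the
 * tail iovec. For a tail of iov_len = 11 (3 pad bytes plus 8 bytes of
 * trailing content), len & 3 is 3, so the code above maps only the
 * final 8 bytes, starting 3 bytes into the tail, and the receiver sees
 * the trailing content at the expected XDR alignment.
 */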

/**
 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 * @r_xprt: controlling transport
 * @req: context of RPC Call being marshalled
 * @hdrlen: size of transport header, in bytes
 * @xdr: xdr_buf containing RPC Call
 * @rtype: chunk type being encoded
 *
 * Returns 0 on success; otherwise a negative errno is returned.
 */
inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
                                     struct rpcrdma_req *req, u32 hdrlen,
                                     struct xdr_buf *xdr,
                                     enum rpcrdma_chunktype rtype)
{
        int ret;

        ret = -EAGAIN;
        req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
        if (!req->rl_sendctx)
                goto out_nosc;
        req->rl_sendctx->sc_unmap_count = 0;
        req->rl_sendctx->sc_req = req;
        kref_init(&req->rl_kref);
        req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
        req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
        req->rl_wr.num_sge = 0;
        req->rl_wr.opcode = IB_WR_SEND;

        rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);

        ret = -EIO;
        switch (rtype) {
        case rpcrdma_noch_pullup:
                if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
                        goto out_unmap;
                break;
        case rpcrdma_noch_mapped:
                if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
                        goto out_unmap;
                break;
        case rpcrdma_readch:
                if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
                        goto out_unmap;
                break;
        case rpcrdma_areadch:
                break;
        default:
                goto out_unmap;
        }

        return 0;

out_unmap:
        rpcrdma_sendctx_unmap(req->rl_sendctx);
out_nosc:
        trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
        return ret;
}

/**
 * rpcrdma_marshal_req - Marshal and send one RPC request
 * @r_xprt: controlling transport
 * @rqst: RPC request to be marshaled
 *
 * For the RPC in "rqst", this function:
 *  - Chooses the transfer mode (e.g., RDMA_MSG or RDMA_NOMSG)
 *  - Registers Read, Write, and Reply chunks
 *  - Constructs the transport header
 *  - Posts a Send WR to send the transport header and request
 *
 * Returns:
 *      %0 if the RPC was sent successfully,
 *      %-ENOTCONN if the connection was lost,
 *      %-EAGAIN if the caller should call again with the same arguments,
 *      %-ENOBUFS if the caller should call again after a delay,
 *      %-EMSGSIZE if the transport header is too small,
 *      %-EIO if a permanent problem occurred while marshaling.
 */
int
rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
{
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct xdr_stream *xdr = &req->rl_stream;
        enum rpcrdma_chunktype rtype, wtype;
        struct xdr_buf *buf = &rqst->rq_snd_buf;
        bool ddp_allowed;
        __be32 *p;
        int ret;

        if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) {
                ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf);
                if (ret)
                        return ret;
        }

        rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
        xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
                        rqst);

        /* Fixed header fields */
        ret = -EMSGSIZE;
        p = xdr_reserve_space(xdr, 4 * sizeof(*p));
        if (!p)
                goto out_err;
        *p++ = rqst->rq_xid;
        *p++ = rpcrdma_version;
        *p++ = r_xprt->rx_buf.rb_max_requests;

        /* When the ULP employs a GSS flavor that guarantees integrity
         * or privacy, direct data placement of individual data items
         * is not allowed.
         */
        ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
                                &rqst->rq_cred->cr_auth->au_flags);

        /*
         * Chunks needed for results?
         *
         * o If the expected result is under the inline threshold, all ops
         *   return as inline.
         * o Large read ops return data as write chunk(s), header as
         *   inline.
         * o Large non-read ops return as a single reply chunk.
         */
        if (rpcrdma_results_inline(r_xprt, rqst))
                wtype = rpcrdma_noch;
        else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
                 rpcrdma_nonpayload_inline(r_xprt, rqst))
                wtype = rpcrdma_writech;
        else
                wtype = rpcrdma_replych;

        /*
         * Chunks needed for arguments?
         *
         * o If the total request is under the inline threshold, all ops
         *   are sent as inline.
         * o Large write ops transmit data as read chunk(s), header as
         *   inline.
         * o Large non-write ops are sent with the entire message as a
         *   single read chunk (protocol 0-position special case).
         *
         * This assumes that the upper layer does not present a request
         * that both has a data payload, and whose non-data arguments
         * by themselves are larger than the inline threshold.
         */
        if (rpcrdma_args_inline(r_xprt, rqst)) {
                *p++ = rdma_msg;
                rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
                        rpcrdma_noch_pullup : rpcrdma_noch_mapped;
        } else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
                *p++ = rdma_msg;
                rtype = rpcrdma_readch;
        } else {
                r_xprt->rx_stats.nomsg_call_count++;
                *p++ = rdma_nomsg;
                rtype = rpcrdma_areadch;
        }

        /* This implementation supports the following combinations
         * of chunk lists in one RPC-over-RDMA Call message:
         *
         *   - Read list
         *   - Write list
         *   - Reply chunk
         *   - Read list + Reply chunk
         *
         * It might not yet support the following combinations:
         *
         *   - Read list + Write list
         *
         * It does not support the following combinations:
         *
         *   - Write list + Reply chunk
         *   - Read list + Write list + Reply chunk
         *
         * This implementation supports only a single chunk in each
         * Read or Write list. Thus for example the client cannot
         * send a Call message with a Position Zero Read chunk and a
         * regular Read chunk at the same time.
         */
        ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
        if (ret)
                goto out_err;
        ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
        if (ret)
                goto out_err;
        ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
        if (ret)
                goto out_err;

        ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
                                        buf, rtype);
        if (ret)
                goto out_err;

        trace_xprtrdma_marshal(req, rtype, wtype);
        return 0;

out_err:
        trace_xprtrdma_marshal_failed(rqst, ret);
        r_xprt->rx_stats.failed_marshal_count++;
        frwr_reset(req);
        return ret;
}

static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
                                         struct rpcrdma_buffer *buf,
                                         u32 grant)
{
        buf->rb_credits = grant;
        xprt->cwnd = grant << RPC_CWNDSHIFT;
}

static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
{
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;

        spin_lock(&xprt->transport_lock);
        __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
        spin_unlock(&xprt->transport_lock);
}

/**
 * rpcrdma_reset_cwnd - Reset the xprt's congestion window
 * @r_xprt: controlling transport instance
 *
 * Prepare @r_xprt for the next connection by reinitializing
 * its credit grant to one (see RFC 8166, Section 3.3.3).
 */
void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
{
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;

        spin_lock(&xprt->transport_lock);
        xprt->cong = 0;
        __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
        spin_unlock(&xprt->transport_lock);
}

/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
        unsigned long fixup_copy_count;
        int i, npages, curlen;
        char *destp;
        struct page **ppages;
        int page_base;

        /* The head iovec is redirected to the RPC reply message
         * in the receive buffer, to avoid a memcopy.
         */
        rqst->rq_rcv_buf.head[0].iov_base = srcp;
        rqst->rq_private_buf.head[0].iov_base = srcp;

        /* The contents of the receive buffer that follow
         * head.iov_len bytes are copied into the page list.
         */
        curlen = rqst->rq_rcv_buf.head[0].iov_len;
        if (curlen > copy_len)
                curlen = copy_len;
        srcp += curlen;
        copy_len -= curlen;

        ppages = rqst->rq_rcv_buf.pages +
                (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
        page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
        fixup_copy_count = 0;
        if (copy_len && rqst->rq_rcv_buf.page_len) {
                int pagelist_len;

                pagelist_len = rqst->rq_rcv_buf.page_len;
                if (pagelist_len > copy_len)
                        pagelist_len = copy_len;
                npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
                for (i = 0; i < npages; i++) {
                        curlen = PAGE_SIZE - page_base;
                        if (curlen > pagelist_len)
                                curlen = pagelist_len;

                        destp = kmap_atomic(ppages[i]);
                        memcpy(destp + page_base, srcp, curlen);
                        flush_dcache_page(ppages[i]);
                        kunmap_atomic(destp);
                        srcp += curlen;
                        copy_len -= curlen;
                        fixup_copy_count += curlen;
                        pagelist_len -= curlen;
                        if (!pagelist_len)
                                break;
                        page_base = 0;
                }

                /* Implicit padding for the last segment in a Write
                 * chunk is inserted inline at the front of the tail
                 * iovec. The upper layer ignores the content of
                 * the pad. Simply ensure inline content in the tail
                 * that follows the Write chunk is properly aligned.
                 */
                if (pad)
                        srcp -= pad;
        }

        /* The tail iovec is redirected to the remaining data
         * in the receive buffer, to avoid a memcopy.
         */
        if (copy_len || pad) {
                rqst->rq_rcv_buf.tail[0].iov_base = srcp;
                rqst->rq_private_buf.tail[0].iov_base = srcp;
        }

        if (fixup_copy_count)
                trace_xprtrdma_fixup(rqst, fixup_copy_count);
        return fixup_copy_count;
}
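/* Worked example (illustrative only): for an inline reply whose head
 * holds the RPC and ULP header, followed by 512 bytes destined for the
 * page list and a few trailing XDR bytes, only the 512 page-list bytes
 * are memcopied; the head and tail iovecs are simply re-pointed into
 * the receive buffer, so fixup_copy_count comes back as 512.
 */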

/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
        struct xdr_stream *xdr = &rep->rr_stream;
        __be32 *p;

        if (rep->rr_proc != rdma_msg)
                return false;

        /* Peek at stream contents without advancing. */
        p = xdr_inline_decode(xdr, 0);

        /* Chunk lists */
        if (xdr_item_is_present(p++))
                return false;
        if (xdr_item_is_present(p++))
                return false;
        if (xdr_item_is_present(p++))
                return false;

        /* RPC header */
        if (*p++ != rep->rr_xid)
                return false;
        if (*p != cpu_to_be32(RPC_CALL))
                return false;

        /* Now that we are sure this is a backchannel call,
         * advance to the RPC header.
         */
        p = xdr_inline_decode(xdr, 3 * sizeof(*p));
        if (unlikely(!p))
                return true;

        rpcrdma_bc_receive_call(r_xprt, rep);
        return true;
}
#else   /* CONFIG_SUNRPC_BACKCHANNEL */
{
        return false;
}
#endif  /* CONFIG_SUNRPC_BACKCHANNEL */

static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
{
        u32 handle;
        u64 offset;
        __be32 *p;

        p = xdr_inline_decode(xdr, 4 * sizeof(*p));
        if (unlikely(!p))
                return -EIO;

        xdr_decode_rdma_segment(p, &handle, length, &offset);
        trace_xprtrdma_decode_seg(handle, *length, offset);
        return 0;
}

static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
{
        u32 segcount, seglength;
        __be32 *p;

        p = xdr_inline_decode(xdr, sizeof(*p));
        if (unlikely(!p))
                return -EIO;

        *length = 0;
        segcount = be32_to_cpup(p);
        while (segcount--) {
                if (decode_rdma_segment(xdr, &seglength))
                        return -EIO;
                *length += seglength;
        }

        return 0;
}
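/* Decode example (illustrative only): a Write chunk announcing two
 * segments of 1024 and 512 bytes yields *length = 1536. The handles and
 * offsets are consumed (and traced) but not otherwise needed here,
 * since the server has already placed the data into the client's
 * registered memory.
 */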

/* In RPC-over-RDMA Version One replies, a Read list is never
 * expected. This decoder is a stub that returns an error if
 * a Read list is present.
 */
static int decode_read_list(struct xdr_stream *xdr)
{
        __be32 *p;

        p = xdr_inline_decode(xdr, sizeof(*p));
        if (unlikely(!p))
                return -EIO;
        if (unlikely(xdr_item_is_present(p)))
                return -EIO;
        return 0;
}

/* Supports only one Write chunk in the Write list
 */
static int decode_write_list(struct xdr_stream *xdr, u32 *length)
{
        u32 chunklen;
        bool first;
        __be32 *p;

        *length = 0;
        first = true;
        do {
                p = xdr_inline_decode(xdr, sizeof(*p));
                if (unlikely(!p))
                        return -EIO;
                if (xdr_item_is_absent(p))
                        break;
                if (!first)
                        return -EIO;

                if (decode_write_chunk(xdr, &chunklen))
                        return -EIO;
                *length += chunklen;
                first = false;
        } while (true);
        return 0;
}

static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
{
        __be32 *p;

        p = xdr_inline_decode(xdr, sizeof(*p));
        if (unlikely(!p))
                return -EIO;

        *length = 0;
        if (xdr_item_is_present(p))
                if (decode_write_chunk(xdr, length))
                        return -EIO;
        return 0;
}

static int
rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
                   struct rpc_rqst *rqst)
{
        struct xdr_stream *xdr = &rep->rr_stream;
        u32 writelist, replychunk, rpclen;
        char *base;

        /* Decode the chunk lists */
        if (decode_read_list(xdr))
                return -EIO;
        if (decode_write_list(xdr, &writelist))
                return -EIO;
        if (decode_reply_chunk(xdr, &replychunk))
                return -EIO;

        /* RDMA_MSG sanity checks */
        if (unlikely(replychunk))
                return -EIO;

        /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
        base = (char *)xdr_inline_decode(xdr, 0);
        rpclen = xdr_stream_remaining(xdr);
        r_xprt->rx_stats.fixup_copy_count +=
                rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);

        r_xprt->rx_stats.total_rdma_reply += writelist;
        return rpclen + xdr_align_size(writelist);
}

static noinline int
rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
{
        struct xdr_stream *xdr = &rep->rr_stream;
        u32 writelist, replychunk;

        /* Decode the chunk lists */
        if (decode_read_list(xdr))
                return -EIO;
        if (decode_write_list(xdr, &writelist))
                return -EIO;
        if (decode_reply_chunk(xdr, &replychunk))
                return -EIO;

        /* RDMA_NOMSG sanity checks */
        if (unlikely(writelist))
                return -EIO;
        if (unlikely(!replychunk))
                return -EIO;

        /* Reply chunk buffer already is the reply vector */
        r_xprt->rx_stats.total_rdma_reply += replychunk;
        return replychunk;
}

static noinline int
rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
                     struct rpc_rqst *rqst)
{
        struct xdr_stream *xdr = &rep->rr_stream;
        __be32 *p;

        p = xdr_inline_decode(xdr, sizeof(*p));
        if (unlikely(!p))
                return -EIO;

        switch (*p) {
        case err_vers:
                p = xdr_inline_decode(xdr, 2 * sizeof(*p));
                if (!p)
                        break;
                trace_xprtrdma_err_vers(rqst, p, p + 1);
                break;
        case err_chunk:
                trace_xprtrdma_err_chunk(rqst);
                break;
        default:
                trace_xprtrdma_err_unrecognized(rqst, p);
        }

        return -EIO;
}

/**
 * rpcrdma_unpin_rqst - Release rqst without completing it
 * @rep: RPC/RDMA Receive context
 *
 * This is done when a connection is lost so that a Reply
 * can be dropped and its matching Call can be subsequently
 * retransmitted on a new connection.
 */
void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep)
{
        struct rpc_xprt *xprt = &rep->rr_rxprt->rx_xprt;
        struct rpc_rqst *rqst = rep->rr_rqst;
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

        req->rl_reply = NULL;
        rep->rr_rqst = NULL;

        spin_lock(&xprt->queue_lock);
        xprt_unpin_rqst(rqst);
        spin_unlock(&xprt->queue_lock);
}

/**
 * rpcrdma_complete_rqst - Pass completed rqst back to RPC
 * @rep: RPC/RDMA Receive context
 *
 * Reconstruct the RPC reply and complete the transaction
 * while @rqst is still pinned to ensure the rep, rqst, and
 * rq_task pointers remain stable.
 */
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
{
        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpc_rqst *rqst = rep->rr_rqst;
        int status;

        switch (rep->rr_proc) {
        case rdma_msg:
                status = rpcrdma_decode_msg(r_xprt, rep, rqst);
                break;
        case rdma_nomsg:
                status = rpcrdma_decode_nomsg(r_xprt, rep);
                break;
        case rdma_error:
                status = rpcrdma_decode_error(r_xprt, rep, rqst);
                break;
        default:
                status = -EIO;
        }
        if (status < 0)
                goto out_badheader;

out:
        spin_lock(&xprt->queue_lock);
        xprt_complete_rqst(rqst->rq_task, status);
        xprt_unpin_rqst(rqst);
        spin_unlock(&xprt->queue_lock);
        return;

out_badheader:
        trace_xprtrdma_reply_hdr_err(rep);
        r_xprt->rx_stats.bad_reply_count++;
        rqst->rq_task->tk_status = status;
        status = 0;
        goto out;
}

static void rpcrdma_reply_done(struct kref *kref)
{
        struct rpcrdma_req *req =
                container_of(kref, struct rpcrdma_req, rl_kref);

        rpcrdma_complete_rqst(req->rl_reply);
}

/**
 * rpcrdma_reply_handler - Process received RPC/RDMA messages
 * @rep: Incoming rpcrdma_rep object to process
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
        u32 credits;
        __be32 *p;

        /* Any data means we had a useful conversation, so
         * then we don't need to delay the next reconnect.
         */
        if (xprt->reestablish_timeout)
                xprt->reestablish_timeout = 0;

        /* Fixed transport header fields */
        xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
                        rep->rr_hdrbuf.head[0].iov_base, NULL);
        p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
        if (unlikely(!p))
                goto out_shortreply;
        rep->rr_xid = *p++;
        rep->rr_vers = *p++;
        credits = be32_to_cpu(*p++);
        rep->rr_proc = *p++;

        if (rep->rr_vers != rpcrdma_version)
                goto out_badversion;

        if (rpcrdma_is_bcall(r_xprt, rep))
                return;

        /* Match incoming rpcrdma_rep to an rpcrdma_req to
         * get context for handling any incoming chunks.
         */
        spin_lock(&xprt->queue_lock);
        rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
        if (!rqst)
                goto out_norqst;
        xprt_pin_rqst(rqst);
        spin_unlock(&xprt->queue_lock);

        if (credits == 0)
                credits = 1;    /* don't deadlock */
        else if (credits > r_xprt->rx_ep->re_max_requests)
                credits = r_xprt->rx_ep->re_max_requests;
        rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1),
                           false);
        if (buf->rb_credits != credits)
                rpcrdma_update_cwnd(r_xprt, credits);

        req = rpcr_to_rdmar(rqst);
        if (unlikely(req->rl_reply))
                rpcrdma_rep_put(buf, req->rl_reply);
        req->rl_reply = rep;
        rep->rr_rqst = rqst;

        trace_xprtrdma_reply(rqst->rq_task, rep, credits);

        if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
                frwr_reminv(rep, &req->rl_registered);
        if (!list_empty(&req->rl_registered))
                frwr_unmap_async(r_xprt, req);
                /* LocalInv completion will complete the RPC */
        else
                kref_put(&req->rl_kref, rpcrdma_reply_done);
        return;

out_badversion:
        trace_xprtrdma_reply_vers_err(rep);
        goto out;

out_norqst:
        spin_unlock(&xprt->queue_lock);
        trace_xprtrdma_reply_rqst_err(rep);
        goto out;

out_shortreply:
        trace_xprtrdma_reply_short_err(rep);

out:
        rpcrdma_rep_put(buf, rep);
}