Merge tag 'nfsd-5.9' of git://git.linux-nfs.org/projects/cel/cel-2.6

Pull NFS server updates from Chuck Lever:
 "Highlights:
   - Support for user extended attributes on NFS (RFC 8276)
   - Further reduce unnecessary NFSv4 delegation recalls

  Notable fixes:
   - Fix recent krb5p regression
   - Address a few resource leaks and a rare NULL dereference

  Other:
   - De-duplicate RPC/RDMA error handling and other utility functions
   - Replace storage and display of kernel memory addresses by tracepoints"

* tag 'nfsd-5.9' of git://git.linux-nfs.org/projects/cel/cel-2.6: (38 commits)
  svcrdma: CM event handler clean up
  svcrdma: Remove transport reference counting
  svcrdma: Fix another Receive buffer leak
  SUNRPC: Refresh the show_rqstp_flags() macro
  nfsd: netns.h: delete a duplicated word
  SUNRPC: Fix ("SUNRPC: Add "@len" parameter to gss_unwrap()")
  nfsd: avoid a NULL dereference in __cld_pipe_upcall()
  nfsd4: a client's own opens needn't prevent delegations
  nfsd: Use seq_putc() in two functions
  svcrdma: Display chunk completion ID when posting a rw_ctxt
  svcrdma: Record send_ctxt completion ID in trace_svcrdma_post_send()
  svcrdma: Introduce Send completion IDs
  svcrdma: Record Receive completion ID in svc_rdma_decode_rqst
  svcrdma: Introduce Receive completion IDs
  svcrdma: Introduce infrastructure to support completion IDs
  svcrdma: Add common XDR encoders for RDMA and Read segments
  svcrdma: Add common XDR decoders for RDMA and Read segments
  SUNRPC: Add helpers for decoding list discriminators symbolically
  svcrdma: Remove declarations for functions long removed
  svcrdma: Clean up trace_svcrdma_send_failed() tracepoint
  ...
This commit is contained in:
Linus Torvalds
2020-08-09 13:58:04 -07:00
32 changed files with 1816 additions and 475 deletions

View File

@@ -584,7 +584,7 @@ gss_unwrap_kerberos_v2(struct krb5_ctx *kctx, int offset, int len,
buf->head[0].iov_len);
memmove(ptr, ptr + GSS_KRB5_TOK_HDR_LEN + headskip, movelen);
buf->head[0].iov_len -= GSS_KRB5_TOK_HDR_LEN + headskip;
buf->len = len - GSS_KRB5_TOK_HDR_LEN + headskip;
buf->len = len - (GSS_KRB5_TOK_HDR_LEN + headskip);
/* Trim off the trailing "extra count" and checksum blob */
xdr_buf_trim(buf, ec + GSS_KRB5_TOK_HDR_LEN + tailskip);

View File

@@ -332,7 +332,7 @@ static struct rsi *rsi_update(struct cache_detail *cd, struct rsi *new, struct r
struct gss_svc_seq_data {
/* highest seq number seen so far: */
int sd_max;
u32 sd_max;
/* for i such that sd_max-GSS_SEQ_WIN < i <= sd_max, the i-th bit of
* sd_win is nonzero iff sequence number i has been seen already: */
unsigned long sd_win[GSS_SEQ_WIN/BITS_PER_LONG];
@@ -613,16 +613,29 @@ gss_svc_searchbyctx(struct cache_detail *cd, struct xdr_netobj *handle)
return found;
}
/* Implements sequence number algorithm as specified in RFC 2203. */
static int
gss_check_seq_num(struct rsc *rsci, int seq_num)
/**
* gss_check_seq_num - GSS sequence number window check
* @rqstp: RPC Call to use when reporting errors
* @rsci: cached GSS context state (updated on return)
* @seq_num: sequence number to check
*
* Implements sequence number algorithm as specified in
* RFC 2203, Section 5.3.3.1. "Context Management".
*
* Return values:
* %true: @rqstp's GSS sequence number is inside the window
* %false: @rqstp's GSS sequence number is outside the window
*/
static bool gss_check_seq_num(const struct svc_rqst *rqstp, struct rsc *rsci,
u32 seq_num)
{
struct gss_svc_seq_data *sd = &rsci->seqdata;
bool result = false;
spin_lock(&sd->sd_lock);
if (seq_num > sd->sd_max) {
if (seq_num >= sd->sd_max + GSS_SEQ_WIN) {
memset(sd->sd_win,0,sizeof(sd->sd_win));
memset(sd->sd_win, 0, sizeof(sd->sd_win));
sd->sd_max = seq_num;
} else while (sd->sd_max < seq_num) {
sd->sd_max++;
@@ -631,17 +644,25 @@ gss_check_seq_num(struct rsc *rsci, int seq_num)
__set_bit(seq_num % GSS_SEQ_WIN, sd->sd_win);
goto ok;
} else if (seq_num <= sd->sd_max - GSS_SEQ_WIN) {
goto drop;
goto toolow;
}
/* sd_max - GSS_SEQ_WIN < seq_num <= sd_max */
if (__test_and_set_bit(seq_num % GSS_SEQ_WIN, sd->sd_win))
goto drop;
goto alreadyseen;
ok:
result = true;
out:
spin_unlock(&sd->sd_lock);
return 1;
drop:
spin_unlock(&sd->sd_lock);
return 0;
return result;
toolow:
trace_rpcgss_svc_seqno_low(rqstp, seq_num,
sd->sd_max - GSS_SEQ_WIN,
sd->sd_max);
goto out;
alreadyseen:
trace_rpcgss_svc_seqno_seen(rqstp, seq_num);
goto out;
}
static inline u32 round_up_to_quad(u32 i)
@@ -721,14 +742,12 @@ gss_verify_header(struct svc_rqst *rqstp, struct rsc *rsci,
}
if (gc->gc_seq > MAXSEQ) {
trace_rpcgss_svc_large_seqno(rqstp->rq_xid, gc->gc_seq);
trace_rpcgss_svc_seqno_large(rqstp, gc->gc_seq);
*authp = rpcsec_gsserr_ctxproblem;
return SVC_DENIED;
}
if (!gss_check_seq_num(rsci, gc->gc_seq)) {
trace_rpcgss_svc_old_seqno(rqstp->rq_xid, gc->gc_seq);
if (!gss_check_seq_num(rqstp, rsci, gc->gc_seq))
return SVC_DROP;
}
return SVC_OK;
}
@@ -866,11 +885,13 @@ read_u32_from_xdr_buf(struct xdr_buf *buf, int base, u32 *obj)
static int
unwrap_integ_data(struct svc_rqst *rqstp, struct xdr_buf *buf, u32 seq, struct gss_ctx *ctx)
{
u32 integ_len, rseqno, maj_stat;
int stat = -EINVAL;
u32 integ_len, maj_stat;
struct xdr_netobj mic;
struct xdr_buf integ_buf;
mic.data = NULL;
/* NFS READ normally uses splice to send data in-place. However
* the data in cache can change after the reply's MIC is computed
* but before the RPC reply is sent. To prevent the client from
@@ -885,34 +906,44 @@ unwrap_integ_data(struct svc_rqst *rqstp, struct xdr_buf *buf, u32 seq, struct g
integ_len = svc_getnl(&buf->head[0]);
if (integ_len & 3)
return stat;
goto unwrap_failed;
if (integ_len > buf->len)
return stat;
if (xdr_buf_subsegment(buf, &integ_buf, 0, integ_len)) {
WARN_ON_ONCE(1);
return stat;
}
goto unwrap_failed;
if (xdr_buf_subsegment(buf, &integ_buf, 0, integ_len))
goto unwrap_failed;
/* copy out mic... */
if (read_u32_from_xdr_buf(buf, integ_len, &mic.len))
return stat;
goto unwrap_failed;
if (mic.len > RPC_MAX_AUTH_SIZE)
return stat;
goto unwrap_failed;
mic.data = kmalloc(mic.len, GFP_KERNEL);
if (!mic.data)
return stat;
goto unwrap_failed;
if (read_bytes_from_xdr_buf(buf, integ_len + 4, mic.data, mic.len))
goto out;
goto unwrap_failed;
maj_stat = gss_verify_mic(ctx, &integ_buf, &mic);
if (maj_stat != GSS_S_COMPLETE)
goto out;
if (svc_getnl(&buf->head[0]) != seq)
goto out;
goto bad_mic;
rseqno = svc_getnl(&buf->head[0]);
if (rseqno != seq)
goto bad_seqno;
/* trim off the mic and padding at the end before returning */
xdr_buf_trim(buf, round_up_to_quad(mic.len) + 4);
stat = 0;
out:
kfree(mic.data);
return stat;
unwrap_failed:
trace_rpcgss_svc_unwrap_failed(rqstp);
goto out;
bad_seqno:
trace_rpcgss_svc_seqno_bad(rqstp, seq, rseqno);
goto out;
bad_mic:
trace_rpcgss_svc_mic(rqstp, maj_stat);
goto out;
}
static inline int
@@ -937,6 +968,7 @@ unwrap_priv_data(struct svc_rqst *rqstp, struct xdr_buf *buf, u32 seq, struct gs
{
u32 priv_len, maj_stat;
int pad, remaining_len, offset;
u32 rseqno;
clear_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
@@ -951,14 +983,13 @@ unwrap_priv_data(struct svc_rqst *rqstp, struct xdr_buf *buf, u32 seq, struct gs
* not yet read from the head, so these two values are different: */
remaining_len = total_buf_len(buf);
if (priv_len > remaining_len)
return -EINVAL;
goto unwrap_failed;
pad = remaining_len - priv_len;
buf->len -= pad;
fix_priv_head(buf, pad);
maj_stat = gss_unwrap(ctx, 0, priv_len, buf);
pad = priv_len - buf->len;
buf->len -= pad;
/* The upper layers assume the buffer is aligned on 4-byte boundaries.
* In the krb5p case, at least, the data ends up offset, so we need to
* move it around. */
@@ -972,11 +1003,22 @@ unwrap_priv_data(struct svc_rqst *rqstp, struct xdr_buf *buf, u32 seq, struct gs
fix_priv_head(buf, pad);
}
if (maj_stat != GSS_S_COMPLETE)
return -EINVAL;
goto bad_unwrap;
out_seq:
if (svc_getnl(&buf->head[0]) != seq)
return -EINVAL;
rseqno = svc_getnl(&buf->head[0]);
if (rseqno != seq)
goto bad_seqno;
return 0;
unwrap_failed:
trace_rpcgss_svc_unwrap_failed(rqstp);
return -EINVAL;
bad_seqno:
trace_rpcgss_svc_seqno_bad(rqstp, seq, rseqno);
return -EINVAL;
bad_unwrap:
trace_rpcgss_svc_unwrap(rqstp, maj_stat);
return -EINVAL;
}
struct gss_svc_data {
@@ -1314,8 +1356,7 @@ static int svcauth_gss_proxy_init(struct svc_rqst *rqstp,
if (status)
goto out;
trace_rpcgss_svc_accept_upcall(rqstp->rq_xid, ud.major_status,
ud.minor_status);
trace_rpcgss_svc_accept_upcall(rqstp, ud.major_status, ud.minor_status);
switch (ud.major_status) {
case GSS_S_CONTINUE_NEEDED:
@@ -1490,8 +1531,6 @@ svcauth_gss_accept(struct svc_rqst *rqstp, __be32 *authp)
int ret;
struct sunrpc_net *sn = net_generic(SVC_NET(rqstp), sunrpc_net_id);
trace_rpcgss_svc_accept(rqstp->rq_xid, argv->iov_len);
*authp = rpc_autherr_badcred;
if (!svcdata)
svcdata = kmalloc(sizeof(*svcdata), GFP_KERNEL);
@@ -1608,6 +1647,7 @@ svcauth_gss_accept(struct svc_rqst *rqstp, __be32 *authp)
GSS_C_QOP_DEFAULT,
gc->gc_svc);
ret = SVC_OK;
trace_rpcgss_svc_authenticate(rqstp, gc);
goto out;
}
garbage_args:

View File

@@ -5,6 +5,9 @@
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/auth_gss.h>
#include <linux/sunrpc/gss_err.h>
#include <linux/sunrpc/auth_gss.h>

View File

@@ -40,7 +40,6 @@
* New MRs are created on demand.
*/
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"

View File

@@ -275,14 +275,6 @@ out:
return n;
}
static void
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
{
*iptr++ = cpu_to_be32(mr->mr_handle);
*iptr++ = cpu_to_be32(mr->mr_length);
xdr_encode_hyper(iptr, mr->mr_offset);
}
static int
encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
{
@@ -292,7 +284,7 @@ encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
if (unlikely(!p))
return -EMSGSIZE;
xdr_encode_rdma_segment(p, mr);
xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
return 0;
}
@@ -307,8 +299,8 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
return -EMSGSIZE;
*p++ = xdr_one; /* Item present */
*p++ = cpu_to_be32(position);
xdr_encode_rdma_segment(p, mr);
xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
mr->mr_offset);
return 0;
}
@@ -1133,11 +1125,11 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
p = xdr_inline_decode(xdr, 0);
/* Chunk lists */
if (*p++ != xdr_zero)
if (xdr_item_is_present(p++))
return false;
if (*p++ != xdr_zero)
if (xdr_item_is_present(p++))
return false;
if (*p++ != xdr_zero)
if (xdr_item_is_present(p++))
return false;
/* RPC header */
@@ -1176,10 +1168,7 @@ static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
if (unlikely(!p))
return -EIO;
handle = be32_to_cpup(p++);
*length = be32_to_cpup(p++);
xdr_decode_hyper(p, &offset);
xdr_decode_rdma_segment(p, &handle, length, &offset);
trace_xprtrdma_decode_seg(handle, *length, offset);
return 0;
}
@@ -1215,7 +1204,7 @@ static int decode_read_list(struct xdr_stream *xdr)
p = xdr_inline_decode(xdr, sizeof(*p));
if (unlikely(!p))
return -EIO;
if (unlikely(*p != xdr_zero))
if (unlikely(xdr_item_is_present(p)))
return -EIO;
return 0;
}
@@ -1234,7 +1223,7 @@ static int decode_write_list(struct xdr_stream *xdr, u32 *length)
p = xdr_inline_decode(xdr, sizeof(*p));
if (unlikely(!p))
return -EIO;
if (*p == xdr_zero)
if (xdr_item_is_absent(p))
break;
if (!first)
return -EIO;
@@ -1256,7 +1245,7 @@ static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
return -EIO;
*length = 0;
if (*p != xdr_zero)
if (xdr_item_is_present(p))
if (decode_write_chunk(xdr, length))
return -EIO;
return 0;

View File

@@ -87,7 +87,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
*/
get_page(virt_to_page(rqst->rq_buffer));
ctxt->sc_send_wr.opcode = IB_WR_SEND;
return svc_rdma_send(rdma, &ctxt->sc_send_wr);
return svc_rdma_send(rdma, ctxt);
}
/* Server-side transport endpoint wants a whole page for its send

View File

@@ -117,6 +117,13 @@ svc_rdma_next_recv_ctxt(struct list_head *list)
rc_list);
}
static void svc_rdma_recv_cid_init(struct svcxprt_rdma *rdma,
struct rpc_rdma_cid *cid)
{
cid->ci_queue_id = rdma->sc_rq_cq->res.id;
cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
}
static struct svc_rdma_recv_ctxt *
svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
{
@@ -135,6 +142,8 @@ svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
goto fail2;
svc_rdma_recv_cid_init(rdma, &ctxt->rc_cid);
ctxt->rc_recv_wr.next = NULL;
ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
ctxt->rc_recv_wr.sg_list = &ctxt->rc_recv_sge;
@@ -248,16 +257,15 @@ static int __svc_rdma_post_recv(struct svcxprt_rdma *rdma,
{
int ret;
svc_xprt_get(&rdma->sc_xprt);
trace_svcrdma_post_recv(ctxt);
ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, NULL);
trace_svcrdma_post_recv(&ctxt->rc_recv_wr, ret);
if (ret)
goto err_post;
return 0;
err_post:
trace_svcrdma_rq_post_err(rdma, ret);
svc_rdma_recv_ctxt_put(rdma, ctxt);
svc_xprt_put(&rdma->sc_xprt);
return ret;
}
@@ -265,6 +273,8 @@ static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
{
struct svc_rdma_recv_ctxt *ctxt;
if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
return 0;
ctxt = svc_rdma_recv_ctxt_get(rdma);
if (!ctxt)
return -ENOMEM;
@@ -309,11 +319,10 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_recv_ctxt *ctxt;
trace_svcrdma_wc_receive(wc);
/* WARNING: Only wc->wr_cqe and wc->status are reliable */
ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);
trace_svcrdma_wc_receive(wc, &ctxt->rc_cid);
if (wc->status != IB_WC_SUCCESS)
goto flushed;
@@ -333,15 +342,13 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
spin_unlock(&rdma->sc_rq_dto_lock);
if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
svc_xprt_enqueue(&rdma->sc_xprt);
goto out;
return;
flushed:
post_err:
svc_rdma_recv_ctxt_put(rdma, ctxt);
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
svc_xprt_enqueue(&rdma->sc_xprt);
out:
svc_xprt_put(&rdma->sc_xprt);
}
/**
@@ -419,7 +426,7 @@ static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
len = 0;
first = true;
while (*p != xdr_zero) {
while (xdr_item_is_present(p)) {
p = xdr_inline_decode(&rctxt->rc_stream,
rpcrdma_readseg_maxsz * sizeof(*p));
if (!p)
@@ -466,9 +473,7 @@ static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen)
if (!p)
return false;
handle = be32_to_cpup(p++);
length = be32_to_cpup(p++);
xdr_decode_hyper(p, &offset);
xdr_decode_rdma_segment(p, &handle, &length, &offset);
trace_svcrdma_decode_wseg(handle, length, offset);
total += length;
@@ -500,7 +505,7 @@ static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
if (!p)
return false;
rctxt->rc_write_list = p;
while (*p != xdr_zero) {
while (xdr_item_is_present(p)) {
if (!xdr_check_write_chunk(rctxt, MAX_BYTES_WRITE_CHUNK))
return false;
++chcount;
@@ -532,12 +537,11 @@ static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
if (!p)
return false;
rctxt->rc_reply_chunk = p;
if (*p != xdr_zero) {
rctxt->rc_reply_chunk = NULL;
if (xdr_item_is_present(p)) {
if (!xdr_check_write_chunk(rctxt, MAX_BYTES_SPECIAL_CHUNK))
return false;
} else {
rctxt->rc_reply_chunk = NULL;
rctxt->rc_reply_chunk = p;
}
return true;
}
@@ -568,7 +572,7 @@ static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
p += rpcrdma_fixed_maxsz;
/* Read list */
while (*p++ != xdr_zero) {
while (xdr_item_is_present(p++)) {
p++; /* position */
if (inv_rkey == xdr_zero)
inv_rkey = *p;
@@ -578,7 +582,7 @@ static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
}
/* Write list */
while (*p++ != xdr_zero) {
while (xdr_item_is_present(p++)) {
segcount = be32_to_cpup(p++);
for (i = 0; i < segcount; i++) {
if (inv_rkey == xdr_zero)
@@ -590,7 +594,7 @@ static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
}
/* Reply chunk */
if (*p++ != xdr_zero) {
if (xdr_item_is_present(p++)) {
segcount = be32_to_cpup(p++);
for (i = 0; i < segcount; i++) {
if (inv_rkey == xdr_zero)
@@ -661,27 +665,27 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg,
hdr_len = xdr_stream_pos(&rctxt->rc_stream);
rq_arg->head[0].iov_len -= hdr_len;
rq_arg->len -= hdr_len;
trace_svcrdma_decode_rqst(rdma_argp, hdr_len);
trace_svcrdma_decode_rqst(rctxt, rdma_argp, hdr_len);
return hdr_len;
out_short:
trace_svcrdma_decode_short_err(rq_arg->len);
trace_svcrdma_decode_short_err(rctxt, rq_arg->len);
return -EINVAL;
out_version:
trace_svcrdma_decode_badvers_err(rdma_argp);
trace_svcrdma_decode_badvers_err(rctxt, rdma_argp);
return -EPROTONOSUPPORT;
out_drop:
trace_svcrdma_decode_drop_err(rdma_argp);
trace_svcrdma_decode_drop_err(rctxt, rdma_argp);
return 0;
out_proc:
trace_svcrdma_decode_badproc_err(rdma_argp);
trace_svcrdma_decode_badproc_err(rctxt, rdma_argp);
return -EINVAL;
out_inval:
trace_svcrdma_decode_parse_err(rdma_argp);
trace_svcrdma_decode_parse_err(rctxt, rdma_argp);
return -EINVAL;
}
@@ -714,57 +718,16 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
rqstp->rq_arg.buflen = head->rc_arg.buflen;
}
static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
__be32 *rdma_argp, int status)
static void svc_rdma_send_error(struct svcxprt_rdma *rdma,
struct svc_rdma_recv_ctxt *rctxt,
int status)
{
struct svc_rdma_send_ctxt *ctxt;
__be32 *p;
int ret;
struct svc_rdma_send_ctxt *sctxt;
ctxt = svc_rdma_send_ctxt_get(xprt);
if (!ctxt)
sctxt = svc_rdma_send_ctxt_get(rdma);
if (!sctxt)
return;
p = xdr_reserve_space(&ctxt->sc_stream,
rpcrdma_fixed_maxsz * sizeof(*p));
if (!p)
goto put_ctxt;
*p++ = *rdma_argp;
*p++ = *(rdma_argp + 1);
*p++ = xprt->sc_fc_credits;
*p = rdma_error;
switch (status) {
case -EPROTONOSUPPORT:
p = xdr_reserve_space(&ctxt->sc_stream, 3 * sizeof(*p));
if (!p)
goto put_ctxt;
*p++ = err_vers;
*p++ = rpcrdma_version;
*p = rpcrdma_version;
trace_svcrdma_err_vers(*rdma_argp);
break;
default:
p = xdr_reserve_space(&ctxt->sc_stream, sizeof(*p));
if (!p)
goto put_ctxt;
*p = err_chunk;
trace_svcrdma_err_chunk(*rdma_argp);
}
ctxt->sc_send_wr.num_sge = 1;
ctxt->sc_send_wr.opcode = IB_WR_SEND;
ctxt->sc_sges[0].length = ctxt->sc_hdrbuf.len;
ret = svc_rdma_send(xprt, &ctxt->sc_send_wr);
if (ret)
goto put_ctxt;
return;
put_ctxt:
svc_rdma_send_ctxt_put(xprt, ctxt);
svc_rdma_send_error_msg(rdma, sctxt, rctxt, status);
}
/* By convention, backchannel calls arrive via rdma_msg type
@@ -900,13 +863,13 @@ out_readchunk:
return 0;
out_err:
svc_rdma_send_error(rdma_xprt, p, ret);
svc_rdma_send_error(rdma_xprt, ctxt, ret);
svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
return 0;
out_postfail:
if (ret == -EINVAL)
svc_rdma_send_error(rdma_xprt, p, ret);
svc_rdma_send_error(rdma_xprt, ctxt, ret);
svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
return ret;

View File

@@ -7,6 +7,7 @@
#include <rdma/rw.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
@@ -144,17 +145,25 @@ static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
* demand, and not cached.
*/
struct svc_rdma_chunk_ctxt {
struct rpc_rdma_cid cc_cid;
struct ib_cqe cc_cqe;
struct svcxprt_rdma *cc_rdma;
struct list_head cc_rwctxts;
int cc_sqecount;
};
static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma,
struct rpc_rdma_cid *cid)
{
cid->ci_queue_id = rdma->sc_sq_cq->res.id;
cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
}
static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
struct svc_rdma_chunk_ctxt *cc)
{
svc_rdma_cc_cid_init(rdma, &cc->cc_cid);
cc->cc_rdma = rdma;
svc_xprt_get(&rdma->sc_xprt);
INIT_LIST_HEAD(&cc->cc_rwctxts);
cc->cc_sqecount = 0;
@@ -174,7 +183,6 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
ctxt->rw_nents, dir);
svc_rdma_put_rw_ctxt(rdma, ctxt);
}
svc_xprt_put(&rdma->sc_xprt);
}
/* State for sending a Write or Reply chunk.
@@ -236,7 +244,7 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
struct svc_rdma_write_info *info =
container_of(cc, struct svc_rdma_write_info, wi_cc);
trace_svcrdma_wc_write(wc);
trace_svcrdma_wc_write(wc, &cc->cc_cid);
atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
wake_up(&rdma->sc_send_wait);
@@ -294,7 +302,7 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
struct svc_rdma_read_info *info =
container_of(cc, struct svc_rdma_read_info, ri_cc);
trace_svcrdma_wc_read(wc);
trace_svcrdma_wc_read(wc, &cc->cc_cid);
atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
wake_up(&rdma->sc_send_wait);
@@ -350,6 +358,7 @@ static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
do {
if (atomic_sub_return(cc->cc_sqecount,
&rdma->sc_sq_avail) > 0) {
trace_svcrdma_post_chunk(&cc->cc_cid, cc->cc_sqecount);
ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
if (ret)
break;
@@ -441,34 +450,32 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
do {
unsigned int write_len;
u32 seg_length, seg_handle;
u64 seg_offset;
u32 handle, length;
u64 offset;
if (info->wi_seg_no >= info->wi_nsegs)
goto out_overflow;
seg_handle = be32_to_cpup(seg);
seg_length = be32_to_cpup(seg + 1);
xdr_decode_hyper(seg + 2, &seg_offset);
seg_offset += info->wi_seg_off;
xdr_decode_rdma_segment(seg, &handle, &length, &offset);
offset += info->wi_seg_off;
write_len = min(remaining, seg_length - info->wi_seg_off);
write_len = min(remaining, length - info->wi_seg_off);
ctxt = svc_rdma_get_rw_ctxt(rdma,
(write_len >> PAGE_SHIFT) + 2);
if (!ctxt)
return -ENOMEM;
constructor(info, write_len, ctxt);
ret = svc_rdma_rw_ctx_init(rdma, ctxt, seg_offset, seg_handle,
ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle,
DMA_TO_DEVICE);
if (ret < 0)
return -EIO;
trace_svcrdma_send_wseg(seg_handle, write_len, seg_offset);
trace_svcrdma_send_wseg(handle, write_len, offset);
list_add(&ctxt->rw_list, &cc->cc_rwctxts);
cc->cc_sqecount += ret;
if (write_len == seg_length - info->wi_seg_off) {
if (write_len == length - info->wi_seg_off) {
seg += 4;
info->wi_seg_no++;
info->wi_seg_off = 0;
@@ -684,35 +691,24 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
struct svc_rdma_read_info *info,
__be32 *p)
{
unsigned int i;
int ret;
ret = -EINVAL;
info->ri_chunklen = 0;
while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
u32 rs_handle, rs_length;
u64 rs_offset;
u32 handle, length;
u64 offset;
rs_handle = be32_to_cpup(p++);
rs_length = be32_to_cpup(p++);
p = xdr_decode_hyper(p, &rs_offset);
ret = svc_rdma_build_read_segment(info, rqstp,
rs_handle, rs_length,
rs_offset);
p = xdr_decode_rdma_segment(p, &handle, &length, &offset);
ret = svc_rdma_build_read_segment(info, rqstp, handle, length,
offset);
if (ret < 0)
break;
trace_svcrdma_send_rseg(rs_handle, rs_length, rs_offset);
info->ri_chunklen += rs_length;
trace_svcrdma_send_rseg(handle, length, offset);
info->ri_chunklen += length;
}
/* Pages under I/O have been copied to head->rc_pages.
* Prevent their premature release by svc_xprt_release() .
*/
for (i = 0; i < info->ri_readctxt->rc_page_count; i++)
rqstp->rq_pages[i] = NULL;
return ret;
}
@@ -807,6 +803,26 @@ out:
return ret;
}
/* Pages under I/O have been copied to head->rc_pages. Ensure they
* are not released by svc_xprt_release() until the I/O is complete.
*
* This has to be done after all Read WRs are constructed to properly
* handle a page that is part of I/O on behalf of two different RDMA
* segments.
*
* Do this only if I/O has been posted. Otherwise, we do indeed want
* svc_xprt_release() to clean things up properly.
*/
static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
const unsigned int start,
const unsigned int num_pages)
{
unsigned int i;
for (i = start; i < num_pages + start; i++)
rqstp->rq_pages[i] = NULL;
}
/**
* svc_rdma_recv_read_chunk - Pull a Read chunk from the client
* @rdma: controlling RDMA transport
@@ -860,6 +876,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);
if (ret < 0)
goto out_err;
svc_rdma_save_io_pages(rqstp, 0, head->rc_page_count);
return 0;
out_err:

View File

@@ -106,7 +106,6 @@
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"
@@ -123,6 +122,13 @@ svc_rdma_next_send_ctxt(struct list_head *list)
sc_list);
}
static void svc_rdma_send_cid_init(struct svcxprt_rdma *rdma,
struct rpc_rdma_cid *cid)
{
cid->ci_queue_id = rdma->sc_sq_cq->res.id;
cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
}
static struct svc_rdma_send_ctxt *
svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
{
@@ -145,6 +151,8 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
goto fail2;
svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
ctxt->sc_send_wr.next = NULL;
ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
@@ -269,34 +277,33 @@ static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
struct svcxprt_rdma *rdma = cq->cq_context;
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_send_ctxt *ctxt;
struct svc_rdma_send_ctxt *ctxt =
container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
trace_svcrdma_wc_send(wc);
trace_svcrdma_wc_send(wc, &ctxt->sc_cid);
atomic_inc(&rdma->sc_sq_avail);
wake_up(&rdma->sc_send_wait);
ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
svc_rdma_send_ctxt_put(rdma, ctxt);
if (unlikely(wc->status != IB_WC_SUCCESS)) {
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
svc_xprt_enqueue(&rdma->sc_xprt);
}
svc_xprt_put(&rdma->sc_xprt);
}
/**
* svc_rdma_send - Post a single Send WR
* @rdma: transport on which to post the WR
* @wr: prepared Send WR to post
* @ctxt: send ctxt with a Send WR ready to post
*
* Returns zero the Send WR was posted successfully. Otherwise, a
* negative errno is returned.
*/
int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
int svc_rdma_send(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *ctxt)
{
struct ib_send_wr *wr = &ctxt->sc_send_wr;
int ret;
might_sleep();
@@ -321,8 +328,7 @@ int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
continue;
}
svc_xprt_get(&rdma->sc_xprt);
trace_svcrdma_post_send(wr);
trace_svcrdma_post_send(ctxt);
ret = ib_post_send(rdma->sc_qp, wr, NULL);
if (ret)
break;
@@ -331,7 +337,6 @@ int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
trace_svcrdma_sq_post_err(rdma, ret);
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
svc_xprt_put(&rdma->sc_xprt);
wake_up(&rdma->sc_send_wait);
return ret;
}
@@ -375,11 +380,8 @@ static ssize_t svc_rdma_encode_write_segment(__be32 *src,
if (!p)
return -EMSGSIZE;
handle = be32_to_cpup(src++);
length = be32_to_cpup(src++);
xdr_decode_hyper(src, &offset);
xdr_decode_rdma_segment(src, &handle, &length, &offset);
*p++ = cpu_to_be32(handle);
if (*remaining < length) {
/* segment only partly filled */
length = *remaining;
@@ -388,8 +390,7 @@ static ssize_t svc_rdma_encode_write_segment(__be32 *src,
/* entire segment was consumed */
*remaining -= length;
}
*p++ = cpu_to_be32(length);
xdr_encode_hyper(p, offset);
xdr_encode_rdma_segment(p, handle, length, offset);
trace_svcrdma_encode_wseg(handle, length, offset);
return len;
@@ -801,45 +802,76 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
} else {
sctxt->sc_send_wr.opcode = IB_WR_SEND;
}
return svc_rdma_send(rdma, &sctxt->sc_send_wr);
return svc_rdma_send(rdma, sctxt);
}
/* Given the client-provided Write and Reply chunks, the server was not
* able to form a complete reply. Return an RDMA_ERROR message so the
* client can retire this RPC transaction. As above, the Send completion
* routine releases payload pages that were part of a previous RDMA Write.
/**
* svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
* @rdma: controlling transport context
* @sctxt: Send context for the response
* @rctxt: Receive context for incoming bad message
* @status: negative errno indicating error that occurred
*
* Remote Invalidation is skipped for simplicity.
* Given the client-provided Read, Write, and Reply chunks, the
* server was not able to parse the Call or form a complete Reply.
* Return an RDMA_ERROR message so the client can retire the RPC
* transaction.
*
* The caller does not have to release @sctxt. It is released by
* Send completion, or by this function on error.
*/
static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
struct svc_rqst *rqstp)
void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *sctxt,
struct svc_rdma_recv_ctxt *rctxt,
int status)
{
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
__be32 *rdma_argp = rctxt->rc_recv_buf;
__be32 *p;
rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
NULL);
rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
sctxt->sc_xprt_buf, NULL);
p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_ERR);
p = xdr_reserve_space(&sctxt->sc_stream,
rpcrdma_fixed_maxsz * sizeof(*p));
if (!p)
return -ENOMSG;
goto put_ctxt;
*p++ = *rdma_argp;
*p++ = *(rdma_argp + 1);
*p++ = rdma->sc_fc_credits;
*p++ = rdma_error;
*p = err_chunk;
trace_svcrdma_err_chunk(*rdma_argp);
*p = rdma_error;
svc_rdma_save_io_pages(rqstp, ctxt);
switch (status) {
case -EPROTONOSUPPORT:
p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
if (!p)
goto put_ctxt;
ctxt->sc_send_wr.num_sge = 1;
ctxt->sc_send_wr.opcode = IB_WR_SEND;
ctxt->sc_sges[0].length = ctxt->sc_hdrbuf.len;
return svc_rdma_send(rdma, &ctxt->sc_send_wr);
*p++ = err_vers;
*p++ = rpcrdma_version;
*p = rpcrdma_version;
trace_svcrdma_err_vers(*rdma_argp);
break;
default:
p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
if (!p)
goto put_ctxt;
*p = err_chunk;
trace_svcrdma_err_chunk(*rdma_argp);
}
/* Remote Invalidation is skipped for simplicity. */
sctxt->sc_send_wr.num_sge = 1;
sctxt->sc_send_wr.opcode = IB_WR_SEND;
sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
if (svc_rdma_send(rdma, sctxt))
goto put_ctxt;
return;
put_ctxt:
svc_rdma_send_ctxt_put(rdma, sctxt);
}
/**
@@ -930,15 +962,17 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (ret != -E2BIG && ret != -EINVAL)
goto err1;
ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp);
if (ret < 0)
goto err1;
/* Send completion releases payload pages that were part
* of previously posted RDMA Writes.
*/
svc_rdma_save_io_pages(rqstp, sctxt);
svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
return 0;
err1:
svc_rdma_send_ctxt_put(rdma, sctxt);
err0:
trace_svcrdma_send_failed(rqstp, ret);
trace_svcrdma_send_err(rqstp, ret);
set_bit(XPT_CLOSE, &xprt->xpt_flags);
return -ENOTCONN;
}

View File

@@ -55,7 +55,6 @@
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svc_rdma.h>
@@ -238,65 +237,56 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id,
svc_xprt_enqueue(&listen_xprt->sc_xprt);
}
/*
* Handles events generated on the listening endpoint. These events will be
* either be incoming connect requests or adapter removal events.
/**
* svc_rdma_listen_handler - Handle CM events generated on a listening endpoint
* @cma_id: the server's listener rdma_cm_id
* @event: details of the event
*
* Return values:
* %0: Do not destroy @cma_id
* %1: Destroy @cma_id (never returned here)
*
* NB: There is never a DEVICE_REMOVAL event for INADDR_ANY listeners.
*/
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
struct rdma_cm_event *event)
static int svc_rdma_listen_handler(struct rdma_cm_id *cma_id,
struct rdma_cm_event *event)
{
switch (event->event) {
case RDMA_CM_EVENT_CONNECT_REQUEST:
dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
"event = %s (%d)\n", cma_id, cma_id->context,
rdma_event_msg(event->event), event->event);
handle_connect_req(cma_id, &event->param.conn);
break;
default:
/* NB: No device removal upcall for INADDR_ANY listeners */
dprintk("svcrdma: Unexpected event on listening endpoint %p, "
"event = %s (%d)\n", cma_id,
rdma_event_msg(event->event), event->event);
break;
}
return 0;
}
static int rdma_cma_handler(struct rdma_cm_id *cma_id,
struct rdma_cm_event *event)
/**
* svc_rdma_cma_handler - Handle CM events on client connections
* @cma_id: the server's listener rdma_cm_id
* @event: details of the event
*
* Return values:
* %0: Do not destroy @cma_id
* %1: Destroy @cma_id (never returned here)
*/
static int svc_rdma_cma_handler(struct rdma_cm_id *cma_id,
struct rdma_cm_event *event)
{
struct svcxprt_rdma *rdma = cma_id->context;
struct svc_xprt *xprt = &rdma->sc_xprt;
switch (event->event) {
case RDMA_CM_EVENT_ESTABLISHED:
/* Accept complete */
svc_xprt_get(xprt);
dprintk("svcrdma: Connection completed on DTO xprt=%p, "
"cm_id=%p\n", xprt, cma_id);
clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
svc_xprt_enqueue(xprt);
break;
case RDMA_CM_EVENT_DISCONNECTED:
dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
xprt, cma_id);
set_bit(XPT_CLOSE, &xprt->xpt_flags);
svc_xprt_enqueue(xprt);
svc_xprt_put(xprt);
break;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
"event = %s (%d)\n", cma_id, xprt,
rdma_event_msg(event->event), event->event);
set_bit(XPT_CLOSE, &xprt->xpt_flags);
svc_xprt_enqueue(xprt);
svc_xprt_put(xprt);
break;
default:
dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
"event = %s (%d)\n", cma_id,
rdma_event_msg(event->event), event->event);
break;
}
return 0;
@@ -322,7 +312,7 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
strcpy(cma_xprt->sc_xprt.xpt_remotebuf, "listener");
listen_id = rdma_create_id(net, rdma_listen_handler, cma_xprt,
listen_id = rdma_create_id(net, svc_rdma_listen_handler, cma_xprt,
RDMA_PS_TCP, IB_QPT_RC);
if (IS_ERR(listen_id)) {
ret = PTR_ERR(listen_id);
@@ -486,7 +476,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
goto errout;
/* Swap out the handler */
newxprt->sc_cm_id->event_handler = rdma_cma_handler;
newxprt->sc_cm_id->event_handler = svc_rdma_cma_handler;
/* Construct RDMA-CM private message */
pmsg.cp_magic = rpcrdma_cmp_magic;
@@ -540,24 +530,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
return NULL;
}
/*
* When connected, an svc_xprt has at least two references:
*
* - A reference held by the cm_id between the ESTABLISHED and
* DISCONNECTED events. If the remote peer disconnected first, this
* reference could be gone.
*
* - A reference held by the svc_recv code that called this function
* as part of close processing.
*
* At a minimum one references should still be held.
*/
static void svc_rdma_detach(struct svc_xprt *xprt)
{
struct svcxprt_rdma *rdma =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
/* Disconnect and flush posted WQE */
rdma_disconnect(rdma->sc_cm_id);
}
@@ -567,6 +544,7 @@ static void __svc_rdma_free(struct work_struct *work)
container_of(work, struct svcxprt_rdma, sc_work);
struct svc_xprt *xprt = &rdma->sc_xprt;
/* This blocks until the Completion Queues are empty */
if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
ib_drain_qp(rdma->sc_qp);