Merge tag 'nfs-for-4.15-1' of git://git.linux-nfs.org/projects/anna/linux-nfs

Pull NFS client updates from Anna Schumaker:
 "Stable bugfixes:
   - Revalidate "." and ".." correctly on open
   - Avoid RCU usage in tracepoints
   - Fix ugly referral attributes
   - Fix a typo in nomigration mount option
   - Revert "NFS: Move the flock open mode check into nfs_flock()"

  Features:
   - Implement a stronger send queue accounting system for NFS over RDMA
   - Switch some atomics to the new refcount_t type

  Other bugfixes and cleanups:
   - Clean up access mode bits
   - Remove special-case revalidations in nfs_opendir()
   - Improve invalidating NFS over RDMA memory for async operations that
     time out
   - Handle NFS over RDMA replies with a worqueue
   - Handle NFS over RDMA sends with a workqueue
   - Fix up replaying interrupted requests
   - Remove dead NFS over RDMA definitions
   - Update NFS over RDMA copyright information
   - Be more consistent with bool initialization and comparisons
   - Mark expected switch fall throughs
   - Various sunrpc tracepoint cleanups
   - Fix various OPEN races
   - Fix a typo in nfs_rename()
   - Use common error handling code in nfs_lock_and_join_request()
   - Check that some structures are properly cleaned up during
     net_exit()
   - Remove net pointer from dprintk()s"

* tag 'nfs-for-4.15-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (62 commits)
  NFS: Revert "NFS: Move the flock open mode check into nfs_flock()"
  NFS: Fix typo in nomigration mount option
  nfs: Fix ugly referral attributes
  NFS: super: mark expected switch fall-throughs
  sunrpc: remove net pointer from messages
  nfs: remove net pointer from messages
  sunrpc: exit_net cleanup check added
  nfs client: exit_net cleanup check added
  nfs/write: Use common error handling code in nfs_lock_and_join_requests()
  NFSv4: Replace closed stateids with the "invalid special stateid"
  NFSv4: nfs_set_open_stateid must not trigger state recovery for closed state
  NFSv4: Check the open stateid when searching for expired state
  NFSv4: Clean up nfs4_delegreturn_done
  NFSv4: cleanup nfs4_close_done
  NFSv4: Retry NFS4ERR_OLD_STATEID errors in layoutreturn
  pNFS: Retry NFS4ERR_OLD_STATEID errors in layoutreturn-on-close
  NFSv4: Don't try to CLOSE if the stateid 'other' field has changed
  NFSv4: Retry CLOSE and DELEGRETURN on NFS4ERR_OLD_STATEID.
  NFS: Fix a typo in nfs_rename()
  NFSv4: Fix open create exclusive when the server reboots
  ...
This commit is contained in:
Linus Torvalds
2017-11-17 14:18:00 -08:00
43 changed files with 1194 additions and 719 deletions

View File

@@ -1491,7 +1491,6 @@ rpc_restart_call(struct rpc_task *task)
}
EXPORT_SYMBOL_GPL(rpc_restart_call);
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
const char
*rpc_proc_name(const struct rpc_task *task)
{
@@ -1505,7 +1504,6 @@ const char
} else
return "no proc";
}
#endif
/*
* 0. Initial state
@@ -1519,6 +1517,7 @@ call_start(struct rpc_task *task)
struct rpc_clnt *clnt = task->tk_client;
int idx = task->tk_msg.rpc_proc->p_statidx;
trace_rpc_request(task);
dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
clnt->cl_program->name, clnt->cl_vers,
rpc_proc_name(task),
@@ -1586,6 +1585,7 @@ call_reserveresult(struct rpc_task *task)
switch (status) {
case -ENOMEM:
rpc_delay(task, HZ >> 2);
/* fall through */
case -EAGAIN: /* woken up; retry */
task->tk_action = call_retry_reserve;
return;
@@ -1647,10 +1647,13 @@ call_refreshresult(struct rpc_task *task)
/* Use rate-limiting and a max number of retries if refresh
* had status 0 but failed to update the cred.
*/
/* fall through */
case -ETIMEDOUT:
rpc_delay(task, 3*HZ);
/* fall through */
case -EAGAIN:
status = -EACCES;
/* fall through */
case -EKEYEXPIRED:
if (!task->tk_cred_retry)
break;
@@ -1911,6 +1914,7 @@ call_connect_status(struct rpc_task *task)
task->tk_action = call_bind;
return;
}
/* fall through */
case -ECONNRESET:
case -ECONNABORTED:
case -ENETUNREACH:
@@ -1924,6 +1928,7 @@ call_connect_status(struct rpc_task *task)
break;
/* retry with existing socket, after a delay */
rpc_delay(task, 3*HZ);
/* fall through */
case -EAGAIN:
/* Check for timeouts before looping back to call_bind */
case -ETIMEDOUT:
@@ -2025,6 +2030,7 @@ call_transmit_status(struct rpc_task *task)
rpc_exit(task, task->tk_status);
break;
}
/* fall through */
case -ECONNRESET:
case -ECONNABORTED:
case -EADDRINUSE:
@@ -2145,6 +2151,7 @@ call_status(struct rpc_task *task)
* were a timeout.
*/
rpc_delay(task, 3*HZ);
/* fall through */
case -ETIMEDOUT:
task->tk_action = call_timeout;
break;
@@ -2152,14 +2159,17 @@ call_status(struct rpc_task *task)
case -ECONNRESET:
case -ECONNABORTED:
rpc_force_rebind(clnt);
/* fall through */
case -EADDRINUSE:
rpc_delay(task, 3*HZ);
/* fall through */
case -EPIPE:
case -ENOTCONN:
task->tk_action = call_bind;
break;
case -ENOBUFS:
rpc_delay(task, HZ>>2);
/* fall through */
case -EAGAIN:
task->tk_action = call_transmit;
break;

View File

@@ -1410,8 +1410,8 @@ rpc_fill_super(struct super_block *sb, void *data, int silent)
return PTR_ERR(gssd_dentry);
}
dprintk("RPC: sending pipefs MOUNT notification for net %p%s\n",
net, NET_NAME(net));
dprintk("RPC: sending pipefs MOUNT notification for net %x%s\n",
net->ns.inum, NET_NAME(net));
mutex_lock(&sn->pipefs_sb_lock);
sn->pipefs_sb = sb;
err = blocking_notifier_call_chain(&rpc_pipefs_notifier_list,
@@ -1462,8 +1462,8 @@ static void rpc_kill_sb(struct super_block *sb)
goto out;
}
sn->pipefs_sb = NULL;
dprintk("RPC: sending pipefs UMOUNT notification for net %p%s\n",
net, NET_NAME(net));
dprintk("RPC: sending pipefs UMOUNT notification for net %x%s\n",
net->ns.inum, NET_NAME(net));
blocking_notifier_call_chain(&rpc_pipefs_notifier_list,
RPC_PIPEFS_UMOUNT,
sb);

View File

@@ -216,9 +216,9 @@ static void rpcb_set_local(struct net *net, struct rpc_clnt *clnt,
smp_wmb();
sn->rpcb_users = 1;
dprintk("RPC: created new rpcb local clients (rpcb_local_clnt: "
"%p, rpcb_local_clnt4: %p) for net %p%s\n",
sn->rpcb_local_clnt, sn->rpcb_local_clnt4,
net, (net == &init_net) ? " (init_net)" : "");
"%p, rpcb_local_clnt4: %p) for net %x%s\n",
sn->rpcb_local_clnt, sn->rpcb_local_clnt4,
net->ns.inum, (net == &init_net) ? " (init_net)" : "");
}
/*

View File

@@ -274,10 +274,9 @@ static inline void rpc_task_set_debuginfo(struct rpc_task *task)
static void rpc_set_active(struct rpc_task *task)
{
trace_rpc_task_begin(task->tk_client, task, NULL);
rpc_task_set_debuginfo(task);
set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
trace_rpc_task_begin(task->tk_client, task, NULL);
}
/*

View File

@@ -65,10 +65,13 @@ err_proc:
static __net_exit void sunrpc_exit_net(struct net *net)
{
struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
rpc_pipefs_exit_net(net);
unix_gid_cache_destroy(net);
ip_map_cache_destroy(net);
rpc_proc_exit(net);
WARN_ON_ONCE(!list_empty(&sn->all_clients));
}
static struct pernet_operations sunrpc_net_ops = {

View File

@@ -1139,6 +1139,7 @@ void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
case -EAGAIN:
xprt_add_backlog(xprt, task);
dprintk("RPC: waiting for request slot\n");
/* fall through */
default:
task->tk_status = -EAGAIN;
}

View File

@@ -43,7 +43,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
req = rpcrdma_create_req(r_xprt);
if (IS_ERR(req))
return PTR_ERR(req);
req->rl_backchannel = true;
__set_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags);
rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
DMA_TO_DEVICE, GFP_KERNEL);
@@ -223,8 +223,8 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
*p++ = xdr_zero;
*p = xdr_zero;
if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN,
&rqst->rq_snd_buf, rpcrdma_noch))
if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
&rqst->rq_snd_buf, rpcrdma_noch))
return -EIO;
return 0;
}

View File

@@ -306,28 +306,9 @@ out_reset:
}
}
/* Use a slow, safe mechanism to invalidate all memory regions
* that were registered for "req".
*/
static void
fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
bool sync)
{
struct rpcrdma_mw *mw;
while (!list_empty(&req->rl_registered)) {
mw = rpcrdma_pop_mw(&req->rl_registered);
if (sync)
fmr_op_recover_mr(mw);
else
rpcrdma_defer_mr_recovery(mw);
}
}
const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
.ro_map = fmr_op_map,
.ro_unmap_sync = fmr_op_unmap_sync,
.ro_unmap_safe = fmr_op_unmap_safe,
.ro_recover_mr = fmr_op_recover_mr,
.ro_open = fmr_op_open,
.ro_maxpages = fmr_op_maxpages,

View File

@@ -420,7 +420,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
IB_ACCESS_REMOTE_READ;
rpcrdma_set_signaled(&r_xprt->rx_ep, &reg_wr->wr);
rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
if (rc)
goto out_senderr;
@@ -508,12 +507,6 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
f->fr_cqe.done = frwr_wc_localinv_wake;
reinit_completion(&f->fr_linv_done);
/* Initialize CQ count, since there is always a signaled
* WR being posted here. The new cqcount depends on how
* many SQEs are about to be consumed.
*/
rpcrdma_init_cqcount(&r_xprt->rx_ep, count);
/* Transport disconnect drains the receive CQ before it
* replaces the QP. The RPC reply handler won't call us
* unless ri_id->qp is a valid pointer.
@@ -546,7 +539,6 @@ reset_mrs:
/* Find and reset the MRs in the LOCAL_INV WRs that did not
* get posted.
*/
rpcrdma_init_cqcount(&r_xprt->rx_ep, -count);
while (bad_wr) {
f = container_of(bad_wr, struct rpcrdma_frmr,
fr_invwr);
@@ -559,28 +551,9 @@ reset_mrs:
goto unmap;
}
/* Use a slow, safe mechanism to invalidate all memory regions
* that were registered for "req".
*/
static void
frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
bool sync)
{
struct rpcrdma_mw *mw;
while (!list_empty(&req->rl_registered)) {
mw = rpcrdma_pop_mw(&req->rl_registered);
if (sync)
frwr_op_recover_mr(mw);
else
rpcrdma_defer_mr_recovery(mw);
}
}
const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
.ro_map = frwr_op_map,
.ro_unmap_sync = frwr_op_unmap_sync,
.ro_unmap_safe = frwr_op_unmap_safe,
.ro_recover_mr = frwr_op_recover_mr,
.ro_open = frwr_op_open,
.ro_maxpages = frwr_op_maxpages,

View File

@@ -1,4 +1,5 @@
/*
* Copyright (c) 2014-2017 Oracle. All rights reserved.
* Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
@@ -75,11 +76,11 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
/* Maximum Read list size */
maxsegs += 2; /* segment for head and tail buffers */
size = maxsegs * sizeof(struct rpcrdma_read_chunk);
size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
/* Minimal Read chunk size */
size += sizeof(__be32); /* segment count */
size += sizeof(struct rpcrdma_segment);
size += rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
dprintk("RPC: %s: max call header size = %u\n",
@@ -102,7 +103,7 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
/* Maximum Write list size */
maxsegs += 2; /* segment for head and tail buffers */
size = sizeof(__be32); /* segment count */
size += maxsegs * sizeof(struct rpcrdma_segment);
size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
dprintk("RPC: %s: max reply header size = %u\n",
@@ -511,27 +512,60 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return 0;
}
/* Prepare the RPC-over-RDMA header SGE.
/**
* rpcrdma_unmap_sendctx - DMA-unmap Send buffers
* @sc: sendctx containing SGEs to unmap
*
*/
void
rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
{
struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
struct ib_sge *sge;
unsigned int count;
dprintk("RPC: %s: unmapping %u sges for sc=%p\n",
__func__, sc->sc_unmap_count, sc);
/* The first two SGEs contain the transport header and
* the inline buffer. These are always left mapped so
* they can be cheaply re-used.
*/
sge = &sc->sc_sges[2];
for (count = sc->sc_unmap_count; count; ++sge, --count)
ib_dma_unmap_page(ia->ri_device,
sge->addr, sge->length, DMA_TO_DEVICE);
if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
smp_mb__after_atomic();
wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
}
}
/* Prepare an SGE for the RPC-over-RDMA transport header.
*/
static bool
rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
u32 len)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
struct ib_sge *sge = &req->rl_send_sge[0];
struct ib_sge *sge = sc->sc_sges;
if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
if (!__rpcrdma_dma_map_regbuf(ia, rb))
return false;
sge->addr = rdmab_addr(rb);
sge->lkey = rdmab_lkey(rb);
}
if (!rpcrdma_dma_map_regbuf(ia, rb))
goto out_regbuf;
sge->addr = rdmab_addr(rb);
sge->length = len;
sge->lkey = rdmab_lkey(rb);
ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
sge->length, DMA_TO_DEVICE);
req->rl_send_wr.num_sge++;
sc->sc_wr.num_sge++;
return true;
out_regbuf:
pr_err("rpcrdma: failed to DMA map a Send buffer\n");
return false;
}
/* Prepare the Send SGEs. The head and tail iovec, and each entry
@@ -541,10 +575,11 @@ static bool
rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
unsigned int sge_no, page_base, len, remaining;
struct rpcrdma_regbuf *rb = req->rl_sendbuf;
struct ib_device *device = ia->ri_device;
struct ib_sge *sge = req->rl_send_sge;
struct ib_sge *sge = sc->sc_sges;
u32 lkey = ia->ri_pd->local_dma_lkey;
struct page *page, **ppages;
@@ -552,7 +587,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
* DMA-mapped. Sync the content that has changed.
*/
if (!rpcrdma_dma_map_regbuf(ia, rb))
return false;
goto out_regbuf;
sge_no = 1;
sge[sge_no].addr = rdmab_addr(rb);
sge[sge_no].length = xdr->head[0].iov_len;
@@ -607,7 +642,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
sge[sge_no].length = len;
sge[sge_no].lkey = lkey;
req->rl_mapped_sges++;
sc->sc_unmap_count++;
ppages++;
remaining -= len;
page_base = 0;
@@ -633,56 +668,61 @@ map_tail:
goto out_mapping_err;
sge[sge_no].length = len;
sge[sge_no].lkey = lkey;
req->rl_mapped_sges++;
sc->sc_unmap_count++;
}
out:
req->rl_send_wr.num_sge = sge_no + 1;
sc->sc_wr.num_sge += sge_no;
if (sc->sc_unmap_count)
__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
return true;
out_regbuf:
pr_err("rpcrdma: failed to DMA map a Send buffer\n");
return false;
out_mapping_overflow:
rpcrdma_unmap_sendctx(sc);
pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
return false;
out_mapping_err:
rpcrdma_unmap_sendctx(sc);
pr_err("rpcrdma: Send mapping error\n");
return false;
}
bool
rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
u32 hdrlen, struct xdr_buf *xdr,
enum rpcrdma_chunktype rtype)
/**
* rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
* @r_xprt: controlling transport
* @req: context of RPC Call being marshalled
* @hdrlen: size of transport header, in bytes
* @xdr: xdr_buf containing RPC Call
* @rtype: chunk type being encoded
*
* Returns 0 on success; otherwise a negative errno is returned.
*/
int
rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
req->rl_send_wr.num_sge = 0;
req->rl_mapped_sges = 0;
req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
if (!req->rl_sendctx)
return -ENOBUFS;
req->rl_sendctx->sc_wr.num_sge = 0;
req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req;
__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
goto out_map;
if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
return -EIO;
if (rtype != rpcrdma_areadch)
if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
goto out_map;
if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
return -EIO;
return true;
out_map:
pr_err("rpcrdma: failed to DMA map a Send buffer\n");
return false;
}
void
rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
struct ib_device *device = ia->ri_device;
struct ib_sge *sge;
int count;
sge = &req->rl_send_sge[2];
for (count = req->rl_mapped_sges; count--; sge++)
ib_dma_unmap_page(device, sge->addr, sge->length,
DMA_TO_DEVICE);
req->rl_mapped_sges = 0;
return 0;
}
/**
@@ -833,12 +873,10 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
transfertypes[rtype], transfertypes[wtype],
xdr_stream_pos(xdr));
if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req,
xdr_stream_pos(xdr),
&rqst->rq_snd_buf, rtype)) {
ret = -EIO;
ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
&rqst->rq_snd_buf, rtype);
if (ret)
goto out_err;
}
return 0;
out_err:
@@ -970,14 +1008,13 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws,
* straightforward to check the RPC header's direction field.
*/
static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
__be32 xid, __be32 proc)
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
struct xdr_stream *xdr = &rep->rr_stream;
__be32 *p;
if (proc != rdma_msg)
if (rep->rr_proc != rdma_msg)
return false;
/* Peek at stream contents without advancing. */
@@ -992,7 +1029,7 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
return false;
/* RPC header */
if (*p++ != xid)
if (*p++ != rep->rr_xid)
return false;
if (*p != cpu_to_be32(RPC_CALL))
return false;
@@ -1212,78 +1249,21 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
return -EREMOTEIO;
}
/* Process received RPC/RDMA messages.
*
* Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time.
/* Perform XID lookup, reconstruction of the RPC reply, and
* RPC completion while holding the transport lock to ensure
* the rep, rqst, and rq_task pointers remain stable.
*/
void
rpcrdma_reply_handler(struct work_struct *work)
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
{
struct rpcrdma_rep *rep =
container_of(work, struct rpcrdma_rep, rr_work);
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct xdr_stream *xdr = &rep->rr_stream;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
__be32 *p, xid, vers, proc;
struct rpc_rqst *rqst = rep->rr_rqst;
unsigned long cwnd;
int status;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
if (rep->rr_hdrbuf.head[0].iov_len == 0)
goto out_badstatus;
xdr_init_decode(xdr, &rep->rr_hdrbuf,
rep->rr_hdrbuf.head[0].iov_base);
/* Fixed transport header fields */
p = xdr_inline_decode(xdr, 4 * sizeof(*p));
if (unlikely(!p))
goto out_shortreply;
xid = *p++;
vers = *p++;
p++; /* credits */
proc = *p++;
if (rpcrdma_is_bcall(r_xprt, rep, xid, proc))
return;
/* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks.
*/
spin_lock(&xprt->recv_lock);
rqst = xprt_lookup_rqst(xprt, xid);
if (!rqst)
goto out_norqst;
xprt_pin_rqst(rqst);
spin_unlock(&xprt->recv_lock);
req = rpcr_to_rdmar(rqst);
req->rl_reply = rep;
dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
__func__, rep, req, be32_to_cpu(xid));
/* Invalidate and unmap the data payloads before waking the
* waiting application. This guarantees the memory regions
* are properly fenced from the server before the application
* accesses the data. It also ensures proper send flow control:
* waking the next RPC waits until this RPC has relinquished
* all its Send Queue entries.
*/
if (!list_empty(&req->rl_registered)) {
rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
&req->rl_registered);
}
xprt->reestablish_timeout = 0;
if (vers != rpcrdma_version)
goto out_badversion;
switch (proc) {
switch (rep->rr_proc) {
case rdma_msg:
status = rpcrdma_decode_msg(r_xprt, rep, rqst);
break;
@@ -1302,15 +1282,137 @@ rpcrdma_reply_handler(struct work_struct *work)
out:
spin_lock(&xprt->recv_lock);
cwnd = xprt->cwnd;
xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);
xprt_complete_rqst(rqst->rq_task, status);
xprt_unpin_rqst(rqst);
spin_unlock(&xprt->recv_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status);
return;
/* If the incoming reply terminated a pending RPC, the next
* RPC call will post a replacement receive buffer as it is
* being marshaled.
*/
out_badheader:
dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
r_xprt->rx_stats.bad_reply_count++;
status = -EIO;
goto out;
}
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
/* Invalidate and unmap the data payloads before waking
* the waiting application. This guarantees the memory
* regions are properly fenced from the server before the
* application accesses the data. It also ensures proper
* send flow control: waking the next RPC waits until this
* RPC has relinquished all its Send Queue entries.
*/
if (!list_empty(&req->rl_registered))
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
&req->rl_registered);
/* Ensure that any DMA mapped pages associated with
* the Send of the RPC Call have been unmapped before
* allowing the RPC to complete. This protects argument
* memory not controlled by the RPC client from being
* re-used before we're done with it.
*/
if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
r_xprt->rx_stats.reply_waits_for_send++;
out_of_line_wait_on_bit(&req->rl_flags,
RPCRDMA_REQ_F_TX_RESOURCES,
bit_wait,
TASK_UNINTERRUPTIBLE);
}
}
/* Reply handling runs in the poll worker thread. Anything that
* might wait is deferred to a separate workqueue.
*/
void rpcrdma_deferred_completion(struct work_struct *work)
{
struct rpcrdma_rep *rep =
container_of(work, struct rpcrdma_rep, rr_work);
struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
rpcrdma_release_rqst(rep->rr_rxprt, req);
rpcrdma_complete_rqst(rep);
}
/* Process received RPC/RDMA messages.
*
* Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time.
*/
void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
u32 credits;
__be32 *p;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
if (rep->rr_hdrbuf.head[0].iov_len == 0)
goto out_badstatus;
xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
rep->rr_hdrbuf.head[0].iov_base);
/* Fixed transport header fields */
p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
if (unlikely(!p))
goto out_shortreply;
rep->rr_xid = *p++;
rep->rr_vers = *p++;
credits = be32_to_cpu(*p++);
rep->rr_proc = *p++;
if (rep->rr_vers != rpcrdma_version)
goto out_badversion;
if (rpcrdma_is_bcall(r_xprt, rep))
return;
/* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks.
*/
spin_lock(&xprt->recv_lock);
rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
if (!rqst)
goto out_norqst;
xprt_pin_rqst(rqst);
if (credits == 0)
credits = 1; /* don't deadlock */
else if (credits > buf->rb_max_requests)
credits = buf->rb_max_requests;
buf->rb_credits = credits;
spin_unlock(&xprt->recv_lock);
req = rpcr_to_rdmar(rqst);
req->rl_reply = rep;
rep->rr_rqst = rqst;
clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
__func__, rep, req, be32_to_cpu(rep->rr_xid));
if (list_empty(&req->rl_registered) &&
!test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags))
rpcrdma_complete_rqst(rep);
else
queue_work(rpcrdma_receive_wq, &rep->rr_work);
return;
out_badstatus:
@@ -1321,37 +1423,22 @@ out_badstatus:
}
return;
/* If the incoming reply terminated a pending RPC, the next
* RPC call will post a replacement receive buffer as it is
* being marshaled.
*/
out_badversion:
dprintk("RPC: %s: invalid version %d\n",
__func__, be32_to_cpu(vers));
status = -EIO;
r_xprt->rx_stats.bad_reply_count++;
goto out;
__func__, be32_to_cpu(rep->rr_vers));
goto repost;
out_badheader:
dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
rqst->rq_task->tk_pid, __func__, be32_to_cpu(proc));
r_xprt->rx_stats.bad_reply_count++;
status = -EIO;
goto out;
/* The req was still available, but by the time the recv_lock
* was acquired, the rqst and task had been released. Thus the RPC
* has already been terminated.
/* The RPC transaction has already been terminated, or the header
* is corrupt.
*/
out_norqst:
spin_unlock(&xprt->recv_lock);
dprintk("RPC: %s: no match for incoming xid 0x%08x\n",
__func__, be32_to_cpu(xid));
__func__, be32_to_cpu(rep->rr_xid));
goto repost;
out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
/* If no pending RPC transaction was matched, post a replacement
* receive buffer before returning.

View File

@@ -1,4 +1,5 @@
/*
* Copyright (c) 2014-2017 Oracle. All rights reserved.
* Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
@@ -678,16 +679,14 @@ xprt_rdma_free(struct rpc_task *task)
struct rpc_rqst *rqst = task->tk_rqstp;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
if (req->rl_backchannel)
if (test_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags))
return;
dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
if (!list_empty(&req->rl_registered))
ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
rpcrdma_unmap_sges(ia, req);
if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
rpcrdma_release_rqst(r_xprt, req);
rpcrdma_buffer_put(req);
}
@@ -728,7 +727,8 @@ xprt_rdma_send_request(struct rpc_task *task)
/* On retransmit, remove any previously registered chunks */
if (unlikely(!list_empty(&req->rl_registered)))
r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
&req->rl_registered);
rc = rpcrdma_marshal_req(r_xprt, rqst);
if (rc < 0)
@@ -742,6 +742,7 @@ xprt_rdma_send_request(struct rpc_task *task)
goto drop_connection;
req->rl_connect_cookie = xprt->connect_cookie;
set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
goto drop_connection;
@@ -789,11 +790,13 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
r_xprt->rx_stats.failed_marshal_count,
r_xprt->rx_stats.bad_reply_count,
r_xprt->rx_stats.nomsg_call_count);
seq_printf(seq, "%lu %lu %lu %lu\n",
seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
r_xprt->rx_stats.mrs_recovered,
r_xprt->rx_stats.mrs_orphaned,
r_xprt->rx_stats.mrs_allocated,
r_xprt->rx_stats.local_inv_needed);
r_xprt->rx_stats.local_inv_needed,
r_xprt->rx_stats.empty_sendctx_q,
r_xprt->rx_stats.reply_waits_for_send);
}
static int

View File

@@ -1,4 +1,5 @@
/*
* Copyright (c) 2014-2017 Oracle. All rights reserved.
* Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
@@ -49,9 +50,10 @@
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <asm-generic/barrier.h>
#include <asm/bitops.h>
#include <rdma/ib_cm.h>
@@ -73,7 +75,7 @@ static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
static struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
int
rpcrdma_alloc_wq(void)
@@ -126,30 +128,17 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
struct ib_cqe *cqe = wc->wr_cqe;
struct rpcrdma_sendctx *sc =
container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
/* WARNING: Only wr_cqe and status are reliable at this point */
if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
ib_wc_status_msg(wc->status),
wc->status, wc->vendor_err);
}
/* Perform basic sanity checking to avoid using garbage
* to update the credit grant value.
*/
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
__be32 *p = rep->rr_rdmabuf->rg_base;
u32 credits;
credits = be32_to_cpup(p + 2);
if (credits == 0)
credits = 1; /* don't deadlock */
else if (credits > buffer->rb_max_requests)
credits = buffer->rb_max_requests;
atomic_set(&buffer->rb_credits, credits);
rpcrdma_sendctx_put_locked(sc);
}
/**
@@ -181,11 +170,8 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
rdmab_addr(rep->rr_rdmabuf),
wc->byte_len, DMA_FROM_DEVICE);
if (wc->byte_len >= RPCRDMA_HDRLEN_ERR)
rpcrdma_update_granted_credits(rep);
out_schedule:
queue_work(rpcrdma_receive_wq, &rep->rr_work);
rpcrdma_reply_handler(rep);
return;
out_fail:
@@ -295,7 +281,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
case RDMA_CM_EVENT_DISCONNECTED:
connstate = -ECONNABORTED;
connected:
atomic_set(&xprt->rx_buf.rb_credits, 1);
xprt->rx_buf.rb_credits = 1;
ep->rep_connected = connstate;
rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
@@ -564,16 +550,15 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
ep->rep_attr.cap.max_recv_sge);
/* set trigger for requesting send completion */
ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
if (ep->rep_cqinit <= 2)
ep->rep_cqinit = 0; /* always signal? */
rpcrdma_init_cqcount(ep, 0);
ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
cdata->max_requests >> 2);
ep->rep_send_count = ep->rep_send_batch;
init_waitqueue_head(&ep->rep_connect_wait);
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
sendcq = ib_alloc_cq(ia->ri_device, NULL,
ep->rep_attr.cap.max_send_wr + 1,
0, IB_POLL_SOFTIRQ);
1, IB_POLL_WORKQUEUE);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -583,7 +568,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
recvcq = ib_alloc_cq(ia->ri_device, NULL,
ep->rep_attr.cap.max_recv_wr + 1,
0, IB_POLL_SOFTIRQ);
0, IB_POLL_WORKQUEUE);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -846,6 +831,168 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ib_drain_qp(ia->ri_id->qp);
}
/* Fixed-size circular FIFO queue. This implementation is wait-free and
* lock-free.
*
* Consumer is the code path that posts Sends. This path dequeues a
* sendctx for use by a Send operation. Multiple consumer threads
* are serialized by the RPC transport lock, which allows only one
* ->send_request call at a time.
*
* Producer is the code path that handles Send completions. This path
* enqueues a sendctx that has been completed. Multiple producer
* threads are serialized by the ib_poll_cq() function.
*/
/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
* queue activity, and ib_drain_qp has flushed all remaining Send
* requests.
*/
static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
{
unsigned long i;
for (i = 0; i <= buf->rb_sc_last; i++)
kfree(buf->rb_sc_ctxs[i]);
kfree(buf->rb_sc_ctxs);
}
static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
{
struct rpcrdma_sendctx *sc;
sc = kzalloc(sizeof(*sc) +
ia->ri_max_send_sges * sizeof(struct ib_sge),
GFP_KERNEL);
if (!sc)
return NULL;
sc->sc_wr.wr_cqe = &sc->sc_cqe;
sc->sc_wr.sg_list = sc->sc_sges;
sc->sc_wr.opcode = IB_WR_SEND;
sc->sc_cqe.done = rpcrdma_wc_send;
return sc;
}
static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_sendctx *sc;
unsigned long i;
/* Maximum number of concurrent outstanding Send WRs. Capping
* the circular queue size stops Send Queue overflow by causing
* the ->send_request call to fail temporarily before too many
* Sends are posted.
*/
i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i);
buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
if (!buf->rb_sc_ctxs)
return -ENOMEM;
buf->rb_sc_last = i - 1;
for (i = 0; i <= buf->rb_sc_last; i++) {
sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
if (!sc)
goto out_destroy;
sc->sc_xprt = r_xprt;
buf->rb_sc_ctxs[i] = sc;
}
return 0;
out_destroy:
rpcrdma_sendctxs_destroy(buf);
return -ENOMEM;
}
/* The sendctx queue is not guaranteed to have a size that is a
* power of two, thus the helpers in circ_buf.h cannot be used.
* The other option is to use modulus (%), which can be expensive.
*/
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
unsigned long item)
{
return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}
/**
* rpcrdma_sendctx_get_locked - Acquire a send context
* @buf: transport buffers from which to acquire an unused context
*
* Returns pointer to a free send completion context; or NULL if
* the queue is empty.
*
* Usage: Called to acquire an SGE array before preparing a Send WR.
*
* The caller serializes calls to this function (per rpcrdma_buffer),
* and provides an effective memory barrier that flushes the new value
* of rb_sc_head.
*/
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
{
struct rpcrdma_xprt *r_xprt;
struct rpcrdma_sendctx *sc;
unsigned long next_head;
next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
if (next_head == READ_ONCE(buf->rb_sc_tail))
goto out_emptyq;
/* ORDER: item must be accessed _before_ head is updated */
sc = buf->rb_sc_ctxs[next_head];
/* Releasing the lock in the caller acts as a memory
* barrier that flushes rb_sc_head.
*/
buf->rb_sc_head = next_head;
return sc;
out_emptyq:
/* The queue is "empty" if there have not been enough Send
* completions recently. This is a sign the Send Queue is
* backing up. Cause the caller to pause and try again.
*/
dprintk("RPC: %s: empty sendctx queue\n", __func__);
r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
r_xprt->rx_stats.empty_sendctx_q++;
return NULL;
}
/**
* rpcrdma_sendctx_put_locked - Release a send context
* @sc: send context to release
*
* Usage: Called from Send completion to return a sendctxt
* to the queue.
*
* The caller serializes calls to this function (per rpcrdma_buffer).
*/
void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
{
struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
unsigned long next_tail;
/* Unmap SGEs of previously completed by unsignaled
* Sends by walking up the queue until @sc is found.
*/
next_tail = buf->rb_sc_tail;
do {
next_tail = rpcrdma_sendctx_next(buf, next_tail);
/* ORDER: item must be accessed _before_ tail is updated */
rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
} while (buf->rb_sc_ctxs[next_tail] != sc);
/* Paired with READ_ONCE */
smp_store_release(&buf->rb_sc_tail, next_tail);
}
static void
rpcrdma_mr_recovery_worker(struct work_struct *work)
{
@@ -941,13 +1088,8 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
spin_lock(&buffer->rb_reqslock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_reqslock);
req->rl_cqe.done = rpcrdma_wc_send;
req->rl_buffer = &r_xprt->rx_buf;
INIT_LIST_HEAD(&req->rl_registered);
req->rl_send_wr.next = NULL;
req->rl_send_wr.wr_cqe = &req->rl_cqe;
req->rl_send_wr.sg_list = req->rl_send_sge;
req->rl_send_wr.opcode = IB_WR_SEND;
return req;
}
@@ -974,7 +1116,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep->rr_cqe.done = rpcrdma_wc_receive;
rep->rr_rxprt = r_xprt;
INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
rep->rr_recv_wr.next = NULL;
rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
@@ -995,7 +1137,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
atomic_set(&buf->rb_credits, 1);
spin_lock_init(&buf->rb_mwlock);
spin_lock_init(&buf->rb_lock);
spin_lock_init(&buf->rb_recovery_lock);
@@ -1022,7 +1163,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
rc = PTR_ERR(req);
goto out;
}
req->rl_backchannel = false;
list_add(&req->rl_list, &buf->rb_send_bufs);
}
@@ -1040,6 +1180,10 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
list_add(&rep->rr_list, &buf->rb_recv_bufs);
}
rc = rpcrdma_sendctxs_create(r_xprt);
if (rc)
goto out;
return 0;
out:
rpcrdma_buffer_destroy(buf);
@@ -1116,6 +1260,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
cancel_delayed_work_sync(&buf->rb_recovery_worker);
cancel_delayed_work_sync(&buf->rb_refresh_worker);
rpcrdma_sendctxs_destroy(buf);
while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;
@@ -1231,7 +1377,6 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
struct rpcrdma_buffer *buffers = req->rl_buffer;
struct rpcrdma_rep *rep = req->rl_reply;
req->rl_send_wr.num_sge = 0;
req->rl_reply = NULL;
spin_lock(&buffers->rb_lock);
@@ -1363,7 +1508,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_ep *ep,
struct rpcrdma_req *req)
{
struct ib_send_wr *send_wr = &req->rl_send_wr;
struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
struct ib_send_wr *send_wr_fail;
int rc;
@@ -1377,7 +1522,14 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
dprintk("RPC: %s: posting %d s/g entries\n",
__func__, send_wr->num_sge);
rpcrdma_set_signaled(ep, send_wr);
if (!ep->rep_send_count ||
test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
send_wr->send_flags |= IB_SEND_SIGNALED;
ep->rep_send_count = ep->rep_send_batch;
} else {
send_wr->send_flags &= ~IB_SEND_SIGNALED;
--ep->rep_send_count;
}
rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
if (rc)
goto out_postsend_err;

View File

@@ -1,4 +1,5 @@
/*
* Copyright (c) 2014-2017 Oracle. All rights reserved.
* Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
@@ -93,8 +94,8 @@ enum {
*/
struct rpcrdma_ep {
atomic_t rep_cqcount;
int rep_cqinit;
unsigned int rep_send_count;
unsigned int rep_send_batch;
int rep_connected;
struct ib_qp_init_attr rep_attr;
wait_queue_head_t rep_connect_wait;
@@ -104,25 +105,6 @@ struct rpcrdma_ep {
struct delayed_work rep_connect_worker;
};
static inline void
rpcrdma_init_cqcount(struct rpcrdma_ep *ep, int count)
{
atomic_set(&ep->rep_cqcount, ep->rep_cqinit - count);
}
/* To update send queue accounting, provider must take a
* send completion every now and then.
*/
static inline void
rpcrdma_set_signaled(struct rpcrdma_ep *ep, struct ib_send_wr *send_wr)
{
send_wr->send_flags = 0;
if (unlikely(atomic_sub_return(1, &ep->rep_cqcount) <= 0)) {
rpcrdma_init_cqcount(ep, 0);
send_wr->send_flags = IB_SEND_SIGNALED;
}
}
/* Pre-allocate extra Work Requests for handling backward receives
* and sends. This is a fixed value because the Work Queues are
* allocated when the forward channel is set up.
@@ -164,12 +146,6 @@ rdmab_lkey(struct rpcrdma_regbuf *rb)
return rb->rg_iov.lkey;
}
static inline struct rpcrdma_msg *
rdmab_to_msg(struct rpcrdma_regbuf *rb)
{
return (struct rpcrdma_msg *)rb->rg_base;
}
static inline struct ib_device *
rdmab_device(struct rpcrdma_regbuf *rb)
{
@@ -202,22 +178,24 @@ enum {
};
/*
* struct rpcrdma_rep -- this structure encapsulates state required to recv
* and complete a reply, asychronously. It needs several pieces of
* state:
* o recv buffer (posted to provider)
* o ib_sge (also donated to provider)
* o status of reply (length, success or not)
* o bookkeeping state to get run by reply handler (list, etc)
* struct rpcrdma_rep -- this structure encapsulates state required
* to receive and complete an RPC Reply, asychronously. It needs
* several pieces of state:
*
* These are allocated during initialization, per-transport instance.
* o receive buffer and ib_sge (donated to provider)
* o status of receive (success or not, length, inv rkey)
* o bookkeeping state to get run by reply handler (XDR stream)
*
* N of these are associated with a transport instance, and stored in
* struct rpcrdma_buffer. N is the max number of outstanding requests.
* These structures are allocated during transport initialization.
* N of these are associated with a transport instance, managed by
* struct rpcrdma_buffer. N is the max number of outstanding RPCs.
*/
struct rpcrdma_rep {
struct ib_cqe rr_cqe;
__be32 rr_xid;
__be32 rr_vers;
__be32 rr_proc;
int rr_wc_flags;
u32 rr_inv_rkey;
struct rpcrdma_regbuf *rr_rdmabuf;
@@ -225,10 +203,34 @@ struct rpcrdma_rep {
struct work_struct rr_work;
struct xdr_buf rr_hdrbuf;
struct xdr_stream rr_stream;
struct rpc_rqst *rr_rqst;
struct list_head rr_list;
struct ib_recv_wr rr_recv_wr;
};
/* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
*/
struct rpcrdma_req;
struct rpcrdma_xprt;
struct rpcrdma_sendctx {
struct ib_send_wr sc_wr;
struct ib_cqe sc_cqe;
struct rpcrdma_xprt *sc_xprt;
struct rpcrdma_req *sc_req;
unsigned int sc_unmap_count;
struct ib_sge sc_sges[];
};
/* Limit the number of SGEs that can be unmapped during one
* Send completion. This caps the amount of work a single
* completion can do before returning to the provider.
*
* Setting this to zero disables Send completion batching.
*/
enum {
RPCRDMA_MAX_SEND_BATCH = 7,
};
/*
* struct rpcrdma_mw - external memory region metadata
*
@@ -340,26 +342,30 @@ enum {
struct rpcrdma_buffer;
struct rpcrdma_req {
struct list_head rl_list;
unsigned int rl_mapped_sges;
unsigned int rl_connect_cookie;
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;
struct xdr_stream rl_stream;
struct xdr_buf rl_hdrbuf;
struct ib_send_wr rl_send_wr;
struct ib_sge rl_send_sge[RPCRDMA_MAX_SEND_SGES];
struct rpcrdma_sendctx *rl_sendctx;
struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */
struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */
struct ib_cqe rl_cqe;
struct list_head rl_all;
bool rl_backchannel;
unsigned long rl_flags;
struct list_head rl_registered; /* registered segments */
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};
/* rl_flags */
enum {
RPCRDMA_REQ_F_BACKCHANNEL = 0,
RPCRDMA_REQ_F_PENDING,
RPCRDMA_REQ_F_TX_RESOURCES,
};
static inline void
rpcrdma_set_xprtdata(struct rpc_rqst *rqst, struct rpcrdma_req *req)
{
@@ -399,12 +405,17 @@ struct rpcrdma_buffer {
struct list_head rb_mws;
struct list_head rb_all;
unsigned long rb_sc_head;
unsigned long rb_sc_tail;
unsigned long rb_sc_last;
struct rpcrdma_sendctx **rb_sc_ctxs;
spinlock_t rb_lock; /* protect buf lists */
int rb_send_count, rb_recv_count;
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
u32 rb_max_requests;
atomic_t rb_credits; /* most recent credit grant */
u32 rb_credits; /* most recent credit grant */
u32 rb_bc_srv_max_requests;
spinlock_t rb_reqslock; /* protect rb_allreqs */
@@ -453,10 +464,12 @@ struct rpcrdma_stats {
unsigned long mrs_recovered;
unsigned long mrs_orphaned;
unsigned long mrs_allocated;
unsigned long empty_sendctx_q;
/* accessed when receiving a reply */
unsigned long long total_rdma_reply;
unsigned long long fixup_copy_count;
unsigned long reply_waits_for_send;
unsigned long local_inv_needed;
unsigned long nomsg_call_count;
unsigned long bcall_count;
@@ -473,8 +486,6 @@ struct rpcrdma_memreg_ops {
struct rpcrdma_mw **);
void (*ro_unmap_sync)(struct rpcrdma_xprt *,
struct list_head *);
void (*ro_unmap_safe)(struct rpcrdma_xprt *,
struct rpcrdma_req *, bool);
void (*ro_recover_mr)(struct rpcrdma_mw *);
int (*ro_open)(struct rpcrdma_ia *,
struct rpcrdma_ep *,
@@ -532,6 +543,8 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
bool frwr_is_supported(struct rpcrdma_ia *);
bool fmr_is_supported(struct rpcrdma_ia *);
extern struct workqueue_struct *rpcrdma_receive_wq;
/*
* Endpoint calls - xprtrdma/verbs.c
*/
@@ -554,6 +567,8 @@ struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
void rpcrdma_destroy_req(struct rpcrdma_req *);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
@@ -610,12 +625,18 @@ enum rpcrdma_chunktype {
rpcrdma_replych
};
bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
u32, struct xdr_buf *, enum rpcrdma_chunktype);
void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr,
enum rpcrdma_chunktype rtype);
void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc);
int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
void rpcrdma_reply_handler(struct work_struct *work);
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req);
void rpcrdma_deferred_completion(struct work_struct *work);
static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
{

View File

@@ -552,6 +552,7 @@ static int xs_local_send_request(struct rpc_task *task)
default:
dprintk("RPC: sendmsg returned unrecognized error %d\n",
-status);
/* fall through */
case -EPIPE:
xs_close(xprt);
status = -ENOTCONN;
@@ -1611,6 +1612,7 @@ static void xs_tcp_state_change(struct sock *sk)
xprt->connect_cookie++;
clear_bit(XPRT_CONNECTED, &xprt->state);
xs_tcp_force_close(xprt);
/* fall through */
case TCP_CLOSING:
/*
* If the server closed down the connection, make sure that
@@ -2368,6 +2370,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
switch (ret) {
case 0:
xs_set_srcport(transport, sock);
/* fall through */
case -EINPROGRESS:
/* SYN_SENT! */
if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
@@ -2419,6 +2422,7 @@ static void xs_tcp_setup_socket(struct work_struct *work)
default:
printk("%s: connect returned unhandled error %d\n",
__func__, status);
/* fall through */
case -EADDRNOTAVAIL:
/* We're probably in TIME_WAIT. Get rid of existing socket,
* and retry