Merge tag 'nfs-for-5.4-1' of git://git.linux-nfs.org/projects/anna/linux-nfs

Pull NFS client updates from Anna Schumaker:
 "Stable bugfixes:
   - Dequeue the request from the receive queue while we're re-encoding
     # v4.20+
   - Fix buffer handling of GSS MIC without slack # 5.1

  Features:
   - Increase xprtrdma maximum transport header and slot table sizes
   - Add support for nfs4_call_sync() calls using a custom
     rpc_task_struct
   - Optimize the default readahead size
   - Enable pNFS filelayout LAYOUTGET on OPEN

  Other bugfixes and cleanups:
   - Fix possible null-pointer dereferences and memory leaks
   - Various NFS over RDMA cleanups
   - Various NFS over RDMA comment updates
   - Don't receive TCP data into a reset request buffer
   - Don't try to parse incomplete RPC messages
   - Fix congestion window race with disconnect
   - Clean up pNFS return-on-close error handling
   - Fixes for NFS4ERR_OLD_STATEID handling"

* tag 'nfs-for-5.4-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (53 commits)
  pNFS/filelayout: enable LAYOUTGET on OPEN
  NFS: Optimise the default readahead size
  NFSv4: Handle NFS4ERR_OLD_STATEID in LOCKU
  NFSv4: Handle NFS4ERR_OLD_STATEID in CLOSE/OPEN_DOWNGRADE
  NFSv4: Fix OPEN_DOWNGRADE error handling
  pNFS: Handle NFS4ERR_OLD_STATEID on layoutreturn by bumping the state seqid
  NFSv4: Add a helper to increment stateid seqids
  NFSv4: Handle RPC level errors in LAYOUTRETURN
  NFSv4: Handle NFS4ERR_DELAY correctly in return-on-close
  NFSv4: Clean up pNFS return-on-close error handling
  pNFS: Ensure we do clear the return-on-close layout stateid on fatal errors
  NFS: remove unused check for negative dentry
  NFSv3: use nfs_add_or_obtain() to create and reference inodes
  NFS: Refactor nfs_instantiate() for dentry referencing callers
  SUNRPC: Fix congestion window race with disconnect
  SUNRPC: Don't try to parse incomplete RPC messages
  SUNRPC: Rename xdr_buf_read_netobj to xdr_buf_read_mic
  SUNRPC: Fix buffer handling of GSS MIC without slack
  SUNRPC: RPC level errors should always set task->tk_rpc_status
  SUNRPC: Don't receive TCP data into a request buffer that has been reset
  ...
This commit is contained in:
Linus Torvalds
2019-09-26 12:20:14 -07:00
29 changed files with 834 additions and 579 deletions

View File

@@ -1960,7 +1960,7 @@ gss_unwrap_resp_integ(struct rpc_task *task, struct rpc_cred *cred,
if (xdr_buf_subsegment(rcv_buf, &integ_buf, data_offset, integ_len))
goto unwrap_failed;
if (xdr_buf_read_netobj(rcv_buf, &mic, mic_offset))
if (xdr_buf_read_mic(rcv_buf, &mic, mic_offset))
goto unwrap_failed;
maj_stat = gss_verify_mic(ctx->gc_gss_ctx, &integ_buf, &mic);
if (maj_stat == GSS_S_CONTEXT_EXPIRED)

View File

@@ -1837,7 +1837,7 @@ call_allocate(struct rpc_task *task)
return;
}
rpc_exit(task, -ERESTARTSYS);
rpc_call_rpcerror(task, -ERESTARTSYS);
}
static int
@@ -1862,6 +1862,7 @@ rpc_xdr_encode(struct rpc_task *task)
req->rq_rbuffer,
req->rq_rcvsize);
req->rq_reply_bytes_recvd = 0;
req->rq_snd_buf.head[0].iov_len = 0;
xdr_init_encode(&xdr, &req->rq_snd_buf,
req->rq_snd_buf.head[0].iov_base, req);
@@ -1881,6 +1882,8 @@ call_encode(struct rpc_task *task)
if (!rpc_task_need_encode(task))
goto out;
dprint_status(task);
/* Dequeue task from the receive queue while we're encoding */
xprt_request_dequeue_xprt(task);
/* Encode here so that rpcsec_gss can use correct sequence number. */
rpc_xdr_encode(task);
/* Did the encode result in an error condition? */
@@ -2479,6 +2482,7 @@ call_decode(struct rpc_task *task)
struct rpc_clnt *clnt = task->tk_client;
struct rpc_rqst *req = task->tk_rqstp;
struct xdr_stream xdr;
int err;
dprint_status(task);
@@ -2501,6 +2505,15 @@ call_decode(struct rpc_task *task)
* before it changed req->rq_reply_bytes_recvd.
*/
smp_rmb();
/*
* Did we ever call xprt_complete_rqst()? If not, we should assume
* the message is incomplete.
*/
err = -EAGAIN;
if (!req->rq_reply_bytes_recvd)
goto out;
req->rq_rcv_buf.len = req->rq_private_buf.len;
/* Check that the softirq receive buffer is valid */
@@ -2509,7 +2522,9 @@ call_decode(struct rpc_task *task)
xdr_init_decode(&xdr, &req->rq_rcv_buf,
req->rq_rcv_buf.head[0].iov_base, req);
switch (rpc_decode_header(task, &xdr)) {
err = rpc_decode_header(task, &xdr);
out:
switch (err) {
case 0:
task->tk_action = rpc_exit_task;
task->tk_status = rpcauth_unwrap_resp(task, &xdr);
@@ -2518,9 +2533,6 @@ call_decode(struct rpc_task *task)
return;
case -EAGAIN:
task->tk_status = 0;
xdr_free_bvec(&req->rq_rcv_buf);
req->rq_reply_bytes_recvd = 0;
req->rq_rcv_buf.len = 0;
if (task->tk_client->cl_discrtry)
xprt_conditional_disconnect(req->rq_xprt,
req->rq_connect_cookie);
@@ -2561,7 +2573,7 @@ rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
return 0;
out_fail:
trace_rpc_bad_callhdr(task);
rpc_exit(task, error);
rpc_call_rpcerror(task, error);
return error;
}
@@ -2628,7 +2640,7 @@ out_garbage:
return -EAGAIN;
}
out_err:
rpc_exit(task, error);
rpc_call_rpcerror(task, error);
return error;
out_unparsable:

View File

@@ -541,33 +541,14 @@ rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq,
return NULL;
}
static void
rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
struct rpc_wait_queue *queue, struct rpc_task *task)
{
rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task, NULL, NULL);
}
/*
* Wake up a queued task while the queue lock is being held
*/
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue,
struct rpc_task *task)
{
rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
}
/*
* Wake up a task on a specific queue
*/
void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *queue,
struct rpc_task *task)
{
if (!RPC_IS_QUEUED(task))
return;
spin_lock(&queue->lock);
rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
spin_unlock(&queue->lock);
rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
task, NULL, NULL);
}
/*
@@ -930,8 +911,10 @@ static void __rpc_execute(struct rpc_task *task)
/*
* Signalled tasks should exit rather than sleep.
*/
if (RPC_SIGNALLED(task))
if (RPC_SIGNALLED(task)) {
task->tk_rpc_status = -ERESTARTSYS;
rpc_exit(task, -ERESTARTSYS);
}
/*
* The queue->lock protects against races with
@@ -967,6 +950,7 @@ static void __rpc_execute(struct rpc_task *task)
*/
dprintk("RPC: %5u got signal\n", task->tk_pid);
set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
task->tk_rpc_status = -ERESTARTSYS;
rpc_exit(task, -ERESTARTSYS);
}
dprintk("RPC: %5u sync task resuming\n", task->tk_pid);

View File

@@ -560,7 +560,7 @@ EXPORT_SYMBOL_GPL(xdr_init_encode);
* required at the end of encoding, or any other time when the xdr_buf
* data might be read.
*/
void xdr_commit_encode(struct xdr_stream *xdr)
inline void xdr_commit_encode(struct xdr_stream *xdr)
{
int shift = xdr->scratch.iov_len;
void *page;
@@ -1236,43 +1236,60 @@ xdr_encode_word(struct xdr_buf *buf, unsigned int base, u32 obj)
}
EXPORT_SYMBOL_GPL(xdr_encode_word);
/* If the netobj starting offset bytes from the start of xdr_buf is contained
* entirely in the head or the tail, set object to point to it; otherwise
* try to find space for it at the end of the tail, copy it there, and
* set obj to point to it. */
int xdr_buf_read_netobj(struct xdr_buf *buf, struct xdr_netobj *obj, unsigned int offset)
/**
* xdr_buf_read_mic() - obtain the address of the GSS mic from xdr buf
* @buf: pointer to buffer containing a mic
* @mic: on success, returns the address of the mic
* @offset: the offset in buf where mic may be found
*
* This function may modify the xdr buf if the mic is found to be straddling
* a boundary between head, pages, and tail. On success the mic can be read
* from the address returned. There is no need to free the mic.
*
* Return: Success returns 0, otherwise an integer error.
*/
int xdr_buf_read_mic(struct xdr_buf *buf, struct xdr_netobj *mic, unsigned int offset)
{
struct xdr_buf subbuf;
unsigned int boundary;
if (xdr_decode_word(buf, offset, &obj->len))
if (xdr_decode_word(buf, offset, &mic->len))
return -EFAULT;
if (xdr_buf_subsegment(buf, &subbuf, offset + 4, obj->len))
offset += 4;
/* Is the mic partially in the head? */
boundary = buf->head[0].iov_len;
if (offset < boundary && (offset + mic->len) > boundary)
xdr_shift_buf(buf, boundary - offset);
/* Is the mic partially in the pages? */
boundary += buf->page_len;
if (offset < boundary && (offset + mic->len) > boundary)
xdr_shrink_pagelen(buf, boundary - offset);
if (xdr_buf_subsegment(buf, &subbuf, offset, mic->len))
return -EFAULT;
/* Is the obj contained entirely in the head? */
obj->data = subbuf.head[0].iov_base;
if (subbuf.head[0].iov_len == obj->len)
/* Is the mic contained entirely in the head? */
mic->data = subbuf.head[0].iov_base;
if (subbuf.head[0].iov_len == mic->len)
return 0;
/* ..or is the obj contained entirely in the tail? */
obj->data = subbuf.tail[0].iov_base;
if (subbuf.tail[0].iov_len == obj->len)
/* ..or is the mic contained entirely in the tail? */
mic->data = subbuf.tail[0].iov_base;
if (subbuf.tail[0].iov_len == mic->len)
return 0;
/* use end of tail as storage for obj:
* (We don't copy to the beginning because then we'd have
* to worry about doing a potentially overlapping copy.
* This assumes the object is at most half the length of the
* tail.) */
if (obj->len > buf->buflen - buf->len)
/* Find a contiguous area in @buf to hold all of @mic */
if (mic->len > buf->buflen - buf->len)
return -ENOMEM;
if (buf->tail[0].iov_len != 0)
obj->data = buf->tail[0].iov_base + buf->tail[0].iov_len;
mic->data = buf->tail[0].iov_base + buf->tail[0].iov_len;
else
obj->data = buf->head[0].iov_base + buf->head[0].iov_len;
__read_bytes_from_xdr_buf(&subbuf, obj->data, obj->len);
mic->data = buf->head[0].iov_base + buf->head[0].iov_len;
__read_bytes_from_xdr_buf(&subbuf, mic->data, mic->len);
return 0;
}
EXPORT_SYMBOL_GPL(xdr_buf_read_netobj);
EXPORT_SYMBOL_GPL(xdr_buf_read_mic);
/* Returns 0 on success, or else a negative error code. */
static int

View File

@@ -456,6 +456,12 @@ void xprt_release_rqst_cong(struct rpc_task *task)
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
{
if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
__xprt_lock_write_next_cong(xprt);
}
/*
* Clear the congestion window wait flag and wake up the next
* entry on xprt->sending
@@ -671,6 +677,7 @@ void xprt_disconnect_done(struct rpc_xprt *xprt)
spin_lock(&xprt->transport_lock);
xprt_clear_connected(xprt);
xprt_clear_write_space_locked(xprt);
xprt_clear_congestion_window_wait_locked(xprt);
xprt_wake_pending_tasks(xprt, -ENOTCONN);
spin_unlock(&xprt->transport_lock);
}
@@ -1323,6 +1330,36 @@ xprt_request_dequeue_transmit(struct rpc_task *task)
spin_unlock(&xprt->queue_lock);
}
/**
* xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
* @task: pointer to rpc_task
*
* Remove a task from the transmit and receive queues, and ensure that
* it is not pinned by the receive work item.
*/
void
xprt_request_dequeue_xprt(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
xprt_is_pinned_rqst(req)) {
spin_lock(&xprt->queue_lock);
xprt_request_dequeue_transmit_locked(task);
xprt_request_dequeue_receive_locked(task);
while (xprt_is_pinned_rqst(req)) {
set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
spin_unlock(&xprt->queue_lock);
xprt_wait_on_pinned_rqst(req);
spin_lock(&xprt->queue_lock);
clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
}
spin_unlock(&xprt->queue_lock);
}
}
/**
* xprt_request_prepare - prepare an encoded request for transport
* @req: pointer to rpc_rqst
@@ -1747,28 +1784,6 @@ void xprt_retry_reserve(struct rpc_task *task)
xprt_do_reserve(xprt, task);
}
static void
xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
{
struct rpc_xprt *xprt = req->rq_xprt;
if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
xprt_is_pinned_rqst(req)) {
spin_lock(&xprt->queue_lock);
xprt_request_dequeue_transmit_locked(task);
xprt_request_dequeue_receive_locked(task);
while (xprt_is_pinned_rqst(req)) {
set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
spin_unlock(&xprt->queue_lock);
xprt_wait_on_pinned_rqst(req);
spin_lock(&xprt->queue_lock);
clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
}
spin_unlock(&xprt->queue_lock);
}
}
/**
* xprt_release - release an RPC request slot
* @task: task which is finished with the slot
@@ -1788,7 +1803,7 @@ void xprt_release(struct rpc_task *task)
}
xprt = req->rq_xprt;
xprt_request_dequeue_all(task, req);
xprt_request_dequeue_xprt(task);
spin_lock(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task);
if (xprt->ops->release_request)

View File

@@ -54,9 +54,7 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
return r_xprt->rx_buf.rb_bc_srv_max_requests;
return RPCRDMA_BACKWARD_WRS >> 1;
}
static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)

View File

@@ -7,67 +7,37 @@
/* Lightweight memory registration using Fast Registration Work
* Requests (FRWR).
*
* FRWR features ordered asynchronous registration and deregistration
* of arbitrarily sized memory regions. This is the fastest and safest
* FRWR features ordered asynchronous registration and invalidation
* of arbitrarily-sized memory regions. This is the fastest and safest
* but most complex memory registration mode.
*/
/* Normal operation
*
* A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
* A Memory Region is prepared for RDMA Read or Write using a FAST_REG
* Work Request (frwr_map). When the RDMA operation is finished, this
* Memory Region is invalidated using a LOCAL_INV Work Request
* (frwr_unmap_sync).
* (frwr_unmap_async and frwr_unmap_sync).
*
* Typically these Work Requests are not signaled, and neither are RDMA
* SEND Work Requests (with the exception of signaling occasionally to
* prevent provider work queue overflows). This greatly reduces HCA
* Typically FAST_REG Work Requests are not signaled, and neither are
* RDMA Send Work Requests (with the exception of signaling occasionally
* to prevent provider work queue overflows). This greatly reduces HCA
* interrupt workload.
*
* As an optimization, frwr_unmap marks MRs INVALID before the
* LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
* rb_mrs immediately so that no work (like managing a linked list
* under a spinlock) is needed in the completion upcall.
*
* But this means that frwr_map() can occasionally encounter an MR
* that is INVALID but the LOCAL_INV WR has not completed. Work Queue
* ordering prevents a subsequent FAST_REG WR from executing against
* that MR while it is still being invalidated.
*/
/* Transport recovery
*
* ->op_map and the transport connect worker cannot run at the same
* time, but ->op_unmap can fire while the transport connect worker
* is running. Thus MR recovery is handled in ->op_map, to guarantee
* that recovered MRs are owned by a sending RPC, and not one where
* ->op_unmap could fire at the same time transport reconnect is
* being done.
* frwr_map and frwr_unmap_* cannot run at the same time the transport
* connect worker is running. The connect worker holds the transport
* send lock, just as ->send_request does. This prevents frwr_map and
* the connect worker from running concurrently. When a connection is
* closed, the Receive completion queue is drained before the allowing
* the connect worker to get control. This prevents frwr_unmap and the
* connect worker from running concurrently.
*
* When the underlying transport disconnects, MRs are left in one of
* four states:
*
* INVALID: The MR was not in use before the QP entered ERROR state.
*
* VALID: The MR was registered before the QP entered ERROR state.
*
* FLUSHED_FR: The MR was being registered when the QP entered ERROR
* state, and the pending WR was flushed.
*
* FLUSHED_LI: The MR was being invalidated when the QP entered ERROR
* state, and the pending WR was flushed.
*
* When frwr_map encounters FLUSHED and VALID MRs, they are recovered
* with ib_dereg_mr and then are re-initialized. Because MR recovery
* allocates fresh resources, it is deferred to a workqueue, and the
* recovered MRs are placed back on the rb_mrs list when recovery is
* complete. frwr_map allocates another MR for the current RPC while
* the broken MR is reset.
*
* To ensure that frwr_map doesn't encounter an MR that is marked
* INVALID but that is about to be flushed due to a previous transport
* disconnect, the transport connect worker attempts to drain all
* pending send queue WRs before the transport is reconnected.
* When the underlying transport disconnects, MRs that are in flight
* are flushed and are likely unusable. Thus all flushed MRs are
* destroyed. New MRs are created on demand.
*/
#include <linux/sunrpc/rpc_rdma.h>
@@ -118,15 +88,8 @@ void frwr_release_mr(struct rpcrdma_mr *mr)
kfree(mr);
}
/* MRs are dynamically allocated, so simply clean up and release the MR.
* A replacement MR will subsequently be allocated on demand.
*/
static void
frwr_mr_recycle_worker(struct work_struct *work)
static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
{
struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle);
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
trace_xprtrdma_mr_recycle(mr);
if (mr->mr_dir != DMA_NONE) {
@@ -136,14 +99,40 @@ frwr_mr_recycle_worker(struct work_struct *work)
mr->mr_dir = DMA_NONE;
}
spin_lock(&r_xprt->rx_buf.rb_mrlock);
spin_lock(&r_xprt->rx_buf.rb_lock);
list_del(&mr->mr_all);
r_xprt->rx_stats.mrs_recycled++;
spin_unlock(&r_xprt->rx_buf.rb_mrlock);
spin_unlock(&r_xprt->rx_buf.rb_lock);
frwr_release_mr(mr);
}
/* MRs are dynamically allocated, so simply clean up and release the MR.
* A replacement MR will subsequently be allocated on demand.
*/
static void
frwr_mr_recycle_worker(struct work_struct *work)
{
struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr,
mr_recycle);
frwr_mr_recycle(mr->mr_xprt, mr);
}
/* frwr_recycle - Discard MRs
* @req: request to reset
*
* Used after a reconnect. These MRs could be in flight, we can't
* tell. Safe thing to do is release them.
*/
void frwr_recycle(struct rpcrdma_req *req)
{
struct rpcrdma_mr *mr;
while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
frwr_mr_recycle(mr->mr_xprt, mr);
}
/* frwr_reset - Place MRs back on the free list
* @req: request to reset
*
@@ -156,12 +145,10 @@ frwr_mr_recycle_worker(struct work_struct *work)
*/
void frwr_reset(struct rpcrdma_req *req)
{
while (!list_empty(&req->rl_registered)) {
struct rpcrdma_mr *mr;
struct rpcrdma_mr *mr;
mr = rpcrdma_mr_pop(&req->rl_registered);
rpcrdma_mr_unmap_and_put(mr);
}
while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
rpcrdma_mr_put(mr);
}
/**
@@ -179,11 +166,14 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
struct ib_mr *frmr;
int rc;
/* NB: ib_alloc_mr and device drivers typically allocate
* memory with GFP_KERNEL.
*/
frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
if (IS_ERR(frmr))
goto out_mr_err;
sg = kcalloc(depth, sizeof(*sg), GFP_KERNEL);
sg = kcalloc(depth, sizeof(*sg), GFP_NOFS);
if (!sg)
goto out_list_err;
@@ -203,8 +193,6 @@ out_mr_err:
return rc;
out_list_err:
dprintk("RPC: %s: sg allocation failure\n",
__func__);
ib_dereg_mr(frmr);
return -ENOMEM;
}
@@ -290,8 +278,8 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
ia->ri_max_frwr_depth);
ia->ri_max_segs =
DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
/* Reply chunks require segments for head and tail buffers */
ia->ri_max_segs += 2;
if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
@@ -323,31 +311,25 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
* @nsegs: number of segments remaining
* @writing: true when RDMA Write will be used
* @xid: XID of RPC using the registered memory
* @out: initialized MR
* @mr: MR to fill in
*
* Prepare a REG_MR Work Request to register a memory region
* for remote access via RDMA READ or RDMA WRITE.
*
* Returns the next segment or a negative errno pointer.
* On success, the prepared MR is planted in @out.
* On success, @mr is filled in.
*/
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_mr_seg *seg,
int nsegs, bool writing, __be32 xid,
struct rpcrdma_mr **out)
struct rpcrdma_mr *mr)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
struct rpcrdma_mr *mr;
struct ib_mr *ibmr;
struct ib_reg_wr *reg_wr;
struct ib_mr *ibmr;
int i, n;
u8 key;
mr = rpcrdma_mr_get(r_xprt);
if (!mr)
goto out_getmr_err;
if (nsegs > ia->ri_max_frwr_depth)
nsegs = ia->ri_max_frwr_depth;
for (i = 0; i < nsegs;) {
@@ -362,7 +344,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
++seg;
++i;
if (holes_ok)
if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS)
continue;
if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
@@ -397,22 +379,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
mr->mr_offset = ibmr->iova;
trace_xprtrdma_mr_map(mr);
*out = mr;
return seg;
out_getmr_err:
xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
return ERR_PTR(-EAGAIN);
out_dmamap_err:
mr->mr_dir = DMA_NONE;
trace_xprtrdma_frwr_sgerr(mr, i);
rpcrdma_mr_put(mr);
return ERR_PTR(-EIO);
out_mapmr_err:
trace_xprtrdma_frwr_maperr(mr, n);
rpcrdma_mr_recycle(mr);
return ERR_PTR(-EIO);
}
@@ -485,7 +460,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
if (mr->mr_handle == rep->rr_inv_rkey) {
list_del_init(&mr->mr_list);
trace_xprtrdma_mr_remoteinv(mr);
rpcrdma_mr_unmap_and_put(mr);
rpcrdma_mr_put(mr);
break; /* only one invalidated MR per RPC */
}
}
@@ -495,7 +470,7 @@ static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
if (wc->status != IB_WC_SUCCESS)
rpcrdma_mr_recycle(mr);
else
rpcrdma_mr_unmap_and_put(mr);
rpcrdma_mr_put(mr);
}
/**
@@ -532,8 +507,8 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_li_wake(wc, frwr);
complete(&frwr->fr_linv_done);
__frwr_release_mr(wc, mr);
complete(&frwr->fr_linv_done);
}
/**
@@ -562,8 +537,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
frwr = NULL;
prev = &first;
while (!list_empty(&req->rl_registered)) {
mr = rpcrdma_mr_pop(&req->rl_registered);
while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
trace_xprtrdma_mr_localinv(mr);
r_xprt->rx_stats.local_inv_needed++;
@@ -632,11 +606,15 @@ static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
struct rpcrdma_frwr *frwr =
container_of(cqe, struct rpcrdma_frwr, fr_cqe);
struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
struct rpcrdma_rep *rep = mr->mr_req->rl_reply;
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_li_done(wc, frwr);
rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
__frwr_release_mr(wc, mr);
/* Ensure @rep is generated before __frwr_release_mr */
smp_rmb();
rpcrdma_complete_rqst(rep);
}
/**
@@ -662,15 +640,13 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
frwr = NULL;
prev = &first;
while (!list_empty(&req->rl_registered)) {
mr = rpcrdma_mr_pop(&req->rl_registered);
while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
trace_xprtrdma_mr_localinv(mr);
r_xprt->rx_stats.local_inv_needed++;
frwr = &mr->frwr;
frwr->fr_cqe.done = frwr_wc_localinv;
frwr->fr_req = req;
last = &frwr->fr_invwr;
last->next = NULL;
last->wr_cqe = &frwr->fr_cqe;

View File

@@ -342,6 +342,32 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
return 0;
}
static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct rpcrdma_mr_seg *seg,
int nsegs, bool writing,
struct rpcrdma_mr **mr)
{
*mr = rpcrdma_mr_pop(&req->rl_free_mrs);
if (!*mr) {
*mr = rpcrdma_mr_get(r_xprt);
if (!*mr)
goto out_getmr_err;
trace_xprtrdma_mr_get(req);
(*mr)->mr_req = req;
}
rpcrdma_mr_push(*mr, &req->rl_registered);
return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
out_getmr_err:
trace_xprtrdma_nomrs(req);
xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
if (r_xprt->rx_ep.rep_connected != -ENODEV)
schedule_work(&r_xprt->rx_buf.rb_refresh_worker);
return ERR_PTR(-EAGAIN);
}
/* Register and XDR encode the Read list. Supports encoding a list of read
* segments that belong to a single read chunk.
*
@@ -356,9 +382,10 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
*
* Only a single @pos value is currently supported.
*/
static noinline int
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct rpc_rqst *rqst,
enum rpcrdma_chunktype rtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -379,10 +406,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return nsegs;
do {
seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr);
seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_read_segment(xdr, mr, pos) < 0)
return -EMSGSIZE;
@@ -411,9 +437,10 @@ done:
*
* Only a single Write chunk is currently supported.
*/
static noinline int
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct rpc_rqst *rqst,
enum rpcrdma_chunktype wtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -440,10 +467,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
nchunks = 0;
do {
seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
@@ -474,9 +500,10 @@ done:
* Returns zero on success, or a negative errno if a failure occurred.
* @xdr is advanced to the next position in the stream.
*/
static noinline int
rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct rpc_rqst *rqst,
enum rpcrdma_chunktype wtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -501,10 +528,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
nchunks = 0;
do {
seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
@@ -841,12 +867,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
* chunks. Very likely the connection has been replaced,
* so these registrations are invalid and unusable.
*/
while (unlikely(!list_empty(&req->rl_registered))) {
struct rpcrdma_mr *mr;
mr = rpcrdma_mr_pop(&req->rl_registered);
rpcrdma_mr_recycle(mr);
}
frwr_recycle(req);
/* This implementation supports the following combinations
* of chunk lists in one RPC-over-RDMA Call message:
@@ -1240,8 +1261,6 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
struct rpc_rqst *rqst = rep->rr_rqst;
int status;
xprt->reestablish_timeout = 0;
switch (rep->rr_proc) {
case rdma_msg:
status = rpcrdma_decode_msg(r_xprt, rep, rqst);
@@ -1300,6 +1319,12 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
u32 credits;
__be32 *p;
/* Any data means we had a useful conversation, so
* then we don't need to delay the next reconnect.
*/
if (xprt->reestablish_timeout)
xprt->reestablish_timeout = 0;
/* Fixed transport header fields */
xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
rep->rr_hdrbuf.head[0].iov_base, NULL);

View File

@@ -423,8 +423,6 @@ void xprt_rdma_close(struct rpc_xprt *xprt)
if (ep->rep_connected == -ENODEV)
return;
if (ep->rep_connected > 0)
xprt->reestablish_timeout = 0;
rpcrdma_ep_disconnect(ep, ia);
/* Prepare @xprt for the next connection by reinitializing
@@ -434,6 +432,7 @@ void xprt_rdma_close(struct rpc_xprt *xprt)
xprt->cwnd = RPC_CWNDSHIFT;
out:
xprt->reestablish_timeout = 0;
++xprt->connect_cookie;
xprt_disconnect_done(xprt);
}
@@ -494,9 +493,9 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
* @reconnect_timeout: reconnect timeout after server disconnects
*
*/
static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt,
unsigned long connect_timeout,
unsigned long reconnect_timeout)
static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt,
unsigned long connect_timeout,
unsigned long reconnect_timeout)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
@@ -571,6 +570,7 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
return;
out_sleep:
set_bit(XPRT_CONGESTED, &xprt->state);
rpc_sleep_on(&xprt->backlog, task, NULL);
task->tk_status = -EAGAIN;
}
@@ -589,7 +589,8 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
memset(rqst, 0, sizeof(*rqst));
rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
rpc_wake_up_next(&xprt->backlog);
if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
clear_bit(XPRT_CONGESTED, &xprt->state);
}
static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
@@ -803,7 +804,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = {
.send_request = xprt_rdma_send_request,
.close = xprt_rdma_close,
.destroy = xprt_rdma_destroy,
.set_connect_timeout = xprt_rdma_tcp_set_connect_timeout,
.set_connect_timeout = xprt_rdma_set_connect_timeout,
.print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap,

View File

@@ -53,6 +53,7 @@
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/log2.h>
#include <asm-generic/barrier.h>
#include <asm/bitops.h>
@@ -74,8 +75,10 @@
* internal functions
*/
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags);
@@ -405,9 +408,8 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
struct rpcrdma_rep *rep;
cancel_delayed_work_sync(&buf->rb_refresh_worker);
cancel_work_sync(&buf->rb_refresh_worker);
/* This is similar to rpcrdma_ep_destroy, but:
* - Don't cancel the connect worker.
@@ -429,8 +431,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
/* The ULP is responsible for ensuring all DMA
* mappings and MRs are gone.
*/
list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
rpcrdma_reps_destroy(buf);
list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
@@ -604,10 +605,10 @@ void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
* Unlike a normal reconnection, a fresh PD and a new set
* of MRs and buffers is needed.
*/
static int
rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
struct ib_qp_init_attr *qp_init_attr)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
int rc, err;
trace_xprtrdma_reinsert(r_xprt);
@@ -624,7 +625,7 @@ rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
}
rc = -ENETUNREACH;
err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
if (err) {
pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
goto out3;
@@ -641,16 +642,16 @@ out1:
return rc;
}
static int
rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
struct rpcrdma_ia *ia)
static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
struct ib_qp_init_attr *qp_init_attr)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rdma_cm_id *id, *old;
int err, rc;
trace_xprtrdma_reconnect(r_xprt);
rpcrdma_ep_disconnect(ep, ia);
rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
rc = -EHOSTUNREACH;
id = rpcrdma_create_id(r_xprt, ia);
@@ -672,7 +673,7 @@ rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
goto out_destroy;
}
err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
if (err)
goto out_destroy;
@@ -697,25 +698,27 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
rx_ia);
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct ib_qp_init_attr qp_init_attr;
int rc;
retry:
memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
switch (ep->rep_connected) {
case 0:
dprintk("RPC: %s: connecting...\n", __func__);
rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
if (rc) {
rc = -ENETUNREACH;
goto out_noupdate;
}
break;
case -ENODEV:
rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
if (rc)
goto out_noupdate;
break;
default:
rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
if (rc)
goto out;
}
@@ -729,6 +732,8 @@ retry:
if (rc)
goto out;
if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
if (ep->rep_connected <= 0) {
if (ep->rep_connected == -EAGAIN)
@@ -942,14 +947,12 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
unsigned int count;
LIST_HEAD(free);
LIST_HEAD(all);
for (count = 0; count < ia->ri_max_segs; count++) {
struct rpcrdma_mr *mr;
int rc;
mr = kzalloc(sizeof(*mr), GFP_KERNEL);
mr = kzalloc(sizeof(*mr), GFP_NOFS);
if (!mr)
break;
@@ -961,15 +964,13 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
mr->mr_xprt = r_xprt;
list_add(&mr->mr_list, &free);
list_add(&mr->mr_all, &all);
spin_lock(&buf->rb_lock);
list_add(&mr->mr_list, &buf->rb_mrs);
list_add(&mr->mr_all, &buf->rb_all_mrs);
spin_unlock(&buf->rb_lock);
}
spin_lock(&buf->rb_mrlock);
list_splice(&free, &buf->rb_mrs);
list_splice(&all, &buf->rb_all);
r_xprt->rx_stats.mrs_allocated += count;
spin_unlock(&buf->rb_mrlock);
trace_xprtrdma_createmrs(r_xprt, count);
}
@@ -977,7 +978,7 @@ static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
rb_refresh_worker.work);
rb_refresh_worker);
struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
rx_buf);
@@ -999,12 +1000,18 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
size_t maxhdrsize;
req = kzalloc(sizeof(*req), flags);
if (req == NULL)
goto out1;
rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags);
/* Compute maximum header buffer size in bytes */
maxhdrsize = rpcrdma_fixed_maxsz + 3 +
r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
maxhdrsize *= sizeof(__be32);
rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
DMA_TO_DEVICE, flags);
if (!rb)
goto out2;
req->rl_rdmabuf = rb;
@@ -1018,6 +1025,7 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
if (!req->rl_recvbuf)
goto out4;
INIT_LIST_HEAD(&req->rl_free_mrs);
INIT_LIST_HEAD(&req->rl_registered);
spin_lock(&buffer->rb_lock);
list_add(&req->rl_all, &buffer->rb_allreqs);
@@ -1065,6 +1073,40 @@ out:
return NULL;
}
static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
rpcrdma_regbuf_free(rep->rr_rdmabuf);
kfree(rep);
}
static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
{
struct llist_node *node;
/* Calls to llist_del_first are required to be serialized */
node = llist_del_first(&buf->rb_free_reps);
if (!node)
return NULL;
return llist_entry(node, struct rpcrdma_rep, rr_node);
}
static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
struct rpcrdma_rep *rep)
{
if (!rep->rr_temp)
llist_add(&rep->rr_node, &buf->rb_free_reps);
else
rpcrdma_rep_destroy(rep);
}
static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_rep *rep;
while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
rpcrdma_rep_destroy(rep);
}
/**
* rpcrdma_buffer_create - Create initial set of req/rep objects
* @r_xprt: transport instance to (re)initialize
@@ -1078,12 +1120,10 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_mrlock);
spin_lock_init(&buf->rb_lock);
INIT_LIST_HEAD(&buf->rb_mrs);
INIT_LIST_HEAD(&buf->rb_all);
INIT_DELAYED_WORK(&buf->rb_refresh_worker,
rpcrdma_mr_refresh_worker);
INIT_LIST_HEAD(&buf->rb_all_mrs);
INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
rpcrdma_mrs_create(r_xprt);
@@ -1102,7 +1142,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
}
buf->rb_credits = 1;
INIT_LIST_HEAD(&buf->rb_recv_bufs);
init_llist_head(&buf->rb_free_reps);
rc = rpcrdma_sendctxs_create(r_xprt);
if (rc)
@@ -1114,12 +1154,6 @@ out:
return rc;
}
static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
rpcrdma_regbuf_free(rep->rr_rdmabuf);
kfree(rep);
}
/**
* rpcrdma_req_destroy - Destroy an rpcrdma_req object
* @req: unused object to be destroyed
@@ -1127,11 +1161,13 @@ static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
* This function assumes that the caller prevents concurrent device
* unload and transport tear-down.
*/
void
rpcrdma_req_destroy(struct rpcrdma_req *req)
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
list_del(&req->rl_all);
while (!list_empty(&req->rl_free_mrs))
rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
rpcrdma_regbuf_free(req->rl_recvbuf);
rpcrdma_regbuf_free(req->rl_sendbuf);
rpcrdma_regbuf_free(req->rl_rdmabuf);
@@ -1147,25 +1183,19 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
unsigned int count;
count = 0;
spin_lock(&buf->rb_mrlock);
while (!list_empty(&buf->rb_all)) {
mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
spin_lock(&buf->rb_lock);
while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
struct rpcrdma_mr,
mr_all)) != NULL) {
list_del(&mr->mr_all);
spin_unlock(&buf->rb_mrlock);
/* Ensure MW is not on any rl_registered list */
if (!list_empty(&mr->mr_list))
list_del(&mr->mr_list);
spin_unlock(&buf->rb_lock);
frwr_release_mr(mr);
count++;
spin_lock(&buf->rb_mrlock);
spin_lock(&buf->rb_lock);
}
spin_unlock(&buf->rb_mrlock);
spin_unlock(&buf->rb_lock);
r_xprt->rx_stats.mrs_allocated = 0;
dprintk("RPC: %s: released %u MRs\n", __func__, count);
}
/**
@@ -1179,18 +1209,10 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
cancel_delayed_work_sync(&buf->rb_refresh_worker);
cancel_work_sync(&buf->rb_refresh_worker);
rpcrdma_sendctxs_destroy(buf);
while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;
rep = list_first_entry(&buf->rb_recv_bufs,
struct rpcrdma_rep, rr_list);
list_del(&rep->rr_list);
rpcrdma_rep_destroy(rep);
}
rpcrdma_reps_destroy(buf);
while (!list_empty(&buf->rb_send_bufs)) {
struct rpcrdma_req *req;
@@ -1215,54 +1237,20 @@ struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_mr *mr = NULL;
struct rpcrdma_mr *mr;
spin_lock(&buf->rb_mrlock);
if (!list_empty(&buf->rb_mrs))
mr = rpcrdma_mr_pop(&buf->rb_mrs);
spin_unlock(&buf->rb_mrlock);
if (!mr)
goto out_nomrs;
spin_lock(&buf->rb_lock);
mr = rpcrdma_mr_pop(&buf->rb_mrs);
spin_unlock(&buf->rb_lock);
return mr;
out_nomrs:
trace_xprtrdma_nomrs(r_xprt);
if (r_xprt->rx_ep.rep_connected != -ENODEV)
schedule_delayed_work(&buf->rb_refresh_worker, 0);
/* Allow the reply handler and refresh worker to run */
cond_resched();
return NULL;
}
static void
__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
{
spin_lock(&buf->rb_mrlock);
rpcrdma_mr_push(mr, &buf->rb_mrs);
spin_unlock(&buf->rb_mrlock);
}
/**
* rpcrdma_mr_put - Release an rpcrdma_mr object
* @mr: object to release
* rpcrdma_mr_put - DMA unmap an MR and release it
* @mr: MR to release
*
*/
void
rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
}
/**
* rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
* @mr: object to release
*
*/
void
rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
void rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
@@ -1272,7 +1260,19 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
mr->mr_sg, mr->mr_nents, mr->mr_dir);
mr->mr_dir = DMA_NONE;
}
__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
}
static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
mr->mr_req = NULL;
spin_lock(&buf->rb_lock);
rpcrdma_mr_push(mr, &buf->rb_mrs);
spin_unlock(&buf->rb_lock);
}
/**
@@ -1303,39 +1303,24 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
*/
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
struct rpcrdma_rep *rep = req->rl_reply;
if (req->rl_reply)
rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;
spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs);
if (rep) {
if (!rep->rr_temp) {
list_add(&rep->rr_list, &buffers->rb_recv_bufs);
rep = NULL;
}
}
spin_unlock(&buffers->rb_lock);
if (rep)
rpcrdma_rep_destroy(rep);
}
/*
* Put reply buffers back into pool when not attached to
* request. This happens in error conditions.
/**
* rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
* @rep: rep to release
*
* Used after error conditions.
*/
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
if (!rep->rr_temp) {
spin_lock(&buffers->rb_lock);
list_add(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock(&buffers->rb_lock);
} else {
rpcrdma_rep_destroy(rep);
}
rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
}
/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
@@ -1483,7 +1468,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
count = 0;
needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
if (ep->rep_receive_count > needed)
if (likely(ep->rep_receive_count > needed))
goto out;
needed -= ep->rep_receive_count;
if (!temp)
@@ -1491,22 +1476,10 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
/* fast path: all needed reps can be found on the free list */
wr = NULL;
spin_lock(&buf->rb_lock);
while (needed) {
rep = list_first_entry_or_null(&buf->rb_recv_bufs,
struct rpcrdma_rep, rr_list);
rep = rpcrdma_rep_get_locked(buf);
if (!rep)
break;
list_del(&rep->rr_list);
rep->rr_recv_wr.next = wr;
wr = &rep->rr_recv_wr;
--needed;
}
spin_unlock(&buf->rb_lock);
while (needed) {
rep = rpcrdma_rep_create(r_xprt, temp);
rep = rpcrdma_rep_create(r_xprt, temp);
if (!rep)
break;
@@ -1523,7 +1496,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
goto release_wrs;
trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
trace_xprtrdma_post_recv(rep);
++count;
}

View File

@@ -47,6 +47,7 @@
#include <linux/atomic.h> /* atomic_t, etc */
#include <linux/kref.h> /* struct kref */
#include <linux/workqueue.h> /* struct work_struct */
#include <linux/llist.h>
#include <rdma/rdma_cm.h> /* RDMA connection api */
#include <rdma/ib_verbs.h> /* RDMA verbs api */
@@ -117,9 +118,6 @@ struct rpcrdma_ep {
#endif
/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
*
* The below structure appears at the front of a large region of kmalloc'd
* memory, which always starts on a good alignment boundary.
*/
struct rpcrdma_regbuf {
@@ -158,25 +156,22 @@ static inline void *rdmab_data(const struct rpcrdma_regbuf *rb)
/* To ensure a transport can always make forward progress,
* the number of RDMA segments allowed in header chunk lists
* is capped at 8. This prevents less-capable devices and
* memory registrations from overrunning the Send buffer
* while building chunk lists.
* is capped at 16. This prevents less-capable devices from
* overrunning the Send buffer while building chunk lists.
*
* Elements of the Read list take up more room than the
* Write list or Reply chunk. 8 read segments means the Read
* list (or Write list or Reply chunk) cannot consume more
* than
* Write list or Reply chunk. 16 read segments means the
* chunk lists cannot consume more than
*
* ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes.
* ((16 + 2) * read segment size) + 1 XDR words,
*
* And the fixed part of the header is another 24 bytes.
*
* The smallest inline threshold is 1024 bytes, ensuring that
* at least 750 bytes are available for RPC messages.
* or about 400 bytes. The fixed part of the header is
* another 24 bytes. Thus when the inline threshold is
* 1024 bytes, at least 600 bytes are available for RPC
* message bodies.
*/
enum {
RPCRDMA_MAX_HDR_SEGS = 8,
RPCRDMA_HDRBUF_SIZE = 256,
RPCRDMA_MAX_HDR_SEGS = 16,
};
/*
@@ -206,7 +201,7 @@ struct rpcrdma_rep {
struct rpc_rqst *rr_rqst;
struct xdr_buf rr_hdrbuf;
struct xdr_stream rr_stream;
struct list_head rr_list;
struct llist_node rr_node;
struct ib_recv_wr rr_recv_wr;
};
@@ -240,20 +235,20 @@ struct rpcrdma_sendctx {
* An external memory region is any buffer or page that is registered
* on the fly (ie, not pre-registered).
*/
struct rpcrdma_req;
struct rpcrdma_frwr {
struct ib_mr *fr_mr;
struct ib_cqe fr_cqe;
struct completion fr_linv_done;
struct rpcrdma_req *fr_req;
union {
struct ib_reg_wr fr_regwr;
struct ib_send_wr fr_invwr;
};
};
struct rpcrdma_req;
struct rpcrdma_mr {
struct list_head mr_list;
struct rpcrdma_req *mr_req;
struct scatterlist *mr_sg;
int mr_nents;
enum dma_data_direction mr_dir;
@@ -331,7 +326,8 @@ struct rpcrdma_req {
struct list_head rl_all;
struct kref rl_kref;
struct list_head rl_registered; /* registered segments */
struct list_head rl_free_mrs;
struct list_head rl_registered;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};
@@ -344,7 +340,7 @@ rpcr_to_rdmar(const struct rpc_rqst *rqst)
static inline void
rpcrdma_mr_push(struct rpcrdma_mr *mr, struct list_head *list)
{
list_add_tail(&mr->mr_list, list);
list_add(&mr->mr_list, list);
}
static inline struct rpcrdma_mr *
@@ -352,8 +348,9 @@ rpcrdma_mr_pop(struct list_head *list)
{
struct rpcrdma_mr *mr;
mr = list_first_entry(list, struct rpcrdma_mr, mr_list);
list_del_init(&mr->mr_list);
mr = list_first_entry_or_null(list, struct rpcrdma_mr, mr_list);
if (mr)
list_del_init(&mr->mr_list);
return mr;
}
@@ -364,19 +361,19 @@ rpcrdma_mr_pop(struct list_head *list)
* One of these is associated with a transport instance
*/
struct rpcrdma_buffer {
spinlock_t rb_mrlock; /* protect rb_mrs list */
spinlock_t rb_lock;
struct list_head rb_send_bufs;
struct list_head rb_mrs;
struct list_head rb_all;
unsigned long rb_sc_head;
unsigned long rb_sc_tail;
unsigned long rb_sc_last;
struct rpcrdma_sendctx **rb_sc_ctxs;
spinlock_t rb_lock; /* protect buf lists */
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
struct list_head rb_allreqs;
struct list_head rb_all_mrs;
struct llist_head rb_free_reps;
u32 rb_max_requests;
u32 rb_credits; /* most recent credit grant */
@@ -384,7 +381,7 @@ struct rpcrdma_buffer {
u32 rb_bc_srv_max_requests;
u32 rb_bc_max_requests;
struct delayed_work rb_refresh_worker;
struct work_struct rb_refresh_worker;
};
/*
@@ -490,7 +487,6 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt);
struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt);
void rpcrdma_mr_put(struct rpcrdma_mr *mr);
void rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr);
static inline void
rpcrdma_mr_recycle(struct rpcrdma_mr *mr)
@@ -546,6 +542,7 @@ rpcrdma_data_dir(bool writing)
/* Memory registration calls xprtrdma/frwr_ops.c
*/
bool frwr_is_supported(struct ib_device *device);
void frwr_recycle(struct rpcrdma_req *req);
void frwr_reset(struct rpcrdma_req *req);
int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep);
int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
@@ -554,7 +551,7 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt);
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_mr_seg *seg,
int nsegs, bool writing, __be32 xid,
struct rpcrdma_mr **mr);
struct rpcrdma_mr *mr);
int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);

View File

@@ -562,10 +562,14 @@ xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
printk(KERN_WARNING "Callback slot table overflowed\n");
return -ESHUTDOWN;
}
if (transport->recv.copied && !req->rq_private_buf.len)
return -ESHUTDOWN;
ret = xs_read_stream_request(transport, msg, flags, req);
if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
xprt_complete_bc_request(req, transport->recv.copied);
else
req->rq_private_buf.len = transport->recv.copied;
return ret;
}
@@ -587,7 +591,7 @@ xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
/* Look up and lock the request corresponding to the given XID */
spin_lock(&xprt->queue_lock);
req = xprt_lookup_rqst(xprt, transport->recv.xid);
if (!req) {
if (!req || (transport->recv.copied && !req->rq_private_buf.len)) {
msg->msg_flags |= MSG_TRUNC;
goto out;
}
@@ -599,6 +603,8 @@ xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
spin_lock(&xprt->queue_lock);
if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
xprt_complete_rqst(req->rq_task, transport->recv.copied);
else
req->rq_private_buf.len = transport->recv.copied;
xprt_unpin_rqst(req);
out:
spin_unlock(&xprt->queue_lock);