Merge tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  New features:
   - RDMA client backchannel from Chuck
   - Support for NFSv4.2 file CLONE using the btrfs ioctl

  Bugfixes + cleanups:
   - Move socket data receive out of the bottom halves and into a workqueue
   - Refactor NFSv4 error handling so synchronous and asynchronous RPC handles errors identically.
   - Fix a panic when blocks or object layouts reads return a bad data length
   - Fix nfsroot so it can handle a 1024 byte long path.
   - Fix bad usage of page offset in bl_read_pagelist
   - Various NFSv4 callback cleanups+fixes
   - Fix GETATTR bitmap verification
   - Support hexadecimal number for sunrpc debug sysctl files"

* tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (53 commits)
  Sunrpc: Supports hexadecimal number for sysctl files of sunrpc debug
  nfs: Fix GETATTR bitmap verification
  nfs: Remove unused xdr page offsets in getacl/setacl arguments
  fs/nfs: remove unnecessary new_valid_dev check
  SUNRPC: fix variable type
  NFS: Enable client side NFSv4.1 backchannel to use other transports
  pNFS/flexfiles: Add support for FF_FLAGS_NO_IO_THRU_MDS
  pNFS/flexfiles: When mirrored, retry failed reads by switching mirrors
  SUNRPC: Remove the TCP-only restriction in bc_svc_process()
  svcrdma: Add backward direction service for RPC/RDMA transport
  xprtrdma: Handle incoming backward direction RPC calls
  xprtrdma: Add support for sending backward direction RPC replies
  xprtrdma: Pre-allocate Work Requests for backchannel
  xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers
  SUNRPC: Abstract backchannel operations
  xprtrdma: Saving IRQs no longer needed for rb_lock
  xprtrdma: Remove reply tasklet
  xprtrdma: Use workqueue to process RPC/RDMA replies
  xprtrdma: Replace send and receive arrays
  xprtrdma: Refactor reply handler error handling
  ...
This commit is contained in:
@@ -137,6 +137,14 @@ out_free:
|
||||
* callback requests can be up to 4096 bytes in size.
|
||||
*/
|
||||
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
|
||||
{
|
||||
if (!xprt->ops->bc_setup)
|
||||
return 0;
|
||||
return xprt->ops->bc_setup(xprt, min_reqs);
|
||||
}
|
||||
EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
|
||||
|
||||
int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
|
||||
{
|
||||
struct rpc_rqst *req;
|
||||
struct list_head tmp_list;
|
||||
@@ -192,7 +200,6 @@ out_free:
|
||||
dprintk("RPC: setup backchannel transport failed\n");
|
||||
return -ENOMEM;
|
||||
}
|
||||
EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
|
||||
|
||||
/**
|
||||
* xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
|
||||
@@ -204,6 +211,13 @@ EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
|
||||
* of reqs specified by the caller.
|
||||
*/
|
||||
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
|
||||
{
|
||||
if (xprt->ops->bc_destroy)
|
||||
xprt->ops->bc_destroy(xprt, max_reqs);
|
||||
}
|
||||
EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
|
||||
|
||||
void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
|
||||
{
|
||||
struct rpc_rqst *req = NULL, *tmp = NULL;
|
||||
|
||||
@@ -227,7 +241,6 @@ out:
|
||||
dprintk("RPC: backchannel list empty= %s\n",
|
||||
list_empty(&xprt->bc_pa_list) ? "true" : "false");
|
||||
}
|
||||
EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
|
||||
|
||||
static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
|
||||
{
|
||||
@@ -264,6 +277,13 @@ void xprt_free_bc_request(struct rpc_rqst *req)
|
||||
{
|
||||
struct rpc_xprt *xprt = req->rq_xprt;
|
||||
|
||||
xprt->ops->bc_free_rqst(req);
|
||||
}
|
||||
|
||||
void xprt_free_bc_rqst(struct rpc_rqst *req)
|
||||
{
|
||||
struct rpc_xprt *xprt = req->rq_xprt;
|
||||
|
||||
dprintk("RPC: free backchannel req=%p\n", req);
|
||||
|
||||
req->rq_connect_cookie = xprt->connect_cookie - 1;
|
||||
|
@@ -1367,11 +1367,6 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
|
||||
/* reset result send buffer "put" position */
|
||||
resv->iov_len = 0;
|
||||
|
||||
if (rqstp->rq_prot != IPPROTO_TCP) {
|
||||
printk(KERN_ERR "No support for Non-TCP transports!\n");
|
||||
BUG();
|
||||
}
|
||||
|
||||
/*
|
||||
* Skip the next two words because they've already been
|
||||
* processed in the transport
|
||||
|
@@ -76,7 +76,7 @@ static int
|
||||
proc_dodebug(struct ctl_table *table, int write,
|
||||
void __user *buffer, size_t *lenp, loff_t *ppos)
|
||||
{
|
||||
char tmpbuf[20], c, *s;
|
||||
char tmpbuf[20], c, *s = NULL;
|
||||
char __user *p;
|
||||
unsigned int value;
|
||||
size_t left, len;
|
||||
@@ -103,23 +103,24 @@ proc_dodebug(struct ctl_table *table, int write,
|
||||
return -EFAULT;
|
||||
tmpbuf[left] = '\0';
|
||||
|
||||
for (s = tmpbuf, value = 0; '0' <= *s && *s <= '9'; s++, left--)
|
||||
value = 10 * value + (*s - '0');
|
||||
if (*s && !isspace(*s))
|
||||
return -EINVAL;
|
||||
while (left && isspace(*s))
|
||||
left--, s++;
|
||||
value = simple_strtol(tmpbuf, &s, 0);
|
||||
if (s) {
|
||||
left -= (s - tmpbuf);
|
||||
if (left && !isspace(*s))
|
||||
return -EINVAL;
|
||||
while (left && isspace(*s))
|
||||
left--, s++;
|
||||
} else
|
||||
left = 0;
|
||||
*(unsigned int *) table->data = value;
|
||||
/* Display the RPC tasks on writing to rpc_debug */
|
||||
if (strcmp(table->procname, "rpc_debug") == 0)
|
||||
rpc_show_tasks(&init_net);
|
||||
} else {
|
||||
if (!access_ok(VERIFY_WRITE, buffer, left))
|
||||
return -EFAULT;
|
||||
len = sprintf(tmpbuf, "%d", *(unsigned int *) table->data);
|
||||
len = sprintf(tmpbuf, "0x%04x", *(unsigned int *) table->data);
|
||||
if (len > left)
|
||||
len = left;
|
||||
if (__copy_to_user(buffer, tmpbuf, len))
|
||||
if (copy_to_user(buffer, tmpbuf, len))
|
||||
return -EFAULT;
|
||||
if ((left -= len) > 0) {
|
||||
if (put_user('\n', (char __user *)buffer + len))
|
||||
|
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
|
||||
svc_rdma.o svc_rdma_transport.o \
|
||||
svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
|
||||
module.o
|
||||
rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
|
||||
|
394
net/sunrpc/xprtrdma/backchannel.c
Normal file
394
net/sunrpc/xprtrdma/backchannel.c
Normal file
@@ -0,0 +1,394 @@
|
||||
/*
|
||||
* Copyright (c) 2015 Oracle. All rights reserved.
|
||||
*
|
||||
* Support for backward direction RPCs on RPC/RDMA.
|
||||
*/
|
||||
|
||||
#include <linux/module.h>
|
||||
#include <linux/sunrpc/xprt.h>
|
||||
#include <linux/sunrpc/svc.h>
|
||||
#include <linux/sunrpc/svc_xprt.h>
|
||||
|
||||
#include "xprt_rdma.h"
|
||||
|
||||
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
|
||||
# define RPCDBG_FACILITY RPCDBG_TRANS
|
||||
#endif
|
||||
|
||||
#define RPCRDMA_BACKCHANNEL_DEBUG
|
||||
|
||||
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
|
||||
struct rpc_rqst *rqst)
|
||||
{
|
||||
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
|
||||
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
|
||||
|
||||
spin_lock(&buf->rb_reqslock);
|
||||
list_del(&req->rl_all);
|
||||
spin_unlock(&buf->rb_reqslock);
|
||||
|
||||
rpcrdma_destroy_req(&r_xprt->rx_ia, req);
|
||||
|
||||
kfree(rqst);
|
||||
}
|
||||
|
||||
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
|
||||
struct rpc_rqst *rqst)
|
||||
{
|
||||
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
|
||||
struct rpcrdma_regbuf *rb;
|
||||
struct rpcrdma_req *req;
|
||||
struct xdr_buf *buf;
|
||||
size_t size;
|
||||
|
||||
req = rpcrdma_create_req(r_xprt);
|
||||
if (!req)
|
||||
return -ENOMEM;
|
||||
req->rl_backchannel = true;
|
||||
|
||||
size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
|
||||
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
|
||||
if (IS_ERR(rb))
|
||||
goto out_fail;
|
||||
req->rl_rdmabuf = rb;
|
||||
|
||||
size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
|
||||
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
|
||||
if (IS_ERR(rb))
|
||||
goto out_fail;
|
||||
rb->rg_owner = req;
|
||||
req->rl_sendbuf = rb;
|
||||
/* so that rpcr_to_rdmar works when receiving a request */
|
||||
rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
|
||||
|
||||
buf = &rqst->rq_snd_buf;
|
||||
buf->head[0].iov_base = rqst->rq_buffer;
|
||||
buf->head[0].iov_len = 0;
|
||||
buf->tail[0].iov_base = NULL;
|
||||
buf->tail[0].iov_len = 0;
|
||||
buf->page_len = 0;
|
||||
buf->len = 0;
|
||||
buf->buflen = size;
|
||||
|
||||
return 0;
|
||||
|
||||
out_fail:
|
||||
rpcrdma_bc_free_rqst(r_xprt, rqst);
|
||||
return -ENOMEM;
|
||||
}
|
||||
|
||||
/* Allocate and add receive buffers to the rpcrdma_buffer's
|
||||
* existing list of rep's. These are released when the
|
||||
* transport is destroyed.
|
||||
*/
|
||||
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
|
||||
unsigned int count)
|
||||
{
|
||||
struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
|
||||
struct rpcrdma_rep *rep;
|
||||
unsigned long flags;
|
||||
int rc = 0;
|
||||
|
||||
while (count--) {
|
||||
rep = rpcrdma_create_rep(r_xprt);
|
||||
if (IS_ERR(rep)) {
|
||||
pr_err("RPC: %s: reply buffer alloc failed\n",
|
||||
__func__);
|
||||
rc = PTR_ERR(rep);
|
||||
break;
|
||||
}
|
||||
|
||||
spin_lock_irqsave(&buffers->rb_lock, flags);
|
||||
list_add(&rep->rr_list, &buffers->rb_recv_bufs);
|
||||
spin_unlock_irqrestore(&buffers->rb_lock, flags);
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
/**
|
||||
* xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
|
||||
* @xprt: transport associated with these backchannel resources
|
||||
* @reqs: number of concurrent incoming requests to expect
|
||||
*
|
||||
* Returns 0 on success; otherwise a negative errno
|
||||
*/
|
||||
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
|
||||
{
|
||||
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
|
||||
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
|
||||
struct rpc_rqst *rqst;
|
||||
unsigned int i;
|
||||
int rc;
|
||||
|
||||
/* The backchannel reply path returns each rpc_rqst to the
|
||||
* bc_pa_list _after_ the reply is sent. If the server is
|
||||
* faster than the client, it can send another backward
|
||||
* direction request before the rpc_rqst is returned to the
|
||||
* list. The client rejects the request in this case.
|
||||
*
|
||||
* Twice as many rpc_rqsts are prepared to ensure there is
|
||||
* always an rpc_rqst available as soon as a reply is sent.
|
||||
*/
|
||||
if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
|
||||
goto out_err;
|
||||
|
||||
for (i = 0; i < (reqs << 1); i++) {
|
||||
rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
|
||||
if (!rqst) {
|
||||
pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
|
||||
__func__);
|
||||
goto out_free;
|
||||
}
|
||||
|
||||
rqst->rq_xprt = &r_xprt->rx_xprt;
|
||||
INIT_LIST_HEAD(&rqst->rq_list);
|
||||
INIT_LIST_HEAD(&rqst->rq_bc_list);
|
||||
|
||||
if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
|
||||
goto out_free;
|
||||
|
||||
spin_lock_bh(&xprt->bc_pa_lock);
|
||||
list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
|
||||
spin_unlock_bh(&xprt->bc_pa_lock);
|
||||
}
|
||||
|
||||
rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
|
||||
if (rc)
|
||||
goto out_free;
|
||||
|
||||
rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
|
||||
if (rc)
|
||||
goto out_free;
|
||||
|
||||
buffer->rb_bc_srv_max_requests = reqs;
|
||||
request_module("svcrdma");
|
||||
|
||||
return 0;
|
||||
|
||||
out_free:
|
||||
xprt_rdma_bc_destroy(xprt, reqs);
|
||||
|
||||
out_err:
|
||||
pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
|
||||
return -ENOMEM;
|
||||
}
|
||||
|
||||
/**
|
||||
* xprt_rdma_bc_up - Create transport endpoint for backchannel service
|
||||
* @serv: server endpoint
|
||||
* @net: network namespace
|
||||
*
|
||||
* The "xprt" is an implied argument: it supplies the name of the
|
||||
* backchannel transport class.
|
||||
*
|
||||
* Returns zero on success, negative errno on failure
|
||||
*/
|
||||
int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
|
||||
{
|
||||
int ret;
|
||||
|
||||
ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* rpcrdma_bc_marshal_reply - Send backwards direction reply
|
||||
* @rqst: buffer containing RPC reply data
|
||||
*
|
||||
* Returns zero on success.
|
||||
*/
|
||||
int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
|
||||
{
|
||||
struct rpc_xprt *xprt = rqst->rq_xprt;
|
||||
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
|
||||
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
|
||||
struct rpcrdma_msg *headerp;
|
||||
size_t rpclen;
|
||||
|
||||
headerp = rdmab_to_msg(req->rl_rdmabuf);
|
||||
headerp->rm_xid = rqst->rq_xid;
|
||||
headerp->rm_vers = rpcrdma_version;
|
||||
headerp->rm_credit =
|
||||
cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
|
||||
headerp->rm_type = rdma_msg;
|
||||
headerp->rm_body.rm_chunks[0] = xdr_zero;
|
||||
headerp->rm_body.rm_chunks[1] = xdr_zero;
|
||||
headerp->rm_body.rm_chunks[2] = xdr_zero;
|
||||
|
||||
rpclen = rqst->rq_svec[0].iov_len;
|
||||
|
||||
pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
|
||||
__func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
|
||||
pr_info("RPC: %s: RPC/RDMA: %*ph\n",
|
||||
__func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
|
||||
pr_info("RPC: %s: RPC: %*ph\n",
|
||||
__func__, (int)rpclen, rqst->rq_svec[0].iov_base);
|
||||
|
||||
req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
|
||||
req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
|
||||
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
|
||||
|
||||
req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
|
||||
req->rl_send_iov[1].length = rpclen;
|
||||
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
|
||||
|
||||
req->rl_niovs = 2;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* xprt_rdma_bc_destroy - Release resources for handling backchannel requests
|
||||
* @xprt: transport associated with these backchannel resources
|
||||
* @reqs: number of incoming requests to destroy; ignored
|
||||
*/
|
||||
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
|
||||
{
|
||||
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
|
||||
struct rpc_rqst *rqst, *tmp;
|
||||
|
||||
spin_lock_bh(&xprt->bc_pa_lock);
|
||||
list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
|
||||
list_del(&rqst->rq_bc_pa_list);
|
||||
spin_unlock_bh(&xprt->bc_pa_lock);
|
||||
|
||||
rpcrdma_bc_free_rqst(r_xprt, rqst);
|
||||
|
||||
spin_lock_bh(&xprt->bc_pa_lock);
|
||||
}
|
||||
spin_unlock_bh(&xprt->bc_pa_lock);
|
||||
}
|
||||
|
||||
/**
|
||||
* xprt_rdma_bc_free_rqst - Release a backchannel rqst
|
||||
* @rqst: request to release
|
||||
*/
|
||||
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
|
||||
{
|
||||
struct rpc_xprt *xprt = rqst->rq_xprt;
|
||||
|
||||
smp_mb__before_atomic();
|
||||
WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
|
||||
clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
|
||||
smp_mb__after_atomic();
|
||||
|
||||
spin_lock_bh(&xprt->bc_pa_lock);
|
||||
list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
|
||||
spin_unlock_bh(&xprt->bc_pa_lock);
|
||||
}
|
||||
|
||||
/**
|
||||
* rpcrdma_bc_receive_call - Handle a backward direction call
|
||||
* @xprt: transport receiving the call
|
||||
* @rep: receive buffer containing the call
|
||||
*
|
||||
* Called in the RPC reply handler, which runs in a tasklet.
|
||||
* Be quick about it.
|
||||
*
|
||||
* Operational assumptions:
|
||||
* o Backchannel credits are ignored, just as the NFS server
|
||||
* forechannel currently does
|
||||
* o The ULP manages a replay cache (eg, NFSv4.1 sessions).
|
||||
* No replay detection is done at the transport level
|
||||
*/
|
||||
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
|
||||
struct rpcrdma_rep *rep)
|
||||
{
|
||||
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
|
||||
struct rpcrdma_msg *headerp;
|
||||
struct svc_serv *bc_serv;
|
||||
struct rpcrdma_req *req;
|
||||
struct rpc_rqst *rqst;
|
||||
struct xdr_buf *buf;
|
||||
size_t size;
|
||||
__be32 *p;
|
||||
|
||||
headerp = rdmab_to_msg(rep->rr_rdmabuf);
|
||||
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
|
||||
pr_info("RPC: %s: callback XID %08x, length=%u\n",
|
||||
__func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
|
||||
pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp);
|
||||
#endif
|
||||
|
||||
/* Sanity check:
|
||||
* Need at least enough bytes for RPC/RDMA header, as code
|
||||
* here references the header fields by array offset. Also,
|
||||
* backward calls are always inline, so ensure there
|
||||
* are some bytes beyond the RPC/RDMA header.
|
||||
*/
|
||||
if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
|
||||
goto out_short;
|
||||
p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
|
||||
size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
|
||||
|
||||
/* Grab a free bc rqst */
|
||||
spin_lock(&xprt->bc_pa_lock);
|
||||
if (list_empty(&xprt->bc_pa_list)) {
|
||||
spin_unlock(&xprt->bc_pa_lock);
|
||||
goto out_overflow;
|
||||
}
|
||||
rqst = list_first_entry(&xprt->bc_pa_list,
|
||||
struct rpc_rqst, rq_bc_pa_list);
|
||||
list_del(&rqst->rq_bc_pa_list);
|
||||
spin_unlock(&xprt->bc_pa_lock);
|
||||
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
|
||||
pr_info("RPC: %s: using rqst %p\n", __func__, rqst);
|
||||
#endif
|
||||
|
||||
/* Prepare rqst */
|
||||
rqst->rq_reply_bytes_recvd = 0;
|
||||
rqst->rq_bytes_sent = 0;
|
||||
rqst->rq_xid = headerp->rm_xid;
|
||||
set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
|
||||
|
||||
buf = &rqst->rq_rcv_buf;
|
||||
memset(buf, 0, sizeof(*buf));
|
||||
buf->head[0].iov_base = p;
|
||||
buf->head[0].iov_len = size;
|
||||
buf->len = size;
|
||||
|
||||
/* The receive buffer has to be hooked to the rpcrdma_req
|
||||
* so that it can be reposted after the server is done
|
||||
* parsing it but just before sending the backward
|
||||
* direction reply.
|
||||
*/
|
||||
req = rpcr_to_rdmar(rqst);
|
||||
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
|
||||
pr_info("RPC: %s: attaching rep %p to req %p\n",
|
||||
__func__, rep, req);
|
||||
#endif
|
||||
req->rl_reply = rep;
|
||||
|
||||
/* Defeat the retransmit detection logic in send_request */
|
||||
req->rl_connect_cookie = 0;
|
||||
|
||||
/* Queue rqst for ULP's callback service */
|
||||
bc_serv = xprt->bc_serv;
|
||||
spin_lock(&bc_serv->sv_cb_lock);
|
||||
list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
|
||||
spin_unlock(&bc_serv->sv_cb_lock);
|
||||
|
||||
wake_up(&bc_serv->sv_cb_waitq);
|
||||
|
||||
r_xprt->rx_stats.bcall_count++;
|
||||
return;
|
||||
|
||||
out_overflow:
|
||||
pr_warn("RPC/RDMA backchannel overflow\n");
|
||||
xprt_disconnect_done(xprt);
|
||||
/* This receive buffer gets reposted automatically
|
||||
* when the connection is re-established.
|
||||
*/
|
||||
return;
|
||||
|
||||
out_short:
|
||||
pr_warn("RPC/RDMA short backward direction call\n");
|
||||
|
||||
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
|
||||
xprt_disconnect_done(xprt);
|
||||
else
|
||||
pr_warn("RPC: %s: reposting rep %p\n",
|
||||
__func__, rep);
|
||||
}
|
@@ -256,8 +256,11 @@ frwr_sendcompletion(struct ib_wc *wc)
|
||||
|
||||
/* WARNING: Only wr_id and status are reliable at this point */
|
||||
r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
|
||||
pr_warn("RPC: %s: frmr %p flushed, status %s (%d)\n",
|
||||
__func__, r, ib_wc_status_msg(wc->status), wc->status);
|
||||
if (wc->status == IB_WC_WR_FLUSH_ERR)
|
||||
dprintk("RPC: %s: frmr %p flushed\n", __func__, r);
|
||||
else
|
||||
pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
|
||||
__func__, r, ib_wc_status_msg(wc->status), wc->status);
|
||||
r->r.frmr.fr_state = FRMR_IS_STALE;
|
||||
}
|
||||
|
||||
|
@@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
|
||||
enum rpcrdma_chunktype rtype, wtype;
|
||||
struct rpcrdma_msg *headerp;
|
||||
|
||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
|
||||
if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
|
||||
return rpcrdma_bc_marshal_reply(rqst);
|
||||
#endif
|
||||
|
||||
/*
|
||||
* rpclen gets amount of data in first buffer, which is the
|
||||
* pre-registered buffer.
|
||||
@@ -711,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
}
|
||||
|
||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive as rdma_msg type messages
 * with all three chunk lists empty. The RPC/RDMA header is then small
 * and of fixed size, so the RPC header that follows it sits at a known
 * word offset and its direction field is easy to check.
 *
 * NOTE(review): this reads the 32-bit words at indices 7 and 8 of the
 * message; confirm the caller guarantees the received length covers
 * them.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
{
	__be32 *words = (__be32 *)headerp;
	unsigned int i;

	if (headerp->rm_type != rdma_msg)
		return false;

	/* No chunk list may be populated */
	for (i = 0; i < 3; i++) {
		if (headerp->rm_body.rm_chunks[i] != xdr_zero)
			return false;
	}

	/* sanity: the word after the header must repeat the XID,
	 * and the word after that must carry the call direction
	 */
	return words[7] == headerp->rm_xid &&
	       words[8] == cpu_to_be32(RPC_CALL);
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
|
||||
|
||||
/*
|
||||
* This function is called when an async event is posted to
|
||||
* the connection which changes the connection state. All it
|
||||
@@ -723,8 +759,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
|
||||
schedule_delayed_work(&ep->rep_connect_worker, 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Called as a tasklet to do req/reply match and complete a request
|
||||
/* Process received RPC/RDMA messages.
|
||||
*
|
||||
* Errors must result in the RPC task either being awakened, or
|
||||
* allowed to timeout, to discover the errors at that time.
|
||||
*/
|
||||
@@ -741,52 +777,32 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
|
||||
unsigned long cwnd;
|
||||
u32 credits;
|
||||
|
||||
/* Check status. If bad, signal disconnect and return rep to pool */
|
||||
if (rep->rr_len == ~0U) {
|
||||
rpcrdma_recv_buffer_put(rep);
|
||||
if (r_xprt->rx_ep.rep_connected == 1) {
|
||||
r_xprt->rx_ep.rep_connected = -EIO;
|
||||
rpcrdma_conn_func(&r_xprt->rx_ep);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
|
||||
dprintk("RPC: %s: short/invalid reply\n", __func__);
|
||||
goto repost;
|
||||
}
|
||||
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
|
||||
|
||||
if (rep->rr_len == RPCRDMA_BAD_LEN)
|
||||
goto out_badstatus;
|
||||
if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
|
||||
goto out_shortreply;
|
||||
|
||||
headerp = rdmab_to_msg(rep->rr_rdmabuf);
|
||||
if (headerp->rm_vers != rpcrdma_version) {
|
||||
dprintk("RPC: %s: invalid version %d\n",
|
||||
__func__, be32_to_cpu(headerp->rm_vers));
|
||||
goto repost;
|
||||
}
|
||||
if (headerp->rm_vers != rpcrdma_version)
|
||||
goto out_badversion;
|
||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
|
||||
if (rpcrdma_is_bcall(headerp))
|
||||
goto out_bcall;
|
||||
#endif
|
||||
|
||||
/* Get XID and try for a match. */
|
||||
spin_lock(&xprt->transport_lock);
|
||||
/* Match incoming rpcrdma_rep to an rpcrdma_req to
|
||||
* get context for handling any incoming chunks.
|
||||
*/
|
||||
spin_lock_bh(&xprt->transport_lock);
|
||||
rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
|
||||
if (rqst == NULL) {
|
||||
spin_unlock(&xprt->transport_lock);
|
||||
dprintk("RPC: %s: reply 0x%p failed "
|
||||
"to match any request xid 0x%08x len %d\n",
|
||||
__func__, rep, be32_to_cpu(headerp->rm_xid),
|
||||
rep->rr_len);
|
||||
repost:
|
||||
r_xprt->rx_stats.bad_reply_count++;
|
||||
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
|
||||
rpcrdma_recv_buffer_put(rep);
|
||||
if (!rqst)
|
||||
goto out_nomatch;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/* get request object */
|
||||
req = rpcr_to_rdmar(rqst);
|
||||
if (req->rl_reply) {
|
||||
spin_unlock(&xprt->transport_lock);
|
||||
dprintk("RPC: %s: duplicate reply 0x%p to RPC "
|
||||
"request 0x%p: xid 0x%08x\n", __func__, rep, req,
|
||||
be32_to_cpu(headerp->rm_xid));
|
||||
goto repost;
|
||||
}
|
||||
if (req->rl_reply)
|
||||
goto out_duplicate;
|
||||
|
||||
dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
|
||||
" RPC request 0x%p xid 0x%08x\n",
|
||||
@@ -883,8 +899,50 @@ badheader:
|
||||
if (xprt->cwnd > cwnd)
|
||||
xprt_release_rqst_cong(rqst->rq_task);
|
||||
|
||||
xprt_complete_rqst(rqst->rq_task, status);
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
|
||||
__func__, xprt, rqst, status);
|
||||
xprt_complete_rqst(rqst->rq_task, status);
|
||||
spin_unlock(&xprt->transport_lock);
|
||||
return;
|
||||
|
||||
out_badstatus:
|
||||
rpcrdma_recv_buffer_put(rep);
|
||||
if (r_xprt->rx_ep.rep_connected == 1) {
|
||||
r_xprt->rx_ep.rep_connected = -EIO;
|
||||
rpcrdma_conn_func(&r_xprt->rx_ep);
|
||||
}
|
||||
return;
|
||||
|
||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
|
||||
out_bcall:
|
||||
rpcrdma_bc_receive_call(r_xprt, rep);
|
||||
return;
|
||||
#endif
|
||||
|
||||
out_shortreply:
|
||||
dprintk("RPC: %s: short/invalid reply\n", __func__);
|
||||
goto repost;
|
||||
|
||||
out_badversion:
|
||||
dprintk("RPC: %s: invalid version %d\n",
|
||||
__func__, be32_to_cpu(headerp->rm_vers));
|
||||
goto repost;
|
||||
|
||||
out_nomatch:
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
|
||||
__func__, be32_to_cpu(headerp->rm_xid),
|
||||
rep->rr_len);
|
||||
goto repost;
|
||||
|
||||
out_duplicate:
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
dprintk("RPC: %s: "
|
||||
"duplicate reply %p to RPC request %p: xid 0x%08x\n",
|
||||
__func__, rep, req, be32_to_cpu(headerp->rm_xid));
|
||||
|
||||
repost:
|
||||
r_xprt->rx_stats.bad_reply_count++;
|
||||
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
|
||||
rpcrdma_recv_buffer_put(rep);
|
||||
}
|
||||
|
@@ -239,6 +239,9 @@ void svc_rdma_cleanup(void)
|
||||
unregister_sysctl_table(svcrdma_table_header);
|
||||
svcrdma_table_header = NULL;
|
||||
}
|
||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
|
||||
svc_unreg_xprt_class(&svc_rdma_bc_class);
|
||||
#endif
|
||||
svc_unreg_xprt_class(&svc_rdma_class);
|
||||
kmem_cache_destroy(svc_rdma_map_cachep);
|
||||
kmem_cache_destroy(svc_rdma_ctxt_cachep);
|
||||
@@ -286,6 +289,9 @@ int svc_rdma_init(void)
|
||||
|
||||
/* Register RDMA with the SVC transport switch */
|
||||
svc_reg_xprt_class(&svc_rdma_class);
|
||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
|
||||
svc_reg_xprt_class(&svc_rdma_bc_class);
|
||||
#endif
|
||||
return 0;
|
||||
err1:
|
||||
kmem_cache_destroy(svc_rdma_map_cachep);
|
||||
|
@@ -56,6 +56,7 @@
|
||||
|
||||
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
|
||||
|
||||
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
|
||||
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
|
||||
struct net *net,
|
||||
struct sockaddr *sa, int salen,
|
||||
@@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = {
|
||||
.xcl_ident = XPRT_TRANSPORT_RDMA,
|
||||
};
|
||||
|
||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
|
||||
static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
|
||||
struct sockaddr *, int, int);
|
||||
static void svc_rdma_bc_detach(struct svc_xprt *);
|
||||
static void svc_rdma_bc_free(struct svc_xprt *);
|
||||
|
||||
static struct svc_xprt_ops svc_rdma_bc_ops = {
|
||||
.xpo_create = svc_rdma_bc_create,
|
||||
.xpo_detach = svc_rdma_bc_detach,
|
||||
.xpo_free = svc_rdma_bc_free,
|
||||
.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
|
||||
.xpo_secure_port = svc_rdma_secure_port,
|
||||
};
|
||||
|
||||
struct svc_xprt_class svc_rdma_bc_class = {
|
||||
.xcl_name = "rdma-bc",
|
||||
.xcl_owner = THIS_MODULE,
|
||||
.xcl_ops = &svc_rdma_bc_ops,
|
||||
.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
|
||||
};
|
||||
|
||||
static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
|
||||
struct net *net,
|
||||
struct sockaddr *sa, int salen,
|
||||
int flags)
|
||||
{
|
||||
struct svcxprt_rdma *cma_xprt;
|
||||
struct svc_xprt *xprt;
|
||||
|
||||
cma_xprt = rdma_create_xprt(serv, 0);
|
||||
if (!cma_xprt)
|
||||
return ERR_PTR(-ENOMEM);
|
||||
xprt = &cma_xprt->sc_xprt;
|
||||
|
||||
svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
|
||||
serv->sv_bc_xprt = xprt;
|
||||
|
||||
dprintk("svcrdma: %s(%p)\n", __func__, xprt);
|
||||
return xprt;
|
||||
}
|
||||
|
||||
/* xpo_detach method for the "rdma-bc" class: nothing to tear down
 * here, only a debug trace is emitted.
 */
static void svc_rdma_bc_detach(struct svc_xprt *xprt)
{
	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
}
|
||||
|
||||
static void svc_rdma_bc_free(struct svc_xprt *xprt)
|
||||
{
|
||||
struct svcxprt_rdma *rdma =
|
||||
container_of(xprt, struct svcxprt_rdma, sc_xprt);
|
||||
|
||||
dprintk("svcrdma: %s(%p)\n", __func__, xprt);
|
||||
if (xprt)
|
||||
kfree(rdma);
|
||||
}
|
||||
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
|
||||
|
||||
struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
|
||||
{
|
||||
struct svc_rdma_op_ctxt *ctxt;
|
||||
|
@@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
|
||||
static int
|
||||
xprt_rdma_enable_swap(struct rpc_xprt *xprt)
|
||||
{
|
||||
return -EINVAL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -705,7 +705,13 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
|
||||
.print_stats = xprt_rdma_print_stats,
|
||||
.enable_swap = xprt_rdma_enable_swap,
|
||||
.disable_swap = xprt_rdma_disable_swap,
|
||||
.inject_disconnect = xprt_rdma_inject_disconnect
|
||||
.inject_disconnect = xprt_rdma_inject_disconnect,
|
||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
|
||||
.bc_setup = xprt_rdma_bc_setup,
|
||||
.bc_up = xprt_rdma_bc_up,
|
||||
.bc_free_rqst = xprt_rdma_bc_free_rqst,
|
||||
.bc_destroy = xprt_rdma_bc_destroy,
|
||||
#endif
|
||||
};
|
||||
|
||||
static struct xprt_class xprt_rdma = {
|
||||
@@ -732,6 +738,7 @@ void xprt_rdma_cleanup(void)
|
||||
dprintk("RPC: %s: xprt_unregister returned %i\n",
|
||||
__func__, rc);
|
||||
|
||||
rpcrdma_destroy_wq();
|
||||
frwr_destroy_recovery_wq();
|
||||
}
|
||||
|
||||
@@ -743,8 +750,15 @@ int xprt_rdma_init(void)
|
||||
if (rc)
|
||||
return rc;
|
||||
|
||||
rc = rpcrdma_alloc_wq();
|
||||
if (rc) {
|
||||
frwr_destroy_recovery_wq();
|
||||
return rc;
|
||||
}
|
||||
|
||||
rc = xprt_register_transport(&xprt_rdma);
|
||||
if (rc) {
|
||||
rpcrdma_destroy_wq();
|
||||
frwr_destroy_recovery_wq();
|
||||
return rc;
|
||||
}
|
||||
|
@@ -68,47 +68,33 @@
|
||||
* internal functions
|
||||
*/
|
||||
|
||||
/*
|
||||
* handle replies in tasklet context, using a single, global list
|
||||
* rdma tasklet function -- just turn around and call the func
|
||||
* for all replies on the list
|
||||
*/
|
||||
static struct workqueue_struct *rpcrdma_receive_wq;
|
||||
|
||||
static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
|
||||
static LIST_HEAD(rpcrdma_tasklets_g);
|
||||
|
||||
static void
|
||||
rpcrdma_run_tasklet(unsigned long data)
|
||||
int
|
||||
rpcrdma_alloc_wq(void)
|
||||
{
|
||||
struct rpcrdma_rep *rep;
|
||||
unsigned long flags;
|
||||
struct workqueue_struct *recv_wq;
|
||||
|
||||
data = data;
|
||||
spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
|
||||
while (!list_empty(&rpcrdma_tasklets_g)) {
|
||||
rep = list_entry(rpcrdma_tasklets_g.next,
|
||||
struct rpcrdma_rep, rr_list);
|
||||
list_del(&rep->rr_list);
|
||||
spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
|
||||
recv_wq = alloc_workqueue("xprtrdma_receive",
|
||||
WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
|
||||
0);
|
||||
if (!recv_wq)
|
||||
return -ENOMEM;
|
||||
|
||||
rpcrdma_reply_handler(rep);
|
||||
|
||||
spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
|
||||
}
|
||||
spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
|
||||
rpcrdma_receive_wq = recv_wq;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
|
||||
|
||||
static void
|
||||
rpcrdma_schedule_tasklet(struct list_head *sched_list)
|
||||
void
|
||||
rpcrdma_destroy_wq(void)
|
||||
{
|
||||
unsigned long flags;
|
||||
struct workqueue_struct *wq;
|
||||
|
||||
spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
|
||||
list_splice_tail(sched_list, &rpcrdma_tasklets_g);
|
||||
spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
|
||||
tasklet_schedule(&rpcrdma_tasklet_g);
|
||||
if (rpcrdma_receive_wq) {
|
||||
wq = rpcrdma_receive_wq;
|
||||
rpcrdma_receive_wq = NULL;
|
||||
destroy_workqueue(wq);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -158,63 +144,54 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
|
||||
/* The common case is that a single send completion is waiting. By
|
||||
* passing two WC entries to ib_poll_cq, a return code of 1
|
||||
* means there is exactly one WC waiting and no more. We don't
|
||||
* have to invoke ib_poll_cq again to know that the CQ has been
|
||||
* properly drained.
|
||||
*/
|
||||
static void
|
||||
rpcrdma_sendcq_poll(struct ib_cq *cq)
|
||||
{
|
||||
struct ib_wc *wcs;
|
||||
int budget, count, rc;
|
||||
struct ib_wc *pos, wcs[2];
|
||||
int count, rc;
|
||||
|
||||
budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
|
||||
do {
|
||||
wcs = ep->rep_send_wcs;
|
||||
pos = wcs;
|
||||
|
||||
rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
|
||||
if (rc <= 0)
|
||||
return rc;
|
||||
rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
|
||||
if (rc < 0)
|
||||
break;
|
||||
|
||||
count = rc;
|
||||
while (count-- > 0)
|
||||
rpcrdma_sendcq_process_wc(wcs++);
|
||||
} while (rc == RPCRDMA_POLLSIZE && --budget);
|
||||
return 0;
|
||||
rpcrdma_sendcq_process_wc(pos++);
|
||||
} while (rc == ARRAY_SIZE(wcs));
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle send, fast_reg_mr, and local_inv completions.
|
||||
*
|
||||
* Send events are typically suppressed and thus do not result
|
||||
* in an upcall. Occasionally one is signaled, however. This
|
||||
* prevents the provider's completion queue from wrapping and
|
||||
* losing a completion.
|
||||
/* Handle provider send completion upcalls.
|
||||
*/
|
||||
static void
|
||||
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
|
||||
{
|
||||
struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
|
||||
int rc;
|
||||
|
||||
rc = rpcrdma_sendcq_poll(cq, ep);
|
||||
if (rc) {
|
||||
dprintk("RPC: %s: ib_poll_cq failed: %i\n",
|
||||
__func__, rc);
|
||||
return;
|
||||
}
|
||||
|
||||
rc = ib_req_notify_cq(cq,
|
||||
IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
|
||||
if (rc == 0)
|
||||
return;
|
||||
if (rc < 0) {
|
||||
dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
|
||||
__func__, rc);
|
||||
return;
|
||||
}
|
||||
|
||||
rpcrdma_sendcq_poll(cq, ep);
|
||||
do {
|
||||
rpcrdma_sendcq_poll(cq);
|
||||
} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
|
||||
IB_CQ_REPORT_MISSED_EVENTS) > 0);
|
||||
}
|
||||
|
||||
static void
|
||||
rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
|
||||
rpcrdma_receive_worker(struct work_struct *work)
|
||||
{
|
||||
struct rpcrdma_rep *rep =
|
||||
container_of(work, struct rpcrdma_rep, rr_work);
|
||||
|
||||
rpcrdma_reply_handler(rep);
|
||||
}
|
||||
|
||||
static void
|
||||
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
|
||||
{
|
||||
struct rpcrdma_rep *rep =
|
||||
(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
|
||||
@@ -237,91 +214,60 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
|
||||
prefetch(rdmab_to_msg(rep->rr_rdmabuf));
|
||||
|
||||
out_schedule:
|
||||
list_add_tail(&rep->rr_list, sched_list);
|
||||
queue_work(rpcrdma_receive_wq, &rep->rr_work);
|
||||
return;
|
||||
|
||||
out_fail:
|
||||
if (wc->status != IB_WC_WR_FLUSH_ERR)
|
||||
pr_err("RPC: %s: rep %p: %s\n",
|
||||
__func__, rep, ib_wc_status_msg(wc->status));
|
||||
rep->rr_len = ~0U;
|
||||
rep->rr_len = RPCRDMA_BAD_LEN;
|
||||
goto out_schedule;
|
||||
}
|
||||
|
||||
static int
|
||||
rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
|
||||
/* The wc array is on the stack: automatic memory is always CPU-local.
|
||||
*
|
||||
* struct ib_wc is 64 bytes, making the poll array potentially
|
||||
* large. But this is at the bottom of the call chain. Further
|
||||
* substantial work is done in another thread.
|
||||
*/
|
||||
static void
|
||||
rpcrdma_recvcq_poll(struct ib_cq *cq)
|
||||
{
|
||||
struct list_head sched_list;
|
||||
struct ib_wc *wcs;
|
||||
int budget, count, rc;
|
||||
struct ib_wc *pos, wcs[4];
|
||||
int count, rc;
|
||||
|
||||
INIT_LIST_HEAD(&sched_list);
|
||||
budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
|
||||
do {
|
||||
wcs = ep->rep_recv_wcs;
|
||||
pos = wcs;
|
||||
|
||||
rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
|
||||
if (rc <= 0)
|
||||
goto out_schedule;
|
||||
rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
|
||||
if (rc < 0)
|
||||
break;
|
||||
|
||||
count = rc;
|
||||
while (count-- > 0)
|
||||
rpcrdma_recvcq_process_wc(wcs++, &sched_list);
|
||||
} while (rc == RPCRDMA_POLLSIZE && --budget);
|
||||
rc = 0;
|
||||
|
||||
out_schedule:
|
||||
rpcrdma_schedule_tasklet(&sched_list);
|
||||
return rc;
|
||||
rpcrdma_recvcq_process_wc(pos++);
|
||||
} while (rc == ARRAY_SIZE(wcs));
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle receive completions.
|
||||
*
|
||||
* It is reentrant but processes single events in order to maintain
|
||||
* ordering of receives to keep server credits.
|
||||
*
|
||||
* It is the responsibility of the scheduled tasklet to return
|
||||
* recv buffers to the pool. NOTE: this affects synchronization of
|
||||
* connection shutdown. That is, the structures required for
|
||||
* the completion of the reply handler must remain intact until
|
||||
* all memory has been reclaimed.
|
||||
/* Handle provider receive completion upcalls.
|
||||
*/
|
||||
static void
|
||||
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
|
||||
{
|
||||
struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
|
||||
int rc;
|
||||
|
||||
rc = rpcrdma_recvcq_poll(cq, ep);
|
||||
if (rc) {
|
||||
dprintk("RPC: %s: ib_poll_cq failed: %i\n",
|
||||
__func__, rc);
|
||||
return;
|
||||
}
|
||||
|
||||
rc = ib_req_notify_cq(cq,
|
||||
IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
|
||||
if (rc == 0)
|
||||
return;
|
||||
if (rc < 0) {
|
||||
dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
|
||||
__func__, rc);
|
||||
return;
|
||||
}
|
||||
|
||||
rpcrdma_recvcq_poll(cq, ep);
|
||||
do {
|
||||
rpcrdma_recvcq_poll(cq);
|
||||
} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
|
||||
IB_CQ_REPORT_MISSED_EVENTS) > 0);
|
||||
}
|
||||
|
||||
static void
|
||||
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
|
||||
{
|
||||
struct ib_wc wc;
|
||||
LIST_HEAD(sched_list);
|
||||
|
||||
while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
|
||||
rpcrdma_recvcq_process_wc(&wc, &sched_list);
|
||||
if (!list_empty(&sched_list))
|
||||
rpcrdma_schedule_tasklet(&sched_list);
|
||||
rpcrdma_recvcq_process_wc(&wc);
|
||||
while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
|
||||
rpcrdma_sendcq_process_wc(&wc);
|
||||
}
|
||||
@@ -623,6 +569,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
|
||||
struct ib_device_attr *devattr = &ia->ri_devattr;
|
||||
struct ib_cq *sendcq, *recvcq;
|
||||
struct ib_cq_init_attr cq_attr = {};
|
||||
unsigned int max_qp_wr;
|
||||
int rc, err;
|
||||
|
||||
if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
|
||||
@@ -631,18 +578,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
|
||||
return -ENOMEM;
|
||||
}
|
||||
|
||||
if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
|
||||
dprintk("RPC: %s: insufficient wqe's available\n",
|
||||
__func__);
|
||||
return -ENOMEM;
|
||||
}
|
||||
max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
|
||||
|
||||
/* check provider's send/recv wr limits */
|
||||
if (cdata->max_requests > devattr->max_qp_wr)
|
||||
cdata->max_requests = devattr->max_qp_wr;
|
||||
if (cdata->max_requests > max_qp_wr)
|
||||
cdata->max_requests = max_qp_wr;
|
||||
|
||||
ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
|
||||
ep->rep_attr.qp_context = ep;
|
||||
ep->rep_attr.srq = NULL;
|
||||
ep->rep_attr.cap.max_send_wr = cdata->max_requests;
|
||||
ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
|
||||
rc = ia->ri_ops->ro_open(ia, ep, cdata);
|
||||
if (rc)
|
||||
return rc;
|
||||
ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
|
||||
ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
|
||||
ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
|
||||
ep->rep_attr.cap.max_recv_sge = 1;
|
||||
ep->rep_attr.cap.max_inline_data = 0;
|
||||
@@ -670,7 +626,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
|
||||
|
||||
cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
|
||||
sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
|
||||
rpcrdma_cq_async_error_upcall, ep, &cq_attr);
|
||||
rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
|
||||
if (IS_ERR(sendcq)) {
|
||||
rc = PTR_ERR(sendcq);
|
||||
dprintk("RPC: %s: failed to create send CQ: %i\n",
|
||||
@@ -687,7 +643,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
|
||||
|
||||
cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
|
||||
recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
|
||||
rpcrdma_cq_async_error_upcall, ep, &cq_attr);
|
||||
rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
|
||||
if (IS_ERR(recvcq)) {
|
||||
rc = PTR_ERR(recvcq);
|
||||
dprintk("RPC: %s: failed to create recv CQ: %i\n",
|
||||
@@ -886,7 +842,21 @@ retry:
|
||||
}
|
||||
rc = ep->rep_connected;
|
||||
} else {
|
||||
struct rpcrdma_xprt *r_xprt;
|
||||
unsigned int extras;
|
||||
|
||||
dprintk("RPC: %s: connected\n", __func__);
|
||||
|
||||
r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
|
||||
extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
|
||||
|
||||
if (extras) {
|
||||
rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
|
||||
if (rc)
|
||||
pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
|
||||
__func__, rc);
|
||||
rc = 0;
|
||||
}
|
||||
}
|
||||
|
||||
out:
|
||||
@@ -923,20 +893,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
|
||||
}
|
||||
}
|
||||
|
||||
static struct rpcrdma_req *
|
||||
struct rpcrdma_req *
|
||||
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
|
||||
{
|
||||
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
|
||||
struct rpcrdma_req *req;
|
||||
|
||||
req = kzalloc(sizeof(*req), GFP_KERNEL);
|
||||
if (req == NULL)
|
||||
return ERR_PTR(-ENOMEM);
|
||||
|
||||
INIT_LIST_HEAD(&req->rl_free);
|
||||
spin_lock(&buffer->rb_reqslock);
|
||||
list_add(&req->rl_all, &buffer->rb_allreqs);
|
||||
spin_unlock(&buffer->rb_reqslock);
|
||||
req->rl_buffer = &r_xprt->rx_buf;
|
||||
return req;
|
||||
}
|
||||
|
||||
static struct rpcrdma_rep *
|
||||
struct rpcrdma_rep *
|
||||
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
|
||||
{
|
||||
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
|
||||
@@ -958,6 +933,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
|
||||
|
||||
rep->rr_device = ia->ri_device;
|
||||
rep->rr_rxprt = r_xprt;
|
||||
INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
|
||||
return rep;
|
||||
|
||||
out_free:
|
||||
@@ -971,44 +947,21 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
|
||||
{
|
||||
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
|
||||
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
|
||||
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
|
||||
char *p;
|
||||
size_t len;
|
||||
int i, rc;
|
||||
|
||||
buf->rb_max_requests = cdata->max_requests;
|
||||
buf->rb_max_requests = r_xprt->rx_data.max_requests;
|
||||
buf->rb_bc_srv_max_requests = 0;
|
||||
spin_lock_init(&buf->rb_lock);
|
||||
|
||||
/* Need to allocate:
|
||||
* 1. arrays for send and recv pointers
|
||||
* 2. arrays of struct rpcrdma_req to fill in pointers
|
||||
* 3. array of struct rpcrdma_rep for replies
|
||||
* Send/recv buffers in req/rep need to be registered
|
||||
*/
|
||||
len = buf->rb_max_requests *
|
||||
(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
|
||||
|
||||
p = kzalloc(len, GFP_KERNEL);
|
||||
if (p == NULL) {
|
||||
dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
|
||||
__func__, len);
|
||||
rc = -ENOMEM;
|
||||
goto out;
|
||||
}
|
||||
buf->rb_pool = p; /* for freeing it later */
|
||||
|
||||
buf->rb_send_bufs = (struct rpcrdma_req **) p;
|
||||
p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
|
||||
buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
|
||||
p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
|
||||
|
||||
rc = ia->ri_ops->ro_init(r_xprt);
|
||||
if (rc)
|
||||
goto out;
|
||||
|
||||
INIT_LIST_HEAD(&buf->rb_send_bufs);
|
||||
INIT_LIST_HEAD(&buf->rb_allreqs);
|
||||
spin_lock_init(&buf->rb_reqslock);
|
||||
for (i = 0; i < buf->rb_max_requests; i++) {
|
||||
struct rpcrdma_req *req;
|
||||
struct rpcrdma_rep *rep;
|
||||
|
||||
req = rpcrdma_create_req(r_xprt);
|
||||
if (IS_ERR(req)) {
|
||||
@@ -1017,7 +970,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
|
||||
rc = PTR_ERR(req);
|
||||
goto out;
|
||||
}
|
||||
buf->rb_send_bufs[i] = req;
|
||||
req->rl_backchannel = false;
|
||||
list_add(&req->rl_free, &buf->rb_send_bufs);
|
||||
}
|
||||
|
||||
INIT_LIST_HEAD(&buf->rb_recv_bufs);
|
||||
for (i = 0; i < buf->rb_max_requests + 2; i++) {
|
||||
struct rpcrdma_rep *rep;
|
||||
|
||||
rep = rpcrdma_create_rep(r_xprt);
|
||||
if (IS_ERR(rep)) {
|
||||
@@ -1026,7 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
|
||||
rc = PTR_ERR(rep);
|
||||
goto out;
|
||||
}
|
||||
buf->rb_recv_bufs[i] = rep;
|
||||
list_add(&rep->rr_list, &buf->rb_recv_bufs);
|
||||
}
|
||||
|
||||
return 0;
|
||||
@@ -1035,22 +994,38 @@ out:
|
||||
return rc;
|
||||
}
|
||||
|
||||
static struct rpcrdma_req *
|
||||
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
|
||||
{
|
||||
struct rpcrdma_req *req;
|
||||
|
||||
req = list_first_entry(&buf->rb_send_bufs,
|
||||
struct rpcrdma_req, rl_free);
|
||||
list_del(&req->rl_free);
|
||||
return req;
|
||||
}
|
||||
|
||||
static struct rpcrdma_rep *
|
||||
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
|
||||
{
|
||||
struct rpcrdma_rep *rep;
|
||||
|
||||
rep = list_first_entry(&buf->rb_recv_bufs,
|
||||
struct rpcrdma_rep, rr_list);
|
||||
list_del(&rep->rr_list);
|
||||
return rep;
|
||||
}
|
||||
|
||||
static void
|
||||
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
|
||||
{
|
||||
if (!rep)
|
||||
return;
|
||||
|
||||
rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
|
||||
kfree(rep);
|
||||
}
|
||||
|
||||
static void
|
||||
void
|
||||
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
|
||||
{
|
||||
if (!req)
|
||||
return;
|
||||
|
||||
rpcrdma_free_regbuf(ia, req->rl_sendbuf);
|
||||
rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
|
||||
kfree(req);
|
||||
@@ -1060,25 +1035,29 @@ void
|
||||
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
|
||||
{
|
||||
struct rpcrdma_ia *ia = rdmab_to_ia(buf);
|
||||
int i;
|
||||
|
||||
/* clean up in reverse order from create
|
||||
* 1. recv mr memory (mr free, then kfree)
|
||||
* 2. send mr memory (mr free, then kfree)
|
||||
* 3. MWs
|
||||
*/
|
||||
dprintk("RPC: %s: entering\n", __func__);
|
||||
while (!list_empty(&buf->rb_recv_bufs)) {
|
||||
struct rpcrdma_rep *rep;
|
||||
|
||||
for (i = 0; i < buf->rb_max_requests; i++) {
|
||||
if (buf->rb_recv_bufs)
|
||||
rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
|
||||
if (buf->rb_send_bufs)
|
||||
rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
|
||||
rep = rpcrdma_buffer_get_rep_locked(buf);
|
||||
rpcrdma_destroy_rep(ia, rep);
|
||||
}
|
||||
|
||||
ia->ri_ops->ro_destroy(buf);
|
||||
spin_lock(&buf->rb_reqslock);
|
||||
while (!list_empty(&buf->rb_allreqs)) {
|
||||
struct rpcrdma_req *req;
|
||||
|
||||
kfree(buf->rb_pool);
|
||||
req = list_first_entry(&buf->rb_allreqs,
|
||||
struct rpcrdma_req, rl_all);
|
||||
list_del(&req->rl_all);
|
||||
|
||||
spin_unlock(&buf->rb_reqslock);
|
||||
rpcrdma_destroy_req(ia, req);
|
||||
spin_lock(&buf->rb_reqslock);
|
||||
}
|
||||
spin_unlock(&buf->rb_reqslock);
|
||||
|
||||
ia->ri_ops->ro_destroy(buf);
|
||||
}
|
||||
|
||||
struct rpcrdma_mw *
|
||||
@@ -1110,53 +1089,34 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
|
||||
spin_unlock(&buf->rb_mwlock);
|
||||
}
|
||||
|
||||
static void
|
||||
rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
|
||||
{
|
||||
buf->rb_send_bufs[--buf->rb_send_index] = req;
|
||||
req->rl_niovs = 0;
|
||||
if (req->rl_reply) {
|
||||
buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
|
||||
req->rl_reply = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Get a set of request/reply buffers.
|
||||
*
|
||||
* Reply buffer (if needed) is attached to send buffer upon return.
|
||||
* Rule:
|
||||
* rb_send_index and rb_recv_index MUST always be pointing to the
|
||||
* *next* available buffer (non-NULL). They are incremented after
|
||||
* removing buffers, and decremented *before* returning them.
|
||||
* Reply buffer (if available) is attached to send buffer upon return.
|
||||
*/
|
||||
struct rpcrdma_req *
|
||||
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
|
||||
{
|
||||
struct rpcrdma_req *req;
|
||||
unsigned long flags;
|
||||
|
||||
spin_lock_irqsave(&buffers->rb_lock, flags);
|
||||
spin_lock(&buffers->rb_lock);
|
||||
if (list_empty(&buffers->rb_send_bufs))
|
||||
goto out_reqbuf;
|
||||
req = rpcrdma_buffer_get_req_locked(buffers);
|
||||
if (list_empty(&buffers->rb_recv_bufs))
|
||||
goto out_repbuf;
|
||||
req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
|
||||
spin_unlock(&buffers->rb_lock);
|
||||
return req;
|
||||
|
||||
if (buffers->rb_send_index == buffers->rb_max_requests) {
|
||||
spin_unlock_irqrestore(&buffers->rb_lock, flags);
|
||||
dprintk("RPC: %s: out of request buffers\n", __func__);
|
||||
return ((struct rpcrdma_req *)NULL);
|
||||
}
|
||||
|
||||
req = buffers->rb_send_bufs[buffers->rb_send_index];
|
||||
if (buffers->rb_send_index < buffers->rb_recv_index) {
|
||||
dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
|
||||
__func__,
|
||||
buffers->rb_recv_index - buffers->rb_send_index);
|
||||
req->rl_reply = NULL;
|
||||
} else {
|
||||
req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
|
||||
buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
|
||||
}
|
||||
buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
|
||||
|
||||
spin_unlock_irqrestore(&buffers->rb_lock, flags);
|
||||
out_reqbuf:
|
||||
spin_unlock(&buffers->rb_lock);
|
||||
pr_warn("RPC: %s: out of request buffers\n", __func__);
|
||||
return NULL;
|
||||
out_repbuf:
|
||||
spin_unlock(&buffers->rb_lock);
|
||||
pr_warn("RPC: %s: out of reply buffers\n", __func__);
|
||||
req->rl_reply = NULL;
|
||||
return req;
|
||||
}
|
||||
|
||||
@@ -1168,30 +1128,31 @@ void
|
||||
rpcrdma_buffer_put(struct rpcrdma_req *req)
|
||||
{
|
||||
struct rpcrdma_buffer *buffers = req->rl_buffer;
|
||||
unsigned long flags;
|
||||
struct rpcrdma_rep *rep = req->rl_reply;
|
||||
|
||||
spin_lock_irqsave(&buffers->rb_lock, flags);
|
||||
rpcrdma_buffer_put_sendbuf(req, buffers);
|
||||
spin_unlock_irqrestore(&buffers->rb_lock, flags);
|
||||
req->rl_niovs = 0;
|
||||
req->rl_reply = NULL;
|
||||
|
||||
spin_lock(&buffers->rb_lock);
|
||||
list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
|
||||
if (rep)
|
||||
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
|
||||
spin_unlock(&buffers->rb_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* Recover reply buffers from pool.
|
||||
* This happens when recovering from error conditions.
|
||||
* Post-increment counter/array index.
|
||||
* This happens when recovering from disconnect.
|
||||
*/
|
||||
void
|
||||
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
|
||||
{
|
||||
struct rpcrdma_buffer *buffers = req->rl_buffer;
|
||||
unsigned long flags;
|
||||
|
||||
spin_lock_irqsave(&buffers->rb_lock, flags);
|
||||
if (buffers->rb_recv_index < buffers->rb_max_requests) {
|
||||
req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
|
||||
buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
|
||||
}
|
||||
spin_unlock_irqrestore(&buffers->rb_lock, flags);
|
||||
spin_lock(&buffers->rb_lock);
|
||||
if (!list_empty(&buffers->rb_recv_bufs))
|
||||
req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
|
||||
spin_unlock(&buffers->rb_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1202,11 +1163,10 @@ void
|
||||
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
|
||||
{
|
||||
struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
|
||||
unsigned long flags;
|
||||
|
||||
spin_lock_irqsave(&buffers->rb_lock, flags);
|
||||
buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
|
||||
spin_unlock_irqrestore(&buffers->rb_lock, flags);
|
||||
spin_lock(&buffers->rb_lock);
|
||||
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
|
||||
spin_unlock(&buffers->rb_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1363,6 +1323,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
|
||||
return rc;
|
||||
}
|
||||
|
||||
/**
|
||||
* rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
|
||||
* @r_xprt: transport associated with these backchannel resources
|
||||
* @count: minimum number of incoming requests expected
|
||||
*
|
||||
* Returns zero if all requested buffers were posted, or a negative errno.
|
||||
*/
|
||||
int
|
||||
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
|
||||
{
|
||||
struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
|
||||
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
|
||||
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
|
||||
struct rpcrdma_rep *rep;
|
||||
unsigned long flags;
|
||||
int rc;
|
||||
|
||||
while (count--) {
|
||||
spin_lock_irqsave(&buffers->rb_lock, flags);
|
||||
if (list_empty(&buffers->rb_recv_bufs))
|
||||
goto out_reqbuf;
|
||||
rep = rpcrdma_buffer_get_rep_locked(buffers);
|
||||
spin_unlock_irqrestore(&buffers->rb_lock, flags);
|
||||
|
||||
rc = rpcrdma_ep_post_recv(ia, ep, rep);
|
||||
if (rc)
|
||||
goto out_rc;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
out_reqbuf:
|
||||
spin_unlock_irqrestore(&buffers->rb_lock, flags);
|
||||
pr_warn("%s: no extra receive buffers\n", __func__);
|
||||
return -ENOMEM;
|
||||
|
||||
out_rc:
|
||||
rpcrdma_recv_buffer_put(rep);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* How many chunk list items fit within our inline buffers?
|
||||
*/
|
||||
unsigned int
|
||||
|
@@ -77,9 +77,6 @@ struct rpcrdma_ia {
|
||||
* RDMA Endpoint -- one per transport instance
|
||||
*/
|
||||
|
||||
#define RPCRDMA_WC_BUDGET (128)
|
||||
#define RPCRDMA_POLLSIZE (16)
|
||||
|
||||
struct rpcrdma_ep {
|
||||
atomic_t rep_cqcount;
|
||||
int rep_cqinit;
|
||||
@@ -89,8 +86,6 @@ struct rpcrdma_ep {
|
||||
struct rdma_conn_param rep_remote_cma;
|
||||
struct sockaddr_storage rep_remote_addr;
|
||||
struct delayed_work rep_connect_worker;
|
||||
struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
|
||||
struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
|
||||
};
|
||||
|
||||
/*
|
||||
@@ -106,6 +101,16 @@ struct rpcrdma_ep {
|
||||
*/
|
||||
#define RPCRDMA_IGNORE_COMPLETION (0ULL)
|
||||
|
||||
/* Pre-allocate extra Work Requests for handling backward receives
|
||||
* and sends. This is a fixed value because the Work Queues are
|
||||
* allocated when the forward channel is set up.
|
||||
*/
|
||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
|
||||
#define RPCRDMA_BACKWARD_WRS (8)
|
||||
#else
|
||||
#define RPCRDMA_BACKWARD_WRS (0)
|
||||
#endif
|
||||
|
||||
/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
|
||||
*
|
||||
* The below structure appears at the front of a large region of kmalloc'd
|
||||
@@ -169,10 +174,13 @@ struct rpcrdma_rep {
|
||||
unsigned int rr_len;
|
||||
struct ib_device *rr_device;
|
||||
struct rpcrdma_xprt *rr_rxprt;
|
||||
struct work_struct rr_work;
|
||||
struct list_head rr_list;
|
||||
struct rpcrdma_regbuf *rr_rdmabuf;
|
||||
};
|
||||
|
||||
#define RPCRDMA_BAD_LEN (~0U)
|
||||
|
||||
/*
|
||||
* struct rpcrdma_mw - external memory region metadata
|
||||
*
|
||||
@@ -256,6 +264,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
|
||||
#define RPCRDMA_MAX_IOVS (2)
|
||||
|
||||
struct rpcrdma_req {
|
||||
struct list_head rl_free;
|
||||
unsigned int rl_niovs;
|
||||
unsigned int rl_nchunks;
|
||||
unsigned int rl_connect_cookie;
|
||||
@@ -265,6 +274,9 @@ struct rpcrdma_req {
|
||||
struct rpcrdma_regbuf *rl_rdmabuf;
|
||||
struct rpcrdma_regbuf *rl_sendbuf;
|
||||
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
|
||||
|
||||
struct list_head rl_all;
|
||||
bool rl_backchannel;
|
||||
};
|
||||
|
||||
static inline struct rpcrdma_req *
|
||||
@@ -289,12 +301,14 @@ struct rpcrdma_buffer {
|
||||
struct list_head rb_all;
|
||||
char *rb_pool;
|
||||
|
||||
spinlock_t rb_lock; /* protect buf arrays */
|
||||
spinlock_t rb_lock; /* protect buf lists */
|
||||
struct list_head rb_send_bufs;
|
||||
struct list_head rb_recv_bufs;
|
||||
u32 rb_max_requests;
|
||||
int rb_send_index;
|
||||
int rb_recv_index;
|
||||
struct rpcrdma_req **rb_send_bufs;
|
||||
struct rpcrdma_rep **rb_recv_bufs;
|
||||
|
||||
u32 rb_bc_srv_max_requests;
|
||||
spinlock_t rb_reqslock; /* protect rb_allreqs */
|
||||
struct list_head rb_allreqs;
|
||||
};
|
||||
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
|
||||
|
||||
@@ -340,6 +354,7 @@ struct rpcrdma_stats {
|
||||
unsigned long failed_marshal_count;
|
||||
unsigned long bad_reply_count;
|
||||
unsigned long nomsg_call_count;
|
||||
unsigned long bcall_count;
|
||||
};
|
||||
|
||||
/*
|
||||
@@ -415,6 +430,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
|
||||
/*
|
||||
* Buffer calls - xprtrdma/verbs.c
|
||||
*/
|
||||
struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
|
||||
struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
|
||||
void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
|
||||
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
|
||||
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
|
||||
|
||||
@@ -431,10 +449,14 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
|
||||
struct rpcrdma_regbuf *);
|
||||
|
||||
unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
|
||||
int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
|
||||
|
||||
int frwr_alloc_recovery_wq(void);
|
||||
void frwr_destroy_recovery_wq(void);
|
||||
|
||||
int rpcrdma_alloc_wq(void);
|
||||
void rpcrdma_destroy_wq(void);
|
||||
|
||||
/*
|
||||
* Wrappers for chunk registration, shared by read/write chunk code.
|
||||
*/
|
||||
@@ -495,6 +517,18 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
|
||||
int xprt_rdma_init(void);
|
||||
void xprt_rdma_cleanup(void);
|
||||
|
||||
/* Backchannel calls - xprtrdma/backchannel.c
|
||||
*/
|
||||
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
|
||||
int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
|
||||
int xprt_rdma_bc_up(struct svc_serv *, struct net *);
|
||||
int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
|
||||
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
|
||||
int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
|
||||
void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
|
||||
void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
|
||||
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
|
||||
|
||||
/* Temporary NFS request map cache. Created in svc_rdma.c */
|
||||
extern struct kmem_cache *svc_rdma_map_cachep;
|
||||
/* WR context cache. Created in svc_rdma.c */
|
||||
|
@@ -360,8 +360,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
|
||||
int flags = XS_SENDMSG_FLAGS;
|
||||
|
||||
remainder -= len;
|
||||
if (remainder != 0 || more)
|
||||
if (more)
|
||||
flags |= MSG_MORE;
|
||||
if (remainder != 0)
|
||||
flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
|
||||
err = do_sendpage(sock, *ppage, base, len, flags);
|
||||
if (remainder == 0 || err != len)
|
||||
break;
|
||||
@@ -823,6 +825,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
|
||||
|
||||
kernel_sock_shutdown(sock, SHUT_RDWR);
|
||||
|
||||
mutex_lock(&transport->recv_mutex);
|
||||
write_lock_bh(&sk->sk_callback_lock);
|
||||
transport->inet = NULL;
|
||||
transport->sock = NULL;
|
||||
@@ -833,6 +836,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
|
||||
xprt_clear_connected(xprt);
|
||||
write_unlock_bh(&sk->sk_callback_lock);
|
||||
xs_sock_reset_connection_flags(xprt);
|
||||
mutex_unlock(&transport->recv_mutex);
|
||||
|
||||
trace_rpc_socket_close(xprt, sock);
|
||||
sock_release(sock);
|
||||
@@ -886,6 +890,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
|
||||
|
||||
cancel_delayed_work_sync(&transport->connect_worker);
|
||||
xs_close(xprt);
|
||||
cancel_work_sync(&transport->recv_worker);
|
||||
xs_xprt_free(xprt);
|
||||
module_put(THIS_MODULE);
|
||||
}
|
||||
@@ -906,44 +911,36 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
|
||||
}
|
||||
|
||||
/**
|
||||
* xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
|
||||
* @sk: socket with data to read
|
||||
* xs_local_data_read_skb - handle one received skb on an AF_LOCAL socket
|
||||
* @xprt: transport
|
||||
* @sk: socket
|
||||
* @skb: skbuff
|
||||
*
|
||||
* Currently this assumes we can read the whole reply in a single gulp.
|
||||
*/
|
||||
static void xs_local_data_ready(struct sock *sk)
|
||||
static void xs_local_data_read_skb(struct rpc_xprt *xprt,
|
||||
struct sock *sk,
|
||||
struct sk_buff *skb)
|
||||
{
|
||||
struct rpc_task *task;
|
||||
struct rpc_xprt *xprt;
|
||||
struct rpc_rqst *rovr;
|
||||
struct sk_buff *skb;
|
||||
int err, repsize, copied;
|
||||
int repsize, copied;
|
||||
u32 _xid;
|
||||
__be32 *xp;
|
||||
|
||||
read_lock_bh(&sk->sk_callback_lock);
|
||||
dprintk("RPC: %s...\n", __func__);
|
||||
xprt = xprt_from_sock(sk);
|
||||
if (xprt == NULL)
|
||||
goto out;
|
||||
|
||||
skb = skb_recv_datagram(sk, 0, 1, &err);
|
||||
if (skb == NULL)
|
||||
goto out;
|
||||
|
||||
repsize = skb->len - sizeof(rpc_fraghdr);
|
||||
if (repsize < 4) {
|
||||
dprintk("RPC: impossible RPC reply size %d\n", repsize);
|
||||
goto dropit;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Copy the XID from the skb... */
|
||||
xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
|
||||
if (xp == NULL)
|
||||
goto dropit;
|
||||
return;
|
||||
|
||||
/* Look up and lock the request corresponding to the given XID */
|
||||
spin_lock(&xprt->transport_lock);
|
||||
spin_lock_bh(&xprt->transport_lock);
|
||||
rovr = xprt_lookup_rqst(xprt, *xp);
|
||||
if (!rovr)
|
||||
goto out_unlock;
|
||||
@@ -961,50 +958,68 @@ static void xs_local_data_ready(struct sock *sk)
|
||||
xprt_complete_rqst(task, copied);
|
||||
|
||||
out_unlock:
|
||||
spin_unlock(&xprt->transport_lock);
|
||||
dropit:
|
||||
skb_free_datagram(sk, skb);
|
||||
out:
|
||||
read_unlock_bh(&sk->sk_callback_lock);
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
}
|
||||
|
||||
static void xs_local_data_receive(struct sock_xprt *transport)
|
||||
{
|
||||
struct sk_buff *skb;
|
||||
struct sock *sk;
|
||||
int err;
|
||||
|
||||
mutex_lock(&transport->recv_mutex);
|
||||
sk = transport->inet;
|
||||
if (sk == NULL)
|
||||
goto out;
|
||||
for (;;) {
|
||||
skb = skb_recv_datagram(sk, 0, 1, &err);
|
||||
if (skb == NULL)
|
||||
break;
|
||||
xs_local_data_read_skb(&transport->xprt, sk, skb);
|
||||
skb_free_datagram(sk, skb);
|
||||
}
|
||||
out:
|
||||
mutex_unlock(&transport->recv_mutex);
|
||||
}
|
||||
|
||||
/*
 * Work handler for the recv_worker member of struct sock_xprt.
 *
 * Recovers the enclosing sock_xprt from @work with container_of() and
 * runs xs_local_data_receive() on it.
 */
static void xs_local_data_receive_workfn(struct work_struct *work)
|
||||
{
|
||||
struct sock_xprt *transport =
|
||||
container_of(work, struct sock_xprt, recv_worker);
|
||||
xs_local_data_receive(transport);
|
||||
}
|
||||
|
||||
/**
|
||||
* xs_udp_data_ready - "data ready" callback for UDP sockets
|
||||
* @sk: socket with data to read
|
||||
* xs_udp_data_read_skb - receive callback for UDP sockets
|
||||
* @xprt: transport
|
||||
* @sk: socket
|
||||
* @skb: skbuff
|
||||
*
|
||||
*/
|
||||
static void xs_udp_data_ready(struct sock *sk)
|
||||
static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
|
||||
struct sock *sk,
|
||||
struct sk_buff *skb)
|
||||
{
|
||||
struct rpc_task *task;
|
||||
struct rpc_xprt *xprt;
|
||||
struct rpc_rqst *rovr;
|
||||
struct sk_buff *skb;
|
||||
int err, repsize, copied;
|
||||
int repsize, copied;
|
||||
u32 _xid;
|
||||
__be32 *xp;
|
||||
|
||||
read_lock_bh(&sk->sk_callback_lock);
|
||||
dprintk("RPC: xs_udp_data_ready...\n");
|
||||
if (!(xprt = xprt_from_sock(sk)))
|
||||
goto out;
|
||||
|
||||
if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
|
||||
goto out;
|
||||
|
||||
repsize = skb->len - sizeof(struct udphdr);
|
||||
if (repsize < 4) {
|
||||
dprintk("RPC: impossible RPC reply size %d!\n", repsize);
|
||||
goto dropit;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Copy the XID from the skb... */
|
||||
xp = skb_header_pointer(skb, sizeof(struct udphdr),
|
||||
sizeof(_xid), &_xid);
|
||||
if (xp == NULL)
|
||||
goto dropit;
|
||||
return;
|
||||
|
||||
/* Look up and lock the request corresponding to the given XID */
|
||||
spin_lock(&xprt->transport_lock);
|
||||
spin_lock_bh(&xprt->transport_lock);
|
||||
rovr = xprt_lookup_rqst(xprt, *xp);
|
||||
if (!rovr)
|
||||
goto out_unlock;
|
||||
@@ -1025,10 +1040,54 @@ static void xs_udp_data_ready(struct sock *sk)
|
||||
xprt_complete_rqst(task, copied);
|
||||
|
||||
out_unlock:
|
||||
spin_unlock(&xprt->transport_lock);
|
||||
dropit:
|
||||
skb_free_datagram(sk, skb);
|
||||
out:
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
}
|
||||
|
||||
/*
 * Drain pending datagrams from transport->inet.
 *
 * Runs with transport->recv_mutex held for the whole loop and returns
 * without reading anything when transport->inet is NULL. Every skb
 * obtained from skb_recv_datagram() is passed to
 * xs_udp_data_read_skb() and then released here with
 * skb_free_datagram(); the loop stops at the first NULL skb.
 *
 * Called from xs_udp_data_receive_workfn().
 */
static void xs_udp_data_receive(struct sock_xprt *transport)
|
||||
{
|
||||
struct sk_buff *skb;
|
||||
struct sock *sk;
|
||||
/* Written by skb_recv_datagram() but never examined in this function. */
int err;
|
||||
|
||||
mutex_lock(&transport->recv_mutex);
|
||||
sk = transport->inet;
|
||||
if (sk == NULL)
|
||||
goto out;
|
||||
for (;;) {
|
||||
/* NOTE(review): third argument is presumably "noblock" - confirm. */
skb = skb_recv_datagram(sk, 0, 1, &err);
|
||||
if (skb == NULL)
|
||||
break;
|
||||
xs_udp_data_read_skb(&transport->xprt, sk, skb);
|
||||
/* The skb is freed here, after the read helper has returned. */
skb_free_datagram(sk, skb);
|
||||
}
|
||||
out:
|
||||
mutex_unlock(&transport->recv_mutex);
|
||||
}
|
||||
|
||||
/*
 * Work handler for the recv_worker member of struct sock_xprt.
 *
 * Recovers the enclosing sock_xprt from @work with container_of() and
 * runs xs_udp_data_receive() on it.
 */
static void xs_udp_data_receive_workfn(struct work_struct *work)
|
||||
{
|
||||
struct sock_xprt *transport =
|
||||
container_of(work, struct sock_xprt, recv_worker);
|
||||
xs_udp_data_receive(transport);
|
||||
}
|
||||
|
||||
/**
|
||||
* xs_data_ready - "data ready" callback that defers receive to a work item
|
||||
* @sk: socket with data to read
|
||||
*
* Assigned to sk->sk_data_ready for both the local and the UDP sockets
* elsewhere in this file. It reads no data itself: if @sk still maps to
* a transport, the transport's recv_worker is queued on rpciod_workqueue.
|
||||
*/
|
||||
static void xs_data_ready(struct sock *sk)
|
||||
{
|
||||
struct rpc_xprt *xprt;
|
||||
|
||||
/* sk_callback_lock is held for reading across the lookup and the queueing. */
read_lock_bh(&sk->sk_callback_lock);
|
||||
dprintk("RPC: xs_data_ready...\n");
|
||||
xprt = xprt_from_sock(sk);
|
||||
/* A NULL xprt means there is nothing to queue; just drop the lock. */
if (xprt != NULL) {
|
||||
struct sock_xprt *transport = container_of(xprt,
|
||||
struct sock_xprt, xprt);
|
||||
queue_work(rpciod_workqueue, &transport->recv_worker);
|
||||
}
|
||||
read_unlock_bh(&sk->sk_callback_lock);
|
||||
}
|
||||
|
||||
@@ -1243,12 +1302,12 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
|
||||
dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));
|
||||
|
||||
/* Find and lock the request corresponding to this xid */
|
||||
spin_lock(&xprt->transport_lock);
|
||||
spin_lock_bh(&xprt->transport_lock);
|
||||
req = xprt_lookup_rqst(xprt, transport->tcp_xid);
|
||||
if (!req) {
|
||||
dprintk("RPC: XID %08x request not found!\n",
|
||||
ntohl(transport->tcp_xid));
|
||||
spin_unlock(&xprt->transport_lock);
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -1257,7 +1316,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
|
||||
if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
|
||||
xprt_complete_rqst(req->rq_task, transport->tcp_copied);
|
||||
|
||||
spin_unlock(&xprt->transport_lock);
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -1277,10 +1336,10 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
|
||||
struct rpc_rqst *req;
|
||||
|
||||
/* Look up and lock the request corresponding to the given XID */
|
||||
spin_lock(&xprt->transport_lock);
|
||||
spin_lock_bh(&xprt->transport_lock);
|
||||
req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
|
||||
if (req == NULL) {
|
||||
spin_unlock(&xprt->transport_lock);
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
printk(KERN_WARNING "Callback slot table overflowed\n");
|
||||
xprt_force_disconnect(xprt);
|
||||
return -1;
|
||||
@@ -1291,7 +1350,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
|
||||
|
||||
if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
|
||||
xprt_complete_bc_request(req, transport->tcp_copied);
|
||||
spin_unlock(&xprt->transport_lock);
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -1306,6 +1365,17 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
|
||||
xs_tcp_read_reply(xprt, desc) :
|
||||
xs_tcp_read_callback(xprt, desc);
|
||||
}
|
||||
|
||||
/*
 * Create the "tcp-bc" service transport on @serv in namespace @net.
 *
 * Used as the .bc_up method of xs_tcp_ops.
 *
 * Returns 0 on success, or the negative value returned by
 * svc_create_xprt() on failure.
 */
static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
|
||||
{
|
||||
int ret;
|
||||
|
||||
ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
|
||||
SVC_SOCK_ANONYMOUS);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
/* Any non-negative result from svc_create_xprt() is reported as plain 0. */
return 0;
|
||||
}
|
||||
#else
|
||||
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
|
||||
struct xdr_skb_reader *desc)
|
||||
@@ -1391,6 +1461,44 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
|
||||
return len - desc.count;
|
||||
}
|
||||
|
||||
/*
 * Read all currently available data from the transport's TCP socket.
 *
 * Runs with transport->recv_mutex held and skips the read loop when
 * transport->inet is NULL. Each pass calls tcp_read_sock() with
 * xs_tcp_data_recv as the actor, bracketed by lock_sock()/release_sock(),
 * and the loop ends as soon as a pass returns <= 0.
 *
 * The result of the last pass and the running byte total are reported
 * through trace_xs_tcp_data_ready() after the mutex has been dropped;
 * on the NULL-socket path both values are 0.
 *
 * Called from xs_tcp_data_receive_workfn().
 */
static void xs_tcp_data_receive(struct sock_xprt *transport)
|
||||
{
|
||||
struct rpc_xprt *xprt = &transport->xprt;
|
||||
struct sock *sk;
|
||||
read_descriptor_t rd_desc = {
|
||||
/* The first pass starts with a 2MB count; later passes use 65536 (set in the loop). */
.count = 2*1024*1024,
|
||||
.arg.data = xprt,
|
||||
};
|
||||
unsigned long total = 0;
|
||||
int read = 0;
|
||||
|
||||
mutex_lock(&transport->recv_mutex);
|
||||
sk = transport->inet;
|
||||
if (sk == NULL)
|
||||
goto out;
|
||||
|
||||
/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
|
||||
for (;;) {
|
||||
lock_sock(sk);
|
||||
read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
|
||||
release_sock(sk);
|
||||
if (read <= 0)
|
||||
break;
|
||||
total += read;
|
||||
/* NOTE(review): count is presumably the actor's byte budget for the next pass - confirm. */
rd_desc.count = 65536;
|
||||
}
|
||||
out:
|
||||
mutex_unlock(&transport->recv_mutex);
|
||||
trace_xs_tcp_data_ready(xprt, read, total);
|
||||
}
|
||||
|
||||
/*
 * Work handler for the recv_worker member of struct sock_xprt.
 *
 * Recovers the enclosing sock_xprt from @work with container_of() and
 * runs xs_tcp_data_receive() on it.
 */
static void xs_tcp_data_receive_workfn(struct work_struct *work)
|
||||
{
|
||||
struct sock_xprt *transport =
|
||||
container_of(work, struct sock_xprt, recv_worker);
|
||||
xs_tcp_data_receive(transport);
|
||||
}
|
||||
|
||||
/**
|
||||
* xs_tcp_data_ready - "data ready" callback for TCP sockets
|
||||
* @sk: socket with data to read
|
||||
@@ -1398,34 +1506,24 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
|
||||
*/
|
||||
static void xs_tcp_data_ready(struct sock *sk)
|
||||
{
|
||||
struct sock_xprt *transport;
|
||||
struct rpc_xprt *xprt;
|
||||
read_descriptor_t rd_desc;
|
||||
int read;
|
||||
unsigned long total = 0;
|
||||
|
||||
dprintk("RPC: xs_tcp_data_ready...\n");
|
||||
|
||||
read_lock_bh(&sk->sk_callback_lock);
|
||||
if (!(xprt = xprt_from_sock(sk))) {
|
||||
read = 0;
|
||||
if (!(xprt = xprt_from_sock(sk)))
|
||||
goto out;
|
||||
}
|
||||
transport = container_of(xprt, struct sock_xprt, xprt);
|
||||
|
||||
/* Any data means we had a useful conversation, so
|
||||
* then we don't need to delay the next reconnect
|
||||
*/
|
||||
if (xprt->reestablish_timeout)
|
||||
xprt->reestablish_timeout = 0;
|
||||
queue_work(rpciod_workqueue, &transport->recv_worker);
|
||||
|
||||
/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
|
||||
rd_desc.arg.data = xprt;
|
||||
do {
|
||||
rd_desc.count = 65536;
|
||||
read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
|
||||
if (read > 0)
|
||||
total += read;
|
||||
} while (read > 0);
|
||||
out:
|
||||
trace_xs_tcp_data_ready(xprt, read, total);
|
||||
read_unlock_bh(&sk->sk_callback_lock);
|
||||
}
|
||||
|
||||
@@ -1873,7 +1971,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
|
||||
xs_save_old_callbacks(transport, sk);
|
||||
|
||||
sk->sk_user_data = xprt;
|
||||
sk->sk_data_ready = xs_local_data_ready;
|
||||
sk->sk_data_ready = xs_data_ready;
|
||||
sk->sk_write_space = xs_udp_write_space;
|
||||
sk->sk_error_report = xs_error_report;
|
||||
sk->sk_allocation = GFP_NOIO;
|
||||
@@ -2059,7 +2157,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
|
||||
xs_save_old_callbacks(transport, sk);
|
||||
|
||||
sk->sk_user_data = xprt;
|
||||
sk->sk_data_ready = xs_udp_data_ready;
|
||||
sk->sk_data_ready = xs_data_ready;
|
||||
sk->sk_write_space = xs_udp_write_space;
|
||||
sk->sk_allocation = GFP_NOIO;
|
||||
|
||||
@@ -2472,7 +2570,7 @@ static int bc_send_request(struct rpc_task *task)
|
||||
{
|
||||
struct rpc_rqst *req = task->tk_rqstp;
|
||||
struct svc_xprt *xprt;
|
||||
u32 len;
|
||||
int len;
|
||||
|
||||
dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
|
||||
/*
|
||||
@@ -2580,6 +2678,12 @@ static struct rpc_xprt_ops xs_tcp_ops = {
|
||||
.enable_swap = xs_enable_swap,
|
||||
.disable_swap = xs_disable_swap,
|
||||
.inject_disconnect = xs_inject_disconnect,
|
||||
#ifdef CONFIG_SUNRPC_BACKCHANNEL
|
||||
.bc_setup = xprt_setup_bc,
|
||||
.bc_up = xs_tcp_bc_up,
|
||||
.bc_free_rqst = xprt_free_bc_rqst,
|
||||
.bc_destroy = xprt_destroy_bc,
|
||||
#endif
|
||||
};
|
||||
|
||||
/*
|
||||
@@ -2650,6 +2754,7 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
|
||||
}
|
||||
|
||||
new = container_of(xprt, struct sock_xprt, xprt);
|
||||
mutex_init(&new->recv_mutex);
|
||||
memcpy(&xprt->addr, args->dstaddr, args->addrlen);
|
||||
xprt->addrlen = args->addrlen;
|
||||
if (args->srcaddr)
|
||||
@@ -2703,6 +2808,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
|
||||
xprt->ops = &xs_local_ops;
|
||||
xprt->timeout = &xs_local_default_timeout;
|
||||
|
||||
INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn);
|
||||
INIT_DELAYED_WORK(&transport->connect_worker,
|
||||
xs_dummy_setup_socket);
|
||||
|
||||
@@ -2774,21 +2880,20 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
|
||||
|
||||
xprt->timeout = &xs_udp_default_timeout;
|
||||
|
||||
INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
|
||||
INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
|
||||
|
||||
switch (addr->sa_family) {
|
||||
case AF_INET:
|
||||
if (((struct sockaddr_in *)addr)->sin_port != htons(0))
|
||||
xprt_set_bound(xprt);
|
||||
|
||||
INIT_DELAYED_WORK(&transport->connect_worker,
|
||||
xs_udp_setup_socket);
|
||||
xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
|
||||
break;
|
||||
case AF_INET6:
|
||||
if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
|
||||
xprt_set_bound(xprt);
|
||||
|
||||
INIT_DELAYED_WORK(&transport->connect_worker,
|
||||
xs_udp_setup_socket);
|
||||
xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
|
||||
break;
|
||||
default:
|
||||
@@ -2853,21 +2958,20 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
|
||||
xprt->ops = &xs_tcp_ops;
|
||||
xprt->timeout = &xs_tcp_default_timeout;
|
||||
|
||||
INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn);
|
||||
INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
|
||||
|
||||
switch (addr->sa_family) {
|
||||
case AF_INET:
|
||||
if (((struct sockaddr_in *)addr)->sin_port != htons(0))
|
||||
xprt_set_bound(xprt);
|
||||
|
||||
INIT_DELAYED_WORK(&transport->connect_worker,
|
||||
xs_tcp_setup_socket);
|
||||
xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
|
||||
break;
|
||||
case AF_INET6:
|
||||
if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
|
||||
xprt_set_bound(xprt);
|
||||
|
||||
INIT_DELAYED_WORK(&transport->connect_worker,
|
||||
xs_tcp_setup_socket);
|
||||
xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
|
||||
break;
|
||||
default:
|
||||
|
Reference in New Issue
Block a user