  1. // SPDX-License-Identifier: GPL-2.0-only
  2. /******************************************************************************
  3. (c) 2007 Network Appliance, Inc. All Rights Reserved.
  4. (c) 2009 NetApp. All Rights Reserved.
  5. ******************************************************************************/
  6. #include <linux/tcp.h>
  7. #include <linux/slab.h>
  8. #include <linux/sunrpc/xprt.h>
  9. #include <linux/export.h>
  10. #include <linux/sunrpc/bc_xprt.h>
  11. #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  12. #define RPCDBG_FACILITY RPCDBG_TRANS
  13. #endif
  14. #define BC_MAX_SLOTS 64U
/*
 * Report the maximum number of backchannel slots this transport can
 * offer.  Currently a fixed compile-time limit (BC_MAX_SLOTS),
 * independent of the particular @xprt.
 */
unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
{
	return BC_MAX_SLOTS;
}
  19. /*
  20. * Helper routines that track the number of preallocation elements
  21. * on the transport.
  22. */
/*
 * Return true while the transport is below its preallocation ceiling,
 * i.e. a freed backchannel request should go back onto bc_pa_list
 * rather than be freed outright.  Called with xprt->bc_pa_lock held
 * (see xprt_free_bc_rqst()).
 */
static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
{
	return xprt->bc_alloc_count < xprt->bc_alloc_max;
}
  27. /*
  28. * Free the preallocated rpc_rqst structure and the memory
  29. * buffers hanging off of it.
  30. */
  31. static void xprt_free_allocation(struct rpc_rqst *req)
  32. {
  33. struct xdr_buf *xbufp;
  34. dprintk("RPC: free allocations for req= %p\n", req);
  35. WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
  36. xbufp = &req->rq_rcv_buf;
  37. free_page((unsigned long)xbufp->head[0].iov_base);
  38. xbufp = &req->rq_snd_buf;
  39. free_page((unsigned long)xbufp->head[0].iov_base);
  40. kfree(req);
  41. }
  42. static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
  43. {
  44. buf->head[0].iov_len = PAGE_SIZE;
  45. buf->tail[0].iov_len = 0;
  46. buf->pages = NULL;
  47. buf->page_len = 0;
  48. buf->flags = 0;
  49. buf->len = 0;
  50. buf->buflen = PAGE_SIZE;
  51. }
  52. static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
  53. {
  54. struct page *page;
  55. /* Preallocate one XDR receive buffer */
  56. page = alloc_page(gfp_flags);
  57. if (page == NULL)
  58. return -ENOMEM;
  59. xdr_buf_init(buf, page_address(page), PAGE_SIZE);
  60. return 0;
  61. }
/*
 * Allocate one backchannel rpc_rqst plus one page each for its XDR
 * receive and send buffers.  Returns the new request, or NULL on
 * allocation failure.
 */
static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
{
	/* No retry loops or OOM warnings: both callers tolerate NULL. */
	gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
	struct rpc_rqst *req;

	/* Pre-allocate one backchannel rpc_rqst */
	req = kzalloc(sizeof(*req), gfp_flags);
	if (req == NULL)
		return NULL;

	req->rq_xprt = xprt;
	INIT_LIST_HEAD(&req->rq_bc_list);

	/* Preallocate one XDR receive buffer */
	if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
		printk(KERN_ERR "Failed to create bc receive xbuf\n");
		goto out_free;
	}
	req->rq_rcv_buf.len = PAGE_SIZE;

	/* Preallocate one XDR send buffer */
	if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
		printk(KERN_ERR "Failed to create bc snd xbuf\n");
		goto out_free;
	}
	return req;
out_free:
	/*
	 * NOTE(review): xprt_free_allocation() is given a possibly
	 * half-built request; kzalloc() zeroed it, so this appears to
	 * rely on free_page() treating a NULL iov_base as a no-op —
	 * confirm against free_pages() semantics.
	 */
	xprt_free_allocation(req);
	return NULL;
}
  88. /*
  89. * Preallocate up to min_reqs structures and related buffers for use
  90. * by the backchannel. This function can be called multiple times
  91. * when creating new sessions that use the same rpc_xprt. The
  92. * preallocated buffers are added to the pool of resources used by
  93. * the rpc_xprt. Any one of these resources may be used by an
  94. * incoming callback request. It's up to the higher levels in the
  95. * stack to enforce that the maximum number of session slots is not
  96. * being exceeded.
  97. *
  98. * Some callback arguments can be large. For example, a pNFS server
using multiple deviceids. The list can be unbounded, but the client
  100. * has the ability to tell the server the maximum size of the callback
  101. * requests. Each deviceID is 16 bytes, so allocate one page
  102. * for the arguments to have enough room to receive a number of these
  103. * deviceIDs. The NFS client indicates to the pNFS server that its
  104. * callback requests can be up to 4096 bytes in size.
  105. */
  106. int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
  107. {
  108. if (!xprt->ops->bc_setup)
  109. return 0;
  110. return xprt->ops->bc_setup(xprt, min_reqs);
  111. }
  112. EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
/*
 * xprt_setup_bc - preallocate backchannel request structures
 * @xprt: transport the preallocations belong to
 * @min_reqs: number of slots to preallocate (clamped to BC_MAX_SLOTS)
 *
 * Generic backend for xprt->ops->bc_setup.  Returns 0 on success, or
 * -ENOMEM with nothing added to @xprt if any allocation fails.
 */
int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
{
	struct rpc_rqst *req;
	struct list_head tmp_list;
	int i;

	dprintk("RPC: setup backchannel transport\n");

	/* Never exceed the fixed backchannel slot table limit. */
	if (min_reqs > BC_MAX_SLOTS)
		min_reqs = BC_MAX_SLOTS;

	/*
	 * We use a temporary list to keep track of the preallocated
	 * buffers.  Once we're done building the list we splice it
	 * into the backchannel preallocation list off of the rpc_xprt
	 * struct.  This helps minimize the amount of time the list
	 * lock is held on the rpc_xprt struct.  It also makes cleanup
	 * easier in case of memory allocation errors.
	 */
	INIT_LIST_HEAD(&tmp_list);
	for (i = 0; i < min_reqs; i++) {
		/* Pre-allocate one backchannel rpc_rqst */
		req = xprt_alloc_bc_req(xprt);
		if (req == NULL) {
			printk(KERN_ERR "Failed to create bc rpc_rqst\n");
			goto out_free;
		}

		/* Add the allocated buffer to the tmp list */
		dprintk("RPC: adding req= %p\n", req);
		list_add(&req->rq_bc_pa_list, &tmp_list);
	}

	/*
	 * Add the temporary list to the backchannel preallocation list
	 */
	spin_lock(&xprt->bc_pa_lock);
	list_splice(&tmp_list, &xprt->bc_pa_list);
	/* Both counters and the atomic slot count grow in lockstep. */
	xprt->bc_alloc_count += min_reqs;
	xprt->bc_alloc_max += min_reqs;
	atomic_add(min_reqs, &xprt->bc_slot_count);
	spin_unlock(&xprt->bc_pa_lock);

	dprintk("RPC: setup backchannel transport done\n");
	return 0;

out_free:
	/*
	 * Memory allocation failed, free the temporary list
	 */
	while (!list_empty(&tmp_list)) {
		req = list_first_entry(&tmp_list,
				       struct rpc_rqst,
				       rq_bc_pa_list);
		list_del(&req->rq_bc_pa_list);
		xprt_free_allocation(req);
	}

	dprintk("RPC: setup backchannel transport failed\n");
	return -ENOMEM;
}
/**
 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 * @xprt: the transport holding the preallocated structures
 * @max_reqs: the maximum number of preallocated structures to destroy
 *
 * Since these structures may have been allocated by multiple calls
 * to xprt_setup_backchannel, we only destroy up to the maximum number
 * of reqs specified by the caller.
 */
  175. void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
  176. {
  177. if (xprt->ops->bc_destroy)
  178. xprt->ops->bc_destroy(xprt, max_reqs);
  179. }
  180. EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
/*
 * xprt_destroy_bc - free up to @max_reqs preallocated backchannel slots
 * @xprt: transport holding the preallocations
 * @max_reqs: upper bound on how many entries to destroy
 *
 * Generic backend for xprt->ops->bc_destroy.  Only entries currently
 * sitting on bc_pa_list are freed; entries in use elsewhere are left
 * alone, but the lowered bc_alloc_max stops them from being requeued
 * when they are released (see xprt_need_to_requeue()).
 */
void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
{
	struct rpc_rqst *req = NULL, *tmp = NULL;

	dprintk("RPC: destroy backchannel transport\n");

	if (max_reqs == 0)
		goto out;

	spin_lock_bh(&xprt->bc_pa_lock);
	/* Lower the ceiling first; min() guards against underflow. */
	xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		dprintk("RPC: req=%p\n", req);
		list_del(&req->rq_bc_pa_list);
		xprt_free_allocation(req);
		xprt->bc_alloc_count--;
		atomic_dec(&xprt->bc_slot_count);
		if (--max_reqs == 0)
			break;
	}
	spin_unlock_bh(&xprt->bc_pa_lock);

out:
	dprintk("RPC: backchannel list empty= %s\n",
		list_empty(&xprt->bc_pa_list) ? "true" : "false");
}
/*
 * Take the first request off bc_pa_list and initialize it for @xid.
 * Called with xprt->bc_pa_lock held (see xprt_lookup_bc_request()).
 * If the list is empty, @new — allocated by the caller outside the
 * lock — is added first, provided it is non-NULL and the slot limit
 * has not been reached.  Returns the request, or NULL if none is
 * available.
 */
static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
					    struct rpc_rqst *new)
{
	struct rpc_rqst *req = NULL;

	dprintk("RPC: allocate a backchannel request\n");
	if (list_empty(&xprt->bc_pa_list)) {
		if (!new)
			goto not_found;
		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
			goto not_found;
		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
		xprt->bc_alloc_count++;
		atomic_inc(&xprt->bc_slot_count);
	}
	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
			       rq_bc_pa_list);
	req->rq_reply_bytes_recvd = 0;
	/* rq_private_buf starts out as a copy of the receive buffer. */
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
	       sizeof(req->rq_private_buf));
	/* Stamping the current cookie marks the request live for lookup. */
	req->rq_xid = xid;
	req->rq_connect_cookie = xprt->connect_cookie;
	dprintk("RPC: backchannel req=%p\n", req);
not_found:
	return req;
}
  228. /*
  229. * Return the preallocated rpc_rqst structure and XDR buffers
  230. * associated with this rpc_task.
  231. */
  232. void xprt_free_bc_request(struct rpc_rqst *req)
  233. {
  234. struct rpc_xprt *xprt = req->rq_xprt;
  235. xprt->ops->bc_free_rqst(req);
  236. }
/*
 * Generic backend for xprt->ops->bc_free_rqst: release a backchannel
 * request once the callback service is finished with it, either
 * requeueing it for reuse or freeing it outright.
 */
void xprt_free_bc_rqst(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: free backchannel req=%p\n", req);

	/*
	 * Make rq_connect_cookie stale so xprt_lookup_bc_request()
	 * won't match this entry by xid until it is reissued.
	 */
	req->rq_connect_cookie = xprt->connect_cookie - 1;
	/* Order the cookie update before clearing the in-use bit. */
	smp_mb__before_atomic();
	clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
	smp_mb__after_atomic();

	/*
	 * Return it to the list of preallocations so that it
	 * may be reused by a new callback request.
	 */
	spin_lock_bh(&xprt->bc_pa_lock);
	if (xprt_need_to_requeue(xprt)) {
		/* Restore both XDR buffers to their one-page pristine state. */
		xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
		xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
		req->rq_rcv_buf.len = PAGE_SIZE;
		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
		xprt->bc_alloc_count++;
		atomic_inc(&xprt->bc_slot_count);
		req = NULL;		/* ownership passed back to the list */
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
	if (req != NULL) {
		/*
		 * The last remaining session was destroyed while this
		 * entry was in use.  Free the entry and don't attempt
		 * to add back to the list because there is no need to
		 * have anymore preallocated entries.
		 */
		dprintk("RPC: Last session removed req=%p\n", req);
		xprt_free_allocation(req);
	}
	/* Drop the reference taken in xprt_complete_bc_request(). */
	xprt_put(xprt);
}
/*
 * One or more rpc_rqst structures have been preallocated during the
 * backchannel setup.  Buffer space for the send and private XDR buffers
 * has been preallocated as well.  Use xprt_alloc_bc_request to allocate
 * to this request.  Use xprt_free_bc_request to return it.
 *
 * We know that we're called in soft interrupt context, grab the spin_lock
 * since there is no need to grab the bottom half spin_lock.
 *
 * Return an available rpc_rqst, otherwise NULL if none are available.
 */
/*
 * Find or create the backchannel request for @xid.  First searches
 * bc_pa_list for a live entry (current connect cookie) with a matching
 * xid; failing that, takes a preallocated entry via
 * xprt_get_bc_request().  If the list is empty, the lock is dropped,
 * a fresh request is allocated, and the whole lookup is retried — the
 * list may have been refilled in the meantime, in which case the
 * spare allocation is freed.
 */
struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *req, *new = NULL;

	do {
		spin_lock(&xprt->bc_pa_lock);
		list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
			/* Skip entries left over from a prior connection. */
			if (req->rq_connect_cookie != xprt->connect_cookie)
				continue;
			if (req->rq_xid == xid)
				goto found;
		}
		req = xprt_get_bc_request(xprt, xid, new);
found:
		spin_unlock(&xprt->bc_pa_lock);
		if (new) {
			/* Our spare lost the race or wasn't needed. */
			if (req != new)
				xprt_free_allocation(new);
			break;
		} else if (req)
			break;
		/* List was empty: allocate without the lock, then retry. */
		new = xprt_alloc_bc_req(xprt);
	} while (new);
	return req;
}
/*
 * Add callback request to callback list.  The callback
 * service sleeps on the sv_cb_waitq waiting for new
 * requests.  Wake it up after enqueuing the request.
 */
/*
 * Hand a fully received callback request (@copied bytes) to the
 * backchannel svc service.  The request is moved from the transport's
 * preallocation list onto bc_serv's callback list, and a service
 * thread is woken to process it.
 */
void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct svc_serv *bc_serv = xprt->bc_serv;

	/*
	 * Remove from the preallocation list; bc_slot_count is NOT
	 * decremented here — the slot stays occupied until the request
	 * is released through xprt_free_bc_rqst().
	 */
	spin_lock(&xprt->bc_pa_lock);
	list_del(&req->rq_bc_pa_list);
	xprt->bc_alloc_count--;
	spin_unlock(&xprt->bc_pa_lock);

	req->rq_private_buf.len = copied;
	set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);

	dprintk("RPC: add callback request to list\n");
	/* Hold the transport until xprt_free_bc_rqst() drops it. */
	xprt_get(xprt);
	spin_lock(&bc_serv->sv_cb_lock);
	list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
	wake_up(&bc_serv->sv_cb_waitq);
	spin_unlock(&bc_serv->sv_cb_lock);
}