frwr_ops.c

// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015, 2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 */

/* Lightweight memory registration using Fast Registration Work
 * Requests (FRWR).
 *
 * FRWR features ordered asynchronous registration and invalidation
 * of arbitrarily-sized memory regions. This is the fastest and safest
 * but most complex memory registration mode.
 */

/* Normal operation
 *
 * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
 * Work Request (frwr_map). When the RDMA operation is finished, this
 * Memory Region is invalidated using a LOCAL_INV Work Request
 * (frwr_unmap_async and frwr_unmap_sync).
 *
 * Typically FAST_REG Work Requests are not signaled, and neither are
 * RDMA Send Work Requests (with the exception of signaling occasionally
 * to prevent provider work queue overflows). This greatly reduces HCA
 * interrupt workload.
 */

/* Transport recovery
 *
 * frwr_map and frwr_unmap_* cannot run at the same time the transport
 * connect worker is running. The connect worker holds the transport
 * send lock, just as ->send_request does. This prevents frwr_map and
 * the connect worker from running concurrently. When a connection is
 * closed, the Receive completion queue is drained before allowing
 * the connect worker to get control. This prevents frwr_unmap and the
 * connect worker from running concurrently.
 *
 * When the underlying transport disconnects, MRs that are in flight
 * are flushed and are likely unusable. Thus all MRs are destroyed.
 * New MRs are created on demand.
 */

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void frwr_cid_init(struct rpcrdma_ep *ep,
			  struct rpcrdma_mr *mr)
{
	struct rpc_rdma_cid *cid = &mr->mr_cid;
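
	/* Record the send CQ and MR resource IDs so that completion
	 * tracepoints can identify which MR a completion belongs to.
	 */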
	cid->ci_queue_id = ep->re_attr.send_cq->res.id;
	cid->ci_completion_id = mr->mr_ibmr->res.id;
}

static void frwr_mr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
{
	if (mr->mr_device) {
		trace_xprtrdma_mr_unmap(mr);
		ib_dma_unmap_sg(mr->mr_device, mr->mr_sg, mr->mr_nents,
				mr->mr_dir);
		mr->mr_device = NULL;
	}
}

/**
 * frwr_mr_release - Destroy one MR
 * @mr: MR allocated by frwr_mr_init
 *
 */
void frwr_mr_release(struct rpcrdma_mr *mr)
{
	int rc;

	frwr_mr_unmap(mr->mr_xprt, mr);

	rc = ib_dereg_mr(mr->mr_ibmr);
	if (rc)
		trace_xprtrdma_frwr_dereg(mr, rc);
	kfree(mr->mr_sg);
	kfree(mr);
}

static void frwr_mr_put(struct rpcrdma_mr *mr)
{
	frwr_mr_unmap(mr->mr_xprt, mr);

	/* The MR is returned to the req's MR free list instead
	 * of to the xprt's MR free list. No spinlock is needed.
	 */
	rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
}

/* frwr_reset - Place MRs back on the free list
 * @req: request to reset
 *
 * Used after a failed marshal. For FRWR, this means the MRs
 * don't have to be fully released and recreated.
 *
 * NB: This is safe only as long as none of @req's MRs are
 * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
 * Work Request.
 */
void frwr_reset(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
		frwr_mr_put(mr);
}

/**
 * frwr_mr_init - Initialize one MR
 * @r_xprt: controlling transport instance
 * @mr: generic MR to prepare for FRWR
 *
 * Returns zero if successful. Otherwise a negative errno
 * is returned.
 */
int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
{
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned int depth = ep->re_max_fr_depth;
	struct scatterlist *sg;
	struct ib_mr *frmr;

	sg = kcalloc_node(depth, sizeof(*sg), XPRTRDMA_GFP_FLAGS,
			  ibdev_to_node(ep->re_id->device));
	if (!sg)
		return -ENOMEM;
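
	/* Allocate an MR that can register up to re_max_fr_depth
	 * pages with a single FAST_REG Work Request.
	 */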
	frmr = ib_alloc_mr(ep->re_pd, ep->re_mrtype, depth);
	if (IS_ERR(frmr))
		goto out_mr_err;

	mr->mr_xprt = r_xprt;
	mr->mr_ibmr = frmr;
	mr->mr_device = NULL;
	INIT_LIST_HEAD(&mr->mr_list);
	init_completion(&mr->mr_linv_done);
	frwr_cid_init(ep, mr);

	sg_init_table(sg, depth);
	mr->mr_sg = sg;
	return 0;

out_mr_err:
	kfree(sg);
	trace_xprtrdma_frwr_alloc(mr, PTR_ERR(frmr));
	return PTR_ERR(frmr);
}

/**
 * frwr_query_device - Prepare a transport for use with FRWR
 * @ep: endpoint to fill in
 * @device: RDMA device to query
 *
 * On success, sets:
 *	ep->re_attr
 *	ep->re_max_requests
 *	ep->re_max_rdma_segs
 *	ep->re_max_fr_depth
 *	ep->re_mrtype
 *
 * Return values:
 *   On success, returns zero.
 *   %-EINVAL - the device does not support FRWR memory registration
 *   %-ENOMEM - the device is not sufficiently capable for NFS/RDMA
 */
int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device)
{
	const struct ib_device_attr *attrs = &device->attrs;
	int max_qp_wr, depth, delta;
	unsigned int max_sge;

	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) ||
	    attrs->max_fast_reg_page_list_len == 0) {
		pr_err("rpcrdma: 'frwr' mode is not supported by device %s\n",
		       device->name);
		return -EINVAL;
	}

	max_sge = min_t(unsigned int, attrs->max_send_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_err("rpcrdma: HCA provides only %u send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ep->re_attr.cap.max_send_sge = max_sge;
	ep->re_attr.cap.max_recv_sge = 1;

	ep->re_mrtype = IB_MR_TYPE_MEM_REG;
	if (attrs->kernel_cap_flags & IBK_SG_GAPS_REG)
		ep->re_mrtype = IB_MR_TYPE_SG_GAPS;

	/* Quirk: Some devices advertise a large max_fast_reg_page_list_len
	 * capability, but perform optimally when the MRs are not larger
	 * than a page.
	 */
	if (attrs->max_sge_rd > RPCRDMA_MAX_HDR_SEGS)
		ep->re_max_fr_depth = attrs->max_sge_rd;
	else
		ep->re_max_fr_depth = attrs->max_fast_reg_page_list_len;
	if (ep->re_max_fr_depth > RPCRDMA_MAX_DATA_SEGS)
		ep->re_max_fr_depth = RPCRDMA_MAX_DATA_SEGS;

	/* Add room for frwr register and invalidate WRs.
	 * 1. FRWR reg WR for head
	 * 2. FRWR invalidate WR for head
	 * 3. N FRWR reg WRs for pagelist
	 * 4. N FRWR invalidate WRs for pagelist
	 * 5. FRWR reg WR for tail
	 * 6. FRWR invalidate WR for tail
	 * 7. The RDMA_SEND WR
	 */
	depth = 7;

	/* Calculate N if the device max FRWR depth is smaller than
	 * RPCRDMA_MAX_DATA_SEGS.
	 */
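	/* Illustration: if RPCRDMA_MAX_DATA_SEGS were 256 and the
	 * device depth were 100, delta would start at 156 and the
	 * loop below would run twice, adding two reg/invalidate WR
	 * pairs. The actual constants depend on the build and device.
	 */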
	if (ep->re_max_fr_depth < RPCRDMA_MAX_DATA_SEGS) {
		delta = RPCRDMA_MAX_DATA_SEGS - ep->re_max_fr_depth;
		do {
			depth += 2; /* FRWR reg + invalidate */
			delta -= ep->re_max_fr_depth;
		} while (delta > 0);
	}
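
	/* Each RPC can consume up to "depth" Send Queue entries.
	 * Shrink the slot table (re_max_requests) until the worst
	 * case fits within the device's max_qp_wr limit.
	 */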
	max_qp_wr = attrs->max_qp_wr;
	max_qp_wr -= RPCRDMA_BACKWARD_WRS;
	max_qp_wr -= 1;
	if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
		return -ENOMEM;
	if (ep->re_max_requests > max_qp_wr)
		ep->re_max_requests = max_qp_wr;
	ep->re_attr.cap.max_send_wr = ep->re_max_requests * depth;
	if (ep->re_attr.cap.max_send_wr > max_qp_wr) {
		ep->re_max_requests = max_qp_wr / depth;
		if (!ep->re_max_requests)
			return -ENOMEM;
		ep->re_attr.cap.max_send_wr = ep->re_max_requests * depth;
	}
	ep->re_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->re_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
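
	/* Size the Receive Queue for one Receive per credit, plus
	 * headroom for backchannel requests, batched Receive posting,
	 * and draining the queue at disconnect.
	 */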
	ep->re_attr.cap.max_recv_wr = ep->re_max_requests;
	ep->re_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->re_attr.cap.max_recv_wr += RPCRDMA_MAX_RECV_BATCH;
	ep->re_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */

	ep->re_max_rdma_segs =
		DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ep->re_max_fr_depth);
	/* Reply chunks require segments for head and tail buffers */
	ep->re_max_rdma_segs += 2;
	if (ep->re_max_rdma_segs > RPCRDMA_MAX_HDR_SEGS)
		ep->re_max_rdma_segs = RPCRDMA_MAX_HDR_SEGS;

	/* Ensure the underlying device is capable of conveying the
	 * largest r/wsize NFS will ask for. This guarantees that
	 * failing over from one RDMA device to another will not
	 * break NFS I/O.
	 */
	if ((ep->re_max_rdma_segs * ep->re_max_fr_depth) < RPCRDMA_MAX_SEGS)
		return -ENOMEM;

	return 0;
}

/**
 * frwr_map - Register a memory region
 * @r_xprt: controlling transport
 * @seg: memory region co-ordinates
 * @nsegs: number of segments remaining
 * @writing: true when RDMA Write will be used
 * @xid: XID of RPC using the registered memory
 * @mr: MR to fill in
 *
 * Prepare a REG_MR Work Request to register a memory region
 * for remote access via RDMA READ or RDMA WRITE.
 *
 * Returns the next segment or a negative errno pointer.
 * On success, @mr is filled in.
 */
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
				struct rpcrdma_mr_seg *seg,
				int nsegs, bool writing, __be32 xid,
				struct rpcrdma_mr *mr)
{
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct ib_reg_wr *reg_wr;
	int i, n, dma_nents;
	struct ib_mr *ibmr;
	u8 key;

	if (nsegs > ep->re_max_fr_depth)
		nsegs = ep->re_max_fr_depth;
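
	/* Gather segments into the scatterlist. Unless the device
	 * supports SG_GAPS MRs, stop coalescing at the first
	 * inter-segment boundary that is not page-aligned.
	 */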
	for (i = 0; i < nsegs;) {
		sg_set_page(&mr->mr_sg[i], seg->mr_page,
			    seg->mr_len, seg->mr_offset);

		++seg;
		++i;
		if (ep->re_mrtype == IB_MR_TYPE_SG_GAPS)
			continue;
		if ((i < nsegs && seg->mr_offset) ||
		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
			break;
	}
	mr->mr_dir = rpcrdma_data_dir(writing);
	mr->mr_nents = i;

	dma_nents = ib_dma_map_sg(ep->re_id->device, mr->mr_sg, mr->mr_nents,
				  mr->mr_dir);
	if (!dma_nents)
		goto out_dmamap_err;
	mr->mr_device = ep->re_id->device;

	ibmr = mr->mr_ibmr;
	n = ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, PAGE_SIZE);
	if (n != dma_nents)
		goto out_mapmr_err;
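
	/* Fold the big-endian RPC XID into the upper 32 bits of the
	 * iova so that each registration's offset is unique to its
	 * RPC, which helps correlate on-the-wire handles with RPCs
	 * when debugging.
	 */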
	ibmr->iova &= 0x00000000ffffffff;
	ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
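
	/* Advance the rkey's generation byte so that a stale rkey
	 * from a previous registration of this MR is no longer
	 * accepted by the device.
	 */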
	key = (u8)(ibmr->rkey & 0x000000FF);
	ib_update_fast_reg_key(ibmr, ++key);

	reg_wr = &mr->mr_regwr;
	reg_wr->mr = ibmr;
	reg_wr->key = ibmr->rkey;
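	/* Note: enabling remote write access also requires local
	 * write access, per the InfiniBand specification.
	 */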
	reg_wr->access = writing ?
			 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
			 IB_ACCESS_REMOTE_READ;

	mr->mr_handle = ibmr->rkey;
	mr->mr_length = ibmr->length;
	mr->mr_offset = ibmr->iova;
	trace_xprtrdma_mr_map(mr);

	return seg;

out_dmamap_err:
	trace_xprtrdma_frwr_sgerr(mr, i);
	return ERR_PTR(-EIO);

out_mapmr_err:
	trace_xprtrdma_frwr_maperr(mr, n);
	return ERR_PTR(-EIO);
}

/**
 * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
 * @cq: completion queue
 * @wc: WCE for a completed FastReg WR
 *
 * Each flushed MR gets destroyed after the QP has drained.
 */
static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_fastreg(wc, &mr->mr_cid);

	rpcrdma_flush_disconnect(cq->cq_context, wc);
}

/**
 * frwr_send - post Send WRs containing the RPC Call message
 * @r_xprt: controlling transport instance
 * @req: prepared RPC Call
 *
 * For FRWR, chain any FastReg WRs to the Send WR. Only a
 * single ib_post_send call is needed to register memory
 * and then post the Send WR.
 *
 * Returns the return code from ib_post_send.
 *
 * Caller must hold the transport send lock to ensure that the
 * pointers to the transport's rdma_cm_id and QP are stable.
 */
int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *post_wr, *send_wr = &req->rl_wr;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rpcrdma_mr *mr;
	unsigned int num_wrs;
	int ret;

	num_wrs = 1;
	post_wr = send_wr;
	list_for_each_entry(mr, &req->rl_registered, mr_list) {
		trace_xprtrdma_mr_fastreg(mr);

		mr->mr_cqe.done = frwr_wc_fastreg;
		mr->mr_regwr.wr.next = post_wr;
		mr->mr_regwr.wr.wr_cqe = &mr->mr_cqe;
		mr->mr_regwr.wr.num_sge = 0;
		mr->mr_regwr.wr.opcode = IB_WR_REG_MR;
		mr->mr_regwr.wr.send_flags = 0;
		post_wr = &mr->mr_regwr.wr;
		++num_wrs;
	}
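
	/* Signal this Send only when the unsignaled budget is spent
	 * or another context still holds a reference to the req;
	 * leaving most Sends unsignaled keeps completion interrupts
	 * to a minimum.
	 */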
	if ((kref_read(&req->rl_kref) > 1) || num_wrs > ep->re_send_count) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->re_send_count = min_t(unsigned int, ep->re_send_batch,
					  num_wrs - ep->re_send_count);
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		ep->re_send_count -= num_wrs;
	}

	trace_xprtrdma_post_send(req);
	ret = ib_post_send(ep->re_id->qp, post_wr, NULL);
	if (ret)
		trace_xprtrdma_post_send_err(r_xprt, req, ret);
	return ret;
}

/**
 * frwr_reminv - handle a remotely invalidated MR on the @mrs list
 * @rep: Received reply
 * @mrs: list of MRs to check
 *
 */
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
{
	struct rpcrdma_mr *mr;

	list_for_each_entry(mr, mrs, mr_list)
		if (mr->mr_handle == rep->rr_inv_rkey) {
			list_del_init(&mr->mr_list);
			trace_xprtrdma_mr_reminv(mr);
			frwr_mr_put(mr);
			break;	/* only one invalidated MR per RPC */
		}
}

static void frwr_mr_done(struct ib_wc *wc, struct rpcrdma_mr *mr)
{
	if (likely(wc->status == IB_WC_SUCCESS))
		frwr_mr_put(mr);
}

/**
 * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
 * @cq: completion queue
 * @wc: WCE for a completed LocalInv WR
 *
 */
static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li(wc, &mr->mr_cid);
	frwr_mr_done(wc, mr);

	rpcrdma_flush_disconnect(cq->cq_context, wc);
}

/**
 * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
 * @cq: completion queue
 * @wc: WCE for a completed LocalInv WR
 *
 * Awaken anyone waiting for an MR to finish being fenced.
 */
static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li_wake(wc, &mr->mr_cid);
	frwr_mr_done(wc, mr);
	complete(&mr->mr_linv_done);

	rpcrdma_flush_disconnect(cq->cq_context, wc);
}

/**
 * frwr_unmap_sync - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * Sleeps until it is safe for the host CPU to access the previously mapped
 * memory regions. This guarantees that registered MRs are properly fenced
 * from the server before the RPC consumer accesses the data in them. It
 * also ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *first, **prev, *last;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	const struct ib_send_wr *bad_wr;
	struct rpcrdma_mr *mr;
	int rc;

	/* ORDER: Invalidate all of the MRs first
	 *
	 * Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	prev = &first;
	mr = rpcrdma_mr_pop(&req->rl_registered);
	do {
		trace_xprtrdma_mr_localinv(mr);
		r_xprt->rx_stats.local_inv_needed++;

		last = &mr->mr_invwr;
		last->next = NULL;
		last->wr_cqe = &mr->mr_cqe;
		last->sg_list = NULL;
		last->num_sge = 0;
		last->opcode = IB_WR_LOCAL_INV;
		last->send_flags = IB_SEND_SIGNALED;
		last->ex.invalidate_rkey = mr->mr_handle;

		last->wr_cqe->done = frwr_wc_localinv;

		*prev = last;
		prev = &last->next;
	} while ((mr = rpcrdma_mr_pop(&req->rl_registered)));

	mr = container_of(last, struct rpcrdma_mr, mr_invwr);

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete.
	 */
	last->wr_cqe->done = frwr_wc_localinv_wake;
	reinit_completion(&mr->mr_linv_done);

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless re_id->qp is a valid pointer.
	 */
	bad_wr = NULL;
	rc = ib_post_send(ep->re_id->qp, first, &bad_wr);

	/* The final LOCAL_INV WR in the chain is supposed to
	 * do the wake. If it was never posted, the wake will
	 * not happen, so don't wait in that case.
	 */
	if (bad_wr != first)
		wait_for_completion(&mr->mr_linv_done);
	if (!rc)
		return;

	/* On error, the MRs get destroyed once the QP has drained. */
	trace_xprtrdma_post_linv_err(req, rc);

	/* Force a connection loss to ensure complete recovery.
	 */
	rpcrdma_force_disconnect(ep);
}

/**
 * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
 * @cq: completion queue
 * @wc: WCE for a completed LocalInv WR
 *
 */
static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe);
	struct rpcrdma_rep *rep;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li_done(wc, &mr->mr_cid);

	/* Ensure that @rep is generated before the MR is released */
	rep = mr->mr_req->rl_reply;
	smp_rmb();

	if (wc->status != IB_WC_SUCCESS) {
		if (rep)
			rpcrdma_unpin_rqst(rep);
		rpcrdma_flush_disconnect(cq->cq_context, wc);
		return;
	}
	frwr_mr_put(mr);
	rpcrdma_complete_rqst(rep);
}

/**
 * frwr_unmap_async - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * This guarantees that registered MRs are properly fenced from the
 * server before the RPC consumer accesses the data in them. It also
 * ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *first, *last, **prev;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rpcrdma_mr *mr;
	int rc;

	/* Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	prev = &first;
	mr = rpcrdma_mr_pop(&req->rl_registered);
	do {
		trace_xprtrdma_mr_localinv(mr);
		r_xprt->rx_stats.local_inv_needed++;

		last = &mr->mr_invwr;
		last->next = NULL;
		last->wr_cqe = &mr->mr_cqe;
		last->sg_list = NULL;
		last->num_sge = 0;
		last->opcode = IB_WR_LOCAL_INV;
		last->send_flags = IB_SEND_SIGNALED;
		last->ex.invalidate_rkey = mr->mr_handle;

		last->wr_cqe->done = frwr_wc_localinv;

		*prev = last;
		prev = &last->next;
	} while ((mr = rpcrdma_mr_pop(&req->rl_registered)));

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete. The last completion will wake up the
	 * RPC waiter.
	 */
	last->wr_cqe->done = frwr_wc_localinv_done;

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless re_id->qp is a valid pointer.
	 */
	rc = ib_post_send(ep->re_id->qp, first, NULL);
	if (!rc)
		return;

	/* On error, the MRs get destroyed once the QP has drained. */
	trace_xprtrdma_post_linv_err(req, rc);

	/* The final LOCAL_INV WR in the chain is supposed to
	 * do the wake. If it was never posted, the wake does
	 * not happen. Unpin the rqst in preparation for its
	 * retransmission.
	 */
	rpcrdma_unpin_rqst(req->rl_reply);

	/* Force a connection loss to ensure complete recovery.
	 */
	rpcrdma_force_disconnect(ep);
}

/**
 * frwr_wp_create - Create an MR for padding Write chunks
 * @r_xprt: transport resources to use
 *
 * Return 0 on success, negative errno on failure.
 */
int frwr_wp_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rpcrdma_mr_seg seg;
	struct rpcrdma_mr *mr;

	mr = rpcrdma_mr_get(r_xprt);
	if (!mr)
		return -EAGAIN;
	mr->mr_req = NULL;
	ep->re_write_pad_mr = mr;
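
	/* Register a single XDR_UNIT of the transport's zero-filled
	 * write pad. This MR can be appended to a Write chunk so the
	 * server has somewhere to place XDR roundup padding when the
	 * payload length is not XDR-aligned.
	 */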
	seg.mr_len = XDR_UNIT;
	seg.mr_page = virt_to_page(ep->re_write_pad);
	seg.mr_offset = offset_in_page(ep->re_write_pad);
	if (IS_ERR(frwr_map(r_xprt, &seg, 1, true, xdr_zero, mr)))
		return -EIO;
	trace_xprtrdma_mr_fastreg(mr);

	mr->mr_cqe.done = frwr_wc_fastreg;
	mr->mr_regwr.wr.next = NULL;
	mr->mr_regwr.wr.wr_cqe = &mr->mr_cqe;
	mr->mr_regwr.wr.num_sge = 0;
	mr->mr_regwr.wr.opcode = IB_WR_REG_MR;
	mr->mr_regwr.wr.send_flags = 0;

	return ib_post_send(ep->re_id->qp, &mr->mr_regwr.wr, NULL);
}