svc_rdma_pcl.c

// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2020 Oracle. All rights reserved.
 */

#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/rpc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

/**
 * pcl_free - Release all memory associated with a parsed chunk list
 * @pcl: parsed chunk list
 *
 */
void pcl_free(struct svc_rdma_pcl *pcl)
{
	while (!list_empty(&pcl->cl_chunks)) {
		struct svc_rdma_chunk *chunk;

		chunk = pcl_first_chunk(pcl);
		list_del(&chunk->ch_list);
		kfree(chunk);
	}
}
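
/*
 * Usage sketch (illustrative only, not part of this file): a parsed
 * chunk list is embedded in a receive context, prepared with the
 * pcl_init() helper declared alongside these routines, filled by the
 * pcl_alloc_* functions below, and torn down with pcl_free() when the
 * context is retired.
 *
 *	pcl_init(&rctxt->rc_call_pcl);
 *	pcl_init(&rctxt->rc_read_pcl);
 *	...
 *	pcl_free(&rctxt->rc_call_pcl);
 *	pcl_free(&rctxt->rc_read_pcl);
 */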

static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position)
{
	struct svc_rdma_chunk *chunk;

	chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL);
	if (!chunk)
		return NULL;

	chunk->ch_position = position;
	chunk->ch_length = 0;
	chunk->ch_payload_length = 0;
	chunk->ch_segcount = 0;
	return chunk;
}
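
/*
 * Note (added for illustration): ch_segments[] is a flexible array
 * member, so struct_size() sizes a single kmalloc for @segcount
 * segments. ch_segcount starts at zero and is advanced as each
 * decoded segment is filled in by pcl_set_read_segment().
 */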

static struct svc_rdma_chunk *
pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position)
{
	struct svc_rdma_chunk *pos;

	pcl_for_each_chunk(pos, pcl) {
		if (pos->ch_position == position)
			return pos;
	}
	return NULL;
}

static void pcl_insert_position(struct svc_rdma_pcl *pcl,
				struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_chunk *pos;

	pcl_for_each_chunk(pos, pcl) {
		if (pos->ch_position > chunk->ch_position)
			break;
	}
	__list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list);
	pcl->cl_count++;
}
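
/*
 * Note (added for illustration): pcl_lookup_position() and
 * pcl_insert_position() keep cl_chunks sorted by ch_position.
 * Inserting chunks with positions 80, 20, 50 in that order yields
 * the list 20 -> 50 -> 80, and cl_count grows by one per insertion.
 */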

static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt,
				 struct svc_rdma_chunk *chunk,
				 u32 handle, u32 length, u64 offset)
{
	struct svc_rdma_segment *segment;

	segment = &chunk->ch_segments[chunk->ch_segcount];
	segment->rs_handle = handle;
	segment->rs_length = length;
	segment->rs_offset = offset;
	trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment);

	chunk->ch_length += length;
	chunk->ch_segcount++;
}

/**
 * pcl_alloc_call - Construct a parsed chunk list for the Call body
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Assumptions:
 * - The incoming Read list has already been sanity checked.
 * - cl_count is already set to the number of segments in
 *   the un-decoded list.
 * - The list might not be in order by position.
 *
 * Return values:
 *   %true: Parsed chunk list was successfully constructed, and
 *          cl_count is updated to be the number of chunks (i.e.,
 *          unique positions) in the Read list.
 *   %false: Memory allocation failed.
 */
bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl;
	unsigned int i, segcount = pcl->cl_count;

	pcl->cl_count = 0;
	for (i = 0; i < segcount; i++) {
		struct svc_rdma_chunk *chunk;
		u32 position, handle, length;
		u64 offset;

		p++;	/* skip the list discriminator */
		p = xdr_decode_read_segment(p, &position, &handle,
					    &length, &offset);
		if (position != 0)
			continue;

		if (pcl_is_empty(pcl)) {
			chunk = pcl_alloc_chunk(segcount, position);
			if (!chunk)
				return false;
			pcl_insert_position(pcl, chunk);
		} else {
			chunk = list_first_entry(&pcl->cl_chunks,
						 struct svc_rdma_chunk,
						 ch_list);
		}

		pcl_set_read_segment(rctxt, chunk, handle, length, offset);
	}

	return true;
}
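
/*
 * Example (illustrative values only): a Read list carrying two
 * position-zero segments describes a single call chunk, since all
 * position-zero segments are coalesced into the first chunk:
 *
 *	Read list:	{ position=0, handle=A, length=4096, offset=0    }
 *			{ position=0, handle=B, length=1024, offset=4096 }
 *
 *	rc_call_pcl:	cl_count == 1
 *	  chunk 0:	ch_position == 0, ch_segcount == 2,
 *			ch_length == 5120
 */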

/**
 * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Assumptions:
 * - The incoming Read list has already been sanity checked.
 * - cl_count is already set to the number of segments in
 *   the un-decoded list.
 * - The list might not be in order by position.
 *
 * Return values:
 *   %true: Parsed chunk list was successfully constructed, and
 *          cl_count is updated to be the number of chunks (i.e.,
 *          unique position values) in the Read list.
 *   %false: Memory allocation failed.
 *
 * TODO:
 * - Check for chunk range overlaps
 */
bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl;
	unsigned int i, segcount = pcl->cl_count;

	pcl->cl_count = 0;
	for (i = 0; i < segcount; i++) {
		struct svc_rdma_chunk *chunk;
		u32 position, handle, length;
		u64 offset;

		p++;	/* skip the list discriminator */
		p = xdr_decode_read_segment(p, &position, &handle,
					    &length, &offset);
		if (position == 0)
			continue;

		chunk = pcl_lookup_position(pcl, position);
		if (!chunk) {
			chunk = pcl_alloc_chunk(segcount, position);
			if (!chunk)
				return false;
			pcl_insert_position(pcl, chunk);
		}

		pcl_set_read_segment(rctxt, chunk, handle, length, offset);
	}

	return true;
}
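
/*
 * Example (illustrative values only): Read segments with non-zero
 * positions are grouped by position into separate chunks, kept in
 * position order even if the wire order differs:
 *
 *	Read list:	{ position=80, handle=C, length=8192, offset=0 }
 *			{ position=20, handle=D, length=512,  offset=0 }
 *
 *	rc_read_pcl:	cl_count == 2
 *	  chunk 0:	ch_position == 20, ch_segcount == 1, ch_length == 512
 *	  chunk 1:	ch_position == 80, ch_segcount == 1, ch_length == 8192
 */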

/**
 * pcl_alloc_write - Construct a parsed chunk list from a Write list
 * @rctxt: Ingress receive context
 * @pcl: Parsed chunk list to populate
 * @p: Start of an un-decoded Write list
 *
 * Assumptions:
 * - The incoming Write list has already been sanity checked, and
 * - cl_count is set to the number of chunks in the un-decoded list.
 *
 * Return values:
 *   %true: Parsed chunk list was successfully constructed.
 *   %false: Memory allocation failed.
 */
bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
		     struct svc_rdma_pcl *pcl, __be32 *p)
{
	struct svc_rdma_segment *segment;
	struct svc_rdma_chunk *chunk;
	unsigned int i, j;
	u32 segcount;

	for (i = 0; i < pcl->cl_count; i++) {
		p++;	/* skip the list discriminator */
		segcount = be32_to_cpup(p++);

		chunk = pcl_alloc_chunk(segcount, 0);
		if (!chunk)
			return false;
		list_add_tail(&chunk->ch_list, &pcl->cl_chunks);

		for (j = 0; j < segcount; j++) {
			segment = &chunk->ch_segments[j];
			p = xdr_decode_rdma_segment(p, &segment->rs_handle,
						    &segment->rs_length,
						    &segment->rs_offset);
			trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j);

			chunk->ch_length += segment->rs_length;
			chunk->ch_segcount++;
		}
	}
	return true;
}
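
/*
 * Example (illustrative): the Write list wire format this routine
 * walks. Each entry is a list discriminator, a segment count, then
 * that many RDMA segments; a zero discriminator ends the list:
 *
 *	1 (entry present)
 *	  segcount = 2
 *	  { handle=E, length=65536, offset=0     }
 *	  { handle=F, length=65536, offset=65536 }
 *	0 (end of list)
 *
 * The resulting chunk has ch_segcount == 2 and ch_length == 131072;
 * Write chunks carry no Position field, so ch_position is zero.
 */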

static int pcl_process_region(const struct xdr_buf *xdr,
			      unsigned int offset, unsigned int length,
			      int (*actor)(const struct xdr_buf *, void *),
			      void *data)
{
	struct xdr_buf subbuf;

	if (!length)
		return 0;
	if (xdr_buf_subsegment(xdr, &subbuf, offset, length))
		return -EMSGSIZE;
	return actor(&subbuf, data);
}
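
/*
 * Note (added for illustration): xdr_buf_subsegment() carves out a
 * view of @xdr covering @length bytes starting at @offset without
 * copying any data; @actor then operates on just that view. Empty
 * regions are skipped without invoking @actor.
 */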

/**
 * pcl_process_nonpayloads - Process non-payload regions inside @xdr
 * @pcl: Chunk list to process
 * @xdr: xdr_buf to process
 * @actor: Function to invoke on each non-payload region
 * @data: Arguments for @actor
 *
 * This mechanism must ignore not only result payloads that were already
 * sent via RDMA Write, but also XDR padding for those payloads that
 * the upper layer has added.
 *
 * Assumptions:
 *  The xdr->len and ch_position fields are aligned to 4-byte multiples.
 *
 * Returns:
 *   On success, zero,
 *   %-EMSGSIZE on XDR buffer overflow, or
 *   The return value of @actor
 */
int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
			    const struct xdr_buf *xdr,
			    int (*actor)(const struct xdr_buf *, void *),
			    void *data)
{
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start;
	int ret;

	chunk = pcl_first_chunk(pcl);

	/* No result payloads were generated */
	if (!chunk || !chunk->ch_payload_length)
		return actor(xdr, data);

	/* Process the region before the first result payload */
	ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data);
	if (ret < 0)
		return ret;

	/* Process the regions between each middle result payload */
	while ((next = pcl_next_chunk(pcl, chunk))) {
		if (!next->ch_payload_length)
			break;

		start = pcl_chunk_end_offset(chunk);
		ret = pcl_process_region(xdr, start, next->ch_position - start,
					 actor, data);
		if (ret < 0)
			return ret;

		chunk = next;
	}

	/* Process the region after the last result payload */
	start = pcl_chunk_end_offset(chunk);
	ret = pcl_process_region(xdr, start, xdr->len - start, actor, data);
	if (ret < 0)
		return ret;

	return 0;
}
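
/*
 * Usage sketch (the actor below is hypothetical, for illustration
 * only; rc_write_pcl is assumed to be the receive context's Write
 * list pcl). pcl_process_nonpayloads() invokes @actor once per
 * region of @xdr that is not covered by a result payload, for
 * example to count only the bytes that must travel in the RDMA Send:
 *
 *	static int count_nonpayload_bytes(const struct xdr_buf *subbuf,
 *					  void *data)
 *	{
 *		unsigned int *total = data;
 *
 *		*total += subbuf->len;
 *		return 0;
 *	}
 *
 *	unsigned int total = 0;
 *	int ret;
 *
 *	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
 *				      count_nonpayload_bytes, &total);
 */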