// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC recvmsg() implementation
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Post a call for attention by the socket or kernel service.  Further
 * notifications are suppressed by putting recvmsg_link on a dummy queue.
 */
void rxrpc_notify_socket(struct rxrpc_call *call)
{
	struct rxrpc_sock *rx;
	struct sock *sk;

	_enter("%d", call->debug_id);

	if (!list_empty(&call->recvmsg_link))
		return;

	rcu_read_lock();

	rx = rcu_dereference(call->socket);
	sk = &rx->sk;
	if (rx && sk->sk_state < RXRPC_CLOSE) {
		if (call->notify_rx) {
			spin_lock_bh(&call->notify_lock);
			call->notify_rx(sk, call, call->user_call_ID);
			spin_unlock_bh(&call->notify_lock);
		} else {
			write_lock_bh(&rx->recvmsg_lock);
			if (list_empty(&call->recvmsg_link)) {
				rxrpc_get_call(call, rxrpc_call_got);
				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
			}
			write_unlock_bh(&rx->recvmsg_lock);

			if (!sock_flag(sk, SOCK_DEAD)) {
				_debug("call %ps", sk->sk_data_ready);
				sk->sk_data_ready(sk);
			}
		}
	}

	rcu_read_unlock();
	_leave("");
}
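/*
 * Illustrative sketch (not part of this file): the notify_rx hook invoked
 * above has the shape seen at the call site - (struct sock *,
 * struct rxrpc_call *, unsigned long user_call_ID).  It is called with
 * notify_lock held and possibly from BH context, so a kernel service
 * typically does no real work here and just kicks a worker that will later
 * call rxrpc_kernel_recv_data().  The example_call structure and example_wq
 * workqueue below are assumptions invented for the sketch.
 */
struct example_call {
	struct work_struct	rx_work;	/* does the actual receiving */
	/* ... service-specific state ... */
};

static struct workqueue_struct *example_wq;	/* assumed created elsewhere */

static void example_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
			      unsigned long user_call_ID)
{
	struct example_call *xcall = (struct example_call *)user_call_ID;

	/* Record that data is ready and defer all work to process context. */
	queue_work(example_wq, &xcall->rx_work);
}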
/*
 * Transition a call to the complete state.
 */
bool __rxrpc_set_call_completion(struct rxrpc_call *call,
				 enum rxrpc_call_completion compl,
				 u32 abort_code,
				 int error)
{
	if (call->state < RXRPC_CALL_COMPLETE) {
		call->abort_code = abort_code;
		call->error = error;
		call->completion = compl;
		call->state = RXRPC_CALL_COMPLETE;
		trace_rxrpc_call_complete(call);
		wake_up(&call->waitq);
		rxrpc_notify_socket(call);
		return true;
	}
	return false;
}

bool rxrpc_set_call_completion(struct rxrpc_call *call,
			       enum rxrpc_call_completion compl,
			       u32 abort_code,
			       int error)
{
	bool ret = false;

	if (call->state < RXRPC_CALL_COMPLETE) {
		write_lock_bh(&call->state_lock);
		ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
		write_unlock_bh(&call->state_lock);
	}
	return ret;
}

/*
 * Record that a call successfully completed.
 */
bool __rxrpc_call_completed(struct rxrpc_call *call)
{
	return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}

bool rxrpc_call_completed(struct rxrpc_call *call)
{
	bool ret = false;

	if (call->state < RXRPC_CALL_COMPLETE) {
		write_lock_bh(&call->state_lock);
		ret = __rxrpc_call_completed(call);
		write_unlock_bh(&call->state_lock);
	}
	return ret;
}

/*
 * Record that a call is locally aborted.
 */
bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call,
			rxrpc_seq_t seq, u32 abort_code, int error)
{
	trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
			  abort_code, error);
	return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
					   abort_code, error);
}

bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
		      rxrpc_seq_t seq, u32 abort_code, int error)
{
	bool ret;

	write_lock_bh(&call->state_lock);
	ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
	write_unlock_bh(&call->state_lock);
	return ret;
}

/*
 * Pass a call terminating message to userspace.
 */
static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
{
	u32 tmp = 0;
	int ret;

	switch (call->completion) {
	case RXRPC_CALL_SUCCEEDED:
		ret = 0;
		if (rxrpc_is_service_call(call))
			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
		break;
	case RXRPC_CALL_REMOTELY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_LOCALLY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_NETWORK_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
		break;
	case RXRPC_CALL_LOCAL_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
		break;
	default:
		pr_err("Invalid terminal call state %u\n", call->state);
		BUG();
		break;
	}

	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
			    call->rx_pkt_offset, call->rx_pkt_len, ret);
	return ret;
}
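/*
 * Illustrative userspace sketch (not part of this file): draining the
 * terminal control messages posted by rxrpc_recvmsg_term() above with the
 * standard cmsg(3) accessors.  The usual <sys/socket.h>, <linux/rxrpc.h>,
 * <stdio.h> and <string.h> includes are assumed, and error handling is
 * elided for brevity.
 */
static void example_handle_term(struct msghdr *msg)
{
	struct cmsghdr *cmsg;
	unsigned int code;

	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
		if (cmsg->cmsg_level != SOL_RXRPC)
			continue;
		switch (cmsg->cmsg_type) {
		case RXRPC_ABORT:	/* 4-byte abort code, either side */
		case RXRPC_NET_ERROR:	/* 4-byte errno from the network */
		case RXRPC_LOCAL_ERROR:	/* 4-byte errno raised locally */
			memcpy(&code, CMSG_DATA(cmsg), sizeof(code));
			fprintf(stderr, "call ended: type %d code %u\n",
				cmsg->cmsg_type, code);
			break;
		case RXRPC_ACK:		/* service side: final ACK, no payload */
			break;
		}
	}
}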
/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
	ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);

	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
		rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true,
				  rxrpc_propose_ack_terminal_ack);
		//rxrpc_send_ack_packet(call, false, NULL);
	}

	write_lock_bh(&call->state_lock);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		__rxrpc_call_completed(call);
		write_unlock_bh(&call->state_lock);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		call->tx_phase = true;
		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
		write_unlock_bh(&call->state_lock);
		rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true,
				  rxrpc_propose_ack_processing_op);
		break;
	default:
		write_unlock_bh(&call->state_lock);
		break;
	}
}

/*
 * Discard a packet we've used up and advance the Rx window by one.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_serial_t serial;
	rxrpc_seq_t hard_ack, top;
	bool last = false;
	u8 subpacket;
	int ix;

	_enter("%d", call->debug_id);

	hard_ack = call->rx_hard_ack;
	top = smp_load_acquire(&call->rx_top);
	ASSERT(before(hard_ack, top));

	hard_ack++;
	ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
	skb = call->rxtx_buffer[ix];
	rxrpc_see_skb(skb, rxrpc_skb_rotated);
	sp = rxrpc_skb(skb);

	subpacket = call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
	serial = sp->hdr.serial + subpacket;

	if (subpacket == sp->nr_subpackets - 1 &&
	    sp->rx_flags & RXRPC_SKB_INCL_LAST)
		last = true;

	call->rxtx_buffer[ix] = NULL;
	call->rxtx_annotations[ix] = 0;
	/* Barrier against rxrpc_input_data(). */
	smp_store_release(&call->rx_hard_ack, hard_ack);

	rxrpc_free_skb(skb, rxrpc_skb_freed);

	trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
	if (last) {
		rxrpc_end_rx_phase(call, serial);
	} else {
		/* Check to see if there's an ACK that needs sending. */
		if (atomic_inc_return(&call->ackr_nr_consumed) > 2)
			rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial,
					  true, false,
					  rxrpc_propose_ack_rotate_rx);
		if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
			rxrpc_send_ack_packet(call, false, NULL);
	}
}
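/*
 * Aside (illustrative, not part of this file): the rxtx_buffer ring indexed
 * above is a power-of-two array, so a monotonically increasing sequence
 * number maps to a slot with a single AND - no modulo, no explicit wrap
 * handling.  The real ring uses RXRPC_RXTX_BUFF_MASK; the size below is an
 * assumption made for the sketch.
 */
#define EXAMPLE_RING_SIZE	64			/* must be a power of two */
#define EXAMPLE_RING_MASK	(EXAMPLE_RING_SIZE - 1)

static void *example_ring[EXAMPLE_RING_SIZE];

static void *example_ring_slot(u32 seq)
{
	return example_ring[seq & EXAMPLE_RING_MASK];	/* seq 64 reuses slot 0 */
}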
/*
 * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
 * padding, but if this is the case, the packet length will be resident in the
 * socket buffer.  Note that we can't modify the master skb info as the skb may
 * be the home to multiple subpackets.
 */
static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
			       u8 annotation,
			       unsigned int offset, unsigned int len)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	rxrpc_seq_t seq = sp->hdr.seq;
	u16 cksum = sp->hdr.cksum;
	u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;

	_enter("");

	/* For all but the head jumbo subpacket, the security checksum is in a
	 * jumbo header immediately prior to the data.
	 */
	if (subpacket > 0) {
		__be16 tmp;
		if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
			BUG();
		cksum = ntohs(tmp);
		seq += subpacket;
	}

	return call->security->verify_packet(call, skb, offset, len,
					     seq, cksum);
}

/*
 * Locate the data within a packet.  This is complicated by:
 *
 * (1) An skb may contain a jumbo packet - so we have to find the appropriate
 *     subpacket.
 *
 * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
 *     contains an extra header which includes the true length of the data,
 *     excluding any encrypted padding.
 */
static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
			     u8 *_annotation,
			     unsigned int *_offset, unsigned int *_len,
			     bool *_last)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned int offset = sizeof(struct rxrpc_wire_header);
	unsigned int len;
	bool last = false;
	int ret;
	u8 annotation = *_annotation;
	u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;

	/* Locate the subpacket */
	offset += subpacket * RXRPC_JUMBO_SUBPKTLEN;
	len = skb->len - offset;
	if (subpacket < sp->nr_subpackets - 1)
		len = RXRPC_JUMBO_DATALEN;
	else if (sp->rx_flags & RXRPC_SKB_INCL_LAST)
		last = true;

	if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
		ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
		if (ret < 0)
			return ret;
		*_annotation |= RXRPC_RX_ANNO_VERIFIED;
	}

	*_offset = offset;
	*_len = len;
	*_last = last;
	call->security->locate_data(call, skb, _offset, _len);
	return 0;
}
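/*
 * Worked example of the subpacket arithmetic above (illustrative; the
 * concrete sizes are assumptions taken from the rx protocol definitions,
 * where the wire header is 28 bytes and RXRPC_JUMBO_SUBPKTLEN is
 * RXRPC_JUMBO_DATALEN (1412) plus a 4-byte jumbo header):
 *
 *	subpacket 0: offset = 28 + 0 * 1416 = 28
 *	subpacket 1: offset = 28 + 1 * 1416 = 1444
 *	subpacket 2: offset = 28 + 2 * 1416 = 2860
 *
 * The checksum occupies the last two bytes of each jumbo header, which is
 * why rxrpc_verify_packet() reads it from (offset - 2) for subpacket > 0.
 */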
/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
			      struct msghdr *msg, struct iov_iter *iter,
			      size_t len, int flags, size_t *_offset)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_serial_t serial;
	rxrpc_seq_t hard_ack, top, seq;
	size_t remain;
	bool rx_pkt_last;
	unsigned int rx_pkt_offset, rx_pkt_len;
	int ix, copy, ret = -EAGAIN, ret2;

	if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) &&
	    call->ackr_reason)
		rxrpc_send_ack_packet(call, false, NULL);

	rx_pkt_offset = call->rx_pkt_offset;
	rx_pkt_len = call->rx_pkt_len;
	rx_pkt_last = call->rx_pkt_last;

	if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
		seq = call->rx_hard_ack;
		ret = 1;
		goto done;
	}

	/* Barriers against rxrpc_input_data(). */
	hard_ack = call->rx_hard_ack;
	seq = hard_ack + 1;

	while (top = smp_load_acquire(&call->rx_top),
	       before_eq(seq, top)
	       ) {
		ix = seq & RXRPC_RXTX_BUFF_MASK;
		skb = call->rxtx_buffer[ix];
		if (!skb) {
			trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
					    rx_pkt_offset, rx_pkt_len, 0);
			break;
		}
		smp_rmb();
		rxrpc_see_skb(skb, rxrpc_skb_seen);
		sp = rxrpc_skb(skb);

		if (!(flags & MSG_PEEK)) {
			serial = sp->hdr.serial;
			serial += call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
			trace_rxrpc_receive(call, rxrpc_receive_front,
					    serial, seq);
		}

		if (msg)
			sock_recv_timestamp(msg, sock->sk, skb);

		if (rx_pkt_offset == 0) {
			ret2 = rxrpc_locate_data(call, skb,
						 &call->rxtx_annotations[ix],
						 &rx_pkt_offset, &rx_pkt_len,
						 &rx_pkt_last);
			trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
					    rx_pkt_offset, rx_pkt_len, ret2);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}
		} else {
			trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
					    rx_pkt_offset, rx_pkt_len, 0);
		}

		/* We have to handle short, empty and used-up DATA packets. */
		remain = len - *_offset;
		copy = rx_pkt_len;
		if (copy > remain)
			copy = remain;
		if (copy > 0) {
			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
						      copy);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}

			/* handle piecemeal consumption of data packets */
			rx_pkt_offset += copy;
			rx_pkt_len -= copy;
			*_offset += copy;
		}

		if (rx_pkt_len > 0) {
			trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
					    rx_pkt_offset, rx_pkt_len, 0);
			ASSERTCMP(*_offset, ==, len);
			ret = 0;
			break;
		}

		/* The whole packet has been transferred. */
		if (!(flags & MSG_PEEK))
			rxrpc_rotate_rx_window(call);
		rx_pkt_offset = 0;
		rx_pkt_len = 0;

		if (rx_pkt_last) {
			ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
			ret = 1;
			goto out;
		}

		seq++;
	}

out:
	if (!(flags & MSG_PEEK)) {
		call->rx_pkt_offset = rx_pkt_offset;
		call->rx_pkt_len = rx_pkt_len;
		call->rx_pkt_last = rx_pkt_last;
	}
done:
	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
			    rx_pkt_offset, rx_pkt_len, ret);
	if (ret == -EAGAIN)
		set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags);
	return ret;
}
/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
		  int flags)
{
	struct rxrpc_call *call;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct list_head *l;
	size_t copied = 0;
	long timeo;
	int ret;

	DEFINE_WAIT(wait);

	trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);

	if (flags & (MSG_OOB | MSG_TRUNC))
		return -EOPNOTSUPP;

	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
	lock_sock(&rx->sk);

	/* Return immediately if a client socket has no outstanding calls */
	if (RB_EMPTY_ROOT(&rx->calls) &&
	    list_empty(&rx->recvmsg_q) &&
	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
		release_sock(&rx->sk);
		return -EAGAIN;
	}

	if (list_empty(&rx->recvmsg_q)) {
		ret = -EWOULDBLOCK;
		if (timeo == 0) {
			call = NULL;
			goto error_no_call;
		}

		release_sock(&rx->sk);

		/* Wait for something to happen */
		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
					  TASK_INTERRUPTIBLE);
		ret = sock_error(&rx->sk);
		if (ret)
			goto wait_error;

		if (list_empty(&rx->recvmsg_q)) {
			if (signal_pending(current))
				goto wait_interrupted;
			trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
					    0, 0, 0, 0);
			timeo = schedule_timeout(timeo);
		}
		finish_wait(sk_sleep(&rx->sk), &wait);
		goto try_again;
	}

	/* Find the next call and dequeue it if we're not just peeking.  If we
	 * do dequeue it, that comes with a ref that we will need to release.
	 */
	write_lock_bh(&rx->recvmsg_lock);
	l = rx->recvmsg_q.next;
	call = list_entry(l, struct rxrpc_call, recvmsg_link);

	if (!(flags & MSG_PEEK))
		list_del_init(&call->recvmsg_link);
	else
		rxrpc_get_call(call, rxrpc_call_got);
	write_unlock_bh(&rx->recvmsg_lock);

	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);

	/* We're going to drop the socket lock, so we need to lock the call
	 * against interference by sendmsg.
	 */
	if (!mutex_trylock(&call->user_mutex)) {
		ret = -EWOULDBLOCK;
		if (flags & MSG_DONTWAIT)
			goto error_requeue_call;
		ret = -ERESTARTSYS;
		if (mutex_lock_interruptible(&call->user_mutex) < 0)
			goto error_requeue_call;
	}

	release_sock(&rx->sk);

	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();

	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		if (flags & MSG_CMSG_COMPAT) {
			unsigned int id32 = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned int), &id32);
		} else {
			unsigned long idl = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned long), &idl);
		}
		if (ret < 0)
			goto error_unlock_call;
	}

	if (msg->msg_name && call->peer) {
		struct sockaddr_rxrpc *srx = msg->msg_name;
		size_t len = sizeof(call->peer->srx);

		memcpy(msg->msg_name, &call->peer->srx, len);
		srx->srx_service = call->service_id;
		msg->msg_namelen = len;
	}

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
					 flags, &copied);
		if (ret == -EAGAIN)
			ret = 0;

		if (after(call->rx_top, call->rx_hard_ack) &&
		    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
			rxrpc_notify_socket(call);
		break;
	default:
		ret = 0;
		break;
	}

	if (ret < 0)
		goto error_unlock_call;

	if (call->state == RXRPC_CALL_COMPLETE) {
		ret = rxrpc_recvmsg_term(call, msg);
		if (ret < 0)
			goto error_unlock_call;
		if (!(flags & MSG_PEEK))
			rxrpc_release_call(rx, call);
		msg->msg_flags |= MSG_EOR;
		ret = 1;
	}

	if (ret == 0)
		msg->msg_flags |= MSG_MORE;
	else
		msg->msg_flags &= ~MSG_MORE;
	ret = copied;

error_unlock_call:
	mutex_unlock(&call->user_mutex);
	rxrpc_put_call(call, rxrpc_call_put);
	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
	return ret;

error_requeue_call:
	if (!(flags & MSG_PEEK)) {
		write_lock_bh(&rx->recvmsg_lock);
		list_add(&call->recvmsg_link, &rx->recvmsg_q);
		write_unlock_bh(&rx->recvmsg_lock);
		trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
	} else {
		rxrpc_put_call(call, rxrpc_call_put);
	}
error_no_call:
	release_sock(&rx->sk);
error_trace:
	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
	return ret;

wait_interrupted:
	ret = sock_intr_errno(timeo);
wait_error:
	finish_wait(sk_sleep(&rx->sk), &wait);
	call = NULL;
	goto error_trace;
}
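/*
 * Illustrative userspace sketch (not part of this file): reading one call's
 * data until the kernel sets MSG_EOR, matching the MSG_MORE/MSG_EOR flag
 * handling in rxrpc_recvmsg() above.  The usual <sys/socket.h> and
 * <string.h> includes, the buffer size and the minimal error handling are
 * assumptions made for the sketch.
 */
static int example_read_reply(int fd, void *buf, size_t size)
{
	struct iovec iov = { .iov_base = buf, .iov_len = size };
	unsigned char control[128];
	struct msghdr msg;
	ssize_t n;

	do {
		memset(&msg, 0, sizeof(msg));
		msg.msg_iov = &iov;
		msg.msg_iovlen = 1;
		msg.msg_control = control;	/* RXRPC_USER_CALL_ID etc. */
		msg.msg_controllen = sizeof(control);

		n = recvmsg(fd, &msg, 0);
		if (n < 0)
			return -1;

		/* Consume what arrived so the next pass appends to it. */
		iov.iov_base = (char *)iov.iov_base + n;
		iov.iov_len -= n;
		if (iov.iov_len == 0 && !(msg.msg_flags & MSG_EOR))
			return -1;	/* buffer too small for the reply */

		/* MSG_MORE => more data pending; MSG_EOR => call is done. */
	} while (!(msg.msg_flags & MSG_EOR));

	return 0;
}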
/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to receive data from
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Returns 0 if we got what was asked for and there's more
 * data available, 1 if we got what was asked for and we're at the end of the
 * data, and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
 *
 * *_abort should also be initialised to 0.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
			   struct iov_iter *iter, size_t *_len,
			   bool want_more, u32 *_abort, u16 *_service)
{
	size_t offset = 0;
	int ret;

	_enter("{%d,%s},%zu,%d",
	       call->debug_id, rxrpc_call_states[call->state],
	       *_len, want_more);

	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);

	mutex_lock(&call->user_mutex);

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
					 *_len, 0, &offset);
		*_len -= offset;
		if (ret < 0)
			goto out;

		/* We can only reach here with a partially full buffer if we
		 * have reached the end of the data.  We must otherwise have a
		 * full buffer or have been given -EAGAIN.
		 */
		if (ret == 1) {
			if (iov_iter_count(iter) > 0)
				goto short_data;
			if (!want_more)
				goto read_phase_complete;
			ret = 0;
			goto out;
		}

		if (!want_more)
			goto excess_data;
		goto out;

	case RXRPC_CALL_COMPLETE:
		goto call_complete;

	default:
		ret = -EINPROGRESS;
		goto out;
	}

read_phase_complete:
	ret = 1;
out:
	switch (call->ackr_reason) {
	case RXRPC_ACK_IDLE:
		break;
	case RXRPC_ACK_DELAY:
		if (ret != -EAGAIN)
			break;
		fallthrough;
	default:
		rxrpc_send_ack_packet(call, false, NULL);
	}

	if (_service)
		*_service = call->service_id;
	mutex_unlock(&call->user_mutex);
	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
	return ret;

short_data:
	trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
	ret = -EBADMSG;
	goto out;
excess_data:
	trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
	ret = -EMSGSIZE;
	goto out;
call_complete:
	*_abort = call->abort_code;
	ret = call->error;
	if (call->completion == RXRPC_CALL_SUCCEEDED) {
		ret = 1;
		if (iov_iter_count(iter) > 0)
			ret = -ECONNRESET;
	}
	goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);
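/*
 * Illustrative kernel-service sketch (not part of this file): draining a
 * call with rxrpc_kernel_recv_data() per the contract documented above -
 * 0 means more data will follow, 1 means end of data, -EAGAIN means wait
 * for another notification.  The helper name, buffer and the READ direction
 * constant for iov_iter_kvec() are assumptions for this sketch.
 */
static int example_drain_call(struct socket *sock, struct rxrpc_call *call,
			      void *buf, size_t size, u32 *abort_code)
{
	struct kvec kv = { .iov_base = buf, .iov_len = size };
	struct iov_iter iter;
	size_t len = size;
	int ret;

	*abort_code = 0;	/* as the kerneldoc above requires */
	iov_iter_kvec(&iter, READ, &kv, 1, size);

	/* Expect this to be the whole payload: want_more == false. */
	ret = rxrpc_kernel_recv_data(sock, call, &iter, &len, false,
				     abort_code, NULL);
	switch (ret) {
	case 1:			/* End of data; len was decreased by the
				 * number of bytes actually copied. */
		return 0;
	case -EAGAIN:		/* Not everything has arrived; retry when
				 * the notify_rx hook fires again. */
	case -ECONNABORTED:	/* *abort_code says why the call aborted. */
	default:
		return ret;
	}
}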