rxrpc: Fix call timeouts

Fix the rxrpc call expiration timeouts and make them settable from
userspace.  By analogy with other rx implementations, there should be three
timeouts:

 (1) "Normal timeout"

     This is set for all calls and is triggered if we haven't received any
     packets from the peer in a while.  It is measured from the last time
     we received any packet on that call.  This is not reset by any
     connection packets (such as CHALLENGE/RESPONSE packets).

     If a service operation takes a long time, the server should generate
     PING ACKs at a duration that's substantially less than the normal
     timeout so is to keep both sides alive.  This is set at 1/6 of normal
     timeout.

 (2) "Idle timeout"

     This is set only for a service call and is triggered if we stop
     receiving the DATA packets that comprise the request data.  It is
     measured from the last time we received a DATA packet.

 (3) "Hard timeout"

     This can be set for a call and specified the maximum lifetime of that
     call.  It should not be specified by default.  Some operations (such
     as volume transfer) take a long time.

Allow userspace to set/change the timeouts on a call with sendmsg, using a
control message:

	RXRPC_SET_CALL_TIMEOUTS

The data to the message is a number of 32-bit words, not all of which need
be given:

	u32 hard_timeout;	/* sec from first packet */
	u32 idle_timeout;	/* msec from packet Rx */
	u32 normal_timeout;	/* msec from data Rx */

This can be set in combination with any other sendmsg() that affects a
call.

Signed-off-by: David Howells <dhowells@redhat.com>
This commit is contained in:
David Howells
2017-11-24 10:18:41 +00:00
parent 4812417894
commit a158bdd324
11 changed files with 289 additions and 200 deletions

View File

@@ -21,80 +21,6 @@
#include <net/af_rxrpc.h>
#include "ar-internal.h"
/*
* Set the timer
*/
void __rxrpc_set_timer(struct rxrpc_call *call, enum rxrpc_timer_trace why,
ktime_t now)
{
unsigned long t_j, now_j = jiffies;
ktime_t t;
bool queue = false;
if (call->state < RXRPC_CALL_COMPLETE) {
t = call->expire_at;
if (!ktime_after(t, now)) {
trace_rxrpc_timer(call, why, now, now_j);
queue = true;
goto out;
}
if (!ktime_after(call->resend_at, now)) {
call->resend_at = call->expire_at;
if (!test_and_set_bit(RXRPC_CALL_EV_RESEND, &call->events))
queue = true;
} else if (ktime_before(call->resend_at, t)) {
t = call->resend_at;
}
if (!ktime_after(call->ack_at, now)) {
call->ack_at = call->expire_at;
if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
queue = true;
} else if (ktime_before(call->ack_at, t)) {
t = call->ack_at;
}
if (!ktime_after(call->ping_at, now)) {
call->ping_at = call->expire_at;
if (!test_and_set_bit(RXRPC_CALL_EV_PING, &call->events))
queue = true;
} else if (ktime_before(call->ping_at, t)) {
t = call->ping_at;
}
t_j = nsecs_to_jiffies(ktime_to_ns(ktime_sub(t, now)));
t_j += jiffies;
/* We have to make sure that the calculated jiffies value falls
* at or after the nsec value, or we may loop ceaselessly
* because the timer times out, but we haven't reached the nsec
* timeout yet.
*/
t_j++;
if (call->timer.expires != t_j || !timer_pending(&call->timer)) {
mod_timer(&call->timer, t_j);
trace_rxrpc_timer(call, why, now, now_j);
}
}
out:
if (queue)
rxrpc_queue_call(call);
}
/*
* Set the timer
*/
void rxrpc_set_timer(struct rxrpc_call *call, enum rxrpc_timer_trace why,
ktime_t now)
{
read_lock_bh(&call->state_lock);
__rxrpc_set_timer(call, why, now);
read_unlock_bh(&call->state_lock);
}
/*
* Propose a PING ACK be sent.
*/
@@ -106,12 +32,13 @@ static void rxrpc_propose_ping(struct rxrpc_call *call,
!test_and_set_bit(RXRPC_CALL_EV_PING, &call->events))
rxrpc_queue_call(call);
} else {
ktime_t now = ktime_get_real();
ktime_t ping_at = ktime_add_ms(now, rxrpc_idle_ack_delay);
unsigned long now = jiffies;
unsigned long ping_at = now + rxrpc_idle_ack_delay;
if (ktime_before(ping_at, call->ping_at)) {
call->ping_at = ping_at;
rxrpc_set_timer(call, rxrpc_timer_set_for_ping, now);
if (time_before(ping_at, call->ping_at)) {
WRITE_ONCE(call->ping_at, ping_at);
rxrpc_reduce_call_timer(call, ping_at, now,
rxrpc_timer_set_for_ping);
}
}
}
@@ -125,8 +52,7 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
enum rxrpc_propose_ack_trace why)
{
enum rxrpc_propose_ack_outcome outcome = rxrpc_propose_ack_use;
unsigned int expiry = rxrpc_soft_ack_delay;
ktime_t now, ack_at;
unsigned long now, ack_at, expiry = rxrpc_soft_ack_delay;
s8 prior = rxrpc_ack_priority[ack_reason];
/* Pings are handled specially because we don't want to accidentally
@@ -190,11 +116,12 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
background)
rxrpc_queue_call(call);
} else {
now = ktime_get_real();
ack_at = ktime_add_ms(now, expiry);
if (ktime_before(ack_at, call->ack_at)) {
call->ack_at = ack_at;
rxrpc_set_timer(call, rxrpc_timer_set_for_ack, now);
now = jiffies;
ack_at = jiffies + expiry;
if (time_before(ack_at, call->ack_at)) {
WRITE_ONCE(call->ack_at, ack_at);
rxrpc_reduce_call_timer(call, ack_at, now,
rxrpc_timer_set_for_ack);
}
}
@@ -227,18 +154,20 @@ static void rxrpc_congestion_timeout(struct rxrpc_call *call)
/*
* Perform retransmission of NAK'd and unack'd packets.
*/
static void rxrpc_resend(struct rxrpc_call *call, ktime_t now)
static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
{
struct rxrpc_skb_priv *sp;
struct sk_buff *skb;
unsigned long resend_at;
rxrpc_seq_t cursor, seq, top;
ktime_t max_age, oldest, ack_ts;
ktime_t now, max_age, oldest, ack_ts;
int ix;
u8 annotation, anno_type, retrans = 0, unacked = 0;
_enter("{%d,%d}", call->tx_hard_ack, call->tx_top);
max_age = ktime_sub_ms(now, rxrpc_resend_timeout);
now = ktime_get_real();
max_age = ktime_sub_ms(now, rxrpc_resend_timeout * 1000 / HZ);
spin_lock_bh(&call->lock);
@@ -282,7 +211,9 @@ static void rxrpc_resend(struct rxrpc_call *call, ktime_t now)
ktime_to_ns(ktime_sub(skb->tstamp, max_age)));
}
call->resend_at = ktime_add_ms(oldest, rxrpc_resend_timeout);
resend_at = nsecs_to_jiffies(ktime_to_ns(ktime_sub(oldest, now)));
resend_at += jiffies + rxrpc_resend_timeout;
WRITE_ONCE(call->resend_at, resend_at);
if (unacked)
rxrpc_congestion_timeout(call);
@@ -292,7 +223,8 @@ static void rxrpc_resend(struct rxrpc_call *call, ktime_t now)
* retransmitting data.
*/
if (!retrans) {
rxrpc_set_timer(call, rxrpc_timer_set_for_resend, now);
rxrpc_reduce_call_timer(call, resend_at, now,
rxrpc_timer_set_for_resend);
spin_unlock_bh(&call->lock);
ack_ts = ktime_sub(now, call->acks_latest_ts);
if (ktime_to_ns(ack_ts) < call->peer->rtt)
@@ -364,7 +296,7 @@ void rxrpc_process_call(struct work_struct *work)
{
struct rxrpc_call *call =
container_of(work, struct rxrpc_call, processor);
ktime_t now;
unsigned long now, next, t;
rxrpc_see_call(call);
@@ -384,8 +316,50 @@ recheck_state:
goto out_put;
}
now = ktime_get_real();
if (ktime_before(call->expire_at, now)) {
/* Work out if any timeouts tripped */
now = jiffies;
t = READ_ONCE(call->expect_rx_by);
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_normal, now);
set_bit(RXRPC_CALL_EV_EXPIRED, &call->events);
}
t = READ_ONCE(call->expect_req_by);
if (call->state == RXRPC_CALL_SERVER_RECV_REQUEST &&
time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_idle, now);
set_bit(RXRPC_CALL_EV_EXPIRED, &call->events);
}
t = READ_ONCE(call->expect_term_by);
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_hard, now);
set_bit(RXRPC_CALL_EV_EXPIRED, &call->events);
}
t = READ_ONCE(call->ack_at);
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now);
cmpxchg(&call->ack_at, t, now + MAX_JIFFY_OFFSET);
set_bit(RXRPC_CALL_EV_ACK, &call->events);
}
t = READ_ONCE(call->ping_at);
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now);
cmpxchg(&call->ping_at, t, now + MAX_JIFFY_OFFSET);
set_bit(RXRPC_CALL_EV_PING, &call->events);
}
t = READ_ONCE(call->resend_at);
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_resend, now);
cmpxchg(&call->resend_at, t, now + MAX_JIFFY_OFFSET);
set_bit(RXRPC_CALL_EV_RESEND, &call->events);
}
/* Process events */
if (test_and_clear_bit(RXRPC_CALL_EV_EXPIRED, &call->events)) {
rxrpc_abort_call("EXP", call, 0, RX_USER_ABORT, -ETIME);
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
goto recheck_state;
@@ -408,7 +382,22 @@ recheck_state:
goto recheck_state;
}
rxrpc_set_timer(call, rxrpc_timer_set_for_resend, now);
/* Make sure the timer is restarted */
next = call->expect_rx_by;
#define set(T) { t = READ_ONCE(T); if (time_before(t, next)) next = t; }
set(call->expect_req_by);
set(call->expect_term_by);
set(call->ack_at);
set(call->resend_at);
set(call->ping_at);
now = jiffies;
if (time_after_eq(now, next))
goto recheck_state;
rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
/* other events may have been raised since we started checking */
if (call->events && call->state < RXRPC_CALL_COMPLETE) {