af_rxrpc: Keep rxrpc_call pointers in a hashtable

Keep track of rxrpc_call structures in a hashtable so they can be
found directly from the network parameters which define the call.

This allows incoming packets to be routed directly to a call without walking
through hierarchy of peer -> transport -> connection -> call and all the
spinlocks that that entailed.

Signed-off-by: Tim Smith <tim@electronghost.co.uk>
Signed-off-by: David Howells <dhowells@redhat.com>
This commit is contained in:
Tim Smith
2014-03-03 23:04:45 +00:00
committed by David Howells
parent e8388eb103
commit 7727640cc3
3 changed files with 277 additions and 106 deletions

View File

@@ -523,36 +523,38 @@ protocol_error:
* post an incoming packet to the appropriate call/socket to deal with
* - must get rid of the sk_buff, either by freeing it or by queuing it
*/
static void rxrpc_post_packet_to_call(struct rxrpc_connection *conn,
static void rxrpc_post_packet_to_call(struct rxrpc_call *call,
struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp;
struct rxrpc_call *call;
struct rb_node *p;
__be32 call_id;
_enter("%p,%p", conn, skb);
read_lock_bh(&conn->lock);
_enter("%p,%p", call, skb);
sp = rxrpc_skb(skb);
/* look at extant calls by channel number first */
call = conn->channels[ntohl(sp->hdr.cid) & RXRPC_CHANNELMASK];
if (!call || call->call_id != sp->hdr.callNumber)
goto call_not_extant;
_debug("extant call [%d]", call->state);
ASSERTCMP(call->conn, ==, conn);
read_lock(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_LOCALLY_ABORTED:
if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events)) {
rxrpc_queue_call(call);
goto free_unlock;
}
case RXRPC_CALL_REMOTELY_ABORTED:
case RXRPC_CALL_NETWORK_ERROR:
case RXRPC_CALL_DEAD:
goto dead_call;
case RXRPC_CALL_COMPLETE:
case RXRPC_CALL_CLIENT_FINAL_ACK:
/* complete server call */
if (call->conn->in_clientflag)
goto dead_call;
/* resend last packet of a completed call */
_debug("final ack again");
rxrpc_get_call(call);
set_bit(RXRPC_CALL_ACK_FINAL, &call->events);
rxrpc_queue_call(call);
goto free_unlock;
default:
break;
@@ -560,7 +562,6 @@ static void rxrpc_post_packet_to_call(struct rxrpc_connection *conn,
read_unlock(&call->state_lock);
rxrpc_get_call(call);
read_unlock_bh(&conn->lock);
if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
sp->hdr.flags & RXRPC_JUMBO_PACKET)
@@ -571,80 +572,16 @@ static void rxrpc_post_packet_to_call(struct rxrpc_connection *conn,
rxrpc_put_call(call);
goto done;
call_not_extant:
/* search the completed calls in case what we're dealing with is
* there */
_debug("call not extant");
call_id = sp->hdr.callNumber;
p = conn->calls.rb_node;
while (p) {
call = rb_entry(p, struct rxrpc_call, conn_node);
if (call_id < call->call_id)
p = p->rb_left;
else if (call_id > call->call_id)
p = p->rb_right;
else
goto found_completed_call;
}
dead_call:
/* it's a either a really old call that we no longer remember or its a
* new incoming call */
read_unlock_bh(&conn->lock);
if (sp->hdr.flags & RXRPC_CLIENT_INITIATED &&
sp->hdr.seq == cpu_to_be32(1)) {
_debug("incoming call");
skb_queue_tail(&conn->trans->local->accept_queue, skb);
rxrpc_queue_work(&conn->trans->local->acceptor);
goto done;
}
_debug("dead call");
if (sp->hdr.type != RXRPC_PACKET_TYPE_ABORT) {
skb->priority = RX_CALL_DEAD;
rxrpc_reject_packet(conn->trans->local, skb);
rxrpc_reject_packet(call->conn->trans->local, skb);
goto unlock;
}
goto done;
/* resend last packet of a completed call
* - client calls may have been aborted or ACK'd
* - server calls may have been aborted
*/
found_completed_call:
_debug("completed call");
if (atomic_read(&call->usage) == 0)
goto dead_call;
/* synchronise any state changes */
read_lock(&call->state_lock);
ASSERTIFCMP(call->state != RXRPC_CALL_CLIENT_FINAL_ACK,
call->state, >=, RXRPC_CALL_COMPLETE);
if (call->state == RXRPC_CALL_LOCALLY_ABORTED ||
call->state == RXRPC_CALL_REMOTELY_ABORTED ||
call->state == RXRPC_CALL_DEAD) {
read_unlock(&call->state_lock);
goto dead_call;
}
if (call->conn->in_clientflag) {
read_unlock(&call->state_lock);
goto dead_call; /* complete server call */
}
_debug("final ack again");
rxrpc_get_call(call);
set_bit(RXRPC_CALL_ACK_FINAL, &call->events);
rxrpc_queue_call(call);
free_unlock:
read_unlock(&call->state_lock);
read_unlock_bh(&conn->lock);
rxrpc_free_skb(skb);
unlock:
read_unlock(&call->state_lock);
done:
_leave("");
}
@@ -663,17 +600,42 @@ static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
rxrpc_queue_conn(conn);
}
static struct rxrpc_connection *rxrpc_conn_from_local(struct rxrpc_local *local,
struct sk_buff *skb,
struct rxrpc_skb_priv *sp)
{
struct rxrpc_peer *peer;
struct rxrpc_transport *trans;
struct rxrpc_connection *conn;
peer = rxrpc_find_peer(local, ip_hdr(skb)->saddr,
udp_hdr(skb)->source);
if (IS_ERR(peer))
goto cant_find_conn;
trans = rxrpc_find_transport(local, peer);
rxrpc_put_peer(peer);
if (!trans)
goto cant_find_conn;
conn = rxrpc_find_connection(trans, &sp->hdr);
rxrpc_put_transport(trans);
if (!conn)
goto cant_find_conn;
return conn;
cant_find_conn:
return NULL;
}
/*
* handle data received on the local endpoint
* - may be called in interrupt context
*/
void rxrpc_data_ready(struct sock *sk, int count)
{
struct rxrpc_connection *conn;
struct rxrpc_transport *trans;
struct rxrpc_skb_priv *sp;
struct rxrpc_local *local;
struct rxrpc_peer *peer;
struct sk_buff *skb;
int ret;
@@ -748,27 +710,34 @@ void rxrpc_data_ready(struct sock *sk, int count)
(sp->hdr.callNumber == 0 || sp->hdr.seq == 0))
goto bad_message;
peer = rxrpc_find_peer(local, ip_hdr(skb)->saddr, udp_hdr(skb)->source);
if (IS_ERR(peer))
goto cant_route_call;
if (sp->hdr.callNumber == 0) {
/* This is a connection-level packet. These should be
* fairly rare, so the extra overhead of looking them up the
* old-fashioned way doesn't really hurt */
struct rxrpc_connection *conn;
trans = rxrpc_find_transport(local, peer);
rxrpc_put_peer(peer);
if (!trans)
goto cant_route_call;
conn = rxrpc_conn_from_local(local, skb, sp);
if (!conn)
goto cant_route_call;
conn = rxrpc_find_connection(trans, &sp->hdr);
rxrpc_put_transport(trans);
if (!conn)
goto cant_route_call;
_debug("CONN %p {%d}", conn, conn->debug_id);
if (sp->hdr.callNumber == 0)
_debug("CONN %p {%d}", conn, conn->debug_id);
rxrpc_post_packet_to_conn(conn, skb);
else
rxrpc_post_packet_to_call(conn, skb);
rxrpc_put_connection(conn);
rxrpc_put_connection(conn);
} else {
struct rxrpc_call *call;
u8 in_clientflag = 0;
if (sp->hdr.flags & RXRPC_CLIENT_INITIATED)
in_clientflag = RXRPC_CLIENT_INITIATED;
call = rxrpc_find_call_hash(in_clientflag, sp->hdr.cid,
sp->hdr.callNumber, sp->hdr.epoch,
sp->hdr.serviceId, local, AF_INET,
(u8 *)&ip_hdr(skb)->saddr);
if (call)
rxrpc_post_packet_to_call(call, skb);
else
goto cant_route_call;
}
rxrpc_put_local(local);
return;