tipc: remove port_lock

In previous commits we have reduced usage of port_lock to a minimum,
and complemented it with usage of bh_lock_sock() at the remaining
locations. The purpose has been to remove this lock altogether, since
it largely duplicates the role of bh_lock_sock. We are now ready to do
this.

However, we still need to protect the BH callers from inadvertent
release of the socket while they hold a reference to it. We do this by
replacing port_lock by a combination of a rw-lock protecting the
reference table as such, and updating the socket reference counter while
the socket is referenced from BH. This technique is more standard and
comprehensible than the previous approach, and turns out to have a
positive effect on overall performance.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
Jon Paul Maloy
2014-08-22 18:09:16 -04:00
committed by David S. Miller
parent 9b50fd087a
commit 6c9808ce09
4 changed files with 114 additions and 155 deletions

View File

@@ -35,6 +35,7 @@
*/
#include "core.h"
#include "ref.h"
#include "port.h"
#include "name_table.h"
#include "node.h"
@@ -111,13 +112,6 @@ static struct proto tipc_proto_kern;
#include "socket.h"
/* tipc_sk_lock_next: find & lock next socket in registry from given port number
*/
static struct tipc_sock *tipc_sk_lock_next(u32 *ref)
{
return (struct tipc_sock *)tipc_ref_lock_next(ref);
}
/**
* advance_rx_queue - discard first buffer in socket receive queue
*
@@ -200,7 +194,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk = tipc_sk(sk);
port = &tsk->port;
ref = tipc_ref_acquire(tsk, &port->lock);
ref = tipc_ref_acquire(tsk);
if (!ref) {
pr_warn("Socket create failed; reference table exhausted\n");
return -ENOMEM;
@@ -226,7 +220,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0);
tipc_port_unlock(port);
if (sock->state == SS_READY) {
tipc_port_set_unreturnable(port, true);
@@ -364,9 +357,7 @@ static int tipc_release(struct socket *sock)
}
tipc_withdraw(port, 0, NULL);
spin_lock_bh(port->lock);
tipc_ref_discard(port->ref);
spin_unlock_bh(port->lock);
k_cancel_timer(&port->timer);
if (port->connected) {
buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
@@ -1651,7 +1642,7 @@ int tipc_sk_rcv(struct sk_buff *buf)
u32 dnode;
/* Validate destination and message */
tsk = tipc_port_lock(dport);
tsk = tipc_sk_get(dport);
if (unlikely(!tsk)) {
rc = tipc_msg_eval(buf, &dnode);
goto exit;
@@ -1672,8 +1663,7 @@ int tipc_sk_rcv(struct sk_buff *buf)
rc = -TIPC_ERR_OVERLOAD;
}
bh_unlock_sock(sk);
tipc_port_unlock(port);
tipc_sk_put(tsk);
if (likely(!rc))
return 0;
exit:
@@ -1997,23 +1987,23 @@ restart:
static void tipc_sk_timeout(unsigned long ref)
{
struct tipc_sock *tsk = tipc_port_lock(ref);
struct tipc_sock *tsk;
struct tipc_port *port;
struct sock *sk;
struct sk_buff *buf = NULL;
struct tipc_msg *msg = NULL;
u32 peer_port, peer_node;
tsk = tipc_sk_get(ref);
if (!tsk)
return;
port = &tsk->port;
if (!port->connected) {
tipc_port_unlock(port);
return;
}
goto exit;
sk = &tsk->sk;
port = &tsk->port;
bh_lock_sock(sk);
if (!port->connected) {
bh_unlock_sock(sk);
goto exit;
}
peer_port = tipc_port_peerport(port);
peer_node = tipc_port_peernode(port);
@@ -2031,12 +2021,10 @@ static void tipc_sk_timeout(unsigned long ref)
k_start_timer(&port->timer, port->probing_interval);
}
bh_unlock_sock(sk);
tipc_port_unlock(port);
if (!buf)
return;
msg = buf_msg(buf);
tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
if (buf)
tipc_link_xmit(buf, peer_node, ref);
exit:
tipc_sk_put(tsk);
}
static int tipc_sk_show(struct tipc_port *port, char *buf,
@@ -2100,13 +2088,13 @@ struct sk_buff *tipc_sk_socks_show(void)
pb = TLV_DATA(rep_tlv);
pb_len = ULTRA_STRING_MAX_LEN;
tsk = tipc_sk_lock_next(&ref);
for (; tsk; tsk = tipc_sk_lock_next(&ref)) {
bh_lock_sock(&tsk->sk);
tsk = tipc_sk_get_next(&ref);
for (; tsk; tsk = tipc_sk_get_next(&ref)) {
lock_sock(&tsk->sk);
str_len += tipc_sk_show(&tsk->port, pb + str_len,
pb_len - str_len, 0);
bh_unlock_sock(&tsk->sk);
tipc_port_unlock(&tsk->port);
release_sock(&tsk->sk);
tipc_sk_put(tsk);
}
str_len += 1; /* for "\0" */
skb_put(buf, TLV_SPACE(str_len));
@@ -2122,15 +2110,15 @@ void tipc_sk_reinit(void)
{
struct tipc_msg *msg;
u32 ref = 0;
struct tipc_sock *tsk = tipc_sk_lock_next(&ref);
struct tipc_sock *tsk = tipc_sk_get_next(&ref);
for (; tsk; tsk = tipc_sk_lock_next(&ref)) {
bh_lock_sock(&tsk->sk);
for (; tsk; tsk = tipc_sk_get_next(&ref)) {
lock_sock(&tsk->sk);
msg = &tsk->port.phdr;
msg_set_prevnode(msg, tipc_own_addr);
msg_set_orignode(msg, tipc_own_addr);
bh_unlock_sock(&tsk->sk);
tipc_port_unlock(&tsk->port);
release_sock(&tsk->sk);
tipc_sk_put(tsk);
}
}