dlm: fix race between remove and lookup

It was possible for a remove message on an old
rsb to be sent after a lookup message on a new
rsb, where the rsbs were for the same resource
name.  This could lead to a missing directory
entry for the new rsb.

It is fixed by keeping a copy of the resource
name being removed until after the remove has
been sent.  A lookup checks if this in-progress
remove matches the name it is looking up.

Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
David Teigland
2012-06-14 12:17:32 -05:00
parent 1d7c484eeb
commit 05c32f47bf
3 changed files with 178 additions and 41 deletions

View File

@@ -1624,65 +1624,170 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
return error;
}
/* FIXME: make this more efficient */
/* If there's an rsb for the same resource being removed, ensure
that the remove message is sent before the new lookup message.
It should be rare to need a delay here, but if delays turn out to be
common, it may be worthwhile to add a proper wait mechanism instead. */
static int shrink_bucket(struct dlm_ls *ls, int b)
/*
 * Poll until no in-progress remove matches r's resource name.
 *
 * Each pass takes ls_remove_spin and, if ls_remove_len is non-zero,
 * compares r against ls_remove_name/ls_remove_len with rsb_cmp().
 * On a match (rsb_cmp() returns 0) the lock is dropped, msleep(1) is
 * called and the check starts over.  Otherwise the lock is dropped and
 * the function returns.  There is no limit on the number of retries.
 */
static void wait_pending_remove(struct dlm_rsb *r)
{
/* NOTE(review): n is not referenced anywhere in this function. */
struct rb_node *n;
struct dlm_ls *ls = r->res_ls;
restart:
spin_lock(&ls->ls_remove_spin);
/* A zero ls_remove_len short-circuits the name comparison. */
if (ls->ls_remove_len &&
!rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
log_debug(ls, "delay lookup for remove dir %d %s",
r->res_dir_nodeid, r->res_name);
/* Release the spinlock before sleeping, then re-check from the top. */
spin_unlock(&ls->ls_remove_spin);
msleep(1);
goto restart;
}
spin_unlock(&ls->ls_remove_spin);
}
/*
* ls_remove_spin protects ls_remove_name and ls_remove_len which are
* read by other threads in wait_pending_remove. ls_remove_names
* and ls_remove_lens are only used by the scan thread, so they do
* not need protection.
*/
static void shrink_bucket(struct dlm_ls *ls, int b)
{
struct rb_node *n, *next;
struct dlm_rsb *r;
char *name;
int our_nodeid = dlm_our_nodeid();
int count = 0, found;
int remote_count = 0;
int i, len, rv;
for (;;) {
found = 0;
spin_lock(&ls->ls_rsbtbl[b].lock);
for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
/* If we're the directory record for this rsb, and
we're not the master of it, then we need to wait
for the master node to send us a dir remove
before removing the dir record. */
spin_lock(&ls->ls_rsbtbl[b].lock);
for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (!dlm_no_directory(ls) && !is_master(r) &&
(dlm_dir_nodeid(r) == our_nodeid)) {
continue;
}
/* If we're the directory record for this rsb, and
we're not the master of it, then we need to wait
for the master node to send us a dir remove
before removing the dir record. */
if (!time_after_eq(jiffies, r->res_toss_time +
dlm_config.ci_toss_secs * HZ))
continue;
found = 1;
break;
if (!dlm_no_directory(ls) &&
(r->res_master_nodeid != our_nodeid) &&
(dlm_dir_nodeid(r) == our_nodeid)) {
continue;
}
if (!found) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
break;
if (!time_after_eq(jiffies, r->res_toss_time +
dlm_config.ci_toss_secs * HZ)) {
continue;
}
if (kref_put(&r->res_ref, kill_rsb)) {
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
spin_unlock(&ls->ls_rsbtbl[b].lock);
if (!dlm_no_directory(ls) &&
(r->res_master_nodeid == our_nodeid) &&
(dlm_dir_nodeid(r) != our_nodeid)) {
/* We're the master of this rsb but we're not
the directory record, so we need to tell the
dir node to remove the dir record. */
if (!dlm_no_directory(ls) && is_master(r) &&
(dlm_dir_nodeid(r) != our_nodeid)) {
send_remove(r);
}
ls->ls_remove_lens[remote_count] = r->res_length;
memcpy(ls->ls_remove_names[remote_count], r->res_name,
DLM_RESNAME_MAXLEN);
remote_count++;
dlm_free_rsb(r);
count++;
} else {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "tossed rsb in use %s", r->res_name);
if (remote_count >= DLM_REMOVE_NAMES_MAX)
break;
continue;
}
}
return count;
if (!kref_put(&r->res_ref, kill_rsb)) {
log_error(ls, "tossed rsb in use %s", r->res_name);
continue;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
dlm_free_rsb(r);
}
spin_unlock(&ls->ls_rsbtbl[b].lock);
/*
* While searching for rsb's to free, we found some that require
* remote removal. We leave them in place and find them again here
* so there is a very small gap between removing them from the toss
* list and sending the removal. Keeping this gap small is
* important to keep us (the master node) from being out of sync
* with the remote dir node for very long.
*
* From the time the rsb is removed from toss until just after
* send_remove, the rsb name is saved in ls_remove_name. A new
* lookup checks this to ensure that a new lookup message for the
* same resource name is not sent just before the remove message.
*/
for (i = 0; i < remote_count; i++) {
name = ls->ls_remove_names[i];
len = ls->ls_remove_lens[i];
spin_lock(&ls->ls_rsbtbl[b].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (rv) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_debug(ls, "remove_name not toss %s", name);
continue;
}
if (r->res_master_nodeid != our_nodeid) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_debug(ls, "remove_name master %d dir %d our %d %s",
r->res_master_nodeid, r->res_dir_nodeid,
our_nodeid, name);
continue;
}
if (r->res_dir_nodeid == our_nodeid) {
/* should never happen */
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "remove_name dir %d master %d our %d %s",
r->res_dir_nodeid, r->res_master_nodeid,
our_nodeid, name);
continue;
}
if (!time_after_eq(jiffies, r->res_toss_time +
dlm_config.ci_toss_secs * HZ)) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_debug(ls, "remove_name toss_time %lu now %lu %s",
r->res_toss_time, jiffies, name);
continue;
}
if (!kref_put(&r->res_ref, kill_rsb)) {
spin_unlock(&ls->ls_rsbtbl[b].lock);
log_error(ls, "remove_name in use %s", name);
continue;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
/* block lookup of same name until we've sent remove */
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = len;
memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
spin_unlock(&ls->ls_rsbtbl[b].lock);
send_remove(r);
/* allow lookup of name again */
spin_lock(&ls->ls_remove_spin);
ls->ls_remove_len = 0;
memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
spin_unlock(&ls->ls_remove_spin);
dlm_free_rsb(r);
}
}
void dlm_scan_rsbs(struct dlm_ls *ls)
@@ -2608,6 +2713,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
return 0;
}
wait_pending_remove(r);
r->res_first_lkid = lkb->lkb_id;
send_lookup(r, lkb);
return 1;