dlm: delayed reply message warning

Add an option (disabled by default) to print a warning message
when a lock has been waiting a configurable amount of time for
a reply message from another node.  This is mainly for debugging.

Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
David Teigland
2011-03-28 14:17:26 -05:00
parent 4bcad6c1ef
commit c6ff669bac
6 changed files with 108 additions and 11 deletions

View File

@@ -799,10 +799,84 @@ static int msg_reply_type(int mstype)
return -1;
}
static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
int i;
for (i = 0; i < num_nodes; i++) {
if (!warned[i]) {
warned[i] = nodeid;
return 0;
}
if (warned[i] == nodeid)
return 1;
}
return 0;
}
void dlm_scan_waiters(struct dlm_ls *ls)
{
struct dlm_lkb *lkb;
ktime_t zero = ktime_set(0, 0);
s64 us;
s64 debug_maxus = 0;
u32 debug_scanned = 0;
u32 debug_expired = 0;
int num_nodes = 0;
int *warned = NULL;
if (!dlm_config.ci_waitwarn_us)
return;
mutex_lock(&ls->ls_waiters_mutex);
list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
if (ktime_equal(lkb->lkb_wait_time, zero))
continue;
debug_scanned++;
us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
if (us < dlm_config.ci_waitwarn_us)
continue;
lkb->lkb_wait_time = zero;
debug_expired++;
if (us > debug_maxus)
debug_maxus = us;
if (!num_nodes) {
num_nodes = ls->ls_num_nodes;
warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int));
if (warned)
memset(warned, 0, num_nodes * sizeof(int));
}
if (!warned)
continue;
if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
continue;
log_error(ls, "waitwarn %x %lld %d us check connection to "
"node %d", lkb->lkb_id, (long long)us,
dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
}
mutex_unlock(&ls->ls_waiters_mutex);
if (warned)
kfree(warned);
if (debug_expired)
log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
debug_scanned, debug_expired,
dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}
/* add/remove lkb from global waiters list of lkb's waiting for
a reply from a remote node */
static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error = 0;
@@ -842,6 +916,8 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
lkb->lkb_wait_count++;
lkb->lkb_wait_type = mstype;
lkb->lkb_wait_time = ktime_get();
lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
hold_lkb(lkb);
list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
out:
@@ -1157,6 +1233,16 @@ void dlm_adjust_timeouts(struct dlm_ls *ls)
list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
mutex_unlock(&ls->ls_timeout_mutex);
if (!dlm_config.ci_waitwarn_us)
return;
mutex_lock(&ls->ls_waiters_mutex);
list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
if (ktime_to_us(lkb->lkb_wait_time))
lkb->lkb_wait_time = ktime_get();
}
mutex_unlock(&ls->ls_waiters_mutex);
}
/* lkb is master or local copy */
@@ -2844,12 +2930,12 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
struct dlm_mhandle *mh;
int to_nodeid, error;
error = add_to_waiters(lkb, mstype);
to_nodeid = r->res_nodeid;
error = add_to_waiters(lkb, mstype, to_nodeid);
if (error)
return error;
to_nodeid = r->res_nodeid;
error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
if (error)
goto fail;
@@ -2951,12 +3037,12 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
struct dlm_mhandle *mh;
int to_nodeid, error;
error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
to_nodeid = dlm_dir_nodeid(r);
error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
if (error)
return error;
to_nodeid = dlm_dir_nodeid(r);
error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
if (error)
goto fail;