rcom.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678
  1. // SPDX-License-Identifier: GPL-2.0-only
  2. /******************************************************************************
  3. *******************************************************************************
  4. **
  5. ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
  6. ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
  7. **
  8. **
  9. *******************************************************************************
  10. ******************************************************************************/
  11. #include "dlm_internal.h"
  12. #include "lockspace.h"
  13. #include "member.h"
  14. #include "lowcomms.h"
  15. #include "midcomms.h"
  16. #include "rcom.h"
  17. #include "recover.h"
  18. #include "dir.h"
  19. #include "config.h"
  20. #include "memory.h"
  21. #include "lock.h"
  22. #include "util.h"
  23. static int rcom_response(struct dlm_ls *ls)
  24. {
  25. return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
  26. }
  27. static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  28. struct dlm_rcom **rc_ret, char *mb, int mb_len)
  29. {
  30. struct dlm_rcom *rc;
  31. rc = (struct dlm_rcom *) mb;
  32. rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  33. rc->rc_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
  34. rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
  35. rc->rc_header.h_length = cpu_to_le16(mb_len);
  36. rc->rc_header.h_cmd = DLM_RCOM;
  37. rc->rc_type = cpu_to_le32(type);
  38. spin_lock(&ls->ls_recover_lock);
  39. rc->rc_seq = cpu_to_le64(ls->ls_recover_seq);
  40. spin_unlock(&ls->ls_recover_lock);
  41. *rc_ret = rc;
  42. }
  43. static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  44. struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
  45. {
  46. int mb_len = sizeof(struct dlm_rcom) + len;
  47. struct dlm_mhandle *mh;
  48. char *mb;
  49. mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
  50. if (!mh) {
  51. log_print("%s to %d type %d len %d ENOBUFS",
  52. __func__, to_nodeid, type, len);
  53. return -ENOBUFS;
  54. }
  55. _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
  56. *mh_ret = mh;
  57. return 0;
  58. }
  59. static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
  60. int len, struct dlm_rcom **rc_ret,
  61. struct dlm_msg **msg_ret)
  62. {
  63. int mb_len = sizeof(struct dlm_rcom) + len;
  64. struct dlm_msg *msg;
  65. char *mb;
  66. msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
  67. NULL, NULL);
  68. if (!msg) {
  69. log_print("create_rcom to %d type %d len %d ENOBUFS",
  70. to_nodeid, type, len);
  71. return -ENOBUFS;
  72. }
  73. _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
  74. *msg_ret = msg;
  75. return 0;
  76. }
  77. static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
  78. {
  79. dlm_midcomms_commit_mhandle(mh, NULL, 0);
  80. }
  81. static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
  82. {
  83. dlm_lowcomms_commit_msg(msg);
  84. dlm_lowcomms_put_msg(msg);
  85. }
  86. static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
  87. uint32_t flags)
  88. {
  89. rs->rs_flags = cpu_to_le32(flags);
  90. }
  91. /* When replying to a status request, a node also sends back its
  92. configuration values. The requesting node then checks that the remote
  93. node is configured the same way as itself. */
  94. static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
  95. uint32_t num_slots)
  96. {
  97. rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
  98. rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
  99. rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
  100. rf->rf_num_slots = cpu_to_le16(num_slots);
  101. rf->rf_generation = cpu_to_le32(ls->ls_generation);
  102. }
  103. static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
  104. {
  105. struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
  106. if ((le32_to_cpu(rc->rc_header.h_version) & 0xFFFF0000) != DLM_HEADER_MAJOR) {
  107. log_error(ls, "version mismatch: %x nodeid %d: %x",
  108. DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
  109. le32_to_cpu(rc->rc_header.h_version));
  110. return -EPROTO;
  111. }
  112. if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
  113. le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
  114. log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
  115. ls->ls_lvblen, ls->ls_exflags, nodeid,
  116. le32_to_cpu(rf->rf_lvblen),
  117. le32_to_cpu(rf->rf_lsflags));
  118. return -EPROTO;
  119. }
  120. return 0;
  121. }
  122. static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
  123. {
  124. spin_lock(&ls->ls_rcom_spin);
  125. *new_seq = cpu_to_le64(++ls->ls_rcom_seq);
  126. set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
  127. spin_unlock(&ls->ls_rcom_spin);
  128. }
  129. static void disallow_sync_reply(struct dlm_ls *ls)
  130. {
  131. spin_lock(&ls->ls_rcom_spin);
  132. clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
  133. clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
  134. spin_unlock(&ls->ls_rcom_spin);
  135. }
  136. /*
  137. * low nodeid gathers one slot value at a time from each node.
  138. * it sets need_slots=0, and saves rf_our_slot returned from each
  139. * rcom_config.
  140. *
  141. * other nodes gather all slot values at once from the low nodeid.
  142. * they set need_slots=1, and ignore the rf_our_slot returned from each
  143. * rcom_config. they use the rf_num_slots returned from the low
  144. * node's rcom_config.
  145. */
  146. int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
  147. {
  148. struct dlm_rcom *rc;
  149. struct dlm_msg *msg;
  150. int error = 0;
  151. ls->ls_recover_nodeid = nodeid;
  152. if (nodeid == dlm_our_nodeid()) {
  153. rc = ls->ls_recover_buf;
  154. rc->rc_result = cpu_to_le32(dlm_recover_status(ls));
  155. goto out;
  156. }
  157. retry:
  158. error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
  159. sizeof(struct rcom_status), &rc, &msg);
  160. if (error)
  161. goto out;
  162. set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
  163. allow_sync_reply(ls, &rc->rc_id);
  164. memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
  165. send_rcom_stateless(msg, rc);
  166. error = dlm_wait_function(ls, &rcom_response);
  167. disallow_sync_reply(ls);
  168. if (error == -ETIMEDOUT)
  169. goto retry;
  170. if (error)
  171. goto out;
  172. rc = ls->ls_recover_buf;
  173. if (rc->rc_result == cpu_to_le32(-ESRCH)) {
  174. /* we pretend the remote lockspace exists with 0 status */
  175. log_debug(ls, "remote node %d not ready", nodeid);
  176. rc->rc_result = 0;
  177. error = 0;
  178. } else {
  179. error = check_rcom_config(ls, rc, nodeid);
  180. }
  181. /* the caller looks at rc_result for the remote recovery status */
  182. out:
  183. return error;
  184. }
  185. static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
  186. {
  187. struct dlm_rcom *rc;
  188. struct rcom_status *rs;
  189. uint32_t status;
  190. int nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
  191. int len = sizeof(struct rcom_config);
  192. struct dlm_msg *msg;
  193. int num_slots = 0;
  194. int error;
  195. if (!dlm_slots_version(&rc_in->rc_header)) {
  196. status = dlm_recover_status(ls);
  197. goto do_create;
  198. }
  199. rs = (struct rcom_status *)rc_in->rc_buf;
  200. if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
  201. status = dlm_recover_status(ls);
  202. goto do_create;
  203. }
  204. spin_lock(&ls->ls_recover_lock);
  205. status = ls->ls_recover_status;
  206. num_slots = ls->ls_num_slots;
  207. spin_unlock(&ls->ls_recover_lock);
  208. len += num_slots * sizeof(struct rcom_slot);
  209. do_create:
  210. error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
  211. len, &rc, &msg);
  212. if (error)
  213. return;
  214. rc->rc_id = rc_in->rc_id;
  215. rc->rc_seq_reply = rc_in->rc_seq;
  216. rc->rc_result = cpu_to_le32(status);
  217. set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
  218. if (!num_slots)
  219. goto do_send;
  220. spin_lock(&ls->ls_recover_lock);
  221. if (ls->ls_num_slots != num_slots) {
  222. spin_unlock(&ls->ls_recover_lock);
  223. log_debug(ls, "receive_rcom_status num_slots %d to %d",
  224. num_slots, ls->ls_num_slots);
  225. rc->rc_result = 0;
  226. set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
  227. goto do_send;
  228. }
  229. dlm_slots_copy_out(ls, rc);
  230. spin_unlock(&ls->ls_recover_lock);
  231. do_send:
  232. send_rcom_stateless(msg, rc);
  233. }
  234. static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
  235. {
  236. spin_lock(&ls->ls_rcom_spin);
  237. if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
  238. le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
  239. log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
  240. le32_to_cpu(rc_in->rc_type),
  241. le32_to_cpu(rc_in->rc_header.h_nodeid),
  242. (unsigned long long)le64_to_cpu(rc_in->rc_id),
  243. (unsigned long long)ls->ls_rcom_seq);
  244. goto out;
  245. }
  246. memcpy(ls->ls_recover_buf, rc_in,
  247. le16_to_cpu(rc_in->rc_header.h_length));
  248. set_bit(LSFL_RCOM_READY, &ls->ls_flags);
  249. clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
  250. wake_up(&ls->ls_wait_general);
  251. out:
  252. spin_unlock(&ls->ls_rcom_spin);
  253. }
  254. int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
  255. {
  256. struct dlm_rcom *rc;
  257. struct dlm_msg *msg;
  258. int error = 0;
  259. ls->ls_recover_nodeid = nodeid;
  260. retry:
  261. error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
  262. &rc, &msg);
  263. if (error)
  264. goto out;
  265. memcpy(rc->rc_buf, last_name, last_len);
  266. allow_sync_reply(ls, &rc->rc_id);
  267. memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
  268. send_rcom_stateless(msg, rc);
  269. error = dlm_wait_function(ls, &rcom_response);
  270. disallow_sync_reply(ls);
  271. if (error == -ETIMEDOUT)
  272. goto retry;
  273. out:
  274. return error;
  275. }
  276. static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
  277. {
  278. struct dlm_rcom *rc;
  279. int error, inlen, outlen, nodeid;
  280. struct dlm_msg *msg;
  281. nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
  282. inlen = le16_to_cpu(rc_in->rc_header.h_length) -
  283. sizeof(struct dlm_rcom);
  284. outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
  285. error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
  286. &rc, &msg);
  287. if (error)
  288. return;
  289. rc->rc_id = rc_in->rc_id;
  290. rc->rc_seq_reply = rc_in->rc_seq;
  291. dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
  292. nodeid);
  293. send_rcom_stateless(msg, rc);
  294. }
  295. int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
  296. {
  297. struct dlm_rcom *rc;
  298. struct dlm_mhandle *mh;
  299. struct dlm_ls *ls = r->res_ls;
  300. int error;
  301. error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
  302. &rc, &mh);
  303. if (error)
  304. goto out;
  305. memcpy(rc->rc_buf, r->res_name, r->res_length);
  306. rc->rc_id = cpu_to_le64(r->res_id);
  307. send_rcom(mh, rc);
  308. out:
  309. return error;
  310. }
  311. static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
  312. {
  313. struct dlm_rcom *rc;
  314. struct dlm_mhandle *mh;
  315. int error, ret_nodeid, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
  316. int len = le16_to_cpu(rc_in->rc_header.h_length) -
  317. sizeof(struct dlm_rcom);
  318. /* Old code would send this special id to trigger a debug dump. */
  319. if (rc_in->rc_id == cpu_to_le64(0xFFFFFFFF)) {
  320. log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
  321. dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
  322. return;
  323. }
  324. error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
  325. if (error)
  326. return;
  327. error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
  328. DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
  329. if (error)
  330. ret_nodeid = error;
  331. rc->rc_result = cpu_to_le32(ret_nodeid);
  332. rc->rc_id = rc_in->rc_id;
  333. rc->rc_seq_reply = rc_in->rc_seq;
  334. send_rcom(mh, rc);
  335. }
  336. static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
  337. {
  338. dlm_recover_master_reply(ls, rc_in);
  339. }
  340. static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
  341. struct rcom_lock *rl)
  342. {
  343. memset(rl, 0, sizeof(*rl));
  344. rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
  345. rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
  346. rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
  347. rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
  348. rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
  349. rl->rl_rqmode = lkb->lkb_rqmode;
  350. rl->rl_grmode = lkb->lkb_grmode;
  351. rl->rl_status = lkb->lkb_status;
  352. rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
  353. if (lkb->lkb_bastfn)
  354. rl->rl_asts |= DLM_CB_BAST;
  355. if (lkb->lkb_astfn)
  356. rl->rl_asts |= DLM_CB_CAST;
  357. rl->rl_namelen = cpu_to_le16(r->res_length);
  358. memcpy(rl->rl_name, r->res_name, r->res_length);
  359. /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
  360. If so, receive_rcom_lock_args() won't take this copy. */
  361. if (lkb->lkb_lvbptr)
  362. memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
  363. }
  364. int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
  365. {
  366. struct dlm_ls *ls = r->res_ls;
  367. struct dlm_rcom *rc;
  368. struct dlm_mhandle *mh;
  369. struct rcom_lock *rl;
  370. int error, len = sizeof(struct rcom_lock);
  371. if (lkb->lkb_lvbptr)
  372. len += ls->ls_lvblen;
  373. error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
  374. if (error)
  375. goto out;
  376. rl = (struct rcom_lock *) rc->rc_buf;
  377. pack_rcom_lock(r, lkb, rl);
  378. rc->rc_id = cpu_to_le64((uintptr_t)r);
  379. send_rcom(mh, rc);
  380. out:
  381. return error;
  382. }
  383. /* needs at least dlm_rcom + rcom_lock */
  384. static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
  385. {
  386. struct dlm_rcom *rc;
  387. struct dlm_mhandle *mh;
  388. int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
  389. dlm_recover_master_copy(ls, rc_in);
  390. error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
  391. sizeof(struct rcom_lock), &rc, &mh);
  392. if (error)
  393. return;
  394. /* We send back the same rcom_lock struct we received, but
  395. dlm_recover_master_copy() has filled in rl_remid and rl_result */
  396. memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
  397. rc->rc_id = rc_in->rc_id;
  398. rc->rc_seq_reply = rc_in->rc_seq;
  399. send_rcom(mh, rc);
  400. }
  401. /* If the lockspace doesn't exist then still send a status message
  402. back; it's possible that it just doesn't have its global_id yet. */
  403. int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
  404. {
  405. struct dlm_rcom *rc;
  406. struct rcom_config *rf;
  407. struct dlm_mhandle *mh;
  408. char *mb;
  409. int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
  410. mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
  411. if (!mh)
  412. return -ENOBUFS;
  413. rc = (struct dlm_rcom *) mb;
  414. rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  415. rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
  416. rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
  417. rc->rc_header.h_length = cpu_to_le16(mb_len);
  418. rc->rc_header.h_cmd = DLM_RCOM;
  419. rc->rc_type = cpu_to_le32(DLM_RCOM_STATUS_REPLY);
  420. rc->rc_id = rc_in->rc_id;
  421. rc->rc_seq_reply = rc_in->rc_seq;
  422. rc->rc_result = cpu_to_le32(-ESRCH);
  423. rf = (struct rcom_config *) rc->rc_buf;
  424. rf->rf_lvblen = cpu_to_le32(~0U);
  425. dlm_midcomms_commit_mhandle(mh, NULL, 0);
  426. return 0;
  427. }
  428. /*
  429. * Ignore messages for stage Y before we set
  430. * recover_status bit for stage X:
  431. *
  432. * recover_status = 0
  433. *
  434. * dlm_recover_members()
  435. * - send nothing
  436. * - recv nothing
  437. * - ignore NAMES, NAMES_REPLY
  438. * - ignore LOOKUP, LOOKUP_REPLY
  439. * - ignore LOCK, LOCK_REPLY
  440. *
  441. * recover_status |= NODES
  442. *
  443. * dlm_recover_members_wait()
  444. *
  445. * dlm_recover_directory()
  446. * - send NAMES
  447. * - recv NAMES_REPLY
  448. * - ignore LOOKUP, LOOKUP_REPLY
  449. * - ignore LOCK, LOCK_REPLY
  450. *
  451. * recover_status |= DIR
  452. *
  453. * dlm_recover_directory_wait()
  454. *
  455. * dlm_recover_masters()
  456. * - send LOOKUP
  457. * - recv LOOKUP_REPLY
  458. *
  459. * dlm_recover_locks()
  460. * - send LOCKS
  461. * - recv LOCKS_REPLY
  462. *
  463. * recover_status |= LOCKS
  464. *
  465. * dlm_recover_locks_wait()
  466. *
  467. * recover_status |= DONE
  468. */
  469. /* Called by dlm_recv; corresponds to dlm_receive_message() but special
  470. recovery-only comms are sent through here. */
  471. void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
  472. {
  473. int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
  474. int stop, reply = 0, names = 0, lookup = 0, lock = 0;
  475. uint32_t status;
  476. uint64_t seq;
  477. switch (rc->rc_type) {
  478. case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
  479. reply = 1;
  480. break;
  481. case cpu_to_le32(DLM_RCOM_NAMES):
  482. names = 1;
  483. break;
  484. case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
  485. names = 1;
  486. reply = 1;
  487. break;
  488. case cpu_to_le32(DLM_RCOM_LOOKUP):
  489. lookup = 1;
  490. break;
  491. case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
  492. lookup = 1;
  493. reply = 1;
  494. break;
  495. case cpu_to_le32(DLM_RCOM_LOCK):
  496. lock = 1;
  497. break;
  498. case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
  499. lock = 1;
  500. reply = 1;
  501. break;
  502. }
  503. spin_lock(&ls->ls_recover_lock);
  504. status = ls->ls_recover_status;
  505. stop = dlm_recovery_stopped(ls);
  506. seq = ls->ls_recover_seq;
  507. spin_unlock(&ls->ls_recover_lock);
  508. if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
  509. goto ignore;
  510. if (reply && (le64_to_cpu(rc->rc_seq_reply) != seq))
  511. goto ignore;
  512. if (!(status & DLM_RS_NODES) && (names || lookup || lock))
  513. goto ignore;
  514. if (!(status & DLM_RS_DIR) && (lookup || lock))
  515. goto ignore;
  516. switch (rc->rc_type) {
  517. case cpu_to_le32(DLM_RCOM_STATUS):
  518. receive_rcom_status(ls, rc);
  519. break;
  520. case cpu_to_le32(DLM_RCOM_NAMES):
  521. receive_rcom_names(ls, rc);
  522. break;
  523. case cpu_to_le32(DLM_RCOM_LOOKUP):
  524. receive_rcom_lookup(ls, rc);
  525. break;
  526. case cpu_to_le32(DLM_RCOM_LOCK):
  527. if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
  528. goto Eshort;
  529. receive_rcom_lock(ls, rc);
  530. break;
  531. case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
  532. receive_sync_reply(ls, rc);
  533. break;
  534. case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
  535. receive_sync_reply(ls, rc);
  536. break;
  537. case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
  538. receive_rcom_lookup_reply(ls, rc);
  539. break;
  540. case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
  541. if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
  542. goto Eshort;
  543. dlm_recover_process_copy(ls, rc);
  544. break;
  545. default:
  546. log_error(ls, "receive_rcom bad type %d",
  547. le32_to_cpu(rc->rc_type));
  548. }
  549. return;
  550. ignore:
  551. log_limit(ls, "dlm_receive_rcom ignore msg %d "
  552. "from %d %llu %llu recover seq %llu sts %x gen %u",
  553. le32_to_cpu(rc->rc_type),
  554. nodeid,
  555. (unsigned long long)le64_to_cpu(rc->rc_seq),
  556. (unsigned long long)le64_to_cpu(rc->rc_seq_reply),
  557. (unsigned long long)seq,
  558. status, ls->ls_generation);
  559. return;
  560. Eshort:
  561. log_error(ls, "recovery message %d from %d is too short",
  562. le32_to_cpu(rc->rc_type), nodeid);
  563. }