md-cluster.c

  1. // SPDX-License-Identifier: GPL-2.0-or-later
  2. /*
  3. * Copyright (C) 2015, SUSE
  4. */
  5. #include <linux/module.h>
  6. #include <linux/kthread.h>
  7. #include <linux/dlm.h>
  8. #include <linux/sched.h>
  9. #include <linux/raid/md_p.h>
  10. #include "md.h"
  11. #include "md-bitmap.h"
  12. #include "md-cluster.h"
  13. #define LVB_SIZE 64
  14. #define NEW_DEV_TIMEOUT 5000
  15. struct dlm_lock_resource {
  16. dlm_lockspace_t *ls;
  17. struct dlm_lksb lksb;
  18. char *name; /* lock name. */
  19. uint32_t flags; /* flags to pass to dlm_lock() */
  20. wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
  21. bool sync_locking_done;
  22. void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
  23. struct mddev *mddev; /* pointing back to mddev. */
  24. int mode;
  25. };
  26. struct resync_info {
  27. __le64 lo;
  28. __le64 hi;
  29. };
  30. /* md_cluster_info flags */
  31. #define MD_CLUSTER_WAITING_FOR_NEWDISK 1
  32. #define MD_CLUSTER_SUSPEND_READ_BALANCING 2
  33. #define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3
  34. /* Lock the send communication. This is done through
  35. * bit manipulation as opposed to a mutex in order to
  36. * accommodate lock and hold. See next comment.
  37. */
  38. #define MD_CLUSTER_SEND_LOCK 4
  39. /* Set when a cluster operation (such as adding a disk) has locked the
  40. * communication channel in order to perform extra operations
  41. * (metadata update) while no other operation is allowed on the
  42. * MD. The token needs to be locked and held until the operation
  43. * completes with a md_update_sb(), which would eventually release
  44. * the lock.
  45. */
  46. #define MD_CLUSTER_SEND_LOCKED_ALREADY 5
  47. /* We should receive messages after the node has joined the cluster and
  48. * set up all the related info such as bitmap and personality */
  49. #define MD_CLUSTER_ALREADY_IN_CLUSTER 6
  50. #define MD_CLUSTER_PENDING_RECV_EVENT 7
  51. #define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8
  52. struct md_cluster_info {
  53. struct mddev *mddev; /* the md device which md_cluster_info belongs to */
  54. /* dlm lock space and resources for clustered raid. */
  55. dlm_lockspace_t *lockspace;
  56. int slot_number;
  57. struct completion completion;
  58. struct mutex recv_mutex;
  59. struct dlm_lock_resource *bitmap_lockres;
  60. struct dlm_lock_resource **other_bitmap_lockres;
  61. struct dlm_lock_resource *resync_lockres;
  62. struct list_head suspend_list;
  63. spinlock_t suspend_lock;
  64. /* record the region which write should be suspended */
  65. sector_t suspend_lo;
  66. sector_t suspend_hi;
  67. int suspend_from; /* the slot which broadcast suspend_lo/hi */
  68. struct md_thread *recovery_thread;
  69. unsigned long recovery_map;
  70. /* communication lock resources */
  71. struct dlm_lock_resource *ack_lockres;
  72. struct dlm_lock_resource *message_lockres;
  73. struct dlm_lock_resource *token_lockres;
  74. struct dlm_lock_resource *no_new_dev_lockres;
  75. struct md_thread *recv_thread;
  76. struct completion newdisk_completion;
  77. wait_queue_head_t wait;
  78. unsigned long state;
  79. /* record the region in RESYNCING message */
  80. sector_t sync_low;
  81. sector_t sync_hi;
  82. };
  83. enum msg_type {
  84. METADATA_UPDATED = 0,
  85. RESYNCING,
  86. NEWDISK,
  87. REMOVE,
  88. RE_ADD,
  89. BITMAP_NEEDS_SYNC,
  90. CHANGE_CAPACITY,
  91. BITMAP_RESIZE,
  92. };
  93. struct cluster_msg {
  94. __le32 type;
  95. __le32 slot;
  96. /* TODO: Unionize this for smaller footprint */
  97. __le64 low;
  98. __le64 high;
  99. char uuid[16];
  100. __le32 raid_slot;
  101. };
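/*
 * Illustrative sketch (not part of the driver): how a RESYNCING message is
 * typically filled before being pushed through the MESSAGE lock's LVB, based
 * on resync_info_update() further down. The values shown are examples only.
 *
 *	struct cluster_msg cmsg = {0};
 *
 *	cmsg.type = cpu_to_le32(RESYNCING);
 *	cmsg.slot = cpu_to_le32(slot_number);	// sender's 0-based slot
 *	cmsg.low  = cpu_to_le64(lo);		// start of the resync window
 *	cmsg.high = cpu_to_le64(hi);		// end of the resync window
 *
 * All fields are little-endian on the wire so that nodes with different host
 * endianness can share the same LVB contents.
 */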
  102. static void sync_ast(void *arg)
  103. {
  104. struct dlm_lock_resource *res;
  105. res = arg;
  106. res->sync_locking_done = true;
  107. wake_up(&res->sync_locking);
  108. }
  109. static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
  110. {
  111. int ret = 0;
  112. ret = dlm_lock(res->ls, mode, &res->lksb,
  113. res->flags, res->name, strlen(res->name),
  114. 0, sync_ast, res, res->bast);
  115. if (ret)
  116. return ret;
  117. wait_event(res->sync_locking, res->sync_locking_done);
  118. res->sync_locking_done = false;
  119. if (res->lksb.sb_status == 0)
  120. res->mode = mode;
  121. return res->lksb.sb_status;
  122. }
  123. static int dlm_unlock_sync(struct dlm_lock_resource *res)
  124. {
  125. return dlm_lock_sync(res, DLM_LOCK_NL);
  126. }
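/*
 * Note on the unlock idiom above (sketch, not authoritative): once
 * lockres_init() has taken an initial NL lock and set DLM_LKF_CONVERT, every
 * later dlm_lock_sync() call is a conversion of the same lock. "Unlocking"
 * is therefore just converting back down to NL:
 *
 *	dlm_lock_sync(res, DLM_LOCK_EX);	// grab exclusive
 *	...critical section...
 *	dlm_unlock_sync(res);			// i.e. convert to DLM_LOCK_NL
 */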
  127. /*
  128. * A variation of dlm_lock_sync in which the lock request can
  129. * be interrupted
  130. */
  131. static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
  132. struct mddev *mddev)
  133. {
  134. int ret = 0;
  135. ret = dlm_lock(res->ls, mode, &res->lksb,
  136. res->flags, res->name, strlen(res->name),
  137. 0, sync_ast, res, res->bast);
  138. if (ret)
  139. return ret;
  140. wait_event(res->sync_locking, res->sync_locking_done
  141. || kthread_should_stop()
  142. || test_bit(MD_CLOSING, &mddev->flags));
  143. if (!res->sync_locking_done) {
  144. /*
  145. * the convert queue contains the lock request when the request is
  146. * interrupted, and sync_ast could still run, so we need to
  147. * cancel the request and reset the completion
  148. */
  149. ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
  150. &res->lksb, res);
  151. res->sync_locking_done = false;
  152. if (unlikely(ret != 0))
  153. pr_info("failed to cancel previous lock request "
  154. "%s return %d\n", res->name, ret);
  155. return -EPERM;
  156. } else
  157. res->sync_locking_done = false;
  158. if (res->lksb.sb_status == 0)
  159. res->mode = mode;
  160. return res->lksb.sb_status;
  161. }
  162. static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
  163. char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
  164. {
  165. struct dlm_lock_resource *res = NULL;
  166. int ret, namelen;
  167. struct md_cluster_info *cinfo = mddev->cluster_info;
  168. res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
  169. if (!res)
  170. return NULL;
  171. init_waitqueue_head(&res->sync_locking);
  172. res->sync_locking_done = false;
  173. res->ls = cinfo->lockspace;
  174. res->mddev = mddev;
  175. res->mode = DLM_LOCK_IV;
  176. namelen = strlen(name);
  177. res->name = kzalloc(namelen + 1, GFP_KERNEL);
  178. if (!res->name) {
  179. pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
  180. goto out_err;
  181. }
  182. strscpy(res->name, name, namelen + 1);
  183. if (with_lvb) {
  184. res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
  185. if (!res->lksb.sb_lvbptr) {
  186. pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
  187. goto out_err;
  188. }
  189. res->flags = DLM_LKF_VALBLK;
  190. }
  191. if (bastfn)
  192. res->bast = bastfn;
  193. res->flags |= DLM_LKF_EXPEDITE;
  194. ret = dlm_lock_sync(res, DLM_LOCK_NL);
  195. if (ret) {
  196. pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
  197. goto out_err;
  198. }
  199. res->flags &= ~DLM_LKF_EXPEDITE;
  200. res->flags |= DLM_LKF_CONVERT;
  201. return res;
  202. out_err:
  203. kfree(res->lksb.sb_lvbptr);
  204. kfree(res->name);
  205. kfree(res);
  206. return NULL;
  207. }
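/*
 * Usage sketch (illustrative only): per-slot bitmap lock resources are
 * created with names of the form "bitmap%04d" and an LVB, e.g.:
 *
 *	char str[64];
 *
 *	snprintf(str, 64, "bitmap%04d", slot);
 *	bm_lockres = lockres_init(mddev, str, NULL, 1);
 *	if (!bm_lockres)
 *		return -ENOMEM;
 *
 * The returned resource starts out held in NL mode and is released with
 * lockres_free().
 */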
  208. static void lockres_free(struct dlm_lock_resource *res)
  209. {
  210. int ret = 0;
  211. if (!res)
  212. return;
  213. /*
  214. * use the FORCEUNLOCK flag, so we can unlock even if the lock is on the
  215. * waiting or convert queue
  216. */
  217. ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
  218. &res->lksb, res);
  219. if (unlikely(ret != 0))
  220. pr_err("failed to unlock %s return %d\n", res->name, ret);
  221. else
  222. wait_event(res->sync_locking, res->sync_locking_done);
  223. kfree(res->name);
  224. kfree(res->lksb.sb_lvbptr);
  225. kfree(res);
  226. }
  227. static void add_resync_info(struct dlm_lock_resource *lockres,
  228. sector_t lo, sector_t hi)
  229. {
  230. struct resync_info *ri;
  231. ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
  232. ri->lo = cpu_to_le64(lo);
  233. ri->hi = cpu_to_le64(hi);
  234. }
  235. static int read_resync_info(struct mddev *mddev,
  236. struct dlm_lock_resource *lockres)
  237. {
  238. struct resync_info ri;
  239. struct md_cluster_info *cinfo = mddev->cluster_info;
  240. int ret = 0;
  241. dlm_lock_sync(lockres, DLM_LOCK_CR);
  242. memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
  243. if (le64_to_cpu(ri.hi) > 0) {
  244. cinfo->suspend_hi = le64_to_cpu(ri.hi);
  245. cinfo->suspend_lo = le64_to_cpu(ri.lo);
  246. ret = 1;
  247. }
  248. dlm_unlock_sync(lockres);
  249. return ret;
  250. }
  251. static void recover_bitmaps(struct md_thread *thread)
  252. {
  253. struct mddev *mddev = thread->mddev;
  254. struct md_cluster_info *cinfo = mddev->cluster_info;
  255. struct dlm_lock_resource *bm_lockres;
  256. char str[64];
  257. int slot, ret;
  258. sector_t lo, hi;
  259. while (cinfo->recovery_map) {
  260. slot = fls64((u64)cinfo->recovery_map) - 1;
  261. snprintf(str, 64, "bitmap%04d", slot);
  262. bm_lockres = lockres_init(mddev, str, NULL, 1);
  263. if (!bm_lockres) {
  264. pr_err("md-cluster: Cannot initialize bitmaps\n");
  265. goto clear_bit;
  266. }
  267. ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
  268. if (ret) {
  269. pr_err("md-cluster: Could not DLM lock %s: %d\n",
  270. str, ret);
  271. goto clear_bit;
  272. }
  273. ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
  274. if (ret) {
  275. pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
  276. goto clear_bit;
  277. }
  278. /* Clear suspend_area associated with the bitmap */
  279. spin_lock_irq(&cinfo->suspend_lock);
  280. cinfo->suspend_hi = 0;
  281. cinfo->suspend_lo = 0;
  282. cinfo->suspend_from = -1;
  283. spin_unlock_irq(&cinfo->suspend_lock);
  284. /* Kick off a reshape if needed */
  285. if (test_bit(MD_RESYNCING_REMOTE, &mddev->recovery) &&
  286. test_bit(MD_RECOVERY_RESHAPE, &mddev->recovery) &&
  287. mddev->reshape_position != MaxSector)
  288. md_wakeup_thread(mddev->sync_thread);
  289. if (hi > 0) {
  290. if (lo < mddev->recovery_cp)
  291. mddev->recovery_cp = lo;
  292. /* wake up thread to continue resync in case resync
  293. * is not finished */
  294. if (mddev->recovery_cp != MaxSector) {
  295. /*
  296. * clear the REMOTE flag since we will launch
  297. * resync thread in current node.
  298. */
  299. clear_bit(MD_RESYNCING_REMOTE,
  300. &mddev->recovery);
  301. set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
  302. md_wakeup_thread(mddev->thread);
  303. }
  304. }
  305. clear_bit:
  306. lockres_free(bm_lockres);
  307. clear_bit(slot, &cinfo->recovery_map);
  308. }
  309. }
  310. static void recover_prep(void *arg)
  311. {
  312. struct mddev *mddev = arg;
  313. struct md_cluster_info *cinfo = mddev->cluster_info;
  314. set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
  315. }
  316. static void __recover_slot(struct mddev *mddev, int slot)
  317. {
  318. struct md_cluster_info *cinfo = mddev->cluster_info;
  319. set_bit(slot, &cinfo->recovery_map);
  320. if (!cinfo->recovery_thread) {
  321. cinfo->recovery_thread = md_register_thread(recover_bitmaps,
  322. mddev, "recover");
  323. if (!cinfo->recovery_thread) {
  324. pr_warn("md-cluster: Could not create recovery thread\n");
  325. return;
  326. }
  327. }
  328. md_wakeup_thread(cinfo->recovery_thread);
  329. }
  330. static void recover_slot(void *arg, struct dlm_slot *slot)
  331. {
  332. struct mddev *mddev = arg;
  333. struct md_cluster_info *cinfo = mddev->cluster_info;
  334. pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
  335. mddev->bitmap_info.cluster_name,
  336. slot->nodeid, slot->slot,
  337. cinfo->slot_number);
  338. /* deduct one since dlm slot numbers start from one while the slot num of
  339. * cluster-md begins with 0 */
  340. __recover_slot(mddev, slot->slot - 1);
  341. }
  342. static void recover_done(void *arg, struct dlm_slot *slots,
  343. int num_slots, int our_slot,
  344. uint32_t generation)
  345. {
  346. struct mddev *mddev = arg;
  347. struct md_cluster_info *cinfo = mddev->cluster_info;
  348. cinfo->slot_number = our_slot;
  349. /* the completion only needs to be completed when a node joins the cluster;
  350. * it doesn't need to run during another node's failure */
  351. if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
  352. complete(&cinfo->completion);
  353. clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
  354. }
  355. clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
  356. }
  357. /* these ops are called when a node joins the cluster, and perform lock recovery
  358. * if a node failure occurs */
  359. static const struct dlm_lockspace_ops md_ls_ops = {
  360. .recover_prep = recover_prep,
  361. .recover_slot = recover_slot,
  362. .recover_done = recover_done,
  363. };
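/*
 * These callbacks are handed to the DLM when the lockspace is created in
 * join() below, roughly:
 *
 *	dlm_new_lockspace(str, cluster_name, 0, LVB_SIZE,
 *			  &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
 *
 * recover_prep() runs when lock recovery starts, recover_slot() once per
 * failed node, and recover_done() when the lockspace is usable again.
 */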
  364. /*
  365. * The BAST function for the ack lock resource
  366. * This function wakes up the receive thread in
  367. * order to receive and process the message.
  368. */
  369. static void ack_bast(void *arg, int mode)
  370. {
  371. struct dlm_lock_resource *res = arg;
  372. struct md_cluster_info *cinfo = res->mddev->cluster_info;
  373. if (mode == DLM_LOCK_EX) {
  374. if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
  375. md_wakeup_thread(cinfo->recv_thread);
  376. else
  377. set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
  378. }
  379. }
  380. static void remove_suspend_info(struct mddev *mddev, int slot)
  381. {
  382. struct md_cluster_info *cinfo = mddev->cluster_info;
  383. mddev->pers->quiesce(mddev, 1);
  384. spin_lock_irq(&cinfo->suspend_lock);
  385. cinfo->suspend_hi = 0;
  386. cinfo->suspend_lo = 0;
  387. spin_unlock_irq(&cinfo->suspend_lock);
  388. mddev->pers->quiesce(mddev, 0);
  389. }
  390. static void process_suspend_info(struct mddev *mddev,
  391. int slot, sector_t lo, sector_t hi)
  392. {
  393. struct md_cluster_info *cinfo = mddev->cluster_info;
  394. struct mdp_superblock_1 *sb = NULL;
  395. struct md_rdev *rdev;
  396. if (!hi) {
  397. /*
  398. * clear the REMOTE flag since resync or recovery is finished
  399. * in remote node.
  400. */
  401. clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
  402. remove_suspend_info(mddev, slot);
  403. set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
  404. md_wakeup_thread(mddev->thread);
  405. return;
  406. }
  407. rdev_for_each(rdev, mddev)
  408. if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
  409. sb = page_address(rdev->sb_page);
  410. break;
  411. }
  412. /*
  413. * The bitmaps are not the same for different nodes.
  414. * If RESYNCING is happening on one node, then
  415. * the node which received the RESYNCING message
  416. * will probably perform the resync of the region
  417. * [lo, hi] again, so we can reduce the resync time
  418. * a lot if we can ensure that the bitmaps among
  419. * the different nodes match up well.
  420. *
  421. * sync_low/hi is used to record the region which
  422. * arrived in the previous RESYNCING message.
  423. *
  424. * Call md_bitmap_sync_with_cluster to clear NEEDED_MASK
  425. * and set RESYNC_MASK since the resync thread is running
  426. * on another node, so we don't need to do the resync
  427. * again for the same section.
  428. *
  429. * Skip md_bitmap_sync_with_cluster in case a reshape is
  430. * happening, because the reshaping region is small and
  431. * we don't want to trigger lots of WARNs.
  432. */
  433. if (sb && !(le32_to_cpu(sb->feature_map) & MD_FEATURE_RESHAPE_ACTIVE))
  434. md_bitmap_sync_with_cluster(mddev, cinfo->sync_low,
  435. cinfo->sync_hi, lo, hi);
  436. cinfo->sync_low = lo;
  437. cinfo->sync_hi = hi;
  438. mddev->pers->quiesce(mddev, 1);
  439. spin_lock_irq(&cinfo->suspend_lock);
  440. cinfo->suspend_from = slot;
  441. cinfo->suspend_lo = lo;
  442. cinfo->suspend_hi = hi;
  443. spin_unlock_irq(&cinfo->suspend_lock);
  444. mddev->pers->quiesce(mddev, 0);
  445. }
  446. static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
  447. {
  448. char disk_uuid[64];
  449. struct md_cluster_info *cinfo = mddev->cluster_info;
  450. char event_name[] = "EVENT=ADD_DEVICE";
  451. char raid_slot[16];
  452. char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
  453. int len;
  454. len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
  455. sprintf(disk_uuid + len, "%pU", cmsg->uuid);
  456. snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
  457. pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
  458. init_completion(&cinfo->newdisk_completion);
  459. set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
  460. kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
  461. wait_for_completion_timeout(&cinfo->newdisk_completion,
  462. NEW_DEV_TIMEOUT);
  463. clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
  464. }
  465. static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
  466. {
  467. int got_lock = 0;
  468. struct md_cluster_info *cinfo = mddev->cluster_info;
  469. mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
  470. dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
  471. wait_event(mddev->thread->wqueue,
  472. (got_lock = mddev_trylock(mddev)) ||
  473. test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
  474. md_reload_sb(mddev, mddev->good_device_nr);
  475. if (got_lock)
  476. mddev_unlock(mddev);
  477. }
  478. static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
  479. {
  480. struct md_rdev *rdev;
  481. rcu_read_lock();
  482. rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
  483. if (rdev) {
  484. set_bit(ClusterRemove, &rdev->flags);
  485. set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
  486. md_wakeup_thread(mddev->thread);
  487. }
  488. else
  489. pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
  490. __func__, __LINE__, le32_to_cpu(msg->raid_slot));
  491. rcu_read_unlock();
  492. }
  493. static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
  494. {
  495. struct md_rdev *rdev;
  496. rcu_read_lock();
  497. rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
  498. if (rdev && test_bit(Faulty, &rdev->flags))
  499. clear_bit(Faulty, &rdev->flags);
  500. else
  501. pr_warn("%s: %d Could not find disk(%d) which is faulty",
  502. __func__, __LINE__, le32_to_cpu(msg->raid_slot));
  503. rcu_read_unlock();
  504. }
  505. static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
  506. {
  507. int ret = 0;
  508. if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
  509. "node %d received its own msg\n", le32_to_cpu(msg->slot)))
  510. return -1;
  511. switch (le32_to_cpu(msg->type)) {
  512. case METADATA_UPDATED:
  513. process_metadata_update(mddev, msg);
  514. break;
  515. case CHANGE_CAPACITY:
  516. set_capacity_and_notify(mddev->gendisk, mddev->array_sectors);
  517. break;
  518. case RESYNCING:
  519. set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
  520. process_suspend_info(mddev, le32_to_cpu(msg->slot),
  521. le64_to_cpu(msg->low),
  522. le64_to_cpu(msg->high));
  523. break;
  524. case NEWDISK:
  525. process_add_new_disk(mddev, msg);
  526. break;
  527. case REMOVE:
  528. process_remove_disk(mddev, msg);
  529. break;
  530. case RE_ADD:
  531. process_readd_disk(mddev, msg);
  532. break;
  533. case BITMAP_NEEDS_SYNC:
  534. __recover_slot(mddev, le32_to_cpu(msg->slot));
  535. break;
  536. case BITMAP_RESIZE:
  537. if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
  538. ret = md_bitmap_resize(mddev->bitmap,
  539. le64_to_cpu(msg->high), 0, 0);
  540. break;
  541. default:
  542. ret = -1;
  543. pr_warn("%s:%d Received unknown message from %d\n",
  544. __func__, __LINE__, le32_to_cpu(msg->slot));
  545. }
  546. return ret;
  547. }
  548. /*
  549. * thread for receiving message
  550. */
  551. static void recv_daemon(struct md_thread *thread)
  552. {
  553. struct md_cluster_info *cinfo = thread->mddev->cluster_info;
  554. struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
  555. struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
  556. struct cluster_msg msg;
  557. int ret;
  558. mutex_lock(&cinfo->recv_mutex);
  559. /*get CR on Message*/
  560. if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
  561. pr_err("md/raid1:failed to get CR on MESSAGE\n");
  562. mutex_unlock(&cinfo->recv_mutex);
  563. return;
  564. }
  565. /* read lvb and wake up thread to process this message_lockres */
  566. memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
  567. ret = process_recvd_msg(thread->mddev, &msg);
  568. if (ret)
  569. goto out;
  570. /*release CR on ack_lockres*/
  571. ret = dlm_unlock_sync(ack_lockres);
  572. if (unlikely(ret != 0))
  573. pr_info("unlock ack failed return %d\n", ret);
  574. /*up-convert to PR on message_lockres*/
  575. ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
  576. if (unlikely(ret != 0))
  577. pr_info("lock PR on msg failed return %d\n", ret);
  578. /*get CR on ack_lockres again*/
  579. ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
  580. if (unlikely(ret != 0))
  581. pr_info("lock CR on ack failed return %d\n", ret);
  582. out:
  583. /*release CR on message_lockres*/
  584. ret = dlm_unlock_sync(message_lockres);
  585. if (unlikely(ret != 0))
  586. pr_info("unlock msg failed return %d\n", ret);
  587. mutex_unlock(&cinfo->recv_mutex);
  588. }
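/*
 * Receive-side sequence, summarizing the body above (interpretation is
 * best-effort; the authoritative description is the __sendmsg() comment
 * further down):
 *
 *	1. CR on MESSAGE, copy the message out of its LVB
 *	2. process_recvd_msg() dispatches on the message type
 *	3. release CR on ACK so the sender's EX request on ACK completes
 *	4. re-take PR on MESSAGE and then CR on ACK for the next round
 *	5. drop MESSAGE back to NL
 */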
  589. /* lock_token()
  590. * Takes the lock on the TOKEN lock resource so no other
  591. * node can communicate while the operation is underway.
  592. */
  593. static int lock_token(struct md_cluster_info *cinfo)
  594. {
  595. int error;
  596. error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
  597. if (error) {
  598. pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
  599. __func__, __LINE__, error);
  600. } else {
  601. /* Lock the receive sequence */
  602. mutex_lock(&cinfo->recv_mutex);
  603. }
  604. return error;
  605. }
  606. /* lock_comm()
  607. * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
  608. */
  609. static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
  610. {
  611. int rv, set_bit = 0;
  612. struct mddev *mddev = cinfo->mddev;
  613. /*
  614. * If the resync thread runs after the raid1d thread, then process_metadata_update
  615. * could not continue if raid1d held reconfig_mutex (and raid1d is blocked
  616. * since another node already got EX on Token and is waiting for the EX of Ack),
  617. * so let resync wake up the thread in case the flag is set.
  618. */
  619. if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
  620. &cinfo->state)) {
  621. rv = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
  622. &cinfo->state);
  623. WARN_ON_ONCE(rv);
  624. md_wakeup_thread(mddev->thread);
  625. set_bit = 1;
  626. }
  627. wait_event(cinfo->wait,
  628. !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
  629. rv = lock_token(cinfo);
  630. if (set_bit)
  631. clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
  632. return rv;
  633. }
  634. static void unlock_comm(struct md_cluster_info *cinfo)
  635. {
  636. WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
  637. mutex_unlock(&cinfo->recv_mutex);
  638. dlm_unlock_sync(cinfo->token_lockres);
  639. clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
  640. wake_up(&cinfo->wait);
  641. }
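/*
 * Typical send path (sketch): a caller serializes on the send channel and
 * the TOKEN lock before touching the MESSAGE/ACK resources, e.g.:
 *
 *	if (lock_comm(cinfo, mddev_locked))
 *		return -EAGAIN;
 *	ret = __sendmsg(cinfo, &cmsg);
 *	unlock_comm(cinfo);
 *
 * add_new_disk() instead keeps the channel locked by setting
 * MD_CLUSTER_SEND_LOCKED_ALREADY, and metadata_update_finish() /
 * add_new_disk_cancel() release it later.
 */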
  642. /* __sendmsg()
  643. * This function performs the actual sending of the message. This function is
  644. * usually called after performing the encompassing operation.
  645. * The function:
  646. * 1. Grabs the message lockresource in EX mode
  647. * 2. Copies the message to the message LVB
  648. * 3. Downconverts message lockresource to CW
  649. * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
  650. * and the other nodes read the message. The thread will wait here until all other
  651. * nodes have released ack lock resource.
  652. * 5. Downconvert ack lockresource to CR
  653. */
  654. static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
  655. {
  656. int error;
  657. int slot = cinfo->slot_number - 1;
  658. cmsg->slot = cpu_to_le32(slot);
  659. /*get EX on Message*/
  660. error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
  661. if (error) {
  662. pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
  663. goto failed_message;
  664. }
  665. memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
  666. sizeof(struct cluster_msg));
  667. /*down-convert EX to CW on Message*/
  668. error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
  669. if (error) {
  670. pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
  671. error);
  672. goto failed_ack;
  673. }
  674. /*up-convert CR to EX on Ack*/
  675. error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
  676. if (error) {
  677. pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
  678. error);
  679. goto failed_ack;
  680. }
  681. /*down-convert EX to CR on Ack*/
  682. error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
  683. if (error) {
  684. pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
  685. error);
  686. goto failed_ack;
  687. }
  688. failed_ack:
  689. error = dlm_unlock_sync(cinfo->message_lockres);
  690. if (unlikely(error != 0)) {
  691. pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
  692. error);
  693. /* in case the message can't be released due to some reason */
  694. goto failed_ack;
  695. }
  696. failed_message:
  697. return error;
  698. }
  699. static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
  700. bool mddev_locked)
  701. {
  702. int ret;
  703. ret = lock_comm(cinfo, mddev_locked);
  704. if (!ret) {
  705. ret = __sendmsg(cinfo, cmsg);
  706. unlock_comm(cinfo);
  707. }
  708. return ret;
  709. }
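/*
 * Two-node timeline of a single message (informal, derived from the
 * __sendmsg() comment above; not a normative protocol spec):
 *
 *	sender				receiver
 *	------				--------
 *	EX on MESSAGE
 *	copy msg into MESSAGE LVB
 *	EX -> CW on MESSAGE
 *	CR -> EX on ACK  ---bast--->	ack_bast() wakes recv_daemon()
 *					CR on MESSAGE, read LVB, process
 *					release CR on ACK
 *	(EX on ACK granted)
 *	EX -> CR on ACK
 *	MESSAGE -> NL			PR on MESSAGE granted, then CR on ACK
 */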
  710. static int gather_all_resync_info(struct mddev *mddev, int total_slots)
  711. {
  712. struct md_cluster_info *cinfo = mddev->cluster_info;
  713. int i, ret = 0;
  714. struct dlm_lock_resource *bm_lockres;
  715. char str[64];
  716. sector_t lo, hi;
  717. for (i = 0; i < total_slots; i++) {
  718. memset(str, '\0', 64);
  719. snprintf(str, 64, "bitmap%04d", i);
  720. bm_lockres = lockres_init(mddev, str, NULL, 1);
  721. if (!bm_lockres)
  722. return -ENOMEM;
  723. if (i == (cinfo->slot_number - 1)) {
  724. lockres_free(bm_lockres);
  725. continue;
  726. }
  727. bm_lockres->flags |= DLM_LKF_NOQUEUE;
  728. ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
  729. if (ret == -EAGAIN) {
  730. if (read_resync_info(mddev, bm_lockres)) {
  731. pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
  732. __func__, __LINE__,
  733. (unsigned long long) cinfo->suspend_lo,
  734. (unsigned long long) cinfo->suspend_hi,
  735. i);
  736. cinfo->suspend_from = i;
  737. }
  738. ret = 0;
  739. lockres_free(bm_lockres);
  740. continue;
  741. }
  742. if (ret) {
  743. lockres_free(bm_lockres);
  744. goto out;
  745. }
  746. /* Read the disk bitmap sb and check if it needs recovery */
  747. ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
  748. if (ret) {
  749. pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
  750. lockres_free(bm_lockres);
  751. continue;
  752. }
  753. if ((hi > 0) && (lo < mddev->recovery_cp)) {
  754. set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
  755. mddev->recovery_cp = lo;
  756. md_check_recovery(mddev);
  757. }
  758. lockres_free(bm_lockres);
  759. }
  760. out:
  761. return ret;
  762. }
  763. static int join(struct mddev *mddev, int nodes)
  764. {
  765. struct md_cluster_info *cinfo;
  766. int ret, ops_rv;
  767. char str[64];
  768. cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
  769. if (!cinfo)
  770. return -ENOMEM;
  771. INIT_LIST_HEAD(&cinfo->suspend_list);
  772. spin_lock_init(&cinfo->suspend_lock);
  773. init_completion(&cinfo->completion);
  774. set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
  775. init_waitqueue_head(&cinfo->wait);
  776. mutex_init(&cinfo->recv_mutex);
  777. mddev->cluster_info = cinfo;
  778. cinfo->mddev = mddev;
  779. memset(str, 0, 64);
  780. sprintf(str, "%pU", mddev->uuid);
  781. ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
  782. 0, LVB_SIZE, &md_ls_ops, mddev,
  783. &ops_rv, &cinfo->lockspace);
  784. if (ret)
  785. goto err;
  786. wait_for_completion(&cinfo->completion);
  787. if (nodes < cinfo->slot_number) {
  788. pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
  789. cinfo->slot_number, nodes);
  790. ret = -ERANGE;
  791. goto err;
  792. }
  793. /* Initiate the communication resources */
  794. ret = -ENOMEM;
  795. cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
  796. if (!cinfo->recv_thread) {
  797. pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
  798. goto err;
  799. }
  800. cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
  801. if (!cinfo->message_lockres)
  802. goto err;
  803. cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
  804. if (!cinfo->token_lockres)
  805. goto err;
  806. cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
  807. if (!cinfo->no_new_dev_lockres)
  808. goto err;
  809. ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
  810. if (ret) {
  811. ret = -EAGAIN;
  812. pr_err("md-cluster: can't join cluster to avoid lock issue\n");
  813. goto err;
  814. }
  815. cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
  816. if (!cinfo->ack_lockres) {
  817. ret = -ENOMEM;
  818. goto err;
  819. }
  820. /* get sync CR lock on ACK. */
  821. if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
  822. pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
  823. ret);
  824. dlm_unlock_sync(cinfo->token_lockres);
  825. /* get sync CR lock on no-new-dev. */
  826. if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
  827. pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
  828. pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
  829. snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
  830. cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
  831. if (!cinfo->bitmap_lockres) {
  832. ret = -ENOMEM;
  833. goto err;
  834. }
  835. if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
  836. pr_err("Failed to get bitmap lock\n");
  837. ret = -EINVAL;
  838. goto err;
  839. }
  840. cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
  841. if (!cinfo->resync_lockres) {
  842. ret = -ENOMEM;
  843. goto err;
  844. }
  845. return 0;
  846. err:
  847. set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
  848. md_unregister_thread(&cinfo->recovery_thread);
  849. md_unregister_thread(&cinfo->recv_thread);
  850. lockres_free(cinfo->message_lockres);
  851. lockres_free(cinfo->token_lockres);
  852. lockres_free(cinfo->ack_lockres);
  853. lockres_free(cinfo->no_new_dev_lockres);
  854. lockres_free(cinfo->resync_lockres);
  855. lockres_free(cinfo->bitmap_lockres);
  856. if (cinfo->lockspace)
  857. dlm_release_lockspace(cinfo->lockspace, 2);
  858. mddev->cluster_info = NULL;
  859. kfree(cinfo);
  860. return ret;
  861. }
  862. static void load_bitmaps(struct mddev *mddev, int total_slots)
  863. {
  864. struct md_cluster_info *cinfo = mddev->cluster_info;
  865. /* load all the node's bitmap info for resync */
  866. if (gather_all_resync_info(mddev, total_slots))
  867. pr_err("md-cluster: failed to gather all resyn infos\n");
  868. set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
  869. /* wake up recv thread in case something need to be handled */
  870. if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
  871. md_wakeup_thread(cinfo->recv_thread);
  872. }
  873. static void resync_bitmap(struct mddev *mddev)
  874. {
  875. struct md_cluster_info *cinfo = mddev->cluster_info;
  876. struct cluster_msg cmsg = {0};
  877. int err;
  878. cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
  879. err = sendmsg(cinfo, &cmsg, 1);
  880. if (err)
  881. pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
  882. __func__, __LINE__, err);
  883. }
  884. static void unlock_all_bitmaps(struct mddev *mddev);
  885. static int leave(struct mddev *mddev)
  886. {
  887. struct md_cluster_info *cinfo = mddev->cluster_info;
  888. if (!cinfo)
  889. return 0;
  890. /*
  891. * A BITMAP_NEEDS_SYNC message should be sent when a node
  892. * is leaving the cluster with a dirty bitmap; also, we
  893. * can only deliver it when the dlm connection is available.
  894. *
  895. * We should likewise send a BITMAP_NEEDS_SYNC message in
  896. * case reshaping is interrupted.
  897. */
  898. if ((cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) ||
  899. (mddev->reshape_position != MaxSector &&
  900. test_bit(MD_CLOSING, &mddev->flags)))
  901. resync_bitmap(mddev);
  902. set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
  903. md_unregister_thread(&cinfo->recovery_thread);
  904. md_unregister_thread(&cinfo->recv_thread);
  905. lockres_free(cinfo->message_lockres);
  906. lockres_free(cinfo->token_lockres);
  907. lockres_free(cinfo->ack_lockres);
  908. lockres_free(cinfo->no_new_dev_lockres);
  909. lockres_free(cinfo->resync_lockres);
  910. lockres_free(cinfo->bitmap_lockres);
  911. unlock_all_bitmaps(mddev);
  912. dlm_release_lockspace(cinfo->lockspace, 2);
  913. kfree(cinfo);
  914. return 0;
  915. }
  916. /* slot_number(): Returns the MD slot number to use
  917. * DLM starts the slot numbers from 1, whereas cluster-md
  918. * wants the number to be from zero, so we deduct one
  919. */
  920. static int slot_number(struct mddev *mddev)
  921. {
  922. struct md_cluster_info *cinfo = mddev->cluster_info;
  923. return cinfo->slot_number - 1;
  924. }
  925. /*
  926. * Check if the communication is already locked, else lock the communication
  927. * channel.
  928. * If it is already locked, token is in EX mode, and hence lock_token()
  929. * should not be called.
  930. */
  931. static int metadata_update_start(struct mddev *mddev)
  932. {
  933. struct md_cluster_info *cinfo = mddev->cluster_info;
  934. int ret;
  935. /*
  936. * metadata_update_start is always called with the protection of
  937. * reconfig_mutex, so set MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD here.
  938. */
  939. ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
  940. &cinfo->state);
  941. WARN_ON_ONCE(ret);
  942. md_wakeup_thread(mddev->thread);
  943. wait_event(cinfo->wait,
  944. !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
  945. test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
  946. /* If token is already locked, return 0 */
  947. if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
  948. clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
  949. return 0;
  950. }
  951. ret = lock_token(cinfo);
  952. clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
  953. return ret;
  954. }
  955. static int metadata_update_finish(struct mddev *mddev)
  956. {
  957. struct md_cluster_info *cinfo = mddev->cluster_info;
  958. struct cluster_msg cmsg;
  959. struct md_rdev *rdev;
  960. int ret = 0;
  961. int raid_slot = -1;
  962. memset(&cmsg, 0, sizeof(cmsg));
  963. cmsg.type = cpu_to_le32(METADATA_UPDATED);
  964. /* Pick up a good active device number to send.
  965. */
  966. rdev_for_each(rdev, mddev)
  967. if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
  968. raid_slot = rdev->desc_nr;
  969. break;
  970. }
  971. if (raid_slot >= 0) {
  972. cmsg.raid_slot = cpu_to_le32(raid_slot);
  973. ret = __sendmsg(cinfo, &cmsg);
  974. } else
  975. pr_warn("md-cluster: No good device id found to send\n");
  976. clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
  977. unlock_comm(cinfo);
  978. return ret;
  979. }
  980. static void metadata_update_cancel(struct mddev *mddev)
  981. {
  982. struct md_cluster_info *cinfo = mddev->cluster_info;
  983. clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
  984. unlock_comm(cinfo);
  985. }
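/*
 * Pairing sketch: md core is expected to bracket a superblock update as
 *
 *	metadata_update_start(mddev);	// takes TOKEN (or reuses a held one)
 *	md_update_sb(mddev, ...);	// write out the new metadata
 *	metadata_update_finish(mddev);	// METADATA_UPDATED msg + unlock_comm
 *
 * with metadata_update_cancel() used on the error path. The exact call sites
 * live in md.c and are only referenced here for orientation.
 */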
  986. static int update_bitmap_size(struct mddev *mddev, sector_t size)
  987. {
  988. struct md_cluster_info *cinfo = mddev->cluster_info;
  989. struct cluster_msg cmsg = {0};
  990. int ret;
  991. cmsg.type = cpu_to_le32(BITMAP_RESIZE);
  992. cmsg.high = cpu_to_le64(size);
  993. ret = sendmsg(cinfo, &cmsg, 0);
  994. if (ret)
  995. pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
  996. __func__, __LINE__, ret);
  997. return ret;
  998. }
  999. static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
  1000. {
  1001. struct bitmap_counts *counts;
  1002. char str[64];
  1003. struct dlm_lock_resource *bm_lockres;
  1004. struct bitmap *bitmap = mddev->bitmap;
  1005. unsigned long my_pages = bitmap->counts.pages;
  1006. int i, rv;
  1007. /*
  1008. * We need to ensure all the nodes can grow to a larger
  1009. * bitmap size before making the reshape.
  1010. */
  1011. rv = update_bitmap_size(mddev, newsize);
  1012. if (rv)
  1013. return rv;
  1014. for (i = 0; i < mddev->bitmap_info.nodes; i++) {
  1015. if (i == md_cluster_ops->slot_number(mddev))
  1016. continue;
  1017. bitmap = get_bitmap_from_slot(mddev, i);
  1018. if (IS_ERR(bitmap)) {
  1019. pr_err("can't get bitmap from slot %d\n", i);
  1020. bitmap = NULL;
  1021. goto out;
  1022. }
  1023. counts = &bitmap->counts;
  1024. /*
  1025. * If we can hold the bitmap lock of one node then
  1026. * the slot is not occupied, update the pages.
  1027. */
  1028. snprintf(str, 64, "bitmap%04d", i);
  1029. bm_lockres = lockres_init(mddev, str, NULL, 1);
  1030. if (!bm_lockres) {
  1031. pr_err("Cannot initialize %s lock\n", str);
  1032. goto out;
  1033. }
  1034. bm_lockres->flags |= DLM_LKF_NOQUEUE;
  1035. rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
  1036. if (!rv)
  1037. counts->pages = my_pages;
  1038. lockres_free(bm_lockres);
  1039. if (my_pages != counts->pages)
  1040. /*
  1041. * Let's revert the bitmap size if one node
  1042. * can't resize bitmap
  1043. */
  1044. goto out;
  1045. md_bitmap_free(bitmap);
  1046. }
  1047. return 0;
  1048. out:
  1049. md_bitmap_free(bitmap);
  1050. update_bitmap_size(mddev, oldsize);
  1051. return -1;
  1052. }
  1053. /*
  1054. * return 0 if all the bitmaps have the same sync_size
  1055. */
  1056. static int cluster_check_sync_size(struct mddev *mddev)
  1057. {
  1058. int i, rv;
  1059. bitmap_super_t *sb;
  1060. unsigned long my_sync_size, sync_size = 0;
  1061. int node_num = mddev->bitmap_info.nodes;
  1062. int current_slot = md_cluster_ops->slot_number(mddev);
  1063. struct bitmap *bitmap = mddev->bitmap;
  1064. char str[64];
  1065. struct dlm_lock_resource *bm_lockres;
  1066. sb = kmap_atomic(bitmap->storage.sb_page);
  1067. my_sync_size = sb->sync_size;
  1068. kunmap_atomic(sb);
  1069. for (i = 0; i < node_num; i++) {
  1070. if (i == current_slot)
  1071. continue;
  1072. bitmap = get_bitmap_from_slot(mddev, i);
  1073. if (IS_ERR(bitmap)) {
  1074. pr_err("can't get bitmap from slot %d\n", i);
  1075. return -1;
  1076. }
  1077. /*
  1078. * If we can hold the bitmap lock of one node then
  1079. * the slot is not occupied, update the sb.
  1080. */
  1081. snprintf(str, 64, "bitmap%04d", i);
  1082. bm_lockres = lockres_init(mddev, str, NULL, 1);
  1083. if (!bm_lockres) {
  1084. pr_err("md-cluster: Cannot initialize %s\n", str);
  1085. md_bitmap_free(bitmap);
  1086. return -1;
  1087. }
  1088. bm_lockres->flags |= DLM_LKF_NOQUEUE;
  1089. rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
  1090. if (!rv)
  1091. md_bitmap_update_sb(bitmap);
  1092. lockres_free(bm_lockres);
  1093. sb = kmap_atomic(bitmap->storage.sb_page);
  1094. if (sync_size == 0)
  1095. sync_size = sb->sync_size;
  1096. else if (sync_size != sb->sync_size) {
  1097. kunmap_atomic(sb);
  1098. md_bitmap_free(bitmap);
  1099. return -1;
  1100. }
  1101. kunmap_atomic(sb);
  1102. md_bitmap_free(bitmap);
  1103. }
  1104. return (my_sync_size == sync_size) ? 0 : -1;
  1105. }
  1106. /*
  1107. * Updating the size for a cluster raid is a little more complex; we perform it
  1108. * in the following steps:
  1109. * 1. hold the token lock and update the superblock on the initiator node.
  1110. * 2. send a METADATA_UPDATED msg to the other nodes.
  1111. * 3. The initiator node continues to check each bitmap's sync_size; if all
  1112. * bitmaps have the same value of sync_size, then we can set the capacity and
  1113. * let the other nodes perform it. If one node can't update sync_size
  1114. * accordingly, we need to revert to the previous value.
  1115. */
  1116. static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
  1117. {
  1118. struct md_cluster_info *cinfo = mddev->cluster_info;
  1119. struct cluster_msg cmsg;
  1120. struct md_rdev *rdev;
  1121. int ret = 0;
  1122. int raid_slot = -1;
  1123. md_update_sb(mddev, 1);
  1124. if (lock_comm(cinfo, 1)) {
  1125. pr_err("%s: lock_comm failed\n", __func__);
  1126. return;
  1127. }
  1128. memset(&cmsg, 0, sizeof(cmsg));
  1129. cmsg.type = cpu_to_le32(METADATA_UPDATED);
  1130. rdev_for_each(rdev, mddev)
  1131. if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
  1132. raid_slot = rdev->desc_nr;
  1133. break;
  1134. }
  1135. if (raid_slot >= 0) {
  1136. cmsg.raid_slot = cpu_to_le32(raid_slot);
  1137. /*
  1138. * We can only change the capacity after all the nodes can do it,
  1139. * so we need to wait until the other nodes have received the msg
  1140. * and handled the change
  1141. */
  1142. ret = __sendmsg(cinfo, &cmsg);
  1143. if (ret) {
  1144. pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
  1145. __func__, __LINE__);
  1146. unlock_comm(cinfo);
  1147. return;
  1148. }
  1149. } else {
  1150. pr_err("md-cluster: No good device id found to send\n");
  1151. unlock_comm(cinfo);
  1152. return;
  1153. }
  1154. /*
  1155. * check the sync_size from the other nodes' bitmaps; if sync_size
  1156. * has already been updated on the other nodes as expected, send an
  1157. * empty metadata msg to permit the change of capacity
  1158. */
  1159. if (cluster_check_sync_size(mddev) == 0) {
  1160. memset(&cmsg, 0, sizeof(cmsg));
  1161. cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
  1162. ret = __sendmsg(cinfo, &cmsg);
  1163. if (ret)
  1164. pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
  1165. __func__, __LINE__);
  1166. set_capacity_and_notify(mddev->gendisk, mddev->array_sectors);
  1167. } else {
  1168. /* revert to previous sectors */
  1169. ret = mddev->pers->resize(mddev, old_dev_sectors);
  1170. ret = __sendmsg(cinfo, &cmsg);
  1171. if (ret)
  1172. pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
  1173. __func__, __LINE__);
  1174. }
  1175. unlock_comm(cinfo);
  1176. }
  1177. static int resync_start(struct mddev *mddev)
  1178. {
  1179. struct md_cluster_info *cinfo = mddev->cluster_info;
  1180. return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
  1181. }
  1182. static void resync_info_get(struct mddev *mddev, sector_t *lo, sector_t *hi)
  1183. {
  1184. struct md_cluster_info *cinfo = mddev->cluster_info;
  1185. spin_lock_irq(&cinfo->suspend_lock);
  1186. *lo = cinfo->suspend_lo;
  1187. *hi = cinfo->suspend_hi;
  1188. spin_unlock_irq(&cinfo->suspend_lock);
  1189. }
  1190. static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
  1191. {
  1192. struct md_cluster_info *cinfo = mddev->cluster_info;
  1193. struct resync_info ri;
  1194. struct cluster_msg cmsg = {0};
  1195. /* do not send zero again, if we have sent before */
  1196. if (hi == 0) {
  1197. memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
  1198. if (le64_to_cpu(ri.hi) == 0)
  1199. return 0;
  1200. }
  1201. add_resync_info(cinfo->bitmap_lockres, lo, hi);
  1202. /* Re-acquire the lock to refresh LVB */
  1203. dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
  1204. cmsg.type = cpu_to_le32(RESYNCING);
  1205. cmsg.low = cpu_to_le64(lo);
  1206. cmsg.high = cpu_to_le64(hi);
  1207. /*
  1208. * mddev_lock is held if resync_info_update is called from
  1209. * resync_finish (md_reap_sync_thread -> resync_finish)
  1210. */
  1211. if (lo == 0 && hi == 0)
  1212. return sendmsg(cinfo, &cmsg, 1);
  1213. else
  1214. return sendmsg(cinfo, &cmsg, 0);
  1215. }
  1216. static int resync_finish(struct mddev *mddev)
  1217. {
  1218. struct md_cluster_info *cinfo = mddev->cluster_info;
  1219. int ret = 0;
  1220. clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
  1221. /*
  1222. * If resync thread is interrupted so we can't say resync is finished,
  1223. * another node will launch resync thread to continue.
  1224. */
  1225. if (!test_bit(MD_CLOSING, &mddev->flags))
  1226. ret = resync_info_update(mddev, 0, 0);
  1227. dlm_unlock_sync(cinfo->resync_lockres);
  1228. return ret;
  1229. }
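/*
 * Resync lifecycle as seen from this file (sketch): the node doing the
 * resync takes the "resync" lock EX via resync_start(), broadcasts the
 * active window with resync_info_update(mddev, lo, hi) (a RESYNCING message
 * plus the bitmap LVB), and finally sends lo = hi = 0 and drops the lock in
 * resync_finish(). Other nodes mirror the window into suspend_lo/suspend_hi
 * in process_suspend_info() and consult it through area_resyncing().
 */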
  1230. static int area_resyncing(struct mddev *mddev, int direction,
  1231. sector_t lo, sector_t hi)
  1232. {
  1233. struct md_cluster_info *cinfo = mddev->cluster_info;
  1234. int ret = 0;
  1235. if ((direction == READ) &&
  1236. test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
  1237. return 1;
  1238. spin_lock_irq(&cinfo->suspend_lock);
  1239. if (hi > cinfo->suspend_lo && lo < cinfo->suspend_hi)
  1240. ret = 1;
  1241. spin_unlock_irq(&cinfo->suspend_lock);
  1242. return ret;
  1243. }
  1244. /* add_new_disk() - initiates a disk add
  1245. * However, if this fails before writing md_update_sb(),
  1246. * add_new_disk_cancel() must be called to release token lock
  1247. */
  1248. static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
  1249. {
  1250. struct md_cluster_info *cinfo = mddev->cluster_info;
  1251. struct cluster_msg cmsg;
  1252. int ret = 0;
  1253. struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
  1254. char *uuid = sb->device_uuid;
  1255. memset(&cmsg, 0, sizeof(cmsg));
  1256. cmsg.type = cpu_to_le32(NEWDISK);
  1257. memcpy(cmsg.uuid, uuid, 16);
  1258. cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
  1259. if (lock_comm(cinfo, 1))
  1260. return -EAGAIN;
  1261. ret = __sendmsg(cinfo, &cmsg);
  1262. if (ret) {
  1263. unlock_comm(cinfo);
  1264. return ret;
  1265. }
  1266. cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
  1267. ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
  1268. cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
  1269. /* Some node does not "see" the device */
  1270. if (ret == -EAGAIN)
  1271. ret = -ENOENT;
  1272. if (ret)
  1273. unlock_comm(cinfo);
  1274. else {
  1275. dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
  1276. /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
  1277. * will run soon after add_new_disk, the below path will be
  1278. * invoked:
  1279. * md_wakeup_thread(mddev->thread)
  1280. * -> conf->thread (raid1d)
  1281. * -> md_check_recovery -> md_update_sb
  1282. * -> metadata_update_start/finish
  1283. * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
  1284. *
  1285. * For other failure cases, metadata_update_cancel and
  1286. * add_new_disk_cancel also clear the bit below.
  1287. */
  1288. set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
  1289. wake_up(&cinfo->wait);
  1290. }
  1291. return ret;
  1292. }
  1293. static void add_new_disk_cancel(struct mddev *mddev)
  1294. {
  1295. struct md_cluster_info *cinfo = mddev->cluster_info;
  1296. clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
  1297. unlock_comm(cinfo);
  1298. }
  1299. static int new_disk_ack(struct mddev *mddev, bool ack)
  1300. {
  1301. struct md_cluster_info *cinfo = mddev->cluster_info;
  1302. if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
  1303. pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
  1304. return -EINVAL;
  1305. }
  1306. if (ack)
  1307. dlm_unlock_sync(cinfo->no_new_dev_lockres);
  1308. complete(&cinfo->newdisk_completion);
  1309. return 0;
  1310. }
  1311. static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
  1312. {
  1313. struct cluster_msg cmsg = {0};
  1314. struct md_cluster_info *cinfo = mddev->cluster_info;
  1315. cmsg.type = cpu_to_le32(REMOVE);
  1316. cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
  1317. return sendmsg(cinfo, &cmsg, 1);
  1318. }
  1319. static int lock_all_bitmaps(struct mddev *mddev)
  1320. {
  1321. int slot, my_slot, ret, held = 1, i = 0;
  1322. char str[64];
  1323. struct md_cluster_info *cinfo = mddev->cluster_info;
  1324. cinfo->other_bitmap_lockres =
  1325. kcalloc(mddev->bitmap_info.nodes - 1,
  1326. sizeof(struct dlm_lock_resource *), GFP_KERNEL);
  1327. if (!cinfo->other_bitmap_lockres) {
  1328. pr_err("md: can't alloc mem for other bitmap locks\n");
  1329. return 0;
  1330. }
  1331. my_slot = slot_number(mddev);
  1332. for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
  1333. if (slot == my_slot)
  1334. continue;
  1335. memset(str, '\0', 64);
  1336. snprintf(str, 64, "bitmap%04d", slot);
  1337. cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
  1338. if (!cinfo->other_bitmap_lockres[i])
  1339. return -ENOMEM;
  1340. cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
  1341. ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
  1342. if (ret)
  1343. held = -1;
  1344. i++;
  1345. }
  1346. return held;
  1347. }
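/*
 * Return convention of lock_all_bitmaps(), as written above: 1 if every
 * other node's bitmap lock was taken, -1 if at least one PW lock could not
 * be granted, -ENOMEM if a lock resource could not be initialized, and 0 if
 * the tracking array could not be allocated. How the caller in md.c
 * interprets these values is outside the scope of this file.
 */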
  1348. static void unlock_all_bitmaps(struct mddev *mddev)
  1349. {
  1350. struct md_cluster_info *cinfo = mddev->cluster_info;
  1351. int i;
  1352. /* release the other nodes' bitmap locks if they exist */
  1353. if (cinfo->other_bitmap_lockres) {
  1354. for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
  1355. if (cinfo->other_bitmap_lockres[i]) {
  1356. lockres_free(cinfo->other_bitmap_lockres[i]);
  1357. }
  1358. }
  1359. kfree(cinfo->other_bitmap_lockres);
  1360. cinfo->other_bitmap_lockres = NULL;
  1361. }
  1362. }
  1363. static int gather_bitmaps(struct md_rdev *rdev)
  1364. {
  1365. int sn, err;
  1366. sector_t lo, hi;
  1367. struct cluster_msg cmsg = {0};
  1368. struct mddev *mddev = rdev->mddev;
  1369. struct md_cluster_info *cinfo = mddev->cluster_info;
  1370. cmsg.type = cpu_to_le32(RE_ADD);
  1371. cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
  1372. err = sendmsg(cinfo, &cmsg, 1);
  1373. if (err)
  1374. goto out;
  1375. for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
  1376. if (sn == (cinfo->slot_number - 1))
  1377. continue;
  1378. err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
  1379. if (err) {
  1380. pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
  1381. goto out;
  1382. }
  1383. if ((hi > 0) && (lo < mddev->recovery_cp))
  1384. mddev->recovery_cp = lo;
  1385. }
  1386. out:
  1387. return err;
  1388. }
  1389. static struct md_cluster_operations cluster_ops = {
  1390. .join = join,
  1391. .leave = leave,
  1392. .slot_number = slot_number,
  1393. .resync_start = resync_start,
  1394. .resync_finish = resync_finish,
  1395. .resync_info_update = resync_info_update,
  1396. .resync_info_get = resync_info_get,
  1397. .metadata_update_start = metadata_update_start,
  1398. .metadata_update_finish = metadata_update_finish,
  1399. .metadata_update_cancel = metadata_update_cancel,
  1400. .area_resyncing = area_resyncing,
  1401. .add_new_disk = add_new_disk,
  1402. .add_new_disk_cancel = add_new_disk_cancel,
  1403. .new_disk_ack = new_disk_ack,
  1404. .remove_disk = remove_disk,
  1405. .load_bitmaps = load_bitmaps,
  1406. .gather_bitmaps = gather_bitmaps,
  1407. .resize_bitmaps = resize_bitmaps,
  1408. .lock_all_bitmaps = lock_all_bitmaps,
  1409. .unlock_all_bitmaps = unlock_all_bitmaps,
  1410. .update_size = update_size,
  1411. };
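/*
 * These operations are registered with the md core below and reached through
 * the md_cluster_ops pointer, e.g. (illustrative, the real call sites are in
 * md.c):
 *
 *	md_cluster_ops->join(mddev, mddev->bitmap_info.nodes);
 *	...
 *	md_cluster_ops->leave(mddev);
 */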
  1412. static int __init cluster_init(void)
  1413. {
  1414. pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
  1415. pr_info("Registering Cluster MD functions\n");
  1416. register_md_cluster_operations(&cluster_ops, THIS_MODULE);
  1417. return 0;
  1418. }
  1419. static void cluster_exit(void)
  1420. {
  1421. unregister_md_cluster_operations();
  1422. }
  1423. module_init(cluster_init);
  1424. module_exit(cluster_exit);
  1425. MODULE_AUTHOR("SUSE");
  1426. MODULE_LICENSE("GPL");
  1427. MODULE_DESCRIPTION("Clustering support for MD");