snap.c 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320
  1. // SPDX-License-Identifier: GPL-2.0
  2. #include <linux/ceph/ceph_debug.h>
  3. #include <linux/fs.h>
  4. #include <linux/sort.h>
  5. #include <linux/slab.h>
  6. #include <linux/iversion.h>
  7. #include "super.h"
  8. #include "mds_client.h"
  9. #include <linux/ceph/decode.h>
  10. /* unused map expires after 5 minutes */
  11. #define CEPH_SNAPID_MAP_TIMEOUT (5 * 60 * HZ)
  12. /*
  13. * Snapshots in ceph are driven in large part by cooperation from the
  14. * client. In contrast to local file systems or file servers that
  15. * implement snapshots at a single point in the system, ceph's
  16. * distributed access to storage requires clients to help decide
  17. * whether a write logically occurs before or after a recently created
  18. * snapshot.
  19. *
  20. * This provides a perfect instantaneous client-wide snapshot. Between
  21. * clients, however, snapshots may appear to be applied at slightly
  22. * different points in time, depending on delays in delivering the
  23. * snapshot notification.
  24. *
  25. * Snapshots are _not_ file system-wide. Instead, each snapshot
  26. * applies to the subdirectory nested beneath some directory. This
  27. * effectively divides the hierarchy into multiple "realms," where all
  28. * of the files contained by each realm share the same set of
  29. * snapshots. An individual realm's snap set contains snapshots
  30. * explicitly created on that realm, as well as any snaps in its
  31. * parent's snap set _after_ the point at which the parent became its
  32. * parent (due to, say, a rename). Similarly, snaps from prior parents
  33. * during the time intervals during which they were the parent are included.
  34. *
  35. * The client is spared most of this detail, fortunately... it must only
  36. * maintain a hierarchy of realms reflecting the current parent/child
  37. * realm relationship, and for each realm has an explicit list of snaps
  38. * inherited from prior parents.
  39. *
  40. * A snap_realm struct is maintained for realms containing every inode
  41. * with an open cap in the system. (The needed snap realm information is
  42. * provided by the MDS whenever a cap is issued, i.e., on open.) A 'seq'
  43. * version number is used to ensure that as realm parameters change (new
  44. * snapshot, new parent, etc.) the client's realm hierarchy is updated.
  45. *
  46. * The realm hierarchy drives the generation of a 'snap context' for each
  47. * realm, which simply lists the resulting set of snaps for the realm. This
  48. * is attached to any writes sent to OSDs.
  49. */
  50. /*
  51. * Unfortunately error handling is a bit mixed here. If we get a snap
  52. * update, but don't have enough memory to update our realm hierarchy,
  53. * it's not clear what we can do about it (besides complaining to the
  54. * console).
  55. */
  56. /*
  57. * increase ref count for the realm
  58. *
  59. * caller must hold snap_rwsem.
  60. */
  61. void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
  62. struct ceph_snap_realm *realm)
  63. {
  64. lockdep_assert_held(&mdsc->snap_rwsem);
  65. /*
  66. * The 0->1 and 1->0 transitions must take the snap_empty_lock
  67. * atomically with the refcount change. Go ahead and bump the
  68. * nref here, unless it's 0, in which case we take the spinlock
  69. * and then do the increment and remove it from the list.
  70. */
  71. if (atomic_inc_not_zero(&realm->nref))
  72. return;
  73. spin_lock(&mdsc->snap_empty_lock);
  74. if (atomic_inc_return(&realm->nref) == 1)
  75. list_del_init(&realm->empty_item);
  76. spin_unlock(&mdsc->snap_empty_lock);
  77. }
  78. static void __insert_snap_realm(struct rb_root *root,
  79. struct ceph_snap_realm *new)
  80. {
  81. struct rb_node **p = &root->rb_node;
  82. struct rb_node *parent = NULL;
  83. struct ceph_snap_realm *r = NULL;
  84. while (*p) {
  85. parent = *p;
  86. r = rb_entry(parent, struct ceph_snap_realm, node);
  87. if (new->ino < r->ino)
  88. p = &(*p)->rb_left;
  89. else if (new->ino > r->ino)
  90. p = &(*p)->rb_right;
  91. else
  92. BUG();
  93. }
  94. rb_link_node(&new->node, parent, p);
  95. rb_insert_color(&new->node, root);
  96. }
  97. /*
  98. * create and get the realm rooted at @ino and bump its ref count.
  99. *
  100. * caller must hold snap_rwsem for write.
  101. */
  102. static struct ceph_snap_realm *ceph_create_snap_realm(
  103. struct ceph_mds_client *mdsc,
  104. u64 ino)
  105. {
  106. struct ceph_snap_realm *realm;
  107. lockdep_assert_held_write(&mdsc->snap_rwsem);
  108. realm = kzalloc(sizeof(*realm), GFP_NOFS);
  109. if (!realm)
  110. return ERR_PTR(-ENOMEM);
  111. /* Do not release the global dummy snaprealm until unmouting */
  112. if (ino == CEPH_INO_GLOBAL_SNAPREALM)
  113. atomic_set(&realm->nref, 2);
  114. else
  115. atomic_set(&realm->nref, 1);
  116. realm->ino = ino;
  117. INIT_LIST_HEAD(&realm->children);
  118. INIT_LIST_HEAD(&realm->child_item);
  119. INIT_LIST_HEAD(&realm->empty_item);
  120. INIT_LIST_HEAD(&realm->dirty_item);
  121. INIT_LIST_HEAD(&realm->rebuild_item);
  122. INIT_LIST_HEAD(&realm->inodes_with_caps);
  123. spin_lock_init(&realm->inodes_with_caps_lock);
  124. __insert_snap_realm(&mdsc->snap_realms, realm);
  125. mdsc->num_snap_realms++;
  126. dout("%s %llx %p\n", __func__, realm->ino, realm);
  127. return realm;
  128. }
  129. /*
  130. * lookup the realm rooted at @ino.
  131. *
  132. * caller must hold snap_rwsem.
  133. */
  134. static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
  135. u64 ino)
  136. {
  137. struct rb_node *n = mdsc->snap_realms.rb_node;
  138. struct ceph_snap_realm *r;
  139. lockdep_assert_held(&mdsc->snap_rwsem);
  140. while (n) {
  141. r = rb_entry(n, struct ceph_snap_realm, node);
  142. if (ino < r->ino)
  143. n = n->rb_left;
  144. else if (ino > r->ino)
  145. n = n->rb_right;
  146. else {
  147. dout("%s %llx %p\n", __func__, r->ino, r);
  148. return r;
  149. }
  150. }
  151. return NULL;
  152. }
  153. struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
  154. u64 ino)
  155. {
  156. struct ceph_snap_realm *r;
  157. r = __lookup_snap_realm(mdsc, ino);
  158. if (r)
  159. ceph_get_snap_realm(mdsc, r);
  160. return r;
  161. }
  162. static void __put_snap_realm(struct ceph_mds_client *mdsc,
  163. struct ceph_snap_realm *realm);
  164. /*
  165. * called with snap_rwsem (write)
  166. */
  167. static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
  168. struct ceph_snap_realm *realm)
  169. {
  170. lockdep_assert_held_write(&mdsc->snap_rwsem);
  171. dout("%s %p %llx\n", __func__, realm, realm->ino);
  172. rb_erase(&realm->node, &mdsc->snap_realms);
  173. mdsc->num_snap_realms--;
  174. if (realm->parent) {
  175. list_del_init(&realm->child_item);
  176. __put_snap_realm(mdsc, realm->parent);
  177. }
  178. kfree(realm->prior_parent_snaps);
  179. kfree(realm->snaps);
  180. ceph_put_snap_context(realm->cached_context);
  181. kfree(realm);
  182. }
  183. /*
  184. * caller holds snap_rwsem (write)
  185. */
  186. static void __put_snap_realm(struct ceph_mds_client *mdsc,
  187. struct ceph_snap_realm *realm)
  188. {
  189. lockdep_assert_held_write(&mdsc->snap_rwsem);
  190. /*
  191. * We do not require the snap_empty_lock here, as any caller that
  192. * increments the value must hold the snap_rwsem.
  193. */
  194. if (atomic_dec_and_test(&realm->nref))
  195. __destroy_snap_realm(mdsc, realm);
  196. }
  197. /*
  198. * See comments in ceph_get_snap_realm. Caller needn't hold any locks.
  199. */
  200. void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
  201. struct ceph_snap_realm *realm)
  202. {
  203. if (!atomic_dec_and_lock(&realm->nref, &mdsc->snap_empty_lock))
  204. return;
  205. if (down_write_trylock(&mdsc->snap_rwsem)) {
  206. spin_unlock(&mdsc->snap_empty_lock);
  207. __destroy_snap_realm(mdsc, realm);
  208. up_write(&mdsc->snap_rwsem);
  209. } else {
  210. list_add(&realm->empty_item, &mdsc->snap_empty);
  211. spin_unlock(&mdsc->snap_empty_lock);
  212. }
  213. }
  214. /*
  215. * Clean up any realms whose ref counts have dropped to zero. Note
  216. * that this does not include realms who were created but not yet
  217. * used.
  218. *
  219. * Called under snap_rwsem (write)
  220. */
  221. static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
  222. {
  223. struct ceph_snap_realm *realm;
  224. lockdep_assert_held_write(&mdsc->snap_rwsem);
  225. spin_lock(&mdsc->snap_empty_lock);
  226. while (!list_empty(&mdsc->snap_empty)) {
  227. realm = list_first_entry(&mdsc->snap_empty,
  228. struct ceph_snap_realm, empty_item);
  229. list_del(&realm->empty_item);
  230. spin_unlock(&mdsc->snap_empty_lock);
  231. __destroy_snap_realm(mdsc, realm);
  232. spin_lock(&mdsc->snap_empty_lock);
  233. }
  234. spin_unlock(&mdsc->snap_empty_lock);
  235. }
  236. void ceph_cleanup_global_and_empty_realms(struct ceph_mds_client *mdsc)
  237. {
  238. struct ceph_snap_realm *global_realm;
  239. down_write(&mdsc->snap_rwsem);
  240. global_realm = __lookup_snap_realm(mdsc, CEPH_INO_GLOBAL_SNAPREALM);
  241. if (global_realm)
  242. ceph_put_snap_realm(mdsc, global_realm);
  243. __cleanup_empty_realms(mdsc);
  244. up_write(&mdsc->snap_rwsem);
  245. }
  246. /*
  247. * adjust the parent realm of a given @realm. adjust child list, and parent
  248. * pointers, and ref counts appropriately.
  249. *
  250. * return true if parent was changed, 0 if unchanged, <0 on error.
  251. *
  252. * caller must hold snap_rwsem for write.
  253. */
  254. static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
  255. struct ceph_snap_realm *realm,
  256. u64 parentino)
  257. {
  258. struct ceph_snap_realm *parent;
  259. lockdep_assert_held_write(&mdsc->snap_rwsem);
  260. if (realm->parent_ino == parentino)
  261. return 0;
  262. parent = ceph_lookup_snap_realm(mdsc, parentino);
  263. if (!parent) {
  264. parent = ceph_create_snap_realm(mdsc, parentino);
  265. if (IS_ERR(parent))
  266. return PTR_ERR(parent);
  267. }
  268. dout("%s %llx %p: %llx %p -> %llx %p\n", __func__, realm->ino,
  269. realm, realm->parent_ino, realm->parent, parentino, parent);
  270. if (realm->parent) {
  271. list_del_init(&realm->child_item);
  272. ceph_put_snap_realm(mdsc, realm->parent);
  273. }
  274. realm->parent_ino = parentino;
  275. realm->parent = parent;
  276. list_add(&realm->child_item, &parent->children);
  277. return 1;
  278. }
  /*
   * sort() comparator that orders u64 values in descending order.
   *
   * @a, @b: pointers to the two u64 elements being compared.
   *
   * Returns 1 when *a < *b, -1 when *a > *b and 0 when they are equal.
   * The elements are only read, so they are accessed through
   * const-qualified pointers rather than casting the qualifier away.
   */
  static int cmpu64_rev(const void *a, const void *b)
  {
  	const u64 lhs = *(const u64 *)a;
  	const u64 rhs = *(const u64 *)b;

  	if (lhs < rhs)
  		return 1;
  	if (lhs > rhs)
  		return -1;
  	return 0;
  }
  287. /*
  288. * build the snap context for a given realm.
  289. */
  290. static int build_snap_context(struct ceph_snap_realm *realm,
  291. struct list_head *realm_queue,
  292. struct list_head *dirty_realms)
  293. {
  294. struct ceph_snap_realm *parent = realm->parent;
  295. struct ceph_snap_context *snapc;
  296. int err = 0;
  297. u32 num = realm->num_prior_parent_snaps + realm->num_snaps;
  298. /*
  299. * build parent context, if it hasn't been built.
  300. * conservatively estimate that all parent snaps might be
  301. * included by us.
  302. */
  303. if (parent) {
  304. if (!parent->cached_context) {
  305. /* add to the queue head */
  306. list_add(&parent->rebuild_item, realm_queue);
  307. return 1;
  308. }
  309. num += parent->cached_context->num_snaps;
  310. }
  311. /* do i actually need to update? not if my context seq
  312. matches realm seq, and my parents' does to. (this works
  313. because we rebuild_snap_realms() works _downward_ in
  314. hierarchy after each update.) */
  315. if (realm->cached_context &&
  316. realm->cached_context->seq == realm->seq &&
  317. (!parent ||
  318. realm->cached_context->seq >= parent->cached_context->seq)) {
  319. dout("%s %llx %p: %p seq %lld (%u snaps) (unchanged)\n",
  320. __func__, realm->ino, realm, realm->cached_context,
  321. realm->cached_context->seq,
  322. (unsigned int)realm->cached_context->num_snaps);
  323. return 0;
  324. }
  325. /* alloc new snap context */
  326. err = -ENOMEM;
  327. if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
  328. goto fail;
  329. snapc = ceph_create_snap_context(num, GFP_NOFS);
  330. if (!snapc)
  331. goto fail;
  332. /* build (reverse sorted) snap vector */
  333. num = 0;
  334. snapc->seq = realm->seq;
  335. if (parent) {
  336. u32 i;
  337. /* include any of parent's snaps occurring _after_ my
  338. parent became my parent */
  339. for (i = 0; i < parent->cached_context->num_snaps; i++)
  340. if (parent->cached_context->snaps[i] >=
  341. realm->parent_since)
  342. snapc->snaps[num++] =
  343. parent->cached_context->snaps[i];
  344. if (parent->cached_context->seq > snapc->seq)
  345. snapc->seq = parent->cached_context->seq;
  346. }
  347. memcpy(snapc->snaps + num, realm->snaps,
  348. sizeof(u64)*realm->num_snaps);
  349. num += realm->num_snaps;
  350. memcpy(snapc->snaps + num, realm->prior_parent_snaps,
  351. sizeof(u64)*realm->num_prior_parent_snaps);
  352. num += realm->num_prior_parent_snaps;
  353. sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
  354. snapc->num_snaps = num;
  355. dout("%s %llx %p: %p seq %lld (%u snaps)\n", __func__, realm->ino,
  356. realm, snapc, snapc->seq, (unsigned int) snapc->num_snaps);
  357. ceph_put_snap_context(realm->cached_context);
  358. realm->cached_context = snapc;
  359. /* queue realm for cap_snap creation */
  360. list_add_tail(&realm->dirty_item, dirty_realms);
  361. return 0;
  362. fail:
  363. /*
  364. * if we fail, clear old (incorrect) cached_context... hopefully
  365. * we'll have better luck building it later
  366. */
  367. if (realm->cached_context) {
  368. ceph_put_snap_context(realm->cached_context);
  369. realm->cached_context = NULL;
  370. }
  371. pr_err("%s %llx %p fail %d\n", __func__, realm->ino, realm, err);
  372. return err;
  373. }
  374. /*
  375. * rebuild snap context for the given realm and all of its children.
  376. */
  377. static void rebuild_snap_realms(struct ceph_snap_realm *realm,
  378. struct list_head *dirty_realms)
  379. {
  380. LIST_HEAD(realm_queue);
  381. int last = 0;
  382. bool skip = false;
  383. list_add_tail(&realm->rebuild_item, &realm_queue);
  384. while (!list_empty(&realm_queue)) {
  385. struct ceph_snap_realm *_realm, *child;
  386. _realm = list_first_entry(&realm_queue,
  387. struct ceph_snap_realm,
  388. rebuild_item);
  389. /*
  390. * If the last building failed dues to memory
  391. * issue, just empty the realm_queue and return
  392. * to avoid infinite loop.
  393. */
  394. if (last < 0) {
  395. list_del_init(&_realm->rebuild_item);
  396. continue;
  397. }
  398. last = build_snap_context(_realm, &realm_queue, dirty_realms);
  399. dout("%s %llx %p, %s\n", __func__, _realm->ino, _realm,
  400. last > 0 ? "is deferred" : !last ? "succeeded" : "failed");
  401. /* is any child in the list ? */
  402. list_for_each_entry(child, &_realm->children, child_item) {
  403. if (!list_empty(&child->rebuild_item)) {
  404. skip = true;
  405. break;
  406. }
  407. }
  408. if (!skip) {
  409. list_for_each_entry(child, &_realm->children, child_item)
  410. list_add_tail(&child->rebuild_item, &realm_queue);
  411. }
  412. /* last == 1 means need to build parent first */
  413. if (last <= 0)
  414. list_del_init(&_realm->rebuild_item);
  415. }
  416. }
  417. /*
  418. * helper to allocate and decode an array of snapids. free prior
  419. * instance, if any.
  420. */
  421. static int dup_array(u64 **dst, __le64 *src, u32 num)
  422. {
  423. u32 i;
  424. kfree(*dst);
  425. if (num) {
  426. *dst = kcalloc(num, sizeof(u64), GFP_NOFS);
  427. if (!*dst)
  428. return -ENOMEM;
  429. for (i = 0; i < num; i++)
  430. (*dst)[i] = get_unaligned_le64(src + i);
  431. } else {
  432. *dst = NULL;
  433. }
  434. return 0;
  435. }
  436. static bool has_new_snaps(struct ceph_snap_context *o,
  437. struct ceph_snap_context *n)
  438. {
  439. if (n->num_snaps == 0)
  440. return false;
  441. /* snaps are in descending order */
  442. return n->snaps[0] > o->seq;
  443. }
  444. /*
  445. * When a snapshot is applied, the size/mtime inode metadata is queued
  446. * in a ceph_cap_snap (one for each snapshot) until writeback
  447. * completes and the metadata can be flushed back to the MDS.
  448. *
  449. * However, if a (sync) write is currently in-progress when we apply
  450. * the snapshot, we have to wait until the write succeeds or fails
  451. * (and a final size/mtime is known). In this case the
  452. * cap_snap->writing = 1, and is said to be "pending." When the write
  453. * finishes, we __ceph_finish_cap_snap().
  454. *
  455. * Caller must hold snap_rwsem for read (i.e., the realm topology won't
  456. * change).
  457. */
  458. static void ceph_queue_cap_snap(struct ceph_inode_info *ci,
  459. struct ceph_cap_snap **pcapsnap)
  460. {
  461. struct inode *inode = &ci->netfs.inode;
  462. struct ceph_snap_context *old_snapc, *new_snapc;
  463. struct ceph_cap_snap *capsnap = *pcapsnap;
  464. struct ceph_buffer *old_blob = NULL;
  465. int used, dirty;
  466. spin_lock(&ci->i_ceph_lock);
  467. used = __ceph_caps_used(ci);
  468. dirty = __ceph_caps_dirty(ci);
  469. old_snapc = ci->i_head_snapc;
  470. new_snapc = ci->i_snap_realm->cached_context;
  471. /*
  472. * If there is a write in progress, treat that as a dirty Fw,
  473. * even though it hasn't completed yet; by the time we finish
  474. * up this capsnap it will be.
  475. */
  476. if (used & CEPH_CAP_FILE_WR)
  477. dirty |= CEPH_CAP_FILE_WR;
  478. if (__ceph_have_pending_cap_snap(ci)) {
  479. /* there is no point in queuing multiple "pending" cap_snaps,
  480. as no new writes are allowed to start when pending, so any
  481. writes in progress now were started before the previous
  482. cap_snap. lucky us. */
  483. dout("%s %p %llx.%llx already pending\n",
  484. __func__, inode, ceph_vinop(inode));
  485. goto update_snapc;
  486. }
  487. if (ci->i_wrbuffer_ref_head == 0 &&
  488. !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) {
  489. dout("%s %p %llx.%llx nothing dirty|writing\n",
  490. __func__, inode, ceph_vinop(inode));
  491. goto update_snapc;
  492. }
  493. BUG_ON(!old_snapc);
  494. /*
  495. * There is no need to send FLUSHSNAP message to MDS if there is
  496. * no new snapshot. But when there is dirty pages or on-going
  497. * writes, we still need to create cap_snap. cap_snap is needed
  498. * by the write path and page writeback path.
  499. *
  500. * also see ceph_try_drop_cap_snap()
  501. */
  502. if (has_new_snaps(old_snapc, new_snapc)) {
  503. if (dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))
  504. capsnap->need_flush = true;
  505. } else {
  506. if (!(used & CEPH_CAP_FILE_WR) &&
  507. ci->i_wrbuffer_ref_head == 0) {
  508. dout("%s %p %llx.%llx no new_snap|dirty_page|writing\n",
  509. __func__, inode, ceph_vinop(inode));
  510. goto update_snapc;
  511. }
  512. }
  513. dout("%s %p %llx.%llx cap_snap %p queuing under %p %s %s\n",
  514. __func__, inode, ceph_vinop(inode), capsnap, old_snapc,
  515. ceph_cap_string(dirty), capsnap->need_flush ? "" : "no_flush");
  516. ihold(inode);
  517. capsnap->follows = old_snapc->seq;
  518. capsnap->issued = __ceph_caps_issued(ci, NULL);
  519. capsnap->dirty = dirty;
  520. capsnap->mode = inode->i_mode;
  521. capsnap->uid = inode->i_uid;
  522. capsnap->gid = inode->i_gid;
  523. if (dirty & CEPH_CAP_XATTR_EXCL) {
  524. old_blob = __ceph_build_xattrs_blob(ci);
  525. capsnap->xattr_blob =
  526. ceph_buffer_get(ci->i_xattrs.blob);
  527. capsnap->xattr_version = ci->i_xattrs.version;
  528. } else {
  529. capsnap->xattr_blob = NULL;
  530. capsnap->xattr_version = 0;
  531. }
  532. capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
  533. /* dirty page count moved from _head to this cap_snap;
  534. all subsequent writes page dirties occur _after_ this
  535. snapshot. */
  536. capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
  537. ci->i_wrbuffer_ref_head = 0;
  538. capsnap->context = old_snapc;
  539. list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
  540. if (used & CEPH_CAP_FILE_WR) {
  541. dout("%s %p %llx.%llx cap_snap %p snapc %p seq %llu used WR,"
  542. " now pending\n", __func__, inode, ceph_vinop(inode),
  543. capsnap, old_snapc, old_snapc->seq);
  544. capsnap->writing = 1;
  545. } else {
  546. /* note mtime, size NOW. */
  547. __ceph_finish_cap_snap(ci, capsnap);
  548. }
  549. *pcapsnap = NULL;
  550. old_snapc = NULL;
  551. update_snapc:
  552. if (ci->i_wrbuffer_ref_head == 0 &&
  553. ci->i_wr_ref == 0 &&
  554. ci->i_dirty_caps == 0 &&
  555. ci->i_flushing_caps == 0) {
  556. ci->i_head_snapc = NULL;
  557. } else {
  558. ci->i_head_snapc = ceph_get_snap_context(new_snapc);
  559. dout(" new snapc is %p\n", new_snapc);
  560. }
  561. spin_unlock(&ci->i_ceph_lock);
  562. ceph_buffer_put(old_blob);
  563. ceph_put_snap_context(old_snapc);
  564. }
  565. /*
  566. * Finalize the size, mtime for a cap_snap.. that is, settle on final values
  567. * to be used for the snapshot, to be flushed back to the mds.
  568. *
  569. * If capsnap can now be flushed, add to snap_flush list, and return 1.
  570. *
  571. * Caller must hold i_ceph_lock.
  572. */
  573. int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
  574. struct ceph_cap_snap *capsnap)
  575. {
  576. struct inode *inode = &ci->netfs.inode;
  577. struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
  578. BUG_ON(capsnap->writing);
  579. capsnap->size = i_size_read(inode);
  580. capsnap->mtime = inode->i_mtime;
  581. capsnap->atime = inode->i_atime;
  582. capsnap->ctime = inode->i_ctime;
  583. capsnap->btime = ci->i_btime;
  584. capsnap->change_attr = inode_peek_iversion_raw(inode);
  585. capsnap->time_warp_seq = ci->i_time_warp_seq;
  586. capsnap->truncate_size = ci->i_truncate_size;
  587. capsnap->truncate_seq = ci->i_truncate_seq;
  588. if (capsnap->dirty_pages) {
  589. dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu "
  590. "still has %d dirty pages\n", __func__, inode,
  591. ceph_vinop(inode), capsnap, capsnap->context,
  592. capsnap->context->seq, ceph_cap_string(capsnap->dirty),
  593. capsnap->size, capsnap->dirty_pages);
  594. return 0;
  595. }
  596. /* Fb cap still in use, delay it */
  597. if (ci->i_wb_ref) {
  598. dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu "
  599. "used WRBUFFER, delaying\n", __func__, inode,
  600. ceph_vinop(inode), capsnap, capsnap->context,
  601. capsnap->context->seq, ceph_cap_string(capsnap->dirty),
  602. capsnap->size);
  603. capsnap->writing = 1;
  604. return 0;
  605. }
  606. ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
  607. dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu\n",
  608. __func__, inode, ceph_vinop(inode), capsnap, capsnap->context,
  609. capsnap->context->seq, ceph_cap_string(capsnap->dirty),
  610. capsnap->size);
  611. spin_lock(&mdsc->snap_flush_lock);
  612. if (list_empty(&ci->i_snap_flush_item)) {
  613. ihold(inode);
  614. list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
  615. }
  616. spin_unlock(&mdsc->snap_flush_lock);
  617. return 1; /* caller may want to ceph_flush_snaps */
  618. }
  619. /*
  620. * Queue cap_snaps for snap writeback for this realm and its children.
  621. * Called under snap_rwsem, so realm topology won't change.
  622. */
  623. static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
  624. {
  625. struct ceph_inode_info *ci;
  626. struct inode *lastinode = NULL;
  627. struct ceph_cap_snap *capsnap = NULL;
  628. dout("%s %p %llx inode\n", __func__, realm, realm->ino);
  629. spin_lock(&realm->inodes_with_caps_lock);
  630. list_for_each_entry(ci, &realm->inodes_with_caps, i_snap_realm_item) {
  631. struct inode *inode = igrab(&ci->netfs.inode);
  632. if (!inode)
  633. continue;
  634. spin_unlock(&realm->inodes_with_caps_lock);
  635. iput(lastinode);
  636. lastinode = inode;
  637. /*
  638. * Allocate the capsnap memory outside of ceph_queue_cap_snap()
  639. * to reduce very possible but unnecessary frequently memory
  640. * allocate/free in this loop.
  641. */
  642. if (!capsnap) {
  643. capsnap = kmem_cache_zalloc(ceph_cap_snap_cachep, GFP_NOFS);
  644. if (!capsnap) {
  645. pr_err("ENOMEM allocating ceph_cap_snap on %p\n",
  646. inode);
  647. return;
  648. }
  649. }
  650. capsnap->cap_flush.is_capsnap = true;
  651. refcount_set(&capsnap->nref, 1);
  652. INIT_LIST_HEAD(&capsnap->cap_flush.i_list);
  653. INIT_LIST_HEAD(&capsnap->cap_flush.g_list);
  654. INIT_LIST_HEAD(&capsnap->ci_item);
  655. ceph_queue_cap_snap(ci, &capsnap);
  656. spin_lock(&realm->inodes_with_caps_lock);
  657. }
  658. spin_unlock(&realm->inodes_with_caps_lock);
  659. iput(lastinode);
  660. if (capsnap)
  661. kmem_cache_free(ceph_cap_snap_cachep, capsnap);
  662. dout("%s %p %llx done\n", __func__, realm, realm->ino);
  663. }
  664. /*
  665. * Parse and apply a snapblob "snap trace" from the MDS. This specifies
  666. * the snap realm parameters from a given realm and all of its ancestors,
  667. * up to the root.
  668. *
  669. * Caller must hold snap_rwsem for write.
  670. */
  671. int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
  672. void *p, void *e, bool deletion,
  673. struct ceph_snap_realm **realm_ret)
  674. {
  675. struct ceph_mds_snap_realm *ri; /* encoded */
  676. __le64 *snaps; /* encoded */
  677. __le64 *prior_parent_snaps; /* encoded */
  678. struct ceph_snap_realm *realm;
  679. struct ceph_snap_realm *first_realm = NULL;
  680. struct ceph_snap_realm *realm_to_rebuild = NULL;
  681. struct ceph_client *client = mdsc->fsc->client;
  682. int rebuild_snapcs;
  683. int err = -ENOMEM;
  684. int ret;
  685. LIST_HEAD(dirty_realms);
  686. lockdep_assert_held_write(&mdsc->snap_rwsem);
  687. dout("%s deletion=%d\n", __func__, deletion);
  688. more:
  689. realm = NULL;
  690. rebuild_snapcs = 0;
  691. ceph_decode_need(&p, e, sizeof(*ri), bad);
  692. ri = p;
  693. p += sizeof(*ri);
  694. ceph_decode_need(&p, e, sizeof(u64)*(le32_to_cpu(ri->num_snaps) +
  695. le32_to_cpu(ri->num_prior_parent_snaps)), bad);
  696. snaps = p;
  697. p += sizeof(u64) * le32_to_cpu(ri->num_snaps);
  698. prior_parent_snaps = p;
  699. p += sizeof(u64) * le32_to_cpu(ri->num_prior_parent_snaps);
  700. realm = ceph_lookup_snap_realm(mdsc, le64_to_cpu(ri->ino));
  701. if (!realm) {
  702. realm = ceph_create_snap_realm(mdsc, le64_to_cpu(ri->ino));
  703. if (IS_ERR(realm)) {
  704. err = PTR_ERR(realm);
  705. goto fail;
  706. }
  707. }
  708. /* ensure the parent is correct */
  709. err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
  710. if (err < 0)
  711. goto fail;
  712. rebuild_snapcs += err;
  713. if (le64_to_cpu(ri->seq) > realm->seq) {
  714. dout("%s updating %llx %p %lld -> %lld\n", __func__,
  715. realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
  716. /* update realm parameters, snap lists */
  717. realm->seq = le64_to_cpu(ri->seq);
  718. realm->created = le64_to_cpu(ri->created);
  719. realm->parent_since = le64_to_cpu(ri->parent_since);
  720. realm->num_snaps = le32_to_cpu(ri->num_snaps);
  721. err = dup_array(&realm->snaps, snaps, realm->num_snaps);
  722. if (err < 0)
  723. goto fail;
  724. realm->num_prior_parent_snaps =
  725. le32_to_cpu(ri->num_prior_parent_snaps);
  726. err = dup_array(&realm->prior_parent_snaps, prior_parent_snaps,
  727. realm->num_prior_parent_snaps);
  728. if (err < 0)
  729. goto fail;
  730. if (realm->seq > mdsc->last_snap_seq)
  731. mdsc->last_snap_seq = realm->seq;
  732. rebuild_snapcs = 1;
  733. } else if (!realm->cached_context) {
  734. dout("%s %llx %p seq %lld new\n", __func__,
  735. realm->ino, realm, realm->seq);
  736. rebuild_snapcs = 1;
  737. } else {
  738. dout("%s %llx %p seq %lld unchanged\n", __func__,
  739. realm->ino, realm, realm->seq);
  740. }
  741. dout("done with %llx %p, rebuild_snapcs=%d, %p %p\n", realm->ino,
  742. realm, rebuild_snapcs, p, e);
  743. /*
  744. * this will always track the uppest parent realm from which
  745. * we need to rebuild the snapshot contexts _downward_ in
  746. * hierarchy.
  747. */
  748. if (rebuild_snapcs)
  749. realm_to_rebuild = realm;
  750. /* rebuild_snapcs when we reach the _end_ (root) of the trace */
  751. if (realm_to_rebuild && p >= e)
  752. rebuild_snap_realms(realm_to_rebuild, &dirty_realms);
  753. if (!first_realm)
  754. first_realm = realm;
  755. else
  756. ceph_put_snap_realm(mdsc, realm);
  757. if (p < e)
  758. goto more;
  759. /*
  760. * queue cap snaps _after_ we've built the new snap contexts,
  761. * so that i_head_snapc can be set appropriately.
  762. */
  763. while (!list_empty(&dirty_realms)) {
  764. realm = list_first_entry(&dirty_realms, struct ceph_snap_realm,
  765. dirty_item);
  766. list_del_init(&realm->dirty_item);
  767. queue_realm_cap_snaps(realm);
  768. }
  769. if (realm_ret)
  770. *realm_ret = first_realm;
  771. else
  772. ceph_put_snap_realm(mdsc, first_realm);
  773. __cleanup_empty_realms(mdsc);
  774. return 0;
  775. bad:
  776. err = -EIO;
  777. fail:
  778. if (realm && !IS_ERR(realm))
  779. ceph_put_snap_realm(mdsc, realm);
  780. if (first_realm)
  781. ceph_put_snap_realm(mdsc, first_realm);
  782. pr_err("%s error %d\n", __func__, err);
  783. /*
  784. * When receiving a corrupted snap trace we don't know what
  785. * exactly has happened in MDS side. And we shouldn't continue
  786. * writing to OSD, which may corrupt the snapshot contents.
  787. *
  788. * Just try to blocklist this kclient and then this kclient
  789. * must be remounted to continue after the corrupted metadata
  790. * fixed in the MDS side.
  791. */
  792. WRITE_ONCE(mdsc->fsc->mount_state, CEPH_MOUNT_FENCE_IO);
  793. ret = ceph_monc_blocklist_add(&client->monc, &client->msgr.inst.addr);
  794. if (ret)
  795. pr_err("%s failed to blocklist %s: %d\n", __func__,
  796. ceph_pr_addr(&client->msgr.inst.addr), ret);
  797. WARN(1, "%s: %s%sdo remount to continue%s",
  798. __func__, ret ? "" : ceph_pr_addr(&client->msgr.inst.addr),
  799. ret ? "" : " was blocklisted, ",
  800. err == -EIO ? " after corrupted snaptrace is fixed" : "");
  801. return err;
  802. }
  803. /*
  804. * Send any cap_snaps that are queued for flush. Try to carry
  805. * s_mutex across multiple snap flushes to avoid locking overhead.
  806. *
  807. * Caller holds no locks.
  808. */
  809. static void flush_snaps(struct ceph_mds_client *mdsc)
  810. {
  811. struct ceph_inode_info *ci;
  812. struct inode *inode;
  813. struct ceph_mds_session *session = NULL;
  814. dout("%s\n", __func__);
  815. spin_lock(&mdsc->snap_flush_lock);
  816. while (!list_empty(&mdsc->snap_flush_list)) {
  817. ci = list_first_entry(&mdsc->snap_flush_list,
  818. struct ceph_inode_info, i_snap_flush_item);
  819. inode = &ci->netfs.inode;
  820. ihold(inode);
  821. spin_unlock(&mdsc->snap_flush_lock);
  822. ceph_flush_snaps(ci, &session);
  823. iput(inode);
  824. spin_lock(&mdsc->snap_flush_lock);
  825. }
  826. spin_unlock(&mdsc->snap_flush_lock);
  827. ceph_put_mds_session(session);
  828. dout("%s done\n", __func__);
  829. }
  830. /**
  831. * ceph_change_snap_realm - change the snap_realm for an inode
  832. * @inode: inode to move to new snap realm
  833. * @realm: new realm to move inode into (may be NULL)
  834. *
  835. * Detach an inode from its old snaprealm (if any) and attach it to
  836. * the new snaprealm (if any). The old snap realm reference held by
  837. * the inode is put. If realm is non-NULL, then the caller's reference
  838. * to it is taken over by the inode.
  839. */
  840. void ceph_change_snap_realm(struct inode *inode, struct ceph_snap_realm *realm)
  841. {
  842. struct ceph_inode_info *ci = ceph_inode(inode);
  843. struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
  844. struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
  845. lockdep_assert_held(&ci->i_ceph_lock);
  846. if (oldrealm) {
  847. spin_lock(&oldrealm->inodes_with_caps_lock);
  848. list_del_init(&ci->i_snap_realm_item);
  849. if (oldrealm->ino == ci->i_vino.ino)
  850. oldrealm->inode = NULL;
  851. spin_unlock(&oldrealm->inodes_with_caps_lock);
  852. ceph_put_snap_realm(mdsc, oldrealm);
  853. }
  854. ci->i_snap_realm = realm;
  855. if (realm) {
  856. spin_lock(&realm->inodes_with_caps_lock);
  857. list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps);
  858. if (realm->ino == ci->i_vino.ino)
  859. realm->inode = inode;
  860. spin_unlock(&realm->inodes_with_caps_lock);
  861. }
  862. }
  863. /*
  864. * Handle a snap notification from the MDS.
  865. *
  866. * This can take two basic forms: the simplest is just a snap creation
  867. * or deletion notification on an existing realm. This should update the
  868. * realm and its children.
  869. *
  870. * The more difficult case is realm creation, due to snap creation at a
  871. * new point in the file hierarchy, or due to a rename that moves a file or
  872. * directory into another realm.
  873. */
  874. void ceph_handle_snap(struct ceph_mds_client *mdsc,
  875. struct ceph_mds_session *session,
  876. struct ceph_msg *msg)
  877. {
  878. struct super_block *sb = mdsc->fsc->sb;
  879. int mds = session->s_mds;
  880. u64 split;
  881. int op;
  882. int trace_len;
  883. struct ceph_snap_realm *realm = NULL;
  884. void *p = msg->front.iov_base;
  885. void *e = p + msg->front.iov_len;
  886. struct ceph_mds_snap_head *h;
  887. int num_split_inos, num_split_realms;
  888. __le64 *split_inos = NULL, *split_realms = NULL;
  889. int i;
  890. int locked_rwsem = 0;
  891. bool close_sessions = false;
  892. if (!ceph_inc_mds_stopping_blocker(mdsc, session))
  893. return;
  894. /* decode */
  895. if (msg->front.iov_len < sizeof(*h))
  896. goto bad;
  897. h = p;
  898. op = le32_to_cpu(h->op);
  899. split = le64_to_cpu(h->split); /* non-zero if we are splitting an
  900. * existing realm */
  901. num_split_inos = le32_to_cpu(h->num_split_inos);
  902. num_split_realms = le32_to_cpu(h->num_split_realms);
  903. trace_len = le32_to_cpu(h->trace_len);
  904. p += sizeof(*h);
  905. dout("%s from mds%d op %s split %llx tracelen %d\n", __func__,
  906. mds, ceph_snap_op_name(op), split, trace_len);
  907. down_write(&mdsc->snap_rwsem);
  908. locked_rwsem = 1;
  909. if (op == CEPH_SNAP_OP_SPLIT) {
  910. struct ceph_mds_snap_realm *ri;
  911. /*
  912. * A "split" breaks part of an existing realm off into
  913. * a new realm. The MDS provides a list of inodes
  914. * (with caps) and child realms that belong to the new
  915. * child.
  916. */
  917. split_inos = p;
  918. p += sizeof(u64) * num_split_inos;
  919. split_realms = p;
  920. p += sizeof(u64) * num_split_realms;
  921. ceph_decode_need(&p, e, sizeof(*ri), bad);
  922. /* we will peek at realm info here, but will _not_
  923. * advance p, as the realm update will occur below in
  924. * ceph_update_snap_trace. */
  925. ri = p;
  926. realm = ceph_lookup_snap_realm(mdsc, split);
  927. if (!realm) {
  928. realm = ceph_create_snap_realm(mdsc, split);
  929. if (IS_ERR(realm))
  930. goto out;
  931. }
  932. dout("splitting snap_realm %llx %p\n", realm->ino, realm);
  933. for (i = 0; i < num_split_inos; i++) {
  934. struct ceph_vino vino = {
  935. .ino = le64_to_cpu(split_inos[i]),
  936. .snap = CEPH_NOSNAP,
  937. };
  938. struct inode *inode = ceph_find_inode(sb, vino);
  939. struct ceph_inode_info *ci;
  940. if (!inode)
  941. continue;
  942. ci = ceph_inode(inode);
  943. spin_lock(&ci->i_ceph_lock);
  944. if (!ci->i_snap_realm)
  945. goto skip_inode;
  946. /*
  947. * If this inode belongs to a realm that was
  948. * created after our new realm, we experienced
  949. * a race (due to another split notifications
  950. * arriving from a different MDS). So skip
  951. * this inode.
  952. */
  953. if (ci->i_snap_realm->created >
  954. le64_to_cpu(ri->created)) {
  955. dout(" leaving %p %llx.%llx in newer realm %llx %p\n",
  956. inode, ceph_vinop(inode), ci->i_snap_realm->ino,
  957. ci->i_snap_realm);
  958. goto skip_inode;
  959. }
  960. dout(" will move %p %llx.%llx to split realm %llx %p\n",
  961. inode, ceph_vinop(inode), realm->ino, realm);
  962. ceph_get_snap_realm(mdsc, realm);
  963. ceph_change_snap_realm(inode, realm);
  964. spin_unlock(&ci->i_ceph_lock);
  965. iput(inode);
  966. continue;
  967. skip_inode:
  968. spin_unlock(&ci->i_ceph_lock);
  969. iput(inode);
  970. }
  971. /* we may have taken some of the old realm's children. */
  972. for (i = 0; i < num_split_realms; i++) {
  973. struct ceph_snap_realm *child =
  974. __lookup_snap_realm(mdsc,
  975. le64_to_cpu(split_realms[i]));
  976. if (!child)
  977. continue;
  978. adjust_snap_realm_parent(mdsc, child, realm->ino);
  979. }
  980. } else {
  981. /*
  982. * In the non-split case both 'num_split_inos' and
  983. * 'num_split_realms' should be 0, making this a no-op.
  984. * However the MDS happens to populate 'split_realms' list
  985. * in one of the UPDATE op cases by mistake.
  986. *
  987. * Skip both lists just in case to ensure that 'p' is
  988. * positioned at the start of realm info, as expected by
  989. * ceph_update_snap_trace().
  990. */
  991. p += sizeof(u64) * num_split_inos;
  992. p += sizeof(u64) * num_split_realms;
  993. }
  994. /*
  995. * update using the provided snap trace. if we are deleting a
  996. * snap, we can avoid queueing cap_snaps.
  997. */
  998. if (ceph_update_snap_trace(mdsc, p, e,
  999. op == CEPH_SNAP_OP_DESTROY,
  1000. NULL)) {
  1001. close_sessions = true;
  1002. goto bad;
  1003. }
  1004. if (op == CEPH_SNAP_OP_SPLIT)
  1005. /* we took a reference when we created the realm, above */
  1006. ceph_put_snap_realm(mdsc, realm);
  1007. __cleanup_empty_realms(mdsc);
  1008. up_write(&mdsc->snap_rwsem);
  1009. flush_snaps(mdsc);
  1010. ceph_dec_mds_stopping_blocker(mdsc);
  1011. return;
  1012. bad:
  1013. pr_err("%s corrupt snap message from mds%d\n", __func__, mds);
  1014. ceph_msg_dump(msg);
  1015. out:
  1016. if (locked_rwsem)
  1017. up_write(&mdsc->snap_rwsem);
  1018. ceph_dec_mds_stopping_blocker(mdsc);
  1019. if (close_sessions)
  1020. ceph_mdsc_close_sessions(mdsc);
  1021. return;
  1022. }
  1023. struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc,
  1024. u64 snap)
  1025. {
  1026. struct ceph_snapid_map *sm, *exist;
  1027. struct rb_node **p, *parent;
  1028. int ret;
  1029. exist = NULL;
  1030. spin_lock(&mdsc->snapid_map_lock);
  1031. p = &mdsc->snapid_map_tree.rb_node;
  1032. while (*p) {
  1033. exist = rb_entry(*p, struct ceph_snapid_map, node);
  1034. if (snap > exist->snap) {
  1035. p = &(*p)->rb_left;
  1036. } else if (snap < exist->snap) {
  1037. p = &(*p)->rb_right;
  1038. } else {
  1039. if (atomic_inc_return(&exist->ref) == 1)
  1040. list_del_init(&exist->lru);
  1041. break;
  1042. }
  1043. exist = NULL;
  1044. }
  1045. spin_unlock(&mdsc->snapid_map_lock);
  1046. if (exist) {
  1047. dout("%s found snapid map %llx -> %x\n", __func__,
  1048. exist->snap, exist->dev);
  1049. return exist;
  1050. }
  1051. sm = kmalloc(sizeof(*sm), GFP_NOFS);
  1052. if (!sm)
  1053. return NULL;
  1054. ret = get_anon_bdev(&sm->dev);
  1055. if (ret < 0) {
  1056. kfree(sm);
  1057. return NULL;
  1058. }
  1059. INIT_LIST_HEAD(&sm->lru);
  1060. atomic_set(&sm->ref, 1);
  1061. sm->snap = snap;
  1062. exist = NULL;
  1063. parent = NULL;
  1064. p = &mdsc->snapid_map_tree.rb_node;
  1065. spin_lock(&mdsc->snapid_map_lock);
  1066. while (*p) {
  1067. parent = *p;
  1068. exist = rb_entry(*p, struct ceph_snapid_map, node);
  1069. if (snap > exist->snap)
  1070. p = &(*p)->rb_left;
  1071. else if (snap < exist->snap)
  1072. p = &(*p)->rb_right;
  1073. else
  1074. break;
  1075. exist = NULL;
  1076. }
  1077. if (exist) {
  1078. if (atomic_inc_return(&exist->ref) == 1)
  1079. list_del_init(&exist->lru);
  1080. } else {
  1081. rb_link_node(&sm->node, parent, p);
  1082. rb_insert_color(&sm->node, &mdsc->snapid_map_tree);
  1083. }
  1084. spin_unlock(&mdsc->snapid_map_lock);
  1085. if (exist) {
  1086. free_anon_bdev(sm->dev);
  1087. kfree(sm);
  1088. dout("%s found snapid map %llx -> %x\n", __func__,
  1089. exist->snap, exist->dev);
  1090. return exist;
  1091. }
  1092. dout("%s create snapid map %llx -> %x\n", __func__,
  1093. sm->snap, sm->dev);
  1094. return sm;
  1095. }
  1096. void ceph_put_snapid_map(struct ceph_mds_client* mdsc,
  1097. struct ceph_snapid_map *sm)
  1098. {
  1099. if (!sm)
  1100. return;
  1101. if (atomic_dec_and_lock(&sm->ref, &mdsc->snapid_map_lock)) {
  1102. if (!RB_EMPTY_NODE(&sm->node)) {
  1103. sm->last_used = jiffies;
  1104. list_add_tail(&sm->lru, &mdsc->snapid_map_lru);
  1105. spin_unlock(&mdsc->snapid_map_lock);
  1106. } else {
  1107. /* already cleaned up by
  1108. * ceph_cleanup_snapid_map() */
  1109. spin_unlock(&mdsc->snapid_map_lock);
  1110. kfree(sm);
  1111. }
  1112. }
  1113. }
  1114. void ceph_trim_snapid_map(struct ceph_mds_client *mdsc)
  1115. {
  1116. struct ceph_snapid_map *sm;
  1117. unsigned long now;
  1118. LIST_HEAD(to_free);
  1119. spin_lock(&mdsc->snapid_map_lock);
  1120. now = jiffies;
  1121. while (!list_empty(&mdsc->snapid_map_lru)) {
  1122. sm = list_first_entry(&mdsc->snapid_map_lru,
  1123. struct ceph_snapid_map, lru);
  1124. if (time_after(sm->last_used + CEPH_SNAPID_MAP_TIMEOUT, now))
  1125. break;
  1126. rb_erase(&sm->node, &mdsc->snapid_map_tree);
  1127. list_move(&sm->lru, &to_free);
  1128. }
  1129. spin_unlock(&mdsc->snapid_map_lock);
  1130. while (!list_empty(&to_free)) {
  1131. sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
  1132. list_del(&sm->lru);
  1133. dout("trim snapid map %llx -> %x\n", sm->snap, sm->dev);
  1134. free_anon_bdev(sm->dev);
  1135. kfree(sm);
  1136. }
  1137. }
  1138. void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc)
  1139. {
  1140. struct ceph_snapid_map *sm;
  1141. struct rb_node *p;
  1142. LIST_HEAD(to_free);
  1143. spin_lock(&mdsc->snapid_map_lock);
  1144. while ((p = rb_first(&mdsc->snapid_map_tree))) {
  1145. sm = rb_entry(p, struct ceph_snapid_map, node);
  1146. rb_erase(p, &mdsc->snapid_map_tree);
  1147. RB_CLEAR_NODE(p);
  1148. list_move(&sm->lru, &to_free);
  1149. }
  1150. spin_unlock(&mdsc->snapid_map_lock);
  1151. while (!list_empty(&to_free)) {
  1152. sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
  1153. list_del(&sm->lru);
  1154. free_anon_bdev(sm->dev);
  1155. if (WARN_ON_ONCE(atomic_read(&sm->ref))) {
  1156. pr_err("snapid map %llx -> %x still in use\n",
  1157. sm->snap, sm->dev);
  1158. }
  1159. kfree(sm);
  1160. }
  1161. }