// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)

/*
 * Ring buffer operations.
 *
 * Copyright (C) 2020 Facebook, Inc.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <linux/err.h>
#include <linux/bpf.h>
#include <asm/barrier.h>
#include <sys/mman.h>
#include <sys/epoll.h>
#include <time.h>

#include "libbpf.h"
#include "libbpf_internal.h"
#include "bpf.h"
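
/* State for a single mmap()'ed BPF_MAP_TYPE_RINGBUF map: its data area,
 * consumer/producer position pages, and the per-ring sample callback.
 */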
struct ring {
	ring_buffer_sample_fn sample_cb;
	void *ctx;
	void *data;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	unsigned long mask;
	int map_fd;
};

struct ring_buffer {
	struct epoll_event *events;
	struct ring *rings;
	size_t page_size;
	int epoll_fd;
	int ring_cnt;
};
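
/* State for a single BPF_MAP_TYPE_USER_RINGBUF map, through which user space
 * produces samples and the kernel consumes them.
 */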
struct user_ring_buffer {
	struct epoll_event event;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	void *data;
	unsigned long mask;
	size_t page_size;
	int map_fd;
	int epoll_fd;
};

/* 8-byte ring buffer header structure */
struct ringbuf_hdr {
	__u32 len;
	__u32 pad;
};

static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
{
	if (r->consumer_pos) {
		munmap(r->consumer_pos, rb->page_size);
		r->consumer_pos = NULL;
	}
	if (r->producer_pos) {
		munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
		r->producer_pos = NULL;
	}
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
		     ring_buffer_sample_fn sample_cb, void *ctx)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	struct epoll_event *e;
	struct ring *r;
	__u64 mmap_sz;
	void *tmp;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("ringbuf: failed to get map info for fd=%d: %d\n",
			map_fd, err);
		return libbpf_err(err);
	}

	if (info.type != BPF_MAP_TYPE_RINGBUF) {
		pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
			map_fd);
		return libbpf_err(-EINVAL);
	}

	tmp = libbpf_reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->rings = tmp;

	tmp = libbpf_reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->events = tmp;

	r = &rb->rings[rb->ring_cnt];
	memset(r, 0, sizeof(*r));
	r->map_fd = map_fd;
	r->sample_cb = sample_cb;
	r->ctx = ctx;
	r->mask = info.max_entries - 1;

	/* Map writable consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
			map_fd, err);
		return libbpf_err(err);
	}
	r->consumer_pos = tmp;

	/* Map read-only producer page and data pages. We map twice as big
	 * data size to allow simple reading of samples that wrap around the
	 * end of a ring buffer. See kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		pr_warn("ringbuf: ring buffer size (%u) is too big\n", info.max_entries);
		return libbpf_err(-E2BIG);
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ, MAP_SHARED, map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		ringbuf_unmap_ring(rb, r);
		pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n",
			map_fd, err);
		return libbpf_err(err);
	}
	r->producer_pos = tmp;
	r->data = tmp + rb->page_size;

	e = &rb->events[rb->ring_cnt];
	memset(e, 0, sizeof(*e));
	e->events = EPOLLIN;
	e->data.fd = rb->ring_cnt;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
		err = -errno;
		ringbuf_unmap_ring(rb, r);
		pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n",
			map_fd, err);
		return libbpf_err(err);
	}

	rb->ring_cnt++;
	return 0;
}
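
/* Unmap all rings, close the epoll fd, and free the ring buffer manager */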
void ring_buffer__free(struct ring_buffer *rb)
{
	int i;

	if (!rb)
		return;

	for (i = 0; i < rb->ring_cnt; ++i)
		ringbuf_unmap_ring(rb, &rb->rings[i]);
	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb->events);
	free(rb->rings);
	free(rb);
}
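
/* Create a ring buffer manager with a single RINGBUF map; more maps can be
 * attached later with ring_buffer__add().
 */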
struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
		 const struct ring_buffer_opts *opts)
{
	struct ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to create epoll instance: %d\n", err);
		goto err_out;
	}

	err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
	if (err)
		goto err_out;

	return rb;

err_out:
	ring_buffer__free(rb);
	return errno = -err, NULL;
}
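
/* Typical consumer-side usage, shown here as a sketch only: handle_event,
 * map_fd, and exiting are hypothetical names, not part of this file.
 *
 *	static int handle_event(void *ctx, void *data, size_t size)
 *	{
 *		// interpret "data" as the event struct produced by the BPF side
 *		return 0;	// a negative return aborts consumption
 *	}
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer__new(map_fd, handle_event, NULL, NULL);
 *	if (!rb)
 *		return -errno;
 *	while (!exiting) {
 *		// poll() consumes all available records from all added rings
 *		if (ring_buffer__poll(rb, 100) < 0)
 *			break;
 *	}
 *	ring_buffer__free(rb);
 */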

static inline int roundup_len(__u32 len)
{
	/* clear out top 2 bits (discard and busy, if set) */
	len <<= 2;
	len >>= 2;
	/* add length prefix */
	len += BPF_RINGBUF_HDR_SZ;
	/* round up to 8 byte alignment */
	return (len + 7) / 8 * 8;
}
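
/* Consume all committed samples from a single ring, invoking the registered
 * callback for each non-discarded sample. Returns the number of samples
 * consumed, or a negative error returned by the callback.
 */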
static int64_t ringbuf_process_ring(struct ring *r)
{
	int *len_ptr, len, err;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	int64_t cnt = 0;
	unsigned long cons_pos, prod_pos;
	bool got_new_data;
	void *sample;

	cons_pos = smp_load_acquire(r->consumer_pos);
	do {
		got_new_data = false;
		prod_pos = smp_load_acquire(r->producer_pos);
		while (cons_pos < prod_pos) {
			len_ptr = r->data + (cons_pos & r->mask);
			len = smp_load_acquire(len_ptr);

			/* sample not committed yet, bail out for now */
			if (len & BPF_RINGBUF_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos += roundup_len(len);

			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
				err = r->sample_cb(r->ctx, sample, len);
				if (err < 0) {
					/* update consumer pos and bail out */
					smp_store_release(r->consumer_pos,
							  cons_pos);
					return err;
				}
				cnt++;
			}

			smp_store_release(r->consumer_pos, cons_pos);
		}
	} while (got_new_data);
done:
	return cnt;
}

/* Consume available ring buffer(s) data without event polling.
 * Returns number of records consumed across all registered ring buffers (or
 * INT_MAX, whichever is less), or negative number if any of the callbacks
 * return error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = &rb->rings[i];

		err = ringbuf_process_ring(ring);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}
	if (res > INT_MAX)
		return INT_MAX;
	return res;
}

/* Poll for available data and consume records, if any are available.
 * Returns number of records consumed (or INT_MAX, whichever is less), or
 * negative number, if any of the registered callbacks returned error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
	int i, cnt;
	int64_t err, res = 0;

	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
	if (cnt < 0)
		return libbpf_err(-errno);

	for (i = 0; i < cnt; i++) {
		__u32 ring_id = rb->events[i].data.fd;
		struct ring *ring = &rb->rings[ring_id];

		err = ringbuf_process_ring(ring);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}
	if (res > INT_MAX)
		return INT_MAX;
	return res;
}

/* Get an fd that can be used to sleep until data is available in the ring(s) */
int ring_buffer__epoll_fd(const struct ring_buffer *rb)
{
	return rb->epoll_fd;
}

static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
{
	if (rb->consumer_pos) {
		munmap(rb->consumer_pos, rb->page_size);
		rb->consumer_pos = NULL;
	}
	if (rb->producer_pos) {
		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
		rb->producer_pos = NULL;
	}
}
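
/* Unmap the ring, close the epoll fd, and free the user ring buffer */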
void user_ring_buffer__free(struct user_ring_buffer *rb)
{
	if (!rb)
		return;

	user_ringbuf_unmap_ring(rb);

	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb);
}

static int user_ringbuf_map(struct user_ring_buffer *rb, int map_fd)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	__u64 mmap_sz;
	void *tmp;
	struct epoll_event *rb_epoll;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n", map_fd, err);
		return err;
	}

	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", map_fd);
		return -EINVAL;
	}

	rb->map_fd = map_fd;
	rb->mask = info.max_entries - 1;

	/* Map read-only consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
			map_fd, err);
		return err;
	}
	rb->consumer_pos = tmp;

	/* Map read-write the producer page and data pages. We map the data
	 * region as twice the total size of the ring buffer to allow the
	 * simple reading and writing of samples that wrap around the end of
	 * the buffer. See the kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		pr_warn("user ringbuf: ring buf size (%u) is too big\n", info.max_entries);
		return -E2BIG;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ | PROT_WRITE, MAP_SHARED,
		   map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
			map_fd, err);
		return err;
	}

	rb->producer_pos = tmp;
	rb->data = tmp + rb->page_size;

	rb_epoll = &rb->event;
	rb_epoll->events = EPOLLOUT;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", map_fd, err);
		return err;
	}

	return 0;
}
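
/* Create a user ring buffer for producing samples from user space into a
 * BPF_MAP_TYPE_USER_RINGBUF map.
 */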
struct user_ring_buffer *
user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
{
	struct user_ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, user_ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to create epoll instance: %d\n", err);
		goto err_out;
	}

	err = user_ringbuf_map(rb, map_fd);
	if (err)
		goto err_out;

	return rb;

err_out:
	user_ring_buffer__free(rb);
	return errno = -err, NULL;
}
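
/* Clear the busy bit of a previously reserved sample (optionally marking it
 * discarded) so that the kernel side can consume it.
 */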
static void user_ringbuf_commit(struct user_ring_buffer *rb, void *sample, bool discard)
{
	__u32 new_len;
	struct ringbuf_hdr *hdr;
	uintptr_t hdr_offset;

	hdr_offset = rb->mask + 1 + (sample - rb->data) - BPF_RINGBUF_HDR_SZ;
	hdr = rb->data + (hdr_offset & rb->mask);

	new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	__atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
}

void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, true);
}

void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, false);
}

void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
{
	__u32 avail_size, total_size, max_size;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	__u64 cons_pos, prod_pos;
	struct ringbuf_hdr *hdr;

	/* The top two bits are used as special flags */
	if (size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT))
		return errno = E2BIG, NULL;

	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	cons_pos = smp_load_acquire(rb->consumer_pos);
	/* Synchronizes with smp_store_release() in user_ringbuf_commit() */
	prod_pos = smp_load_acquire(rb->producer_pos);

	max_size = rb->mask + 1;
	avail_size = max_size - (prod_pos - cons_pos);
	/* Round up total size to a multiple of 8. */
	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;

	if (total_size > max_size)
		return errno = E2BIG, NULL;

	if (avail_size < total_size)
		return errno = ENOSPC, NULL;

	hdr = rb->data + (prod_pos & rb->mask);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pad = 0;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	smp_store_release(rb->producer_pos, prod_pos + total_size);

	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
}
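
/* Nanoseconds elapsed between two timespec readings */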
static __u64 ns_elapsed_timespec(const struct timespec *start, const struct timespec *end)
{
	__u64 start_ns, end_ns, ns_per_s = 1000000000;

	start_ns = (__u64)start->tv_sec * ns_per_s + start->tv_nsec;
	end_ns = (__u64)end->tv_sec * ns_per_s + end->tv_nsec;

	return end_ns - start_ns;
}

void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
{
	void *sample;
	int err, ms_remaining = timeout_ms;
	struct timespec start;

	if (timeout_ms < 0 && timeout_ms != -1)
		return errno = EINVAL, NULL;

	if (timeout_ms != -1) {
		err = clock_gettime(CLOCK_MONOTONIC, &start);
		if (err)
			return NULL;
	}

	do {
		int cnt, ms_elapsed;
		struct timespec curr;
		__u64 ns_per_ms = 1000000;

		sample = user_ring_buffer__reserve(rb, size);
		if (sample)
			return sample;
		else if (errno != ENOSPC)
			return NULL;

		/* The kernel guarantees at least one event notification
		 * delivery whenever at least one sample is drained from the
		 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
		 * additional events may be delivered at any time, but only one
		 * event is guaranteed per bpf_ringbuf_drain() invocation,
		 * provided that a sample is drained, and the BPF program did
		 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain(). If
		 * BPF_RB_FORCE_WAKEUP is passed to bpf_ringbuf_drain(), a
		 * wakeup event will be delivered even if no samples are
		 * drained.
		 */
		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
		if (cnt < 0)
			return NULL;

		if (timeout_ms == -1)
			continue;

		err = clock_gettime(CLOCK_MONOTONIC, &curr);
		if (err)
			return NULL;

		ms_elapsed = ns_elapsed_timespec(&start, &curr) / ns_per_ms;
		ms_remaining = timeout_ms - ms_elapsed;
	} while (ms_remaining > 0);

	/* Try one more time to reserve a sample after the specified timeout has elapsed. */
	return user_ring_buffer__reserve(rb, size);
}
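
/* Typical producer-side usage, shown here as a sketch only: struct my_event,
 * map_fd, and the payload fields are hypothetical, not part of this file.
 *
 *	struct user_ring_buffer *rb;
 *	struct my_event *e;
 *
 *	rb = user_ring_buffer__new(map_fd, NULL);
 *	if (!rb)
 *		return -errno;
 *
 *	// Block for up to 500 ms waiting for space in the ring.
 *	e = user_ring_buffer__reserve_blocking(rb, sizeof(*e), 500);
 *	if (!e) {
 *		user_ring_buffer__free(rb);
 *		return -errno;
 *	}
 *	e->id = 42;
 *	// Make the sample visible to the kernel-side bpf_user_ringbuf_drain() ...
 *	user_ring_buffer__submit(rb, e);
 *	// ... or drop it instead with user_ring_buffer__discard(rb, e).
 *
 *	user_ring_buffer__free(rb);
 */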