GitHub Repository: torvalds/linux
Path: blob/master/tools/lib/bpf/ringbuf.c
// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
/*
 * Ring buffer operations.
 *
 * Copyright (C) 2020 Facebook, Inc.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <linux/err.h>
#include <linux/bpf.h>
#include <asm/barrier.h>
#include <sys/mman.h>
#include <sys/epoll.h>
#include <time.h>

#include "libbpf.h"
#include "libbpf_internal.h"
#include "bpf.h"
#include "str_error.h"

struct ring {
	ring_buffer_sample_fn sample_cb;
	void *ctx;
	void *data;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	unsigned long mask;
	int map_fd;
};

struct ring_buffer {
	struct epoll_event *events;
	struct ring **rings;
	size_t page_size;
	int epoll_fd;
	int ring_cnt;
};

struct user_ring_buffer {
	struct epoll_event event;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	void *data;
	unsigned long mask;
	size_t page_size;
	int map_fd;
	int epoll_fd;
};

/* 8-byte ring buffer header structure */
struct ringbuf_hdr {
	__u32 len;
	__u32 pad;
};

static void ringbuf_free_ring(struct ring_buffer *rb, struct ring *r)
{
	if (r->consumer_pos) {
		munmap(r->consumer_pos, rb->page_size);
		r->consumer_pos = NULL;
	}
	if (r->producer_pos) {
		munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
		r->producer_pos = NULL;
	}

	free(r);
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
		     ring_buffer_sample_fn sample_cb, void *ctx)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	struct epoll_event *e;
	struct ring *r;
	__u64 mmap_sz;
	void *tmp;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_map_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("ringbuf: failed to get map info for fd=%d: %s\n",
			map_fd, errstr(err));
		return libbpf_err(err);
	}

	if (info.type != BPF_MAP_TYPE_RINGBUF) {
		pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
			map_fd);
		return libbpf_err(-EINVAL);
	}

	tmp = libbpf_reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->rings = tmp;

	tmp = libbpf_reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->events = tmp;

	r = calloc(1, sizeof(*r));
	if (!r)
		return libbpf_err(-ENOMEM);
	rb->rings[rb->ring_cnt] = r;

	r->map_fd = map_fd;
	r->sample_cb = sample_cb;
	r->ctx = ctx;
	r->mask = info.max_entries - 1;

	/* Map writable consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}
	r->consumer_pos = tmp;

	/* Map read-only producer page and data pages. We map twice as big
	 * data size to allow simple reading of samples that wrap around the
	 * end of a ring buffer. See kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		err = -E2BIG;
		pr_warn("ringbuf: ring buffer size (%u) is too big\n", info.max_entries);
		goto err_out;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ, MAP_SHARED, map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}
	r->producer_pos = tmp;
	r->data = tmp + rb->page_size;

	e = &rb->events[rb->ring_cnt];
	memset(e, 0, sizeof(*e));

	e->events = EPOLLIN;
	e->data.fd = rb->ring_cnt;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to epoll add map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}

	rb->ring_cnt++;
	return 0;

err_out:
	ringbuf_free_ring(rb, r);
	return libbpf_err(err);
}
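
/* Note on the double mapping above: the data area of max_entries bytes is
 * mapped twice back to back, so a record that starts near the end of the
 * buffer can still be read with a single linear access. Illustrative example
 * (numbers assumed, not taken from the kernel sources): with
 * max_entries = 4096, a 512-byte record whose masked offset is 3840 runs
 * past offset 4096, but because bytes 4096..8191 of the mapping alias
 * bytes 0..4095, the record is contiguous in the consumer's address space
 * and no copy or wrap-around special-casing is needed.
 */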

void ring_buffer__free(struct ring_buffer *rb)
{
	int i;

	if (!rb)
		return;

	for (i = 0; i < rb->ring_cnt; ++i)
		ringbuf_free_ring(rb, rb->rings[i]);
	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb->events);
	free(rb->rings);
	free(rb);
}

struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
		 const struct ring_buffer_opts *opts)
{
	struct ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to create epoll instance: %s\n", errstr(err));
		goto err_out;
	}

	err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
	if (err)
		goto err_out;

	return rb;

err_out:
	ring_buffer__free(rb);
	return errno = -err, NULL;
}
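
/* Usage sketch (illustrative only, not part of libbpf): a minimal consumer
 * built on ring_buffer__new()/ring_buffer__poll()/ring_buffer__free(). The
 * callback name, the map fd source and the error handling are assumptions
 * made for the example.
 *
 *	static int handle_event(void *ctx, void *data, size_t size)
 *	{
 *		// process one committed sample of 'size' bytes
 *		return 0;	// a negative return aborts consumption
 *	}
 *
 *	struct ring_buffer *rb;
 *	int map_fd = ...;	// fd of a BPF_MAP_TYPE_RINGBUF map
 *
 *	rb = ring_buffer__new(map_fd, handle_event, NULL, NULL);
 *	if (!rb)
 *		return -errno;
 *	while (!stop)
 *		ring_buffer__poll(rb, 100);	// 100ms timeout
 *	ring_buffer__free(rb);
 */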

static inline int roundup_len(__u32 len)
{
	/* clear out top 2 bits (discard and busy, if set) */
	len <<= 2;
	len >>= 2;
	/* add length prefix */
	len += BPF_RINGBUF_HDR_SZ;
	/* round up to 8 byte alignment */
	return (len + 7) / 8 * 8;
}
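
/* Each record in the data area is prefixed by an 8-byte header (struct
 * ringbuf_hdr): 'len' holds the payload length with the BUSY and DISCARD
 * flags in its top bits. roundup_len() computes the full on-ring footprint:
 * e.g. a 13-byte sample becomes 13 + 8 = 21 bytes, rounded up to 24, so the
 * next record again starts on an 8-byte boundary.
 *
 * ringbuf_process_ring() below implements the consumer side of the protocol:
 * load-acquire producer_pos, walk records starting at consumer_pos, stop as
 * soon as a record is still marked BUSY (not yet committed), invoke the
 * callback for records without the DISCARD bit, and store-release the
 * updated consumer_pos so the kernel can reuse the space.
 */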
static int64_t ringbuf_process_ring(struct ring *r, size_t n)
{
	int *len_ptr, len, err;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	int64_t cnt = 0;
	unsigned long cons_pos, prod_pos;
	bool got_new_data;
	void *sample;

	cons_pos = smp_load_acquire(r->consumer_pos);
	do {
		got_new_data = false;
		prod_pos = smp_load_acquire(r->producer_pos);
		while (cons_pos < prod_pos) {
			len_ptr = r->data + (cons_pos & r->mask);
			len = smp_load_acquire(len_ptr);

			/* sample not committed yet, bail out for now */
			if (len & BPF_RINGBUF_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos += roundup_len(len);

			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
				err = r->sample_cb(r->ctx, sample, len);
				if (err < 0) {
					/* update consumer pos and bail out */
					smp_store_release(r->consumer_pos,
							  cons_pos);
					return err;
				}
				cnt++;
			}

			smp_store_release(r->consumer_pos, cons_pos);

			if (cnt >= n)
				goto done;
		}
	} while (got_new_data);
done:
	return cnt;
}

/* Consume available ring buffer(s) data without event polling, up to n
 * records.
 *
 * Returns number of records consumed across all registered ring buffers (or
 * n, whichever is less), or negative number if any of the callbacks return
 * error.
 */
int ring_buffer__consume_n(struct ring_buffer *rb, size_t n)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = rb->rings[i];

		err = ringbuf_process_ring(ring, n);
		if (err < 0)
			return libbpf_err(err);
		res += err;
		n -= err;

		if (n == 0)
			break;
	}
	return res > INT_MAX ? INT_MAX : res;
}

/* Consume available ring buffer(s) data without event polling.
 * Returns number of records consumed across all registered ring buffers (or
 * INT_MAX, whichever is less), or negative number if any of the callbacks
 * return error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = rb->rings[i];

		err = ringbuf_process_ring(ring, INT_MAX);
		if (err < 0)
			return libbpf_err(err);
		res += err;
		if (res > INT_MAX) {
			res = INT_MAX;
			break;
		}
	}
	return res;
}

/* Poll for available data and consume records, if any are available.
 * Returns number of records consumed (or INT_MAX, whichever is less), or
 * negative number, if any of the registered callbacks returned error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
	int i, cnt;
	int64_t err, res = 0;

	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
	if (cnt < 0)
		return libbpf_err(-errno);

	for (i = 0; i < cnt; i++) {
		__u32 ring_id = rb->events[i].data.fd;
		struct ring *ring = rb->rings[ring_id];

		err = ringbuf_process_ring(ring, INT_MAX);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}
	if (res > INT_MAX)
		res = INT_MAX;
	return res;
}

/* Get an fd that can be used to sleep until data is available in the ring(s) */
int ring_buffer__epoll_fd(const struct ring_buffer *rb)
{
	return rb->epoll_fd;
}
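
/* Usage sketch (illustrative only, not part of libbpf): driving consumption
 * from an application's own epoll loop via ring_buffer__epoll_fd(). The
 * event-loop variables (app_epoll_fd, ready_fd) are assumptions made for the
 * example.
 *
 *	int rb_fd = ring_buffer__epoll_fd(rb);
 *	struct epoll_event ev = { .events = EPOLLIN, .data.fd = rb_fd };
 *
 *	epoll_ctl(app_epoll_fd, EPOLL_CTL_ADD, rb_fd, &ev);
 *	...
 *	if (ready_fd == rb_fd)
 *		ring_buffer__consume(rb);	// drain without extra polling
 */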

struct ring *ring_buffer__ring(struct ring_buffer *rb, unsigned int idx)
{
	if (idx >= rb->ring_cnt)
		return errno = ERANGE, NULL;

	return rb->rings[idx];
}

unsigned long ring__consumer_pos(const struct ring *r)
{
	/* Synchronizes with smp_store_release() in ringbuf_process_ring(). */
	return smp_load_acquire(r->consumer_pos);
}

unsigned long ring__producer_pos(const struct ring *r)
{
	/* Synchronizes with smp_store_release() in __bpf_ringbuf_reserve() in
	 * the kernel.
	 */
	return smp_load_acquire(r->producer_pos);
}

size_t ring__avail_data_size(const struct ring *r)
{
	unsigned long cons_pos, prod_pos;

	cons_pos = ring__consumer_pos(r);
	prod_pos = ring__producer_pos(r);
	return prod_pos - cons_pos;
}

size_t ring__size(const struct ring *r)
{
	return r->mask + 1;
}

int ring__map_fd(const struct ring *r)
{
	return r->map_fd;
}

int ring__consume_n(struct ring *r, size_t n)
{
	int64_t res;

	res = ringbuf_process_ring(r, n);
	if (res < 0)
		return libbpf_err(res);

	return res > INT_MAX ? INT_MAX : res;
}

int ring__consume(struct ring *r)
{
	return ring__consume_n(r, INT_MAX);
}

static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
{
	if (rb->consumer_pos) {
		munmap(rb->consumer_pos, rb->page_size);
		rb->consumer_pos = NULL;
	}
	if (rb->producer_pos) {
		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
		rb->producer_pos = NULL;
	}
}

void user_ring_buffer__free(struct user_ring_buffer *rb)
{
	if (!rb)
		return;

	user_ringbuf_unmap_ring(rb);

	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb);
}

static int user_ringbuf_map(struct user_ring_buffer *rb, int map_fd)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	__u64 mmap_sz;
	void *tmp;
	struct epoll_event *rb_epoll;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_map_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("user ringbuf: failed to get map info for fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}

	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", map_fd);
		return -EINVAL;
	}

	rb->map_fd = map_fd;
	rb->mask = info.max_entries - 1;

	/* Map read-only consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}
	rb->consumer_pos = tmp;

	/* Map read-write the producer page and data pages. We map the data
	 * region as twice the total size of the ring buffer to allow the
	 * simple reading and writing of samples that wrap around the end of
	 * the buffer. See the kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		pr_warn("user ringbuf: ring buf size (%u) is too big\n", info.max_entries);
		return -E2BIG;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ | PROT_WRITE, MAP_SHARED,
		   map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}

	rb->producer_pos = tmp;
	rb->data = tmp + rb->page_size;

	rb_epoll = &rb->event;
	rb_epoll->events = EPOLLOUT;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to epoll add map fd=%d: %s\n", map_fd, errstr(err));
		return err;
	}

	return 0;
}

struct user_ring_buffer *
user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
{
	struct user_ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, user_ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to create epoll instance: %s\n", errstr(err));
		goto err_out;
	}

	err = user_ringbuf_map(rb, map_fd);
	if (err)
		goto err_out;

	return rb;

err_out:
	user_ring_buffer__free(rb);
	return errno = -err, NULL;
}

static void user_ringbuf_commit(struct user_ring_buffer *rb, void *sample, bool discard)
{
	__u32 new_len;
	struct ringbuf_hdr *hdr;
	uintptr_t hdr_offset;

	hdr_offset = rb->mask + 1 + (sample - rb->data) - BPF_RINGBUF_HDR_SZ;
	hdr = rb->data + (hdr_offset & rb->mask);

	new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	__atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
}
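
/* A note on the hdr_offset arithmetic above: user_ring_buffer__reserve()
 * returns a pointer into the double-mapped data area, so 'sample' may sit
 * past the start of either copy of the ring. Adding the ring size
 * (rb->mask + 1) before subtracting the header size keeps the intermediate
 * value non-negative even when the sample wrapped to offset 0, and masking
 * with rb->mask folds it back to the canonical header position. Illustrative
 * example (numbers assumed): with a 4096-byte ring, a sample at data offset 8
 * has its header at (4096 + 8 - 8) & 4095 = 0.
 */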

void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, true);
}

void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, false);
}

void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
{
	__u32 avail_size, total_size, max_size;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	__u64 cons_pos, prod_pos;
	struct ringbuf_hdr *hdr;

	/* The top two bits are used as special flags */
	if (size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT))
		return errno = E2BIG, NULL;

	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	cons_pos = smp_load_acquire(rb->consumer_pos);
	/* Synchronizes with smp_store_release() in user_ringbuf_commit() */
	prod_pos = smp_load_acquire(rb->producer_pos);

	max_size = rb->mask + 1;
	avail_size = max_size - (prod_pos - cons_pos);
	/* Round up total size to a multiple of 8. */
	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;

	if (total_size > max_size)
		return errno = E2BIG, NULL;

	if (avail_size < total_size)
		return errno = ENOSPC, NULL;

	hdr = rb->data + (prod_pos & rb->mask);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pad = 0;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	smp_store_release(rb->producer_pos, prod_pos + total_size);

	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
}
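
/* Size accounting in user_ring_buffer__reserve(), worked through with assumed
 * numbers: reserving size = 24 bytes gives total_size = (24 + 8 + 7) / 8 * 8
 * = 32 bytes on the ring (8-byte header plus payload, rounded to an 8-byte
 * boundary). With a 4096-byte ring where prod_pos - cons_pos is already 4072,
 * avail_size is 24 < 32, so the call fails with ENOSPC until the kernel-side
 * consumer advances consumer_pos.
 */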

static __u64 ns_elapsed_timespec(const struct timespec *start, const struct timespec *end)
{
	__u64 start_ns, end_ns, ns_per_s = 1000000000;

	start_ns = (__u64)start->tv_sec * ns_per_s + start->tv_nsec;
	end_ns = (__u64)end->tv_sec * ns_per_s + end->tv_nsec;

	return end_ns - start_ns;
}

void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
{
	void *sample;
	int err, ms_remaining = timeout_ms;
	struct timespec start;

	if (timeout_ms < 0 && timeout_ms != -1)
		return errno = EINVAL, NULL;

	if (timeout_ms != -1) {
		err = clock_gettime(CLOCK_MONOTONIC, &start);
		if (err)
			return NULL;
	}

	do {
		int cnt, ms_elapsed;
		struct timespec curr;
		__u64 ns_per_ms = 1000000;

		sample = user_ring_buffer__reserve(rb, size);
		if (sample)
			return sample;
		else if (errno != ENOSPC)
			return NULL;

		/* The kernel guarantees at least one event notification
		 * delivery whenever at least one sample is drained from the
		 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
		 * additional events may be delivered at any time, but only one
		 * event is guaranteed per bpf_ringbuf_drain() invocation,
		 * provided that a sample is drained, and the BPF program did
		 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain(). If
		 * BPF_RB_FORCE_WAKEUP is passed to bpf_ringbuf_drain(), a
		 * wakeup event will be delivered even if no samples are
		 * drained.
		 */
		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
		if (cnt < 0)
			return NULL;

		if (timeout_ms == -1)
			continue;

		err = clock_gettime(CLOCK_MONOTONIC, &curr);
		if (err)
			return NULL;

		ms_elapsed = ns_elapsed_timespec(&start, &curr) / ns_per_ms;
		ms_remaining = timeout_ms - ms_elapsed;
	} while (ms_remaining > 0);

	/* Try one more time to reserve a sample after the specified timeout has elapsed. */
	return user_ring_buffer__reserve(rb, size);
}
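
/* Usage sketch (illustrative only, not part of libbpf): the user-space
 * producer flow against a BPF_MAP_TYPE_USER_RINGBUF map, whose records are
 * drained on the kernel side by a BPF program (see the drain-related comments
 * above). The struct and variable names are assumptions made for the example.
 *
 *	struct user_ring_buffer *urb;
 *	struct my_msg *msg;	// hypothetical application message type
 *
 *	urb = user_ring_buffer__new(map_fd, NULL);
 *	if (!urb)
 *		return -errno;
 *	msg = user_ring_buffer__reserve_blocking(urb, sizeof(*msg), 1000);
 *	if (!msg)
 *		return -errno;	// errno describes the failure
 *	msg->value = 42;
 *	user_ring_buffer__submit(urb, msg);	// or __discard() to drop it
 *	user_ring_buffer__free(urb);
 */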