Skip to content

Commit 8adf8e1

Browse files
feat(buffer): move polling to cgo loop (#493)
Polling loops are cgo hot paths in consuming applications. Since cgo calls are rather expensive, moving the polling loop into the C side should significantly reduce the number of cgo calls. The Go part now only handles orchestration of the internal C loop; an atomic flag is used to signal the C loop to exit.
1 parent 3474da5 commit 8adf8e1

File tree

4 files changed

+68
-57
lines changed

4 files changed

+68
-57
lines changed

buf-perf.go

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import "C"
99
import (
1010
"fmt"
1111
"sync"
12+
"sync/atomic"
1213
"syscall"
14+
"unsafe"
1315
)
1416

1517
//
@@ -22,15 +24,14 @@ type PerfBuffer struct {
2224
slot uint
2325
eventsChan chan []byte
2426
lostChan chan uint64
25-
stop chan struct{}
2627
closed bool
2728
wg sync.WaitGroup
29+
stopFlag uint32 // use with atomic operations
2830
}
2931

3032
// Poll will wait until timeout in milliseconds to gather
3133
// data from the perf buffer.
3234
func (pb *PerfBuffer) Poll(timeout int) {
33-
pb.stop = make(chan struct{})
3435
pb.wg.Add(1)
3536
go pb.poll(timeout)
3637
}
@@ -41,12 +42,12 @@ func (pb *PerfBuffer) Start() {
4142
}
4243

4344
func (pb *PerfBuffer) Stop() {
44-
if pb.stop == nil {
45+
if atomic.LoadUint32(&pb.stopFlag) == 1 {
4546
return
4647
}
4748

4849
// Signal the poll goroutine to exit
49-
close(pb.stop)
50+
atomic.StoreUint32(&pb.stopFlag, 1)
5051

5152
// The event and lost channels should be drained here since the consumer
5253
// may have stopped at this point. Failure to drain it will
@@ -73,9 +74,6 @@ func (pb *PerfBuffer) Stop() {
7374
if pb.lostChan != nil {
7475
close(pb.lostChan)
7576
}
76-
77-
// Reset pb.stop to allow multiple safe calls to Stop()
78-
pb.stop = nil
7977
}
8078

8179
func (pb *PerfBuffer) Close() {
@@ -93,20 +91,11 @@ func (pb *PerfBuffer) Close() {
9391
func (pb *PerfBuffer) poll(timeout int) error {
9492
defer pb.wg.Done()
9593

96-
for {
97-
select {
98-
case <-pb.stop:
99-
return nil
100-
default:
101-
retC := C.perf_buffer__poll(pb.pb, C.int(timeout))
102-
if retC < 0 {
103-
errno := syscall.Errno(-retC)
104-
if errno == syscall.EINTR {
105-
continue
106-
}
107-
108-
return fmt.Errorf("error polling perf buffer: %w", errno)
109-
}
110-
}
94+
stopFlag := (*C.uint32_t)(unsafe.Pointer(&pb.stopFlag))
95+
ret := C.cgo_perf_buffer__poll(pb.pb, C.int(timeout), stopFlag)
96+
if ret < 0 {
97+
return fmt.Errorf("error polling perf buffer: %w", syscall.Errno(-ret))
11198
}
99+
100+
return nil
112101
}

buf-ring.go

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,27 @@ import "C"
99
import (
1010
"fmt"
1111
"sync"
12+
"sync/atomic"
1213
"syscall"
14+
"unsafe"
1315
)
1416

1517
//
1618
// RingBuffer
1719
//
1820

1921
type RingBuffer struct {
20-
rb *C.struct_ring_buffer
21-
bpfMap *BPFMap
22-
slots []uint
23-
stop chan struct{}
24-
closed bool
25-
wg sync.WaitGroup
22+
rb *C.struct_ring_buffer
23+
bpfMap *BPFMap
24+
slots []uint
25+
closed bool
26+
wg sync.WaitGroup
27+
stopFlag uint32 // use with atomic operations
2628
}
2729

2830
// Poll will wait until timeout in milliseconds to gather
2931
// data from the ring buffer.
3032
func (rb *RingBuffer) Poll(timeout int) {
31-
rb.stop = make(chan struct{})
3233
rb.wg.Add(1)
3334
go rb.poll(timeout)
3435
}
@@ -39,12 +40,12 @@ func (rb *RingBuffer) Start() {
3940
}
4041

4142
func (rb *RingBuffer) Stop() {
42-
if rb.stop == nil {
43+
if atomic.LoadUint32(&rb.stopFlag) == 1 {
4344
return
4445
}
4546

4647
// Signal the poll goroutine to exit
47-
close(rb.stop)
48+
atomic.StoreUint32(&rb.stopFlag, 1)
4849

4950
// The event channel should be drained here since the consumer
5051
// may have stopped at this point. Failure to drain it will
@@ -69,9 +70,6 @@ func (rb *RingBuffer) Stop() {
6970
eventChan := eventChannels.get(slot).(chan []byte)
7071
close(eventChan)
7172
}
72-
73-
// Reset pb.stop to allow multiple safe calls to Stop()
74-
rb.stop = nil
7573
}
7674

7775
func (rb *RingBuffer) Close() {
@@ -87,32 +85,13 @@ func (rb *RingBuffer) Close() {
8785
rb.closed = true
8886
}
8987

90-
func (rb *RingBuffer) isStopped() bool {
91-
select {
92-
case <-rb.stop:
93-
return true
94-
default:
95-
return false
96-
}
97-
}
98-
9988
func (rb *RingBuffer) poll(timeout int) error {
10089
defer rb.wg.Done()
10190

102-
for {
103-
retC := C.ring_buffer__poll(rb.rb, C.int(timeout))
104-
if rb.isStopped() {
105-
break
106-
}
107-
108-
if retC < 0 {
109-
errno := syscall.Errno(-retC)
110-
if errno == syscall.EINTR {
111-
continue
112-
}
113-
114-
return fmt.Errorf("error polling ring buffer: %w", errno)
115-
}
91+
stopFlag := (*C.uint32_t)(unsafe.Pointer(&rb.stopFlag))
92+
ret := C.cgo_ring_buffer__poll(rb.rb, C.int(timeout), stopFlag)
93+
if ret < 0 {
94+
return fmt.Errorf("error polling perf buffer: %w", syscall.Errno(-ret))
11695
}
11796

11897
return nil

libbpfgo.c

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,26 @@ int cgo_add_ring_buf(struct ring_buffer *rb, int map_fd, uintptr_t ctx)
8989
return ret;
9090
}
9191

92+
int cgo_ring_buffer__poll(struct ring_buffer *rb, int timeout, volatile uint32_t *stop_flag)
93+
{
94+
while (*stop_flag == 0) {
95+
int ret = ring_buffer__poll(rb, timeout);
96+
97+
if (ret < 0) {
98+
// Handle EINTR like the original Go code - continue polling
99+
if (errno == EINTR) {
100+
continue;
101+
}
102+
// Return other errors to Go
103+
return ret;
104+
}
105+
106+
// Continue polling until stop_flag is set
107+
// Events are still processed in real-time via callbacks
108+
}
109+
return 0;
110+
}
111+
92112
struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx)
93113
{
94114
struct perf_buffer_opts pb_opts = {};
@@ -108,6 +128,26 @@ struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx)
108128
return pb;
109129
}
110130

131+
int cgo_perf_buffer__poll(struct perf_buffer *pb, int timeout_ms, volatile uint32_t *stop_flag)
132+
{
133+
while (*stop_flag == 0) {
134+
int ret = perf_buffer__poll(pb, timeout_ms);
135+
136+
if (ret < 0) {
137+
// Handle EINTR like the original Go code - continue polling
138+
if (errno == EINTR) {
139+
continue;
140+
}
141+
// Return other errors to Go
142+
return ret;
143+
}
144+
145+
// Continue polling until stop_flag is set
146+
// Events are still processed in real-time via callbacks
147+
}
148+
return 0;
149+
}
150+
111151
void cgo_bpf_map__initial_value(struct bpf_map *map, void *value)
112152
{
113153
size_t psize;

libbpfgo.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ void cgo_libbpf_set_print_fn();
2424
struct ring_buffer *cgo_init_ring_buf(int map_fd, uintptr_t ctx);
2525
struct user_ring_buffer *cgo_init_user_ring_buf(int map_fd);
2626
int cgo_add_ring_buf(struct ring_buffer *rb, int map_fd, uintptr_t ctx);
27+
int cgo_ring_buffer__poll(struct ring_buffer *rb, int timeout, volatile uint32_t *stop_flag);
28+
2729
struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx);
30+
int cgo_perf_buffer__poll(struct perf_buffer *pb, int timeout, volatile uint32_t *stop_flag);
2831

2932
void cgo_bpf_map__initial_value(struct bpf_map *map, void *value);
3033

0 commit comments

Comments
 (0)