Skip to content

Commit 37346eb

Browse files
Revert "feat(buffer): move polling to cgo loop (#493)" (#495)
This reverts commit 8adf8e1.
1 parent 8adf8e1 commit 37346eb

File tree

4 files changed

+57
-68
lines changed

4 files changed

+57
-68
lines changed

buf-perf.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ import "C"
99
import (
1010
"fmt"
1111
"sync"
12-
"sync/atomic"
1312
"syscall"
14-
"unsafe"
1513
)
1614

1715
//
@@ -24,14 +22,15 @@ type PerfBuffer struct {
2422
slot uint
2523
eventsChan chan []byte
2624
lostChan chan uint64
25+
stop chan struct{}
2726
closed bool
2827
wg sync.WaitGroup
29-
stopFlag uint32 // use with atomic operations
3028
}
3129

3230
// Poll will wait until timeout in milliseconds to gather
3331
// data from the perf buffer.
3432
func (pb *PerfBuffer) Poll(timeout int) {
33+
pb.stop = make(chan struct{})
3534
pb.wg.Add(1)
3635
go pb.poll(timeout)
3736
}
@@ -42,12 +41,12 @@ func (pb *PerfBuffer) Start() {
4241
}
4342

4443
func (pb *PerfBuffer) Stop() {
45-
if atomic.LoadUint32(&pb.stopFlag) == 1 {
44+
if pb.stop == nil {
4645
return
4746
}
4847

4948
// Signal the poll goroutine to exit
50-
atomic.StoreUint32(&pb.stopFlag, 1)
49+
close(pb.stop)
5150

5251
// The event and lost channels should be drained here since the consumer
5352
// may have stopped at this point. Failure to drain it will
@@ -74,6 +73,9 @@ func (pb *PerfBuffer) Stop() {
7473
if pb.lostChan != nil {
7574
close(pb.lostChan)
7675
}
76+
77+
// Reset pb.stop to allow multiple safe calls to Stop()
78+
pb.stop = nil
7779
}
7880

7981
func (pb *PerfBuffer) Close() {
@@ -91,11 +93,20 @@ func (pb *PerfBuffer) Close() {
9193
func (pb *PerfBuffer) poll(timeout int) error {
9294
defer pb.wg.Done()
9395

94-
stopFlag := (*C.uint32_t)(unsafe.Pointer(&pb.stopFlag))
95-
ret := C.cgo_perf_buffer__poll(pb.pb, C.int(timeout), stopFlag)
96-
if ret < 0 {
97-
return fmt.Errorf("error polling perf buffer: %w", syscall.Errno(-ret))
96+
for {
97+
select {
98+
case <-pb.stop:
99+
return nil
100+
default:
101+
retC := C.perf_buffer__poll(pb.pb, C.int(timeout))
102+
if retC < 0 {
103+
errno := syscall.Errno(-retC)
104+
if errno == syscall.EINTR {
105+
continue
106+
}
107+
108+
return fmt.Errorf("error polling perf buffer: %w", errno)
109+
}
110+
}
98111
}
99-
100-
return nil
101112
}

buf-ring.go

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,26 @@ import "C"
99
import (
1010
"fmt"
1111
"sync"
12-
"sync/atomic"
1312
"syscall"
14-
"unsafe"
1513
)
1614

1715
//
1816
// RingBuffer
1917
//
2018

2119
type RingBuffer struct {
22-
rb *C.struct_ring_buffer
23-
bpfMap *BPFMap
24-
slots []uint
25-
closed bool
26-
wg sync.WaitGroup
27-
stopFlag uint32 // use with atomic operations
20+
rb *C.struct_ring_buffer
21+
bpfMap *BPFMap
22+
slots []uint
23+
stop chan struct{}
24+
closed bool
25+
wg sync.WaitGroup
2826
}
2927

3028
// Poll will wait until timeout in milliseconds to gather
3129
// data from the ring buffer.
3230
func (rb *RingBuffer) Poll(timeout int) {
31+
rb.stop = make(chan struct{})
3332
rb.wg.Add(1)
3433
go rb.poll(timeout)
3534
}
@@ -40,12 +39,12 @@ func (rb *RingBuffer) Start() {
4039
}
4140

4241
func (rb *RingBuffer) Stop() {
43-
if atomic.LoadUint32(&rb.stopFlag) == 1 {
42+
if rb.stop == nil {
4443
return
4544
}
4645

4746
// Signal the poll goroutine to exit
48-
atomic.StoreUint32(&rb.stopFlag, 1)
47+
close(rb.stop)
4948

5049
// The event channel should be drained here since the consumer
5150
// may have stopped at this point. Failure to drain it will
@@ -70,6 +69,9 @@ func (rb *RingBuffer) Stop() {
7069
eventChan := eventChannels.get(slot).(chan []byte)
7170
close(eventChan)
7271
}
72+
73+
// Reset pb.stop to allow multiple safe calls to Stop()
74+
rb.stop = nil
7375
}
7476

7577
func (rb *RingBuffer) Close() {
@@ -85,13 +87,32 @@ func (rb *RingBuffer) Close() {
8587
rb.closed = true
8688
}
8789

90+
func (rb *RingBuffer) isStopped() bool {
91+
select {
92+
case <-rb.stop:
93+
return true
94+
default:
95+
return false
96+
}
97+
}
98+
8899
func (rb *RingBuffer) poll(timeout int) error {
89100
defer rb.wg.Done()
90101

91-
stopFlag := (*C.uint32_t)(unsafe.Pointer(&rb.stopFlag))
92-
ret := C.cgo_ring_buffer__poll(rb.rb, C.int(timeout), stopFlag)
93-
if ret < 0 {
94-
return fmt.Errorf("error polling perf buffer: %w", syscall.Errno(-ret))
102+
for {
103+
retC := C.ring_buffer__poll(rb.rb, C.int(timeout))
104+
if rb.isStopped() {
105+
break
106+
}
107+
108+
if retC < 0 {
109+
errno := syscall.Errno(-retC)
110+
if errno == syscall.EINTR {
111+
continue
112+
}
113+
114+
return fmt.Errorf("error polling ring buffer: %w", errno)
115+
}
95116
}
96117

97118
return nil

libbpfgo.c

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -89,26 +89,6 @@ int cgo_add_ring_buf(struct ring_buffer *rb, int map_fd, uintptr_t ctx)
8989
return ret;
9090
}
9191

92-
int cgo_ring_buffer__poll(struct ring_buffer *rb, int timeout, volatile uint32_t *stop_flag)
93-
{
94-
while (*stop_flag == 0) {
95-
int ret = ring_buffer__poll(rb, timeout);
96-
97-
if (ret < 0) {
98-
// Handle EINTR like the original Go code - continue polling
99-
if (errno == EINTR) {
100-
continue;
101-
}
102-
// Return other errors to Go
103-
return ret;
104-
}
105-
106-
// Continue polling until stop_flag is set
107-
// Events are still processed in real-time via callbacks
108-
}
109-
return 0;
110-
}
111-
11292
struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx)
11393
{
11494
struct perf_buffer_opts pb_opts = {};
@@ -128,26 +108,6 @@ struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx)
128108
return pb;
129109
}
130110

131-
int cgo_perf_buffer__poll(struct perf_buffer *pb, int timeout_ms, volatile uint32_t *stop_flag)
132-
{
133-
while (*stop_flag == 0) {
134-
int ret = perf_buffer__poll(pb, timeout_ms);
135-
136-
if (ret < 0) {
137-
// Handle EINTR like the original Go code - continue polling
138-
if (errno == EINTR) {
139-
continue;
140-
}
141-
// Return other errors to Go
142-
return ret;
143-
}
144-
145-
// Continue polling until stop_flag is set
146-
// Events are still processed in real-time via callbacks
147-
}
148-
return 0;
149-
}
150-
151111
void cgo_bpf_map__initial_value(struct bpf_map *map, void *value)
152112
{
153113
size_t psize;

libbpfgo.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ void cgo_libbpf_set_print_fn();
2424
struct ring_buffer *cgo_init_ring_buf(int map_fd, uintptr_t ctx);
2525
struct user_ring_buffer *cgo_init_user_ring_buf(int map_fd);
2626
int cgo_add_ring_buf(struct ring_buffer *rb, int map_fd, uintptr_t ctx);
27-
int cgo_ring_buffer__poll(struct ring_buffer *rb, int timeout, volatile uint32_t *stop_flag);
28-
2927
struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx);
30-
int cgo_perf_buffer__poll(struct perf_buffer *pb, int timeout, volatile uint32_t *stop_flag);
3128

3229
void cgo_bpf_map__initial_value(struct bpf_map *map, void *value);
3330

0 commit comments

Comments
 (0)