-
Notifications
You must be signed in to change notification settings - Fork 817
Fixed #428 by making reader.read respect context closure #429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1337,25 +1337,48 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star | |
return | ||
} | ||
|
||
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) { | ||
r.stats.fetches.observe(1) | ||
r.stats.offset.observe(offset) | ||
// readBatch wraps the call to conn.ReadBatchWith to make it interruptible. | ||
// Conn methods are written in a non-interruptible style, so the only way to | ||
// interrupt them is to close the connection in another goroutine. | ||
func (r *reader) readBatch(ctx context.Context, conn *Conn) (*Batch, error) { | ||
done := make(chan struct{}) | ||
defer close(done) | ||
|
||
t0 := time.Now() | ||
conn.SetReadDeadline(t0.Add(r.maxWait)) | ||
go func() { | ||
select { | ||
case <-ctx.Done(): | ||
conn.Close() | ||
case <-done: | ||
return | ||
} | ||
}() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm concerned that having this goroutine spawn for every batch read by the consumer may greatly increase pressure on the Go scheduler and GC, which could impact performance of high throughput consumers. Could we either add a benchmark to evaluate the impact of this change? |
||
|
||
batch := conn.ReadBatchWith(ReadBatchConfig{ | ||
MinBytes: r.minBytes, | ||
MaxBytes: r.maxBytes, | ||
IsolationLevel: r.isolationLevel, | ||
}) | ||
return batch, ctx.Err() | ||
} | ||
|
||
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) { | ||
r.stats.fetches.observe(1) | ||
r.stats.offset.observe(offset) | ||
|
||
t0 := time.Now() | ||
conn.SetReadDeadline(t0.Add(r.maxWait)) | ||
|
||
batch, err := r.readBatch(ctx, conn) | ||
if err != nil { | ||
return offset, err | ||
} | ||
|
||
highWaterMark := batch.HighWaterMark() | ||
|
||
t1 := time.Now() | ||
r.stats.waitTime.observeDuration(t1.Sub(t0)) | ||
|
||
var msg Message | ||
var err error | ||
var size int64 | ||
var bytes int64 | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1313,3 +1313,24 @@ func getOffsets(t *testing.T, config ReaderConfig) offsetFetchResponseV1 { | |
|
||
return offsets | ||
} | ||
|
||
func TestReaderClose(t *testing.T) { | ||
t.Parallel() | ||
|
||
r := NewReader(ReaderConfig{ | ||
Brokers: []string{"localhost:9092"}, | ||
Topic: makeTopic(), | ||
MaxWait: 2 * time.Second, | ||
}) | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | ||
defer cancel() | ||
|
||
_, _ = r.FetchMessage(ctx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. err handling? and second There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
|
||
t0 := time.Now() | ||
r.Close() | ||
if time.Since(t0) > 100*time.Millisecond { | ||
t.Errorf("r.Close took too long") | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would need to wait for this goroutine to finish before returning from the
readBatch
function, otherwise the goroutine will execute asynchronously and may close the connection whenctx
is canceled after the function returned.