Fixed #428 by making reader.read respect context closure #429

base: main

Conversation
@achille-roussel Hi! Could you please look at this fix?

Thanks for the contribution! Spawning a goroutine for each batch we read from Kafka seems like it could add significant overhead for high-throughput readers. I feel like we would want to measure the impact before merging this change. May I ask which value you're using for MaxWait? Typically I've seen programs use values in the 500ms–1s range, which is why I originally assumed that blocking Close for that duration would be fine (since it often happens when the program exits).
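For context, a minimal sketch of the situation under discussion, with placeholder broker, topic, and group values: when MaxWait is large, FetchMessage can block for the full wait interval, and before this fix Close would wait on the in-flight fetch.

```go
package main

import (
	"context"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		GroupID: "example-group",            // placeholder consumer group
		Topic:   "example-topic",            // placeholder topic
		MaxWait: 5 * time.Minute,            // long poll to avoid busy-looping at the head
	})

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		// Blocks for up to MaxWait when the consumer is at the head of the partition.
		_, _ = r.FetchMessage(ctx)
	}()

	cancel()
	// Before this fix, Close could block until the in-flight fetch
	// reached the MaxWait deadline.
	_ = r.Close()
}
```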
I set it to a much higher value. I guess I could set it to 1s and keep restarting the fetch instead.

I had a similar issue where MaxWait = 5 * time.Minute would make the reader hang on a rebalance. I set MaxWait this high because I do not need to be hammered by EOF messages when I am at the head of a topic/partition (using consumer groups with multiple topics).

@achille-roussel Hey, any chance for this to be merged?

I'll take another look at it this week.

+1

@achille-roussel Any news? Sorry to bother, but this is important for us, and it's bothersome to live with a local package replacement.
reader_test.go (outdated)

```go
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

_, _ = r.FetchMessage(ctx)
```
Error handling? And a second r.Close via defer?
Fixed.
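For reference, a plausible shape of the updated test after this feedback; the reader construction, test name, and expected error here are assumptions, not the PR's actual code:

```go
package kafka_test // illustrative package name

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/segmentio/kafka-go"
)

func TestFetchMessageRespectsContext(t *testing.T) { // hypothetical test name
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		Topic:   "example-topic",            // placeholder topic
	})
	defer r.Close() // second Close via defer, as suggested

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// With the fix, FetchMessage should return once the context expires
	// instead of blocking until MaxWait.
	if _, err := r.FetchMessage(ctx); !errors.Is(err, context.DeadlineExceeded) {
		t.Fatalf("expected context.DeadlineExceeded, got %v", err)
	}
}
```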
@Antonboom Does it look mergeable now?

@feldgendler, I'm not the owner of the repo :( @achille-roussel, merge please...
Thanks for your contribution!
Generally this looks good, but the overhead may not be practical for high-throughput readers, which tend to use a low MaxWait value and therefore aren't impacted by the closure waiting on in-flight requests. We would need to validate that we aren't introducing performance regressions before merging this change.
```go
	case <-done:
		return
	}
}()
```
We would need to wait for this goroutine to finish before returning from the readBatch function; otherwise the goroutine will keep running asynchronously and may close the connection when ctx is canceled after the function has returned.
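A minimal sketch of the ordering being requested; the helper name, the io.Closer parameter, and the read callback are illustrative, not the PR's actual code:

```go
package kafkautil // illustrative package name

import (
	"context"
	"io"
)

// readRespectingContext signals the watcher goroutine when the read
// completes and then waits for it to exit before returning, so the
// watcher can no longer close the connection after the function returns.
func readRespectingContext(ctx context.Context, conn io.Closer, read func() error) error {
	done := make(chan struct{})     // closed when the read completes
	finished := make(chan struct{}) // closed when the watcher exits

	go func() {
		defer close(finished)
		select {
		case <-ctx.Done():
			_ = conn.Close() // unblock the pending read
		case <-done:
		}
	}()

	err := read()
	close(done)
	<-finished // the watcher is now guaranteed to be done

	if ctx.Err() != nil {
		return ctx.Err()
	}
	return err
}
```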
```go
	case <-done:
		return
	}
}()
```
I'm concerned that having this goroutine spawn for every batch read by the consumer may greatly increase pressure on the Go scheduler and GC, which could impact performance of high-throughput consumers. Could we add a benchmark to evaluate the impact of this change?
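One way to get that number, as a sketch: a micro-benchmark that reproduces the per-batch watcher pattern in isolation, with no Kafka I/O, so the goroutine and channel cost shows up directly under `go test -bench`. The benchmark name and structure are illustrative:

```go
package kafka_test // illustrative package name

import (
	"context"
	"testing"
)

// BenchmarkPerBatchWatcher measures the cost of spawning one watcher
// goroutine per batch, as the proposed change does, without any I/O.
func BenchmarkPerBatchWatcher(b *testing.B) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		done := make(chan struct{})
		finished := make(chan struct{})
		go func() {
			defer close(finished)
			select {
			case <-ctx.Done():
			case <-done:
			}
		}()
		close(done) // simulate the batch read completing
		<-finished  // wait for the watcher, mirroring the proposed fix
	}
}
```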
Closes #428.