Skip to content

Fixed #428 by making reader.read respect context closure #429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

feldgendler
Copy link

@feldgendler feldgendler commented Apr 14, 2020

Closes #428.


This change is Reviewable

@feldgendler
Copy link
Author

@achille-roussel Hi! Could you please look at this fix?

@achille-roussel
Copy link
Contributor

Thanks for the contribution!

Spawning a goroutine for each batch we read from kafka seems like it could add significant overhead for high throughput readers. I feel like we would want to measure the impact before merging this change.

May I ask which value you're using for MaxWait? Typically I've seen programs use values in the 500ms-1s range, which is why I originally assumed that blocking Close for that duration would be fine (since it often happens when the program exits).

@feldgendler
Copy link
Author

I set MaxWait to a very large value, essentially infinity. I don't see why I'd want FetchMessage to return empty-handed. Whenever it did, I would just restart it because I've got nothing better to do.

I guess I could set it to 1s and keep restarting FetchMessages (although that sounds like a lot of needless activity), but it would cause a problem in our unit tests. We use kafka-go in a complex component that we create and destroy for each unit test. I wouldn't want the teardown of the environment to block each of the tests for a second.

@baconalot
Copy link

I had a similar issue where MaxWait = 5 * time.Minutes would make the reader hang on a rebalance. I set this MaxWait this high because I do not need to be hammered by EOF messages when I am at the head of a topic/partition. (Using groups with multiple topics)

@feldgendler
Copy link
Author

@achille-roussel Hey, any chance for this to be merged?

@achille-roussel
Copy link
Contributor

I'll take another look at it this week.

@Antonboom
Copy link

+1

@feldgendler
Copy link
Author

@achille-roussel any news? Sorry to bother, but this is important for us, and it's bothersome to live with a local package replacement.

reader_test.go Outdated
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

_, _ = r.FetchMessage(ctx)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err handling?

and second r.Close via defer?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@feldgendler
Copy link
Author

@Antonboom Does it look mergeable now?

@Antonboom
Copy link

@Antonboom Does it look mergeable now?

@feldgendler, I'm not the owner of repo :(
I'm also interested in this fix and was just passing by.

@achille-roussel, merge please...

Copy link
Contributor

@achille-roussel achille-roussel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution!

Generally this looks good, but the overhead may not be practical for high throughput readers, which tend to use low value MaxWait and therefore aren't impacted by the closure waiting on inflight requests. We would need to validate that we aren't introducing performance regressions with this change before we merge it in.

case <-done:
return
}
}()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to wait for this goroutine to finish before returning from the readBatch function, otherwise the goroutine will execute asynchronously and may close the connection when ctx is canceled after the function returned.

case <-done:
return
}
}()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned that having this goroutine spawn for every batch read by the consumer may greatly increase pressure on the Go scheduler and GC, which could impact performance of high throughput consumers.

Could we either add a benchmark to evaluate the impact of this change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Reader.Close hangs for ReaderConfig.MaxWait
4 participants