Skip to content

Add method to fetch messages in batch #1390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,68 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
}
}

// FetchMessageBatch fetches a batch of messages from the reader. It is similar to
// FetchMessage, except it blocks until no. of messages read reaches batchSize.
func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int) ([]Message, error) {
r.activateReadLag()
msgBatch := make([]Message, 0, batchSize)

var i int
for i <= batchSize {
r.mutex.Lock()

if !r.closed && r.version == 0 {
r.start(r.getTopicPartitionOffset())
}

version := r.version
r.mutex.Unlock()

select {
case <-ctx.Done():
return []Message{}, ctx.Err()

case err := <-r.runError:
return []Message{}, err

case m, ok := <-r.msgs:
if !ok {
return []Message{}, io.EOF
}

if m.version < version {
continue
}

r.mutex.Lock()

switch {
case m.error != nil:
case version == r.version:
r.offset = m.message.Offset + 1
r.lag = m.watermark - r.offset
}

r.mutex.Unlock()

if errors.Is(m.error, io.EOF) {
// io.EOF is used as a marker to indicate that the stream
// has been closed, in case it was received from the inner
// reader we don't want to confuse the program and replace
// the error with io.ErrUnexpectedEOF.
m.error = io.ErrUnexpectedEOF
}
if m.error != nil {
return nil, m.error
}

msgBatch = append(msgBatch, m.message)
}
i++
}
return msgBatch, nil
}
Copy link
Author

@krsoninikhil krsoninikhil Jun 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplication of the code can be avoided by calling this method in FetchMessage. I'll refactor if once the approach gets reviewed.

Copy link

@ghaninia ghaninia Jun 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the number of messages doesn't reach the desired batchSize?

you changed the offset when the batch is processed, what happens if one of the messages in the batch fails? Is there any mechanism in place to handle that? Do you have any ideas for a fallback strategy for this?!

@krsoninikhil

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ghaninia for batch processing possible use manuall ack, maybe?

If one of message failed, we can ack all messages before failed, except message with problem.


// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and
// the offset of the last message returned by ReadMessage.
Expand Down