diff --git a/reader.go b/reader.go index 04d90f355..879062666 100644 --- a/reader.go +++ b/reader.go @@ -912,6 +912,68 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error { } } +// FetchMessageBatch fetches a batch of messages from the reader. It is similar to +// FetchMessage, except it blocks until no. of messages read reaches batchSize. +func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int) ([]Message, error) { + r.activateReadLag() + msgBatch := make([]Message, 0, batchSize) + + var i int + for i <= batchSize { + r.mutex.Lock() + + if !r.closed && r.version == 0 { + r.start(r.getTopicPartitionOffset()) + } + + version := r.version + r.mutex.Unlock() + + select { + case <-ctx.Done(): + return []Message{}, ctx.Err() + + case err := <-r.runError: + return []Message{}, err + + case m, ok := <-r.msgs: + if !ok { + return []Message{}, io.EOF + } + + if m.version < version { + continue + } + + r.mutex.Lock() + + switch { + case m.error != nil: + case version == r.version: + r.offset = m.message.Offset + 1 + r.lag = m.watermark - r.offset + } + + r.mutex.Unlock() + + if errors.Is(m.error, io.EOF) { + // io.EOF is used as a marker to indicate that the stream + // has been closed, in case it was received from the inner + // reader we don't want to confuse the program and replace + // the error with io.ErrUnexpectedEOF. + m.error = io.ErrUnexpectedEOF + } + if m.error != nil { + return nil, m.error + } + + msgBatch = append(msgBatch, m.message) + } + i++ + } + return msgBatch, nil +} + // ReadLag returns the current lag of the reader by fetching the last offset of // the topic and partition and computing the difference between that value and // the offset of the last message returned by ReadMessage.