From 4d6d176ab1c82939d55532a25574876ae00a06bf Mon Sep 17 00:00:00 2001 From: Alexey Feldgendler Date: Tue, 14 Apr 2020 17:14:49 +0200 Subject: [PATCH 1/3] Fixed #428 by making reader.read respect context closure. --- reader.go | 35 +++++++++++++++++++++++++++++------ reader_test.go | 21 +++++++++++++++++++++ 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/reader.go b/reader.go index 2e3619c08..fb1d61613 100644 --- a/reader.go +++ b/reader.go @@ -1337,25 +1337,48 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star return } -func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) { - r.stats.fetches.observe(1) - r.stats.offset.observe(offset) +// readBatch wraps the call to conn.ReadBatchWith to make it interruptible. +// Conn methods are written in a non-interruptible style, so the only way to +// interrupt them is to close the connection in another goroutine. +func (r *reader) readBatch(ctx context.Context, conn *Conn) (*Batch, error) { + done := make(chan struct{}) + defer close(done) - t0 := time.Now() - conn.SetReadDeadline(t0.Add(r.maxWait)) + go func() { + select { + case <-ctx.Done(): + conn.Close() + case <-done: + return + } + }() batch := conn.ReadBatchWith(ReadBatchConfig{ MinBytes: r.minBytes, MaxBytes: r.maxBytes, IsolationLevel: r.isolationLevel, }) + return batch, ctx.Err() +} + +func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) { + r.stats.fetches.observe(1) + r.stats.offset.observe(offset) + + t0 := time.Now() + conn.SetReadDeadline(t0.Add(r.maxWait)) + + batch, err := r.readBatch(ctx, conn) + if err != nil { + return offset, err + } + highWaterMark := batch.HighWaterMark() t1 := time.Now() r.stats.waitTime.observeDuration(t1.Sub(t0)) var msg Message - var err error var size int64 var bytes int64 diff --git a/reader_test.go b/reader_test.go index b37e0011d..5d60bfee6 100644 --- a/reader_test.go +++ b/reader_test.go @@ -1313,3 +1313,24 @@ func getOffsets(t *testing.T, config ReaderConfig) offsetFetchResponseV1 { return offsets } + +func TestReaderClose(t *testing.T) { + t.Parallel() + + r := NewReader(ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: makeTopic(), + MaxWait: 2 * time.Second, + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + _, _ = r.FetchMessage(ctx) + + t0 := time.Now() + r.Close() + if time.Since(t0) > 100*time.Millisecond { + t.Errorf("r.Close took too long") + } +} From 6be1b0d93b6faf7f3d0ac781ee49fe70b92aa74e Mon Sep 17 00:00:00 2001 From: Alexey Feldgendler Date: Fri, 12 Feb 2021 18:04:04 +0100 Subject: [PATCH 2/3] Added defer r.Close() and error checking to TestReaderClose. --- reader_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/reader_test.go b/reader_test.go index 5d60bfee6..c55842b70 100644 --- a/reader_test.go +++ b/reader_test.go @@ -1322,11 +1322,15 @@ func TestReaderClose(t *testing.T) { Topic: makeTopic(), MaxWait: 2 * time.Second, }) + defer r.Close() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - _, _ = r.FetchMessage(ctx) + _, err := r.FetchMessage(ctx) + if err != context.Canceled { + t.Errorf("bad err: %v", err) + } t0 := time.Now() r.Close() From 7dfda91b22d1d2b9c1b1a34f6f2b5f75cee806b1 Mon Sep 17 00:00:00 2001 From: Alexey Feldgendler Date: Fri, 12 Feb 2021 18:42:42 +0100 Subject: [PATCH 3/3] Fixed the expected error in TestReaderClose. --- reader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reader_test.go b/reader_test.go index c55842b70..6d016e95b 100644 --- a/reader_test.go +++ b/reader_test.go @@ -1328,7 +1328,7 @@ func TestReaderClose(t *testing.T) { defer cancel() _, err := r.FetchMessage(ctx) - if err != context.Canceled { + if err != context.DeadlineExceeded { t.Errorf("bad err: %v", err) }