Skip to content
This repository was archived by the owner on Sep 25, 2023. It is now read-only.

Commit 4d6d176

Browse files
committed
Fixed segmentio#428 by making reader.read respect context closure.
1 parent e0af1cf commit 4d6d176

File tree

2 files changed

+50
-6
lines changed

2 files changed

+50
-6
lines changed

reader.go

+29-6
Original file line numberDiff line numberDiff line change
@@ -1337,25 +1337,48 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
13371337
return
13381338
}
13391339

1340-
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
1341-
r.stats.fetches.observe(1)
1342-
r.stats.offset.observe(offset)
1340+
// readBatch wraps the call to conn.ReadBatchWith to make it interruptible.
1341+
// Conn methods are written in a non-interruptible style, so the only way to
1342+
// interrupt them is to close the connection in another goroutine.
1343+
func (r *reader) readBatch(ctx context.Context, conn *Conn) (*Batch, error) {
1344+
done := make(chan struct{})
1345+
defer close(done)
13431346

1344-
t0 := time.Now()
1345-
conn.SetReadDeadline(t0.Add(r.maxWait))
1347+
go func() {
1348+
select {
1349+
case <-ctx.Done():
1350+
conn.Close()
1351+
case <-done:
1352+
return
1353+
}
1354+
}()
13461355

13471356
batch := conn.ReadBatchWith(ReadBatchConfig{
13481357
MinBytes: r.minBytes,
13491358
MaxBytes: r.maxBytes,
13501359
IsolationLevel: r.isolationLevel,
13511360
})
1361+
return batch, ctx.Err()
1362+
}
1363+
1364+
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
1365+
r.stats.fetches.observe(1)
1366+
r.stats.offset.observe(offset)
1367+
1368+
t0 := time.Now()
1369+
conn.SetReadDeadline(t0.Add(r.maxWait))
1370+
1371+
batch, err := r.readBatch(ctx, conn)
1372+
if err != nil {
1373+
return offset, err
1374+
}
1375+
13521376
highWaterMark := batch.HighWaterMark()
13531377

13541378
t1 := time.Now()
13551379
r.stats.waitTime.observeDuration(t1.Sub(t0))
13561380

13571381
var msg Message
1358-
var err error
13591382
var size int64
13601383
var bytes int64
13611384

reader_test.go

+21
Original file line numberDiff line numberDiff line change
@@ -1313,3 +1313,24 @@ func getOffsets(t *testing.T, config ReaderConfig) offsetFetchResponseV1 {
13131313

13141314
return offsets
13151315
}
1316+
1317+
func TestReaderClose(t *testing.T) {
1318+
t.Parallel()
1319+
1320+
r := NewReader(ReaderConfig{
1321+
Brokers: []string{"localhost:9092"},
1322+
Topic: makeTopic(),
1323+
MaxWait: 2 * time.Second,
1324+
})
1325+
1326+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1327+
defer cancel()
1328+
1329+
_, _ = r.FetchMessage(ctx)
1330+
1331+
t0 := time.Now()
1332+
r.Close()
1333+
if time.Since(t0) > 100*time.Millisecond {
1334+
t.Errorf("r.Close took too long")
1335+
}
1336+
}

0 commit comments

Comments
 (0)