Skip to content

Commit 754baf3

Browse files
committed
feat: enhance TopicReader with retry strategy and improved error handling
Signed-off-by: Vladislav Polyakov <polRk@ydb.tech>
1 parent af169fe commit 754baf3

File tree

1 file changed

+19
-5
lines changed

1 file changed

+19
-5
lines changed

packages/topic/src/reader.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import { StatusIds_StatusCode } from "@ydbjs/api/operation";
77
import { type OffsetsRange, OffsetsRangeSchema, type StreamReadMessage_CommitOffsetRequest_PartitionCommitOffset, StreamReadMessage_CommitOffsetRequest_PartitionCommitOffsetSchema, type StreamReadMessage_FromClient, StreamReadMessage_FromClientSchema, type StreamReadMessage_FromServer, StreamReadMessage_FromServerSchema, type StreamReadMessage_InitRequest_TopicReadSettings, StreamReadMessage_InitRequest_TopicReadSettingsSchema, type StreamReadMessage_ReadResponse, TopicServiceDefinition } from "@ydbjs/api/topic";
88
import type { Driver } from "@ydbjs/core";
99
import { YDBError } from "@ydbjs/error";
10-
import { retry } from "@ydbjs/retry";
10+
import { type RetryConfig, retry } from "@ydbjs/retry";
11+
import { backoff, combine, jitter } from "@ydbjs/retry/strategy";
1112
import type { StringValue } from "ms";
1213
import ms from "ms";
1314

@@ -326,12 +327,24 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
326327
let signal = this.#controller.signal
327328
await this.#driver.ready(signal)
328329

330+
let retryConfig: RetryConfig = {
331+
signal,
332+
budget: Infinity,
333+
strategy: combine(jitter(50), backoff(50, 5000)),
334+
retry(error) {
335+
dbg('retrying stream read due to %O', error);
336+
return true;
337+
},
338+
}
339+
329340
try {
330341
// TODO: handle user errors (for example tx errors). Ex: use abort signal
331-
await retry({ signal, idempotent: true }, async () => {
342+
await retry(retryConfig, async () => {
343+
using outgoing = new AsyncEventEmitter<StreamReadMessage_FromClient>(this.#fromClientEmitter, 'message')
344+
332345
let stream = this.#driver
333346
.createClient(TopicServiceDefinition)
334-
.streamRead(new AsyncEventEmitter(this.#fromClientEmitter, 'message'), { signal });
347+
.streamRead(outgoing, { signal });
335348

336349
nextTick(() => {
337350
dbg('start consuming topic stream for consumer %s with autoPartitioningSupport=%o', this.#options.consumer, false);
@@ -353,15 +366,16 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
353366

354367
if (event.status !== StatusIds_StatusCode.SUCCESS) {
355368
let error = new YDBError(event.status, event.issues)
356-
this.#fromServerEmitter.emit('error', error);
357-
369+
dbg('received error from server: %s', error.message);
358370
throw error;
359371
}
360372

361373
this.#fromServerEmitter.emit('message', event);
362374
}
363375
});
364376
} catch (error) {
377+
dbg('error: %O', error);
378+
365379
this.#fromServerEmitter.emit('error', error);
366380
} finally {
367381
this.#fromServerEmitter.emit('end');

0 commit comments

Comments
 (0)