Skip to content

Commit bd15092

Browse files
committed
feat: enhance TopicReader with pending commit management and graceful disposal
Signed-off-by: Vladislav Polyakov <polRk@ydb.tech>
1 parent 754baf3 commit bd15092

File tree

1 file changed

+129
-113
lines changed

1 file changed

+129
-113
lines changed

packages/topic/src/reader.ts

Lines changed: 129 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import * as assert from "node:assert";
22
import { EventEmitter, once } from "node:events";
3+
import { nextTick } from "node:process";
34

45
import { create, protoInt64, toJson } from "@bufbuild/protobuf";
56
import { type Duration, DurationSchema, type Timestamp, timestampDate, timestampFromDate } from "@bufbuild/protobuf/wkt";
@@ -12,7 +13,6 @@ import { backoff, combine, jitter } from "@ydbjs/retry/strategy";
1213
import type { StringValue } from "ms";
1314
import ms from "ms";
1415

15-
import { nextTick } from "node:process";
1616
import { AsyncEventEmitter } from "./aee.js";
1717
import { dbg } from "./dbg.js";
1818
import { type TopicMessage } from "./message.js";
@@ -83,6 +83,13 @@ export type onCommittedOffsetCallback = (
8383
committedOffset: bigint,
8484
) => void
8585

86+
type TopicCommitPromise = {
87+
partitionSessionId: bigint
88+
offset: bigint
89+
resolve: () => void
90+
reject: (reason?: any) => void
91+
}
92+
8693
export type TopicReaderOptions<Payload = Uint8Array> = {
8794
topic: string | TopicReaderSource | TopicReaderSource[]
8895
consumer: string
@@ -111,9 +118,17 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
111118
#fromClientEmitter = new EventEmitter<FromClientEmitterMap>();
112119
#fromServerEmitter = new EventEmitter<FromServerEmitterMap>();
113120

114-
// partitionSessionId -> TopicPartitionSession
115-
#partitionSessions: Map<bigint, TopicPartitionSession> = new Map();
121+
// partition sessions that are currently active.
122+
#partitionSessions: Map<bigint, TopicPartitionSession> = new Map(); // partitionSessionId -> TopicPartitionSession
123+
124+
// pending commits that are not yet resolved.
125+
#pendingCommits: Map<bigint, TopicCommitPromise[]> = new Map(); // partitionSessionId -> TopicCommitPromise[]
116126

127+
/**
128+
* Creates a new TopicReader instance.
129+
* @param driver - The YDB driver instance to use for communication with the YDB server.
130+
* @param options - Options for the topic reader, including topic path, consumer name, and optional callbacks.
131+
*/
117132
constructor(driver: Driver, options: TopicReaderOptions<Payload>) {
118133
this.#driver = driver;
119134
this.#options = { ...options };
@@ -219,6 +234,22 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
219234
}
220235
}
221236

237+
let pendingCommits = this.#pendingCommits.get(partitionSession.partitionSessionId);
238+
if (pendingCommits?.length) {
239+
// If there are pending commits for this partition session, resolve them.
240+
let i = 0;
241+
while (i < pendingCommits.length) {
242+
let commit = pendingCommits[i];
243+
if (commit.offset <= commitOffset) {
244+
// If the commit offset is less than or equal to the committed offset, resolve it.
245+
commit.resolve();
246+
pendingCommits.splice(i, 1); // Remove from pending commits
247+
} else {
248+
i++;
249+
}
250+
}
251+
}
252+
222253
this.#fromClientEmitter.emit('message', create(StreamReadMessage_FromClientSchema, {
223254
clientMessage: {
224255
case: 'startPartitionSessionResponse',
@@ -240,7 +271,6 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
240271
return;
241272
}
242273

243-
244274
if (this.#options.onPartitionSessionStop) {
245275
let committedOffset = message.serverMessage.value.committedOffset || 0n;
246276

@@ -256,6 +286,7 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
256286
partitionSession.stop();
257287
this.#partitionSessions.delete(partitionSession.partitionSessionId);
258288

289+
// Remove all messages from the buffer that belong to this partition session.
259290
for (let part of this.#buffer) {
260291
// Remove all messages from the buffer that belong to this partition session.
261292
let i = 0;
@@ -267,6 +298,18 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
267298
}
268299
}
269300
}
301+
302+
let pendingCommits = this.#pendingCommits.get(partitionSession.partitionSessionId);
303+
if (pendingCommits) {
304+
// If there are pending commits for this partition session, reject them.
305+
for (let commit of pendingCommits) {
306+
commit.reject('Partition session stopped without graceful stop');
307+
}
308+
309+
this.#pendingCommits.delete(partitionSession.partitionSessionId);
310+
}
311+
312+
return;
270313
}
271314

272315
// Отсылать ответ после того, как прочитали все сообщения из внутреннего буфера по конкретной partition session.
@@ -308,6 +351,33 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
308351
this.#options.onCommittedOffset(partitionSession, part.committedOffset)
309352
}
310353
}
354+
355+
// Resolve all pending commits for the partition sessions.
356+
for (let part of message.serverMessage.value.partitionsCommittedOffsets) {
357+
let partitionSessionId = part.partitionSessionId;
358+
let committedOffset = part.committedOffset;
359+
360+
// Resolve all pending commits for this partition session.
361+
let pendingCommits = this.#pendingCommits.get(partitionSessionId);
362+
if (pendingCommits) {
363+
let i = 0;
364+
while (i < pendingCommits.length) {
365+
let commit = pendingCommits[i];
366+
if (commit.offset <= committedOffset) {
367+
// If the commit offset is less than or equal to the committed offset, resolve it.
368+
commit.resolve();
369+
pendingCommits.splice(i, 1); // Remove from pending commits
370+
} else {
371+
i++;
372+
}
373+
}
374+
}
375+
376+
// If there are no pending commits for this partition session, remove it from the map.
377+
if (pendingCommits && pendingCommits.length === 0) {
378+
this.#pendingCommits.delete(partitionSessionId);
379+
}
380+
}
311381
}
312382
});
313383
}
@@ -720,7 +790,7 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
720790
}
721791

722792
nextTick((releasableBufferBytes: bigint) => {
723-
dbg('read %d messages, buffer size is %d bytes, free buffer size is %d bytes', messages.length, this.#maxBufferSize - this.#freeBufferSize, this.#freeBufferSize);
793+
dbg('return %d messages, buffer size is %d bytes, free buffer size is %d bytes', messages.length, this.#maxBufferSize - this.#freeBufferSize, this.#freeBufferSize);
724794

725795
if (releasableBufferBytes > 0n) {
726796
dbg('releasing %d bytes from buffer', releasableBufferBytes);
@@ -754,10 +824,10 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
754824
*
755825
* Throws if the reader is disposed or if any message lacks a partitionSessionId.
756826
*
757-
* @param input - TopicMessage, TopicMessage[], or TopicMessage[] to commit.
758-
* @returns PromiseLike<void> that resolves when the commit is acknowledged.
827+
* @param input - TopicMessage or TopicMessage[] to commit.
828+
* @returns Promise<void> that resolves when the commit is acknowledged.
759829
*/
760-
commit(input: TopicMessage | TopicMessage[]): void {
830+
commit(input: TopicMessage | TopicMessage[]): Promise<void> {
761831
// Check if the reader has been disposed, cannot commit with disposed reader
762832
if (this.#disposed) {
763833
throw new Error('Reader is disposed');
@@ -771,7 +841,7 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
771841

772842
// If input is empty, resolve immediately.
773843
if (input.length === 0) {
774-
return;
844+
return Promise.resolve();
775845
}
776846

777847
// Arrays to hold the final commit request structure
@@ -805,26 +875,28 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
805875
if (partOffsets.length > 0) {
806876
let last = partOffsets[partOffsets.length - 1];
807877

808-
if (offset === last.end + 1n) {
878+
if (offset === last.end) {
809879
// If the new offset is consecutive to the last range, extend the range
810880
// This creates a continuous range (e.g. 1-5 instead of 1-4, 5)
811-
last.end = offset;
812-
} else if (offset > last.end + 1n) {
881+
last.end = offset + 1n;
882+
} else if (offset > last.end) {
813883
// If there's a gap between offsets, create a new range
814884
// This handles non-consecutive offsets properly
815-
partOffsets.push(create(OffsetsRangeSchema, { start: offset, end: offset }));
885+
partOffsets.push(create(OffsetsRangeSchema, { start: offset, end: offset + 1n }));
816886
} else {
817887
// If offset <= last.end, it's either out of order or a duplicate.
818888
throw new Error(`Message with offset ${offset} is out of order or duplicate for partition session ${partitionSession.partitionSessionId}`);
819889
}
820890
} else {
821891
// First offset for this partition, create initial range
822-
partOffsets.push(create(OffsetsRangeSchema, { start: offset, end: offset }));
892+
partOffsets.push(create(OffsetsRangeSchema, { start: offset, end: offset + 1n }));
823893
}
824894
}
825895

826896
// Convert our optimized Map structure into the API's expected format
827897
for (let [partitionSessionId, partOffsets] of offsets.entries()) {
898+
dbg('committing offsets for partition session %s: %o', partitionSessionId, partOffsets);
899+
828900
commitOffsets.push(create(StreamReadMessage_CommitOffsetRequest_PartitionCommitOffsetSchema, {
829901
partitionSessionId,
830902
offsets: partOffsets
@@ -839,114 +911,58 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
839911
commitOffsets
840912
}
841913
}
842-
}))
914+
}));
915+
916+
// Create a promise that resolves when the commit is acknowledged by the server.
917+
return new Promise((resolve, reject) => {
918+
for (let [partitionSessionId, partOffsets] of offsets.entries()) {
919+
// Create a commit promise for each partition session
920+
let commitPromise: TopicCommitPromise = {
921+
partitionSessionId,
922+
offset: partOffsets[partOffsets.length - 1].end, // Use the last offset in the range
923+
resolve,
924+
reject
925+
};
926+
927+
// Add to pending commits map
928+
if (!this.#pendingCommits.has(partitionSessionId)) {
929+
this.#pendingCommits.set(partitionSessionId, []);
930+
}
843931

844-
// // Lazily initialized promise - only created when someone calls .then() on the returned thenable
845-
// let lazyPromise: Promise<void> | undefined = undefined;
846-
847-
// // Return a "thenable" object (has a then method) rather than a real Promise
848-
// // This design allows for lazy promise initialization only when needed
849-
// // Return a thenable object that resolves when the server acknowledges the commit.
850-
// return {
851-
// // The then method is called when someone awaits or chains .then() on the returned object.
852-
// // eslint-disable-next-line no-thenable
853-
// then: (onFulfilled: (value: void) => any, onRejected?: (reason: any) => any) => {
854-
// // If an AbortSignal is provided and already aborted, reject immediately.
855-
// if (signal?.aborted) {
856-
// return Promise.reject(new Error('Commit aborted', { cause: signal.reason })).then(onFulfilled, onRejected);
857-
// }
858-
859-
// // If the promise was already created (multiple .then() calls), reuse it.
860-
// if (lazyPromise) {
861-
// return lazyPromise.then(onFulfilled, onRejected);
862-
// }
863-
864-
// // Create a new promise and store its resolve/reject functions.
865-
// let { promise, resolve, reject } = Promise.withResolvers<void>();
866-
// lazyPromise = promise;
867-
868-
// // Map to track which partition sessions and offsets we are waiting to be committed.
869-
// let waitingCommits: Map<bigint, bigint> = new Map();
870-
// for (let [partitionSessionId, partOffsets] of offsets.entries()) {
871-
// // Store the highest offset for each partition session.
872-
// waitingCommits.set(partitionSessionId, partOffsets[partOffsets.length - 1].end);
873-
// }
874-
875-
// // Handler for server messages: checks if the commitOffsetResponse covers all requested offsets.
876-
// let handle = (message: StreamReadMessage_FromServer) => {
877-
// if (message.serverMessage.case === 'commitOffsetResponse') {
878-
// for (let part of message.serverMessage.value.partitionsCommittedOffsets) {
879-
// if (!waitingCommits.has(part.partitionSessionId)) {
880-
// continue;
881-
// }
882-
// let committedOffset = waitingCommits.get(part.partitionSessionId)!
883-
// // If the server committed at least up to the requested offset, remove from waiting.
884-
// if (part.committedOffset >= committedOffset) {
885-
// waitingCommits.delete(part.partitionSessionId);
886-
// }
887-
// }
888-
// // If all partitions are committed, resolve the promise and cleanup listeners.
889-
// if (waitingCommits.size > 0) {
890-
// return;
891-
// }
892-
// resolve();
893-
// cleanup();
894-
// }
895-
// };
896-
897-
// // Handler for errors: reject the promise and cleanup listeners.
898-
// let handleError = (err: unknown) => {
899-
// reject(err);
900-
// cleanup();
901-
// };
902-
903-
// // Handler for stream end: reject the promise and cleanup listeners.
904-
// let handleEnd = () => {
905-
// reject(new Error('Stream closed'));
906-
// cleanup();
907-
// };
908-
909-
// // Handler for abort signal: reject the promise and cleanup listeners.
910-
// let handleAbort: (() => void) | undefined;
911-
// if (signal) {
912-
// handleAbort = () => {
913-
// reject(new Error('Commit aborted', { cause: signal.reason }));
914-
// cleanup();
915-
// };
916-
// if (signal.aborted) {
917-
// handleAbort();
918-
// return promise.then(onFulfilled, onRejected);
919-
// }
920-
// signal.addEventListener('abort', handleAbort);
921-
// }
922-
923-
// // Cleanup function to remove all listeners after resolution or rejection.
924-
// let cleanup = () => {
925-
// this.#fromServerEmitter.removeListener('message', handle);
926-
// this.#fromServerEmitter.removeListener('error', handleError);
927-
// this.#fromServerEmitter.removeListener('end', handleEnd);
928-
// if (signal && handleAbort) {
929-
// signal.removeEventListener('abort', handleAbort);
930-
// }
931-
// };
932-
933-
// // Register listeners for commit responses, errors, and stream end.
934-
// this.#fromServerEmitter.on('message', handle);
935-
// this.#fromServerEmitter.once('error', handleError);
936-
// this.#fromServerEmitter.once('end', handleEnd);
937-
938-
// // Return the promise, so .then() or await will work as expected.
939-
// return promise.then(onFulfilled, onRejected);
940-
// }
941-
// };
932+
// Push the commit promise to the pending commits for this partition session
933+
this.#pendingCommits.get(partitionSessionId)!.push(commitPromise);
934+
}
935+
});
942936
}
943937

944938
/**
945939
* Disposes the TopicReader instance, cleaning up resources and aborting the stream.
946940
* This method should be called when the reader is no longer needed to prevent memory leaks.
947941
*/
948942
dispose() {
943+
dbg('disposing TopicReader for consumer %s', this.#options.consumer)
944+
949945
this.#disposed = true
946+
this.#buffer.length = 0 // Clear the buffer to release memory
947+
948+
for (let partitionSession of this.#partitionSessions.values()) {
949+
// Stop all partition sessions gracefully
950+
partitionSession.stop();
951+
}
952+
953+
this.#partitionSessions.clear() // Clear partition sessions to release memory
954+
955+
for (let [partitionSessionId, pendingCommits] of this.#pendingCommits.entries()) {
956+
// Reject all pending commits for this partition session
957+
for (let commit of pendingCommits) {
958+
commit.reject(new Error(`Reader disposed, commit for partition session ${partitionSessionId} rejected`));
959+
}
960+
961+
this.#pendingCommits.delete(partitionSessionId); // Remove from pending commits
962+
}
963+
964+
this.#pendingCommits.clear() // Clear pending commits to release memory
965+
950966
this.#controller.abort()
951967
this.#fromClientEmitter.removeAllListeners()
952968
this.#fromServerEmitter.removeAllListeners()

0 commit comments

Comments
 (0)