@@ -5,7 +5,7 @@ import { nextTick } from "node:process";
5
5
import { create , protoInt64 , toJson } from "@bufbuild/protobuf" ;
6
6
import { type Duration , DurationSchema , type Timestamp , timestampDate , timestampFromDate } from "@bufbuild/protobuf/wkt" ;
7
7
import { StatusIds_StatusCode } from "@ydbjs/api/operation" ;
8
- import { type OffsetsRange , OffsetsRangeSchema , type StreamReadMessage_CommitOffsetRequest_PartitionCommitOffset , StreamReadMessage_CommitOffsetRequest_PartitionCommitOffsetSchema , type StreamReadMessage_FromClient , StreamReadMessage_FromClientSchema , type StreamReadMessage_FromServer , StreamReadMessage_FromServerSchema , type StreamReadMessage_InitRequest_TopicReadSettings , StreamReadMessage_InitRequest_TopicReadSettingsSchema , type StreamReadMessage_ReadResponse , TopicServiceDefinition } from "@ydbjs/api/topic" ;
8
+ import { Codec , type OffsetsRange , OffsetsRangeSchema , type StreamReadMessage_CommitOffsetRequest_PartitionCommitOffset , StreamReadMessage_CommitOffsetRequest_PartitionCommitOffsetSchema , type StreamReadMessage_FromClient , StreamReadMessage_FromClientSchema , type StreamReadMessage_FromServer , StreamReadMessage_FromServerSchema , type StreamReadMessage_InitRequest_TopicReadSettings , StreamReadMessage_InitRequest_TopicReadSettingsSchema , type StreamReadMessage_ReadResponse , TopicServiceDefinition } from "@ydbjs/api/topic" ;
9
9
import type { Driver } from "@ydbjs/core" ;
10
10
import { YDBError } from "@ydbjs/error" ;
11
11
import { type RetryConfig , retry } from "@ydbjs/retry" ;
@@ -91,15 +91,33 @@ type TopicCommitPromise = {
91
91
}
92
92
93
93
export type TopicReaderOptions < Payload = Uint8Array > = {
94
+ // Topic to read from: a topic path, a single topic source, or an array of topic sources.
94
95
topic : string | TopicReaderSource | TopicReaderSource [ ]
96
+ // Name of the consumer.
95
97
consumer : string
98
+ // Maximum size of the internal buffer in bytes.
99
+ // If not provided, the default is 1MB.
96
100
maxBufferBytes ?: bigint
101
+ // Interval, in milliseconds, between token updates.
97
102
updateTokenIntervalMs ?: number
98
-
103
+ // Compression options for the payload.
104
+ compression ?: {
105
+ // Custom decompression function, called with the batch codec and the raw message data before the payload is emitted; it may return the result directly or as a Promise.
106
+ decompress ?( codec : Codec , payload : Uint8Array ) : Uint8Array | Promise < Uint8Array >
107
+ }
108
+ // Custom decode function that converts the payload bytes into the Payload type.
109
+ // If not provided, the payload is returned as is. NOTE(review): the reader falls back with `||`, so a falsy decode result also yields the raw bytes; confirm this is intended.
110
+ // The decode function is called after decompression, if compression is used.
99
111
decode ?( payload : Uint8Array ) : Payload
100
-
112
+ // Hooks for partition session events.
113
+ // Called when a partition session is started.
114
+ // It can be used to initialize the partition session, for example, to set the read offset.
101
115
onPartitionSessionStart ?: onPartitionSessionStartCallback
116
+ // Called when a partition session is stopped.
117
+ // It can be used to commit the offsets for the partition session.
102
118
onPartitionSessionStop ?: onPartitionSessionStopCallback
119
+ // Called when a commit offset response is received from the server.
120
+ // This callback is called after the offsets are committed to the server.
103
121
onCommittedOffset ?: onCommittedOffsetCallback
104
122
}
105
123
@@ -250,6 +268,11 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
250
268
}
251
269
}
252
270
271
+ // If there are no pending commits for this partition session, remove it from the map.
272
+ if ( pendingCommits && pendingCommits . length === 0 ) {
273
+ this . #pendingCommits. delete ( partitionSession . partitionSessionId ) ;
274
+ }
275
+
253
276
this . #fromClientEmitter. emit ( 'message' , create ( StreamReadMessage_FromClientSchema , {
254
277
clientMessage : {
255
278
case : 'startPartitionSessionResponse' ,
@@ -576,7 +599,7 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
576
599
read ( options : { limit ?: number , waitMs ?: number , signal ?: AbortSignal } = { } ) : AsyncIterable < TopicMessage < Payload > [ ] > {
577
600
let limit = options . limit || Infinity ,
578
601
signal = options . signal ,
579
- waitMs = options . waitMs || 60000 ;
602
+ waitMs = options . waitMs || 60_000 ;
580
603
581
604
// Check if the reader has been disposed, cannot read with disposed reader
582
605
if ( this . #disposed) {
@@ -750,14 +773,34 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
750
773
break ;
751
774
}
752
775
776
+ let data = msg . data ;
777
+ if ( batch . codec !== Codec . UNSPECIFIED ) {
778
+ if ( ! this . #options. compression || ! this . #options. compression . decompress ) {
779
+ dbg ( 'error: cannot decompress message with codec %s, no decompression function provided' , batch . codec ) ;
780
+
781
+ throw new Error ( `Cannot decompress message with codec ${ batch . codec } , no decompression function provided` ) ;
782
+ }
783
+
784
+ // Decompress the message data using the provided decompress function
785
+ try {
786
+ // eslint-disable-next-line no-await-in-loop
787
+ data = await this . #options. compression . decompress ( batch . codec , msg . data ) ;
788
+ } catch ( err ) {
789
+ dbg ( 'error: decompression failed for message with codec %s: %O' , batch . codec , err ) ;
790
+
791
+ throw new Error ( `Decompression failed for message with codec ${ batch . codec } ` , { cause : err } ) ;
792
+ }
793
+ }
794
+
753
795
// Process the message
754
796
let message : TopicMessage < Payload > = {
755
797
partitionSessionId : partitionSession . partitionSessionId ,
756
798
partitionId : partitionSession . partitionId ,
757
799
producerId : batch . producerId ,
758
800
seqNo : msg . seqNo ,
759
801
offset : msg . offset ,
760
- payload : this . #options. decode ! ( msg . data ) ,
802
+ payload : this . #options. decode ?.( data ) || data as Payload ,
803
+ uncompressedSize : msg . uncompressedSize ,
761
804
createdAt : msg . createdAt ? timestampDate ( msg . createdAt ) : undefined ,
762
805
writtenAt : batch . writtenAt ? timestampDate ( batch . writtenAt ) : undefined ,
763
806
metadataItems : msg . metadataItems ? Object . fromEntries ( msg . metadataItems . map ( item => [ item . key , item . value ] ) ) : undefined ,
@@ -940,10 +983,14 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
940
983
* This method should be called when the reader is no longer needed to prevent memory leaks.
941
984
*/
942
985
dispose ( ) {
943
- dbg ( 'disposing TopicReader for consumer %s' , this . #options. consumer )
986
+ if ( this . #disposed) {
987
+ return ; // Already disposed, nothing to do
988
+ }
989
+ this . #disposed = true ;
990
+ dbg ( 'disposing TopicReader for consumer %s' , this . #options. consumer ) ;
944
991
945
- this . #disposed = true
946
992
this . #buffer. length = 0 // Clear the buffer to release memory
993
+ this . #freeBufferSize = this . #maxBufferSize; // Reset free buffer size to max buffer size
947
994
948
995
for ( let partitionSession of this . #partitionSessions. values ( ) ) {
949
996
// Stop all partition sessions gracefully
@@ -963,11 +1010,11 @@ export class TopicReader<Payload = Uint8Array> implements Disposable {
963
1010
964
1011
this . #pendingCommits. clear ( ) // Clear pending commits to release memory
965
1012
966
- this . #controller. abort ( )
967
1013
this . #fromClientEmitter. removeAllListeners ( )
968
1014
this . #fromServerEmitter. removeAllListeners ( )
969
1015
970
1016
clearInterval ( this . #updateTokenTicker)
1017
+ this . #controller. abort ( )
971
1018
}
972
1019
973
1020
[ Symbol . dispose ] ( ) {
0 commit comments