Skip to content

Commit c7c2c26

Browse files
committed
feat: implement message batching and conditional last sequence number update in TopicWriter
Signed-off-by: Vladislav Polyakov <polRk@ydb.tech>
1 parent 1a4470b commit c7c2c26

File tree

1 file changed

+57
-2
lines changed

1 file changed

+57
-2
lines changed

packages/topic/src/writer.ts

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,63 @@ export class TopicWriter<Payload = Uint8Array> implements Disposable, AsyncDispo
229229
this.#initialized.resolve(); // Mark the writer as initialized
230230

231231
// Store the last sequence number from the server.
232-
this.#lastSeqNo = message.serverMessage.value.lastSeqNo;
233-
dbg('updating last sequence number to %s', this.#lastSeqNo);
232+
if (!this.#lastSeqNo) {
233+
dbg('setting last sequence number to %s from server response', message.serverMessage.value.lastSeqNo);
234+
this.#lastSeqNo = message.serverMessage.value.lastSeqNo;
235+
}
236+
237+
let messagesToSend: StreamWriteMessage_WriteRequest_MessageData[] = [];
238+
for (let seqNo of this.#inflight) {
239+
let message = this.#buffer.get(seqNo);
240+
if (!message) {
241+
dbg('message with seqNo %s not found in buffer, skipping', seqNo);
242+
continue; // Skip if the message is not found in the buffer
243+
}
244+
245+
messagesToSend.push(message); // Add the message to the list of messages to send
246+
}
247+
248+
// If inflight messages exist, send them.
249+
if (messagesToSend.length > 0) {
250+
dbg('sending %d in-flight messages after initialization', messagesToSend.length);
251+
252+
let batch: StreamWriteMessage_WriteRequest_MessageData[] = [];
253+
let batchSize = 0n;
254+
255+
// Build batch until size limit or no more messages
256+
while (messagesToSend.length > 0) {
257+
const message = messagesToSend[0];
258+
259+
// Check if adding this message would exceed the batch size limit
260+
if (batchSize + BigInt(message.data.length) > MAX_BATCH_SIZE) {
261+
// If the batch already has messages, send it
262+
if (batch.length > 0) break;
263+
264+
// If this is a single message exceeding the limit, we still need to send it
265+
dbg('large message of size %d bytes exceeds threshold, sending in its own batch', message.data.length);
266+
batch.push(messagesToSend.shift()!);
267+
break;
268+
}
269+
270+
// Add message to current batch
271+
batch.push(messagesToSend.shift()!);
272+
batchSize += BigInt(message.data.length);
273+
}
274+
275+
// Send the batch
276+
if (batch.length > 0) {
277+
dbg('sending batch of %d messages (%d bytes)', batch.length, batchSize);
278+
this.#fromClientEmitter.emit('message', create(StreamWriteMessage_FromClientSchema, {
279+
clientMessage: {
280+
case: 'writeRequest',
281+
value: {
282+
messages: batch,
283+
codec: this.#options.compression?.codec || Codec.RAW,
284+
}
285+
}
286+
}));
287+
}
288+
}
234289
}
235290

236291
if (message.serverMessage.case === 'writeResponse') {

0 commit comments

Comments
 (0)