Skip to content

Commit d22b6a1

Browse files
committed
conat: implement sending entire state of ephemeral stream in one message instead of potentially a large number of small ones
1 parent e545fc0 commit d22b6a1

File tree

1 file changed

+20
-26
lines changed

1 file changed

+20
-26
lines changed

src/packages/nats/sync/core-stream.ts

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import { type ValueType } from "@cocalc/nats/types";
3535
import { EventEmitter } from "events";
3636
import {
3737
type Subscription,
38-
type Message as Msg,
38+
Message,
3939
type Headers,
4040
messageData,
4141
} from "@cocalc/nats/core/client";
@@ -54,7 +54,7 @@ import * as persistClient from "@cocalc/nats/persist/client";
5454
import type { Client } from "@cocalc/nats/core/client";
5555
import jsonStableStringify from "json-stable-stringify";
5656

57-
export interface RawMsg extends Msg {
57+
export interface RawMsg extends Message {
5858
timestamp: number;
5959
seq: number;
6060
sessionId: string;
@@ -267,7 +267,7 @@ export class CoreStream<T = any> extends EventEmitter {
267267
if (done || value == null) {
268268
return;
269269
}
270-
const m = value as Msg;
270+
const m = value as Message;
271271
if (m.headers == null) {
272272
throw Error("missing header");
273273
}
@@ -280,7 +280,7 @@ export class CoreStream<T = any> extends EventEmitter {
280280
}
281281
};
282282

283-
private processPersistentMessage = (m: Msg, noEmit: boolean) => {
283+
private processPersistentMessage = (m: Message, noEmit: boolean) => {
284284
if (m.headers == null) {
285285
throw Error("missing header");
286286
}
@@ -315,7 +315,6 @@ export class CoreStream<T = any> extends EventEmitter {
315315
};
316316

317317
private getAllFromLeader = async ({
318-
maxWait = 30000,
319318
start_seq = 0,
320319
noEmit,
321320
}: { maxWait?: number; start_seq?: number; noEmit?: boolean } = {}) => {
@@ -332,17 +331,12 @@ export class CoreStream<T = any> extends EventEmitter {
332331
let d = 250;
333332
while (this.client != null) {
334333
try {
335-
for await (const raw0 of await this.client.requestMany(
336-
this.subject + ".all",
337-
{ start_seq },
338-
{ maxWait },
339-
)) {
340-
this.lastHeartbeat = Date.now();
341-
if (raw0.headers?.done) {
342-
// done
343-
return;
344-
}
345-
const raw = getRawMsg(raw0);
334+
const resp = await this.client.request(this.subject + ".all", {
335+
start_seq,
336+
});
337+
this.lastHeartbeat = Date.now();
338+
for (const x of resp.data) {
339+
const raw = getRawMsg(new Message(x));
346340
if (
347341
!this.leader &&
348342
this.sessionId &&
@@ -396,14 +390,15 @@ export class CoreStream<T = any> extends EventEmitter {
396390
for await (const raw of sub) {
397391
if (raw.subject.endsWith(".all")) {
398392
const { start_seq = 0 } = raw.data ?? {};
399-
//const payload = this.raw.filter((x)=>x.seq >= start_seq)
400-
for (const [m] of this.raw) {
401-
if (m.seq >= start_seq) {
402-
raw.respond(m.data, { headers: m.headers });
403-
}
404-
}
405-
raw.respond(null, { headers: { done: true } });
406-
continue;
393+
394+
const payload = this.raw
395+
.filter((x) => x[0].seq >= start_seq)
396+
.map((x) => {
397+
const { headers, encoding, raw } = x[0];
398+
return { headers, encoding, raw };
399+
});
400+
401+
raw.respond(payload);
407402
} else if (raw.subject.endsWith(".send")) {
408403
const options = raw.headers?.[COCALC_OPTIONS_HEADER];
409404
let resp;
@@ -414,7 +409,6 @@ export class CoreStream<T = any> extends EventEmitter {
414409
return;
415410
}
416411
raw.respond(resp);
417-
continue;
418412
}
419413
}
420414
};
@@ -828,7 +822,7 @@ export async function cstream<T>(
828822
return await cache(options);
829823
}
830824

831-
function getRawMsg(raw: Msg): RawMsg {
825+
function getRawMsg(raw: Message): RawMsg {
832826
const {
833827
seq = 0,
834828
timestamp = 0,

0 commit comments

Comments
 (0)