Skip to content

Commit 40a80a2

Browse files
committed
conat: more unit tests of dstream
1 parent d22b6a1 commit 40a80a2

File tree

3 files changed

+72
-20
lines changed

3 files changed

+72
-20
lines changed

src/packages/nats/sync/dstream.ts

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import { COCALC_MESSAGE_ID_HEADER } from "./core-stream";
4848

4949
const logger = getLogger("dstream");
5050

51-
const MAX_PARALLEL = 250;
51+
const MAX_PARALLEL = 50;
5252

5353
export interface DStreamOptions extends StreamOptions {
5454
noAutosave?: boolean;
@@ -75,7 +75,6 @@ export class DStream<T = any> extends EventEmitter {
7575

7676
constructor(opts: DStreamOptions) {
7777
super();
78-
7978
if (
8079
opts.noInventory ||
8180
opts.ephemeral ||
@@ -88,12 +87,10 @@ export class DStream<T = any> extends EventEmitter {
8887
this.noAutosave = !!opts.noAutosave;
8988
this.name = opts.name;
9089
this.stream =
91-
opts.ephemeral || opts.persist
92-
? new CoreStream(opts)
93-
: new Stream(opts);
90+
opts.ephemeral || opts.persist ? new CoreStream(opts) : new Stream(opts);
9491
this.messages = this.stream.messages;
9592
this.raw = this.stream.raw;
96-
if (!this.noAutosave) {
93+
if (!opts.ephemeral && !this.noAutosave) {
9794
this.client = getClient();
9895
this.client.on("connected", this.save);
9996
}
@@ -242,7 +239,7 @@ export class DStream<T = any> extends EventEmitter {
242239
}
243240
return obj;
244241
}
245-
return this.opts.env.jc.decode(this.opts.env.jc.encode(obj));
242+
return obj;
246243
};
247244

248245
publish = (
@@ -453,12 +450,20 @@ export class DStream<T = any> extends EventEmitter {
453450
);
454451
}
455452

456-
export const cache = refCache<UserStreamOptions, DStream>({
453+
type CreateOptions = UserStreamOptions & {
454+
noAutosave?: boolean;
455+
noInventory?: boolean;
456+
leader?: boolean;
457+
ephemeral?: boolean;
458+
persist?: boolean;
459+
};
460+
461+
export const cache = refCache<CreateOptions, DStream>({
457462
name: "dstream",
458463
createKey: userStreamOptionsKey,
459464
createObject: async (options) => {
460-
await waitUntilConnected();
461-
if (options.env == null) {
465+
if (options.env == null && !options.ephemeral) {
466+
await waitUntilConnected();
462467
options.env = await getEnv();
463468
}
464469
const { account_id, project_id, name, valueType = "json" } = options;
@@ -483,14 +488,6 @@ export const cache = refCache<UserStreamOptions, DStream>({
483488
},
484489
});
485490

486-
export async function dstream<T>(
487-
options: UserStreamOptions & {
488-
noAutosave?: boolean;
489-
noInventory?: boolean;
490-
leader?: boolean;
491-
ephemeral?: boolean;
492-
persist?: boolean;
493-
},
494-
): Promise<DStream<T>> {
491+
export async function dstream<T>(options: CreateOptions): Promise<DStream<T>> {
495492
return await cache(options);
496493
}

src/packages/nats/sync/stream.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ import jsonStableStringify from "json-stable-stringify";
7070
import { asyncDebounce } from "@cocalc/util/async-utils";
7171
import { waitUntilReady } from "@cocalc/nats/tiered-storage/client";
7272
import { COCALC_MESSAGE_ID_HEADER } from "./core-stream";
73+
import type { Client } from "@cocalc/nats/core/client";
7374

7475
interface RawMsg extends Msg {
7576
timestamp: number;
@@ -921,13 +922,14 @@ export interface UserStreamOptions {
921922
noCache?: boolean;
922923
desc?: JSONValue;
923924
valueType?: ValueType;
925+
client?: Client;
924926
}
925927

926928
export function userStreamOptionsKey(options: UserStreamOptions) {
927929
if (!options.name) {
928930
throw Error("name must be specified");
929931
}
930-
const { env, ...x } = options;
932+
const { env, client, ...x } = options;
931933
return jsonStableStringify(x);
932934
}
933935

src/packages/server/nats/test/core/core-stream-ephemeral.test.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
import { wait } from "@cocalc/server/nats/test/util";
88
import type { Client } from "@cocalc/nats/core/client";
99
import { is_date as isDate } from "@cocalc/util/misc";
10+
import { dstream } from "@cocalc/nats/sync/dstream";
1011

1112
beforeAll(before);
1213

@@ -248,4 +249,56 @@ describe("test creating a non-leader first, then the leader", () => {
248249
});
249250
});
250251

252+
describe("test using ephemeral dstream", () => {
253+
let client;
254+
let stream;
255+
let name = `test-${Math.random()}`;
256+
257+
it("creates an ephemeral dstream", async () => {
258+
client = connect();
259+
stream = await dstream({ client, name, ephemeral: true, leader: true });
260+
expect(stream.length).toBe(0);
261+
});
262+
263+
it("publishes a value", () => {
264+
stream.publish(0);
265+
expect(stream.getAll()).toEqual([0]);
266+
});
267+
268+
// [ ] TODO: this is be fast even for count=10000,
269+
// but it is NOT. We use a smaller value for now.
270+
const count = 100;
271+
it(`publish ${count} messages`, async () => {
272+
const v: number[] = [0];
273+
for (let i = 1; i < count; i++) {
274+
stream.publish(i);
275+
v.push(i);
276+
expect(stream.get(i)).toBe(i);
277+
expect(stream.length).toBe(i + 1);
278+
}
279+
expect(stream.length).toBe(count);
280+
expect(stream.getAll()).toEqual(v);
281+
await stream.save();
282+
});
283+
284+
let client2;
285+
let stream2;
286+
it("opens a second dstream non-leader", async () => {
287+
client2 = connect();
288+
stream2 = await dstream({
289+
client: client2,
290+
name,
291+
ephemeral: true,
292+
leader: true,
293+
});
294+
expect(stream2.length).toBe(count);
295+
});
296+
297+
it("write to the second stream and see reflected in the first", async () => {
298+
await stream2.publish("x");
299+
wait({ until: () => stream.length == 101 });
300+
expect(stream.get(stream.length - 1)).toBe("x");
301+
});
302+
});
303+
251304
afterAll(after);

0 commit comments

Comments
 (0)