Skip to content

Commit 52016ff

Browse files
committed
conat: update dstream-ephemeral tests (and fix a bug)
1 parent f9d0ace commit 52016ff

File tree

5 files changed

+87
-43
lines changed

5 files changed

+87
-43
lines changed

src/packages/backend/conat/test/core/core-stream-ephemeral.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ describe("test creating a non-leader first, then the leader", () => {
249249
});
250250
});
251251

252+
// There a lot more similar tests of dstream ephemeral in backend/conat/test/sync/dstream-ephemeral.test.ts
252253
describe("test using ephemeral dstream", () => {
253254
let client;
254255
let stream;

src/packages/backend/conat/test/sync/estream.test.ts renamed to src/packages/backend/conat/test/sync/dstream-ephemeral.test.ts

Lines changed: 79 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,25 @@ The first tests are initially similar to those for dstream.test.ts, but with
66
77
DEVELOPMENT:
88
9-
pnpm test estream.test.ts
9+
pnpm test ./dstream-estream.test.ts
1010
1111
*/
1212

13+
import { connect, before, after } from "@cocalc/backend/conat/test/setup";
1314
import { createDstreamEphemeral as create } from "./util";
1415
import { dstream as createDstream0 } from "@cocalc/backend/conat/sync";
1516
import { once } from "@cocalc/util/async-utils";
1617

18+
beforeAll(before);
19+
1720
async function createDstream<T>(opts) {
18-
return await createDstream0<T>({ ephemeral: true, leader: true, ...opts });
21+
return await createDstream0<T>({
22+
noCache: true,
23+
noAutosave: true,
24+
ephemeral: true,
25+
leader: true,
26+
...opts,
27+
});
1928
}
2029

2130
describe("create a dstream and do some basic operations", () => {
@@ -65,14 +74,11 @@ describe("create a dstream and do some basic operations", () => {
6574
describe("create two dstreams and observe sync between them", () => {
6675
const name = `test-${Math.random()}`;
6776
let s1, s2;
77+
let client2;
6878
it("creates two distinct dstream objects s1 and s2 with the same name", async () => {
69-
s1 = await createDstream({ name, noAutosave: true, noCache: true });
70-
s2 = await createDstream({
71-
name,
72-
noAutosave: true,
73-
noCache: true,
74-
leader: false,
75-
});
79+
client2 = connect();
80+
s1 = await createDstream({ name });
81+
s2 = await createDstream({ client: client2, name, leader: false });
7682
// definitely distinct
7783
expect(s1 === s2).toBe(false);
7884
});
@@ -108,21 +114,50 @@ describe("create two dstreams and observe sync between them", () => {
108114
expect(s2.getAll()).toEqual(["hello", "hi from s2"]);
109115
});
110116

111-
it("write to s1 and s2 and save at the same time and see some 'random choice' of order gets imposed by the server", async () => {
117+
it("cleans up", () => {
118+
s1.close();
119+
s2.close();
120+
client2.close();
121+
});
122+
});
123+
124+
describe("create two dstreams and test sync with parallel save", () => {
125+
const name = `test-${Math.random()}`;
126+
let s1, s2;
127+
let client2;
128+
it("creates two distinct dstream objects s1 and s2 with the same name", async () => {
129+
client2 = connect();
130+
s1 = await createDstream({ name });
131+
s2 = await createDstream({ client: client2, name, leader: false });
132+
// definitely distinct
133+
expect(s1 === s2).toBe(false);
134+
});
135+
136+
it("write to s1 and s2 and save at the same time", async () => {
112137
s1.push("s1");
113138
s2.push("s2");
114139
// our changes are reflected locally
115-
expect(s1.getAll()).toEqual(["hello", "hi from s2", "s1"]);
116-
expect(s2.getAll()).toEqual(["hello", "hi from s2", "s2"]);
140+
expect(s1.getAll()).toEqual(["s1"]);
141+
expect(s2.getAll()).toEqual(["s2"]);
117142
// now kick off the two saves *in parallel*
118143
s1.save();
119144
s2.save();
120145
await once(s1, "change");
121-
while (s2.length != s1.length) {
122-
await once(s2, "change");
146+
while (s2.length != 2 || s1.length != 2) {
147+
if (s1.length > 2 || s2.length > 2) {
148+
throw Error("bug");
149+
}
150+
if (s2.length < 2) {
151+
await once(s2, "change");
152+
} else if (s1.length < 2) {
153+
await once(s1, "change");
154+
}
123155
}
124156
expect(s1.getAll()).toEqual(s2.getAll());
125-
expect(s1.getAll()).toEqual(["hello", "hi from s2", "s1", "s2"]);
157+
});
158+
159+
it("cleans up", () => {
160+
client2.close();
126161
});
127162
});
128163

@@ -195,7 +230,9 @@ describe("testing start_seq", () => {
195230

196231
let t;
197232
it("it opens another copy of the stream, but starting with the last sequence number, so only one message", async () => {
233+
const client = connect();
198234
t = await createDstream({
235+
client,
199236
name,
200237
noAutosave: true,
201238
leader: false,
@@ -227,8 +264,9 @@ describe("a little bit of a stress test", () => {
227264
s.push({ i });
228265
}
229266
expect(s.length).toBe(count);
230-
// NOTE: warning -- this is **MUCH SLOWER**, e.g., 10x slower,
231-
// running under jest, hence why count is small.
267+
// [ ] TODO rewrite this save to send everything in a single message
268+
// which gets chunked, will we be much faster, then change the count
269+
// above to 1000 or 10000.
232270
await s.save();
233271
expect(s.length).toBe(count);
234272
});
@@ -249,50 +287,52 @@ describe("dstream typescript test", () => {
249287
});
250288
});
251289

252-
import { numSubscriptions } from "@cocalc/conat/client";
290+
describe("ensure there isn't a really obvious subscription leak", () => {
291+
let client;
253292

254-
describe("ensure there are no NATS subscription leaks", () => {
255-
// There is some slight slack at some point due to the clock stuff,
256-
// inventory, etc. It is constant and small, whereas we allocate
257-
// a large number of kv's in the test.
258-
const SLACK = 4;
293+
it("create a client, which initially has only one subscription (the inbox)", async () => {
294+
client = connect();
295+
expect(client.numSubscriptions()).toBe(1);
296+
});
259297

260-
it("creates and closes many kv's and checks there is no leak", async () => {
261-
const before = numSubscriptions();
262-
const COUNT = 20;
298+
const count = 100;
299+
it(`creates and closes ${count} streams and checks there is no leak`, async () => {
300+
const before = client.numSubscriptions();
263301
// create
264302
const a: any = [];
265-
for (let i = 0; i < COUNT; i++) {
303+
for (let i = 0; i < count; i++) {
266304
a[i] = await createDstream({
267305
name: `${Math.random()}`,
268-
noAutosave: true,
269306
});
270307
}
271-
for (let i = 0; i < COUNT; i++) {
308+
for (let i = 0; i < count; i++) {
272309
await a[i].close();
273310
}
274-
const after = numSubscriptions();
275-
expect(Math.abs(after - before)).toBeLessThan(SLACK);
311+
const after = client.numSubscriptions();
312+
expect(after).toBe(before);
313+
314+
// also check count on server went down.
315+
expect((await client.getSubscriptions()).size).toBe(before);
276316
});
277317

278-
it("does another leak test, but with a set operation each time", async () => {
279-
const before = numSubscriptions();
280-
const COUNT = 20;
318+
it("does another leak test, but with a publish operation each time", async () => {
319+
const before = client.numSubscriptions();
281320
// create
282321
const a: any = [];
283-
for (let i = 0; i < COUNT; i++) {
322+
for (let i = 0; i < count; i++) {
284323
a[i] = await createDstream({
285324
name: `${Math.random()}`,
286325
noAutosave: true,
287326
});
288327
a[i].publish(i);
289328
await a[i].save();
290329
}
291-
for (let i = 0; i < COUNT; i++) {
292-
await a[i].purge();
330+
for (let i = 0; i < count; i++) {
293331
await a[i].close();
294332
}
295-
const after = numSubscriptions();
296-
expect(Math.abs(after - before)).toBeLessThan(SLACK);
333+
const after = client.numSubscriptions();
334+
expect(after).toBe(before);
297335
});
298336
});
337+
338+
afterAll(after);

src/packages/conat/core/client.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,8 @@ export class Client {
395395
}
396396
};
397397

398+
numSubscriptions = () => Object.keys(this.queueGroups).length;
399+
398400
private getSubscriptions = async (): Promise<Set<string>> => {
399401
const f = (cb) =>
400402
this.conn.emit("subscriptions", null, (subs) => cb(undefined, subs));

src/packages/conat/sync/core-stream.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ export const ENFORCE_LIMITS_THROTTLE_MS = process.env.COCALC_TEST_MODE
6767
const HEADER_PREFIX = "CoCalc-";
6868

6969
export const COCALC_MESSAGE_ID_HEADER = `${HEADER_PREFIX}Msg-Id`;
70-
7170
export const COCALC_STREAM_HEADER = `${HEADER_PREFIX}Stream`;
7271
export const COCALC_OPTIONS_HEADER = `${HEADER_PREFIX}Options`;
7372
export const COCALC_HEARTBEAT_HEADER = `${HEADER_PREFIX}Heartbeat`;
@@ -618,8 +617,8 @@ export class CoreStream<T = any> extends EventEmitter {
618617
seq,
619618
timestamp,
620619
sessionId: this.sessionId,
621-
msgID: options?.msgID,
622620
},
621+
[COCALC_MESSAGE_ID_HEADER]: options?.msgID,
623622
} as any;
624623
if (options?.headers) {
625624
for (const k in options.headers) {
@@ -719,7 +718,7 @@ export class CoreStream<T = any> extends EventEmitter {
719718

720719
// get server assigned time of n-th message in stream
721720
time = (n: number): Date | undefined => {
722-
const r = this.raw[n][0];
721+
const r = this.raw[n]?.[0];
723722
if (r == null) {
724723
return;
725724
}

src/packages/conat/sync/dstream.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import { waitUntilConnected } from "@cocalc/conat/util";
4545
import { type Msg } from "@nats-io/nats-core";
4646
import { headersFromRawMessages } from "./stream";
4747
import { COCALC_MESSAGE_ID_HEADER } from "./core-stream";
48+
import type { Client } from "@cocalc/conat/core/client";
4849

4950
const logger = getLogger("dstream");
5051

@@ -56,6 +57,7 @@ export interface DStreamOptions extends StreamOptions {
5657
ephemeral?: boolean;
5758
persist?: boolean;
5859
leader?: boolean;
60+
client?: Client;
5961
}
6062

6163
export class DStream<T = any> extends EventEmitter {
@@ -111,7 +113,7 @@ export class DStream<T = any> extends EventEmitter {
111113
delete this.saved[last(raw).seq];
112114
const headers = headersFromRawMessages(raw);
113115
if (headers?.[COCALC_MESSAGE_ID_HEADER]) {
114-
// this is critical with conat-stream.ts, since otherwise there is a moment
116+
// this is critical with core-stream.ts, since otherwise there is a moment
115117
// when the same message is in both this.local *and* this.messages, and you'll
116118
// see it doubled in this.getAll(). I didn't see this ever with
117119
// stream.ts, but maybe it is possible. It probably wouldn't impact any application,

0 commit comments

Comments
 (0)