Skip to content

Commit 771ef55

Browse files
committed
conat: get dstream tests working
1 parent 1492393 commit 771ef55

File tree

6 files changed

+82
-48
lines changed

6 files changed

+82
-48
lines changed

src/packages/backend/conat/test/core/connect.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
/*
2+
23
pnpm test ./connect.test.ts
4+
35
*/
46

57
import { getPort } from "@cocalc/backend/conat/test/util";
@@ -55,6 +57,8 @@ describe("create server *after* client and ensure connects properly", () => {
5557
});
5658

5759
describe("create server after sync creating a subscription and publishing a message, and observe that messages are dropped", () => {
60+
// The moral here is do NOT use subscribeSync and publishSync
61+
// unless you don't care very much...
5862
let cn;
5963
it("starts a client, despite there being no server yet", async () => {
6064
cn = connect(`http://localhost:${port}`, { path });
@@ -73,6 +77,7 @@ describe("create server after sync creating a subscription and publishing a mess
7377
it("start the server", async () => {
7478
server = await initConatServer({ port, path });
7579
await wait({ until: () => cn.conn.connected });
80+
await delay(50);
7681
});
7782

7883
it("see that both messages we sent before connecting were dropped", async () => {

src/packages/backend/conat/test/sync/dstream.test.ts

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ pnpm test ./dstream.test.ts
99

1010
import { createDstream as create } from "./util";
1111
import { dstream as createDstream } from "@cocalc/backend/conat/sync";
12+
import { once } from "@cocalc/util/async-utils";
13+
import { connect, before, after } from "@cocalc/backend/conat/test/setup";
14+
15+
beforeAll(before);
1216

1317
describe("create a dstream and do some basic operations", () => {
1418
let s;
@@ -54,7 +58,6 @@ describe("create a dstream and do some basic operations", () => {
5458
});
5559
});
5660

57-
/*
5861
describe("create two dstreams and observe sync between them", () => {
5962
const name = `test-${Math.random()}`;
6063
let s1, s2;
@@ -93,13 +96,16 @@ describe("create two dstreams and observe sync between them", () => {
9396
// now kick off the two saves *in parallel*
9497
s1.save();
9598
s2.save();
96-
await once(s1, "change");
97-
if (s2.length != s1.length) {
99+
while (s1.length < 4) {
100+
await once(s1, "change");
101+
}
102+
while (s2.length < 4) {
98103
await once(s2, "change");
99104
}
100105
expect(s1.getAll()).toEqual(s2.getAll());
101-
// in fact s1,s2 is the order since we called s1.save first:
102-
expect(s1.getAll()).toEqual(["hello", "hi from s2", "s1", "s2"]);
106+
expect(new Set(s1.getAll())).toEqual(
107+
new Set(["hello", "hi from s2", "s1", "s2"]),
108+
);
103109
});
104110
});
105111

@@ -156,7 +162,7 @@ describe("closing also saves by default, but not if autosave is off", () => {
156162

157163
it("creates stream and write a message", async () => {
158164
// noAutosave: false is the default:
159-
s = await createDstream({ name, noAutosave: false});
165+
s = await createDstream({ name, noAutosave: false });
160166
s.push(389);
161167
});
162168

@@ -210,7 +216,7 @@ describe("testing start_seq", () => {
210216
expect(s.start_seq).toEqual(seq[2]);
211217
});
212218

213-
it("it then pulls in the previous message, so now two messages are loaded", async () => {
219+
it.skip("it then pulls in the previous message, so now two messages are loaded", async () => {
214220
await s.load({ start_seq: seq[1] });
215221
expect(s.length).toBe(2);
216222
expect(s.getAll()).toEqual([2, 3]);
@@ -253,51 +259,52 @@ describe("dstream typescript test", () => {
253259
});
254260
});
255261

256-
import { numSubscriptions } from "@cocalc/conat/client";
262+
describe("ensure there isn't a really obvious subscription leak", () => {
263+
let client;
257264

258-
describe("ensure there are no NATS subscription leaks", () => {
259-
// There is some slight slack at some point due to the clock stuff,
260-
// inventory, etc. It is constant and small, whereas we allocate
261-
// a large number of kv's in the test.
262-
const SLACK = 4;
265+
it("create a client, which initially has only one subscription (the inbox)", async () => {
266+
client = connect();
267+
expect(client.numSubscriptions()).toBe(1);
268+
});
263269

264-
it("creates and closes many kv's and checks there is no leak", async () => {
265-
const before = numSubscriptions();
266-
const COUNT = 20;
270+
const count = 100;
271+
it(`creates and closes ${count} streams and checks there is no leak`, async () => {
272+
const before = client.numSubscriptions();
267273
// create
268274
const a: any = [];
269-
for (let i = 0; i < COUNT; i++) {
275+
for (let i = 0; i < count; i++) {
270276
a[i] = await createDstream({
271277
name: `${Math.random()}`,
272-
noAutosave: true,
273278
});
274279
}
275-
for (let i = 0; i < COUNT; i++) {
280+
for (let i = 0; i < count; i++) {
276281
await a[i].close();
277282
}
278-
const after = numSubscriptions();
279-
expect(Math.abs(after - before)).toBeLessThan(SLACK);
283+
const after = client.numSubscriptions();
284+
expect(after).toBe(before);
285+
286+
// also check count on server went down.
287+
expect((await client.getSubscriptions()).size).toBe(before);
280288
});
281289

282-
it("does another leak test, but with a set operation each time", async () => {
283-
const before = numSubscriptions();
284-
const COUNT = 20;
290+
it("does another leak test, but with a publish operation each time", async () => {
291+
const before = client.numSubscriptions();
285292
// create
286293
const a: any = [];
287-
for (let i = 0; i < COUNT; i++) {
294+
for (let i = 0; i < count; i++) {
288295
a[i] = await createDstream({
289296
name: `${Math.random()}`,
290297
noAutosave: true,
291298
});
292299
a[i].publish(i);
293300
await a[i].save();
294301
}
295-
for (let i = 0; i < COUNT; i++) {
296-
await a[i].purge();
302+
for (let i = 0; i < count; i++) {
297303
await a[i].close();
298304
}
299-
const after = numSubscriptions();
300-
expect(Math.abs(after - before)).toBeLessThan(SLACK);
305+
const after = client.numSubscriptions();
306+
expect(after).toBe(before);
301307
});
302308
});
303-
*/
309+
310+
afterAll(after);

src/packages/conat/persist/server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ async function listenPersist() {
204204
throw Error("must call init first");
205205
}
206206
for await (const mesg of sub) {
207-
console.log("got mesg = ", { data: mesg.data, headers: mesg.headers });
207+
//console.log("got mesg = ", { data: mesg.data, headers: mesg.headers });
208208
if (terminated) {
209209
return;
210210
}
@@ -221,7 +221,7 @@ function metrics() {
221221

222222
async function handleMessage(mesg) {
223223
const request = mesg.headers;
224-
console.log("handleMessage", { data: mesg.data, headers: mesg.headers });
224+
//console.log("handleMessage", { data: mesg.data, headers: mesg.headers });
225225
const user_id = getUserId(mesg.subject);
226226

227227
// [ ] TODO: permissions and sanity checks!
@@ -241,7 +241,7 @@ async function handleMessage(mesg) {
241241
mesg.respond({ resp });
242242
} else if (request.cmd == "get") {
243243
const resp = stream.get({ key: request.key, seq: request.seq });
244-
console.log("got resp = ", resp);
244+
//console.log("got resp = ", resp);
245245
if (resp == null) {
246246
mesg.respond(null);
247247
} else {
@@ -266,7 +266,7 @@ async function handleMessage(mesg) {
266266
}
267267

268268
async function getAll({ mesg, request, user_id, stream }) {
269-
console.log("getAll", request);
269+
//console.log("getAll", request);
270270
// getAll sends multiple responses
271271
let seq = 0;
272272
let lastSend = 0;
@@ -284,7 +284,7 @@ async function getAll({ mesg, request, user_id, stream }) {
284284
}
285285
lastSend = Date.now();
286286
if ((content as StoredMessage)?.raw != null) {
287-
console.log("content = ", content);
287+
// console.log("content = ", content);
288288
// StoredMessage
289289
const { raw, encoding, time, key, ...headers } = content as StoredMessage;
290290
mesg.respond(null, {

src/packages/conat/persist/storage.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,21 +172,21 @@ export class PersistentStream extends EventEmitter {
172172
}
173173
return this.db
174174
.prepare(
175-
"INSERT INTO messages(time, compress, encoding, raw, headers, key) VALUES (?, ?, ?, ?, ?, ?)",
175+
"INSERT INTO messages(time, compress, encoding, raw, headers, key) VALUES (?, ?, ?, ?, ?, ?) RETURNING seq",
176176
)
177-
.run(time / 1000, compress, encoding, raw, headers, key);
177+
.get(time / 1000, compress, encoding, raw, headers, key);
178178
},
179179
);
180-
const { lastInsertRowid } = tx(
180+
const row = tx(
181181
time,
182182
compressedRaw.compress,
183183
encoding,
184184
compressedRaw.raw,
185185
serializedHeaders,
186186
key,
187187
);
188+
const seq = Number((row as any).seq);
188189
// lastInsertRowid - is a bigint from sqlite, but we won't hit that limit
189-
const seq = Number(lastInsertRowid);
190190
this.emit("change", { seq, time, key, encoding, raw, headers });
191191
return { time, seq };
192192
};

src/packages/conat/sync/core-stream.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -254,12 +254,12 @@ export class CoreStream<T = any> extends EventEmitter {
254254
if (this.storage == null) {
255255
throw Error("bug -- storage must be set");
256256
}
257-
const { id, stream } = await persistClient.getAll({
257+
const { stream } = await persistClient.getAll({
258258
user: this.user,
259259
storage: this.storage,
260260
start_seq,
261261
});
262-
console.log("got persistent stream", { id });
262+
//console.log("got persistent stream", { id });
263263
this.persistStream = stream;
264264
while (true) {
265265
const { value, done } = await stream.next();
@@ -288,7 +288,11 @@ export class CoreStream<T = any> extends EventEmitter {
288288
// switched to watch mode
289289
return;
290290
}
291-
const { key, seq, time, headers } = m.headers;
291+
const { key, time, headers } = m.headers;
292+
if (headers == null) {
293+
throw Error("headers must be set");
294+
}
295+
const { seq } = headers;
292296
if (typeof seq != "number") {
293297
throw Error("seq must be a number");
294298
}
@@ -555,12 +559,27 @@ export class CoreStream<T = any> extends EventEmitter {
555559
if (this.storage == null) {
556560
throw Error("bug -- storage must be set");
557561
}
558-
return await persistClient.set({
562+
if (options?.msgID && this.msgIDs.has(options.msgID)) {
563+
// it's a dup
564+
return;
565+
}
566+
const x = await persistClient.set({
559567
user: this.user,
560568
storage: this.storage,
561569
key: options?.key,
562-
messageData: messageData(mesg, { headers: options?.headers }),
570+
messageData: messageData(mesg, {
571+
headers: {
572+
...options?.headers,
573+
...(options?.msgID
574+
? { [COCALC_MESSAGE_ID_HEADER]: options?.msgID }
575+
: undefined),
576+
},
577+
}),
563578
});
579+
if (options?.msgID) {
580+
this.msgIDs.add(options.msgID);
581+
}
582+
return x;
564583
} else if (this.leader) {
565584
// sending from leader -- so assign seq, timestamp and send it out.
566585
return await this.sendAsLeader(data, options);

src/packages/conat/sync/dstream.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ import { map as awaitMap } from "awaiting";
2929
import { isNumericString } from "@cocalc/util/misc";
3030
import refCache from "@cocalc/util/refcache";
3131
import { encodeBase64 } from "@cocalc/conat/util";
32-
import { COCALC_MESSAGE_ID_HEADER } from "./core-stream";
3332
import type { Client, Headers } from "@cocalc/conat/core/client";
3433
import jsonStableStringify from "json-stable-stringify";
3534
import type { JSONValue } from "@cocalc/util/types";
3635
import { type ValueType } from "@cocalc/conat/types";
36+
import { COCALC_MESSAGE_ID_HEADER } from "./core-stream";
3737

3838
const MAX_PARALLEL = 50;
3939

@@ -237,9 +237,9 @@ export class DStream<T = any> extends EventEmitter {
237237

238238
publish = (
239239
mesg: T,
240-
// NOTE: if you call this.headers(n) it is NOT visible until the publish is confirmed.
241-
// This could be changed with more work if it matters.
242-
options?: { headers?: { [key: string]: string } },
240+
// NOTE: if you call this.headers(n) it is NOT visible until
241+
// the publish is confirmed. This could be changed with more work if it matters.
242+
options?: { headers?: Headers },
243243
): void => {
244244
const id = randomId();
245245
this.local[id] = mesg;
@@ -312,6 +312,9 @@ export class DStream<T = any> extends EventEmitter {
312312
...this.publishOptions[id],
313313
msgID: id,
314314
});
315+
if (this.raw == null) {
316+
return;
317+
}
315318
if ((last(this.raw[this.raw.length - 1])?.seq ?? -1) < seq) {
316319
// it still isn't in this.raw
317320
this.saved[seq] = mesg;

0 commit comments

Comments
 (0)