Skip to content

Commit 1492393

Browse files
committed
conat dstream: fix some bugs and also start getting testing framework for persistent working
1 parent 52016ff commit 1492393

File tree

9 files changed

+137
-116
lines changed

9 files changed

+137
-116
lines changed

src/packages/backend/conat/test/setup.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ import { Server } from "socket.io";
99
import getLogger from "@cocalc/backend/logger";
1010
import { setNatsClient } from "@cocalc/conat/client";
1111
import { sha1 } from "@cocalc/backend/sha1";
12+
import {
13+
initServer as initPersistServer,
14+
terminateServer as terminatePersistServer,
15+
} from "@cocalc/backend/conat/persist";
16+
import { syncFiles } from "@cocalc/conat/persist/context";
17+
import { mkdtemp, rm } from "node:fs/promises";
18+
import { tmpdir } from "node:os";
19+
import { join } from "path";
1220

1321
const logger = getLogger("conat:test:setup");
1422

@@ -33,10 +41,16 @@ export async function initConatServer(
3341
export let server;
3442
export let port;
3543
export let address;
44+
export let tempDir;
45+
3646
export async function before() {
47+
tempDir = await mkdtemp(join(tmpdir(), "conat-test"));
3748
port = await getPort();
3849
address = `http://localhost:${port}`;
3950
server = await initConatServer({ port, path });
51+
syncFiles.local = join(tempDir, "local");
52+
syncFiles.archive = join(tempDir, "archive");
53+
initPersistServer({ client: server.client() });
4054
setNatsClient({
4155
getNatsEnv: async () => {
4256
return { cn: connect(), sha1 } as any;
@@ -53,6 +67,8 @@ export function connect() {
5367
}
5468

5569
export async function after() {
70+
terminatePersistServer();
71+
await rm(tempDir, { force: true, recursive: true });
5672
await server.close();
5773
for (const cn of clients) {
5874
cn.close();

src/packages/backend/conat/test/sync/dstream.test.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
/*
2-
Testing basic ops with dsteam (distributed streams)
2+
Testing basic ops with *persistent* dstreams.
33
44
DEVELOPMENT:
55
6-
pnpm test dstream.test.ts
6+
pnpm test ./dstream.test.ts
77
88
*/
99

1010
import { createDstream as create } from "./util";
1111
import { dstream as createDstream } from "@cocalc/backend/conat/sync";
12-
import { once } from "@cocalc/util/async-utils";
1312

1413
describe("create a dstream and do some basic operations", () => {
1514
let s;
@@ -55,6 +54,7 @@ describe("create a dstream and do some basic operations", () => {
5554
});
5655
});
5756

57+
/*
5858
describe("create two dstreams and observe sync between them", () => {
5959
const name = `test-${Math.random()}`;
6060
let s1, s2;
@@ -155,7 +155,8 @@ describe("closing also saves by default, but not if autosave is off", () => {
155155
const name = `test-${Math.random()}`;
156156
157157
it("creates stream and write a message", async () => {
158-
s = await createDstream({ name, noAutosave: false /* the default */ });
158+
// noAutosave: false is the default:
159+
s = await createDstream({ name, noAutosave: false});
159160
s.push(389);
160161
});
161162
@@ -299,3 +300,4 @@ describe("ensure there are no NATS subscription leaks", () => {
299300
expect(Math.abs(after - before)).toBeLessThan(SLACK);
300301
});
301302
});
303+
*/

src/packages/conat/persist/server.ts

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ Also getAll using start_seq:
4242

4343
import { pstream, type Message as StoredMessage } from "./storage";
4444
import { getEnv } from "@cocalc/conat/client";
45-
import { type Subscription } from "@cocalc/conat/core/client";
45+
import { type Client, type Subscription } from "@cocalc/conat/core/client";
4646
import { uuid } from "@cocalc/util/misc";
4747
import { getLogger } from "@cocalc/conat/client";
4848
import { delay } from "awaiting";
@@ -67,9 +67,7 @@ export const LAST_CHUNK = "last-chunk";
6767

6868
const logger = getLogger("persist:server");
6969

70-
export const SUBJECT = process.env.COCALC_TEST_MODE
71-
? "persist-test"
72-
: "persist";
70+
export const SUBJECT = "persist";
7371

7472
export type User = { account_id?: string; project_id?: string };
7573
export function persistSubject({ account_id, project_id }: User) {
@@ -108,7 +106,7 @@ function getUserId(subject: string): string {
108106

109107
let terminated = false;
110108
let sub: Subscription | null = null;
111-
export function init() {
109+
export async function init({ client }: { client?: Client } = {}) {
112110
logger.debug("starting persist server");
113111
logger.debug({
114112
DEFAULT_LIFETIME,
@@ -120,8 +118,9 @@ export function init() {
120118
MAX_PERSISTS_PER_SERVER,
121119
SUBJECT,
122120
});
123-
persistService();
124-
renewService();
121+
client = client ?? (await getEnv()).cn;
122+
persistService({ client });
123+
renewService({ client });
125124
}
126125

127126
async function noThrow(f) {
@@ -132,16 +131,14 @@ async function noThrow(f) {
132131
}
133132
}
134133

135-
async function persistService() {
136-
const { cn } = await getEnv();
137-
sub = await cn.subscribe(`${SUBJECT}.*.api`, { queue: "q" });
134+
async function persistService({ client }) {
135+
sub = await client.subscribe(`${SUBJECT}.*.api`, { queue: "q" });
138136
await listenPersist();
139137
}
140138

141139
let renew: Subscription | null = null;
142-
async function renewService() {
143-
const { cn } = await getEnv();
144-
renew = await cn.subscribe(`${SUBJECT}.*.renew`);
140+
async function renewService({ client }) {
141+
renew = await client.subscribe(`${SUBJECT}.*.renew`);
145142
await listenRenew();
146143
}
147144

@@ -383,16 +380,15 @@ async function getAll({ mesg, request, user_id, stream }) {
383380
}
384381

385382
try {
386-
console.log("send the id", { id, lifetime });
387383
await respond(undefined, { id, lifetime });
388384
if (done) {
389385
return;
390386
}
391387

392388
// send the current data
393389
// [ ] TODO: should we just send it all as a single message?
394-
// much faster, but uses much more RAM. Maybe some
395-
// combination based on actual data!
390+
// much faster, but uses much more RAM. **Instead, obviously
391+
// some combination based on actual data!**
396392
for (const message of stream.getAll({ start_seq: request.start_seq })) {
397393
if (done) {
398394
return;

src/packages/conat/persist/storage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ export interface Message {
7474
}
7575

7676
export interface Options {
77-
// relative path to sqlite database file. This needs to be a valid filename
77+
// absolute path to sqlite database file. This needs to be a valid filename
7878
// path, and must also be kept under 1K so it can be stored in cloud storage.
7979
path: string;
8080
// if false (the default) do not require sync writes to disk on every set

src/packages/conat/sync/core-stream.ts

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ export class CoreStream<T = any> extends EventEmitter {
259259
storage: this.storage,
260260
start_seq,
261261
});
262-
console.log("getAll got ", { id });
262+
console.log("got persistent stream", { id });
263263
this.persistStream = stream;
264264
while (true) {
265265
const { value, done } = await stream.next();
@@ -388,17 +388,24 @@ export class CoreStream<T = any> extends EventEmitter {
388388
private serveUntilDone = async (sub) => {
389389
for await (const raw of sub) {
390390
if (raw.subject.endsWith(".all")) {
391+
// batch get
392+
391393
const { start_seq = 0 } = raw.data ?? {};
392394

395+
// put exactly the entire data the client needs to get updated
396+
// into a single payload
393397
const payload = this.raw
394398
.filter((x) => x[0].seq >= start_seq)
395399
.map((x) => {
396400
const { headers, encoding, raw } = x[0];
397401
return { headers, encoding, raw };
398402
});
399403

404+
// send it out as a single response.
400405
raw.respond(payload);
401406
} else if (raw.subject.endsWith(".send")) {
407+
// single send: ([ ] TODO need to support a batch send)
408+
402409
const options = raw.headers?.[COCALC_OPTIONS_HEADER];
403410
let resp;
404411
try {
@@ -445,12 +452,7 @@ export class CoreStream<T = any> extends EventEmitter {
445452
return;
446453
}
447454
if (this.persist) {
448-
if (this.persistStream == null) {
449-
throw Error("persistentStream must be defined");
450-
}
451-
for await (const m of this.persistStream) {
452-
this.processPersistentMessage(m, false);
453-
}
455+
this.listenLoopPersist();
454456
return;
455457
} else {
456458
this.sub = await this.client.subscribe(this.subject);
@@ -459,6 +461,19 @@ export class CoreStream<T = any> extends EventEmitter {
459461
this.enforceLimits();
460462
};
461463

464+
private listenLoopPersist = async () => {
465+
if (this.persistStream == null) {
466+
throw Error("persistentStream must be defined");
467+
}
468+
for await (const m of this.persistStream) {
469+
try {
470+
this.processPersistentMessage(m, false);
471+
} catch (err) {
472+
console.log(`WARNING: issue processing persistent message -- ${err}`);
473+
}
474+
}
475+
};
476+
462477
private listenLoop = async () => {
463478
if (this.sub == null) {
464479
throw Error("subscription must be setup");

0 commit comments

Comments
 (0)