Skip to content

Commit 83df56a

Browse files
committed
nats file read/write servers -- keep them running
- I really need to create a new NATS client that wraps ALL subscriptions with this sort of logic and has better semantics...
1 parent ceeca1e commit 83df56a

File tree

3 files changed

+39
-8
lines changed

3 files changed

+39
-8
lines changed

src/packages/nats/files/read.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ for await (const chunk of await a.readFile({project_id:'00847397-d6a8-4cb0-96a8-
4141
import { getEnv } from "@cocalc/nats/client";
4242
import { projectSubject } from "@cocalc/nats/names";
4343
import { Empty, headers, type Subscription } from "@nats-io/nats-core";
44+
import { runLoop } from "./util";
4445

4546
let subs: { [name: string]: Subscription } = {};
4647
export async function close({ project_id, compute_server_id, name = "" }) {
@@ -76,13 +77,16 @@ export async function createServer({
7677
return;
7778
}
7879
const { nc } = await getEnv();
79-
// console.log(subject);
80-
const sub = nc.subscribe(subject);
81-
subs[subject] = sub;
82-
listen(sub, createReadStream);
80+
runLoop({
81+
listen,
82+
subs,
83+
subject,
84+
nc,
85+
opts: { createReadStream },
86+
});
8387
}
8488

85-
async function listen(sub, createReadStream) {
89+
async function listen({ sub, createReadStream }) {
8690
// NOTE: we just handle as many messages as we get in parallel, so this
8791
// could be a large number of simultaneous downloads. These are all by
8892
// authenticated users of the project, and the load is on the project,

src/packages/nats/files/util.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { delay } from "awaiting";
2+
import { waitUntilConnected } from "@cocalc/nats/util";
3+
import { getLogger } from "@cocalc/nats/client";
4+
5+
const logger = getLogger("files:util");
6+
7+
export async function runLoop({ subs, listen, opts, subject, nc }) {
8+
while (true) {
9+
const sub = nc.subscribe(subject);
10+
subs[subject] = sub;
11+
try {
12+
await listen({ ...opts, sub });
13+
} catch (err) {
14+
logger.debug(`runLoop: error - ${err}`);
15+
}
16+
if (subs[subject] == null) return;
17+
await delay(3000 + Math.random());
18+
await waitUntilConnected();
19+
if (subs[subject] == null) return;
20+
logger.debug(`runLoop: will restart`);
21+
}
22+
}

src/packages/nats/files/write.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ import {
7373
import { projectSubject } from "@cocalc/nats/names";
7474
import { type Subscription } from "@nats-io/nats-core";
7575
import { type Readable } from "node:stream";
76+
import { runLoop } from "./util";
7677

7778
function getWriteSubject({ project_id, compute_server_id }) {
7879
return projectSubject({
@@ -111,9 +112,13 @@ export async function createServer({
111112
return;
112113
}
113114
const { nc } = await getEnv();
114-
sub = nc.subscribe(subject);
115-
subs[subject] = sub;
116-
listen({ sub, createWriteStream, project_id, compute_server_id });
115+
runLoop({
116+
listen,
117+
subs,
118+
subject,
119+
nc,
120+
opts: { createWriteStream, project_id, compute_server_id },
121+
});
117122
}
118123

119124
async function listen({

0 commit comments

Comments
 (0)