Skip to content

Commit 7c0ff55

Browse files
committed
conat: migrating time test; nailing down order of subscription startup
1 parent d5024e9 commit 7c0ff55

File tree

7 files changed

+74
-41
lines changed

7 files changed

+74
-41
lines changed

src/packages/backend/conat/test/core/connect.test.ts

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
/*
2+
pnpm test ./connect.test.ts
3+
*/
4+
15
import { getPort } from "@cocalc/backend/conat/test/util";
26
import { initConatServer } from "@cocalc/backend/conat/test/setup";
37
import { connect } from "@cocalc/backend/conat/conat";
@@ -50,7 +54,7 @@ describe("create server *after* client and ensure connects properly", () => {
5054
});
5155
});
5256

53-
describe("create server after creating a subscription and publishing a message, and observe that it works and nothing is lost", () => {
57+
describe("create server after sync creating a subscription and publishing a message, and observe that messages are dropped", () => {
5458
let cn;
5559
it("starts a client, despite there being no server yet", async () => {
5660
cn = connect(`http://localhost:${port}`, { path });
@@ -71,17 +75,53 @@ describe("create server after creating a subscription and publishing a message,
7175
await wait({ until: () => cn.conn.connected });
7276
});
7377

74-
it("see both messages we sent before connecting arrive", async () => {
75-
const { value: mesg1 } = await sub.next();
76-
expect(mesg1.data).toBe("hello");
77-
const { value: mesg2 } = await sub.next();
78-
expect(mesg2.data).toBe("conat");
79-
});
80-
81-
it("publish another message", async () => {
78+
it("see that both messages we sent before connecting were dropped", async () => {
8279
const { bytes, count } = await cn.publish("xyz", "more");
8380
expect(count).toBe(1);
8481
expect(bytes).toBe(5);
82+
const { value: mesg1 } = await sub.next();
83+
// we just got a message but it's AFTER the two above.
84+
expect(mesg1.data).toBe("more");
85+
});
86+
87+
it("clean up", () => {
88+
server.close();
89+
cn.close();
90+
sub.close();
91+
});
92+
});
93+
94+
describe("create server after async creating a subscription and async publishing a message, and observe that it DOES works", () => {
95+
let cn;
96+
it("starts a client, despite there being no server yet", async () => {
97+
cn = connect(`http://localhost:${port}`, { path });
98+
expect(cn.conn.connected).toBe(false);
99+
});
100+
101+
let sub;
102+
let recv: any[] = [];
103+
it("create a sync subscription before the server exists", () => {
104+
const f = async () => {
105+
sub = await cn.subscribe("xyz");
106+
await cn.publish("xyz", "hello");
107+
const { value: mesg } = await sub.next();
108+
recv.push(mesg.data);
109+
await cn.publish("xyz", "conat");
110+
const { value: mesg2 } = await sub.next();
111+
recv.push(mesg2.data);
112+
};
113+
f();
114+
});
115+
116+
let server;
117+
it("start the server", async () => {
118+
server = await initConatServer({ port, path });
119+
await wait({ until: () => cn.conn.connected });
120+
});
121+
122+
it("see that both messages we sent before connecting arrive", async () => {
123+
await wait({ until: () => recv.length == 2 });
124+
expect(recv).toEqual(["hello", "conat"]);
85125
});
86126

87127
it("clean up", () => {

src/packages/backend/conat/test/setup.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
} from "@cocalc/conat/core/server";
88
import { Server } from "socket.io";
99
import getLogger from "@cocalc/backend/logger";
10+
import { setNatsClient } from "@cocalc/conat/client";
11+
import { sha1 } from "@cocalc/backend/sha1";
1012

1113
const logger = getLogger("conat:test:setup");
1214

@@ -35,6 +37,12 @@ export async function before() {
3537
port = await getPort();
3638
address = `http://localhost:${port}`;
3739
server = await initConatServer({ port, path });
40+
setNatsClient({
41+
getNatsEnv: async () => {
42+
return { cn: connect(), sha1 } as any;
43+
},
44+
getLogger,
45+
});
3846
}
3947

4048
const clients: Client[] = [];

src/packages/backend/conat/test/time.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ DEVELOPMENT:
44
pnpm test ./time.test.ts
55
*/
66

7-
// this sets client
8-
import "@cocalc/backend/conat";
9-
7+
import { timeClient, createTimeService } from "@cocalc/conat/service/time";
108
import time, { getSkew } from "@cocalc/conat/time";
9+
import { before, after } from "@cocalc/backend/conat/test/setup";
10+
11+
beforeAll(before);
1112

1213
describe("get time from nats", () => {
1314
it("tries to get the time before the skew, so it is not initialized yet", () => {
@@ -29,14 +30,13 @@ describe("get time from nats", () => {
2930
});
3031
});
3132

32-
import { timeClient, createTimeService } from "@cocalc/conat/service/time";
33-
3433
describe("start the time server and client and test that it works", () => {
3534
it("starts the time server and queries it", async () => {
36-
await import("@cocalc/backend/conat");
3735
createTimeService();
3836
const client = timeClient();
3937
const t = await client.time();
4038
expect(Math.abs(Date.now() - t)).toBeLessThan(200);
4139
});
4240
});
41+
42+
afterAll(after);

src/packages/conat/client.ts

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ export class ClientWithState extends EventEmitter {
6262
return this.env;
6363
}
6464
this.env = await client.getNatsEnv();
65-
this.monitorConnectionState(this.env.nc);
6665
return this.env;
6766
});
6867
this.account_id = client.account_id;
@@ -85,7 +84,7 @@ export class ClientWithState extends EventEmitter {
8584
};
8685

8786
close = () => {
88-
this.env?.nc.close();
87+
this.env?.nc?.close();
8988
this.setConnectionState("closed");
9089
this.removeAllListeners();
9190
delete this.env;
@@ -99,22 +98,6 @@ export class ClientWithState extends EventEmitter {
9998
this.emit(state);
10099
this.emit("state", state);
101100
};
102-
103-
private monitorConnectionState = async (nc) => {
104-
this.setConnectionState("connected");
105-
106-
for await (const { type } of nc.status()) {
107-
if (this.state == "closed") {
108-
return;
109-
}
110-
if (type.includes("ping") || type == "update" || type == "reconnect") {
111-
// connection is working well
112-
this.setConnectionState("connected");
113-
} else if (type == "reconnecting") {
114-
this.setConnectionState("connecting");
115-
}
116-
}
117-
};
118101
}
119102

120103
// do NOT do this until some explicit use of nats is initiated, since we shouldn't

src/packages/conat/core/client.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,11 @@ export class Client {
420420
throw Error(`already subscribed to '${subject}'`);
421421
}
422422
this.queueGroups[subject] = queue;
423+
const sub = new SubscriptionEmitter({
424+
client: this,
425+
subject,
426+
closeWhenOffCalled,
427+
});
423428
let promise;
424429
if (confirm) {
425430
const f = (cb) => {
@@ -436,11 +441,6 @@ export class Client {
436441
this.conn.emit("subscribe", { subject, queue });
437442
promise = undefined;
438443
}
439-
const sub = new SubscriptionEmitter({
440-
client: this,
441-
subject,
442-
closeWhenOffCalled,
443-
});
444444
sub.once("close", () => {
445445
if (this.queueGroups?.[subject] == null) {
446446
return;

src/packages/conat/core/server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,9 @@ export class ConatServer {
289289
});
290290
}
291291
const room = socketSubjectRoom({ socket, subject });
292-
socket.join(room);
292+
// critical to await socket.join so we don't advertise that there is
293+
// a subscriber before the socket is actually getting messages.
294+
await socket.join(room);
293295
await this.updateInterest({ op: "add", subject, room, queue });
294296
};
295297

src/packages/conat/service/service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ export async function callNatsService(opts: ServiceCall): Promise<any> {
7171
try {
7272
return await doRequest();
7373
} catch (err) {
74-
//console.log(`request to '${subject}' failed -- ${err}`);
74+
// console.log(`request to '${subject}' failed -- ${err}`);
7575
// it failed.
76-
if (err.name == "NatsError" && !opts.noRetry) {
76+
if (!opts.noRetry) {
7777
// it's a nats problem
7878
const p = opts.path ? `${trunc_middle(opts.path, 64)}:` : "";
7979
if (err.code == 503) {

0 commit comments

Comments
 (0)