Skip to content

Commit 6295b00

Browse files
committed
conat: more service unit tests
1 parent b5fea3c commit 6295b00

File tree

3 files changed

+145
-29
lines changed

3 files changed

+145
-29
lines changed

src/packages/backend/conat/test/service.test.ts

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,24 @@ import {
1212
createServiceHandler,
1313
} from "@cocalc/conat/service/typed";
1414
import { once } from "@cocalc/util/async-utils";
15-
import { before, after } from "@cocalc/backend/conat/test/setup";
15+
import { before, after, connect } from "@cocalc/backend/conat/test/setup";
1616
import { wait } from "@cocalc/backend/conat/test/util";
1717
import { is_date as isDate } from "@cocalc/util/misc";
1818
import { delay } from "awaiting";
19+
import { initConatServer } from "@cocalc/backend/conat/test/setup";
20+
import { getPort } from "@cocalc/backend/conat/test/util";
1921

2022
beforeAll(before);
2123

2224
describe("create a service and test it out", () => {
2325
let s;
26+
let subject;
2427
it("creates a service", async () => {
2528
s = createConatService({
2629
service: "echo",
2730
handler: (mesg) => mesg.repeat(2),
2831
});
32+
subject = s.subject;
2933
await once(s, "running");
3034
expect(await callConatService({ service: "echo", mesg: "hello" })).toBe(
3135
"hellohello",
@@ -35,9 +39,19 @@ describe("create a service and test it out", () => {
3539
it("closes the services and observes it doesn't work anymore", async () => {
3640
s.close();
3741
await expect(async () => {
38-
await callConatService({ service: "echo", mesg: "hi", timeout: 1000 });
42+
await callConatService({ service: "echo", mesg: "hi", timeout: 250 });
3943
}).rejects.toThrowError("timeout");
4044
});
45+
46+
// [ ] TODO: broken!
47+
it.skip("creates a listener on the same subject and try to call to verify timeout works", async () => {
48+
const client = connect();
49+
const sub = await client.subscribe(subject);
50+
await expect(async () => {
51+
await callConatService({ service: "echo", mesg: "hi", timeout: 250 });
52+
}).rejects.toThrowError("timeout");
53+
sub.close();
54+
});
4155
});
4256

4357
describe("verify that you can create a service AFTER calling it and things to still work fine", () => {
@@ -118,4 +132,103 @@ describe("create and test a more complicated service", () => {
118132
});
119133
});
120134

135+
describe("create a service with specified client, stop and start the server, and see service still works", () => {
136+
let server;
137+
let client;
138+
let client2;
139+
let port;
140+
it("create a conat server and client", async () => {
141+
port = await getPort();
142+
server = await initConatServer({ port });
143+
client = server.client({ reconnectionDelay: 50 });
144+
client2 = server.client({ reconnectionDelay: 50 });
145+
});
146+
147+
let service;
148+
it("create a service using specific client and call it using both clients", async () => {
149+
service = createConatService({
150+
client,
151+
service: "double",
152+
handler: (mesg) => mesg.repeat(2),
153+
});
154+
155+
expect(
156+
await callConatService({ client, service: "double", mesg: "hello" }),
157+
).toBe("hellohello");
158+
159+
expect(
160+
await callConatService({
161+
client: client2,
162+
service: "double",
163+
mesg: "hello",
164+
}),
165+
).toBe("hellohello");
166+
});
167+
168+
it("disconnect client and check service still works on reconnect", async () => {
169+
// cause a disconnect -- client will connect again in 50ms soon
170+
// and handle the request below:
171+
client.conn.io.engine.close();
172+
expect(
173+
await callConatService({
174+
client: client2,
175+
service: "double",
176+
mesg: "hello",
177+
}),
178+
).toBe("hellohello");
179+
});
180+
181+
it("disconnect client2 and check service still works on reconnect", async () => {
182+
// cause a disconnect -- client will connect again in 50ms soon
183+
// and handle the request below:
184+
client2.conn.io.engine.close();
185+
expect(
186+
await callConatService({
187+
client: client2,
188+
service: "double",
189+
mesg: "hello",
190+
}),
191+
).toBe("hellohello");
192+
});
193+
194+
it("disconnect both clients and check service still works on reconnect", async () => {
195+
// cause a disconnect -- client will connect again in 50ms soon
196+
// and handle the request below:
197+
client.conn.io.engine.close();
198+
client2.conn.io.engine.close();
199+
expect(
200+
await callConatService({
201+
client: client2,
202+
service: "double",
203+
mesg: "hello",
204+
}),
205+
).toBe("hellohello");
206+
});
207+
208+
it("kills the server, then makes another one serving on the same port", async () => {
209+
await server.close();
210+
server = await initConatServer({ port });
211+
// Killing the server is not at all a normal thing to expect, and causes loss of
212+
// its state. The clients have to sync realize subscriptions are missing. This
213+
// takes a fraction of a second and the call below won't immediately work without
214+
// a short delay, unfortunately. TODO: should we handle this better?
215+
await delay(100);
216+
expect(
217+
await callConatService({
218+
client: client2,
219+
service: "double",
220+
mesg: "hello",
221+
noRetry: true,
222+
}),
223+
).toBe("hellohello");
224+
});
225+
226+
it("cleans up", () => {
227+
service.close();
228+
client.close();
229+
client2.close();
230+
server.close();
231+
});
232+
});
233+
121234
afterAll(after);

src/packages/conat/core/server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ export class ConatServer {
159159
}
160160
};
161161

162-
close = () => {
163-
this.io.close();
162+
close = async () => {
163+
await this.io.close();
164164
};
165165

166166
private info = (): ServerInfo => {

src/packages/conat/service/service.ts

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { randomId } from "@cocalc/conat/names";
1717
import { delay } from "awaiting";
1818
import { EventEmitter } from "events";
1919
import { encodeBase64 } from "@cocalc/conat/util";
20+
import { type Client } from "@cocalc/conat/core/client";
2021

2122
const DEFAULT_TIMEOUT = 10 * 1000;
2223

@@ -40,16 +41,17 @@ export interface ServiceCall extends ServiceDescription {
4041
mesg: any;
4142
timeout?: number;
4243

43-
// if it fails with NatsError, we wait for service to be ready and try again,
44+
// if it fails with error.code 503, we wait for service to be ready and try again,
4445
// unless this is set -- e.g., when waiting for the service in the first
4546
// place we set this to avoid an infinite loop.
4647
noRetry?: boolean;
48+
49+
client?: Client;
4750
}
4851

4952
export async function callConatService(opts: ServiceCall): Promise<any> {
5053
// console.log("callConatService", opts);
51-
const env = await getEnv();
52-
const { cn } = env;
54+
const cn = opts.client ?? (await getEnv()).cn;
5355
const subject = serviceSubject(opts);
5456
let resp;
5557
const timeout = opts.timeout ?? DEFAULT_TIMEOUT;
@@ -73,28 +75,28 @@ export async function callConatService(opts: ServiceCall): Promise<any> {
7375
} catch (err) {
7476
// console.log(`request to '${subject}' failed -- ${err}`);
7577
// it failed.
76-
if (!opts.noRetry) {
77-
// it's a nats problem
78-
const p = opts.path ? `${trunc_middle(opts.path, 64)}:` : "";
79-
if (err.code == 503) {
80-
// it's actually just not ready, so
81-
// wait for the service to be ready, then try again
82-
await waitForConatService({ options: opts, maxWait: timeout });
83-
try {
84-
return await doRequest();
85-
} catch (err) {
86-
if (err.code == 503) {
87-
err.message = `Not Available: service ${p}${opts.service} is not available`;
88-
}
89-
throw err;
78+
if (opts.noRetry) {
79+
throw err;
80+
}
81+
// it's a nats problem
82+
const p = opts.path ? `${trunc_middle(opts.path, 64)}:` : "";
83+
if (err.code == 503) {
84+
// it's actually just not ready, so
85+
// wait for the service to be ready, then try again
86+
await waitForConatService({ options: opts, maxWait: timeout });
87+
try {
88+
return await doRequest();
89+
} catch (err) {
90+
if (err.code == 503) {
91+
err.message = `Not Available: service ${p}${opts.service} is not available`;
9092
}
91-
} else if (err.code == "TIMEOUT") {
92-
throw Error(
93-
`Timeout: service ${p}${opts.service} did not respond for ${Math.round(timeout / 1000)} seconds`,
94-
);
93+
throw err;
9594
}
95+
} else if (err.code == "TIMEOUT") {
96+
throw Error(
97+
`Timeout: service ${p}${opts.service} did not respond for ${Math.round(timeout / 1000)} seconds`,
98+
);
9699
}
97-
throw err;
98100
}
99101
}
100102

@@ -104,6 +106,7 @@ export interface Options extends ServiceDescription {
104106
description?: string;
105107
version?: string;
106108
handler: (mesg) => Promise<any>;
109+
client?: Client;
107110
}
108111

109112
export function createConatService(options: Options) {
@@ -182,9 +185,9 @@ export function serviceDescription({
182185

183186
export class ConatService extends EventEmitter {
184187
private options: Options;
185-
private subject: string;
188+
public readonly subject: string;
189+
public readonly name: string;
186190
private sub?;
187-
private name: string;
188191

189192
constructor(options: Options) {
190193
super();
@@ -207,7 +210,7 @@ export class ConatService extends EventEmitter {
207210
description: this.options.description,
208211
version: this.options.version,
209212
});
210-
const { cn } = await getEnv();
213+
const cn = this.options.client ?? (await getEnv()).cn;
211214
const queue = this.options.all ? randomId() : "0";
212215
this.sub = await cn.subscribe(this.subject, { queue });
213216
this.emit("running");

0 commit comments

Comments
 (0)