Skip to content

Commit 489dfdb

Browse files
committed
feat: add configurable timeouts for actor lifecycle hooks (#885)
1 parent 45dca0c commit 489dfdb

File tree

18 files changed

+1008
-88
lines changed

18 files changed

+1008
-88
lines changed

.github/workflows/test.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,18 @@
33
name: 'Test'
44
on:
55
pull_request:
6+
paths:
7+
- 'packages/**'
8+
- 'clients/**'
9+
- 'examples/**'
10+
- '.github/workflows/**'
11+
- 'package.json'
12+
- 'yarn.lock'
13+
- 'tsconfig*.json'
14+
- 'turbo.json'
15+
- 'tsup.base.ts'
16+
- 'biome.json'
17+
- 'Cargo.toml'
618
workflow_dispatch:
719

820
jobs:

packages/actor-core/package.json

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,13 @@
22
"name": "actor-core",
33
"version": "0.7.9",
44
"license": "Apache-2.0",
5-
"files": ["dist", "src", "deno.json", "bun.json", "package.json"],
5+
"files": [
6+
"dist",
7+
"src",
8+
"deno.json",
9+
"bun.json",
10+
"package.json"
11+
],
612
"type": "module",
713
"bin": "./dist/cli/mod.cjs",
814
"exports": {

packages/actor-core/src/actor/config.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ export const ActorConfigSchema = z
2626
createVars: z.function().optional(),
2727
options: z
2828
.object({
29+
lifecycle: z
30+
.object({
31+
createVarsTimeout: z.number().positive().default(5000),
32+
createConnStateTimeout: z.number().positive().default(5000),
33+
onConnectTimeout: z.number().positive().default(5000),
34+
})
35+
.strict()
36+
.default({}),
2937
state: z
3038
.object({
3139
saveInterval: z.number().positive().default(10_000),
@@ -81,7 +89,11 @@ export interface OnConnectOptions<CP> {
8189
// This must have only one or the other or else S will not be able to be inferred
8290
type CreateState<S, CP, CS, V> =
8391
| { state: S }
84-
| { createState: (c: ActorContext<undefined, undefined, undefined, undefined>) => S | Promise<S> }
92+
| {
93+
createState: (
94+
c: ActorContext<undefined, undefined, undefined, undefined>,
95+
) => S | Promise<S>;
96+
}
8597
| Record<never, never>;
8698

8799
// Creates connection state config
@@ -114,7 +126,10 @@ type CreateVars<S, CP, CS, V> =
114126
/**
115127
* @experimental
116128
*/
117-
createVars: (c: ActorContext<undefined, undefined, undefined, undefined>, driverCtx: unknown) => V | Promise<V>;
129+
createVars: (
130+
c: ActorContext<undefined, undefined, undefined, undefined>,
131+
driverCtx: unknown,
132+
) => V | Promise<V>;
118133
}
119134
| Record<never, never>;
120135

packages/actor-core/src/actor/instance.ts

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@ import * as errors from "./errors";
1313
import { processMessage } from "./protocol/message/mod";
1414
import { instanceLogger, logger } from "./log";
1515
import type { ActionContext } from "./action";
16-
import { Lock, deadline } from "./utils";
16+
import { DeadlineError, Lock, deadline } from "./utils";
1717
import { Schedule } from "./schedule";
1818
import type * as wsToServer from "@/actor/protocol/message/to-server";
1919
import { CachedSerializer } from "./protocol/serde";
2020
import { ActorInspector } from "@/inspector/actor";
2121
import { ActorContext } from "./context";
2222
import invariant from "invariant";
23-
import type { PersistedActor, PersistedConn, PersistedScheduleEvents } from "./persisted";
23+
import type {
24+
PersistedActor,
25+
PersistedConn,
26+
PersistedScheduleEvents,
27+
} from "./persisted";
2428

2529
/**
2630
* Options for the `_saveState` method.
@@ -160,9 +164,6 @@ export class ActorInstance<S, CP, CS, V> {
160164

161165
// TODO: Exit process if this errors
162166
if (this.#varsEnabled) {
163-
// TODO: Move to config
164-
const CREATE_VARS_TIMEOUT = 5000; // 5 seconds
165-
166167
let vars: V | undefined = undefined;
167168
if ("createVars" in this.#config) {
168169
const dataOrPromise = this.#config.createVars(
@@ -175,7 +176,10 @@ export class ActorInstance<S, CP, CS, V> {
175176
this.#actorDriver.getContext(this.#actorId),
176177
);
177178
if (dataOrPromise instanceof Promise) {
178-
vars = await deadline(dataOrPromise, CREATE_VARS_TIMEOUT);
179+
vars = await deadline(
180+
dataOrPromise,
181+
this.#config.options.lifecycle.createVarsTimeout,
182+
);
179183
} else {
180184
vars = dataOrPromise;
181185
}
@@ -214,10 +218,10 @@ export class ActorInstance<S, CP, CS, V> {
214218
ar: args,
215219
};
216220

217-
this.actorContext.log.info("scheduling event", {
218-
event: eventId,
219-
timestamp,
220-
action: fn
221+
this.actorContext.log.info("scheduling event", {
222+
event: eventId,
223+
timestamp,
224+
action: fn,
221225
});
222226

223227
// Insert event in to index
@@ -239,7 +243,10 @@ export class ActorInstance<S, CP, CS, V> {
239243

240244
async onAlarm() {
241245
const now = Date.now();
242-
this.actorContext.log.debug("alarm triggered", { now, events: this.#persist.e.length });
246+
this.actorContext.log.debug("alarm triggered", {
247+
now,
248+
events: this.#persist.e.length,
249+
});
243250

244251
// Remove events from schedule that we're about to run
245252
const runIndex = this.#persist.e.findIndex((x) => x.t <= now);
@@ -248,7 +255,9 @@ export class ActorInstance<S, CP, CS, V> {
248255
return;
249256
}
250257
const scheduleEvents = this.#persist.e.splice(0, runIndex + 1);
251-
this.actorContext.log.debug("running events", { count: scheduleEvents.length });
258+
this.actorContext.log.debug("running events", {
259+
count: scheduleEvents.length,
260+
});
252261

253262
// Set alarm for next event
254263
if (this.#persist.e.length > 0) {
@@ -258,13 +267,13 @@ export class ActorInstance<S, CP, CS, V> {
258267
// Iterate by event key in order to ensure we call the events in order
259268
for (const event of scheduleEvents) {
260269
try {
261-
this.actorContext.log.info("running action for event", {
262-
event: event.e,
263-
timestamp: event.t,
264-
action: event.a,
265-
args: event.ar
270+
this.actorContext.log.info("running action for event", {
271+
event: event.e,
272+
timestamp: event.t,
273+
action: event.a,
274+
args: event.ar,
266275
});
267-
276+
268277
// Look up function
269278
const fn: unknown = this.#config.actions[event.a];
270279
if (!fn) throw new Error(`Missing action for alarm ${event.a}`);
@@ -586,7 +595,6 @@ export class ActorInstance<S, CP, CS, V> {
586595
): Promise<CS> {
587596
// Authenticate connection
588597
let connState: CS | undefined = undefined;
589-
const PREPARE_CONNECT_TIMEOUT = 5000; // 5 seconds
590598

591599
const onBeforeConnectOpts = {
592600
request,
@@ -612,7 +620,10 @@ export class ActorInstance<S, CP, CS, V> {
612620
onBeforeConnectOpts,
613621
);
614622
if (dataOrPromise instanceof Promise) {
615-
connState = await deadline(dataOrPromise, PREPARE_CONNECT_TIMEOUT);
623+
connState = await deadline(
624+
dataOrPromise,
625+
this.#config.options.lifecycle.createConnStateTimeout,
626+
);
616627
} else {
617628
connState = dataOrPromise;
618629
}
@@ -676,12 +687,14 @@ export class ActorInstance<S, CP, CS, V> {
676687
this.inspector.onConnChange(this.#connections);
677688

678689
// Handle connection
679-
const CONNECT_TIMEOUT = 5000; // 5 seconds
680690
if (this.#config.onConnect) {
681691
try {
682692
const result = this.#config.onConnect(this.actorContext, conn);
683693
if (result instanceof Promise) {
684-
deadline(result, CONNECT_TIMEOUT).catch((error) => {
694+
deadline(
695+
result,
696+
this.#config.options.lifecycle.onConnectTimeout,
697+
).catch((error) => {
685698
logger().error("error in `onConnect`, closing socket", {
686699
error,
687700
});
@@ -842,13 +855,22 @@ export class ActorInstance<S, CP, CS, V> {
842855
// TODO: Manually call abortable for better error handling
843856
// Call the function on this object with those arguments
844857
try {
858+
// Log when we start executing the action
859+
logger().debug("executing action", { actionName: rpcName, args });
860+
845861
const outputOrPromise = rpcFunction.call(undefined, ctx, ...args);
846862
let output: unknown;
847863
if (outputOrPromise instanceof Promise) {
864+
// Log that we're waiting for an async action
865+
logger().debug("awaiting async action", { actionName: rpcName });
866+
848867
output = await deadline(
849868
outputOrPromise,
850869
this.#config.options.action.timeout,
851870
);
871+
872+
// Log that async action completed
873+
logger().debug("async action completed", { actionName: rpcName });
852874
} else {
853875
output = outputOrPromise;
854876
}
@@ -863,7 +885,13 @@ export class ActorInstance<S, CP, CS, V> {
863885
output,
864886
);
865887
if (processedOutput instanceof Promise) {
888+
logger().debug("awaiting onBeforeActionResponse", {
889+
actionName: rpcName,
890+
});
866891
output = await processedOutput;
892+
logger().debug("onBeforeActionResponse completed", {
893+
actionName: rpcName,
894+
});
867895
} else {
868896
output = processedOutput;
869897
}
@@ -874,11 +902,22 @@ export class ActorInstance<S, CP, CS, V> {
874902
}
875903
}
876904

905+
// Log the output before returning
906+
logger().debug("action completed", {
907+
actionName: rpcName,
908+
outputType: typeof output,
909+
isPromise: output instanceof Promise,
910+
});
911+
877912
return output;
878913
} catch (error) {
879-
if (error instanceof DOMException && error.name === "TimeoutError") {
914+
if (error instanceof DeadlineError) {
880915
throw new errors.ActionTimedOut();
881916
}
917+
logger().error("action error", {
918+
actionName: rpcName,
919+
error: stringifyError(error),
920+
});
882921
throw error;
883922
} finally {
884923
this.#savePersistThrottled();

packages/actor-core/src/actor/protocol/message/mod.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,22 @@ export async function processMessage<S, CP, CS, V>(
101101
rpcId = id;
102102
rpcName = name;
103103

104+
logger().debug("processing RPC request", { id, name, argsCount: args.length });
105+
104106
const ctx = new ActionContext<S, CP, CS, V>(actor.actorContext, conn);
107+
108+
// Process the RPC request and wait for the result
109+
// This will wait for async actions to complete
105110
const output = await handler.onExecuteRpc(ctx, name, args);
106-
111+
112+
logger().debug("sending RPC response", {
113+
id,
114+
name,
115+
outputType: typeof output,
116+
isPromise: output instanceof Promise
117+
});
118+
119+
// Send the response back to the client
107120
conn._sendMessage(
108121
new CachedSerializer<wsToClient.ToClient>({
109122
b: {
@@ -114,6 +127,8 @@ export async function processMessage<S, CP, CS, V>(
114127
},
115128
}),
116129
);
130+
131+
logger().debug("RPC response sent", { id, name });
117132
} else if ("sr" in message.b) {
118133
// Subscription request
119134

@@ -125,12 +140,15 @@ export async function processMessage<S, CP, CS, V>(
125140
}
126141

127142
const { e: eventName, s: subscribe } = message.b.sr;
143+
logger().debug("processing subscription request", { eventName, subscribe });
128144

129145
if (subscribe) {
130146
await handler.onSubscribe(eventName, conn);
131147
} else {
132148
await handler.onUnsubscribe(eventName, conn);
133149
}
150+
151+
logger().debug("subscription request completed", { eventName, subscribe });
134152
} else {
135153
assertUnreachable(message.b);
136154
}
@@ -141,6 +159,13 @@ export async function processMessage<S, CP, CS, V>(
141159
rpcName,
142160
});
143161

162+
logger().debug("sending error response", {
163+
rpcId,
164+
rpcName,
165+
code,
166+
message
167+
});
168+
144169
// Build response
145170
if (rpcId !== undefined) {
146171
conn._sendMessage(
@@ -168,5 +193,7 @@ export async function processMessage<S, CP, CS, V>(
168193
}),
169194
);
170195
}
196+
197+
logger().debug("error response sent", { rpcId, rpcName });
171198
}
172199
}

packages/actor-core/src/actor/utils.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ export const throttle = <
3333
};
3434
};
3535

36+
export class DeadlineError extends Error {
37+
constructor() {
38+
super("Promise did not complete before deadline.")
39+
}
40+
}
41+
3642
export function deadline<T>(promise: Promise<T>, timeout: number): Promise<T> {
3743
const controller = new AbortController();
3844
const signal = controller.signal;
@@ -44,7 +50,7 @@ export function deadline<T>(promise: Promise<T>, timeout: number): Promise<T> {
4450
promise,
4551
new Promise<T>((_, reject) => {
4652
signal.addEventListener("abort", () =>
47-
reject(new Error("Operation timed out")),
53+
reject(new DeadlineError()),
4854
);
4955
}),
5056
]).finally(() => {

0 commit comments

Comments
 (0)