Skip to content

Commit 4bb68c3

Browse files
committed
fix(core): fix sse support
1 parent 0a39232 commit 4bb68c3

File tree

9 files changed

+76
-181
lines changed

9 files changed

+76
-181
lines changed

packages/actor-core/src/actor/protocol/message/mod.ts

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,10 @@ function getValueLength(value: InputData): number {
3535
return value.size;
3636
} else if (
3737
value instanceof ArrayBuffer ||
38-
value instanceof SharedArrayBuffer
38+
value instanceof SharedArrayBuffer ||
39+
value instanceof Uint8Array
3940
) {
4041
return value.byteLength;
41-
} else if (Buffer.isBuffer(value)) {
42-
return value.length;
4342
} else {
4443
assertUnreachable(value);
4544
}
@@ -76,7 +75,10 @@ export interface ProcessMessageHandler<S, CP, CS, V> {
7675
args: unknown[],
7776
) => Promise<unknown>;
7877
onSubscribe?: (eventName: string, conn: Conn<S, CP, CS, V>) => Promise<void>;
79-
onUnsubscribe?: (eventName: string, conn: Conn<S, CP, CS, V>) => Promise<void>;
78+
onUnsubscribe?: (
79+
eventName: string,
80+
conn: Conn<S, CP, CS, V>,
81+
) => Promise<void>;
8082
}
8183

8284
export async function processMessage<S, CP, CS, V>(
@@ -101,19 +103,23 @@ export async function processMessage<S, CP, CS, V>(
101103
rpcId = id;
102104
rpcName = name;
103105

104-
logger().debug("processing RPC request", { id, name, argsCount: args.length });
105-
106+
logger().debug("processing RPC request", {
107+
id,
108+
name,
109+
argsCount: args.length,
110+
});
111+
106112
const ctx = new ActionContext<S, CP, CS, V>(actor.actorContext, conn);
107-
113+
108114
// Process the RPC request and wait for the result
109115
// This will wait for async actions to complete
110116
const output = await handler.onExecuteRpc(ctx, name, args);
111-
112-
logger().debug("sending RPC response", {
113-
id,
114-
name,
115-
outputType: typeof output,
116-
isPromise: output instanceof Promise
117+
118+
logger().debug("sending RPC response", {
119+
id,
120+
name,
121+
outputType: typeof output,
122+
isPromise: output instanceof Promise,
117123
});
118124

119125
// Send the response back to the client
@@ -127,7 +133,7 @@ export async function processMessage<S, CP, CS, V>(
127133
},
128134
}),
129135
);
130-
136+
131137
logger().debug("RPC response sent", { id, name });
132138
} else if ("sr" in message.b) {
133139
// Subscription request
@@ -140,15 +146,21 @@ export async function processMessage<S, CP, CS, V>(
140146
}
141147

142148
const { e: eventName, s: subscribe } = message.b.sr;
143-
logger().debug("processing subscription request", { eventName, subscribe });
149+
logger().debug("processing subscription request", {
150+
eventName,
151+
subscribe,
152+
});
144153

145154
if (subscribe) {
146155
await handler.onSubscribe(eventName, conn);
147156
} else {
148157
await handler.onUnsubscribe(eventName, conn);
149158
}
150-
151-
logger().debug("subscription request completed", { eventName, subscribe });
159+
160+
logger().debug("subscription request completed", {
161+
eventName,
162+
subscribe,
163+
});
152164
} else {
153165
assertUnreachable(message.b);
154166
}
@@ -163,7 +175,7 @@ export async function processMessage<S, CP, CS, V>(
163175
rpcId,
164176
rpcName,
165177
code,
166-
message
178+
message,
167179
});
168180

169181
// Build response
@@ -193,7 +205,7 @@ export async function processMessage<S, CP, CS, V>(
193205
}),
194206
);
195207
}
196-
208+
197209
logger().debug("error response sent", { rpcId, rpcName });
198210
}
199211
}

packages/actor-core/src/actor/protocol/serde.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { assertUnreachable } from "../utils";
55
import * as cbor from "cbor-x";
66

77
/** Data that can be deserialized. */
8-
export type InputData = string | Buffer | Blob | ArrayBufferLike;
8+
export type InputData = string | Buffer | Blob | ArrayBufferLike | Uint8Array;
99

1010
/** Data that's been serialized. */
1111
export type OutputData = string | Uint8Array;
@@ -71,9 +71,12 @@ export async function deserialize(data: InputData, encoding: Encoding) {
7171
if (data instanceof Blob) {
7272
const arrayBuffer = await data.arrayBuffer();
7373
return cbor.decode(new Uint8Array(arrayBuffer));
74-
} else if (data instanceof ArrayBuffer) {
75-
return cbor.decode(new Uint8Array(data));
76-
} else if (Buffer.isBuffer(data)) {
74+
} else if (data instanceof Uint8Array) {
75+
return cbor.decode(data);
76+
} else if (
77+
data instanceof ArrayBuffer ||
78+
data instanceof SharedArrayBuffer
79+
) {
7780
return cbor.decode(new Uint8Array(data));
7881
} else {
7982
logger().warn("received non-binary type for cbor parse");

packages/actor-core/src/actor/router_endpoints.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,13 +213,20 @@ export async function handleSseConnect(
213213
return streamSSE(c, async (stream) => {
214214
try {
215215
await sseHandler.onOpen(stream);
216+
217+
// Wait for close
218+
const abortResolver = Promise.withResolvers();
216219
c.req.raw.signal.addEventListener("abort", async () => {
217220
try {
221+
abortResolver.resolve(undefined);
218222
await sseHandler.onClose();
219223
} catch (error) {
220224
logger().error("error closing sse connection", { error });
221225
}
222226
});
227+
228+
// Wait until connection aborted
229+
await abortResolver.promise;
223230
} catch (error) {
224231
logger().error("error opening sse connection", { error });
225232
throw error;

packages/actor-core/src/actor/utils.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import * as errors from "./errors";
2+
import { logger } from "./log";
23

34
export function assertUnreachable(x: never): never {
5+
logger().error("unreachable", { value: `${x}`, stack: new Error().stack });
46
throw new errors.Unreachable(x);
57
}
68

@@ -35,7 +37,7 @@ export const throttle = <
3537

3638
export class DeadlineError extends Error {
3739
constructor() {
38-
super("Promise did not complete before deadline.")
40+
super("Promise did not complete before deadline.");
3941
}
4042
}
4143

@@ -49,9 +51,7 @@ export function deadline<T>(promise: Promise<T>, timeout: number): Promise<T> {
4951
return Promise.race<T>([
5052
promise,
5153
new Promise<T>((_, reject) => {
52-
signal.addEventListener("abort", () =>
53-
reject(new DeadlineError()),
54-
);
54+
signal.addEventListener("abort", () => reject(new DeadlineError()));
5555
}),
5656
]).finally(() => {
5757
clearTimeout(timeoutId);

packages/actor-core/src/client/actor_conn.ts

Lines changed: 23 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -152,147 +152,35 @@ export class ActorConnRaw {
152152

153153
logger().debug("action", { name, args });
154154

155-
// Check if we have an active websocket connection
156-
if (this.#transport) {
157-
// If we have an active connection, use the websocket RPC
158-
const rpcId = this.#rpcIdCounter;
159-
this.#rpcIdCounter += 1;
160-
161-
const { promise, resolve, reject } =
162-
Promise.withResolvers<wsToClient.RpcResponseOk>();
163-
this.#rpcInFlight.set(rpcId, { name, resolve, reject });
164-
165-
this.#sendMessage({
166-
b: {
167-
rr: {
168-
i: rpcId,
169-
n: name,
170-
a: args,
171-
},
155+
// If we have an active connection, use the websocket RPC
156+
const rpcId = this.#rpcIdCounter;
157+
this.#rpcIdCounter += 1;
158+
159+
const { promise, resolve, reject } =
160+
Promise.withResolvers<wsToClient.RpcResponseOk>();
161+
this.#rpcInFlight.set(rpcId, { name, resolve, reject });
162+
163+
this.#sendMessage({
164+
b: {
165+
rr: {
166+
i: rpcId,
167+
n: name,
168+
a: args,
172169
},
173-
} satisfies wsToServer.ToServer);
174-
175-
// TODO: Throw error if disconnect is called
176-
177-
const { i: responseId, o: output } = await promise;
178-
if (responseId !== rpcId)
179-
throw new Error(
180-
`Request ID ${rpcId} does not match response ID ${responseId}`,
181-
);
182-
183-
return output as Response;
184-
} else {
185-
// If no websocket connection, use HTTP RPC via manager
186-
try {
187-
// Get the manager endpoint from the endpoint provided
188-
const actorQueryStr = encodeURIComponent(
189-
JSON.stringify(this.actorQuery),
190-
);
191-
192-
const url = `${this.endpoint}/actors/rpc/${name}?query=${actorQueryStr}`;
193-
logger().debug("http rpc: request", {
194-
url,
195-
name,
196-
});
170+
},
171+
} satisfies wsToServer.ToServer);
197172

198-
try {
199-
const response = await fetch(url, {
200-
method: "POST",
201-
headers: {
202-
"Content-Type": "application/json",
203-
},
204-
body: JSON.stringify({
205-
a: args,
206-
}),
207-
});
173+
// TODO: Throw error if disconnect is called
208174

209-
logger().debug("http rpc: response", {
210-
status: response.status,
211-
ok: response.ok,
212-
});
175+
const { i: responseId, o: output } = await promise;
176+
if (responseId !== rpcId)
177+
throw new Error(
178+
`Request ID ${rpcId} does not match response ID ${responseId}`,
179+
);
213180

214-
if (!response.ok) {
215-
try {
216-
const errorData = await response.json();
217-
logger().error("http rpc error response", { errorData });
218-
throw new errors.ActionError(
219-
errorData.c || "RPC_ERROR",
220-
errorData.m || "RPC call failed",
221-
errorData.md,
222-
);
223-
} catch (parseError) {
224-
// If response is not JSON, get it as text and throw generic error
225-
const errorText = await response.text();
226-
logger().error("http rpc: error parsing response", {
227-
errorText,
228-
});
229-
throw new errors.ActionError(
230-
"RPC_ERROR",
231-
`RPC call failed: ${errorText}`,
232-
{},
233-
);
234-
}
235-
}
236-
237-
// Clone response to avoid consuming it
238-
const responseClone = response.clone();
239-
const responseText = await responseClone.text();
240-
241-
// Parse response body
242-
try {
243-
const responseData = JSON.parse(responseText);
244-
return responseData.o as Response;
245-
} catch (parseError) {
246-
logger().error("http rpc: error parsing json", {
247-
parseError,
248-
});
249-
throw new errors.ActionError(
250-
"RPC_ERROR",
251-
`Failed to parse response: ${parseError}`,
252-
{ responseText },
253-
);
254-
}
255-
} catch (fetchError) {
256-
logger().error("http rpc: fetch error", {
257-
error: fetchError,
258-
});
259-
throw new errors.ActionError(
260-
"RPC_ERROR",
261-
`Fetch failed: ${fetchError}`,
262-
{ cause: fetchError },
263-
);
264-
}
265-
} catch (error) {
266-
if (error instanceof errors.ActionError) {
267-
throw error;
268-
}
269-
throw new errors.ActionError(
270-
"RPC_ERROR",
271-
`Failed to execute RPC ${name}: ${error}`,
272-
{ cause: error },
273-
);
274-
}
275-
}
181+
return output as Response;
276182
}
277183

278-
//async #rpcHttp<Args extends Array<unknown> = unknown[], Response = unknown>(name: string, ...args: Args): Promise<Response> {
279-
// const origin = `${resolved.isTls ? "https": "http"}://${resolved.publicHostname}:${resolved.publicPort}`;
280-
// const url = `${origin}/rpc/${encodeURIComponent(name)}`;
281-
// const res = await fetch(url, {
282-
// method: "POST",
283-
// // TODO: Import type from protocol
284-
// body: JSON.stringify({
285-
// args,
286-
// })
287-
// });
288-
// if (!res.ok) {
289-
// throw new Error(`RPC error (${res.statusText}):\n${await res.text()}`);
290-
// }
291-
// // TODO: Import type from protocol
292-
// const resJson: httpRpc.ResponseOk<Response> = await res.json();
293-
// return resJson.output;
294-
//}
295-
296184
/**
297185
* Do not call this directly.
298186
enc

packages/actor-core/src/common/router.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export function loggerMiddleware(logger: Logger) {
1515
await next();
1616

1717
const duration = Date.now() - startTime;
18-
logger.debug("http request", {
18+
logger.info("http request", {
1919
method,
2020
path,
2121
status: c.res.status,

packages/actor-core/src/manager/router.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ export function createManagerRouter(
239239
if ("inline" in handler.proxyMode) {
240240
logger().debug("using inline proxy mode for sse connection");
241241
// Use the shared SSE handler
242-
return handleSseConnect(
242+
return await handleSseConnect(
243243
c,
244244
appConfig,
245245
driverConfig,
@@ -416,7 +416,6 @@ export function createManagerRouter(
416416
});
417417

418418
if (appConfig.inspector.enabled) {
419-
logger().debug("setting up inspector routes");
420419
app.route(
421420
"/inspect",
422421
createManagerInspectorRouter(

vitest.base.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ export default {
1010
testTimeout: 15_000,
1111
env: {
1212
// Enable logging
13-
_LOG_LEVEL: "DEBUG"
13+
_LOG_LEVEL: "DEBUG",
14+
_ACTOR_CORE_ERROR_STACK: "1"
1415
}
1516
},
1617
} satisfies ViteUserConfig;

0 commit comments

Comments
 (0)