Skip to content

Commit 027e18e

Browse files
committed
fix(core): fix reporting errors for sse initiation
1 parent 34b4061 commit 027e18e

File tree

1 file changed

+54
-13
lines changed

1 file changed

+54
-13
lines changed

packages/actor-core/src/manager/router.ts

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@ import {
1212
createManagerInspectorRouter,
1313
type ManagerInspectorConnHandler,
1414
} from "@/inspector/manager";
15-
import type { UpgradeWebSocket } from "hono/ws";
1615
import { ConnectQuerySchema } from "./protocol/query";
1716
import * as errors from "@/actor/errors";
18-
import type { ActorQuery, ConnectQuery } from "./protocol/query";
17+
import type { ActorQuery } from "./protocol/query";
1918
import { assertUnreachable } from "@/actor/utils";
2019
import invariant from "invariant";
2120
import {
@@ -27,12 +26,11 @@ import {
2726
handleWebSocketConnect,
2827
} from "@/actor/router_endpoints";
2928
import { ManagerDriver } from "./driver";
30-
import { setUncaughtExceptionCaptureCallback } from "process";
3129
import { Encoding, serialize } from "@/actor/protocol/serde";
3230
import { deconstructError } from "@/common/utils";
3331
import { WSContext } from "hono/ws";
3432
import { ToClient } from "@/actor/protocol/message/to-client";
35-
import { upgradeWebSocket } from "hono/deno";
33+
import { streamSSE } from "hono/streaming";
3634

3735
type ProxyMode =
3836
| {
@@ -190,7 +188,6 @@ export function createManagerRouter(
190188
};
191189

192190
// Send the error message to the client
193-
invariant(encoding, "encoding should be defined");
194191
const serialized = serialize(errorMsg, encoding);
195192
ws.send(serialized);
196193

@@ -213,8 +210,11 @@ export function createManagerRouter(
213210

214211
// Proxy SSE connection to actor
215212
app.get("/actors/connect/sse", async (c) => {
216-
logger().debug("sse connection request received");
213+
let encoding: Encoding | undefined;
217214
try {
215+
encoding = getRequestEncoding(c.req);
216+
logger().debug("sse connection request received", { encoding });
217+
218218
const params = ConnectQuerySchema.safeParse({
219219
query: parseQuery(c),
220220
encoding: c.req.query("encoding"),
@@ -264,14 +264,55 @@ export function createManagerRouter(
264264
assertUnreachable(handler.proxyMode);
265265
}
266266
} catch (error) {
267-
logger().error("error setting up sse proxy", { error });
267+
// If we receive an error during setup, we send the error and close the socket immediately
268+
//
269+
// We have to return the error over SSE since SSE clients cannot read vanilla HTTP responses
268270

269-
// Use ProxyError if it's not already an ActorError
270-
if (!(error instanceof errors.ActorError)) {
271-
throw new errors.ProxyError("SSE connection", error);
272-
} else {
273-
throw error;
274-
}
271+
const { code, message, metadata } = deconstructError(error, logger(), {
272+
sseEvent: "setup",
273+
});
274+
275+
return streamSSE(c, async (stream) => {
276+
try {
277+
if (encoding) {
278+
// Serialize and send the connection error
279+
const errorMsg: ToClient = {
280+
b: {
281+
ce: {
282+
c: code,
283+
m: message,
284+
md: metadata,
285+
},
286+
},
287+
};
288+
289+
// Send the error message to the client
290+
const serialized = serialize(errorMsg, encoding);
291+
await stream.writeSSE({
292+
data:
293+
typeof serialized === "string"
294+
? serialized
295+
: Buffer.from(serialized).toString("base64"),
296+
});
297+
} else {
298+
// We don't know the encoding, send an error and close
299+
await stream.writeSSE({
300+
data: code,
301+
event: "error",
302+
});
303+
}
304+
} catch (serializeError) {
305+
logger().error("failed to send error to sse client", {
306+
error: serializeError,
307+
});
308+
await stream.writeSSE({
309+
data: "internal error during error handling",
310+
event: "error",
311+
});
312+
}
313+
314+
// Stream will exit completely once function exits
315+
});
275316
}
276317
});
277318

0 commit comments

Comments
 (0)