diff --git a/packages/actor-core/src/manager/router.ts b/packages/actor-core/src/manager/router.ts index 56554ace4..86e82f801 100644 --- a/packages/actor-core/src/manager/router.ts +++ b/packages/actor-core/src/manager/router.ts @@ -12,10 +12,9 @@ import { createManagerInspectorRouter, type ManagerInspectorConnHandler, } from "@/inspector/manager"; -import type { UpgradeWebSocket } from "hono/ws"; import { ConnectQuerySchema } from "./protocol/query"; import * as errors from "@/actor/errors"; -import type { ActorQuery, ConnectQuery } from "./protocol/query"; +import type { ActorQuery } from "./protocol/query"; import { assertUnreachable } from "@/actor/utils"; import invariant from "invariant"; import { @@ -27,12 +26,11 @@ import { handleWebSocketConnect, } from "@/actor/router_endpoints"; import { ManagerDriver } from "./driver"; -import { setUncaughtExceptionCaptureCallback } from "process"; import { Encoding, serialize } from "@/actor/protocol/serde"; import { deconstructError } from "@/common/utils"; import { WSContext } from "hono/ws"; import { ToClient } from "@/actor/protocol/message/to-client"; -import { upgradeWebSocket } from "hono/deno"; +import { streamSSE } from "hono/streaming"; type ProxyMode = | { @@ -190,7 +188,6 @@ export function createManagerRouter( }; // Send the error message to the client - invariant(encoding, "encoding should be defined"); const serialized = serialize(errorMsg, encoding); ws.send(serialized); @@ -213,8 +210,11 @@ export function createManagerRouter( // Proxy SSE connection to actor app.get("/actors/connect/sse", async (c) => { - logger().debug("sse connection request received"); + let encoding: Encoding | undefined; try { + encoding = getRequestEncoding(c.req); + logger().debug("sse connection request received", { encoding }); + const params = ConnectQuerySchema.safeParse({ query: parseQuery(c), encoding: c.req.query("encoding"), @@ -264,14 +264,55 @@ export function createManagerRouter( assertUnreachable(handler.proxyMode); } } catch (error) { - logger().error("error setting up sse proxy", { error }); + // If we receive an error during setup, we send the error and close the socket immediately + // + // We have to return the error over SSE since SSE clients cannot read vanilla HTTP responses - // Use ProxyError if it's not already an ActorError - if (!(error instanceof errors.ActorError)) { - throw new errors.ProxyError("SSE connection", error); - } else { - throw error; - } + const { code, message, metadata } = deconstructError(error, logger(), { + sseEvent: "setup", + }); + + return streamSSE(c, async (stream) => { + try { + if (encoding) { + // Serialize and send the connection error + const errorMsg: ToClient = { + b: { + ce: { + c: code, + m: message, + md: metadata, + }, + }, + }; + + // Send the error message to the client + const serialized = serialize(errorMsg, encoding); + await stream.writeSSE({ + data: + typeof serialized === "string" + ? serialized + : Buffer.from(serialized).toString("base64"), + }); + } else { + // We don't know the encoding, send an error and close + await stream.writeSSE({ + data: code, + event: "error", + }); + } + } catch (serializeError) { + logger().error("failed to send error to sse client", { + error: serializeError, + }); + await stream.writeSSE({ + data: "internal error during error handling", + event: "error", + }); + } + + // Stream will exit completely once function exits + }); } });