Skip to content

fix(core): fix reporting errors for sse initiation #955

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 54 additions & 13 deletions packages/actor-core/src/manager/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import {
createManagerInspectorRouter,
type ManagerInspectorConnHandler,
} from "@/inspector/manager";
import type { UpgradeWebSocket } from "hono/ws";
import { ConnectQuerySchema } from "./protocol/query";
import * as errors from "@/actor/errors";
import type { ActorQuery, ConnectQuery } from "./protocol/query";
import type { ActorQuery } from "./protocol/query";
import { assertUnreachable } from "@/actor/utils";
import invariant from "invariant";
import {
Expand All @@ -27,12 +26,11 @@ import {
handleWebSocketConnect,
} from "@/actor/router_endpoints";
import { ManagerDriver } from "./driver";
import { setUncaughtExceptionCaptureCallback } from "process";
import { Encoding, serialize } from "@/actor/protocol/serde";
import { deconstructError } from "@/common/utils";
import { WSContext } from "hono/ws";
import { ToClient } from "@/actor/protocol/message/to-client";
import { upgradeWebSocket } from "hono/deno";
import { streamSSE } from "hono/streaming";

type ProxyMode =
| {
Expand Down Expand Up @@ -190,7 +188,6 @@ export function createManagerRouter(
};

// Send the error message to the client
invariant(encoding, "encoding should be defined");
const serialized = serialize(errorMsg, encoding);
ws.send(serialized);

Expand All @@ -213,8 +210,11 @@ export function createManagerRouter(

// Proxy SSE connection to actor
app.get("/actors/connect/sse", async (c) => {
logger().debug("sse connection request received");
let encoding: Encoding | undefined;
try {
encoding = getRequestEncoding(c.req);
logger().debug("sse connection request received", { encoding });

const params = ConnectQuerySchema.safeParse({
query: parseQuery(c),
encoding: c.req.query("encoding"),
Expand Down Expand Up @@ -264,14 +264,55 @@ export function createManagerRouter(
assertUnreachable(handler.proxyMode);
}
} catch (error) {
logger().error("error setting up sse proxy", { error });
// If we receive an error during setup, we send the error and close the socket immediately
//
// We have to return the error over SSE since SSE clients cannot read vanilla HTTP responses

// Use ProxyError if it's not already an ActorError
if (!(error instanceof errors.ActorError)) {
throw new errors.ProxyError("SSE connection", error);
} else {
throw error;
}
const { code, message, metadata } = deconstructError(error, logger(), {
sseEvent: "setup",
});

return streamSSE(c, async (stream) => {
try {
if (encoding) {
// Serialize and send the connection error
const errorMsg: ToClient = {
b: {
ce: {
c: code,
m: message,
md: metadata,
},
},
};

// Send the error message to the client
const serialized = serialize(errorMsg, encoding);
await stream.writeSSE({
data:
typeof serialized === "string"
? serialized
: Buffer.from(serialized).toString("base64"),
});
} else {
// We don't know the encoding, send an error and close
await stream.writeSSE({
data: code,
event: "error",
});
}
} catch (serializeError) {
logger().error("failed to send error to sse client", {
error: serializeError,
});
await stream.writeSSE({
data: "internal error during error handling",
event: "error",
});
}

// Stream will exit completely once function exits
});
}
});

Expand Down
Loading