Skip to content

fix(core): fix sse support #954

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions packages/actor-core/src/actor/protocol/message/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ function getValueLength(value: InputData): number {
return value.size;
} else if (
value instanceof ArrayBuffer ||
value instanceof SharedArrayBuffer
value instanceof SharedArrayBuffer ||
value instanceof Uint8Array
) {
return value.byteLength;
} else if (Buffer.isBuffer(value)) {
return value.length;
} else {
assertUnreachable(value);
}
Expand Down Expand Up @@ -76,7 +75,10 @@ export interface ProcessMessageHandler<S, CP, CS, V> {
args: unknown[],
) => Promise<unknown>;
onSubscribe?: (eventName: string, conn: Conn<S, CP, CS, V>) => Promise<void>;
onUnsubscribe?: (eventName: string, conn: Conn<S, CP, CS, V>) => Promise<void>;
onUnsubscribe?: (
eventName: string,
conn: Conn<S, CP, CS, V>,
) => Promise<void>;
}

export async function processMessage<S, CP, CS, V>(
Expand All @@ -101,19 +103,23 @@ export async function processMessage<S, CP, CS, V>(
rpcId = id;
rpcName = name;

logger().debug("processing RPC request", { id, name, argsCount: args.length });

logger().debug("processing RPC request", {
id,
name,
argsCount: args.length,
});

const ctx = new ActionContext<S, CP, CS, V>(actor.actorContext, conn);

// Process the RPC request and wait for the result
// This will wait for async actions to complete
const output = await handler.onExecuteRpc(ctx, name, args);
logger().debug("sending RPC response", {
id,
name,
outputType: typeof output,
isPromise: output instanceof Promise

logger().debug("sending RPC response", {
id,
name,
outputType: typeof output,
isPromise: output instanceof Promise,
});

// Send the response back to the client
Expand All @@ -127,7 +133,7 @@ export async function processMessage<S, CP, CS, V>(
},
}),
);

logger().debug("RPC response sent", { id, name });
} else if ("sr" in message.b) {
// Subscription request
Expand All @@ -140,15 +146,21 @@ export async function processMessage<S, CP, CS, V>(
}

const { e: eventName, s: subscribe } = message.b.sr;
logger().debug("processing subscription request", { eventName, subscribe });
logger().debug("processing subscription request", {
eventName,
subscribe,
});

if (subscribe) {
await handler.onSubscribe(eventName, conn);
} else {
await handler.onUnsubscribe(eventName, conn);
}

logger().debug("subscription request completed", { eventName, subscribe });

logger().debug("subscription request completed", {
eventName,
subscribe,
});
} else {
assertUnreachable(message.b);
}
Expand All @@ -163,7 +175,7 @@ export async function processMessage<S, CP, CS, V>(
rpcId,
rpcName,
code,
message
message,
});

// Build response
Expand Down Expand Up @@ -193,7 +205,7 @@ export async function processMessage<S, CP, CS, V>(
}),
);
}

logger().debug("error response sent", { rpcId, rpcName });
}
}
11 changes: 7 additions & 4 deletions packages/actor-core/src/actor/protocol/serde.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { assertUnreachable } from "../utils";
import * as cbor from "cbor-x";

/** Data that can be deserialized. */
export type InputData = string | Buffer | Blob | ArrayBufferLike;
export type InputData = string | Buffer | Blob | ArrayBufferLike | Uint8Array;

/** Data that's been serialized. */
export type OutputData = string | Uint8Array;
Expand Down Expand Up @@ -71,9 +71,12 @@ export async function deserialize(data: InputData, encoding: Encoding) {
if (data instanceof Blob) {
const arrayBuffer = await data.arrayBuffer();
return cbor.decode(new Uint8Array(arrayBuffer));
} else if (data instanceof ArrayBuffer) {
return cbor.decode(new Uint8Array(data));
} else if (Buffer.isBuffer(data)) {
} else if (data instanceof Uint8Array) {
return cbor.decode(data);
} else if (
data instanceof ArrayBuffer ||
data instanceof SharedArrayBuffer
) {
return cbor.decode(new Uint8Array(data));
} else {
logger().warn("received non-binary type for cbor parse");
Expand Down
7 changes: 7 additions & 0 deletions packages/actor-core/src/actor/router_endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,20 @@ export async function handleSseConnect(
return streamSSE(c, async (stream) => {
try {
await sseHandler.onOpen(stream);

// Wait for close
const abortResolver = Promise.withResolvers();
c.req.raw.signal.addEventListener("abort", async () => {
try {
abortResolver.resolve(undefined);
await sseHandler.onClose();
} catch (error) {
logger().error("error closing sse connection", { error });
}
});

// Wait until connection aborted
await abortResolver.promise;
} catch (error) {
logger().error("error opening sse connection", { error });
throw error;
Expand Down
8 changes: 4 additions & 4 deletions packages/actor-core/src/actor/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as errors from "./errors";
import { logger } from "./log";

export function assertUnreachable(x: never): never {
logger().error("unreachable", { value: `${x}`, stack: new Error().stack });
throw new errors.Unreachable(x);
}

Expand Down Expand Up @@ -35,7 +37,7 @@ export const throttle = <

export class DeadlineError extends Error {
constructor() {
super("Promise did not complete before deadline.")
super("Promise did not complete before deadline.");
}
}

Expand All @@ -49,9 +51,7 @@ export function deadline<T>(promise: Promise<T>, timeout: number): Promise<T> {
return Promise.race<T>([
promise,
new Promise<T>((_, reject) => {
signal.addEventListener("abort", () =>
reject(new DeadlineError()),
);
signal.addEventListener("abort", () => reject(new DeadlineError()));
}),
]).finally(() => {
clearTimeout(timeoutId);
Expand Down
158 changes: 23 additions & 135 deletions packages/actor-core/src/client/actor_conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,147 +152,35 @@ export class ActorConnRaw {

logger().debug("action", { name, args });

// Check if we have an active websocket connection
if (this.#transport) {
// If we have an active connection, use the websocket RPC
const rpcId = this.#rpcIdCounter;
this.#rpcIdCounter += 1;

const { promise, resolve, reject } =
Promise.withResolvers<wsToClient.RpcResponseOk>();
this.#rpcInFlight.set(rpcId, { name, resolve, reject });

this.#sendMessage({
b: {
rr: {
i: rpcId,
n: name,
a: args,
},
// If we have an active connection, use the websocket RPC
const rpcId = this.#rpcIdCounter;
this.#rpcIdCounter += 1;

const { promise, resolve, reject } =
Promise.withResolvers<wsToClient.RpcResponseOk>();
this.#rpcInFlight.set(rpcId, { name, resolve, reject });

this.#sendMessage({
b: {
rr: {
i: rpcId,
n: name,
a: args,
},
} satisfies wsToServer.ToServer);

// TODO: Throw error if disconnect is called

const { i: responseId, o: output } = await promise;
if (responseId !== rpcId)
throw new Error(
`Request ID ${rpcId} does not match response ID ${responseId}`,
);

return output as Response;
} else {
// If no websocket connection, use HTTP RPC via manager
try {
// Get the manager endpoint from the endpoint provided
const actorQueryStr = encodeURIComponent(
JSON.stringify(this.actorQuery),
);

const url = `${this.endpoint}/actors/rpc/${name}?query=${actorQueryStr}`;
logger().debug("http rpc: request", {
url,
name,
});
},
} satisfies wsToServer.ToServer);

try {
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
a: args,
}),
});
// TODO: Throw error if disconnect is called

logger().debug("http rpc: response", {
status: response.status,
ok: response.ok,
});
const { i: responseId, o: output } = await promise;
if (responseId !== rpcId)
throw new Error(
`Request ID ${rpcId} does not match response ID ${responseId}`,
);

if (!response.ok) {
try {
const errorData = await response.json();
logger().error("http rpc error response", { errorData });
throw new errors.ActionError(
errorData.c || "RPC_ERROR",
errorData.m || "RPC call failed",
errorData.md,
);
} catch (parseError) {
// If response is not JSON, get it as text and throw generic error
const errorText = await response.text();
logger().error("http rpc: error parsing response", {
errorText,
});
throw new errors.ActionError(
"RPC_ERROR",
`RPC call failed: ${errorText}`,
{},
);
}
}

// Clone response to avoid consuming it
const responseClone = response.clone();
const responseText = await responseClone.text();

// Parse response body
try {
const responseData = JSON.parse(responseText);
return responseData.o as Response;
} catch (parseError) {
logger().error("http rpc: error parsing json", {
parseError,
});
throw new errors.ActionError(
"RPC_ERROR",
`Failed to parse response: ${parseError}`,
{ responseText },
);
}
} catch (fetchError) {
logger().error("http rpc: fetch error", {
error: fetchError,
});
throw new errors.ActionError(
"RPC_ERROR",
`Fetch failed: ${fetchError}`,
{ cause: fetchError },
);
}
} catch (error) {
if (error instanceof errors.ActionError) {
throw error;
}
throw new errors.ActionError(
"RPC_ERROR",
`Failed to execute RPC ${name}: ${error}`,
{ cause: error },
);
}
}
return output as Response;
}

//async #rpcHttp<Args extends Array<unknown> = unknown[], Response = unknown>(name: string, ...args: Args): Promise<Response> {
// const origin = `${resolved.isTls ? "https": "http"}://${resolved.publicHostname}:${resolved.publicPort}`;
// const url = `${origin}/rpc/${encodeURIComponent(name)}`;
// const res = await fetch(url, {
// method: "POST",
// // TODO: Import type from protocol
// body: JSON.stringify({
// args,
// })
// });
// if (!res.ok) {
// throw new Error(`RPC error (${res.statusText}):\n${await res.text()}`);
// }
// // TODO: Import type from protocol
// const resJson: httpRpc.ResponseOk<Response> = await res.json();
// return resJson.output;
//}

/**
* Do not call this directly.
enc
Expand Down
2 changes: 1 addition & 1 deletion packages/actor-core/src/common/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export function loggerMiddleware(logger: Logger) {
await next();

const duration = Date.now() - startTime;
logger.debug("http request", {
logger.info("http request", {
method,
path,
status: c.res.status,
Expand Down
3 changes: 1 addition & 2 deletions packages/actor-core/src/manager/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ export function createManagerRouter(
if ("inline" in handler.proxyMode) {
logger().debug("using inline proxy mode for sse connection");
// Use the shared SSE handler
return handleSseConnect(
return await handleSseConnect(
c,
appConfig,
driverConfig,
Expand Down Expand Up @@ -416,7 +416,6 @@ export function createManagerRouter(
});

if (appConfig.inspector.enabled) {
logger().debug("setting up inspector routes");
app.route(
"/inspect",
createManagerInspectorRouter(
Expand Down
3 changes: 2 additions & 1 deletion vitest.base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ export default {
testTimeout: 15_000,
env: {
// Enable logging
_LOG_LEVEL: "DEBUG"
_LOG_LEVEL: "DEBUG",
_ACTOR_CORE_ERROR_STACK: "1"
}
},
} satisfies ViteUserConfig;
Loading
Loading