diff --git a/packages/actor-core/src/actor/protocol/message/mod.ts b/packages/actor-core/src/actor/protocol/message/mod.ts index da1f23a64..00a6e5ca2 100644 --- a/packages/actor-core/src/actor/protocol/message/mod.ts +++ b/packages/actor-core/src/actor/protocol/message/mod.ts @@ -35,11 +35,10 @@ function getValueLength(value: InputData): number { return value.size; } else if ( value instanceof ArrayBuffer || - value instanceof SharedArrayBuffer + value instanceof SharedArrayBuffer || + value instanceof Uint8Array ) { return value.byteLength; - } else if (Buffer.isBuffer(value)) { - return value.length; } else { assertUnreachable(value); } @@ -76,7 +75,10 @@ export interface ProcessMessageHandler { args: unknown[], ) => Promise; onSubscribe?: (eventName: string, conn: Conn) => Promise; - onUnsubscribe?: (eventName: string, conn: Conn) => Promise; + onUnsubscribe?: ( + eventName: string, + conn: Conn, + ) => Promise; } export async function processMessage( @@ -101,19 +103,23 @@ export async function processMessage( rpcId = id; rpcName = name; - logger().debug("processing RPC request", { id, name, argsCount: args.length }); - + logger().debug("processing RPC request", { + id, + name, + argsCount: args.length, + }); + const ctx = new ActionContext(actor.actorContext, conn); - + // Process the RPC request and wait for the result // This will wait for async actions to complete const output = await handler.onExecuteRpc(ctx, name, args); - - logger().debug("sending RPC response", { - id, - name, - outputType: typeof output, - isPromise: output instanceof Promise + + logger().debug("sending RPC response", { + id, + name, + outputType: typeof output, + isPromise: output instanceof Promise, }); // Send the response back to the client @@ -127,7 +133,7 @@ export async function processMessage( }, }), ); - + logger().debug("RPC response sent", { id, name }); } else if ("sr" in message.b) { // Subscription request @@ -140,15 +146,21 @@ export async function processMessage( } const { e: eventName, s: subscribe } = message.b.sr; - logger().debug("processing subscription request", { eventName, subscribe }); + logger().debug("processing subscription request", { + eventName, + subscribe, + }); if (subscribe) { await handler.onSubscribe(eventName, conn); } else { await handler.onUnsubscribe(eventName, conn); } - - logger().debug("subscription request completed", { eventName, subscribe }); + + logger().debug("subscription request completed", { + eventName, + subscribe, + }); } else { assertUnreachable(message.b); } @@ -163,7 +175,7 @@ export async function processMessage( rpcId, rpcName, code, - message + message, }); // Build response @@ -193,7 +205,7 @@ export async function processMessage( }), ); } - + logger().debug("error response sent", { rpcId, rpcName }); } } diff --git a/packages/actor-core/src/actor/protocol/serde.ts b/packages/actor-core/src/actor/protocol/serde.ts index 326c8c1bb..8118343ba 100644 --- a/packages/actor-core/src/actor/protocol/serde.ts +++ b/packages/actor-core/src/actor/protocol/serde.ts @@ -5,7 +5,7 @@ import { assertUnreachable } from "../utils"; import * as cbor from "cbor-x"; /** Data that can be deserialized. */ -export type InputData = string | Buffer | Blob | ArrayBufferLike; +export type InputData = string | Buffer | Blob | ArrayBufferLike | Uint8Array; /** Data that's been serialized. */ export type OutputData = string | Uint8Array; @@ -71,9 +71,12 @@ export async function deserialize(data: InputData, encoding: Encoding) { if (data instanceof Blob) { const arrayBuffer = await data.arrayBuffer(); return cbor.decode(new Uint8Array(arrayBuffer)); - } else if (data instanceof ArrayBuffer) { - return cbor.decode(new Uint8Array(data)); - } else if (Buffer.isBuffer(data)) { + } else if (data instanceof Uint8Array) { + return cbor.decode(data); + } else if ( + data instanceof ArrayBuffer || + data instanceof SharedArrayBuffer + ) { return cbor.decode(new Uint8Array(data)); } else { logger().warn("received non-binary type for cbor parse"); diff --git a/packages/actor-core/src/actor/router_endpoints.ts b/packages/actor-core/src/actor/router_endpoints.ts index b3e56b84d..e4d264584 100644 --- a/packages/actor-core/src/actor/router_endpoints.ts +++ b/packages/actor-core/src/actor/router_endpoints.ts @@ -213,13 +213,20 @@ export async function handleSseConnect( return streamSSE(c, async (stream) => { try { await sseHandler.onOpen(stream); + + // Wait for close + const abortResolver = Promise.withResolvers(); c.req.raw.signal.addEventListener("abort", async () => { try { + abortResolver.resolve(undefined); await sseHandler.onClose(); } catch (error) { logger().error("error closing sse connection", { error }); } }); + + // Wait until connection aborted + await abortResolver.promise; } catch (error) { logger().error("error opening sse connection", { error }); throw error; diff --git a/packages/actor-core/src/actor/utils.ts b/packages/actor-core/src/actor/utils.ts index 7464a8bea..dc8b9315e 100644 --- a/packages/actor-core/src/actor/utils.ts +++ b/packages/actor-core/src/actor/utils.ts @@ -1,6 +1,8 @@ import * as errors from "./errors"; +import { logger } from "./log"; export function assertUnreachable(x: never): never { + logger().error("unreachable", { value: `${x}`, stack: new Error().stack }); throw new errors.Unreachable(x); } @@ -35,7 +37,7 @@ export const throttle = < export class DeadlineError extends Error { constructor() { - super("Promise did not complete before deadline.") + super("Promise did not complete before deadline."); } } @@ -49,9 +51,7 @@ export function deadline(promise: Promise, timeout: number): Promise { return Promise.race([ promise, new Promise((_, reject) => { - signal.addEventListener("abort", () => - reject(new DeadlineError()), - ); + signal.addEventListener("abort", () => reject(new DeadlineError())); }), ]).finally(() => { clearTimeout(timeoutId); diff --git a/packages/actor-core/src/client/actor_conn.ts b/packages/actor-core/src/client/actor_conn.ts index 2987eb7ea..bbdec0c50 100644 --- a/packages/actor-core/src/client/actor_conn.ts +++ b/packages/actor-core/src/client/actor_conn.ts @@ -152,147 +152,35 @@ export class ActorConnRaw { logger().debug("action", { name, args }); - // Check if we have an active websocket connection - if (this.#transport) { - // If we have an active connection, use the websocket RPC - const rpcId = this.#rpcIdCounter; - this.#rpcIdCounter += 1; - - const { promise, resolve, reject } = - Promise.withResolvers(); - this.#rpcInFlight.set(rpcId, { name, resolve, reject }); - - this.#sendMessage({ - b: { - rr: { - i: rpcId, - n: name, - a: args, - }, + // If we have an active connection, use the websocket RPC + const rpcId = this.#rpcIdCounter; + this.#rpcIdCounter += 1; + + const { promise, resolve, reject } = + Promise.withResolvers(); + this.#rpcInFlight.set(rpcId, { name, resolve, reject }); + + this.#sendMessage({ + b: { + rr: { + i: rpcId, + n: name, + a: args, }, - } satisfies wsToServer.ToServer); - - // TODO: Throw error if disconnect is called - - const { i: responseId, o: output } = await promise; - if (responseId !== rpcId) - throw new Error( - `Request ID ${rpcId} does not match response ID ${responseId}`, - ); - - return output as Response; - } else { - // If no websocket connection, use HTTP RPC via manager - try { - // Get the manager endpoint from the endpoint provided - const actorQueryStr = encodeURIComponent( - JSON.stringify(this.actorQuery), - ); - - const url = `${this.endpoint}/actors/rpc/${name}?query=${actorQueryStr}`; - logger().debug("http rpc: request", { - url, - name, - }); + }, + } satisfies wsToServer.ToServer); - try { - const response = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - a: args, - }), - }); + // TODO: Throw error if disconnect is called - logger().debug("http rpc: response", { - status: response.status, - ok: response.ok, - }); + const { i: responseId, o: output } = await promise; + if (responseId !== rpcId) + throw new Error( + `Request ID ${rpcId} does not match response ID ${responseId}`, + ); - if (!response.ok) { - try { - const errorData = await response.json(); - logger().error("http rpc error response", { errorData }); - throw new errors.ActionError( - errorData.c || "RPC_ERROR", - errorData.m || "RPC call failed", - errorData.md, - ); - } catch (parseError) { - // If response is not JSON, get it as text and throw generic error - const errorText = await response.text(); - logger().error("http rpc: error parsing response", { - errorText, - }); - throw new errors.ActionError( - "RPC_ERROR", - `RPC call failed: ${errorText}`, - {}, - ); - } - } - - // Clone response to avoid consuming it - const responseClone = response.clone(); - const responseText = await responseClone.text(); - - // Parse response body - try { - const responseData = JSON.parse(responseText); - return responseData.o as Response; - } catch (parseError) { - logger().error("http rpc: error parsing json", { - parseError, - }); - throw new errors.ActionError( - "RPC_ERROR", - `Failed to parse response: ${parseError}`, - { responseText }, - ); - } - } catch (fetchError) { - logger().error("http rpc: fetch error", { - error: fetchError, - }); - throw new errors.ActionError( - "RPC_ERROR", - `Fetch failed: ${fetchError}`, - { cause: fetchError }, - ); - } - } catch (error) { - if (error instanceof errors.ActionError) { - throw error; - } - throw new errors.ActionError( - "RPC_ERROR", - `Failed to execute RPC ${name}: ${error}`, - { cause: error }, - ); - } - } + return output as Response; } - //async #rpcHttp = unknown[], Response = unknown>(name: string, ...args: Args): Promise { - // const origin = `${resolved.isTls ? "https": "http"}://${resolved.publicHostname}:${resolved.publicPort}`; - // const url = `${origin}/rpc/${encodeURIComponent(name)}`; - // const res = await fetch(url, { - // method: "POST", - // // TODO: Import type from protocol - // body: JSON.stringify({ - // args, - // }) - // }); - // if (!res.ok) { - // throw new Error(`RPC error (${res.statusText}):\n${await res.text()}`); - // } - // // TODO: Import type from protocol - // const resJson: httpRpc.ResponseOk = await res.json(); - // return resJson.output; - //} - /** * Do not call this directly. enc diff --git a/packages/actor-core/src/common/router.ts b/packages/actor-core/src/common/router.ts index 3283af224..55247e7a3 100644 --- a/packages/actor-core/src/common/router.ts +++ b/packages/actor-core/src/common/router.ts @@ -15,7 +15,7 @@ export function loggerMiddleware(logger: Logger) { await next(); const duration = Date.now() - startTime; - logger.debug("http request", { + logger.info("http request", { method, path, status: c.res.status, diff --git a/packages/actor-core/src/manager/router.ts b/packages/actor-core/src/manager/router.ts index b63f49425..56554ace4 100644 --- a/packages/actor-core/src/manager/router.ts +++ b/packages/actor-core/src/manager/router.ts @@ -239,7 +239,7 @@ export function createManagerRouter( if ("inline" in handler.proxyMode) { logger().debug("using inline proxy mode for sse connection"); // Use the shared SSE handler - return handleSseConnect( + return await handleSseConnect( c, appConfig, driverConfig, @@ -416,7 +416,6 @@ export function createManagerRouter( }); if (appConfig.inspector.enabled) { - logger().debug("setting up inspector routes"); app.route( "/inspect", createManagerInspectorRouter( diff --git a/vitest.base.ts b/vitest.base.ts index 2fa6f5370..c419adc59 100644 --- a/vitest.base.ts +++ b/vitest.base.ts @@ -10,7 +10,8 @@ export default { testTimeout: 15_000, env: { // Enable logging - _LOG_LEVEL: "DEBUG" + _LOG_LEVEL: "DEBUG", + _ACTOR_CORE_ERROR_STACK: "1" } }, } satisfies ViteUserConfig; diff --git a/yarn.lock b/yarn.lock index 8cef3daf2..c844c7504 100644 --- a/yarn.lock +++ b/yarn.lock @@ -218,7 +218,6 @@ __metadata: dependencies: "@actor-core/driver-test-suite": "workspace:*" "@rivet-gg/actor-core": "npm:^25.1.0" - "@rivet-gg/api": "npm:^25.4.2" "@types/deno": "npm:^2.0.0" "@types/invariant": "npm:^2" "@types/node": "npm:^22.13.1" @@ -2379,20 +2378,6 @@ __metadata: languageName: node linkType: hard -"@rivet-gg/api@npm:^25.4.2": - version: 25.4.2 - resolution: "@rivet-gg/api@npm:25.4.2" - dependencies: - form-data: "npm:^4.0.0" - js-base64: "npm:^3.7.5" - node-fetch: "npm:2" - qs: "npm:^6.11.2" - readable-stream: "npm:^4.5.2" - url-join: "npm:^5.0.0" - checksum: 10c0/eb6a25b1468b9cd8f9b548fa7cdec948d8bcc21bc1274b06507b1b519cbba739cc828974a0917ebee9ab18c92ba7fe228d8ac596b3e71c5efaf4f4f8ed12c8f1 - languageName: node - linkType: hard - "@rollup/rollup-android-arm-eabi@npm:4.39.0": version: 4.39.0 resolution: "@rollup/rollup-android-arm-eabi@npm:4.39.0"